Queue

Durable queue and a retrying worker

Queue & worker

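Decouple enqueue from delivery: producers add jobs to a queue, and a worker delivers them and retries failures. The default queue (MemoryQueue) is in-process, so pending jobs are lost when the process exits; back it with unstorage for durability.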

import { MemoryQueue, createQueueWorker } from "@visulima/notification/queue";

const queue = new MemoryQueue();

queue.enqueue({ sms: { to: "+15555550100", text: "Hi" } });
queue.enqueue({ chat: { text: "Hello" } }, { scheduledAt: Date.now() + 60_000 }); // delayed

// `notify` is assumed to be your already-configured notification instance (not shown here).
const worker = createQueueWorker(queue, notify, {
    maxAttempts: 5,
    backoff: (attempt) => Math.min(30_000, 1000 * 2 ** (attempt - 1)),
    onDrop: (job, receipts) => console.error("dropped", job.id, receipts),
});

worker.start(); // poll continuously
// or
await worker.drain(); // process all currently-due jobs once, then resolve

A job is retried, after the delay returned by backoff, when any channel fails. Once it has failed maxAttempts times it is dropped, and onDrop receives the job and its receipts.
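To make the retry timing concrete, the sketch below evaluates the backoff function from the worker options for the first few attempts. It assumes that attempt is 1-based and that the returned value is a delay in milliseconds; neither is stated on this page, so check the package's types before relying on it.

```typescript
// Same backoff as in the worker options above: exponential growth, capped at 30 s.
// Assumption: `attempt` starts at 1 and the result is a delay in milliseconds.
const backoff = (attempt: number): number => Math.min(30_000, 1000 * 2 ** (attempt - 1));

// Delay computed for attempts 1 through 6.
const delays = [1, 2, 3, 4, 5, 6].map(backoff);

console.log(delays); // [1000, 2000, 4000, 8000, 16000, 30000]
```

The delay doubles on every attempt until the cap applies: attempt 6 would be 32 000 ms uncapped, so it is clamped to 30 000 ms.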

Durable queue (unstorage)

import { UnstorageQueue } from "@visulima/notification/queue/unstorage";
import { createStorage } from "unstorage";
import redisDriver from "unstorage/drivers/redis";

const queue = new UnstorageQueue(createStorage({ driver: redisDriver({ base: "notify" }) }));

UnstorageQueue implements the same NotificationQueue interface, so the worker is identical. Any unstorage driver (Redis, filesystem, Cloudflare KV, …) works.

Durable adapters (Node-only)

Ready-made adapters for the common backends. These depend on Node built-ins or SDKs, so they are not Cloudflare-safe; on the edge, use MemoryQueue or UnstorageQueue instead.

import { createBullMqQueue } from "@visulima/notification/queue/bullmq"; // Redis (optional peer: bullmq)
import { createPgBossQueue } from "@visulima/notification/queue/pg-boss"; // Postgres (optional peer: pg-boss)
import { createSqsQueue } from "@visulima/notification/queue/sqs"; // AWS SQS (optional peer: @aws-sdk/client-sqs)

const queue = createBullMqQueue(bullQueue); // pass a configured client instance

Each implements the same NotificationQueue interface, so the worker is identical.

Custom queue

Implement NotificationQueue (enqueue / reserve / ack / retry / size) to integrate any other store.
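As a rough illustration of what those five methods have to do, here is a minimal array-backed sketch. The Job and QueueLike shapes, the ArrayQueue name, and every method signature are assumptions made for this example; the package's actual NotificationQueue types are not shown on this page and may differ (for instance, the real methods may return promises).

```typescript
// Hypothetical shapes for illustration only; not the package's real types.
interface Job<T> {
    id: string;
    payload: T;
    attempts: number;
    availableAt: number; // epoch ms at which the job becomes due
    reserved: boolean; // true while a worker is processing the job
}

interface QueueLike<T> {
    enqueue(payload: T, options?: { scheduledAt?: number }): string;
    reserve(now?: number): Job<T> | undefined;
    ack(id: string): void;
    retry(id: string, delayMs: number): void;
    size(): number;
}

class ArrayQueue<T> implements QueueLike<T> {
    private jobs: Job<T>[] = [];
    private nextId = 1;

    // Store the job; it is due immediately unless scheduledAt is given.
    enqueue(payload: T, options: { scheduledAt?: number } = {}): string {
        const id = String(this.nextId++);
        const availableAt = options.scheduledAt ?? Date.now();
        this.jobs.push({ id, payload, attempts: 0, availableAt, reserved: false });
        return id;
    }

    // Hand out the first due job that no worker currently holds.
    reserve(now: number = Date.now()): Job<T> | undefined {
        for (const job of this.jobs) {
            if (!job.reserved && job.availableAt <= now) {
                job.reserved = true;
                job.attempts += 1;
                return job;
            }
        }
        return undefined;
    }

    // Delivery succeeded: remove the job for good.
    ack(id: string): void {
        this.jobs = this.jobs.filter((job) => job.id !== id);
    }

    // Delivery failed: release the job and make it due again after delayMs.
    retry(id: string, delayMs: number): void {
        for (const job of this.jobs) {
            if (job.id === id) {
                job.reserved = false;
                job.availableAt = Date.now() + delayMs;
            }
        }
    }

    // Number of jobs still stored, whether due, delayed, or reserved.
    size(): number {
        return this.jobs.length;
    }
}

const customQueue = new ArrayQueue<{ text: string }>();
const jobId = customQueue.enqueue({ text: "Hello" });
const reservedJob = customQueue.reserve(); // the job is due, so it is handed out
customQueue.ack(jobId); // delivered, so it leaves the queue
console.log(reservedJob?.id === jobId, customQueue.size());
```

A real store would also need to make reserve atomic across processes and release jobs whose worker crashed; both are omitted here to keep the sketch short.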
