Queue
Queue emails for background delivery with retry, backoff, and dead-lettering using @visulima/email
Queue
@visulima/email/queue lets you decouple accepting an email from delivering it. You enqueue() a message, and a worker reserves jobs, sends them with bounded concurrency, retries failures with backoff, and dead-letters jobs that exhaust their attempts. An in-memory queue ships for tests and single-process apps; a durable unstorage-backed queue (importable from @visulima/email/queue/unstorage) persists jobs across restarts and across many backends.
import { createWorker, MemoryQueue } from "@visulima/email/queue";
import type { EmailQueue, QueueJob } from "@visulima/email/queue";How It Works
Delivery follows a reserve → ack/retry cycle:
enqueue(message)adds a job to the queue.reserve()hands a job to a worker and hides it for a visibility window so other workers don't pick it up.- On success the worker
acks the job (removing it); on failure itretrys the job (returning it to the queue, optionally after a delay).
If a worker crashes mid-process, the reserved job becomes deliverable again once its visibility timeout elapses.
MemoryQueue
MemoryQueue is an in-memory EmailQueue with scheduledAt support and a reservation visibility timeout. It is ideal for tests, single-process apps, and as a reference for adapter authors. Jobs do not survive a process restart — use a durable adapter in production.
import { MemoryQueue } from "@visulima/email/queue";
const queue = new MemoryQueue({
visibilityTimeout: 30_000, // ms a reserved job stays hidden (default 30000)
now: () => Date.now(), // injectable time source, mainly for tests
});EmailQueue Methods
| Method | Description |
|---|---|
enqueue(message, { scheduledAt }) | Adds a message. Returns the new job id. scheduledAt accepts a Date or epoch ms. |
reserve() | Reserves the next ready job, hiding it for the visibility window. |
ack(id) | Marks a reserved job as processed and removes it. |
retry(id, delayMs?) | Returns a job to the queue after an optional delay, incrementing its attempt count. |
size() | Returns the number of jobs not yet acked (ready plus reserved). |
Scheduling
scheduledAt delays when a job first becomes deliverable. It accepts either a Date or epoch milliseconds:
// Deliver in one hour.
await queue.enqueue(message, { scheduledAt: new Date(Date.now() + 60 * 60 * 1000) });
// Same, using epoch milliseconds.
await queue.enqueue(message, { scheduledAt: Date.now() + 60 * 60 * 1000 });createWorker
createWorker(options) returns a worker that processes queued jobs with bounded concurrency, retry/backoff, and dead-lettering. The send function is typically mail.send.bind(mail).
const worker = createWorker({
queue,
send: mail.send.bind(mail),
concurrency: 5,
maxAttempts: 3,
backoff: (attempts) => 1000 * 2 ** (attempts - 1),
pollInterval: 1000,
onError: (job, result) => console.warn("attempt failed", job.id, result.error),
onDeadLetter: (job, result) => console.error("dead-lettered", job.id, result.error),
});Worker Options
| Option | Type | Default | Description |
|---|---|---|---|
queue | EmailQueue | — | The queue to consume. |
send | (message) => Promise<Result<EmailResult>> | — | The send function, typically mail.send.bind(mail). |
concurrency | number | 1 | How many jobs to process concurrently. |
maxAttempts | number | 3 | Maximum delivery attempts before a job is dead-lettered. |
backoff | (attempts) => number | attempts => 1000 * 2 ** (attempts - 1) | Delay (ms) before re-attempting after attempts failures. |
pollInterval | number | 1000 | Idle poll interval (ms) when no job is ready. |
onError | (job, result) => void | — | Called on every failed attempt, before any retry. |
onDeadLetter | (job, result) => void | — | Called when a job exhausts its attempts and is dropped. |
Worker Handle
createWorker() returns { start, stop, drain }:
| Method | Description |
|---|---|
start() | Begins polling and processing jobs in the background. |
stop() | Stops polling. In-flight jobs are allowed to finish. |
drain() | Processes all currently-ready jobs, then resolves. Does not wait for future-scheduled jobs. |
drain() is handy in tests and one-shot runs where you want to flush the queue and await completion rather than poll in the background.
Enqueue and Run a Worker
import { createMail } from "@visulima/email";
import { resendProvider } from "@visulima/email/providers/resend";
import { createWorker, MemoryQueue } from "@visulima/email/queue";
const mail = createMail(resendProvider({ apiKey: "re_xxx" }));
const queue = new MemoryQueue();
// Enqueue messages instead of sending them inline.
await queue.enqueue({
from: { email: "sender@example.com" },
to: { email: "user@example.com" },
subject: "Welcome",
html: "<h1>Welcome!</h1>",
});
// Process in the background.
const worker = createWorker({
queue,
send: mail.send.bind(mail),
concurrency: 5,
});
worker.start();
// … later, on shutdown:
worker.stop();In a test or one-shot script, drain instead of starting a long-running loop:
const worker = createWorker({ queue, send: mail.send.bind(mail) });
await queue.enqueue(message);
await worker.drain(); // processes everything ready, then resolvesUnstorageQueue
UnstorageQueue (and the createUnstorageQueue factory) from @visulima/email/queue/unstorage is a durable EmailQueue backed by any unstorage Storage. Because unstorage abstracts many backends — memory, filesystem, Redis, Cloudflare KV, Postgres, and more — a single adapter persists the queue across all of them.
unstorage is an optional peer dependency. Install it (and any driver you need) alongside @visulima/email:
Install: npm install unstorage
import { createUnstorageQueue, UnstorageQueue } from "@visulima/email/queue/unstorage";Options
createUnstorageQueue(storage, options) accepts:
| Option | Type | Default | Description |
|---|---|---|---|
prefix | string | "email:queue" | Key prefix under which jobs are stored. |
visibilityTimeout | number | 30000 | Ms a reserved job stays hidden before becoming deliverable. |
now | () => number | Date.now | Injectable time source, mainly for tests. |
Caveats
- The stored
messagemust be JSON-serializable. AvoidBufferattachments — passpath/href/ base64contentinstead. - unstorage has no transactions, so
reserve()is best-effort (non-transactional) under concurrent workers. The visibility timeout bounds duplicate delivery on a crash.
Durable Queue with Redis
import { createMail } from "@visulima/email";
import { resendProvider } from "@visulima/email/providers/resend";
import { createWorker } from "@visulima/email/queue";
import { createUnstorageQueue } from "@visulima/email/queue/unstorage";
import { createStorage } from "unstorage";
import redisDriver from "unstorage/drivers/redis";
const mail = createMail(resendProvider({ apiKey: "re_xxx" }));
const storage = createStorage({
driver: redisDriver({
base: "email",
url: "redis://localhost:6379",
}),
});
const queue = createUnstorageQueue(storage, {
prefix: "email:queue",
visibilityTimeout: 30_000,
});
// Enqueue a JSON-serializable message.
await queue.enqueue({
from: { email: "sender@example.com" },
to: { email: "user@example.com" },
subject: "Welcome",
html: "<h1>Welcome!</h1>",
});
// The worker API is identical regardless of the queue backend.
const worker = createWorker({
queue,
send: mail.send.bind(mail),
concurrency: 5,
});
worker.start();Because both MemoryQueue and UnstorageQueue implement the same EmailQueue interface, you can develop and test against MemoryQueue and switch to a durable backend in production without changing your worker code.