EmailQueueQueue

Queue

Queue emails for background delivery with retry, backoff, and dead-lettering using @visulima/email

Queue

@visulima/email/queue lets you decouple accepting an email from delivering it. You enqueue() a message, and a worker reserves jobs, sends them with bounded concurrency, retries failures with backoff, and dead-letters jobs that exhaust their attempts. An in-memory queue ships for tests and single-process apps; a durable unstorage-backed queue (importable from @visulima/email/queue/unstorage) persists jobs across restarts and across many backends.

import { createWorker, MemoryQueue } from "@visulima/email/queue";
import type { EmailQueue, QueueJob } from "@visulima/email/queue";

How It Works

Delivery follows a reserve → ack/retry cycle:

  1. enqueue(message) adds a job to the queue.
  2. reserve() hands a job to a worker and hides it for a visibility window so other workers don't pick it up.
  3. On success the worker acks the job (removing it); on failure it retrys the job (returning it to the queue, optionally after a delay).

If a worker crashes mid-process, the reserved job becomes deliverable again once its visibility timeout elapses.

MemoryQueue

MemoryQueue is an in-memory EmailQueue with scheduledAt support and a reservation visibility timeout. It is ideal for tests, single-process apps, and as a reference for adapter authors. Jobs do not survive a process restart — use a durable adapter in production.

import { MemoryQueue } from "@visulima/email/queue";

const queue = new MemoryQueue({
    visibilityTimeout: 30_000, // ms a reserved job stays hidden (default 30000)
    now: () => Date.now(), // injectable time source, mainly for tests
});

EmailQueue Methods

MethodDescription
enqueue(message, { scheduledAt })Adds a message. Returns the new job id. scheduledAt accepts a Date or epoch ms.
reserve()Reserves the next ready job, hiding it for the visibility window.
ack(id)Marks a reserved job as processed and removes it.
retry(id, delayMs?)Returns a job to the queue after an optional delay, incrementing its attempt count.
size()Returns the number of jobs not yet acked (ready plus reserved).

Scheduling

scheduledAt delays when a job first becomes deliverable. It accepts either a Date or epoch milliseconds:

// Deliver in one hour.
await queue.enqueue(message, { scheduledAt: new Date(Date.now() + 60 * 60 * 1000) });

// Same, using epoch milliseconds.
await queue.enqueue(message, { scheduledAt: Date.now() + 60 * 60 * 1000 });

createWorker

createWorker(options) returns a worker that processes queued jobs with bounded concurrency, retry/backoff, and dead-lettering. The send function is typically mail.send.bind(mail).

const worker = createWorker({
    queue,
    send: mail.send.bind(mail),
    concurrency: 5,
    maxAttempts: 3,
    backoff: (attempts) => 1000 * 2 ** (attempts - 1),
    pollInterval: 1000,
    onError: (job, result) => console.warn("attempt failed", job.id, result.error),
    onDeadLetter: (job, result) => console.error("dead-lettered", job.id, result.error),
});

Worker Options

OptionTypeDefaultDescription
queueEmailQueueThe queue to consume.
send(message) => Promise<Result<EmailResult>>The send function, typically mail.send.bind(mail).
concurrencynumber1How many jobs to process concurrently.
maxAttemptsnumber3Maximum delivery attempts before a job is dead-lettered.
backoff(attempts) => numberattempts => 1000 * 2 ** (attempts - 1)Delay (ms) before re-attempting after attempts failures.
pollIntervalnumber1000Idle poll interval (ms) when no job is ready.
onError(job, result) => voidCalled on every failed attempt, before any retry.
onDeadLetter(job, result) => voidCalled when a job exhausts its attempts and is dropped.

Worker Handle

createWorker() returns { start, stop, drain }:

MethodDescription
start()Begins polling and processing jobs in the background.
stop()Stops polling. In-flight jobs are allowed to finish.
drain()Processes all currently-ready jobs, then resolves. Does not wait for future-scheduled jobs.

drain() is handy in tests and one-shot runs where you want to flush the queue and await completion rather than poll in the background.

Enqueue and Run a Worker

import { createMail } from "@visulima/email";
import { resendProvider } from "@visulima/email/providers/resend";
import { createWorker, MemoryQueue } from "@visulima/email/queue";

const mail = createMail(resendProvider({ apiKey: "re_xxx" }));
const queue = new MemoryQueue();

// Enqueue messages instead of sending them inline.
await queue.enqueue({
    from: { email: "sender@example.com" },
    to: { email: "user@example.com" },
    subject: "Welcome",
    html: "<h1>Welcome!</h1>",
});

// Process in the background.
const worker = createWorker({
    queue,
    send: mail.send.bind(mail),
    concurrency: 5,
});

worker.start();

// … later, on shutdown:
worker.stop();

In a test or one-shot script, drain instead of starting a long-running loop:

const worker = createWorker({ queue, send: mail.send.bind(mail) });

await queue.enqueue(message);
await worker.drain(); // processes everything ready, then resolves

UnstorageQueue

UnstorageQueue (and the createUnstorageQueue factory) from @visulima/email/queue/unstorage is a durable EmailQueue backed by any unstorage Storage. Because unstorage abstracts many backends — memory, filesystem, Redis, Cloudflare KV, Postgres, and more — a single adapter persists the queue across all of them.

unstorage is an optional peer dependency. Install it (and any driver you need) alongside @visulima/email:

Install: npm install unstorage

import { createUnstorageQueue, UnstorageQueue } from "@visulima/email/queue/unstorage";

Options

createUnstorageQueue(storage, options) accepts:

OptionTypeDefaultDescription
prefixstring"email:queue"Key prefix under which jobs are stored.
visibilityTimeoutnumber30000Ms a reserved job stays hidden before becoming deliverable.
now() => numberDate.nowInjectable time source, mainly for tests.

Caveats

  • The stored message must be JSON-serializable. Avoid Buffer attachments — pass path / href / base64 content instead.
  • unstorage has no transactions, so reserve() is best-effort (non-transactional) under concurrent workers. The visibility timeout bounds duplicate delivery on a crash.

Durable Queue with Redis

import { createMail } from "@visulima/email";
import { resendProvider } from "@visulima/email/providers/resend";
import { createWorker } from "@visulima/email/queue";
import { createUnstorageQueue } from "@visulima/email/queue/unstorage";
import { createStorage } from "unstorage";
import redisDriver from "unstorage/drivers/redis";

const mail = createMail(resendProvider({ apiKey: "re_xxx" }));

const storage = createStorage({
    driver: redisDriver({
        base: "email",
        url: "redis://localhost:6379",
    }),
});

const queue = createUnstorageQueue(storage, {
    prefix: "email:queue",
    visibilityTimeout: 30_000,
});

// Enqueue a JSON-serializable message.
await queue.enqueue({
    from: { email: "sender@example.com" },
    to: { email: "user@example.com" },
    subject: "Welcome",
    html: "<h1>Welcome!</h1>",
});

// The worker API is identical regardless of the queue backend.
const worker = createWorker({
    queue,
    send: mail.send.bind(mail),
    concurrency: 5,
});

worker.start();

Because both MemoryQueue and UnstorageQueue implement the same EmailQueue interface, you can develop and test against MemoryQueue and switch to a durable backend in production without changing your worker code.

Support

Contribute to our work and keep us going

Community is the heart of open source. The success of our packages wouldn't be possible without the incredible contributions of users, testers, and developers who collaborate with us every day.Want to get involved? Here are some tips on how you can make a meaningful impact on our open source projects.

Ready to help us out?

Be sure to check out the package's contribution guidelines first. They'll walk you through the process on how to properly submit an issue or pull request to our repositories.

Submit a pull request

Found something to improve? Fork the repo, make your changes, and open a PR. We review every contribution and provide feedback to help you get merged.

Good first issues

Simple issues suited for people new to open source development, and often a good place to start working on a package.
View good first issues