Getting started
Define a workflow, trigger a run, and resume it with sweep and signal
Getting started
1. Define a workflow
A workflow is an id, an optional payload schema, and a run function driven by the run context (ctx).
import { defineWorkflow } from "@visulima/workflow";
import { z } from "zod";
const order = defineWorkflow({
id: "order-fulfilment",
payload: z.object({ orderId: z.string() }),
run: async (ctx) => {
await ctx.step("charge", () => charge(ctx.payload.orderId));
await ctx.sleep("settle", { amount: 1, unit: "hours" });
await ctx.step("ship", () => ship(ctx.payload.orderId));
},
});The payload schema may be any Standard Schema validator; it is validated on trigger
and the result is typed inside run as ctx.payload.
2. Create a runtime and trigger a run
import { createRuntime } from "@visulima/workflow";
const runtime = createRuntime({ workflows: [order] });
const result = await runtime.trigger(order, { orderId: "o_1" });
// result.status is "suspended" (sleeping), "completed", or "failed".
// result.runId identifies this run for later resume / inspection.trigger returns as soon as the run first suspends (or completes). The default store is in-memory; pass your own
via createRuntime({ store }) for durability — see Stores.
You can also register workflows after the runtime is created with register() — useful for dynamic or
lazily-loaded workflows, and required so a worker can resume runs of a workflow it didn't define at startup:
const runtime = createRuntime();
runtime.register(order);
await runtime.trigger("order-fulfilment", { orderId: "o_1" }); // trigger by id once registered3. Resume due runs
Sleeps and timeouts are advanced by sweeping. Call sweep() from a cron job, a Cloudflare alarm, or any timer; it
resumes every run whose wake-at has passed:
// e.g. once a minute
const resumed = await runtime.sweep();4. Deliver external events
ctx.waitForEvent suspends a run until you deliver a matching signal:
const review = defineWorkflow({
id: "publish",
run: async (ctx) => {
const decision = await ctx.waitForEvent<{ approved: boolean }>("review", "review-decision", {
timeout: { amount: 2, unit: "days" }, // resolves to undefined if nobody decides
});
if (decision?.approved) {
await ctx.step("publish", () => publish());
}
},
});
const { runId } = await runtime.trigger(review, {});
// later, when a reviewer acts:
await runtime.signal(runId, "review-decision", { approved: true });5. Inspect a run
const info = await runtime.getRun(runId);
// { runId, status, output?, error?, pending?, history }After a restart
Re-create the runtime with the same store and re-register the workflows, then sweep/signal as usual — runs
resume from where they left off:
const runtime = createRuntime({ store, workflows: [order, review] });
await runtime.sweep();