TypeScript Types Reference
bunqueue is written in TypeScript and provides comprehensive type definitions. All public types are exported from bunqueue/client.
Job Types
JobStateType
type JobStateType = 'waiting' | 'delayed' | 'active' | 'completed' | 'failed' | 'unknown';Job
The main job interface returned by Queue methods and passed to worker processors.
interface Job<T = unknown> { // ── Core Properties ──────────────────────────────────────────
/** Unique job identifier (UUIDv7) */ id: string;
/** Job name/type */ name: string;
/** Job payload data */ data: T;
/** Queue name this job belongs to */ queueName: string;
/** Number of processing attempts made */ attemptsMade: number;
/** Job creation timestamp (ms since epoch) */ timestamp: number;
/** Current progress (0-100) */ progress: number;
/** Return value after successful completion */ returnvalue?: unknown;
/** Error message if the job failed */ failedReason?: string;
/** Parent job reference (if this job is part of a flow) */ parent?: { id: string; queueQualifiedName: string };
// ── Scheduling & Timing ──────────────────────────────────────
/** Delay in ms before job becomes available for processing */ delay: number;
/** Timestamp when job started processing */ processedOn?: number;
/** Timestamp when job finished (completed or failed) */ finishedOn?: number;
/** Job priority (higher = processed sooner) */ priority: number;
// ── Failure & Stall Tracking ─────────────────────────────────
/** Stack traces from failed attempts */ stacktrace: string[] | null;
/** Number of times this job has been stalled */ stalledCounter: number;
// ── Metadata ─────────────────────────────────────────────────
/** Parent key in format queueName:jobId */ parentKey?: string;
/** Original job options used when adding this job */ opts: JobOptions;
/** Lock token for this job (present when processing) */ token?: string;
/** Worker/client identifier processing this job */ processedBy?: string;
/** Deduplication ID (if set via jobId or deduplication option) */ deduplicationId?: string;
/** Repeat job key (for repeatable jobs) */ repeatJobKey?: string;
/** Number of times job processing has been started (includes retries) */ attemptsStarted: number;
// ── Core Methods ─────────────────────────────────────────────
/** Update job progress (0-100) with optional status message */ updateProgress(progress: number, message?: string): Promise<void>;
/** Add a log entry to the job */ log(message: string): Promise<void>;
/** Get the current state of the job */ getState(): Promise<JobStateType>;
/** Remove this job from the queue */ remove(): Promise<void>;
/** Retry this failed job */ retry(): Promise<void>;
/** * Get the return values of all children jobs. * Keys are job keys (queueName:jobId), values are return values. */ getChildrenValues<R = unknown>(): Promise<Record<string, R>>;
// ── State Check Methods ──────────────────────────────────────
/** Check if job is in waiting state */ isWaiting(): Promise<boolean>;
/** Check if job is currently active/processing */ isActive(): Promise<boolean>;
/** Check if job is delayed */ isDelayed(): Promise<boolean>;
/** Check if job has completed successfully */ isCompleted(): Promise<boolean>;
/** Check if job has failed */ isFailed(): Promise<boolean>;
/** Check if job is waiting for children to complete */ isWaitingChildren(): Promise<boolean>;
// ── Mutation Methods ─────────────────────────────────────────
/** Update the job's data payload */ updateData(data: T): Promise<void>;
/** Promote a delayed job to the waiting state */ promote(): Promise<void>;
/** Change the delay on a delayed job */ changeDelay(delay: number): Promise<void>;
/** Change the job's priority */ changePriority(opts: ChangePriorityOpts): Promise<void>;
/** Extend the job's lock duration */ extendLock(token: string, duration: number): Promise<number>;
/** Clear job logs, optionally keeping the last N entries */ clearLogs(keepLogs?: number): Promise<void>;
/** * Discard this job. Marks it to not be processed further. * The job will be moved to failed state with a "discarded" reason. */ discard(): void;
// ── Dependency Methods ───────────────────────────────────────
/** Get job dependencies (children) with pagination */ getDependencies(opts?: GetDependenciesOpts): Promise<JobDependencies>;
/** Get count of job dependencies */ getDependenciesCount(opts?: GetDependenciesOpts): Promise<JobDependenciesCount>;
/** Get return values of failed children jobs */ getFailedChildrenValues(): Promise<Record<string, string>>;
/** Get ignored child failures (via ignoreDependencyOnFailure) */ getIgnoredChildrenFailures(): Promise<Record<string, string>>;
/** Remove this job's dependency relationship with its parent */ removeChildDependency(): Promise<boolean>;
/** Remove the deduplication key associated with this job */ removeDeduplicationKey(): Promise<boolean>;
/** Remove all unprocessed child jobs of this job */ removeUnprocessedChildren(): Promise<void>;
// ── Move Methods ─────────────────────────────────────────────
/** * Move job to completed state. * @param returnValue - The return value of the job * @param token - Lock token (optional in embedded mode) * @param fetchNext - Whether to fetch the next job (default: true) * @returns The next job data or null */ moveToCompleted(returnValue: unknown, token?: string, fetchNext?: boolean): Promise<unknown>;
/** * Move job to failed state. * @param error - The error that caused the failure * @param token - Lock token (optional in embedded mode) * @param fetchNext - Whether to fetch the next job (default: true) */ moveToFailed(error: Error, token?: string, fetchNext?: boolean): Promise<void>;
/** * Move job back to waiting state. * @param token - Lock token (optional in embedded mode) * @returns true if job was moved */ moveToWait(token?: string): Promise<boolean>;
/** * Move job to delayed state. * @param timestamp - When the job should become available * @param token - Lock token (optional in embedded mode) */ moveToDelayed(timestamp: number, token?: string): Promise<void>;
/** * Move job to waiting-children state. * Job will wait for all children to complete before processing. * @param token - Lock token (optional in embedded mode) * @param opts - Options including child reference * @returns true if job was moved */ moveToWaitingChildren( token?: string, opts?: { child?: { id: string; queue: string } } ): Promise<boolean>;
/** * Wait until the job has finished (completed or failed). * @param queueEvents - QueueEvents instance to listen on * @param ttl - Maximum time to wait in ms (optional) * @returns The job's return value * @throws Error if job fails or times out */ waitUntilFinished(queueEvents: unknown, ttl?: number): Promise<unknown>;
// ── Serialization Methods ────────────────────────────────────
/** Get job as a typed JSON object */ toJSON(): JobJson<T>;
/** Get job as raw JSON (all values stringified) */ asJSON(): JobJsonRaw;}JobJson
Typed JSON representation of a job.
interface JobJson<T = unknown> { id: string; name: string; data: T; opts: JobOptions; progress: number; delay: number; timestamp: number; attemptsMade: number; stacktrace: string[] | null; returnvalue?: unknown; failedReason?: string; finishedOn?: number; processedOn?: number; queueQualifiedName: string; parentKey?: string;}JobJsonRaw
Raw JSON representation with all values as strings.
interface JobJsonRaw { id: string; name: string; data: string; // JSON stringified opts: string; // JSON stringified progress: string; // JSON stringified delay: string; timestamp: string; attemptsMade: string; stacktrace: string | null; // JSON stringified returnvalue?: string; // JSON stringified failedReason?: string; finishedOn?: string; processedOn?: string; parentKey?: string;}ChangePriorityOpts
interface ChangePriorityOpts { /** New priority value */ priority: number; /** Process in LIFO order after priority change */ lifo?: boolean;}GetDependenciesOpts
interface GetDependenciesOpts { processed?: { cursor?: number; count?: number }; unprocessed?: { cursor?: number; count?: number };}JobDependencies
interface JobDependencies { processed: Record<string, unknown>; unprocessed: string[]; nextProcessedCursor?: number; nextUnprocessedCursor?: number;}JobDependenciesCount
interface JobDependenciesCount { processed: number; unprocessed: number;}JobOptions
Options when adding a job to a queue.
interface JobOptions { /** Job priority (higher = processed sooner, default: 0) */ priority?: number;
/** Delay in milliseconds before job becomes available (default: 0) */ delay?: number;
/** Maximum number of processing attempts (default: 3) */ attempts?: number;
/** * Backoff between retries. Either a delay in ms or a BackoffOptions object. * Default: 1000 */ backoff?: number | BackoffOptions;
/** Processing timeout in milliseconds. Job fails if exceeded. */ timeout?: number;
/** * Custom job ID for idempotent/deduplication. * If a job with this ID already exists, the existing job is returned. */ jobId?: string;
/** * Remove job on completion. * - `true`: remove immediately * - `number`: keep for N ms then remove * - `KeepJobs`: keep by age and/or count * Default: false */ removeOnComplete?: boolean | number | KeepJobs;
/** * Remove job on failure. * - `true`: remove immediately * - `number`: keep for N ms then remove * - `KeepJobs`: keep by age and/or count * Default: false */ removeOnFail?: boolean | number | KeepJobs;
/** Stall timeout in ms. Job is stalled if no heartbeat after this time. */ stallTimeout?: number;
/** Repeat configuration for recurring jobs */ repeat?: RepeatOptions;
/** * Force immediate persistence to disk (bypass write buffer). * Use for critical jobs where data loss is unacceptable. * Default: false (uses buffered writes for ~100k jobs/sec throughput) */ durable?: boolean;
/** * Parent job reference for flow dependencies. * When set, this job becomes a child of the specified parent. * The parent will wait for all children to complete before processing. */ parent?: ParentOpts;
/** Process jobs in LIFO order (newest first, default: false) */ lifo?: boolean;
/** Maximum stack trace lines to store on failure (default: 10) */ stackTraceLimit?: number;
/** Maximum number of log entries to keep per job */ keepLogs?: number;
/** Maximum job data size in bytes. Jobs exceeding this are rejected. */ sizeLimit?: number;
/** Fail parent job if this child job fails (default: false) */ failParentOnFailure?: boolean;
/** Remove dependency relationship if this job fails (default: false) */ removeDependencyOnFailure?: boolean;
/** Continue parent processing even if this child fails (default: false) */ continueParentOnFailure?: boolean;
/** Move job to parent's failed dependencies instead of blocking parent (default: false) */ ignoreDependencyOnFailure?: boolean;
/** Job creation timestamp in ms (default: Date.now()) */ timestamp?: number;
/** Deduplication configuration */ deduplication?: DeduplicationOptions;
/** Debounce configuration */ debounce?: DebounceOptions;}ParentOpts
interface ParentOpts { /** Parent job ID */ id: string; /** Parent job queue name */ queue: string;}BackoffOptions
interface BackoffOptions { /** Backoff strategy type */ type: 'fixed' | 'exponential'; /** Base delay in milliseconds */ delay: number; /** Maximum delay cap in milliseconds (default: 3,600,000 = 1 hour) */ maxDelay?: number;}All backoff delays include automatic jitter to prevent thundering herd:
- Exponential: ±50% jitter around the computed delay
- Fixed: ±20% jitter around the configured delay
Delays are capped at maxDelay (default: 1 hour). This prevents runaway delays at high attempt counts.
KeepJobs
interface KeepJobs { /** Maximum age in milliseconds */ age?: number; /** Maximum count of jobs to keep */ count?: number;}RepeatOptions
Configuration for recurring/repeatable jobs.
interface RepeatOptions { /** Repeat every N milliseconds (alternative to pattern) */ every?: number;
/** Maximum repetitions (omit or null for infinite) */ limit?: number;
/** Cron pattern (alternative to every) */ pattern?: string;
/** Start date for repeat jobs */ startDate?: Date | string | number;
/** End date for repeat jobs */ endDate?: Date | string | number;
/** Timezone for cron pattern (e.g. 'America/New_York') */ tz?: string;
/** Execute immediately on start (default: false) */ immediately?: boolean;
/** Current repeat count (internal) */ count?: number;
/** Previous execution timestamp (internal) */ prevMillis?: number;
/** Offset in milliseconds */ offset?: number;
/** Custom job ID for repeat jobs */ jobId?: string;}DeduplicationOptions
Prevent duplicate jobs from being added to the queue.
interface DeduplicationOptions { /** Unique deduplication ID (required) */ id: string;
/** TTL in milliseconds for the deduplication key */ ttl?: number;
/** Extend TTL when a duplicate job arrives (for debounce mode) */ extend?: boolean;
/** Replace job data when duplicate arrives while in delayed state */ replace?: boolean;}DebounceOptions
Debounce job creation within a time window.
interface DebounceOptions { /** Unique debounce ID (required) */ id: string;
/** TTL in milliseconds for the debounce window (required) */ ttl: number;}Processor
type Processor<T = unknown, R = unknown> = (job: Job<T>) => Promise<R> | R;JobCounts
interface JobCounts { waiting: number; active: number; completed: number; failed: number;}Queue Types
QueueOptions
interface QueueOptions { /** Default job options applied to all jobs in this queue */ defaultJobOptions?: JobOptions;
/** TCP connection options (for server mode) */ connection?: ConnectionOptions;
/** Use embedded mode (in-process SQLite, default: false) */ embedded?: boolean;}ConnectionOptions
interface ConnectionOptions { /** Server hostname (default: 'localhost', ignored if socketPath is set) */ host?: string;
/** TCP port (default: 6789, ignored if socketPath is set) */ port?: number;
/** Unix socket path (takes priority over host/port) */ socketPath?: string;
/** Authentication token */ token?: string;
/** * Connection pool size for parallel operations. * Source JSDoc default: 1. Runtime default for Queue/FlowProducer: 4. * Set >1 to enable connection pooling. */ poolSize?: number;
/** Ping interval in ms for health checks (default: 30000, 0 to disable) */ pingInterval?: number;
/** Command timeout in ms (default: 30000) */ commandTimeout?: number;
/** Enable TCP pipelining (default: true) */ pipelining?: boolean;
/** Max commands in flight per connection (default: 100) */ maxInFlight?: number;}RateLimiterOptions
interface RateLimiterOptions { /** Maximum number of jobs to process in the duration window */ max: number;
/** Duration window in milliseconds */ duration: number;
/** Optional group key for per-group rate limiting */ groupKey?: string;}Worker Types
WorkerOptions
interface WorkerOptions { /** Number of concurrent jobs (default: 1) */ concurrency?: number;
/** Auto-run on creation (default: true) */ autorun?: boolean;
/** Heartbeat interval in ms (default: 10000, 0 to disable) */ heartbeatInterval?: number;
/** TCP connection options (for server mode) */ connection?: ConnectionOptions;
/** Use embedded mode (in-process SQLite, default: false) */ embedded?: boolean;
/** Number of jobs to pull per batch (default: 10, max: 1000) */ batchSize?: number;
/** Long poll timeout in ms when queue is empty (default: 0, max: 30000) */ pollTimeout?: number;
/** * Use lock-based job ownership. * When enabled, each pulled job gets a lock renewed via heartbeat. * Disable for high-throughput scenarios where stall detection is sufficient. * Default: true */ useLocks?: boolean;
/** Rate limiter configuration for controlling job processing rate */ limiter?: RateLimiterOptions;
/** Lock duration in ms (default: 30000) */ lockDuration?: number;
/** Maximum stalls before moving job to failed (default: 1) */ maxStalledCount?: number;
/** Skip stalled job check entirely (default: false) */ skipStalledCheck?: boolean;
/** Skip lock renewal via heartbeat (default: false) */ skipLockRenewal?: boolean;
/** Delay in ms when draining queue (default: 5000) */ drainDelay?: number;
/** * Remove completed jobs. * - `true`: remove immediately * - `number`: keep for N ms then remove * - `KeepJobs`: keep by age and/or count */ removeOnComplete?: boolean | number | KeepJobs;
/** * Remove failed jobs. * - `true`: remove immediately * - `number`: keep for N ms then remove * - `KeepJobs`: keep by age and/or count */ removeOnFail?: boolean | number | KeepJobs;}Worker Events
// Worker emits these events:worker.on('ready', () => void);worker.on('active', (job: Job) => void);worker.on('completed', (job: Job, result: any) => void);worker.on('failed', (job: Job, error: Error) => void);worker.on('progress', (job: Job, progress: number) => void);worker.on('error', (error: Error) => void);worker.on('closed', () => void);QueueEvents Types
QueueEvents
Event listener class for monitoring queue activity without processing jobs.
class QueueEvents<R = unknown, P = unknown> extends EventEmitter { /** Queue name being monitored */ readonly name: string;
constructor(name: string);
/** Wait until the QueueEvents instance is ready to receive events */ waitUntilReady(): Promise<void>;
/** Close the event listener and stop receiving events */ close(): void;
/** Disconnect from the event stream (alias for close) */ disconnect(): Promise<void>;}QueueEvents Event Payloads
Each event emitted by QueueEvents has a typed payload:
/** Emitted when a job is added to the queue */interface WaitingEvent { jobId: string;}
/** Emitted when a job begins processing */interface ActiveEvent { jobId: string;}
/** Emitted when a job completes successfully */interface CompletedEvent<R = unknown> { jobId: string; returnvalue: R;}
/** Emitted when a job fails */interface FailedEvent { jobId: string; failedReason: string;}
/** Emitted when job progress is updated */interface ProgressEvent<P = unknown> { jobId: string; data: P;}
/** Emitted when a job stalls (no heartbeat) */interface StalledEvent { jobId: string;}
/** Emitted when a job is removed from the queue */interface RemovedEvent { jobId: string; prev: string;}
/** Emitted when a job is moved to delayed state */interface DelayedEvent { jobId: string; delay: number;}
/** Emitted when a duplicate job is detected */interface DuplicatedEvent { jobId: string;}
/** Emitted when a job is retried */interface RetriedEvent { jobId: string; prev: string;}
/** Emitted when a job enters waiting-children state */interface WaitingChildrenEvent { jobId: string;}
/** Emitted when the queue has no more waiting jobs */interface DrainedEvent { id: string;}QueueEvents Usage
const events = new QueueEvents('my-queue');
events.on('waiting', ({ jobId }) => { /* ... */ });events.on('active', ({ jobId }) => { /* ... */ });events.on('completed', ({ jobId, returnvalue }) => { /* ... */ });events.on('failed', ({ jobId, failedReason }) => { /* ... */ });events.on('progress', ({ jobId, data }) => { /* ... */ });events.on('stalled', ({ jobId }) => { /* ... */ });events.on('removed', ({ jobId, prev }) => { /* ... */ });events.on('delayed', ({ jobId, delay }) => { /* ... */ });events.on('duplicated', ({ jobId }) => { /* ... */ });events.on('retried', ({ jobId, prev }) => { /* ... */ });events.on('waiting-children', ({ jobId }) => { /* ... */ });events.on('drained', ({ id }) => { /* ... */ });events.on('error', (error: Error) => { /* ... */ });QueueEventType
type QueueEventType = | 'waiting' | 'active' | 'completed' | 'failed' | 'progress' | 'removed' | 'drained';FlowProducer Types
FlowProducerOptions
interface FlowProducerOptions { /** Use embedded mode (no server) */ embedded?: boolean; /** TCP connection options */ connection?: ConnectionOptions;}FlowJob
A job definition within a flow. Children are processed before the parent.
interface FlowJob<T = unknown> { /** Job name */ name: string; /** Queue name */ queueName: string; /** Job data */ data?: T; /** Job options */ opts?: JobOptions; /** Child jobs (processed BEFORE this job) */ children?: FlowJob<T>[];}JobNode
Result from adding a flow. Contains the created job and its children.
interface JobNode<T = unknown> { /** The created job instance */ job: Job<T>; /** Child nodes (if any) */ children?: JobNode<T>[];}GetFlowOpts
interface GetFlowOpts { /** Job ID to get the flow for */ id: string; /** Queue name where the job is located */ queueName: string; /** Maximum depth to traverse (default: unlimited) */ depth?: number; /** Maximum number of children to fetch per level (default: unlimited) */ maxChildren?: number;}SandboxedWorker Types
SandboxedWorkerOptions
interface SandboxedWorkerOptions { /** Path to processor file (must export default async function) */ processor: string;
/** Number of worker processes (default: 1) */ concurrency?: number;
/** Job timeout in ms (default: 30000) */ timeout?: number;
/** Max memory per worker in MB (default: 256, uses smol mode if <= 64) */ maxMemory?: number;
/** Max restarts before giving up (default: 10) */ maxRestarts?: number;
/** Auto-restart crashed workers (default: true) */ autoRestart?: boolean;
/** Poll interval in ms when no workers are idle (default: 10) */ pollInterval?: number;}SandboxedWorker Stats
interface SandboxedWorkerStats { total: number; // Total worker processes busy: number; // Currently processing idle: number; // Available for work restarts: number; // Total restarts across all workers}Stall Detection Types
StallConfig
interface StallConfig { /** Enable stall detection (default: true) */ enabled?: boolean;
/** Stall timeout in ms (default: 30000) */ stallInterval?: number;
/** Max stalls before moving to DLQ (default: 3) */ maxStalls?: number;
/** Grace period after job start in ms (default: 5000) */ gracePeriod?: number;}DLQ Types
DlqConfig
interface DlqConfig { /** Enable auto-retry from DLQ (default: false) */ autoRetry?: boolean;
/** Auto-retry interval in ms (default: 3600000 = 1 hour) */ autoRetryInterval?: number;
/** Max auto-retries before giving up (default: 3) */ maxAutoRetries?: number;
/** Max age before auto-purge in ms (default: 604800000 = 7 days, null = never) */ maxAge?: number | null;
/** Max entries per queue (default: 10000) */ maxEntries?: number;}FailureReason
type FailureReason = | 'explicit_fail' // Job explicitly failed via fail() or thrown error | 'max_attempts_exceeded' // Exceeded all retry attempts | 'timeout' // Job processing timed out (exceeded timeout option) | 'stalled' // Job stalled (no heartbeat within stallInterval) | 'ttl_expired' // Time-to-live expired before processing | 'worker_lost' // Worker disconnected while processing (TCP mode) | 'unknown'; // Catch-all for edge casesDlqEntry
interface DlqEntry<T = unknown> { /** The failed job */ job: Job<T>;
/** When job entered DLQ (ms since epoch) */ enteredAt: number;
/** Last failure reason */ reason: FailureReason;
/** Last error message */ error: string | null;
/** Full attempt history */ attempts: Array<AttemptRecord>;
/** Number of retry attempts from DLQ */ retryCount: number;
/** Last retry timestamp */ lastRetryAt: number | null;
/** Next scheduled auto-retry (null = no auto-retry) */ nextRetryAt: number | null;
/** When entry expires for auto-purge (null = never) */ expiresAt: number | null;}AttemptRecord
interface AttemptRecord { /** Attempt number (1-based) */ attempt: number;
/** When this attempt started (ms since epoch) */ startedAt: number;
/** When this attempt failed (ms since epoch) */ failedAt: number;
/** Failure reason for this attempt */ reason: FailureReason;
/** Error message if any */ error: string | null;
/** Duration of this attempt in ms */ duration: number;}DlqFilter
interface DlqFilter { /** Filter by failure reason */ reason?: FailureReason;
/** Only entries older than this timestamp */ olderThan?: number;
/** Only entries newer than this timestamp */ newerThan?: number;
/** Only entries that can be retried */ retriable?: boolean;
/** Only entries that are expired */ expired?: boolean;
/** Limit number of results */ limit?: number;
/** Offset for pagination */ offset?: number;}DlqStats
interface DlqStats { /** Total DLQ entries */ total: number;
/** Entries grouped by failure reason */ byReason: Record<FailureReason, number>;
/** Entries awaiting auto-retry */ pendingRetry: number;
/** Expired entries (awaiting cleanup) */ expired: number;
/** Oldest entry timestamp (null if empty) */ oldestEntry: number | null;
/** Newest entry timestamp (null if empty) */ newestEntry: number | null;}Generic Type Helpers
bunqueue supports generic types for type-safe job data and results:
// Define typed job datainterface EmailJobData { to: string; subject: string; body: string;}
interface EmailResult { sent: boolean; messageId: string;}
// Queue with typed dataconst queue = new Queue<EmailJobData>('emails');
// TypeScript enforces the data shapeawait queue.add('welcome', { to: 'user@example.com', subject: 'Welcome!', body: 'Hello and welcome.',});
// Worker with typed data and resultconst worker = new Worker<EmailJobData, EmailResult>( 'emails', async (job) => { // job.data is typed as EmailJobData const { to, subject, body } = job.data; return { sent: true, messageId: 'msg-123' }; });
// Type error at compile time: missing required fieldsawait queue.add('send', { to: 'test@example.com' }); // Error!
// QueueEvents with typed result and progressconst events = new QueueEvents<EmailResult, number>('emails');events.on('completed', ({ jobId, returnvalue }) => { // returnvalue is typed as EmailResult console.log(returnvalue.messageId);});