TCP Protocol Reference
High-performance binary protocol on port 6789 (default). All messages use MessagePack encoding with length-prefixed framing. Supports pipelining for concurrent command processing.
Wire Format
Every message (request and response) is wrapped in a length-prefixed frame:
┌──────────────────────┬──────────────────────────────┐│ 4 bytes (Big-Endian │ N bytes (MessagePack payload)││ unsigned 32-bit │ ││ payload length) │ │└──────────────────────┴──────────────────────────────┘The framing protocol works as follows:
- The first 4 bytes are a big-endian unsigned 32-bit integer indicating the length of the MessagePack payload.
- The next N bytes are the MessagePack-encoded command or response object.
- Maximum frame size is 64 MB. Frames exceeding this limit cause the connection to be terminated.
Encoding Example
import { pack, unpack } from 'msgpackr';
// Encode a command into a framed messagefunction frameCommand(cmd: object): Uint8Array { const payload = pack(cmd); const frame = new Uint8Array(4 + payload.length); // Write length prefix (big-endian u32) frame[0] = (payload.length >> 24) & 0xff; frame[1] = (payload.length >> 16) & 0xff; frame[2] = (payload.length >> 8) & 0xff; frame[3] = payload.length & 0xff; frame.set(payload, 4); return frame;}
// Decode a framed responsefunction decodeFrame(frame: Uint8Array): object { return unpack(frame);}Connection
import { pack, unpack } from 'msgpackr';
const socket = await Bun.connect({ hostname: 'localhost', port: 6789, socket: { data(socket, data) { // Parse frames from data, then unpack each frame with msgpackr }, },});
// Send a commandconst cmd = pack({ cmd: 'Ping' });const frame = new Uint8Array(4 + cmd.length);frame[0] = (cmd.length >> 24) & 0xff;frame[1] = (cmd.length >> 16) & 0xff;frame[2] = (cmd.length >> 8) & 0xff;frame[3] = cmd.length & 0xff;frame.set(cmd, 4);socket.write(frame);Protocol Negotiation (Hello)
Clients should send a Hello command after connecting to negotiate protocol version and discover server capabilities.
Request:
{ cmd: 'Hello', protocolVersion: 2, capabilities: ['pipelining'] }Response:
{ ok: true, protocolVersion: 2, capabilities: ['pipelining'], server: 'bunqueue', version: '2.1.8' // Server version string}The current protocol version is 2. The only supported capability is pipelining.
Pipelining
The server supports pipelining: clients can send multiple commands without waiting for each response. The server processes frames in parallel with a concurrency limit of 50 commands per connection, controlled by a semaphore.
To correlate responses with requests when pipelining, include a reqId field in each command. The server echoes reqId back in the corresponding response.
// Send two commands simultaneouslysocket.write(frameCommand({ cmd: 'PUSH', queue: 'emails', data: { to: 'a@b.com' }, reqId: '1' }));socket.write(frameCommand({ cmd: 'PUSH', queue: 'emails', data: { to: 'c@d.com' }, reqId: '2' }));
// Responses may arrive in any order - match by reqId// { ok: true, id: 'abc-123', reqId: '1' }// { ok: true, id: 'def-456', reqId: '2' }Authentication
When the server is configured with AUTH_TOKENS, all connections must authenticate before sending other commands. The Auth command is always permitted regardless of authentication state.
Request:
{ cmd: 'Auth', token: 'your-secret-token' }Response (success):
{ ok: true }Response (failure):
{ ok: false, error: 'Invalid token' }If auth tokens are configured and a client sends any command before authenticating, the server responds with:
{ ok: false, error: 'Not authenticated' }Response Format
All responses include an ok boolean field. On success ok is true with command-specific data. On failure ok is false with an error string.
// Success{ ok: true, ...data, reqId?: string }
// Error{ ok: false, error: 'Error message', reqId?: string }Connection Lifecycle
When a TCP connection closes, the server automatically releases all jobs that were being processed by that client back to their queues. This uses retry logic with exponential backoff (up to 3 attempts) to ensure jobs are not left in an inconsistent state.
Rate Limiting
Each connection is subject to server-side rate limiting. If exceeded, the server responds with:
{ ok: false, error: 'Rate limit exceeded' }Command Reference
Every command object must include a cmd field. An optional reqId field can be included for request-response correlation (required for pipelining).
Core Commands
PUSH
Add a single job to a queue.
Request:
{ cmd: 'PUSH', queue: string, // Queue name (required, max 256 chars, alphanumeric/underscore/dash/dot/colon) data: any, // Job payload (required, max 10 MB) priority?: number, // Higher = processed sooner (default: 0, range: -1000000 to 1000000) delay?: number, // Delay in ms before processing (default: 0, max: 1 year) maxAttempts?: number, // Max retry attempts (default: 3, range: 1-1000) backoff?: number, // Retry backoff delay in ms (default: 1000, max: 1 day) ttl?: number, // Time-to-live in ms (max: 1 year) timeout?: number, // Processing timeout in ms (max: 1 day) uniqueKey?: string, // Deduplication key jobId?: string, // Custom job ID (idempotent) dependsOn?: string[], // Job IDs this job depends on tags?: string[], // Metadata tags groupId?: string, // Job group identifier lifo?: boolean, // Last-in-first-out (default: false) removeOnComplete?: boolean, // Auto-remove on completion (default: false) removeOnFail?: boolean, // Auto-remove on failure (default: false) durable?: boolean, // Force immediate disk write, bypassing write buffer (default: false) repeat?: { // Repeat configuration every?: number, // Repeat interval in ms limit?: number, // Max repetitions count?: number // Current count }}Response:
{ ok: true, id: string } // The generated job ID (UUIDv7)PUSHB
Batch push multiple jobs to a queue.
Request:
{ cmd: 'PUSHB', queue: string, jobs: Array<{ data: any, priority?: number, delay?: number, maxAttempts?: number, backoff?: number, ttl?: number, timeout?: number, uniqueKey?: string, customId?: string, tags?: string[], groupId?: string, lifo?: boolean, removeOnComplete?: boolean, removeOnFail?: boolean, durable?: boolean }>}Response:
{ ok: true, ids: string[] } // Array of generated job IDsPULL
Pull the next available job from a queue. Supports optional long polling and lock-based ownership.
Request:
{ cmd: 'PULL', queue: string, timeout?: number, // Long poll timeout in ms (0-60000, default: 0) owner?: string, // Client identifier for lock-based pull lockTtl?: number // Lock TTL in ms (default: 30000)}Response (without owner):
{ ok: true, job: Job | null }Response (with owner — includes lock token):
{ ok: true, job: Job | null, token: string | null }The token must be passed to ACK or FAIL to verify ownership.
PULLB
Batch pull multiple jobs from a queue.
Request:
{ cmd: 'PULLB', queue: string, count: number, // Number of jobs to pull (1-1000) timeout?: number, // Long poll timeout in ms (0-60000) owner?: string, // Client identifier for lock-based pull lockTtl?: number // Lock TTL in ms (default: 30000)}Response (without owner):
{ ok: true, jobs: Job[] }Response (with owner — includes lock tokens):
{ ok: true, jobs: Job[], tokens: string[] }ACK
Acknowledge a job as completed.
Request:
{ cmd: 'ACK', id: string, // Job ID result?: any, // Optional result data token?: string // Lock token (required if pulled with owner)}Response:
{ ok: true }ACKB
Batch acknowledge multiple jobs.
Request:
{ cmd: 'ACKB', ids: string[], // Job IDs results?: any[], // Optional results (same order as ids; if provided, length must match ids) tokens?: string[] // Lock tokens (same order as ids)}Response:
{ ok: true }FAIL
Mark a job as failed. The job will be retried with exponential backoff if it has remaining attempts, otherwise it is moved to the dead-letter queue.
Request:
{ cmd: 'FAIL', id: string, // Job ID error?: string, // Error message token?: string // Lock token (required if pulled with owner)}Response:
{ ok: true }Query Commands
GetJob
Retrieve a job by its internal ID.
Request:
{ cmd: 'GetJob', id: string }Response:
{ ok: true, job: Job }Returns an error if the job is not found.
GetState
Get the current state of a job.
Request:
{ cmd: 'GetState', id: string }Response:
{ ok: true, id: string, state: 'waiting' | 'delayed' | 'active' | 'completed' | 'failed' }GetResult
Get the stored result of a completed job.
Request:
{ cmd: 'GetResult', id: string }Response:
{ ok: true, id: string, result: any }The result field is the value passed via ACK. It may be null or undefined if no result was stored or if the result has been evicted from the LRU cache.
GetJobs
List jobs with filtering and pagination.
Request:
{ cmd: 'GetJobs', queue: string, state?: 'waiting' | 'delayed' | 'active' | 'completed' | 'failed', limit?: number, // Max results (default: 100) offset?: number // Skip N results (default: 0)}Response:
{ ok: true, jobs: Job[] }GetJobCounts
Get job counts grouped by state for a specific queue.
Request:
{ cmd: 'GetJobCounts', queue: string }Response:
{ ok: true, counts: { waiting: number, delayed: number, active: number, completed: number, failed: number }}GetCountsPerPriority
Get job counts grouped by priority level for a specific queue.
Request:
{ cmd: 'GetCountsPerPriority', queue: string }Response:
{ ok: true, queue: string, counts: Record<number, number> }GetJobByCustomId
Look up a job by its custom ID (the jobId field from PUSH).
Request:
{ cmd: 'GetJobByCustomId', customId: string }Response:
{ ok: true, job: Job }Returns an error if no job with that custom ID exists.
Count
Get the total number of jobs in a queue (all states).
Request:
{ cmd: 'Count', queue: string }Response:
{ ok: true, count: number }GetProgress
Get the progress of an active job.
Request:
{ cmd: 'GetProgress', id: string }Response:
{ ok: true, progress: number, message: string | null }Control Commands
Cancel
Cancel a waiting or delayed job.
Request:
{ cmd: 'Cancel', id: string }Response:
{ ok: true }Progress
Update the progress of an active job.
Request:
{ cmd: 'Progress', id: string, progress: number, // 0-100 message?: string // Optional progress message}Response:
{ ok: true }Update
Update the data payload of an existing job.
Request:
{ cmd: 'Update', id: string, data: any // New job data}Response:
{ ok: true }ChangePriority
Change the priority of a queued job.
Request:
{ cmd: 'ChangePriority', id: string, priority: number}Response:
{ ok: true }Promote
Move a delayed job to the waiting state immediately.
Request:
{ cmd: 'Promote', id: string }Response:
{ ok: true }MoveToDelayed
Move an active job back to the delayed state.
Request:
{ cmd: 'MoveToDelayed', id: string, delay: number // Delay in ms from now}Response:
{ ok: true }Discard
Discard a job by moving it to the dead-letter queue.
Request:
{ cmd: 'Discard', id: string }Response:
{ ok: true }WaitJob
Wait for a job to complete. This is event-driven (no polling). Returns immediately if the job is already completed.
Request:
{ cmd: 'WaitJob', id: string, timeout?: number // Max wait time in ms (default: 30000)}Response:
{ ok: true, completed: boolean, result?: any }Pause
Pause a queue. Workers will stop pulling new jobs.
Request:
{ cmd: 'Pause', queue: string }Response:
{ ok: true }Resume
Resume a paused queue.
Request:
{ cmd: 'Resume', queue: string }Response:
{ ok: true }IsPaused
Check whether a queue is currently paused.
Request:
{ cmd: 'IsPaused', queue: string }Response:
{ ok: true, paused: boolean }Drain
Remove all waiting jobs from a queue.
Request:
{ cmd: 'Drain', queue: string }Response:
{ ok: true, count: number }Obliterate
Remove all data for a queue (all jobs in all states).
Request:
{ cmd: 'Obliterate', queue: string }Response:
{ ok: true }Clean
Remove jobs older than a grace period, optionally filtered by state.
Request:
{ cmd: 'Clean', queue: string, grace: number, // Grace period in ms - jobs older than this are removed state?: string, // Filter by state (optional) limit?: number // Max jobs to remove (optional)}Response:
{ ok: true, count: number }ListQueues
List all known queues with their status.
Request:
{ cmd: 'ListQueues' }Response:
{ ok: true, queues: Array<{ name: string, waiting: number, delayed: number, active: number, paused: boolean }>}DLQ Commands
Dlq
Retrieve jobs from the dead-letter queue.
Request:
{ cmd: 'Dlq', queue: string, count?: number // Max entries to return (optional)}Response:
{ ok: true, jobs: Job[] }RetryDlq
Retry jobs from the dead-letter queue (move them back to waiting).
Request:
{ cmd: 'RetryDlq', queue: string, jobId?: string // Retry a specific job (optional; omit to retry all)}Response:
{ ok: true, count: number } // Number of jobs retriedPurgeDlq
Clear all jobs from the dead-letter queue.
Request:
{ cmd: 'PurgeDlq', queue: string }Response:
{ ok: true, count: number } // Number of jobs purgedRetryCompleted
Re-queue completed jobs back to waiting state.
Request:
{ cmd: 'RetryCompleted', queue: string, id?: string // Retry a specific job (optional; omit to retry all)}Response:
{ ok: true, count: number }Cron Commands
Cron
Create or update a cron/repeating job schedule.
Request:
{ cmd: 'Cron', name: string, // Unique cron job name queue: string, // Target queue data: any, // Job data payload schedule?: string, // Cron expression (e.g., '*/5 * * * *') repeatEvery?: number, // Repeat interval in ms (alternative to schedule) priority?: number, // Job priority maxLimit?: number, // Max executions timezone?: string // IANA timezone (e.g., 'Europe/Rome', 'America/New_York')}Response:
{ ok: true, cron: { name: string, queue: string, schedule: string | null, repeatEvery: number | null, nextRun: number, timezone: string | undefined }}CronDelete
Delete a cron job schedule by name.
Request:
{ cmd: 'CronDelete', name: string }Response:
{ ok: true }CronList
List all registered cron job schedules.
Request:
{ cmd: 'CronList' }Response:
{ ok: true, crons: Array<{ name: string, queue: string, schedule: string | null, repeatEvery: number | null, nextRun: number, executions: number, maxLimit: number | undefined, timezone: string | undefined }>}Monitoring Commands
Ping
Connection health check.
Request:
{ cmd: 'Ping' }Response:
{ ok: true, data: { pong: true, time: number } }Hello
Protocol version negotiation and server capability discovery. See the Protocol Negotiation section above for details.
Request:
{ cmd: 'Hello', protocolVersion: number, capabilities?: ['pipelining']}Response:
{ ok: true, protocolVersion: number, capabilities: ['pipelining'], server: 'bunqueue', version: string}Stats
Get high-level server statistics.
Request:
{ cmd: 'Stats' }Response:
{ ok: true, stats: { queued: number, // Waiting jobs processing: number, // Active jobs delayed: number, // Delayed jobs dlq: number, // Dead-letter queue size completed: number, // Completed count uptime: number, // Server uptime in ms pushPerSec: number, // Push throughput pullPerSec: number // Pull throughput }}Metrics
Get detailed server metrics.
Request:
{ cmd: 'Metrics' }Response:
{ ok: true, metrics: { totalPushed: number, totalPulled: number, totalCompleted: number, totalFailed: number, avgLatencyMs: number, avgProcessingMs: number, memoryUsageMb: number, sqliteSizeMb: number, activeConnections: number }}Prometheus
Get metrics in Prometheus text exposition format.
Request:
{ cmd: 'Prometheus' }Response:
{ ok: true, data: { metrics: string } }Heartbeat
Send a heartbeat for a registered worker (keeps the worker registration alive).
Request:
{ cmd: 'Heartbeat', id: string } // Worker IDResponse:
{ ok: true, data: { ok: true } }JobHeartbeat
Send a heartbeat for an active job (prevents stall detection from marking it as stalled). Also renews the lock if a token is provided.
Request:
{ cmd: 'JobHeartbeat', id: string, // Job ID token?: string // Lock token for renewal}Response:
{ ok: true, data: { ok: true } }JobHeartbeatB
Batch job heartbeat for multiple active jobs.
Request:
{ cmd: 'JobHeartbeatB', ids: string[], // Job IDs tokens?: string[] // Lock tokens (same order as ids)}Response:
{ ok: true, data: { ok: true, count: number } }Worker Commands
RegisterWorker
Register a worker with the server for monitoring.
Request:
{ cmd: 'RegisterWorker', name: string, queues: string[] // Queues this worker processes}Response:
{ ok: true, data: { workerId: string, name: string, queues: string[], registeredAt: number }}UnregisterWorker
Remove a worker registration.
Request:
{ cmd: 'UnregisterWorker', workerId: string }Response:
{ ok: true, data: { removed: true } }ListWorkers
List all registered workers and their stats.
Request:
{ cmd: 'ListWorkers' }Response:
{ ok: true, data: { workers: Array<{ id: string, name: string, queues: string[], registeredAt: number, lastSeen: number, activeJobs: number, processedJobs: number, failedJobs: number }>, stats: object // Aggregated worker stats }}Webhook Commands
AddWebhook
Register a webhook to receive event notifications. URLs are validated to prevent SSRF (localhost, private IPs, and cloud metadata endpoints are blocked).
Request:
{ cmd: 'AddWebhook', url: string, // Webhook URL (https required for production) events: string[], // Event types to subscribe to queue?: string, // Filter by queue (optional) secret?: string // Signing secret for payload verification}Response:
{ ok: true, data: { webhookId: string, url: string, events: string[], queue: string | undefined, createdAt: number }}RemoveWebhook
Remove a registered webhook.
Request:
{ cmd: 'RemoveWebhook', webhookId: string }Response:
{ ok: true, data: { removed: true } }ListWebhooks
List all registered webhooks.
Request:
{ cmd: 'ListWebhooks' }Response:
{ ok: true, data: { webhooks: Array<{ id: string, url: string, events: string[], queue: string | undefined, createdAt: number, lastTriggered: number | null, successCount: number, failureCount: number, enabled: boolean }>, stats: object }}Rate Limiting Commands
RateLimit
Set a rate limit on a queue (max jobs processed per second).
Request:
{ cmd: 'RateLimit', queue: string, limit: number // Jobs per second}Response:
{ ok: true }RateLimitClear
Remove the rate limit from a queue.
Request:
{ cmd: 'RateLimitClear', queue: string }Response:
{ ok: true }SetConcurrency
Set a concurrency limit on a queue (max concurrent active jobs).
Request:
{ cmd: 'SetConcurrency', queue: string, limit: number}Response:
{ ok: true }ClearConcurrency
Remove the concurrency limit from a queue.
Request:
{ cmd: 'ClearConcurrency', queue: string }Response:
{ ok: true }Log Commands
AddLog
Add a log entry to a job.
Request:
{ cmd: 'AddLog', id: string, // Job ID message: string, // Log message level?: 'info' | 'warn' | 'error' // Log level (default: 'info')}Response:
{ ok: true, data: { added: true } }GetLogs
Get all log entries for a job.
Request:
{ cmd: 'GetLogs', id: string }Response:
{ ok: true, data: { logs: Array<{ message: string, level: string, timestamp: number }> } }Queue Name Validation
Queue names must satisfy the following constraints:
- Not empty and at most 256 characters
- Only alphanumeric characters, underscores, dashes, dots, and colons:
[a-zA-Z0-9_\-.:]+
Job Data Limits
Job data payloads are limited to 10 MB when serialized.
Command Summary
| Category | Command | Description |
|---|---|---|
| Core | PUSH | Add a job to a queue |
PUSHB | Batch push multiple jobs | |
PULL | Pull next job (supports long poll and locks) | |
PULLB | Batch pull jobs | |
ACK | Acknowledge job completion | |
ACKB | Batch acknowledge | |
FAIL | Mark job as failed | |
| Query | GetJob | Get job by ID |
GetState | Get job state | |
GetResult | Get job result | |
GetJobs | List jobs with filtering | |
GetJobCounts | Count jobs by state | |
GetCountsPerPriority | Count jobs by priority | |
GetJobByCustomId | Look up job by custom ID | |
Count | Total job count for a queue | |
GetProgress | Get job progress | |
| Control | Cancel | Cancel a job |
Progress | Update job progress | |
Update | Update job data | |
ChangePriority | Change job priority | |
Promote | Move delayed job to waiting | |
MoveToDelayed | Move active job to delayed | |
Discard | Move job to DLQ | |
WaitJob | Wait for job completion | |
Pause | Pause a queue | |
Resume | Resume a queue | |
IsPaused | Check if queue is paused | |
Drain | Remove all waiting jobs | |
Obliterate | Remove all queue data | |
Clean | Remove old jobs | |
ListQueues | List all queues | |
| DLQ | Dlq | Get DLQ entries |
RetryDlq | Retry DLQ jobs | |
PurgeDlq | Clear DLQ | |
RetryCompleted | Re-queue completed jobs | |
| Cron | Cron | Create/update cron schedule |
CronDelete | Delete cron schedule | |
CronList | List cron schedules | |
| Monitoring | Ping | Health check |
Hello | Protocol negotiation | |
Stats | Server statistics | |
Metrics | Detailed metrics | |
Prometheus | Prometheus-format metrics | |
Heartbeat | Worker heartbeat | |
JobHeartbeat | Job heartbeat (stall prevention) | |
JobHeartbeatB | Batch job heartbeat | |
| Workers | RegisterWorker | Register a worker |
UnregisterWorker | Unregister a worker | |
ListWorkers | List workers | |
| Webhooks | AddWebhook | Register a webhook |
RemoveWebhook | Remove a webhook | |
ListWebhooks | List webhooks | |
| Rate | RateLimit | Set queue rate limit |
RateLimitClear | Clear queue rate limit | |
SetConcurrency | Set queue concurrency limit | |
ClearConcurrency | Clear concurrency limit | |
| Logs | AddLog | Add job log entry |
GetLogs | Get job logs | |
| Auth | Auth | Authenticate connection |