Integrating BullMQ into a Node application
If you can perform a task instantly, you should do it instantly. If you can't, you should queue it up to be done later.
Most applications settle for a middle ground between these two ideas, and their user experience suffers for it. Users are left waiting several seconds or longer for feedback on their actions, and the application feels sluggish.
Deleting a single email from your inbox should feel instant, but deleting a thousand emails may take much longer for the server to process. If we queue up the deletion of each email, we can give the user feedback that the action was successful, and then process the deletion in the background.
Managing each process in a queue that tracks the status of each job, stores the results, and allows for easy retrying is a great way to handle these tasks.
In this tutorial, we will use BullMQ, a modern, TypeScript-first, Redis-backed queue, to handle our background tasks.
Why do we need Redis?
Redis is a key-value store that is often used as a database, cache, or message broker.
It's a great tool for storing any data that needs to be accessed quickly.
If our application stalls or crashes, Redis will still be there to hold our data so we won't lose any progress.
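BullMQ needs to be told where Redis is running. The completed queue.server.ts later in this tutorial imports a default export named connection from ./redis.server.ts, a file that is never shown. A minimal sketch of what it might contain, assuming a local Redis on the default port (the host and port values are placeholders, not taken from the original project):

```typescript
// redis.server.ts (hypothetical contents; adjust host and port for your setup)

// BullMQ accepts a plain options object and opens its own Redis
// connections from it, so no Redis client is created here.
type RedisConnectionOptions = {
  host: string
  port: number
}

const connection: RedisConnectionOptions = {
  host: "127.0.0.1", // assumption: Redis runs on the same machine
  port: 6379, // Redis's default port
}

export default connection
```

In a deployed application these values would more likely come from environment variables than from constants.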
Setting up BullMQ
First, we need to install BullMQ. It is written in TypeScript and ships its own type definitions, so no separate @types package is needed.

    npm install bullmq
Next, we need to create a new file called queue.server.ts.
Creating a global registry of queues
This file will use a singleton pattern to create a global registry of any queues we create in our application. This will allow us to access the same queue from any file in our application.
In order to check the progress or results of any job we've dispatched to the queue, we'll refer to the queues that are saved in this registry.
    import { Queue, Worker, QueueEvents, Processor } from "bullmq"

    type RegisteredQueue = {
      queue: Queue
      queueEvents: QueueEvents
      worker: Worker
    }

    declare global {
      var __registeredQueues:
        | Record<string, RegisteredQueue>
        | undefined
    }

    const registeredQueues =
      global.__registeredQueues ||
      (global.__registeredQueues = {})
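The `||` assignment is what lets the registry survive the module being evaluated more than once, for example when a development server reloads the file. A stripped-down sketch of the same pattern, using globalThis and a hypothetical FakeQueue stand-in instead of real BullMQ objects so that it runs without Redis:

```typescript
// Stand-in for the real Queue/QueueEvents/Worker bundle (hypothetical).
class FakeQueue {
  readonly name: string
  constructor(name: string) {
    this.name = name
  }
}

// A registry keyed by queue name, attached to the global object.
type Registry = Record<string, FakeQueue>
const globalStore = globalThis as typeof globalThis & {
  __fakeQueues?: Registry
}

// Simulates the module body running: reuse the registry if it exists.
function loadRegistry(): Registry {
  return globalStore.__fakeQueues || (globalStore.__fakeQueues = {})
}

// Creates the entry on first use and returns the stored one afterwards.
function register(name: string): FakeQueue {
  const registry = loadRegistry()
  let queue = registry[name]
  if (!queue) {
    queue = new FakeQueue(name)
    registry[name] = queue
  }
  return queue
}

const first = register("email")
const second = register("email") // e.g. after the module was reloaded
console.log(first === second) // true
```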
Registering a queue
Now that we have a global registry of queues, we can create a function called registerQueue that will create a new queue and add it to the registry.
This function takes a name and a processor as arguments.
The processor is the function that is called each time the queue processes a job.
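    export function registerQueue<T>(
      name: string,
      // Either a function, or a path to a file for a sandboxed processor
      processor: string | Processor<T>,
    ) {
      if (!registeredQueues[name]) {
        // `connection` holds the Redis connection settings
        const queue = new Queue(name, { connection })
        const queueEvents = new QueueEvents(name, {
          connection,
        })
        const worker = new Worker<T>(name, processor, {
          connection,
          // Let a job hold its lock for up to 15 minutes
          lockDuration: 1000 * 60 * 15,
          // Process up to 8 jobs at the same time
          concurrency: 8,
        })
        registeredQueues[name] = {
          queue,
          queueEvents,
          worker,
        }
      }
      const queue = registeredQueues[name].queue
      return queue
    }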
This function will return the queue that was created, so we can use it to export queues that can be used anywhere else in our application.
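    // Shape of the data carried by each email job
    type EmailJobData = { to: string; subject: string; text: string }

    export const emailQueue = registerQueue<EmailJobData>(
      "email",
      async (job) => {
        const { to, subject, text } = job.data
        // sendEmail is your application's own mail helper (not shown)
        await sendEmail(to, subject, text)
      },
    )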
Queuing background processes
In the above example, we created a queue called email that will send an email when a job is processed.
The processor runs in the same thread as the main application. For many async tasks that aren't processor intensive, this is fine.
However, if we have a task that takes a long time to complete and consumes a lot of processing power, like rendering images or video, we should offload this work to a background process.
If we pass a path to a JavaScript file instead of a function to the registerQueue function, BullMQ will spawn a new process to run the file. These are called sandboxed processors.
    export const renderQueue = registerQueue(
      "render",
      path.join(__dirname, "../workers/render.worker.js"),
    )
We can register multiple queues in this fashion so that some run background processes and some run in the main thread.
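The worker file itself is not shown in this tutorial. BullMQ expects a sandboxed processor file to export the processor function. A minimal sketch of what render.worker might look like before it is compiled to JavaScript, with the job typed structurally so the example stands alone; the imageId field, the renderImage helper, and the progress values are assumptions for illustration:

```typescript
// render.worker.ts (hypothetical; compile to render.worker.js before use)

// Minimal structural view of the job a sandboxed processor receives.
type RenderJob = {
  data: { imageId: string } // assumed payload shape
  updateProgress(progress: number): Promise<void>
}

// Placeholder for the expensive work (hypothetical helper).
async function renderImage(imageId: string): Promise<string> {
  return `${imageId}.png`
}

// BullMQ calls the exported function once per job; the value it
// returns is stored as the job's return value.
export default async function render(job: RenderJob) {
  await job.updateProgress(0.5)
  const url = await renderImage(job.data.imageId)
  await job.updateProgress(1)
  return { url }
}
```

Because the file runs in its own process, it cannot share in-memory state with the main application; everything it needs should arrive through job.data.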
Providing access to progress and completion events
BullMQ uses a separate class called QueueEvents to emit events for a queue, so in order to listen to these events we need both the Queue and QueueEvents classes.
To make managing these easier, we'll create a new type called AugmentedQueue that extends the Queue class and adds an events property.
    // queue.server.ts
    type AugmentedQueue<T> = Queue<T> & {
      events: QueueEvents
    }

    // ...at the end of registerQueue
    const queue = registeredQueues[name]
      .queue as AugmentedQueue<T>
    queue.events = registeredQueues[name].queueEvents
    return queue
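With events attached to the queue, progress updates can be observed from anywhere that imports it. QueueEvents reports events for every job in the queue, so a listener usually filters by job ID. A sketch of such a helper, assuming the progress event delivers an object with jobId and data fields; the helper name onJobProgress and the structural ProgressSource type are illustrative, chosen so that the example runs without Redis:

```typescript
// The part of QueueEvents this sketch relies on (structural stand-in).
type ProgressArgs = { jobId: string; data: number | object }
type ProgressSource = {
  on(event: "progress", listener: (args: ProgressArgs) => void): unknown
}

// Calls `callback` with each progress update for one specific job.
function onJobProgress(
  events: ProgressSource,
  jobId: string,
  callback: (progress: number | object) => void,
): void {
  events.on("progress", (args) => {
    // Ignore updates that belong to other jobs in the same queue
    if (args.jobId === jobId) callback(args.data)
  })
}

// Tiny in-memory stand-in for QueueEvents, used only to demo the helper.
const listeners: Array<(args: ProgressArgs) => void> = []
const fakeEvents: ProgressSource = {
  on: (_event, listener) => listeners.push(listener),
}

const received: Array<number | object> = []
onJobProgress(fakeEvents, "123", (progress) => received.push(progress))

listeners.forEach((listener) => listener({ jobId: "123", data: 0.5 }))
listeners.forEach((listener) => listener({ jobId: "456", data: 0.9 }))
console.log(received) // [ 0.5 ]
```

With the real queue, the first argument would be renderQueue.events rather than the in-memory stand-in.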
Full code example
The completed queue.server.ts file should look like this:
    import type { Processor } from "bullmq"
    import { Queue, QueueEvents, Worker } from "bullmq"
    import path from "path"

    import connection from "./redis.server.ts"

    type AugmentedQueue<T> = Queue<T> & {
      events: QueueEvents
    }

    type RegisteredQueue = {
      queue: Queue
      queueEvents: QueueEvents
      worker: Worker
    }

    declare global {
      var __registeredQueues:
        | Record<string, RegisteredQueue>
        | undefined
    }

    const registeredQueues =
      global.__registeredQueues ||
      (global.__registeredQueues = {})

    /**
     * Creates a queue, its events, and its worker once per name, and
     * returns the same queue on every later call.
     *
     * @param name Unique name of the queue
     * @param processor Function run for each job, or a path to a file
     *   that BullMQ runs as a sandboxed processor
     */
    export function registerQueue<T>(
      name: string,
      processor: string | Processor<T>,
    ) {
      if (!registeredQueues[name]) {
        const queue = new Queue(name, { connection })
        const queueEvents = new QueueEvents(name, {
          connection,
        })
        const worker = new Worker<T>(name, processor, {
          connection,
          lockDuration: 1000 * 60 * 15,
          concurrency: 8,
        })
        registeredQueues[name] = {
          queue,
          queueEvents,
          worker,
        }
      }
      const queue = registeredQueues[name]
        .queue as AugmentedQueue<T>
      queue.events = registeredQueues[name].queueEvents
      return queue
    }

    // Shape of the data carried by each email job
    type EmailJobData = { to: string; subject: string; text: string }

    export const emailQueue = registerQueue<EmailJobData>(
      "email",
      async (job) => {
        const { to, subject, text } = job.data
        // sendEmail is your application's own mail helper (not shown)
        await sendEmail(to, subject, text)
      },
    )

    export const renderQueue = registerQueue(
      "render",
      path.join(__dirname, "../workers/render.worker.js"),
    )
Running a job in the queue
We can now import our queue from anywhere in the application and add new jobs to it.
    import { emailQueue } from "./queue.server.ts"

    await emailQueue.add("send-welcome-email", {
      to: "help@example.com",
      subject: "Welcome to our app!",
      text: "Thanks for signing up!",
    })
The emailQueue will now process the job and send the email, and the rest of your application is free to carry on with other work.
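The introduction mentioned easy retrying. BullMQ's add method accepts job options as a third argument, and a sketch of options that retry a failed job with exponential backoff could look like the following. The specific numbers are arbitrary choices for illustration, and payload in the usage comment is a placeholder for the job data:

```typescript
// Options passed as the third argument to queue.add(name, data, options).
const retryOptions = {
  attempts: 3, // total tries, including the first one
  backoff: {
    type: "exponential", // wait longer after each failure
    delay: 1000, // base delay in milliseconds
  },
  removeOnComplete: true, // delete the job from Redis once it succeeds
}

// Usage with the queue from this tutorial (requires a running Redis):
// await emailQueue.add("send-welcome-email", payload, retryOptions)
console.log(retryOptions.attempts) // 3
```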
Checking the status of a job
From our queue, we can get any job by its ID and check its status.
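    import { renderQueue } from "./queue.server.ts"

    const job = await renderQueue.getJob("123")
    // getJob resolves to undefined when no job has that ID
    if (!job) throw new Error("Job 123 not found")

    console.log(job.progress) // e.g. 0.5
    console.log(job.returnvalue) // not set until the job has completed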
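Progress is only part of a job's status. BullMQ jobs also expose getState(), which resolves to a state such as "waiting", "active", "completed", or "failed". A sketch of a small helper that gathers these into one object, for instance to return from an API endpoint; the helper name getJobStatus and the structural JobLike and QueueLike types are assumptions made so that the example runs on its own:

```typescript
// The parts of Job and Queue this sketch uses (structural stand-ins).
type JobLike = {
  progress: unknown
  returnvalue: unknown
  failedReason?: string
  getState(): Promise<string>
}
type QueueLike = {
  getJob(id: string): Promise<JobLike | undefined>
}

type JobStatus = {
  state: string
  progress: unknown
  result: unknown
  error: string | null
}

// Returns null when the job does not exist (for example, it was removed).
async function getJobStatus(
  queue: QueueLike,
  id: string,
): Promise<JobStatus | null> {
  const job = await queue.getJob(id)
  if (!job) return null
  return {
    state: await job.getState(),
    progress: job.progress,
    result: job.returnvalue ?? null,
    error: job.failedReason ?? null,
  }
}

// In-memory stand-in for a queue holding one half-finished job.
const fakeQueue: QueueLike = {
  getJob: async (id) =>
    id === "123"
      ? { progress: 0.5, returnvalue: undefined, getState: async () => "active" }
      : undefined,
}

getJobStatus(fakeQueue, "123").then((status) => console.log(status))
// { state: 'active', progress: 0.5, result: null, error: null }
```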
Waiting for a job to finish
We can also wait for a job to finish processing before continuing. This requires the QueueEvents instance that is attached to the same queue.
Making it easy to access for this purpose was the reason for creating the AugmentedQueue type and binding the QueueEvents instance to renderQueue.events in the registerQueue function.
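    const TIMEOUT_MILLISECONDS = 30 * 1000

    // Resolves with the job's return value; rejects if the job fails
    // or the timeout passes first
    const result = await job.waitUntilFinished(
      renderQueue.events,
      TIMEOUT_MILLISECONDS,
    )
    console.log(result) // e.g. { url: "123.png" }

    // The local job object is a snapshot, so fetch it again to read
    // the updated fields
    const finishedJob = await renderQueue.getJob("123")
    console.log(finishedJob?.progress) // e.g. 1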
Further leveraging the power of the queue system will depend on your tech stack and the needs of your application. For example, if you're using Remix, you can create an endpoint that streams the progress of a queued job to the client. If you're using TypeScript, you may want to automatically compile your worker files to JavaScript before running them.