
Integrating BullMQ into a Node application

If you can perform a task instantly, you should do it instantly. If you can't, you should queue it up to be done later.

Most applications settle for a middle ground between these two ideas, and their user experience suffers for it. Users are left waiting several seconds or longer for feedback on their actions, and the application feels sluggish.

Deleting a single email from your inbox should feel instant, but deleting a thousand emails may take much longer for the server to process. If we queue up the deletion of each email, we can give the user feedback that the action was successful, and then process the deletion in the background.

A queue that tracks the status of each job, stores its result, and allows for easy retrying is a great way to manage these tasks.

In this tutorial, we will use BullMQ, a modern, TypeScript-first, Redis-backed queue, to handle our background tasks.

Why do we need Redis?

Redis is a key-value store that is often used as a database, cache, or message broker.

It's a great tool for storing any data that needs to be accessed quickly.

If our application stalls or crashes, Redis will still be there to hold our data so we won't lose any progress.

Setting up BullMQ

First, we need to install BullMQ. It's written in TypeScript and ships with its own type definitions, so no separate @types package is needed.

sh
npm install bullmq
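
BullMQ also needs a Redis connection. The queue file we're about to write will import a shared connection from a file called redis.server.ts, so let's sketch that first. This is a minimal example assuming the ioredis client and a REDIS_URL environment variable; adjust it to however you configure Redis. Note that BullMQ expects maxRetriesPerRequest to be null on connections used by workers.

ts
// redis.server.ts
import IORedis from "ioredis"

// One shared connection for the whole app.
// BullMQ requires maxRetriesPerRequest to be null.
const connection = new IORedis(
  process.env.REDIS_URL ?? "redis://localhost:6379",
  { maxRetriesPerRequest: null },
)

export default connection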

Next, we need to create a new file called queue.server.ts.

Creating a global registry of queues

This file will use a singleton pattern to create a global registry of any queues we create in our application. This will allow us to access the same queue from any file in our application.

In order to check the progress or results of any job we've dispatched to the queue, we'll refer to the queues that are saved in this registry.

ts
import {
  Queue,
  Worker,
  QueueEvents,
  Processor,
} from "bullmq"

type RegisteredQueue = {
  queue: Queue
  queueEvents: QueueEvents
  worker: Worker
}

declare global {
  var __registeredQueues:
    | Record<string, RegisteredQueue>
    | undefined
}

const registeredQueues =
  global.__registeredQueues ||
  (global.__registeredQueues = {})

Registering a queue

Now that we have a global registry of queues, we can create a function called registerQueue that will create a new queue and add it to the registry.

This function will take a name and a processor function as arguments.

The processor function is the function that gets called to handle each job added to the queue.

ts
export function registerQueue<T>(
  name: string,
  processor: Processor<T>,
) {
  if (!registeredQueues[name]) {
    // connection is the shared Redis connection from redis.server.ts
    const queue = new Queue(name, { connection })
    const queueEvents = new QueueEvents(name, {
      connection,
    })
    const worker = new Worker<T>(name, processor, {
      connection,
      // How long a job's lock lasts before an unresponsive worker
      // is considered stalled and the job can be picked up again
      lockDuration: 1000 * 60 * 15,
      // How many jobs this worker will process in parallel
      concurrency: 8,
    })

    registeredQueues[name] = {
      queue,
      queueEvents,
      worker,
    }
  }

  const queue = registeredQueues[name].queue

  return queue
}

This function returns the queue it created (or the existing one if that name was already registered), so we can export queues that can be used anywhere else in our application.

ts
export const emailQueue = registerQueue(
  "email",
  async (job) => {
    const { to, subject, text } = job.data
    // sendEmail is whatever function your app already uses to send email
    await sendEmail(to, subject, text)
  },
)
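
Because registerQueue is generic, we can also pass an explicit type for the job data so that both the processor and any code adding jobs are type-checked. The EmailJob type here is just an illustration:

ts
type EmailJob = {
  to: string
  subject: string
  text: string
}

export const emailQueue = registerQueue<EmailJob>(
  "email",
  async (job) => {
    // job.data is now typed as EmailJob
    const { to, subject, text } = job.data
    await sendEmail(to, subject, text)
  },
)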

Queuing background processes

In the above example, we created a queue called email that will send an email when a job is processed.

This will run in the same thread as the main application. For many async tasks that aren't processor intensive, this is fine.

However, if we have a task that takes a long time to complete and consumes a lot of processing power, like rendering images or video, we should offload this work to a background process.

If we pass a path to a JavaScript file instead of a function to the registerQueue function, BullMQ will spawn a new process to run that file. These are called sandboxed processors. For this to type-check, the processor argument needs to accept either a Processor function or a string path, which is how it's declared in the full code example below.

ts
export const renderQueue = registerQueue(
  "render",
  path.join(__dirname, "../workers/render.worker.js"),
)

We can register multiple queues in this fashion so that some run background processes and some run in the main thread.
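
The worker file itself just exports a processor function. Here's a minimal sketch of what render.worker.js could look like; the rendering work is a placeholder, and the job.updateProgress calls are what feed the progress value we'll read later. If you'd rather write this file in TypeScript, you'll need to compile it to JavaScript before the worker can run it, as noted at the end of this article.

js
// workers/render.worker.js
// A sandboxed processor: BullMQ runs this file in a separate process
module.exports = async (job) => {
  // ...do the first half of the expensive rendering work...

  // Report progress so the rest of the app can observe it
  await job.updateProgress(0.5)

  // ...finish the rendering work...
  await job.updateProgress(1)

  // The return value becomes job.returnvalue once the job completes
  return { url: `${job.id}.png` }
}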

Providing access to progress and completion events

BullMQ uses a separate class called QueueEvents to listen to events on a queue, so in order to react to those events we need both the Queue and the QueueEvents instances.

To make managing these easier, we'll create a new type called AugmentedQueue that combines the Queue type with an events property.

ts
// queue.server.ts registerQueue
type AugmentedQueue<T> = Queue<T> & {
  events: QueueEvents
}

const queue = registeredQueues[name]
  .queue as AugmentedQueue<T>
queue.events = registeredQueues[name].queueEvents

return queue

Full code example

The completed queue.server.ts file should look like this:

ts
import type { Processor } from "bullmq"
import { Queue, QueueEvents, Worker } from "bullmq"
import path from "path"

import connection from "./redis.server.ts"

type AugmentedQueue<T> = Queue<T> & {
  events: QueueEvents
}

type RegisteredQueue = {
  queue: Queue
  queueEvents: QueueEvents
  worker: Worker
}

declare global {
  var __registeredQueues:
    | Record<string, RegisteredQueue>
    | undefined
}

const registeredQueues =
  global.__registeredQueues ||
  (global.__registeredQueues = {})

/**
 * @param name Unique name of the queue
 * @param processor Function to process each job, or a path to a
 * sandboxed processor file
 */
export function registerQueue<T>(
  name: string,
  processor: Processor<T> | string,
) {
  if (!registeredQueues[name]) {
    const queue = new Queue(name, { connection })
    const queueEvents = new QueueEvents(name, {
      connection,
    })
    const worker = new Worker<T>(name, processor, {
      connection,
      lockDuration: 1000 * 60 * 15,
      concurrency: 8,
    })

    registeredQueues[name] = {
      queue,
      queueEvents,
      worker,
    }
  }

  const queue = registeredQueues[name]
    .queue as AugmentedQueue<T>
  queue.events = registeredQueues[name].queueEvents
  return queue
}

export const emailQueue = registerQueue(
  "email",
  async (job) => {
    const { to, subject, text } = job.data
    await sendEmail(to, subject, text)
  },
)

export const renderQueue = registerQueue(
  "render",
  path.join(__dirname, "../workers/render.worker.js"),
)

Running a job in the queue

We can now import our queue from anywhere in the application and add new jobs to it.

ts
import { emailQueue } from "./queue.server.ts"

await emailQueue.add("send-welcome-email", {
  to: "help@example.com",
  subject: "Welcome to our app!",
  text: "Thanks for signing up!",
})

The emailQueue will now process the job and send the email, and the rest of your application is free to continue working on its own things.
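
The add method also accepts a third argument of job options. For example, since easy retrying was one of the reasons for using a queue, we can ask BullMQ to retry a failed job with exponential backoff; these particular numbers are just an illustration.

ts
await emailQueue.add(
  "send-welcome-email",
  {
    to: "help@example.com",
    subject: "Welcome to our app!",
    text: "Thanks for signing up!",
  },
  {
    attempts: 3, // try the job up to 3 times in total
    backoff: { type: "exponential", delay: 1000 }, // wait 1s, then 2s, between retries
  },
)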

Checking the status of a job

From our queue, we can get any job by its ID and check its status.

ts
import { renderQueue } from "./queue.server.ts"
// Look the job up by its id, which was assigned when it was added to the queue
const job = await renderQueue.getJob("123")

console.log(job.progress) // 0.5
console.log(job.returnvalue) // undefined, since the job hasn't finished yet
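
Jobs also have a getState method that reports where the job is in its lifecycle, which is useful for building a status endpoint:

ts
const state = await job.getState()
console.log(state) // e.g. "waiting", "active", "completed", or "failed"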

Waiting for a job to finish

We can also wait for a job to finish processing before continuing. This requires accessing the QueueEvents class that is attached to the same queue.

Making the events easy to access for this purpose is why we created the AugmentedQueue type and bound the QueueEvents instance to renderQueue.events inside registerQueue.

ts
const TIMEOUT_MILLISECONDS = 30 * 1000

await job.waitUntilFinished(
  renderQueue.events,
  TIMEOUT_MILLISECONDS,
)

console.log(job.progress) // 1
console.log(job.returnvalue) // { url: "123.png" }
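
The same events object can also be used to subscribe to events as they happen instead of blocking until the job is done. A rough sketch of logging progress and completion:

ts
renderQueue.events.on("progress", ({ jobId, data }) => {
  console.log(`Job ${jobId} progress:`, data)
})

renderQueue.events.on("completed", ({ jobId, returnvalue }) => {
  console.log(`Job ${jobId} completed with`, returnvalue)
})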

Further leveraging the power of the queue system will depend on your tech stack and the needs of your application. For example, if you're using Remix, you can create an endpoint that streams the progress of a queued job to the client. If you're using TypeScript, you may want to automatically compile your worker files to JavaScript before running them.
