Stream BullMQ job progress with Remix EventSources
Streaming usually brings video or audio to mind, but the same idea works for any data a server pushes to a client over time. In this guide, we'll look at how to stream the progress of a BullMQ job to the client.
BullMQ is a modern, fast, and robust queue system for Node.js. It is a successor to the popular Bull library, and is built on top of Redis. If you've integrated BullMQ into your app, you can create a resource route that streams the progress of a running job to the client.
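The progress that gets streamed originates in the worker, which has to report it while the job runs. Here is a minimal sketch of that reporting loop. The names `ProgressReporter`, `toPercent`, and `processItems` are hypothetical and not from the original app; the only BullMQ API assumed is `job.updateProgress`, which a real `Job` provides with a compatible signature.

```typescript
// Minimal shape of what the loop needs from a job. BullMQ's `Job` has a
// compatible `updateProgress` method, so a real job can be passed in directly.
interface ProgressReporter {
  updateProgress(value: number): Promise<void>
}

// Hypothetical helper: converts "items done so far" into a whole-number percentage.
function toPercent(done: number, total: number): number {
  if (total <= 0) return 0
  return Math.min(100, Math.round((done / total) * 100))
}

// Hypothetical helper: handles each item in order and reports progress after
// every item. With an empty list nothing is reported.
async function processItems<T>(
  job: ProgressReporter,
  items: T[],
  handle: (item: T) => Promise<void>,
): Promise<void> {
  let done = 0
  for (const item of items) {
    await handle(item)
    done += 1
    await job.updateProgress(toPercent(done, items.length))
  }
}
```

Inside a BullMQ worker's processor function, this could be called as `await processItems(job, job.data.items, handleItem)`, where `job.data.items` and `handleItem` are placeholders for your own payload and per-item logic.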
Create a new resource route at app/routes/jobs.$id.progress.tsx.
This route will use the eventStream utility from remix-utils to create an event source that will listen for the progress event from the job queue, and send the progress to the client.
    import type { LoaderFunctionArgs } from "@remix-run/node"
    import { eventStream } from "remix-utils/sse/server"
    import { processItemQueue } from "~/workers/processItem.server.ts"

    export async function loader({ request, params }: LoaderFunctionArgs) {
      const id = params.id as string
      if (!id) {
        return new Response("Not found", { status: 404 })
      }

      const job = await processItemQueue.getJob(id)
      if (!job) {
        return new Response("Not found", { status: 404 })
      }

      return eventStream(request.signal, function setup(send) {
        // If the job finished before the client connected, report 100 right away.
        job.isCompleted().then((completed) => {
          if (completed) {
            send({ event: "progress", data: String(100) })
          }
        })

        processItemQueue.events.addListener("progress", onProgress)

        function onProgress({
          jobId,
          data,
        }: {
          jobId: string
          data: number | object
        }) {
          // The queue emits progress for every job; ignore the other jobs.
          if (jobId !== id) return

          // Event data must be a string; serialize object payloads as JSON
          // so they do not arrive as "[object Object]".
          send({
            event: "progress",
            data: typeof data === "number" ? String(data) : JSON.stringify(data),
          })

          if (data === 100) {
            console.log("progress is 100, removing listener")
            processItemQueue.events.removeListener("progress", onProgress)
          }
        }

        // Cleanup: runs when the request is aborted, e.g. the client disconnects.
        return function clear() {
          processItemQueue.events.removeListener("progress", onProgress)
        }
      })
    }
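Each `send` call in the loader above reaches the browser as a Server-Sent Events text frame in the response body. The sketch below approximates that framing to show what travels over the wire. `encodeSseFrame` and `SseEvent` are hypothetical names written for illustration, not part of remix-utils, and the library's actual output may differ in detail.

```typescript
// Illustrative only: approximates the text frame written for one event.
interface SseEvent {
  event?: string // event name; browsers treat a frame without one as "message"
  data: string // payload; must already be a string
}

function encodeSseFrame({ event, data }: SseEvent): string {
  const lines: string[] = []
  if (event) lines.push(`event: ${event}`)
  // In the SSE format, each line of a multi-line payload gets its own `data:` field.
  for (const line of data.split("\n")) lines.push(`data: ${line}`)
  // A blank line terminates the event.
  return lines.join("\n") + "\n\n"
}
```

For example, `encodeSseFrame({ event: "progress", data: "42" })` produces the line `event: progress`, then the line `data: 42`, then a blank line. The `event` name is what the client matches against when it subscribes to a specific event.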
In any route that needs to read this progress, you can use the useEventSource hook to subscribe to the event source. This is like useState but will automatically update the state when the event source sends a new event.
    import { useEventSource } from "remix-utils/sse/react"

    export default function Route() {
      const jobId = "123"
      const progress = useEventSource(`/jobs/${jobId}/progress`, {
        event: "progress",
      })

      return (
        <div>
          <h1>Job Progress</h1>
          <p>Progress: {progress}</p>
        </div>
      )
    }
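`useEventSource` returns the latest event data as a string, or `null` before any event has arrived, so a component that wants a numeric value (for a progress bar, say) has to convert it. A minimal sketch, assuming the loader sends plain numeric percentages; `parseProgress` is a hypothetical helper, not part of remix-utils.

```typescript
// Hypothetical helper: turns the raw event data into a number between 0 and 100.
function parseProgress(value: string | null): number {
  // No event received yet.
  if (value === null) return 0
  const parsed = Number(value)
  // Non-numeric payloads (e.g. JSON objects) fall back to 0 in this sketch.
  if (!Number.isFinite(parsed)) return 0
  // Clamp so out-of-range values cannot break a progress bar.
  return Math.min(100, Math.max(0, parsed))
}
```

In the route above, the result could be rendered with something like `<progress max={100} value={parseProgress(progress)} />`.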