Skip to content

Commit

Permalink
Implement BullMQ
Browse files Browse the repository at this point in the history
  • Loading branch information
cyri113 committed May 21, 2023
1 parent 74d139f commit 34f0895
Show file tree
Hide file tree
Showing 17 changed files with 552 additions and 14 deletions.
330 changes: 319 additions & 11 deletions package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
},
"homepage": "https://github.com/TogetherCrew/typescript-service#readme",
"dependencies": {
"bullmq": "^3.13.4",
"env-cmd": "^10.1.0",
"express": "^4.18.2",
"joi": "^17.9.2",
Expand Down
6 changes: 3 additions & 3 deletions src/app.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import express, { type Express, type Request, type Response } from "express";
import postsRoute from "./routes/posts.route";
import routes from "./routes";

const app: Express = express();

app.use("/posts", postsRoute);
app.use(express.json())
app.use(routes)

app.get("/", (req: Request, res: Response) => {
res.send("Express + TypeScript Server");
Expand Down
27 changes: 27 additions & 0 deletions src/controllers/jobs/createJob.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { type Request, type Response } from "express";
import { type Queue } from "bullmq";
import { queueByName } from "../../queues";

/* eslint-disable @typescript-eslint/explicit-function-return-type */
const createJob = async function (req: Request, res: Response) {
try {
const { type, data } = req.body; // Assuming you send the job type and data in the request body

let queue: Queue
try {
queue = queueByName(type)
} catch (error) {
console.error(error)
return res.status(400).json({ error });
}

const job = await queue.add(type, data); // Add the job to the appropriate queue

return res.json({ jobId: job.id });
} catch (error) {
console.error('Error creating job:', error);
return res.status(500).json({ error: 'Failed to create job' });
}
}

export default createJob
38 changes: 38 additions & 0 deletions src/controllers/jobs/getJob.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/* eslint-disable @typescript-eslint/explicit-function-return-type */
import { type Request, type Response } from "express";
import { Job, type Queue } from "bullmq";
import { queueByName } from "../../queues";

const getJob = async function (req: Request, res: Response) {
try {
const { type, jobId } = req.params; // Assuming the jobId is passed as a route parameter

let queue: Queue
try {
queue = queueByName(type)
} catch (error) {
console.error(error)
return res.status(400).json({ error });
}

// Fetch the job by its id
const job = await Job.fromId(queue, jobId);

if (job == null) {
return res.status(404).json({ error: 'Job not found' });
}

return res.json({
id: job.id,
name: job.name,
status: await job.getState(),
progress: job.progress,
data: job.data,
});
} catch (error) {
console.error('Error getting job status:', error);
return res.status(500).json({ error: 'Failed to get job status' });
}
}

export default getJob
7 changes: 7 additions & 0 deletions src/controllers/jobs/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import createJob from "./createJob.controller";
import getJob from "./getJob.controller";

export {
createJob,
getJob
}
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import mongoose from "mongoose";
import app from "./app";
import { env } from "./config";
import './workers'

mongoose.set("strictQuery", true);

Expand Down
9 changes: 9 additions & 0 deletions src/queues/connection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { type RedisOptions } from "bullmq";
import { env } from "../config";

const connection: RedisOptions = {
host: env.REDIS_QUEUE_HOST,
port: env.REDIS_QUEUE_PORT
}

export default connection
6 changes: 6 additions & 0 deletions src/queues/email.queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { Queue } from "bullmq";
import connection from "./connection";

const emailQueue = new Queue('email', { connection })

export default emailQueue;
6 changes: 6 additions & 0 deletions src/queues/imageProcessing.queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { Queue } from "bullmq";
import connection from "./connection";

const imageProcessingQueue = new Queue('imageProcessing', { connection })

export default imageProcessingQueue;
9 changes: 9 additions & 0 deletions src/queues/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import emailQueue from "./email.queue";
import imageProcessingQueue from "./imageProcessing.queue"
import queueByName from "./queueByName";

export {
emailQueue,
imageProcessingQueue,
queueByName
}
16 changes: 16 additions & 0 deletions src/queues/queueByName.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { type Queue } from "bullmq";
import emailQueue from "./email.queue";
import imageProcessingQueue from "./imageProcessing.queue";

const queueByName = (name: string): Queue => {
switch (name) {
case emailQueue.name:
return emailQueue;
case imageProcessingQueue.name:
return imageProcessingQueue;
default:
throw new Error(`No Queue called ${name}`)
}
}

export default queueByName
10 changes: 10 additions & 0 deletions src/routes/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import express from "express";
import jobsRoute from "./jobs.route"
import postsRoute from "./posts.route"

const router = express.Router();

router.use("/jobs", jobsRoute)
router.use("/posts", postsRoute)

export default router
10 changes: 10 additions & 0 deletions src/routes/jobs.route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/* eslint-disable @typescript-eslint/no-misused-promises */
import express from "express";
import { createJob, getJob } from "../controllers/jobs";

const router = express.Router();

router.post("/", createJob);
router.get("/:type/:jobId", getJob)

export default router;
42 changes: 42 additions & 0 deletions src/workers/emailWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/* eslint-disable @typescript-eslint/restrict-template-expressions */
import { Worker } from 'bullmq';
import { emailQueue } from '../queues'; // Assuming you have already defined the emailQueue
import connection from '../queues/connection';

// Create a new BullMQ worker instance for emailQueue
const emailWorker = new Worker(emailQueue.name, async (job) => {
// Define the processing logic for email jobs
const { to, subject, body } = job.data; // Assuming the job data contains the recipient, subject, and body of the email

try {
// Perform email processing logic here
console.log(`Sending email to ${to}: ${subject}`);
console.log('Body:', body);

// Simulating email sending time
await new Promise((resolve) => setTimeout(resolve, 2000));

console.log('Email sent successfully');
} catch (error) {
console.error('Error processing email:', error);
throw new Error('Failed to process email'); // Throw an error if email processing fails
}
}, { connection });

emailWorker.on('failed', (job, err) => {
if (job !== undefined) {
console.error(`Job ${job.id} failed with error:`, err);
} else {
console.error('The job was undefined.')
}
});

emailWorker.on('completed', (job) => {
console.log(`Job ${job.id} completed successfully`);
});

emailWorker.on('error', (error) => {
console.error('Worker error:', error);
});

export default emailWorker
41 changes: 41 additions & 0 deletions src/workers/imageProcessingWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/* eslint-disable @typescript-eslint/restrict-template-expressions */
import { Worker } from 'bullmq';
import { imageProcessingQueue } from '../queues'; // Assuming you have already defined the imageProcessingQueue
import connection from '../queues/connection';

// Create a new BullMQ worker instance for imageProcessingQueue
const imageProcessingWorker = new Worker(imageProcessingQueue.name, async (job) => {
// Define the processing logic for image processing jobs
const { imageUrl } = job.data; // Assuming the job data contains the URL of the image to be processed

try {
// Perform image processing logic here
console.log(`Processing image: ${imageUrl}`);

// Simulating image processing time
await new Promise((resolve) => setTimeout(resolve, 3000));

console.log('Image processing completed');
} catch (error) {
console.error('Error processing image:', error);
throw new Error('Failed to process image'); // Throw an error if image processing fails
}
}, { connection });

imageProcessingWorker.on('failed', (job, err) => {
if (job !== undefined) {
console.error(`Job ${job.id} failed with error:`, err);
} else {
console.error('The job was undefined.')
}
});

imageProcessingWorker.on('completed', (job) => {
console.log(`Job ${job.id} completed successfully`);
});

imageProcessingWorker.on('error', (error) => {
console.error('Worker error:', error);
});

export default imageProcessingWorker
7 changes: 7 additions & 0 deletions src/workers/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import emailWorker from './emailWorker'
import imageProcessingWorker from './imageProcessingWorker'

export {
emailWorker,
imageProcessingWorker
}

0 comments on commit 34f0895

Please sign in to comment.