Skip to content
This repository has been archived by the owner on Apr 10, 2020. It is now read-only.

Commit

Permalink
feat(1319): add scheduler mode (#106)
Browse files Browse the repository at this point in the history
  • Loading branch information
minzcmu authored Nov 15, 2018
1 parent 14a3d94 commit 5754107
Show file tree
Hide file tree
Showing 11 changed files with 6,066 additions and 9 deletions.
14 changes: 14 additions & 0 deletions Dockerfile.local
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
FROM node:8

# Create our application directory
RUN mkdir -p /usr/src/app
WORKDIR /usr/src/app

COPY package.json /usr/src/app/
RUN npm install --production

# Copy everything else
COPY . /usr/src/app

# Run the service
CMD [ "npm", "start" ]
23 changes: 23 additions & 0 deletions config/custom-environment-variables.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,26 @@ plugins:
blockTimeout: PLUGIN_BLOCKEDBY_BLOCK_TIMEOUT
# job block by itself
blockedBySelf: PLUGIN_BLOCKEDBY_BLOCKED_BY_SELF

# Run queue-worker as a scheduler, instead of calling executor to start/stop builds, push it to rabbitmq
scheduler:
# Enabled schduler mode or not
enabled: SCHEDULER_ENABLED
# To enable schduler mode, you need rabbitmq server and consumer
rabbitmq:
# Host of rabbitmq cluster
host: RABBITMQ_HOST
# Port of rabbitmq cluster
port: RABBITMQ_PORT
# User to push to rabbitmq
username: RABBITMQ_USERNAME
# Password to connect to rabbitmq cluster
password: RABBITMQ_PASSWORD
# Protocol for rabbitmq server, use amqps for ssl
protocol: RABBITMQ_PROTOCOL
# Exchange / router name for rabbitmq
exchange: RABBITMQ_EXCHANGE
# Exchange type for rabbitmq
exchangeType: RABBITMQ_EXCHANGE_TYPE
# Virtual host to connect to
vhost: RABBITMQ_VHOST
24 changes: 24 additions & 0 deletions config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,27 @@ plugins:
blockTimeout: 120
# job blocked by itself
blockedBySelf: true

# Run queue-worker as a scheduler, instead of calling executor to start/stop builds, push it to rabbitmq
scheduler:
# Enabled schduler mode or not
enabled: false
# To enable schduler mode, you need rabbitmq server and consumer
rabbitmq:
# Host of rabbitmq cluster
host: 127.0.0.1
# Port of rabbitmq cluster
port: 5672
# User to push to rabbitmq
username: sd-buidbot
# Password to connect to rabbitmq cluster
password: fakepassword
# Protocol for rabbitmq server, use amqps for ssl
protocol: amqp
# Exchange / router name for rabbitmq
exchange: build
# Exchange type for rabbitmq
exchangeType: topic
# Virtual host to connect to
vhost: /screwdriver

24 changes: 24 additions & 0 deletions config/rabbitmq.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
'use strict';

const config = require('config');

const rabbitmqConfig = config.get('scheduler').rabbitmq;
const { protocol, username, password, host, port, exchange, exchangeType, vhost } = rabbitmqConfig;
const amqpURI = `${protocol}://${username}:${password}@${host}:${port}${vhost}`;
const schedulerMode = config.get('scheduler').enabled;

/**
* get configurations for rabbitmq
* @method getConfig
* @return {Object}
*/
function getConfig() {
return {
schedulerMode,
amqpURI,
exchange,
exchangeType
};
}

module.exports = { getConfig };
67 changes: 67 additions & 0 deletions lib/Filter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
'use strict';

const NodeResque = require('node-resque');
const { queuePrefix } = require('../config/redis');
const rabbitmqConf = require('../config/rabbitmq');

class Filter extends NodeResque.Plugin {
/**
* Construct a new Filter plugin
* @method constructor
*/
constructor(worker, func, queue, job, args, options) {
super(worker, func, queue, job, args, options);

this.name = 'Filter';
}

/**
* Checks if the job belongs to this queue-worker
* If no, re-enqueue.
* @method beforePerform
* @return {Promise}
*/
async beforePerform() {
const { buildId } = this.args[0];
const buildConfig = await this.queueObject
.connection.redis.hget(`${queuePrefix}buildConfigs`, buildId).then(JSON.parse);

// if schedulerMode enabled, don't take anything without buildClusterName
if (rabbitmqConf.getConfig().schedulerMode) {
if (!buildConfig.buildClusterName) {
await this.reEnqueue();

return false;
}

return true;
}

if (buildConfig.buildClusterName) {
await this.reEnqueue();

return false;
}

return true;
}

/**
* Re-enqueue job if it doesn't belong to this queue worker
* @method reEnqueue
* @return {Promise}
*/
async reEnqueue() {
await this.queueObject.enqueueIn(this.reenqueueTimeout(), this.queue, this.func, this.args);
}

reenqueueTimeout() {
if (this.options.enqueueTimeout) {
return this.options.enqueueTimeout;
}

return 1000; // in ms
}
}

exports.Filter = Filter;
66 changes: 62 additions & 4 deletions lib/jobs.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
'use strict';

const amqplib = require('amqplib');
const Redis = require('ioredis');
const config = require('config');
const winston = require('winston');
const hoek = require('hoek');
const BlockedBy = require('./BlockedBy').BlockedBy;
const Filter = require('./Filter').Filter;
const blockedByConfig = config.get('plugins').blockedBy;
const ExecutorRouter = require('screwdriver-executor-router');
const { connectionDetails, queuePrefix, runningJobsPrefix, waitingJobsPrefix }
= require('../config/redis');
const rabbitmqConf = require('../config/rabbitmq');
const { amqpURI, exchange, exchangeType } = rabbitmqConf.getConfig();

const RETRY_LIMIT = 3;
// This is in milliseconds, reference: https://github.com/taskrabbit/node-resque/blob/master/lib/plugins/Retry.js#L12
Expand Down Expand Up @@ -45,6 +49,56 @@ const blockedByOptions = {

blockedBySelf: blockedByConfig.blockedBySelf
};
let rabbitmqConn;

/**
* Get Rabbitmq connection, if it exists reuse it, otherwise create it
* @method getRabbitmqConn
* @return {Promise}
*/
async function getRabbitmqConn() {
if (rabbitmqConn) {
return rabbitmqConn;
}

rabbitmqConn = await amqplib.connect(amqpURI);

return rabbitmqConn;
}

/**
* Schedule a job based on mode
* @method schedule
* @param {String} job job name, either start or stop
* @param {Object} buildConfig build config
* @return {Promise}
*/
function schedule(job, buildConfig) {
const buildCluster = buildConfig.buildClusterName;

delete buildConfig.buildClusterName;

if (rabbitmqConf.getConfig().schedulerMode) {
return getRabbitmqConn().then(conn => conn.createChannel())
.then((ch) => {
const msg = JSON.stringify({
job,
buildConfig
});

// ugly way of doing finally to close the channel
return ch.assertExchange(exchange, exchangeType)
.then(() => ch.publish(
exchange,
buildCluster,
Buffer.from(msg))
.then(() => ch.close()))
.catch(err => ch.close().then(() => Promise.reject(err)));
});
}

return executor[job](buildConfig);
}

/**
* Call executor.start with the buildConfig obtained from the redis database
Expand All @@ -57,7 +111,7 @@ const blockedByOptions = {
*/
function start(buildConfig) {
return redis.hget(`${queuePrefix}buildConfigs`, buildConfig.buildId)
.then(fullBuildConfig => executor.start(JSON.parse(fullBuildConfig)))
.then(fullBuildConfig => schedule('start', JSON.parse(fullBuildConfig)))
.catch((err) => {
winston.error('err in start job: ', err);

Expand Down Expand Up @@ -88,6 +142,10 @@ function stop(buildConfig) {
if (parsedConfig && parsedConfig.annotations) {
stopConfig.annotations = parsedConfig.annotations;
}

if (parsedConfig && parsedConfig.buildClusterName) {
stopConfig.buildClusterName = parsedConfig.buildClusterName;
}
})
.catch((err) => {
winston.error(`[Stop Build] failed to get config for build ${buildId}: ${err.message}`);
Expand All @@ -104,20 +162,20 @@ function stop(buildConfig) {
})
// If this is a waiting job
.then(() => redis.lrem(`${waitingJobsPrefix}${jobId}`, 0, buildId))
.then(() => (started ? executor.stop(stopConfig) : null));
.then(() => (started ? schedule('stop', stopConfig) : null));
}

module.exports = {
start: {
plugins: ['Retry', BlockedBy],
plugins: [Filter, 'Retry', BlockedBy],
pluginOptions: {
Retry: retryOptions,
BlockedBy: blockedByOptions
},
perform: start
},
stop: {
plugins: ['Retry'], // stop shouldn't use blockedBy
plugins: [Filter, 'Retry'], // stop shouldn't use blockedBy
pluginOptions: {
Retry: retryOptions
},
Expand Down
Loading

0 comments on commit 5754107

Please sign in to comment.