Skip to content

Commit

Permalink
Added saveUniqueJob to Queue#schedule waterfall functions
Browse files Browse the repository at this point in the history
  • Loading branch information
respinha committed Mar 16, 2018
1 parent 2beaa2c commit 8b11efd
Showing 1 changed file with 46 additions and 35 deletions.
81 changes: 46 additions & 35 deletions index.js
Expand Up @@ -60,12 +60,12 @@ function ensureUniqueJob(job, done) {
//assuming updated_at is in the past or now
// updated_at is a built-in from kue.
var timeSinceLastUpdate = now.getTime() - job.updated_at; // jshint ignore:line
var arbitraryThreshold = job.data.ttl + (job.data.ttl/2);
var arbitraryThreshold = job.data.ttl + (job.data.ttl / 2);
var isStaleJob =
(job.state() === 'active' &&
(job.state() === 'active' &&
timeSinceLastUpdate > arbitraryThreshold
);
if (isCompletedOrFailedJob|| isStaleJob) {
if (isCompletedOrFailedJob || isStaleJob) {
//resave job for next run
//
//NOTE!: We inactivate job to allow kue to queue the same job for next run.
Expand Down Expand Up @@ -286,8 +286,8 @@ Queue.prototype._getJobUUID = function (key) {

var splits = key.split(':');

splits = _.filter(splits, function(o) { return o !== ''; });
if(splits.length > 0){
splits = _.filter(splits, function (o) { return o !== ''; });
if (splits.length > 0) {
uuid = splits[splits.length - 1];
}

Expand Down Expand Up @@ -338,7 +338,7 @@ Queue.prototype._saveJobData = function (jobDataKey, jobData, done) {
//TODO make use of redis hash i.e redis.hmset(<key>, <data>);
this
._scheduler
.set(jobDataKey, JSON.stringify(jobData), function (error /*, response*/ ) {
.set(jobDataKey, JSON.stringify(jobData), function (error /*, response*/) {
done(error, jobData);
});
};
Expand Down Expand Up @@ -407,33 +407,33 @@ Queue.prototype._buildJob = function (jobDefinition, done) {
//this refer to kue Queue instance context

async.parallel({
isDefined: function (next) {
//is job definition provided
var isObject = _.isPlainObject(jobDefinition);
if (!isObject) {
next(new Error('Invalid job definition'));
} else {
next(null, true);
}
},
isValid: function (next) {
//check job for required attributes
//
//a valid job must have a type and
//associated data
var isValidJob = _.has(jobDefinition, 'type') &&
(
_.has(jobDefinition, 'data') &&
_.isPlainObject(jobDefinition.data)
);

if (!isValidJob) {
next(new Error('Missing job type or data'));
} else {
next(null, true);
}
isDefined: function (next) {
//is job definition provided
var isObject = _.isPlainObject(jobDefinition);
if (!isObject) {
next(new Error('Invalid job definition'));
} else {
next(null, true);
}
},
isValid: function (next) {
//check job for required attributes
//
//a valid job must have a type and
//associated data
var isValidJob = _.has(jobDefinition, 'type') &&
(
_.has(jobDefinition, 'data') &&
_.isPlainObject(jobDefinition.data)
);

if (!isValidJob) {
next(new Error('Missing job type or data'));
} else {
next(null, true);
}
}
},
function finish(error, validations) {
//is not well formatted job
//back-off
Expand Down Expand Up @@ -954,7 +954,18 @@ Queue.prototype.schedule = function (when, job, done) {

function ensureSingleUniqueJob(job, next) {
ensureUniqueJob(job, next);
}
},
function saveUniqueJob(job, next) {
// if a unique name is specified, save it with the job details
if (job.data && job.data.unique) {
const jobDataKey = this._getJobDataKey(job.data.unique);
this._saveJobData(jobDataKey, job, function (error) {
next(error, job);
});
} else {
next(null, job);
}
}.bind(this)

], function (error, job) {
//fire schedule error event
Expand Down Expand Up @@ -1086,7 +1097,7 @@ var shutdown = Queue.prototype.shutdown;
* @return {Queue} for chaining
* @api public
*/
Queue.prototype.shutdown = function ( /*fn, timeout, type*/ ) {
Queue.prototype.shutdown = function ( /*fn, timeout, type*/) {
//this refer to kue Queue instance context

//TODO ensure all client shutdown with waiting delay
Expand Down Expand Up @@ -1204,7 +1215,7 @@ kue.createQueue = function (options) {
*/
Queue.prototype.remove = Queue.prototype.removeJob = function (criteria, done) {
//normalize callback
done = done || function noop() {};
done = done || function noop() { };

//compute criteria and job instance
async.parallel({
Expand Down Expand Up @@ -1449,7 +1460,7 @@ Queue.prototype._getAllJobData = function (done) {
*/
Queue.prototype.restore = function (done) {
//ensure callback
done = _.isFunction(done) ? done : function () {};
done = _.isFunction(done) ? done : function () { };

//fetch all job data
this._getAllJobData(function (error, data) {
Expand Down

0 comments on commit 8b11efd

Please sign in to comment.