Skip to content

Commit 7a19075

Browse files
authoredJun 4, 2020
feat: add support for Redis Cluster
The 'numsub' command only returns the number of subscribers on the current node, so we have to ask every node of the cluster. Fixes #267 Fixes #210
1 parent 597a8d1 commit 7a19075

File tree

1 file changed

+47
-24
lines changed

1 file changed

+47
-24
lines changed
 

‎index.js

+47-24
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,41 @@ function adapter(uri, opts) {
416416
Adapter.prototype.broadcast.call(this, packet, opts);
417417
};
418418

419+
420+
/**
421+
* Get the number of subscribers of a channel
422+
*
423+
* @param {String} channel
424+
*/
425+
426+
function getNumSub(channel){
427+
if(pub.constructor.name != 'Cluster'){
428+
// RedisClient or Redis
429+
return new Promise(function(resolve,reject) {
430+
pub.send_command('pubsub', ['numsub', channel], function(err, numsub){
431+
if (err) return reject(err);
432+
resolve(parseInt(numsub[1], 10));
433+
});
434+
})
435+
}else{
436+
// Cluster
437+
var nodes = pub.nodes();
438+
return Promise.all(
439+
nodes.map(function(node) {
440+
return node.send_command('pubsub', ['numsub', channel]);
441+
})
442+
).then(function(values) {
443+
var numsub = 0;
444+
values.map(function(value){
445+
numsub += parseInt(value[1], 10);
446+
})
447+
return numsub;
448+
}).catch(function(err){
449+
throw err;
450+
});
451+
}
452+
}
453+
419454
/**
420455
* Gets a list of clients by sid.
421456
*
@@ -435,14 +470,7 @@ function adapter(uri, opts) {
435470
var self = this;
436471
var requestid = uid2(6);
437472

438-
pub.send_command('pubsub', ['numsub', self.requestChannel], function(err, numsub){
439-
if (err) {
440-
self.emit('error', err);
441-
if (fn) fn(err);
442-
return;
443-
}
444-
445-
numsub = parseInt(numsub[1], 10);
473+
getNumSub(self.requestChannel).then(numsub => {
446474
debug('waiting for %d responses to "clients" request', numsub);
447475

448476
var request = JSON.stringify({
@@ -468,6 +496,9 @@ function adapter(uri, opts) {
468496
};
469497

470498
pub.publish(self.requestChannel, request);
499+
}).catch(err => {
500+
self.emit('error', err);
501+
if (fn) fn(err);
471502
});
472503
};
473504

@@ -524,14 +555,7 @@ function adapter(uri, opts) {
524555
var self = this;
525556
var requestid = uid2(6);
526557

527-
pub.send_command('pubsub', ['numsub', self.requestChannel], function(err, numsub){
528-
if (err) {
529-
self.emit('error', err);
530-
if (fn) fn(err);
531-
return;
532-
}
533-
534-
numsub = parseInt(numsub[1], 10);
558+
getNumSub(self.requestChannel).then(numsub => {
535559
debug('waiting for %d responses to "allRooms" request', numsub);
536560

537561
var request = JSON.stringify({
@@ -556,6 +580,9 @@ function adapter(uri, opts) {
556580
};
557581

558582
pub.publish(self.requestChannel, request);
583+
}).catch(err => {
584+
self.emit('error', err);
585+
if (fn) fn(err);
559586
});
560587
};
561588

@@ -700,14 +727,7 @@ function adapter(uri, opts) {
700727
var self = this;
701728
var requestid = uid2(6);
702729

703-
pub.send_command('pubsub', ['numsub', self.requestChannel], function(err, numsub){
704-
if (err) {
705-
self.emit('error', err);
706-
if (fn) fn(err);
707-
return;
708-
}
709-
710-
numsub = parseInt(numsub[1], 10);
730+
getNumSub(self.requestChannel).then(numsub => {
711731
debug('waiting for %d responses to "customRequest" request', numsub);
712732

713733
var request = JSON.stringify({
@@ -733,6 +753,9 @@ function adapter(uri, opts) {
733753
};
734754

735755
pub.publish(self.requestChannel, request);
756+
}).catch(err => {
757+
self.emit('error', err);
758+
if (fn) fn(err);
736759
});
737760
};
738761

0 commit comments

Comments
 (0)
Please sign in to comment.