Skip to content

Commit a06ef44

Browse files
committedFeb 22, 2024·
Add connection-update-secret
1 parent bbe579e commit a06ef44

6 files changed

+658
-622
lines changed
 

‎Makefile

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
RABBITMQ_SRC_VERSION=rabbitmq_v3_2_1
1+
RABBITMQ_SRC_VERSION=v3.12.13
22
JSON=amqp-rabbitmq-0.9.1.json
3-
RABBITMQ_CODEGEN=https://raw.githubusercontent.com/rabbitmq/rabbitmq-codegen
4-
AMQP_JSON=$(RABBITMQ_CODEGEN)/$(RABBITMQ_SRC_VERSION)/$(JSON)
3+
AMQP_JSON=https://raw.githubusercontent.com/rabbitmq/rabbitmq-server/$(RABBITMQ_SRC_VERSION)/deps/rabbitmq_codegen/$(JSON)
54

65
NODEJS_VERSIONS='10.21' '11.15' '12.18' '13.14' '14.5' '15.8' '16.3.0' '18.1.0' '20.10.0'
76

‎lib/callback_model.js

+4
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ class CallbackModel extends EventEmitter {
2424
this.connection.close(cb);
2525
}
2626

27+
updateSecret(newSecret, reason, cb) {
28+
this.connection._updateSecret(newSecret, reason, cb);
29+
}
30+
2731
createChannel (cb) {
2832
var ch = new Channel(this.connection);
2933
ch.open(function (err, ok) {

‎lib/channel_model.js

+4
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ class ChannelModel extends EventEmitter {
2626
return promisify(this.connection.close.bind(this.connection))();
2727
}
2828

29+
updateSecret(newSecret, reason) {
30+
return promisify(this.connection._updateSecret.bind(this.connection))(newSecret, reason);
31+
}
32+
2933
async createChannel() {
3034
const channel = new Channel(this.connection);
3135
await channel.open();

‎lib/connection.js

+11
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,14 @@ class Connection extends EventEmitter {
361361
this.emit('close', maybeErr);
362362
}
363363

364+
_updateSecret(newSecret, reason, cb) {
365+
this.sendMethod(0, defs.ConnectionUpdateSecret, {
366+
newSecret,
367+
reason
368+
});
369+
this.on('update-secret-ok', cb);
370+
}
371+
364372
// ===
365373
startHeartbeater () {
366374
if (this.heartbeat === 0)
@@ -611,6 +619,9 @@ function channel0(connection) {
611619
else if (f.id === defs.ConnectionUnblocked) {
612620
connection.emit('unblocked');
613621
}
622+
else if (f.id === defs.ConnectionUpdateSecretOk) {
623+
connection.emit('update-secret-ok');
624+
}
614625
else {
615626
connection.closeWithError(
616627
fmt("Unexpected frame on channel 0"),

‎test/callback_api.js

+182-174
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,18 @@ function waitForMessages(ch, q, k) {
5757

5858
suite('connect', function() {
5959

60-
test('at all', function(done) {
61-
connect(doneCallback(done));
60+
test('at all', function(done) {
61+
connect(doneCallback(done));
62+
});
63+
6264
});
6365

66+
suite('updateSecret', function() {
67+
test('updateSecret', function(done) {
68+
connect(kCallback(function(c) {
69+
c.updateSecret(Buffer.from('new secret'), 'no reason', doneCallback(done));
70+
}));
71+
});
6472
});
6573

6674
function channel_test_fn(method) {
@@ -79,234 +87,234 @@ var confirm_channel_test = channel_test_fn('createConfirmChannel');
7987

8088
suite('channel open', function() {
8189

82-
channel_test('at all', function(ch, done) {
83-
done();
84-
});
90+
channel_test('at all', function(ch, done) {
91+
done();
92+
});
8593

86-
channel_test('open and close', function(ch, done) {
87-
ch.close(doneCallback(done));
88-
});
94+
channel_test('open and close', function(ch, done) {
95+
ch.close(doneCallback(done));
96+
});
8997

9098
});
9199

92100
suite('assert, check, delete', function() {
93101

94-
channel_test('assert, check, delete queue', function(ch, done) {
95-
ch.assertQueue('test.cb.queue', {}, kCallback(function(q) {
96-
ch.checkQueue('test.cb.queue', kCallback(function(ok) {
97-
ch.deleteQueue('test.cb.queue', {}, doneCallback(done));
102+
channel_test('assert, check, delete queue', function(ch, done) {
103+
ch.assertQueue('test.cb.queue', {}, kCallback(function(q) {
104+
ch.checkQueue('test.cb.queue', kCallback(function(ok) {
105+
ch.deleteQueue('test.cb.queue', {}, doneCallback(done));
106+
}, done));
98107
}, done));
99-
}, done));
100-
});
108+
});
101109

102-
channel_test('assert, check, delete exchange', function(ch, done) {
103-
ch.assertExchange(
104-
'test.cb.exchange', 'topic', {}, kCallback(function(ex) {
105-
ch.checkExchange('test.cb.exchange', kCallback(function(ok) {
106-
ch.deleteExchange('test.cb.exchange', {}, doneCallback(done));
110+
channel_test('assert, check, delete exchange', function(ch, done) {
111+
ch.assertExchange(
112+
'test.cb.exchange', 'topic', {}, kCallback(function(ex) {
113+
ch.checkExchange('test.cb.exchange', kCallback(function(ok) {
114+
ch.deleteExchange('test.cb.exchange', {}, doneCallback(done));
115+
}, done));
107116
}, done));
108-
}, done));
109-
});
117+
});
110118

111-
channel_test('fail on check non-queue', function(ch, done) {
112-
var both = twice(done);
113-
ch.on('error', failCallback(both.first));
114-
ch.checkQueue('test.cb.nothere', failCallback(both.second));
115-
});
119+
channel_test('fail on check non-queue', function(ch, done) {
120+
var both = twice(done);
121+
ch.on('error', failCallback(both.first));
122+
ch.checkQueue('test.cb.nothere', failCallback(both.second));
123+
});
116124

117-
channel_test('fail on check non-exchange', function(ch, done) {
118-
var both = twice(done);
119-
ch.on('error', failCallback(both.first));
120-
ch.checkExchange('test.cb.nothere', failCallback(both.second));
121-
});
125+
channel_test('fail on check non-exchange', function(ch, done) {
126+
var both = twice(done);
127+
ch.on('error', failCallback(both.first));
128+
ch.checkExchange('test.cb.nothere', failCallback(both.second));
129+
});
122130

123131
});
124132

125133
suite('bindings', function() {
126134

127-
channel_test('bind queue', function(ch, done) {
128-
ch.assertQueue('test.cb.bindq', {}, kCallback(function(q) {
129-
ch.assertExchange(
130-
'test.cb.bindex', 'fanout', {}, kCallback(function(ex) {
131-
ch.bindQueue(q.queue, ex.exchange, '', {},
132-
doneCallback(done));
133-
}, done));
134-
}, done));
135-
});
136-
137-
channel_test('bind exchange', function(ch, done) {
138-
ch.assertExchange(
139-
'test.cb.bindex1', 'fanout', {}, kCallback(function(ex1) {
135+
channel_test('bind queue', function(ch, done) {
136+
ch.assertQueue('test.cb.bindq', {}, kCallback(function(q) {
140137
ch.assertExchange(
141-
'test.cb.bindex2', 'fanout', {}, kCallback(function(ex2) {
142-
ch.bindExchange(ex1.exchange,
143-
ex2.exchange, '', {},
144-
doneCallback(done));
138+
'test.cb.bindex', 'fanout', {}, kCallback(function(ex) {
139+
ch.bindQueue(q.queue, ex.exchange, '', {},
140+
doneCallback(done));
145141
}, done));
146142
}, done));
147-
});
143+
});
144+
145+
channel_test('bind exchange', function(ch, done) {
146+
ch.assertExchange(
147+
'test.cb.bindex1', 'fanout', {}, kCallback(function(ex1) {
148+
ch.assertExchange(
149+
'test.cb.bindex2', 'fanout', {}, kCallback(function(ex2) {
150+
ch.bindExchange(ex1.exchange,
151+
ex2.exchange, '', {},
152+
doneCallback(done));
153+
}, done));
154+
}, done));
155+
});
148156

149157
});
150158

151159
suite('sending messages', function() {
152160

153-
channel_test('send to queue and consume noAck', function(ch, done) {
154-
var msg = randomString();
155-
ch.assertQueue('', {exclusive: true}, function(e, q) {
156-
if (e !== null) return done(e);
157-
ch.consume(q.queue, function(m) {
158-
if (m.content.toString() == msg) done();
159-
else done(new Error("message content doesn't match:" +
160-
msg + " =/= " + m.content.toString()));
161-
}, {noAck: true, exclusive: true});
162-
ch.sendToQueue(q.queue, Buffer.from(msg));
161+
channel_test('send to queue and consume noAck', function(ch, done) {
162+
var msg = randomString();
163+
ch.assertQueue('', {exclusive: true}, function(e, q) {
164+
if (e !== null) return done(e);
165+
ch.consume(q.queue, function(m) {
166+
if (m.content.toString() == msg) done();
167+
else done(new Error("message content doesn't match:" +
168+
msg + " =/= " + m.content.toString()));
169+
}, {noAck: true, exclusive: true});
170+
ch.sendToQueue(q.queue, Buffer.from(msg));
171+
});
163172
});
164-
});
165173

166-
channel_test('send to queue and consume ack', function(ch, done) {
167-
var msg = randomString();
168-
ch.assertQueue('', {exclusive: true}, function(e, q) {
169-
if (e !== null) return done(e);
170-
ch.consume(q.queue, function(m) {
171-
if (m.content.toString() == msg) {
172-
ch.ack(m);
173-
done();
174-
}
175-
else done(new Error("message content doesn't match:" +
176-
msg + " =/= " + m.content.toString()));
177-
}, {noAck: false, exclusive: true});
178-
ch.sendToQueue(q.queue, Buffer.from(msg));
174+
channel_test('send to queue and consume ack', function(ch, done) {
175+
var msg = randomString();
176+
ch.assertQueue('', {exclusive: true}, function(e, q) {
177+
if (e !== null) return done(e);
178+
ch.consume(q.queue, function(m) {
179+
if (m.content.toString() == msg) {
180+
ch.ack(m);
181+
done();
182+
}
183+
else done(new Error("message content doesn't match:" +
184+
msg + " =/= " + m.content.toString()));
185+
}, {noAck: false, exclusive: true});
186+
ch.sendToQueue(q.queue, Buffer.from(msg));
187+
});
179188
});
180-
});
181189

182-
channel_test('send to and get from queue', function(ch, done) {
183-
ch.assertQueue('', {exclusive: true}, function(e, q) {
184-
if (e != null) return done(e);
185-
var msg = randomString();
186-
ch.sendToQueue(q.queue, Buffer.from(msg));
187-
waitForMessages(ch, q.queue, function(e, _) {
190+
channel_test('send to and get from queue', function(ch, done) {
191+
ch.assertQueue('', {exclusive: true}, function(e, q) {
188192
if (e != null) return done(e);
189-
ch.get(q.queue, {noAck: true}, function(e, m) {
190-
if (e != null)
191-
return done(e);
192-
else if (!m)
193-
return done(new Error('Empty (false) not expected'));
194-
else if (m.content.toString() == msg)
195-
return done();
196-
else
197-
return done(
198-
new Error('Messages do not match: ' +
199-
msg + ' =/= ' + m.content.toString()));
193+
var msg = randomString();
194+
ch.sendToQueue(q.queue, Buffer.from(msg));
195+
waitForMessages(ch, q.queue, function(e, _) {
196+
if (e != null) return done(e);
197+
ch.get(q.queue, {noAck: true}, function(e, m) {
198+
if (e != null)
199+
return done(e);
200+
else if (!m)
201+
return done(new Error('Empty (false) not expected'));
202+
else if (m.content.toString() == msg)
203+
return done();
204+
else
205+
return done(
206+
new Error('Messages do not match: ' +
207+
msg + ' =/= ' + m.content.toString()));
208+
});
200209
});
201210
});
202211
});
203-
});
204212

205213
});
206214

207215
suite('ConfirmChannel', function() {
208216

209-
confirm_channel_test('Receive confirmation', function(ch, done) {
210-
// An unroutable message, on the basis that you're not allowed a
211-
// queue with an empty name, and you can't make bindings to the
212-
// default exchange. Tricky eh?
213-
ch.publish('', '', Buffer.from('foo'), {}, done);
214-
});
217+
confirm_channel_test('Receive confirmation', function(ch, done) {
218+
// An unroutable message, on the basis that you're not allowed a
219+
// queue with an empty name, and you can't make bindings to the
220+
// default exchange. Tricky eh?
221+
ch.publish('', '', Buffer.from('foo'), {}, done);
222+
});
215223

216-
confirm_channel_test('Wait for confirms', function(ch, done) {
217-
for (var i=0; i < 1000; i++) {
218-
ch.publish('', '', Buffer.from('foo'), {});
219-
}
220-
ch.waitForConfirms(done);
221-
});
224+
confirm_channel_test('Wait for confirms', function(ch, done) {
225+
for (var i=0; i < 1000; i++) {
226+
ch.publish('', '', Buffer.from('foo'), {});
227+
}
228+
ch.waitForConfirms(done);
229+
});
222230

223231
});
224232

225233
suite("Error handling", function() {
226234

227-
/*
228-
I don't like having to do this, but there appears to be something
229-
broken about domains in Node.JS v0.8 and mocha. Apparently it has to
230-
do with how mocha and domains hook into error propogation:
231-
https://github.com/visionmedia/mocha/issues/513 (summary: domains in
232-
Node.JS v0.8 don't prevent uncaughtException from firing, and that's
233-
what mocha uses to detect .. an uncaught exception).
234-
235-
Using domains with amqplib *does* work in practice in Node.JS v0.8:
236-
that is, it's possible to throw an exception in a callback and deal
237-
with it in the active domain, and thereby avoid it crashing the
238-
program.
239-
*/
240-
if (util.versionGreaterThan(process.versions.node, '0.8')) {
241-
test('Throw error in connection open callback', function(done) {
242-
var dom = domain.createDomain();
243-
dom.on('error', failCallback(done));
244-
connect(dom.bind(function(err, conn) {
245-
throw new Error('Spurious connection open callback error');
246-
}));
247-
});
248-
}
235+
/*
236+
I don't like having to do this, but there appears to be something
237+
broken about domains in Node.JS v0.8 and mocha. Apparently it has to
238+
do with how mocha and domains hook into error propogation:
239+
https://github.com/visionmedia/mocha/issues/513 (summary: domains in
240+
Node.JS v0.8 don't prevent uncaughtException from firing, and that's
241+
what mocha uses to detect .. an uncaught exception).
242+
243+
Using domains with amqplib *does* work in practice in Node.JS v0.8:
244+
that is, it's possible to throw an exception in a callback and deal
245+
with it in the active domain, and thereby avoid it crashing the
246+
program.
247+
*/
248+
if (util.versionGreaterThan(process.versions.node, '0.8')) {
249+
test('Throw error in connection open callback', function(done) {
250+
var dom = domain.createDomain();
251+
dom.on('error', failCallback(done));
252+
connect(dom.bind(function(err, conn) {
253+
throw new Error('Spurious connection open callback error');
254+
}));
255+
});
256+
}
249257

250-
// TODO: refactor {error_test, channel_test}
251-
function error_test(name, fun) {
252-
test(name, function(done) {
253-
var dom = domain.createDomain();
254-
dom.run(function() {
255-
connect(kCallback(function(c) {
256-
// Seems like there were some unironed wrinkles in 0.8's
257-
// implementation of domains; explicitly adding the connection
258-
// to the domain makes sure any exception thrown in the course
259-
// of processing frames is handled by the domain. For other
260-
// versions of Node.JS, this ends up being belt-and-braces.
261-
dom.add(c);
262-
c.createChannel(kCallback(function(ch) {
263-
fun(ch, done, dom);
258+
// TODO: refactor {error_test, channel_test}
259+
function error_test(name, fun) {
260+
test(name, function(done) {
261+
var dom = domain.createDomain();
262+
dom.run(function() {
263+
connect(kCallback(function(c) {
264+
// Seems like there were some unironed wrinkles in 0.8's
265+
// implementation of domains; explicitly adding the connection
266+
// to the domain makes sure any exception thrown in the course
267+
// of processing frames is handled by the domain. For other
268+
// versions of Node.JS, this ends up being belt-and-braces.
269+
dom.add(c);
270+
c.createChannel(kCallback(function(ch) {
271+
fun(ch, done, dom);
272+
}, done));
264273
}, done));
265-
}, done));
274+
});
266275
});
267-
});
268-
}
269-
270-
error_test('Channel open callback throws an error', function(ch, done, dom) {
271-
dom.on('error', failCallback(done));
272-
throw new Error('Error in open callback');
273-
});
276+
}
274277

275-
error_test('RPC callback throws error', function(ch, done, dom) {
276-
dom.on('error', failCallback(done));
277-
ch.prefetch(0, false, function(err, ok) {
278-
throw new Error('Spurious callback error');
278+
error_test('Channel open callback throws an error', function(ch, done, dom) {
279+
dom.on('error', failCallback(done));
280+
throw new Error('Error in open callback');
279281
});
280-
});
281282

282-
error_test('Get callback throws error', function(ch, done, dom) {
283-
dom.on('error', failCallback(done));
284-
ch.assertQueue('test.cb.get-with-error', {}, function(err, ok) {
285-
ch.get('test.cb.get-with-error', {noAck: true}, function() {
283+
error_test('RPC callback throws error', function(ch, done, dom) {
284+
dom.on('error', failCallback(done));
285+
ch.prefetch(0, false, function(err, ok) {
286286
throw new Error('Spurious callback error');
287287
});
288288
});
289-
});
290289

291-
error_test('Consume callback throws error', function(ch, done, dom) {
292-
dom.on('error', failCallback(done));
293-
ch.assertQueue('test.cb.consume-with-error', {}, function(err, ok) {
294-
ch.consume('test.cb.consume-with-error', ignore, {noAck: true}, function() {
295-
throw new Error('Spurious callback error');
290+
error_test('Get callback throws error', function(ch, done, dom) {
291+
dom.on('error', failCallback(done));
292+
ch.assertQueue('test.cb.get-with-error', {}, function(err, ok) {
293+
ch.get('test.cb.get-with-error', {noAck: true}, function() {
294+
throw new Error('Spurious callback error');
295+
});
296296
});
297297
});
298-
});
299298

300-
error_test('Get from non-queue invokes error k', function(ch, done, dom) {
301-
var both = twice(failCallback(done));
302-
dom.on('error', both.first);
303-
ch.get('', {}, both.second);
304-
});
299+
error_test('Consume callback throws error', function(ch, done, dom) {
300+
dom.on('error', failCallback(done));
301+
ch.assertQueue('test.cb.consume-with-error', {}, function(err, ok) {
302+
ch.consume('test.cb.consume-with-error', ignore, {noAck: true}, function() {
303+
throw new Error('Spurious callback error');
304+
});
305+
});
306+
});
305307

306-
error_test('Consume from non-queue invokes error k', function(ch, done, dom) {
307-
var both = twice(failCallback(done));
308-
dom.on('error', both.first);
309-
ch.consume('', both.second);
310-
});
308+
error_test('Get from non-queue invokes error k', function(ch, done, dom) {
309+
var both = twice(failCallback(done));
310+
dom.on('error', both.first);
311+
ch.get('', {}, both.second);
312+
});
313+
314+
error_test('Consume from non-queue invokes error k', function(ch, done, dom) {
315+
var both = twice(failCallback(done));
316+
dom.on('error', both.first);
317+
ch.consume('', both.second);
318+
});
311319

312320
});

‎test/channel_api.js

+455-445
Large diffs are not rendered by default.

0 commit comments

Comments
 (0)
Please sign in to comment.