Skip to content

Commit 243aef8

Browse files
authoredOct 18, 2021
Merge pull request #635 from jimmywarting/feature/modernize-channel-model
Feature/modernize channel model
2 parents d7911a0 + c25440e commit 243aef8

File tree

1 file changed

+268
-287
lines changed

1 file changed

+268
-287
lines changed
 

‎lib/channel_model.js

+268-287
Original file line numberDiff line numberDiff line change
@@ -4,264 +4,259 @@
44

55
'use strict';
66

7-
var defs = require('./defs');
8-
var Promise = require('bluebird');
9-
var inherits = require('util').inherits;
10-
var EventEmitter = require('events').EventEmitter;
11-
var BaseChannel = require('./channel').BaseChannel;
12-
var acceptMessage = require('./channel').acceptMessage;
13-
var Args = require('./api_args');
14-
15-
function ChannelModel(connection) {
16-
if (!(this instanceof ChannelModel))
17-
return new ChannelModel(connection);
18-
EventEmitter.call( this );
19-
this.connection = connection;
20-
var self = this;
21-
['error', 'close', 'blocked', 'unblocked'].forEach(function(ev) {
22-
connection.on(ev, self.emit.bind(self, ev));
23-
});
7+
const EventEmitter = require('events');
8+
const Promise = require('bluebird');
9+
const defs = require('./defs');
10+
const {BaseChannel} = require('./channel');
11+
const {acceptMessage} = require('./channel');
12+
const Args = require('./api_args');
13+
const {inspect} = require('./format');
14+
15+
class ChannelModel extends EventEmitter {
16+
constructor(connection) {
17+
super();
18+
this.connection = connection;
19+
20+
['error', 'close', 'blocked', 'unblocked'].forEach(ev => {
21+
connection.on(ev, this.emit.bind(this, ev));
22+
});
23+
}
24+
25+
close() {
26+
return Promise.fromCallback(this.connection.close.bind(this.connection));
27+
}
28+
29+
async createChannel() {
30+
const channel = new Channel(this.connection);
31+
await channel.open();
32+
return channel;
33+
}
34+
35+
async createConfirmChannel() {
36+
const channel = new ConfirmChannel(this.connection);
37+
await channel.open();
38+
await channel.rpc(defs.ConfirmSelect, {nowait: false}, defs.ConfirmSelectOk);
39+
return channel;
40+
}
2441
}
25-
inherits(ChannelModel, EventEmitter);
26-
27-
module.exports.ChannelModel = ChannelModel;
28-
29-
var CM = ChannelModel.prototype;
30-
31-
CM.close = function() {
32-
return Promise.fromCallback(this.connection.close.bind(this.connection));
33-
};
3442

3543
// Channels
3644

37-
function Channel(connection) {
38-
BaseChannel.call(this, connection);
39-
this.on('delivery', this.handleDelivery.bind(this));
40-
this.on('cancel', this.handleCancel.bind(this));
41-
}
42-
inherits(Channel, BaseChannel);
43-
44-
module.exports.Channel = Channel;
45+
class Channel extends BaseChannel {
46+
constructor(connection) {
47+
super(connection);
48+
this.on('delivery', this.handleDelivery.bind(this));
49+
this.on('cancel', this.handleCancel.bind(this));
50+
}
51+
52+
// An RPC that returns a 'proper' promise, which resolves to just the
53+
// response's fields; this is intended to be suitable for implementing
54+
// API procedures.
55+
async rpc(method, fields, expect) {
56+
const f = await Promise.fromCallback(cb => {
57+
return this._rpc(method, fields, expect, cb);
58+
})
4559

46-
CM.createChannel = function() {
47-
var c = new Channel(this.connection);
48-
return c.open().then(function(openOk) { return c; });
49-
};
50-
51-
var C = Channel.prototype;
52-
53-
// An RPC that returns a 'proper' promise, which resolves to just the
54-
// response's fields; this is intended to be suitable for implementing
55-
// API procedures.
56-
C.rpc = function(method, fields, expect) {
57-
var self = this;
58-
return Promise.fromCallback(function(cb) {
59-
return self._rpc(method, fields, expect, cb);
60-
})
61-
.then(function(f) {
6260
return f.fields;
63-
});
64-
};
65-
66-
// Do the remarkably simple channel open handshake
67-
C.open = function() {
68-
return Promise.try(this.allocate.bind(this)).then(
69-
function(ch) {
70-
return ch.rpc(defs.ChannelOpen, {outOfBand: ""},
71-
defs.ChannelOpenOk);
61+
}
62+
63+
// Do the remarkably simple channel open handshake
64+
open() {
65+
return Promise.try(this.allocate.bind(this)).then(
66+
ch => {
67+
return ch.rpc(defs.ChannelOpen, {outOfBand: ""},
68+
defs.ChannelOpenOk);
69+
});
70+
}
71+
72+
close() {
73+
return Promise.fromCallback(cb => {
74+
return this.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS,
75+
cb);
76+
});
77+
}
78+
79+
// === Public API, declaring queues and stuff ===
80+
81+
assertQueue(queue, options) {
82+
return this.rpc(defs.QueueDeclare,
83+
Args.assertQueue(queue, options),
84+
defs.QueueDeclareOk);
85+
}
86+
87+
checkQueue(queue) {
88+
return this.rpc(defs.QueueDeclare,
89+
Args.checkQueue(queue),
90+
defs.QueueDeclareOk);
91+
}
92+
93+
deleteQueue(queue, options) {
94+
return this.rpc(defs.QueueDelete,
95+
Args.deleteQueue(queue, options),
96+
defs.QueueDeleteOk);
97+
}
98+
99+
purgeQueue(queue) {
100+
return this.rpc(defs.QueuePurge,
101+
Args.purgeQueue(queue),
102+
defs.QueuePurgeOk);
103+
}
104+
105+
bindQueue(queue, source, pattern, argt) {
106+
return this.rpc(defs.QueueBind,
107+
Args.bindQueue(queue, source, pattern, argt),
108+
defs.QueueBindOk);
109+
}
110+
111+
unbindQueue(queue, source, pattern, argt) {
112+
return this.rpc(defs.QueueUnbind,
113+
Args.unbindQueue(queue, source, pattern, argt),
114+
defs.QueueUnbindOk);
115+
}
116+
117+
assertExchange(exchange, type, options) {
118+
// The server reply is an empty set of fields, but it's convenient
119+
// to have the exchange name handed to the continuation.
120+
return this.rpc(defs.ExchangeDeclare,
121+
Args.assertExchange(exchange, type, options),
122+
defs.ExchangeDeclareOk)
123+
.then(_ok => { return { exchange }; });
124+
}
125+
126+
checkExchange(exchange) {
127+
return this.rpc(defs.ExchangeDeclare,
128+
Args.checkExchange(exchange),
129+
defs.ExchangeDeclareOk);
130+
}
131+
132+
deleteExchange(name, options) {
133+
return this.rpc(defs.ExchangeDelete,
134+
Args.deleteExchange(name, options),
135+
defs.ExchangeDeleteOk);
136+
}
137+
138+
bindExchange(dest, source, pattern, argt) {
139+
return this.rpc(defs.ExchangeBind,
140+
Args.bindExchange(dest, source, pattern, argt),
141+
defs.ExchangeBindOk);
142+
}
143+
144+
unbindExchange(dest, source, pattern, argt) {
145+
return this.rpc(defs.ExchangeUnbind,
146+
Args.unbindExchange(dest, source, pattern, argt),
147+
defs.ExchangeUnbindOk);
148+
}
149+
150+
// Working with messages
151+
152+
publish(exchange, routingKey, content, options) {
153+
const fieldsAndProps = Args.publish(exchange, routingKey, options);
154+
return this.sendMessage(fieldsAndProps, fieldsAndProps, content);
155+
}
156+
157+
sendToQueue(queue, content, options) {
158+
return this.publish('', queue, content, options);
159+
}
160+
161+
consume(queue, callback, options) {
162+
// NB we want the callback to be run synchronously, so that we've
163+
// registered the consumerTag before any messages can arrive.
164+
const fields = Args.consume(queue, options);
165+
return Promise.fromCallback(cb => {
166+
this._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, cb);
167+
})
168+
.then(ok => {
169+
this.registerConsumer(ok.fields.consumerTag, callback);
170+
return ok.fields;
171+
});
172+
}
173+
174+
async cancel(consumerTag) {
175+
const ok = await Promise.fromCallback(cb => {
176+
this._rpc(defs.BasicCancel, Args.cancel(consumerTag),
177+
defs.BasicCancelOk,
178+
cb);
179+
})
180+
.then(ok => {
181+
this.unregisterConsumer(consumerTag);
182+
return ok.fields;
72183
});
73-
};
74-
75-
C.close = function() {
76-
var self = this;
77-
return Promise.fromCallback(function(cb) {
78-
return self.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS,
79-
cb);
80-
});
81-
};
82-
83-
// === Public API, declaring queues and stuff ===
84-
85-
C.assertQueue = function(queue, options) {
86-
return this.rpc(defs.QueueDeclare,
87-
Args.assertQueue(queue, options),
88-
defs.QueueDeclareOk);
89-
};
90-
91-
C.checkQueue = function(queue) {
92-
return this.rpc(defs.QueueDeclare,
93-
Args.checkQueue(queue),
94-
defs.QueueDeclareOk);
95-
};
96-
97-
C.deleteQueue = function(queue, options) {
98-
return this.rpc(defs.QueueDelete,
99-
Args.deleteQueue(queue, options),
100-
defs.QueueDeleteOk);
101-
};
102-
103-
C.purgeQueue = function(queue) {
104-
return this.rpc(defs.QueuePurge,
105-
Args.purgeQueue(queue),
106-
defs.QueuePurgeOk);
107-
};
108-
109-
C.bindQueue = function(queue, source, pattern, argt) {
110-
return this.rpc(defs.QueueBind,
111-
Args.bindQueue(queue, source, pattern, argt),
112-
defs.QueueBindOk);
113-
};
114-
115-
C.unbindQueue = function(queue, source, pattern, argt) {
116-
return this.rpc(defs.QueueUnbind,
117-
Args.unbindQueue(queue, source, pattern, argt),
118-
defs.QueueUnbindOk);
119-
};
120-
121-
C.assertExchange = function(exchange, type, options) {
122-
// The server reply is an empty set of fields, but it's convenient
123-
// to have the exchange name handed to the continuation.
124-
return this.rpc(defs.ExchangeDeclare,
125-
Args.assertExchange(exchange, type, options),
126-
defs.ExchangeDeclareOk)
127-
.then(function(_ok) { return { exchange: exchange }; });
128-
};
129-
130-
C.checkExchange = function(exchange) {
131-
return this.rpc(defs.ExchangeDeclare,
132-
Args.checkExchange(exchange),
133-
defs.ExchangeDeclareOk);
134-
};
135-
136-
C.deleteExchange = function(name, options) {
137-
return this.rpc(defs.ExchangeDelete,
138-
Args.deleteExchange(name, options),
139-
defs.ExchangeDeleteOk);
140-
};
141-
142-
C.bindExchange = function(dest, source, pattern, argt) {
143-
return this.rpc(defs.ExchangeBind,
144-
Args.bindExchange(dest, source, pattern, argt),
145-
defs.ExchangeBindOk);
146-
};
147-
148-
C.unbindExchange = function(dest, source, pattern, argt) {
149-
return this.rpc(defs.ExchangeUnbind,
150-
Args.unbindExchange(dest, source, pattern, argt),
151-
defs.ExchangeUnbindOk);
152-
};
153-
154-
// Working with messages
155-
156-
C.publish = function(exchange, routingKey, content, options) {
157-
var fieldsAndProps = Args.publish(exchange, routingKey, options);
158-
return this.sendMessage(fieldsAndProps, fieldsAndProps, content);
159-
};
160-
161-
C.sendToQueue = function(queue, content, options) {
162-
return this.publish('', queue, content, options);
163-
};
164-
165-
C.consume = function(queue, callback, options) {
166-
var self = this;
167-
// NB we want the callback to be run synchronously, so that we've
168-
// registered the consumerTag before any messages can arrive.
169-
var fields = Args.consume(queue, options);
170-
return Promise.fromCallback(function(cb) {
171-
self._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, cb);
172-
})
173-
.then(function(ok) {
174-
self.registerConsumer(ok.fields.consumerTag, callback);
175-
return ok.fields;
176-
});
177-
};
178-
179-
C.cancel = function(consumerTag) {
180-
var self = this;
181-
return Promise.fromCallback(function(cb) {
182-
self._rpc(defs.BasicCancel, Args.cancel(consumerTag),
183-
defs.BasicCancelOk,
184-
cb);
185-
})
186-
.then(function(ok) {
187-
self.unregisterConsumer(consumerTag);
188-
return ok.fields;
189-
});
190-
};
191-
192-
C.get = function(queue, options) {
193-
var self = this;
194-
var fields = Args.get(queue, options);
195-
return Promise.fromCallback(function(cb) {
196-
return self.sendOrEnqueue(defs.BasicGet, fields, cb);
197-
})
198-
.then(function(f) {
199-
if (f.id === defs.BasicGetEmpty) {
200-
return false;
201-
}
202-
else if (f.id === defs.BasicGetOk) {
203-
var fields = f.fields;
204-
return new Promise(function(resolve) {
205-
self.handleMessage = acceptMessage(function(m) {
206-
m.fields = fields;
207-
resolve(m);
184+
}
185+
186+
get(queue, options) {
187+
const fields = Args.get(queue, options);
188+
return Promise.fromCallback(cb => {
189+
return this.sendOrEnqueue(defs.BasicGet, fields, cb);
190+
})
191+
.then(f => {
192+
if (f.id === defs.BasicGetEmpty) {
193+
return false;
194+
}
195+
else if (f.id === defs.BasicGetOk) {
196+
const fields = f.fields;
197+
return new Promise(resolve => {
198+
this.handleMessage = acceptMessage(m => {
199+
m.fields = fields;
200+
resolve(m);
201+
});
208202
});
209-
});
210-
}
211-
else {
212-
throw new Error("Unexpected response to BasicGet: " +
213-
inspect(f));
214-
}
215-
})
216-
};
217-
218-
C.ack = function(message, allUpTo) {
219-
this.sendImmediately(
220-
defs.BasicAck,
221-
Args.ack(message.fields.deliveryTag, allUpTo));
222-
};
223-
224-
C.ackAll = function() {
225-
this.sendImmediately(defs.BasicAck, Args.ack(0, true));
226-
};
227-
228-
C.nack = function(message, allUpTo, requeue) {
229-
this.sendImmediately(
230-
defs.BasicNack,
231-
Args.nack(message.fields.deliveryTag, allUpTo, requeue));
232-
};
233-
234-
C.nackAll = function(requeue) {
235-
this.sendImmediately(defs.BasicNack,
236-
Args.nack(0, true, requeue));
237-
};
238-
239-
// `Basic.Nack` is not available in older RabbitMQ versions (or in the
240-
// AMQP specification), so you have to use the one-at-a-time
241-
// `Basic.Reject`. This is otherwise synonymous with
242-
// `#nack(message, false, requeue)`.
243-
C.reject = function(message, requeue) {
244-
this.sendImmediately(
245-
defs.BasicReject,
246-
Args.reject(message.fields.deliveryTag, requeue));
247-
};
203+
}
204+
else {
205+
throw new Error(`Unexpected response to BasicGet: ${inspect(f)}`);
206+
}
207+
});
208+
}
209+
210+
ack(message, allUpTo) {
211+
this.sendImmediately(
212+
defs.BasicAck,
213+
Args.ack(message.fields.deliveryTag, allUpTo));
214+
}
215+
216+
ackAll() {
217+
this.sendImmediately(defs.BasicAck, Args.ack(0, true));
218+
}
219+
220+
nack(message, allUpTo, requeue) {
221+
this.sendImmediately(
222+
defs.BasicNack,
223+
Args.nack(message.fields.deliveryTag, allUpTo, requeue));
224+
}
225+
226+
nackAll(requeue) {
227+
this.sendImmediately(defs.BasicNack,
228+
Args.nack(0, true, requeue));
229+
}
230+
231+
// `Basic.Nack` is not available in older RabbitMQ versions (or in the
232+
// AMQP specification), so you have to use the one-at-a-time
233+
// `Basic.Reject`. This is otherwise synonymous with
234+
// `#nack(message, false, requeue)`.
235+
reject(message, requeue) {
236+
this.sendImmediately(
237+
defs.BasicReject,
238+
Args.reject(message.fields.deliveryTag, requeue));
239+
}
240+
241+
recover() {
242+
return this.rpc(defs.BasicRecover,
243+
Args.recover(),
244+
defs.BasicRecoverOk);
245+
}
246+
247+
qos(count, global) {
248+
return this.rpc(defs.BasicQos,
249+
Args.prefetch(count, global),
250+
defs.BasicQosOk);
251+
}
252+
}
248253

249254
// There are more options in AMQP than exposed here; RabbitMQ only
250255
// implements prefetch based on message count, and only for individual
251256
// channels or consumers. RabbitMQ v3.3.0 and after treat prefetch
252257
// (without `global` set) as per-consumer (for consumers following),
253258
// and prefetch with `global` set as per-channel.
254-
C.prefetch = C.qos = function(count, global) {
255-
return this.rpc(defs.BasicQos,
256-
Args.prefetch(count, global),
257-
defs.BasicQosOk);
258-
};
259-
260-
C.recover = function() {
261-
return this.rpc(defs.BasicRecover,
262-
Args.recover(),
263-
defs.BasicRecoverOk);
264-
};
259+
Channel.prototype.prefetch = Channel.prototype.qos
265260

266261
// Confirm channel. This is a channel with confirms 'switched on',
267262
// meaning sent messages will provoke a responding 'ack' or 'nack'
@@ -270,49 +265,35 @@ C.recover = function() {
270265
// with `null` as its argument to signify 'ack', or an exception as
271266
// its argument to signify 'nack'.
272267

273-
function ConfirmChannel(connection) {
274-
Channel.call(this, connection);
268+
class ConfirmChannel extends Channel {
269+
publish(exchange, routingKey, content, options, cb) {
270+
this.pushConfirmCallback(cb);
271+
return Channel.prototype.publish.call(this, exchange, routingKey, content, options);
272+
}
273+
274+
sendToQueue(queue, content, options, cb) {
275+
return this.publish('', queue, content, options, cb);
276+
}
277+
278+
waitForConfirms() {
279+
const awaiting = [];
280+
const unconfirmed = this.unconfirmed;
281+
unconfirmed.forEach((val, index) => {
282+
if (val !== null) {
283+
const confirmed = new Promise((resolve, reject) => {
284+
unconfirmed[index] = err => {
285+
if (val) val(err);
286+
if (err === null) resolve();
287+
else reject(err);
288+
};
289+
});
290+
awaiting.push(confirmed);
291+
}
292+
});
293+
return Promise.all(awaiting);
294+
}
275295
}
276-
inherits(ConfirmChannel, Channel);
277296

278297
module.exports.ConfirmChannel = ConfirmChannel;
279-
280-
CM.createConfirmChannel = function() {
281-
var c = new ConfirmChannel(this.connection);
282-
return c.open()
283-
.then(function(openOk) {
284-
return c.rpc(defs.ConfirmSelect, {nowait: false},
285-
defs.ConfirmSelectOk)
286-
})
287-
.then(function() { return c; });
288-
};
289-
290-
var CC = ConfirmChannel.prototype;
291-
292-
CC.publish = function(exchange, routingKey, content, options, cb) {
293-
this.pushConfirmCallback(cb);
294-
return C.publish.call(this, exchange, routingKey, content, options);
295-
};
296-
297-
CC.sendToQueue = function(queue, content, options, cb) {
298-
return this.publish('', queue, content, options, cb);
299-
};
300-
301-
CC.waitForConfirms = function() {
302-
var awaiting = [];
303-
var unconfirmed = this.unconfirmed;
304-
unconfirmed.forEach(function(val, index) {
305-
if (val === null); // already confirmed
306-
else {
307-
var confirmed = new Promise(function(resolve, reject) {
308-
unconfirmed[index] = function(err) {
309-
if (val) val(err);
310-
if (err === null) resolve();
311-
else reject(err);
312-
};
313-
});
314-
awaiting.push(confirmed);
315-
}
316-
});
317-
return Promise.all(awaiting);
318-
};
298+
module.exports.Channel = Channel;
299+
module.exports.ChannelModel = ChannelModel;

0 commit comments

Comments
 (0)
Please sign in to comment.