Skip to content

Commit d7526dd

Browse files
authoredJan 16, 2023
classify callback model (#691)
1 parent 7096242 commit d7526dd

File tree

1 file changed

+290
-289
lines changed

1 file changed

+290
-289
lines changed
 

‎lib/callback_model.js

+290-289
Original file line numberDiff line numberDiff line change
@@ -5,47 +5,271 @@
55
'use strict';
66

77
var defs = require('./defs');
8-
var inherits = require('util').inherits;
98
var EventEmitter = require('events').EventEmitter;
109
var BaseChannel = require('./channel').BaseChannel;
1110
var acceptMessage = require('./channel').acceptMessage;
1211
var Args = require('./api_args');
1312

14-
function CallbackModel(connection) {
15-
if (!(this instanceof CallbackModel))
16-
return new CallbackModel(connection);
17-
EventEmitter.call( this );
18-
this.connection = connection;
19-
var self = this;
20-
['error', 'close', 'blocked', 'unblocked'].forEach(function(ev) {
21-
connection.on(ev, self.emit.bind(self, ev));
22-
});
13+
class CallbackModel extends EventEmitter {
14+
constructor (connection) {
15+
super();
16+
this.connection = connection;
17+
var self = this;
18+
['error', 'close', 'blocked', 'unblocked'].forEach(function (ev) {
19+
connection.on(ev, self.emit.bind(self, ev));
20+
});
21+
}
22+
23+
close (cb) {
24+
this.connection.close(cb);
25+
}
26+
27+
createChannel (cb) {
28+
var ch = new Channel(this.connection);
29+
ch.open(function (err, ok) {
30+
if (err === null)
31+
cb && cb(null, ch);
32+
else
33+
cb && cb(err);
34+
});
35+
return ch;
36+
}
37+
38+
createConfirmChannel (cb) {
39+
var ch = new ConfirmChannel(this.connection);
40+
ch.open(function (err) {
41+
if (err !== null)
42+
return cb && cb(err);
43+
else {
44+
ch.rpc(defs.ConfirmSelect, { nowait: false },
45+
defs.ConfirmSelectOk, function (err, _ok) {
46+
if (err !== null)
47+
return cb && cb(err);
48+
else
49+
cb && cb(null, ch);
50+
});
51+
}
52+
});
53+
return ch;
54+
}
2355
}
24-
inherits(CallbackModel, EventEmitter);
2556

26-
module.exports.CallbackModel = CallbackModel;
57+
class Channel extends BaseChannel {
58+
constructor (connection) {
59+
super(connection);
60+
this.on('delivery', this.handleDelivery.bind(this));
61+
this.on('cancel', this.handleCancel.bind(this));
62+
}
63+
64+
// This encodes straight-forward RPC: no side-effects and return the
65+
// fields from the server response. It wraps the callback given it, so
66+
// the calling method argument can be passed as-is. For anything that
67+
// needs to have side-effects, or needs to change the server response,
68+
// use `#_rpc(...)` and remember to dereference `.fields` of the
69+
// server response.
70+
rpc (method, fields, expect, cb0) {
71+
var cb = callbackWrapper(this, cb0);
72+
this._rpc(method, fields, expect, function (err, ok) {
73+
cb(err, ok && ok.fields); // in case of an error, ok will be
74+
75+
// undefined
76+
});
77+
return this;
78+
}
79+
80+
// === Public API ===
81+
open (cb) {
82+
try { this.allocate(); }
83+
catch (e) { return cb(e); }
84+
85+
return this.rpc(defs.ChannelOpen, { outOfBand: "" },
86+
defs.ChannelOpenOk, cb);
87+
}
88+
89+
close (cb) {
90+
return this.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS,
91+
function () { cb && cb(null); });
92+
}
93+
94+
assertQueue (queue, options, cb) {
95+
return this.rpc(defs.QueueDeclare,
96+
Args.assertQueue(queue, options),
97+
defs.QueueDeclareOk, cb);
98+
}
99+
100+
checkQueue (queue, cb) {
101+
return this.rpc(defs.QueueDeclare,
102+
Args.checkQueue(queue),
103+
defs.QueueDeclareOk, cb);
104+
}
105+
106+
deleteQueue (queue, options, cb) {
107+
return this.rpc(defs.QueueDelete,
108+
Args.deleteQueue(queue, options),
109+
defs.QueueDeleteOk, cb);
110+
}
111+
112+
purgeQueue (queue, cb) {
113+
return this.rpc(defs.QueuePurge,
114+
Args.purgeQueue(queue),
115+
defs.QueuePurgeOk, cb);
116+
}
117+
118+
bindQueue (queue, source, pattern, argt, cb) {
119+
return this.rpc(defs.QueueBind,
120+
Args.bindQueue(queue, source, pattern, argt),
121+
defs.QueueBindOk, cb);
122+
}
27123

28-
CallbackModel.prototype.close = function(cb) {
29-
this.connection.close(cb);
30-
};
124+
unbindQueue (queue, source, pattern, argt, cb) {
125+
return this.rpc(defs.QueueUnbind,
126+
Args.unbindQueue(queue, source, pattern, argt),
127+
defs.QueueUnbindOk, cb);
128+
}
129+
130+
assertExchange (ex, type, options, cb0) {
131+
var cb = callbackWrapper(this, cb0);
132+
this._rpc(defs.ExchangeDeclare,
133+
Args.assertExchange(ex, type, options),
134+
defs.ExchangeDeclareOk,
135+
function (e, _) { cb(e, { exchange: ex }); });
136+
return this;
137+
}
138+
139+
checkExchange (exchange, cb) {
140+
return this.rpc(defs.ExchangeDeclare,
141+
Args.checkExchange(exchange),
142+
defs.ExchangeDeclareOk, cb);
143+
}
144+
145+
deleteExchange (exchange, options, cb) {
146+
return this.rpc(defs.ExchangeDelete,
147+
Args.deleteExchange(exchange, options),
148+
defs.ExchangeDeleteOk, cb);
149+
}
150+
151+
bindExchange (dest, source, pattern, argt, cb) {
152+
return this.rpc(defs.ExchangeBind,
153+
Args.bindExchange(dest, source, pattern, argt),
154+
defs.ExchangeBindOk, cb);
155+
}
31156

32-
function Channel(connection) {
33-
BaseChannel.call(this, connection);
34-
this.on('delivery', this.handleDelivery.bind(this));
35-
this.on('cancel', this.handleCancel.bind(this));
36-
}
37-
inherits(Channel, BaseChannel);
157+
unbindExchange (dest, source, pattern, argt, cb) {
158+
return this.rpc(defs.ExchangeUnbind,
159+
Args.unbindExchange(dest, source, pattern, argt),
160+
defs.ExchangeUnbindOk, cb);
161+
}
38162

39-
module.exports.Channel = Channel;
163+
publish (exchange, routingKey, content, options) {
164+
var fieldsAndProps = Args.publish(exchange, routingKey, options);
165+
return this.sendMessage(fieldsAndProps, fieldsAndProps, content);
166+
}
167+
168+
sendToQueue (queue, content, options) {
169+
return this.publish('', queue, content, options);
170+
}
171+
172+
consume (queue, callback, options, cb0) {
173+
var cb = callbackWrapper(this, cb0);
174+
var fields = Args.consume(queue, options);
175+
var self = this;
176+
this._rpc(
177+
defs.BasicConsume, fields, defs.BasicConsumeOk,
178+
function (err, ok) {
179+
if (err === null) {
180+
self.registerConsumer(ok.fields.consumerTag, callback);
181+
cb(null, ok.fields);
182+
}
183+
else
184+
cb(err);
185+
});
186+
return this;
187+
}
188+
189+
cancel (consumerTag, cb0) {
190+
var cb = callbackWrapper(this, cb0);
191+
var self = this;
192+
this._rpc(
193+
defs.BasicCancel, Args.cancel(consumerTag), defs.BasicCancelOk,
194+
function (err, ok) {
195+
if (err === null) {
196+
self.unregisterConsumer(consumerTag);
197+
cb(null, ok.fields);
198+
}
199+
else
200+
cb(err);
201+
});
202+
return this;
203+
}
204+
205+
get (queue, options, cb0) {
206+
var self = this;
207+
var fields = Args.get(queue, options);
208+
var cb = callbackWrapper(this, cb0);
209+
this.sendOrEnqueue(defs.BasicGet, fields, function (err, f) {
210+
if (err === null) {
211+
if (f.id === defs.BasicGetEmpty) {
212+
cb(null, false);
213+
}
214+
else if (f.id === defs.BasicGetOk) {
215+
self.handleMessage = acceptMessage(function (m) {
216+
m.fields = f.fields;
217+
cb(null, m);
218+
});
219+
}
220+
else {
221+
cb(new Error("Unexpected response to BasicGet: " +
222+
inspect(f)));
223+
}
224+
}
225+
});
226+
return this;
227+
}
228+
229+
ack (message, allUpTo) {
230+
this.sendImmediately(
231+
defs.BasicAck, Args.ack(message.fields.deliveryTag, allUpTo));
232+
return this;
233+
}
234+
235+
ackAll () {
236+
this.sendImmediately(defs.BasicAck, Args.ack(0, true));
237+
return this;
238+
}
239+
240+
nack (message, allUpTo, requeue) {
241+
this.sendImmediately(
242+
defs.BasicNack,
243+
Args.nack(message.fields.deliveryTag, allUpTo, requeue));
244+
return this;
245+
}
246+
247+
nackAll (requeue) {
248+
this.sendImmediately(
249+
defs.BasicNack, Args.nack(0, true, requeue));
250+
return this;
251+
}
252+
253+
reject (message, requeue) {
254+
this.sendImmediately(
255+
defs.BasicReject,
256+
Args.reject(message.fields.deliveryTag, requeue));
257+
return this;
258+
}
259+
260+
prefetch (count, global, cb) {
261+
return this.rpc(defs.BasicQos,
262+
Args.prefetch(count, global),
263+
defs.BasicQosOk, cb);
264+
}
265+
266+
recover (cb) {
267+
return this.rpc(defs.BasicRecover,
268+
Args.recover(),
269+
defs.BasicRecoverOk, cb);
270+
}
271+
}
40272

41-
CallbackModel.prototype.createChannel = function(cb) {
42-
var ch = new Channel(this.connection);
43-
ch.open(function(err, ok) {
44-
if (err === null) cb && cb(null, ch);
45-
else cb && cb(err);
46-
});
47-
return ch;
48-
};
49273

50274
// Wrap an RPC callback to make sure the callback is invoked with
51275
// either `(null, value)` or `(error)`, i.e., never two non-null
@@ -61,267 +285,44 @@ function callbackWrapper(ch, cb) {
61285
} : function() {};
62286
}
63287

64-
// This encodes straight-forward RPC: no side-effects and return the
65-
// fields from the server response. It wraps the callback given it, so
66-
// the calling method argument can be passed as-is. For anything that
67-
// needs to have side-effects, or needs to change the server response,
68-
// use `#_rpc(...)` and remember to dereference `.fields` of the
69-
// server response.
70-
Channel.prototype.rpc = function(method, fields, expect, cb0) {
71-
var cb = callbackWrapper(this, cb0);
72-
this._rpc(method, fields, expect, function(err, ok) {
73-
cb(err, ok && ok.fields); // in case of an error, ok will be
74-
// undefined
75-
});
76-
return this;
77-
};
78-
79-
// === Public API ===
80-
81-
Channel.prototype.open = function(cb) {
82-
try { this.allocate(); }
83-
catch (e) { return cb(e); }
84-
85-
return this.rpc(defs.ChannelOpen, {outOfBand: ""},
86-
defs.ChannelOpenOk, cb);
87-
};
88-
89-
Channel.prototype.close = function(cb) {
90-
return this.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS,
91-
function() { cb && cb(null); });
92-
};
93-
94-
Channel.prototype.assertQueue = function(queue, options, cb) {
95-
return this.rpc(defs.QueueDeclare,
96-
Args.assertQueue(queue, options),
97-
defs.QueueDeclareOk, cb);
98-
};
99-
100-
Channel.prototype.checkQueue = function(queue, cb) {
101-
return this.rpc(defs.QueueDeclare,
102-
Args.checkQueue(queue),
103-
defs.QueueDeclareOk, cb);
104-
};
105-
106-
Channel.prototype.deleteQueue = function(queue, options, cb) {
107-
return this.rpc(defs.QueueDelete,
108-
Args.deleteQueue(queue, options),
109-
defs.QueueDeleteOk, cb);
110-
};
111-
112-
Channel.prototype.purgeQueue = function(queue, cb) {
113-
return this.rpc(defs.QueuePurge,
114-
Args.purgeQueue(queue),
115-
defs.QueuePurgeOk, cb);
116-
};
117-
118-
Channel.prototype.bindQueue =
119-
function(queue, source, pattern, argt, cb) {
120-
return this.rpc(defs.QueueBind,
121-
Args.bindQueue(queue, source, pattern, argt),
122-
defs.QueueBindOk, cb);
123-
};
124-
125-
Channel.prototype.unbindQueue =
126-
function(queue, source, pattern, argt, cb) {
127-
return this.rpc(defs.QueueUnbind,
128-
Args.unbindQueue(queue, source, pattern, argt),
129-
defs.QueueUnbindOk, cb);
130-
};
131-
132-
Channel.prototype.assertExchange = function(ex, type, options, cb0) {
133-
var cb = callbackWrapper(this, cb0);
134-
this._rpc(defs.ExchangeDeclare,
135-
Args.assertExchange(ex, type, options),
136-
defs.ExchangeDeclareOk,
137-
function(e, _) { cb(e, {exchange: ex}); });
138-
return this;
139-
};
140-
141-
Channel.prototype.checkExchange = function(exchange, cb) {
142-
return this.rpc(defs.ExchangeDeclare,
143-
Args.checkExchange(exchange),
144-
defs.ExchangeDeclareOk, cb);
145-
};
146-
147-
Channel.prototype.deleteExchange = function(exchange, options, cb) {
148-
return this.rpc(defs.ExchangeDelete,
149-
Args.deleteExchange(exchange, options),
150-
defs.ExchangeDeleteOk, cb);
151-
};
152-
153-
Channel.prototype.bindExchange =
154-
function(dest, source, pattern, argt, cb) {
155-
return this.rpc(defs.ExchangeBind,
156-
Args.bindExchange(dest, source, pattern, argt),
157-
defs.ExchangeBindOk, cb);
158-
};
159-
160-
Channel.prototype.unbindExchange =
161-
function(dest, source, pattern, argt, cb) {
162-
return this.rpc(defs.ExchangeUnbind,
163-
Args.unbindExchange(dest, source, pattern, argt),
164-
defs.ExchangeUnbindOk, cb);
165-
};
166-
167-
Channel.prototype.publish =
168-
function(exchange, routingKey, content, options) {
169-
var fieldsAndProps = Args.publish(exchange, routingKey, options);
170-
return this.sendMessage(fieldsAndProps, fieldsAndProps, content);
171-
};
172-
173-
Channel.prototype.sendToQueue = function(queue, content, options) {
174-
return this.publish('', queue, content, options);
175-
};
176-
177-
Channel.prototype.consume = function(queue, callback, options, cb0) {
178-
var cb = callbackWrapper(this, cb0);
179-
var fields = Args.consume(queue, options);
180-
var self = this;
181-
this._rpc(
182-
defs.BasicConsume, fields, defs.BasicConsumeOk,
183-
function(err, ok) {
184-
if (err === null) {
185-
self.registerConsumer(ok.fields.consumerTag, callback);
186-
cb(null, ok.fields);
187-
}
188-
else cb(err);
189-
});
190-
return this;
191-
};
192-
193-
Channel.prototype.cancel = function(consumerTag, cb0) {
194-
var cb = callbackWrapper(this, cb0);
195-
var self = this;
196-
this._rpc(
197-
defs.BasicCancel, Args.cancel(consumerTag), defs.BasicCancelOk,
198-
function(err, ok) {
199-
if (err === null) {
200-
self.unregisterConsumer(consumerTag);
201-
cb(null, ok.fields);
202-
}
203-
else cb(err);
204-
});
205-
return this;
206-
};
207-
208-
Channel.prototype.get = function(queue, options, cb0) {
209-
var self = this;
210-
var fields = Args.get(queue, options);
211-
var cb = callbackWrapper(this, cb0);
212-
this.sendOrEnqueue(defs.BasicGet, fields, function(err, f) {
213-
if (err === null) {
214-
if (f.id === defs.BasicGetEmpty) {
215-
cb(null, false);
216-
}
217-
else if (f.id === defs.BasicGetOk) {
218-
self.handleMessage = acceptMessage(function(m) {
219-
m.fields = f.fields;
220-
cb(null, m);
221-
});
222-
}
288+
class ConfirmChannel extends Channel {
289+
publish (exchange, routingKey,
290+
content, options, cb) {
291+
this.pushConfirmCallback(cb);
292+
return Channel.prototype.publish.call(
293+
this, exchange, routingKey, content, options);
294+
}
295+
296+
sendToQueue (queue, content,
297+
options, cb) {
298+
return this.publish('', queue, content, options, cb);
299+
}
300+
301+
waitForConfirms (k) {
302+
var awaiting = [];
303+
var unconfirmed = this.unconfirmed;
304+
unconfirmed.forEach(function (val, index) {
305+
if (val === null)
306+
; // already confirmed
223307
else {
224-
cb(new Error("Unexpected response to BasicGet: " +
225-
inspect(f)));
308+
var confirmed = new Promise(function (resolve, reject) {
309+
unconfirmed[index] = function (err) {
310+
if (val)
311+
val(err);
312+
if (err === null)
313+
resolve();
314+
else
315+
reject(err);
316+
};
317+
});
318+
awaiting.push(confirmed);
226319
}
227-
}
228-
});
229-
return this;
230-
};
231-
232-
Channel.prototype.ack = function(message, allUpTo) {
233-
this.sendImmediately(
234-
defs.BasicAck, Args.ack(message.fields.deliveryTag, allUpTo));
235-
return this;
236-
};
237-
238-
Channel.prototype.ackAll = function() {
239-
this.sendImmediately(defs.BasicAck, Args.ack(0, true));
240-
return this;
241-
};
242-
243-
Channel.prototype.nack = function(message, allUpTo, requeue) {
244-
this.sendImmediately(
245-
defs.BasicNack,
246-
Args.nack(message.fields.deliveryTag, allUpTo, requeue));
247-
return this;
248-
};
249-
250-
Channel.prototype.nackAll = function(requeue) {
251-
this.sendImmediately(
252-
defs.BasicNack, Args.nack(0, true, requeue))
253-
return this;
254-
};
255-
256-
Channel.prototype.reject = function(message, requeue) {
257-
this.sendImmediately(
258-
defs.BasicReject,
259-
Args.reject(message.fields.deliveryTag, requeue));
260-
return this;
261-
};
262-
263-
Channel.prototype.prefetch = function(count, global, cb) {
264-
return this.rpc(defs.BasicQos,
265-
Args.prefetch(count, global),
266-
defs.BasicQosOk, cb);
267-
};
268-
269-
Channel.prototype.recover = function(cb) {
270-
return this.rpc(defs.BasicRecover,
271-
Args.recover(),
272-
defs.BasicRecoverOk, cb);
273-
};
274-
275-
function ConfirmChannel(connection) {
276-
Channel.call(this, connection);
320+
});
321+
return Promise.all(awaiting).then(function () { k(); },
322+
function (err) { k(err); });
323+
}
277324
}
278-
inherits(ConfirmChannel, Channel);
279325

326+
module.exports.CallbackModel = CallbackModel;
327+
module.exports.Channel = Channel;
280328
module.exports.ConfirmChannel = ConfirmChannel;
281-
282-
CallbackModel.prototype.createConfirmChannel = function(cb) {
283-
var ch = new ConfirmChannel(this.connection);
284-
ch.open(function(err) {
285-
if (err !== null) return cb && cb(err);
286-
else {
287-
ch.rpc(defs.ConfirmSelect, {nowait: false},
288-
defs.ConfirmSelectOk, function(err, _ok) {
289-
if (err !== null) return cb && cb(err);
290-
else cb && cb(null, ch);
291-
});
292-
}
293-
});
294-
return ch;
295-
};
296-
297-
ConfirmChannel.prototype.publish = function(exchange, routingKey,
298-
content, options, cb) {
299-
this.pushConfirmCallback(cb);
300-
return Channel.prototype.publish.call(
301-
this, exchange, routingKey, content, options);
302-
};
303-
304-
ConfirmChannel.prototype.sendToQueue = function(queue, content,
305-
options, cb) {
306-
return this.publish('', queue, content, options, cb);
307-
};
308-
309-
ConfirmChannel.prototype.waitForConfirms = function(k) {
310-
var awaiting = [];
311-
var unconfirmed = this.unconfirmed;
312-
unconfirmed.forEach(function(val, index) {
313-
if (val === null); // already confirmed
314-
else {
315-
var confirmed = new Promise(function(resolve, reject) {
316-
unconfirmed[index] = function(err) {
317-
if (val) val(err);
318-
if (err === null) resolve();
319-
else reject(err);
320-
};
321-
});
322-
awaiting.push(confirmed);
323-
}
324-
});
325-
return Promise.all(awaiting).then(function() { k(); },
326-
function(err) { k(err); });
327-
};

0 commit comments

Comments
 (0)
Please sign in to comment.