Skip to content

Commit 8b5829b

Browse files
committedOct 18, 2021
auto format code
1 parent d7911a0 commit 8b5829b

File tree

1 file changed

+274
-280
lines changed

1 file changed

+274
-280
lines changed
 

‎lib/channel_model.js

+274-280
Original file line numberDiff line numberDiff line change
@@ -4,315 +4,309 @@
44

55
'use strict';
66

7-
var defs = require('./defs');
8-
var Promise = require('bluebird');
9-
var inherits = require('util').inherits;
10-
var EventEmitter = require('events').EventEmitter;
11-
var BaseChannel = require('./channel').BaseChannel;
12-
var acceptMessage = require('./channel').acceptMessage;
13-
var Args = require('./api_args');
14-
15-
function ChannelModel(connection) {
16-
if (!(this instanceof ChannelModel))
17-
return new ChannelModel(connection);
18-
EventEmitter.call( this );
19-
this.connection = connection;
20-
var self = this;
21-
['error', 'close', 'blocked', 'unblocked'].forEach(function(ev) {
22-
connection.on(ev, self.emit.bind(self, ev));
23-
});
7+
const defs = require('./defs');
8+
const Promise = require('bluebird');
9+
const inherits = require('util').inherits;
10+
const EventEmitter = require('events').EventEmitter;
11+
const BaseChannel = require('./channel').BaseChannel;
12+
const acceptMessage = require('./channel').acceptMessage;
13+
const Args = require('./api_args');
14+
15+
class ChannelModel extends EventEmitter {
16+
constructor(connection) {
17+
super();
18+
this.connection = connection;
19+
const self = this;
20+
['error', 'close', 'blocked', 'unblocked'].forEach(ev => {
21+
connection.on(ev, self.emit.bind(self, ev));
22+
});
23+
}
24+
25+
close() {
26+
return Promise.fromCallback(this.connection.close.bind(this.connection));
27+
}
28+
29+
createChannel() {
30+
const c = new Channel(this.connection);
31+
return c.open().then(openOk => { return c; });
32+
}
33+
34+
createConfirmChannel() {
35+
const c = new ConfirmChannel(this.connection);
36+
return c.open()
37+
.then(openOk => {
38+
return c.rpc(defs.ConfirmSelect, {nowait: false},
39+
defs.ConfirmSelectOk)
40+
})
41+
.then(() => { return c; });
42+
}
2443
}
25-
inherits(ChannelModel, EventEmitter);
2644

2745
module.exports.ChannelModel = ChannelModel;
2846

29-
var CM = ChannelModel.prototype;
30-
31-
CM.close = function() {
32-
return Promise.fromCallback(this.connection.close.bind(this.connection));
33-
};
34-
3547
// Channels
3648

37-
function Channel(connection) {
38-
BaseChannel.call(this, connection);
39-
this.on('delivery', this.handleDelivery.bind(this));
40-
this.on('cancel', this.handleCancel.bind(this));
41-
}
42-
inherits(Channel, BaseChannel);
43-
44-
module.exports.Channel = Channel;
45-
46-
CM.createChannel = function() {
47-
var c = new Channel(this.connection);
48-
return c.open().then(function(openOk) { return c; });
49-
};
50-
51-
var C = Channel.prototype;
52-
53-
// An RPC that returns a 'proper' promise, which resolves to just the
54-
// response's fields; this is intended to be suitable for implementing
55-
// API procedures.
56-
C.rpc = function(method, fields, expect) {
57-
var self = this;
58-
return Promise.fromCallback(function(cb) {
59-
return self._rpc(method, fields, expect, cb);
60-
})
61-
.then(function(f) {
62-
return f.fields;
63-
});
64-
};
65-
66-
// Do the remarkably simple channel open handshake
67-
C.open = function() {
68-
return Promise.try(this.allocate.bind(this)).then(
69-
function(ch) {
70-
return ch.rpc(defs.ChannelOpen, {outOfBand: ""},
71-
defs.ChannelOpenOk);
49+
class Channel extends BaseChannel {
50+
constructor(connection) {
51+
super(connection);
52+
this.on('delivery', this.handleDelivery.bind(this));
53+
this.on('cancel', this.handleCancel.bind(this));
54+
}
55+
56+
// An RPC that returns a 'proper' promise, which resolves to just the
57+
// response's fields; this is intended to be suitable for implementing
58+
// API procedures.
59+
rpc(method, fields, expect) {
60+
const self = this;
61+
return Promise.fromCallback(cb => {
62+
return self._rpc(method, fields, expect, cb);
63+
})
64+
.then(f => {
65+
return f.fields;
7266
});
73-
};
74-
75-
C.close = function() {
76-
var self = this;
77-
return Promise.fromCallback(function(cb) {
78-
return self.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS,
79-
cb);
80-
});
81-
};
82-
83-
// === Public API, declaring queues and stuff ===
84-
85-
C.assertQueue = function(queue, options) {
86-
return this.rpc(defs.QueueDeclare,
87-
Args.assertQueue(queue, options),
88-
defs.QueueDeclareOk);
89-
};
90-
91-
C.checkQueue = function(queue) {
92-
return this.rpc(defs.QueueDeclare,
93-
Args.checkQueue(queue),
94-
defs.QueueDeclareOk);
95-
};
96-
97-
C.deleteQueue = function(queue, options) {
98-
return this.rpc(defs.QueueDelete,
99-
Args.deleteQueue(queue, options),
100-
defs.QueueDeleteOk);
101-
};
102-
103-
C.purgeQueue = function(queue) {
104-
return this.rpc(defs.QueuePurge,
105-
Args.purgeQueue(queue),
106-
defs.QueuePurgeOk);
107-
};
108-
109-
C.bindQueue = function(queue, source, pattern, argt) {
110-
return this.rpc(defs.QueueBind,
111-
Args.bindQueue(queue, source, pattern, argt),
112-
defs.QueueBindOk);
113-
};
114-
115-
C.unbindQueue = function(queue, source, pattern, argt) {
116-
return this.rpc(defs.QueueUnbind,
117-
Args.unbindQueue(queue, source, pattern, argt),
118-
defs.QueueUnbindOk);
119-
};
120-
121-
C.assertExchange = function(exchange, type, options) {
122-
// The server reply is an empty set of fields, but it's convenient
123-
// to have the exchange name handed to the continuation.
124-
return this.rpc(defs.ExchangeDeclare,
125-
Args.assertExchange(exchange, type, options),
126-
defs.ExchangeDeclareOk)
127-
.then(function(_ok) { return { exchange: exchange }; });
128-
};
129-
130-
C.checkExchange = function(exchange) {
131-
return this.rpc(defs.ExchangeDeclare,
132-
Args.checkExchange(exchange),
133-
defs.ExchangeDeclareOk);
134-
};
135-
136-
C.deleteExchange = function(name, options) {
137-
return this.rpc(defs.ExchangeDelete,
138-
Args.deleteExchange(name, options),
139-
defs.ExchangeDeleteOk);
140-
};
141-
142-
C.bindExchange = function(dest, source, pattern, argt) {
143-
return this.rpc(defs.ExchangeBind,
144-
Args.bindExchange(dest, source, pattern, argt),
145-
defs.ExchangeBindOk);
146-
};
147-
148-
C.unbindExchange = function(dest, source, pattern, argt) {
149-
return this.rpc(defs.ExchangeUnbind,
150-
Args.unbindExchange(dest, source, pattern, argt),
151-
defs.ExchangeUnbindOk);
152-
};
153-
154-
// Working with messages
155-
156-
C.publish = function(exchange, routingKey, content, options) {
157-
var fieldsAndProps = Args.publish(exchange, routingKey, options);
158-
return this.sendMessage(fieldsAndProps, fieldsAndProps, content);
159-
};
160-
161-
C.sendToQueue = function(queue, content, options) {
162-
return this.publish('', queue, content, options);
163-
};
164-
165-
C.consume = function(queue, callback, options) {
166-
var self = this;
167-
// NB we want the callback to be run synchronously, so that we've
168-
// registered the consumerTag before any messages can arrive.
169-
var fields = Args.consume(queue, options);
170-
return Promise.fromCallback(function(cb) {
171-
self._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, cb);
172-
})
173-
.then(function(ok) {
174-
self.registerConsumer(ok.fields.consumerTag, callback);
175-
return ok.fields;
176-
});
177-
};
178-
179-
C.cancel = function(consumerTag) {
180-
var self = this;
181-
return Promise.fromCallback(function(cb) {
182-
self._rpc(defs.BasicCancel, Args.cancel(consumerTag),
183-
defs.BasicCancelOk,
184-
cb);
185-
})
186-
.then(function(ok) {
187-
self.unregisterConsumer(consumerTag);
188-
return ok.fields;
189-
});
190-
};
191-
192-
C.get = function(queue, options) {
193-
var self = this;
194-
var fields = Args.get(queue, options);
195-
return Promise.fromCallback(function(cb) {
196-
return self.sendOrEnqueue(defs.BasicGet, fields, cb);
197-
})
198-
.then(function(f) {
199-
if (f.id === defs.BasicGetEmpty) {
200-
return false;
201-
}
202-
else if (f.id === defs.BasicGetOk) {
203-
var fields = f.fields;
204-
return new Promise(function(resolve) {
205-
self.handleMessage = acceptMessage(function(m) {
206-
m.fields = fields;
207-
resolve(m);
208-
});
67+
}
68+
69+
// Do the remarkably simple channel open handshake
70+
open() {
71+
return Promise.try(this.allocate.bind(this)).then(
72+
ch => {
73+
return ch.rpc(defs.ChannelOpen, {outOfBand: ""},
74+
defs.ChannelOpenOk);
20975
});
210-
}
211-
else {
212-
throw new Error("Unexpected response to BasicGet: " +
213-
inspect(f));
214-
}
215-
})
216-
};
217-
218-
C.ack = function(message, allUpTo) {
219-
this.sendImmediately(
220-
defs.BasicAck,
221-
Args.ack(message.fields.deliveryTag, allUpTo));
222-
};
76+
}
22377

224-
C.ackAll = function() {
225-
this.sendImmediately(defs.BasicAck, Args.ack(0, true));
226-
};
227-
228-
C.nack = function(message, allUpTo, requeue) {
229-
this.sendImmediately(
230-
defs.BasicNack,
231-
Args.nack(message.fields.deliveryTag, allUpTo, requeue));
232-
};
78+
close() {
79+
const self = this;
80+
return Promise.fromCallback(cb => {
81+
return self.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS,
82+
cb);
83+
});
84+
}
85+
86+
// === Public API, declaring queues and stuff ===
87+
88+
assertQueue(queue, options) {
89+
return this.rpc(defs.QueueDeclare,
90+
Args.assertQueue(queue, options),
91+
defs.QueueDeclareOk);
92+
}
93+
94+
checkQueue(queue) {
95+
return this.rpc(defs.QueueDeclare,
96+
Args.checkQueue(queue),
97+
defs.QueueDeclareOk);
98+
}
99+
100+
deleteQueue(queue, options) {
101+
return this.rpc(defs.QueueDelete,
102+
Args.deleteQueue(queue, options),
103+
defs.QueueDeleteOk);
104+
}
105+
106+
purgeQueue(queue) {
107+
return this.rpc(defs.QueuePurge,
108+
Args.purgeQueue(queue),
109+
defs.QueuePurgeOk);
110+
}
111+
112+
bindQueue(queue, source, pattern, argt) {
113+
return this.rpc(defs.QueueBind,
114+
Args.bindQueue(queue, source, pattern, argt),
115+
defs.QueueBindOk);
116+
}
117+
118+
unbindQueue(queue, source, pattern, argt) {
119+
return this.rpc(defs.QueueUnbind,
120+
Args.unbindQueue(queue, source, pattern, argt),
121+
defs.QueueUnbindOk);
122+
}
123+
124+
assertExchange(exchange, type, options) {
125+
// The server reply is an empty set of fields, but it's convenient
126+
// to have the exchange name handed to the continuation.
127+
return this.rpc(defs.ExchangeDeclare,
128+
Args.assertExchange(exchange, type, options),
129+
defs.ExchangeDeclareOk)
130+
.then(_ok => { return { exchange }; });
131+
}
132+
133+
checkExchange(exchange) {
134+
return this.rpc(defs.ExchangeDeclare,
135+
Args.checkExchange(exchange),
136+
defs.ExchangeDeclareOk);
137+
}
138+
139+
deleteExchange(name, options) {
140+
return this.rpc(defs.ExchangeDelete,
141+
Args.deleteExchange(name, options),
142+
defs.ExchangeDeleteOk);
143+
}
144+
145+
bindExchange(dest, source, pattern, argt) {
146+
return this.rpc(defs.ExchangeBind,
147+
Args.bindExchange(dest, source, pattern, argt),
148+
defs.ExchangeBindOk);
149+
}
150+
151+
unbindExchange(dest, source, pattern, argt) {
152+
return this.rpc(defs.ExchangeUnbind,
153+
Args.unbindExchange(dest, source, pattern, argt),
154+
defs.ExchangeUnbindOk);
155+
}
156+
157+
// Working with messages
158+
159+
publish(exchange, routingKey, content, options) {
160+
const fieldsAndProps = Args.publish(exchange, routingKey, options);
161+
return this.sendMessage(fieldsAndProps, fieldsAndProps, content);
162+
}
163+
164+
sendToQueue(queue, content, options) {
165+
return this.publish('', queue, content, options);
166+
}
167+
168+
consume(queue, callback, options) {
169+
const self = this;
170+
// NB we want the callback to be run synchronously, so that we've
171+
// registered the consumerTag before any messages can arrive.
172+
const fields = Args.consume(queue, options);
173+
return Promise.fromCallback(cb => {
174+
self._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, cb);
175+
})
176+
.then(ok => {
177+
self.registerConsumer(ok.fields.consumerTag, callback);
178+
return ok.fields;
179+
});
180+
}
181+
182+
cancel(consumerTag) {
183+
const self = this;
184+
return Promise.fromCallback(cb => {
185+
self._rpc(defs.BasicCancel, Args.cancel(consumerTag),
186+
defs.BasicCancelOk,
187+
cb);
188+
})
189+
.then(ok => {
190+
self.unregisterConsumer(consumerTag);
191+
return ok.fields;
192+
});
193+
}
233194

234-
C.nackAll = function(requeue) {
235-
this.sendImmediately(defs.BasicNack,
236-
Args.nack(0, true, requeue));
237-
};
195+
get(queue, options) {
196+
const self = this;
197+
const fields = Args.get(queue, options);
198+
return Promise.fromCallback(cb => {
199+
return self.sendOrEnqueue(defs.BasicGet, fields, cb);
200+
})
201+
.then(f => {
202+
if (f.id === defs.BasicGetEmpty) {
203+
return false;
204+
}
205+
else if (f.id === defs.BasicGetOk) {
206+
const fields = f.fields;
207+
return new Promise(resolve => {
208+
self.handleMessage = acceptMessage(m => {
209+
m.fields = fields;
210+
resolve(m);
211+
});
212+
});
213+
}
214+
else {
215+
throw new Error(`Unexpected response to BasicGet: ${inspect(f)}`);
216+
}
217+
});
218+
}
219+
220+
ack(message, allUpTo) {
221+
this.sendImmediately(
222+
defs.BasicAck,
223+
Args.ack(message.fields.deliveryTag, allUpTo));
224+
}
225+
226+
ackAll() {
227+
this.sendImmediately(defs.BasicAck, Args.ack(0, true));
228+
}
229+
230+
nack(message, allUpTo, requeue) {
231+
this.sendImmediately(
232+
defs.BasicNack,
233+
Args.nack(message.fields.deliveryTag, allUpTo, requeue));
234+
}
235+
236+
nackAll(requeue) {
237+
this.sendImmediately(defs.BasicNack,
238+
Args.nack(0, true, requeue));
239+
}
240+
241+
// `Basic.Nack` is not available in older RabbitMQ versions (or in the
242+
// AMQP specification), so you have to use the one-at-a-time
243+
// `Basic.Reject`. This is otherwise synonymous with
244+
// `#nack(message, false, requeue)`.
245+
reject(message, requeue) {
246+
this.sendImmediately(
247+
defs.BasicReject,
248+
Args.reject(message.fields.deliveryTag, requeue));
249+
}
250+
251+
recover() {
252+
return this.rpc(defs.BasicRecover,
253+
Args.recover(),
254+
defs.BasicRecoverOk);
255+
}
256+
}
238257

239-
// `Basic.Nack` is not available in older RabbitMQ versions (or in the
240-
// AMQP specification), so you have to use the one-at-a-time
241-
// `Basic.Reject`. This is otherwise synonymous with
242-
// `#nack(message, false, requeue)`.
243-
C.reject = function(message, requeue) {
244-
this.sendImmediately(
245-
defs.BasicReject,
246-
Args.reject(message.fields.deliveryTag, requeue));
247-
};
258+
module.exports.Channel = Channel;
248259

249260
// There are more options in AMQP than exposed here; RabbitMQ only
250261
// implements prefetch based on message count, and only for individual
251262
// channels or consumers. RabbitMQ v3.3.0 and after treat prefetch
252263
// (without `global` set) as per-consumer (for consumers following),
253264
// and prefetch with `global` set as per-channel.
254-
C.prefetch = C.qos = function(count, global) {
265+
Channel.prototype.prefetch = Channel.prototype.qos = function(count, global) {
255266
return this.rpc(defs.BasicQos,
256267
Args.prefetch(count, global),
257268
defs.BasicQosOk);
258269
};
259270

260-
C.recover = function() {
261-
return this.rpc(defs.BasicRecover,
262-
Args.recover(),
263-
defs.BasicRecoverOk);
264-
};
265-
266271
// Confirm channel. This is a channel with confirms 'switched on',
267272
// meaning sent messages will provoke a responding 'ack' or 'nack'
268273
// from the server. The upshot of this is that `publish` and
269274
// `sendToQueue` both take a callback, which will be called either
270275
// with `null` as its argument to signify 'ack', or an exception as
271276
// its argument to signify 'nack'.
272277

273-
function ConfirmChannel(connection) {
274-
Channel.call(this, connection);
278+
class ConfirmChannel extends Channel {
279+
constructor(connection) {
280+
super(connection);
281+
}
282+
283+
publish(exchange, routingKey, content, options, cb) {
284+
this.pushConfirmCallback(cb);
285+
return Channel.prototype.publish.call(this, exchange, routingKey, content, options);
286+
}
287+
288+
sendToQueue(queue, content, options, cb) {
289+
return this.publish('', queue, content, options, cb);
290+
}
291+
292+
waitForConfirms() {
293+
const awaiting = [];
294+
const unconfirmed = this.unconfirmed;
295+
unconfirmed.forEach((val, index) => {
296+
if (val === null); // already confirmed
297+
else {
298+
const confirmed = new Promise((resolve, reject) => {
299+
unconfirmed[index] = err => {
300+
if (val) val(err);
301+
if (err === null) resolve();
302+
else reject(err);
303+
};
304+
});
305+
awaiting.push(confirmed);
306+
}
307+
});
308+
return Promise.all(awaiting);
309+
}
275310
}
276-
inherits(ConfirmChannel, Channel);
277311

278312
module.exports.ConfirmChannel = ConfirmChannel;
279-
280-
CM.createConfirmChannel = function() {
281-
var c = new ConfirmChannel(this.connection);
282-
return c.open()
283-
.then(function(openOk) {
284-
return c.rpc(defs.ConfirmSelect, {nowait: false},
285-
defs.ConfirmSelectOk)
286-
})
287-
.then(function() { return c; });
288-
};
289-
290-
var CC = ConfirmChannel.prototype;
291-
292-
CC.publish = function(exchange, routingKey, content, options, cb) {
293-
this.pushConfirmCallback(cb);
294-
return C.publish.call(this, exchange, routingKey, content, options);
295-
};
296-
297-
CC.sendToQueue = function(queue, content, options, cb) {
298-
return this.publish('', queue, content, options, cb);
299-
};
300-
301-
CC.waitForConfirms = function() {
302-
var awaiting = [];
303-
var unconfirmed = this.unconfirmed;
304-
unconfirmed.forEach(function(val, index) {
305-
if (val === null); // already confirmed
306-
else {
307-
var confirmed = new Promise(function(resolve, reject) {
308-
unconfirmed[index] = function(err) {
309-
if (val) val(err);
310-
if (err === null) resolve();
311-
else reject(err);
312-
};
313-
});
314-
awaiting.push(confirmed);
315-
}
316-
});
317-
return Promise.all(awaiting);
318-
};

0 commit comments

Comments
 (0)
Please sign in to comment.