Skip to content

Commit d72af39

Browse files
committedOct 18, 2021
start using async where it may fit
1 parent 5f130f2 commit d72af39

File tree

1 file changed

+49
-54
lines changed

1 file changed

+49
-54
lines changed
 

‎lib/channel_model.js

+49-54
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,17 @@ class ChannelModel extends EventEmitter {
2626
return Promise.fromCallback(this.connection.close.bind(this.connection));
2727
}
2828

29-
createChannel() {
30-
const c = new Channel(this.connection);
31-
return c.open().then(openOk => { return c; });
29+
async createChannel() {
30+
const channel = new Channel(this.connection);
31+
await channel.open();
32+
return channel;
3233
}
3334

34-
createConfirmChannel() {
35-
const c = new ConfirmChannel(this.connection);
36-
return c.open()
37-
.then(openOk => {
38-
return c.rpc(defs.ConfirmSelect, {nowait: false},
39-
defs.ConfirmSelectOk)
40-
})
41-
.then(() => { return c; });
35+
async createConfirmChannel() {
36+
const channel = new ConfirmChannel(this.connection);
37+
await channel.open();
38+
await channel.rpc(defs.ConfirmSelect, {nowait: false}, defs.ConfirmSelectOk);
39+
return channel;
4240
}
4341
}
4442

@@ -54,13 +52,12 @@ class Channel extends BaseChannel {
5452
// An RPC that returns a 'proper' promise, which resolves to just the
5553
// response's fields; this is intended to be suitable for implementing
5654
// API procedures.
57-
rpc(method, fields, expect) {
58-
return Promise.fromCallback(cb => {
55+
async rpc(method, fields, expect) {
56+
const f = await Promise.fromCallback(cb => {
5957
return this._rpc(method, fields, expect, cb);
6058
})
61-
.then(f => {
62-
return f.fields;
63-
});
59+
60+
return f.fields;
6461
}
6562

6663
// Do the remarkably simple channel open handshake
@@ -161,53 +158,50 @@ class Channel extends BaseChannel {
161158
return this.publish('', queue, content, options);
162159
}
163160

164-
consume(queue, callback, options) {
161+
async consume(queue, callback, options) {
165162
// NB we want the callback to be run synchronously, so that we've
166163
// registered the consumerTag before any messages can arrive.
167164
const fields = Args.consume(queue, options);
168-
return Promise.fromCallback(cb => {
165+
const ok = await Promise.fromCallback(cb => {
169166
this._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, cb);
170167
})
171-
.then(ok => {
172-
this.registerConsumer(ok.fields.consumerTag, callback);
173-
return ok.fields;
174-
});
168+
169+
this.registerConsumer(ok.fields.consumerTag, callback);
170+
return ok.fields;
175171
}
176172

177-
cancel(consumerTag) {
178-
return Promise.fromCallback(cb => {
173+
async cancel(consumerTag) {
174+
const ok = await Promise.fromCallback(cb => {
179175
this._rpc(defs.BasicCancel, Args.cancel(consumerTag),
180176
defs.BasicCancelOk,
181177
cb);
182178
})
183-
.then(ok => {
184-
this.unregisterConsumer(consumerTag);
185-
return ok.fields;
186-
});
179+
180+
this.unregisterConsumer(consumerTag);
181+
return ok.fields;
187182
}
188183

189-
get(queue, options) {
184+
async get(queue, options) {
190185
const fields = Args.get(queue, options);
191-
return Promise.fromCallback(cb => {
192-
return this.sendOrEnqueue(defs.BasicGet, fields, cb);
186+
const f = await Promise.fromCallback(cb => {
187+
this.sendOrEnqueue(defs.BasicGet, fields, cb);
193188
})
194-
.then(f => {
195-
if (f.id === defs.BasicGetEmpty) {
196-
return false;
197-
}
198-
else if (f.id === defs.BasicGetOk) {
199-
const fields = f.fields;
200-
return new Promise(resolve => {
201-
this.handleMessage = acceptMessage(m => {
202-
m.fields = fields;
203-
resolve(m);
204-
});
189+
190+
if (f.id === defs.BasicGetEmpty) {
191+
return false;
192+
}
193+
else if (f.id === defs.BasicGetOk) {
194+
const fields = f.fields;
195+
return new Promise(resolve => {
196+
this.handleMessage = acceptMessage(m => {
197+
m.fields = fields;
198+
resolve(m);
205199
});
206-
}
207-
else {
208-
throw new Error(`Unexpected response to BasicGet: ${inspect(f)}`);
209-
}
210-
});
200+
});
201+
}
202+
else {
203+
throw new Error(`Unexpected response to BasicGet: ${inspect(f)}`);
204+
}
211205
}
212206

213207
ack(message, allUpTo) {
@@ -246,18 +240,20 @@ class Channel extends BaseChannel {
246240
Args.recover(),
247241
defs.BasicRecoverOk);
248242
}
243+
244+
qos(count, global) {
245+
return this.rpc(defs.BasicQos,
246+
Args.prefetch(count, global),
247+
defs.BasicQosOk);
248+
}
249249
}
250250

251251
// There are more options in AMQP than exposed here; RabbitMQ only
252252
// implements prefetch based on message count, and only for individual
253253
// channels or consumers. RabbitMQ v3.3.0 and after treat prefetch
254254
// (without `global` set) as per-consumer (for consumers following),
255255
// and prefetch with `global` set as per-channel.
256-
Channel.prototype.prefetch = Channel.prototype.qos = function(count, global) {
257-
return this.rpc(defs.BasicQos,
258-
Args.prefetch(count, global),
259-
defs.BasicQosOk);
260-
};
256+
Channel.prototype.prefetch = Channel.prototype.qos
261257

262258
// Confirm channel. This is a channel with confirms 'switched on',
263259
// meaning sent messages will provoke a responding 'ack' or 'nack'
@@ -280,8 +276,7 @@ class ConfirmChannel extends Channel {
280276
const awaiting = [];
281277
const unconfirmed = this.unconfirmed;
282278
unconfirmed.forEach((val, index) => {
283-
if (val === null); // already confirmed
284-
else {
279+
if (val !== null) {
285280
const confirmed = new Promise((resolve, reject) => {
286281
unconfirmed[index] = err => {
287282
if (val) val(err);

0 commit comments

Comments
 (0)
Please sign in to comment.