Skip to content

Commit bb5b24b

Browse files
authoredNov 17, 2021
Fix: Memory queue fail deletes other messages (#163)
* fix memory-queue fail handling * package all from service-bus
1 parent 0aad404 commit bb5b24b

File tree

3 files changed

+39
-6
lines changed

3 files changed

+39
-6
lines changed
 

‎packages/bus-core/src/index.ts

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
export * from './service-bus/bus'
2-
export * from './service-bus/bus-instance'
31
export {
42
Transport,
53
TransportMessage,
@@ -13,3 +11,4 @@ export * from './workflow'
1311
export * from './error'
1412
export * from './container'
1513
export * from './logger'
14+
export * from './service-bus'

‎packages/bus-core/src/transport/memory-queue.spec.ts

+22-1
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@ import { MessageAttributes } from '@node-ts/bus-messages'
55
import * as faker from 'faker'
66
import { IMock, Mock } from 'typemoq'
77
import { Logger, LoggerFactory } from '../logger'
8-
import { DefaultHandlerRegistry, HandlerRegistry } from '../handler'
8+
import { DefaultHandlerRegistry, handlerFor, HandlerRegistry } from '../handler'
99
import { JsonSerializer, MessageSerializer } from '../serialization'
10+
import EventEmitter from 'events'
11+
import { Bus } from '../service-bus/bus'
1012

1113
const event = new TestEvent()
1214
const command = new TestCommand()
@@ -163,5 +165,24 @@ describe('MemoryQueue', () => {
163165
it('should forward it to the dead letter queue', () => {
164166
expect(sut.deadLetterQueueDepth).toEqual(1)
165167
})
168+
169+
it('should only fail the handled message', async () => {
170+
const emitter = new EventEmitter()
171+
const bus = await Bus.configure()
172+
.withConcurrency(1)
173+
.withHandler(handlerFor(TestEvent, async () => {
174+
await bus.send(new TestCommand())
175+
await bus.fail()
176+
}))
177+
.withHandler(handlerFor(TestCommand, () => { emitter.emit('done') }))
178+
.initialize()
179+
180+
await bus.start()
181+
182+
const completion = new Promise<void>(resolve => emitter.once('done', resolve))
183+
await bus.publish(new TestEvent())
184+
await completion
185+
await bus.dispose()
186+
})
166187
})
167188
})

‎packages/bus-core/src/transport/memory-queue.ts

+16-3
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ export class MemoryQueue implements Transport<InMemoryMessage> {
4040

4141
private queue: TransportMessage<InMemoryMessage>[] = []
4242
private queuePushed: EventEmitter = new EventEmitter()
43-
private deadLetterQueue: TransportMessage<InMemoryMessage>[] = []
43+
private _deadLetterQueue: TransportMessage<InMemoryMessage>[] = []
4444
private messagesWithHandlers: { [key: string]: {} }
4545
private logger: Logger
4646
private coreDependencies: CoreDependencies
@@ -116,6 +116,11 @@ export class MemoryQueue implements Transport<InMemoryMessage> {
116116

117117
async deleteMessage (message: TransportMessage<InMemoryMessage>): Promise<void> {
118118
const messageIndex = this.queue.indexOf(message)
119+
if (messageIndex < 0) {
120+
// actions like .fail() will cause the message to already be deleted
121+
this.logger.debug('Message already deleted', { message, messageIndex })
122+
return
123+
}
119124
this.logger.debug('Deleting message', { queueDepth: this.depth, messageIndex })
120125
this.queue.splice(messageIndex, 1)
121126
this.logger.debug('Message Deleted', { queueDepth: this.depth })
@@ -141,7 +146,15 @@ export class MemoryQueue implements Transport<InMemoryMessage> {
141146
}
142147

143148
get deadLetterQueueDepth (): number {
144-
return this.deadLetterQueue.length
149+
return this._deadLetterQueue.length
150+
}
151+
152+
/**
153+
* Returns all messages sitting in the dead letter queue. This is a copy of the queue
154+
* so mutative actions on this array will have no consequence.
155+
*/
156+
get deadLetterQueue (): TransportMessage<InMemoryMessage>[] {
157+
return [...this._deadLetterQueue]
145158
}
146159

147160
/**
@@ -153,7 +166,7 @@ export class MemoryQueue implements Transport<InMemoryMessage> {
153166

154167

155168
private async sendToDeadLetterQueue (message: TransportMessage<InMemoryMessage>): Promise<void> {
156-
this.deadLetterQueue.push(message)
169+
this._deadLetterQueue.push(message)
157170
await this.deleteMessage(message)
158171
}
159172

0 commit comments

Comments
 (0)
Please sign in to comment.