Skip to content

Commit 17bc292

Browse files
committedJun 2, 2020
Squashed commit of the following:
commit ae945ff1cd1b8c3f36130fec2ceb1e26f221c0c3 Author: Andrew den Hertog <andrew.denhertog@gmail.com> Date: Tue Jun 2 15:16:46 2020 +1000 add hooks
1 parent dab44de commit 17bc292

File tree

6 files changed

+69
-8
lines changed

6 files changed

+69
-8
lines changed
 

‎packages/bus-core/README.md

+13
Original file line numberDiff line numberDiff line change
@@ -87,4 +87,17 @@ export class Application {
8787
await this.bus.stop()
8888
}
8989
}
90+
```
91+
92+
## Hooks
93+
94+
Hooks are callback functions that are invoked each time an action occurs. These are commonly used to add in testing, logging or health probes centrally to the application.
95+
96+
Hooks can be added by calling `.on()` on the bus. For example, to trigger a callback each time a message is attempted to be sent, use:
97+
98+
```typescript
99+
addHook (): void {
100+
const bus = this.container.get<Bus>(BUS_SYMBOLS.Bus)
101+
bus.on('send', message => console.log('Sending', JSON.stringify(message)))
102+
}
90103
```
+9-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Event, Command, MessageAttributes } from '@node-ts/bus-messages'
1+
import { Event, Command, MessageAttributes, Message } from '@node-ts/bus-messages'
22

33
export enum BusState {
44
Stopped = 'stopped',
@@ -7,9 +7,17 @@ export enum BusState {
77
Stopping = 'stopping'
88
}
99

10+
export type HookAction = 'send' | 'publish'
11+
export type HookCallback = (message: Message, messageAttributes?: MessageAttributes) => Promise<void>
12+
1013
export interface Bus {
1114
publish<EventType extends Event> (event: EventType, messageOptions?: MessageAttributes): Promise<void>
1215
send<CommandType extends Command> (command: CommandType, messageOptions?: MessageAttributes): Promise<void>
1316
start (): Promise<void>
1417
stop (): Promise<void>
18+
19+
/**
20+
* Registers a @param callback function that is invoked for every instance of @param action occuring
21+
*/
22+
on (action: HookAction, callback: HookCallback): void
1523
}

‎packages/bus-core/src/service-bus/service-bus.integration.ts

+24-5
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,17 @@ import { ServiceBus } from './service-bus'
22
import { MemoryQueue } from '../transport'
33
import { BusState } from './bus'
44
import { TestEvent } from '../test/test-event'
5+
import { TestEvent2 } from '../test/test-event-2'
6+
import { TestCommand } from '../test/test-command'
57
import { sleep } from '../util'
68
import { Container, inject } from 'inversify'
79
import { TestContainer } from '../test/test-container'
810
import { BUS_SYMBOLS } from '../bus-symbols'
911
import { Logger } from '@node-ts/logger-core'
1012
import { Mock, IMock, Times } from 'typemoq'
11-
import { HandlerRegistry, HandlesMessage } from '../handler'
13+
import { HandlesMessage } from '../handler'
1214
import { ApplicationBootstrap } from '../application-bootstrap'
15+
import { MessageAttributes } from '@node-ts/bus-messages'
1316

1417
const event = new TestEvent()
1518
type Callback = () => void
@@ -40,9 +43,6 @@ describe('ServiceBus', () => {
4043
container = new TestContainer().silenceLogs()
4144
queue = new MemoryQueue(Mock.ofType<Logger>().object)
4245

43-
const transport = container.get<MemoryQueue>(BUS_SYMBOLS.Transport)
44-
const registry = container.get<HandlerRegistry>(BUS_SYMBOLS.HandlerRegistry)
45-
4646
bootstrapper = container.get<ApplicationBootstrap>(BUS_SYMBOLS.ApplicationBootstrap)
4747
bootstrapper.registerHandler(TestEventHandler)
4848

@@ -56,6 +56,26 @@ describe('ServiceBus', () => {
5656
await bootstrapper.dispose()
5757
})
5858

59+
describe('when registering a send hook', () => {
60+
it('should trigger the hook when send() is called', async () => {
61+
const sendCallback = jest.fn()
62+
sut.on('send', sendCallback)
63+
const command = new TestCommand()
64+
await sut.send(command)
65+
expect(sendCallback).toHaveBeenCalledWith(command, expect.any(MessageAttributes))
66+
})
67+
})
68+
69+
describe('when registering a publish hook', () => {
70+
it('should trigger the hook when publish() is called', async () => {
71+
const publishCallback = jest.fn()
72+
sut.on('publish', publishCallback)
73+
const evt = new TestEvent2()
74+
await sut.publish(evt)
75+
expect(publishCallback).toHaveBeenCalledWith(evt, expect.any(MessageAttributes))
76+
})
77+
})
78+
5979
describe('when starting the service bus', () => {
6080
it('should complete into a started state', () => {
6181
expect(sut.state).toEqual(BusState.Started)
@@ -68,7 +88,6 @@ describe('ServiceBus', () => {
6888
})
6989
})
7090

71-
7291
describe('when a message is successfully handled from the queue', () => {
7392
it('should delete the message from the queue', async () => {
7493
callback.reset()

‎packages/bus-core/src/service-bus/service-bus.ts

+15-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import { injectable, inject } from 'inversify'
22
import autobind from 'autobind-decorator'
3-
import { Bus, BusState } from './bus'
3+
import { Bus, BusState, HookAction, HookCallback } from './bus'
44
import { BUS_SYMBOLS, BUS_INTERNAL_SYMBOLS } from '../bus-symbols'
55
import { Transport } from '../transport'
66
import { Event, Command, Message, MessageAttributes } from '@node-ts/bus-messages'
77
import { Logger, LOGGER_SYMBOLS } from '@node-ts/logger-core'
8-
import { sleep } from '../util'
8+
import { sleep, assertUnreachable } from '../util'
99
import { HandlerRegistry, HandlerRegistration } from '../handler'
1010
import * as serializeError from 'serialize-error'
1111
import { SessionScopeBinder } from '../bus-module'
@@ -20,6 +20,11 @@ export class ServiceBus implements Bus {
2020
private internalState: BusState = BusState.Stopped
2121
private runningWorkerCount = 0
2222

23+
private messageHooks: { [key: string]: HookCallback[] } = {
24+
send: [],
25+
publish: []
26+
}
27+
2328
constructor (
2429
@inject(BUS_SYMBOLS.Transport) private readonly transport: Transport<{}>,
2530
@inject(LOGGER_SYMBOLS.Logger) private readonly logger: Logger,
@@ -34,6 +39,8 @@ export class ServiceBus implements Bus {
3439
): Promise<void> {
3540
this.logger.debug('publish', { event })
3641
const transportOptions = this.prepareTransportOptions(messageOptions)
42+
43+
this.messageHooks.publish.map(callback => callback(event, messageOptions))
3744
return this.transport.publish(event, transportOptions)
3845
}
3946

@@ -43,6 +50,8 @@ export class ServiceBus implements Bus {
4350
): Promise<void> {
4451
this.logger.debug('send', { command })
4552
const transportOptions = this.prepareTransportOptions(messageOptions)
53+
54+
this.messageHooks.send.map(callback => callback(command, messageOptions))
4655
return this.transport.send(command, transportOptions)
4756
}
4857

@@ -72,6 +81,10 @@ export class ServiceBus implements Bus {
7281
return this.internalState
7382
}
7483

84+
on (action: HookAction, callback: HookCallback): void {
85+
this.messageHooks[action].push(callback)
86+
}
87+
7588
private async applicationLoop (): Promise<void> {
7689
this.runningWorkerCount++
7790
while (this.internalState === BusState.Started) {

‎packages/bus-core/src/test/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ export * from './test-container'
55
export * from './test-command-handler'
66
export * from './test-command-2'
77
export * from './test-system-message'
8+
export * from './test-event-2'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import { Event } from '@node-ts/bus-messages'
2+
3+
export class TestEvent2 extends Event {
4+
static NAME = '@node-ts/bus-core/test-event-2'
5+
$name = TestEvent2.NAME
6+
$version = 1
7+
}

0 commit comments

Comments
 (0)
Please sign in to comment.