Skip to content

Commit 87a4315

Browse files
authoredNov 11, 2021
port middleware (#160)
1 parent cb11628 commit 87a4315

File tree

7 files changed

+397
-276
lines changed

7 files changed

+397
-276
lines changed
 
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
import { Message } from '@node-ts/bus-messages'
2+
import { DefaultHandlerRegistry, Handler } from '../handler'
3+
import { HandlerDefinition, isClassHandler } from '../handler/handler'
4+
import { JsonSerializer, Serializer } from '../serialization'
5+
import { MemoryQueue, Transport, TransportMessage } from '../transport'
6+
import { ClassConstructor, CoreDependencies, Middleware, MiddlewareDispatcher } from '../util'
7+
import { BusInstance } from './bus-instance'
8+
import { Persistence, Workflow, WorkflowState } from '../workflow'
9+
import { WorkflowRegistry } from '../workflow/registry/workflow-registry'
10+
import { BusAlreadyInitialized } from './error'
11+
import { ContainerAdapter } from '../container'
12+
import { defaultLoggerFactory, LoggerFactory } from '../logger'
13+
import { ContainerNotRegistered } from '../error'
14+
import { MessageSerializer } from '../serialization/message-serializer'
15+
import { InMemoryPersistence } from '../workflow/persistence'
16+
17+
export interface BusInitializeOptions {
18+
/**
19+
* If true, will initialize the bus in send only mode.
20+
* This will provide a bus instance that is capable of sending/publishing
21+
* messages only and won't handle incoming messages or workflows
22+
* @default false
23+
*/
24+
sendOnly: boolean
25+
}
26+
27+
const defaultBusInitializeOptions: BusInitializeOptions = {
28+
sendOnly: false
29+
}
30+
31+
export class BusConfiguration {
32+
33+
private configuredTransport: Transport | undefined
34+
private concurrency = 1
35+
private busInstance: BusInstance | undefined
36+
private container: ContainerAdapter | undefined
37+
private workflowRegistry = new WorkflowRegistry()
38+
private handlerRegistry = new DefaultHandlerRegistry()
39+
private loggerFactory: LoggerFactory = defaultLoggerFactory
40+
private serializer = new JsonSerializer()
41+
private persistence: Persistence = new InMemoryPersistence()
42+
private messageReadMiddlewares = new MiddlewareDispatcher<TransportMessage<unknown>>()
43+
44+
/**
45+
* Initializes the bus with the provided configuration
46+
* @param options Changes the default startup mode of the bus
47+
*/
48+
async initialize (options = defaultBusInitializeOptions): Promise<BusInstance> {
49+
const { sendOnly } = options
50+
const logger = this.loggerFactory('@node-ts/bus-core:bus')
51+
logger.debug('Initializing bus', { sendOnly })
52+
53+
if (!!this.busInstance) {
54+
throw new BusAlreadyInitialized()
55+
}
56+
57+
const coreDependencies: CoreDependencies = {
58+
container: this.container,
59+
handlerRegistry: this.handlerRegistry,
60+
loggerFactory: this.loggerFactory,
61+
serializer: this.serializer,
62+
messageSerializer: new MessageSerializer(this.serializer, this.handlerRegistry)
63+
}
64+
65+
if (!sendOnly) {
66+
this.persistence?.prepare(coreDependencies)
67+
this.workflowRegistry.prepare(coreDependencies, this.persistence)
68+
await this.workflowRegistry.initialize(this.handlerRegistry, this.container)
69+
70+
const classHandlers = this.handlerRegistry.getClassHandlers()
71+
if (!this.container && classHandlers.length) {
72+
throw new ContainerNotRegistered(classHandlers[0].constructor.name)
73+
}
74+
}
75+
76+
const transport: Transport = this.configuredTransport || new MemoryQueue()
77+
transport.prepare(coreDependencies)
78+
if (transport.connect) {
79+
await transport.connect()
80+
}
81+
if (!sendOnly && transport.initialize) {
82+
await transport.initialize(this.handlerRegistry)
83+
}
84+
this.busInstance = new BusInstance(
85+
transport,
86+
this.concurrency,
87+
this.workflowRegistry,
88+
coreDependencies,
89+
this.messageReadMiddlewares
90+
)
91+
92+
logger.debug('Bus initialized', { sendOnly, registeredMessages: this.handlerRegistry.getMessageNames() })
93+
94+
return this.busInstance
95+
}
96+
97+
98+
/**
99+
* Register a handler for a specific message type. When Bus is initialized it will configure
100+
* the transport to subscribe to this type of message and upon receipt will forward the message
101+
* through to the provided message handler
102+
* @param messageType Which message will be subscribed to and routed to the handler
103+
* @param messageHandler A callback that will be invoked when the message is received
104+
* @param customResolver Subscribe to a topic that's created and maintained outside of the application
105+
*/
106+
withHandler (classHandler: ClassConstructor<Handler<Message>>): this
107+
withHandler <MessageType extends (Message | object)>(
108+
functionHandler: {
109+
messageType: ClassConstructor<MessageType>,
110+
messageHandler: HandlerDefinition<MessageType>
111+
}
112+
): this
113+
withHandler <MessageType extends (Message | object)>(
114+
handler: ClassConstructor<Handler<Message>> | {
115+
messageType: ClassConstructor<MessageType>,
116+
messageHandler: HandlerDefinition<MessageType>
117+
}): this
118+
{
119+
if (!!this.busInstance) {
120+
throw new BusAlreadyInitialized()
121+
}
122+
123+
if ('messageHandler' in handler) {
124+
this.handlerRegistry.register(
125+
handler.messageType,
126+
handler.messageHandler
127+
)
128+
} else if (isClassHandler(handler)) {
129+
const handlerInstance = new handler()
130+
this.handlerRegistry.register(
131+
handlerInstance.messageType,
132+
handler
133+
)
134+
}
135+
136+
137+
return this
138+
}
139+
140+
/**
141+
* Registers a custom handler that receives messages from external systems, or messages that don't implement the
142+
* Message interface from @node-ts/bus-messages
143+
* @param messageHandler A handler that receives the custom message
144+
* @param customResolver A discriminator that determines if an incoming message should be mapped to this handler.
145+
*/
146+
withCustomHandler<MessageType extends (Message | object)> (
147+
messageHandler: HandlerDefinition<MessageType>,
148+
customResolver: {
149+
resolveWith: ((message: MessageType) => boolean),
150+
topicIdentifier?: string
151+
}
152+
): this
153+
{
154+
if (!!this.busInstance) {
155+
throw new BusAlreadyInitialized()
156+
}
157+
158+
this.handlerRegistry.registerCustom(
159+
messageHandler,
160+
customResolver
161+
)
162+
return this
163+
}
164+
165+
/**
166+
* Register a workflow definition so that all of the messages it depends on will be subscribed to
167+
* and forwarded to the handlers inside the workflow
168+
*/
169+
withWorkflow<TWorkflowState extends WorkflowState> (workflow: ClassConstructor<Workflow<TWorkflowState>>): this {
170+
if (!!this.busInstance) {
171+
throw new BusAlreadyInitialized()
172+
}
173+
174+
this.workflowRegistry.register(
175+
workflow
176+
)
177+
return this
178+
}
179+
180+
/**
181+
* Configures Bus to use a different transport than the default MemoryQueue
182+
*/
183+
withTransport (transportConfiguration: Transport): this {
184+
if (!!this.busInstance) {
185+
throw new BusAlreadyInitialized()
186+
}
187+
188+
this.configuredTransport = transportConfiguration
189+
return this
190+
}
191+
192+
/**
193+
* Configures Bus to use a different logging provider than the default console logger
194+
*/
195+
withLogger (loggerFactory: LoggerFactory): this {
196+
if (!!this.busInstance) {
197+
throw new BusAlreadyInitialized()
198+
}
199+
200+
this.loggerFactory = loggerFactory
201+
return this
202+
}
203+
204+
/**
205+
* Configures Bus to use a different serialization provider. The provider is responsible for
206+
* transforming messages to/from a serialized representation, as well as ensuring all object
207+
* properties are a strong type
208+
*/
209+
withSerializer (serializer: Serializer): this {
210+
if (!!this.busInstance) {
211+
throw new BusAlreadyInitialized()
212+
}
213+
214+
this.serializer = serializer
215+
return this
216+
}
217+
218+
/**
219+
* Configures Bus to use a different persistence provider than the default InMemoryPersistence provider.
220+
* This is used to persist workflow data and is unused if not using workflows.
221+
*/
222+
withPersistence (persistence: Persistence): this {
223+
if (!!this.busInstance) {
224+
throw new BusAlreadyInitialized()
225+
}
226+
227+
this.persistence = persistence
228+
return this
229+
}
230+
231+
/**
232+
* Sets the message handling concurrency beyond the default value of 1, which will increase the number of messages
233+
* handled in parallel.
234+
*/
235+
withConcurrency (concurrency: number): this {
236+
if (concurrency < 1) {
237+
throw new Error('Invalid concurrency setting. Must be set to 1 or greater')
238+
}
239+
240+
this.concurrency = concurrency
241+
return this
242+
}
243+
244+
/**
245+
* Use a local dependency injection/IoC container to resolve handlers
246+
* and workflows.
247+
* @param container An adapter to an existing DI container to fetch class instances from
248+
*/
249+
withContainer (container: { get <T>(type: ClassConstructor<T>): T }): this {
250+
this.container = container
251+
return this
252+
}
253+
254+
/**
255+
* Register optional middlewares that will run for each message that is polled from the transport
256+
* Note these middlewares only run when polling successfully pulls a message off the Transports queue
257+
* After all the user defined middlewares have registered.
258+
*/
259+
withMessageReadMiddleware<TransportMessageType = unknown> (
260+
messageReadMiddleware: Middleware<TransportMessage<TransportMessageType>>
261+
): this {
262+
this.messageReadMiddlewares.use(messageReadMiddleware)
263+
return this
264+
}
265+
}

‎packages/bus-core/src/service-bus/bus-instance.integration.ts

+43-20
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { InMemoryMessage, MemoryQueue, TransportMessage } from '../transport'
22
import { Bus } from './bus'
33
import { BusState } from './bus-state'
44
import { TestEvent } from '../test/test-event'
5-
import { sleep } from '../util'
5+
import { Middleware, sleep } from '../util'
66
import { Mock, IMock, Times, It } from 'typemoq'
77
import { handlerFor, SystemMessageMissingResolver } from '../handler'
88
import { TestCommand } from '../test/test-command'
@@ -26,14 +26,17 @@ describe('BusInstance', () => {
2626
let queue: MemoryQueue
2727
let callback: IMock<Callback>
2828
const handler = handlerFor(TestEvent, async (_: TestEvent) => callback.object())
29+
let messageReadMiddleware: IMock<Middleware<TransportMessage<unknown>>>
2930

3031
beforeAll(async () => {
3132
queue = new MemoryQueue()
3233
callback = Mock.ofType<Callback>()
34+
messageReadMiddleware = Mock.ofType<Middleware<TransportMessage<unknown>>>()
3335

3436
bus = await Bus.configure()
3537
.withTransport(queue)
3638
.withHandler(handler)
39+
.withMessageReadMiddleware(messageReadMiddleware.object)
3740
.initialize()
3841
})
3942

@@ -70,21 +73,37 @@ describe('BusInstance', () => {
7073
})
7174

7275
describe('when a message is successfully handled from the queue', () => {
73-
beforeEach(async () => bus.start())
74-
afterEach(async () => bus.stop())
76+
beforeAll(async () => {
77+
messageReadMiddleware.reset()
7578

76-
it('should delete the message from the queue', async () => {
77-
callback.reset()
78-
callback
79-
.setup(c => c())
80-
.callback(() => undefined)
79+
messageReadMiddleware
80+
.setup(x => x(It.isAny(), It.isAny()))
81+
.returns((_, next) => next())
8182
.verifiable(Times.once())
82-
await bus.publish(event)
83-
await sleep(10)
8483

84+
await bus.start()
85+
86+
await new Promise(async resolve => {
87+
callback.reset()
88+
callback
89+
.setup(c => c())
90+
.callback(resolve)
91+
.verifiable(Times.once())
92+
93+
await bus.publish(event)
94+
})
95+
})
96+
97+
afterAll(async () => bus.stop())
98+
99+
it('should delete the message from the queue', async () => {
85100
expect(queue.depth).toEqual(0)
86101
callback.verifyAll()
87102
})
103+
104+
it('should invoke the message read middlewares', async () => {
105+
messageReadMiddleware.verifyAll()
106+
})
88107
})
89108

90109
describe('when a handled message throws an Error', () => {
@@ -94,17 +113,21 @@ describe('BusInstance', () => {
94113
it('should return the message for retry', async () => {
95114
callback.reset()
96115
let callCount = 0
97-
callback
98-
.setup(c => c())
99-
.callback(() => {
100-
if (callCount++ === 0) {
101-
throw new Error()
102-
}
103-
})
104-
.verifiable(Times.exactly(2))
105116

106-
await bus.publish(event)
107-
await sleep(2000)
117+
await new Promise<void>(async resolve => {
118+
callback
119+
.setup(c => c())
120+
.callback(() => {
121+
if (callCount++ === 0) {
122+
throw new Error()
123+
} else {
124+
resolve()
125+
}
126+
})
127+
.verifiable(Times.exactly(2))
128+
129+
await bus.publish(event)
130+
})
108131

109132
callback.verifyAll()
110133
})

‎packages/bus-core/src/service-bus/bus-instance.ts

+19-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { Transport, TransportMessage } from '../transport'
22
import { Event, Command, Message, MessageAttributes } from '@node-ts/bus-messages'
3-
import { sleep, ClassConstructor, TypedEmitter, CoreDependencies} from '../util'
3+
import { sleep, ClassConstructor, TypedEmitter, CoreDependencies, MiddlewareDispatcher, Middleware, Next} from '../util'
44
import { Handler, FunctionHandler, HandlerDefinition, isClassHandler } from '../handler'
55
import { serializeError } from 'serialize-error'
66
import { BusState } from './bus-state'
@@ -42,10 +42,11 @@ export class BusInstance {
4242
private readonly transport: Transport<{}>,
4343
private readonly concurrency: number,
4444
private readonly workflowRegistry: WorkflowRegistry,
45-
private readonly coreDependencies: CoreDependencies
45+
private readonly coreDependencies: CoreDependencies,
46+
private readonly messageReadMiddleware: MiddlewareDispatcher<TransportMessage<unknown>>
4647
) {
4748
this.logger = coreDependencies.loggerFactory('@node-ts/bus-core:service-bus')
48-
49+
this.messageReadMiddleware.useFinal(this.handleNextMessagePolled)
4950
}
5051

5152
/**
@@ -199,8 +200,7 @@ export class BusInstance {
199200
try {
200201
messageHandlingContext.set(message)
201202

202-
await this.dispatchMessageToHandlers(message.domainMessage, message.attributes)
203-
await this.transport.deleteMessage(message)
203+
await this.messageReadMiddleware.dispatch(message)
204204

205205
this.afterDispatch.emit({
206206
message: message.domainMessage,
@@ -300,4 +300,18 @@ export class BusInstance {
300300
)
301301
}
302302
}
303+
304+
/**
305+
* The final middleware that runs, after all the useBeforeHandleNextMessage middlewares have completed
306+
* It dispatches a message that has been polled from the queue
307+
* and deletes the message from the transport
308+
*/
309+
private handleNextMessagePolled: Middleware<TransportMessage<{}>> = async (
310+
message: TransportMessage<{}>,
311+
next: Next
312+
): Promise<void> => {
313+
await this.dispatchMessageToHandlers(message.domainMessage, message.attributes)
314+
await this.transport.deleteMessage(message)
315+
return next()
316+
}
303317
}

‎packages/bus-core/src/service-bus/bus.ts

+1-251
Original file line numberDiff line numberDiff line change
@@ -1,254 +1,4 @@
1-
import { Message } from '@node-ts/bus-messages'
2-
import { DefaultHandlerRegistry, Handler } from '../handler'
3-
import { HandlerDefinition, isClassHandler } from '../handler/handler'
4-
import { JsonSerializer, Serializer } from '../serialization'
5-
import { MemoryQueue, Transport } from '../transport'
6-
import { ClassConstructor, CoreDependencies } from '../util'
7-
import { BusInstance } from './bus-instance'
8-
import { Persistence, Workflow, WorkflowState } from '../workflow'
9-
import { WorkflowRegistry } from '../workflow/registry/workflow-registry'
10-
import { BusAlreadyInitialized } from './error'
11-
import { ContainerAdapter } from '../container'
12-
import { defaultLoggerFactory, LoggerFactory } from '../logger'
13-
import { ContainerNotRegistered } from '../error'
14-
import { MessageSerializer } from '../serialization/message-serializer'
15-
import { InMemoryPersistence } from '../workflow/persistence'
16-
17-
export interface BusInitializeOptions {
18-
/**
19-
* If true, will initialize the bus in send only mode.
20-
* This will provide a bus instance that is capable of sending/publishing
21-
* messages only and won't handle incoming messages or workflows
22-
* @default false
23-
*/
24-
sendOnly: boolean
25-
}
26-
27-
const defaultBusInitializeOptions: BusInitializeOptions = {
28-
sendOnly: false
29-
}
30-
31-
export class BusConfiguration {
32-
33-
private configuredTransport: Transport | undefined
34-
private concurrency = 1
35-
private busInstance: BusInstance | undefined
36-
private container: ContainerAdapter | undefined
37-
private workflowRegistry = new WorkflowRegistry()
38-
private handlerRegistry = new DefaultHandlerRegistry()
39-
private loggerFactory: LoggerFactory = defaultLoggerFactory
40-
private serializer = new JsonSerializer()
41-
private persistence: Persistence = new InMemoryPersistence()
42-
43-
/**
44-
* Initializes the bus with the provided configuration
45-
* @param options Changes the default startup mode of the bus
46-
*/
47-
async initialize (options = defaultBusInitializeOptions): Promise<BusInstance> {
48-
const { sendOnly } = options
49-
const logger = this.loggerFactory('@node-ts/bus-core:bus')
50-
logger.debug('Initializing bus', { sendOnly })
51-
52-
if (!!this.busInstance) {
53-
throw new BusAlreadyInitialized()
54-
}
55-
56-
const coreDependencies: CoreDependencies = {
57-
container: this.container,
58-
handlerRegistry: this.handlerRegistry,
59-
loggerFactory: this.loggerFactory,
60-
serializer: this.serializer,
61-
messageSerializer: new MessageSerializer(this.serializer, this.handlerRegistry)
62-
}
63-
64-
if (!sendOnly) {
65-
this.persistence?.prepare(coreDependencies)
66-
this.workflowRegistry.prepare(coreDependencies, this.persistence)
67-
await this.workflowRegistry.initialize(this.handlerRegistry, this.container)
68-
69-
const classHandlers = this.handlerRegistry.getClassHandlers()
70-
if (!this.container && classHandlers.length) {
71-
throw new ContainerNotRegistered(classHandlers[0].constructor.name)
72-
}
73-
}
74-
75-
const transport: Transport = this.configuredTransport || new MemoryQueue()
76-
transport.prepare(coreDependencies)
77-
if (transport.connect) {
78-
await transport.connect()
79-
}
80-
if (!sendOnly && transport.initialize) {
81-
await transport.initialize(this.handlerRegistry)
82-
}
83-
this.busInstance = new BusInstance(
84-
transport,
85-
this.concurrency,
86-
this.workflowRegistry,
87-
coreDependencies
88-
)
89-
90-
logger.debug('Bus initialized', { sendOnly, registeredMessages: this.handlerRegistry.getMessageNames() })
91-
92-
return this.busInstance
93-
}
94-
95-
96-
/**
97-
* Register a handler for a specific message type. When Bus is initialized it will configure
98-
* the transport to subscribe to this type of message and upon receipt will forward the message
99-
* through to the provided message handler
100-
* @param messageType Which message will be subscribed to and routed to the handler
101-
* @param messageHandler A callback that will be invoked when the message is received
102-
* @param customResolver Subscribe to a topic that's created and maintained outside of the application
103-
*/
104-
withHandler (classHandler: ClassConstructor<Handler<Message>>): this
105-
withHandler <MessageType extends (Message | object)>(
106-
functionHandler: {
107-
messageType: ClassConstructor<MessageType>,
108-
messageHandler: HandlerDefinition<MessageType>
109-
}
110-
): this
111-
withHandler <MessageType extends (Message | object)>(
112-
handler: ClassConstructor<Handler<Message>> | {
113-
messageType: ClassConstructor<MessageType>,
114-
messageHandler: HandlerDefinition<MessageType>
115-
}): this
116-
{
117-
if (!!this.busInstance) {
118-
throw new BusAlreadyInitialized()
119-
}
120-
121-
if ('messageHandler' in handler) {
122-
this.handlerRegistry.register(
123-
handler.messageType,
124-
handler.messageHandler
125-
)
126-
} else if (isClassHandler(handler)) {
127-
const handlerInstance = new handler()
128-
this.handlerRegistry.register(
129-
handlerInstance.messageType,
130-
handler
131-
)
132-
}
133-
134-
135-
return this
136-
}
137-
138-
/**
139-
* Registers a custom handler that receives messages from external systems, or messages that don't implement the
140-
* Message interface from @node-ts/bus-messages
141-
* @param messageHandler A handler that receives the custom message
142-
* @param customResolver A discriminator that determines if an incoming message should be mapped to this handler.
143-
*/
144-
withCustomHandler<MessageType extends (Message | object)> (
145-
messageHandler: HandlerDefinition<MessageType>,
146-
customResolver: {
147-
resolveWith: ((message: MessageType) => boolean),
148-
topicIdentifier?: string
149-
}
150-
): this
151-
{
152-
if (!!this.busInstance) {
153-
throw new BusAlreadyInitialized()
154-
}
155-
156-
this.handlerRegistry.registerCustom(
157-
messageHandler,
158-
customResolver
159-
)
160-
return this
161-
}
162-
163-
/**
164-
* Register a workflow definition so that all of the messages it depends on will be subscribed to
165-
* and forwarded to the handlers inside the workflow
166-
*/
167-
withWorkflow<TWorkflowState extends WorkflowState> (workflow: ClassConstructor<Workflow<TWorkflowState>>): this {
168-
if (!!this.busInstance) {
169-
throw new BusAlreadyInitialized()
170-
}
171-
172-
this.workflowRegistry.register(
173-
workflow
174-
)
175-
return this
176-
}
177-
178-
/**
179-
* Configures Bus to use a different transport than the default MemoryQueue
180-
*/
181-
withTransport (transportConfiguration: Transport): this {
182-
if (!!this.busInstance) {
183-
throw new BusAlreadyInitialized()
184-
}
185-
186-
this.configuredTransport = transportConfiguration
187-
return this
188-
}
189-
190-
/**
191-
* Configures Bus to use a different logging provider than the default console logger
192-
*/
193-
withLogger (loggerFactory: LoggerFactory): this {
194-
if (!!this.busInstance) {
195-
throw new BusAlreadyInitialized()
196-
}
197-
198-
this.loggerFactory = loggerFactory
199-
return this
200-
}
201-
202-
/**
203-
* Configures Bus to use a different serialization provider. The provider is responsible for
204-
* transforming messages to/from a serialized representation, as well as ensuring all object
205-
* properties are a strong type
206-
*/
207-
withSerializer (serializer: Serializer): this {
208-
if (!!this.busInstance) {
209-
throw new BusAlreadyInitialized()
210-
}
211-
212-
this.serializer = serializer
213-
return this
214-
}
215-
216-
/**
217-
* Configures Bus to use a different persistence provider than the default InMemoryPersistence provider.
218-
* This is used to persist workflow data and is unused if not using workflows.
219-
*/
220-
withPersistence (persistence: Persistence): this {
221-
if (!!this.busInstance) {
222-
throw new BusAlreadyInitialized()
223-
}
224-
225-
this.persistence = persistence
226-
return this
227-
}
228-
229-
/**
230-
* Sets the message handling concurrency beyond the default value of 1, which will increase the number of messages
231-
* handled in parallel.
232-
*/
233-
withConcurrency (concurrency: number): this {
234-
if (concurrency < 1) {
235-
throw new Error('Invalid concurrency setting. Must be set to 1 or greater')
236-
}
237-
238-
this.concurrency = concurrency
239-
return this
240-
}
241-
242-
/**
243-
* Use a local dependency injection/IoC container to resolve handlers
244-
* and workflows.
245-
* @param container An adapter to an existing DI container to fetch class instances from
246-
*/
247-
withContainer (container: { get <T>(type: ClassConstructor<T>): T }): this {
248-
this.container = container
249-
return this
250-
}
251-
}
1+
import { BusConfiguration } from './bus-configuration'
2522

2533
export class Bus {
2544
private constructor () {
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
export * from './bus'
22
export * from './bus-instance'
33
export * from './bus-state'
4+
export * from './bus-configuration'

‎packages/bus-core/src/util/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ export * from './class-constructor'
33
export * from './assert-unreachable'
44
export * from './typed-emitter'
55
export * from './core-dependencies'
6+
export * from './middleware'
+67
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/**
2+
* This middleware pattern has been grabbed from here:
3+
* https://evertpot.com/generic-middleware/
4+
*/
5+
6+
/**
7+
* 'next' function, passed to a middleware
8+
*/
9+
export type Next = () => void | Promise<void>
10+
11+
/**
12+
* A middleware
13+
*/
14+
export type Middleware<T> =
15+
(context: T, next: Next) => Promise<void> | void
16+
17+
/**
18+
* A middleware container and invoker
19+
*/
20+
export class MiddlewareDispatcher<T> {
21+
22+
middlewares: Middleware<T>[] = []
23+
24+
constructor (
25+
readonly finalMiddlewares: Middleware<T>[] = []
26+
) {
27+
}
28+
29+
/**
30+
* Add a middleware function.
31+
*/
32+
use(...middlewares: Middleware<T>[]): void {
33+
this.middlewares.push(...middlewares)
34+
}
35+
36+
/**
37+
* Add 'final' middlewares that will be added to the end of the
38+
* regular middlewares. This allows for finer control when exposing
39+
* the @see use functionality to consumers but wanting to ensure that your
40+
* final middleware is last to run
41+
*/
42+
useFinal(...middlewares: Middleware<T>[]): void {
43+
this.finalMiddlewares.push(...middlewares)
44+
}
45+
46+
/**
47+
* Execute the chain of middlewares, in the order they were added on a
48+
* given Context.
49+
*/
50+
dispatch(context: T): Promise<void> {
51+
return invokeMiddlewares(context, this.middlewares.concat(this.finalMiddlewares))
52+
}
53+
}
54+
55+
56+
async function invokeMiddlewares<T>(context: T, middlewares: Middleware<T>[]): Promise<void> {
57+
58+
if (!middlewares.length) {
59+
return
60+
}
61+
62+
const middleware = middlewares[0]
63+
64+
return middleware(context, async () => {
65+
await invokeMiddlewares(context, middlewares.slice(1))
66+
})
67+
}

0 commit comments

Comments
 (0)
Please sign in to comment.