Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: node-ts/bus
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 895bdd8ae7080e5fdb2dd7832236793ac438fb9a
Choose a base ref
...
head repository: node-ts/bus
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 2fad73410a7338dc0d89039085564e68ab68c58a
Choose a head ref
Loading
Showing with 2,897 additions and 1,376 deletions.
  1. +1 −1 .circleci/config.yml
  2. +1 −1 .nvmrc
  3. +8 −7 package.json
  4. +32 −2 packages/bus-core/README.md
  5. +4 −4 packages/bus-core/package.json
  6. +2 −1 packages/bus-core/src/bus-module.ts
  7. +4 −2 packages/bus-core/src/bus-symbols.ts
  8. +20 −0 packages/bus-core/src/error/fail-message-outside-handling-context.ts
  9. +1 −0 packages/bus-core/src/error/index.ts
  10. +2 −1 packages/bus-core/src/index.ts
  11. +10 −6 packages/bus-core/src/serialization/json-serializer.ts
  12. +14 −15 packages/bus-core/src/serialization/message-serializer.spec.ts
  13. +9 −8 packages/bus-core/src/serialization/message-serializer.ts
  14. +15 −2 packages/bus-core/src/serialization/serializer.ts
  15. +13 −0 packages/bus-core/src/service-bus/bus-configuration.ts
  16. +24 −31 packages/bus-core/src/service-bus/bus-hooks.spec.ts
  17. +13 −6 packages/bus-core/src/service-bus/bus-hooks.ts
  18. +32 −2 packages/bus-core/src/service-bus/bus.ts
  19. +2 −0 packages/bus-core/src/service-bus/index.ts
  20. +13 −0 packages/bus-core/src/service-bus/message-handling-context.ts
  21. +196 −82 packages/bus-core/src/service-bus/service-bus.integration.ts
  22. +51 −26 packages/bus-core/src/service-bus/service-bus.ts
  23. +1 −0 packages/bus-core/src/test/index.ts
  24. +13 −0 packages/bus-core/src/test/test-fail-message.ts
  25. +1 −1 packages/bus-core/src/transport/index.ts
  26. +49 −8 packages/bus-core/src/transport/memory-queue.spec.ts
  27. +69 −16 packages/bus-core/src/transport/memory-queue.ts
  28. +7 −1 packages/bus-core/src/transport/transport.ts
  29. +16 −15 packages/bus-core/yarn.lock
  30. +1 −1 packages/bus-messages/README.md
  31. +2 −2 packages/bus-messages/package.json
  32. +8 −8 packages/bus-messages/yarn.lock
  33. +2 −2 packages/bus-postgres/README.md
  34. +4 −4 packages/bus-postgres/package.json
  35. +1 −1 packages/bus-rabbitmq/README.md
  36. +4 −4 packages/bus-rabbitmq/package.json
  37. +54 −4 packages/bus-rabbitmq/src/rabbitmq-transport.integration.ts
  38. +18 −5 packages/bus-rabbitmq/src/rabbitmq-transport.ts
  39. +2 −0 packages/bus-rabbitmq/test/index.ts
  40. +16 −0 packages/bus-rabbitmq/test/test-fail-message-handler.ts
  41. +13 −0 packages/bus-rabbitmq/test/test-fail-message.ts
  42. +4 −2 packages/bus-sqs/README.md
  43. +4 −4 packages/bus-sqs/package.json
  44. +6 −0 packages/bus-sqs/src/sqs-transport-configuration.ts
  45. +74 −20 packages/bus-sqs/src/sqs-transport.integration.ts
  46. +31 −1 packages/bus-sqs/src/sqs-transport.ts
  47. +2 −0 packages/bus-sqs/test/index.ts
  48. +16 −0 packages/bus-sqs/test/test-fail-message-handler.ts
  49. +13 −0 packages/bus-sqs/test/test-fail-message.ts
  50. +1 −1 packages/bus-workflow/README.md
  51. +3 −3 packages/bus-workflow/package.json
  52. +12 −6 packages/bus-workflow/src/workflow/decorators/handles.ts
  53. +1 −1 packages/bus-workflow/src/workflow/message-workflow-mapping.ts
  54. +1 −1 packages/bus-workflow/src/workflow/persistence/in-memory-persistence.ts
  55. +4 −0 test.env
  56. +1,977 −1,068 yarn.lock
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@ version: 2
defaults: &defaults
working_directory: ~/repo
docker:
- image: circleci/node:8.14.1
- image: circleci/node:12.18.1

jobs:
build:
2 changes: 1 addition & 1 deletion .nvmrc
Original file line number Diff line number Diff line change
@@ -1 +1 @@
8.15.0
12.14.1
15 changes: 8 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
@@ -16,9 +16,9 @@
"docs:deploy": "./deploy-docs.sh",
"lint": "lerna run lint",
"test": "yarn test:unit && yarn test:integration",
"test:unit": "jest \"(src\\/.+\\.|/)spec\\.ts$\"",
"test:unit": "dotenv -e test.env -- jest \"(src\\/.+\\.|/)spec\\.ts$\"",
"test:unit:watch": "yarn run test:unit --watch",
"test:integration": "jest --runInBand \"bus-(core|messages|workflow).*(src\\/.+\\.|/)integration\\.ts$\""
"test:integration": "dotenv -e test.env -- jest --runInBand \"bus-(core|messages|workflow).*(src\\/.+\\.|/)integration\\.ts$\""
},
"keywords": [
"typescript",
@@ -44,14 +44,15 @@
],
"devDependencies": {
"@node-ts/code-standards": "^0.0.10",
"@types/jest": "^24.0.12",
"@types/jest": "^26.0.4",
"dotenv-cli": "^3.2.0",
"inversify": "^5.0.1",
"jest": "^24.7.1",
"jest": "^26.1.0",
"lerna": "^3.18.4",
"reflect-metadata": "^0.1.13",
"ts-jest": "^25.1.0",
"tslib": "^1.9.3",
"typescript": "^3.3.3",
"ts-jest": "^26.1.1",
"tslib": "^2.0.0",
"typescript": "^3.9.6",
"vuepress": "^1.1.0",
"vuepress-plugin-sitemap": "^2.1.2"
}
34 changes: 32 additions & 2 deletions packages/bus-core/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# @node-ts/bus-core

[![Greenkeeper badge](https://badges.greenkeeper.io/node-ts/bus.svg)](https://greenkeeper.io/)
[![Known Vulnerabilities](https://snyk.io/test/github/node-ts/bus/badge.svg)](https://snyk.io/test/github/node-ts/bus)
[![CircleCI](https://circleci.com/gh/node-ts/bus/tree/master.svg?style=svg)](https://circleci.com/gh/node-ts/bus/tree/master)[![License: MIT](https://img.shields.io/badge/License-MIT-green.svg)](https://opensource.org/licenses/MIT)

The core messaging framework. This package provides an in-memory queue and persistence by default, but is designed to be used with other @node-ts/bus-* packages that provide compatibility with other transports (SQS, RabbitMQ, Azure Queues) and persistence technologies (PostgreSQL, SQL Server, Oracle).
@@ -29,7 +29,7 @@ export class ApplicationContainer extends Container {

## Register a message handler

Messages are handled by defining and registring a handler class. Each time a message is received by the application, it will be dispatched to each of the registered handlers.
Messages are handled by defining and registering a handler class. Each time a message is received by the application, it will be dispatched to each of the registered handlers.

Define the handler:

@@ -105,3 +105,33 @@ addHook (): void {
bus.off('send', callback)
}
```

## Failing a Message

When an error is thrown whilst handling an error, the message is typically sent back to the queue so that it can be retried.

There are times when we know that a message will never succeed even if it were to be retried. In these situations we may not want to wait for our message to be retried before sending it to the dead letter queue, but instead bypass the retries and send it to the dead letter queue immediately.

This can be done by calling `bus.fail()` from within the scope of a message handling context. This will instruct `@node-ts/bus` to forward the currently handled message to the dead letter queue and remove it from the service queue.

## Message handling concurrency

By default, `@node-ts/bus` will run with a message handling concurrency of 1. This means that only a single message will be read off the queue and processed at a time.

To increase the message handling concurrency, provide your configuration like so:

```typescript
import { Container } from 'inversify'
import { BusModule, BUS_SYMBOLS, BusConfiguration } from '@node-ts/bus-core'

const concurrency = 3 // Handle up to 3 messages in parallel
export class ApplicationContainer extends Container {
constructor () {
this.load(new BusModule())

this
.rebind<BusConfiguration>(BUS_SYMBOLS.BusConfiguration)
.toConstantValue({ concurrency })
}
}
```
8 changes: 4 additions & 4 deletions packages/bus-core/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@node-ts/bus-core",
"version": "0.5.9",
"version": "0.6.4",
"description": "A service bus for message-based, distributed node applications",
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
@@ -13,10 +13,10 @@
"access": "public"
},
"dependencies": {
"@node-ts/bus-messages": "^0.2.3",
"@types/node": "^12.12.37",
"@node-ts/bus-messages": "^0.3.1",
"@types/node": "^12.12.39",
"autobind-decorator": "^2.4.0",
"class-transformer": "^0.2.3",
"class-transformer": "^0.3.1",
"inversify": "^5.0.1",
"reflect-metadata": "^0.1.13",
"serialize-error": "^4.1.0",
3 changes: 2 additions & 1 deletion packages/bus-core/src/bus-module.ts
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@ import { ContainerModule, interfaces } from 'inversify'
import { BUS_SYMBOLS, BUS_INTERNAL_SYMBOLS } from './bus-symbols'
import { MessageAttributes } from '@node-ts/bus-messages'
import { MemoryQueue } from './transport'
import { ServiceBus, BusHooks } from './service-bus'
import { ServiceBus, BusHooks, DefaultBusConfiguration } from './service-bus'
import { JsonSerializer, MessageSerializer } from './serialization'
import { ApplicationBootstrap } from './application-bootstrap'
import { HandlerRegistry } from './handler'
@@ -31,6 +31,7 @@ export class BusModule extends ContainerModule {
bindService(bind, BUS_INTERNAL_SYMBOLS.BusHooks, BusHooks).inSingletonScope()

bind(BUS_SYMBOLS.MessageHandlingContext).toConstantValue(new MessageAttributes())
bind(BUS_SYMBOLS.BusConfiguration).toConstantValue(new DefaultBusConfiguration())
})
}
}
6 changes: 4 additions & 2 deletions packages/bus-core/src/bus-symbols.ts
Original file line number Diff line number Diff line change
@@ -6,10 +6,12 @@ export const BUS_SYMBOLS = {
ApplicationBootstrap: Symbol.for('@node-ts/bus-core/application-bootstrap'),
JsonSerializer: Symbol.for('@node-ts/bus-core/json-serializer'),
MessageSerializer: Symbol.for('@node-ts/bus-core/message-serializer'),
MessageHandlingContext: Symbol.for('@node-ts/bus-core/message-handling-context')
MessageHandlingContext: Symbol.for('@node-ts/bus-core/message-handling-context'),
BusConfiguration: Symbol.for('@node-ts/bus-core/bus-configuration')
}

export const BUS_INTERNAL_SYMBOLS = {
SessionScopeBinder: Symbol.for('@node-ts/bus-core/session-scope-binder'),
BusHooks: Symbol.for('@node-ts/bus-core/bus-hooks')
BusHooks: Symbol.for('@node-ts/bus-core/bus-hooks'),
RawMessage: Symbol.for('@node-ts/bus-core/raw-message')
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { Message } from '@node-ts/bus-messages'

export class FailMessageOutsideHandlingContext extends Error {
/**
* Calling .fail() with a message indicates that the message received from the
* queue can not be processed even with retries and should immediately be sent
* to the dead letter queue.
*
* This error occurs when .fail() has been called outside of a message handling context,
* or more specifically - outside the stack of a Handler() operation
* @param msg The message that was attempted to be failed
*/
constructor (
readonly msg: Message
) {
super(`Attempted to fail message outside of a message handling context`)
// tslint:disable-next-line:no-unsafe-any
Object.setPrototypeOf(this, new.target.prototype)
}
}
1 change: 1 addition & 0 deletions packages/bus-core/src/error/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './fail-message-outside-handling-context'
3 changes: 2 additions & 1 deletion packages/bus-core/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
export { BUS_SYMBOLS } from './bus-symbols'
export * from './bus-module'
export * from './service-bus/bus'
export * from './service-bus'
export { Transport, TransportMessage } from './transport'
export * from './application-bootstrap'
export * from './handler'
export * from './serialization'
export * from './util'
export * from './error'
16 changes: 10 additions & 6 deletions packages/bus-core/src/serialization/json-serializer.ts
Original file line number Diff line number Diff line change
@@ -4,18 +4,22 @@ import { ClassConstructor } from '../util'
import { classToPlain, plainToClass, serialize, deserialize } from 'class-transformer'

/**
* A very unsafe, basic JSON serializer. This will serialize objects to strings, but will
* deserialize strings into plain objects. These will NOT contain methods or special types,
* so the usage of this serializer is limited.
* A JSON-based serializer that uses `class-transformer` to transform to and from
* class instances of an object rather than just their plain types. As a result,
* object types can use all of the serialization decorator hints provided by
* that library.
*/
@injectable()
export class JsonSerializer implements Serializer {
serialize<T extends object> (obj: T): string {
serialize<ObjectType extends object> (obj: ObjectType): string {
return serialize(obj)
}

deserialize<T extends object> (val: string, classConstructor: ClassConstructor<T>): T {
return deserialize<T> (classConstructor, val)
deserialize<ObjectType extends object> (
serialized: string,
classConstructor: ClassConstructor<ObjectType>
): ObjectType {
return deserialize<ObjectType> (classConstructor, serialized)
}

toPlain<T extends object> (obj: T): object {
29 changes: 14 additions & 15 deletions packages/bus-core/src/serialization/message-serializer.spec.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
// tslint:disable:max-classes-per-file no-inferred-empty-object-type
import { Serializer } from './serializer'
import { ClassConstructor } from '../util'
import { MessageSerializer } from './message-serializer';
import { MessageSerializer } from './message-serializer'
import { Mock, IMock, It } from 'typemoq'
import { HandlerRegistry } from '../handler'
import * as faker from 'faker'
import { Message } from '@node-ts/bus-messages'

class DummyMessage {
$name = 'bluh'
@@ -18,12 +19,12 @@ class DummyMessage {

class ToxicSerializer implements Serializer {

serialize (msg: any): string {
return msg.$name as string
serialize<ObjectType extends object> (obj: ObjectType): string {
return (obj as Message).$name
}

deserialize<T extends object> (msg: string, classType: ClassConstructor<T>): T {
return new classType(msg)
deserialize<ObjectType extends object> (serialized: string, classType: ClassConstructor<ObjectType>): ObjectType {
return new classType(serialized)
}
}

@@ -49,27 +50,25 @@ describe('MessageSerializer', () => {
)
})

it ('should use underlying serializer to serialize', () => {
const msg = {
$name: msgName
it('should use underlying serializer to serialize', () => {
const message: Message = {
$name: msgName,
$version: 1
}
const result = sut.serialize(msg)
// As per toxic serializer's behavior
expect(result).toBe(msg.$name)
const result = sut.serialize(message)
expect(result).toBe(message.$name)
})

it ('should use underlying deserializer to deserialize', () => {
it('should use underlying deserializer to deserialize', () => {
const msg = {
$name: msgName,
text: faker.random.words()
}
const raw = JSON.stringify(msg)

const result = sut.deserialize(raw) as DummyMessage
const result = sut.deserialize<DummyMessage>(raw)

handlerRegistry.verifyAll()

// As per toxic serializer's behavior
expect(result.value).toBe(raw)
})
})
17 changes: 9 additions & 8 deletions packages/bus-core/src/serialization/message-serializer.ts
Original file line number Diff line number Diff line change
@@ -23,17 +23,18 @@ export class MessageSerializer {
) {
}

serialize<T extends object> (obj: T): string {
return this.serializer.serialize(obj)
serialize<MessageType extends Message> (message: MessageType): string {
return this.serializer.serialize(message)
}

deserialize (val: string): Message {
const naiveDerializedMessage = JSON.parse(val) as Message
deserialize<MessageType extends Message> (serializedMessage: string): MessageType {
const naiveDerializedMessage = JSON.parse(serializedMessage) as Message
const messageType = this.handlerRegistry.getMessageType(naiveDerializedMessage)

return !!messageType ? this.serializer.deserialize(
val,
messageType
) : naiveDerializedMessage
return (!!messageType
? this.serializer.deserialize(
serializedMessage,
messageType
) : naiveDerializedMessage) as MessageType
}
}
17 changes: 15 additions & 2 deletions packages/bus-core/src/serialization/serializer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
import { ClassConstructor } from '../util'

/**
* A serializer that's use to serialize/deserialize objects as they leave and enter the application boundary.
*/
export interface Serializer {
serialize<T extends object> (obj: T): string
deserialize<T extends object> (val: string, classType: ClassConstructor<T>): T
/**
* Serializes a message into a string representation so it can be written to an underlying queue/topic
* @param obj Message to serialize
*/
serialize<ObjectType extends object> (obj: ObjectType): string

/**
* Deserializes a string-based representation of an object back into a strong class type.
* @param serialized Serialized string of the object to deserialize
* @param classType Type of the class to deserialize the object back into
*/
deserialize<ObjectType extends object> (serialized: string, classType: ClassConstructor<ObjectType>): ObjectType
}
13 changes: 13 additions & 0 deletions packages/bus-core/src/service-bus/bus-configuration.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { injectable } from 'inversify'

export interface BusConfiguration {
/**
* The number of messages to handle in parallel
*/
readonly concurrency: number
}

@injectable()
export class DefaultBusConfiguration implements BusConfiguration {
readonly concurrency: number = 1
}
Loading