Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: node-ts/bus
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 895bdd8ae7080e5fdb2dd7832236793ac438fb9a
Choose a base ref
...
head repository: node-ts/bus
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 82997a83eeeff6e27b55d89373c91a202b1e590b
Choose a head ref
  • 2 commits
  • 50 files changed
  • 1 contributor

Commits on Jul 12, 2020

  1. Explicitly fail messages (#75)

    * fail messages for rabbitmq
    
    * fail message for memory queue
    
    * memory queue fail tests
    
    * fail sqs messages
    
    * SQS fail message implementation
    
    * fix routing of sqs fail messages
    
    * remove unused imports
    
    * minor test fixes
    
    * correct naming in serializers
    
    * correct naming in serializers
    
    * upgrade circle runner to node 12
    
    * upgrade typescript and fix broken types
    
    * set snyk badge
    
    * doc updates
    
    * fail message attribute tests
    
    * unfocus tests
    adenhertog authored Jul 12, 2020
    Copy the full SHA
    6c350eb View commit details
  2. Publish

     - @node-ts/bus-core@0.6.0
     - @node-ts/bus-messages@0.3.0
     - @node-ts/bus-postgres@0.5.0
     - @node-ts/bus-rabbitmq@0.6.0
     - @node-ts/bus-sqs@0.5.0
     - @node-ts/bus-workflow@0.6.0
    adenhertog committed Jul 12, 2020

    Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    82997a8 View commit details
Showing with 2,467 additions and 1,120 deletions.
  1. +1 −1 .circleci/config.yml
  2. +1 −1 .nvmrc
  3. +8 −7 package.json
  4. +10 −2 packages/bus-core/README.md
  5. +2 −2 packages/bus-core/package.json
  6. +2 −1 packages/bus-core/src/bus-symbols.ts
  7. +20 −0 packages/bus-core/src/error/fail-message-outside-handling-context.ts
  8. +1 −0 packages/bus-core/src/error/index.ts
  9. +1 −0 packages/bus-core/src/index.ts
  10. +10 −6 packages/bus-core/src/serialization/json-serializer.ts
  11. +14 −15 packages/bus-core/src/serialization/message-serializer.spec.ts
  12. +9 −8 packages/bus-core/src/serialization/message-serializer.ts
  13. +15 −2 packages/bus-core/src/serialization/serializer.ts
  14. +15 −0 packages/bus-core/src/service-bus/bus.ts
  15. +1 −0 packages/bus-core/src/service-bus/index.ts
  16. +13 −0 packages/bus-core/src/service-bus/message-handling-context.ts
  17. +4 −1 packages/bus-core/src/service-bus/service-bus.integration.ts
  18. +33 −22 packages/bus-core/src/service-bus/service-bus.ts
  19. +1 −0 packages/bus-core/src/test/index.ts
  20. +13 −0 packages/bus-core/src/test/test-fail-message.ts
  21. +1 −1 packages/bus-core/src/transport/index.ts
  22. +49 −8 packages/bus-core/src/transport/memory-queue.spec.ts
  23. +58 −15 packages/bus-core/src/transport/memory-queue.ts
  24. +7 −1 packages/bus-core/src/transport/transport.ts
  25. +1 −1 packages/bus-messages/README.md
  26. +1 −1 packages/bus-messages/package.json
  27. +2 −2 packages/bus-postgres/README.md
  28. +4 −4 packages/bus-postgres/package.json
  29. +1 −1 packages/bus-rabbitmq/README.md
  30. +3 −3 packages/bus-rabbitmq/package.json
  31. +54 −4 packages/bus-rabbitmq/src/rabbitmq-transport.integration.ts
  32. +18 −5 packages/bus-rabbitmq/src/rabbitmq-transport.ts
  33. +2 −0 packages/bus-rabbitmq/test/index.ts
  34. +16 −0 packages/bus-rabbitmq/test/test-fail-message-handler.ts
  35. +13 −0 packages/bus-rabbitmq/test/test-fail-message.ts
  36. +4 −2 packages/bus-sqs/README.md
  37. +3 −3 packages/bus-sqs/package.json
  38. +6 −0 packages/bus-sqs/src/sqs-transport-configuration.ts
  39. +74 −20 packages/bus-sqs/src/sqs-transport.integration.ts
  40. +18 −0 packages/bus-sqs/src/sqs-transport.ts
  41. +2 −0 packages/bus-sqs/test/index.ts
  42. +16 −0 packages/bus-sqs/test/test-fail-message-handler.ts
  43. +13 −0 packages/bus-sqs/test/test-fail-message.ts
  44. +1 −1 packages/bus-workflow/README.md
  45. +3 −3 packages/bus-workflow/package.json
  46. +12 −6 packages/bus-workflow/src/workflow/decorators/handles.ts
  47. +1 −1 packages/bus-workflow/src/workflow/message-workflow-mapping.ts
  48. +1 −1 packages/bus-workflow/src/workflow/persistence/in-memory-persistence.ts
  49. +4 −0 test.env
  50. +1,905 −969 yarn.lock
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@ version: 2
defaults: &defaults
working_directory: ~/repo
docker:
- image: circleci/node:8.14.1
- image: circleci/node:12.18.1

jobs:
build:
2 changes: 1 addition & 1 deletion .nvmrc
Original file line number Diff line number Diff line change
@@ -1 +1 @@
8.15.0
12.14.1
15 changes: 8 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
@@ -16,9 +16,9 @@
"docs:deploy": "./deploy-docs.sh",
"lint": "lerna run lint",
"test": "yarn test:unit && yarn test:integration",
"test:unit": "jest \"(src\\/.+\\.|/)spec\\.ts$\"",
"test:unit": "dotenv -e test.env -- jest \"(src\\/.+\\.|/)spec\\.ts$\"",
"test:unit:watch": "yarn run test:unit --watch",
"test:integration": "jest --runInBand \"bus-(core|messages|workflow).*(src\\/.+\\.|/)integration\\.ts$\""
"test:integration": "dotenv -e test.env -- jest --runInBand \"bus-(core|messages|workflow).*(src\\/.+\\.|/)integration\\.ts$\""
},
"keywords": [
"typescript",
@@ -44,14 +44,15 @@
],
"devDependencies": {
"@node-ts/code-standards": "^0.0.10",
"@types/jest": "^24.0.12",
"@types/jest": "^26.0.4",
"dotenv-cli": "^3.2.0",
"inversify": "^5.0.1",
"jest": "^24.7.1",
"jest": "^26.1.0",
"lerna": "^3.18.4",
"reflect-metadata": "^0.1.13",
"ts-jest": "^25.1.0",
"tslib": "^1.9.3",
"typescript": "^3.3.3",
"ts-jest": "^26.1.1",
"tslib": "^2.0.0",
"typescript": "^3.9.6",
"vuepress": "^1.1.0",
"vuepress-plugin-sitemap": "^2.1.2"
}
12 changes: 10 additions & 2 deletions packages/bus-core/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# @node-ts/bus-core

[![Greenkeeper badge](https://badges.greenkeeper.io/node-ts/bus.svg)](https://greenkeeper.io/)
[![Known Vulnerabilities](https://snyk.io/test/github/node-ts/bus/badge.svg)](https://snyk.io/test/github/node-ts/bus)
[![CircleCI](https://circleci.com/gh/node-ts/bus/tree/master.svg?style=svg)](https://circleci.com/gh/node-ts/bus/tree/master)[![License: MIT](https://img.shields.io/badge/License-MIT-green.svg)](https://opensource.org/licenses/MIT)

The core messaging framework. This package provides an in-memory queue and persistence by default, but is designed to be used with other @node-ts/bus-* packages that provide compatibility with other transports (SQS, RabbitMQ, Azure Queues) and persistence technologies (PostgreSQL, SQL Server, Oracle).
@@ -29,7 +29,7 @@ export class ApplicationContainer extends Container {

## Register a message handler

Messages are handled by defining and registring a handler class. Each time a message is received by the application, it will be dispatched to each of the registered handlers.
Messages are handled by defining and registering a handler class. Each time a message is received by the application, it will be dispatched to each of the registered handlers.

Define the handler:

@@ -105,3 +105,11 @@ addHook (): void {
bus.off('send', callback)
}
```

## Failing a Message

When an error is thrown whilst handling an error, the message is typically sent back to the queue so that it can be retried.

There are times when we know that a message will never succeed even if it were to be retried. In these situations we may not want to wait for our message to be retried before sending it to the dead letter queue, but instead bypass the retries and send it to the dead letter queue immediately.

This can be done by calling `bus.fail()` from within the scope of a message handling context. This will instruct `@node-ts/bus` to forward the currently handled message to the dead letter queue and remove it from the service queue.
4 changes: 2 additions & 2 deletions packages/bus-core/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@node-ts/bus-core",
"version": "0.5.9",
"version": "0.6.0",
"description": "A service bus for message-based, distributed node applications",
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
@@ -13,7 +13,7 @@
"access": "public"
},
"dependencies": {
"@node-ts/bus-messages": "^0.2.3",
"@node-ts/bus-messages": "^0.3.0",
"@types/node": "^12.12.37",
"autobind-decorator": "^2.4.0",
"class-transformer": "^0.2.3",
3 changes: 2 additions & 1 deletion packages/bus-core/src/bus-symbols.ts
Original file line number Diff line number Diff line change
@@ -11,5 +11,6 @@ export const BUS_SYMBOLS = {

export const BUS_INTERNAL_SYMBOLS = {
SessionScopeBinder: Symbol.for('@node-ts/bus-core/session-scope-binder'),
BusHooks: Symbol.for('@node-ts/bus-core/bus-hooks')
BusHooks: Symbol.for('@node-ts/bus-core/bus-hooks'),
RawMessage: Symbol.for('@node-ts/bus-core/raw-message')
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { Message } from '@node-ts/bus-messages'

export class FailMessageOutsideHandlingContext extends Error {
/**
* Calling .fail() with a message indicates that the message received from the
* queue can not be processed even with retries and should immediately be sent
* to the dead letter queue.
*
* This error occurs when .fail() has been called outside of a message handling context,
* or more specifically - outside the stack of a Handler() operation
* @param msg The message that was attempted to be failed
*/
constructor (
readonly msg: Message
) {
super(`Attempted to fail message outside of a message handling context`)
// tslint:disable-next-line:no-unsafe-any
Object.setPrototypeOf(this, new.target.prototype)
}
}
1 change: 1 addition & 0 deletions packages/bus-core/src/error/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './fail-message-outside-handling-context'
1 change: 1 addition & 0 deletions packages/bus-core/src/index.ts
Original file line number Diff line number Diff line change
@@ -6,3 +6,4 @@ export * from './application-bootstrap'
export * from './handler'
export * from './serialization'
export * from './util'
export * from './error'
16 changes: 10 additions & 6 deletions packages/bus-core/src/serialization/json-serializer.ts
Original file line number Diff line number Diff line change
@@ -4,18 +4,22 @@ import { ClassConstructor } from '../util'
import { classToPlain, plainToClass, serialize, deserialize } from 'class-transformer'

/**
* A very unsafe, basic JSON serializer. This will serialize objects to strings, but will
* deserialize strings into plain objects. These will NOT contain methods or special types,
* so the usage of this serializer is limited.
* A JSON-based serializer that uses `class-transformer` to transform to and from
* class instances of an object rather than just their plain types. As a result,
* object types can use all of the serialization decorator hints provided by
* that library.
*/
@injectable()
export class JsonSerializer implements Serializer {
serialize<T extends object> (obj: T): string {
serialize<ObjectType extends object> (obj: ObjectType): string {
return serialize(obj)
}

deserialize<T extends object> (val: string, classConstructor: ClassConstructor<T>): T {
return deserialize<T> (classConstructor, val)
deserialize<ObjectType extends object> (
serialized: string,
classConstructor: ClassConstructor<ObjectType>
): ObjectType {
return deserialize<ObjectType> (classConstructor, serialized)
}

toPlain<T extends object> (obj: T): object {
29 changes: 14 additions & 15 deletions packages/bus-core/src/serialization/message-serializer.spec.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
// tslint:disable:max-classes-per-file no-inferred-empty-object-type
import { Serializer } from './serializer'
import { ClassConstructor } from '../util'
import { MessageSerializer } from './message-serializer';
import { MessageSerializer } from './message-serializer'
import { Mock, IMock, It } from 'typemoq'
import { HandlerRegistry } from '../handler'
import * as faker from 'faker'
import { Message } from '@node-ts/bus-messages'

class DummyMessage {
$name = 'bluh'
@@ -18,12 +19,12 @@ class DummyMessage {

class ToxicSerializer implements Serializer {

serialize (msg: any): string {
return msg.$name as string
serialize<ObjectType extends object> (obj: ObjectType): string {
return (obj as Message).$name
}

deserialize<T extends object> (msg: string, classType: ClassConstructor<T>): T {
return new classType(msg)
deserialize<ObjectType extends object> (serialized: string, classType: ClassConstructor<ObjectType>): ObjectType {
return new classType(serialized)
}
}

@@ -49,27 +50,25 @@ describe('MessageSerializer', () => {
)
})

it ('should use underlying serializer to serialize', () => {
const msg = {
$name: msgName
it('should use underlying serializer to serialize', () => {
const message: Message = {
$name: msgName,
$version: 1
}
const result = sut.serialize(msg)
// As per toxic serializer's behavior
expect(result).toBe(msg.$name)
const result = sut.serialize(message)
expect(result).toBe(message.$name)
})

it ('should use underlying deserializer to deserialize', () => {
it('should use underlying deserializer to deserialize', () => {
const msg = {
$name: msgName,
text: faker.random.words()
}
const raw = JSON.stringify(msg)

const result = sut.deserialize(raw) as DummyMessage
const result = sut.deserialize<DummyMessage>(raw)

handlerRegistry.verifyAll()

// As per toxic serializer's behavior
expect(result.value).toBe(raw)
})
})
17 changes: 9 additions & 8 deletions packages/bus-core/src/serialization/message-serializer.ts
Original file line number Diff line number Diff line change
@@ -23,17 +23,18 @@ export class MessageSerializer {
) {
}

serialize<T extends object> (obj: T): string {
return this.serializer.serialize(obj)
serialize<MessageType extends Message> (message: MessageType): string {
return this.serializer.serialize(message)
}

deserialize (val: string): Message {
const naiveDerializedMessage = JSON.parse(val) as Message
deserialize<MessageType extends Message> (serializedMessage: string): MessageType {
const naiveDerializedMessage = JSON.parse(serializedMessage) as Message
const messageType = this.handlerRegistry.getMessageType(naiveDerializedMessage)

return !!messageType ? this.serializer.deserialize(
val,
messageType
) : naiveDerializedMessage
return (!!messageType
? this.serializer.deserialize(
serializedMessage,
messageType
) : naiveDerializedMessage) as MessageType
}
}
17 changes: 15 additions & 2 deletions packages/bus-core/src/serialization/serializer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
import { ClassConstructor } from '../util'

/**
* A serializer that's use to serialize/deserialize objects as they leave and enter the application boundary.
*/
export interface Serializer {
serialize<T extends object> (obj: T): string
deserialize<T extends object> (val: string, classType: ClassConstructor<T>): T
/**
* Serializes a message into a string representation so it can be written to an underlying queue/topic
* @param obj Message to serialize
*/
serialize<ObjectType extends object> (obj: ObjectType): string

/**
* Deserializes a string-based representation of an object back into a strong class type.
* @param serialized Serialized string of the object to deserialize
* @param classType Type of the class to deserialize the object back into
*/
deserialize<ObjectType extends object> (serialized: string, classType: ClassConstructor<ObjectType>): ObjectType
}
15 changes: 15 additions & 0 deletions packages/bus-core/src/service-bus/bus.ts
Original file line number Diff line number Diff line change
@@ -16,9 +16,24 @@ export interface Bus {
*/
state: BusState

/**
* Publishes an event onto the bus. Any subscribers of this event will receive a copy of it.
*/
publish<EventType extends Event> (event: EventType, messageOptions?: MessageAttributes): Promise<void>

/**
* Sends a command onto the bus. There should be exactly one subscriber of this command type who can
* process it and perform the requested action.
*/
send<CommandType extends Command> (command: CommandType, messageOptions?: MessageAttributes): Promise<void>

/**
* Immediately fail the message of the current receive context and deliver it to the dead letter queue
* (if configured). It will not be retried Any processing of the message by a different handler on the
* same service instance will still process it.
*/
fail (): Promise<void>

/**
* For applications that handle messages, start reading messages off the underlying queue and process them.
*/
1 change: 1 addition & 0 deletions packages/bus-core/src/service-bus/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './bus'
export * from './service-bus'
export * from './bus-hooks'
export * from './message-handling-context'
13 changes: 13 additions & 0 deletions packages/bus-core/src/service-bus/message-handling-context.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { TransportMessage } from '../transport'
import { Message } from '@node-ts/bus-messages'

/**
* A context that exists from when a message is received from the transport
* and until it is deleted from or returned back to the transport
*/
export interface MessageHandlingContext<TMessageType extends Message> {
/**
* The raw message that was received from the transport
*/
rawMessage: TransportMessage<TMessageType>
}
5 changes: 4 additions & 1 deletion packages/bus-core/src/service-bus/service-bus.integration.ts
Original file line number Diff line number Diff line change
@@ -41,7 +41,10 @@ describe('ServiceBus', () => {

beforeAll(async () => {
container = new TestContainer().silenceLogs()
queue = new MemoryQueue(Mock.ofType<Logger>().object)
queue = new MemoryQueue(
Mock.ofType<Logger>().object,
container.get(BUS_SYMBOLS.HandlerRegistry)
)

bootstrapper = container.get<ApplicationBootstrap>(BUS_SYMBOLS.ApplicationBootstrap)
bootstrapper.registerHandler(TestEventHandler)
Loading