Skip to content

Commit

Permalink
feat(NODE-3449): Add serverConnectionId to Command Monitoring Spec
Browse files Browse the repository at this point in the history
Co-authored-by: Durran Jordan <durran@gmail.com>
  • Loading branch information
aditi-khare-mongoDB and durran committed Dec 19, 2023
1 parent a3c0298 commit 735f7aa
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 13 deletions.
36 changes: 32 additions & 4 deletions src/cmap/command_monitoring_events.ts
@@ -1,4 +1,4 @@
import type { Document, ObjectId } from '../bson';
import { type Document, type ObjectId } from '../bson';
import {
COMMAND_FAILED,
COMMAND_STARTED,
Expand All @@ -22,7 +22,14 @@ export class CommandStartedEvent {
commandName: string;
command: Document;
address: string;
/** Driver generated connection id */
connectionId?: string | number;
/**
* Server generated connection id
* Distinct from the connection id and is returned by the hello or legacy hello response as "connectionId"
* from the server on 4.2+.
*/
serverConnectionId: bigint | null;
serviceId?: ObjectId;
/** @internal */
name = COMMAND_STARTED;
Expand All @@ -34,7 +41,11 @@ export class CommandStartedEvent {
* @param pool - the pool that originated the command
* @param command - the command
*/
constructor(connection: Connection, command: WriteProtocolMessageType) {
constructor(
connection: Connection,
command: WriteProtocolMessageType,
serverConnectionId: bigint | null
) {
const cmd = extractCommand(command);
const commandName = extractCommandName(cmd);
const { address, connectionId, serviceId } = extractConnectionDetails(connection);
Expand All @@ -52,6 +63,7 @@ export class CommandStartedEvent {
this.databaseName = command.databaseName;
this.commandName = commandName;
this.command = maybeRedact(commandName, cmd, cmd);
this.serverConnectionId = serverConnectionId;
}

/* @internal */
Expand All @@ -67,7 +79,13 @@ export class CommandStartedEvent {
*/
export class CommandSucceededEvent {
address: string;
/** Driver generated connection id */
connectionId?: string | number;
/**
* Server generated connection id
* Distinct from the connection id and is returned by the hello or legacy hello response as "connectionId" from the server on 4.2+.
*/
serverConnectionId: bigint | null;
requestId: number;
duration: number;
commandName: string;
Expand All @@ -89,7 +107,8 @@ export class CommandSucceededEvent {
connection: Connection,
command: WriteProtocolMessageType,
reply: Document | undefined,
started: number
started: number,
serverConnectionId: bigint | null
) {
const cmd = extractCommand(command);
const commandName = extractCommandName(cmd);
Expand All @@ -102,6 +121,7 @@ export class CommandSucceededEvent {
this.commandName = commandName;
this.duration = calculateDurationInMs(started);
this.reply = maybeRedact(commandName, cmd, extractReply(command, reply));
this.serverConnectionId = serverConnectionId;
}

/* @internal */
Expand All @@ -117,7 +137,13 @@ export class CommandSucceededEvent {
*/
export class CommandFailedEvent {
address: string;
/** Driver generated connection id */
connectionId?: string | number;
/**
* Server generated connection id
* Distinct from the connection id and is returned by the hello or legacy hello response as "connectionId" from the server on 4.2+.
*/
serverConnectionId: bigint | null;
requestId: number;
duration: number;
commandName: string;
Expand All @@ -139,7 +165,8 @@ export class CommandFailedEvent {
connection: Connection,
command: WriteProtocolMessageType,
error: Error | Document,
started: number
started: number,
serverConnectionId: bigint | null
) {
const cmd = extractCommand(command);
const commandName = extractCommandName(cmd);
Expand All @@ -153,6 +180,7 @@ export class CommandFailedEvent {
this.commandName = commandName;
this.duration = calculateDurationInMs(started);
this.failure = maybeRedact(commandName, cmd, error) as Error;
this.serverConnectionId = serverConnectionId;
}

/* @internal */
Expand Down
49 changes: 41 additions & 8 deletions src/cmap/connection.ts
Expand Up @@ -717,7 +717,10 @@ function write(

// if command monitoring is enabled we need to modify the callback here
if (conn.monitorCommands) {
conn.emit(Connection.COMMAND_STARTED, new CommandStartedEvent(conn, command));
conn.emit(
Connection.COMMAND_STARTED,
new CommandStartedEvent(conn, command, conn[kDescription].serverConnectionId)
);

operationDescription.started = now();
operationDescription.cb = (err, reply) => {
Expand All @@ -727,18 +730,36 @@ function write(
if (err && reply?.ok !== 1) {
conn.emit(
Connection.COMMAND_FAILED,
new CommandFailedEvent(conn, command, err, operationDescription.started)
new CommandFailedEvent(
conn,
command,
err,
operationDescription.started,
conn[kDescription].serverConnectionId
)
);
} else {
if (reply && (reply.ok === 0 || reply.$err)) {
conn.emit(
Connection.COMMAND_FAILED,
new CommandFailedEvent(conn, command, reply, operationDescription.started)
new CommandFailedEvent(
conn,
command,
reply,
operationDescription.started,
conn[kDescription].serverConnectionId
)
);
} else {
conn.emit(
Connection.COMMAND_SUCCEEDED,
new CommandSucceededEvent(conn, command, reply, operationDescription.started)
new CommandSucceededEvent(
conn,
command,
reply,
operationDescription.started,
conn[kDescription].serverConnectionId
)
);
}
}
Expand Down Expand Up @@ -1098,7 +1119,11 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
started = now();
this.emit(
ModernConnection.COMMAND_STARTED,
new CommandStartedEvent(this as unknown as Connection, message)
new CommandStartedEvent(
this as unknown as Connection,
message,
this[kDescription].serverConnectionId
)
);
}

Expand All @@ -1124,7 +1149,8 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
this as unknown as Connection,
message,
options.noResponse ? undefined : document,
started
started,
this[kDescription].serverConnectionId
)
);
}
Expand All @@ -1141,12 +1167,19 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
this as unknown as Connection,
message,
options.noResponse ? undefined : document,
started
started,
this[kDescription].serverConnectionId
)
)
: this.emit(
ModernConnection.COMMAND_FAILED,
new CommandFailedEvent(this as unknown as Connection, message, error, started)
new CommandFailedEvent(
this as unknown as Connection,
message,
error,
started,
this[kDescription].serverConnectionId
)
);
}
throw error;
Expand Down
19 changes: 18 additions & 1 deletion src/cmap/stream_description.ts
@@ -1,4 +1,4 @@
import type { Document } from '../bson';
import { type Document, type Double, Long } from '../bson';
import { ServerType } from '../sdam/common';
import { parseServerType } from '../sdam/server_description';
import type { CompressorName } from './wire_protocol/compression';
Expand Down Expand Up @@ -36,6 +36,7 @@ export class StreamDescription {
__nodejs_mock_server__?: boolean;

zlibCompressionLevel?: number;
serverConnectionId: bigint | null;

constructor(address: string, options?: StreamDescriptionOptions) {
this.address = address;
Expand All @@ -51,13 +52,19 @@ export class StreamDescription {
options && options.compressors && Array.isArray(options.compressors)
? options.compressors
: [];
this.serverConnectionId = null;
}

receiveResponse(response: Document | null): void {
if (response == null) {
return;
}
this.type = parseServerType(response);
if ('connectionId' in response) {
this.serverConnectionId = this.parseServerConnectionID(response.connectionId);
} else {
this.serverConnectionId = null;
}
for (const field of RESPONSE_FIELDS) {
if (response[field] != null) {
this[field] = response[field];
Expand All @@ -73,4 +80,14 @@ export class StreamDescription {
this.compressor = this.compressors.filter(c => response.compression?.includes(c))[0];
}
}

/* @internal */
parseServerConnectionID(serverConnectionId: number | Double | bigint | Long): bigint {
// Connection ids are always integral, so it's safe to coerce doubles as well as
// any integral type.
return Long.isLong(serverConnectionId)
? serverConnectionId.toBigInt()
: // @ts-expect-error: Doubles are coercible to number
BigInt(serverConnectionId);
}
}
15 changes: 15 additions & 0 deletions test/tools/unified-spec-runner/match.ts
Expand Up @@ -493,6 +493,11 @@ function compareEvents(
expect.fail(`expected ${path} to be instanceof CommandStartedEvent`);
}
compareCommandStartedEvents(actualEvent, expectedEvent.commandStartedEvent, entities, path);
if (expectedEvent.commandStartedEvent.hasServerConnectionId) {
expect(actualEvent).property('serverConnectionId').to.be.a('bigint');
} else if (expectedEvent.commandStartedEvent.hasServerConnectionId === false) {
expect(actualEvent).property('serverConnectionId').to.be.null;
}
} else if (expectedEvent.commandSucceededEvent) {
const path = `${rootPrefix}.commandSucceededEvent`;
if (!(actualEvent instanceof CommandSucceededEvent)) {
Expand All @@ -504,12 +509,22 @@ function compareEvents(
entities,
path
);
if (expectedEvent.commandSucceededEvent.hasServerConnectionId) {
expect(actualEvent).property('serverConnectionId').to.be.a('bigint');
} else if (expectedEvent.commandSucceededEvent.hasServerConnectionId === false) {
expect(actualEvent).property('serverConnectionId').to.be.null;
}
} else if (expectedEvent.commandFailedEvent) {
const path = `${rootPrefix}.commandFailedEvent`;
if (!(actualEvent instanceof CommandFailedEvent)) {
expect.fail(`expected ${path} to be instanceof CommandFailedEvent`);
}
compareCommandFailedEvents(actualEvent, expectedEvent.commandFailedEvent, entities, path);
if (expectedEvent.commandFailedEvent.hasServerConnectionId) {
expect(actualEvent).property('serverConnectionId').to.be.a('bigint');
} else if (expectedEvent.commandFailedEvent.hasServerConnectionId === false) {
expect(actualEvent).property('serverConnectionId').to.be.null;
}
} else if (expectedEvent.connectionClosedEvent) {
expect(actualEvent).to.be.instanceOf(ConnectionClosedEvent);
if (expectedEvent.connectionClosedEvent.hasServiceId) {
Expand Down
3 changes: 3 additions & 0 deletions test/tools/unified-spec-runner/schema.ts
Expand Up @@ -282,13 +282,16 @@ export interface ExpectedCommandEvent {
command?: Document;
commandName?: string;
databaseName?: string;
hasServerConnectionId?: boolean;
};
commandSucceededEvent?: {
reply?: Document;
commandName?: string;
hasServerConnectionId?: boolean;
};
commandFailedEvent?: {
commandName?: string;
hasServerConnectionId?: boolean;
};
}
export interface ExpectedCmapEvent {
Expand Down
44 changes: 44 additions & 0 deletions test/unit/cmap/stream_description.test.js
@@ -1,5 +1,6 @@
'use strict';

const { Double, Long } = require('bson');
const { StreamDescription } = require('../../mongodb');
const { expect } = require('chai');

Expand Down Expand Up @@ -64,4 +65,47 @@ describe('StreamDescription - unit/cmap', function () {
});
});
});

describe('serverConnectionId', function () {
context('when serverConnectionId is in hello response', function () {
// eslint-disable-next-line no-undef
const expectedServerConnectionId = BigInt(2);
context('when serverConnectionId of type bigint', function () {
it('should save serverConnectionID as a bigint on stream description', function () {
const description = new StreamDescription('a:27017', {});
// eslint-disable-next-line no-undef
description.receiveResponse({ connectionId: BigInt(2) });
expect(description.serverConnectionId).to.equal(expectedServerConnectionId);
});
});
context('when serverConnectionId is of type BSON Double', function () {
it('should save serverConnectionID as a bigint on stream description', function () {
const description = new StreamDescription('a:27017', {});
description.receiveResponse({ connectionId: new Double(2) });
expect(description.serverConnectionId).to.equal(expectedServerConnectionId);
});
});
context('when serverConnectionId is of type number', function () {
it('should save serverConnectionID as a bigint on stream description', function () {
const description = new StreamDescription('a:27017', {});
description.receiveResponse({ connectionId: 2 });
expect(description.serverConnectionId).to.equal(expectedServerConnectionId);
});
});
context('when serverConnectionId is of type BSON Long', function () {
it('should parse serverConnectionId properly', function () {
const description = new StreamDescription('a:27017', {});
description.receiveResponse({ connectionId: new Long(2) });
expect(description.serverConnectionId).to.equal(expectedServerConnectionId);
});
});
});
context('when serverConnectionId is not in hello response', function () {
it('should not throw an error and keep serverConnectionId undefined on stream description', function () {
const description = new StreamDescription('a:27017', {});
description.receiveResponse({});
expect(description.serverConnectionId).to.not.exist;
});
});
});
});

0 comments on commit 735f7aa

Please sign in to comment.