Skip to content

Commit

Permalink
feat(NODE-3689): require hello command for connection handshake to us…
Browse files Browse the repository at this point in the history
…e OP_MSG disallowing OP_QUERY (#3938)

Co-authored-by: Durran Jordan <durran@gmail.com>
  • Loading branch information
alenakhineika and durran committed Dec 6, 2023
1 parent 130691d commit ce7df0f
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 123 deletions.
4 changes: 4 additions & 0 deletions src/sdam/topology.ts
Expand Up @@ -382,6 +382,10 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
return this.s.options.loadBalanced;
}

get serverApi(): ServerApi | undefined {
return this.s.options.serverApi;
}

get capabilities(): ServerCapabilities {
return new ServerCapabilities(this.lastHello());
}
Expand Down
10 changes: 6 additions & 4 deletions src/utils.ts
Expand Up @@ -371,11 +371,13 @@ export function uuidV4(): Buffer {
*/
export function maxWireVersion(topologyOrServer?: Connection | Topology | Server): number {
if (topologyOrServer) {
if (topologyOrServer.loadBalanced) {
// Since we do not have a monitor, we assume the load balanced server is always
// pointed at the latest mongodb version. There is a risk that for on-prem
// deployments that don't upgrade immediately that this could alert to the
if (topologyOrServer.loadBalanced || topologyOrServer.serverApi?.version) {
// Since we do not have a monitor in the load balanced mode,
// we assume the load-balanced server is always pointed at the latest mongodb version.
// There is a risk that for on-prem deployments
// that don't upgrade immediately that this could alert to the
// application that a feature is available that is actually not.
// We also return the max supported wire version for serverAPI.
return MAX_SUPPORTED_WIRE_VERSION;
}
if (topologyOrServer.hello) {
Expand Down
4 changes: 3 additions & 1 deletion test/integration/change-streams/change_stream.test.ts
Expand Up @@ -1262,7 +1262,9 @@ describe('Change Streams', function () {
}
req.reply({ ok: 1 });
});
const client = this.configuration.newClient(`mongodb://${mockServer.uri()}/`);
const client = this.configuration.newClient(`mongodb://${mockServer.uri()}/`, {
serverApi: null // TODO(NODE-3807): remove resetting serverApi when the usage of mongodb mock server is removed
});
client.connect(err => {
expect(err).to.not.exist;
const collection = client.db('cs').collection('test');
Expand Down
5 changes: 4 additions & 1 deletion test/integration/change-streams/change_streams.prose.test.ts
Expand Up @@ -332,7 +332,10 @@ describe('Change Stream prose tests', function () {
}
request.reply(this.applyOpTime(response));
});
this.client = this.config.newClient(this.mongodbURI, { monitorCommands: true });
this.client = this.config.newClient(this.mongodbURI, {
monitorCommands: true,
serverApi: null // TODO(NODE-3807): remove resetting serverApi when the usage of mongodb mock server is removed
});
this.apm = { started: [], succeeded: [], failed: [] };

(
Expand Down
4 changes: 3 additions & 1 deletion test/integration/collection-management/collection.test.ts
Expand Up @@ -474,7 +474,9 @@ describe('Collection', function () {
afterEach(() => mock.cleanup());

function testCountDocMock(testConfiguration, config, done) {
const client = testConfiguration.newClient(`mongodb://${server.uri()}/test`);
const client = testConfiguration.newClient(`mongodb://${server.uri()}/test`, {
serverApi: null // TODO(NODE-3807): remove resetting serverApi when the usage of mongodb mock server is removed
});
const close = e => client.close(() => done(e));

server.setMessageHandler(request => {
Expand Down
15 changes: 11 additions & 4 deletions test/integration/max-staleness/max_staleness.test.js
Expand Up @@ -54,7 +54,8 @@ describe('Max Staleness', function () {
var self = this;
const configuration = this.configuration;
const client = configuration.newClient(
`mongodb://${test.server.uri()}/test?readPreference=secondary&maxStalenessSeconds=250`
`mongodb://${test.server.uri()}/test?readPreference=secondary&maxStalenessSeconds=250`,
{ serverApi: null } // TODO(NODE-3807): remove resetting serverApi when the usage of mongodb mock server is removed
);

client.connect(function (err, client) {
Expand Down Expand Up @@ -86,7 +87,9 @@ describe('Max Staleness', function () {

test: function (done) {
const configuration = this.configuration;
const client = configuration.newClient(`mongodb://${test.server.uri()}/test`);
const client = configuration.newClient(`mongodb://${test.server.uri()}/test`, {
serverApi: null // TODO(NODE-3807): remove resetting serverApi when the usage of mongodb mock server is removed
});
client.connect(function (err, client) {
expect(err).to.not.exist;

Expand Down Expand Up @@ -124,7 +127,9 @@ describe('Max Staleness', function () {
test: function (done) {
var self = this;
const configuration = this.configuration;
const client = configuration.newClient(`mongodb://${test.server.uri()}/test`);
const client = configuration.newClient(`mongodb://${test.server.uri()}/test`, {
serverApi: null // TODO(NODE-3807): remove resetting serverApi when the usage of mongodb mock server is removed
});
client.connect(function (err, client) {
expect(err).to.not.exist;
var db = client.db(self.configuration.db);
Expand Down Expand Up @@ -159,7 +164,9 @@ describe('Max Staleness', function () {
test: function (done) {
var self = this;
const configuration = this.configuration;
const client = configuration.newClient(`mongodb://${test.server.uri()}/test`);
const client = configuration.newClient(`mongodb://${test.server.uri()}/test`, {
serverApi: null // TODO(NODE-3807): remove resetting serverApi when the usage of mongodb mock server is removed
});
client.connect(function (err, client) {
expect(err).to.not.exist;
var db = client.db(self.configuration.db);
Expand Down
76 changes: 74 additions & 2 deletions test/integration/mongodb-handshake/mongodb-handshake.test.ts
Expand Up @@ -6,7 +6,10 @@ import {
Connection,
LEGACY_HELLO_COMMAND,
MongoServerError,
MongoServerSelectionError
MongoServerSelectionError,
OpMsgRequest,
OpQueryRequest,
ServerApiVersion
} from '../../mongodb';

describe('MongoDB Handshake', () => {
Expand Down Expand Up @@ -48,6 +51,7 @@ describe('MongoDB Handshake', () => {

context('when compressors are provided on the mongo client', () => {
let spy: Sinon.SinonSpy;

before(() => {
spy = sinon.spy(Connection.prototype, 'command');
});
Expand All @@ -56,10 +60,78 @@ describe('MongoDB Handshake', () => {

it('constructs a handshake with the specified compressors', async function () {
client = this.configuration.newClient({ compressors: ['snappy'] });
await client.connect();
// The load-balanced mode doesn’t perform SDAM,
// so `connect` doesn’t do anything unless authentication is enabled.
// Force the driver to send a command to the server in the noauth mode.
await client.db('admin').command({ ping: 1 });
expect(spy.called).to.be.true;
const handshakeDoc = spy.getCall(0).args[1];
expect(handshakeDoc).to.have.property('compression').to.deep.equal(['snappy']);
});
});

context('when load-balanced', function () {
let opMsgRequestToBinSpy: Sinon.SinonSpy;

beforeEach(() => {
opMsgRequestToBinSpy = sinon.spy(OpMsgRequest.prototype, 'toBin');
});

afterEach(() => sinon.restore());

it('sends the hello command as OP_MSG', {
metadata: { requires: { topology: 'load-balanced' } },
test: async function () {
client = this.configuration.newClient({ loadBalanced: true });
await client.db('admin').command({ ping: 1 });
expect(opMsgRequestToBinSpy).to.have.been.called;
}
});
});

context('when serverApi version is present', function () {
let opMsgRequestToBinSpy: Sinon.SinonSpy;

beforeEach(() => {
opMsgRequestToBinSpy = sinon.spy(OpMsgRequest.prototype, 'toBin');
});

afterEach(() => sinon.restore());

it('sends the hello command as OP_MSG', {
metadata: { requires: { topology: '!load-balanced', mongodb: '>=5.0' } },
test: async function () {
client = this.configuration.newClient({}, { serverApi: { version: ServerApiVersion.v1 } });
await client.connect();
expect(opMsgRequestToBinSpy).to.have.been.called;
}
});
});

context('when not load-balanced and serverApi version is not present', function () {
let opQueryRequestToBinSpy: Sinon.SinonSpy;
let opMsgRequestToBinSpy: Sinon.SinonSpy;

beforeEach(() => {
opQueryRequestToBinSpy = sinon.spy(OpQueryRequest.prototype, 'toBin');
opMsgRequestToBinSpy = sinon.spy(OpMsgRequest.prototype, 'toBin');
});

afterEach(() => sinon.restore());

it('sends the hello command as OP_MSG', {
metadata: { requires: { topology: '!load-balanced', mongodb: '>=5.0' } },
test: async function () {
if (this.configuration.serverApi) {
this.skipReason = 'Test requires serverApi to NOT be enabled';
return this.skip();
}
client = this.configuration.newClient();
await client.db('admin').command({ ping: 1 });
expect(opQueryRequestToBinSpy).to.have.been.called;
expect(opMsgRequestToBinSpy).to.have.been.called;
opMsgRequestToBinSpy.calledAfter(opQueryRequestToBinSpy);
}
});
});
});
2 changes: 1 addition & 1 deletion test/readme.md
Expand Up @@ -330,7 +330,7 @@ The following steps will walk you through how to start and test a load balancer.
A new file name `lb.env` is automatically created.
1. Source the environment variables using a command like `source lb.env`.
1. Export **each** of the environment variables that were created in `lb.env`. For example: `export SINGLE_MONGOS_LB_URI`.
1. Export the `LOAD_BALANCED` environment variable to 'true': `export LOAD_BALANCED='true'`
1. Export the `LOAD_BALANCER` environment variable to 'true': `export LOAD_BALANCER='true'`
1. Disable auth for tests: `export AUTH='noauth'`
1. Run the test suite as you normally would:
```sh
Expand Down
111 changes: 2 additions & 109 deletions test/unit/cmap/connection.test.ts
Expand Up @@ -14,15 +14,13 @@ import {
type HostAddress,
isHello,
type MessageHeader,
MessageStream,
type MessageStream,
MongoNetworkError,
MongoNetworkTimeoutError,
MongoRuntimeError,
ns,
type OperationDescription,
OpMsgRequest,
OpMsgResponse,
OpQueryRequest
OpMsgResponse
} from '../../mongodb';
import * as mock from '../../tools/mongodb-mock/index';
import { generateOpMsgBuffer, getSymbolFrom } from '../../tools/utils';
Expand Down Expand Up @@ -1030,109 +1028,4 @@ describe('new Connection()', function () {
});
});
});

describe('when load-balanced', () => {
const CONNECT_DEFAULTS = {
id: 1,
tls: false,
generation: 1,
monitorCommands: false,
metadata: {} as ClientMetadata
};
let server;
let connectOptions;
let connection: Connection;
let writeCommandSpy;

beforeEach(async () => {
server = await mock.createServer();
server.setMessageHandler(request => {
request.reply(mock.HELLO);
});
writeCommandSpy = sinon.spy(MessageStream.prototype, 'writeCommand');
});

afterEach(async () => {
connection?.destroy({ force: true });
sinon.restore();
await mock.cleanup();
});

it('sends the first command as OP_MSG', async () => {
try {
connectOptions = {
...CONNECT_DEFAULTS,
hostAddress: server.hostAddress() as HostAddress,
socketTimeoutMS: 100,
loadBalanced: true
};

connection = await promisify<Connection>(callback =>
//@ts-expect-error: Callbacks do not have mutual exclusion for error/result existence
connect(connectOptions, callback)
)();

await promisify(callback =>
connection.command(ns('admin.$cmd'), { hello: 1 }, {}, callback)
)();
} catch (error) {
/** Connection timeouts, but the handshake message is sent. */
}

expect(writeCommandSpy).to.have.been.called;
expect(writeCommandSpy.firstCall.args[0] instanceof OpMsgRequest).to.equal(true);
});
});

describe('when not load-balanced', () => {
const CONNECT_DEFAULTS = {
id: 1,
tls: false,
generation: 1,
monitorCommands: false,
metadata: {} as ClientMetadata
};
let server;
let connectOptions;
let connection: Connection;
let writeCommandSpy;

beforeEach(async () => {
server = await mock.createServer();
server.setMessageHandler(request => {
request.reply(mock.HELLO);
});
writeCommandSpy = sinon.spy(MessageStream.prototype, 'writeCommand');
});

afterEach(async () => {
connection?.destroy({ force: true });
sinon.restore();
await mock.cleanup();
});

it('sends the first command as OP_QUERY', async () => {
try {
connectOptions = {
...CONNECT_DEFAULTS,
hostAddress: server.hostAddress() as HostAddress,
socketTimeoutMS: 100
};

connection = await promisify<Connection>(callback =>
//@ts-expect-error: Callbacks do not have mutual exclusion for error/result existence
connect(connectOptions, callback)
)();

await promisify(callback =>
connection.command(ns('admin.$cmd'), { hello: 1 }, {}, callback)
)();
} catch (error) {
/** Connection timeouts, but the handshake message is sent. */
}

expect(writeCommandSpy).to.have.been.called;
expect(writeCommandSpy.firstCall.args[0] instanceof OpQueryRequest).to.equal(true);
});
});
});

0 comments on commit ce7df0f

Please sign in to comment.