Skip to content

Commit

Permalink
feat(NODE-3470): retry selects another mongos (#3963)
Browse files Browse the repository at this point in the history
Co-authored-by: Alena Khineika <alena.khineika@gmail.com>
  • Loading branch information
durran and alenakhineika committed Jan 19, 2024
1 parent e3bfa30 commit 84959ee
Show file tree
Hide file tree
Showing 6 changed files with 458 additions and 23 deletions.
10 changes: 7 additions & 3 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
import type { MongoClient } from '../mongo_client';
import { ReadPreference } from '../read_preference';
import type { Server } from '../sdam/server';
import type { ServerDescription } from '../sdam/server_description';
import {
sameServerSelector,
secondaryWritableServerSelector,
Expand Down Expand Up @@ -183,7 +184,8 @@ export async function executeOperation<
return await retryOperation(operation, operationError, {
session,
topology,
selector
selector,
previousServer: server.description
});
}
throw operationError;
Expand All @@ -199,6 +201,7 @@ type RetryOptions = {
session: ClientSession;
topology: Topology;
selector: ReadPreference | ServerSelector;
previousServer: ServerDescription;
};

async function retryOperation<
Expand All @@ -207,7 +210,7 @@ async function retryOperation<
>(
operation: T,
originalError: MongoError,
{ session, topology, selector }: RetryOptions
{ session, topology, selector, previousServer }: RetryOptions
): Promise<TResult> {
const isWriteOperation = operation.hasAspect(Aspect.WRITE_OPERATION);
const isReadOperation = operation.hasAspect(Aspect.READ_OPERATION);
Expand Down Expand Up @@ -243,7 +246,8 @@ async function retryOperation<
// select a new server, and attempt to retry the operation
const server = await topology.selectServerAsync(selector, {
session,
operationName: operation.commandName
operationName: operation.commandName,
previousServer
});

if (isWriteOperation && !supportsRetryableWrites(server)) {
Expand Down
19 changes: 13 additions & 6 deletions src/sdam/server_selection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ export const MIN_SECONDARY_WRITE_WIRE_VERSION = 13;
/** @internal */
export type ServerSelector = (
topologyDescription: TopologyDescription,
servers: ServerDescription[]
servers: ServerDescription[],
deprioritized?: ServerDescription[]
) => ServerDescription[];

/**
Expand Down Expand Up @@ -266,7 +267,8 @@ export function readPreferenceServerSelector(readPreference: ReadPreference): Se

return (
topologyDescription: TopologyDescription,
servers: ServerDescription[]
servers: ServerDescription[],
deprioritized: ServerDescription[] = []
): ServerDescription[] => {
const commonWireVersion = topologyDescription.commonWireVersion;
if (
Expand All @@ -287,13 +289,18 @@ export function readPreferenceServerSelector(readPreference: ReadPreference): Se
return [];
}

if (
topologyDescription.type === TopologyType.Single ||
topologyDescription.type === TopologyType.Sharded
) {
if (topologyDescription.type === TopologyType.Single) {
return latencyWindowReducer(topologyDescription, servers.filter(knownFilter));
}

if (topologyDescription.type === TopologyType.Sharded) {
const filtered = servers.filter(server => {
return !deprioritized.includes(server);
});
const selectable = filtered.length > 0 ? filtered : deprioritized;
return latencyWindowReducer(topologyDescription, selectable.filter(knownFilter));
}

const mode = readPreference.mode;
if (mode === ReadPreference.PRIMARY) {
return servers.filter(primaryFilter);
Expand Down
12 changes: 10 additions & 2 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ export interface ServerSelectionRequest {
timeoutController: TimeoutController;
operationName: string;
waitingLogged: boolean;
previousServer?: ServerDescription;
}

/** @internal */
Expand Down Expand Up @@ -175,6 +176,7 @@ export interface SelectServerOptions {
serverSelectionTimeoutMS?: number;
session?: ClientSession;
operationName: string;
previousServer?: ServerDescription;
}

/** @public */
Expand Down Expand Up @@ -598,7 +600,8 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
timeoutController: new TimeoutController(options.serverSelectionTimeoutMS),
startTime: now(),
operationName: options.operationName,
waitingLogged: false
waitingLogged: false,
previousServer: options.previousServer
};

waitQueueMember.timeoutController.signal.addEventListener('abort', () => {
Expand Down Expand Up @@ -930,8 +933,13 @@ function processWaitQueue(topology: Topology) {
let selectedDescriptions;
try {
const serverSelector = waitQueueMember.serverSelector;
const previousServer = waitQueueMember.previousServer;
selectedDescriptions = serverSelector
? serverSelector(topology.description, serverDescriptions)
? serverSelector(
topology.description,
serverDescriptions,
previousServer ? [previousServer] : []
)
: serverDescriptions;
} catch (e) {
waitQueueMember.timeoutController.clear();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
import { expect } from 'chai';

import type { CommandFailedEvent, CommandSucceededEvent } from '../../mongodb';

const TEST_METADATA = { requires: { mongodb: '>=4.2.9', topology: 'sharded' } };
const FAIL_COMMAND = {
configureFailPoint: 'failCommand',
mode: { times: 1 },
data: {
failCommands: ['find'],
errorCode: 6,
closeConnection: true
}
};
const DISABLE_FAIL_COMMAND = {
configureFailPoint: 'failCommand',
mode: 'off',
data: {
failCommands: ['find'],
errorCode: 6,
closeConnection: true
}
};

describe('Server Selection Sharded Retryable Reads Prose tests', function () {
context('Retryable Reads Are Retried on a Different mongos if One is Available', function () {
const commandFailedEvents: CommandFailedEvent[] = [];
let client;
let utilClientOne;
let utilClientTwo;

// This test MUST be executed against a sharded cluster that has at least two
// mongos instances.
// 1. Ensure that a test is run against a sharded cluster that has at least two
// mongoses. If there are more than two mongoses in the cluster, pick two to
// test against.
beforeEach(async function () {
const uri = this.configuration.url({
monitorCommands: true,
useMultipleMongoses: true
});

// 3. Create a client with ``retryReads=true`` that connects to the cluster,
// providing the two selected mongoses as seeds.
client = this.configuration.newClient(uri, {
monitorCommands: true,
retryReads: true
});
client.on('commandFailed', event => {
commandFailedEvents.push(event);
});
await client.connect();
const seeds = client.topology.s.seedlist.map(address => address.toString());

// 2. Create a client per mongos using the direct connection, and configure the
// following fail points on each mongos::
// {
// configureFailPoint: "failCommand",
// mode: { times: 1 },
// data: {
// failCommands: ["find"],
// errorCode: 6,
// closeConnection: true
// }
// }
utilClientOne = this.configuration.newClient(`mongodb://${seeds[0]}`, {
directConnection: true
});
utilClientTwo = this.configuration.newClient(`mongodb://${seeds[1]}`, {
directConnection: true
});
await utilClientOne.db('admin').command(FAIL_COMMAND);
await utilClientTwo.db('admin').command(FAIL_COMMAND);
});

afterEach(async function () {
await client?.close();
await utilClientOne.db('admin').command(DISABLE_FAIL_COMMAND);
await utilClientTwo.db('admin').command(DISABLE_FAIL_COMMAND);
await utilClientOne?.close();
await utilClientTwo?.close();
});

// 4. Enable command monitoring, and execute a ``find`` command that is
// supposed to fail on both mongoses.
// 5. Asserts that there were failed command events from each mongos.
// 6. Disable the fail points.
it('retries on a different mongos', TEST_METADATA, async function () {
await client
.db('test')
.collection('test')
.find()
.toArray()
.catch(() => null);
expect(commandFailedEvents[0].address).to.not.equal(commandFailedEvents[1].address);
});
});

// 1. Ensure that a test is run against a sharded cluster. If there are multiple
// mongoses in the cluster, pick one to test against.
context('Retryable Reads Are Retried on the Same mongos if No Others are Available', function () {
const commandFailedEvents: CommandFailedEvent[] = [];
const commandSucceededEvents: CommandSucceededEvent[] = [];
let client;
let utilClient;

beforeEach(async function () {
const uri = this.configuration.url({
monitorCommands: true
});

// 3. Create a client with ``retryReads=true`` that connects to the cluster,
// providing the selected mongos as the seed.
client = this.configuration.newClient(uri, {
monitorCommands: true,
retryReads: true
});
client.on('commandFailed', event => {
commandFailedEvents.push(event);
});
client.on('commandSucceeded', event => {
commandSucceededEvents.push(event);
});

// 2. Create a client that connects to the mongos using the direct connection,
// and configure the following fail point on the mongos::
// {
// configureFailPoint: "failCommand",
// mode: { times: 1 },
// data: {
// failCommands: ["find"],
// errorCode: 6,
// closeConnection: true
// }
// }
utilClient = this.configuration.newClient(uri, {
directConnection: true
});
await utilClient.db('admin').command(FAIL_COMMAND);
});

afterEach(async function () {
await client?.close();
await utilClient?.db('admin').command(DISABLE_FAIL_COMMAND);
await utilClient?.close();
});

// 4. Enable command monitoring, and execute a ``find`` command.
// 5. Asserts that there was a failed command and a successful command event.
// 6. Disable the fail point.
it('retries on the same mongos', TEST_METADATA, async function () {
await client
.db('test')
.collection('test')
.find()
.toArray()
.catch(() => null);
expect(commandFailedEvents[0].address).to.equal(commandSucceededEvents[0].address);
});
});
});

0 comments on commit 84959ee

Please sign in to comment.