Skip to content

Commit

Permalink
fix(NODE-5840): heartbeat duration includes socket creation (#3973)
Browse files Browse the repository at this point in the history
Co-authored-by: Alena Khineika <alena.khineika@gmail.com>
  • Loading branch information
nbbeeken and alenakhineika committed Jan 30, 2024
1 parent b7d28d3 commit a42039b
Show file tree
Hide file tree
Showing 10 changed files with 282 additions and 277 deletions.
237 changes: 103 additions & 134 deletions src/cmap/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
MongoRuntimeError,
needsRetryableWriteLabel
} from '../error';
import { type Callback, HostAddress, ns } from '../utils';
import { HostAddress, ns, promiseWithResolvers } from '../utils';
import { AuthContext, type AuthProvider } from './auth/auth_provider';
import { GSSAPI } from './auth/gssapi';
import { MongoCR } from './auth/mongocr';
Expand Down Expand Up @@ -55,27 +55,26 @@ export const AUTH_PROVIDERS = new Map<AuthMechanism | string, AuthProvider>([
/** @public */
export type Stream = Socket | TLSSocket;

export function connect(options: ConnectionOptions, callback: Callback<Connection>): void {
makeConnection({ ...options, existingSocket: undefined }, (err, socket) => {
if (err || !socket) {
return callback(err);
}

let ConnectionType = options.connectionType ?? Connection;
if (options.autoEncrypter) {
ConnectionType = CryptoConnection;
}
export async function connect(options: ConnectionOptions): Promise<Connection> {
let connection: Connection | null = null;
try {
const socket = await makeSocket(options);
connection = makeConnection(options, socket);
await performInitialHandshake(connection, options);
return connection;
} catch (error) {
connection?.destroy({ force: false });
throw error;
}
}

const connection = new ConnectionType(socket, options);
export function makeConnection(options: ConnectionOptions, socket: Stream): Connection {
let ConnectionType = options.connectionType ?? Connection;
if (options.autoEncrypter) {
ConnectionType = CryptoConnection;
}

performInitialHandshake(connection, options).then(
() => callback(undefined, connection),
error => {
connection.destroy({ force: false });
callback(error);
}
);
});
return new ConnectionType(socket, options);
}

function checkSupportedServer(hello: Document, options: ConnectionOptions) {
Expand Down Expand Up @@ -103,7 +102,7 @@ function checkSupportedServer(hello: Document, options: ConnectionOptions) {
return new MongoCompatibilityError(message);
}

async function performInitialHandshake(
export async function performInitialHandshake(
conn: Connection,
options: ConnectionOptions
): Promise<void> {
Expand Down Expand Up @@ -329,35 +328,21 @@ function parseSslOptions(options: MakeConnectionOptions): TLSConnectionOpts {
return result;
}

const SOCKET_ERROR_EVENT_LIST = ['error', 'close', 'timeout', 'parseError'] as const;
type ErrorHandlerEventName = (typeof SOCKET_ERROR_EVENT_LIST)[number] | 'cancel';
const SOCKET_ERROR_EVENTS = new Set(SOCKET_ERROR_EVENT_LIST);

function makeConnection(options: MakeConnectionOptions, _callback: Callback<Stream>) {
export async function makeSocket(options: MakeConnectionOptions): Promise<Stream> {
const useTLS = options.tls ?? false;
const noDelay = options.noDelay ?? true;
const connectTimeoutMS = options.connectTimeoutMS ?? 30000;
const rejectUnauthorized = options.rejectUnauthorized ?? true;
const existingSocket = options.existingSocket;

let socket: Stream;
const callback: Callback<Stream> = function (err, ret) {
if (err && socket) {
socket.destroy();
}

_callback(err, ret);
};

if (options.proxyHost != null) {
// Currently, only Socks5 is supported.
return makeSocks5Connection(
{
...options,
connectTimeoutMS // Should always be present for Socks5
},
callback
);
return makeSocks5Connection({
...options,
connectTimeoutMS // Should always be present for Socks5
});
}

if (useTLS) {
Expand All @@ -379,47 +364,41 @@ function makeConnection(options: MakeConnectionOptions, _callback: Callback<Stre
socket.setTimeout(connectTimeoutMS);
socket.setNoDelay(noDelay);

const connectEvent = useTLS ? 'secureConnect' : 'connect';
let cancellationHandler: (err: Error) => void;
function errorHandler(eventName: ErrorHandlerEventName) {
return (err: Error) => {
SOCKET_ERROR_EVENTS.forEach(event => socket.removeAllListeners(event));
if (cancellationHandler && options.cancellationToken) {
options.cancellationToken.removeListener('cancel', cancellationHandler);
}

socket.removeListener(connectEvent, connectHandler);
callback(connectionFailureError(eventName, err));
};
}
let cancellationHandler: ((err: Error) => void) | null = null;

function connectHandler() {
SOCKET_ERROR_EVENTS.forEach(event => socket.removeAllListeners(event));
if (cancellationHandler && options.cancellationToken) {
options.cancellationToken.removeListener('cancel', cancellationHandler);
const { promise: connectedSocket, resolve, reject } = promiseWithResolvers<Stream>();
if (existingSocket) {
resolve(socket);
} else {
const connectEvent = useTLS ? 'secureConnect' : 'connect';
socket
.once(connectEvent, () => resolve(socket))
.once('error', error => reject(connectionFailureError('error', error)))
.once('timeout', () => reject(connectionFailureError('timeout')))
.once('close', () => reject(connectionFailureError('close')));

if (options.cancellationToken != null) {
cancellationHandler = () => reject(connectionFailureError('cancel'));
options.cancellationToken.once('cancel', cancellationHandler);
}
}

if ('authorizationError' in socket) {
if (socket.authorizationError && rejectUnauthorized) {
// TODO(NODE-5192): wrap this with a MongoError subclass
return callback(socket.authorizationError);
}
try {
socket = await connectedSocket;
return socket;
} catch (error) {
socket.destroy();
if ('authorizationError' in socket && socket.authorizationError != null && rejectUnauthorized) {
// TODO(NODE-5192): wrap this with a MongoError subclass
throw socket.authorizationError;
}

throw error;
} finally {
socket.setTimeout(0);
callback(undefined, socket);
}

SOCKET_ERROR_EVENTS.forEach(event => socket.once(event, errorHandler(event)));
if (options.cancellationToken) {
cancellationHandler = errorHandler('cancel');
options.cancellationToken.once('cancel', cancellationHandler);
}

if (existingSocket) {
process.nextTick(connectHandler);
} else {
socket.once(connectEvent, connectHandler);
socket.removeAllListeners();
if (cancellationHandler != null) {
options.cancellationToken?.removeListener('cancel', cancellationHandler);
}
}
}

Expand All @@ -435,78 +414,68 @@ function loadSocks() {
return socks;
}

function makeSocks5Connection(options: MakeConnectionOptions, callback: Callback<Stream>) {
async function makeSocks5Connection(options: MakeConnectionOptions): Promise<Stream> {
const hostAddress = HostAddress.fromHostPort(
options.proxyHost ?? '', // proxyHost is guaranteed to set here
options.proxyPort ?? 1080
);

// First, connect to the proxy server itself:
makeConnection(
{
...options,
hostAddress,
tls: false,
proxyHost: undefined
},
(err, rawSocket) => {
if (err || !rawSocket) {
return callback(err);
}
const rawSocket = await makeSocket({
...options,
hostAddress,
tls: false,
proxyHost: undefined
});

const destination = parseConnectOptions(options) as net.TcpNetConnectOpts;
if (typeof destination.host !== 'string' || typeof destination.port !== 'number') {
return callback(
new MongoInvalidArgumentError('Can only make Socks5 connections to TCP hosts')
);
}
const destination = parseConnectOptions(options) as net.TcpNetConnectOpts;
if (typeof destination.host !== 'string' || typeof destination.port !== 'number') {
throw new MongoInvalidArgumentError('Can only make Socks5 connections to TCP hosts');
}

try {
socks ??= loadSocks();
} catch (error) {
return callback(error);
socks ??= loadSocks();

try {
// Then, establish the Socks5 proxy connection:
const { socket } = await socks.SocksClient.createConnection({
existing_socket: rawSocket,
timeout: options.connectTimeoutMS,
command: 'connect',
destination: {
host: destination.host,
port: destination.port
},
proxy: {
// host and port are ignored because we pass existing_socket
host: 'iLoveJavaScript',
port: 0,
type: 5,
userId: options.proxyUsername || undefined,
password: options.proxyPassword || undefined
}
});

// Then, establish the Socks5 proxy connection:
socks.SocksClient.createConnection({
existing_socket: rawSocket,
timeout: options.connectTimeoutMS,
command: 'connect',
destination: {
host: destination.host,
port: destination.port
},
proxy: {
// host and port are ignored because we pass existing_socket
host: 'iLoveJavaScript',
port: 0,
type: 5,
userId: options.proxyUsername || undefined,
password: options.proxyPassword || undefined
}
}).then(
({ socket }) => {
// Finally, now treat the resulting duplex stream as the
// socket over which we send and receive wire protocol messages:
makeConnection(
{
...options,
existingSocket: socket,
proxyHost: undefined
},
callback
);
},
error => callback(connectionFailureError('error', error))
);
}
);
// Finally, now treat the resulting duplex stream as the
// socket over which we send and receive wire protocol messages:
return await makeSocket({
...options,
existingSocket: socket,
proxyHost: undefined
});
} catch (error) {
throw connectionFailureError('error', error);
}
}

function connectionFailureError(type: ErrorHandlerEventName, err: Error) {
function connectionFailureError(type: 'error', cause: Error): MongoNetworkError;
function connectionFailureError(type: 'close' | 'timeout' | 'cancel'): MongoNetworkError;
function connectionFailureError(
type: 'error' | 'close' | 'timeout' | 'cancel',
cause?: Error
): MongoNetworkError {
switch (type) {
case 'error':
return new MongoNetworkError(MongoError.buildErrorMessage(err), { cause: err });
return new MongoNetworkError(MongoError.buildErrorMessage(cause), { cause });
case 'timeout':
return new MongoNetworkTimeoutError('connection timed out');
case 'close':
Expand Down

0 comments on commit a42039b

Please sign in to comment.