Skip to content

Commit

Permalink
fix: kill connection on commit/rollback error (#14535)
Browse files Browse the repository at this point in the history
  • Loading branch information
ephys committed May 27, 2022
1 parent b37df96 commit e1a9c28
Show file tree
Hide file tree
Showing 5 changed files with 212 additions and 134 deletions.
9 changes: 7 additions & 2 deletions src/dialects/abstract/connection-manager.d.ts
Expand Up @@ -28,7 +28,12 @@ export interface ConnectionManager {
*/
getConnection(opts: GetConnectionOptions): Promise<Connection>;
/**
* Release a pooled connection so it can be utilized by other connection requests
* Release a pooled connection, so it can be utilized by other connection requests
*/
releaseConnection(conn: Connection): Promise<void>;
releaseConnection(conn: Connection): void;

/**
* Destroys a pooled connection and removes it from the pool.
*/
destroyConnection(conn: Connection): Promise<void>;
}
14 changes: 11 additions & 3 deletions src/dialects/abstract/connection-manager.js
Expand Up @@ -298,14 +298,22 @@ class ConnectionManager {
* Release a pooled connection so it can be utilized by other connection requests
*
* @param {Connection} connection
*
* @returns {Promise}
*/
async releaseConnection(connection) {
releaseConnection(connection) {
this.pool.release(connection);
debug('connection released');
}

/**
* Destroys a pooled connection and removes it from the pool.
*
* @param {Connection} connection
*/
async destroyConnection(connection) {
await this.pool.destroy(connection);
debug(`connection ${connection.uuid} destroyed`);
}

/**
* Call dialect library to get connection
*
Expand Down
27 changes: 13 additions & 14 deletions src/sequelize.js
Expand Up @@ -642,7 +642,7 @@ class Sequelize {
} finally {
await this.runHooks('afterQuery', options, query);
if (!options.transaction) {
await this.connectionManager.releaseConnection(connection);
this.connectionManager.releaseConnection(connection);
}
}
}, retryOptions);
Expand Down Expand Up @@ -1174,30 +1174,29 @@ class Sequelize {
const transaction = new Transaction(this, options);

if (!autoCallback) {
await transaction.prepareEnvironment(false);
await transaction.prepareEnvironment(/* cls */ false);
return transaction;
}

// autoCallback provided
return Sequelize._clsRun(async () => {
await transaction.prepareEnvironment(/* cls */ true);

let result;
try {
await transaction.prepareEnvironment();
const result = await autoCallback(transaction);
await transaction.commit();
return await result;
result = await autoCallback(transaction);
} catch (err) {
try {
if (!transaction.finished) {
await transaction.rollback();
} else {
// release the connection, even if we don't need to rollback
await transaction.cleanup();
}
} catch (err0) {
// ignore
await transaction.rollback();
} catch (ignore) {
// ignore, because 'rollback' will already print the error before killing the connection
}

throw err;
}

await transaction.commit();
return result;
});
}

Expand Down
50 changes: 38 additions & 12 deletions src/transaction.js
Expand Up @@ -57,10 +57,15 @@ class Transaction {
}

try {
return await this.sequelize.getQueryInterface().commitTransaction(this, this.options);
await this.sequelize.getQueryInterface().commitTransaction(this, this.options);
this.cleanup();
} catch (e) {
console.warn(`Committing transaction ${this.id} failed with error ${JSON.stringify(e.message)}. We are killing its connection as it is now in an undetermined state.`);
await this.forceCleanup();

throw e;
} finally {
this.finished = 'commit';
this.cleanup();
for (const hook of this._afterCommitHooks) {
await hook.apply(this, [this]);
}
Expand All @@ -82,12 +87,17 @@ class Transaction {
}

try {
return await this
await this
.sequelize
.getQueryInterface()
.rollbackTransaction(this, this.options);
} finally {

this.cleanup();
} catch (e) {
console.warn(`Rolling back transaction ${this.id} failed with error ${JSON.stringify(e.message)}. We are killing its connection as it is now in an undetermined state.`);
await this.forceCleanup();

throw e;
}
}

Expand All @@ -98,13 +108,9 @@ class Transaction {
* @param {boolean} useCLS Defaults to true: Use CLS (Continuation Local Storage) with Sequelize. With CLS, all queries within the transaction callback will automatically receive the transaction object.
* @returns {Promise}
*/
async prepareEnvironment(useCLS) {
async prepareEnvironment(useCLS = true) {
let connectionPromise;

if (useCLS === undefined) {
useCLS = true;
}

if (this.parent) {
connectionPromise = Promise.resolve(this.parent.connection);
} else {
Expand All @@ -131,6 +137,7 @@ class Transaction {
}
}

// TODO (@ephys) [>=7.0.0]: move this inside of sequelize.transaction, remove parameter.
if (useCLS && this.sequelize.constructor._cls) {
this.sequelize.constructor._cls.set('transaction', this);
}
Expand Down Expand Up @@ -163,12 +170,31 @@ class Transaction {
cleanup() {
// Don't release the connection if there's a parent transaction or
// if we've already cleaned up
if (this.parent || this.connection.uuid === undefined) return;
if (this.parent || this.connection.uuid === undefined) {
return;
}

this._clearCls();
this.sequelize.connectionManager.releaseConnection(this.connection);
this.connection.uuid = undefined;
}

/**
* Kills the connection this transaction uses.
* Used as a last resort, for instance because COMMIT or ROLLBACK resulted in an error
* and the transaction is left in a broken state,
* and releasing the connection to the pool would be dangerous.
*/
async forceCleanup() {
// Don't release the connection if there's a parent transaction or
// if we've already cleaned up
if (this.parent || this.connection.uuid === undefined) {
return;
}

this._clearCls();
const res = this.sequelize.connectionManager.releaseConnection(this.connection);
await this.sequelize.connectionManager.destroyConnection(this.connection);
this.connection.uuid = undefined;
return res;
}

_clearCls() {
Expand Down

0 comments on commit e1a9c28

Please sign in to comment.