Skip to content

Commit

Permalink
Support SELECT .. FOR NO KEY UPDATE / KEY SHARE row level locking cla…
Browse files Browse the repository at this point in the history
…uses in Postgres (#4755)
  • Loading branch information
domkck committed Oct 23, 2021
1 parent 58de7a9 commit a17cc32
Show file tree
Hide file tree
Showing 9 changed files with 162 additions and 9 deletions.
22 changes: 14 additions & 8 deletions lib/dialects/postgres/query/pg-querycompiler.js
Expand Up @@ -147,20 +147,26 @@ class QueryCompiler_PG extends QueryCompiler {
return sql.join(', ');
}

forUpdate() {
_lockingClause(lockMode) {
const tables = this.single.lockTables || [];

return (
'for update' + (tables.length ? ' of ' + this._tableNames(tables) : '')
);
return lockMode + (tables.length ? ' of ' + this._tableNames(tables) : '');
}

forUpdate() {
return this._lockingClause('for update');
}

forShare() {
const tables = this.single.lockTables || [];
return this._lockingClause('for share');
}

forNoKeyUpdate() {
return this._lockingClause('for no key update');
}

return (
'for share' + (tables.length ? ' of ' + this._tableNames(tables) : '')
);
forKeyShare() {
return this._lockingClause('for key share');
}

skipLocked() {
Expand Down
12 changes: 12 additions & 0 deletions lib/dialects/redshift/query/redshift-querycompiler.js
Expand Up @@ -61,6 +61,18 @@ class QueryCompiler_Redshift extends QueryCompiler_PG {
return '';
}

forNoKeyUpdate() {
this.client.logger.warn('table lock is not supported by redshift dialect');
return '';
}

forKeyShare() {
this.client.logger.warn(
'lock for share is not supported by redshift dialect'
);
return '';
}

// Compiles a columnInfo query
columnInfo() {
const column = this.single.columnInfo;
Expand Down
2 changes: 2 additions & 0 deletions lib/dialects/sqlite3/query/sqlite-querycompiler.js
Expand Up @@ -27,7 +27,9 @@ class QueryCompiler_SQLite3 extends QueryCompiler {

// The locks are not applicable in SQLite3
this.forShare = emptyStr;
this.forKeyShare = emptyStr;
this.forUpdate = emptyStr;
this.forNoKeyUpdate = emptyStr;
}

// SQLite requires us to build the multi-row insert as a listing of select with
Expand Down
2 changes: 2 additions & 0 deletions lib/query/constants.js
Expand Up @@ -5,6 +5,8 @@ module.exports = {
lockMode: {
forShare: 'forShare',
forUpdate: 'forUpdate',
forNoKeyUpdate: 'forNoKeyUpdate',
forKeyShare: 'forKeyShare',
},
waitMode: {
skipLocked: 'skipLocked',
Expand Down
21 changes: 20 additions & 1 deletion lib/query/querybuilder.js
Expand Up @@ -46,7 +46,12 @@ const CLEARABLE_STATEMENTS = new Set([
'counter',
'counters',
]);
const LOCK_MODES = new Set([lockMode.forShare, lockMode.forUpdate]);
const LOCK_MODES = new Set([
lockMode.forShare,
lockMode.forUpdate,
lockMode.forNoKeyUpdate,
lockMode.forKeyShare,
]);

// Typically called from `knex.builder`,
// start a new query building chain.
Expand Down Expand Up @@ -1198,6 +1203,20 @@ class Builder extends EventEmitter {
return this;
}

// Set a lock for no key update constraint.
forNoKeyUpdate(...tables) {
this._single.lock = lockMode.forNoKeyUpdate;
this._single.lockTables = tables;
return this;
}

// Set a lock for key share constraint.
forKeyShare(...tables) {
this._single.lock = lockMode.forKeyShare;
this._single.lockTables = tables;
return this;
}

// Skips locked rows when using a lock constraint.
skipLocked() {
if (!this._isSelectQuery()) {
Expand Down
72 changes: 72 additions & 0 deletions test/integration2/query/select/selects.spec.js
Expand Up @@ -23,6 +23,7 @@ const {
createTestTableTwo,
dropTables,
createDefaultTable,
createParentAndChildTables,
} = require('../../../util/tableCreatorHelper');
const { insertAccounts } = require('../../../util/dataInsertHelper');
const { assertNumberArrayStrict } = require('../../../util/assertHelper');
Expand Down Expand Up @@ -1773,6 +1774,77 @@ describe('Selects', function () {
});
});

it('select for no key update doesnt stop other transactions from inserting into tables that have a foreign key relationship', async function () {
if (!isPostgreSQL(knex)) {
return this.skip();
}

await createParentAndChildTables(knex);

return knex('parent')
.insert({
id: 1,
})
.then(() => {
return knex('child')
.insert({
id: 1,
parent_id: 1,
})
.then(() => {
return knex.transaction((trx) => {
// select all from the parent table in the for no key update mode
return trx('parent')
.forNoKeyUpdate()
.then((res) => {
// Insert should into the child table not hang
return knex('child')
.insert({
id: 2,
parent_id: 1,
})
.timeout(150);
});
});
});
});
});

it('select for key share blocks select for update but not select for no key update', async function () {
if (!isPostgreSQL(knex)) {
return this.skip();
}

return knex('test_default_table')
.insert({ string: 'making sure there is a row to lock' })
.then(() => {
return knex
.transaction((trx) => {
// select all from test table and lock
return trx('test_default_table')
.forKeyShare()
.then((res) => {
// trying to select stuff from table in other connection should succeed with for no key update
return knex('test_default_table')
.forNoKeyUpdate()
.timeout(200);
})
.then((res) => {
// trying to select stuff from table in other connection should hang with for update
return knex('test_default_table').forUpdate().timeout(100);
});
})
.then((res) => {
expect('Second query should have timed out').to.be.false;
})
.catch((err) => {
expect(err.message).to.be.contain(
'Defined query timeout of 100ms exceeded when running query'
);
});
});
});

it('select for share prevents updating in other transaction', function () {
// Query cancellation is not yet implemented for CockroachDB
if (isSQLite(knex) || isOracle(knex) || isCockroachDB(knex)) {
Expand Down
21 changes: 21 additions & 0 deletions test/unit/query/builder.js
Expand Up @@ -6507,6 +6507,27 @@ describe('QueryBuilder', () => {
});
});

it('lock for no key update', () => {
testsql(
qb().select('*').from('foo').where('bar', '=', 'baz').forNoKeyUpdate(),
{
pg: {
sql: 'select * from "foo" where "bar" = ? for no key update',
bindings: ['baz'],
},
}
);
});

it('lock for key share', () => {
testsql(qb().select('*').from('foo').where('bar', '=', 'baz').forShare(), {

This comment has been minimized.

Copy link
@ekzyis

ekzyis Oct 25, 2021

shouldn't this be here forKeyShare() instead of forShare()? 🤔

pg: {
sql: 'select * from "foo" where "bar" = ? for share',
bindings: ['baz'],
},
});
});

it('should allow lock (such as forUpdate) outside of a transaction', () => {
testsql(qb().select('*').from('foo').where('bar', '=', 'baz').forUpdate(), {
mysql: {
Expand Down
13 changes: 13 additions & 0 deletions test/util/tableCreatorHelper.js
Expand Up @@ -85,6 +85,16 @@ async function createCompositeKeyTable(knex) {
});
}

async function createParentAndChildTables(knex) {
await knex.schema.createTable('parent', (table) => {
table.integer('id').primary();
});
await knex.schema.createTable('child', (table) => {
table.integer('id').primary();
table.integer('parent_id').references('parent.id');
});
}

async function dropTables(knex) {
await knex.schema.dropTableIfExists('accounts');
await knex.schema.dropTableIfExists('users');
Expand All @@ -95,6 +105,8 @@ async function dropTables(knex) {
await knex.schema.dropTableIfExists('datatype_test');
await knex.schema.dropTableIfExists('test_default_table');
await knex.schema.dropTableIfExists('test_default_table2');
await knex.schema.dropTableIfExists('child');
await knex.schema.dropTableIfExists('parent');
}

module.exports = {
Expand All @@ -104,5 +116,6 @@ module.exports = {
createDefaultTable,
createUsers,
createTestTableTwo,
createParentAndChildTables,
dropTables,
};
6 changes: 6 additions & 0 deletions types/index.d.ts
Expand Up @@ -1740,6 +1740,12 @@ export declare namespace Knex {
forShare(...tableNames: string[]): QueryBuilder<TRecord, TResult>;
forShare(tableNames: readonly string[]): QueryBuilder<TRecord, TResult>;

forNoKeyUpdate(...tableNames: string[]): QueryBuilder<TRecord, TResult>;
forNoKeyUpdate(tableNames: readonly string[]): QueryBuilder<TRecord, TResult>;

forKeyShare(...tableNames: string[]): QueryBuilder<TRecord, TResult>;
forKeyShare(tableNames: readonly string[]): QueryBuilder<TRecord, TResult>;

skipLocked(): QueryBuilder<TRecord, TResult>;
noWait(): QueryBuilder<TRecord, TResult>;

Expand Down

0 comments on commit a17cc32

Please sign in to comment.