Skip to content

Commit

Permalink
Process passed Oracle connection (#4757)
Browse files Browse the repository at this point in the history
  • Loading branch information
kibertoad committed Oct 20, 2021
1 parent 3e53c03 commit bfdece3
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 193 deletions.
1 change: 1 addition & 0 deletions lib/builder-interface-augmenter.js
Expand Up @@ -56,6 +56,7 @@ function augmentWithBuilderInterface(Target) {
// Sets an explicit "connection" we wish to use for this query.
Target.prototype.connection = function (connection) {
this._connection = connection;
this.client.processPassedConnection(connection);
return this;
};

Expand Down
4 changes: 4 additions & 0 deletions lib/client.js
Expand Up @@ -438,6 +438,10 @@ class Client extends EventEmitter {

return this.parameter(values, builder, bindingsHolder);
}

processPassedConnection(connection) {
// Default implementation is noop
}
}

Object.assign(Client.prototype, {
Expand Down
202 changes: 10 additions & 192 deletions lib/dialects/oracledb/index.js
@@ -1,7 +1,5 @@
// Oracledb Client
// -------
const { promisify } = require('util');
const stream = require('stream');
const each = require('lodash/each');
const flatten = require('lodash/flatten');
const isEmpty = require('lodash/isEmpty');
Expand All @@ -12,9 +10,13 @@ const Formatter = require('../../formatter');
const QueryCompiler = require('./query/oracledb-querycompiler');
const TableCompiler = require('./schema/oracledb-tablecompiler');
const ColumnCompiler = require('./schema/oracledb-columncompiler');
const {
BlobHelper,
ReturningHelper,
monkeyPatchConnection,
} = require('./utils');
const ViewCompiler = require('./schema/oracledb-viewcompiler');
const ViewBuilder = require('./schema/oracledb-viewbuilder');
const { BlobHelper, ReturningHelper, isConnectionError } = require('./utils');
const Transaction = require('./transaction');
const Client_Oracle = require('../oracle');
const { isString } = require('../../util/is');
Expand Down Expand Up @@ -149,141 +151,8 @@ class Client_Oracledb extends Client_Oracle {
if (err) {
return rejecter(err);
}
connection.commitAsync = function () {
return new Promise((commitResolve, commitReject) => {
this.commit(function (err) {
if (err) {
return commitReject(err);
}
commitResolve();
});
});
};
connection.rollbackAsync = function () {
return new Promise((rollbackResolve, rollbackReject) => {
this.rollback(function (err) {
if (err) {
return rollbackReject(err);
}
rollbackResolve();
});
});
};
const fetchAsync = promisify(function (sql, bindParams, options, cb) {
options = options || {};
options.outFormat =
client.driver.OUT_FORMAT_OBJECT || client.driver.OBJECT;
if (!options.outFormat) {
throw new Error('not found oracledb.outFormat constants');
}
if (options.resultSet) {
connection.execute(
sql,
bindParams || [],
options,
function (err, result) {
if (err) {
if (isConnectionError(err)) {
connection.close().catch(function (err) {});
connection.__knex__disposed = err;
}
return cb(err);
}
const fetchResult = { rows: [], resultSet: result.resultSet };
const numRows = 100;
const fetchRowsFromRS = function (
connection,
resultSet,
numRows
) {
resultSet.getRows(numRows, function (err, rows) {
if (err) {
if (isConnectionError(err)) {
connection.close().catch(function (err) {});
connection.__knex__disposed = err;
}
resultSet.close(function () {
return cb(err);
});
} else if (rows.length === 0) {
return cb(null, fetchResult);
} else if (rows.length > 0) {
if (rows.length === numRows) {
fetchResult.rows = fetchResult.rows.concat(rows);
fetchRowsFromRS(connection, resultSet, numRows);
} else {
fetchResult.rows = fetchResult.rows.concat(rows);
return cb(null, fetchResult);
}
}
});
};
fetchRowsFromRS(connection, result.resultSet, numRows);
}
);
} else {
connection.execute(
sql,
bindParams || [],
options,
function (err, result) {
if (err) {
// dispose the connection on connection error
if (isConnectionError(err)) {
connection.close().catch(function (err) {});
connection.__knex__disposed = err;
}
return cb(err);
}

return cb(null, result);
}
);
}
});
connection.executeAsync = function (sql, bindParams, options) {
// Read all lob
return fetchAsync(sql, bindParams, options).then(async (results) => {
const closeResultSet = () => {
return results.resultSet
? promisify(results.resultSet.close).call(results.resultSet)
: Promise.resolve();
};

// Collect LOBs to read
const lobs = [];
if (results.rows) {
if (Array.isArray(results.rows)) {
for (let i = 0; i < results.rows.length; i++) {
// Iterate through the rows
const row = results.rows[i];
for (const column in row) {
if (row[column] instanceof stream.Readable) {
lobs.push({ index: i, key: column, stream: row[column] });
}
}
}
}
}

try {
for (const lob of lobs) {
// todo should be fetchAsString/fetchAsBuffer polyfill only
results.rows[lob.index][lob.key] = await lobProcessing(
lob.stream
);
}
} catch (e) {
await closeResultSet().catch(() => {});
monkeyPatchConnection(connection, client);

throw e;
}

await closeResultSet();

return results;
});
};
resolver(connection);
});
});
Expand Down Expand Up @@ -432,6 +301,10 @@ class Client_Oracledb extends Client_Oracle {
return response;
}
}

processPassedConnection(connection) {
monkeyPatchConnection(connection, this);
}
}

Client_Oracledb.prototype.driverName = 'oracledb';
Expand All @@ -454,59 +327,4 @@ function resolveConnectString(connectionSettings) {
);
}

/**
* @param stream
* @param {'string' | 'buffer'} type
*/
function readStream(stream, type) {
return new Promise((resolve, reject) => {
let data = type === 'string' ? '' : Buffer.alloc(0);

stream.on('error', function (err) {
reject(err);
});
stream.on('data', function (chunk) {
if (type === 'string') {
data += chunk;
} else {
data = Buffer.concat([data, chunk]);
}
});
stream.on('end', function () {
resolve(data);
});
});
}

const lobProcessing = function (stream) {
const oracledb = require('oracledb');

/**
* @type 'string' | 'buffer'
*/
let type;

if (stream.type) {
// v1.2-v4
if (stream.type === oracledb.BLOB) {
type = 'buffer';
} else if (stream.type === oracledb.CLOB) {
type = 'string';
}
} else if (stream.iLob) {
// v1
if (stream.iLob.type === oracledb.CLOB) {
type = 'string';
} else if (stream.iLob.type === oracledb.BLOB) {
type = 'buffer';
}
} else {
throw new Error('Unrecognized oracledb lob stream type');
}
if (type === 'string') {
stream.setEncoding('utf-8');
}
return readStream(stream, type);
};

module.exports = Client_Oracledb;

0 comments on commit bfdece3

Please sign in to comment.