Skip to content

Commit

Permalink
fix: end child spans correctly in pg (#930)
Browse files Browse the repository at this point in the history
  • Loading branch information
kjin committed Jan 22, 2019
1 parent bc98aa3 commit 1a20b7c
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 83 deletions.
214 changes: 144 additions & 70 deletions src/plugins/plugin-pg.ts
Expand Up @@ -16,23 +16,18 @@

import {EventEmitter} from 'events';
import * as shimmer from 'shimmer';
import {Readable} from 'stream';

import {Patch, Plugin, Span} from '../plugin-types';
import {Patch, Plugin, Span, Tracer} from '../plugin-types';

import {pg_6, pg_7} from './types';

// TS: Client#query also accepts a callback as a last argument, but TS cannot
// detect this as it's a dependent type. So we don't specify it here.
type ClientQueryArguments =
[{submit?: Function} & pg_7.QueryConfig]|[string]|[string, {}];
[Submittable & pg_7.QueryConfig]|[string]|[string, {}];
type PG7QueryReturnValue = (pg_7.QueryConfig&({submit: Function}&EventEmitter)|
pg_7.Query)|Promise<pg_7.QueryResult>;

// tslint:disable-next-line:no-any
function isSubmittable(obj: any): obj is {submit: Function} {
return typeof obj.submit === 'function';
}
type Callback<T> = (err: Error|null, res?: T) => void;

const noOp = () => {};

Expand Down Expand Up @@ -66,46 +61,146 @@ function populateLabelsFromOutputs(
}
}

/**
* Partial shape of objects returned by Client#query. Only contains methods that
* are significant to the query lifecycle.
*/
type Submittable = {
// Called when the query is completed.
handleReadyForQuery: () => void;
// Called when an error occurs.
handleError: () => void;
// A field that is populated when the Submittable is a Query object.
_result?: pg_7.QueryResult;
};

/**
* Utility class to help organize patching logic.
*/
class PostgresPatchUtility {
readonly maybePopulateLabelsFromInputs: typeof populateLabelsFromInputs;
readonly maybePopulateLabelsFromOutputs: typeof populateLabelsFromOutputs;

constructor(private readonly tracer: Tracer) {
this.maybePopulateLabelsFromInputs =
tracer.enhancedDatabaseReportingEnabled() ? populateLabelsFromInputs :
noOp;
this.maybePopulateLabelsFromOutputs =
tracer.enhancedDatabaseReportingEnabled() ? populateLabelsFromOutputs :
noOp;
}

patchSubmittable(pgQuery: Submittable, span: Span): Submittable {
let spanEnded = false;
const {maybePopulateLabelsFromOutputs} = this;
if (pgQuery.handleError) {
shimmer.wrap(pgQuery, 'handleError', (origCallback) => {
// Elements of args are not individually accessed.
// tslint:disable:no-any
return this.tracer.wrap(function(
this: Submittable, ...args: any[]): void {
// tslint:enable:no-any
if (!spanEnded) {
const err: Error = args[0];
maybePopulateLabelsFromOutputs(span, err);
span.endSpan();
spanEnded = true;
}
if (origCallback) {
origCallback.apply(this, args);
}
});
});
}
if (pgQuery.handleReadyForQuery) {
shimmer.wrap(pgQuery, 'handleReadyForQuery', (origCallback) => {
// Elements of args are not individually accessed.
// tslint:disable:no-any
return this.tracer.wrap(function(
this: Submittable, ...args: any[]): void {
// tslint:enable:no-any
if (!spanEnded) {
maybePopulateLabelsFromOutputs(span, null, this._result);
span.endSpan();
spanEnded = true;
}
if (origCallback) {
origCallback.apply(this, args);
}
});
});
}
return pgQuery;
}

patchCallback(callback: Callback<pg_7.QueryResult>, span: Span):
Callback<pg_7.QueryResult> {
return this.tracer.wrap((err: Error|null, res?: pg_7.QueryResult) => {
this.maybePopulateLabelsFromOutputs(span, err, res);
span.endSpan();
callback(err, res);
});
}

patchPromise(promise: Promise<pg_7.QueryResult>, span: Span):
Promise<pg_7.QueryResult> {
return promise = promise.then(
(res) => {
this.maybePopulateLabelsFromOutputs(span, null, res);
span.endSpan();
return res;
},
(err) => {
this.maybePopulateLabelsFromOutputs(span, err);
span.endSpan();
throw err;
});
}
}

const plugin: Plugin = [
{
file: 'lib/client.js',
versions: '^6.x',
// TS: Client is a class name.
// tslint:disable-next-line:variable-name
patch: (Client, api) => {
const maybePopulateLabelsFromInputs =
api.enhancedDatabaseReportingEnabled() ? populateLabelsFromInputs :
noOp;
const maybePopulateLabelsFromOutputs =
api.enhancedDatabaseReportingEnabled() ? populateLabelsFromOutputs :
noOp;
const pgPatch = new PostgresPatchUtility(api);

shimmer.wrap(Client.prototype, 'query', (query) => {
return function query_trace(this: pg_6.Client) {
const span = api.createChildSpan({name: 'pg-query'});
if (!api.isRealSpan(span)) {
return query.apply(this, arguments);
}
const argLength = arguments.length;
if (argLength >= 1) {
const args: ClientQueryArguments =
Array.prototype.slice.call(arguments, 0);
// Every call to Client#query will have a Submittable object associated
// with it. We need to patch two handlers (handleReadyForQuery and
// handleError) to end a span.
// There are a few things to note here:
// * query accepts a Submittable or a string. A Query is a Submittable.
// So if we can get a Submittable from the input we patch it
// proactively, otherwise (in the case of a string) we patch the
// output Query instead.
// * If query is passed a callback, the callback will be invoked from
// either handleReadyForQuery or handleError. So we don't need to
// separately patch the callback.
return function query_trace(
this: pg_6.Client, ...args: ClientQueryArguments) {
if (args.length >= 1) {
const span = api.createChildSpan({name: 'pg-query'});
if (!api.isRealSpan(span)) {
return query.apply(this, args);
}
// Extract query text and values, if needed.
maybePopulateLabelsFromInputs(span, args);
pgPatch.maybePopulateLabelsFromInputs(span, args);
if (typeof args[0] === 'object') {
pgPatch.patchSubmittable(args[0], span);
return query.apply(this, args);
} else {
return pgPatch.patchSubmittable(
query.apply(this, args) as Submittable, span);
}
} else {
// query was called with no arguments.
// This doesn't make sense, but don't do anything that might cause
// an error to get thrown here, or a span to be started.
return query.apply(this, args);
}
const pgQuery: pg_6.QueryReturnValue = query.apply(this, arguments);
api.wrapEmitter(pgQuery);
const done = pgQuery.callback;
// TODO(kjin): Clean up this line a little bit by casting the function
// passed to api.wrap as a NonNullable<typeof done>.
pgQuery.callback =
api.wrap((err: Error|null, res?: pg_7.QueryResult) => {
maybePopulateLabelsFromOutputs(span, err, res);
span.endSpan();
if (done) {
done(err, res);
}
});
return pgQuery;
};
});
},
Expand All @@ -121,12 +216,7 @@ const plugin: Plugin = [
// TS: Client is a class name.
// tslint:disable-next-line:variable-name
patch: (Client, api) => {
const maybePopulateLabelsFromInputs =
api.enhancedDatabaseReportingEnabled() ? populateLabelsFromInputs :
noOp;
const maybePopulateLabelsFromOutputs =
api.enhancedDatabaseReportingEnabled() ? populateLabelsFromOutputs :
noOp;
const pgPatch = new PostgresPatchUtility(api);
shimmer.wrap(Client.prototype, 'query', (query) => {
return function query_trace(this: pg_7.Client) {
const span = api.createChildSpan({name: 'pg-query'});
Expand All @@ -151,25 +241,18 @@ const plugin: Plugin = [
Array.prototype.slice.call(arguments, 0);

// Extract query text and values, if needed.
maybePopulateLabelsFromInputs(span, args);
pgPatch.maybePopulateLabelsFromInputs(span, args);

// If we received a callback, bind it to the current context,
// optionally adding labels as well.
const callback = args[args.length - 1];
if (typeof callback === 'function') {
args[args.length - 1] =
api.wrap((err: Error|null, res?: pg_7.QueryResult) => {
maybePopulateLabelsFromOutputs(span, err, res);
span.endSpan();
// TS: Type cast is safe as we know that callback is a
// Function.
(callback as (err: Error|null, res?: pg_7.QueryResult) =>
void)(err, res);
});
pgQuery = query.apply(this, args);
} else {
pgQuery = query.apply(this, arguments);
args[args.length - 1] = pgPatch.patchCallback(
callback as Callback<pg_7.QueryResult>, span);
} else if (typeof args[0] === 'object') {
pgPatch.patchSubmittable(args[0] as Submittable, span);
}
pgQuery = query.apply(this, args);
} else {
pgQuery = query.apply(this, arguments);
}
Expand All @@ -178,19 +261,10 @@ const plugin: Plugin = [
if (pgQuery instanceof EventEmitter) {
api.wrapEmitter(pgQuery);
} else if (typeof pgQuery.then === 'function') {
// Ensure that the span is ended, optionally adding labels as
// well.
pgQuery = pgQuery.then(
(res) => {
maybePopulateLabelsFromOutputs(span, null, res);
span.endSpan();
return res;
},
(err) => {
maybePopulateLabelsFromOutputs(span, err);
span.endSpan();
throw err;
});
// Unlike in pg 6, the returned value can't be both a Promise and
// a Submittable. So we don't run the risk of double-patching
// here.
pgPatch.patchPromise(pgQuery, span);
}
}
return pgQuery;
Expand Down
23 changes: 10 additions & 13 deletions test/plugins/test-trace-pg.ts
Expand Up @@ -46,22 +46,20 @@ pgVersions.forEach(pgVersion => {
client = c;
releaseClient = release;
assert(!err);
client.query('CREATE TABLE t (name text NOT NULL, id text NOT NULL)', [],
client.query('DROP TABLE t', [], function(err, res) {
assert(!err || err.code == '42P01'); // table "t" does not exist
client.query('CREATE TABLE t (name text NOT NULL, id text NOT NULL)', [],
function(err, res) {
assert(!err);
common.cleanTraces();
done();
});
});
});
});

afterEach(function(done) {
client.query('DROP TABLE t', [], function(err, res) {
assert(!err);
releaseClient();
common.cleanTraces();
done();
});
afterEach(() => {
releaseClient();
common.cleanTraces();
});

it('should perform basic operations', function(done) {
Expand Down Expand Up @@ -167,19 +165,18 @@ pgVersions.forEach(pgVersion => {

it('should work with generic Submittables', function(done) {
common.runInTransaction(function(endRootSpan) {
let submitCalled = false;
client.query({
submit: (connection) => {
// Indicate that the next item may be processed.
connection.emit('readyForQuery');
submitCalled = true;
},
handleReadyForQuery: () => {
endRootSpan();
common.getMatchingSpan(function (span) {
return span.name === 'pg-query';
});
done();
},
handleReadyForQuery: () => {}
}
});
});
});
Expand Down

0 comments on commit 1a20b7c

Please sign in to comment.