Skip to content

Commit

Permalink
use /1/batch/<dataset> endpoint (#16)
Browse files Browse the repository at this point in the history
A pretty simple pair of changes, masked by a lot of additional churn.

Adds a class called BatchEndpointAggregator to transmission.js that groups events by apiHost/writeKey/dataset. It builds the JSON batch payload manually so that event post data (which has already been JSON.stringify'd before it reaches transmission) is not double-encoded.

The other part of the change is in the Transmission#_sendBatch method. Once a batch of events is chosen (through the timeout or event-count trigger), it is aggregated by batch endpoint. We then create one promise per aggregated batch to send that batch's list of events.

The bulk of the test changes are to deal with the fact that with the batch endpoint supported, we get multiple responses at once.
  • Loading branch information
toshok committed Jun 16, 2017
1 parent 9c06dd5 commit 07ab2ff
Show file tree
Hide file tree
Showing 4 changed files with 318 additions and 103 deletions.
5 changes: 2 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,15 @@
"devDependencies": {
"babel-preset-es2015": "^6.9.0",
"babel-register": "^6.11.6",
"esdoc": "^0.4.8",
"esdoc": "^0.5.2",
"gulp": "^3.9.1",
"gulp-babel": "^6.1.2",
"gulp-esdoc": "^0.3.0",
"gulp-gh-pages": "^0.5.4",
"gulp-replace": "^0.5.4",
"jsdoc": "^3.4.0",
"mocha": "^3.0.2",
"superagent-mocker": "^0.5.2",
"esdoc": "^0.5.2"
"superagent-mocker": "^0.5.2"
},
"dependencies": {
"superagent": "^2.3.0",
Expand Down
41 changes: 20 additions & 21 deletions src/libhoney.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
// Use of this source code is governed by the Apache License 2.0
// license that can be found in the LICENSE file.

// jshint esversion: 6
/**
* @module
*/
import Transmission from './transmission';
import { Transmission, ValidatedEvent } from './transmission';
import Builder from './builder';

import { EventEmitter } from 'events';
Expand Down Expand Up @@ -75,7 +76,7 @@ export default class Libhoney extends EventEmitter {
super();
this._options = Object.assign({ responseCallback: this._responseCallback.bind(this) }, defaults, opts);
this._transmission = getAndInitTransmission(this._options.transmission, this._options);
this._usable = this._transmission != null;
this._usable = this._transmission !== null;
this._builder = new Builder(this);

this._builder.apiHost = this._options.apiHost;
Expand All @@ -86,12 +87,12 @@ export default class Libhoney extends EventEmitter {
this._responseQueue = [];
}

_responseCallback (response) {
_responseCallback (responses) {
let queue = this._responseQueue;
if (queue.length < this._options.maxResponseQueueSize) {
queue.push(response);
this._responseQueue = this._responseQueue.concat(responses);
}
this.emit("response", queue);
this.emit("response", this._responseQueue);
}

/**
Expand Down Expand Up @@ -230,59 +231,57 @@ export default class Libhoney extends EventEmitter {
* @private
*/
validateEvent (event) {
if (!this._usable) return;
if (!this._usable) return null;

var timestamp = event.timestamp || Date.now();
if (typeof timestamp === 'string' || typeof timestamp === 'number')
timestamp = new Date(timestamp);

if (typeof event.data !== 'object' || event.data === null) {
console.error(".data must be an object");
return;
return null;
}
var postData;
try {
postData = JSON.stringify(event.data);
}
catch (e) {
console.error("error converting event data to JSON: " + e);
return;
return null;
}

var apiHost = event.apiHost;
if (typeof apiHost !== 'string' || apiHost === "") {
console.error(".apiHost must be a non-empty string");
return;
return null;
}

var writeKey = event.writeKey;
if (typeof writeKey !== 'string' || writeKey === "") {
console.error(".writeKey must be a non-empty string");
return;
return null;
}

var dataset = event.dataset;
if (typeof dataset !== 'string' || dataset === "") {
console.error(".dataset must be a non-empty string");
return;
return null;
}

var sampleRate = event.sampleRate;
if (typeof sampleRate !== 'number') {
console.error(".sampleRate must be a number");
return;
return null;
}

var metadata = event.metadata;
return {
timestamp,
apiHost,
postData,
writeKey,
dataset,
sampleRate,
metadata
};
return new ValidatedEvent({timestamp,
apiHost,
postData,
writeKey,
dataset,
sampleRate,
metadata});
}

/**
Expand Down
155 changes: 138 additions & 17 deletions src/transmission.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
// Use of this source code is governed by the Apache License 2.0
// license that can be found in the LICENSE file.

// jshint esversion: 6
/* global require, window, global */

/**
* @module
*/
let superagent = require('superagent');
import superagent from 'superagent';
import urljoin from 'urljoin';

const userAgent = "libhoney-js/LIBHONEY_JS_VERSION";
Expand All @@ -34,10 +35,95 @@ const eachPromise = (arr, iteratorFn) =>
});
}, Promise.resolve());

/**
 * Groups the elements of `arr` into buckets keyed by `keyfn(element)`.
 *
 * The first element seen for a key creates the bucket via `createfn(element)`;
 * that element is NOT additionally passed to `addfn`. Every later element with
 * the same key is handed to `addfn(bucket, element)`.
 *
 * @param {Array} arr - elements to group (iterated with `forEach`).
 * @param {Function} keyfn - maps an element to its bucket key.
 * @param {Function} createfn - builds a new bucket from the first element of a key.
 * @param {Function} addfn - adds a subsequent element to an existing bucket.
 * @returns {Object} prototype-less object mapping each key to its bucket.
 */
const partition = (arr, keyfn, createfn, addfn) => {
  // No prototype, so keys such as "constructor" cannot clash with inherited members.
  const buckets = Object.create(null);

  arr.forEach((item) => {
    const bucketKey = keyfn(item);
    const existing = buckets[bucketKey];

    if (existing) {
      addfn(existing, item);
      return;
    }

    buckets[bucketKey] = createfn(item);
  });

  return buckets;
};

/**
 * Groups a list of events into batches that share the same
 * apiHost / writeKey / dataset, and encodes a batch's events as a JSON array.
 */
class BatchEndpointAggregator {
  /**
   * @param {Array} events - events exposing `apiHost`, `writeKey` and `dataset`.
   *
   * After construction, `this.batches` maps an opaque grouping key to
   * `{ apiHost, writeKey, dataset, events }`. Consumers should iterate the
   * values; the key format is an implementation detail.
   */
  constructor(events) {
    this.batches = partition(events,
      /* keyfn */
      // Encode the tuple as JSON instead of joining with "_": a plain join is
      // ambiguous when a field itself contains "_" (writeKey "k_x" + dataset
      // "d" vs. writeKey "k" + dataset "x_d"), which would merge events bound
      // for different endpoints into one batch.
      (ev) => JSON.stringify([ev.apiHost, ev.writeKey, ev.dataset]),
      /* createfn */
      (ev) => ({
        apiHost: ev.apiHost,
        writeKey: ev.writeKey,
        dataset: ev.dataset,
        events: [ev]
      }),
      /* addfn */
      (batch, ev) => batch.events.push(ev));
  }

  /**
   * Encodes `events` as the text of a JSON array.
   *
   * Each event is encoded by calling its own `toJSON()`. An event whose
   * `toJSON()` throws is left out of the output and the thrown value is
   * recorded on the event as `ev.encodeError`.
   *
   * @param {Array} events - events to encode.
   * @returns {{encoded: string, numEncoded: number}} the JSON array text and
   *   the number of events that were successfully encoded into it.
   */
  encodeBatchEvents (events) {
    const encodedEvents = [];

    for (const ev of events) {
      try {
        // directly call toJSON, not JSON.stringify, because the latter wraps
        // the already-encoded string in an additional set of quotes
        encodedEvents.push(String(ev.toJSON()));
      } catch (e) {
        ev.encodeError = e;
      }
    }

    return {
      encoded: `[${encodedEvents.join(",")}]`,
      numEncoded: encodedEvents.length
    };
  }
}

/**
* @private
*/
export default class Transmission {
/**
 * Plain holder for the fields of an event, with a `toJSON()` that produces
 * the event's JSON object text.
 */
export class ValidatedEvent {
  /**
   * Copies the seven named fields from the options object onto the instance.
   *
   * @param {Object} fields - object providing `timestamp`, `apiHost`,
   *   `postData`, `writeKey`, `dataset`, `sampleRate` and `metadata`.
   */
  constructor({ timestamp,
                apiHost,
                postData,
                writeKey,
                dataset,
                sampleRate,
                metadata }) {
    Object.assign(this, {
      timestamp,
      apiHost,
      postData,
      writeKey,
      dataset,
      sampleRate,
      metadata
    });
  }

  /**
   * Builds the JSON object text for this event by hand.
   *
   * Emits `"time"`, `"samplerate"` and `"data"` members, in that order, each
   * only when the corresponding field is truthy. `postData` is spliced in
   * verbatim (it is not passed through JSON.stringify here).
   *
   * @returns {string} JSON object text, `{}` when no field is set.
   */
  toJSON() {
    const members = [];
    const append = (name, serialized) => {
      members.push(`"${name}":${serialized}`);
    };

    if (this.timestamp) {
      append("time", JSON.stringify(this.timestamp));
    }
    if (this.sampleRate) {
      append("samplerate", JSON.stringify(this.sampleRate));
    }
    if (this.postData) {
      // postData is inserted as-is rather than re-encoded
      append("data", this.postData);
    }

    return "{" + members.join(",") + "}";
  }
}

/**
* @private
*/
export class Transmission {

constructor (options) {
this._responseCallback = emptyResponseCallback;
Expand Down Expand Up @@ -71,10 +157,10 @@ export default class Transmission {
}

_droppedCallback(ev, reason) {
this._responseCallback({
this._responseCallback([{
metadata: ev.metadata,
error: new Error(reason)
});
}]);
}

sendEvent (ev) {
Expand Down Expand Up @@ -113,6 +199,8 @@ export default class Transmission {

var batch = this._eventQueue.splice(0, this._batchSizeTrigger);

let batchAgg = new BatchEndpointAggregator(batch);

let finishBatch = () => {
this._batchCount--;

Expand All @@ -126,28 +214,61 @@ export default class Transmission {
}
};

eachPromise(batch, (ev) => {
var url = urljoin(ev.apiHost, "/1/events", ev.dataset);
let batches = Object.keys(batchAgg.batches).map((k) => batchAgg.batches[k]);
eachPromise(batches, (batch) => {
var url = urljoin(batch.apiHost, "/1/batch", batch.dataset);
var req = superagent.post(url);

let { encoded, numEncoded } = batchAgg.encodeBatchEvents(batch.events);
return new Promise( (resolve) => {

// if we failed to encode any of the events, no point in sending anything to honeycomb
if (numEncoded === 0) {
this._responseCallback(batch.events.map((ev) => ({
metadata: ev.metadata,
error: ev.encodeError
})));
resolve();
return;
}

var start = Date.now();
req
.set('X-Hny-Team', ev.writeKey)
.set('X-Hny-Samplerate', ev.sampleRate)
.set('X-Hny-Event-Time', ev.timestamp.toISOString())
.set('X-Hny-Team', batch.writeKey)
.set('User-Agent', userAgent)
.type("json")
.send(ev.postData)
.send(encoded)
.end((err, res) => {
// call a callback here (in our init options) so it can be used both in the node, browser, and worker contexts.
this._responseCallback({
status_code: res ? res.status : err.status,
duration: Date.now() - start,
metadata: ev.metadata,
error: err
});
let end = Date.now();

if (err) {
this._responseCallback(batch.events.map((ev) => ({
status_code: ev.encodeError ? undefined : err.status,
duration: end - start,
metadata: ev.metadata,
error: ev.encodeError || err
})));
} else {
let response = JSON.parse(res.text);
let respIdx = 0;
this._responseCallback(batch.events.map((ev) => {
if (ev.encodeError) {
return {
duration: end - start,
metadata: ev.metadata,
error: ev.encodeError
};
} else {
let res = response[respIdx++];
return {
status_code: res.status,
duration: end - start,
metadata: ev.metadata,
error: res.err
};
}
}));
}
// we resolve unconditionally to continue the iteration in eachSeries. errors will cause
// the event to be re-enqueued/dropped.
resolve();
Expand Down

0 comments on commit 07ab2ff

Please sign in to comment.