Skip to content

Commit c7bf217

Browse files
authoredSep 17, 2024··
fix(core): Fix nested stream events behavior (#6836)
1 parent ab2ba50 commit c7bf217

File tree

3 files changed

+73
-4
lines changed

3 files changed

+73
-4
lines changed
 

‎langchain-core/src/runnables/base.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -695,7 +695,7 @@ export abstract class Runnable<
695695
config.callbacks = callbacks.concat([logStreamCallbackHandler]);
696696
} else {
697697
const copiedCallbacks = callbacks.copy();
698-
copiedCallbacks.inheritableHandlers.push(logStreamCallbackHandler);
698+
copiedCallbacks.addHandler(logStreamCallbackHandler, true);
699699
// eslint-disable-next-line no-param-reassign
700700
config.callbacks = copiedCallbacks;
701701
}
@@ -896,7 +896,7 @@ export abstract class Runnable<
896896
config.callbacks = callbacks.concat(eventStreamer);
897897
} else {
898898
const copiedCallbacks = callbacks.copy();
899-
copiedCallbacks.inheritableHandlers.push(eventStreamer);
899+
copiedCallbacks.addHandler(eventStreamer, true);
900900
// eslint-disable-next-line no-param-reassign
901901
config.callbacks = copiedCallbacks;
902902
}

‎langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts

+70-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import { test, expect, afterEach } from "@jest/globals";
66
import { z } from "zod";
7+
import { AsyncLocalStorage } from "node:async_hooks";
78
import {
89
RunnableLambda,
910
RunnableMap,
@@ -28,8 +29,9 @@ import { DynamicStructuredTool, DynamicTool, tool } from "../../tools/index.js";
2829
import { Document } from "../../documents/document.js";
2930
import { PromptTemplate } from "../../prompts/prompt.js";
3031
import { GenerationChunk } from "../../outputs.js";
31-
// Import from web to avoid side-effects from AsyncLocalStorage
32+
// Import from web to avoid top-level side-effects from AsyncLocalStorage
3233
import { dispatchCustomEvent } from "../../callbacks/dispatch/web.js";
34+
import { AsyncLocalStorageProviderSingleton } from "../../singletons/index.js";
3335

3436
function reverse(s: string) {
3537
// Reverse a string.
@@ -138,6 +140,73 @@ test("Runnable streamEvents method on a chat model", async () => {
138140
]);
139141
});
140142

143+
test("Runnable streamEvents call nested in another runnable + passed callbacks should still work", async () => {
144+
AsyncLocalStorageProviderSingleton.initializeGlobalInstance(
145+
new AsyncLocalStorage()
146+
);
147+
148+
const model = new FakeListChatModel({
149+
responses: ["abc"],
150+
});
151+
152+
const events: any[] = [];
153+
const container = RunnableLambda.from(async (_) => {
154+
const eventStream = model.streamEvents("hello", { version: "v2" });
155+
for await (const event of eventStream) {
156+
events.push(event);
157+
}
158+
return events;
159+
});
160+
161+
await container.invoke({}, { callbacks: [{ handleLLMStart: () => {} }] });
162+
163+
// used here to avoid casting every ID
164+
const anyString = expect.any(String) as unknown as string;
165+
166+
expect(events).toMatchObject([
167+
{
168+
data: { input: "hello" },
169+
event: "on_chat_model_start",
170+
name: "FakeListChatModel",
171+
metadata: expect.any(Object),
172+
run_id: expect.any(String),
173+
tags: [],
174+
},
175+
{
176+
data: { chunk: new AIMessageChunk({ id: anyString, content: "a" }) },
177+
event: "on_chat_model_stream",
178+
name: "FakeListChatModel",
179+
metadata: expect.any(Object),
180+
run_id: expect.any(String),
181+
tags: [],
182+
},
183+
{
184+
data: { chunk: new AIMessageChunk({ id: anyString, content: "b" }) },
185+
event: "on_chat_model_stream",
186+
name: "FakeListChatModel",
187+
metadata: expect.any(Object),
188+
run_id: expect.any(String),
189+
tags: [],
190+
},
191+
{
192+
data: { chunk: new AIMessageChunk({ id: anyString, content: "c" }) },
193+
event: "on_chat_model_stream",
194+
name: "FakeListChatModel",
195+
metadata: expect.any(Object),
196+
run_id: expect.any(String),
197+
tags: [],
198+
},
199+
{
200+
data: { output: new AIMessageChunk({ id: anyString, content: "abc" }) },
201+
event: "on_chat_model_end",
202+
name: "FakeListChatModel",
203+
metadata: expect.any(Object),
204+
run_id: expect.any(String),
205+
tags: [],
206+
},
207+
]);
208+
});
209+
141210
test("Runnable streamEvents method with three runnables", async () => {
142211
const r = RunnableLambda.from(reverse);
143212

‎langchain-core/src/tracers/event_stream.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ export class EventStreamCallbackHandler extends BaseTracer {
364364
throw new Error(`onLLMNewToken: Run ID ${run.id} not found in run map.`);
365365
}
366366
// Top-level streaming events are covered by tapOutputIterable
367-
if (run.parent_run_id === undefined) {
367+
if (this.runInfoMap.size === 1) {
368368
return;
369369
}
370370
if (runInfo.runType === "chat_model") {

0 commit comments

Comments
 (0)
Please sign in to comment.