How to integrate Langchain4j LLM streaming in Langgraph4j¶
In [1]:
var userHomeDir = System.getProperty("user.home");
var localRespoUrl = "file://" + userHomeDir + "/.m2/repository/";
var langchain4jVersion = "1.9.1";
var langchain4jbeta = "1.9.1-beta17";
var langgraph4jVersion = "1.7-SNAPSHOT";
Remove installed packages from the Jupyter cache
In [2]:
%%bash
rm -rf \{userHomeDir}/Library/Jupyter/kernels/rapaio-jupyter-kernel/mima_cache/org/bsc/
In [3]:
%dependency /add-repo local \{localRespoUrl} release|never snapshot|always
// %dependency /list-repos
%dependency /add org.slf4j:slf4j-jdk14:2.0.9
%dependency /add org.bsc.langgraph4j:langgraph4j-core:\{langgraph4jVersion}
%dependency /add org.bsc.langgraph4j:langgraph4j-langchain4j:\{langgraph4jVersion}
%dependency /add dev.langchain4j:langchain4j:\{langchain4jVersion}
%dependency /add dev.langchain4j:langchain4j-ollama:\{langchain4jVersion}
%dependency /resolve
Repository local url: file:///Users/bsorrentino/.m2/repository/ added.
Adding dependency org.slf4j:slf4j-jdk14:2.0.9
Adding dependency org.bsc.langgraph4j:langgraph4j-core:1.7-SNAPSHOT
Adding dependency org.bsc.langgraph4j:langgraph4j-langchain4j:1.7-SNAPSHOT
Adding dependency dev.langchain4j:langchain4j:1.9.1
Adding dependency dev.langchain4j:langchain4j-ollama:1.9.1
Solving dependencies
Resolved artifacts count: 15
Add to classpath: /Users/bsorrentino/Library/Jupyter/kernels/rapaio-jupyter-kernel/mima_cache/org/slf4j/slf4j-jdk14/2.0.9/slf4j-jdk14-2.0.9.jar
Add to classpath: /Users/bsorrentino/Library/Jupyter/kernels/rapaio-jupyter-kernel/mima_cache/org/slf4j/slf4j-api/2.0.9/slf4j-api-2.0.9.jar
Add to classpath: /Users/bsorrentino/Library/Jupyter/kernels/rapaio-jupyter-kernel/mima_cache/org/bsc/langgraph4j/langgraph4j-core/1.7-SNAPSHOT/langgraph4j-core-1.7-SNAPSHOT.jar
Add to classpath: /Users/bsorrentino/Library/Jupyter/kernels/rapaio-jupyter-kernel/mima_cache/org/bsc/async/async-generator/4.0.0-beta2/async-generator-4.0.0-beta2.jar
Add to classpath: /Users/bsorrentino/Library/Jupyter/kernels/rapaio-jupyter-kernel/mima_cache/org/bsc/langgraph4j/langgraph4j-langchain4j/1.7-SNAPSHOT/langgraph4j-langchain4j-1.7-SNAPSHOT.jar
Add to classpath: /Users/bsorrentino/Library/Jupyter/kernels/rapaio-jupyter-kernel/mima_cache/dev/langchain4j/langchain4j/1.9.1/langchain4j-1.9.1.jar
Add to classpath: /Users/bsorrentino/Library/Jupyter/kernels/rapaio-jupyter-kernel/mima_cache/dev/langchain4j/langchain4j-core/1.9.1/langchain4j-core-1.9.1.jar
Add to classpath: /Users/bsorrentino/Library/Jupyter/kernels/rapaio-jupyter-kernel/mima_cache/org/jspecify/jspecify/1.0.0/jspecify-1.0.0.jar
Add to classpath: /Users/bsorrentino/Library/Jupyter/kernels/rapaio-jupyter-kernel/mima_cache/com/fasterxml/jackson/core/jackson-annotations/2.20/jackson-annotations-2.20.jar
Add to classpath: /Users/bsorrentino/Library/Jupyter/kernels/rapaio-jupyter-kernel/mima_cache/com/fasterxml/jackson/core/jackson-core/2.20.1/jackson-core-2.20.1.jar
Add to classpath: /Users/bsorrentino/Library/Jupyter/kernels/rapaio-jupyter-kernel/mima_cache/com/fasterxml/jackson/core/jackson-databind/2.20.1/jackson-databind-2.20.1.jar
Add to classpath: /Users/bsorrentino/Library/Jupyter/kernels/rapaio-jupyter-kernel/mima_cache/org/apache/opennlp/opennlp-tools/2.5.4/opennlp-tools-2.5.4.jar
Add to classpath: /Users/bsorrentino/Library/Jupyter/kernels/rapaio-jupyter-kernel/mima_cache/dev/langchain4j/langchain4j-ollama/1.9.1/langchain4j-ollama-1.9.1.jar
Add to classpath: /Users/bsorrentino/Library/Jupyter/kernels/rapaio-jupyter-kernel/mima_cache/dev/langchain4j/langchain4j-http-client/1.9.1/langchain4j-http-client-1.9.1.jar
Add to classpath: /Users/bsorrentino/Library/Jupyter/kernels/rapaio-jupyter-kernel/mima_cache/dev/langchain4j/langchain4j-http-client-jdk/1.9.1/langchain4j-http-client-jdk-1.9.1.jar
Initialize Logger
In [4]:
try( var file = new java.io.FileInputStream("./logging.properties")) {
    java.util.logging.LogManager.getLogManager().readConfiguration( file );
}
var log = org.slf4j.LoggerFactory.getLogger("llm-streaming");
How to use StreamingChatGenerator¶
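StreamingChatGenerator bridges langchain4j streaming callbacks and langgraph4j async generators: handler() returns the streaming handler to pass to the chat model, every received token is re-published as a StreamingOutput chunk you can iterate over, and the completed response, mapped through mapResult, becomes the generator's result value. As a rough idea of what that handler adapts, a plain langchain4j streaming call looks like the sketch below. It is only an illustration, not the generator's implementation, and it assumes the model, request and log objects built in the surrounding cells.

import dev.langchain4j.model.chat.response.ChatResponse;
import dev.langchain4j.model.chat.response.StreamingChatResponseHandler;

// Minimal sketch of a raw streaming handler; StreamingChatGenerator wires these
// callbacks into an iterable generator for you.
model.chat( request, new StreamingChatResponseHandler() {

    @Override
    public void onPartialResponse(String partialResponse) {
        log.info( "chunk: {}", partialResponse ); // one token/fragment at a time
    }

    @Override
    public void onCompleteResponse(ChatResponse completeResponse) {
        log.info( "done: {}", completeResponse.aiMessage().text() );
    }

    @Override
    public void onError(Throwable error) {
        log.error( "streaming failed", error );
    }
});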
In [6]:
import dev.langchain4j.model.StreamingResponseHandler;
import dev.langchain4j.data.message.UserMessage;
import dev.langchain4j.model.output.Response;
import org.bsc.langgraph4j.langchain4j.generators.StreamingChatGenerator;
import org.bsc.langgraph4j.state.AgentState;
import org.bsc.langgraph4j.streaming.StreamingOutput;
import dev.langchain4j.model.ollama.OllamaStreamingChatModel;
import dev.langchain4j.model.chat.request.ChatRequest;
var generator = StreamingChatGenerator.<AgentState>builder()
        .mapResult( r -> Map.of( "content", r.aiMessage().text() ) )
        .build();

var model = OllamaStreamingChatModel.builder()
        .baseUrl( "http://localhost:11434" )
        .temperature(0.0)
        .logRequests(true)
        .logResponses(true)
        .modelName("qwen2.5:7b")
        .build();

var request = ChatRequest.builder()
        .messages( UserMessage.from("Tell me a joke") )
        .build();

model.chat(request, generator.handler() );

for( var r : generator ) {
    log.info( "{}", r);
}
log.info( "RESULT: {}", generator.resultValue().orElse(null) );
//Thread.sleep( 1000 );
StreamingOutput{chunk=Sure}
StreamingOutput{chunk=!}
StreamingOutput{chunk= Here}
StreamingOutput{chunk='s}
StreamingOutput{chunk= a}
StreamingOutput{chunk= light}
StreamingOutput{chunk=-hearted}
StreamingOutput{chunk= joke}
StreamingOutput{chunk= for}
StreamingOutput{chunk= you}
StreamingOutput{chunk=:
}
StreamingOutput{chunk=Why}
StreamingOutput{chunk= don}
StreamingOutput{chunk='t}
StreamingOutput{chunk= scientists}
StreamingOutput{chunk= trust}
StreamingOutput{chunk= atoms}
StreamingOutput{chunk=?
}
StreamingOutput{chunk=Because}
StreamingOutput{chunk= they}
StreamingOutput{chunk= make}
StreamingOutput{chunk= up}
StreamingOutput{chunk= everything}
StreamingOutput{chunk=!}
RESULT: {content=Sure! Here's a light-hearted joke for you:
Why don't scientists trust atoms?
Because they make up everything!}
Use StreamingChatGenerator in Agent¶
Define Serializers¶
In [7]:
import dev.langchain4j.data.message.ChatMessage;
import dev.langchain4j.data.message.AiMessage;
import dev.langchain4j.data.message.SystemMessage;
import dev.langchain4j.data.message.UserMessage;
import dev.langchain4j.data.message.ToolExecutionResultMessage;
import dev.langchain4j.agent.tool.ToolExecutionRequest;
import org.bsc.langgraph4j.langchain4j.serializer.std.LC4jStateSerializer;
import org.bsc.langgraph4j.state.AgentStateFactory;
import org.bsc.langgraph4j.prebuilt.MessagesState;
var stateSerializer = new LC4jStateSerializer<MessagesState<ChatMessage>>( MessagesState::new );
Set up the tools¶
Using langchain4j, we will first define the tools we want to use. For this simple example, we will create a placeholder search engine. However, it is really easy to create your own tools - see the langchain4j documentation on how to do that.
In [9]:
import dev.langchain4j.agent.tool.P;
import dev.langchain4j.agent.tool.Tool;
import java.util.Optional;
import static java.lang.String.format;
public class SearchTool {

    @Tool("get real-time weather information.")
    String execQuery(@P("the city") String city) {
        // This is a placeholder for the actual implementation
        return "Cold, with a low of 13 degrees";
    }
}
In [17]:
import static org.bsc.langgraph4j.StateGraph.START;
import static org.bsc.langgraph4j.StateGraph.END;
import org.bsc.langgraph4j.prebuilt.MessagesStateGraph;
import org.bsc.langgraph4j.action.EdgeAction;
import static org.bsc.langgraph4j.action.AsyncEdgeAction.edge_async;
import org.bsc.langgraph4j.action.NodeAction;
import org.bsc.langgraph4j.action.AsyncNodeAction;
import static org.bsc.langgraph4j.action.AsyncNodeAction.node_async;
import dev.langchain4j.service.tool.DefaultToolExecutor;
import org.bsc.langgraph4j.langchain4j.tool.LC4jToolService;
import dev.langchain4j.agent.tool.ToolSpecification;
import dev.langchain4j.model.chat.request.ChatRequestParameters;
import dev.langchain4j.invocation.InvocationContext;
// setup streaming model
var model = OllamaStreamingChatModel.builder()
        .baseUrl( "http://localhost:11434" )
        .temperature(0.0)
        .logRequests(true)
        .logResponses(true)
        .modelName("qwen2.5:7b")
        .build();

// setup tools
var tools = LC4jToolService.builder()
        .toolsFromObject( new SearchTool() )
        .build();

NodeAction<MessagesState<ChatMessage>> callModel = state -> {
    log.info( "CallModel" );

    var generator = StreamingChatGenerator.<MessagesState<ChatMessage>>builder()
            .mapResult( response -> Map.of("messages", response.aiMessage()) )
            .startingNode("agent")
            .startingState(state)
            .build();

    var parameters = ChatRequestParameters.builder()
            .toolSpecifications(tools.toolSpecifications())
            .build();

    var request = ChatRequest.builder()
            .parameters(parameters)
            .messages( state.messages() )
            .build();

    model.chat( request, generator.handler() );

    // returning the generator under "_streaming_messages" lets langgraph4j re-emit
    // its chunks as StreamingOutput while the graph is streamed, and merge the
    // mapped result into the state once the response is complete
    return Map.of("_streaming_messages", generator);
};

// Route Message
EdgeAction<MessagesState<ChatMessage>> routeMessage = state -> {
    var lastMessage = state.lastMessage()
            .orElseThrow(() -> new IllegalStateException("last message not found!"));

    log.info("routeMessage:\n{}", lastMessage );

    if (lastMessage instanceof AiMessage message) {
        // If tools should be called
        if (message.hasToolExecutionRequests()) {
            return "next";
        }
    }
    // If no tools are called, we can finish (respond to the user)
    return "exit";
};

// Invoke Tool
AsyncNodeAction<MessagesState<ChatMessage>> invokeTool = state -> {
    var lastMessage = state.lastMessage();

    if( lastMessage.isEmpty() ) {
        return CompletableFuture.failedFuture( new IllegalStateException("last message not found!") );
    }

    log.info("invokeTool:\n{}", lastMessage );

    if (lastMessage.get() instanceof AiMessage lastAiMessage) {
        return tools.execute( lastAiMessage.toolExecutionRequests(), InvocationContext.builder().build(), "messages" )
                .thenApply( command -> command.update() );
    }

    return CompletableFuture.failedFuture( new IllegalStateException("invalid last message") );
};

// Define Graph
var workflow = new MessagesStateGraph<ChatMessage>(stateSerializer)
        .addNode("agent", node_async(callModel))
        .addNode("tools", invokeTool)
        .addEdge(START, "agent")
        .addConditionalEdges("agent",
                edge_async(routeMessage),
                Map.of("next", "tools", "exit", END))
        .addEdge("tools", "agent");
In [18]:
import org.bsc.langgraph4j.streaming.StreamingOutput;
var app = workflow.compile();
for( var out : app.stream( Map.of( "messages", UserMessage.from( "what is the whether in Napoli?")) ) ) {
    if( out instanceof StreamingOutput streaming ) {
        log.info( "StreamingOutput{node={}, chunk={} }", streaming.node(), streaming.chunk() );
    }
    else {
        log.info( "{}", out );
    }
}
START
CallModel
routeMessage:
AiMessage { text = null, thinking = null, toolExecutionRequests = [ToolExecutionRequest { id = null, name = "execQuery", arguments = "{"city":"Napoli"}" }], attributes = {} }
ToolExecutionRequest id is null!
NodeOutput{node=__START__, state={
messages=[
UserMessage { name = null, contents = [TextContent { text = "what is the whether in Napoli?" }], attributes = {} }
]
}}
ToolExecutionRequest id is null!
invokeTool:
Optional[AiMessage { text = null, thinking = null, toolExecutionRequests = [ToolExecutionRequest { id = null, name = "execQuery", arguments = "{"city":"Napoli"}" }], attributes = {} }]
execute: [execQuery]
ToolExecutionRequest id is null!
ToolExecutionResultMessage id is null!
NodeOutput{node=agent, state={
messages=[
UserMessage { name = null, contents = [TextContent { text = "what is the whether in Napoli?" }], attributes = {} }
AiMessage { text = null, thinking = null, toolExecutionRequests = [ToolExecutionRequest { id = null, name = "execQuery", arguments = "{"city":"Napoli"}" }], attributes = {} }
]
}}
ToolExecutionRequest id is null!
ToolExecutionResultMessage id is null!
CallModel
NodeOutput{node=tools, state={
messages=[
UserMessage { name = null, contents = [TextContent { text = "what is the whether in Napoli?" }], attributes = {} }
AiMessage { text = null, thinking = null, toolExecutionRequests = [ToolExecutionRequest { id = null, name = "execQuery", arguments = "{"city":"Napoli"}" }], attributes = {} }
ToolExecutionResultMessage { id = null toolName = "execQuery" text = "Cold, with a low of 13 degrees" }
]
}}
StreamingOutput{node=agent, chunk=The }
StreamingOutput{node=agent, chunk= current }
StreamingOutput{node=agent, chunk= weather }
StreamingOutput{node=agent, chunk= in }
StreamingOutput{node=agent, chunk= Napoli }
StreamingOutput{node=agent, chunk= is }
StreamingOutput{node=agent, chunk= cold }
StreamingOutput{node=agent, chunk=, }
StreamingOutput{node=agent, chunk= with }
StreamingOutput{node=agent, chunk= temperatures }
StreamingOutput{node=agent, chunk= as }
StreamingOutput{node=agent, chunk= low }
StreamingOutput{node=agent, chunk= as }
StreamingOutput{node=agent, chunk= }
StreamingOutput{node=agent, chunk=1 }
StreamingOutput{node=agent, chunk=3 }
StreamingOutput{node=agent, chunk= degrees }
routeMessage:
AiMessage { text = "The current weather in Napoli is cold, with temperatures as low as 13 degrees.", thinking = null, toolExecutionRequests = [], attributes = {} }
ToolExecutionRequest id is null!
ToolExecutionResultMessage id is null!
StreamingOutput{node=agent, chunk=. }
ToolExecutionRequest id is null!
ToolExecutionResultMessage id is null!
NodeOutput{node=agent, state={
messages=[
UserMessage { name = null, contents = [TextContent { text = "what is the whether in Napoli?" }], attributes = {} }
AiMessage { text = null, thinking = null, toolExecutionRequests = [ToolExecutionRequest { id = null, name = "execQuery", arguments = "{"city":"Napoli"}" }], attributes = {} }
ToolExecutionResultMessage { id = null toolName = "execQuery" text = "Cold, with a low of 13 degrees" }
AiMessage { text = "The current weather in Napoli is cold, with temperatures as low as 13 degrees.", thinking = null, toolExecutionRequests = [], attributes = {} }
]
}}
NodeOutput{node=__END__, state={
messages=[
UserMessage { name = null, contents = [TextContent { text = "what is the whether in Napoli?" }], attributes = {} }
AiMessage { text = null, thinking = null, toolExecutionRequests = [ToolExecutionRequest { id = null, name = "execQuery", arguments = "{"city":"Napoli"}" }], attributes = {} }
ToolExecutionResultMessage { id = null toolName = "execQuery" text = "Cold, with a low of 13 degrees" }
AiMessage { text = "The current weather in Napoli is cold, with temperatures as low as 13 degrees.", thinking = null, toolExecutionRequests = [], attributes = {} }
]
}}
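If you want the assembled answer rather than the individual log lines, you can concatenate the chunks emitted by the agent node. A minimal sketch, reusing only the stream(...), node() and chunk() calls shown above:

// Sketch: collect the token-level chunks produced by the "agent" node into a single string.
var answer = new StringBuilder();

for( var out : app.stream( Map.of( "messages", UserMessage.from("what is the weather in Napoli?")) ) ) {
    if( out instanceof StreamingOutput streaming && "agent".equals( streaming.node() ) ) {
        answer.append( streaming.chunk() );
    }
}

log.info( "assembled answer: {}", answer );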