How to integrate Langchain4j LLM streaming in Langgraph4j
In [1]:
var userHomeDir = System.getProperty("user.home");
var localRepoUrl = "file://" + userHomeDir + "/.m2/repository/";
var langchain4jVersion = "1.0.1";
var langchain4jbeta = "1.0.1-beta6";
var langgraph4jVersion = "1.6-SNAPSHOT";
Remove installed packages from the Jupyter cache
In [2]:
%%bash
rm -rf \{userHomeDir}/Library/Jupyter/kernels/rapaio-jupyter-kernel/mima_cache/org/bsc/
In [ ]:
%dependency /add-repo local \{localRepoUrl} release|never snapshot|always
// %dependency /list-repos
%dependency /add org.slf4j:slf4j-jdk14:2.0.9
%dependency /add org.bsc.langgraph4j:langgraph4j-core:\{langgraph4jVersion}
%dependency /add org.bsc.langgraph4j:langgraph4j-langchain4j:\{langgraph4jVersion}
%dependency /add dev.langchain4j:langchain4j:\{langchain4jVersion}
// %dependency /add dev.langchain4j:langchain4j-open-ai:\{langchain4jVersion}
%dependency /add dev.langchain4j:langchain4j-ollama:\{langchain4jbeta}
%dependency /resolve
Initialize Logger
In [4]:
try( var file = new java.io.FileInputStream("./logging.properties")) {
    java.util.logging.LogManager.getLogManager().readConfiguration( file );
}

var log = org.slf4j.LoggerFactory.getLogger("llm-streaming");
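The cell above reads a logging.properties file from the notebook directory. As an illustrative sketch only (not the file shipped with the project), a minimal JUL configuration could look like this:

handlers=java.util.logging.ConsoleHandler
.level=INFO
java.util.logging.ConsoleHandler.level=FINEST
java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter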
How to use StreamingChatGenerator

A StreamingChatGenerator adapts a langchain4j streaming response into a generator that can be iterated chunk by chunk, exposing the final chat result through resultValue() once the stream completes.
In [5]:
import dev.langchain4j.data.message.UserMessage;
import org.bsc.langgraph4j.langchain4j.generators.StreamingChatGenerator;
import org.bsc.langgraph4j.state.AgentState;
import org.bsc.langgraph4j.streaming.StreamingOutput;
import dev.langchain4j.model.ollama.OllamaStreamingChatModel;
import dev.langchain4j.model.chat.request.ChatRequest;

// generator that collects the streamed tokens and maps the final response
var generator = StreamingChatGenerator.<AgentState>builder()
        .mapResult( r -> Map.of( "content", r.aiMessage().text() ) )
        .build();

var model = OllamaStreamingChatModel.builder()
        .baseUrl( "http://localhost:11434" )
        .temperature(0.0)
        .logRequests(true)
        .logResponses(true)
        .modelName("llama3.1:latest")
        .build();

var request = ChatRequest.builder()
        .messages( UserMessage.from("Tell me a joke") )
        .build();

// start the streaming call; the generator's handler receives each token
model.chat( request, generator.handler() );

// iterate the stream: each element is a StreamingOutput chunk
for( var r : generator ) {
    log.info( "{}", r);
}

log.info( "RESULT: {}", generator.resultValue().orElse(null) );

//Thread.sleep( 1000 );
StreamingOutput{chunk=Here}
StreamingOutput{chunk='s}
StreamingOutput{chunk= one}
StreamingOutput{chunk=: }
StreamingOutput{chunk=What}
StreamingOutput{chunk= do}
StreamingOutput{chunk= you}
StreamingOutput{chunk= call}
StreamingOutput{chunk= a}
StreamingOutput{chunk= fake}
StreamingOutput{chunk= nood}
StreamingOutput{chunk=le}
StreamingOutput{chunk=? }
StreamingOutput{chunk=An}
StreamingOutput{chunk= imp}
StreamingOutput{chunk=asta}
StreamingOutput{chunk=! }
StreamingOutput{chunk=Hope}
StreamingOutput{chunk= that}
StreamingOutput{chunk= made}
StreamingOutput{chunk= you}
StreamingOutput{chunk= laugh}
StreamingOutput{chunk=!}
StreamingOutput{chunk= Do}
StreamingOutput{chunk= you}
StreamingOutput{chunk= want}
StreamingOutput{chunk= to}
StreamingOutput{chunk= hear}
StreamingOutput{chunk= another}
StreamingOutput{chunk= one}
StreamingOutput{chunk=?}
RESULT: {content=Here's one: What do you call a fake noodle? An impasta! Hope that made you laugh! Do you want to hear another one?}
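The same generator works with any langchain4j streaming model. As a hedged sketch (assuming the langchain4j-open-ai dependency commented out in the setup cell has been added, OPENAI_API_KEY is set, and reusing the request built above; the model name is illustrative), the Ollama model could be swapped for OpenAI's:

// Sketch only: the same streaming pattern with OpenAI instead of Ollama.
import dev.langchain4j.model.openai.OpenAiStreamingChatModel;

var openAiModel = OpenAiStreamingChatModel.builder()
        .apiKey( System.getenv("OPENAI_API_KEY") )
        .modelName( "gpt-4o-mini" )
        .temperature( 0.0 )
        .build();

var openAiGenerator = StreamingChatGenerator.<AgentState>builder()
        .mapResult( r -> Map.of( "content", r.aiMessage().text() ) )
        .build();

// same call site as the Ollama example above
openAiModel.chat( request, openAiGenerator.handler() );

for( var r : openAiGenerator ) {
    log.info( "{}", r );
}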
Use StreamingChatGenerator in Agent
Define Serializers

The state serializer tells the graph how to persist and restore the langchain4j ChatMessage instances held in the state.
In [12]:
import dev.langchain4j.data.message.ChatMessage;
import dev.langchain4j.data.message.AiMessage;
import dev.langchain4j.data.message.SystemMessage;
import dev.langchain4j.data.message.UserMessage;
import dev.langchain4j.data.message.ToolExecutionResultMessage;
import dev.langchain4j.agent.tool.ToolExecutionRequest;
import org.bsc.langgraph4j.langchain4j.serializer.std.LC4jStateSerializer;
import org.bsc.langgraph4j.state.AgentStateFactory;
import org.bsc.langgraph4j.prebuilt.MessagesState;
var stateSerializer = new LC4jStateSerializer<MessagesState<ChatMessage>>( MessagesState::new );
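If JSON-based state is preferred, a hedged alternative is the Jackson variant shipped in the langgraph4j-langchain4j module (assumed API; this notebook sticks with the std serializer above):

// Sketch: Jackson (JSON) variant of the state serializer (assumed API).
import org.bsc.langgraph4j.langchain4j.serializer.jackson.LC4jJacksonStateSerializer;

var jsonStateSerializer = new LC4jJacksonStateSerializer<MessagesState<ChatMessage>>( MessagesState::new );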
Set up the tools
Using langchain4j, we will first define the tools we want to use. For this simple example, we will create a placeholder search engine. However, it is really easy to create your own tools - see the langchain4j documentation for how to do that (a hedged sketch of a custom tool follows the cell below).
In [13]:
import dev.langchain4j.agent.tool.P;
import dev.langchain4j.agent.tool.Tool;

public class SearchTool {

    @Tool("get real-time weather information.")
    String execQuery(@P("the city") String city) {
        // This is a placeholder for the actual implementation
        return "Cold, with a low of 13 degrees";
    }
}
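As a hedged sketch of a custom tool (the WebSearchTool class and its method are illustrative only, not part of this notebook), any object whose methods are annotated with @Tool can be registered the same way as SearchTool:

// Illustrative only: a hypothetical tool with more than one parameter.
public class WebSearchTool {

    @Tool("search the web for up-to-date information.")
    String execSearch(@P("the search query") String query,
                      @P("maximum number of results") int maxResults) {
        // A real implementation would call an actual search API here.
        return "no results found for: " + query;
    }
}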
In [16]:
import static org.bsc.langgraph4j.StateGraph.START;
import static org.bsc.langgraph4j.StateGraph.END;
import org.bsc.langgraph4j.prebuilt.MessagesStateGraph;
import org.bsc.langgraph4j.action.EdgeAction;
import static org.bsc.langgraph4j.action.AsyncEdgeAction.edge_async;
import org.bsc.langgraph4j.action.NodeAction;
import static org.bsc.langgraph4j.action.AsyncNodeAction.node_async;
import org.bsc.langgraph4j.langchain4j.tool.LC4jToolService;
import dev.langchain4j.model.chat.request.ChatRequestParameters;

// setup streaming model
var model = OllamaStreamingChatModel.builder()
        .baseUrl( "http://localhost:11434" )
        .temperature(0.0)
        .logRequests(true)
        .logResponses(true)
        .modelName("llama3.1:latest")
        .build();

// setup tools
var tools = LC4jToolService.builder()
        .toolsFromObject( new SearchTool() )
        .build();

// Call Model: stream the LLM response into the graph state
NodeAction<MessagesState<ChatMessage>> callModel = state -> {
    log.info( "CallModel" );

    // the generator tags each chunk with the node ("agent") and current state
    var generator = StreamingChatGenerator.<MessagesState<ChatMessage>>builder()
            .mapResult( response -> Map.of("messages", response.aiMessage()) )
            .startingNode("agent")
            .startingState(state)
            .build();

    var parameters = ChatRequestParameters.builder()
            .toolSpecifications(tools.toolSpecifications())
            .build();

    var request = ChatRequest.builder()
            .parameters(parameters)
            .messages( state.messages() )
            .build();

    model.chat( request, generator.handler() );

    // returning the generator under "_streaming_messages" lets the graph stream it
    return Map.of("_streaming_messages", generator);
};

// Route Message
EdgeAction<MessagesState<ChatMessage>> routeMessage = state -> {
    var lastMessage = state.lastMessage()
            .orElseThrow(() -> (new IllegalStateException("last message not found!")));

    log.info("routeMessage:\n{}", lastMessage );

    if (lastMessage instanceof AiMessage message) {
        // If tools should be called
        if (message.hasToolExecutionRequests()) {
            return "next";
        }
    }
    // If no tools are called, we can finish (respond to the user)
    return "exit";
};

// Invoke Tool
NodeAction<MessagesState<ChatMessage>> invokeTool = state -> {
    var lastMessage = state.lastMessage()
            .orElseThrow(() -> (new IllegalStateException("last message not found!")));

    log.info("invokeTool:\n{}", lastMessage );

    if (lastMessage instanceof AiMessage lastAiMessage) {
        var result = tools.execute(lastAiMessage.toolExecutionRequests(), null)
                .orElseThrow(() -> (new IllegalStateException("no tool found!")));
        return Map.of("messages", result);
    }
    throw new IllegalStateException("invalid last message");
};

// Define Graph
var workflow = new MessagesStateGraph<ChatMessage>(stateSerializer)
        .addNode("agent", node_async(callModel))
        .addNode("tools", node_async(invokeTool))
        .addEdge(START, "agent")
        .addConditionalEdges("agent",
                edge_async(routeMessage),
                Map.of("next", "tools", "exit", END))
        .addEdge("tools", "agent");
In [17]:
import org.bsc.langgraph4j.streaming.StreamingOutput;

var app = workflow.compile();

for( var out : app.stream( Map.of( "messages", UserMessage.from( "what is the weather in Napoli?")) ) ) {
    if( out instanceof StreamingOutput streaming ) {
        log.info( "StreamingOutput{node={}, chunk={} }", streaming.node(), streaming.chunk() );
    }
    else {
        log.info( "{}", out );
    }
}
START
CallModel
routeMessage:
AiMessage { text = null toolExecutionRequests = [ToolExecutionRequest { id = null, name = "execQuery", arguments = "{ "city" : "Napoli" }" }] }
ToolExecutionRequest id is null!
NodeOutput{node=__START__, state={messages=[UserMessage { name = null contents = [TextContent { text = "what is the weather in Napoli?" }] }]}}
ToolExecutionRequest id is null!
invokeTool:
AiMessage { text = null toolExecutionRequests = [ToolExecutionRequest { id = null, name = "execQuery", arguments = "{ "city" : "Napoli" }" }] }
execute: execQuery
ToolExecutionRequest id is null!
ToolExecutionResultMessage id is null!
NodeOutput{node=agent, state={messages=[UserMessage { name = null contents = [TextContent { text = "what is the weather in Napoli?" }] }, AiMessage { text = null toolExecutionRequests = [ToolExecutionRequest { id = null, name = "execQuery", arguments = "{ "city" : "Napoli" }" }] }]}}
ToolExecutionRequest id is null!
ToolExecutionResultMessage id is null!
CallModel
NodeOutput{node=tools, state={messages=[UserMessage { name = null contents = [TextContent { text = "what is the weather in Napoli?" }] }, AiMessage { text = null toolExecutionRequests = [ToolExecutionRequest { id = null, name = "execQuery", arguments = "{ "city" : "Napoli" }" }] }, ToolExecutionResultMessage { id = null toolName = "execQuery" text = "Cold, with a low of 13 degrees" }]}}
StreamingOutput{node=agent, chunk=The }
StreamingOutput{node=agent, chunk= current }
StreamingOutput{node=agent, chunk= weather }
StreamingOutput{node=agent, chunk= in }
StreamingOutput{node=agent, chunk= Napoli }
StreamingOutput{node=agent, chunk= is }
StreamingOutput{node=agent, chunk= cold }
StreamingOutput{node=agent, chunk=, }
StreamingOutput{node=agent, chunk= with }
StreamingOutput{node=agent, chunk= a }
StreamingOutput{node=agent, chunk= low }
StreamingOutput{node=agent, chunk= temperature }
StreamingOutput{node=agent, chunk= of }
StreamingOutput{node=agent, chunk= }
StreamingOutput{node=agent, chunk=13 }
StreamingOutput{node=agent, chunk= degrees }
routeMessage:
AiMessage { text = "The current weather in Napoli is cold, with a low temperature of 13 degrees." toolExecutionRequests = [] }
ToolExecutionRequest id is null!
ToolExecutionResultMessage id is null!
StreamingOutput{node=agent, chunk=. }
ToolExecutionRequest id is null!
ToolExecutionResultMessage id is null!
NodeOutput{node=agent, state={messages=[UserMessage { name = null contents = [TextContent { text = "what is the weather in Napoli?" }] }, AiMessage { text = null toolExecutionRequests = [ToolExecutionRequest { id = null, name = "execQuery", arguments = "{ "city" : "Napoli" }" }] }, ToolExecutionResultMessage { id = null toolName = "execQuery" text = "Cold, with a low of 13 degrees" }, AiMessage { text = "The current weather in Napoli is cold, with a low temperature of 13 degrees." toolExecutionRequests = [] }]}}
NodeOutput{node=__END__, state={messages=[UserMessage { name = null contents = [TextContent { text = "what is the weather in Napoli?" }] }, AiMessage { text = null toolExecutionRequests = [ToolExecutionRequest { id = null, name = "execQuery", arguments = "{ "city" : "Napoli" }" }] }, ToolExecutionResultMessage { id = null toolName = "execQuery" text = "Cold, with a low of 13 degrees" }, AiMessage { text = "The current weather in Napoli is cold, with a low temperature of 13 degrees." toolExecutionRequests = [] }]}}
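Because the stream interleaves token chunks with NodeOutput snapshots, a common follow-up is to reassemble the streamed text on the fly. A minimal sketch, using only the APIs already shown in this notebook:

// Accumulate the streamed chunks into the full response text.
var fullText = new StringBuilder();

for( var out : app.stream( Map.of( "messages", UserMessage.from( "what is the weather in Napoli?")) ) ) {
    if( out instanceof StreamingOutput streaming ) {
        fullText.append( streaming.chunk() );   // token-by-token text from the "agent" node
    }
    // other elements are NodeOutput snapshots of the evolving graph state
}

log.info( "full response: {}", fullText );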