Wait for User Input¶
In [1]:
Copied!
var userHomeDir = System.getProperty("user.home");
var localRespoUrl = "file://" + userHomeDir + "/.m2/repository/";
var langchain4jVersion = "1.0.1";
var langchain4jbeta = "1.0.1-beta6";
var langgraph4jVersion = "1.6-SNAPSHOT";
var userHomeDir = System.getProperty("user.home");
var localRespoUrl = "file://" + userHomeDir + "/.m2/repository/";
var langchain4jVersion = "1.0.1";
var langchain4jbeta = "1.0.1-beta6";
var langgraph4jVersion = "1.6-SNAPSHOT";
In [2]:
Copied!
%%bash
rm -rf \{userHomeDir}/Library/Jupyter/kernels/rapaio-jupyter-kernel/mima_cache/org/bsc/langgraph4j
%%bash
rm -rf \{userHomeDir}/Library/Jupyter/kernels/rapaio-jupyter-kernel/mima_cache/org/bsc/langgraph4j
In [ ]:
Copied!
%dependency /add-repo local \{localRespoUrl} release|never snapshot|always
// %dependency /list-repos
%dependency /add org.bsc.langgraph4j:langgraph4j-core:\{langgraph4jVersion}
%dependency /add org.bsc.langgraph4j:langgraph4j-postgres-saver:\{langgraph4jVersion}
%dependency /add org.bsc.langgraph4j:langgraph4j-langchain4j:\{langgraph4jVersion}
%dependency /add dev.langchain4j:langchain4j:\{langchain4jVersion}
// %dependency /add dev.langchain4j:langchain4j-open-ai:\{langchain4jVersion}
%dependency /add dev.langchain4j:langchain4j-ollama:\{langchain4jbeta}
%dependency /add net.sourceforge.plantuml:plantuml-mit:1.2024.6
%dependency /list-dependencies
%dependency /resolve
%dependency /add-repo local \{localRespoUrl} release|never snapshot|always
// %dependency /list-repos
%dependency /add org.bsc.langgraph4j:langgraph4j-core:\{langgraph4jVersion}
%dependency /add org.bsc.langgraph4j:langgraph4j-postgres-saver:\{langgraph4jVersion}
%dependency /add org.bsc.langgraph4j:langgraph4j-langchain4j:\{langgraph4jVersion}
%dependency /add dev.langchain4j:langchain4j:\{langchain4jVersion}
// %dependency /add dev.langchain4j:langchain4j-open-ai:\{langchain4jVersion}
%dependency /add dev.langchain4j:langchain4j-ollama:\{langchain4jbeta}
%dependency /add net.sourceforge.plantuml:plantuml-mit:1.2024.6
%dependency /list-dependencies
%dependency /resolve
utility to render graph respresentation in PlantUML
In [4]:
Copied!
import net.sourceforge.plantuml.SourceStringReader;
import net.sourceforge.plantuml.FileFormatOption;
import net.sourceforge.plantuml.FileFormat;
import org.bsc.langgraph4j.GraphRepresentation;
void displayDiagram( GraphRepresentation representation ) throws IOException {
var reader = new SourceStringReader(representation.getContent());
try(var imageOutStream = new java.io.ByteArrayOutputStream()) {
var description = reader.outputImage( imageOutStream, 0, new FileFormatOption(FileFormat.PNG));
var imageInStream = new java.io.ByteArrayInputStream( imageOutStream.toByteArray() );
var image = javax.imageio.ImageIO.read( imageInStream );
display( image );
}
}
import net.sourceforge.plantuml.SourceStringReader;
import net.sourceforge.plantuml.FileFormatOption;
import net.sourceforge.plantuml.FileFormat;
import org.bsc.langgraph4j.GraphRepresentation;
void displayDiagram( GraphRepresentation representation ) throws IOException {
var reader = new SourceStringReader(representation.getContent());
try(var imageOutStream = new java.io.ByteArrayOutputStream()) {
var description = reader.outputImage( imageOutStream, 0, new FileFormatOption(FileFormat.PNG));
var imageInStream = new java.io.ByteArrayInputStream( imageOutStream.toByteArray() );
var image = javax.imageio.ImageIO.read( imageInStream );
display( image );
}
}
Define graph with interruption¶
In [5]:
Copied!
import org.bsc.langgraph4j.*;
import org.bsc.langgraph4j.prebuilt.MessagesState;
import org.bsc.langgraph4j.state.Channel;
import dev.langchain4j.data.message.AiMessage;
import dev.langchain4j.data.message.ChatMessage;
import org.bsc.langgraph4j.action.AsyncNodeAction;
import org.bsc.langgraph4j.action.AsyncEdgeAction;
import static org.bsc.langgraph4j.action.AsyncNodeAction.node_async;
import static org.bsc.langgraph4j.action.AsyncEdgeAction.edge_async;
import org.bsc.langgraph4j.checkpoint.MemorySaver;
import org.bsc.langgraph4j.checkpoint.PostgresSaver;
import org.bsc.langgraph4j.CompileConfig;
import static org.bsc.langgraph4j.StateGraph.END;
import static org.bsc.langgraph4j.StateGraph.START;
public class State extends MessagesState<String> {
public State(Map<String, Object> initData) {
super( initData );
}
public Optional<String> humanFeedback() {
return value("human_feedback");
}
}
AsyncNodeAction<State> step1 = node_async( state -> {
return Map.of( "messages", "Step 1" );
});
AsyncNodeAction<State> humanFeedback = node_async( state -> {
return Map.of();
});
AsyncNodeAction<State> step3 = node_async( state -> {
return Map.of( "messages", "Step 3" );
});
AsyncEdgeAction<State> evalHumanFeedback = edge_async( state -> {
var feedback = state.humanFeedback().orElseThrow();
return ( feedback.equals("next") || feedback.equals("back") ) ? feedback : "unknown";
});
var builder = new StateGraph<>(State.SCHEMA, State::new)
.addNode("step_1", step1)
.addNode("human_feedback", humanFeedback)
.addNode("step_3", step3)
.addEdge(START, "step_1")
.addEdge("step_1", "human_feedback")
.addConditionalEdges("human_feedback", evalHumanFeedback,
Map.of( "back", "step_1", "next", "step_3", "unknown", "human_feedback" ))
.addEdge("step_3", END)
;
// Set up memory
// var saver = new MemorySaver();
// Set up Postgres saver
var saver = PostgresSaver.builder()
.host("localhost")
.port(5432)
.user("admin")
.password("bsorrentino")
.database("lg4j-store")
.stateSerializer( builder.getStateSerializer() )
.dropTablesFirst(true)
.build();
// Add
var compileConfig = CompileConfig.builder()
.checkpointSaver(saver)
.interruptBefore("human_feedback")
.releaseThread(true)
.build();
var graph = builder.compile(compileConfig);
displayDiagram( graph.getGraph(GraphRepresentation.Type.PLANTUML, "Human in the Loop", false) );
import org.bsc.langgraph4j.*;
import org.bsc.langgraph4j.prebuilt.MessagesState;
import org.bsc.langgraph4j.state.Channel;
import dev.langchain4j.data.message.AiMessage;
import dev.langchain4j.data.message.ChatMessage;
import org.bsc.langgraph4j.action.AsyncNodeAction;
import org.bsc.langgraph4j.action.AsyncEdgeAction;
import static org.bsc.langgraph4j.action.AsyncNodeAction.node_async;
import static org.bsc.langgraph4j.action.AsyncEdgeAction.edge_async;
import org.bsc.langgraph4j.checkpoint.MemorySaver;
import org.bsc.langgraph4j.checkpoint.PostgresSaver;
import org.bsc.langgraph4j.CompileConfig;
import static org.bsc.langgraph4j.StateGraph.END;
import static org.bsc.langgraph4j.StateGraph.START;
public class State extends MessagesState {
public State(Map initData) {
super( initData );
}
public Optional humanFeedback() {
return value("human_feedback");
}
}
AsyncNodeAction step1 = node_async( state -> {
return Map.of( "messages", "Step 1" );
});
AsyncNodeAction humanFeedback = node_async( state -> {
return Map.of();
});
AsyncNodeAction step3 = node_async( state -> {
return Map.of( "messages", "Step 3" );
});
AsyncEdgeAction evalHumanFeedback = edge_async( state -> {
var feedback = state.humanFeedback().orElseThrow();
return ( feedback.equals("next") || feedback.equals("back") ) ? feedback : "unknown";
});
var builder = new StateGraph<>(State.SCHEMA, State::new)
.addNode("step_1", step1)
.addNode("human_feedback", humanFeedback)
.addNode("step_3", step3)
.addEdge(START, "step_1")
.addEdge("step_1", "human_feedback")
.addConditionalEdges("human_feedback", evalHumanFeedback,
Map.of( "back", "step_1", "next", "step_3", "unknown", "human_feedback" ))
.addEdge("step_3", END)
;
// Set up memory
// var saver = new MemorySaver();
// Set up Postgres saver
var saver = PostgresSaver.builder()
.host("localhost")
.port(5432)
.user("admin")
.password("bsorrentino")
.database("lg4j-store")
.stateSerializer( builder.getStateSerializer() )
.dropTablesFirst(true)
.build();
// Add
var compileConfig = CompileConfig.builder()
.checkpointSaver(saver)
.interruptBefore("human_feedback")
.releaseThread(true)
.build();
var graph = builder.compile(compileConfig);
displayDiagram( graph.getGraph(GraphRepresentation.Type.PLANTUML, "Human in the Loop", false) );
SLF4J: No SLF4J providers were found. SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.
Start graph until interruption¶
In [6]:
Copied!
// Input
Map<String,Object> initialInput = Map.of("messages", "Step 0");
// Thread
var invokeConfig = RunnableConfig.builder()
.threadId("Thread1")
.build();
// Run the graph until the first interruption
for (var event : graph.stream(initialInput, invokeConfig)) {
System.out.println(event);
}
// Input
Map initialInput = Map.of("messages", "Step 0");
// Thread
var invokeConfig = RunnableConfig.builder()
.threadId("Thread1")
.build();
// Run the graph until the first interruption
for (var event : graph.stream(initialInput, invokeConfig)) {
System.out.println(event);
}
NodeOutput{node=__START__, state={ messages=[ Step 0 ] }} NodeOutput{node=step_1, state={ messages=[ Step 0 Step 1 ] }}
In [7]:
Copied!
// We can check the state
System.out.printf("--State before update--\n%s\n", graph.getState(invokeConfig));
// Simulate user input
var userInput = "back"; // back means we want to go back to the previous node
System.out.printf("\n--User Input--\nTell me how you want to update the state: '%s'\n\n", userInput);
// We now update the state as if we are the human_feedback node
var updateConfig = graph.updateState(invokeConfig, Map.of("human_feedback", userInput), null);
// We can check the state
System.out.printf("--State after update--\n%s\n", graph.getState(invokeConfig) );
// We can check the next node, showing that it is node 3 (which follows human_feedback)
System.out.printf("\ngetNext()\n\twith invokeConfig:[%s]\n\twith updateConfig:[%s]\n",
graph.getState(invokeConfig).getNext(),
graph.getState(updateConfig).getNext());
;
// We can check the state
System.out.printf("--State before update--\n%s\n", graph.getState(invokeConfig));
// Simulate user input
var userInput = "back"; // back means we want to go back to the previous node
System.out.printf("\n--User Input--\nTell me how you want to update the state: '%s'\n\n", userInput);
// We now update the state as if we are the human_feedback node
var updateConfig = graph.updateState(invokeConfig, Map.of("human_feedback", userInput), null);
// We can check the state
System.out.printf("--State after update--\n%s\n", graph.getState(invokeConfig) );
// We can check the next node, showing that it is node 3 (which follows human_feedback)
System.out.printf("\ngetNext()\n\twith invokeConfig:[%s]\n\twith updateConfig:[%s]\n",
graph.getState(invokeConfig).getNext(),
graph.getState(updateConfig).getNext());
;
--State before update-- StateSnapshot{node=step_1, state={ messages=[ Step 0 Step 1 ] }, config=RunnableConfig{ threadId=Thread1, checkPointId=6b4937e2-68c3-43f0-ae9b-96ef4c05048e, nextNode=human_feedback, streamMode=VALUES }} --User Input-- Tell me how you want to update the state: 'back' --State after update-- StateSnapshot{node=step_1, state={ messages=[ Step 0 Step 1 ] human_feedback=back }, config=RunnableConfig{ threadId=Thread1, checkPointId=a684697c-57db-40fc-9cd1-8231a39bbab9, nextNode=human_feedback, streamMode=VALUES }} getNext() with invokeConfig:[human_feedback] with updateConfig:[human_feedback]
Continue graph execution after interruption¶
In [8]:
Copied!
import org.bsc.langgraph4j.GraphInput;
// Continue the graph execution
for (var event : graph.stream(GraphInput.resume(), updateConfig)) {
System.out.println(event);
}
import org.bsc.langgraph4j.GraphInput;
// Continue the graph execution
for (var event : graph.stream(GraphInput.resume(), updateConfig)) {
System.out.println(event);
}
NodeOutput{node=human_feedback, state={ messages=[ Step 0 Step 1 ] human_feedback=back }} NodeOutput{node=step_1, state={ messages=[ Step 0 Step 1 ] human_feedback=back }}
In [9]:
Copied!
var userInput = "next"; // 'next' means we want to go to the next node
System.out.printf("\n--User Input--\nTell me how you want to update the state: '%s'\n", userInput);
// We now update the state as if we are the human_feedback node
var updateConfig = graph.updateState(invokeConfig, Map.of("human_feedback", userInput), null);
System.out.printf("\ngetNext()\n\twith invokeConfig:[%s]\n\twith updateConfig:[%s]\n",
graph.getState(invokeConfig).getNext(),
graph.getState(updateConfig).getNext());
;
var userInput = "next"; // 'next' means we want to go to the next node
System.out.printf("\n--User Input--\nTell me how you want to update the state: '%s'\n", userInput);
// We now update the state as if we are the human_feedback node
var updateConfig = graph.updateState(invokeConfig, Map.of("human_feedback", userInput), null);
System.out.printf("\ngetNext()\n\twith invokeConfig:[%s]\n\twith updateConfig:[%s]\n",
graph.getState(invokeConfig).getNext(),
graph.getState(updateConfig).getNext());
;
--User Input-- Tell me how you want to update the state: 'next' getNext() with invokeConfig:[human_feedback] with updateConfig:[human_feedback]
Continue graph execution after the 2nd interruption¶
In [10]:
Copied!
// Continue the graph execution
for (var event : graph.stream(GraphInput.resume(), updateConfig)) {
System.out.println(event);
}
// Continue the graph execution
for (var event : graph.stream(GraphInput.resume(), updateConfig)) {
System.out.println(event);
}
NodeOutput{node=human_feedback, state={ messages=[ Step 0 Step 1 ] human_feedback=next }} NodeOutput{node=step_3, state={ messages=[ Step 0 Step 1 Step 3 ] human_feedback=next }} NodeOutput{node=__END__, state={ messages=[ Step 0 Step 1 Step 3 ] human_feedback=next }}