How to create branches for parallel node execution¶
LangGraph4j lets you run nodes in parallel to speed up your total graph execution.
‼️ Currently there are some overall limitations on parallel node implementation execution:
- Only the Fork-Join model is supported
┌─┐
│A│
└─┘
|
┌-----------┐
| | |
┌──┐ ┌──┐ ┌──┐
│A1│ │A2│ │A3│
└──┘ └──┘ └──┘
| | |
└-----------┘
|
┌─┐
│B│
└─┘
- Only one paraller step is allowed
┌─┐
│A│
└─┘
|
┌-----------┐
| | |
┌──┐ ┌──┐ ┌──┐
│A1│ │A2│ │A3│
└──┘ └──┘ └──┘
| | |
┌──┐ | |
│A4│ ❌ Not Allowed
└──┘ | |
| | |
└-----------┘
|
┌─┐
│B│
└─┘
- No Conditional Edges are allowed
Below are some examples showing how to add create branching dataflows.
In [1]:
Copied!
var userHomeDir = System.getProperty("user.home");
var localRespoUrl = "file://" + userHomeDir + "/.m2/repository/";
var langchain4jVersion = "1.0.1";
var langchain4jbeta = "1.0.1-beta6";
var langgraph4jVersion = "1.6-SNAPSHOT";
var userHomeDir = System.getProperty("user.home");
var localRespoUrl = "file://" + userHomeDir + "/.m2/repository/";
var langchain4jVersion = "1.0.1";
var langchain4jbeta = "1.0.1-beta6";
var langgraph4jVersion = "1.6-SNAPSHOT";
In [2]:
Copied!
%%bash
rm -rf \{userHomeDir}/Library/Jupyter/kernels/rapaio-jupyter-kernel/mima_cache/org/bsc/langgraph4j
%%bash
rm -rf \{userHomeDir}/Library/Jupyter/kernels/rapaio-jupyter-kernel/mima_cache/org/bsc/langgraph4j
In [ ]:
Copied!
%dependency /add-repo local \{localRespoUrl} release|never snapshot|always
// %dependency /list-repos
%dependency /add org.slf4j:slf4j-jdk14:2.0.9
%dependency /add org.bsc.langgraph4j:langgraph4j-core:\{langgraph4jVersion}
%dependency /add org.bsc.langgraph4j:langgraph4j-langchain4j:\{langgraph4jVersion}
%dependency /add dev.langchain4j:langchain4j:\{langchain4jVersion}
%dependency /add dev.langchain4j:langchain4j-open-ai:\{langchain4jVersion}
%dependency /add net.sourceforge.plantuml:plantuml-mit:1.2024.8
%dependency /resolve
%dependency /add-repo local \{localRespoUrl} release|never snapshot|always
// %dependency /list-repos
%dependency /add org.slf4j:slf4j-jdk14:2.0.9
%dependency /add org.bsc.langgraph4j:langgraph4j-core:\{langgraph4jVersion}
%dependency /add org.bsc.langgraph4j:langgraph4j-langchain4j:\{langgraph4jVersion}
%dependency /add dev.langchain4j:langchain4j:\{langchain4jVersion}
%dependency /add dev.langchain4j:langchain4j-open-ai:\{langchain4jVersion}
%dependency /add net.sourceforge.plantuml:plantuml-mit:1.2024.8
%dependency /resolve
In [4]:
Copied!
try( var file = new java.io.FileInputStream("./logging.properties")) {
java.util.logging.LogManager.getLogManager().readConfiguration( file );
}
try( var file = new java.io.FileInputStream("./logging.properties")) {
java.util.logging.LogManager.getLogManager().readConfiguration( file );
}
In [5]:
Copied!
import net.sourceforge.plantuml.SourceStringReader;
import net.sourceforge.plantuml.FileFormatOption;
import net.sourceforge.plantuml.FileFormat;
java.awt.Image plantUML2PNG( String code ) throws IOException {
var reader = new SourceStringReader(code);
try(var imageOutStream = new java.io.ByteArrayOutputStream()) {
var description = reader.outputImage( imageOutStream, 0, new FileFormatOption(FileFormat.PNG));
var imageInStream = new java.io.ByteArrayInputStream( imageOutStream.toByteArray() );
return javax.imageio.ImageIO.read( imageInStream );
}
}
import net.sourceforge.plantuml.SourceStringReader;
import net.sourceforge.plantuml.FileFormatOption;
import net.sourceforge.plantuml.FileFormat;
java.awt.Image plantUML2PNG( String code ) throws IOException {
var reader = new SourceStringReader(code);
try(var imageOutStream = new java.io.ByteArrayOutputStream()) {
var description = reader.outputImage( imageOutStream, 0, new FileFormatOption(FileFormat.PNG));
var imageInStream = new java.io.ByteArrayInputStream( imageOutStream.toByteArray() );
return javax.imageio.ImageIO.read( imageInStream );
}
}
Define Graph with parallel branch¶
In [6]:
Copied!
import org.bsc.langgraph4j.prebuilt.MessagesStateGraph;
import org.bsc.langgraph4j.prebuilt.MessagesState;
import org.bsc.langgraph4j.action.AsyncNodeAction;
import static org.bsc.langgraph4j.action.AsyncNodeAction.node_async;
import static org.bsc.langgraph4j.StateGraph.END;
import static org.bsc.langgraph4j.StateGraph.START;
AsyncNodeAction<MessagesState<String>> makeNode( String message ) {
return node_async(state -> Map.of( "messages", message ) );
}
var workflow = new MessagesStateGraph<String>()
.addNode("A", makeNode("A"))
.addNode("A1", makeNode("A1"))
.addNode("A2", makeNode("A2"))
.addNode("A3", makeNode("A3"))
.addNode("B", makeNode("B"))
.addNode("C", makeNode("C"))
.addEdge("A", "A1")
.addEdge("A", "A2")
.addEdge("A", "A3")
.addEdge("A1", "B")
.addEdge("A2", "B")
.addEdge("A3", "B")
.addEdge("B", "C")
.addEdge(START, "A")
.addEdge("C", END)
.compile();
import org.bsc.langgraph4j.prebuilt.MessagesStateGraph;
import org.bsc.langgraph4j.prebuilt.MessagesState;
import org.bsc.langgraph4j.action.AsyncNodeAction;
import static org.bsc.langgraph4j.action.AsyncNodeAction.node_async;
import static org.bsc.langgraph4j.StateGraph.END;
import static org.bsc.langgraph4j.StateGraph.START;
AsyncNodeAction> makeNode( String message ) {
return node_async(state -> Map.of( "messages", message ) );
}
var workflow = new MessagesStateGraph()
.addNode("A", makeNode("A"))
.addNode("A1", makeNode("A1"))
.addNode("A2", makeNode("A2"))
.addNode("A3", makeNode("A3"))
.addNode("B", makeNode("B"))
.addNode("C", makeNode("C"))
.addEdge("A", "A1")
.addEdge("A", "A2")
.addEdge("A", "A3")
.addEdge("A1", "B")
.addEdge("A2", "B")
.addEdge("A3", "B")
.addEdge("B", "C")
.addEdge(START, "A")
.addEdge("C", END)
.compile();
Print graph representation¶
In [7]:
Copied!
import org.bsc.langgraph4j.GraphRepresentation;
var representation = workflow.getGraph( GraphRepresentation.Type.PLANTUML, "parallel branch",false );
display( plantUML2PNG( representation.getContent() ) )
import org.bsc.langgraph4j.GraphRepresentation;
var representation = workflow.getGraph( GraphRepresentation.Type.PLANTUML, "parallel branch",false );
display( plantUML2PNG( representation.getContent() ) )
Out[7]:
cdadd1e3-ee12-4b14-a7a5-be545a330dc4
In [8]:
Copied!
for( var step : workflow.stream( Map.of() ) ) {
System.out.println( step );
}
for( var step : workflow.stream( Map.of() ) ) {
System.out.println( step );
}
START
NodeOutput{node=__START__, state={messages=[]}} NodeOutput{node=A, state={messages=[A]}} NodeOutput{node=__PARALLEL__(A), state={messages=[A, A1, A2, A3]}} NodeOutput{node=B, state={messages=[A, A1, A2, A3, B]}} NodeOutput{node=C, state={messages=[A, A1, A2, A3, B, C]}} NodeOutput{node=__END__, state={messages=[A, A1, A2, A3, B, C]}}
In [9]:
Copied!
import org.bsc.langgraph4j.prebuilt.MessagesStateGraph;
import org.bsc.langgraph4j.prebuilt.MessagesState;
import org.bsc.langgraph4j.action.AsyncNodeAction;
import org.bsc.langgraph4j.utils.EdgeMappings;
import static org.bsc.langgraph4j.action.AsyncNodeAction.node_async;
import static org.bsc.langgraph4j.action.AsyncEdgeAction.edge_async;
import static org.bsc.langgraph4j.StateGraph.END;
import static org.bsc.langgraph4j.StateGraph.START;
AsyncNodeAction<MessagesState<String>> makeNode( String message ) {
return node_async(state -> Map.of( "messages", message ) );
}
var workflow = new MessagesStateGraph<String>()
.addNode("A", makeNode("A"))
.addNode("A1", makeNode("A1"))
.addNode("A2", makeNode("A2"))
.addNode("A3", makeNode("A3"))
.addNode("B", makeNode("B"))
.addNode("C", makeNode("C"))
.addEdge("A", "A1")
.addEdge("A", "A2")
.addEdge("A", "A3")
.addEdge("A1", "B")
.addEdge("A2", "B")
.addEdge("A3", "B")
// .addEdge("B", "C")
.addConditionalEdges( "B",
edge_async( state ->
state.lastMinus(1)
.filter( m -> Objects.equals(m,"A3"))
.map( m -> "continue" )
.orElse("back") ),
EdgeMappings.builder()
.to( "A1", "back" )
.to( "C" , "continue")
.build()
)
.addEdge(START, "A")
.addEdge("C", END)
.compile();
import org.bsc.langgraph4j.prebuilt.MessagesStateGraph;
import org.bsc.langgraph4j.prebuilt.MessagesState;
import org.bsc.langgraph4j.action.AsyncNodeAction;
import org.bsc.langgraph4j.utils.EdgeMappings;
import static org.bsc.langgraph4j.action.AsyncNodeAction.node_async;
import static org.bsc.langgraph4j.action.AsyncEdgeAction.edge_async;
import static org.bsc.langgraph4j.StateGraph.END;
import static org.bsc.langgraph4j.StateGraph.START;
AsyncNodeAction> makeNode( String message ) {
return node_async(state -> Map.of( "messages", message ) );
}
var workflow = new MessagesStateGraph()
.addNode("A", makeNode("A"))
.addNode("A1", makeNode("A1"))
.addNode("A2", makeNode("A2"))
.addNode("A3", makeNode("A3"))
.addNode("B", makeNode("B"))
.addNode("C", makeNode("C"))
.addEdge("A", "A1")
.addEdge("A", "A2")
.addEdge("A", "A3")
.addEdge("A1", "B")
.addEdge("A2", "B")
.addEdge("A3", "B")
// .addEdge("B", "C")
.addConditionalEdges( "B",
edge_async( state ->
state.lastMinus(1)
.filter( m -> Objects.equals(m,"A3"))
.map( m -> "continue" )
.orElse("back") ),
EdgeMappings.builder()
.to( "A1", "back" )
.to( "C" , "continue")
.build()
)
.addEdge(START, "A")
.addEdge("C", END)
.compile();
In [10]:
Copied!
import org.bsc.langgraph4j.GraphRepresentation;
var representation = workflow.getGraph( GraphRepresentation.Type.PLANTUML, "parallel branch",false );
display( plantUML2PNG( representation.getContent() ) )
import org.bsc.langgraph4j.GraphRepresentation;
var representation = workflow.getGraph( GraphRepresentation.Type.PLANTUML, "parallel branch",false );
display( plantUML2PNG( representation.getContent() ) )
Out[10]:
9c4545b2-c565-440f-81be-261dd0b93bda
In [11]:
Copied!
for( var step : workflow.stream( Map.of() ) ) {
System.out.println( step );
}
for( var step : workflow.stream( Map.of() ) ) {
System.out.println( step );
}
START
NodeOutput{node=__START__, state={messages=[]}} NodeOutput{node=A, state={messages=[A]}} NodeOutput{node=__PARALLEL__(A), state={messages=[A, A1, A2, A3]}} NodeOutput{node=B, state={messages=[A, A1, A2, A3, B]}} NodeOutput{node=C, state={messages=[A, A1, A2, A3, B, C]}} NodeOutput{node=__END__, state={messages=[A, A1, A2, A3, B, C]}}
In [12]:
Copied!
import org.bsc.langgraph4j.prebuilt.MessagesStateGraph;
import org.bsc.langgraph4j.prebuilt.MessagesState;
import org.bsc.langgraph4j.action.AsyncNodeAction;
import static org.bsc.langgraph4j.action.AsyncNodeAction.node_async;
import static org.bsc.langgraph4j.action.AsyncEdgeAction.edge_async;
import static org.bsc.langgraph4j.StateGraph.END;
import static org.bsc.langgraph4j.StateGraph.START;
AsyncNodeAction<MessagesState<String>> makeNode( String message ) {
return node_async(state -> Map.of( "messages", message ) );
}
var subgraphA3 = new MessagesStateGraph<String>()
.addNode("A3.1", makeNode("A3.1"))
.addNode("A3.2", makeNode("A3.2"))
.addEdge(START, "A3.1")
.addEdge( "A3.1", "A3.2")
.addEdge("A3.2", END)
.compile();
var subgraphA1 = new MessagesStateGraph<String>()
.addNode("A1.1", makeNode("A1.1"))
.addNode("A1.2", makeNode("A1.2"))
.addEdge(START, "A1.1")
.addEdge( "A1.1", "A1.2")
.addEdge("A1.2", END)
.compile();
var workflow = new MessagesStateGraph<String>()
.addNode("A", makeNode("A"))
.addNode("A1", subgraphA1)
.addNode("A2", makeNode("A2"))
.addNode("A3", subgraphA3)
.addNode("B", makeNode("B"))
.addEdge("A", "A1")
.addEdge("A", "A2")
.addEdge("A", "A3")
.addEdge("A1", "B")
.addEdge("A2", "B")
.addEdge("A3", "B")
.addEdge(START, "A")
.addEdge("B", END)
.compile();
import org.bsc.langgraph4j.prebuilt.MessagesStateGraph;
import org.bsc.langgraph4j.prebuilt.MessagesState;
import org.bsc.langgraph4j.action.AsyncNodeAction;
import static org.bsc.langgraph4j.action.AsyncNodeAction.node_async;
import static org.bsc.langgraph4j.action.AsyncEdgeAction.edge_async;
import static org.bsc.langgraph4j.StateGraph.END;
import static org.bsc.langgraph4j.StateGraph.START;
AsyncNodeAction> makeNode( String message ) {
return node_async(state -> Map.of( "messages", message ) );
}
var subgraphA3 = new MessagesStateGraph()
.addNode("A3.1", makeNode("A3.1"))
.addNode("A3.2", makeNode("A3.2"))
.addEdge(START, "A3.1")
.addEdge( "A3.1", "A3.2")
.addEdge("A3.2", END)
.compile();
var subgraphA1 = new MessagesStateGraph()
.addNode("A1.1", makeNode("A1.1"))
.addNode("A1.2", makeNode("A1.2"))
.addEdge(START, "A1.1")
.addEdge( "A1.1", "A1.2")
.addEdge("A1.2", END)
.compile();
var workflow = new MessagesStateGraph()
.addNode("A", makeNode("A"))
.addNode("A1", subgraphA1)
.addNode("A2", makeNode("A2"))
.addNode("A3", subgraphA3)
.addNode("B", makeNode("B"))
.addEdge("A", "A1")
.addEdge("A", "A2")
.addEdge("A", "A3")
.addEdge("A1", "B")
.addEdge("A2", "B")
.addEdge("A3", "B")
.addEdge(START, "A")
.addEdge("B", END)
.compile();
In [13]:
Copied!
import org.bsc.langgraph4j.GraphRepresentation;
var representation = workflow.getGraph( GraphRepresentation.Type.PLANTUML, "parallel branch",false );
display( plantUML2PNG( representation.getContent() ) )
import org.bsc.langgraph4j.GraphRepresentation;
var representation = workflow.getGraph( GraphRepresentation.Type.PLANTUML, "parallel branch",false );
display( plantUML2PNG( representation.getContent() ) )
Out[13]:
2d790d4a-89c1-4abd-a778-39481c5a9447
In [14]:
Copied!
// workflow.getGraph( GraphRepresentation.Type.MERMAID, "parallel branch",false ).content();
// workflow.getGraph( GraphRepresentation.Type.MERMAID, "parallel branch",false ).content();
In [15]:
Copied!
for( var step : workflow.stream( Map.of() ) ) {
System.out.println( step );
}
for( var step : workflow.stream( Map.of() ) ) {
System.out.println( step );
}
START
NodeOutput{node=__START__, state={messages=[]}}
START START
NodeOutput{node=A, state={messages=[A]}} NodeOutput{node=__PARALLEL__(A), state={messages=[A, A1.1, A1.2, A2, A3.1, A3.2]}} NodeOutput{node=B, state={messages=[A, A1.1, A1.2, A2, A3.1, A3.2, B]}} NodeOutput{node=__END__, state={messages=[A, A1.1, A1.2, A2, A3.1, A3.2, B]}}
In [16]:
Copied!
import org.bsc.langgraph4j.prebuilt.MessagesStateGraph;
import org.bsc.langgraph4j.prebuilt.MessagesState;
import org.bsc.langgraph4j.action.AsyncNodeAction;
import static org.bsc.langgraph4j.action.AsyncNodeAction.node_async;
import static org.bsc.langgraph4j.action.AsyncEdgeAction.edge_async;
import static org.bsc.langgraph4j.StateGraph.END;
import static org.bsc.langgraph4j.StateGraph.START;
AsyncNodeAction<MessagesState<String>> makeNode( String message ) {
return node_async(state -> Map.of( "messages", message ) );
}
var subgraphA3 = new MessagesStateGraph<String>()
.addNode("A3.1", makeNode("A3.1"))
.addNode("A3.2", makeNode("A3.2"))
.addEdge(START, "A3.1")
.addEdge( "A3.1", "A3.2")
.addEdge("A3.2", END)
.compile();
var subgraphA2 = new MessagesStateGraph<String>()
.addNode("A2.1", makeNode("A2.1"))
.addNode("A2.2", makeNode("A2.2"))
.addEdge(START, "A2.1")
.addEdge( "A2.1", "A2.2")
.addEdge("A2.2", END)
.compile();
var subgraphA1 = new MessagesStateGraph<String>()
.addNode("A1.1", makeNode("A1.1"))
.addNode("A1.2", makeNode("A1.2"))
.addEdge(START, "A1.1")
.addEdge( "A1.1", "A1.2")
.addEdge("A1.2", END)
.compile();
var workflow = new MessagesStateGraph<String>()
.addNode("A", makeNode("A"))
.addNode("A1", subgraphA1)
.addNode("A2", subgraphA2)
.addNode("A3", subgraphA3)
.addNode("B", makeNode("B"))
.addEdge("A", "A1")
.addEdge("A", "A2")
.addEdge("A", "A3")
.addEdge("A1", "B")
.addEdge("A2", "B")
.addEdge("A3", "B")
.addEdge(START, "A")
.addEdge("B", END)
.compile();
import org.bsc.langgraph4j.prebuilt.MessagesStateGraph;
import org.bsc.langgraph4j.prebuilt.MessagesState;
import org.bsc.langgraph4j.action.AsyncNodeAction;
import static org.bsc.langgraph4j.action.AsyncNodeAction.node_async;
import static org.bsc.langgraph4j.action.AsyncEdgeAction.edge_async;
import static org.bsc.langgraph4j.StateGraph.END;
import static org.bsc.langgraph4j.StateGraph.START;
AsyncNodeAction> makeNode( String message ) {
return node_async(state -> Map.of( "messages", message ) );
}
var subgraphA3 = new MessagesStateGraph()
.addNode("A3.1", makeNode("A3.1"))
.addNode("A3.2", makeNode("A3.2"))
.addEdge(START, "A3.1")
.addEdge( "A3.1", "A3.2")
.addEdge("A3.2", END)
.compile();
var subgraphA2 = new MessagesStateGraph()
.addNode("A2.1", makeNode("A2.1"))
.addNode("A2.2", makeNode("A2.2"))
.addEdge(START, "A2.1")
.addEdge( "A2.1", "A2.2")
.addEdge("A2.2", END)
.compile();
var subgraphA1 = new MessagesStateGraph()
.addNode("A1.1", makeNode("A1.1"))
.addNode("A1.2", makeNode("A1.2"))
.addEdge(START, "A1.1")
.addEdge( "A1.1", "A1.2")
.addEdge("A1.2", END)
.compile();
var workflow = new MessagesStateGraph()
.addNode("A", makeNode("A"))
.addNode("A1", subgraphA1)
.addNode("A2", subgraphA2)
.addNode("A3", subgraphA3)
.addNode("B", makeNode("B"))
.addEdge("A", "A1")
.addEdge("A", "A2")
.addEdge("A", "A3")
.addEdge("A1", "B")
.addEdge("A2", "B")
.addEdge("A3", "B")
.addEdge(START, "A")
.addEdge("B", END)
.compile();
In [17]:
Copied!
import org.bsc.langgraph4j.GraphRepresentation;
var representation = workflow.getGraph( GraphRepresentation.Type.PLANTUML, "parallel branch",false );
display( plantUML2PNG( representation.getContent() ) )
import org.bsc.langgraph4j.GraphRepresentation;
var representation = workflow.getGraph( GraphRepresentation.Type.PLANTUML, "parallel branch",false );
display( plantUML2PNG( representation.getContent() ) )
Out[17]:
45c4b70c-4993-4648-be88-30023fb73186
In [18]:
Copied!
for( var step : workflow.stream( Map.of() ) ) {
System.out.println( step );
}
for( var step : workflow.stream( Map.of() ) ) {
System.out.println( step );
}
START
NodeOutput{node=__START__, state={messages=[]}}
START START START
NodeOutput{node=A, state={messages=[A]}} NodeOutput{node=__PARALLEL__(A), state={messages=[A, A1.1, A1.2, A2.1, A2.2, A3.1, A3.2]}} NodeOutput{node=B, state={messages=[A, A1.1, A1.2, A2.1, A2.2, A3.1, A3.2, B]}} NodeOutput{node=__END__, state={messages=[A, A1.1, A1.2, A2.1, A2.2, A3.1, A3.2, B]}}