LangGraph4j - Graph Execution Cancellation
LangGraph4j provides a powerful mechanism to cancel the execution of a graph, which is particularly useful for long-running processes. This feature is built upon the cancellation capabilities of the java-async-generator library.
Cancelling a Graph Stream
When you execute a graph using the stream method, you get an AsyncGenerator instance. This generator can be cancelled, allowing you to stop the graph's execution gracefully or immediately.
To cancel a stream, you need to call the cancel(boolean mayInterruptIfRunning) method on the generator.
In the example below we test cancellation considering to make a request from a different thread from the main one using the following method
Consuming the Stream with forEachAsync
When you consume the stream using forEachAsync, the graph execution runs in a separate thread.
-
cancel(true)(Immediate Cancellation): This will interrupt the execution thread, causing theCompletableFuturereturned byforEachAsyncto complete exceptionally with anInterruptedException. -
cancel(false)(Graceful Cancellation): This will let the currently executing node finish, and then stop the execution before starting the next node.
Here is an example from CancellationTest.java that demonstrates immediate cancellation:
var generator = workflow.stream(GraphInput.noArgs(), RunnableConfig.builder().build());
// Request cancellation after 500 milliseconds from a new thread different from main one
CompletableFuture.runAsync( () -> {
try {
Thread.sleep(500);
var result = generator.cancel(mayInterruptIfRunning);
log.info("cancellation executed with result: {}", result);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
var futureResult = generator.forEachAsync(output -> {
log.info("iteration is on: {}", output);
}).exceptionally(ex -> {
assertTrue(generator.isCancelled());
return "CANCELLED";
});
var genericResult = futureResult.get(5, TimeUnit.SECONDS);
assertTrue(generator.isCancelled());
assertEquals("CANCELLED", genericResult);
Consuming the Stream with an Iterator
When you use a for-each loop to iterate over the stream, the execution runs on the current thread.
cancel(true)orcancel(false): Both will cause thehasNext()method of the iterator to returnfalse, effectively stopping the loop.
Here is an example from CancellationTest.java:
var generator = workflow.stream(GraphInput.noArgs(), RunnableConfig.builder().build());
// Request cancellation after 500 milliseconds from a new thread different from main one
CompletableFuture.runAsync( () -> {
try {
Thread.sleep(500);
var result = generator.cancel(mayInterruptIfRunning);
log.info("cancellation executed with result: {}", result);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
NodeOutput<?> currentOutput = null;
for (var output : generator) {
log.info("iteration is on: {}", output);
currentOutput = output;
}
assertNotNull(currentOutput);
assertNotEquals(END, currentOutput.node());
assertTrue(generator.isCancelled());
Checking for Cancellation
You can check if a stream has been cancelled by calling the isCancelled() method on the generator.
if (generator.isCancelled()) {
// Handle cancellation
}
Cancellation and Subgraphs
Cancellation also works with nested graphs (subgraphs). If you cancel the execution of a parent graph, the cancellation is propagated to any currently executing subgraph.
Further Reading
The cancellation mechanism is provided by the java-async-generator library. For a more in-depth explanation of the underlying cancellation mechanism, please refer to the CANCELLATION.md document in the java-async-generator repository.