The StateGraph runtime executes durable, checkpointed workflows. Nodes run in sequence or branch via conditional edges. State flows through the graph as a map[string]any. The runtime saves a checkpoint after every node, which makes it possible to resume after an interrupt and to replay a run from an earlier point (time-travel debugging).

Creating a Graph

g := graph.New("my-workflow")

Adding Nodes

Each node has an ID and a handler function. The handler receives the current state and returns the updated state, or an error:

g.AddNode("greet", func(ctx context.Context, s graph.State) (graph.State, error) {
    s["greeting"] = fmt.Sprintf("Hello, %s!", s["user"])
    return s, nil
})

g.AddNode("classify", func(ctx context.Context, s graph.State) (graph.State, error) {
    s["intent"] = "general_question"
    return s, nil
})

Interrupt Nodes

Interrupt nodes pause execution for human-in-the-loop approval. When the runner reaches an interrupt node, it saves a checkpoint and returns without executing the node's handler; the handler runs only after Resume is called.

g.AddInterruptNode("approve", func(ctx context.Context, s graph.State) (graph.State, error) {
    // This node runs only after Resume; before that, execution pauses
    s["approved"] = true
    return s, nil
})
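
To make the pause-then-resume order concrete, here is a toy model of the behavior described above. It is only a sketch: toyRunner, node, advance, and newDemoRunner are hypothetical names, handlers are simplified to func(State) State, and nothing here reflects how the real runner is implemented.

```go
package main

import "fmt"

// State is a local stand-in for graph.State (assumed to be map[string]any).
type State map[string]any

// node is a toy node: an ID, an interrupt flag, and a simplified handler.
type node struct {
	id        string
	interrupt bool
	run       func(State) State
}

// toyRunner walks a fixed node sequence and remembers where it stopped.
type toyRunner struct {
	nodes []node
	next  int // index of the next node to execute; plays the role of a checkpoint
	state State
}

// advance executes nodes starting at r.next. With resumed == false it stops
// before an interrupt node; with resumed == true it releases the pending one.
func (r *toyRunner) advance(resumed bool) (paused bool) {
	for r.next < len(r.nodes) {
		n := r.nodes[r.next]
		if n.interrupt && !resumed {
			return true // position is kept, the handler has not run
		}
		resumed = false // only the first pending interrupt is released
		r.state = n.run(r.state)
		r.next++
	}
	return false
}

// newDemoRunner builds greet -> approve (interrupt) -> respond.
func newDemoRunner() *toyRunner {
	set := func(key string, val any) func(State) State {
		return func(s State) State { s[key] = val; return s }
	}
	return &toyRunner{
		state: State{},
		nodes: []node{
			{id: "greet", run: set("greeting", "Hello!")},
			{id: "approve", interrupt: true, run: set("approved", true)},
			{id: "respond", run: set("response", "How can I help?")},
		},
	}
}

func main() {
	r := newDemoRunner()
	fmt.Println("paused:", r.advance(false), "next:", r.nodes[r.next].id) // like Run
	fmt.Println("paused:", r.advance(true), "approved:", r.state["approved"]) // like Resume
}
```

In this model the first call corresponds to Run and the second to Resume: "approved" is absent from the state until the second call.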

Entry and Finish Points

g.SetEntryPoint("greet")   // Start here
g.SetFinishPoint("respond") // End here

SetEntryPoint adds an edge from __start__ to the given node. SetFinishPoint adds an edge from the given node to __end__.

Edges

Static Edges

g.AddEdge("greet", "classify")
g.AddEdge("classify", "respond")

Conditional Edges

Route based on state. The condition function returns the target node ID.

g.AddConditionalEdge("classify", func(s graph.State) string {
    intent, _ := s["intent"].(string)
    switch intent {
    case "support":
        return "support_flow"
    case "sales":
        return "sales_flow"
    default:
        return "general_flow"
    }
})
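
The condition function above relies on the comma-ok type assertion: if "intent" is missing or is not a string, intent becomes the empty string and the default branch is taken. One way to make that behavior easy to verify is to write the condition as a named function and test it in isolation. The sketch below assumes graph.State is map[string]any (a local State type stands in for it); routeByIntent is a hypothetical name.

```go
package main

import "fmt"

// State is a local stand-in for graph.State (assumed to be map[string]any).
type State map[string]any

// routeByIntent returns the ID of the next node based on s["intent"].
func routeByIntent(s State) string {
	intent, _ := s["intent"].(string) // "" if the key is missing or not a string
	switch intent {
	case "support":
		return "support_flow"
	case "sales":
		return "sales_flow"
	default:
		return "general_flow"
	}
}

func main() {
	for _, s := range []State{
		{"intent": "support"},
		{"intent": 42}, // wrong type falls through to the default
		{},             // missing key falls through to the default
	} {
		fmt.Println(routeByIntent(s))
	}
}
```

A named function with this shape could then be passed where the inline closure is used, assuming the condition parameter accepts a function of the same signature.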

Compiling

Validate the graph and produce an immutable CompiledGraph:

compiled, err := g.Compile()
if err != nil {
    return err  // e.g., missing entry point, invalid edge targets
}
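
The two failure modes named in the comment (missing entry point, invalid edge targets) can be illustrated with a small stand-alone check. This is a sketch of the kind of validation a compile step might perform, not the library's actual logic: toyGraph and validate are hypothetical, and only static edges are modeled. The __start__ and __end__ sentinels are the ones described under Entry and Finish Points.

```go
package main

import (
	"errors"
	"fmt"
)

const (
	startNode = "__start__"
	endNode   = "__end__"
)

// toyGraph is a hypothetical, simplified graph: a node set plus static edges.
type toyGraph struct {
	nodes map[string]bool
	edges map[string]string // from -> to
}

// validate reports a missing entry point or an edge that references an
// unregistered node.
func (g toyGraph) validate() error {
	if _, ok := g.edges[startNode]; !ok {
		return errors.New("no entry point: missing edge from __start__")
	}
	for from, to := range g.edges {
		if from != startNode && !g.nodes[from] {
			return fmt.Errorf("edge source %q is not a registered node", from)
		}
		if to != endNode && !g.nodes[to] {
			return fmt.Errorf("edge target %q is not a registered node", to)
		}
	}
	return nil
}

func main() {
	g := toyGraph{
		nodes: map[string]bool{"greet": true, "classify": true},
		edges: map[string]string{
			startNode:  "greet",
			"greet":    "classify",
			"classify": "respond", // "respond" was never added
		},
	}
	fmt.Println(g.validate())
}
```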

Runner

The runner executes a compiled graph with checkpointing:

runner := graph.NewRunner(compiled, storage)

Run

Start a new execution with initial state:

result, err := runner.Run(ctx, sessionID, graph.State{"user": "Alice"})

Resume

Continue from the latest checkpoint (e.g., after an interrupt):

result, err := runner.Resume(ctx, sessionID)

ResumeFromCheckpoint

Resume from a specific checkpoint (time-travel debugging):

result, err := runner.ResumeFromCheckpoint(ctx, checkpointID)

Checkpointing

State is saved after every node. Each checkpoint stores:

  • Session ID, Run ID, Node ID
  • Full state
  • Sequence number

The storage backend must implement SaveCheckpoint and GetLatestCheckpoint, plus GetCheckpoint if you use time-travel debugging. Use the SQLite or Postgres adapter.

StreamEvent

Subscribe to execution events for observability. Use the runner directly (not via agent.Run) to access the stream:

compiled, err := g.Compile()
if err != nil {
    return err
}
runner := graph.NewRunner(compiled, store)
stream := runner.Stream()
for evt := range stream {
    switch evt.Type {
    case "node_start":
        fmt.Printf("Starting node %s\n", evt.NodeID)
    case "node_end":
        fmt.Printf("Finished node %s\n", evt.NodeID)
    case "edge_transition":
        fmt.Printf("Transitioning to %s\n", evt.NodeID)
    case "interrupt":
        fmt.Printf("Paused at interrupt node %s\n", evt.NodeID)
    case "error":
        fmt.Printf("Error: %s\n", evt.Error)
    case "completed":
        fmt.Println("Graph completed")
    }
}

Type             Description
node_start       Node execution began
node_end         Node execution finished
edge_transition  Transitioning to next node
interrupt        Paused at interrupt node
error            Node failed
completed        Graph finished successfully
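
A for-range loop over the stream blocks until the stream is closed, so in practice the consumer usually runs in its own goroutine while the graph executes. The sketch below shows that pattern under the assumption that the stream behaves like a Go channel of events. Event and countByType are hypothetical stand-ins, and the events are produced by hand rather than by a real runner.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a local stand-in for the runner's stream event. Only the fields
// used by the loop above are modeled.
type Event struct {
	Type   string
	NodeID string
}

// countByType drains the stream until it is closed and tallies event types.
func countByType(stream <-chan Event) map[string]int {
	counts := make(map[string]int)
	for evt := range stream {
		counts[evt.Type]++
	}
	return counts
}

func main() {
	stream := make(chan Event)
	var wg sync.WaitGroup
	var counts map[string]int

	// Consume in the background so the producer is never blocked for long.
	wg.Add(1)
	go func() {
		defer wg.Done()
		counts = countByType(stream)
	}()

	// Stand-in for a graph run emitting events.
	for _, e := range []Event{
		{Type: "node_start", NodeID: "greet"},
		{Type: "node_end", NodeID: "greet"},
		{Type: "completed"},
	} {
		stream <- e
	}
	close(stream)

	wg.Wait() // counts is safe to read only after the consumer has finished
	fmt.Println(counts["node_start"], counts["node_end"], counts["completed"])
}
```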

Integration with Agent

Attach a graph to an agent via the builder. The agent’s Run and Resume methods use the graph:

g := graph.New("workflow").
    AddNode("greet", func(ctx context.Context, s graph.State) (graph.State, error) {
        s["greeting"] = fmt.Sprintf("Hello, %s!", s["user"])
        return s, nil
    }).
    AddNode("respond", func(ctx context.Context, s graph.State) (graph.State, error) {
        s["response"] = "How can I help?"
        return s, nil
    }).
    SetEntryPoint("greet").
    AddEdge("greet", "respond").
    SetFinishPoint("respond")

a, err := agent.New("run-agent", "Run Agent").
    WithStorage(store).
    WithGraph(g).
    Build()
if err != nil {
    log.Fatal(err)
}

result, err := a.Run(ctx, map[string]any{"user": "World"})
if err != nil {
    log.Fatal(err)
}
fmt.Printf("Result: %v\n", result.State)

Time-Travel Debugging

Resume from any historical checkpoint to replay or debug:

checkpoints, err := store.ListCheckpoints(ctx, sessionID)
if err != nil {
    return err
}
// Present checkpoints to the user, who selects a checkpointID from a UI or CLI
result, err := runner.ResumeFromCheckpoint(ctx, checkpointID)
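
When the choice is made programmatically rather than by a user, a common need is "the most recent checkpoint recorded for a given node". The helper below is a sketch of that selection. The Checkpoint type and its field names are assumptions modeled on the fields listed under Checkpointing, and latestForNode is a hypothetical helper, not a library function.

```go
package main

import "fmt"

// Checkpoint is a local stand-in; the field names are assumed.
type Checkpoint struct {
	ID     string
	NodeID string
	Seq    int
}

// latestForNode returns the ID of the highest-sequence checkpoint recorded
// for nodeID, or false if the node has no checkpoint.
func latestForNode(cps []Checkpoint, nodeID string) (string, bool) {
	bestSeq, bestID, found := 0, "", false
	for _, cp := range cps {
		if cp.NodeID == nodeID && (!found || cp.Seq > bestSeq) {
			bestSeq, bestID, found = cp.Seq, cp.ID, true
		}
	}
	return bestID, found
}

func main() {
	cps := []Checkpoint{
		{ID: "cp-1", NodeID: "greet", Seq: 1},
		{ID: "cp-2", NodeID: "classify", Seq: 2},
		{ID: "cp-3", NodeID: "respond", Seq: 3},
	}
	if id, ok := latestForNode(cps, "classify"); ok {
		fmt.Println("resume from", id)
	}
}
```

The returned ID is what would be passed to ResumeFromCheckpoint.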

Complete Example

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/spawn08/chronos/engine/graph"
    "github.com/spawn08/chronos/sdk/agent"
    "github.com/spawn08/chronos/storage/adapters/sqlite"
)

func main() {
    ctx := context.Background()

    store, err := sqlite.New("run.db")
    if err != nil {
        log.Fatal(err)
    }
    defer store.Close()
    if err := store.Migrate(ctx); err != nil {
        log.Fatal(err)
    }

    g := graph.New("workflow").
        AddNode("greet", func(_ context.Context, s graph.State) (graph.State, error) {
            s["greeting"] = fmt.Sprintf("Hello, %s!", s["user"])
            return s, nil
        }).
        AddNode("classify", func(_ context.Context, s graph.State) (graph.State, error) {
            s["intent"] = "general_question"
            return s, nil
        }).
        AddNode("respond", func(_ context.Context, s graph.State) (graph.State, error) {
            s["response"] = fmt.Sprintf("Intent: %s. How can I help?", s["intent"])
            return s, nil
        }).
        SetEntryPoint("greet").
        AddEdge("greet", "classify").
        AddEdge("classify", "respond").
        SetFinishPoint("respond")

    a, err := agent.New("run-agent", "Run Agent").
        WithStorage(store).
        WithGraph(g).
        Build()
    if err != nil {
        log.Fatal(err)
    }

    result, err := a.Run(ctx, map[string]any{"user": "World"})
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Result: %v\n", result.State)
}