LangGraph Integration

ART’s LangGraph integration enables you to build sophisticated, multi-step AI agents that learn and improve through reinforcement training. By combining LangGraph’s powerful agent framework with ART’s training capabilities, you can create agents that reason, use tools, and adapt their behavior over time.

Installation

To use ART with LangGraph, install ART with the required extras:
uv pip install -U "openpipe-art[backend,langgraph]>=0.4.9"
The langgraph extra includes the LangGraph integration dependencies, while backend provides the training backend components. (The quotes keep your shell from interpreting the brackets and >=.) If you run with the SkyPilotBackend, substitute skypilot for backend in the extras list.
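For example, to train remotely with the SkyPilotBackend:
uv pip install -U "openpipe-art[skypilot,langgraph]>=0.4.9"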

Why Use ART with LangGraph?

LangGraph provides an excellent framework for building agents of many kinds, from ReAct-style reasoning agents to complex multi-agent workflows with supervisor patterns and parallel execution. However, getting these agents to perform well often requires extensive prompt engineering and manual tuning. ART’s integration with LangGraph addresses this by:
  • Automatic behavior improvement: Train your agents to get better at multi-step reasoning without manual prompt tuning
  • Tool usage optimization: Learn when and how to use tools more effectively through reinforcement learning
  • Adaptive decision making: Agents learn to make better choices about which actions to take in different situations
  • Scalable training: Train on diverse scenarios to build robust, generalizable agent behaviors

Key Features

  • Seamless integration: Drop-in replacement for LangGraph’s LLM initialization (see the sketch after this list)
  • Automatic logging: Captures all agent interactions for training data generation
  • Multi-step trajectory support: Handles complex agent workflows with tool calls and reasoning steps
  • RULER compatibility: Use ART’s general-purpose reward function to train agents without hand-crafted rewards
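
In practice, the drop-in replacement is a one-line import swap; a minimal sketch (the model name and empty tool list are placeholders):

from langgraph.prebuilt import create_react_agent

# Standard LangChain initialization:
# from langchain.chat_models import init_chat_model
# ART's replacement has the same call shape, but records every LLM call for training:
from art.langgraph import init_chat_model

chat_model = init_chat_model("my-model", temperature=1.0)
agent = create_react_agent(chat_model, tools=[])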

Code Examples

The following snippets walk through the key pieces of the LangGraph integration:

Basic Setup and Initialization

import os
import uuid

import weave
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent

import art
from art.langgraph import init_chat_model

# Initialize Weave tracking (optional); assumes `model` is the ART model
# defined in the complete example below
if os.getenv("WANDB_API_KEY", ""):
    weave.init(model.project, settings={"print_call_link": False})

Defining Tools for Your Agent

# These tools are defined inside the rollout function (shown below) so they
# can close over `scenario` and `final_answer`.
@tool
def search_inbox_tool(keywords: list[str]) -> list[dict]:
    """Search the inbox for emails matching the given keywords and return
    a list of dictionaries so the LLM can easily consume them."""
    results = search_emails(
        inbox=scenario.inbox_address,
        keywords=keywords,
        sent_before=scenario.query_date,
    )
    return [result.model_dump() for result in results]

@tool
def read_email_tool(message_id: str) -> dict | None:
    """Read a specific email by message ID."""
    email = read_email(message_id)
    if email:
        return email.model_dump()
    return None

@tool
def return_final_answer_tool(answer: str, reference_message_ids: list[str]) -> dict:
    """Return the final answer and the message IDs used to generate the answer."""
    nonlocal final_answer
    final_answer = FinalAnswer(answer=answer, source_ids=reference_message_ids)
    return final_answer.model_dump()

Creating and Running a LangGraph ReAct Agent

@weave.op
async def rollout(model: art.Model, email_scenario: EmailScenario) -> ProjectTrajectory:
    scenario = email_scenario.scenario

    # Initialize chat model with temperature
    chat_model = init_chat_model(model.name, temperature=1.0)

    # Define available tools
    tools = [search_inbox_tool, read_email_tool, return_final_answer_tool]

    # Create the LangGraph ReAct agent
    react_agent = create_react_agent(chat_model, tools)

    # Configure agent execution
    config = {
        "configurable": {"thread_id": str(uuid.uuid4())},
        "recursion_limit": MAX_TURNS,
    }

    # Run the agent with system and user messages
    await react_agent.ainvoke(
        {
            "messages": [
                SystemMessage(content=system_prompt),
                HumanMessage(content=scenario.question),
            ]
        },
        config=config,
    )

Trajectory Tracking and Scoring

class ProjectTrajectory(art.Trajectory):
    final_answer: FinalAnswer | None = None

# Create trajectory with metadata
traj = ProjectTrajectory(
    reward=0.0,
    messages_and_choices=[],
    metadata={
        "scenario_id": scenario.id,
        "step": email_scenario.step,
    },
)

# Score the trajectory using correctness judge
if final_answer:
    traj.final_answer = final_answer
    correctness_judge_response = await judge_correctness(
        scenario, traj.final_answer.answer
    )
    traj.metrics["correct"] = float(correctness_judge_response.accept)

Training Loop with LangGraph Integration

from art.langgraph import wrap_rollout
from art.rewards import ruler_score_group

# Training configuration
training_config = {
    "groups_per_step": 2,
    "num_epochs": 20,
    "rollouts_per_group": 4,
    "learning_rate": 1e-5,
    "max_steps": 20,
}

# Create trajectory groups for training
# (training_iterator comes from art.utils.iterate_dataset; see the complete example below)
for batch in training_iterator:
    groups = []
    for scenario in batch.items:
        groups.append(
            art.TrajectoryGroup(
                (
                    wrap_rollout(model, rollout)(
                        model, EmailScenario(step=batch.step, scenario=scenario)
                    )
                    for _ in range(training_config["rollouts_per_group"])
                )
            )
        )

    # Gather trajectory groups
    finished_groups = await art.gather_trajectory_groups(
        groups,
        pbar_desc="gather",
        max_exceptions=training_config["rollouts_per_group"] * len(batch.items),
    )

    # Apply RULER scoring
    judged_groups = []
    for group in finished_groups:
        judged_group = await ruler_score_group(group, "openai/o4-mini", debug=True)
        judged_groups.append(judged_group)

    # Train the model
    await model.train(
        judged_groups,
        config=art.TrainConfig(learning_rate=training_config["learning_rate"]),
        _config={"logprob_calculation_chunk_size": 8},
    )

Correctness Evaluation

from litellm import acompletion
from pydantic import BaseModel, Field
from tenacity import retry, stop_after_attempt

class CorrectnessJudgeResponse(BaseModel):
    reasoning: str = Field(description="Explanation of the reasoning process.")
    accept: bool = Field(description="Whether the AI answer should be accepted.")

@retry(stop=stop_after_attempt(3))
async def judge_correctness(scenario: Scenario, answer: str) -> CorrectnessJudgeResponse:
    system_prompt = """
    You are given a question, the reference answer, and an answer generated by an AI assistant.
    Your task is to decide whether the AI answer is correct and should be accepted.
    """

    messages = [
        {"role": "system", "content": system_prompt},
        {
            "role": "user",
            "content": (
                f"Question: {scenario.question}\n"
                f"Reference answer: {scenario.answer}\n"
                f"AI answer: {answer}"
            ),
        },
    ]

    response = await acompletion(
        model="openai/gpt-4.1",
        messages=messages,
        response_format=CorrectnessJudgeResponse,
    )

    return CorrectnessJudgeResponse.model_validate_json(
        response.choices[0].message.content or "{}"
    )

Key Components Summary

  1. LangGraph ReAct Agent: Uses create_react_agent() with custom tools and chat model
  2. Tool Definition: Custom tools decorated with @tool for specific functionality
  3. Trajectory Tracking: Custom trajectory class extends art.Trajectory
  4. Training Integration: Uses wrap_rollout() and art.gather_trajectory_groups()
  5. Evaluation: Automated correctness judging with retry logic
  6. Configuration: Flexible training parameters and agent limits

Complete Email Agent Example

Here’s a complete, runnable example that demonstrates training a LangGraph email search agent:
import asyncio
import uuid
from textwrap import dedent
from typing import List

import art
import weave
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent
from litellm import acompletion
from pydantic import BaseModel, Field
from tenacity import retry, stop_after_attempt

from art.langgraph import init_chat_model, wrap_rollout
from art.local import LocalBackend
from art.utils import iterate_dataset

# Initialize the trainable model and backend. LocalBackend trains on the
# current machine; a SkyPilotBackend is instead created asynchronously via
# `await SkyPilotBackend.initialize_cluster(...)`.
model = art.TrainableModel(
    name="email-agent",
    project="langgraph-email-agent",
    base_model="Qwen/Qwen2.5-7B-Instruct",
)
backend = LocalBackend()

# Data models
class EmailResult(BaseModel):
    message_id: str
    subject: str
    from_address: str
    date: str
    snippet: str

class FinalAnswer(BaseModel):
    answer: str
    source_ids: List[str]

class Scenario(BaseModel):
    id: str
    question: str
    answer: str
    inbox_address: str
    query_date: str

class EmailScenario(BaseModel):
    step: int
    scenario: Scenario

class ProjectTrajectory(art.Trajectory):
    final_answer: FinalAnswer | None = None

class CorrectnessJudgeResponse(BaseModel):
    reasoning: str = Field(description="Explanation of the reasoning process.")
    accept: bool = Field(description="Whether the AI answer should be accepted.")

# Mock email functions (replace with real implementation)
def search_emails(inbox: str, keywords: List[str], sent_before: str) -> List[EmailResult]:
    """Mock email search function - replace with real implementation"""
    return [
        EmailResult(
            message_id="msg_123",
            subject=f"Subject matching {keywords[0]}",
            from_address="sender@example.com",
            date="2024-01-15",
            snippet=f"Email snippet containing {keywords[0]}"
        )
    ]

def read_email(message_id: str) -> EmailResult | None:
    """Mock email read function - replace with real implementation"""
    return EmailResult(
        message_id=message_id,
        subject="Full email subject",
        from_address="sender@example.com",
        date="2024-01-15",
        snippet="Full email content here..."
    )

# Correctness evaluation
@retry(stop=stop_after_attempt(3))
async def judge_correctness(scenario: Scenario, answer: str) -> CorrectnessJudgeResponse:
    system_prompt = dedent("""
        You are given a question, the reference answer, and an answer generated by an AI assistant.
        Your task is to decide whether the AI answer is correct and should be accepted.
    """)
    
    messages = [
        {"role": "system", "content": system_prompt},
        {
            "role": "user",
            "content": (
                f"Question: {scenario.question}\n"
                f"Reference answer: {scenario.answer}\n"
                f"AI answer: {answer}"
            ),
        },
    ]
    
    response = await acompletion(
        model="openai/gpt-4o-mini",
        messages=messages,
        response_format=CorrectnessJudgeResponse,
    )
    
    return CorrectnessJudgeResponse.model_validate_json(
        response.choices[0].message.content or "{}"
    )

# Main rollout function
@weave.op
async def rollout(model: art.Model, email_scenario: EmailScenario) -> ProjectTrajectory:
    scenario = email_scenario.scenario
    MAX_TURNS = 10
    
    traj = ProjectTrajectory(
        reward=0.0,
        messages_and_choices=[],
        metadata={
            "scenario_id": scenario.id,
            "step": email_scenario.step,
        },
    )
    
    system_prompt = dedent(f"""
        You are an email search agent. Use the tools to search emails and find answers.
        User's email address: {scenario.inbox_address}
        Today's date: {scenario.query_date}
        
        When you find the answer, use return_final_answer_tool with the answer and source message IDs.
    """)
    
    final_answer = None
    
    @tool
    def search_inbox_tool(keywords: List[str]) -> List[dict]:
        """Search inbox for emails matching keywords"""
        results = search_emails(scenario.inbox_address, keywords, scenario.query_date)
        return [result.model_dump() for result in results]  # EmailResult is a pydantic model, not a dataclass
    
    @tool
    def read_email_tool(message_id: str) -> dict | None:
        """Read a specific email by message ID"""
        email = read_email(message_id)
        return email.model_dump() if email else None
    
    @tool
    def return_final_answer_tool(answer: str, reference_message_ids: List[str]) -> dict:
        """Return final answer with source message IDs"""
        nonlocal final_answer
        final_answer = FinalAnswer(answer=answer, source_ids=reference_message_ids)
        return final_answer.model_dump()
    
    tools = [search_inbox_tool, read_email_tool, return_final_answer_tool]
    chat_model = init_chat_model(model.name, temperature=1.0)
    react_agent = create_react_agent(chat_model, tools)
    
    try:
        config = {
            "configurable": {"thread_id": str(uuid.uuid4())},
            "recursion_limit": MAX_TURNS,
        }
        
        await react_agent.ainvoke({
            "messages": [
                SystemMessage(content=system_prompt),
                HumanMessage(content=scenario.question),
            ]
        }, config=config)
        
        if final_answer:
            traj.final_answer = final_answer
            correctness_judge_response = await judge_correctness(scenario, final_answer.answer)
            traj.metrics["correct"] = float(correctness_judge_response.accept)
            # Use judged correctness as the reward signal for training
            traj.reward = traj.metrics["correct"]
    
    except Exception as e:
        print(f"Error running agent: {e}")
        traj.messages_and_choices.append({"role": "assistant", "content": f"Error: {str(e)}"})
    
    return traj

# Main training function
async def main():
    # Sample training scenarios (replace with real data)
    training_scenarios = [
        Scenario(
            id="1",
            question="Find emails about the quarterly budget",
            answer="Budget meeting scheduled for Q4 review",
            inbox_address="user@company.com",
            query_date="2024-01-20"
        ),
        Scenario(
            id="2", 
            question="Look for urgent project updates",
            answer="Project deadline moved to next month",
            inbox_address="user@company.com",
            query_date="2024-01-20"
        ),
    ]
    
    # Register model with backend
    await model.register(backend)
    
    # Training configuration
    training_config = {
        "groups_per_step": 2,
        "num_epochs": 3,
        "rollouts_per_group": 4,
        "learning_rate": 1e-5,
        "max_steps": 5,
    }
    
    # Training iterator
    training_iterator = iterate_dataset(
        training_scenarios,
        groups_per_step=training_config["groups_per_step"],
        num_epochs=training_config["num_epochs"],
        initial_step=await model.get_step(),
    )
    
    # Training loop
    for batch in training_iterator:
        print(f"Training step {batch.step}, epoch {batch.epoch}")
        
        # Create trajectory groups
        groups = []
        for scenario in batch.items:
            groups.append(
                art.TrajectoryGroup([
                    wrap_rollout(model, rollout)(
                        model, EmailScenario(step=batch.step, scenario=scenario)
                    )
                    for _ in range(training_config["rollouts_per_group"])
                ])
            )
        
        # Gather trajectories
        finished_groups = await art.gather_trajectory_groups(
            groups,
            pbar_desc="gather",
            max_exceptions=training_config["rollouts_per_group"] * len(batch.items),
        )
        
        # Train model
        await model.train(
            finished_groups,
            config=art.TrainConfig(learning_rate=training_config["learning_rate"]),
        )
        
        print(f"Completed training step {batch.step}")
        
        if batch.step >= training_config["max_steps"]:
            break

if __name__ == "__main__":
    asyncio.run(main())
This complete example shows how to:
  1. Set up the environment with model, backend, and data structures
  2. Define custom tools for email search and retrieval
  3. Create a LangGraph ReAct agent with proper configuration
  4. Implement trajectory tracking with custom reward scoring
  5. Run the full training loop with proper error handling
  6. Use wrap_rollout to automatically capture agent interactions
To use this example, simply replace the mock email functions (search_emails, read_email) with your actual email API integration, and provide real training scenarios in the training_scenarios list.
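
As a rough illustration, here is what a real search_emails might look like when backed by a plain IMAP mailbox. This is a minimal sketch, not part of the ART API: it assumes hypothetical IMAP_HOST and IMAP_PASSWORD environment variables, searches one keyword at a time, and omits snippet extraction:

import email as email_lib
import imaplib
import os
from datetime import datetime

def search_emails(inbox: str, keywords: List[str], sent_before: str) -> List[EmailResult]:
    """Search an IMAP inbox for messages matching any keyword, sent before a date."""
    imap_date = datetime.strptime(sent_before, "%Y-%m-%d").strftime("%d-%b-%Y")
    conn = imaplib.IMAP4_SSL(os.environ["IMAP_HOST"])
    conn.login(inbox, os.environ["IMAP_PASSWORD"])
    conn.select("INBOX", readonly=True)
    results: List[EmailResult] = []
    for keyword in keywords:
        _, data = conn.search(None, f'(TEXT "{keyword}" SENTBEFORE {imap_date})')
        for msg_id in data[0].split()[:10]:  # cap results per keyword
            _, msg_data = conn.fetch(msg_id, "(RFC822)")
            msg = email_lib.message_from_bytes(msg_data[0][1])
            results.append(EmailResult(
                message_id=msg_id.decode(),
                subject=msg.get("Subject", ""),
                from_address=msg.get("From", ""),
                date=msg.get("Date", ""),
                snippet="",  # body/snippet extraction omitted in this sketch
            ))
    conn.logout()
    return results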

Troubleshooting

Common Issues

Empty trajectories or no training data captured:
  • Ensure you’re using init_chat_model(model.name) in your rollout function
  • Verify your rollout function actually executes the agent and makes LLM calls
  • Check that init_chat_model() is called before creating your LangGraph agent, as in the snippet below
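
A minimal reminder of the correct ordering inside a rollout:

chat_model = init_chat_model(model.name)  # create the tracked chat model first
react_agent = create_react_agent(chat_model, tools)  # then build the agent around it
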
Import errors:
  • Install ART with the correct extras: uv pip install -U "openpipe-art[backend,langgraph]>=0.4.9"
  • Ensure you have the required LangGraph dependencies
Training not starting:
  • Verify you have trajectory data with await art.gather_trajectory_groups(...)
  • Check that the model is properly registered with await model.register(backend)

Best Practices

Agent Design

  • Clear tool descriptions: Ensure your tool functions have descriptive docstrings
  • Error handling: Include proper error handling in your tools for robust training
  • Final answer pattern: Use a dedicated tool for returning final answers to users

Training Data

  • Diverse scenarios: Create varied training scenarios that cover different use cases
  • Realistic complexity: Include both simple and complex multi-step tasks
  • Edge cases: Add scenarios that test error handling and edge cases

Performance Optimization

  • Tool efficiency: Optimize tool execution time since it affects training speed
  • Batch generation: Generate multiple trajectories efficiently using async patterns (see the sketch after this list)
  • Resource management: Monitor memory usage during long training runs
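
For example, evaluation rollouts outside the training loop can be fanned out concurrently with asyncio.gather (a sketch assuming the model, rollout, and scenario types from the complete example; eval_scenarios is a hypothetical held-out list):

import asyncio

async def evaluate(eval_scenarios: List[Scenario]) -> float:
    # Run all evaluation rollouts concurrently
    trajectories = await asyncio.gather(*(
        wrap_rollout(model, rollout)(model, EmailScenario(step=0, scenario=s))
        for s in eval_scenarios
    ))
    # Fraction of rollouts judged correct
    return sum(t.metrics.get("correct", 0.0) for t in trajectories) / len(trajectories)
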
The ART-LangGraph integration makes it straightforward to build and train sophisticated AI agents that improve their performance over time, turning your prototype agents into production-ready intelligent systems.