LangGraph Durable Execution

해당 문서는 LangGraph 공식 문서 내용을 재구성하여 작성하였습니다.

기본적으로 CheckPointer 사용에 관한 내용을 포함합니다.

CheckPointer의 기본적 사용법을 안다면 해당 문서는 건너뛰어도 됩니다.

개념

LangGraph 진행 상황을 저장해서 중단된 지점부터 정확히 재개할 수 있는 실행 방식입니다.

Human-in-the-loop, 장기 실행 작업, 오류 복구에 유용합니다.

견고한 실행을 구성하는 법

1. Checkpointer 지정

다양한 저장형태의 checkpointer를 이용 가능. 운영 서비스가 아닌 경우 InMemorySaver로 시작해도 무방

from langgraph.checkpoint.memory import InMemorySaver

checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)

2. Thread ID 지정

config = {"configurable": {"thread_id": "abc123"}}
graph.invoke(input, config)

3. Side Effect(e.g., file writes, API calls)는 Task로 래핑

from langgraph.func import task

@task
def api_call(url: str):
    return requests.get(url).text  # Side effect

결정성과 일관된 재실행

재개 방식

중단된 줄에서 재개 ❌ → 특정 시작점 부터 workflow를 재실행하는 방식

이를 위해서는 workflow 중에 결정성이 없는 실행(ex. 랜덤수 생성)이나 Side effect 실행(ex. API 호출)은 따로 task로 분리하여 실행되어야 합니다.

핵심 원칙

1. 작업 반복 방지

# ❌ 나쁨: 여러 side effect가 노드에 섞여있음
def bad_node(state):
    log.write("Processing...")  # Side effect 1
    result = api_call()          # Side effect 2
    db.save(result)              # Side effect 3
    return {"result": result}

# ✅ 좋음: 각 side effect를 task로
@task
def _log_processing():
    log.write("Processing...")

@task
def _api_call():
    return api_call()

@task
def _save_to_db(result):
    db.save(result)

def good_node(state):
    _log_processing()
    result = _api_call().result()
    _save_to_db(result)
    return {"result": result}

2. 비결정적 연산 캡슐화

import random

# ❌ 재실행시 다른 결과
def bad_node(state):
    return {"random": random.randint(1, 100)}

# ✅ Task로 래핑
@task
def _generate_random():
    return random.randint(1, 100)

def good_node(state):
    return {"random": _generate_random().result()}

Durability Modes

실행 성능과 데이터 일관성 균형 조절:

graph.stream({"input": "test"}, durability="sync")

1. “exit” (최저 내구성, 최고 성능)

  • 저장 시점: 그래프 완료시만 (성공/실패)
  • 장점: 최고 성능
  • 단점: 중간 상태 저장 안됨, 중단 불가, 중간 실패시 복구 불가
  • 용도: 빠른 단순 작업

2. “async” (중간 내구성, 좋은 성능)

  • 저장 시점: 다음 step 실행 중 비동기로 저장
  • 장점: 좋은 성능 + 내구성
  • 단점: 프로세스 크래시시 checkpoint 유실 가능성 작음
  • 용도: 대부분의 경우 (기본 권장)

3. “sync” (최고 내구성, 낮은 성능)

  • 저장 시점: 다음 step 시작 전 동기로 저장
  • 장점: 모든 checkpoint 확실히 저장
  • 단점: 성능 오버헤드
  • 용도: 중요한 트랜잭션, 돈/데이터 손실 방지

LangGraph Node에서 Task 사용

Before: Side Effect가 노드에 직접

class State(TypedDict):
    url: str
    result: NotRequired[str]

def call_api(state: State):
    result = requests.get(state['url']).text[:100]  # Side effect!
    return {"result": result}

builder = StateGraph(State)
builder.add_node("call_api", call_api)

After: Task로 래핑

class State(TypedDict):
    urls: list[str]
    results: NotRequired[list[str]]

@task
def _make_request(url: str):
    return requests.get(url).text[:100]  # Task로 래핑

def call_api(state: State):
    # Task 생성
    requests = [_make_request(url) for url in state['urls']]
    # 결과 대기
    results = [req.result() for req in requests]
    return {"results": results}

builder = StateGraph(State)
builder.add_node("call_api", call_api)

재개시: _make_request가 이미 실행됐으면 재실행 X, 저장된 결과 사용

Leave a Comment


목차