fix(core): avoid inspect.getsource in RunnableLambda.deps #35239

Open
GAUTAM V DATLA (gautamvarmadatla) wants to merge 6 commits into `langchain-ai:master` from `gautamvarmadatla:fix-nonblocking-aconfig-with-context`

Conversation

@gautamvarmadatla GAUTAM V DATLA (gautamvarmadatla) commented Feb 15, 2026

Summary

This PR fixes blocking behavior when computing RunnableLambda.deps by avoiding inspect.getsource during nonlocal-variable discovery. Dependencies are now determined from bytecode analysis, which removes file I/O from the deps path and avoids blocking the event loop in async contexts.

Fixes #29530
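As an illustration of the bytecode-based approach (a minimal sketch under assumptions, not the implementation in this PR; `invoke_targets` and the case builders are hypothetical names), the captured/global names whose attribute chains end in an `.invoke` call can be recovered with `dis` alone, without ever opening a source file:

```python
import dis
from typing import Callable

def invoke_targets(func: Callable) -> set:
    """Names of captured/global objects on which .invoke is called,
    discovered by scanning bytecode (no file I/O, unlike inspect.getsource)."""
    targets = set()
    root = None  # variable name starting the current attribute chain
    for ins in dis.get_instructions(func):
        if ins.opname in ("LOAD_DEREF", "LOAD_GLOBAL"):
            root = ins.argval  # start of a chain like box.agent.invoke
        elif ins.opname in ("LOAD_ATTR", "LOAD_METHOD") and root is not None:
            if ins.argval == "invoke":
                targets.add(root)
                root = None
            # any other attribute keeps the chain alive (box -> box.agent)
        else:
            root = None  # unrelated instruction breaks the chain
    return targets

def make_closure_case():
    agent = object()  # stand-in; only the bytecode matters here
    def my_func(x):
        return agent.invoke(x)
    return my_func

def global_chain_case(x):
    return box.agent.invoke(x)  # 'box' need not exist to analyze bytecode

print(invoke_targets(make_closure_case()))  # {'agent'}
print(invoke_targets(global_chain_case))    # {'box'}
```

Checking both `LOAD_ATTR` and `LOAD_METHOD` keeps the sketch working across CPython versions, since method-call loading changed between 3.11 and 3.12.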

Verification

  • Ran `uv run python -m pytest tests/unit_tests/runnables -q` and `ruff`.
  • Added a regression test ensuring RunnableLambda.deps does not call inspect.getsource.
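The idea behind such a regression test can be sketched in isolation with a stdlib-only harness (`assert_no_getsource` is a hypothetical helper, not the test actually added to the suite): patch `inspect.getsource` to fail loudly and run the code under test.

```python
import inspect
from unittest import mock

def assert_no_getsource(fn, *args, **kwargs):
    """Run fn and fail if inspect.getsource is invoked along the way."""
    def boom(*a, **k):
        raise AssertionError("inspect.getsource was called")
    with mock.patch.object(inspect, "getsource", boom):
        return fn(*args, **kwargs)

# Closure/bytecode introspection never touches source files, so it passes:
def make():
    agent = "captured"
    def my_func(x):
        return agent + x
    return my_func

freevars = assert_no_getsource(lambda: make().__code__.co_freevars)
print(freevars)  # ('agent',)
```

Anything that parses source, such as `inspect.getsource` itself, raises inside the harness, which is exactly the signal the regression test relies on.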

Breaking changes

None.

Benchmarks

I ran some benchmarks locally on both a baseline checkout and this branch.

Results

Baseline

| case | getsource? | cold (ms) | fresh (µs) | recompute (µs) |
| --- | --- | --- | --- | --- |
| direct_invoke | yes | 0.286 | 112.59 | 0.65 |
| nested_attr_invoke | yes | 0.245 | 125.25 | 0.66 |
| global_invoke | yes | 0.268 | 103.79 | 0.64 |
| nested_two_levels | yes | 0.280 | 138.17 | 0.66 |

This PR

| case | getsource? | cold (ms) | fresh (µs) | recompute (µs) |
| --- | --- | --- | --- | --- |
| direct_invoke | no | 0.023 | 8.37 | 0.71 |
| nested_attr_invoke | no | 0.022 | 13.28 | 0.69 |
| global_invoke | no | 0.017 | 7.66 | 0.69 |
| nested_two_levels | no | 0.022 | 17.49 | 0.64 |

TL;DR

  • getsource? goes from yes → no in all cases.
  • Cold/fresh deps compute improves significantly in this microbenchmark and recompute is roughly unchanged.
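The recompute measurement relies on a detail the benchmark discovers dynamically: a cached property stores its value in the instance `__dict__`, so popping the key forces a recompute. A minimal stdlib illustration, assuming deps behaves like `functools.cached_property` (the `HasDeps` class is hypothetical):

```python
from functools import cached_property

class HasDeps:
    """Toy stand-in for a runnable with a cached deps property."""
    computations = 0

    @cached_property
    def deps(self):
        HasDeps.computations += 1
        return ["dep_a", "dep_b"]

r = HasDeps()
_ = r.deps
_ = r.deps                    # second access hits the __dict__ cache
print(HasDeps.computations)   # 1
r.__dict__.pop("deps", None)  # what clear_cache_keys does in the benchmark
_ = r.deps                    # forces a recompute
print(HasDeps.computations)   # 2
```

This is why recompute is roughly unchanged between baseline and this PR: once the cache key is repopulated, neither version does discovery work on subsequent accesses.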
Benchmark code
import argparse
import gc
import statistics
import time
from dataclasses import dataclass
from typing import Callable, Dict, List, Tuple

from langchain_core.runnables import RunnableLambda
import langchain_core.runnables.utils as utils


# ----------------------------
# Test case builders
# ----------------------------

def case_direct_invoke() -> RunnableLambda:
    """agent.invoke(x) where agent is a captured RunnableLambda."""
    agent = RunnableLambda(lambda x: x)

    def my_func(x: str) -> str:
        return agent.invoke(x)

    return RunnableLambda(my_func)


def case_nested_attr_invoke() -> RunnableLambda:
    """box.agent.invoke(x) where box is captured and agent is an attribute."""
    agent = RunnableLambda(lambda x: x)

    class Box:
        def __init__(self, a):
            self.agent = a

    box = Box(agent)

    def my_func(x: str) -> str:
        return box.agent.invoke(x)

    return RunnableLambda(my_func)


def case_global_invoke() -> RunnableLambda:
    """GLOBAL_AGENT.invoke(x) where GLOBAL_AGENT is a module global."""
    global GLOBAL_AGENT
    GLOBAL_AGENT = RunnableLambda(lambda x: x)  # noqa: PLW0603

    def my_func(x: str) -> str:
        return GLOBAL_AGENT.invoke(x)

    return RunnableLambda(my_func)


def case_nested_two_levels() -> RunnableLambda:
    """outer.inner.agent.invoke(x) (deeper attribute chain)."""
    agent = RunnableLambda(lambda x: x)

    class Inner:
        def __init__(self, a):
            self.agent = a

    class Outer:
        def __init__(self, inner):
            self.inner = inner

    outer = Outer(Inner(agent))

    def my_func(x: str) -> str:
        return outer.inner.agent.invoke(x)

    return RunnableLambda(my_func)


CASES: Dict[str, Callable[[], RunnableLambda]] = {
    "direct_invoke": case_direct_invoke,
    "nested_attr_invoke": case_nested_attr_invoke,
    "global_invoke": case_global_invoke,
    "nested_two_levels": case_nested_two_levels,
}


# ----------------------------
# Helpers
# ----------------------------

def ns_now() -> int:
    return time.perf_counter_ns()


def time_one_call_ns(fn: Callable[[], None]) -> int:
    t0 = ns_now()
    fn()
    t1 = ns_now()
    return t1 - t0


def find_deps_cache_keys(r: RunnableLambda) -> List[str]:
    """Find which __dict__ keys appear after first computing deps."""
    before = set(r.__dict__.keys())
    _ = r.deps
    after = set(r.__dict__.keys())
    return sorted(after - before)


def clear_cache_keys(r: RunnableLambda, keys: List[str]) -> None:
    for k in keys:
        r.__dict__.pop(k, None)


def detect_getsource_called_when_computing_deps(maker: Callable[[], RunnableLambda]) -> Tuple[bool, str]:
    """
    Returns (called, error_message).
    - called=True if utils.inspect.getsource was invoked during r.deps
    - error_message is non-empty if r.deps raised some other error
    """
    r = maker()
    original_getsource = utils.inspect.getsource
    called = False

    def wrapped_getsource(*args, **kwargs):
        nonlocal called
        called = True
        return original_getsource(*args, **kwargs)

    utils.inspect.getsource = wrapped_getsource
    try:
        _ = r.deps
        return called, ""
    except Exception as e:
        # still restore, then report error
        return called, f"{type(e).__name__}: {e}"
    finally:
        utils.inspect.getsource = original_getsource


@dataclass
class Result:
    case: str
    getsource_called: str  # "yes"/"no"/"error"
    cold_ms: float
    fresh_us: float
    recompute_us: float


def bench_case(
    name: str,
    maker: Callable[[], RunnableLambda],
    n_fresh: int,
    n_recompute: int,
    repeats: int,
    do_gc: bool,
) -> Result:
    """
    Benchmarks:
      - cold_ms: first deps on a fresh runnable
      - fresh_us: avg deps cost when building a fresh runnable each iteration
      - recompute_us: avg deps cost when forcing recompute on same runnable by clearing cache keys
    Reports medians over repeats.
    """
    called, err = detect_getsource_called_when_computing_deps(maker)
    if err:
        getsource_flag = "error"
    else:
        getsource_flag = "yes" if called else "no"

    # Determine cache keys used by deps on this version (using this case)
    probe = maker()
    cache_keys = find_deps_cache_keys(probe)

    cold_samples_ms: List[float] = []
    fresh_samples_us: List[float] = []
    recompute_samples_us: List[float] = []

    for _ in range(repeats):
        if do_gc:
            gc.collect()

        # Cold
        r = maker()
        cold_ns = time_one_call_ns(lambda: r.deps)
        cold_samples_ms.append(cold_ns / 1e6)

        # Fresh instances
        def fresh_loop():
            for __ in range(n_fresh):
                rr = maker()
                _ = rr.deps

        fresh_ns = time_one_call_ns(fresh_loop)
        fresh_samples_us.append((fresh_ns / n_fresh) / 1e3)

        # Forced recompute on same instance
        r = maker()
        _ = r.deps  # establish cache
        def recompute_loop():
            for __ in range(n_recompute):
                clear_cache_keys(r, cache_keys)
                _ = r.deps

        recompute_ns = time_one_call_ns(recompute_loop)
        recompute_samples_us.append((recompute_ns / n_recompute) / 1e3)

    return Result(
        case=name,
        getsource_called=getsource_flag,
        cold_ms=statistics.median(cold_samples_ms),
        fresh_us=statistics.median(fresh_samples_us),
        recompute_us=statistics.median(recompute_samples_us),
    )


# ----------------------------
# Main
# ----------------------------

def main() -> None:
    p = argparse.ArgumentParser(description="RunnableLambda.deps: detect inspect.getsource usage + benchmark")
    p.add_argument("--cases", default="all", help="Comma-separated case names or 'all'")
    p.add_argument("--n-fresh", type=int, default=5000, help="Iterations for fresh-instance benchmark")
    p.add_argument("--n-recompute", type=int, default=2000, help="Iterations for forced recompute benchmark")
    p.add_argument("--repeats", type=int, default=7, help="Repeat count; median reported")
    p.add_argument("--no-gc", action="store_true", help="Disable gc.collect() between repeats")
    args = p.parse_args()

    if args.cases.strip().lower() == "all":
        selected = list(CASES.keys())
    else:
        selected = [c.strip() for c in args.cases.split(",") if c.strip()]
        unknown = [c for c in selected if c not in CASES]
        if unknown:
            raise SystemExit(f"Unknown case(s): {unknown}. Options: {list(CASES.keys())}")

    # Show cache keys for a representative case
    probe = case_nested_attr_invoke()
    print("deps cache keys added after first compute:", find_deps_cache_keys(probe))
    print()

    results: List[Result] = []
    for name in selected:
        results.append(
            bench_case(
                name=name,
                maker=CASES[name],
                n_fresh=args.n_fresh,
                n_recompute=args.n_recompute,
                repeats=args.repeats,
                do_gc=not args.no_gc,
            )
        )

    header = f"{'case':20} {'getsource?':>10} {'cold(ms)':>10} {'fresh(us)':>12} {'recompute(us)':>14}"
    print(header)
    print("-" * len(header))
    for r in results:
        print(f"{r.case:20} {r.getsource_called:>10} {r.cold_ms:10.3f} {r.fresh_us:12.2f} {r.recompute_us:14.2f}")


if __name__ == "__main__":
    main()

cc: Christophe Bornet (@cbornet), Eugene Yurtsev (@eyurtsev)

@github-actions bot added the core (`langchain-core` package issues & PRs) and external labels Feb 15, 2026
@gautamvarmadatla GAUTAM V DATLA (gautamvarmadatla) changed the title from "Fix #29530: remove blocking inspect.getsource from deps discovery" to "fix(core): avoid inspect.getsource in RunnableLambda.deps" Feb 15, 2026
@github-actions bot added the fix (for PRs that implement a fix) label Feb 15, 2026
codspeed-hq bot commented Feb 20, 2026

Merging this PR will not alter performance

⚠️ Unknown Walltime execution environment detected

Using the Walltime instrument on standard Hosted Runners will lead to inconsistent data.

For the most accurate results, we recommend using CodSpeed Macro Runners: bare-metal machines fine-tuned for performance measurement consistency.

✅ 13 untouched benchmarks
⏩ 22 skipped benchmarks¹


Comparing gautamvarmadatla:fix-nonblocking-aconfig-with-context (3a5609f) with master (7019269)²

Open in CodSpeed

Footnotes

  1. 22 benchmarks were skipped, so the baseline results were used instead. If they were deleted from the codebase, they can be archived in CodSpeed to remove them from the performance reports.

  2. No successful run was found on master (0081dea) during the generation of this report, so 7019269 was used instead as the comparison base. There might be some changes unrelated to this pull request in this report.


Development

Successfully merging this pull request may close the linked issue: "aconfig_with_context() makes a blocking call".