From 512c84364bdb35285fce85bd977962030c65af3b Mon Sep 17 00:00:00 2001 From: Stas Moreinis Date: Thu, 28 May 2026 19:07:15 -0700 Subject: [PATCH 1/3] perf(tracing): bounded-concurrency span export MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Span export was strictly serial: the drain loop awaited each upsert_batch to completion before sending the next, so per-pod egress was capped at ~1/request-latency (~150ms server-side ⇒ ~6-7 PUT/s/pod) regardless of CPU or backend headroom. Under load the queue backlogged and only drained after the run. The drain now dispatches each batch as its own task, bounded by AGENTEX_SPAN_QUEUE_CONCURRENCY (default 8), so multiple upsert_batch requests are in flight at once and a pod can keep up with span production. The per-span START-before-END invariant is preserved: END send-tasks snapshot the in-flight START tasks and await them before issuing. Since a span's START is always enqueued (and thus dispatched) before its END, that span's START send is either still in flight (waited on) or already done. Independent spans export fully concurrently. Setting concurrency=1 restores the old serial behavior. --- src/agentex/lib/core/tracing/span_queue.py | 130 ++++++++++++++++----- tests/lib/core/tracing/test_span_queue.py | 119 ++++++++++++++++++- 2 files changed, 216 insertions(+), 33 deletions(-) diff --git a/src/agentex/lib/core/tracing/span_queue.py b/src/agentex/lib/core/tracing/span_queue.py index f9105718b..3df9393ba 100644 --- a/src/agentex/lib/core/tracing/span_queue.py +++ b/src/agentex/lib/core/tracing/span_queue.py @@ -20,6 +20,12 @@ _DEFAULT_MAX_SIZE = 0 # Total attempts per batch for a *transient* failure (1 == no retry). _DEFAULT_MAX_RETRIES = 1 +# Max number of batch-export HTTP requests in flight at once. The export +# backend (EGP) processes each upsert_batch in ~150ms but serves many requests +# concurrently; issuing one batch at a time caps per-pod egress at ~1/latency. +# Sending several concurrently lets a pod keep up with span production under +# load. ``1`` restores the old strictly-serial behavior. +_DEFAULT_CONCURRENCY = 8 # HTTP statuses worth retrying at the queue level. These are explicit # backpressure / transient signals; everything else (esp. 401/403/4xx auth and # validation errors) is a permanent failure that re-enqueuing cannot fix. Note @@ -76,15 +82,23 @@ class AsyncSpanQueue: """Background FIFO queue for async span processing. Span events are enqueued synchronously (non-blocking) and drained by a - background task. Items are processed in batches: all START events in a - batch are flushed concurrently, then all END events, so that per-span - start-before-end ordering is preserved while HTTP calls for independent - spans execute in parallel. - - Once the drain loop picks up the first item, it lingers up to - ``linger_ms`` waiting for more items to coalesce into the same batch. - Without the linger the drain almost always returned size-1 batches under - real agent workloads, because spans typically arrive a few ms apart. + background task. The drain coalesces ready events into batches and + *dispatches* each batch's export as its own task, so up to ``concurrency`` + batch requests can be in flight at once. This matters because each + ``upsert_batch`` HTTP call takes tens-to-hundreds of ms server-side; issuing + them one at a time caps a pod's egress at ~1/latency and lets a backlog + build under load. + + Ordering guarantee: a span's START export always completes before its END + export is issued. END batches wait on the START batches that were in flight + when they were formed; because a span's START is always enqueued before its + END, that span's START send is either still in flight (and waited on) or + already finished. Independent spans export fully concurrently. + + Once the drain loop picks up the first item, it lingers up to ``linger_ms`` + waiting for more items to coalesce into the same batch. Without the linger + the drain almost always returned size-1 batches under real agent workloads, + because spans typically arrive a few ms apart. Reliability: - ``max_size`` bounds the queue. When full, new events are dropped and @@ -101,6 +115,7 @@ def __init__( linger_ms: int | None = None, max_size: int | None = None, max_retries: int | None = None, + concurrency: int | None = None, ) -> None: resolved_max_size = ( _read_int_env("AGENTEX_SPAN_QUEUE_MAX_SIZE", _DEFAULT_MAX_SIZE) if max_size is None else max(0, max_size) @@ -115,6 +130,17 @@ def __init__( if max_retries is None else max(1, max_retries) ) + self._concurrency = ( + _read_int_env("AGENTEX_SPAN_QUEUE_CONCURRENCY", _DEFAULT_CONCURRENCY, minimum=1) + if concurrency is None + else max(1, concurrency) + ) + # Bounds concurrent export HTTP requests. + self._send_sema = asyncio.Semaphore(self._concurrency) + # Outstanding dispatched send tasks, and the subset that are START + # sends (END sends wait on these to preserve per-span ordering). + self._inflight: set[asyncio.Task[None]] = set() + self._inflight_starts: set[asyncio.Task[None]] = set() # Total spans dropped for any reason (full queue, shutdown, permanent # failure, exhausted retries). Surfaced for metrics/observability so # span loss stops being silent. @@ -169,6 +195,11 @@ def _ensure_drain_running(self) -> None: async def _drain_loop(self) -> None: while True: + # Backpressure: cap the number of in-flight send tasks so the drain + # does not run unboundedly ahead of the exporters. + while len(self._inflight) >= self._concurrency: + await asyncio.wait(set(self._inflight), return_when=asyncio.FIRST_COMPLETED) + # Block until at least one item is available. first = await self._queue.get() batch: list[_SpanQueueItem] = [first] @@ -196,22 +227,43 @@ async def _drain_loop(self) -> None: except asyncio.QueueEmpty: break - try: - # Separate START and END events. Processing all STARTs before - # ENDs ensures that on_span_start completes before on_span_end - # for any span whose both events land in the same batch. - starts = [i for i in batch if i.event_type == SpanEventType.START] - ends = [i for i in batch if i.event_type == SpanEventType.END] - - if starts: - await self._process_items(starts) - if ends: - await self._process_items(ends) - finally: - for _ in batch: - self._queue.task_done() - # Release span data for GC. - batch.clear() + # Separate START and END events and dispatch each as its own send + # task. Dispatching STARTs first (so they are registered before the + # END snapshot) guarantees an END never outruns a START of the same + # span whose events land in this batch. + starts = [i for i in batch if i.event_type == SpanEventType.START] + ends = [i for i in batch if i.event_type == SpanEventType.END] + if starts: + self._dispatch(starts, SpanEventType.START) + if ends: + self._dispatch(ends, SpanEventType.END) + + def _dispatch(self, items: list[_SpanQueueItem], event_type: SpanEventType) -> None: + """Spawn a background task to export ``items``. + + END sends snapshot the currently in-flight START tasks and wait for them + before issuing, preserving the per-span START-before-END invariant. + """ + barrier = tuple(self._inflight_starts) if event_type == SpanEventType.END else () + task = asyncio.create_task(self._run_send(items, barrier)) + self._inflight.add(task) + task.add_done_callback(self._inflight.discard) + if event_type == SpanEventType.START: + self._inflight_starts.add(task) + task.add_done_callback(self._inflight_starts.discard) + + async def _run_send(self, items: list[_SpanQueueItem], barrier: tuple[asyncio.Task[None], ...]) -> None: + try: + if barrier: + # Wait for the START sends this END batch depends on. Their + # exceptions are irrelevant here — we only need them finished. + await asyncio.gather(*barrier, return_exceptions=True) + await self._process_items(items) + finally: + # Mark every item done so shutdown's queue.join() can complete only + # once all sends (and their retries) have finished. + for _ in items: + self._queue.task_done() async def _process_items(self, items: list[_SpanQueueItem]) -> None: """Dispatch a batch of same-event-type items to each processor in one call. @@ -243,10 +295,12 @@ async def _handle( ) -> None: spans = [item.span for item in items] try: - if event_type == SpanEventType.START: - await p.on_spans_start(spans) - else: - await p.on_spans_end(spans) + # Hold a concurrency slot only for the duration of the HTTP call. + async with self._send_sema: + if event_type == SpanEventType.START: + await p.on_spans_start(spans) + else: + await p.on_spans_end(spans) except Exception as exc: self._handle_failure(p, items, event_type, exc) @@ -307,14 +361,21 @@ def _reenqueue(self, item: _SpanQueueItem, p: AsyncTracingProcessor) -> None: async def shutdown(self, timeout: float = 30.0) -> None: self._stopping = True - if self._queue.empty() and (self._drain_task is None or self._drain_task.done()): + drain_idle = self._drain_task is None or self._drain_task.done() + if self._queue.empty() and drain_idle and not self._inflight: return + + timed_out = False try: + # join() returns once every enqueued (and re-enqueued) item has been + # marked done by its send task. await asyncio.wait_for(self._queue.join(), timeout=timeout) except asyncio.TimeoutError: + timed_out = True logger.warning( "Span queue shutdown timed out after %.1fs with %d items remaining", timeout, self._queue.qsize() ) + if self._drain_task is not None and not self._drain_task.done(): self._drain_task.cancel() try: @@ -322,6 +383,15 @@ async def shutdown(self, timeout: float = 30.0) -> None: except asyncio.CancelledError: pass + # Clean up any in-flight send tasks. On a clean shutdown these are + # already finishing; on timeout, cancel the stragglers so we don't hang. + inflight = list(self._inflight) + if inflight: + if timed_out: + for task in inflight: + task.cancel() + await asyncio.gather(*inflight, return_exceptions=True) + _default_span_queue: AsyncSpanQueue | None = None diff --git a/tests/lib/core/tracing/test_span_queue.py b/tests/lib/core/tracing/test_span_queue.py index 2e68cf88d..eee513ac9 100644 --- a/tests/lib/core/tracing/test_span_queue.py +++ b/tests/lib/core/tracing/test_span_queue.py @@ -425,9 +425,10 @@ async def block_first(spans: list[Span]) -> None: proc.on_spans_start = AsyncMock(side_effect=block_first) proc.on_spans_end = AsyncMock() - # max_size=1, no linger: the drain pulls item-0 and blocks; item-1 fills - # the queue; items 2 and 3 are dropped. - queue = AsyncSpanQueue(max_size=1, linger_ms=0) + # max_size=1, no linger, concurrency=1: the drain dispatches item-0 and + # then blocks at the in-flight cap; item-1 fills the queue; items 2 and 3 + # are dropped. + queue = AsyncSpanQueue(max_size=1, linger_ms=0, concurrency=1) queue.enqueue(SpanEventType.START, _make_span("s0"), [proc]) await asyncio.sleep(0.02) # let the drain pick up s0 and block @@ -536,6 +537,118 @@ async def always_503(spans: list[Span]) -> None: assert queue.dropped_spans == 1 +class TestAsyncSpanQueueConcurrency: + """Span export should issue multiple batch requests concurrently (bounded), + so per-pod egress isn't capped at one in-flight request — while still + guaranteeing a span's START send completes before its END send. + """ + + async def test_batches_dispatched_concurrently_up_to_bound(self): + current = 0 + max_seen = 0 + lock = asyncio.Lock() + + async def slow_start(spans: list[Span]) -> None: + nonlocal current, max_seen + async with lock: + current += 1 + max_seen = max(max_seen, current) + await asyncio.sleep(0.05) + async with lock: + current -= 1 + + proc = AsyncMock() + proc.on_spans_start = AsyncMock(side_effect=slow_start) + proc.on_spans_end = AsyncMock() + + # batch_size=1 → each span is its own batch/send; concurrency=4 caps + # simultaneous in-flight sends. + queue = AsyncSpanQueue(batch_size=1, linger_ms=0, concurrency=4) + for i in range(8): + queue.enqueue(SpanEventType.START, _make_span(f"s{i}"), [proc]) + + await queue.shutdown() + + assert proc.on_spans_start.call_count == 8 + assert 2 <= max_seen <= 4, f"expected bounded concurrency (2..4), saw {max_seen}" + + async def test_concurrency_one_serializes(self): + current = 0 + max_seen = 0 + lock = asyncio.Lock() + + async def slow_start(spans: list[Span]) -> None: + nonlocal current, max_seen + async with lock: + current += 1 + max_seen = max(max_seen, current) + await asyncio.sleep(0.03) + async with lock: + current -= 1 + + proc = AsyncMock() + proc.on_spans_start = AsyncMock(side_effect=slow_start) + proc.on_spans_end = AsyncMock() + + queue = AsyncSpanQueue(batch_size=1, linger_ms=0, concurrency=1) + for i in range(4): + queue.enqueue(SpanEventType.START, _make_span(f"s{i}"), [proc]) + + await queue.shutdown() + + assert max_seen == 1, f"concurrency=1 must serialize sends, saw {max_seen}" + + async def test_concurrent_is_faster_than_serial(self): + async def slow_start(spans: list[Span]) -> None: + await asyncio.sleep(0.05) + + proc = AsyncMock() + proc.on_spans_start = AsyncMock(side_effect=slow_start) + proc.on_spans_end = AsyncMock() + + queue = AsyncSpanQueue(batch_size=1, linger_ms=0, concurrency=8) + for i in range(8): + queue.enqueue(SpanEventType.START, _make_span(f"s{i}"), [proc]) + + start = time.monotonic() + await queue.shutdown() + elapsed = time.monotonic() - start + + serial = 8 * 0.05 + assert elapsed < serial * 0.5, f"concurrent drain took {elapsed:.3f}s; serial would be {serial:.3f}s" + + async def test_end_waits_for_start_of_same_span(self): + """The per-span ordering invariant: a span's END upsert must not be sent + until its START upsert has completed, even with concurrency enabled.""" + log: list[tuple[str, str]] = [] + + async def on_start(spans: list[Span]) -> None: + log.append(("start_enter", spans[0].id)) + await asyncio.sleep(0.05) + log.append(("start_exit", spans[0].id)) + + async def on_end(spans: list[Span]) -> None: + log.append(("end_enter", spans[0].id)) + await asyncio.sleep(0.01) + log.append(("end_exit", spans[0].id)) + + proc = AsyncMock() + proc.on_spans_start = AsyncMock(side_effect=on_start) + proc.on_spans_end = AsyncMock(side_effect=on_end) + + queue = AsyncSpanQueue(batch_size=1, linger_ms=0, concurrency=4) + queue.enqueue(SpanEventType.START, _make_span("A"), [proc]) + await asyncio.sleep(0.01) # let the START send begin (and block on sleep) + queue.enqueue(SpanEventType.END, _make_span("A"), [proc]) + + await queue.shutdown() + + # END must not enter until START has exited for the same span. + start_exit = log.index(("start_exit", "A")) + end_enter = log.index(("end_enter", "A")) + assert start_exit < end_enter, f"END began before START completed: {log}" + + class TestAsyncSpanQueueIntegration: async def test_integration_with_async_trace(self): call_log: list[tuple[str, str]] = [] From c6b117e9183d657bdc1aa7b2a6c4c888ed63fd33 Mon Sep 17 00:00:00 2001 From: Stas Moreinis Date: Fri, 29 May 2026 08:59:43 -0700 Subject: [PATCH 2/3] fix(tracing): address greptile review findings - Re-check backpressure before dispatching the END task so a batch carrying both event types can't push _inflight past the concurrency cap (the semaphore was already the hard limit; this tightens the in-flight task bound to match). - Document the retry-ordering caveat directly in _reenqueue: a re-enqueued START goes to the back of the queue and may miss a concurrently-dispatched END's barrier snapshot when retries are enabled (benign at the default max_retries=1). --- src/agentex/lib/core/tracing/span_queue.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/agentex/lib/core/tracing/span_queue.py b/src/agentex/lib/core/tracing/span_queue.py index 3df9393ba..a337786bf 100644 --- a/src/agentex/lib/core/tracing/span_queue.py +++ b/src/agentex/lib/core/tracing/span_queue.py @@ -236,6 +236,10 @@ async def _drain_loop(self) -> None: if starts: self._dispatch(starts, SpanEventType.START) if ends: + # Re-check backpressure before the second dispatch so a batch + # carrying both event types can't push _inflight past the cap. + while len(self._inflight) >= self._concurrency: + await asyncio.wait(set(self._inflight), return_when=asyncio.FIRST_COMPLETED) self._dispatch(ends, SpanEventType.END) def _dispatch(self, items: list[_SpanQueueItem], event_type: SpanEventType) -> None: @@ -342,7 +346,14 @@ def _handle_failure( def _reenqueue(self, item: _SpanQueueItem, p: AsyncTracingProcessor) -> None: """Put a single failed item back on the queue, scoped to the processor - that failed, with an incremented attempt count.""" + that failed, with an incremented attempt count. + + NOTE: a re-enqueued START goes to the *back* of the queue. If an END + for the same span is dispatched concurrently before this START is picked + up again, the END's barrier snapshot won't contain it, breaking the + START-before-END guarantee for that span. This is benign at the default + ``max_retries=1`` (retries disabled) but must be addressed before + enabling retries by default.""" try: self._queue.put_nowait( _SpanQueueItem( From f0fe810762f945652226a4cf80192021184f0ddf Mon Sep 17 00:00:00 2001 From: Stas Moreinis Date: Mon, 1 Jun 2026 09:14:17 -0700 Subject: [PATCH 3/3] perf(tracing): lower default span-export concurrency to 3 --- src/agentex/lib/core/tracing/span_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/agentex/lib/core/tracing/span_queue.py b/src/agentex/lib/core/tracing/span_queue.py index a337786bf..5afb8c44b 100644 --- a/src/agentex/lib/core/tracing/span_queue.py +++ b/src/agentex/lib/core/tracing/span_queue.py @@ -25,7 +25,7 @@ # concurrently; issuing one batch at a time caps per-pod egress at ~1/latency. # Sending several concurrently lets a pod keep up with span production under # load. ``1`` restores the old strictly-serial behavior. -_DEFAULT_CONCURRENCY = 8 +_DEFAULT_CONCURRENCY = 3 # HTTP statuses worth retrying at the queue level. These are explicit # backpressure / transient signals; everything else (esp. 401/403/4xx auth and # validation errors) is a permanent failure that re-enqueuing cannot fix. Note