
Streaming Internals

ProxAI treats streaming as a byte-carrier problem plus protocol-specific event observation. The upstream SSE bytes are preserved, while provider observers scan events, maintain state, and emit compact diagnostics.

  1. `pipeline/upstream_response.rs`: Detects successful SSE responses and dispatches them to provider streaming handling.
  2. `provider/<protocol>/response/streaming.rs`: Constructs the protocol-specific BodyObserver and returns an outbound streaming body.
  3. `upstream/streaming.rs`: Wraps the reqwest bytes_stream, records metrics, invokes observer hooks, and enforces the idle timeout.

translation/ does not perform provider-local stream observation. Provider streaming observation stays inside provider/; cross-protocol stream translation stays in translation/streaming.

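```rust
use std::task::Context;
pub(crate) trait BodyObserver: Send + Unpin + 'static {
    // Called for every upstream chunk; returns whether to continue or inject and close.
    fn on_chunk(&mut self, _chunk: &[u8]) -> BodyAction;
    // Called when the reqwest byte stream yields an error.
    fn on_stream_error(&mut self, error: &reqwest::Error);
    fn poll_pending_action(&mut self, _cx: &mut Context<'_>) -> BodyAction;
    // Called once at the end (EOF/error/timeout/drop) to emit the outcome snapshot.
    fn on_stream_finished(&self, head: &UpstreamResponseHead, stats: UpstreamBodyStreamStats);
}
```
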
  1. Pull bytes: Read the next chunk from the reqwest response's bytes_stream().
  2. Record carrier metrics: Update the common upstream stream byte/chunk stats.
  3. Scan SSE events: Use SseEventScanner once per chunk inside the provider observer.
  4. Update protocol state: Feed parsed events into the provider state machine.
  5. Maybe intervene: Most chunks continue unchanged; rare semantic failures inject an SSE error and close the stream.
  6. Finish snapshot: On EOF/error/timeout/drop, emit a compact stream outcome snapshot.
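The six carrier steps above can be sketched as a synchronous loop. This is a simplified illustration, not ProxAI's implementation: it replaces the reqwest stream with a slice of chunks, omits `Context`/polling, stream errors, and the idle timeout, and uses hypothetical names (`Observer`, `StreamStats`, `drive`). It also assumes that on `InjectAndClose` the triggering chunk is replaced by the injected bytes.

```rust
// Hypothetical, simplified stand-ins for BodyAction / BodyObserver.
#[derive(Debug, PartialEq)]
pub enum BodyAction {
    Continue,
    InjectAndClose(Vec<u8>),
}

#[derive(Debug, Default, PartialEq)]
pub struct StreamStats {
    pub chunks: usize,
    pub bytes: usize,
}

pub trait Observer {
    fn on_chunk(&mut self, chunk: &[u8]) -> BodyAction;
    fn on_stream_finished(&self, stats: &StreamStats);
}

/// Drives `chunks` through `observer` and returns the bytes sent to the client.
pub fn drive<O: Observer>(chunks: &[&[u8]], observer: &mut O) -> (Vec<u8>, StreamStats) {
    let mut out = Vec::new();
    let mut stats = StreamStats::default();
    for chunk in chunks {
        // Step 2: carrier metrics are recorded before any protocol logic runs.
        stats.chunks += 1;
        stats.bytes += chunk.len();
        // Steps 3-5: the observer scans the chunk and may intervene.
        match observer.on_chunk(chunk) {
            // Upstream bytes pass through unchanged.
            BodyAction::Continue => out.extend_from_slice(chunk),
            BodyAction::InjectAndClose(bytes) => {
                out.extend_from_slice(&bytes);
                break; // close: no further upstream chunks are read
            }
        }
    }
    // Step 6: the snapshot is emitted exactly once, however the loop ended.
    observer.on_stream_finished(&stats);
    (out, stats)
}
```

Keeping the byte forwarding in `drive` and the protocol decisions in the observer mirrors the split described above: the carrier never interprets SSE, and the observer never touches the outbound body directly.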

The three provider protocols now follow the same structure:

| File or type | Responsibility |
|---|---|
| streaming.rs | Carrier hook implementation and lifecycle checks |
| state.rs | Protocol event state and summary/outcome projection |
| SseEventScanner | Chunk-to-event scanning, held by the observer |
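Because SSE events can be split across network chunks, the scanner held by the observer has to buffer partial input. The following is a minimal sketch of chunk-to-event scanning, assuming only `\n\n` event terminators and only the `event:` and `data:` fields; the `SseEvent` type and the `scan` method are hypothetical and may not match ProxAI's SseEventScanner API.

```rust
/// One parsed SSE event (simplified: only `event:` and `data:` fields).
#[derive(Debug, PartialEq)]
pub struct SseEvent {
    pub event: Option<String>,
    pub data: String,
}

/// Buffers bytes across chunks and yields each event once it is complete.
#[derive(Default)]
pub struct SseEventScanner {
    buf: Vec<u8>,
}

impl SseEventScanner {
    /// Feeds one chunk and returns every event completed by it.
    pub fn scan(&mut self, chunk: &[u8]) -> Vec<SseEvent> {
        self.buf.extend_from_slice(chunk);
        let mut events = Vec::new();
        // A blank line ends an event; this sketch ignores "\r\n" line endings.
        while let Some(pos) = self.buf.windows(2).position(|w| w == b"\n\n") {
            let raw: Vec<u8> = self.buf.drain(..pos + 2).collect();
            let text = String::from_utf8_lossy(&raw[..pos]);
            let mut event = None;
            let mut data_lines = Vec::new();
            for line in text.lines() {
                if let Some(v) = line.strip_prefix("event:") {
                    // SSE strips a single leading space after the colon.
                    event = Some(v.strip_prefix(' ').unwrap_or(v).to_string());
                } else if let Some(v) = line.strip_prefix("data:") {
                    data_lines.push(v.strip_prefix(' ').unwrap_or(v).to_string());
                }
                // Comment lines (":...") and other fields are ignored here.
            }
            events.push(SseEvent { event, data: data_lines.join("\n") });
        }
        events
    }
}
```

An event split across two chunks produces nothing on the first call and is returned whole on the second, which is what allows the observer to run the scanner once per chunk.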

OpenAI Responses stream events:

  1. response.created: Start of a Responses stream and initial response metadata.
  2. response.output_item.added: A new output item such as message, reasoning, function_call, or MCP item.
  3. response.output_text.delta / response.function_call_arguments.delta: Incremental text or tool-call argument bytes.
  4. response.output_text.done / response.function_call_arguments.done: Semantic completion of a content part or tool-call arguments.
  5. response.output_item.done: The output item is complete.
  6. response.completed: Terminal event for a complete stream.

OpenAI Responses has extra tool-call semantics. If tool-call arguments start but never finish, ProxAI can inject a Responses-style SSE error event and close the stream.
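The tool-call check above amounts to tracking which argument streams are still open. The sketch below is a hypothetical illustration: it assumes events have already been parsed into an event type plus an `item_id`, assumes the check runs at EOF, and uses an illustrative `error` payload; `ToolArgsTracker`, `observe`, `on_eof`, and the exact JSON shape are not ProxAI's API.

```rust
use std::collections::HashSet;

/// Hypothetical slice of Responses observer state: argument streams that
/// have started but not yet finished.
#[derive(Default)]
pub struct ToolArgsTracker {
    open: HashSet<String>,
    completed: bool,
}

impl ToolArgsTracker {
    /// Feeds one parsed event type and the output item id it refers to.
    pub fn observe(&mut self, event_type: &str, item_id: &str) {
        match event_type {
            "response.function_call_arguments.delta" => {
                self.open.insert(item_id.to_string());
            }
            "response.function_call_arguments.done" => {
                self.open.remove(item_id);
            }
            "response.completed" => self.completed = true,
            _ => {}
        }
    }

    /// At EOF: if arguments were left unfinished and no terminal event arrived,
    /// build an SSE error frame to inject before closing (payload is illustrative).
    pub fn on_eof(&self) -> Option<Vec<u8>> {
        if self.completed || self.open.is_empty() {
            return None;
        }
        let mut ids: Vec<&str> = self.open.iter().map(String::as_str).collect();
        ids.sort(); // deterministic message order
        let frame = format!(
            "event: error\ndata: {{\"type\":\"error\",\"message\":\"tool-call arguments not finished: {}\"}}\n\n",
            ids.join(",")
        );
        Some(frame.into_bytes())
    }
}
```

Returning the frame as bytes fits the `InjectAndClose(Bytes)` shape: the observer hands the carrier a complete SSE event, and the client sees an explicit error instead of a truncated tool call.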

OpenAI Chat Completions stream events:

  1. chat.completion.chunk: Delta chunk containing role/content/tool_calls/finish_reason updates.
  2. [DONE]: Terminal sentinel. EOF without this sentinel is treated as a closed/incomplete stream.
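The distinction between a finished and a merely closed Chat stream can be captured by two fields. This is a minimal sketch, assuming chunks have already been parsed so that only `finish_reason` is visible; `ChatEvent`, `ChatStreamState`, and `ChatOutcome` are hypothetical names.

```rust
/// Simplified view of one Chat Completions SSE event (JSON parsing omitted).
pub enum ChatEvent<'a> {
    Chunk { finish_reason: Option<&'a str> },
    Done,
}

#[derive(Debug, PartialEq)]
pub enum ChatOutcome {
    Completed { finish_reason: Option<String> },
    /// EOF arrived before the [DONE] sentinel.
    Incomplete { finish_reason: Option<String> },
}

#[derive(Default)]
pub struct ChatStreamState {
    finish_reason: Option<String>,
    saw_done: bool,
}

impl ChatStreamState {
    pub fn on_event(&mut self, event: ChatEvent) {
        match event {
            ChatEvent::Chunk { finish_reason: Some(r) } => self.finish_reason = Some(r.to_string()),
            ChatEvent::Chunk { finish_reason: None } => {}
            ChatEvent::Done => self.saw_done = true,
        }
    }

    /// Outcome snapshot at EOF: only the sentinel makes the stream complete.
    pub fn outcome(&self) -> ChatOutcome {
        let finish_reason = self.finish_reason.clone();
        if self.saw_done {
            ChatOutcome::Completed { finish_reason }
        } else {
            ChatOutcome::Incomplete { finish_reason }
        }
    }
}
```

Note that a `finish_reason` alone does not mark the stream complete in this sketch; the sentinel is what separates a clean end from a connection that simply closed.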

Anthropic Messages stream events:

  1. message_start: Starts an Anthropic message stream.
  2. content_block_start: Starts a text, thinking, tool_use, or other content block.
  3. content_block_delta: Incremental content for the current block.
  4. content_block_stop: Completes a content block.
  5. message_delta: Carries stop_reason, stop_sequence, usage, and other message-level deltas.
  6. message_stop: Terminal event for a complete Anthropic stream.
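The Anthropic event order above can be enforced with a small ordering check. This sketch is hypothetical (`AnthropicEvent`, `AnthropicOrder`, and `check` are illustrative names) and makes a simplifying assumption that content blocks are streamed one at a time, identified by their index.

```rust
#[derive(Debug, PartialEq)]
pub enum AnthropicEvent {
    MessageStart,
    ContentBlockStart(usize),
    ContentBlockDelta(usize),
    ContentBlockStop(usize),
    MessageDelta,
    MessageStop,
}

#[derive(Default)]
pub struct AnthropicOrder {
    started: bool,
    open_block: Option<usize>,
    stopped: bool,
}

impl AnthropicOrder {
    /// Returns an error if `ev` is out of order for an Anthropic stream.
    pub fn check(&mut self, ev: &AnthropicEvent) -> Result<(), String> {
        use AnthropicEvent as E;
        if self.stopped {
            return Err(format!("{:?} after message_stop", ev));
        }
        match ev {
            E::MessageStart if self.started => Err("duplicate message_start".to_string()),
            E::MessageStart => {
                self.started = true;
                Ok(())
            }
            _ if !self.started => Err(format!("{:?} before message_start", ev)),
            E::ContentBlockStart(i) => {
                if let Some(open) = self.open_block {
                    return Err(format!("block {} still open when block {} started", open, i));
                }
                self.open_block = Some(*i);
                Ok(())
            }
            E::ContentBlockDelta(i) if self.open_block == Some(*i) => Ok(()),
            E::ContentBlockStop(i) if self.open_block == Some(*i) => {
                self.open_block = None;
                Ok(())
            }
            E::ContentBlockDelta(i) | E::ContentBlockStop(i) => Err(format!("block {} is not open", i)),
            E::MessageDelta => Ok(()),
            E::MessageStop => {
                self.stopped = true;
                Ok(())
            }
        }
    }
}
```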

Cross-protocol streaming translation uses a protocol-neutral four-phase carrier in translation/streaming.rs:

| Phase | Meaning |
|---|---|
| `Waiting` | No semantic source message/chunk has initialized the translated stream yet. |
| `Streaming(StreamingPhase<S>)` | Source deltas are active; pair-private state and target-representable output tracking live in `StreamingPhase<S>`. |
| `Terminal(T)` | The source protocol has emitted its semantic terminal signal, but the carrier/source stop has not been fully consumed. |
| `Stopped` | The translator has emitted its final target stream output. |

Terminal(T) is intentionally generic because source protocols terminate differently:

| Inbound source | Terminal payload | Reason |
|---|---|---|
| Anthropic Messages | `StreamingPhase<S>` | message_delta carries terminal stop/usage semantics, but message_stop is still required, so the translator keeps the full phase until message_stop consumes it. |
| OpenAI Chat Completions | pair-specific `T` | finish_reason ends semantic deltas. Later [DONE], EOF, or usage-only chunks need only a target-specific terminal summary, such as ChatTerminalState for Anthropic output or () for Responses output. |
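The four phases and the generic terminal payload can be sketched as follows. The phase names come from the table above; `Phase`, `enter_terminal`, `stop`, and the `keep` closure are hypothetical. The closure is where each pair decides how much of the streaming phase survives into `Terminal`: the Anthropic-source case keeps the whole phase, while a Chat-source pair keeps only a small summary.

```rust
/// Stand-in for pair-private streaming state (hypothetical).
#[derive(Debug, PartialEq)]
pub struct StreamingPhase<S> {
    pub state: S,
}

#[derive(Debug, PartialEq)]
pub enum Phase<S, T> {
    Waiting,
    Streaming(StreamingPhase<S>),
    Terminal(T),
    Stopped,
}

impl<S, T> Phase<S, T> {
    /// Semantic terminal signal (message_delta / finish_reason): move the
    /// streaming phase into `Terminal`, keeping whatever `keep` returns.
    pub fn enter_terminal(&mut self, keep: impl FnOnce(StreamingPhase<S>) -> T) -> Result<(), &'static str> {
        match std::mem::replace(self, Phase::Stopped) {
            Phase::Streaming(p) => {
                *self = Phase::Terminal(keep(p));
                Ok(())
            }
            other => {
                *self = other; // restore the unchanged phase
                Err("terminal signal outside Streaming")
            }
        }
    }

    /// Carrier/source stop (message_stop, [DONE], EOF): consume the terminal
    /// payload exactly once and move to `Stopped`.
    pub fn stop(&mut self) -> Option<T> {
        match std::mem::replace(self, Phase::Stopped) {
            Phase::Terminal(t) => Some(t),
            other => {
                *self = other;
                None
            }
        }
    }
}
```

Moving the phase by value with `std::mem::replace` is what makes "keeps the full phase until message_stop consumes it" literal: after `stop`, the payload exists only in the caller's hands.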

Protocol wrappers such as anthropic_messages::streaming and openai_chat_completions::streaming keep source-specific event ordering and error wording. Pair translators remain responsible for target representability checks, output item/block emission, and terminal flush behavior.

Streaming identity (StreamIdentity) belongs to the protocol-neutral inbound lifecycle carrier, while source-protocol wrappers decide when to initialize or validate it.

| Inbound source | Where identity comes from | Why it is handled this way |
|---|---|---|
| Anthropic Messages | message_start.message.id and message_start.message.model | Anthropic emits identity once, at the explicit message_start semantic boundary. Later `content_block_*`, message_delta, and message_stop events do not repeat id/model, so the Anthropic wrapper initializes lifecycle identity at message_start and then relies on lifecycle ordering to prevent duplicate starts. |
| OpenAI Chat Completions | Every chat.completion.chunk repeats id and model | Chat has no separate message_start event. The first semantic chunk initializes lifecycle identity, and later chunks are checked against it so that pieces from different upstream responses are never merged into one translated message/response. |

InboundStreamLifecycle stores identity outside the phase enum because identity is stream-envelope metadata that remains stable across Streaming, Terminal, and Stopped. Pair-local stream state should keep only target-conversion state or derived identifiers, not another copy of the full source identity.
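The two identity strategies can share one lifecycle field that sits beside, not inside, the phase. A minimal sketch, assuming `StreamIdentity` holds just `id` and `model`; `Lifecycle`, `init_identity`, and `check_or_init` are hypothetical names.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct StreamIdentity {
    pub id: String,
    pub model: String,
}

/// Identity lives on the lifecycle, outside any phase enum (phase omitted here).
#[derive(Default)]
pub struct Lifecycle {
    identity: Option<StreamIdentity>,
}

impl Lifecycle {
    /// Anthropic-style: identity is set exactly once, at message_start.
    pub fn init_identity(&mut self, ident: StreamIdentity) -> Result<(), String> {
        if self.identity.is_some() {
            return Err("identity already initialized".to_string());
        }
        self.identity = Some(ident);
        Ok(())
    }

    /// Chat-style: the first chunk initializes identity; later chunks must match.
    pub fn check_or_init(&mut self, ident: &StreamIdentity) -> Result<(), String> {
        if let Some(known) = &self.identity {
            if known != ident {
                return Err(format!("chunk id {} does not match stream id {}", ident.id, known.id));
            }
            return Ok(());
        }
        self.identity = Some(ident.clone());
        Ok(())
    }
}
```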

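```rust
use bytes::Bytes;

pub(crate) enum BodyAction {
    // Forward the upstream chunk unchanged.
    Continue,
    // Send these bytes (e.g. an SSE error event), then close the stream.
    InjectAndClose(Bytes),
}
```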

Most observers return Continue. InjectAndClose is reserved for cases where ProxAI can produce a better client-facing stream failure than silently hanging or closing without context.

Each translation direction’s streaming implementation (e.g. anthropic_messages -> openai_responses) is split into three submodules under a streaming/ directory:

```text
streaming/
├── mod.rs    # translator: drives state + calls output, owns lifecycle and timing
├── state.rs  # accumulation state machine: fields + register/append/stop/snapshot
└── output.rs # pure constructors: take data, return events; hold no streaming state
```

  • state.rs only accumulates and queries state; it knows nothing about the event loop.
  • output.rs only constructs events/items; it holds no streaming state (no &mut State).
  • mod.rs drives state, calls output, and manages the lifecycle.

Rule of thumb: if a function needs &mut State (advance counter, push items, read accumulated fields), it is state behavior and stays in state.rs; if it only takes data and returns an event, it is output construction and belongs in output.rs.
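The rule of thumb can be illustrated with three tiny modules in one file. This is a hypothetical sketch: `State`, `open_block`, `append`, `text_delta_event`, and `on_text_delta` are illustrative names, and `{:?}` quoting stands in for real JSON serialization.

```rust
mod state {
    /// state.rs role: needs `&mut self`, knows nothing about the event loop.
    #[derive(Default)]
    pub struct State {
        next_index: usize,
        text: String,
    }

    impl State {
        /// Advances the block counter (mutation, so it belongs here).
        pub fn open_block(&mut self) -> usize {
            let i = self.next_index;
            self.next_index += 1;
            i
        }
        pub fn append(&mut self, delta: &str) {
            self.text.push_str(delta);
        }
        pub fn text(&self) -> &str {
            &self.text
        }
    }
}

mod output {
    /// output.rs role: data in, event out; no `&mut State`.
    /// `{:?}` quoting stands in for proper JSON escaping in this sketch.
    pub fn text_delta_event(index: usize, delta: &str) -> String {
        format!("{{\"type\":\"text_delta\",\"index\":{},\"delta\":{:?}}}", index, delta)
    }
}

/// mod.rs role: drive state, then call output to build the event.
pub fn on_text_delta(st: &mut state::State, index: usize, delta: &str) -> String {
    st.append(delta);
    output::text_delta_event(index, delta)
}
```

Because `output::text_delta_event` never sees `State`, it can be unit-tested with plain values, and `state` can be tested without constructing any events.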

When state and output share the same internal data structure, a streaming/types.rs is the neutral home to avoid state ↔ output circular dependencies.

Whether it is needed depends on the accumulation model:

| Pair | Accumulation model | Finalize model | Needs types.rs? |
|---|---|---|---|
| ant → responses | Each content_block accumulates independently | stop_block returns owned data; output consumes it | No (StreamBlock flows through output once, by move) |
| chat → responses | The whole stream accumulates text/tool items | finish_completed_stream is a state method that reads state fields multiple times | Yes (StreamTextItem/StreamToolItem are held by state and read repeatedly by output) |

Root cause: Anthropic’s accumulation unit is the content_block (explicit open/close lifecycle, moved away on stop), while Chat’s is the whole stream (finalized all at once at stream end, so state must retain data for repeated reads and terminal snapshot). The latter “state holds + output reads repeatedly” pattern naturally requires a neutral home for shared types.
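The chat → responses arrangement can be sketched with a `types` module that both `state` and `output` depend on, so neither depends on the other's internals. `StreamTextItem`, `StreamToolItem`, `StreamingState`, and `finish_completed_stream` are named in the text, but their fields and the string "events" below are hypothetical simplifications.

```rust
mod types {
    /// Neutral home for shared data: used by both state and output.
    #[derive(Debug, Default)]
    pub struct StreamTextItem {
        pub text: String,
    }
    #[derive(Debug, Default)]
    pub struct StreamToolItem {
        pub name: String,
        pub arguments: String,
    }
}

mod output {
    use super::types::{StreamTextItem, StreamToolItem};
    // Pure constructors that borrow shared items; labels stand in for real events.
    pub fn text_done(item: &StreamTextItem) -> String {
        format!("response.output_text.done:{}", item.text)
    }
    pub fn args_done(item: &StreamToolItem) -> String {
        format!("response.function_call_arguments.done:{}:{}", item.name, item.arguments)
    }
}

mod state {
    use super::{output, types::{StreamTextItem, StreamToolItem}};

    #[derive(Default)]
    pub struct StreamingState {
        pub text: StreamTextItem,
        pub tools: Vec<StreamToolItem>,
    }

    impl StreamingState {
        /// Whole-stream finalize: borrows the items rather than moving them,
        /// so they remain readable afterwards (e.g. for a terminal snapshot).
        pub fn finish_completed_stream(&self) -> Vec<String> {
            let mut events = vec![output::text_done(&self.text)];
            events.extend(self.tools.iter().map(output::args_done));
            events.push("response.completed".to_string());
            events
        }
    }
}
```

Contrast this with the ant → responses case, where `stop_block` can move a block into `output` and forget it; there, no shared module is needed.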

InboundStreamLifecycle<S, T> has two type parameters: S is the streaming-phase state type, T is the terminal-phase state type. They may be the same or different, depending on how much data the terminal events need.

| Pair | S (streaming) | T (terminal) | Why |
|---|---|---|---|
| chat → ant | ChatStreamingState | ChatTerminalState (lightweight snapshot) | Terminal events are constructed on the spot by the translator; only finish_reason and refusal are needed, because all blocks were closed during streaming. |
| chat → responses | StreamingState | StreamingState (same type) | Terminal events (`*.done` × N + response.completed) need the full accumulated data (text/tool items/usage) and are constructed by the state method finish_completed_stream. |

The root cause is the timing and data requirements of terminal event construction:

  • The Anthropic target has an explicit block lifecycle (content_block_stop); each block is closed and emitted during streaming, so by the time finish_reason arrives only a few snapshot values remain, and a lightweight terminal type suffices.
  • Responses target has no block-close equivalent; text deltas and tool arguments stream until finish_reason, when *.done + response.completed are emitted all at once. Data must stay in state until the end, so terminal must be the full streaming state.
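For the chat → ant pair, the lightweight terminal type only needs enough data to build the closing events. The sketch below is hypothetical: the `ChatTerminalState` fields, `terminal_events`, and the textual event labels are illustrative, and the `finish_reason` → `stop_reason` mapping is a common convention assumed here rather than ProxAI's confirmed table. The `refusal` field is carried but not modeled.

```rust
/// Lightweight terminal snapshot in the spirit of ChatTerminalState (fields assumed).
#[derive(Debug, Clone, PartialEq)]
pub struct ChatTerminalState {
    pub finish_reason: String,
    /// Carried for refusal handling; not used in this sketch.
    pub refusal: Option<String>,
}

/// Assumed mapping from a Chat finish_reason to an Anthropic stop_reason.
pub fn stop_reason(finish_reason: &str) -> &'static str {
    match finish_reason {
        "length" => "max_tokens",
        "tool_calls" | "function_call" => "tool_use",
        _ => "end_turn", // "stop" and anything unrecognized
    }
}

/// Terminal events are built on the spot from the snapshot alone: every
/// content block was already closed during streaming.
pub fn terminal_events(t: &ChatTerminalState) -> Vec<String> {
    vec![
        format!("message_delta stop_reason={}", stop_reason(&t.finish_reason)),
        "message_stop".to_string(),
    ]
}
```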