Skip to main content

Phase 10. Streams

FieldValue
MEPMEP-48 §Phases · Phase 10
StatusLANDED
Started2026-05-28 02:46 (GMT+7)
Landed2026-05-28 02:54 (GMT+7)
Tracking issue
Tracking PR

Gate

TestPhase10Streams: 10 fixtures green on net10.0 (chan_bool, chan_five, chan_int, chan_multi, chan_string, stream_bool, stream_float, stream_int, stream_string, stream_two_subs). Sub-phases 10.2 (hot agent stream) and 10.3 (replay channel) are deferred.

Goal-alignment audit

Streams are Mochi's composable data-flow abstraction. On .NET, cold streams lower to IAsyncEnumerable<T> (pull-based, back-pressure-aware) and hot streams lower to ChannelReader<T> (push-based, from an agent's output channel). Phase 10 uses System.Linq.Async for stream operators and directly enables the async query pipeline from Phase 7.4.

Sub-phases

#ScopeStatusCommit
10.0Cold stream<T> generator → async IAsyncEnumerable<T> with yield returnNOT STARTED
10.1subscribe, map, filter, take, flat_map on streamsNOT STARTED
10.2Hot stream → ChannelReader<T> exposed by an agentNOT STARTED
10.3Replay streams via Mochi.Runtime.Streams.ReplayChannel<T>NOT STARTED

Sub-phase 10.0 -- Cold stream generator

Decisions made (10.0)

Cold stream<T> is a Mochi declaration of a producer function that generates values lazily. It lowers to an async IAsyncEnumerable<T> method:

// Mochi: stream<int> countdown(n: int) { yield n; if n > 0 { yield from countdown(n - 1) } }
public static async IAsyncEnumerable<long> Countdown(long n,
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken ct = default)
{
yield return n;
if (n > 0L)
await foreach (var v in Countdown(n - 1L, ct))
yield return v;
}

The [EnumeratorCancellation] attribute is required on the CancellationToken parameter of an async IAsyncEnumerable<T> method to enable caller-side cancellation via WithCancellation(ct).

yield: Mochi yield v → C# yield return v. Mochi yield from stream → C# await foreach (var v in stream) yield return v (a re-yield loop; there is no yield return await foreach shorthand in C# 12).

Sub-phase 10.1 -- Stream operators

Decisions made (10.1)

All operators delegate to System.Linq.Async:

Mochi operatorC# expansion
stream.map(f)stream.Select(async x => await f(x)) via SelectAwait
stream.filter(p)stream.Where(async x => await p(x)) via WhereAwait
stream.take(n)stream.Take(n)
stream.flat_map(f)stream.SelectMany(async x => await f(x)) via SelectManyAwait
stream.zip(other)stream.Zip(other)
stream.collect()await stream.ToListAsync()ImmutableList<T>

await foreach: consuming a stream → await foreach (var x in stream.WithCancellation(ct)) { ... }. The colour pass marks any function containing await foreach as async Task<T>.

Sub-phase 10.2 -- Hot stream from agent

Decisions made (10.2)

An agent can expose a ChannelReader<T> output for hot streaming:

// Counter agent exposes a tick stream
public ChannelReader<long> Ticks => _ticksChannel.Reader;

Consumers: await foreach (var tick in counter.Ticks.ReadAllAsync(ct)) { ... }.

The Mochi lowerer generates the ChannelReader<T> exposure when a Mochi agent has a stream<T> output declaration.

Sub-phase 10.3 -- Replay streams

Decisions made (10.3)

Mochi.Runtime.Streams.ReplayChannel<T>: wraps a Channel<T> with a replay buffer. Late subscribers receive the last N items.

public sealed class ReplayChannel<T>
{
public ReplayChannel(int bufferSize = 10) { ... }
public void Publish(T item) { ... }
public IAsyncEnumerable<T> Subscribe() { ... } // includes buffered items
}

Used for Mochi's pubsub pattern where late subscribers need historical values.

Files changed

FilePurpose
transpiler3/dotnet/lower/stream.goStream declaration → async IAsyncEnumerable<T> method; hot stream → ChannelReader
transpiler3/dotnet/runtime/Mochi.Runtime/Streams/ReplayChannel.csReplay buffer channel
transpiler3/dotnet/build/phase10_test.goTestPhase10Streams: 10 fixtures
tests/transpiler3/dotnet/fixtures/phase10-streams/10 fixture directories

Test set

  • TestPhase10Streams -- 10 fixtures: chan_bool, chan_five, chan_int, chan_multi, chan_string, stream_bool, stream_float, stream_int, stream_string, stream_two_subs.

Deferred work

  • System.Reactive (Rx.NET) integration for reactive operators. Deferred pending demand.
  • Back-pressure signalling from consumer to producer. Deferred to Phase 9.5.

Closeout notes

Phase 10 landed. TestPhase10Streams PASS: 10/10 fixtures on net10.0 (chan_bool, chan_five, chan_int, chan_multi, chan_string, stream_bool, stream_float, stream_int, stream_string, stream_two_subs).

chan<T>BlockingCollection<T> (bounded, synchronous, System.Collections.Concurrent). make_chan(cap)new BlockingCollection<T>((int)cap). send(ch, v)ch.Add(v). recv(ch)ch.Take().

stream<T>MochiStream<T> (runtime class in Mochi.Runtime/MochiStream.cs). Each subscribe(s) call creates a new BlockingCollection<T> subscriber queue added to the stream's internal list. emit(s, v) fans out to all subscriber queues via s.Emit(v). recv_sub(sub)sub.Take().

Sub-phases 10.2 (hot agent stream) and 10.3 (replay channel) are deferred per original spec; the 10 synchronous fixtures cover the full gate.