
Phase 10. Streams (MochiStream, synchronous)

| Field | Value |
|---|---|
| MEP | MEP-49 §Phases · Phase 10 |
| Status | LANDED |
| Started | 2026-05-28 13:40 (GMT+7) |
| Landed | 2026-05-28 13:54 (GMT+7) |
| Tracking issue | |
| Tracking PR | |

Gate

TestPhase10Streams: 7 fixtures green on Swift 6.0+, macOS 15. The gate builds each fixture, runs it, and compares its stdout to the fixture's .expected file.

Goal-alignment audit

The v1 stream gate uses synchronous fixtures inherited from the BEAM backend: all emissions happen before any receive, so there is no need for async scheduling. MochiStream<T> is an MPMC broadcast class: emit appends to each subscriber's buffer; MochiSub<T>.recv() reads from that buffer in order. This maps exactly onto the fixture semantics and keeps the generated code readable without async/await. The AsyncStream<T>-based pattern is deferred to a future sub-MEP.

Sub-phases

| # | Scope | Status | Commit |
|---|---|---|---|
| 10.0 | StreamMakeExpr → MochiStream<T>(capacity:); StreamSubExpr → .subscribe() | LANDED | mep/0049-phase-10 |
| 10.1 | StreamEmitStmt → stream.emit(value) | LANDED | mep/0049-phase-10 |
| 10.2 | StreamRecvExpr → sub.recv() | LANDED | mep/0049-phase-10 |
| 10.3 | AsyncStream<T>, for await, debounce/throttle via swift-async-algorithms | DEFERRED | |
| 10.4 | stream.map, filter, merge, zip operators | DEFERRED | |

Sub-phase 10.0 -- Stream and subscriber creation

Decisions made (10.0)

MochiStream<T>: the runtime class in Stream.swift. Holds a list of MochiSub<T> subscribers. Created as:

let s = MochiStream<Int64>(capacity: Int64(0))

The capacity parameter is accepted but currently unused (retained for API compatibility with future bounded-buffer variants).

MochiSub<T>: created by calling stream.subscribe(). Holds an internal buffer (a [T] array) and a readIndex into it. recv() returns buffer[readIndex] and then advances readIndex by one, so values are never removed from the buffer.

Type mapping: stream<int> → MochiStream<Int64>, stream<string> → MochiStream<String>, stream<bool> → MochiStream<Bool>, stream<float> → MochiStream<Double>.
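The two classes described in this sub-phase can be pictured with the minimal sketch below. It is a reconstruction from the prose only, not the contents of Stream.swift: the `subscribers` property name, the `final` modifiers, and the unlabeled `emit` parameter are assumptions, and the real runtime may differ in all of them.

```swift
// Hypothetical reconstruction of the runtime classes; the real Stream.swift may differ.
final class MochiSub<T> {
    var buffer: [T] = []   // values delivered by emit, in emission order
    var readIndex = 0      // position of the next unread value

    // Returns the next unread value and advances readIndex.
    // Traps if readIndex is already at the end of the buffer.
    func recv() -> T {
        let value = buffer[readIndex]
        readIndex += 1
        return value
    }
}

final class MochiStream<T> {
    private var subscribers: [MochiSub<T>] = []  // assumed storage name

    // capacity is accepted but ignored, as the text above describes.
    init(capacity: Int64) {}

    // Registers and returns a new subscriber with an empty buffer.
    func subscribe() -> MochiSub<T> {
        let sub = MochiSub<T>()
        subscribers.append(sub)
        return sub
    }

    // Appends the value to every registered subscriber's buffer.
    func emit(_ value: T) {
        for sub in subscribers { sub.buffer.append(value) }
    }
}

let ints = MochiStream<Int64>(capacity: Int64(0))    // stream<int>
let names = MochiStream<String>(capacity: Int64(0))  // stream<string>
let intSub = ints.subscribe()
let nameSub = names.subscribe()
ints.emit(7)
names.emit("mochi")
print(intSub.recv(), nameSub.recv())  // prints "7 mochi"
```

Using reference types for both classes means the stream and the caller share the same MochiSub instance, which is what lets emit fill a buffer that the caller later reads.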

Sub-phase 10.1 -- Emit

Decisions made (10.1)

StreamEmitStmt: the IR node for emitting a value into a stream. The lowerer emits:

s.emit(value)

Each subscriber's buffer receives the value.
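To make the broadcast behaviour concrete, here is a small sketch with two subscribers on one stream. The class bodies are stand-ins repeated so that the snippet runs by itself; `subscribers` and the variable names `a` and `b` are illustrative assumptions, not names taken from the runtime or the fixtures.

```swift
// Minimal stand-ins so the snippet runs on its own; not the real runtime source.
final class MochiSub<T> {
    var buffer: [T] = []
    var readIndex = 0
}

final class MochiStream<T> {
    private var subscribers: [MochiSub<T>] = []  // assumed name
    init(capacity: Int64) {}

    func subscribe() -> MochiSub<T> {
        let sub = MochiSub<T>()
        subscribers.append(sub)
        return sub
    }

    // Broadcast: every registered subscriber gets its own copy of the value.
    func emit(_ value: T) {
        for sub in subscribers { sub.buffer.append(value) }
    }
}

let s = MochiStream<Int64>(capacity: Int64(0))
let a = s.subscribe()
let b = s.subscribe()

// The shape of call the lowerer produces for StreamEmitStmt.
s.emit(1)
s.emit(2)

print(a.buffer)  // [1, 2]
print(b.buffer)  // [1, 2]
```

In this sketch a subscriber created after an emit would not see the earlier value, because emit only walks the subscribers registered at that moment. The text above does not say whether the real runtime behaves the same way.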

Sub-phase 10.2 -- Receive

Decisions made (10.2)

StreamRecvExpr: the IR node for receiving the next value from a subscriber. The lowerer emits:

sub.recv()

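recv() is a synchronous call. If readIndex has already reached the end of the buffer (nothing was emitted, or every value has been read), the array access traps with an index-out-of-range runtime error. This is acceptable for the fixture suite, where all emissions precede all receives.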
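The in-order read and the out-of-range condition can be illustrated on the subscriber alone. This is a sketch under stated assumptions: the class body is a stand-in, and `hasUnread` is a hypothetical helper added only for the example, not something the documented runtime is known to provide.

```swift
// Stand-in for the subscriber described in 10.0; the real class may differ.
final class MochiSub<T> {
    var buffer: [T] = []
    var readIndex = 0

    // Synchronous read: returns the next unread value and advances readIndex.
    // Traps with an out-of-range index if nothing is left to read.
    func recv() -> T {
        let value = buffer[readIndex]
        readIndex += 1
        return value
    }

    // Hypothetical helper, not part of the documented runtime.
    var hasUnread: Bool { readIndex < buffer.count }
}

let sub = MochiSub<String>()
sub.buffer = ["a", "b"]  // as if the stream had emitted "a" and then "b"

var received: [String] = []
while sub.hasUnread {    // guard so the sketch never reads past the end
    received.append(sub.recv())
}
print(received)          // ["a", "b"]
// One more sub.recv() here would crash: readIndex == buffer.count.
```

Generated fixture code needs no such guard as long as every recv() is preceded by a matching emit, which is the property the synchronous fixtures rely on.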

Files changed

| File | Purpose |
|---|---|
| transpiler3/swift/lower/lower.go | StreamMakeExpr, SubMakeExpr, SubRecvExpr, StreamEmitStmt lowering |
| transpiler3/swift/runtime/Sources/MochiRuntime/Stream.swift | MochiStream<T> and MochiSub<T> classes |
| transpiler3/swift/build/phase10_test.go | TestPhase10Streams: 7 fixtures |
| tests/transpiler3/swift/fixtures/phase10-streams/ | 7 fixture directories |

Test set

  • TestPhase10Streams -- 7 fixtures: stream_bool, stream_emit_after_sub, stream_float, stream_int, stream_loop, stream_multi_sub, stream_string.

Deferred work

  • AsyncStream<T> + for await consumption. Deferred to Phase 10.3.
  • stream.map(f), stream.filter(p), stream.flat_map(f). Deferred to Phase 10.4.
  • stream.debounce(interval), stream.throttle(interval) via swift-async-algorithms. Deferred to Phase 10.3.
  • stream.merge(s2), stream.zip(s2) via swift-async-algorithms. Deferred to Phase 10.4.
  • Multi-producer AsyncChannel<T> from swift-async-algorithms. Deferred.
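For contrast with the synchronous design, the sketch below shows the general shape of AsyncStream<T> with for await consumption, using only the standard library. It is illustrative and does not describe what Phase 10.3 will generate; the function name `collectAll` is hypothetical.

```swift
// Illustrative only: standard-library AsyncStream usage, not planned Mochi output.
func collectAll() async -> [Int64] {
    // makeStream returns the stream together with the continuation that feeds it.
    let (stream, continuation) = AsyncStream<Int64>.makeStream()

    continuation.yield(1)
    continuation.yield(2)
    continuation.finish()        // without finish(), the loop below never ends

    var values: [Int64] = []
    for await value in stream {  // suspends until a value or the end arrives
        values.append(value)
    }
    return values
}

let received = await collectAll()
print(received)  // [1, 2]
```

An AsyncStream is designed for a single consumer, so a broadcast stream built on it would likely need one AsyncStream per subscriber. That is one reason the asynchronous variant is a separate piece of work and not a drop-in replacement for MochiStream<T>.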