Skip to main content

Streams

A stream is a typed event channel. The declaration looks like a struct, and from then on the program can emit events of that type. Anything with a matching on handler runs in response, typically an agent but also a top-level handler.

Streams give events a name, a shape, and a place in the type system. They are the bus that connects agents.

Declaring a stream

stream Sensor { id: string, temperature: float }

The fields look like a struct's. The difference is in usage. Streams are constructed and emitted, not stored.

A program may declare any number of streams:

stream Click { user_id: int, target: string }
stream Purchase { user_id: int, sku: string, total: float }
stream Error { code: int, message: string }

Emitting events

emit <Stream> { <fields> } produces an event. Every matching handler runs.

emit Sensor { id: "s-01", temperature: 22.5 }

Field assignments use the same syntax as struct literals. Order does not matter; every required field must be present.

emit is a statement. It does not return a value, and it cannot appear inside a value expression. To fan out to many listeners, emit repeatedly:

for s in pending_sensors {
emit Sensor { id: s.id, temperature: s.temp }
}

Top-level handlers

Outside an agent, a free-standing on handler runs every time the named stream emits.

stream Greeting { name: string }

on Greeting as g {
print("hello, " + g.name)
}

emit Greeting { name: "ada" }
emit Greeting { name: "lin" }
hello, ada
hello, lin

Top-level handlers suit small scripts. For state, prefer an agent.

Handlers inside agents

Most production code uses agents as the host for on handlers, because the agent provides somewhere to keep state.

stream Sensor { id: string, temp: float }

agent monitor {
var max: float = 0.0

on Sensor as s {
if s.temp > max { max = s.temp }
}
}

See agents for the full handler surface.

Filtering with where

Handlers accept a guard with where <expression>:

on Sensor as s where s.temp > 30.0 {
print("hot!", s.id, s.temp)
}

The body runs only when the guard is true. The compiled dispatch matches what an explicit if would produce, with the predicate kept next to the handler signature.

Stream pipelines

A program often reads input from one stream and emits to another after transforming. Mochi has no special pipeline operator for streams. Write an on handler that emits.

stream Raw { body: string }
stream Parsed { record: User }

agent ingest {
on Raw as r {
let user = parse_user(r.body)
if user != nil {
emit Parsed { record: user }
}
}
}

Pipelines fan out (one handler emits multiple downstream events) or fan in (multiple upstream emits trigger one downstream).

Backpressure and ordering

Within a single agent, handlers run serially in emit order. There is no implicit concurrency. A handler that does slow work blocks subsequent handlers on the same instance.

Across agents, handlers may run concurrently. Mochi does not currently expose a backpressure protocol. A producer faster than its consumer queues events.

Lifecycle of a stream

A stream has no lifecycle of its own; it is a type. emit is synchronous. By the time emit returns, every handler has been scheduled. The handlers themselves run on Mochi's runtime scheduler.

Stream types as values

Streams are types. They appear in function signatures, lists, and other positions where a type is expected:

fun describe(s: Sensor): string {
return s.id + ": " + str(s.temperature)
}

let buffered: list<Sensor> = []

A list<Sensor> is a list of values that share a stream declaration. Storing them in a list does not emit anything; only emit does.

External producers

A real program often pulls events from outside Mochi: an HTTP endpoint, a file watcher, a Kafka topic. Bridge them into a stream by calling emit from a normal function or agent.

fun bridge_kafka(topic: string) {
for record in kafka.read(topic) {
emit Raw { body: record.value }
}
}

There is no special API; any code path that calls emit participates in dispatch.

Common patterns

Logging tap

on Sensor as s {
log_line(s.id + " " + str(s.temperature))
}

A free-standing on handler is a small tap that runs alongside other handlers. Useful for logging, tracing, and metrics.

Replay

fun replay(events: list<Sensor>) {
for e in events {
emit Sensor { id: e.id, temperature: e.temperature }
}
}

Replaying a saved list of events is a for loop with emit. No special replay machinery is needed.

Multiplexing

stream Click { user: string, target: string }

agent metrics {
var by_target: map<string, int> = {}

on Click as c {
let count = by_target[c.target] ?? 0
by_target[c.target] = count + 1
}
}

Testing streams

The simplest test setup instantiates the agent under test, emits a sequence of events, and asserts via the agent's intents.

test "monitor tracks max temperature" {
let m = monitor {}
emit Sensor { id: "a", temp: 22.5 }
emit Sensor { id: "b", temp: 31.0 }
expect m.peak() == 31.0
}

Common errors

MessageCauseFix
cannot emit non-stream typeemit on a struct that lacks streamReplace type with stream, or wrap the value in a stream type.
unknown stream <Name>Typo in on handlerCheck the stream declaration.
handler must specify a bindingon Sensor { ... } without as <name>Add as <name> to bind the event.

See also