Lowering Mochi `stream<T>`, stream definitions, `on`-handlers, agent records, and `intent` methods, plus the M:N work-stealing scheduler over minicoro fibers that runs them.
MEP-45 research note 09, Streams and agents
Author: research pass for MEP-45. Date: 2026-05-22 (GMT+7).
Mochi has first-class stream<T> values, stream definitions that
declare event types, on handlers that subscribe to events, and
agent records with intent methods that wrap a mailbox-style actor
model. This note covers their lowering.
1. Surface recap
From examples/v0.4/stream.mochi and examples/v0.5/agent.mochi and
the language docs:
    stream prices {
        symbol: string
        price: float
        at: time
    }

    on prices as p where p.symbol == "AAPL" {
        print("AAPL @ ", p.price)
    }

    agent trader {
        position: int = 0

        intent buy(symbol: string, qty: int) {
            position = position + qty
        }

        intent sell(symbol: string, qty: int) {
            position = position - qty
        }

        on prices as p {
            if p.price > 200 {
                sell(p.symbol, 1)
            }
        }
    }

    emit prices { symbol: "AAPL", price: 210.5, at: now() }

2. Stream lowering
A stream<T> is a multi-producer multi-consumer broadcast channel
plus an immutable schema descriptor. The C representation:
    typedef struct mochi_stream__T {
        pthread_mutex_t lock;        // or platform equivalent
        pthread_cond_t not_empty;
        mochi_list__T ring;          // bounded ring buffer
        size_t head;
        size_t cap;
        mochi_list__sub__T subs;     // registered subscribers
        atomic_uint flags;           // closed?
    } mochi_stream__T;

Operations:
    mochi_stream__T *mochi_stream__T_new(size_t cap);
    void mochi_stream__T_emit(mochi_stream__T *s, T v);
    mochi_sub__T *mochi_stream__T_subscribe(mochi_stream__T *s, void *env,
                                            void (*cb)(void *env, T v));
    void mochi_stream__T_unsubscribe(mochi_sub__T *sub);
    void mochi_stream__T_close(mochi_stream__T *s);

emit inserts the value into the ring synchronously; the fan-out to
subscribers then runs on a worker thread from the scheduler pool rather
than on the emitter’s thread, so emit does not wait for handlers to
finish. Backpressure: if the ring is full, emit blocks the caller until a
slot frees.
Optionally, a stream can be configured lossy (stream prices @lossy {...}); under backpressure the oldest entry is dropped. Not in v1.
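The two full-ring policies can be sketched side by side. This is a minimal single-threaded illustration, not the runtime's code: ring_sketch, ring_sketch_emit, and ring_sketch_take are hypothetical names, the element type is fixed to double, and the blocking case is modelled as a false return where the real emit would wait for a free slot.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a stream ring of doubles; not the runtime type. */
enum { RING_CAP = 4 };

typedef struct {
    double slots[RING_CAP];
    size_t head;   /* index of the oldest entry */
    size_t len;    /* number of live entries */
    bool   lossy;  /* true: drop-oldest; false: caller must wait */
} ring_sketch;

/* Returns true if v was stored. In blocking mode a false return marks the
 * point where the real emit would park the caller until a slot frees. */
static bool ring_sketch_emit(ring_sketch *r, double v) {
    assert(r != NULL);
    if (r->len == RING_CAP) {
        if (!r->lossy) return false;          /* would block here */
        r->head = (r->head + 1) % RING_CAP;   /* drop the oldest entry */
        r->len--;
    }
    r->slots[(r->head + r->len) % RING_CAP] = v;
    r->len++;
    return true;
}

/* Pops the oldest entry; returns false when the ring is empty. */
static bool ring_sketch_take(ring_sketch *r, double *out) {
    if (r->len == 0) return false;
    *out = r->slots[r->head];
    r->head = (r->head + 1) % RING_CAP;
    r->len--;
    return true;
}
```

In lossy mode the ring never refuses a value, so a slow subscriber sees gaps rather than stalling the producer; in blocking mode no event is lost, at the cost of the producer waiting.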
3. on handler lowering
A free-standing on STREAM as x where PRED { BODY } lowers to:
    static void mochi_on__0(void *env, struct pkg_Price p) {
        if (!(/* PRED */)) return;
        /* BODY */
    }
    /* At module init: */
    mochi_stream__Price_subscribe(prices, NULL, mochi_on__0);

Module init is generated by the codegen: each module produces a
pkg__init(void) function that wires up its on handlers, registers
its agents, and runs its top-level let bindings.
The order is: top-level let bindings -> agent registrations -> on
subscriptions. The reason: on handlers may fire as soon as
subscribed, so any state they touch must be initialised first.
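A concrete instance of the handler shape and the init ordering might look like the following. It is a sketch under assumptions: price_evt, sub_slot, threshold, pkg_init_sketch, and emit_sketch are invented for illustration, the stream is reduced to a single synchronous subscription slot, and the predicate adds a threshold comparison so that the handler visibly depends on a binding that must be set before subscribing.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical event type and single-subscriber stream, for illustration. */
struct price_evt { const char *symbol; double price; };
typedef void (*price_cb)(void *env, struct price_evt p);

static struct { void *env; price_cb cb; } sub_slot;   /* one subscription */
static int threshold;   /* stands in for a top-level let binding */
static int aapl_hits;   /* observable effect of the handler body */

/* Lowered handler: the where-clause becomes an early return. */
static void on_prices_0(void *env, struct price_evt p) {
    (void)env;
    if (!(strcmp(p.symbol, "AAPL") == 0 && p.price > threshold)) return;
    aapl_hits++;   /* BODY */
}

/* Module init: bindings are set before the handler is subscribed, so the
 * handler never observes an uninitialised threshold. */
static void pkg_init_sketch(void) {
    threshold = 200;              /* 1. let bindings */
    sub_slot.env = NULL;          /* 2. on subscriptions */
    sub_slot.cb = on_prices_0;
}

/* Synchronous delivery to the one subscriber, if any. */
static void emit_sketch(struct price_evt p) {
    if (sub_slot.cb) sub_slot.cb(sub_slot.env, p);
}
```

Swapping the two steps in pkg_init_sketch would let an early event run the predicate against a zero threshold, which is the hazard the ordering rule avoids.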
4. Agent lowering
An agent record lowers to a record type plus a per-agent mailbox:
    struct pkg_trader {
        mochi_int position;
        mochi_mailbox *mbox;
    };
    /* Intents lower to typed messages: */
    typedef enum {
        PKG_TRADER_MSG__buy,
        PKG_TRADER_MSG__sell,
    } pkg_trader_msg_tag;
    typedef struct {
        pkg_trader_msg_tag tag;
        union {
            struct { mochi_str symbol; mochi_int qty; } buy;
            struct { mochi_str symbol; mochi_int qty; } sell;
        } u;
    } pkg_trader_msg;

The agent’s “run loop” runs on a dedicated lightweight fiber from the scheduler:
    static void pkg_trader__loop(struct pkg_trader *self) {
        for (;;) {
            pkg_trader_msg m;
            if (!mochi_mailbox_recv(self->mbox, &m, sizeof m)) break; // closed
            switch (m.tag) {
            case PKG_TRADER_MSG__buy:
                self->position = self->position + m.u.buy.qty;
                break;
            case PKG_TRADER_MSG__sell:
                self->position = self->position - m.u.sell.qty;
                break;
            }
        }
    }

Invoking t.buy("AAPL", 10) from any thread translates to a send
on the mailbox; the agent processes messages in arrival order. This
matches the Erlang/Akka actor model. The intent call returns immediately
(fire-and-forget); an intent fn variant that returns a value is a
synchronous request/reply, implemented as a one-shot reply channel.
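The send side of that contract can be sketched as follows. The types here (trader_sketch, mbox_sketch, trader_send, trader_drain) are hypothetical and single-threaded; the real mailbox is shared between threads and the loop blocks in mochi_mailbox_recv instead of draining on demand. The point is only that a call enqueues and returns, and that state changes when the loop gets to the message.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum { MSG_BUY, MSG_SELL } msg_tag;
typedef struct { msg_tag tag; long qty; } trader_msg;

/* Hypothetical fixed-capacity FIFO mailbox; the real one is thread-safe. */
enum { MBOX_CAP = 8 };
typedef struct {
    trader_msg q[MBOX_CAP];
    size_t head, len;
} mbox_sketch;

typedef struct { long position; mbox_sketch mbox; } trader_sketch;

/* What a call such as t.buy("AAPL", 10) turns into: enqueue and return. */
static bool trader_send(trader_sketch *t, msg_tag tag, long qty) {
    assert(t != NULL);
    if (t->mbox.len == MBOX_CAP) return false;      /* mailbox full */
    size_t tail = (t->mbox.head + t->mbox.len) % MBOX_CAP;
    t->mbox.q[tail] = (trader_msg){ tag, qty };
    t->mbox.len++;
    return true;
}

/* One pass of the agent loop: apply queued messages in arrival order. */
static void trader_drain(trader_sketch *t) {
    while (t->mbox.len > 0) {
        trader_msg m = t->mbox.q[t->mbox.head];
        t->mbox.head = (t->mbox.head + 1) % MBOX_CAP;
        t->mbox.len--;
        switch (m.tag) {
        case MSG_BUY:  t->position += m.qty; break;
        case MSG_SELL: t->position -= m.qty; break;
        }
    }
}
```

After trader_send returns, position is still unchanged; it moves only once trader_drain has run, which is the fire-and-forget behaviour described above.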
4.1 Agent on handlers
When an agent body contains on STREAM as p { BODY }, the
subscription’s callback enqueues a synthetic message onto the agent’s
mailbox; the agent’s fiber processes it. This means stream callbacks
do not race with intent handlers on the same agent. The agent’s loop
is the single-writer for its state.
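A small sketch of the synthetic-message path, assuming a hypothetical TAG_ON_PRICES tag alongside the intent tags: the subscription callback receives the agent as env and only enqueues, while the handler body runs later inside the loop. For brevity the sell call in the handler body is inlined as a direct update; how the real lowering dispatches an intent called from inside the agent is not specified here.

```c
#include <assert.h>
#include <stddef.h>

/* TAG_ON_PRICES is the synthetic tag; TAG_SELL stands for an intent. */
typedef enum { TAG_SELL, TAG_ON_PRICES } tag_sketch;
typedef struct { tag_sketch tag; double price; long qty; } amsg;

enum { AQ_CAP = 8 };
typedef struct {
    long   position;
    amsg   q[AQ_CAP];
    size_t head, len;
} agent_sketch;

static void agent_push(agent_sketch *a, amsg m) {
    assert(a->len < AQ_CAP);
    a->q[(a->head + a->len) % AQ_CAP] = m;
    a->len++;
}

/* Subscription callback: runs wherever the event is delivered and only
 * enqueues; it never touches a->position itself. */
static void agent_on_prices_cb(void *env, double price) {
    agent_push((agent_sketch *)env, (amsg){ TAG_ON_PRICES, price, 0 });
}

/* Agent loop body: the only code that writes a->position. */
static void agent_drain(agent_sketch *a) {
    while (a->len > 0) {
        amsg m = a->q[a->head];
        a->head = (a->head + 1) % AQ_CAP;
        a->len--;
        switch (m.tag) {
        case TAG_ON_PRICES:
            /* handler body; sell(p.symbol, 1) inlined for brevity */
            if (m.price > 200) a->position -= 1;
            break;
        case TAG_SELL:
            a->position -= m.qty;
            break;
        }
    }
}
```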
4.2 Agent fields are private
The lowering treats agent fields as accessible only from inside the
agent body (intent methods and on handlers). The codegen rejects
t.position = 5 outside the agent. Reads t.position from outside
go through a generated atomic getter that snapshots the field under
a lock, since the agent loop may write at any time.
5. The scheduler
Architecture: M:N work-stealing scheduler.
- M worker threads, M = mochi_runtime_workers() (default: number of hardware threads).
- Each worker has a local FIFO deque of runnable fibers.
- Idle workers steal from peers (Chase-Lev deque).
- Blocking syscalls (file I/O, sleep) execute on an overflow pool of threads created on demand, freeing the worker.
Fiber backend: minicoro (note 04 §2.1).
Reference: Go runtime scheduler (Pike, Cox); Tokio worker pool (Lerche).
We do not implement Go’s preemption (signal-based) in v1; tight loops
that never await or block can monopolise a worker. The MEP body should
note this trade-off.
5.1 Fiber lifecycle
- spawn creates a new fiber with a 32 KB initial stack (grows on page fault if the OS allows; otherwise a fixed 1 MB).
- A fiber holds its captured env on the heap.
- On exit, the fiber’s stack and env are released to the allocator.
5.2 Scheduling fairness
We use a per-worker run queue with a global overflow queue checked every N steps to prevent starvation. N defaults to 61 (a prime, to break resonance with periodic patterns).
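The every-N-steps check can be sketched as a pick function with a tick counter. This is an illustration only: fq_sketch, worker_sketch, and worker_next are hypothetical, fibers are reduced to integer ids, stealing from peers is omitted, and the queues are plain single-threaded ring buffers.

```c
#include <assert.h>
#include <stddef.h>

enum { GLOBAL_CHECK_INTERVAL = 61, FQ_CAP = 256 };   /* N from the text */

/* Hypothetical queue of fiber ids; -1 means "nothing runnable". */
typedef struct { int items[FQ_CAP]; size_t head, len; } fq_sketch;

static int fq_pop(fq_sketch *q) {
    if (q->len == 0) return -1;
    int id = q->items[q->head];
    q->head = (q->head + 1) % FQ_CAP;
    q->len--;
    return id;
}

static void fq_push(fq_sketch *q, int id) {
    assert(q->len < FQ_CAP);
    q->items[(q->head + q->len) % FQ_CAP] = id;
    q->len++;
}

typedef struct { fq_sketch local; unsigned tick; } worker_sketch;

/* Picks the next fiber. Every 61st call polls the global queue first, so a
 * busy local queue cannot starve fibers parked there. */
static int worker_next(worker_sketch *w, fq_sketch *global) {
    w->tick++;
    if (w->tick % GLOBAL_CHECK_INTERVAL == 0) {
        int id = fq_pop(global);
        if (id >= 0) return id;
    }
    int id = fq_pop(&w->local);
    return id >= 0 ? id : fq_pop(global);
}
```

With 100 fibers queued locally and one in the global queue, the global fiber is picked on the 61st call rather than after the local queue empties.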
6. Time and ticks
The runtime advances mochi_time from the OS monotonic clock. There
is no virtual time in v1. Tests that need deterministic time can
inject a mochi_time_provider via --test-clock=fixed:2026-05-22T00:00:00Z.
7. emit and back-pressure
emit stream { ... } synchronously inserts into the stream’s ring. If
the ring is full, the caller blocks (this is the v1 default; it
preserves the “no dropped events” property). A @lossy annotation
on the stream changes this to drop-oldest.
8. Stream from query
from p in prices ... select ... against a stream produces another
stream, not a list, when the result type is stream<T>. The
implementation:
    mochi_stream__Out *out = mochi_stream__Out_new(16);
    mochi_stream__Price_subscribe(prices, out, fwd_cb);

where fwd_cb runs the predicates and projections and emits to
out. This is the “compose streams” pattern.
limit n on a stream completes the result stream after n items and
unsubscribes from the source.
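A forwarder for a query such as "from p in prices where p.symbol == "AAPL" select p.price limit 2" might look like this. It is a sketch, not the generated code: fwd_env and fwd_cb_sketch are hypothetical, the result stream is replaced by a small array, and unsubscription is modelled by a done flag.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

struct price_in { const char *symbol; double price; };

enum { OUT_CAP = 8 };

/* Hypothetical forwarder state; `out` stands in for the result stream. */
typedef struct {
    double out[OUT_CAP];
    size_t out_len;
    size_t limit;   /* from `limit n`; 1..OUT_CAP in this sketch */
    bool   done;    /* set once the result stream is complete */
} fwd_env;

/* Runs the where-clause, then the select projection, then emits to out. */
static void fwd_cb_sketch(void *env, struct price_in p) {
    fwd_env *f = env;
    assert(f->limit >= 1 && f->limit <= OUT_CAP);
    if (f->done) return;                          /* already unsubscribed */
    if (strcmp(p.symbol, "AAPL") != 0) return;    /* predicate */
    f->out[f->out_len++] = p.price;               /* projection: p.price */
    if (f->out_len == f->limit) f->done = true;   /* complete, unsubscribe */
}
```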
9. Channels
The language docs surface channels indirectly through the
stream API. The runtime exposes channels too:
    mochi_chan__T *mochi_chan__T_new(size_t cap);
    bool mochi_chan__T_send(mochi_chan__T *c, T v); // blocks if full
    bool mochi_chan__T_recv(mochi_chan__T *c, T *v); // blocks if empty
    void mochi_chan__T_close(mochi_chan__T *c);

The difference from streams: a channel is point-to-point (one consumer per message), while a stream broadcasts to all subscribers. Internally, a stream is a list of channels + a fanout fiber.
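The "list of channels plus fan-out" structure can be sketched in a few lines. All names are hypothetical (chan_sketch, bcast_sketch, bcast_emit), the element type is int, and the blocking send and recv are modelled as functions that return false where the real ones would wait.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

enum { CH_CAP = 4, MAX_SUBS = 4 };

/* Hypothetical single-threaded channel: each value is taken exactly once. */
typedef struct { int buf[CH_CAP]; size_t head, len; } chan_sketch;

static bool chan_send(chan_sketch *c, int v) {
    if (c->len == CH_CAP) return false;        /* real send would block */
    c->buf[(c->head + c->len) % CH_CAP] = v;
    c->len++;
    return true;
}

static bool chan_recv(chan_sketch *c, int *v) {
    if (c->len == 0) return false;             /* real recv would block */
    *v = c->buf[c->head];
    c->head = (c->head + 1) % CH_CAP;
    c->len--;
    return true;
}

/* A stream as a list of per-subscriber channels. */
typedef struct { chan_sketch *subs[MAX_SUBS]; size_t nsubs; } bcast_sketch;

/* Fan-out step: copy v into every subscriber channel; returns copies made. */
static size_t bcast_emit(bcast_sketch *s, int v) {
    assert(s->nsubs <= MAX_SUBS);
    size_t delivered = 0;
    for (size_t i = 0; i < s->nsubs; i++)
        if (chan_send(s->subs[i], v)) delivered++;
    return delivered;
}
```

Each subscriber receives its own copy through its own channel, while within a single channel a value is consumed exactly once.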
10. Shutdown
On main’s return:
- Close all streams (mochi_stream__T_close) so subscribers see end-of-stream.
- Join all agent fibers (mochi_mailbox_close then wait).
- Drain pending I/O.
- Run any defer blocks in reverse order.
- Free GC roots and exit.
Step ordering is documented in the MEP body so user code can rely on
it (in particular: print from agent shutdown handlers is flushed).
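The sequence, including the reverse-order defer step, can be sketched with a log of step codes. Everything here is hypothetical scaffolding (the step enum, defer_push, shutdown_sketch); each real step is reduced to recording that it ran, so only the ordering is being illustrated.

```c
#include <assert.h>
#include <stddef.h>

enum { MAX_DEFER = 8, LOG_CAP = 16 };

typedef enum {
    STEP_CLOSE_STREAMS, STEP_JOIN_AGENTS, STEP_DRAIN_IO,
    STEP_DEFER_A, STEP_DEFER_B, STEP_FREE_ROOTS
} step_code;

static step_code shutdown_log[LOG_CAP];   /* records the order steps ran */
static size_t shutdown_log_len;

static void log_step(step_code s) {
    assert(shutdown_log_len < LOG_CAP);
    shutdown_log[shutdown_log_len++] = s;
}

/* Hypothetical defer stack: pushed in program order, run last-in first-out. */
typedef void (*defer_fn)(void);
static defer_fn defer_stack[MAX_DEFER];
static size_t defer_len;

static void defer_push(defer_fn f) {
    assert(defer_len < MAX_DEFER);
    defer_stack[defer_len++] = f;
}

static void defer_a(void) { log_step(STEP_DEFER_A); }
static void defer_b(void) { log_step(STEP_DEFER_B); }

/* Mirrors the five steps; each real step is reduced to a log entry. */
static void shutdown_sketch(void) {
    log_step(STEP_CLOSE_STREAMS);
    log_step(STEP_JOIN_AGENTS);
    log_step(STEP_DRAIN_IO);
    while (defer_len > 0) defer_stack[--defer_len]();   /* reverse order */
    log_step(STEP_FREE_ROOTS);
}
```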
11. Open questions
- Whether emit is synchronous or asynchronous from the caller’s perspective when the ring has space. (Spec is ambiguous; v1 will be sync.)
- Whether agent state can be inspected from outside (atomic snapshot) or only via intent methods.
- Whether on STREAM inside agents and at module top-level have the same execution-thread semantics.
- Whether stream subscriptions are weak (auto-unsubscribe on subscriber GC) or strong (manual unsubscribe).
- Whether the scheduler should preempt CPU-bound fibers (Go-style signal preemption) in v1. Current answer: no.