Skip to content

Streams and agents

Lowering Mochi `stream<T>`, stream definitions, `on`-handlers, agent records, and `intent` methods, plus the M:N work-stealing scheduler over minicoro fibers that runs them.

MEP-45 research note 09, Streams and agents

Author: research pass for MEP-45. Date: 2026-05-22 (GMT+7).

Mochi has first-class stream<T> values, stream definitions that declare event types, on handlers that subscribe to events, and agent records with intent methods that wrap a mailbox-style actor model. This note covers their lowering.

1. Surface recap

From examples/v0.4/stream.mochi and examples/v0.5/agent.mochi and the language docs:

stream prices {
  symbol: string
  price: float
  at: time
}

on prices as p where p.symbol == "AAPL" {
  print("AAPL @ ", p.price)
}

agent trader {
  position: int = 0

  intent buy(symbol: string, qty: int) {
    position = position + qty
  }

  intent sell(symbol: string, qty: int) {
    position = position - qty
  }

  on prices as p {
    if p.price > 200 {
      sell(p.symbol, 1)
    }
  }
}

emit prices { symbol: "AAPL", price: 210.5, at: now() }

2. Stream lowering

A stream<T> is a multi-producer multi-consumer broadcast channel plus an immutable schema descriptor. The C representation:

typedef struct mochi_stream__T {
    pthread_mutex_t       lock;        // or platform equivalent
    pthread_cond_t        not_empty;
    mochi_list__T         ring;        // bounded ring buffer
    size_t                head;
    size_t                cap;
    mochi_list__sub__T    subs;        // registered subscribers
    atomic_uint           flags;       // closed?
} mochi_stream__T;

Operations:

mochi_stream__T *mochi_stream__T_new(size_t cap);
void             mochi_stream__T_emit(mochi_stream__T *s, T v);
mochi_sub__T    *mochi_stream__T_subscribe(mochi_stream__T *s, void *env,
                                            void (*cb)(void *env, T v));
void             mochi_stream__T_unsubscribe(mochi_sub__T *sub);
void             mochi_stream__T_close(mochi_stream__T *s);

emit synchronously fans out to every subscriber on a worker thread from the scheduler pool (not the emitter’s thread, so that emit is non-blocking from the caller’s perspective). Backpressure: if the ring is full, emit blocks the caller until a slot frees.

Optionally, a stream can be configured lossy (stream prices @lossy {...}); under backpressure the oldest entry is dropped. Not in v1.

3. on handler lowering

A free-standing on STREAM as x where PRED { BODY } lowers to:

static void mochi_on__0(void *env, struct pkg_Price p) {
    if (!(/* PRED */)) return;
    /* BODY */
}

/* At module init: */
mochi_stream__Price_subscribe(prices, NULL, mochi_on__0);

Module init is generated by the codegen: each module produces a pkg__init(void) function that wires up its on handlers, registers its agents, and runs its top-level let bindings.

The order is: literal bindings -> agent registrations -> on subscriptions. The reason: on handlers may fire as soon as subscribed, so any state they touch must be initialised first.

4. Agent lowering

An agent record lowers to a record type plus a per-agent mailbox:

struct pkg_trader {
    mochi_int position;
    mochi_mailbox *mbox;
};

/* Intents lower to typed messages: */
typedef enum {
    PKG_TRADER_MSG__buy,
    PKG_TRADER_MSG__sell,
} pkg_trader_msg_tag;

typedef struct {
    pkg_trader_msg_tag tag;
    union {
        struct { mochi_str symbol; mochi_int qty; } buy;
        struct { mochi_str symbol; mochi_int qty; } sell;
    } u;
} pkg_trader_msg;

The agent’s “run loop” runs on a dedicated lightweight fiber from the scheduler:

static void pkg_trader__loop(struct pkg_trader *self) {
    for (;;) {
        pkg_trader_msg m;
        if (!mochi_mailbox_recv(self->mbox, &m, sizeof m)) break; // closed
        switch (m.tag) {
        case PKG_TRADER_MSG__buy:
            self->position = self->position + m.u.buy.qty;
            break;
        case PKG_TRADER_MSG__sell:
            self->position = self->position - m.u.sell.qty;
            break;
        }
    }
}

Invoking t.buy("AAPL", 10) from any thread translates to a send on the mailbox; the agent processes it in order. This matches the Erlang/Akka actor model. The intent call returns immediately (fire- and-forget); a intent fn variant that returns a value is a synchronous request/reply, implemented as a one-shot reply channel.

4.1 Agent on handlers

When an agent body contains on STREAM as p { BODY }, the subscription’s callback enqueues a synthetic message onto the agent’s mailbox; the agent thread processes it. This means stream callbacks do not race with intent handlers on the same agent. The agent’s loop is the single-writer for its state.

4.2 Agent fields are private

The lowering treats agent fields as accessible only from inside the agent body (intent methods and on handlers). The codegen rejects t.position = 5 outside the agent. Reads t.position from outside go through a generated atomic getter that snapshots the field under a lock, since the agent loop may write at any time.

5. The scheduler

Architecture: M:N work-stealing scheduler.

  • M worker threads, M = mochi_runtime_workers() (default: number of hardware threads).
  • Each worker has a local FIFO deque of runnable fibers.
  • Idle workers steal from peers (Chase-Lev deque).
  • Blocking syscalls (file I/O, sleep) execute on an overflow pool of threads created on demand, freeing the worker.

Fiber backend: minicoro (note 04 §2.1).

Reference: Go runtime scheduler (Pike, Cox); Tokio worker pool (Lerche). We do not implement Go’s preemption (signal-based) in v1; tight loops that never await or block can monopolise a worker. The MEP body should note this trade.

5.1 Fiber lifecycle

  • spawn creates a new fiber with a 32 KB initial stack (grows on page fault if the OS allows; otherwise a fixed 1 MB).
  • A fiber holds its captured env on the heap.
  • On exit, the fiber’s stack and env are released to the allocator.

5.2 Scheduling fairness

We use a per-worker run queue with a global overflow queue checked every N steps to prevent starvation. N defaults to 61 (a prime, to break resonance with periodic patterns).

6. Time and ticks

The runtime advances mochi_time from the OS monotonic clock. There is no virtual time in v1. Tests that need deterministic time can inject a mochi_time_provider via --test-clock=fixed:2026-05-22T00:00:00Z.

7. emit and back-pressure

emit stream { ... } synchronously inserts into the stream’s ring. If the ring is full, the caller blocks (this is the v1 default; it preserves the “no dropped events” property). A @lossy annotation on the stream changes this to drop-oldest.

8. Stream from query

from p in prices ... select ... against a stream produces another stream, not a list, when the result type is stream<T>. The implementation:

mochi_stream__Out *out = mochi_stream__Out_new(16);
mochi_stream__Price_subscribe(prices, out, fwd_cb);

where fwd_cb runs the predicates and projections and emits to out. This is the “compose streams” pattern.

limit n on a stream completes the result stream after n items and unsubscribes from the source.

9. Channels

The language docs surface channels indirectly through the stream API. The runtime exposes channels too:

mochi_chan__T *mochi_chan__T_new(size_t cap);
bool           mochi_chan__T_send(mochi_chan__T *c, T v);   // blocks if full
bool           mochi_chan__T_recv(mochi_chan__T *c, T *v);  // blocks if empty
void           mochi_chan__T_close(mochi_chan__T *c);

The difference from streams: a channel is point-to-point (one consumer per message), while a stream broadcasts to all subscribers. Internally, a stream is a list of channels + a fanout fiber.

10. Shutdown

On main’s return:

  1. Close all streams (mochi_stream__T_close) so subscribers see end-of-stream.
  2. Join all agent fibers (mochi_mailbox_close then wait).
  3. Drain pending I/O.
  4. Run any defer blocks in reverse order.
  5. Free GC roots and exit.

Step ordering is documented in the MEP body so user code can rely on it (in particular: print from agent shutdown handlers is flushed).

11. Open questions

  1. Whether emit is synchronous or asynchronous from the caller’s perspective when the ring has space. (Spec is ambiguous; v1 will be sync.)
  2. Whether agent state can be inspected from outside (atomic snapshot) or only via intent methods.
  3. Whether on STREAM inside agents and at module top-level have the same execution-thread semantics.
  4. Whether stream subscriptions are weak (auto-unsubscribe on subscriber GC) or strong (manual unsubscribe).
  5. Whether the scheduler should preempt CPU-bound fibers (Go-style signal preemption) in v1. Current answer: no.