# MEP-45 research note 09, Streams and agents

Author: research pass for MEP-45.
Date: 2026-05-22 (GMT+7).

Mochi has first-class `stream<T>` values, `stream` definitions that
declare event types, `on` handlers that subscribe to events, and
`agent` records with `intent` methods that wrap a mailbox-style actor
model. This note covers their lowering.

## 1. Surface recap

From `examples/v0.4/stream.mochi`, `examples/v0.5/agent.mochi`, and
the language docs:

```mochi
stream prices {
  symbol: string
  price: float
  at: time
}

on prices as p where p.symbol == "AAPL" {
  print("AAPL @ ", p.price)
}

agent trader {
  position: int = 0

  intent buy(symbol: string, qty: int) {
    position = position + qty
  }

  intent sell(symbol: string, qty: int) {
    position = position - qty
  }

  on prices as p {
    if p.price > 200 {
      sell(p.symbol, 1)
    }
  }
}

emit prices { symbol: "AAPL", price: 210.5, at: now() }
```

## 2. Stream lowering

A `stream<T>` is a multi-producer multi-consumer broadcast channel
plus an immutable schema descriptor. The C representation:

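```c
#include <pthread.h>      // or platform equivalent
#include <stdatomic.h>
#include <stddef.h>

typedef struct mochi_stream__T {
    pthread_mutex_t       lock;        // or platform equivalent
    pthread_cond_t        not_empty;   // signalled by emit; the fan-out side waits on it
    pthread_cond_t        not_full;    // signalled when a slot frees; a blocked emit waits on it
    mochi_list__T         ring;        // bounded ring buffer
    size_t                head;        // index of the oldest entry
    size_t                len;         // live entries; the ring is full when len == cap
    size_t                cap;
    mochi_list__sub__T    subs;        // registered subscribers
    atomic_uint           flags;       // closed?
} mochi_stream__T;
```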

Operations:

```c
mochi_stream__T *mochi_stream__T_new(size_t cap);
void             mochi_stream__T_emit(mochi_stream__T *s, T v);
mochi_sub__T    *mochi_stream__T_subscribe(mochi_stream__T *s, void *env,
                                            void (*cb)(void *env, T v));
void             mochi_stream__T_unsubscribe(mochi_sub__T *sub);
void             mochi_stream__T_close(mochi_stream__T *s);
```

`emit` inserts the event into the ring synchronously. Fan-out to the
subscribers then runs on a worker thread from the scheduler pool, not
on the emitter's thread, so the caller never waits for subscriber
callbacks to finish. Backpressure: if the ring is full, `emit` blocks
the caller until a slot frees.

Optionally, a stream can be declared **lossy** (`stream prices @lossy
{...}`): when the ring is full, `emit` drops the oldest entry instead
of blocking. Not in v1.
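The blocking and lossy `emit` rules can be sketched as a bounded ring specialised to `int` events. This is a minimal sketch, assuming POSIX threads in place of the scheduler and a compile-time capacity; `ring`, `ring_init`, `ring_emit`, `ring_emit_lossy`, `ring_take` and `RING_CAP` are hypothetical names, not the generated runtime API.

```c
/* Sketch only: hypothetical names; POSIX threads stand in for the scheduler. */
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

#define RING_CAP 4

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  not_empty;   /* waited on by the fan-out side */
    pthread_cond_t  not_full;    /* waited on by a blocked emitter */
    int             slots[RING_CAP];
    size_t          head;        /* index of the oldest entry */
    size_t          len;         /* number of live entries */
} ring;

void ring_init(ring *r) {
    pthread_mutex_init(&r->lock, NULL);
    pthread_cond_init(&r->not_empty, NULL);
    pthread_cond_init(&r->not_full, NULL);
    r->head = 0;
    r->len = 0;
}

/* v1 behaviour: block the emitter until a slot frees. */
void ring_emit(ring *r, int v) {
    pthread_mutex_lock(&r->lock);
    while (r->len == RING_CAP)
        pthread_cond_wait(&r->not_full, &r->lock);
    r->slots[(r->head + r->len) % RING_CAP] = v;
    r->len++;
    pthread_cond_signal(&r->not_empty);
    pthread_mutex_unlock(&r->lock);
}

/* Planned @lossy behaviour: never block; drop the oldest entry when full.
   Returns true if an entry was dropped. */
bool ring_emit_lossy(ring *r, int v) {
    bool dropped = false;
    pthread_mutex_lock(&r->lock);
    if (r->len == RING_CAP) {
        r->head = (r->head + 1) % RING_CAP;   /* discard the oldest */
        r->len--;
        dropped = true;
    }
    r->slots[(r->head + r->len) % RING_CAP] = v;
    r->len++;
    pthread_cond_signal(&r->not_empty);
    pthread_mutex_unlock(&r->lock);
    return dropped;
}

/* Fan-out side: block until an entry is available, then remove it. */
int ring_take(ring *r) {
    pthread_mutex_lock(&r->lock);
    while (r->len == 0)
        pthread_cond_wait(&r->not_empty, &r->lock);
    int v = r->slots[r->head];
    r->head = (r->head + 1) % RING_CAP;
    r->len--;
    pthread_cond_signal(&r->not_full);
    pthread_mutex_unlock(&r->lock);
    return v;
}
```

Separate `not_empty` and `not_full` conditions mean an emitter and the fan-out side only wake each other when the state they wait for may have changed; `ring_emit_lossy` never waits, which is exactly what the annotation trades event loss for.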

## 3. `on` handler lowering

A free-standing `on STREAM as x where PRED { BODY }` lowers to:

```c
static void mochi_on__0(void *env, struct pkg_Price p) {
    if (!(/* PRED */)) return;
    /* BODY */
}

/* At module init: */
mochi_stream__Price_subscribe(prices, NULL, mochi_on__0);
```

Module init is generated by the codegen: each module produces a
`pkg__init(void)` function that runs its top-level `let` bindings,
registers its agents, and wires up its `on` handlers.

The order is: top-level `let` bindings -> agent registrations -> `on`
subscriptions. The reason: an `on` handler may fire as soon as it is
subscribed, so any state it touches must be initialised first.
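To make the lowering and the init order concrete, here is a self-contained sketch of a top-level handler wired up by `pkg__init`. It assumes a toy synchronous stream (subscribers are called directly on the emitting thread, unlike the worker-pool fan-out described in §2); `price_stream`, `price_stream_subscribe`, `price_stream_emit` and `aapl_seen` are hypothetical stand-ins.

```c
/* Sketch only: toy synchronous stream; all helper names are hypothetical. */
#include <stddef.h>
#include <string.h>

struct pkg_Price { const char *symbol; double price; };
typedef void (*price_cb)(void *env, struct pkg_Price p);

/* Toy stream: a fixed table of subscribers, called synchronously on emit. */
#define MAX_SUBS 8
typedef struct {
    struct { void *env; price_cb cb; } subs[MAX_SUBS];
    size_t nsubs;
} price_stream;

static void price_stream_subscribe(price_stream *s, void *env, price_cb cb) {
    if (s->nsubs == MAX_SUBS) return;
    s->subs[s->nsubs].env = env;
    s->subs[s->nsubs].cb = cb;
    s->nsubs++;
}

static void price_stream_emit(price_stream *s, struct pkg_Price p) {
    for (size_t i = 0; i < s->nsubs; i++)
        s->subs[i].cb(s->subs[i].env, p);
}

/* Module-level state. */
static price_stream prices_storage;
static price_stream *prices;        /* top-level binding */
static int aapl_seen;               /* state the handler touches */

/* Lowered form of: on prices as p where p.symbol == "AAPL" { ... } */
static void mochi_on__0(void *env, struct pkg_Price p) {
    (void)env;
    if (!(strcmp(p.symbol, "AAPL") == 0)) return;   /* PRED */
    aapl_seen++;                                     /* BODY */
}

void pkg__init(void) {
    /* 1. top-level bindings first */
    prices = &prices_storage;
    aapl_seen = 0;
    /* 2. agent registrations would go here */
    /* 3. subscriptions last: a handler may fire as soon as it is subscribed */
    price_stream_subscribe(prices, NULL, mochi_on__0);
}
```

Swapping steps 1 and 3 would let `mochi_on__0` run before `prices` and `aapl_seen` hold their initial values, which is the hazard the ordering rule prevents.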

## 4. Agent lowering

An `agent` record lowers to a record type plus a per-agent mailbox:

```c
struct pkg_trader {
    mochi_int position;
    mochi_mailbox *mbox;
};

/* Intents lower to typed messages: */
typedef enum {
    PKG_TRADER_MSG__buy,
    PKG_TRADER_MSG__sell,
} pkg_trader_msg_tag;

typedef struct {
    pkg_trader_msg_tag tag;
    union {
        struct { mochi_str symbol; mochi_int qty; } buy;
        struct { mochi_str symbol; mochi_int qty; } sell;
    } u;
} pkg_trader_msg;
```

The agent's "run loop" runs on a dedicated lightweight fiber from the
scheduler:

```c
static void pkg_trader__loop(struct pkg_trader *self) {
    for (;;) {
        pkg_trader_msg m;
        if (!mochi_mailbox_recv(self->mbox, &m, sizeof m)) break; // closed
        switch (m.tag) {
        case PKG_TRADER_MSG__buy:
            self->position = self->position + m.u.buy.qty;
            break;
        case PKG_TRADER_MSG__sell:
            self->position = self->position - m.u.sell.qty;
            break;
        }
    }
}
```
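A runnable version of the mailbox plus run loop for `trader` can be sketched as follows. It assumes `mochi_mailbox` is a bounded, mutex-protected FIFO and uses a POSIX thread in place of a scheduler fiber; `mailbox`, `mailbox_send`, `mailbox_recv`, `mailbox_close` and `MBOX_CAP` are hypothetical, and `symbol` is reduced to `const char *`. `mailbox_recv` returns `false` only once the mailbox is closed and drained, which is what ends the loop.

```c
/* Sketch only: hypothetical mailbox; a pthread stands in for the fiber. */
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum { PKG_TRADER_MSG__buy, PKG_TRADER_MSG__sell } pkg_trader_msg_tag;

typedef struct {
    pkg_trader_msg_tag tag;
    union {
        struct { const char *symbol; long qty; } buy;
        struct { const char *symbol; long qty; } sell;
    } u;
} pkg_trader_msg;

#define MBOX_CAP 16

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  changed;    /* broadcast on send, recv and close */
    pkg_trader_msg  q[MBOX_CAP];
    size_t          head, len;
    bool            closed;
} mailbox;

void mailbox_init(mailbox *mb) {
    pthread_mutex_init(&mb->lock, NULL);
    pthread_cond_init(&mb->changed, NULL);
    mb->head = mb->len = 0;
    mb->closed = false;
}

bool mailbox_send(mailbox *mb, pkg_trader_msg m) {
    pthread_mutex_lock(&mb->lock);
    while (mb->len == MBOX_CAP && !mb->closed)
        pthread_cond_wait(&mb->changed, &mb->lock);
    bool ok = !mb->closed;
    if (ok) {
        mb->q[(mb->head + mb->len) % MBOX_CAP] = m;
        mb->len++;
        pthread_cond_broadcast(&mb->changed);
    }
    pthread_mutex_unlock(&mb->lock);
    return ok;
}

bool mailbox_recv(mailbox *mb, pkg_trader_msg *out) {
    pthread_mutex_lock(&mb->lock);
    while (mb->len == 0 && !mb->closed)
        pthread_cond_wait(&mb->changed, &mb->lock);
    bool ok = mb->len > 0;              /* false only when closed and drained */
    if (ok) {
        *out = mb->q[mb->head];
        mb->head = (mb->head + 1) % MBOX_CAP;
        mb->len--;
        pthread_cond_broadcast(&mb->changed);
    }
    pthread_mutex_unlock(&mb->lock);
    return ok;
}

void mailbox_close(mailbox *mb) {
    pthread_mutex_lock(&mb->lock);
    mb->closed = true;
    pthread_cond_broadcast(&mb->changed);
    pthread_mutex_unlock(&mb->lock);
}

struct pkg_trader { long position; mailbox mbox; };

/* Run loop: processes messages one at a time, in arrival order. */
void *pkg_trader__loop(void *arg) {
    struct pkg_trader *self = arg;
    pkg_trader_msg m;
    while (mailbox_recv(&self->mbox, &m)) {
        switch (m.tag) {
        case PKG_TRADER_MSG__buy:  self->position += m.u.buy.qty;  break;
        case PKG_TRADER_MSG__sell: self->position -= m.u.sell.qty; break;
        }
    }
    return NULL;
}
```

Because closing lets `mailbox_recv` keep returning queued messages until the buffer is empty, messages sent before `mailbox_close` are never lost on shutdown.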

Invoking `t.buy("AAPL", 10)` from any thread translates to a `send`
on the agent's mailbox; the agent processes messages one at a time, in
arrival order. This matches the Erlang/Akka actor model. The intent
call returns immediately (fire-and-forget); an `intent fn` variant that
returns a value is a synchronous request/reply, implemented as a
one-shot reply channel.
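The one-shot reply channel behind an `intent fn` can be sketched as a single-use slot that the caller allocates, sends along with the request, and then waits on. This assumes a mutex-and-condition-variable slot carrying a `long`; `oneshot`, `oneshot_put`, `oneshot_wait` and the `replier` thread (standing in for the agent's run loop answering a request) are hypothetical.

```c
/* Sketch only: hypothetical one-shot reply slot for intent fn calls. */
#include <pthread.h>
#include <stdbool.h>

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  ready_cv;
    bool            ready;
    long            value;
} oneshot;

void oneshot_init(oneshot *o) {
    pthread_mutex_init(&o->lock, NULL);
    pthread_cond_init(&o->ready_cv, NULL);
    o->ready = false;
}

/* Called once by the agent's run loop when the reply is computed. */
void oneshot_put(oneshot *o, long v) {
    pthread_mutex_lock(&o->lock);
    o->value = v;
    o->ready = true;
    pthread_cond_signal(&o->ready_cv);
    pthread_mutex_unlock(&o->lock);
}

/* Called by the intent's caller; blocks until the reply arrives. */
long oneshot_wait(oneshot *o) {
    pthread_mutex_lock(&o->lock);
    while (!o->ready)
        pthread_cond_wait(&o->ready_cv, &o->lock);
    long v = o->value;
    pthread_mutex_unlock(&o->lock);
    return v;
}

/* Stand-in for the agent answering a request with its current position. */
static void *replier(void *arg) {
    oneshot_put((oneshot *)arg, 7);
    return NULL;
}
```

In the fiber runtime the wait would presumably park the calling fiber rather than block an OS thread, but the handshake (request carries the slot, agent fills it exactly once) is the same.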

### 4.1 Agent `on` handlers

When an agent body contains `on STREAM as p { BODY }`, the
subscription's callback enqueues a synthetic message onto the agent's
mailbox, and the agent's run loop processes it like any intent. Stream
callbacks therefore never race with intent handlers on the same agent:
the loop is the single writer of the agent's state.
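A single-threaded sketch of how an agent-level `on` handler joins the same message stream as intents: the subscription callback only posts a tagged message, and every state change happens in the loop. The `PKG_TRADER_MSG__on_prices` tag, `msg_queue`, `enqueue`, `pkg_trader__on_prices_cb` and `pkg_trader__drain` are hypothetical, the queue is unsynchronised (a real mailbox needs locking and a blocking receive), and `sell(...)` inside the handler is assumed to post a message to the agent's own mailbox.

```c
/* Sketch only: unsynchronised queue; all names are hypothetical. */
#include <stdbool.h>
#include <stddef.h>

struct pkg_Price { const char *symbol; double price; };

typedef enum {
    PKG_TRADER_MSG__buy,
    PKG_TRADER_MSG__sell,
    PKG_TRADER_MSG__on_prices,        /* synthetic: one per delivered event */
} pkg_trader_msg_tag;

typedef struct {
    pkg_trader_msg_tag tag;
    union {
        struct { const char *symbol; long qty; } buy, sell;
        struct pkg_Price on_prices;
    } u;
} pkg_trader_msg;

#define MBOX_CAP 32

/* Unsynchronised FIFO standing in for the agent's mailbox. */
typedef struct { pkg_trader_msg q[MBOX_CAP]; size_t head, tail; } msg_queue;

static bool enqueue(msg_queue *mq, pkg_trader_msg m) {
    if (mq->tail - mq->head == MBOX_CAP) return false;
    mq->q[mq->tail++ % MBOX_CAP] = m;
    return true;
}

struct pkg_trader { long position; msg_queue mbox; };

/* Callback for `on prices as p` inside the agent: it never touches
   agent fields, it only posts a message. */
static void pkg_trader__on_prices_cb(void *env, struct pkg_Price p) {
    struct pkg_trader *self = env;
    pkg_trader_msg m = { .tag = PKG_TRADER_MSG__on_prices, .u.on_prices = p };
    enqueue(&self->mbox, m);
}

/* The run loop, drained synchronously here: the only writer of position. */
static void pkg_trader__drain(struct pkg_trader *self) {
    while (self->mbox.head != self->mbox.tail) {
        pkg_trader_msg m = self->mbox.q[self->mbox.head++ % MBOX_CAP];
        switch (m.tag) {
        case PKG_TRADER_MSG__buy:  self->position += m.u.buy.qty;  break;
        case PKG_TRADER_MSG__sell: self->position -= m.u.sell.qty; break;
        case PKG_TRADER_MSG__on_prices:
            /* if p.price > 200 { sell(p.symbol, 1) }: assumed to post a
               sell message to the agent's own mailbox. */
            if (m.u.on_prices.price > 200) {
                pkg_trader_msg s = { .tag = PKG_TRADER_MSG__sell,
                                     .u.sell = { m.u.on_prices.symbol, 1 } };
                enqueue(&self->mbox, s);
            }
            break;
        }
    }
}
```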

### 4.2 Agent fields are private

The lowering treats agent fields as writable only from inside the
agent body (intent methods and `on` handlers): the codegen rejects
`t.position = 5` outside the agent. Reads of `t.position` from outside
go through a generated getter that takes an atomic snapshot of the
field under a lock, since the agent loop may write it at any time.
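One possible shape for the generated getter, assuming the lowering adds a per-agent mutex that the run loop holds while it mutates fields. `state_lock`, `pkg_trader__apply_buy` and `pkg_trader_get_position` are hypothetical names.

```c
/* Sketch only: assumes a per-agent lock added by the lowering. */
#include <pthread.h>

struct pkg_trader {
    pthread_mutex_t state_lock;   /* held by the run loop while writing fields */
    long            position;
};

/* What the run loop would do when it handles a buy intent. */
static void pkg_trader__apply_buy(struct pkg_trader *self, long qty) {
    pthread_mutex_lock(&self->state_lock);
    self->position += qty;
    pthread_mutex_unlock(&self->state_lock);
}

/* Generated getter for `t.position` read from outside the agent. */
long pkg_trader_get_position(struct pkg_trader *self) {
    pthread_mutex_lock(&self->state_lock);
    long snapshot = self->position;
    pthread_mutex_unlock(&self->state_lock);
    return snapshot;
}
```

A lone scalar field could instead be a C11 `_Atomic` object; a lock generalises to getters that must snapshot several fields consistently.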

## 5. The scheduler

Architecture: M:N work-stealing scheduler.

- M worker threads, M = `mochi_runtime_workers()` (default: number of
  hardware threads).
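- Each worker has a local deque of runnable fibers; the worker itself
  pushes and pops at the bottom end.
- Idle workers steal from the top end of a peer's deque (Chase-Lev
  deque), so thieves take the oldest queued work.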
- Blocking syscalls (file I/O, sleep) execute on an overflow pool of
  threads created on demand, freeing the worker.

Fiber backend: minicoro (note 04 §2.1).

Reference: Go runtime scheduler (Pike, Cox); Tokio worker pool (Lerche).
We do not implement Go-style (signal-based) preemption in v1, so a
tight loop that never `await`s or blocks can monopolise a worker. The
MEP body should note this trade-off.

### 5.1 Fiber lifecycle

- `spawn` creates a new fiber with a 32 KB initial stack (grows on
  page fault if the OS allows; otherwise a fixed 1 MB).
- A fiber holds its captured env on the heap.
- On exit, the fiber's stack and env are released to the allocator.

### 5.2 Scheduling fairness

We use a per-worker run queue with a global overflow queue checked
every N steps to prevent starvation. N defaults to 61 (a prime, to
break resonance with periodic patterns).
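The fairness check can be sketched single-threaded with array-backed FIFOs. Go's scheduler uses the same constant: it consults the global run queue when its per-P scheduling tick is a multiple of 61. `fifo`, `worker`, `pick_next`, `FIFO_CAP` and `FAIRNESS_INTERVAL` are hypothetical names.

```c
/* Sketch only: hypothetical queues; fibers are long ids. */
#include <stdbool.h>
#include <stddef.h>

#define FAIRNESS_INTERVAL 61   /* check the global queue first every 61st pick */
#define FIFO_CAP 128

typedef struct { long items[FIFO_CAP]; size_t head, tail; } fifo;

bool fifo_push(fifo *q, long fiber) {
    if (q->tail - q->head == FIFO_CAP) return false;
    q->items[q->tail++ % FIFO_CAP] = fiber;
    return true;
}

bool fifo_pop(fifo *q, long *out) {
    if (q->head == q->tail) return false;
    *out = q->items[q->head++ % FIFO_CAP];
    return true;
}

typedef struct { unsigned long tick; fifo local; } worker;

/* Normally prefer the local queue, but every FAIRNESS_INTERVAL-th pick
   looks at the global queue first so its fibers cannot starve. */
bool pick_next(worker *w, fifo *global, long *out) {
    w->tick++;
    if (w->tick % FAIRNESS_INTERVAL == 0 && fifo_pop(global, out))
        return true;
    if (fifo_pop(&w->local, out))
        return true;
    return fifo_pop(global, out);
}
```

With a local queue that never empties, a fiber parked on the global queue is picked within at most 61 scheduling steps.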

## 6. Time and ticks

The runtime advances `mochi_time` from the OS monotonic clock. There
is no virtual time in v1. Tests that need deterministic time can
inject a `mochi_time_provider` via `--test-clock=fixed:2026-05-22T00:00:00Z`.
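An injectable clock can be sketched as a function pointer plus context, assuming `mochi_time` is measured in nanoseconds. The fields of `mochi_time_provider`, and `mochi_now_ns`, `mochi_set_time_provider`, `install_test_clock`, are hypothetical, and parsing of the `--test-clock` value is omitted. Note that a fixed wall-clock instant and the monotonic clock have different epochs; this sketch does not reconcile them.

```c
/* Sketch only: hypothetical provider shape; flag parsing omitted. */
#define _POSIX_C_SOURCE 200809L   /* for clock_gettime / CLOCK_MONOTONIC */
#include <stdint.h>
#include <time.h>

typedef struct mochi_time_provider {
    int64_t (*now_ns)(void *ctx);
    void     *ctx;
} mochi_time_provider;

/* Default provider: OS monotonic clock. */
static int64_t monotonic_now_ns(void *ctx) {
    (void)ctx;
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return (int64_t)ts.tv_sec * 1000000000 + ts.tv_nsec;
}

/* Test provider: always returns the instant stored in ctx. */
static int64_t fixed_now_ns(void *ctx) {
    return *(const int64_t *)ctx;
}

static mochi_time_provider g_clock = { monotonic_now_ns, NULL };

void mochi_set_time_provider(mochi_time_provider p) { g_clock = p; }
int64_t mochi_now_ns(void) { return g_clock.now_ns(g_clock.ctx); }

/* What --test-clock=fixed:2026-05-22T00:00:00Z would install:
   1779408000 s after the Unix epoch. */
static int64_t test_instant_ns = INT64_C(1779408000) * 1000000000;

void install_test_clock(void) {
    mochi_set_time_provider((mochi_time_provider){ fixed_now_ns, &test_instant_ns });
}
```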

## 7. emit and back-pressure

`emit stream { ... }` synchronously inserts into the stream's ring. If
the ring is full, the caller blocks (this is the v1 default; it
preserves the "no dropped events" property). A `@lossy` annotation
on the stream, not available in v1, changes this to drop-oldest.

## 8. Stream from query

`from p in prices ... select ...` against a stream produces another
stream, not a list, when the result type is `stream<T>`. The
implementation:

```c
mochi_stream__Out *out = mochi_stream__Out_new(16);
mochi_stream__Price_subscribe(prices, out, fwd_cb);
```

where `fwd_cb` runs the predicates and projections and emits to
`out`. This is the "compose streams" pattern.

`limit n` on a stream completes the result stream after n items and
unsubscribes from the source.
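The forwarding callback for a query such as `from p in prices where p.price > 200 select p.price` with `limit 2` can be sketched against a toy synchronous stream whose subscriptions can be deactivated. Here the `env` pointer carries a small state record (output stream, remaining count, subscription handle) rather than `out` alone, since `limit` needs all three. `fwd_state`, `price_stream`, `price_subscribe`, `price_unsubscribe`, `out_stream`, and the deactivate-by-flag scheme are hypothetical.

```c
/* Sketch only: toy synchronous streams; all names are hypothetical. */
#include <stdbool.h>
#include <stddef.h>

struct pkg_Price { const char *symbol; double price; };

typedef struct sub {
    void *env;
    void (*cb)(void *env, struct pkg_Price p);
    bool  active;
} sub;

typedef struct { sub subs[8]; size_t nsubs; } price_stream;

sub *price_subscribe(price_stream *s, void *env,
                     void (*cb)(void *, struct pkg_Price)) {
    if (s->nsubs == 8) return NULL;
    sub *h = &s->subs[s->nsubs++];
    h->env = env;
    h->cb = cb;
    h->active = true;
    return h;
}

void price_unsubscribe(sub *h) { h->active = false; }

void price_emit(price_stream *s, struct pkg_Price p) {
    for (size_t i = 0; i < s->nsubs; i++)
        if (s->subs[i].active) s->subs[i].cb(s->subs[i].env, p);
}

/* Toy output stream of projected values. */
typedef struct { double items[16]; size_t len; bool closed; } out_stream;

/* Per-query state passed as the subscription's env. */
typedef struct {
    out_stream *out;
    size_t      remaining;   /* items still allowed by `limit n` */
    sub        *handle;      /* lets the callback unsubscribe itself */
} fwd_state;

/* from p in prices where p.price > 200 select p.price, limit n */
void fwd_cb(void *env, struct pkg_Price p) {
    fwd_state *st = env;
    if (st->remaining == 0) return;
    if (!(p.price > 200)) return;                  /* where */
    if (st->out->len < 16)
        st->out->items[st->out->len++] = p.price;  /* select, then emit */
    if (--st->remaining == 0) {
        st->out->closed = true;                    /* complete the result */
        price_unsubscribe(st->handle);             /* detach from the source */
    }
}
```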

## 9. Channels

The language docs surface channels indirectly through the
`stream` API. The runtime exposes channels too:

```c
mochi_chan__T *mochi_chan__T_new(size_t cap);
bool           mochi_chan__T_send(mochi_chan__T *c, T v);   // blocks if full
bool           mochi_chan__T_recv(mochi_chan__T *c, T *v);  // blocks if empty
void           mochi_chan__T_close(mochi_chan__T *c);
```

The difference from streams: a channel is point-to-point (each message
goes to exactly one consumer), while a stream broadcasts every message
to all subscribers. Internally, a stream is a list of channels plus a
fan-out fiber.
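The channel API can be sketched for `T = long`, assuming the usual close semantics, which this note does not spell out: `send` returns `false` once the channel is closed, and `recv` keeps returning buffered values after close and returns `false` only when the channel is closed and empty. The capacity is fixed at compile time rather than passed to `_new`, and `mochi_chan__long`, `mochi_chan__long_init`, `consumer` and `consume` are hypothetical.

```c
/* Sketch only: hypothetical instantiation of mochi_chan__T for long. */
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

#define CHAN_CAP 8

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  not_empty, not_full;
    long            buf[CHAN_CAP];
    size_t          head, len;
    bool            closed;
} mochi_chan__long;

void mochi_chan__long_init(mochi_chan__long *c) {
    pthread_mutex_init(&c->lock, NULL);
    pthread_cond_init(&c->not_empty, NULL);
    pthread_cond_init(&c->not_full, NULL);
    c->head = c->len = 0;
    c->closed = false;
}

bool mochi_chan__long_send(mochi_chan__long *c, long v) {   /* blocks if full */
    pthread_mutex_lock(&c->lock);
    while (c->len == CHAN_CAP && !c->closed)
        pthread_cond_wait(&c->not_full, &c->lock);
    bool ok = !c->closed;
    if (ok) {
        c->buf[(c->head + c->len) % CHAN_CAP] = v;
        c->len++;
        pthread_cond_signal(&c->not_empty);   /* wake one consumer */
    }
    pthread_mutex_unlock(&c->lock);
    return ok;
}

bool mochi_chan__long_recv(mochi_chan__long *c, long *v) {  /* blocks if empty */
    pthread_mutex_lock(&c->lock);
    while (c->len == 0 && !c->closed)
        pthread_cond_wait(&c->not_empty, &c->lock);
    bool ok = c->len > 0;                      /* false: closed and drained */
    if (ok) {
        *v = c->buf[c->head];
        c->head = (c->head + 1) % CHAN_CAP;
        c->len--;
        pthread_cond_signal(&c->not_full);
    }
    pthread_mutex_unlock(&c->lock);
    return ok;
}

void mochi_chan__long_close(mochi_chan__long *c) {
    pthread_mutex_lock(&c->lock);
    c->closed = true;
    pthread_cond_broadcast(&c->not_empty);
    pthread_cond_broadcast(&c->not_full);
    pthread_mutex_unlock(&c->lock);
}

/* Example consumer: sums whatever it receives until close. */
typedef struct { mochi_chan__long *c; long sum, count; } consumer;

void *consume(void *arg) {
    consumer *k = arg;
    long v;
    while (mochi_chan__long_recv(k->c, &v)) { k->sum += v; k->count++; }
    return NULL;
}
```

With two consumers on one channel, every value is received by exactly one of them, which is the point-to-point property that distinguishes a channel from a broadcasting stream.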

## 10. Shutdown

On `main`'s return:

1. Close all streams (`mochi_stream__T_close`) so subscribers see
   end-of-stream.
2. Join all agent fibers (`mochi_mailbox_close` then wait).
3. Drain pending I/O.
4. Run any `defer` blocks in reverse order.
5. Free GC roots and exit.

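Step ordering is documented in the MEP body so user code can rely on
it. In particular, output `print`ed by agent shutdown handlers is
flushed, because pending I/O is drained (step 3) only after the agent
fibers have been joined (step 2).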
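The shutdown sequence can be sketched as a fixed call order plus a LIFO registry for step 4. Every name here (`mochi_defer`, `mochi_runtime_shutdown`, the step stubs, and the `trace` log that makes the order observable) is hypothetical; the stubs only record that they ran.

```c
/* Sketch only: step functions are stubs; all names are hypothetical. */
#include <stddef.h>

/* Records the order in which things ran. */
static const char *trace[16];
static size_t ntrace;
static void log_step(const char *s) { if (ntrace < 16) trace[ntrace++] = s; }

/* Stubs standing in for the real runtime calls. */
static void close_all_streams(void) { log_step("close streams"); }
static void join_agent_fibers(void) { log_step("join agents"); }
static void drain_pending_io(void)  { log_step("drain io"); }
static void free_gc_roots(void)     { log_step("free roots"); }

/* defer registry: callbacks run in reverse order of registration. */
typedef struct { void (*fn)(void *); void *arg; } deferred;
static deferred defers[32];
static size_t ndefers;

int mochi_defer(void (*fn)(void *), void *arg) {
    if (ndefers == 32) return -1;
    defers[ndefers].fn = fn;
    defers[ndefers].arg = arg;
    ndefers++;
    return 0;
}

static void run_defers(void) {
    while (ndefers > 0) {
        deferred d = defers[--ndefers];
        d.fn(d.arg);
    }
}

void mochi_runtime_shutdown(void) {
    close_all_streams();   /* 1. subscribers see end-of-stream          */
    join_agent_fibers();   /* 2. close mailboxes, wait for the loops     */
    drain_pending_io();    /* 3. flushes output from agent shutdown code */
    run_defers();          /* 4. defer blocks, last registered first     */
    free_gc_roots();       /* 5. then exit                               */
}

static void log_defer(void *arg) { log_step((const char *)arg); }
```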

## 11. Open questions

1. Whether `emit` is synchronous or asynchronous from the caller's
   perspective when the ring has space. (Spec is ambiguous; v1 will be
   sync.)
2. Whether agent state can be inspected from outside (atomic snapshot)
   or only via intent methods.
3. Whether `on STREAM` inside agents and at module top-level have
   the same execution-thread semantics.
4. Whether stream subscriptions are weak (auto-unsubscribe on
   subscriber GC) or strong (manual `unsubscribe`).
5. Whether the scheduler should preempt CPU-bound fibers (Go-style
   signal preemption) in v1. Current answer: no.

