run() is where your model’s logic lives. It’s an async method that the runtime calls once after load() and that runs for the lifetime of the model. You have full control over what happens inside it.

Basic pattern

async def run(self):
    while True:
        # ✅ Block until a client connects
        await self.connected.wait()
        while self.connected.is_set():
            frame = self.pipe.forward(prompt=self.prompt)
            # ✅ Send the frame to the client
            await self.emit(MyOutput(main_video=frame))
Each iteration of the inner loop:
  1. Runs your forward pass.
  2. Calls emit() to send the output to the client.
  3. Checks self.connected.is_set() to know whether the client is still there.
If you don’t override run(), the default blocks forever. This is useful for purely event-driven models where all logic lives in @event handlers.
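As a sketch using only plain asyncio (not the real runtime), the default run() amounts to parking on an event that is never set, so the model stays alive while handlers do all the work:

```python
import asyncio

async def default_run():
    # Stand-in for the default run(): park forever on an event
    # nobody ever sets, keeping the coroutine alive indefinitely.
    await asyncio.Event().wait()

async def main():
    # Demonstrate that default_run() never returns on its own:
    # after 50 ms it is still parked, so wait_for times out.
    try:
        await asyncio.wait_for(default_run(), timeout=0.05)
    except asyncio.TimeoutError:
        return "still-parked"

result = asyncio.run(main())
print(result)  # still-parked
```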

Connection signal

self.connected is an asyncio.Event that the runtime manages. It is set when a client connects and cleared when they disconnect.
async def run(self):
    while True:
        await self.connected.wait()
        # client is connected, start producing
        while self.connected.is_set():
            ...
The runtime sets and clears this event internally. You never need to manage it yourself. It is set before your @connected handler fires and cleared before your @disconnected handler fires.
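The interaction can be simulated with plain asyncio. The `connected` event and `producer` coroutine below are stand-ins for self.connected and run(); the body of main() plays the role of the runtime, which normally sets and clears the event for you:

```python
import asyncio

async def producer(connected, produced):
    # Mimics one connect/disconnect cycle of the run() pattern:
    # block until a "client" connects, then produce until it leaves.
    await connected.wait()
    while connected.is_set():
        produced.append("frame")
        await asyncio.sleep(0)  # yield so the "runtime" can clear the event

async def main():
    connected = asyncio.Event()
    produced = []
    task = asyncio.create_task(producer(connected, produced))

    await asyncio.sleep(0.01)  # producer is parked in connected.wait()
    assert not produced        # nothing emitted before the connection

    connected.set()            # "client connects" — producer wakes up
    await asyncio.sleep(0.01)  # producer loops, appending frames
    connected.clear()          # "client disconnects"
    await task                 # inner loop exits cleanly
    return len(produced)

count = asyncio.run(main())
```

Note that the producer only produces between set() and clear(); before the connection it sits idle in wait(), which is exactly the behavior the outer loop in run() relies on.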

Emitting frames

Call emit() to send an Output instance to the client:
await self.emit(MyOutput(main_video=frame))
The output buffer controls the emission rate based on fps.

Batches

emit() accepts both single frames (H, W, 3) and batches (N, H, W, 3). When you pass a batch, the runtime splits it into individual frames and emits them at the target rate. Always pass the full batch in a single emit() call:
frames = self.pipe.forward(prompt=self.prompt)
print(frames.shape)  # (10, 720, 1280, 3)
# ✅ Emit the full batch — the runtime splits it
await self.emit(MyOutput(main_video=frames))
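Conceptually, the runtime’s batch handling looks like the sketch below. `emit_batch`, the queue, and the `interval` parameter are illustrative stand-ins (interval would be 1 / fps in the real runtime), not the actual API:

```python
import asyncio

async def emit_batch(queue, frames, interval):
    # Sketch of what the runtime does with a batch: split it into
    # individual frames and enqueue them one at a time, pacing each
    # frame by the target emission interval.
    for frame in frames:
        await queue.put(frame)
        await asyncio.sleep(interval)

async def main():
    queue = asyncio.Queue()
    # Ten string labels stand in for a (10, 720, 1280, 3) array.
    batch = [f"frame-{i}" for i in range(10)]
    await emit_batch(queue, batch, interval=0)
    return [queue.get_nowait() for _ in range(queue.qsize())]

out = asyncio.run(main())
print(len(out), out[0])  # 10 frame-0
```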

Blocking

If the output queue is full, emit() awaits until a slot opens. This is the natural throttle point: a model that produces faster than the emission rate simply pauses here:
await self.emit(MyOutput(main_video=frame))
Pass drop=True to discard the frame instead of blocking:
await self.emit(MyOutput(main_video=frame), drop=True)
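The two behaviors can be sketched with a bounded asyncio.Queue. The `emit` helper below is a simplified stand-in for the real method, returning whether the frame was accepted:

```python
import asyncio

async def emit(queue, frame, drop=False):
    # Sketch of emit()'s two behaviors on a full output queue:
    # either block until a slot opens, or discard when drop=True.
    if drop:
        try:
            queue.put_nowait(frame)
            return True        # frame accepted
        except asyncio.QueueFull:
            return False       # frame discarded
    await queue.put(frame)     # awaits while the queue is full
    return True

async def main():
    queue = asyncio.Queue(maxsize=2)
    # With drop=True, the first two frames fit; the rest are dropped
    # because nothing is draining the queue.
    return [await emit(queue, i, drop=True) for i in range(4)]

results = asyncio.run(main())
print(results)  # [True, True, False, False]
```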

Adaptive frame rate

Report how long inference took so the runtime adjusts the emission rate to match your model’s throughput:
t0 = time.perf_counter()
frames = self.pipe.forward()
compute_time = time.perf_counter() - t0
# ✅ Pass compute_time so the runtime adapts the frame rate
await self.emit(MyOutput(main_video=frames), compute_time=compute_time)
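One way to reason about the adaptation: if inference takes compute_time seconds to produce a batch, the model can sustain at most batch_size / compute_time fps, and the runtime cannot usefully emit faster than that. `adapted_fps` below is a hypothetical helper illustrating that arithmetic, not part of the API:

```python
def adapted_fps(target_fps, compute_time, batch_size=1):
    # The model produces batch_size frames every compute_time seconds,
    # so its sustainable output rate is batch_size / compute_time.
    # Emitting at the lower of the two rates keeps the buffer from
    # draining mid-stream.
    achievable = batch_size / compute_time
    return min(target_fps, achievable)

print(adapted_fps(30, compute_time=0.5, batch_size=10))  # 20.0 — model-bound
print(adapted_fps(30, compute_time=0.2, batch_size=10))  # 30  — target-bound
```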

FPS and buffer size

Set these as class attributes on your ReactorModel subclass:
class MyModel(ReactorModel):
    fps = 24
    buffer_size = 4

    async def run(self):
        ...
fps sets the target output frame rate (default 30). buffer_size sets the output queue depth (default 4). Higher values smooth out jitter but add latency. If your model emits frames in batches, set buffer_size to at least double the batch size:
class MyModel(ReactorModel):
    fps = 30
    buffer_size = 20  # model emits ~10 frames at a time

    async def run(self):
        ...
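The latency cost of a deeper buffer is easy to quantify: a full queue adds roughly buffer_size / fps seconds before a newly produced frame reaches the client, since each queued frame waits its turn at the emission rate. `buffer_latency` below is a hypothetical helper for that arithmetic:

```python
def buffer_latency(fps, buffer_size):
    # Worst-case added latency of a full output buffer: buffer_size
    # frames ahead of yours, each taking 1 / fps seconds to emit.
    return buffer_size / fps

print(buffer_latency(30, 4))   # ~0.13 s with the defaults
print(buffer_latency(30, 20))  # ~0.67 s for a 10-frame batch model
```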

Dynamic FPS

Change the frame rate at any point during the model’s lifecycle:
self.output_buffer.set_fps(new_fps)
This takes effect immediately on the emission thread.

Sending messages

Send structured data to the client from inside run():
await self.send(Progress(step=self._step, total=100))
See Events & Messages for how to define message types.

Concurrency

@event handlers fire concurrently with run() at any await point. If you read an instance attribute, do an await, and read it again, a handler may have changed it in between. Snapshot values into local variables before expensive operations:
async def run(self):
    while True:
        await self.connected.wait()
        while self.connected.is_set():
            # ✅ Snapshot before the forward pass — handlers can fire at awaits
            prompt = self.prompt
            step = self._step
            frame = self.pipe.forward(prompt=prompt, step=step)
            await self.emit(MyOutput(main_video=frame))
            self._step += 1
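The race is easy to reproduce with plain asyncio. `Model` below is a self-contained stand-in (not a ReactorModel), and `handler` plays the role of an @event handler firing while run() is suspended at an await:

```python
import asyncio

class Model:
    def __init__(self):
        self.prompt = "a cat"
        self._step = 0

    async def handler(self):
        # Stands in for an @event handler mutating state mid-iteration.
        self.prompt = "a dog"

    async def run_once(self):
        # ✅ Snapshot before awaiting: the local variables stay
        # consistent even if a handler mutates self.prompt while
        # this coroutine is suspended.
        prompt = self.prompt
        step = self._step
        await asyncio.sleep(0)          # handlers can run here
        return prompt, step, self.prompt

async def main():
    m = Model()
    snap = asyncio.create_task(m.run_once())
    await asyncio.sleep(0)  # let run_once start and take its snapshot
    await m.handler()       # mutate while run_once is suspended
    return await snap

prompt, step, live = asyncio.run(main())
print(prompt, live)  # a cat a dog — the snapshot kept the old value
```

Reading self.prompt directly after the await would have returned "a dog", silently mixing two generations of state within one forward pass.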

Next

Managing State

Instance attributes, explicit events, and concurrency.

Events & Messages

Custom events, lifecycle hooks, and outbound messages.