ReactorModel gives you full control over the execution loop. You handle client connections, manage state, emit frames, and clean up between sessions yourself.

The full model

Here’s a complete runnable ReactorModel:
model.py
from dataclasses import dataclass

from reactor_runtime.interface import Output, Video, ReactorModel, event, connected, disconnected, InputField

@dataclass
class MyOutput(Output):
    main_video: Video

class MyModel(ReactorModel):

    def load(self, config):
        self.pipe = load_checkpoint(config["checkpoint"])

    @connected
    async def on_connect(self):
        self.prompt = "a sunny meadow"
        self._step = 0

    @disconnected
    async def on_disconnect(self):
        self.output_buffer.flush()

    @event(name="set_prompt", description="Change the scene prompt")
    def set_prompt(self, prompt: str = InputField(default="")):
        self.prompt = prompt

    async def run(self):
        while True:
            await self.connected.wait()
            while self.connected.is_set():
                frame = self.pipe.forward(prompt=self.prompt, step=self._step)
                await self.emit(MyOutput(main_video=frame))
                self._step += 1
Let’s break it down.

Output tracks

@dataclass
class MyOutput(Output):
    main_video: Video
This declares what media the model sends to clients. Each field is a track: a named media channel. Video means the field carries video frames. You can have multiple tracks:
@dataclass
class MyOutput(Output):
    main_video: Video
    main_audio: Audio
The field names (main_video, main_audio) become the track identifiers that clients use.
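Because the output type is a plain dataclass, the track identifiers are just its field names. A minimal standalone sketch of that mapping, using stub classes in place of the real `Output`, `Video`, and `Audio` from `reactor_runtime.interface`:

```python
from dataclasses import dataclass, fields

# Stand-in stubs for illustration only; the real Output, Video, and Audio
# come from reactor_runtime.interface.
class Output: ...
class Video: ...
class Audio: ...

@dataclass
class MyOutput(Output):
    main_video: Video
    main_audio: Audio

# The field names are the track identifiers clients subscribe to.
track_names = [f.name for f in fields(MyOutput)]
print(track_names)  # ['main_video', 'main_audio']
```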

The model class

class MyModel(ReactorModel):
Subclass ReactorModel and override the methods you need. The runtime provides emit(), send(), self.connected, and the event dispatcher.

Loading weights

def load(self, config):
    self.pipe = load_checkpoint(config["checkpoint"])
Called once at startup, before any client can connect. The config dict comes from config.yml. This is where you load checkpoints, allocate GPU memory, and set up your pipeline.
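For the load() above, config.yml would need at least a checkpoint key. A hypothetical example (the path and any other keys are up to you):

```yaml
# config.yml — only "checkpoint" is read by the load() above
checkpoint: checkpoints/model.safetensors
```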

The run loop

async def run(self):
    while True:
        await self.connected.wait()
        while self.connected.is_set():
            frame = self.pipe.forward(prompt=self.prompt, step=self._step)
            await self.emit(MyOutput(main_video=frame))
            self._step += 1
This is the heart of the model. It’s an async method that runs for the lifetime of the model:
  1. await self.connected.wait() blocks until a client connects.
  2. The inner loop produces frames and calls emit() to send them to the client.
  3. When self.connected is cleared (client disconnects), the inner loop exits and the outer loop waits for the next client.
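Under the hood, this gating pattern is an asyncio.Event. Here's a minimal standalone sketch of the same wait/check shape, with a list append standing in for pipe.forward() and emit(), and a step cap standing in for the client disconnecting:

```python
import asyncio

async def run_loop(connected: asyncio.Event, frames: list, max_steps: int = 3):
    # Outer phase: block until a client connects.
    await connected.wait()
    step = 0
    # Inner phase: produce frames while the client is connected.
    while connected.is_set() and step < max_steps:
        frames.append(step)   # stands in for pipe.forward() + emit()
        step += 1
        await asyncio.sleep(0)  # yield so a disconnect can be observed

async def main():
    connected = asyncio.Event()
    frames = []
    connected.set()  # simulate a client connecting
    await run_loop(connected, frames)
    return frames

print(asyncio.run(main()))  # [0, 1, 2]
```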
The next page covers run() in detail: emitting frames, batches, backpressure, and adaptive frame rates.

Events

@event(name="set_prompt", description="Change the scene prompt")
def set_prompt(self, prompt: str = InputField(default="")):
    self.prompt = prompt
Every event handler is explicit. You write an @event for each parameter the client can change.
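The mechanics are simple: the decorator registers the handler under its name, and InputField supplies the default when the client omits the parameter. A standalone sketch with stand-in implementations (the real @event and InputField come from reactor_runtime.interface; the set_steps handler is a hypothetical second event):

```python
# Stand-ins for illustration only — not the real reactor_runtime API.
def event(name, description):
    def wrap(fn):
        fn._event_name = name
        fn._event_description = description
        return fn
    return wrap

def InputField(default=None):
    return default

class MyModel:
    @event(name="set_steps", description="Change the number of denoising steps")
    def set_steps(self, steps: int = InputField(default=20)):
        self.steps = steps

m = MyModel()
m.set_steps()           # client sent no value: the default applies
print(m.steps)          # 20
m.set_steps(steps=50)   # client sent an explicit value
print(m.steps)          # 50
```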

Lifecycle hooks

@connected
async def on_connect(self):
    self.prompt = "a sunny meadow"
    self._step = 0

@disconnected
async def on_disconnect(self):
    self.output_buffer.flush()
@connected runs when a client connects. Use it to initialize per-session state. @disconnected runs when they leave. Use it to clean up buffers and reset resources.
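Resetting in @connected matters because the model instance outlives sessions: whatever one client left behind is visible to the next. A minimal sketch of the leak the hook prevents:

```python
class Session:
    # Plays the role of the @connected hook above.
    def on_connect(self):
        self.prompt = "a sunny meadow"  # fresh per-session state
        self.step = 0

m = Session()
m.on_connect()
m.prompt = "a stormy coast"  # first client changes the prompt
m.step = 412

m.on_connect()               # second client connects: state is reset
print(m.prompt, m.step)      # a sunny meadow 0
```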

The manifest

reactor.yaml
model: model:MyModel
name: my-model
config: config.yml
  • model: import path in module:ClassName format.
  • name: human-readable identifier, used in logging and routing.
  • config: path to the model config YAML.

Next

The Run Loop

Emitting frames, batches, backpressure, and adaptive frame rates.

Managing State

Instance attributes, explicit events, and concurrency.