You define every event handler explicitly. Each @event becomes a command the client can send.

Custom event handlers

Use @event to define actions the client can trigger. The method’s signature becomes the event schema.
from reactor_runtime.interface import event

@event(name="get_current_prompt", description="Log the current prompt")
async def get_current_prompt(self):
    print(f"Current prompt: {self.prompt}")
The client sends {"type": "get_current_prompt", "data": {}} and the runtime calls your method. This works, but printing to stdout isn’t useful for the client. We’ll see how to send data back in Outbound messages. Events can also take parameters:
from reactor_runtime.interface import InputField, event

@event(name="set_style", description="Change the rendering style")
def set_style(self, style: str = InputField(default="realistic")):
    self._style_embedding = self.encoder.encode(style)
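Following the same wire format as above, the client triggers this event with the parameter in the data object. A minimal sketch of the payload (the "anime" value is illustrative):

```python
import json

# Hypothetical client-side payload for the set_style event, following the
# {"type": ..., "data": ...} envelope shown above.
message = {"type": "set_style", "data": {"style": "anime"}}
payload = json.dumps(message)
print(payload)  # {"type": "set_style", "data": {"style": "anime"}}
```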

Deduplicating expensive handlers

Pass dedupe=True to collapse duplicate events that queue up while the model is busy. Only the latest payload is processed; earlier ones are discarded.
@event(
    name="set_reference_image",
    description="Upload a reference image",
    dedupe=True,
)
async def set_reference_image(self, image: str = ""):
    self._ref_embedding = self.encoder.encode(image)
Use this for handlers that do heavy compute (forward passes, encoding) where only the latest value matters.
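The dedupe semantics can be pictured with a plain-Python sketch (this is not the runtime's actual queue implementation): while the model is busy, queued events collapse so that only the most recent payload per event name survives.

```python
from collections import OrderedDict

def collapse_queue(events):
    """Keep only the latest payload per event name, preserving the arrival
    order of the surviving events. A sketch of dedupe=True semantics only."""
    latest = OrderedDict()
    for name, payload in events:
        latest.pop(name, None)  # discard the stale payload, if any
        latest[name] = payload  # re-insert with the newest payload
    return list(latest.items())

queued = [
    ("set_reference_image", "img_v1"),
    ("set_prompt", "a meadow"),
    ("set_reference_image", "img_v2"),  # supersedes img_v1
]
print(collapse_queue(queued))
# [('set_prompt', 'a meadow'), ('set_reference_image', 'img_v2')]
```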

Lifecycle hooks

@connected

Runs once when a client connects, before run() resumes from await self.connected.wait().
from reactor_runtime.interface import connected

@connected
async def on_connect(self):
    self.prompt = "a sunny meadow"
    self._step = 0
Use it to initialize per-session state. The handler can be async def or plain def. Only one @connected handler is allowed per model class.

@disconnected

Runs once when a client disconnects.
from reactor_runtime.interface import disconnected

@disconnected
async def on_disconnect(self):
    self.output_buffer.flush()
Use it to clean up session resources and flush buffers. The handler can be async def or plain def. Only one @disconnected handler is allowed per model class.

Outbound messages

Earlier we printed the prompt to stdout. What if we want to send it to the client instead? Define a ModelMessage subclass:
from dataclasses import dataclass
from reactor_runtime.interface import ModelMessage

@dataclass
class CurrentPrompt(ModelMessage):
    prompt: str
The client receives this as JSON: {"type": "current_prompt", "data": {"prompt": "..."}}. The type on the wire is the snake_case version of the class name. Now replace the print with self.send():
@event(name="get_current_prompt", description="Return the current prompt")
async def get_current_prompt(self):
    await self.send(CurrentPrompt(prompt=self.prompt))
You can also send messages from inside run():
@dataclass
class Progress(ModelMessage):
    step: int
    total: int

async def run(self):
    while True:
        await self.connected.wait()
        while self.connected.is_set():
            frame = self.pipe.forward(prompt=self.prompt, step=self._step)
            # ✅ Send a message to the client
            await self.send(Progress(step=self._step, total=100))
            await self.emit(MyOutput(main_video=frame))
            self._step += 1

File uploads

When your model needs binary files from the client (reference images, style assets, etc.), type an @event parameter as UploadedFile. The runtime handles the upload and delivers the file bytes to your handler automatically. UploadedFile has four fields: name, mime_type, size, and data (the raw bytes). You can use them to filter by type, decode images, or pass the content to your pipeline:
import io

from PIL import Image
from reactor_runtime.interface import UploadedFile

@event(name="set_reference_image", description="Upload and encode reference image")
def set_reference_image(self, reference_image: UploadedFile):
    self._reference_image = reference_image
    pil_image = Image.open(io.BytesIO(reference_image.data)).convert("RGB")
    self._ref_embedding = self._vae_encode(pil_image)
If you need a generic catch-all for any file the client uploads, use the @file_uploaded decorator instead. It fires once per upload regardless of which command triggered it:
import io

from PIL import Image
from reactor_runtime.interface import UploadedFile, file_uploaded

@file_uploaded
async def on_file(self, uploaded_file: UploadedFile):
    if uploaded_file.mime_type.startswith("image/"):
        pil_image = Image.open(io.BytesIO(uploaded_file.data)).convert("RGB")
        self._reference = self._vae_encode(pil_image)
The handler must accept exactly one parameter named uploaded_file. Only one @file_uploaded handler is allowed per model class.
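The four documented fields are enough to route uploads before doing any expensive decoding. A self-contained sketch using a stand-in dataclass (the real UploadedFile comes from reactor_runtime.interface; the size cap here is an arbitrary illustration):

```python
from dataclasses import dataclass

@dataclass
class FakeUpload:
    """Stand-in mirroring UploadedFile's four documented fields."""
    name: str
    mime_type: str
    size: int
    data: bytes

def accept(upload: FakeUpload, max_bytes: int = 10 * 1024 * 1024) -> bool:
    """Accept only images under a size cap before decoding them."""
    return upload.mime_type.startswith("image/") and upload.size <= max_bytes

ref = FakeUpload(name="ref.png", mime_type="image/png", size=2048, data=b"\x89PNG")
print(accept(ref))  # True
```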

Putting it together

A model that combines everything from this page: explicit events, lifecycle hooks, and outbound messages.
@dataclass
class CurrentPrompt(ModelMessage):
    prompt: str

@dataclass
class Progress(ModelMessage):
    step: int
    total: int

class MyModel(ReactorModel):
    fps = 24

    def load(self, config):
        self.pipe = load_checkpoint(config["checkpoint"])

    @connected
    async def on_connect(self):
        self.prompt = "a sunny meadow"
        self._step = 0

    @disconnected
    async def on_disconnect(self):
        self.output_buffer.flush()

    @event(name="set_prompt", description="Change the scene prompt")
    def set_prompt(self, prompt: str = InputField(default="")):
        self.prompt = prompt

    @event(name="get_current_prompt", description="Return the current prompt")
    async def get_current_prompt(self):
        await self.send(CurrentPrompt(prompt=self.prompt))

    async def run(self):
        while True:
            await self.connected.wait()
            while self.connected.is_set():
                prompt = self.prompt
                frame = self.pipe.forward(prompt=prompt, step=self._step)
                await self.send(Progress(step=self._step, total=100))
                await self.emit(MyOutput(main_video=frame))
                self._step += 1
The client can:
  • Send set_prompt to change the prompt.
  • Send get_current_prompt and receive a current_prompt message back.
  • Receive progress messages each frame.
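From the client's side, the whole round trip uses only the JSON envelopes documented above. A sketch of the payloads involved (transport details such as the connection itself are omitted, and the prompt value is illustrative):

```python
import json

# Commands the client sends, per the documented {"type", "data"} envelope.
set_prompt = json.dumps({"type": "set_prompt", "data": {"prompt": "a snowy peak"}})
get_prompt = json.dumps({"type": "get_current_prompt", "data": {}})

# Message the client would then receive back (values illustrative).
reply = json.loads('{"type": "current_prompt", "data": {"prompt": "a snowy peak"}}')
print(reply["data"]["prompt"])  # a snowy peak
```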

Next

Video Input

Read webcam frames with manual buffer management.

Model Anatomy

Back to the ReactorModel overview.