Custom event handlers, lifecycle hooks, and outbound messages.
Beyond auto-generated set_<field> events, you can define your own events, send messages back to the client, and run logic when clients connect or disconnect.
Use @event to define actions the client can trigger that aren’t tied to a single state field. The method’s signature becomes the event schema.
```python
from reactor_runtime.interface import event

@event(name="get_current_prompt", description="Log the current prompt")
async def get_current_prompt(self):
    print(f"Current prompt: {self.state.prompt}")
```
The client sends {"type": "get_current_prompt", "data": {}} and the runtime calls your method. Events can also take parameters: because the method's signature is the event schema, the method's parameters become the fields of the event's data payload.
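A self-contained sketch of an event with parameters. The `event` stub below is a stand-in for `reactor_runtime.interface.event` (illustration only), and it is assumed the runtime passes the message's `data` fields to the method as keyword arguments; the event name `add_note` is hypothetical:

```python
import asyncio

# Stand-in for reactor_runtime.interface.event (illustration only):
# the real decorator registers the handler and derives the event
# schema from the method's signature.
def event(name, description):
    def wrap(fn):
        fn.event_name, fn.event_description = name, description
        return fn
    return wrap

class MyModel:
    def __init__(self):
        self.notes = []

    # The parameters become the fields of the event's "data" payload,
    # so the client would send:
    #   {"type": "add_note", "data": {"text": "warmer light", "priority": 2}}
    @event(name="add_note", description="Attach a note with a priority")
    async def add_note(self, text: str, priority: int = 0):
        self.notes.append((priority, text))

model = MyModel()
asyncio.run(model.add_note(text="warmer light", priority=2))
```

Parameters with defaults (like `priority` here) would naturally map to optional fields in the payload.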
Earlier we printed the prompt to stdout. What if we want to send it to the client instead? Define a ModelMessage subclass:
from dataclasses import dataclassfrom reactor_runtime.interface import ModelMessage@dataclassclass CurrentPrompt(ModelMessage): prompt: str
Now replace the print with self.send():
```python
@event(name="get_current_prompt", description="Return the current prompt")
async def get_current_prompt(self):
    await self.send(CurrentPrompt(prompt=self.state.prompt))
```
The client receives this as JSON: {"type": "current_prompt", "data": {"prompt": "..."}}. The type on the wire is the snake_case version of the class name. You can also send messages from inside inference(). If you need to, make it async:
```python
@dataclass
class Progress(ModelMessage):
    step: int
    total: int

async def inference(self):
    step = 0
    while True:
        frame = self.pipe.forward(prompt=self.state.prompt, step=step)
        # ✅ Send a message to the client
        await self.send(Progress(step=step, total=100))
        step += 1
        yield MyOutput(main_video=frame)
```
Note that inference() is async def here instead of def because we need await to send messages. Both sync and async generators work; use whichever fits your model.
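The snake_case conversion of the class name mentioned above can be sketched with a simple capital-boundary rule — an assumption for illustration; the runtime's exact algorithm may differ (e.g. for acronyms):

```python
import re

def wire_type(cls_name: str) -> str:
    # Insert an underscore before each interior capital, then lowercase:
    # "CurrentPrompt" -> "current_prompt", "Progress" -> "progress"
    return re.sub(r"(?<!^)(?=[A-Z])", "_", cls_name).lower()
```

So `CurrentPrompt` arrives as type "current_prompt" and `Progress` as "progress".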
When your model needs binary files from the client (reference images, style assets, etc.), declare an UploadedFile-typed field on your InputState. The runtime generates a set_<field> event and handles the upload automatically.

UploadedFile has four fields: name, mime_type, size, and data (the raw bytes). You can use them to filter by type, decode images, or pass the content to your pipeline.
```python
from dataclasses import dataclass
from typing import Any

from reactor_runtime.interface import InputState, InputField, UploadedFile

@dataclass
class MyState(InputState):
    prompt: str = InputField(default="a sunny meadow")
    reference_image: UploadedFile = InputField(
        default=None,
        description="Reference image for conditioning",
    )
    _img_embedding: Any = None
```
To run expensive post-processing when a file arrives, override the auto-generated event with a custom @event handler, typing the parameter as UploadedFile.
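A self-contained sketch of such an override. The `event` stub and `UploadedFile` dataclass below are stand-ins for the real `reactor_runtime.interface` types, and the handler name `set_reference_image` is inferred from the auto-generated `set_<field>` convention — treat these details as assumptions:

```python
import asyncio
from dataclasses import dataclass

# Stand-ins for reactor_runtime.interface (illustration only)
def event(name, description):
    def wrap(fn):
        return fn
    return wrap

@dataclass
class UploadedFile:
    name: str
    mime_type: str
    size: int
    data: bytes

class MyModel:
    @event(name="set_reference_image", description="Set the reference image")
    async def set_reference_image(self, file: UploadedFile):
        # Cheap validation first: reject non-image uploads
        if not file.mime_type.startswith("image/"):
            raise ValueError(f"expected an image, got {file.mime_type}")
        # Expensive post-processing runs here, once per upload --
        # e.g. decoding file.data and computing an embedding.
        # A placeholder computation stands in for the real work:
        self._img_embedding = file.size

model = MyModel()
png = UploadedFile(name="ref.png", mime_type="image/png", size=4, data=b"\x89PNG")
asyncio.run(model.set_reference_image(png))
```

Doing the heavy work in the upload handler (rather than per-frame in inference()) means it runs once per file instead of once per frame.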
A model that combines everything from this page: custom events, lifecycle hooks, and outbound messages.

First, the types — state, events, and messages:
```python
@dataclass
class MyState(InputState):
    prompt: str = InputField(default="a sunny meadow")

# Sent back to the client when they ask for the current prompt
@dataclass
class CurrentPrompt(ModelMessage):
    prompt: str

# Sent every frame so the client can show a progress indicator
@dataclass
class Progress(ModelMessage):
    step: int
    total: int
```
Then the model class:
```python
class MyModel(ReactorPipeline):
    state: MyState

    @event(name="get_current_prompt", description="Return the current prompt")
    async def get_current_prompt(self):
        await self.send(CurrentPrompt(prompt=self.state.prompt))

    async def inference(self):
        step = 0
        while True:
            frame = self.pipe.forward(prompt=self.state.prompt, step=step)
            await self.send(Progress(step=step, total=100))
            step += 1
            yield MyOutput(main_video=frame)
```
The client can:

- Send set_prompt (auto-generated from InputState).
- Send get_current_prompt (custom event) and receive a current_prompt message back.
- Receive a progress message every frame while inference() is running.
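Putting the wire formats from this page together, a single round trip for the custom event looks like this (the direction arrows are illustrative, not part of the protocol):

```
client → server: {"type": "get_current_prompt", "data": {}}
server → client: {"type": "current_prompt", "data": {"prompt": "a sunny meadow"}}
```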