Many models need input from the client: a webcam feed for style transfer, a camera stream for a world model, audio for speech processing. Declare an Input subclass, annotate it on the model, and read frames. You manage buffer cleanup between sessions yourself. Video frames are np.ndarray with shape (height, width, 3), dtype uint8, in RGB channel order.
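Since frames are plain NumPy arrays, you can inspect and manipulate them with ordinary array operations. A minimal sketch with a synthetic frame standing in for one delivered by the runtime (NumPy assumed; not framework code):

```python
import numpy as np

# Synthetic stand-in for a frame delivered by the runtime:
# shape (height, width, 3), dtype uint8, RGB channel order
frame = np.zeros((480, 640, 3), dtype=np.uint8)
frame[..., 0] = 255  # fill the red channel (index 0 in RGB order)

height, width, channels = frame.shape
print(height, width, channels)  # → 480 640 3
```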

Declaring input tracks

from dataclasses import dataclass
from reactor_runtime.interface import Input, Video

@dataclass
class MyInput(Input):
    webcam: Video
Each field is a named media channel. Annotate it on your model class to wire it up:
class MyModel(ReactorModel):
    input: MyInput
    fps = 24
When a client connects and sends their webcam stream, frames arrive in the webcam buffer automatically. You access them via self.input.webcam.

Reading frames

Both methods return the most recent frames and discard any older backlog, so your model always processes the freshest input.
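The freshest-frames semantics can be modelled with a plain `deque`. This is an illustrative toy, not the runtime's actual implementation:

```python
from collections import deque

class LatestNBuffer:
    """Toy model of the read semantics: return the newest n frames,
    discard everything older."""
    def __init__(self):
        self._frames = deque()

    def push(self, frame):
        self._frames.append(frame)

    def try_read(self, n=1):
        if len(self._frames) < n:
            return None                       # buffer left untouched
        latest = list(self._frames)[-n:]      # newest n frames
        self._frames.clear()                  # older backlog is dropped
        return latest

buf = LatestNBuffer()
for i in range(5):
    buf.push(i)
print(buf.try_read(2))  # → [3, 4]: newest two; frames 0, 1, 2 are discarded
print(buf.try_read())   # → None: buffer is now empty
```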

try_read() (non-blocking)

Returns a list of the most recent n frames, or None if fewer than n are available. When None is returned the buffer is left untouched.
import asyncio

async def run(self):
    while True:
        await self.connected.wait()
        while self.connected.is_set():
            # ✅ Non-blocking read — returns None if no frames yet
            frames = self.input.webcam.try_read()
            if frames is not None:
                result = self.pipe.forward(frames[0])
                await self.emit(MyOutput(main_video=result))
            else:
                # Yield briefly so the event loop can deliver new frames
                await asyncio.sleep(0.01)
By default try_read() reads one frame (n=1). Pass n to read multiple frames at once (e.g. self.input.webcam.try_read(n=4)). The return value is always a list when frames are available, even for n=1.
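When a multi-frame read succeeds, the list can be stacked into a single batch array. A sketch with synthetic frames standing in for the real return value of `self.input.webcam.try_read(n=4)` (NumPy assumed):

```python
import numpy as np

# Stand-in for: frames = self.input.webcam.try_read(n=4)
frames = [np.zeros((480, 640, 3), dtype=np.uint8) for _ in range(4)]

if frames is not None:  # try_read() returns None when fewer than n frames exist
    batch = np.stack(frames)  # shape (4, 480, 640, 3)
    print(batch.shape)        # → (4, 480, 640, 3)
```

Whether your pipeline wants a stacked array or the raw list depends on the model; the runtime only guarantees the list.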

read(n) (async)

Waits until n frames are available, then returns the most recent n as a list.
async def run(self):
    while True:
        await self.connected.wait()
        try:
            while self.connected.is_set():
                # ✅ Async read — blocks until a frame arrives
                [frame] = await self.input.webcam.read(1)
                result = self.pipe.forward(frame)
                await self.emit(MyOutput(main_video=result))
        except BufferClosed:
            pass
read(n) waits until enough frames have arrived, then resumes. Pass a larger n for models that need multiple frames per forward pass (e.g. await self.input.webcam.read(4)).

BufferClosed

ReactorModel does not catch BufferClosed for you. If the client disconnects while read() is waiting, read() raises BufferClosed. Wrap the inner loop in a try/except:
from reactor_runtime.interface import BufferClosed

async def run(self):
    while True:
        await self.connected.wait()
        try:
            while self.connected.is_set():
                [frame] = await self.input.webcam.read(1)
                result = self.pipe.forward(frame)
                await self.emit(MyOutput(main_video=result))
        # ✅ Catch BufferClosed — client disconnected while read() was waiting
        except BufferClosed:
            pass
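The wait-then-raise behaviour of read() can be modelled with a toy asyncio buffer. This is an illustrative sketch of the semantics described above (including the BufferClosed stand-in class), not the runtime's implementation:

```python
import asyncio

class BufferClosed(Exception):
    """Toy stand-in for reactor_runtime.interface.BufferClosed."""

class AsyncFrameBuffer:
    """Toy model of read(n): wait until n frames exist, return the newest n,
    raise BufferClosed if the buffer is closed while waiting."""
    def __init__(self):
        self._frames = []
        self._event = asyncio.Event()
        self._closed = False

    def push(self, frame):
        self._frames.append(frame)
        self._event.set()

    def close(self):
        self._closed = True
        self._event.set()  # wake any blocked read()

    async def read(self, n=1):
        while len(self._frames) < n:
            if self._closed:
                raise BufferClosed
            self._event.clear()
            await self._event.wait()
        latest, self._frames = self._frames[-n:], []
        return latest

async def demo():
    buf = AsyncFrameBuffer()
    for i in range(6):
        buf.push(i)
    print(await buf.read(4))  # → [2, 3, 4, 5]: newest four, backlog dropped
    buf.close()
    try:
        await buf.read(1)
    except BufferClosed:
        print("buffer closed")  # read() was woken by close() and raised

asyncio.run(demo())
```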

Buffer cleanup

Input buffers are not reset automatically between sessions. You need to close them on disconnect and reset them on connect. Close buffers in @disconnected to signal end-of-input and wake any blocked read() calls:
@disconnected
async def on_disconnect(self):
    self.input.webcam.close()
    self.output_buffer.flush()
Reset them in @connected to re-open for the new session:
@connected
async def on_connect(self):
    self.input.webcam.reset()
If you don’t close, the next client’s read() calls may see leftover frames. If you don’t reset, read() will raise BufferClosed for the new client.

Full example: video-to-video

from dataclasses import dataclass
from reactor_runtime.interface import (
    Input, Output, Video, ReactorModel, InputField,
    event, connected, disconnected, BufferClosed,
)

@dataclass
class V2VInput(Input):
    camera: Video

@dataclass
class V2VOutput(Output):
    main_video: Video

class V2VModel(ReactorModel):
    input: V2VInput
    fps = 30

    def load(self, config):
        self.pipe = load_style_model(config["checkpoint"])

    @connected
    async def on_connect(self):
        self.input.camera.reset()
        self.style = "none"

    @disconnected
    async def on_disconnect(self):
        self.input.camera.close()
        self.output_buffer.flush()

    @event(name="set_style", description="Change the style filter")
    def set_style(self, style: str = InputField(
        default="none", choices=["none", "oil_paint", "sketch"]
    )):
        self.style = style

    async def run(self):
        while True:
            await self.connected.wait()
            try:
                while self.connected.is_set():
                    [frame] = await self.input.camera.read(1)
                    result = self.pipe.apply(frame, style=self.style)
                    await self.emit(V2VOutput(main_video=result))
            except BufferClosed:
                pass

Next

Audio Output

Add audio output tracks to your model.

The Run Loop

Emitting frames, batches, backpressure, and adaptive frame rates.