Skip to content

Speech-to-text

In this guide you’ll wire a Python sidecar that subscribes to a Smelter input’s audio side channel, runs faster-whisper speech-to-text, and posts subtitle updates back to the composed WHEP output.

The full sidecar lives in one transcribe.py file built up across the steps below.

  1. Start the Smelter server (learn more) with SMELTER_SIDE_CHANNEL_SOCKET_DIR set to a directory the sidecar can also access, and install the sidecar’s dependencies. The input→output delay is configured per input via side_channel.delay_ms in step 3 (set to 5000 ms below). That delay gives the sidecar time to buffer ~3 s of audio, run Whisper, and schedule each subtitle at the segment’s own pts, so the text lines up with the spoken words on the delayed output.

    pip install smelter-sdk faster-whisper
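  2. Define subtitle_scene. It builds the scene shown on the output: the input video wrapped in a rescaler, plus a caption bar containing the subtitle text whenever that text is non-empty.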

    transcribe.py
    INPUT_ID = "input"


    def subtitle_scene(subtitle: str) -> dict:
        """Return the root scene: the input video plus an optional caption bar.

        The input stream, wrapped in a rescaler, is always the first child.
        When ``subtitle`` is non-empty a caption bar holding the text is added
        as a second child; for an empty string the bar is left out entirely.
        """
        video = {
            "type": "rescaler",
            "child": {"type": "input_stream", "input_id": INPUT_ID},
        }
        if not subtitle:
            return {"type": "view", "children": [video]}

        caption = {
            "type": "text",
            "text": subtitle,
            "width": 1680,
            "font_size": 40,
            "color": "#FFFFFFFF",
            "align": "center",
        }
        bar = {
            "type": "view",
            "bottom": 40, "left": 80, "width": 1760, "height": 120,
            "background_color": "#000000EE",
            "padding_horizontal": 40,
            "direction": "column",
            # NOTE(review): the empty views on either side of the text
            # presumably act as spacers that centre it vertically - confirm.
            "children": [{"type": "view"}, caption, {"type": "view"}],
        }
        return {"type": "view", "children": [video, bar]}
  3. Define register_pipeline, which:

    • registers a WHIP input with the audio side channel enabled,
    • registers a WHEP output whose initial scene is subtitle_scene(""),
    • starts the pipeline.
    transcribe.py
    import json
    import urllib.request
    # (29 lines shown collapsed on the page: the unchanged code from step 2, repeated below)
    INPUT_ID = "input"


    def subtitle_scene(subtitle: str) -> dict:
        """Return the root scene: the input video plus an optional caption bar.

        The input stream, wrapped in a rescaler, is always the first child.
        When ``subtitle`` is non-empty a caption bar holding the text is added
        as a second child; for an empty string the bar is left out entirely.
        """
        video = {
            "type": "rescaler",
            "child": {"type": "input_stream", "input_id": INPUT_ID},
        }
        if not subtitle:
            return {"type": "view", "children": [video]}

        caption = {
            "type": "text",
            "text": subtitle,
            "width": 1680,
            "font_size": 40,
            "color": "#FFFFFFFF",
            "align": "center",
        }
        bar = {
            "type": "view",
            "bottom": 40, "left": 80, "width": 1760, "height": 120,
            "background_color": "#000000EE",
            "padding_horizontal": 40,
            "direction": "column",
            # NOTE(review): the empty views on either side of the text
            # presumably act as spacers that centre it vertically - confirm.
            "children": [{"type": "view"}, caption, {"type": "view"}],
        }
        return {"type": "view", "children": [video, bar]}
    SMELTER_API = "http://127.0.0.1:8081"
    OUTPUT_ID = "output"


    def api_post(path: str, body: dict | None = None):
        """POST ``body`` as JSON to the Smelter HTTP API and return the raw response bytes.

        ``path`` is appended to ``SMELTER_API``. A missing or empty ``body`` is
        sent as an empty JSON object.
        """
        payload = json.dumps(body if body else {}).encode()
        request = urllib.request.Request(
            SMELTER_API + path,
            data=payload,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(request) as response:
            return response.read()
    def register_pipeline():
        """Register the WHIP input and the WHEP output, then start the pipeline."""
        # The input exposes its audio on the side channel and is delayed by
        # 5000 ms on its way to the output.
        whip_input = {
            "type": "whip_server",
            "bearer_token": "example",
            "side_channel": {"audio": True, "delay_ms": 5000},
        }
        api_post(f"/api/input/{INPUT_ID}/register", whip_input)

        # The output starts with no caption bar: subtitle_scene("") is the
        # input video on its own.
        whep_output = {
            "type": "whep_server",
            "bearer_token": "example",
            "video": {
                "resolution": {"width": 1920, "height": 1080},
                "encoder": {"type": "ffmpeg_h264", "preset": "ultrafast"},
                "initial": {"root": subtitle_scene("")},
            },
            "audio": {
                "encoder": {"type": "opus"},
                "initial": {"inputs": [{"input_id": INPUT_ID}]},
            },
        }
        api_post(f"/api/output/{OUTPUT_ID}/register", whep_output)

        api_post("/api/start")

    The initial scene is just the input (the bar is omitted while the text is empty); each update call swaps in a freshly recognised line.

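  4. Subscribe to the audio side channel on one thread and run Whisper on another. The reader thread buffers the audio into chunks of about 3 s and hands them over through a queue; the main thread transcribes each chunk and posts one scheduled scene update per recognised segment.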

    transcribe.py
    import queue
    import threading
    import numpy as np
    from faster_whisper import WhisperModel
    from smelter import subscribe_audio_channel
    # (69 lines shown collapsed on the page: the unchanged code from steps 2 and 3, repeated below)
    INPUT_ID = "input"


    def subtitle_scene(subtitle: str) -> dict:
        """Return the root scene: the input video plus an optional caption bar.

        The input stream, wrapped in a rescaler, is always the first child.
        When ``subtitle`` is non-empty a caption bar holding the text is added
        as a second child; for an empty string the bar is left out entirely.
        """
        video = {
            "type": "rescaler",
            "child": {"type": "input_stream", "input_id": INPUT_ID},
        }
        if not subtitle:
            return {"type": "view", "children": [video]}

        caption = {
            "type": "text",
            "text": subtitle,
            "width": 1680,
            "font_size": 40,
            "color": "#FFFFFFFF",
            "align": "center",
        }
        bar = {
            "type": "view",
            "bottom": 40, "left": 80, "width": 1760, "height": 120,
            "background_color": "#000000EE",
            "padding_horizontal": 40,
            "direction": "column",
            # NOTE(review): the empty views on either side of the text
            # presumably act as spacers that centre it vertically - confirm.
            "children": [{"type": "view"}, caption, {"type": "view"}],
        }
        return {"type": "view", "children": [video, bar]}
    import json
    import urllib.request
    SMELTER_API = "http://127.0.0.1:8081"
    OUTPUT_ID = "output"


    def api_post(path: str, body: dict | None = None):
        """POST ``body`` as JSON to the Smelter HTTP API and return the raw response bytes.

        ``path`` is appended to ``SMELTER_API``. A missing or empty ``body`` is
        sent as an empty JSON object.
        """
        payload = json.dumps(body if body else {}).encode()
        request = urllib.request.Request(
            SMELTER_API + path,
            data=payload,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(request) as response:
            return response.read()
    def register_pipeline():
        """Register the WHIP input and the WHEP output, then start the pipeline."""
        # The input exposes its audio on the side channel and is delayed by
        # 5000 ms on its way to the output.
        whip_input = {
            "type": "whip_server",
            "bearer_token": "example",
            "side_channel": {"audio": True, "delay_ms": 5000},
        }
        api_post(f"/api/input/{INPUT_ID}/register", whip_input)

        # The output starts with no caption bar: subtitle_scene("") is the
        # input video on its own.
        whep_output = {
            "type": "whep_server",
            "bearer_token": "example",
            "video": {
                "resolution": {"width": 1920, "height": 1080},
                "encoder": {"type": "ffmpeg_h264", "preset": "ultrafast"},
                "initial": {"root": subtitle_scene("")},
            },
            "audio": {
                "encoder": {"type": "opus"},
                "initial": {"inputs": [{"input_id": INPUT_ID}]},
            },
        }
        api_post(f"/api/output/{OUTPUT_ID}/register", whep_output)

        api_post("/api/start")
    WHISPER_SAMPLE_RATE = 16000
    CHUNK_DURATION_MS = 3000


    def main():
        """Register the pipeline, then turn the input's audio into scheduled subtitles.

        A daemon thread reads audio batches from the input's side channel,
        converts them to mono at ``WHISPER_SAMPLE_RATE`` and groups them into
        chunks of at least ``CHUNK_DURATION_MS``. The calling thread runs
        Whisper on each chunk and posts one scene update per non-empty segment,
        scheduled at that segment's pts. The loop never returns on its own.
        """
        register_pipeline()
        model = WhisperModel("base", compute_type="int8")
        chunks: queue.Queue[tuple[np.ndarray, int]] = queue.Queue()
        # Number of 16 kHz samples that make up one chunk; constant, so it is
        # computed once instead of on every batch.
        chunk_samples = WHISPER_SAMPLE_RATE * CHUNK_DURATION_MS // 1000

        def reader():
            # Batches are collected in a list and joined once per chunk, so the
            # growing buffer is not re-copied for every incoming batch.
            parts: list[np.ndarray] = []
            buffered = 0
            chunk_start_pts_nanos: int | None = None
            for batch in subscribe_audio_channel(INPUT_ID):
                # The pts of a chunk is the pts of the first batch that went into it.
                if chunk_start_pts_nanos is None:
                    chunk_start_pts_nanos = batch.start_pts_nanos
                samples = batch.to_mono()
                if len(samples) == 0:
                    # np.interp raises ValueError on an empty sample array, which
                    # would kill this thread and leave the main loop waiting forever.
                    continue
                if batch.sample_rate != WHISPER_SAMPLE_RATE:
                    # Linear-interpolation resample of this batch to 16 kHz.
                    ratio = WHISPER_SAMPLE_RATE / batch.sample_rate
                    target = int(len(samples) * ratio)
                    idx = np.linspace(0, len(samples) - 1, target)
                    samples = np.interp(idx, np.arange(len(samples)), samples).astype(np.float32)
                parts.append(samples)
                buffered += len(samples)
                if buffered >= chunk_samples:
                    # The empty float32 seed keeps the joined array's dtype the
                    # same as when the buffer itself started out as float32.
                    chunk = np.concatenate([np.empty(0, dtype=np.float32), *parts])
                    chunks.put((chunk, chunk_start_pts_nanos))
                    parts = []
                    buffered = 0
                    chunk_start_pts_nanos = None

        threading.Thread(target=reader, daemon=True).start()

        while True:
            chunk, chunk_start_pts_nanos = chunks.get()
            segments, _ = model.transcribe(chunk, language="en")
            for segment in segments:
                text = segment.text.strip()
                if not text:
                    continue
                # segment.start is treated as seconds from the start of the chunk:
                # convert to nanoseconds, add the chunk's pts, then express in ms.
                start_pts_ms = (chunk_start_pts_nanos + int(segment.start * 1e9)) / 1e6
                api_post(f"/api/output/{OUTPUT_ID}/update", {
                    "video": {"root": subtitle_scene(text)},
                    "audio": {"inputs": [{"input_id": INPUT_ID}]},
                    "schedule_time_ms": start_pts_ms,
                })


    if __name__ == "__main__":
        main()

    Run it with python transcribe.py.

  5. Stream a test source and watch the result with Smelter’s hosted browser tools (no install required):

    Each subtitle is scheduled at its segment’s pts via schedule_time_ms, so it appears in sync with the spoken words on the delayed output.