
    Use Cases

    Practical examples of building with the Scriptivox API.


    Folder Watcher

    A self-contained script that watches a folder for new audio/video files, automatically transcribes them via the Scriptivox API, and saves the results as text files. No manual steps — just drop files in and get transcripts out.

    What it does

    • Scans input/ for new audio or video files every 10 seconds (configurable)
    • Uploads multiple files at the same time (10 by default, configurable)
    • Starts transcription (optionally with speaker diarization and word-level timestamps)
    • Receives results via a local webhook server
    • Saves transcriptions as .txt files in output/ with the same filename
    • Moves source files to done/ on success or failed/ on failure
    • Persists job state to state.json — survives script restarts
    • Falls back to polling GET /v1/transcribe/{id} if a webhook is missed (e.g. after a restart)
    • Client-side rate limiting — self-throttles to stay within your plan's limits (never hits 429)
    • Round-robin polling — all pending jobs get checked, even with thousands in-flight
    • Designed for sustained high volume (10,000+ files/day)

    Prerequisites

    • Python 3.8+ (or Node.js 18+ for the JavaScript version)
    • A Scriptivox API key with balance — create one here
    • ngrok (free tier works) to expose your local webhook server

    Setup

    1

    Install dependencies

    Python:

    bash
    pip install requests

    Node.js: No dependencies needed — uses built-in fetch and http (Node 18+).

    2

    Expose your local webhook

    The script runs a webhook server on port 8765. Use ngrok to make it accessible from the internet so Scriptivox can deliver results:

    bash
    ngrok http 8765

    Copy the HTTPS forwarding URL (e.g. https://abc123.ngrok.io). You'll use this in the next step.

    3

    Run the script

    Set your API key and webhook URL, then start:

    bash
    export SCRIPTIVOX_API_KEY="sk_live_YOUR_KEY"
    export WEBHOOK_URL="https://abc123.ngrok.io/webhook"
    python transcribe_watcher.py

    Or for Node.js:

    bash
    SCRIPTIVOX_API_KEY=sk_live_YOUR_KEY \
    WEBHOOK_URL=https://abc123.ngrok.io/webhook \
    node transcribe_watcher.mjs

    4

    Drop files and get transcripts

    Put any audio or video file in the input/ folder. The script picks it up on the next scan, uploads it, and transcribes it. Files are moved automatically once processed:

    project/
      input/                 <- drop files here
      output/
        meeting.txt          <- transcripts appear here
        podcast.txt
      done/
        meeting.mp3          <- source files after success
        podcast.wav
      failed/
        corrupted.mp3        <- source files after failure
      state.json             <- tracks pending jobs (survives restarts)
      transcribe_watcher.py
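To smoke-test the pipeline without real recordings, you can generate a short silent WAV and drop it into input/. This sketch uses only Python's stdlib wave module; a silent clip exercises the upload and transcribe flow end to end, though its transcript will naturally be empty.

```python
import wave
from pathlib import Path

def make_silent_wav(path, seconds=1, rate=16000):
    """Write `seconds` of 16-bit mono silence to `path`."""
    Path(path).parent.mkdir(parents=True, exist_ok=True)
    with wave.open(str(path), "wb") as w:
        w.setnchannels(1)   # mono
        w.setsampwidth(2)   # 16-bit samples
        w.setframerate(rate)
        # One 2-byte zero sample per frame, `rate` frames per second
        w.writeframes(b"\x00\x00" * rate * seconds)

if __name__ == "__main__":
    make_silent_wav("input/smoke_test.wav")
    print("wrote input/smoke_test.wav")
```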

    The script

    python
    #!/usr/bin/env python3
    """
    Scriptivox Folder Watcher

    Watches a folder for new audio/video files, transcribes them,
    and saves results as text files.

    Handles high-volume workloads (10,000+ files/day) with:
    - Concurrent uploads (configurable worker count)
    - Client-side rate limiting (never hits 429)
    - Separate polling thread (doesn't block submissions)
    - Round-robin polling (all jobs get checked)
    - Restart resilience via state.json

    Folder structure:
        input/   <- drop audio/video files here
        output/  <- transcripts saved here as .txt
        done/    <- source files moved here after success
        failed/  <- source files moved here after failure

    Usage:
        export SCRIPTIVOX_API_KEY="sk_live_YOUR_KEY"
        export WEBHOOK_URL="https://abc123.ngrok.io/webhook"
        python transcribe_watcher.py
    """
    import os
    import sys
    import time
    import json
    import hashlib
    import hmac
    import shutil
    import threading

    import requests

    from concurrent.futures import ThreadPoolExecutor
    from http.server import HTTPServer, BaseHTTPRequestHandler
    from pathlib import Path
    from collections import deque

    # ─── Configuration ───────────────────────────────────────
    API_KEY = os.environ.get("SCRIPTIVOX_API_KEY", "")
    BASE_URL = "https://api.scriptivox.com/v1"

    INPUT_DIR = Path("input")
    OUTPUT_DIR = Path("output")
    DONE_DIR = Path("done")
    FAILED_DIR = Path("failed")
    STATE_FILE = Path("state.json")

    SCAN_INTERVAL = 10   # seconds between folder scans
    POLL_INTERVAL = 30   # seconds between poll cycles
    WEBHOOK_PORT = 8765
    WEBHOOK_URL = os.environ.get(
        "WEBHOOK_URL", f"http://localhost:{WEBHOOK_PORT}/webhook"
    )

    DIARIZE = False  # set True to enable speaker diarization
    ALIGN = False    # set True to enable word-level timestamps

    MAX_PARALLEL_UPLOADS = 10  # how many files to upload at the same time

    # Rate limits (requests per minute) — set these to match your plan.
    # Keep slightly below actual limits to leave headroom.
    RATE_LIMITS = {
        "upload": 50,      # POST /v1/upload
        "transcribe": 50,  # POST /v1/transcribe
        "poll": 150,       # GET /v1/transcribe/{id}
    }

    SUPPORTED = {
        ".mp3", ".wav", ".m4a", ".aac", ".ogg", ".flac", ".opus",
        ".wma", ".aiff", ".caf", ".mp4", ".mov", ".avi", ".mkv",
        ".webm", ".wmv", ".flv", ".m4v", ".3gp", ".mpeg", ".mts",
        ".ogv", ".ts", ".vob", ".f4v",
    }

    # RLock so save_state() can run while state_lock is already held
    state_lock = threading.RLock()
    pending = {}      # transcription_id -> filename
    poll_offset = 0   # tracks where we left off polling


    # ─── Rate Limiter ────────────────────────────────────────
    class RateLimiter:
        """Sliding-window rate limiter. Blocks until a request is allowed."""

        def __init__(self, max_per_minute):
            self.max_per_minute = max_per_minute
            self.timestamps = deque()
            self.lock = threading.Lock()

        def wait(self):
            """Block until we can make a request within the rate limit."""
            with self.lock:
                now = time.time()
                # Remove timestamps older than 60s
                while self.timestamps and now - self.timestamps[0] > 60:
                    self.timestamps.popleft()
                if len(self.timestamps) >= self.max_per_minute:
                    wait_time = 60 - (now - self.timestamps[0]) + 0.1
                    self.lock.release()
                    try:
                        time.sleep(wait_time)
                    finally:
                        self.lock.acquire()
                    now = time.time()
                    while self.timestamps and now - self.timestamps[0] > 60:
                        self.timestamps.popleft()
                self.timestamps.append(time.time())


    limiters = {
        name: RateLimiter(rpm) for name, rpm in RATE_LIMITS.items()
    }


    # ─── State Persistence ───────────────────────────────────
    def load_state():
        """Load pending jobs from disk."""
        if STATE_FILE.exists():
            data = json.loads(STATE_FILE.read_text())
            return data.get("pending", {})
        return {}


    def save_state():
        """Save pending jobs to disk."""
        with state_lock:
            STATE_FILE.write_text(json.dumps({"pending": pending}, indent=2))


    # ─── Helpers ─────────────────────────────────────────────
    def unique_output_name(filename, prefix=""):
        """Generate a unique output filename, appending a number if needed."""
        stem = Path(filename).stem
        name = f"{prefix}{stem}.txt"
        out = OUTPUT_DIR / name
        if not out.exists():
            return out
        # Append incrementing number to avoid collision
        n = 2
        while True:
            name = f"{prefix}{stem}_{n}.txt"
            out = OUTPUT_DIR / name
            if not out.exists():
                return out
            n += 1


    def already_processed(filename):
        """Check if output already has a result for this file."""
        stem = Path(filename).stem
        return (
            (OUTPUT_DIR / f"{stem}.txt").exists() or
            (OUTPUT_DIR / f"failed_{stem}.txt").exists()
        )


    def write_failure(filename, error_code, error_message):
        """Write a failure log to output/."""
        out = unique_output_name(filename, prefix="failed_")
        out.write_text(
            f"File: {filename}\nError: {error_code}: {error_message}\n",
            encoding="utf-8",
        )
        return out


    # ─── API Helper ──────────────────────────────────────────
    def api_request(method, url, limiter_name, max_retries=5, **kwargs):
        """Make a rate-limited API request with retry on 429."""
        limiter = limiters.get(limiter_name)
        r = None
        for attempt in range(max_retries):
            if limiter:
                limiter.wait()
            r = requests.request(method, url, **kwargs)
            if r.status_code == 429:
                retry_after = int(r.headers.get("Retry-After", 10))
                print(f"  Rate limited (429), waiting {retry_after}s "
                      f"(attempt {attempt + 1}/{max_retries})")
                time.sleep(retry_after)
                continue
            return r
        return r  # return last response even if still 429


    # ─── Webhook Server ──────────────────────────────────────
    class WebhookHandler(BaseHTTPRequestHandler):
        def do_POST(self):
            length = int(self.headers.get("Content-Length", 0))
            body = self.rfile.read(length)

            # Verify HMAC signature
            sig = self.headers.get("X-Scriptivox-Signature", "")
            ts = self.headers.get("X-Scriptivox-Timestamp", "")
            if sig and ts:
                secret = hashlib.sha256(API_KEY.encode()).hexdigest()
                expected = hmac.new(
                    secret.encode(),
                    f"{ts}.{body.decode()}".encode(),
                    hashlib.sha256,
                ).hexdigest()
                if not hmac.compare_digest(expected, sig):
                    print("  [webhook] Rejected — invalid signature")
                    self.send_response(401)
                    self.end_headers()
                    return

            data = json.loads(body)
            event = data.get("event", "unknown")
            txn_id = data.get("transcription_id", "?")

            with state_lock:
                if event == "transcription.completed" and txn_id in pending:
                    name = pending.pop(txn_id)
                    text = data["result"]["full_transcript"]
                    out = unique_output_name(name)
                    out.write_text(text, encoding="utf-8")
                    src = INPUT_DIR / name
                    if src.exists():
                        shutil.move(str(src), str(DONE_DIR / name))
                    save_state()
                    print(f"  [webhook] completed {name}")
                    print(f"    transcript -> {out}")
                elif event == "transcription.failed" and txn_id in pending:
                    name = pending.pop(txn_id)
                    err = data.get("error", {})
                    code = err.get("code", "UNKNOWN")
                    msg = err.get("message", "unknown error")
                    write_failure(name, code, msg)
                    src = INPUT_DIR / name
                    if src.exists():
                        shutil.move(str(src), str(FAILED_DIR / name))
                    save_state()
                    print(f"  [webhook] failed {name}")
                    print(f"    error {code}: {msg}")
                else:
                    print(f"  [webhook] {event} txn={txn_id}")

            self.send_response(200)
            self.end_headers()

        def log_message(self, *_):
            pass


    # ─── Poll Pending Jobs (separate thread) ─────────────────
    def poll_loop():
        """Continuously polls pending jobs in a round-robin fashion.

        Runs in its own thread so it never blocks file submissions."""
        global poll_offset
        while True:
            time.sleep(POLL_INTERVAL)
            with state_lock:
                items = list(pending.items())
            if not items:
                continue
            # Round-robin: start from where we left off last time
            if poll_offset >= len(items):
                poll_offset = 0
            # Rotate the list so we start from the offset
            rotated = items[poll_offset:] + items[:poll_offset]
            polled = 0
            for txn_id, name in rotated:
                try:
                    r = api_request(
                        "GET", f"{BASE_URL}/transcribe/{txn_id}",
                        limiter_name="poll",
                        headers={"Authorization": API_KEY},
                    )
                    if r.status_code == 429:
                        break
                    if r.status_code != 200:
                        polled += 1
                        continue
                    data = r.json()
                    status = data.get("status")
                    if status == "completed":
                        text = data.get("result", {}).get("full_transcript", "")
                        out = unique_output_name(name)
                        out.write_text(text, encoding="utf-8")
                        src = INPUT_DIR / name
                        if src.exists():
                            shutil.move(str(src), str(DONE_DIR / name))
                        with state_lock:
                            pending.pop(txn_id, None)
                            save_state()
                        print(f"  [poll] completed {name}")
                    elif status == "failed":
                        err = data.get("error", {})
                        code = err.get("code", "UNKNOWN")
                        msg = err.get("message", "unknown error")
                        write_failure(name, code, msg)
                        src = INPUT_DIR / name
                        if src.exists():
                            shutil.move(str(src), str(FAILED_DIR / name))
                        with state_lock:
                            pending.pop(txn_id, None)
                            save_state()
                        print(f"  [poll] failed {name} — {code}: {msg}")
                    polled += 1
                except Exception:
                    polled += 1
            poll_offset = (poll_offset + polled) % max(len(items), 1)


    # ─── Upload & Transcribe ─────────────────────────────────
    def process_file(filepath):
        name = filepath.name
        if already_processed(name):
            return
        with state_lock:
            if name in pending.values():
                return
        print(f"  Upload {name}")
        try:
            # 1. Get presigned upload URL
            r = api_request(
                "POST", f"{BASE_URL}/upload",
                limiter_name="upload",
                headers={"Authorization": API_KEY, "Content-Type": "application/json"},
                json={"filename": name},
            )
            r.raise_for_status()
            upload = r.json()

            # 2. Upload file (streams from disk — no full-file memory load)
            content_type = upload["headers"]["Content-Type"]
            with open(filepath, "rb") as f:
                requests.put(
                    upload["upload_url"],
                    headers={"Content-Type": content_type},
                    data=f,
                ).raise_for_status()

            # 3. Start transcription
            body = {
                "upload_id": upload["upload_id"],
                "webhook_url": WEBHOOK_URL,
            }
            if DIARIZE:
                body["diarize"] = True
            if ALIGN:
                body["align"] = True
            r = api_request(
                "POST", f"{BASE_URL}/transcribe",
                limiter_name="transcribe",
                headers={"Authorization": API_KEY, "Content-Type": "application/json"},
                json=body,
            )
            r.raise_for_status()
            job = r.json()

            with state_lock:
                pending[job["id"]] = name
                save_state()
            print(f"  Queued {name} (ID: {job['id']})")
        except requests.HTTPError as e:
            try:
                err_body = e.response.json() if e.response is not None else {}
            except ValueError:
                err_body = {}
            code = err_body.get("error", {}).get("code", "HTTP_ERROR")
            msg = err_body.get("error", {}).get("message", str(e))
            print(f"  Error {name}: {msg}")
            fail_out = write_failure(name, code, msg)
            if filepath.exists():
                shutil.move(str(filepath), str(FAILED_DIR / name))
            print(f"    log -> {fail_out}")
        except Exception as e:
            print(f"  Error {name}: {e}")
            fail_out = write_failure(name, "SCRIPT_ERROR", str(e))
            if filepath.exists():
                shutil.move(str(filepath), str(FAILED_DIR / name))
            print(f"    log -> {fail_out}")


    # ─── Main Loop ───────────────────────────────────────────
    def main():
        if not API_KEY:
            print("Set SCRIPTIVOX_API_KEY environment variable")
            sys.exit(1)
        for d in [INPUT_DIR, OUTPUT_DIR, DONE_DIR, FAILED_DIR]:
            d.mkdir(exist_ok=True)

        pending.update(load_state())
        if pending:
            print(f"Resumed {len(pending)} pending job(s) from state.json")

        # Start webhook server in background
        server = HTTPServer(("0.0.0.0", WEBHOOK_PORT), WebhookHandler)
        threading.Thread(target=server.serve_forever, daemon=True).start()

        # Start polling thread in background
        threading.Thread(target=poll_loop, daemon=True).start()

        print(f"Webhook server on port {WEBHOOK_PORT}")
        print(f"Watching    {INPUT_DIR.resolve()}")
        print(f"Output      {OUTPUT_DIR.resolve()}")
        print(f"Parallel    {MAX_PARALLEL_UPLOADS} uploads at a time")
        print(f"Rate limits upload={RATE_LIMITS['upload']}/min "
              f"transcribe={RATE_LIMITS['transcribe']}/min "
              f"poll={RATE_LIMITS['poll']}/min")
        print(f"Diarize     {DIARIZE}")
        print(f"Align       {ALIGN}")
        print(f"Scan every  {SCAN_INTERVAL}s")
        print(f"Poll every  {POLL_INTERVAL}s\n")

        with ThreadPoolExecutor(max_workers=MAX_PARALLEL_UPLOADS) as pool:
            while True:
                files = sorted(
                    f for f in INPUT_DIR.iterdir()
                    if f.suffix.lower() in SUPPORTED
                )
                if files:
                    # Submit all new files to the worker pool
                    futures = [pool.submit(process_file, f) for f in files]
                    # Wait for this batch to finish before next scan
                    for fut in futures:
                        try:
                            fut.result()
                        except Exception as e:
                            print(f"  Worker error: {e}")
                time.sleep(SCAN_INTERVAL)


    if __name__ == "__main__":
        main()

    How it works

    1. Startup — Creates input/, output/, done/, and failed/ folders. Loads any pending jobs from state.json (so it picks up where it left off after a restart). Starts a webhook server on port 8765 in a background thread. Starts a separate polling thread for webhook fallback.
    2. Scan loop — Every 10 seconds, scans input/ for files with supported extensions. New files are submitted to a concurrent worker pool (10 workers by default) for parallel upload and transcription.
    3. Rate limiting — Before each API call, the client-side rate limiter checks whether the request fits within the configured limit. If at capacity, it waits automatically. This avoids 429 errors proactively rather than reacting to them after the fact. Set the limits slightly below your plan's actual limits.
    4. Upload — Each worker requests a presigned upload URL via POST /v1/upload, then streams the file with a PUT request using the exact Content-Type returned by the API. Files are streamed from disk (no full-file memory load). If the API returns 429 despite client-side limiting, the script waits for the Retry-After duration and retries (up to 5 attempts).
    5. Transcribe — Starts a transcription job via POST /v1/transcribe with the upload_id and webhook_url. The API returns immediately with status: "created" — the file is validated and transcribed in the background. Saves the job to state.json so it persists.
    6. Webhook — When Scriptivox finishes transcribing, it POSTs the result to your webhook. The handler verifies the HMAC signature, saves the transcript to output/<filename>.txt, and moves the source file to done/. All webhook events are logged.
    7. Polling fallback — A separate thread polls pending jobs every 30 seconds using round-robin ordering, so all jobs eventually get checked — even with thousands in-flight. This catches results if a webhook is missed (e.g. after a restart or network blip).
    8. Failure — If a transcription fails, the webhook (or poll) handler logs the error, moves the source file to failed/, and removes the job from state. If the upload itself fails (HTTP error, rate limit exhausted), the file is also moved to failed/. Either way, the watcher continues processing other files.
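The signature check from step 6 can be reproduced standalone. This sketch mirrors the scheme the script implements (signing secret = SHA-256 hex digest of the API key, signed message = "{timestamp}.{body}"); confirm the exact scheme against the Webhooks guide before relying on it.

```python
import hashlib
import hmac

def verify_webhook(api_key, timestamp, body, signature):
    """Return True if `signature` matches HMAC-SHA256 over '{timestamp}.{body}',
    keyed by the SHA-256 hex digest of the API key (as in the watcher script)."""
    secret = hashlib.sha256(api_key.encode()).hexdigest()
    expected = hmac.new(
        secret.encode(),
        f"{timestamp}.{body}".encode(),
        hashlib.sha256,
    ).hexdigest()
    # compare_digest avoids timing side channels on the comparison
    return hmac.compare_digest(expected, signature)

# Round-trip: sign the way the server would, then verify.
key, ts, body = "sk_live_example", "1700000000", '{"event":"transcription.completed"}'
sig = hmac.new(
    hashlib.sha256(key.encode()).hexdigest().encode(),
    f"{ts}.{body}".encode(), hashlib.sha256,
).hexdigest()
assert verify_webhook(key, ts, body, sig)
assert not verify_webhook(key, ts, body + "tampered", sig)
```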

    Configuration

    Variable                  Default                        Description
    SCRIPTIVOX_API_KEY        —                              Your API key (required)
    WEBHOOK_URL               http://localhost:8765/webhook  Public URL for webhook delivery
    MAX_PARALLEL_UPLOADS      10                             How many files to upload at the same time
    SCAN_INTERVAL             10                             Seconds between folder scans
    POLL_INTERVAL             30                             Seconds between polling cycles
    WEBHOOK_PORT              8765                           Local port for the webhook server
    DIARIZE                   False                          Set to True to enable speaker diarization (auto-detects number of speakers)
    ALIGN                     False                          Set to True to enable word-level timestamps
    INPUT_DIR                 ./input                        Folder to watch for new files
    OUTPUT_DIR                ./output                       Folder to save transcripts
    DONE_DIR                  ./done                         Completed source files moved here
    FAILED_DIR                ./failed                       Failed source files moved here
    RATE_LIMITS.upload        50                             Max upload requests/min (keep below your plan's limit)
    RATE_LIMITS.transcribe    50                             Max transcribe requests/min (keep below your plan's limit)
    RATE_LIMITS.poll          150                            Max polling requests/min (keep below your plan's limit)

    To change the rate limits, concurrency, or directories, edit the constants at the top of the script. Set the rate limits slightly below your actual plan limits to leave headroom.
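If you would rather tune these without editing the file, the constants can be read from environment variables instead. A sketch of that pattern, falling back to the documented defaults when a variable is unset (the variable names here are illustrative, not part of the API):

```python
import os

# Illustrative pattern: override via environment, fall back to defaults.
SCAN_INTERVAL = int(os.environ.get("SCAN_INTERVAL", "10"))
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "30"))
MAX_PARALLEL_UPLOADS = int(os.environ.get("MAX_PARALLEL_UPLOADS", "10"))
# Truthy strings ("1", "true", "yes") enable the flag; anything else disables it
DIARIZE = os.environ.get("DIARIZE", "").lower() in ("1", "true", "yes")
```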

    Polling-only mode

    The script already polls pending jobs as a webhook fallback. If you prefer a polling-only approach, remove the webhook server, drop the webhook_url from the transcribe request, and reduce POLL_INTERVAL to check results more frequently. See the Get Result endpoint for details.
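A minimal polling-only loop might look like the sketch below. The status checker is injected as a callable so the control flow is visible without a live API; in real use it would wrap a GET /v1/transcribe/{id} call with your Authorization header.

```python
import time

def poll_until_done(txn_id, fetch_status, interval=30, timeout=3600):
    """Poll `fetch_status(txn_id)` until it reports a terminal status.

    `fetch_status` stands in for a real GET /v1/transcribe/{id} request and
    should return a dict like {"status": ..., "result": ...}."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        data = fetch_status(txn_id)
        if data.get("status") in ("completed", "failed"):
            return data
        time.sleep(interval)
    raise TimeoutError(f"transcription {txn_id} not finished after {timeout}s")

# Stubbed checker: reports "processing" twice, then "completed".
responses = iter(
    [{"status": "processing"}] * 2
    + [{"status": "completed", "result": {"full_transcript": "hi"}}]
)
result = poll_until_done("txn_123", lambda _id: next(responses), interval=0)
assert result["status"] == "completed"
```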


    See also

    API Reference: full endpoint documentation
    Webhooks: webhook setup and verification