Use Cases
Practical examples of building with the Scriptivox API.
Folder Watcher
A self-contained script that watches a folder for new audio/video files, automatically transcribes them via the Scriptivox API, and saves the results as text files. No manual steps — just drop files in and get transcripts out.
What it does
- Scans `input/` for new audio or video files every 10 seconds (configurable)
- Uploads multiple files at the same time (10 by default, configurable)
- Starts transcription (optionally with speaker diarization and word-level timestamps)
- Receives results via a local webhook server
- Saves transcriptions as `.txt` files in `output/` with the same filename
- Moves source files to `done/` on success or `failed/` on failure
- Persists job state to `state.json` — survives script restarts
- Falls back to polling `GET /v1/transcribe/{id}` if a webhook is missed (e.g. after a restart)
- Client-side rate limiting — self-throttles to stay within your plan's limits (never hits 429)
- Round-robin polling — all pending jobs get checked, even with thousands in-flight
- Built for high volume — handles 10,000+ files/day
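The watcher drives three API calls per file: request an upload URL, upload the bytes, start the job. As a taste of the request shape, here is a minimal sketch of how the transcribe request body is assembled — `build_transcribe_body` is an illustrative helper, not part of the API, and the field names match the ones used in the full script below:

```python
def build_transcribe_body(upload_id, webhook_url, diarize=False, align=False):
    """Assemble a POST /v1/transcribe request body.

    Optional flags are only included when enabled, mirroring the
    logic in the full script below. This helper is hypothetical —
    it just illustrates the payload shape.
    """
    body = {"upload_id": upload_id, "webhook_url": webhook_url}
    if diarize:
        body["diarize"] = True
    if align:
        body["align"] = True
    return body


print(build_transcribe_body("up_123", "https://abc123.ngrok.io/webhook", diarize=True))
```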
Prerequisites
- Python 3.8+ (or Node.js 18+ for the JavaScript version)
- A Scriptivox API key with balance — create one here
- ngrok (free tier works) to expose your local webhook server
Setup
Install dependencies
Python:

```shell
pip install requests
```

Node.js: no dependencies needed — uses the built-in `fetch` and `http` modules (Node 18+).
Expose your local webhook
The script runs a webhook server on port 8765. Use ngrok to make it accessible from the internet so Scriptivox can deliver results:
```shell
ngrok http 8765
```
Copy the HTTPS forwarding URL (e.g. https://abc123.ngrok.io). You'll use this in the next step.
Run the script
Set your API key and webhook URL, then start:
```shell
export SCRIPTIVOX_API_KEY="sk_live_YOUR_KEY"
export WEBHOOK_URL="https://abc123.ngrok.io/webhook"
python transcribe_watcher.py
```
Or for Node.js:
```shell
SCRIPTIVOX_API_KEY=sk_live_YOUR_KEY \
WEBHOOK_URL=https://abc123.ngrok.io/webhook \
node transcribe_watcher.mjs
```
Drop files and get transcripts
Put any audio or video file in the input/ folder. The script picks it up on the next scan, uploads it, and transcribes it. Files are moved automatically once processed:
```
project/
├── input/                   <- drop files here
├── output/
│   ├── meeting.txt          <- transcripts appear here
│   └── podcast.txt
├── done/
│   ├── meeting.mp3          <- source files after success
│   └── podcast.wav
├── failed/
│   └── corrupted.mp3        <- source files after failure
├── state.json               <- tracks pending jobs (survives restarts)
└── transcribe_watcher.py
```
The script
```python
#!/usr/bin/env python3
"""Scriptivox Folder Watcher

Watches a folder for new audio/video files, transcribes them,
and saves results as text files.

Handles high-volume workloads (10,000+ files/day) with:
- Concurrent uploads (configurable worker count)
- Client-side rate limiting (never hits 429)
- Separate polling thread (doesn't block submissions)
- Round-robin polling (all jobs get checked)
- Restart resilience via state.json

Folder structure:
    input/   <- drop audio/video files here
    output/  <- transcripts saved here as .txt
    done/    <- source files moved here after success
    failed/  <- source files moved here after failure

Usage:
    export SCRIPTIVOX_API_KEY="sk_live_YOUR_KEY"
    export WEBHOOK_URL="https://abc123.ngrok.io/webhook"
    python transcribe_watcher.py
"""
import os
import sys
import time
import json
import hashlib
import hmac
import shutil
import threading
import requests
from concurrent.futures import ThreadPoolExecutor
from http.server import HTTPServer, BaseHTTPRequestHandler
from pathlib import Path
from collections import deque

# ─── Configuration ───────────────────────────────────────
API_KEY = os.environ.get("SCRIPTIVOX_API_KEY", "")
BASE_URL = "https://api.scriptivox.com/v1"
INPUT_DIR = Path("input")
OUTPUT_DIR = Path("output")
DONE_DIR = Path("done")
FAILED_DIR = Path("failed")
STATE_FILE = Path("state.json")
SCAN_INTERVAL = 10   # seconds between folder scans
POLL_INTERVAL = 30   # seconds between poll cycles
WEBHOOK_PORT = 8765
WEBHOOK_URL = os.environ.get("WEBHOOK_URL", f"http://localhost:{WEBHOOK_PORT}/webhook")
DIARIZE = False  # set True to enable speaker diarization
ALIGN = False    # set True to enable word-level timestamps
MAX_PARALLEL_UPLOADS = 10  # how many files to upload at the same time

# Rate limits (requests per minute) — set these to match your plan.
# Keep slightly below actual limits to leave headroom.
RATE_LIMITS = {
    "upload": 50,      # POST /v1/upload
    "transcribe": 50,  # POST /v1/transcribe
    "poll": 150,       # GET /v1/transcribe/{id}
}

SUPPORTED = {
    ".mp3", ".wav", ".m4a", ".aac", ".ogg", ".flac", ".opus",
    ".wma", ".aiff", ".caf", ".mp4", ".mov", ".avi", ".mkv",
    ".webm", ".wmv", ".flv", ".m4v", ".3gp", ".mpeg", ".mts",
    ".ogv", ".ts", ".vob", ".f4v",
}

# Reentrant lock: save_state() may be called while the lock is held.
state_lock = threading.RLock()
pending = {}      # transcription_id -> filename
poll_offset = 0   # tracks where we left off polling


# ─── Rate Limiter ────────────────────────────────────────
class RateLimiter:
    """Sliding-window rate limiter. Blocks until a request is allowed."""

    def __init__(self, max_per_minute):
        self.max_per_minute = max_per_minute
        self.timestamps = deque()
        self.lock = threading.Lock()

    def wait(self):
        """Block until we can make a request within the rate limit."""
        with self.lock:
            now = time.time()
            # Remove timestamps older than 60s
            while self.timestamps and now - self.timestamps[0] > 60:
                self.timestamps.popleft()
            if len(self.timestamps) >= self.max_per_minute:
                wait_time = 60 - (now - self.timestamps[0]) + 0.1
                self.lock.release()
                time.sleep(wait_time)
                self.lock.acquire()
                now = time.time()
                while self.timestamps and now - self.timestamps[0] > 60:
                    self.timestamps.popleft()
            self.timestamps.append(time.time())


limiters = {name: RateLimiter(rpm) for name, rpm in RATE_LIMITS.items()}


# ─── State Persistence ───────────────────────────────────
def load_state():
    """Load pending jobs from disk."""
    if STATE_FILE.exists():
        data = json.loads(STATE_FILE.read_text())
        return data.get("pending", {})
    return {}


def save_state():
    """Save pending jobs to disk."""
    with state_lock:
        STATE_FILE.write_text(json.dumps({"pending": pending}, indent=2))


# ─── Helpers ─────────────────────────────────────────────
def unique_output_name(filename, prefix=""):
    """Generate a unique output filename, appending a number if needed."""
    stem = Path(filename).stem
    name = f"{prefix}{stem}.txt"
    out = OUTPUT_DIR / name
    if not out.exists():
        return out
    # Append incrementing number to avoid collision
    n = 2
    while True:
        name = f"{prefix}{stem}_{n}.txt"
        out = OUTPUT_DIR / name
        if not out.exists():
            return out
        n += 1


def already_processed(filename):
    """Check if output already has a result for this file."""
    stem = Path(filename).stem
    return (
        (OUTPUT_DIR / f"{stem}.txt").exists()
        or (OUTPUT_DIR / f"failed_{stem}.txt").exists()
    )


def write_failure(filename, error_code, error_message):
    """Write a failure log to output/."""
    out = unique_output_name(filename, prefix="failed_")
    out.write_text(
        f"File: {filename}\nError: {error_code}: {error_message}\n",
        encoding="utf-8",
    )
    return out


# ─── API Helper ──────────────────────────────────────────
def api_request(method, url, limiter_name, max_retries=5, **kwargs):
    """Make a rate-limited API request with retry on 429."""
    limiter = limiters.get(limiter_name)
    for attempt in range(max_retries):
        if limiter:
            limiter.wait()
        r = requests.request(method, url, **kwargs)
        if r.status_code == 429:
            retry_after = int(r.headers.get("Retry-After", 10))
            print(
                f"  Rate limited (429), waiting {retry_after}s "
                f"(attempt {attempt + 1}/{max_retries})"
            )
            time.sleep(retry_after)
            continue
        return r
    return r  # return last response even if still 429


# ─── Webhook Server ─────────────────────────────────────
class WebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length)

        # Verify HMAC signature
        sig = self.headers.get("X-Scriptivox-Signature", "")
        ts = self.headers.get("X-Scriptivox-Timestamp", "")
        if sig and ts:
            secret = hashlib.sha256(API_KEY.encode()).hexdigest()
            expected = hmac.new(
                secret.encode(),
                f"{ts}.{body.decode()}".encode(),
                hashlib.sha256,
            ).hexdigest()
            if not hmac.compare_digest(expected, sig):
                print("  [webhook] Rejected — invalid signature")
                self.send_response(401)
                self.end_headers()
                return

        data = json.loads(body)
        event = data.get("event", "unknown")
        txn_id = data.get("transcription_id", "?")

        with state_lock:
            if event == "transcription.completed" and txn_id in pending:
                name = pending.pop(txn_id)
                text = data["result"]["full_transcript"]
                out = unique_output_name(name)
                out.write_text(text, encoding="utf-8")
                src = INPUT_DIR / name
                if src.exists():
                    shutil.move(str(src), str(DONE_DIR / name))
                save_state()
                print(f"  [webhook] completed {name}")
                print(f"    transcript -> {out}")
            elif event == "transcription.failed" and txn_id in pending:
                name = pending.pop(txn_id)
                err = data.get("error", {})
                code = err.get("code", "UNKNOWN")
                msg = err.get("message", "unknown error")
                write_failure(name, code, msg)
                src = INPUT_DIR / name
                if src.exists():
                    shutil.move(str(src), str(FAILED_DIR / name))
                save_state()
                print(f"  [webhook] failed {name}")
                print(f"    error {code}: {msg}")
            else:
                print(f"  [webhook] {event} txn={txn_id}")

        self.send_response(200)
        self.end_headers()

    def log_message(self, *_):
        pass


# ─── Poll Pending Jobs (separate thread) ─────────────────
def poll_loop():
    """Continuously polls pending jobs in a round-robin fashion.

    Runs in its own thread so it never blocks file submissions.
    """
    global poll_offset
    while True:
        time.sleep(POLL_INTERVAL)
        with state_lock:
            items = list(pending.items())
        if not items:
            continue

        # Round-robin: start from where we left off last time
        if poll_offset >= len(items):
            poll_offset = 0
        # Rotate the list so we start from the offset
        rotated = items[poll_offset:] + items[:poll_offset]

        polled = 0
        for txn_id, name in rotated:
            try:
                r = api_request(
                    "GET", f"{BASE_URL}/transcribe/{txn_id}",
                    limiter_name="poll",
                    headers={"Authorization": API_KEY},
                )
                if r.status_code == 429:
                    break
                if r.status_code != 200:
                    polled += 1
                    continue
                data = r.json()
                status = data.get("status")
                if status == "completed":
                    text = data.get("result", {}).get("full_transcript", "")
                    out = unique_output_name(name)
                    out.write_text(text, encoding="utf-8")
                    src = INPUT_DIR / name
                    if src.exists():
                        shutil.move(str(src), str(DONE_DIR / name))
                    with state_lock:
                        pending.pop(txn_id, None)
                        save_state()
                    print(f"  [poll] completed {name}")
                elif status == "failed":
                    err = data.get("error", {})
                    code = err.get("code", "UNKNOWN")
                    msg = err.get("message", "unknown error")
                    write_failure(name, code, msg)
                    src = INPUT_DIR / name
                    if src.exists():
                        shutil.move(str(src), str(FAILED_DIR / name))
                    with state_lock:
                        pending.pop(txn_id, None)
                        save_state()
                    print(f"  [poll] failed {name} — {code}: {msg}")
                polled += 1
            except Exception:
                polled += 1
        poll_offset = (poll_offset + polled) % max(len(items), 1)


# ─── Upload & Transcribe ────────────────────────────────
def process_file(filepath):
    name = filepath.name
    if already_processed(name):
        return
    with state_lock:
        if name in pending.values():
            return

    print(f"  Upload {name}")
    try:
        # 1. Get presigned upload URL
        r = api_request(
            "POST", f"{BASE_URL}/upload",
            limiter_name="upload",
            headers={"Authorization": API_KEY, "Content-Type": "application/json"},
            json={"filename": name},
        )
        r.raise_for_status()
        upload = r.json()

        # 2. Upload file (streams from disk — no full-file memory load)
        content_type = upload["headers"]["Content-Type"]
        with open(filepath, "rb") as f:
            requests.put(
                upload["upload_url"],
                headers={"Content-Type": content_type},
                data=f,
            ).raise_for_status()

        # 3. Start transcription
        body = {
            "upload_id": upload["upload_id"],
            "webhook_url": WEBHOOK_URL,
        }
        if DIARIZE:
            body["diarize"] = True
        if ALIGN:
            body["align"] = True
        r = api_request(
            "POST", f"{BASE_URL}/transcribe",
            limiter_name="transcribe",
            headers={"Authorization": API_KEY, "Content-Type": "application/json"},
            json=body,
        )
        r.raise_for_status()
        job = r.json()
        with state_lock:
            pending[job["id"]] = name
            save_state()
        print(f"  Queued {name} (ID: {job['id']})")
    except requests.HTTPError as e:
        try:
            err_body = e.response.json() if e.response else {}
        except ValueError:
            err_body = {}
        code = err_body.get("error", {}).get("code", "HTTP_ERROR")
        msg = err_body.get("error", {}).get("message", str(e))
        print(f"  Error {name}: {msg}")
        fail_out = write_failure(name, code, msg)
        if filepath.exists():
            shutil.move(str(filepath), str(FAILED_DIR / name))
        print(f"    log -> {fail_out}")
    except Exception as e:
        print(f"  Error {name}: {e}")
        fail_out = write_failure(name, "SCRIPT_ERROR", str(e))
        if filepath.exists():
            shutil.move(str(filepath), str(FAILED_DIR / name))
        print(f"    log -> {fail_out}")


# ─── Main Loop ──────────────────────────────────────────
def main():
    if not API_KEY:
        print("Set SCRIPTIVOX_API_KEY environment variable")
        sys.exit(1)

    for d in [INPUT_DIR, OUTPUT_DIR, DONE_DIR, FAILED_DIR]:
        d.mkdir(exist_ok=True)

    pending.update(load_state())
    if pending:
        print(f"Resumed {len(pending)} pending job(s) from state.json")

    # Start webhook server in background
    server = HTTPServer(("0.0.0.0", WEBHOOK_PORT), WebhookHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()

    # Start polling thread in background
    threading.Thread(target=poll_loop, daemon=True).start()

    print(f"Webhook server on port {WEBHOOK_PORT}")
    print(f"Watching    {INPUT_DIR.resolve()}")
    print(f"Output      {OUTPUT_DIR.resolve()}")
    print(f"Parallel    {MAX_PARALLEL_UPLOADS} uploads at a time")
    print(
        f"Rate limits upload={RATE_LIMITS['upload']}/min "
        f"transcribe={RATE_LIMITS['transcribe']}/min "
        f"poll={RATE_LIMITS['poll']}/min"
    )
    print(f"Diarize     {DIARIZE}")
    print(f"Align       {ALIGN}")
    print(f"Scan every  {SCAN_INTERVAL}s")
    print(f"Poll every  {POLL_INTERVAL}s\n")

    with ThreadPoolExecutor(max_workers=MAX_PARALLEL_UPLOADS) as pool:
        while True:
            files = sorted(
                f for f in INPUT_DIR.iterdir()
                if f.suffix.lower() in SUPPORTED
            )
            if files:
                # Submit all new files to the worker pool
                futures = [pool.submit(process_file, f) for f in files]
                # Wait for this batch to finish before next scan
                for fut in futures:
                    try:
                        fut.result()
                    except Exception as e:
                        print(f"  Worker error: {e}")
            time.sleep(SCAN_INTERVAL)


if __name__ == "__main__":
    main()
```
How it works
- Startup — Creates the `input/`, `output/`, `done/`, and `failed/` folders. Loads any pending jobs from `state.json` (so it picks up where it left off after a restart). Starts a webhook server on port 8765 in a background thread, plus a separate polling thread for webhook fallback.
- Scan loop — Every 10 seconds, scans `input/` for files with supported extensions. New files are submitted to a concurrent worker pool (10 workers by default) for parallel upload and transcription.
- Rate limiting — Before each API call, the client-side rate limiter checks whether the request is within the configured limit. If at capacity, it waits automatically. This prevents 429 errors instead of reacting to them. Set the limits slightly below your plan's actual limits.
- Upload — Each worker requests a presigned upload URL via `POST /v1/upload`, then streams the file with a `PUT` request using the exact `Content-Type` returned by the API. Files are streamed from disk (no full-file memory load). If the API returns 429 despite client-side limiting, the script waits for the `Retry-After` duration and retries (up to 5 attempts).
- Transcribe — Starts a transcription job via `POST /v1/transcribe` with the `upload_id` and `webhook_url`. The API returns immediately with `status: "created"` — the file is validated and transcribed in the background. The job is saved to `state.json` so it persists across restarts.
- Webhook — When Scriptivox finishes transcribing, it POSTs the result to your webhook. The handler verifies the HMAC signature, saves the transcript to `output/<filename>.txt`, and moves the source file to `done/`. All webhook events are logged.
- Polling fallback — A separate thread polls pending jobs every 30 seconds in round-robin order, so all jobs eventually get checked — even with thousands in-flight. This catches results if a webhook is missed (e.g. after a restart or network blip).
- Failure — If a transcription fails, the webhook (or poll) handler logs the error, moves the source file to `failed/`, and removes the job from state. If the upload itself fails (HTTP error, rate limit exhausted), the file is also moved to `failed/`. Either way, the watcher continues processing other files.
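The signature check can be exercised offline before wiring up a real tunnel. A minimal sketch, assuming the scheme the script's handler uses (webhook secret derived as the SHA-256 hex digest of the API key, signed string `"{timestamp}.{body}"`, hex-encoded HMAC-SHA256) — the key and payload here are placeholders:

```python
import hashlib
import hmac
import json

API_KEY = "sk_live_EXAMPLE"  # placeholder key for the demo


def sign(ts, body, api_key):
    """Compute a signature the way the script's webhook handler expects it."""
    secret = hashlib.sha256(api_key.encode()).hexdigest()
    return hmac.new(secret.encode(), f"{ts}.{body}".encode(), hashlib.sha256).hexdigest()


def verify(ts, body, sig, api_key):
    """Constant-time comparison, as in the handler."""
    return hmac.compare_digest(sign(ts, body, api_key), sig)


# Simulate a delivery: sign a payload, then verify it as the handler would.
payload = json.dumps({"event": "transcription.completed", "transcription_id": "txn_1"})
sig = sign("1700000000", payload, API_KEY)
print(verify("1700000000", payload, sig, API_KEY))        # True
print(verify("1700000000", payload + " ", sig, API_KEY))  # False — body tampered
```

This is also a handy way to generate valid test requests against the local webhook server without involving the API at all.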
Configuration
| Variable | Default | Description |
|---|---|---|
| SCRIPTIVOX_API_KEY | — | Your API key (required) |
| WEBHOOK_URL | http://localhost:8765/webhook | Public URL for webhook delivery |
| MAX_PARALLEL_UPLOADS | 10 | How many files to upload at the same time |
| SCAN_INTERVAL | 10 | Seconds between folder scans |
| POLL_INTERVAL | 30 | Seconds between polling cycles |
| WEBHOOK_PORT | 8765 | Local port for the webhook server |
| DIARIZE | False | Set to True to enable speaker diarization (auto-detects number of speakers) |
| ALIGN | False | Set to True to enable word-level timestamps |
| INPUT_DIR | ./input | Folder to watch for new files |
| OUTPUT_DIR | ./output | Folder to save transcripts |
| DONE_DIR | ./done | Completed source files moved here |
| FAILED_DIR | ./failed | Failed source files moved here |
| RATE_LIMITS.upload | 50 | Max upload requests/min (keep below your plan's limit) |
| RATE_LIMITS.transcribe | 50 | Max transcribe requests/min (keep below your plan's limit) |
| RATE_LIMITS.poll | 150 | Max polling requests/min (keep below your plan's limit) |
To change the rate limits, concurrency, or directories, edit the constants at the top of the script. Set the rate limits slightly below your actual plan limits to leave headroom.
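If you'd rather not edit the script per deployment, the same constants can be read from environment variables instead. A sketch of that variation — the stock script hard-codes these values, so the variable names below are an assumption, not ones the script defines:

```python
import os

# Hypothetical environment overrides; defaults match the stock script.
SCAN_INTERVAL = int(os.environ.get("SCAN_INTERVAL", "10"))
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "30"))
MAX_PARALLEL_UPLOADS = int(os.environ.get("MAX_PARALLEL_UPLOADS", "10"))
RATE_LIMITS = {
    "upload": int(os.environ.get("RATE_LIMIT_UPLOAD", "50")),
    "transcribe": int(os.environ.get("RATE_LIMIT_TRANSCRIBE", "50")),
    "poll": int(os.environ.get("RATE_LIMIT_POLL", "150")),
}
```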
Polling-only mode
The script already polls pending jobs as a webhook fallback. If you prefer a polling-only approach, remove the webhook server, drop the `webhook_url` from the transcribe request, and reduce `POLL_INTERVAL` to poll more frequently. See the Get Result endpoint for details.
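A stripped-down polling-only loop might look like this sketch. The endpoint and status values match those used in the full script; the job ID is a placeholder, `requests` is imported lazily so the helpers load without the dependency, and the network loop only runs when executed directly:

```python
import os
import time

BASE_URL = "https://api.scriptivox.com/v1"
API_KEY = os.environ.get("SCRIPTIVOX_API_KEY", "")


def is_terminal(status):
    """A job stops being polled once it completes or fails."""
    return status in ("completed", "failed")


def poll_once(txn_id):
    """Fetch current job state via GET /v1/transcribe/{id}."""
    import requests  # imported lazily so the sketch loads without it

    r = requests.get(
        f"{BASE_URL}/transcribe/{txn_id}",
        headers={"Authorization": API_KEY},
    )
    r.raise_for_status()
    return r.json()


if __name__ == "__main__":
    jobs = ["txn_REPLACE_ME"]  # placeholder transcription IDs
    while jobs:
        for txn_id in list(jobs):
            data = poll_once(txn_id)
            if is_terminal(data.get("status")):
                print(txn_id, data.get("status"))
                jobs.remove(txn_id)
        time.sleep(30)  # polling-only: this interval replaces the webhook
```

Without webhooks you pay one `GET` per pending job per cycle, so keep the poll rate limit from the Configuration table in mind when choosing the sleep interval.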