Use Cases
Practical examples of building with the Scriptivox API.
Folder Watcher
A self-contained script that watches a folder for new audio or video files, automatically transcribes them via the Scriptivox API, and saves the results as text files. No manual steps — drop files in and get transcripts out.
What it does
- Scans input/ for new audio or video files every 10 seconds (configurable)
- Uploads multiple files in parallel (10 at a time by default, configurable)
- Starts transcription (optionally with speaker diarization and word-level timestamps)
- Receives results via a local webhook server
- Saves transcripts as .txt files in output/ with the same base filename
- Moves source files to done/ on success or failed/ on failure
- Persists job state to state.json, so pending jobs survive script restarts
- Falls back to polling GET /v1/transcribe/{id} if a webhook is missed (e.g. after a restart or network blip)
- Client-side rate limiting: self-throttles to stay within your plan's limits (designed to avoid 429 responses, with a retry if one still occurs)
- Round-robin polling: every pending job gets checked, even with thousands in flight
- Designed to handle 10,000+ files/day
Prerequisites
- Python 3.8+ (or Node.js 18+ for the JavaScript version)
- A Scriptivox API key with balance — create one here
- ngrok (free tier works) to expose your local webhook server
Setup
Install dependencies
Python:
pip install requests
Node.js: No dependencies needed — uses built-in fetch and http (Node 18+).
Expose your local webhook
The script runs a webhook server on port 8765. Use ngrok to make it accessible from the internet so Scriptivox can deliver results to it:
ngrok http 8765
Copy the HTTPS forwarding URL (e.g. https://abc123.ngrok.io). You'll use it in the next step.
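If copying the URL by hand gets tedious, it can also be read from the ngrok agent's local inspection API. The sketch below assumes the agent is running with its default local address (http://127.0.0.1:4040) and that the response contains a tunnels list with public_url fields; the helper names are illustrative and not part of the watcher script.

```python
import json
from urllib.request import urlopen

# Assumption: default address of the local ngrok agent API.
NGROK_API = "http://127.0.0.1:4040/api/tunnels"


def pick_https_url(tunnels_payload):
    """Return the first HTTPS public URL from an ngrok tunnels payload, or None."""
    for tunnel in tunnels_payload.get("tunnels", []):
        url = tunnel.get("public_url", "")
        if url.startswith("https://"):
            return url
    return None


def current_webhook_url(path="/webhook"):
    """Ask the local ngrok agent for its tunnels and build the webhook URL.

    Requires `ngrok http 8765` to be running on this machine.
    """
    with urlopen(NGROK_API, timeout=5) as resp:
        payload = json.load(resp)
    base = pick_https_url(payload)
    return f"{base}{path}" if base else None
```

Calling current_webhook_url() while ngrok is running should return a value suitable for WEBHOOK_URL; if no HTTPS tunnel is found it returns None.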
Run the script
Set your API key and webhook URL, then start:
```bash
export SCRIPTIVOX_API_KEY="sk_live_YOUR_KEY"
export WEBHOOK_URL="https://abc123.ngrok.io/webhook"
python transcribe_watcher.py
```
Or for Node.js:
```bash
SCRIPTIVOX_API_KEY=sk_live_YOUR_KEY \
WEBHOOK_URL=https://abc123.ngrok.io/webhook \
node transcribe_watcher.mjs
```
Drop files and get transcripts
Put any audio or video file in the input/ folder. The script picks it up on the next scan, uploads it, and transcribes it. Files are moved automatically once processed:
```text
project/
  input/                   <- drop files here
  output/
    meeting.txt            <- transcripts appear here
    podcast.txt
  done/
    meeting.mp3            <- source files after success
    podcast.wav
  failed/
    corrupted.mp3          <- source files after failure
  state.json               <- tracks pending jobs (survives restarts)
  transcribe_watcher.py    <- or transcribe_watcher.mjs for Node
```
The script
```python
#!/usr/bin/env python3
"""
Scriptivox Folder Watcher

Watches a folder for new audio/video files, transcribes them,
and saves results as text files.

Handles high-volume workloads (10,000+ files/day) with:
- Concurrent uploads (configurable worker count)
- Client-side rate limiting (designed to avoid 429s)
- Separate polling thread (doesn't block submissions)
- Round-robin polling (all jobs get checked)
- Restart resilience via state.json

Folder structure:
    input/   <- drop audio/video files here
    output/  <- transcripts saved here as .txt
    done/    <- source files moved here after success
    failed/  <- source files moved here after failure

Usage:
    export SCRIPTIVOX_API_KEY="sk_live_YOUR_KEY"
    export WEBHOOK_URL="https://abc123.ngrok.io/webhook"
    python transcribe_watcher.py
"""

import os
import sys
import time
import json
import hashlib
import hmac
import shutil
import random
import threading
import requests
from concurrent.futures import ThreadPoolExecutor
from http.server import HTTPServer, BaseHTTPRequestHandler
from pathlib import Path
from collections import deque

# ─── Configuration ───────────────────────────────────────

API_KEY = os.environ.get("SCRIPTIVOX_API_KEY", "")
BASE_URL = "https://api.scriptivox.com/v1"

INPUT_DIR = Path("input")
OUTPUT_DIR = Path("output")
DONE_DIR = Path("done")
FAILED_DIR = Path("failed")
STATE_FILE = Path("state.json")

SCAN_INTERVAL = 10  # seconds between folder scans
POLL_INTERVAL = 30  # seconds between poll cycles
WEBHOOK_PORT = 8765
WEBHOOK_URL = os.environ.get("WEBHOOK_URL", f"http://localhost:{WEBHOOK_PORT}/webhook")

DIARIZE = False  # set True to enable speaker diarization
ALIGN = True     # word-level timestamps; set False to opt out (auto-True when DIARIZE)
MAX_PARALLEL_UPLOADS = 10  # how many files to upload at the same time

# Rate limits (requests per minute). Set these to match your plan.
# Keep slightly below actual limits to leave headroom.
RATE_LIMITS = {
    "upload": 50,      # POST /v1/upload
    "transcribe": 50,  # POST /v1/transcribe
    "poll": 150,       # GET /v1/transcribe/{id}
}

SUPPORTED = {
    ".mp3", ".wav", ".m4a", ".aac", ".ogg", ".flac", ".opus",
    ".wma", ".aiff", ".caf", ".mp4", ".mov", ".avi", ".mkv",
    ".webm", ".wmv", ".flv", ".m4v", ".3gp", ".mpeg", ".mts",
    ".ogv", ".ts", ".vob", ".f4v",
}

# Re-entrant lock: save_state() acquires it too, and is called while it is held.
state_lock = threading.RLock()
pending = {}      # transcription_id -> filename
poll_offset = 0   # tracks where we left off polling


# ─── Rate Limiter ────────────────────────────────────────

class RateLimiter:
    """Sliding-window rate limiter. Blocks until a request is allowed."""

    def __init__(self, max_per_minute):
        self.max_per_minute = max_per_minute
        self.timestamps = deque()
        self.lock = threading.Lock()

    def wait(self):
        """Block until we can make a request within the rate limit."""
        while True:
            with self.lock:
                now = time.time()
                # Remove timestamps older than 60s
                while self.timestamps and now - self.timestamps[0] > 60:
                    self.timestamps.popleft()
                if len(self.timestamps) < self.max_per_minute:
                    self.timestamps.append(now)
                    return
                wait_time = 60 - (now - self.timestamps[0]) + 0.1
            # Sleep outside the lock so other threads are not blocked, then re-check
            time.sleep(wait_time)


limiters = {name: RateLimiter(rpm) for name, rpm in RATE_LIMITS.items()}


# ─── State Persistence ───────────────────────────────────

def load_state():
    """Load pending jobs from disk."""
    if STATE_FILE.exists():
        data = json.loads(STATE_FILE.read_text())
        return data.get("pending", {})
    return {}


def save_state():
    """Save pending jobs to disk."""
    with state_lock:
        STATE_FILE.write_text(json.dumps({"pending": pending}, indent=2))


# ─── Helpers ─────────────────────────────────────────────

def unique_output_name(filename, prefix=""):
    """Generate a unique output filename, appending a number if needed."""
    stem = Path(filename).stem
    name = f"{prefix}{stem}.txt"
    out = OUTPUT_DIR / name
    if not out.exists():
        return out
    # Append incrementing number to avoid collision
    n = 2
    while True:
        name = f"{prefix}{stem}_{n}.txt"
        out = OUTPUT_DIR / name
        if not out.exists():
            return out
        n += 1


def already_processed(filename):
    """Check if output already has a result for this file."""
    stem = Path(filename).stem
    return (
        (OUTPUT_DIR / f"{stem}.txt").exists()
        or (OUTPUT_DIR / f"failed_{stem}.txt").exists()
    )


def write_failure(filename, error_code, error_message):
    """Write a failure log to output/."""
    out = unique_output_name(filename, prefix="failed_")
    out.write_text(
        f"File: {filename}\nError: {error_code}: {error_message}\n",
        encoding="utf-8",
    )
    return out


# ─── API Helper ──────────────────────────────────────────

def api_request(method, url, limiter_name, max_retries=5, **kwargs):
    """Make a rate-limited API request with retry on 429."""
    limiter = limiters.get(limiter_name)
    for attempt in range(max_retries):
        if limiter:
            limiter.wait()
        r = requests.request(method, url, **kwargs)
        if r.status_code == 429:
            retry_after = int(r.headers.get("Retry-After", 10))
            print(f" Rate limited (429), waiting {retry_after}s "
                  f"(attempt {attempt + 1}/{max_retries})")
            time.sleep(retry_after)
            continue
        return r
    return r  # return last response even if still 429


# ─── Webhook Server ─────────────────────────────────────

class WebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length)

        # Verify HMAC signature (only when both signature headers are present)
        sig = self.headers.get("X-Scriptivox-Signature", "")
        ts = self.headers.get("X-Scriptivox-Timestamp", "")
        if sig and ts:
            secret = hashlib.sha256(API_KEY.encode()).hexdigest()
            expected = hmac.new(
                secret.encode(),
                f"{ts}.{body.decode()}".encode(),
                hashlib.sha256,
            ).hexdigest()
            if not hmac.compare_digest(expected, sig):
                print(" [webhook] Rejected: invalid signature")
                self.send_response(401)
                self.end_headers()
                return

        data = json.loads(body)
        event = data.get("event", "unknown")
        txn_id = data.get("transcription_id", "?")

        with state_lock:
            if event == "transcription.completed" and txn_id in pending:
                name = pending.pop(txn_id)
                text = data["result"]["full_transcript"]
                out = unique_output_name(name)
                out.write_text(text, encoding="utf-8")
                src = INPUT_DIR / name
                if src.exists():
                    shutil.move(str(src), str(DONE_DIR / name))
                save_state()
                print(f" [webhook] completed {name}")
                print(f" transcript -> {out}")
            elif event == "transcription.failed" and txn_id in pending:
                name = pending.pop(txn_id)
                err = data.get("error", {})
                code = err.get("code", "UNKNOWN")
                msg = err.get("message", "unknown error")
                write_failure(name, code, msg)
                src = INPUT_DIR / name
                if src.exists():
                    shutil.move(str(src), str(FAILED_DIR / name))
                save_state()
                print(f" [webhook] failed {name}")
                print(f" error {code}: {msg}")
            else:
                print(f" [webhook] {event} txn={txn_id}")

        self.send_response(200)
        self.end_headers()

    def log_message(self, *_):
        pass  # silence the default per-request logging


# ─── Poll Pending Jobs (separate thread) ─────────────────

def poll_loop():
    """
    Continuously polls pending jobs in a round-robin fashion.
    Runs in its own thread so it never blocks file submissions.
    """
    global poll_offset
    while True:
        time.sleep(POLL_INTERVAL)
        with state_lock:
            items = list(pending.items())
        if not items:
            continue

        # Round-robin: start from where we left off last time
        if poll_offset >= len(items):
            poll_offset = 0
        # Rotate the list so we start from the offset
        rotated = items[poll_offset:] + items[:poll_offset]

        polled = 0
        for txn_id, name in rotated:
            try:
                r = api_request(
                    "GET", f"{BASE_URL}/transcribe/{txn_id}",
                    limiter_name="poll",
                    headers={"Authorization": API_KEY},
                )
                if r.status_code == 429:
                    break  # still rate limited after retries; resume next cycle
                if r.status_code != 200:
                    polled += 1
                    continue

                data = r.json()
                status = data.get("status")
                if status == "completed":
                    text = data.get("result", {}).get("full_transcript", "")
                    out = unique_output_name(name)
                    out.write_text(text, encoding="utf-8")
                    src = INPUT_DIR / name
                    if src.exists():
                        shutil.move(str(src), str(DONE_DIR / name))
                    with state_lock:
                        pending.pop(txn_id, None)
                    save_state()
                    print(f" [poll] completed {name}")
                elif status == "failed":
                    err = data.get("error", {})
                    code = err.get("code", "UNKNOWN")
                    msg = err.get("message", "unknown error")
                    write_failure(name, code, msg)
                    src = INPUT_DIR / name
                    if src.exists():
                        shutil.move(str(src), str(FAILED_DIR / name))
                    with state_lock:
                        pending.pop(txn_id, None)
                    save_state()
                    print(f" [poll] failed {name} ({code}: {msg})")
                polled += 1
            except Exception:
                polled += 1

        poll_offset = (poll_offset + polled) % max(len(items), 1)


# ─── Upload & Transcribe ────────────────────────────────

def process_file(filepath):
    name = filepath.name
    if already_processed(name):
        return
    with state_lock:
        if name in pending.values():
            return

    print(f" Upload {name}")
    try:
        # 1. Get presigned upload URL
        r = api_request(
            "POST", f"{BASE_URL}/upload",
            limiter_name="upload",
            headers={"Authorization": API_KEY, "Content-Type": "application/json"},
            json={"filename": name},
        )
        r.raise_for_status()
        upload = r.json()

        # 2. Upload file (streams from disk, no full-file memory load)
        content_type = upload["headers"]["Content-Type"]
        with open(filepath, "rb") as f:
            requests.put(
                upload["upload_url"],
                headers={"Content-Type": content_type},
                data=f,
            ).raise_for_status()

        # 3. Start transcription
        body = {
            "upload_id": upload["upload_id"],
            "webhook_url": WEBHOOK_URL,
        }
        if DIARIZE:
            body["diarize"] = True
        if ALIGN:
            body["align"] = True
        r = api_request(
            "POST", f"{BASE_URL}/transcribe",
            limiter_name="transcribe",
            headers={"Authorization": API_KEY, "Content-Type": "application/json"},
            json=body,
        )
        r.raise_for_status()
        job = r.json()

        with state_lock:
            pending[job["id"]] = name
        save_state()
        print(f" Queued {name} (ID: {job['id']})")

    except requests.HTTPError as e:
        try:
            # A Response is falsy for 4xx/5xx, so compare against None explicitly
            err_body = e.response.json() if e.response is not None else {}
        except ValueError:
            err_body = {}
        code = err_body.get("error", {}).get("code", "HTTP_ERROR")
        msg = err_body.get("error", {}).get("message", str(e))
        print(f" Error {name}: {msg}")
        fail_out = write_failure(name, code, msg)
        if filepath.exists():
            shutil.move(str(filepath), str(FAILED_DIR / name))
        print(f" log -> {fail_out}")
    except Exception as e:
        print(f" Error {name}: {e}")
        fail_out = write_failure(name, "SCRIPT_ERROR", str(e))
        if filepath.exists():
            shutil.move(str(filepath), str(FAILED_DIR / name))
        print(f" log -> {fail_out}")


# ─── Main Loop ──────────────────────────────────────────

def main():
    if not API_KEY:
        print("Set SCRIPTIVOX_API_KEY environment variable")
        sys.exit(1)

    for d in [INPUT_DIR, OUTPUT_DIR, DONE_DIR, FAILED_DIR]:
        d.mkdir(exist_ok=True)

    pending.update(load_state())
    if pending:
        print(f"Resumed {len(pending)} pending job(s) from state.json")

    # Start webhook server in background
    server = HTTPServer(("0.0.0.0", WEBHOOK_PORT), WebhookHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()

    # Start polling thread in background
    threading.Thread(target=poll_loop, daemon=True).start()

    print(f"Webhook server on port {WEBHOOK_PORT}")
    print(f"Watching {INPUT_DIR.resolve()}")
    print(f"Output {OUTPUT_DIR.resolve()}")
    print(f"Parallel {MAX_PARALLEL_UPLOADS} uploads at a time")
    print(f"Rate limits upload={RATE_LIMITS['upload']}/min "
          f"transcribe={RATE_LIMITS['transcribe']}/min "
          f"poll={RATE_LIMITS['poll']}/min")
    print(f"Diarize {DIARIZE}")
    print(f"Align {ALIGN}")
    print(f"Scan every {SCAN_INTERVAL}s")
    print(f"Poll every {POLL_INTERVAL}s\n")

    with ThreadPoolExecutor(max_workers=MAX_PARALLEL_UPLOADS) as pool:
        while True:
            files = sorted(
                f for f in INPUT_DIR.iterdir()
                if f.suffix.lower() in SUPPORTED
            )
            if files:
                # Submit all new files to the worker pool
                futures = []
                for f in files:
                    futures.append(pool.submit(process_file, f))
                # Wait for this batch to finish before next scan
                for fut in futures:
                    try:
                        fut.result()
                    except Exception as e:
                        print(f" Worker error: {e}")
            time.sleep(SCAN_INTERVAL)


if __name__ == "__main__":
    main()
```
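To exercise the webhook handler locally without waiting for a real job, a signed request can be built the same way the handler verifies it: the secret is the SHA-256 hex digest of the API key, and the signature is an HMAC-SHA256 over the timestamp, a dot, and the raw body. This is a minimal sketch for local testing. The helper names are hypothetical, the Unix-seconds timestamp format is an assumption, and the payload contains only the fields the handler reads (a real delivery may include more).

```python
import hashlib
import hmac
import json
import time


def sign_webhook(api_key, timestamp, body):
    """Compute the signature the watcher's handler expects for a raw body string."""
    secret = hashlib.sha256(api_key.encode()).hexdigest()
    message = f"{timestamp}.{body}".encode()
    return hmac.new(secret.encode(), message, hashlib.sha256).hexdigest()


def build_test_request(api_key, transcription_id, transcript):
    """Return (headers, body) for a simulated transcription.completed event."""
    body = json.dumps({
        "event": "transcription.completed",
        "transcription_id": transcription_id,
        "result": {"full_transcript": transcript},
    })
    ts = str(int(time.time()))  # assumption: timestamp sent as Unix seconds
    headers = {
        "Content-Type": "application/json",
        "X-Scriptivox-Timestamp": ts,
        "X-Scriptivox-Signature": sign_webhook(api_key, ts, body),
    }
    return headers, body
```

The result can be sent with requests.post("http://localhost:8765/webhook", headers=headers, data=body). The handler only writes a transcript when the transcription ID is present in its pending jobs; otherwise it just logs the event.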
How it works
- Startup: Creates the input/, output/, done/, and failed/ folders. Loads any pending jobs from state.json (so it picks up where it left off after a restart). Starts a webhook server on port 8765 in a background thread, and a separate polling thread as a webhook fallback.
- Scan loop: Every 10 seconds, scans input/ for files with supported extensions. New files are submitted to a concurrent worker pool (10 workers by default) for parallel upload and transcription.
- Rate limiting: Before each API call, the client-side rate limiter checks whether the request fits within the configured per-minute limit. If at capacity, it waits automatically. The goal is to avoid 429 errors rather than react to them. Set the limits slightly below your plan's actual limits to leave headroom.
- Upload: Each worker requests a presigned upload URL via POST /v1/upload, then streams the file with a PUT request using the exact Content-Type returned by the API. Files are streamed from disk (no full-file memory load). If the API returns 429 despite client-side limiting, the script waits for the Retry-After duration and retries (up to 5 attempts).
- Transcribe: Starts a transcription job via POST /v1/transcribe with the upload_id and webhook_url. The API returns immediately with status: "created"; the file is validated and transcribed in the background. The job is saved to state.json so it persists across restarts.
- Webhook: When Scriptivox finishes a transcription, it POSTs the result to your webhook. The handler verifies the HMAC-SHA256 signature (when the signature headers are present), saves the transcript to output/<filename>.txt, and moves the source file to done/. All webhook events are logged.
- Polling fallback: A separate thread polls pending jobs every 30 seconds in round-robin order, so every job eventually gets checked, even with thousands in flight. This catches results if a webhook is missed (e.g. after a restart or network blip). Webhooks are best-effort with no retries, so this fallback is important.
- Failure: If a transcription fails, the webhook (or poll) handler writes a failure log to output/failed_<filename>.txt, moves the source file to failed/, and removes the job from state. If the upload itself fails (HTTP error, rate limit exhausted), the file is also moved to failed/. Either way, the watcher continues processing other files.
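The round-robin behaviour of the polling fallback can be illustrated in isolation. The sketch below mirrors the script's logic under simplified assumptions: the pending list is rotated by a stored offset, and the offset advances by the number of jobs actually polled, so a cycle that is cut short resumes where it stopped. The function names rotate and next_offset are illustrative and do not appear in the script.

```python
def rotate(items, offset):
    """Return items reordered so polling starts at `offset`."""
    if not items:
        return []
    offset %= len(items)
    return items[offset:] + items[:offset]


def next_offset(offset, polled, total):
    """Advance the offset by the number of jobs actually polled."""
    return (offset + polled) % max(total, 1)


jobs = ["a", "b", "c", "d", "e"]
offset = 0

# First cycle: only 2 jobs are polled before the cycle is cut short.
first = rotate(jobs, offset)[:2]
offset = next_offset(offset, len(first), len(jobs))

# Second cycle resumes with the jobs that were skipped.
second = rotate(jobs, offset)[:2]
print(first, second)  # ['a', 'b'] ['c', 'd']
```

In the real script the pending set also shrinks as jobs finish, so the offset is only approximate between cycles, but every job is still reached within a few passes.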
Configuration
| Variable | Default | Description |
|---|---|---|
| SCRIPTIVOX_API_KEY | — | Your API key (required) |
| WEBHOOK_URL | http://localhost:8765/webhook | Public URL for webhook delivery |
| MAX_PARALLEL_UPLOADS | 10 | How many files to upload at the same time |
| SCAN_INTERVAL | 10 | Seconds between folder scans |
| POLL_INTERVAL | 30 | Seconds between polling cycles |
| WEBHOOK_PORT | 8765 | Local port for the webhook server |
| DIARIZE | False | Set to True to enable speaker diarization (auto-detects number of speakers) |
| ALIGN | True | Word-level timestamps (set False to opt out; auto-forced True when DIARIZE is True) |
| INPUT_DIR | ./input | Folder to watch for new files |
| OUTPUT_DIR | ./output | Folder to save transcripts |
| DONE_DIR | ./done | Completed source files moved here |
| FAILED_DIR | ./failed | Failed source files moved here |
| RATE_LIMITS.upload | 50 | Max upload requests/min (keep below your plan's limit) |
| RATE_LIMITS.transcribe | 50 | Max transcribe requests/min (keep below your plan's limit) |
| RATE_LIMITS.poll | 150 | Max polling requests/min (keep below your plan's limit) |
To change the rate limits, concurrency, or directories, edit the constants at the top of the script. Set the rate limits slightly below your actual plan limits to leave headroom.
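A quick back-of-the-envelope calculation can help when choosing these values. The sketch below uses the defaults from the table and assumes each file costs one request against the upload limit and one against the transcribe limit, as in the script; the function names are illustrative.

```python
def max_files_per_day(requests_per_minute):
    """Upper bound on files/day if each file costs one request against this limit."""
    return requests_per_minute * 60 * 24


def full_poll_pass_minutes(pending_jobs, poll_rpm):
    """Minutes needed to poll every pending job once at the configured poll rate."""
    return pending_jobs / poll_rpm


print(max_files_per_day(50))              # 72000
print(full_poll_pass_minutes(3000, 150))  # 20.0
```

With the default limit of 50 requests/min, the ceiling is 72,000 files/day, well above 10,000. With 3,000 jobs pending and a poll limit of 150 requests/min, one full polling pass takes roughly 20 minutes, not counting the POLL_INTERVAL pauses between cycles.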
Polling-only mode
The script already polls pending jobs as a webhook fallback. If you'd rather skip webhooks entirely, remove the webhook server, drop webhook_url from the transcribe request, and lean on the polling thread alone. See the Get Result endpoint for details.
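A polling-only loop might look like the following minimal sketch. It assumes the status response has the shape the watcher script already relies on (status, result.full_transcript, error). The fetch_status callable is injected so the loop can be tried without network access, and the function name and defaults are hypothetical.

```python
import time


def wait_for_result(transcription_id, fetch_status, interval=30, max_polls=120,
                    sleep=time.sleep):
    """Poll until the job completes or fails; return (status, payload)."""
    for _ in range(max_polls):
        data = fetch_status(transcription_id)
        status = data.get("status")
        if status == "completed":
            return status, data.get("result", {}).get("full_transcript", "")
        if status == "failed":
            return status, data.get("error", {})
        sleep(interval)  # job still in progress; wait before the next check
    return "timeout", None


# Example wiring with the same endpoint and header the watcher script uses:
#   import requests
#   fetch = lambda tid: requests.get(
#       f"https://api.scriptivox.com/v1/transcribe/{tid}",
#       headers={"Authorization": API_KEY},
#   ).json()
#   status, payload = wait_for_result("YOUR_TRANSCRIPTION_ID", fetch)
```

For many jobs at once, the watcher's round-robin polling thread is a better fit than one blocking loop per job, since it shares a single rate limit across all pending jobs.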
Filenames must be ASCII
The Scriptivox API rejects filenames with non-ASCII characters (accents, CJK, emoji) and reserved characters (/, \, <, >, ", ', backtick). The watcher will surface these as INVALID_FILENAME and move the file to failed/. Rename source files to ASCII before dropping them in input/. See filename rules for details.
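A pre-check before dropping files into input/ could look like the sketch below. It encodes only the rules stated above (ASCII only, none of the listed reserved characters); the API may enforce additional rules. Stripping accents and replacing other disallowed characters with an underscore is an assumption about what a reasonable rename looks like, and both function names are hypothetical.

```python
import unicodedata

# Reserved characters listed above: / \ < > " ' and backtick
RESERVED = set('/\\<>"\'`')


def is_valid_filename(name):
    """True if the name is non-empty, pure ASCII, and has no reserved characters."""
    return bool(name) and name.isascii() and not (set(name) & RESERVED)


def to_ascii_filename(name):
    """Best-effort rename: strip accents, replace anything else disallowed with '_'."""
    decomposed = unicodedata.normalize("NFKD", name)
    cleaned = []
    for ch in decomposed:
        if unicodedata.combining(ch):
            continue  # drop accent marks left over from decomposition
        cleaned.append(ch if ch.isascii() and ch not in RESERVED else "_")
    return "".join(cleaned)


print(is_valid_filename("meeting.mp3"))  # True
print(is_valid_filename("réunion.mp3"))  # False
print(to_ascii_filename("réunion.mp3"))  # reunion.mp3
```

Note that two different source names can map to the same cleaned name (for example, two CJK titles of equal length), so check for collisions before renaming in bulk.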