Sora API Tutorial 2026: Complete Developer Integration Guide
Learn to integrate OpenAI Sora API into your apps. Full tutorial with code examples in Python, Node.js, authentication, webhooks, and best practices.
Sora API Tutorial: Complete Developer Integration Guide 2026
Want to integrate OpenAI Sora into your application? This comprehensive tutorial covers everything developers need to know about the Sora API - from authentication to production deployment.
We'll walk through code examples in both Python and Node.js, covering text-to-video generation, status checking, webhooks, and best practices.
Table of Contents
- Prerequisites
- Getting API Access
- Authentication
- Basic Video Generation
- Checking Generation Status
- Downloading Generated Videos
- Advanced Features
- Webhooks Integration
- Error Handling
- Rate Limits & Best Practices
- Production Deployment
Prerequisites
Before starting, ensure you have:
- OpenAI account with API access
- ChatGPT Pro subscription (for full Sora access)
- Python 3.8+ or Node.js 18+
- Basic REST API knowledge
- Understanding of async programming
Required Packages
Python:
pip install openai requests python-dotenv aiohttp
Node.js:
npm install openai axios dotenv
Getting API Access
Step 1: Create OpenAI Account
- Visit platform.openai.com
- Sign up or log in
- Navigate to API Keys section
Step 2: Generate API Key
- Click "Create new secret key"
- Name it (e.g., "Sora Integration")
- Copy and save securely (shown only once!)
Step 3: Verify Sora Access
Sora API requires:
- Active ChatGPT Pro subscription ($200/mo)
- Or enterprise API agreement
Step 4: Set Up Environment
Create .env file:
OPENAI_API_KEY=sk-your-api-key-here
SORA_WEBHOOK_SECRET=your-webhook-secret
⚠️ Never commit API keys to version control!
Authentication
Python Setup
import os
from dotenv import load_dotenv
import openai
load_dotenv()
client = openai.OpenAI(
api_key=os.getenv("OPENAI_API_KEY")
)
Node.js Setup
require('dotenv').config();
const OpenAI = require('openai');
const client = new OpenAI({
apiKey: process.env.OPENAI_API_KEY
});
Verify Connection
Python:
def verify_connection():
try:
models = client.models.list()
print("✓ Connected to OpenAI API")
return True
except Exception as e:
print(f"✗ Connection failed: {e}")
return False
Node.js:
async function verifyConnection() {
try {
const models = await client.models.list();
console.log("✓ Connected to OpenAI API");
return true;
} catch (error) {
console.error(`✗ Connection failed: ${error.message}`);
return false;
}
}
Basic Video Generation
API Endpoint
POST https://api.openai.com/v1/videos/generations
Request Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
prompt | string | Yes | Video description |
duration | integer | No | Length in seconds (5-60) |
resolution | string | No | "720p" or "1080p" |
aspect_ratio | string | No | "16:9", "9:16", "1:1" |
style | string | No | Style preset |
Python Implementation
import time
def generate_video(
prompt: str,
duration: int = 10,
resolution: str = "1080p",
aspect_ratio: str = "16:9"
) -> dict:
"""
Generate a video from text prompt.
Args:
prompt: Description of the video to generate
duration: Video length in seconds (5-60)
resolution: "720p" or "1080p"
aspect_ratio: "16:9", "9:16", or "1:1"
Returns:
dict with generation_id and status
"""
try:
response = client.videos.generate(
model="sora-1",
prompt=prompt,
duration=duration,
resolution=resolution,
aspect_ratio=aspect_ratio
)
return {
"success": True,
"generation_id": response.id,
"status": response.status,
"estimated_time": response.estimated_time
}
except openai.APIError as e:
return {
"success": False,
"error": str(e),
"error_type": type(e).__name__
}
# Example usage
result = generate_video(
prompt="A golden retriever running through a meadow at sunset, cinematic",
duration=15,
resolution="1080p"
)
print(f"Generation ID: {result.get('generation_id')}")
Node.js Implementation
async function generateVideo({
prompt,
duration = 10,
resolution = "1080p",
aspectRatio = "16:9"
}) {
/**
* Generate a video from text prompt.
* @param {string} prompt - Description of the video
* @param {number} duration - Length in seconds (5-60)
* @param {string} resolution - "720p" or "1080p"
* @param {string} aspectRatio - "16:9", "9:16", or "1:1"
* @returns {Object} Generation result
*/
try {
const response = await client.videos.generate({
model: "sora-1",
prompt: prompt,
duration: duration,
resolution: resolution,
aspect_ratio: aspectRatio
});
return {
success: true,
generationId: response.id,
status: response.status,
estimatedTime: response.estimated_time
};
} catch (error) {
return {
success: false,
error: error.message,
errorType: error.constructor.name
};
}
}
// Example usage
const result = await generateVideo({
prompt: "A golden retriever running through a meadow at sunset, cinematic",
duration: 15,
resolution: "1080p"
});
console.log(`Generation ID: ${result.generationId}`);
Checking Generation Status
Video generation is asynchronous. Poll for status:
Status Values
| Status | Meaning |
|---|---|
pending | Queued for processing |
processing | Currently generating |
completed | Ready to download |
failed | Generation failed |
Python Status Checker
def check_status(generation_id: str) -> dict:
"""Check the status of a video generation."""
try:
response = client.videos.retrieve(generation_id)
result = {
"status": response.status,
"progress": response.progress, # 0-100
}
if response.status == "completed":
result["video_url"] = response.video_url
result["thumbnail_url"] = response.thumbnail_url
result["duration"] = response.duration
elif response.status == "failed":
result["error"] = response.error_message
return result
except Exception as e:
return {"error": str(e)}
def wait_for_completion(generation_id: str, timeout: int = 300) -> dict:
"""Wait for video generation to complete."""
start_time = time.time()
while time.time() - start_time < timeout:
status = check_status(generation_id)
if status.get("status") == "completed":
return status
if status.get("status") == "failed":
return status
print(f"Progress: {status.get('progress', 0)}%")
time.sleep(5) # Poll every 5 seconds
return {"error": "Timeout waiting for generation"}
Node.js Status Checker
async function checkStatus(generationId) {
try {
const response = await client.videos.retrieve(generationId);
const result = {
status: response.status,
progress: response.progress
};
if (response.status === "completed") {
result.videoUrl = response.video_url;
result.thumbnailUrl = response.thumbnail_url;
result.duration = response.duration;
} else if (response.status === "failed") {
result.error = response.error_message;
}
return result;
} catch (error) {
return { error: error.message };
}
}
async function waitForCompletion(generationId, timeout = 300000) {
const startTime = Date.now();
while (Date.now() - startTime < timeout) {
const status = await checkStatus(generationId);
if (status.status === "completed") return status;
if (status.status === "failed") return status;
console.log(`Progress: ${status.progress || 0}%`);
await new Promise(r => setTimeout(r, 5000));
}
return { error: "Timeout waiting for generation" };
}
Downloading Generated Videos
Python Download
import requests
def download_video(video_url: str, output_path: str) -> bool:
"""Download generated video to local file."""
try:
response = requests.get(video_url, stream=True)
response.raise_for_status()
with open(output_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"✓ Downloaded to {output_path}")
return True
except Exception as e:
print(f"✗ Download failed: {e}")
return False
# Complete workflow example
def generate_and_download(prompt: str, output_path: str):
# Generate
result = generate_video(prompt, duration=10)
if not result.get("success"):
print(f"Generation failed: {result.get('error')}")
return False
# Wait for completion
generation_id = result["generation_id"]
status = wait_for_completion(generation_id)
if status.get("status") != "completed":
print(f"Generation failed: {status.get('error')}")
return False
# Download
return download_video(status["video_url"], output_path)
Node.js Download
const fs = require('fs');
const axios = require('axios');
async function downloadVideo(videoUrl, outputPath) {
try {
const response = await axios({
method: 'GET',
url: videoUrl,
responseType: 'stream'
});
const writer = fs.createWriteStream(outputPath);
response.data.pipe(writer);
return new Promise((resolve, reject) => {
writer.on('finish', () => {
console.log(`✓ Downloaded to ${outputPath}`);
resolve(true);
});
writer.on('error', reject);
});
} catch (error) {
console.error(`✗ Download failed: ${error.message}`);
return false;
}
}
Advanced Features
Image-to-Video
def image_to_video(
image_url: str,
motion_prompt: str,
duration: int = 5
) -> dict:
"""Animate an image into a video."""
return client.videos.generate(
model="sora-1",
image_url=image_url,
prompt=motion_prompt,
duration=duration,
mode="image-to-video"
)
Video Extension
def extend_video(
video_id: str,
extension_prompt: str,
additional_seconds: int = 10
) -> dict:
"""Extend an existing video."""
return client.videos.extend(
video_id=video_id,
prompt=extension_prompt,
duration=additional_seconds
)
Batch Generation
import asyncio
import aiohttp
async def batch_generate(prompts: list[str]) -> list[dict]:
"""Generate multiple videos concurrently."""
tasks = []
for prompt in prompts:
task = generate_video_async(prompt)
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
Webhooks Integration
Instead of polling, use webhooks for status updates:
Setting Up Webhook Endpoint
Express.js:
const express = require('express');
const crypto = require('crypto');
const app = express();
app.use(express.json());
app.post('/webhook/sora', (req, res) => {
// Verify webhook signature
const signature = req.headers['x-openai-signature'];
const payload = JSON.stringify(req.body);
const expectedSignature = crypto
.createHmac('sha256', process.env.SORA_WEBHOOK_SECRET)
.update(payload)
.digest('hex');
if (signature !== expectedSignature) {
return res.status(401).send('Invalid signature');
}
// Handle webhook event
const event = req.body;
switch (event.type) {
case 'video.completed':
console.log(`Video ready: ${event.data.id}`);
handleVideoCompleted(event.data);
break;
case 'video.failed':
console.log(`Video failed: ${event.data.error}`);
handleVideoFailed(event.data);
break;
}
res.status(200).send('OK');
});
app.listen(3000);
Registering Webhook
def register_webhook(webhook_url: str):
"""Register webhook for video status updates."""
return client.webhooks.create(
url=webhook_url,
events=["video.completed", "video.failed", "video.progress"]
)
Error Handling
Common API Errors
| Error Code | Meaning | Solution |
|---|---|---|
| 400 | Bad request | Check prompt/parameters |
| 401 | Unauthorized | Verify API key |
| 403 | Forbidden | Check permissions/subscription |
| 429 | Rate limited | Implement backoff |
| 500 | Server error | Retry with backoff |
Robust Error Handler
import time
from functools import wraps
def retry_with_backoff(max_retries=3, base_delay=1):
"""Decorator for retry logic with exponential backoff."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except openai.RateLimitError:
if attempt < max_retries - 1:
delay = base_delay * (2 ** attempt)
print(f"Rate limited. Retrying in {delay}s...")
time.sleep(delay)
else:
raise
except openai.APIError as e:
if e.status_code >= 500 and attempt < max_retries - 1:
delay = base_delay * (2 ** attempt)
print(f"Server error. Retrying in {delay}s...")
time.sleep(delay)
else:
raise
return None
return wrapper
return decorator
@retry_with_backoff(max_retries=3)
def generate_video_safe(prompt: str, **kwargs):
return generate_video(prompt, **kwargs)
Rate Limits & Best Practices
Current Rate Limits (2026)
| Tier | Requests/min | Concurrent | Daily Limit |
|---|---|---|---|
| Standard | 10 | 3 | 100 |
| Pro | 30 | 10 | 500 |
| Enterprise | 100 | 50 | Unlimited |
Best Practices
-
Implement Retry Logic
- Use exponential backoff
- Handle rate limits gracefully
-
Cache Results
- Store generation IDs
- Cache video URLs (temporary)
-
Validate Prompts
- Check length before sending
- Filter prohibited content
-
Monitor Usage
- Track API calls
- Set up usage alerts
-
Secure Keys
- Use environment variables
- Rotate keys periodically
- Use separate keys for dev/prod
Example Rate Limiter
from collections import deque
from datetime import datetime, timedelta
import asyncio
class RateLimiter:
def __init__(self, max_requests: int, time_window: int):
self.max_requests = max_requests
self.time_window = time_window # seconds
self.requests = deque()
async def acquire(self):
now = datetime.now()
# Remove old requests
while self.requests and self.requests[0] < now - timedelta(seconds=self.time_window):
self.requests.popleft()
# Wait if at limit
if len(self.requests) >= self.max_requests:
wait_time = (self.requests[0] + timedelta(seconds=self.time_window) - now).total_seconds()
await asyncio.sleep(max(0, wait_time))
self.requests.append(now)
# Usage
limiter = RateLimiter(max_requests=10, time_window=60)
async def rate_limited_generate(prompt):
await limiter.acquire()
return await generate_video_async(prompt)
Production Deployment
Architecture Recommendations
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Client │────▶│ Your API │────▶│ Sora API │
└─────────────┘ └─────────────┘ └─────────────┘
│
▼
┌─────────────┐
│ Queue │ (Redis/SQS)
│ System │
└─────────────┘
│
▼
┌─────────────┐
│ Worker │
│ Process │
└─────────────┘
Production Checklist
- Use queue system for generation jobs
- Implement webhook handling
- Set up monitoring and alerting
- Configure auto-scaling for workers
- Implement proper logging
- Set up error tracking (Sentry, etc.)
- Create health check endpoints
- Document API for your team
- Set up staging environment
- Plan for API key rotation
Complete Example: Express.js API
const express = require('express');
const { OpenAI } = require('openai');
const Redis = require('ioredis');
const app = express();
const client = new OpenAI();
const redis = new Redis();
app.use(express.json());
// Generate video endpoint
app.post('/api/videos/generate', async (req, res) => {
try {
const { prompt, duration, resolution } = req.body;
// Validate
if (!prompt || prompt.length < 10) {
return res.status(400).json({ error: 'Invalid prompt' });
}
// Generate
const result = await client.videos.generate({
model: 'sora-1',
prompt,
duration: duration || 10,
resolution: resolution || '1080p'
});
// Cache generation ID
await redis.set(
`video:${result.id}`,
JSON.stringify({ status: 'processing', prompt }),
'EX',
3600
);
res.json({
success: true,
generationId: result.id,
estimatedTime: result.estimated_time
});
} catch (error) {
console.error('Generation error:', error);
res.status(500).json({ error: 'Generation failed' });
}
});
// Check status endpoint
app.get('/api/videos/:id/status', async (req, res) => {
try {
const { id } = req.params;
const status = await client.videos.retrieve(id);
res.json({
status: status.status,
progress: status.progress,
videoUrl: status.video_url
});
} catch (error) {
res.status(500).json({ error: 'Status check failed' });
}
});
app.listen(3000, () => {
console.log('Sora API server running on port 3000');
});
Resources
Need to Download Sora Videos?
For downloading generated videos without watermarks, use SoraSave.net - perfect for testing and showcasing your API integrations!
API documentation reflects January 2026 version. Check OpenAI docs for latest.