API Security
Comprehensive guide to API security best practices, including authentication, authorization, rate limiting, and protection against common attacks.
Overview
API security is crucial for protecting your trading data, account information, and preventing unauthorized access to your Vibe Trading account. This guide covers all aspects of API security from basic authentication to advanced threat protection.
Authentication Security
API Key Management
Secure Key Storage
import os
import keyring
from cryptography.fernet import Fernet
class SecureAPIKeyManager:
def __init__(self):
self.keyring_service = "vibe_trading"
def store_api_key(self, user_id, api_key, secret_key):
# Use system keyring for secure storage
keyring.set_password(self.keyring_service, f"{user_id}_api_key", api_key)
keyring.set_password(self.keyring_service, f"{user_id}_secret_key", secret_key)
def retrieve_api_key(self, user_id):
api_key = keyring.get_password(self.keyring_service, f"{user_id}_api_key")
secret_key = keyring.get_password(self.keyring_service, f"{user_id}_secret_key")
return api_key, secret_key
def delete_api_key(self, user_id):
keyring.delete_password(self.keyring_service, f"{user_id}_api_key")
keyring.delete_password(self.keyring_service, f"{user_id}_secret_key")
Key Rotation
import datetime
from datetime import timedelta
class APIKeyRotation:
def __init__(self, rotation_days=90):
self.rotation_days = rotation_days
def should_rotate_key(self, key_created_date):
return datetime.datetime.now() - key_created_date > timedelta(days=self.rotation_days)
def rotate_api_key(self, user_id):
# Generate new key pair
new_api_key = self.generate_api_key()
new_secret_key = self.generate_secret_key()
# Update in database
self.update_api_keys(user_id, new_api_key, new_secret_key)
# Notify user
self.notify_user(user_id, "API key rotated for security")
return new_api_key, new_secret_key
Request Signing
HMAC Signature Generation
import hmac
import hashlib
import time
import json
class SecureRequestSigner:
def __init__(self, secret_key):
self.secret_key = secret_key
def generate_signature(self, method, path, body, timestamp):
# Create message string
message = f"{method}{path}{body}{timestamp}"
# Generate HMAC-SHA256 signature
signature = hmac.new(
self.secret_key.encode('utf-8'),
message.encode('utf-8'),
hashlib.sha256
).hexdigest()
return signature
def verify_signature(self, method, path, body, timestamp, signature):
expected_signature = self.generate_signature(method, path, body, timestamp)
return hmac.compare_digest(signature, expected_signature)
Timestamp Validation
class TimestampValidator:
def __init__(self, tolerance_seconds=300): # 5 minutes
self.tolerance_seconds = tolerance_seconds
def is_valid_timestamp(self, timestamp):
current_time = int(time.time())
time_diff = abs(current_time - timestamp)
return time_diff <= self.tolerance_seconds
def validate_request_timestamp(self, request_timestamp):
if not self.is_valid_timestamp(request_timestamp):
raise ValueError("Request timestamp is outside acceptable time window")
Authorization Security
Permission-Based Access Control
Role-Based Permissions
class PermissionManager:
def __init__(self):
self.permissions = {
'read:account': ['viewer', 'trader', 'admin'],
'write:orders': ['trader', 'admin'],
'read:positions': ['viewer', 'trader', 'admin'],
'write:positions': ['trader', 'admin'],
'read:transactions': ['viewer', 'trader', 'admin'],
'write:withdrawals': ['admin'],
'read:api_keys': ['admin'],
'write:api_keys': ['admin']
}
def has_permission(self, user_role, permission):
return permission in self.permissions and user_role in self.permissions[permission]
def check_permission(self, user_role, permission):
if not self.has_permission(user_role, permission):
raise PermissionError(f"User role '{user_role}' does not have permission '{permission}'")
Resource-Level Authorization
class ResourceAuthorization:
def __init__(self):
self.resource_owners = {}
def check_resource_access(self, user_id, resource_id, action):
# Check if user owns the resource
if resource_id in self.resource_owners:
owner_id = self.resource_owners[resource_id]
if owner_id != user_id:
raise PermissionError("User does not have access to this resource")
# Check action-specific permissions
if action == 'delete' and not self.is_admin(user_id):
raise PermissionError("Only admins can delete resources")
def is_admin(self, user_id):
# Check if user has admin role
return self.get_user_role(user_id) == 'admin'
IP Whitelisting
IP Restriction Management
import ipaddress
from typing import List
class IPWhitelistManager:
def __init__(self):
self.allowed_ips = set()
self.allowed_ranges = set()
def add_ip(self, ip_address):
self.allowed_ips.add(ip_address)
def add_ip_range(self, ip_range):
network = ipaddress.ip_network(ip_range)
self.allowed_ranges.add(network)
def is_ip_allowed(self, ip_address):
ip = ipaddress.ip_address(ip_address)
# Check exact IP match
if ip_address in self.allowed_ips:
return True
# Check IP range match
for network in self.allowed_ranges:
if ip in network:
return True
return False
def validate_request_ip(self, request_ip):
if not self.is_ip_allowed(request_ip):
raise PermissionError(f"IP address {request_ip} is not whitelisted")
Rate Limiting
Rate Limit Implementation
Token Bucket Algorithm
import time
from collections import defaultdict
class TokenBucketRateLimiter:
def __init__(self, capacity, refill_rate):
self.capacity = capacity
self.refill_rate = refill_rate
self.buckets = defaultdict(lambda: {'tokens': capacity, 'last_refill': time.time()})
def is_allowed(self, user_id):
bucket = self.buckets[user_id]
now = time.time()
# Refill tokens
time_passed = now - bucket['last_refill']
tokens_to_add = time_passed * self.refill_rate
bucket['tokens'] = min(self.capacity, bucket['tokens'] + tokens_to_add)
bucket['last_refill'] = now
# Check if request is allowed
if bucket['tokens'] >= 1:
bucket['tokens'] -= 1
return True
return False
def get_remaining_tokens(self, user_id):
bucket = self.buckets[user_id]
now = time.time()
# Refill tokens
time_passed = now - bucket['last_refill']
tokens_to_add = time_passed * self.refill_rate
bucket['tokens'] = min(self.capacity, bucket['tokens'] + tokens_to_add)
bucket['last_refill'] = now
return int(bucket['tokens'])
Sliding Window Rate Limiter
from collections import deque
import time
class SlidingWindowRateLimiter:
def __init__(self, window_size, max_requests):
self.window_size = window_size # seconds
self.max_requests = max_requests
self.requests = defaultdict(deque)
def is_allowed(self, user_id):
now = time.time()
user_requests = self.requests[user_id]
# Remove old requests outside the window
while user_requests and user_requests[0] <= now - self.window_size:
user_requests.popleft()
# Check if under limit
if len(user_requests) < self.max_requests:
user_requests.append(now)
return True
return False
def get_remaining_requests(self, user_id):
now = time.time()
user_requests = self.requests[user_id]
# Remove old requests
while user_requests and user_requests[0] <= now - self.window_size:
user_requests.popleft()
return max(0, self.max_requests - len(user_requests))
Rate Limit Headers
Response Headers
class RateLimitHeaders:
def __init__(self, rate_limiter):
self.rate_limiter = rate_limiter
def add_rate_limit_headers(self, response, user_id):
remaining = self.rate_limiter.get_remaining_tokens(user_id)
reset_time = int(time.time() + 60) # Reset in 1 minute
response.headers['X-RateLimit-Limit'] = str(self.rate_limiter.capacity)
response.headers['X-RateLimit-Remaining'] = str(remaining)
response.headers['X-RateLimit-Reset'] = str(reset_time)
return response
Input Validation
Request Validation
Schema Validation
from jsonschema import validate, ValidationError
import json
class RequestValidator:
def __init__(self):
self.schemas = {
'create_order': {
"type": "object",
"properties": {
"symbol": {"type": "string", "pattern": "^[A-Z]+/[A-Z]+$"},
"side": {"type": "string", "enum": ["buy", "sell"]},
"type": {"type": "string", "enum": ["market", "limit", "stop"]},
"quantity": {"type": "number", "minimum": 0.0001},
"price": {"type": "number", "minimum": 0.01}
},
"required": ["symbol", "side", "type", "quantity"]
}
}
def validate_request(self, request_data, schema_name):
if schema_name not in self.schemas:
raise ValueError(f"Unknown schema: {schema_name}")
try:
validate(request_data, self.schemas[schema_name])
except ValidationError as e:
raise ValueError(f"Validation error: {e.message}")
def sanitize_input(self, input_data):
# Remove potentially dangerous characters
if isinstance(input_data, str):
return input_data.replace('<', '<').replace('>', '>')
return input_data
SQL Injection Prevention
import re
from typing import Any
class SQLInjectionPrevention:
def __init__(self):
self.dangerous_patterns = [
r"(\b(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER|EXEC|UNION)\b)",
r"(\b(OR|AND)\s+\d+\s*=\s*\d+)",
r"(\b(OR|AND)\s+'.*'\s*=\s*'.*')",
r"(--|#|\/\*|\*\/)",
r"(\b(SCRIPT|JAVASCRIPT|VBSCRIPT)\b)"
]
def detect_sql_injection(self, input_data):
if isinstance(input_data, str):
for pattern in self.dangerous_patterns:
if re.search(pattern, input_data, re.IGNORECASE):
return True
return False
def sanitize_sql_input(self, input_data):
if self.detect_sql_injection(input_data):
raise ValueError("Potential SQL injection detected")
# Use parameterized queries
return input_data
Encryption
Data Encryption
AES Encryption
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import os
class DataEncryption:
def __init__(self, password):
self.password = password.encode()
self.salt = os.urandom(16)
self.key = self._derive_key()
def _derive_key(self):
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=self.salt,
iterations=100000,
)
key = base64.urlsafe_b64encode(kdf.derive(self.password))
return key
def encrypt(self, data):
f = Fernet(self.key)
encrypted_data = f.encrypt(data.encode())
return encrypted_data
def decrypt(self, encrypted_data):
f = Fernet(self.key)
decrypted_data = f.decrypt(encrypted_data)
return decrypted_data.decode()
Field-Level Encryption
class FieldEncryption:
def __init__(self, encryption_key):
self.encryption_key = encryption_key
self.fernet = Fernet(encryption_key)
def encrypt_field(self, field_value):
if field_value is None:
return None
return self.fernet.encrypt(str(field_value).encode()).decode()
def decrypt_field(self, encrypted_value):
if encrypted_value is None:
return None
return self.fernet.decrypt(encrypted_value.encode()).decode()
def encrypt_sensitive_fields(self, data, sensitive_fields):
encrypted_data = data.copy()
for field in sensitive_fields:
if field in encrypted_data:
encrypted_data[field] = self.encrypt_field(encrypted_data[field])
return encrypted_data
Security Headers
HTTP Security Headers
Security Header Implementation
from flask import Flask, make_response
class SecurityHeaders:
def __init__(self, app):
self.app = app
self.setup_security_headers()
def setup_security_headers(self):
@self.app.after_request
def add_security_headers(response):
# Prevent clickjacking
response.headers['X-Frame-Options'] = 'DENY'
# Prevent MIME type sniffing
response.headers['X-Content-Type-Options'] = 'nosniff'
# Enable XSS protection
response.headers['X-XSS-Protection'] = '1; mode=block'
# Strict Transport Security
response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
# Content Security Policy
response.headers['Content-Security-Policy'] = "default-src 'self'"
# Referrer Policy
response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'
return response
Audit Logging
Security Event Logging
Audit Logger
import logging
import json
from datetime import datetime
class SecurityAuditLogger:
def __init__(self):
self.logger = logging.getLogger('security_audit')
self.logger.setLevel(logging.INFO)
# Create file handler
handler = logging.FileHandler('security_audit.log')
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def log_authentication_attempt(self, user_id, ip_address, success):
event = {
'event_type': 'authentication_attempt',
'user_id': user_id,
'ip_address': ip_address,
'success': success,
'timestamp': datetime.utcnow().isoformat()
}
self.logger.info(json.dumps(event))
def log_api_request(self, user_id, endpoint, method, ip_address, success):
event = {
'event_type': 'api_request',
'user_id': user_id,
'endpoint': endpoint,
'method': method,
'ip_address': ip_address,
'success': success,
'timestamp': datetime.utcnow().isoformat()
}
self.logger.info(json.dumps(event))
def log_security_violation(self, violation_type, user_id, ip_address, details):
event = {
'event_type': 'security_violation',
'violation_type': violation_type,
'user_id': user_id,
'ip_address': ip_address,
'details': details,
'timestamp': datetime.utcnow().isoformat()
}
self.logger.warning(json.dumps(event))
Threat Detection
Anomaly Detection
Behavioral Analysis
from collections import defaultdict
import statistics
class BehavioralAnalyzer:
def __init__(self):
self.user_patterns = defaultdict(list)
self.suspicious_thresholds = {
'requests_per_minute': 100,
'unusual_hours': True,
'geographic_anomaly': True
}
def analyze_request_pattern(self, user_id, request_data):
pattern = {
'timestamp': request_data['timestamp'],
'ip_address': request_data['ip_address'],
'endpoint': request_data['endpoint'],
'user_agent': request_data['user_agent']
}
self.user_patterns[user_id].append(pattern)
# Keep only last 1000 requests
if len(self.user_patterns[user_id]) > 1000:
self.user_patterns[user_id] = self.user_patterns[user_id][-1000:]
return self.detect_anomalies(user_id, pattern)
def detect_anomalies(self, user_id, current_pattern):
anomalies = []
user_history = self.user_patterns[user_id]
if len(user_history) < 10:
return anomalies
# Check request frequency
recent_requests = [r for r in user_history[-10:] if r['timestamp'] > current_pattern['timestamp'] - 60]
if len(recent_requests) > self.suspicious_thresholds['requests_per_minute']:
anomalies.append('high_request_frequency')
# Check unusual hours
hour = datetime.fromtimestamp(current_pattern['timestamp']).hour
if hour < 6 or hour > 22:
anomalies.append('unusual_hours')
# Check IP changes
recent_ips = set(r['ip_address'] for r in user_history[-10:])
if len(recent_ips) > 3:
anomalies.append('multiple_ip_addresses')
return anomalies
Best Practices
Security Checklist
API Security Checklist
- ✅ Strong Authentication: Use HMAC-SHA256 signatures
- ✅ Key Management: Rotate API keys regularly
- ✅ Rate Limiting: Implement proper rate limiting
- ✅ Input Validation: Validate all input data
- ✅ Encryption: Encrypt sensitive data
- ✅ Audit Logging: Log all security events
- ✅ Error Handling: Don't expose sensitive information
- ✅ Security Headers: Use proper HTTP security headers
- ✅ IP Whitelisting: Restrict access by IP when possible
- ✅ Monitoring: Monitor for suspicious activity
Implementation Guidelines
- Use HTTPS: Always use encrypted connections
- Validate Everything: Never trust user input
- Principle of Least Privilege: Grant minimum necessary permissions
- Defense in Depth: Implement multiple security layers
- Regular Updates: Keep security measures updated
- Incident Response: Have a plan for security incidents
Ready to secure your wallet? Check out our Wallet Security guide or explore Data Protection.