API Security

Comprehensive guide to API security best practices, including authentication, authorization, rate limiting, and protection against common attacks.

Overview

API security is crucial for protecting your trading data and account information and for preventing unauthorized access to your Vibe Trading account. This guide covers all aspects of API security, from basic authentication to advanced threat protection.

Authentication Security

API Key Management

Secure Key Storage

import os
import keyring
from cryptography.fernet import Fernet

class SecureAPIKeyManager:
    """Store, fetch and remove a user's API credentials via the ``keyring`` module.

    Each user gets two entries under the ``"vibe_trading"`` service name:
    ``"<user_id>_api_key"`` and ``"<user_id>_secret_key"``.
    """

    def __init__(self):
        # Service name under which every credential entry is filed.
        self.keyring_service = "vibe_trading"

    def store_api_key(self, user_id, api_key, secret_key):
        """Save both halves of the key pair for ``user_id``."""
        service = self.keyring_service
        keyring.set_password(service, f"{user_id}_api_key", api_key)
        keyring.set_password(service, f"{user_id}_secret_key", secret_key)

    def retrieve_api_key(self, user_id):
        """Return the ``(api_key, secret_key)`` pair looked up for ``user_id``."""
        service = self.keyring_service
        return (
            keyring.get_password(service, f"{user_id}_api_key"),
            keyring.get_password(service, f"{user_id}_secret_key"),
        )

    def delete_api_key(self, user_id):
        """Remove both entries for ``user_id``, API key first, then secret key."""
        service = self.keyring_service
        keyring.delete_password(service, f"{user_id}_api_key")
        keyring.delete_password(service, f"{user_id}_secret_key")

Key Rotation

import datetime
from datetime import timedelta

class APIKeyRotation:
    """Decide when an API key pair is due for rotation and carry the rotation out.

    ``rotate_api_key`` calls four methods that are not defined on this class:
    ``generate_api_key``, ``generate_secret_key``, ``update_api_keys`` and
    ``notify_user``. They have to be provided by a subclass or mixin.
    """

    def __init__(self, rotation_days=90):
        # Maximum key age, in days, before rotation is required.
        self.rotation_days = rotation_days

    def should_rotate_key(self, key_created_date):
        """Return True when the key is older than ``rotation_days``.

        Args:
            key_created_date: ``datetime.datetime`` at which the key was
                created. A naive value is compared against the naive local
                time; a timezone-aware value is compared against the current
                time in that same timezone.
        """
        # Imported locally under a private alias: elsewhere this file binds the
        # bare name ``datetime`` to the class (``from datetime import datetime``),
        # which would make ``datetime.datetime`` fail here.
        import datetime as _dt

        # ``now(tz=None)`` yields the naive local time, so naive inputs behave as
        # before, while aware inputs no longer raise TypeError on subtraction.
        now = _dt.datetime.now(tz=key_created_date.tzinfo)
        return now - key_created_date > _dt.timedelta(days=self.rotation_days)

    def rotate_api_key(self, user_id):
        """Issue a new key pair for ``user_id``, persist it and notify the user.

        Returns:
            The ``(new_api_key, new_secret_key)`` tuple.
        """
        # Generate new key pair
        new_api_key = self.generate_api_key()
        new_secret_key = self.generate_secret_key()

        # Persist first, so the user is only notified after the update call returns.
        self.update_api_keys(user_id, new_api_key, new_secret_key)

        # Notify user
        self.notify_user(user_id, "API key rotated for security")

        return new_api_key, new_secret_key

Request Signing

HMAC Signature Generation

import hmac
import hashlib
import time
import json

class SecureRequestSigner:
    """Create and verify HMAC-SHA256 request signatures with a shared secret."""

    def __init__(self, secret_key):
        # Secret as ``str``; it is UTF-8 encoded each time a signature is computed.
        self.secret_key = secret_key

    def generate_signature(self, method, path, body, timestamp):
        """Return the hex HMAC-SHA256 of ``method + path + body + timestamp``.

        The four parts are formatted into one string with no separator.
        """
        # NOTE(review): without delimiters different (method, path, body,
        # timestamp) tuples can produce the same message. The format is left
        # unchanged because the counterpart must compute the same string.
        message = f"{method}{path}{body}{timestamp}"

        # Generate HMAC-SHA256 signature
        return hmac.new(
            self.secret_key.encode('utf-8'),
            message.encode('utf-8'),
            hashlib.sha256,
        ).hexdigest()

    def verify_signature(self, method, path, body, timestamp, signature):
        """Return True when ``signature`` matches the expected signature.

        A signature that is not a ``str`` or that contains non-ASCII characters
        is rejected with False; ``hmac.compare_digest`` would otherwise raise
        ``TypeError`` on such attacker-controlled input.
        """
        if not isinstance(signature, str):
            return False
        try:
            provided = signature.encode('ascii')
        except UnicodeEncodeError:
            return False

        expected = self.generate_signature(method, path, body, timestamp).encode('ascii')
        # Constant-time comparison to avoid leaking how many characters matched.
        return hmac.compare_digest(provided, expected)

Timestamp Validation

class TimestampValidator:
    """Reject requests whose timestamp is too far from the current time."""

    def __init__(self, tolerance_seconds=300):  # 5 minutes
        # Maximum allowed distance, in seconds, in either direction.
        self.tolerance_seconds = tolerance_seconds

    def is_valid_timestamp(self, timestamp):
        """Return True when ``timestamp`` is within the tolerance of now.

        Args:
            timestamp: Seconds since the epoch, as a number or a numeric
                string. Anything that cannot be read as a number is treated
                as invalid instead of raising.
        """
        try:
            request_time = float(timestamp)
        except (TypeError, ValueError, OverflowError):
            return False

        current_time = int(time.time())
        # NaN compares False against everything, so it is rejected here as well.
        return abs(current_time - request_time) <= self.tolerance_seconds

    def validate_request_timestamp(self, request_timestamp):
        """Raise ``ValueError`` when ``request_timestamp`` is not acceptable."""
        if not self.is_valid_timestamp(request_timestamp):
            raise ValueError("Request timestamp is outside acceptable time window")

Authorization Security

Permission-Based Access Control

Role-Based Permissions

class PermissionManager:
    """Map permission names to the roles that are granted them."""

    def __init__(self):
        # permission name -> list of roles holding that permission
        self.permissions = {
            'read:account': ['viewer', 'trader', 'admin'],
            'write:orders': ['trader', 'admin'],
            'read:positions': ['viewer', 'trader', 'admin'],
            'write:positions': ['trader', 'admin'],
            'read:transactions': ['viewer', 'trader', 'admin'],
            'write:withdrawals': ['admin'],
            'read:api_keys': ['admin'],
            'write:api_keys': ['admin'],
        }

    def has_permission(self, user_role, permission):
        """Return True when ``permission`` is known and granted to ``user_role``."""
        if permission not in self.permissions:
            return False
        granted_roles = self.permissions[permission]
        return user_role in granted_roles

    def check_permission(self, user_role, permission):
        """Raise ``PermissionError`` unless ``user_role`` holds ``permission``."""
        if self.has_permission(user_role, permission):
            return
        raise PermissionError(f"User role '{user_role}' does not have permission '{permission}'")

Resource-Level Authorization

class ResourceAuthorization:
    """Ownership and action checks for individual resources."""

    def __init__(self):
        # resource_id -> owning user_id
        self.resource_owners = {}

    def check_resource_access(self, user_id, resource_id, action):
        """Raise ``PermissionError`` when ``user_id`` may not perform ``action``.

        Returns None when access is allowed.
        """
        owners = self.resource_owners

        # NOTE(review): a resource with no entry in ``resource_owners`` skips the
        # ownership check entirely - confirm that this fail-open behaviour is intended.
        if resource_id in owners and owners[resource_id] != user_id:
            raise PermissionError("User does not have access to this resource")

        # Only the 'delete' action carries an additional admin requirement.
        if action != 'delete':
            return
        if not self.is_admin(user_id):
            raise PermissionError("Only admins can delete resources")

    def is_admin(self, user_id):
        """Return True when the role looked up for ``user_id`` is ``'admin'``."""
        # ``get_user_role`` is not defined in this class; it must come from a
        # subclass or mixin.
        role = self.get_user_role(user_id)
        return role == 'admin'

IP Whitelisting

IP Restriction Management

import ipaddress
from typing import List

class IPWhitelistManager:
    """Allow-list of individual IP addresses and CIDR ranges."""

    def __init__(self):
        # Individual addresses, stored as canonical strings (see ``add_ip``).
        self.allowed_ips = set()
        # ``ipaddress`` network objects added through ``add_ip_range``.
        self.allowed_ranges = set()

    def add_ip(self, ip_address):
        """Allow a single address.

        The address is parsed and stored in canonical text form, so equivalent
        spellings (for example ``::1`` and ``0:0:0:0:0:0:0:1``) match later.

        Raises:
            ValueError: If ``ip_address`` is not a valid IPv4/IPv6 address.
        """
        self.allowed_ips.add(str(ipaddress.ip_address(ip_address)))

    def add_ip_range(self, ip_range):
        """Allow every address inside the CIDR network ``ip_range``.

        Raises:
            ValueError: If ``ip_range`` is not a valid network.
        """
        network = ipaddress.ip_network(ip_range)
        self.allowed_ranges.add(network)

    def is_ip_allowed(self, ip_address):
        """Return True when ``ip_address`` is allowed; malformed input is denied."""
        try:
            ip = ipaddress.ip_address(ip_address)
        except ValueError:
            # An unparseable client address can never be on the allow-list.
            return False

        # Check exact IP match. The raw value is also tried so that entries put
        # into ``allowed_ips`` directly, in non-canonical form, keep matching.
        if str(ip) in self.allowed_ips or ip_address in self.allowed_ips:
            return True

        # Check IP range match
        return any(ip in network for network in self.allowed_ranges)

    def validate_request_ip(self, request_ip):
        """Raise ``PermissionError`` when ``request_ip`` is not allowed."""
        if not self.is_ip_allowed(request_ip):
            raise PermissionError(f"IP address {request_ip} is not whitelisted")

Rate Limiting

Rate Limit Implementation

Token Bucket Algorithm

import time
from collections import defaultdict

class TokenBucketRateLimiter:
    """Per-user token bucket: each request costs one token, tokens refill over time."""

    def __init__(self, capacity, refill_rate):
        """
        Args:
            capacity: Maximum number of tokens a bucket can hold; new buckets
                start full.
            refill_rate: Tokens added per second of elapsed time.
        """
        self.capacity = capacity
        self.refill_rate = refill_rate
        # user_id -> {'tokens': current balance, 'last_refill': time.time() of last refill}
        self.buckets = defaultdict(lambda: {'tokens': capacity, 'last_refill': time.time()})

    def _refill(self, user_id):
        """Top up the user's bucket for the elapsed time and return the bucket."""
        bucket = self.buckets[user_id]
        now = time.time()

        # time.time() can step backwards (clock adjustment); clamp so a negative
        # interval never removes tokens from the bucket.
        time_passed = max(0.0, now - bucket['last_refill'])
        tokens_to_add = time_passed * self.refill_rate
        bucket['tokens'] = min(self.capacity, bucket['tokens'] + tokens_to_add)
        bucket['last_refill'] = now
        return bucket

    def is_allowed(self, user_id):
        """Consume one token for ``user_id``; return False when none is available."""
        bucket = self._refill(user_id)

        # Check if request is allowed
        if bucket['tokens'] >= 1:
            bucket['tokens'] -= 1
            return True

        return False

    def get_remaining_tokens(self, user_id):
        """Return the whole number of tokens currently available to ``user_id``."""
        return int(self._refill(user_id)['tokens'])

Sliding Window Rate Limiter

from collections import deque
import time

class SlidingWindowRateLimiter:
    """Per-user limiter allowing at most ``max_requests`` per ``window_size`` seconds."""

    def __init__(self, window_size, max_requests):
        self.window_size = window_size  # seconds
        self.max_requests = max_requests
        # user_id -> deque of time.time() stamps of accepted requests, oldest first
        self.requests = defaultdict(deque)

    def _evict_expired(self, user_requests, now):
        """Drop timestamps from the left that are no longer inside the window."""
        cutoff = now - self.window_size
        while user_requests and user_requests[0] <= cutoff:
            user_requests.popleft()

    def is_allowed(self, user_id):
        """Record a request for ``user_id`` if under the limit; return whether it was accepted."""
        now = time.time()
        user_requests = self.requests[user_id]

        # Remove old requests outside the window
        self._evict_expired(user_requests, now)

        # Check if under limit
        if len(user_requests) < self.max_requests:
            user_requests.append(now)
            return True

        return False

    def get_remaining_requests(self, user_id):
        """Return how many more requests ``user_id`` may make in the current window."""
        # ``.get`` does not trigger the defaultdict factory, so querying a user
        # that never made a request does not allocate a deque for them.
        user_requests = self.requests.get(user_id)
        if user_requests is None:
            return max(0, self.max_requests)

        # Remove old requests
        self._evict_expired(user_requests, time.time())

        return max(0, self.max_requests - len(user_requests))

Rate Limit Headers

Response Headers

class RateLimitHeaders:
    """Attach ``X-RateLimit-*`` headers describing a user's rate-limit state."""

    def __init__(self, rate_limiter, reset_seconds=60):
        """
        Args:
            rate_limiter: Limiter exposing either ``capacity`` and
                ``get_remaining_tokens(user_id)`` or ``max_requests`` and
                ``get_remaining_requests(user_id)``.
            reset_seconds: Offset from now, in seconds, reported in
                ``X-RateLimit-Reset``. Defaults to one minute.
        """
        self.rate_limiter = rate_limiter
        self.reset_seconds = reset_seconds

    def _limit_and_remaining(self, user_id):
        """Return ``(limit, remaining)`` from whichever interface the limiter offers."""
        limiter = self.rate_limiter
        if hasattr(limiter, 'get_remaining_tokens'):
            return limiter.capacity, limiter.get_remaining_tokens(user_id)
        return limiter.max_requests, limiter.get_remaining_requests(user_id)

    def add_rate_limit_headers(self, response, user_id):
        """Set the three rate-limit headers on ``response`` and return it."""
        limit, remaining = self._limit_and_remaining(user_id)
        reset_time = int(time.time() + self.reset_seconds)

        response.headers['X-RateLimit-Limit'] = str(limit)
        response.headers['X-RateLimit-Remaining'] = str(remaining)
        response.headers['X-RateLimit-Reset'] = str(reset_time)

        return response

Input Validation

Request Validation

Schema Validation

from jsonschema import validate, ValidationError
import json

class RequestValidator:
    """Validate request payloads against named JSON schemas and escape text input."""

    def __init__(self):
        # schema name -> JSON Schema document
        self.schemas = {
            'create_order': {
                "type": "object",
                "properties": {
                    "symbol": {"type": "string", "pattern": "^[A-Z]+/[A-Z]+$"},
                    "side": {"type": "string", "enum": ["buy", "sell"]},
                    "type": {"type": "string", "enum": ["market", "limit", "stop"]},
                    "quantity": {"type": "number", "minimum": 0.0001},
                    "price": {"type": "number", "minimum": 0.01}
                },
                "required": ["symbol", "side", "type", "quantity"]
            }
        }

    def validate_request(self, request_data, schema_name):
        """Validate ``request_data`` against the schema registered as ``schema_name``.

        Raises:
            ValueError: If ``schema_name`` is unknown or validation fails. In
                the latter case the original ``ValidationError`` is chained as
                the cause.
        """
        if schema_name not in self.schemas:
            raise ValueError(f"Unknown schema: {schema_name}")

        try:
            validate(request_data, self.schemas[schema_name])
        except ValidationError as e:
            # Chain the cause so the failing path/validator stays inspectable.
            raise ValueError(f"Validation error: {e.message}") from e

    def sanitize_input(self, input_data):
        """Return ``input_data`` HTML-escaped when it is a string, unchanged otherwise.

        Escapes ``&``, ``<``, ``>``, ``"`` and ``'``. Escaping only the angle
        brackets leaves attribute-context injection and entity smuggling open.
        """
        if isinstance(input_data, str):
            # Local import keeps this block self-contained.
            import html

            return html.escape(input_data, quote=True)
        return input_data

SQL Injection Prevention

import re
from typing import Any

class SQLInjectionPrevention:
    """Pattern-based screening of string input for SQL-injection-like content."""

    def __init__(self):
        # Regular expressions searched case-insensitively by ``detect_sql_injection``.
        # NOTE(review): the first pattern matches any standalone SQL keyword, so
        # ordinary text containing e.g. "select" or "update" is flagged as well.
        self.dangerous_patterns = [
            r"(\b(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER|EXEC|UNION)\b)",
            r"(\b(OR|AND)\s+\d+\s*=\s*\d+)",
            r"(\b(OR|AND)\s+'.*'\s*=\s*'.*')",
            r"(--|#|\/\*|\*\/)",
            r"(\b(SCRIPT|JAVASCRIPT|VBSCRIPT)\b)",
        ]

    def detect_sql_injection(self, input_data):
        """Return True when ``input_data`` is a string matching any pattern."""
        if not isinstance(input_data, str):
            return False
        return any(
            re.search(pattern, input_data, re.IGNORECASE)
            for pattern in self.dangerous_patterns
        )

    def sanitize_sql_input(self, input_data):
        """Return ``input_data`` unchanged, or raise ``ValueError`` when it is flagged.

        The value itself is not modified; queries should still be parameterized.
        """
        flagged = self.detect_sql_injection(input_data)
        if flagged:
            raise ValueError("Potential SQL injection detected")
        return input_data

Encryption

Data Encryption

AES Encryption

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import os

class DataEncryption:
    """Password-based encryption of text with Fernet and a PBKDF2-derived key."""

    def __init__(self, password, salt=None, iterations=100000):
        """
        Args:
            password: Password as ``str``; encoded to bytes for key derivation.
            salt: Salt bytes for key derivation. When omitted, 16 random bytes
                are generated. The salt is not secret but must be stored (it
                is available as ``self.salt``) and passed back in to decrypt
                existing data from a new instance - a fresh random salt
                derives a different key.
            iterations: PBKDF2 iteration count. Must match the value used when
                the data was encrypted.
        """
        self.password = password.encode()
        self.salt = os.urandom(16) if salt is None else salt
        self.iterations = iterations
        self.key = self._derive_key()

    def _derive_key(self):
        """Derive the Fernet key from the password, salt and iteration count."""
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=self.salt,
            iterations=self.iterations,
        )
        # The 32 derived bytes are url-safe base64 encoded before being handed to Fernet.
        return base64.urlsafe_b64encode(kdf.derive(self.password))

    def encrypt(self, data):
        """Encrypt the string ``data`` and return the value produced by Fernet."""
        f = Fernet(self.key)
        return f.encrypt(data.encode())

    def decrypt(self, encrypted_data):
        """Decrypt ``encrypted_data`` and return the decoded string."""
        f = Fernet(self.key)
        return f.decrypt(encrypted_data).decode()

Field-Level Encryption

class FieldEncryption:
    """Encrypt and decrypt individual field values with a Fernet key."""

    def __init__(self, encryption_key):
        self.encryption_key = encryption_key
        # One Fernet instance is built here and used by every method.
        self.fernet = Fernet(encryption_key)

    def encrypt_field(self, field_value):
        """Return the encrypted text form of ``field_value``; None stays None.

        The value is converted with ``str()`` before encryption.
        """
        if field_value is None:
            return None
        plaintext = str(field_value).encode()
        token = self.fernet.encrypt(plaintext)
        return token.decode()

    def decrypt_field(self, encrypted_value):
        """Return the decrypted string for ``encrypted_value``; None stays None."""
        if encrypted_value is None:
            return None
        token = encrypted_value.encode()
        return self.fernet.decrypt(token).decode()

    def encrypt_sensitive_fields(self, data, sensitive_fields):
        """Return a shallow copy of ``data`` with the listed fields encrypted.

        Names in ``sensitive_fields`` that are absent from ``data`` are ignored.
        """
        result = data.copy()
        for name in sensitive_fields:
            if name not in result:
                continue
            result[name] = self.encrypt_field(result[name])
        return result

Security Headers

HTTP Security Headers

Security Header Implementation

from flask import Flask, make_response

class SecurityHeaders:
    """Register an ``after_request`` hook that sets security headers on each response."""

    def __init__(self, app):
        self.app = app
        self.setup_security_headers()

    def setup_security_headers(self):
        """Install the hook on ``self.app``."""
        # (header name, value) pairs, applied in this order.
        security_headers = (
            # Prevent clickjacking
            ('X-Frame-Options', 'DENY'),
            # Prevent MIME type sniffing
            ('X-Content-Type-Options', 'nosniff'),
            # Enable XSS protection
            ('X-XSS-Protection', '1; mode=block'),
            # Strict Transport Security
            ('Strict-Transport-Security', 'max-age=31536000; includeSubDomains'),
            # Content Security Policy
            ('Content-Security-Policy', "default-src 'self'"),
            # Referrer Policy
            ('Referrer-Policy', 'strict-origin-when-cross-origin'),
        )

        def add_security_headers(response):
            for header_name, header_value in security_headers:
                response.headers[header_name] = header_value
            return response

        # Same effect as decorating ``add_security_headers`` with ``@self.app.after_request``.
        self.app.after_request(add_security_headers)

Audit Logging

Security Event Logging

Audit Logger

import logging
import json
from datetime import datetime

class SecurityAuditLogger:
    """Write security events as JSON lines to the ``security_audit`` logger."""

    def __init__(self, log_file='security_audit.log'):
        """
        Args:
            log_file: Path of the audit log file. Defaults to
                ``security_audit.log`` in the current working directory.
        """
        import os  # local import keeps this block self-contained

        self.logger = logging.getLogger('security_audit')
        self.logger.setLevel(logging.INFO)

        # ``getLogger`` returns the same logger object on every call, so adding a
        # handler unconditionally would write each event once per instance
        # created and leak one open file per instance. Attach a file handler
        # only if this file is not being written to already.
        target = os.path.abspath(os.fspath(log_file))
        already_attached = any(
            isinstance(existing, logging.FileHandler) and existing.baseFilename == target
            for existing in self.logger.handlers
        )
        if not already_attached:
            handler = logging.FileHandler(target)
            formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

    def _log_event(self, level, event_type, fields):
        """Serialize one event to JSON and log it at ``level``.

        Key order in the output is ``event_type``, then ``fields`` in the
        order given, then ``timestamp``.
        """
        from datetime import datetime, timezone

        event = {'event_type': event_type}
        event.update(fields)
        # Naive UTC ISO-8601 string, the same format ``datetime.utcnow()``
        # produces, without calling that deprecated function.
        event['timestamp'] = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        self.logger.log(level, json.dumps(event))

    def log_authentication_attempt(self, user_id, ip_address, success):
        """Log an ``authentication_attempt`` event at INFO level."""
        self._log_event(logging.INFO, 'authentication_attempt', {
            'user_id': user_id,
            'ip_address': ip_address,
            'success': success,
        })

    def log_api_request(self, user_id, endpoint, method, ip_address, success):
        """Log an ``api_request`` event at INFO level."""
        self._log_event(logging.INFO, 'api_request', {
            'user_id': user_id,
            'endpoint': endpoint,
            'method': method,
            'ip_address': ip_address,
            'success': success,
        })

    def log_security_violation(self, violation_type, user_id, ip_address, details):
        """Log a ``security_violation`` event at WARNING level."""
        self._log_event(logging.WARNING, 'security_violation', {
            'violation_type': violation_type,
            'user_id': user_id,
            'ip_address': ip_address,
            'details': details,
        })

Threat Detection

Anomaly Detection

Behavioral Analysis

from collections import defaultdict
import statistics

class BehavioralAnalyzer:
    """Keep a per-user request history and flag simple behavioural anomalies."""

    def __init__(self):
        # user_id -> list of recorded request patterns, oldest first
        self.user_patterns = defaultdict(list)
        self.suspicious_thresholds = {
            'requests_per_minute': 100,
            'unusual_hours': True,
            'geographic_anomaly': True
        }

    def analyze_request_pattern(self, user_id, request_data):
        """Record one request and return the anomalies detected for it.

        Args:
            user_id: Key under which the request history is kept.
            request_data: Mapping providing ``timestamp``, ``ip_address``,
                ``endpoint`` and ``user_agent``. ``timestamp`` is used as
                seconds since the epoch.

        Returns:
            A list of anomaly names (possibly empty).
        """
        pattern = {
            'timestamp': request_data['timestamp'],
            'ip_address': request_data['ip_address'],
            'endpoint': request_data['endpoint'],
            'user_agent': request_data['user_agent']
        }

        history = self.user_patterns[user_id]
        history.append(pattern)

        # Keep only last 1000 requests
        if len(history) > 1000:
            self.user_patterns[user_id] = history[-1000:]

        return self.detect_anomalies(user_id, pattern)

    def detect_anomalies(self, user_id, current_pattern):
        """Return anomaly names for ``current_pattern`` given the user's history.

        Nothing is reported until at least 10 requests have been recorded.
        """
        # Imported locally under a private alias: this file binds the bare name
        # ``datetime`` both to the module and to the class in different places.
        from datetime import datetime as _datetime

        anomalies = []
        user_history = self.user_patterns[user_id]

        if len(user_history) < 10:
            return anomalies

        # Check request frequency over the whole retained history. Counting only
        # the last 10 entries could never exceed the threshold of 100, so the
        # check was unreachable.
        window_start = current_pattern['timestamp'] - 60
        recent_count = sum(1 for r in user_history if r['timestamp'] > window_start)
        if recent_count > self.suspicious_thresholds['requests_per_minute']:
            anomalies.append('high_request_frequency')

        # Check unusual hours (local time), unless the flag has been switched off
        if self.suspicious_thresholds.get('unusual_hours', True):
            hour = _datetime.fromtimestamp(current_pattern['timestamp']).hour
            if hour < 6 or hour > 22:
                anomalies.append('unusual_hours')

        # Check IP changes across the 10 most recent requests
        recent_ips = {r['ip_address'] for r in user_history[-10:]}
        if len(recent_ips) > 3:
            anomalies.append('multiple_ip_addresses')

        return anomalies

Best Practices

Security Checklist

API Security Checklist

  • Strong Authentication: Use HMAC-SHA256 signatures
  • Key Management: Rotate API keys regularly
  • Rate Limiting: Implement proper rate limiting
  • Input Validation: Validate all input data
  • Encryption: Encrypt sensitive data
  • Audit Logging: Log all security events
  • Error Handling: Don't expose sensitive information in error messages
  • Security Headers: Use proper HTTP security headers
  • IP Whitelisting: Restrict access by IP when possible
  • Monitoring: Monitor for suspicious activity

Implementation Guidelines

  • Use HTTPS: Always use encrypted connections
  • Validate Everything: Never trust user input
  • Principle of Least Privilege: Grant minimum necessary permissions
  • Defense in Depth: Implement multiple security layers
  • Regular Updates: Keep security measures updated
  • Incident Response: Have a plan for security incidents

Ready to secure your wallet? Check out our Wallet Security guide or explore Data Protection.