Custom Models

Complete guide to creating, training, and deploying custom AI models for personalized trading signals and strategies.

Overview

Custom models let you build AI trading systems tailored to your trading style, risk tolerance, and market preferences. You can train models on your own data, experiment with different algorithms, and fine-tune parameters for optimal performance.

Model Types

Supervised Learning Models

Regression Models

Purpose: Predict continuous price values.

Algorithms:

  • Linear Regression: Simple price prediction
  • Polynomial Regression: Non-linear price relationships
  • Ridge/Lasso Regression: Regularized price prediction
  • Support Vector Regression: Non-linear price patterns

Example:

from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import PolynomialFeatures

class PricePredictionModel:
    def __init__(self, degree=2):
        self.poly_features = PolynomialFeatures(degree=degree)
        self.model = LinearRegression()

    def train(self, X, y):
        X_poly = self.poly_features.fit_transform(X)
        self.model.fit(X_poly, y)

    def predict(self, X):
        X_poly = self.poly_features.transform(X)
        return self.model.predict(X_poly)

Classification Models

Purpose: Predict discrete trading signals (buy/sell/hold).

Algorithms:

  • Logistic Regression: Binary signal classification
  • Random Forest: Ensemble signal prediction
  • Gradient Boosting: Advanced signal classification
  • Neural Networks: Deep learning signal prediction

Example:

from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier

class SignalClassificationModel:
    def __init__(self, model_type='random_forest'):
        if model_type == 'random_forest':
            self.model = RandomForestClassifier(n_estimators=100)
        elif model_type == 'neural_network':
            self.model = MLPClassifier(hidden_layer_sizes=(100, 50))
        else:
            raise ValueError(f"Unknown model_type: {model_type}")

    def train(self, X, y):
        self.model.fit(X, y)

    def predict(self, X):
        return self.model.predict(X)

    def predict_proba(self, X):
        return self.model.predict_proba(X)
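
The classifiers above expect a labeled target column. A minimal sketch of deriving buy/sell/hold labels from forward returns, assuming a close price series and illustrative horizon and threshold values (not platform defaults):

import pandas as pd

def make_signal_labels(close, horizon=5, threshold=0.005):
    # Forward return over the next `horizon` bars
    forward_return = close.shift(-horizon) / close - 1

    # 1 = buy, -1 = sell, 0 = hold; thresholds are illustrative
    labels = pd.Series(0, index=close.index)
    labels[forward_return > threshold] = 1
    labels[forward_return < -threshold] = -1

    # The last `horizon` rows have no forward return; drop them before training
    return labels.where(forward_return.notna())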

Unsupervised Learning Models

Clustering Models

Purpose: Identify market regimes and patterns.

Algorithms:

  • K-Means: Market regime clustering
  • DBSCAN: Density-based pattern recognition
  • Hierarchical Clustering: Market structure analysis
  • Gaussian Mixture: Probabilistic clustering

Example:

from sklearn.cluster import KMeans
from sklearn.mixture import GaussianMixture

class MarketRegimeModel:
    def __init__(self, n_clusters=4):
        self.kmeans = KMeans(n_clusters=n_clusters)
        self.gmm = GaussianMixture(n_components=n_clusters)

    def train(self, X):
        self.kmeans.fit(X)
        self.gmm.fit(X)

    def predict_regime(self, X):
        return self.kmeans.predict(X)

    def predict_regime_proba(self, X):
        return self.gmm.predict_proba(X)

Dimensionality Reduction

Purpose: Reduce feature complexity and identify key patterns.

Algorithms:

  • PCA: Principal component analysis
  • t-SNE: Non-linear dimensionality reduction
  • UMAP: Uniform manifold approximation
  • Autoencoders: Neural network-based reduction

Example:

from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
import umap  # provided by the umap-learn package

class FeatureReductionModel:
    def __init__(self, method='pca', n_components=10):
        if method == 'pca':
            self.model = PCA(n_components=n_components)
        elif method == 'tsne':
            # t-SNE needs the exact solver for more than 3 components
            self.model = TSNE(n_components=n_components, method='exact')
        elif method == 'umap':
            self.model = umap.UMAP(n_components=n_components)
        else:
            raise ValueError(f"Unknown method: {method}")

    def fit_transform(self, X):
        return self.model.fit_transform(X)

    def transform(self, X):
        # Note: t-SNE has no transform for unseen data; use PCA or UMAP for that
        return self.model.transform(X)

Deep Learning Models

Recurrent Neural Networks (RNN)

Purpose: Sequence-based price prediction.

Architectures:

  • LSTM: Long short-term memory
  • GRU: Gated recurrent unit
  • Bidirectional RNN: Forward and backward sequences
  • Stacked RNN: Multiple RNN layers

Example:

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout

class LSTMModel:
    def __init__(self, sequence_length=60, features=10):
        self.sequence_length = sequence_length
        self.features = features
        self.model = self.build_model()

    def build_model(self):
        model = Sequential([
            LSTM(50, return_sequences=True, input_shape=(self.sequence_length, self.features)),
            Dropout(0.2),
            LSTM(50, return_sequences=False),
            Dropout(0.2),
            Dense(25),
            Dense(1)
        ])

        model.compile(optimizer='adam', loss='mse', metrics=['mae'])
        return model

    def train(self, X, y, epochs=100, batch_size=32):
        return self.model.fit(X, y, epochs=epochs, batch_size=batch_size, validation_split=0.2)

    def predict(self, X):
        return self.model.predict(X)

Convolutional Neural Networks (CNN)

Purpose: Pattern recognition in price charts.

Architectures:

  • 1D CNN: Time series pattern recognition
  • 2D CNN: Chart pattern recognition
  • ResNet: Residual network for complex patterns
  • Attention CNN: Attention-based pattern focus

Example:

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout

class CNNModel:
    def __init__(self, sequence_length=60, features=10):
        self.sequence_length = sequence_length
        self.features = features
        self.model = self.build_model()

    def build_model(self):
        model = Sequential([
            Conv1D(filters=64, kernel_size=3, activation='relu', input_shape=(self.sequence_length, self.features)),
            MaxPooling1D(pool_size=2),
            Conv1D(filters=32, kernel_size=3, activation='relu'),
            MaxPooling1D(pool_size=2),
            Flatten(),
            Dense(50, activation='relu'),
            Dropout(0.5),
            Dense(1)
        ])

        model.compile(optimizer='adam', loss='mse', metrics=['mae'])
        return model

    def train(self, X, y, epochs=100, batch_size=32):
        return self.model.fit(X, y, epochs=epochs, batch_size=batch_size, validation_split=0.2)

    def predict(self, X):
        return self.model.predict(X)

Transformer Models

Purpose: Advanced sequence modeling with attention mechanisms.

Architectures:

  • Standard Transformer: Self-attention mechanism
  • BERT: Bidirectional encoder representations
  • GPT: Generative pre-trained transformer
  • Custom Transformer: Domain-specific adaptations

Example:

import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Dropout, LayerNormalization
from tensorflow.keras.layers import MultiHeadAttention, GlobalAveragePooling1D

class TransformerModel:
    def __init__(self, sequence_length=60, features=10, num_heads=8, ff_dim=64):
        self.sequence_length = sequence_length
        self.features = features
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.model = self.build_model()

    def build_model(self):
        inputs = Input(shape=(self.sequence_length, self.features))

        # Multi-head attention
        attention = MultiHeadAttention(num_heads=self.num_heads, key_dim=self.features)(inputs, inputs)
        attention = Dropout(0.1)(attention)
        attention = LayerNormalization(epsilon=1e-6)(inputs + attention)

        # Feed forward
        ffn = Dense(self.ff_dim, activation='relu')(attention)
        ffn = Dense(self.features)(ffn)
        ffn = Dropout(0.1)(ffn)
        ffn = LayerNormalization(epsilon=1e-6)(attention + ffn)

        # Global pooling and output
        pooled = GlobalAveragePooling1D()(ffn)
        outputs = Dense(1)(pooled)

        model = Model(inputs, outputs)
        model.compile(optimizer='adam', loss='mse', metrics=['mae'])
        return model

    def train(self, X, y, epochs=100, batch_size=32):
        return self.model.fit(X, y, epochs=epochs, batch_size=batch_size, validation_split=0.2)

    def predict(self, X):
        return self.model.predict(X)

Feature Engineering

Technical Indicators

Price-Based Features

import pandas as pd
import numpy as np

class TechnicalFeatures:
    def __init__(self, data):
        self.data = data

    def add_price_features(self):
        # Price changes
        self.data['price_change'] = self.data['close'].pct_change()
        self.data['price_change_2'] = self.data['close'].pct_change(2)
        self.data['price_change_5'] = self.data['close'].pct_change(5)

        # Price ratios
        self.data['high_low_ratio'] = self.data['high'] / self.data['low']
        self.data['close_open_ratio'] = self.data['close'] / self.data['open']

        # Price positions
        self.data['price_position'] = (self.data['close'] - self.data['low']) / (self.data['high'] - self.data['low'])

        return self.data

    def add_moving_averages(self, windows=[5, 10, 20, 50]):
        for window in windows:
            self.data[f'ma_{window}'] = self.data['close'].rolling(window).mean()
            self.data[f'ma_{window}_ratio'] = self.data['close'] / self.data[f'ma_{window}']

        return self.data

    def add_volatility_features(self, windows=[5, 10, 20]):
        for window in windows:
            self.data[f'volatility_{window}'] = self.data['close'].rolling(window).std()
            self.data[f'volatility_{window}_ratio'] = self.data[f'volatility_{window}'] / self.data['close']

        return self.data

Volume-Based Features

class VolumeFeatures:
    def __init__(self, data):
        self.data = data

    def add_volume_features(self):
        # Requires 'price_change' from TechnicalFeatures.add_price_features()
        # Volume changes
        self.data['volume_change'] = self.data['volume'].pct_change()
        self.data['volume_ma_20'] = self.data['volume'].rolling(20).mean()
        self.data['volume_ratio'] = self.data['volume'] / self.data['volume_ma_20']

        # Volume-price relationship
        self.data['volume_price_trend'] = self.data['volume'] * self.data['price_change']
        self.data['obv'] = (self.data['volume'] * np.sign(self.data['price_change'])).cumsum()

        return self.data

Momentum Features

class MomentumFeatures:
    def __init__(self, data):
        self.data = data

    def add_momentum_features(self):
        # RSI
        delta = self.data['close'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
        rs = gain / loss
        self.data['rsi'] = 100 - (100 / (1 + rs))

        # MACD
        ema_12 = self.data['close'].ewm(span=12).mean()
        ema_26 = self.data['close'].ewm(span=26).mean()
        self.data['macd'] = ema_12 - ema_26
        self.data['macd_signal'] = self.data['macd'].ewm(span=9).mean()
        self.data['macd_histogram'] = self.data['macd'] - self.data['macd_signal']

        # Stochastic
        low_14 = self.data['low'].rolling(window=14).min()
        high_14 = self.data['high'].rolling(window=14).max()
        self.data['stoch_k'] = 100 * (self.data['close'] - low_14) / (high_14 - low_14)
        self.data['stoch_d'] = self.data['stoch_k'].rolling(window=3).mean()

        return self.data

Sentiment Features

Social Media Sentiment

class SentimentFeatures:
    def __init__(self, data):
        self.data = data

    def add_sentiment_features(self):
        # Social media sentiment (example with mock data)
        self.data['twitter_sentiment'] = np.random.uniform(-1, 1, len(self.data))
        self.data['reddit_sentiment'] = np.random.uniform(-1, 1, len(self.data))
        self.data['discord_sentiment'] = np.random.uniform(-1, 1, len(self.data))

        # Combined sentiment
        self.data['combined_sentiment'] = (
            self.data['twitter_sentiment'] +
            self.data['reddit_sentiment'] +
            self.data['discord_sentiment']
        ) / 3

        # Sentiment momentum
        self.data['sentiment_momentum'] = self.data['combined_sentiment'].rolling(5).mean()

        return self.data

News Sentiment

class NewsFeatures:
    def __init__(self, data):
        self.data = data

    def add_news_features(self):
        # News sentiment (example with mock data)
        self.data['news_sentiment'] = np.random.uniform(-1, 1, len(self.data))
        self.data['news_volume'] = np.random.uniform(0, 100, len(self.data))

        # News impact
        self.data['news_impact'] = self.data['news_sentiment'] * self.data['news_volume']

        return self.data

Model Training

Data Preparation

Data Splitting

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

class DataPreparer:
    def __init__(self, data, target_column='target'):
        self.data = data
        self.target_column = target_column
        self.scaler = StandardScaler()

    def prepare_data(self, test_size=0.2, val_size=0.2):
        # Remove rows with NaN values
        clean_data = self.data.dropna()

        # Separate features and target
        X = clean_data.drop(columns=[self.target_column])
        y = clean_data[self.target_column]

        # Split data (shuffled; for time-ordered data prefer the time series splitter below)
        X_train, X_temp, y_train, y_temp = train_test_split(
            X, y, test_size=test_size + val_size, random_state=42
        )

        X_val, X_test, y_val, y_test = train_test_split(
            X_temp, y_temp, test_size=test_size/(test_size + val_size), random_state=42
        )

        # Scale features
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_val_scaled = self.scaler.transform(X_val)
        X_test_scaled = self.scaler.transform(X_test)

        return {
            'X_train': X_train_scaled,
            'X_val': X_val_scaled,
            'X_test': X_test_scaled,
            'y_train': y_train,
            'y_val': y_val,
            'y_test': y_test
        }

Time Series Splitting

import numpy as np
from sklearn.model_selection import TimeSeriesSplit
from sklearn.preprocessing import StandardScaler

class TimeSeriesPreparer:
    def __init__(self, data, target_column='target'):
        self.data = data
        self.target_column = target_column
        self.scaler = StandardScaler()

    def prepare_time_series_data(self, sequence_length=60, test_size=0.2):
        # Remove rows with NaN values
        clean_data = self.data.dropna()

        # Create sequences
        X, y = self.create_sequences(clean_data, sequence_length)

        # Split data (time series aware)
        split_index = int(len(X) * (1 - test_size))

        X_train = X[:split_index]
        X_test = X[split_index:]
        y_train = y[:split_index]
        y_test = y[split_index:]

        # Scale features
        X_train_scaled = self.scaler.fit_transform(X_train.reshape(-1, X_train.shape[-1]))
        X_train_scaled = X_train_scaled.reshape(X_train.shape)

        X_test_scaled = self.scaler.transform(X_test.reshape(-1, X_test.shape[-1]))
        X_test_scaled = X_test_scaled.reshape(X_test.shape)

        return {
            'X_train': X_train_scaled,
            'X_test': X_test_scaled,
            'y_train': y_train,
            'y_test': y_test
        }

    def create_sequences(self, data, sequence_length):
        X, y = [], []

        for i in range(sequence_length, len(data)):
            # Each sample is the previous `sequence_length` rows of every column,
            # including the target column (an autoregressive setup)
            X.append(data.iloc[i-sequence_length:i].values)
            y.append(data.iloc[i][self.target_column])

        return np.array(X), np.array(y)

Model Training Pipeline

Training Configuration

class TrainingConfig:
    def __init__(self):
        self.epochs = 100
        self.batch_size = 32
        self.learning_rate = 0.001  # apply when compiling the model's optimizer
        self.validation_split = 0.2
        self.early_stopping_patience = 10
        self.model_checkpoint = True
        self.reduce_lr_on_plateau = True

Training Pipeline

import tensorflow as tf
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau

class ModelTrainer:
    def __init__(self, model, config):
        self.model = model
        self.config = config
        self.history = None

    def train(self, X_train, y_train, X_val=None, y_val=None):
        callbacks = []

        # Early stopping
        if self.config.early_stopping_patience:
            callbacks.append(EarlyStopping(
                monitor='val_loss',
                patience=self.config.early_stopping_patience,
                restore_best_weights=True
            ))

        # Model checkpoint
        if self.config.model_checkpoint:
            callbacks.append(ModelCheckpoint(
                'best_model.h5',
                monitor='val_loss',
                save_best_only=True
            ))

        # Reduce learning rate on plateau
        if self.config.reduce_lr_on_plateau:
            callbacks.append(ReduceLROnPlateau(
                monitor='val_loss',
                factor=0.5,
                patience=5,
                min_lr=1e-7
            ))

        # Train model
        if X_val is not None and y_val is not None:
            self.history = self.model.fit(
                X_train, y_train,
                epochs=self.config.epochs,
                batch_size=self.config.batch_size,
                validation_data=(X_val, y_val),
                callbacks=callbacks,
                verbose=1
            )
        else:
            self.history = self.model.fit(
                X_train, y_train,
                epochs=self.config.epochs,
                batch_size=self.config.batch_size,
                validation_split=self.config.validation_split,
                callbacks=callbacks,
                verbose=1
            )

        return self.history
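
A short sketch of how these pieces fit together, assuming `feature_data` is a DataFrame built with the feature-engineering classes above and containing a 'target' column:

# Build sequences, then train the LSTM defined earlier with the training pipeline
config = TrainingConfig()

preparer = TimeSeriesPreparer(feature_data, target_column='target')
splits = preparer.prepare_time_series_data(sequence_length=60)

lstm = LSTMModel(sequence_length=60, features=splits['X_train'].shape[-1])
trainer = ModelTrainer(lstm.model, config)
history = trainer.train(splits['X_train'], splits['y_train'])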

Model Evaluation

Performance Metrics

Regression Metrics

import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

class RegressionEvaluator:
    def __init__(self, y_true, y_pred):
        self.y_true = y_true
        self.y_pred = y_pred

    def calculate_metrics(self):
        metrics = {
            'mse': mean_squared_error(self.y_true, self.y_pred),
            'rmse': np.sqrt(mean_squared_error(self.y_true, self.y_pred)),
            'mae': mean_absolute_error(self.y_true, self.y_pred),
            'r2': r2_score(self.y_true, self.y_pred),
            'mape': np.mean(np.abs((self.y_true - self.y_pred) / self.y_true)) * 100
        }
        return metrics

Classification Metrics

from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix

class ClassificationEvaluator:
    def __init__(self, y_true, y_pred):
        self.y_true = y_true
        self.y_pred = y_pred

    def calculate_metrics(self):
        metrics = {
            'accuracy': accuracy_score(self.y_true, self.y_pred),
            'precision': precision_score(self.y_true, self.y_pred, average='weighted'),
            'recall': recall_score(self.y_true, self.y_pred, average='weighted'),
            'f1': f1_score(self.y_true, self.y_pred, average='weighted'),
            'confusion_matrix': confusion_matrix(self.y_true, self.y_pred)
        }
        return metrics

Trading Performance Metrics

Trading Metrics

import numpy as np
import pandas as pd

class TradingEvaluator:
    def __init__(self, predictions, actual_prices, signals):
        self.predictions = predictions
        self.actual_prices = actual_prices
        self.signals = signals

    def calculate_trading_metrics(self):
        # Calculate returns
        returns = self.calculate_returns()

        # Calculate metrics
        metrics = {
            'total_return': returns.sum(),
            'annualized_return': returns.mean() * 252,
            'volatility': returns.std() * np.sqrt(252),
            'sharpe_ratio': returns.mean() / returns.std() * np.sqrt(252),
            'max_drawdown': self.calculate_max_drawdown(returns),
            'win_rate': (returns > 0).mean(),
            'profit_factor': self.calculate_profit_factor(returns)
        }

        return metrics

    def calculate_returns(self):
        # Calculate returns based on signals
        returns = []
        for i in range(1, len(self.predictions)):
            if self.signals[i-1] == 1:  # Buy signal
                returns.append((self.actual_prices[i] - self.actual_prices[i-1]) / self.actual_prices[i-1])
            elif self.signals[i-1] == -1:  # Sell signal
                returns.append((self.actual_prices[i-1] - self.actual_prices[i]) / self.actual_prices[i-1])
            else:  # Hold signal
                returns.append(0)

        return np.array(returns)

    def calculate_max_drawdown(self, returns):
        # Wrap the numpy returns in a Series so cumulative product/max work
        cumulative = pd.Series(1 + returns).cumprod()
        running_max = cumulative.cummax()
        drawdown = (cumulative - running_max) / running_max
        return drawdown.min()

    def calculate_profit_factor(self, returns):
        profits = returns[returns > 0].sum()
        losses = abs(returns[returns < 0].sum())
        return profits / losses if losses > 0 else float('inf')

Model Deployment

API Integration

Model Wrapper

import numpy as np

class ModelWrapper:
    def __init__(self, model, scaler, feature_columns):
        self.model = model
        self.scaler = scaler
        self.feature_columns = feature_columns

    def predict(self, data):
        # Preprocess data
        processed_data = self.preprocess(data)

        # Make prediction and reduce it to a single scalar score
        prediction = float(np.ravel(self.model.predict(processed_data))[0])

        # Postprocess prediction
        return self.postprocess(prediction)

    def preprocess(self, data):
        # Ensure data has correct columns
        data = data[self.feature_columns]

        # Scale data
        return self.scaler.transform(data)

    def postprocess(self, prediction):
        # Convert the score to a trading signal; thresholds assume the model
        # outputs a probability-like value in [0, 1]
        if prediction > 0.6:
            return 'buy'
        elif prediction < 0.4:
            return 'sell'
        else:
            return 'hold'

API Endpoint

from flask import Flask, request, jsonify
import joblib
import pandas as pd

app = Flask(__name__)

# Load the pickled ModelWrapper (saved earlier, e.g. with joblib.dump)
model_wrapper = joblib.load('custom_model.pkl')

@app.route('/predict', methods=['POST'])
def predict():
    try:
        data = request.json

        # Wrap the JSON payload in a single-row DataFrame, then predict
        prediction = model_wrapper.predict(pd.DataFrame([data]))

        return jsonify({
            'prediction': prediction,
            'status': 'success'
        })

    except Exception as e:
        return jsonify({
            'error': str(e),
            'status': 'error'
        }), 400

if __name__ == '__main__':
    app.run(debug=True)

Real-Time Integration

WebSocket Integration

import websocket  # provided by the websocket-client package
import json

class CustomModelWebSocket:
    def __init__(self, model_wrapper, symbols):
        self.model_wrapper = model_wrapper
        self.symbols = symbols
        self.ws = None

    def connect(self):
        self.ws = websocket.WebSocketApp(
            "wss://ws.vibetrading.tech/ws",
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )
        self.ws.run_forever()

    def on_open(self, ws):
        print("Connected to WebSocket")
        # Subscribe to market data
        for symbol in self.symbols:
            subscribe_message = {
                "type": "subscribe",
                "channel": "ticker",
                "symbol": symbol
            }
            ws.send(json.dumps(subscribe_message))

    def on_message(self, ws, message):
        data = json.loads(message)

        if data['type'] == 'ticker':
            # Make prediction
            prediction = self.model_wrapper.predict(data['data'])

            # Send prediction back
            response = {
                "type": "custom_signal",
                "symbol": data['symbol'],
                "prediction": prediction,
                "timestamp": data['timestamp']
            }
            ws.send(json.dumps(response))

    def on_error(self, ws, error):
        print(f"WebSocket error: {error}")

    def on_close(self, ws, close_status_code, close_msg):
        print("WebSocket closed")

Best Practices

Model Development

Data Quality

  • Clean Data: Remove outliers and handle missing values
  • Feature Engineering: Create meaningful features
  • Data Validation: Ensure data quality and consistency
  • Regular Updates: Keep data fresh and relevant
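
A minimal sketch of the cleaning step, assuming an OHLCV DataFrame and an illustrative 3-sigma cutoff for outlier returns:

import pandas as pd

def clean_ohlcv(data, sigma=3.0):
    # Drop rows with missing values
    cleaned = data.dropna().copy()

    # Drop rows whose return exceeds `sigma` standard deviations (illustrative cutoff)
    returns = cleaned['close'].pct_change()
    bound = sigma * returns.std()
    cleaned = cleaned[returns.abs().fillna(0) <= bound]

    return cleaned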

Model Selection

  • Start Simple: Begin with simple models
  • Compare Models: Test multiple algorithms
  • Cross-Validation: Use proper validation techniques
  • Ensemble Methods: Combine multiple models
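
For the cross-validation point, plain k-fold shuffles time-ordered data; a sketch using scikit-learn's TimeSeriesSplit with an illustrative random forest:

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import TimeSeriesSplit, cross_val_score

def time_series_cv_score(X, y, n_splits=5):
    # Forward-chaining folds: each fold trains only on data before its test window
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    cv = TimeSeriesSplit(n_splits=n_splits)
    scores = cross_val_score(model, X, y, cv=cv, scoring='accuracy')
    return scores.mean(), scores.std()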

Performance Optimization

  • Hyperparameter Tuning: Optimize model parameters
  • Feature Selection: Remove irrelevant features
  • Regularization: Prevent overfitting
  • Monitoring: Track model performance
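
One way to approach hyperparameter tuning, sketched with scikit-learn's RandomizedSearchCV over an illustrative random forest grid:

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import RandomizedSearchCV, TimeSeriesSplit

def tune_random_forest(X, y, n_iter=20):
    # Parameter ranges are illustrative, not recommended defaults
    param_distributions = {
        'n_estimators': [100, 200, 500],
        'max_depth': [None, 5, 10, 20],
        'min_samples_leaf': [1, 5, 10],
    }
    search = RandomizedSearchCV(
        RandomForestClassifier(random_state=42),
        param_distributions,
        n_iter=n_iter,
        cv=TimeSeriesSplit(n_splits=5),
        scoring='f1_weighted',
        random_state=42,
    )
    search.fit(X, y)
    return search.best_estimator_, search.best_params_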

Deployment Considerations

Model Versioning

  • Version Control: Track model versions
  • A/B Testing: Compare model performance
  • Rollback Strategy: Plan for model failures
  • Documentation: Document model changes
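
A simple sketch of versioning model artifacts with joblib, matching the deployment section above (the directory layout and metadata fields are assumptions, not a platform convention):

import json
import joblib
from datetime import datetime, timezone
from pathlib import Path

def save_model_version(model_wrapper, version, directory='models'):
    # Each version gets its own folder with the artifact and a small metadata file
    target = Path(directory) / version
    target.mkdir(parents=True, exist_ok=True)

    joblib.dump(model_wrapper, target / 'custom_model.pkl')

    metadata = {
        'version': version,
        'saved_at': datetime.now(timezone.utc).isoformat(),
        'feature_columns': list(model_wrapper.feature_columns),
    }
    (target / 'metadata.json').write_text(json.dumps(metadata, indent=2))
    return target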

Monitoring

  • Performance Tracking: Monitor model accuracy
  • Drift Detection: Detect data drift
  • Alert Systems: Set up performance alerts
  • Regular Retraining: Update models regularly
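
For drift detection, one common approach is to compare live feature distributions against the training distribution; a sketch using a Kolmogorov-Smirnov test (the p-value threshold is illustrative):

from scipy.stats import ks_2samp

def detect_feature_drift(train_features, live_features, p_threshold=0.01):
    # Flag features whose live distribution differs significantly from training
    drifted = []
    for column in train_features.columns:
        _, p_value = ks_2samp(train_features[column].dropna(),
                              live_features[column].dropna())
        if p_value < p_threshold:
            drifted.append(column)
    return drifted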

Security

  • API Security: Secure model endpoints
  • Data Privacy: Protect sensitive data
  • Access Control: Limit model access
  • Audit Logging: Track model usage
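
Applied to the Flask endpoint above, a minimal API-key check; the header name and environment variable are assumptions, not platform conventions:

import os
from functools import wraps
from flask import request, jsonify

def require_api_key(view):
    # Reject requests whose X-API-Key header does not match the configured key
    @wraps(view)
    def wrapped(*args, **kwargs):
        expected = os.environ.get('MODEL_API_KEY')
        if not expected or request.headers.get('X-API-Key') != expected:
            return jsonify({'error': 'unauthorized', 'status': 'error'}), 401
        return view(*args, **kwargs)
    return wrapped

# Usage: add @require_api_key below the @app.route decorator on /predict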

Ready to implement custom models? Check out our API Documentation or explore Security Best Practices.