# Custom Models

Complete guide to creating, training, and deploying custom AI models for personalized trading signals and strategies.

## Overview
Custom models let you build AI trading systems tailored to your trading style, risk tolerance, and market preferences. You can train models on your own data, choose between different algorithms, and fine-tune parameters for optimal performance.
## Model Types

### Supervised Learning Models

#### Regression Models

**Purpose:** Predict continuous price values.

**Algorithms:**

- Linear Regression: Simple price prediction
- Polynomial Regression: Non-linear price relationships
- Ridge/Lasso Regression: Regularized price prediction
- Support Vector Regression: Non-linear price patterns

**Example:**

```python
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import PolynomialFeatures

class PricePredictionModel:
    def __init__(self, degree=2):
        self.poly_features = PolynomialFeatures(degree=degree)
        self.model = LinearRegression()

    def train(self, X, y):
        X_poly = self.poly_features.fit_transform(X)
        self.model.fit(X_poly, y)

    def predict(self, X):
        X_poly = self.poly_features.transform(X)
        return self.model.predict(X_poly)
```
#### Classification Models

**Purpose:** Predict discrete trading signals (buy/sell/hold).

**Algorithms:**

- Logistic Regression: Binary signal classification
- Random Forest: Ensemble signal prediction
- Gradient Boosting: Advanced signal classification
- Neural Networks: Deep learning signal prediction

**Example:**

```python
from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier

class SignalClassificationModel:
    def __init__(self, model_type='random_forest'):
        if model_type == 'random_forest':
            self.model = RandomForestClassifier(n_estimators=100)
        elif model_type == 'neural_network':
            self.model = MLPClassifier(hidden_layer_sizes=(100, 50))

    def train(self, X, y):
        self.model.fit(X, y)

    def predict(self, X):
        return self.model.predict(X)

    def predict_proba(self, X):
        return self.model.predict_proba(X)
```
### Unsupervised Learning Models

#### Clustering Models

**Purpose:** Identify market regimes and patterns.

**Algorithms:**

- K-Means: Market regime clustering
- DBSCAN: Density-based pattern recognition
- Hierarchical Clustering: Market structure analysis
- Gaussian Mixture: Probabilistic clustering

**Example:**

```python
from sklearn.cluster import KMeans
from sklearn.mixture import GaussianMixture

class MarketRegimeModel:
    def __init__(self, n_clusters=4):
        self.kmeans = KMeans(n_clusters=n_clusters)
        self.gmm = GaussianMixture(n_components=n_clusters)

    def train(self, X):
        self.kmeans.fit(X)
        self.gmm.fit(X)

    def predict_regime(self, X):
        return self.kmeans.predict(X)

    def predict_regime_proba(self, X):
        return self.gmm.predict_proba(X)
```
#### Dimensionality Reduction

**Purpose:** Reduce feature complexity and identify key patterns.

**Algorithms:**

- PCA: Principal component analysis
- t-SNE: Non-linear dimensionality reduction
- UMAP: Uniform manifold approximation
- Autoencoders: Neural network-based reduction

**Example:**

```python
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
import umap

class FeatureReductionModel:
    def __init__(self, method='pca', n_components=10):
        if method == 'pca':
            self.model = PCA(n_components=n_components)
        elif method == 'tsne':
            # t-SNE supports at most 3 components with the default
            # 'barnes_hut' solver; use 'exact' for higher dimensions.
            self.model = TSNE(n_components=n_components, method='exact')
        elif method == 'umap':
            self.model = umap.UMAP(n_components=n_components)

    def fit_transform(self, X):
        return self.model.fit_transform(X)

    def transform(self, X):
        # Only PCA and UMAP can project new data; t-SNE has no transform().
        return self.model.transform(X)
```
### Deep Learning Models

#### Recurrent Neural Networks (RNN)

**Purpose:** Sequence-based price prediction.

**Architectures:**

- LSTM: Long short-term memory
- GRU: Gated recurrent unit
- Bidirectional RNN: Forward and backward sequences
- Stacked RNN: Multiple RNN layers

**Example:**

```python
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout

class LSTMModel:
    def __init__(self, sequence_length=60, features=10):
        self.sequence_length = sequence_length
        self.features = features
        self.model = self.build_model()

    def build_model(self):
        model = Sequential([
            LSTM(50, return_sequences=True, input_shape=(self.sequence_length, self.features)),
            Dropout(0.2),
            LSTM(50, return_sequences=False),
            Dropout(0.2),
            Dense(25),
            Dense(1)
        ])
        model.compile(optimizer='adam', loss='mse', metrics=['mae'])
        return model

    def train(self, X, y, epochs=100, batch_size=32):
        return self.model.fit(X, y, epochs=epochs, batch_size=batch_size, validation_split=0.2)

    def predict(self, X):
        return self.model.predict(X)
```
#### Convolutional Neural Networks (CNN)

**Purpose:** Pattern recognition in price charts.

**Architectures:**

- 1D CNN: Time series pattern recognition
- 2D CNN: Chart pattern recognition
- ResNet: Residual network for complex patterns
- Attention CNN: Attention-based pattern focus

**Example:**

```python
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout

class CNNModel:
    def __init__(self, sequence_length=60, features=10):
        self.sequence_length = sequence_length
        self.features = features
        self.model = self.build_model()

    def build_model(self):
        model = Sequential([
            Conv1D(filters=64, kernel_size=3, activation='relu', input_shape=(self.sequence_length, self.features)),
            MaxPooling1D(pool_size=2),
            Conv1D(filters=32, kernel_size=3, activation='relu'),
            MaxPooling1D(pool_size=2),
            Flatten(),
            Dense(50, activation='relu'),
            Dropout(0.5),
            Dense(1)
        ])
        model.compile(optimizer='adam', loss='mse', metrics=['mae'])
        return model

    def train(self, X, y, epochs=100, batch_size=32):
        return self.model.fit(X, y, epochs=epochs, batch_size=batch_size, validation_split=0.2)

    def predict(self, X):
        return self.model.predict(X)
```
#### Transformer Models

**Purpose:** Advanced sequence modeling with attention mechanisms.

**Architectures:**

- Standard Transformer: Self-attention mechanism
- BERT: Bidirectional encoder representations
- GPT: Generative pre-trained transformer
- Custom Transformer: Domain-specific adaptations

**Example:**

```python
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Dropout, LayerNormalization
from tensorflow.keras.layers import MultiHeadAttention, GlobalAveragePooling1D

class TransformerModel:
    def __init__(self, sequence_length=60, features=10, num_heads=8, ff_dim=64):
        self.sequence_length = sequence_length
        self.features = features
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.model = self.build_model()

    def build_model(self):
        inputs = Input(shape=(self.sequence_length, self.features))
        # Multi-head self-attention with a residual connection
        attention = MultiHeadAttention(num_heads=self.num_heads, key_dim=self.features)(inputs, inputs)
        attention = Dropout(0.1)(attention)
        attention = LayerNormalization(epsilon=1e-6)(inputs + attention)
        # Position-wise feed-forward block with a residual connection
        ffn = Dense(self.ff_dim, activation='relu')(attention)
        ffn = Dense(self.features)(ffn)
        ffn = Dropout(0.1)(ffn)
        ffn = LayerNormalization(epsilon=1e-6)(attention + ffn)
        # Global pooling and output
        pooled = GlobalAveragePooling1D()(ffn)
        outputs = Dense(1)(pooled)
        model = Model(inputs, outputs)
        model.compile(optimizer='adam', loss='mse', metrics=['mae'])
        return model

    def train(self, X, y, epochs=100, batch_size=32):
        return self.model.fit(X, y, epochs=epochs, batch_size=batch_size, validation_split=0.2)

    def predict(self, X):
        return self.model.predict(X)
```
## Feature Engineering

### Technical Indicators

#### Price-Based Features

```python
import pandas as pd
import numpy as np

class TechnicalFeatures:
    def __init__(self, data):
        self.data = data

    def add_price_features(self):
        # Price changes
        self.data['price_change'] = self.data['close'].pct_change()
        self.data['price_change_2'] = self.data['close'].pct_change(2)
        self.data['price_change_5'] = self.data['close'].pct_change(5)
        # Price ratios
        self.data['high_low_ratio'] = self.data['high'] / self.data['low']
        self.data['close_open_ratio'] = self.data['close'] / self.data['open']
        # Price position within the bar's range
        self.data['price_position'] = (self.data['close'] - self.data['low']) / (self.data['high'] - self.data['low'])
        return self.data

    def add_moving_averages(self, windows=[5, 10, 20, 50]):
        for window in windows:
            self.data[f'ma_{window}'] = self.data['close'].rolling(window).mean()
            self.data[f'ma_{window}_ratio'] = self.data['close'] / self.data[f'ma_{window}']
        return self.data

    def add_volatility_features(self, windows=[5, 10, 20]):
        for window in windows:
            self.data[f'volatility_{window}'] = self.data['close'].rolling(window).std()
            self.data[f'volatility_{window}_ratio'] = self.data[f'volatility_{window}'] / self.data['close']
        return self.data
```
#### Volume-Based Features

```python
class VolumeFeatures:
    def __init__(self, data):
        self.data = data

    def add_volume_features(self):
        # Volume changes
        self.data['volume_change'] = self.data['volume'].pct_change()
        self.data['volume_ma_20'] = self.data['volume'].rolling(20).mean()
        self.data['volume_ratio'] = self.data['volume'] / self.data['volume_ma_20']
        # Volume-price relationship (relies on 'price_change' from TechnicalFeatures)
        self.data['volume_price_trend'] = self.data['volume'] * self.data['price_change']
        self.data['obv'] = (self.data['volume'] * np.sign(self.data['price_change'])).cumsum()
        return self.data
```
#### Momentum Features

```python
class MomentumFeatures:
    def __init__(self, data):
        self.data = data

    def add_momentum_features(self):
        # RSI (14-period, simple moving averages of gains and losses)
        delta = self.data['close'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
        rs = gain / loss
        self.data['rsi'] = 100 - (100 / (1 + rs))
        # MACD
        ema_12 = self.data['close'].ewm(span=12).mean()
        ema_26 = self.data['close'].ewm(span=26).mean()
        self.data['macd'] = ema_12 - ema_26
        self.data['macd_signal'] = self.data['macd'].ewm(span=9).mean()
        self.data['macd_histogram'] = self.data['macd'] - self.data['macd_signal']
        # Stochastic oscillator
        low_14 = self.data['low'].rolling(window=14).min()
        high_14 = self.data['high'].rolling(window=14).max()
        self.data['stoch_k'] = 100 * (self.data['close'] - low_14) / (high_14 - low_14)
        self.data['stoch_d'] = self.data['stoch_k'].rolling(window=3).mean()
        return self.data
```
### Sentiment Features

#### Social Media Sentiment

```python
class SentimentFeatures:
    def __init__(self, data):
        self.data = data

    def add_sentiment_features(self):
        # Social media sentiment (mock data; replace with real sentiment feeds)
        self.data['twitter_sentiment'] = np.random.uniform(-1, 1, len(self.data))
        self.data['reddit_sentiment'] = np.random.uniform(-1, 1, len(self.data))
        self.data['discord_sentiment'] = np.random.uniform(-1, 1, len(self.data))
        # Combined sentiment
        self.data['combined_sentiment'] = (
            self.data['twitter_sentiment'] +
            self.data['reddit_sentiment'] +
            self.data['discord_sentiment']
        ) / 3
        # Sentiment momentum
        self.data['sentiment_momentum'] = self.data['combined_sentiment'].rolling(5).mean()
        return self.data
```
#### News Sentiment

```python
class NewsFeatures:
    def __init__(self, data):
        self.data = data

    def add_news_features(self):
        # News sentiment (mock data; replace with a real news sentiment source)
        self.data['news_sentiment'] = np.random.uniform(-1, 1, len(self.data))
        self.data['news_volume'] = np.random.uniform(0, 100, len(self.data))
        # News impact
        self.data['news_impact'] = self.data['news_sentiment'] * self.data['news_volume']
        return self.data
```
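
The feature classes above can be chained on a single OHLCV DataFrame before training. A minimal sketch, assuming a DataFrame with `open`, `high`, `low`, `close`, and `volume` columns; the file name and target definition are illustrative:

```python
import pandas as pd

# Hypothetical feature pipeline built from the classes above
df = pd.read_csv('ohlcv.csv')  # assumed columns: open, high, low, close, volume

tech = TechnicalFeatures(df)
df = tech.add_price_features()
df = tech.add_moving_averages()
df = tech.add_volatility_features()
df = VolumeFeatures(df).add_volume_features()       # relies on 'price_change' above
df = MomentumFeatures(df).add_momentum_features()
df = SentimentFeatures(df).add_sentiment_features()
df = NewsFeatures(df).add_news_features()

# Illustrative target: next-period return
df['target'] = df['close'].pct_change().shift(-1)
df = df.dropna()
```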
## Model Training

### Data Preparation

#### Data Splitting

```python
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

class DataPreparer:
    def __init__(self, data, target_column='target'):
        self.data = data
        self.target_column = target_column
        self.scaler = StandardScaler()

    def prepare_data(self, test_size=0.2, val_size=0.2):
        # Remove rows with NaN values
        clean_data = self.data.dropna()
        # Separate features and target
        X = clean_data.drop(columns=[self.target_column])
        y = clean_data[self.target_column]
        # Split data
        # Note: random splits can leak future information into the training set
        # when rows are time-ordered; see the time-series splitter below.
        X_train, X_temp, y_train, y_temp = train_test_split(
            X, y, test_size=test_size + val_size, random_state=42
        )
        X_val, X_test, y_val, y_test = train_test_split(
            X_temp, y_temp, test_size=test_size / (test_size + val_size), random_state=42
        )
        # Scale features (fit on the training set only)
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_val_scaled = self.scaler.transform(X_val)
        X_test_scaled = self.scaler.transform(X_test)
        return {
            'X_train': X_train_scaled,
            'X_val': X_val_scaled,
            'X_test': X_test_scaled,
            'y_train': y_train,
            'y_val': y_val,
            'y_test': y_test
        }
```
#### Time Series Splitting

```python
import numpy as np
from sklearn.preprocessing import StandardScaler

class TimeSeriesPreparer:
    def __init__(self, data, target_column='target'):
        self.data = data
        self.target_column = target_column
        self.scaler = StandardScaler()

    def prepare_time_series_data(self, sequence_length=60, test_size=0.2):
        # Remove rows with NaN values
        clean_data = self.data.dropna()
        # Create sequences
        X, y = self.create_sequences(clean_data, sequence_length)
        # Split data chronologically (no shuffling)
        split_index = int(len(X) * (1 - test_size))
        X_train = X[:split_index]
        X_test = X[split_index:]
        y_train = y[:split_index]
        y_test = y[split_index:]
        # Scale features (fit on the training slice only)
        X_train_scaled = self.scaler.fit_transform(X_train.reshape(-1, X_train.shape[-1]))
        X_train_scaled = X_train_scaled.reshape(X_train.shape)
        X_test_scaled = self.scaler.transform(X_test.reshape(-1, X_test.shape[-1]))
        X_test_scaled = X_test_scaled.reshape(X_test.shape)
        return {
            'X_train': X_train_scaled,
            'X_test': X_test_scaled,
            'y_train': y_train,
            'y_test': y_test
        }

    def create_sequences(self, data, sequence_length):
        X, y = [], []
        for i in range(sequence_length, len(data)):
            X.append(data.iloc[i - sequence_length:i].values)
            y.append(data.iloc[i][self.target_column])
        return np.array(X), np.array(y)
```
### Model Training Pipeline

#### Training Configuration

```python
class TrainingConfig:
    def __init__(self):
        self.epochs = 100
        self.batch_size = 32
        self.learning_rate = 0.001
        self.validation_split = 0.2
        self.early_stopping_patience = 10
        self.model_checkpoint = True
        self.reduce_lr_on_plateau = True
```
#### Training Pipeline

```python
import tensorflow as tf
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau

class ModelTrainer:
    def __init__(self, model, config):
        self.model = model
        self.config = config
        self.history = None

    def train(self, X_train, y_train, X_val=None, y_val=None):
        callbacks = []
        # Early stopping
        if self.config.early_stopping_patience:
            callbacks.append(EarlyStopping(
                monitor='val_loss',
                patience=self.config.early_stopping_patience,
                restore_best_weights=True
            ))
        # Model checkpoint
        if self.config.model_checkpoint:
            callbacks.append(ModelCheckpoint(
                'best_model.h5',
                monitor='val_loss',
                save_best_only=True
            ))
        # Reduce learning rate on plateau
        if self.config.reduce_lr_on_plateau:
            callbacks.append(ReduceLROnPlateau(
                monitor='val_loss',
                factor=0.5,
                patience=5,
                min_lr=1e-7
            ))
        # Train model
        if X_val is not None and y_val is not None:
            self.history = self.model.fit(
                X_train, y_train,
                epochs=self.config.epochs,
                batch_size=self.config.batch_size,
                validation_data=(X_val, y_val),
                callbacks=callbacks,
                verbose=1
            )
        else:
            self.history = self.model.fit(
                X_train, y_train,
                epochs=self.config.epochs,
                batch_size=self.config.batch_size,
                validation_split=self.config.validation_split,
                callbacks=callbacks,
                verbose=1
            )
        return self.history
```
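
Putting the pieces together, a minimal end-to-end run might look like the sketch below. It assumes the `TimeSeriesPreparer`, `LSTMModel`, `TrainingConfig`, and `ModelTrainer` classes defined above and a feature DataFrame `df` that already contains a `target` column:

```python
# Hypothetical end-to-end training run using the classes defined above
prepared = TimeSeriesPreparer(df, target_column='target').prepare_time_series_data(
    sequence_length=60, test_size=0.2
)

lstm = LSTMModel(sequence_length=60, features=prepared['X_train'].shape[-1])

trainer = ModelTrainer(lstm.model, TrainingConfig())
history = trainer.train(prepared['X_train'], prepared['y_train'])  # falls back to validation_split

predictions = lstm.predict(prepared['X_test'])
```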
## Model Evaluation

### Performance Metrics

#### Regression Metrics

```python
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

class RegressionEvaluator:
    def __init__(self, y_true, y_pred):
        self.y_true = y_true
        self.y_pred = y_pred

    def calculate_metrics(self):
        metrics = {
            'mse': mean_squared_error(self.y_true, self.y_pred),
            'rmse': np.sqrt(mean_squared_error(self.y_true, self.y_pred)),
            'mae': mean_absolute_error(self.y_true, self.y_pred),
            'r2': r2_score(self.y_true, self.y_pred),
            # MAPE is undefined when y_true contains zeros
            'mape': np.mean(np.abs((self.y_true - self.y_pred) / self.y_true)) * 100
        }
        return metrics
```
#### Classification Metrics

```python
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix

class ClassificationEvaluator:
    def __init__(self, y_true, y_pred):
        self.y_true = y_true
        self.y_pred = y_pred

    def calculate_metrics(self):
        metrics = {
            'accuracy': accuracy_score(self.y_true, self.y_pred),
            'precision': precision_score(self.y_true, self.y_pred, average='weighted'),
            'recall': recall_score(self.y_true, self.y_pred, average='weighted'),
            'f1': f1_score(self.y_true, self.y_pred, average='weighted'),
            'confusion_matrix': confusion_matrix(self.y_true, self.y_pred)
        }
        return metrics
```
### Trading Performance Metrics

#### Trading Metrics

```python
import numpy as np

class TradingEvaluator:
    def __init__(self, predictions, actual_prices, signals):
        self.predictions = predictions
        self.actual_prices = actual_prices
        self.signals = signals

    def calculate_trading_metrics(self):
        # Calculate per-period returns from the signals
        returns = self.calculate_returns()
        # Calculate metrics (annualization assumes daily bars, 252 per year)
        metrics = {
            'total_return': returns.sum(),
            'annualized_return': returns.mean() * 252,
            'volatility': returns.std() * np.sqrt(252),
            'sharpe_ratio': returns.mean() / returns.std() * np.sqrt(252),
            'max_drawdown': self.calculate_max_drawdown(returns),
            'win_rate': (returns > 0).mean(),
            'profit_factor': self.calculate_profit_factor(returns)
        }
        return metrics

    def calculate_returns(self):
        # Calculate returns based on signals
        returns = []
        for i in range(1, len(self.predictions)):
            if self.signals[i - 1] == 1:  # Buy signal
                returns.append((self.actual_prices[i] - self.actual_prices[i - 1]) / self.actual_prices[i - 1])
            elif self.signals[i - 1] == -1:  # Sell signal
                returns.append((self.actual_prices[i - 1] - self.actual_prices[i]) / self.actual_prices[i - 1])
            else:  # Hold signal
                returns.append(0)
        return np.array(returns)

    def calculate_max_drawdown(self, returns):
        # Use numpy accumulators since returns is a plain array, not a Series
        cumulative = np.cumprod(1 + returns)
        running_max = np.maximum.accumulate(cumulative)
        drawdown = (cumulative - running_max) / running_max
        return drawdown.min()

    def calculate_profit_factor(self, returns):
        profits = returns[returns > 0].sum()
        losses = abs(returns[returns < 0].sum())
        return profits / losses if losses > 0 else float('inf')
```
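
As a usage sketch: predicted next-period returns are mapped to buy/sell/hold signals and then scored with the evaluator above. The `model`, `X_test`, and `actual_prices` names and the 0.1% threshold are assumptions for illustration:

```python
import numpy as np

# Hypothetical inputs: predicted next-period returns and the matching close prices
predicted_returns = model.predict(X_test).ravel()   # any regression model from above
# actual_prices = ...                               # price series aligned with X_test

# Map predictions to signals with an illustrative 0.1% threshold
signals = np.where(predicted_returns > 0.001, 1,
                   np.where(predicted_returns < -0.001, -1, 0))

evaluator = TradingEvaluator(predicted_returns, actual_prices, signals)
print(evaluator.calculate_trading_metrics())
```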
## Model Deployment

### API Integration

#### Model Wrapper

```python
import numpy as np

class ModelWrapper:
    def __init__(self, model, scaler, feature_columns):
        self.model = model
        self.scaler = scaler
        self.feature_columns = feature_columns

    def predict(self, data):
        # Preprocess data
        processed_data = self.preprocess(data)
        # Make prediction
        prediction = self.model.predict(processed_data)
        # Postprocess prediction
        return self.postprocess(prediction)

    def preprocess(self, data):
        # Ensure data has the expected columns, in the expected order
        data = data[self.feature_columns]
        # Scale data
        return self.scaler.transform(data)

    def postprocess(self, prediction):
        # Reduce the model output to a single score before thresholding
        score = float(np.ravel(prediction)[0])
        # Convert the score to a trading signal
        if score > 0.6:
            return 'buy'
        elif score < 0.4:
            return 'sell'
        else:
            return 'hold'
```
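
To produce the `custom_model.pkl` file loaded by the endpoint below, the wrapper can be serialized with joblib. A sketch, assuming a fitted scikit-learn model, its scaler, and the training feature list (the names `trained_model`, `scaler`, and `feature_columns` are placeholders; Keras models are usually saved separately with `model.save()` rather than pickled):

```python
import joblib

# Hypothetical export step: bundle the fitted model, scaler, and feature list
wrapper = ModelWrapper(trained_model, scaler, feature_columns)
joblib.dump(wrapper, 'custom_model.pkl')
```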
#### API Endpoint

```python
from flask import Flask, request, jsonify
import joblib
import pandas as pd

app = Flask(__name__)

# Load the saved model wrapper
model_wrapper = joblib.load('custom_model.pkl')

@app.route('/predict', methods=['POST'])
def predict():
    try:
        data = request.json
        # Wrap the JSON payload in a single-row DataFrame so the wrapper
        # can select and scale the expected feature columns
        features = pd.DataFrame([data])
        # Make prediction
        prediction = model_wrapper.predict(features)
        return jsonify({
            'prediction': prediction,
            'status': 'success'
        })
    except Exception as e:
        return jsonify({
            'error': str(e),
            'status': 'error'
        }), 400

if __name__ == '__main__':
    app.run(debug=True)
```
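
A request to the endpoint then carries one row of feature values as JSON. The feature names below are placeholders for whatever columns the wrapper was trained on:

```python
import requests

# Hypothetical client call; keys must match the wrapper's feature_columns
payload = {
    'ma_20_ratio': 1.02,
    'rsi': 61.5,
    'volume_ratio': 1.4,
    # ... remaining feature columns
}
response = requests.post('http://localhost:5000/predict', json=payload)
print(response.json())  # e.g. {'prediction': 'buy', 'status': 'success'}
```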
### Real-Time Integration

#### WebSocket Integration

```python
import json
import websocket  # from the websocket-client package

class CustomModelWebSocket:
    def __init__(self, model_wrapper, symbols):
        self.model_wrapper = model_wrapper
        self.symbols = symbols
        self.ws = None

    def connect(self):
        self.ws = websocket.WebSocketApp(
            "wss://ws.vibetrading.tech/ws",
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )
        self.ws.run_forever()

    def on_open(self, ws):
        print("Connected to WebSocket")
        # Subscribe to market data
        for symbol in self.symbols:
            subscribe_message = {
                "type": "subscribe",
                "channel": "ticker",
                "symbol": symbol
            }
            ws.send(json.dumps(subscribe_message))

    def on_message(self, ws, message):
        data = json.loads(message)
        if data['type'] == 'ticker':
            # Make a prediction from the ticker payload. In practice the raw
            # ticker must first be converted into the same feature columns the
            # model was trained on before calling predict().
            prediction = self.model_wrapper.predict(data['data'])
            # Send prediction back
            response = {
                "type": "custom_signal",
                "symbol": data['symbol'],
                "prediction": prediction,
                "timestamp": data['timestamp']
            }
            ws.send(json.dumps(response))

    def on_error(self, ws, error):
        print(f"WebSocket error: {error}")

    def on_close(self, ws, close_status_code, close_msg):
        print("WebSocket closed")
```
## Best Practices

### Model Development

#### Data Quality
- Clean Data: Remove outliers and handle missing values
- Feature Engineering: Create meaningful features
- Data Validation: Ensure data quality and consistency
- Regular Updates: Keep data fresh and relevant
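
For example, a minimal cleaning pass along these lines, assuming an OHLCV DataFrame indexed by timestamp; the gap limit and outlier threshold are illustrative, not recommendations:

```python
import pandas as pd

def basic_clean(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical cleaning pass for an OHLCV DataFrame indexed by timestamp."""
    # Drop duplicate timestamps and keep rows in time order
    df = df[~df.index.duplicated(keep='last')].sort_index()
    # Forward-fill short gaps, then drop rows that are still incomplete
    df = df.ffill(limit=3).dropna()
    # Flag (rather than silently drop) bars with extreme one-period moves
    returns = df['close'].pct_change()
    df['suspect_bar'] = returns.abs() > 0.2  # illustrative 20% threshold
    return df
```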
#### Model Selection
- Start Simple: Begin with simple models
- Compare Models: Test multiple algorithms
- Cross-Validation: Use proper validation techniques
- Ensemble Methods: Combine multiple models
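
A quick comparison of candidate classifiers under walk-forward cross-validation might look like this sketch, assuming a feature matrix `X` and signal labels `y` from the preparation steps above:

```python
from sklearn.model_selection import TimeSeriesSplit, cross_val_score
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier

# Walk-forward splits preserve time order, unlike a shuffled k-fold
cv = TimeSeriesSplit(n_splits=5)

candidates = {
    'logistic_regression': LogisticRegression(max_iter=1000),
    'random_forest': RandomForestClassifier(n_estimators=100),
    'gradient_boosting': GradientBoostingClassifier(),
}

for name, model in candidates.items():
    scores = cross_val_score(model, X, y, cv=cv, scoring='f1_weighted')
    print(f"{name}: {scores.mean():.3f} +/- {scores.std():.3f}")
```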
#### Performance Optimization
- Hyperparameter Tuning: Optimize model parameters
- Feature Selection: Remove irrelevant features
- Regularization: Prevent overfitting
- Monitoring: Track model performance
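
A hedged sketch of hyperparameter tuning for the random-forest signal classifier; the parameter grid and scoring choice are illustrative:

```python
from sklearn.model_selection import GridSearchCV, TimeSeriesSplit
from sklearn.ensemble import RandomForestClassifier

param_grid = {
    'n_estimators': [100, 300],
    'max_depth': [5, 10, None],
    'min_samples_leaf': [1, 5, 20],
}

search = GridSearchCV(
    RandomForestClassifier(random_state=42),
    param_grid,
    cv=TimeSeriesSplit(n_splits=5),  # keep the time-series split for tuning as well
    scoring='f1_weighted',
    n_jobs=-1,
)
search.fit(X, y)
print(search.best_params_, search.best_score_)
```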
### Deployment Considerations

#### Model Versioning
- Version Control: Track model versions
- A/B Testing: Compare model performance
- Rollback Strategy: Plan for model failures
- Documentation: Document model changes
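
One lightweight way to version model artifacts is to write a metadata sidecar next to each saved wrapper. The directory layout and metadata fields below are assumptions, not a prescribed scheme:

```python
import json
import os
import joblib
from datetime import datetime, timezone

def save_versioned_model(wrapper, version, notes=''):
    """Hypothetical helper: save a model wrapper plus a metadata sidecar file."""
    os.makedirs('models', exist_ok=True)
    path = f"models/custom_model_v{version}.pkl"
    joblib.dump(wrapper, path)
    metadata = {
        'version': version,
        'saved_at': datetime.now(timezone.utc).isoformat(),
        'feature_columns': list(wrapper.feature_columns),
        'notes': notes,
    }
    with open(path.replace('.pkl', '.json'), 'w') as f:
        json.dump(metadata, f, indent=2)
    return path
```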
#### Monitoring
- Performance Tracking: Monitor model accuracy
- Drift Detection: Detect data drift
- Alert Systems: Set up performance alerts
- Regular Retraining: Update models regularly
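
Drift detection can be as simple as comparing recent feature distributions against the training distribution. This sketch uses a two-sample Kolmogorov-Smirnov test; the p-value threshold is illustrative and the inputs are assumed to be DataFrames with matching columns:

```python
from scipy.stats import ks_2samp

def detect_feature_drift(train_features, live_features, threshold=0.01):
    """Flag features whose live distribution differs from the training one."""
    drifted = []
    for column in train_features.columns:
        statistic, p_value = ks_2samp(
            train_features[column].dropna(),
            live_features[column].dropna(),
        )
        if p_value < threshold:
            drifted.append((column, statistic))
    return drifted

# Hypothetical usage: compare recent live rows against the training feature set
# drifted = detect_feature_drift(X_train_df, recent_df[feature_columns])
```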
#### Security
- API Security: Secure model endpoints
- Data Privacy: Protect sensitive data
- Access Control: Limit model access
- Audit Logging: Track model usage
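
As a minimal sketch of securing the prediction endpoint from the deployment section: the header name, environment variable, and decorator below are assumptions, not part of the platform API.

```python
import hmac
import os
from functools import wraps
from flask import request, jsonify

def require_api_key(view):
    """Hypothetical decorator: reject requests without a valid X-API-Key header."""
    @wraps(view)
    def wrapped(*args, **kwargs):
        expected = os.environ.get('CUSTOM_MODEL_API_KEY', '')
        provided = request.headers.get('X-API-Key', '')
        if not expected or not hmac.compare_digest(provided, expected):
            return jsonify({'error': 'unauthorized', 'status': 'error'}), 401
        return view(*args, **kwargs)
    return wrapped

# Applied to the endpoint from the deployment section:
# @app.route('/predict', methods=['POST'])
# @require_api_key
# def predict(): ...
```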
Ready to implement custom models? Check out our API Documentation or explore Security Best Practices.