Building Trading Bots
Comprehensive guide to building Solana trading bots using the AxiomTradeAPI Python library: architecture patterns, token sniping, market making, arbitrage, risk management, performance, and production deployment.
Table of Contents
- Introduction
- Bot Architecture and Design Patterns
- Token Sniping Bots
- Market Making Strategies
- Arbitrage Detection
- Risk Management
- Performance Optimization
- Production Deployment
- Best Practices
Introduction
Building successful trading bots on Solana requires understanding market dynamics, technical analysis, and efficient API integration. The AxiomTradeAPI provides the foundation for creating automated trading systems.
Reasons to use AxiomTradeAPI for bot development:
- Direct, low-latency connection to Solana validators
- Real-time WebSocket data for instant price updates and token launches
- Portfolio management with balance tracking across wallets
- Structured logging for debugging and monitoring bot performance
Bot Architecture and Design Patterns
Modular bot framework
import asyncio
import logging
from axiomtradeapi import AxiomTradeClient, AxiomTradeWebSocketClient
from typing import Dict, List, Optional
from dataclasses import dataclass


@dataclass
class TradingSignal:
    """Trading signal with confidence score and metadata"""
    token_address: str
    action: str  # 'buy', 'sell', 'hold'
    confidence: float
    price_target: Optional[float] = None
    stop_loss: Optional[float] = None
    reasoning: str = ""


class BaseTradingBot:
    """Base class for all trading bots with common functionality"""

    def __init__(self, auth_token: str, wallet_address: str):
        self.client = AxiomTradeClient(auth_token=auth_token)
        self.ws_client = AxiomTradeWebSocketClient(auth_token=auth_token)
        self.wallet_address = wallet_address
        self.is_running = False
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(f'bot_{self.__class__.__name__}.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(self.__class__.__name__)

    async def start(self):
        """Start the trading bot with WebSocket connections"""
        self.is_running = True
        self.logger.info("Starting trading bot")
        await self.ws_client.connect()
        # Run the three long-lived loops concurrently
        await asyncio.gather(
            self.market_data_processor(),
            self.trading_logic_loop(),
            self.portfolio_monitor()
        )

    async def market_data_processor(self):
        """Process incoming market data and generate signals"""
        async for message in self.ws_client.listen():
            try:
                await self.process_market_data(message)
            except Exception as e:
                self.logger.error(f"Error processing market data: {e}")

    async def process_market_data(self, data: dict):
        """Override in child classes for specific strategies"""
        raise NotImplementedError

    async def trading_logic_loop(self):
        """Main trading logic executed periodically"""
        while self.is_running:
            try:
                signals = await self.generate_signals()
                for signal in signals:
                    await self.execute_signal(signal)
            except Exception as e:
                self.logger.error(f"Trading logic error: {e}")
            # Sleep outside the try block so a persistent error cannot spin the loop
            await asyncio.sleep(1)  # Adjust based on strategy

    async def generate_signals(self) -> List[TradingSignal]:
        """Generate trading signals - override in child classes"""
        return []

    async def execute_signal(self, signal: TradingSignal):
        """Execute trading signal with risk management"""
        self.logger.info(f"Executing signal: {signal.action} {signal.token_address}")
        # Implement execution logic with proper risk checks

    async def portfolio_monitor(self):
        """Monitor portfolio performance and risk metrics"""
        while self.is_running:
            try:
                balance = await self.client.get_balance(self.wallet_address)
                self.logger.info(f"Portfolio value: {balance.get('solBalance', 0)} SOL")
            except Exception as e:
                self.logger.error(f"Portfolio monitoring error: {e}")
            await asyncio.sleep(30)  # Update every 30 seconds
Token Sniping Bots
Token sniping bots identify and purchase newly launched tokens before they gain widespread attention. This strategy requires fast execution and careful risk management.
class TokenSnipingBot(BaseTradingBot):
    """Token sniping bot using AxiomTradeAPI"""

    def __init__(self, auth_token: str, wallet_address: str, config: Optional[dict] = None):
        super().__init__(auth_token, wallet_address)
        defaults = {
            'max_buy_amount': 1.0,  # SOL
            'min_liquidity': 5.0,  # SOL
            'max_slippage': 0.05,  # 5%
            'blacklisted_tokens': set(),
            'target_profit': 0.20,  # 20%
            'stop_loss': -0.10,  # -10%
        }
        # Merge so that a partial config still has every key the bot reads
        self.config = {**defaults, **(config or {})}
        self.monitored_tokens = {}
        self.active_positions = {}

    async def process_market_data(self, data: dict):
        """Process new token launches and price updates"""
        if data.get('type') == 'new_token':
            await self.analyze_new_token(data)
        elif data.get('type') == 'price_update':
            await self.check_exit_conditions(data)

    async def analyze_new_token(self, token_data: dict):
        """Analyze newly launched token for sniping opportunity"""
        token_address = token_data.get('token_address')
        if token_address in self.config['blacklisted_tokens']:
            return
        analysis = await self.rapid_token_analysis(token_data)
        if analysis['snipe_score'] > 0.7:  # High confidence threshold
            signal = TradingSignal(
                token_address=token_address,
                action='buy',
                confidence=analysis['snipe_score'],
                reasoning=f"New token launch - Score: {analysis['snipe_score']}"
            )
            await self.execute_snipe_order(signal, token_data)

    async def rapid_token_analysis(self, token_data: dict) -> dict:
        """Perform rapid analysis of a new token"""
        score = 0.0
        factors = {}

        # Liquidity is a hard requirement: fail fast with a zero score
        liquidity = token_data.get('liquidity', 0)
        if liquidity >= self.config['min_liquidity']:
            score += 0.3
            factors['liquidity'] = 'PASS'
        else:
            factors['liquidity'] = 'FAIL'
            return {'snipe_score': 0.0, 'factors': factors}

        # The remaining factors each add to the score (maximum total: 1.0)
        if token_data.get('verified_contract', False):
            score += 0.2
            factors['contract'] = 'VERIFIED'
        creator_score = token_data.get('creator_reputation', 0)
        if creator_score > 0.5:
            score += 0.2
            factors['creator'] = 'TRUSTED'
        social_activity = token_data.get('social_activity', 0)
        if social_activity > 100:
            score += 0.15
            factors['social'] = 'ACTIVE'
        if token_data.get('initial_supply_locked', False):
            score += 0.15
            factors['supply_lock'] = 'LOCKED'
        return {'snipe_score': score, 'factors': factors}

    async def execute_snipe_order(self, signal: TradingSignal, token_data: dict):
        """Execute high-speed snipe order"""
        try:
            # Scale the position by confidence, capped at the configured maximum
            position_size = min(
                self.config['max_buy_amount'] * signal.confidence,
                self.config['max_buy_amount']
            )
            self.logger.info(f"Sniping: {signal.token_address} - Size: {position_size} SOL")
            # Execute buy order here (see buy_token() in Buying and Selling Tokens)
            self.active_positions[signal.token_address] = {
                'entry_price': token_data.get('price', 0),
                'size': position_size,
                'entry_time': asyncio.get_running_loop().time(),
                'target_profit': self.config['target_profit'],
                'stop_loss': self.config['stop_loss']
            }
        except Exception as e:
            self.logger.error(f"Snipe execution failed: {e}")

    async def check_exit_conditions(self, price_data: dict):
        """Check if we should exit any active positions"""
        token_address = price_data.get('token_address')
        current_price = price_data.get('price', 0)
        if token_address not in self.active_positions:
            return
        position = self.active_positions[token_address]
        entry_price = position['entry_price']
        if entry_price == 0:
            return
        pnl_ratio = (current_price - entry_price) / entry_price
        should_exit = False
        exit_reason = ""
        if pnl_ratio >= position['target_profit']:
            should_exit = True
            exit_reason = f"Target profit reached: {pnl_ratio:.2%}"
        elif pnl_ratio <= position['stop_loss']:
            should_exit = True
            exit_reason = f"Stop loss triggered: {pnl_ratio:.2%}"
        elif asyncio.get_running_loop().time() - position['entry_time'] > 3600:  # 1 hour max hold
            should_exit = True
            exit_reason = "Maximum hold time reached"
        if should_exit:
            await self.execute_exit_order(token_address, exit_reason, pnl_ratio)

    async def execute_exit_order(self, token_address: str, reason: str, pnl_ratio: float):
        """Execute exit order for a position"""
        try:
            self.logger.info(f"Exiting: {token_address} - Reason: {reason} - P&L: {pnl_ratio:.2%}")
            # Execute sell order here (see sell_token() in Buying and Selling Tokens)
            del self.active_positions[token_address]
        except Exception as e:
            self.logger.error(f"Exit execution failed: {e}")
See Buying and Selling Tokens for the buy_token()/sell_token() calls that fill in the execution steps above.
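The exit rules in check_exit_conditions() can be isolated as a pure function, which makes them easy to unit test without a WebSocket feed. The following is a minimal sketch, not part of the library: the function name exit_reason and its return labels are hypothetical, and the default thresholds simply mirror the example configuration above.

```python
from typing import Optional


def exit_reason(
    entry_price: float,
    current_price: float,
    held_seconds: float,
    target_profit: float = 0.20,      # +20%, as in the example config
    stop_loss: float = -0.10,         # -10%, as in the example config
    max_hold_seconds: float = 3600.0  # 1 hour max hold
) -> Optional[str]:
    """Return why a position should be closed, or None to keep holding."""
    if entry_price <= 0:
        return None  # No usable entry price, so no P&L can be computed
    pnl_ratio = (current_price - entry_price) / entry_price
    if pnl_ratio >= target_profit:
        return "target_profit"
    if pnl_ratio <= stop_loss:
        return "stop_loss"
    if held_seconds > max_hold_seconds:
        return "max_hold"
    return None


if __name__ == "__main__":
    print(exit_reason(1.0, 1.25, held_seconds=10))    # price up 25%
    print(exit_reason(1.0, 0.85, held_seconds=10))    # price down 15%
    print(exit_reason(1.0, 1.05, held_seconds=4000))  # held too long
```

Keeping the decision separate from order execution also lets the same rules be replayed against historical prices before the bot trades real funds.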
Market Making Strategies
Market making bots provide liquidity by continuously placing buy and sell orders around the current market price.
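As a worked illustration of quoting around the market price, the sketch below computes a bid and an ask from a mid price and shows the edge captured if both orders fill. The helper names quote_prices and round_trip_edge are hypothetical, the 0.2% spreads are example values, and fees are ignored.

```python
from typing import Tuple


def quote_prices(mid_price: float, bid_spread: float = 0.002,
                 ask_spread: float = 0.002) -> Tuple[float, float]:
    """Return (bid, ask) placed symmetrically or asymmetrically around mid."""
    if mid_price <= 0:
        raise ValueError("mid_price must be positive")
    bid = mid_price * (1 - bid_spread)
    ask = mid_price * (1 + ask_spread)
    return bid, ask


def round_trip_edge(mid_price: float, bid_spread: float = 0.002,
                    ask_spread: float = 0.002) -> float:
    """Edge, as a fraction of mid, earned if both quotes fill (before fees)."""
    bid, ask = quote_prices(mid_price, bid_spread, ask_spread)
    return (ask - bid) / mid_price


if __name__ == "__main__":
    bid, ask = quote_prices(100.0)
    print(f"bid={bid:.2f} ask={ask:.2f} edge={round_trip_edge(100.0):.2%}")
```

With a 0.2% spread on each side the gross edge is 0.4% of the mid price, so trading fees and adverse price moves have to stay below that for the strategy to earn anything.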
class MarketMakingBot(BaseTradingBot):
    """Market making bot for Solana tokens"""

    def __init__(self, auth_token: str, wallet_address: str, target_tokens: List[str]):
        super().__init__(auth_token, wallet_address)
        self.target_tokens = target_tokens
        self.active_orders = {}
        self.spread_config = {
            'bid_spread': 0.002,  # 0.2%
            'ask_spread': 0.002,  # 0.2%
            'order_size': 0.1,  # SOL per order
            'max_position': 2.0,  # SOL max position per token
            'rebalance_threshold': 0.01  # 1%
        }

    async def generate_signals(self) -> List[TradingSignal]:
        """Generate market making signals for all target tokens"""
        signals = []
        for token_address in self.target_tokens:
            try:
                market_data = await self.get_market_data(token_address)
                current_price = market_data.get('price', 0)
                if current_price == 0:
                    continue
                bid_price = current_price * (1 - self.spread_config['bid_spread'])
                ask_price = current_price * (1 + self.spread_config['ask_spread'])
                signals.extend([
                    TradingSignal(
                        token_address=token_address,
                        action='buy',
                        confidence=0.8,
                        price_target=bid_price,
                        reasoning="Market making bid"
                    ),
                    TradingSignal(
                        token_address=token_address,
                        action='sell',
                        confidence=0.8,
                        price_target=ask_price,
                        reasoning="Market making ask"
                    )
                ])
            except Exception as e:
                self.logger.error(f"Error generating MM signals for {token_address}: {e}")
        return signals

    async def get_market_data(self, token_address: str) -> dict:
        """Get current market data for token"""
        # Implement market data fetching from your preferred source
        return {'price': 1.0, 'volume': 1000.0, 'liquidity': 5000.0}
Arbitrage Detection
Arbitrage bots identify price differences across exchanges or liquidity pools and profit from these inefficiencies.
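A raw price gap overstates the profit, because each leg of the trade pays a swap fee and the transactions themselves cost SOL. The sketch below estimates the net return of buying on the cheaper venue and selling on the dearer one. It is an illustration under stated assumptions: the function name net_profit_ratio is hypothetical, the 0.25% fee_rate is a placeholder rather than any DEX's actual fee, and slippage is ignored.

```python
def net_profit_ratio(buy_price: float, sell_price: float,
                     fee_rate: float = 0.0025,  # assumed per-swap fee, placeholder
                     notional: float = 1.0,     # amount spent on the buy leg
                     fixed_cost: float = 0.0    # e.g. transaction fees, same unit as notional
                     ) -> float:
    """Return net profit as a fraction of the notional spent."""
    if buy_price <= 0 or notional <= 0:
        raise ValueError("buy_price and notional must be positive")
    # Buy leg: the fee reduces the amount that is actually swapped
    tokens = notional * (1 - fee_rate) / buy_price
    # Sell leg: the fee reduces the proceeds
    proceeds = tokens * sell_price * (1 - fee_rate)
    return (proceeds - notional - fixed_cost) / notional


if __name__ == "__main__":
    gross = (1.006 - 0.998) / 0.998
    print(f"gross gap:       {gross:.3%}")
    print(f"net at 0.25%:    {net_profit_ratio(0.998, 1.006):.3%}")
    print(f"net at 0.50%:    {net_profit_ratio(0.998, 1.006, fee_rate=0.005):.3%}")
```

In this example a gap of roughly 0.8% stays profitable at a 0.25% fee per swap but turns into a loss at 0.5%, which is why a threshold such as min_profit_threshold is better compared against the net figure than against the raw price difference.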
class ArbitrageBot(BaseTradingBot):
    """Cross-DEX arbitrage bot for the Solana ecosystem"""

    def __init__(self, auth_token: str, wallet_address: str):
        super().__init__(auth_token, wallet_address)
        self.dex_connections = {}
        self.min_profit_threshold = 0.005  # 0.5% minimum profit
        self.max_position_size = 5.0  # SOL

    async def scan_arbitrage_opportunities(self):
        """Continuously scan for arbitrage opportunities.

        BaseTradingBot.start() does not schedule this coroutine,
        so run it alongside the loops started there.
        """
        while self.is_running:
            try:
                opportunities = await self.find_arbitrage_opportunities()
                for opportunity in opportunities:
                    if opportunity['profit_ratio'] > self.min_profit_threshold:
                        await self.execute_arbitrage(opportunity)
            except Exception as e:
                self.logger.error(f"Arbitrage scanning error: {e}")
            # Sleep outside the try block so a persistent error cannot spin the loop
            await asyncio.sleep(0.5)  # High frequency scanning

    async def find_arbitrage_opportunities(self) -> List[dict]:
        """Find profitable arbitrage opportunities"""
        opportunities = []
        tokens_to_check = ['SOL/USDC', 'RAY/SOL', 'SRM/SOL']  # Example pairs
        for token_pair in tokens_to_check:
            prices = await self.get_multi_dex_prices(token_pair)
            if len(prices) >= 2:
                max_price_dex = max(prices, key=lambda x: x['price'])
                min_price_dex = min(prices, key=lambda x: x['price'])
                if min_price_dex['price'] <= 0:
                    continue  # Skip invalid quotes to avoid dividing by zero
                profit_ratio = (max_price_dex['price'] - min_price_dex['price']) / min_price_dex['price']
                if profit_ratio > self.min_profit_threshold:
                    opportunities.append({
                        'token_pair': token_pair,
                        'buy_dex': min_price_dex['dex'],
                        'sell_dex': max_price_dex['dex'],
                        'buy_price': min_price_dex['price'],
                        'sell_price': max_price_dex['price'],
                        'profit_ratio': profit_ratio,
                        'estimated_gas': 0.001  # SOL
                    })
        # Most profitable opportunities first
        return sorted(opportunities, key=lambda x: x['profit_ratio'], reverse=True)

    async def get_multi_dex_prices(self, token_pair: str) -> List[dict]:
        """Get prices from multiple DEXs"""
        # Implement multi-DEX price fetching
        return [
            {'dex': 'Raydium', 'price': 1.001},
            {'dex': 'Orca', 'price': 1.006},
            {'dex': 'Serum', 'price': 0.998}
        ]

    async def execute_arbitrage(self, opportunity: dict):
        """Execute arbitrage trade"""
        try:
            self.logger.info(f"Arbitrage: {opportunity['token_pair']} - "
                             f"Profit: {opportunity['profit_ratio']:.2%}")
            # Add more sophisticated position sizing logic
            position_size = self.max_position_size
            # Execute simultaneous buy/sell orders here
        except Exception as e:
            self.logger.error(f"Arbitrage execution failed: {e}")
Risk Management
Comprehensive risk management is crucial for long-term profitability. The AxiomTradeAPI provides tools for portfolio monitoring and risk assessment.
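To make the two most common limits concrete, the sketch below works through the arithmetic for a position-size cap and a daily loss limit. The helper names max_trade_value and daily_loss_exceeded are hypothetical, and the 10% and 5% defaults are the same example values used by the RiskManager class that follows.

```python
def max_trade_value(portfolio_value: float, max_position_ratio: float = 0.10) -> float:
    """Largest single position allowed, as a value in portfolio units."""
    return max(0.0, portfolio_value) * max_position_ratio


def daily_loss_exceeded(start_of_day_balance: float, daily_pnl: float,
                        daily_loss_limit: float = 0.05) -> bool:
    """True once the day's losses reach the limit; profits never trigger it."""
    if start_of_day_balance <= 0:
        return False  # No baseline yet, so the limit cannot be evaluated
    loss_ratio = max(0.0, -daily_pnl) / start_of_day_balance
    return loss_ratio >= daily_loss_limit


if __name__ == "__main__":
    print(max_trade_value(50.0))             # cap for a 50 SOL portfolio
    print(daily_loss_exceeded(100.0, -6.0))  # down 6% on the day
    print(daily_loss_exceeded(100.0, -2.0))  # down 2% on the day
```

With a 50 SOL portfolio the cap is 5 SOL per position, and a bot that started the day at 100 SOL would stop trading after losing 5 SOL.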
class RiskManager:
    """Risk management system for trading bots"""

    def __init__(self, config: dict):
        self.config = config
        self.daily_loss_limit = config.get('daily_loss_limit', 0.05)  # 5%
        self.max_position_size = config.get('max_position_size', 0.10)  # 10% of portfolio
        self.max_correlation = config.get('max_correlation', 0.7)
        self.daily_pnl = 0.0
        self.positions = {}
        self.start_of_day_balance = 0.0

    async def check_risk_limits(self, proposed_trade: dict) -> dict:
        """Check if proposed trade passes all risk limits"""
        checks = {
            'daily_loss_limit': await self.check_daily_loss_limit(),
            'position_size_limit': await self.check_position_size_limit(proposed_trade),
            'correlation_limit': await self.check_correlation_limit(proposed_trade),
            'concentration_risk': await self.check_concentration_risk(proposed_trade)
        }
        passed = all(checks.values())
        return {
            'approved': passed,
            'checks': checks,
            'risk_score': self.calculate_risk_score(checks)
        }

    async def check_daily_loss_limit(self) -> bool:
        """Check if daily loss limit has been exceeded"""
        if self.start_of_day_balance == 0:
            return True
        loss_ratio = abs(min(0, self.daily_pnl)) / self.start_of_day_balance
        return loss_ratio < self.daily_loss_limit

    async def check_position_size_limit(self, trade: dict) -> bool:
        """Check if position size is within limits"""
        position_value = trade.get('size', 0) * trade.get('price', 0)
        portfolio_value = await self.get_portfolio_value()
        if portfolio_value == 0:
            return False
        position_ratio = position_value / portfolio_value
        return position_ratio <= self.max_position_size

    # The three methods below are called above but not defined in this guide.
    # They are left as explicit placeholders to implement for your strategy.
    async def check_correlation_limit(self, trade: dict) -> bool:
        """Check correlation against existing positions (placeholder)"""
        raise NotImplementedError("Implement a correlation check using self.max_correlation")

    async def check_concentration_risk(self, trade: dict) -> bool:
        """Check portfolio concentration (placeholder)"""
        raise NotImplementedError("Implement a concentration check for your portfolio")

    async def get_portfolio_value(self) -> float:
        """Return total portfolio value (placeholder)"""
        raise NotImplementedError("Implement portfolio valuation from your balance source")

    def calculate_risk_score(self, checks: dict) -> float:
        """Calculate overall risk score (0-1, lower is better)"""
        failed_checks = sum(1 for passed in checks.values() if not passed)
        return failed_checks / len(checks)
Performance Optimization
import asyncio
import functools
import aiohttp
from concurrent.futures import ThreadPoolExecutor
import cProfile
import time


class HighPerformanceBot(BaseTradingBot):
    """Trading bot optimized for speed"""

    def __init__(self, auth_token: str, wallet_address: str):
        super().__init__(auth_token, wallet_address)
        self.session = None
        self.thread_pool = ThreadPoolExecutor(max_workers=4)
        self._monitor_task = None
        self.performance_metrics = {
            'api_call_times': [],
            'signal_generation_times': [],
            'execution_times': []
        }

    async def initialize_performance_optimizations(self):
        """Initialize performance optimizations"""
        self.session = aiohttp.ClientSession(
            connector=aiohttp.TCPConnector(
                limit=100,
                limit_per_host=30,
                keepalive_timeout=30
            )
        )
        await self.warmup_connections()
        # Keep a reference so the task is not garbage-collected while running
        self._monitor_task = asyncio.create_task(self.performance_monitor())

    async def warmup_connections(self):
        """Pre-warm HTTP connections to reduce latency"""
        warmup_tasks = []
        for _ in range(5):
            warmup_tasks.append(self.client.get_balance(self.wallet_address))
        await asyncio.gather(*warmup_tasks, return_exceptions=True)

    async def performance_monitor(self):
        """Monitor and log performance metrics"""
        while self.is_running:
            try:
                if self.performance_metrics['api_call_times']:
                    avg_api_time = sum(self.performance_metrics['api_call_times']) / len(self.performance_metrics['api_call_times'])
                    self.logger.info(f"Avg API response time: {avg_api_time:.3f}s")
                    # Reset all metric lists after reporting
                    self.performance_metrics = {k: [] for k in self.performance_metrics}
            except Exception as e:
                self.logger.error(f"Performance monitoring error: {e}")
            await asyncio.sleep(60)

    async def optimized_api_call(self, func, *args, **kwargs):
        """Wrapper for API calls with performance tracking"""
        # perf_counter is monotonic and better suited to measuring intervals
        start_time = time.perf_counter()
        try:
            result = await func(*args, **kwargs)
            return result
        finally:
            call_time = time.perf_counter() - start_time
            self.performance_metrics['api_call_times'].append(call_time)

    def profile_performance(self, func):
        """Decorator for profiling synchronous functions"""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            profiler = cProfile.Profile()
            profiler.enable()
            try:
                result = func(*args, **kwargs)
                return result
            finally:
                profiler.disable()
                profiler.dump_stats(f'profile_{func.__name__}_{int(time.time())}.prof')
        return wrapper
For more on connection pooling, memory management, and benchmarking, see Performance Optimization.
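The profile_performance decorator above wraps plain functions, so it does not measure time spent awaiting a coroutine. A minimal sketch of an equivalent timing decorator for async functions is shown below. The names timed, api_call_times, and fake_api_call are hypothetical, and asyncio.sleep stands in for a real API request.

```python
import asyncio
import functools
import time
from typing import Any, Awaitable, Callable, List


def timed(samples: List[float]) -> Callable:
    """Decorator factory: append each call's duration in seconds to `samples`."""
    def decorator(func: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            start = time.perf_counter()
            try:
                return await func(*args, **kwargs)
            finally:
                # Recorded even when the call raises
                samples.append(time.perf_counter() - start)
        return wrapper
    return decorator


api_call_times: List[float] = []


@timed(api_call_times)
async def fake_api_call(delay: float) -> str:
    """Stand-in for a network request."""
    await asyncio.sleep(delay)
    return "ok"


if __name__ == "__main__":
    asyncio.run(fake_api_call(0.05))
    print(f"recorded {len(api_call_times)} call(s), last took {api_call_times[-1]:.3f}s")
```

Collecting durations into a plain list keeps the decorator independent of any bot class; the list can be the same one that a monitoring loop averages and resets.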
Production Deployment
Deploying trading bots to production requires careful consideration of reliability, monitoring, and security.
import os
import subprocess
from pathlib import Path
import docker
import yaml


class ProductionBotManager:
    """Production deployment manager for trading bots"""

    def __init__(self, config_path: str):
        with open(config_path, 'r') as f:
            self.config = yaml.safe_load(f)
        self.docker_client = docker.from_env()
        self.running_bots = {}

    def create_bot_dockerfile(self, bot_class: str) -> str:
        """Generate Dockerfile for bot deployment"""
        dockerfile_content = f"""
FROM python:3.11-slim
RUN apt-get update && apt-get install -y \\
    gcc \\
    g++ \\
    && rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
RUN useradd -m botuser && chown -R botuser:botuser /app
USER botuser
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \\
    CMD python -c "import requests; requests.get('http://localhost:8080/health').raise_for_status()"
CMD ["python", "-m", "bots.{bot_class}"]
"""
        return dockerfile_content.strip()

    def create_docker_compose(self) -> str:
        """Generate docker-compose.yml for multi-bot deployment"""
        compose_content = """
version: '3.8'
services:
  token-sniping-bot:
    build: .
    environment:
      - BOT_TYPE=TokenSnipingBot
      - AUTH_TOKEN=${AUTH_TOKEN}
      - WALLET_ADDRESS=${WALLET_ADDRESS}
      - LOG_LEVEL=INFO
    volumes:
      - ./logs:/app/logs
      - ./config:/app/config
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "python", "-c", "import requests; requests.get('http://localhost:8080/health').raise_for_status()"]
      interval: 30s
      timeout: 10s
      retries: 3
  market-making-bot:
    build: .
    environment:
      - BOT_TYPE=MarketMakingBot
      - AUTH_TOKEN=${AUTH_TOKEN}
      - WALLET_ADDRESS=${WALLET_ADDRESS}
      - LOG_LEVEL=INFO
    volumes:
      - ./logs:/app/logs
      - ./config:/app/config
    restart: unless-stopped
  arbitrage-bot:
    build: .
    environment:
      - BOT_TYPE=ArbitrageBot
      - AUTH_TOKEN=${AUTH_TOKEN}
      - WALLET_ADDRESS=${WALLET_ADDRESS}
      - LOG_LEVEL=INFO
    volumes:
      - ./logs:/app/logs
      - ./config:/app/config
    restart: unless-stopped
  monitoring:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    environment:
      # Example only: change the admin password before deployment
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana-storage:/var/lib/grafana
volumes:
  grafana-storage:
"""
        return compose_content.strip()

    def deploy_bot_fleet(self):
        """Deploy complete bot fleet with monitoring"""
        try:
            os.makedirs('logs', exist_ok=True)
            os.makedirs('config', exist_ok=True)
            os.makedirs('monitoring', exist_ok=True)
            with open('Dockerfile', 'w') as f:
                f.write(self.create_bot_dockerfile('BaseTradingBot'))
            with open('docker-compose.yml', 'w') as f:
                f.write(self.create_docker_compose())
            self.create_monitoring_config()
            # check=True raises if docker-compose exits with an error,
            # so a failed deployment is reported instead of silently ignored
            subprocess.run(['docker-compose', 'up', '-d', '--build'], check=True)
        except Exception as e:
            print(f"Deployment failed: {e}")

    def create_monitoring_config(self):
        """Create Prometheus monitoring configuration"""
        prometheus_config = """
global:
  scrape_interval: 15s
scrape_configs:
  - job_name: 'trading-bots'
    static_configs:
      - targets: ['token-sniping-bot:8080', 'market-making-bot:8080', 'arbitrage-bot:8080']
    scrape_interval: 5s
    metrics_path: /metrics
"""
        with open('monitoring/prometheus.yml', 'w') as f:
            f.write(prometheus_config.strip())
Example production configuration:
production_config = {
    'bots': [
        {
            'type': 'TokenSnipingBot',
            'instances': 2,
            'resources': {'cpu': '1', 'memory': '512Mi'}
        },
        {
            'type': 'MarketMakingBot',
            'instances': 1,
            'resources': {'cpu': '0.5', 'memory': '256Mi'}
        }
    ],
    'monitoring': {
        'enabled': True,
        'metrics_port': 8080,
        'health_check_interval': 30
    }
}
Bot types available in this framework: TokenSnipingBot (high-speed new token acquisition), MarketMakingBot (automated liquidity provision), ArbitrageBot (cross-DEX profit opportunities), and any custom bot subclassing BaseTradingBot.
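The compose file above passes BOT_TYPE, AUTH_TOKEN, and WALLET_ADDRESS to each container, so a single entry point can choose the bot class at startup. The sketch below shows one way to do that. The names BOT_REGISTRY and build_bot_from_env are hypothetical, and the classes are simplified stand-ins so the example runs on its own; the real MarketMakingBot also takes a target_tokens argument that a full entry point would need to supply.

```python
import os
from typing import Dict, Mapping, Type


class BaseTradingBot:
    """Stand-in for the framework base class, for illustration only."""

    def __init__(self, auth_token: str, wallet_address: str):
        self.auth_token = auth_token
        self.wallet_address = wallet_address


class TokenSnipingBot(BaseTradingBot):
    pass


class ArbitrageBot(BaseTradingBot):
    pass


# Maps the BOT_TYPE value to the class to instantiate
BOT_REGISTRY: Dict[str, Type[BaseTradingBot]] = {
    "TokenSnipingBot": TokenSnipingBot,
    "ArbitrageBot": ArbitrageBot,
}


def build_bot_from_env(env: Mapping[str, str] = os.environ) -> BaseTradingBot:
    """Create the bot named by BOT_TYPE, failing fast on bad configuration."""
    bot_type = env.get("BOT_TYPE", "")
    if bot_type not in BOT_REGISTRY:
        known = ", ".join(sorted(BOT_REGISTRY))
        raise ValueError(f"Unknown BOT_TYPE {bot_type!r}; expected one of: {known}")
    missing = [name for name in ("AUTH_TOKEN", "WALLET_ADDRESS") if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing required environment variables: {missing}")
    return BOT_REGISTRY[bot_type](env["AUTH_TOKEN"], env["WALLET_ADDRESS"])


if __name__ == "__main__":
    demo_env = {"BOT_TYPE": "ArbitrageBot", "AUTH_TOKEN": "example", "WALLET_ADDRESS": "example"}
    print(type(build_bot_from_env(demo_env)).__name__)
```

Failing at startup on an unknown type or a missing variable surfaces configuration mistakes immediately, rather than after the container has been restarted several times by its restart policy.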
Best Practices
Security considerations
- API key management - store authentication tokens securely using environment variables.
- Rate limiting - implement proper rate limiting to avoid API restrictions.
- Error handling - handle network issues and API errors comprehensively.
- Logging - keep detailed logs for debugging and performance analysis.
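As an illustration of the rate limiting point, the sketch below spaces calls evenly so that no more than a chosen number are made per second. The RateLimiter class is hypothetical and not part of AxiomTradeAPI, and the rate of 50 calls per second is an arbitrary example rather than a documented API limit.

```python
import asyncio
import time


class RateLimiter:
    """Space calls so that at most `rate` of them start per second."""

    def __init__(self, rate: float):
        if rate <= 0:
            raise ValueError("rate must be positive")
        self._interval = 1.0 / rate
        self._lock = asyncio.Lock()
        self._next_allowed = 0.0  # monotonic time when the next call may start

    async def wait(self) -> None:
        """Block until the caller is allowed to make its next request."""
        async with self._lock:
            now = time.monotonic()
            delay = self._next_allowed - now
            if delay > 0:
                await asyncio.sleep(delay)
                now = time.monotonic()
            # Schedule the following slot one interval after this one
            self._next_allowed = max(now, self._next_allowed) + self._interval


async def demo() -> float:
    """Make three rate-limited 'calls' and return the elapsed time."""
    limiter = RateLimiter(rate=50)  # created inside the running event loop
    start = time.monotonic()
    for _ in range(3):
        await limiter.wait()
        # An API request would go here
    return time.monotonic() - start


if __name__ == "__main__":
    print(f"3 calls took {asyncio.run(demo()):.3f}s")
```

Calling `await limiter.wait()` before each request keeps every coroutine that shares the limiter under the same budget, which is simpler than sprinkling sleeps through individual loops.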
Performance tips
- Connection pooling - use persistent HTTP connections for better performance.
- Async operations - leverage async/await for concurrent operations.
- Data caching - cache frequently accessed data to reduce API calls.
- Memory management - monitor memory usage for long-running bots.
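The data caching tip can be sketched with a small time-to-live cache: a value is reused until it is older than a chosen age, then fetched again. The TTLCache class below is a hypothetical helper, and the 5 second lifetime and the stand-in loader are example choices.

```python
import time
from typing import Any, Callable, Dict, Hashable, Tuple


class TTLCache:
    """Cache values for a fixed number of seconds."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so tests can control time
        self._entries: Dict[Hashable, Tuple[float, Any]] = {}

    def get(self, key: Hashable, loader: Callable[[], Any]) -> Any:
        """Return the cached value for `key`, calling `loader` if absent or stale."""
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and now - entry[0] < self._ttl:
            return entry[1]
        value = loader()
        self._entries[key] = (now, value)
        return value


if __name__ == "__main__":
    cache = TTLCache(ttl_seconds=5.0)
    fetches = []

    def load_balance() -> float:
        fetches.append(1)  # stands in for an API call
        return 12.5

    cache.get("balance", load_balance)
    cache.get("balance", load_balance)  # served from the cache
    print(f"loader called {len(fetches)} time(s)")
```

Short lifetimes suit slowly changing data such as balances; prices used for order decisions are usually better taken directly from the live feed than from a cache.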
Monitoring and alerts
- Health checks - implement health check endpoints for monitoring.
- Performance metrics - track key performance indicators.
- Alert systems - set up alerts for critical failures or performance issues.
- Dashboards - use Grafana or a similar tool for visualization.
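The container health checks shown earlier request http://localhost:8080/health, so each bot needs to serve that path. A minimal sketch using only the standard library follows. HealthHandler and start_health_server are hypothetical names, and the JSON body is an arbitrary choice; a real bot would report its own state, such as WebSocket connectivity.

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer


class HealthHandler(BaseHTTPRequestHandler):
    """Answer GET /health with a small JSON document."""

    def do_GET(self) -> None:
        if self.path == "/health":
            body = json.dumps({"status": "ok"}).encode("utf-8")
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
        else:
            self.send_error(404)

    def log_message(self, format: str, *args) -> None:
        """Silence per-request logging; health checks run every few seconds."""


def start_health_server(port: int = 8080, host: str = "0.0.0.0") -> ThreadingHTTPServer:
    """Serve the health endpoint on a daemon thread and return the server."""
    server = ThreadingHTTPServer((host, port), HealthHandler)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    return server


if __name__ == "__main__":
    server = start_health_server()
    print(f"Health endpoint listening on port {server.server_address[1]}")
    try:
        threading.Event().wait()  # keep the process alive until interrupted
    except KeyboardInterrupt:
        server.shutdown()
```

Running the server on a daemon thread keeps it out of the bot's event loop; a bot that already uses an async web framework could expose the same path there instead.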
Conclusion
Building successful trading bots requires combining technical expertise with sound trading principles:
- Robust risk management - never risk more than you can afford to lose.
- Continuous testing - backtest strategies thoroughly before deployment.
- Performance monitoring - track and optimize bot performance.
Start with simple strategies and gradually increase complexity as you gain experience.
Related
- Buying and Selling Tokens - buy_token()/sell_token() reference used by the bots above
- WebSocket Integration - real-time market data feeding bot signals
- Performance Optimization - connection pooling, memory, and concurrency in depth
- Error Handling