WebSocket Integration
WebSocket Integration
Real-time Solana token monitoring and WebSocket streaming with AxiomTradeAPI-py, for building trading bots, token snipers, and market monitoring systems.
Quick Start: Real-Time Token Monitoring
Basic WebSocket setup
import asyncio
from axiomtradeapi import AxiomTradeClient
# Your authentication tokens (get from browser cookies)
AUTH_TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
REFRESH_TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
async def handle_new_tokens(tokens):
"""Process incoming token updates"""
for token in tokens:
print(f"NEW TOKEN: {token['tokenName']} ({token['tokenTicker']})")
print(f" Address: {token['tokenAddress']}")
print(f" Market Cap: {token['marketCapSol']} SOL")
print(f" Volume: {token['volumeSol']} SOL")
print(f" Protocol: {token['protocol']}")
async def main():
client = AxiomTradeClient(
auth_token=AUTH_TOKEN,
refresh_token=REFRESH_TOKEN
)
# Subscribe to new token pairs
await client.subscribe_new_tokens(handle_new_tokens)
print("Listening for new tokens... (Press Ctrl+C to stop)")
await client.ws.start()
if __name__ == "__main__":
asyncio.run(main())
See Authentication overview for how to obtain and manage auth_token/refresh_token.
Advanced WebSocket Applications
1. Token sniper bot
An automated token sniper that filters and acts on new tokens:
import asyncio
import logging
from datetime import datetime
from axiomtradeapi import AxiomTradeClient
class TokenSniperBot:
def __init__(self, auth_token, refresh_token):
self.client = AxiomTradeClient(
auth_token=auth_token,
refresh_token=refresh_token,
log_level=logging.INFO
)
# Sniper configuration
self.min_market_cap = 10.0 # Minimum 10 SOL market cap
self.max_market_cap = 1000.0 # Maximum 1000 SOL market cap
self.min_volume = 5.0 # Minimum 5 SOL volume
self.target_protocols = ["Raydium", "Orca", "Jupiter"]
self.sniped_tokens = []
self.processed_tokens = set()
self.setup_logging()
def setup_logging(self):
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('token_sniper.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
def meets_sniper_criteria(self, token):
"""Check if token meets our sniping criteria"""
if token['tokenAddress'] in self.processed_tokens:
return False
self.processed_tokens.add(token['tokenAddress'])
market_cap = token.get('marketCapSol', 0)
volume = token.get('volumeSol', 0)
protocol = token.get('protocol', '')
if not (self.min_market_cap <= market_cap <= self.max_market_cap):
return False
if volume < self.min_volume:
return False
if protocol not in self.target_protocols:
return False
if not self.quality_checks(token):
return False
return True
def quality_checks(self, token):
"""Basic quality checks for token legitimacy"""
name = token.get('tokenName', '').lower()
ticker = token.get('tokenTicker', '').lower()
scam_keywords = ['elon', 'musk', 'doge', 'shib', 'safemoon', 'free', 'airdrop']
if any(keyword in name or keyword in ticker for keyword in scam_keywords):
self.logger.warning(f"{token['tokenName']}: contains suspicious keywords")
return False
if len(name) > 50 or len(ticker) > 10:
return False
has_socials = any([
token.get('website'),
token.get('twitter'),
token.get('telegram')
])
if not has_socials:
self.logger.info(f"{token['tokenName']}: no social media presence")
return True
async def execute_snipe(self, token):
"""Execute the sniping action for a qualified token"""
self.logger.info(f"Sniping token: {token['tokenName']}")
snipe_data = {
'timestamp': datetime.now().isoformat(),
'token_name': token['tokenName'],
'token_ticker': token['tokenTicker'],
'token_address': token['tokenAddress'],
'market_cap_sol': token['marketCapSol'],
'volume_sol': token['volumeSol'],
'protocol': token['protocol'],
}
self.sniped_tokens.append(snipe_data)
# Implement actual trading logic here:
# - Calculate position size
# - Execute buy order (see buy_token() in Buying and Selling Tokens)
# - Set stop-loss/take-profit
# - Record transaction
self.save_sniped_token(snipe_data)
def save_sniped_token(self, snipe_data):
"""Persist sniped token data to file"""
import json
try:
with open('sniped_tokens.json', 'r') as f:
sniped_list = json.load(f)
except FileNotFoundError:
sniped_list = []
sniped_list.append(snipe_data)
with open('sniped_tokens.json', 'w') as f:
json.dump(sniped_list, f, indent=2)
async def process_new_tokens(self, tokens):
"""Main token processing function"""
for token in tokens:
try:
if self.meets_sniper_criteria(token):
await self.execute_snipe(token)
except Exception as e:
self.logger.error(f"Error processing token {token.get('tokenName', 'Unknown')}: {e}")
async def start_sniping(self):
"""Start the token sniping bot"""
self.logger.info("Starting token sniper bot")
self.logger.info(f"Criteria: market cap {self.min_market_cap}-{self.max_market_cap} SOL, "
f"min volume {self.min_volume} SOL, protocols {self.target_protocols}")
try:
await self.client.subscribe_new_tokens(self.process_new_tokens)
await self.client.ws.start()
except KeyboardInterrupt:
self.logger.info("Token sniper stopped by user")
except Exception as e:
self.logger.error(f"Fatal error in token sniper: {e}")
raise
def get_sniper_stats(self):
"""Get sniping statistics"""
return {
"total_processed": len(self.processed_tokens),
"total_sniped": len(self.sniped_tokens),
"success_rate": len(self.sniped_tokens) / len(self.processed_tokens) * 100 if self.processed_tokens else 0
}
# Usage
async def main():
sniper = TokenSniperBot(
auth_token="your-auth-token",
refresh_token="your-refresh-token"
)
await sniper.start_sniping()
if __name__ == "__main__":
asyncio.run(main())
Notifications (Discord/Telegram/email webhooks) can be added to execute_snipe() if you want out-of-band alerts when a snipe fires — implement send_*_notification() helpers using your webhook or SMTP credentials of choice.
2. Market sentiment analyzer
Analyze token sentiment and market trends in real time:
import asyncio
import json
from collections import defaultdict, deque
from datetime import datetime, timedelta
from axiomtradeapi import AxiomTradeClient
class MarketSentimentAnalyzer:
def __init__(self, auth_token, refresh_token):
self.client = AxiomTradeClient(
auth_token=auth_token,
refresh_token=refresh_token
)
self.token_data = defaultdict(list)
self.market_metrics = {
'new_tokens_per_hour': deque(maxlen=24), # 24 hour window
'protocol_distribution': defaultdict(int),
'market_cap_trends': deque(maxlen=100),
'volume_trends': deque(maxlen=100)
}
self.analysis_interval = 3600 # 1 hour
self.last_analysis = datetime.now()
async def process_market_data(self, tokens):
"""Process incoming token data for market analysis"""
current_time = datetime.now()
for token in tokens:
token_entry = {
'timestamp': current_time.isoformat(),
'name': token['tokenName'],
'ticker': token['tokenTicker'],
'address': token['tokenAddress'],
'market_cap': token['marketCapSol'],
'volume': token['volumeSol'],
'protocol': token['protocol']
}
self.token_data[token['tokenAddress']].append(token_entry)
self.market_metrics['protocol_distribution'][token['protocol']] += 1
self.market_metrics['market_cap_trends'].append(token['marketCapSol'])
self.market_metrics['volume_trends'].append(token['volumeSol'])
self.market_metrics['new_tokens_per_hour'].append({
'timestamp': current_time.isoformat(),
'count': len(tokens)
})
if current_time - self.last_analysis >= timedelta(seconds=self.analysis_interval):
await self.perform_market_analysis()
self.last_analysis = current_time
async def perform_market_analysis(self):
"""Perform market sentiment analysis"""
self.analyze_token_velocity()
self.analyze_protocol_trends()
self.analyze_market_cap_trends()
self.analyze_volume_patterns()
sentiment_score = self.calculate_market_sentiment()
print(f"Overall market sentiment: {sentiment_score}/10")
self.save_analysis_report()
def analyze_token_velocity(self):
"""Analyze how fast new tokens are being created"""
if not self.market_metrics['new_tokens_per_hour']:
return
recent_hours = list(self.market_metrics['new_tokens_per_hour'])[-6:] # Last 6 hours
avg_tokens_per_hour = sum(hour['count'] for hour in recent_hours) / len(recent_hours)
print(f"Average new tokens per hour (last 6h): {avg_tokens_per_hour:.1f}")
def analyze_protocol_trends(self):
"""Analyze which protocols are dominating"""
total_tokens = sum(self.market_metrics['protocol_distribution'].values())
if total_tokens == 0:
return
sorted_protocols = sorted(
self.market_metrics['protocol_distribution'].items(),
key=lambda x: x[1],
reverse=True
)
for protocol, count in sorted_protocols[:5]: # Top 5
percentage = (count / total_tokens) * 100
print(f" {protocol}: {count} tokens ({percentage:.1f}%)")
def analyze_market_cap_trends(self):
"""Analyze market cap trends"""
if len(self.market_metrics['market_cap_trends']) < 10:
return
market_caps = list(self.market_metrics['market_cap_trends'])
avg_market_cap = sum(market_caps) / len(market_caps)
recent_avg = sum(market_caps[-10:]) / 10 # Last 10 tokens
print(f"Average market cap: {avg_market_cap:.2f} SOL, recent avg: {recent_avg:.2f} SOL")
def analyze_volume_patterns(self):
"""Analyze volume patterns"""
if len(self.market_metrics['volume_trends']) < 10:
return
volumes = list(self.market_metrics['volume_trends'])
avg_volume = sum(volumes) / len(volumes)
recent_avg = sum(volumes[-10:]) / 10
print(f"Average volume: {avg_volume:.2f} SOL, recent avg: {recent_avg:.2f} SOL")
def calculate_market_sentiment(self):
"""Calculate overall market sentiment score (1-10)"""
score = 5.0 # Neutral starting point
if self.market_metrics['new_tokens_per_hour']:
recent_hours = list(self.market_metrics['new_tokens_per_hour'])[-6:]
avg_tokens = sum(hour['count'] for hour in recent_hours) / len(recent_hours)
if avg_tokens > 50:
score += 1.5 # High activity is bullish
elif avg_tokens < 10:
score -= 1.0 # Low activity is bearish
if len(self.market_metrics['market_cap_trends']) >= 20:
market_caps = list(self.market_metrics['market_cap_trends'])
early_avg = sum(market_caps[:10]) / 10
recent_avg = sum(market_caps[-10:]) / 10
if recent_avg > early_avg * 1.2:
score += 1.0
elif recent_avg < early_avg * 0.8:
score -= 1.0
if len(self.market_metrics['volume_trends']) >= 20:
volumes = list(self.market_metrics['volume_trends'])
early_avg = sum(volumes[:10]) / 10
recent_avg = sum(volumes[-10:]) / 10
if recent_avg > early_avg * 1.3:
score += 1.0
elif recent_avg < early_avg * 0.7:
score -= 1.0
unique_protocols = len(self.market_metrics['protocol_distribution'])
if unique_protocols > 5:
score += 0.5 # Diversity is good
elif unique_protocols < 3:
score -= 0.5 # Low diversity is concerning
return max(1.0, min(10.0, score)) # Clamp between 1-10
def save_analysis_report(self):
"""Save analysis report to file"""
report = {
'timestamp': datetime.now().isoformat(),
'token_velocity': {
'new_tokens_per_hour': list(self.market_metrics['new_tokens_per_hour']),
},
'protocol_distribution': dict(self.market_metrics['protocol_distribution']),
'market_cap_trends': list(self.market_metrics['market_cap_trends']),
'volume_trends': list(self.market_metrics['volume_trends']),
'sentiment_score': self.calculate_market_sentiment()
}
with open(f"market_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", 'w') as f:
json.dump(report, f, indent=2)
async def start_analysis(self):
"""Start market sentiment analysis"""
try:
await self.client.subscribe_new_tokens(self.process_market_data)
await self.client.ws.start()
except KeyboardInterrupt:
await self.perform_market_analysis() # Final analysis
# Usage
async def main():
analyzer = MarketSentimentAnalyzer(
auth_token="your-auth-token",
refresh_token="your-refresh-token"
)
await analyzer.start_analysis()
if __name__ == "__main__":
asyncio.run(main())WebSocket Connection Management
Robust connection handling
import asyncio
import logging
from axiomtradeapi import AxiomTradeClient
class RobustWebSocketClient:
def __init__(self, auth_token, refresh_token):
self.auth_token = auth_token
self.refresh_token = refresh_token
self.client = None
self.reconnect_attempts = 0
self.max_reconnect_attempts = 5
self.reconnect_delay = 5
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
async def connect_with_retry(self):
"""Connect with automatic retry logic"""
while self.reconnect_attempts < self.max_reconnect_attempts:
try:
self.client = AxiomTradeClient(
auth_token=self.auth_token,
refresh_token=self.refresh_token
)
await self.client.subscribe_new_tokens(self.handle_tokens)
self.logger.info("WebSocket connected successfully")
self.reconnect_attempts = 0
return True
except Exception as e:
self.reconnect_attempts += 1
self.logger.error(f"Connection failed (attempt {self.reconnect_attempts}): {e}")
if self.reconnect_attempts < self.max_reconnect_attempts:
await asyncio.sleep(self.reconnect_delay)
self.reconnect_delay *= 2 # Exponential backoff
self.logger.error("Max reconnection attempts exceeded")
return False
async def handle_tokens(self, tokens):
"""Handle incoming token data"""
for token in tokens:
self.logger.info(f"Received: {token['tokenName']}")
async def start_with_monitoring(self):
"""Start with connection monitoring"""
while True:
if await self.connect_with_retry():
try:
await self.client.ws.start()
except Exception as e:
self.logger.error(f"WebSocket error: {e}")
self.logger.info("Attempting to reconnect...")
continue
else:
self.logger.error("Failed to establish connection")
break
# Usage
async def main():
robust_client = RobustWebSocketClient(
auth_token="your-auth-token",
refresh_token="your-refresh-token"
)
await robust_client.start_with_monitoring()
if __name__ == "__main__":
asyncio.run(main())WebSocket Performance Optimization
High-performance token processing
import asyncio
import time
from collections import deque
from axiomtradeapi import AxiomTradeClient
class HighPerformanceTokenProcessor:
def __init__(self, auth_token, refresh_token):
self.client = AxiomTradeClient(
auth_token=auth_token,
refresh_token=refresh_token
)
self.processing_times = deque(maxlen=1000)
self.tokens_processed = 0
self.start_time = time.time()
self.token_queue = asyncio.Queue(maxsize=10000)
self.workers = []
async def token_receiver(self, tokens):
"""Receive tokens and add to processing queue"""
receive_time = time.time()
for token in tokens:
token['receive_timestamp'] = receive_time
try:
self.token_queue.put_nowait(token)
except asyncio.QueueFull:
print("Warning: token queue is full, dropping token")
async def token_worker(self, worker_id):
"""Worker coroutine for processing tokens"""
while True:
try:
token = await self.token_queue.get()
start_time = time.time()
await self.process_token(token)
end_time = time.time()
self.processing_times.append(end_time - start_time)
self.tokens_processed += 1
self.token_queue.task_done()
except Exception as e:
print(f"Worker {worker_id} error: {e}")
async def process_token(self, token):
"""Process individual token (implement your logic here)"""
await asyncio.sleep(0.01) # Simulated processing time
print(f"Processed: {token['tokenName']}")
def get_performance_stats(self):
"""Get performance statistics"""
if not self.processing_times:
return "No performance data available"
avg_processing_time = sum(self.processing_times) / len(self.processing_times)
total_runtime = time.time() - self.start_time
tokens_per_second = self.tokens_processed / total_runtime if total_runtime > 0 else 0
return {
'tokens_processed': self.tokens_processed,
'avg_processing_time_ms': avg_processing_time * 1000,
'tokens_per_second': tokens_per_second,
'queue_size': self.token_queue.qsize(),
'total_runtime': total_runtime
}
async def start_processing(self, num_workers=5):
"""Start high-performance token processing"""
for i in range(num_workers):
worker = asyncio.create_task(self.token_worker(i))
self.workers.append(worker)
monitor_task = asyncio.create_task(self.performance_monitor())
try:
await self.client.subscribe_new_tokens(self.token_receiver)
await self.client.ws.start()
except KeyboardInterrupt:
for worker in self.workers:
worker.cancel()
monitor_task.cancel()
stats = self.get_performance_stats()
for key, value in stats.items():
print(f" {key}: {value}")
async def performance_monitor(self):
"""Monitor and report performance every 30 seconds"""
while True:
await asyncio.sleep(30)
stats = self.get_performance_stats()
print(f"Tokens processed: {stats['tokens_processed']}, "
f"speed: {stats['tokens_per_second']:.2f}/s, "
f"avg time: {stats['avg_processing_time_ms']:.2f}ms")
# Usage
async def main():
processor = HighPerformanceTokenProcessor(
auth_token="your-auth-token",
refresh_token="your-refresh-token"
)
await processor.start_processing(num_workers=10)
if __name__ == "__main__":
asyncio.run(main())Security Best Practices
Secure WebSocket implementation
import asyncio
import ssl
import logging
from axiomtradeapi import AxiomTradeClient
class SecureWebSocketClient:
def __init__(self, auth_token, refresh_token):
self.auth_token = auth_token
self.refresh_token = refresh_token
self.setup_security()
self.setup_logging()
def setup_security(self):
"""Setup security configuration"""
self.ssl_context = ssl.create_default_context()
self.ssl_context.check_hostname = True
self.ssl_context.verify_mode = ssl.CERT_REQUIRED
if not self.validate_tokens():
raise ValueError("Invalid authentication tokens")
def validate_tokens(self):
"""Validate authentication tokens"""
if not self.auth_token or not self.refresh_token:
return False
# Basic JWT format validation
if not (self.auth_token.count('.') == 2 and self.refresh_token.count('.') == 2):
return False
return True
def setup_logging(self):
"""Setup security-focused logging"""
self.security_logger = logging.getLogger('security')
self.security_logger.setLevel(logging.INFO)
security_handler = logging.FileHandler('security.log')
security_formatter = logging.Formatter(
'%(asctime)s - SECURITY - %(levelname)s - %(message)s'
)
security_handler.setFormatter(security_formatter)
self.security_logger.addHandler(security_handler)
async def secure_token_handler(self, tokens):
"""Securely handle incoming tokens"""
self.security_logger.info(f"Received {len(tokens)} tokens via secure WebSocket")
for token in tokens:
if self.validate_token_data(token):
await self.process_secure_token(token)
else:
self.security_logger.warning(f"Invalid token data received: {token.get('tokenAddress', 'Unknown')}")
def validate_token_data(self, token):
"""Validate incoming token data"""
required_fields = ['tokenName', 'tokenAddress', 'marketCapSol', 'volumeSol']
for field in required_fields:
if field not in token:
return False
if len(token['tokenAddress']) != 44: # Solana address length
return False
if token['marketCapSol'] < 0 or token['volumeSol'] < 0:
return False
return True
async def process_secure_token(self, token):
"""Process validated token data"""
print(f"Securely processed: {token['tokenName']}")
async def start_secure_connection(self):
"""Start secure WebSocket connection"""
try:
self.client = AxiomTradeClient(
auth_token=self.auth_token,
refresh_token=self.refresh_token
)
self.security_logger.info("Initiating secure WebSocket connection")
await self.client.subscribe_new_tokens(self.secure_token_handler)
await self.client.ws.start()
except Exception as e:
self.security_logger.error(f"Connection error: {e}")
raise
# Usage
async def main():
secure_client = SecureWebSocketClient(
auth_token="your-auth-token",
refresh_token="your-refresh-token"
)
await secure_client.start_secure_connection()
if __name__ == "__main__":
asyncio.run(main())Related
- Building Trading Bots - strategies that consume this WebSocket stream
- Performance Optimization - scaling real-time systems
- Error Handling
- Authentication overview