WebSocket Integration

WebSocket Integration

This guide covers real-time Solana token monitoring and WebSocket streaming with AxiomTradeAPI-py. Use it as a starting point for building trading bots, token snipers, and market monitoring systems.

Quick Start: Real-Time Token Monitoring

Basic WebSocket setup

import asyncio
from axiomtradeapi import AxiomTradeClient

# Your authentication tokens (get from browser cookies).
# The values below are truncated placeholders: replace them with your own
# tokens before running, and keep real credentials out of source control.
AUTH_TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
REFRESH_TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."

async def handle_new_tokens(tokens):
    """Print a five-line summary for every token dict in *tokens*.

    Each token must provide the keys ``tokenName``, ``tokenTicker``,
    ``tokenAddress``, ``marketCapSol``, ``volumeSol`` and ``protocol``;
    a missing key raises ``KeyError``.
    """
    for new_token in tokens:
        # Headline first, then one indented detail line per field.
        print(f"NEW TOKEN: {new_token['tokenName']} ({new_token['tokenTicker']})")
        print(f"   Address: {new_token['tokenAddress']}")
        print(f"   Market Cap: {new_token['marketCapSol']} SOL")
        print(f"   Volume: {new_token['volumeSol']} SOL")
        print(f"   Protocol: {new_token['protocol']}")

async def main():
    """Create the client, register ``handle_new_tokens`` and start the WebSocket."""
    credentials = {
        "auth_token": AUTH_TOKEN,
        "refresh_token": REFRESH_TOKEN,
    }
    client = AxiomTradeClient(**credentials)

    # The callback is registered before the stream is started.
    await client.subscribe_new_tokens(handle_new_tokens)

    print("Listening for new tokens... (Press Ctrl+C to stop)")
    await client.ws.start()


if __name__ == "__main__":
    asyncio.run(main())

See the Authentication overview for how to obtain and manage the auth_token and refresh_token values.

Advanced WebSocket Applications

1. Token sniper bot

An automated token sniper that filters and acts on new tokens:

import asyncio
import logging
from datetime import datetime
from axiomtradeapi import AxiomTradeClient

class TokenSniperBot:
    """Screen newly announced tokens and record the ones that pass the filters.

    Token dicts arrive through ``process_new_tokens`` (registered with
    ``client.subscribe_new_tokens``). Every token address is evaluated once;
    tokens that pass the market-cap, volume, protocol and quality filters are
    appended to ``sniped_tokens`` and persisted to ``sniped_tokens.json``.
    No trade is placed: the trading step in ``execute_snipe`` is a placeholder.
    """

    def __init__(self, auth_token, refresh_token):
        """Create the API client, set the default filters and configure logging."""
        self.client = AxiomTradeClient(
            auth_token=auth_token,
            refresh_token=refresh_token,
            log_level=logging.INFO
        )

        # Sniper configuration
        self.min_market_cap = 10.0      # Minimum 10 SOL market cap
        self.max_market_cap = 1000.0    # Maximum 1000 SOL market cap
        self.min_volume = 5.0           # Minimum 5 SOL volume
        self.target_protocols = ["Raydium", "Orca", "Jupiter"]

        self.sniped_tokens = []
        # NOTE: grows by one address per distinct token seen and is never pruned.
        self.processed_tokens = set()

        self.setup_logging()

    def setup_logging(self):
        """Configure root logging (file + console) and create ``self.logger``."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('token_sniper.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def meets_sniper_criteria(self, token):
        """Return True if *token* is new and passes every sniping filter.

        Side effect: the token address is added to ``processed_tokens``, so a
        second call with the same address always returns False. Tokens without
        an address, or with missing/``None`` numeric fields, are rejected
        instead of raising.
        """
        address = token.get('tokenAddress')
        if not address:
            self.logger.warning(
                f"Skipping token without an address: {token.get('tokenName') or 'Unknown'}"
            )
            return False

        if address in self.processed_tokens:
            return False

        self.processed_tokens.add(address)

        # ``or`` also covers keys that are present but set to None.
        market_cap = token.get('marketCapSol') or 0
        volume = token.get('volumeSol') or 0
        protocol = token.get('protocol') or ''

        if not (self.min_market_cap <= market_cap <= self.max_market_cap):
            return False

        if volume < self.min_volume:
            return False

        if protocol not in self.target_protocols:
            return False

        if not self.quality_checks(token):
            return False

        return True

    def quality_checks(self, token):
        """Run basic legitimacy heuristics on the token name and ticker.

        Returns False when the name or ticker contains a suspicious keyword or
        is implausibly long. Missing social links are only logged, not rejected.
        """
        raw_name = token.get('tokenName') or ''
        raw_ticker = token.get('tokenTicker') or ''
        name = raw_name.lower()
        ticker = raw_ticker.lower()
        # Used in log lines so a token without a name cannot raise KeyError here.
        display_name = raw_name or 'Unknown'

        scam_keywords = ['elon', 'musk', 'doge', 'shib', 'safemoon', 'free', 'airdrop']
        if any(keyword in name or keyword in ticker for keyword in scam_keywords):
            self.logger.warning(f"{display_name}: contains suspicious keywords")
            return False

        if len(name) > 50 or len(ticker) > 10:
            return False

        has_socials = any([
            token.get('website'),
            token.get('twitter'),
            token.get('telegram')
        ])

        if not has_socials:
            self.logger.info(f"{display_name}: no social media presence")

        return True

    async def execute_snipe(self, token):
        """Record a qualified token in memory and on disk.

        The token must carry ``tokenAddress``, ``marketCapSol``, ``volumeSol``
        and ``protocol`` (guaranteed after ``meets_sniper_criteria``); name and
        ticker are optional and stored as ``None`` when absent.
        """
        self.logger.info(f"Sniping token: {token.get('tokenName') or 'Unknown'}")

        snipe_data = {
            'timestamp': datetime.now().isoformat(),
            'token_name': token.get('tokenName'),
            'token_ticker': token.get('tokenTicker'),
            'token_address': token['tokenAddress'],
            'market_cap_sol': token['marketCapSol'],
            'volume_sol': token['volumeSol'],
            'protocol': token['protocol'],
        }

        self.sniped_tokens.append(snipe_data)

        # Implement actual trading logic here:
        # - Calculate position size
        # - Execute buy order (see buy_token() in Buying and Selling Tokens)
        # - Set stop-loss/take-profit
        # - Record transaction

        self.save_sniped_token(snipe_data)

    def save_sniped_token(self, snipe_data):
        """Append *snipe_data* to the JSON list stored in ``sniped_tokens.json``.

        A missing file starts a new list. An unreadable file (invalid JSON, or
        JSON that is not a list) is moved aside to ``sniped_tokens.json.corrupt``
        and a new list is started, so one bad write cannot block all later saves.
        """
        import json
        import os

        path = 'sniped_tokens.json'

        try:
            with open(path, 'r') as f:
                sniped_list = json.load(f)
        except FileNotFoundError:
            sniped_list = []
        except json.JSONDecodeError:
            sniped_list = None

        if not isinstance(sniped_list, list):
            # Keep the unreadable content for inspection instead of overwriting it.
            os.replace(path, path + '.corrupt')
            self.logger.warning(f"{path} was unreadable; moved to {path}.corrupt")
            sniped_list = []

        sniped_list.append(snipe_data)

        with open(path, 'w') as f:
            json.dump(sniped_list, f, indent=2)

    async def process_new_tokens(self, tokens):
        """WebSocket callback: evaluate each token and snipe the qualifying ones.

        Errors are logged per token so one bad entry does not stop the batch.
        """
        for token in tokens:
            try:
                if self.meets_sniper_criteria(token):
                    await self.execute_snipe(token)
            except Exception as e:
                self.logger.error(f"Error processing token {token.get('tokenName', 'Unknown')}: {e}")

    async def start_sniping(self):
        """Subscribe to new tokens and run the WebSocket until it stops.

        Raises:
            asyncio.CancelledError: re-raised after logging when the task is
                cancelled (which is how ``asyncio.run`` delivers Ctrl+C on
                Python 3.11+).
            Exception: any other failure is logged and re-raised.
        """
        self.logger.info("Starting token sniper bot")
        self.logger.info(f"Criteria: market cap {self.min_market_cap}-{self.max_market_cap} SOL, "
                          f"min volume {self.min_volume} SOL, protocols {self.target_protocols}")

        try:
            await self.client.subscribe_new_tokens(self.process_new_tokens)
            await self.client.ws.start()
        except KeyboardInterrupt:
            self.logger.info("Token sniper stopped by user")
        except asyncio.CancelledError:
            # Cancellation must propagate so the event loop can shut down cleanly.
            self.logger.info("Token sniper stopped by user")
            raise
        except Exception as e:
            self.logger.error(f"Fatal error in token sniper: {e}")
            raise

    def get_sniper_stats(self):
        """Return processed/sniped counts and the sniped percentage (0 if none processed)."""
        return {
            "total_processed": len(self.processed_tokens),
            "total_sniped": len(self.sniped_tokens),
            "success_rate": len(self.sniped_tokens) / len(self.processed_tokens) * 100 if self.processed_tokens else 0
        }

# Usage
async def main():
    """Build a TokenSniperBot from placeholder credentials and run it."""
    bot = TokenSniperBot(auth_token="your-auth-token", refresh_token="your-refresh-token")
    await bot.start_sniping()


if __name__ == "__main__":
    asyncio.run(main())

Notifications (Discord/Telegram/email webhooks) can be added to execute_snipe() if you want out-of-band alerts when a snipe fires — implement send_*_notification() helpers using your webhook or SMTP credentials of choice.

2. Market sentiment analyzer

Analyze token sentiment and market trends in real time:

import asyncio
import json
from collections import defaultdict, deque
from datetime import datetime, timedelta
from axiomtradeapi import AxiomTradeClient

class MarketSentimentAnalyzer:
    """Aggregate new-token events into rolling metrics and a 1-10 sentiment score.

    ``process_market_data`` is the WebSocket callback. It stores every token,
    updates the rolling metrics and, once ``analysis_interval`` seconds have
    passed since the previous run, prints an analysis and writes a JSON report.
    """

    def __init__(self, auth_token, refresh_token):
        """Create the API client and empty metric containers."""
        self.client = AxiomTradeClient(
            auth_token=auth_token,
            refresh_token=refresh_token
        )

        # NOTE: keeps every entry ever received; never pruned.
        self.token_data = defaultdict(list)
        self.market_metrics = {
            # One {'timestamp': <hour start>, 'count': n} bucket per clock hour
            # in which tokens arrived; at most the 24 most recent buckets.
            'new_tokens_per_hour': deque(maxlen=24),
            'protocol_distribution': defaultdict(int),
            'market_cap_trends': deque(maxlen=100),
            'volume_trends': deque(maxlen=100)
        }

        self.analysis_interval = 3600  # 1 hour
        self.last_analysis = datetime.now()

    def _record_new_tokens(self, when, count):
        """Add *count* to the hourly bucket containing *when*, creating it if needed."""
        bucket_start = when.replace(minute=0, second=0, microsecond=0).isoformat()
        buckets = self.market_metrics['new_tokens_per_hour']

        if buckets and buckets[-1]['timestamp'] == bucket_start:
            buckets[-1]['count'] += count
        else:
            buckets.append({'timestamp': bucket_start, 'count': count})

    async def process_market_data(self, tokens):
        """Record a batch of token dicts and run the periodic analysis when due.

        Each token must provide ``tokenName``, ``tokenTicker``, ``tokenAddress``,
        ``marketCapSol``, ``volumeSol`` and ``protocol``.
        """
        current_time = datetime.now()

        for token in tokens:
            token_entry = {
                'timestamp': current_time.isoformat(),
                'name': token['tokenName'],
                'ticker': token['tokenTicker'],
                'address': token['tokenAddress'],
                'market_cap': token['marketCapSol'],
                'volume': token['volumeSol'],
                'protocol': token['protocol']
            }

            self.token_data[token['tokenAddress']].append(token_entry)

            self.market_metrics['protocol_distribution'][token['protocol']] += 1
            self.market_metrics['market_cap_trends'].append(token['marketCapSol'])
            self.market_metrics['volume_trends'].append(token['volumeSol'])

        # Batches are merged per clock hour; appending one entry per batch would
        # make the "per hour" figures depend on how often the callback fires.
        self._record_new_tokens(current_time, len(tokens))

        if current_time - self.last_analysis >= timedelta(seconds=self.analysis_interval):
            await self.perform_market_analysis()
            self.last_analysis = current_time

    async def perform_market_analysis(self):
        """Print every analysis section and the sentiment score, then save a report."""
        self.analyze_token_velocity()
        self.analyze_protocol_trends()
        self.analyze_market_cap_trends()
        self.analyze_volume_patterns()

        sentiment_score = self.calculate_market_sentiment()
        print(f"Overall market sentiment: {sentiment_score}/10")
        self.save_analysis_report()

    def analyze_token_velocity(self):
        """Print the average token count over the six most recent hourly buckets.

        Hours without any tokens have no bucket, so the window covers the last
        six hours in which tokens were seen, including the current partial hour.
        """
        if not self.market_metrics['new_tokens_per_hour']:
            return

        recent_hours = list(self.market_metrics['new_tokens_per_hour'])[-6:]  # Last 6 hours
        avg_tokens_per_hour = sum(hour['count'] for hour in recent_hours) / len(recent_hours)
        print(f"Average new tokens per hour (last 6h): {avg_tokens_per_hour:.1f}")

    def analyze_protocol_trends(self):
        """Print the five protocols with the most tokens and their share."""
        total_tokens = sum(self.market_metrics['protocol_distribution'].values())
        if total_tokens == 0:
            return

        sorted_protocols = sorted(
            self.market_metrics['protocol_distribution'].items(),
            key=lambda x: x[1],
            reverse=True
        )

        for protocol, count in sorted_protocols[:5]:  # Top 5
            percentage = (count / total_tokens) * 100
            print(f"   {protocol}: {count} tokens ({percentage:.1f}%)")

    def analyze_market_cap_trends(self):
        """Print overall and last-10 average market cap (needs at least 10 samples)."""
        if len(self.market_metrics['market_cap_trends']) < 10:
            return

        market_caps = list(self.market_metrics['market_cap_trends'])
        avg_market_cap = sum(market_caps) / len(market_caps)
        recent_avg = sum(market_caps[-10:]) / 10  # Last 10 tokens

        print(f"Average market cap: {avg_market_cap:.2f} SOL, recent avg: {recent_avg:.2f} SOL")

    def analyze_volume_patterns(self):
        """Print overall and last-10 average volume (needs at least 10 samples)."""
        if len(self.market_metrics['volume_trends']) < 10:
            return

        volumes = list(self.market_metrics['volume_trends'])
        avg_volume = sum(volumes) / len(volumes)
        recent_avg = sum(volumes[-10:]) / 10

        print(f"Average volume: {avg_volume:.2f} SOL, recent avg: {recent_avg:.2f} SOL")

    def calculate_market_sentiment(self):
        """Return a sentiment score clamped to the range 1.0-10.0.

        Starts at a neutral 5.0 and adjusts for hourly token velocity, the
        market-cap trend, the volume trend and protocol diversity.
        """
        score = 5.0  # Neutral starting point

        if self.market_metrics['new_tokens_per_hour']:
            recent_hours = list(self.market_metrics['new_tokens_per_hour'])[-6:]
            avg_tokens = sum(hour['count'] for hour in recent_hours) / len(recent_hours)

            if avg_tokens > 50:
                score += 1.5  # High activity is bullish
            elif avg_tokens < 10:
                score -= 1.0  # Low activity is bearish

        if len(self.market_metrics['market_cap_trends']) >= 20:
            # Compare the oldest ten samples in the window with the newest ten.
            market_caps = list(self.market_metrics['market_cap_trends'])
            early_avg = sum(market_caps[:10]) / 10
            recent_avg = sum(market_caps[-10:]) / 10

            if recent_avg > early_avg * 1.2:
                score += 1.0
            elif recent_avg < early_avg * 0.8:
                score -= 1.0

        if len(self.market_metrics['volume_trends']) >= 20:
            volumes = list(self.market_metrics['volume_trends'])
            early_avg = sum(volumes[:10]) / 10
            recent_avg = sum(volumes[-10:]) / 10

            if recent_avg > early_avg * 1.3:
                score += 1.0
            elif recent_avg < early_avg * 0.7:
                score -= 1.0

        unique_protocols = len(self.market_metrics['protocol_distribution'])
        if unique_protocols > 5:
            score += 0.5  # Diversity is good
        elif unique_protocols < 3:
            score -= 0.5  # Low diversity is concerning

        return max(1.0, min(10.0, score))  # Clamp between 1-10

    def save_analysis_report(self):
        """Write the current metrics and score to ``market_analysis_<timestamp>.json``."""
        report = {
            'timestamp': datetime.now().isoformat(),
            'token_velocity': {
                'new_tokens_per_hour': list(self.market_metrics['new_tokens_per_hour']),
            },
            'protocol_distribution': dict(self.market_metrics['protocol_distribution']),
            'market_cap_trends': list(self.market_metrics['market_cap_trends']),
            'volume_trends': list(self.market_metrics['volume_trends']),
            'sentiment_score': self.calculate_market_sentiment()
        }

        with open(f"market_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", 'w') as f:
            json.dump(report, f, indent=2)

    async def start_analysis(self):
        """Subscribe to new tokens and run until stopped, then do a final analysis.

        Raises:
            asyncio.CancelledError: re-raised after the final analysis when the
                task is cancelled (how ``asyncio.run`` delivers Ctrl+C on
                Python 3.11+).
        """
        try:
            await self.client.subscribe_new_tokens(self.process_market_data)
            await self.client.ws.start()
        except KeyboardInterrupt:
            await self.perform_market_analysis()  # Final analysis
        except asyncio.CancelledError:
            await self.perform_market_analysis()  # Final analysis
            raise

# Usage
async def main():
    """Build a MarketSentimentAnalyzer from placeholder credentials and run it."""
    sentiment = MarketSentimentAnalyzer(
        auth_token="your-auth-token", refresh_token="your-refresh-token"
    )
    await sentiment.start_analysis()


if __name__ == "__main__":
    asyncio.run(main())

WebSocket Connection Management

Robust connection handling

import asyncio
import logging
from axiomtradeapi import AxiomTradeClient

class RobustWebSocketClient:
    """WebSocket client wrapper that retries failed connections with backoff.

    ``reconnect_delay`` is the base delay in seconds. It is doubled after each
    failed attempt within one ``connect_with_retry`` call, but the attribute
    itself is never modified, so every new connection cycle starts again from
    the base delay.
    """

    def __init__(self, auth_token, refresh_token):
        """Store credentials and retry settings; no connection is made yet."""
        self.auth_token = auth_token
        self.refresh_token = refresh_token
        self.client = None
        self.reconnect_attempts = 0
        self.max_reconnect_attempts = 5
        self.reconnect_delay = 5

        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def _create_client(self):
        """Build a fresh API client from the stored credentials."""
        # NOTE(review): a previous client, if any, is simply dropped here;
        # SOURCE does not show a close/disconnect API to call first - confirm.
        return AxiomTradeClient(
            auth_token=self.auth_token,
            refresh_token=self.refresh_token
        )

    async def connect_with_retry(self):
        """Create a client and subscribe, retrying with exponential backoff.

        Returns:
            True once the subscription succeeds (the attempt counter is reset),
            False after ``max_reconnect_attempts`` consecutive failures.
        """
        # Local copy: doubling the attribute itself would make the backoff grow
        # permanently across reconnect cycles.
        delay = self.reconnect_delay

        while self.reconnect_attempts < self.max_reconnect_attempts:
            try:
                self.client = self._create_client()
                await self.client.subscribe_new_tokens(self.handle_tokens)
            except Exception as e:
                self.reconnect_attempts += 1
                self.logger.error(f"Connection failed (attempt {self.reconnect_attempts}): {e}")

                if self.reconnect_attempts < self.max_reconnect_attempts:
                    await asyncio.sleep(delay)
                    delay *= 2  # Exponential backoff
            else:
                self.logger.info("WebSocket connected successfully")
                self.reconnect_attempts = 0
                return True

        self.logger.error("Max reconnection attempts exceeded")
        return False

    async def handle_tokens(self, tokens):
        """Log the name of every received token."""
        for token in tokens:
            self.logger.info(f"Received: {token['tokenName']}")

    async def start_with_monitoring(self):
        """Keep a WebSocket session running, reconnecting whenever it ends.

        Returns only when ``connect_with_retry`` gives up. Between sessions the
        loop waits ``reconnect_delay`` seconds so that a connection which drops
        immediately cannot turn into a tight reconnect loop.
        """
        while True:
            if not await self.connect_with_retry():
                self.logger.error("Failed to establish connection")
                break

            try:
                await self.client.ws.start()
            except Exception as e:
                self.logger.error(f"WebSocket error: {e}")
            else:
                self.logger.warning("WebSocket session ended")

            self.logger.info("Attempting to reconnect...")
            await asyncio.sleep(self.reconnect_delay)

# Usage
async def main():
    """Build a RobustWebSocketClient from placeholder credentials and run it."""
    resilient = RobustWebSocketClient(
        auth_token="your-auth-token", refresh_token="your-refresh-token"
    )
    await resilient.start_with_monitoring()


if __name__ == "__main__":
    asyncio.run(main())

WebSocket Performance Optimization

High-performance token processing

import asyncio
import time
from collections import deque
from axiomtradeapi import AxiomTradeClient

class HighPerformanceTokenProcessor:
    """Fan incoming tokens out to a pool of worker coroutines via a bounded queue.

    ``token_receiver`` is the WebSocket callback and only enqueues; the workers
    started by ``start_processing`` call ``process_token`` and record timings.
    """

    def __init__(self, auth_token, refresh_token):
        """Create the API client, the bounded queue and empty statistics."""
        self.client = AxiomTradeClient(
            auth_token=auth_token,
            refresh_token=refresh_token
        )

        self.processing_times = deque(maxlen=1000)
        self.tokens_processed = 0
        self.start_time = time.time()

        self.token_queue = asyncio.Queue(maxsize=10000)
        self.workers = []

    async def token_receiver(self, tokens):
        """Stamp each token with its receive time and enqueue it without blocking.

        Tokens are dropped (with a printed warning) when the queue is full.
        """
        receive_time = time.time()

        for token in tokens:
            token['receive_timestamp'] = receive_time

            try:
                self.token_queue.put_nowait(token)
            except asyncio.QueueFull:
                print("Warning: token queue is full, dropping token")

    async def token_worker(self, worker_id):
        """Process queued tokens forever; errors are printed and the loop continues."""
        while True:
            token = await self.token_queue.get()

            try:
                # perf_counter is monotonic, so durations cannot be skewed by
                # wall-clock adjustments.
                started = time.perf_counter()
                await self.process_token(token)
                self.processing_times.append(time.perf_counter() - started)
                self.tokens_processed += 1
            except Exception as e:
                print(f"Worker {worker_id} error: {e}")
            finally:
                # Every get() needs a matching task_done(), including for tokens
                # whose processing failed; otherwise queue.join() never returns.
                self.token_queue.task_done()

    async def process_token(self, token):
        """Process individual token (implement your logic here)"""
        await asyncio.sleep(0.01)  # Simulated processing time
        print(f"Processed: {token['tokenName']}")

    def get_performance_stats(self):
        """Return a dict of throughput statistics.

        Returns the string ``"No performance data available"`` instead of a dict
        while no token has been processed yet.
        """
        if not self.processing_times:
            return "No performance data available"

        avg_processing_time = sum(self.processing_times) / len(self.processing_times)
        total_runtime = time.time() - self.start_time
        tokens_per_second = self.tokens_processed / total_runtime if total_runtime > 0 else 0

        return {
            'tokens_processed': self.tokens_processed,
            'avg_processing_time_ms': avg_processing_time * 1000,
            'tokens_per_second': tokens_per_second,
            'queue_size': self.token_queue.qsize(),
            'total_runtime': total_runtime
        }

    def _report_performance(self):
        """Print a one-line progress report, or the no-data message."""
        stats = self.get_performance_stats()
        if not isinstance(stats, dict):
            # No tokens processed yet: stats is the plain message string.
            print(stats)
            return

        print(f"Tokens processed: {stats['tokens_processed']}, "
              f"speed: {stats['tokens_per_second']:.2f}/s, "
              f"avg time: {stats['avg_processing_time_ms']:.2f}ms")

    def _print_final_stats(self):
        """Print the final statistics, one indented line per entry."""
        stats = self.get_performance_stats()
        if isinstance(stats, dict):
            for key, value in stats.items():
                print(f"   {key}: {value}")
        else:
            print(f"   {stats}")

    async def start_processing(self, num_workers=5):
        """Start the workers and the monitor, then run the WebSocket.

        Args:
            num_workers: number of worker coroutines to start.

        The workers and the monitor are cancelled and the final statistics are
        printed however the WebSocket loop ends (normal return, error, Ctrl+C
        or task cancellation).
        """
        for i in range(num_workers):
            worker = asyncio.create_task(self.token_worker(i))
            self.workers.append(worker)

        monitor_task = asyncio.create_task(self.performance_monitor())

        try:
            await self.client.subscribe_new_tokens(self.token_receiver)
            await self.client.ws.start()
        except KeyboardInterrupt:
            # A user interrupt ends processing without a traceback.
            pass
        finally:
            for worker in self.workers:
                worker.cancel()
            monitor_task.cancel()

            self._print_final_stats()

    async def performance_monitor(self):
        """Monitor and report performance every 30 seconds"""
        while True:
            await asyncio.sleep(30)
            self._report_performance()

# Usage
async def main():
    """Build a HighPerformanceTokenProcessor and run it with ten workers."""
    pipeline = HighPerformanceTokenProcessor(
        auth_token="your-auth-token", refresh_token="your-refresh-token"
    )
    await pipeline.start_processing(num_workers=10)


if __name__ == "__main__":
    asyncio.run(main())

Security Best Practices

Secure WebSocket implementation

import asyncio
import ssl
import logging
from axiomtradeapi import AxiomTradeClient

class SecureWebSocketClient:
    """WebSocket client wrapper that validates credentials and incoming tokens.

    Construction fails with ``ValueError`` when the tokens are not JWT-shaped.
    Incoming token dicts are validated before ``process_secure_token`` is called;
    rejected tokens are reported through the ``security`` logger.
    """

    # Solana addresses are base58-encoded 32-byte keys: 32 to 44 characters
    # drawn from this alphabet (no 0, O, I or l).
    _BASE58_ALPHABET = frozenset("123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz")
    _MIN_ADDRESS_LENGTH = 32
    _MAX_ADDRESS_LENGTH = 44

    def __init__(self, auth_token, refresh_token):
        """Store the credentials, validate them and set up security logging.

        Raises:
            ValueError: if either token is missing or not JWT-shaped.
        """
        self.auth_token = auth_token
        self.refresh_token = refresh_token

        self.setup_security()
        self.setup_logging()

    def setup_security(self):
        """Create a certificate-verifying SSL context and validate the tokens.

        Raises:
            ValueError: if ``validate_tokens`` rejects the credentials.
        """
        # NOTE(review): nothing in this class passes ssl_context to the API
        # client, so it does not affect the connection as written - confirm
        # how the client expects TLS settings to be supplied.
        self.ssl_context = ssl.create_default_context()
        self.ssl_context.check_hostname = True
        self.ssl_context.verify_mode = ssl.CERT_REQUIRED

        if not self.validate_tokens():
            raise ValueError("Invalid authentication tokens")

    def validate_tokens(self):
        """Return True if both tokens are non-empty strings with exactly two dots.

        This is a format check only (three dot-separated JWT segments); the
        signature and expiry are not verified.
        """
        for candidate in (self.auth_token, self.refresh_token):
            if not candidate or not isinstance(candidate, str):
                return False

            # Basic JWT format validation
            if candidate.count('.') != 2:
                return False

        return True

    def setup_logging(self):
        """Attach a file handler for ``security.log`` to the ``security`` logger.

        The logger is shared process-wide, so the handler is added only once;
        otherwise every additional instance would duplicate each log line.
        """
        import os

        self.security_logger = logging.getLogger('security')
        self.security_logger.setLevel(logging.INFO)

        log_path = os.path.abspath('security.log')
        already_attached = any(
            isinstance(handler, logging.FileHandler) and handler.baseFilename == log_path
            for handler in self.security_logger.handlers
        )
        if already_attached:
            return

        security_handler = logging.FileHandler('security.log')
        security_formatter = logging.Formatter(
            '%(asctime)s - SECURITY - %(levelname)s - %(message)s'
        )
        security_handler.setFormatter(security_formatter)
        self.security_logger.addHandler(security_handler)

    async def secure_token_handler(self, tokens):
        """WebSocket callback: process valid tokens and log a warning for the rest."""
        self.security_logger.info(f"Received {len(tokens)} tokens via secure WebSocket")

        for token in tokens:
            if self.validate_token_data(token):
                await self.process_secure_token(token)
            else:
                self.security_logger.warning(f"Invalid token data received: {token.get('tokenAddress', 'Unknown')}")

    def validate_token_data(self, token):
        """Return True if *token* has all required fields with plausible values.

        Checks that ``tokenName``, ``tokenAddress``, ``marketCapSol`` and
        ``volumeSol`` are present, that the address is a 32-44 character base58
        string, and that market cap and volume are non-negative numbers.
        Malformed values return False rather than raising.
        """
        required_fields = ['tokenName', 'tokenAddress', 'marketCapSol', 'volumeSol']

        for field in required_fields:
            if field not in token:
                return False

        address = token['tokenAddress']
        if not isinstance(address, str):
            return False

        if not (self._MIN_ADDRESS_LENGTH <= len(address) <= self._MAX_ADDRESS_LENGTH):
            return False

        if not set(address) <= self._BASE58_ALPHABET:
            return False

        for field in ('marketCapSol', 'volumeSol'):
            value = token[field]
            if not isinstance(value, (int, float)) or value < 0:
                return False

        return True

    async def process_secure_token(self, token):
        """Process validated token data"""
        print(f"Securely processed: {token['tokenName']}")

    async def start_secure_connection(self):
        """Create the API client, subscribe and run the WebSocket.

        Raises:
            Exception: any connection failure is logged to the security logger
                and re-raised.
        """
        try:
            self.client = AxiomTradeClient(
                auth_token=self.auth_token,
                refresh_token=self.refresh_token
            )

            self.security_logger.info("Initiating secure WebSocket connection")

            await self.client.subscribe_new_tokens(self.secure_token_handler)
            await self.client.ws.start()

        except Exception as e:
            self.security_logger.error(f"Connection error: {e}")
            raise

# Usage
async def main():
    """Build a SecureWebSocketClient and open the connection.

    NOTE: the literal placeholder tokens below contain no '.' separators, so
    the client's JWT-format validation raises ValueError until real tokens are
    supplied.
    """
    guarded = SecureWebSocketClient(
        auth_token="your-auth-token", refresh_token="your-refresh-token"
    )
    await guarded.start_secure_connection()


if __name__ == "__main__":
    asyncio.run(main())