Source code for utils.cryptocache

"""
CryptoCache: Advanced Cryptocurrency Data Security Framework
Version: 1.0
Author: Gabriel Cellammare
Last Modified: 05/01/2025

This module implements a comprehensive security framework for caching cryptocurrency
pricing data with defense-in-depth measures and robust security controls. It provides
multi-layered protection against various attack vectors while ensuring data integrity
and secure API communications.

Core Security Features:

1. Data Protection
   - Secure cache file handling
   - Data validation and sanitization
   - Protected memory operations
   - Integrity verification

2. API Security
   - Rate limiting implementation 
   - Request throttling
   - Secure HTTP headers
   - Response validation

3. Cache Management
   - Secure directory creation
   - Protected file operations
   - Timestamp validation
   - Data expiration controls

4. Error Handling
   - Comprehensive exception management
   - Secure logging practices
   - Fail-safe defaults
   - Recovery procedures

Security Considerations:

1. File System Security:
   - Cache directory permissions must be properly configured
   - File operations need atomic write protection
   - Path traversal prevention required
   - Temporary file handling must be secure

2. API Communication:
   - SSL/TLS verification should be enforced
   - API keys need secure storage
   - Request headers should be sanitized
   - Rate limiting must prevent denial of service (DoS)

3. Data Validation:
   - All input parameters require validation
   - Cache data must be verified
   - Response data needs sanitization
   - Type checking should be strict

4. Memory Management:
   - Sensitive data needs secure cleanup
   - Memory limits should be enforced
   - Large objects require streaming
   - Garbage collection timing matters

Dependencies:
- requests: HTTP client with security features
- pathlib: Secure path operations
- json: Data serialization
- logging: Security event tracking
- dotenv: Environment management
- hashlib: Cryptographic operations

"""

from datetime import datetime, time, timedelta
import json
import os
from pathlib import Path
from dotenv import load_dotenv
import requests
from requests.exceptions import RequestException
import logging
from typing import List, Dict, Optional, Any
import hashlib
from urllib.parse import urljoin

# Configure logging with proper format for better debugging and monitoring.
# NOTE(review): this runs at import time and targets the root logger; per the
# stdlib docs basicConfig() does nothing if the root logger already has
# handlers, so an application that configures logging first keeps its setup.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Module-level logger shared by every function and method in this file.
logger = logging.getLogger(__name__)

# Constants for API configuration and request handling
# Relative path: the cache directory is resolved against the process's
# current working directory.
CACHE_DIR = Path("cache")
# The trailing slash matters: endpoints are appended with urljoin(), which
# would otherwise replace the last path segment ("v3") instead of extending it.
COINGECKO_BASE_URL = "https://api.coingecko.com/api/v3/"
DEFAULT_TIMEOUT = 10  # Timeout (seconds) passed to every requests.get() call
MAX_RETRIES = 3      # Maximum number of attempts per CoinGecko API request


class SecurityError(Exception):
    """Raised when a configuration value or cached payload fails a security check."""
def initialize_environment() -> Dict:
    """
    Load the ``.env`` file and validate the settings this module relies on.

    Two environment variables are read after ``load_dotenv()`` has run:

    - ``COINGECKO_API_KEY`` (required): must be at least 16 characters long
      once surrounding whitespace is removed.
    - ``CACHE_DURATION`` (optional): cache lifetime in minutes, an integer in
      the range 0..1440 (24 hours). Defaults to 30.

    Returns:
        Dict: ``{'CACHE_DURATION': <int minutes>, 'COINGECKO_API_KEY': <str>}``

    Raises:
        EnvironmentError: If a required environment variable is missing or empty
        SecurityError: If the API key is too short or the cache duration is
            not an integer within the allowed range
    """
    load_dotenv()

    required_vars = ['COINGECKO_API_KEY']
    missing_vars = [var for var in required_vars if not os.getenv(var)]
    if missing_vars:
        raise EnvironmentError(
            f"Missing required environment variables: {', '.join(missing_vars)}"
        )

    # Strip before validating: the key is later sent as an HTTP header value,
    # so padding or a stray newline must neither count toward the minimum
    # length nor be forwarded to the API.
    api_key = os.getenv('COINGECKO_API_KEY', '').strip()
    if len(api_key) < 16:
        raise SecurityError("Invalid API key format")

    raw_duration = os.getenv('CACHE_DURATION', '30')  # 30 minutes default
    try:
        cache_duration = int(raw_duration)
    except ValueError as err:
        # Keep the parse failure attached as the cause for easier debugging.
        raise SecurityError("Invalid cache duration value") from err
    if not 0 <= cache_duration <= 1440:  # Max 24 hours
        raise SecurityError("Invalid cache duration value")

    return {
        'CACHE_DURATION': cache_duration,
        'COINGECKO_API_KEY': api_key
    }
class CryptoCache:
    """
    File-backed cache for cryptocurrency market data and exchange rates.

    Values fetched from the CoinGecko API (and the exchange-rate endpoint) are
    kept in ``self.data`` and mirrored to ``<cache_dir>/crypto_cache.json``.
    Every entry carries its own write time, so an entry expires
    ``cache_duration`` after *it* was stored regardless of later writes to
    other keys. ``self.timestamp`` still records the time of the most recent
    write and is persisted for compatibility with older cache files.
    """

    def __init__(self):
        """
        Initialize CryptoCache with configuration from environment variables

        Raises:
            SecurityError: If security requirements are not met
            EnvironmentError: If required configuration is missing
        """
        try:
            config = initialize_environment()
            self.api_key = config['COINGECKO_API_KEY']
            self.cache_duration = timedelta(minutes=config['CACHE_DURATION'])
            self.cache_dir = CACHE_DIR
            self._secure_cache_directory()
            self.cache_file = self.cache_dir / "crypto_cache.json"
            # Per-key write times; populated by _load_cache() and set().
            self._timestamps = {}
            self._load_cache()
            logger.info("CryptoCache initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize CryptoCache: {str(e)}")
            raise

    def _validate_cache_data(self, data: Any) -> bool:
        """
        Check that a cached value has one of the shapes this class stores

        Accepted shapes:
            - list of dicts, each containing 'id', 'symbol', 'name' and
              'current_price' (get_available_cryptocurrencies)
            - dict mapping str -> dict whose values are all numbers
              (get_crypto_prices)
            - a positive number (get_exchange_rate)

        Args:
            data: Data to validate

        Returns:
            bool: True if data is valid, False otherwise
        """
        try:
            # None is never a valid cached value
            if data is None:
                return False

            if isinstance(data, list):
                # Shape produced by get_available_cryptocurrencies
                required_keys = ('id', 'symbol', 'name', 'current_price')
                return all(
                    isinstance(item, dict)
                    and all(key in item for key in required_keys)
                    for item in data
                )

            if isinstance(data, dict):
                # Shape produced by get_crypto_prices
                return all(
                    isinstance(k, str)
                    and isinstance(v, dict)
                    and all(isinstance(price, (int, float))
                            for price in v.values())
                    for k, v in data.items()
                )

            if isinstance(data, (int, float)):
                # Shape produced by get_exchange_rate
                return data > 0

            return False
        except Exception as e:
            logger.error(f"Cache validation error: {str(e)}")
            return False

    def _secure_cache_directory(self) -> None:
        """
        Ensure the cache directory exists

        Raises:
            OSError: If the directory cannot be created, including the case
                where the path already exists but is not a directory
        """
        try:
            already_present = self.cache_dir.is_dir()
            # Always call mkdir so a regular file squatting on the path is
            # reported here instead of surfacing later as silent write failures.
            self.cache_dir.mkdir(parents=True, exist_ok=True)
            if not already_present:
                logger.info("Cache directory created successfully")
        except Exception as e:
            logger.error(f"Cache directory error: {str(e)}")
            raise

    def _load_cache(self) -> None:
        """
        Load cache contents from ``self.cache_file``, creating it if absent

        Never raises: any read, parse or validation failure is logged and the
        in-memory cache falls back to an empty, already-expired state.
        """
        try:
            if self.cache_file.exists():
                with open(self.cache_file, 'r', encoding='utf-8') as f:
                    cache_data = json.load(f)

                # Validate cache structure
                if not isinstance(cache_data, dict):
                    raise SecurityError("Invalid cache data structure")
                data = cache_data.get('data', {})
                raw_stamps = cache_data.get('timestamps', {})
                if not isinstance(data, dict) or not isinstance(raw_stamps, dict):
                    raise SecurityError("Invalid cache data structure")

                timestamp = datetime.fromisoformat(
                    cache_data.get('timestamp', '2000-01-01')
                )
                # Files written before per-key timestamps existed only carry
                # the global one; seed every such key with it so those entries
                # expire on their original schedule.
                stamps = {
                    key: (datetime.fromisoformat(raw_stamps[key])
                          if key in raw_stamps else timestamp)
                    for key in data
                }

                self.data = data
                self.timestamp = timestamp
                self._timestamps = stamps
            else:
                # Initialize new cache
                with open(self.cache_file, 'w', encoding='utf-8') as f:
                    json.dump({
                        'data': {},
                        'timestamp': datetime.min.isoformat()
                    }, f)
                self.data = {}
                self.timestamp = datetime.min
                self._timestamps = {}

            logger.info("Cache loaded successfully")
        except Exception as e:
            logger.error(f"Cache loading error: {str(e)}")
            self.data = {}
            self.timestamp = datetime.min
            self._timestamps = {}

    @staticmethod
    def _parse_retry_after(raw: Any, default: int = 60, ceiling: int = 300) -> int:
        """
        Convert a Retry-After header value into a bounded wait in seconds

        The header may legally be an HTTP date rather than a number; anything
        that is not an integer falls back to ``default``. The result is
        clamped to ``0..ceiling``.
        """
        try:
            seconds = int(raw)
        except (TypeError, ValueError):
            seconds = default
        return max(0, min(seconds, ceiling))

    def _make_request(self, endpoint: str, params: Optional[Dict] = None) -> Dict:
        """
        Call a CoinGecko endpoint with retries and rate-limit handling

        Args:
            endpoint (str): API endpoint, relative to COINGECKO_BASE_URL
            params (Dict, optional): Query parameters for the request

        Returns:
            Dict: Decoded JSON body of the response

        Raises:
            RequestException: If the request still fails, or is still rate
                limited, after MAX_RETRIES attempts
            SecurityError: If the endpoint or parameters are invalid
        """
        # The file-level "from datetime import ... time ..." binds `time` to
        # datetime.time, which has no sleep(); import the real module locally.
        import time as time_module

        if not isinstance(endpoint, str) or not endpoint:
            raise SecurityError("Invalid endpoint")

        if params is not None and not isinstance(params, dict):
            raise SecurityError("Invalid parameters")

        url = urljoin(COINGECKO_BASE_URL, endpoint)
        # urljoin lets an absolute URL or ".." segments escape the base; never
        # send the API key anywhere but the CoinGecko API root.
        if not url.startswith(COINGECKO_BASE_URL):
            raise SecurityError("Invalid endpoint")

        headers = {
            'X-CG-Demo-Api-Key': self.api_key,
            'Accept': 'application/json',
            'User-Agent': 'CryptoCache/1.0'
        }

        for attempt in range(MAX_RETRIES):
            is_last_attempt = attempt == MAX_RETRIES - 1
            try:
                response = requests.get(
                    url,
                    params=params,
                    headers=headers,
                    timeout=DEFAULT_TIMEOUT
                )

                if response.status_code == 429:
                    if is_last_attempt:
                        # No point waiting when there is no attempt left.
                        break
                    retry_after = self._parse_retry_after(
                        response.headers.get('Retry-After')
                    )
                    logger.warning(
                        f"Rate limit reached. Waiting {retry_after}s")
                    time_module.sleep(retry_after)
                    continue

                response.raise_for_status()
                return response.json()

            except RequestException as e:
                logger.error(
                    f"Request failed (attempt {attempt + 1}/{MAX_RETRIES}): {str(e)}")
                if is_last_attempt:
                    raise
                # Exponential backoff with max 30s
                time_module.sleep(min(2 ** attempt, 30))

        # Only reached when every attempt was answered with HTTP 429.
        raise RequestException(
            f"Rate limit still in effect after {MAX_RETRIES} attempts")

    def get_available_cryptocurrencies(self) -> List[Dict]:
        """
        Retrieve list of available cryptocurrencies with market data

        Fetches the first two pages (100 coins each) of 'coins/markets'
        ordered by market cap, unless a cached list is still valid.

        Returns:
            List[Dict]: List of cryptocurrency information including:
                - id: Unique identifier
                - symbol: Trading symbol (uppercase)
                - name: Full name
                - current_price: Current price in USD
            An empty list is returned when the API yields no usable entries.

        Raises:
            RequestException: If API request fails
            SecurityError: If the API returns a non-list payload
        """
        cached_data = self.get('available_cryptocurrencies')
        if cached_data:
            return cached_data

        combined_cryptos = []
        for page in range(1, 3):
            params = {
                'vs_currency': 'usd',
                'order': 'market_cap_desc',
                'per_page': 100,
                'page': page,
                'sparkline': False,
                'precision': 15
            }
            result = self._make_request('coins/markets', params)
            if not result:
                continue
            if not isinstance(result, list):
                raise SecurityError("Invalid API response format")

            for crypto in result:
                try:
                    combined_cryptos.append({
                        'id': crypto['id'],
                        'symbol': crypto['symbol'].upper(),
                        'name': crypto['name'],
                        'current_price': crypto['current_price']
                    })
                except (KeyError, TypeError, AttributeError):
                    # One malformed entry should not discard the whole page.
                    logger.warning("Skipping malformed market entry")

        if combined_cryptos:
            self.set('available_cryptocurrencies', combined_cryptos)
        return combined_cryptos

    def get_crypto_prices(self, crypto_ids: List[str], currency: str = 'USD') -> Dict:
        """
        Get current cryptocurrency prices for specified currencies

        Args:
            crypto_ids (List[str]): List of cryptocurrency IDs to fetch
            currency (str): Target currency for prices (default: USD)

        Returns:
            Dict: Dictionary mapping crypto IDs to their prices in the
                specified (lower-cased) currency, e.g.
                {'bitcoin': {'usd': 50000.00}, 'ethereum': {'usd': 3000.00}}.
                IDs absent from the API response are omitted. An empty dict
                is returned for invalid input or on any failure; errors are
                logged rather than raised.
        """
        try:
            ids_are_valid = (
                isinstance(crypto_ids, list)
                and bool(crypto_ids)
                and all(isinstance(cid, str) and cid for cid in crypto_ids)
            )
            if not ids_are_valid:
                logger.error("Invalid crypto_ids provided")
                return {}

            currency = currency.lower()
            # Hash the key so its length does not grow with the ID list.
            cache_key = f"prices_{','.join(sorted(crypto_ids))}_{currency}"
            cache_key = hashlib.sha256(cache_key.encode()).hexdigest()

            cached_data = self.get(cache_key)
            if cached_data and isinstance(cached_data, dict):
                return cached_data

            params = {
                'ids': ','.join(crypto_ids),
                'vs_currencies': currency
            }
            result = self._make_request('simple/price', params)

            if not isinstance(result, dict):
                logger.error(f"Invalid API response format: {result}")
                return {}

            formatted_result = {}
            for crypto_id in crypto_ids:
                if crypto_id not in result:
                    continue
                crypto_data = result[crypto_id]
                if isinstance(crypto_data, dict):
                    formatted_result[crypto_id] = {
                        currency: crypto_data.get(currency, 0)
                    }
                else:
                    # NOTE(review): the extracted source lost its indentation;
                    # this fallback is assumed to pair with the isinstance
                    # check above (non-dict price data) — confirm.
                    formatted_result[crypto_id] = {currency: 0}
                    logger.warning(f"Invalid price data for {crypto_id}")

            if formatted_result:
                self.set(cache_key, formatted_result)
            return formatted_result

        except Exception as e:
            # Deliberate best-effort boundary: callers get {} instead of an error.
            logger.error(f"Error in get_crypto_prices: {str(e)}")
            return {}

    def get_exchange_rate(self, from_currency: str = 'USD', to_currency: str = 'EUR') -> float:
        """
        Get exchange rate between two currencies

        Args:
            from_currency (str): Source currency code (default: USD)
            to_currency (str): Target currency code (default: EUR)

        Returns:
            float: Exchange rate from source to target currency.
                Returns 1.0 if the codes are invalid, the target currency is
                unknown, or the request fails; that fallback is never cached.
        """
        try:
            if not (isinstance(from_currency, str) and isinstance(to_currency, str)):
                raise ValueError("Currency codes must be strings")
            source = from_currency.strip().upper()
            target = to_currency.strip().upper()
            # The source code is interpolated into the URL path, so accept
            # plain ASCII letters only.
            for code in (source, target):
                if not (code.isascii() and code.isalpha()):
                    raise ValueError(f"Invalid currency code: {code!r}")

            # Normalized codes keep 'usd' and 'USD' on the same cache entry.
            cache_key = f"exchange_rate_{source}_{target}"
            cached_rate = self.get(cache_key)
            if isinstance(cached_rate, (int, float)):
                return float(cached_rate)

            response = requests.get(
                f'https://api.exchangerate-api.com/v4/latest/{source}',
                timeout=DEFAULT_TIMEOUT
            )
            response.raise_for_status()

            rates = response.json()['rates']
            if target not in rates:
                # Do not cache the fallback: it is not a real rate.
                logger.error(
                    f"Exchange rate error: unsupported currency {target}")
                return 1.0

            rate = float(rates[target])
            self.set(cache_key, rate)
            return rate

        except (RequestException, ValueError, KeyError, TypeError) as e:
            logger.error(f"Exchange rate error: {str(e)}")
            return 1.0

    def get(self, key: str) -> Optional[Any]:
        """
        Retrieve and validate cached data

        Args:
            key (str): Cache key to retrieve

        Returns:
            Optional[Any]: Cached value if present, valid and not expired,
                None otherwise (including on any internal error)
        """
        try:
            # Entries placed in self.data without going through set() have no
            # per-key stamp; they fall back to the global timestamp.
            stored_at = self._timestamps.get(key, self.timestamp)
            if (datetime.now() - stored_at) >= self.cache_duration:
                return None

            cached_value = self.data.get(key)
            if cached_value and self._validate_cache_data(cached_value):
                return cached_value
            return None
        except Exception as e:
            logger.error(f"Error retrieving from cache: {str(e)}")
            return None

    def _prune_expired(self, now: datetime) -> None:
        """Drop entries older than cache_duration so the cache cannot grow unbounded."""
        expired = [
            key for key in self.data
            if (now - self._timestamps.get(key, self.timestamp)) >= self.cache_duration
        ]
        for key in expired:
            del self.data[key]
        # Also forget stamps whose entry no longer exists.
        self._timestamps = {
            key: stamp for key, stamp in self._timestamps.items()
            if key in self.data
        }

    def set(self, key: str, value: Any) -> None:
        """
        Store data in cache and persist the cache to disk

        The in-memory cache is always updated. The file is written to a
        temporary sibling and then renamed over the cache file, so a failed
        or interrupted write leaves the previous file intact. Write errors
        are logged, not raised.

        Args:
            key (str): Cache key
            value: Value to cache (must be JSON-serializable to be persisted)
        """
        now = datetime.now()
        # Prune first (against the old global timestamp) so refreshing the
        # timestamp below cannot revive stale entries.
        self._prune_expired(now)
        self.data[key] = value
        self._timestamps[key] = now
        self.timestamp = now

        try:
            cache_path = Path(self.cache_file)
            tmp_path = cache_path.with_name(cache_path.name + '.tmp')
            cache_data = {
                'data': self.data,
                'timestamp': self.timestamp.isoformat(),
                'timestamps': {
                    k: stamp.isoformat()
                    for k, stamp in self._timestamps.items()
                }
            }
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(cache_data, f)
            tmp_path.replace(cache_path)
        except Exception as e:
            logger.error(f"Error saving cache: {str(e)}")