Source code for security.rate_limiter

"""
FirebaseRateLimiter: Distributed Rate Limiting Implementation
Version: 1.0
Author: Gabriel Cellammare
Last Modified: 05/01/2025

This module provides a distributed rate limiting solution using Firebase Firestore:
- Scalable rate limiting across multiple instances
- Automatic cleanup of expired entries
- Transaction-based atomic operations
- Probabilistic maintenance

Security Features:
1. Transaction-based updates
2. Automatic data cleanup
3. Error handling and logging
4. Atomic operations
5. Time-window based limiting

Dependencies:
- firebase_admin.firestore
- flask.current_app
- logging
- time
- random
"""

import logging
import random
import time
from typing import Tuple
from flask import current_app
from firebase_admin import firestore


class FirebaseRateLimitCleaner:
    """
    Cleanup Manager for Rate Limiting Data

    Handles the periodic cleanup of expired rate limiting entries:
    - Batch processing to avoid timeouts
    - Configurable window sizes
    - Error handling and logging

    Security Features:
    - Batched operations
    - Error isolation
    - Logging of operations
    """

    def __init__(self, db, collection_name: str = 'rate_limits'):
        """
        Initialize the rate limit cleaner.

        Args:
            db: Firestore client instance
            collection_name: Collection name for rate limits

        Security:
        - Validates inputs
        - Configures logging
        """
        self.db = db
        self.collection_name = collection_name
        self.logger = logging.getLogger(__name__)

    def clean_expired_entries(self, window_seconds: int = 3600,
                              batch_size: int = 500) -> int:
        """
        Remove expired rate limit entries.

        Args:
            window_seconds: Time window in seconds
            batch_size: Number of documents per batch

        Returns:
            int: Number of deleted documents

        Security:
        - Batched operations
        - Transaction safety
        - Error handling
        """
        current_time = int(time.time())
        cutoff_time = current_time - window_seconds
        deleted_count = 0

        try:
            query = (self.db.collection(self.collection_name)
                     .where('window_start', '<', cutoff_time)
                     .limit(batch_size))

            while True:
                docs = query.stream()
                batch = self.db.batch()
                batch_deletions = 0

                for doc in docs:
                    batch.delete(doc.reference)
                    batch_deletions += 1

                if batch_deletions == 0:
                    break

                batch.commit()
                deleted_count += batch_deletions
                self.logger.info(
                    f"Deleted {batch_deletions} expired documents")

                if batch_deletions < batch_size:
                    break

            self.logger.info(
                f"Cleanup completed. Total documents deleted: {deleted_count}")
            return deleted_count

        except Exception as e:
            self.logger.error(f"Cleanup error: {str(e)}")
            raise
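
The cleaner can also be invoked outside of request handling, for example from a cron job or a scheduled Cloud Function. The following is a minimal sketch, assuming firebase_admin has already been initialized with valid credentials; the helper name and schedule are illustrative and not part of this module:

def scheduled_cleanup():
    # Illustrative helper (not part of this module): delete rate limit
    # entries whose window started more than an hour ago, in batches of
    # 500 documents (the Firestore per-batch write limit).
    db = firestore.client()  # assumes firebase_admin.initialize_app() ran earlier
    cleaner = FirebaseRateLimitCleaner(db, collection_name='rate_limits')
    deleted = cleaner.clean_expired_entries(window_seconds=3600, batch_size=500)
    logging.getLogger(__name__).info(
        "Removed %d expired rate limit documents", deleted)
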
class FirebaseRateLimiter:
    """
    Distributed rate limiter backed by Firebase Firestore.

    Enforces per-user and per-IP request limits over fixed time windows,
    using transaction-based atomic updates and probabilistic cleanup of
    expired entries.
    """

    def __init__(self, db, max_requests: int = 100,
                 window_seconds: int = 3600,
                 ip_max_requests: int = 1000,
                 ip_window_seconds: int = 3600,
                 cleanup_probability: float = 0.001):
        """
        Initialize rate limiter.

        Args:
            db: Firestore database instance
            max_requests: Maximum requests per user window
            window_seconds: Time window for user limits
            ip_max_requests: Maximum requests per IP window
            ip_window_seconds: Time window for IP limits
            cleanup_probability: Probability of triggering inline cleanup
                on each rate limit check

        Security:
        - Input validation
        - Configuration logging
        - Cleanup initialization
        """
        self.db = db
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.ip_max_requests = ip_max_requests
        self.ip_window_seconds = ip_window_seconds
        self.collection_name = 'rate_limits'
        self.cleanup_probability = cleanup_probability
        self.cleaner = FirebaseRateLimitCleaner(db, self.collection_name)
    def maybe_cleanup(self):
        """
        Probabilistic cleanup execution.

        Security:
        - Error isolation
        - Logging
        - Non-blocking operation
        """
        if random.random() < self.cleanup_probability:
            try:
                self.cleaner.clean_expired_entries(self.window_seconds)
            except Exception as e:
                logging.warning(f"Inline cleanup error: {str(e)}")
    def check_rate_limit(self, user_id: str,
                         ip_address: str) -> Tuple[bool, int, int]:
        """
        Check IP-based and user-based rate limits for a request.

        Args:
            user_id: User identifier
            ip_address: Client IP address

        Returns:
            Tuple[bool, int, int]: (is_allowed, remaining_requests, retry_after)
        """
        # First check IP-based limits
        ip_allowed, ip_remaining, ip_retry = self._check_ip_limit(ip_address)
        if not ip_allowed:
            return False, ip_remaining, ip_retry

        # Then check user-based limits
        user_allowed, user_remaining, user_retry = self._check_user_limit(
            user_id)
        if not user_allowed:
            return False, user_remaining, user_retry

        # Return the more restrictive remaining count
        remaining = min(ip_remaining, user_remaining)
        return True, remaining, 0
    def _check_user_limit(self, user_id) -> Tuple[bool, int, int]:
        """
        Check and update rate limit for user.

        Args:
            user_id: User identifier

        Returns:
            Tuple[bool, int, int]: (is_allowed, remaining_requests, retry_after)

        Security:
        - Transaction safety
        - Time window validation
        - Error handling
        - Atomic updates
        """
        current_app.logger.info(f"Rate limit check for user: {user_id}")
        self.maybe_cleanup()
        current_time = int(time.time())
        doc_ref = self.db.collection(self.collection_name).document(user_id)
        transaction = self.db.transaction()

        @firestore.transactional
        def update_rate_limit(transaction):
            doc = doc_ref.get(transaction=transaction)

            if not doc.exists:
                data = {
                    'count': 1,
                    'window_start': current_time,
                    'last_request': current_time
                }
                transaction.set(doc_ref, data)
                return True, self.max_requests - 1, 0

            data = doc.to_dict()
            window_start = data['window_start']

            if current_time - window_start >= self.window_seconds:
                data = {
                    'count': 1,
                    'window_start': current_time,
                    'last_request': current_time
                }
                transaction.set(doc_ref, data)
                return True, self.max_requests - 1, 0

            if data['count'] >= self.max_requests:
                retry_after = window_start + self.window_seconds - current_time
                return False, 0, retry_after

            data['count'] += 1
            data['last_request'] = current_time
            transaction.update(doc_ref, data)
            return True, self.max_requests - data['count'], 0

        try:
            return update_rate_limit(transaction)
        except Exception as e:
            current_app.logger.error(f"Rate limit error: {str(e)}")
            return True, 0, 0

    def _check_ip_limit(self, ip_address: str) -> Tuple[bool, int, int]:
        """Check rate limit for an IP address."""
        current_app.logger.info(f"Rate limit check for ip: {ip_address}")
        self.maybe_cleanup()
        current_time = int(time.time())
        doc_ref = self.db.collection(
            self.collection_name).document(f'ip_{ip_address}')
        transaction = self.db.transaction()

        @firestore.transactional
        def update_ip_limit(transaction):
            doc = doc_ref.get(transaction=transaction)

            if not doc.exists:
                data = {
                    'count': 1,
                    'window_start': current_time,
                    'last_request': current_time
                }
                transaction.set(doc_ref, data)
                return True, self.ip_max_requests - 1, 0

            data = doc.to_dict()
            window_start = data['window_start']

            if current_time - window_start >= self.ip_window_seconds:
                data = {
                    'count': 1,
                    'window_start': current_time,
                    'last_request': current_time
                }
                transaction.set(doc_ref, data)
                return True, self.ip_max_requests - 1, 0

            if data['count'] >= self.ip_max_requests:
                retry_after = window_start + self.ip_window_seconds - current_time
                return False, 0, retry_after

            data['count'] += 1
            data['last_request'] = current_time
            transaction.update(doc_ref, data)
            return True, self.ip_max_requests - data['count'], 0

        return update_ip_limit(transaction)
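
A minimal usage sketch for the limiter inside a Flask view. It assumes firebase_admin.initialize_app() has been called before firestore.client() is used; the route, header-based user lookup, and limit values below are placeholders rather than this application's real configuration:

from flask import Flask, jsonify, request

app = Flask(__name__)
limiter = FirebaseRateLimiter(
    firestore.client(),  # assumes firebase_admin.initialize_app() ran earlier
    max_requests=100,
    window_seconds=3600,
    ip_max_requests=1000,
    ip_window_seconds=3600,
)


@app.route('/api/resource')
def resource():
    # Hypothetical user lookup; a real application would derive this
    # from its authentication layer.
    user_id = request.headers.get('X-User-Id', 'anonymous')
    allowed, remaining, retry_after = limiter.check_rate_limit(
        user_id, request.remote_addr)
    if not allowed:
        response = jsonify(error='rate limit exceeded')
        response.headers['Retry-After'] = str(retry_after)
        return response, 429
    return jsonify(data='ok', remaining=remaining)
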