Observability and Monitoring - Deep Dive
This guide covers comprehensive observability practices for modern distributed systems, including the three pillars of observability, monitoring strategies, and practical implementation techniques.
Part 1: The Three Pillars of Observability
What is Observability?
Definition: Observability is the ability to understand the internal state of a system by examining its external outputs. It goes beyond traditional monitoring by enabling you to ask arbitrary questions about your system's behavior.
The Three Pillars:
- Metrics - Numerical data aggregated over time
- Logs - Discrete events with contextual information
- Traces - Request flows through distributed systems
 
1. Metrics - The Foundation
Types of Metrics:
// Example collectors, one per Prometheus metric type.
var (
    // Counter - Always increasing values
    httpRequestsTotal = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "http_requests_total",
            Help: "Total number of HTTP requests",
        },
        []string{"method", "endpoint", "status"},
    )

    // Gauge - Current value that can go up or down
    activeConnections = prometheus.NewGauge(prometheus.GaugeOpts{
        Name: "active_connections",
        Help: "Number of active database connections",
    })

    // Histogram - Distribution of values
    requestDuration = prometheus.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "http_request_duration_seconds",
            Help:    "HTTP request duration in seconds",
            Buckets: prometheus.DefBuckets,
        },
        []string{"method", "endpoint"},
    )

    // Summary - Similar to histogram but calculates quantiles
    requestSize = prometheus.NewSummaryVec(
        prometheus.SummaryOpts{
            Name: "http_request_size_bytes",
            Help: "HTTP request size in bytes",
            Objectives: map[float64]float64{
                0.5:  0.05,  // 50th percentile with 5% error
                0.9:  0.01,  // 90th percentile with 1% error
                0.99: 0.001, // 99th percentile with 0.1% error
            },
        },
        []string{"method"},
    )
)
Implementation Example:
// Comprehensive metrics instrumentation
package main
import (
    "context"
    "fmt"
    "net/http"
    "strconv"
    "time"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)
// MetricsMiddleware holds the collectors updated by Handler for every
// HTTP request that passes through it.
type MetricsMiddleware struct {
    requestsTotal    *prometheus.CounterVec   // labelled by method, endpoint, status
    requestDuration  *prometheus.HistogramVec // labelled by method, endpoint
    requestsInFlight prometheus.Gauge         // requests currently being served
}

// NewMetricsMiddleware creates the three collectors, registers them via
// prometheus.MustRegister and returns a middleware that uses them.
func NewMetricsMiddleware() *MetricsMiddleware {
    total := prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "http_requests_total",
            Help: "Total HTTP requests processed",
        },
        []string{"method", "endpoint", "status"},
    )
    latency := prometheus.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "http_request_duration_seconds",
            Help:    "HTTP request duration",
            Buckets: []float64{0.001, 0.01, 0.1, 0.5, 1, 2.5, 5, 10},
        },
        []string{"method", "endpoint"},
    )
    inFlight := prometheus.NewGauge(prometheus.GaugeOpts{
        Name: "http_requests_in_flight",
        Help: "Current number of HTTP requests being processed",
    })

    // Register metrics
    prometheus.MustRegister(total, latency, inFlight)

    return &MetricsMiddleware{
        requestsTotal:    total,
        requestDuration:  latency,
        requestsInFlight: inFlight,
    }
}
// Handler returns an http.Handler that serves requests with next while
// tracking the in-flight gauge, the request counter and the latency
// histogram.
//
// NOTE(review): the raw URL path is used as the "endpoint" label; with
// parameterised paths this may produce many label values - confirm.
func (m *MetricsMiddleware) Handler(next http.Handler) http.Handler {
    instrumented := func(w http.ResponseWriter, r *http.Request) {
        began := time.Now()

        m.requestsInFlight.Inc()
        defer m.requestsInFlight.Dec()

        // The recorder starts at 200 so handlers that never call
        // WriteHeader are still counted with a status.
        rec := &responseWriter{ResponseWriter: w, statusCode: 200}
        next.ServeHTTP(rec, r)

        elapsed := time.Since(began).Seconds()
        status := strconv.Itoa(rec.statusCode)
        method, path := r.Method, r.URL.Path

        m.requestsTotal.WithLabelValues(method, path, status).Inc()
        m.requestDuration.WithLabelValues(method, path).Observe(elapsed)
    }
    return http.HandlerFunc(instrumented)
}
// responseWriter wraps an http.ResponseWriter and remembers the status
// code of the response so it can be used as a metric label.
type responseWriter struct {
    http.ResponseWriter
    statusCode  int  // status recorded for the response
    wroteHeader bool // true once the final status has been decided
}

// WriteHeader records the first final status code and forwards the call.
// Later calls are still forwarded but no longer change the recorded
// status, because only the first header write reaches the client.
// Informational 1xx codes (other than 101) are forwarded without being
// recorded, since a final status still follows them.
func (rw *responseWriter) WriteHeader(code int) {
    if !rw.wroteHeader {
        informational := code >= 100 && code < 200 && code != http.StatusSwitchingProtocols
        if !informational {
            rw.statusCode = code
            rw.wroteHeader = true
        }
    }
    rw.ResponseWriter.WriteHeader(code)
}

// Write forwards the body bytes. A Write without a prior WriteHeader
// implies a 200 response, so the status is frozen at that point.
func (rw *responseWriter) Write(b []byte) (int, error) {
    if !rw.wroteHeader {
        if rw.statusCode == 0 {
            rw.statusCode = http.StatusOK
        }
        rw.wroteHeader = true
    }
    return rw.ResponseWriter.Write(b)
}
2. Structured Logging - Context and Events
Logging Best Practices:
// Structured logging with context
package main
import (
    "context"
    "time"
    "github.com/sirupsen/logrus"
    "go.opentelemetry.io/otel/trace"
)
// Logger with consistent structure
// AppLogger wraps a logrus logger so every component emits entries with
// the same JSON structure.
type AppLogger struct {
    logger *logrus.Logger
}

// NewAppLogger returns an AppLogger whose entries are JSON-formatted with
// RFC3339Nano timestamps and the keys "timestamp", "level" and "message".
func NewAppLogger() *AppLogger {
    formatter := &logrus.JSONFormatter{
        TimestampFormat: time.RFC3339Nano,
        FieldMap: logrus.FieldMap{
            logrus.FieldKeyTime:  "timestamp",
            logrus.FieldKeyLevel: "level",
            logrus.FieldKeyMsg:   "message",
        },
    }

    base := logrus.New()
    base.SetFormatter(formatter)
    return &AppLogger{logger: base}
}
// WithContext returns a log entry enriched with whatever correlation data
// ctx carries: trace and span IDs when the span context is valid, plus the
// values stored under the string keys "user_id" and "request_id".
func (l *AppLogger) WithContext(ctx context.Context) *logrus.Entry {
    fields := logrus.Fields{}

    // Add trace information if available
    if sc := trace.SpanFromContext(ctx).SpanContext(); sc.IsValid() {
        fields["trace_id"] = sc.TraceID().String()
        fields["span_id"] = sc.SpanID().String()
    }

    // Add user and request identifiers if available
    for _, key := range []string{"user_id", "request_id"} {
        if value := ctx.Value(key); value != nil {
            fields[key] = value
        }
    }

    return l.logger.WithFields(fields)
}
// Usage examples
// LogUserAction writes an info-level entry describing a user action,
// tagged with the "user_service" component and the given details.
func (l *AppLogger) LogUserAction(ctx context.Context, action string, details map[string]interface{}) {
    fields := logrus.Fields{
        "action":    action,
        "details":   details,
        "component": "user_service",
    }
    l.WithContext(ctx).WithFields(fields).Info("User action performed")
}
// LogError writes an error-level entry for a failed operation in the
// given component. A nil err is logged as "<nil>" instead of panicking,
// so a mistaken call never takes down the caller.
func (l *AppLogger) LogError(ctx context.Context, err error, component string) {
    message := "<nil>"
    if err != nil {
        message = err.Error()
    }

    l.WithContext(ctx).WithFields(logrus.Fields{
        "error":     message,
        "component": component,
    }).Error("Operation failed")
}
// LogPerformance writes an info-level entry with the duration of an
// operation in milliseconds, tagged with the "performance" component.
func (l *AppLogger) LogPerformance(ctx context.Context, operation string, duration time.Duration, metadata map[string]interface{}) {
    fields := logrus.Fields{
        "operation":   operation,
        "duration_ms": duration.Milliseconds(),
        "metadata":    metadata,
        "component":   "performance",
    }
    l.WithContext(ctx).WithFields(fields).Info("Performance metric recorded")
}
Advanced Logging Patterns:
# Python structured logging with correlation IDs
import logging
import json
import time
import uuid
from contextlib import contextmanager
from contextvars import ContextVar
# Context variables for request tracking
# Context variables for request tracking; an empty string means "not set".
request_id_var: ContextVar[str] = ContextVar('request_id', default='')
user_id_var: ContextVar[str] = ContextVar('user_id', default='')
trace_id_var: ContextVar[str] = ContextVar('trace_id', default='')


class StructuredFormatter(logging.Formatter):
    """Render log records as one JSON object per line.

    Each entry carries the record's own metadata, the request/user/trace IDs
    currently set in the context variables, any dict passed as
    ``extra={'extra_fields': {...}}`` and, when present, the formatted
    exception traceback under ``exception``.
    """

    def format(self, record):
        log_entry = {
            # Use the time the event was logged, not the time it is formatted.
            'timestamp': record.created,
            'level': record.levelname,
            'message': record.getMessage(),
            'logger': record.name,
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno,
        }

        # Add context variables that are set (non-empty)
        for key, var in (
            ('request_id', request_id_var),
            ('user_id', user_id_var),
            ('trace_id', trace_id_var),
        ):
            value = var.get()
            if value:
                log_entry[key] = value

        # Keep traceback information instead of silently dropping it
        if record.exc_info:
            log_entry['exception'] = self.formatException(record.exc_info)

        # Add any extra fields (these may override the keys above)
        extra_fields = getattr(record, 'extra_fields', None)
        if extra_fields:
            log_entry.update(extra_fields)

        # default=str keeps logging from raising on non-JSON values
        return json.dumps(log_entry, default=str)
def setup_logger(name: str) -> logging.Logger:
    """Return the named logger, set to INFO and emitting structured JSON.

    Calling this more than once for the same name is safe: a stream handler
    is attached only if the logger does not already have one using
    StructuredFormatter, so lines are never emitted twice.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)

    already_configured = any(
        isinstance(existing.formatter, StructuredFormatter)
        for existing in logger.handlers
    )
    if not already_configured:
        handler = logging.StreamHandler()
        handler.setFormatter(StructuredFormatter())
        logger.addHandler(handler)
    return logger
# Context manager for request tracking
@contextmanager
def request_context(user_id=None):
    """Bind a fresh request ID (and optionally a user ID) for the block.

    Yields the generated request ID. On exit the context variables are
    restored to their previous values.
    """
    new_request_id = str(uuid.uuid4())

    # Remember each (variable, token) pair so it can be reset afterwards.
    bound = [(request_id_var, request_id_var.set(new_request_id))]
    if user_id:
        bound.append((user_id_var, user_id_var.set(user_id)))

    try:
        yield new_request_id
    finally:
        for var, token in bound:
            var.reset(token)
# Usage
# Module-level structured logger used by the example below.
logger = setup_logger(__name__)


def process_user_request(user_id: str, action: str):
    """Run ``action`` for a user inside a request context, logging each step.

    Logs the start, then either the success (with the result count) or the
    failure (with error text and type). Exceptions are re-raised after being
    logged.
    """
    component = 'user_service'

    with request_context(user_id=user_id):
        started = {'action': action, 'component': component}
        logger.info("Processing user request", extra={'extra_fields': started})

        try:
            # Business logic here
            result = perform_action(action)

            completed = {
                'action': action,
                'result_count': len(result),
                'component': component,
            }
            logger.info("Request completed successfully",
                        extra={'extra_fields': completed})
        except Exception as e:
            failed = {
                'action': action,
                'error': str(e),
                'error_type': type(e).__name__,
                'component': component,
            }
            logger.error("Request failed", extra={'extra_fields': failed})
            raise
3. Distributed Tracing - Request Flow Visibility
OpenTelemetry Implementation:
// Comprehensive distributed tracing setup
package main
import (
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    "time"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/exporters/jaeger"
    "go.opentelemetry.io/otel/propagation"
    "go.opentelemetry.io/otel/sdk/resource"
    "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
    oteltrace "go.opentelemetry.io/otel/trace"
)
// Initialize tracing
// initTracing configures the global tracer provider and propagators for
// serviceName, exporting spans to a Jaeger collector endpoint. It panics
// if the exporter cannot be created. The returned function shuts the
// provider down and discards the shutdown error.
func initTracing(serviceName string) func() {
    endpoint := jaeger.WithEndpoint("http://localhost:14268/api/traces")
    exporter, err := jaeger.New(jaeger.WithCollectorEndpoint(endpoint))
    if err != nil {
        panic(fmt.Sprintf("failed to initialize jaeger exporter: %v", err))
    }

    // Resource attributes attached to the tracer provider.
    res := resource.NewWithAttributes(
        semconv.SchemaURL,
        semconv.ServiceNameKey.String(serviceName),
        semconv.ServiceVersionKey.String("v1.0.0"),
        attribute.String("environment", "production"),
    )
    provider := trace.NewTracerProvider(
        trace.WithBatcher(exporter),
        trace.WithResource(res),
    )
    otel.SetTracerProvider(provider)

    propagator := propagation.NewCompositeTextMapPropagator(
        propagation.TraceContext{},
        propagation.Baggage{},
    )
    otel.SetTextMapPropagator(propagator)

    shutdown := func() { _ = provider.Shutdown(context.Background()) }
    return shutdown
}
// Service layer with tracing
// UserService looks users up through a cache and a database and records
// a span for each step.
type UserService struct {
    tracer   oteltrace.Tracer  // tracer used to start spans
    database DatabaseInterface // backing store
    cache    CacheInterface    // consulted before the database
}

// NewUserService wires a UserService to db and cache, using the
// "user-service" tracer from the global provider.
func NewUserService(db DatabaseInterface, cache CacheInterface) *UserService {
    svc := new(UserService)
    svc.tracer = otel.Tracer("user-service")
    svc.database = db
    svc.cache = cache
    return svc
}
// GetUser returns the user with the given ID, trying the cache first and
// falling back to the database. Any cache error is treated as a miss.
// A database error is recorded on the span and returned wrapped.
//
// After a database hit the user is written back to the cache in the
// background. That write is skipped when the database returned no user,
// and it carries this span's context so it stays part of the same trace
// while not being cancelled with the request.
func (s *UserService) GetUser(ctx context.Context, userID string) (*User, error) {
    ctx, span := s.tracer.Start(ctx, "UserService.GetUser")
    defer span.End()

    // Add attributes to span
    span.SetAttributes(
        attribute.String("user.id", userID),
        attribute.String("operation", "get_user"),
    )

    // Check cache first
    if user, err := s.getUserFromCache(ctx, userID); err == nil {
        span.SetAttributes(attribute.Bool("cache.hit", true))
        return user, nil
    }
    span.SetAttributes(attribute.Bool("cache.hit", false))

    // Fallback to database
    user, err := s.getUserFromDatabase(ctx, userID)
    if err != nil {
        span.RecordError(err)
        span.SetAttributes(attribute.String("error.type", "database_error"))
        return nil, fmt.Errorf("failed to get user: %w", err)
    }
    span.SetAttributes(attribute.Bool("user.found", user != nil))

    // Cache the result, but never a missing user.
    if user != nil {
        cacheCtx := oteltrace.ContextWithSpanContext(context.Background(), span.SpanContext())
        go s.cacheUser(cacheCtx, user)
    }
    return user, nil
}
// getUserFromCache reads the user from the cache inside its own span and
// returns whatever the cache returns.
func (s *UserService) getUserFromCache(ctx context.Context, userID string) (*User, error) {
    ctx, span := s.tracer.Start(ctx, "UserService.getUserFromCache")
    defer span.End()

    attrs := []attribute.KeyValue{
        attribute.String("user.id", userID),
        attribute.String("cache.operation", "get"),
    }
    span.SetAttributes(attrs...)

    return s.cache.Get(ctx, userID)
}
// getUserFromDatabase loads the user from the database inside its own
// span and records how long the call took in milliseconds.
func (s *UserService) getUserFromDatabase(ctx context.Context, userID string) (*User, error) {
    ctx, span := s.tracer.Start(ctx, "UserService.getUserFromDatabase")
    defer span.End()

    attrs := []attribute.KeyValue{
        attribute.String("user.id", userID),
        attribute.String("db.operation", "select"),
        attribute.String("db.table", "users"),
    }
    span.SetAttributes(attrs...)

    began := time.Now()
    user, err := s.database.GetUser(ctx, userID)
    elapsedMs := time.Since(began).Milliseconds()
    span.SetAttributes(attribute.Int64("db.duration_ms", elapsedMs))

    return user, err
}
// HTTP handler with tracing
// HandleGetUser serves GET /user?user_id=... and records the request in a
// span. It answers 400 when user_id is missing, 500 when the lookup
// fails, 404 when the lookup succeeds but yields no user, and otherwise
// 200 with the user encoded as JSON.
func (s *UserService) HandleGetUser(w http.ResponseWriter, r *http.Request) {
    ctx := r.Context()
    userID := r.URL.Query().Get("user_id")

    ctx, span := s.tracer.Start(ctx, "HTTP GET /user")
    defer span.End()

    span.SetAttributes(
        attribute.String("http.method", r.Method),
        attribute.String("http.url", r.URL.String()),
        attribute.String("http.user_agent", r.UserAgent()),
        attribute.String("user.id", userID),
    )

    if userID == "" {
        span.SetAttributes(attribute.Int("http.status_code", 400))
        http.Error(w, "user_id is required", http.StatusBadRequest)
        return
    }

    user, err := s.GetUser(ctx, userID)
    if err != nil {
        span.RecordError(err)
        span.SetAttributes(
            attribute.Int("http.status_code", 500),
            attribute.String("error.message", err.Error()),
        )
        http.Error(w, "Internal server error", http.StatusInternalServerError)
        return
    }

    // GetUser can succeed without a user; answer 404 rather than "null".
    if user == nil {
        span.SetAttributes(attribute.Int("http.status_code", 404))
        http.Error(w, "user not found", http.StatusNotFound)
        return
    }

    span.SetAttributes(attribute.Int("http.status_code", 200))
    w.Header().Set("Content-Type", "application/json")
    if err := json.NewEncoder(w).Encode(user); err != nil {
        // The status line is already sent, so the failure can only be
        // recorded, not reported to the client.
        span.RecordError(err)
    }
}
Part 2: Advanced Monitoring Strategies
Performance Optimizations for High-Volume Metrics Collection
Problem: Traditional metrics collection can become a bottleneck in high-throughput systems due to memory allocations and inefficient data structures.
Solution: Use object pools and optimized data structures:
// BEFORE - Inefficient approach
// SlowMetricsCollector is the deliberately inefficient "before" example:
// every metric is stored as its own untyped map.
type SlowMetricsCollector struct {
    metrics []map[string]interface{} // Creates many small allocations
}

// AddMetric appends a metric with the given name and value, stamped with
// the current Unix time in seconds.
func (smc *SlowMetricsCollector) AddMetric(name string, value float64) {
    // A fresh map per metric - this is the expensive part.
    smc.metrics = append(smc.metrics, map[string]interface{}{
        "name":      name,
        "value":     value,
        "timestamp": time.Now().Unix(),
    })
}
// AFTER - Use object pools and typed structs
import (
    "sync"
    "time"
)
// MetricsCollector accumulates metrics in a pre-allocated, typed slice.
// All methods may be called concurrently. The zero value is usable.
type MetricsCollector struct {
    metricPool sync.Pool    // scratch *Metric values reused by AddMetric
    metrics    []Metric     // collected metrics, guarded by mu
    mu         sync.RWMutex // protects metrics
}

// Metric is a single named measurement.
type Metric struct {
    Name      string
    Value     float64
    Timestamp int64 // Unix time in seconds
}

// NewMetricsCollector returns a collector with room for 1000 metrics
// before its slice has to grow.
func NewMetricsCollector() *MetricsCollector {
    mc := &MetricsCollector{
        metrics: make([]Metric, 0, 1000), // Pre-allocate capacity
    }
    mc.metricPool = sync.Pool{
        New: func() interface{} {
            return &Metric{}
        },
    }
    return mc
}

// AddMetric records a metric with the given name and value, stamped with
// the current Unix time in seconds.
//
// NOTE(review): the metric is copied by value into the slice, so the pool
// only saves the scratch object - confirm it pays off under profiling.
func (mc *MetricsCollector) AddMetric(name string, value float64) {
    // A zero-value collector has no Pool.New, so Get may return nil.
    metric, ok := mc.metricPool.Get().(*Metric)
    if !ok || metric == nil {
        metric = new(Metric)
    }
    metric.Name = name
    metric.Value = value
    metric.Timestamp = time.Now().Unix()

    mc.mu.Lock()
    mc.metrics = append(mc.metrics, *metric)
    mc.mu.Unlock()

    // Clear before pooling so the pool does not keep the name alive.
    *metric = Metric{}
    mc.metricPool.Put(metric)
}

// GetMetrics returns a copy of the collected metrics; the caller may
// modify it freely.
func (mc *MetricsCollector) GetMetrics() []Metric {
    mc.mu.RLock()
    defer mc.mu.RUnlock()

    // Return copy to avoid race conditions
    result := make([]Metric, len(mc.metrics))
    copy(result, mc.metrics)
    return result
}

// Reset discards all collected metrics but keeps the allocated capacity.
func (mc *MetricsCollector) Reset() {
    mc.mu.Lock()
    defer mc.mu.Unlock()
    mc.metrics = mc.metrics[:0] // Keep underlying array, reset length
}
Continuous CPU Profiling Integration
Implementation for production monitoring:
// CPU profiling integration
import (
    "fmt"
    "log"
    "os"
    "runtime/pprof"
    "time"
)
// enableContinuousProfiling starts a background goroutine that records
// back-to-back 30-second CPU profiles and hands each finished file to
// uploadProfile. It does nothing unless ENABLE_PROFILING is "true".
// The goroutine runs for the lifetime of the process.
func enableContinuousProfiling() {
    if os.Getenv("ENABLE_PROFILING") != "true" {
        return
    }

    go func() {
        for {
            f, err := os.Create(fmt.Sprintf("cpu-profile-%d.prof", time.Now().Unix()))
            if err != nil {
                log.Printf("Could not create CPU profile: %v", err)
                time.Sleep(30 * time.Second)
                continue
            }

            if err := pprof.StartCPUProfile(f); err != nil {
                // Nothing was recorded, so drop the empty file
                // instead of uploading it.
                log.Printf("Could not start CPU profile: %v", err)
                f.Close()
                _ = os.Remove(f.Name()) // best-effort cleanup
                time.Sleep(30 * time.Second)
                continue
            }
            time.Sleep(30 * time.Second)
            pprof.StopCPUProfile()

            if err := f.Close(); err != nil {
                // The profile may be truncated; do not upload it.
                log.Printf("Could not close CPU profile: %v", err)
                _ = os.Remove(f.Name()) // best-effort cleanup
                continue
            }

            // Upload to object storage for analysis
            uploadProfile(f.Name())
        }
    }()
}
// Helper function to upload profiles to cloud storage
// uploadProfile logs the upload of the named profile and then deletes the
// local file. The actual transfer (S3, GCS, etc.) is left as a
// placeholder. A failure to delete is logged, except when the file is
// already gone.
func uploadProfile(filename string) {
    // Implementation would upload to S3, GCS, etc.
    log.Printf("Uploading profile %s to cloud storage", filename)

    // Clean up local file after upload
    if err := os.Remove(filename); err != nil && !os.IsNotExist(err) {
        log.Printf("Could not remove profile %s: %v", filename, err)
    }
}
// Memory profiling function
// captureMemoryProfile writes a heap profile to a timestamped file and
// hands it to uploadProfile. The file is closed before the upload so its
// contents are complete, and it is removed if writing or closing fails.
func captureMemoryProfile() {
    f, err := os.Create(fmt.Sprintf("mem-profile-%d.prof", time.Now().Unix()))
    if err != nil {
        log.Printf("Could not create memory profile: %v", err)
        return
    }

    writeErr := pprof.WriteHeapProfile(f)
    closeErr := f.Close()

    if writeErr != nil {
        log.Printf("Could not write memory profile: %v", writeErr)
        _ = os.Remove(f.Name()) // best-effort cleanup of the partial file
        return
    }
    if closeErr != nil {
        log.Printf("Could not close memory profile: %v", closeErr)
        _ = os.Remove(f.Name()) // best-effort cleanup of the partial file
        return
    }

    uploadProfile(f.Name())
}
Golden Signals Monitoring
The Four Golden Signals implementation:
// Golden Signals metrics implementation
// GoldenSignalsCollector groups one collector per golden signal.
type GoldenSignalsCollector struct {
    requestDuration     *prometheus.HistogramVec // Latency
    requestsPerSecond   *prometheus.CounterVec   // Traffic
    errorRate           *prometheus.CounterVec   // Errors
    resourceUtilization *prometheus.GaugeVec     // Saturation
}

// NewGoldenSignalsCollector builds the four collectors. It does not
// register them; that is left to the caller.
func NewGoldenSignalsCollector() *GoldenSignalsCollector {
    latency := prometheus.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "http_request_duration_seconds",
            Help:    "Request latency in seconds",
            Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
        },
        []string{"method", "endpoint", "status_class"},
    )
    traffic := prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "http_requests_total",
            Help: "Total HTTP requests",
        },
        []string{"method", "endpoint"},
    )
    errors := prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "http_errors_total",
            Help: "Total HTTP errors",
        },
        []string{"method", "endpoint", "status_code"},
    )
    saturation := prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "resource_utilization_percent",
            Help: "Resource utilization percentage",
        },
        []string{"resource_type", "instance"},
    )

    return &GoldenSignalsCollector{
        requestDuration:     latency,
        requestsPerSecond:   traffic,
        errorRate:           errors,
        resourceUtilization: saturation,
    }
}
Part 3: Implementation Examples
Microservices Implementation:
// Metrics Collector Service (Go)
package main
import (
    "encoding/json"
    "github.com/confluentinc/confluent-kafka-go/kafka"
    "github.com/go-redis/redis/v8"
)
// MetricsCollector processes incoming metrics with a per-type handler and
// publishes the result to Kafka.
type MetricsCollector struct {
    kafka    *kafka.Producer
    redis    *redis.Client
    handlers map[string]MetricHandler // keyed by metric type
}

// unknownMetricTypeError reports a metric whose type has no handler.
type unknownMetricTypeError string

// Error implements the error interface.
func (e unknownMetricTypeError) Error() string {
    return "no handler registered for metric type " + string(e)
}

// CollectMetric runs the handler registered for metric.Type and publishes
// the processed metric to the Kafka topic named after that type. It
// returns an error when no handler is registered for the type, or when
// the producer rejects the message.
func (mc *MetricsCollector) CollectMetric(metric Metric) error {
    handler, ok := mc.handlers[metric.Type]
    if !ok {
        // Calling Process on the map's zero value would fail at runtime.
        return unknownMetricTypeError(metric.Type)
    }

    // Process metric
    processed := handler.Process(metric)

    // Publish to message queue for other services
    return mc.kafka.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{
            Topic:     &metric.Type,
            Partition: kafka.PartitionAny,
        },
        Value: processed.ToJSON(),
    }, nil)
}
# Data Processor Service (Python)
import asyncio
import json
from kafka import KafkaConsumer
from typing import Dict, Any
class DataProcessor:
    """Consume metrics from Kafka, aggregate them, store them and check alerts."""

    def __init__(self):
        self.consumer = KafkaConsumer(
            'metrics-topic',
            bootstrap_servers=['kafka:9092'],
            group_id='data-processors',
            value_deserializer=lambda x: json.loads(x.decode('utf-8'))
        )

    async def process_metrics(self):
        """Process messages until the consumer is exhausted.

        The consumer's iterator blocks, so each ``next`` call runs in the
        default executor and the event loop stays responsive. Only one
        ``next`` call is in flight at a time.
        """
        loop = asyncio.get_running_loop()
        messages = iter(self.consumer)
        exhausted = object()  # sentinel: StopIteration cannot cross a Future

        while True:
            message = await loop.run_in_executor(None, next, messages, exhausted)
            if message is exhausted:
                break

            # message.value is already a dict (see value_deserializer)
            metric = message.value
            # Process and aggregate
            aggregated = await self.aggregate_metric(metric)
            # Store in time-series database
            await self.store_metric(aggregated)
            # Trigger alerts if needed
            await self.check_alerts(aggregated)

    async def aggregate_metric(self, metric: Dict[str, Any]) -> Dict[str, Any]:
        """Return the metric annotated with its aggregation window.

        Requires the keys 'name', 'value' and 'timestamp'; 'tags' defaults
        to an empty dict.
        """
        return {
            'name': metric['name'],
            'value': metric['value'],
            'timestamp': metric['timestamp'],
            'aggregation_window': '5m',
            'tags': metric.get('tags', {})
        }

    async def store_metric(self, metric: Dict[str, Any]):
        """Placeholder: store in InfluxDB or a similar time-series database."""
        pass

    async def check_alerts(self, metric: Dict[str, Any]):
        """Placeholder: check alert thresholds and notify if needed."""
        pass
Monolithic Implementation:
# Modular Monolith (Python)
from dashboard.modules import metrics, processing, api, notifications
from typing import Dict, Any
class AnalyticsDashboard:
    """Modular monolith: all modules run in one process and call each other directly."""

    def __init__(self):
        self.metrics = metrics.MetricsModule()
        self.processor = processing.ProcessingModule()
        self.api = api.APIModule()
        self.notifications = notifications.NotificationModule()

    def handle_metric(self, raw_metric: Dict[str, Any]) -> Dict[str, Any]:
        """Parse and aggregate a raw metric, alerting if thresholds are crossed.

        Returns the aggregated metric.
        """
        # All in same process - no network calls
        parsed = self.metrics.parse(raw_metric)
        aggregated = self.processor.aggregate(parsed)

        # Check for alerts
        needs_alert = self.processor.check_thresholds(aggregated)
        if needs_alert:
            self.notifications.send_alert(aggregated)
        return aggregated

    def get_dashboard_data(self, time_range: str) -> Dict[str, Any]:
        """Collect metrics, active alerts and health status for the dashboard."""
        # Single process - can share data structures efficiently
        prepared = self.processor.prepare_dashboard_data(
            self.metrics.get_data(time_range)
        )

        dashboard: Dict[str, Any] = {}
        dashboard['metrics'] = prepared
        dashboard['alerts'] = self.notifications.get_active_alerts()
        dashboard['system_health'] = self.api.get_health_status()
        return dashboard
# Single deployment with clear module boundaries
# Can be extracted to separate services later
When to Choose Each Approach:
| Factor | Monolith | Microservices | 
|---|---|---|
| Team Size | < 10 developers | > 10 developers | 
| Complexity | Simple-medium | Complex domain | 
| Scale | < 1M requests/day | > 10M requests/day | 
| Technology | Single stack preferred | Multiple technologies needed | 
| Deployment | Weekly/monthly | Multiple times per day | 
| Data Consistency | Strong consistency needed | Eventual consistency OK | 
For Analytics Dashboard Specifically:
- Early Stage: Start with modular monolith
- Growth Stage: Extract high-scale components (metrics collector) first
- Mature Stage: Full microservices with proper DevOps practices
 
2. Database Sharding Implementation
Explanation by analogy (libraries):
📚 Traditional Database (Single Library):
All books in one building - gets crowded, hard to find books,
long queues at checkout
📚📚📚 Sharded Database (Multiple Library Branches):
Books distributed across locations:
- Fiction Library: Books A-H
- Science Library: Books I-P
- History Library: Books Q-Z
Each library (shard) operates independently but part of same system
Sharding Strategy for Analytics Dashboard:
# Database sharding implementation
import hashlib
import datetime
from typing import Dict, List, Optional
import concurrent.futures
class ShardUnavailableError(Exception):
    """Raised by execute_query when a shard cannot be reached.

    NOTE(review): defined here because nothing else in this example
    provides it - replace with the project's own exception if one exists.
    """


class DatabaseSharding:
    """Route queries to region-, user- or time-based shards."""

    def __init__(self):
        self.shards = {
            'shard_americas': {
                'host': 'db-americas.example.com',
                'regions': ['us', 'ca', 'mx', 'br'],
                'connection': self.create_connection('db-americas')
            },
            'shard_europe': {
                'host': 'db-europe.example.com',
                'regions': ['uk', 'de', 'fr', 'es'],
                'connection': self.create_connection('db-europe')
            },
            'shard_asia': {
                'host': 'db-asia.example.com',
                'regions': ['jp', 'sg', 'au', 'in'],
                'connection': self.create_connection('db-asia')
            }
        }
        # Time-based sharding for metrics
        self.time_shards = {
            'metrics_current': 'Last 7 days - hot data',
            'metrics_recent': 'Last 30 days - warm data',
            'metrics_archive': 'Older than 30 days - cold data'
        }

    def get_user_shard(self, user_id: str) -> str:
        """Determine shard based on user ID hash"""
        hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
        shard_index = hash_value % len(self.shards)
        return list(self.shards.keys())[shard_index]

    def get_region_shard(self, region: str) -> str:
        """Determine shard based on geographical region (case-insensitive)"""
        wanted = region.lower()
        for shard_name, shard_info in self.shards.items():
            if wanted in shard_info['regions']:
                return shard_name
        return 'shard_americas'  # Default fallback

    def get_time_shard(self, timestamp: datetime.datetime) -> str:
        """Determine shard based on data age.

        Works for both naive and timezone-aware timestamps: "now" is taken
        in the timestamp's own timezone so the subtraction cannot fail.
        """
        now = datetime.datetime.now(timestamp.tzinfo)
        age = now - timestamp
        if age.days <= 7:
            return 'metrics_current'
        if age.days <= 30:
            return 'metrics_recent'
        return 'metrics_archive'

    def route_query(self, query_type: str, **kwargs):
        """Route queries to appropriate shard.

        Raises:
            ValueError: if ``query_type`` is not a known query type.
        """
        if query_type == 'user_orders':
            shard = self.get_user_shard(kwargs['user_id'])
            return self.execute_query(shard, query_type, **kwargs)
        if query_type == 'regional_metrics':
            shard = self.get_region_shard(kwargs['region'])
            return self.execute_query(shard, query_type, **kwargs)
        if query_type == 'historical_data':
            # Query multiple time shards and aggregate
            return self.query_time_shards(kwargs['start_date'], kwargs['end_date'])
        if query_type == 'cross_shard_analytics':
            # Fan-out query to all shards
            return self.fan_out_query(query_type, **kwargs)
        # A mistyped query type should fail loudly, not return None.
        raise ValueError(f"Unknown query type: {query_type!r}")

    def query_time_shards(self, start_date, end_date):
        """Query across time-based shards, skipping unavailable ones"""
        results = []
        for shard_name in self.time_shards:
            try:
                shard_result = self.execute_query(
                    shard_name, 'time_range_query',
                    start_date=start_date, end_date=end_date)
            except ShardUnavailableError:
                # Handle shard failures gracefully
                self.log_shard_failure(shard_name)
                continue
            if shard_result:
                results.extend(shard_result)
        return self.aggregate_results(results)

    def fan_out_query(self, query_type: str, **kwargs):
        """Execute query across all shards and aggregate results.

        A shard whose query raises is logged and reported as ``None``.
        """
        results = {}
        shard_names = list(self.shards)
        # max_workers must be at least 1 even when there are no shards
        with concurrent.futures.ThreadPoolExecutor(
                max_workers=max(1, len(shard_names))) as executor:
            # Submit queries to all shards concurrently
            future_to_shard = {
                executor.submit(self.execute_query, shard_name, query_type, **kwargs): shard_name
                for shard_name in shard_names
            }
            for future in concurrent.futures.as_completed(future_to_shard):
                shard_name = future_to_shard[future]
                try:
                    # as_completed yields finished futures; the timeout is
                    # only a safeguard.
                    results[shard_name] = future.result(timeout=30)
                except Exception as e:
                    self.log_query_failure(shard_name, e)
                    results[shard_name] = None
        return self.aggregate_cross_shard_results(results)

    def create_connection(self, shard_name: str):
        """Create database connection for shard"""
        # Implementation would create actual DB connection
        pass

    def execute_query(self, shard_name: str, query_type: str, **kwargs):
        """Execute query on specific shard"""
        # Implementation would execute actual query
        pass

    def log_shard_failure(self, shard_name: str):
        """Log shard failure for monitoring"""
        pass

    def log_query_failure(self, shard_name: str, error: Exception):
        """Log query failure for monitoring"""
        pass

    def aggregate_results(self, results: List):
        """Aggregate results from multiple shards"""
        pass

    def aggregate_cross_shard_results(self, results: Dict):
        """Aggregate results from cross-shard queries"""
        pass
# Shard-aware query examples
class ShardedAnalyticsQueries:
    """Convenience queries built on top of a DatabaseSharding router."""

    def __init__(self, sharding: DatabaseSharding):
        self.sharding = sharding

    def get_user_orders(self, user_id: str):
        """Return the orders of one user (routed by user ID)."""
        route = self.sharding.route_query
        return route('user_orders', user_id=user_id)

    def get_regional_sales(self, region: str, date_range: tuple):
        """Return sales for a region; date_range is (start, end)."""
        start_date, end_date = date_range[0], date_range[1]
        return self.sharding.route_query(
            'regional_metrics',
            region=region,
            start_date=start_date,
            end_date=end_date,
        )

    def get_global_metrics(self, metric_type: str):
        """Return a metric aggregated across all shards."""
        route = self.sharding.route_query
        return route('cross_shard_analytics', metric_type=metric_type)

    def get_historical_trends(self, days_back: int):
        """Return data for the last ``days_back`` days across the time shards."""
        window_end = datetime.datetime.now()
        window_start = window_end - datetime.timedelta(days=days_back)
        return self.sharding.query_time_shards(window_start, window_end)
Sharding Implementation with PostgreSQL:
-- Create shard-specific tables
-- All three tables share the same columns; they differ only in the set of
-- region codes their CHECK constraint accepts.
-- NOTE(review): region is nullable, and a NULL region passes the CHECK -
-- confirm whether it should be NOT NULL.
-- Shard 1: Americas
CREATE TABLE orders_americas (
    id UUID PRIMARY KEY,
    user_id UUID NOT NULL,
    region VARCHAR(2) CHECK (region IN ('US', 'CA', 'MX', 'BR')),
    order_total DECIMAL(10,2),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Shard 2: Europe
CREATE TABLE orders_europe (
    id UUID PRIMARY KEY,
    user_id UUID NOT NULL,
    region VARCHAR(2) CHECK (region IN ('UK', 'DE', 'FR', 'ES')),
    order_total DECIMAL(10,2),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Shard 3: Asia
CREATE TABLE orders_asia (
    id UUID PRIMARY KEY,
    user_id UUID NOT NULL,
    region VARCHAR(2) CHECK (region IN ('JP', 'SG', 'AU', 'IN')),
    order_total DECIMAL(10,2),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Create foreign data wrapper for cross-shard queries
-- (run on the Americas shard, which hosts orders_americas locally)
CREATE EXTENSION IF NOT EXISTS postgres_fdw;

-- Remote server and credentials: Europe
-- NOTE: 'password' is a placeholder; supply the real secret at deploy time.
CREATE SERVER shard_europe
    FOREIGN DATA WRAPPER postgres_fdw
    OPTIONS (host 'db-europe.example.com', port '5432', dbname 'analytics');
CREATE USER MAPPING FOR postgres
    SERVER shard_europe
    OPTIONS (user 'analytics_user', password 'password');

-- Remote server and credentials: Asia
-- (needed because orders_global reads orders_asia_remote)
CREATE SERVER shard_asia
    FOREIGN DATA WRAPPER postgres_fdw
    OPTIONS (host 'db-asia.example.com', port '5432', dbname 'analytics');
CREATE USER MAPPING FOR postgres
    SERVER shard_asia
    OPTIONS (user 'analytics_user', password 'password');

-- Create foreign tables
CREATE FOREIGN TABLE orders_europe_remote (
    id UUID,
    user_id UUID,
    region VARCHAR(2),
    order_total DECIMAL(10,2),
    created_at TIMESTAMP WITH TIME ZONE
)
SERVER shard_europe
OPTIONS (schema_name 'public', table_name 'orders_europe');

CREATE FOREIGN TABLE orders_asia_remote (
    id UUID,
    user_id UUID,
    region VARCHAR(2),
    order_total DECIMAL(10,2),
    created_at TIMESTAMP WITH TIME ZONE
)
SERVER shard_asia
OPTIONS (schema_name 'public', table_name 'orders_asia');

-- View for cross-shard queries
CREATE VIEW orders_global AS
    SELECT 'americas' as shard, * FROM orders_americas
    UNION ALL
    SELECT 'europe' as shard, * FROM orders_europe_remote
    UNION ALL
    SELECT 'asia' as shard, * FROM orders_asia_remote;