Observability and Monitoring - Deep Dive

This guide covers comprehensive observability practices for modern distributed systems, including the three pillars of observability, monitoring strategies, and practical implementation techniques.

Part 1: The Three Pillars of Observability

What is Observability?

Definition: Observability is the ability to understand the internal state of a system by examining its external outputs. It goes beyond traditional monitoring by enabling you to ask arbitrary questions about your system's behavior.

The Three Pillars:

  1. Metrics - Numerical data aggregated over time
  2. Logs - Discrete events with contextual information
  3. Traces - Request flows through distributed systems

1. Metrics - The Foundation

Types of Metrics:

// Counter - Always increasing values
var (
    httpRequestsTotal = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "http_requests_total",
            Help: "Total number of HTTP requests",
        },
        []string{"method", "endpoint", "status"},
    )
)

// Gauge - Current value that can go up or down
var (
    activeConnections = prometheus.NewGauge(
        prometheus.GaugeOpts{
            Name: "active_connections",
            Help: "Number of active database connections",
        },
    )
)

// Histogram - Distribution of values
var (
    requestDuration = prometheus.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "http_request_duration_seconds",
            Help:    "HTTP request duration in seconds",
            Buckets: prometheus.DefBuckets,
        },
        []string{"method", "endpoint"},
    )
)

// Summary - Like a histogram, but quantiles are computed client-side
var (
    requestSize = prometheus.NewSummaryVec(
        prometheus.SummaryOpts{
            Name: "http_request_size_bytes",
            Help: "HTTP request size in bytes",
            Objectives: map[float64]float64{
                0.5:  0.05,  // 50th percentile with 5% error
                0.9:  0.01,  // 90th percentile with 1% error
                0.99: 0.001, // 99th percentile with 0.1% error
            },
        },
        []string{"method"},
    )
)
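
A minimal usage sketch for the four collector types above (assuming they live in the same package); the /orders route, port, and label values here are illustrative, not part of the original example:

package main

import (
    "net/http"
    "time"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
    // Register the collectors declared above with the default registry.
    prometheus.MustRegister(httpRequestsTotal, activeConnections, requestDuration, requestSize)

    http.HandleFunc("/orders", func(w http.ResponseWriter, r *http.Request) {
        start := time.Now()
        activeConnections.Inc() // stand-in for a real connection-pool gauge
        defer activeConnections.Dec()

        // ... handle the request ...

        httpRequestsTotal.WithLabelValues(r.Method, "/orders", "200").Inc()
        requestDuration.WithLabelValues(r.Method, "/orders").Observe(time.Since(start).Seconds())
        requestSize.WithLabelValues(r.Method).Observe(float64(r.ContentLength))
    })

    // Expose the metrics for Prometheus to scrape.
    http.Handle("/metrics", promhttp.Handler())
    http.ListenAndServe(":8080", nil)
}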

Implementation Example:

// Comprehensive metrics instrumentation
package main

import (
    "net/http"
    "strconv"
    "time"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

type MetricsMiddleware struct {
    requestsTotal    *prometheus.CounterVec
    requestDuration  *prometheus.HistogramVec
    requestsInFlight prometheus.Gauge
}

func NewMetricsMiddleware() *MetricsMiddleware {
    m := &MetricsMiddleware{
        requestsTotal: prometheus.NewCounterVec(
            prometheus.CounterOpts{
                Name: "http_requests_total",
                Help: "Total HTTP requests processed",
            },
            []string{"method", "endpoint", "status"},
        ),
        requestDuration: prometheus.NewHistogramVec(
            prometheus.HistogramOpts{
                Name:    "http_request_duration_seconds",
                Help:    "HTTP request duration",
                Buckets: []float64{0.001, 0.01, 0.1, 0.5, 1, 2.5, 5, 10},
            },
            []string{"method", "endpoint"},
        ),
        requestsInFlight: prometheus.NewGauge(
            prometheus.GaugeOpts{
                Name: "http_requests_in_flight",
                Help: "Current number of HTTP requests being processed",
            },
        ),
    }

    // Register metrics
    prometheus.MustRegister(m.requestsTotal)
    prometheus.MustRegister(m.requestDuration)
    prometheus.MustRegister(m.requestsInFlight)

    return m
}

func (m *MetricsMiddleware) Handler(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        start := time.Now()
        m.requestsInFlight.Inc()
        defer m.requestsInFlight.Dec()

        // Wrap response writer to capture status code
        wrapped := &responseWriter{ResponseWriter: w, statusCode: 200}

        next.ServeHTTP(wrapped, r)

        duration := time.Since(start).Seconds()
        statusCode := strconv.Itoa(wrapped.statusCode)

        m.requestsTotal.WithLabelValues(r.Method, r.URL.Path, statusCode).Inc()
        m.requestDuration.WithLabelValues(r.Method, r.URL.Path).Observe(duration)
    })
}

type responseWriter struct {
    http.ResponseWriter
    statusCode int
}

func (rw *responseWriter) WriteHeader(code int) {
    rw.statusCode = code
    rw.ResponseWriter.WriteHeader(code)
}
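
A sketch of wiring this middleware into a server, continuing the package above; the application route is hypothetical. Note that using r.URL.Path directly as the endpoint label can explode label cardinality for parameterized paths, so production code usually records a route template instead:

func main() {
    metrics := NewMetricsMiddleware()

    mux := http.NewServeMux()
    mux.HandleFunc("/api/users", func(w http.ResponseWriter, r *http.Request) {
        w.Write([]byte(`{"status":"ok"}`))
    })

    // Serve application traffic through the instrumented middleware and
    // expose the metrics endpoint for Prometheus to scrape.
    http.Handle("/metrics", promhttp.Handler())
    http.Handle("/", metrics.Handler(mux))

    http.ListenAndServe(":8080", nil)
}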

2. Structured Logging - Context and Events

Logging Best Practices:

// Structured logging with context
package main

import (
    "context"
    "time"

    "github.com/sirupsen/logrus"
    "go.opentelemetry.io/otel/trace"
)

// Logger with consistent structure
type AppLogger struct {
    logger *logrus.Logger
}

func NewAppLogger() *AppLogger {
    logger := logrus.New()
    logger.SetFormatter(&logrus.JSONFormatter{
        TimestampFormat: time.RFC3339Nano,
        FieldMap: logrus.FieldMap{
            logrus.FieldKeyTime:  "timestamp",
            logrus.FieldKeyLevel: "level",
            logrus.FieldKeyMsg:   "message",
        },
    })

    return &AppLogger{logger: logger}
}

func (l *AppLogger) WithContext(ctx context.Context) *logrus.Entry {
    entry := l.logger.WithFields(logrus.Fields{})

    // Add trace information if available
    span := trace.SpanFromContext(ctx)
    if span.SpanContext().IsValid() {
        entry = entry.WithFields(logrus.Fields{
            "trace_id": span.SpanContext().TraceID().String(),
            "span_id":  span.SpanContext().SpanID().String(),
        })
    }

    // Add user context if available
    if userID := ctx.Value("user_id"); userID != nil {
        entry = entry.WithField("user_id", userID)
    }

    // Add request ID if available
    if requestID := ctx.Value("request_id"); requestID != nil {
        entry = entry.WithField("request_id", requestID)
    }

    return entry
}

// Usage examples
func (l *AppLogger) LogUserAction(ctx context.Context, action string, details map[string]interface{}) {
    l.WithContext(ctx).WithFields(logrus.Fields{
        "action":    action,
        "details":   details,
        "component": "user_service",
    }).Info("User action performed")
}

func (l *AppLogger) LogError(ctx context.Context, err error, component string) {
    l.WithContext(ctx).WithFields(logrus.Fields{
        "error":     err.Error(),
        "component": component,
    }).Error("Operation failed")
}

func (l *AppLogger) LogPerformance(ctx context.Context, operation string, duration time.Duration, metadata map[string]interface{}) {
    l.WithContext(ctx).WithFields(logrus.Fields{
        "operation":   operation,
        "duration_ms": duration.Milliseconds(),
        "metadata":    metadata,
        "component":   "performance",
    }).Info("Performance metric recorded")
}

Advanced Logging Patterns:

# Python structured logging with correlation IDs
import logging
import json
import time
import uuid
from contextlib import contextmanager
from contextvars import ContextVar

# Context variables for request tracking
request_id_var: ContextVar[str] = ContextVar('request_id', default='')
user_id_var: ContextVar[str] = ContextVar('user_id', default='')
trace_id_var: ContextVar[str] = ContextVar('trace_id', default='')

class StructuredFormatter(logging.Formatter):
    def format(self, record):
        log_entry = {
            'timestamp': time.time(),
            'level': record.levelname,
            'message': record.getMessage(),
            'logger': record.name,
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno,
        }

        # Add context variables
        if request_id_var.get():
            log_entry['request_id'] = request_id_var.get()
        if user_id_var.get():
            log_entry['user_id'] = user_id_var.get()
        if trace_id_var.get():
            log_entry['trace_id'] = trace_id_var.get()

        # Add any extra fields
        if hasattr(record, 'extra_fields'):
            log_entry.update(record.extra_fields)

        return json.dumps(log_entry)

def setup_logger(name: str) -> logging.Logger:
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)

    handler = logging.StreamHandler()
    handler.setFormatter(StructuredFormatter())
    logger.addHandler(handler)

    return logger

# Context manager for request tracking
@contextmanager
def request_context(user_id=None):
    request_id = str(uuid.uuid4())
    request_id_token = request_id_var.set(request_id)

    user_id_token = None
    if user_id:
        user_id_token = user_id_var.set(user_id)

    try:
        yield request_id
    finally:
        request_id_var.reset(request_id_token)
        if user_id_token:
            user_id_var.reset(user_id_token)

# Usage
logger = setup_logger(__name__)

def process_user_request(user_id: str, action: str):
    with request_context(user_id=user_id):
        logger.info("Processing user request", extra={
            'extra_fields': {
                'action': action,
                'component': 'user_service'
            }
        })

        try:
            # Business logic here
            result = perform_action(action)

            logger.info("Request completed successfully", extra={
                'extra_fields': {
                    'action': action,
                    'result_count': len(result),
                    'component': 'user_service'
                }
            })

        except Exception as e:
            logger.error("Request failed", extra={
                'extra_fields': {
                    'action': action,
                    'error': str(e),
                    'error_type': type(e).__name__,
                    'component': 'user_service'
                }
            })
            raise

3. Distributed Tracing - Request Flow Visibility

OpenTelemetry Implementation:

// Comprehensive distributed tracing setup
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    "time"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/exporters/jaeger"
    "go.opentelemetry.io/otel/propagation"
    "go.opentelemetry.io/otel/sdk/resource"
    "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
    oteltrace "go.opentelemetry.io/otel/trace"
)

// Initialize tracing
func initTracing(serviceName string) func() {
    exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint("http://localhost:14268/api/traces")))
    if err != nil {
        panic(fmt.Sprintf("failed to initialize jaeger exporter: %v", err))
    }

    tp := trace.NewTracerProvider(
        trace.WithBatcher(exp),
        trace.WithResource(resource.NewWithAttributes(
            semconv.SchemaURL,
            semconv.ServiceNameKey.String(serviceName),
            semconv.ServiceVersionKey.String("v1.0.0"),
            attribute.String("environment", "production"),
        )),
    )

    otel.SetTracerProvider(tp)
    otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
        propagation.TraceContext{},
        propagation.Baggage{},
    ))

    return func() { _ = tp.Shutdown(context.Background()) }
}

// Service layer with tracing
type UserService struct {
    tracer   oteltrace.Tracer
    database DatabaseInterface
    cache    CacheInterface
}

func NewUserService(db DatabaseInterface, cache CacheInterface) *UserService {
    return &UserService{
        tracer:   otel.Tracer("user-service"),
        database: db,
        cache:    cache,
    }
}

func (s *UserService) GetUser(ctx context.Context, userID string) (*User, error) {
    ctx, span := s.tracer.Start(ctx, "UserService.GetUser")
    defer span.End()

    // Add attributes to span
    span.SetAttributes(
        attribute.String("user.id", userID),
        attribute.String("operation", "get_user"),
    )

    // Check cache first
    user, err := s.getUserFromCache(ctx, userID)
    if err == nil {
        span.SetAttributes(attribute.Bool("cache.hit", true))
        return user, nil
    }

    span.SetAttributes(attribute.Bool("cache.hit", false))

    // Fallback to database
    user, err = s.getUserFromDatabase(ctx, userID)
    if err != nil {
        span.RecordError(err)
        span.SetAttributes(attribute.String("error.type", "database_error"))
        return nil, fmt.Errorf("failed to get user: %w", err)
    }

    // Cache the result asynchronously (cacheUser not shown here)
    go s.cacheUser(context.Background(), user)

    span.SetAttributes(attribute.Bool("user.found", user != nil))
    return user, nil
}

func (s *UserService) getUserFromCache(ctx context.Context, userID string) (*User, error) {
    ctx, span := s.tracer.Start(ctx, "UserService.getUserFromCache")
    defer span.End()

    span.SetAttributes(
        attribute.String("user.id", userID),
        attribute.String("cache.operation", "get"),
    )

    return s.cache.Get(ctx, userID)
}

func (s *UserService) getUserFromDatabase(ctx context.Context, userID string) (*User, error) {
    ctx, span := s.tracer.Start(ctx, "UserService.getUserFromDatabase")
    defer span.End()

    span.SetAttributes(
        attribute.String("user.id", userID),
        attribute.String("db.operation", "select"),
        attribute.String("db.table", "users"),
    )

    start := time.Now()
    user, err := s.database.GetUser(ctx, userID)
    span.SetAttributes(attribute.Int64("db.duration_ms", time.Since(start).Milliseconds()))

    return user, err
}

// HTTP handler with tracing
func (s *UserService) HandleGetUser(w http.ResponseWriter, r *http.Request) {
    ctx := r.Context()
    userID := r.URL.Query().Get("user_id")

    ctx, span := s.tracer.Start(ctx, "HTTP GET /user")
    defer span.End()

    span.SetAttributes(
        attribute.String("http.method", r.Method),
        attribute.String("http.url", r.URL.String()),
        attribute.String("http.user_agent", r.UserAgent()),
        attribute.String("user.id", userID),
    )

    if userID == "" {
        span.SetAttributes(attribute.Int("http.status_code", 400))
        http.Error(w, "user_id is required", http.StatusBadRequest)
        return
    }

    user, err := s.GetUser(ctx, userID)
    if err != nil {
        span.RecordError(err)
        span.SetAttributes(
            attribute.Int("http.status_code", 500),
            attribute.String("error.message", err.Error()),
        )
        http.Error(w, "Internal server error", http.StatusInternalServerError)
        return
    }

    span.SetAttributes(attribute.Int("http.status_code", 200))
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(user)
}
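
A minimal bootstrap sketch for the service above; newDatabase and newCache are hypothetical constructors for the DatabaseInterface and CacheInterface dependencies. In practice the handler would usually also be wrapped with an instrumentation middleware (for example otelhttp.NewHandler) so incoming requests join the caller's trace instead of starting new root spans:

func main() {
    shutdown := initTracing("user-service")
    defer shutdown()

    // Hypothetical constructors for the interfaces used above.
    svc := NewUserService(newDatabase(), newCache())

    http.HandleFunc("/user", svc.HandleGetUser)
    if err := http.ListenAndServe(":8080", nil); err != nil {
        panic(err)
    }
}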

Part 2: Advanced Monitoring Strategies

Performance Optimizations for High-Volume Metrics Collection

Problem: Traditional metrics collection can become a bottleneck in high-throughput systems due to memory allocations and inefficient data structures.

Solution: Use object pools and optimized data structures:

// BEFORE - Inefficient approach
type SlowMetricsCollector struct {
    metrics []map[string]interface{} // Creates many small allocations
}

func (smc *SlowMetricsCollector) AddMetric(name string, value float64) {
    // Creates new map for each metric - expensive
    metric := map[string]interface{}{
        "name":      name,
        "value":     value,
        "timestamp": time.Now().Unix(),
    }
    smc.metrics = append(smc.metrics, metric)
}

// AFTER - Use object pools and typed structs
import (
    "sync"
    "time"
)

type MetricsCollector struct {
    metricPool sync.Pool
    metrics    []Metric
    mu         sync.RWMutex
}

type Metric struct {
    Name      string
    Value     float64
    Timestamp int64
}

func NewMetricsCollector() *MetricsCollector {
    mc := &MetricsCollector{
        metrics: make([]Metric, 0, 1000), // Pre-allocate capacity
    }

    mc.metricPool = sync.Pool{
        New: func() interface{} {
            return &Metric{}
        },
    }

    return mc
}

func (mc *MetricsCollector) AddMetric(name string, value float64) {
    metric := mc.metricPool.Get().(*Metric)
    metric.Name = name
    metric.Value = value
    metric.Timestamp = time.Now().Unix()

    mc.mu.Lock()
    mc.metrics = append(mc.metrics, *metric)
    mc.mu.Unlock()

    // Return to pool for reuse
    mc.metricPool.Put(metric)
}

func (mc *MetricsCollector) GetMetrics() []Metric {
    mc.mu.RLock()
    defer mc.mu.RUnlock()

    // Return copy to avoid race conditions
    result := make([]Metric, len(mc.metrics))
    copy(result, mc.metrics)
    return result
}

func (mc *MetricsCollector) Reset() {
    mc.mu.Lock()
    mc.metrics = mc.metrics[:0] // Keep underlying array, reset length
    mc.mu.Unlock()
}
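
A rough benchmark sketch (in a hypothetical metrics_test.go, assuming both collectors live in the same package) to verify the allocation difference with go test -bench=. -benchmem rather than take it on faith:

package main

import "testing"

func BenchmarkSlowCollector(b *testing.B) {
    smc := &SlowMetricsCollector{}
    b.ReportAllocs()
    for i := 0; i < b.N; i++ {
        smc.AddMetric("http_requests_total", 1)
    }
}

func BenchmarkPooledCollector(b *testing.B) {
    mc := NewMetricsCollector()
    b.ReportAllocs()
    for i := 0; i < b.N; i++ {
        mc.AddMetric("http_requests_total", 1)
    }
}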

Continuous CPU Profiling Integration

Implementation for production monitoring:

// CPU profiling integration
import (
    "fmt"
    "log"
    "os"
    "runtime/pprof"
    "time"
)

func enableContinuousProfiling() {
    // Enable continuous CPU profiling
    if os.Getenv("ENABLE_PROFILING") == "true" {
        go func() {
            for {
                f, err := os.Create(fmt.Sprintf("cpu-profile-%d.prof", time.Now().Unix()))
                if err != nil {
                    log.Printf("Could not create CPU profile: %v", err)
                    time.Sleep(30 * time.Second)
                    continue
                }

                pprof.StartCPUProfile(f)
                time.Sleep(30 * time.Second)
                pprof.StopCPUProfile()
                f.Close()

                // Upload to object storage for analysis
                uploadProfile(f.Name())
            }
        }()
    }
}

// Helper function to upload profiles to cloud storage
func uploadProfile(filename string) {
    // Implementation would upload to S3, GCS, etc.
    log.Printf("Uploading profile %s to cloud storage", filename)

    // Clean up local file after upload
    defer os.Remove(filename)
}

// Memory profiling function
func captureMemoryProfile() {
    f, err := os.Create(fmt.Sprintf("mem-profile-%d.prof", time.Now().Unix()))
    if err != nil {
        log.Printf("Could not create memory profile: %v", err)
        return
    }
    defer f.Close()

    if err := pprof.WriteHeapProfile(f); err != nil {
        log.Printf("Could not write memory profile: %v", err)
        return
    }

    uploadProfile(f.Name())
}
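
As an alternative (or complement) to writing profile files and uploading them, the standard library's net/http/pprof package can expose profiles on demand over HTTP. A sketch, assuming the debug port is kept off the public network:

package main

import (
    "log"
    "net/http"
    _ "net/http/pprof" // registers /debug/pprof/* handlers on http.DefaultServeMux
)

func main() {
    go func() {
        // Profiles can then be pulled ad hoc, e.g.:
        //   go tool pprof http://localhost:6060/debug/pprof/profile?seconds=30
        log.Println(http.ListenAndServe("localhost:6060", nil))
    }()

    // ... rest of the application ...
    select {}
}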

Golden Signals Monitoring

Implementing the four golden signals (latency, traffic, errors, and saturation):

// Golden Signals metrics implementation
type GoldenSignalsCollector struct {
    // Latency
    requestDuration *prometheus.HistogramVec

    // Traffic
    requestsPerSecond *prometheus.CounterVec

    // Errors
    errorRate *prometheus.CounterVec

    // Saturation
    resourceUtilization *prometheus.GaugeVec
}

func NewGoldenSignalsCollector() *GoldenSignalsCollector {
    return &GoldenSignalsCollector{
        requestDuration: prometheus.NewHistogramVec(
            prometheus.HistogramOpts{
                Name:    "http_request_duration_seconds",
                Help:    "Request latency in seconds",
                Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
            },
            []string{"method", "endpoint", "status_class"},
        ),
        requestsPerSecond: prometheus.NewCounterVec(
            prometheus.CounterOpts{
                Name: "http_requests_total",
                Help: "Total HTTP requests",
            },
            []string{"method", "endpoint"},
        ),
        errorRate: prometheus.NewCounterVec(
            prometheus.CounterOpts{
                Name: "http_errors_total",
                Help: "Total HTTP errors",
            },
            []string{"method", "endpoint", "status_code"},
        ),
        resourceUtilization: prometheus.NewGaugeVec(
            prometheus.GaugeOpts{
                Name: "resource_utilization_percent",
                Help: "Resource utilization percentage",
            },
            []string{"resource_type", "instance"},
        ),
    }
}
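
The constructor above only defines the collectors; a hypothetical helper showing how the four signals might be recorded per request (fmt, strconv, and time imports assumed; the status-class bucketing is illustrative):

func (g *GoldenSignalsCollector) ObserveRequest(method, endpoint string, status int, duration time.Duration) {
    statusClass := fmt.Sprintf("%dxx", status/100)

    // Latency
    g.requestDuration.WithLabelValues(method, endpoint, statusClass).Observe(duration.Seconds())

    // Traffic (raw counter; the per-second rate is derived at query time,
    // e.g. rate(http_requests_total[5m]) in PromQL)
    g.requestsPerSecond.WithLabelValues(method, endpoint).Inc()

    // Errors
    if status >= 500 {
        g.errorRate.WithLabelValues(method, endpoint, strconv.Itoa(status)).Inc()
    }
}

// Saturation is typically sampled on a timer rather than per request, e.g.
// g.resourceUtilization.WithLabelValues("cpu", hostname).Set(cpuPercent)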

Part 3: Implementation Examples

Microservices Implementation:

// Metrics Collector Service (Go)
package main

import (
    "github.com/confluentinc/confluent-kafka-go/kafka"
    "github.com/go-redis/redis/v8"
)

type MetricsCollector struct {
    kafka    *kafka.Producer
    redis    *redis.Client
    handlers map[string]MetricHandler
}

func (mc *MetricsCollector) CollectMetric(metric Metric) error {
    // Process metric
    processed := mc.handlers[metric.Type].Process(metric)

    // Publish to message queue for other services
    return mc.kafka.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{
            Topic:     &metric.Type,
            Partition: kafka.PartitionAny,
        },
        Value: processed.ToJSON(),
    }, nil)
}

# Data Processor Service (Python)
import asyncio
import json
from kafka import KafkaConsumer
from typing import Dict, Any

class DataProcessor:
    def __init__(self):
        self.consumer = KafkaConsumer(
            'metrics-topic',
            bootstrap_servers=['kafka:9092'],
            group_id='data-processors',
            value_deserializer=lambda x: json.loads(x.decode('utf-8'))
        )

    async def process_metrics(self):
        for message in self.consumer:
            metric = Metric.from_dict(message.value)

            # Process and aggregate
            aggregated = await self.aggregate_metric(metric)

            # Store in time-series database
            await self.store_metric(aggregated)

            # Trigger alerts if needed
            await self.check_alerts(aggregated)

    async def aggregate_metric(self, metric: Dict[str, Any]) -> Dict[str, Any]:
        # Implementation for metric aggregation
        return {
            'name': metric['name'],
            'value': metric['value'],
            'timestamp': metric['timestamp'],
            'aggregation_window': '5m',
            'tags': metric.get('tags', {})
        }

    async def store_metric(self, metric: Dict[str, Any]):
        # Store in InfluxDB or similar time-series database
        pass

    async def check_alerts(self, metric: Dict[str, Any]):
        # Check alert thresholds and notify if needed
        pass

Monolithic Implementation:

# Modular Monolith (Python)
from dashboard.modules import metrics, processing, api, notifications
from typing import Dict, Any

class AnalyticsDashboard:
    def __init__(self):
        self.metrics = metrics.MetricsModule()
        self.processor = processing.ProcessingModule()
        self.api = api.APIModule()
        self.notifications = notifications.NotificationModule()

    def handle_metric(self, raw_metric: Dict[str, Any]) -> Dict[str, Any]:
        # All in same process - no network calls
        metric = self.metrics.parse(raw_metric)
        processed = self.processor.aggregate(metric)

        # Check for alerts
        if self.processor.check_thresholds(processed):
            self.notifications.send_alert(processed)

        return processed

    def get_dashboard_data(self, time_range: str) -> Dict[str, Any]:
        # Single process - can share data structures efficiently
        raw_data = self.metrics.get_data(time_range)
        processed_data = self.processor.prepare_dashboard_data(raw_data)

        return {
            'metrics': processed_data,
            'alerts': self.notifications.get_active_alerts(),
            'system_health': self.api.get_health_status()
        }

# Single deployment with clear module boundaries
# Can be extracted to separate services later

When to Choose Each Approach:

Factor           | Monolith                  | Microservices
Team Size        | < 10 developers           | > 10 developers
Complexity       | Simple-medium             | Complex domain
Scale            | < 1M requests/day         | > 10M requests/day
Technology       | Single stack preferred    | Multiple technologies needed
Deployment       | Weekly/monthly            | Multiple times per day
Data Consistency | Strong consistency needed | Eventual consistency OK

For Analytics Dashboard Specifically:

  • Early Stage: Start with modular monolith
  • Growth Stage: Extract high-scale components (metrics collector) first
  • Mature Stage: Full microservices with proper DevOps practices

Database Sharding Implementation

Library Analogy:

📚 Traditional Database (Single Library):
All books in one building - gets crowded, hard to find books,
long queues at checkout

📚📚📚 Sharded Database (Multiple Library Branches):
Books distributed across locations:
- Fiction Library: Books A-H
- Science Library: Books I-P
- History Library: Books Q-Z

Each library (shard) operates independently but is part of the same system.

Sharding Strategy for Analytics Dashboard:

# Database sharding implementation
import hashlib
import datetime
from typing import Dict, List, Optional
import concurrent.futures

class DatabaseSharding:
    def __init__(self):
        self.shards = {
            'shard_americas': {
                'host': 'db-americas.example.com',
                'regions': ['us', 'ca', 'mx', 'br'],
                'connection': self.create_connection('db-americas')
            },
            'shard_europe': {
                'host': 'db-europe.example.com',
                'regions': ['uk', 'de', 'fr', 'es'],
                'connection': self.create_connection('db-europe')
            },
            'shard_asia': {
                'host': 'db-asia.example.com',
                'regions': ['jp', 'sg', 'au', 'in'],
                'connection': self.create_connection('db-asia')
            }
        }

        # Time-based sharding for metrics
        self.time_shards = {
            'metrics_current': 'Last 7 days - hot data',
            'metrics_recent': 'Last 30 days - warm data',
            'metrics_archive': 'Older than 30 days - cold data'
        }

    def get_user_shard(self, user_id: str) -> str:
        """Determine shard based on user ID hash"""
        hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
        shard_index = hash_value % len(self.shards)
        return list(self.shards.keys())[shard_index]

    def get_region_shard(self, region: str) -> str:
        """Determine shard based on geographical region"""
        for shard_name, shard_info in self.shards.items():
            if region.lower() in shard_info['regions']:
                return shard_name
        return 'shard_americas'  # Default fallback

    def get_time_shard(self, timestamp: datetime.datetime) -> str:
        """Determine shard based on data age"""
        now = datetime.datetime.now()
        age = now - timestamp

        if age.days <= 7:
            return 'metrics_current'
        elif age.days <= 30:
            return 'metrics_recent'
        else:
            return 'metrics_archive'

    def route_query(self, query_type: str, **kwargs):
        """Route queries to appropriate shard"""

        if query_type == 'user_orders':
            shard = self.get_user_shard(kwargs['user_id'])
            return self.execute_query(shard, query_type, **kwargs)

        elif query_type == 'regional_metrics':
            shard = self.get_region_shard(kwargs['region'])
            return self.execute_query(shard, query_type, **kwargs)

        elif query_type == 'historical_data':
            # Query multiple time shards and aggregate
            return self.query_time_shards(kwargs['start_date'], kwargs['end_date'])

        elif query_type == 'cross_shard_analytics':
            # Fan-out query to all shards
            return self.fan_out_query(query_type, **kwargs)

    def query_time_shards(self, start_date, end_date):
        """Query across time-based shards"""
        results = []

        for shard_name in self.time_shards.keys():
            try:
                shard_result = self.execute_query(shard_name, 'time_range_query',
                                                  start_date=start_date, end_date=end_date)
                results.extend(shard_result)
            except ShardUnavailableError:
                # Handle shard failures gracefully
                self.log_shard_failure(shard_name)
                continue

        return self.aggregate_results(results)

    def fan_out_query(self, query_type: str, **kwargs):
        """Execute query across all shards and aggregate results"""
        results = {}
        with concurrent.futures.ThreadPoolExecutor(max_workers=len(self.shards)) as executor:
            # Submit queries to all shards concurrently
            future_to_shard = {
                executor.submit(self.execute_query, shard_name, query_type, **kwargs): shard_name
                for shard_name in self.shards.keys()
            }

            for future in concurrent.futures.as_completed(future_to_shard):
                shard_name = future_to_shard[future]
                try:
                    result = future.result(timeout=30)  # 30 second timeout
                    results[shard_name] = result
                except Exception as e:
                    self.log_query_failure(shard_name, e)
                    results[shard_name] = None

        return self.aggregate_cross_shard_results(results)

    def create_connection(self, shard_name: str):
        """Create database connection for shard"""
        # Implementation would create actual DB connection
        pass

    def execute_query(self, shard_name: str, query_type: str, **kwargs):
        """Execute query on specific shard"""
        # Implementation would execute actual query
        pass

    def log_shard_failure(self, shard_name: str):
        """Log shard failure for monitoring"""
        pass

    def log_query_failure(self, shard_name: str, error: Exception):
        """Log query failure for monitoring"""
        pass

    def aggregate_results(self, results: List):
        """Aggregate results from multiple shards"""
        pass

    def aggregate_cross_shard_results(self, results: Dict):
        """Aggregate results from cross-shard queries"""
        pass

# Shard-aware query examples
class ShardedAnalyticsQueries:
    def __init__(self, sharding: DatabaseSharding):
        self.sharding = sharding

    def get_user_orders(self, user_id: str):
        """Get orders for specific user"""
        return self.sharding.route_query('user_orders', user_id=user_id)

    def get_regional_sales(self, region: str, date_range: tuple):
        """Get sales data for specific region"""
        return self.sharding.route_query('regional_metrics',
                                         region=region,
                                         start_date=date_range[0],
                                         end_date=date_range[1])

    def get_global_metrics(self, metric_type: str):
        """Get global metrics across all shards"""
        return self.sharding.route_query('cross_shard_analytics',
                                         metric_type=metric_type)

    def get_historical_trends(self, days_back: int):
        """Get historical data across time shards"""
        end_date = datetime.datetime.now()
        start_date = end_date - datetime.timedelta(days=days_back)

        return self.sharding.query_time_shards(start_date, end_date)

Sharding Implementation with PostgreSQL:

-- Create shard-specific tables
-- Shard 1: Americas
CREATE TABLE orders_americas (
id UUID PRIMARY KEY,
user_id UUID NOT NULL,
region VARCHAR(2) CHECK (region IN ('US', 'CA', 'MX', 'BR')),
order_total DECIMAL(10,2),
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Shard 2: Europe
CREATE TABLE orders_europe (
id UUID PRIMARY KEY,
user_id UUID NOT NULL,
region VARCHAR(2) CHECK (region IN ('UK', 'DE', 'FR', 'ES')),
order_total DECIMAL(10,2),
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Shard 3: Asia
CREATE TABLE orders_asia (
id UUID PRIMARY KEY,
user_id UUID NOT NULL,
region VARCHAR(2) CHECK (region IN ('JP', 'SG', 'AU', 'IN')),
order_total DECIMAL(10,2),
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Create foreign data wrapper for cross-shard queries
CREATE EXTENSION postgres_fdw;

CREATE SERVER shard_europe
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'db-europe.example.com', port '5432', dbname 'analytics');

CREATE USER MAPPING FOR postgres
SERVER shard_europe
OPTIONS (user 'analytics_user', password 'password');

-- Create foreign tables
CREATE FOREIGN TABLE orders_europe_remote (
id UUID,
user_id UUID,
region VARCHAR(2),
order_total DECIMAL(10,2),
created_at TIMESTAMP WITH TIME ZONE
)
SERVER shard_europe
OPTIONS (schema_name 'public', table_name 'orders_europe');

-- View for cross-shard queries
-- (orders_asia_remote is a foreign table defined the same way as orders_europe_remote,
-- against the Asia shard's server)
CREATE VIEW orders_global AS
SELECT 'americas' as shard, * FROM orders_americas
UNION ALL
SELECT 'europe' as shard, * FROM orders_europe_remote
UNION ALL
SELECT 'asia' as shard, * FROM orders_asia_remote;