SRE Interview Questions - Comprehensive Answer Guide
Part 1: SRE Fundamentals & Practices
1. What is the difference between SRE and traditional operations, and how do you balance reliability with feature velocity?
Strong Answer: SRE differs from traditional ops in several key ways:
- Proactive vs Reactive: SRE focuses on preventing issues through engineering rather than just responding to them
 - Error Budgets: We quantify acceptable unreliability, allowing teams to move fast while maintaining reliability targets
 - Automation: SRE emphasizes eliminating toil through automation and self-healing systems
 - Shared Ownership: Development and operations work together using the same tools and metrics
 
Balancing reliability with velocity:
- Set clear SLIs/SLOs with stakeholders (e.g., 99.9% uptime = 43 minutes downtime/month)
 - Use error budgets as a shared currency - if we're within budget, dev teams can deploy faster
 - When error budget is exhausted, focus shifts to reliability work
 - Implement gradual rollouts and feature flags to reduce blast radius
 
Follow-up - Implementing error budgets with resistant teams:
- Start with education - show how error budgets enable faster delivery
 - Use concrete examples of downtime costs vs delayed features
 - Begin with lenient budgets and tighten over time
 - Make error budget status visible in dashboards and planning meetings
 
2. Explain the four golden signals of monitoring. How would you implement alerting around these for a Python microservice?
Strong Answer: The four golden signals are:
- Latency: Time to process requests
 - Traffic: Demand on your system (requests/second)
 - Errors: Rate of failed requests
 - Saturation: How "full" your service is (CPU, memory, I/O)
 
Implementation for Python microservice:
# Using Prometheus with Flask
from flask import Flask, request
from prometheus_client import Counter, Histogram, Gauge, generate_latest
import time
import psutil
app = Flask(__name__)
# Metrics
REQUEST_COUNT = Counter('http_requests_total', 'Total HTTP requests', ['method', 'endpoint', 'status'])
REQUEST_LATENCY = Histogram('http_request_duration_seconds', 'HTTP request latency')
ERROR_RATE = Counter('http_errors_total', 'Total HTTP errors', ['status'])
CPU_USAGE = Gauge('cpu_usage_percent', 'CPU usage percentage')
@app.before_request
def before_request():
    request.start_time = time.time()
@app.after_request
def after_request(response):
    latency = time.time() - request.start_time
    REQUEST_LATENCY.observe(latency)
    REQUEST_COUNT.labels(request.method, request.endpoint, response.status_code).inc()
    if response.status_code >= 400:
        ERROR_RATE.labels(response.status_code).inc()
    CPU_USAGE.set(psutil.cpu_percent())
    return response
Alerting Rules:
- Latency: Alert if p99 > 500ms for 5 minutes
 - Traffic: Alert on 50% increase/decrease from baseline
 - Errors: Alert if error rate > 1% for 2 minutes
 - Saturation: Alert if CPU > 80% or Memory > 85% for 10 minutes
 
3. Walk me through how you would conduct a post-mortem for a production incident.
Strong Answer: Timeline:
- Immediate: Focus on resolution, collect logs/metrics during incident
 - Within 24-48 hours: Conduct post-mortem meeting
 - Within 1 week: Publish written post-mortem and track action items
 
Post-mortem Process:
- Timeline Construction: Build detailed timeline with all events, decisions, and communications
 - Root Cause Analysis: Use techniques like "5 Whys" or Fishbone diagrams
 - Impact Assessment: Quantify user impact, revenue loss, SLO burn
 - Action Items: Focus on systemic fixes, not individual blame
 - Follow-up: Track action items to completion
 
Good Post-mortem Characteristics:
- Blameless culture - focus on systems, not individuals
 - Detailed timeline with timestamps
 - Clear root cause analysis
 - Actionable remediation items with owners and deadlines
 - Written in accessible language for all stakeholders
 - Includes what went well (not just failures)
 
Psychological Safety:
- Use "the system allowed..." instead of "person X did..."
 - Ask "how can we make this impossible to happen again?"
 - Celebrate people who surface problems early
 - Make post-mortems learning opportunities, not punishment
 
4. You notice your application's 99th percentile latency has increased by 50ms over the past week, but the average latency remains the same. How would you investigate this?
Strong Answer: This suggests a long tail problem - most requests are fine, but some are much slower.
Investigation Steps:
- Check Request Distribution: Look at latency histograms - are we seeing bimodal distribution?
 - Analyze Traffic Patterns: Has the mix of request types changed? Are we getting more complex queries?
 - Database Performance: Check for slow queries, table locks, or index problems
 - Resource Saturation: Look for memory pressure, GC pauses, or I/O bottlenecks during peak times
 - Dependency Analysis: Check latency of downstream services - could be cascading slow responses
 - Code Changes: Review recent deployments for inefficient algorithms or new features
 
Specific Checks:
- Database slow query logs
 - Application profiling data
 - Memory usage patterns and GC metrics
 - Thread pool utilization
 - External API response times
 - Distributed tracing for slow requests
 
Tools: Use APM tools like New Relic, DataDog, or distributed tracing with Jaeger/Zipkin to identify bottlenecks.
5. Design a monitoring strategy for a Go-based API that processes financial transactions.
Strong Answer: Business Metrics:
- Transaction volume and value per minute
 - Success rate by transaction type
 - Time to settlement
 - Regulatory compliance metrics (PCI DSS)
 
Technical Metrics:
// Key metrics to track
var (
    transactionCounter = prometheus.NewCounterVec(
        prometheus.CounterOpts{Name: "transactions_total"},
        []string{"type", "status", "payment_method"})
    transactionLatency = prometheus.NewHistogramVec(
        prometheus.HistogramOpts{Name: "transaction_duration_seconds"},
        []string{"type"})
    queueDepth = prometheus.NewGauge(
        prometheus.GaugeOpts{Name: "transaction_queue_depth"})
    dbConnectionPool = prometheus.NewGauge(
        prometheus.GaugeOpts{Name: "db_connections_active"})
)
Logging Strategy:
- Structured logging with correlation IDs
 - Log all transaction state changes
 - Security events (failed auth, suspicious patterns)
 - Audit trail for compliance
 
Alerting:
- Transaction failure rate > 0.1%
 - Processing latency > 2 seconds
 - Queue depth > 1000 items
 - Database connection pool > 80% utilization
 - Any security-related events
 
Compliance Considerations:
- PII data must be masked in logs
 - Audit logs with tamper-proof storage
 - Real-time fraud detection alerts
 
6. How would you implement distributed tracing across a system with Python backend services and a React frontend?
Strong Answer: Architecture:
- Use OpenTelemetry standard for vendor-neutral tracing
 - Jaeger or Zipkin as tracing backend
 - Trace context propagation across service boundaries
 
Backend Implementation (Python):
from opentelemetry import trace
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
# Auto-instrument Flask and requests
FlaskInstrumentor().instrument_app(app)
RequestsInstrumentor().instrument()
tracer = trace.get_tracer(__name__)
@app.route('/api/user/<user_id>')
def get_user(user_id):
    with tracer.start_as_current_span("get_user") as span:
        span.set_attribute("user.id", user_id)
        # Service logic here
        return response
Frontend Implementation (React):
import { WebTracerProvider } from "@opentelemetry/web"
import { getWebAutoInstrumentations } from "@opentelemetry/auto-instrumentations-web"
const provider = new WebTracerProvider()
provider.addSpanProcessor(new BatchSpanProcessor(new JaegerExporter()))
// Auto-instrument fetch, XMLHttpRequest
registerInstrumentations({
  instrumentations: [getWebAutoInstrumentations()],
})
Key Implementation Points:
- Correlation IDs: Pass trace context in HTTP headers
 - Sampling: Use probabilistic sampling (1-10%) to reduce overhead
 - Service Map: Visualize service dependencies
 - Performance Analysis: Identify bottlenecks across service boundaries
 
Part 2: Software Engineering & Development
7. Code Review Scenario: Memory leak optimization
Strong Answer: Problems with the original code:
- Loads entire dataset into memory before processing
 - No streaming or chunked processing
 - Memory usage grows linearly with file size
 
Optimized version:
def process_large_dataset(file_path, chunk_size=1000):
    """Process large dataset in chunks to manage memory usage."""
    results = []
    with open(file_path, 'r') as f:
        chunk = []
        for line in f:
            chunk.append(line.strip())
            if len(chunk) >= chunk_size:
                # Process chunk and yield results
                processed_chunk = [expensive_processing(item) for item in chunk]
                partial_result = analyze_data(processed_chunk)
                results.append(partial_result)
                # Clear chunk to free memory
                chunk.clear()
        # Process remaining items
        if chunk:
            processed_chunk = [expensive_processing(item) for item in chunk]
            partial_result = analyze_data(processed_chunk)
            results.append(partial_result)
    return combine_results(results)
# Even better - use generator for streaming
def process_large_dataset_streaming(file_path):
    """Stream processing for minimal memory footprint."""
    with open(file_path, 'r') as f:
        for line in f:
            yield expensive_processing(line.strip())
# Usage
def analyze_streaming_data(file_path):
    processed_items = process_large_dataset_streaming(file_path)
    return analyze_data_streaming(processed_items)
Additional Optimizations:
- Use 
mmapfor very large files - Implement backpressure if processing can't keep up
 - Add memory monitoring and circuit breakers
 - Consider using 
asynciofor I/O-bound operations 
8. In Go, explain the difference between buffered and unbuffered channels.
Strong Answer: Unbuffered Channels:
- Synchronous communication - sender blocks until receiver reads
 - Zero capacity - no internal storage
 - Guarantees handoff between goroutines
 
ch := make(chan int) // unbuffered
go func() {
    ch <- 42 // blocks until someone reads
}()
value := <-ch // blocks until someone sends
Buffered Channels:
- Asynchronous communication up to buffer size
 - Sender only blocks when buffer is full
 - Receiver only blocks when buffer is empty
 
ch := make(chan int, 3) // buffered with capacity 3
ch <- 1 // doesn't block
ch <- 2 // doesn't block
ch <- 3 // doesn't block
ch <- 4 // blocks - buffer full
When to use in high-throughput systems:
Unbuffered for:
- Strict synchronization requirements
 - Request-response patterns
 - When you need guaranteed delivery confirmation
 - Worker pools where you want backpressure
 
Buffered for:
- Producer-consumer with different rates
 - Batching operations
 - Reducing contention in high-throughput scenarios
 - Event streaming where some loss is acceptable
 
Example - High-throughput log processor:
// Buffered for log ingestion
logChan := make(chan LogEntry, 10000)
// Unbuffered for critical operations
errorChan := make(chan error)
func logProcessor() {
    for {
        select {
        case log := <-logChan:
            processLog(log)
        case err := <-errorChan:
            handleCriticalError(err) // immediate attention
        }
    }
}
9. React Performance: Optimize dashboard with real-time metrics
Strong Answer: Problems with frequent re-renders:
- All components re-render when any metric updates
 - Expensive calculations on every render
 - DOM thrashing from rapid updates
 
Optimization Strategy:
import React, { memo, useMemo, useCallback, useRef } from "react"
import { useVirtualizer } from "@tanstack/react-virtual"
// 1. Memoize metric components
const MetricCard = memo(({ metric, value, threshold }) => {
  // Only re-render when props actually change
  const status = useMemo(
    () => (value > threshold ? "critical" : "normal"),
    [value, threshold]
  )
  return (
    <div className={`metric-card ${status}`}>
      <h3>{metric}</h3>
      <span>{value}</span>
    </div>
  )
})
// 2. Virtualize large lists
const MetricsList = ({ metrics }) => {
  const parentRef = useRef()
  const virtualizer = useVirtualizer({
    count: metrics.length,
    getScrollElement: () => parentRef.current,
    estimateSize: () => 100,
  })
  return (
    <div ref={parentRef} style={{ height: "400px", overflow: "auto" }}>
      {virtualizer.getVirtualItems().map((virtualRow) => (
        <MetricCard key={virtualRow.key} {...metrics[virtualRow.index]} />
      ))}
    </div>
  )
}
// 3. Debounce updates and batch state changes
const Dashboard = () => {
  const [metrics, setMetrics] = useState({})
  const updateQueue = useRef(new Map())
  const flushTimeout = useRef()
  const queueUpdate = useCallback((serviceName, newMetrics) => {
    updateQueue.current.set(serviceName, newMetrics)
    // Debounce updates - batch multiple rapid changes
    clearTimeout(flushTimeout.current)
    flushTimeout.current = setTimeout(() => {
      setMetrics((prev) => {
        const updates = Object.fromEntries(updateQueue.current)
        updateQueue.current.clear()
        return { ...prev, ...updates }
      })
    }, 100) // 100ms debounce
  }, [])
  // 4. Use React.startTransition for non-critical updates
  const handleMetricUpdate = useCallback(
    (data) => {
      if (data.priority === "high") {
        queueUpdate(data.service, data.metrics)
      } else {
        startTransition(() => {
          queueUpdate(data.service, data.metrics)
        })
      }
    },
    [queueUpdate]
  )
  return <MetricsList metrics={Object.values(metrics)} />
}
Additional Optimizations:
- WebSocket connection pooling - single connection for all metrics
 - Data normalization - structure data to minimize re-renders
 - Service Worker - offload heavy calculations
 - Canvas/WebGL - for complex visualizations instead of DOM updates
 
10. Design a CI/CD pipeline for a multi-service application
Strong Answer: Pipeline Architecture:
# .github/workflows/main.yml
name: Multi-Service CI/CD
on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]
jobs:
  detect-changes:
    runs-on: ubuntu-latest
    outputs:
      python-api: ${{ steps.changes.outputs.python-api }}
      go-workers: ${{ steps.changes.outputs.go-workers }}
      react-frontend: ${{ steps.changes.outputs.react-frontend }}
    steps:
      - uses: actions/checkout@v3
      - uses: dorny/paths-filter@v2
        id: changes
        with:
          filters: |
            python-api:
              - 'services/api/**'
            go-workers:
              - 'services/workers/**'
            react-frontend:
              - 'frontend/**'
  test-python:
    needs: detect-changes
    if: needs.detect-changes.outputs.python-api == 'true'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Run Python tests
        run: |
          cd services/api
          pip install -r requirements.txt
          pytest --cov=. --cov-report=xml
          flake8 .
          mypy .
  test-go:
    needs: detect-changes
    if: needs.detect-changes.outputs.go-workers == 'true'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-go@v3
        with:
          go-version: "1.21"
      - name: Run Go tests
        run: |
          cd services/workers
          go test -race -coverprofile=coverage.out ./...
          go vet ./...
          staticcheck ./...
  deploy:
    needs: [test-python, test-go, test-react]
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - name: Deploy with blue-green
        run: |
          # Database migration strategy
          kubectl apply -f k8s/migration-job.yaml
          kubectl wait --for=condition=complete job/db-migration
          # Deploy new version to green environment
          helm upgrade app-green ./helm-chart \
            --set image.tag=${{ github.sha }} \
            --set environment=green
          # Health check green environment
          ./scripts/health-check.sh green
          # Switch traffic to green
          kubectl patch service app-service -p \
            '{"spec":{"selector":{"version":"green"}}}'
          # Verify traffic switch
          ./scripts/verify-deployment.sh
          # Clean up blue environment
          helm uninstall app-blue
Database Migration Strategy:
#!/bin/bash
# scripts/db-migration.sh
# Create backup
pg_dump $DATABASE_URL > backup-$(date +%Y%m%d-%H%M%S).sql
# Run migrations in transaction
psql $DATABASE_URL << EOF
BEGIN;
-- Run all pending migrations
\i migrations/001_add_user_table.sql
\i migrations/002_add_index.sql
-- Verify migration success
SELECT count(*) FROM schema_migrations;
COMMIT;
EOF
# Test rollback capability
if [ "$1" == "test-rollback" ]; then
    psql $DATABASE_URL < backup-latest.sql
fi
Rollback Strategy:
- Keep previous 3 versions in registry
 - Database rollback scripts for each migration
 - Feature flags to disable new features quickly
 - Automated rollback triggers on error rate increase
 
11. How would you implement blue-green deployments for a stateful service with a database?
Strong Answer: Challenges with Stateful Services:
- Database schema changes
 - Data consistency during switch
 - Connection management
 - State synchronization
 
Implementation Strategy:
# Blue-Green with Database
apiVersion: v1
kind: Service
metadata:
  name: app-active
spec:
  selector:
    app: myapp
    version: blue # Will switch to green
  ports:
    - port: 80
---
# Database proxy for connection management
apiVersion: apps/v1
kind: Deployment
metadata:
  name: db-proxy
spec:
  template:
    spec:
      containers:
        - name: pgbouncer
          image: pgbouncer/pgbouncer
          env:
            - name: DATABASES_HOST
              value: "postgres-primary"
            - name: POOL_MODE
              value: "transaction" # Allow connection switching
Deployment Process:
- 
Pre-deployment:
- Run backward-compatible schema migrations
 - Ensure both versions can operate with new schema
 
 - 
Green Deployment:
- Deploy green version with same database
 - Warm up green instances (cache, connections)
 - Run health checks
 
 - 
Traffic Switch:
- Update service selector to point to green
 - Monitor metrics for 10-15 minutes
 - Keep blue running for quick rollback
 
 - 
Post-deployment:
- Run cleanup migrations (remove old columns)
 - Terminate blue environment
 
 
Database Migration Strategy:
-- Phase 1: Additive changes (safe for both versions)
ALTER TABLE users ADD COLUMN email_verified BOOLEAN DEFAULT FALSE;
CREATE INDEX CONCURRENTLY idx_users_email ON users(email);
-- Phase 2: After green is stable, remove old columns
-- ALTER TABLE users DROP COLUMN old_email_field;
Rollback Plan:
- Revert service selector to blue
 - Emergency database rollback scripts
 - Circuit breaker to stop problematic requests
 
Part 3: System Design Deep Dive
12. Requirements Gathering Questions
Strong Answer: Functional Requirements:
- What specific metrics need to be displayed? (orders/minute, revenue, concurrent users)
 - How real-time? (sub-second, few seconds, minute-level updates)
 - What user roles need access? (executives, ops teams, developers)
 - What actions can users take? (view-only, alerts, drill-down)
 - Geographic distribution of users?
 
Non-Functional Requirements:
- Scale: How many concurrent dashboard users? (100s, 1000s)
 - Data volume: Orders per day? Peak traffic? Data retention period?
 - Availability: 99.9% or higher? Maintenance windows?
 - Latency: How fast should dashboard updates be?
 - Consistency: Can we show slightly stale data? (eventual consistency)
 - Security: Authentication, authorization, audit logging?
 
Technical Constraints:
- Existing infrastructure? (AWS, on-prem, hybrid)
 - Integration requirements? (existing systems, APIs)
 - Compliance requirements? (SOX, PCI DSS)
 - Budget constraints?
 
13. API Design and Framework Selection
Strong Answer: API Endpoints:
// REST API Design
GET /api/v1/metrics/realtime
{
  "active_users": 1250,
  "orders_per_minute": 45,
  "revenue_per_minute": 12500,
  "inventory_alerts": 3,
  "system_health": "healthy",
  "timestamp": "2025-06-30T10:30:00Z"
}
GET /api/v1/metrics/historical?metric=revenue&period=24h&granularity=1h
{
  "metric": "revenue",
  "data": [
    {"timestamp": "2025-06-30T09:00:00Z", "value": 75000},
    {"timestamp": "2025-06-30T10:00:00Z", "value": 82000}
  ]
}
POST /api/v1/alerts/subscribe
{
  "metric": "inventory_level",
  "threshold": 100,
  "product_id": "12345",
  "notification_method": "webhook"
}
Framework Comparison:
REST ✅ Recommended
- Simple, cacheable
 - Wide tooling support
 - Good for CRUD operations
 - HTTP status codes for errors
 
WebSockets ✅ For Real-time Updates
// WebSocket for live updates
const ws = new WebSocket("wss://api.example.com/metrics/stream")
ws.onmessage = (event) => {
  const metrics = JSON.parse(event.data)
  updateDashboard(metrics)
}
GraphQL ❌ Not Recommended
- Adds complexity for simple metrics
 - Caching more difficult
 - Overkill for this use case
 
Final Architecture:
- REST API for historical data and configuration
 - WebSocket for real-time metric streaming
 - Server-Sent Events as WebSocket fallback
 
14. High-Level Architecture Diagram
Strong Answer:
┌──────────────── ─┐    ┌──────────────────┐    ┌─────────────────┐
│   React SPA     │    │   Load Balancer  │    │   API Gateway   │
│                 │◄──►│   (ALB/NGINX)    │◄──►│   (Kong/Envoy)  │
│ - Dashboard     │    │                  │    │ - Auth          │
│ - WebSocket     │    │                  │    │ - Rate Limiting │
└─────────────────┘    └──────────────────┘    └─────────────────┘
                                                         │
                              ┌──────────────────────────┼─────────────────┐
                              │                          │                 │
                    ┌─────────▼────────┐    ┌───────────▼────────┐ ┌──────▼──────┐
                    │   Metrics API    │    │   WebSocket API    │ │ Config API  │
                    │   (Python/Flask) │    │   (Go/Gorilla)     │ │(Python/Fast)│
                    └─────────┬────────┘    └───────────┬────────┘ └──────┬──────┘
                              │                         │                 │
                    ┌─────────▼────────┐    ┌───────────▼────────┐ ┌──────▼──────┐
                    │   Redis Cache    │    │   Message Queue    │ │ PostgreSQL  │
                    │   (Metrics)      │    │   (Kafka/Redis)    │ │ (Config)    │
                    └─────────┬────────┘    └───────────┬────────┘ └─────────────┘
                              │                         │
                    ┌─────────▼─────────────────────────▼────────┐
                    │           Time Series Database             │
                    │           (InfluxDB/TimescaleDB)           │
                    └─────────┬──────────────────────────────────┘
                              │
                    ┌─────────▼────────┐
                    │   ETL Pipeline   │
                    │   (Apache Beam)  │
                    └─────────┬────────┘
                              │
        ┌─────────────────────┼─────────────────────┐
        │                     │                     │
┌───────▼──────┐    ┌─────────▼─────────┐    ┌─────▼─────┐
│ E-commerce   │    │   Inventory       │    │  System   │
│ Database     │    │   Management      │    │  Metrics  │
│ (Orders)     │    │   (Stock Levels)  │    │ (Health)  │
└──────────────┘    └───────────────────┘    └───────────┘
Data Flow:
- Source Systems → ETL Pipeline (real-time streaming)
 - ETL Pipeline → Time Series DB (processed metrics)
 - Time Series DB → Redis Cache (frequently accessed data)
 - API Services → Frontend (REST + WebSocket)
 - Message Queue → WebSocket API (real-time updates)
 
15. Real-time Implementation Approaches
Strong Answer: Comparison of Real-time Approaches:
| Approach | Pros | Cons | Use Case | 
|---|---|---|---|
| Polling | Simple, stateless | High latency, wasteful | Non-critical updates | 
| WebSockets | True real-time, bidirectional | Complex, stateful | Live dashboards | 
| Server-Sent Events | Simpler than WebSocket, auto-reconnect | One-way only | Event streams | 
| Message Queues | Reliable, scalable | Added complexity | High-volume events | 
Recommended Architecture:
// WebSocket implementation in Go
type Hub struct {
    clients    map[*Client]bool
    broadcast  chan []byte
    register   chan *Client
    unregister chan *Client
}
type Client struct {
    hub  *Hub
    conn *websocket.Conn
    send chan []byte
    subscriptions map[string]bool
}
func (h *Hub) run() {
    for {
        select {
        case client := <-h.register:
            h.clients[client] = true
        case client := <-h.unregister:
            delete(h.clients, client)
            close(client.send)
        case message := <-h.broadcast:
            for client := range h.clients {
                select {
                case client.send <- message:
                default:
                    close(client.send)
                    delete(h.clients, client)
                }
            }
        }
    }
}
// Kafka consumer for real-time metrics
func consumeMetrics() {
    consumer, _ := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092",
        "group.id":          "dashboard-consumers",
    })
    consumer.Subscribe("metrics-topic", nil)
    for {
        msg, _ := consumer.ReadMessage(-1)
        // Process metric and broadcast to WebSocket clients
        metric := parseMetric(msg.Value)
        hub.broadcast <- metric
        // Also update Redis cache
        updateCache(metric)
    }
}
Client-side Connection Management:
class MetricsWebSocket {
  constructor(url) {
    this.url = url
    this.reconnectInterval = 1000
    this.maxReconnectInterval = 30000
    this.reconnectDecay = 1.5
    this.connect()
  }
  connect() {
    this.ws = new WebSocket(this.url)
    this.ws.onopen = () => {
      console.log("Connected to metrics stream")
      this.reconnectInterval = 1000 // Reset backoff
    }
    this.ws.onmessage = (event) => {
      const metrics = JSON.parse(event.data)
      this.updateDashboard(metrics)
    }
    this.ws.onclose = () => {
      console.log("Connection lost, reconnecting...")
      setTimeout(() => {
        this.reconnectInterval = Math.min(
          this.reconnectInterval * this.reconnectDecay,
          this.maxReconnectInterval
        )
        this.connect()
      }, this.reconnectInterval)
    }
  }
  subscribe(metricType) {
    this.ws.send(
      JSON.stringify({
        action: "subscribe",
        metric: metricType,
      })
    )
  }
}
16. Single Points of Failure Analysis
Strong Answer: Identified SPOFs and Solutions:
- 
Load Balancer SPOF
- Problem: Single ALB failure takes down entire system
 - Solution: Multi-AZ deployment with Route 53 health checks
 
# Terraform for multi-region setup
resource "aws_lb" "primary" {
name = "app-lb-primary"
availability_zones = ["us-east-1a", "us-east-1b"]
}
resource "aws_route53_record" "failover_primary" {
zone_id = aws_route53_zone.main.zone_id
name = "api.example.com"
type = "A"
failover_routing_policy {
type = "PRIMARY"
}
health_check_id = aws_route53_health_check.primary.id
} - 
Database SPOF
- Problem: Single database failure
 - Solution: Primary-replica setup with automatic failover
 
# PostgreSQL HA setup
apiVersion: postgresql.cnpg.io/v1
kind: Cluster
metadata:
name: postgres-cluster
spec:
instances: 3
primaryUpdateStrategy: unsupervised
postgresql:
parameters:
max_connections: "200"
shared_buffers: "256MB"
backup:
retentionPolicy: "30d"
barmanObjectStore:
destinationPath: "s3://backups/postgres" - 
Redis Cache SPOF
- Problem: Cache failure impacts performance
 - Solution: Redis Sentinel for HA + graceful degradation
 
import redis.sentinel
sentinel = redis.sentinel.Sentinel([
('sentinel1', 26379),
('sentinel2', 26379),
('sentinel3', 26379)
])
def get_metric(key):
try:
master = sentinel.master_for('mymaster', socket_timeout=0.1)
return master.get(key)
except redis.RedisError:
# Graceful degradation - fetch from database
return get_metric_from_db(key) - 
Message Queue SPOF
- Problem: Kafka broker failure stops real-time updates
 - Solution: Multi-broker Kafka cluster with replication
 
# Kafka with replication
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
spec:
kafka:
replicas: 3
config:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
default.replication.factor: 3
min.insync.replicas: 2 
17. Caching Implementation Strategy
Strong Answer: Multi-Level Caching Strategy:
# 1. Application-level caching
from functools import lru_cache
import redis
import json
class CacheService:
    def __init__(self):
        self.redis_client = redis.Redis(host='redis-cluster')
        self.local_cache = {}
    @lru_cache(maxsize=1000)
    def get_metric_definition(self, metric_name):
        """Cache metric metadata (rarely changes)"""
        return self.fetch_metric_definition(metric_name)
    def get_real_time_metric(self, metric_key):
        """Multi-level cache for real-time data"""
        # L1: Memory cache (100ms TTL)
        if metric_key in self.local_cache:
            data, timestamp = self.local_cache[metric_key]
            if time.time() - timestamp < 0.1:  # 100ms
                return data
        # L2: Redis cache (5s TTL)
        cached = self.redis_client.get(f"metric:{metric_key}")
        if cached:
            data = json.loads(cached)
            self.local_cache[metric_key] = (data, time.time())
            return data
        # L3: Database fallback
        data = self.fetch_from_database(metric_key)
        # Cache with appropriate TTL
        self.redis_client.setex(
            f"metric:{metric_key}",
            5,  # 5 second TTL
            json.dumps(data)
        )
        self.local_cache[metric_key] = (data, time.time())
        return data
2. CDN Caching for Static Assets:
// CloudFront configuration
{
  "Origins": [{
    "Id": "dashboard-origin",
    "DomainName": "dashboard-api.example.com",
    "CustomOriginConfig": {
      "HTTPPort": 443,
      "OriginProtocolPolicy": "https-only"
    }
  }],
  "DefaultCacheBehavior": {
    "TargetOriginId": "dashboard-origin",
    "ViewerProtocolPolicy": "redirect-to-https",
    "CachePolicyId": "custom-cache-policy",
    "TTL": {
      "DefaultTTL": 300,     // 5 minutes for API responses
      "MaxTTL": 3600         // 1 hour max
    }
  },
  "CacheBehaviors": [{
    "PathPattern": "/static/*",
    "TTL": {
      "DefaultTTL": 86400,   // 24 hours for static assets
      "MaxTTL": 31536000     // 1 year max
    }
  }]
}
3. Database Query Result Caching:
-- Materialized views for expensive aggregations
CREATE MATERIALIZED VIEW hourly_revenue AS
SELECT
    date_trunc('hour', order_timestamp) as hour,
    SUM(order_total) as revenue,
    COUNT(*) as order_count
FROM orders
WHERE order_timestamp >= NOW() - INTERVAL '24 hours'
GROUP BY date_trunc('hour', order_timestamp);
-- Refresh every 5 minutes
CREATE OR REPLACE FUNCTION refresh_hourly_revenue()
RETURNS void AS $
BEGIN
    REFRESH MATERIALIZED VIEW CONCURRENTLY hourly_revenue;
END;
$ LANGUAGE plpgsql;
SELECT cron.schedule('refresh-revenue', '*/5 * * * *', 'SELECT refresh_hourly_revenue();');
Cache Invalidation Strategy:
class CacheInvalidator:
    def __init__(self):
        self.redis_client = redis.Redis()
    def invalidate_on_order(self, order_data):
        """Invalidate relevant caches when new order arrives"""
        patterns_to_invalidate = [
            "metric:orders_per_minute",
            "metric:revenue_per_minute",
            f"metric:inventory:{order_data['product_id']}"
        ]
        for pattern in patterns_to_invalidate:
            # Use Redis pipeline for efficiency
            pipe = self.redis_client.pipeline()
            pipe.delete(pattern)
            pipe.publish('cache_invalidation', pattern)
            pipe.execute()
    def smart_invalidation(self, event_type, entity_id):
        """Invalidate based on event type"""
        invalidation_map = {
            'order_created': ['revenue', 'orders', 'inventory'],
            'user_login': ['active_users'],
            'inventory_update': ['inventory', 'stock_alerts']
        }
        metrics_to_invalidate = invalidation_map.get(event_type, [])
        for metric in metrics_to_invalidate:
            self.redis_client.delete(f"metric:{metric}:{entity_id}")
18. Database Design: SQL vs NoSQL
Strong Answer: Hybrid Approach - Use Both:
SQL (PostgreSQL) for:
- Transactional Data: Orders, users, inventory
 - ACID Requirements: Financial transactions
 - Complex Queries: Joins, aggregations
 - Data Consistency: Strong consistency needs
 
-- OLTP Database Schema
CREATE TABLE orders (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL REFERENCES users(id),
    order_total DECIMAL(10,2) NOT NULL,
    order_timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    status VARCHAR(20) NOT NULL DEFAULT 'pending'
);
CREATE INDEX idx_orders_timestamp ON orders(order_timestamp);
CREATE INDEX idx_orders_user_status ON orders(user_id, status);
-- Partitioning for time-series data
CREATE TABLE orders_2025_06 PARTITION OF orders
FOR VALUES FROM ('2025-06-01') TO ('2025-07-01');
NoSQL (InfluxDB) for:
- Time-series Metrics: Performance data, system metrics
 - High Write Volume: Thousands of metrics per second
 - Retention Policies: Automatic data aging
 
# InfluxDB for metrics storage
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
client = InfluxDBClient(url="http://influxdb:8086", token="my-token")
write_api = client.write_api(write_options=SYNCHRONOUS)
def write_metric(measurement, tags, fields):
    point = Point(measurement) \
        .tag("service", tags.get("service")) \
        .tag("region", tags.get("region")) \
        .field("value", fields["value"]) \
        .time(datetime.utcnow(), WritePrecision.S)
    write_api.write(bucket="metrics", record=point)
# Example usage
write_metric(
    measurement="orders_per_minute",
    tags={"service": "ecommerce", "region": "us-east-1"},
    fields={"value": 45}
)
Read/Write Load Balancing:
class DatabaseRouter:
    def __init__(self):
        self.primary_db = PostgreSQLConnection("primary-db")
        self.read_replicas = [
            PostgreSQLConnection("replica-1"),
            PostgreSQLConnection("replica-2"),
            PostgreSQLConnection("replica-3")
        ]
        self.current_replica = 0
    def get_read_connection(self):
        """Round-robin read replica selection"""
        replica = self.read_replicas[self.current_replica]
        self.current_replica = (self.current_replica + 1) % len(self.read_replicas)
        return replica
    def get_write_connection(self):
        """Always use primary for writes"""
        return self.primary_db
    def execute_read_query(self, query, params=None):
        try:
            return self.get_read_connection().execute(query, params)
        except Exception:
            # Fallback to primary if replica fails
            return self.primary_db.execute(query, params)
# Usage
@app.route('/api/metrics/historical')
def get_historical_metrics():
    # Use read replica for analytics queries
    db = router.get_read_connection()
    return db.execute("""
        SELECT date_trunc('hour', created_at) as hour,
               SUM(total) as revenue
        FROM orders
        WHERE created_at >= NOW() - INTERVAL '24 hours'
        GROUP BY hour
        ORDER BY hour
    """)
19. Scaling Strategy for 10x Traffic
Strong Answer: Scaling Priority Order:
1. Application Tier (Scale First)
# Horizontal Pod Autoscaler
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: dashboard-api-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: dashboard-api
  minReplicas: 3
  maxReplicas: 50
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
        - type: Percent
          value: 100
          periodSeconds: 15
2. Database Scaling
# Read replica scaling + connection pooling
class DatabaseScaler:
    def __init__(self):
        self.connection_pools = {
            'primary': create_pool('primary-db', pool_size=20),
            'replicas': [
                create_pool('replica-1', pool_size=50),
                create_pool('replica-2', pool_size=50),
                create_pool('replica-3', pool_size=50),
            ]
        }
    def scale_read_capacity(self, target_qps):
        """Add more read replicas based on QPS"""
        current_capacity = len(self.connection_pools['replicas']) * 1000  # QPS per replica
        if target_qps > current_capacity * 0.8:  # 80% utilization threshold
            # Add new read replica
            new_replica = create_read_replica()
            self.connection_pools['replicas'].append(
                create_pool(new_replica, pool_size=50)
            )
    def implement_sharding(self):
        """Implement horizontal sharding for orders table"""
        shards = {
            'shard_1': 'orders_americas',
            'shard_2': 'orders_europe',
            'shard_3': 'orders_asia'
        }
        def get_shard(user_id):
            return shards[f'shard_{hash(user_id) % 3 + 1}']
3. Cache Scaling
# Redis Cluster for horizontal scaling
import rediscluster
redis_cluster = rediscluster.RedisCluster(
    startup_nodes=[
        {"host": "redis-1", "port": "7000"},
        {"host": "redis-2", "port": "7000"},
        {"host": "redis-3", "port": "7000"},
        {"host": "redis-4", "port": "7000"},
        {"host": "redis-5", "port": "7000"},
        {"host": "redis-6", "port": "7000"},
    ],
    decode_responses=True,
    skip_full_coverage_check=True
)
# Cache partitioning strategy
def cache_key_partition(metric_type, entity_id):
    """Partition cache keys for better distribution"""
    partition = hash(f"{metric_type}:{entity_id}") % 1000
    return f"{metric_type}:{partition}:{entity_id}"
4. CDN and Edge Caching
// CloudFlare Workers for edge computing
addEventListener("fetch", (event) => {
  event.respondWith(handleRequest(event.request))
})
async function handleRequest(request) {
  const url = new URL(request.url)
  // Cache API responses at edge
  if (url.pathname.startsWith("/api/metrics/")) {
    const cacheKey = new Request(url.toString(), request)
    const cache = caches.default
    // Check edge cache first
    let response = await cache.match(cacheKey)
    if (!response) {
      // Fetch from origin
      response = await fetch(request)
      // Cache for 30 seconds
      const headers = new Headers(response.headers)
      headers.set("Cache-Control", "public, max-age=30")
      response = new Response(response.body, {
        status: response.status,
        statusText: response.statusText,
        headers: headers,
      })
      event.waitUntil(cache.put(cacheKey, response.clone()))
    }
    return response
  }
  return fetch(request)
}
5. Message Queue Scaling
# Kafka cluster scaling
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: metrics-kafka
spec:
  kafka:
    replicas: 9 # Scale from 3 to 9 brokers
    config:
      num.partitions: 50 # More partitions for parallelism
      default.replication.factor: 3
      min.insync.replicas: 2
    resources:
      requests:
        memory: 8Gi
        cpu: 2000m
      limits:
        memory: 16Gi
        cpu: 4000m
20. Failover Strategy for Database Outage
Strong Answer: Multi-Tier Failover Strategy:
# Automatic failover implementation
import time
import threading
from enum import Enum
class DatabaseState(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    FAILED = "failed"
class DatabaseFailoverManager:
    def __init__(self):
        self.primary_db = "primary-db.example.com"
        self.replica_dbs = [
            "replica-1.example.com",
            "replica-2.example.com",
            "replica-3.example.com"
        ]
        self.current_state = DatabaseState.HEALTHY
        self.current_primary = self.primary_db
        self.health_check_interval = 5  # seconds
    def health_check(self, db_host):
        """Check database health"""
        try:
            conn = psycopg2.connect(
                host=db_host,
                database="analytics",
                user="readonly",
                password="password",
                connect_timeout=3
            )
            cursor = conn.cursor()
            cursor.execute("SELECT 1")
            result = cursor.fetchone()
            conn.close()
            return result[0] == 1
        except Exception as e:
            logger.error(f"Health check failed for {db_host}: {e}")
            return False
    def promote_replica_to_primary(self, replica_host):
        """Promote replica to primary (manual intervention required)"""
        # This would typically involve:
        # 1. Stopping replication on chosen replica
        # 2. Updating DNS/load balancer to point to new primary
        # 3. Updating application config
        logger.critical(f"Promoting {replica_host} to primary")
        # Update Route 53 record to point to new primary
        route53 = boto3.client('route53')
        route53.change_resource_record_sets(
            HostedZoneId='Z123456789',
            ChangeBatch={
                'Changes': [{
                    'Action': 'UPSERT',
                    'ResourceRecordSet': {
                        'Name': 'primary-db.example.com',
                        'Type': 'CNAME',
                        'TTL': 60,
                        'ResourceRecords': [{'Value': replica_host}]
                    }
                }]
            }
        )
        self.current_primary = replica_host
    def circuit_breaker_fallback(self):
        """Fallback to cached data when database is unavailable"""
        logger.warning("Database unavailable, switching to read-only mode")
        # Serve from cache only
        app.config['READ_ONLY_MODE'] = True
        # Show banner to users
        return {
            "status": "degraded",
            "message": "Real-time data temporarily unavailable",
            "data_freshness": "cached_5_minutes_ago"
        }
    def monitor_and_failover(self):
        """Background thread for monitoring and automatic failover"""
        consecutive_failures = 0
        while True:
            if self.health_check(self.current_primary):
                consecutive_failures = 0
                self.current_state = DatabaseState.HEALTHY
                app.config['READ_ONLY_MODE'] = False
            else:
                consecutive_failures += 1
                logger.warning(f"Primary DB health check failed {consecutive_failures} times")
                if consecutive_failures >= 3:  # 15 seconds of failures
                    self.current_state = DatabaseState.FAILED
                    # Try to find healthy replica
                    healthy_replica = None
                    for replica in self.replica_dbs:
                        if self.health_check(replica):
                            healthy_replica = replica
                            break
                    if healthy_replica:
                        self.promote_replica_to_primary(healthy_replica)
                    else:
                        # All databases failed - enable circuit breaker
                        self.circuit_breaker_fallback()
            time.sleep(self.health_check_interval)
# Start monitoring in background
failover_manager = DatabaseFailoverManager()
monitor_thread = threading.Thread(target=failover_manager.monitor_and_failover)
monitor_thread.daemon = True
monitor_thread.start()
Emergency Procedures:
#!/bin/bash
# emergency-failover.sh
echo "=== EMERGENCY DATABASE FAILOVER ==="
echo "1. Checking primary database status..."
if ! pg_isready -h primary-db.example.com -p 5432; then
    echo "❌ Primary database is DOWN"
    echo "2. Finding healthy replica..."
    for replica in replica-1 replica-2 replica-3; do
        if pg_isready -h ${replica}.example.com -p 5432; then
            echo "✅ Found healthy replica: $replica"
            echo "3. Promoting replica to primary..."
            # Stop replication
            psql -h ${replica}.example.com -c "SELECT pg_promote();"
            echo "4. Updating DNS record..."
            aws route53 change-resource-record-sets \
                --hosted-zone-id Z123456789 \
                --change-batch file://failover-dns.json
            echo "5. Updating application config..."
            kubectl patch configmap app-config \
                --patch "{\"data\":{\"DATABASE_HOST\":\"${replica}.example.com\"}}"
            echo "6. Restarting application pods..."
            kubectl rollout restart deployment/dashboard-api
            echo "✅ Failover complete to $replica"
            exit 0
        fi
    done
    echo "❌ No healthy replicas found - enabling read-only mode"
    kubectl patch configmap app-config \
        --patch '{"data":{"READ_ONLY_MODE":"true"}}'
else
    echo "✅ Primary database is healthy"
fi
21. Circuit Breaker Implementation
Strong Answer: Circuit Breaker Pattern for External Services:
import time
import threading
from enum import Enum
from collections import deque
class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject requests
    HALF_OPEN = "half_open" # Testing if service recovered
class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60, expected_exception=Exception):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
        self.lock = threading.Lock()
        # Track recent requests for metrics
        self.recent_requests = deque(maxlen=100)
    def __call__(self, func):
        def wrapper(*args, **kwargs):
            with self.lock:
                if self.state == CircuitState.OPEN:
                    # Check if timeout period has passed
                    if time.time() - self.last_failure_time >= self.timeout:
                        self.state = CircuitState.HALF_OPEN
                        self.failure_count = 0
                    else:
                        # Circuit is open - reject request
                        raise Exception("Circuit breaker is OPEN - service unavailable")
                if self.state == CircuitState.HALF_OPEN:
                    # Test with single request
                    try:
                        result = func(*args, **kwargs)
                        self.state = CircuitState.CLOSED
                        self.failure_count = 0
                        self.record_request(success=True)
                        return result
                    except self.expected_exception as e:
                        self.state = CircuitState.OPEN
                        self.failure_count += 1
                        self.last_failure_time = time.time()
                        self.record_request(success=False)
                        raise e
                # Normal operation (CLOSED state)
                try:
                    result = func(*args, **kwargs)
                    self.failure_count = 0  # Reset on success
                    self.record_request(success=True)
                    return result
                except self.expected_exception as e:
                    self.failure_count += 1
                    self.record_request(success=False)
                    if self.failure_count >= self.failure_threshold:
                        self.state = CircuitState.OPEN
                        self.last_failure_time = time.time()
                    raise e
        return wrapper
    def record_request(self, success):
        self.recent_requests.append({
            'timestamp': time.time(),
            'success': success
        })
    def get_metrics(self):
        now = time.time()
        recent = [r for r in self.recent_requests if now - r['timestamp'] < 300]  # Last 5 minutes
        total_requests = len(recent)
        successful_requests = sum(1 for r in recent if r['success'])
        return {
            'state': self.state.value,
            'failure_count': self.failure_count,
            'success_rate': successful_requests / total_requests if total_requests > 0 else 0,
            'total_requests_5min': total_requests
        }
# Usage in analytics dashboard
@CircuitBreaker(failure_threshold=3, timeout=30)
def get_inventory_data(product_ids):
    """Call to inventory service with circuit breaker"""
    response = requests.get(
        f"{INVENTORY_SERVICE_URL}/api/products",
        params={'ids': ','.join(product_ids)},
        timeout=5
    )
    response.raise_for_status()
    return response.json()
@CircuitBreaker(failure_threshold=5, timeout=60)
def get_user_analytics():
    """Call to analytics service with circuit breaker"""
    response = requests.get(f"{ANALYTICS_SERVICE_URL}/api/users/active", timeout=10)
    response.raise_for_status()
    return response.json()
# Fallback strategies
def get_dashboard_data():
    try:
        # Try to get real-time inventory data
        inventory_data = get_inventory_data(['1', '2', '3'])
    except Exception:
        # Fallback to cached data
        inventory_data = redis_client.get('inventory_cache')
        if not inventory_data:
            inventory_data = {"error": "Inventory data unavailable"}
    try:
        # Try to get user analytics
        user_data = get_user_analytics()
    except Exception:
        # Fallback to approximate data
        user_data = {"active_users": "~1000", "estimated": True}
    return {
        "inventory": inventory_data,
        "users": user_data,
        "timestamp": time.time()
    }
Service-Level Circuit Breaker Integration:
# Integration with Flask app
@app.route('/api/dashboard')
def dashboard_endpoint():
    # Check circuit breaker states
    circuit_states = {
        'inventory': inventory_circuit_breaker.get_metrics(),
        'analytics': analytics_circuit_breaker.get_metrics(),
    }
    # Include circuit breaker status in response
    response_data = get_dashboard_data()
    response_data['service_health'] = circuit_states
    # Set appropriate HTTP status based on circuit states
    if any(state['state'] == 'open' for state in circuit_states.values()):
        return jsonify(response_data), 206  # Partial content
    else:
        return jsonify(response_data), 200
# Monitoring endpoint for circuit breaker states
@app.route('/api/health/circuits')
def circuit_breaker_health():
    return jsonify({
        'inventory_service': inventory_circuit_breaker.get_metrics(),
        'analytics_service': analytics_circuit_breaker.get_metrics(),
    })
Part 4: Advanced SRE & Operations
22. API Response Time Investigation Process
Strong Answer: Systematic Investigation Approach:
Step 1: Immediate Assessment (First 2 minutes)
# Quick checks to understand scope
echo "=== INCIDENT RESPONSE CHECKLIST ==="
echo "1. Checking service status..."
curl -w "%{http_code} %{time_total}s" -s -o /dev/null https://api.example.com/health
echo "2. Checking recent deployments..."
kubectl get pods --sort-by=.metadata.creationTimestamp
echo "3. Checking error rates..."
# Query Prometheus for error rate spike
curl -s 'http://prometheus:9090/api/v1/query?query=rate(http_requests_total{status=~"5.."}[5m])'
Step 2: Resource Analysis (Minutes 2-5)
# Automated investigation script
import subprocess
import json
import requests
class IncidentInvestigator:
    def __init__(self):
        self.findings = []
    def check_resource_usage(self):
        """Check CPU, Memory, Disk I/O"""
        # CPU utilization
        cpu_query = 'avg(rate(container_cpu_usage_seconds_total[5m])) by (pod)'
        cpu_data = self.query_prometheus(cpu_query)
        for result in cpu_data:
            cpu_usage = float(result['value'][1])
            if cpu_usage > 0.8:  # 80% threshold
                self.findings.append(f"High CPU: {result['metric']['pod']} at {cpu_usage:.2%}")
        # Memory usage
        memory_query = 'container_memory_usage_bytes / container_spec_memory_limit_bytes'
        memory_data = self.query_prometheus(memory_query)
        for result in memory_data:
            memory_usage = float(result['value'][1])
            if memory_usage > 0.85:  # 85% threshold
                self.findings.append(f"High Memory: {result['metric']['pod']} at {memory_usage:.2%}")
    def check_database_performance(self):
        """Check database connection pool, slow queries"""
        # Connection pool utilization
        db_conn_query = 'pg_stat_activity_count / pg_settings_max_connections'
        db_data = self.query_prometheus(db_conn_query)
        if db_data and float(db_data[0]['value'][1]) > 0.8:
            self.findings.append("Database connection pool near capacity")
        # Check for slow queries
        slow_queries = subprocess.run([
            'psql', '-h', 'postgres-primary', '-c',
            "SELECT query, query_start, state, waiting FROM pg_stat_activity WHERE state = 'active' AND query_start < NOW() - INTERVAL '30 seconds';"
        ], capture_output=True, text=True)
        if slow_queries.stdout and len(slow_queries.stdout.split('\n')) > 3:
            self.findings.append("Long-running queries detected")
    def check_external_dependencies(self):
        """Check downstream services"""
        services = ['inventory-service', 'payment-service', 'user-service']
        for service in services:
            try:
                response = requests.get(f'http://{service}/health', timeout=5)
                if response.status_code != 200 or response.elapsed.total_seconds() > 1.0:
                    self.findings.append(f"{service} health check failed or slow")
            except requests.RequestException:
                self.findings.append(f"{service} unreachable")
    def check_recent_changes(self):
        """Check recent deployments and config changes"""
        # Get recent pod restarts
        pods = subprocess.run([
            'kubectl', 'get', 'pods', '-o', 'json'
        ], capture_output=True, text=True)
        pod_data = json.loads(pods.stdout)
        for pod in pod_data['items']:
            restart_count = pod['status']['restartCount']
            if restart_count > 0:
                self.findings.append(f"Pod {pod['metadata']['name']} has {restart_count} restarts")
    def query_prometheus(self, query):
        """Helper to query Prometheus"""
        try:
            response = requests.get(
                'http://prometheus:9090/api/v1/query',
                params={'query': query},
                timeout=10
            )
            return response.json()['data']['result']
        except:
            return []
    def investigate(self):
        """Run full investigation"""
        print("🔍 Starting incident investigation...")
        self.check_resource_usage()
        self.check_database_performance()
        self.check_external_dependencies()
        self.check_recent_changes()
        print("\n📋 Investigation Summary:")
        if self.findings:
            for finding in self.findings:
                print(f"⚠️  {finding}")
        else:
            print("✅ No obvious issues found in automated checks")
        return self.findings
# Run investigation
investigator = IncidentInvestigator()
findings = investigator.investigate()
Step 3: Deep Dive Analysis (Minutes 5-15)
# Application-level investigation
echo "4. Checking application metrics..."
# Look for specific endpoint latency
kubectl exec -it $(kubectl get pods -l app=api -o jsonpath='{.items[0].metadata.name}') -- \
  curl -s localhost:8080/metrics | grep http_request_duration
# Check garbage collection metrics (for Java/Go apps)
kubectl logs $(kubectl get pods -l app=api -o jsonpath='{.items[0].metadata.name}') | \
  grep -i "gc\|garbage"
# Database query analysis
psql -h postgres-primary -c "
SELECT query, calls, total_time, mean_time, stddev_time
FROM pg_stat_statements
WHERE mean_time > 1000
ORDER BY mean_time DESC
LIMIT 10;"
# Check for lock contention
psql -h postgres-primary -c "
SELECT blocked_locks.pid AS blocked_pid,
       blocked_activity.usename AS blocked_user,
       blocking_locks.pid AS blocking_pid,
       blocking_activity.usename AS blocking_user,
       blocked_activity.query AS blocked_statement
FROM pg_catalog.pg_locks blocked_locks
JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid
JOIN pg_catalog.pg_locks blocking_locks ON (blocked_locks.locktype = blocking_locks.locktype)
JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid
WHERE NOT blocked_locks.granted;"
Step 4: Resolution and Communication
# Incident response actions
class IncidentResponse:
    def __init__(self):
        self.slack_webhook = "https://hooks.slack.com/services/..."
    def notify_team(self, message, severity="warning"):
        """Send alert to incident response channel"""
        payload = {
            "text": f"🚨 API Performance Incident",
            "attachments": [{
                "color": "danger" if severity == "critical" else "warning",
                "fields": [
                    {"title": "Issue", "value": message, "short": False},
                    {"title": "Runbook", "value": "https://wiki.company.com/incident-response", "short": True},
                    {"title": "Investigator", "value": "SRE On-Call", "short": True}
                ]
            }]
        }
        requests.post(self.slack_webhook, json=payload)
    def apply_immediate_fix(self, issue_type):
        """Apply quick fixes based on identified issue"""
        if issue_type == "high_cpu":
            # Scale up pods
            subprocess.run(['kubectl', 'scale', 'deployment/api', '--replicas=10'])
        elif issue_type == "database_load":
            # Enable read-only mode temporarily
            subprocess.run(['kubectl', 'patch', 'configmap/app-config',
                           '--patch', '{"data":{"READ_ONLY_MODE":"true"}}'])
        elif issue_type == "memory_leak":
            # Rolling restart
            subprocess.run(['kubectl', 'rollout', 'restart', 'deployment/api'])
        elif issue_type == "external_service":
            # Enable circuit breaker
            subprocess.run(['kubectl', 'patch', 'configmap/app-config',
                           '--patch', '{"data":{"CIRCUIT_BREAKER_ENABLED":"true"}}'])
# Usage
incident = IncidentResponse()
incident.notify_team("API latency increased from 100ms to 2s. Investigation in progress.")
# Based on findings, apply fixes
if "High CPU" in findings:
    incident.apply_immediate_fix("high_cpu")
    incident.notify_team("Applied fix: Scaled up API pods to handle load")
23. Error Budget Management Scenario
Strong Answer: Error Budget Analysis:
First, let me calculate the current situation:
- SLO: 99.5% uptime (0.5% error budget per month)
 - Current Error Rate: 20% increase = baseline + 20%
 - Deployment Impact: Caused the increase
 - Error Budget Status: Within budget (need to verify actual numbers)
 
Decision Framework:
class ErrorBudgetManager:
    def __init__(self):
        self.slo_target = 0.995  # 99.5% success rate
        self.error_budget = 1 - self.slo_target  # 0.5% error budget
    def calculate_current_burn_rate(self, error_rate, time_period_hours):
        """Calculate how fast we're burning error budget"""
        monthly_hours = 30 * 24  # 720 hours
        burn_rate = (error_rate * time_period_hours) / monthly_hours
        return burn_rate
    def assess_situation(self, baseline_error_rate, current_error_rate, hours_since_deploy):
        """Assess if we should keep feature or rollback"""
        # Calculate actual error rates
        error_increase = current_error_rate - baseline_error_rate
        # Calculate error budget consumption
        budget_burned = self.calculate_current_burn_rate(error_increase, hours_since_deploy)
        remaining_budget = self.error_budget - budget_burned
        # Project if this continues for the month
        monthly_projection = error_increase * (30 * 24) / hours_since_deploy
        assessment = {
            "current_error_rate": current_error_rate,
            "error_budget_burned": budget_burned,
            "remaining_budget": remaining_budget,
            "monthly_projection": monthly_projection,
            "exceeds_budget": monthly_projection > self.error_budget
        }
        return assessment
    def make_recommendation(self, assessment, business_impact):
        """Provide recommendation based on data"""
        if assessment["exceeds_budget"]:
            return {
                "action": "ROLLBACK",
                "reason": "Feature will exceed monthly error budget",
                "timeline": "Immediate"
            }
        elif assessment["remaining_budget"] < 0.001:  # Less than 0.1% budget left
            return {
                "action": "ENHANCED_MONITORING",
                "reason": "Close to budget limit, monitor closely",
                "conditions": "Rollback if error rate increases further"
            }
        else:
            return {
                "action": "KEEP_WITH_CONDITIONS",
                "reason": "Within error budget but requires monitoring",
                "conditions": [
                    "Implement feature flag for quick disable",
                    "Set up aggressive alerting",
                    "Daily budget review meetings"
                ]
            }
# Real scenario analysis
budget_manager = ErrorBudgetManager()
# Assume baseline was 0.1% error rate, now 1.2% (20% relative increase)
assessment = budget_manager.assess_situation(
    baseline_error_rate=0.001,  # 0.1%
    current_error_rate=0.012,   # 1.2%
    hours_since_deploy=6
)
recommendation = budget_manager.make_recommendation(assessment, business_impact="high")
Communication Strategy:
def communicate_decision(assessment, recommendation, stakeholders):
    """Structured communication to all stakeholders"""
    message = f"""
    📊 ERROR BUDGET ASSESSMENT - {datetime.now().strftime('%Y-%m-%d %H:%M')}
    🎯 SLO Target: 99.5% success rate (0.5% monthly error budget)
    📈 Current Situation:
    • Error rate: {assessment['current_error_rate']:.3%}
    • Budget burned: {assessment['error_budget_burned']:.3%}
    • Remaining budget: {assessment['remaining_budget']:.3%}
    • Monthly projection: {assessment['monthly_projection']:.3%}
    🔍 Analysis:
    {'❌ Will exceed budget' if assessment['exceeds_budget'] else '✅ Within budget'}
    🎯 Recommendation: {recommendation['action']}
    Reason: {recommendation['reason']}
    ⚡ Next Steps:
    """
    if recommendation['action'] == 'ROLLBACK':
        message += """
        1. Immediate rollback of feature
        2. Root cause analysis
        3. Fix implementation before retry
        4. Post-mortem scheduled
        """
    elif recommendation['action'] == 'KEEP_WITH_CONDITIONS':
        message += f"""
        1. Implement monitoring conditions
        2. Set up automated alerts
        3. Daily review meetings
        4. Feature flag for quick disable
        Conditions: {recommendation['conditions']}
        """
    # Send to different channels based on audience
    send_to_slack('#sre-team', message)
    send_to_email(stakeholders['engineering'], message)
    send_executive_summary(stakeholders['leadership'])
def send_executive_summary(executives):
    """Simplified version for executives"""
    summary = f"""
    Feature Deployment Impact Assessment
    Status: {'🔴 Action Required' if recommendation['action'] == 'ROLLBACK' else '🟡 Monitoring'}
    Decision: {recommendation['action']}
    Business Impact: Minimal customer impact, within reliability targets
    Engineering is {recommendation['action'].lower().replace('_', ' ')} and monitoring closely.
    """
    send_to_email(executives, summary)
Implementation Plan:
# Feature flag for quick disable
apiVersion: v1
kind: ConfigMap
metadata:
  name: feature-flags
data:
  new-feature-enabled: "true"
  error-budget-threshold: "0.004" # 0.4% - alert threshold
---
# Enhanced monitoring
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: error-budget-alerts
spec:
  groups:
    - name: error-budget
      rules:
        - alert: ErrorBudgetBurnRateHigh
          expr: |
            (
              rate(http_requests_total{status=~"5.."}[1h]) / 
              rate(http_requests_total[1h])
            ) > 0.004
          for: 5m
          labels:
            severity: critical
          annotations:
            summary: "Error budget burn rate too high"
        - alert: ErrorBudgetProjectedExceed
          expr: |
            (
              rate(http_requests_total{status=~"5.."}[6h]) / 
              rate(http_requests_total[6h])
            ) * 720 > 0.005  # 720 hours in month, 0.5% budget
          for: 10m
          labels:
            severity: warning
          annotations:
            summary: "Projected to exceed monthly error budget"
24. Load Testing Strategy Design
Strong Answer: Comprehensive Load Testing Strategy:
# Load test implementation using Locust
from locust import HttpUser, task, between
import random
import json
class DashboardUser(HttpUser):
    wait_time = between(1, 5)  # 1-5 second wait between requests
    def on_start(self):
        """Setup user session"""
        # Authenticate user
        response = self.client.post("/api/auth/login", json={
            "username": f"loadtest_user_{random.randint(1, 1000)}",
            "password": "test_password"
        })
        if response.status_code == 200:
            self.auth_token = response.json().get("token")
            self.client.headers.update({"Authorization": f"Bearer {self.auth_token}"})
    @task(3)  # 3x weight - most common operation
    def view_realtime_dashboard(self):
        """Simulate viewing real-time dashboard"""
        self.client.get("/api/metrics/realtime")
    @task(2)  # 2x weight
    def view_historical_data(self):
        """Simulate historical data requests"""
        periods = ["1h", "6h", "24h", "7d"]
        metrics = ["revenue", "orders", "active_users"]
        period = random.choice(periods)
        metric = random.choice(metrics)
        self.client.get(f"/api/metrics/historical", params={
            "metric": metric,
            "period": period,
            "granularity": "1m" if period in ["1h", "6h"] else "1h"
        })
    @task(1)  # 1x weight - less common
    def configure_alerts(self):
        """Simulate alert configuration"""
        self.client.post("/api/alerts", json={
            "metric": "inventory_level",
            "threshold": random.randint(50, 200),
            "product_id": f"product_{random.randint(1, 100)}"
        })
    @task(4)  # 4x weight - WebSocket simulation via polling
    def websocket_simulation(self):
        """Simulate WebSocket updates via HTTP polling"""
        self.client.get("/api/metrics/stream", params={
            "last_update": "2025-06-30T10:30:00Z"
        })
class BurstTrafficUser(DashboardUser):
    """Simulates traffic spikes during incidents"""
    wait_time = between(0.1, 0.5)  # Much faster requests
    @task(5)
    def rapid_dashboard_refresh(self):
        """Rapid refreshing during incidents"""
        self.client.get("/api/metrics/realtime")
# Load test scenarios
class LoadTestScenarios:
    def __init__(self):
        self.scenarios = {
            "normal_load": {
                "users": 100,
                "spawn_rate": 10,
                "duration": "10m",
                "user_class": DashboardUser
            },
            "peak_load": {
                "users": 500,
                "spawn_rate": 50,
                "duration": "15m",
                "user_class": DashboardUser
            },
            "stress_test": {
                "users": 1000,
                "spawn_rate": 100,
                "duration": "20m",
                "user_class": DashboardUser
            },
            "spike_test": {
                "users": 200,
                "spawn_rate": 200,  # Instant spike
                "duration": "5m",
                "user_class": BurstTrafficUser
            },
            "endurance_test": {
                "users": 300,
                "spawn_rate": 30,
                "duration": "2h",  # Long running
                "user_class": DashboardUser
            }
        }
Infrastructure Load Testing:
#!/bin/bash
# load-test-runner.sh
echo "🚀 Starting Analytics Dashboard Load Tests"
# 1. Baseline Performance Test
echo "📊 Running baseline performance test..."
locust -f dashboard_load_test.py \
    --users 50 \
    --spawn-rate 10 \
    --run-time 5m \
    --host https://api-staging.example.com \
    --html reports/baseline_$(date +%Y%m%d_%H%M%S).html
# 2. Database Load Test
echo "🗃️ Testing database performance..."
# Simulate heavy analytics queries
for i in {1..100}; do
    curl -s "https://api-staging.example.com/api/metrics/historical?period=30d&metric=revenue" &
done
wait
# 3. WebSocket Load Test
echo "🔌 Testing WebSocket capacity..."
# Use websocket-king for WebSocket load testing
websocket-king \
    --url wss://api-staging.example.com/metrics/stream \
    --connections 1000 \
    --duration 10m \
    --rate 10 \
    --output ws_load_report.json
# 4. Cache Performance Test
echo "💾 Testing cache performance..."
# Warm up cache, then test cache hit rates
redis-cli --latency-history -h redis-staging.example.com
# 5. Auto-scaling Test
echo "📈 Testing auto-scaling behavior..."
# Gradually increase load to trigger auto-scaling
locust -f dashboard_load_test.py \
    --users 1000 \
    --spawn-rate 20 \
    --run-time 30m \
    --host https://api-staging.example.com \
    --html reports/autoscale_test_$(date +%Y%m%d_%H%M%S).html
echo "✅ Load testing complete. Check reports/ directory for results."
Performance Metrics Collection:
# Custom metrics collection during load testing
import psutil
import time
import json
from datetime import datetime
class PerformanceMonitor:
    def __init__(self):
        self.metrics = []
        self.monitoring = False
    def start_monitoring(self):
        """Start collecting system metrics"""
        self.monitoring = True
        while self.monitoring:
            metrics = {
                "timestamp": datetime.now().isoformat(),
                "cpu_percent": psutil.cpu_percent(interval=1),
                "memory_percent": psutil.virtual_memory().percent,
                "disk_io": psutil.disk_io_counters()._asdict(),
                "network_io": psutil.net_io_counters()._asdict(),
                "connections": len(psutil.net_connections()),
            }
            # Database metrics
            metrics["db_connections"] = self.get_db_connections()
            metrics["cache_hit_rate"] = self.get_cache_hit_rate()
            self.metrics.append(metrics)
            time.sleep(5)  # Collect every 5 seconds
    def get_db_connections(self):
        """Get database connection count"""
        try:
            import psycopg2
            conn = psycopg2.connect("postgresql://user:pass@db:5432/analytics")
            cursor = conn.cursor()
            cursor.execute("SELECT count(*) FROM pg_stat_activity;")
            return cursor.fetchone()[0]
        except:
            return None
    def get_cache_hit_rate(self):
        """Get Redis cache hit rate"""
        try:
            import redis
            r = redis.Redis(host='redis-cluster')
            info = r.info('stats')
            hits = info['keyspace_hits']
            misses = info['keyspace_misses']
            return hits / (hits + misses) if (hits + misses) > 0 else 0
        except:
            return None
    def stop_monitoring(self):
        """Stop monitoring and save results"""
        self.monitoring = False
        # Save metrics to file
        with open(f"performance_metrics_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", 'w') as f:
            json.dump(self.metrics, f, indent=2)
        # Generate summary report
        self.generate_summary()
    def generate_summary(self):
        """Generate performance summary"""
        if not self.metrics:
            return
        cpu_values = [m['cpu_percent'] for m in self.metrics if m['cpu_percent']]
        memory_values = [m['memory_percent'] for m in self.metrics if m['memory_percent']]
        summary = {
            "test_duration": len(self.metrics) * 5,  # 5 second intervals
            "avg_cpu": sum(cpu_values) / len(cpu_values),
            "max_cpu": max(cpu_values),
            "avg_memory": sum(memory_values) / len(memory_values),
            "max_memory": max(memory_values),
            "peak_connections": max([m.get('connections', 0) for m in self.metrics])
        }
        print("📊 Performance Test Summary:")
        print(f"   Duration: {summary['test_duration']} seconds")
        print(f"   Avg CPU: {summary['avg_cpu']:.1f}%")
        print(f"   Max CPU: {summary['max_cpu']:.1f}%")
        print(f"   Avg Memory: {summary['avg_memory']:.1f}%")
        print(f"   Max Memory: {summary['max_memory']:.1f}%")
        print(f"   Peak Connections: {summary['peak_connections']}")
# Usage with load test
monitor = PerformanceMonitor()
import threading
# Start monitoring in background
monitor_thread = threading.Thread(target=monitor.start_monitoring)
monitor_thread.start()
# Run load test
# ... run locust or other load testing tool ...
# Stop monitoring
monitor.stop_monitoring()
monitor_thread.join()
Test Criteria and Success Metrics:
# Load test success criteria
load_test_criteria:
  response_time:
    p50: < 200ms
    p95: < 500ms
    p99: < 1000ms
  throughput:
    normal_load: > 1000 RPS
    peak_load: > 5000 RPS
  error_rate:
    max_acceptable: 0.1%
  resource_utilization:
    cpu: < 80%
    memory: < 85%
    database_connections: < 80% of pool
  auto_scaling:
    scale_up_time: < 2 minutes
    scale_down_time: < 5 minutes
  cache_performance:
    hit_rate: > 90%
    eviction_rate: < 5%
# Failure conditions (auto-stop test)
failure_conditions:
  - error_rate > 1% for 2 minutes
  - p99_latency > 5000ms for 5 minutes
  - cpu_utilization > 95% for 3 minutes
  - database_connection_pool > 95%
25. Go Service CPU Investigation
Strong Answer: Systematic CPU Investigation Process:
// 1. Enable pprof in Go service for CPU profiling
package main
import (
    "context"
    "log"
    "net/http"
    _ "net/http/pprof"  // Import pprof
    "runtime"
    "time"
)
func main() {
    // Start pprof server
    go func() {
        log.Println("Starting pprof server on :6060")
        log.Println(http.ListenAndServe("localhost:6060", nil))
    }()
    // Set GOMAXPROCS to container CPU limit
    runtime.GOMAXPROCS(2)  // Adjust based on container resources
    // Your application code
    startApplication()
}
// 2. Add CPU monitoring middleware
func CPUMonitoringMiddleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        start := time.Now()
        // Record CPU usage before request
        var rusageBefore syscall.Rusage
        syscall.Getrusage(syscall.RUSAGE_SELF, &rusageBefore)
        next.ServeHTTP(w, r)
        // Record CPU usage after request
        var rusageAfter syscall.Rusage
        syscall.Getrusage(syscall.RUSAGE_SELF, &rusageAfter)
        duration := time.Since(start)
        cpuTime := time.Duration(rusageAfter.Utime-rusageBefore.Utime) * time.Microsecond
        // Log high CPU requests
        if cpuTime > 100*time.Millisecond {
            log.Printf("High CPU request: %s %s - Duration: %v, CPU: %v",
                r.Method, r.URL.Path, duration, cpuTime)
        }
    })
}
Investigation Tools and Commands:
#!/bin/bash
# cpu-investigation.sh
echo "🔍 Investigating Go service CPU usage..."
# 1. Get current CPU profile (30 seconds)
echo "📊 Collecting CPU profile..."
go tool pprof -http=:8080 http://localhost:6060/debug/pprof/profile?seconds=30
# 2. Check for goroutine leaks
echo "🧵 Checking goroutine count..."
curl -s http://localhost:6060/debug/pprof/goroutine?debug=1 | head -20
# 3. Memory allocation profile (may cause CPU spikes)
echo "💾 Checking memory allocations..."
go tool pprof http://localhost:6060/debug/pprof/allocs
# 4. Check GC performance
echo "🗑️ Checking garbage collection stats..."
curl -s http://localhost:6060/debug/vars | jq '.memstats'
# 5. Container-level CPU investigation
echo "🐳 Container CPU stats..."
docker stats --no-stream $(docker ps --filter "name=go-service" --format "{{.Names}}")
# 6. Process-level analysis
echo "⚙️ Process CPU breakdown..."
top -H -p $(pgrep go-service) -n 1
# 7. strace for system call analysis
echo "🔧 System call analysis (10 seconds)..."
timeout 10s strace -c -p $(pgrep go-service)
Code-Level Optimizations:
// Common CPU bottleneck fixes
// 1. Fix: Inefficient JSON parsing
// BEFORE - Slow JSON handling
func processRequestSlow(w http.ResponseWriter, r *http.Request) {
    var data map[string]interface{}
    body, _ := ioutil.ReadAll(r.Body)
    json.Unmarshal(body, &data)
    // Process data...
}
// AFTER - Optimized JSON handling
type RequestData struct {
    UserID string `json:"user_id"`
    Action string `json:"action"`
    // Define specific fields instead of interface{}
}
func processRequestFast(w http.ResponseWriter, r *http.Request) {
    var data RequestData
    decoder := json.NewDecoder(r.Body)
    decoder.DisallowUnknownFields()  // Faster parsing
    if err := decoder.Decode(&data); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    // Process typed data...
}
// 2. Fix: CPU-intensive loops
// BEFORE - O(n²) algorithm
func findDuplicatesSlow(items []string) []string {
    var duplicates []string
    for i := 0; i < len(items); i++ {
        for j := i + 1; j < len(items); j++ {
            if items[i] == items[j] {
                duplicates = append(duplicates, items[i])
                break
            }
        }
}
return duplicates
}
// AFTER - O(n) algorithm using map
func findDuplicatesFast(items []string) []string {
seen := make(map[string]bool)
var duplicates []string
    for _, item := range items {
        if seen[item] {
            duplicates = append(duplicates, item)
        } else {
            seen[item] = true
        }
    }
    return duplicates
}
// 3. Fix: Excessive string concatenation
// BEFORE - Creates new strings repeatedly
func buildResponseSlow(data []Record) string {
var result string
for \_, record := range data {
result += record.ID + "," + record.Name + "\n" // Slow!
}
return result
}
// AFTER - Use strings.Builder for efficiency
func buildResponseFast(data []Record) string {
var builder strings.Builder
builder.Grow(len(data) \* 50) // Pre-allocate capacity
    for _, record := range data {
        builder.WriteString(record.ID)
        builder.WriteString(",")
        builder.WriteString(record.Name)
        builder.WriteString("\n")
    }
    return builder.String()
}
// 4. Fix: Goroutine leaks
// BEFORE - Goroutines without proper cleanup
func handleRequestsLeaky() {
for {
go func() {
// Long-running operation without context cancellation
processData() // Never exits!
}()
}
}
// AFTER - Proper goroutine management
func handleRequestsProper(ctx context.Context) {
semaphore := make(chan struct{}, 100) // Limit concurrent goroutines
    for {
        select {
        case <-ctx.Done():
            return
        default:
            semaphore <- struct{}{} // Acquire
            go func() {
                defer func() { <-semaphore }() // Release
                // Use context for cancellation
                processDataWithContext(ctx)
            }()
        }
    }
}
// 5. Fix: Inefficient database queries in loop
// BEFORE - N+1 query problem
func getUserDataSlow(userIDs []string) []UserData {
var users []UserData
for \_, id := range userIDs {
user := db.QueryUser(id) // Database hit per user!
users = append(users, user)
}
return users
}
// AFTER - Batch database queries
func getUserDataFast(userIDs []string) []UserData {
// Single query for all users
query := "SELECT \* FROM users WHERE id IN (" +
strings.Join(userIDs, ",") + ")"
return db.QueryUsers(query)
}
Memory and GC Optimization:
// 6. Optimize garbage collection pressure
type MetricsCollector struct {
    // BEFORE - Creates garbage
    // metrics []map[string]interface{}
    // AFTER - Use object pools and typed structs
    metricPool sync.Pool
    metrics    []Metric
}
type Metric struct {
    Name      string
    Value     float64
    Timestamp int64
}
func NewMetricsCollector() *MetricsCollector {
    mc := &MetricsCollector{
        metrics: make([]Metric, 0, 1000), // Pre-allocate capacity
    }
    mc.metricPool = sync.Pool{
        New: func() interface{} {
            return &Metric{}
        },
    }
    return mc
}
func (mc *MetricsCollector) AddMetric(name string, value float64) {
    metric := mc.metricPool.Get().(*Metric)
    metric.Name = name
    metric.Value = value
    metric.Timestamp = time.Now().Unix()
    mc.metrics = append(mc.metrics, *metric)
    // Return to pool
    mc.metricPool.Put(metric)
}
// 7. CPU profiling integration
func enableContinuousProfiling() {
    // Enable continuous CPU profiling
    if os.Getenv("ENABLE_PROFILING") == "true" {
        go func() {
            for {
                f, err := os.Create(fmt.Sprintf("cpu-profile-%d.prof", time.Now().Unix()))
                if err != nil {
                    log.Printf("Could not create CPU profile: %v", err)
                    time.Sleep(30 * time.Second)
                    continue
                }
                pprof.StartCPUProfile(f)
                time.Sleep(30 * time.Second)
                pprof.StopCPUProfile()
                f.Close()
                // Upload to object storage for analysis
                uploadProfile(f.Name())
            }
        }()
    }
}
Monitoring and Alerting:
# Prometheus rules for Go service CPU monitoring
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: go-service-cpu-alerts
spec:
  groups:
    - name: go-service-performance
      rules:
        - alert: GoServiceHighCPU
          expr: |
            (
              rate(container_cpu_usage_seconds_total{pod=~"go-service-.*"}[5m]) 
            ) > 0.8
          for: 5m
          labels:
            severity: warning
            service: go-service
          annotations:
            summary: "Go service CPU usage above 80%"
            description: "Pod {{ $labels.pod }} CPU usage is {{ $value }}%"
        - alert: GoServiceGoroutineLeak
          expr: |
            go_goroutines{job="go-service"} > 10000
          for: 10m
          labels:
            severity: critical
          annotations:
            summary: "Potential goroutine leak detected"
        - alert: GoServiceGCPressure
          expr: |
            rate(go_gc_duration_seconds_sum[5m]) > 0.1
          for: 5m
          labels:
            severity: warning
          annotations:
            summary: "High GC pressure in Go service"
            description: "GC taking {{ $value }}s per collection cycle"
        - alert: GoServiceMemoryLeak
          expr: |
            go_memstats_heap_inuse_bytes / go_memstats_heap_sys_bytes > 0.9
          for: 15m
          labels:
            severity: critical
          annotations:
            summary: "Potential memory leak in Go service"
Performance Testing and Validation:
// Benchmark tests to validate optimizations
func BenchmarkProcessRequestSlow(b *testing.B) {
    data := generateTestData(1000)
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        processRequestSlow(data)
    }
}
func BenchmarkProcessRequestFast(b *testing.B) {
    data := generateTestData(1000)
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        processRequestFast(data)
    }
}
// Run benchmarks with memory profiling
// go test -bench=. -benchmem -cpuprofile=cpu.prof -memprofile=mem.prof
// CPU profiling analysis script
func analyzeCPUProfile() {
    // go tool pprof cpu.prof
    // Commands in pprof:
    // (pprof) top20          - Show top 20 CPU consumers
    // (pprof) list function  - Show source code with CPU usage
    // (pprof) web           - Generate web visualization
    // (pprof) flamegraph    - Generate flame graph
}
26. Microservices vs Monolithic Architecture
Strong Answer: Car Pool vs Train Analogy Explanation:
Microservices (Car Pool Approach):
🚗 🚗 🚗 🚗 🚗  (Independent cars)
Each car (service) can:
- Take different routes
- Stop independently
- Break down without affecting others
- Scale by adding more cars
- Use different fuel types (technologies)
Monolithic (Train Approach):
🚂-🚃-🚃-🚃-🚃  (Connected train)
The train (application):
- All cars must follow the same route
- If engine fails, entire train stops
- All cars must move together
- Scale by making train longer or faster
- Single fuel type for entire train
For Analytics Dashboard - Decision Framework:
# Decision matrix for architecture choice
class ArchitectureDecision:
    def __init__(self):
        self.factors = {
            'team_size': 0,
            'complexity': 0,
            'scalability_needs': 0,
            'technology_diversity': 0,
            'deployment_frequency': 0,
            'operational_maturity': 0
        }
    def assess_microservices_fit(self, dashboard_requirements):
        """Assess if microservices are appropriate"""
        # Analytics dashboard components
        services = {
            'metrics_collector': {
                'responsibility': 'Collect metrics from various sources',
                'scalability': 'High - handles high volume ingestion',
                'technology': 'Go - for performance'
            },
            'data_processor': {
                'responsibility': 'Process and aggregate metrics',
                'scalability': 'Medium - CPU intensive operations',
                'technology': 'Python - for data processing libraries'
            },
            'api_gateway': {
                'responsibility': 'Serve dashboard APIs',
                'scalability': 'High - many concurrent users',
                'technology': 'Node.js - for async I/O'
            },
            'notification_service': {
                'responsibility': 'Send alerts and notifications',
                'scalability': 'Low - occasional alerts',
                'technology': 'Python - for integrations'
            },
            'frontend_bff': {
                'responsibility': 'Backend for Frontend',
                'scalability': 'Medium - aggregates data for UI',
                'technology': 'React/TypeScript'
            }
        }
        return self.evaluate_services(services)
    def evaluate_services(self, services):
        """Evaluate microservices approach"""
        microservices_benefits = [
            "Independent scaling per service",
            "Technology diversity (Go, Python, Node.js)",
            "Team autonomy - different teams own different services",
            "Fault isolation - metrics collection failure doesn't break UI",
            "Independent deployments - can update notification without affecting API"
        ]
        microservices_challenges = [
            "Network latency between services",
            "Data consistency across services",
            "Distributed system complexity",
            "Service discovery and load balancing",
            "Monitoring and debugging across services"
        ]
        return {
            'benefits': microservices_benefits,
            'challenges': microservices_challenges,
            'recommendation': self.make_recommendation()
        }
    def make_recommendation(self):
        """Make architecture recommendation for analytics dashboard"""
        # For analytics dashboard specifically:
        if self.is_early_stage():
            return {
                'choice': 'MODULAR_MONOLITH',
                'reason': 'Start simple, can extract services later',
                'structure': self.modular_monolith_structure()
            }
        else:
            return {
                'choice': 'MICROSERVICES',
                'reason': 'Scale and team benefits outweigh complexity',
                'structure': self.microservices_structure()
            }
    def modular_monolith_structure(self):
        """Modular monolith approach - best of both worlds"""
        return {
            'structure': """
            analytics-dashboard/
            ├── cmd/                    # Application entry points
            ├── internal/
            │   ├── metrics/           # Metrics collection module
            │   ├── processing/        # Data processing module
            │   ├── api/              # API handling module
            │   ├── notifications/    # Alert module
            │   └── dashboard/        # UI serving module
            ├── pkg/                   # Shared libraries
            └── deployments/          # Single deployment unit
            """,
            'benefits': [
                'Single deployment and testing',
                'Easier debugging and development',
                'No network latency between modules',
                'Simpler operational overhead',
                'Can extract to microservices later'
            ]
        }
    def microservices_structure(self):
        """Full microservices approach"""
        return {
            'structure': """
            Analytics Platform Microservices:
            ┌─────────────────┐    ┌─────────────────┐
            │  Frontend SPA   │    │   API Gateway   │
            │    (React)      │◄──►│    (Kong)       │
            └─────────────────┘    └─────────┬───────┘
                                             │
                        ┌────────────────────┼────────────────────┐
                        │                    │                    │
            ┌───────────▼────────┐ ┌─────────▼─────────┐ ┌─────────▼─────────┐
            │  Metrics Collector │ │  Data Processor   │ │ Notification Svc  │
            │      (Go)          │ │    (Python)       │ │    (Python)       │
            └─────────┬ ──────────┘ └─────────┬─────────┘ └───────────────────┘
                      │                      │
            ┌─────────▼──────────────────────▼─────────┐
            │           Message Queue                  │
            │           (Kafka/Redis)                  │
            └──────────────────────────────────────────┘
            """,
            'communication': 'Async messaging + HTTP APIs',
            'data_strategy': 'Event sourcing with CQRS'
        }
    def is_early_stage(self):
        """Determine if project is in early stage"""
        return (
            self.factors['team_size'] < 10 and
            self.factors['operational_maturity'] < 3
        )
# Usage for analytics dashboard
decision = ArchitectureDecision()
recommendation = decision.assess_microservices_fit({
    'expected_users': 500,
    'data_volume': 'high',
    'real_time_requirements': True,
    'team_size': 8,
    'deployment_frequency': 'daily'
})
Implementation Examples:
Microservices Implementation:
// Metrics Collector Service (Go)
package main
type MetricsCollector struct {
    kafka    *kafka.Producer
    redis    *redis.Client
    handlers map[string]MetricHandler
}
func (mc *MetricsCollector) CollectMetric(metric Metric) error {
    // Process metric
    processed := mc.handlers[metric.Type].Process(metric)
    // Publish to message queue for other services
    return mc.kafka.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{
            Topic:     &metric.Type,
            Partition: kafka.PartitionAny,
        },
        Value: processed.ToJSON(),
    }, nil)
}
# Data Processor Service (Python)
import asyncio
from kafka import KafkaConsumer
class DataProcessor:
    def __init__(self):
        self.consumer = KafkaConsumer(
            'metrics-topic',
            bootstrap_servers=['kafka:9092'],
            group_id='data-processors'
        )
    async def process_metrics(self):
        for message in self.consumer:
            metric = Metric.from_json(message.value)
            # Process and aggregate
            aggregated = await self.aggregate_metric(metric)
            # Store in time-series database
            await self.store_metric(aggregated)
            # Trigger alerts if needed
            await self.check_alerts(aggregated)
Monolithic Implementation:
# Modular Monolith (Python)
from dashboard.modules import metrics, processing, api, notifications
class AnalyticsDashboard:
    def __init__(self):
        self.metrics = metrics.MetricsModule()
        self.processor = processing.ProcessingModule()
        self.api = api.APIModule()
        self.notifications = notifications.NotificationModule()
    def handle_metric(self, raw_metric):
        # All in same process - no network calls
        metric = self.metrics.parse(raw_metric)
        processed = self.processor.aggregate(metric)
        # Check for alerts
        if self.processor.check_thresholds(processed):
            self.notifications.send_alert(processed)
        return processed
# Single deployment with clear module boundaries
# Can be extracted to separate services later
When to Choose Each Approach:
| Factor | Monolith | Microservices | 
|---|---|---|
| Team Size | < 10 developers | > 10 developers | 
| Complexity | Simple-medium | Complex domain | 
| Scale | < 1M requests/day | > 10M requests/day | 
| Technology | Single stack preferred | Multiple technologies needed | 
| Deployment | Weekly/monthly | Multiple times per day | 
| Data Consistency | Strong consistency needed | Eventual consistency OK | 
For Analytics Dashboard Specifically:
- Early Stage: Start with modular monolith
 - Growth Stage: Extract high-scale components (metrics collector) first
 - Mature Stage: Full microservices with proper DevOps practices
 
27. Database Sharding Implementation
Strong Answer: Library Analogy Explanation:
📚 Traditional Database (Single Library):
All books in one building - gets crowded, hard to find books,
long queues at checkout
📚📚📚 Sharded Database (Multiple Library Branches):
Books distributed across locations:
- Fiction Library: Books A-H
- Science Library: Books I-P
- History Library: Books Q-Z
Each library (shard) operates independently but part of same system
Sharding Strategy for Analytics Dashboard:
# Database sharding implementation
import hashlib
import datetime
from typing import Dict, List
class DatabaseSharding:
    def __init__(self):
        self.shards = {
            'shard_americas': {
                'host': 'db-americas.example.com',
                'regions': ['us', 'ca', 'mx', 'br'],
                'connection': self.create_connection('db-americas')
            },
            'shard_europe': {
                'host': 'db-europe.example.com',
                'regions': ['uk', 'de', 'fr', 'es'],
                'connection': self.create_connection('db-europe')
            },
            'shard_asia': {
                'host': 'db-asia.example.com',
                'regions': ['jp', 'sg', 'au', 'in'],
                'connection': self.create_connection('db-asia')
            }
        }
        # Time-based sharding for metrics
        self.time_shards = {
            'metrics_current': 'Last 7 days - hot data',
            'metrics_recent': 'Last 30 days - warm data',
            'metrics_archive': 'Older than 30 days - cold data'
        }
    def get_user_shard(self, user_id: str) -> str:
        """Determine shard based on user ID hash"""
        hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
        shard_index = hash_value % len(self.shards)
        return list(self.shards.keys())[shard_index]
    def get_region_shard(self, region: str) -> str:
        """Determine shard based on geographical region"""
        for shard_name, shard_info in self.shards.items():
            if region.lower() in shard_info['regions']:
                return shard_name
        return 'shard_americas'  # Default fallback
    def get_time_shard(self, timestamp: datetime.datetime) -> str:
        """Determine shard based on data age"""
        now = datetime.datetime.now()
        age = now - timestamp
        if age.days <= 7:
            return 'metrics_current'
        elif age.days <= 30:
            return 'metrics_recent'
        else:
            return 'metrics_archive'
    def route_query(self, query_type: str, **kwargs):
        """Route queries to appropriate shard"""
        if query_type == 'user_orders':
            shard = self.get_user_shard(kwargs['user_id'])
            return self.execute_query(shard, query_type, **kwargs)
        elif query_type == 'regional_metrics':
            shard = self.get_region_shard(kwargs['region'])
            return self.execute_query(shard, query_type, **kwargs)
        elif query_type == 'historical_data':
            # Query multiple time shards and aggregate
            return self.query_time_shards(kwargs['start_date'], kwargs['end_date'])
        elif query_type == 'cross_shard_analytics':
            # Fan-out query to all shards
            return self.fan_out_query(query_type, **kwargs)
    def query_time_shards(self, start_date, end_date):
        """Query across time-based shards"""
        results = []
        for shard_name in self.time_shards.keys():
            try:
                shard_result = self.execute_query(shard_name, 'time_range_query',
                                                start_date=start_date, end_date=end_date)
                results.extend(shard_result)
            except ShardUnavailableError:
                # Handle shard failures gracefully
                self.log_shard_failure(shard_name)
                continue
        return self.aggregate_results(results)
    def fan_out_query(self, query_type: str, **kwargs):
        """Execute query across all shards and aggregate results"""
        import concurrent.futures
        results = {}
        with concurrent.futures.ThreadPoolExecutor(max_workers=len(self.shards)) as executor:
            # Submit queries to all shards concurrently
            future_to_shard = {
                executor.submit(self.execute_query, shard_name, query_type, **kwargs): shard_name
                for shard_name in self.shards.keys()
            }
            for future in concurrent.futures.as_completed(future_to_shard):
                shard_name = future_to_shard[future]
                try:
                    result = future.result(timeout=30)  # 30 second timeout
                    results[shard_name] = result
                except Exception as e:
                    self.log_query_failure(shard_name, e)
                    results[shard_name] = None
        return self.aggregate_cross_shard_results(results)
# Shard-aware query examples
class ShardedAnalyticsQueries:
    def __init__(self, sharding: DatabaseSharding):
        self.sharding = sharding
    def get_user_orders(self, user_id: str):
        """Get orders for specific user"""
        return self.sharding.route_query('user_orders', user_id=user_id)
    def get_regional_sales(self, region: str, date_range: tuple):
        """Get sales data for specific region"""
        return self.sharding.route_query('regional_metrics',
                                       region=region,
                                       start_date=date_range[0],
                                       end_date=date_range[1])
    def get_global_metrics(self, metric_type: str):
        """Get global metrics across all shards"""
        return self.sharding.route_query('cross_shard_analytics',
                                       metric_type=metric_type)
    def get_historical_trends(self, days_back: int):
        """Get historical data across time shards"""
        end_date = datetime.datetime.now()
        start_date = end_date - datetime.timedelta(days=days_back)
        return self.sharding.query_time_shards(start_date, end_date)
Sharding Implementation with PostgreSQL:
-- Create shard-specific tables
-- Shard 1: Americas
CREATE TABLE orders_americas (
    id UUID PRIMARY KEY,
    user_id UUID NOT NULL,
    region VARCHAR(2) CHECK (region IN ('US', 'CA', 'MX', 'BR')),
    order_total DECIMAL(10,2),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Shard 2: Europe
CREATE TABLE orders_europe (
    id UUID PRIMARY KEY,
    user_id UUID NOT NULL,
    region VARCHAR(2) CHECK (region IN ('UK', 'DE', 'FR', 'ES')),
    order_total DECIMAL(10,2),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Shard 3: Asia
CREATE TABLE orders_asia (
    id UUID PRIMARY KEY,
    user_id UUID NOT NULL,
    region VARCHAR(2) CHECK (region IN ('JP', 'SG', 'AU', 'IN')),
    order_total DECIMAL(10,2),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Create foreign data wrapper for cross-shard queries
CREATE EXTENSION postgres_fdw;
CREATE SERVER shard_europe
    FOREIGN DATA WRAPPER postgres_fdw
    OPTIONS (host 'db-europe.example.com', port '5432', dbname 'analytics');
CREATE USER MAPPING FOR postgres
    SERVER shard_europe
    OPTIONS (user 'analytics_user', password 'password');
-- Create foreign tables
CREATE FOREIGN TABLE orders_europe_remote (
    id UUID,
    user_id UUID,
    region VARCHAR(2),
    order_total DECIMAL(10,2),
    created_at TIMESTAMP WITH TIME ZONE
)
SERVER shard_europe
OPTIONS (schema_name 'public', table_name 'orders_europe');
-- View for cross-shard queries
CREATE VIEW orders_global AS
    SELECT 'americas' as shard, * FROM orders_americas
    UNION ALL
    SELECT 'europe' as shard, * FROM orders_europe_remote
    UNION ALL
    SELECT 'asia' as shard, * FROM orders_asia_remote;
Sharding Middleware in Application:
# Application-level sharding middleware
class ShardingMiddleware:
    def __init__(self, app):
        self.app = app
        self.sharding = DatabaseSharding()
    def __call__(self, environ, start_response):
        # Extract sharding context from request
        request = Request(environ)
        # Determine shard based on request
        if 'user_id' in request.args:
            shard = self.sharding.get_user_shard(request.args['user_id'])
            environ['DATABASE_SHARD'] = shard
        elif 'region' in request.args:
            shard = self.sharding.get_region_shard(request.args['region'])
            environ['DATABASE_SHARD'] = shard
        else:
            # Cross-shard query required
            environ['DATABASE_SHARD'] = 'cross_shard'
        return self.app(environ, start_response)
# Flask route with shard awareness
@app.route('/api/user/<user_id>/orders')
def get_user_orders(user_id):
    shard = request.environ.get('DATABASE_SHARD')
    if shard == 'cross_shard':
        # This shouldn't happen for user-specific queries
        abort(400, "Invalid request - user_id required")
    # Use shard-specific connection
    db = get_shard_connection(shard)
    orders = db.execute(
        "SELECT * FROM orders WHERE user_id = %s ORDER BY created_at DESC",
        (user_id,)
    )
    return jsonify(orders)
@app.route('/api/analytics/global')
def get_global_analytics():
    # Cross-shard query - aggregate from all shards
    sharding = DatabaseSharding()
    results = sharding.fan_out_query('global_analytics')
    return jsonify({
        'total_orders': sum(r['order_count'] for r in results.values()),
        'total_revenue': sum(r['revenue'] for r in results.values()),
        'by_region': results
    })
Handling Shard Failures and Rebalancing:
class ShardManager:
    def __init__(self):
        self.shards = DatabaseSharding().shards
        self.health_check_interval = 30  # seconds
    def monitor_shard_health(self):
        """Continuously monitor shard health"""
        while True:
            for shard_name, shard_info in self.shards.items():
                try:
                    # Simple health check query
                    conn = shard_info['connection']
                    conn.execute("SELECT 1")
                    self.mark_shard_healthy(shard_name)
                except Exception as e:
                    self.mark_shard_unhealthy(shard_name, e)
                    self.trigger_failover(shard_name)
            time.sleep(self.health_check_interval)
    def trigger_failover(self, failed_shard: str):
        """Handle shard failover"""
        # Redirect traffic to healthy shards
        self.redistribute_load(failed_shard)
        # Alert operations team
        self.send_alert(f"Shard {failed_shard} is unhealthy")
        # Attempt automated recovery
        self.attempt_shard_recovery(failed_shard)
    def rebalance_shards(self, new_shard_config: Dict):
        """Rebalance data across shards"""
        # This is a complex operation requiring:
        # 1. Data migration planning
        # 2. Consistent hashing updates
        # 3. Gradual traffic shifting
        # 4. Rollback capability
        migration_plan = self.create_migration_plan(new_shard_config)
        for step in migration_plan:
            self.execute_migration_step(step)
            self.verify_migration_step(step)
        self.update_shard_routing(new_shard_config)
Benefits and Challenges:
Benefits:
- Horizontal Scalability: Add more shards as data grows
 - Performance: Smaller datasets per shard = faster queries
 - Isolation: Shard failures don't affect other shards
 - Geographic Distribution: Data close to users
 
Challenges:
- Cross-shard Queries: Complex and slower
 - Rebalancing: Moving data between shards is difficult
 - Consistency: Transactions across shards are complex
 - Operational Complexity: Multiple databases to manage
 
For Analytics Dashboard:
- User-based sharding: For user-specific dashboards
 - Time-based sharding: For historical data (hot/warm/cold)
 - Feature-based sharding: Separate shards for different metrics types
 
Part 5: Behavioral & Collaboration
30. Convincing Developers - Code Reliability Changes
Strong Answer: Situation: Our payment processing service was experiencing intermittent failures during peak traffic, causing revenue loss. The development team had implemented a quick fix that worked locally but didn't address the underlying concurrency issues.
Approach - Data-Driven Persuasion:
1. Quantified the Business Impact
# I create# SRE Interview Questions - Comprehensive Answer Guide
## Part 1: SRE Fundamentals & Practices
### 1. What is the difference between SRE and traditional operations, and how do you balance reliability with feature velocity?
**Strong Answer:**
SRE differs from traditional ops in several key ways:
- **Proactive vs Reactive**: SRE focuses on preventing issues through engineering rather than just responding to them
- **Error Budgets**: We quantify acceptable unreliability, allowing teams to move fast while maintaining reliability targets
- **Automation**: SRE emphasizes eliminating toil through automation and self-healing systems
- **Shared Ownership**: Development and operations work together using the same tools and metrics
**Balancing reliability with velocity:**
- Set clear SLIs/SLOs with stakeholders (e.g., 99.9% uptime = 43 minutes downtime/month)
- Use error budgets as a shared currency - if we're within budget, dev teams can deploy faster
- When error budget is exhausted, focus shifts to reliability work
- Implement gradual rollouts and feature flags to reduce blast radius
**Follow-up - Implementing error budgets with resistant teams:**
- Start with education - show how error budgets enable faster delivery
- Use concrete examples of downtime costs vs delayed features
- Begin with lenient budgets and tighten over time
- Make error budget status visible in dashboards and planning meetings
### 2. Explain the four golden signals of monitoring. How would you implement alerting around these for a Python microservice?
**Strong Answer:**
The four golden signals are:
1. **Latency**: Time to process requests
2. **Traffic**: Demand on your system (requests/second)
3. **Errors**: Rate of failed requests
4. **Saturation**: How "full" your service is (CPU, memory, I/O)
**Implementation for Python microservice:**
```python
# Using Prometheus with Flask
from flask import Flask, request
from prometheus_client import Counter, Histogram, Gauge, generate_latest
import time
import psutil
app = Flask(__name__)
# Metrics
REQUEST_COUNT = Counter('http_requests_total', 'Total HTTP requests', ['method', 'endpoint', 'status'])
REQUEST_LATENCY = Histogram('http_request_duration_seconds', 'HTTP request latency')
ERROR_RATE = Counter('http_errors_total', 'Total HTTP errors', ['status'])
CPU_USAGE = Gauge('cpu_usage_percent', 'CPU usage percentage')
@app.before_request
def before_request():
    request.start_time = time.time()
@app.after_request
def after_request(response):
    latency = time.time() - request.start_time
    REQUEST_LATENCY.observe(latency)
    REQUEST_COUNT.labels(request.method, request.endpoint, response.status_code).inc()
    if response.status_code >= 400:
        ERROR_RATE.labels(response.status_code).inc()
    CPU_USAGE.set(psutil.cpu_percent())
    return response
Alerting Rules:
- Latency: Alert if p99 > 500ms for 5 minutes
 - Traffic: Alert on 50% increase/decrease from baseline
 - Errors: Alert if error rate > 1% for 2 minutes
 - Saturation: Alert if CPU > 80% or Memory > 85% for 10 minutes
 
3. Walk me through how you would conduct a post-mortem for a production incident.
Strong Answer: Timeline:
- Immediate: Focus on resolution, collect logs/metrics during incident
 - Within 24-48 hours: Conduct post-mortem meeting
 - Within 1 week: Publish written post-mortem and track action items
 
Post-mortem Process:
- Timeline Construction: Build detailed timeline with all events, decisions, and communications
 - Root Cause Analysis: Use techniques like "5 Whys" or Fishbone diagrams
 - Impact Assessment: Quantify user impact, revenue loss, SLO burn
 - Action Items: Focus on systemic fixes, not individual blame
 - Follow-up: Track action items to completion
 
Good Post-mortem Characteristics:
- Blameless culture - focus on systems, not individuals
 - Detailed timeline with timestamps
 - Clear root cause analysis
 - Actionable remediation items with owners and deadlines
 - Written in accessible language for all stakeholders
 - Includes what went well (not just failures)
 
Psychological Safety:
- Use "the system allowed..." instead of "person X did..."
 - Ask "how can we make this impossible to happen again?"
 - Celebrate people who surface problems early
 - Make post-mortems learning opportunities, not punishment
 
4. You notice your application's 99th percentile latency has increased by 50ms over the past week, but the average latency remains the same. How would you investigate this?
Strong Answer: This suggests a long tail problem - most requests are fine, but some are much slower.
Investigation Steps:
- Check Request Distribution: Look at latency histograms - are we seeing bimodal distribution?
 - Analyze Traffic Patterns: Has the mix of request types changed? Are we getting more complex queries?
 - Database Performance: Check for slow queries, table locks, or index problems
 - Resource Saturation: Look for memory pressure, GC pauses, or I/O bottlenecks during peak times
 - Dependency Analysis: Check latency of downstream services - could be cascading slow responses
 - Code Changes: Review recent deployments for inefficient algorithms or new features
 
Specific Checks:
- Database slow query logs
 - Application profiling data
 - Memory usage patterns and GC metrics
 - Thread pool utilization
 - External API response times
 - Distributed tracing for slow requests
 
Tools: Use APM tools like New Relic, DataDog, or distributed tracing with Jaeger/Zipkin to identify bottlenecks.
5. Design a monitoring strategy for a Go-based API that processes financial transactions.
Strong Answer: Business Metrics:
- Transaction volume and value per minute
 - Success rate by transaction type
 - Time to settlement
 - Regulatory compliance metrics (PCI DSS)
 
Technical Metrics:
// Key metrics to track
var (
    transactionCounter = prometheus.NewCounterVec(
        prometheus.CounterOpts{Name: "transactions_total"},
        []string{"type", "status", "payment_method"})
    transactionLatency = prometheus.NewHistogramVec(
        prometheus.HistogramOpts{Name: "transaction_duration_seconds"},
        []string{"type"})
    queueDepth = prometheus.NewGauge(
        prometheus.GaugeOpts{Name: "transaction_queue_depth"})
    dbConnectionPool = prometheus.NewGauge(
        prometheus.GaugeOpts{Name: "db_connections_active"})
)
Logging Strategy:
- Structured logging with correlation IDs
 - Log all transaction state changes
 - Security events (failed auth, suspicious patterns)
 - Audit trail for compliance
 
Alerting:
- Transaction failure rate > 0.1%
 - Processing latency > 2 seconds
 - Queue depth > 1000 items
 - Database connection pool > 80% utilization
 - Any security-related events
 
Compliance Considerations:
- PII data must be masked in logs
 - Audit logs with tamper-proof storage
 - Real-time fraud detection alerts
 
6. How would you implement distributed tracing across a system with Python backend services and a React frontend?
Strong Answer: Architecture:
- Use OpenTelemetry standard for vendor-neutral tracing
 - Jaeger or Zipkin as tracing backend
 - Trace context propagation across service boundaries
 
Backend Implementation (Python):
from opentelemetry import trace
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
# Auto-instrument Flask and requests
FlaskInstrumentor().instrument_app(app)
RequestsInstrumentor().instrument()
tracer = trace.get_tracer(__name__)
@app.route('/api/user/<user_id>')
def get_user(user_id):
    with tracer.start_as_current_span("get_user") as span:
        span.set_attribute("user.id", user_id)
        # Service logic here
        return response
Frontend Implementation (React):
import { WebTracerProvider } from "@opentelemetry/web"
import { getWebAutoInstrumentations } from "@opentelemetry/auto-instrumentations-web"
const provider = new WebTracerProvider()
provider.addSpanProcessor(new BatchSpanProcessor(new JaegerExporter()))
// Auto-instrument fetch, XMLHttpRequest
registerInstrumentations({
  instrumentations: [getWebAutoInstrumentations()],
})
Key Implementation Points:
- Correlation IDs: Pass trace context in HTTP headers
 - Sampling: Use probabilistic sampling (1-10%) to reduce overhead
 - Service Map: Visualize service dependencies
 - Performance Analysis: Identify bottlenecks across service boundaries
 
Part 2: Software Engineering & Development
7. Code Review Scenario: Memory leak optimization
Strong Answer: Problems with the original code:
- Loads entire dataset into memory before processing
 - No streaming or chunked processing
 - Memory usage grows linearly with file size
 
Optimized version:
def process_large_dataset(file_path, chunk_size=1000):
    """Process large dataset in chunks to manage memory usage."""
    results = []
    with open(file_path, 'r') as f:
        chunk = []
        for line in f:
            chunk.append(line.strip())
            if len(chunk) >= chunk_size:
                # Process chunk and yield results
                processed_chunk = [expensive_processing(item) for item in chunk]
                partial_result = analyze_data(processed_chunk)
                results.append(partial_result)
                # Clear chunk to free memory
                chunk.clear()
        # Process remaining items
        if chunk:
            processed_chunk = [expensive_processing(item) for item in chunk]
            partial_result = analyze_data(processed_chunk)
            results.append(partial_result)
    return combine_results(results)
# Even better - use generator for streaming
def process_large_dataset_streaming(file_path):
    """Stream processing for minimal memory footprint."""
    with open(file_path, 'r') as f:
        for line in f:
            yield expensive_processing(line.strip())
# Usage
def analyze_streaming_data(file_path):
    processed_items = process_large_dataset_streaming(file_path)
    return analyze_data_streaming(processed_items)
Additional Optimizations:
- Use 
mmapfor very large files - Implement backpressure if processing can't keep up
 - Add memory monitoring and circuit breakers
 - Consider using 
asynciofor I/O-bound operations 
8. In Go, explain the difference between buffered and unbuffered channels.
Strong Answer: Unbuffered Channels:
- Synchronous communication - sender blocks until receiver reads
 - Zero capacity - no internal storage
 - Guarantees handoff between goroutines
 
ch := make(chan int) // unbuffered
go func() {
    ch <- 42 // blocks until someone reads
}()
value := <-ch // blocks until someone sends
Buffered Channels:
- Asynchronous communication up to buffer size
 - Sender only blocks when buffer is full
 - Receiver only blocks when buffer is empty
 
ch := make(chan int, 3) // buffered with capacity 3
ch <- 1 // doesn't block
ch <- 2 // doesn't block
ch <- 3 // doesn't block
ch <- 4 // blocks - buffer full
When to use in high-throughput systems:
Unbuffered for:
- Strict synchronization requirements
 - Request-response patterns
 - When you need guaranteed delivery confirmation
 - Worker pools where you want backpressure
 
Buffered for:
- Producer-consumer with different rates
 - Batching operations
 - Reducing contention in high-throughput scenarios
 - Event streaming where some loss is acceptable
 
Example - High-throughput log processor:
// Buffered for log ingestion
logChan := make(chan LogEntry, 10000)
// Unbuffered for critical operations
errorChan := make(chan error)
func logProcessor() {
    for {
        select {
        case log := <-logChan:
            processLog(log)
        case err := <-errorChan:
            handleCriticalError(err) // immediate attention
        }
    }
}
9. React Performance: Optimize dashboard with real-time metrics
Strong Answer: Problems with frequent re-renders:
- All components re-render when any metric updates
 - Expensive calculations on every render
 - DOM thrashing from rapid updates
 
Optimization Strategy:
import React, { memo, useMemo, useCallback, useRef } from "react"
import { useVirtualizer } from "@tanstack/react-virtual"
// 1. Memoize metric components
const MetricCard = memo(({ metric, value, threshold }) => {
  // Only re-render when props actually change
  const status = useMemo(
    () => (value > threshold ? "critical" : "normal"),
    [value, threshold]
  )
  return (
    <div className={`metric-card ${status}`}>
      <h3>{metric}</h3>
      <span>{value}</span>
    </div>
  )
})
// 2. Virtualize large lists
const MetricsList = ({ metrics }) => {
  const parentRef = useRef()
  const virtualizer = useVirtualizer({
    count: metrics.length,
    getScrollElement: () => parentRef.current,
    estimateSize: () => 100,
  })
  return (
    <div ref={parentRef} style={{ height: "400px", overflow: "auto" }}>
      {virtualizer.getVirtualItems().map((virtualRow) => (
        <MetricCard key={virtualRow.key} {...metrics[virtualRow.index]} />
      ))}
    </div>
  )
}
// 3. Debounce updates and batch state changes
const Dashboard = () => {
  const [metrics, setMetrics] = useState({})
  const updateQueue = useRef(new Map())
  const flushTimeout = useRef()
  const queueUpdate = useCallback((serviceName, newMetrics) => {
    updateQueue.current.set(serviceName, newMetrics)
    // Debounce updates - batch multiple rapid changes
    clearTimeout(flushTimeout.current)
    flushTimeout.current = setTimeout(() => {
      setMetrics((prev) => {
        const updates = Object.fromEntries(updateQueue.current)
        updateQueue.current.clear()
        return { ...prev, ...updates }
      })
    }, 100) // 100ms debounce
  }, [])
  // 4. Use React.startTransition for non-critical updates
  const handleMetricUpdate = useCallback(
    (data) => {
      if (data.priority === "high") {
        queueUpdate(data.service, data.metrics)
      } else {
        startTransition(() => {
          queueUpdate(data.service, data.metrics)
        })
      }
    },
    [queueUpdate]
  )
  return <MetricsList metrics={Object.values(metrics)} />
}
Additional Optimizations:
- WebSocket connection pooling - single connection for all metrics
 - Data normalization - structure data to minimize re-renders
 - Service Worker - offload heavy calculations
 - Canvas/WebGL - for complex visualizations instead of DOM updates
 
10. Design a CI/CD pipeline for a multi-service application
Strong Answer: Pipeline Architecture:
# .github/workflows/main.yml
name: Multi-Service CI/CD
on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]
jobs:
  detect-changes:
    runs-on: ubuntu-latest
    outputs:
      python-api: ${{ steps.changes.outputs.python-api }}
      go-workers: ${{ steps.changes.outputs.go-workers }}
      react-frontend: ${{ steps.changes.outputs.react-frontend }}
    steps:
      - uses: actions/checkout@v3
      - uses: dorny/paths-filter@v2
        id: changes
        with:
          filters: |
            python-api:
              - 'services/api/**'
            go-workers:
              - 'services/workers/**'
            react-frontend:
              - 'frontend/**'
  test-python:
    needs: detect-changes
    if: needs.detect-changes.outputs.python-api == 'true'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Run Python tests
        run: |
          cd services/api
          pip install -r requirements.txt
          pytest --cov=. --cov-report=xml
          flake8 .
          mypy .
  test-go:
    needs: detect-changes
    if: needs.detect-changes.outputs.go-workers == 'true'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-go@v3
        with:
          go-version: "1.21"
      - name: Run Go tests
        run: |
          cd services/workers
          go test -race -coverprofile=coverage.out ./...
          go vet ./...
          staticcheck ./...
  deploy:
    needs: [test-python, test-go, test-react]
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - name: Deploy with blue-green
        run: |
          # Database migration strategy
          kubectl apply -f k8s/migration-job.yaml
          kubectl wait --for=condition=complete job/db-migration
          # Deploy new version to green environment
          helm upgrade app-green ./helm-chart \
            --set image.tag=${{ github.sha }} \
            --set environment=green
          # Health check green environment
          ./scripts/health-check.sh green
          # Switch traffic to green
          kubectl patch service app-service -p \
            '{"spec":{"selector":{"version":"green"}}}'
          # Verify traffic switch
          ./scripts/verify-deployment.sh
          # Clean up blue environment
          helm uninstall app-blue
Database Migration Strategy:
#!/bin/bash
# scripts/db-migration.sh
# Create backup
pg_dump $DATABASE_URL > backup-$(date +%Y%m%d-%H%M%S).sql
# Run migrations in transaction
psql $DATABASE_URL << EOF
BEGIN;
-- Run all pending migrations
\i migrations/001_add_user_table.sql
\i migrations/002_add_index.sql
-- Verify migration success
SELECT count(*) FROM schema_migrations;
COMMIT;
EOF
# Test rollback capability
if [ "$1" == "test-rollback" ]; then
    psql $DATABASE_URL < backup-latest.sql
fi
Rollback Strategy:
- Keep previous 3 versions in registry
 - Database rollback scripts for each migration
 - Feature flags to disable new features quickly
 - Automated rollback triggers on error rate increase
 
11. How would you implement blue-green deployments for a stateful service with a database?
Strong Answer: Challenges with Stateful Services:
- Database schema changes
 - Data consistency during switch
 - Connection management
 - State synchronization
 
Implementation Strategy:
# Blue-Green with Database
apiVersion: v1
kind: Service
metadata:
  name: app-active
spec:
  selector:
    app: myapp
    version: blue # Will switch to green
  ports:
    - port: 80
---
# Database proxy for connection management
apiVersion: apps/v1
kind: Deployment
metadata:
  name: db-proxy
spec:
  template:
    spec:
      containers:
        - name: pgbouncer
          image: pgbouncer/pgbouncer
          env:
            - name: DATABASES_HOST
              value: "postgres-primary"
            - name: POOL_MODE
              value: "transaction" # Allow connection switching
Deployment Process:
- 
Pre-deployment:
- Run backward-compatible schema migrations
 - Ensure both versions can operate with new schema
 
 - 
Green Deployment:
- Deploy green version with same database
 - Warm up green instances (cache, connections)
 - Run health checks
 
 - 
Traffic Switch:
- Update service selector to point to green
 - Monitor metrics for 10-15 minutes
 - Keep blue running for quick rollback
 
 - 
Post-deployment:
- Run cleanup migrations (remove old columns)
 - Terminate blue environment
 
 
Database Migration Strategy:
-- Phase 1: Additive changes (safe for both versions)
ALTER TABLE users ADD COLUMN email_verified BOOLEAN DEFAULT FALSE;
CREATE INDEX CONCURRENTLY idx_users_email ON users(email);
-- Phase 2: After green is stable, remove old columns
-- ALTER TABLE users DROP COLUMN old_email_field;
Rollback Plan:
- Revert service selector to blue
 - Emergency database rollback scripts
 - Circuit breaker to stop problematic requests
 
Part 3: System Design Deep Dive
12. Requirements Gathering Questions
Strong Answer: Functional Requirements:
- What specific metrics need to be displayed? (orders/minute, revenue, concurrent users)
 - How real-time? (sub-second, few seconds, minute-level updates)
 - What user roles need access? (executives, ops teams, developers)
 - What actions can users take? (view-only, alerts, drill-down)
 - Geographic distribution of users?
 
Non-Functional Requirements:
- Scale: How many concurrent dashboard users? (100s, 1000s)
 - Data volume: Orders per day? Peak traffic? Data retention period?
 - Availability: 99.9% or higher? Maintenance windows?
 - Latency: How fast should dashboard updates be?
 - Consistency: Can we show slightly stale data? (eventual consistency)
 - Security: Authentication, authorization, audit logging?
 
Technical Constraints:
- Existing infrastructure? (AWS, on-prem, hybrid)
 - Integration requirements? (existing systems, APIs)
 - Compliance requirements? (SOX, PCI DSS)
 - Budget constraints?
 
13. API Design and Framework Selection
Strong Answer: API Endpoints:
// REST API Design
GET /api/v1/metrics/realtime
{
  "active_users": 1250,
  "orders_per_minute": 45,
  "revenue_per_minute": 12500,
  "inventory_alerts": 3,
  "system_health": "healthy",
  "timestamp": "2025-06-30T10:30:00Z"
}
GET /api/v1/metrics/historical?metric=revenue&period=24h&granularity=1h
{
  "metric": "revenue",
  "data": [
    {"timestamp": "2025-06-30T09:00:00Z", "value": 75000},
    {"timestamp": "2025-06-30T10:00:00Z", "value": 82000}
  ]
}
POST /api/v1/alerts/subscribe
{
  "metric": "inventory_level",
  "threshold": 100,
  "product_id": "12345",
  "notification_method": "webhook"
}
Framework Comparison:
REST ✅ Recommended
- Simple, cacheable
 - Wide tooling support
 - Good for CRUD operations
 - HTTP status codes for errors
 
WebSockets ✅ For Real-time Updates
// WebSocket for live updates
const ws = new WebSocket("wss://api.example.com/metrics/stream")
ws.onmessage = (event) => {
  const metrics = JSON.parse(event.data)
  updateDashboard(metrics)
}
GraphQL ❌ Not Recommended
- Adds complexity for simple metrics
 - Caching more difficult
 - Overkill for this use case
 
Final Architecture:
- REST API for historical data and configuration
 - WebSocket for real-time metric streaming
 - Server-Sent Events as WebSocket fallback
 
14. High-Level Architecture Diagram
Strong Answer:
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│   React SPA     │    │   Load Balancer  │    │   API Gateway   │
│                 │◄──►│   (ALB/NGINX)    │◄──►│   (Kong/Envoy)  │
│ - Dashboard     │    │                  │    │ - Auth          │
│ - WebSocket     │    │                  │    │ - Rate Limiting │
└─────────────────┘    └──────────────────┘    └─────────────────┘
                                                         │
                              ┌──────────────────────────┼─────────────────┐
                              │                          │                 │
                    ┌─────────▼────────┐    ┌───────────▼────────┐ ┌──────▼──────┐
                    │   Metrics API    │    │   WebSocket API    │ │ Config API  │
                    │   (Python/Flask) │    │   (Go/Gorilla)     │ │(Python/Fast)│
                    └─────────┬────────┘    └───────────┬────────┘ └──────┬──────┘
                              │                         │                 │
                    ┌─────────▼────────┐    ┌───────────▼────────┐ ┌──────▼──────┐
                    │   Redis Cache    │    │   Message Queue    │ │ PostgreSQL  │
                    │   (Metrics)      │    │   (Kafka/Redis)    │ │ (Config)    │
                    └─────────┬────────┘    └───────────┬────────┘ └─────────────┘
                              │                         │
                    ┌─────────▼─────────────────────────▼────────┐
                    │           Time Series Database             │
                    │           (InfluxDB/TimescaleDB)           │
                    └─────────┬──────────────────────────────────┘
                              │
                    ┌─────────▼────────┐
                    │   ETL Pipeline   │
                    │   (Apache Beam)  │
                    └─────────┬────────┘
                              │
        ┌─────────────────────┼─────────────────────┐
        │                     │                     │
┌───────▼──────┐    ┌─────────▼─────────┐    ┌─────▼─────┐
│ E-commerce   │    │   Inventory       │    │  System   │
│ Database     │    │   Management      │    │  Metrics  │
│ (Orders)     │    │   (Stock Levels)  │    │ (Health)  │
└──────────────┘    └───────────────────┘    └───────────┘
Data Flow:
- Source Systems → ETL Pipeline (real-time streaming)
 - ETL Pipeline → Time Series DB (processed metrics)
 - Time Series DB → Redis Cache (frequently accessed data)
 - API Services → Frontend (REST + WebSocket)
 - Message Queue → WebSocket API (real-time updates)
 
15. Real-time Implementation Approaches
Strong Answer: Comparison of Real-time Approaches:
| Approach | Pros | Cons | Use Case | 
|---|---|---|---|
| Polling | Simple, stateless | High latency, wasteful | Non-critical updates | 
| WebSockets | True real-time, bidirectional | Complex, stateful | Live dashboards | 
| Server-Sent Events | Simpler than WebSocket, auto-reconnect | One-way only | Event streams | 
| Message Queues | Reliable, scalable | Added complexity | High-volume events | 
Recommended Architecture:
// WebSocket implementation in Go
type Hub struct {
    clients    map[*Client]bool
    broadcast  chan []byte
    register   chan *Client
    unregister chan *Client
}
type Client struct {
    hub  *Hub
    conn *websocket.Conn
    send chan []byte
    subscriptions map[string]bool
}
func (h *Hub) run() {
    for {
        select {
        case client := <-h.register:
            h.clients[client] = true
        case client := <-h.unregister:
            delete(h.clients, client)
            close(client.send)
        case message := <-h.broadcast:
            for client := range h.clients {
                select {
                case client.send <- message:
                default:
                    close(client.send)
                    delete(h.clients, client)
                }
            }
        }
    }
}
// Kafka consumer for real-time metrics
func consumeMetrics() {
    consumer, _ := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092",
        "group.id":          "dashboard-consumers",
    })
    consumer.Subscribe("metrics-topic", nil)
    for {
        msg, _ := consumer.ReadMessage(-1)
        // Process metric and broadcast to WebSocket clients
        metric := parseMetric(msg.Value)
        hub.broadcast <- metric
        // Also update Redis cache
        updateCache(metric)
    }
}
Client-side Connection Management:
class MetricsWebSocket {
  constructor(url) {
    this.url = url
    this.reconnectInterval = 1000
    this.maxReconnectInterval = 30000
    this.reconnectDecay = 1.5
    this.connect()
  }
  connect() {
    this.ws = new WebSocket(this.url)
    this.ws.onopen = () => {
      console.log("Connected to metrics stream")
      this.reconnectInterval = 1000 // Reset backoff
    }
    this.ws.onmessage = (event) => {
      const metrics = JSON.parse(event.data)
      this.updateDashboard(metrics)
    }
    this.ws.onclose = () => {
      console.log("Connection lost, reconnecting...")
      setTimeout(() => {
        this.reconnectInterval = Math.min(
          this.reconnectInterval * this.reconnectDecay,
          this.maxReconnectInterval
        )
        this.connect()
      }, this.reconnectInterval)
    }
  }
  subscribe(metricType) {
    this.ws.send(
      JSON.stringify({
        action: "subscribe",
        metric: metricType,
      })
    )
  }
}
16. Single Points of Failure Analysis
Strong Answer: Identified SPOFs and Solutions:
- 
Load Balancer SPOF
- Problem: Single ALB failure takes down entire system
 - Solution: Multi-AZ deployment with Route 53 health checks
 
# Terraform for multi-region setup
resource "aws_lb" "primary" {
name = "app-lb-primary"
availability_zones = ["us-east-1a", "us-east-1b"]
}
resource "aws_route53_record" "failover_primary" {
zone_id = aws_route53_zone.main.zone_id
name = "api.example.com"
type = "A"
failover_routing_policy {
type = "PRIMARY"
}
health_check_id = aws_route53_health_check.primary.id
} - 
Database SPOF
- Problem: Single database failure
 - Solution: Primary-replica setup with automatic failover
 
# PostgreSQL HA setup
apiVersion: postgresql.cnpg.io/v1
kind: Cluster
metadata:
name: postgres-cluster
spec:
instances: 3
primaryUpdateStrategy: unsupervised
postgresql:
parameters:
max_connections: "200"
shared_buffers: "256MB"
backup:
retentionPolicy: "30d"
barmanObjectStore:
destinationPath: "s3://backups/postgres" - 
Redis Cache SPOF
- Problem: Cache failure impacts performance
 - Solution: Redis Sentinel for HA + graceful degradation
 
import redis.sentinel
sentinel = redis.sentinel.Sentinel([
('sentinel1', 26379),
('sentinel2', 26379),
('sentinel3', 26379)
])
def get_metric(key):
try:
master = sentinel.master_for('mymaster', socket_timeout=0.1)
return master.get(key)
except redis.RedisError:
# Graceful degradation - fetch from database
return get_metric_from_db(key) - 
Message Queue SPOF
- Problem: Kafka broker failure stops real-time updates
 - Solution: Multi-broker Kafka cluster with replication
 
# Kafka with replication
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
spec:
kafka:
replicas: 3
config:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
default.replication.factor: 3
min.insync.replicas: 2 
17. Caching Implementation Strategy
Strong Answer: Multi-Level Caching Strategy:
# 1. Application-level caching
from functools import lru_cache
import redis
import json
class CacheService:
    def __init__(self):
        self.redis_client = redis.Redis(host='redis-cluster')
        self.local_cache = {}
    @lru_cache(maxsize=1000)
    def get_metric_definition(self, metric_name):
        """Cache metric metadata (rarely changes)"""
        return self.fetch_metric_definition(metric_name)
    def get_real_time_metric(self, metric_key):
        """Multi-level cache for real-time data"""
        # L1: Memory cache (100ms TTL)
        if metric_key in self.local_cache:
            data, timestamp = self.local_cache[metric_key]
            if time.time() - timestamp < 0.1:  # 100ms
                return data
        # L2: Redis cache (5s TTL)
        cached = self.redis_client.get(f"metric:{metric_key}")
        if cached:
            data = json.loads(cached)
            self.local_cache[metric_key] = (data, time.time())
            return data
        # L3: Database fallback
        data = self.fetch_from_database(metric_key)
        # Cache with appropriate TTL
        self.redis_client.setex(
            f"metric:{metric_key}",
            5,  # 5 second TTL
            json.dumps(data)
        )
        self.local_cache[metric_key] = (data, time.time())
        return data
2. CDN Caching for Static Assets:
// CloudFront configuration
{
  "Origins": [{
    "Id": "dashboard-origin",
    "DomainName": "dashboard-api.example.com",
    "CustomOriginConfig": {
      "HTTPPort": 443,
      "OriginProtocolPolicy": "https-only"
    }
  }],
  "DefaultCacheBehavior": {
    "TargetOriginId": "dashboard-origin",
    "ViewerProtocolPolicy": "redirect-to-https",
    "CachePolicyId": "custom-cache-policy",
    "TTL": {
      "DefaultTTL": 300,     // 5 minutes for API responses
      "MaxTTL": 3600         // 1 hour max
    }
  },
  "CacheBehaviors": [{
    "PathPattern": "/static/*",
    "TTL": {
      "DefaultTTL": 86400,   // 24 hours for static assets
      "MaxTTL": 31536000     // 1 year max
    }
  }]
}
3. Database Query Result Caching:
-- Materialized views for expensive aggregations
CREATE MATERIALIZED VIEW hourly_revenue AS
SELECT
    date_trunc('hour', order_timestamp) as hour,
    SUM(order_total) as revenue,
    COUNT(*) as order_count
FROM orders
WHERE order_timestamp >= NOW() - INTERVAL '24 hours'
GROUP BY date_trunc('hour', order_timestamp);
-- Refresh every 5 minutes
CREATE OR REPLACE FUNCTION refresh_hourly_revenue()
RETURNS void AS $
BEGIN
    REFRESH MATERIALIZED VIEW CONCURRENTLY hourly_revenue;
END;
$ LANGUAGE plpgsql;
SELECT cron.schedule('refresh-revenue', '*/5 * * * *', 'SELECT refresh_hourly_revenue();');
Cache Invalidation Strategy:
class CacheInvalidator:
    def __init__(self):
        self.redis_client = redis.Redis()
    def invalidate_on_order(self, order_data):
        """Invalidate relevant caches when new order arrives"""
        patterns_to_invalidate = [
            "metric:orders_per_minute",
            "metric:revenue_per_minute",
            f"metric:inventory:{order_data['product_id']}"
        ]
        for pattern in patterns_to_invalidate:
            # Use Redis pipeline for efficiency
            pipe = self.redis_client.pipeline()
            pipe.delete(pattern)
            pipe.publish('cache_invalidation', pattern)
            pipe.execute()
    def smart_invalidation(self, event_type, entity_id):
        """Invalidate based on event type"""
        invalidation_map = {
            'order_created': ['revenue', 'orders', 'inventory'],
            'user_login': ['active_users'],
            'inventory_update': ['inventory', 'stock_alerts']
        }
        metrics_to_invalidate = invalidation_map.get(event_type, [])
        for metric in metrics_to_invalidate:
            self.redis_client.delete(f"metric:{metric}:{entity_id}")
18. Database Design: SQL vs NoSQL
Strong Answer: Hybrid Approach - Use Both:
SQL (PostgreSQL) for:
- Transactional Data: Orders, users, inventory
 - ACID Requirements: Financial transactions
 - Complex Queries: Joins, aggregations
 - Data Consistency: Strong consistency needs
 
-- OLTP Database Schema
CREATE TABLE orders (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL REFERENCES users(id),
    order_total DECIMAL(10,2) NOT NULL,
    order_timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    status VARCHAR(20) NOT NULL DEFAULT 'pending'
);
CREATE INDEX idx_orders_timestamp ON orders(order_timestamp);
CREATE INDEX idx_orders_user_status ON orders(user_id, status);
-- Partitioning for time-series data
CREATE TABLE orders_2025_06 PARTITION OF orders
FOR VALUES FROM ('2025-06-01') TO ('2025-07-01');
NoSQL (InfluxDB) for:
- Time-series Metrics: Performance data, system metrics
 - High Write Volume: Thousands of metrics per second
 - Retention Policies: Automatic data aging
 
# InfluxDB for metrics storage
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
client = InfluxDBClient(url="http://influxdb:8086", token="my-token")
write_api = client.write_api(write_options=SYNCHRONOUS)
def write_metric(measurement, tags, fields):
    point = Point(measurement) \
        .tag("service", tags.get("service")) \
        .tag("region", tags.get("region")) \
        .field("value", fields["value"]) \
        .time(datetime.utcnow(), WritePrecision.S)
    write_api.write(bucket="metrics", record=point)
# Example usage
write_metric(
    measurement="orders_per_minute",
    tags={"service": "ecommerce", "region": "us-east-1"},
    fields={"value": 45}
)
Read/Write Load Balancing:
class DatabaseRouter:
    def __init__(self):
        self.primary_db = PostgreSQLConnection("primary-db")
        self.read_replicas = [
            PostgreSQLConnection("replica-1"),
            PostgreSQLConnection("replica-2"),
            PostgreSQLConnection("replica-3")
        ]
        self.current_replica = 0
    def get_read_connection(self):
        """Round-robin read replica selection"""
        replica = self.read_replicas[self.current_replica]
        self.current_replica = (self.current_replica + 1) % len(self.read_replicas)
        return replica
    def get_write_connection(self):
        """Always use primary for writes"""
        return self.primary_db
    def execute_read_query(self, query, params=None):
        try:
            return self.get_read_connection().execute(query, params)
        except Exception:
            # Fallback to primary if replica fails
            return self.primary_db.execute(query, params)
# Usage
@app.route('/api/metrics/historical')
def get_historical_metrics():
    # Use read replica for analytics queries
    db = router.get_read_connection()
    return db.execute("""
        SELECT date_trunc('hour', created_at) as hour,
               SUM(total) as revenue
        FROM orders
        WHERE created_at >= NOW() - INTERVAL '24 hours'
        GROUP BY hour
        ORDER BY hour
    """)
19. Scaling Strategy for 10x Traffic
Strong Answer: Scaling Priority Order:
1. Application Tier (Scale First)
# Horizontal Pod Autoscaler
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: dashboard-api-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: dashboard-api
  minReplicas: 3
  maxReplicas: 50
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
        - type: Percent
          value: 100
          periodSeconds: 15
2. Database Scaling
# Read replica scaling + connection pooling
class DatabaseScaler:
    def __init__(self):
        self.connection_pools = {
            'primary': create_pool('primary-db', pool_size=20),
            'replicas': [
                create_pool('replica-1', pool_size=50),
                create_pool('replica-2', pool_size=50),
                create_pool('replica-3', pool_size=50),
            ]
        }
    def scale_read_capacity(self, target_qps):
        """Add more read replicas based on QPS"""
        current_capacity = len(self.connection_pools['replicas']) * 1000  # QPS per replica
        if target_qps > current_capacity * 0.8:  # 80% utilization threshold
            # Add new read replica
            new_replica = create_read_replica()
            self.connection_pools['replicas'].append(
                create_pool(new_replica, pool_size=50)
            )
    def implement_sharding(self):
        """Implement horizontal sharding for orders table"""
        shards = {
            'shard_1': 'orders_americas',
            'shard_2': 'orders_europe',
            'shard_3': 'orders_asia'
        }
        def get_shard(user_id):
            return shards[f'shard_{hash(user_id) % 3 + 1}']
3. Cache Scaling
# Redis Cluster for horizontal scaling
import rediscluster
redis_cluster = rediscluster.RedisCluster(
    startup_nodes=[
        {"host": "redis-1", "port": "7000"},
        {"host": "redis-2", "port": "7000"},
        {"host": "redis-3", "port": "7000"},
        {"host": "redis-4", "port": "7000"},
        {"host": "redis-5", "port": "7000"},
        {"host": "redis-6", "port": "7000"},
    ],
    decode_responses=True,
    skip_full_coverage_check=True
)
# Cache partitioning strategy
def cache_key_partition(metric_type, entity_id):
    """Partition cache keys for better distribution"""
    partition = hash(f"{metric_type}:{entity_id}") % 1000
    return f"{metric_type}:{partition}:{entity_id}"
4. CDN and Edge Caching
// CloudFlare Workers for edge computing
addEventListener("fetch", (event) => {
  event.respondWith(handleRequest(event.request))
})
async function handleRequest(request) {
  const url = new URL(request.url)
  // Cache API responses at edge
  if (url.pathname.startsWith("/api/metrics/")) {
    const cacheKey = new Request(url.toString(), request)
    const cache = caches.default
    // Check edge cache first
    let response = await cache.match(cacheKey)
    if (!response) {
      // Fetch from origin
      response = await fetch(request)
      // Cache for 30 seconds
      const headers = new Headers(response.headers)
      headers.set("Cache-Control", "public, max-age=30")
      response = new Response(response.body, {
        status: response.status,
        statusText: response.statusText,
        headers: headers,
      })
      event.waitUntil(cache.put(cacheKey, response.clone()))
    }
    return response
  }
  return fetch(request)
}
5. Message Queue Scaling
# Kafka cluster scaling
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: metrics-kafka
spec:
  kafka:
    replicas: 9 # Scale from 3 to 9 brokers
    config:
      num.partitions: 50 # More partitions for parallelism
      default.replication.factor: 3
      min.insync.replicas: 2
    resources:
      requests:
        memory: 8Gi
        cpu: 2000m
      limits:
        memory: 16Gi
        cpu: 4000m
20. Failover Strategy for Database Outage
Strong Answer: Multi-Tier Failover Strategy:
# Automatic failover implementation
import time
import threading
from enum import Enum
class DatabaseState(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    FAILED = "failed"
class DatabaseFailoverManager:
    def __init__(self):
        self.primary_db = "primary-db.example.com"
        self.replica_dbs = [
            "replica-1.example.com",
            "replica-2.example.com",
            "replica-3.example.com"
        ]
        self.current_state = DatabaseState.HEALTHY
        self.current_primary = self.primary_db
        self.health_check_interval = 5  # seconds
    def health_check(self, db_host):
        """Check database health"""
        try:
            conn = psycopg2.connect(
                host=db_host,
                database="analytics",
                user="readonly",
                password="password",
                connect_timeout=3
            )
            cursor = conn.cursor()
            cursor.execute("SELECT 1")
            result = cursor.fetchone()
            conn.close()
            return result[0] == 1
        except Exception as e:
            logger.error(f"Health check failed for {db_host}: {e}")
            return False
    def promote_replica_to_primary(self, replica_host):
        """Promote replica to primary (manual intervention required)"""
        # This would typically involve:
        # 1. Stopping replication on chosen replica
        # 2. Updating DNS/load balancer to point to new primary
        # 3. Updating application config
        logger.critical(f"Promoting {replica_host} to primary")
        # Update Route 53 record to point to new primary
        route53 = boto3.client('route53')
        route53.change_resource_record_sets(
            HostedZoneId='Z123456789',
            ChangeBatch={
                'Changes': [{
                    'Action': 'UPSERT',
                    'ResourceRecordSet': {
                        'Name': 'primary-db.example.com',
                        'Type': 'CNAME',
                        'TTL': 60,
                        'ResourceRecords': [{'Value': replica_host}]
                    }
                }]
            }
        )
        self.current_primary = replica_host
    def circuit_breaker_fallback(self):
        """Fallback to cached data when database is unavailable"""
        logger.warning("Database unavailable, switching to read-only mode")
        # Serve from cache only
        app.config['READ_ONLY_MODE'] = True
        # Show banner to users
        return {
            "status": "degraded",
            "message": "Real-time data temporarily unavailable",
            "data_freshness": "cached_5_minutes_ago"
        }
    def monitor_and_failover(self):
        """Background thread for monitoring and automatic failover"""
        consecutive_failures = 0
        while True:
            if self.health_check(self.current_primary):
                consecutive_failures = 0
                self.current_state = DatabaseState.HEALTHY
                app.config['READ_ONLY_MODE'] = False
            else:
                consecutive_failures += 1
                logger.warning(f"Primary DB health check failed {consecutive_failures} times")
                if consecutive_failures >= 3:  # 15 seconds of failures
                    self.current_state = DatabaseState.FAILED
                    # Try to find healthy replica
                    healthy_replica = None
                    for replica in self.replica_dbs:
                        if self.health_check(replica):
                            healthy_replica = replica
                            break
                    if healthy_replica:
                        self.promote_replica_to_primary(healthy_replica)
                    else:
                        # All databases failed - enable circuit breaker
                        self.circuit_breaker_fallback()
            time.sleep(self.health_check_interval)
# Start monitoring in background
failover_manager = DatabaseFailoverManager()
monitor_thread = threading.Thread(target=failover_manager.monitor_and_failover)
monitor_thread.daemon = True
monitor_thread.start()
Emergency Procedures:
#!/bin/bash
# emergency-failover.sh
echo "=== EMERGENCY DATABASE FAILOVER ==="
echo "1. Checking primary database status..."
if ! pg_isready -h primary-db.example.com -p 5432; then
    echo "❌ Primary database is DOWN"
    echo "2. Finding healthy replica..."
    for replica in replica-1 replica-2 replica-3; do
        if pg_isready -h ${replica}.example.com -p 5432; then
            echo "✅ Found healthy replica: $replica"
            echo "3. Promoting replica to primary..."
            # Stop replication
            psql -h ${replica}.example.com -c "SELECT pg_promote();"
            echo "4. Updating DNS record..."
            aws route53 change-resource-record-sets \
                --hosted-zone-id Z123456789 \
                --change-batch file://failover-dns.json
            echo "5. Updating application config..."
            kubectl patch configmap app-config \
                --patch "{\"data\":{\"DATABASE_HOST\":\"${replica}.example.com\"}}"
            echo "6. Restarting application pods..."
            kubectl rollout restart deployment/dashboard-api
            echo "✅ Failover complete to $replica"
            exit 0
        fi
    done
    echo "❌ No healthy replicas found - enabling read-only mode"
    kubectl patch configmap app-config \
        --patch '{"data":{"READ_ONLY_MODE":"true"}}'
else
    echo "✅ Primary database is healthy"
fi
21. Circuit Breaker Implementation
Strong Answer: Circuit Breaker Pattern for External Services:
import time
import threading
from enum import Enum
from collections import deque
class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject requests
    HALF_OPEN = "half_open" # Testing if service recovered
class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60, expected_exception=Exception):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
        self.lock = threading.Lock()
        # Track recent requests for metrics
        self.recent_requests = deque(maxlen=100)
    def __call__(self, func):
        def wrapper(*args, **kwargs):
            with self.lock:
                if self.state == CircuitState.OPEN:
                    # Check if timeout period has passed
                    if time.time() - self.last_failure_time >= self.timeout:
                        self.state = CircuitState.HALF_OPEN
                        self.failure_count = 0
                    else:
                        # Circuit is open - reject request
                        raise Exception("Circuit breaker is OPEN - service unavailable")
                if self.state == CircuitState.HALF_OPEN:
                    # Test with single request
                    try:
                        result = func(*args, **kwargs)
                        self.state = CircuitState.CLOSED
                        self.failure_count = 0
                        self.record_request(success=True)
                        return result
                    except self.expected_exception as e:
                        self.state = CircuitState.OPEN
                        self.failure_count += 1
                        self.last_failure_time = time.time()
                        self.record_request(success=False)
                        raise e
                # Normal operation (CLOSED state)
                try:
                    result = func(*args, **kwargs)
                    self.failure_count = 0  # Reset on success
                    self.record_request(success=True)
                    return result
                except self.expected_exception as e:
                    self.failure_count += 1
                    self.record_request(success=False)
                    if self.failure_count >= self.failure_threshold:
                        self.state = CircuitState.OPEN
                        self.last_failure_time = time.time()
                    raise e
        return wrapper
    def record_request(self, success):
        self.recent_requests.append({
            'timestamp': time.time(),
            'success': success
        })
    def get_metrics(self):
        now = time.time()
        recent = [r for r in self.recent_requests if now - r['timestamp'] < 300]  # Last 5 minutes
        total_requests = len(recent)
        successful_requests = sum(1 for r in recent if r['success'])
        return {
            'state': self.state.value,
            'failure_count': self.failure_count,
            'success_rate': successful_requests / total_requests if total_requests > 0 else 0,
            'total_requests_5min': total_requests
        }
# Usage in analytics dashboard
@CircuitBreaker(failure_threshold=3, timeout=30)
def get_inventory_data(product_ids):
    """Call to inventory service with circuit breaker"""
    response = requests.get(
        f"{INVENTORY_SERVICE_URL}/api/products",
        params={'ids': ','.join(product_ids)},
        timeout=5
    )
    response.raise_for_status()
    return response.json()
@CircuitBreaker(failure_threshold=5, timeout=60)
def get_user_analytics():
    """Call to analytics service with circuit breaker"""
    response = requests.get(f"{ANALYTICS_SERVICE_URL}/api/users/active", timeout=10)
    response.raise_for_status()
    return response.json()
# Fallback strategies
def get_dashboard_data():
    try:
        # Try to get real-time inventory data
        inventory_data = get_inventory_data(['1', '2', '3'])
    except Exception:
        # Fallback to cached data
        inventory_data = redis_client.get('inventory_cache')
        if not inventory_data:
            inventory_data = {"error": "Inventory data unavailable"}
    try:
        # Try to get user analytics
        user_data = get_user_analytics()
    except Exception:
        # Fallback to approximate data
        user_data = {"active_users": "~1000", "estimated": True}
    return {
        "inventory": inventory_data,
        "users": user_data,
        "timestamp": time.time()
    }
Service-Level Circuit Breaker Integration:
# Integration with Flask app
@app.route('/api/dashboard')
def dashboard_endpoint():
    # Check circuit breaker states
    circuit_states = {
        'inventory': inventory_circuit_breaker.get_metrics(),
        'analytics': analytics_circuit_breaker.get_metrics(),
    }
    # Include circuit breaker status in response
    response_data = get_dashboard_data()
    response_data['service_health'] = circuit_states
    # Set appropriate HTTP status based on circuit states
    if any(state['state'] == 'open' for state in circuit_states.values()):
        return jsonify(response_data), 206  # Partial content
    else:
        return jsonify(response_data), 200
# Monitoring endpoint for circuit breaker states
@app.route('/api/health/circuits')
def circuit_breaker_health():
    return jsonify({
        'inventory_service': inventory_circuit_breaker.get_metrics(),
        'analytics_service': analytics_circuit_breaker.get_metrics(),
    })
Part 4: Advanced SRE & Operations
22. API Response Time Investigation Process
Strong Answer: Systematic Investigation Approach:
Step 1: Immediate Assessment (First 2 minutes)
# Quick checks to understand scope
echo "=== INCIDENT RESPONSE CHECKLIST ==="
echo "1. Checking service status..."
curl -w "%{http_code} %{time_total}s" -s -o /dev/null https://api.example.com/health
echo "2. Checking recent deployments..."
kubectl get pods --sort-by=.metadata.creationTimestamp
echo "3. Checking error rates..."
# Query Prometheus for error rate spike
curl -s 'http://prometheus:9090/api/v1/query?query=rate(http_requests_total{status=~"5.."}[5m])'
Step 2: Resource Analysis (Minutes 2-5)
# Automated investigation script
import subprocess
import json
import requests
class IncidentInvestigator:
    def __init__(self):
        self.findings = []
    def check_resource_usage(self):
        """Check CPU, Memory, Disk I/O"""
        # CPU utilization
        cpu_query = 'avg(rate(container_cpu_usage_seconds_total[5m])) by (pod)'
        cpu_data = self.query_prometheus(cpu_query)
        for result in cpu_data:
            cpu_usage = float(result['value'][1])
            if cpu_usage > 0.8:  # 80% threshold
                self.findings.append(f"High CPU: {result['metric']['pod']} at {cpu_usage:.2%}")
        # Memory usage
        memory_query = 'container_memory_usage_bytes / container_spec_memory_limit_bytes'
        memory_data = self.query_prometheus(memory_query)
        for result in memory_data:
            memory_usage = float(result['value'][1])
            if memory_usage > 0.85:  # 85% threshold
                self.findings.append(f"High Memory: {result['metric']['pod']} at {memory_usage:.2%}")
    def check_database_performance(self):
        """Check database connection pool, slow queries"""
        # Connection pool utilization
        db_conn_query = 'pg_stat_activity_count / pg_settings_max_connections'
        db_data = self.query_prometheus(db_conn_query)
        if db_data and float(db_data[0]['value'][1]) > 0.8:
            self.findings.append("Database connection pool near capacity")
        # Check for slow queries
        slow_queries = subprocess.run([
            'psql', '-h', 'postgres-primary', '-c',
            "SELECT query, query_start, state, waiting FROM pg_stat_activity WHERE state = 'active' AND query_start < NOW() - INTERVAL '30 seconds';"
        ], capture_output=True, text=True)
        if slow_queries.stdout and len(slow_queries.stdout.split('\n')) > 3:
            self.findings.append("Long-running queries detected")
    def check_external_dependencies(self):
        """Check downstream services"""
        services = ['inventory-service', 'payment-service', 'user-service']
        for service in services:
            try:
                response = requests.get(f'http://{service}/health', timeout=5)
                if response.status_code != 200 or response.elapsed.total_seconds() > 1.0:
                    self.findings.append(f"{service} health check failed or slow")
            except requests.RequestException:
                self.findings.append(f"{service} unreachable")
    def check_recent_changes(self):
        """Check recent deployments and config changes"""
        # Get recent pod restarts
        pods = subprocess.run([
            'kubectl', 'get', 'pods', '-o', 'json'
        ], capture_output=True, text=True)
        pod_data = json.loads(pods.stdout)
        for pod in pod_data['items']:
            restart_count = pod['status']['restartCount']
            if restart_count > 0:
                self.findings.append(f"Pod {pod['metadata']['name']} has {restart_count} restarts")
    def query_prometheus(self, query):
        """Helper to query Prometheus"""
        try:
            response = requests.get(
                'http://prometheus:9090/api/v1/query',
                params={'query': query},
                timeout=10
            )
            return response.json()['data']['result']
        except:
            return []
    def investigate(self):
        """Run full investigation"""
        print("🔍 Starting incident investigation...")
        self.check_resource_usage()
        self.check_database_performance()
        self.check_external_dependencies()
        self.check_recent_changes()
        print("\n📋 Investigation Summary:")
        if self.findings:
            for finding in self.findings:
                print(f"⚠️  {finding}")
        else:
            print("✅ No obvious issues found in automated checks")
        return self.findings
# Run investigation
investigator = IncidentInvestigator()
findings = investigator.investigate()
Step 3: Deep Dive Analysis (Minutes 5-15)
# Application-level investigation
echo "4. Checking application metrics..."
# Look for specific endpoint latency
kubectl exec -it $(kubectl get pods -l app=api -o jsonpath='{.items[0].metadata.name}') -- \
  curl -s localhost:8080/metrics | grep http_request_duration
# Check garbage collection metrics (for Java/Go apps)
kubectl logs $(kubectl get pods -l app=api -o jsonpath='{.items[0].metadata.name}') | \
  grep -i "gc\|garbage"
# Database query analysis
psql -h postgres-primary -c "
SELECT query, calls, total_time, mean_time, stddev_time
FROM pg_stat_statements
WHERE mean_time > 1000
ORDER BY mean_time DESC
LIMIT 10;"
# Check for lock contention
psql -h postgres-primary -c "
SELECT blocked_locks.pid AS blocked_pid,
       blocked_activity.usename AS blocked_user,
       blocking_locks.pid AS blocking_pid,
       blocking_activity.usename AS blocking_user,
       blocked_activity.query AS blocked_statement
FROM pg_catalog.pg_locks blocked_locks
JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid
JOIN pg_catalog.pg_locks blocking_locks ON (blocked_locks.locktype = blocking_locks.locktype)
JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid
WHERE NOT blocked_locks.granted;"
Step 4: Resolution and Communication
# Incident response actions
class IncidentResponse:
    def __init__(self):
        self.slack_webhook = "https://hooks.slack.com/services/..."
    def notify_team(self, message, severity="warning"):
        """Send alert to incident response channel"""
        payload = {
            "text": f"🚨 API Performance Incident",
            "attachments": [{
                "color": "danger" if severity == "critical" else "warning",
                "fields": [
                    {"title": "Issue", "value": message, "short": False},
                    {"title": "Runbook", "value": "https://wiki.company.com/incident-response", "short": True},
                    {"title": "Investigator", "value": "SRE On-Call", "short": True}
                ]
            }]
        }
        requests.post(self.slack_webhook, json=payload)
    def apply_immediate_fix(self, issue_type):
        """Apply quick fixes based on identified issue"""
        if issue_type == "high_cpu":
            # Scale up pods
            subprocess.run(['kubectl', 'scale', 'deployment/api', '--replicas=10'])
        elif issue_type == "database_load":
            # Enable read-only mode temporarily
            subprocess.run(['kubectl', 'patch', 'configmap/app-config',
                           '--patch', '{"data":{"READ_ONLY_MODE":"true"}}'])
        elif issue_type == "memory_leak":
            # Rolling restart
            subprocess.run(['kubectl', 'rollout', 'restart', 'deployment/api'])
        elif issue_type == "external_service":
            # Enable circuit breaker
            subprocess.run(['kubectl', 'patch', 'configmap/app-config',
                           '--patch', '{"data":{"CIRCUIT_BREAKER_ENABLED":"true"}}'])
# Usage
incident = IncidentResponse()
incident.notify_team("API latency increased from 100ms to 2s. Investigation in progress.")
# Based on findings, apply fixes
if "High CPU" in findings:
    incident.apply_immediate_fix("high_cpu")
    incident.notify_team("Applied fix: Scaled up API pods to handle load")
23. Error Budget Management Scenario
Strong Answer: Error Budget Analysis:
First, let me calculate the current situation:
- SLO: 99.5% uptime (0.5% error budget per month)
 - Current Error Rate: 20% increase = baseline + 20%
 - Deployment Impact: Caused the increase
 - Error Budget Status: Within budget (need to verify actual numbers)
 
Decision Framework:
class ErrorBudgetManager:
    def __init__(self):
        self.slo_target = 0.995  # 99.5% success rate
        self.error_budget = 1 - self.slo_target  # 0.5% error budget
    def calculate_current_burn_rate(self, error_rate, time_period_hours):
        """Calculate how fast we're burning error budget"""
        monthly_hours = 30 * 24  # 720 hours
        burn_rate = (error_rate * time_period_hours) / monthly_hours
        return burn_rate
    def assess_situation(self, baseline_error_rate, current_error_rate, hours_since_deploy):
        """Assess if we should keep feature or rollback"""
        # Calculate actual error rates
        error_increase = current_error_rate - baseline_error_rate
        # Calculate error budget consumption
        budget_burned = self.calculate_current_burn_rate(error_increase, hours_since_deploy)
        remaining_budget = self.error_budget - budget_burned
        # Project if this continues for the month
        monthly_projection = error_increase * (30 * 24) / hours_since_deploy
        assessment = {
            "current_error_rate": current_error_rate,
            "error_budget_burned": budget_burned,
            "remaining_budget": remaining_budget,
            "monthly_projection": monthly_projection,
            "exceeds_budget": monthly_projection > self.error_budget
        }
        return assessment
    def make_recommendation(self, assessment, business_impact):
        """Provide recommendation based on data"""
        if assessment["exceeds_budget"]:
            return {
                "action": "ROLLBACK",
                "reason": "Feature will exceed monthly error budget",
                "timeline": "Immediate"
            }
        elif assessment["remaining_budget"] < 0.001:  # Less than 0.1% budget left
            return {
                "action": "ENHANCED_MONITORING",
                "reason": "Close to budget limit, monitor closely",
                "conditions": "Rollback if error rate increases further"
            }
        else:
            return {
                "action": "KEEP_WITH_CONDITIONS",
                "reason": "Within error budget but requires monitoring",
                "conditions": [
                    "Implement feature flag for quick disable",
                    "Set up aggressive alerting",
                    "Daily budget review meetings"
                ]
            }
# Real scenario analysis
budget_manager = ErrorBudgetManager()
# Assume baseline was 0.1% error rate, now 1.2% (20% relative increase)
assessment = budget_manager.assess_situation(
    baseline_error_rate=0.001,  # 0.1%
    current_error_rate=0.012,   # 1.2%
    hours_since_deploy=6
)
recommendation = budget_manager.make_recommendation(assessment, business_impact="high")
Communication Strategy:
def communicate_decision(assessment, recommendation, stakeholders):
    """Structured communication to all stakeholders"""
    message = f"""
    📊 ERROR BUDGET ASSESSMENT - {datetime.now().strftime('%Y-%m-%d %H:%M')}
    🎯 SLO Target: 99.5% success rate (0.5% monthly error budget)
    📈 Current Situation:
    • Error rate: {assessment['current_error_rate']:.3%}
    • Budget burned: {assessment['error_budget_burned']:.3%}
    • Remaining budget: {assessment['remaining_budget']:.3%}
    • Monthly projection: {assessment['monthly_projection']:.3%}
    🔍 Analysis:
    {'❌ Will exceed budget' if assessment['exceeds_budget'] else '✅ Within budget'}
    🎯 Recommendation: {recommendation['action']}
    Reason: {recommendation['reason']}
    ⚡ Next Steps:
    """
    if recommendation['action'] == 'ROLLBACK':
        message += """
        1. Immediate rollback of feature
        2. Root cause analysis
        3. Fix implementation before retry
        4. Post-mortem scheduled
        """
    elif recommendation['action'] == 'KEEP_WITH_CONDITIONS':
        message += f"""
        1. Implement monitoring conditions
        2. Set up automated alerts
        3. Daily review meetings
        4. Feature flag for quick disable
        Conditions: {recommendation['conditions']}
        """
    # Send to different channels based on audience
    send_to_slack('#sre-team', message)
    send_to_email(stakeholders['engineering'], message)
    send_executive_summary(stakeholders['leadership'])
def send_executive_summary(executives):
    """Simplified version for executives"""
    summary = f"""
    Feature Deployment Impact Assessment
    Status: {'🔴 Action Required' if recommendation['action'] == 'ROLLBACK' else '🟡 Monitoring'}
    Decision: {recommendation['action']}
    Business Impact: Minimal customer impact, within reliability targets
    Engineering is {recommendation['action'].lower().replace('_', ' ')} and monitoring closely.
    """
    send_to_email(executives, summary)
Implementation Plan:
# Feature flag for quick disable
apiVersion: v1
kind: ConfigMap
metadata:
  name: feature-flags
data:
  new-feature-enabled: "true"
  error-budget-threshold: "0.004" # 0.4% - alert threshold
---
# Enhanced monitoring
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: error-budget-alerts
spec:
  groups:
    - name: error-budget
      rules:
        - alert: ErrorBudgetBurnRateHigh
          expr: |
            (
              rate(http_requests_total{status=~"5.."}[1h]) / 
              rate(http_requests_total[1h])
            ) > 0.004
          for: 5m
          labels:
            severity: critical
          annotations:
            summary: "Error budget burn rate too high"
        - alert: ErrorBudgetProjectedExceed
          expr: |
            (
              rate(http_requests_total{status=~"5.."}[6h]) / 
              rate(http_requests_total[6h])
            ) * 720 > 0.005  # 720 hours in month, 0.5% budget
          for: 10m
          labels:
            severity: warning
          annotations:
            summary: "Projected to exceed monthly error budget"
24. Load Testing Strategy Design
Strong Answer: Comprehensive Load Testing Strategy:
# Load test implementation using Locust
from locust import HttpUser, task, between
import random
import json
class DashboardUser(HttpUser):
    wait_time = between(1, 5)  # 1-5 second wait between requests
    def on_start(self):
        """Setup user session"""
        # Authenticate user
        response = self.client.post("/api/auth/login", json={
            "username": f"loadtest_user_{random.randint(1, 1000)}",
            "password": "test_password"
        })
        if response.status_code == 200:
            self.auth_token = response.json().get("token")
            self.client.headers.update({"Authorization": f"Bearer {self.auth_token}"})
    @task(3)  # 3x weight - most common operation
    def view_realtime_dashboard(self):
        """Simulate viewing real-time dashboard"""
        self.client.get("/api/metrics/realtime")
    @task(2)  # 2x weight
    def view_historical_data(self):
        """Simulate historical data requests"""
        periods = ["1h", "6h", "24h", "7d"]
        metrics = ["revenue", "orders", "active_users"]
        period = random.choice(periods)
        metric = random.choice(metrics)
        self.client.get(f"/api/metrics/historical", params={
            "metric": metric,
            "period": period,
            "granularity": "1m" if period in ["1h", "6h"] else "1h"
        })
    @task(1)  # 1x weight - less common
    def configure_alerts(self):
        """Simulate alert configuration"""
        self.client.post("/api/alerts", json={
            "metric": "inventory_level",
            "threshold": random.randint(50, 200),
            "product_id": f"product_{random.randint(1, 100)}"
        })
    @task(4)  # 4x weight - WebSocket simulation via polling
    def websocket_simulation(self):
        """Simulate WebSocket updates via HTTP polling"""
        self.client.get("/api/metrics/stream", params={
            "last_update": "2025-06-30T10:30:00Z"
        })
class BurstTrafficUser(DashboardUser):
    """Simulates traffic spikes during incidents"""
    wait_time = between(0.1, 0.5)  # Much faster requests
    @task(5)
    def rapid_dashboard_refresh(self):
        """Rapid refreshing during incidents"""
        self.client.get("/api/metrics/realtime")
# Load test scenarios
class LoadTestScenarios:
    def __init__(self):
        self.scenarios = {
            "normal_load": {
                "users": 100,
                "spawn_rate": 10,
                "duration": "10m",
                "user_class": DashboardUser
            },
            "peak_load": {
                "users": 500,
                "spawn_rate": 50,
                "duration": "15m",
                "user_class": DashboardUser
            },
            "stress_test": {
                "users": 1000,
                "spawn_rate": 100,
                "duration": "20m",
                "user_class": DashboardUser
            },
            "spike_test": {
                "users": 200,
                "spawn_rate": 200,  # Instant spike
                "duration": "5m",
                "user_class": BurstTrafficUser
            },
            "endurance_test": {
                "users": 300,
                "spawn_rate": 30,
                "duration": "2h",  # Long running
                "user_class": DashboardUser
            }
        }
Infrastructure Load Testing:
#!/bin/bash
# load-test-runner.sh
echo "🚀 Starting Analytics Dashboard Load Tests"
# 1. Baseline Performance Test
echo "📊 Running baseline performance test..."
locust -f dashboard_load_test.py \
    --users 50 \
    --spawn-rate 10 \
    --run-time 5m \
    --host https://api-staging.example.com \
    --html reports/baseline_$(date +%Y%m%d_%H%M%S).html
# 2. Database Load Test
echo "🗃️ Testing database performance..."
# Simulate heavy analytics queries
for i in {1..100}; do
    curl -s "https://api-staging.example.com/api/metrics/historical?period=30d&metric=revenue" &
done
wait
# 3. WebSocket Load Test
echo "🔌 Testing WebSocket capacity..."
# Use websocket-king for WebSocket load testing
websocket-king \
    --url wss://api-staging.example.com/metrics/stream \
    --connections 1000 \
    --duration 10m \
    --rate 10 \
    --output ws_load_report.json
# 4. Cache Performance Test
echo "💾 Testing cache performance..."
# Warm up cache, then test cache hit rates
redis-cli --latency-history -h redis-staging.example.com
# 5. Auto-scaling Test
echo "📈 Testing auto-scaling behavior..."
# Gradually increase load to trigger auto-scaling
locust -f dashboard_load_test.py \
    --users 1000 \
    --spawn-rate 20 \
    --run-time 30m \
    --host https://api-staging.example.com \
    --html reports/autoscale_test_$(date +%Y%m%d_%H%M%S).html
echo "✅ Load testing complete. Check reports/ directory for results."
Performance Metrics Collection:
# Custom metrics collection during load testing
import psutil
import time
import json
from datetime import datetime
class PerformanceMonitor:
    def __init__(self):
        self.metrics = []
        self.monitoring = False
    def start_monitoring(self):
        """Start collecting system metrics"""
        self.monitoring = True
        while self.monitoring:
            metrics = {
                "timestamp": datetime.now().isoformat(),
                "cpu_percent": psutil.cpu_percent(interval=1),
                "memory_percent": psutil.virtual_memory().percent,
                "disk_io": psutil.disk_io_counters()._asdict(),
                "network_io": psutil.net_io_counters()._asdict(),
                "connections": len(psutil.net_connections()),
            }
            # Database metrics
            metrics["db_connections"] = self.get_db_connections()
            metrics["cache_hit_rate"] = self.get_cache_hit_rate()
            self.metrics.append(metrics)
            time.sleep(5)  # Collect every 5 seconds
    def get_db_connections(self):
        """Get database connection count"""
        try:
            import psycopg2
            conn = psycopg2.connect("postgresql://user:pass@db:5432/analytics")
            cursor = conn.cursor()
            cursor.execute("SELECT count(*) FROM pg_stat_activity;")
            return cursor.fetchone()[0]
        except:
            return None
    def get_cache_hit_rate(self):
        """Get Redis cache hit rate"""
        try:
            import redis
            r = redis.Redis(host='redis-cluster')
            info = r.info('stats')
            hits = info['keyspace_hits']
            misses = info['keyspace_misses']
            return hits / (hits + misses) if (hits + misses) > 0 else 0
        except:
            return None
    def stop_monitoring(self):
        """Stop monitoring and save results"""
        self.monitoring = False
        # Save metrics to file
        with open(f"performance_metrics_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", 'w') as f:
            json.dump(self.metrics, f, indent=2)
        # Generate summary report
        self.generate_summary()
    def generate_summary(self):
        """Generate performance summary"""
        if not self.metrics:
            return
        cpu_values = [m['cpu_percent'] for m in self.metrics if m['cpu_percent']]
        memory_values = [m['memory_percent'] for m in self.metrics if m['memory_percent']]
        summary = {
            "test_duration": len(self.metrics) * 5,  # 5 second intervals
            "avg_cpu": sum(cpu_values) / len(cpu_values),
            "max_cpu": max(cpu_values),
            "avg_memory": sum(memory_values) / len(memory_values),
            "max_memory": max(memory_values),
            "peak_connections": max([m.get('connections', 0) for m in self.metrics])
        }
        print("📊 Performance Test Summary:")
        print(f"   Duration: {summary['test_duration']} seconds")
        print(f"   Avg CPU: {summary['avg_cpu']:.1f}%")
        print(f"   Max CPU: {summary['max_cpu']:.1f}%")
        print(f"   Avg Memory: {summary['avg_memory']:.1f}%")
        print(f"   Max Memory: {summary['max_memory']:.1f}%")
        print(f"   Peak Connections: {summary['peak_connections']}")
# Usage with load test
monitor = PerformanceMonitor()
import threading
# Start monitoring in background
monitor_thread = threading.Thread(target=monitor.start_monitoring)
monitor_thread.start()
# Run load test
# ... run locust or other load testing tool ...
# Stop monitoring
monitor.stop_monitoring()
monitor_thread.join()
Test Criteria and Success Metrics:
# Load test success criteria
load_test_criteria:
  response_time:
    p50: < 200ms
    p95: < 500ms
    p99: < 1000ms
  throughput:
    normal_load: > 1000 RPS
    peak_load: > 5000 RPS
  error_rate:
    max_acceptable: 0.1%
  resource_utilization:
    cpu: < 80%
    memory: < 85%
    database_connections: < 80% of pool
  auto_scaling:
    scale_up_time: < 2 minutes
    scale_down_time: < 5 minutes
  cache_performance:
    hit_rate: > 90%
    eviction_rate: < 5%
# Failure conditions (auto-stop test)
failure_conditions:
  - error_rate > 1% for 2 minutes
  - p99_latency > 5000ms for 5 minutes
  - cpu_utilization > 95% for 3 minutes
  - database_connection_pool > 95%
25. Go Service CPU Investigation
Strong Answer: Systematic CPU Investigation Process:
// 1. Enable pprof in Go service for CPU profiling
package main
import (
    "context"
    "log"
    "net/http"
    _ "net/http/pprof"  // Import pprof
    "runtime"
    "time"
)
func main() {
    // Start pprof server
    go func() {
        log.Println("Starting pprof server on :6060")
        log.Println(http.ListenAndServe("localhost:6060", nil))
    }()
    // Set GOMAXPROCS to container CPU limit
    runtime.GOMAXPROCS(2)  // Adjust based on container resources
    // Your application code
    startApplication()
}
// 2. Add CPU monitoring middleware
func CPUMonitoringMiddleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        start := time.Now()
        // Record CPU usage before request
        var rusageBefore syscall.Rusage
        syscall.Getrusage(syscall.RUSAGE_SELF, &rusageBefore)
        next.ServeHTTP(w, r)
        // Record CPU usage after request
        var rusageAfter syscall.Rusage
        syscall.Getrusage(syscall.RUSAGE_SELF, &rusageAfter)
        duration := time.Since(start)
        cpuTime := time.Duration(rusageAfter.Utime-rusageBefore.Utime) * time.Microsecond
        // Log high CPU requests
        if cpuTime > 100*time.Millisecond {
            log.Printf("High CPU request: %s %s - Duration: %v, CPU: %v",
                r.Method, r.URL.Path, duration, cpuTime)
        }
    })
}
Investigation Tools and Commands:
#!/bin/bash
# cpu-investigation.sh
echo "🔍 Investigating Go service CPU usage..."
# 1. Get current CPU profile (30 seconds)
echo "📊 Collecting CPU profile..."
go tool pprof -http=:8080 http://localhost:6060/debug/pprof/profile?seconds=30
# 2. Check for goroutine leaks
echo "🧵 Checking goroutine count..."
curl -s http://localhost:6060/debug/pprof/goroutine?debug=1 | head -20
# 3. Memory allocation profile (may cause CPU spikes)
echo "💾 Checking memory allocations..."
go tool pprof http://localhost:6060/debug/pprof/allocs
# 4. Check GC performance
echo "🗑️ Checking garbage collection stats..."
curl -s http://localhost:6060/debug/vars | jq '.memstats'
# 5. Container-level CPU investigation
echo "🐳 Container CPU stats..."
docker stats --no-stream $(docker ps --filter "name=go-service" --format "{{.Names}}")
# 6. Process-level analysis
echo "⚙️ Process CPU breakdown..."
top -H -p $(pgrep go-service) -n 1
# 7. strace for system call analysis
echo "🔧 System call analysis (10 seconds)..."
timeout 10s strace -c -p $(pgrep go-service)
Code-Level Optimizations:
// Common CPU bottleneck fixes
// 1. Fix: Inefficient JSON parsing
// BEFORE - Slow JSON handling
func processRequestSlow(w http.ResponseWriter, r *http.Request) {
    var data map[string]interface{}
    body, _ := ioutil.ReadAll(r.Body)
    json.Unmarshal(body, &data)
    // Process data...
}
// AFTER - Optimized JSON handling
type RequestData struct {
    UserID string `json:"user_id"`
    Action string `json:"action"`
    // Define specific fields instead of interface{}
}
func processRequestFast(w http.ResponseWriter, r *http.Request) {
    var data RequestData
    decoder := json.NewDecoder(r.Body)
    decoder.DisallowUnknownFields()  // Faster parsing
    if err := decoder.Decode(&data); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    // Process typed data...
}
// 2. Fix: CPU-intensive loops
// BEFORE - O(n²) algorithm
func findDuplicatesSlow(items []string) []string {
    var duplicates []string
    for i := 0; i < len(items); i++ {
        for j := i + 1; j < len(items); j++ {
            if items[i] == items[j] {
                duplicates = append(duplicates, items[i])
                break
            }
        }