Business KPI Monitoring
Real-time Revenue Tracking
def create_business_kpi_monitoring(self):
"""Business KPIs critical for Black Friday success"""
return {
"revenue_tracking": {
"real_time_revenue": {
"metric": "sum(rate(order_value_dollars[1m])) * 60",
"target": "$100,000/minute during peak",
"alert": "Revenue drops 20% below target for 5 minutes"
},
"conversion_rate": {
"calculation": """
(
sum(rate(orders_completed_total[5m]))
/
sum(rate(sessions_started_total[5m]))
) * 100
""",
"normal_baseline": "2.5%",
"black_friday_target": "> 4%",
"alert": "Conversion rate < 3%"
},
"average_order_value": {
"calculation": """
sum(rate(order_value_dollars[5m]))
/
sum(rate(orders_completed_total[5m]))
""",
"target": "$150 AOV",
"trending": "Track hourly trends vs last year"
}
},
"customer_satisfaction": {
"support_ticket_volume": {
"metric": "support_tickets_created_per_minute",
"alert": "Volume > 5x normal baseline",
"categories": ["payment", "shipping", "product", "technical"]
},
"chat_wait_times": {
"target": "< 2 minutes average wait",
"alert": "> 5 minutes wait time",
"escalation": "Add more chat agents automatically"
},
"return_rates": {
"metric": "returns_initiated / orders_delivered",
"monitoring": "Track for quality issues with high-volume items"
}
},
"operational_efficiency": {
"order_fulfillment_time": {
"metric": "time_from_order_to_shipment",
"target": "< 2 hours for in-stock items",
"alert": "> 4 hours for any order"
},
"warehouse_utilization": {
"picking_efficiency": "items_picked_per_hour",
"packing_efficiency": "orders_packed_per_hour",
"shipping_accuracy": "correct_shipments / total_shipments"
}
}
}
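The conversion-rate KPI above is defined as a PromQL ratio. A minimal sketch of evaluating it outside Grafana, assuming a reachable Prometheus server (the URL below is hypothetical) and its standard `/api/v1/query` endpoint via `requests`:

```python
import requests

# Hypothetical Prometheus endpoint; adjust to your environment.
PROMETHEUS_URL = "http://prometheus.example.com:9090"

CONVERSION_RATE_QUERY = """
(
  sum(rate(orders_completed_total[5m]))
  /
  sum(rate(sessions_started_total[5m]))
) * 100
"""

def current_conversion_rate(prometheus_url: str = PROMETHEUS_URL) -> float:
    """Evaluate the conversion-rate PromQL expression via the HTTP API."""
    resp = requests.get(
        f"{prometheus_url}/api/v1/query",
        params={"query": CONVERSION_RATE_QUERY},
        timeout=10,
    )
    resp.raise_for_status()
    result = resp.json()["data"]["result"]
    if not result:
        raise RuntimeError("Query returned no samples")
    return float(result[0]["value"][1])

if __name__ == "__main__":
    rate = current_conversion_rate()
    # Alert threshold from the KPI definition above: conversion rate < 3%.
    status = "ALERT" if rate < 3.0 else "OK"
    print(f"Conversion rate: {rate:.2f}% ({status})")
```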
Executive Dashboard Configuration
def create_black_friday_dashboard(self):
"""Executive dashboard for Black Friday monitoring"""
dashboard_config = {
"dashboard": {
"title": "Black Friday 2025 - Executive Command Center",
"refresh": "30s",
"panels": [
{
"title": "Revenue Performance",
"type": "stat",
"targets": [
{
"expr": "sum(increase(order_value_dollars[1h]))",
"legendFormat": "Hourly Revenue"
},
{
"expr": "sum(increase(order_value_dollars[24h]))",
"legendFormat": "Daily Revenue"
}
],
"fieldConfig": {
"defaults": {
"unit": "currencyUSD",
"thresholds": {
"steps": [
{"color": "red", "value": 0},
{"color": "yellow", "value": 5000000}, # $5M
{"color": "green", "value": 10000000} # $10M
]
}
}
}
},
{
"title": "System Health Overview",
"type": "stat",
"targets": [
{
"expr": "avg(up{job=~'checkout|payment|inventory'})",
"legendFormat": "Service Availability"
},
{
"expr": "histogram_quantile(0.95, sum(rate(http_request_duration_seconds_bucket{job='checkout'}[5m])) by (le))",
"legendFormat": "Checkout P95 Latency"
}
]
},
{
"title": "Customer Experience",
"type": "graph",
"targets": [
{
"expr": "(sum(rate(orders_completed_total[5m])) / sum(rate(sessions_started_total[5m]))) * 100",
"legendFormat": "Conversion Rate %"
},
{
"expr": "(sum(rate(cart_created_total[5m])) - sum(rate(checkout_started_total[5m]))) / sum(rate(cart_created_total[5m])) * 100",
"legendFormat": "Cart Abandonment %"
}
]
},
{
"title": "Inventory Status",
"type": "table",
"targets": [
{
"expr": "topk(20, inventory_stock_level{product_tier='top_100'})",
"format": "table",
"instant": True
}
],
"transformations": [
{
"id": "organize",
"options": {
"includeByName": {
"product_id": True,
"Value": True
},
"renameByName": {
"Value": "Stock Level"
}
}
}
]
},
{
"title": "Payment Gateway Performance",
"type": "graph",
"targets": [
{
"expr": "sum(rate(payment_requests_total[5m])) by (gateway)",
"legendFormat": "\{{ gateway }} RPS"
},
{
"expr": "(sum(rate(payment_failures_total[5m])) by (gateway) / sum(rate(payment_requests_total[5m])) by (gateway)) * 100",
"legendFormat": "\{{ gateway }} Error Rate %"
}
]
}
]
}
}
return dashboard_config
Incident Response & Escalationโ
Incident Response Planโ
def create_incident_response_plan(self):
"""Black Friday specific incident response procedures"""
return {
"severity_levels": {
"sev1_critical": {
"definition": "Revenue impact > $50k/minute",
"examples": [
"Checkout service completely down",
"Payment gateway failing > 10%",
"Homepage not loading"
],
"response_time": "< 5 minutes",
"escalation": "Immediate CEO notification"
},
"sev2_high": {
"definition": "Customer experience degraded",
"examples": [
"Page load times > 5 seconds",
"Search not returning results",
"Mobile app crashes > 1%"
],
"response_time": "< 15 minutes",
"escalation": "VP Engineering notification"
},
"sev3_medium": {
"definition": "Non-critical features impacted",
"examples": [
"Recommendation engine down",
"Email notifications delayed",
"Analytics collection issues"
],
"response_time": "< 1 hour",
"escalation": "Team lead notification"
}
},
"escalation_procedures": {
"automated_escalation": {
"triggers": [
"Alert not acknowledged within 5 minutes",
"Multiple related alerts firing",
"Revenue drop > $100k/hour"
],
"actions": [
"Page senior engineers",
"Start incident bridge",
"Notify business stakeholders"
]
},
"communication_channels": {
"incident_bridge": "Zoom room auto-opened for Sev1/Sev2",
"status_page": "Customer-facing updates every 30 minutes",
"internal_slack": "#black-friday-war-room",
"executive_escalation": "CEO/CTO notification for revenue impact"
}
},
"pre_approved_actions": {
"traffic_management": [
"Enable maintenance mode for non-critical features",
"Redirect traffic to backup data centers",
"Enable aggressive caching on CDN"
],
"capacity_scaling": [
"Auto-scale to maximum capacity",
"Activate reserved cloud instances",
"Enable burst capacity for databases"
],
"feature_toggles": [
"Disable recommendation engine if needed",
"Simplify checkout flow",
"Turn off non-essential analytics"
]
}
}
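The automated escalation triggers above (for example, an alert left unacknowledged for 5 minutes) can be expressed as a small evaluation loop. A sketch with placeholder paging and chat notifications, not tied to a specific incident-management product:

```python
import time
from dataclasses import dataclass

# Escalation trigger from the plan above: alert not acknowledged within 5 minutes.
ACK_TIMEOUT_SECONDS = 5 * 60

@dataclass
class Alert:
    name: str
    severity: str
    fired_at: float
    acknowledged: bool = False
    escalated: bool = False

def page_senior_engineers(alert: Alert) -> None:
    # Placeholder for a real paging integration (PagerDuty, Opsgenie, etc.).
    print(f"PAGE senior engineers: {alert.name} ({alert.severity})")

def notify_war_room(alert: Alert) -> None:
    # Placeholder for posting to the #black-friday-war-room channel.
    print(f"#black-friday-war-room: escalating {alert.name}, no ack in 5 minutes")

def check_escalations(alerts: list[Alert], now: float | None = None) -> None:
    """Escalate any unacknowledged alert that has exceeded the ack timeout."""
    now = now if now is not None else time.time()
    for alert in alerts:
        if alert.acknowledged or alert.escalated:
            continue
        if now - alert.fired_at > ACK_TIMEOUT_SECONDS:
            page_senior_engineers(alert)
            notify_war_room(alert)
            alert.escalated = True

if __name__ == "__main__":
    stale = Alert("CheckoutServiceDown", "sev1_critical", fired_at=time.time() - 600)
    check_escalations([stale])
```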
Load Testing Strategy
def create_load_testing_strategy(self):
"""Comprehensive load testing for Black Friday preparation"""
return {
"test_scenarios": {
"baseline_load_test": {
"description": "Current production traffic patterns",
"virtual_users": 10000,
"duration": "2 hours",
"ramp_up": "10 minutes",
"success_criteria": [
"P95 response time < 500ms",
"Error rate < 0.1%",
"Zero timeouts"
]
},
"black_friday_simulation": {
"description": "10x peak traffic simulation",
"virtual_users": 100000,
"duration": "4 hours",
"traffic_pattern": "Gradual ramp to peak, then sustain",
"focus_areas": [
"Checkout flow under extreme load",
"Search and product pages",
"Payment processing capacity",
"Mobile vs desktop performance"
]
}
},
"test_data_strategy": {
"synthetic_products": "100k product catalog",
"test_users": "1M user accounts with purchase history",
"payment_simulation": "Use test payment processors",
"inventory_simulation": "Realistic stock levels and depletion"
}
}
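A load-test scenario matching the baseline test above could be sketched with Locust (an assumption; any load tool works). The host and endpoints below are hypothetical:

```python
# locustfile.py - a minimal sketch of the baseline scenario, assuming Locust is
# installed (`pip install locust`) and hypothetical /products, /cart and /checkout endpoints.
from locust import HttpUser, task, between

class ShopperUser(HttpUser):
    # Think time between requests; tune to match observed session behaviour.
    wait_time = between(1, 5)

    @task(10)
    def browse_products(self):
        # Most traffic is product browsing and search.
        self.client.get("/products?page=1")

    @task(3)
    def add_to_cart(self):
        self.client.post("/cart", json={"product_id": "sku-123", "qty": 1})

    @task(1)
    def checkout(self):
        # Checkout is the flow that must survive 10x peak load.
        self.client.post("/checkout", json={"payment_method": "test-card"})

# Example run for the baseline test (10k users, ~10 minute ramp, 2 hour duration):
#   locust -f locustfile.py --host https://staging.example.com \
#          --users 10000 --spawn-rate 17 --run-time 2h --headless
```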
Multi-Store Inventory Monitoring
Distributed Monitoring Architecture
400+ Store Inventory Monitoring System:
class MultiStoreInventoryMonitoring:
def __init__(self):
self.store_count = 400
self.regions = ["Northeast", "Southeast", "Midwest", "West", "Southwest"]
self.product_categories = ["Grocery", "Pharmacy", "General Merchandise"]
def design_distributed_monitoring_architecture(self):
"""Hierarchical monitoring for 400+ stores"""
return {
"monitoring_hierarchy": {
"store_level": {
"scope": "Individual store (1-400)",
"metrics_frequency": "Real-time (15s intervals)",
"data_retention": "7 days local",
"critical_metrics": [
"stock_levels_by_aisle",
"pos_transaction_rate",
"temperature_sensors",
"power_consumption"
]
},
"regional_level": {
"scope": "Regional aggregation (5 regions)",
"metrics_frequency": "1 minute aggregations",
"data_retention": "30 days",
"aggregated_metrics": [
"regional_inventory_turnover",
"cross_store_transfer_opportunities",
"regional_demand_patterns"
]
},
"corporate_level": {
"scope": "Enterprise-wide view",
"metrics_frequency": "5 minute summaries",
"data_retention": "2 years",
def design_distributed_monitoring_architecture(self):
"""Hierarchical monitoring for 400+ stores"""
return {
"monitoring_hierarchy": {
"store_level": {
"scope": "Individual store (1-400)",
"metrics_frequency": "Real-time (15s intervals)",
"data_retention": "7 days local",
"critical_metrics": [
"stock_levels_by_aisle",
"pos_transaction_rate",
"temperature_sensors",
"power_consumption"
]
},
"regional_level": {
"scope": "Regional aggregation (5 regions)",
"metrics_frequency": "1 minute aggregations",
"data_retention": "30 days",
"aggregated_metrics": [
"regional_inventory_turnover",
"cross_store_transfer_opportunities",
"regional_demand_patterns"
]
},
"corporate_level": {
"scope": "Enterprise-wide view",
"metrics_frequency": "5 minute summaries",
"data_retention": "2 years",
def create_edge_computing_architecture(self):
"""Edge devices in each store for local processing"""
return {
"store_edge_device": {
"hardware": "Industrial IoT gateway with 32GB RAM, 1TB SSD",
"software_stack": [
"K3s (lightweight Kubernetes)",
"Prometheus (local metrics collection)",
"InfluxDB (time-series storage)",
"Local alerting service"
],
"connectivity": {
"primary": "Dedicated fiber internet",
"backup": "4G/5G cellular",
"local": "WiFi for in-store devices"
}
},
"local_monitoring_capabilities": {
"real_time_inventory": {
"rfid_readers": "Track individual item movements",
"weight_sensors": "Shelf weight monitoring for bulk items",
"camera_analytics": "Computer vision for shelf monitoring",
"pos_integration": "Real-time sales data"
},
"environmental_monitoring": {
"temperature": "Freezer/refrigerator monitoring",
"humidity": "Produce section optimization",
"air_quality": "Customer comfort metrics"
},
"operational_metrics": {
"foot_traffic": "Customer flow analytics",
"checkout_wait_times": "Queue length monitoring",
"employee_efficiency": "Task completion tracking"
}
},
"local_alerting": {
"immediate_alerts": [
"Freezer temperature > 0ยฐC",
"Critical item stockout (top 100 SKUs)",
"POS system failure",
"Security system breach"
],
"notification_methods": [
"Store manager mobile app",
"Digital displays in back office",
"Email to regional manager"
]
}
}
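The immediate local alerts listed above can be evaluated directly on the store's edge gateway, even if connectivity to the regional tier is degraded. A minimal sketch with hypothetical sensor and POS inputs:

```python
from dataclasses import dataclass

@dataclass
class StoreReading:
    freezer_temp_c: float
    top_sku_stock: dict[str, int]   # stock level per top-100 SKU
    pos_online: bool

def evaluate_local_alerts(reading: StoreReading) -> list[str]:
    """Return the immediate alerts that should notify the store manager."""
    alerts = []
    if reading.freezer_temp_c > 0:
        alerts.append(f"Freezer temperature above 0°C: {reading.freezer_temp_c:.1f}°C")
    for sku, stock in reading.top_sku_stock.items():
        if stock == 0:
            alerts.append(f"Critical stockout for top-100 SKU {sku}")
    if not reading.pos_online:
        alerts.append("POS system failure")
    return alerts

if __name__ == "__main__":
    sample = StoreReading(freezer_temp_c=2.5,
                          top_sku_stock={"milk-1l": 0, "eggs-12": 14},
                          pos_online=True)
    for alert in evaluate_local_alerts(sample):
        print(alert)  # In production this would go to the manager app / regional email
```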
def design_inventory_specific_monitoring(self):
"""Detailed inventory monitoring strategy"""
return {
"product_classification": {
"a_class_products": {
"definition": "Top 20% revenue generators",
"monitoring_frequency": "Real-time",
"stockout_tolerance": "Zero tolerance",
"reorder_automation": "Automatic when stock < 5 days",
"alert_threshold": "< 10 units remaining"
},
"b_class_products": {
"definition": "Medium revenue impact",
"monitoring_frequency": "5 minute intervals",
"stockout_tolerance": "< 2 hours",
"reorder_automation": "Automatic when stock < 3 days",
"alert_threshold": "< 5 units remaining"
},
"c_class_products": {
"definition": "Low revenue impact",
"monitoring_frequency": "30 minute intervals",
"stockout_tolerance": "< 24 hours",
"reorder_automation": "Manual approval required",
"alert_threshold": "Out of stock"
}
},
"demand_forecasting_integration": {
"ml_model_inputs": [
"Historical sales data (2+ years)",
"Seasonal patterns",
"Local events (sports, weather, holidays)",
"Economic indicators",
"Competitor pricing"
],
"prediction_accuracy_monitoring": {
"metric": "mean_absolute_percentage_error",
"target": "< 15% MAPE for A-class products",
"alert": "Model accuracy degradation > 20%"
},
"automated_reordering": {
"confidence_threshold": "> 85% prediction confidence",
"safety_stock_buffer": "20% above predicted demand",
"supplier_lead_time_factor": "Dynamic based on supplier performance"
}
},
"cross_store_optimization": {
"transfer_opportunities": {
"identification": """
# Prometheus query to identify transfer opportunities
(
inventory_stock_level{store="A"} >
(avg_over_time(inventory_daily_sales[7d]) * 10) # 10 days of stock
)
and
(
inventory_stock_level{store="B", product_id="same"} <
(avg_over_time(inventory_daily_sales[7d]) * 2) # 2 days of stock
)
and
on(product_id) distance_between_stores < 50 # Within 50 miles
""",
"automation_threshold": "Transfer cost < 20% of product margin",
"approval_workflow": "Auto-approve transfers < $1000 value"
},
"regional_balancing": {
"surplus_detection": "Stock > 15 days of demand",
"shortage_prediction": "Stock < 3 days of demand",
"optimization_algorithm": "Minimize total transportation cost"
}
}
}
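The transfer-opportunity PromQL above can also be expressed as plain application logic once stock levels and sales rates have been fetched. A simplified sketch with hypothetical store data (distances, costs, and margins are illustrative inputs):

```python
from dataclasses import dataclass

@dataclass
class StoreStock:
    store_id: str
    units: int
    avg_daily_sales: float

def days_of_stock(s: StoreStock) -> float:
    return s.units / max(s.avg_daily_sales, 0.1)  # avoid division by zero

def transfer_opportunity(source: StoreStock, dest: StoreStock,
                         distance_miles: float, transfer_cost: float,
                         unit_margin: float, unit_price: float) -> dict | None:
    """Mirror the rules above: >10 days at source, <2 days at destination, within 50 miles,
    transfer cost under 20% of margin; auto-approve transfers worth less than $1,000."""
    if days_of_stock(source) <= 10 or days_of_stock(dest) >= 2 or distance_miles > 50:
        return None
    units_to_move = int(min(source.units / 2, dest.avg_daily_sales * 7))
    if transfer_cost >= 0.2 * unit_margin * units_to_move:
        return None
    value = units_to_move * unit_price
    return {
        "from": source.store_id,
        "to": dest.store_id,
        "units": units_to_move,
        "auto_approved": value < 1000,
    }

if __name__ == "__main__":
    a = StoreStock("store-A", units=400, avg_daily_sales=10)   # ~40 days of stock
    b = StoreStock("store-B", units=15, avg_daily_sales=12)    # ~1.25 days of stock
    print(transfer_opportunity(a, b, distance_miles=22, transfer_cost=40,
                               unit_margin=6.0, unit_price=19.99))
```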
def create_business_impact_monitoring(self):
"""Monitor business KPIs across all stores"""
return {
"revenue_impact_tracking": {
"lost_sales_calculation": {
"formula": """
sum(
stockout_duration_minutes *
avg_sales_per_minute_historical *
product_margin
) by (store, product_category)
""",
"real_time_tracking": "Update every 5 minutes",
"daily_reporting": "Automatic executive dashboard"
},
"margin_optimization": {
"high_margin_priority": "Monitor top 10% margin products closely",
"clearance_opportunities": "Identify slow-moving high-value inventory",
"price_elasticity_monitoring": "Track demand vs price changes"
}
},
"customer_satisfaction_metrics": {
"stockout_customer_impact": {
"metric": "customer_frustration_score",
"calculation": "Stockouts of frequently purchased items",
"survey_integration": "Post-visit satisfaction surveys",
"nps_correlation": "Track NPS vs stockout frequency"
},
"substitute_product_success": {
"metric": "substitute_acceptance_rate",
"tracking": "When suggesting alternatives for out-of-stock items",
"optimization": "Improve substitute recommendation algorithm"
}
},
"operational_efficiency": {
"inventory_turnover": {
"calculation": "COGS / Average Inventory Value",
"target": "> 12x annual turnover",
"monitoring": "Monthly trend analysis",
"benchmarking": "Compare stores within same demographic"
},
"shrinkage_monitoring": {
"sources": ["Theft", "Damage", "Expiration", "Administrative errors"],
"acceptable_threshold": "< 1.5% of inventory value",
"investigation_trigger": "Shrinkage > 2% in any category"
}
}
}
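The lost-sales formula above multiplies stockout duration by the historical sales rate and the margin. A small sketch of that aggregation, treating margin as a fraction of unit price (an assumption) and using illustrative event data:

```python
from collections import defaultdict

def lost_sales(stockout_events: list[dict]) -> dict[tuple[str, str], float]:
    """Each event: store, category, stockout_minutes, avg_sales_per_minute, unit_price, margin (0-1)."""
    totals: dict[tuple[str, str], float] = defaultdict(float)
    for e in stockout_events:
        lost_units = e["stockout_minutes"] * e["avg_sales_per_minute"]
        totals[(e["store"], e["category"])] += lost_units * e["unit_price"] * e["margin"]
    return dict(totals)

if __name__ == "__main__":
    events = [
        {"store": "042", "category": "Grocery", "stockout_minutes": 90,
         "avg_sales_per_minute": 0.8, "unit_price": 4.50, "margin": 0.35},
        {"store": "042", "category": "Pharmacy", "stockout_minutes": 30,
         "avg_sales_per_minute": 0.2, "unit_price": 12.00, "margin": 0.45},
    ]
    for (store, category), dollars in lost_sales(events).items():
        print(f"store {store} / {category}: ${dollars:,.2f} lost margin")
```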
def implement_prometheus_configuration(self):
"""Prometheus setup for multi-store monitoring"""
prometheus_config = """
# Global Prometheus configuration for multi-store monitoring
global:
scrape_interval: 30s
evaluation_interval: 30s
external_labels:
cluster: 'retail-monitoring'
company: 'heb'
# Remote write to central storage
remote_write:
- url: "https://prometheus-central.heb.com/api/v1/write"
queue_config:
max_samples_per_send: 10000
batch_send_deadline: 5s
write_relabel_configs:
# Only send critical metrics to central storage
- source_labels: [__name__]
regex: '(inventory_stock_level|pos_transaction_rate|temperature_sensors|revenue_impact).*'
action: keep
# Scrape configurations for different store systems
scrape_configs:
# Inventory management systems
- job_name: 'inventory-pos'
static_configs:
- targets: ['pos-system:9100']
scrape_interval: 15s
metrics_path: '/metrics'
relabel_configs:
- source_labels: [__address__]
target_label: store_id
regex: 'pos-system-([0-9]+):.*'
replacement: '${1}'
# Environmental sensors
- job_name: 'environmental-sensors'
static_configs:
- targets: ['iot-gateway:9101']
scrape_interval: 30s
metric_relabel_configs:
# Add store location metadata
- source_labels: [sensor_id]
target_label: aisle
regex: 'temp_([0-9]+)_.*'
replacement: '${1}'
# RFID readers
- job_name: 'rfid-readers'
kubernetes_sd_configs:
- role: pod
namespaces:
names: ['store-systems']
relabel_configs:
- source_labels: [__meta_kubernetes_pod_label_app]
regex: 'rfid-reader'
action: keep
- source_labels: [__meta_kubernetes_pod_annotation_store_id]
target_label: store_id
# Recording rules for business metrics
rule_files:
- "store_inventory_rules.yml"
- "business_impact_rules.yml"
"""
recording_rules = """
# store_inventory_rules.yml
groups:
- name: inventory_metrics
interval: 60s
rules:
# Calculate days of inventory remaining
- record: store:inventory_days_remaining
expr: |
inventory_stock_level
/
(avg_over_time(inventory_daily_sales[7d]) + 0.1) # Avoid division by zero
# Identify critical stockouts
- record: store:critical_stockouts
expr: |
(inventory_stock_level{product_class="A"} == 0) * 1
# Calculate store-level inventory value
- record: store:total_inventory_value
expr: |
sum(inventory_stock_level * product_unit_cost) by (store_id)
# Regional inventory summaries
- record: region:inventory_value
expr: |
sum(store:total_inventory_value) by (region)
- name: business_impact
interval: 300s # 5 minute intervals
rules:
# Lost sales due to stockouts
- record: store:lost_sales_hourly
expr: |
sum(
increase(stockout_duration_minutes[1h]) / 60 *
avg_over_time(sales_per_hour_historical[7d]) *
product_margin_percentage / 100
) by (store_id, product_category)
# Inventory turnover rate
- record: store:inventory_turnover_monthly
expr: |
sum(increase(inventory_cost_of_goods_sold[30d])) by (store_id) /
avg_over_time(store:total_inventory_value[30d])
"""
return {
"prometheus_config": prometheus_config,
"recording_rules": recording_rules,
"alerting_rules": self.create_alerting_rules()
}
def create_alerting_rules(self):
"""Comprehensive alerting for inventory management"""
return """
# Inventory alerting rules
groups:
- name: critical_inventory_alerts
rules:
# Critical product stockout
- alert: CriticalProductStockout
expr: |
inventory_stock_level{product_class="A"} == 0
for: 1m
labels:
severity: critical
team: inventory
impact: revenue_loss
annotations:
summary: "Critical product out of stock"
description: "Store \{{ $labels.store_id }} is out of stock for critical product \{{ $labels.product_id }}"
action: "Emergency restock or transfer from nearby store"
estimated_loss: "$500/hour"
# Predicted stockout
- alert: StockoutPredicted
expr: |
store:inventory_days_remaining{product_class="A"} < 1
for: 5m
labels:
severity: warning
team: inventory
annotations:
summary: "Critical product will be out of stock within 24 hours"
description: "\{{ $labels.product_id }} at store \{{ $labels.store_id }} has \{{ $value }} days remaining"
action: "Schedule restock delivery"
# Environmental alerts
- alert: FreezerTemperatureHigh
expr: |
temperature_celsius{sensor_type="freezer"} > 0
for: 2m
labels:
severity: critical
team: facilities
impact: product_loss
annotations:
summary: "Freezer temperature above safe threshold"
description: "Freezer \{{ $labels.sensor_id }} temperature is \{{ $value }}ยฐC"
action: "Immediate maintenance required"
# High lost sales alert
- alert: HighLostSales
expr: |
store:lost_sales_hourly > 1000
for: 15m
labels:
severity: warning
team: inventory
impact: revenue
annotations:
summary: "High lost sales due to stockouts"
description: "Store \{{ $labels.store_id }} losing $\{{ $value }}/hour due to stockouts"
action: "Review inventory management processes"
# Regional inventory imbalance
- alert: RegionalInventoryImbalance
expr: |
(
max(store:total_inventory_value) by (region) /
min(store:total_inventory_value) by (region)
) > 3
for: 30m
labels:
severity: warning
team: supply_chain
annotations:
summary: "Significant inventory imbalance within region"
description: "Region \{{ $labels.region }} has 3x inventory variance between stores"
action: "Consider inventory rebalancing"
"""
## Summary of Observability & Retail Monitoring Coverage
The comprehensive interview guide now includes detailed coverage of:
### **Prometheus & Grafana Expertise (Questions 7-11)**
- **Complete observability stack design** with Prometheus configuration, SLI/SLO recording rules, and alerting strategies
- **DataDog vs Prometheus comparison** with cost analysis for 100-service architecture
- **Migration strategy** from DataDog to Prometheus with phased approach and risk mitigation
- **Multi-tenant Grafana setup** with organization structure, data source segregation, and access controls
- **Prometheus scaling** with federation, sharding, long-term storage (Thanos), and cardinality management
### **Retail/E-commerce Specific Monitoring (Questions 15-17)**
- **Black Friday monitoring strategy** covering inventory, checkout, payments, user experience, and infrastructure scaling
- **Multi-store inventory monitoring** for 400+ locations with edge computing, hierarchical monitoring, and business impact tracking
- **Grocery delivery platform observability** with real-time order tracking, driver dispatch, and inventory management
### **Key Technical Implementations Included:**
**Prometheus Configuration Examples:**
- Federation setup for multi-cluster monitoring
- Recording rules for SLI/SLO calculations
- Alerting rules with business impact correlation
- Cardinality optimization and cost management
**Grafana Dashboard Design:**
- Role-based access control and multi-tenancy
- Executive, operational, and debugging dashboards
- Dashboard-as-code with JSON configurations
- Real-time business KPI tracking
**DataDog Integration & Migration:**
- API-based metric extraction and conversion
- Side-by-side comparison during migration
- Cost optimization strategies
- Custom exporter development
**Retail-Specific Monitoring:**
- Edge computing architecture for store-level monitoring
- Real-time inventory tracking with RFID/IoT integration
- Cross-store inventory optimization
- Black Friday load testing and incident response
**Business Impact Focus:**
- Revenue correlation with technical metrics
- Customer satisfaction monitoring
- Operational efficiency tracking
- Predictive analytics for inventory management
### **Tools & Technologies Covered:**
- **Monitoring**: Prometheus, Grafana, DataDog, Thanos, AlertManager
- **Storage**: InfluxDB, TimescaleDB, object storage (S3/GCS)
- **Retail Systems**: POS integration, RFID readers, IoT sensors
- **Infrastructure**: Kubernetes, edge computing, CDN monitoring
- **Business Analytics**: Revenue tracking, conversion funnels, inventory turnover
This comprehensive coverage ensures candidates can demonstrate expertise in both modern observability tools and domain-specific retail/e-commerce monitoring challenges, making them well-prepared for SRE roles at companies like HEB or similar retail organizations.
The answers provide practical, implementable solutions with real code examples, configuration files, and business impact analysis that shows understanding of both technical excellence and business value delivery.
## Performance Optimization Strategies
### Database Performance Monitoring
```python
class DatabasePerformanceMonitor:
def __init__(self):
self.db_metrics = [
"connection_pool_usage",
"query_latency_percentiles",
"lock_contention",
"index_usage_stats"
]
def monitor_query_performance(self):
"""Monitor slow queries and performance bottlenecks"""
return {
"slow_query_detection": {
"threshold": "> 1000ms execution time",
"alert": "Query execution > 5 seconds",
"analysis": "Include query plan and table statistics"
},
"connection_pool_monitoring": {
"metric": "active_connections / max_connections",
"alert": "Connection pool > 80% utilization",
"scaling_action": "Add read replicas or increase pool size"
},
"lock_monitoring": {
"deadlock_detection": "Monitor pg_locks for blocking queries",
"long_running_transactions": "Alert on transactions > 30 minutes"
}
}
```
Application Performance Optimization
class ApplicationPerformanceOptimization:
def __init__(self):
self.optimization_areas = [
"memory_management",
"cpu_optimization",
"io_efficiency",
"caching_strategies"
]
def implement_caching_strategy(self):
"""Multi-tier caching for optimal performance"""
return {
"application_cache": {
"tool": "Redis Cluster",
"use_cases": ["Session data", "Frequently accessed data"],
"ttl_strategy": "Dynamic TTL based on access patterns"
},
"database_query_cache": {
"tool": "PostgreSQL query cache + Redis",
"cache_invalidation": "Event-driven cache clearing"
},
"cdn_cache": {
"static_assets": "Long TTL for versioned assets",
"api_responses": "Short TTL for dynamic content"
}
}
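The application-cache tier above follows a cache-aside pattern. A minimal sketch assuming the redis-py client and a placeholder database read; the TTL split here is a simplified stand-in for the "dynamic TTL based on access patterns" idea:

```python
import json
import redis

# Hypothetical Redis endpoint; adjust to your environment.
r = redis.Redis(host="redis.example.com", port=6379, decode_responses=True)

def load_product_from_db(product_id: str) -> dict:
    # Placeholder for the real database read.
    return {"id": product_id, "name": "example", "price": 19.99}

def get_product(product_id: str, hot_ttl: int = 300, cold_ttl: int = 3600) -> dict:
    """Cache-aside read: serve from Redis when possible, otherwise load and cache.
    Hot items get a short TTL so price/stock changes propagate quickly."""
    key = f"product:{product_id}"
    cached = r.get(key)
    if cached is not None:
        return json.loads(cached)
    product = load_product_from_db(product_id)
    ttl = hot_ttl if product_id.startswith("top-") else cold_ttl
    r.set(key, json.dumps(product), ex=ttl)
    return product
```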
Summary
This comprehensive observability guide covers:
- Advanced monitoring strategies for high-scale systems
- Business KPI tracking with technical metrics correlation
- Multi-store inventory monitoring with edge computing
- Black Friday scenario planning with load testing strategies
- Prometheus optimization and scaling techniques
- Incident response procedures with automated escalation
- Performance optimization across the full stack
Retail E-commerce Platform Monitoring (Black Friday)
Comprehensive Black Friday Monitoring Strategy:
class BlackFridayMonitoringStrategy:
def __init__(self):
self.peak_multiplier = 10 # 10x normal traffic expected
self.critical_services = [
"product_catalog", "shopping_cart", "checkout_service",
"payment_gateway", "inventory_service", "order_management"
]
def design_inventory_monitoring(self):
"""Real-time inventory monitoring across all channels"""
return {
"inventory_metrics": {
"stock_levels": {
"metric": "inventory_stock_level",
"labels": ["product_id", "warehouse", "channel"],
"alert_thresholds": {
"critical": "stock < 10 units for top 100 products",
"warning": "stock < 50 units for top 500 products"
},
"business_impact": "Lost sales, customer dissatisfaction"
},
"stock_velocity": {
"metric": "inventory_sales_rate_per_minute",
"calculation": "rate(inventory_sold_total[5m])",
"purpose": "Predict stockouts before they happen",
"alert": "If velocity suggests stockout within 2 hours"
},
"restock_alerts": {
"metric": "inventory_restock_needed",
"triggers": [
"Stock below safety threshold",
"Incoming shipment delayed",
"Supplier availability issues"
]
}
},
"cross_channel_sync": {
"online_vs_store": {
"sync_delay_metric": "inventory_sync_delay_seconds",
"alert_threshold": "> 30 seconds",
"business_rule": "Online overselling prevention"
},
"reserved_inventory": {
"metric": "inventory_reserved_units",
"purpose": "Track items in shopping carts",
"timeout_policy": "Release after 15 minutes"
}
},
"prometheus_rules": self.create_inventory_prometheus_rules()
}
def create_inventory_prometheus_rules(self):
"""Prometheus alerting rules for inventory"""
return """
groups:
- name: inventory_alerts
rules:
# Critical stockout alert for top products
- alert: CriticalStockLow
expr: |
(
inventory_stock_level{product_tier="top_100"} < 10
and
rate(inventory_sold_total{product_tier="top_100"}[10m]) > 0
)
for: 1m
labels:
severity: critical
team: inventory
impact: revenue_loss
annotations:
summary: "Critical stock low for top product"
description: "Product \{{ $labels.product_id }} has \{{ $value }} units left"
action: "Immediate restock or remove from sale"
revenue_impact: "~$10k/hour if not resolved"
# Predicted stockout alert
- alert: StockoutPredicted
expr: |
(
(
inventory_stock_level
/
(rate(inventory_sold_total[30m]) * 60) # Units per minute
) < 120 # Less than 2 hours of stock
and
rate(inventory_sold_total[30m]) > 0
)
for: 5m
labels:
severity: warning
team: inventory
annotations:
summary: "Stockout predicted within 2 hours"
description: "\{{ $labels.product_id }} will be out of stock in \{{ $value }} minutes"
# Inventory sync issues
- alert: InventorySyncDelayed
expr: inventory_sync_delay_seconds > 30
for: 2m
labels:
severity: warning
team: platform
annotations:
summary: "Inventory sync delayed between channels"
description: "Sync delay is \{{ $value }} seconds for \{{ $labels.warehouse }}"
"""
def design_checkout_monitoring(self):
"""Critical checkout flow monitoring"""
return {
"checkout_funnel_metrics": {
"cart_abandonment": {
"calculation": """
(
sum(rate(cart_created_total[5m]))
-
sum(rate(checkout_started_total[5m]))
) / sum(rate(cart_created_total[5m])) * 100
""",
"normal_baseline": "65%",
"alert_threshold": "> 75%",
"black_friday_target": "< 70%"
},
"payment_failures": {
"by_gateway": "payment_failures_total by (gateway, error_type)",
"by_card_type": "payment_failures_total by (card_type)",
"alert_threshold": "> 2% failure rate",
"escalation": "Auto-failover to backup payment processor"
},
"checkout_latency": {
"metric": "checkout_duration_seconds",
"slo_target": "P95 < 3 seconds",
"alert_threshold": "P95 > 5 seconds",
"business_impact": "Every 1s increase = 7% conversion drop"
}
},
"payment_processor_health": {
"primary_gateway": {
"availability_sli": "payment_requests_success_rate",
"latency_sli": "payment_processing_duration_p95",
"throughput_capacity": "10,000 transactions/minute"
},
"backup_gateways": {
"auto_failover_threshold": "Primary gateway < 95% success rate",
"manual_override": "Operations dashboard control"
}
},
"fraud_detection_metrics": {
"fraud_score_distribution": "fraud_score_histogram",
"false_positive_rate": "legitimate_orders_blocked / total_orders",
"processing_time": "fraud_check_duration_seconds",
"alert": "Fraud processing > 500ms (impacts checkout flow)"
}
}
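The auto-failover rule above (switch away from the primary gateway when its success rate drops below 95%) reduces to a simple routing decision once per-gateway success rates are computed from the payment metrics. A sketch:

```python
FAILOVER_THRESHOLD = 0.95  # primary gateway success rate below this triggers failover

def select_gateway(success_rates: dict[str, float], primary: str = "primary") -> str:
    """Return the gateway new payments should be routed to."""
    if success_rates.get(primary, 0.0) >= FAILOVER_THRESHOLD:
        return primary
    # Fail over to the healthiest backup gateway.
    backups = {g: rate for g, rate in success_rates.items() if g != primary}
    if not backups:
        return primary  # nothing better available
    return max(backups, key=backups.get)

if __name__ == "__main__":
    rates = {"primary": 0.91, "backup_a": 0.99, "backup_b": 0.97}
    print(select_gateway(rates))  # -> backup_a
```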
def design_user_experience_monitoring(self):
"""Real User Monitoring for customer experience"""
return {
"frontend_metrics": {
"page_load_times": {
"homepage": "Target: < 2s, Alert: > 3s",
"product_pages": "Target: < 1.5s, Alert: > 2.5s",
"checkout_pages": "Target: < 1s, Alert: > 2s"
},
"javascript_errors": {
"error_rate": "js_errors_total / page_views_total",
"alert_threshold": "> 0.5%",
"categorization": ["network", "syntax", "runtime", "resource"]
},
"core_web_vitals": {
"largest_contentful_paint": "Target: < 2.5s",
"first_input_delay": "Target: < 100ms",
"cumulative_layout_shift": "Target: < 0.1"
}
},
"mobile_app_metrics": {
"app_crash_rate": "Target: < 0.1%, Alert: > 0.5%",
"app_startup_time": "Target: < 3s, Alert: > 5s",
"offline_capability": "Track cart persistence during network issues"
},
"search_and_discovery": {
"search_success_rate": """
(
search_queries_with_results_total
/
search_queries_total
) * 100
""",
"search_latency": "search_response_time_p95",
"product_recommendation_ctr": "recommendation_clicks / recommendation_views"
}
}
def design_infrastructure_monitoring(self):
"""Infrastructure monitoring for Black Friday scale"""
return {
"auto_scaling_metrics": {
"horizontal_scaling": {
"trigger_metrics": [
"CPU utilization > 70%",
"Request queue depth > 100",
"Response time P95 > 500ms"
],
"scaling_policy": {
"scale_out": "Add 50% capacity when triggered",
"scale_in": "Remove 25% capacity when load decreases",
"cooldown": "5 minutes between scaling events"
}
},
"database userJourneys: true,
performanceBottlenecks: "automatic_detection",
businessMetricsCorrelation: true
}
};
// DataDog dashboard configuration
const executiveDashboard = {
widgets: [
{
type: "timeseries",
title: "Business KPIs",
requests: [
{
q: "sum:orders.completed{*}.as_count()",
display_type: "line"
},
{
q: "sum:revenue.total{*}",
display_type: "line"
}
],
custom_links: [
{
label: "Drill down to order details",
link: "/dashboard/orders-detail?from={{start_time}}&to={{end_time}}"
}
]
},
{
type: "query_value",
title: "Current System Health",
requests: [
{
q: "avg:system.uptime{*}",
aggregator: "avg"
}
],
conditional_formats: [
{
comparator: ">",
value: 99.5,
palette: "green_on_white"
},
{
comparator: "<=",
value: 99.0,
palette: "red_on_white"
}
]
}
]
};
Cost Analysis for 100-service Microservices Architecture:
class MonitoringCostAnalysis:
def __init__(self):
self.services = 100
self.hosts = 50
self.containers = 500
def prometheus_costs(self):
"""Calculate Prometheus + Grafana costs"""
return {
# Infrastructure costs
"prometheus_servers": {
"count": 3, # HA setup
"instance_type": "c5.2xlarge",
"monthly_cost": 3 * 280, # $840/month
"storage": "1TB SSD per server",
"storage_cost": 3 * 100 # $300/month
},
"grafana_servers": {
"count": 2, # HA setup
"instance_type": "t3.large",
"monthly_cost": 2 * 70, # $140/month
},
"long_term_storage": {
"provider": "S3/GCS",
"monthly_cost": 200, # $200/month for 10TB
},
"engineering_overhead": {
"sre_time": "20% of 1 FTE",
"monthly_cost": 0.2 * 12000, # $2,400/month
},
"total_monthly": 840 + 300 + 140 + 200 + 2400 # $3,880/month
}
def datadog_costs(self):
"""Calculate DataDog costs"""
return {
"infrastructure_monitoring": {
"hosts": self.hosts,
"cost_per_host": 15, # $15/host/month
"monthly_cost": self.hosts * 15 # $750/month
},
"apm_monitoring": {
"hosts": self.hosts,
"cost_per_host": 31, # $31/host/month for APM
"monthly_cost": self.hosts * 31 # $1,550/month
},
"log_management": {
"gb_per_day": 100,
"cost_per_gb": 0.10,
"monthly_cost": 100 * 0.10 * 30 # $300/month
},
"custom_metrics": {
"metric_count": 10000,
"cost_per_100_metrics": 5,
"monthly_cost": (10000/100) * 5 # $500/month
},
"engineering_overhead": {
"sre_time": "5% of 1 FTE", # Much lower maintenance
"monthly_cost": 0.05 * 12000 # $600/month
},
"total_monthly": 750 + 1550 + 300 + 500 + 600 # $3,700/month
}
def decision_matrix(self):
"""Decision framework based on company characteristics"""
return {
"choose_prometheus_if": [
"Cost consciousness (long-term savings)",
"Data sovereignty requirements",
"Complex custom metrics and alerting",
"Strong DevOps/SRE team",
"Multi-cloud or on-premises infrastructure",
"Advanced PromQL requirements"
],
"choose_datadog_if": [
"Rapid time-to-value needed",
"Limited monitoring expertise",
"Comprehensive APM/RUM requirements",
"Strong integration needs",
"Prefer managed solutions",
"Executive dashboards and business metrics"
]
}
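A short usage sketch of the cost model above; it assumes the `MonitoringCostAnalysis` class is in scope, and the totals printed are the illustrative figures baked into `prometheus_costs()` and `datadog_costs()`, not vendor quotes:

```python
analysis = MonitoringCostAnalysis()

prometheus_total = analysis.prometheus_costs()["total_monthly"]
datadog_total = analysis.datadog_costs()["total_monthly"]

print(f"Prometheus + Grafana: ${prometheus_total:,}/month")
print(f"DataDog:              ${datadog_total:,}/month")
print(f"Difference:           ${abs(prometheus_total - datadog_total):,}/month")
```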
9. Migration from DataDog to Prometheus Strategy
Strong Answer: Phased Migration Strategy:
class DataDogToPrometheusMigration:
def __init__(self):
self.migration_phases = [
"assessment_and_planning",
"infrastructure_setup",
"metrics_migration",
"dashboard_migration",
"alerting_migration",
"training_and_handover",
"datadog_decommission"
]
def phase_1_assessment(self):
"""Comprehensive assessment of current DataDog usage"""
return {
"datadog_inventory": {
"hosts_monitored": self.audit_hosts(),
"custom_metrics": self.extract_custom_metrics(),
"dashboards": self.export_dashboards(),
"alerts": self.extract_alert_rules(),
"integrations": self.list_integrations(),
"monthly_cost": self.calculate_current_cost()
},
"migration_complexity": {
"high_complexity": [
"Custom business metrics with complex formulas",
"Advanced anomaly detection rules",
"Cross-service dependency mapping",
"Log correlation with metrics"
],
"medium_complexity": [
"Standard infrastructure metrics",
"Application performance metrics",
"Basic alerting rules"
],
"low_complexity": [
"System metrics (CPU, memory, disk)",
"Network metrics",
"Basic availability checks"
]
}
}
def extract_custom_metrics(self):
"""Extract DataDog custom metrics using API"""
datadog_api_script = """
from datadog import initialize, api
import json
import time
options = {
'api_key': 'your_api_key',
'app_key': 'your_app_key'
}
initialize(**options)
# Get all custom metrics
metrics = api.Metric.list()
custom_metrics = []
for metric in metrics['metrics']:
if not metric.startswith(('system.', 'aws.', 'kubernetes.')):
metric_details = api.Metric.query(
query=f"avg:{metric}{{*}}",
from_time=int(time.time() - 3600),
to_time=int(time.time())
)
custom_metrics.append({
'name': metric,
'tags': metric_details.get('series', [{}])[0].get('scope', ''),
'type': 'gauge', # Default, needs manual verification
'description': f"Migrated from DataDog metric: {metric}"
})
return custom_metrics
"""
return datadog_api_script
def phase_2_infrastructure_setup(self):
"""Set up Prometheus infrastructure with HA"""
return {
"prometheus_ha_setup": {
"primary_cluster": "us-east-1",
"replica_cluster": "us-west-2",
"federation_config": self.setup_federation(),
"storage_config": self.setup_long_term_storage()
},
"grafana_setup": {
"instance_count": 2,
"authentication": "SSO integration",
"provisioning": "Infrastructure as Code"
},
"monitoring_migration_dashboard": self.create_migration_dashboard()
}
def setup_federation(self):
"""Configure Prometheus federation for HA"""
return """
# Global Prometheus configuration
global:
scrape_interval: 15s
external_labels:
region: 'global'
scrape_configs:
- job_name: 'federate-east'
scrape_interval: 15s
honor_labels: true
metrics_path: '/federate'
params:
'match[]':
- '{job="kubernetes-apiservers"}'
- '{job="node-exporter"}'
- '{__name__=~"business_.*"}' # Business metrics
- '{__name__=~"sli_.*"}' # SLI metrics
static_configs:
- targets:
- 'prometheus-east.company.com:9090'
- job_name: 'federate-west'
scrape_interval: 15s
honor_labels: true
metrics_path: '/federate'
params:
'match[]':
- '{job="kubernetes-apiservers"}'
- '{job="node-exporter"}'
- '{__name__=~"business_.*"}'
- '{__name__=~"sli_.*"}'
static_configs:
- targets:
- 'prometheus-west.company.com:9090'
"""
def phase_3_metrics_migration(self):
"""Migrate metrics with dual collection period"""
return {
"dual_collection_strategy": {
"duration": "30 days",
"purpose": "Validate metric accuracy",
"comparison_dashboard": "Side-by-side DataDog vs Prometheus"
},
"metric_mapping": self.create_metric_mapping(),
"custom_exporters": self.build_custom_exporters()
}
def create_metric_mapping(self):
"""Map DataDog metrics to Prometheus equivalents"""
return {
# System metrics mapping
"system.cpu.user": {
"prometheus_metric": "node_cpu_seconds_total{mode='user'}",
"transformation": "rate(node_cpu_seconds_total{mode='user'}[5m])",
"validation_query": "Compare 5-minute averages"
},
# Application metrics mapping
"custom.orders.completed": {
"prometheus_metric": "orders_completed_total",
"transformation": "increase(orders_completed_total[1h])",
"exporter": "custom_business_exporter",
"notes": "Counter metric, use increase() for DataDog equivalent"
},
# Database metrics mapping
"postgresql.connections": {
"prometheus_metric": "pg_stat_database_numbackends",
"transformation": "pg_stat_database_numbackends",
"exporter": "postgres_exporter"
}
}
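During the 30-day dual-collection window, each mapping above can be spot-checked by evaluating the PromQL side and comparing it with the value DataDog reports for the same window. A sketch that takes the DataDog value as an input (for example, read off the comparison dashboard) so it does not depend on a specific DataDog client call; the Prometheus URL is hypothetical:

```python
import requests

PROMETHEUS_URL = "http://prometheus.example.com:9090"  # hypothetical endpoint

def prometheus_value(promql: str) -> float:
    resp = requests.get(f"{PROMETHEUS_URL}/api/v1/query",
                        params={"query": promql}, timeout=10)
    resp.raise_for_status()
    result = resp.json()["data"]["result"]
    return float(result[0]["value"][1]) if result else float("nan")

def within_tolerance(prom_val: float, datadog_val: float, tolerance: float = 0.05) -> bool:
    """Accept the mapping if the two systems agree within 5%."""
    if datadog_val == 0:
        return prom_val == 0
    return abs(prom_val - datadog_val) / abs(datadog_val) <= tolerance

if __name__ == "__main__":
    # Mapping for system.cpu.user from the table above.
    promql = "avg(rate(node_cpu_seconds_total{mode='user'}[5m]))"
    prom_val = prometheus_value(promql)
    datadog_val = 0.42  # value observed in DataDog for the same 5-minute window
    print("match" if within_tolerance(prom_val, datadog_val) else "investigate mapping")
```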
def build_custom_exporters(self):
"""Build exporters for DataDog-specific metrics"""
business_metrics_exporter = """
import time
import requests
from prometheus_client import start_http_server, Counter, Gauge, Histogram
# Define metrics that match DataDog custom metrics
ORDERS_COMPLETED = Counter('orders_completed_total', 'Total completed orders')
REVENUE_TOTAL = Gauge('revenue_total_dollars', 'Total revenue in dollars')
ORDER_PROCESSING_TIME = Histogram('order_processing_seconds',
'Time spent processing orders')
class BusinessMetricsExporter:
def __init__(self):
self.api_endpoint = "https://api.company.com/metrics"
def collect_metrics(self):
\"\"\"Collect business metrics from internal APIs\"\"\"
try:
response = requests.get(f"{self.api_endpoint}/orders")
data = response.json()
            # Update Prometheus metrics; counters can only increase, so apply the delta
            delta = data['total_orders'] - getattr(self, '_last_total_orders', data['total_orders'])
            if delta > 0:
                ORDERS_COMPLETED.inc(delta)
            self._last_total_orders = data['total_orders']
REVENUE_TOTAL.set(data['total_revenue'])
# Histogram metrics need to be observed
for processing_time in data['recent_processing_times']:
ORDER_PROCESSING_TIME.observe(processing_time)
except Exception as e:
print(f"Error collecting metrics: {e}")
def run(self):
start_http_server(8000)
while True:
self.collect_metrics()
time.sleep(60) # Collect every minute
if __name__ == "__main__":
exporter = BusinessMetricsExporter()
exporter.run()
"""
return business_metrics_exporter
def phase_4_dashboard_migration(self):
"""Migrate DataDog dashboards to Grafana"""
return {
"dashboard_conversion_tool": self.build_dashboard_converter(),
"dashboard_categories": {
"executive_dashboards": "High-level business metrics",
"operational_dashboards": "Day-to-day monitoring",
"debugging_dashboards": "Detailed troubleshooting",
"sli_slo_dashboards": "Reliability tracking"
},
"migration_priority": [
"Critical operational dashboards first",
"Executive dashboards second",
"Team-specific dashboards third",
"Experimental/unused dashboards last"
]
}
def build_dashboard_converter(self):
"""Tool to convert DataDog dashboards to Grafana"""
converter_script = """
import json
import re
from datadog import api
class DashboardConverter:
def __init__(self):
self.datadog_to_promql_mapping = {
'avg:system.cpu.user{*}': 'avg(rate(node_cpu_seconds_total{mode="user"}[5m]))',
'sum:custom.orders.completed{*}.as_count()': 'increase(orders_completed_total[1h])',
'avg:system.mem.used{*}': 'avg(node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)'
}
def export_datadog_dashboard(self, dashboard_id):
\"\"\"Export dashboard from DataDog\"\"\"
dashboard = api.Dashboard.get(dashboard_id)
return dashboard
def convert_query(self, datadog_query):
\"\"\"Convert DataDog query to PromQL\"\"\"
# Simple mapping - would need more sophisticated logic for complex queries
for dd_query, promql in self.datadog_to_promql_mapping.items():
if dd_query in datadog_query:
return promql
# Log unconverted queries for manual review
print(f"Manual conversion needed: {datadog_query}")
return f"# TODO: Convert manually - {datadog_query}"
def create_grafana_dashboard(self, datadog_dashboard):
\"\"\"Convert to Grafana dashboard format\"\"\"
grafana_dashboard = {
"dashboard": {
"title": datadog_dashboard['title'],
"tags": ["migrated-from-datadog"],
"panels": []
}
}
for widget in datadog_dashboard.get('widgets', []):
panel = self.convert_widget_to_panel(widget)
grafana_dashboard['dashboard']['panels'].append(panel)
return grafana_dashboard
def convert_widget_to_panel(self, widget):
\"\"\"Convert DataDog widget to Grafana panel\"\"\"
panel_type_mapping = {
'timeseries': 'graph',
'query_value': 'singlestat',
'toplist': 'table'
}
return {
"title": widget.get('title', 'Untitled'),
"type": panel_type_mapping.get(widget['type'], 'graph'),
"targets": [
{
"expr": self.convert_query(request['q']),
"legendFormat": request.get('display_name', '')
}
for request in widget.get('requests', [])
]
}
"""
return converter_script
def phase_5_alerting_migration(self):
"""Migrate DataDog alerts to Prometheus AlertManager"""
return {
"alert_rule_conversion": self.convert_alert_rules(),
"notification_channels": self.setup_notification_channels(),
"testing_strategy": self.create_alert_testing_plan()
}
def convert_alert_rules(self):
"""Convert DataDog monitors to Prometheus alert rules"""
return """
# DataDog monitor conversion example
# DataDog: avg(last_5m):avg:system.cpu.user{*} > 0.8
# Becomes Prometheus:
- alert: HighCPUUsage
expr: avg(rate(node_cpu_seconds_total{mode="user"}[5m])) > 0.8
for: 5m
labels:
severity: warning
team: infrastructure
annotations:
summary: "High CPU usage detected"
description: "CPU usage is {{ $value }}% on {{ $labels.instance }}"
runbook_url: "https://runbooks.company.com/high-cpu"
# DataDog: avg(last_1h):avg:custom.orders.completed{*}.as_count() < 100
# Becomes Prometheus:
- alert: LowOrderVolume
expr: increase(orders_completed_total[1h]) < 100
for: 10m
labels:
severity: critical
team: business
annotations:
summary: "Order volume critically low"
description: "Only {{ $value }} orders in the last hour"
"""
def create_migration_timeline(self):
"""12-week migration timeline"""
return {
"weeks_1_2": {
"tasks": [
"Complete DataDog inventory and assessment",
"Set up Prometheus/Grafana infrastructure",
"Create migration project plan"
],
"deliverables": ["Migration assessment report", "Infrastructure ready"]
},
"weeks_3_4": {
"tasks": [
"Deploy dual collection for system metrics",
"Build custom exporters for business metrics",
"Start dashboard conversion process"
],
"deliverables": ["System metrics in Prometheus", "Custom exporters deployed"]
},
"weeks_5_8": {
"tasks": [
"Migrate critical operational dashboards",
"Convert and test alert rules",
"Train SRE team on Prometheus/Grafana"
],
"deliverables": ["Operational dashboards migrated", "Alert rules tested"]
},
"weeks_9_10": {
"tasks": [
"Migrate remaining dashboards",
"User acceptance testing",
"Performance optimization"
],
"deliverables": ["All dashboards migrated", "Performance optimized"]
},
"weeks_11_12": {
"tasks": [
"Switch primary monitoring to Prometheus",
"Decommission DataDog (gradually)",
"Post-migration optimization"
],
"deliverables": ["Migration complete", "DataDog decommissioned"]
}
}
def risk_mitigation_strategies(self):
"""Key risks and mitigation strategies"""
return {
"data_loss_risk": {
"mitigation": "Maintain DataDog subscription during dual-collection period",
"fallback": "Immediate rollback procedure documented"
},
"alert_gaps": {
"mitigation": "Comprehensive alert rule testing in staging",
"fallback": "Keep DataDog alerts active until Prometheus alerts proven"
},
"dashboard_accuracy": {
"mitigation": "Side-by-side comparison dashboards",
"validation": "Business stakeholder sign-off required"
},
"team_knowledge": {
"mitigation": "Comprehensive training program",
"support": "External Prometheus consultant for first month"
},
"cost_overrun": {
"mitigation": "Detailed cost tracking and regular reviews",
"contingency": "Phased approach allows early cost assessment"
}
}
10. Multi-tenant Grafana Setup
Strong Answer: Enterprise Multi-tenant Architecture:
# Grafana configuration for multi-tenancy
grafana_config:
server:
domain: grafana.company.com
root_url: https://grafana.company.com
auth:
# SSO integration for user management
oauth_auto_login: true
generic_oauth:
enabled: true
name: "Company SSO"
client_id: "grafana-client"
client_secret: "$__env{OAUTH_CLIENT_SECRET}"
scopes: "openid email profile groups"
auth_url: "https://sso.company.com/auth"
token_url: "https://sso.company.com/token"
api_url: "https://sso.company.com/userinfo"
# Map SSO groups to Grafana roles
role_attribute_path: |
contains(groups[*], 'sre-team') && 'Admin' ||
contains(groups[*], 'engineering-team') && 'Editor' ||
contains(groups[*], 'business-team') && 'Viewer'
users:
# Prevent users from signing up
allow_sign_up: false
auto_assign_org: true
auto_assign_org_id: 1
auto_assign_org_role: Viewer
# Enable team synchronization from SSO
auth.ldap:
enabled: true
config_file: /etc/grafana/ldap.toml
Organization and Team Structure:
class GrafanaMultiTenantSetup:
def __init__(self):
self.organizations = {
"engineering": {
"name": "Engineering",
"users": ["sre-team", "backend-team", "frontend-team"],
"data_sources": ["prometheus-prod", "prometheus-staging", "jaeger"],
"dashboards": ["infrastructure", "application-performance", "sli-slo"]
},
"product": {
"name": "Product & Business",
"users": ["product-managers", "analysts", "executives"],
"data_sources": ["prometheus-business-metrics", "google-analytics"],
"dashboards": ["business-kpis", "user-analytics", "executive-summary"]
},
"security": {
"name": "Security & Compliance",
"users": ["security-team", "compliance-team"],
"data_sources": ["prometheus-security", "security-logs"],
"dashboards": ["security-monitoring", "compliance-metrics"]
}
}
def create_organization_structure(self):
"""Create Grafana organizations via API"""
api_script = """
import requests
import json
class GrafanaOrgManager:
def __init__(self, grafana_url, admin_token):
self.base_url = grafana_url
self.headers = {
'Authorization': f'Bearer {admin_token}',
'Content-Type': 'application/json'
}
def create_organization(self, org_name):
\"\"\"Create new organization\"\"\"
response = requests.post(
f"{self.base_url}/api/orgs",
headers=self.headers,
json={"name": org_name}
)
return response.json()
def create_team(self, org_id, team_name, members):
\"\"\"Create team within organization\"\"\"
# Switch to organization context
requests.post(
f"{self.base_url}/api/user/using/{org_id}",
headers=self.headers
)
# Create team
team_response = requests.post(
f"{self.base_url}/api/teams",
headers=self.headers,
json={"name": team_name}
)
team_id = team_response.json()['teamId']
# Add members to team
for member in members:
requests.post(
f"{self.base_url}/api/teams/{team_id}/members",
headers=self.headers,
json={"loginOrEmail": member}
)
return team_id
def setup_data_source_permissions(self, org_id, data_source_name, teams):
\"\"\"Configure data source permissions\"\"\"
# Get data source ID
ds_response = requests.get(
f"{self.base_url}/api/datasources/name/{data_source_name}",
headers=self.headers
)
ds_id = ds_response.json()['id']
# Set permissions for each team
for team_name, permission in teams.items():
team_response = requests.get(
f"{self.base_url}/api/teams/search?name={team_name}",
headers=self.headers
)
team_id = team_response.json()['teams'][0]['id']
requests.post(
f"{self.base_url}/api/datasources/{ds_id}/permissions",
headers=self.headers,
json={
"teamId": team_id,
"permission": permission # 1=Query, 2=Admin
}
)
"""
return api_script
def design_dashboard_organization(self):
"""Dashboard folder structure and permissions"""
return {
"folder_structure": {
"Engineering": {
"Infrastructure": {
"dashboards": [
"Kubernetes Cluster Overview",
"Node Performance",
"Network Monitoring",
"Storage Metrics"
],
"permissions": {
"sre-team": "Admin",
"backend-team": "Editor",
"frontend-team": "Viewer"
}
},
"Application Performance": {
"dashboards": [
"Service Mesh Overview",
"Database Performance",
"Cache Hit Rates",
"Error Tracking"
],
"permissions": {
"sre-team": "Admin",
"backend-team": "Admin",
"frontend-team": "Editor"
}
},
"SLI/SLO Tracking": {
"dashboards": [
"Service Level Indicators",
"Error Budget Burn Rate",
"Availability Tracking",
"Latency Analysis"
],
"permissions": {
"sre-team": "Admin",
"engineering-managers": "Viewer"
}
}
},
"Business": {
"Executive Dashboard": {
"dashboards": [
"Business KPIs Overview",
"Revenue Tracking",
"User Growth Metrics",
"System Health Summary"
],
"permissions": {
"executives": "Viewer",
"product-managers": "Editor",
"business-analysts": "Admin"
},
"features": {
"auto_refresh": "5m",
"kiosk_mode": True,
"public_snapshots": False
}
},
"Product Analytics": {
"dashboards": [
"Feature Usage Analytics",
"User Journey Analysis",
"A/B Test Results",
"Customer Satisfaction"
],
"permissions": {
"product-managers": "Admin",
"ux-designers": "Editor",
"executives": "Viewer"
}
}
}
}
}
def implement_data_source_segregation(self):
"""Separate data sources by team needs"""
return {
"prometheus_instances": {
"prometheus-infrastructure": {
"metrics": ["node_*", "container_*", "kubernetes_*"],
"retention": "30d",
"access": ["sre-team", "backend-team"],
"query_timeout": "60s"
},
"prometheus-business": {
"metrics": ["business_*", "orders_*", "revenue_*"],
"retention": "1y",
"access": ["product-team", "business-analysts", "executives"],
"query_timeout": "120s"
},
"prometheus-security": {
"metrics": ["security_*", "audit_*", "compliance_*"],
"retention": "2y", # Compliance requirement
"access": ["security-team", "compliance-team"],
"query_timeout": "30s"
}
},
"data_source_proxy": {
"enabled": True,
"purpose": "Route queries based on user context",
"implementation": self.create_data_source_proxy()
}
}
def create_data_source_proxy(self):
"""Smart data source routing based on user permissions"""
proxy_config = """
# nginx configuration for data source routing
upstream prometheus_infrastructure {
server prometheus-infra-1.company.com:9090;
server prometheus-infra-2.company.com:9090;
}
upstream prometheus_business {
server prometheus-business.company.com:9090;
}
upstream prometheus_security {
server prometheus-security.company.com:9090;
}
# Lua script for routing logic
location /api/v1/query {
access_by_lua_block {
local user_groups = ngx.var.http_x_user_groups
local query = ngx.var.arg_query
# Route infrastructure metrics to appropriate backend
if string.match(query, "node_") or string.match(query, "container_") then
if string.match(user_groups, "sre%-team") or string.match(user_groups, "backend%-team") then
ngx.var.backend = "prometheus_infrastructure"
else
ngx.status = 403
ngx.say("Access denied to infrastructure metrics")
ngx.exit(403)
end
# Route business metrics
elseif string.match(query, "business_") or string.match(query, "orders_") then
if string.match(user_groups, "product%-team") or string.match(user_groups, "business%-") then
ngx.var.backend = "prometheus_business"
else
ngx.status = 403
ngx.say("Access denied to business metrics")
ngx.exit(403)
end
# Route security metrics
        elseif string.match(query, "security_") or string.match(query, "audit_") or string.match(query, "compliance_") then
            if string.match(user_groups, "security%-team") or string.match(user_groups, "compliance%-team") then
                ngx.var.backend = "prometheus_security"
            else
                ngx.status = 403
                ngx.say("Access denied to security metrics")
                ngx.exit(403)
            end
        end
    }
}
"""
        return proxy_config

### 7. Design a complete observability stack using Prometheus and Grafana
**Strong Answer:**
**Complete Observability Architecture:**
```yaml
# Prometheus configuration for analytics dashboard
global:
scrape_interval: 15s
evaluation_interval: 15s
external_labels:
cluster: 'analytics-prod'
environment: 'production'
rule_files:
- "analytics_alerts.yml"
- "sli_slo_rules.yml"
scrape_configs:
# Application metrics
- job_name: 'analytics-api'
kubernetes_sd_configs:
- role: pod
namespaces:
names: ['analytics']
relabel_configs:
- source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
action: keep
regex: true
- source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
action: replace
target_label: __metrics_path__
regex: (.+)
# Infrastructure metrics
- job_name: 'node-exporter'
static_configs:
- targets: ['node-exporter:9100']
relabel_configs:
- source_labels: [__address__]
target_label: instance
regex: '([^:]+):.*'
# Database metrics
- job_name: 'postgres-exporter'
static_configs:
- targets: ['postgres-exporter:9187']
# Redis metrics
- job_name: 'redis-exporter'
static_configs:
- targets: ['redis-exporter:9121']
# Remote storage for long-term retention
remote_write:
- url: "https://prometheus-remote-storage.example.com/api/v1/write"
queue_config:
max_samples_per_send: 10000
batch_send_deadline: 5s
# Remote read for historical queries
remote_read:
- url: "https://prometheus-remote-storage.example.com/api/v1/read"
read_recent: true
```
SLI/SLO Recording Rules:
# sli_slo_rules.yml
groups:
- name: analytics_sli
interval: 30s
rules:
# Availability SLI
- record: analytics:availability_sli
expr: |
(
sum(rate(http_requests_total{job="analytics-api",code!~"5.."}[5m])) /
sum(rate(http_requests_total{job="analytics-api"}[5m]))
)
# Latency SLI (95th percentile)
- record: analytics:latency_sli
expr: |
histogram_quantile(0.95,
sum(rate(http_request_duration_seconds_bucket{job="analytics-api"}[5m])) by (le)
)
# Error rate SLI
- record: analytics:error_rate_sli
expr: |
(
sum(rate(http_requests_total{job="analytics-api",code=~"5.."}[5m])) /
sum(rate(http_requests_total{job="analytics-api"}[5m]))
)
- name: analytics_slo_burn_rate
interval: 30s
rules:
# Error budget burn rate - 1 hour window
- record: analytics:error_budget_burn_rate_1h
expr: |
(
1 - analytics:availability_sli
) / (1 - 0.995) # 99.5% SLO target
# Error budget burn rate - 6 hour window
- record: analytics:error_budget_burn_rate_6h
expr: |
(
1 - avg_over_time(analytics:availability_sli[6h])
) / (1 - 0.995)
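The burn-rate multipliers used in the alerts below follow the standard formula: a burn rate of N consumes the error budget N times faster than the pace that would exactly exhaust it over the SLO period. A small calculation for a 30-day window showing where the 14.4x and 6x thresholds come from:

```python
# Burn rate that consumes a given fraction of the error budget within a given window.
SLO_PERIOD_HOURS = 30 * 24  # 30-day SLO window

def burn_rate_for(budget_fraction: float, window_hours: float) -> float:
    """Burn rate needed to spend `budget_fraction` of the budget in `window_hours`."""
    return budget_fraction * SLO_PERIOD_HOURS / window_hours

if __name__ == "__main__":
    # Fast burn: 2% of the monthly budget gone in 1 hour -> 14.4x
    print(burn_rate_for(0.02, 1))   # 14.4
    # Companion 6-hour window: 5% of the budget in 6 hours -> 6x
    print(burn_rate_for(0.05, 6))   # 6.0
```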
Alerting Rules:
# analytics_alerts.yml
groups:
- name: analytics_alerts
rules:
# Fast burn rate - 2% budget consumed in 1 hour
- alert: AnalyticsErrorBudgetFastBurn
expr: |
(
analytics:error_budget_burn_rate_1h > (14.4 * 1) # 14.4x burn rate
and
analytics:error_budget_burn_rate_6h > (6 * 1) # 6x burn rate
)
for: 2m
labels:
severity: critical
slo: availability
annotations:
summary: "Analytics API fast error budget burn"
description: "Analytics API error budget is burning too fast. Current 1h burn rate: {{ $value }}x"
runbook_url: "https://runbooks.company.com/analytics-error-budget"
# Slow burn rate - 10% budget consumed in 6 hours
- alert: AnalyticsErrorBudgetSlowBurn
expr: |
(
analytics:error_budget_burn_rate_1h > (3 * 1)
and
analytics:error_budget_burn_rate_6h > (1 * 1)
)
for: 15m
labels:
severity: warning
slo: availability
annotations:
summary: "Analytics API slow error budget burn"
# Latency SLO violation
- alert: AnalyticsLatencyHigh
expr: analytics:latency_sli > 0.5 # 500ms threshold
for: 5m
labels:
severity: warning
annotations:
summary: "Analytics API latency SLO violation"
description: "95th percentile latency is {{ $value }}s"
# Infrastructure alerts
- alert: AnalyticsHighCPU
expr: |
(
sum by (instance) (rate(container_cpu_usage_seconds_total{pod=~"analytics-.*"}[5m]))
/
sum by (instance) (container_spec_cpu_quota{pod=~"analytics-.*"} / container_spec_cpu_period{pod=~"analytics-.*"})
) > 0.8
for: 10m
labels:
severity: warning
annotations:
summary: "High CPU usage in analytics pods"
- alert: AnalyticsHighMemory
expr: |
(
container_memory_usage_bytes{pod=~"analytics-.*"}
/
container_spec_memory_limit_bytes{pod=~"analytics-.*"}
) > 0.9
for: 5m
labels:
severity: critical
annotations:
summary: "High memory usage in analytics pods"
# Database alerts
- alert: PostgreSQLDown
expr: pg_up == 0
for: 1m
labels:
severity: critical
annotations:
summary: "PostgreSQL is down"
- alert: PostgreSQLSlowQueries
expr: pg_stat_activity_max_tx_duration > 300 # 5 minutes
for: 2m
labels:
severity: warning
annotations:
summary: "PostgreSQL has slow running queries"
Grafana Dashboard as Code:
{
"dashboard": {
"title": "Analytics Platform Overview",
"tags": ["analytics", "sre"],
"templating": {
"list": [
{
"name": "environment",
"type": "query",
"query": "label_values(up{job=\"analytics-api\"}, environment)"
},
{
"name": "pod",
"type": "query",
"query": "label_values(up{job=\"analytics-api\", environment=\"$environment\"}, pod)"
}
]
},
"panels": [
{
"title": "SLO Compliance",
"type": "stat",
"targets": [
{
"expr": "analytics:availability_sli * 100",
"legendFormat": "Availability %"
},
{
"expr": "analytics:latency_sli * 1000",
"legendFormat": "P95 Latency (ms)"
}
],
"fieldConfig": {
"defaults": {
"thresholds": {
"steps": [
{ "color": "red", "value": 0 },
{ "color": "yellow", "value": 99 },
{ "color": "green", "value": 99.5 }
]
}
}
}
},
{
"title": "Error Budget Burn Rate",
"type": "graph",
"targets": [
{
"expr": "analytics:error_budget_burn_rate_1h",
"legendFormat": "1h burn rate"
},
{
"expr": "analytics:error_budget_burn_rate_6h",
"legendFormat": "6h burn rate"
}
],
"yAxes": [
{
"label": "Burn Rate Multiplier",
"min": 0
}
],
"alert": {
"conditions": [
{
"query": { "queryType": "", "refId": "A" },
"reducer": { "type": "last", "params": [] },
"evaluator": { "params": [10], "type": "gt" }
}
],
"executionErrorState": "alerting",
"for": "5m",
"frequency": "10s",
"handler": 1,
"name": "Error Budget Burn Rate",
"noDataState": "no_data"
}
},
{
"title": "Request Rate & Latency",
"type": "graph",
"targets": [
{
"expr": "sum(rate(http_requests_total{job=\"analytics-api\", environment=\"$environment\"}[5m]))",
"legendFormat": "RPS"
},
{
"expr": "histogram_quantile(0.95, sum(rate(http_request_duration_seconds_bucket{job=\"analytics-api\", environment=\"$environment\"}[5m])) by (le))",
"legendFormat": "P95 Latency"
}
]
},
{
"title": "Error Rate by Status Code",
"type": "graph",
"targets": [
{
"expr": "sum(rate(http_requests_total{job=\"analytics-api\", environment=\"$environment\"}[5m])) by (code)",
"legendFormat": "{{code}}"
}
],
"stack": true
}
]
}
}
Data Retention Strategy:
# Prometheus retention configuration
prometheus_config:
# Short-term high-resolution data
local_storage:
retention_time: "15d"
retention_size: "50GB"
# Long-term storage with downsampling
remote_storage:
- provider: "Thanos/Cortex"
retention: "2y"
downsampling:
- resolution: "5m"
retention: "30d"
- resolution: "1h"
retention: "1y"
- resolution: "1d"
retention: "2y"
# Different retention for different metric types
recording_rules:
# High-frequency business metrics - longer retention
- group: "business_metrics"
retention: "1y"
metrics: ["orders_per_minute", "revenue_per_hour"]
# Infrastructure metrics - shorter retention
- group: "infrastructure_metrics"
retention: "30d"
metrics: ["cpu_usage", "memory_usage", "disk_io"]
# Debug metrics - very short retention
- group: "debug_metrics"
retention: "7d"
metrics: ["gc_duration", "goroutines_count"]
Cost Optimization Strategy:
# Prometheus metrics optimization
class PrometheusOptimization:
def __init__(self):
self.high_cardinality_metrics = [
"http_requests_total", # Can have many label combinations
"database_query_duration",
"cache_operations"
]
def optimize_cardinality(self):
"""Reduce metric cardinality to control costs"""
optimization_rules = {
# Collapse the high-cardinality user_id label into a coarse user_category label
"http_requests_total": {
"relabel_configs": [
{
"source_labels": ["user_id"],
"target_label": "user_category",
"regex": "(premium|standard|trial)",
"replacement": "${1}"
}
]
},
# Group low-frequency endpoints
"request_duration": {
"metric_relabel_configs": [
{
"source_labels": ["endpoint"],
"target_label": "endpoint_group",
"regex": "/api/admin/.*",
"replacement": "admin_endpoints"
}
]
}
}
return optimization_rules
def implement_sampling(self):
"""Implement sampling for high-volume metrics"""
return {
"trace_sampling": {
"probability": 0.01, # 1% of traces
"max_traces_per_second": 100
},
"metric_sampling": {
"debug_metrics": 0.1, # 10% sampling
"infrastructure_metrics": 1.0, # No sampling
"business_metrics": 1.0 # Never sample business metrics
}
}
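To see why the relabeling above matters, a quick cardinality estimate helps; the label counts below are made up for illustration, but the multiplication is the point:
# Rough cardinality math: active series ~= product of distinct values per label.
label_values = {"method": 5, "endpoint": 40, "status": 8, "user_id": 100_000}
series = 1
for label, distinct in label_values.items():
    series *= distinct
print(f"http_requests_total with a raw user_id label: ~{series:,} series")  # ~160,000,000

label_values["user_id"] = 3  # collapsed to user_category: premium/standard/trial
series = 1
for label, distinct in label_values.items():
    series *= distinct
print(f"after relabeling to user_category: ~{series:,} series")  # ~4,800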
8. Compare Prometheus vs DataDog for monitoring microservices
Strong Answer: Comprehensive Comparison:
| Aspect | Prometheus + Grafana | DataDog |
|---|---|---|
| Cost Model | Open source + infrastructure costs | Per-host + per-metric pricing |
| Data Ownership | Full control, on-premises | SaaS, data in DataDog cloud |
| Scalability | Requires engineering (federation, sharding) | Managed, auto-scaling |
| Customization | Unlimited via PromQL | Limited to DataDog's query language |
| Learning Curve | Steep, requires Prometheus expertise | Gentle, intuitive UI |
| Integrations | 500+ exporters, community-driven | 400+ official integrations |
Technical Deep Dive:
Prometheus Advantages:
# Advanced PromQL capabilities
class PrometheusAdvantages:
def complex_queries(self):
"""Examples of powerful PromQL queries not easily replicated in DataDog"""
queries = {
# Multi-service SLI calculation
"service_mesh_success_rate": """
sum(rate(istio_requests_total{response_code!~"5.."}[5m])) by (destination_service_name) /
sum(rate(istio_requests_total[5m])) by (destination_service_name)
""",
# Cross-service dependency analysis
"service_dependency_latency": """
histogram_quantile(0.95,
sum(rate(http_request_duration_seconds_bucket{job="api-gateway"}[5m])) by (le, downstream_service)
)
-
histogram_quantile(0.95,
sum(rate(http_request_duration_seconds_bucket{job=~"user-service|order-service"}[5m])) by (le, job)
)
""",
# Resource efficiency calculations
"cost_per_request": """
(
sum(rate(container_cpu_usage_seconds_total[5m])) by (service) * 0.0001 # CPU cost per second
+
sum(container_memory_usage_bytes) by (service) * 0.000000001 # Memory cost per byte
)
/
sum(rate(http_requests_total[5m])) by (service)
"""
}
return queries
def federation_example(self):
"""Multi-cluster monitoring with Prometheus federation"""
return {
"global_prometheus": {
"scrape_configs": [
{
"job_name": "federate-us-east",
"scrape_interval": "15s",
"honor_labels": True,
"metrics_path": "/federate",
"params": {
"match[]": [
'{job="kubernetes-apiservers"}',
'{job="kubernetes-nodes"}',
'{__name__=~"analytics:.*"}' # Custom SLI metrics
]
},
"static_configs": [
{"targets": ["prometheus-us-east.company.com:9090"]}
]
}
]
}
}
DataDog Advantages:
// DataDog's strength in integrated APM and logs correlation
const datadogAdvantages = {
// Automatic service map generation
serviceMap: {
automatic: true,
includesDBs: true,
showsLatency: true,
tracesIntegration: true
},
// Built-in anomaly detection
anomalyDetection: {
algorithm: "machine_learning",
baseline: "seasonal_trends",
autoThresholds: true,
falsePositiveReduction: "contextual_analysis"
},
// Log correlation with traces
logCorrelation: {
automaticTraceInjection: true,
errorTracking: true,
logPatterns: "ai_detected",
rootCauseAnalysis: true
},
// Real User Monitoring integration
rumIntegration: {
frontendMetrics: true,
    userJourneys: true
  }
};
// 2. Fix: Quadratic duplicate detection
// BEFORE - O(n^2) nested loops
func findDuplicatesSlow(items []string) []string {
	var duplicates []string
	for i, item := range items {
		for j := i + 1; j < len(items); j++ {
			if items[j] == item {
				duplicates = append(duplicates, item)
				break
			}
		}
	}
	return duplicates
}
// AFTER - O(n) algorithm using map
func findDuplicatesFast(items []string) []string {
seen := make(map[string]bool)
var duplicates []string
for _, item := range items {
if seen[item] {
duplicates = append(duplicates, item)
} else {
seen[item] = true
}
}
return duplicates
}
// 3. Fix: Excessive string concatenation
// BEFORE - Creates new strings repeatedly
func buildResponseSlow(data []Record) string {
var result string
for _, record := range data {
result += record.ID + "," + record.Name + "\n" // Slow!
}
return result
}
// AFTER - Use strings.Builder for efficiency
func buildResponseFast(data []Record) string {
var builder strings.Builder
builder.Grow(len(data) * 50) // Pre-allocate capacity
for _, record := range data {
builder.WriteString(record.ID)
builder.WriteString(",")
builder.WriteString(record.Name)
builder.WriteString("\n")
}
return builder.String()
}
// 4. Fix: Goroutine leaks
// BEFORE - Goroutines without proper cleanup
func handleRequestsLeaky() {
for {
go func() {
// Long-running operation without context cancellation
processData() // Never exits!
}()
}
}
// AFTER - Proper goroutine management
func handleRequestsProper(ctx context.Context) {
semaphore := make(chan struct{}, 100) // Limit concurrent goroutines
for {
select {
case <-ctx.Done():
return
default:
semaphore <- struct{}{} // Acquire
go func() {
defer func() { <-semaphore }() // Release
// Use context for cancellation
processDataWithContext(ctx)
}()
}
}
}
// 5. Fix: Inefficient database queries in loop
// BEFORE - N+1 query problem
func getUserDataSlow(userIDs []string) []UserData {
var users []UserData
for _, id := range userIDs {
user := db.QueryUser(id) // Database hit per user!
users = append(users, user)
}
return users
}
// AFTER - Single batched, parameterized query (one round trip, no SQL injection)
func getUserDataFast(userIDs []string) []UserData {
	placeholders := make([]string, len(userIDs))
	args := make([]interface{}, len(userIDs))
	for i, id := range userIDs {
		placeholders[i] = fmt.Sprintf("$%d", i+1)
		args[i] = id
	}
	query := "SELECT * FROM users WHERE id IN (" + strings.Join(placeholders, ", ") + ")"
	return db.QueryUsers(query, args...)
}
Memory and GC Optimization:
// 6. Optimize garbage collection pressure
type MetricsCollector struct {
// BEFORE - Creates garbage
// metrics []map[string]interface{}
// AFTER - Use object pools and typed structs
metricPool sync.Pool
metrics []Metric
}
type Metric struct {
Name string
Value float64
Timestamp int64
}
func NewMetricsCollector() *MetricsCollector {
mc := &MetricsCollector{
metrics: make([]Metric, 0, 1000), // Pre-allocate capacity
}
mc.metricPool = sync.Pool{
New: func() interface{} {
return &Metric{}
},
}
return mc
}
func (mc *MetricsCollector) AddMetric(name string, value float64) {
metric := mc.metricPool.Get().(*Metric)
metric.Name = name
metric.Value = value
metric.Timestamp = time.Now().Unix()
mc.metrics = append(mc.metrics, *metric)
// Return to pool
mc.metricPool.Put(metric)
}
// 7. CPU profiling integration
func enableContinuousProfiling() {
// Enable continuous CPU profiling
if os.Getenv("ENABLE_PROFILING") == "true" {
go func() {
for {
f, err := os.Create(fmt.Sprintf("cpu-profile-%d.prof", time.Now().Unix()))
if err != nil {
log.Printf("Could not create CPU profile: %v", err)
time.Sleep(30 * time.Second)
continue
}
pprof.StartCPUProfile(f)
time.Sleep(30 * time.Second)
pprof.StopCPUProfile()
f.Close()
// Upload to object storage for analysis
uploadProfile(f.Name())
}
}()
}
}
Monitoring and Alerting:
# Prometheus rules for Go service CPU monitoring
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
name: go-service-cpu-alerts
spec:
groups:
- name: go-service-performance
rules:
- alert: GoServiceHighCPU
expr: |
(
rate(container_cpu_usage_seconds_total{pod=~"go-service-.*"}[5m])
) > 0.8
for: 5m
labels:
severity: warning
service: go-service
annotations:
summary: "Go service CPU usage above 80%"
description: "Pod {{ $labels.pod }} CPU usage is {{ $value }}%"
- alert: GoServiceGoroutineLeak
expr: |
go_goroutines{job="go-service"} > 10000
for: 10m
labels:
severity: critical
annotations:
summary: "Potential goroutine leak detected"
- alert: GoServiceGCPressure
expr: |
rate(go_gc_duration_seconds_sum[5m]) > 0.1
for: 5m
labels:
severity: warning
annotations:
summary: "High GC pressure in Go service"
description: "GC taking {{ $value }}s per collection cycle"
- alert: GoServiceMemoryLeak
expr: |
go_memstats_heap_inuse_bytes / go_memstats_heap_sys_bytes > 0.9
for: 15m
labels:
severity: critical
annotations:
summary: "Potential memory leak in Go service"
Performance Testing and Validation:
// Benchmark tests to validate optimizations
func BenchmarkProcessRequestSlow(b *testing.B) {
data := generateTestData(1000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
processRequestSlow(data)
}
}
func BenchmarkProcessRequestFast(b *testing.B) {
data := generateTestData(1000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
processRequestFast(data)
}
}
// Run benchmarks with memory profiling
// go test -bench=. -benchmem -cpuprofile=cpu.prof -memprofile=mem.prof
// CPU profiling analysis script
func analyzeCPUProfile() {
// go tool pprof cpu.prof
// Commands in pprof:
// (pprof) top20 - Show top 20 CPU consumers
// (pprof) list function - Show source code with CPU usage
// (pprof) web - Generate web visualization
// (pprof) flamegraph - Generate flame graph
}
26. Microservices vs Monolithic Architecture
Strong Answer: Car Pool vs Train Analogy Explanation:
Microservices (Car Pool Approach):
🚗 🚗 🚗 🚗 🚗 (Independent cars)
Each car (service) can:
- Take different routes
- Stop independently
- Break down without affecting others
- Scale by adding more cars
- Use different fuel types (technologies)
Monolithic (Train Approach):
🚂-🚃-🚃-🚃-🚃 (Connected train)
The train (application):
- All cars must follow the same route
- If engine fails, entire train stops
- All cars must move together
- Scale by making train longer or faster
- Single fuel type for entire train
For Analytics Dashboard - Decision Framework:
# Decision matrix for architecture choice
class ArchitectureDecision:
def __init__(self):
self.factors = {
'team_size': 0,
'complexity': 0,
'scalability_needs': 0,
'technology_diversity': 0,
'deployment_frequency': 0,
'operational_maturity': 0
}
def assess_microservices_fit(self, dashboard_requirements):
"""Assess if microservices are appropriate"""
# Analytics dashboard components
services = {
'metrics_collector': {
'responsibility': 'Collect metrics from various sources',
'scalability': 'High - handles high volume ingestion',
'technology': 'Go - for performance'
},
'data_processor': {
'responsibility': 'Process and aggregate metrics',
'scalability': 'Medium - CPU intensive operations',
'technology': 'Python - for data processing libraries'
},
'api_gateway': {
'responsibility': 'Serve dashboard APIs',
'scalability': 'High - many concurrent users',
'technology': 'Node.js - for async I/O'
},
'notification_service': {
'responsibility': 'Send alerts and notifications',
'scalability': 'Low - occasional alerts',
'technology': 'Python - for integrations'
},
'frontend_bff': {
'responsibility': 'Backend for Frontend',
'scalability': 'Medium - aggregates data for UI',
'technology': 'React/TypeScript'
}
}
return self.evaluate_services(services)
def evaluate_services(self, services):
"""Evaluate microservices approach"""
microservices_benefits = [
"Independent scaling per service",
"Technology diversity (Go, Python, Node.js)",
"Team autonomy - different teams own different services",
"Fault isolation - metrics collection failure doesn't break UI",
"Independent deployments - can update notification without affecting API"
]
microservices_challenges = [
"Network latency between services",
"Data consistency across services",
"Distributed system complexity",
"Service discovery and load balancing",
"Monitoring and debugging across services"
]
return {
'benefits': microservices_benefits,
'challenges': microservices_challenges,
'recommendation': self.make_recommendation()
}
def make_recommendation(self):
"""Make architecture recommendation for analytics dashboard"""
# For analytics dashboard specifically:
if self.is_early_stage():
return {
'choice': 'MODULAR_MONOLITH',
'reason': 'Start simple, can extract services later',
'structure': self.modular_monolith_structure()
}
else:
return {
'choice': 'MICROSERVICES',
'reason': 'Scale and team benefits outweigh complexity',
'structure': self.microservices_structure()
}
def modular_monolith_structure(self):
"""Modular monolith approach - best of both worlds"""
return {
'structure': """
analytics-dashboard/
├── cmd/                  # Application entry points
├── internal/
│   ├── metrics/          # Metrics collection module
│   ├── processing/       # Data processing module
│   ├── api/              # API handling module
│   ├── notifications/    # Alert module
│   └── dashboard/        # UI serving module
├── pkg/                  # Shared libraries
└── deployments/          # Single deployment unit
""",
'benefits': [
'Single deployment and testing',
'Easier debugging and development',
'No network latency between modules',
'Simpler operational overhead',
'Can extract to microservices later'
]
}
def microservices_structure(self):
"""Full microservices approach"""
return {
'structure': """
Analytics Platform Microservices:
┌──────────────────┐        ┌──────────────────┐
│   Frontend SPA   │        │   API Gateway    │
│     (React)      │───────▶│      (Kong)      │
└──────────────────┘        └────────┬─────────┘
                                     │
              ┌──────────────────────┼──────────────────────┐
              │                      │                      │
   ┌──────────▼─────────┐  ┌─────────▼────────┐  ┌──────────▼─────────┐
   │ Metrics Collector  │  │  Data Processor  │  │  Notification Svc  │
   │        (Go)        │  │     (Python)     │  │      (Python)      │
   └──────────┬─────────┘  └─────────┬────────┘  └────────────────────┘
              │                      │
   ┌──────────▼──────────────────────▼────────┐
   │              Message Queue               │
   │              (Kafka/Redis)                │
   └───────────────────────────────────────────┘
""",
'communication': 'Async messaging + HTTP APIs',
'data_strategy': 'Event sourcing with CQRS'
}
def is_early_stage(self):
"""Determine if project is in early stage"""
return (
self.factors['team_size'] < 10 and
self.factors['operational_maturity'] < 3
)
# Usage for analytics dashboard
decision = ArchitectureDecision()
recommendation = decision.assess_microservices_fit({
'expected_users': 500,
'data_volume': 'high',
'real_time_requirements': True,
'team_size': 8,
'deployment_frequency': 'daily'
})
Implementation Examples:
Microservices Implementation:
// Metrics Collector Service (Go)
package main
type MetricsCollector struct {
kafka *kafka.Producer
redis *redis.Client
handlers map[string]MetricHandler
}
func (mc *MetricsCollector) CollectMetric(metric Metric) error {
// Process metric
processed := mc.handlers[metric.Type].Process(metric)
// Publish to message queue for other services
return mc.kafka.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &metric.Type,
Partition: kafka.PartitionAny,
},
Value: processed.ToJSON(),
}, nil)
}
# Data Processor Service (Python)
import asyncio
from kafka import KafkaConsumer
class DataProcessor:
def __init__(self):
self.consumer = KafkaConsumer(
'metrics-topic',
bootstrap_servers=['kafka:9092'],
group_id='data-processors'
)
async def process_metrics(self):
for message in self.consumer:
metric = Metric.from_json(message.value)
# Process and aggregate
aggregated = await self.aggregate_metric(metric)
# Store in time-series database
await self.store_metric(aggregated)
# Trigger alerts if needed
await self.check_alerts(aggregated)
Monolithic Implementation:
# Modular Monolith (Python)
from dashboard.modules import metrics, processing, api, notifications
class AnalyticsDashboard:
def __init__(self):
self.metrics = metrics.MetricsModule()
self.processor = processing.ProcessingModule()
self.api = api.APIModule()
self.notifications = notifications.NotificationModule()
def handle_metric(self, raw_metric):
# All in same process - no network calls
metric = self.metrics.parse(raw_metric)
processed = self.processor.aggregate(metric)
# Check for alerts
if self.processor.check_thresholds(processed):
self.notifications.send_alert(processed)
return processed
# Single deployment with clear module boundaries
# Can be extracted to separate services later
When to Choose Each Approach:
| Factor | Monolith | Microservices |
|---|---|---|
| Team Size | < 10 developers | > 10 developers |
| Complexity | Simple-medium | Complex domain |
| Scale | < 1M requests/day | > 10M requests/day |
| Technology | Single stack preferred | Multiple technologies needed |
| Deployment | Weekly/monthly | Multiple times per day |
| Data Consistency | Strong consistency needed | Eventual consistency OK |
For Analytics Dashboard Specifically:
- Early Stage: Start with modular monolith
- Growth Stage: Extract high-scale components (metrics collector) first
- Mature Stage: Full microservices with proper DevOps practices
27. Database Sharding Implementation
Strong Answer: Library Analogy Explanation:
📚 Traditional Database (Single Library):
All books in one building - gets crowded, hard to find books,
long queues at checkout
📚📚📚 Sharded Database (Multiple Library Branches):
Books distributed across locations:
- Fiction Library: Books A-H
- Science Library: Books I-P
- History Library: Books Q-Z
Each library (shard) operates independently but part of same system
Sharding Strategy for Analytics Dashboard:
# Database sharding implementation
import hashlib
import datetime
from typing import Dict, List
class DatabaseSharding:
def __init__(self):
self.shards = {
'shard_americas': {
'host': 'db-americas.example.com',
'regions': ['us', 'ca', 'mx', 'br'],
'connection': self.create_connection('db-americas')
},
'shard_europe': {
'host': 'db-europe.example.com',
'regions': ['uk', 'de', 'fr', 'es'],
'connection': self.create_connection('db-europe')
},
'shard_asia': {
'host': 'db-asia.example.com',
'regions': ['jp', 'sg', 'au', 'in'],
'connection': self.create_connection('db-asia')
}
}
# Time-based sharding for metrics
self.time_shards = {
'metrics_current': 'Last 7 days - hot data',
'metrics_recent': 'Last 30 days - warm data',
'metrics_archive': 'Older than 30 days - cold data'
}
def get_user_shard(self, user_id: str) -> str:
"""Determine shard based on user ID hash"""
hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
shard_index = hash_value % len(self.shards)
return list(self.shards.keys())[shard_index]
def get_region_shard(self, region: str) -> str:
"""Determine shard based on geographical region"""
for shard_name, shard_info in self.shards.items():
if region.lower() in shard_info['regions']:
return shard_name
return 'shard_americas' # Default fallback
def get_time_shard(self, timestamp: datetime.datetime) -> str:
"""Determine shard based on data age"""
now = datetime.datetime.now()
age = now - timestamp
if age.days <= 7:
return 'metrics_current'
elif age.days <= 30:
return 'metrics_recent'
else:
return 'metrics_archive'
def route_query(self, query_type: str, **kwargs):
"""Route queries to appropriate shard"""
if query_type == 'user_orders':
shard = self.get_user_shard(kwargs['user_id'])
return self.execute_query(shard, query_type, **kwargs)
elif query_type == 'regional_metrics':
shard = self.get_region_shard(kwargs['region'])
return self.execute_query(shard, query_type, **kwargs)
elif query_type == 'historical_data':
# Query multiple time shards and aggregate
return self.query_time_shards(kwargs['start_date'], kwargs['end_date'])
elif query_type == 'cross_shard_analytics':
# Fan-out query to all shards
return self.fan_out_query(query_type, **kwargs)
def query_time_shards(self, start_date, end_date):
"""Query across time-based shards"""
results = []
for shard_name in self.time_shards.keys():
try:
shard_result = self.execute_query(shard_name, 'time_range_query',
start_date=start_date, end_date=end_date)
results.extend(shard_result)
except ShardUnavailableError:
# Handle shard failures gracefully
self.log_shard_failure(shard_name)
continue
return self.aggregate_results(results)
def fan_out_query(self, query_type: str, **kwargs):
"""Execute query across all shards and aggregate results"""
import concurrent.futures
results = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=len(self.shards)) as executor:
# Submit queries to all shards concurrently
future_to_shard = {
executor.submit(self.execute_query, shard_name, query_type, **kwargs): shard_name
for shard_name in self.shards.keys()
}
for future in concurrent.futures.as_completed(future_to_shard):
shard_name = future_to_shard[future]
try:
result = future.result(timeout=30) # 30 second timeout
results[shard_name] = result
except Exception as e:
self.log_query_failure(shard_name, e)
results[shard_name] = None
return self.aggregate_cross_shard_results(results)
# Shard-aware query examples
class ShardedAnalyticsQueries:
def __init__(self, sharding: DatabaseSharding):
self.sharding = sharding
def get_user_orders(self, user_id: str):
"""Get orders for specific user"""
return self.sharding.route_query('user_orders', user_id=user_id)
def get_regional_sales(self, region: str, date_range: tuple):
"""Get sales data for specific region"""
return self.sharding.route_query('regional_metrics',
region=region,
start_date=date_range[0],
end_date=date_range[1])
def get_global_metrics(self, metric_type: str):
"""Get global metrics across all shards"""
return self.sharding.route_query('cross_shard_analytics',
metric_type=metric_type)
def get_historical_trends(self, days_back: int):
"""Get historical data across time shards"""
end_date = datetime.datetime.now()
start_date = end_date - datetime.timedelta(days=days_back)
return self.sharding.query_time_shards(start_date, end_date)
Sharding Implementation with PostgreSQL:
-- Create shard-specific tables
-- Shard 1: Americas
CREATE TABLE orders_americas (
id UUID PRIMARY KEY,
user_id UUID NOT NULL,
region VARCHAR(2) CHECK (region IN ('US', 'CA', 'MX', 'BR')),
order_total DECIMAL(10,2),
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Shard 2: Europe
CREATE TABLE orders_europe (
id UUID PRIMARY KEY,
user_id UUID NOT NULL,
region VARCHAR(2) CHECK (region IN ('UK', 'DE', 'FR', 'ES')),
order_total DECIMAL(10,2),
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Shard 3: Asia
CREATE TABLE orders_asia (
id UUID PRIMARY KEY,
user_id UUID NOT NULL,
region VARCHAR(2) CHECK (region IN ('JP', 'SG', 'AU', 'IN')),
order_total DECIMAL(10,2),
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Create foreign data wrapper for cross-shard queries
CREATE EXTENSION postgres_fdw;
CREATE SERVER shard_europe
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'db-europe.example.com', port '5432', dbname 'analytics');
CREATE USER MAPPING FOR postgres
SERVER shard_europe
OPTIONS (user 'analytics_user', password 'password');
-- Create foreign tables
CREATE FOREIGN TABLE orders_europe_remote (
id UUID,
user_id UUID,
region VARCHAR(2),
order_total DECIMAL(10,2),
created_at TIMESTAMP WITH TIME ZONE
)
SERVER shard_europe
OPTIONS (schema_name 'public', table_name 'orders_europe');
-- View for cross-shard queries
CREATE VIEW orders_global AS
SELECT 'americas' as shard, * FROM orders_americas
UNION ALL
SELECT 'europe' as shard, * FROM orders_europe_remote
UNION ALL
SELECT 'asia' as shard, * FROM orders_asia_remote;
Sharding Middleware in Application:
# Application-level sharding middleware
class ShardingMiddleware:
def __init__(self, app):
self.app = app
self.sharding = DatabaseSharding()
def __call__(self, environ, start_response):
# Extract sharding context from request
request = Request(environ)
# Determine shard based on request
if 'user_id' in request.args:
shard = self.sharding.get_user_shard(request.args['user_id'])
environ['DATABASE_SHARD'] = shard
elif 'region' in request.args:
shard = self.sharding.get_region_shard(request.args['region'])
environ['DATABASE_SHARD'] = shard
else:
# Cross-shard query required
environ['DATABASE_SHARD'] = 'cross_shard'
return self.app(environ, start_response)
# Flask route with shard awareness
@app.route('/api/user/<user_id>/orders')
def get_user_orders(user_id):
shard = request.environ.get('DATABASE_SHARD')
if shard == 'cross_shard':
# This shouldn't happen for user-specific queries
abort(400, "Invalid request - user_id required")
# Use shard-specific connection
db = get_shard_connection(shard)
orders = db.execute(
"SELECT * FROM orders WHERE user_id = %s ORDER BY created_at DESC",
(user_id,)
)
return jsonify(orders)
@app.route('/api/analytics/global')
def get_global_analytics():
# Cross-shard query - aggregate from all shards
sharding = DatabaseSharding()
results = sharding.fan_out_query('global_analytics')
return jsonify({
'total_orders': sum(r['order_count'] for r in results.values()),
'total_revenue': sum(r['revenue'] for r in results.values()),
'by_region': results
})
Handling Shard Failures and Rebalancing:
class ShardManager:
def __init__(self):
self.shards = DatabaseSharding().shards
self.health_check_interval = 30 # seconds
def monitor_shard_health(self):
"""Continuously monitor shard health"""
while True:
for shard_name, shard_info in self.shards.items():
try:
# Simple health check query
conn = shard_info['connection']
conn.execute("SELECT 1")
self.mark_shard_healthy(shard_name)
except Exception as e:
self.mark_shard_unhealthy(shard_name, e)
self.trigger_failover(shard_name)
time.sleep(self.health_check_interval)
def trigger_failover(self, failed_shard: str):
"""Handle shard failover"""
# Redirect traffic to healthy shards
self.redistribute_load(failed_shard)
# Alert operations team
self.send_alert(f"Shard {failed_shard} is unhealthy")
# Attempt automated recovery
self.attempt_shard_recovery(failed_shard)
def rebalance_shards(self, new_shard_config: Dict):
"""Rebalance data across shards"""
# This is a complex operation requiring:
# 1. Data migration planning
# 2. Consistent hashing updates
# 3. Gradual traffic shifting
# 4. Rollback capability
migration_plan = self.create_migration_plan(new_shard_config)
for step in migration_plan:
self.execute_migration_step(step)
self.verify_migration_step(step)
self.update_shard_routing(new_shard_config)
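One concrete technique that makes rebalancing tractable is consistent hashing: instead of hash % num_shards (which remaps most keys whenever a shard is added or removed), keys and shards are placed on a hash ring so only a fraction of keys move. A minimal illustrative sketch, not part of the DatabaseSharding class above:
import bisect
import hashlib

class ConsistentHashRing:
    """Minimal consistent-hash ring with virtual nodes (illustrative only)."""
    def __init__(self, shards, vnodes=100):
        self.ring = []
        for shard in shards:
            for i in range(vnodes):
                point = int(hashlib.md5(f"{shard}:{i}".encode()).hexdigest(), 16)
                self.ring.append((point, shard))
        self.ring.sort()
        self._points = [point for point, _ in self.ring]

    def get_shard(self, key: str) -> str:
        """Route a key to the first virtual node clockwise from its hash."""
        h = int(hashlib.md5(key.encode()).hexdigest(), 16)
        idx = bisect.bisect(self._points, h) % len(self.ring)
        return self.ring[idx][1]

ring = ConsistentHashRing(["shard_americas", "shard_europe", "shard_asia"])
print(ring.get_shard("user-12345"))
# Adding a fourth shard moves only roughly 1/4 of the keys, instead of remapping
# nearly all of them as modulo-based routing would.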
Benefits and Challenges:
Benefits:
- Horizontal Scalability: Add more shards as data grows
- Performance: Smaller datasets per shard = faster queries
- Isolation: Shard failures don't affect other shards
- Geographic Distribution: Data close to users
Challenges:
- Cross-shard Queries: Complex and slower
- Rebalancing: Moving data between shards is difficult
- Consistency: Transactions across shards are complex
- Operational Complexity: Multiple databases to manage
For Analytics Dashboard:
- User-based sharding: For user-specific dashboards
- Time-based sharding: For historical data (hot/warm/cold)
- Feature-based sharding: Separate shards for different metrics types
Part 5: Behavioral & Collaboration
30. Convincing Developers - Code Reliability Changes
Strong Answer: Situation: Our payment processing service was experiencing intermittent failures during peak traffic, causing revenue loss. The development team had implemented a quick fix that worked locally but didn't address the underlying concurrency issues.
Approach - Data-Driven Persuasion:
1. Quantified the Business Impact
# I created a dashboard showing the real cost
class ReliabilityImpactAnalysis:
def calculate_revenue_impact(self):
return {
"failed_transactions_per_hour": 150,
"average_transaction_value": 85.50,
"revenue_loss_per_hour": 150 * 85.50, # $12,825
"monthly_projected_loss": 12825 * 24 * 30, # $9.23M
"customer_churn_risk": "23 angry customer emails in 2 days"
}
2. Made It Personal and Collaborative Instead of saying "your code is wrong," I said:
- "I found some interesting patterns in our production data that might help us improve performance"
- "What do you think about these metrics? I'm curious about your thoughts on the concurrency patterns"
- "Could we pair program on this? I'd love to understand your approach better"
3. Proposed Solutions, Not Just Problems I came with a working prototype:
# Before (their approach)
def process_payment(payment_data):
global payment_queue
payment_queue.append(payment_data) # Race condition!
return process_queue()
# After (my suggested approach)
import threading
from queue import Queue
class ThreadSafePaymentProcessor:
def __init__(self):
self.payment_queue = Queue()
self.lock = threading.Lock()
def process_payment(self, payment_data):
with self.lock:
# Thread-safe processing
return self.safe_process(payment_data)
4. Used Their Language and Priorities
- Framed it as a "performance optimization" rather than "fixing bugs"
- Showed how it would reduce their on-call burden: "No more 3 AM pages about payment failures"
- Highlighted career benefits: "This would be a great story for your next performance review"
Result: They not only adopted the changes but became advocates for reliability practices. The lead developer started attending SRE meetings and later implemented circuit breakers proactively.
Key Lessons:
- Data beats opinions - metrics are harder to argue with
- Collaboration over confrontation - "How can we solve this together?"
- Show, don't just tell - working code examples are persuasive
- Align with their incentives - make reliability their win, not your win
31. Trade-off Between Reliability and Feature Delivery
Strong Answer: Situation: During a major product launch, we were at 97% availability (below our 99.5% SLO), but the product team wanted to deploy a new feature that would drive user adoption for the launch.
The Dilemma:
- Product pressure: "This feature will increase user engagement by 40%"
- Reliability concern: Error budget was nearly exhausted
- Timeline: Launch was in 3 days, couldn't delay
My Decision Process:
1. Quantified Both Sides
# Business impact calculation
launch_impact = {
"projected_new_users": 50000,
"revenue_per_user": 25,
"total_revenue_opportunity": 1.25e6, # $1.25M
"competitive_advantage": "First-mover in market segment"
}
reliability_risk = {
"current_error_budget_used": 0.85, # 85% of monthly budget
"remaining_budget": 0.15,
"days_remaining_in_month": 8,
"projected_overage": 0.3, # 30% over budget
"customer_impact": "Potential service degradation"
}
2. Created a Risk-Mitigation Plan Instead of a binary yes/no, I proposed a conditional approach:
# Feature deployment plan with guardrails
deployment_strategy:
phase_1:
rollout: 5% of users
duration: 4 hours
success_criteria:
- error_rate < 0.1%
- p99_latency < 200ms
- no_critical_alerts
phase_2:
rollout: 25% of users
duration: 12 hours
automatic_rollback: true
conditions:
- error_rate > 0.2% for 5 minutes
- p99_latency > 500ms for 10 minutes
phase_3:
rollout: 100% of users
requires: manual_approval_after_phase_2
3. Communicated Trade-offs Transparently I presented to stakeholders:
"We can launch this feature, but here's what it means:
- Upside: $1.25M revenue opportunity, competitive advantage
- Downside: 30% chance of service degradation affecting existing users
- Mitigation: Feature flags for instant rollback, enhanced monitoring
- Commitment: If reliability suffers, we pause new features until we're back on track"
4. The Decision and Implementation We proceeded with the phased rollout:
class FeatureLaunchManager:
def __init__(self):
self.error_budget_monitor = ErrorBudgetMonitor()
self.feature_flag = FeatureFlag("new_user_onboarding")
def monitor_launch_health(self):
while self.feature_flag.enabled:
current_error_rate = self.get_error_rate()
budget_status = self.error_budget_monitor.get_status()
if budget_status.will_exceed_monthly_budget():
self.trigger_rollback("Error budget exceeded")
break
if current_error_rate > 0.002: # 0.2%
self.reduce_rollout_percentage()
time.sleep(60) # Check every minute during launch
def trigger_rollback(self, reason):
self.feature_flag.disable()
self.alert_stakeholders(f"Feature rolled back: {reason}")
self.schedule_post_mortem()
The Outcome:
- Feature launched successfully to 25% of users
- Error rate increased slightly but stayed within acceptable bounds
- Revenue target was hit with partial rollout
- We didn't exceed error budget
- Built trust with product team by delivering on promises
Key Principles I Used:
- Transparency: Show the math, don't hide trade-offs
- Risk mitigation: Find ways to reduce downside while preserving upside
- Stakeholder alignment: Make everyone accountable for the decision
- Data-driven decisions: Use metrics, not emotions
- Learning mindset: Treat it as an experiment with clear success/failure criteria
Follow-up Actions:
- Conducted a post-launch review
- Used learnings to improve our launch process
- Created better error budget forecasting tools
- Established clearer guidelines for future trade-off decisions
32. Staying Current with SRE Practices and Technologies
Strong Answer: My Learning Strategy - Multi-layered Approach:
1. Technical Deep Dives
# I maintain a personal learning dashboard
learning_tracker = {
"current_focus": [
"eBPF for system observability",
"Kubernetes operators for automation",
"AI/ML for incident prediction"
],
"weekly_commitments": {
"reading": "2 hours of technical papers",
"hands_on": "4 hours lab/experimentation",
"community": "1 hour in SRE forums/Slack"
},
"monthly_goals": [
"Complete one new certification",
"Contribute to one open source project",
"Write one technical blog post"
]
}
2. Resource Mix - Quality over Quantity
Daily (30 minutes morning routine):
- SRE Weekly Newsletter - concise industry updates
- Hacker News - scan for infrastructure/reliability topics
- Internal Slack channels - #sre-learning, #incidents-learned
Weekly (2-3 hours):
- Google SRE Book Club - our team works through chapters together
- Kubernetes documentation - staying current with new features
- Conference talk videos - KubeCon, SREcon, Velocity recordings
Monthly Deep Dives:
- Academic papers - especially from USENIX, SOSP, OSDI conferences
- Vendor whitepapers - but with healthy skepticism
- Open source project exploration - contribute small patches to learn codebases
3. Hands-on Learning Lab
# Home lab setup for experimentation
homelab_projects:
current_experiments:
- name: "eBPF monitoring tools"
status: "Building custom metrics collector"
learning: "Kernel-level observability"
- name: "Chaos engineering with Litmus"
status: "Testing failure scenarios"
learning: "Resilience patterns"
- name: "Service mesh evaluation"
status: "Comparing Istio vs Linkerd"
learning: "Traffic management at scale"
infrastructure:
platform: "Kubernetes cluster on Raspberry Pi"
monitoring: "Prometheus + Grafana + Jaeger"
ci_cd: "GitLab CI with ArgoCD"
cost: "$200/month AWS credits for cloud integration"
4. Community Engagement
- SRE Discord/Slack communities - daily participation
- Local meetups - monthly CNCF and DevOps meetups
- Conference speaking - submitted 3 talks this year on incident response
- Mentoring - guide 2 junior engineers, which forces me to stay sharp
- Open source contributions - maintain a small monitoring tool, contribute to Prometheus
5. Learning from Failures - Internal and External
class IncidentLearningTracker:
def analyze_industry_incidents(self):
"""Study major outages for lessons"""
recent_studies = [
{
"incident": "Facebook Oct 2021 BGP outage",
"lessons": ["Single points of failure in DNS", "Recovery complexity"],
"applied_locally": "Implemented secondary DNS provider"
},
{
"incident": "AWS us-east-1 Dec 2021",
"lessons": ["Multi-region dependencies", "Circuit breaker importance"],
"applied_locally": "Added cross-region failover testing"
}
]
return recent_studies
def internal_learning(self):
"""Extract patterns from our own incidents"""
return {
"quarterly_review": "What patterns are emerging?",
"cross_team_sharing": "Monthly incident learnings presentation",
"runbook_updates": "Continuously improve based on real scenarios"
}
6. Structured Learning Paths
- Currently pursuing: CKS (Certified Kubernetes Security Specialist)
- Completed this year: AWS Solutions Architect Pro, CKAD
- Next up: HashiCorp Terraform Associate
- Long-term goal: Google Cloud Professional Cloud Architect
7. Teaching and Knowledge Sharing
# My knowledge sharing activities
## Internal (at work):
- Monthly "SRE Patterns" lunch & learn sessions
- Incident post-mortem facilitation
- New hire onboarding for SRE practices
- Internal blog posts on "what I learned this week"
## External:
- Technical blog: medium.com/@myusername
- Conference talks: submitted to SREcon, KubeCon
- Open source: maintainer of small monitoring tool
- Mentoring: 2 junior engineers, 1 career switcher
8. Staying Ahead of Trends I try to identify emerging patterns early:
Current attention areas:
- Platform Engineering - evolution beyond traditional SRE
- FinOps - cost optimization becoming critical
- AI/ML for Operations - automated incident response
- WebAssembly - potential impact on deployment patterns
- Sustainability - green computing in infrastructure
My evaluation framework:
- Signal vs noise: Is this solving real problems or just hype?
- Adoption timeline: When will this be production-ready?
- Investment level: Should I learn basics now or wait?
- Career relevance: How does this align with my growth goals?
Key Success Factors:
- Consistency over intensity - 30 minutes daily beats 8 hours monthly
- Applied learning - immediately try new concepts in lab/work
- Community connection - learning with others accelerates understanding
- Teaching others - best way to solidify knowledge
- Balance breadth and depth - stay broad but go deep on core areas
Resources I highly recommend:
- Books: "Observability Engineering", "Learning eBPF", "Kubernetes Patterns"
- Podcasts: "Software Engineering Radio", "The Cloudcast"
- Newsletters: "SRE Weekly", "DevOps'ish", "The New Stack"
- Communities: SRE Slack, r/sre, CNCF Slack channels
This approach has helped me stay current while avoiding information overload. The key is finding sustainable habits that fit into daily work rather than treating learning as separate from doing.
Summary
This comprehensive answer guide covers all aspects of SRE engineering, from fundamental concepts to complex system design challenges. The answers demonstrate:
- Technical depth in SRE practices, monitoring, and system design
- Practical experience with real-world scenarios and solutions
- Problem-solving approach that balances theory with pragmatic implementation
- Communication skills for explaining complex concepts clearly
- Leadership qualities in handling incidents and team collaboration
The guide provides both the technical knowledge and the soft skills necessary for a senior SRE role, with specific examples and code implementations that candidates can adapt to their own experiences.
SRE Interview Questions - Comprehensive Answer Guide
Part 1: SRE Fundamentals & Practices
1. What is the difference between SRE and traditional operations, and how do you balance reliability with feature velocity?
Strong Answer: SRE differs from traditional ops in several key ways:
- Proactive vs Reactive: SRE focuses on preventing issues through engineering rather than just responding to them
- Error Budgets: We quantify acceptable unreliability, allowing teams to move fast while maintaining reliability targets
- Automation: SRE emphasizes eliminating toil through automation and self-healing systems
- Shared Ownership: Development and operations work together using the same tools and metrics
Balancing reliability with velocity:
- Set clear SLIs/SLOs with stakeholders (e.g., 99.9% uptime = about 43 minutes of downtime per month; see the sketch after this list)
- Use error budgets as a shared currency - if we're within budget, dev teams can deploy faster
- When error budget is exhausted, focus shifts to reliability work
- Implement gradual rollouts and feature flags to reduce blast radius
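A quick illustrative sketch of the downtime-budget arithmetic behind those numbers:
# Illustrative: allowed downtime for a given availability SLO over a 30-day month.
def downtime_budget_minutes(slo: float, days: int = 30) -> float:
    return (1 - slo) * days * 24 * 60

print(downtime_budget_minutes(0.999))   # ~43.2 minutes/month for 99.9%
print(downtime_budget_minutes(0.9995))  # ~21.6 minutes/month for 99.95%
print(downtime_budget_minutes(0.9999))  # ~4.3 minutes/month for 99.99%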
Follow-up - Implementing error budgets with resistant teams:
- Start with education - show how error budgets enable faster delivery
- Use concrete examples of downtime costs vs delayed features
- Begin with lenient budgets and tighten over time
- Make error budget status visible in dashboards and planning meetings
2. Explain the four golden signals of monitoring. How would you implement alerting around these for a Python microservice?
Strong Answer: The four golden signals are:
- Latency: Time to process requests
- Traffic: Demand on your system (requests/second)
- Errors: Rate of failed requests
- Saturation: How "full" your service is (CPU, memory, I/O)
Implementation for Python microservice:
# Using Prometheus with Flask
from flask import Flask, request
from prometheus_client import Counter, Histogram, Gauge, generate_latest
import time
import psutil
app = Flask(__name__)
# Metrics
REQUEST_COUNT = Counter('http_requests_total', 'Total HTTP requests', ['method', 'endpoint', 'status'])
REQUEST_LATENCY = Histogram('http_request_duration_seconds', 'HTTP request latency')
ERROR_RATE = Counter('http_errors_total', 'Total HTTP errors', ['status'])
CPU_USAGE = Gauge('cpu_usage_percent', 'CPU usage percentage')
@app.before_request
def before_request():
request.start_time = time.time()
@app.after_request
def after_request(response):
latency = time.time() - request.start_time
REQUEST_LATENCY.observe(latency)
REQUEST_COUNT.labels(request.method, request.endpoint, response.status_code).inc()
if response.status_code >= 400:
ERROR_RATE.labels(response.status_code).inc()
CPU_USAGE.set(psutil.cpu_percent())
return response
Alerting Rules:
- Latency: Alert if p99 > 500ms for 5 minutes
- Traffic: Alert on 50% increase/decrease from baseline
- Errors: Alert if error rate > 1% for 2 minutes
- Saturation: Alert if CPU > 80% or Memory > 85% for 10 minutes
3. Walk me through how you would conduct a post-mortem for a production incident.
Strong Answer: Timeline:
- Immediate: Focus on resolution, collect logs/metrics during incident
- Within 24-48 hours: Conduct post-mortem meeting
- Within 1 week: Publish written post-mortem and track action items
Post-mortem Process:
- Timeline Construction: Build detailed timeline with all events, decisions, and communications
- Root Cause Analysis: Use techniques like "5 Whys" or Fishbone diagrams
- Impact Assessment: Quantify user impact, revenue loss, SLO burn
- Action Items: Focus on systemic fixes, not individual blame
- Follow-up: Track action items to completion
Good Post-mortem Characteristics:
- Blameless culture - focus on systems, not individuals
- Detailed timeline with timestamps
- Clear root cause analysis
- Actionable remediation items with owners and deadlines
- Written in accessible language for all stakeholders
- Includes what went well (not just failures)
Psychological Safety:
- Use "the system allowed..." instead of "person X did..."
- Ask "how can we make this impossible to happen again?"
- Celebrate people who surface problems early
- Make post-mortems learning opportunities, not punishment
4. You notice your application's 99th percentile latency has increased by 50ms over the past week, but the average latency remains the same. How would you investigate this?
Strong Answer: This suggests a long tail problem - most requests are fine, but some are much slower.
Investigation Steps:
- Check Request Distribution: Look at latency histograms - are we seeing bimodal distribution?
- Analyze Traffic Patterns: Has the mix of request types changed? Are we getting more complex queries?
- Database Performance: Check for slow queries, table locks, or index problems
- Resource Saturation: Look for memory pressure, GC pauses, or I/O bottlenecks during peak times
- Dependency Analysis: Check latency of downstream services - could be cascading slow responses
- Code Changes: Review recent deployments for inefficient algorithms or new features
Specific Checks:
- Database slow query logs
- Application profiling data
- Memory usage patterns and GC metrics
- Thread pool utilization
- External API response times
- Distributed tracing for slow requests
Tools: Use APM tools like New Relic, DataDog, or distributed tracing with Jaeger/Zipkin to identify bottlenecks.
5. Design a monitoring strategy for a Go-based API that processes financial transactions.
Strong Answer: Business Metrics:
- Transaction volume and value per minute
- Success rate by transaction type
- Time to settlement
- Regulatory compliance metrics (PCI DSS)
Technical Metrics:
// Key metrics to track
var (
transactionCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{Name: "transactions_total"},
[]string{"type", "status", "payment_method"})
transactionLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{Name: "transaction_duration_seconds"},
[]string{"type"})
queueDepth = prometheus.NewGauge(
prometheus.GaugeOpts{Name: "transaction_queue_depth"})
dbConnectionPool = prometheus.NewGauge(
prometheus.GaugeOpts{Name: "db_connections_active"})
)
Logging Strategy:
- Structured logging with correlation IDs
- Log all transaction state changes
- Security events (failed auth, suspicious patterns)
- Audit trail for compliance
Alerting:
- Transaction failure rate > 0.1%
- Processing latency > 2 seconds
- Queue depth > 1000 items
- Database connection pool > 80% utilization
- Any security-related events
Compliance Considerations:
- PII data must be masked in logs
- Audit logs with tamper-proof storage
- Real-time fraud detection alerts
6. How would you implement distributed tracing across a system with Python backend services and a React frontend?
Strong Answer: Architecture:
- Use OpenTelemetry standard for vendor-neutral tracing
- Jaeger or Zipkin as tracing backend
- Trace context propagation across service boundaries
Backend Implementation (Python):
from opentelemetry import trace
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
# Auto-instrument Flask and requests
FlaskInstrumentor().instrument_app(app)
RequestsInstrumentor().instrument()
tracer = trace.get_tracer(__name__)
@app.route('/api/user/<user_id>')
def get_user(user_id):
with tracer.start_as_current_span("get_user") as span:
span.set_attribute("user.id", user_id)
# Service logic here
return response
Frontend Implementation (React):
import { WebTracerProvider } from "@opentelemetry/web";
import { getWebAutoInstrumentations } from "@opentelemetry/auto-instrumentations-web";
const provider = new WebTracerProvider();
provider.addSpanProcessor(new BatchSpanProcessor(new JaegerExporter()));
// Auto-instrument fetch, XMLHttpRequest
registerInstrumentations({
instrumentations: [getWebAutoInstrumentations()],
});
Key Implementation Points:
- Correlation IDs: Pass trace context in HTTP headers
- Sampling: Use probabilistic sampling (1-10%) to reduce overhead
- Service Map: Visualize service dependencies
- Performance Analysis: Identify bottlenecks across service boundaries
Part 2: Software Engineering & Development
7. Code Review Scenario: Memory leak optimization
Strong Answer: Problems with the original code:
- Loads entire dataset into memory before processing
- No streaming or chunked processing
- Memory usage grows linearly with file size
Optimized version:
def process_large_dataset(file_path, chunk_size=1000):
"""Process large dataset in chunks to manage memory usage."""
results = []
with open(file_path, 'r') as f:
chunk = []
for line in f:
chunk.append(line.strip())
if len(chunk) >= chunk_size:
# Process chunk and yield results
processed_chunk = [expensive_processing(item) for item in chunk]
partial_result = analyze_data(processed_chunk)
results.append(partial_result)
# Clear chunk to free memory
chunk.clear()
# Process remaining items
if chunk:
processed_chunk = [expensive_processing(item) for item in chunk]
partial_result = analyze_data(processed_chunk)
results.append(partial_result)
return combine_results(results)
# Even better - use generator for streaming
def process_large_dataset_streaming(file_path):
"""Stream processing for minimal memory footprint."""
with open(file_path, 'r') as f:
for line in f:
yield expensive_processing(line.strip())
# Usage
def analyze_streaming_data(file_path):
processed_items = process_large_dataset_streaming(file_path)
return analyze_data_streaming(processed_items)
Additional Optimizations:
- Use mmap for very large files
- Implement backpressure if processing can't keep up
- Add memory monitoring and circuit breakers
- Consider using asyncio for I/O-bound operations
8. In Go, explain the difference between buffered and unbuffered channels.
Strong Answer: Unbuffered Channels:
- Synchronous communication - sender blocks until receiver reads
- Zero capacity - no internal storage
- Guarantees handoff between goroutines
ch := make(chan int) // unbuffered
go func() {
ch <- 42 // blocks until someone reads
}()
value := <-ch // blocks until someone sends
Buffered Channels:
- Asynchronous communication up to buffer size
- Sender only blocks when buffer is full
- Receiver only blocks when buffer is empty
ch := make(chan int, 3) // buffered with capacity 3
ch <- 1 // doesn't block
ch <- 2 // doesn't block
ch <- 3 // doesn't block
ch <- 4 // blocks - buffer full
When to use in high-throughput systems:
Unbuffered for:
- Strict synchronization requirements
- Request-response patterns
- When you need guaranteed delivery confirmation
- Worker pools where you want backpressure
Buffered for:
- Producer-consumer with different rates
- Batching operations
- Reducing contention in high-throughput scenarios
- Event streaming where some loss is acceptable
Example - High-throughput log processor:
// Buffered for log ingestion
logChan := make(chan LogEntry, 10000)
// Unbuffered for critical operations
errorChan := make(chan error)
func logProcessor() {
for {
select {
case log := <-logChan:
processLog(log)
case err := <-errorChan:
handleCriticalError(err) // immediate attention
}
}
}
9. React Performance: Optimize dashboard with real-time metrics
Strong Answer: Problems with frequent re-renders:
- All components re-render when any metric updates
- Expensive calculations on every render
- DOM thrashing from rapid updates
Optimization Strategy:
import React, { memo, useMemo, useCallback, useRef, useState, startTransition } from "react";
import { useVirtualizer } from "@tanstack/react-virtual";
// 1. Memoize metric components
const MetricCard = memo(({ metric, value, threshold }) => {
// Only re-render when props actually change
const status = useMemo(
() => (value > threshold ? "critical" : "normal"),
[value, threshold]
);
return (
<div className={`metric-card ${status}`}>
<h3>{metric}</h3>
<span>{value}</span>
</div>
);
});
// 2. Virtualize large lists
const MetricsList = ({ metrics }) => {
const parentRef = useRef();
const virtualizer = useVirtualizer({
count: metrics.length,
getScrollElement: () => parentRef.current,
estimateSize: () => 100,
});
return (
<div ref={parentRef} style={{ height: "400px", overflow: "auto" }}>
{virtualizer.getVirtualItems().map((virtualRow) => (
<MetricCard key={virtualRow.key} {...metrics[virtualRow.index]} />
))}
</div>
);
};
// 3. Debounce updates and batch state changes
const Dashboard = () => {
const [metrics, setMetrics] = useState({});
const updateQueue = useRef(new Map());
const flushTimeout = useRef();
const queueUpdate = useCallback((serviceName, newMetrics) => {
updateQueue.current.set(serviceName, newMetrics);
// Debounce updates - batch multiple rapid changes
clearTimeout(flushTimeout.current);
flushTimeout.current = setTimeout(() => {
setMetrics((prev) => {
const updates = Object.fromEntries(updateQueue.current);
updateQueue.current.clear();
return { ...prev, ...updates };
});
}, 100); // 100ms debounce
}, []);
// 4. Use React.startTransition for non-critical updates
const handleMetricUpdate = useCallback(
(data) => {
if (data.priority === "high") {
queueUpdate(data.service, data.metrics);
} else {
startTransition(() => {
queueUpdate(data.service, data.metrics);
});
}
},
[queueUpdate]
);
return <MetricsList metrics={Object.values(metrics)} />;
};
Additional Optimizations:
- WebSocket connection pooling - single connection for all metrics
- Data normalization - structure data to minimize re-renders
- Service Worker - offload heavy calculations
- Canvas/WebGL - for complex visualizations instead of DOM updates
10. Design a CI/CD pipeline for a multi-service application
Strong Answer: Pipeline Architecture:
# .github/workflows/main.yml
name: Multi-Service CI/CD
on:
push:
branches: [main, develop]
pull_request:
branches: [main]
jobs:
detect-changes:
runs-on: ubuntu-latest
outputs:
python-api: ${{ steps.changes.outputs.python-api }}
go-workers: ${{ steps.changes.outputs.go-workers }}
react-frontend: ${{ steps.changes.outputs.react-frontend }}
steps:
- uses: actions/checkout@v3
- uses: dorny/paths-filter@v2
id: changes
with:
filters: |
python-api:
- 'services/api/**'
go-workers:
- 'services/workers/**'
react-frontend:
- 'frontend/**'
test-python:
needs: detect-changes
if: needs.detect-changes.outputs.python-api == 'true'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Run Python tests
run: |
cd services/api
pip install -r requirements.txt
pytest --cov=. --cov-report=xml
flake8 .
mypy .
test-go:
needs: detect-changes
if: needs.detect-changes.outputs.go-workers == 'true'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v3
with:
go-version: "1.21"
- name: Run Go tests
run: |
cd services/workers
go test -race -coverprofile=coverage.out ./...
go vet ./...
staticcheck ./...
deploy:
needs: [test-python, test-go, test-react]
if: github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
steps:
- name: Deploy with blue-green
run: |
# Database migration strategy
kubectl apply -f k8s/migration-job.yaml
kubectl wait --for=condition=complete job/db-migration
# Deploy new version to green environment
helm upgrade app-green ./helm-chart \
--set image.tag=${{ github.sha }} \
--set environment=green
# Health check green environment
./scripts/health-check.sh green
# Switch traffic to green
kubectl patch service app-service -p \
'{"spec":{"selector":{"version":"green"}}}'
# Verify traffic switch
./scripts/verify-deployment.sh
# Clean up blue environment
helm uninstall app-blue
Database Migration Strategy:
#!/bin/bash
# scripts/db-migration.sh
# Create backup
pg_dump $DATABASE_URL > backup-$(date +%Y%m%d-%H%M%S).sql
# Run migrations in transaction
psql $DATABASE_URL << EOF
BEGIN;
-- Run all pending migrations
\i migrations/001_add_user_table.sql
\i migrations/002_add_index.sql
-- Verify migration success
SELECT count(*) FROM schema_migrations;
COMMIT;
EOF
# Test rollback capability
if [ "$1" == "test-rollback" ]; then
psql $DATABASE_URL < backup-latest.sql
fi
Rollback Strategy:
- Keep previous 3 versions in registry
- Database rollback scripts for each migration
- Feature flags to disable new features quickly
- Automated rollback triggers on error rate increase
11. How would you implement blue-green deployments for a stateful service with a database?
Strong Answer: Challenges with Stateful Services:
- Database schema changes
- Data consistency during switch
- Connection management
- State synchronization
Implementation Strategy:
# Blue-Green with Database
apiVersion: v1
kind: Service
metadata:
name: app-active
spec:
selector:
app: myapp
version: blue # Will switch to green
ports:
- port: 80
---
# Database proxy for connection management
apiVersion: apps/v1
kind: Deployment
metadata:
name: db-proxy
spec:
template:
spec:
containers:
- name: pgbouncer
image: pgbouncer/pgbouncer
env:
- name: DATABASES_HOST
value: "postgres-primary"
- name: POOL_MODE
value: "transaction" # Allow connection switching
Deployment Process:
1. Pre-deployment:
   - Run backward-compatible schema migrations
   - Ensure both versions can operate with the new schema
2. Green Deployment:
   - Deploy the green version against the same database
   - Warm up green instances (cache, connections)
   - Run health checks
3. Traffic Switch:
   - Update the service selector to point to green
   - Monitor metrics for 10-15 minutes
   - Keep blue running for quick rollback
4. Post-deployment:
   - Run cleanup migrations (remove old columns)
   - Terminate the blue environment
Database Migration Strategy:
-- Phase 1: Additive changes (safe for both versions)
ALTER TABLE users ADD COLUMN email_verified BOOLEAN DEFAULT FALSE;
CREATE INDEX CONCURRENTLY idx_users_email ON users(email);
-- Phase 2: After green is stable, remove old columns
-- ALTER TABLE users DROP COLUMN old_email_field;
Rollback Plan:
- Revert service selector to blue (sketched below)
- Emergency database rollback scripts
- Circuit breaker to stop problematic requests
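The first rollback step can be a one-line patch with the official Kubernetes Python client. A minimal sketch, assuming the service name app-active from the manifest above and the default namespace:

# Flip the active service selector back to the blue pods (rollback) or to green (cutover).
from kubernetes import client, config

def switch_traffic(version: str, service_name: str = "app-active", namespace: str = "default") -> None:
    config.load_kube_config()  # use load_incluster_config() when running inside the cluster
    core = client.CoreV1Api()
    patch = {"spec": {"selector": {"app": "myapp", "version": version}}}
    core.patch_namespaced_service(service_name, namespace, patch)

switch_traffic("blue")  # emergency rollback; switch_traffic("green") performs the cutover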
Part 3: System Design Deep Dive
12. Requirements Gathering Questions
Strong Answer: Functional Requirements:
- What specific metrics need to be displayed? (orders/minute, revenue, concurrent users)
- How real-time? (sub-second, few seconds, minute-level updates)
- What user roles need access? (executives, ops teams, developers)
- What actions can users take? (view-only, alerts, drill-down)
- Geographic distribution of users?
Non-Functional Requirements:
- Scale: How many concurrent dashboard users? (100s, 1000s)
- Data volume: Orders per day? Peak traffic? Data retention period?
- Availability: 99.9% or higher? Maintenance windows?
- Latency: How fast should dashboard updates be?
- Consistency: Can we show slightly stale data? (eventual consistency)
- Security: Authentication, authorization, audit logging?
Technical Constraints:
- Existing infrastructure? (AWS, on-prem, hybrid)
- Integration requirements? (existing systems, APIs)
- Compliance requirements? (SOX, PCI DSS)
- Budget constraints?
13. API Design and Framework Selection
Strong Answer: API Endpoints:
// REST API Design
GET /api/v1/metrics/realtime
{
"active_users": 1250,
"orders_per_minute": 45,
"revenue_per_minute": 12500,
"inventory_alerts": 3,
"system_health": "healthy",
"timestamp": "2025-06-30T10:30:00Z"
}
GET /api/v1/metrics/historical?metric=revenue&period=24h&granularity=1h
{
"metric": "revenue",
"data": [
{"timestamp": "2025-06-30T09:00:00Z", "value": 75000},
{"timestamp": "2025-06-30T10:00:00Z", "value": 82000}
]
}
POST /api/v1/alerts/subscribe
{
"metric": "inventory_level",
"threshold": 100,
"product_id": "12345",
"notification_method": "webhook"
}
Framework Comparison:
REST ✅ Recommended
- Simple, cacheable
- Wide tooling support
- Good for CRUD operations
- HTTP status codes for errors
WebSockets ✅ For Real-time Updates
// WebSocket for live updates
const ws = new WebSocket("wss://api.example.com/metrics/stream");
ws.onmessage = (event) => {
const metrics = JSON.parse(event.data);
updateDashboard(metrics);
};
GraphQL ❌ Not Recommended
- Adds complexity for simple metrics
- Caching more difficult
- Overkill for this use case
Final Architecture:
- REST API for historical data and configuration
- WebSocket for real-time metric streaming
- Server-Sent Events as WebSocket fallback (see the sketch below)
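The SSE fallback is straightforward on the Python side. A sketch, assuming metrics are fanned out on a Redis pub/sub channel named metrics (an assumption, not defined elsewhere in this design):

# Minimal Server-Sent Events endpoint as a WebSocket fallback.
import redis
from flask import Flask, Response

app = Flask(__name__)
r = redis.Redis(host="redis-cluster")

@app.route("/api/metrics/sse")
def metrics_sse():
    def event_stream():
        pubsub = r.pubsub()
        pubsub.subscribe("metrics")
        for message in pubsub.listen():
            if message["type"] == "message":
                # SSE frames are "data: <payload>\n\n"
                yield f"data: {message['data'].decode()}\n\n"
    return Response(event_stream(), mimetype="text/event-stream")

Browsers consume this with EventSource, which reconnects automatically, so the client needs no custom backoff logic.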
14. High-Level Architecture Diagram
Strong Answer:
- Client tier: React SPA (dashboard UI, WebSocket client)
- Edge tier: Load Balancer (ALB/NGINX) → API Gateway (Kong/Envoy: auth, rate limiting)
- Service tier: Metrics API (Python/Flask), WebSocket API (Go/Gorilla), Config API (Python/FastAPI)
- Hot-data tier: Redis Cache (metrics), Message Queue (Kafka/Redis), PostgreSQL (config)
- Storage tier: Time Series Database (InfluxDB/TimescaleDB)
- Ingestion tier: ETL Pipeline (Apache Beam)
- Source systems: E-commerce Database (orders), Inventory Management (stock levels), System Metrics (health)
Data Flow:
- Source Systems → ETL Pipeline (real-time streaming)
- ETL Pipeline → Time Series DB (processed metrics)
- Time Series DB → Redis Cache (frequently accessed data)
- API Services → Frontend (REST + WebSocket)
- Message Queue → WebSocket API (real-time updates)
15. Real-time Implementation Approaches
Strong Answer: Comparison of Real-time Approaches:
| Approach | Pros | Cons | Use Case |
| --- | --- | --- | --- |
| Polling | Simple, stateless | High latency, wasteful | Non-critical updates |
| WebSockets | True real-time, bidirectional | Complex, stateful | Live dashboards |
| Server-Sent Events | Simpler than WebSocket, auto-reconnect | One-way only | Event streams |
| Message Queues | Reliable, scalable | Added complexity | High-volume events |
Recommended Architecture:
// WebSocket implementation in Go
type Hub struct {
clients map[*Client]bool
broadcast chan []byte
register chan *Client
unregister chan *Client
}
type Client struct {
hub *Hub
conn *websocket.Conn
send chan []byte
subscriptions map[string]bool
}
func (h *Hub) run() {
for {
select {
case client := <-h.register:
h.clients[client] = true
case client := <-h.unregister:
delete(h.clients, client)
close(client.send)
case message := <-h.broadcast:
for client := range h.clients {
select {
case client.send <- message:
default:
close(client.send)
delete(h.clients, client)
}
}
}
}
}
// Kafka consumer for real-time metrics
func consumeMetrics() {
consumer, _ := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"group.id": "dashboard-consumers",
})
consumer.Subscribe("metrics-topic", nil)
for {
msg, _ := consumer.ReadMessage(-1)
// Process metric and broadcast to WebSocket clients
metric := parseMetric(msg.Value)
hub.broadcast <- metric
// Also update Redis cache
updateCache(metric)
}
}
Client-side Connection Management:
class MetricsWebSocket {
constructor(url) {
this.url = url;
this.reconnectInterval = 1000;
this.maxReconnectInterval = 30000;
this.reconnectDecay = 1.5;
this.connect();
}
connect() {
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
console.log("Connected to metrics stream");
this.reconnectInterval = 1000; // Reset backoff
};
this.ws.onmessage = (event) => {
const metrics = JSON.parse(event.data);
this.updateDashboard(metrics);
};
this.ws.onclose = () => {
console.log("Connection lost, reconnecting...");
setTimeout(() => {
this.reconnectInterval = Math.min(
this.reconnectInterval * this.reconnectDecay,
this.maxReconnectInterval
);
this.connect();
}, this.reconnectInterval);
};
}
subscribe(metricType) {
this.ws.send(
JSON.stringify({
action: "subscribe",
metric: metricType,
})
);
}
}
16. Single Points of Failure Analysis
Strong Answer: Identified SPOFs and Solutions:
1. Load Balancer SPOF
- Problem: Single ALB failure takes down entire system
- Solution: Multi-AZ deployment with Route 53 health checks
# Terraform for multi-region setup
resource "aws_lb" "primary" {
name = "app-lb-primary"
# ALBs attach to subnets spanning multiple AZs (subnet IDs below are placeholders)
subnets = ["subnet-az1a-placeholder", "subnet-az1b-placeholder"]
}
resource "aws_route53_record" "failover_primary" {
zone_id = aws_route53_zone.main.zone_id
name = "api.example.com"
type = "A"
failover_routing_policy {
type = "PRIMARY"
}
health_check_id = aws_route53_health_check.primary.id
}
2. Database SPOF
- Problem: Single database failure
- Solution: Primary-replica setup with automatic failover
# PostgreSQL HA setup
apiVersion: postgresql.cnpg.io/v1
kind: Cluster
metadata:
name: postgres-cluster
spec:
instances: 3
primaryUpdateStrategy: unsupervised
postgresql:
parameters:
max_connections: "200"
shared_buffers: "256MB"
backup:
retentionPolicy: "30d"
barmanObjectStore:
destinationPath: "s3://backups/postgres"
3. Redis Cache SPOF
- Problem: Cache failure impacts performance
- Solution: Redis Sentinel for HA + graceful degradation
import redis.sentinel
sentinel = redis.sentinel.Sentinel([
('sentinel1', 26379),
('sentinel2', 26379),
('sentinel3', 26379)
])
def get_metric(key):
try:
master = sentinel.master_for('mymaster', socket_timeout=0.1)
return master.get(key)
except redis.RedisError:
# Graceful degradation - fetch from database
return get_metric_from_db(key)
4. Message Queue SPOF
- Problem: Kafka broker failure stops real-time updates
- Solution: Multi-broker Kafka cluster with replication
# Kafka with replication
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
spec:
kafka:
replicas: 3
config:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
default.replication.factor: 3
min.insync.replicas: 2
17. Caching Implementation Strategy
Strong Answer: Multi-Level Caching Strategy:
# 1. Application-level caching
from functools import lru_cache
import json
import time
import redis
class CacheService:
def __init__(self):
self.redis_client = redis.Redis(host='redis-cluster')
self.local_cache = {}
@lru_cache(maxsize=1000)
def get_metric_definition(self, metric_name):
"""Cache metric metadata (rarely changes)"""
return self.fetch_metric_definition(metric_name)
def get_real_time_metric(self, metric_key):
"""Multi-level cache for real-time data"""
# L1: Memory cache (100ms TTL)
if metric_key in self.local_cache:
data, timestamp = self.local_cache[metric_key]
if time.time() - timestamp < 0.1: # 100ms
return data
# L2: Redis cache (5s TTL)
cached = self.redis_client.get(f"metric:{metric_key}")
if cached:
data = json.loads(cached)
self.local_cache[metric_key] = (data, time.time())
return data
# L3: Database fallback
data = self.fetch_from_database(metric_key)
# Cache with appropriate TTL
self.redis_client.setex(
f"metric:{metric_key}",
5, # 5 second TTL
json.dumps(data)
)
self.local_cache[metric_key] = (data, time.time())
return data
2. CDN Caching for Static Assets:
// CloudFront configuration
{
"Origins": [{
"Id": "dashboard-origin",
"DomainName": "dashboard-api.example.com",
"CustomOriginConfig": {
"HTTPPort": 443,
"OriginProtocolPolicy": "https-only"
}
}],
"DefaultCacheBehavior": {
"TargetOriginId": "dashboard-origin",
"ViewerProtocolPolicy": "redirect-to-https",
"CachePolicyId": "custom-cache-policy",
"TTL": {
"DefaultTTL": 300, // 5 minutes for API responses
"MaxTTL": 3600 // 1 hour max
}
},
"CacheBehaviors": [{
"PathPattern": "/static/*",
"TTL": {
"DefaultTTL": 86400, // 24 hours for static assets
"MaxTTL": 31536000 // 1 year max
}
}]
}
3. Database Query Result Caching:
-- Materialized views for expensive aggregations
CREATE MATERIALIZED VIEW hourly_revenue AS
SELECT
date_trunc('hour', order_timestamp) as hour,
SUM(order_total) as revenue,
COUNT(*) as order_count
FROM orders
WHERE order_timestamp >= NOW() - INTERVAL '24 hours'
GROUP BY date_trunc('hour', order_timestamp);
-- Refresh every 5 minutes
CREATE OR REPLACE FUNCTION refresh_hourly_revenue()
RETURNS void AS $$
BEGIN
REFRESH MATERIALIZED VIEW CONCURRENTLY hourly_revenue;
END;
$$ LANGUAGE plpgsql;
SELECT cron.schedule('refresh-revenue', '*/5 * * * *', 'SELECT refresh_hourly_revenue();');
Cache Invalidation Strategy:
class CacheInvalidator:
def __init__(self):
self.redis_client = redis.Redis()
def invalidate_on_order(self, order_data):
"""Invalidate relevant caches when new order arrives"""
patterns_to_invalidate = [
"metric:orders_per_minute",
"metric:revenue_per_minute",
f"metric:inventory:{order_data['product_id']}"
]
for pattern in patterns_to_invalidate:
# Use Redis pipeline for efficiency
pipe = self.redis_client.pipeline()
pipe.delete(pattern)
pipe.publish('cache_invalidation', pattern)
pipe.execute()
def smart_invalidation(self, event_type, entity_id):
"""Invalidate based on event type"""
invalidation_map = {
'order_created': ['revenue', 'orders', 'inventory'],
'user_login': ['active_users'],
'inventory_update': ['inventory', 'stock_alerts']
}
metrics_to_invalidate = invalidation_map.get(event_type, [])
for metric in metrics_to_invalidate:
self.redis_client.delete(f"metric:{metric}:{entity_id}")
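The publish('cache_invalidation', ...) call above only helps if every API instance listens on that channel and evicts its own in-process copy. A small companion sketch, reusing the channel name and the local cache dict from the snippets above:

# Background listener that evicts invalidated keys from the process-local L1 cache.
import threading

import redis

class LocalCacheInvalidationListener:
    def __init__(self, local_cache: dict):
        self.redis_client = redis.Redis()
        self.local_cache = local_cache

    def start(self) -> None:
        threading.Thread(target=self._listen, daemon=True).start()

    def _listen(self) -> None:
        pubsub = self.redis_client.pubsub()
        pubsub.subscribe("cache_invalidation")
        for message in pubsub.listen():
            if message["type"] == "message":
                key = message["data"].decode()
                self.local_cache.pop(key, None)  # drop the stale L1 entry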
18. Database Design: SQL vs NoSQL
Strong Answer: Hybrid Approach - Use Both:
SQL (PostgreSQL) for:
- Transactional Data: Orders, users, inventory
- ACID Requirements: Financial transactions
- Complex Queries: Joins, aggregations
- Data Consistency: Strong consistency needs
-- OLTP Database Schema
CREATE TABLE orders (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id),
order_total DECIMAL(10,2) NOT NULL,
order_timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
status VARCHAR(20) NOT NULL DEFAULT 'pending'
);
CREATE INDEX idx_orders_timestamp ON orders(order_timestamp);
CREATE INDEX idx_orders_user_status ON orders(user_id, status);
-- Partitioning for time-series data (the parent table must be declared with
-- PARTITION BY RANGE (order_timestamp) for this to apply)
CREATE TABLE orders_2025_06 PARTITION OF orders
FOR VALUES FROM ('2025-06-01') TO ('2025-07-01');
NoSQL (InfluxDB) for:
- Time-series Metrics: Performance data, system metrics
- High Write Volume: Thousands of metrics per second
- Retention Policies: Automatic data aging
# InfluxDB for metrics storage
from datetime import datetime

from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
client = InfluxDBClient(url="http://influxdb:8086", token="my-token")
write_api = client.write_api(write_options=SYNCHRONOUS)
def write_metric(measurement, tags, fields):
point = Point(measurement) \
.tag("service", tags.get("service")) \
.tag("region", tags.get("region")) \
.field("value", fields["value"]) \
.time(datetime.utcnow(), WritePrecision.S)
write_api.write(bucket="metrics", record=point)
# Example usage
write_metric(
measurement="orders_per_minute",
tags={"service": "ecommerce", "region": "us-east-1"},
fields={"value": 45}
)
Read/Write Load Balancing:
class DatabaseRouter:
def __init__(self):
self.primary_db = PostgreSQLConnection("primary-db")
self.read_replicas = [
PostgreSQLConnection("replica-1"),
PostgreSQLConnection("replica-2"),
PostgreSQLConnection("replica-3")
]
self.current_replica = 0
def get_read_connection(self):
"""Round-robin read replica selection"""
replica = self.read_replicas[self.current_replica]
self.current_replica = (self.current_replica + 1) % len(self.read_replicas)
return replica
def get_write_connection(self):
"""Always use primary for writes"""
return self.primary_db
def execute_read_query(self, query, params=None):
try:
return self.get_read_connection().execute(query, params)
except Exception:
# Fallback to primary if replica fails
return self.primary_db.execute(query, params)
# Usage
@app.route('/api/metrics/historical')
def get_historical_metrics():
# Use read replica for analytics queries
db = router.get_read_connection()
return db.execute("""
SELECT date_trunc('hour', created_at) as hour,
SUM(total) as revenue
FROM orders
WHERE created_at >= NOW() - INTERVAL '24 hours'
GROUP BY hour
ORDER BY hour
""")
19. Scaling Strategy for 10x Traffic
Strong Answer: Scaling Priority Order:
1. Application Tier (Scale First)
# Horizontal Pod Autoscaler
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: dashboard-api-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: dashboard-api
minReplicas: 3
maxReplicas: 50
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
behavior:
scaleUp:
stabilizationWindowSeconds: 60
policies:
- type: Percent
value: 100
periodSeconds: 15
2. Database Scaling
# Read replica scaling + connection pooling
class DatabaseScaler:
def __init__(self):
self.connection_pools = {
'primary': create_pool('primary-db', pool_size=20),
'replicas': [
create_pool('replica-1', pool_size=50),
create_pool('replica-2', pool_size=50),
create_pool('replica-3', pool_size=50),
]
}
def scale_read_capacity(self, target_qps):
"""Add more read replicas based on QPS"""
current_capacity = len(self.connection_pools['replicas']) * 1000 # QPS per replica
if target_qps > current_capacity * 0.8: # 80% utilization threshold
# Add new read replica
new_replica = create_read_replica()
self.connection_pools['replicas'].append(
create_pool(new_replica, pool_size=50)
)
def implement_sharding(self):
"""Implement horizontal sharding for orders table"""
shards = {
'shard_1': 'orders_americas',
'shard_2': 'orders_europe',
'shard_3': 'orders_asia'
}
def get_shard(user_id):
return shards[f'shard_{hash(user_id) % 3 + 1}']
3. Cache Scaling
# Redis Cluster for horizontal scaling
import rediscluster
redis_cluster = rediscluster.RedisCluster(
startup_nodes=[
{"host": "redis-1", "port": "7000"},
{"host": "redis-2", "port": "7000"},
{"host": "redis-3", "port": "7000"},
{"host": "redis-4", "port": "7000"},
{"host": "redis-5", "port": "7000"},
{"host": "redis-6", "port": "7000"},
],
decode_responses=True,
skip_full_coverage_check=True
)
# Cache partitioning strategy
def cache_key_partition(metric_type, entity_id):
"""Partition cache keys for better distribution"""
partition = hash(f"{metric_type}:{entity_id}") % 1000
return f"{metric_type}:{partition}:{entity_id}"
4. CDN and Edge Caching
// CloudFlare Workers for edge computing
addEventListener("fetch", (event) => {
event.respondWith(handleRequest(event.request));
});
async function handleRequest(request) {
const url = new URL(request.url);
// Cache API responses at edge
if (url.pathname.startsWith("/api/metrics/")) {
const cacheKey = new Request(url.toString(), request);
const cache = caches.default;
// Check edge cache first
let response = await cache.match(cacheKey);
if (!response) {
// Fetch from origin
response = await fetch(request);
// Cache for 30 seconds
const headers = new Headers(response.headers);
headers.set("Cache-Control", "public, max-age=30");
response = new Response(response.body, {
status: response.status,
statusText: response.statusText,
headers: headers,
});
event.waitUntil(cache.put(cacheKey, response.clone()));
}
return response;
}
return fetch(request);
}
5. Message Queue Scaling
# Kafka cluster scaling
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: metrics-kafka
spec:
kafka:
replicas: 9 # Scale from 3 to 9 brokers
config:
num.partitions: 50 # More partitions for parallelism
default.replication.factor: 3
min.insync.replicas: 2
resources:
requests:
memory: 8Gi
cpu: 2000m
limits:
memory: 16Gi
cpu: 4000m
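Before touching any individual tier it is worth sanity-checking how many units the 10x target actually implies. A back-of-envelope helper; the per-unit capacities below are illustrative assumptions, not measured numbers:

# Rough sizing: units needed to serve 10x traffic at a target utilisation headroom.
import math

def required_units(target_rps: float, rps_per_unit: float, headroom: float = 0.7) -> int:
    """Units needed so that each runs at roughly `headroom` of its capacity."""
    return math.ceil(target_rps / (rps_per_unit * headroom))

current_rps = 1_000
for tier, rps_per_unit in [("api pods", 500), ("read replicas", 1_000), ("kafka partitions", 2_000)]:
    print(f"{tier}: {required_units(current_rps * 10, rps_per_unit)}")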
20. Failover Strategy for Database Outage
Strong Answer: Multi-Tier Failover Strategy:
# Automatic failover implementation
import logging
import threading
import time
from enum import Enum

import boto3
import psycopg2

logger = logging.getLogger(__name__)
class DatabaseState(Enum):
HEALTHY = "healthy"
DEGRADED = "degraded"
FAILED = "failed"
class DatabaseFailoverManager:
def __init__(self):
self.primary_db = "primary-db.example.com"
self.replica_dbs = [
"replica-1.example.com",
"replica-2.example.com",
"replica-3.example.com"
]
self.current_state = DatabaseState.HEALTHY
self.current_primary = self.primary_db
self.health_check_interval = 5 # seconds
def health_check(self, db_host):
"""Check database health"""
try:
conn = psycopg2.connect(
host=db_host,
database="analytics",
user="readonly",
password="password",
connect_timeout=3
)
cursor = conn.cursor()
cursor.execute("SELECT 1")
result = cursor.fetchone()
conn.close()
return result[0] == 1
except Exception as e:
logger.error(f"Health check failed for {db_host}: {e}")
return False
def promote_replica_to_primary(self, replica_host):
"""Promote replica to primary (manual intervention required)"""
# This would typically involve:
# 1. Stopping replication on chosen replica
# 2. Updating DNS/load balancer to point to new primary
# 3. Updating application config
logger.critical(f"Promoting {replica_host} to primary")
# Update Route 53 record to point to new primary
route53 = boto3.client('route53')
route53.change_resource_record_sets(
HostedZoneId='Z123456789',
ChangeBatch={
'Changes': [{
'Action': 'UPSERT',
'ResourceRecordSet': {
'Name': 'primary-db.example.com',
'Type': 'CNAME',
'TTL': 60,
'ResourceRecords': [{'Value': replica_host}]
}
}]
}
)
self.current_primary = replica_host
def circuit_breaker_fallback(self):
"""Fallback to cached data when database is unavailable"""
logger.warning("Database unavailable, switching to read-only mode")
# Serve from cache only
app.config['READ_ONLY_MODE'] = True
# Show banner to users
return {
"status": "degraded",
"message": "Real-time data temporarily unavailable",
"data_freshness": "cached_5_minutes_ago"
}
def monitor_and_failover(self):
"""Background thread for monitoring and automatic failover"""
consecutive_failures = 0
while True:
if self.health_check(self.current_primary):
consecutive_failures = 0
self.current_state = DatabaseState.HEALTHY
app.config['READ_ONLY_MODE'] = False
else:
consecutive_failures += 1
logger.warning(f"Primary DB health check failed {consecutive_failures} times")
if consecutive_failures >= 3: # 15 seconds of failures
self.current_state = DatabaseState.FAILED
# Try to find healthy replica
healthy_replica = None
for replica in self.replica_dbs:
if self.health_check(replica):
healthy_replica = replica
break
if healthy_replica:
self.promote_replica_to_primary(healthy_replica)
else:
# All databases failed - enable circuit breaker
self.circuit_breaker_fallback()
time.sleep(self.health_check_interval)
# Start monitoring in background
failover_manager = DatabaseFailoverManager()
monitor_thread = threading.Thread(target=failover_manager.monitor_and_failover)
monitor_thread.daemon = True
monitor_thread.start()
Emergency Procedures:
#!/bin/bash
# emergency-failover.sh
echo "=== EMERGENCY DATABASE FAILOVER ==="
echo "1. Checking primary database status..."
if ! pg_isready -h primary-db.example.com -p 5432; then
echo "โ Primary database is DOWN"
echo "2. Finding healthy replica..."
for replica in replica-1 replica-2 replica-3; do
if pg_isready -h ${replica}.example.com -p 5432; then
echo "โ
Found healthy replica: $replica"
echo "3. Promoting replica to primary..."
# Stop replication
psql -h ${replica}.example.com -c "SELECT pg_promote();"
echo "4. Updating DNS record..."
aws route53 change-resource-record-sets \
--hosted-zone-id Z123456789 \
--change-batch file://failover-dns.json
echo "5. Updating application config..."
kubectl patch configmap app-config \
--patch "{\"data\":{\"DATABASE_HOST\":\"${replica}.example.com\"}}"
echo "6. Restarting application pods..."
kubectl rollout restart deployment/dashboard-api
echo "โ
Failover complete to $replica"
exit 0
fi
done
echo "โ No healthy replicas found - enabling read-only mode"
kubectl patch configmap app-config \
--patch '{"data":{"READ_ONLY_MODE":"true"}}'
else
echo "โ
Primary database is healthy"
fi
21. Circuit Breaker Implementation
Strong Answer: Circuit Breaker Pattern for External Services:
import threading
import time
from collections import deque
from enum import Enum

import requests  # used by the service-call examples below
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, reject requests
HALF_OPEN = "half_open" # Testing if service recovered
class CircuitBreaker:
def __init__(self, failure_threshold=5, timeout=60, expected_exception=Exception):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.expected_exception = expected_exception
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
self.lock = threading.Lock()
# Track recent requests for metrics
self.recent_requests = deque(maxlen=100)
def __call__(self, func):
def wrapper(*args, **kwargs):
with self.lock:
if self.state == CircuitState.OPEN:
# Check if timeout period has passed
if time.time() - self.last_failure_time >= self.timeout:
self.state = CircuitState.HALF_OPEN
self.failure_count = 0
else:
# Circuit is open - reject request
raise Exception("Circuit breaker is OPEN - service unavailable")
if self.state == CircuitState.HALF_OPEN:
# Test with single request
try:
result = func(*args, **kwargs)
self.state = CircuitState.CLOSED
self.failure_count = 0
self.record_request(success=True)
return result
except self.expected_exception as e:
self.state = CircuitState.OPEN
self.failure_count += 1
self.last_failure_time = time.time()
self.record_request(success=False)
raise e
# Normal operation (CLOSED state)
try:
result = func(*args, **kwargs)
self.failure_count = 0 # Reset on success
self.record_request(success=True)
return result
except self.expected_exception as e:
self.failure_count += 1
self.record_request(success=False)
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
self.last_failure_time = time.time()
raise e
return wrapper
def record_request(self, success):
self.recent_requests.append({
'timestamp': time.time(),
'success': success
})
def get_metrics(self):
now = time.time()
recent = [r for r in self.recent_requests if now - r['timestamp'] < 300] # Last 5 minutes
total_requests = len(recent)
successful_requests = sum(1 for r in recent if r['success'])
return {
'state': self.state.value,
'failure_count': self.failure_count,
'success_rate': successful_requests / total_requests if total_requests > 0 else 0,
'total_requests_5min': total_requests
}
# Usage in analytics dashboard
@CircuitBreaker(failure_threshold=3, timeout=30)
def get_inventory_data(product_ids):
"""Call to inventory service with circuit breaker"""
response = requests.get(
f"{INVENTORY_SERVICE_URL}/api/products",
params={'ids': ','.join(product_ids)},
timeout=5
)
response.raise_for_status()
return response.json()
@CircuitBreaker(failure_threshold=5, timeout=60)
def get_user_analytics():
"""Call to analytics service with circuit breaker"""
response = requests.get(f"{ANALYTICS_SERVICE_URL}/api/users/active", timeout=10)
response.raise_for_status()
return response.json()
# Fallback strategies
def get_dashboard_data():
try:
# Try to get real-time inventory data
inventory_data = get_inventory_data(['1', '2', '3'])
except Exception:
# Fallback to cached data
inventory_data = redis_client.get('inventory_cache')
if not inventory_data:
inventory_data = {"error": "Inventory data unavailable"}
try:
# Try to get user analytics
user_data = get_user_analytics()
except Exception:
# Fallback to approximate data
user_data = {"active_users": "~1000", "estimated": True}
return {
"inventory": inventory_data,
"users": user_data,
"timestamp": time.time()
}
Service-Level Circuit Breaker Integration:
# Integration with Flask app
@app.route('/api/dashboard')
def dashboard_endpoint():
# Check circuit breaker states
circuit_states = {
'inventory': inventory_circuit_breaker.get_metrics(),
'analytics': analytics_circuit_breaker.get_metrics(),
}
# Include circuit breaker status in response
response_data = get_dashboard_data()
response_data['service_health'] = circuit_states
# Set appropriate HTTP status based on circuit states
if any(state['state'] == 'open' for state in circuit_states.values()):
return jsonify(response_data), 206 # Partial content
else:
return jsonify(response_data), 200
# Monitoring endpoint for circuit breaker states
@app.route('/api/health/circuits')
def circuit_breaker_health():
return jsonify({
'inventory_service': inventory_circuit_breaker.get_metrics(),
'analytics_service': analytics_circuit_breaker.get_metrics(),
})
Part 4: Advanced SRE & Operations
22. API Response Time Investigation Process
Strong Answer: Systematic Investigation Approach:
Step 1: Immediate Assessment (First 2 minutes)
# Quick checks to understand scope
echo "=== INCIDENT RESPONSE CHECKLIST ==="
echo "1. Checking service status..."
curl -w "%{http_code} %{time_total}s" -s -o /dev/null https://api.example.com/health
echo "2. Checking recent deployments..."
kubectl get pods --sort-by=.metadata.creationTimestamp
echo "3. Checking error rates..."
# Query Prometheus for error rate spike
curl -s 'http://prometheus:9090/api/v1/query?query=rate(http_requests_total{status=~"5.."}[5m])'
Step 2: Resource Analysis (Minutes 2-5)
# Automated investigation script
import subprocess
import json
import requests
class IncidentInvestigator:
def __init__(self):
self.findings = []
def check_resource_usage(self):
"""Check CPU, Memory, Disk I/O"""
# CPU utilization
cpu_query = 'avg(rate(container_cpu_usage_seconds_total[5m])) by (pod)'
cpu_data = self.query_prometheus(cpu_query)
for result in cpu_data:
cpu_usage = float(result['value'][1])
if cpu_usage > 0.8: # 80% threshold
self.findings.append(f"High CPU: {result['metric']['pod']} at {cpu_usage:.2%}")
# Memory usage
memory_query = 'container_memory_usage_bytes / container_spec_memory_limit_bytes'
memory_data = self.query_prometheus(memory_query)
for result in memory_data:
memory_usage = float(result['value'][1])
if memory_usage > 0.85: # 85% threshold
self.findings.append(f"High Memory: {result['metric']['pod']} at {memory_usage:.2%}")
def check_database_performance(self):
"""Check database connection pool, slow queries"""
# Connection pool utilization
db_conn_query = 'pg_stat_activity_count / pg_settings_max_connections'
db_data = self.query_prometheus(db_conn_query)
if db_data and float(db_data[0]['value'][1]) > 0.8:
self.findings.append("Database connection pool near capacity")
# Check for slow queries
slow_queries = subprocess.run([
'psql', '-h', 'postgres-primary', '-c',
"SELECT query, query_start, state, waiting FROM pg_stat_activity WHERE state = 'active' AND query_start < NOW() - INTERVAL '30 seconds';"
], capture_output=True, text=True)
if slow_queries.stdout and len(slow_queries.stdout.split('\n')) > 3:
self.findings.append("Long-running queries detected")
def check_external_dependencies(self):
"""Check downstream services"""
services = ['inventory-service', 'payment-service', 'user-service']
for service in services:
try:
response = requests.get(f'http://{service}/health', timeout=5)
if response.status_code != 200 or response.elapsed.total_seconds() > 1.0:
self.findings.append(f"{service} health check failed or slow")
except requests.RequestException:
self.findings.append(f"{service} unreachable")
def check_recent_changes(self):
"""Check recent deployments and config changes"""
# Get recent pod restarts
pods = subprocess.run([
'kubectl', 'get', 'pods', '-o', 'json'
], capture_output=True, text=True)
pod_data = json.loads(pods.stdout)
for pod in pod_data['items']:
restart_count = sum(cs.get('restartCount', 0) for cs in pod['status'].get('containerStatuses', []))
if restart_count > 0:
self.findings.append(f"Pod {pod['metadata']['name']} has {restart_count} restarts")
def query_prometheus(self, query):
"""Helper to query Prometheus"""
try:
response = requests.get(
'http://prometheus:9090/api/v1/query',
params={'query': query},
timeout=10
)
return response.json()['data']['result']
except:
return []
def investigate(self):
"""Run full investigation"""
print("๐ Starting incident investigation...")
self.check_resource_usage()
self.check_database_performance()
self.check_external_dependencies()
self.check_recent_changes()
print("\n๐ Investigation Summary:")
if self.findings:
for finding in self.findings:
print(f"โ ๏ธ {finding}")
else:
print("โ
No obvious issues found in automated checks")
return self.findings
# Run investigation
investigator = IncidentInvestigator()
findings = investigator.investigate()
Step 3: Deep Dive Analysis (Minutes 5-15)
# Application-level investigation
echo "4. Checking application metrics..."
# Look for specific endpoint latency
kubectl exec -it $(kubectl get pods -l app=api -o jsonpath='{.items[0].metadata.name}') -- \
curl -s localhost:8080/metrics | grep http_request_duration
# Check garbage collection metrics (for Java/Go apps)
kubectl logs $(kubectl get pods -l app=api -o jsonpath='{.items[0].metadata.name}') | \
grep -i "gc\|garbage"
# Database query analysis
psql -h postgres-primary -c "
SELECT query, calls, total_time, mean_time, stddev_time
FROM pg_stat_statements
WHERE mean_time > 1000
ORDER BY mean_time DESC
LIMIT 10;"
# Check for lock contention
psql -h postgres-primary -c "
SELECT blocked_locks.pid AS blocked_pid,
blocked_activity.usename AS blocked_user,
blocking_locks.pid AS blocking_pid,
blocking_activity.usename AS blocking_user,
blocked_activity.query AS blocked_statement
FROM pg_catalog.pg_locks blocked_locks
JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid
JOIN pg_catalog.pg_locks blocking_locks ON (blocked_locks.locktype = blocking_locks.locktype)
JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid
WHERE NOT blocked_locks.granted;"
Step 4: Resolution and Communication
# Incident response actions
class IncidentResponse:
def __init__(self):
self.slack_webhook = "https://hooks.slack.com/services/..."
def notify_team(self, message, severity="warning"):
"""Send alert to incident response channel"""
payload = {
"text": f"๐จ API Performance Incident",
"attachments": [{
"color": "danger" if severity == "critical" else "warning",
"fields": [
{"title": "Issue", "value": message, "short": False},
{"title": "Runbook", "value": "https://wiki.company.com/incident-response", "short": True},
{"title": "Investigator", "value": "SRE On-Call", "short": True}
]
}]
}
requests.post(self.slack_webhook, json=payload)
def apply_immediate_fix(self, issue_type):
"""Apply quick fixes based on identified issue"""
if issue_type == "high_cpu":
# Scale up pods
subprocess.run(['kubectl', 'scale', 'deployment/api', '--replicas=10'])
elif issue_type == "database_load":
# Enable read-only mode temporarily
subprocess.run(['kubectl', 'patch', 'configmap/app-config',
'--patch', '{"data":{"READ_ONLY_MODE":"true"}}'])
elif issue_type == "memory_leak":
# Rolling restart
subprocess.run(['kubectl', 'rollout', 'restart', 'deployment/api'])
elif issue_type == "external_service":
# Enable circuit breaker
subprocess.run(['kubectl', 'patch', 'configmap/app-config',
'--patch', '{"data":{"CIRCUIT_BREAKER_ENABLED":"true"}}'])
# Usage
incident = IncidentResponse()
incident.notify_team("API latency increased from 100ms to 2s. Investigation in progress.")
# Based on findings, apply fixes
if "High CPU" in findings:
incident.apply_immediate_fix("high_cpu")
incident.notify_team("Applied fix: Scaled up API pods to handle load")
23. Error Budget Management Scenario
Strong Answer: Error Budget Analysis:
First, let me calculate the current situation:
- SLO: 99.5% uptime (0.5% error budget per month)
- Current Error Rate: a 20% relative increase over the pre-deployment baseline
- Deployment Impact: Caused the increase
- Error Budget Status: Within budget (need to verify actual numbers)
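To make the budget concrete before looking at code: 0.5% of a 30-day month is about 3.6 hours (216 minutes) of fully failed traffic.

# Quick arithmetic on what a 0.5% monthly error budget means in absolute terms.
slo = 0.995
hours_in_month = 30 * 24                   # 720 hours
budget_hours = (1 - slo) * hours_in_month  # 3.6 hours
print(f"{budget_hours:.1f} hours ({budget_hours * 60:.0f} minutes) of downtime budget per month")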
Decision Framework:
class ErrorBudgetManager:
def __init__(self):
self.slo_target = 0.995 # 99.5% success rate
self.error_budget = 1 - self.slo_target # 0.5% error budget
def calculate_current_burn_rate(self, error_rate, time_period_hours):
"""Calculate how fast we're burning error budget"""
monthly_hours = 30 * 24 # 720 hours
burn_rate = (error_rate * time_period_hours) / monthly_hours
return burn_rate
def assess_situation(self, baseline_error_rate, current_error_rate, hours_since_deploy):
"""Assess if we should keep feature or rollback"""
# Calculate actual error rates
error_increase = current_error_rate - baseline_error_rate
# Calculate error budget consumption
budget_burned = self.calculate_current_burn_rate(error_increase, hours_since_deploy)
remaining_budget = self.error_budget - budget_burned
# Projected monthly error rate if the elevated rate is sustained all month
monthly_projection = error_increase
assessment = {
"current_error_rate": current_error_rate,
"error_budget_burned": budget_burned,
"remaining_budget": remaining_budget,
"monthly_projection": monthly_projection,
"exceeds_budget": monthly_projection > self.error_budget
}
return assessment
def make_recommendation(self, assessment, business_impact):
"""Provide recommendation based on data"""
if assessment["exceeds_budget"]:
return {
"action": "ROLLBACK",
"reason": "Feature will exceed monthly error budget",
"timeline": "Immediate"
}
elif assessment["remaining_budget"] < 0.001: # Less than 0.1% budget left
return {
"action": "ENHANCED_MONITORING",
"reason": "Close to budget limit, monitor closely",
"conditions": "Rollback if error rate increases further"
}
else:
return {
"action": "KEEP_WITH_CONDITIONS",
"reason": "Within error budget but requires monitoring",
"conditions": [
"Implement feature flag for quick disable",
"Set up aggressive alerting",
"Daily budget review meetings"
]
}
# Real scenario analysis
budget_manager = ErrorBudgetManager()
# Example figures: baseline 0.1% error rate, now 1.2% after the deployment
assessment = budget_manager.assess_situation(
baseline_error_rate=0.001, # 0.1%
current_error_rate=0.012, # 1.2%
hours_since_deploy=6
)
recommendation = budget_manager.make_recommendation(assessment, business_impact="high")
Communication Strategy:
def communicate_decision(assessment, recommendation, stakeholders):
"""Structured communication to all stakeholders"""
message = f"""
ERROR BUDGET ASSESSMENT - {datetime.now().strftime('%Y-%m-%d %H:%M')}
SLO Target: 99.5% success rate (0.5% monthly error budget)
Current Situation:
• Error rate: {assessment['current_error_rate']:.3%}
• Budget burned: {assessment['error_budget_burned']:.3%}
• Remaining budget: {assessment['remaining_budget']:.3%}
• Monthly projection: {assessment['monthly_projection']:.3%}
Analysis:
{'❌ Will exceed budget' if assessment['exceeds_budget'] else '✅ Within budget'}
Recommendation: {recommendation['action']}
Reason: {recommendation['reason']}
Next Steps:
"""
if recommendation['action'] == 'ROLLBACK':
message += """
1. Immediate rollback of feature
2. Root cause analysis
3. Fix implementation before retry
4. Post-mortem scheduled
"""
elif recommendation['action'] == 'KEEP_WITH_CONDITIONS':
message += f"""
1. Implement monitoring conditions
2. Set up automated alerts
3. Daily review meetings
4. Feature flag for quick disable
Conditions: {recommendation['conditions']}
"""
# Send to different channels based on audience
send_to_slack('#sre-team', message)
send_to_email(stakeholders['engineering'], message)
send_executive_summary(stakeholders['leadership'])
def send_executive_summary(executives):
"""Simplified version for executives"""
summary = f"""
Feature Deployment Impact Assessment
Status: {'Action Required' if recommendation['action'] == 'ROLLBACK' else 'Monitoring'}
Decision: {recommendation['action']}
Business Impact: Minimal customer impact, within reliability targets
Engineering is {recommendation['action'].lower().replace('_', ' ')} and monitoring closely.
"""
send_to_email(executives, summary)
Implementation Plan:
# Feature flag for quick disable
apiVersion: v1
kind: ConfigMap
metadata:
name: feature-flags
data:
new-feature-enabled: "true"
error-budget-threshold: "0.004" # 0.4% - alert threshold
---
# Enhanced monitoring
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
name: error-budget-alerts
spec:
groups:
- name: error-budget
rules:
- alert: ErrorBudgetBurnRateHigh
expr: |
(
rate(http_requests_total{status=~"5.."}[1h]) /
rate(http_requests_total[1h])
) > 0.004
for: 5m
labels:
severity: critical
annotations:
summary: "Error budget burn rate too high"
- alert: ErrorBudgetProjectedExceed
expr: |
(
rate(http_requests_total{status=~"5.."}[6h]) /
rate(http_requests_total[6h])
) > 0.005 # if sustained, this rate exhausts the 0.5% monthly budget
for: 10m
labels:
severity: warning
annotations:
summary: "Projected to exceed monthly error budget"
24. Load Testing Strategy Design
Strong Answer: Comprehensive Load Testing Strategy:
# Load test implementation using Locust
from locust import HttpUser, task, between
import random
import json
class DashboardUser(HttpUser):
wait_time = between(1, 5) # 1-5 second wait between requests
def on_start(self):
"""Setup user session"""
# Authenticate user
response = self.client.post("/api/auth/login", json={
"username": f"loadtest_user_{random.randint(1, 1000)}",
"password": "test_password"
})
if response.status_code == 200:
self.auth_token = response.json().get("token")
self.client.headers.update({"Authorization": f"Bearer {self.auth_token}"})
@task(3) # 3x weight - most common operation
def view_realtime_dashboard(self):
"""Simulate viewing real-time dashboard"""
self.client.get("/api/metrics/realtime")
@task(2) # 2x weight
def view_historical_data(self):
"""Simulate historical data requests"""
periods = ["1h", "6h", "24h", "7d"]
metrics = ["revenue", "orders", "active_users"]
period = random.choice(periods)
metric = random.choice(metrics)
self.client.get(f"/api/metrics/historical", params={
"metric": metric,
"period": period,
"granularity": "1m" if period in ["1h", "6h"] else "1h"
})
@task(1) # 1x weight - less common
def configure_alerts(self):
"""Simulate alert configuration"""
self.client.post("/api/alerts", json={
"metric": "inventory_level",
"threshold": random.randint(50, 200),
"product_id": f"product_{random.randint(1, 100)}"
})
@task(4) # 4x weight - WebSocket simulation via polling
def websocket_simulation(self):
"""Simulate WebSocket updates via HTTP polling"""
self.client.get("/api/metrics/stream", params={
"last_update": "2025-06-30T10:30:00Z"
})
class BurstTrafficUser(DashboardUser):
"""Simulates traffic spikes during incidents"""
wait_time = between(0.1, 0.5) # Much faster requests
@task(5)
def rapid_dashboard_refresh(self):
"""Rapid refreshing during incidents"""
self.client.get("/api/metrics/realtime")
# Load test scenarios
class LoadTestScenarios:
def __init__(self):
self.scenarios = {
"normal_load": {
"users": 100,
"spawn_rate": 10,
"duration": "10m",
"user_class": DashboardUser
},
"peak_load": {
"users": 500,
"spawn_rate": 50,
"duration": "15m",
"user_class": DashboardUser
},
"stress_test": {
"users": 1000,
"spawn_rate": 100,
"duration": "20m",
"user_class": DashboardUser
},
"spike_test": {
"users": 200,
"spawn_rate": 200, # Instant spike
"duration": "5m",
"user_class": BurstTrafficUser
},
"endurance_test": {
"users": 300,
"spawn_rate": 30,
"duration": "2h", # Long running
"user_class": DashboardUser
}
}
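The scenario table above can be driven programmatically through the Locust CLI in headless mode. A sketch, assuming the user classes live in dashboard_load_test.py and that the flags match the installed Locust version:

# Drive each named scenario as a separate headless locust run and write reports.
import subprocess

SCENARIOS = {
    "normal_load": {"users": 100, "spawn_rate": 10, "duration": "10m", "user_class": "DashboardUser"},
    "spike_test": {"users": 200, "spawn_rate": 200, "duration": "5m", "user_class": "BurstTrafficUser"},
    "stress_test": {"users": 1000, "spawn_rate": 100, "duration": "20m", "user_class": "DashboardUser"},
}

def run_scenario(name: str, s: dict, host: str = "https://api-staging.example.com") -> None:
    subprocess.run([
        "locust", "-f", "dashboard_load_test.py", "--headless",
        "--users", str(s["users"]),
        "--spawn-rate", str(s["spawn_rate"]),
        "--run-time", s["duration"],
        "--host", host,
        "--csv", f"reports/{name}",
        "--html", f"reports/{name}.html",
        s["user_class"],  # positional arg restricts the run to this user class
    ], check=True)

for name, scenario in SCENARIOS.items():
    run_scenario(name, scenario)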
Infrastructure Load Testing:
#!/bin/bash
# load-test-runner.sh
echo "๐ Starting Analytics Dashboard Load Tests"
# 1. Baseline Performance Test
echo "๐ Running baseline performance test..."
locust -f dashboard_load_test.py \
--users 50 \
--spawn-rate 10 \
--run-time 5m \
--host https://api-staging.example.com \
--html reports/baseline_$(date +%Y%m%d_%H%M%S).html
# 2. Database Load Test
echo "๐๏ธ Testing database performance..."
# Simulate heavy analytics queries
for i in {1..100}; do
curl -s "https://api-staging.example.com/api/metrics/historical?period=30d&metric=revenue" &
done
wait
# 3. WebSocket Load Test
echo "๐ Testing WebSocket capacity..."
# Use websocket-king for WebSocket load testing
websocket-king \
--url wss://api-staging.example.com/metrics/stream \
--connections 1000 \
--duration 10m \
--rate 10 \
--output ws_load_report.json
# 4. Cache Performance Test
echo "๐พ Testing cache performance..."
# Warm up cache, then test cache hit rates
redis-cli --latency-history -h redis-staging.example.com
# 5. Auto-scaling Test
echo "๐ Testing auto-scaling behavior..."
# Gradually increase load to trigger auto-scaling
locust -f dashboard_load_test.py \
--users 1000 \
--spawn-rate 20 \
--run-time 30m \
--host https://api-staging.example.com \
--html reports/autoscale_test_$(date +%Y%m%d_%H%M%S).html
echo "โ
Load testing complete. Check reports/ directory for results."
Performance Metrics Collection:
# Custom metrics collection during load testing
import psutil
import time
import json
from datetime import datetime
class PerformanceMonitor:
def __init__(self):
self.metrics = []
self.monitoring = False
def start_monitoring(self):
"""Start collecting system metrics"""
self.monitoring = True
while self.monitoring:
metrics = {
"timestamp": datetime.now().isoformat(),
"cpu_percent": psutil.cpu_percent(interval=1),
"memory_percent": psutil.virtual_memory().percent,
"disk_io": psutil.disk_io_counters()._asdict(),
"network_io": psutil.net_io_counters()._asdict(),
"connections": len(psutil.net_connections()),
}
# Database metrics
metrics["db_connections"] = self.get_db_connections()
metrics["cache_hit_rate"] = self.get_cache_hit_rate()
self.metrics.append(metrics)
time.sleep(5) # Collect every 5 seconds
def get_db_connections(self):
"""Get database connection count"""
try:
import psycopg2
conn = psycopg2.connect("postgresql://user:pass@db:5432/analytics")
cursor = conn.cursor()
cursor.execute("SELECT count(*) FROM pg_stat_activity;")
return cursor.fetchone()[0]
except:
return None
def get_cache_hit_rate(self):
"""Get Redis cache hit rate"""
try:
import redis
r = redis.Redis(host='redis-cluster')
info = r.info('stats')
hits = info['keyspace_hits']
misses = info['keyspace_misses']
return hits / (hits + misses) if (hits + misses) > 0 else 0
except:
return None
def stop_monitoring(self):
"""Stop monitoring and save results"""
self.monitoring = False
# Save metrics to file
with open(f"performance_metrics_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", 'w') as f:
json.dump(self.metrics, f, indent=2)
# Generate summary report
self.generate_summary()
def generate_summary(self):
"""Generate performance summary"""
if not self.metrics:
return
cpu_values = [m['cpu_percent'] for m in self.metrics if m['cpu_percent']]
memory_values = [m['memory_percent'] for m in self.metrics if m['memory_percent']]
summary = {
"test_duration": len(self.metrics) * 5, # 5 second intervals
"avg_cpu": sum(cpu_values) / len(cpu_values),
"max_cpu": max(cpu_values),
"avg_memory": sum(memory_values) / len(memory_values),
"max_memory": max(memory_values),
"peak_connections": max([m.get('connections', 0) for m in self.metrics])
}
print("๐ Performance Test Summary:")
print(f" Duration: {summary['test_duration']} seconds")
print(f" Avg CPU: {summary['avg_cpu']:.1f}%")
print(f" Max CPU: {summary['max_cpu']:.1f}%")
print(f" Avg Memory: {summary['avg_memory']:.1f}%")
print(f" Max Memory: {summary['max_memory']:.1f}%")
print(f" Peak Connections: {summary['peak_connections']}")
# Usage with load test
monitor = PerformanceMonitor()
import threading
# Start monitoring in background
monitor_thread = threading.Thread(target=monitor.start_monitoring)
monitor_thread.start()
# Run load test
# ... run locust or other load testing tool ...
# Stop monitoring
monitor.stop_monitoring()
monitor_thread.join()
Test Criteria and Success Metrics:
# Load test success criteria
load_test_criteria:
response_time:
p50: < 200ms
p95: < 500ms
p99: < 1000ms
throughput:
normal_load: > 1000 RPS
peak_load: > 5000 RPS
error_rate:
max_acceptable: 0.1%
resource_utilization:
cpu: < 80%
memory: < 85%
database_connections: < 80% of pool
auto_scaling:
scale_up_time: < 2 minutes
scale_down_time: < 5 minutes
cache_performance:
hit_rate: > 90%
eviction_rate: < 5%
# Failure conditions (auto-stop test)
failure_conditions:
- error_rate > 1% for 2 minutes
- p99_latency > 5000ms for 5 minutes
- cpu_utilization > 95% for 3 minutes
- database_connection_pool > 95%
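These criteria can be enforced automatically after each run by parsing Locust's CSV output (for a run started with --csv=reports/run). A sketch; the exact column names may differ slightly between Locust versions:

# Fail the pipeline if the aggregated latency/error-rate criteria are not met.
import csv
import sys

LIMITS = {"p95_ms": 500, "p99_ms": 1000, "max_error_rate": 0.001}

def check(stats_csv: str = "reports/run_stats.csv") -> bool:
    with open(stats_csv) as f:
        rows = list(csv.DictReader(f))
    total = next(r for r in rows if r["Name"] == "Aggregated")
    p95 = float(total["95%"])
    p99 = float(total["99%"])
    requests_ = int(total["Request Count"])
    failures = int(total["Failure Count"])
    error_rate = failures / requests_ if requests_ else 0
    ok = p95 < LIMITS["p95_ms"] and p99 < LIMITS["p99_ms"] and error_rate < LIMITS["max_error_rate"]
    print(f"p95={p95}ms p99={p99}ms error_rate={error_rate:.4%} -> {'PASS' if ok else 'FAIL'}")
    return ok

if __name__ == "__main__":
    sys.exit(0 if check() else 1)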
25. Go Service CPU Investigation
Strong Answer: Systematic CPU Investigation Process:
// 1. Enable pprof in Go service for CPU profiling
package main
import (
"context"
"log"
"net/http"
_ "net/http/pprof" // Import pprof
"runtime"
"time"
)
func main() {
// Start pprof server
go func() {
log.Println("Starting pprof server on :6060")
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
// Set GOMAXPROCS to container CPU limit
runtime.GOMAXPROCS(2) // Adjust based on container resources
// Your application code
startApplication()
}
// 2. Add CPU monitoring middleware
func CPUMonitoringMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
// Record CPU usage before request
var rusageBefore syscall.Rusage
syscall.Getrusage(syscall.RUSAGE_SELF, &rusageBefore)
next.ServeHTTP(w, r)
// Record CPU usage after request
var rusageAfter syscall.Rusage
syscall.Getrusage(syscall.RUSAGE_SELF, &rusageAfter)
duration := time.Since(start)
cpuTime := time.Duration(rusageAfter.Utime.Nano() - rusageBefore.Utime.Nano())
// Log high CPU requests
if cpuTime > 100*time.Millisecond {
log.Printf("High CPU request: %s %s - Duration: %v, CPU: %v",
r.Method, r.URL.Path, duration, cpuTime)
}
})
}
Investigation Tools and Commands:
#!/bin/bash
# cpu-investigation.sh
echo "๐ Investigating Go service CPU usage..."
# 1. Get current CPU profile (30 seconds)
echo "๐ Collecting CPU profile..."
go tool pprof -http=:8080 http://localhost:6060/debug/pprof/profile?seconds=30
# 2. Check for goroutine leaks
echo "๐งต Checking goroutine count..."
curl -s http://localhost:6060/debug/pprof/goroutine?debug=1 | head -20
# 3. Memory allocation profile (may cause CPU spikes)
echo "๐พ Checking memory allocations..."
go tool pprof http://localhost:6060/debug/pprof/allocs
# 4. Check GC performance
echo "๐๏ธ Checking garbage collection stats..."
curl -s http://localhost:6060/debug/vars | jq '.memstats'
# 5. Container-level CPU investigation
echo "๐ณ Container CPU stats..."
docker stats --no-stream $(docker ps --filter "name=go-service" --format "{{.Names}}")
# 6. Process-level analysis
echo "โ๏ธ Process CPU breakdown..."
top -H -p $(pgrep go-service) -n 1
# 7. strace for system call analysis
echo "๐ง System call analysis (10 seconds)..."
timeout 10s strace -c -p $(pgrep go-service)
Code-Level Optimizations:
// Common CPU bottleneck fixes
// 1. Fix: Inefficient JSON parsing
// BEFORE - Slow JSON handling
func processRequestSlow(w http.ResponseWriter, r *http.Request) {
var data map[string]interface{}
body, _ := ioutil.ReadAll(r.Body)
json.Unmarshal(body, &data)
// Process data...
}
// AFTER - Optimized JSON handling
type RequestData struct {
UserID string `json:"user_id"`
Action string `json:"action"`
// Define specific fields instead of interface{}
}
func processRequestFast(w http.ResponseWriter, r *http.Request) {
var data RequestData
decoder := json.NewDecoder(r.Body)
decoder.DisallowUnknownFields() // Faster parsing
if err := decoder.Decode(&data); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// Process typed data...
}
// 2. Fix: CPU-intensive loops
// BEFORE - O(nยฒ) algorithm
func findDuplicatesSlow(items []string) []string {
var duplicates []string
for i := 0; i < len(items); i++ {
for j := i + 1; j < len(items); j++ {
if items[i] == items[j] {
duplicates = append(duplicates, items[i])
break
}
}