Monitoring and Alerting:
# Prometheus rules for Go service CPU monitoring
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
name: go-service-cpu-alerts
spec:
groups:
- name: go-service-performance
rules:
- alert: GoServiceHighCPU
expr: |
(
rate(container_cpu_usage_seconds_total{pod=~"go-service-.*"}[5m])
) > 0.8
for: 5m
labels:
severity: warning
service: go-service
annotations:
summary: "Go service CPU usage above 80%"
description: "Pod {{ $labels.pod }} CPU usage is {{ $value }}%"
- alert: GoServiceGoroutineLeak
expr: |
go_goroutines{job="go-service"} > 10000
for: 10m
labels:
severity: critical
annotations:
summary: "Potential goroutine leak detected"
- alert: GoServiceGCPressure
expr: |
rate(go_gc_duration_seconds_sum[5m]) > 0.1
for: 5m
labels:
severity: warning
annotations:
summary: "High GC pressure in Go service"
description: "GC taking {{ $value }}s per collection cycle"
- alert: GoServiceMemoryLeak
expr: |
go_memstats_heap_inuse_bytes / go_memstats_heap_sys_bytes > 0.9
for: 15m
labels:
severity: critical
annotations:
summary: "Potential memory leak in Go service"
Performance Testing and Validation:
// Benchmark tests to validate optimizations
func BenchmarkProcessRequestSlow(b *testing.B) {
data := generateTestData(1000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
processRequestSlow(data)
}
}
func BenchmarkProcessRequestFast(b *testing.B) {
data := generateTestData(1000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
processRequestFast(data)
}
}
// Run benchmarks with memory profiling
// go test -bench=. -benchmem -cpuprofile=cpu.prof -memprofile=mem.prof
// CPU profiling analysis script
func analyzeCPUProfile() {
// go tool pprof cpu.prof
// Commands in pprof:
// (pprof) top20 - Show top 20 CPU consumers
// (pprof) list function - Show source code with CPU usage
// (pprof) web - Generate web visualization
// go tool pprof -http=:8080 cpu.prof - Interactive web UI with a flame graph view
}
26. Microservices vs Monolithic Architecture
Strong Answer: Car Pool vs Train Analogy Explanation:
Microservices (Car Pool Approach):
🚗 🚗 🚗 🚗 🚗 (Independent cars)
Each car (service) can:
- Take different routes
- Stop independently
- Break down without affecting others
- Scale by adding more cars
- Use different fuel types (technologies)
Monolithic (Train Approach):
🚃-🚃-🚃-🚃-🚃 (Connected train)
The train (application):
- All cars must follow the same route
- If engine fails, entire train stops
- All cars must move together
- Scale by making train longer or faster
- Single fuel type for entire train
For Analytics Dashboard - Decision Framework:
# Decision matrix for architecture choice
class ArchitectureDecision:
def __init__(self):
self.factors = {
'team_size': 0,
'complexity': 0,
'scalability_needs': 0,
'technology_diversity': 0,
'deployment_frequency': 0,
'operational_maturity': 0
}
def assess_microservices_fit(self, dashboard_requirements):
"""Assess if microservices are appropriate"""
# Analytics dashboard components
services = {
'metrics_collector': {
'responsibility': 'Collect metrics from various sources',
'scalability': 'High - handles high volume ingestion',
'technology': 'Go - for performance'
},
'data_processor': {
'responsibility': 'Process and aggregate metrics',
'scalability': 'Medium - CPU intensive operations',
'technology': 'Python - for data processing libraries'
},
'api_gateway': {
'responsibility': 'Serve dashboard APIs',
'scalability': 'High - many concurrent users',
'technology': 'Node.js - for async I/O'
},
'notification_service': {
'responsibility': 'Send alerts and notifications',
'scalability': 'Low - occasional alerts',
'technology': 'Python - for integrations'
},
'frontend_bff': {
'responsibility': 'Backend for Frontend',
'scalability': 'Medium - aggregates data for UI',
'technology': 'React/TypeScript'
}
}
return self.evaluate_services(services)
def evaluate_services(self, services):
"""Evaluate microservices approach"""
microservices_benefits = [
"Independent scaling per service",
"Technology diversity (Go, Python, Node.js)",
"Team autonomy - different teams own different services",
"Fault isolation - metrics collection failure doesn't break UI",
"Independent deployments - can update notification without affecting API"
]
microservices_challenges = [
"Network latency between services",
"Data consistency across services",
"Distributed system complexity",
"Service discovery and load balancing",
"Monitoring and debugging across services"
]
return {
'benefits': microservices_benefits,
'challenges': microservices_challenges,
'recommendation': self.make_recommendation()
}
def make_recommendation(self):
"""Make architecture recommendation for analytics dashboard"""
# For analytics dashboard specifically:
if self.is_early_stage():
return {
'choice': 'MODULAR_MONOLITH',
'reason': 'Start simple, can extract services later',
'structure': self.modular_monolith_structure()
}
else:
return {
'choice': 'MICROSERVICES',
'reason': 'Scale and team benefits outweigh complexity',
'structure': self.microservices_structure()
}
def modular_monolith_structure(self):
"""Modular monolith approach - best of both worlds"""
return {
'structure': """
analytics-dashboard/
├── cmd/                 # Application entry points
├── internal/
│   ├── metrics/         # Metrics collection module
│   ├── processing/      # Data processing module
│   ├── api/             # API handling module
│   ├── notifications/   # Alert module
│   └── dashboard/       # UI serving module
├── pkg/                 # Shared libraries
└── deployments/         # Single deployment unit
""",
'benefits': [
'Single deployment and testing',
'Easier debugging and development',
'No network latency between modules',
'Simpler operational overhead',
'Can extract to microservices later'
]
}
def microservices_structure(self):
"""Full microservices approach"""
return {
'structure': """
Analytics Platform Microservices:
┌─────────────────┐        ┌─────────────────┐
│  Frontend SPA   │        │   API Gateway   │
│    (React)      │───────►│     (Kong)      │
└─────────────────┘        └────────┬────────┘
                                    │
          ┌─────────────────────────┼─────────────────────────┐
          │                         │                         │
┌─────────▼─────────┐     ┌─────────▼─────────┐     ┌─────────▼─────────┐
│ Metrics Collector │     │  Data Processor   │     │ Notification Svc  │
│       (Go)        │     │     (Python)      │     │     (Python)      │
└─────────┬─────────┘     └─────────┬─────────┘     └───────────────────┘
          │                         │
          └───────────┬─────────────┘
           ┌──────────▼──────────┐
           │    Message Queue    │
           │    (Kafka/Redis)    │
           └─────────────────────┘
""",
'communication': 'Async messaging + HTTP APIs',
'data_strategy': 'Event sourcing with CQRS'
}
def is_early_stage(self):
"""Determine if project is in early stage"""
return (
self.factors['team_size'] < 10 and
self.factors['operational_maturity'] < 3
)
# Usage for analytics dashboard
decision = ArchitectureDecision()
recommendation = decision.assess_microservices_fit({
'expected_users': 500,
'data_volume': 'high',
'real_time_requirements': True,
'team_size': 8,
'deployment_frequency': 'daily'
})
Implementation Examples:
Microservices Implementation:
// Metrics Collector Service (Go)
package main
type MetricsCollector struct {
kafka *kafka.Producer
redis *redis.Client
handlers map[string]MetricHandler
}
func (mc *MetricsCollector) CollectMetric(metric Metric) error {
// Process metric
processed := mc.handlers[metric.Type].Process(metric)
// Publish to message queue for other services
return mc.kafka.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &metric.Type,
Partition: kafka.PartitionAny,
},
Value: processed.ToJSON(),
}, nil)
}
# Data Processor Service (Python)
import asyncio
from kafka import KafkaConsumer
class DataProcessor:
def __init__(self):
self.consumer = KafkaConsumer(
'metrics-topic',
bootstrap_servers=['kafka:9092'],
group_id='data-processors'
)
async def process_metrics(self):
for message in self.consumer:
metric = Metric.from_json(message.value)
# Process and aggregate
aggregated = await self.aggregate_metric(metric)
# Store in time-series database
await self.store_metric(aggregated)
# Trigger alerts if needed
await self.check_alerts(aggregated)
Monolithic Implementation:
# Modular Monolith (Python)
from dashboard.modules import metrics, processing, api, notifications
class AnalyticsDashboard:
def __init__(self):
self.metrics = metrics.MetricsModule()
self.processor = processing.ProcessingModule()
self.api = api.APIModule()
self.notifications = notifications.NotificationModule()
def handle_metric(self, raw_metric):
# All in same process - no network calls
metric = self.metrics.parse(raw_metric)
processed = self.processor.aggregate(metric)
# Check for alerts
if self.processor.check_thresholds(processed):
self.notifications.send_alert(processed)
return processed
# Single deployment with clear module boundaries
# Can be extracted to separate services later
When to Choose Each Approach:
| Factor | Monolith | Microservices |
|---|---|---|
| Team Size | < 10 developers | > 10 developers |
| Complexity | Simple-medium | Complex domain |
| Scale | < 1M requests/day | > 10M requests/day |
| Technology | Single stack preferred | Multiple technologies needed |
| Deployment | Weekly/monthly | Multiple times per day |
| Data Consistency | Strong consistency needed | Eventual consistency OK |
For Analytics Dashboard Specifically:
- Early Stage: Start with modular monolith
- Growth Stage: Extract high-scale components (metrics collector) first
- Mature Stage: Full microservices with proper DevOps practices
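The "extract later" path is much easier when each module already sits behind an interface that callers depend on. A minimal Python sketch of that idea (class names such as `MetricsCollector` and `HttpMetricsCollector`, and the URL, are illustrative assumptions, not part of the original design):

```python
# Sketch: program against an interface so a module can later become a service.
from abc import ABC, abstractmethod
import requests

class MetricsCollector(ABC):
    @abstractmethod
    def collect(self, metric: dict) -> None: ...

class InProcessMetricsCollector(MetricsCollector):
    """Used while the dashboard is still a modular monolith."""
    def __init__(self, store):
        self.store = store
    def collect(self, metric: dict) -> None:
        self.store.append(metric)  # direct call, no network hop

class HttpMetricsCollector(MetricsCollector):
    """Drop-in replacement once the metrics module is extracted to its own service."""
    def __init__(self, base_url: str):
        self.base_url = base_url
    def collect(self, metric: dict) -> None:
        requests.post(f"{self.base_url}/metrics", json=metric, timeout=2)

def ingest(collector: MetricsCollector, raw: dict) -> None:
    # Callers only know the interface, so extraction becomes a configuration change.
    collector.collect(raw)
```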
27. Database Sharding Implementation
Strong Answer: Library Analogy Explanation:
📚 Traditional Database (Single Library):
All books in one building - gets crowded, hard to find books,
long queues at checkout
📚📚📚 Sharded Database (Multiple Library Branches):
Books distributed across locations:
- Fiction Library: Books A-H
- Science Library: Books I-P
- History Library: Books Q-Z
Each library (shard) operates independently but part of same system
Sharding Strategy for Analytics Dashboard:
# Database sharding implementation
import hashlib
import datetime
from typing import Dict, List
class DatabaseSharding:
def __init__(self):
self.shards = {
'shard_americas': {
'host': 'db-americas.example.com',
'regions': ['us', 'ca', 'mx', 'br'],
'connection': self.create_connection('db-americas')
},
'shard_europe': {
'host': 'db-europe.example.com',
'regions': ['uk', 'de', 'fr', 'es'],
'connection': self.create_connection('db-europe')
},
'shard_asia': {
'host': 'db-asia.example.com',
'regions': ['jp', 'sg', 'au', 'in'],
'connection': self.create_connection('db-asia')
}
}
# Time-based sharding for metrics
self.time_shards = {
'metrics_current': 'Last 7 days - hot data',
'metrics_recent': 'Last 30 days - warm data',
'metrics_archive': 'Older than 30 days - cold data'
}
def get_user_shard(self, user_id: str) -> str:
"""Determine shard based on user ID hash"""
hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
shard_index = hash_value % len(self.shards)
return list(self.shards.keys())[shard_index]
def get_region_shard(self, region: str) -> str:
"""Determine shard based on geographical region"""
for shard_name, shard_info in self.shards.items():
if region.lower() in shard_info['regions']:
return shard_name
return 'shard_americas' # Default fallback
def get_time_shard(self, timestamp: datetime.datetime) -> str:
"""Determine shard based on data age"""
now = datetime.datetime.now()
age = now - timestamp
if age.days <= 7:
return 'metrics_current'
elif age.days <= 30:
return 'metrics_recent'
else:
return 'metrics_archive'
def route_query(self, query_type: str, **kwargs):
"""Route queries to appropriate shard"""
if query_type == 'user_orders':
shard = self.get_user_shard(kwargs['user_id'])
return self.execute_query(shard, query_type, **kwargs)
elif query_type == 'regional_metrics':
shard = self.get_region_shard(kwargs['region'])
return self.execute_query(shard, query_type, **kwargs)
elif query_type == 'historical_data':
# Query multiple time shards and aggregate
return self.query_time_shards(kwargs['start_date'], kwargs['end_date'])
elif query_type == 'cross_shard_analytics':
# Fan-out query to all shards
return self.fan_out_query(query_type, **kwargs)
def query_time_shards(self, start_date, end_date):
"""Query across time-based shards"""
results = []
for shard_name in self.time_shards.keys():
try:
shard_result = self.execute_query(shard_name, 'time_range_query',
start_date=start_date, end_date=end_date)
results.extend(shard_result)
except ShardUnavailableError:
# Handle shard failures gracefully
self.log_shard_failure(shard_name)
continue
return self.aggregate_results(results)
def fan_out_query(self, query_type: str, **kwargs):
"""Execute query across all shards and aggregate results"""
import concurrent.futures
results = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=len(self.shards)) as executor:
# Submit queries to all shards concurrently
future_to_shard = {
executor.submit(self.execute_query, shard_name, query_type, **kwargs): shard_name
for shard_name in self.shards.keys()
}
for future in concurrent.futures.as_completed(future_to_shard):
shard_name = future_to_shard[future]
try:
result = future.result(timeout=30) # 30 second timeout
results[shard_name] = result
except Exception as e:
self.log_query_failure(shard_name, e)
results[shard_name] = None
return self.aggregate_cross_shard_results(results)
# Shard-aware query examples
class ShardedAnalyticsQueries:
def __init__(self, sharding: DatabaseSharding):
self.sharding = sharding
def get_user_orders(self, user_id: str):
"""Get orders for specific user"""
return self.sharding.route_query('user_orders', user_id=user_id)
def get_regional_sales(self, region: str, date_range: tuple):
"""Get sales data for specific region"""
return self.sharding.route_query('regional_metrics',
region=region,
start_date=date_range[0],
end_date=date_range[1])
def get_global_metrics(self, metric_type: str):
"""Get global metrics across all shards"""
return self.sharding.route_query('cross_shard_analytics',
metric_type=metric_type)
def get_historical_trends(self, days_back: int):
"""Get historical data across time shards"""
end_date = datetime.datetime.now()
start_date = end_date - datetime.timedelta(days=days_back)
return self.sharding.query_time_shards(start_date, end_date)
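The fan-out path above hands its per-shard results to `aggregate_cross_shard_results`, which is left undefined in the snippet. A minimal sketch of that merge step, assuming each shard returns a dict of counters (the `order_count`/`revenue` field names are assumptions):

```python
# Sketch: merge partial results returned by each shard.
# Assumes each shard returns {"order_count": int, "revenue": float} or None on failure.
def aggregate_cross_shard_results(results: dict) -> dict:
    healthy = {name: r for name, r in results.items() if r is not None}
    return {
        "order_count": sum(r["order_count"] for r in healthy.values()),
        "revenue": sum(r["revenue"] for r in healthy.values()),
        "shards_reporting": len(healthy),
        "shards_failed": [name for name, r in results.items() if r is None],
    }

# Example
partials = {
    "shard_americas": {"order_count": 120, "revenue": 9800.0},
    "shard_europe": {"order_count": 75, "revenue": 6400.0},
    "shard_asia": None,  # shard timed out
}
print(aggregate_cross_shard_results(partials))
```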
Sharding Implementation with PostgreSQL:
-- Create shard-specific tables
-- Shard 1: Americas
CREATE TABLE orders_americas (
id UUID PRIMARY KEY,
user_id UUID NOT NULL,
region VARCHAR(2) CHECK (region IN ('US', 'CA', 'MX', 'BR')),
order_total DECIMAL(10,2),
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Shard 2: Europe
CREATE TABLE orders_europe (
id UUID PRIMARY KEY,
user_id UUID NOT NULL,
region VARCHAR(2) CHECK (region IN ('UK', 'DE', 'FR', 'ES')),
order_total DECIMAL(10,2),
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Shard 3: Asia
CREATE TABLE orders_asia (
id UUID PRIMARY KEY,
user_id UUID NOT NULL,
region VARCHAR(2) CHECK (region IN ('JP', 'SG', 'AU', 'IN')),
order_total DECIMAL(10,2),
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Create foreign data wrapper for cross-shard queries
CREATE EXTENSION postgres_fdw;
CREATE SERVER shard_europe
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'db-europe.example.com', port '5432', dbname 'analytics');
CREATE USER MAPPING FOR postgres
SERVER shard_europe
OPTIONS (user 'analytics_user', password 'password');
-- Create foreign tables
CREATE FOREIGN TABLE orders_europe_remote (
id UUID,
user_id UUID,
region VARCHAR(2),
order_total DECIMAL(10,2),
created_at TIMESTAMP WITH TIME ZONE
)
SERVER shard_europe
OPTIONS (schema_name 'public', table_name 'orders_europe');
-- Repeat the server, user mapping, and foreign table setup for the Asia shard (orders_asia_remote)
-- View for cross-shard queries
CREATE VIEW orders_global AS
SELECT 'americas' as shard, * FROM orders_americas
UNION ALL
SELECT 'europe' as shard, * FROM orders_europe_remote
UNION ALL
SELECT 'asia' as shard, * FROM orders_asia_remote;
Sharding Middleware in Application:
# Application-level sharding middleware
class ShardingMiddleware:
def __init__(self, app):
self.app = app
self.sharding = DatabaseSharding()
def __call__(self, environ, start_response):
# Extract sharding context from request
request = Request(environ)
# Determine shard based on request
if 'user_id' in request.args:
shard = self.sharding.get_user_shard(request.args['user_id'])
environ['DATABASE_SHARD'] = shard
elif 'region' in request.args:
shard = self.sharding.get_region_shard(request.args['region'])
environ['DATABASE_SHARD'] = shard
else:
# Cross-shard query required
environ['DATABASE_SHARD'] = 'cross_shard'
return self.app(environ, start_response)
# Flask route with shard awareness
@app.route('/api/user/<user_id>/orders')
def get_user_orders(user_id):
shard = request.environ.get('DATABASE_SHARD')
if shard == 'cross_shard':
# This shouldn't happen for user-specific queries
abort(400, "Invalid request - user_id required")
# Use shard-specific connection
db = get_shard_connection(shard)
orders = db.execute(
"SELECT * FROM orders WHERE user_id = %s ORDER BY created_at DESC",
(user_id,)
)
return jsonify(orders)
@app.route('/api/analytics/global')
def get_global_analytics():
# Cross-shard query - aggregate from all shards
sharding = DatabaseSharding()
results = sharding.fan_out_query('global_analytics')
return jsonify({
'total_orders': sum(r['order_count'] for r in results.values()),
'total_revenue': sum(r['revenue'] for r in results.values()),
'by_region': results
})
Handling Shard Failures and Rebalancing:
class ShardManager:
def __init__(self):
self.shards = DatabaseSharding().shards
self.health_check_interval = 30 # seconds
def monitor_shard_health(self):
"""Continuously monitor shard health"""
while True:
for shard_name, shard_info in self.shards.items():
try:
# Simple health check query
conn = shard_info['connection']
conn.execute("SELECT 1")
self.mark_shard_healthy(shard_name)
except Exception as e:
self.mark_shard_unhealthy(shard_name, e)
self.trigger_failover(shard_name)
time.sleep(self.health_check_interval)
def trigger_failover(self, failed_shard: str):
"""Handle shard failover"""
# Redirect traffic to healthy shards
self.redistribute_load(failed_shard)
# Alert operations team
self.send_alert(f"Shard {failed_shard} is unhealthy")
# Attempt automated recovery
self.attempt_shard_recovery(failed_shard)
def rebalance_shards(self, new_shard_config: Dict):
"""Rebalance data across shards"""
# This is a complex operation requiring:
# 1. Data migration planning
# 2. Consistent hashing updates
# 3. Gradual traffic shifting
# 4. Rollback capability
migration_plan = self.create_migration_plan(new_shard_config)
for step in migration_plan:
self.execute_migration_step(step)
self.verify_migration_step(step)
self.update_shard_routing(new_shard_config)
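The rebalancing comments above mention consistent hashing. Unlike the modulo-based `get_user_shard` earlier, a consistent-hash ring only remaps roughly 1/N of the keys when a shard is added or removed. A minimal sketch of such a ring (not part of the original code; virtual-node count is arbitrary):

```python
# Sketch: consistent hashing ring so adding/removing a shard remaps few keys.
import bisect
import hashlib

class ConsistentHashRing:
    def __init__(self, shards, vnodes=100):
        self.ring = []  # sorted list of (hash, shard) pairs
        for shard in shards:
            for i in range(vnodes):
                h = self._hash(f"{shard}#{i}")
                bisect.insort(self.ring, (h, shard))

    @staticmethod
    def _hash(key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def get_shard(self, key: str) -> str:
        h = self._hash(key)
        # First virtual node clockwise from the key's hash
        idx = bisect.bisect(self.ring, (h, "")) % len(self.ring)
        return self.ring[idx][1]

ring = ConsistentHashRing(["shard_americas", "shard_europe", "shard_asia"])
print(ring.get_shard("user-42"))  # assignment stays mostly stable if a fourth shard is added
```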
Benefits and Challenges:
Benefits:
- Horizontal Scalability: Add more shards as data grows
- Performance: Smaller datasets per shard = faster queries
- Isolation: Shard failures don't affect other shards
- Geographic Distribution: Data close to users
Challenges:
- Cross-shard Queries: Complex and slower
- Rebalancing: Moving data between shards is difficult
- Consistency: Transactions across shards are complex
- Operational Complexity: Multiple databases to manage
For Analytics Dashboard:
- User-based sharding: For user-specific dashboards
- Time-based sharding: For historical data (hot/warm/cold)
- Feature-based sharding: Separate shards for different metrics types
Part 5: Behavioral & Collaboration
30. Convincing Developers - Code Reliability Changes
Strong Answer: Situation: Our payment processing service was experiencing intermittent failures during peak traffic, causing revenue loss. The development team had implemented a quick fix that worked locally but didn't address the underlying concurrency issues.
Approach - Data-Driven Persuasion:
1. Quantified the Business Impact
# I created a dashboard showing the real cost
class ReliabilityImpactAnalysis:
def calculate_revenue_impact(self):
return {
"failed_transactions_per_hour": 150,
"average_transaction_value": 85.50,
"revenue_loss_per_hour": 150 * 85.50, # $12,825
"monthly_projected_loss": 12825 * 24 * 30, # $9.23M
"customer_churn_risk": "23 angry customer emails in 2 days"
}
2. Made It Personal and Collaborative Instead of saying "your code is wrong," I said:
- "I found some interesting patterns in our production data that might help us improve performance"
- "What do you think about these metrics? I'm curious about your thoughts on the concurrency patterns"
- "Could we pair program on this? I'd love to understand your approach better"
3. Proposed Solutions, Not Just Problems I came with a working prototype:
# Before (their approach)
def process_payment(payment_data):
global payment_queue
payment_queue.append(payment_data) # Race condition!
return process_queue()
# After (my suggested approach)
import threading
from queue import Queue
class ThreadSafePaymentProcessor:
def __init__(self):
self.payment_queue = Queue()
self.lock = threading.Lock()
def process_payment(self, payment_data):
with self.lock:
# Thread-safe processing
return self.safe_process(payment_data)
4. Used Their Language and Priorities
- Framed it as a "performance optimization" rather than "fixing bugs"
- Showed how it would reduce their on-call burden: "No more 3 AM pages about payment failures"
- Highlighted career benefits: "This would be a great story for your next performance review"
Result: They not only adopted the changes but became advocates for reliability practices. The lead developer started attending SRE meetings and later implemented circuit breakers proactively.
Key Lessons:
- Data beats opinions - metrics are harder to argue with
- Collaboration over confrontation - "How can we solve this together?"
- Show, don't just tell - working code examples are persuasive
- Align with their incentives - make reliability their win, not your win
31. Trade-off Between Reliability and Feature Delivery
Strong Answer: Situation: During a major product launch, we were at 97% availability (below our 99.5% SLO), but the product team wanted to deploy a new feature that would drive user adoption for the launch.
The Dilemma:
- Product pressure: "This feature will increase user engagement by 40%"
- Reliability concern: Error budget was nearly exhausted
- Timeline: Launch was in 3 days, couldn't delay
My Decision Process:
1. Quantified Both Sides
# Business impact calculation
launch_impact = {
"projected_new_users": 50000,
"revenue_per_user": 25,
"total_revenue_opportunity": 1.25e6, # $1.25M
"competitive_advantage": "First-mover in market segment"
}
reliability_risk = {
"current_error_budget_used": 0.85, # 85% of monthly budget
"remaining_budget": 0.15,
"days_remaining_in_month": 8,
"projected_overage": 0.3, # 30% over budget
"customer_impact": "Potential service degradation"
}
2. Created a Risk-Mitigation Plan Instead of a binary yes/no, I proposed a conditional approach:
# Feature deployment plan with guardrails
deployment_strategy:
phase_1:
rollout: 5% of users
duration: 4 hours
success_criteria:
- error_rate < 0.1%
- p99_latency < 200ms
- no_critical_alerts
phase_2:
rollout: 25% of users
duration: 12 hours
automatic_rollback: true
conditions:
- error_rate > 0.2% for 5 minutes
- p99_latency > 500ms for 10 minutes
phase_3:
rollout: 100% of users
requires: manual_approval_after_phase_2
3. Communicated Trade-offs Transparently I presented to stakeholders:
"We can launch this feature, but here's what it means:
- Upside: $1.25M revenue opportunity, competitive advantage
- Downside: 30% chance of service degradation affecting existing users
- Mitigation: Feature flags for instant rollback, enhanced monitoring
- Commitment: If reliability suffers, we pause new features until we're back on track"
4. The Decision and Implementation We proceeded with the phased rollout:
class FeatureLaunchManager:
def __init__(self):
self.error_budget_monitor = ErrorBudgetMonitor()
self.feature_flag = FeatureFlag("new_user_onboarding")
def monitor_launch_health(self):
while self.feature_flag.enabled:
current_error_rate = self.get_error_rate()
budget_status = self.error_budget_monitor.get_status()
if budget_status.will_exceed_monthly_budget():
self.trigger_rollback("Error budget exceeded")
break
if current_error_rate > 0.002: # 0.2%
self.reduce_rollout_percentage()
time.sleep(60) # Check every minute during launch
def trigger_rollback(self, reason):
self.feature_flag.disable()
self.alert_stakeholders(f"Feature rolled back: {reason}")
self.schedule_post_mortem()
The Outcome:
- Feature launched successfully to 25% of users
- Error rate increased slightly but stayed within acceptable bounds
- Revenue target was hit with partial rollout
- We didn't exceed error budget
- Built trust with product team by delivering on promises
Key Principles I Used:
- Transparency: Show the math, don't hide trade-offs
- Risk mitigation: Find ways to reduce downside while preserving upside
- Stakeholder alignment: Make everyone accountable for the decision
- Data-driven decisions: Use metrics, not emotions
- Learning mindset: Treat it as an experiment with clear success/failure criteria
Follow-up Actions:
- Conducted a post-launch review
- Used learnings to improve our launch process
- Created better error budget forecasting tools
- Established clearer guidelines for future trade-off decisions
32. Staying Current with SRE Practices and Technologies
Strong Answer: My Learning Strategy - Multi-layered Approach:
1. Technical Deep Dives
# I maintain a personal learning dashboard
learning_tracker = {
"current_focus": [
"eBPF for system observability",
"Kubernetes operators for automation",
"AI/ML for incident prediction"
],
"weekly_commitments": {
"reading": "2 hours of technical papers",
"hands_on": "4 hours lab/experimentation",
"community": "1 hour in SRE forums/Slack"
},
"monthly_goals": [
"Complete one new certification",
"Contribute to one open source project",
"Write one technical blog post"
]
}
2. Resource Mix - Quality over Quantity
Daily (30 minutes morning routine):
- SRE Weekly Newsletter - concise industry updates
- Hacker News - scan for infrastructure/reliability topics
- Internal Slack channels - #sre-learning, #incidents-learned
Weekly (2-3 hours):
- Google SRE Book Club - our team works through chapters together
- Kubernetes documentation - staying current with new features
- Conference talk videos - KubeCon, SREcon, Velocity recordings
Monthly Deep Dives:
- Academic papers - especially from USENIX, SOSP, OSDI conferences
- Vendor whitepapers - but with healthy skepticism
- Open source project exploration - contribute small patches to learn codebases
3. Hands-on Learning Lab
# Home lab setup for experimentation
homelab_projects:
current_experiments:
- name: "eBPF monitoring tools"
status: "Building custom metrics collector"
learning: "Kernel-level observability"
- name: "Chaos engineering with Litmus"
status: "Testing failure scenarios"
learning: "Resilience patterns"
- name: "Service mesh evaluation"
status: "Comparing Istio vs Linkerd"
learning: "Traffic management at scale"
infrastructure:
platform: "Kubernetes cluster on Raspberry Pi"
monitoring: "Prometheus + Grafana + Jaeger"
ci_cd: "GitLab CI with ArgoCD"
cost: "$200/month AWS credits for cloud integration"
4. Community Engagement
- SRE Discord/Slack communities - daily participation
- Local meetups - monthly CNCF and DevOps meetups
- Conference speaking - submitted 3 talks this year on incident response
- Mentoring - guide 2 junior engineers, which forces me to stay sharp
- Open source contributions - maintain a small monitoring tool, contribute to Prometheus
5. Learning from Failures - Internal and External
class IncidentLearningTracker:
def analyze_industry_incidents(self):
"""Study major outages for lessons"""
recent_studies = [
{
"incident": "Facebook Oct 2021 BGP outage",
"lessons": ["Single points of failure in DNS", "Recovery complexity"],
"applied_locally": "Implemented secondary DNS provider"
},
{
"incident": "AWS us-east-1 Dec 2021",
"lessons": ["Multi-region dependencies", "Circuit breaker importance"],
"applied_locally": "Added cross-region failover testing"
}
]
return recent_studies
def internal_learning(self):
"""Extract patterns from our own incidents"""
return {
"quarterly_review": "What patterns are emerging?",
"cross_team_sharing": "Monthly incident learnings presentation",
"runbook_updates": "Continuously improve based on real scenarios"
}
6. Structured Learning Paths
- Currently pursuing: CKS (Certified Kubernetes Security Specialist)
- Completed this year: AWS Solutions Architect Pro, CKAD
- Next up: HashiCorp Terraform Associate
- Long-term goal: Google Cloud Professional Cloud Architect
7. Teaching and Knowledge Sharing
# My knowledge sharing activities
## Internal (at work):
- Monthly "SRE Patterns" lunch & learn sessions
- Incident post-mortem facilitation
- New hire onboarding for SRE practices
- Internal blog posts on "what I learned this week"
## External:
- Technical blog: medium.com/@myusername
- Conference talks: submitted to SREcon, KubeCon
- Open source: maintainer of small monitoring tool
- Mentoring: 2 junior engineers, 1 career switcher
8. Staying Ahead of Trends I try to identify emerging patterns early:
Current attention areas:
- Platform Engineering - evolution beyond traditional SRE
- FinOps - cost optimization becoming critical
- AI/ML for Operations - automated incident response
- WebAssembly - potential impact on deployment patterns
- Sustainability - green computing in infrastructure
My evaluation framework:
- Signal vs noise: Is this solving real problems or just hype?
- Adoption timeline: When will this be production-ready?
- Investment level: Should I learn basics now or wait?
- Career relevance: How does this align with my growth goals?
Key Success Factors:
- Consistency over intensity - 30 minutes daily beats 8 hours monthly
- Applied learning - immediately try new concepts in lab/work
- Community connection - learning with others accelerates understanding
- Teaching others - best way to solidify knowledge
- Balance breadth and depth - stay broad but go deep on core areas
Resources I highly recommend:
- Books: "Observability Engineering", "Learning eBPF", "Kubernetes Patterns"
- Podcasts: "Software Engineering Radio", "The Cloudcast"
- Newsletters: "SRE Weekly", "DevOps'ish", "The New Stack"
- Communities: SRE Slack, r/sre, CNCF Slack channels
This approach has helped me stay current while avoiding information overload. The key is finding sustainable habits that fit into daily work rather than treating learning as separate from doing.
Summary
This comprehensive answer guide covers all aspects of SRE engineering, from fundamental concepts to complex system design challenges. The answers demonstrate:
- Technical depth in SRE practices, monitoring, and system design
- Practical experience with real-world scenarios and solutions
- Problem-solving approach that balances theory with pragmatic implementation
- Communication skills for explaining complex concepts clearly
- Leadership qualities in handling incidents and team collaboration
The guide provides both the technical knowledge and the soft skills necessary for a senior SRE role, with specific examples and code implementations that candidates can adapt to their own experiences.

# SRE Interview Questions - Comprehensive Answer Guide
Part 1: SRE Fundamentals & Practices
1. What is the difference between SRE and traditional operations, and how do you balance reliability with feature velocity?
Strong Answer: SRE differs from traditional ops in several key ways:
- Proactive vs Reactive: SRE focuses on preventing issues through engineering rather than just responding to them
- Error Budgets: We quantify acceptable unreliability, allowing teams to move fast while maintaining reliability targets
- Automation: SRE emphasizes eliminating toil through automation and self-healing systems
- Shared Ownership: Development and operations work together using the same tools and metrics
Balancing reliability with velocity:
- Set clear SLIs/SLOs with stakeholders (e.g., 99.9% uptime = 43 minutes downtime/month)
- Use error budgets as a shared currency - if we're within budget, dev teams can deploy faster
- When error budget is exhausted, focus shifts to reliability work
- Implement gradual rollouts and feature flags to reduce blast radius
Follow-up - Implementing error budgets with resistant teams:
- Start with education - show how error budgets enable faster delivery
- Use concrete examples of downtime costs vs delayed features
- Begin with lenient budgets and tighten over time
- Make error budget status visible in dashboards and planning meetings
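To make the error-budget conversation concrete, it helps to translate an SLO into minutes of allowed downtime and track how much has been spent. A minimal sketch using the 99.9% example above (helper names are my own):

```python
# Sketch: turn an availability SLO into an error budget and track burn.
def error_budget_minutes(slo: float, window_days: int = 30) -> float:
    """Allowed downtime (minutes) for a given availability SLO over the window."""
    total_minutes = window_days * 24 * 60
    return total_minutes * (1 - slo)

def budget_remaining(slo: float, downtime_minutes: float, window_days: int = 30) -> float:
    """Fraction of the error budget still unspent (negative means over budget)."""
    budget = error_budget_minutes(slo, window_days)
    return (budget - downtime_minutes) / budget

print(round(error_budget_minutes(0.999), 1))    # ~43.2 minutes per 30-day window
print(round(budget_remaining(0.999, 30.0), 2))  # ~0.31 of the budget left after 30 min of downtime
```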
2. Explain the four golden signals of monitoring. How would you implement alerting around these for a Python microservice?
Strong Answer: The four golden signals are:
- Latency: Time to process requests
- Traffic: Demand on your system (requests/second)
- Errors: Rate of failed requests
- Saturation: How "full" your service is (CPU, memory, I/O)
Implementation for Python microservice:
# Using Prometheus with Flask
from flask import Flask, request
from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST
import time
import psutil
app = Flask(__name__)
# Metrics
REQUEST_COUNT = Counter('http_requests_total', 'Total HTTP requests', ['method', 'endpoint', 'status'])
REQUEST_LATENCY = Histogram('http_request_duration_seconds', 'HTTP request latency')
ERROR_RATE = Counter('http_errors_total', 'Total HTTP errors', ['status'])
CPU_USAGE = Gauge('cpu_usage_percent', 'CPU usage percentage')
@app.before_request
def before_request():
request.start_time = time.time()
@app.after_request
def after_request(response):
latency = time.time() - request.start_time
REQUEST_LATENCY.observe(latency)
REQUEST_COUNT.labels(request.method, request.endpoint, response.status_code).inc()
if response.status_code >= 400:
ERROR_RATE.labels(response.status_code).inc()
CPU_USAGE.set(psutil.cpu_percent())
return response
# Expose the metrics endpoint for Prometheus to scrape
@app.route('/metrics')
def metrics():
    return generate_latest(), 200, {'Content-Type': CONTENT_TYPE_LATEST}
Alerting Rules:
- Latency: Alert if p99 > 500ms for 5 minutes
- Traffic: Alert on 50% increase/decrease from baseline
- Errors: Alert if error rate > 1% for 2 minutes
- Saturation: Alert if CPU > 80% or Memory > 85% for 10 minutes
3. Walk me through how you would conduct a post-mortem for a production incident.
Strong Answer: Timeline:
- Immediate: Focus on resolution, collect logs/metrics during incident
- Within 24-48 hours: Conduct post-mortem meeting
- Within 1 week: Publish written post-mortem and track action items
Post-mortem Process:
- Timeline Construction: Build detailed timeline with all events, decisions, and communications
- Root Cause Analysis: Use techniques like "5 Whys" or Fishbone diagrams
- Impact Assessment: Quantify user impact, revenue loss, SLO burn
- Action Items: Focus on systemic fixes, not individual blame
- Follow-up: Track action items to completion
Good Post-mortem Characteristics:
- Blameless culture - focus on systems, not individuals
- Detailed timeline with timestamps
- Clear root cause analysis
- Actionable remediation items with owners and deadlines
- Written in accessible language for all stakeholders
- Includes what went well (not just failures)
Psychological Safety:
- Use "the system allowed..." instead of "person X did..."
- Ask "how can we make this impossible to happen again?"
- Celebrate people who surface problems early
- Make post-mortems learning opportunities, not punishment
4. You notice your application's 99th percentile latency has increased by 50ms over the past week, but the average latency remains the same. How would you investigate this?
Strong Answer: This suggests a long tail problem - most requests are fine, but some are much slower.
Investigation Steps:
- Check Request Distribution: Look at latency histograms - are we seeing bimodal distribution?
- Analyze Traffic Patterns: Has the mix of request types changed? Are we getting more complex queries?
- Database Performance: Check for slow queries, table locks, or index problems
- Resource Saturation: Look for memory pressure, GC pauses, or I/O bottlenecks during peak times
- Dependency Analysis: Check latency of downstream services - could be cascading slow responses
- Code Changes: Review recent deployments for inefficient algorithms or new features
Specific Checks:
- Database slow query logs
- Application profiling data
- Memory usage patterns and GC metrics
- Thread pool utilization
- External API response times
- Distributed tracing for slow requests
Tools: Use APM tools like New Relic, DataDog, or distributed tracing with Jaeger/Zipkin to identify bottlenecks.
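A quick way to confirm the "long tail" hypothesis is to compare percentiles directly rather than averages. A small illustration with synthetic numbers (purely illustrative) showing how the mean can stay flat while the p99 moves:

```python
# Illustration: average latency can stay flat while the p99 tail grows.
import statistics

def p99(samples):
    ordered = sorted(samples)
    return ordered[int(0.99 * (len(ordered) - 1))]

# 1,000 requests: the bulk stays at ~50ms, only the 15 slowest requests get slower.
last_week = [50] * 985 + [120] * 15
this_week = [50] * 985 + [170] * 15   # tail grew by 50ms

for label, samples in [("last week", last_week), ("this week", this_week)]:
    print(label, "mean:", round(statistics.mean(samples), 1), "p99:", p99(samples))
# Mean stays around 51ms in both weeks, while p99 jumps from 120ms to 170ms.
```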
5. Design a monitoring strategy for a Go-based API that processes financial transactions.
Strong Answer: Business Metrics:
- Transaction volume and value per minute
- Success rate by transaction type
- Time to settlement
- Regulatory compliance metrics (PCI DSS)
Technical Metrics:
// Key metrics to track
var (
transactionCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{Name: "transactions_total"},
[]string{"type", "status", "payment_method"})
transactionLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{Name: "transaction_duration_seconds"},
[]string{"type"})
queueDepth = prometheus.NewGauge(
prometheus.GaugeOpts{Name: "transaction_queue_depth"})
dbConnectionPool = prometheus.NewGauge(
prometheus.GaugeOpts{Name: "db_connections_active"})
)
Logging Strategy:
- Structured logging with correlation IDs
- Log all transaction state changes
- Security events (failed auth, suspicious patterns)
- Audit trail for compliance
Alerting:
- Transaction failure rate > 0.1%
- Processing latency > 2 seconds
- Queue depth > 1000 items
- Database connection pool > 80% utilization
- Any security-related events
Compliance Considerations:
- PII data must be masked in logs
- Audit logs with tamper-proof storage
- Real-time fraud detection alerts
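For the PII-masking point, a common approach is a logging filter that scrubs sensitive fields before records reach the log pipeline. A minimal Python sketch using the standard logging module (the regexes are simplified examples, not a complete PCI DSS solution):

```python
# Sketch: mask card numbers and emails before log records are emitted.
import logging
import re

class PIIMaskingFilter(logging.Filter):
    CARD_RE = re.compile(r"\b(?:\d[ -]?){13,16}\b")      # naive PAN pattern, for illustration
    EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+")

    def filter(self, record: logging.LogRecord) -> bool:
        msg = record.getMessage()
        msg = self.CARD_RE.sub("[CARD REDACTED]", msg)
        msg = self.EMAIL_RE.sub("[EMAIL REDACTED]", msg)
        record.msg, record.args = msg, None   # freeze the sanitized message
        return True

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("payments")
logger.addFilter(PIIMaskingFilter())
logger.info("charge failed for 4111 1111 1111 1111, notifying user@example.com")
```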
6. How would you implement distributed tracing across a system with Python backend services and a React frontend?
Strong Answer: Architecture:
- Use OpenTelemetry standard for vendor-neutral tracing
- Jaeger or Zipkin as tracing backend
- Trace context propagation across service boundaries
Backend Implementation (Python):
from opentelemetry import trace
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
# Auto-instrument Flask and requests
FlaskInstrumentor().instrument_app(app)
RequestsInstrumentor().instrument()
tracer = trace.get_tracer(__name__)
@app.route('/api/user/<user_id>')
def get_user(user_id):
with tracer.start_as_current_span("get_user") as span:
span.set_attribute("user.id", user_id)
# Service logic here
return response
Frontend Implementation (React):
import { WebTracerProvider } from "@opentelemetry/web";
import { getWebAutoInstrumentations } from "@opentelemetry/auto-instrumentations-web";
const provider = new WebTracerProvider();
provider.addSpanProcessor(new BatchSpanProcessor(new JaegerExporter()));
// Auto-instrument fetch, XMLHttpRequest
registerInstrumentations({
instrumentations: [getWebAutoInstrumentations()],
});
Key Implementation Points:
- Correlation IDs: Pass trace context in HTTP headers
- Sampling: Use probabilistic sampling (1-10%) to reduce overhead
- Service Map: Visualize service dependencies
- Performance Analysis: Identify bottlenecks across service boundaries
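The "correlation IDs" point works by injecting the W3C `traceparent` header into outgoing requests and extracting it on the receiving side. The auto-instrumentation shown above does this for you; the sketch below just makes the mechanism explicit (the downstream URL is a placeholder):

```python
# Sketch: explicit trace-context propagation across an HTTP hop.
import requests
from opentelemetry import trace
from opentelemetry.propagate import inject, extract

tracer = trace.get_tracer(__name__)

def call_downstream(payload: dict):
    with tracer.start_as_current_span("call-downstream"):
        headers = {}
        inject(headers)  # adds the W3C traceparent header for the current span
        # placeholder URL for illustration
        return requests.post("http://downstream:8080/process", json=payload, headers=headers)

def handle_incoming(request):
    # Continue the caller's trace instead of starting a new one
    parent_ctx = extract(request.headers)
    with tracer.start_as_current_span("process", context=parent_ctx):
        ...  # service logic
```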
Part 2: Software Engineering & Development
7. Code Review Scenario: Memory leak optimization
Strong Answer: Problems with the original code:
- Loads entire dataset into memory before processing
- No streaming or chunked processing
- Memory usage grows linearly with file size
Optimized version:
def process_large_dataset(file_path, chunk_size=1000):
"""Process large dataset in chunks to manage memory usage."""
results = []
with open(file_path, 'r') as f:
chunk = []
for line in f:
chunk.append(line.strip())
if len(chunk) >= chunk_size:
# Process chunk and yield results
processed_chunk = [expensive_processing(item) for item in chunk]
partial_result = analyze_data(processed_chunk)
results.append(partial_result)
# Clear chunk to free memory
chunk.clear()
# Process remaining items
if chunk:
processed_chunk = [expensive_processing(item) for item in chunk]
partial_result = analyze_data(processed_chunk)
results.append(partial_result)
return combine_results(results)
# Even better - use generator for streaming
def process_large_dataset_streaming(file_path):
"""Stream processing for minimal memory footprint."""
with open(file_path, 'r') as f:
for line in f:
yield expensive_processing(line.strip())
# Usage
def analyze_streaming_data(file_path):
processed_items = process_large_dataset_streaming(file_path)
return analyze_data_streaming(processed_items)
Additional Optimizations:
- Use mmap for very large files
- Implement backpressure if processing can't keep up
- Add memory monitoring and circuit breakers (see the sketch below)
- Consider using asyncio for I/O-bound operations
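For the memory-monitoring bullet, a simple guard is to check the process's resident set size between chunks and pause ingestion until it drops. A sketch using psutil; the 500 MB threshold is an arbitrary example, and `expensive_processing` refers to the function used above:

```python
# Sketch: simple memory guard that applies backpressure between chunks.
import time
import psutil

MAX_RSS_BYTES = 500 * 1024 * 1024   # arbitrary example threshold

def wait_for_memory_headroom(poll_seconds: float = 0.5):
    """Block until resident memory drops below the threshold."""
    proc = psutil.Process()
    while proc.memory_info().rss > MAX_RSS_BYTES:
        time.sleep(poll_seconds)   # let in-flight chunks finish / memory be reclaimed

def process_with_backpressure(chunks):
    for chunk in chunks:
        wait_for_memory_headroom()
        yield [expensive_processing(item) for item in chunk]
```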
8. In Go, explain the difference between buffered and unbuffered channels.
Strong Answer: Unbuffered Channels:
- Synchronous communication - sender blocks until receiver reads
- Zero capacity - no internal storage
- Guarantees handoff between goroutines
ch := make(chan int) // unbuffered
go func() {
ch <- 42 // blocks until someone reads
}()
value := <-ch // blocks until someone sends
Buffered Channels:
- Asynchronous communication up to buffer size
- Sender only blocks when buffer is full
- Receiver only blocks when buffer is empty
ch := make(chan int, 3) // buffered with capacity 3
ch <- 1 // doesn't block
ch <- 2 // doesn't block
ch <- 3 // doesn't block
ch <- 4 // blocks - buffer full
When to use in high-throughput systems:
Unbuffered for:
- Strict synchronization requirements
- Request-response patterns
- When you need guaranteed delivery confirmation
- Worker pools where you want backpressure
Buffered for:
- Producer-consumer with different rates
- Batching operations
- Reducing contention in high-throughput scenarios
- Event streaming where some loss is acceptable
Example - High-throughput log processor:
// Buffered for log ingestion
logChan := make(chan LogEntry, 10000)
// Unbuffered for critical operations
errorChan := make(chan error)
func logProcessor() {
for {
select {
case log := <-logChan:
processLog(log)
case err := <-errorChan:
handleCriticalError(err) // immediate attention
}
}
}
9. React Performance: Optimize dashboard with real-time metrics
Strong Answer: Problems with frequent re-renders:
- All components re-render when any metric updates
- Expensive calculations on every render
- DOM thrashing from rapid updates
Optimization Strategy:
import React, { memo, useMemo, useCallback, useRef } from "react";
import { useVirtualizer } from "@tanstack/react-virtual";
// 1. Memoize metric components
const MetricCard = memo(({ metric, value, threshold }) => {
// Only re-render when props actually change
const status = useMemo(
() => (value > threshold ? "critical" : "normal"),
[value, threshold]
);
return (
<div className={`metric-card ${status}`}>
<h3>{metric}</h3>
<span>{value}</span>
</div>
);
});
// 2. Virtualize large lists
const MetricsList = ({ metrics }) => {
const parentRef = useRef();
const virtualizer = useVirtualizer({
count: metrics.length,
getScrollElement: () => parentRef.current,
estimateSize: () => 100,
});
return (
<div ref={parentRef} style={{ height: "400px", overflow: "auto" }}>
{virtualizer.getVirtualItems().map((virtualRow) => (
<MetricCard key={virtualRow.key} {...metrics[virtualRow.index]} />
))}
</div>
);
};
// 3. Debounce updates and batch state changes
const Dashboard = () => {
const [metrics, setMetrics] = useState({});
const updateQueue = useRef(new Map());
const flushTimeout = useRef();
const queueUpdate = useCallback((serviceName, newMetrics) => {
updateQueue.current.set(serviceName, newMetrics);
// Debounce updates - batch multiple rapid changes
clearTimeout(flushTimeout.current);
flushTimeout.current = setTimeout(() => {
setMetrics((prev) => {
const updates = Object.fromEntries(updateQueue.current);
updateQueue.current.clear();
return { ...prev, ...updates };
});
}, 100); // 100ms debounce
}, []);
// 4. Use React.startTransition for non-critical updates
const handleMetricUpdate = useCallback(
(data) => {
if (data.priority === "high") {
queueUpdate(data.service, data.metrics);
} else {
startTransition(() => {
queueUpdate(data.service, data.metrics);
});
}
},
[queueUpdate]
);
return <MetricsList metrics={Object.values(metrics)} />;
};
Additional Optimizations:
- WebSocket connection pooling - single connection for all metrics
- Data normalization - structure data to minimize re-renders
- Service Worker - offload heavy calculations
- Canvas/WebGL - for complex visualizations instead of DOM updates
10. Design a CI/CD pipeline for a multi-service application
Strong Answer: Pipeline Architecture:
# .github/workflows/main.yml
name: Multi-Service CI/CD
on:
push:
branches: [main, develop]
pull_request:
branches: [main]
jobs:
detect-changes:
runs-on: ubuntu-latest
outputs:
python-api: ${{ steps.changes.outputs.python-api }}
go-workers: ${{ steps.changes.outputs.go-workers }}
react-frontend: ${{ steps.changes.outputs.react-frontend }}
steps:
- uses: actions/checkout@v3
- uses: dorny/paths-filter@v2
id: changes
with:
filters: |
python-api:
- 'services/api/**'
go-workers:
- 'services/workers/**'
react-frontend:
- 'frontend/**'
test-python:
needs: detect-changes
if: needs.detect-changes.outputs.python-api == 'true'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Run Python tests
run: |
cd services/api
pip install -r requirements.txt
pytest --cov=. --cov-report=xml
flake8 .
mypy .
test-go:
needs: detect-changes
if: needs.detect-changes.outputs.go-workers == 'true'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v3
with:
go-version: "1.21"
- name: Run Go tests
run: |
cd services/workers
go test -race -coverprofile=coverage.out ./...
go vet ./...
staticcheck ./...
deploy:
needs: [test-python, test-go, test-react]
if: github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
steps:
- name: Deploy with blue-green
run: |
# Database migration strategy
kubectl apply -f k8s/migration-job.yaml
kubectl wait --for=condition=complete job/db-migration
# Deploy new version to green environment
helm upgrade app-green ./helm-chart \
--set image.tag=${{ github.sha }} \
--set environment=green
# Health check green environment
./scripts/health-check.sh green
# Switch traffic to green
kubectl patch service app-service -p \
'{"spec":{"selector":{"version":"green"}}}'
# Verify traffic switch
./scripts/verify-deployment.sh
# Clean up blue environment
helm uninstall app-blue
Database Migration Strategy:
#!/bin/bash
# scripts/db-migration.sh
# Create backup
pg_dump $DATABASE_URL > backup-$(date +%Y%m%d-%H%M%S).sql
# Run migrations in transaction
psql $DATABASE_URL << EOF
BEGIN;
-- Run all pending migrations
\i migrations/001_add_user_table.sql
\i migrations/002_add_index.sql
-- Verify migration success
SELECT count(*) FROM schema_migrations;
COMMIT;
EOF
# Test rollback capability
if [ "$1" == "test-rollback" ]; then
psql $DATABASE_URL < backup-latest.sql
fi
Rollback Strategy:
- Keep previous 3 versions in registry
- Database rollback scripts for each migration
- Feature flags to disable new features quickly
- Automated rollback triggers on error rate increase
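The "automated rollback triggers" item can be implemented as a post-deploy gate that queries Prometheus and fails the pipeline (which then runs the rollback step) when the error ratio exceeds a threshold. A minimal sketch; the Prometheus URL, metric names, and 1% threshold are assumptions to adapt:

```python
# Sketch: CI gate that exits non-zero if the post-deploy error rate is too high.
import sys
import requests

PROMETHEUS_URL = "http://prometheus:9090"
QUERY = (
    'sum(rate(http_requests_total{status=~"5.."}[5m]))'
    ' / sum(rate(http_requests_total[5m]))'
)
MAX_ERROR_RATIO = 0.01  # 1%

def current_error_ratio() -> float:
    resp = requests.get(f"{PROMETHEUS_URL}/api/v1/query", params={"query": QUERY}, timeout=10)
    resp.raise_for_status()
    result = resp.json()["data"]["result"]
    return float(result[0]["value"][1]) if result else 0.0

if __name__ == "__main__":
    ratio = current_error_ratio()
    print(f"post-deploy error ratio: {ratio:.4%}")
    sys.exit(1 if ratio > MAX_ERROR_RATIO else 0)  # non-zero exit triggers the rollback step
```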
11. How would you implement blue-green deployments for a stateful service with a database?
Strong Answer: Challenges with Stateful Services:
- Database schema changes
- Data consistency during switch
- Connection management
- State synchronization
Implementation Strategy:
# Blue-Green with Database
apiVersion: v1
kind: Service
metadata:
name: app-active
spec:
selector:
app: myapp
version: blue # Will switch to green
ports:
- port: 80
---
# Database proxy for connection management
apiVersion: apps/v1
kind: Deployment
metadata:
name: db-proxy
spec:
template:
spec:
containers:
- name: pgbouncer
image: pgbouncer/pgbouncer
env:
- name: DATABASES_HOST
value: "postgres-primary"
- name: POOL_MODE
value: "transaction" # Allow connection switching
Deployment Process:
1. Pre-deployment:
   - Run backward-compatible schema migrations
   - Ensure both versions can operate with new schema
2. Green Deployment:
   - Deploy green version with same database
   - Warm up green instances (cache, connections)
   - Run health checks
3. Traffic Switch:
   - Update service selector to point to green
   - Monitor metrics for 10-15 minutes
   - Keep blue running for quick rollback
4. Post-deployment:
   - Run cleanup migrations (remove old columns)
   - Terminate blue environment
Database Migration Strategy:
-- Phase 1: Additive changes (safe for both versions)
ALTER TABLE users ADD COLUMN email_verified BOOLEAN DEFAULT FALSE;
CREATE INDEX CONCURRENTLY idx_users_email ON users(email);
-- Phase 2: After green is stable, remove old columns
-- ALTER TABLE users DROP COLUMN old_email_field;
Rollback Plan:
- Revert service selector to blue
- Emergency database rollback scripts
- Circuit breaker to stop problematic requests
Part 3: System Design Deep Dive
12. Requirements Gathering Questions
Strong Answer: Functional Requirements:
- What specific metrics need to be displayed? (orders/minute, revenue, concurrent users)
- How real-time? (sub-second, few seconds, minute-level updates)
- What user roles need access? (executives, ops teams, developers)
- What actions can users take? (view-only, alerts, drill-down)
- Geographic distribution of users?
Non-Functional Requirements:
- Scale: How many concurrent dashboard users? (100s, 1000s)
- Data volume: Orders per day? Peak traffic? Data retention period?
- Availability: 99.9% or higher? Maintenance windows?
- Latency: How fast should dashboard updates be?
- Consistency: Can we show slightly stale data? (eventual consistency)
- Security: Authentication, authorization, audit logging?
Technical Constraints:
- Existing infrastructure? (AWS, on-prem, hybrid)
- Integration requirements? (existing systems, APIs)
- Compliance requirements? (SOX, PCI DSS)
- Budget constraints?
13. API Design and Framework Selection
Strong Answer: API Endpoints:
// REST API Design
GET /api/v1/metrics/realtime
{
"active_users": 1250,
"orders_per_minute": 45,
"revenue_per_minute": 12500,
"inventory_alerts": 3,
"system_health": "healthy",
"timestamp": "2025-06-30T10:30:00Z"
}
GET /api/v1/metrics/historical?metric=revenue&period=24h&granularity=1h
{
"metric": "revenue",
"data": [
{"timestamp": "2025-06-30T09:00:00Z", "value": 75000},
{"timestamp": "2025-06-30T10:00:00Z", "value": 82000}
]
}
POST /api/v1/alerts/subscribe
{
"metric": "inventory_level",
"threshold": 100,
"product_id": "12345",
"notification_method": "webhook"
}
Framework Comparison:
REST ✅ Recommended
- Simple, cacheable
- Wide tooling support
- Good for CRUD operations
- HTTP status codes for errors
WebSockets ✅ For Real-time Updates
// WebSocket for live updates
const ws = new WebSocket("wss://api.example.com/metrics/stream");
ws.onmessage = (event) => {
const metrics = JSON.parse(event.data);
updateDashboard(metrics);
};
GraphQL ❌ Not Recommended
- Adds complexity for simple metrics
- Caching more difficult
- Overkill for this use case
Final Architecture:
- REST API for historical data and configuration
- WebSocket for real-time metric streaming
- Server-Sent Events as WebSocket fallback
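For the Server-Sent Events fallback, the server simply keeps an HTTP response open and writes `data:` frames. A minimal Flask sketch (the metric source here is a placeholder generator):

```python
# Sketch: Server-Sent Events endpoint as a WebSocket fallback.
import json
import time
from flask import Flask, Response

app = Flask(__name__)

def latest_metrics():
    """Placeholder metric source; replace with reads from the cache or queue."""
    while True:
        yield {"orders_per_minute": 45, "active_users": 1250}
        time.sleep(5)

@app.route("/api/v1/metrics/stream")
def stream_metrics():
    def event_stream():
        for snapshot in latest_metrics():
            # SSE frame format: "data: <payload>\n\n"; browsers reconnect automatically
            yield f"data: {json.dumps(snapshot)}\n\n"
    return Response(event_stream(), mimetype="text/event-stream")

# Browser side: new EventSource("/api/v1/metrics/stream").onmessage = e => update(JSON.parse(e.data))
```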
14. High-Level Architecture Diagram
Strong Answer:
┌─────────────────┐    ┌──────────────────┐    ┌──────────────────┐
│   React SPA     │    │  Load Balancer   │    │   API Gateway    │
│                 │───►│   (ALB/NGINX)    │───►│   (Kong/Envoy)   │
│ - Dashboard     │    │                  │    │ - Auth           │
│ - WebSocket     │    │                  │    │ - Rate Limiting  │
└─────────────────┘    └──────────────────┘    └────────┬─────────┘
                                                        │
                     ┌──────────────────────────────────┼───────────────────┐
                     │                                  │                   │
           ┌─────────▼─────────┐              ┌─────────▼─────────┐  ┌──────▼───────┐
           │   Metrics API     │              │   WebSocket API   │  │  Config API  │
           │  (Python/Flask)   │              │   (Go/Gorilla)    │  │(Python/Fast) │
           └─────────┬─────────┘              └─────────┬─────────┘  └──────┬───────┘
                     │                                  │                   │
           ┌─────────▼─────────┐              ┌─────────▼─────────┐  ┌──────▼───────┐
           │    Redis Cache    │              │   Message Queue   │  │  PostgreSQL  │
           │    (Metrics)      │              │   (Kafka/Redis)   │  │   (Config)   │
           └─────────┬─────────┘              └─────────┬─────────┘  └──────────────┘
                     │                                  │
                     └───────────────┬──────────────────┘
                        ┌────────────▼─────────────┐
                        │   Time Series Database   │
                        │  (InfluxDB/TimescaleDB)  │
                        └────────────┬─────────────┘
                                     │
                        ┌────────────▼─────────────┐
                        │       ETL Pipeline       │
                        │      (Apache Beam)       │
                        └────────────┬─────────────┘
                                     │
              ┌──────────────────────┼──────────────────────┐
              │                      │                      │
      ┌───────▼───────┐   ┌──────────▼──────────┐   ┌───────▼───────┐
      │  E-commerce   │   │      Inventory      │   │    System     │
      │   Database    │   │     Management      │   │    Metrics    │
      │   (Orders)    │   │   (Stock Levels)    │   │   (Health)    │
      └───────────────┘   └─────────────────────┘   └───────────────┘
Data Flow:
- Source Systems → ETL Pipeline (real-time streaming)
- ETL Pipeline → Time Series DB (processed metrics)
- Time Series DB → Redis Cache (frequently accessed data)
- API Services → Frontend (REST + WebSocket)
- Message Queue → WebSocket API (real-time updates)
15. Real-time Implementation Approaches
Strong Answer: Comparison of Real-time Approaches:
| Approach | Pros | Cons | Use Case |
|---|---|---|---|
| Polling | Simple, stateless | High latency, wasteful | Non-critical updates |
| WebSockets | True real-time, bidirectional | Complex, stateful | Live dashboards |
| Server-Sent Events | Simpler than WebSocket, auto-reconnect | One-way only | Event streams |
| Message Queues | Reliable, scalable | Added complexity | High-volume events |
Recommended Architecture:
// WebSocket implementation in Go
type Hub struct {
clients map[*Client]bool
broadcast chan []byte
register chan *Client
unregister chan *Client
}
type Client struct {
hub *Hub
conn *websocket.Conn
send chan []byte
subscriptions map[string]bool
}
func (h *Hub) run() {
for {
select {
case client := <-h.register:
h.clients[client] = true
case client := <-h.unregister:
delete(h.clients, client)
close(client.send)
case message := <-h.broadcast:
for client := range h.clients {
select {
case client.send <- message:
default:
close(client.send)
delete(h.clients, client)
}
}
}
}
}
// Kafka consumer for real-time metrics
func consumeMetrics() {
consumer, _ := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"group.id": "dashboard-consumers",
})
consumer.Subscribe("metrics-topic", nil)
for {
msg, _ := consumer.ReadMessage(-1)
// Process metric and broadcast to WebSocket clients
metric := parseMetric(msg.Value)
hub.broadcast <- metric
// Also update Redis cache
updateCache(metric)
}
}
Client-side Connection Management:
class MetricsWebSocket {
constructor(url) {
this.url = url;
this.reconnectInterval = 1000;
this.maxReconnectInterval = 30000;
this.reconnectDecay = 1.5;
this.connect();
}
connect() {
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
console.log("Connected to metrics stream");
this.reconnectInterval = 1000; // Reset backoff
};
this.ws.onmessage = (event) => {
const metrics = JSON.parse(event.data);
this.updateDashboard(metrics);
};
this.ws.onclose = () => {
console.log("Connection lost, reconnecting...");
setTimeout(() => {
this.reconnectInterval = Math.min(
this.reconnectInterval * this.reconnectDecay,
this.maxReconnectInterval
);
this.connect();
}, this.reconnectInterval);
};
}
subscribe(metricType) {
this.ws.send(
JSON.stringify({
action: "subscribe",
metric: metricType,
})
);
}
}
16. Single Points of Failure Analysis
Strong Answer: Identified SPOFs and Solutions:
1. Load Balancer SPOF
- Problem: Single ALB failure takes down entire system
- Solution: Multi-AZ deployment with Route 53 health checks
# Terraform for multi-region setup
resource "aws_lb" "primary" {
name = "app-lb-primary"
availability_zones = ["us-east-1a", "us-east-1b"]
}
resource "aws_route53_record" "failover_primary" {
zone_id = aws_route53_zone.main.zone_id
name = "api.example.com"
type = "A"
failover_routing_policy {
type = "PRIMARY"
}
health_check_id = aws_route53_health_check.primary.id
}
2. Database SPOF
- Problem: Single database failure
- Solution: Primary-replica setup with automatic failover
# PostgreSQL HA setup
apiVersion: postgresql.cnpg.io/v1
kind: Cluster
metadata:
name: postgres-cluster
spec:
instances: 3
primaryUpdateStrategy: unsupervised
postgresql:
parameters:
max_connections: "200"
shared_buffers: "256MB"
backup:
retentionPolicy: "30d"
barmanObjectStore:
destinationPath: "s3://backups/postgres"
3. Redis Cache SPOF
- Problem: Cache failure impacts performance
- Solution: Redis Sentinel for HA + graceful degradation
import redis.sentinel
sentinel = redis.sentinel.Sentinel([
('sentinel1', 26379),
('sentinel2', 26379),
('sentinel3', 26379)
])
def get_metric(key):
try:
master = sentinel.master_for('mymaster', socket_timeout=0.1)
return master.get(key)
except redis.RedisError:
# Graceful degradation - fetch from database
return get_metric_from_db(key)
4. Message Queue SPOF
- Problem: Kafka broker failure stops real-time updates
- Solution: Multi-broker Kafka cluster with replication
# Kafka with replication
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
spec:
kafka:
replicas: 3
config:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
default.replication.factor: 3
min.insync.replicas: 2
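Broker-side replication only pays off if producers also request strong acknowledgements. A minimal producer sketch, using kafka-python as an assumed client library (broker addresses are placeholders; the topic matches the consumer shown earlier), that pairs with min.insync.replicas: 2:
# Producer configured to survive a single broker failure (assumes kafka-python)
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=["kafka-0:9092", "kafka-1:9092", "kafka-2:9092"],  # placeholder brokers
    acks="all",    # wait for all in-sync replicas (works with min.insync.replicas=2)
    retries=5,     # retry transient errors such as a leader election
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

def publish_metric(metric: dict) -> None:
    # Send to the same topic the dashboard consumer reads from
    future = producer.send("metrics-topic", value=metric)
    future.get(timeout=10)  # block until the write is acknowledged, or raise
With acks="all" a single broker outage neither loses acknowledged writes nor blocks producers, since two in-sync replicas remain available.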
17. Caching Implementation Strategy
Strong Answer: Multi-Level Caching Strategy:
# 1. Application-level caching
from functools import lru_cache
import redis
import json
import time
class CacheService:
def __init__(self):
self.redis_client = redis.Redis(host='redis-cluster')
self.local_cache = {}
@lru_cache(maxsize=1000)
def get_metric_definition(self, metric_name):
"""Cache metric metadata (rarely changes)"""
return self.fetch_metric_definition(metric_name)
def get_real_time_metric(self, metric_key):
"""Multi-level cache for real-time data"""
# L1: Memory cache (100ms TTL)
if metric_key in self.local_cache:
data, timestamp = self.local_cache[metric_key]
if time.time() - timestamp < 0.1: # 100ms
return data
# L2: Redis cache (5s TTL)
cached = self.redis_client.get(f"metric:{metric_key}")
if cached:
data = json.loads(cached)
self.local_cache[metric_key] = (data, time.time())
return data
# L3: Database fallback
data = self.fetch_from_database(metric_key)
# Cache with appropriate TTL
self.redis_client.setex(
f"metric:{metric_key}",
5, # 5 second TTL
json.dumps(data)
)
self.local_cache[metric_key] = (data, time.time())
return data
2. CDN Caching for Static Assets:
// CloudFront configuration
{
"Origins": [{
"Id": "dashboard-origin",
"DomainName": "dashboard-api.example.com",
"CustomOriginConfig": {
"HTTPPort": 443,
"OriginProtocolPolicy": "https-only"
}
}],
"DefaultCacheBehavior": {
"TargetOriginId": "dashboard-origin",
"ViewerProtocolPolicy": "redirect-to-https",
"CachePolicyId": "custom-cache-policy",
"TTL": {
"DefaultTTL": 300, // 5 minutes for API responses
"MaxTTL": 3600 // 1 hour max
}
},
"CacheBehaviors": [{
"PathPattern": "/static/*",
"TTL": {
"DefaultTTL": 86400, // 24 hours for static assets
"MaxTTL": 31536000 // 1 year max
}
}]
}
3. Database Query Result Caching:
-- Materialized views for expensive aggregations
CREATE MATERIALIZED VIEW hourly_revenue AS
SELECT
date_trunc('hour', order_timestamp) as hour,
SUM(order_total) as revenue,
COUNT(*) as order_count
FROM orders
WHERE order_timestamp >= NOW() - INTERVAL '24 hours'
GROUP BY date_trunc('hour', order_timestamp);
-- CONCURRENTLY requires a unique index on the materialized view
CREATE UNIQUE INDEX idx_hourly_revenue_hour ON hourly_revenue (hour);
-- Refresh every 5 minutes
CREATE OR REPLACE FUNCTION refresh_hourly_revenue()
RETURNS void AS $$
BEGIN
REFRESH MATERIALIZED VIEW CONCURRENTLY hourly_revenue;
END;
$$ LANGUAGE plpgsql;
SELECT cron.schedule('refresh-revenue', '*/5 * * * *', 'SELECT refresh_hourly_revenue();');
Cache Invalidation Strategy:
class CacheInvalidator:
def __init__(self):
self.redis_client = redis.Redis()
def invalidate_on_order(self, order_data):
"""Invalidate relevant caches when new order arrives"""
patterns_to_invalidate = [
"metric:orders_per_minute",
"metric:revenue_per_minute",
f"metric:inventory:{order_data['product_id']}"
]
for pattern in patterns_to_invalidate:
# Use Redis pipeline for efficiency
pipe = self.redis_client.pipeline()
pipe.delete(pattern)
pipe.publish('cache_invalidation', pattern)
pipe.execute()
def smart_invalidation(self, event_type, entity_id):
"""Invalidate based on event type"""
invalidation_map = {
'order_created': ['revenue', 'orders', 'inventory'],
'user_login': ['active_users'],
'inventory_update': ['inventory', 'stock_alerts']
}
metrics_to_invalidate = invalidation_map.get(event_type, [])
for metric in metrics_to_invalidate:
self.redis_client.delete(f"metric:{metric}:{entity_id}")
18. Database Design: SQL vs NoSQL
Strong Answer: Hybrid Approach - Use Both:
SQL (PostgreSQL) for:
- Transactional Data: Orders, users, inventory
- ACID Requirements: Financial transactions
- Complex Queries: Joins, aggregations
- Data Consistency: Strong consistency needs
-- OLTP Database Schema
CREATE TABLE orders (
id UUID DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id),
order_total DECIMAL(10,2) NOT NULL,
order_timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
status VARCHAR(20) NOT NULL DEFAULT 'pending',
PRIMARY KEY (id, order_timestamp) -- partition key must be part of the primary key
) PARTITION BY RANGE (order_timestamp);
CREATE INDEX idx_orders_timestamp ON orders(order_timestamp);
CREATE INDEX idx_orders_user_status ON orders(user_id, status);
-- Partitioning for time-series data
CREATE TABLE orders_2025_06 PARTITION OF orders
FOR VALUES FROM ('2025-06-01') TO ('2025-07-01');
NoSQL (InfluxDB) for:
- Time-series Metrics: Performance data, system metrics
- High Write Volume: Thousands of metrics per second
- Retention Policies: Automatic data aging
# InfluxDB for metrics storage
from datetime import datetime
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
client = InfluxDBClient(url="http://influxdb:8086", token="my-token")
write_api = client.write_api(write_options=SYNCHRONOUS)
def write_metric(measurement, tags, fields):
point = Point(measurement) \
.tag("service", tags.get("service")) \
.tag("region", tags.get("region")) \
.field("value", fields["value"]) \
.time(datetime.utcnow(), WritePrecision.S)
write_api.write(bucket="metrics", record=point)
# Example usage
write_metric(
measurement="orders_per_minute",
tags={"service": "ecommerce", "region": "us-east-1"},
fields={"value": 45}
)
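Reading the series back for dashboard charts could look like the sketch below, using the query API of the same influxdb_client package; the bucket matches the write above, while the org name and the one-hour window are assumptions:
# Query the last hour of orders_per_minute for charting (Flux query sketch)
query_api = client.query_api()  # reuse the InfluxDBClient created above

def read_orders_per_minute(window="-1h"):
    flux = f'''
    from(bucket: "metrics")
      |> range(start: {window})
      |> filter(fn: (r) => r._measurement == "orders_per_minute")
      |> filter(fn: (r) => r.region == "us-east-1")
      |> aggregateWindow(every: 1m, fn: mean)
    '''
    tables = query_api.query(flux, org="my-org")  # org name is an assumption
    return [(record.get_time(), record.get_value())
            for table in tables for record in table.records]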
Read/Write Load Balancing:
class DatabaseRouter:
def __init__(self):
self.primary_db = PostgreSQLConnection("primary-db")
self.read_replicas = [
PostgreSQLConnection("replica-1"),
PostgreSQLConnection("replica-2"),
PostgreSQLConnection("replica-3")
]
self.current_replica = 0
def get_read_connection(self):
"""Round-robin read replica selection"""
replica = self.read_replicas[self.current_replica]
self.current_replica = (self.current_replica + 1) % len(self.read_replicas)
return replica
def get_write_connection(self):
"""Always use primary for writes"""
return self.primary_db
def execute_read_query(self, query, params=None):
try:
return self.get_read_connection().execute(query, params)
except Exception:
# Fallback to primary if replica fails
return self.primary_db.execute(query, params)
# Usage
@app.route('/api/metrics/historical')
def get_historical_metrics():
# Use read replica for analytics queries
db = router.get_read_connection()
return db.execute("""
SELECT date_trunc('hour', created_at) as hour,
SUM(total) as revenue
FROM orders
WHERE created_at >= NOW() - INTERVAL '24 hours'
GROUP BY hour
ORDER BY hour
""")
19. Scaling Strategy for 10x Traffic
Strong Answer: Scaling Priority Order:
1. Application Tier (Scale First)
# Horizontal Pod Autoscaler
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: dashboard-api-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: dashboard-api
minReplicas: 3
maxReplicas: 50
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
behavior:
scaleUp:
stabilizationWindowSeconds: 60
policies:
- type: Percent
value: 100
periodSeconds: 15
2. Database Scaling
# Read replica scaling + connection pooling
class DatabaseScaler:
def __init__(self):
self.connection_pools = {
'primary': create_pool('primary-db', pool_size=20),
'replicas': [
create_pool('replica-1', pool_size=50),
create_pool('replica-2', pool_size=50),
create_pool('replica-3', pool_size=50),
]
}
def scale_read_capacity(self, target_qps):
"""Add more read replicas based on QPS"""
current_capacity = len(self.connection_pools['replicas']) * 1000 # QPS per replica
if target_qps > current_capacity * 0.8: # 80% utilization threshold
# Add new read replica
new_replica = create_read_replica()
self.connection_pools['replicas'].append(
create_pool(new_replica, pool_size=50)
)
def implement_sharding(self):
"""Implement horizontal sharding for orders table"""
shards = {
'shard_1': 'orders_americas',
'shard_2': 'orders_europe',
'shard_3': 'orders_asia'
}
def get_shard(user_id):
return shards[f'shard_{hash(user_id) % 3 + 1}']
3. Cache Scaling
# Redis Cluster for horizontal scaling
import rediscluster
redis_cluster = rediscluster.RedisCluster(
startup_nodes=[
{"host": "redis-1", "port": "7000"},
{"host": "redis-2", "port": "7000"},
{"host": "redis-3", "port": "7000"},
{"host": "redis-4", "port": "7000"},
{"host": "redis-5", "port": "7000"},
{"host": "redis-6", "port": "7000"},
],
decode_responses=True,
skip_full_coverage_check=True
)
# Cache partitioning strategy
def cache_key_partition(metric_type, entity_id):
"""Partition cache keys for better distribution"""
partition = hash(f"{metric_type}:{entity_id}") % 1000
return f"{metric_type}:{partition}:{entity_id}"
4. CDN and Edge Caching
// CloudFlare Workers for edge computing
addEventListener("fetch", (event) => {
event.respondWith(handleRequest(event.request));
});
async function handleRequest(request) {
const url = new URL(request.url);
// Cache API responses at edge
if (url.pathname.startsWith("/api/metrics/")) {
const cacheKey = new Request(url.toString(), request);
const cache = caches.default;
// Check edge cache first
let response = await cache.match(cacheKey);
if (!response) {
// Fetch from origin
response = await fetch(request);
// Cache for 30 seconds
const headers = new Headers(response.headers);
headers.set("Cache-Control", "public, max-age=30");
response = new Response(response.body, {
status: response.status,
statusText: response.statusText,
headers: headers,
});
event.waitUntil(cache.put(cacheKey, response.clone()));
}
return response;
}
return fetch(request);
}
5. Message Queue Scaling
# Kafka cluster scaling
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: metrics-kafka
spec:
kafka:
replicas: 9 # Scale from 3 to 9 brokers
config:
num.partitions: 50 # More partitions for parallelism
default.replication.factor: 3
min.insync.replicas: 2
resources:
requests:
memory: 8Gi
cpu: 2000m
limits:
memory: 16Gi
cpu: 4000m
20. Failover Strategy for Database Outage
Strong Answer: Multi-Tier Failover Strategy:
# Automatic failover implementation
import time
import threading
import logging
import psycopg2
import boto3
from enum import Enum
logger = logging.getLogger(__name__)
class DatabaseState(Enum):
HEALTHY = "healthy"
DEGRADED = "degraded"
FAILED = "failed"
class DatabaseFailoverManager:
def __init__(self):
self.primary_db = "primary-db.example.com"
self.replica_dbs = [
"replica-1.example.com",
"replica-2.example.com",
"replica-3.example.com"
]
self.current_state = DatabaseState.HEALTHY
self.current_primary = self.primary_db
self.health_check_interval = 5 # seconds
def health_check(self, db_host):
"""Check database health"""
try:
conn = psycopg2.connect(
host=db_host,
database="analytics",
user="readonly",
password="password",
connect_timeout=3
)
cursor = conn.cursor()
cursor.execute("SELECT 1")
result = cursor.fetchone()
conn.close()
return result[0] == 1
except Exception as e:
logger.error(f"Health check failed for {db_host}: {e}")
return False
def promote_replica_to_primary(self, replica_host):
"""Promote replica to primary (manual intervention required)"""
# This would typically involve:
# 1. Stopping replication on chosen replica
# 2. Updating DNS/load balancer to point to new primary
# 3. Updating application config
logger.critical(f"Promoting {replica_host} to primary")
# Update Route 53 record to point to new primary
route53 = boto3.client('route53')
route53.change_resource_record_sets(
HostedZoneId='Z123456789',
ChangeBatch={
'Changes': [{
'Action': 'UPSERT',
'ResourceRecordSet': {
'Name': 'primary-db.example.com',
'Type': 'CNAME',
'TTL': 60,
'ResourceRecords': [{'Value': replica_host}]
}
}]
}
)
self.current_primary = replica_host
def circuit_breaker_fallback(self):
"""Fallback to cached data when database is unavailable"""
logger.warning("Database unavailable, switching to read-only mode")
# Serve from cache only
app.config['READ_ONLY_MODE'] = True
# Show banner to users
return {
"status": "degraded",
"message": "Real-time data temporarily unavailable",
"data_freshness": "cached_5_minutes_ago"
}
def monitor_and_failover(self):
"""Background thread for monitoring and automatic failover"""
consecutive_failures = 0
while True:
if self.health_check(self.current_primary):
consecutive_failures = 0
self.current_state = DatabaseState.HEALTHY
app.config['READ_ONLY_MODE'] = False
else:
consecutive_failures += 1
logger.warning(f"Primary DB health check failed {consecutive_failures} times")
if consecutive_failures >= 3: # 15 seconds of failures
self.current_state = DatabaseState.FAILED
# Try to find healthy replica
healthy_replica = None
for replica in self.replica_dbs:
if self.health_check(replica):
healthy_replica = replica
break
if healthy_replica:
self.promote_replica_to_primary(healthy_replica)
else:
# All databases failed - enable circuit breaker
self.circuit_breaker_fallback()
time.sleep(self.health_check_interval)
# Start monitoring in background
failover_manager = DatabaseFailoverManager()
monitor_thread = threading.Thread(target=failover_manager.monitor_and_failover)
monitor_thread.daemon = True
monitor_thread.start()
Emergency Procedures:
#!/bin/bash
# emergency-failover.sh
echo "=== EMERGENCY DATABASE FAILOVER ==="
echo "1. Checking primary database status..."
if ! pg_isready -h primary-db.example.com -p 5432; then
echo "โ Primary database is DOWN"
echo "2. Finding healthy replica..."
for replica in replica-1 replica-2 replica-3; do
if pg_isready -h ${replica}.example.com -p 5432; then
echo "โ
Found healthy replica: $replica"
echo "3. Promoting replica to primary..."
# Stop replication
psql -h ${replica}.example.com -c "SELECT pg_promote();"
echo "4. Updating DNS record..."
aws route53 change-resource-record-sets \
--hosted-zone-id Z123456789 \
--change-batch file://failover-dns.json
echo "5. Updating application config..."
kubectl patch configmap app-config \
--patch "{\"data\":{\"DATABASE_HOST\":\"${replica}.example.com\"}}"
echo "6. Restarting application pods..."
kubectl rollout restart deployment/dashboard-api
echo "โ
Failover complete to $replica"
exit 0
fi
done
echo "โ No healthy replicas found - enabling read-only mode"
kubectl patch configmap app-config \
--patch '{"data":{"READ_ONLY_MODE":"true"}}'
else
echo "โ
Primary database is healthy"
fi
21. Circuit Breaker Implementation
Strong Answer: Circuit Breaker Pattern for External Services:
import time
import threading
import requests
from enum import Enum
from collections import deque
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, reject requests
HALF_OPEN = "half_open" # Testing if service recovered
class CircuitBreaker:
def __init__(self, failure_threshold=5, timeout=60, expected_exception=Exception):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.expected_exception = expected_exception
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
self.lock = threading.Lock()
# Track recent requests for metrics
self.recent_requests = deque(maxlen=100)
def __call__(self, func):
def wrapper(*args, **kwargs):
with self.lock:
if self.state == CircuitState.OPEN:
# Check if timeout period has passed
if time.time() - self.last_failure_time >= self.timeout:
self.state = CircuitState.HALF_OPEN
self.failure_count = 0
else:
# Circuit is open - reject request
raise Exception("Circuit breaker is OPEN - service unavailable")
if self.state == CircuitState.HALF_OPEN:
# Test with single request
try:
result = func(*args, **kwargs)
self.state = CircuitState.CLOSED
self.failure_count = 0
self.record_request(success=True)
return result
except self.expected_exception as e:
self.state = CircuitState.OPEN
self.failure_count += 1
self.last_failure_time = time.time()
self.record_request(success=False)
raise e
# Normal operation (CLOSED state)
try:
result = func(*args, **kwargs)
self.failure_count = 0 # Reset on success
self.record_request(success=True)
return result
except self.expected_exception as e:
self.failure_count += 1
self.record_request(success=False)
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
self.last_failure_time = time.time()
raise e
return wrapper
def record_request(self, success):
self.recent_requests.append({
'timestamp': time.time(),
'success': success
})
def get_metrics(self):
now = time.time()
recent = [r for r in self.recent_requests if now - r['timestamp'] < 300] # Last 5 minutes
total_requests = len(recent)
successful_requests = sum(1 for r in recent if r['success'])
return {
'state': self.state.value,
'failure_count': self.failure_count,
'success_rate': successful_requests / total_requests if total_requests > 0 else 0,
'total_requests_5min': total_requests
}
# Usage in analytics dashboard
@CircuitBreaker(failure_threshold=3, timeout=30)
def get_inventory_data(product_ids):
"""Call to inventory service with circuit breaker"""
response = requests.get(
f"{INVENTORY_SERVICE_URL}/api/products",
params={'ids': ','.join(product_ids)},
timeout=5
)
response.raise_for_status()
return response.json()
@CircuitBreaker(failure_threshold=5, timeout=60)
def get_user_analytics():
"""Call to analytics service with circuit breaker"""
response = requests.get(f"{ANALYTICS_SERVICE_URL}/api/users/active", timeout=10)
response.raise_for_status()
return response.json()
# Fallback strategies
def get_dashboard_data():
try:
# Try to get real-time inventory data
inventory_data = get_inventory_data(['1', '2', '3'])
except Exception:
# Fallback to cached data
inventory_data = redis_client.get('inventory_cache')
if not inventory_data:
inventory_data = {"error": "Inventory data unavailable"}
try:
# Try to get user analytics
user_data = get_user_analytics()
except Exception:
# Fallback to approximate data
user_data = {"active_users": "~1000", "estimated": True}
return {
"inventory": inventory_data,
"users": user_data,
"timestamp": time.time()
}
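The Flask integration below reads metrics from inventory_circuit_breaker and analytics_circuit_breaker, which the anonymous decorator form above does not expose. One way to keep those handles, sketched here on top of the CircuitBreaker class defined earlier, is to instantiate named breakers and apply them as the decorators:
# Named breaker instances, used as @inventory_circuit_breaker / @analytics_circuit_breaker
# in place of the anonymous @CircuitBreaker(...) decorators above, so the
# endpoints below can call get_metrics() on each one.
inventory_circuit_breaker = CircuitBreaker(failure_threshold=3, timeout=30)
analytics_circuit_breaker = CircuitBreaker(failure_threshold=5, timeout=60)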
Service-Level Circuit Breaker Integration:
# Integration with Flask app
@app.route('/api/dashboard')
def dashboard_endpoint():
# Check circuit breaker states
circuit_states = {
'inventory': inventory_circuit_breaker.get_metrics(),
'analytics': analytics_circuit_breaker.get_metrics(),
}
# Include circuit breaker status in response
response_data = get_dashboard_data()
response_data['service_health'] = circuit_states
# Set appropriate HTTP status based on circuit states
if any(state['state'] == 'open' for state in circuit_states.values()):
return jsonify(response_data), 206 # Partial content
else:
return jsonify(response_data), 200
# Monitoring endpoint for circuit breaker states
@app.route('/api/health/circuits')
def circuit_breaker_health():
return jsonify({
'inventory_service': inventory_circuit_breaker.get_metrics(),
'analytics_service': analytics_circuit_breaker.get_metrics(),
})
Part 4: Advanced SRE & Operations
22. API Response Time Investigation Process
Strong Answer: Systematic Investigation Approach:
Step 1: Immediate Assessment (First 2 minutes)
# Quick checks to understand scope
echo "=== INCIDENT RESPONSE CHECKLIST ==="
echo "1. Checking service status..."
curl -w "%{http_code} %{time_total}s" -s -o /dev/null https://api.example.com/health
echo "2. Checking recent deployments..."
kubectl get pods --sort-by=.metadata.creationTimestamp
echo "3. Checking error rates..."
# Query Prometheus for error rate spike
curl -s 'http://prometheus:9090/api/v1/query?query=rate(http_requests_total{status=~"5.."}[5m])'
Step 2: Resource Analysis (Minutes 2-5)
# Automated investigation script
import subprocess
import json
import requests
class IncidentInvestigator:
def __init__(self):
self.findings = []
def check_resource_usage(self):
"""Check CPU, Memory, Disk I/O"""
# CPU utilization
cpu_query = 'avg(rate(container_cpu_usage_seconds_total[5m])) by (pod)'
cpu_data = self.query_prometheus(cpu_query)
for result in cpu_data:
cpu_usage = float(result['value'][1])
if cpu_usage > 0.8: # 80% threshold
self.findings.append(f"High CPU: {result['metric']['pod']} at {cpu_usage:.2%}")
# Memory usage
memory_query = 'container_memory_usage_bytes / container_spec_memory_limit_bytes'
memory_data = self.query_prometheus(memory_query)
for result in memory_data:
memory_usage = float(result['value'][1])
if memory_usage > 0.85: # 85% threshold
self.findings.append(f"High Memory: {result['metric']['pod']} at {memory_usage:.2%}")
def check_database_performance(self):
"""Check database connection pool, slow queries"""
# Connection pool utilization
db_conn_query = 'pg_stat_activity_count / pg_settings_max_connections'
db_data = self.query_prometheus(db_conn_query)
if db_data and float(db_data[0]['value'][1]) > 0.8:
self.findings.append("Database connection pool near capacity")
# Check for slow queries
slow_queries = subprocess.run([
'psql', '-h', 'postgres-primary', '-c',
"SELECT query, query_start, state, waiting FROM pg_stat_activity WHERE state = 'active' AND query_start < NOW() - INTERVAL '30 seconds';"
], capture_output=True, text=True)
if slow_queries.stdout and len(slow_queries.stdout.split('\n')) > 3:
self.findings.append("Long-running queries detected")
def check_external_dependencies(self):
"""Check downstream services"""
services = ['inventory-service', 'payment-service', 'user-service']
for service in services:
try:
response = requests.get(f'http://{service}/health', timeout=5)
if response.status_code != 200 or response.elapsed.total_seconds() > 1.0:
self.findings.append(f"{service} health check failed or slow")
except requests.RequestException:
self.findings.append(f"{service} unreachable")
def check_recent_changes(self):
"""Check recent deployments and config changes"""
# Get recent pod restarts
pods = subprocess.run([
'kubectl', 'get', 'pods', '-o', 'json'
], capture_output=True, text=True)
pod_data = json.loads(pods.stdout)
for pod in pod_data['items']:
restart_count = sum(cs.get('restartCount', 0) for cs in pod['status'].get('containerStatuses', []))
if restart_count > 0:
self.findings.append(f"Pod {pod['metadata']['name']} has {restart_count} restarts")
def query_prometheus(self, query):
"""Helper to query Prometheus"""
try:
response = requests.get(
'http://prometheus:9090/api/v1/query',
params={'query': query},
timeout=10
)
return response.json()['data']['result']
except:
return []
def investigate(self):
"""Run full investigation"""
print("๐ Starting incident investigation...")
self.check_resource_usage()
self.check_database_performance()
self.check_external_dependencies()
self.check_recent_changes()
print("\n๐ Investigation Summary:")
if self.findings:
for finding in self.findings:
print(f"โ ๏ธ {finding}")
else:
print("โ
No obvious issues found in automated checks")
return self.findings
# Run investigation
investigator = IncidentInvestigator()
findings = investigator.investigate()
Step 3: Deep Dive Analysis (Minutes 5-15)
# Application-level investigation
echo "4. Checking application metrics..."
# Look for specific endpoint latency
kubectl exec -it $(kubectl get pods -l app=api -o jsonpath='{.items[0].metadata.name}') -- \
curl -s localhost:8080/metrics | grep http_request_duration
# Check garbage collection metrics (for Java/Go apps)
kubectl logs $(kubectl get pods -l app=api -o jsonpath='{.items[0].metadata.name}') | \
grep -i "gc\|garbage"
# Database query analysis
psql -h postgres-primary -c "
SELECT query, calls, total_time, mean_time, stddev_time
FROM pg_stat_statements
WHERE mean_time > 1000
ORDER BY mean_time DESC
LIMIT 10;"
# Check for lock contention
psql -h postgres-primary -c "
SELECT blocked_locks.pid AS blocked_pid,
blocked_activity.usename AS blocked_user,
blocking_locks.pid AS blocking_pid,
blocking_activity.usename AS blocking_user,
blocked_activity.query AS blocked_statement
FROM pg_catalog.pg_locks blocked_locks
JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid
JOIN pg_catalog.pg_locks blocking_locks ON (blocked_locks.locktype = blocking_locks.locktype)
JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid
WHERE NOT blocked_locks.granted;"
Step 4: Resolution and Communication
# Incident response actions
class IncidentResponse:
def __init__(self):
self.slack_webhook = "https://hooks.slack.com/services/..."
def notify_team(self, message, severity="warning"):
"""Send alert to incident response channel"""
payload = {
"text": f"๐จ API Performance Incident",
"attachments": [{
"color": "danger" if severity == "critical" else "warning",
"fields": [
{"title": "Issue", "value": message, "short": False},
{"title": "Runbook", "value": "https://wiki.company.com/incident-response", "short": True},
{"title": "Investigator", "value": "SRE On-Call", "short": True}
]
}]
}
requests.post(self.slack_webhook, json=payload)
def apply_immediate_fix(self, issue_type):
"""Apply quick fixes based on identified issue"""
if issue_type == "high_cpu":
# Scale up pods
subprocess.run(['kubectl', 'scale', 'deployment/api', '--replicas=10'])
elif issue_type == "database_load":
# Enable read-only mode temporarily
subprocess.run(['kubectl', 'patch', 'configmap/app-config',
'--patch', '{"data":{"READ_ONLY_MODE":"true"}}'])
elif issue_type == "memory_leak":
# Rolling restart
subprocess.run(['kubectl', 'rollout', 'restart', 'deployment/api'])
elif issue_type == "external_service":
# Enable circuit breaker
subprocess.run(['kubectl', 'patch', 'configmap/app-config',
'--patch', '{"data":{"CIRCUIT_BREAKER_ENABLED":"true"}}'])
# Usage
incident = IncidentResponse()
incident.notify_team("API latency increased from 100ms to 2s. Investigation in progress.")
# Based on findings, apply fixes
if "High CPU" in findings:
incident.apply_immediate_fix("high_cpu")
incident.notify_team("Applied fix: Scaled up API pods to handle load")
23. Error Budget Management Scenario
Strong Answer: Error Budget Analysis:
First, let me calculate the current situation:
- SLO: 99.5% uptime (0.5% error budget per month)
- Current Error Rate: 20% increase = baseline + 20%
- Deployment Impact: Caused the increase
- Error Budget Status: Likely within budget (see the quick calculation below)
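Putting rough numbers on that budget makes the decision easier to reason about. A quick back-of-the-envelope calculation, assuming a 30-day month and treating the SLO as a request success rate:
# Back-of-the-envelope error budget for a 99.5% monthly SLO
slo_target = 0.995
error_budget_ratio = 1 - slo_target          # 0.005 -> 0.5% of requests may fail

monthly_minutes = 30 * 24 * 60               # 43,200 minutes in a 30-day month
downtime_budget_minutes = monthly_minutes * error_budget_ratio
print(f"Allowed full-outage time: {downtime_budget_minutes:.0f} minutes (~3.6 hours)")

# If the baseline error rate is 0.10% and the deploy pushed it to 0.12%
# (a 20% relative increase), the projected monthly consumption is:
projected_error_ratio = 0.0012
print(f"Projected budget use: {projected_error_ratio / error_budget_ratio:.0%} of budget")
# -> 24% of the monthly budget: within budget, but worth watching.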
Decision Framework:
class ErrorBudgetManager:
def __init__(self):
self.slo_target = 0.995 # 99.5% success rate
self.error_budget = 1 - self.slo_target # 0.5% error budget
def calculate_current_burn_rate(self, error_rate, time_period_hours):
"""Calculate how fast we're burning error budget"""
monthly_hours = 30 * 24 # 720 hours
burn_rate = (error_rate * time_period_hours) / monthly_hours
return burn_rate
def assess_situation(self, baseline_error_rate, current_error_rate, hours_since_deploy):
"""Assess if we should keep feature or rollback"""
# Calculate actual error rates
error_increase = current_error_rate - baseline_error_rate
# Calculate error budget consumption
budget_burned = self.calculate_current_burn_rate(error_increase, hours_since_deploy)
remaining_budget = self.error_budget - budget_burned
# Projected monthly error rate if the elevated rate persists for the full month
monthly_projection = current_error_rate
assessment = {
"current_error_rate": current_error_rate,
"error_budget_burned": budget_burned,
"remaining_budget": remaining_budget,
"monthly_projection": monthly_projection,
"exceeds_budget": monthly_projection > self.error_budget
}
return assessment
def make_recommendation(self, assessment, business_impact):
"""Provide recommendation based on data"""
if assessment["exceeds_budget"]:
return {
"action": "ROLLBACK",
"reason": "Feature will exceed monthly error budget",
"timeline": "Immediate"
}
elif assessment["remaining_budget"] < 0.001: # Less than 0.1% budget left
return {
"action": "ENHANCED_MONITORING",
"reason": "Close to budget limit, monitor closely",
"conditions": "Rollback if error rate increases further"
}
else:
return {
"action": "KEEP_WITH_CONDITIONS",
"reason": "Within error budget but requires monitoring",
"conditions": [
"Implement feature flag for quick disable",
"Set up aggressive alerting",
"Daily budget review meetings"
]
}
# Real scenario analysis
budget_manager = ErrorBudgetManager()
# Assume baseline was a 0.10% error rate, now 0.12% (a 20% relative increase)
assessment = budget_manager.assess_situation(
baseline_error_rate=0.001, # 0.10%
current_error_rate=0.0012, # 0.12%
hours_since_deploy=6
)
recommendation = budget_manager.make_recommendation(assessment, business_impact="high")
Communication Strategy:
def communicate_decision(assessment, recommendation, stakeholders):
"""Structured communication to all stakeholders"""
message = f"""
๐ ERROR BUDGET ASSESSMENT - {datetime.now().strftime('%Y-%m-%d %H:%M')}
๐ฏ SLO Target: 99.5% success rate (0.5% monthly error budget)
๐ Current Situation:
โข Error rate: {assessment['current_error_rate']:.3%}
โข Budget burned: {assessment['error_budget_burned']:.3%}
โข Remaining budget: {assessment['remaining_budget']:.3%}
โข Monthly projection: {assessment['monthly_projection']:.3%}
๐ Analysis:
{'โ Will exceed budget' if assessment['exceeds_budget'] else 'โ
Within budget'}
๐ฏ Recommendation: {recommendation['action']}
Reason: {recommendation['reason']}
โก Next Steps:
"""
if recommendation['action'] == 'ROLLBACK':
message += """
1. Immediate rollback of feature
2. Root cause analysis
3. Fix implementation before retry
4. Post-mortem scheduled
"""
elif recommendation['action'] == 'KEEP_WITH_CONDITIONS':
message += f"""
1. Implement monitoring conditions
2. Set up automated alerts
3. Daily review meetings
4. Feature flag for quick disable
Conditions: {recommendation['conditions']}
"""
# Send to different channels based on audience
send_to_slack('#sre-team', message)
send_to_email(stakeholders['engineering'], message)
send_executive_summary(stakeholders['leadership'])
def send_executive_summary(executives):
"""Simplified version for executives"""
summary = f"""
Feature Deployment Impact Assessment
Status: {'Action Required' if recommendation['action'] == 'ROLLBACK' else 'Monitoring'}
Decision: {recommendation['action']}
Business Impact: Minimal customer impact, within reliability targets
Engineering is {recommendation['action'].lower().replace('_', ' ')} and monitoring closely.
"""
send_to_email(executives, summary)
Implementation Plan:
# Feature flag for quick disable
apiVersion: v1
kind: ConfigMap
metadata:
name: feature-flags
data:
new-feature-enabled: "true"
error-budget-threshold: "0.004" # 0.4% - alert threshold
---
# Enhanced monitoring
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
name: error-budget-alerts
spec:
groups:
- name: error-budget
rules:
- alert: ErrorBudgetBurnRateHigh
expr: |
(
rate(http_requests_total{status=~"5.."}[1h]) /
rate(http_requests_total[1h])
) > 0.004
for: 5m
labels:
severity: critical
annotations:
summary: "Error budget burn rate too high"
- alert: ErrorBudgetProjectedExceed
expr: |
(
rate(http_requests_total{status=~"5.."}[6h]) /
rate(http_requests_total[6h])
) > 0.005 # 6h error ratio already above the 0.5% monthly budget, so on track to exceed it
for: 10m
labels:
severity: warning
annotations:
summary: "Projected to exceed monthly error budget"
24. Load Testing Strategy Design
Strong Answer: Comprehensive Load Testing Strategy:
# Load test implementation using Locust
from locust import HttpUser, task, between
import random
import json
class DashboardUser(HttpUser):
wait_time = between(1, 5) # 1-5 second wait between requests
def on_start(self):
"""Setup user session"""
# Authenticate user
response = self.client.post("/api/auth/login", json={
"username": f"loadtest_user_{random.randint(1, 1000)}",
"password": "test_password"
})
if response.status_code == 200:
self.auth_token = response.json().get("token")
self.client.headers.update({"Authorization": f"Bearer {self.auth_token}"})
@task(3) # 3x weight - most common operation
def view_realtime_dashboard(self):
"""Simulate viewing real-time dashboard"""
self.client.get("/api/metrics/realtime")
@task(2) # 2x weight
def view_historical_data(self):
"""Simulate historical data requests"""
periods = ["1h", "6h", "24h", "7d"]
metrics = ["revenue", "orders", "active_users"]
period = random.choice(periods)
metric = random.choice(metrics)
self.client.get(f"/api/metrics/historical", params={
"metric": metric,
"period": period,
"granularity": "1m" if period in ["1h", "6h"] else "1h"
})
@task(1) # 1x weight - less common
def configure_alerts(self):
"""Simulate alert configuration"""
self.client.post("/api/alerts", json={
"metric": "inventory_level",
"threshold": random.randint(50, 200),
"product_id": f"product_{random.randint(1, 100)}"
})
@task(4) # 4x weight - WebSocket simulation via polling
def websocket_simulation(self):
"""Simulate WebSocket updates via HTTP polling"""
self.client.get("/api/metrics/stream", params={
"last_update": "2025-06-30T10:30:00Z"
})
class BurstTrafficUser(DashboardUser):
"""Simulates traffic spikes during incidents"""
wait_time = between(0.1, 0.5) # Much faster requests
@task(5)
def rapid_dashboard_refresh(self):
"""Rapid refreshing during incidents"""
self.client.get("/api/metrics/realtime")
# Load test scenarios
class LoadTestScenarios:
def __init__(self):
self.scenarios = {
"normal_load": {
"users": 100,
"spawn_rate": 10,
"duration": "10m",
"user_class": DashboardUser
},
"peak_load": {
"users": 500,
"spawn_rate": 50,
"duration": "15m",
"user_class": DashboardUser
},
"stress_test": {
"users": 1000,
"spawn_rate": 100,
"duration": "20m",
"user_class": DashboardUser
},
"spike_test": {
"users": 200,
"spawn_rate": 200, # Instant spike
"duration": "5m",
"user_class": BurstTrafficUser
},
"endurance_test": {
"users": 300,
"spawn_rate": 30,
"duration": "2h", # Long running
"user_class": DashboardUser
}
}
Infrastructure Load Testing:
#!/bin/bash
# load-test-runner.sh
echo "๐ Starting Analytics Dashboard Load Tests"
# 1. Baseline Performance Test
echo "๐ Running baseline performance test..."
locust -f dashboard_load_test.py \
--users 50 \
--spawn-rate 10 \
--run-time 5m \
--host https://api-staging.example.com \
--html reports/baseline_$(date +%Y%m%d_%H%M%S).html
# 2. Database Load Test
echo "๐๏ธ Testing database performance..."
# Simulate heavy analytics queries
for i in {1..100}; do
curl -s "https://api-staging.example.com/api/metrics/historical?period=30d&metric=revenue" &
done
wait
# 3. WebSocket Load Test
echo "๐ Testing WebSocket capacity..."
# Use websocket-king for WebSocket load testing
websocket-king \
--url wss://api-staging.example.com/metrics/stream \
--connections 1000 \
--duration 10m \
--rate 10 \
--output ws_load_report.json
# 4. Cache Performance Test
echo "๐พ Testing cache performance..."
# Warm up cache, then test cache hit rates
redis-cli --latency-history -h redis-staging.example.com
# 5. Auto-scaling Test
echo "๐ Testing auto-scaling behavior..."
# Gradually increase load to trigger auto-scaling
locust -f dashboard_load_test.py \
--users 1000 \
--spawn-rate 20 \
--run-time 30m \
--host https://api-staging.example.com \
--html reports/autoscale_test_$(date +%Y%m%d_%H%M%S).html
echo "โ
Load testing complete. Check reports/ directory for results."
Performance Metrics Collection:
# Custom metrics collection during load testing
import psutil
import time
import json
from datetime import datetime
class PerformanceMonitor:
def __init__(self):
self.metrics = []
self.monitoring = False
def start_monitoring(self):
"""Start collecting system metrics"""
self.monitoring = True
while self.monitoring:
metrics = {
"timestamp": datetime.now().isoformat(),
"cpu_percent": psutil.cpu_percent(interval=1),
"memory_percent": psutil.virtual_memory().percent,
"disk_io": psutil.disk_io_counters()._asdict(),
"network_io": psutil.net_io_counters()._asdict(),
"connections": len(psutil.net_connections()),
}
# Database metrics
metrics["db_connections"] = self.get_db_connections()
metrics["cache_hit_rate"] = self.get_cache_hit_rate()
self.metrics.append(metrics)
time.sleep(5) # Collect every 5 seconds
def get_db_connections(self):
"""Get database connection count"""
try:
import psycopg2
conn = psycopg2.connect("postgresql://user:pass@db:5432/analytics")
cursor = conn.cursor()
cursor.execute("SELECT count(*) FROM pg_stat_activity;")
return cursor.fetchone()[0]
except:
return None
def get_cache_hit_rate(self):
"""Get Redis cache hit rate"""
try:
import redis
r = redis.Redis(host='redis-cluster')
info = r.info('stats')
hits = info['keyspace_hits']
misses = info['keyspace_misses']
return hits / (hits + misses) if (hits + misses) > 0 else 0
except:
return None
def stop_monitoring(self):
"""Stop monitoring and save results"""
self.monitoring = False
# Save metrics to file
with open(f"performance_metrics_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", 'w') as f:
json.dump(self.metrics, f, indent=2)
# Generate summary report
self.generate_summary()
def generate_summary(self):
"""Generate performance summary"""
if not self.metrics:
return
cpu_values = [m['cpu_percent'] for m in self.metrics if m['cpu_percent']]
memory_values = [m['memory_percent'] for m in self.metrics if m['memory_percent']]
summary = {
"test_duration": len(self.metrics) * 5, # 5 second intervals
"avg_cpu": sum(cpu_values) / len(cpu_values),
"max_cpu": max(cpu_values),
"avg_memory": sum(memory_values) / len(memory_values),
"max_memory": max(memory_values),
"peak_connections": max([m.get('connections', 0) for m in self.metrics])
}
print("๐ Performance Test Summary:")
print(f" Duration: {summary['test_duration']} seconds")
print(f" Avg CPU: {summary['avg_cpu']:.1f}%")
print(f" Max CPU: {summary['max_cpu']:.1f}%")
print(f" Avg Memory: {summary['avg_memory']:.1f}%")
print(f" Max Memory: {summary['max_memory']:.1f}%")
print(f" Peak Connections: {summary['peak_connections']}")
# Usage with load test
monitor = PerformanceMonitor()
import threading
# Start monitoring in background
monitor_thread = threading.Thread(target=monitor.start_monitoring)
monitor_thread.start()
# Run load test
# ... run locust or other load testing tool ...
# Stop monitoring
monitor.stop_monitoring()
monitor_thread.join()
Test Criteria and Success Metrics:
# Load test success criteria
load_test_criteria:
response_time:
p50: < 200ms
p95: < 500ms
p99: < 1000ms
throughput:
normal_load: > 1000 RPS
peak_load: > 5000 RPS
error_rate:
max_acceptable: 0.1%
resource_utilization:
cpu: < 80%
memory: < 85%
database_connections: < 80% of pool
auto_scaling:
scale_up_time: < 2 minutes
scale_down_time: < 5 minutes
cache_performance:
hit_rate: > 90%
eviction_rate: < 5%
# Failure conditions (auto-stop test)
failure_conditions:
- error_rate > 1% for 2 minutes
- p99_latency > 5000ms for 5 minutes
- cpu_utilization > 95% for 3 minutes
- database_connection_pool > 95%
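These thresholds are easiest to enforce as an automated pass/fail gate after each run. A small sketch that checks a results summary against the latency and error-rate criteria above; the shape of the stats dictionary is an assumption and would need to match however the load-test results are actually exported:
# Pass/fail gate for a load test run (stats dictionary shape is an assumption)
CRITERIA = {
    "p50_ms": 200,
    "p95_ms": 500,
    "p99_ms": 1000,
    "max_error_rate": 0.001,   # 0.1%
}

def evaluate_run(stats: dict) -> bool:
    """stats example: {"p50_ms": 150, "p95_ms": 420, "p99_ms": 800,
    "requests": 120000, "failures": 60}"""
    failures = []
    for pct in ("p50_ms", "p95_ms", "p99_ms"):
        if stats[pct] > CRITERIA[pct]:
            failures.append(f"{pct}: {stats[pct]}ms exceeds {CRITERIA[pct]}ms")
    error_rate = stats["failures"] / max(stats["requests"], 1)
    if error_rate > CRITERIA["max_error_rate"]:
        failures.append(f"error rate {error_rate:.3%} exceeds {CRITERIA['max_error_rate']:.1%}")
    for failure in failures:
        print(f"FAIL {failure}")
    return not failures  # True means the run meets all criteria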
25. Go Service CPU Investigation
Strong Answer: Systematic CPU Investigation Process:
// 1. Enable pprof in Go service for CPU profiling
package main
import (
"log"
"net/http"
_ "net/http/pprof" // Import pprof
"runtime"
"syscall"
"time"
)
func main() {
// Start pprof server
go func() {
log.Println("Starting pprof server on :6060")
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
// Set GOMAXPROCS to container CPU limit
runtime.GOMAXPROCS(2) // Adjust based on container resources
// Your application code
startApplication()
}
// 2. Add CPU monitoring middleware
func CPUMonitoringMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
// Record CPU usage before request
var rusageBefore syscall.Rusage
syscall.Getrusage(syscall.RUSAGE_SELF, &rusageBefore)
next.ServeHTTP(w, r)
// Record CPU usage after request
var rusageAfter syscall.Rusage
syscall.Getrusage(syscall.RUSAGE_SELF, &rusageAfter)
duration := time.Since(start)
// Rusage.Utime is a syscall.Timeval; convert to nanoseconds before subtracting
cpuTime := time.Duration(rusageAfter.Utime.Nano() - rusageBefore.Utime.Nano())
// Log high CPU requests
if cpuTime > 100*time.Millisecond {
log.Printf("High CPU request: %s %s - Duration: %v, CPU: %v",
r.Method, r.URL.Path, duration, cpuTime)
}
})
}
Investigation Tools and Commands:
#!/bin/bash
# cpu-investigation.sh
echo "๐ Investigating Go service CPU usage..."
# 1. Get current CPU profile (30 seconds)
echo "๐ Collecting CPU profile..."
go tool pprof -http=:8080 http://localhost:6060/debug/pprof/profile?seconds=30
# 2. Check for goroutine leaks
echo "๐งต Checking goroutine count..."
curl -s http://localhost:6060/debug/pprof/goroutine?debug=1 | head -20
# 3. Memory allocation profile (may cause CPU spikes)
echo "๐พ Checking memory allocations..."
go tool pprof http://localhost:6060/debug/pprof/allocs
# 4. Check GC performance
echo "๐๏ธ Checking garbage collection stats..."
curl -s http://localhost:6060/debug/vars | jq '.memstats'
# 5. Container-level CPU investigation
echo "๐ณ Container CPU stats..."
docker stats --no-stream $(docker ps --filter "name=go-service" --format "{{.Names}}")
# 6. Process-level analysis
echo "โ๏ธ Process CPU breakdown..."
top -H -p $(pgrep go-service) -n 1
# 7. strace for system call analysis
echo "๐ง System call analysis (10 seconds)..."
timeout 10s strace -c -p $(pgrep go-service)
Code-Level Optimizations:
// Common CPU bottleneck fixes
// 1. Fix: Inefficient JSON parsing
// BEFORE - Slow JSON handling
func processRequestSlow(w http.ResponseWriter, r *http.Request) {
var data map[string]interface{}
body, _ := ioutil.ReadAll(r.Body)
json.Unmarshal(body, &data)
// Process data...
}
// AFTER - Optimized JSON handling
type RequestData struct {
UserID string `json:"user_id"`
Action string `json:"action"`
// Define specific fields instead of interface{}
}
func processRequestFast(w http.ResponseWriter, r *http.Request) {
var data RequestData
decoder := json.NewDecoder(r.Body)
decoder.DisallowUnknownFields() // Reject unexpected fields instead of silently accepting them
if err := decoder.Decode(&data); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// Process typed data...
}
// 2. Fix: CPU-intensive loops
// BEFORE - O(nยฒ) algorithm
func findDuplicatesSlow(items []string) []string {
var duplicates []string
for i := 0; i < len(items); i++ {
for j := i + 1; j < len(items); j++ {
if items[i] == items[j] {
duplicates = append(duplicates, items[i])
break
}
}