SRE Interview Questions - Comprehensive Guide
This guide covers Site Reliability Engineering interview questions with detailed answers, code examples, and practical solutions for high-performance systems.
**Part 1: Performance Optimization & Go Specifics**

**Go Performance Optimization Examples**

**1. Efficient Duplicate Detection**
```go
// BEFORE - O(n²) algorithm
func findDuplicatesSlow(items []string) []string {
    var duplicates []string
    for i := 0; i < len(items); i++ {
        for j := i + 1; j < len(items); j++ {
            if items[i] == items[j] {
                duplicates = append(duplicates, items[i])
                break
            }
        }
    }
    return duplicates
}

// AFTER - O(n) algorithm using a map
func findDuplicatesFast(items []string) []string {
    seen := make(map[string]bool)
    var duplicates []string
    for _, item := range items {
        if seen[item] {
            duplicates = append(duplicates, item)
        } else {
            seen[item] = true
        }
    }
    return duplicates
}
```
**2. String Concatenation Optimization**
```go
// BEFORE - Creates a new string on every iteration
func buildResponseSlow(data []Record) string {
    var result string
    for _, record := range data {
        result += record.ID + "," + record.Name + "\n" // Allocates a new string each time
    }
    return result
}

// AFTER - Use strings.Builder for efficiency
func buildResponseFast(data []Record) string {
    var builder strings.Builder
    builder.Grow(len(data) * 50) // Pre-allocate estimated capacity
    for _, record := range data {
        builder.WriteString(record.ID)
        builder.WriteString(",")
        builder.WriteString(record.Name)
        builder.WriteString("\n")
    }
    return builder.String()
}
```
**3. Goroutine Management**
```go
// BEFORE - Goroutines spawned without cleanup or backpressure
func handleRequestsLeaky() {
    for {
        go func() {
            // Long-running operation without context cancellation
            processData() // Never exits!
        }()
    }
}

// AFTER - Bounded concurrency with context cancellation
func handleRequestsProper(ctx context.Context) {
    semaphore := make(chan struct{}, 100) // Limit concurrent goroutines
    for {
        select {
        case <-ctx.Done():
            return
        default:
            semaphore <- struct{}{} // Acquire
            go func() {
                defer func() { <-semaphore }() // Release
                // Use context for cancellation
                processDataWithContext(ctx)
            }()
        }
    }
}
```
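If the work arrives on a channel, the same bounded-concurrency pattern can be written more compactly with `golang.org/x/sync/errgroup`, whose `SetLimit` replaces the hand-rolled semaphore. A minimal sketch, with hypothetical `Job` and `processJob` standing in for the work above:

```go
import (
    "context"

    "golang.org/x/sync/errgroup"
)

func handleRequestsErrgroup(ctx context.Context, jobs <-chan Job) error {
    g, ctx := errgroup.WithContext(ctx)
    g.SetLimit(100) // at most 100 workers in flight, like the semaphore above

    for {
        select {
        case <-ctx.Done():
            return g.Wait()
        case job, ok := <-jobs:
            if !ok {
                return g.Wait() // channel closed: wait for remaining workers
            }
            g.Go(func() error {
                return processJob(ctx, job) // hypothetical per-job worker
            })
        }
    }
}
```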
**4. Database Query Optimization**
```go
// BEFORE - N+1 query problem
func getUserDataSlow(userIDs []string) []UserData {
    var users []UserData
    for _, id := range userIDs {
        user := db.QueryUser(id) // One database round trip per user!
        users = append(users, user)
    }
    return users
}

// AFTER - One batched, parameterized query for all users.
// Never build the IN list by concatenating raw IDs - that is a SQL
// injection risk and breaks on string IDs. Use placeholders instead.
func getUserDataFast(userIDs []string) []UserData {
    placeholders := make([]string, len(userIDs))
    args := make([]interface{}, len(userIDs))
    for i, id := range userIDs {
        placeholders[i] = "?"
        args[i] = id
    }
    query := "SELECT * FROM users WHERE id IN (" + strings.Join(placeholders, ",") + ")"
    return db.QueryUsers(query, args...) // assumes QueryUsers accepts bind arguments
}
```
**5. Memory and GC Optimization**
```go
// Optimize garbage collection pressure
type MetricsCollector struct {
    // BEFORE - []map[string]interface{} creates garbage on every metric
    // AFTER - Use object pools and typed structs
    metricPool sync.Pool
    metrics    []Metric
}

type Metric struct {
    Name      string
    Value     float64
    Timestamp int64
}

func NewMetricsCollector() *MetricsCollector {
    mc := &MetricsCollector{
        metrics: make([]Metric, 0, 1000), // Pre-allocate capacity
    }
    mc.metricPool = sync.Pool{
        New: func() interface{} {
            return &Metric{}
        },
    }
    return mc
}

func (mc *MetricsCollector) AddMetric(name string, value float64) {
    metric := mc.metricPool.Get().(*Metric)
    metric.Name = name
    metric.Value = value
    metric.Timestamp = time.Now().Unix()
    mc.metrics = append(mc.metrics, *metric) // Copied by value into the slice
    // Return the scratch struct to the pool for reuse
    mc.metricPool.Put(metric)
}
```
**6. Continuous Profiling**
```go
// CPU profiling integration
func enableContinuousProfiling() {
    if os.Getenv("ENABLE_PROFILING") != "true" {
        return
    }
    go func() {
        for {
            f, err := os.Create(fmt.Sprintf("cpu-profile-%d.prof", time.Now().Unix()))
            if err != nil {
                log.Printf("Could not create CPU profile: %v", err)
                time.Sleep(30 * time.Second)
                continue
            }
            if err := pprof.StartCPUProfile(f); err != nil {
                log.Printf("Could not start CPU profile: %v", err)
                f.Close()
                continue
            }
            time.Sleep(30 * time.Second)
            pprof.StopCPUProfile()
            f.Close()
            // Upload to object storage for analysis
            uploadProfile(f.Name())
        }
    }()
}
```
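A common alternative to writing profile files yourself is to expose the standard `net/http/pprof` endpoints and pull profiles on demand; this is also the setup the troubleshooting script in Part 5 assumes. A minimal sketch (the port choice is arbitrary):

```go
import (
    "log"
    "net/http"
    _ "net/http/pprof" // registers /debug/pprof/* on http.DefaultServeMux
)

func startPprofServer() {
    go func() {
        // Fetch a 30-second CPU profile on demand with:
        //   go tool pprof "http://localhost:6060/debug/pprof/profile?seconds=30"
        log.Println(http.ListenAndServe("localhost:6060", nil))
    }()
}
```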
**Part 2: Monitoring and Alerting**

**Prometheus Monitoring Rules**

```yaml
# Prometheus rules for Go service CPU monitoring
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: go-service-cpu-alerts
spec:
  groups:
    - name: go-service-performance
      rules:
        - alert: GoServiceHighCPU
          expr: |
            rate(container_cpu_usage_seconds_total{pod=~"go-service-.*"}[5m]) > 0.8
          for: 5m
          labels:
            severity: warning
            service: go-service
          annotations:
            summary: "Go service CPU usage above 80%"
            description: "Pod {{ $labels.pod }} is using {{ $value | humanizePercentage }} of a CPU core"
        - alert: GoServiceGoroutineLeak
          expr: |
            go_goroutines{job="go-service"} > 10000
          for: 10m
          labels:
            severity: critical
          annotations:
            summary: "Potential goroutine leak detected"
            description: "Service {{ $labels.instance }} has {{ $value }} goroutines"
        - alert: GoServiceGCPressure
          expr: |
            rate(go_gc_duration_seconds_sum[5m]) > 0.1
          for: 5m
          labels:
            severity: warning
          annotations:
            summary: "High GC pressure in Go service"
            description: "Service is spending {{ $value | humanizePercentage }} of its time in GC"
        - alert: GoServiceMemoryLeak
          expr: |
            go_memstats_heap_inuse_bytes / go_memstats_heap_sys_bytes > 0.9
          for: 15m
          labels:
            severity: critical
          annotations:
            summary: "Potential memory leak in Go service"
            description: "Heap utilization is {{ $value | humanizePercentage }}"
```
**Performance Testing and Validation**

```go
// Benchmark tests to validate optimizations
func BenchmarkProcessRequestSlow(b *testing.B) {
    data := generateTestData(1000)
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        processRequestSlow(data)
    }
}

func BenchmarkProcessRequestFast(b *testing.B) {
    data := generateTestData(1000)
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        processRequestFast(data)
    }
}

// Run benchmarks with memory profiling:
//   go test -bench=. -benchmem -cpuprofile=cpu.prof -memprofile=mem.prof

// Analyze the resulting CPU profile from a shell:
//   go tool pprof cpu.prof
// Useful pprof commands:
//   (pprof) top20        - Show top 20 CPU consumers
//   (pprof) list <func>  - Show source code annotated with CPU usage
//   (pprof) web          - Generate a graph visualization
// For flame graphs, use the interactive web UI instead:
//   go tool pprof -http=:8080 cpu.prof
```
**Part 3: System Architecture Patterns**

**Microservices vs Monolithic Architecture**

**Strong Answer:**

**Analogy - Car Pool vs Train:**
Microservices (Car Pool Approach):
🚗 🚗 🚗 🚗 🚗 (Independent cars)
Each car (service) can:
- Take different routes
- Stop independently
- Break down without affecting others
- Scale by adding more cars
- Use different fuel types (technologies)
Monolithic (Train Approach):
🚂-🚃-🚃-🚃-🚃 (Connected train)
The train (application):
- All cars must follow the same route
- If engine fails, entire train stops
- All cars must move together
- Scale by making train longer or faster
- Single fuel type for entire train
**Decision Framework for Analytics Dashboard:**

```python
# Decision matrix for architecture choice
class ArchitectureDecision:
    def __init__(self):
        self.factors = {
            'team_size': 0,
            'complexity': 0,
            'scalability_needs': 0,
            'technology_diversity': 0,
            'deployment_frequency': 0,
            'operational_maturity': 0
        }

    def assess_microservices_fit(self, dashboard_requirements):
        """Assess if microservices are appropriate for an analytics dashboard."""
        # Analytics dashboard components
        services = {
            'metrics_collector': {
                'responsibility': 'Collect metrics from various sources',
                'scalability': 'High - handles high volume ingestion',
                'technology': 'Go - for performance'
            },
            'data_processor': {
                'responsibility': 'Process and aggregate metrics',
                'scalability': 'Medium - CPU intensive operations',
                'technology': 'Python - for data processing libraries'
            },
            'api_gateway': {
                'responsibility': 'Serve dashboard APIs',
                'scalability': 'High - many concurrent users',
                'technology': 'Node.js - for async I/O'
            },
            'notification_service': {
                'responsibility': 'Send alerts and notifications',
                'scalability': 'Low - occasional alerts',
                'technology': 'Python - for integrations'
            },
            'frontend_bff': {
                'responsibility': 'Backend for Frontend',
                'scalability': 'Medium - aggregates data for UI',
                'technology': 'React/TypeScript'
            }
        }
        return self.evaluate_services(services)

    def evaluate_services(self, services):
        """Evaluate the microservices approach."""
        benefits = [
            "Independent scaling per service",
            "Technology diversity (Go, Python, Node.js)",
            "Team autonomy - different teams own different services",
            "Fault isolation - metrics collection failure doesn't break UI",
            "Independent deployments - can update notifications without affecting the API"
        ]
        challenges = [
            "Network latency between services",
            "Data consistency across services",
            "Distributed system complexity",
            "Service discovery and load balancing",
            "Monitoring and debugging across services"
        ]
        return {
            'benefits': benefits,
            'challenges': challenges,
            'recommendation': self.make_recommendation()
        }

    def make_recommendation(self):
        """Make an architecture recommendation for the analytics dashboard."""
        if self.is_early_stage():
            return {
                'choice': 'MODULAR_MONOLITH',
                'reason': 'Start simple, can extract services later',
                'structure': self.modular_monolith_structure()
            }
        return {
            'choice': 'MICROSERVICES',
            'reason': 'Scale and team benefits outweigh complexity',
            'structure': self.microservices_structure()
        }

    def modular_monolith_structure(self):
        """Modular monolith approach - best of both worlds."""
        return {
            'structure': """
            analytics-dashboard/
            ├── cmd/                # Application entry points
            ├── internal/
            │   ├── metrics/        # Metrics collection module
            │   ├── processing/     # Data processing module
            │   ├── api/            # API handling module
            │   ├── notifications/  # Alert module
            │   └── dashboard/      # UI serving module
            ├── pkg/                # Shared libraries
            └── deployments/        # Single deployment unit
            """,
            'benefits': [
                'Single deployment and testing',
                'Easier debugging and development',
                'No network latency between modules',
                'Simpler operational overhead',
                'Can extract to microservices later'
            ]
        }

    def microservices_structure(self):
        """Full microservices approach."""
        return {
            'structure': """
            Analytics Platform Microservices:
            ┌─────────────────┐    ┌─────────────────┐
            │  Frontend SPA   │    │   API Gateway   │
            │     (React)     │◄──►│     (Kong)      │
            └─────────────────┘    └────────┬────────┘
                                            │
                     ┌──────────────────────┼──────────────────────┐
                     │                      │                      │
           ┌─────────▼────────┐   ┌─────────▼────────┐   ┌─────────▼─────────┐
           │ Metrics Collector│   │  Data Processor  │   │ Notification Svc  │
           │       (Go)       │   │     (Python)     │   │     (Python)      │
           └─────────┬────────┘   └─────────┬────────┘   └───────────────────┘
                     │                      │
           ┌─────────▼──────────────────────▼─────────┐
           │              Message Queue               │
           │              (Kafka/Redis)               │
           └──────────────────────────────────────────┘
            """,
            'communication': 'Async messaging + HTTP APIs',
            'data_strategy': 'Event sourcing with CQRS'
        }

    def is_early_stage(self):
        """Determine if the project is in an early stage."""
        return (
            self.factors['team_size'] < 10 and
            self.factors['operational_maturity'] < 3
        )
```
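A short usage sketch of the class above; the factor values are made up for illustration:

```python
decision = ArchitectureDecision()
decision.factors.update({'team_size': 6, 'operational_maturity': 2})

result = decision.assess_microservices_fit(dashboard_requirements={})
print(result['recommendation']['choice'])  # MODULAR_MONOLITH for a small, young team
```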
**Implementation Examples:**

Microservices Implementation:

```go
// Metrics Collector Service (Go)
package main

type MetricsCollector struct {
    kafka    *kafka.Producer
    redis    *redis.Client
    handlers map[string]MetricHandler
}

func (mc *MetricsCollector) CollectMetric(metric Metric) error {
    // Process metric
    processed := mc.handlers[metric.Type].Process(metric)
    // Publish to the message queue for other services
    return mc.kafka.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{
            Topic:     &metric.Type,
            Partition: kafka.PartitionAny,
        },
        Value: processed.ToJSON(),
    }, nil)
}
```
```python
# Data Processor Service (Python)
from kafka import KafkaConsumer

class DataProcessor:
    def __init__(self):
        self.consumer = KafkaConsumer(
            'metrics-topic',
            bootstrap_servers=['kafka:9092'],
            group_id='data-processors'
        )

    async def process_metrics(self):
        for message in self.consumer:
            metric = Metric.from_json(message.value)
            # Process and aggregate
            aggregated = await self.aggregate_metric(metric)
            # Store in a time-series database
            await self.store_metric(aggregated)
            # Trigger alerts if needed
            await self.check_alerts(aggregated)
```
Monolithic Implementation:

```python
# Modular Monolith (Python)
from dashboard.modules import metrics, processing, api, notifications

class AnalyticsDashboard:
    def __init__(self):
        self.metrics = metrics.MetricsModule()
        self.processor = processing.ProcessingModule()
        self.api = api.APIModule()
        self.notifications = notifications.NotificationModule()

    def handle_metric(self, raw_metric):
        # All in the same process - no network calls
        metric = self.metrics.parse(raw_metric)
        processed = self.processor.aggregate(metric)
        # Check for alerts
        if self.processor.check_thresholds(processed):
            self.notifications.send_alert(processed)
        return processed

# Single deployment with clear module boundaries;
# modules can be extracted to separate services later.
```
**Decision Matrix:**

| Factor | Monolith | Microservices |
|---|---|---|
| Team Size | < 10 developers | > 10 developers |
| Complexity | Simple-medium | Complex domain |
| Scale | < 1M requests/day | > 10M requests/day |
| Technology | Single stack preferred | Multiple technologies needed |
| Deployment | Weekly/monthly | Multiple times per day |
| Data Consistency | Strong consistency needed | Eventual consistency OK |
**For the Analytics Dashboard Specifically:**
- Early Stage: Start with a modular monolith
- Growth Stage: Extract high-scale components (the metrics collector) first
- Mature Stage: Full microservices with proper DevOps practices
**Database Sharding Implementation**

**Strong Answer:**

**Library Analogy Explanation:**
📚 Traditional Database (Single Library):
All books in one building - gets crowded, hard to find books,
long queues at checkout
📚📚📚 Sharded Database (Multiple Library Branches):
Books distributed across locations:
- Fiction Library: Books A-H
- Science Library: Books I-P
- History Library: Books Q-Z
Each library (shard) operates independently but part of same system
**Sharding Strategy for Analytics Dashboard:**
```python
# Database sharding implementation
import concurrent.futures
import datetime
import hashlib

class ShardUnavailableError(Exception):
    """Raised when a shard cannot be reached."""

class DatabaseSharding:
    def __init__(self):
        self.shards = {
            'shard_americas': {
                'host': 'db-americas.example.com',
                'regions': ['us', 'ca', 'mx', 'br'],
                'connection': self.create_connection('db-americas')
            },
            'shard_europe': {
                'host': 'db-europe.example.com',
                'regions': ['uk', 'de', 'fr', 'es'],
                'connection': self.create_connection('db-europe')
            },
            'shard_asia': {
                'host': 'db-asia.example.com',
                'regions': ['jp', 'sg', 'au', 'in'],
                'connection': self.create_connection('db-asia')
            }
        }
        # Time-based sharding for metrics
        self.time_shards = {
            'metrics_current': 'Last 7 days - hot data',
            'metrics_recent': 'Last 30 days - warm data',
            'metrics_archive': 'Older than 30 days - cold data'
        }

    def get_user_shard(self, user_id: str) -> str:
        """Determine shard based on a hash of the user ID."""
        hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
        shard_index = hash_value % len(self.shards)
        return list(self.shards.keys())[shard_index]

    def get_region_shard(self, region: str) -> str:
        """Determine shard based on geographical region."""
        for shard_name, shard_info in self.shards.items():
            if region.lower() in shard_info['regions']:
                return shard_name
        return 'shard_americas'  # Default fallback

    def get_time_shard(self, timestamp: datetime.datetime) -> str:
        """Determine shard based on data age."""
        age = datetime.datetime.now() - timestamp
        if age.days <= 7:
            return 'metrics_current'
        elif age.days <= 30:
            return 'metrics_recent'
        return 'metrics_archive'

    def route_query(self, query_type: str, **kwargs):
        """Route queries to the appropriate shard."""
        if query_type == 'user_orders':
            shard = self.get_user_shard(kwargs['user_id'])
            return self.execute_query(shard, query_type, **kwargs)
        elif query_type == 'regional_metrics':
            shard = self.get_region_shard(kwargs['region'])
            return self.execute_query(shard, query_type, **kwargs)
        elif query_type == 'historical_data':
            # Query multiple time shards and aggregate
            return self.query_time_shards(kwargs['start_date'], kwargs['end_date'])
        elif query_type == 'cross_shard_analytics':
            # Fan out the query to all shards
            return self.fan_out_query(query_type, **kwargs)

    def query_time_shards(self, start_date, end_date):
        """Query across time-based shards."""
        results = []
        for shard_name in self.time_shards:
            try:
                shard_result = self.execute_query(
                    shard_name, 'time_range_query',
                    start_date=start_date, end_date=end_date)
                results.extend(shard_result)
            except ShardUnavailableError:
                # Handle shard failures gracefully
                self.log_shard_failure(shard_name)
                continue
        return self.aggregate_results(results)

    def fan_out_query(self, query_type: str, **kwargs):
        """Execute a query across all shards and aggregate the results."""
        results = {}
        with concurrent.futures.ThreadPoolExecutor(max_workers=len(self.shards)) as executor:
            # Submit queries to all shards concurrently
            future_to_shard = {
                executor.submit(self.execute_query, shard_name, query_type, **kwargs): shard_name
                for shard_name in self.shards
            }
            for future in concurrent.futures.as_completed(future_to_shard):
                shard_name = future_to_shard[future]
                try:
                    results[shard_name] = future.result(timeout=30)  # 30 second timeout
                except Exception as e:
                    self.log_query_failure(shard_name, e)
                    results[shard_name] = None
        return self.aggregate_cross_shard_results(results)

    # create_connection, execute_query, aggregate_*, and log_* helpers are
    # assumed to be implemented against the actual database driver.

# Shard-aware query examples
class ShardedAnalyticsQueries:
    def __init__(self, sharding: DatabaseSharding):
        self.sharding = sharding

    def get_user_orders(self, user_id: str):
        """Get orders for a specific user."""
        return self.sharding.route_query('user_orders', user_id=user_id)

    def get_regional_sales(self, region: str, date_range: tuple):
        """Get sales data for a specific region."""
        return self.sharding.route_query('regional_metrics',
                                         region=region,
                                         start_date=date_range[0],
                                         end_date=date_range[1])

    def get_global_metrics(self, metric_type: str):
        """Get global metrics across all shards."""
        return self.sharding.route_query('cross_shard_analytics',
                                         metric_type=metric_type)

    def get_historical_trends(self, days_back: int):
        """Get historical data across time shards."""
        end_date = datetime.datetime.now()
        start_date = end_date - datetime.timedelta(days=days_back)
        return self.sharding.query_time_shards(start_date, end_date)
```
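A brief usage sketch, assuming the connection and query helpers above are implemented:

```python
sharding = DatabaseSharding()
queries = ShardedAnalyticsQueries(sharding)

orders = queries.get_user_orders('user-12345')        # routed to a single shard
trends = queries.get_historical_trends(days_back=90)  # fans out across time shards
```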
**PostgreSQL Sharding Implementation:**

```sql
-- Create shard-specific tables
-- Shard 1: Americas
CREATE TABLE orders_americas (
    id UUID PRIMARY KEY,
    user_id UUID NOT NULL,
    region VARCHAR(2) CHECK (region IN ('US', 'CA', 'MX', 'BR')),
    order_total DECIMAL(10,2),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Shard 2: Europe
CREATE TABLE orders_europe (
    id UUID PRIMARY KEY,
    user_id UUID NOT NULL,
    region VARCHAR(2) CHECK (region IN ('UK', 'DE', 'FR', 'ES')),
    order_total DECIMAL(10,2),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Shard 3: Asia
CREATE TABLE orders_asia (
    id UUID PRIMARY KEY,
    user_id UUID NOT NULL,
    region VARCHAR(2) CHECK (region IN ('JP', 'SG', 'AU', 'IN')),
    order_total DECIMAL(10,2),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Create a foreign data wrapper for cross-shard queries
CREATE EXTENSION postgres_fdw;

CREATE SERVER shard_europe
    FOREIGN DATA WRAPPER postgres_fdw
    OPTIONS (host 'db-europe.example.com', port '5432', dbname 'analytics');

CREATE USER MAPPING FOR postgres
    SERVER shard_europe
    OPTIONS (user 'analytics_user', password 'password');

-- Create foreign tables
CREATE FOREIGN TABLE orders_europe_remote (
    id UUID,
    user_id UUID,
    region VARCHAR(2),
    order_total DECIMAL(10,2),
    created_at TIMESTAMP WITH TIME ZONE
)
SERVER shard_europe
OPTIONS (schema_name 'public', table_name 'orders_europe');

-- (orders_asia_remote is defined the same way against the Asia shard)

-- View for cross-shard queries
CREATE VIEW orders_global AS
SELECT 'americas' AS shard, * FROM orders_americas
UNION ALL
SELECT 'europe' AS shard, * FROM orders_europe_remote
UNION ALL
SELECT 'asia' AS shard, * FROM orders_asia_remote;
```
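With the view in place, cross-shard rollups become ordinary SQL; a hedged example aggregate (column names follow the tables above):

```sql
-- Weekly revenue per shard and region across all shards
SELECT shard, region, COUNT(*) AS orders, SUM(order_total) AS revenue
FROM orders_global
WHERE created_at >= NOW() - INTERVAL '7 days'
GROUP BY shard, region
ORDER BY revenue DESC;
```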
**Application-Level Sharding Middleware:**

```python
# Flask middleware for automatic shard routing
# (assumes: from flask import Flask, request, jsonify, abort; app = Flask(__name__))
from werkzeug.wrappers import Request

class ShardingMiddleware:
    def __init__(self, app):
        self.app = app
        self.sharding = DatabaseSharding()

    def __call__(self, environ, start_response):
        # Extract sharding context from the request
        request = Request(environ)
        # Determine the shard based on the request
        if 'user_id' in request.args:
            shard = self.sharding.get_user_shard(request.args['user_id'])
            environ['DATABASE_SHARD'] = shard
        elif 'region' in request.args:
            shard = self.sharding.get_region_shard(request.args['region'])
            environ['DATABASE_SHARD'] = shard
        else:
            # Cross-shard query required
            environ['DATABASE_SHARD'] = 'cross_shard'
        return self.app(environ, start_response)

# Flask routes with shard awareness
@app.route('/api/user/<user_id>/orders')
def get_user_orders(user_id):
    shard = request.environ.get('DATABASE_SHARD')
    if shard == 'cross_shard':
        # This shouldn't happen for user-specific queries
        abort(400, "Invalid request - user_id required")
    # Use a shard-specific connection (get_shard_connection is an assumed helper)
    db = get_shard_connection(shard)
    orders = db.execute(
        "SELECT * FROM orders WHERE user_id = %s ORDER BY created_at DESC",
        (user_id,)
    )
    return jsonify(orders)

@app.route('/api/analytics/global')
def get_global_analytics():
    # Cross-shard query - aggregate from all shards
    sharding = DatabaseSharding()
    results = sharding.fan_out_query('global_analytics')
    return jsonify({
        # Skip shards whose query failed (fan_out_query stores None for them)
        'total_orders': sum(r['order_count'] for r in results.values() if r),
        'total_revenue': sum(r['revenue'] for r in results.values() if r),
        'by_region': results
    })
```
**Shard Management and Failover:**

```python
import time
from typing import Dict

class ShardManager:
    def __init__(self):
        self.shards = DatabaseSharding().shards
        self.health_check_interval = 30  # seconds

    def monitor_shard_health(self):
        """Continuously monitor shard health."""
        while True:
            for shard_name, shard_info in self.shards.items():
                try:
                    # Simple health check query
                    conn = shard_info['connection']
                    conn.execute("SELECT 1")
                    self.mark_shard_healthy(shard_name)
                except Exception as e:
                    self.mark_shard_unhealthy(shard_name, e)
                    self.trigger_failover(shard_name)
            time.sleep(self.health_check_interval)

    def trigger_failover(self, failed_shard: str):
        """Handle shard failover."""
        # Redirect traffic to healthy shards
        self.redistribute_load(failed_shard)
        # Alert the operations team
        self.send_alert(f"Shard {failed_shard} is unhealthy")
        # Attempt automated recovery
        self.attempt_shard_recovery(failed_shard)

    def rebalance_shards(self, new_shard_config: Dict):
        """Rebalance data across shards.

        This is a complex operation requiring:
        1. Data migration planning
        2. Consistent hashing updates
        3. Gradual traffic shifting
        4. Rollback capability
        """
        migration_plan = self.create_migration_plan(new_shard_config)
        for step in migration_plan:
            self.execute_migration_step(step)
            self.verify_migration_step(step)
        self.update_shard_routing(new_shard_config)
```
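The rebalancing step mentions consistent hashing: unlike the modulo hashing in `get_user_shard`, a consistent-hash ring moves only a small fraction of keys when shards are added or removed. A minimal sketch (the virtual-node count is an arbitrary illustration):

```python
import bisect
import hashlib

class ConsistentHashRing:
    def __init__(self, shards, vnodes=100):
        # Each shard gets `vnodes` points on the ring to smooth the distribution
        self.ring = sorted(
            (self._hash(f"{shard}#{i}"), shard)
            for shard in shards
            for i in range(vnodes)
        )

    @staticmethod
    def _hash(key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def get_shard(self, key: str) -> str:
        # First ring point clockwise from the key's hash, wrapping around
        idx = bisect.bisect(self.ring, (self._hash(key), "")) % len(self.ring)
        return self.ring[idx][1]

ring = ConsistentHashRing(['shard_americas', 'shard_europe', 'shard_asia'])
print(ring.get_shard('user-12345'))
```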
**Benefits and Challenges:**
Benefits:
- Horizontal Scalability: Add more shards as data grows
- Performance: Smaller datasets per shard = faster queries
- Isolation: Shard failures don't affect other shards
- Geographic Distribution: Data close to users
Challenges:
- Cross-shard Queries: Complex and slower
- Rebalancing: Moving data between shards is difficult
- Consistency: Transactions across shards are complex
- Operational Complexity: Multiple databases to manage
**For the Analytics Dashboard:**
- User-based sharding: For user-specific dashboards
- Time-based sharding: For historical data (hot/warm/cold)
- Feature-based sharding: Separate shards for different metrics types
**Part 4: Behavioral & Collaboration**

**Convincing Developers - Code Reliability Changes**

**Strong Answer:**
Situation: Our payment processing service was experiencing intermittent failures during peak traffic, causing revenue loss. The development team had implemented a quick fix that worked locally but didn't address the underlying concurrency issues.
Approach - Data-Driven Persuasion:
**1. Quantified the Business Impact**

```python
# I created a dashboard showing the real cost
class ReliabilityImpactAnalysis:
    def calculate_revenue_impact(self):
        return {
            "failed_transactions_per_hour": 150,
            "average_transaction_value": 85.50,
            "revenue_loss_per_hour": 150 * 85.50,       # $12,825
            "monthly_projected_loss": 12825 * 24 * 30,  # ~$9.23M
            "customer_churn_risk": "23 angry customer emails in 2 days"
        }
```
**2. Made It Personal and Collaborative**

Instead of saying "your code is wrong," I said:
- "I found some interesting patterns in our production data that might help us improve performance"
- "What do you think about these metrics? I'm curious about your thoughts on the concurrency patterns"
- "Could we pair program on this? I'd love to understand your approach better"
**3. Proposed Solutions, Not Just Problems**

I came with a working prototype:

```python
# Before (their approach)
def process_payment(payment_data):
    global payment_queue
    payment_queue.append(payment_data)  # Race condition!
    return process_queue()

# After (my suggested approach)
import threading
from queue import Queue

class ThreadSafePaymentProcessor:
    def __init__(self):
        self.payment_queue = Queue()
        self.lock = threading.Lock()

    def process_payment(self, payment_data):
        with self.lock:
            # Thread-safe processing
            return self.safe_process(payment_data)
```
**4. Used Their Language and Priorities**
- Framed it as a "performance optimization" rather than "fixing bugs"
- Showed how it would reduce their on-call burden: "No more 3 AM pages about payment failures"
- Highlighted career benefits: "This would be a great story for your next performance review"
Result: They not only adopted the changes but became advocates for reliability practices. The lead developer started attending SRE meetings and later implemented circuit breakers proactively.
Key Lessons:
- Data beats opinions - metrics are harder to argue with
- Collaboration over confrontation - "How can we solve this together?"
- Show, don't tell - working code examples are persuasive
- Align with their incentives - make reliability their win, not your win
**Trade-off Between Reliability and Feature Delivery**

**Strong Answer:**
Situation: During a major product launch, we were at 97% availability (below our 99.5% SLO), but the product team wanted to deploy a new feature that would drive user adoption for the launch.
The Dilemma:
- Product pressure: "This feature will increase user engagement by 40%"
- Reliability concern: Error budget was nearly exhausted
- Timeline: Launch was in 3 days, couldn't delay
My Decision Process:
**1. Quantified Both Sides**

```python
# Business impact calculation
launch_impact = {
    "projected_new_users": 50000,
    "revenue_per_user": 25,
    "total_revenue_opportunity": 1.25e6,  # $1.25M
    "competitive_advantage": "First-mover in market segment"
}

reliability_risk = {
    "current_error_budget_used": 0.85,  # 85% of monthly budget
    "remaining_budget": 0.15,
    "days_remaining_in_month": 8,
    "projected_overage": 0.3,  # 30% over budget
    "customer_impact": "Potential service degradation"
}
```
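For context, the error-budget numbers translate into concrete minutes; a back-of-the-envelope calculation for the 99.5% SLO mentioned above:

```python
slo = 0.995
minutes_in_month = 30 * 24 * 60        # 43,200 minutes
budget = (1 - slo) * minutes_in_month  # 216 minutes of downtime allowed per month
used = 0.85 * budget                   # ~184 minutes already consumed
print(f"Remaining budget: {budget - used:.0f} minutes")  # ~32 minutes left
```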
**2. Created a Risk-Mitigation Plan**

Instead of a binary yes/no, I proposed a conditional approach:

```yaml
# Feature deployment plan with guardrails
deployment_strategy:
  phase_1:
    rollout: 5% of users
    duration: 4 hours
    success_criteria:
      - error_rate < 0.1%
      - p99_latency < 200ms
      - no_critical_alerts
  phase_2:
    rollout: 25% of users
    duration: 12 hours
    automatic_rollback: true
    conditions:
      - error_rate > 0.2% for 5 minutes
      - p99_latency > 500ms for 10 minutes
  phase_3:
    rollout: 100% of users
    requires: manual_approval_after_phase_2
```
**3. Communicated Trade-offs Transparently**

I presented to stakeholders:
"We can launch this feature, but here's what it means:
- Upside: $1.25M revenue opportunity, competitive advantage
- Downside: 30% chance of service degradation affecting existing users
- Mitigation: Feature flags for instant rollback, enhanced monitoring
- Commitment: If reliability suffers, we pause new features until we're back on track"
**4. The Decision and Implementation**

We proceeded with the phased rollout:

```python
import time

# ErrorBudgetMonitor and FeatureFlag are illustrative internal helpers
class FeatureLaunchManager:
    def __init__(self):
        self.error_budget_monitor = ErrorBudgetMonitor()
        self.feature_flag = FeatureFlag("new_user_onboarding")

    def monitor_launch_health(self):
        while self.feature_flag.enabled:
            current_error_rate = self.get_error_rate()
            budget_status = self.error_budget_monitor.get_status()

            if budget_status.will_exceed_monthly_budget():
                self.trigger_rollback("Error budget exceeded")
                break

            if current_error_rate > 0.002:  # 0.2%
                self.reduce_rollout_percentage()

            time.sleep(60)  # Check every minute during the launch

    def trigger_rollback(self, reason):
        self.feature_flag.disable()
        self.alert_stakeholders(f"Feature rolled back: {reason}")
        self.schedule_post_mortem()
```
The Outcome:
- Feature launched successfully to 25% of users
- Error rate increased slightly but stayed within acceptable bounds
- Revenue target was hit with partial rollout
- We didn't exceed error budget
- Built trust with product team by delivering on promises
Key Principles I Used:
- Transparency: Show the math, don't hide trade-offs
- Risk mitigation: Find ways to reduce downside while preserving upside
- Stakeholder alignment: Make everyone accountable for the decision
- Data-driven decisions: Use metrics, not emotions
- Learning mindset: Treat it as an experiment with clear success/failure criteria
Follow-up Actions:
- Conducted a post-launch review
- Used learnings to improve our launch process
- Created better error budget forecasting tools
- Established clearer guidelines for future trade-off decisions
**Staying Current with SRE Practices and Technologies**

**Strong Answer:**

My Learning Strategy - Multi-layered Approach:

**1. Technical Deep Dives**

```python
# I maintain a personal learning dashboard
learning_tracker = {
    "current_focus": [
        "eBPF for system observability",
        "Kubernetes operators for automation",
        "AI/ML for incident prediction"
    ],
    "weekly_commitments": {
        "reading": "2 hours of technical papers",
        "hands_on": "4 hours lab/experimentation",
        "community": "1 hour in SRE forums/Slack"
    },
    "monthly_goals": [
        "Complete one new certification",
        "Contribute to one open source project",
        "Write one technical blog post"
    ]
}
```
**2. Resource Mix - Quality over Quantity**
Daily (30 minutes morning routine):
- SRE Weekly Newsletter - concise industry updates
- Hacker News - scan for infrastructure/reliability topics
- Internal Slack channels - #sre-learning, #incidents-learned
Weekly (2-3 hours):
- Google SRE Book Club - our team works through chapters together
- Kubernetes documentation - staying current with new features
- Conference talk videos - KubeCon, SREcon, Velocity recordings
Monthly Deep Dives:
- Academic papers - especially from USENIX, SOSP, OSDI conferences
- Vendor whitepapers - but with healthy skepticism
- Open source project exploration - contribute small patches to learn codebases
**3. Hands-on Learning Lab**

```yaml
# Home lab setup for experimentation
homelab_projects:
  current_experiments:
    - name: "eBPF monitoring tools"
      status: "Building custom metrics collector"
      learning: "Kernel-level observability"
    - name: "Chaos engineering with Litmus"
      status: "Testing failure scenarios"
      learning: "Resilience patterns"
    - name: "Service mesh evaluation"
      status: "Comparing Istio vs Linkerd"
      learning: "Traffic management at scale"
  infrastructure:
    platform: "Kubernetes cluster on Raspberry Pi"
    monitoring: "Prometheus + Grafana + Jaeger"
    ci_cd: "GitLab CI with ArgoCD"
    cost: "$200/month AWS credits for cloud integration"
```
**4. Community Engagement**
- SRE Discord/Slack communities - daily participation
- Local meetups - monthly CNCF and DevOps meetups
- Conference speaking - submitted 3 talks this year on incident response
- Mentoring - guide 2 junior engineers, which forces me to stay sharp
- Open source contributions - maintain a small monitoring tool, contribute to Prometheus
**5. Learning from Failures - Internal and External**

```python
class IncidentLearningTracker:
    def analyze_industry_incidents(self):
        """Study major outages for lessons."""
        recent_studies = [
            {
                "incident": "Facebook Oct 2021 BGP outage",
                "lessons": ["Single points of failure in DNS", "Recovery complexity"],
                "applied_locally": "Implemented secondary DNS provider"
            },
            {
                "incident": "AWS us-east-1 Dec 2021",
                "lessons": ["Multi-region dependencies", "Circuit breaker importance"],
                "applied_locally": "Added cross-region failover testing"
            }
        ]
        return recent_studies

    def internal_learning(self):
        """Extract patterns from our own incidents."""
        return {
            "quarterly_review": "What patterns are emerging?",
            "cross_team_sharing": "Monthly incident learnings presentation",
            "runbook_updates": "Continuously improve based on real scenarios"
        }
```
**6. Structured Learning Paths**
- Currently pursuing: CKS (Certified Kubernetes Security Specialist)
- Completed this year: AWS Solutions Architect Pro, CKAD
- Next up: HashiCorp Terraform Associate
- Long-term goal: Google Cloud Professional Cloud Architect
**7. Teaching and Knowledge Sharing**

Internal (at work):
- Monthly "SRE Patterns" lunch & learn sessions
- Incident post-mortem facilitation
- New hire onboarding for SRE practices
- Internal blog posts on "what I learned this week"

External:
- Technical blog: medium.com/@myusername
- Conference talks: submitted to SREcon, KubeCon
- Open source: maintainer of a small monitoring tool
- Mentoring: 2 junior engineers, 1 career switcher
**8. Staying Ahead of Trends**

I try to identify emerging patterns early:
Current attention areas:
- Platform Engineering - evolution beyond traditional SRE
- FinOps - cost optimization becoming critical
- AI/ML for Operations - automated incident response
- WebAssembly - potential impact on deployment patterns
- Sustainability - green computing in infrastructure
My evaluation framework:
- Signal vs noise: Is this solving real problems or just hype?
- Adoption timeline: When will this be production-ready?
- Investment level: Should I learn basics now or wait?
- Career relevance: How does this align with my growth goals?
Key Success Factors:
- Consistency over intensity - 30 minutes daily beats 8 hours monthly
- Applied learning - immediately try new concepts in lab/work
- Community connection - learning with others accelerates understanding
- Teaching others - best way to solidify knowledge
- Balance breadth and depth - stay broad but go deep on core areas
Resources I highly recommend:
- Books: "Observability Engineering", "Learning eBPF", "Kubernetes Patterns"
- Podcasts: "Software Engineering Radio", "The Cloudcast"
- Newsletters: "SRE Weekly", "DevOps'ish", "The New Stack"
- Communities: SRE Slack, r/sre, CNCF Slack channels
This approach has helped me stay current while avoiding information overload. The key is finding sustainable habits that fit into daily work rather than treating learning as separate from doing.
**Part 5: CPU Performance Troubleshooting**

**High CPU Usage Investigation**

When encountering high CPU usage in a production Go service, here is a systematic approach to investigating and resolving the issue:
1. Verify the Issue
   - Check CPU usage metrics in Prometheus or Grafana.
   - Confirm the affected service and pod.
2. Check Recent Changes
   - Review Git commits and Kubernetes deployments.
   - Roll back recent changes if necessary.
3. Analyze CPU Profiles
   - Use `pprof` to analyze CPU profiles.
   - Look for functions with high CPU time.
4. Inspect Goroutines
   - Check for goroutine leaks or deadlocks.
   - Use the `/debug/pprof/goroutine` profile (via `go tool pprof`) to analyze goroutines.
5. Review Database Queries
   - Look for slow or blocking database queries.
   - Use `EXPLAIN` to analyze query performance.
6. Check External Dependencies
   - Verify the performance of external APIs or services.
   - Consider caching responses to reduce load.
7. Optimize Code
   - Refactor inefficient algorithms or data structures.
   - Use concurrency primitives like worker pools.
8. Scale the Service
   - Increase the number of replicas in the Kubernetes deployment.
   - Use horizontal pod autoscaling based on CPU usage.
9. Review Resource Requests and Limits
   - Ensure proper CPU requests and limits are set in the pod spec.
   - Adjust limits if the application legitimately needs more CPU.
10. Investigate Node-Level Issues
    - Check for other pods on the same node consuming excessive CPU.
    - Consider tainting the node or using node affinity.
11. Analyze System-Level Metrics
    - Use `kubectl top` to check pod and node CPU usage.
    - Investigate any anomalies in system-level metrics.
12. Restart the Affected Pods
    - As a last resort, restart the pods with high CPU usage.
    - Monitor the pods after the restart to ensure the issue is resolved.
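Several of these steps map directly onto kubectl one-liners; a sketch (the deployment name is illustrative):

```bash
kubectl rollout history deployment/data-processor   # step 2: review recent changes
kubectl rollout undo deployment/data-processor      # step 2: roll back if needed
kubectl autoscale deployment/data-processor --cpu-percent=70 --min=2 --max=10  # step 8
kubectl top pods -l app=data-processor              # step 11: pod CPU usage
kubectl rollout restart deployment/data-processor   # step 12: restart pods
```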
**Example Investigation: High CPU Usage in the `data-processor` Service**
1. Verify the Issue
   - Prometheus shows `data-processor` at 95% CPU usage.
2. Check Recent Changes
   - Last Git commit was 2 days ago, no recent deployments.
3. Analyze CPU Profiles
   - `go tool pprof` shows the `processData` function consuming 80% of CPU time.
4. Inspect Goroutines
   - The goroutine profile shows 1000+ goroutines in `data-processor`.
   - Many goroutines are stuck in `processData`.
5. Review Database Queries
   - `EXPLAIN` shows `SELECT * FROM metrics WHERE time > ?` is slow.
6. Check External Dependencies
   - No external API calls in `processData`.
7. Optimize Code
   - Refactor `processData` to use a worker pool.
   - Optimize the database query to fetch only the needed columns.
8. Scale the Service
   - Horizontal pod autoscaler added, scaling between 2-10 replicas.
9. Review Resource Requests and Limits
   - CPU limit increased from 500m to 1000m in the pod spec.
10. Investigate Node-Level Issues
    - No other pods on the node are consuming excessive CPU.
11. Analyze System-Level Metrics
    - Node CPU usage is stable, no anomalies detected.
12. Restart the Affected Pods
    - Pods restarted, CPU usage normalized to 30%.
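The worker-pool refactor in step 7 might look like the sketch below: a fixed number of goroutines drain a channel instead of one goroutine being spawned per item (`Item` and the channel plumbing are illustrative):

```go
import (
    "context"
    "sync"
)

func processWithWorkerPool(ctx context.Context, items <-chan Item, workers int) {
    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for item := range items {
                select {
                case <-ctx.Done():
                    return // stop promptly on cancellation
                default:
                    processData(item) // the hot function from the profile
                }
            }
        }()
    }
    wg.Wait() // returns once the items channel is closed and drained
}
```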
**CPU Troubleshooting Toolkit:**

```bash
#!/bin/bash
# CPU investigation script for Go services
echo "🔍 Starting CPU investigation for Go service..."

# 1. Check current CPU usage
echo "📊 Current CPU usage:"
kubectl top pods -l app=go-service

# 2. Get profiling data (URL quoted so the shell doesn't expand '?')
echo "🔬 Collecting CPU profile (30 seconds)..."
kubectl port-forward svc/go-service 6060:6060 &
sleep 2
go tool pprof "http://localhost:6060/debug/pprof/profile?seconds=30"

# 3. Check memory allocations
echo "🧠 Checking memory allocations..."
go tool pprof http://localhost:6060/debug/pprof/allocs

# 4. Check GC performance
echo "🗑️ Checking garbage collection stats..."
curl -s http://localhost:6060/debug/vars | jq '.memstats'

# 5. Container-level CPU investigation
echo "🐳 Container CPU stats..."
docker stats --no-stream $(docker ps --filter "name=go-service" --format "{{.Names}}")

# 6. Process-level analysis (-b for batch output)
echo "⚙️ Process CPU breakdown..."
top -b -H -n 1 -p "$(pgrep go-service)"

# 7. strace for system call analysis
echo "🔧 System call analysis (10 seconds)..."
timeout 10s strace -c -p "$(pgrep go-service)"
```
**Code-Level Optimizations:**

```go
// Common CPU bottleneck fixes

// 1. Fix: Inefficient JSON parsing
// BEFORE - Decoding into map[string]interface{} is slow and allocation-heavy
func processRequestSlow(w http.ResponseWriter, r *http.Request) {
    var data map[string]interface{}
    body, _ := ioutil.ReadAll(r.Body)
    json.Unmarshal(body, &data)
    // Process data...
}

// AFTER - Decode into a typed struct
type RequestData struct {
    UserID string `json:"user_id"`
    Action string `json:"action"`
    // Define specific fields instead of interface{}
}

func processRequestFast(w http.ResponseWriter, r *http.Request) {
    var data RequestData
    decoder := json.NewDecoder(r.Body)
    decoder.DisallowUnknownFields() // Reject unexpected payloads early
    if err := decoder.Decode(&data); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    // Process typed data...
}

// 2. Fix: CPU-intensive loops - see the O(n²) vs O(n) findDuplicates
// example in Part 1, which applies here unchanged.

// 3. Fix: Inefficient string operations
// BEFORE - Repeated string concatenation
func buildQuerySlow(filters []string) string {
    query := "SELECT * FROM table WHERE "
    for i, filter := range filters {
        if i > 0 {
            query += " AND "
        }
        query += filter
    }
    return query
}

// AFTER - Use strings.Builder
func buildQueryFast(filters []string) string {
    var builder strings.Builder
    builder.WriteString("SELECT * FROM table WHERE ")
    for i, filter := range filters {
        if i > 0 {
            builder.WriteString(" AND ")
        }
        builder.WriteString(filter)
    }
    return builder.String()
}
```
**Summary**
This comprehensive SRE interview guide covers:
**Technical Areas**
- Performance Optimization: Go-specific optimizations, memory management, CPU profiling
- System Architecture: Microservices vs monolith, database sharding, scalability patterns
- Monitoring & Alerting: Prometheus rules, observability, incident response
- Infrastructure: Container orchestration, deployment strategies, reliability engineering
**Behavioral Areas**
- Leadership: Convincing teams, managing trade-offs, stakeholder communication
- Continuous Learning: Staying current with technology, community engagement
- Problem Solving: Systematic troubleshooting, root cause analysis
**Key Takeaways**
- Data-driven decisions: Always quantify impact and use metrics
- Systematic approach: Follow structured methodologies for troubleshooting
- Collaboration: Work with teams rather than against them
- Continuous improvement: Learn from every incident and optimization
- Balance trade-offs: Consider reliability, performance, and business needs
This guide provides both theoretical knowledge and practical examples that demonstrate real-world SRE experience. Use these patterns, adapted to your specific situations, to prepare for senior SRE roles: the questions cover both technical depth and the soft skills needed to succeed in Site Reliability Engineering.