SRE Interview Questions - Comprehensive Guide

This comprehensive guide covers Site Reliability Engineering interview questions with detailed answers, code examples, and practical solutions for high-performance systems.

Part 1: Performance Optimization & Go Specifics

Go Performance Optimization Examples

**1. Efficient Duplicate Detection**

```go
// BEFORE - O(n²) algorithm
func findDuplicatesSlow(items []string) []string {
    var duplicates []string
    for i := 0; i < len(items); i++ {
        for j := i + 1; j < len(items); j++ {
            if items[i] == items[j] {
                duplicates = append(duplicates, items[i])
                break
            }
        }
    }
    return duplicates
}

// AFTER - O(n) algorithm using a map
func findDuplicatesFast(items []string) []string {
    seen := make(map[string]bool)
    var duplicates []string

    for _, item := range items {
        if seen[item] {
            duplicates = append(duplicates, item)
        } else {
            seen[item] = true
        }
    }
    return duplicates
}
```

**2. String Concatenation Optimization**

```go
// Fix: Excessive string concatenation
// BEFORE - Creates new strings repeatedly
func buildResponseSlow(data []Record) string {
    var result string
    for _, record := range data {
        result += record.ID + "," + record.Name + "\n" // Slow!
    }
    return result
}

// AFTER - Use strings.Builder for efficiency
func buildResponseFast(data []Record) string {
    var builder strings.Builder
    builder.Grow(len(data) * 50) // Pre-allocate capacity

    for _, record := range data {
        builder.WriteString(record.ID)
        builder.WriteString(",")
        builder.WriteString(record.Name)
        builder.WriteString("\n")
    }
    return builder.String()
}
```

**3. Goroutine Management**

```go
// Fix: Goroutine leaks
// BEFORE - Goroutines without proper cleanup
func handleRequestsLeaky() {
    for {
        go func() {
            // Long-running operation without context cancellation
            processData() // Never exits!
        }()
    }
}

// AFTER - Proper goroutine management
func handleRequestsProper(ctx context.Context) {
    semaphore := make(chan struct{}, 100) // Limit concurrent goroutines

    for {
        select {
        case <-ctx.Done():
            return
        default:
            semaphore <- struct{}{} // Acquire
            go func() {
                defer func() { <-semaphore }() // Release

                // Use context for cancellation
                processDataWithContext(ctx)
            }()
        }
    }
}
```

**4. Database Query Optimization**

```go
// Fix: Inefficient database queries in a loop
// BEFORE - N+1 query problem
func getUserDataSlow(userIDs []string) []UserData {
    var users []UserData
    for _, id := range userIDs {
        user := db.QueryUser(id) // Database hit per user!
        users = append(users, user)
    }
    return users
}

// AFTER - Batch the lookup into a single query, binding the IDs as
// placeholders rather than concatenating them (which invites SQL injection)
func getUserDataFast(userIDs []string) []UserData {
    placeholders := strings.TrimSuffix(strings.Repeat("?,", len(userIDs)), ",")
    query := "SELECT * FROM users WHERE id IN (" + placeholders + ")"
    return db.QueryUsers(query, userIDs) // Single query for all users
}
```

**5. Memory and GC Optimization**

```go
// Optimize garbage collection pressure
type MetricsCollector struct {
    // BEFORE - Creates garbage:
    // metrics []map[string]interface{}

    // AFTER - Use object pools and typed structs
    metricPool sync.Pool
    metrics    []Metric
}

type Metric struct {
    Name      string
    Value     float64
    Timestamp int64
}

func NewMetricsCollector() *MetricsCollector {
    mc := &MetricsCollector{
        metrics: make([]Metric, 0, 1000), // Pre-allocate capacity
    }

    mc.metricPool = sync.Pool{
        New: func() interface{} {
            return &Metric{}
        },
    }

    return mc
}

func (mc *MetricsCollector) AddMetric(name string, value float64) {
    metric := mc.metricPool.Get().(*Metric)
    metric.Name = name
    metric.Value = value
    metric.Timestamp = time.Now().Unix()

    // append copies the struct by value, so the pooled instance can be
    // returned immediately and reused by the next call
    mc.metrics = append(mc.metrics, *metric)
    mc.metricPool.Put(metric)
}
```

**6. Continuous Profiling**

```go
// CPU profiling integration
func enableContinuousProfiling() {
    // Enable continuous CPU profiling
    if os.Getenv("ENABLE_PROFILING") == "true" {
        go func() {
            for {
                f, err := os.Create(fmt.Sprintf("cpu-profile-%d.prof", time.Now().Unix()))
                if err != nil {
                    log.Printf("Could not create CPU profile: %v", err)
                    time.Sleep(30 * time.Second)
                    continue
                }

                if err := pprof.StartCPUProfile(f); err != nil {
                    log.Printf("Could not start CPU profile: %v", err)
                    f.Close()
                    continue
                }
                time.Sleep(30 * time.Second)
                pprof.StopCPUProfile()
                f.Close()

                // Upload to object storage for analysis
                uploadProfile(f.Name())
            }
        }()
    }
}
```

Part 2: Monitoring and Alerting

Prometheus Monitoring Rules

```yaml
# Prometheus rules for Go service CPU monitoring
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: go-service-cpu-alerts
spec:
  groups:
  - name: go-service-performance
    rules:
    - alert: GoServiceHighCPU
      expr: |
        (
          rate(container_cpu_usage_seconds_total{pod=~"go-service-.*"}[5m])
        ) > 0.8
      for: 5m
      labels:
        severity: warning
        service: go-service
      annotations:
        summary: "Go service CPU usage above 80%"
        description: "Pod {{ $labels.pod }} CPU usage is {{ $value | humanizePercentage }}"

    - alert: GoServiceGoroutineLeak
      expr: |
        go_goroutines{job="go-service"} > 10000
      for: 10m
      labels:
        severity: critical
      annotations:
        summary: "Potential goroutine leak detected"
        description: "Service {{ $labels.instance }} has {{ $value }} goroutines"

    - alert: GoServiceGCPressure
      expr: |
        rate(go_gc_duration_seconds_sum[5m]) > 0.1
      for: 5m
      labels:
        severity: warning
      annotations:
        summary: "High GC pressure in Go service"
        description: "GC is consuming {{ $value }} seconds of CPU per second"

    - alert: GoServiceMemoryLeak
      expr: |
        go_memstats_heap_inuse_bytes / go_memstats_heap_sys_bytes > 0.9
      for: 15m
      labels:
        severity: critical
      annotations:
        summary: "Potential memory leak in Go service"
        description: "Heap utilization is {{ $value | humanizePercentage }}"
```

Performance Testing and Validation

```go
// Benchmark tests to validate optimizations
func BenchmarkProcessRequestSlow(b *testing.B) {
    data := generateTestData(1000)

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        processRequestSlow(data)
    }
}

func BenchmarkProcessRequestFast(b *testing.B) {
    data := generateTestData(1000)

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        processRequestFast(data)
    }
}

// Run benchmarks with memory profiling:
//   go test -bench=. -benchmem -cpuprofile=cpu.prof -memprofile=mem.prof

// CPU profile analysis with `go tool pprof cpu.prof`:
//   (pprof) top20         - Show top 20 CPU consumers
//   (pprof) list <func>   - Show source code with CPU usage
//   (pprof) web           - Generate web visualization
// For an interactive flame graph, run: go tool pprof -http=:8080 cpu.prof
```

Part 3: System Architecture Patterns

Microservices vs Monolithic Architecture

Strong Answer:

Analogy - Car Pool vs Train:

Microservices (Car Pool Approach):

🚗 🚗 🚗 🚗 🚗  (Independent cars)
Each car (service) can:
- Take different routes
- Stop independently
- Break down without affecting others
- Scale by adding more cars
- Use different fuel types (technologies)

Monolithic (Train Approach):

🚂-🚃-🚃-🚃-🚃  (Connected train)
The train (application):
- All cars must follow the same route
- If engine fails, entire train stops
- All cars must move together
- Scale by making train longer or faster
- Single fuel type for entire train

Decision Framework for Analytics Dashboard:

```python
# Decision matrix for architecture choice
class ArchitectureDecision:
    def __init__(self):
        self.factors = {
            'team_size': 0,
            'complexity': 0,
            'scalability_needs': 0,
            'technology_diversity': 0,
            'deployment_frequency': 0,
            'operational_maturity': 0
        }

    def assess_microservices_fit(self, dashboard_requirements):
        """Assess if microservices are appropriate for analytics dashboard"""

        # Analytics dashboard components
        services = {
            'metrics_collector': {
                'responsibility': 'Collect metrics from various sources',
                'scalability': 'High - handles high volume ingestion',
                'technology': 'Go - for performance'
            },
            'data_processor': {
                'responsibility': 'Process and aggregate metrics',
                'scalability': 'Medium - CPU intensive operations',
                'technology': 'Python - for data processing libraries'
            },
            'api_gateway': {
                'responsibility': 'Serve dashboard APIs',
                'scalability': 'High - many concurrent users',
                'technology': 'Node.js - for async I/O'
            },
            'notification_service': {
                'responsibility': 'Send alerts and notifications',
                'scalability': 'Low - occasional alerts',
                'technology': 'Python - for integrations'
            },
            'frontend_bff': {
                'responsibility': 'Backend for Frontend',
                'scalability': 'Medium - aggregates data for UI',
                'technology': 'React/TypeScript'
            }
        }

        return self.evaluate_services(services)

    def evaluate_services(self, services):
        """Evaluate microservices approach"""

        benefits = [
            "Independent scaling per service",
            "Technology diversity (Go, Python, Node.js)",
            "Team autonomy - different teams own different services",
            "Fault isolation - metrics collection failure doesn't break UI",
            "Independent deployments - can update notification without affecting API"
        ]

        challenges = [
            "Network latency between services",
            "Data consistency across services",
            "Distributed system complexity",
            "Service discovery and load balancing",
            "Monitoring and debugging across services"
        ]

        return {
            'benefits': benefits,
            'challenges': challenges,
            'recommendation': self.make_recommendation()
        }

    def make_recommendation(self):
        """Make architecture recommendation for analytics dashboard"""

        if self.is_early_stage():
            return {
                'choice': 'MODULAR_MONOLITH',
                'reason': 'Start simple, can extract services later',
                'structure': self.modular_monolith_structure()
            }
        else:
            return {
                'choice': 'MICROSERVICES',
                'reason': 'Scale and team benefits outweigh complexity',
                'structure': self.microservices_structure()
            }

    def modular_monolith_structure(self):
        """Modular monolith approach - best of both worlds"""
        return {
            'structure': """
            analytics-dashboard/
            ├── cmd/             # Application entry points
            ├── internal/
            │   ├── metrics/        # Metrics collection module
            │   ├── processing/     # Data processing module
            │   ├── api/            # API handling module
            │   ├── notifications/  # Alert module
            │   └── dashboard/      # UI serving module
            ├── pkg/             # Shared libraries
            └── deployments/     # Single deployment unit
            """,
            'benefits': [
                'Single deployment and testing',
                'Easier debugging and development',
                'No network latency between modules',
                'Simpler operational overhead',
                'Can extract to microservices later'
            ]
        }

    def microservices_structure(self):
        """Full microservices approach"""
        return {
            'structure': """
            Analytics Platform Microservices:

            ┌─────────────────┐    ┌─────────────────┐
            │  Frontend SPA   │    │   API Gateway   │
            │     (React)     │◄──►│     (Kong)      │
            └─────────────────┘    └─────────┬───────┘
                                             │
                        ┌────────────────────┼────────────────────┐
                        │                    │                    │
            ┌───────────▼────────┐ ┌─────────▼─────────┐ ┌────────▼──────────┐
            │ Metrics Collector  │ │  Data Processor   │ │ Notification Svc  │
            │        (Go)        │ │     (Python)      │ │     (Python)      │
            └─────────┬──────────┘ └─────────┬─────────┘ └───────────────────┘
                      │                      │
            ┌─────────▼──────────────────────▼─────────┐
            │               Message Queue              │
            │               (Kafka/Redis)              │
            └──────────────────────────────────────────┘
            """,
            'communication': 'Async messaging + HTTP APIs',
            'data_strategy': 'Event sourcing with CQRS'
        }

    def is_early_stage(self):
        """Determine if project is in early stage"""
        return (
            self.factors['team_size'] < 10 and
            self.factors['operational_maturity'] < 3
        )
```

Implementation Examples:

Microservices Implementation:

```go
// Metrics Collector Service (Go)
package main

type MetricsCollector struct {
    kafka    *kafka.Producer
    redis    *redis.Client
    handlers map[string]MetricHandler
}

func (mc *MetricsCollector) CollectMetric(metric Metric) error {
    // Process metric
    processed := mc.handlers[metric.Type].Process(metric)

    // Publish to message queue for other services
    return mc.kafka.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{
            Topic:     &metric.Type,
            Partition: kafka.PartitionAny,
        },
        Value: processed.ToJSON(),
    }, nil)
}
```

```python
# Data Processor Service (Python)
import asyncio
from kafka import KafkaConsumer

class DataProcessor:
    def __init__(self):
        self.consumer = KafkaConsumer(
            'metrics-topic',
            bootstrap_servers=['kafka:9092'],
            group_id='data-processors'
        )

    async def process_metrics(self):
        for message in self.consumer:
            metric = Metric.from_json(message.value)

            # Process and aggregate
            aggregated = await self.aggregate_metric(metric)

            # Store in time-series database
            await self.store_metric(aggregated)

            # Trigger alerts if needed
            await self.check_alerts(aggregated)
```

Monolithic Implementation:

```python
# Modular Monolith (Python)
from dashboard.modules import metrics, processing, api, notifications

class AnalyticsDashboard:
    def __init__(self):
        self.metrics = metrics.MetricsModule()
        self.processor = processing.ProcessingModule()
        self.api = api.APIModule()
        self.notifications = notifications.NotificationModule()

    def handle_metric(self, raw_metric):
        # All in same process - no network calls
        metric = self.metrics.parse(raw_metric)
        processed = self.processor.aggregate(metric)

        # Check for alerts
        if self.processor.check_thresholds(processed):
            self.notifications.send_alert(processed)

        return processed

# Single deployment with clear module boundaries
# Can be extracted to separate services later
```

Decision Matrix:

| Factor | Monolith | Microservices |
|---|---|---|
| Team Size | < 10 developers | > 10 developers |
| Complexity | Simple-medium | Complex domain |
| Scale | < 1M requests/day | > 10M requests/day |
| Technology | Single stack preferred | Multiple technologies needed |
| Deployment | Weekly/monthly | Multiple times per day |
| Data Consistency | Strong consistency needed | Eventual consistency OK |

For Analytics Dashboard Specifically:

  • Early Stage: Start with modular monolith
  • Growth Stage: Extract high-scale components (metrics collector) first
  • Mature Stage: Full microservices with proper DevOps practices

Database Sharding Implementation

Strong Answer:

Library Analogy Explanation:

📚 Traditional Database (Single Library):
All books in one building - gets crowded, hard to find books,
long queues at checkout

📚📚📚 Sharded Database (Multiple Library Branches):
Books distributed across locations:
- Fiction Library: Books A-H
- Science Library: Books I-P
- History Library: Books Q-Z

Each library (shard) operates independently but part of same system

**Sharding Strategy for Analytics Dashboard:**

```python
# Database sharding implementation
import hashlib
import datetime
from typing import Dict, List

class DatabaseSharding:
    def __init__(self):
        self.shards = {
            'shard_americas': {
                'host': 'db-americas.example.com',
                'regions': ['us', 'ca', 'mx', 'br'],
                'connection': self.create_connection('db-americas')
            },
            'shard_europe': {
                'host': 'db-europe.example.com',
                'regions': ['uk', 'de', 'fr', 'es'],
                'connection': self.create_connection('db-europe')
            },
            'shard_asia': {
                'host': 'db-asia.example.com',
                'regions': ['jp', 'sg', 'au', 'in'],
                'connection': self.create_connection('db-asia')
            }
        }

        # Time-based sharding for metrics
        self.time_shards = {
            'metrics_current': 'Last 7 days - hot data',
            'metrics_recent': 'Last 30 days - warm data',
            'metrics_archive': 'Older than 30 days - cold data'
        }

    def get_user_shard(self, user_id: str) -> str:
        """Determine shard based on user ID hash"""
        hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
        shard_index = hash_value % len(self.shards)
        return list(self.shards.keys())[shard_index]

    def get_region_shard(self, region: str) -> str:
        """Determine shard based on geographical region"""
        for shard_name, shard_info in self.shards.items():
            if region.lower() in shard_info['regions']:
                return shard_name
        return 'shard_americas'  # Default fallback

    def get_time_shard(self, timestamp: datetime.datetime) -> str:
        """Determine shard based on data age"""
        now = datetime.datetime.now()
        age = now - timestamp

        if age.days <= 7:
            return 'metrics_current'
        elif age.days <= 30:
            return 'metrics_recent'
        else:
            return 'metrics_archive'

    def route_query(self, query_type: str, **kwargs):
        """Route queries to appropriate shard"""

        if query_type == 'user_orders':
            shard = self.get_user_shard(kwargs['user_id'])
            return self.execute_query(shard, query_type, **kwargs)

        elif query_type == 'regional_metrics':
            shard = self.get_region_shard(kwargs['region'])
            return self.execute_query(shard, query_type, **kwargs)

        elif query_type == 'historical_data':
            # Query multiple time shards and aggregate
            return self.query_time_shards(kwargs['start_date'], kwargs['end_date'])

        elif query_type == 'cross_shard_analytics':
            # Fan-out query to all shards
            return self.fan_out_query(query_type, **kwargs)

    def query_time_shards(self, start_date, end_date):
        """Query across time-based shards"""
        results = []

        for shard_name in self.time_shards.keys():
            try:
                shard_result = self.execute_query(shard_name, 'time_range_query',
                                                  start_date=start_date, end_date=end_date)
                results.extend(shard_result)
            except ShardUnavailableError:
                # Handle shard failures gracefully
                self.log_shard_failure(shard_name)
                continue

        return self.aggregate_results(results)

    def fan_out_query(self, query_type: str, **kwargs):
        """Execute query across all shards and aggregate results"""
        import concurrent.futures

        results = {}
        with concurrent.futures.ThreadPoolExecutor(max_workers=len(self.shards)) as executor:
            # Submit queries to all shards concurrently
            future_to_shard = {
                executor.submit(self.execute_query, shard_name, query_type, **kwargs): shard_name
                for shard_name in self.shards.keys()
            }

            for future in concurrent.futures.as_completed(future_to_shard):
                shard_name = future_to_shard[future]
                try:
                    result = future.result(timeout=30)  # 30 second timeout
                    results[shard_name] = result
                except Exception as e:
                    self.log_query_failure(shard_name, e)
                    results[shard_name] = None

        return self.aggregate_cross_shard_results(results)

# Shard-aware query examples
class ShardedAnalyticsQueries:
    def __init__(self, sharding: DatabaseSharding):
        self.sharding = sharding

    def get_user_orders(self, user_id: str):
        """Get orders for specific user"""
        return self.sharding.route_query('user_orders', user_id=user_id)

    def get_regional_sales(self, region: str, date_range: tuple):
        """Get sales data for specific region"""
        return self.sharding.route_query('regional_metrics',
                                         region=region,
                                         start_date=date_range[0],
                                         end_date=date_range[1])

    def get_global_metrics(self, metric_type: str):
        """Get global metrics across all shards"""
        return self.sharding.route_query('cross_shard_analytics',
                                         metric_type=metric_type)

    def get_historical_trends(self, days_back: int):
        """Get historical data across time shards"""
        end_date = datetime.datetime.now()
        start_date = end_date - datetime.timedelta(days=days_back)

        return self.sharding.query_time_shards(start_date, end_date)
```

PostgreSQL Sharding Implementation:

```sql
-- Create shard-specific tables
-- Shard 1: Americas
CREATE TABLE orders_americas (
    id UUID PRIMARY KEY,
    user_id UUID NOT NULL,
    region VARCHAR(2) CHECK (region IN ('US', 'CA', 'MX', 'BR')),
    order_total DECIMAL(10,2),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Shard 2: Europe
CREATE TABLE orders_europe (
    id UUID PRIMARY KEY,
    user_id UUID NOT NULL,
    region VARCHAR(2) CHECK (region IN ('UK', 'DE', 'FR', 'ES')),
    order_total DECIMAL(10,2),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Shard 3: Asia
CREATE TABLE orders_asia (
    id UUID PRIMARY KEY,
    user_id UUID NOT NULL,
    region VARCHAR(2) CHECK (region IN ('JP', 'SG', 'AU', 'IN')),
    order_total DECIMAL(10,2),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Create foreign data wrapper for cross-shard queries
CREATE EXTENSION postgres_fdw;

CREATE SERVER shard_europe
    FOREIGN DATA WRAPPER postgres_fdw
    OPTIONS (host 'db-europe.example.com', port '5432', dbname 'analytics');

CREATE USER MAPPING FOR postgres
    SERVER shard_europe
    OPTIONS (user 'analytics_user', password 'password');

-- Create foreign tables
CREATE FOREIGN TABLE orders_europe_remote (
    id UUID,
    user_id UUID,
    region VARCHAR(2),
    order_total DECIMAL(10,2),
    created_at TIMESTAMP WITH TIME ZONE
)
SERVER shard_europe
OPTIONS (schema_name 'public', table_name 'orders_europe');

-- View for cross-shard queries
-- (orders_asia_remote is defined the same way as orders_europe_remote,
--  via a server and foreign table for the Asia shard)
CREATE VIEW orders_global AS
SELECT 'americas' as shard, * FROM orders_americas
UNION ALL
SELECT 'europe' as shard, * FROM orders_europe_remote
UNION ALL
SELECT 'asia' as shard, * FROM orders_asia_remote;
```

Application-Level Sharding Middleware:

```python
# Flask middleware for automatic shard routing
from flask import Flask, abort, jsonify, request
from werkzeug.wrappers import Request

app = Flask(__name__)

class ShardingMiddleware:
    def __init__(self, app):
        self.app = app
        self.sharding = DatabaseSharding()

    def __call__(self, environ, start_response):
        # Extract sharding context from request
        req = Request(environ)

        # Determine shard based on request
        if 'user_id' in req.args:
            shard = self.sharding.get_user_shard(req.args['user_id'])
            environ['DATABASE_SHARD'] = shard

        elif 'region' in req.args:
            shard = self.sharding.get_region_shard(req.args['region'])
            environ['DATABASE_SHARD'] = shard

        else:
            # Cross-shard query required
            environ['DATABASE_SHARD'] = 'cross_shard'

        return self.app(environ, start_response)

# Flask route with shard awareness
@app.route('/api/user/<user_id>/orders')
def get_user_orders(user_id):
    shard = request.environ.get('DATABASE_SHARD')

    if shard == 'cross_shard':
        # This shouldn't happen for user-specific queries
        abort(400, "Invalid request - user_id required")

    # Use shard-specific connection
    db = get_shard_connection(shard)
    orders = db.execute(
        "SELECT * FROM orders WHERE user_id = %s ORDER BY created_at DESC",
        (user_id,)
    )

    return jsonify(orders)

@app.route('/api/analytics/global')
def get_global_analytics():
    # Cross-shard query - aggregate from all shards
    sharding = DatabaseSharding()
    results = sharding.fan_out_query('global_analytics')

    return jsonify({
        'total_orders': sum(r['order_count'] for r in results.values()),
        'total_revenue': sum(r['revenue'] for r in results.values()),
        'by_region': results
    })
```

Shard Management and Failover:

```python
import time

class ShardManager:
    def __init__(self):
        self.shards = DatabaseSharding().shards
        self.health_check_interval = 30  # seconds

    def monitor_shard_health(self):
        """Continuously monitor shard health"""
        while True:
            for shard_name, shard_info in self.shards.items():
                try:
                    # Simple health check query
                    conn = shard_info['connection']
                    conn.execute("SELECT 1")
                    self.mark_shard_healthy(shard_name)

                except Exception as e:
                    self.mark_shard_unhealthy(shard_name, e)
                    self.trigger_failover(shard_name)

            time.sleep(self.health_check_interval)

    def trigger_failover(self, failed_shard: str):
        """Handle shard failover"""
        # Redirect traffic to healthy shards
        self.redistribute_load(failed_shard)

        # Alert operations team
        self.send_alert(f"Shard {failed_shard} is unhealthy")

        # Attempt automated recovery
        self.attempt_shard_recovery(failed_shard)

    def rebalance_shards(self, new_shard_config: Dict):
        """Rebalance data across shards"""
        # This is a complex operation requiring:
        # 1. Data migration planning
        # 2. Consistent hashing updates
        # 3. Gradual traffic shifting
        # 4. Rollback capability

        migration_plan = self.create_migration_plan(new_shard_config)

        for step in migration_plan:
            self.execute_migration_step(step)
            self.verify_migration_step(step)

        self.update_shard_routing(new_shard_config)
```

Benefits and Challenges:

Benefits:

  • Horizontal Scalability: Add more shards as data grows
  • Performance: Smaller datasets per shard = faster queries
  • Isolation: Shard failures don't affect other shards
  • Geographic Distribution: Data close to users

Challenges:

  • Cross-shard Queries: Complex and slower
  • Rebalancing: Moving data between shards is difficult (see the consistent-hashing sketch below)
  • Consistency: Transactions across shards are complex
  • Operational Complexity: Multiple databases to manage
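
The `rebalance_shards` sketch above calls out "consistent hashing updates": with the modulo scheme in `get_user_shard`, changing the shard count remaps almost every user. A hedged Go sketch of a consistent-hash ring, which only remaps keys near the changed shard (shard names reused from the Python example; the virtual-node count is an arbitrary choice):

```go
package main

import (
    "crypto/md5"
    "encoding/binary"
    "fmt"
    "sort"
)

type Ring struct {
    hashes  []uint64          // sorted ring positions
    shardAt map[uint64]string // ring position -> shard name
}

func hashKey(key string) uint64 {
    sum := md5.Sum([]byte(key))
    return binary.BigEndian.Uint64(sum[:8])
}

// NewRing places vnodes virtual nodes per shard on the ring
// to smooth out the key distribution.
func NewRing(shards []string, vnodes int) *Ring {
    r := &Ring{shardAt: make(map[uint64]string)}
    for _, s := range shards {
        for i := 0; i < vnodes; i++ {
            h := hashKey(fmt.Sprintf("%s#%d", s, i))
            r.hashes = append(r.hashes, h)
            r.shardAt[h] = s
        }
    }
    sort.Slice(r.hashes, func(i, j int) bool { return r.hashes[i] < r.hashes[j] })
    return r
}

// Locate returns the shard owning key: the first ring position >= hash(key).
func (r *Ring) Locate(key string) string {
    h := hashKey(key)
    i := sort.Search(len(r.hashes), func(i int) bool { return r.hashes[i] >= h })
    if i == len(r.hashes) {
        i = 0 // wrap around the ring
    }
    return r.shardAt[r.hashes[i]]
}

func main() {
    ring := NewRing([]string{"shard_americas", "shard_europe", "shard_asia"}, 64)
    fmt.Println(ring.Locate("user-12345")) // stable as shards are added or removed
}
```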

For Analytics Dashboard:

  • User-based sharding: For user-specific dashboards
  • Time-based sharding: For historical data (hot/warm/cold)
  • Feature-based sharding: Separate shards for different metrics types

Part 4: Behavioral & Collaboration

Convincing Developers - Code Reliability Changes

Strong Answer:

Situation: Our payment processing service was experiencing intermittent failures during peak traffic, causing revenue loss. The development team had implemented a quick fix that worked locally but didn't address the underlying concurrency issues.

Approach - Data-Driven Persuasion:

1. Quantified the Business Impact

```python
# I created a dashboard showing the real cost
class ReliabilityImpactAnalysis:
    def calculate_revenue_impact(self):
        return {
            "failed_transactions_per_hour": 150,
            "average_transaction_value": 85.50,
            "revenue_loss_per_hour": 150 * 85.50,       # $12,825
            "monthly_projected_loss": 12825 * 24 * 30,  # $9.23M
            "customer_churn_risk": "23 angry customer emails in 2 days"
        }
```

2. Made It Personal and Collaborative

Instead of saying "your code is wrong," I said:

  • "I found some interesting patterns in our production data that might help us improve performance"
  • "What do you think about these metrics? I'm curious about your thoughts on the concurrency patterns"
  • "Could we pair program on this? I'd love to understand your approach better"

3. Proposed Solutions, Not Just Problems

I came to them with a working prototype:

```python
# Before (their approach)
def process_payment(payment_data):
    global payment_queue
    payment_queue.append(payment_data)  # Race condition!
    return process_queue()

# After (my suggested approach)
import threading
from queue import Queue

class ThreadSafePaymentProcessor:
    def __init__(self):
        self.payment_queue = Queue()
        self.lock = threading.Lock()

    def process_payment(self, payment_data):
        with self.lock:
            # Thread-safe processing
            return self.safe_process(payment_data)
```

4. Used Their Language and Priorities

  • Framed it as a "performance optimization" rather than "fixing bugs"
  • Showed how it would reduce their on-call burden: "No more 3 AM pages about payment failures"
  • Highlighted career benefits: "This would be a great story for your next performance review"

Result: They not only adopted the changes but became advocates for reliability practices. The lead developer started attending SRE meetings and later implemented circuit breakers proactively.

Key Lessons:

  • Data beats opinions - metrics are harder to argue with
  • Collaboration over confrontation - "How can we solve this together?"
  • Show, don't tell - working code examples are persuasive
  • Align with their incentives - make reliability their win, not your win

Trade-off Between Reliability and Feature Delivery

Strong Answer:

Situation: During a major product launch, we were at 97% availability (below our 99.5% SLO), but the product team wanted to deploy a new feature that would drive user adoption for the launch.

The Dilemma:

  • Product pressure: "This feature will increase user engagement by 40%"
  • Reliability concern: Error budget was nearly exhausted
  • Timeline: Launch was in 3 days, couldn't delay

My Decision Process:

1. Quantified Both Sides

```python
# Business impact calculation
launch_impact = {
    "projected_new_users": 50000,
    "revenue_per_user": 25,
    "total_revenue_opportunity": 1.25e6,  # $1.25M
    "competitive_advantage": "First-mover in market segment"
}

reliability_risk = {
    "current_error_budget_used": 0.85,  # 85% of monthly budget
    "remaining_budget": 0.15,
    "days_remaining_in_month": 8,
    "projected_overage": 0.3,  # 30% over budget
    "customer_impact": "Potential service degradation"
}
```

2. Created a Risk-Mitigation Plan

Instead of a binary yes/no, I proposed a conditional approach:

```yaml
# Feature deployment plan with guardrails
deployment_strategy:
  phase_1:
    rollout: 5% of users
    duration: 4 hours
    success_criteria:
      - error_rate < 0.1%
      - p99_latency < 200ms
      - no_critical_alerts

  phase_2:
    rollout: 25% of users
    duration: 12 hours
    automatic_rollback: true
    conditions:
      - error_rate > 0.2% for 5 minutes
      - p99_latency > 500ms for 10 minutes

  phase_3:
    rollout: 100% of users
    requires: manual_approval_after_phase_2
```

3. Communicated Trade-offs Transparently

I presented to stakeholders:

"We can launch this feature, but here's what it means:

  • Upside: $1.25M revenue opportunity, competitive advantage
  • Downside: 30% chance of service degradation affecting existing users
  • Mitigation: Feature flags for instant rollback, enhanced monitoring
  • Commitment: If reliability suffers, we pause new features until we're back on track"

4. The Decision and Implementation

We proceeded with the phased rollout:

```python
class FeatureLaunchManager:
    def __init__(self):
        self.error_budget_monitor = ErrorBudgetMonitor()
        self.feature_flag = FeatureFlag("new_user_onboarding")

    def monitor_launch_health(self):
        while self.feature_flag.enabled:
            current_error_rate = self.get_error_rate()
            budget_status = self.error_budget_monitor.get_status()

            if budget_status.will_exceed_monthly_budget():
                self.trigger_rollback("Error budget exceeded")
                break

            if current_error_rate > 0.002:  # 0.2%
                self.reduce_rollout_percentage()

            time.sleep(60)  # Check every minute during launch

    def trigger_rollback(self, reason):
        self.feature_flag.disable()
        self.alert_stakeholders(f"Feature rolled back: {reason}")
        self.schedule_post_mortem()
```

The Outcome:

  • Feature launched successfully to 25% of users
  • Error rate increased slightly but stayed within acceptable bounds
  • Revenue target was hit with partial rollout
  • We didn't exceed error budget
  • Built trust with product team by delivering on promises

Key Principles I Used:

  1. Transparency: Show the math, don't hide trade-offs
  2. Risk mitigation: Find ways to reduce downside while preserving upside
  3. Stakeholder alignment: Make everyone accountable for the decision
  4. Data-driven decisions: Use metrics, not emotions
  5. Learning mindset: Treat it as an experiment with clear success/failure criteria

Follow-up Actions:

  • Conducted a post-launch review
  • Used learnings to improve our launch process
  • Created better error budget forecasting tools
  • Established clearer guidelines for future trade-off decisions

Staying Current with SRE Practices and Technologies

Strong Answer:

My Learning Strategy - Multi-layered Approach:

1. Technical Deep Dives

```python
# I maintain a personal learning dashboard
learning_tracker = {
    "current_focus": [
        "eBPF for system observability",
        "Kubernetes operators for automation",
        "AI/ML for incident prediction"
    ],
    "weekly_commitments": {
        "reading": "2 hours of technical papers",
        "hands_on": "4 hours lab/experimentation",
        "community": "1 hour in SRE forums/Slack"
    },
    "monthly_goals": [
        "Complete one new certification",
        "Contribute to one open source project",
        "Write one technical blog post"
    ]
}
```

2. Resource Mix - Quality over Quantity

Daily (30 minutes morning routine):

  • SRE Weekly Newsletter - concise industry updates
  • Hacker News - scan for infrastructure/reliability topics
  • Internal Slack channels - #sre-learning, #incidents-learned

Weekly (2-3 hours):

  • Google SRE Book Club - our team works through chapters together
  • Kubernetes documentation - staying current with new features
  • Conference talk videos - KubeCon, SREcon, Velocity recordings

Monthly Deep Dives:

  • Academic papers - especially from USENIX, SOSP, OSDI conferences
  • Vendor whitepapers - but with healthy skepticism
  • Open source project exploration - contribute small patches to learn codebases

3. Hands-on Learning Lab

```yaml
# Home lab setup for experimentation
homelab_projects:
  current_experiments:
    - name: "eBPF monitoring tools"
      status: "Building custom metrics collector"
      learning: "Kernel-level observability"

    - name: "Chaos engineering with Litmus"
      status: "Testing failure scenarios"
      learning: "Resilience patterns"

    - name: "Service mesh evaluation"
      status: "Comparing Istio vs Linkerd"
      learning: "Traffic management at scale"

  infrastructure:
    platform: "Kubernetes cluster on Raspberry Pi"
    monitoring: "Prometheus + Grafana + Jaeger"
    ci_cd: "GitLab CI with ArgoCD"
    cost: "$200/month AWS credits for cloud integration"
```

4. Community Engagement

  • SRE Discord/Slack communities - daily participation
  • Local meetups - monthly CNCF and DevOps meetups
  • Conference speaking - submitted 3 talks this year on incident response
  • Mentoring - guide 2 junior engineers, which forces me to stay sharp
  • Open source contributions - maintain a small monitoring tool, contribute to Prometheus

5. Learning from Failures - Internal and External

```python
class IncidentLearningTracker:
    def analyze_industry_incidents(self):
        """Study major outages for lessons"""
        recent_studies = [
            {
                "incident": "Facebook Oct 2021 BGP outage",
                "lessons": ["Single points of failure in DNS", "Recovery complexity"],
                "applied_locally": "Implemented secondary DNS provider"
            },
            {
                "incident": "AWS us-east-1 Dec 2021",
                "lessons": ["Multi-region dependencies", "Circuit breaker importance"],
                "applied_locally": "Added cross-region failover testing"
            }
        ]
        return recent_studies

    def internal_learning(self):
        """Extract patterns from our own incidents"""
        return {
            "quarterly_review": "What patterns are emerging?",
            "cross_team_sharing": "Monthly incident learnings presentation",
            "runbook_updates": "Continuously improve based on real scenarios"
        }
```

6. Structured Learning Paths

  • Currently pursuing: CKS (Certified Kubernetes Security Specialist)
  • Completed this year: AWS Solutions Architect Pro, CKAD
  • Next up: HashiCorp Terraform Associate
  • Long-term goal: Google Cloud Professional Cloud Architect

7. Teaching and Knowledge Sharing

```markdown
# My knowledge sharing activities

## Internal (at work):

- Monthly "SRE Patterns" lunch & learn sessions
- Incident post-mortem facilitation
- New hire onboarding for SRE practices
- Internal blog posts on "what I learned this week"

## External:

- Technical blog: medium.com/@myusername
- Conference talks: submitted to SREcon, KubeCon
- Open source: maintainer of small monitoring tool
- Mentoring: 2 junior engineers, 1 career switcher
```

8. Staying Ahead of Trends

I try to identify emerging patterns early:

Current attention areas:

  • Platform Engineering - evolution beyond traditional SRE
  • FinOps - cost optimization becoming critical
  • AI/ML for Operations - automated incident response
  • WebAssembly - potential impact on deployment patterns
  • Sustainability - green computing in infrastructure

My evaluation framework:

  1. Signal vs noise: Is this solving real problems or just hype?
  2. Adoption timeline: When will this be production-ready?
  3. Investment level: Should I learn basics now or wait?
  4. Career relevance: How does this align with my growth goals?

Key Success Factors:

  • Consistency over intensity - 30 minutes daily beats 8 hours monthly
  • Applied learning - immediately try new concepts in lab/work
  • Community connection - learning with others accelerates understanding
  • Teaching others - best way to solidify knowledge
  • Balance breadth and depth - stay broad but go deep on core areas

Resources I highly recommend:

  • Books: "Observability Engineering", "Learning eBPF", "Kubernetes Patterns"
  • Podcasts: "Software Engineering Radio", "The Cloudcast"
  • Newsletters: "SRE Weekly", "DevOps'ish", "The New Stack"
  • Communities: SRE Slack, r/sre, CNCF Slack channels

This approach has helped me stay current while avoiding information overload. The key is finding sustainable habits that fit into daily work rather than treating learning as separate from doing.

Part 5: CPU Performance Troubleshooting

High CPU Usage Investigation

When encountering high CPU usage in production Go services, here's a systematic approach to investigate and resolve the issue:

1. Verify the Issue

  • Check CPU usage metrics in Prometheus or Grafana.
  • Confirm the affected service and pod.

2. Check Recent Changes

  • Review Git commits and Kubernetes deployments.
  • Roll back recent changes if necessary.

3. Analyze CPU Profiles

  • Use pprof to analyze CPU profiles.
  • Look for functions with high CPU time.
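
Profiling a live service assumes the pprof HTTP endpoints are exposed (the toolkit script later in this section targets port 6060). A minimal sketch of wiring them in:

```go
package main

import (
    "log"
    "net/http"
    _ "net/http/pprof" // registers /debug/pprof/* handlers on the default mux
)

func main() {
    // Serve pprof on a separate, non-public port.
    go func() {
        log.Println(http.ListenAndServe("localhost:6060", nil))
    }()

    // ... rest of the service ...
    select {}
}
```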

4. Inspect Goroutines

  • Check for goroutine leaks or deadlocks.
  • Capture a goroutine profile from the /debug/pprof/goroutine endpoint.
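
With that endpoint enabled, goroutine stacks can be pulled via `go tool pprof http://localhost:6060/debug/pprof/goroutine`, or dumped in-process; a small sketch:

```go
package main

import (
    "os"
    "runtime/pprof"
)

// dumpGoroutines writes every goroutine's stack to stderr. debug=2 prints
// full stack traces, which makes goroutines stuck in channel sends or
// mutex waits easy to spot.
func dumpGoroutines() {
    pprof.Lookup("goroutine").WriteTo(os.Stderr, 2)
}

func main() {
    dumpGoroutines()
}
```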

5. Review Database Queries

  • Look for slow or blocking database queries.
  • Use EXPLAIN to analyze query performance.

6. Check External Dependencies

  • Verify the performance of external APIs or services.
  • Consider caching responses to reduce load.
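
If an external dependency is slow but its responses are reusable, a small in-process TTL cache is one mitigation. A hedged sketch (the type name and five-minute TTL are illustrative):

```go
package main

import (
    "fmt"
    "sync"
    "time"
)

type cacheEntry struct {
    value   string
    expires time.Time
}

// TTLCache memoizes responses from a slow dependency for a fixed TTL,
// trading a little staleness for far fewer upstream calls.
type TTLCache struct {
    mu      sync.RWMutex
    entries map[string]cacheEntry
    ttl     time.Duration
}

func NewTTLCache(ttl time.Duration) *TTLCache {
    return &TTLCache{entries: make(map[string]cacheEntry), ttl: ttl}
}

func (c *TTLCache) Get(key string) (string, bool) {
    c.mu.RLock()
    defer c.mu.RUnlock()
    e, ok := c.entries[key]
    if !ok || time.Now().After(e.expires) {
        return "", false // miss or expired
    }
    return e.value, true
}

func (c *TTLCache) Set(key, value string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.entries[key] = cacheEntry{value: value, expires: time.Now().Add(c.ttl)}
}

func main() {
    cache := NewTTLCache(5 * time.Minute)
    if _, ok := cache.Get("user-42"); !ok {
        cache.Set("user-42", "fetched-from-slow-api") // only call upstream on a miss
    }
    v, _ := cache.Get("user-42")
    fmt.Println(v)
}
```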

7. Optimize Code

  • Refactor inefficient algorithms or data structures.
  • Use concurrency primitives like worker pools.
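
A worker pool bounds concurrency to a fixed number of goroutines instead of spawning one per item; a sketch of the pattern (function names are illustrative):

```go
package main

import (
    "fmt"
    "sync"
)

// processWithWorkerPool fans work out to a fixed number of workers,
// bounding CPU and memory use instead of one goroutine per item.
func processWithWorkerPool(items []string, workers int, process func(string)) {
    jobs := make(chan string)
    var wg sync.WaitGroup

    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for item := range jobs {
                process(item)
            }
        }()
    }

    for _, item := range items {
        jobs <- item
    }
    close(jobs)
    wg.Wait()
}

func main() {
    items := []string{"a", "b", "c", "d"}
    processWithWorkerPool(items, 2, func(s string) { fmt.Println("processed", s) })
}
```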

8. Scale the Service

  • Increase the number of replicas in the Kubernetes deployment.
  • Use horizontal pod autoscaling based on CPU usage.

9. Review Resource Requests and Limits

  • Ensure proper CPU requests and limits are set in the pod spec.
  • Adjust limits if the application legitimately needs more CPU.

10. Investigate Node-Level Issues

  • Check for other pods on the same node consuming excessive CPU.
  • Consider tainting the node or using node affinity.

11. Analyze System-Level Metrics

  • Use kubectl top to check pod and node CPU usage.
  • Investigate any anomalies in system-level metrics.

12. Restart the Affected Pods

  • As a last resort, restart the pods with high CPU usage.
  • Monitor the pods after the restart to ensure the issue is resolved.

Example Investigation: High CPU Usage in data-processor Service

1. Verify the Issue

  • Prometheus shows data-processor at 95% CPU usage.

2. Check Recent Changes

  • Last Git commit was 2 days ago, no recent deployments.

3. Analyze CPU Profiles

  • go tool pprof shows processData function at 80% CPU time.

4. Inspect Goroutines

  • The goroutine profile shows 1000+ goroutines in data-processor.
  • Many goroutines are stuck in processData.

5. Review Database Queries

  • EXPLAIN shows SELECT * FROM metrics WHERE time > ? is slow.

6. Check External Dependencies

  • No external API calls in processData.

7. Optimize Code

  • Refactor processData to use a worker pool.
  • Optimize database query to fetch only needed columns.

8. Scale the Service

  • Horizontal pod autoscaler added, scaling between 2-10 replicas.

9. Review Resource Requests and Limits

  • CPU limit increased from 500m to 1000m in the pod spec.

10. Investigate Node-Level Issues

  • No other pods on the node are consuming excessive CPU.

11. Analyze System-Level Metrics

  • Node CPU usage is stable, no anomalies detected.

12. Restart the Affected Pods

  • Pods restarted, CPU usage normalized to 30%.

CPU Troubleshooting Toolkit:

```bash
#!/bin/bash
# CPU investigation script for Go services

echo "🔍 Starting CPU investigation for Go service..."

# 1. Check current CPU usage
echo "📊 Current CPU usage:"
kubectl top pods -l app=go-service

# 2. Get profiling data
echo "🔬 Collecting CPU profile (30 seconds)..."
kubectl port-forward svc/go-service 6060:6060 &
PF_PID=$!
sleep 2
go tool pprof http://localhost:6060/debug/pprof/profile?seconds=30

# 3. Check memory allocations
echo "🧠 Checking memory allocations..."
go tool pprof http://localhost:6060/debug/pprof/allocs

# 4. Check GC performance
echo "🗑️ Checking garbage collection stats..."
curl -s http://localhost:6060/debug/vars | jq '.memstats'
kill "$PF_PID" 2>/dev/null  # Clean up the port-forward

# 5. Container-level CPU investigation
echo "🐳 Container CPU stats..."
docker stats --no-stream $(docker ps --filter "name=go-service" --format "{{.Names}}")

# 6. Process-level analysis
echo "⚙️ Process CPU breakdown..."
top -b -H -n 1 -p $(pgrep go-service)

# 7. strace for system call analysis
echo "🔧 System call analysis (10 seconds)..."
timeout 10s strace -c -p $(pgrep go-service)
```

Code-Level Optimizations:

```go
// Common CPU bottleneck fixes

// 1. Fix: Inefficient JSON parsing
// BEFORE - Slow JSON handling
func processRequestSlow(w http.ResponseWriter, r *http.Request) {
    var data map[string]interface{}
    body, _ := ioutil.ReadAll(r.Body)
    json.Unmarshal(body, &data)

    // Process data...
}

// AFTER - Optimized JSON handling
type RequestData struct {
    UserID string `json:"user_id"`
    Action string `json:"action"`
    // Define specific fields instead of interface{}
}

func processRequestFast(w http.ResponseWriter, r *http.Request) {
    var data RequestData
    decoder := json.NewDecoder(r.Body)
    decoder.DisallowUnknownFields() // Reject unexpected fields early

    if err := decoder.Decode(&data); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    // Process typed data...
}

// 2. Fix: CPU-intensive loops
// BEFORE - O(n²) algorithm
func findDuplicatesSlow(items []string) []string {
    var duplicates []string
    for i := 0; i < len(items); i++ {
        for j := i + 1; j < len(items); j++ {
            if items[i] == items[j] {
                duplicates = append(duplicates, items[i])
                break
            }
        }
    }
    return duplicates
}

// AFTER - O(n) algorithm using a map
func findDuplicatesFast(items []string) []string {
    seen := make(map[string]bool)
    var duplicates []string

    for _, item := range items {
        if seen[item] {
            duplicates = append(duplicates, item)
        } else {
            seen[item] = true
        }
    }
    return duplicates
}

// 3. Fix: Inefficient string operations
// BEFORE - Repeated string concatenation
func buildQuerySlow(filters []string) string {
    query := "SELECT * FROM table WHERE "
    for i, filter := range filters {
        if i > 0 {
            query += " AND "
        }
        query += filter
    }
    return query
}

// AFTER - Use strings.Builder
func buildQueryFast(filters []string) string {
    var builder strings.Builder
    builder.WriteString("SELECT * FROM table WHERE ")

    for i, filter := range filters {
        if i > 0 {
            builder.WriteString(" AND ")
        }
        builder.WriteString(filter)
    }
    return builder.String()
}
```

Summary

This comprehensive SRE interview guide covers:

Technical Areas

  • Performance Optimization: Go-specific optimizations, memory management, CPU profiling
  • System Architecture: Microservices vs monolith, database sharding, scalability patterns
  • Monitoring & Alerting: Prometheus rules, observability, incident response
  • Infrastructure: Container orchestration, deployment strategies, reliability engineering

Behavioral Areas

  • Leadership: Convincing teams, managing trade-offs, stakeholder communication
  • Continuous Learning: Staying current with technology, community engagement
  • Problem Solving: Systematic troubleshooting, root cause analysis

Key Takeaways

  1. Data-driven decisions: Always quantify impact and use metrics
  2. Systematic approach: Follow structured methodologies for troubleshooting
  3. Collaboration: Work with teams rather than against them
  4. Continuous improvement: Learn from every incident and optimization
  5. Balance trade-offs: Consider reliability, performance, and business needs

This guide provides both theoretical knowledge and practical examples that demonstrate real-world SRE experience. Use these patterns and adapt them to your specific situations during interviews.


This guide is designed to help you prepare for senior SRE roles by covering both technical depth and the soft skills needed to succeed in Site Reliability Engineering.