from locust import HttpUser, task, between
import random
import time
import json
class RegionalFailoverUser(HttpUser):
    """Simulated user that exercises regional failover and disaster recovery.

    Each user tracks a "current" region (starting at ``primary``) and runs
    health, API, consistency and replication checks against it.  When a check
    indicates the region is unusable, the user fails over to the region with
    the next higher ``priority`` number and verifies it with a health request.
    Failover only moves forward through the priority list; the user never
    switches back to a region it has left.
    """

    wait_time = between(2, 4)

    def on_start(self):
        """Set up the region table and reset the failover bookkeeping."""
        # Each region maps to an API path prefix on the target host; a lower
        # ``priority`` number means the region is preferred.
        self.regions = {
            'primary': {
                'name': 'us-east-1',
                'endpoint': '/api/v1',
                'health_check': '/health',
                'priority': 1,
            },
            'secondary': {
                'name': 'us-west-2',
                'endpoint': '/api/v2',
                'health_check': '/health',
                'priority': 2,
            },
            'tertiary': {
                'name': 'eu-west-1',
                'endpoint': '/api/v3',
                'health_check': '/health',
                'priority': 3,
            },
        }
        self.current_region = 'primary'
        self.failover_attempts = 0
        self.max_failover_attempts = 3
        print(f"Regional failover testing initialized - starting with {self.current_region}")

    @staticmethod
    def _json_object(response):
        """Return the response body as a dict, or ``None`` if it is not one.

        ``None`` is returned both when the body is not valid JSON and when it
        decodes to something other than a JSON object.  ``ValueError`` is
        caught because it is the common base of the decode errors raised by
        both the stdlib ``json`` module and ``simplejson``.
        """
        try:
            payload = response.json()
        except ValueError:
            return None
        return payload if isinstance(payload, dict) else None

    @task(4)
    def test_primary_region_health(self):
        """Check the health endpoint of the *current* region.

        A ``healthy`` status passes; ``degraded`` passes but may trigger a
        failover (see ``consider_failover``); any other status, an unreadable
        body, or a non-200 response fails the request and triggers a failover.
        """
        region_config = self.regions[self.current_region]
        region_name = region_config['name']
        health_endpoint = f"{region_config['endpoint']}{region_config['health_check']}"
        with self.client.get(
            health_endpoint,
            name=f"Health Check - {region_name}",
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                print(f"Health check failed for {region_name}: {response.status_code}")
                response.failure(f"Health check failed: {response.status_code}")
                self.trigger_failover()
                return

            health_data = self._json_object(response)
            if health_data is None:
                response.failure("Invalid health check response")
                self.trigger_failover()
                return

            status = health_data.get('status')
            if status == 'healthy':
                print(f"Region {region_name} is healthy")
                response.success()
            elif status == 'degraded':
                print(f"Region {region_name} is degraded - may trigger failover")
                response.success()
                self.consider_failover()
            else:
                print(f"Region {region_name} is unhealthy")
                response.failure("Region unhealthy")
                self.trigger_failover()

    @task(3)
    def test_api_functionality(self):
        """Fetch ``/data`` from the current region and validate the reply.

        Any decodable JSON body counts as success; the ``region`` field, when
        present, is only logged.  A non-200 response fails the request and
        triggers a failover.
        """
        region_config = self.regions[self.current_region]
        region_name = region_config['name']
        with self.client.get(
            f"{region_config['endpoint']}/data",
            name=f"API Test - {region_name}",
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                print(f"API request failed in {region_name}: {response.status_code}")
                response.failure(f"API failed: {response.status_code}")
                self.trigger_failover()
                return

            try:
                data = response.json()
            except ValueError:
                response.failure("Invalid API response")
                return

            # Only JSON objects can carry region information; other valid
            # JSON payloads are accepted as-is.
            if isinstance(data, dict) and 'region' in data:
                response_region = data['region']
                if response_region == region_name:
                    print(f"API response from correct region: {response_region}")
                else:
                    # Still functional, just served by a different region.
                    print(f"API response from unexpected region: {response_region}")
            response.success()

    @task(2)
    def test_data_consistency(self):
        """Write a record to the current region and read it back.

        The write and the read are reported as separate requests: the write
        passes on 200/201, and the read passes only when it returns the same
        ``id`` that was written.
        """
        region_config = self.regions[self.current_region]
        region_name = region_config['name']
        data_endpoint = f"{region_config['endpoint']}/data"
        test_data = {
            # The random suffix keeps ids unique when several simulated users
            # write within the same second.
            'id': f"test_{int(time.time())}_{random.getrandbits(32):08x}",
            'region': region_name,
            'timestamp': time.time(),
        }

        with self.client.post(
            data_endpoint,
            json=test_data,
            name=f"Data Write - {region_name}",
            catch_response=True,
        ) as write_response:
            if write_response.status_code not in (200, 201):
                write_response.failure(f"Data write failed: {write_response.status_code}")
                return
            write_response.success()

        # Wait briefly for replication
        time.sleep(1)

        # Read data back to verify consistency
        with self.client.get(
            f"{data_endpoint}/{test_data['id']}",
            name=f"Data Read - {region_name}",
            catch_response=True,
        ) as read_response:
            if read_response.status_code != 200:
                read_response.failure("Data read failed after write")
                return

            read_data = self._json_object(read_response)
            if read_data is None:
                read_response.failure("Invalid data read response")
            elif read_data.get('id') == test_data['id']:
                print(f"Data consistency verified in {region_name}")
                read_response.success()
            else:
                read_response.failure("Data consistency check failed")

    @task(1)
    def test_cross_region_replication(self):
        """Check replication lag on the secondary region.

        Only runs while the user is still on the primary region.  A reported
        ``lag_seconds`` below 60 passes; a missing value is treated as 0.
        """
        if self.current_region != 'primary':
            return

        # Test replication to secondary region
        secondary_config = self.regions['secondary']
        with self.client.get(
            f"{secondary_config['endpoint']}/replication/status",
            name=f"Replication Status - {secondary_config['name']}",
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                response.failure(f"Replication status check failed: {response.status_code}")
                return

            replication_data = self._json_object(response)
            if replication_data is None:
                response.failure("Invalid replication status response")
                return

            lag_seconds = replication_data.get('lag_seconds', 0)
            if not isinstance(lag_seconds, (int, float)):
                # A non-numeric lag cannot be compared against the threshold.
                response.failure("Invalid replication status response")
                return

            if lag_seconds < 60:  # Less than 1 minute lag
                print(f"Replication lag acceptable: {lag_seconds}s")
                response.success()
            else:
                print(f"High replication lag: {lag_seconds}s")
                response.failure("High replication lag")

    def consider_failover(self):
        """Randomly decide whether a degraded region should trigger failover.

        Placeholder policy: a real implementation could look at response
        times, error rates, etc.
        """
        if random.random() < 0.1:  # 10% chance to trigger failover on degraded status
            self.trigger_failover()

    def trigger_failover(self):
        """Switch to the next region by priority and verify it.

        Does nothing once ``max_failover_attempts`` switches have been made or
        when no region has a higher ``priority`` number than the current one.
        """
        if self.failover_attempts >= self.max_failover_attempts:
            print("Maximum failover attempts reached")
            return

        current_priority = self.regions[self.current_region]['priority']
        candidates = [
            region_key
            for region_key, region_config in self.regions.items()
            if region_config['priority'] > current_priority
        ]
        if not candidates:
            print("No available regions for failover")
            return

        # The closest lower-preference region is the failover target.
        next_region = min(candidates, key=lambda key: self.regions[key]['priority'])
        old_region = self.current_region
        self.current_region = next_region
        self.failover_attempts += 1
        print(f"FAILOVER: Switching from {self.regions[old_region]['name']} to {self.regions[next_region]['name']}")
        # Test new region immediately
        self.test_failover_success()

    def test_failover_success(self):
        """Verify the newly selected region with a health request.

        The request passes only when the region reports ``healthy``; any other
        outcome fails the request and moves on to the next region.  The
        resulting recursion with ``trigger_failover`` is bounded by the
        attempt limit and by the number of regions.
        """
        region_config = self.regions[self.current_region]
        region_name = region_config['name']
        with self.client.get(
            f"{region_config['endpoint']}{region_config['health_check']}",
            name=f"Failover Verification - {region_name}",
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                print(f"Failover target {region_name} is not responding")
                response.failure(f"Failover target not responding: {response.status_code}")
                self.trigger_failover()
                return

            health_data = self._json_object(response)
            if health_data is None:
                print(f"Invalid health response from failover target {region_name}")
                response.failure("Invalid failover health response")
                self.trigger_failover()
            elif health_data.get('status') == 'healthy':
                print(f"Failover successful - {region_name} is healthy")
                response.success()
            else:
                print(f"Failover target {region_name} is not healthy")
                response.failure("Failover target not healthy")
                self.trigger_failover()  # Try next region
class LoadBalancingUser(HttpUser):
    """Simulated user that records which region serves load-balanced requests.

    Each user keeps its own per-region counters and prints a distribution
    summary when it stops.
    """

    wait_time = between(1, 2)

    def on_start(self):
        """Initialize the known regions and their per-user request counters."""
        self.regions = ['us-east-1', 'us-west-2', 'eu-west-1']
        self.request_counts = dict.fromkeys(self.regions, 0)

    @task(5)
    def test_load_distribution(self):
        """Issue a load-balanced request and count the region that served it.

        The request passes whenever a JSON object comes back with status 200;
        a missing ``region`` field is reported as ``unknown``, and regions
        outside the known list are logged but not counted.
        """
        with self.client.get(
            '/api/v1/load-balanced',
            name="Load Balanced Request",
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                response.failure(f"Load balanced request failed: {response.status_code}")
                return

            # ValueError is the common base of the decode errors raised by
            # both the stdlib json module and simplejson.
            try:
                data = response.json()
            except ValueError:
                data = None
            if not isinstance(data, dict):
                response.failure("Invalid load balanced response")
                return

            serving_region = data.get('region', 'unknown')
            if serving_region in self.request_counts:
                self.request_counts[serving_region] += 1
                print(f"Request served by {serving_region}")
            else:
                print(f"Request served by unknown region: {serving_region}")
            response.success()

    def on_stop(self):
        """Print this user's load distribution statistics, if any were recorded."""
        total_requests = sum(self.request_counts.values())
        if total_requests <= 0:
            return

        print("Load Distribution Summary:")
        for region, count in self.request_counts.items():
            percentage = (count / total_requests) * 100
            print(f" {region}: {count} requests ({percentage:.1f}%)")