from locust import HttpUser, task, between import random import time import json class RegionalFailoverUser ( HttpUser ): """Test regional failover and disaster recovery""" wait_time = between( 2 , 4 ) def on_start ( self ): """Initialize regional failover testing""" self .regions = { 'primary' : { 'name' : 'us-east-1' , 'endpoint' : '/api/v1' , 'health_check' : '/health' , 'priority' : 1 }, 'secondary' : { 'name' : 'us-west-2' , 'endpoint' : '/api/v2' , 'health_check' : '/health' , 'priority' : 2 }, 'tertiary' : { 'name' : 'eu-west-1' , 'endpoint' : '/api/v3' , 'health_check' : '/health' , 'priority' : 3 } } self .current_region = 'primary' self .failover_attempts = 0 self .max_failover_attempts = 3 print ( f "Regional failover testing initialized - starting with { self .current_region } " ) @task ( 4 ) def test_primary_region_health ( self ): """Test primary region health and availability""" region_config = self .regions[ self .current_region] health_endpoint = f " { region_config[ 'endpoint' ] }{ region_config[ 'health_check' ] } " with self .client.get( health_endpoint, name = f "Health Check - { region_config[ 'name' ] } " , catch_response = True ) as response: if response.status_code == 200 : try : health_data = response.json() if health_data.get( 'status' ) == 'healthy' : print ( f "Region { region_config[ 'name' ] } is healthy" ) response.success() elif health_data.get( 'status' ) == 'degraded' : print ( f "Region { region_config[ 'name' ] } is degraded - may trigger failover" ) response.success() self .consider_failover() else : print ( f "Region { region_config[ 'name' ] } is unhealthy" ) response.failure( "Region unhealthy" ) self .trigger_failover() except json.JSONDecodeError: response.failure( "Invalid health check response" ) self .trigger_failover() else : print ( f "Health check failed for { region_config[ 'name' ] } : { response.status_code } " ) response.failure( f "Health check failed: { response.status_code } " ) self .trigger_failover() @task ( 3 ) def test_api_functionality ( self ): """Test core API functionality in current region""" region_config = self .regions[ self .current_region] api_endpoint = f " { region_config[ 'endpoint' ] } /data" with self .client.get( api_endpoint, name = f "API Test - { region_config[ 'name' ] } " , catch_response = True ) as response: if response.status_code == 200 : try : data = response.json() # Validate response includes region information if 'region' in data: response_region = data[ 'region' ] if response_region == region_config[ 'name' ]: print ( f "API response from correct region: { response_region } " ) response.success() else : print ( f "API response from unexpected region: { response_region } " ) response.success() # Still functional, just different region else : response.success() except json.JSONDecodeError: response.failure( "Invalid API response" ) else : print ( f "API request failed in { region_config[ 'name' ] } : { response.status_code } " ) response.failure( f "API failed: { response.status_code } " ) self .trigger_failover() @task ( 2 ) def test_data_consistency ( self ): """Test data consistency across regions""" region_config = self .regions[ self .current_region] # Write data to current region test_data = { 'id' : f "test_ { int (time.time()) } " , 'region' : region_config[ 'name' ], 'timestamp' : time.time() } with self .client.post( f " { region_config[ 'endpoint' ] } /data" , json = test_data, name = f "Data Write - { region_config[ 'name' ] } " , catch_response = True ) as write_response: if write_response.status_code in [ 200 , 201 ]: # Wait briefly for replication time.sleep( 1 ) # Read data back to verify consistency with self .client.get( f " { region_config[ 'endpoint' ] } /data/ { test_data[ 'id' ] } " , name = f "Data Read - { region_config[ 'name' ] } " , catch_response = True ) as read_response: if read_response.status_code == 200 : try : read_data = read_response.json() if read_data.get( 'id' ) == test_data[ 'id' ]: print ( f "Data consistency verified in { region_config[ 'name' ] } " ) write_response.success() else : write_response.failure( "Data consistency check failed" ) except json.JSONDecodeError: write_response.failure( "Invalid data read response" ) else : write_response.failure( "Data read failed after write" ) else : write_response.failure( f "Data write failed: { write_response.status_code } " ) @task ( 1 ) def test_cross_region_replication ( self ): """Test cross-region data replication""" if self .current_region == 'primary' : # Test replication to secondary region secondary_config = self .regions[ 'secondary' ] with self .client.get( f " { secondary_config[ 'endpoint' ] } /replication/status" , name = f "Replication Status - { secondary_config[ 'name' ] } " , catch_response = True ) as response: if response.status_code == 200 : try : replication_data = response.json() lag_seconds = replication_data.get( 'lag_seconds' , 0 ) if lag_seconds < 60 : # Less than 1 minute lag print ( f "Replication lag acceptable: { lag_seconds } s" ) response.success() else : print ( f "High replication lag: { lag_seconds } s" ) response.failure( "High replication lag" ) except json.JSONDecodeError: response.failure( "Invalid replication status response" ) else : response.failure( f "Replication status check failed: { response.status_code } " ) def consider_failover ( self ): """Consider failover based on degraded performance""" # Implement logic to decide if failover should be triggered # This could be based on response times, error rates, etc. if random.random() < 0.1 : # 10% chance to trigger failover on degraded status self .trigger_failover() def trigger_failover ( self ): """Trigger failover to next available region""" if self .failover_attempts >= self .max_failover_attempts: print ( "Maximum failover attempts reached" ) return current_priority = self .regions[ self .current_region][ 'priority' ] # Find next available region next_region = None for region_key, region_config in self .regions.items(): if region_config[ 'priority' ] > current_priority: if next_region is None or region_config[ 'priority' ] < self .regions[next_region][ 'priority' ]: next_region = region_key if next_region: old_region = self .current_region self .current_region = next_region self .failover_attempts += 1 print ( f "FAILOVER: Switching from { self .regions[old_region][ 'name' ] } to { self .regions[next_region][ 'name' ] } " ) # Test new region immediately self .test_failover_success() else : print ( "No available regions for failover" ) def test_failover_success ( self ): """Test that failover was successful""" region_config = self .regions[ self .current_region] with self .client.get( f " { region_config[ 'endpoint' ] }{ region_config[ 'health_check' ] } " , name = f "Failover Verification - { region_config[ 'name' ] } " , catch_response = True ) as response: if response.status_code == 200 : try : health_data = response.json() if health_data.get( 'status' ) == 'healthy' : print ( f "Failover successful - { region_config[ 'name' ] } is healthy" ) else : print ( f "Failover target { region_config[ 'name' ] } is not healthy" ) self .trigger_failover() # Try next region except json.JSONDecodeError: print ( f "Invalid health response from failover target { region_config[ 'name' ] } " ) self .trigger_failover() else : print ( f "Failover target { region_config[ 'name' ] } is not responding" ) self .trigger_failover() class LoadBalancingUser ( HttpUser ): """Test load balancing across multiple regions""" wait_time = between( 1 , 2 ) def on_start ( self ): """Initialize load balancing testing""" self .regions = [ 'us-east-1' , 'us-west-2' , 'eu-west-1' ] self .request_counts = {region: 0 for region in self .regions} @task ( 5 ) def test_load_distribution ( self ): """Test that load is distributed across regions""" with self .client.get( '/api/v1/load-balanced' , name = "Load Balanced Request" , catch_response = True ) as response: if response.status_code == 200 : try : data = response.json() serving_region = data.get( 'region' , 'unknown' ) if serving_region in self .request_counts: self .request_counts[serving_region] += 1 print ( f "Request served by { serving_region } " ) response.success() else : print ( f "Request served by unknown region: { serving_region } " ) response.success() except json.JSONDecodeError: response.failure( "Invalid load balanced response" ) else : response.failure( f "Load balanced request failed: { response.status_code } " ) def on_stop ( self ): """Report load distribution statistics""" total_requests = sum ( self .request_counts.values()) if total_requests > 0 : print ( "Load Distribution Summary:" ) for region, count in self .request_counts.items(): percentage = (count / total_requests) * 100 print ( f " { region } : { count } requests ( { percentage :.1f} %)" )