from locust import task, HttpUser import random import time class ABTestUser ( HttpUser ): def on_start ( self ): # A/B test variants to test self .variants = { "control" : { "name" : "Control (A)" , "param" : "variant=control" , "header" : { "X-Variant" : "control" } }, "variant_b" : { "name" : "Variant B" , "param" : "variant=b" , "header" : { "X-Variant" : "variant_b" } }, "variant_c" : { "name" : "Variant C" , "param" : "variant=c" , "header" : { "X-Variant" : "variant_c" } } } # Track results for each variant self .variant_results = {variant: { "requests" : 0 , "successes" : 0 , "conversions" : 0 , "response_times" : []} for variant in self .variants.keys()} # Test pages self .test_pages = [ "/" , "/product" , "/pricing" , "/signup" ] @task ( 4 ) def test_variant_assignment ( self ): """Test that variants are properly assigned""" # Test random variant assignment page = random.choice( self .test_pages) with self .client.get(page, name = "Variant Assignment" ) as response: if response.status_code == 200 : # Check if variant is indicated in response variant_indicators = [ "variant" , "test" , "experiment" ] response_text = response.text.lower() detected_variant = None for indicator in variant_indicators: if indicator in response_text: detected_variant = "detected" break if detected_variant: print ( f "Variant assignment test: Variant detected on { page } " ) else : print ( f "Variant assignment test: No variant detected on { page } " ) @task ( 3 ) def test_control_variant ( self ): """Test control variant (A)""" self .test_specific_variant( "control" ) @task ( 3 ) def test_variant_b ( self ): """Test variant B""" self .test_specific_variant( "variant_b" ) @task ( 2 ) def test_variant_c ( self ): """Test variant C""" self .test_specific_variant( "variant_c" ) def test_specific_variant ( self , variant_key ): """Test a specific variant""" if variant_key not in self .variants: return variant = self .variants[variant_key] page = random.choice( self .test_pages) # Add variant parameter or header params = {} headers = {} if "param" in variant: param_parts = variant[ "param" ].split( "=" ) if len (param_parts) == 2 : params[param_parts[ 0 ]] = param_parts[ 1 ] if "header" in variant: headers.update(variant[ "header" ]) start_time = time.time() with self .client.get( page, params = params, headers = headers, name = f " { variant[ 'name' ] } - { page } " ) as response: response_time = (time.time() - start_time) * 1000 # Track results self .variant_results[variant_key][ "requests" ] += 1 self .variant_results[variant_key][ "response_times" ].append(response_time) if response.status_code == 200 : self .variant_results[variant_key][ "successes" ] += 1 print ( f " { variant[ 'name' ] } test { page } : { response_time :.0f} ms" ) # Check for conversion indicators if self .check_conversion(response, page): self .variant_results[variant_key][ "conversions" ] += 1 print ( f " { variant[ 'name' ] } conversion detected on { page } " ) else : print ( f " { variant[ 'name' ] } test { page } : ERROR { response.status_code } " ) @task ( 2 ) def test_variant_consistency ( self ): """Test that same user gets same variant consistently""" page = random.choice( self .test_pages) variant_key = random.choice( list ( self .variants.keys())) variant = self .variants[variant_key] # Make multiple requests with same session/user identifier user_id = f "test_user_ { random.randint( 1 , 100 ) } " results = [] for i in range ( 3 ): # Test 3 consecutive requests params = { "user_id" : user_id} # Add variant parameter if "param" in variant: param_parts = variant[ "param" ].split( "=" ) if len (param_parts) == 2 : params[param_parts[ 0 ]] = param_parts[ 1 ] with self .client.get( page, params = params, name = f "Consistency Test - { variant[ 'name' ] } " ) as response: if response.status_code == 200 : # Check if response indicates same variant results.append(response.status_code) else : results.append( None ) # Check consistency if all (r == 200 for r in results): print ( f "Consistency test { variant[ 'name' ] } : PASSED - consistent responses" ) else : print ( f "Consistency test { variant[ 'name' ] } : INCONSISTENT responses" ) @task ( 1 ) def test_traffic_splitting ( self ): """Test that traffic is split between variants""" page = random.choice( self .test_pages) variant_counts = {variant: 0 for variant in self .variants.keys()} # Make multiple requests to see traffic distribution for i in range ( 10 ): # Simulate random user user_id = f "split_test_user_ { i } " with self .client.get( page, params = { "user_id" : user_id}, name = "Traffic Split Test" ) as response: if response.status_code == 200 : # Try to detect which variant was served # This is simplified - real implementation would check response content detected_variant = random.choice( list ( self .variants.keys())) # Simulate detection variant_counts[detected_variant] += 1 # Analyze distribution total_requests = sum (variant_counts.values()) if total_requests > 0 : print ( "Traffic split results:" ) for variant, count in variant_counts.items(): percentage = (count / total_requests) * 100 print ( f " { self .variants[variant][ 'name' ] } : { count } / { total_requests } ( { percentage :.1f} %)" ) def check_conversion ( self , response , page ): """Check if response indicates a conversion""" # Simple conversion detection based on page and response conversion_indicators = { "/signup" : [ "success" , "welcome" , "registered" ], "/pricing" : [ "purchase" , "checkout" , "buy" ], "/product" : [ "add to cart" , "added" , "cart" ] } if page in conversion_indicators: response_text = response.text.lower() for indicator in conversion_indicators[page]: if indicator in response_text: return True return False @task ( 1 ) def compare_variant_performance ( self ): """Compare performance metrics between variants""" print ( "

=== A/B Test Performance Comparison ===" ) for variant_key, variant in self .variants.items(): results = self .variant_results[variant_key] if results[ "requests" ] > 0 : success_rate = (results[ "successes" ] / results[ "requests" ]) * 100 conversion_rate = (results[ "conversions" ] / results[ "requests" ]) * 100 avg_response_time = 0 if results[ "response_times" ]: avg_response_time = sum (results[ "response_times" ]) / len (results[ "response_times" ]) print ( f " { variant[ 'name' ] } :" ) print ( f " Requests: { results[ 'requests' ] } " ) print ( f " Success Rate: { success_rate :.1f} %" ) print ( f " Conversion Rate: { conversion_rate :.1f} %" ) print ( f " Avg Response Time: { avg_response_time :.0f} ms" ) else : print ( f " { variant[ 'name' ] } : No data yet" ) print ( "=" * 40 ) @task ( 1 ) def test_variant_specific_features ( self ): """Test features specific to certain variants""" # Test that variant-specific features work variant_features = { "variant_b" : "/new-feature" , "variant_c" : "/experimental-page" } for variant_key, feature_path in variant_features.items(): variant = self .variants[variant_key] # Add variant parameter params = {} if "param" in variant: param_parts = variant[ "param" ].split( "=" ) if len (param_parts) == 2 : params[param_parts[ 0 ]] = param_parts[ 1 ] with self .client.get( feature_path, params = params, name = f "Feature Test - { variant[ 'name' ] } " ) as response: if response.status_code == 200 : print ( f "Feature test { variant[ 'name' ] } : Feature available" ) elif response.status_code == 404 : print ( f "Feature test { variant[ 'name' ] } : Feature not found (expected for some variants)" ) else : print ( f "Feature test { variant[ 'name' ] } : Unexpected response { response.status_code } " )