from locust import task, HttpUser import json import hashlib import time class SnapshotTestUser ( HttpUser ): def on_start ( self ): # Endpoints to snapshot self .endpoints = [ "/api/users" , "/api/products" , "/api/orders" , "/api/health" ] # Store snapshots in memory (in real use, save to file/database) self .snapshots = {} self .snapshot_mismatches = [] # Initialize baseline snapshots self .create_baseline_snapshots() def create_baseline_snapshots ( self ): """Create initial snapshots to compare against""" print ( "Creating baseline snapshots..." ) for endpoint in self .endpoints: try : with self .client.get(endpoint, name = f "Baseline - { endpoint } " ) as response: if response.status_code == 200 : snapshot_data = self .create_snapshot(response) self .snapshots[endpoint] = snapshot_data print ( f "Baseline snapshot created for { endpoint } " ) else : print ( f "Failed to create baseline for { endpoint } : { response.status_code } " ) except Exception as e: print ( f "Error creating baseline for { endpoint } : { e } " ) @task ( 4 ) def test_api_snapshot ( self ): """Test API response against saved snapshot""" endpoint = self .random_endpoint() with self .client.get(endpoint, name = f "Snapshot Test - { endpoint } " ) as response: if response.status_code == 200 : current_snapshot = self .create_snapshot(response) self .compare_snapshots(endpoint, current_snapshot) else : response.failure( f "Snapshot test failed: { response.status_code } " ) @task ( 2 ) def test_response_structure ( self ): """Test that response structure matches snapshot""" endpoint = self .random_endpoint() with self .client.get(endpoint, name = f "Structure Test - { endpoint } " ) as response: if response.status_code == 200 : try : data = response.json() structure = self .extract_structure(data) if endpoint in self .snapshots: baseline_structure = self .snapshots[endpoint].get( "structure" ) if structure == baseline_structure: print ( f "Structure test { endpoint } : PASSED" ) else : print ( f "Structure test { endpoint } : FAILED - structure changed" ) response.failure( "Response structure changed" ) else : print ( f "Structure test { endpoint } : No baseline available" ) except json.JSONDecodeError: response.failure( "Invalid JSON response" ) @task ( 2 ) def test_response_fields ( self ): """Test that required fields are present""" endpoint = self .random_endpoint() with self .client.get(endpoint, name = f "Fields Test - { endpoint } " ) as response: if response.status_code == 200 : try : data = response.json() if endpoint in self .snapshots: baseline_fields = self .snapshots[endpoint].get( "fields" , set ()) current_fields = self .extract_fields(data) missing_fields = baseline_fields - current_fields new_fields = current_fields - baseline_fields if missing_fields: print ( f "Fields test { endpoint } : Missing fields: { missing_fields } " ) response.failure( f "Missing fields: { missing_fields } " ) elif new_fields: print ( f "Fields test { endpoint } : New fields detected: { new_fields } " ) else : print ( f "Fields test { endpoint } : PASSED" ) else : print ( f "Fields test { endpoint } : No baseline available" ) except json.JSONDecodeError: response.failure( "Invalid JSON response" ) @task ( 1 ) def test_response_size ( self ): """Test that response size is within expected range""" endpoint = self .random_endpoint() with self .client.get(endpoint, name = f "Size Test - { endpoint } " ) as response: if response.status_code == 200 : current_size = len (response.text) if endpoint in self .snapshots: baseline_size = self .snapshots[endpoint].get( "size" , 0 ) size_diff = abs (current_size - baseline_size) size_change_percent = (size_diff / baseline_size * 100 ) if baseline_size > 0 else 0 if size_change_percent > 50 : # More than 50% change print ( f "Size test { endpoint } : Large size change: { size_change_percent :.1f} %" ) response.failure( f "Response size changed significantly: { size_change_percent :.1f} %" ) elif size_change_percent > 20 : # 20-50% change print ( f "Size test { endpoint } : Moderate size change: { size_change_percent :.1f} %" ) else : print ( f "Size test { endpoint } : Size stable ( { size_change_percent :.1f} % change)" ) else : print ( f "Size test { endpoint } : No baseline size available" ) def create_snapshot ( self , response ): """Create a snapshot from API response""" try : data = response.json() snapshot = { "timestamp" : time.time(), "status_code" : response.status_code, "size" : len (response.text), "content_hash" : hashlib.md5(response.text.encode()).hexdigest(), "structure" : self .extract_structure(data), "fields" : self .extract_fields(data), "headers" : dict (response.headers) } return snapshot except json.JSONDecodeError: return { "timestamp" : time.time(), "status_code" : response.status_code, "size" : len (response.text), "content_hash" : hashlib.md5(response.text.encode()).hexdigest(), "error" : "Invalid JSON" } def extract_structure ( self , data ): """Extract the structure of JSON data""" if isinstance (data, dict ): return {key: self .extract_structure(value) for key, value in data.items()} elif isinstance (data, list ): if data: return [ self .extract_structure(data[ 0 ])] # Structure of first item else : return [] else : return type (data). __name__ def extract_fields ( self , data ): """Extract all field names from JSON data""" fields = set () def collect_fields ( obj , prefix = "" ): if isinstance (obj, dict ): for key, value in obj.items(): field_name = f " { prefix } . { key } " if prefix else key fields.add(field_name) collect_fields(value, field_name) elif isinstance (obj, list ) and obj: collect_fields(obj[ 0 ], prefix) # Check first item structure collect_fields(data) return fields def compare_snapshots ( self , endpoint , current_snapshot ): """Compare current snapshot with baseline""" if endpoint not in self .snapshots: print ( f "Snapshot comparison { endpoint } : No baseline snapshot" ) return baseline = self .snapshots[endpoint] # Compare content hash if current_snapshot[ "content_hash" ] == baseline[ "content_hash" ]: print ( f "Snapshot comparison { endpoint } : IDENTICAL" ) return # Check what changed changes = [] if current_snapshot[ "status_code" ] != baseline[ "status_code" ]: changes.append( f "status_code: { baseline[ 'status_code' ] } -> { current_snapshot[ 'status_code' ] } " ) if current_snapshot[ "structure" ] != baseline[ "structure" ]: changes.append( "response structure changed" ) size_diff = current_snapshot[ "size" ] - baseline[ "size" ] if abs (size_diff) > 100 : # More than 100 bytes difference changes.append( f "size: { size_diff :+d} bytes" ) if changes: print ( f "Snapshot comparison { endpoint } : CHANGED - { ', ' .join(changes) } " ) self .snapshot_mismatches.append({ "endpoint" : endpoint, "changes" : changes, "timestamp" : current_snapshot[ "timestamp" ] }) else : print ( f "Snapshot comparison { endpoint } : Minor changes detected" ) def random_endpoint ( self ): """Get a random endpoint for testing""" import random return random.choice( self .endpoints) @task ( 1 ) def report_snapshot_status ( self ): """Report overall snapshot testing status""" total_endpoints = len ( self .endpoints) endpoints_with_baselines = len ( self .snapshots) total_mismatches = len ( self .snapshot_mismatches) print ( f "Snapshot Status: { endpoints_with_baselines } / { total_endpoints } baselines, { total_mismatches } mismatches" ) if total_mismatches > 0 : print ( "Recent mismatches:" ) for mismatch in self .snapshot_mismatches[ - 3 :]: # Show last 3 print ( f " { mismatch[ 'endpoint' ] } : { ', ' .join(mismatch[ 'changes' ]) } " )