import json
import random
import time

from locust import HttpUser, task


class CacheTestUser(HttpUser):
    """Locust user that exercises a key/value cache exposed over HTTP.

    The tasks cover set, get, delete, update, short-TTL set, multi-key
    ("bulk") traffic and the statistics endpoint.  Every request is issued
    with ``catch_response=True`` so that the task code, not Locust's default
    status-code check, decides whether a request counts as a success or a
    failure (for example, a 404 on a read is a normal cache miss).
    """

    def on_start(self):
        """Initialise the endpoints and test data used by this user."""
        # Cache endpoints ("{key}" is filled in with str.format).
        self.cache_endpoints = {
            "get": "/api/cache/{key}",
            "set": "/api/cache/{key}",
            "delete": "/api/cache/{key}",
            "flush": "/api/cache/flush",
        }

        # Key prefixes; a numeric suffix is appended to build a full key.
        self.cache_keys = [
            "user_profile_",
            "product_data_",
            "session_info_",
            "config_setting_",
            "analytics_data_",
        ]

        # Test values of different sizes.
        self.test_values = {
            "small": {"name": "test", "value": 123},
            "medium": {"data": "x" * 1000, "timestamp": int(time.time())},
            "large": {
                "content": "y" * 10000,
                "metadata": {"size": "large", "created": int(time.time())},
            },
        }

        # TTL values for testing: 1 min, 5 min, 15 min, 1 hour.
        self.ttl_values = [60, 300, 900, 3600]

    def generate_cache_key(self, low=1000, high=9999):
        """Return a random cache key: a known prefix plus a numeric suffix.

        Args:
            low: Smallest suffix (inclusive).
            high: Largest suffix (inclusive).  A narrow range makes repeated
                keys, and therefore cache hits, more likely.
        """
        prefix = random.choice(self.cache_keys)
        suffix = random.randint(low, high)
        return f"{prefix}{suffix}"

    @staticmethod
    def _json_object(response):
        """Return the response body as a dict, or None if it is not one.

        ``ValueError`` is caught rather than ``json.JSONDecodeError`` because
        the decode error raised by ``response.json()`` is always a
        ``ValueError`` but is only a ``json.JSONDecodeError`` when requests
        uses the standard-library json module.
        """
        try:
            payload = response.json()
        except ValueError:
            return None
        return payload if isinstance(payload, dict) else None

    @task(4)
    def test_cache_set(self):
        """Store a random value under a random key, then read it back."""
        cache_key = self.generate_cache_key()
        value_size = random.choice(["small", "medium", "large"])
        test_value = self.test_values[value_size].copy()
        ttl = random.choice(self.ttl_values)
        cache_data = {"value": test_value, "ttl": ttl}
        set_url = self.cache_endpoints["set"].format(key=cache_key)

        stored = False
        with self.client.post(
            set_url, json=cache_data, name="Cache Set", catch_response=True
        ) as response:
            if response.status_code != 200:
                response.failure(f"Cache set failed: {response.status_code}")
            else:
                result = self._json_object(response)
                if result is None:
                    print(f"Cache set: {cache_key} (no JSON response)")
                else:
                    print(f"Cache set: {cache_key} ({value_size}, TTL: {ttl}s)")
                    if result.get("success", True):
                        stored = True
                    else:
                        response.failure("Cache set reported failure")

        # Read back only after the set request has been closed and reported,
        # and only when the server confirmed the write.
        if stored:
            self._test_cache_get(cache_key, expect_hit=True)

    @task(5)
    def test_cache_get(self):
        """Read a key, mixing likely misses with possible hits."""
        if random.choice([True, False]):
            cache_key = self.generate_cache_key()  # Likely miss
        else:
            cache_key = self.generate_cache_key(high=1100)  # Possible hit
        self._test_cache_get(cache_key)

    def _test_cache_get(self, cache_key, expect_hit=False):
        """Fetch ``cache_key`` and record the outcome.

        Args:
            cache_key: Key to read.
            expect_hit: When true, a miss (a 404, or a body whose ``hit``
                field is falsy) is recorded as a failure.  When false, hits
                and misses are both accepted.
        """
        get_url = self.cache_endpoints["get"].format(key=cache_key)
        with self.client.get(
            get_url, name="Cache Get", catch_response=True
        ) as response:
            status = response.status_code

            if status == 404:
                print(f"Cache miss (404): {cache_key}")
                if expect_hit:
                    response.failure(
                        f"Expected cache hit but got miss: {cache_key}"
                    )
                else:
                    # A miss is a normal outcome; without this Locust would
                    # count the 404 as a failed request.
                    response.success()
                return

            if status != 200:
                response.failure(f"Cache get failed: {status}")
                return

            result = self._json_object(response)
            if result is None:
                response.failure("Invalid JSON response")
                return

            hit = result.get("hit", True)
            if hit:
                ttl_remaining = result.get("ttl_remaining")
                print(f"Cache hit: {cache_key} (TTL: {ttl_remaining}s)")
            else:
                print(f"Cache miss: {cache_key}")
                if expect_hit:
                    response.failure(
                        f"Expected cache hit but got miss: {cache_key}"
                    )

    @task(2)
    def test_cache_delete(self):
        """Delete a key from the narrow (likely-populated) key range."""
        cache_key = self.generate_cache_key(high=1100)
        delete_url = self.cache_endpoints["delete"].format(key=cache_key)

        deleted = False
        with self.client.delete(
            delete_url, name="Cache Delete", catch_response=True
        ) as response:
            if response.status_code in (200, 204):
                print(f"Cache delete: {cache_key}")
                deleted = True
            elif response.status_code == 404:
                print(f"Cache delete (not found): {cache_key}")
                response.success()  # Deleting an absent key is not an error.
            else:
                response.failure(
                    f"Cache delete failed: {response.status_code}"
                )

        # Follow-up read.  A miss is not asserted: test_cache_set draws keys
        # from an overlapping range, so another user may have re-created it.
        if deleted:
            self._test_cache_get(cache_key, expect_hit=False)

    @task(2)
    def test_cache_update(self):
        """Set a key, overwrite it, and verify the new value is returned."""
        cache_key = self.generate_cache_key()
        set_url = self.cache_endpoints["set"].format(key=cache_key)

        # First, set a value.
        initial_value = {"data": "initial", "version": 1}
        cache_data = {"value": initial_value, "ttl": 300}
        with self.client.post(
            set_url,
            json=cache_data,
            name="Cache Set Initial",
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                response.failure(f"Cache set failed: {response.status_code}")
                return

        # Update the value.
        updated_value = {"data": "updated", "version": 2}
        update_data = {"value": updated_value, "ttl": 600}
        with self.client.post(
            set_url, json=update_data, name="Cache Update", catch_response=True
        ) as update_response:
            if update_response.status_code != 200:
                # Report on the update request itself, not the initial set.
                update_response.failure(
                    f"Cache update failed: {update_response.status_code}"
                )
                return
            print(f"Cache updated: {cache_key}")

        # Verify the update.
        self._verify_cache_value(cache_key, updated_value)

    def _verify_cache_value(self, cache_key, expected_value):
        """Fetch ``cache_key`` and check its ``value`` field.

        Args:
            cache_key: Key to read.
            expected_value: Value the ``value`` field of the response body
                must equal; a mismatch is recorded as a failure.
        """
        get_url = self.cache_endpoints["get"].format(key=cache_key)
        with self.client.get(
            get_url, name="Cache Verify", catch_response=True
        ) as response:
            if response.status_code != 200:
                response.failure(
                    f"Cache verify failed: {response.status_code}"
                )
                return

            result = self._json_object(response)
            if result is None:
                print("Cache verify failed: invalid JSON")
                response.failure("Invalid JSON response")
                return

            if result.get("value") == expected_value:
                print(f"Cache verification successful: {cache_key}")
            else:
                print(f"Cache value mismatch: {cache_key}")
                response.failure(f"Cache value mismatch: {cache_key}")

    @task(1)
    def test_cache_ttl_expiration(self):
        """Set a key with a 5-second TTL and check it is readable at once.

        Only the immediate hit is checked; this task does not wait for the
        TTL to elapse, so expiry itself is not verified here.
        """
        cache_key = f"ttl_test_{random.randint(10000, 99999)}"
        short_ttl = 5  # 5 seconds
        test_value = {"data": "ttl_test", "timestamp": int(time.time())}
        cache_data = {"value": test_value, "ttl": short_ttl}
        set_url = self.cache_endpoints["set"].format(key=cache_key)

        with self.client.post(
            set_url, json=cache_data, name="Cache Set TTL", catch_response=True
        ) as response:
            if response.status_code != 200:
                response.failure(f"Cache set failed: {response.status_code}")
                return
            print(f"TTL test: {cache_key} set with {short_ttl}s TTL")

        # Immediate check (should hit).
        self._test_cache_get(cache_key, expect_hit=True)

    @task(1)
    def test_cache_bulk_operations(self):
        """Set several keys in a row, then read a few of them back."""
        num_keys = random.randint(3, 6)
        bulk_keys = [
            f"bulk_{i}_{random.randint(1000, 9999)}" for i in range(num_keys)
        ]

        # Bulk set.
        stored_keys = []
        for i, key in enumerate(bulk_keys):
            test_value = {"bulk_index": i, "data": f"bulk_data_{i}"}
            cache_data = {"value": test_value, "ttl": 300}
            set_url = self.cache_endpoints["set"].format(key=key)
            with self.client.post(
                set_url,
                json=cache_data,
                name="Bulk Cache Set",
                catch_response=True,
            ) as response:
                if response.status_code == 200:
                    stored_keys.append(key)
                else:
                    response.failure(f"Bulk set failed for key: {key}")

        print(f"Bulk set completed: {num_keys} keys")

        # Bulk get - retrieve some of the keys.  Only keys whose set succeeded
        # are sampled, so a failed set is not reported a second time.
        for key in random.sample(stored_keys, min(3, len(stored_keys))):
            self._test_cache_get(key, expect_hit=True)

    @task(1)
    def test_cache_statistics(self):
        """Fetch cache statistics and sanity-check the reported hit rate."""
        stats_url = "/api/cache/stats"
        with self.client.get(
            stats_url, name="Cache Statistics", catch_response=True
        ) as response:
            status = response.status_code

            if status == 404:
                print("Cache statistics not available")
                response.success()  # A missing stats endpoint is tolerated.
                return

            if status != 200:
                response.failure(f"Cache statistics failed: {status}")
                return

            stats = self._json_object(response)
            if stats is None:
                response.failure("Invalid JSON response")
                return

            hit_rate = stats.get("hit_rate", 0)
            total_keys = stats.get("total_keys", 0)
            memory_usage = stats.get("memory_usage_mb", 0)

            # Validate before formatting: ":.1f" raises on non-numeric input.
            is_number = isinstance(hit_rate, (int, float)) and not isinstance(
                hit_rate, bool
            )
            if not is_number:
                response.failure(f"Invalid hit rate: {hit_rate}")
                return

            print(
                f"Cache stats: {hit_rate:.1f}% hit rate, "
                f"{total_keys} keys, {memory_usage}MB"
            )

            # Basic validation.
            if not 0 <= hit_rate <= 100:
                response.failure(f"Invalid hit rate: {hit_rate}")