from locust import HttpUser, task, between
import json
import time
import uuid
import random


class MessageQueueUser(HttpUser):
    """Locust user that exercises a message-queue HTTP API.

    Each simulated user creates a queue in ``on_start``, then runs weighted
    tasks that publish, consume, batch-publish, inspect/purge and probe the
    dead-letter queue, and finally deletes the queue in ``on_stop``.
    """

    # Each user waits between 1 and 3 seconds between tasks.
    wait_time = between(1, 3)

    def on_start(self) -> None:
        """Initialize per-user state and create the test queue."""
        # NOTE(review): the name has one-second resolution, so users started
        # within the same second get the same queue name -- confirm that
        # sharing is intended (a 409 on creation is tolerated below).
        self.queue_name = f"test-queue-{int(time.time())}"
        self.message_count = 0
        self.published_messages = []

        # Create test queue
        self._create_queue()

    def _create_queue(self) -> None:
        """Create the test queue, reporting unexpected status codes."""
        queue_config = {
            "name": self.queue_name,
            "durable": True,
            "auto_delete": False,
            "max_length": 1000,
        }

        response = self.client.post(
            '/api/queues',
            json=queue_config,
            headers={'Content-Type': 'application/json'},
            name="create_queue",
        )

        # 409 = already exists
        if response.status_code not in (200, 201, 409):
            print(f"Failed to create queue: {response.status_code}")

    @task(4)
    def test_publish_message(self) -> None:
        """Publish a single randomized message to the queue.

        On a 200/201/202 response the message id is recorded in
        ``published_messages`` and ``message_count`` is incremented.
        """
        message_id = str(uuid.uuid4())
        message = {
            "id": message_id,
            "type": random.choice(["order", "notification", "email", "sms"]),
            "payload": {
                "user_id": f"user_{random.randint(1000, 9999)}",
                "action": random.choice(["create", "update", "delete"]),
                "timestamp": int(time.time()),
                "data": {"key": f"value_{random.randint(1, 100)}"},
            },
            "priority": random.randint(1, 10),
            "retry_count": 0,
            "max_retries": 3,
        }

        headers = {
            'Content-Type': 'application/json',
            'X-Message-ID': message_id,
            'X-Queue-Name': self.queue_name,
        }

        response = self.client.post(
            f'/api/queues/{self.queue_name}/messages',
            json=message,
            headers=headers,
            name="publish_message",
        )

        if response.status_code in (200, 201, 202):
            self.message_count += 1
            self.published_messages.append(message_id)

    @task(3)
    def test_consume_message(self) -> None:
        """Fetch 1-5 messages from the queue and acknowledge each one.

        Non-200 responses, undecodable bodies and bodies that are not a JSON
        list are ignored.
        """
        params = {
            'count': random.randint(1, 5),
            'timeout': 10,
            'ack_mode': 'auto',
        }

        response = self.client.get(
            f'/api/queues/{self.queue_name}/messages',
            params=params,
            name="consume_message",
        )

        if response.status_code != 200:
            return

        try:
            messages = response.json()
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass; catching the
            # base class also covers decoders that raise a different
            # ValueError subclass for a malformed body.
            return

        if not isinstance(messages, list):
            return

        for message in messages:
            self._process_consumed_message(message)

    def _process_consumed_message(self, message) -> None:
        """Simulate processing of one consumed message, then acknowledge it.

        Args:
            message: One element of the consumed list. Entries that are not
                dicts or that carry no truthy ``id`` are skipped.
        """
        if not isinstance(message, dict):
            # A malformed entry must not abort the whole task.
            return

        message_id = message.get('id')
        if not message_id:
            return

        # Simulate message processing
        processing_time = random.uniform(0.1, 0.5)
        time.sleep(processing_time)

        # Acknowledge message
        self.client.post(
            f'/api/queues/{self.queue_name}/ack',
            json={'message_id': message_id},
            name="acknowledge_message",
        )

    @task(2)
    def test_batch_publish(self) -> None:
        """Publish a batch of 5-20 messages in a single request."""
        batch_size = random.randint(5, 20)

        messages = [
            {
                "id": str(uuid.uuid4()),
                "type": "batch_message",
                "payload": {
                    "batch_index": index,
                    "batch_size": batch_size,
                    "timestamp": int(time.time()),
                },
                "priority": 5,
            }
            for index in range(batch_size)
        ]

        batch_payload = {
            "queue": self.queue_name,
            "messages": messages,
            "batch_id": str(uuid.uuid4()),
        }

        self.client.post(
            '/api/queues/batch-publish',
            json=batch_payload,
            headers={'Content-Type': 'application/json'},
            name="batch_publish",
        )

    @task(1)
    def test_queue_management(self) -> None:
        """Request queue stats and, about 10% of the time, purge the queue."""
        # Get queue stats
        response = self.client.get(
            f'/api/queues/{self.queue_name}/stats',
            name="queue_stats",
        )

        if response.status_code == 200:
            try:
                # Decode the body; the parsed stats are not used further.
                response.json()
            except ValueError:
                pass

        # Test queue purge (occasionally)
        if random.random() < 0.1:  # 10% chance
            self.client.delete(
                f'/api/queues/{self.queue_name}/messages',
                name="purge_queue",
            )

    @task(1)
    def test_dead_letter_queue(self) -> None:
        """Publish a message flagged to fail, then query the ``-dlq`` queue."""
        # Publish message that will fail processing
        failed_message = {
            "id": str(uuid.uuid4()),
            "type": "failing_message",
            "payload": {
                "should_fail": True,
                "error_type": "processing_error",
            },
            "retry_count": 0,
            "max_retries": 2,
        }

        self.client.post(
            f'/api/queues/{self.queue_name}/messages',
            json=failed_message,
            headers={'Content-Type': 'application/json'},
            name="publish_failing_message",
        )

        # Check dead letter queue after some time
        time.sleep(2)

        self.client.get(
            f'/api/queues/{self.queue_name}-dlq/messages',
            params={'count': 1},
            name="check_dead_letter_queue",
        )

    def on_stop(self) -> None:
        """Delete the test queue; cleanup is best-effort and never raises
        ordinary exceptions."""
        queue_name = getattr(self, "queue_name", None)
        if queue_name is None:
            # on_start never got far enough to pick a queue name.
            return

        try:
            self.client.delete(
                f'/api/queues/{queue_name}',
                name="delete_queue",
            )
        except Exception as exc:
            # Catch Exception rather than using a bare except so that
            # KeyboardInterrupt / SystemExit and other BaseException
            # subclasses still propagate; report instead of hiding failures.
            print(f"Failed to delete queue {queue_name}: {exc}")