Skip to main content
This guide demonstrates how to test message queue APIs with LoadForge, covering popular queue systems like RabbitMQ, AWS SQS, and Redis queues.

Use Cases

  • Testing message queue throughput and reliability
  • Validating queue API endpoints for publishing and consuming
  • Load testing async processing systems
  • Testing queue management operations (create, delete, purge)
  • Validating message ordering and delivery guarantees

Basic Queue API Testing

from locust import HttpUser, task, between
import json
import time
import uuid
import random

class MessageQueueUser(HttpUser):
    """Simulated user that load-tests an HTTP message-queue API.

    On start each user creates its own queue, then runs a weighted mix of
    publish, consume, batch-publish, management and dead-letter tasks against
    it, and finally deletes the queue when the user stops.
    """

    wait_time = between(1, 3)

    def on_start(self):
        """Set up per-user state and create the queue the tasks will use."""
        # A timestamp alone is shared by every user spawned within the same
        # second, so one user's purge or on_stop delete would hit a queue that
        # other users are still using. The random suffix keeps queues private.
        self.queue_name = f"test-queue-{int(time.time())}-{uuid.uuid4().hex[:8]}"
        self.message_count = 0
        self.published_messages = []

        self._create_queue()

    @staticmethod
    def _parse_json(response):
        """Return the decoded JSON body of ``response``, or ``None`` if invalid.

        ``ValueError`` is the common base of every JSON decode error the HTTP
        client may raise (stdlib ``json``, ``simplejson`` and requests' own
        wrapper), so catching it is safer than ``json.JSONDecodeError`` alone.
        """
        try:
            return response.json()
        except ValueError:
            return None

    def _create_queue(self):
        """Create the test queue, treating "already exists" (409) as success."""
        queue_config = {
            "name": self.queue_name,
            "durable": True,
            "auto_delete": False,
            "max_length": 1000,
        }

        # Locust reports any 4xx/5xx response as a failure by default, so the
        # tolerated 409 has to be marked as a success explicitly.
        with self.client.post('/api/queues',
                              json=queue_config,
                              headers={'Content-Type': 'application/json'},
                              name="create_queue",
                              catch_response=True) as response:
            if response.status_code in (200, 201, 409):  # 409 = already exists
                response.success()
            else:
                print(f"Failed to create queue: {response.status_code}")
                response.failure(
                    f"Failed to create queue: {response.status_code}")

    @task(4)
    def test_publish_message(self):
        """Publish one randomly generated message to the test queue."""
        message_id = str(uuid.uuid4())
        message = {
            "id": message_id,
            "type": random.choice(["order", "notification", "email", "sms"]),
            "payload": {
                "user_id": f"user_{random.randint(1000, 9999)}",
                "action": random.choice(["create", "update", "delete"]),
                "timestamp": int(time.time()),
                "data": {"key": f"value_{random.randint(1, 100)}"},
            },
            "priority": random.randint(1, 10),
            "retry_count": 0,
            "max_retries": 3,
        }

        headers = {
            'Content-Type': 'application/json',
            'X-Message-ID': message_id,
            'X-Queue-Name': self.queue_name,
        }

        response = self.client.post(f'/api/queues/{self.queue_name}/messages',
                                    json=message,
                                    headers=headers,
                                    name="publish_message")

        # Track only the messages the API accepted.
        if response.status_code in (200, 201, 202):
            self.message_count += 1
            self.published_messages.append(message_id)

    @task(3)
    def test_consume_message(self):
        """Fetch up to five messages and acknowledge each one."""
        # NOTE(review): 'auto' ack mode is requested here, yet every message
        # is acknowledged explicitly afterwards - confirm which mode the API
        # under test expects.
        params = {
            'count': random.randint(1, 5),
            'timeout': 10,
            'ack_mode': 'auto',
        }

        response = self.client.get(f'/api/queues/{self.queue_name}/messages',
                                   params=params,
                                   name="consume_message")

        if response.status_code != 200:
            return

        messages = self._parse_json(response)
        # Any body that is not a JSON list is ignored rather than treated as
        # an error.
        if isinstance(messages, list):
            for message in messages:
                self._process_consumed_message(message)

    def _process_consumed_message(self, message):
        """Simulate processing of one consumed message, then acknowledge it.

        Entries that are not JSON objects, or that carry no ``id``, are
        skipped without sending an acknowledgement.
        """
        if not isinstance(message, dict):
            return

        message_id = message.get('id')
        if not message_id:
            return

        # Simulate message processing
        time.sleep(random.uniform(0.1, 0.5))

        # Acknowledge message
        self.client.post(f'/api/queues/{self.queue_name}/ack',
                         json={'message_id': message_id},
                         name="acknowledge_message")

    @task(2)
    def test_batch_publish(self):
        """Publish a batch of 5-20 messages in a single request."""
        batch_size = random.randint(5, 20)
        messages = [
            {
                "id": str(uuid.uuid4()),
                "type": "batch_message",
                "payload": {
                    "batch_index": index,
                    "batch_size": batch_size,
                    "timestamp": int(time.time()),
                },
                "priority": 5,
            }
            for index in range(batch_size)
        ]

        batch_payload = {
            "queue": self.queue_name,
            "messages": messages,
            "batch_id": str(uuid.uuid4()),
        }

        self.client.post('/api/queues/batch-publish',
                         json=batch_payload,
                         headers={'Content-Type': 'application/json'},
                         name="batch_publish")

    @task(1)
    def test_queue_management(self):
        """Read queue statistics and occasionally purge the queue."""
        with self.client.get(f'/api/queues/{self.queue_name}/stats',
                             name="queue_stats",
                             catch_response=True) as response:
            # A 200 whose body is not a JSON object is reported as a failure
            # instead of being silently ignored.
            if response.status_code == 200:
                stats = self._parse_json(response)
                if not isinstance(stats, dict):
                    response.failure(
                        "Queue stats response is not a JSON object")

        # Test queue purge (occasionally)
        if random.random() < 0.1:  # 10% chance
            self.client.delete(f'/api/queues/{self.queue_name}/messages',
                               name="purge_queue")

    @task(1)
    def test_dead_letter_queue(self):
        """Publish a message flagged to fail, then poll the ``-dlq`` queue."""
        # NOTE(review): presumably the system under test moves this message to
        # the dead letter queue once max_retries is exhausted - verify.
        failed_message = {
            "id": str(uuid.uuid4()),
            "type": "failing_message",
            "payload": {
                "should_fail": True,
                "error_type": "processing_error",
            },
            "retry_count": 0,
            "max_retries": 2,
        }

        self.client.post(f'/api/queues/{self.queue_name}/messages',
                         json=failed_message,
                         headers={'Content-Type': 'application/json'},
                         name="publish_failing_message")

        # Check dead letter queue after some time
        time.sleep(2)

        self.client.get(f'/api/queues/{self.queue_name}-dlq/messages',
                        params={'count': 1},
                        name="check_dead_letter_queue")

    def on_stop(self):
        """Delete the test queue; cleanup failures are reported, not raised."""
        try:
            self.client.delete(f'/api/queues/{self.queue_name}',
                               name="delete_queue")
        except Exception as exc:
            # Cleanup stays best-effort, but a bare ``except`` would also
            # swallow GreenletExit/KeyboardInterrupt and hide shutdown signals.
            print(f"Failed to delete queue {self.queue_name}: {exc}")

AWS SQS Testing

from locust import HttpUser, task, between
import json
import random
import time
import uuid


class SQSUser(HttpUser):
    """Simulated user that sends SQS-style JSON requests to a ``/sqs`` endpoint.

    NOTE(review): requests are unsigned and use the
    ``AWSSimpleQueueService.<Action>`` target, whereas AWS's own JSON protocol
    documents ``AmazonSQS.<Action>`` with SigV4 signing. This presumably
    targets a proxy or emulator - confirm against the endpoint under test.
    """

    wait_time = between(1, 2)

    def on_start(self):
        """Set the queue URL and region used by every request."""
        self.queue_url = "https://sqs.us-east-1.amazonaws.com/123456789/test-queue"
        self.aws_region = "us-east-1"

    @staticmethod
    def _sqs_headers(action):
        """Return the request headers for the given SQS action name."""
        return {
            'Content-Type': 'application/x-amz-json-1.0',
            'X-Amz-Target': f'AWSSimpleQueueService.{action}',
        }

    @task(3)
    def test_sqs_send_message(self):
        """Send one message with two message attributes."""
        message_body = {
            "event": "user_action",
            "user_id": f"user_{random.randint(1000, 9999)}",
            "action": random.choice(["login", "logout", "purchase"]),
            "timestamp": int(time.time()),
        }

        payload = {
            "Action": "SendMessage",
            "QueueUrl": self.queue_url,
            # The body is sent as a JSON-encoded string, not a nested object.
            "MessageBody": json.dumps(message_body),
            "MessageAttributes": {
                "EventType": {
                    "StringValue": "user_action",
                    "DataType": "String",
                },
                "Priority": {
                    "StringValue": str(random.randint(1, 10)),
                    "DataType": "Number",
                },
            },
        }

        self.client.post('/sqs',
                         json=payload,
                         headers=self._sqs_headers("SendMessage"),
                         name="sqs_send_message")

    @task(2)
    def test_sqs_receive_message(self):
        """Receive up to ten messages and delete each one that is returned."""
        payload = {
            "Action": "ReceiveMessage",
            "QueueUrl": self.queue_url,
            "MaxNumberOfMessages": random.randint(1, 10),
            "WaitTimeSeconds": 5,
            "MessageAttributeNames": ["All"],
        }

        response = self.client.post('/sqs',
                                    json=payload,
                                    headers=self._sqs_headers("ReceiveMessage"),
                                    name="sqs_receive_message")

        if response.status_code != 200:
            return

        # ValueError is the common base of every JSON decode error the HTTP
        # client may raise, so it is safer than json.JSONDecodeError alone.
        try:
            result = response.json()
        except ValueError:
            return

        if not isinstance(result, dict):
            return

        # "Messages" may be absent or null when nothing was received.
        for message in result.get('Messages') or []:
            if isinstance(message, dict):
                self._delete_sqs_message(message.get('ReceiptHandle'))

    def _delete_sqs_message(self, receipt_handle):
        """Delete one received message; do nothing if the handle is empty."""
        if not receipt_handle:
            return

        payload = {
            "Action": "DeleteMessage",
            "QueueUrl": self.queue_url,
            "ReceiptHandle": receipt_handle,
        }

        self.client.post('/sqs',
                         json=payload,
                         headers=self._sqs_headers("DeleteMessage"),
                         name="sqs_delete_message")

    @task(1)
    def test_sqs_batch_operations(self):
        """Send a batch of 2-10 messages in one SendMessageBatch request."""
        entries = [
            {
                "Id": str(index),
                "MessageBody": json.dumps({
                    "batch_index": index,
                    "timestamp": int(time.time()),
                }),
            }
            for index in range(random.randint(2, 10))
        ]

        batch_payload = {
            "Action": "SendMessageBatch",
            "QueueUrl": self.queue_url,
            "Entries": entries,
        }

        self.client.post('/sqs',
                         json=batch_payload,
                         headers=self._sqs_headers("SendMessageBatch"),
                         name="sqs_batch_send")

Redis Queue Testing

from locust import HttpUser, task, between
import json
import random
import time
import uuid


class RedisQueueUser(HttpUser):
    """Simulated user that posts Redis list commands to a ``/redis/queue`` endpoint.

    Each task sends a JSON body naming the command (LPUSH, BRPOP or LLEN) and
    the list key chosen in ``on_start``.
    """

    wait_time = between(0.5, 2)

    def on_start(self):
        """Choose the list key used by every task of this user."""
        self.queue_key = f"queue:test:{int(time.time())}"

    @task(4)
    def test_redis_lpush(self):
        """Post an LPUSH command carrying one JSON-encoded message."""
        message = {
            "id": str(uuid.uuid4()),
            "data": f"message_{int(time.time())}",
            "priority": random.randint(1, 5),
        }

        payload = {
            "command": "LPUSH",
            "key": self.queue_key,
            # The message is serialized to a string before being sent as the value.
            "value": json.dumps(message),
        }

        self.client.post('/redis/queue',
                         json=payload,
                         name="redis_lpush")

    @task(3)
    def test_redis_brpop(self):
        """Post a BRPOP command for the queue key with a timeout of 5."""
        payload = {
            "command": "BRPOP",
            "keys": [self.queue_key],
            "timeout": 5,
        }

        self.client.post('/redis/queue',
                         json=payload,
                         name="redis_brpop")

    @task(1)
    def test_redis_llen(self):
        """Post an LLEN command for the queue key."""
        payload = {
            "command": "LLEN",
            "key": self.queue_key,
        }

        self.client.post('/redis/queue',
                         json=payload,
                         name="redis_llen")

Key Testing Points

  1. Message Publishing: Test message creation and queue insertion
  2. Message Consumption: Test message retrieval and processing
  3. Batch Operations: Test bulk message handling
  4. Queue Management: Test queue creation, deletion, and stats
  5. Error Handling: Test dead letter queues and retry logic
  6. Performance: Monitor throughput and latency

This guide provides comprehensive message queue testing patterns for reliable async processing systems with proper error handling and performance monitoring.