from locust import task, HttpUser
import random
import json
class DatabaseAPITestUser(HttpUser):
    """Locust user that load-tests a database-backed HTTP API.

    The tasks cover user CRUD (``/api/users``), ad-hoc queries, search,
    transactions and statistics (all under ``/api/query``). IDs returned by
    the create task are remembered per simulated user so that the read,
    update and delete tasks can target records that were actually created.

    Every request is issued with ``catch_response=True``: that is what makes
    ``response.failure()`` / ``response.success()`` available inside the
    ``with`` block, and it lets each task decide for itself whether a
    response counts as a failure in the Locust statistics.
    """

    # Thresholds (in milliseconds) above which a warning line is printed.
    SLOW_QUERY_THRESHOLD_MS = 1000
    HIGH_AVG_QUERY_TIME_MS = 500

    def on_start(self):
        """Initialise endpoint paths, sample payloads and per-user state."""
        # Database API endpoints
        self.db_endpoints = {
            "users": "/api/users",
            "products": "/api/products",
            "orders": "/api/orders",
            "query": "/api/query",
        }
        # Test data for database operations
        self.test_users = [
            {"name": "John Doe", "email": "john@example.com", "age": 30},
            {"name": "Jane Smith", "email": "jane@example.com", "age": 25},
            {"name": "Bob Johnson", "email": "bob@example.com", "age": 35},
        ]
        self.test_products = [
            {"name": "Laptop", "price": 999.99, "category": "Electronics"},
            {"name": "Book", "price": 29.99, "category": "Books"},
            {"name": "Chair", "price": 149.99, "category": "Furniture"},
        ]
        # Store created IDs for later operations
        self.created_users = []
        self.created_products = []

    @staticmethod
    def _parse_json_object(response):
        """Return the response body as a dict, or ``None`` on bad payloads.

        When the body is not valid JSON, or is valid JSON but not an object,
        the response is marked as failed and ``None`` is returned so the
        caller can simply bail out.
        """
        try:
            payload = response.json()
        except ValueError:
            # ValueError is the common base of the stdlib, simplejson and
            # requests JSON decode errors, so this catches all of them.
            response.failure("Invalid JSON response")
            return None
        if not isinstance(payload, dict):
            # The tasks read fields with .get(); anything other than an
            # object would otherwise raise AttributeError inside the task.
            response.failure("Unexpected JSON payload: expected an object")
            return None
        return payload

    @task(4)
    def test_create_user(self):
        """Create a user with a randomised email and remember its ID."""
        user_data = random.choice(self.test_users).copy()
        user_data["email"] = f"user{random.randint(1000, 9999)}@example.com"
        with self.client.post(
            self.db_endpoints["users"],
            json=user_data,
            name="Create User",
            catch_response=True,
        ) as response:
            if response.status_code not in (200, 201):
                response.failure(f"Create user failed: {response.status_code}")
                return
            result = self._parse_json_object(response)
            if result is None:
                return
            user_id = result.get("id") or result.get("user_id")
            if user_id:
                self.created_users.append(user_id)
                print(f"Created user: {user_id}")

    @task(5)
    def test_read_user(self):
        """Fetch one user, preferring IDs this simulated user created."""
        if self.created_users:
            user_id = random.choice(self.created_users)
        else:
            user_id = random.randint(1, 100)  # Assume some users exist
        with self.client.get(
            f"{self.db_endpoints['users']}/{user_id}",
            name="Read User",
            catch_response=True,
        ) as response:
            if response.status_code == 200:
                user = self._parse_json_object(response)
                if user is None:
                    return
                if user.get("id") or user.get("user_id"):
                    print(f"Read user: {user_id}")
                else:
                    response.failure("User data missing ID")
            elif response.status_code == 404:
                # The ID may be a guess, so "not found" is an expected
                # outcome; mark it explicitly or Locust counts the 404
                # status as a failure.
                print(f"User not found: {user_id}")
                response.success()
            else:
                response.failure(f"Read user failed: {response.status_code}")

    @task(3)
    def test_update_user(self):
        """Update the name and age of a previously created user."""
        if not self.created_users:
            return
        user_id = random.choice(self.created_users)
        update_data = {
            "name": f"Updated User {random.randint(100, 999)}",
            "age": random.randint(18, 65),
        }
        with self.client.put(
            f"{self.db_endpoints['users']}/{user_id}",
            json=update_data,
            name="Update User",
            catch_response=True,
        ) as response:
            if response.status_code == 200:
                # A body is optional here: the update is accepted whether or
                # not the server answers with JSON.
                try:
                    response.json()
                except ValueError:
                    print(f"Updated user: {user_id} (no JSON response)")
                else:
                    print(f"Updated user: {user_id}")
            elif response.status_code == 404:
                print(f"User not found for update: {user_id}")
                # Forget the stale ID so later tasks stop targeting it.
                self.created_users.remove(user_id)
                response.success()
            else:
                response.failure(f"Update user failed: {response.status_code}")

    @task(2)
    def test_delete_user(self):
        """Delete the most recently created user, if there is one."""
        if not self.created_users:
            return
        # The ID is dropped from the local list regardless of the outcome.
        user_id = self.created_users.pop()
        with self.client.delete(
            f"{self.db_endpoints['users']}/{user_id}",
            name="Delete User",
            catch_response=True,
        ) as response:
            if response.status_code in (200, 204):
                print(f"Deleted user: {user_id}")
            elif response.status_code == 404:
                print(f"User already deleted: {user_id}")
                response.success()
            else:
                response.failure(f"Delete user failed: {response.status_code}")

    @task(3)
    def test_list_users(self):
        """List users with random page, page size and sort field."""
        params = {
            "page": random.randint(1, 5),
            "limit": random.choice([10, 20, 50]),
            "sort": random.choice(["name", "email", "created_at"]),
        }
        with self.client.get(
            self.db_endpoints["users"],
            params=params,
            name="List Users",
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                response.failure(f"List users failed: {response.status_code}")
                return
            result = self._parse_json_object(response)
            if result is None:
                return
            # Rows are read from "users", falling back to "data".
            users = result.get("users", result.get("data", []))
            total = result.get("total", len(users))
            page = result.get("page", params["page"])
            print(f"Listed users: {len(users)} items, page {page}, total {total}")

    @task(2)
    def test_database_query(self):
        """Post one of several parameterised SQL queries and report timing."""
        queries = [
            {"query": "SELECT COUNT(*) FROM users WHERE age > ?", "params": [25]},
            {"query": "SELECT * FROM products WHERE price < ? LIMIT 10", "params": [100.0]},
            {"query": "SELECT category, COUNT(*) FROM products GROUP BY category", "params": []},
            {"query": "SELECT * FROM users ORDER BY created_at DESC LIMIT 5", "params": []},
        ]
        query_data = random.choice(queries)
        with self.client.post(
            self.db_endpoints["query"],
            json=query_data,
            name="Database Query",
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                response.failure(f"Database query failed: {response.status_code}")
                return
            result = self._parse_json_object(response)
            if result is None:
                return
            rows = result.get("rows", result.get("data", []))
            execution_time = result.get("execution_time_ms", 0)
            print(f"Query executed: {len(rows)} rows, {execution_time}ms")
            # Validate query performance (warning only, not a failure)
            if execution_time > self.SLOW_QUERY_THRESHOLD_MS:
                print(f"Slow query detected: {execution_time}ms")

    @task(2)
    def test_database_search(self):
        """Run a keyword search against the users or products table."""
        search_terms = ["john", "electronics", "chair", "book", "user"]
        search_params = {
            "q": random.choice(search_terms),
            "table": random.choice(["users", "products"]),
            "limit": 20,
        }
        with self.client.get(
            f"{self.db_endpoints['query']}/search",
            params=search_params,
            name="Database Search",
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                response.failure(f"Database search failed: {response.status_code}")
                return
            result = self._parse_json_object(response)
            if result is None:
                return
            results = result.get("results", [])
            total_matches = result.get("total_matches", len(results))
            print(f"Search results: {len(results)} items, {total_matches} total matches")

    @task(1)
    def test_database_transaction(self):
        """Submit a two-operation transaction (one user, one product)."""
        transaction_data = {
            "operations": [
                {
                    "type": "create",
                    "table": "users",
                    "data": {
                        "name": "Transaction User",
                        "email": f"trans{random.randint(1000, 9999)}@example.com",
                    },
                },
                {
                    "type": "create",
                    "table": "products",
                    "data": {"name": "Transaction Product", "price": 99.99},
                },
            ]
        }
        with self.client.post(
            f"{self.db_endpoints['query']}/transaction",
            json=transaction_data,
            name="Database Transaction",
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                response.failure(f"Database transaction failed: {response.status_code}")
                return
            result = self._parse_json_object(response)
            if result is None:
                return
            success = result.get("success", False)
            operations_completed = result.get("operations_completed", 0)
            if success:
                print(f"Transaction successful: {operations_completed} operations")
            else:
                # NOTE(review): a 200 with success=false is only logged, not
                # recorded as a Locust failure - confirm this is intended.
                print(f"Transaction failed: {operations_completed} operations completed")

    @task(1)
    def test_database_performance(self):
        """Read the stats endpoint and print connection and timing figures."""
        with self.client.get(
            f"{self.db_endpoints['query']}/stats",
            name="Database Performance",
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                response.failure(f"Database performance check failed: {response.status_code}")
                return
            stats = self._parse_json_object(response)
            if stats is None:
                return
            connection_count = stats.get("active_connections", 0)
            avg_query_time = stats.get("avg_query_time_ms", 0)
            slow_queries = stats.get("slow_queries_count", 0)
            print(f"DB Performance: {connection_count} connections, {avg_query_time}ms avg query time, {slow_queries} slow queries")
            # Performance validation (warning only, not a failure)
            if avg_query_time > self.HIGH_AVG_QUERY_TIME_MS:
                print(f"Warning: High average query time: {avg_query_time}ms")