import json
import random

from locust import HttpUser, task


class DatabaseAPITestUser(HttpUser):
    """Locust user that exercises a database-backed REST API.

    The tasks cover CRUD on ``/api/users``, paginated listing, and the
    ``/api/query`` family of endpoints (raw query, search, transaction,
    stats).  Every request is issued with ``catch_response=True`` so the
    task itself decides whether the request counts as a success or a
    failure in the Locust statistics.

    NOTE(review): the response field names read below (``id``, ``user_id``,
    ``users``, ``data``, ``rows`` ...) are what this script expects from the
    server; confirm them against the real API contract.
    """

    def on_start(self):
        """Initialise per-user endpoints, fixture data and created-ID lists."""
        # Database API endpoints
        self.db_endpoints = {
            "users": "/api/users",
            "products": "/api/products",
            "orders": "/api/orders",
            "query": "/api/query",
        }

        # Test data for database operations
        self.test_users = [
            {"name": "John Doe", "email": "john@example.com", "age": 30},
            {"name": "Jane Smith", "email": "jane@example.com", "age": 25},
            {"name": "Bob Johnson", "email": "bob@example.com", "age": 35},
        ]
        self.test_products = [
            {"name": "Laptop", "price": 999.99, "category": "Electronics"},
            {"name": "Book", "price": 29.99, "category": "Books"},
            {"name": "Chair", "price": 149.99, "category": "Furniture"},
        ]

        # Store created IDs for later operations
        self.created_users = []
        self.created_products = []

    def _json_object(self, response):
        """Decode *response* as a JSON object.

        Returns the decoded ``dict``.  If the body is not valid JSON, or is
        valid JSON but not an object, the request is marked as failed and
        ``None`` is returned so the caller can simply bail out.
        """
        try:
            payload = response.json()
        except ValueError:
            # ValueError is the common base of json.JSONDecodeError and of
            # the decode error raised when requests is backed by simplejson.
            response.failure("Invalid JSON response")
            return None
        if not isinstance(payload, dict):
            # The tasks below call ``.get`` on the payload; anything else
            # would raise inside the ``with`` block and go unreported.
            response.failure("Unexpected JSON response structure")
            return None
        return payload

    @task(4)
    def test_create_user(self):
        """Test creating database records"""
        user_data = random.choice(self.test_users).copy()
        user_data["email"] = f"user{random.randint(1000, 9999)}@example.com"

        with self.client.post(
            self.db_endpoints["users"],
            json=user_data,
            name="Create User",
            catch_response=True,
        ) as response:
            if response.status_code not in (200, 201):
                response.failure(f"Create user failed: {response.status_code}")
                return

            result = self._json_object(response)
            if result is None:
                return

            user_id = result.get("id") or result.get("user_id")
            if user_id:
                # Remember the ID so read/update/delete tasks can target it.
                self.created_users.append(user_id)
                print(f"Created user: {user_id}")

    @task(5)
    def test_read_user(self):
        """Test reading database records"""
        if self.created_users:
            user_id = random.choice(self.created_users)
        else:
            user_id = random.randint(1, 100)  # Assume some users exist

        with self.client.get(
            f"{self.db_endpoints['users']}/{user_id}",
            name="Read User",
            catch_response=True,
        ) as response:
            if response.status_code == 200:
                user = self._json_object(response)
                if user is None:
                    return
                if user.get("id") or user.get("user_id"):
                    print(f"Read user: {user_id}")
                else:
                    response.failure("User data missing ID")
            elif response.status_code == 404:
                # A missing user is an expected outcome here (the ID may be
                # a random guess), so do not count it as a failed request.
                print(f"User not found: {user_id}")
                response.success()
            else:
                response.failure(f"Read user failed: {response.status_code}")

    @task(3)
    def test_update_user(self):
        """Test updating database records"""
        if not self.created_users:
            return

        user_id = random.choice(self.created_users)
        update_data = {
            "name": f"Updated User {random.randint(100, 999)}",
            "age": random.randint(18, 65),
        }

        with self.client.put(
            f"{self.db_endpoints['users']}/{user_id}",
            json=update_data,
            name="Update User",
            catch_response=True,
        ) as response:
            if response.status_code == 200:
                # A body-less 200 is tolerated for updates; the JSON decode
                # only selects which message is printed.
                try:
                    response.json()
                except ValueError:
                    print(f"Updated user: {user_id} (no JSON response)")
                else:
                    print(f"Updated user: {user_id}")
            elif response.status_code == 404:
                print(f"User not found for update: {user_id}")
                # Drop the stale ID so later tasks stop targeting it.
                self.created_users.remove(user_id)
                response.success()
            else:
                response.failure(f"Update user failed: {response.status_code}")

    @task(2)
    def test_delete_user(self):
        """Test deleting database records"""
        if not self.created_users:
            return

        # The ID is forgotten up front, whatever the outcome of the request.
        user_id = self.created_users.pop()

        with self.client.delete(
            f"{self.db_endpoints['users']}/{user_id}",
            name="Delete User",
            catch_response=True,
        ) as response:
            if response.status_code in (200, 204):
                print(f"Deleted user: {user_id}")
            elif response.status_code == 404:
                # Already gone: treated as a benign outcome, not a failure.
                print(f"User already deleted: {user_id}")
                response.success()
            else:
                response.failure(f"Delete user failed: {response.status_code}")

    @task(3)
    def test_list_users(self):
        """Test listing database records with pagination"""
        params = {
            "page": random.randint(1, 5),
            "limit": random.choice([10, 20, 50]),
            "sort": random.choice(["name", "email", "created_at"]),
        }

        with self.client.get(
            self.db_endpoints["users"],
            params=params,
            name="List Users",
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                response.failure(f"List users failed: {response.status_code}")
                return

            result = self._json_object(response)
            if result is None:
                return

            # Accept either "users" or "data" as the collection key.
            users = result.get("users", result.get("data", []))
            total = result.get("total", len(users))
            page = result.get("page", params["page"])
            print(f"Listed users: {len(users)} items, page {page}, total {total}")

    @task(2)
    def test_database_query(self):
        """Test custom database queries"""
        queries = [
            {
                "query": "SELECT COUNT(*) FROM users WHERE age > ?",
                "params": [25],
            },
            {
                "query": "SELECT * FROM products WHERE price < ? LIMIT 10",
                "params": [100.0],
            },
            {
                "query": "SELECT category, COUNT(*) FROM products GROUP BY category",
                "params": [],
            },
            {
                "query": "SELECT * FROM users ORDER BY created_at DESC LIMIT 5",
                "params": [],
            },
        ]
        query_data = random.choice(queries)

        with self.client.post(
            self.db_endpoints["query"],
            json=query_data,
            name="Database Query",
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                response.failure(f"Database query failed: {response.status_code}")
                return

            result = self._json_object(response)
            if result is None:
                return

            rows = result.get("rows", result.get("data", []))
            execution_time = result.get("execution_time_ms", 0)
            print(f"Query executed: {len(rows)} rows, {execution_time}ms")

            # Validate query performance
            if execution_time > 1000:  # 1 second threshold
                print(f"Slow query detected: {execution_time}ms")

    @task(2)
    def test_database_search(self):
        """Test database search functionality"""
        search_terms = ["john", "electronics", "chair", "book", "user"]
        search_params = {
            "q": random.choice(search_terms),
            "table": random.choice(["users", "products"]),
            "limit": 20,
        }

        with self.client.get(
            f"{self.db_endpoints['query']}/search",
            params=search_params,
            name="Database Search",
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                response.failure(f"Database search failed: {response.status_code}")
                return

            result = self._json_object(response)
            if result is None:
                return

            results = result.get("results", [])
            total_matches = result.get("total_matches", len(results))
            print(
                f"Search results: {len(results)} items, "
                f"{total_matches} total matches"
            )

    @task(1)
    def test_database_transaction(self):
        """Test database transaction handling"""
        transaction_data = {
            "operations": [
                {
                    "type": "create",
                    "table": "users",
                    "data": {
                        "name": "Transaction User",
                        "email": f"trans{random.randint(1000, 9999)}@example.com",
                    },
                },
                {
                    "type": "create",
                    "table": "products",
                    "data": {"name": "Transaction Product", "price": 99.99},
                },
            ]
        }

        with self.client.post(
            f"{self.db_endpoints['query']}/transaction",
            json=transaction_data,
            name="Database Transaction",
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                response.failure(
                    f"Database transaction failed: {response.status_code}"
                )
                return

            result = self._json_object(response)
            if result is None:
                return

            success = result.get("success", False)
            operations_completed = result.get("operations_completed", 0)
            if success:
                print(f"Transaction successful: {operations_completed} operations")
            else:
                # Only logged: an HTTP 200 with success=false is not counted
                # as a failed request by this script.
                print(
                    f"Transaction failed: {operations_completed} "
                    "operations completed"
                )

    @task(1)
    def test_database_performance(self):
        """Test database performance monitoring"""
        with self.client.get(
            f"{self.db_endpoints['query']}/stats",
            name="Database Performance",
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                response.failure(
                    f"Database performance check failed: {response.status_code}"
                )
                return

            stats = self._json_object(response)
            if stats is None:
                return

            connection_count = stats.get("active_connections", 0)
            avg_query_time = stats.get("avg_query_time_ms", 0)
            slow_queries = stats.get("slow_queries_count", 0)
            print(
                f"DB Performance: {connection_count} connections, "
                f"{avg_query_time}ms avg query time, "
                f"{slow_queries} slow queries"
            )

            # Performance validation
            if avg_query_time > 500:
                print(f"Warning: High average query time: {avg_query_time}ms")