from locust import task, HttpUser import random import json class MultiTenantTestUser ( HttpUser ): def on_start ( self ): # Sample tenant configurations self .tenants = [ { "id" : "tenant001" , "name" : "Acme Corp" , "plan" : "enterprise" , "subdomain" : "acme" }, { "id" : "tenant002" , "name" : "Beta LLC" , "plan" : "professional" , "subdomain" : "beta" }, { "id" : "tenant003" , "name" : "Gamma Inc" , "plan" : "starter" , "subdomain" : "gamma" } ] # Select a tenant for this test session self .current_tenant = random.choice( self .tenants) self .tenant_id = self .current_tenant[ "id" ] # API endpoints self .tenant_endpoints = { "data" : "/api/tenants/ {tenant_id} /data" , "users" : "/api/tenants/ {tenant_id} /users" , "config" : "/api/tenants/ {tenant_id} /config" , "resources" : "/api/tenants/ {tenant_id} /resources" } # Authentication tokens (would be obtained via login in real scenario) self .tenant_tokens = { "tenant001" : "token_acme_123" , "tenant002" : "token_beta_456" , "tenant003" : "token_gamma_789" } def get_tenant_headers ( self , tenant_id = None ): """Get headers with tenant-specific authentication""" if not tenant_id: tenant_id = self .tenant_id return { "Authorization" : f "Bearer { self .tenant_tokens.get(tenant_id, 'invalid_token' ) } " , "X-Tenant-ID" : tenant_id, "Content-Type" : "application/json" } @task ( 4 ) def test_tenant_data_access ( self ): """Test accessing tenant-specific data""" endpoint = self .tenant_endpoints[ "data" ].format( tenant_id = self .tenant_id) headers = self .get_tenant_headers() with self .client.get( endpoint, headers = headers, name = "Tenant Data Access" ) as response: if response.status_code == 200 : try : data = response.json() items = data.get( "data" , data.get( "items" , [])) tenant_context = data.get( "tenant_id" ) print ( f "Tenant { self .tenant_id } data: { len (items) } items" ) # Validate tenant context if tenant_context and tenant_context != self .tenant_id: response.failure( f "Data leakage: Expected { self .tenant_id } , got { tenant_context } " ) except json.JSONDecodeError: response.failure( "Invalid JSON response" ) elif response.status_code == 403 : response.failure( "Access denied to tenant data" ) else : response.failure( f "Tenant data access failed: { response.status_code } " ) @task ( 3 ) def test_tenant_user_management ( self ): """Test tenant-specific user operations""" endpoint = self .tenant_endpoints[ "users" ].format( tenant_id = self .tenant_id) headers = self .get_tenant_headers() # Create a test user for this tenant user_data = { "username" : f "testuser_ { random.randint( 1000 , 9999 ) } " , "email" : f "test { random.randint( 100 , 999 ) } @ { self .current_tenant[ 'subdomain' ] } .example.com" , "role" : random.choice([ "admin" , "user" , "viewer" ]), "tenant_id" : self .tenant_id } with self .client.post( endpoint, json = user_data, headers = headers, name = "Create Tenant User" ) as response: if response.status_code in [ 200 , 201 ]: try : result = response.json() user_id = result.get( "user_id" ) or result.get( "id" ) created_tenant = result.get( "tenant_id" ) print ( f "Created user { user_id } for tenant { self .tenant_id } " ) # Validate tenant assignment if created_tenant != self .tenant_id: response.failure( f "User created in wrong tenant: { created_tenant } " ) # Test retrieving the user if user_id: self ._test_get_tenant_user(user_id) except json.JSONDecodeError: response.failure( "Invalid JSON response" ) else : response.failure( f "Create tenant user failed: { response.status_code } " ) def _test_get_tenant_user ( self , user_id ): """Helper method to test retrieving tenant user""" endpoint = f " { self .tenant_endpoints[ 'users' ].format( tenant_id = self .tenant_id) } / { user_id } " headers = self .get_tenant_headers() with self .client.get( endpoint, headers = headers, name = "Get Tenant User" ) as response: if response.status_code == 200 : try : user = response.json() user_tenant = user.get( "tenant_id" ) if user_tenant != self .tenant_id: response.failure( f "User tenant mismatch: { user_tenant } " ) except json.JSONDecodeError: response.failure( "Invalid JSON response" ) @task ( 2 ) def test_tenant_configuration ( self ): """Test tenant-specific configuration management""" endpoint = self .tenant_endpoints[ "config" ].format( tenant_id = self .tenant_id) headers = self .get_tenant_headers() # Update tenant configuration config_data = { "features" : { "advanced_analytics" : self .current_tenant[ "plan" ] == "enterprise" , "api_rate_limit" : 1000 if self .current_tenant[ "plan" ] == "enterprise" else 100 , "storage_limit_gb" : 100 if self .current_tenant[ "plan" ] == "enterprise" else 10 }, "branding" : { "theme_color" : f "# { random.randint( 100000 , 999999 ) :06x} " , "logo_url" : f "https:// { self .current_tenant[ 'subdomain' ] } .example.com/logo.png" } } with self .client.put( endpoint, json = config_data, headers = headers, name = "Update Tenant Config" ) as response: if response.status_code == 200 : try : result = response.json() updated_tenant = result.get( "tenant_id" ) print ( f "Updated config for tenant { self .tenant_id } " ) if updated_tenant != self .tenant_id: response.failure( f "Config updated for wrong tenant: { updated_tenant } " ) except json.JSONDecodeError: response.failure( "Invalid JSON response" ) else : response.failure( f "Update tenant config failed: { response.status_code } " ) @task ( 2 ) def test_cross_tenant_access_denied ( self ): """Test that cross-tenant access is properly denied""" # Try to access a different tenant's data other_tenants = [t for t in self .tenants if t[ "id" ] != self .tenant_id] if not other_tenants: return other_tenant = random.choice(other_tenants) other_tenant_id = other_tenant[ "id" ] # Use current tenant's token to try accessing other tenant's data endpoint = self .tenant_endpoints[ "data" ].format( tenant_id = other_tenant_id) headers = self .get_tenant_headers() # Uses current tenant's token with self .client.get( endpoint, headers = headers, name = "Cross-Tenant Access Test" ) as response: if response.status_code == 403 : print ( f "Cross-tenant access correctly denied: { self .tenant_id } -> { other_tenant_id } " ) elif response.status_code == 404 : print ( f "Cross-tenant resource not found (acceptable): { other_tenant_id } " ) elif response.status_code == 200 : response.failure( f "Security breach: Cross-tenant access allowed { self .tenant_id } -> { other_tenant_id } " ) else : print ( f "Cross-tenant access returned: { response.status_code } " ) @task ( 1 ) def test_tenant_resource_limits ( self ): """Test tenant-specific resource limits""" endpoint = self .tenant_endpoints[ "resources" ].format( tenant_id = self .tenant_id) headers = self .get_tenant_headers() with self .client.get( endpoint, headers = headers, name = "Tenant Resource Limits" ) as response: if response.status_code == 200 : try : resources = response.json() used_storage = resources.get( "storage_used_gb" , 0 ) storage_limit = resources.get( "storage_limit_gb" , 0 ) api_calls_used = resources.get( "api_calls_used" , 0 ) api_calls_limit = resources.get( "api_calls_limit" , 0 ) print ( f "Tenant { self .tenant_id } resources: Storage { used_storage } / { storage_limit } GB, API { api_calls_used } / { api_calls_limit } " ) # Validate plan-based limits plan = self .current_tenant[ "plan" ] expected_storage = 100 if plan == "enterprise" else 10 if storage_limit != expected_storage: print ( f "Warning: Storage limit { storage_limit } doesn't match plan { plan } " ) except json.JSONDecodeError: response.failure( "Invalid JSON response" ) else : response.failure( f "Get tenant resources failed: { response.status_code } " ) @task ( 1 ) def test_tenant_subdomain_access ( self ): """Test tenant subdomain-based access""" subdomain = self .current_tenant[ "subdomain" ] # Test subdomain-specific endpoint subdomain_endpoint = f "/api/subdomain/ { subdomain } /info" headers = self .get_tenant_headers() with self .client.get( subdomain_endpoint, headers = headers, name = "Subdomain Access" ) as response: if response.status_code == 200 : try : info = response.json() returned_tenant = info.get( "tenant_id" ) returned_subdomain = info.get( "subdomain" ) print ( f "Subdomain access: { subdomain } -> { returned_tenant } " ) if returned_tenant != self .tenant_id: response.failure( f "Subdomain mapped to wrong tenant: { returned_tenant } " ) if returned_subdomain != subdomain: response.failure( f "Subdomain mismatch: { returned_subdomain } " ) except json.JSONDecodeError: response.failure( "Invalid JSON response" ) else : response.failure( f "Subdomain access failed: { response.status_code } " )