import json
import random

from locust import HttpUser, task


class JWTTestUser(HttpUser):
    """Simulated user that load-tests JWT-protected endpoints.

    On start the user logs in once and stores the returned token. The
    weighted tasks then exercise the protected API with a valid token,
    with a bogus token, with no token at all, and with bad credentials.

    Every request is issued with ``catch_response=True`` so that the
    outcome reported to Locust statistics is decided by this class
    (via ``response.success()`` / ``response.failure()``) rather than by
    the HTTP status code alone. This is required for the negative tests,
    where a 401 is the *expected* result.
    """

    def on_start(self):
        """Initialise credentials and endpoints, then perform the first login."""
        # User credentials for testing
        self.username = "test@example.com"
        self.password = "testpassword"
        self.access_token = None

        # API endpoints
        self.login_endpoint = "/auth/login"
        self.protected_endpoints = [
            "/api/user/profile",
            "/api/user/data",
            "/api/dashboard",
            "/api/settings",
        ]

        # Login to get JWT token
        self.login()

    def login(self):
        """Log in and store the JWT access token on ``self.access_token``.

        The token is read from the ``access_token`` key of the JSON
        response, falling back to ``token``. The request is reported as a
        failure when the status is not 200, when the body is not valid
        JSON, or when no token is present; in those cases
        ``self.access_token`` is left untouched.
        """
        credentials = {
            "username": self.username,
            "password": self.password,
        }

        with self.client.post(
            self.login_endpoint,
            json=credentials,
            name="Auth: Login",
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                print(f"Login failed: {response.status_code}")
                response.failure(f"Login failed: {response.status_code}")
                return

            try:
                token_data = response.json()
            except ValueError:
                # ValueError is the common base of every JSON decode error
                # that response.json() may raise across requests versions.
                print("Login successful but invalid JSON response")
                response.failure("Login returned an invalid JSON response")
                return

            token = None
            if isinstance(token_data, dict):
                token = token_data.get("access_token") or token_data.get("token")

            if not token:
                response.failure("Login response did not contain a token")
                return

            self.access_token = token
            response.success()
            print(f"Successfully logged in: {self.username}")

    def _ensure_token(self):
        """Log in again if no token is held; return True when one is available."""
        if not self.access_token:
            self.login()
        return bool(self.access_token)

    def get_auth_headers(self):
        """Return request headers, including the Bearer token when one is held."""
        headers = {"Content-Type": "application/json"}
        if self.access_token:
            headers["Authorization"] = f"Bearer {self.access_token}"
        return headers

    @task(4)
    def test_protected_api_access(self):
        """Access a randomly chosen protected endpoint with the JWT token."""
        if not self._ensure_token():
            # The failed login has already been recorded; a request without
            # a token would only add a second, misleading failure.
            return

        endpoint = random.choice(self.protected_endpoints)

        with self.client.get(
            endpoint,
            headers=self.get_auth_headers(),
            name="Protected API Access",
            catch_response=True,
        ) as response:
            if response.status_code == 200:
                print(f"Protected API access success: {endpoint}")
            elif response.status_code == 401:
                print(f"JWT authentication failed: {endpoint}")
                response.failure("JWT token rejected")
                # Token might be expired; drop it so the next task logs in again.
                self.access_token = None
            else:
                response.failure(f"Unexpected response: {response.status_code}")

    @task(3)
    def test_user_profile_update(self):
        """Update the user profile with JWT authentication."""
        if not self._ensure_token():
            return

        update_data = {
            "name": "Test User Updated",
            "email": self.username,
            "preferences": {
                "theme": "dark",
                "notifications": True,
            },
        }

        with self.client.put(
            "/api/user/profile",
            json=update_data,
            headers=self.get_auth_headers(),
            name="Update Profile",
            catch_response=True,
        ) as response:
            if response.status_code in (200, 204):
                print("Profile update successful")
            elif response.status_code == 401:
                response.failure("JWT authentication failed on update")
                # Force a fresh login before the next authenticated task.
                self.access_token = None
            else:
                response.failure(f"Profile update failed: {response.status_code}")

    @task(2)
    def test_invalid_token_access(self):
        """Verify that a protected endpoint rejects an invalid JWT token."""
        endpoint = random.choice(self.protected_endpoints)

        # Use invalid token
        invalid_headers = {
            "Authorization": "Bearer invalid-jwt-token-12345",
            "Content-Type": "application/json",
        }

        with self.client.get(
            endpoint,
            headers=invalid_headers,
            name="Invalid Token Access",
            catch_response=True,
        ) as response:
            if response.status_code == 401:
                print(f"Invalid token correctly rejected: {endpoint}")
                # 401 is the expected outcome, so override the default
                # "4xx means failure" classification.
                response.success()
            elif response.status_code == 200:
                response.failure("Invalid JWT token was accepted")
            else:
                # Any other status keeps Locust's default classification.
                print(f"Invalid token returned {response.status_code}")

    @task(2)
    def test_missing_token_access(self):
        """Verify that a protected endpoint rejects a request with no token."""
        endpoint = random.choice(self.protected_endpoints)

        with self.client.get(
            endpoint,
            name="Missing Token Access",
            catch_response=True,
        ) as response:
            if response.status_code == 401:
                print(f"Missing token correctly rejected: {endpoint}")
                # Expected rejection: report it as a success.
                response.success()
            elif response.status_code == 200:
                response.failure("Request without JWT token was accepted")
            else:
                # Any other status keeps Locust's default classification.
                print(f"Missing token returned {response.status_code}")

    @task(1)
    def test_login_with_invalid_credentials(self):
        """Verify that the login endpoint rejects invalid credentials."""
        invalid_login_data = {
            "username": "invalid@example.com",
            "password": "wrongpassword",
        }

        with self.client.post(
            self.login_endpoint,
            json=invalid_login_data,
            name="Invalid Login",
            catch_response=True,
        ) as response:
            if response.status_code == 401:
                print("Invalid credentials correctly rejected")
                # Expected rejection: report it as a success.
                response.success()
            elif response.status_code == 200:
                response.failure("Invalid credentials were accepted")
            else:
                # Any other status keeps Locust's default classification.
                print(f"Invalid login returned {response.status_code}")