This guide shows how to test APIs that use JWT (JSON Web Token) authentication. Perfect for testing login flows and authenticated API access.

Use Cases

  • Test JWT-based login authentication
  • Test protected API endpoints with JWT tokens
  • Validate token-based access control
  • Test invalid token handling

Simple Implementation

from locust import task, HttpUser
import random
import json

class JWTTestUser(HttpUser):
    def on_start(self):
        # User credentials for testing
        self.username = "test@example.com"
        self.password = "testpassword"
        self.access_token = None
        
        # API endpoints
        self.login_endpoint = "/auth/login"
        self.protected_endpoints = [
            "/api/user/profile",
            "/api/user/data",
            "/api/dashboard",
            "/api/settings"
        ]
        
        # Login to get JWT token
        self.login()

    def login(self):
        """Login to get JWT access token"""
        login_data = {
            "username": self.username,
            "password": self.password
        }
        
        with self.client.post(
            self.login_endpoint,
            json=login_data,
            name="Auth: Login"
        ) as response:
            if response.status_code == 200:
                try:
                    token_data = response.json()
                    self.access_token = token_data.get('access_token') or token_data.get('token')
                    print(f"Successfully logged in: {self.username}")
                except json.JSONDecodeError:
                    print("Login successful but invalid JSON response")
            else:
                print(f"Login failed: {response.status_code}")
                response.failure(f"Login failed: {response.status_code}")

    def get_auth_headers(self):
        """Get authentication headers with JWT token"""
        if self.access_token:
            return {
                'Authorization': f'Bearer {self.access_token}',
                'Content-Type': 'application/json'
            }
        return {'Content-Type': 'application/json'}

    @task(4)
    def test_protected_api_access(self):
        """Test accessing protected API with JWT token"""
        if not self.access_token:
            self.login()
            
        endpoint = random.choice(self.protected_endpoints)
        headers = self.get_auth_headers()
        
        with self.client.get(
            endpoint,
            headers=headers,
            name="Protected API Access"
        ) as response:
            if response.status_code == 200:
                print(f"Protected API access success: {endpoint}")
            elif response.status_code == 401:
                print(f"JWT authentication failed: {endpoint}")
                response.failure("JWT token rejected")
                # Token might be expired, try to login again
                self.access_token = None
            else:
                response.failure(f"Unexpected response: {response.status_code}")

    @task(3)
    def test_user_profile_update(self):
        """Test updating user profile with JWT authentication"""
        if not self.access_token:
            self.login()
            
        headers = self.get_auth_headers()
        update_data = {
            "name": "Test User Updated",
            "email": self.username,
            "preferences": {"theme": "dark", "notifications": True}
        }
        
        with self.client.put(
            "/api/user/profile",
            json=update_data,
            headers=headers,
            name="Update Profile"
        ) as response:
            if response.status_code in [200, 204]:
                print("Profile update successful")
            elif response.status_code == 401:
                response.failure("JWT authentication failed on update")
                self.access_token = None
            else:
                response.failure(f"Profile update failed: {response.status_code}")

    @task(2)
    def test_invalid_token_access(self):
        """Test API access with invalid JWT token"""
        endpoint = random.choice(self.protected_endpoints)
        
        # Use invalid token
        invalid_headers = {
            'Authorization': 'Bearer invalid-jwt-token-12345',
            'Content-Type': 'application/json'
        }
        
        with self.client.get(
            endpoint,
            headers=invalid_headers,
            name="Invalid Token Access"
        ) as response:
            if response.status_code == 401:
                print(f"Invalid token correctly rejected: {endpoint}")
            elif response.status_code == 200:
                response.failure("Invalid JWT token was accepted")
            else:
                print(f"Invalid token returned {response.status_code}")

    @task(2)
    def test_missing_token_access(self):
        """Test API access without JWT token"""
        endpoint = random.choice(self.protected_endpoints)
        
        with self.client.get(
            endpoint,
            name="Missing Token Access"
        ) as response:
            if response.status_code == 401:
                print(f"Missing token correctly rejected: {endpoint}")
            elif response.status_code == 200:
                response.failure("Request without JWT token was accepted")
            else:
                print(f"Missing token returned {response.status_code}")

    @task(1)
    def test_login_with_invalid_credentials(self):
        """Test login with invalid credentials"""
        invalid_login_data = {
            "username": "invalid@example.com",
            "password": "wrongpassword"
        }
        
        with self.client.post(
            self.login_endpoint,
            json=invalid_login_data,
            name="Invalid Login"
        ) as response:
            if response.status_code == 401:
                print("Invalid credentials correctly rejected")
            elif response.status_code == 200:
                response.failure("Invalid credentials were accepted")
            else:
                print(f"Invalid login returned {response.status_code}")

Setup Instructions

  1. Replace self.username and self.password with valid test credentials
  2. Update self.login_endpoint with your actual login endpoint
  3. Update self.protected_endpoints with your protected API endpoints
  4. Adjust token field name if your API returns different field (token vs access_token)

What This Tests

  • Login Flow: Tests JWT token acquisition through login
  • Protected APIs: Tests access to JWT-protected endpoints
  • Authentication: Validates Bearer token authentication
  • Error Handling: Tests invalid/missing token scenarios

Best Practices

  • Use dedicated test accounts with appropriate permissions
  • Handle token expiration by re-authenticating
  • Test both valid and invalid authentication scenarios
  • Monitor API response times with different token states

Common Issues

  • Token Expiration: JWT tokens expire, implement re-authentication
  • Token Format: Ensure Bearer token format matches API expectations
  • Field Names: APIs may return ‘token’, ‘access_token’, or ‘jwt’
  • HTTPS Only: JWT tokens should only be sent over secure connections
I