from locust import task, HttpUser import random import json class FormValidationUser ( HttpUser ): def on_start ( self ): # Common form endpoints to test self .forms = { "/contact" : { "required_fields" : [ "name" , "email" , "message" ], "optional_fields" : [ "phone" , "company" ], "validation_rules" : { "email" : "email_format" , "phone" : "phone_format" } }, "/signup" : { "required_fields" : [ "username" , "email" , "password" ], "optional_fields" : [ "first_name" , "last_name" ], "validation_rules" : { "email" : "email_format" , "password" : "min_length_8" , "username" : "alphanumeric" } }, "/profile" : { "required_fields" : [ "first_name" , "last_name" ], "optional_fields" : [ "bio" , "website" , "location" ], "validation_rules" : { "website" : "url_format" , "bio" : "max_length_500" } } } # Test data for validation self .test_data = { "valid" : { "name" : "John Doe" , "first_name" : "John" , "last_name" : "Doe" , "email" : "john@example.com" , "username" : "johndoe123" , "password" : "SecurePass123!" , "phone" : "+1-555-123-4567" , "message" : "This is a test message" , "website" : "https://example.com" , "bio" : "Software developer with 5 years experience" }, "invalid" : { "email" : [ "invalid-email" , "@example.com" , "test@" , "plaintext" ], "password" : [ "123" , "short" , "" ], "phone" : [ "123" , "invalid-phone" , "" ], "website" : [ "not-a-url" , "ftp://invalid" , "" ], "username" : [ "" , "user@name" , "user name" , "a" ] } } @task ( 4 ) def test_valid_form_submission ( self ): """Test form submission with valid data""" form_path = random.choice( list ( self .forms.keys())) form_config = self .forms[form_path] # Build valid form data form_data = {} # Add required fields for field in form_config[ "required_fields" ]: if field in self .test_data[ "valid" ]: form_data[field] = self .test_data[ "valid" ][field] # Add some optional fields for field in form_config[ "optional_fields" ]: if field in self .test_data[ "valid" ] and random.choice([ True , False ]): form_data[field] = self .test_data[ "valid" ][field] with self .client.post( form_path, data = form_data, name = f "Valid Form - { form_path } " ) as response: if response.status_code in [ 200 , 201 , 302 ]: print ( f "Valid form submission { form_path } : SUCCESS ( { response.status_code } )" ) # Check for success indicators if response.status_code == 200 : success_indicators = [ "success" , "thank you" , "submitted" , "received" ] response_text = response.text.lower() success_found = any (indicator in response_text for indicator in success_indicators) if success_found: print ( f "Success message detected in { form_path } " ) else : print ( f "No clear success message in { form_path } " ) elif response.status_code == 400 : print ( f "Valid form submission { form_path } : Validation error (unexpected)" ) response.failure( "Valid data rejected" ) else : response.failure( f "Unexpected response: { response.status_code } " ) @task ( 3 ) def test_missing_required_fields ( self ): """Test form submission with missing required fields""" form_path = random.choice( list ( self .forms.keys())) form_config = self .forms[form_path] if not form_config[ "required_fields" ]: return # Skip if no required fields # Build form data missing one required field form_data = {} missing_field = random.choice(form_config[ "required_fields" ]) for field in form_config[ "required_fields" ]: if field != missing_field and field in self .test_data[ "valid" ]: form_data[field] = self .test_data[ "valid" ][field] with self .client.post( form_path, data = form_data, name = f "Missing Field - { form_path } " ) as response: if response.status_code == 400 : print ( f "Missing field test { form_path } : Correctly rejected ( { missing_field } )" ) # Check for error message about missing field if response.text: error_indicators = [ "required" , "missing" , "field" , missing_field] response_text = response.text.lower() error_found = any (indicator in response_text for indicator in error_indicators) if error_found: print ( f "Error message detected for missing { missing_field } " ) else : print ( f "No clear error message for missing { missing_field } " ) elif response.status_code in [ 200 , 201 , 302 ]: print ( f "Missing field test { form_path } : Unexpectedly accepted" ) response.failure( f "Form accepted without required field: { missing_field } " ) else : response.failure( f "Unexpected response: { response.status_code } " ) @task ( 3 ) def test_invalid_field_formats ( self ): """Test form submission with invalid field formats""" form_path = random.choice( list ( self .forms.keys())) form_config = self .forms[form_path] # Find a field with validation rules validation_fields = list (form_config.get( "validation_rules" , {}).keys()) if not validation_fields: return # Skip if no validation rules test_field = random.choice(validation_fields) # Build form data with invalid value for test field form_data = {} # Add required fields (valid values) for field in form_config[ "required_fields" ]: if field == test_field: # Use invalid value for test field if test_field in self .test_data[ "invalid" ]: form_data[field] = random.choice( self .test_data[ "invalid" ][test_field]) else : form_data[field] = "invalid_value" elif field in self .test_data[ "valid" ]: form_data[field] = self .test_data[ "valid" ][field] with self .client.post( form_path, data = form_data, name = f "Invalid Format - { form_path } " ) as response: if response.status_code == 400 : print ( f "Invalid format test { form_path } : Correctly rejected ( { test_field } )" ) # Check for specific validation error validation_keywords = [ "invalid" , "format" , "valid" , test_field] response_text = response.text.lower() error_found = any (keyword in response_text for keyword in validation_keywords) if error_found: print ( f "Validation error message detected for { test_field } " ) else : print ( f "No clear validation error for { test_field } " ) elif response.status_code in [ 200 , 201 , 302 ]: print ( f "Invalid format test { form_path } : Unexpectedly accepted" ) response.failure( f "Invalid { test_field } format accepted" ) else : response.failure( f "Unexpected response: { response.status_code } " ) @task ( 2 ) def test_empty_form_submission ( self ): """Test form submission with completely empty data""" form_path = random.choice( list ( self .forms.keys())) with self .client.post( form_path, data = {}, name = f "Empty Form - { form_path } " ) as response: if response.status_code == 400 : print ( f "Empty form test { form_path } : Correctly rejected" ) # Check for multiple field errors error_indicators = [ "required" , "missing" , "field" , "empty" ] response_text = response.text.lower() error_count = sum ( 1 for indicator in error_indicators if indicator in response_text) if error_count >= 2 : print ( f "Multiple validation errors detected (good)" ) else : print ( f "Limited error feedback for empty form" ) elif response.status_code in [ 200 , 201 , 302 ]: print ( f "Empty form test { form_path } : Unexpectedly accepted" ) response.failure( "Empty form accepted" ) else : response.failure( f "Unexpected response: { response.status_code } " ) @task ( 2 ) def test_field_length_limits ( self ): """Test form fields with length limits""" form_path = random.choice( list ( self .forms.keys())) form_config = self .forms[form_path] # Test with very long values long_values = { "name" : "A" * 1000 , "email" : "a" * 500 + "@example.com" , "message" : "This is a very long message. " * 100 , "bio" : "Long bio content. " * 200 , "username" : "a" * 500 } # Build form data with one long field form_data = {} test_field = None for field in form_config[ "required_fields" ]: if field in long_values: form_data[field] = long_values[field] test_field = field break elif field in self .test_data[ "valid" ]: form_data[field] = self .test_data[ "valid" ][field] if not test_field: return # Skip if no testable field with self .client.post( form_path, data = form_data, name = f "Length Limit - { form_path } " ) as response: if response.status_code == 400 : print ( f "Length limit test { form_path } : Correctly rejected ( { test_field } )" ) # Check for length-related error length_keywords = [ "long" , "length" , "limit" , "maximum" , "characters" ] response_text = response.text.lower() length_error = any (keyword in response_text for keyword in length_keywords) if length_error: print ( f "Length validation error detected for { test_field } " ) else : print ( f "Generic error for length violation on { test_field } " ) elif response.status_code in [ 200 , 201 , 302 ]: print ( f "Length limit test { form_path } : Accepted (no length limit)" ) else : response.failure( f "Unexpected response: { response.status_code } " ) @task ( 1 ) def test_special_characters ( self ): """Test form handling of special characters""" form_path = random.choice( list ( self .forms.keys())) form_config = self .forms[form_path] # Special characters to test special_chars = { "name" : "John O'Connor-Smith" , "message" : "Test with <script>alert('xss')</script> and special chars: àáâãäå" , "bio" : "Bio with émojis 🚀 and spëcial chars" , "company" : "Smith & Jones Co., Ltd." } # Build form data with special characters form_data = {} for field in form_config[ "required_fields" ]: if field in special_chars: form_data[field] = special_chars[field] elif field in self .test_data[ "valid" ]: form_data[field] = self .test_data[ "valid" ][field] with self .client.post( form_path, data = form_data, name = f "Special Chars - { form_path } " ) as response: if response.status_code in [ 200 , 201 , 302 , 400 ]: print ( f "Special chars test { form_path } : { response.status_code } " ) # Check that dangerous content is handled if "<script>" in response.text: response.failure( "Potential XSS vulnerability - script tag in response" ) print ( "WARNING: Script tag found in response" ) else : print ( "Special characters handled safely" ) else : response.failure( f "Unexpected response: { response.status_code } " ) @task ( 1 ) def test_form_csrf_protection ( self ): """Test CSRF protection on forms""" form_path = random.choice( list ( self .forms.keys())) # First, get the form to check for CSRF token with self .client.get(form_path, name = f "Get Form - { form_path } " ) as get_response: if get_response.status_code == 200 : # Look for CSRF token in response csrf_indicators = [ "csrf" , "token" , "_token" , "authenticity_token" ] response_text = get_response.text.lower() has_csrf = any (indicator in response_text for indicator in csrf_indicators) if has_csrf: print ( f "CSRF protection detected on { form_path } " ) # Try to submit without CSRF token form_data = {} form_config = self .forms[form_path] for field in form_config[ "required_fields" ]: if field in self .test_data[ "valid" ]: form_data[field] = self .test_data[ "valid" ][field] with self .client.post( form_path, data = form_data, name = f "CSRF Test - { form_path } " ) as post_response: if post_response.status_code == 403 : print ( f "CSRF protection working - request blocked" ) elif post_response.status_code == 400 : print ( f "CSRF protection may be working - validation error" ) else : print ( f "CSRF test result: { post_response.status_code } " ) else : print ( f "No obvious CSRF protection on { form_path } " )