import json
import re
import time

from locust import HttpUser, task, between


class SSEUser(HttpUser):
    """Locust user that load-tests Server-Sent Events (SSE) endpoints.

    Three weighted tasks open streaming GET requests, read a bounded number
    of events (or read for a bounded time), and report success or failure
    to Locust through ``catch_response``.

    Per-user state:
        connection_id: Set to ``None`` in ``on_start``; not used by the tasks.
        last_event_id: Most recent event id seen on any stream, sent back as
            the ``Last-Event-ID`` header on later connections.
        events_received: Running total of ``data`` lines seen by this user.
    """

    wait_time = between(1, 3)

    # Maximum time, in seconds, that a task keeps reading from one stream.
    STREAM_TIME_LIMIT = 10
    # (connect, read) timeouts in seconds for the streaming requests. The
    # read timeout equals the stream time limit so that a silent server
    # cannot block a simulated user indefinitely.
    REQUEST_TIMEOUT = (5, STREAM_TIME_LIMIT)

    def on_start(self):
        """Initialize SSE connection tracking."""
        self.connection_id = None
        self.last_event_id = None
        self.events_received = 0

    @staticmethod
    def _close_stream(response):
        """Close a streamed response so its connection is released.

        With ``stream=True`` the connection stays checked out until the body
        is fully read or the response is closed. Responses synthesized for a
        failed request carry no raw stream, so they are skipped.
        """
        if getattr(response, "raw", None) is not None:
            response.close()

    def _read_events(self, response, max_events=None):
        """Read SSE lines from an open response and close it afterwards.

        Reading stops when ``max_events`` ``data`` lines have been seen (if
        given), when ``STREAM_TIME_LIMIT`` seconds have elapsed, or when the
        server ends the stream.

        Args:
            response: A successful streaming response.
            max_events: Optional cap on ``data`` lines to read.

        Returns:
            The number of ``data`` lines received on this connection.

        Raises:
            Exception: Any transport or parsing error raised while the read
                window is still open.
        """
        # SSE streams are UTF-8 by definition; do not rely on the encoding
        # guessed from the Content-Type header.
        response.encoding = "utf-8"
        received_before = self.events_received
        deadline = time.monotonic() + self.STREAM_TIME_LIMIT
        try:
            for line in response.iter_lines(decode_unicode=True):
                if time.monotonic() > deadline:
                    break
                if line:
                    self._process_sse_line(line)
                seen = self.events_received - received_before
                if max_events is not None and seen >= max_events:
                    break
        except OSError:
            # The HTTP client's transport errors derive from OSError. A read
            # timeout can only fire once the window has elapsed (the read
            # timeout equals the window), so past the deadline it simply
            # marks the end of the read window. Earlier errors are genuine.
            if time.monotonic() < deadline:
                raise
        finally:
            self._close_stream(response)
        return self.events_received - received_before

    @task(3)
    def test_sse_connection(self):
        """Test basic SSE connection and event reception.

        Succeeds only if at least one ``data`` line arrives on THIS
        connection within the read window.
        """
        headers = {
            'Accept': 'text/event-stream',
            'Cache-Control': 'no-cache',
        }
        # Add Last-Event-ID for reconnection
        if self.last_event_id:
            headers['Last-Event-ID'] = self.last_event_id

        with self.client.get(
            '/api/events/stream',
            headers=headers,
            stream=True,
            timeout=self.REQUEST_TIMEOUT,
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                response.failure(
                    f"SSE connection failed: {response.status_code}"
                )
                self._close_stream(response)
                return

            try:
                received = self._read_events(response)
            except Exception as e:
                response.failure(f"SSE processing error: {str(e)}")
                return

            if received > 0:
                response.success()
            else:
                response.failure("No events received")

    def _process_sse_line(self, line):
        """Process one non-empty SSE line and update tracking state.

        ``data`` lines increment ``events_received``; when the payload is a
        JSON object with a non-null ``id`` key, that id is remembered.
        ``id`` lines set ``last_event_id`` directly. Comment lines and other
        fields (such as ``event``) are ignored.

        Args:
            line: A decoded line from the stream, without its line ending.
        """
        # The field name is everything before the first colon; a single
        # space after the colon is optional and not part of the value.
        field, _, value = line.partition(':')
        if value.startswith(' '):
            value = value[1:]

        if field == 'data':
            self.events_received += 1
            try:
                event_data = json.loads(value)
            except json.JSONDecodeError:
                # Plain-text payload: counted, nothing more to extract.
                return
            # Valid JSON is not necessarily an object (could be a number,
            # string, or list), so check before looking up the key.
            if isinstance(event_data, dict) and event_data.get('id') is not None:
                # Stored as text because it is later sent as a header value.
                self.last_event_id = str(event_data['id'])
        elif field == 'id':
            self.last_event_id = value

    @task(2)
    def test_sse_with_filters(self):
        """Test SSE with query parameters and filters.

        Reads up to three ``data`` lines (bounded by the read window), then
        disconnects. A 200 response counts as success.
        """
        # Derive a user id from the target host; fall back to the whole
        # base URL when it carries no scheme separator.
        url_parts = self.client.base_url.split("//")
        host = url_parts[1] if len(url_parts) > 1 else url_parts[0]
        host_tag = host.replace(".", "_")
        params = {
            'channel': 'notifications',
            'user_id': f'user_{host_tag}',
            'types': 'message,alert,update',
        }
        headers = {
            'Accept': 'text/event-stream',
            'Authorization': 'Bearer test-token-123',
        }

        with self.client.get(
            '/api/events/filtered',
            params=params,
            headers=headers,
            stream=True,
            timeout=self.REQUEST_TIMEOUT,
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                response.failure(
                    f"Filtered SSE failed: {response.status_code}"
                )
                self._close_stream(response)
                return

            try:
                # Process 3 events then disconnect
                self._read_events(response, max_events=3)
            except Exception as e:
                response.failure(f"Filtered SSE processing error: {str(e)}")
                return

            response.success()

    @task(1)
    def test_sse_reconnection(self):
        """Test SSE reconnection after connection drop.

        Opens a stream, reads up to two ``data`` lines while tracking event
        ids, closes the connection, waits one second, then reconnects with
        ``Last-Event-ID`` when an id is known.
        """
        headers = {
            'Accept': 'text/event-stream',
            'Connection': 'keep-alive',
        }

        # First connection
        with self.client.get(
            '/api/events/stream',
            headers=headers,
            stream=True,
            timeout=self.REQUEST_TIMEOUT,
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                response.failure("Initial SSE connection failed")
                self._close_stream(response)
                return

            try:
                # Reading through _read_events records event ids and closes
                # the stream, which is what simulates the connection drop.
                self._read_events(response, max_events=2)
            except Exception as e:
                response.failure(f"SSE processing error: {str(e)}")
                return

        # Wait before reconnection
        time.sleep(1)

        # Reconnection attempt
        reconnect_headers = headers.copy()
        if self.last_event_id:
            reconnect_headers['Last-Event-ID'] = self.last_event_id

        with self.client.get(
            '/api/events/stream',
            headers=reconnect_headers,
            stream=True,
            timeout=self.REQUEST_TIMEOUT,
            catch_response=True,
        ) as response:
            if response.status_code == 200:
                response.success()
            else:
                response.failure(
                    f"SSE reconnection failed: {response.status_code}"
                )
            # The body is never read here, so release the connection.
            self._close_stream(response)