from locust import HttpUser, task, between import time import json import re class SSEUser ( HttpUser ): wait_time = between( 1 , 3 ) def on_start ( self ): """Initialize SSE testing""" self .events_received = 0 self .connection_errors = 0 self .last_event_id = None @task ( 3 ) def test_sse_stream ( self ): """Test SSE stream with manual parsing""" headers = { 'Accept' : 'text/event-stream' , 'Cache-Control' : 'no-cache' , } # Add Last-Event-ID header if we have one if self .last_event_id: headers[ 'Last-Event-ID' ] = self .last_event_id try : with self .client.get( '/api/events/stream' , headers = headers, stream = True , name = "SSE Stream Connection" ) as response: if response.status_code == 200 : self ._process_sse_stream(response) else : print ( f "SSE connection failed: { response.status_code } " ) self .connection_errors += 1 except Exception as e: print ( f "SSE stream error: { str (e) } " ) self .connection_errors += 1 def _process_sse_stream ( self , response ): """Process SSE stream manually""" buffer = "" events_processed = 0 max_events = 5 # Limit events per connection for load testing try : for chunk in response.iter_content( chunk_size = 1024 , decode_unicode = True ): if chunk: buffer += chunk # Process complete events while '



' in buffer: event_data, buffer = buffer.split( '



' , 1 ) event = self ._parse_sse_event(event_data) if event: self ._handle_sse_event(event) events_processed += 1 # Limit events for load testing if events_processed >= max_events: return except Exception as e: print ( f "Error processing SSE stream: { str (e) } " ) def _parse_sse_event ( self , event_data ): """Parse SSE event from raw data""" event = { 'event' : None , 'data' : '' , 'id' : None , 'retry' : None } lines = event_data.strip().split( '

' ) for line in lines: line = line.strip() if not line or line.startswith( ':' ): continue # Skip empty lines and comments if ':' in line: field, value = line.split( ':' , 1 ) field = field.strip() value = value.strip() if field == 'data' : if event[ 'data' ]: event[ 'data' ] += '

' + value else : event[ 'data' ] = value elif field in [ 'event' , 'id' , 'retry' ]: event[field] = value return event if event[ 'data' ] or event[ 'event' ] else None def _handle_sse_event ( self , event ): """Handle parsed SSE event""" self .events_received += 1 # Store last event ID for reconnection if event[ 'id' ]: self .last_event_id = event[ 'id' ] # Process different event types event_type = event[ 'event' ] or 'message' if event_type == 'heartbeat' : self ._handle_heartbeat_event(event) elif event_type == 'data' : self ._handle_data_event(event) elif event_type == 'notification' : self ._handle_notification_event(event) else : self ._handle_generic_event(event) def _handle_heartbeat_event ( self , event ): """Handle heartbeat events""" print ( f "Heartbeat received: { event[ 'data' ] } " ) def _handle_data_event ( self , event ): """Handle data events""" try : data = json.loads(event[ 'data' ]) print ( f "Data event: { data.get( 'type' , 'unknown' ) } - { len (event[ 'data' ]) } bytes" ) except json.JSONDecodeError: print ( f "Non-JSON data event: { len (event[ 'data' ]) } bytes" ) def _handle_notification_event ( self , event ): """Handle notification events""" try : notification = json.loads(event[ 'data' ]) print ( f "Notification: { notification.get( 'message' , 'No message' ) } " ) except json.JSONDecodeError: print ( f "Invalid notification format" ) def _handle_generic_event ( self , event ): """Handle generic events""" print ( f "Event ' { event[ 'event' ] } ': { event[ 'data' ][: 100 ] } ..." ) @task ( 1 ) def test_sse_with_parameters ( self ): """Test SSE with query parameters""" params = { 'channel' : 'updates' , 'filter' : 'important' , 'format' : 'json' } headers = { 'Accept' : 'text/event-stream' , 'Cache-Control' : 'no-cache' , } try : with self .client.get( '/api/events/filtered' , params = params, headers = headers, stream = True , name = "SSE Filtered Stream" ) as response: if response.status_code == 200 : self ._process_sse_stream(response) else : print ( f "Filtered SSE failed: { response.status_code } " ) except Exception as e: print ( f "Filtered SSE error: { str (e) } " ) @task ( 1 ) def test_sse_reconnection ( self ): """Test SSE reconnection behavior""" headers = { 'Accept' : 'text/event-stream' , 'Cache-Control' : 'no-cache' , } # Simulate reconnection with Last-Event-ID if self .last_event_id: headers[ 'Last-Event-ID' ] = self .last_event_id response = self .client.get( '/api/events/stream' , headers = headers, name = "SSE Reconnection Test" ) if response.status_code == 200 : print ( f "SSE reconnection successful with ID: { self .last_event_id } " ) else : print ( f "SSE reconnection failed: { response.status_code } " ) def on_stop ( self ): """Report SSE testing statistics""" print ( f "

=== SSE Testing Complete ===" ) print ( f "Events received: { self .events_received } " ) print ( f "Connection errors: { self .connection_errors } " ) print ( f "Last event ID: { self .last_event_id } " )