Date: 2025-03-09

Learn how to load test HLS video streaming with LoadForge to ensure your streaming infrastructure can handle real-world traffic.

​ Understanding HLS (HTTP Live Streaming)

HTTP Live Streaming (HLS) is an adaptive bitrate streaming protocol developed by Apple that enables high-quality streaming experiences across varying network conditions. HLS works by:

- Breaking video into segments: Content is divided into small (typically 2-10 second) chunks
- Creating multiple quality levels: Each segment is encoded at different quality levels/bitrates
- Using manifest files: .m3u8 playlist files that describe available segments and quality levels
- Enabling adaptive playback: Clients automatically switch between quality levels based on network conditions

Key Advantage: HLS is widely supported across devices and platforms, making it one of the most popular streaming protocols for video delivery.

​ Why Load Test HLS Streams?

Load testing HLS streams is critical for:

- Validating CDN performance under high concurrent viewer counts
- Ensuring origin servers can handle segment request spikes
- Verifying failover mechanisms work correctly during traffic surges
- Measuring latency across different geographic regions
- Identifying bottlenecks in your streaming infrastructure before they impact users
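A rough capacity estimate helps size such a test before running it. The sketch below assumes every viewer fetches one segment per segment duration at a fixed bitrate and, for live streams, re-fetches the media playlist about once per segment. The function name, parameters, and numbers are illustrative and not part of LoadForge.

```python
def estimate_hls_load(viewers, segment_seconds, bitrate_bps, live=True):
    """Rough steady-state load for `viewers` concurrent players (illustrative model)."""
    segment_rps = viewers / segment_seconds
    # Assumption: live players re-fetch the media playlist about once per segment.
    playlist_rps = viewers / segment_seconds if live else 0.0
    egress_gbps = viewers * bitrate_bps / 1e9
    return {
        "segment_rps": segment_rps,
        "playlist_rps": playlist_rps,
        "egress_gbps": egress_gbps,
    }


# Example: 5,000 viewers, 6-second segments, 2.8 Mbps rendition
estimate = estimate_hls_load(viewers=5000, segment_seconds=6, bitrate_bps=2_800_000)
print(estimate)
```

Real players buffer ahead and switch renditions, so treat the result as an order-of-magnitude figure for choosing user counts rather than a prediction.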

​ How LoadForge Tests HLS Streams

LoadForge simulates real-world HLS streaming behavior by:

1. Requesting the master playlist (.m3u8 file)
2. Parsing the manifest to identify available quality levels
3. Requesting media playlists for specific quality levels
4. Downloading segments in the correct sequence
5. Simulating adaptive bitrate behavior by switching between quality levels

This approach accurately replicates how real video players interact with your streaming infrastructure.
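One detail these steps depend on: URIs inside a playlist are usually relative to the playlist that lists them, so each one has to be resolved before it is requested. A minimal sketch using only the standard library; the paths are placeholders.

```python
from urllib.parse import urljoin

master_url = "/path/to/master.m3u8"

# A variant URI is resolved against the master playlist's URL...
media_url = urljoin(master_url, "low/index.m3u8")
# ...and a segment URI against the media playlist's URL.
segment_url = urljoin(media_url, "segment0.ts")
# Absolute URIs pass through unchanged.
cdn_url = urljoin(media_url, "https://cdn.example.com/seg/segment0.ts")

print(media_url)    # /path/to/low/index.m3u8
print(segment_url)  # /path/to/low/segment0.ts
```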

​ Basic HLS Test Example

Here’s a simple LoadForge script that tests an HLS stream:

```python
from urllib.parse import urljoin

from locust import HttpUser, task, between
import m3u8
import random
import time

MASTER_PLAYLIST_URL = "/path/to/master.m3u8"


class HLSStreamingUser(HttpUser):
    wait_time = between(1, 3)

    def on_start(self):
        # Initialize variables to store playlist information
        self.master_playlist = None
        self.media_playlist = None
        self.media_playlist_url = None
        self.segments = []
        self.current_segment_index = 0

    def load_media_playlist(self, name):
        # Select a quality level (typically we'd choose based on bandwidth).
        # For testing, we'll just pick one randomly.
        playlist = random.choice(self.master_playlist.playlists)
        # Playlist URIs are usually relative to the master playlist, so resolve them
        self.media_playlist_url = urljoin(MASTER_PLAYLIST_URL, playlist.uri)
        response = self.client.get(self.media_playlist_url, name=name)
        # Parse the media playlist to get segments
        if response.status_code == 200:
            self.media_playlist = m3u8.loads(response.text)
            self.segments = self.media_playlist.segments
            self.current_segment_index = 0

    @task
    def stream_video(self):
        # Step 1: Request the master playlist if we haven't already
        if not self.master_playlist:
            response = self.client.get(MASTER_PLAYLIST_URL, name="Master Playlist")
            if response.status_code != 200:
                # If we can't get the master playlist, stop the task
                return
            self.master_playlist = m3u8.loads(response.text)
            if self.master_playlist.playlists:
                self.load_media_playlist("Media Playlist")

        # Step 2: If we have segments to request, get the next one
        if self.segments and self.current_segment_index < len(self.segments):
            segment = self.segments[self.current_segment_index]
            # Segment URIs are relative to the media playlist
            segment_url = urljoin(self.media_playlist_url, segment.uri)
            self.client.get(segment_url, name="Segment")
            self.current_segment_index += 1
            # Simulate the time it takes to play the segment
            # This makes the test more realistic
            time.sleep(segment.duration)
        elif self.master_playlist.playlists:
            # We need to get a new media playlist to get more segments
            self.load_media_playlist("Media Playlist Refresh")
```

​ Advanced HLS Testing Techniques

​ Simulating Realistic Viewer Behavior

To create more realistic tests, you can enhance your LoadForge script to simulate how real viewers interact with video content:

```python
from urllib.parse import urljoin

from locust import HttpUser, task, constant
import m3u8
import random
import time

MASTER_PLAYLIST_URL = "/path/to/master.m3u8"


class RealisticViewerUser(HttpUser):
    wait_time = constant(1)  # Consistent polling for live streams

    def on_start(self):
        self.master_playlist = None
        self.media_playlists = {}
        self.current_bitrate = None
        self.current_uri = None
        self.segments_played = 0
        self.watch_duration = random.randint(30, 300)  # Random watch time between 30s and 5min
        self.buffer_health = 0  # Simulated buffer in seconds
        # Start the streaming session
        self.request_master_playlist()

    def available_bitrates(self):
        # Return (bandwidth, playlist URL) pairs sorted by bandwidth
        if not self.master_playlist:
            return []
        bitrates = []
        for index, playlist in enumerate(self.master_playlist.playlists):
            bandwidth = getattr(playlist.stream_info, "bandwidth", None)
            if bandwidth is None:
                # Some m3u8 files lack bandwidth info; fall back to a rough
                # estimate based on position (for testing purposes only)
                bandwidth = 500000 * (index + 1)
            # Playlist URIs are usually relative to the master playlist
            bitrates.append((bandwidth, urljoin(MASTER_PLAYLIST_URL, playlist.uri)))
        bitrates.sort()
        return bitrates

    def request_master_playlist(self):
        response = self.client.get(MASTER_PLAYLIST_URL, name="Master Playlist")
        if response.status_code == 200:
            self.master_playlist = m3u8.loads(response.text)
            bitrates = self.available_bitrates()
            if bitrates:
                # Select initial bitrate (lowest for fast startup)
                self.current_bitrate, self.current_uri = bitrates[0]
                self.request_media_playlist(self.current_uri)
                # Start downloading segments
                self.download_next_segment()

    def request_media_playlist(self, playlist_uri):
        response = self.client.get(playlist_uri, name="Media Playlist")
        if response.status_code == 200:
            self.media_playlists[playlist_uri] = m3u8.loads(response.text)

    def download_next_segment(self):
        media_playlist = self.media_playlists.get(self.current_uri)
        # Keep downloading while the buffer is low and segments remain
        while media_playlist and self.segments_played < len(media_playlist.segments):
            segment = media_playlist.segments[self.segments_played]
            start_time = time.time()
            response = self.client.get(urljoin(self.current_uri, segment.uri), name="Segment")
            download_time = time.time() - start_time

            # Estimate throughput in bits per second so it is comparable
            # with the BANDWIDTH values in the master playlist
            segment_bits = len(response.content or b"") * 8
            bandwidth = segment_bits / download_time if download_time > 0 else float("inf")

            # Update buffer health (can't have negative buffer)
            self.buffer_health = max(0, self.buffer_health + segment.duration - download_time)
            self.segments_played += 1

            # Check if we should continue watching
            total_watched = sum(s.duration for s in media_playlist.segments[:self.segments_played])
            if total_watched >= self.watch_duration:
                # Viewer has watched their fill, session ends
                return

            # Potentially switch bitrates based on bandwidth and buffer health
            self.adapt_bitrate(bandwidth)
            media_playlist = self.media_playlists.get(self.current_uri)

            # Only keep downloading immediately while the buffer is low
            if self.buffer_health >= 5:
                break

    def adapt_bitrate(self, current_bandwidth):
        # Only adapt if we have a master playlist with multiple bitrates
        bitrates = self.available_bitrates()
        if len(bitrates) <= 1:
            return

        # Simple ABR logic: target using 70% of available bandwidth
        target_bandwidth = current_bandwidth * 0.7

        # Find the highest bitrate that's below our target
        new_bitrate, new_uri = bitrates[0]  # Default to lowest
        for bitrate, uri in bitrates:
            if bitrate <= target_bandwidth:
                new_bitrate, new_uri = bitrate, uri
            else:
                break

        # If buffer is very low, drop to lowest bitrate
        if self.buffer_health < 2:
            new_bitrate, new_uri = bitrates[0]

        # If we're changing bitrates, request the new media playlist
        if new_bitrate != self.current_bitrate:
            self.request_media_playlist(new_uri)
            if new_uri in self.media_playlists:
                self.current_bitrate, self.current_uri = new_bitrate, new_uri

    @task
    def simulate_playback(self):
        # This task simulates the ongoing playback process
        # It will be called regularly based on wait_time
        if self.current_uri not in self.media_playlists:
            return
        # Simulate 1 second of playback
        self.buffer_health = max(0, self.buffer_health - 1)
        # If we need more segments, get them
        if self.buffer_health < 5:
            self.download_next_segment()
```
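The selection rule inside `adapt_bitrate` can be isolated as a pure function, which makes it easy to check without running a load test. This is a sketch of that simple throughput-based rule only, not of any production player's ABR algorithm; the function name and the `safety_factor` and `panic_buffer` parameters are illustrative, with defaults taken from the 70% and 2-second values used above.

```python
def select_bitrate(available_bitrates, measured_bps, buffer_seconds,
                   safety_factor=0.7, panic_buffer=2.0):
    """Return the bitrate (bits/s) a simple throughput-based ABR rule would pick."""
    ladder = sorted(available_bitrates)
    # A nearly empty buffer overrides the throughput estimate: drop to the lowest rung.
    if buffer_seconds < panic_buffer:
        return ladder[0]
    target = measured_bps * safety_factor
    chosen = ladder[0]  # Default to lowest when nothing fits under the target
    for bitrate in ladder:
        if bitrate <= target:
            chosen = bitrate
        else:
            break
    return chosen


ladder = [800_000, 1_400_000, 2_800_000]
print(select_bitrate(ladder, measured_bps=3_000_000, buffer_seconds=8))   # 1400000
print(select_bitrate(ladder, measured_bps=10_000_000, buffer_seconds=1))  # 800000
```

Note that measured throughput must be in bits per second to be comparable with the `BANDWIDTH` attribute of the master playlist.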

​ Testing Live Streams vs. VOD Content

HLS testing differs slightly between live streams and video-on-demand (VOD) content:

​ Live Stream Testing

For live streams, your LoadForge test should:

- Continuously poll the media playlist for updates
- Focus on recent segments (typically the last 3-5 segments)
- Handle playlist updates as new segments become available

```python
from urllib.parse import urljoin

from locust import HttpUser, task, constant
import m3u8


class LiveStreamUser(HttpUser):
    wait_time = constant(2)  # Poll every 2 seconds for playlist updates

    def on_start(self):
        self.master_uri = "/path/to/live/master.m3u8"
        self.media_uri = None
        # Media sequence number of the newest segment fetched so far
        self.last_sequence = None

    @task
    def watch_live_stream(self):
        # Always refresh the master playlist for live streams
        response = self.client.get(self.master_uri, name="Live Master Playlist")
        if response.status_code != 200:
            return
        master = m3u8.loads(response.text)

        # Select a quality level
        if not master.playlists:
            return
        # For simplicity, we'll use the middle quality
        playlist = master.playlists[len(master.playlists) // 2]
        self.media_uri = urljoin(self.master_uri, playlist.uri)

        # Get the media playlist
        media_response = self.client.get(self.media_uri, name="Live Media Playlist")
        if media_response.status_code != 200:
            return
        media = m3u8.loads(media_response.text)
        if not media.segments:
            return

        # A live playlist is a sliding window, so track segments by media
        # sequence number rather than by their index in the playlist
        first_sequence = media.media_sequence or 0
        newest_sequence = first_sequence + len(media.segments) - 1
        if self.last_sequence is None:
            # First poll: join near the live edge (last 3 segments)
            start_sequence = max(first_sequence, newest_sequence - 2)
        else:
            start_sequence = max(first_sequence, self.last_sequence + 1)

        # Download any segments we haven't seen yet
        for sequence in range(start_sequence, newest_sequence + 1):
            segment = media.segments[sequence - first_sequence]
            self.client.get(urljoin(self.media_uri, segment.uri), name="Live Segment")

        # Update our position
        self.last_sequence = newest_sequence
```
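Because a live media playlist is a sliding window, "new" segments are best identified by media sequence number (`#EXT-X-MEDIA-SEQUENCE` plus the segment's position) rather than by list index. The sketch below shows that bookkeeping on its own; the function name and the `join_window` default of 3 are assumptions chosen to match the "last 3-5 segments" guidance above.

```python
def new_segment_numbers(media_sequence, segment_count, last_seen, join_window=3):
    """Return the media sequence numbers to fetch after a playlist refresh."""
    if segment_count == 0:
        return []
    newest = media_sequence + segment_count - 1
    if last_seen is None:
        # First refresh: join near the live edge instead of fetching the whole window
        start = max(media_sequence, newest - join_window + 1)
    else:
        # Skip what was already fetched; segments that slid out of the window are gone
        start = max(media_sequence, last_seen + 1)
    return list(range(start, newest + 1))


print(new_segment_numbers(100, 6, None))  # [103, 104, 105]
print(new_segment_numbers(101, 6, 105))   # [106]
```

To turn a sequence number back into a playlist entry, subtract the playlist's media sequence: `segments[number - media_sequence]`.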

​ VOD Testing

For VOD content, your test can:

- Simulate a complete viewing session from start to finish
- Focus on initial buffering and mid-stream quality switches
- Test seeking behavior by requesting segments out of order

```python
from urllib.parse import urljoin

from locust import HttpUser, task, between
import m3u8
import random
import time


class VODStreamUser(HttpUser):
    wait_time = between(1, 3)

    def on_start(self):
        # Simulate a viewer starting a VOD asset
        self.master_uri = "/path/to/vod/master.m3u8"
        self.watch_percentage = random.uniform(0.1, 1.0)  # Watch 10% to 100% of the video

    @task
    def watch_video(self):
        # Locust needs at least one task; each run is one viewing session
        self.request_master_playlist()

    def request_master_playlist(self):
        response = self.client.get(self.master_uri, name="VOD Master Playlist")
        if response.status_code == 200:
            master = m3u8.loads(response.text)
            # Select a quality level (randomly for this example)
            if master.playlists:
                playlist = random.choice(master.playlists)
                # Playlist URIs are usually relative to the master playlist
                self.request_media_playlist(urljoin(self.master_uri, playlist.uri))

    def request_media_playlist(self, media_uri):
        response = self.client.get(media_uri, name="VOD Media Playlist")
        if response.status_code == 200:
            media = m3u8.loads(response.text)
            # Determine how much of the video to watch
            total_segments = len(media.segments)
            segments_to_watch = int(total_segments * self.watch_percentage)
            # Start watching segments sequentially
            self.watch_segments(media, media_uri, segments_to_watch)

    def watch_segments(self, media, media_uri, segments_to_watch):
        # Watch segments sequentially
        for i in range(min(segments_to_watch, len(media.segments))):
            segment = media.segments[i]
            self.client.get(urljoin(media_uri, segment.uri), name="VOD Segment")
            # Simulate the playback time (faster than real-time for testing purposes)
            time.sleep(segment.duration * 0.5)

        # Simulate a seek operation (jump ahead) in 30% of cases
        if random.random() < 0.3 and segments_to_watch < len(media.segments) - 5:
            # Jump ahead by 5-10 segments
            jump_distance = random.randint(5, 10)
            new_position = min(segments_to_watch + jump_distance, len(media.segments) - 1)

            # Request the segment at the new position
            segment = media.segments[new_position]
            self.client.get(urljoin(media_uri, segment.uri), name="VOD Seek Segment")

            # Continue watching from new position
            for i in range(new_position + 1, len(media.segments)):
                segment = media.segments[i]
                self.client.get(urljoin(media_uri, segment.uri), name="VOD Segment After Seek")
                time.sleep(segment.duration * 0.5)
```
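The seek above jumps by a number of segments. If you would rather model a viewer dragging the scrubber to a timestamp, the position has to be mapped to the segment that contains it. A minimal sketch, assuming the segment durations come from the `#EXTINF` values of the media playlist; the function name is illustrative.

```python
from bisect import bisect_right
from itertools import accumulate


def segment_index_for_time(durations, seek_seconds):
    """Map a playback position (seconds) to the index of the segment containing it."""
    if not durations:
        raise ValueError("playlist has no segments")
    # end_times[i] is the playback time at which segment i ends
    end_times = list(accumulate(durations))
    index = bisect_right(end_times, seek_seconds)
    # Positions past the end of the video clamp to the last segment
    return min(index, len(durations) - 1)


durations = [10.0, 10.0, 10.0, 10.0]
print(segment_index_for_time(durations, 25.0))  # 2
```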

​ Testing Different CDN Providers

To evaluate how different CDNs handle your HLS streams, you can modify your LoadForge test to target multiple CDN endpoints:

```python
from urllib.parse import urljoin

from locust import HttpUser, task, between
import m3u8
import random


class MultiCDNUser(HttpUser):
    wait_time = between(1, 2)

    def on_start(self):
        # Define multiple CDN base URLs to test
        self.cdn_urls = [
            "https://cdn1.example.com",
            "https://cdn2.example.com",
            "https://cdn3.example.com",
        ]
        # Select a CDN for this user session
        self.selected_cdn = random.choice(self.cdn_urls)
        # Set the stream path (same for all CDNs)
        self.stream_path = "/path/to/stream/master.m3u8"

    @task
    def stream_from_cdn(self):
        # Request the master playlist from the selected CDN
        master_url = f"{self.selected_cdn}{self.stream_path}"
        response = self.client.get(master_url, name="Master Playlist")
        if response.status_code != 200:
            return
        master = m3u8.loads(response.text)

        # Select a quality level
        if not master.playlists:
            return
        playlist = random.choice(master.playlists)

        # playlist.uri might be relative or absolute; urljoin resolves relative
        # URIs against the master playlist and leaves absolute ones unchanged
        media_url = urljoin(master_url, playlist.uri)

        # Request the media playlist
        media_response = self.client.get(media_url, name="Media Playlist")
        if media_response.status_code != 200:
            return
        media = m3u8.loads(media_response.text)

        # Request a few segments
        for segment in media.segments[:3]:
            # Segment URIs are resolved against the media playlist
            segment_url = urljoin(media_url, segment.uri)
            # Download the segment
            self.client.get(segment_url, name="Segment")
```

​ Understanding HLS Manifest Structure

To effectively test HLS streams, it’s important to understand the structure of HLS manifest files:

​ Master Playlist (Variant Playlist)

The master playlist contains references to multiple media playlists with different quality levels:

```
#EXTM3U
#EXT-X-VERSION:3

# Variant streams (different quality levels)
#EXT-X-STREAM-INF:BANDWIDTH=800000,RESOLUTION=640x360
low/index.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=1400000,RESOLUTION=842x480
medium/index.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=2800000,RESOLUTION=1280x720
high/index.m3u8
```
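The scripts in this guide rely on the `m3u8` library for parsing. To make clear what a parser extracts from a master playlist, here is a standard-library-only sketch that reads the `BANDWIDTH` attribute and the URI line that follows each `#EXT-X-STREAM-INF` tag. It is a simplified illustration, not a complete HLS parser; the function and constant names are made up for the example.

```python
import re

# Attribute values are either quoted strings (which may contain commas) or bare tokens
ATTRIBUTE = re.compile(r'([A-Z0-9-]+)=("[^"]*"|[^,]*)')

SAMPLE_MASTER = """#EXTM3U
#EXT-X-VERSION:3
#EXT-X-STREAM-INF:BANDWIDTH=1400000,RESOLUTION=842x480
medium/index.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=800000,RESOLUTION=640x360
low/index.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=2800000,RESOLUTION=1280x720
high/index.m3u8
"""


def parse_master_playlist(text):
    """Return (bandwidth, uri) pairs, lowest bandwidth first."""
    variants = []
    pending_bandwidth = None
    for line in text.splitlines():
        line = line.strip()
        if line.startswith("#EXT-X-STREAM-INF:"):
            attrs = dict(ATTRIBUTE.findall(line.split(":", 1)[1]))
            pending_bandwidth = int(attrs["BANDWIDTH"])
        elif line and not line.startswith("#") and pending_bandwidth is not None:
            # The first non-tag line after STREAM-INF is that variant's URI
            variants.append((pending_bandwidth, line))
            pending_bandwidth = None
    return sorted(variants)


variants = parse_master_playlist(SAMPLE_MASTER)
print(variants)
```

Sorting matters because variants may be listed in any order, so the first entry is not necessarily the lowest quality.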

​ Media Playlist

Each media playlist contains references to the actual video segments:

```
#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:10
#EXT-X-MEDIA-SEQUENCE:0
#EXTINF:9.009,
segment0.ts
#EXTINF:9.009,
segment1.ts
#EXTINF:9.009,
segment2.ts
#EXTINF:9.009,
segment3.ts
#EXT-X-ENDLIST
```

For live streams, the media playlist is continuously updated, and the #EXT-X-ENDLIST tag is omitted until the stream ends.
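The presence or absence of `#EXT-X-ENDLIST` is therefore a simple way for a test script to tell VOD from live. The sketch below, again standard-library only and deliberately simplified, collects segment durations and URIs and reports whether the playlist is still live; the names are illustrative.

```python
SAMPLE_MEDIA = """#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:10
#EXT-X-MEDIA-SEQUENCE:0
#EXTINF:9.009,
segment0.ts
#EXTINF:9.009,
segment1.ts
#EXTINF:9.009,
segment2.ts
#EXTINF:9.009,
segment3.ts
#EXT-X-ENDLIST
"""


def parse_media_playlist(text):
    """Return ([(duration_seconds, uri), ...], is_live)."""
    segments = []
    duration = None
    ended = False
    for line in text.splitlines():
        line = line.strip()
        if line.startswith("#EXTINF:"):
            # "#EXTINF:<duration>,[<title>]"
            duration = float(line[len("#EXTINF:"):].split(",", 1)[0])
        elif line == "#EXT-X-ENDLIST":
            ended = True
        elif line and not line.startswith("#") and duration is not None:
            segments.append((duration, line))
            duration = None
    return segments, not ended


segments, is_live = parse_media_playlist(SAMPLE_MEDIA)
print(len(segments), is_live)  # 4 False
```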

​ Common HLS Testing Scenarios

​ 1. Peak Concurrent Viewer Test

Simulate a sudden spike in viewers, such as during a live event:

```python
from locust import HttpUser, task, between, LoadTestShape


class ViewerUser(HttpUser):
    wait_time = between(1, 3)

    @task
    def watch_stream(self):
        # Request master playlist
        self.client.get("/path/to/master.m3u8", name="Master Playlist")
        # Request a media playlist
        self.client.get("/path/to/medium/index.m3u8", name="Media Playlist")
        # Request some segments
        for i in range(5):
            self.client.get(f"/path/to/medium/segment{i}.ts", name="Segment")


# Define a spike test shape
class SpikeTestShape(LoadTestShape):
    # Each "duration" is the cumulative run time (in seconds) at which that stage ends
    stages = [
        {"duration": 60, "users": 100, "spawn_rate": 10},     # Warm-up: 0-60s
        {"duration": 180, "users": 5000, "spawn_rate": 500},  # Spike: 60-180s
        {"duration": 360, "users": 1000, "spawn_rate": 100},  # Sustained load: 180-360s
        {"duration": 420, "users": 100, "spawn_rate": 10},    # Scale down: 360-420s
    ]
    current_stage = None

    def tick(self):
        run_time = self.get_run_time()
        # Find the appropriate stage based on the current run time
        for i, stage in enumerate(self.stages):
            if run_time < stage["duration"]:
                # Log once when a new stage begins (tick runs about once per second)
                if i != self.current_stage:
                    self.current_stage = i
                    print(f"Entering stage {i + 1}: {stage['users']} users at {stage['spawn_rate']} spawn rate")
                return stage["users"], stage["spawn_rate"]
        # Test is complete, stop the test
        return None
```
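Because the stage durations are cumulative, it is easy to get a boundary wrong. The lookup can be checked in isolation before a full run; the sketch below copies the stage table into a plain function (the name `stage_at` is illustrative) so the boundaries can be verified without Locust.

```python
STAGES = [
    {"duration": 60, "users": 100, "spawn_rate": 10},     # Warm-up: 0-60s
    {"duration": 180, "users": 5000, "spawn_rate": 500},  # Spike: 60-180s
    {"duration": 360, "users": 1000, "spawn_rate": 100},  # Sustained load: 180-360s
    {"duration": 420, "users": 100, "spawn_rate": 10},    # Scale down: 360-420s
]


def stage_at(run_time, stages=STAGES):
    """Return (users, spawn_rate) for a run time in seconds, or None once the test is over."""
    for stage in stages:
        if run_time < stage["duration"]:
            return stage["users"], stage["spawn_rate"]
    return None


for t in (0, 59, 60, 179, 180, 420):
    print(t, stage_at(t))
```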

​ 2. Geographic Distribution Test

Test how your CDN performs across different regions by running LoadForge tests from multiple data centers:

```python
from urllib.parse import urljoin

from locust import HttpUser, task, between, events
import m3u8
import os
import statistics
import time

MASTER_PLAYLIST_URL = "/path/to/master.m3u8"

# Store latency metrics by region
region_latencies = {}


class GeoDistributedUser(HttpUser):
    wait_time = between(1, 2)

    def on_start(self):
        # Determine which region this test is running from.
        # TEST_REGION is a placeholder environment variable name; set it (or
        # substitute your own mechanism) for each location the test runs from.
        self.region = os.environ.get("TEST_REGION", "unknown")
        region_latencies.setdefault(self.region, [])

    @task
    def test_cdn_latency(self):
        # Request the master playlist and measure latency
        start_time = time.time()
        response = self.client.get(MASTER_PLAYLIST_URL, name=f"Master Playlist ({self.region})")
        latency = time.time() - start_time

        # Store the latency for this region
        region_latencies[self.region].append(latency)

        # Request a segment to test throughput
        if response.status_code != 200:
            return
        master = m3u8.loads(response.text)
        if not master.playlists:
            return
        # Use the first listed variant for a consistent comparison
        media_url = urljoin(MASTER_PLAYLIST_URL, master.playlists[0].uri)

        # Get the media playlist
        media_response = self.client.get(media_url, name=f"Media Playlist ({self.region})")
        if media_response.status_code != 200:
            return
        media = m3u8.loads(media_response.text)
        if not media.segments:
            return

        # Request the first segment and measure throughput
        segment_url = urljoin(media_url, media.segments[0].uri)
        start_time = time.time()
        segment_response = self.client.get(segment_url, name=f"Segment ({self.region})")
        download_time = time.time() - start_time

        # Calculate throughput in Mbps
        if download_time > 0:
            size_in_bits = len(segment_response.content or b"") * 8
            throughput_mbps = (size_in_bits / download_time) / 1_000_000
            print(f"Region: {self.region} | Throughput: {throughput_mbps:.2f} Mbps")

            # Optional: record throughput as a custom entry in Locust's stats
            # (Locust 2.x request event). The value appears in the
            # response-time column, so read it as Mbps, not milliseconds.
            self.environment.events.request.fire(
                request_type="METRIC",
                name=f"Throughput ({self.region})",
                response_time=throughput_mbps,
                response_length=0,
                exception=None,
                context={},
            )


@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
    # Print summary of latencies by region
    print("\nRegional Latency Summary:")
    for region, latencies in region_latencies.items():
        # statistics.quantiles needs at least two samples
        if len(latencies) >= 2:
            avg_latency = statistics.mean(latencies)
            # n=20 gives 19 cut points; the last one (index 18) is the 95th percentile
            p95_latency = statistics.quantiles(latencies, n=20)[18]
            print(f"Region: {region} | Avg: {avg_latency:.3f}s | P95: {p95_latency:.3f}s | Samples: {len(latencies)}")
```
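A note on the percentile calculation: `statistics.quantiles(data, n=20)` returns 19 cut points, so the 95th percentile is the last one, at index 18, and the function needs at least two samples. A small standalone sketch of a summary helper built on that (the helper name and the single-sample fallback are choices made for this example):

```python
import statistics


def latency_summary(latencies):
    """Return sample count, mean, and 95th percentile for a list of latencies in seconds."""
    if not latencies:
        return None
    summary = {"samples": len(latencies), "avg": statistics.mean(latencies)}
    if len(latencies) >= 2:
        # n=20 yields 19 cut points; index 18 is the 95th percentile
        summary["p95"] = statistics.quantiles(latencies, n=20)[18]
    else:
        # With a single sample, report it as-is
        summary["p95"] = latencies[0]
    return summary


data = [float(ms) for ms in range(1, 101)]
summary = latency_summary(data)
print(summary)
```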

​ Analyzing HLS Test Results

When analyzing LoadForge HLS test results, focus on these key metrics:

- Time to First Segment: How quickly can viewers start playback?
- Segment Download Time: Are segments downloading fast enough to prevent buffering?
- Error Rates by Segment Type: Are certain segments (e.g., audio, video, subtitles) failing more than others?
- Geographic Performance Variations: Does your CDN perform consistently across regions?
- Adaptation Frequency: How often do quality levels change during playback?
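For the segment download time metric, a useful rule of thumb is to compare each download time with the duration of the segment it fetched: a ratio at or above 1 means the segment took as long to download as it takes to play, so a player with no buffer headroom would stall. The sketch below applies that rule to a list of samples; the function name, the sample data, and the threshold of 1.0 are assumptions for illustration.

```python
def rebuffer_risk(samples, threshold=1.0):
    """Summarize (download_seconds, segment_duration_seconds) samples."""
    ratios = [download / duration for download, duration in samples if duration > 0]
    if not ratios:
        return None
    slow = sum(1 for ratio in ratios if ratio >= threshold)
    return {
        "worst_ratio": max(ratios),
        "slow_fraction": slow / len(ratios),
    }


samples = [(1.0, 6.0), (3.0, 6.0), (7.5, 6.0), (6.0, 6.0)]
print(rebuffer_risk(samples))  # {'worst_ratio': 1.25, 'slow_fraction': 0.5}
```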

LoadForge’s reporting dashboard provides detailed insights into these metrics, helping you identify and resolve streaming bottlenecks.

​ Best Practices for HLS Load Testing

- Start with realistic user counts and gradually scale up to identify breaking points
- Test from multiple geographic locations to evaluate CDN performance globally
- Include a mix of device profiles (mobile, desktop, smart TV) in your tests
- Simulate different network conditions by varying the bitrate selection logic
- Test both steady-state and spike scenarios to ensure your infrastructure can handle both
- Monitor origin server metrics during tests to identify backend bottlenecks
- Test with actual content dimensions and bitrates that match your production streams
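One way to approximate a device mix is to give each simulated viewer a profile that caps the bitrate it will request. The sketch below is a hypothetical example: the profile names, weights, and bandwidth caps are invented for illustration and should be replaced with figures from your own audience data.

```python
import random

# Hypothetical device mix: weights are relative shares, caps are in bits per second
DEVICE_PROFILES = {
    "mobile": {"weight": 5, "max_bandwidth": 1_500_000},
    "desktop": {"weight": 3, "max_bandwidth": 5_000_000},
    "smart_tv": {"weight": 2, "max_bandwidth": 20_000_000},
}


def pick_profile(rng=random):
    """Pick a device profile name according to the configured weights."""
    names = list(DEVICE_PROFILES)
    weights = [DEVICE_PROFILES[name]["weight"] for name in names]
    return rng.choices(names, weights=weights, k=1)[0]


def highest_variant_under_cap(bitrates, max_bandwidth):
    """Return the highest bitrate not exceeding the cap, or the lowest if none fits."""
    allowed = [bitrate for bitrate in sorted(bitrates) if bitrate <= max_bandwidth]
    return allowed[-1] if allowed else min(bitrates)


profile = pick_profile()
cap = DEVICE_PROFILES[profile]["max_bandwidth"]
print(profile, highest_variant_under_cap([800_000, 1_400_000, 2_800_000], cap))
```

In a Locust script, `pick_profile()` could be called once in `on_start` and the cap applied wherever the script chooses a variant.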

​ Common HLS Testing Challenges

​ Challenge: High CPU Usage During Tests

Parsing HLS manifests and managing segment downloads can be CPU-intensive. To optimize performance:

```python
# Use FastHttpUser instead of HttpUser for better performance
from locust.contrib.fasthttp import FastHttpUser
from locust import task, between
import m3u8


class OptimizedHLSUser(FastHttpUser):
    wait_time = between(1, 2)

    @task
    def stream_hls(self):
        # Implementation as before, but with FastHttpUser
        pass
```

​ Challenge: Handling Authentication

Many streaming services require authentication tokens in the HLS URLs:

```python
from locust import HttpUser, task, between


class AuthenticatedStreamUser(HttpUser):
    wait_time = between(1, 3)

    def on_start(self):
        # Authenticate and get token
        auth_response = self.client.post(
            "/api/auth",
            json={"username": "testuser", "password": "password"},
        )
        if auth_response.status_code == 200:
            self.token = auth_response.json()["token"]
        else:
            self.token = None

    @task
    def stream_protected_content(self):
        if not self.token:
            return
        # Request master playlist with auth token
        master_url = f"/path/to/master.m3u8?token={self.token}"
        response = self.client.get(master_url, name="Authenticated Master Playlist")
        # Continue as in previous examples
```
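Whether the token also has to be attached to media playlist and segment requests depends on the service: some embed signed URLs in the manifest, others expect the client to carry the token forward. For the second case, a small helper can add the parameter without clobbering an existing query string. This is a sketch; the helper name is made up, and the `token` parameter name simply mirrors the example above.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def with_token(url, token, param="token"):
    """Return `url` with `param=token` set, preserving other query parameters."""
    parts = urlsplit(url)
    # Drop any existing value for the parameter so it is not duplicated
    query = [(key, value)
             for key, value in parse_qsl(parts.query, keep_blank_values=True)
             if key != param]
    query.append((param, token))
    return urlunsplit(parts._replace(query=urlencode(query)))


print(with_token("/path/to/low/index.m3u8", "abc"))    # /path/to/low/index.m3u8?token=abc
print(with_token("/path/to/seg.ts?start=0", "abc"))    # /path/to/seg.ts?start=0&token=abc
```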

Effective HLS load testing with LoadForge helps ensure your streaming infrastructure can deliver high-quality video to viewers at scale. By simulating realistic viewer behavior and analyzing key performance metrics, you can identify and resolve bottlenecks before they impact your audience.

For more information on load testing specific applications or APIs, check out our other guides or contact our support team for assistance with your streaming load testing needs.