Understanding HLS (HTTP Live Streaming)

HTTP Live Streaming (HLS) is an adaptive bitrate streaming protocol developed by Apple that enables high-quality streaming experiences across varying network conditions. HLS works by:

  1. Breaking video into segments: Content is divided into small (typically 2-10 second) chunks
  2. Creating multiple quality levels: Each segment is encoded at different quality levels/bitrates
  3. Using manifest files: .m3u8 playlist files that describe available segments and quality levels
  4. Enabling adaptive playback: Clients automatically switch between quality levels based on network conditions

Key Advantage: HLS is widely supported across devices and platforms, making it one of the most popular streaming protocols for video delivery.

Why Load Test HLS Streams?

Load testing HLS streams is critical for:

  • Validating CDN performance under high concurrent viewer counts
  • Ensuring origin servers can handle segment request spikes
  • Verifying failover mechanisms work correctly during traffic surges
  • Measuring latency across different geographic regions
  • Identifying bottlenecks in your streaming infrastructure before they impact users
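
To size a test against the concerns above, it helps to estimate the steady-state load a given audience generates. The sketch below is rough arithmetic, not a LoadForge feature: it assumes each viewer fetches one segment per segment duration and, for live streams, re-fetches the media playlist at a fixed interval. The function name, parameters, and example figures are illustrative assumptions.

```python
def estimate_steady_state_load(viewers, segment_seconds, bitrate_bps,
                               playlist_poll_seconds=None):
    """Rough steady-state load for a given audience size.

    Assumes one segment request per viewer per segment duration. Live viewers
    also poll the media playlist every playlist_poll_seconds (None for VOD).
    """
    segment_rps = viewers / segment_seconds
    playlist_rps = viewers / playlist_poll_seconds if playlist_poll_seconds else 0.0
    egress_gbps = viewers * bitrate_bps / 1e9
    return {
        "segment_rps": segment_rps,
        "playlist_rps": playlist_rps,
        "egress_gbps": egress_gbps,
    }


# Example: 5,000 live viewers, 6-second segments, 2.8 Mbps variant
load = estimate_steady_state_load(
    viewers=5000, segment_seconds=6, bitrate_bps=2_800_000, playlist_poll_seconds=6
)
print(load)
```

Real players burst at startup and after stalls, so treat these figures as a baseline for the sustained phase rather than a peak.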

How LoadForge Tests HLS Streams

LoadForge simulates real-world HLS streaming behavior by:

  1. Requesting the master playlist (.m3u8 file)
  2. Parsing the manifest to identify available quality levels
  3. Requesting media playlists for specific quality levels
  4. Downloading segments in the correct sequence
  5. Simulating adaptive bitrate behavior by switching between quality levels

This approach accurately replicates how real video players interact with your streaming infrastructure.

Basic HLS Test Example

Here’s a simple LoadForge script that tests an HLS stream:

from locust import HttpUser, task, between
from urllib.parse import urljoin
import m3u8
import random
import time

class HLSStreamingUser(HttpUser):
    wait_time = between(1, 3)
    master_path = "/path/to/master.m3u8"
    
    def on_start(self):
        # Initialize variables to store playlist information
        self.master_playlist = None
        self.media_url = None
        self.segments = []
        self.current_segment_index = 0
    
    def load_media_playlist(self, name):
        # Select a quality level (typically we'd choose based on bandwidth)
        # For testing, we'll just pick one randomly
        playlist = random.choice(self.master_playlist.playlists)
        
        # Variant URIs are often relative to the master playlist, so resolve them first
        self.media_url = urljoin(self.master_path, playlist.uri)
        response = self.client.get(self.media_url, name=name)
        
        # Parse the media playlist to get segments
        if response.status_code == 200:
            self.segments = m3u8.loads(response.text).segments
            self.current_segment_index = 0
    
    @task
    def stream_video(self):
        # Step 1: Request the master playlist if we haven't already
        if self.master_playlist is None:
            response = self.client.get(self.master_path, name="Master Playlist")
            if response.status_code != 200:
                # If we can't get the master playlist, stop the task
                return
            self.master_playlist = m3u8.loads(response.text)
            if self.master_playlist.playlists:
                self.load_media_playlist("Media Playlist")
        
        # Step 2: If we have segments to request, get the next one
        if self.segments and self.current_segment_index < len(self.segments):
            segment = self.segments[self.current_segment_index]
            # Segment URIs are resolved relative to the media playlist
            self.client.get(urljoin(self.media_url, segment.uri), name="Segment")
            self.current_segment_index += 1
            
            # Simulate the time it takes to play the segment
            # This makes the test more realistic
            time.sleep(segment.duration)
        elif self.master_playlist.playlists:
            # We need to get a new media playlist to get more segments
            self.load_media_playlist("Media Playlist Refresh")

Advanced HLS Testing Techniques

Simulating Realistic Viewer Behavior

To create more realistic tests, you can enhance your LoadForge script to simulate how real viewers interact with video content:

from locust import HttpUser, task, between, constant
import m3u8
import random
import time

class RealisticViewerUser(HttpUser):
    wait_time = constant(1)  # Tick once per second to simulate playback
    
    def on_start(self):
        self.master_playlist = None
        self.media_playlists = {}
        self.current_bitrate = None
        self.segments_played = 0
        self.watch_duration = random.randint(30, 300)  # Random watch time between 30s and 5min
        self.buffer_health = 0  # Simulated buffer in seconds
        
        # Start the streaming session
        self.request_master_playlist()
    
    def request_master_playlist(self):
        response = self.client.get("/path/to/master.m3u8", name="Master Playlist")
        if response.status_code == 200:
            self.master_playlist = m3u8.loads(response.text)
            
            # Select initial bitrate (typically lowest for fast startup)
            available_bitrates = [(p.stream_info.bandwidth, p.uri) for p in self.master_playlist.playlists]
            available_bitrates.sort()  # Sort by bandwidth
            
            # Start with a low bitrate
            self.current_bitrate, playlist_uri = available_bitrates[0]
            self.request_media_playlist(playlist_uri)
    
    def request_media_playlist(self, playlist_uri):
        response = self.client.get(playlist_uri, name="Media Playlist")
        if response.status_code == 200:
            media_playlist = m3u8.loads(response.text)
            self.media_playlists[playlist_uri] = media_playlist
            
            # Start downloading segments
            self.download_next_segment(playlist_uri)
    
    def download_next_segment(self, playlist_uri):
        media_playlist = self.media_playlists[playlist_uri]
        
        # Get next segment that hasn't been downloaded
        if self.segments_played < len(media_playlist.segments):
            segment = media_playlist.segments[self.segments_played]
            start_time = time.time()
            response = self.client.get(segment.uri, name="Segment")
            download_time = time.time() - start_time
            
            # Estimate throughput in bits per second so it is comparable to the
            # BANDWIDTH values in the master playlist
            segment_size_bits = len(response.content) * 8
            bandwidth = segment_size_bits / download_time if download_time > 0 else float('inf')
            
            # Update buffer health
            self.buffer_health += segment.duration - download_time
            self.buffer_health = max(0, self.buffer_health)  # Can't have negative buffer
            
            # Potentially switch bitrates based on buffer health
            self.adapt_bitrate(bandwidth)
            
            self.segments_played += 1
            
            # Check if we should continue watching
            total_watched = sum(s.duration for s in media_playlist.segments[:self.segments_played])
            if total_watched >= self.watch_duration:
                # Viewer has watched their fill, session ends
                return
                
            # Schedule next segment download based on buffer health
            if self.buffer_health < 5:  # If buffer is low, download next segment immediately
                self.download_next_segment(playlist_uri)
    
    def adapt_bitrate(self, current_bandwidth):
        # Only adapt if we have a master playlist with multiple bitrates
        if not self.master_playlist or len(self.master_playlist.playlists) <= 1:
            return
            
        # Make sure all playlists have stream_info with bandwidth attribute
        available_bitrates = []
        for playlist in self.master_playlist.playlists:
            # Some m3u8 files might not have stream_info or bandwidth properly set
            if hasattr(playlist, 'stream_info') and hasattr(playlist.stream_info, 'bandwidth'):
                available_bitrates.append((playlist.stream_info.bandwidth, playlist.uri))
            else:
                # If bandwidth info is missing, we'll use a default value based on position
                # This is just a fallback for testing purposes
                index = self.master_playlist.playlists.index(playlist)
                estimated_bandwidth = 500000 * (index + 1)  # Rough estimate
                available_bitrates.append((estimated_bandwidth, playlist.uri))
                
        if not available_bitrates:
            return
            
        available_bitrates.sort()  # Sort by bandwidth
        
        # Simple ABR logic: target using 70% of available bandwidth
        target_bandwidth = current_bandwidth * 0.7
        
        # Find the highest bitrate that's below our target
        new_bitrate, new_uri = available_bitrates[0]  # Default to lowest
        for bitrate, uri in available_bitrates:
            if bitrate <= target_bandwidth:
                new_bitrate, new_uri = bitrate, uri
            else:
                break
                
        # If buffer is very low, drop to lowest bitrate
        if self.buffer_health < 2:
            new_bitrate, new_uri = available_bitrates[0]
            
        # If we're changing bitrates, request the new media playlist
        if new_bitrate != self.current_bitrate:
            self.current_bitrate = new_bitrate
            self.request_media_playlist(new_uri)
    
    @task
    def simulate_playback(self):
        # This task simulates the ongoing playback process
        # It is called once per wait_time interval (1 second)
        if not self.master_playlist:
            return
        
        # Find current playlist URI
        current_uri = None
        for playlist in self.master_playlist.playlists:
            if playlist.stream_info.bandwidth == self.current_bitrate:
                current_uri = playlist.uri
                break
                
        if current_uri and current_uri in self.media_playlists:
            # Simulate 1 second of playback on every tick, so a full buffer
            # keeps draining instead of stalling the session
            self.buffer_health = max(0, self.buffer_health - 1)
            
            # If the buffer is getting low, get more segments
            if self.buffer_health < 5:
                self.download_next_segment(current_uri)

Testing Live Streams vs. VOD Content

HLS testing differs slightly between live streams and video-on-demand (VOD) content:

Live Stream Testing

For live streams, your LoadForge test should:

  • Continuously poll the media playlist for updates
  • Focus on recent segments (typically the last 3-5 segments)
  • Handle playlist updates as new segments become available

from locust import HttpUser, task, constant
import m3u8

class LiveStreamUser(HttpUser):
    wait_time = constant(2)  # Poll every 2 seconds for playlist updates
    
    def on_start(self):
        self.master_uri = "/path/to/live/master.m3u8"
        self.media_uri = None
        self.last_segment_index = -1
        
    @task
    def watch_live_stream(self):
        # Always refresh the master playlist for live streams
        response = self.client.get(self.master_uri, name="Live Master Playlist")
        
        if response.status_code == 200:
            master = m3u8.loads(response.text)
            
            # Select a quality level
            if master.playlists:
                # For simplicity, we'll use the middle quality
                playlist = master.playlists[len(master.playlists) // 2]
                self.media_uri = playlist.uri
                
                # Get the media playlist
                media_response = self.client.get(self.media_uri, name="Live Media Playlist")
                
                if media_response.status_code == 200:
                    media = m3u8.loads(media_response.text)
                    
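                    # Live playlists are a sliding window, so track our position by
                    # media sequence number rather than by index in the list
                    if media.segments:
                        first_seq = media.media_sequence or 0
                        newest_seq = first_seq + len(media.segments) - 1
                        
                        # On first join, start near the live edge (last 3 segments)
                        if self.last_segment_index < 0:
                            self.last_segment_index = max(first_seq - 1, newest_seq - 3)
                        
                        # Download any segments we haven't seen yet
                        start_seq = max(self.last_segment_index + 1, first_seq)
                        for seq in range(start_seq, newest_seq + 1):
                            segment = media.segments[seq - first_seq]
                            self.client.get(segment.uri, name="Live Segment")
                        
                        # Update our position
                        self.last_segment_index = newest_seq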

VOD Testing

For VOD content, your test can:

  • Simulate a complete viewing session from start to finish
  • Focus on initial buffering and mid-stream quality switches
  • Test seeking behavior by requesting segments out of order

from locust import HttpUser, task, between
import m3u8
import random
import time

class VODStreamUser(HttpUser):
    wait_time = between(1, 3)
    
    def on_start(self):
        # Simulate a viewer starting a VOD asset
        self.master_uri = "/path/to/vod/master.m3u8"
        self.current_position = 0  # Start at the beginning
        self.watch_percentage = random.uniform(0.1, 1.0)  # Watch 10% to 100% of the video
        
        # Request the master playlist
        self.request_master_playlist()
    
    def request_master_playlist(self):
        response = self.client.get(self.master_uri, name="VOD Master Playlist")
        if response.status_code == 200:
            master = m3u8.loads(response.text)
            
            # Select a quality level (randomly for this example)
            if master.playlists:
                playlist = random.choice(master.playlists)
                self.request_media_playlist(playlist.uri)
    
    def request_media_playlist(self, media_uri):
        response = self.client.get(media_uri, name="VOD Media Playlist")
        if response.status_code == 200:
            media = m3u8.loads(response.text)
            
            # Determine how much of the video to watch
            total_segments = len(media.segments)
            segments_to_watch = int(total_segments * self.watch_percentage)
            
            # Start watching segments sequentially
            self.watch_segments(media, segments_to_watch)
    
    def watch_segments(self, media, segments_to_watch):
        # Watch segments sequentially
        for i in range(min(segments_to_watch, len(media.segments))):
            segment = media.segments[i]
            self.client.get(segment.uri, name="VOD Segment")
            
            # Simulate the playback time
            time.sleep(segment.duration * 0.5)  # Faster than real-time for testing purposes
            
        # Simulate a seek operation (jump ahead) in 30% of cases
        if random.random() < 0.3 and segments_to_watch < len(media.segments) - 5:
            # Jump ahead by 5-10 segments
            jump_distance = random.randint(5, 10)
            new_position = min(segments_to_watch + jump_distance, len(media.segments) - 1)
            
            # Request the segment at the new position
            segment = media.segments[new_position]
            self.client.get(segment.uri, name="VOD Seek Segment")
            
            # Continue watching from new position
            for i in range(new_position + 1, len(media.segments)):
                segment = media.segments[i]
                self.client.get(segment.uri, name="VOD Segment After Seek")
                time.sleep(segment.duration * 0.5)

Testing Different CDN Providers

To evaluate how different CDNs handle your HLS streams, you can modify your LoadForge test to target multiple CDN endpoints:

from locust import HttpUser, task, between
import m3u8
import random

class MultiCDNUser(HttpUser):
    wait_time = between(1, 2)
    
    def on_start(self):
        # Define multiple CDN base URLs to test
        self.cdn_urls = [
            "https://cdn1.example.com",
            "https://cdn2.example.com",
            "https://cdn3.example.com"
        ]
        
        # Select a CDN for this user session
        self.selected_cdn = random.choice(self.cdn_urls)
        
        # Set the stream path (same for all CDNs)
        self.stream_path = "/path/to/stream/master.m3u8"
        
    @task
    def stream_from_cdn(self):
        # Request the master playlist from the selected CDN
        master_url = f"{self.selected_cdn}{self.stream_path}"
        response = self.client.get(master_url, name="Master Playlist")
        
        if response.status_code == 200:
            # Passing uri= lets m3u8 resolve relative URIs against the playlist location
            master = m3u8.loads(response.text, uri=master_url)
            
            # Select a quality level
            if master.playlists:
                playlist = random.choice(master.playlists)
                
                # absolute_uri handles both relative and absolute playlist URIs
                media_url = playlist.absolute_uri
                
                # Request the media playlist
                media_response = self.client.get(media_url, name="Media Playlist")
                
                if media_response.status_code == 200:
                    media = m3u8.loads(media_response.text, uri=media_url)
                    
                    # Request a few segments
                    for segment in media.segments[:3]:
                        # Segment URIs are resolved relative to the media playlist
                        self.client.get(segment.absolute_uri, name="Segment")

Understanding HLS Manifest Structure

To effectively test HLS streams, it’s important to understand the structure of HLS manifest files:

Master Playlist (Variant Playlist)

The master playlist contains references to multiple media playlists with different quality levels:

#EXTM3U
#EXT-X-VERSION:3

# Variant streams (different quality levels)
#EXT-X-STREAM-INF:BANDWIDTH=800000,RESOLUTION=640x360
low/index.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=1400000,RESOLUTION=842x480
medium/index.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=2800000,RESOLUTION=1280x720
high/index.m3u8
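
To make the structure concrete, here is a minimal standard-library sketch that extracts the variant streams from the master playlist above and sorts them by bandwidth. It is an illustration of the format only: the `parse_variants` helper and its attribute regex are assumptions written for this example, and a production test would normally rely on a full parser such as the m3u8 package used elsewhere in this guide.

```python
import re

MASTER = """#EXTM3U
#EXT-X-VERSION:3
#EXT-X-STREAM-INF:BANDWIDTH=800000,RESOLUTION=640x360
low/index.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=1400000,RESOLUTION=842x480
medium/index.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=2800000,RESOLUTION=1280x720
high/index.m3u8
"""

# Attribute values are either quoted strings (which may contain commas) or bare tokens
ATTR_RE = re.compile(r'([A-Z0-9-]+)=("[^"]*"|[^,]*)')


def parse_variants(master_text):
    """Return variant streams as dicts, sorted from lowest to highest bandwidth."""
    variants = []
    pending = None  # Attributes of the most recent #EXT-X-STREAM-INF tag
    for line in master_text.splitlines():
        line = line.strip()
        if line.startswith("#EXT-X-STREAM-INF:"):
            attrs = dict(ATTR_RE.findall(line.split(":", 1)[1]))
            pending = {key: value.strip('"') for key, value in attrs.items()}
        elif line and not line.startswith("#") and pending is not None:
            # The first URI line after the tag belongs to that variant
            variants.append({
                "bandwidth": int(pending["BANDWIDTH"]),
                "resolution": pending.get("RESOLUTION"),
                "uri": line,
            })
            pending = None
    return sorted(variants, key=lambda v: v["bandwidth"])


variants = parse_variants(MASTER)
for v in variants:
    print(v["bandwidth"], v["resolution"], v["uri"])
```

Sorting explicitly matters because the order of variants in a master playlist is up to the packager, so the first entry is not guaranteed to be the lowest quality.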

Media Playlist

Each media playlist contains references to the actual video segments:

#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:10
#EXT-X-MEDIA-SEQUENCE:0

#EXTINF:9.009,
segment0.ts
#EXTINF:9.009,
segment1.ts
#EXTINF:9.009,
segment2.ts
#EXTINF:9.009,
segment3.ts

#EXT-X-ENDLIST

For live streams, the media playlist is continuously updated, and the #EXT-X-ENDLIST tag is omitted until the stream ends.
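
The following standard-library sketch shows how a test script might read these fields: it sums segment durations, checks for #EXT-X-ENDLIST, and resolves the relative segment URIs against the playlist's own URL. The `summarize_media_playlist` helper and the `cdn.example.com` URL are assumptions made for illustration.

```python
from urllib.parse import urljoin

MEDIA = """#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:10
#EXT-X-MEDIA-SEQUENCE:0
#EXTINF:9.009,
segment0.ts
#EXTINF:9.009,
segment1.ts
#EXTINF:9.009,
segment2.ts
#EXTINF:9.009,
segment3.ts
#EXT-X-ENDLIST
"""


def summarize_media_playlist(text, playlist_url):
    """Extract durations, absolute segment URLs, and live/ended state."""
    durations, urls = [], []
    media_sequence = 0
    ended = False
    for line in text.splitlines():
        line = line.strip()
        if line.startswith("#EXTINF:"):
            # "#EXTINF:<duration>,<optional title>"
            durations.append(float(line[len("#EXTINF:"):].split(",", 1)[0]))
        elif line.startswith("#EXT-X-MEDIA-SEQUENCE:"):
            media_sequence = int(line.split(":", 1)[1])
        elif line == "#EXT-X-ENDLIST":
            ended = True  # No more segments will be appended
        elif line and not line.startswith("#"):
            # Relative URIs are resolved against the playlist location
            urls.append(urljoin(playlist_url, line))
    return {
        "media_sequence": media_sequence,
        "segment_urls": urls,
        "total_duration": sum(durations),
        "ended": ended,
    }


summary = summarize_media_playlist(
    MEDIA, "https://cdn.example.com/path/to/medium/index.m3u8"
)
print(summary["segment_urls"][0], summary["total_duration"], summary["ended"])
```

A script can use the `ended` flag to decide whether to keep polling the playlist (live) or to treat the segment list as complete (VOD or a finished event).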

Common HLS Testing Scenarios

1. Peak Concurrent Viewer Test

Simulate a sudden spike in viewers, such as during a live event:

from locust import HttpUser, task, between, LoadTestShape
import m3u8

class ViewerUser(HttpUser):
    wait_time = between(1, 3)
    
    @task
    def watch_stream(self):
        # Request master playlist
        self.client.get("/path/to/master.m3u8", name="Master Playlist")
        
        # Request a media playlist
        self.client.get("/path/to/medium/index.m3u8", name="Media Playlist")
        
        # Request some segments
        for i in range(5):
            self.client.get(f"/path/to/medium/segment{i}.ts", name="Segment")

# Define a spike test shape
class SpikeTestShape(LoadTestShape):
    # Define stages with cumulative time - each duration is the point in time (in seconds) when that stage ends
    stages = [
        {"duration": 60, "users": 100, "spawn_rate": 10},   # Warm-up: 0-60s
        {"duration": 180, "users": 5000, "spawn_rate": 500},  # Spike: 60-180s
        {"duration": 360, "users": 1000, "spawn_rate": 100},  # Sustained load: 180-360s
        {"duration": 420, "users": 100, "spawn_rate": 10},   # Scale down: 360-420s
    ]
    
    current_stage = None  # Index of the stage we last logged
    
    def tick(self):
        run_time = self.get_run_time()
        
        # Find the first stage whose end time has not been reached yet
        for i, stage in enumerate(self.stages):
            if run_time < stage["duration"]:
                # Log only when we move into a new stage, not on every tick
                if i != self.current_stage:
                    self.current_stage = i
                    print(f"Entering stage {i+1}: {stage['users']} users at {stage['spawn_rate']} spawn rate")
                return stage["users"], stage["spawn_rate"]
                
        # All stages are finished: returning None stops the test
        return None

2. Geographic Distribution Test

Test how your CDN performs across different regions by running LoadForge tests from multiple data centers:

from locust import HttpUser, task, between, events
import m3u8
import os
import time
import statistics

# Store latency metrics by region
region_latencies = {}

class GeoDistributedUser(HttpUser):
    wait_time = between(1, 2)
    
    def on_start(self):
        # Determine which region this test is running from.
        # LOADTEST_REGION is a hypothetical environment variable name used for
        # illustration; set it to whatever your load generators expose.
        self.region = os.environ.get("LOADTEST_REGION", "unknown")
        
        if self.region not in region_latencies:
            region_latencies[self.region] = []
    
    @task
    def test_cdn_latency(self):
        # Request the master playlist and measure latency
        start_time = time.time()
        response = self.client.get("/path/to/master.m3u8", name=f"Master Playlist ({self.region})")
        latency = time.time() - start_time
        
        # Store the latency for this region
        region_latencies[self.region].append(latency)
        
        # Request a segment to test throughput
        if response.status_code == 200:
            master = m3u8.loads(response.text)
            if master.playlists:
                # Use the lowest bitrate for consistent comparison (variant order is not guaranteed)
                playlist = min(master.playlists, key=lambda p: p.stream_info.bandwidth)
                
                # Get the media playlist
                media_response = self.client.get(playlist.uri, name=f"Media Playlist ({self.region})")
                
                if media_response.status_code == 200:
                    media = m3u8.loads(media_response.text)
                    if media.segments:
                        # Request the first segment and measure throughput
                        segment = media.segments[0]
                        start_time = time.time()
                        segment_response = self.client.get(segment.uri, name=f"Segment ({self.region})")
                        download_time = time.time() - start_time
                        
                        # Calculate throughput in Mbps
                        if download_time > 0:
                            size_in_bits = len(segment_response.content) * 8
                            throughput_mbps = (size_in_bits / download_time) / 1_000_000
                            # Log the throughput so it shows up in the test output
                            print(f"Region: {self.region} | Throughput: {throughput_mbps:.2f} Mbps")
                            
                            # Optional: record it as a custom stats entry via Locust's request event.
                            # Note that the "response time" column then holds Mbps, not milliseconds.
                            self.environment.events.request.fire(
                                request_type="METRIC",
                                name=f"Throughput ({self.region})",
                                response_time=throughput_mbps,
                                response_length=0,
                                exception=None,
                                context={},
                            )

@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
    # Print summary of latencies by region
    print("\nRegional Latency Summary:")
    for region, latencies in region_latencies.items():
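        if latencies:
            avg_latency = statistics.mean(latencies)
            # quantiles(n=20) returns 19 cut points; the last one is the 95th percentile.
            # It needs at least two samples, so fall back to the single value otherwise.
            p95_latency = statistics.quantiles(latencies, n=20)[-1] if len(latencies) >= 2 else latencies[0]
            print(f"Region: {region} | Avg: {avg_latency:.3f}s | P95: {p95_latency:.3f}s | Samples: {len(latencies)}")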

Analyzing HLS Test Results

When analyzing LoadForge HLS test results, focus on these key metrics:

  1. Time to First Segment: How quickly can viewers start playback?
  2. Segment Download Time: Are segments downloading fast enough to prevent buffering?
  3. Error Rates by Segment Type: Are certain segments (e.g., audio, video, subtitles) failing more than others?
  4. Geographic Performance Variations: Does your CDN perform consistently across regions?
  5. Adaptation Frequency: How often do quality levels change during playback?
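
As a rough illustration of how the first two metrics relate to buffering, the sketch below replays a list of (download time, segment duration) samples through a simplified player model. The model is an assumption made for this example: segments download one at a time, playback starts once the first segment has arrived, and the buffer drains in real time while the next segment downloads. It does not reflect how LoadForge computes its reports.

```python
def playback_metrics(samples, startup_segments=1):
    """Estimate startup delay and stall time from per-segment timings.

    samples: list of (download_seconds, segment_duration_seconds) in playback order.
    """
    time_to_first_segment = sum(d for d, _ in samples[:startup_segments])
    buffer_s = 0.0
    stall_s = 0.0
    for i, (download, duration) in enumerate(samples):
        if i >= startup_segments:
            # Playback consumes the buffer while this segment downloads
            if download > buffer_s:
                stall_s += download - buffer_s  # Buffer ran dry: viewer sees a stall
                buffer_s = 0.0
            else:
                buffer_s -= download
        buffer_s += duration  # The downloaded segment is added to the buffer
    slow_segments = sum(1 for d, dur in samples if d > dur)
    return {
        "time_to_first_segment": time_to_first_segment,
        "stall_seconds": stall_s,
        "slow_segments": slow_segments,
    }


# Four 6-second segments; the third one downloads very slowly
metrics = playback_metrics([(0.5, 6.0), (2.0, 6.0), (11.0, 6.0), (1.0, 6.0)])
print(metrics)
```

A segment that takes longer to download than it takes to play is the clearest warning sign, but as the example shows, an earlier buffer can absorb part of the delay before the viewer sees a stall.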

LoadForge’s reporting dashboard provides detailed insights into these metrics, helping you identify and resolve streaming bottlenecks.

Best Practices for HLS Load Testing

  1. Start with realistic user counts and gradually scale up to identify breaking points
  2. Test from multiple geographic locations to evaluate CDN performance globally
  3. Include a mix of device profiles (mobile, desktop, smart TV) in your tests
  4. Simulate different network conditions by varying the bitrate selection logic
  5. Test both steady-state and spike scenarios to ensure your infrastructure can handle both
  6. Monitor origin server metrics during tests to identify backend bottlenecks
  7. Test with actual content dimensions and bitrates that match your production streams

Common HLS Testing Challenges

Challenge: High CPU Usage During Tests

Parsing HLS manifests and managing segment downloads can be CPU-intensive. To optimize performance:

# Use FastHttpUser instead of HttpUser for better performance
from locust.contrib.fasthttp import FastHttpUser
from locust import task, between
import m3u8

class OptimizedHLSUser(FastHttpUser):
    wait_time = between(1, 2)
    
    @task
    def stream_hls(self):
        # Implementation as before, but with FastHttpUser
        pass

Challenge: Handling Authentication

Many streaming services require authentication tokens in the HLS URLs:

from locust import HttpUser, task, between

class AuthenticatedStreamUser(HttpUser):
    wait_time = between(1, 3)
    
    def on_start(self):
        # Authenticate and get token
        auth_response = self.client.post("/api/auth", json={
            "username": "testuser",
            "password": "password"
        })
        
        if auth_response.status_code == 200:
            self.token = auth_response.json()["token"]
        else:
            self.token = None
    
    @task
    def stream_protected_content(self):
        if not self.token:
            return
            
        # Request master playlist with auth token
        master_url = f"/path/to/master.m3u8?token={self.token}"
        response = self.client.get(master_url, name="Authenticated Master Playlist")
        
        # Continue as in previous examples
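
Depending on how the service signs its URLs, the token may also need to be attached to the media playlist and segment requests, not only the master playlist. The helper below is a small standard-library sketch for that case. The `with_token` name is hypothetical, and the `token` query parameter simply follows the example above; check how your own service expects credentials to be passed.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def with_token(url, token, param="token"):
    """Return url with the auth token set as a query parameter.

    Existing query parameters are preserved; an existing token is replaced.
    """
    parts = urlsplit(url)
    query = [
        (key, value)
        for key, value in parse_qsl(parts.query, keep_blank_values=True)
        if key != param
    ]
    query.append((param, token))
    return urlunsplit(parts._replace(query=urlencode(query)))


print(with_token("/path/to/medium/index.m3u8", "abc123"))
print(with_token("https://cdn.example.com/seg0.ts?start=10&token=old", "new"))
```

Inside a task, each playlist or segment URI would be passed through the helper before calling self.client.get, with a fixed name= so that per-user tokens do not split the statistics into separate entries.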

Conclusion

Effective HLS load testing with LoadForge helps ensure your streaming infrastructure can deliver high-quality video to viewers at scale. By simulating realistic viewer behavior and analyzing key performance metrics, you can identify and resolve bottlenecks before they impact your audience.

For more information on load testing specific applications or APIs, check out our other guides or contact our support team for assistance with your streaming load testing needs.