Understanding HLS (HTTP Live Streaming)
HTTP Live Streaming (HLS) is an adaptive bitrate streaming protocol developed by Apple that enables high-quality streaming experiences across varying network conditions. HLS works by:
- Breaking video into segments: Content is divided into small (typically 2-10 second) chunks
- Creating multiple quality levels: Each segment is encoded at different quality levels/bitrates
- Using manifest files: .m3u8 playlist files that describe available segments and quality levels
- Enabling adaptive playback: Clients automatically switch between quality levels based on network conditions
Key Advantage: HLS is widely supported across devices and platforms, making it one of the most popular streaming protocols for video delivery.
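The adaptive playback step can be illustrated with a minimal sketch. It assumes the client already knows each variant's advertised bandwidth (in bits per second) and has a throughput estimate. The function name `pick_variant` and the 0.8 safety factor are illustrative choices, not part of HLS or LoadForge.

```python
def pick_variant(bandwidths, throughput_bps, safety=0.8):
    """Return the highest advertised bandwidth that fits the measured
    throughput scaled by a safety factor, or the lowest if none fits."""
    budget = throughput_bps * safety
    candidates = sorted(bandwidths)
    chosen = candidates[0]  # Fall back to the lowest quality level
    for bandwidth in candidates:
        if bandwidth <= budget:
            chosen = bandwidth
    return chosen


# Hypothetical quality ladder, in bits per second
variants = [800_000, 1_400_000, 2_800_000]
print(pick_variant(variants, 2_000_000))  # budget is 1.6 Mbps
print(pick_variant(variants, 500_000))    # nothing fits, lowest is used
```

Real players combine throughput with buffer level and other signals, so treat this as a starting point for test logic rather than a model of any specific player.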
Why Load Test HLS Streams?
Load testing HLS streams is critical for:
- Validating CDN performance under high concurrent viewer counts
- Ensuring origin servers can handle segment request spikes
- Verifying failover mechanisms work correctly during traffic surges
- Measuring latency across different geographic regions
- Identifying bottlenecks in your streaming infrastructure before they impact users
How LoadForge Tests HLS Streams
LoadForge simulates real-world HLS streaming behavior by:
- Requesting the master playlist (.m3u8 file)
- Parsing the manifest to identify available quality levels
- Requesting media playlists for specific quality levels
- Downloading segments in the correct sequence
- Simulating adaptive bitrate behavior by switching between quality levels
This approach accurately replicates how real video players interact with your streaming infrastructure.
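One detail in this sequence is easy to miss: URIs inside a playlist are often relative to the playlist that contains them. A minimal sketch of resolving them with the standard library follows; the paths are placeholders and the helper name `resolve` is hypothetical.

```python
from urllib.parse import urljoin


def resolve(base_url, uri):
    """Resolve a playlist or segment URI against the URL of the
    playlist it was found in. Absolute URIs are returned unchanged."""
    return urljoin(base_url, uri)


master_url = "/path/to/master.m3u8"
media_url = resolve(master_url, "low/index.m3u8")
segment_url = resolve(media_url, "segment0.ts")
print(media_url)    # /path/to/low/index.m3u8
print(segment_url)  # /path/to/low/segment0.ts
```

Resolving each level against its parent keeps a test script working whether the manifests use relative paths, root-relative paths, or full URLs.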
Basic HLS Test Example
Here’s a simple LoadForge script that tests an HLS stream:
from urllib.parse import urljoin

from locust import HttpUser, task, between
import m3u8
import random
import time


class HLSStreamingUser(HttpUser):
    wait_time = between(1, 3)
    master_path = "/path/to/master.m3u8"

    def on_start(self):
        # Initialize variables to store playlist information
        self.master_playlist = None
        self.media_playlist = None
        self.media_url = None
        self.segments = []
        self.current_segment_index = 0

    def load_media_playlist(self, name):
        # Select a quality level (typically we'd choose based on bandwidth)
        # For testing, we'll just pick one randomly
        playlist = random.choice(self.master_playlist.playlists)
        # Variant URIs may be relative to the master playlist, so resolve them first
        self.media_url = urljoin(self.master_path, playlist.uri)
        response = self.client.get(self.media_url, name=name)
        # Parse the media playlist to get segments
        if response.status_code == 200:
            self.media_playlist = m3u8.loads(response.text)
            self.segments = self.media_playlist.segments
            self.current_segment_index = 0

    @task
    def stream_video(self):
        # Step 1: Request the master playlist if we haven't already
        if not self.master_playlist:
            response = self.client.get(self.master_path, name="Master Playlist")
            if response.status_code != 200:
                # If we can't get the master playlist, stop the task
                return
            self.master_playlist = m3u8.loads(response.text)
            if self.master_playlist.playlists:
                self.load_media_playlist("Media Playlist")

        # Step 2: If we have segments to request, get the next one
        if self.segments and self.current_segment_index < len(self.segments):
            segment = self.segments[self.current_segment_index]
            # Segment URIs may be relative to the media playlist
            self.client.get(urljoin(self.media_url, segment.uri), name="Segment")
            self.current_segment_index += 1
            # Simulate the time it takes to play the segment
            # This makes the test more realistic
            time.sleep(segment.duration)
        elif self.master_playlist.playlists:
            # We need to get a new media playlist to get more segments
            self.load_media_playlist("Media Playlist Refresh")
Advanced HLS Testing Techniques
Simulating Realistic Viewer Behavior
To create more realistic tests, you can enhance your LoadForge script to simulate how real viewers interact with video content:
from urllib.parse import urljoin

from locust import HttpUser, task, constant
import m3u8
import random
import time


class RealisticViewerUser(HttpUser):
    wait_time = constant(1)  # Consistent polling for live streams
    master_path = "/path/to/master.m3u8"

    def on_start(self):
        self.master_playlist = None
        self.media_playlists = {}
        self.current_bitrate = None
        self.segments_played = 0
        self.watch_duration = random.randint(30, 300)  # Random watch time between 30s and 5min
        self.buffer_health = 0  # Simulated buffer in seconds
        # Start the streaming session
        self.request_master_playlist()

    def request_master_playlist(self):
        response = self.client.get(self.master_path, name="Master Playlist")
        if response.status_code == 200:
            self.master_playlist = m3u8.loads(response.text)
            # Select initial bitrate (typically lowest for fast startup)
            available_bitrates = [(p.stream_info.bandwidth, p.uri) for p in self.master_playlist.playlists]
            if not available_bitrates:
                return
            available_bitrates.sort()  # Sort by bandwidth
            # Start with a low bitrate
            self.current_bitrate, playlist_uri = available_bitrates[0]
            self.request_media_playlist(playlist_uri)

    def request_media_playlist(self, playlist_uri):
        # Variant URIs may be relative to the master playlist
        response = self.client.get(urljoin(self.master_path, playlist_uri), name="Media Playlist")
        if response.status_code == 200:
            media_playlist = m3u8.loads(response.text)
            self.media_playlists[playlist_uri] = media_playlist
            # Start downloading segments
            self.download_next_segment(playlist_uri)

    def download_next_segment(self, playlist_uri):
        media_playlist = self.media_playlists[playlist_uri]
        # Get next segment that hasn't been downloaded
        if self.segments_played < len(media_playlist.segments):
            segment = media_playlist.segments[self.segments_played]
            # Segment URIs may be relative to the media playlist
            media_url = urljoin(self.master_path, playlist_uri)
            start_time = time.time()
            response = self.client.get(urljoin(media_url, segment.uri), name="Segment")
            download_time = time.time() - start_time
            # Estimate throughput in bits per second, the unit used by the BANDWIDTH attribute
            segment_bits = len(response.content or b"") * 8
            bandwidth = segment_bits / download_time if download_time > 0 else float('inf')
            # Update buffer health
            self.buffer_health += segment.duration - download_time
            self.buffer_health = max(0, self.buffer_health)  # Can't have negative buffer
            self.segments_played += 1
            # Potentially switch bitrates based on throughput and buffer health
            bitrate_before = self.current_bitrate
            self.adapt_bitrate(bandwidth)
            if self.current_bitrate != bitrate_before:
                # The switch already requested the new playlist and its next segment
                return
            # Check if we should continue watching
            total_watched = sum(s.duration for s in media_playlist.segments[:self.segments_played])
            if total_watched >= self.watch_duration:
                # Viewer has watched their fill, session ends
                return
            # Schedule next segment download based on buffer health
            if self.buffer_health < 5:  # If buffer is low, download next segment immediately
                self.download_next_segment(playlist_uri)

    def adapt_bitrate(self, current_bandwidth):
        # Only adapt if we have a master playlist with multiple bitrates
        if not self.master_playlist or len(self.master_playlist.playlists) <= 1:
            return
        # Make sure all playlists have stream_info with bandwidth attribute
        available_bitrates = []
        for playlist in self.master_playlist.playlists:
            # Some m3u8 files might not have stream_info or bandwidth properly set
            if hasattr(playlist, 'stream_info') and hasattr(playlist.stream_info, 'bandwidth'):
                available_bitrates.append((playlist.stream_info.bandwidth, playlist.uri))
            else:
                # If bandwidth info is missing, we'll use a default value based on position
                # This is just a fallback for testing purposes
                index = self.master_playlist.playlists.index(playlist)
                estimated_bandwidth = 500000 * (index + 1)  # Rough estimate
                available_bitrates.append((estimated_bandwidth, playlist.uri))
        if not available_bitrates:
            return
        available_bitrates.sort()  # Sort by bandwidth
        # Simple ABR logic: target using 70% of available bandwidth
        target_bandwidth = current_bandwidth * 0.7
        # Find the highest bitrate that's below our target
        new_bitrate, new_uri = available_bitrates[0]  # Default to lowest
        for bitrate, uri in available_bitrates:
            if bitrate <= target_bandwidth:
                new_bitrate, new_uri = bitrate, uri
            else:
                break
        # If buffer is very low, drop to lowest bitrate
        if self.buffer_health < 2:
            new_bitrate, new_uri = available_bitrates[0]
        # If we're changing bitrates, request the new media playlist
        if new_bitrate != self.current_bitrate:
            self.current_bitrate = new_bitrate
            self.request_media_playlist(new_uri)

    @task
    def simulate_playback(self):
        # This task simulates the ongoing playback process
        # It will be called regularly based on wait_time
        if not self.master_playlist:
            return
        # Simulate 1 second of playback (wait_time is one second)
        self.buffer_health = max(0, self.buffer_health - 1)
        # If buffer is getting low, request next segment
        if self.buffer_health < 5:
            # Find current playlist URI
            current_uri = None
            for playlist in self.master_playlist.playlists:
                if playlist.stream_info.bandwidth == self.current_bitrate:
                    current_uri = playlist.uri
                    break
            if current_uri and current_uri in self.media_playlists:
                self.download_next_segment(current_uri)
Testing Live Streams vs. VOD Content
HLS testing differs slightly between live streams and video-on-demand (VOD) content:
Live Stream Testing
For live streams, your LoadForge test should:
- Continuously poll the media playlist for updates
- Focus on recent segments (typically the last 3-5 segments)
- Handle playlist updates as new segments become available
from urllib.parse import urljoin

from locust import HttpUser, task, constant
import m3u8


class LiveStreamUser(HttpUser):
    wait_time = constant(2)  # Poll every 2 seconds for playlist updates

    def on_start(self):
        self.master_uri = "/path/to/live/master.m3u8"
        self.media_uri = None
        # Media sequence number of the newest segment downloaded so far
        self.last_sequence = -1

    @task
    def watch_live_stream(self):
        # Always refresh the master playlist for live streams
        response = self.client.get(self.master_uri, name="Live Master Playlist")
        if response.status_code != 200:
            return
        master = m3u8.loads(response.text)
        # Select a quality level
        if not master.playlists:
            return
        # For simplicity, we'll use the middle quality
        playlist = master.playlists[len(master.playlists) // 2]
        # Variant URIs may be relative to the master playlist
        self.media_uri = urljoin(self.master_uri, playlist.uri)
        # Get the media playlist
        media_response = self.client.get(self.media_uri, name="Live Media Playlist")
        if media_response.status_code != 200:
            return
        media = m3u8.loads(media_response.text)
        # A live playlist is a sliding window, so track position by media
        # sequence number rather than by index within the playlist
        first_sequence = media.media_sequence or 0
        for offset, segment in enumerate(media.segments):
            sequence = first_sequence + offset
            # Download any segments we haven't seen yet
            if sequence > self.last_sequence:
                self.client.get(urljoin(self.media_uri, segment.uri), name="Live Segment")
                # Update our position
                self.last_sequence = sequence
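Live playlists are usually a sliding window: old segments drop off the top as new ones are appended, so the playlist length can stay constant between polls. One way to detect new segments is to track the media sequence number, which the #EXT-X-MEDIA-SEQUENCE tag gives for the first segment in the playlist. The sketch below shows the idea with plain lists; the function name `new_segments` and the segment file names are hypothetical.

```python
def new_segments(media_sequence, segment_uris, last_seen_sequence):
    """Return (sequence_number, uri) pairs that are newer than the last
    sequence number already downloaded."""
    fresh = []
    for offset, uri in enumerate(segment_uris):
        sequence = media_sequence + offset
        if sequence > last_seen_sequence:
            fresh.append((sequence, uri))
    return fresh


# First poll: the window covers sequence numbers 100 to 102
first = new_segments(100, ["s100.ts", "s101.ts", "s102.ts"], last_seen_sequence=-1)
# Next poll: the window has slid by one, but the length is unchanged
second = new_segments(101, ["s101.ts", "s102.ts", "s103.ts"], last_seen_sequence=102)
print(first)
print(second)
```

Comparing list indexes instead of sequence numbers would report no new segments on the second poll, because both playlists contain three entries.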
VOD Testing
For VOD content, your test can:
- Simulate a complete viewing session from start to finish
- Focus on initial buffering and mid-stream quality switches
- Test seeking behavior by requesting segments out of order
from urllib.parse import urljoin

from locust import HttpUser, task, between
import m3u8
import random
import time


class VODStreamUser(HttpUser):
    wait_time = between(1, 3)

    def on_start(self):
        # Simulate a viewer starting a VOD asset
        self.master_uri = "/path/to/vod/master.m3u8"
        self.current_position = 0  # Start at the beginning
        self.watch_percentage = random.uniform(0.1, 1.0)  # Watch 10% to 100% of the video

    @task
    def watch_vod(self):
        # Locust needs at least one task; each run is one viewing session
        self.request_master_playlist()

    def request_master_playlist(self):
        response = self.client.get(self.master_uri, name="VOD Master Playlist")
        if response.status_code == 200:
            master = m3u8.loads(response.text)
            # Select a quality level (randomly for this example)
            if master.playlists:
                playlist = random.choice(master.playlists)
                # Variant URIs may be relative to the master playlist
                self.request_media_playlist(urljoin(self.master_uri, playlist.uri))

    def request_media_playlist(self, media_uri):
        response = self.client.get(media_uri, name="VOD Media Playlist")
        if response.status_code == 200:
            media = m3u8.loads(response.text)
            # Determine how much of the video to watch
            total_segments = len(media.segments)
            segments_to_watch = int(total_segments * self.watch_percentage)
            # Start watching segments sequentially
            self.watch_segments(media, media_uri, segments_to_watch)

    def watch_segments(self, media, media_uri, segments_to_watch):
        # Watch segments sequentially
        for i in range(min(segments_to_watch, len(media.segments))):
            segment = media.segments[i]
            # Segment URIs may be relative to the media playlist
            self.client.get(urljoin(media_uri, segment.uri), name="VOD Segment")
            # Simulate the playback time
            time.sleep(segment.duration * 0.5)  # Faster than real-time for testing purposes
        # Simulate a seek operation (jump ahead) in 30% of cases
        if random.random() < 0.3 and segments_to_watch < len(media.segments) - 5:
            # Jump ahead by 5-10 segments
            jump_distance = random.randint(5, 10)
            new_position = min(segments_to_watch + jump_distance, len(media.segments) - 1)
            # Request the segment at the new position
            segment = media.segments[new_position]
            self.client.get(urljoin(media_uri, segment.uri), name="VOD Seek Segment")
            # Continue watching from new position
            for i in range(new_position + 1, len(media.segments)):
                segment = media.segments[i]
                self.client.get(urljoin(media_uri, segment.uri), name="VOD Segment After Seek")
                time.sleep(segment.duration * 0.5)
Testing Different CDN Providers
To evaluate how different CDNs handle your HLS streams, you can modify your LoadForge test to target multiple CDN endpoints:
from urllib.parse import urljoin

from locust import HttpUser, task, between
import m3u8
import random


class MultiCDNUser(HttpUser):
    wait_time = between(1, 2)

    def on_start(self):
        # Define multiple CDN base URLs to test
        self.cdn_urls = [
            "https://cdn1.example.com",
            "https://cdn2.example.com",
            "https://cdn3.example.com"
        ]
        # Select a CDN for this user session
        self.selected_cdn = random.choice(self.cdn_urls)
        # Set the stream path (same for all CDNs)
        self.stream_path = "/path/to/stream/master.m3u8"

    @task
    def stream_from_cdn(self):
        # Include the CDN in each request name so results can be compared per CDN
        cdn = self.selected_cdn
        # Request the master playlist from the selected CDN
        master_url = f"{cdn}{self.stream_path}"
        response = self.client.get(master_url, name=f"Master Playlist ({cdn})")
        if response.status_code != 200:
            return
        master = m3u8.loads(response.text)
        # Select a quality level
        if not master.playlists:
            return
        playlist = random.choice(master.playlists)
        # playlist.uri might be relative or absolute; urljoin handles both
        # and resolves relative URIs against the master playlist URL
        media_url = urljoin(master_url, playlist.uri)
        # Request the media playlist
        media_response = self.client.get(media_url, name=f"Media Playlist ({cdn})")
        if media_response.status_code != 200:
            return
        media = m3u8.loads(media_response.text)
        # Request a few segments
        for segment in media.segments[:3]:
            # Segment URIs are resolved against the media playlist URL
            segment_url = urljoin(media_url, segment.uri)
            # Download the segment
            self.client.get(segment_url, name=f"Segment ({cdn})")
Understanding HLS Manifest Structure
To effectively test HLS streams, it’s important to understand the structure of HLS manifest files:
Master Playlist (Variant Playlist)
The master playlist contains references to multiple media playlists with different quality levels:
#EXTM3U
#EXT-X-VERSION:3
# Variant streams (different quality levels)
#EXT-X-STREAM-INF:BANDWIDTH=800000,RESOLUTION=640x360
low/index.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=1400000,RESOLUTION=842x480
medium/index.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=2800000,RESOLUTION=1280x720
high/index.m3u8
Each media playlist contains references to the actual video segments:
#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:10
#EXT-X-MEDIA-SEQUENCE:0
#EXTINF:9.009,
segment0.ts
#EXTINF:9.009,
segment1.ts
#EXTINF:9.009,
segment2.ts
#EXTINF:9.009,
segment3.ts
#EXT-X-ENDLIST
For live streams, the media playlist is continuously updated, and the #EXT-X-ENDLIST tag is omitted until the stream ends.
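To make the media playlist structure concrete, here is a minimal hand-rolled parser sketch that reads only the #EXTINF and #EXT-X-ENDLIST tags. It is an illustration of the format, not a replacement for a full parser such as the m3u8 library, and the function name `summarize_media_playlist` is hypothetical. It treats a missing #EXT-X-ENDLIST as "still being updated".

```python
def summarize_media_playlist(text):
    """Collect segment URIs, total duration, and whether the playlist
    is still open (no #EXT-X-ENDLIST tag)."""
    durations = []
    uris = []
    is_live = True
    expect_uri = False
    for raw in text.splitlines():
        line = raw.strip()
        if line.startswith("#EXTINF:"):
            # Format: #EXTINF:<duration>,[<title>]
            durations.append(float(line[len("#EXTINF:"):].split(",")[0]))
            expect_uri = True
        elif line == "#EXT-X-ENDLIST":
            is_live = False
        elif line and not line.startswith("#") and expect_uri:
            # The first non-tag line after #EXTINF is the segment URI
            uris.append(line)
            expect_uri = False
    return {"segments": uris, "total_duration": sum(durations), "is_live": is_live}


sample = """#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:10
#EXT-X-MEDIA-SEQUENCE:0
#EXTINF:9.009,
segment0.ts
#EXTINF:9.009,
segment1.ts
#EXT-X-ENDLIST
"""
print(summarize_media_playlist(sample))
```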
Common HLS Testing Scenarios
1. Peak Concurrent Viewer Test
Simulate a sudden spike in viewers, such as during a live event:
from locust import HttpUser, task, between, LoadTestShape


class ViewerUser(HttpUser):
    wait_time = between(1, 3)

    @task
    def watch_stream(self):
        # Request master playlist
        self.client.get("/path/to/master.m3u8", name="Master Playlist")
        # Request a media playlist
        self.client.get("/path/to/medium/index.m3u8", name="Media Playlist")
        # Request some segments
        for i in range(5):
            self.client.get(f"/path/to/medium/segment{i}.ts", name="Segment")


# Define a spike test shape
class SpikeTestShape(LoadTestShape):
    # Define stages with cumulative time - each duration is the point in time (in seconds) when that stage ends
    stages = [
        {"duration": 60, "users": 100, "spawn_rate": 10},     # Warm-up: 0-60s
        {"duration": 180, "users": 5000, "spawn_rate": 500},  # Spike: 60-180s
        {"duration": 360, "users": 1000, "spawn_rate": 100},  # Sustained load: 180-360s
        {"duration": 420, "users": 100, "spawn_rate": 10},    # Scale down: 360-420s
    ]
    current_stage = None  # Index of the stage reported most recently

    def tick(self):
        run_time = self.get_run_time()
        # Find the appropriate stage based on the current run time
        for i, stage in enumerate(self.stages):
            if run_time < stage["duration"]:
                # Log only when a new stage begins, not on every tick
                if i != self.current_stage:
                    self.current_stage = i
                    print(f"Entering stage {i+1}: {stage['users']} users at {stage['spawn_rate']} spawn rate")
                return stage["users"], stage["spawn_rate"]
        # Test is complete, stop the test
        return None
2. Geographic Distribution Test
Test how your CDN performs across different regions by running LoadForge tests from multiple data centers:
from urllib.parse import urljoin

from locust import HttpUser, task, between, events
import m3u8
import time
import statistics

# Store latency metrics by region
region_latencies = {}

MASTER_PATH = "/path/to/master.m3u8"


class GeoDistributedUser(HttpUser):
    wait_time = between(1, 2)

    def on_start(self):
        # Determine which region this test is running from
        # In LoadForge, you can use environment variables or tags
        # This reads a "tag" option if one is defined and falls back to "unknown"
        self.region = getattr(self.environment.parsed_options, "tag", None) or "unknown"
        region_latencies.setdefault(self.region, [])

    @task
    def test_cdn_latency(self):
        # Request the master playlist and measure latency
        start_time = time.time()
        response = self.client.get(MASTER_PATH, name=f"Master Playlist ({self.region})")
        latency = time.time() - start_time
        # Store the latency for this region
        region_latencies[self.region].append(latency)
        # Request a segment to test throughput
        if response.status_code != 200:
            return
        master = m3u8.loads(response.text)
        if not master.playlists:
            return
        # Use lowest quality for consistent comparison
        playlist = min(master.playlists, key=lambda p: p.stream_info.bandwidth or 0)
        # Get the media playlist (variant URIs may be relative to the master playlist)
        media_url = urljoin(MASTER_PATH, playlist.uri)
        media_response = self.client.get(media_url, name=f"Media Playlist ({self.region})")
        if media_response.status_code != 200:
            return
        media = m3u8.loads(media_response.text)
        if not media.segments:
            return
        # Request the first segment and measure throughput
        segment = media.segments[0]
        start_time = time.time()
        segment_response = self.client.get(urljoin(media_url, segment.uri), name=f"Segment ({self.region})")
        download_time = time.time() - start_time
        # Calculate throughput in Mbps
        if download_time > 0:
            size_in_bits = len(segment_response.content or b"") * 8
            throughput_mbps = (size_in_bits / download_time) / 1_000_000
            print(f"Region: {self.region} | Throughput: {throughput_mbps:.2f} Mbps")
            # Record the throughput as a custom stats entry using the Locust 2.x request event
            # The value appears in the response time column, so read it as Mbps, not milliseconds
            self.environment.events.request.fire(
                request_type="METRIC",
                name=f"Throughput ({self.region})",
                response_time=throughput_mbps,
                response_length=0,
                exception=None,
                context={},
            )


@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
    # Print summary of latencies by region
    print("\nRegional Latency Summary:")
    for region, latencies in region_latencies.items():
        # statistics.quantiles needs at least two samples
        if len(latencies) >= 2:
            avg_latency = statistics.mean(latencies)
            # n=20 yields 19 cut points; the last one (index 18) is the 95th percentile
            p95_latency = statistics.quantiles(latencies, n=20)[18]
            print(f"Region: {region} | Avg: {avg_latency:.3f}s | P95: {p95_latency:.3f}s | Samples: {len(latencies)}")
Analyzing HLS Test Results
When analyzing LoadForge HLS test results, focus on these key metrics:
- Time to First Segment: How quickly can viewers start playback?
- Segment Download Time: Are segments downloading fast enough to prevent buffering?
- Error Rates by Segment Type: Are certain segments (e.g., audio, video, subtitles) failing more than others?
- Geographic Performance Variations: Does your CDN perform consistently across regions?
- Adaptation Frequency: How often do quality levels change during playback?
LoadForge’s reporting dashboard provides detailed insights into these metrics, helping you identify and resolve streaming bottlenecks.
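Two of these metrics can also be derived from raw timings if you record them yourself. The sketch below is a simplified model, not LoadForge's own calculation: it assumes segments download one after another with no gaps, that playback starts once the first segment has arrived, and that the function name `playback_metrics` and the input shape are illustrative.

```python
def playback_metrics(playlist_time, segments):
    """Estimate time to first segment and total stall time.

    playlist_time: seconds spent fetching playlists before the first segment.
    segments: list of (duration_s, download_time_s) in playback order.
    """
    if not segments:
        return {"time_to_first_segment": None, "stall_time": 0.0}
    startup = playlist_time + segments[0][1]
    buffer_s = segments[0][0]  # The first segment is buffered when playback starts
    stall = 0.0
    for duration, download in segments[1:]:
        # Playback drains the buffer while the next segment downloads
        if download > buffer_s:
            stall += download - buffer_s
            buffer_s = 0.0
        else:
            buffer_s -= download
        buffer_s += duration
    return {"time_to_first_segment": startup, "stall_time": stall}


# Hypothetical timings: the second segment takes longer to download than it plays
timings = [(10.0, 1.0), (10.0, 12.0), (10.0, 2.0)]
print(playback_metrics(0.25, timings))
```

A non-zero stall time in this model means a viewer would have seen buffering, which is usually a more useful signal than average segment download time alone.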
Best Practices for HLS Load Testing
- Start with realistic user counts and gradually scale up to identify breaking points
- Test from multiple geographic locations to evaluate CDN performance globally
- Include a mix of device profiles (mobile, desktop, smart TV) in your tests
- Simulate different network conditions by varying the bitrate selection logic
- Test both steady-state and spike scenarios to ensure your infrastructure can handle both
- Monitor origin server metrics during tests to identify backend bottlenecks
- Test with actual content dimensions and bitrates that match your production streams
Common HLS Testing Challenges
Challenge: High CPU Usage During Tests
Parsing HLS manifests and managing segment downloads can be CPU-intensive. To optimize performance:
# Use FastHttpUser instead of HttpUser for better performance
from locust.contrib.fasthttp import FastHttpUser
from locust import task, between
import m3u8


class OptimizedHLSUser(FastHttpUser):
    wait_time = between(1, 2)

    @task
    def stream_hls(self):
        # Implementation as before, but with FastHttpUser
        pass
Challenge: Handling Authentication
Many streaming services require authentication tokens in the HLS URLs:
from locust import HttpUser, task, between


class AuthenticatedStreamUser(HttpUser):
    wait_time = between(1, 3)

    def on_start(self):
        # Authenticate and get token
        auth_response = self.client.post("/api/auth", json={
            "username": "testuser",
            "password": "password"
        })
        if auth_response.status_code == 200:
            self.token = auth_response.json()["token"]
        else:
            self.token = None

    @task
    def stream_protected_content(self):
        if not self.token:
            return
        # Request master playlist with auth token
        master_url = f"/path/to/master.m3u8?token={self.token}"
        response = self.client.get(master_url, name="Authenticated Master Playlist")
        # Continue as in previous examples
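Whether media playlists and segments also need the token depends on the service: some embed signed URLs in the manifest, while others expect the client to carry the token forward. For the second case, a small helper can append the token to each URL. This is a sketch under that assumption; the helper name `with_token` is hypothetical and the `token` query parameter follows the example above.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def with_token(url, token, param="token"):
    """Return url with the token set as a query parameter, replacing any
    existing value and keeping other query parameters intact."""
    parts = urlsplit(url)
    query = [(k, v) for k, v in parse_qsl(parts.query, keep_blank_values=True) if k != param]
    query.append((param, token))
    return urlunsplit(parts._replace(query=urlencode(query)))


print(with_token("/path/to/low/index.m3u8", "abc"))
print(with_token("https://cdn.example.com/seg0.ts?start=0", "abc"))
```

In a Locust task this would wrap each resolved playlist or segment URL before it is passed to `self.client.get`.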
Conclusion
Effective HLS load testing with LoadForge helps ensure your streaming infrastructure can deliver high-quality video to viewers at scale. By simulating realistic viewer behavior and analyzing key performance metrics, you can identify and resolve bottlenecks before they impact your audience.
For more information on load testing specific applications or APIs, check out our other guides or contact our support team for assistance with your streaming load testing needs.