Testing websockets with SocketIO
LoadForge supports load testing websockets through SocketIO. If you prefer to use standard libraries instead, see our default websockets test guide.
The SocketIO implementation makes these tests much easier to write, as the example below shows.
Example Code
import time
import json
from locust import task
from locust_plugins.users import SocketIOUser


class MySocketIOUser(SocketIOUser):
    @task
    def my_task(self):
        self.my_value = None

        self.connect("wss://YOUR_WS_URL_HERE/socket.io/?EIO=3&transport=websocket")

        # example of subscribe
        self.send('42["subscribe",{"url":"/game/237382","sendInitialUpdate": true}]')

        # wait until I get a push message to on_message
        while not self.my_value:
            time.sleep(0.1)

        # you can do http in the same taskset as well
        self.client.get("/")

        # wait for additional pushes, while occasionally sending heartbeats, like a real client would
        self.sleep_with_heartbeat(10)

    def on_message(self, message):
        self.my_value = json.loads(message)["my_value"]


if __name__ == "__main__":
    host = "http://example.com"
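The string passed to self.send above is a raw Socket.IO text frame: with EIO=3, the leading 4 is the Engine.IO "message" packet type, the 2 is the Socket.IO "event" packet type, and the rest is a JSON array of the event name and its payload. The sketch below shows one way such frames could be built and parsed. The helper names encode_event and decode_event are hypothetical and not part of locust_plugins, and the sketch assumes the default namespace and no acknowledgement IDs.

```python
import json


def encode_event(event, payload):
    """Build a '42' text frame: Engine.IO message (4) + Socket.IO event (2)."""
    # Hypothetical helper, not a locust_plugins API.
    return "42" + json.dumps([event, payload], separators=(",", ":"))


def decode_event(frame):
    """Return (event, payload) for a '42' event frame, or None for other frames.

    Assumes the default namespace and no ack ID after the '42' prefix.
    """
    if not frame.startswith("42"):
        return None  # e.g. "0{...}" (open), "3" (pong), "40" (connect)
    body = json.loads(frame[2:])
    event = body[0]
    payload = body[1] if len(body) > 1 else None
    return event, payload


frame = encode_event("subscribe", {"url": "/game/237382", "sendInitialUpdate": True})
print(frame)
print(decode_event(frame))
```

A helper like decode_event could be called from on_message to skip control frames before reading fields from the payload, if your server pushes its updates as Socket.IO events rather than as plain JSON.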
Locust Test Example
LoadForge is powered by Locust, so open source Locust users can use this script as well. If you are a Locust user, consider importing your script to LoadForge to supercharge your testing!