Example tests and snippets

Testing websockets with SocketIO

LoadForge supports SocketIO load testing for websockets. We have a guide using standard libraries available on our default websockets test guide.

The SocketIO implementation makes it much easier to test, as you can see below.

Example Code

import time
import json
from locust import task
from locust_plugins.users import SocketIOUser


class MySocketIOUser(SocketIOUser):
    @task
    def my_task(self):
        self.my_value = None


        self.connect("wss://YOUR_WS_URL_HERE/socket.io/?EIO=3&transport=websocket")


        # example of subscribe
        self.send('42["subscribe",{"url":"/game/237382","sendInitialUpdate": true}]')


        # wait until I get a push message to on_message
        while not self.my_value:
            time.sleep(0.1)


        # you can do http in the same taskset as well
        self.client.get("/")


        # wait for additional pushes, while occasionally sending heartbeats, like a real client would
        self.sleep_with_heartbeat(10)


    def on_message(self, message):
        self.my_value = json.loads(message)["my_value"]


    if __name__ == "__main__":
        host = "http://example.com"

Locust Test Example

LoadForge is powered by locust, meaning open source locust users can copy this script as well. If you are a locust user, consider importing your script to LoadForge to supercharge your testing!

Previous
Websockets (Standard)