# locust.py
from locust import task, between
from locust_plugins.users import GrpcUser

import streaming_pb2


def _numbered_requests(count):
    """Lazily yield ``count`` StreamRequest messages with values 0..count-1.

    A generator is used so messages are built only as the consumer pulls
    them, matching a request *stream* rather than a pre-built list.
    """
    for number in range(count):
        yield streaming_pb2.StreamRequest(value=number)


class StreamingUser(GrpcUser):
    """Locust user that exercises the three ``StreamService`` streaming calls.

    Each task hands a request (or an iterator of requests) plus a full
    method path to ``self.call`` and then touches ``.value`` on whatever
    comes back.

    NOTE(review): ``self.call`` is not defined in this file; it is presumably
    inherited from ``GrpcUser`` -- confirm its signature and that it accepts
    an iterator for client-side streaming.
    """

    # Target address ("host:port") used for this user.
    host = "grpc.server.com:50051"
    # Pause between consecutive tasks, drawn from the range 1 to 2.
    wait_time = between(1, 2)

    @task
    def client_stream(self):
        """Stream ten requests (values 0-9) to ``ClientStream`` and read the reply."""
        outgoing = _numbered_requests(10)
        reply = self.call(outgoing, "/streaming.StreamService/ClientStream")
        # Access the field so the reply is actually used; the value is discarded.
        _ = reply.value

    @task
    def server_stream(self):
        """Send one ``Empty`` request to ``ServerStream`` and drain the replies."""
        trigger = streaming_pb2.Empty()
        incoming = self.call(trigger, "/streaming.StreamService/ServerStream")
        # Iterate to exhaustion, touching each message's field and discarding it.
        for item in incoming:
            _ = item.value

    @task
    def bidi_stream(self):
        """Stream five requests (values 0-4) to ``BidiStream`` and drain the replies."""
        outgoing = _numbered_requests(5)
        incoming = self.call(outgoing, "/streaming.StreamService/BidiStream")
        # Iterate to exhaustion, touching each message's field and discarding it.
        for item in incoming:
            _ = item.value