Load test Websocket with Locust

Load test Websocket with Locust

locust is good, isn't it? locust. All recent load tests have been done with locust. It's really easy to use python even for complicated scenarios.

However, recently, the number of always-on apps is increasing, and it seems that there are many situations where a simple req / res model cannot be applied well.

So, I tried to load using Websocket in the task of Locust so that I can check it from the Web screen.

websocketlocust.png

By default, the unit is ms, but if it is ms in socket communication, 0 is normally repeated, so the unit is μs. As you can see from the code, I just made a socket in Task, communicated, and notified locust of the result, so I have not touched locust itself at all.

Since locust has a simple design and the part for building a distributed environment and the part for requesting and aggregating the results are separated, it is good to be able to do this easily.

locustfile.py


# -*- coding:utf-8 -*-
from __future__ import absolute_import
from __future__ import unicode_literals
from __future__ import print_function

import json
import uuid
import time
import gevent

from websocket import create_connection
import six

from locust import HttpLocust, TaskSet, task
from locust.events import request_success


class ChatTaskSet(TaskSet):
    def on_start(self):
        self.user_id = six.text_type(uuid.uuid4())
        ws = create_connection('ws://127.0.0.1:5000/chat')
        self.ws = ws

        def _receive():
            while True:
                res = ws.recv()
                data = json.loads(res)
                end_at = time.time()
                response_time = int((end_at - data['start_at']) * 1000000)
                request_success.fire(
                    request_type='WebSocket Recv',
                    name='test/ws/chat',
                    response_time=response_time,
                    response_length=len(res),
                )

        gevent.spawn(_receive)

    @task
    def sent(self):
        start_at = time.time()
        body = json.dumps({'message': 'hello, world', 'user_id': self.user_id, 'start_at': start_at})
        self.ws.send(body)
        request_success.fire(
            request_type='WebSocket Sent',
            name='test/ws/chat',
            response_time=int((time.time() - start_at) * 1000000),
            response_length=len(body),
        )


        
class ChatLocust(HttpLocust):
    task_set = ChatTaskSet
    min_wait = 0
    max_wait = 100

The point is to notify the result with events.request_success.fire and to launch a gevent thread for receiving. If you normally define receiving in @task, the task will stop during that time, and other tasks in the same TaskSet will also stop.

By the way, the sample Echo & PubSub server with the load is the following script.

server.py


# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import unicode_literals
from __future__ import print_function

from collections import defaultdict
import json

from geventwebsocket.handler import WebSocketHandler
from gevent.pywsgi import WSGIServer
from flask import Flask, request
from werkzeug.exceptions import abort


app = Flask(__name__)

ctr = defaultdict(int)


@app.route('/echo')
def echo():
    ws = request.environ['wsgi.websocket']
    if not ws:
        abort(400)

    while True:
        message = ws.receive()
        if message is not None:
            r = json.loads(message)
            ctr[r['user_id']] += 1

        ws.send(message)


@app.route('/report')
def report():
    return '\n'.join(['{}:\t{}'.format(user_id, count) for user_id, count in ctr.items()])


socket_handlers = set()


@app.route('/chat')
def chat():
    ws = request.environ['wsgi.websocket']
    socket_handlers.add(ws)

    while True:
        message = ws.receive()
        for socket_handler in socket_handlers:
            try:
                socket_handler.send(message)
            except:
                socket_handlers.remove(socket_handler)


if __name__ == '__main__':
    http_server = WSGIServer(('', 5000), app, handler_class=WebSocketHandler)
    http_server.serve_forever()

The file is also uploaded in the following repository. https://gist.github.com/yamionp/9112dd6e54694d594306

Recommended Posts

Load test Websocket with Locust
Build load test tool Locust 1.1 on Docker
Stress Test with Locust written in Python
Prepare a distributed load test environment with the Python load test tool Locust
Primality test with Python
Strengthen with code test ⑦
Strengthen with code test ⑨
Strengthen with code test ③
Strengthen with code test ⑤
Primality test with python
Strengthen with code test ②
Various load test tools
WebSocket with Python + uWSGI
Strengthen with code test ①
LOAD DATA with PyMysql
Strengthen with code test ⑧
Strengthen with code test ⑨
Load nested json with pandas
Test embedded software with Google Test
Unit test flask with pytest
Test standard output with Pytest
Test Driven Development with Django Part 3
Save & load data with joblib, pickle
Load Django modules with an interpreter
Test Driven Development with Django Part 6
Load multiple JavaScript files with PyWebView
Test Driven Development with Django Part 2
Load gif images with Python + OpenCV
Test Driven Development with Django Part 1
Test Driven Development with Django Part 5
Controlling test reruns with Luigi + pytest