Locust is great, isn't it? Lately I have been doing all of my load tests with Locust. Because scenarios are plain Python, it stays easy to use even when they get complicated.
Recently, however, more and more apps keep a connection open all the time, and there are many situations where a simple request/response model does not fit well.
So I tried generating load over WebSocket from inside a Locust task, in a way that still lets me check the results on Locust's web screen.
By default Locust reports response times in ms, but with socket communication a value in ms would mostly just be 0, so I report the times in μs instead. As you can see from the code, I only open a socket in the TaskSet, communicate over it, and notify Locust of the result, so I have not touched Locust itself at all.
Locust has a simple design in which the part that builds the distributed environment is separate from the part that issues requests and aggregates the results, and that separation is what makes this so easy to do.
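To make the choice of unit concrete, here is a minimal sketch of the conversion. The helper name `elapsed_in_units` and the sample timings are assumptions made for illustration and are not part of the locustfile; the round-trip time is set to 2**-12 seconds (about 244 μs) only so that the floating-point arithmetic is exact.

```python
def elapsed_in_units(start_at, end_at, units_per_second):
    """Return the time between two time.time() values as a whole number of units."""
    return int((end_at - start_at) * units_per_second)


# Hypothetical round trip of 2**-12 seconds (about 244 microseconds).
start_at = 1.0
end_at = start_at + 2 ** -12

elapsed_ms = elapsed_in_units(start_at, end_at, 1000)     # truncated to 0
elapsed_us = elapsed_in_units(start_at, end_at, 1000000)  # keeps the detail
print(elapsed_ms, elapsed_us)
```

With sub-millisecond round trips, the ms value collapses to 0 while the μs value still distinguishes fast responses from slow ones.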
locustfile.py
# -*- coding:utf-8 -*-
from __future__ import absolute_import
from __future__ import unicode_literals
from __future__ import print_function

import json
import uuid
import time

import gevent
from websocket import create_connection
import six

from locust import HttpLocust, TaskSet, task
from locust.events import request_success


class ChatTaskSet(TaskSet):
    def on_start(self):
        self.user_id = six.text_type(uuid.uuid4())
        ws = create_connection('ws://127.0.0.1:5000/chat')
        self.ws = ws

        def _receive():
            # Runs in its own greenlet so that receiving never blocks the tasks.
            while True:
                res = ws.recv()
                data = json.loads(res)
                end_at = time.time()
                # Round-trip time in microseconds, based on the sender's timestamp.
                response_time = int((end_at - data['start_at']) * 1000000)
                request_success.fire(
                    request_type='WebSocket Recv',
                    name='test/ws/chat',
                    response_time=response_time,
                    response_length=len(res),
                )

        gevent.spawn(_receive)

    @task
    def sent(self):
        start_at = time.time()
        # start_at travels inside the message so the receiver can compute latency.
        body = json.dumps({'message': 'hello, world', 'user_id': self.user_id, 'start_at': start_at})
        self.ws.send(body)
        request_success.fire(
            request_type='WebSocket Sent',
            name='test/ws/chat',
            response_time=int((time.time() - start_at) * 1000000),
            response_length=len(body),
        )


class ChatLocust(HttpLocust):
    task_set = ChatTaskSet
    min_wait = 0
    max_wait = 100
The key points are reporting each result with events.request_success.fire and spawning a gevent greenlet for receiving. If you did the receiving inside a @task in the usual way, that task would block while it waits, and the other tasks in the same TaskSet would be held up as well.
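The background-receiver idea can be tried in isolation with a rough sketch like the one below. To keep it runnable without extra dependencies it swaps in the standard library's `threading` and `queue` for gevent, and `LoopbackSocket` and `run_demo` are hypothetical names invented for this illustration, so treat it as an analogy to the locustfile rather than the same code.

```python
import json
import queue
import threading
import time


class LoopbackSocket(object):
    """Hypothetical stand-in for a WebSocket that echoes each message back."""

    def __init__(self):
        self._inbox = queue.Queue()

    def send(self, body):
        self._inbox.put(body)

    def recv(self):
        # Blocks until a message arrives, much like a blocking socket read.
        return self._inbox.get()


def run_demo(message_count=3):
    ws = LoopbackSocket()
    results = []  # (request_type, response_time_in_us) pairs
    all_received = threading.Event()

    def _receive():
        # Background receiver: blocking here does not hold up the sender.
        for _ in range(message_count):
            data = json.loads(ws.recv())
            elapsed_us = int((time.time() - data['start_at']) * 1000000)
            results.append(('recv', elapsed_us))
        all_received.set()

    receiver = threading.Thread(target=_receive)
    receiver.daemon = True
    receiver.start()

    for _ in range(message_count):
        start_at = time.time()
        # The send timestamp travels inside the payload, so the receiver can
        # compute the round-trip time without sharing state with the sender.
        ws.send(json.dumps({'message': 'hello, world', 'start_at': start_at}))
        results.append(('sent', int((time.time() - start_at) * 1000000)))

    all_received.wait(timeout=5)
    return results


results = run_demo()
print(results)
```

The sender loop never calls `recv()`, so it keeps sending at its own pace while the receiver records one result per echoed message.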
For reference, the sample Echo & PubSub server that receives the load is the following script.
server.py
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import unicode_literals
from __future__ import print_function

from collections import defaultdict
import json

from geventwebsocket.handler import WebSocketHandler
from gevent.pywsgi import WSGIServer
from flask import Flask, request
from werkzeug.exceptions import abort

app = Flask(__name__)
ctr = defaultdict(int)  # number of echoed messages per user_id


@app.route('/echo')
def echo():
    # .get() so that a plain HTTP request gets a 400 instead of a KeyError.
    ws = request.environ.get('wsgi.websocket')
    if not ws:
        abort(400)
    while True:
        message = ws.receive()
        if message is not None:
            r = json.loads(message)
            ctr[r['user_id']] += 1
            ws.send(message)


@app.route('/report')
def report():
    return '\n'.join(['{}:\t{}'.format(user_id, count) for user_id, count in ctr.items()])


socket_handlers = set()


@app.route('/chat')
def chat():
    ws = request.environ['wsgi.websocket']
    socket_handlers.add(ws)
    while True:
        message = ws.receive()
        # Iterate over a copy so that dead sockets can be removed safely.
        for socket_handler in list(socket_handlers):
            try:
                socket_handler.send(message)
            except Exception:
                socket_handlers.discard(socket_handler)


if __name__ == '__main__':
    http_server = WSGIServer(('', 5000), app, handler_class=WebSocketHandler)
    http_server.serve_forever()
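One detail of a broadcast loop like the one in chat() is worth trying in isolation: removing an element from a set while iterating over that same set can raise RuntimeError in Python, so dead connections are better dropped while looping over a snapshot. The sketch below uses a hypothetical `FakeSocket` class and `broadcast` helper in place of real gevent-websocket connections.

```python
class FakeSocket(object):
    """Hypothetical stand-in for a WebSocket connection."""

    def __init__(self, name, broken=False):
        self.name = name
        self.broken = broken
        self.received = []

    def send(self, message):
        if self.broken:
            raise IOError('connection closed')
        self.received.append(message)


def broadcast(sockets, message):
    """Send message to every socket and drop the ones that fail."""
    # Loop over a snapshot so that discarding from the set is safe.
    for sock in list(sockets):
        try:
            sock.send(message)
        except Exception:
            sockets.discard(sock)


alive = FakeSocket('alive')
dead = FakeSocket('dead', broken=True)
sockets = {alive, dead}
broadcast(sockets, 'hello')
print(sorted(s.name for s in sockets))
```

After the call, only the working connection remains in the set and it has received the message once.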
The files are also available in the following gist: https://gist.github.com/yamionp/9112dd6e54694d594306