Do you remember **Apsetone Deb**? I doubt my job would even exist without it. [How to remember the OSI reference model](http://dic.nicovideo.jp/a/%E3%82%A2%E3%83%97%E3%82%BB%E3%83%88%E3%83%8D%E3%83%87%E3%83%96).
This is the story of implementing a socket server that communicates at the transport layer of the OSI reference model, making full use of parallel processing. The article is a Python rewrite of what I learned this spring, when a great engineer taught me about C# Tasks and async/await. I wrote it with a dust-covered copy of "Mastering TCP/IP" from the corner of my room in one hand.
- You will be able to write parallel, non-blocking processing yourself.
- You will be able to implement processing with lightweight threads using both gevent and async/await.
- You will understand the C10k problem a little better.
I read the article "Multithread / Process Summary (Ruby Edition)" and wrote this code as a way of reviewing it. That article is well organized, so read it if you do not understand the terms that appear here. (I wrote this article while reading it myself.)
The typical protocols that provide the transport layer function in TCP/IP are TCP and UDP. When communicating over TCP or UDP, the OS API called the socket is widely used. This time we will implement a socket server and client, and in the process write both a gevent version and an async/await version using lightweight threads.
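To make the socket API a little more concrete, here is a minimal sketch that sends one message over TCP and one over UDP on the loopback interface, using only the standard `socket` module. The helper names `tcp_roundtrip` and `udp_roundtrip` are my own for illustration and do not appear in the code later in this article.

```python
import socket


def tcp_roundtrip(payload: bytes) -> bytes:
    """Send payload over a loopback TCP connection and return what the server side reads."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
        listener.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
        listener.listen(1)
        listener.settimeout(5)
        with socket.create_connection(listener.getsockname(), timeout=5) as client:
            conn, _addr = listener.accept()
            with conn:
                conn.settimeout(5)
                client.sendall(payload)
                received = b""
                # TCP is a byte stream, so keep reading until everything has arrived
                while len(received) < len(payload):
                    chunk = conn.recv(1024)
                    if not chunk:
                        break
                    received += chunk
                return received


def udp_roundtrip(payload: bytes) -> bytes:
    """Send payload as a single UDP datagram on loopback and return what is received."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as receiver, \
            socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sender:
        receiver.bind(("127.0.0.1", 0))
        receiver.settimeout(5)
        sender.sendto(payload, receiver.getsockname())
        data, _addr = receiver.recvfrom(1024)
        return data
```

The only difference at socket creation is `SOCK_STREAM` (TCP, connection-oriented) versus `SOCK_DGRAM` (UDP, connectionless). The rest of this article uses TCP only.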
The specification extends a simple echo server: it adds disconnection detection, and it echoes back the previous message together with the current one.
■ Finished product in operation (GIF)
After opening a connection, each client exchanges messages with the socket server once per second, 10 times in total, and then disconnects. 100 such clients are started at the same time, and this is repeated for 100 loops.
Each client sends `Hello World!:{n}` to the server, starting from n = 0.
Client execution image
Server connection
Send:Hello World!:0
Receive:Hello World!:0
Send:Hello World!:1
Receive:Hello World!:0 Hello World!:1
....
Send:Hello World!:8
Receive:Hello World!:7 Hello World!:8
Send:Hello World!:9
Receive:Hello World!:8 Hello World!:9
Disconnect from server
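The echo rule visible in the execution image above (reply with the previous message followed by the current one) can be sketched as a small function, independent of any socket code. The name `echo_with_previous` is hypothetical and used only for this illustration.

```python
def echo_with_previous(lines):
    """Return the server's reply for each received line: previous line + current line."""
    replies = []
    prev_message = ""  # nothing has been received before the first line
    for line in lines:
        replies.append(prev_message + line)
        prev_message = line
    return replies
```

Every server implementation below keeps the same per-connection state in a `_prev_message` variable.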
According to the specification, 100 clients connect at the same time, each communicates once per second for a total of 10 times, and then disconnects. If you write the socket server as serial code, throughput is extremely poor, because the next connection cannot be served until the current one has finished and disconnected.
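As a back-of-the-envelope estimate of how poor that is, the sketch below compares the total time for a serial server with an ideal concurrent one. It assumes that network and processing time are negligible next to the 1-second wait, which is a simplification of mine, and the function names are hypothetical.

```python
def serial_seconds(clients, messages, interval):
    """Total time when connections are served strictly one after another."""
    return clients * messages * interval


def concurrent_seconds(clients, messages, interval):
    """Total time when all connections are served at once (idealized)."""
    return messages * interval
```

With the numbers from the specification, `serial_seconds(100, 10, 1)` is 1000 seconds per round, while `concurrent_seconds(100, 10, 1)` is 10 seconds.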
py35_tcp_non_parallel_server.py
# -*- coding: utf-8 -*-
import socket

# Create an INET STREAM (TCP) socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Bind the socket to port 1234 on the loopback address
server.bind(('127.0.0.1', 1234))
# Become a server socket
server.listen(10000)

while True:
    # Block until a client connects; only one client is served at a time
    client_socket, address = server.accept()
    # Create the file wrapper once per connection so buffered data is not lost
    fp = client_socket.makefile()
    while True:
        # Receive
        line = fp.readline()
        print(line, type(line))
        if not line:
            # End loop if disconnected
            break
        # Response
        client_socket.sendall(line.encode('utf-8', 'ignore'))
        print('send:{}'.format(line))
    fp.close()
    client_socket.close()
■ Execution result: it is very slow, because the connections are processed one at a time.
gevent provides `wait_read` and `wait_write`, so I was able to implement connection timeout handling. Convenient. The gevent version and the async/await version are written to be compatible with each other.
The points here are that `gevent.monkey.patch_socket()` turns `sock.accept()` into a non-blocking call, and that `gevent.spawn` hands the server processing over to lightweight threads.
gevent_tcp_server_by_monkey_patch.py
# -*- coding: utf-8 -*-
import gevent.monkey
# Patch the socket module first so that accept/recv/send yield to other greenlets
# gevent.monkey.patch_all()
gevent.monkey.patch_socket()

import socket

import gevent
from gevent.pool import Group


def task_stream(client_socket):
    _prev_message = ""
    # Create the file wrapper once per connection so buffered data is not lost
    fp = client_socket.makefile()
    while True:
        # Receive
        line = fp.readline()
        if not line:
            # End loop if disconnected
            break
        # Response: previous message followed by the current one
        s = _prev_message + line
        client_socket.sendall(s.encode('utf-8', 'ignore'))
        print('send:{}'.format(s))
        _prev_message = line
    fp.close()
    client_socket.close()


def gen_server(host, port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind((host, port))
    sock.listen(10000)
    name = 1
    while True:
        conn, addr = sock.accept()
        gevent.spawn(worker, str(name), conn, addr)
        name += 1


def worker(name, sock, addr):
    print('new connection!')
    # Pass the function and its argument separately;
    # gevent.spawn(task_stream(sock)) would run task_stream before spawning
    task = gevent.spawn(task_stream, sock)
    group.add(task)


def main():
    host, port = '127.0.0.1', 1234
    server = gevent.spawn(gen_server, host, port)
    group.add(server)
    group.join()
    server.kill()


group = Group()
main()
The points here are that `wait_read` waits, without blocking, until data arrives, and that timeout handling has been added.
gevent_tcp_server_by_stream_server.py
# -*- coding: utf-8 -*-
from gevent.pool import Pool
from gevent.server import StreamServer
import socket as py_socket
from gevent.socket import wait_read


class TooLong(Exception):
    pass


def handle(socket, address):
    print('new connection!')
    fp = socket.makefile()
    try:
        _prev_message = ""
        while True:
            # Wait without blocking other greenlets; raise TooLong after 5 idle seconds
            wait_read(socket.fileno(), timeout=5, timeout_exc=TooLong)
            line = fp.readline()
            if not line:
                # An empty read means the client disconnected
                break
            # Generate a response string including the last received
            s = _prev_message + line
            # Send
            socket.sendall(s.encode('utf-8', 'ignore'))
            print('send:{}'.format(s))
            _prev_message = line
    except TooLong:
        print('timeout')
        # Disconnect when Timeout occurs
        socket.shutdown(py_socket.SHUT_RDWR)
        socket.close()


pool = Pool(10000)  # do not accept more than 10000 connections
server = StreamServer(('127.0.0.1', 1234), handle, spawn=pool)
server.serve_forever()
gevent_tcp_client.py
# -*- coding: utf-8 -*-
import gevent
from gevent import socket

HOST = '127.0.0.1'
PORT = 1234


def client():
    conn = socket.create_connection((HOST, PORT))
    for x in range(10):
        message = 'Hello World!:{}\n'.format(x)
        # Send
        conn.sendall(message.encode('utf-8', 'ignore'))
        # Receive
        recv_message = conn.recv(3000)
        print('recv:' + recv_message.decode('utf-8'))
        # Wait 1 second
        gevent.sleep(1)
    conn.close()


def main():
    # Repeat 100 rounds, each starting 100 clients at the same time
    for _ in range(100):
        jobs = [gevent.spawn(client) for _ in range(100)]
        gevent.joinall(jobs)


main()
`async def task_echo(reader, writer)` takes a reader and a writer as arguments, which feels awkward to me, but [the socket communication sample in the Python 3 documentation is written this way](http://docs.python.jp/3/library/asyncio-stream.html#tcp-echo-server-using-streams), so I implemented it as is. It may be because the socket is wrapped in `StreamReader`, but something about it still feels off. If there is a class that can perform non-blocking socket communication directly, the code could be rewritten without `StreamReader`, so I may revise it in the future.
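For reference, asyncio's event loop does offer low-level methods that work on a plain non-blocking socket, `loop.sock_recv` and `loop.sock_sendall`, without going through `StreamReader`. The following is only a sketch of that idea, assuming Python 3.7 or later (for `asyncio.run` and `asyncio.get_running_loop`). It uses `socket.socketpair()` as a stand-in for a real client/server connection, and the function names are hypothetical.

```python
import asyncio
import socket


async def echo_once(sock):
    """Read one chunk from a non-blocking socket and send it straight back."""
    loop = asyncio.get_running_loop()
    data = await loop.sock_recv(sock, 1024)
    await loop.sock_sendall(sock, data)
    return data


async def demo():
    # socketpair() gives two already-connected sockets, standing in for client and server
    client_side, server_side = socket.socketpair()
    client_side.setblocking(False)  # the loop.sock_* methods require non-blocking sockets
    server_side.setblocking(False)
    loop = asyncio.get_running_loop()
    try:
        await loop.sock_sendall(client_side, b"Hello World!:0\n")
        await echo_once(server_side)
        return await loop.sock_recv(client_side, 1024)
    finally:
        client_side.close()
        server_side.close()
```

Running `asyncio.run(demo())` sends one line and reads the echo back. Line splitting and buffering, which `StreamReader.readline()` does for you, would have to be written by hand in this style.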
py35_tcp_server.py
# -*- coding: utf-8 -*-
import asyncio

HOST = '127.0.0.1'
PORT = 1234


async def task_echo(reader, writer):
    print('new connection!')
    _prev_message = ''
    while True:
        line = await reader.readline()
        if line == b'':
            # Disconnection detection: reader.readline() returns b'' at EOF,
            # so exit the loop
            break
        # Respond with the previous message followed by the current one
        s = _prev_message + line.decode()
        writer.write(s.encode())
        await writer.drain()
        print('send:{}'.format(s))
        _prev_message = line.decode()
    # Close socket communication when disconnection is detected
    print("Close the client socket")
    writer.close()


def main():
    loop = asyncio.get_event_loop()
    # The loop argument is omitted: it defaults to the current loop
    # and was removed in Python 3.10
    coro = asyncio.start_server(task_echo, HOST, PORT)
    server = loop.run_until_complete(coro)
    # Serve requests until CTRL+c is pressed
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    # Close the server
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()


main()
py35_tcp_client.py
# -*- coding: utf-8 -*-
import asyncio

HOST = '127.0.0.1'
PORT = 1234


async def tcp_echo_client():
    reader, writer = await asyncio.open_connection(HOST, PORT)
    for x in range(10):
        message = 'Hello World!:{}\n'.format(x)
        # Send
        writer.write(message.encode())
        await writer.drain()
        # Receive
        data = (await reader.read(100)).decode()
        print('recv:{}'.format(data))
        # Wait 1 second
        await asyncio.sleep(1)
    writer.close()
    return "1"


async def run_clients(count):
    # gather accepts coroutines directly; passing bare coroutines
    # to asyncio.wait fails on Python 3.11 and later
    await asyncio.gather(*[tcp_echo_client() for _ in range(count)])


def main():
    loop = asyncio.get_event_loop()
    # Repeat 100 rounds, each starting 100 clients at the same time
    for _ in range(100):
        loop.run_until_complete(run_clients(100))
    loop.close()


main()
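To check the whole protocol without starting two programs by hand, the server and a few clients can be run in a single event loop. The following is a minimal sketch under my own assumptions: Python 3.7 or later (for `asyncio.run`), an OS-assigned port instead of 1234, no 1-second wait, and hypothetical names (`handle_echo`, `run_client`, `demo`). It is not the code from this article, only the same echo rule in a self-checking form.

```python
import asyncio


async def handle_echo(reader, writer):
    """Reply to each line with the previous line followed by the current one."""
    prev = b""
    while True:
        line = await reader.readline()
        if not line:  # b'' means the client disconnected
            break
        writer.write(prev + line)
        await writer.drain()
        prev = line
    writer.close()


async def run_client(port, count):
    """Send `count` lines and collect the reply to each one."""
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    replies = []
    prev = b""
    for n in range(count):
        message = "Hello World!:{}\n".format(n).encode()
        writer.write(message)
        await writer.drain()
        # The reply length is known in advance, so read exactly that many bytes
        replies.append(await reader.readexactly(len(prev) + len(message)))
        prev = message
    writer.close()
    return replies


async def demo(clients=3, count=3):
    # Port 0 lets the OS choose a free port, so the sketch cannot collide with a real server
    server = await asyncio.start_server(handle_echo, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    try:
        return await asyncio.gather(*(run_client(port, count) for _ in range(clients)))
    finally:
        server.close()
        await server.wait_closed()
```

`asyncio.run(demo())` returns one list of replies per client. Each client sees only its own previous message, because `prev` lives inside the per-connection coroutine.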
Things I could not fully work out, due to my own lack of ability
As socket servers, all of the code above has a fatal problem: it can use only one CPU. It is a little disappointing that Python can use only one CPU because of the GIL (Global Interpreter Lock). (People have been saying this for more than 5 years and it still has not improved, so this is probably the limit.)
Subjectively speaking, what I really wanted was one of two things: lightweight threads like those in C#, where code written with lightweight threads is processed in parallel across multiple processors, very fast, without the user being aware of it; or lightweight threads like those in Erlang, where you can create as many as you like, communicate freely between them by message passing, and never have to think about the number of CPUs.
Python's only ways to use multiple processors while avoiding the GIL problem are to fork, or to start the program multiple times as daemons.
However, once the system grows to 10 or 100 servers, the same situation arises in other languages too. I do not know of a practical language that runs across different servers as if they were one logical machine. Python programs should therefore be designed to run in a single process, with as many daemons started as there are CPUs. The backend will inevitably be designed around message queuing services or storage, so it should scale out to multiple servers easily.
Of course, the side effect is that the design becomes more complicated. When implementing a chat service, for example, what is the right design for sending push notifications, at the time of posting, to active users in the same chat room whose connections are distributed across multiple servers?
If you can handle lightweight threads, you will be able to write efficient servers that communicate with the outside world. By making the parts that wait for communication asynchronous, the CPU can be given to other work during the waiting time instead of sitting idle.
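That effect is easy to observe with a small experiment. In the sketch below, `asyncio.sleep` stands in for waiting on the network (an assumption for illustration; no real I/O happens), and the names `fake_request` and `run_concurrently` are hypothetical. It assumes Python 3.7 or later for `asyncio.run`.

```python
import asyncio
import time


async def fake_request(delay):
    # Stands in for waiting on a network reply; the event loop runs other tasks meanwhile
    await asyncio.sleep(delay)
    return delay


async def run_concurrently(count, delay):
    """Run `count` waiting tasks at once and return the elapsed wall-clock seconds."""
    started = time.perf_counter()
    await asyncio.gather(*(fake_request(delay) for _ in range(count)))
    return time.perf_counter() - started
```

For example, `asyncio.run(run_concurrently(50, 0.05))` should take roughly the time of a single wait rather than 50 waits added together (2.5 seconds), because all 50 tasks wait at the same time on one thread.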
Lightweight threads, which used to be available only through the external library gevent (greenlet), can be handled in pure Python since asyncio was added in Python 3.4. In addition, Python 3.5 introduced async/await, which lets you write lightweight threads more simply.
While writing the async/await version, I kept thinking about how to turn blocking IO into non-blocking IO. Having written it, and having used gevent as well, I think gevent's design concept of implicitly converting IO into non-blocking processing is wonderful.
Excerpt from the gevent tutorial:
"The real power of gevent comes when we use it for network and IO bound functions which can be cooperatively scheduled. Gevent has taken care of all the details to ensure that your network libraries will implicitly yield their greenlet contexts whenever possible."
gevent has many useful functions and classes that scratch exactly where it itches, so I expect to keep using gevent for the time being.
With raw sockets, huge data may arrive in pieces, the order of arrival is not guaranteed, and disconnection detection does not work well, so in practice be sure to use the TCP or UDP protocol.
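One caveat worth adding: even over TCP, `recv` returns a stream of bytes with no message boundaries, so a single read may contain half a line or several lines. The clients in this article call `recv(3000)` and `read(100)` once per message, which works here only because the messages are tiny. A common remedy is to buffer incoming chunks and split on the newline delimiter. The class below is a minimal sketch of that idea, and the name `LineBuffer` is hypothetical.

```python
class LineBuffer:
    """Collect arbitrary chunks of bytes and hand back only complete lines."""

    def __init__(self):
        self._pending = b""  # bytes received so far that do not yet end in a newline

    def feed(self, chunk: bytes):
        """Add a chunk and return the list of complete lines now available."""
        self._pending += chunk
        # Everything before the last newline is complete; the remainder stays buffered
        *lines, self._pending = self._pending.split(b"\n")
        return [line + b"\n" for line in lines]
```

On the server side, `makefile().readline()` and `StreamReader.readline()` already do this buffering, which is why the servers above read line by line.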
When I consulted a top-class engineer in my company about lightweight threads, I was told that writing the same echo server (one that responds with the previous and the current message) in node.js would, *if written in the ordinary way, end up in painful callback hell*. I was also advised that writing it once would be a good experience, so I would like to try it someday.
The Timeout class, which can constrain the execution time of the gevent code block, seems to be useful, so I'd like to reimplement it in pure Python.
gevent tutorial: Timeouts
import gevent
from gevent import Timeout

time_to_wait = 5  # seconds

class TooLong(Exception):
    pass

with Timeout(time_to_wait, TooLong):
    gevent.sleep(10)
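As a first step toward that pure-Python version, the standard library's `asyncio.wait_for` already limits how long an awaitable may run. The sketch below wraps it so that it raises the same kind of custom exception as the gevent sample. It is a function rather than a `with` block, so it is not a drop-in replacement for `gevent.Timeout`. The names `run_with_timeout` and `demo` are hypothetical, and it assumes Python 3.7 or later for `asyncio.run`.

```python
import asyncio


class TooLong(Exception):
    pass


async def run_with_timeout(awaitable, seconds):
    """Await `awaitable`, raising TooLong if it takes longer than `seconds`."""
    try:
        return await asyncio.wait_for(awaitable, timeout=seconds)
    except asyncio.TimeoutError:
        # wait_for cancels the awaitable before raising, so nothing keeps running
        raise TooLong("operation exceeded {} seconds".format(seconds)) from None


async def demo():
    try:
        # Mirrors the gevent sample: sleep 10 seconds under a much shorter limit
        await run_with_timeout(asyncio.sleep(10), 0.05)
    except TooLong:
        return "timeout"
    return "finished"
```

Calling `asyncio.run(demo())` returns after about 0.05 seconds instead of 10. The same wrapper could be placed around `reader.readline()` in the async/await server to get the idle timeout that the `StreamServer` version has.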
- [Mastering TCP/IP](http://www.amazon.co.jp/%E3%83%9E%E3%82%B9%E3%82%BF%E3%83%AA%E3%83%B3%E3%82%B0TCP-IP-%E5%85%A5%E9%96%80%E7%B7%A8-%E7%AC%AC5%E7%89%88-%E7%AB%B9%E4%B8%8B/dp/4274068765)
- gevent tutorial
- gevent: Asynchronous I/O made easy
- 18.5. asyncio – Asynchronous I/O, Event Loops, Coroutines and Tasks
- Tutorial: Writing a TCP server in Python