Vous vous souvenez de ** Apsetone Deb **? Je ne pense pas que mon travail serait même né sans cette existence. [Modèle de référence OSI](http://dic.nicovideo.jp/a/%E3%82%A2%E3%83%97%E3%82%BB%E3%83%88%E3%83%8D%E3 Comment se souvenir% 83% 87% E3% 83% 96).
Parmi les modèles de référence OSI, il s'agit de la mise en œuvre d'un serveur de socket qui effectue une communication de socket dans la couche de transport en utilisant pleinement le traitement parallèle. Cet article est une réécriture Python de ce que j'ai appris sur les tâches C # et async / wait d'un grand ingénieur ce printemps. "Maîtriser TCP / IP" qui était recouvert de poussière dans un coin de la pièce a été installé dans une main.
J'ai lu l'article Multithread / Process Summary (Ruby Edition) et j'ai écrit le code avec l'intention de le réviser. Il est bien organisé, donc si vous ne comprenez pas le sens des mots qui apparaissent, vous devriez le lire. (J'ai écrit en me lisant.)
Les protocoles typiques qui exécutent la fonction de couche de transport dans TCP / IP sont "TCP" et "UDP". Lors de la communication via TCP ou UDP, l'API du système d'exploitation appelée socket est largement utilisée. Cette fois, nous implémenterons le serveur et le client socket. Dans le processus, nous utiliserons la version gevent et la version async / await des threads légers.
C'est une extension du serveur d'écho. Il intègre les spécifications de la fonction de détection de déconnexion et la fonction d'écho des minutes précédentes et actuelles.
■ Fonctionnement du produit fini gif
Après avoir ouvert la communication, il envoie et reçoit vers et depuis le serveur socket un total de 10 fois par seconde et se déconnecte. 100 clients qui effectuent une communication de socket sont démarrés en même temps et traitent 100 boucles.
HelloWorld {n}
au serveur. Commencez par n = 0.Image d'exécution du client
Connexion serveur
Envoyer:Hello World!:0
Recevoir:Hello World!:0
Envoyer:Hello World!:1
Recevoir:Hello World!:0 Hello World!:1
....
Envoyer:Hello World!:8
Recevoir:Hello World!:7 Hello World!:8
Envoyer:Hello World!:9
Recevoir:Hello World!:8 Hello World!:9
Se déconnecter du serveur
Selon les spécifications, le client active 100 communications de socket en même temps, communique 10 fois au total toutes les 1 seconde, puis se déconnecte. Si vous écrivez le serveur de socket en code série, le débit sera extrêmement faible car la connexion suivante ne peut pas être démarrée tant que la première connexion n'est pas terminée et déconnectée.
py35_tcp_non_parallel_server.py
# -*- coding: utf-8 -*-
import socket
#Créer un socket INET STREAM
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#Connectez la prise au port bien connu de l'hôte public
server.bind(('127.0.0.1', 1234))
#Devenir un socket serveur
server.listen(10000)
while True:
client_socket, address = server.accept()
while True:
#Recevoir
fp = client_socket.makefile()
line = fp.readline()
print(line, type(line))
if not line:
#Terminer la boucle si déconnecté
break
#réponse
client_socket.send(line.encode('utf-8', 'ignore'))
print('send:{}'.format(line))
■ Résultat d'exécution Il est très lent car il traite chaque communication dans l'ordre.
Dans gevent, wait_read
et wait_write
existent, donc le processus de temporisation de connexion peut être implémenté. Pratique. La version gevent et la version async / await sont écrites pour être compatibles.
Le fait est que gevent.monkey.patch_socket ()
change sock.accept ()
en traitement non bloquant, et gevent.spawn
jette le traitement du serveur dans un thread léger. point
gevent_tcp_server_by_monkey_patch.py
# -*- coding: utf-8 -*-
from gevent.pool import Group
import gevent
import socket
import gevent.monkey
# gevent.monkey.patch_all()
gevent.monkey.patch_socket()
def task_stream(client_socket):
_prev_message = ""
while True:
#Recevoir
fp = client_socket.makefile()
line = fp.readline()
if not line:
#Terminer la boucle si déconnecté
break
#réponse
s = _prev_message + line
client_socket.send(s.encode('utf-8', 'ignore'))
print('send:{}'.format(s))
_prev_message = line
def gen_server(host, port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind((host, port))
sock.listen(10000)
name = 1
while True:
conn, addr = sock.accept()
gevent.spawn(worker, str(name), conn, addr)
name += 1
def worker(name, sock, addr):
print('new connection!')
task = gevent.spawn(task_stream(sock))
group.add(task)
def main():
host, port = '127.0.0.1', 1234
server = gevent.spawn(gen_server, host, port)
group.add(server)
group.join()
server.kill()
group = Group()
main()
Le fait est qu'il attend sans blocage jusqu'à ce que la communication arrive avec wait_read
et le moment auquel le traitement du délai d'attente est ajouté.
gevent_tcp_server_by_stream_server.py
# -*- coding: utf-8 -*-
from gevent.pool import Pool
from gevent.server import StreamServer
import socket as py_socket
from gevent.socket import wait_read, wait_write
class TooLong(Exception):
pass
def handle(socket, address):
print('new connection!')
fp = socket.makefile()
try:
_prev_message = ""
while True:
wait_read(socket.fileno(), timeout=5, timeout_exc=TooLong)
line = fp.readline()
if line:
#Générer une chaîne de réponse comprenant le dernier reçu
s = _prev_message + line
#Envoyer
socket.send(s.encode('utf-8', 'ignore'))
fp.flush()
print('send:{}'.format(_prev_message + line))
_prev_message = line
except TooLong:
print('timeout')
#Déconnecter lorsque le délai d'expiration se produit
socket.shutdown(py_socket.SHUT_RDWR)
socket.close()
pool = Pool(10000) # do not accept more than 10000 connections
server = StreamServer(('127.0.0.1', 1234), handle, spawn=pool)
server.serve_forever()
gevent_tcp_client.py
# -*- coding: utf-8 -*-
import gevent
from gevent import socket
from functools import wraps
import time
HOST = '127.0.0.1'
PORT = 1234
def client():
conn = socket.create_connection((HOST, PORT))
for x in range(10):
message = 'Hello World!:{}\n'.format(x)
#Envoyer
conn.send(message.encode('utf-8', 'ignore'))
recv_message = conn.recv(3000)
print('recv:' + recv_message.decode('utf-8'))
#Attendez 1 seconde
gevent.sleep(1)
conn.close()
def main():
for x in range(100):
jobs = [gevent.spawn(client) for x in range(100)]
gevent.joinall(jobs)
main()
ʻAsync def task_echo (reader, writer) a un lecteur et un écrivain comme arguments, ce qui est désagréable, mais dans le document Python3 [il a été écrit dans l'exemple traitant de la communication socket](http: // docs) .python.jp / 3 / library / asyncio-stream.html # tcp-echo-server-using-streams), je l'ai donc implémenté tel quel. C'est probablement parce qu'il est enveloppé dans
StreamReader, mais quelque chose est désagréable. S'il y a une classe qui peut effectuer une communication socket non bloquante, elle peut être réécrite à partir de
StreamReader`, donc il y a une possibilité qu'elle soit modifiée dans le futur.
py35_tcp_server.py
# -*- coding: utf-8 -*-
import asyncio
HOST = '127.0.0.1'
PORT = 1234
async def task_echo(reader, writer):
print('new connection!')
_prev_message = ''
while True:
line = await reader.readline()
if line == b'':
#Détection de déconnexion Lorsqu'il est déconnecté, lecteur pour une raison quelconque.readline()Est B''Quittez la boucle pour revenir
break
if line:
writer.write(line)
await writer.drain()
print('send:{}'.format(_prev_message + line.decode()))
_prev_message = line.decode()
#Fermer la communication du socket lorsque la déconnexion est détectée
print("Close the client socket")
await writer.drain()
writer.close()
def main():
loop = asyncio.get_event_loop()
coro = asyncio.start_server(task_echo, HOST, PORT, loop=loop)
server = loop.run_until_complete(coro)
# Serve requests until CTRL+c is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
main()
py35_tcp_client.py
# -*- coding: utf-8 -*-
import asyncio
HOST = '127.0.0.1'
PORT = 1234
async def tcp_echo_client():
reader, writer = await asyncio.open_connection(HOST, PORT)
for x in range(10):
message = 'Hello World!:{}\n'.format(str(x))
#Envoyer
writer.write(message.encode())
#Recevoir
data = (await reader.read(100)).decode()
print('recv:{}'.format(data))
#Attendez 1 seconde
await asyncio.sleep(1)
writer.close()
return "1"
def main():
loop = asyncio.get_event_loop()
for x in range(100):
tasks = asyncio.wait([tcp_echo_client() for x in range(100)])
loop.run_until_complete(tasks)
loop.close()
main()
Je n'ai pas pu le mettre en place à cause d'un manque de capacité
En tant que serveur socket, le groupe de codes ci-dessus a un problème fatal. Il ne peut gérer qu'un seul processeur. Python qui ne peut gérer qu'un seul processeur en raison de la limitation de GIL (Global Interpreter Lock) est un peu décevant. (On le dit depuis plus de 5 ans et cela ne s'est pas encore amélioré, c'est donc probablement la limite.)
Si vous parlez subjectivement, ce que vous vouliez vraiment, c'est écrire un traitement parallèle à l'aide de threads légers, et même si l'utilisateur n'en est pas conscient, il sera traité en parallèle avec un multiprocesseur et un C # ultra rapide, et vous pouvez créer des threads légers autant que vous le souhaitez avec le message passant entre les threads. C'était un thread léger comme Erlang qui était libre de communiquer et l'utilisateur ne reconnaissait pas du tout le nombre de processeurs.
Le support multiprocesseur de Python ne peut être utilisé que pour éviter les problèmes de GIL ou démarré plusieurs fois avec un démon.
Cependant, lorsque l'échelle du serveur atteint 10 ou 100, le même phénomène se produit dans d'autres langues. Je ne connais pas de langage pratique qui fonctionne entre différents serveurs comme une seule machine logique. Python doit être conçu de manière à s'exécuter en un seul processus, et les démons doivent exécuter autant que le nombre de processeurs. Inévitablement, le backend sera conçu pour utiliser les services de file d'attente de messages ou le stockage, il devrait donc facilement évoluer vers plusieurs serveurs.
Bien sûr, comme effet secondaire, la conception devient compliquée. En tant qu'implémentation du service de chat, comment est-il correct de concevoir l'envoi de notifications push au moment de la publication aux utilisateurs actifs dans la même salle de discussion qui sont connectés à plusieurs serveurs de manière distribuée?
Si vous pouvez gérer des threads légers, vous pourrez écrire des serveurs efficaces qui communiquent avec le monde extérieur. En rendant le traitement en attente de communication asynchrone, il devient possible d'allouer la CPU à d'autres travaux en parallèle sans réellement attendre le temps d'attente.
Les threads légers qui ne pouvaient être gérés que par la bibliothèque externe gevent (Greenlet) peuvent désormais être gérés par Python pur avec l'implémentation d'asyncio dans Python 3.4. De plus, Python 3.5 async / await a été implémenté pour gérer les threads légers avec une description plus simple.
Lorsque j'écrivais la version async / await, je n'arrêtais pas de réfléchir à la façon de transformer un IO bloquant en un IO non bloquant. Après avoir écrit la version async / await, je pense que le concept de conception qui convertit implicitement IO en traitement non bloquant en touchant gevent est merveilleux.
Extrait du tutoriel gevent
La véritable puissance de gevent réside dans la planification collaborative des fonctions liées au réseau et aux E / S.
Lorsque vous l'utilisez pour exécuter. gevent s'occupe autant que possible de tous les détails et réseaux
Force la bibliothèque à transférer implicitement le contexte du greenlet.
Il existe de nombreuses fonctions et classes utiles dans gevent qui peuvent atteindre l'endroit qui démange, et je pense que gevent sera utilisé pour le moment.
En pratique, veillez à utiliser le protocole TCP ou UDP, car les sockets bruts ne peuvent pas être livrés en morceaux ou les déconnexions peuvent être bien détectées car l'ordre d'arrivée n'est pas garanti lors de l'envoi d'énormes quantités de données.
Après avoir consulté un ingénieur de haut niveau de l'entreprise sur les threads légers, si vous écrivez un serveur d'écho qui répond au précédent et cette fois dans node.js
, * si vous l'écrivez normalement, ce sera très difficile dans l'enfer des rappels *. Cependant, on m'a dit que l'écrire une fois serait une bonne expérience, alors j'aimerais l'essayer un jour.
La classe Timeout, qui peut limiter le temps d'exécution du bloc de code gevent, semble être utile, je voudrais donc la réimplémenter en Python pur.
tutoriel gevent-Timeouts
import gevent
from gevent import Timeout
time_to_wait = 5 # seconds
class TooLong(Exception):
pass
with Timeout(time_to_wait, TooLong):
gevent.sleep(10)
[Maîtriser TCP / IP](http://www.amazon.co.jp/%E3%83%9E%E3%82%B9%E3%82%BF%E3%83%AA%E3%83%B3% E3% 82% B0 TCP-IP-% E5% 85% A5% E9% 96% 80% E7% B7% A8-% E7% AC% AC5% E7% 89% 88-% E7% AB% B9% E4% B8 % 8B / dp / 4274068765) tutoriel gevent gevent: Asynchronous I/O made easy 18.5. Asyncio - E / S asynchrones, boucles d'événements, collouts et tâches Tutorial: Writing a TCP server in Python