Implémentation du serveur de socket avec détection de déconnexion par gevent ou async / await

Vous vous souvenez de ** Apsetone Deb **? Je ne pense pas que mon travail serait même né sans cette existence. [Modèle de référence OSI](http://dic.nicovideo.jp/a/%E3%82%A2%E3%83%97%E3%82%BB%E3%83%88%E3%83%8D%E3 Comment se souvenir% 83% 87% E3% 83% 96).

Parmi les modèles de référence OSI, il s'agit de la mise en œuvre d'un serveur de socket qui effectue une communication de socket dans la couche de transport en utilisant pleinement le traitement parallèle. Cet article est une réécriture Python de ce que j'ai appris sur les tâches C # et async / wait d'un grand ingénieur ce printemps. "Maîtriser TCP / IP" qui était recouvert de poussière dans un coin de la pièce a été installé dans une main.

Objectif

J'ai lu l'article Multithread / Process Summary (Ruby Edition) et j'ai écrit le code avec l'intention de le réviser. Il est bien organisé, donc si vous ne comprenez pas le sens des mots qui apparaissent, vous devriez le lire. (J'ai écrit en me lisant.)

Présentation et spécifications du serveur Socket

Les protocoles typiques qui exécutent la fonction de couche de transport dans TCP / IP sont "TCP" et "UDP". Lors de la communication via TCP ou UDP, l'API du système d'exploitation appelée socket est largement utilisée. Cette fois, nous implémenterons le serveur et le client socket. Dans le processus, nous utiliserons la version gevent et la version async / await des threads légers.

Spécifications du serveur de socket et déroulement des opérations

C'est une extension du serveur d'écho. Il intègre les spécifications de la fonction de détection de déconnexion et la fonction d'écho des minutes précédentes et actuelles.

  1. Acceptez la communication de socket du client
  2. Lorsqu'un message est reçu du client, faites écho à la réception précédente et à la réception actuelle.
  3. Si la communication est déconnectée, fermez la communication de socket

■ Fonctionnement du produit fini gif loop.gif

Spécifications du client et déroulement des opérations

Après avoir ouvert la communication, il envoie et reçoit vers et depuis le serveur socket un total de 10 fois par seconde et se déconnecte. 100 clients qui effectuent une communication de socket sont démarrés en même temps et traitent 100 boucles.

  1. Connectez-vous au serveur socket
  2. Envoyez HelloWorld {n} au serveur. Commencez par n = 0.
  3. Recevoir les transmissions précédentes et actuelles du serveur
  4. 1 seconde matsu ← Cette fois
  5. n ++ et revenez au processus de 2.
  6. Déconnectez-vous lorsque n> = 10

Image d'exécution du client


Connexion serveur
Envoyer:Hello World!:0
Recevoir:Hello World!:0
Envoyer:Hello World!:1
Recevoir:Hello World!:0 Hello World!:1
....
Envoyer:Hello World!:8
Recevoir:Hello World!:7 Hello World!:8
Envoyer:Hello World!:9
Recevoir:Hello World!:8 Hello World!:9
Se déconnecter du serveur

Cas d'échec: Si vous écrivez normalement, vous aurez un serveur qui ne fonctionnera pas en parallèle.

Selon les spécifications, le client active 100 communications de socket en même temps, communique 10 fois au total toutes les 1 seconde, puis se déconnecte. Si vous écrivez le serveur de socket en code série, le débit sera extrêmement faible car la connexion suivante ne peut pas être démarrée tant que la première connexion n'est pas terminée et déconnectée.

py35_tcp_non_parallel_server.py


# -*- coding: utf-8 -*-
import socket

#Créer un socket INET STREAM
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#Connectez la prise au port bien connu de l'hôte public
server.bind(('127.0.0.1', 1234))
#Devenir un socket serveur
server.listen(10000)

while True:
    client_socket, address = server.accept()
    while True:
        #Recevoir
        fp = client_socket.makefile()
        line = fp.readline()
        print(line, type(line))
        if not line:
            #Terminer la boucle si déconnecté
            break

        #réponse
        client_socket.send(line.encode('utf-8', 'ignore'))
        print('send:{}'.format(line))

■ Résultat d'exécution Il est très lent car il traite chaque communication dans l'ordre. wrong_loop.gif

Serveur de socket de version Python3.5 Gevent

Dans gevent, wait_read et wait_write existent, donc le processus de temporisation de connexion peut être implémenté. Pratique. La version gevent et la version async / await sont écrites pour être compatibles.

version monkey_patch de gevent

Le fait est que gevent.monkey.patch_socket () change sock.accept () en traitement non bloquant, et gevent.spawn jette le traitement du serveur dans un thread léger. point

gevent_tcp_server_by_monkey_patch.py


# -*- coding: utf-8 -*-
from gevent.pool import Group
import gevent
import socket
import gevent.monkey
# gevent.monkey.patch_all()
gevent.monkey.patch_socket()


def task_stream(client_socket):
    _prev_message = ""
    while True:
        #Recevoir
        fp = client_socket.makefile()
        line = fp.readline()
        if not line:
            #Terminer la boucle si déconnecté
            break

        #réponse
        s = _prev_message + line
        client_socket.send(s.encode('utf-8', 'ignore'))
        print('send:{}'.format(s))
        _prev_message = line


def gen_server(host, port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind((host, port))
    sock.listen(10000)
    name = 1
    while True:
        conn, addr = sock.accept()
        gevent.spawn(worker, str(name), conn, addr)
        name += 1


def worker(name, sock, addr):
    print('new connection!')
    task = gevent.spawn(task_stream(sock))
    group.add(task)


def main():
    host, port = '127.0.0.1', 1234
    server = gevent.spawn(gen_server, host, port)
    group.add(server)
    group.join()
    server.kill()

group = Group()
main()

Version Stream Server de gevent

Le fait est qu'il attend sans blocage jusqu'à ce que la communication arrive avec wait_read et le moment auquel le traitement du délai d'attente est ajouté.

gevent_tcp_server_by_stream_server.py


# -*- coding: utf-8 -*-
from gevent.pool import Pool
from gevent.server import StreamServer
import socket as py_socket
from gevent.socket import wait_read, wait_write


class TooLong(Exception):
    pass


def handle(socket, address):
    print('new connection!')

    fp = socket.makefile()
    try:
        _prev_message = ""
        while True:
            wait_read(socket.fileno(), timeout=5, timeout_exc=TooLong)
            line = fp.readline()
            if line:
                #Générer une chaîne de réponse comprenant le dernier reçu
                s = _prev_message + line
                #Envoyer
                socket.send(s.encode('utf-8', 'ignore'))
                fp.flush()
                print('send:{}'.format(_prev_message + line))
                _prev_message = line
    except TooLong:
        print('timeout')

    #Déconnecter lorsque le délai d'expiration se produit
    socket.shutdown(py_socket.SHUT_RDWR)
    socket.close()


pool = Pool(10000)  # do not accept more than 10000 connections
server = StreamServer(('127.0.0.1', 1234), handle, spawn=pool)
server.serve_forever()

gevent_tcp_client.py


# -*- coding: utf-8 -*-
import gevent
from gevent import socket
from functools import wraps
import time

HOST = '127.0.0.1'
PORT = 1234


def client():
    conn = socket.create_connection((HOST, PORT))
    for x in range(10):
        message = 'Hello World!:{}\n'.format(x)
        #Envoyer
        conn.send(message.encode('utf-8', 'ignore'))
        recv_message = conn.recv(3000)
        print('recv:' + recv_message.decode('utf-8'))
        #Attendez 1 seconde
        gevent.sleep(1)
    conn.close()


def main():
    for x in range(100):
        jobs = [gevent.spawn(client) for x in range(100)]
        gevent.joinall(jobs)


main()


Python3.5 version async / attente du serveur de socket

ʻAsync def task_echo (reader, writer) a un lecteur et un écrivain comme arguments, ce qui est désagréable, mais dans le document Python3 [il a été écrit dans l'exemple traitant de la communication socket](http: // docs) .python.jp / 3 / library / asyncio-stream.html # tcp-echo-server-using-streams), je l'ai donc implémenté tel quel. C'est probablement parce qu'il est enveloppé dans StreamReader, mais quelque chose est désagréable. S'il y a une classe qui peut effectuer une communication socket non bloquante, elle peut être réécrite à partir de StreamReader`, donc il y a une possibilité qu'elle soit modifiée dans le futur.

py35_tcp_server.py


# -*- coding: utf-8 -*-
import asyncio

HOST = '127.0.0.1'
PORT = 1234


async def task_echo(reader, writer):
    print('new connection!')
    _prev_message = ''
    while True:
        line = await reader.readline()
        if line == b'':
            #Détection de déconnexion Lorsqu'il est déconnecté, lecteur pour une raison quelconque.readline()Est B''Quittez la boucle pour revenir
            break
        if line:
            writer.write(line)
            await writer.drain()
            print('send:{}'.format(_prev_message + line.decode()))
            _prev_message = line.decode()

    #Fermer la communication du socket lorsque la déconnexion est détectée
    print("Close the client socket")
    await writer.drain()
    writer.close()


def main():
    loop = asyncio.get_event_loop()
    coro = asyncio.start_server(task_echo, HOST, PORT, loop=loop)
    server = loop.run_until_complete(coro)

    # Serve requests until CTRL+c is pressed
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass

    # Close the server
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()


main()

py35_tcp_client.py


# -*- coding: utf-8 -*-
import asyncio

HOST = '127.0.0.1'
PORT = 1234


async def tcp_echo_client():
    reader, writer = await asyncio.open_connection(HOST, PORT)
    for x in range(10):
        message = 'Hello World!:{}\n'.format(str(x))
        #Envoyer
        writer.write(message.encode())
        #Recevoir
        data = (await reader.read(100)).decode()
        print('recv:{}'.format(data))
        #Attendez 1 seconde
        await asyncio.sleep(1)
    writer.close()
    return "1"


def main():
    loop = asyncio.get_event_loop()
    for x in range(100):
        tasks = asyncio.wait([tcp_echo_client() for x in range(100)])
        loop.run_until_complete(tasks)
    loop.close()

main()


Résumé non coordonné

Je n'ai pas pu le mettre en place à cause d'un manque de capacité

Problème fatal avec le code ci-dessus

En tant que serveur socket, le groupe de codes ci-dessus a un problème fatal. Il ne peut gérer qu'un seul processeur. Python qui ne peut gérer qu'un seul processeur en raison de la limitation de GIL (Global Interpreter Lock) est un peu décevant. (On le dit depuis plus de 5 ans et cela ne s'est pas encore amélioré, c'est donc probablement la limite.)

Si vous parlez subjectivement, ce que vous vouliez vraiment, c'est écrire un traitement parallèle à l'aide de threads légers, et même si l'utilisateur n'en est pas conscient, il sera traité en parallèle avec un multiprocesseur et un C # ultra rapide, et vous pouvez créer des threads légers autant que vous le souhaitez avec le message passant entre les threads. C'était un thread léger comme Erlang qui était libre de communiquer et l'utilisateur ne reconnaissait pas du tout le nombre de processeurs.

Le support multiprocesseur de Python ne peut être utilisé que pour éviter les problèmes de GIL ou démarré plusieurs fois avec un démon.

Cependant, lorsque l'échelle du serveur atteint 10 ou 100, le même phénomène se produit dans d'autres langues. Je ne connais pas de langage pratique qui fonctionne entre différents serveurs comme une seule machine logique. Python doit être conçu de manière à s'exécuter en un seul processus, et les démons doivent exécuter autant que le nombre de processeurs. Inévitablement, le backend sera conçu pour utiliser les services de file d'attente de messages ou le stockage, il devrait donc facilement évoluer vers plusieurs serveurs.

Bien sûr, comme effet secondaire, la conception devient compliquée. En tant qu'implémentation du service de chat, comment est-il correct de concevoir l'envoi de notifications push au moment de la publication aux utilisateurs actifs dans la même salle de discussion qui sont connectés à plusieurs serveurs de manière distribuée?

Avantages de pouvoir gérer des fils légers

Si vous pouvez gérer des threads légers, vous pourrez écrire des serveurs efficaces qui communiquent avec le monde extérieur. En rendant le traitement en attente de communication asynchrone, il devient possible d'allouer la CPU à d'autres travaux en parallèle sans réellement attendre le temps d'attente.

Différence entre gevent et async / await

Les threads légers qui ne pouvaient être gérés que par la bibliothèque externe gevent (Greenlet) peuvent désormais être gérés par Python pur avec l'implémentation d'asyncio dans Python 3.4. De plus, Python 3.5 async / await a été implémenté pour gérer les threads légers avec une description plus simple.

Lorsque j'écrivais la version async / await, je n'arrêtais pas de réfléchir à la façon de transformer un IO bloquant en un IO non bloquant. Après avoir écrit la version async / await, je pense que le concept de conception qui convertit implicitement IO en traitement non bloquant en touchant gevent est merveilleux.

Extrait du tutoriel gevent


La véritable puissance de gevent réside dans la planification collaborative des fonctions liées au réseau et aux E / S.
Lorsque vous l'utilisez pour exécuter. gevent s'occupe autant que possible de tous les détails et réseaux
Force la bibliothèque à transférer implicitement le contexte du greenlet.

Il existe de nombreuses fonctions et classes utiles dans gevent qui peuvent atteindre l'endroit qui démange, et je pense que gevent sera utilisé pour le moment.

Utilisez TCP ou UDP dans la pratique

En pratique, veillez à utiliser le protocole TCP ou UDP, car les sockets bruts ne peuvent pas être livrés en morceaux ou les déconnexions peuvent être bien détectées car l'ordre d'arrivée n'est pas garanti lors de l'envoi d'énormes quantités de données.

Devoir 1 - Écrivons dans node.js

Après avoir consulté un ingénieur de haut niveau de l'entreprise sur les threads légers, si vous écrivez un serveur d'écho qui répond au précédent et cette fois dans node.js, * si vous l'écrivez normalement, ce sera très difficile dans l'enfer des rappels *. Cependant, on m'a dit que l'écrire une fois serait une bonne expérience, alors j'aimerais l'essayer un jour.

Homework 2-Réimplémentation de la classe Timeout pour Gevent

La classe Timeout, qui peut limiter le temps d'exécution du bloc de code gevent, semble être utile, je voudrais donc la réimplémenter en Python pur.

tutoriel gevent-Timeouts


import gevent
from gevent import Timeout

time_to_wait = 5 # seconds

class TooLong(Exception):
    pass

with Timeout(time_to_wait, TooLong):
    gevent.sleep(10)

référence

[Maîtriser TCP / IP](http://www.amazon.co.jp/%E3%83%9E%E3%82%B9%E3%82%BF%E3%83%AA%E3%83%B3% E3% 82% B0 TCP-IP-% E5% 85% A5% E9% 96% 80% E7% B7% A8-% E7% AC% AC5% E7% 89% 88-% E7% AB% B9% E4% B8 % 8B / dp / 4274068765) tutoriel gevent gevent: Asynchronous I/O made easy 18.5. Asyncio - E / S asynchrones, boucles d'événements, collouts et tâches Tutorial: Writing a TCP server in Python

Recommended Posts

Implémentation du serveur de socket avec détection de déconnexion par gevent ou async / await
[Python] Requête asynchrone avec async / await
Async / await avec Kivy et tkinter
J'ai essayé de communiquer avec un serveur distant par communication Socket avec Python.