[Introduction to Python3 Day 22] Chapter 11 Concurrency and Networking (11.1 to 11.3)

11.1 Concurrency

There are two main reasons a program ends up waiting on your computer:

--I/O bound: waiting for I/O to complete
--CPU bound: waiting for the CPU to finish computing

Terms for concurrency:

--Synchronous: tasks are processed one after another, in order, like a funeral procession.
--Asynchronous: tasks are processed independently of one another.
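
To make the difference concrete, here is a minimal sketch that runs the same I/O-bound work one task at a time and then concurrently in threads. The function fake_io is a hypothetical stand-in for a real I/O call (it only sleeps), and the 0.2 second delay is an arbitrary assumption.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_io(n):
    """Hypothetical stand-in for an I/O-bound call; it only sleeps."""
    time.sleep(0.2)
    return n * 2

def run_synchronously(items):
    # One task at a time, in order.
    return [fake_io(n) for n in items]

def run_concurrently(items):
    # All tasks wait at the same time, each in its own thread.
    with ThreadPoolExecutor(max_workers=len(items)) as pool:
        return list(pool.map(fake_io, items))

start = time.perf_counter()
sync_result = run_synchronously([1, 2, 3, 4])
sync_elapsed = time.perf_counter() - start

start = time.perf_counter()
conc_result = run_concurrently([1, 2, 3, 4])
conc_elapsed = time.perf_counter() - start

print("synchronous: %.2fs, concurrent: %.2fs" % (sync_elapsed, conc_elapsed))
```

Both runs return the same values; only the waiting overlaps. This helps with I/O-bound work, while CPU-bound work usually calls for multiple processes instead.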

11.1.1 Queue

--Queue elements are added at one end and removed from the other end (FIFO).
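
A small sketch of that first-in, first-out behavior using the standard library's queue.Queue (the dish names are just sample data):

```python
import queue

q = queue.Queue()  # FIFO queue from the standard library
for dish in ["salad", "bread", "entree"]:
    q.put(dish)  # added at one end

served = []
while not q.empty():
    served.append(q.get())  # removed from the other end

print(served)
```

Items come out in the order they went in.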

11.1.2 Process

dishes.py



import multiprocessing as mp

def washer(dishes, output):
    for dish in dishes:
        print("Washing",dish, "dish")
        output.put(dish)

# get() removes an item from the queue and returns it.
# task_done(), called after get(), tells the queue that processing of that item is complete.
# join() blocks until every item in the queue has been retrieved and processed.
def dryer(input):
    while True:
        dish=input.get()
        print("Drying",dish,"dish")
        input.task_done()

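if __name__ == "__main__":
    # The guard keeps child processes from re-running this setup when the
    # module is re-imported (as happens with the "spawn" start method).
    dish_queue=mp.JoinableQueue()
    dryer_proc=mp.Process(target=dryer, args=(dish_queue,))
    dryer_proc.daemon=True
    dryer_proc.start()

    dishes=["salad","bread","entree","dessert"]
    washer(dishes, dish_queue)
    dish_queue.join()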

Execution result



python dishes.py
Washing salad dish
Washing bread dish
Washing entree dish
Washing dessert dish
Drying salad dish
Drying bread dish
Drying entree dish
Drying dessert dish

11.1.3 Thread

--Threads run inside a process and have access to everything in that process.
--One difference between multiprocessing and threading is that threading has no terminate() function.
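
Both points can be checked with a short sketch. The shared list and the record function are hypothetical names used only for illustration.

```python
import multiprocessing
import threading

shared = []  # lives in the process; every thread sees the same list

def record(n):
    shared.append(n)

threads = [threading.Thread(target=record, args=(n,)) for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(shared))
# Process objects can be terminated; Thread objects have no such method.
print(hasattr(multiprocessing.Process, "terminate"))
print(hasattr(threading.Thread, "terminate"))
```

Because a thread cannot be killed from outside, a worker loop needs some other way to be told to stop.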

threads.py



import threading

def do_this(what):
    whoami(what)

# threading.current_thread() returns the Thread object for the thread that called it.
def whoami(what):
    print("Thread %s says: %s" % (threading.current_thread(),what))

# After creating a Thread object, call its start() method to begin the thread's activity.
# target is the callable object invoked by the run() method.
# args is the argument tuple passed to target.
if __name__=="__main__":
    whoami("I am the main program")
    for n in range(4):
        p=threading.Thread(target=do_this, args=("I am function %s" % n,))
        p.start()

Execution result



python threads.py
Thread <_MainThread(MainThread, started 4530769344)> says: I am the main program
Thread <Thread(Thread-1, started 123145448275968)> says: I am function 0
Thread <Thread(Thread-2, started 123145453531136)> says: I am function 1
Thread <Thread(Thread-3, started 123145448275968)> says: I am function 2
Thread <Thread(Thread-4, started 123145453531136)> says: I am function 3

thread_dishes.py



import threading, queue
import time

def washer(dishes, dish_queue):
    for dish in dishes:
        print("Washing",dish)
        time.sleep(5)
        dish_queue.put(dish) # Add the item to the queue.

def dryer(dish_queue):
    while True:
        dish=dish_queue.get() # Remove an item from the queue and return it.
        print("Drying", dish)
        time.sleep(10)
        dish_queue.task_done() # Called after get() to signal that processing of the retrieved item is complete.

dish_queue=queue.Queue() # FIFO queue constructor.
for n in range(2):
    # daemon=True lets the program exit even though dryer() loops forever.
    dryer_thread=threading.Thread(target=dryer, args=(dish_queue,), daemon=True)
    dryer_thread.start()

dishes=["salad","bread","entree","dessert"]
washer(dishes, dish_queue)
dish_queue.join() # Block until every item in the queue has been retrieved and processed.
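
The dryer loop above never returns on its own. One common way to let worker threads finish cleanly is to send a sentinel value through the queue. The following is only a sketch of that idea: the STOP value, the run helper, and the lock-protected result list are assumptions of this example, not part of the book's code.

```python
import queue
import threading

STOP = None  # sentinel value (an assumption of this sketch)

def dryer(dish_queue, dried, lock):
    while True:
        dish = dish_queue.get()
        if dish is STOP:
            dish_queue.task_done()
            break  # leave the loop so the thread can end
        with lock:
            dried.append(dish)
        dish_queue.task_done()

def run(dishes, workers=2):
    dish_queue = queue.Queue()
    dried, lock = [], threading.Lock()
    threads = [threading.Thread(target=dryer, args=(dish_queue, dried, lock))
               for _ in range(workers)]
    for t in threads:
        t.start()
    for dish in dishes:
        dish_queue.put(dish)
    for _ in threads:
        dish_queue.put(STOP)  # one sentinel per worker
    dish_queue.join()
    for t in threads:
        t.join()
    return dried

dried = run(["salad", "bread", "entree", "dessert"])
print(sorted(dried))
```

Each worker consumes exactly one sentinel, so every thread exits and the program ends without relying on daemon threads.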

11.1.4 green threads and gevent

--gevent patches many of Python's standard objects, such as sockets, so that they use gevent's mechanism instead of blocking.

gevent_test.py


import gevent
from gevent import monkey
monkey.patch_socket()

hosts = ['www.crappytaxidermy.com', 'www.walterpottertaxidermy.com', 'www.taxidermy.net']
jobs = [gevent.spawn(gevent.socket.gethostbyname, host) for host in hosts]
gevent.joinall(jobs, timeout=5)
for job in jobs:
    print(job.value)

Execution result


python gevent_test.py
66.6.44.4
104.27.172.75
104.18.63.71

gevent_monkey.py


import gevent
from gevent import monkey; monkey.patch_all()
import socket
hosts = ['www.crappytaxidermy.com', 'www.walterpottertaxidermy.com', 'www.taxidermy.net']
jobs = [gevent.spawn(socket.gethostbyname, host) for host in hosts]
gevent.joinall(jobs, timeout=5)
for job in jobs:
    print(job.value)

Execution result


python gevent_monkey.py
66.6.44.4
104.27.173.75
104.18.62.71

11.1.5 twisted

--An asynchronous, event-driven networking framework. You attach functions to events such as data arriving or a connection closing, and those functions are called when the events occur (callbacks).
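
The callback idea itself can be sketched without Twisted. The EventSource class below is a hypothetical stand-in for a framework's event loop, written only to show how registered functions get called; it is not Twisted's API.

```python
class EventSource:
    """Hypothetical stand-in for a framework's event loop, not Twisted's API."""

    def __init__(self):
        self._handlers = {}

    def on(self, event, handler):
        # Link a function to an event name.
        self._handlers.setdefault(event, []).append(handler)

    def fire(self, event, *args):
        # Call every function linked to the event, in registration order.
        for handler in self._handlers.get(event, []):
            handler(*args)

log = []
source = EventSource()
source.on("data_received", lambda data: log.append(("data", data)))
source.on("connection_lost", lambda: log.append(("lost",)))

source.fire("data_received", b"Knock, knock")
source.fire("connection_lost")
print(log)
```

In Twisted the reactor plays the role of fire(): it calls methods such as dataReceived when the corresponding network event happens.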

knock_server.py



from twisted.internet import protocol, reactor

class Knock(protocol.Protocol):
    def dataReceived(self, data):
        # In Python 3, Twisted delivers and sends bytes, not str.
        print("Client:", data)
        if data.startswith(b"Knock, knock"):
            response = b"Who is there?"
        else:
            response = data + b" who?"
        print("Server:", response)
        self.transport.write(response)

class KnockFactory(protocol.Factory):
    def buildProtocol(self, addr):
        return Knock()

reactor.listenTCP(8000, KnockFactory())
reactor.run()

knock_client.py


from twisted.internet import reactor, protocol

class KnockClient(protocol.Protocol):
    def connectionMade(self):
        # transport.write() requires bytes in Python 3.
        self.transport.write(b"Knock, knock")

    def dataReceived(self, data):
        if data.startswith(b"Who is there"):
            response = b"Disappearing client"
            self.transport.write(response)
        else:
            self.transport.loseConnection()
            reactor.stop()

class KnockFactory(protocol.ClientFactory):
    protocol = KnockClient

def main():
    f = KnockFactory()
    reactor.connectTCP('localhost', 8000, f)
    reactor.run()

if __name__ == '__main__':
    main()

11.1.7 Redis

--Redis lists let you create queues quickly.

redis_washer.py



# The "dishes" list is created on the Redis server.
import redis
conn=redis.Redis()
print("Washer is starting")
dishes=["salad","bread","entree","dessert"]
for dish in dishes:
    msg=dish.encode("utf-8")
    conn.rpush("dishes",msg) # rpush adds a new element to the end of the list.
    print("Washed",dish)
conn.rpush("dishes","quit")
print("Washer is done")

redis_dryer.py


# Wait for messages on the "dishes" list and print a message as each one is dried.
import redis
conn=redis.Redis()
print("Dryer is starting")
while True:
    msg=conn.blpop("dishes")
    if not msg:
        break
    val=msg[1].decode("utf-8")
    if val=="quit":
        break
    print("Dried",val)
print("Dishes are dried")

Execution result


$ python redis_dryer.py &
[1] 43950

$ Dryer is starting

$ python redis_washer.py
Washer is starting
Washed salad
Dried salad
Washed bread
Dried bread
Washed entree
Dried entree
Washed dessert
Dried dessert
Washer is done
Dishes are dried
[1]+  Done                    python redis_dryer.py


redis_dryer2.py



# Create multiple dryer processes.
# The dryer now also has a timeout, rather than relying only on the sentinel.
def dryer():
    import redis
    import os
    import time
    conn=redis.Redis()
    pid=os.getpid()
    timeout=20
    print("Dryer process %s is starting" %pid)
    while True:
        msg=conn.blpop("dishes",timeout) # Pop the first element of the list (LPOP); it is returned together with the key.
        if not msg:
            break
        val=msg[1].decode("utf-8") # BLPOP returns two elements: the key first, then the popped value.
        # So the value is msg[1], not msg[0].
        if val=="quit":
            break
        print("&%s: dried %s" % (pid,val))
        time.sleep(0.1)
    print("Dryer process %s is done" %pid)

import multiprocessing
DRYERS=3
for num in range(DRYERS):
    p = multiprocessing.Process(target=dryer)
    p.start()

Execution result



python redis_dryer2.py &
[1] 44162

$ Dryer process 44179 is starting
Dryer process 44178 is starting
Dryer process 44180 is starting
Dryer process 44180 is done
Dryer process 44178 is done
Dryer process 44179 is done

[1]+  Done                    python redis_dryer2.py


11.2 Network

11.2.1 Pattern

--The most common pattern is request/response (client/server).
--In publish/subscribe, a publisher sends out data and each subscriber receives a copy. Subscribers can also declare that they only want data of a particular type (a topic).
--If a topic has no subscribers, the data is ignored.

11.2.2 Publish / subscribe model

--It is a broadcast, not a queue.
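
Before bringing in Redis or ZeroMQ, the pattern can be sketched in a single process. The Broker class here is a hypothetical illustration, not a real library: every subscriber to a topic gets its own copy, and data published to a topic nobody subscribed to is simply dropped.

```python
from collections import defaultdict

class Broker:
    """Hypothetical in-process broker, for illustration only."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        self._subscribers[topic].append(callback)

    def publish(self, topic, data):
        # Every subscriber of the topic receives the data (broadcast).
        # Returns how many subscribers received it.
        callbacks = self._subscribers.get(topic, [])
        for callback in callbacks:
            callback(topic, data)
        return len(callbacks)

broker = Broker()
inbox_a, inbox_b = [], []
broker.subscribe("persian", lambda t, d: inbox_a.append((t, d)))
broker.subscribe("persian", lambda t, d: inbox_b.append((t, d)))

delivered = broker.publish("persian", "fedora")  # two subscribers
ignored = broker.publish("siamese", "bowler")    # no subscribers
print(delivered, ignored, inbox_a, inbox_b)
```

With a queue, by contrast, each item would go to exactly one consumer.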

11.2.2.1 Redis

redis_pub.py


import redis
import random

conn = redis.Redis()
cats = ['siamese', 'persian', 'maine coon', 'norweigian forest']
hats = ['stovepipe', 'bowler', 'tam-o-shanter', 'fedora']
for msg in range(10):
    cat = random.choice(cats)
    hat = random.choice(hats)
    print('Publish: %s wears a %s' % (cat, hat))
    conn.publish(cat, hat)

redis_sub.py


import redis
conn=redis.Redis()

topics=["maine coon", "persian"]
sub=conn.pubsub()
sub.subscribe(topics)
for msg in sub.listen():
    if msg["type"]=="message":
        cat=msg["channel"]
        hat=msg["data"]
        print("Subscribe: %s wears a %s" % (cat, hat))

Execution result



$ python redis_pub.py
Publish: maine coon wears a bowler
Publish: maine coon wears a bowler
Publish: norweigian forest wears a stovepipe
Publish: siamese wears a bowler
Publish: maine coon wears a bowler
Publish: norweigian forest wears a tam-o-shanter
Publish: persian wears a stovepipe
Publish: persian wears a stovepipe
Publish: persian wears a fedora
Publish: persian wears a bowler

$ python redis_sub.py
Subscribe: b'persian' wears a b'fedora'
Subscribe: b'persian' wears a b'bowler'
Subscribe: b'persian' wears a b'bowler'
Subscribe: b'maine coon' wears a b'bowler'


11.2.2.2 ZeroMQ

--ZeroMQ does not have a central server, so individual publishers write to all subscribers.

zmq_pub.py


import zmq
import random
import time
host="*"
port=6789
ctx=zmq.Context()
pub=ctx.socket(zmq.PUB)
pub.bind('tcp://%s:%s' % (host, port))
cats=["siamese", "persian", "maine coon", "norwegian forest"]
hats=["stovepipe", "bowler", "tam-o-shanter", "fedora"]
time.sleep(1)
# Note that the topic and value strings are encoded as UTF-8.
for msg in range(10):
    cat=random.choice(cats)
    cat_bytes=cat.encode("utf-8")
    hat=random.choice(hats)
    hat_bytes=hat.encode("utf-8")
    print("Publish: %s wears a %s" % (cat, hat))
    pub.send_multipart([cat_bytes, hat_bytes])

zmq_sub.py


import zmq
host="127.0.0.1"
port=6789
ctx=zmq.Context()
sub=ctx.socket(zmq.SUB)
sub.connect('tcp://%s:%s' % (host, port))
topics=["maine coon", "persian"]
for topic in topics:
    sub.setsockopt(zmq.SUBSCRIBE, topic.encode("utf-8"))
while True:
    cat_bytes, hat_bytes=sub.recv_multipart()
    cat=cat_bytes.decode("utf-8")
    hat=hat_bytes.decode("utf-8")
    print("Subscribe: %s wears a %s" % (cat, hat))

Execution result



$ python zmq_pub.py
Publish: maine coon wears a fedora
Publish: maine coon wears a stovepipe
Publish: persian wears a fedora
Publish: norwegian forest wears a fedora
Publish: persian wears a stovepipe
Publish: persian wears a fedora
Publish: norwegian forest wears a fedora
Publish: norwegian forest wears a tam-o-shanter
Publish: persian wears a stovepipe
Publish: maine coon wears a bowler

$ python zmq_sub.py
Subscribe: maine coon wears a fedora
Subscribe: maine coon wears a stovepipe
Subscribe: persian wears a fedora
Subscribe: persian wears a stovepipe
Subscribe: persian wears a fedora
Subscribe: persian wears a stovepipe
Subscribe: maine coon wears a bowler

11.2.3 TCP/IP

--The Internet is built on rules that define how to open connections, exchange data, close connections, handle timeouts, and so on. These rules are called **protocols** and are organized into **layers**.

--UDP: used for short exchanges of data.
--TCP: used for longer-lived connections than UDP.
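
As a small sketch of what a short exchange means for UDP: each sendto() is one self-contained datagram, and each recvfrom() returns exactly one of them, with no connection set up first. Binding to port 0 so that the OS picks a free port, and the 2 second timeout, are assumptions of this example. UDP itself gives no delivery or ordering guarantee, although on the loopback interface the datagrams normally arrive.

```python
import socket

# Bind to port 0 so the OS picks a free port (an assumption of this sketch).
receiver = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
receiver.bind(("127.0.0.1", 0))
receiver.settimeout(2)
address = receiver.getsockname()

sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sender.sendto(b"first", address)   # no connect() needed for UDP
sender.sendto(b"second", address)

# Each recvfrom() returns exactly one datagram.
received = [receiver.recvfrom(4096)[0] for _ in range(2)]
sender.close()
receiver.close()
print(received)
```

TCP, by contrast, requires connect() and accept() first and then delivers one continuous byte stream.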

11.2.4 Socket

udp_server.py


from datetime import datetime
import socket

server_address=("localhost",6789)
max_size=4096

print("Starting the server at", datetime.now())
print("Waiting for a client to call.")
# socket.socket() creates the socket and bind() binds it (it listens for any data arriving at that IP address and port).
# AF_INET means an Internet (IP) socket.
# SOCK_DGRAM means datagrams are sent and received, i.e. UDP.
server=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server.bind(server_address)

# recvfrom() waits for a datagram to arrive.
data, client=server.recvfrom(max_size)

print("At", datetime.now(), client , "said", data)
server.sendto(b"Are you talking to me?", client)
server.close()

udp_client.py


import socket
from datetime import datetime

server_address=("localhost",6789)
max_size=4096

print("Starting the client at", datetime.now())
client=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client.sendto(b"Hey!", server_address)
data, server=client.recvfrom(max_size)
print("At", datetime.now(), server, "said", data)
client.close()

Execution result



$ python udp_server.py
Starting the server at 2020-02-01 09:51:33.707462
Waiting for a client to call.
At 2020-02-01 09:52:24.053328 ('127.0.0.1', 54667) said b'Hey!'

$ python udp_client.py
Starting the client at 2020-02-01 09:52:48.897087
At 2020-02-01 09:52:48.898221 ('127.0.0.1', 6789) said b'Are you talking to me?'

tcp_client.py


from datetime import datetime
import socket

address=("localhost",6789)
max_size=4096

print("Starting the server at", datetime.now())
# Use SOCK_STREAM to get TCP, which is a streaming protocol.
client=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Call connect() to set up the stream.
client.connect(address)
# Note that the UDP client sent with sendto(); a connected TCP socket uses sendall().
client.sendall(b"Hey!")
data=client.recv(max_size)
print("At", datetime.now(), "someone replied", data)
client.close()

tcp_server.py


from datetime import datetime
import socket

address=("localhost",6789)
max_size=4096

print("Starting the server at", datetime.now())
print("Waiting for a client to call.")
server=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(address)
server.listen(5)

client, addr=server.accept()
data=client.recv(max_size)


print("At", datetime.now(), client , "said", data)
client.sendall(b"Are you talking to me?")
# Close the connection to the client first, then the listening socket.
client.close()
server.close()

Execution result



$ python tcp_server.py
Starting the server at 2020-02-01 10:16:53.333266
Waiting for a client to call.
At 2020-02-01 10:16:57.520042 <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 6789), raddr=('127.0.0.1', 49223)> said b'Hey!'


$ python tcp_client.py
Starting the server at 2020-02-01 10:15:25.298030
At 2020-02-01 10:15:25.301961 someone replied b''


11.2.5 ZeroMQ

--ZeroMQ is a library, but it is sometimes described as enhanced sockets. ZeroMQ sockets provide:
--Exchange of whole messages
--Retrying connections
--Buffering to preserve data when the timing of sender and receiver does not match
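
To see why whole-message exchange matters, here is a sketch of what you would have to do by hand with a plain stream socket: prefix each message with its length and read until the full message has arrived. The helper names (send_message, recv_exactly, recv_message) and the 4-byte length header are assumptions of this example. This is not how ZeroMQ is implemented, only an illustration of the problem it solves for you.

```python
import socket
import struct

def send_message(sock, payload):
    # 4-byte big-endian length header, then the payload.
    sock.sendall(struct.pack("!I", len(payload)) + payload)

def recv_exactly(sock, size):
    # recv() may return fewer bytes than requested, so loop until done.
    chunks = []
    while size > 0:
        chunk = sock.recv(size)
        if not chunk:
            raise ConnectionError("socket closed mid-message")
        chunks.append(chunk)
        size -= len(chunk)
    return b"".join(chunks)

def recv_message(sock):
    (length,) = struct.unpack("!I", recv_exactly(sock, 4))
    return recv_exactly(sock, length)

# socketpair() gives two connected stream sockets in one process.
left, right = socket.socketpair()
send_message(left, b"message #1")
send_message(left, b"message #2")
messages = [recv_message(right), recv_message(right)]
left.close()
right.close()
print(messages)
```

With ZeroMQ, send() and recv() already work on whole messages, so none of this framing code is needed.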

zmq_server.py


import zmq

host="127.0.0.1"
port=6789
# Context() creates a ZeroMQ object that manages state.
context=zmq.Context()
# The server creates a REP (reply) socket to issue responses.
server=context.socket(zmq.REP)
# Have the server listen on a specific IP address and port.
server.bind("tcp://%s:%s" %(host, port))
while True:
    # recv() waits for the next request from a client.
    request_bytes=server.recv()
    request_str=request_bytes.decode("utf-8")
    print("That voice in my head says: %s" %request_str)
    reply_str="Stop saying:%s" %request_str
    reply_bytes=bytes(reply_str, "utf-8")
    server.send(reply_bytes)

zmq_client.py


import zmq

host="127.0.0.1"
port=6789

context=zmq.Context()
# The client creates a REQ (request) socket to send requests to the server.
client=context.socket(zmq.REQ)
# The client uses connect(), not bind().
client.connect("tcp://%s:%s" %(host, port))
for num in range(1,6):
    # Send a request, then wait for the reply with recv().
    request_str="message #%s" % num
    request_bytes=request_str.encode("utf-8")
    client.send(request_bytes)
    reply_bytes=client.recv()
    reply_str=reply_bytes.decode("utf-8")
    print("Sent %s, received %s" % (request_str, reply_str))

Execution result



$ python zmq_client.py
That voice in my head says: message #1
Sent message #1, received Stop saying:message #1
That voice in my head says: message #2
Sent message #2, received Stop saying:message #2
That voice in my head says: message #3
Sent message #3, received Stop saying:message #3
That voice in my head says: message #4
Sent message #4, received Stop saying:message #4
That voice in my head says: message #5
Sent message #5, received Stop saying:message #5


python zmq_server.py &
[2] 47417
$ Traceback (most recent call last):
  File "zmq_server.py", line 8, in <module>
    server.bind("tcp://%s:%s" %(host, port))
  File "zmq/backend/cython/socket.pyx", line 550, in zmq.backend.cython.socket.Socket.bind
  File "zmq/backend/cython/checkrc.pxd", line 25, in zmq.backend.cython.checkrc._check_rc
zmq.error.ZMQError: Address already in use

11.2.7 Internet Services

11.2.7.1 DNS


>>> import socket
>>> socket.gethostbyname("www.crappytaxidermy.com")
'66.6.44.4'
>>> socket.gethostbyname_ex("www.crappytaxidermy.com")
('crappytaxidermy.com', ['www.crappytaxidermy.com'], ['66.6.44.4'])

>>> socket.getaddrinfo("www.crappytaxidermy.com",80)
[(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_DGRAM: 2>, 17, '', ('66.6.44.4', 80)), (<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('66.6.44.4', 80))]
>>> socket.getaddrinfo("www.crappytaxidermy.com",80,socket.AF_INET,
... socket.SOCK_STREAM)
[(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('66.6.44.4', 80))]

>>> import socket
>>> socket.getservbyname("http")
80
>>> socket.getservbyport(80)
'http'


11.2.9 Remote processing

11.2.9.1 RPC

xmlrpc_server.py



from xmlrpc.server import SimpleXMLRPCServer

def double(num):
    return num*2

server=SimpleXMLRPCServer(("localhost",6666))
server.register_function(double,"double")
server.serve_forever()

xmlrpc_client.py


import xmlrpc.client

proxy=xmlrpc.client.ServerProxy("http://localhost:6666/")
num=7
result=proxy.double(num)
print("Double %s is %s" % (num, result))

Execution result



$ python xmlrpc_client.py
Double 7 is 14

$ python xmlrpc_server.py
127.0.0.1 - - [01/Feb/2020 14:54:50] "POST / HTTP/1.1" 200 -
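
For quick experiments it can be handy to run the server and the client in one script. The following is a sketch under a few assumptions: the server runs in a background thread, port 0 asks the OS for a free port instead of the fixed 6666, and logRequests=False turns off the access log shown above.

```python
import threading
import xmlrpc.client
from xmlrpc.server import SimpleXMLRPCServer

def double(num):
    return num * 2

# Port 0 lets the OS choose a free port; logRequests=False silences the access log.
server = SimpleXMLRPCServer(("127.0.0.1", 0), logRequests=False)
server.register_function(double, "double")
port = server.server_address[1]

thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()

proxy = xmlrpc.client.ServerProxy("http://127.0.0.1:%s/" % port)
result = proxy.double(7)

server.shutdown()       # stops serve_forever()
server.server_close()   # releases the listening socket
print("Double 7 is", result)
```

The client code is unchanged from xmlrpc_client.py apart from the port; the call still travels over HTTP on the loopback interface.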

msgpack_server.py


from msgpackrpc import Server, Address

class Services():
    def double(self, num):
        return num*2

server =Server(Services())
server.listen(Address("localhost",5555))
server.start()


msgpack_client.py


from msgpackrpc import Client,Address

client=Client(Address("localhost",5555))
num=8
result=client.call("double",num)
print("Double %s is %s" % (num, result))


Execution result


$ python msgpack_client.py
Double 8 is 16

11.3 Review assignment

11-1 Implement a current-time service using plain sockets. When a client sends the string time to the server, the server returns the current date and time as an ISO string.

udp_time_server.py


import socket
from datetime import datetime

server_address=("localhost",6111)
max_size=4096

print("Starting the client at", datetime.now())
server=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server.bind(server_address)

while True:
    data, client_addr=server.recvfrom(max_size)
    if data == b"time":
        now=str(datetime.utcnow())
        data=now.encode("utf-8")
        server.sendto(data, client_addr)
        print("Server sent", data)
server.close()


udp_time_client.py


import socket
from datetime import datetime
from time import sleep

address=("localhost",6111)
max_size=4096

print("Starting the client at", datetime.now())
client=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
    sleep(5)
    client.sendto(b"time", address)
    data, server_addr=client.recvfrom(max_size)
    print("Client read", data)
client.close()

Execution result


$ python udp_time_server.py
Starting the client at 2020-02-01 17:11:51.527771
Server sent b'2020-02-01 08:11:59.365796'
Server sent b'2020-02-01 08:12:04.370495'
Server sent b'2020-02-01 08:12:09.371627'

$ python udp_time_client.py
Starting the client at 2020-02-01 17:10:03.510044
Client read b'2020-02-01 08:10:08.514726'
Client read b'2020-02-01 08:10:13.521450'
Client read b'2020-02-01 08:10:18.527667'
Client read b'2020-02-01 08:10:23.529492'
Client read b'2020-02-01 08:10:28.531994'
Client read b'2020-02-01 08:10:33.535134'
Client read b'2020-02-01 08:10:38.541067'
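
The exercise asks for an ISO string, while str() on a datetime puts a space between the date and the time. A possible variation, sketched here with a hypothetical helper named iso_now, is to use isoformat() on a timezone-aware UTC time:

```python
from datetime import datetime, timezone

def iso_now():
    # Timezone-aware UTC timestamp in ISO 8601 form.
    return datetime.now(timezone.utc).isoformat()

stamp = iso_now()
reply = stamp.encode("utf-8")  # bytes ready to pass to sendto()
# The receiving side can turn the text back into a datetime.
parsed = datetime.fromisoformat(reply.decode("utf-8"))
print(stamp)
```

The result has a T between date and time and a +00:00 offset, so the client does not have to guess the time zone.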


11-2 Let's do the same with ZeroMQ's REQ and REP sockets.

zmq_time_server.py



import zmq
from datetime import datetime

host="127.0.0.1"
port=1111

context=zmq.Context()
server=context.socket(zmq.REP)
server.bind("tcp://%s:%s" % (host, port))
print("Server started at", datetime.utcnow())

while True:
    message=server.recv()
    if message == b"time":
        now=datetime.utcnow()
        reply=str(now)
        server.send(bytes(reply,"utf-8"))
        print("Server sent", reply)

zmq_time_client.py


import zmq
from datetime import datetime
from time import sleep

host="127.0.0.1"
port=1111

context=zmq.Context()
# The client needs a REQ socket and connect(); bind() is for the server.
client=context.socket(zmq.REQ)
client.connect("tcp://%s:%s" % (host, port))
print("Client started at", datetime.utcnow())

while True:
    sleep(5)
    request=b"time"
    client.send(request)
    reply=client.recv()
    print("Client received", reply)

Execution result



$ python zmq_time_server.py
Server started at 2020-02-01 08:27:16.448842

11-3 Let's do the same with XMLRPC.

xmlrpc_time_server.py


from xmlrpc.server import SimpleXMLRPCServer
from datetime import datetime

def current_time():
    now = str(datetime.now())
    print('Server sent %s' % now)
    return now

server = SimpleXMLRPCServer(("localhost", 6789))
server.register_function(current_time, "current_time")
server.serve_forever()


cmlrpc_time_client.py


import xmlrpc.client
from datetime import datetime
from time import sleep

proxy = xmlrpc.client.ServerProxy("http://localhost:6789/")
while True:
    sleep(3)
    result = proxy.current_time()
    print("Current time is %s" % result)

Execution result



$ python xmlrpc_time_server.py
Server sent 2020-02-01 17:44:06.341654
127.0.0.1 - - [01/Feb/2020 17:44:06] "POST / HTTP/1.1" 200 -
Server sent 2020-02-01 17:44:09.346517
127.0.0.1 - - [01/Feb/2020 17:44:09] "POST / HTTP/1.1" 200 -
Server sent 2020-02-01 17:44:12.352605
127.0.0.1 - - [01/Feb/2020 17:44:12] "POST / HTTP/1.1" 200 -

$ python cmlrpc_time_client.py
Current time is 2020-02-01 17:44:06.341654
Current time is 2020-02-01 17:44:09.346517
Current time is 2020-02-01 17:44:12.352605

11-4

redis_choc_supply.py


import redis
import random
from time import sleep

conn=redis.Redis()
varieties=["T","C","C","N"]
conveyor="Chocolates"

while True:
    seconds=random.random()
    sleep(seconds)
    piece=random.choice(varieties)
    conn.rpush(conveyor, piece)

redis_lucy.py


import redis
from datetime import datetime
from time import sleep

conn=redis.Redis()
timeout=10
conveyor="Chocolates"
while True:
    sleep(0.5)
    msg=conn.blpop(conveyor, timeout)
    remaining=conn.llen(conveyor)
    if msg:
        piece=msg[1]
        print("Lucy got a", piece, "at", datetime.utcnow(),
        ", only", remaining, "left")

Execution result


$ python redis_lucy.py
Lucy got a b'T' at 2020-02-01 09:05:54.780153 , only 116 left
Lucy got a b'N' at 2020-02-01 09:05:55.282109 , only 117 left
Lucy got a b'T' at 2020-02-01 09:05:55.783487 , only 117 left
Lucy got a b'N' at 2020-02-01 09:05:56.284971 , only 118 left
Lucy got a b'C' at 2020-02-01 09:05:56.787798 , only 118 left
Lucy got a b'T' at 2020-02-01 09:05:57.289434 , only 117 left
Lucy got a b'N' at 2020-02-01 09:05:57.794357 , only 118 left
Lucy got a b'C' at 2020-02-01 09:05:58.295897 , only 119 left
Lucy got a b'C' at 2020-02-01 09:05:58.800536 , only 119 left
Lucy got a b'C' at 2020-02-01 09:05:59.303087 , only 119 left
Lucy got a b'C' at 2020-02-01 09:05:59.805465 , only 119 left
Lucy got a b'C' at 2020-02-01 09:06:00.308003 , only 119 left
Lucy got a b'C' at 2020-02-01 09:06:00.810408 , only 120 left
Lucy got a b'C' at 2020-02-01 09:06:01.312918 , only 120 left
Lucy got a b'C' at 2020-02-01 09:06:01.818497 , only 119 left
Lucy got a b'N' at 2020-02-01 09:06:02.324028 , only 120 left
Lucy got a b'C' at 2020-02-01 09:06:02.826697 , only 119 left
Lucy got a b'T' at 2020-02-01 09:06:03.329229 , only 120 left
Lucy got a b'T' at 2020-02-01 09:06:03.835205 , only 120 left


11-5 Use ZeroMQ to publish the poem from exercise 7.3 one word at a time. Also write a ZeroMQ subscriber that displays the words that start with a vowel, and another subscriber that displays the five-letter words.

poem_pub.py


import string
import zmq
from time import sleep

host="127.0.0.1"
port=9999
ctx=zmq.Context()
pub=ctx.socket(zmq.PUB)
pub.bind("tcp://%s:%s" % (host, port))
sleep(1)

with open("mammonth.txt","rt") as poem:
    words=poem.read()
for word in words.split():
    word=word.strip(string.punctuation)
    data=word.encode("utf-8")
    if word.startswith(("a", "e", "i", "u","o","A","E","I","U","O")):
        print("vowels",data)
        pub.send_multipart([b"vowels", data])
    if len(word) ==5:
        print("five",data)
        pub.send_multipart([b"five", data])

poem_sub.py


import string
import zmq

host="127.0.0.1"
port=9999
ctx=zmq.Context()
sub=ctx.socket(zmq.SUB)
sub.connect("tcp://%s:%s" % (host,port))
sub.setsockopt(zmq.SUBSCRIBE, b"vowels")
sub.setsockopt(zmq.SUBSCRIBE, b"five")

while True:
    topic, word=sub.recv_multipart()
    print(topic,word)

Execution result


#First few lines
$ python poem_pub.py
five b'queen'
vowels b'of'
five b'Lying'
vowels b'at'
vowels b'ease'
vowels b'evening'
five b'flies'


$ python poem_sub.py
b'five' b'queen'
b'vowels' b'of'
b'five' b'Lying'
b'vowels' b'at'
b'vowels' b'ease'
b'vowels' b'evening'
b'five' b'flies'
b'five' b'seize'

Impressions

This was a rather rough review. There was a lot of material, but these days I suspect I would use a cloud service such as AWS or OpenStack rather than set up a server myself. My understanding of parallel processing and multiprocessing is still vague, so I expect I will have to learn it as I use it.

References

"Introduction to Python3 by Bill Lubanovic (published by O'Reilly Japan)"

"Queue --- synchronous queue class" https://docs.python.org/ja/3/library/queue.html#queue.Queue.task_done

"Threading --- thread-based parallel processing" https://docs.python.org/ja/3/library/threading.html

"Multiprocessing --- Process-based parallel processing" https://docs.python.org/ja/3/library/multiprocessing.html#pipes-and-queues

"Redis" http://redis.shibu.jp/commandreference/lists.html
