When you're waiting for something on your computer, there are two reasons:
--I / O bound: Waiting for I / O processing --CPU bound: CPU wait
Term for concurrency.
--Synchronous: Process in sequence like a funeral procession. --Non-dynamic: The task is processed independently.
--Cue elements are added to one end and removed from the other end.
dishes.py
import multiprocessing as mp
def washer(dishes, output):
for dish in dishes:
print("Washing",dish, "dish")
output.put(dish)
#get()Removes the item from the queue and returns it.
#task_done is get()When called after, it tells the queue that the process is complete.
#join()Blocks until all items in the queue are retrieved.
def dryer(input):
while True:
dish=input.get()
print("Drying",dish,"dish")
input.task_done()
dish_queue=mp.JoinableQueue()
dryer_proc=mp.Process(target=dryer, args=(dish_queue,))
dryer_proc.daemon=True
dryer_proc.start()
dishes=["salad","bread","entree","dessert"]
washer(dishes, dish_queue)
dish_queue.join()
Execution result
python dishes.py
Washing salad dish
Washing bread dish
Washing entree dish
Washing dessert dish
Drying salad dish
Drying bread dish
Drying entree dish
Drying dessert dish
--Threads run in a process and have access to everything in the process. --The difference between multiprocessing and threading is that threading does not have a terminate () function.
threads.py
import threading
def do_this(what):
whoami(what)
#threading.current_thread()Creates an object corresponding to the thread of function call processing.
def whoami(what):
print("Thread %s says: %s" % (threading.current_thread(),what))
#When you create a thread object, the thread starts()Call the method to start the activity.
#target is a callable object invoked by the run method.
#args is an argument tuple when calling target.
if __name__=="__main__":
whoami("I am the main program")
for n in range(4):
p=threading.Thread(target=do_this, args=("I am function %s" % n,))
p.start()
Execution result
python threads.py
Thread <_MainThread(MainThread, started 4530769344)> says: I am the main program
Thread <Thread(Thread-1, started 123145448275968)> says: I am function 0
Thread <Thread(Thread-2, started 123145453531136)> says: I am function 1
Thread <Thread(Thread-3, started 123145448275968)> says: I am function 2
Thread <Thread(Thread-4, started 123145453531136)> says: I am function 3
thread_dishes.py
import threading, dish_queue
import time
def washer(dishes, dish_queue):
for dish in dishes:
print("Washing",dish)
time.sleep(5)
dish_queue.put(dish) #Queue item.
def dryer(dish_queue):
while True:
dish=dish_queue.get() #Removes an item from the queue and returns it.
print("Drying", dish)
time.sleep(10)
dish_queue.task_done() #get()After task_done()Is called to indicate that the processing for the retrieved task is completed.
dish_queue =queue.Queue() #FIFO queue constructor.
for n in range(2):
dryer_thread=threading.Thread(target=dryer, args=(dish_queue,))
dryer_thread.start()
dishes=["salad","bread","entree","dessert"]
washer(dishes, dish_queue)
dish_queue.join() #Block until all items in the queue are retrieved and processed. Unblock when the task is complete.
--gevent rewrites many of Python's standard objects such as sockets so that they use the gevent mechanism without blocking.
gevent_test.py
import gevent
from gevent import monkey
monkey.patch_socket()
hosts = ['www.crappytaxidermy.com', 'www.walterpottertaxidermy.com', 'www.taxidermy.net']
jobs = [gevent.spawn(gevent.socket.gethostbyname, host) for host in hosts]
gevent.joinall(jobs, timeout=5)
for job in jobs:
print(job.value)
Execution result
python gevent_test.py
66.6.44.4
104.27.172.75
104.18.63.71
gevent_monkey.py
import gevent
from gevent import monkey; monkey.patch_all()
import socket
hosts = ['www.crappytaxidermy.com', 'www.walterpottertaxidermy.com', 'www.taxidermy.net']
jobs = [gevent.spawn(socket.gethostbyname, host) for host in hosts]
gevent.joinall(jobs, timeout=5)
for job in jobs:
print(job.value)
Execution result
python gevent_monkey.py
66.6.44.4
104.27.173.75
104.18.62.71
11.1.5 twisted
--Asynchronous event-driven networking framework. When a function is linked to an event such as data reception or disconnection, the linked function is called when those events occur. (Callback)
knock_sever.py
from twisted.internet import protocol, reactor
class Knock(protocol.Protocol):
def dataReceived(self, data):
print(Client, data)
if data.startswith("Knock, knock"):
response = "Who is there?"
else:
response = data + " who?"
print(Server, response)
self.transport.write(response)
class KnockFactory(protocol.Factory):
def buildProtocol(self, addr):
return Knock()
reactor.listenTCP(8000, KnockFactory())
reactor.run()
knock_client.py
from twisted.internet import reactor, protocol
class KnockClient(protocol.Protocol):
def connectionMade(self):
self.transport.write("Knock, knock")
def dataReceived(self, data):
if data.startswith("Who is there"):
response = "Disappearing client"
self.transport.write(response)
else:
self.transport.loseConnection()
reactor.stop()
class KnockFactory(protocol.ClientFactory):
protocol = KnockClient
def main():
f = KnockFactory()
reactor.connectTCP('localhost', 8000, f)
reactor.run()
if __name__ == '__main__':
main()
11.1.7 Redis
--The Redis list allows you to quickly create cues.
redis_washer.py
#"dishes"Is generated in the Redis server.
import redis
conn=redis.Redis()
print("Washer is starting")
dishes=["salad","bread","entree","dessert"]
for dish in dishes:
msg=dish.encode("utf-8")
conn.rpush("dishes",msg)#rpush adds a new element to the end.
print("Washed",dish)
conn.rpush("dishes","quit")
print("Washer is done")
redis_dryer.py
#"dishes"Wait for the messages that are marked with, and display a message indicating that each has been dried.
import redis
conn=redis.Redis()
print("Dryer is starting")
while True:
msg=conn.blpop("dishes")
if not msg:
break
val=msg[1].decode("utf-8")
if val=="quit":
break
print("Dried",val)
print("Dishes are dried")
Execution result
$ python redis_dryer.py &
[1] 43950
$ Dryer is starting
$ python redis_washer.py
Washer is starting
Washed salad
Dried salad
Washed bread
Dried bread
Washed entree
Dried entree
Washed dessert
Dried dessert
Washer is done
Dishes are dried
[1]+ Done python redis_dryer.py
redis_dryer2.py
#Create multiple dryer processes.
#Added a timeout feature to the dryer process instead of looking for a sentinel.
def dryer():
import redis
import os
import time
conn=redis.Redis()
pid=os.getpid()
timeout=20
print("Dryer process % is starting" %pid)
while True:
msg=conn.blpop("dishes",timeout)#Extract the first element of the list(LPOP)Return with key.
if not msg:
break
val=msg[1].decode("utf-8")#BLPOP is an array of two elements, the first element being the key and the second element being the popped value.
#Therefore, msg[0]Not msg[1]Will be.
if val=="quit":
break
print("&%s: dried %s" % (pid,val))
time.sleep(0.1)
print("Dryer process %s is done" %pid)
import multiprocessing
DRYERS=3
for num in range(DRYERS):
p = multiprocessing.Process(target=dryer)
p.start()
Execution result
python redis_dryer2.py &
[1] 44162
$ Dryer process 44179s starting
Dryer process 44178s starting
Dryer process 44180s starting
Dryer process 44180 is done
Dryer process 44178 is done
Dryer process 44179 is done
[1]+ Done python redis_dryer2.py
--The general pattern is request / response (client / server) --Publish / subscribe is where the publisher sends the data and the subscriber receives a copy. Subscribers can also specify that they want to receive only specific type data (topics). --If there are no subscribers to the topic, the data will be ignored.
--Broadcast, not queue.
11.2.2.1 Redis
redis_pub.py
import redis
import random
conn = redis.Redis()
cats = ['siamese', 'persian', 'maine coon', 'norweigian forest']
hats = ['stovepipe', 'bowler', 'tam-o-shanter', 'fedora']
for msg in range(10):
cat = random.choice(cats)
hat = random.choice(hats)
print('Publish: %s wears a %s' % (cat, hat))
conn.publish(cat, hat)
redis_sub.py
import redis
conn=redis.Redis()
topics=["maine coon", "persian"]
sub=conn.pubsub()
sub.subscribe(topics)
for msg in sub.listen():
if msg["type"]=="message":
cat=msg["channel"]
hat=msg["data"]
print("Subscribe: %s wears a %s" % (cat, hat))
Execution result
$ python redis_pub.py
Publish: maine coon wears a bowler
Publish: maine coon wears a bowler
Publish: norweigian forest wears a stovepipe
Publish: siamese wears a bowler
Publish: maine coon wears a bowler
Publish: norweigian forest wears a tam-o-shanter
Publish: persian wears a stovepipe
Publish: persian wears a stovepipe
Publish: persian wears a fedora
Publish: persian wears a bowler
$ python redis_sub.py
Subscribe: b'persian' wears a b'fedora'
Subscribe: b'persian' wears a b'bowler'
Subscribe: b'persian' wears a b'bowler'
Subscribe: b'maine coon' wears a b'bowler'
11.2.2.2 ZeroMQ
--ZeroMQ does not have a central server, so individual publishers write to all subscribers.
zmq_pub.py
import zmq
import random
import time
host="*"
port=6789
ctx=zmq.Context()
pub=ctx.socket(zmq.PUB)
pub.bind('tcp://%s:%s' % (host, port))
cats=["siamese", "persian", "maine coon", "norwegian forest"]
hats=["stovepipe", "bowler", "tam-o-shanter", "fedora"]
time.sleep(1)
#UTF on topic and value strings-Note that we are using 8.
for msg in range(10):
cat=random.choice(cats)
cat_bytes=cat.encode("utf-8")
hat=random.choice(hats)
hat_bytes=hat.encode("utf-8")
print("Publish: %s wears a %s" % (cat, hat))
pub.send_multipart([cat_bytes, hat_bytes])
zmq_sub.py
import zmq
host="127.0.0.1"
port=6789
ctx=zmq.Context()
sub=ctx.socket(zmq.SUB)
sub.connect('tcp://%s:%s' % (host, port))
topics=["maine coon", "persian"]
for topic in topics:
sub.setsockopt(zmq.SUBSCRIBE, topic.encode("utf-8"))
while True:
cat_bytes, hat_bytes=sub.recv_multipart()
cat=cat_bytes.decode("utf-8")
hat=hat_bytes.decode("utf-8")
print("Subscribe: %s wears a %s" % (cat, hat))
Execution result
$ python zmq_pub.py
Publish: maine coon wears a fedora
Publish: maine coon wears a stovepipe
Publish: persian wears a fedora
Publish: norwegian forest wears a fedora
Publish: persian wears a stovepipe
Publish: persian wears a fedora
Publish: norwegian forest wears a fedora
Publish: norwegian forest wears a tam-o-shanter
Publish: persian wears a stovepipe
Publish: maine coon wears a bowler
$ python zmq_sub.py
Subscribe: maine coon wears a fedora
Subscribe: maine coon wears a stovepipe
Subscribe: persian wears a fedora
Subscribe: persian wears a stovepipe
Subscribe: persian wears a fedora
Subscribe: persian wears a stovepipe
Subscribe: maine coon wears a bowler
11.2.3 TCP/IP
――The Internet is based on rules that determine how to open connections, exchange data, disconnect connections, handle timeouts, and so on. These are called ** protocols ** and are divided into ** layers **.
-UDP: Used for exchanging short data. --TCP: Used for longer-lived connections than UDP.
udp_server.py
from datetime import datetime
import socket
server_address=("localhost",6789)
max_size=4096
print("Starting the server at", datetime.now())
print("Waiting for a client to call.")
#The first method creates a socket and the second method binds to the socket.(Listen for any data that arrives at that IP address and port.)
#AF_INET means to make the Internet.
#SOCK_DGRAM uses UDP in the sense that it sends and receives datagrams.
server=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server.bind(server_address)
#recvfrom is waiting for the datagram to arrive.
data, client=server.recvfrom(max_size)
print("At", datetime.now(), client , "said", data)
server.sendto(b"Are you talking to me?", client)
server.close()
udp_client.py
import socket
from datetime import datetime
server_address=("localhost",6789)
max_size=4096
print("Starting the client at", datetime.now())
client=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client.sendto(b"Hey!", server_address)
data, server=client.recvfrom(max_size)
print("At", datetime.now(), server, "said", data)
client.close()
Execution result
$ python udp_server.py
Starting the server at 2020-02-01 09:51:33.707462
Waiting for a client to call.
At 2020-02-01 09:52:24.053328 ('127.0.0.1', 54667) said b'Hey!'
$ python udp_client.py
Starting the client at 2020-02-01 09:52:48.897087
At 2020-02-01 09:52:48.898221 ('127.0.0.1', 6789) said b'Are you talking to me?'
tcp_client.py
from datetime import datetime
import socket
address=("localhost",6789)
max_size=4096
print("Starting the server at", datetime.now())
#SOCK to use TCP, which is a streaming protocol_Use STREAM.
client=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#Connect to set up the stream()Add
client.connect(address)
#UDP server responds to client.sendto()Note that I was calling.
client.sendall(b"Hey!")
data=client.recv(max_size)
print("At", datetime.now(), "someone replied", data)
client.close()
tcp_server.py
from datetime import datetime
import socket
address=("localhost",6789)
max_size=4096
print("Starting the server at", datetime.now())
print("Waiting for a client to call.")
server=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(address)
server.listen(5)
client, addr=server.accept()
data=client.recv(max_size)
print("At", datetime.now(), client , "said", data)
client.sendall(b"Are you talking to me?")
server.close()
server.close()
Execution result
$ python tcp_server.py
Starting the server at 2020-02-01 10:16:53.333266
Waiting for a client to call.
At 2020-02-01 10:16:57.520042 <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 6789), raddr=('127.0.0.1', 49223)> said b'Hey!'
$ python tcp_client.py
Starting the server at 2020-02-01 10:15:25.298030
At 2020-02-01 10:15:25.301961 someone replied b''
11.2.5 ZeroMQ
--ZeroMQ is a library, but it is sometimes called an enhanced socket. --Whole message exchange --Retry connection --Buffering to protect data when the timing does not match during transmission and reception.
zmq_server.py
import zmq
host="127.0.0.1"
port=6789
#Context()Creates a ZeroMQ object that manages the state
context=zmq.Context()
#The server issues a response REP.
server=context.socket(zmq.REP)
#I want the server to listen for a specific IP address and pod.
server.bind("tcp://%s:%s" %(host, port))
while True:
#wait for next request from recv()
request_bytes=server.recv()
request_str=request_bytes.decode("utf-8")
print("That voice in my head says: %s" %request_str)
reply_str="Stop saying:%s" %request_str
reply_bytes=bytes(reply_str, "utf-8")
server.send(reply_bytes)
zmq_client.py
import zmq
host="127.0.0.1"
port=6789
context=zmq.Context()
#The client issues a request REQ to the server.
client=context.socket(zmq.REQ)
#bind()Not connect()use.
client.connect("tcp://%s:%s" %(host, port))
for num in range(1,6):
#wait for next request from recv()
request_str="message #%s" % num
request_bytes=request_str.encode("utf-8")
client.send(request_bytes)
reply_bytes=client.recv()
reply_str=reply_bytes.decode("utf-8")
print("Sent %s, received %s" % (request_str, reply_str))
Execution result
$ python zmq_client.py
That voice in my head says: message #1
Sent message #1, received Stop saying:message #1
That voice in my head says: message #2
Sent message #2, received Stop saying:message #2
That voice in my head says: message #3
Sent message #3, received Stop saying:message #3
That voice in my head says: message #4
Sent message #4, received Stop saying:message #4
That voice in my head says: message #5
Sent message #5, received Stop saying:message #5
python zmq_server.py &
[2] 47417
$ Traceback (most recent call last):
File "zmq_server.py", line 8, in <module>
server.bind("tcp://%s:%s" %(host, port))
File "zmq/backend/cython/socket.pyx", line 550, in zmq.backend.cython.socket.Socket.bind
File "zmq/backend/cython/checkrc.pxd", line 25, in zmq.backend.cython.checkrc._check_rc
zmq.error.ZMQError: Address already in use
11.2.7.1 DNS
>>> import socket
>>> socket.gethostbyname("www.crappytaxidermy.com")
`66.6.44.4`
>>> socket.gethostbyname_ex("www.crappytaxidermy.com")
(`crappytaxidermy.com`, [`www.crappytaxidermy.com`], [`66.6.44.4`])
>>> socket.getaddrinfo("www.crappytaxidermy.com",80)
[(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_DGRAM: 2>, 17, ``, (`66.6.44.4`, 80)), (<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, ``, (`66.6.44.4`, 80))]
>>> socket.getaddrinfo("www.crappytaxidermy.com",80,socket.AF_INET,
... socket.SOCK_STREAM)
[(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, ``, (`66.6.44.4`, 80))]
>>> import socket
>>> socket.getservbyname("http")
80
>>> socket.getservbyport(80)
`http`
11.2.9.1 RPC
xmlrpc_server.py
from xmlrpc.server import SimpleXMLRPCServer
def double(num):
return num*2
server=SimpleXMLRPCServer(("localhost",6666))
server.register_function(double,"double")
server.serve_forever()
xmlrpc_client.py
import xmlrpc.client
proxy=xmlrpc.client.ServerProxy("http://localhost:6666/")
num=7
result=proxy.double(num)
print("Double %s is %s" % (num, result))
Execution result
$ python xmlrpc_client.py
Double 7 is 14
$ python xmlrpc_server.py
127.0.0.1 - - [01/Feb/2020 14:54:50] "POST / HTTP/1.1" 200 -
msgpack_server.py
from msgpackrpc import Server, Address
class Services():
def double(self, num):
return num*2
server =Server(Services())
server.listen(Address("localhost",5555))
server.start()
msgpack_client.py
from msgpackrpc import Client,Address
client=Client(Address("localhost",5555))
num=8
result=client.call("double",num)
print("Double %s is %s" % (num, result))
Execution result
$ python msppack_client.py
Double 8 is 16
udp_time_server.py
import socket
from datetime import datetime
server_address=("localhost",6111)
max_size=4096
print("Starting the client at", datetime.now())
server=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server.bind(server_address)
while True:
data, client_addr=server.recvfrom(max_size)
if data == b"time":
now=str(datetime.utcnow())
data=now.encode("utf-8")
server.sendto(data, client_addr)
print("Server sent", data)
server.close()
udp_time_client.py
import socket
from datetime import datetime
from time import sleep
address=("localhost",6111)
max_size=4096
print("Starting the client at", datetime.now())
client=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
sleep(5)
client.sendto(b"time", address)
data, server_addr=client.recvfrom(max_size)
print("Client read", data)
client.close()
Execution result
$ python udp_time_server.py
Starting the client at 2020-02-01 17:11:51.527771
Server sent b'2020-02-01 08:11:59.365796'
Server sent b'2020-02-01 08:12:04.370495'
Server sent b'2020-02-01 08:12:09.371627'
$ python udp_time_client.py
Starting the client at 2020-02-01 17:10:03.510044
Client read b'2020-02-01 08:10:08.514726'
Client read b'2020-02-01 08:10:13.521450'
Client read b'2020-02-01 08:10:18.527667'
Client read b'2020-02-01 08:10:23.529492'
Client read b'2020-02-01 08:10:28.531994'
Client read b'2020-02-01 08:10:33.535134'
Client read b'2020-02-01 08:10:38.541067'
zmq_time_server.py
import zmq
from datetime import datetime
host="127.0.0.1"
port=1111
context=zmq.Context()
server=context.socket(zmq.REP)
server.bind("tcp://%s:%s" % (host, port))
print("Server started at", datetime.utcnow())
while True:
message=server.recv()
if message == b"time":
now=datetime.utcnow()
reply=str(now)
server.send(bytes(reply,"utf-8"))
print("Server sent", reply)
zmq_time_client.py
import zmq
from datetime import datetime
from time import sleep
host="127.0.0.1"
port=1111
context=zmq.Context()
server=context.socket(zmq.REQ)
server.bind("tcp://%s:%s" % (host, port))
print("Client started at", datetime.utcnow())
while True:
sleep(5)
requst=b"time"
client.send(request)
reply=client.recv()
print("Client sent", reply)
Execution result
$ python zmq_time_server.py
Server started at 2020-02-01 08:27:16.448842
xmlrpc_time_server.py
from xmlrpc.server import SimpleXMLRPCServer
from datetime import datetime
def current_time():
now = str(datetime.now())
print('Server sent %s', now)
return now
server = SimpleXMLRPCServer(("localhost", 6789))
server.register_function(current_time, "current_time")
server.serve_forever()
cmlrpc_time_client.py
import xmlrpc.client
from datetime import datetime
from time import sleep
proxy = xmlrpc.client.ServerProxy("http://localhost:6789/")
while True:
sleep(3)
result = proxy.current_time()
print("Current time is %s" % result)
Execution result
$ python xmlrpc_time_server.py
Server sent %s 2020-02-01 17:44:06.341654
127.0.0.1 - - [01/Feb/2020 17:44:06] "POST / HTTP/1.1" 200 -
Server sent %s 2020-02-01 17:44:09.346517
127.0.0.1 - - [01/Feb/2020 17:44:09] "POST / HTTP/1.1" 200 -
Server sent %s 2020-02-01 17:44:12.352605
127.0.0.1 - - [01/Feb/2020 17:44:12] "POST / HTTP/1.1" 200 -
$ python cmlrpc_time_client.py
Current time is 2020-02-01 17:44:06.341654
Current time is 2020-02-01 17:44:09.346517
Current time is 2020-02-01 17:44:12.352605
11-4
redis_choc_supply.py
import redis
import random
from time import sleep
conn=redis.Redis()
varieties=["T","C","C","N"]
conveyor="Chocolates"
while True:
seconds=random.random()
sleep(seconds)
piece=random.choice(varieties)
conn.rpush(conveyor, piece)
redis_lucy.py
import redis
from datetime import datetime
from time import sleep
conn=redis.Redis()
timeout=10
conveyor="Chocolates"
while True:
sleep(0.5)
msg=conn.blpop(conveyor, timeout)
remaining=conn.llen(conveyor)
if msg:
piece=msg[1]
print("Lucy got a", piece, "at", datetime.utcnow(),
", only", remaining, "left")
Execution result
$ python redis_lucy.py
Lucy got a b'T' at 2020-02-01 09:05:54.780153 , only 116 left
Lucy got a b'N' at 2020-02-01 09:05:55.282109 , only 117 left
Lucy got a b'T' at 2020-02-01 09:05:55.783487 , only 117 left
Lucy got a b'N' at 2020-02-01 09:05:56.284971 , only 118 left
Lucy got a b'C' at 2020-02-01 09:05:56.787798 , only 118 left
Lucy got a b'T' at 2020-02-01 09:05:57.289434 , only 117 left
Lucy got a b'N' at 2020-02-01 09:05:57.794357 , only 118 left
Lucy got a b'C' at 2020-02-01 09:05:58.295897 , only 119 left
Lucy got a b'C' at 2020-02-01 09:05:58.800536 , only 119 left
Lucy got a b'C' at 2020-02-01 09:05:59.303087 , only 119 left
Lucy got a b'C' at 2020-02-01 09:05:59.805465 , only 119 left
Lucy got a b'C' at 2020-02-01 09:06:00.308003 , only 119 left
Lucy got a b'C' at 2020-02-01 09:06:00.810408 , only 120 left
Lucy got a b'C' at 2020-02-01 09:06:01.312918 , only 120 left
Lucy got a b'C' at 2020-02-01 09:06:01.818497 , only 119 left
Lucy got a b'N' at 2020-02-01 09:06:02.324028 , only 120 left
Lucy got a b'C' at 2020-02-01 09:06:02.826697 , only 119 left
Lucy got a b'T' at 2020-02-01 09:06:03.329229 , only 120 left
Lucy got a b'T' at 2020-02-01 09:06:03.835205 , only 120 left
poem_pub.py
import string
import zmq
from time import sleep
host="127.0.0.1"
port=9999
ctx=zmq.Context()
pub=ctx.socket(zmq.PUB)
pub.bind("tcp://%s:%s" % (host, port))
sleep(1)
with open("mammonth.txt","rt") as poem:
words=poem.read()
for word in words.split():
word=word.strip(string.punctuation)
data=word.encode("utf-8")
if word.startswith(("a", "e", "i", "u","o","A","E","I","U","O")):
print("vowels",data)
pub.send_multipart([b"vowels", data])
if len(word) ==5:
print("five",data)
pub.send_multipart([b"five", data])
poem_sub.py
import string
import zmq
host="127.0.0.1"
port=9999
ctx=zmq.Context()
sub=ctx.socket(zmq.SUB)
sub.connect("tcp://%s:%s" % (host,port))
sub.setsockopt(zmq.SUBSCRIBE, b"vowels")
sub.setsockopt(zmq.SUBSCRIBE, b"five")
while True:
topic, word=sub.recv_multipart()
print(topic,word)
Execution result
#First few lines
$ python poem_pub.py
five b'queen'
vowels b'of'
five b'Lying'
vowels b'at'
vowels b'ease'
vowels b'evening'
five b'flies'
$ python poem_sub.py
b'five' b'queen'
b'vowels' b'of'
b'five' b'Lying'
b'vowels' b'at'
b'vowels' b'ease'
b'vowels' b'evening'
b'five' b'flies'
b'five' b'seize'
It was a rather rough review. It was volumey, but recently I wonder if I will use the cloud such as AWS or Open Stack without setting up a server myself. Since the understanding of parallel processing and multiprocessing is ambiguous, I wonder if I have to remember it while using it.
"Introduction to Python3 by Bill Lubanovic (published by O'Reilly Japan)"
"Queue --- synchronous queue class" https://docs.python.org/ja/3/library/queue.html#queue.Queue.task_done
"Threading --- thread-based parallel processing" https://docs.python.org/ja/3/library/threading.html
"Multiprocessing --- Process-based parallel processing¶" https://docs.python.org/ja/3/library/multiprocessing.html#pipes-and-queues
「Redis」 http://redis.shibu.jp/commandreference/lists.html
Recommended Posts