TL;DR When I tried RxPY because I wanted to write Rx in Python, its usage turned out to be different from what I expected. Apparently, under the new specification, observable chaining is written using a mechanism called pipe. https://rxpy.readthedocs.io/en/latest/migration.html
Conventional ↓
# Old style: operators are methods chained directly on the observable.
observable_object.map(lambda x: x * 2) \
    .filter(lambda x: x > 3) \
    .subscribe(print)
Currently ↓
from rx import operators as ops

# New style: operators are free functions composed with pipe().
# Build the pipeline first, then attach the observer.
doubled_over_three = observable_object.pipe(
    ops.map(lambda x: x * 2),
    ops.filter(lambda x: x > 3),
)
doubled_over_three.subscribe(print)
I just wanted to convey this.
However, there aren't many Japanese articles about RxPY, so I'll summarize RxPY here in order to spread Rx among beginner Pythonistas.
Reactive Extensions (Rx) is an API for writing asynchronous processing elegantly by treating data as Observable streams that support LINQ-like operations. http://reactivex.io/ It is available in most major languages. http://reactivex.io/languages.html For Python, RxPY is the implementation. https://rxpy.readthedocs.io/en/latest/index.html
There are already a lot of explanations of the concept, so I will leave that to the official page and other Qiita articles and not repeat it here. Instead, I'll show you some basic RxPY code.
Reactive Extensions treats data as streams. Put another way, any data you want Reactive Extensions to work with must first be converted into a stream.
import rx

# Create a stream that emits 0, 1, 2, 3, 4.
rx.range(0, 5)

# Create a stream that emits 'aaa', 'bbb', 'ccc'.
rx.of('aaa', 'bbb', 'ccc')

# Convert an existing list into a stream.
l = list(range(5))
rx.from_(l)
Next, we consume each item flowing through the stream in order. In Reactive Extensions, you subscribe to a stream in order to use its data. It may be quicker to just look at the code.
import rx

# A stream that emits 0, 1, 2, 3, 4.
stream = rx.range(0, 5)

# print receives 0, 1, 2, 3, 4 in order.
stream.subscribe(print)
###output###
# 0
# 1
# 2
# 3
# 4

# Any callable works, including your own functions and lambda expressions.
stream.subscribe(lambda x: print('value = ' + str(x)))
###output###
# value = 0
# value = 1
# value = 2
# value = 3
# value = 4

# The full form also accepts handlers for errors and for completion.
stream.subscribe(
    on_next=lambda x: print('on_next : ' + str(x)),  # receives each item from the stream
    on_error=lambda x: print('on_error : ' + str(x)),  # runs when an error occurs
    on_completed=lambda: print('on_completed !'),  # runs once all data has flowed through
)
###output###
# on_next : 0
# on_next : 1
# on_next : 2
# on_next : 3
# on_next : 4
# on_completed !
In ordinary Reactive Extensions implementations you chain methods directly on the stream, but RxPY processes stream data using pipe and operators.
import rx
from rx import operators as ops

# A stream that emits 0, 1, 2, 3, 4.
stream = rx.range(0, 5)

# map
stream.pipe(
    ops.map(lambda x: x * 2),  # double each item
).subscribe(print)
###output###
# 0
# 2
# 4
# 6
# 8

# filter
stream.pipe(
    ops.filter(lambda x: x > 2),  # keep only items greater than 2
).subscribe(print)
###output###
# 3
# 4

# zip
stream.pipe(
    ops.zip(rx.range(0, 10, 2)),  # pair up the items of the two streams
).subscribe(print)
###output###
# (0, 0)
# (1, 2)
# (2, 4)
# (3, 6)
# (4, 8)

# buffer_with_count
stream.pipe(
    ops.buffer_with_count(2),  # group the items into lists of two
).subscribe(print)
###output###
# [0, 1]
# [2, 3]
# [4]

# to_list
stream.pipe(
    ops.to_list(),  # collect all items into a single list
).subscribe(print)
###output###
# [0, 1, 2, 3, 4]

# Operators can be chained.
stream.pipe(
    ops.map(lambda x: x * 2),  # double each item
    ops.filter(lambda x: x > 2),  # keep only items greater than 2
    ops.map(lambda x: str(x)),  # convert each item to a string
).subscribe(lambda x: print('value = ' + x))
###output###
# value = 4
# value = 6
# value = 8

# When an error occurs mid-stream, on_error runs and no further data is processed.
stream.pipe(
    ops.map(lambda x: 1 / (x - 2)),  # raises a division-by-zero error when 2 arrives
).subscribe(
    on_next=print,
    on_error=lambda x: print(x),
)
###output###
# -0.5
# -1.0
# division by zero
There are quite a few operators. Find and use the one that suits your purpose. https://rxpy.readthedocs.io/en/latest/reference_operators.html
Until now, we have been converting existing data into a stream. Here, we will explain how to push data into a stream at any time.
import rx
from rx import operators as ops  # needed by the filter examples further down
from rx.subject import Subject

# A Subject is a special stream into which data can be pushed at any time.
stream = Subject()

# on_next pushes a value into the stream.
# Nothing has subscribed to the stream yet, so nothing happens here.
stream.on_next(1)

# Once subscribed, the subscriber receives every value pushed afterwards.
d = stream.subscribe(print)
stream.on_next(1)
###output###
# 1
stream.on_next(2)
###output###
# 2

# Call dispose to unsubscribe; the value pushed afterwards is not printed.
d.dispose()
stream.on_next(2)

# Several subscribers can listen at once, like a broadcast.
d1 = stream.subscribe(lambda x: print('subscriber 1 got ' + str(x)))
d2 = stream.subscribe(lambda x: print('subscriber 2 got ' + str(x)))
d3 = stream.subscribe(lambda x: print('subscriber 3 got ' + str(x)))
stream.on_next(1)
###output###
# subscriber 1 got 1
# subscriber 2 got 1
# subscriber 3 got 1

# Subscribers that are no longer needed stay subscribed until disposed.
d1.dispose()
d2.dispose()
d3.dispose()

# A Subject can also be piped through operators before subscribing.
stream.pipe(
    ops.filter(lambda x: x % 2 == 0),  # keep multiples of 2
).subscribe(lambda x: print(str(x) + ' is a multiple of 2'))
stream.pipe(
    ops.filter(lambda x: x % 3 == 0),  # keep multiples of 3
).subscribe(lambda x: print(str(x) + ' is a multiple of 3'))
stream.on_next(2)
###output###
# 2 is a multiple of 2
stream.on_next(3)
###output###
# 3 is a multiple of 3
stream.on_next(6)
###output###
# 6 is a multiple of 2
# 6 is a multiple of 3

# Disposing the subject frees its resources.
# All of its subscriptions are disposed as well.
# After dispose, data can no longer be pushed into the stream.
stream.dispose()
There are also several types of subjects. Please use the one that suits your purpose. https://rxpy.readthedocs.io/en/latest/reference_subject.html
Schedulers control when and how subscribers are executed.
import rx
from rx import operators as ops
import time
import random
from rx.subject import Subject
from rx.scheduler import NewThreadScheduler
from rx.scheduler import CurrentThreadScheduler
def f(s):
    """Sleep for a random time of up to one second, then print ``s``.

    The random delay makes the execution order visible when several
    subscribers run concurrently.
    """
    time.sleep(random.random())
    print(s)
stream = Subject()

# Use a scheduler that runs on the current thread.
# The subscribers are executed one after another on the same thread.
stream_with_scheduler = stream.pipe(
    ops.observe_on(CurrentThreadScheduler()),  # choose the scheduler
)
for label in ('1', '2', '3'):
    # Bind label as a default so each subscriber keeps its own value.
    stream_with_scheduler.subscribe(lambda x, label=label: f(label))
stream.on_next(1)
# CurrentThreadScheduler is the same as the default scheduler, so the behavior is unchanged.
###output###
# 1
# 2
# 3
stream.dispose()

stream = Subject()

# Use a scheduler that runs on a new thread.
stream_with_scheduler = stream.pipe(
    ops.observe_on(NewThreadScheduler()),  # choose the scheduler
)
for label in ('1', '2', '3'):
    stream_with_scheduler.subscribe(lambda x, label=label: f(label))
stream.on_next(1)
# Each subscriber runs on a new thread, so they all run at the same time.
###output###
# 2
# 3
# 1
stream.dispose()
There are also several schedulers. Use the one that suits your purpose. https://rxpy.readthedocs.io/en/latest/reference_scheduler.html
I will explain how to do asynchronous processing nicely using the knowledge so far.
If you have time-consuming processes such as HTTP requests or heavy operations, it may be better to execute them in parallel rather than sequentially. The problem is the dependency between processes. Here, we will introduce a wait-for-all (rendezvous) approach as one solution.
import rx
from rx import operators as ops
from rx.subject import Subject
import threading
import time
import random
stream = Subject()
#Time-consuming process
def _finish_after_random_delay(name):
    """Sleep up to 5 seconds, print '<name> done.', then push 1 into ``stream``."""
    time.sleep(5 * random.random())
    print(name + ' done.')
    # Each pushed value counts as one finished task for the subscriber.
    stream.on_next(1)


def f1():
    """Time-consuming task 1."""
    _finish_after_random_delay('f1')


def f2():
    """Time-consuming task 2."""
    _finish_after_random_delay('f2')


def f3():
    """Time-consuming task 3."""
    _finish_after_random_delay('f3')


def f4():
    """Time-consuming task 4."""
    _finish_after_random_delay('f4')


def f5():
    """Time-consuming task 5."""
    _finish_after_random_delay('f5')
stream.pipe(
    ops.buffer_with_count(5),  # hold items until 5 have flowed through the stream
).subscribe(lambda x: print('All done.'))  # runs after 5 items, i.e. once f1 through f5 have all finished

# The tasks are slow, so start them all at the same time on separate threads.
for task in [f1, f2, f3, f4, f5]:
    threading.Thread(target=task).start()
###output###
# f5 done.
# f4 done.
# f1 done.
# f3 done.
# f2 done.
# All done.
Here is the assumed case: the only thing available is keyboard.is_pressed, which returns the current keyboard status, and we want to build key events on top of it. It's a little contrived, but please forgive me, because it keeps the example simple ...
First, turn the keyboard state into a stream. Specifically, the endless sequence of True/False values printed below is what we will stream.
import keyboard

# Print the current state of the Enter key forever.
while True:
    print(keyboard.is_pressed('enter'))
###output###
# True
# True
# True
# True
# False
# False
# ...
↓ Changed to stream
import rx
from rx import operators as ops
from rx.subject import Subject
import keyboard

enter_state_stream = Subject()

while True:
    # Push the current state of the Enter key into enter_state_stream.
    enter_state_stream.on_next(keyboard.is_pressed('enter'))
Since nothing has subscribed yet, nothing happens as it stands. Let's start by implementing on_press.
import rx
from rx import operators as ops
from rx.subject import Subject
import keyboard

enter_state_stream = Subject()

# on_press
enter_state_stream.pipe(
    ops.buffer_with_count(2, 1),  # look at two consecutive states at a time
    ops.filter(lambda x: x == [False, True]),  # at the moment of a press, [False, True] flows through
).subscribe(lambda x: print('on_press'))

while True:
    enter_state_stream.on_next(keyboard.is_pressed('enter'))
###output (on_press is printed every time the Enter key is pressed)###
# on_press
# on_press
# on_press
# on_press
Since on_release can be implemented in the same way, we will skip it for now. Next, let's implement on_double_press.
import rx
from rx import operators as ops
from rx.subject import Subject
import keyboard
import datetime

enter_state_stream = Subject()

on_press_stream = enter_state_stream.pipe(
    ops.buffer_with_count(2, 1),  # look at two consecutive states at a time
    ops.filter(lambda x: x == [False, True]),  # at the moment of a press, [False, True] flows through
)

# on_double_press
on_press_stream.pipe(
    ops.timestamp(),  # attach a timestamp to each on_press
    ops.buffer_with_count(2, 1),  # look at on_press events two at a time
    ops.map(lambda x: x[1][1] - x[0][1]),  # convert to the time between the two on_press events
    ops.filter(lambda x: x < datetime.timedelta(seconds=0.2)),  # keep intervals shorter than 0.2 seconds
).subscribe(lambda x: print('on_double_press'))

while True:
    enter_state_stream.on_next(keyboard.is_pressed('enter'))
###output (printed each time the Enter key is pressed twice in quick succession)###
# on_double_press
# on_double_press
# on_double_press
# on_double_press
You have now implemented on_double_press. Finally, let's put it together in a nice class while making it asynchronous processing.
import rx
from rx import operators as ops
from rx.subject import Subject
import keyboard
import datetime
import threading
class Enter:
    """Expose Enter-key events as observable streams.

    A background thread repeatedly reads ``keyboard.is_pressed('enter')`` and
    pushes each reading into ``enter_state_stream``; the other streams are
    derived from it.

    Attributes:
        enter_state_stream: Subject carrying the raw polled key state.
        on_press_stream: emits when two consecutive readings are
            ``[False, True]``.
        on_release_stream: emits when two consecutive readings are
            ``[True, False]``.
        on_double_press: emits the time between two consecutive presses when
            it is shorter than 0.2 seconds.
    """

    # Class-level placeholders; every instance overwrites them in __init__.
    enter_state_stream = None
    on_press_stream = None
    on_release_stream = None
    on_double_press = None

    def __init__(self, poll_interval=0.0):
        """Build the streams and start the polling thread.

        Args:
            poll_interval: seconds to pause between two readings. The default
                of 0.0 polls continuously, which is the original behavior.
        """
        self.enter_state_stream = Subject()
        self.on_press_stream = self.enter_state_stream.pipe(
            ops.buffer_with_count(2, 1),
            ops.filter(lambda x: x == [False, True]),
        )
        self.on_release_stream = self.enter_state_stream.pipe(
            ops.buffer_with_count(2, 1),
            ops.filter(lambda x: x == [True, False]),
        )
        self.on_double_press = self.on_press_stream.pipe(
            ops.timestamp(),
            ops.buffer_with_count(2, 1),
            # A trailing buffer may hold a single press once the source
            # completes; skip it so the subtraction below cannot fail.
            ops.filter(lambda x: len(x) == 2),
            ops.map(lambda x: x[1][1] - x[0][1]),
            ops.filter(lambda x: x < datetime.timedelta(seconds=0.2)),
        )

        # Set by dispose() to ask the polling thread to exit.
        self._stop_event = threading.Event()

        def f():
            while not self._stop_event.is_set():
                self.enter_state_stream.on_next(keyboard.is_pressed('enter'))
                if poll_interval:
                    # Event.wait doubles as an interruptible sleep.
                    self._stop_event.wait(poll_interval)

        self._thread = threading.Thread(target=f)
        self._thread.start()

    def dispose(self):
        """Stop polling and dispose the underlying subject."""
        self._stop_event.set()
        # A subscriber callback runs on the polling thread and cannot join
        # itself; in that case the loop exits after the callback returns.
        if threading.current_thread() is not self._thread:
            self._thread.join()
        self.enter_state_stream.dispose()
def main():
    """Create an Enter watcher and print a message on every double press."""
    enter = Enter()
    # Event handling can be written like this!
    enter.on_double_press.subscribe(lambda x: print('on_double_press'))
Feel free to write anything you like; I would be grateful for your comments. Please let me know if anything is wrong.
Recommended Posts