TL;DR Quand j'ai essayé d'utiliser RxPY parce que je voulais écrire Rx en Python, c'était différent de ce que je pensais qu'il serait confortable à utiliser. Apparemment, dans la nouvelle spécification, le chaînage observable est écrit à l'aide d'un mécanisme appelé pipe. https://rxpy.readthedocs.io/en/latest/migration.html
Conventionnel ↓
observable_object.map(lambda x:x*2) \
.filter(lambda x:x>3) \
.subscribe(print) \
Actuellement ↓
from rx import operators as ops
observable_object.pipe(
ops.map(lambda x:x*2),
ops.filter(lambda x:x>3)
).subscribe(print)
Je voulais juste transmettre ceci.
Cependant, il n'y a pas trop d'articles sur RxPY en japonais, donc je vais résumer RxPY afin de diffuser Rx aux pythonistes débutants.
C'est une API pour écrire intelligemment un traitement asynchrone en gérant des données dans un flux observable qui peut effectuer un traitement de type Linq. http://reactivex.io/ Il peut être utilisé dans la plupart des langues principales. http://reactivex.io/languages.html Pour Python, RxPY est pris en charge. https://rxpy.readthedocs.io/en/latest/index.html
Il y a déjà beaucoup d'explications sur le concept, donc je vais laisser la page officielle et d'autres articles de Qiita, mais je ne le ferai pas ici. Au lieu de cela, je vais vous montrer le code primitif de RxPY.
Reactive Extensions traite les données comme des flux. Inversement, les données avec lesquelles vous souhaitez que les extensions réactives fonctionnent doivent être converties en flux.
import rx
# 0,1,2,3,Générez 4 flux.
rx.range(0,5)
# 'aaa','bbb','ccc'Pour générer un flux de.
rx.of('aaa','bbb','ccc')
#Convertissez une liste en flux.
l = [0,1,2,3,4]
rx.from_(l)
Nous utiliserons chacune des données en circulation dans l'ordre. Reactive Extensions s'abonne lors de l'utilisation de données de flux. Il peut être plus rapide de consulter le code.
import rx
# 0,1,2,3,4 flux
stream = rx.range(0,5)
#la fonction d'impression est 0,1,2,3,Recevez 4 dans l'ordre.
stream.subscribe(print)
###production###
# 0
# 1
# 2
# 3
# 4
#Bien sûr, vous pouvez également gérer vos propres expressions définies et expressions lambda.
stream.subscribe(lambda x:print('value = '+str(x)))
###production###
# value = 0
# value = 1
# value = 2
# value = 3
# value = 4
#Plus strictement, vous pouvez écrire le traitement lorsqu'une erreur se produit et le traitement final.
stream.subscribe(
on_next = lambda x:print('on_next : '+str(x)) #Une fonction qui reçoit des données de flux.
,on_error = lambda x:print('on_error : '+str(x)) #Que faire lorsqu'une erreur se produit.
,on_completed = lambda :print('on_completed !') #Exécuté lorsque toutes les données du flux ont circulé.
)
###production###
# on_next : 0
# on_next : 1
# on_next : 2
# on_next : 3
# on_next : 4
# on_completed !
Avec les extensions réactives ordinaires, le chaînage des méthodes est effectué à partir du flux, RxPY utilise des tubes et des opérateurs pour traiter les données de flux.
import rx
from rx import operators as ops
# 0,1,2,3,4 flux
stream = rx.range(0,5)
# map
stream.pipe(
ops.map(lambda x:x*2) #Doublez les données.
).subscribe(print)
###production###
# 0
# 2
# 4
# 6
# 8
# filter
stream.pipe(
ops.filter(lambda x:x>2) #2 Filtrez les données suivantes.
).subscribe(print)
###production###
# 3
# 4
# zip
stream.pipe(
ops.zip(rx.range(0,10,2)) #Associez les données dans chacun des deux flux.
).subscribe(print)
###production###
# (0, 0)
# (1, 2)
# (2, 4)
# (3, 6)
# (4, 8)
# buffer_with_count
stream.pipe(
ops.buffer_with_count(2) #Combinez les données en deux.
).subscribe(print)
###production###
# [0, 1]
# [2, 3]
# [4]
# to_list
stream.pipe(
ops.to_list() #Listez les données.
).subscribe(print)
###production###
# [0, 1, 2, 3, 4]
#Les opérateurs peuvent enchaîner.
stream.pipe(
ops.map(lambda x:x*2) #Doublez les données.
,ops.filter(lambda x:x>2) #2 Filtrez les données suivantes.
,ops.map(lambda x:str(x)) #Convertissez les données en caractères.
).subscribe(lambda x:print('value = '+x))
###production###
# value = 4
# value = 6
# value = 8
#Allumé lorsqu'une erreur se produit pendant le traitement_Une erreur est exécutée et aucune autre donnée n'est traitée.
stream.pipe(
ops.map(lambda x:1/(x-2)) #Quand 2 est joué, une erreur se produit dans la division à zéro.
).subscribe(
on_next = print
,on_error = lambda x: print(x)
)
###production###
# -0.5
# -1.0
# division by zero
Il y a pas mal d'opérateurs. Trouvez et utilisez celui qui vous convient. https://rxpy.readthedocs.io/en/latest/reference_operators.html
Jusqu'à présent, nous convertissions les données existantes en flux. Ici, nous expliquerons comment envoyer des données au flux à tout moment.
import rx
from rx.subject import Subject
#Utilisez Subject pour créer un flux spécial qui permet aux données de circuler à tout moment.
stream = Subject()
# on_Les données peuvent être diffusées avec next.
#Mais ce flux n'est pas abonné, donc rien ne se passe.
stream.on_next(1)
#Une fois abonné, vous le recevrez à chaque flux de données.
d = stream.subscribe(print)
stream.on_next(1)
###production###
# 1
stream.on_next(2)
###production###
# 2
#Jeter lors de la désinscription.
d.dispose()
stream.on_next(2)
#Il est également possible de s'abonner à plusieurs. Envie de diffuser.
d1 = stream.subscribe(lambda x:print('subscriber 1 got '+str(x)))
d2 = stream.subscribe(lambda x:print('subscriber 2 got '+str(x)))
d3 = stream.subscribe(lambda x:print('subscriber 3 got '+str(x)))
stream.on_next(1)
###production###
# subscriber 1 got 1
# subscriber 2 got 1
# subscriber 3 got 1
#Si vous ne disposez pas d'abonnés inutiles, ils continueront à s'abonner pour toujours.
d1.dispose()
d2.dispose()
d3.dispose()
#Il est également possible de traiter le flux et de s'abonner
stream.pipe(
ops.filter(lambda x:x%2==0) #Filtrer par multiples de 2
).subscribe(lambda x:print(str(x)+' is a multiple of 2'))
stream.pipe(
ops.filter(lambda x:x%3==0) #Filtrer par multiples de 3
).subscribe(lambda x:print(str(x)+' is a multiple of 3'))
stream.on_next(2)
###production###
# 2 is a multiple of 2
stream.on_next(3)
###production###
# 3 is a multiple of 3
stream.on_next(6)
###production###
# 6 is a multiple of 2
# 6 is a multiple of 3
#L'élimination du sujet libère des ressources.
#Tout ce à quoi vous vous abonnez est également éliminé.
#Si vous supprimez, vous ne pourrez pas diffuser de données.
stream.dispose()
Il existe également plusieurs types de sujets. Veuillez utiliser celui qui correspond à votre objectif. https://rxpy.readthedocs.io/en/latest/reference_subject.html
Contrôlez quand et comment il est abonné.
import rx
from rx import operators as ops
import time
import random
from rx.subject import Subject
from rx.scheduler import NewThreadScheduler
from rx.scheduler import CurrentThreadScheduler
def f(s):
time.sleep(1*random.random())
print(s)
stream = Subject()
#Configurez le planificateur pour qu'il s'exécute dans le thread actuel.
#S'abonner est exécuté un par un dans le même thread.
stream_with_scheduler = stream.pipe(
ops.observe_on(CurrentThreadScheduler()) #Paramètres du planificateur
)
stream_with_scheduler.subscribe(lambda x:f('1'))
stream_with_scheduler.subscribe(lambda x:f('2'))
stream_with_scheduler.subscribe(lambda x:f('3'))
stream.on_next(1)
#Le CurrentThreadScheduler est le même que le planificateur par défaut, de sorte que le comportement reste le même.
###production###
# 1
# 2
# 3
stream.dispose()
stream = Subject()
#Configurer un planificateur pour s'exécuter sur un nouveau thread
stream_with_scheduler = stream.pipe(
ops.observe_on(NewThreadScheduler()) #Paramètres du planificateur
)
stream_with_scheduler.subscribe(lambda x:f('1'))
stream_with_scheduler.subscribe(lambda x:f('2'))
stream_with_scheduler.subscribe(lambda x:f('3'))
stream.on_next(1)
#Il fonctionne sur un nouveau thread, donc ils s'exécutent tous en même temps.
###production###
# 2
# 3
# 1
stream.dispose()
Il existe également plusieurs planificateurs. Utilisez celui qui vous convient. https://rxpy.readthedocs.io/en/latest/reference_scheduler.html
J'expliquerai comment faire un traitement asynchrone bien en utilisant les connaissances jusqu'à présent.
Si vous avez des processus chronophages tels que des requêtes HTTP ou des opérations lourdes, il peut être préférable de les exécuter en parallèle plutôt que séquentiellement. Le problème devient alors la dépendance de traitement. Ici, nous présenterons une méthode de réunion comme solution unique.
import rx
from rx import operators as ops
from rx.subject import Subject
import threading
import time
import random
stream = Subject()
#Processus chronophage
def f1():
time.sleep(5*random.random())
print('f1 done.')
stream.on_next(1)
def f2():
time.sleep(5*random.random())
print('f2 done.')
stream.on_next(1)
def f3():
time.sleep(5*random.random())
print('f3 done.')
stream.on_next(1)
def f4():
time.sleep(5*random.random())
print('f4 done.')
stream.on_next(1)
def f5():
time.sleep(5*random.random())
print('f5 done.')
stream.on_next(1)
stream.pipe(
ops.buffer_with_count(5) #Stocké jusqu'à 5 éléments de flux de données dans le flux.
).subscribe(lambda x:print('All done.')) #Il est exécuté après 5 éléments de flux de données dans le flux. Autrement dit, f1~Il est exécuté une fois que tout f5 est terminé.
#Puisqu'il s'agit d'un processus qui prend du temps, tous sont exécutés en même temps.
for f in [f1,f2,f3,f4,f5]:
threading.Thread(target=f).start()
###production###
# f5 done.
# f4 done.
# f1 done.
# f3 done.
# f2 done.
# All done.
Voici le cas supposé.
keyboard.is_pressed
, qui renvoie l'état du clavier.C'est un peu arbitraire, mais pardonnez-moi car c'est pour faire un exemple simple ...
Tout d'abord, diffusez l'état du clavier. Plus précisément, la sortie inépuisable suivante True ou False est diffusée.
while True:
print(keyboard.is_pressed('enter'))
###production###
# True
# True
# True
# True
# False
# False
# ...
↓ Changé pour diffuser
import rx
from rx import operators as ops
from rx.subject import Subject
enter_state_stream = Subject()
while True:
# enter_state_Diffuser l'état de la touche Entrée pour diffuser
enter_state_stream.on_next(keyboard.is_pressed('enter'))
Comme je ne suis pas abonné, rien ne se passe tel quel. Nous allons implémenter on_press comme point de départ.
import rx
from rx import operators as ops
from rx.subject import Subject
enter_state_stream = Subject()
# on_press
enter_state_stream.pipe(
ops.buffer_with_count(2,1) #Obtenez deux données.
,ops.filter(lambda x: x==[False,True]) #Le moment où vous appuyez dessus est faux,De vrais flux de données.
).subscribe(lambda x: print('on_press'))
while True:
enter_state_stream.on_next(keyboard.is_pressed('enter'))
###production(Allumé chaque fois que vous appuyez sur la touche Entrée_appuyez sur s'affiche)###
# on_press
# on_press
# on_press
# on_press
Comme on_release peut être implémenté de la même manière, ignorez-le une fois. Ensuite, implémentons on_double_press.
import rx
from rx import operators as ops
from rx.subject import Subject
import keyboard
import datetime
enter_state_stream = Subject()
on_press_stream = enter_state_stream.pipe(
ops.buffer_with_count(2,1) #Obtenez les deux premières données.
,ops.filter(lambda x: x==[False,True]) #Le moment où vous appuyez dessus est faux,De vrais flux de données.
)
# on_double_press
on_press_stream.pipe(
ops.timestamp() # on_Ajouter un horodatage à appuyer
,ops.buffer_with_count(2,1) # on_Regardez la presse deux par deux
,ops.map(lambda x:x[1][1]-x[0][1]) #Deux sur_Convertir en intervalle de temps de presse
,ops.filter(lambda x:x<datetime.timedelta(seconds=0.2)) # on_l'intervalle de temps de presse est de 0.Filtrer en moins de 2 secondes
).subscribe(lambda x: print('on_double_press'))
while True:
enter_state_stream.on_next(keyboard.is_pressed('enter'))
###production(S'affiche chaque fois que la touche Entrée est enfoncée en continu)###
# on_double_press
# on_double_press
# on_double_press
# on_double_press
Vous avez maintenant implémenté on_double_press. Enfin, mettons-le ensemble dans une belle classe tout en faisant un traitement asynchrone.
import rx
from rx import operators as ops
from rx.subject import Subject
import keyboard
import datetime
import threading
class Enter:
enter_state_stream = None
on_press_stream = None
on_release_stream = None
on_double_press = None
def __init__(self):
self.enter_state_stream = Subject()
self.on_press_stream = self.enter_state_stream.pipe(
ops.buffer_with_count(2,1)
,ops.filter(lambda x: x==[False,True])
)
self.on_release_stream = self.enter_state_stream.pipe(
ops.buffer_with_count(2,1)
,ops.filter(lambda x: x==[True,False])
)
self.on_double_press = self.on_press_stream.pipe(
ops.timestamp()
,ops.buffer_with_count(2,1)
,ops.map(lambda x:x[1][1]-x[0][1])
,ops.filter(lambda x:x<datetime.timedelta(seconds=0.2))
)
def f():
while True:
self.enter_state_stream.on_next(keyboard.is_pressed('enter'))
threading.Thread(target=f).start()
def main():
enter = Enter()
#Vous pouvez écrire un traitement d'événement comme celui-ci!
enter.on_double_press.subscribe(lambda x:print('on_double_press'))
Si vous le souhaitez, veuillez écrire quelque chose. Je vous serais reconnaissant de bien vouloir commenter. Veuillez me faire savoir si quelque chose ne va pas.
Recommended Posts