Reactive Extensions practice

Try the following block diagram calculation with Reactive Extensions (the calculation itself has no meaning).

fig.png

Preparation

This time I will try it with Python. First, install the Python version of Reactive Extensions.

Terminal


pip install Rx

Representing the block diagram in JSON

Describe the block diagram placement module and wiring in JSON.

test.json


{
    "modules": [
        {"name":"add1", "module_type":"add", "inputs":["sin1", "const1"]},
        {"name":"add2", "module_type":"add", "inputs":["mul1", "div1"]},
        {"name":"mul1", "module_type":"mul", "inputs":["sub1", "add1"]},
        {"name":"sub1", "module_type":"sub", "inputs":["sin1", "cos1"]},
        {"name":"div1", "module_type":"div", "inputs":["cos1", "const2"]},
        {"name":"sin1", "module_type":"sin", "inputs":["const1"]},
        {"name":"cos1", "module_type":"cos", "inputs":["add1"]},
        {"name":"out1", "module_type":"out", "inputs":["add2"]},
        {"name":"const1", "module_type":"const", "value":1.0},
        {"name":"const2", "module_type":"const", "value":2.0}
    ]
}

Implementation

Module creation, module wiring, constant value setting, and calculation result display are all performed.

test.py


# -*- coding: utf-8 -*-
from rx.subjects import Subject
import json, operator, math

if __name__ == '__main__':
    #JSON reading
    with open('test.json') as f:
        j = json.load(f)

    #Module creation
    modules = { m['name']:Subject() for m in j['modules'] }

    #Module wiring
    for m in filter(lambda m: m['module_type'] != 'const', j['modules']):
        module_type = m['module_type'];
        self_name = m['name']
        input_names = m['inputs']
        if   module_type == 'add':
            modules[ input_names[0] ].zip(modules[ input_names[1] ], operator.add) \
                .subscribe(modules[ self_name ].on_next)
        elif module_type == 'sub':
            modules[ input_names[0] ].zip(modules[ input_names[1] ], operator.sub) \
                .subscribe(modules[ self_name ].on_next)
        elif module_type == 'mul':
            modules[ input_names[0] ].zip(modules[ input_names[1] ], operator.mul) \
                .subscribe(modules[ self_name ].on_next)
        elif module_type == 'div':
            modules[ input_names[0] ].zip(modules[ input_names[1] ], operator.truediv) \
                .subscribe(modules[ self_name ].on_next)
        elif module_type == 'sin':
            modules[ input_names[0] ].select(math.sin).subscribe(modules[ self_name ].on_next)
        elif module_type == 'cos':
            modules[ input_names[0] ].select(math.cos).subscribe(modules[ self_name ].on_next)
        elif module_type == 'tan':
            modules[ input_names[0] ].select(math.tan).subscribe(modules[ self_name ].on_next)
        elif module_type == 'out':
            modules[ input_names[0] ].subscribe(print) #Display of calculation results

    #Set a constant value
    for m in filter(lambda m: m['module_type'] == 'const', j['modules']):
        self_name = m['name']
        value = m['value']
        modules[ self_name ].on_next(value)

result


1.9082290502110406

Source

ReactiveExtensions I will explain the class and extension method used this time.

Subject Subject is a class that has both Observer (observer) and Observable (observer). Use name as a key to make it Dictionary for later use in wiring.

Excerpt


#Module creation
modules = { m['name']:Subject() for m in j['modules'] }

Select Unary operations receive (subscribe) the calculated value of select and send it to the next module (on_next).

Excerpt


modules[ input_names[0] ].select(math.sin).subscribe(modules[ self_name ].on_next)

Zip Binary operation receives (subscribes) the calculated value after the two input values are aligned and sends it to the next module (on_next).

Excerpt


modules[ input_names[0] ].zip(modules[ input_names[1] ], operator.add) \
    .subscribe(modules[ self_name ].on_next)

Verification

I will check it just in case.

test_check.py


# -*- coding: utf-8 -*-
import math
if __name__ == '__main__':
    sin1 = math.sin(1.0)
    add1 = sin1+1.0
    cos1 = math.cos(add1)
    sub1 = sin1-cos1
    mul1 = sub1*add1
    div1 = cos1/2.0
    add2 = mul1+div1
    out = add2
    print(out)

result


1.9082290502110406

I did it

Recommended Posts

Reactive Extensions practice
[Python] Reactive Extensions learned with RxPY (3.0.1) [Rx]
numpy practice 1
Linux practice
Practice Pytorch