Ray is a framework that lets you write distributed, parallel processing in Python quickly and simply, and it is designed to make parallelizing existing code easy. With Ray, you can write process-level parallelism more easily than with multiprocessing.
This article is based on the content of the Ray Tutorial. The code has been confirmed to work with Python 3.8.2 and Ray 0.8.4.
You can install it with pip from the terminal.

```shell
$ pip install ray
```
For basic use, there are only three APIs to remember: `ray.init`, `ray.remote`, and `ray.get`. This article also introduces `ray.wait` and `ray.put`.
Consider parallelizing the execution of `func` in the following code, where the function `func`, which takes 3 seconds to run, is called twice, so the entire script takes about 6 seconds.
```python
import time

def func(x):
    time.sleep(3)
    return x

begin_time = time.time()  # Record the start time
res1, res2 = func(1), func(2)  # Call func twice
print(res1, res2)  # Output: 1 2
end_time = time.time()  # Record the end time
print(end_time - begin_time)  # About 6 seconds
```
When using Ray, you must **always first** call `ray.init`, optionally specifying the number of resources to use, to start the Ray processes.
```python
import ray

# If you call ray.init() without arguments, the number of resources is determined automatically.
ray.init(num_cpus=4)

# Wait a moment for Ray to start, for more accurate time measurement
time.sleep(1)
```
If you want to execute a function in parallel, you need to turn it into a **remote function** that Ray can handle. This is easy to do: just add the `@ray.remote` decorator to the function. A remote function is called as `(function name).remote(arguments)`, which sends it to one of Ray's parallel workers for execution. `.remote(arguments)` returns an **Object ID** immediately, without waiting for the function to finish.
```python
@ray.remote
def func(x):
    time.sleep(3)
    return x

begin_time = time.time()
res1, res2 = func.remote(1), func.remote(2)
print(res1)  # Output example: ObjectID(45b9....)
```
To get the result, pass the Object ID returned by the remote function to `ray.get`. `ray.get` blocks until all results corresponding to the given Object IDs are available.
```python
print(ray.get(res1), ray.get(res2))  # Output: 1 2

# ray.get can also take a list
print(ray.get([res1, res2]))  # Output: [1, 2]

end_time = time.time()
print(end_time - begin_time)  # About 3 seconds
```
When the above code is executed as one script, it takes only about 3 seconds, so you can see that the execution of `func` has been parallelized.
That's all the basics.
Ray can also handle dependencies between remote functions: just pass an Object ID as-is to another remote function. The passed Object ID is restored to a normal Python object when the function is actually executed.
In the example below, `func1` and `func2` are applied in sequence to each of the four elements of `vec`. Processing one element takes 2 seconds in total.
```python
@ray.remote
def func1(x):
    time.sleep(1)
    return x * 2

@ray.remote
def func2(x):
    time.sleep(1)
    return x + 1

vec = [1, 2, 3, 4]
results = []
for x in vec:
    res = func1.remote(x)    # res holds an ObjectID
    res = func2.remote(res)  # Pass the ObjectID as-is to the next remote function
    results.append(res)

# results is a list of ObjectIDs
print(ray.get(results))  # Output: [3, 5, 7, 9]
```
Ray analyzes the dependencies, first runs `func1`, which has no dependencies, and then runs `func2` in parallel on the elements already processed by `func1`. This process, which would take 8 seconds sequentially, finishes in about 2 seconds thanks to parallelization.
Ray also supports nested calls, so rewriting `func2` as follows works fine. The only condition for nested calls is that the function you want to call is defined beforehand.
```python
@ray.remote
def func2(x):
    x = func1.remote(x)  # Returns an ObjectID
    time.sleep(1)
    return ray.get(x) + 1  # We cannot add directly to an ObjectID, so call ray.get first

print(ray.get([func2.remote(x) for x in vec]))  # Output: [3, 5, 7, 9]
```
In my environment the measured time is slightly over 2 seconds, but it still runs in parallel, much faster than 8 seconds.
## Actor
A remote function simply returns its result when it finishes and cannot hold state. In Ray, stateful processing is realized by decorating a class with `@ray.remote`. Classes decorated with `@ray.remote` are called **Actors**.
For example, consider the following counter, which takes 1 second per increment. When creating an instance of an Actor, append `.remote()`, just as with a remote function call.
```python
@ray.remote
class Counter:
    def __init__(self, init_val, sleep=True):
        # Initialize the counter with init_val
        self.count = init_val
        self.sleep = sleep

    def increment(self):
        if self.sleep:
            time.sleep(1)
        self.count += 1
        return self.count

# Create counters with initial values 0 and 100
counter1, counter2 = Counter.remote(0), Counter.remote(100)
```
Let's record the value at each stage in `results` while incrementing each counter three times.
```python
results = []
for _ in range(3):
    results.append(counter1.increment.remote())
    results.append(counter2.increment.remote())

print(ray.get(results))  # Output: [1, 101, 2, 102, 3, 103]
```
The counters are incremented a total of 6 times, but since the two counters run in parallel, all the values are obtained in only 3 seconds.
If you want to call methods of the same Actor instance in parallel, you can define a remote function that takes the Actor instance as an argument. For example, let's run a function called `incrementer` that calls `increment` once per second, launching two of them 0.5 seconds apart, as follows. Note that here we create a `Counter` whose `increment` itself finishes instantly.
```python
@ray.remote
def incrementer(counter, id, times):
    # Increment `times` times, once per second
    for _ in range(times):
        cnt = counter.increment.remote()
        print(f'id = {id} : count = {ray.get(cnt)}')
        time.sleep(1)

counter = Counter.remote(0, sleep=False)  # A counter whose increment finishes instantly
incrementer.remote(counter, id=1, times=5)
time.sleep(0.5)  # Shift the start by 0.5 seconds
inc = incrementer.remote(counter, id=2, times=5)
ray.wait([inc])  # Waits for completion; explained in the next section
```
When you run it, you can see that the two `incrementer` tasks update the value of `counter` alternately, every 0.5 seconds:
```
(0.0 seconds later) id = 1 : count = 1
(0.5 seconds later) id = 2 : count = 2
(1.0 seconds later) id = 1 : count = 3
(1.5 seconds later) id = 2 : count = 4
(2.0 seconds later) ......
```
## ray.wait
If you pass a list of Object IDs for tasks running in parallel to `ray.get`, you cannot get any values until all of them have finished. With `ray.wait`, you wait only until the specified number of parallel tasks have completed; it returns the IDs that have finished at that point and the IDs that have not.
```python
@ray.remote
def sleep(x):
    # A function that sleeps for x seconds and returns x
    time.sleep(x)
    return x

ids = [sleep.remote(3), sleep.remote(5), sleep.remote(2)]
finished_ids, running_ids = ray.wait(ids, num_returns=2, timeout=None)
print(ray.get(finished_ids))  # Output (after 3 seconds): [3, 2]
print(ray.get(running_ids))   # Output (after 5 seconds): [5]
```
## ray.put
In fact, every object passed as an argument to a remote function is implicitly serialized and copied into Ray's shared memory. Therefore, if you pass a huge object as an argument to `remote` multiple times, extra time is spent copying it and shared-memory space is wasted. In such cases, you can avoid this waste by explicitly copying the object just once in advance with `ray.put`.
`ray.put` returns an Object ID, just like `remote`, which you then pass to the remote function. Once the object has been copied, it is shared so that every worker running in parallel can see it.
```python
@ray.remote
def func4(obj, idx):
    time.sleep(1)
    return idx

# Let big_object be a large object
big_object = None
big_obj_id = ray.put(big_object)

# func4.remote() is called four times, but since we pass the Object ID,
# no further copies of big_object are made
results = [func4.remote(big_obj_id, i) for i in range(4)]
print(ray.get(results))  # Output: [0, 1, 2, 3]
```
Incidentally, deserialization with Ray's `ray.get` seems to be much faster than `pickle.load`.
The official documentation covers more detailed usage. In particular, the Examples section contains concrete examples, such as a parameter server and reinforcement learning in a distributed environment, which are helpful. There are also high-level frameworks built on Ray, such as RLlib for reinforcement learning and Tune for hyperparameter tuning. Enjoy a comfortable parallel-processing life with Ray.