Write your distributed program on Ray
Ray is a distributed computing framework, similar in purpose to Spark. Ray programs can be written in Python or Java.
Prepare the environment
We have prepared a Docker image with Ray pre-installed. If Docker Engine is not installed on your own machine, follow the instructions at https://docs.docker.com/get-docker.
Launch a Docker container from our image: use the tag `tigerchang/raylab:latest` on your own machine, or `tigerchang/raylab:with-proxy` on node-1.cse.cuhk.edu.hk.
# if you are using your own machine
docker run -it -p 8265:8265 tigerchang/raylab:latest
# if you are using node-1.cse.cuhk.edu.hk
docker run -it -p 8265:8265 tigerchang/raylab:with-proxy
Note: If you do not have Docker installed on your own machine (we recommend installing it), you can instead use our server node-1.cse.cuhk.edu.hk, which already has Docker installed.
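Note that the command may fail if port 8265 is already in use (for example, by other students on the shared server). In that case, choose another host port between 1024 and 65535 and change only the first number of the `-p` mapping, e.g. `-p 8266:8265`.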
First, we begin with a Ray program that runs on a single local machine.
Create a new folder 'ray-lab' under your home directory and change into it:
mkdir ~/ray-lab && cd ~/ray-lab
Create a new file 'main.py' with the following content.
vim main.py
import platform
import sys
import time
import ray
from ray.actor import ActorHandle
# Each UserActor reserves 0.5 of a logical CPU core when Ray schedules it.
@ray.remote(num_cpus=0.5)
class UserActor:
    """Ray actor representing one chat user.

    The actor must be created with ``.options(name=username)``, because
    ``__init__`` looks up its own handle by that actor name.
    """

    def __init__(self, username: str):
        self.name = username
        # Chatroom this user belongs to; stays None until join() is called.
        self.chatroomActorHandle: "ActorHandle | None" = None
        # Hostname of the node this actor runs on, included in log messages.
        self.hostname = platform.node()
        # Handle to this actor itself, resolved via its registered actor name so
        # it can be handed to the chatroom. Cached once here and reused later.
        self.selfActorHandle = ray.get_actor(username)
        print(f"Created UserActor for user {self.name}")

    def join(self, chatroomActorHandle: ActorHandle):
        """Join ``chatroomActorHandle`` and wait until its join call has finished."""
        print(f"{self.name} join chatroom {chatroomActorHandle._ray_actor_id}")
        self.chatroomActorHandle = chatroomActorHandle
        # ray.wait blocks until the chatroom's join task has completed.
        ray.wait([chatroomActorHandle.join.remote(userActorHandle=self.selfActorHandle)])

    def receive_chat(self, msg: str):
        """Log a message that the chatroom forwarded to this user."""
        print(f"I am {self.name}. Now in Ray worker node `{self.hostname}`, receive message: '{msg}'")

    def send_chat(self, msg: str):
        """Send ``msg`` to this user's chatroom without waiting for delivery.

        If the user has not joined a chatroom yet, print a notice and send
        nothing, instead of failing with an AttributeError on ``None``.
        """
        if self.chatroomActorHandle is None:
            print(f"I am {self.name}. I have not joined a chatroom yet, cannot send message: '{msg}'")
            return
        print(f"I am {self.name}. Now in Ray worker node `{self.hostname}`, sending message: '{msg}'")
        # The returned ObjectRef is not awaited; the chatroom forwards asynchronously.
        self.chatroomActorHandle.send_chat.remote(senderActorHandle=self.selfActorHandle, msg=msg)
@ray.remote
class ChatRoomActor:
    """Ray actor holding the member list of one chatroom and relaying messages."""

    def __init__(self):
        # Handles of every UserActor that joined, kept in join order.
        self.memberUserActorHandles = []

    def send_chat(self, senderActorHandle: ActorHandle, msg: str):
        """Relay ``msg`` to every member except its sender (fire-and-forget)."""
        print(f"Receive chat from {senderActorHandle}, msg: {msg} forwarding to all group members")
        for member in self.memberUserActorHandles:
            is_sender = member._ray_actor_id == senderActorHandle._ray_actor_id
            if not is_sender:
                member.receive_chat.remote(msg=msg)

    def join(self, userActorHandle: ActorHandle):
        """Add a user handle to this chatroom's member list."""
        self.memberUserActorHandles.append(userActorHandle)
# A helper function to print the hardware resources of Ray cluster
def print_cluster_resource():
    """Print how many live nodes and how many CPUs the connected Ray cluster has.

    ``ray.nodes()`` also reports nodes that have died, so only entries marked
    alive are counted. A cluster without any CPU resource reports 0 instead of
    raising KeyError.
    """
    # Default to True so an entry lacking the "Alive" key is still counted,
    # matching the previous count-everything behavior.
    alive_nodes = [node for node in ray.nodes() if node.get("Alive", True)]
    total_cpus = ray.cluster_resources().get("CPU", 0)
    print(
        "This cluster consists of\n"
        f"    {len(alive_nodes)} nodes in total\n"
        f"    {total_cpus} CPU resources in total\n"
    )
# Create 1 chatroom with `num_user` users (3 by default)
def create_chatroom_and_users(useridOffset: int, num_user=3):
    """Create one ChatRoomActor plus ``num_user`` UserActors that join it.

    Users are named ``user{useridOffset}``, ``user{useridOffset + 1}``, ...;
    each name is also used as the Ray actor name. Each user's join call is
    waited on before the next user is created.

    Returns:
        dict mapping username -> UserActor handle.
    """
    chatroom = ChatRoomActor.remote()
    users = {}
    for username in (f"user{useridOffset + i}" for i in range(num_user)):
        handle = UserActor.options(name=username).remote(username=username)
        ray.wait([handle.join.remote(chatroomActorHandle=chatroom)])
        users[username] = handle
    return users
def send_chat_from_user(username: str, message: str):
    """Ask the UserActor registered as ``username`` to send ``message`` to its chatroom.

    Returns:
        The ObjectRef of the asynchronous ``send_chat`` call; callers may
        ``ray.get`` it to wait for completion or simply ignore it.
    """
    userActorHandle = ray.get_actor(username)
    # Actor methods must be invoked through .remote(); calling them directly raises TypeError.
    return userActorHandle.send_chat.remote(message)
def read_first_token():
    """Read one line from stdin and return its first whitespace-separated token.

    Returns:
        None at end of input (EOF), "" for a blank line, otherwise the token.
    """
    line = sys.stdin.readline()
    if not line:
        return None
    tokens = line.split()
    return tokens[0] if tokens else ""


if __name__ == "__main__":
    # initialize ray with single machine
    ray.init(dashboard_host="0.0.0.0")
    # initialize ray with ray cluster
    # ray.init(dashboard_host="0.0.0.0", address="auto")

    chatroom_counter = 0
    user_actor_handle_dict = {}  # key: username, value: UserActorHandle

    # We now create a chatroom with 3 users
    chatroom_counter += 1
    user_actor_handle_dict.update(create_chatroom_and_users(useridOffset=len(user_actor_handle_dict), num_user=3))
    time.sleep(1)

    try:
        while True:
            print(
                "Action 1: Send a chat 'Hello world' from specific user\n"
                "Action 2: Create a new chatroom with 3 users\n"
                "Your choice (1 or 2): "
            )
            in_action = read_first_token()
            if in_action is None:  # EOF (e.g. Ctrl-D or exhausted piped input)
                break

            if in_action == "1":
                print(f"Please choose a user from 0 to {len(user_actor_handle_dict)-1}: ")
                in_user_idx = read_first_token()
                if in_user_idx is None:
                    break
                try:
                    user_idx = int(in_user_idx)
                except ValueError:
                    print(f"'{in_user_idx}' is not a valid user index")
                    continue
                # check if user exists; out-of-range or negative indices have no entry
                userActorHandle = user_actor_handle_dict.get(f"user{user_idx}")
                if userActorHandle is None:
                    print(f"'{in_user_idx}' is not a valid user index")
                    continue
                userActorHandle.send_chat.remote("Hello world")
                time.sleep(1)
            elif in_action == "2":
                num_user = 3
                chatroom_counter += 1
                user_actor_handle_dict.update(create_chatroom_and_users(
                    useridOffset=len(user_actor_handle_dict), num_user=num_user))
                print(f"Finish creating new chatroom with {num_user} users")
                time.sleep(1)
            else:
                print(f"Unknown action '{in_action}', please enter 1 or 2")
    except KeyboardInterrupt:
        print('interrupted! shutdown ray')
    finally:
        # Release the Ray runtime on every exit path (Ctrl-C, EOF, or an error).
        ray.shutdown()
The program creates a new chatroom whenever you request one. An important line is the `@ray.remote(num_cpus=0.5)` decorator on `UserActor`, which reserves 0.5 CPU for each user created. Later on, as more chatrooms are created (3 users per chatroom), a Ray cluster with autoscaling enabled can automatically provision more EC2 instances to scale out smoothly.
Last updated
Was this helpful?