Write your distributed program on Ray
Ray is a distributed framework like Spark. We can use Ray through python or java programming.
Prepare the environment
# if you are using your own machine
docker run -it -p 8265:8265 tigerchang/raylab:latest
# if you are using node-1.cse.cuhk.edu.hk
docker run -it -p 8265:8265 tigerchang/raylab:with-proxymkdir ~/ray-lab && cd ~/ray-labvim main.pyimport platform
import sys
import time
import ray
from ray.actor import ActorHandle
# We define that 0.5 core of CPU to serve 1 UserActor
@ray.remote(num_cpus=0.5)
class UserActor:
def __init__(self, username:str):
self.name = username
self.chatroomActorHandle:ActorHandle = None
self.hostname = platform.node()
self.selfActorHandle = ray.get_actor(username)
print(f"Created UserActor for user {self.name}")
def join(self, chatroomActorHandle:ActorHandle):
print(f"{self.name} join chatroom {chatroomActorHandle._ray_actor_id}")
self.chatroomActorHandle = chatroomActorHandle
ray.wait([chatroomActorHandle.join.remote(userActorHandle=self.selfActorHandle)])
def receive_chat(self, msg:str):
print(f"I am {self.name}. Now in Ray worker node `{self.hostname}`, receive message: '{msg}'")
def send_chat(self, msg:str):
self.selfActorHandle = ray.get_actor(self.name)
print(f"I am {self.name}. Now in Ray worker node `{self.hostname}`, sending message: '{msg}'")
self.chatroomActorHandle.send_chat.remote(senderActorHandle=self.selfActorHandle, msg=msg)
@ray.remote
class ChatRoomActor:
def __init__(self):
self.memberUserActorHandles = []
def send_chat(self, senderActorHandle:ActorHandle, msg:str):
print(f"Receive chat from {senderActorHandle}, msg: {msg} forwarding to all group members")
# Forward the message except the sender
for userActorRef in self.memberUserActorHandles:
if userActorRef._ray_actor_id == senderActorHandle._ray_actor_id:
continue
userActorRef.receive_chat.remote(msg=msg)
def join(self, userActorHandle:ActorHandle):
self.memberUserActorHandles.append(userActorHandle)
# A helper function to print the hardware resources of Ray cluster
def print_cluster_resource():
print('''This cluster consists of
{} nodes in total
{} CPU resources in total
'''.format(len(ray.nodes()), ray.cluster_resources()['CPU']))
# Create 1 chatroom with 3 users
def create_chatroom_and_users(useridOffset:int, num_user=3):
ret = {}
chatroomActorHandle = ChatRoomActor.remote()
for idx in range(num_user):
username = f"user{idx+useridOffset}"
userActorHandle = UserActor.options(name=username).remote(username=username)
ray.wait([userActorHandle.join.remote(chatroomActorHandle=chatroomActorHandle)])
ret[username] = userActorHandle
return ret
def send_chat_from_user(username:str, message:str):
userActorHandle = ray.get_actor(username)
userActorHandle.send_chat(message)
if __name__ == "__main__":
# initialize ray with single machine
ray.init(dashboard_host="0.0.0.0")
# initialize ray with ray cluster
# ray.init(dashboard_host="0.0.0.0", address="auto")
chatroom_counter = 0
user_actor_handle_dict = {} # key: username, value: UserActorHandle
# We now create a chatroom with 3 user
chatroom_counter += 1
user_actor_handle_dict.update(create_chatroom_and_users(useridOffset=len(user_actor_handle_dict), num_user=3))
time.sleep(1)
try:
while True:
print("""Action 1: Send a chat 'Hello world' from specific user
Action 2: Create a new chatroom with 3 users
Your choice (1 or 2): """)
in_action = sys.stdin.readline().split()[0]
if in_action == "1":
print(f"Please choose a user from 0 to {len(user_actor_handle_dict)-1}: ")
in_user_idx = sys.stdin.readline().split()[0]
# check if user exist
try:
user_idx = int(in_user_idx)
except:
print(f"'{in_user_idx}' is not a valid user index")
continue
userActorHandle = user_actor_handle_dict[f"user{user_idx}"]
userActorHandle.send_chat.remote("Hello world")
time.sleep(1)
if in_action == "2":
num_user = 3
chatroom_counter += 1
user_actor_handle_dict.update(create_chatroom_and_users(
useridOffset=len(user_actor_handle_dict), num_user=num_user))
print(f"Finish creating new chatroom with {num_user} users")
time.sleep(1)
except KeyboardInterrupt:
print('interrupted! shutdown ray')Last updated