Ray is a distributed computing framework, similar to Spark. Ray programs can be written in Python or Java.
Prepare the environment
We have prepared a Docker image with Ray installed. If you don't have Docker Engine installed on your own machine, please refer to https://docs.docker.com/get-docker.
Launch a Docker container with our image tigerchang/raylab:latest:
# if you are using your own machine
docker run -it -p 8265:8265 tigerchang/raylab:latest
# if you are using node-1.cse.cuhk.edu.hk
docker run -it -p 8265:8265 tigerchang/raylab:with-proxy
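Note: We recommend installing Docker on your own machine. If you cannot, you can use the server node-1.cse.cuhk.edu.hk provided by us, which already has Docker installed.
Note that on the shared server the command may fail because host port 8265 is already in use by another student. In this case, publish a different host port between 1024 and 65535, for example -p 8266:8265 (the first number is the host port; the second, 8265, is the port inside the container and stays the same).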
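If you are unsure which host port is free, you can check before launching the container. The following is a minimal sketch using only Python's standard socket module; the helper names is_port_free and first_free_port are hypothetical and not part of the lab image. It assumes that a port in use cannot be bound, and a port that is free at check time may still be taken a moment later.

```python
import socket


def is_port_free(port: int, host: str = "0.0.0.0") -> bool:
    """Return True if a TCP socket can be bound to (host, port) right now."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            s.bind((host, port))
        except OSError:
            # Typically "address already in use"
            return False
        return True


def first_free_port(start: int = 8265, end: int = 65535) -> int:
    """Return the first port in [start, end] that can currently be bound."""
    for port in range(start, end + 1):
        if is_port_free(port):
            return port
    raise RuntimeError("no free port found in the given range")


if __name__ == "__main__":
    port = first_free_port()
    # Print the docker command with the chosen host port
    print(f"docker run -it -p {port}:8265 tigerchang/raylab:latest")
```

Run it on the machine where you will start the container, then copy the printed command.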
We begin with a Ray program that runs on a single local machine.
Create a new folder 'ray-lab' under your home directory and change the directory to 'ray-lab'
mkdir ~/ray-lab && cd ~/ray-lab
Create a new file 'main.py' with the following content.
vim main.py
import platform
import sys
import time

import ray
from ray.actor import ActorHandle

# We define that 0.5 core of CPU to serve 1 UserActor
@ray.remote(num_cpus=0.5)
class UserActor:
    def __init__(self, username: str):
        self.name = username
        self.chatroomActorHandle: ActorHandle = None
        self.hostname = platform.node()
        self.selfActorHandle = ray.get_actor(username)
        print(f"Created UserActor for user {self.name}")

    def join(self, chatroomActorHandle: ActorHandle):
        print(f"{self.name} join chatroom {chatroomActorHandle._ray_actor_id}")
        self.chatroomActorHandle = chatroomActorHandle
        ray.wait([chatroomActorHandle.join.remote(userActorHandle=self.selfActorHandle)])

    def receive_chat(self, msg: str):
        print(f"I am {self.name}. Now in Ray worker node `{self.hostname}`, receive message: '{msg}'")

    def send_chat(self, msg: str):
        self.selfActorHandle = ray.get_actor(self.name)
        print(f"I am {self.name}. Now in Ray worker node `{self.hostname}`, sending message: '{msg}'")
        self.chatroomActorHandle.send_chat.remote(senderActorHandle=self.selfActorHandle, msg=msg)


@ray.remote
class ChatRoomActor:
    def __init__(self):
        self.memberUserActorHandles = []

    def send_chat(self, senderActorHandle: ActorHandle, msg: str):
        print(f"Receive chat from {senderActorHandle}, msg: {msg} forwarding to all group members")
        # Forward the message to every member except the sender
        for userActorRef in self.memberUserActorHandles:
            if userActorRef._ray_actor_id == senderActorHandle._ray_actor_id:
                continue
            userActorRef.receive_chat.remote(msg=msg)

    def join(self, userActorHandle: ActorHandle):
        self.memberUserActorHandles.append(userActorHandle)


# A helper function to print the hardware resources of Ray cluster
def print_cluster_resource():
    print('''This cluster consists of
    {} nodes in total
    {} CPU resources in total
    '''.format(len(ray.nodes()), ray.cluster_resources()['CPU']))


# Create 1 chatroom with 3 users
def create_chatroom_and_users(useridOffset: int, num_user=3):
    ret = {}
    chatroomActorHandle = ChatRoomActor.remote()
    for idx in range(num_user):
        username = f"user{idx+useridOffset}"
        # Name the actor so it can be looked up later with ray.get_actor(username)
        userActorHandle = UserActor.options(name=username).remote(username=username)
        # Block until the user has joined the chatroom
        ray.wait([userActorHandle.join.remote(chatroomActorHandle=chatroomActorHandle)])
        ret[username] = userActorHandle
    return ret


def send_chat_from_user(username: str, message: str):
    userActorHandle = ray.get_actor(username)
    # Actor methods must be invoked through .remote()
    userActorHandle.send_chat.remote(message)


if __name__ == "__main__":
    # initialize ray with single machine
    ray.init(dashboard_host="0.0.0.0")
    # initialize ray with ray cluster
    # ray.init(dashboard_host="0.0.0.0", address="auto")

    chatroom_counter = 0
    user_actor_handle_dict = {}  # key: username, value: UserActorHandle

    # We now create a chatroom with 3 users
    chatroom_counter += 1
    user_actor_handle_dict.update(create_chatroom_and_users(
        useridOffset=len(user_actor_handle_dict), num_user=3))
    time.sleep(1)

    try:
        while True:
            print("""Action 1: Send a chat 'Hello world' from specific user
Action 2: Create a new chatroom with 3 users
Your choice (1 or 2): """)
            in_action = sys.stdin.readline().split()[0]
            if in_action == "1":
                print(f"Please choose a user from 0 to {len(user_actor_handle_dict)-1}: ")
                in_user_idx = sys.stdin.readline().split()[0]
                # check if user exists
                try:
                    user_idx = int(in_user_idx)
                    userActorHandle = user_actor_handle_dict[f"user{user_idx}"]
                except (ValueError, KeyError):
                    print(f"'{in_user_idx}' is not a valid user index")
                    continue
                userActorHandle.send_chat.remote("Hello world")
                time.sleep(1)
            if in_action == "2":
                num_user = 3
                chatroom_counter += 1
                user_actor_handle_dict.update(create_chatroom_and_users(
                    useridOffset=len(user_actor_handle_dict), num_user=num_user))
                print(f"Finish creating new chatroom with {num_user} users")
                time.sleep(1)
    except KeyboardInterrupt:
        print('interrupted! shutdown ray')
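To follow the message flow without starting Ray, the forwarding rule in ChatRoomActor.send_chat can be modelled with ordinary Python objects. This is only an illustrative sketch: LocalUser and LocalChatRoom are hypothetical stand-ins that are not part of main.py, calls are synchronous instead of .remote(), and an identity check replaces the _ray_actor_id comparison.

```python
class LocalUser:
    """Plain-Python stand-in for UserActor (no Ray, synchronous calls)."""

    def __init__(self, name: str):
        self.name = name
        self.chatroom = None
        self.inbox = []  # received messages, kept so they can be inspected

    def join(self, chatroom: "LocalChatRoom") -> None:
        self.chatroom = chatroom
        chatroom.join(self)

    def receive_chat(self, msg: str) -> None:
        self.inbox.append(msg)

    def send_chat(self, msg: str) -> None:
        self.chatroom.send_chat(sender=self, msg=msg)


class LocalChatRoom:
    """Plain-Python stand-in for ChatRoomActor."""

    def __init__(self):
        self.members = []

    def join(self, user: LocalUser) -> None:
        self.members.append(user)

    def send_chat(self, sender: LocalUser, msg: str) -> None:
        # Forward the message to every member except the sender
        for member in self.members:
            if member is sender:
                continue
            member.receive_chat(msg)


room = LocalChatRoom()
users = [LocalUser(f"user{i}") for i in range(3)]
for u in users:
    u.join(room)

users[0].send_chat("Hello world")
for u in users:
    print(u.name, u.inbox)
```

Here user0 sends the message, so only user1 and user2 end up with it in their inbox. In main.py the same rule applies, but each call crosses process (and possibly machine) boundaries through Ray.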
The program in main.py creates a new chatroom on request. An important line is Line #9, the decorator @ray.remote(num_cpus=0.5), which reserves 0.5 CPU for each UserActor created. Every new chatroom adds 3 users, so CPU demand grows with the number of chatrooms. Later on, when the program runs on a Ray cluster, Ray can automatically provision more EC2 instances to meet that demand and scale out smoothly.
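As a rough back-of-the-envelope illustration of why more chatrooms eventually require more machines, the sketch below adds up the CPU requested by UserActor instances only. It uses 3 users per chatroom and 0.5 CPU per user as in main.py; cpus_per_node = 2 is a hypothetical instance size chosen for illustration, and the resources used by ChatRoomActor and by Ray itself are ignored.

```python
import math

CPUS_PER_USER = 0.5      # from @ray.remote(num_cpus=0.5) on UserActor
USERS_PER_CHATROOM = 3   # each chatroom is created with 3 users


def user_cpu_demand(num_chatrooms: int) -> float:
    """Total CPU requested by all UserActor instances."""
    return num_chatrooms * USERS_PER_CHATROOM * CPUS_PER_USER


def nodes_needed(num_chatrooms: int, cpus_per_node: int = 2) -> int:
    """Nodes required if every node offers cpus_per_node CPUs (assumed value)."""
    return math.ceil(user_cpu_demand(num_chatrooms) / cpus_per_node)


for rooms in (1, 2, 4):
    print(f"{rooms} chatroom(s): {user_cpu_demand(rooms)} CPUs, "
          f"{nodes_needed(rooms)} node(s) of 2 CPUs")
```

Under these assumptions one chatroom fits on a single 2-CPU node, while four chatrooms request 6 CPUs and need three such nodes.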