Write your distributed program on Ray

Ray is a distributed computing framework, similar to Spark. Ray programs can be written in Python or Java.

Prepare the environment

We have prepared a Docker image with Ray pre-installed. If Docker Engine is not installed on your own machine, please follow the instructions at https://docs.docker.com/get-docker.

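Launch a Docker container from our image. Use the tag `tigerchang/raylab:latest` on your own machine, or `tigerchang/raylab:with-proxy` on node-1.cse.cuhk.edu.hk. The `-p 8265:8265` option publishes port 8265, which is the port the Ray dashboard listens on by default.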

# if you are using your own machine
docker run -it -p 8265:8265 tigerchang/raylab:latest

# if you are using node-1.cse.cuhk.edu.hk
docker run -it -p 8265:8265 tigerchang/raylab:with-proxy

Note: If you do not have Docker installed on your own machine (though we recommend installing it), you can instead use the server node-1.cse.cuhk.edu.hk, which we provide with Docker already installed.

Note that the command may fail if port 8265 on the host is already in use by another student. In that case, map a different host port between 1024 and 65535 to the container's port 8265, e.g. `-p 18265:8265`.

We begin with a Ray program that runs on a single (local) machine.

Inside the container, create a new folder 'ray-lab' under your home directory and change into it.

mkdir ~/ray-lab && cd ~/ray-lab

Create a new file 'main.py' with the following content.

vim main.py
import platform
import sys
import time

import ray
from ray.actor import ActorHandle

# We define that 0.5 core of CPU to serve 1 UserActor
@ray.remote(num_cpus=0.5)
class UserActor:
    """A chat user running as a named Ray actor.

    Instances are expected to be created with ``UserActor.options(name=username)``.
    The constructor looks itself up by that name so that it can hand its own
    handle to a chatroom. ``num_cpus=0.5`` reserves half a logical CPU per user.
    """

    def __init__(self, username:str):
        self.name = username
        # Handle of the chatroom this user joined; stays None until join() succeeds.
        self.chatroomActorHandle:ActorHandle = None
        # Hostname of the node this actor runs on, shown in chat output.
        self.hostname = platform.node()
        # Handle to this very actor. This relies on the actor having been created
        # with options(name=username), as create_chatroom_and_users does.
        self.selfActorHandle = ray.get_actor(username)
        print(f"Created UserActor for user {self.name}")

    def join(self, chatroomActorHandle:ActorHandle):
        """Join ``chatroomActorHandle`` and block until the chatroom has registered us.

        ``ray.get`` (rather than ``ray.wait``) is used so that an error raised by
        ``ChatRoomActor.join`` propagates here instead of being ignored. The
        chatroom is remembered only after the join has succeeded.
        """
        print(f"{self.name} join chatroom {chatroomActorHandle._ray_actor_id}")

        ray.get(chatroomActorHandle.join.remote(userActorHandle=self.selfActorHandle))
        self.chatroomActorHandle = chatroomActorHandle

    def receive_chat(self, msg:str):
        """Print a message forwarded to this user by its chatroom."""
        print(f"I am {self.name}. Now in Ray worker node `{self.hostname}`, receive message: '{msg}'")

    def send_chat(self, msg:str):
        """Send ``msg`` to the other members of this user's chatroom.

        The chatroom call is fire-and-forget: its ObjectRef is not awaited.
        If the user has not joined a chatroom, a notice is printed and nothing is
        sent. Without this check, the call would fail with AttributeError on None.
        """
        if self.chatroomActorHandle is None:
            print(f"I am {self.name}. I have not joined any chatroom, message '{msg}' is not sent")
            return

        print(f"I am {self.name}. Now in Ray worker node `{self.hostname}`, sending message: '{msg}'")
        # Reuse the self-handle cached in __init__; no per-message name lookup needed.
        self.chatroomActorHandle.send_chat.remote(senderActorHandle=self.selfActorHandle, msg=msg)

@ray.remote
class ChatRoomActor:
    """Ray actor that keeps one chatroom's member list and relays messages."""

    def __init__(self):
        # User actor handles, in the order their owners joined.
        self.memberUserActorHandles = []

    def send_chat(self, senderActorHandle:ActorHandle, msg:str):
        """Relay ``msg`` to every member except the sender, without awaiting delivery."""
        print(f"Receive chat from {senderActorHandle}, msg: {msg} forwarding to all group members")
        sender_id = senderActorHandle._ray_actor_id
        # Members are matched by actor id, so the sender never receives its own message.
        recipients = [
            member
            for member in self.memberUserActorHandles
            if member._ray_actor_id != sender_id
        ]
        for member in recipients:
            member.receive_chat.remote(msg=msg)

    def join(self, userActorHandle:ActorHandle):
        """Add ``userActorHandle`` to this chatroom's members."""
        self.memberUserActorHandles.append(userActorHandle)


# A helper function to print the hardware resources of Ray cluster
def print_cluster_resource():
    """Print the node count and total CPU resources of the connected Ray cluster."""
    node_count = len(ray.nodes())
    cpu_total = ray.cluster_resources()['CPU']
    template = '''This cluster consists of
            {} nodes in total
            {} CPU resources in total
        '''
    print(template.format(node_count, cpu_total))


# Create 1 chatroom and `num_user` users (3 by default) that join it
def create_chatroom_and_users(useridOffset:int, num_user=3):
    """Create one chatroom actor plus ``num_user`` named user actors that join it.

    Users are named ``user{useridOffset}``, ``user{useridOffset + 1}``, ...
    Each user's ``join`` call is waited on before the next user is created.

    Returns:
        dict mapping each new username to its UserActor handle.
    """
    chatroom = ChatRoomActor.remote()
    created = {}

    for name in (f"user{useridOffset + i}" for i in range(num_user)):
        handle = UserActor.options(name=name).remote(username=name)
        # Block until this user's join() task has finished.
        ray.wait([handle.join.remote(chatroomActorHandle=chatroom)])
        created[name] = handle

    return created

def send_chat_from_user(username:str, message:str):
    """Ask the named UserActor ``username`` to send ``message`` to its chatroom.

    Ray actor methods cannot be called directly on a handle; they must be
    invoked with ``.remote(...)``. The resulting ObjectRef is returned so the
    caller may ``ray.get`` it; callers that ignore the return value are unaffected.

    Raises:
        ValueError: from ``ray.get_actor`` if no actor named ``username`` exists.
    """
    userActorHandle = ray.get_actor(username)
    return userActorHandle.send_chat.remote(message)


if __name__ == "__main__":
    def _read_first_token():
        """Read one stdin line and return its first whitespace-separated token.

        Returns "" for a blank line and None at end-of-file. Callers can then
        re-prompt or stop instead of crashing with IndexError.
        """
        line = sys.stdin.readline()
        if not line:
            return None
        tokens = line.split()
        return tokens[0] if tokens else ""

    # initialize ray with single machine
    ray.init(dashboard_host="0.0.0.0")
    # initialize ray with ray cluster
    # ray.init(dashboard_host="0.0.0.0", address="auto")

    chatroom_counter = 0
    user_actor_handle_dict = {} # key: username, value: UserActorHandle

    # We now create a chatroom with 3 user
    chatroom_counter += 1
    user_actor_handle_dict.update(create_chatroom_and_users(useridOffset=len(user_actor_handle_dict), num_user=3))

    # NOTE(review): presumably lets actor start-up output appear before the menu.
    time.sleep(1)

    try:
        while True:
            print("""Action 1: Send a chat 'Hello world' from specific user 
Action 2: Create a new chatroom with 3 users
Your choice (1 or 2): """)
            in_action = _read_first_token()
            if in_action is None:
                # stdin is closed; no further input can arrive.
                print('stdin closed! shutdown ray')
                break

            if in_action == "1":
                print(f"Please choose a user from 0 to {len(user_actor_handle_dict)-1}: ")
                in_user_idx = _read_first_token()
                if in_user_idx is None:
                    print('stdin closed! shutdown ray')
                    break

                # check if user exist: input must be an int naming a created user
                try:
                    user_idx = int(in_user_idx)
                except ValueError:
                    print(f"'{in_user_idx}' is not a valid user index")
                    continue

                userActorHandle = user_actor_handle_dict.get(f"user{user_idx}")
                if userActorHandle is None:
                    print(f"'{in_user_idx}' is not a valid user index")
                    continue
                userActorHandle.send_chat.remote("Hello world")

                time.sleep(1)
            elif in_action == "2":
                num_user = 3
                chatroom_counter += 1
                user_actor_handle_dict.update(create_chatroom_and_users(
                    useridOffset=len(user_actor_handle_dict), num_user=num_user))
                print(f"Finish creating new chatroom with {num_user} users")
                time.sleep(1)
            elif in_action:
                # Blank lines ("") just re-prompt; anything else gets feedback.
                print(f"'{in_action}' is not a valid choice")

    except KeyboardInterrupt:
        print('interrupted! shutdown ray')
    finally:
        # Release Ray resources however the loop ends.
        ray.shutdown()

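The program lets you create new chatrooms on request. An important line is line 9 of main.py, `@ray.remote(num_cpus=0.5)`, which reserves 0.5 CPU for each user actor created. As more chatrooms are created (3 users per chatroom), the total CPU demand grows. When the program is connected to a Ray cluster with autoscaling enabled (see the commented-out `ray.init(..., address="auto")` line), Ray can automatically provision more EC2 instances to scale out smoothly.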

Last updated