ray


Playing the QMIX Two-step game on Ray

We are trying to extend the code of the Two-step game (an example from the QMIX paper) using the Ray framework. The changes we want to apply should extract the best checkpoint from one of the trials of a tune.run() call, restore it into a new QMixTrainer, and then use that trainer on a new environment to compute the subsequent actions.

The code we tried to use is the following:

"""The two-step game from QMIX: https://arxiv.org/pdf/1803.11485.pdf

Configurations you can try:
    - normal policy gradients (PG)
    - contrib/MADDPG
    - QMIX

See also: centralized_critic.py for centralized critic PPO on this game.
"""

import argparse
from gym.spaces import Tuple, MultiDiscrete, Dict, Discrete
import os

import ray
from ray import tune
from ray.rllib.agents.qmix import QMixTrainer
from ray.tune import register_env, grid_search
from ray.rllib.env.multi_agent_env import ENV_STATE
from ray.rllib.examples.env.two_step_game import TwoStepGame
from ray.rllib.utils.test_utils import check_learning_achieved

import numpy as np

# Command-line options for the example script.
parser = argparse.ArgumentParser()
# Algorithm name; selects the config branch below and is passed to tune.run().
parser.add_argument("--run", type=str, default="QMIX")
# Number of CPUs handed to ray.init(); 0 is converted to None there.
parser.add_argument("--num-cpus", type=int, default=0)
# When set, check_learning_achieved() is called on the tune results.
parser.add_argument("--as-test", action="store_true")
# When set, "framework" is "torch"; otherwise "tf".
parser.add_argument("--torch", action="store_true")
# Stopping criteria: mean episode reward and total timesteps.
parser.add_argument("--stop-reward", type=float, default=7.0)
parser.add_argument("--stop-timesteps", type=int, default=50000)

# Script entry point: build the per-algorithm config, train with tune.run(),
# then try to restore the best checkpoint into a fresh QMixTrainer and query
# it for actions.
if __name__ == "__main__":
    args = parser.parse_args()

    # Agents 0 and 1 are placed into a single group named "group_1".
    grouping = {
        "group_1": [0, 1],
    }
    # Observation space of the grouped env: a Tuple with one Dict per agent,
    # each Dict holding an "obs" entry and an ENV_STATE entry.
    obs_space = Tuple([
        Dict({
            "obs": MultiDiscrete([2, 2, 2, 3]),
            ENV_STATE: MultiDiscrete([2, 2, 2])
        }),
        Dict({
            "obs": MultiDiscrete([2, 2, 2, 3]),
            ENV_STATE: MultiDiscrete([2, 2, 2])
        }),
    ])
    # Action space of the grouped env: one TwoStepGame action per agent.
    act_space = Tuple([
        TwoStepGame.action_space,
        TwoStepGame.action_space,
    ])
    # Register the grouped variant under the name used by the QMIX config.
    register_env(
        "grouped_twostep",
        lambda config: TwoStepGame(config).with_agent_groups(
            grouping, obs_space=obs_space, act_space=act_space))

    if args.run == "contrib/MADDPG":
        # NOTE(review): obs_space_dict and act_space_dict are not referenced
        # again anywhere in this listing.
        obs_space_dict = {
            "agent_1": Discrete(6),
            "agent_2": Discrete(6),
        }
        act_space_dict = {
            "agent_1": TwoStepGame.action_space,
            "agent_2": TwoStepGame.action_space,
        }
        config = {
            "learning_starts": 100,
            "env_config": {
                "actions_are_logits": True,
            },
            "multiagent": {
                # One policy per agent; agent 0 -> "pol1", anything else -> "pol2".
                "policies": {
                    "pol1": (None, Discrete(6), TwoStepGame.action_space, {
                        "agent_id": 0,
                    }),
                    "pol2": (None, Discrete(6), TwoStepGame.action_space, {
                        "agent_id": 1,
                    }),
                },
                "policy_mapping_fn": lambda x: "pol1" if x == 0 else "pol2",
            },
            "framework": "torch" if args.torch else "tf",
            # Use GPUs iff `RLLIB_NUM_GPUS` env var set to > 0.
            "num_gpus": int(os.environ.get("RLLIB_NUM_GPUS", "0")),
        }
        group = False
    elif args.run == "QMIX":
        config = {
            "rollout_fragment_length": 4,
            "train_batch_size": 32,
            "exploration_config": {
                "epsilon_timesteps": 5000,
                "final_epsilon": 0.05,
            },
            "num_workers": 0,
            # Grid search over three mixer settings, i.e. one trial per value.
            "mixer": grid_search([None, "qmix", "vdn"]),
            "env_config": {
                "separate_state_space": True,
                "one_hot_state_encoding": True
            },
            # Use GPUs iff `RLLIB_NUM_GPUS` env var set to > 0.
            "num_gpus": int(os.environ.get("RLLIB_NUM_GPUS", "0")),
            "framework": "torch" if args.torch else "tf",
        }
        # QMIX is the only branch that trains on the grouped environment.
        group = True
    else:
        config = {
            # Use GPUs iff `RLLIB_NUM_GPUS` env var set to > 0.
            "num_gpus": int(os.environ.get("RLLIB_NUM_GPUS", "0")),
            "framework": "torch" if args.torch else "tf",
        }
        group = False

    # --num-cpus=0 (the default) becomes None here.
    ray.init(num_cpus=args.num_cpus or None)

    # Stopping criteria handed to tune.run().
    stop = {
        "episode_reward_mean": args.stop_reward,
        "timesteps_total": args.stop_timesteps,
    }

    # Add the "env" key: the registered grouped env name, or the plain class.
    config = dict(config, **{
        "env": "grouped_twostep" if group else TwoStepGame,
    })

    results = tune.run(args.run, stop=stop, config=config, verbose=1, checkpoint_freq=1, checkpoint_at_end=True)

    if args.as_test:
        check_learning_achieved(results, args.stop_reward)

    # NOTE(review): only results.trials[0] is inspected, although the QMIX
    # config grid-searches three mixer values; no metric is passed either.
    # Confirm that this really selects the intended "best" checkpoint.
    best_checkpoint = results.get_best_checkpoint(results.trials[0], mode="max")
    print(f".. best checkpoint was: {best_checkpoint}")

    # NOTE(review): TwoStepGame receives the whole trainer config here, not
    # config["env_config"] (where "separate_state_space" and
    # "one_hot_state_encoding" live) -- verify this is what the env expects.
    env = TwoStepGame(config).with_agent_groups(grouping, obs_space=obs_space, act_space=act_space)
    obs = env.reset()

    # Shallow copy of the tune config; the grid_search entry under "mixer"
    # is replaced by a concrete value before building the trainer.
    # NOTE(review): "qmix" is hard-coded; confirm it matches the mixer of the
    # trial the checkpoint came from.
    rllib_config = config.copy()
    rllib_config["mixer"] = "qmix"
    new_trainer = QMixTrainer(config=rllib_config)
    new_trainer.restore(best_checkpoint)

    # NOTE(review): the trainer is built for the grouped env, whose
    # observation space is the Tuple of Dicts defined in obs_space above;
    # verify that the values passed as `observation` have that structure.
    a1 = new_trainer.compute_action(observation=obs['group_1'])
    a2 = new_trainer.compute_action(observation=np.concatenate([obs['group_1'], [1]]))

    ray.shutdown()

To make the changes from the original example easier to see, here is the patch:

Index: main.py

<+>UTF-8
===================================================================
diff --git a/main.py b/main.py
--- a/main.py	(revision 80b3473ef3eede5f94e4805797556940bee91bc8)
+++ b/main.py	(date 1637485442837)
@@ -14,13 +14,16 @@
 
 import ray
 from ray import tune
+from ray.rllib.agents.qmix import QMixTrainer
 from ray.tune import register_env, grid_search
 from ray.rllib.env.multi_agent_env import ENV_STATE
 from ray.rllib.examples.env.two_step_game import TwoStepGame
 from ray.rllib.utils.test_utils import check_learning_achieved
 
+import numpy as np
+
 parser = argparse.ArgumentParser()
-parser.add_argument("--run", type=str, default="PG")
+parser.add_argument("--run", type=str, default="QMIX")
 parser.add_argument("--num-cpus", type=int, default=0)
 parser.add_argument("--as-test", action="store_true")
 parser.add_argument("--torch", action="store_true")
@@ -120,9 +123,23 @@
         "env": "grouped_twostep" if group else TwoStepGame,
     })
 
-    results = tune.run(args.run, stop=stop, config=config, verbose=1)
+    results = tune.run(args.run, stop=stop, config=config, verbose=1, checkpoint_freq=1, checkpoint_at_end=True)
 
     if args.as_test:
         check_learning_achieved(results, args.stop_reward)
 
+    best_checkpoint = results.get_best_checkpoint(results.trials[0], mode="max")
+    print(f".. best checkpoint was: {best_checkpoint}")
+
+    env = TwoStepGame(config).with_agent_groups(grouping, obs_space=obs_space, act_space=act_space)
+    obs = env.reset()
+
+    rllib_config = config.copy()
+    rllib_config["mixer"] = "qmix"
+    new_trainer = QMixTrainer(config=rllib_config)
+    new_trainer.restore(best_checkpoint)
+
+    a1 = new_trainer.compute_action(observation=obs['group_1'])
+    a2 = new_trainer.compute_action(observation=np.concatenate([obs['group_1'], [1]]))
+
     ray.shutdown()

When we execute, we get the following errors:

a1 = new_trainer.compute_action(observation=obs['group_1'])

Produces:

ValueError: ('Observation ({}) outside given space ({})!', [0, 3], Tuple(Dict(obs:MultiDiscrete([2 2 2 3]), state:MultiDiscrete([2 2 2])), Dict(obs:MultiDiscrete([2 2 2 3]), state:MultiDiscrete([2 2 2]))))
a2 = new_trainer.compute_action(observation=np.concatenate([obs['group_1'], [1]]))

Produces:

ValueError: ('Observation ({}) outside given space ({})!', array([0, 3, 1]), Tuple(Dict(obs:MultiDiscrete([2 2 2 3]), state:MultiDiscrete([2 2 2])), Dict(obs:MultiDiscrete([2 2 2 3]), state:MultiDiscrete([2 2 2]))))

We are currently trying to figure out how we should change the observation so that it is accepted by the check_shape() function of the preprocessor.

def check_shape(self, observation: Any) -> None:
    """Checks the shape of the given observation.

    The check only runs on calls where the internal counter ``self._i`` is a
    multiple of ``VALIDATION_INTERVAL``; the counter is incremented on every
    call.

    Args:
        observation: The observation to validate against ``self._obs_space``.

    Raises:
        ValueError: If ``self._obs_space.contains(observation)`` is false, or
            if that call raises ``AttributeError``.
    """
    if self._i % VALIDATION_INTERVAL == 0:
        # A plain Python list is converted to an array only for Box spaces;
        # for any other space type the list is checked as-is.
        if type(observation) is list and isinstance(
                self._obs_space, gym.spaces.Box):
            observation = np.array(observation)
        try:
            if not self._obs_space.contains(observation):
                # NOTE: the template and its values are passed as separate
                # exception args, so the "{}" placeholders are not filled in.
                raise ValueError(
                    "Observation ({}) outside given space ({})!",
                    observation, self._obs_space)
        except AttributeError:
            raise ValueError(
                "Observation for a Box/MultiBinary/MultiDiscrete space "
                "should be an np.array, not a Python list.", observation)
    self._i += 1

When calling the check_shape() function, these are the values that are processed:

observation:
value = [0, 3]
type = <class 'list'>

self._obs_space:
value = Tuple(Dict(obs:MultiDiscrete([2 2 2 3]), state:MultiDiscrete([2 2 2])), Dict(obs:MultiDiscrete([2 2 2 3]), state:MultiDiscrete([2 2 2])))
type = <class 'gym.spaces.tuple.Tuple'>

and this line fails:

if not self._obs_space.contains(observation)

Any positive feedback is welcome!


Rough notes on Ray

Just a bunch of snippets that we used to deploy a local Ray cluster. We couldn’t get the second node to connect to the cluster, even though no error was reported.

From https://www.anaconda.com/products/individual-b
 wget https://repo.anaconda.com/archive/Anaconda3-2021.05-Linux-x86_64.sh
 bash ./Anaconda3-2021.05-Linux-x86_64.sh
 source ~/anaconda3/bin/activate
 conda create --name ray.3.7 python=3.7.10;
 conda activate ray.3.7;
 conda install --name ray.3.7 pip;
 pip install ray==1.1.0
 pip install gym pandas torch ray ray[default] ray[rllib] ray[serve] ray[tune];

 ON WORKERS:
 From https://docs.docker.com/engine/install/ubuntu/
 sudo apt-get remove docker docker-engine docker.io containerd runc;
 sudo apt-get update;
 sudo apt-get install apt-transport-https ca-certificates curl gnupg lsb-release;
 curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg
 echo "deb [arch=amd64 signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
 sudo apt-get update
 sudo apt-get install docker-ce docker-ce-cli containerd.io
 sudo addgroup --system docker
 sudo adduser $USER docker
 newgrp docker
 sudo systemctl restart docker

 Uninstall Docker Engine
 Uninstall the Docker Engine, CLI, and Containerd packages:
 sudo apt-get purge docker-ce docker-ce-cli containerd.io
 Images, containers, volumes, or customized configuration files on your host are not automatically removed. To delete all images, containers, and volumes:
 sudo rm -rf /var/lib/docker
  sudo rm -rf /var/lib/containerd
 You must delete any edited configuration files manually.


 2021-06-09 03:56:34,453    WARNING services.py:1740 -- WARNING: The object store is using /tmp instead of /dev/shm because /dev/shm has only 7963275264 bytes available. This will harm performance! You may be able to free up space by deleting files in /dev/shm. If you are inside a Docker container, you can increase /dev/shm size by passing '--shm-size=10.24gb' to 'docker run' (or add it to the run_options list in a Ray cluster config). Make sure to set this to more than 30% of available RAM.

 ON MASTER:
 ray up office.yaml
 ray up -vvvvvv office.yaml
 ray exec office.yaml 'ray status'

The YAML file that we used was the following:

# A unique identifier for the head node and workers of this cluster.
cluster_name: default

# This executes all commands on all nodes in the docker container,
# and opens all the necessary ports to support the Ray cluster.
# Empty string means disabled. Assumes Docker is installed.
docker:
    image: "rayproject/ray-ml:1.1.0" # You can change this to latest-cpu if you don't need GPU support and want a faster startup
    # image: rayproject/ray:latest-gpu   # use this one if you don't need ML dependencies, it's faster to pull
    container_name: "ray_container"
    # If true, pulls latest version of image. Otherwise, `docker run` will only pull the image
    # if no cached version is present.
    pull_before_run: True
    run_options: ["--shm-size=10.24gb"]  # Extra options to pass into "docker run"

provider:
    type: local
    head_ip: 192.168.1.14
    ## head_ip: YOUR_HEAD_NODE_HOSTNAME
    # You may need to supply a public ip for the head node if you need
    # to run `ray up` from outside of the Ray cluster's network
    # (e.g. the cluster is in an AWS VPC and you're starting ray from your laptop)
    # This is useful when debugging the local node provider with cloud VMs.
    # external_head_ip: YOUR_HEAD_PUBLIC_IP
    worker_ips: [192.168.1.70]
    ## worker_ips: [WORKER_NODE_1_HOSTNAME, WORKER_NODE_2_HOSTNAME, ... ]
    # Optional when running automatic cluster management on prem. If you use a coordinator server,
    # then you can launch multiple autoscaling clusters on the same set of machines, and the coordinator
    # will assign individual nodes to clusters as needed.
    #    coordinator_address: "<host>:<port>"
    # cache_stopped_nodes: False

# How Ray will authenticate with newly launched nodes.
auth:
    ssh_user: tux
    #ssh_user: YOUR_USERNAME
    # Optional if an ssh private key is necessary to ssh to the cluster.
    ssh_private_key: ~/.ssh/id_rsa

# The minimum number of worker nodes to launch in addition to the head
# node. This number should be >= 0.
# Typically, min_workers == max_workers == len(worker_ips).
# NOTE(review): worker_ips in the provider section lists a single address,
# so 20 does not match len(worker_ips) -- confirm this value is intended.
min_workers: 20

# The maximum number of worker nodes to launch in addition to the head node.
# This takes precedence over min_workers.
# Typically, min_workers == max_workers == len(worker_ips).
max_workers: 20
# The default behavior for manually managed clusters is
# min_workers == max_workers == len(worker_ips),
# meaning that Ray is started on all available nodes of the cluster.
# For automatically managed clusters, max_workers is required and min_workers defaults to 0.

# NOTE(review): set to the same value as min_workers/max_workers above;
# confirm it is still needed for the Ray version in use.
initial_workers: 20

# The autoscaler will scale up the cluster faster with higher upscaling speed.
# E.g., if the task requires adding more nodes then autoscaler will gradually
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
# This number should be > 0.
upscaling_speed: 1.0

idle_timeout_minutes: 5

# Files or directories to copy to the head and worker nodes. The format is a
# dictionary from REMOTE_PATH: LOCAL_PATH, e.g.
file_mounts: {
#    "/path1/on/remote/machine": "/path1/on/local/machine",
#    "/path2/on/remote/machine": "/path2/on/local/machine",
}

# Files or directories to copy from the head node to the worker nodes. The format is a
# list of paths. The same path on the head node will be copied to the worker node.
# This behavior is a subset of the file_mounts behavior. In the vast majority of cases
# you should just use file_mounts. Only use this if you know what you're doing!
cluster_synced_files: []

# Whether changes to directories in file_mounts or cluster_synced_files in the head node
# should sync to the worker node continuously
file_mounts_sync_continuously: False

# Patterns for files to exclude when running rsync up or rsync down
rsync_exclude:
    - "**/.git"
    - "**/.git/**"

# Pattern files to use for filtering out files when running rsync up or rsync down. The file is searched for
# in the source directory and recursively through all subdirectories. For example, if .gitignore is provided
# as a value, the behavior will match git's behavior for finding and using .gitignore files.
rsync_filter:
    - ".gitignore"

# List of commands that will be run before `setup_commands`. If docker is
# enabled, these commands will run outside the container and before docker
# is setup.
initialization_commands: []

# List of shell commands to run to set up each nodes.
setup_commands: []
    # Note: if you're developing Ray, you probably want to create a Docker image that
    # has your Ray repo pre-cloned. Then, you can replace the pip installs
    # below with a git checkout <your_sha> (and possibly a recompile).
    # To run the nightly version of ray (as opposed to the latest), either use a rayproject docker image
    # that has the "nightly" (e.g. "rayproject/ray-ml:nightly-gpu") or uncomment the following line:
    # - pip install -U "ray[default] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-2.0.0.dev0-cp37-cp37m-manylinux2014_x86_64.whl"

# Custom commands that will be run on the head node after common setup.
head_setup_commands: []

# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []

# Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
    - ray stop
    - ulimit -c unlimited && ray start --head --port=6379 --autoscaling-config=~/ray_bootstrap_config.yaml

# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
    - ray stop
    - ray start --address=$RAY_HEAD_IP:6379

Do not use the snap version of Docker for Ray on Ubuntu 20.04 LTS

If you are trying to deploy a local Ray cluster on Ubuntu machines and you are getting the following error:

Shared connection to 192.168.1.74 closed.
     Running docker exec ray_container printenv HOME
       Full command is ssh -tt -i ~/.ssh/id_rsa -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o IdentitiesOnly=yes -o ExitOnForwardFailure=yes -o ServerAliveInterval=5 -o ServerAliveCountMax=3 -o ControlMaster=auto -o ControlPath=/tmp/ray_ssh_71415f9f14/c21f969b5f/%C -o ControlPersist=10s -o ConnectTimeout=120s tux@192.168.1.74 bash --login -c -i 'true && source ~/.bashrc && export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && (docker exec ray_container printenv HOME)'
 Shared connection to 192.168.1.74 closed.
     Running docker cp /tmp/ray_tmp_mount/default/~/ray_bootstrap_config.yaml ray_container:/home/ray/ray_bootstrap_config.yaml
       Full command is ssh -tt -i ~/.ssh/id_rsa -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o IdentitiesOnly=yes -o ExitOnForwardFailure=yes -o ServerAliveInterval=5 -o ServerAliveCountMax=3 -o ControlMaster=auto -o ControlPath=/tmp/ray_ssh_71415f9f14/c21f969b5f/%C -o ControlPersist=10s -o ConnectTimeout=120s tux@192.168.1.74 bash --login -c -i 'true && source ~/.bashrc && export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && (docker cp /tmp/ray_tmp_mount/default/~/ray_bootstrap_config.yaml ray_container:/home/ray/ray_bootstrap_config.yaml)'
 lstat /tmp/ray_tmp_mount/default/~: no such file or directory
 Shared connection to 192.168.1.74 closed.
 2021-06-09 11:41:09,299    INFO node_provider.py:93 -- ClusterState: Writing cluster state: ['192.168.1.70', '192.168.1.74']

You might need to remove the snap version of Docker and install Docker Engine by following Docker's official installation instructions instead.

# From https://docs.docker.com/engine/install/ubuntu/
sudo apt-get remove docker docker-engine docker.io containerd runc;
sudo apt-get update;
sudo apt-get install apt-transport-https ca-certificates curl gnupg lsb-release;

curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg
echo "deb [arch=amd64 signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null

sudo apt-get update
sudo apt-get install docker-ce docker-ce-cli containerd.io

sudo addgroup --system docker
sudo adduser $USER docker
newgrp docker
sudo systemctl restart docker