tune.run()


Playing the QMIX Two-step game on Ray

We are trying to expand the code of the Two-step game (which is an example from the QMIX paper) using the Ray framework. The changes we want to apply should extract the best checkpoint from some trial of a tune.run(), restore it on a new QMixTrainer, and then use it on a new environment to compute the subsequent actions.

The code we tried to use is the following:

"""The two-step game from QMIX: https://arxiv.org/pdf/1803.11485.pdf

Configurations you can try:
    - normal policy gradients (PG)
    - contrib/MADDPG
    - QMIX

See also: centralized_critic.py for centralized critic PPO on this game.
"""

import argparse
from gym.spaces import Tuple, MultiDiscrete, Dict, Discrete
import os

import ray
from ray import tune
from ray.rllib.agents.qmix import QMixTrainer
from ray.tune import register_env, grid_search
from ray.rllib.env.multi_agent_env import ENV_STATE
from ray.rllib.examples.env.two_step_game import TwoStepGame
from ray.rllib.utils.test_utils import check_learning_achieved

import numpy as np

parser = argparse.ArgumentParser()
parser.add_argument("--run", type=str, default="QMIX")
parser.add_argument("--num-cpus", type=int, default=0)
parser.add_argument("--as-test", action="store_true")
parser.add_argument("--torch", action="store_true")
parser.add_argument("--stop-reward", type=float, default=7.0)
parser.add_argument("--stop-timesteps", type=int, default=50000)

if __name__ == "__main__":
    args = parser.parse_args()

    grouping = {
        "group_1": [0, 1],
    }
    obs_space = Tuple([
        Dict({
            "obs": MultiDiscrete([2, 2, 2, 3]),
            ENV_STATE: MultiDiscrete([2, 2, 2])
        }),
        Dict({
            "obs": MultiDiscrete([2, 2, 2, 3]),
            ENV_STATE: MultiDiscrete([2, 2, 2])
        }),
    ])
    act_space = Tuple([
        TwoStepGame.action_space,
        TwoStepGame.action_space,
    ])
    register_env(
        "grouped_twostep",
        lambda config: TwoStepGame(config).with_agent_groups(
            grouping, obs_space=obs_space, act_space=act_space))

    if args.run == "contrib/MADDPG":
        obs_space_dict = {
            "agent_1": Discrete(6),
            "agent_2": Discrete(6),
        }
        act_space_dict = {
            "agent_1": TwoStepGame.action_space,
            "agent_2": TwoStepGame.action_space,
        }
        config = {
            "learning_starts": 100,
            "env_config": {
                "actions_are_logits": True,
            },
            "multiagent": {
                "policies": {
                    "pol1": (None, Discrete(6), TwoStepGame.action_space, {
                        "agent_id": 0,
                    }),
                    "pol2": (None, Discrete(6), TwoStepGame.action_space, {
                        "agent_id": 1,
                    }),
                },
                "policy_mapping_fn": lambda x: "pol1" if x == 0 else "pol2",
            },
            "framework": "torch" if args.torch else "tf",
            # Use GPUs iff `RLLIB_NUM_GPUS` env var set to > 0.
            "num_gpus": int(os.environ.get("RLLIB_NUM_GPUS", "0")),
        }
        group = False
    elif args.run == "QMIX":
        config = {
            "rollout_fragment_length": 4,
            "train_batch_size": 32,
            "exploration_config": {
                "epsilon_timesteps": 5000,
                "final_epsilon": 0.05,
            },
            "num_workers": 0,
            "mixer": grid_search([None, "qmix", "vdn"]),
            "env_config": {
                "separate_state_space": True,
                "one_hot_state_encoding": True
            },
            # Use GPUs iff `RLLIB_NUM_GPUS` env var set to > 0.
            "num_gpus": int(os.environ.get("RLLIB_NUM_GPUS", "0")),
            "framework": "torch" if args.torch else "tf",
        }
        group = True
    else:
        config = {
            # Use GPUs iff `RLLIB_NUM_GPUS` env var set to > 0.
            "num_gpus": int(os.environ.get("RLLIB_NUM_GPUS", "0")),
            "framework": "torch" if args.torch else "tf",
        }
        group = False

    ray.init(num_cpus=args.num_cpus or None)

    stop = {
        "episode_reward_mean": args.stop_reward,
        "timesteps_total": args.stop_timesteps,
    }

    config = dict(config, **{
        "env": "grouped_twostep" if group else TwoStepGame,
    })

    results = tune.run(args.run, stop=stop, config=config, verbose=1, checkpoint_freq=1, checkpoint_at_end=True)

    if args.as_test:
        check_learning_achieved(results, args.stop_reward)

    best_checkpoint = results.get_best_checkpoint(results.trials[0], mode="max")
    print(f".. best checkpoint was: {best_checkpoint}")

    env = TwoStepGame(config).with_agent_groups(grouping, obs_space=obs_space, act_space=act_space)
    obs = env.reset()

    rllib_config = config.copy()
    rllib_config["mixer"] = "qmix"
    new_trainer = QMixTrainer(config=rllib_config)
    new_trainer.restore(best_checkpoint)

    a1 = new_trainer.compute_action(observation=obs['group_1'])
    a2 = new_trainer.compute_action(observation=np.concatenate([obs['group_1'], [1]]))

    ray.shutdown()

To make it easier for you to see the changes from the original, this is the patch of the changes:

Index: main.py

<+>UTF-8
===================================================================
diff --git a/main.py b/main.py
--- a/main.py	(revision 80b3473ef3eede5f94e4805797556940bee91bc8)
+++ b/main.py	(date 1637485442837)
@@ -14,13 +14,16 @@
 
 import ray
 from ray import tune
+from ray.rllib.agents.qmix import QMixTrainer
 from ray.tune import register_env, grid_search
 from ray.rllib.env.multi_agent_env import ENV_STATE
 from ray.rllib.examples.env.two_step_game import TwoStepGame
 from ray.rllib.utils.test_utils import check_learning_achieved
 
+import numpy as np
+
 parser = argparse.ArgumentParser()
-parser.add_argument("--run", type=str, default="PG")
+parser.add_argument("--run", type=str, default="QMIX")
 parser.add_argument("--num-cpus", type=int, default=0)
 parser.add_argument("--as-test", action="store_true")
 parser.add_argument("--torch", action="store_true")
@@ -120,9 +123,23 @@
         "env": "grouped_twostep" if group else TwoStepGame,
     })
 
-    results = tune.run(args.run, stop=stop, config=config, verbose=1)
+    results = tune.run(args.run, stop=stop, config=config, verbose=1, checkpoint_freq=1, checkpoint_at_end=True)
 
     if args.as_test:
         check_learning_achieved(results, args.stop_reward)
 
+    best_checkpoint = results.get_best_checkpoint(results.trials[0], mode="max")
+    print(f".. best checkpoint was: {best_checkpoint}")
+
+    env = TwoStepGame(config).with_agent_groups(grouping, obs_space=obs_space, act_space=act_space)
+    obs = env.reset()
+
+    rllib_config = config.copy()
+    rllib_config["mixer"] = "qmix"
+    new_trainer = QMixTrainer(config=rllib_config)
+    new_trainer.restore(best_checkpoint)
+
+    a1 = new_trainer.compute_action(observation=obs['group_1'])
+    a2 = new_trainer.compute_action(observation=np.concatenate([obs['group_1'], [1]]))
+
     ray.shutdown()

When we execute, we get the following errors:

a1 = new_trainer.compute_action(observation=obs['group_1'])

Produces:

ValueError: ('Observation ({}) outside given space ({})!', [0, 3], Tuple(Dict(obs:MultiDiscrete([2 2 2 3]), state:MultiDiscrete([2 2 2])), Dict(obs:MultiDiscrete([2 2 2 3]), state:MultiDiscrete([2 2 2]))))
a2 = new_trainer.compute_action(observation=np.concatenate([obs['group_1'], [1]]))

Produces:

ValueError: ('Observation ({}) outside given space ({})!', array([0, 3, 1]), Tuple(Dict(obs:MultiDiscrete([2 2 2 3]), state:MultiDiscrete([2 2 2])), Dict(obs:MultiDiscrete([2 2 2 3]), state:MultiDiscrete([2 2 2]))))

We are currently trying to figure out how we should change the observation to get accepted by the check_shape() function of the preprocessor.

def check_shape(self, observation: Any) -> None:
"""Checks the shape of the given observation."""
if self._i % VALIDATION_INTERVAL == 0:
    if type(observation) is list and isinstance(
            self._obs_space, gym.spaces.Box):
        observation = np.array(observation)
    try:
        if not self._obs_space.contains(observation):
            raise ValueError(
                "Observation ({}) outside given space ({})!",
                observation, self._obs_space)
    except AttributeError:
        raise ValueError(
            "Observation for a Box/MultiBinary/MultiDiscrete space "
            "should be an np.array, not a Python list.", observation)
self._i += 1

When calling the check_shape() function, these are the values that are processed:

observation:
value = [0, 3]
type = <class 'list'>

self._obs_space:
value = Tuple(Dict(obs:MultiDiscrete([2 2 2 3]), state:MultiDiscrete([2 2 2])), Dict(obs:MultiDiscrete([2 2 2 3]), state:MultiDiscrete([2 2 2])))
type = <class 'gym.spaces.tuple.Tuple'>

and this line fails:

if not self._obs_space.contains(observation)

Any positive feedback is welcome!