two-step game


Playing the QMIX Two-step game on Ray

We are trying to expand the code of the Two-step game (which is an example from the QMIX paper) using the Ray framework. The changes we want to apply should extract the best checkpoint from some trial of a tune.run(), restore it on a new QMixTrainer, and then use it on a new environment to compute the subsequent actions.

The code we tried to use is the following:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
"""The two-step game from QMIX: https://arxiv.org/pdf/1803.11485.pdf
 
Configurations you can try:
    - normal policy gradients (PG)
    - contrib/MADDPG
    - QMIX
 
See also: centralized_critic.py for centralized critic PPO on this game.
"""
 
import argparse
from gym.spaces import Tuple, MultiDiscrete, Dict, Discrete
import os
 
import ray
from ray import tune
from ray.rllib.agents.qmix import QMixTrainer
from ray.tune import register_env, grid_search
from ray.rllib.env.multi_agent_env import ENV_STATE
from ray.rllib.examples.env.two_step_game import TwoStepGame
from ray.rllib.utils.test_utils import check_learning_achieved
 
import numpy as np
 
parser = argparse.ArgumentParser()
parser.add_argument("--run", type=str, default="QMIX")
parser.add_argument("--num-cpus", type=int, default=0)
parser.add_argument("--as-test", action="store_true")
parser.add_argument("--torch", action="store_true")
parser.add_argument("--stop-reward", type=float, default=7.0)
parser.add_argument("--stop-timesteps", type=int, default=50000)
 
if __name__ == "__main__":
    args = parser.parse_args()
 
    grouping = {
        "group_1": [0, 1],
    }
    obs_space = Tuple([
        Dict({
            "obs": MultiDiscrete([2, 2, 2, 3]),
            ENV_STATE: MultiDiscrete([2, 2, 2])
        }),
        Dict({
            "obs": MultiDiscrete([2, 2, 2, 3]),
            ENV_STATE: MultiDiscrete([2, 2, 2])
        }),
    ])
    act_space = Tuple([
        TwoStepGame.action_space,
        TwoStepGame.action_space,
    ])
    register_env(
        "grouped_twostep",
        lambda config: TwoStepGame(config).with_agent_groups(
            grouping, obs_space=obs_space, act_space=act_space))
 
    if args.run == "contrib/MADDPG":
        obs_space_dict = {
            "agent_1": Discrete(6),
            "agent_2": Discrete(6),
        }
        act_space_dict = {
            "agent_1": TwoStepGame.action_space,
            "agent_2": TwoStepGame.action_space,
        }
        config = {
            "learning_starts": 100,
            "env_config": {
                "actions_are_logits": True,
            },
            "multiagent": {
                "policies": {
                    "pol1": (None, Discrete(6), TwoStepGame.action_space, {
                        "agent_id": 0,
                    }),
                    "pol2": (None, Discrete(6), TwoStepGame.action_space, {
                        "agent_id": 1,
                    }),
                },
                "policy_mapping_fn": lambda x: "pol1" if x == 0 else "pol2",
            },
            "framework": "torch" if args.torch else "tf",
            # Use GPUs iff `RLLIB_NUM_GPUS` env var set to > 0.
            "num_gpus": int(os.environ.get("RLLIB_NUM_GPUS", "0")),
        }
        group = False
    elif args.run == "QMIX":
        config = {
            "rollout_fragment_length": 4,
            "train_batch_size": 32,
            "exploration_config": {
                "epsilon_timesteps": 5000,
                "final_epsilon": 0.05,
            },
            "num_workers": 0,
            "mixer": grid_search([None, "qmix", "vdn"]),
            "env_config": {
                "separate_state_space": True,
                "one_hot_state_encoding": True
            },
            # Use GPUs iff `RLLIB_NUM_GPUS` env var set to > 0.
            "num_gpus": int(os.environ.get("RLLIB_NUM_GPUS", "0")),
            "framework": "torch" if args.torch else "tf",
        }
        group = True
    else:
        config = {
            # Use GPUs iff `RLLIB_NUM_GPUS` env var set to > 0.
            "num_gpus": int(os.environ.get("RLLIB_NUM_GPUS", "0")),
            "framework": "torch" if args.torch else "tf",
        }
        group = False
 
    ray.init(num_cpus=args.num_cpus or None)
 
    stop = {
        "episode_reward_mean": args.stop_reward,
        "timesteps_total": args.stop_timesteps,
    }
 
    config = dict(config, **{
        "env": "grouped_twostep" if group else TwoStepGame,
    })
 
    results = tune.run(args.run, stop=stop, config=config, verbose=1, checkpoint_freq=1, checkpoint_at_end=True)
 
    if args.as_test:
        check_learning_achieved(results, args.stop_reward)
 
    best_checkpoint = results.get_best_checkpoint(results.trials[0], mode="max")
    print(f".. best checkpoint was: {best_checkpoint}")
 
    env = TwoStepGame(config).with_agent_groups(grouping, obs_space=obs_space, act_space=act_space)
    obs = env.reset()
 
    rllib_config = config.copy()
    rllib_config["mixer"] = "qmix"
    new_trainer = QMixTrainer(config=rllib_config)
    new_trainer.restore(best_checkpoint)
 
    a1 = new_trainer.compute_action(observation=obs['group_1'])
    a2 = new_trainer.compute_action(observation=np.concatenate([obs['group_1'], [1]]))
 
    ray.shutdown()

To make it easier for you to see the changes from the original, this is the patch of the changes:

Index: main.py

<+>UTF-8
===================================================================
diff --git a/main.py b/main.py
--- a/main.py	(revision 80b3473ef3eede5f94e4805797556940bee91bc8)
+++ b/main.py	(date 1637485442837)
@@ -14,13 +14,16 @@
 
 import ray
 from ray import tune
+from ray.rllib.agents.qmix import QMixTrainer
 from ray.tune import register_env, grid_search
 from ray.rllib.env.multi_agent_env import ENV_STATE
 from ray.rllib.examples.env.two_step_game import TwoStepGame
 from ray.rllib.utils.test_utils import check_learning_achieved
 
+import numpy as np
+
 parser = argparse.ArgumentParser()
-parser.add_argument("--run", type=str, default="PG")
+parser.add_argument("--run", type=str, default="QMIX")
 parser.add_argument("--num-cpus", type=int, default=0)
 parser.add_argument("--as-test", action="store_true")
 parser.add_argument("--torch", action="store_true")
@@ -120,9 +123,23 @@
         "env": "grouped_twostep" if group else TwoStepGame,
     })
 
-    results = tune.run(args.run, stop=stop, config=config, verbose=1)
+    results = tune.run(args.run, stop=stop, config=config, verbose=1, checkpoint_freq=1, checkpoint_at_end=True)
 
     if args.as_test:
         check_learning_achieved(results, args.stop_reward)
 
+    best_checkpoint = results.get_best_checkpoint(results.trials[0], mode="max")
+    print(f".. best checkpoint was: {best_checkpoint}")
+
+    env = TwoStepGame(config).with_agent_groups(grouping, obs_space=obs_space, act_space=act_space)
+    obs = env.reset()
+
+    rllib_config = config.copy()
+    rllib_config["mixer"] = "qmix"
+    new_trainer = QMixTrainer(config=rllib_config)
+    new_trainer.restore(best_checkpoint)
+
+    a1 = new_trainer.compute_action(observation=obs['group_1'])
+    a2 = new_trainer.compute_action(observation=np.concatenate([obs['group_1'], [1]]))
+
     ray.shutdown()

When we execute, we get the following errors:

1
a1 = new_trainer.compute_action(observation=obs['group_1'])

Produces:

ValueError: ('Observation ({}) outside given space ({})!', [0, 3], Tuple(Dict(obs:MultiDiscrete([2 2 2 3]), state:MultiDiscrete([2 2 2])), Dict(obs:MultiDiscrete([2 2 2 3]), state:MultiDiscrete([2 2 2]))))
1
a2 = new_trainer.compute_action(observation=np.concatenate([obs['group_1'], [1]]))

Produces:

ValueError: ('Observation ({}) outside given space ({})!', array([0, 3, 1]), Tuple(Dict(obs:MultiDiscrete([2 2 2 3]), state:MultiDiscrete([2 2 2])), Dict(obs:MultiDiscrete([2 2 2 3]), state:MultiDiscrete([2 2 2]))))

We are currently trying to figure out how we should change the observation to get accepted by the check_shape() function of the preprocessor.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def check_shape(self, observation: Any) -> None:
"""Checks the shape of the given observation."""
if self._i % VALIDATION_INTERVAL == 0:
    if type(observation) is list and isinstance(
            self._obs_space, gym.spaces.Box):
        observation = np.array(observation)
    try:
        if not self._obs_space.contains(observation):
            raise ValueError(
                "Observation ({}) outside given space ({})!",
                observation, self._obs_space)
    except AttributeError:
        raise ValueError(
            "Observation for a Box/MultiBinary/MultiDiscrete space "
            "should be an np.array, not a Python list.", observation)
self._i += 1

When calling the check_shape() function, these are the values that are processed:

observation:
value = [0, 3]
type = <class 'list'>

self._obs_space:
value = Tuple(Dict(obs:MultiDiscrete([2 2 2 3]), state:MultiDiscrete([2 2 2])), Dict(obs:MultiDiscrete([2 2 2 3]), state:MultiDiscrete([2 2 2])))
type = <class 'gym.spaces.tuple.Tuple'>

and this line fails:

1
if not self._obs_space.contains(observation)

Any positive feedback is welcome!