We are trying to expand the code of the Two-step game (which is an example from the QMIX paper) using the Ray framework
. The changes we want to apply should extract the best checkpoint from some trial of a tune.run()
, restore it on a new QMixTrainer
, and then use it on a new environment to compute the subsequent actions.
The code we tried to use is the following:
"""The two-step game from QMIX: https://arxiv.org/pdf/1803.11485.pdf
Configurations you can try:
- normal policy gradients (PG)
- contrib/MADDPG
- QMIX
See also: centralized_critic.py for centralized critic PPO on this game.
"""
import argparse
from gym.spaces import Tuple, MultiDiscrete, Dict, Discrete
import os
import ray
from ray import tune
from ray.rllib.agents.qmix import QMixTrainer
from ray.tune import register_env, grid_search
from ray.rllib.env.multi_agent_env import ENV_STATE
from ray.rllib.examples.env.two_step_game import TwoStepGame
from ray.rllib.utils.test_utils import check_learning_achieved
import numpy as np
parser = argparse.ArgumentParser()
parser.add_argument("--run", type=str, default="QMIX")
parser.add_argument("--num-cpus", type=int, default=0)
parser.add_argument("--as-test", action="store_true")
parser.add_argument("--torch", action="store_true")
parser.add_argument("--stop-reward", type=float, default=7.0)
parser.add_argument("--stop-timesteps", type=int, default=50000)
if __name__ == "__main__":
args = parser.parse_args()
grouping = {
"group_1": [0, 1],
}
obs_space = Tuple([
Dict({
"obs": MultiDiscrete([2, 2, 2, 3]),
ENV_STATE: MultiDiscrete([2, 2, 2])
}),
Dict({
"obs": MultiDiscrete([2, 2, 2, 3]),
ENV_STATE: MultiDiscrete([2, 2, 2])
}),
])
act_space = Tuple([
TwoStepGame.action_space,
TwoStepGame.action_space,
])
register_env(
"grouped_twostep",
lambda config: TwoStepGame(config).with_agent_groups(
grouping, obs_space=obs_space, act_space=act_space))
if args.run == "contrib/MADDPG":
obs_space_dict = {
"agent_1": Discrete(6),
"agent_2": Discrete(6),
}
act_space_dict = {
"agent_1": TwoStepGame.action_space,
"agent_2": TwoStepGame.action_space,
}
config = {
"learning_starts": 100,
"env_config": {
"actions_are_logits": True,
},
"multiagent": {
"policies": {
"pol1": (None, Discrete(6), TwoStepGame.action_space, {
"agent_id": 0,
}),
"pol2": (None, Discrete(6), TwoStepGame.action_space, {
"agent_id": 1,
}),
},
"policy_mapping_fn": lambda x: "pol1" if x == 0 else "pol2",
},
"framework": "torch" if args.torch else "tf",
# Use GPUs iff `RLLIB_NUM_GPUS` env var set to > 0.
"num_gpus": int(os.environ.get("RLLIB_NUM_GPUS", "0")),
}
group = False
elif args.run == "QMIX":
config = {
"rollout_fragment_length": 4,
"train_batch_size": 32,
"exploration_config": {
"epsilon_timesteps": 5000,
"final_epsilon": 0.05,
},
"num_workers": 0,
"mixer": grid_search([None, "qmix", "vdn"]),
"env_config": {
"separate_state_space": True,
"one_hot_state_encoding": True
},
# Use GPUs iff `RLLIB_NUM_GPUS` env var set to > 0.
"num_gpus": int(os.environ.get("RLLIB_NUM_GPUS", "0")),
"framework": "torch" if args.torch else "tf",
}
group = True
else:
config = {
# Use GPUs iff `RLLIB_NUM_GPUS` env var set to > 0.
"num_gpus": int(os.environ.get("RLLIB_NUM_GPUS", "0")),
"framework": "torch" if args.torch else "tf",
}
group = False
ray.init(num_cpus=args.num_cpus or None)
stop = {
"episode_reward_mean": args.stop_reward,
"timesteps_total": args.stop_timesteps,
}
config = dict(config, **{
"env": "grouped_twostep" if group else TwoStepGame,
})
results = tune.run(args.run, stop=stop, config=config, verbose=1, checkpoint_freq=1, checkpoint_at_end=True)
if args.as_test:
check_learning_achieved(results, args.stop_reward)
best_checkpoint = results.get_best_checkpoint(results.trials[0], mode="max")
print(f".. best checkpoint was: {best_checkpoint}")
env = TwoStepGame(config).with_agent_groups(grouping, obs_space=obs_space, act_space=act_space)
obs = env.reset()
rllib_config = config.copy()
rllib_config["mixer"] = "qmix"
new_trainer = QMixTrainer(config=rllib_config)
new_trainer.restore(best_checkpoint)
a1 = new_trainer.compute_action(observation=obs['group_1'])
a2 = new_trainer.compute_action(observation=np.concatenate([obs['group_1'], [1]]))
ray.shutdown()
To make it easier for you to see the changes from the original, this is the patch of the changes:
Index: main.py <+>UTF-8 =================================================================== diff --git a/main.py b/main.py --- a/main.py (revision 80b3473ef3eede5f94e4805797556940bee91bc8) +++ b/main.py (date 1637485442837) @@ -14,13 +14,16 @@ import ray from ray import tune +from ray.rllib.agents.qmix import QMixTrainer from ray.tune import register_env, grid_search from ray.rllib.env.multi_agent_env import ENV_STATE from ray.rllib.examples.env.two_step_game import TwoStepGame from ray.rllib.utils.test_utils import check_learning_achieved +import numpy as np + parser = argparse.ArgumentParser() -parser.add_argument("--run", type=str, default="PG") +parser.add_argument("--run", type=str, default="QMIX") parser.add_argument("--num-cpus", type=int, default=0) parser.add_argument("--as-test", action="store_true") parser.add_argument("--torch", action="store_true") @@ -120,9 +123,23 @@ "env": "grouped_twostep" if group else TwoStepGame, }) - results = tune.run(args.run, stop=stop, config=config, verbose=1) + results = tune.run(args.run, stop=stop, config=config, verbose=1, checkpoint_freq=1, checkpoint_at_end=True) if args.as_test: check_learning_achieved(results, args.stop_reward) + best_checkpoint = results.get_best_checkpoint(results.trials[0], mode="max") + print(f".. best checkpoint was: {best_checkpoint}") + + env = TwoStepGame(config).with_agent_groups(grouping, obs_space=obs_space, act_space=act_space) + obs = env.reset() + + rllib_config = config.copy() + rllib_config["mixer"] = "qmix" + new_trainer = QMixTrainer(config=rllib_config) + new_trainer.restore(best_checkpoint) + + a1 = new_trainer.compute_action(observation=obs['group_1']) + a2 = new_trainer.compute_action(observation=np.concatenate([obs['group_1'], [1]])) + ray.shutdown()
When we execute, we get the following errors:
a1 = new_trainer.compute_action(observation=obs['group_1'])
Produces:
ValueError: ('Observation ({}) outside given space ({})!', [0, 3], Tuple(Dict(obs:MultiDiscrete([2 2 2 3]), state:MultiDiscrete([2 2 2])), Dict(obs:MultiDiscrete([2 2 2 3]), state:MultiDiscrete([2 2 2]))))
a2 = new_trainer.compute_action(observation=np.concatenate([obs['group_1'], [1]]))
Produces:
ValueError: ('Observation ({}) outside given space ({})!', array([0, 3, 1]), Tuple(Dict(obs:MultiDiscrete([2 2 2 3]), state:MultiDiscrete([2 2 2])), Dict(obs:MultiDiscrete([2 2 2 3]), state:MultiDiscrete([2 2 2]))))
We are currently trying to figure out how we should change the observation to get accepted by the check_shape()
function of the preprocessor.
def check_shape(self, observation: Any) -> None:
"""Checks the shape of the given observation."""
if self._i % VALIDATION_INTERVAL == 0:
if type(observation) is list and isinstance(
self._obs_space, gym.spaces.Box):
observation = np.array(observation)
try:
if not self._obs_space.contains(observation):
raise ValueError(
"Observation ({}) outside given space ({})!",
observation, self._obs_space)
except AttributeError:
raise ValueError(
"Observation for a Box/MultiBinary/MultiDiscrete space "
"should be an np.array, not a Python list.", observation)
self._i += 1
When calling the check_shape()
function, these are the values that are processed:
observation: value = [0, 3] type = <class 'list'> self._obs_space: value = Tuple(Dict(obs:MultiDiscrete([2 2 2 3]), state:MultiDiscrete([2 2 2])), Dict(obs:MultiDiscrete([2 2 2 3]), state:MultiDiscrete([2 2 2]))) type = <class 'gym.spaces.tuple.Tuple'>
and this line fails:
if not self._obs_space.contains(observation)
Any positive feedback is welcome!
This post is also available in: Αγγλικα