Skip to content

Commit 8c3a69e

Browse files
author
Jonathan Harper
committed
Add timeout for communicator exchange
When we initially connect to the environment using RPCCommunicator, the connection is polled so we don't hang forever on `.recv()` when the environment wasn't launched or failed. However we don't currently have any similar check for the exchanges mid-training-run. This change applies the same timeout from initialization to each exchange, and extends the default `timeout_wait` to 60 seconds to generally improve the chances we won't have a mismatch between environment launch time and the trainer timeout. Tested on: single-env and multi-env cases. Killed 1 environment process manually and saw that the model was saved appropriately and all processes closed.
1 parent dc8997b commit 8c3a69e

File tree

3 files changed

+13
-4
lines changed

3 files changed

+13
-4
lines changed

ml-agents-envs/mlagents/envs/environment.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def __init__(
5151
seed: int = 0,
5252
docker_training: bool = False,
5353
no_graphics: bool = False,
54-
timeout_wait: int = 30,
54+
timeout_wait: int = 60,
5555
args: Optional[List[str]] = None,
5656
):
5757
"""

ml-agents-envs/mlagents/envs/rpc_communicator.py

+10-1
Original file line numberDiff line numberDiff line change
@@ -82,14 +82,22 @@ def check_port(self, port):
8282
finally:
8383
s.close()
8484

85-
def initialize(self, inputs: UnityInputProto) -> UnityOutputProto:
85+
def poll_for_timeout(self):
86+
"""
87+
Polls the GRPC parent connection for data, to be used before calling recv. This prevents
88+
us from hanging indefinitely in the case where the environment process has died or was not
89+
launched.
90+
"""
8691
if not self.unity_to_external.parent_conn.poll(self.timeout_wait):
8792
raise UnityTimeOutException(
8893
"The Unity environment took too long to respond. Make sure that :\n"
8994
"\t The environment does not need user interaction to launch\n"
9095
"\t The Agents are linked to the appropriate Brains\n"
9196
"\t The environment and the Python interface have compatible versions."
9297
)
98+
99+
def initialize(self, inputs: UnityInputProto) -> UnityOutputProto:
100+
self.poll_for_timeout()
93101
aca_param = self.unity_to_external.parent_conn.recv().unity_output
94102
message = UnityMessageProto()
95103
message.header.status = 200
@@ -103,6 +111,7 @@ def exchange(self, inputs: UnityInputProto) -> Optional[UnityOutputProto]:
103111
message.header.status = 200
104112
message.unity_input.CopyFrom(inputs)
105113
self.unity_to_external.parent_conn.send(message)
114+
self.poll_for_timeout()
106115
output = self.unity_to_external.parent_conn.recv()
107116
if output.header.status != 200:
108117
return None

ml-agents-envs/mlagents/envs/subprocess_env_manager.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import cloudpickle
44

55
from mlagents.envs.environment import UnityEnvironment
6-
from mlagents.envs.exception import UnityCommunicationException
6+
from mlagents.envs.exception import UnityCommunicationException, UnityTimeOutException
77
from multiprocessing import Process, Pipe, Queue
88
from multiprocessing.connection import Connection
99
from queue import Empty as EmptyQueueException
@@ -116,7 +116,7 @@ def _send_response(cmd_name, payload):
116116
_send_response("reset", all_brain_info)
117117
elif cmd.name == "close":
118118
break
119-
except (KeyboardInterrupt, UnityCommunicationException):
119+
except (KeyboardInterrupt, UnityCommunicationException, UnityTimeOutException):
120120
logger.info(f"UnityEnvironment worker {worker_id}: environment stopping.")
121121
step_queue.put(EnvironmentResponse("env_close", worker_id, None))
122122
finally:

0 commit comments

Comments
 (0)