Skip to content

Communicator factory #4965

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions com.unity.ml-agents/Runtime/Academy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -422,12 +422,7 @@ void InitializeEnvironment()
var port = ReadPortFromArgs();
if (port > 0)
{
Communicator = new RpcCommunicator(
new CommunicatorInitParameters
{
port = port
}
);
Communicator = CommunicatorFactory.Create();
}

if (Communicator != null)
Expand All @@ -438,6 +433,7 @@ void InitializeEnvironment()
bool initSuccessful = false;
var communicatorInitParams = new CommunicatorInitParameters
{
port = port,
unityCommunicationVersion = k_ApiVersion,
unityPackageVersion = k_PackageVersion,
name = "AcademySingleton",
Expand Down
35 changes: 35 additions & 0 deletions com.unity.ml-agents/Runtime/Communicator/CommunicatorFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
namespace Unity.MLAgents
{
/// <summary>
/// Factory class for an ICommunicator instance. This is used to the <see cref="Academy"/> at startup.
/// By default, on desktop platforms, an ICommunicator will be created and attempt to connect
/// to a trainer. This behavior can be prevented by setting <see cref="CommunicatorFactory.Enabled"/> to false
/// *before* the <see cref="Academy"/> is initialized.
/// </summary>
public static class CommunicatorFactory
{
static bool s_Enabled = true;

/// <summary>
/// Whether or not an ICommunicator instance will be created when the <see cref="Academy"/> is initialized.
/// Changing this has no effect after the <see cref="Academy"/> has already been initialized.
/// </summary>
public static bool Enabled
{
get => s_Enabled;
set => s_Enabled = value;
}

internal static ICommunicator Create()
{
#if UNITY_EDITOR || UNITY_STANDALONE_WIN || UNITY_STANDALONE_OSX || UNITY_STANDALONE_LINUX
if (s_Enabled)
{
return new RpcCommunicator();
}
#endif
// Non-desktop or disabled
return null;
}
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 14 additions & 35 deletions com.unity.ml-agents/Runtime/Communicator/RpcCommunicator.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# if UNITY_EDITOR || UNITY_STANDALONE_WIN || UNITY_STANDALONE_OSX || UNITY_STANDALONE_LINUX
#if UNITY_EDITOR || UNITY_STANDALONE_WIN || UNITY_STANDALONE_OSX || UNITY_STANDALONE_LINUX
using Grpc.Core;
#endif
#if UNITY_EDITOR
using UnityEditor;
#endif
Expand Down Expand Up @@ -44,23 +43,17 @@ internal class RpcCommunicator : ICommunicator
Dictionary<string, ActionSpec> m_UnsentBrainKeys = new Dictionary<string, ActionSpec>();


#if UNITY_EDITOR || UNITY_STANDALONE_WIN || UNITY_STANDALONE_OSX || UNITY_STANDALONE_LINUX
/// The Unity to External client.
UnityToExternalProto.UnityToExternalProtoClient m_Client;
#endif
/// The communicator parameters sent at construction
CommunicatorInitParameters m_CommunicatorInitParameters;

/// <summary>
/// Initializes a new instance of the RPCCommunicator class.
/// </summary>
/// <param name="communicatorInitParameters">Communicator parameters.</param>
public RpcCommunicator(CommunicatorInitParameters communicatorInitParameters)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't pass these and don't store them anymore. They were only used for the port; that's now passed to Initialize()

public RpcCommunicator()
{
m_CommunicatorInitParameters = communicatorInitParameters;
}

#region Initialization
#region Initialization

internal static bool CheckCommunicationVersionsAreCompatible(
string unityCommunicationVersion,
Expand Down Expand Up @@ -110,6 +103,7 @@ public bool Initialize(CommunicatorInitParameters initParameters, out UnityRLIni
try
{
initializationInput = Initialize(
initParameters.port,
new UnityOutputProto
{
RlInitializationOutput = academyParameters
Expand Down Expand Up @@ -211,13 +205,10 @@ void UpdateEnvironmentWithInput(UnityRLInputProto rlInput)
SendCommandEvent(rlInput.Command);
}

UnityInputProto Initialize(UnityOutputProto unityOutput, out UnityInputProto unityInput)
UnityInputProto Initialize(int port, UnityOutputProto unityOutput, out UnityInputProto unityInput)
{
#if UNITY_EDITOR || UNITY_STANDALONE_WIN || UNITY_STANDALONE_OSX || UNITY_STANDALONE_LINUX
m_IsOpen = true;
var channel = new Channel(
"localhost:" + m_CommunicatorInitParameters.port,
ChannelCredentials.Insecure);
var channel = new Channel($"localhost:{port}", ChannelCredentials.Insecure);

m_Client = new UnityToExternalProto.UnityToExternalProtoClient(channel);
var result = m_Client.Exchange(WrapMessage(unityOutput, 200));
Expand All @@ -232,21 +223,17 @@ UnityInputProto Initialize(UnityOutputProto unityOutput, out UnityInputProto uni
QuitCommandReceived?.Invoke();
}
return result.UnityInput;
#else
throw new UnityAgentsException("You cannot perform training on this platform.");
#endif
}

#endregion
#endregion

#region Destruction
#region Destruction

/// <summary>
/// Close the communicator gracefully on both sides of the communication.
/// </summary>
public void Dispose()
{
#if UNITY_EDITOR || UNITY_STANDALONE_WIN || UNITY_STANDALONE_OSX || UNITY_STANDALONE_LINUX
if (!m_IsOpen)
{
return;
Expand All @@ -261,15 +248,11 @@ public void Dispose()
{
// ignored
}
#else
throw new UnityAgentsException(
"You cannot perform training on this platform.");
#endif
}

#endregion
#endregion

#region Sending Events
#region Sending Events

void SendCommandEvent(CommandProto command)
{
Expand All @@ -296,9 +279,9 @@ void SendCommandEvent(CommandProto command)
}
}

#endregion
#endregion

#region Sending and retreiving data
#region Sending and retreiving data

public void DecideBatch()
{
Expand Down Expand Up @@ -447,7 +430,6 @@ public ActionBuffers GetActions(string behaviorName, int agentId)
/// <param name="unityOutput">The UnityOutput to be sent.</param>
UnityInputProto Exchange(UnityOutputProto unityOutput)
{
#if UNITY_EDITOR || UNITY_STANDALONE_WIN || UNITY_STANDALONE_OSX || UNITY_STANDALONE_LINUX
if (!m_IsOpen)
{
return null;
Expand Down Expand Up @@ -500,10 +482,6 @@ UnityInputProto Exchange(UnityOutputProto unityOutput)
QuitCommandReceived?.Invoke();
return null;
}
#else
throw new UnityAgentsException(
"You cannot perform training on this platform.");
#endif
}

/// <summary>
Expand Down Expand Up @@ -573,7 +551,7 @@ void UpdateSentActionSpec(UnityRLInitializationOutputProto output)
}
}

#endregion
#endregion

#if UNITY_EDITOR
/// <summary>
Expand All @@ -592,3 +570,4 @@ void HandleOnPlayModeChanged(PlayModeStateChange state)
#endif
}
}
#endif // UNITY_EDITOR || UNITY_STANDALONE_WIN || UNITY_STANDALONE_OSX || UNITY_STANDALONE_LINUX