From 7e7f8f2b0f96ad42d95884ee819ec4a075817c0d Mon Sep 17 00:00:00 2001 From: Kitty Draper Date: Fri, 17 Apr 2026 11:32:34 -0500 Subject: [PATCH] Refactor sending and receiving of RPCs to not use entities, thereby preserving send/receive order without having to encode an order value and reorder messages on the receive side. --- .../Runtime/Core/NetworkManager.cs | 3 +- .../Unified/UnifiedNetcodeTransport.cs | 125 +++++++++--------- 2 files changed, 65 insertions(+), 63 deletions(-) diff --git a/com.unity.netcode.gameobjects/Runtime/Core/NetworkManager.cs b/com.unity.netcode.gameobjects/Runtime/Core/NetworkManager.cs index b3bba1c4b4..9715756023 100644 --- a/com.unity.netcode.gameobjects/Runtime/Core/NetworkManager.cs +++ b/com.unity.netcode.gameobjects/Runtime/Core/NetworkManager.cs @@ -472,9 +472,10 @@ public void NetworkUpdate(NetworkUpdateStage updateStage) // This should be invoked just prior to the MessageManager processes its outbound queue. SceneManager.CheckForAndSendNetworkObjectSceneChanged(); - +#if !UNIFIED_NETCODE // Process outbound messages MessageManager.ProcessSendQueues(); +#endif // Metrics update needs to be driven by NetworkConnectionManager's update to assure metrics are dispatched after the send queue is processed. MetricsManager.UpdateMetrics(); diff --git a/com.unity.netcode.gameobjects/Runtime/Transports/Unified/UnifiedNetcodeTransport.cs b/com.unity.netcode.gameobjects/Runtime/Transports/Unified/UnifiedNetcodeTransport.cs index f8d97072fb..eadfd1db26 100644 --- a/com.unity.netcode.gameobjects/Runtime/Transports/Unified/UnifiedNetcodeTransport.cs +++ b/com.unity.netcode.gameobjects/Runtime/Transports/Unified/UnifiedNetcodeTransport.cs @@ -43,37 +43,41 @@ public static NativeArray ToNativeArray(in FixedBytes1280 data) } } + internal struct TransportRpcData : IBufferElementData + { + public FixedBytes1280 Buffer; + } + [BurstCompile] internal struct TransportRpc : IOutOfBandRpcCommand, IRpcCommandSerializer { - public FixedBytes1280 Buffer; - public ulong Order; + public TransportRpcData Value; public unsafe void Serialize(ref DataStreamWriter writer, in RpcSerializerState state, in TransportRpc data) { - writer.WriteULong(data.Order); - writer.WriteInt(data.Buffer.Length); - var span = new Span(FixedBytes1280.GetUnsafePtr(data.Buffer), data.Buffer.Length); + writer.WriteInt(data.Value.Buffer.Length); + var span = new Span(FixedBytes1280.GetUnsafePtr(data.Value.Buffer), data.Value.Buffer.Length); writer.WriteBytes(span); } public unsafe void Deserialize(ref DataStreamReader reader, in RpcDeserializerState state, ref TransportRpc data) { - data.Order = reader.ReadULong(); var length = reader.ReadInt(); - data.Buffer = new FixedBytes1280 + data.Value.Buffer = new FixedBytes1280 { Length = length }; - var span = new Span(FixedBytes1280.GetUnsafePtr(data.Buffer), length); + var span = new Span(FixedBytes1280.GetUnsafePtr(data.Value.Buffer), length); reader.ReadBytes(span); } [BurstCompile(DisableDirectCall = true)] private static void InvokeExecute(ref RpcExecutor.Parameters parameters) { - RpcExecutor.ExecuteCreateRequestComponent(ref parameters); + var element = new TransportRpc(); + element.Deserialize(ref parameters.Reader, parameters.DeserializerState, ref element); + parameters.CommandBuffer.AppendToBuffer(parameters.JobIndex, parameters.Connection, element.Value); } private static readonly PortableFunctionPointer k_InvokeExecuteFunctionPointer = new PortableFunctionPointer(InvokeExecute); @@ -115,9 +119,19 @@ public void OnUpdate(ref SystemState state) } } + [WorldSystemFilter(WorldSystemFilterFlags.ServerSimulation | WorldSystemFilterFlags.ClientSimulation | WorldSystemFilterFlags.ThinClientSimulation)] + [UpdateInGroup(typeof(SimulationSystemGroup), OrderLast = true)] + [UpdateBefore(typeof(RpcSystem))] internal partial class UnifiedNetcodeUpdateSystem : SystemBase { + public void OnCreate(ref SystemState state) + { + state.RequireForUpdate(); + state.RequireForUpdate(); + } + public UnifiedNetcodeTransport Transport; + public NetworkManager NetworkManager; public List DisconnectQueue = new List(); @@ -125,23 +139,39 @@ public void Disconnect(Connection connection) { DisconnectQueue.Add(connection); } + + public void SendRpc(TransportRpc rpc) + { + var rpcQueue = SystemAPI.GetSingleton().GetRpcQueue(); + var ghostInstance = GetComponentLookup(); + foreach (var rpcDataStreamBuffer in SystemAPI.Query>()) + { + rpcQueue.Schedule(rpcDataStreamBuffer, ghostInstance, rpc); + } + } protected override void OnUpdate() { + NetworkManager.MessageManager.ProcessSendQueues(); + using var commandBuffer = new EntityCommandBuffer(Allocator.Temp); - foreach (var (request, rpc, entity) in SystemAPI.Query, RefRO>().WithEntityAccess()) + foreach(var (networkId, _, entity) in SystemAPI.Query, RefRO>().WithEntityAccess()) { - var connectionId = SystemAPI.GetComponent(request.ValueRO.SourceConnection).Value; - - var buffer = rpc.ValueRO.Buffer; - try + var connectionId = networkId.ValueRO.Value; + DynamicBuffer rpcs = EntityManager.GetBuffer(entity); + foreach (var rpc in rpcs) { - Transport.DispatchMessage(connectionId, buffer, rpc.ValueRO.Order); - } - finally - { - commandBuffer.DestroyEntity(entity); + var buffer = rpc.Buffer; + try + { + Transport.DispatchMessage(connectionId, buffer); + } + catch(Exception e) + { + Debug.LogException(e); + } } + rpcs.Clear(); } foreach (var connection in DisconnectQueue) @@ -171,34 +201,15 @@ private class ConnectionInfo public BatchedSendQueue SendQueue; public BatchedReceiveQueue ReceiveQueue; public Connection Connection; - public ulong LastSent; - public ulong LastReceived; public Dictionary DeferredMessages; } private Dictionary m_Connections; - internal void DispatchMessage(int connectionId, in FixedBytes1280 buffer, ulong order) + internal void DispatchMessage(int connectionId, in FixedBytes1280 buffer) { var connectionInfo = m_Connections[connectionId]; - if (order <= connectionInfo.LastReceived) - { - Debug.LogWarning("Received duplicate message, ignoring."); - return; - } - - if (order != connectionInfo.LastReceived + 1) - { - if (connectionInfo.DeferredMessages == null) - { - connectionInfo.DeferredMessages = new Dictionary(); - } - - connectionInfo.DeferredMessages[order] = buffer; - return; - } - using var arr = FixedBytes1280.ToNativeArray(buffer); var reader = new DataStreamReader(arr); if (connectionInfo.ReceiveQueue == null) @@ -209,20 +220,7 @@ internal void DispatchMessage(int connectionId, in FixedBytes1280 buffer, ulong { connectionInfo.ReceiveQueue.PushReader(reader); } - - connectionInfo.LastReceived = order; - if (connectionInfo.DeferredMessages != null) - { - var next = order + 1; - while (connectionInfo.DeferredMessages.Remove(next, out var nextBuffer)) - { - reader = new DataStreamReader(FixedBytes1280.ToNativeArray(nextBuffer)); - connectionInfo.ReceiveQueue.PushReader(reader); - connectionInfo.LastReceived = next; - ++next; - } - } - + var message = connectionInfo.ReceiveQueue.PopMessage(); while (message.Count != 0) { @@ -243,18 +241,15 @@ public override unsafe void Send(ulong clientId, ArraySegment payload, Net while (!connectionInfo.SendQueue.IsEmpty) { - var rpc = new TransportRpc - { - Buffer = new FixedBytes1280(), - }; + var rpc = new TransportRpc(); - var writer = new DataStreamWriter(FixedBytes1280.GetUnsafePtr(rpc.Buffer), k_MaxPacketSize); + var writer = new DataStreamWriter(FixedBytes1280.GetUnsafePtr(rpc.Value.Buffer), k_MaxPacketSize); var amount = connectionInfo.SendQueue.FillWriterWithBytes(ref writer, k_MaxPacketSize); - rpc.Buffer.Length = amount; - rpc.Order = ++connectionInfo.LastSent; - - connectionInfo.Connection.SendOutOfBandMessage(rpc); + rpc.Value.Buffer.Length = amount; + + var updateSystem = NetCode.Netcode.GetWorld(false).GetExistingSystemManaged(); + updateSystem.SendRpc(rpc); connectionInfo.SendQueue.Consume(amount); } @@ -278,6 +273,8 @@ private void OnClientConnectedToServer(Connection connection, NetCodeConnectionE }; m_ServerClientId = connection.NetworkId.Value; InvokeOnTransportEvent(NetworkEvent.Connect, (ulong)connection.NetworkId.Value, default, m_RealTimeProvider.RealTimeSinceStartup); + var updateSystem = NetCode.Netcode.GetWorld(false).GetExistingSystemManaged(); + updateSystem.EntityManager.AddBuffer(connection.ConnectionEntity); } private void OnServerNewClientConnection(Connection connection, NetCodeConnectionEvent connectionEvent) @@ -289,6 +286,8 @@ private void OnServerNewClientConnection(Connection connection, NetCodeConnectio Connection = connection }; ; InvokeOnTransportEvent(NetworkEvent.Connect, (ulong)connection.NetworkId.Value, default, m_RealTimeProvider.RealTimeSinceStartup); + var updateSystem = NetCode.Netcode.GetWorld(false).GetExistingSystemManaged(); + updateSystem.EntityManager.AddBuffer(connection.ConnectionEntity); } private const string k_InvalidRpcMessage = "An invalid RPC was received"; @@ -376,6 +375,7 @@ public override bool StartClient() NetCode.Netcode.Client.OnDisconnect = OnClientDisconnectFromServer; var updateSystem = NetCode.Netcode.GetWorld(false).GetExistingSystemManaged(); updateSystem.Transport = this; + updateSystem.NetworkManager = m_NetworkManager; return true; } @@ -390,6 +390,7 @@ public override bool StartServer() NetCode.Netcode.Server.OnDisconnect = OnServerClientDisconnected; var updateSystem = NetCode.Netcode.GetWorld(true).GetExistingSystemManaged(); updateSystem.Transport = this; + updateSystem.NetworkManager = m_NetworkManager; return true; }