Skip to content

Commit

Permalink
Added support for LatentEvents
Browse files Browse the repository at this point in the history
  • Loading branch information
manups4e committed Apr 5, 2024
1 parent 90a2e64 commit e1fe208
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 3 deletions.
11 changes: 11 additions & 0 deletions src/FxEvents.Client/EventDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,17 @@ public static void Send(string endpoint, params object[] args)
}
Events.Send(endpoint, args);
}

public static void SendLatent(string endpoint, int bytesPerSeconds, params object[] args)
{
if (!Initialized)
{
Logger.Error("Dispatcher not initialized, please initialize it and add the events strings");
return;
}
Events.SendLatent(endpoint, bytesPerSeconds, args);
}

public static async Task<T> Get<T>(string endpoint, params object[] args)
{
if (!Initialized)
Expand Down
5 changes: 5 additions & 0 deletions src/FxEvents.Client/EventSystem/ClientGateway.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ public async void Send(string endpoint, params object[] args)
await SendInternal(EventFlowType.Straight, new ServerId().Handle, endpoint, args);
}

public async void SendLatent(string endpoint, int bytePerSecond, params object[] args)
{
await SendInternalLatent(EventFlowType.Straight, new ServerId().Handle, endpoint, bytePerSecond, args);
}

public async Task<T> Get<T>(string endpoint, params object[] args)
{
return await GetInternal<T>(new ServerId().Handle, endpoint, args);
Expand Down
58 changes: 55 additions & 3 deletions src/FxEvents.Server/EventDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public static void Send(IEnumerable<Player> players, string endpoint, params obj
}
Events.Send(players.Select(x => Convert.ToInt32(x.Handle)).ToList(), endpoint, args);
}

public static void Send(string endpoint, params object[] args)
{
if (!Initialized)
Expand All @@ -142,10 +142,10 @@ public static void Send(string endpoint, params object[] args)
return;
}

var playerList = Instance.GetPlayers;
PlayerList playerList = Instance.GetPlayers;
Events.Send(playerList.Select(x => Convert.ToInt32(x.Handle)).ToList(), endpoint, args);
}

public static void Send(IEnumerable<ISource> clients, string endpoint, params object[] args)
{
if (!Initialized)
Expand All @@ -155,6 +155,58 @@ public static void Send(IEnumerable<ISource> clients, string endpoint, params ob
}
Events.Send(clients.Select(x => x.Handle).ToList(), endpoint, args);
}

public static void SendLatent(Player player, string endpoint, int bytesPerSeconds, params object[] args)
{
if (!Initialized)
{
Logger.Error("Dispatcher not initialized, please initialize it and add the events strings");
return;
}
Events.SendLatent(Convert.ToInt32(player.Handle), endpoint, bytesPerSeconds, args);
}

public static void SendLatent(ISource client, string endpoint, int bytesPerSeconds, params object[] args)
{
if (!Initialized)
{
Logger.Error("Dispatcher not initialized, please initialize it and add the events strings");
return;
}
Events.SendLatent(client.Handle, endpoint, bytesPerSeconds, args);
}

public static void SendLatent(IEnumerable<Player> players, string endpoint, int bytesPerSeconds, params object[] args)
{
if (!Initialized)
{
Logger.Error("Dispatcher not initialized, please initialize it and add the events strings");
return;
}
Events.SendLatent(players.Select(x => Convert.ToInt32(x.Handle)).ToList(), endpoint, bytesPerSeconds, args);
}

public static void SendLatent(string endpoint, int bytesPerSeconds, params object[] args)
{
if (!Initialized)
{
Logger.Error("Dispatcher not initialized, please initialize it and add the events strings");
return;
}
PlayerList playerList = Instance.GetPlayers;
Events.SendLatent(playerList.Select(x => Convert.ToInt32(x.Handle)).ToList(), endpoint, bytesPerSeconds, args);
}

public static void SendLatent(IEnumerable<ISource> clients, string endpoint, int bytesPerSeconds, params object[] args)
{
if (!Initialized)
{
Logger.Error("Dispatcher not initialized, please initialize it and add the events strings");
return;
}
Events.SendLatent(clients.Select(x => x.Handle).ToList(), endpoint, bytesPerSeconds, args);
}

public static Task<T> Get<T>(Player player, string endpoint, params object[] args)
{
if (!Initialized)
Expand Down
21 changes: 21 additions & 0 deletions src/FxEvents.Server/EventSystem/ServerGateway.cs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,27 @@ public async void Send(int target, string endpoint, params object[] args)
await SendInternal(EventFlowType.Straight, target, endpoint, args);
}

public void SendLatent(Player player, string endpoint, int bytesxSecond, params object[] args) => SendLatent(Convert.ToInt32(player.Handle), endpoint, bytesxSecond, args);
public void SendLatent(ISource client, string endpoint, int bytesxSecond, params object[] args) => SendLatent(client.Handle, endpoint, bytesxSecond, args);
public void SendLatent(List<Player> players, string endpoint, int bytesxSecond, params object[] args) => SendLatent(players.Select(x => Convert.ToInt32(x.Handle)).ToList(), endpoint, bytesxSecond, args);
public void SendLatent(List<ISource> clients, string endpoint, int bytesxSecond, params object[] args) => SendLatent(clients.Select(x => x.Handle).ToList(), endpoint, bytesxSecond, args);

public async void SendLatent(List<int> targets, string endpoint, int bytesxSecond, params object[] args)
{
int i = 0;
while (i < targets.Count)
{
await BaseScript.Delay(0);
SendLatent(targets[i], endpoint, bytesxSecond, args);
i++;
}
}

public async void SendLatent(int target, string endpoint, int bytesxSecond, params object[] args)
{
await SendInternalLatent(EventFlowType.Straight, target, endpoint, bytesxSecond, args);
}

public Task<T> Get<T>(Player player, string endpoint, params object[] args) =>
Get<T>(Convert.ToInt32(player.Handle), endpoint, args);

Expand Down
43 changes: 43 additions & 0 deletions src/FxEvents.Shared/EventSubsystem/BaseGateway.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace FxEvents.Shared.EventSubsystem
public delegate Task EventDelayMethod(int ms = 0);
public delegate Task EventMessagePreparation(string pipeline, int source, IMessage message);
public delegate void EventMessagePush(string pipeline, int source, byte[] buffer);
public delegate void EventMessagePushLatent(string pipeline, int source, int bytePerSecond, byte[] buffer);
public delegate ISource ConstructorCustomActivator<T>(int handle);
public abstract class BaseGateway
{
Expand All @@ -35,6 +36,7 @@ public abstract class BaseGateway
public EventDelayMethod? DelayDelegate { get; set; }
public EventMessagePreparation? PrepareDelegate { get; set; }
public EventMessagePush? PushDelegate { get; set; }
public EventMessagePushLatent? PushDelegateLatent { get; set; }

public async Task ProcessInboundAsync(int source, byte[] serialized)
{
Expand Down Expand Up @@ -260,6 +262,47 @@ protected async Task<EventMessage> SendInternal(EventFlowType flow, int source,
return message;
}

protected async Task<EventMessage> SendInternalLatent(EventFlowType flow, int source, string endpoint, int bytePerSecond, params object[] args)
{
StopwatchUtil stopwatch = StopwatchUtil.StartNew();
List<EventParameter> parameters = [];

for (int idx = 0; idx < args.Length; idx++)
{
object argument = args[idx];
Type type = argument.GetType();
//Debug.WriteLine($"outbound latent {endpoint} - index: {idx}, type:{type.FullName}, value:{argument.ToJson()}");

using SerializationContext context = new(endpoint, $"(Send) Parameter Index '{idx}'", Serialization);

context.Serialize(type, argument);
parameters.Add(new EventParameter(context.GetData()));
}

EventMessage message = new(endpoint, flow, parameters);

if (PrepareDelegate != null)
{
stopwatch.Stop();

await PrepareDelegate(InboundPipeline, source, message);
stopwatch.Start();
}

byte[] data = message.EncryptObject(EventDispatcher.EncryptionKey);

PushDelegateLatent(InboundPipeline, source, bytePerSecond, data);
if (EventDispatcher.Debug)
{
#if CLIENT
Logger.Debug($"[{endpoint} {flow}] Sent latent {data.Length} byte(s) to {(source == -1 ? "Server" : API.GetPlayerName(source))} in {stopwatch.Elapsed.TotalMilliseconds}ms");
#elif SERVER
Logger.Debug($"[{endpoint} {flow}] Sent latent {data.Length} byte(s) to {(source == -1 ? "Server" : API.GetPlayerName("" + source))} in {stopwatch.Elapsed.TotalMilliseconds}ms");
#endif
}
return message;
}

protected async Task<T> GetInternal<T>(int source, string endpoint, params object[] args)
{
StopwatchUtil stopwatch = StopwatchUtil.StartNew();
Expand Down

0 comments on commit e1fe208

Please sign in to comment.