Current state of the websocket server for the Hermes client

This commit is contained in:
Tom
2024-06-24 22:21:59 +00:00
commit 95bc073a73
32 changed files with 1676 additions and 0 deletions

View File

@ -0,0 +1,57 @@
using HermesSocketLibrary.db;
using HermesSocketLibrary.Socket.Data;
using ILogger = Serilog.ILogger;
namespace HermesSocketServer.Socket.Handlers
{
public class ChatterHandler : ISocketHandler
{
public int OpCode { get; } = 6;
private readonly Database _database;
private readonly HashSet<long> _chatters;
private readonly ChatterMessage[] _array;
private readonly ILogger _logger;
private readonly object _lock;
private int _index;
public ChatterHandler(Database database, ILogger logger)
{
_database = database;
_logger = logger;
_chatters = new HashSet<long>(1001);
_array = new ChatterMessage[1000];
_index = -1;
_lock = new object();
}
public async Task Execute<T>(WebSocketUser sender, T message, HermesSocketManager sockets)
{
if (message is not ChatterMessage data)
return;
lock (_lock)
{
if (_chatters.Contains(data.Id))
return;
_chatters.Add(data.Id);
if (_index == _array.Length - 1)
_index = -1;
_array[++_index] = data;
}
try
{
string sql = "INSERT INTO \"Chatter\" (id, name) VALUES (@idd, @name)";
await _database.Execute(sql, new Dictionary<string, object>() { { "idd", data.Id }, { "name", data.Name } });
}
catch (Exception e)
{
_logger.Error(e, "Failed to add chatter.");
}
}
}
}

View File

@ -0,0 +1,77 @@
using HermesSocketLibrary.db;
using HermesSocketLibrary.Socket.Data;
using Npgsql;
using ILogger = Serilog.ILogger;
namespace HermesSocketServer.Socket.Handlers
{
public class EmoteDetailsHandler : ISocketHandler
{
public int OpCode { get; } = 7;
private readonly Database _database;
private readonly HashSet<string> _emotes;
private readonly ILogger _logger;
private readonly object _lock;
public EmoteDetailsHandler(Database database, ILogger logger)
{
_database = database;
_logger = logger;
_emotes = new HashSet<string>(501);
_lock = new object();
}
public async Task Execute<T>(WebSocketUser sender, T message, HermesSocketManager sockets)
{
if (message is not EmoteDetailsMessage data)
return;
if (data.Emotes == null)
return;
if (!data.Emotes.Any())
return;
lock (_lock)
{
foreach (var entry in data.Emotes)
{
if (_emotes.Contains(entry.Key))
{
_emotes.Remove(entry.Key);
continue;
}
_emotes.Add(entry.Key);
}
}
int rows = 0;
string sql = "INSERT INTO \"Emote\" (id, name) VALUES (@idd, @name)";
using (var connection = await _database.DataSource.OpenConnectionAsync())
{
using (var command = new NpgsqlCommand(sql, connection))
{
foreach (var entry in data.Emotes)
{
command.Parameters.Clear();
command.Parameters.AddWithValue("idd", entry.Key);
command.Parameters.AddWithValue("name", entry.Value);
await command.PrepareAsync();
try
{
rows += await command.ExecuteNonQueryAsync();
}
catch (Exception e)
{
_logger.Error(e, "Failed to add emote detail: " + entry.Key + " -> " + entry.Value);
}
}
}
}
}
}
}

View File

@ -0,0 +1,75 @@
using HermesSocketLibrary.db;
using HermesSocketLibrary.Socket.Data;
using Npgsql;
using ILogger = Serilog.ILogger;
namespace HermesSocketServer.Socket.Handlers
{
public class EmoteUsageHandler : ISocketHandler
{
public int OpCode { get; } = 8;
private readonly Database _database;
private readonly HashSet<string> _history;
private readonly EmoteUsageMessage[] _array;
private readonly ILogger _logger;
private int _index;
public EmoteUsageHandler(Database database, ILogger logger)
{
_database = database;
_logger = logger;
_history = new HashSet<string>(101);
_array = new EmoteUsageMessage[100];
_index = -1;
}
public async Task Execute<T>(WebSocketUser sender, T message, HermesSocketManager sockets)
{
if (message is not EmoteUsageMessage data)
return;
lock (_logger)
{
if (_history.Contains(data.MessageId))
{
return;
}
_history.Add(data.MessageId);
if (_index >= _array.Length - 1)
_index = -1;
_index = (_index + 1) % _array.Length;
if (_array[_index] != null)
_history.Remove(data.MessageId);
_array[_index] = data;
}
int rows = 0;
string sql = "INSERT INTO \"EmoteUsageHistory\" (timestamp, \"broadcasterId\", \"emoteId\", \"chatterId\") VALUES (@time, @broadcaster, @emote, @chatter)";
using (var connection = await _database.DataSource.OpenConnectionAsync())
{
using (var command = new NpgsqlCommand(sql, connection))
{
foreach (var entry in data.Emotes)
{
command.Parameters.Clear();
command.Parameters.AddWithValue("time", data.DateTime);
command.Parameters.AddWithValue("broadcaster", data.BroadcasterId);
command.Parameters.AddWithValue("emote", entry);
command.Parameters.AddWithValue("chatter", data.ChatterId);
await command.PrepareAsync();
rows += await command.ExecuteNonQueryAsync();
}
}
}
_logger.Information($"Tracked {rows} emote(s) to history.");
}
}
}

View File

@ -0,0 +1,29 @@
using HermesSocketLibrary.Socket.Data;
using ILogger = Serilog.ILogger;
namespace HermesSocketServer.Socket.Handlers
{
public class ErrorHandler : ISocketHandler
{
public int OpCode { get; } = 0;
private ILogger _logger;
public ErrorHandler(ILogger logger)
{
_logger = logger;
}
public async Task Execute<T>(WebSocketUser sender, T message, HermesSocketManager sockets)
{
if (message is not ErrorMessage data)
return;
if (data.Exception == null)
_logger.Error(data.Message);
else
_logger.Error(data.Exception, data.Message);
}
}
}

View File

@ -0,0 +1,33 @@
using HermesSocketLibrary.Socket.Data;
using ILogger = Serilog.ILogger;
namespace HermesSocketServer.Socket.Handlers
{
public class HeartbeatHandler : ISocketHandler
{
public int OpCode { get; } = 0;
private ILogger _logger;
public HeartbeatHandler(ILogger logger)
{
_logger = logger;
}
public async Task Execute<T>(WebSocketUser sender, T message, HermesSocketManager sockets)
{
if (message is not HeartbeatMessage data)
return;
sender.LastHeartbeatReceived = DateTime.UtcNow;
_logger.Verbose($"Received heartbeat from socket [ip: {sender.IPAddress}].");
if (data.Respond)
await sender.Send(0, new HeartbeatMessage()
{
DateTime = DateTime.UtcNow,
Respond = false
});
}
}
}

View File

@ -0,0 +1,83 @@
using HermesSocketLibrary.db;
using HermesSocketLibrary.Socket.Data;
using ILogger = Serilog.ILogger;
namespace HermesSocketServer.Socket.Handlers
{
public class HermesLoginHandler : ISocketHandler
{
public int OpCode { get; } = 1;
private readonly Database _database;
private readonly HermesSocketManager _sockets;
private readonly ILogger _logger;
private readonly object _lock;
public HermesLoginHandler(Database database, HermesSocketManager sockets, ILogger logger)
{
_database = database;
_sockets = sockets;
_logger = logger;
_lock = new object();
}
public async Task Execute<T>(WebSocketUser sender, T message, HermesSocketManager sockets)
{
if (message is not HermesLoginMessage data || data == null || data.ApiKey == null)
return;
if (sender.Id != null)
return;
string sql = "select \"userId\" from \"ApiKey\" where id = @key";
var result = await _database.ExecuteScalar(sql, new Dictionary<string, object>() { { "key", data.ApiKey } });
string? userId = result?.ToString();
if (userId == null)
return;
var recipients = _sockets.GetSockets(userId).ToList();
lock (_lock)
{
if (sender.Id != null)
return;
sender.Id = userId;
}
string sql2 = "select \"name\" from \"User\" where id = @user";
var result2 = await _database.ExecuteScalar(sql2, new Dictionary<string, object>() { { "user", userId } });
string? name = result2?.ToString();
if (string.IsNullOrEmpty(name))
return;
sender.Name = name;
await sender.Send(2, new LoginAckMessage()
{
UserId = userId
});
var ack = new LoginAckMessage()
{
AnotherClient = true,
UserId = userId
};
foreach (var socket in recipients)
{
try
{
await socket.Send(2, ack);
}
catch (Exception)
{
}
}
_logger.Information($"Hermes client logged in [name: {name}][id: {userId}][ip: {sender.IPAddress}]");
}
}
}

View File

@ -0,0 +1,10 @@
using System.Net.WebSockets;
namespace HermesSocketServer.Socket.Handlers
{
public interface ISocketHandler
{
int OpCode { get; }
Task Execute<T>(WebSocketUser sender, T message, HermesSocketManager sockets);
}
}

View File

@ -0,0 +1,71 @@
using HermesSocketLibrary.Requests;
using HermesSocketLibrary.Socket.Data;
using ILogger = Serilog.ILogger;
namespace HermesSocketServer.Socket.Handlers
{
public class RequestHandler : ISocketHandler
{
public int OpCode { get; } = 3;
private readonly RequestManager _requests;
private readonly HermesSocketManager _sockets;
private readonly ILogger _logger;
public RequestHandler(RequestManager requests, HermesSocketManager sockets, ILogger logger)
{
_requests = requests;
_sockets = sockets;
_logger = logger;
}
public async Task Execute<T>(WebSocketUser sender, T message, HermesSocketManager sockets)
{
if (sender.Id == null)
return;
if (message is not RequestMessage data)
return;
RequestResult? result = null;
_logger.Debug("Executing request handler: " + data.Type);
try
{
result = await _requests.Grant(sender.Id, data);
}
catch (Exception e)
{
_logger.Error(e, $"Failed to grant a request of type '{data.Type}'.");
}
if (result == null || !result.Success)
return;
var ack = new RequestAckMessage()
{
Request = data,
Data = result.Result,
Nounce = data.Nounce
};
if (!result.NotifyClientsOnAccount)
{
await sender.Send(4, ack);
return;
}
var recipients = _sockets.GetSockets(sender.Id);
foreach (var socket in recipients)
{
try
{
_logger.Verbose($"Sending {data.Type} to socket [ip: {socket.IPAddress}].");
await socket.Send(4, ack);
}
catch (Exception)
{
_logger.Warning($"Failed to send {data.Type} to socket [ip: {socket.IPAddress}].");
}
}
}
}
}

View File

@ -0,0 +1,34 @@
using System.Text.Json;
using CommonSocketLibrary.Abstract;
using HermesSocketServer.Socket.Handlers;
using ILogger = Serilog.ILogger;
namespace HermesSocketServer.Socket
{
public class SocketHandlerManager : HandlerManager<WebSocketUser, ISocketHandler>
{
private readonly HermesSocketManager _sockets;
private readonly IServiceProvider _serviceProvider;
public SocketHandlerManager(HermesSocketManager sockets, IServiceProvider serviceProvider, ILogger logger)
: base(logger)
{
_sockets = sockets;
_serviceProvider = serviceProvider;
Add(0, _serviceProvider.GetRequiredKeyedService<ISocketHandler>("hermes-heartbeat"));
Add(1, _serviceProvider.GetRequiredKeyedService<ISocketHandler>("hermes-hermeslogin"));
Add(3, _serviceProvider.GetRequiredKeyedService<ISocketHandler>("hermes-request"));
Add(5, _serviceProvider.GetRequiredKeyedService<ISocketHandler>("hermes-error"));
Add(6, _serviceProvider.GetRequiredKeyedService<ISocketHandler>("hermes-chatter"));
Add(7, _serviceProvider.GetRequiredKeyedService<ISocketHandler>("hermes-emotedetails"));
Add(8, _serviceProvider.GetRequiredKeyedService<ISocketHandler>("hermes-emoteusage"));
}
protected override async Task Execute<T>(WebSocketUser sender, ISocketHandler handler, T value)
{
await handler.Execute(sender, value, _sockets);
}
}
}

96
Socket/SocketManager.cs Normal file
View File

@ -0,0 +1,96 @@
using System.Net.WebSockets;
using System.Timers;
using HermesSocketLibrary.Socket.Data;
using ILogger = Serilog.ILogger;
namespace HermesSocketServer.Socket
{
public class HermesSocketManager
{
private IList<WebSocketUser> _sockets;
private System.Timers.Timer _timer;
private ILogger _logger;
public HermesSocketManager(ILogger logger)
{
_sockets = new List<WebSocketUser>();
_timer = new System.Timers.Timer(TimeSpan.FromSeconds(1));
_timer.AutoReset = true;
_timer.Elapsed += async (sender, e) => await HandleHeartbeats(e);
_timer.Enabled = true;
_logger = logger;
}
public void Add(WebSocketUser socket)
{
_sockets.Add(socket);
}
public IList<WebSocketUser> GetAllSockets()
{
return _sockets.AsReadOnly();
}
public IEnumerable<WebSocketUser> GetSockets(string userId)
{
foreach (var socket in _sockets)
{
if (socket.Id == userId)
yield return socket;
}
}
public bool Remove(WebSocketUser socket)
{
return _sockets.Remove(socket);
}
private async Task HandleHeartbeats(ElapsedEventArgs e)
{
try
{
var signalTime = e.SignalTime.ToUniversalTime();
for (var i = 0; i < _sockets.Count; i++)
{
var socket = _sockets[i];
if (!socket.Connected)
{
_sockets.RemoveAt(i--);
}
else if (signalTime - socket.LastHeartbeatReceived > TimeSpan.FromSeconds(30))
{
if (socket.LastHeartbeatReceived > socket.LastHearbeatSent)
{
try
{
socket.LastHearbeatSent = DateTime.UtcNow;
await socket.Send(0, new HeartbeatMessage() { DateTime = socket.LastHearbeatSent });
}
catch (Exception)
{
_logger.Warning($"Failed to send the heartbeat to socket [ip: {socket.IPAddress}].");
await socket.Close(WebSocketCloseStatus.NormalClosure, "Failed to send a heartbeat message.", CancellationToken.None);
}
finally
{
if (!socket.Connected)
_sockets.RemoveAt(i--);
}
}
else if (signalTime - socket.LastHeartbeatReceived > TimeSpan.FromSeconds(120))
{
_logger.Debug($"Closing socket [ip: {socket.IPAddress}] for not responding for 2 minutes.");
await socket.Close(WebSocketCloseStatus.NormalClosure, "No heartbeat received.", CancellationToken.None);
_sockets.RemoveAt(i--);
}
}
}
}
catch (Exception)
{
}
}
}
}

113
Socket/WebSocketUser.cs Normal file
View File

@ -0,0 +1,113 @@
using System.Net;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using HermesSocketLibrary.Socket.Data;
using ILogger = Serilog.ILogger;
namespace HermesSocketServer.Socket
{
public class WebSocketUser
{
private readonly WebSocket _socket;
private readonly JsonSerializerOptions _options;
private readonly ILogger _logger;
private readonly IPAddress? _ipAddress;
private CancellationTokenSource _cts;
private bool _connected;
public WebSocketCloseStatus? CloseStatus { get => _socket.CloseStatus; }
public string? CloseStatusDescription { get => _socket.CloseStatusDescription; }
public WebSocketState State { get => _socket.State; }
public IPAddress? IPAddress { get => _ipAddress; }
public bool Connected { get => _connected; }
public string? Id { get; set; }
public string? Name { get; set; }
public DateTime LastHeartbeatReceived { get; set; }
public DateTime LastHearbeatSent { get; set; }
public CancellationToken Token { get => _cts.Token; }
public WebSocketUser(WebSocket socket, IPAddress? ipAddress, JsonSerializerOptions options, ILogger logger)
{
_socket = socket;
_ipAddress = ipAddress;
_options = options;
_connected = true;
_logger = logger;
_cts = new CancellationTokenSource();
LastHeartbeatReceived = DateTime.UtcNow;
}
public async Task Close(WebSocketCloseStatus status, string? message, CancellationToken token)
{
try
{
await _socket.CloseAsync(status, message ?? CloseStatusDescription, token);
}
catch (WebSocketException wse) when (wse.Message.StartsWith("The WebSocket is in an invalid state "))
{
}
catch (OperationCanceledException)
{
}
catch (Exception e)
{
_logger.Error(e, "Failed to close socket.");
}
finally
{
_connected = false;
await _cts.CancelAsync();
_cts = new CancellationTokenSource();
}
}
public async Task Send<Data>(int opcode, Data data)
{
var message = GenerateMessage(opcode, data);
var content = JsonSerializer.Serialize(message, _options);
var bytes = Encoding.UTF8.GetBytes(content);
var array = new ArraySegment<byte>(bytes);
var total = bytes.Length;
var current = 0;
while (current < total)
{
var size = Encoding.UTF8.GetBytes(content.Substring(current), array);
await _socket.SendAsync(array, WebSocketMessageType.Text, current + size >= total, Token);
current += size;
}
_logger.Verbose($"TX #{opcode}: {content}");
}
public async Task<WebSocketReceiveResult?> Receive(ArraySegment<byte> bytes)
{
try
{
return await _socket.ReceiveAsync(bytes, Token);
}
catch (WebSocketException wse) when (wse.Message.StartsWith("The remote party "))
{
}
catch (Exception ex)
{
_logger.Error(ex, "Failed to receive a web socket message.");
}
return null;
}
private SocketMessage GenerateMessage<Data>(int opcode, Data data)
{
return new SocketMessage()
{
OpCode = opcode,
Data = data
};
}
}
}