Compare commits

..

8 Commits

27 changed files with 519 additions and 295 deletions

View File

@@ -8,6 +8,7 @@ namespace HermesSocketServer.Models
public required User User { get; set; }
public required ChatterStore Chatters { get; set; }
public required ConnectionStore Connections { get; set; }
public required EmoteStore Emotes { get; set; }
public required GroupStore Groups { get; set; }
public required GroupPermissionStore GroupPermissions { get; set; }
public required PolicyStore Policies { get; set; }

View File

@@ -7,5 +7,6 @@ namespace HermesSocketServer.Models
public required string Email { get; set; }
public required string Role { get; set; }
public required string DefaultVoice { get; set; }
public required string StreamElementsOverlayKey { get; set; }
}
}

View File

@@ -7,7 +7,7 @@ namespace HermesSocketServer.Requests
public class CreateConnection : IRequest
{
public string Name => "create_connection";
public string[] RequiredKeys => ["name", "type", "clientId", "accessToken", "grantType", "scope", "expiration"];
public string[] RequiredKeys => ["name", "type", "client_id", "access_token", "grant_type", "scope", "expiration"];
private ILogger _logger;
public CreateConnection(ILogger logger)
@@ -19,12 +19,14 @@ namespace HermesSocketServer.Requests
{
string name = data["name"].ToString()!;
string type = data["type"].ToString()!;
string clientId = data["clientId"].ToString()!;
string accessToken = data["accessToken"].ToString()!;
string grantType = data["grantType"].ToString()!;
string clientId = data["client_id"].ToString()!;
string accessToken = data["access_token"].ToString()!;
string grantType = data["grant_type"].ToString()!;
string scope = data["scope"].ToString()!;
if (!DateTime.TryParse(data["expiration"].ToString()!, out var expiresAt))
if (data["expiration"] == null || !DateTime.TryParse(data["expiration"].ToString(), out var expiresAt))
return Task.FromResult(RequestResult.Failed("Expiration needs to be a date time string."));
var previous = channel.Connections.Get(name);
var connection = new Connection()
{
@@ -36,12 +38,13 @@ namespace HermesSocketServer.Requests
GrantType = grantType,
Scope = scope,
ExpiresAt = expiresAt,
Default = previous?.Default ?? false,
};
bool result = channel.Connections.Set(name, connection);
if (result)
{
_logger.Information($"Added connection to channel [name: {name}][type: {type}][scope: {scope}][expiration: {expiresAt}][channel: {channel.Id}]");
_logger.Information($"Added or updated connection to channel [name: {name}][type: {type}][scope: {scope}][expiration: {expiresAt}][channel: {channel.Id}]");
return Task.FromResult(RequestResult.Successful(connection));
}
return Task.FromResult(RequestResult.Failed("Something went wrong when updating the cache."));

View File

@@ -31,6 +31,11 @@ namespace HermesSocketServer.Requests
Allow = allow,
};
if (channel.GroupPermissions.Get().Values.Any(p => p.GroupId == groupId && p.Path == path))
{
return Task.FromResult(RequestResult.Failed("Permission exists already."));
}
bool result = channel.GroupPermissions.Set(id.ToString(), permission);
if (result)
{

View File

@@ -6,7 +6,7 @@ namespace HermesSocketServer.Requests
public class DeleteConnection : IRequest
{
public string Name => "delete_connection";
public string[] RequiredKeys => ["id"];
public string[] RequiredKeys => ["name"];
private ILogger _logger;
public DeleteConnection(ILogger logger)
@@ -16,16 +16,16 @@ namespace HermesSocketServer.Requests
public Task<RequestResult> Grant(Channel channel, IDictionary<string, object> data)
{
var connectionId = data["id"].ToString()!;
var connectionName = data["name"].ToString()!;
var result = channel.Connections.Remove(connectionId);
var result = channel.Connections.Remove(connectionName);
if (result)
{
_logger.Information($"Deleted a connection by id [connection id: {connectionId}]");
_logger.Information($"Deleted a connection by name [connection name: {connectionName}]");
return Task.FromResult(RequestResult.Successful(null));
}
_logger.Warning($"Connection Id does not exist [connection id: {connectionId}]");
_logger.Warning($"Connection Name does not exist [connection name: {connectionName}]");
return Task.FromResult(RequestResult.Failed("Something went wrong when updating the cache."));
}
}

View File

@@ -14,54 +14,53 @@ namespace HermesSocketServer.Requests
_logger = logger;
}
public async Task<RequestResult> Grant(Channel channel, IDictionary<string, object> data)
public Task<RequestResult> Grant(Channel channel, IDictionary<string, object> data)
{
var groupId = data["id"].ToString()!;
var groupIdString = data["id"].ToString()!;
var groupId = new Guid(groupIdString);
var result = channel.Groups.Remove(groupId);
var result = channel.Groups.Remove(groupIdString);
if (result)
{
var permissions = channel.GroupPermissions.Get().Values
.Where(p => p.GroupId.ToString() == groupId);
Task? chattersSave = null;
if (channel.Groups.Chatters.TryGetValue(groupId, out var chatters))
if (channel.Groups.Chatters.TryGetValue(groupId.ToString(), out var chatters))
{
var filteredChatters = chatters.Get().Values.Where(c => c.GroupId == groupId).ToArray();
if (filteredChatters.Any())
var filteredChatters = chatters.Get().Values;
foreach (var chatter in filteredChatters)
{
foreach (var chatter in filteredChatters)
{
var res = chatters.Remove(chatter.ChatterId.ToString(), fromCascade: true);
if (!res)
_logger.Warning($"Failed to delete group chatter by id [group chatter id: {chatter.ChatterId}]");
}
chattersSave = chatters.Save();
var res = chatters.Remove(chatter.ChatterId.ToString(), fromCascade: true);
if (!res)
_logger.Warning($"Failed to delete group chatter by id from cascade [group chatter id: {chatter.ChatterId}]");
}
}
var permissions = channel.GroupPermissions.Get().Values
.Where(p => p.GroupId == groupId);
foreach (var permission in permissions)
{
var res = channel.GroupPermissions.Remove(permission.Id, fromCascade: true);
if (!res)
_logger.Warning($"Failed to delete group permission by id [group chatter id: {permission.Id}]");
_logger.Warning($"Failed to delete group permission by id from cascade [group permission id: {permission.Id}]");
}
if (chattersSave != null)
await Task.WhenAll(chattersSave, channel.GroupPermissions.Save());
else
await channel.GroupPermissions.Save();
if (!channel.Groups.Chatters.Remove(groupId))
var policies = channel.Policies.Get().Values
.Where(c => c.GroupId == groupId).ToArray();
foreach (var policy in policies)
{
var res = channel.Policies.Remove(policy.Id.ToString(), fromCascade: true);
if (!res)
_logger.Warning($"Failed to delete group policy by id from cascade [group policy id: {policy.Id}]");
}
if (!channel.Groups.Chatters.Remove(groupIdString))
_logger.Warning($"Failed to delete group chatters from inner store [group id: {groupId}]");
_logger.Information($"Deleted a group by id [group id: {groupId}]");
return RequestResult.Successful(null);
return Task.FromResult(RequestResult.Successful(null));
}
_logger.Warning($"Group Id does not exist [group id: {groupId}]");
return RequestResult.Failed("Something went wrong when updating the cache.");
return Task.FromResult(RequestResult.Failed("Something went wrong when updating the cache."));
}
}
}

View File

@@ -18,11 +18,12 @@ namespace HermesSocketServer.Requests
{
var id = data["id"].ToString()!;
var permission = channel.GroupPermissions.Get(id);
var result = channel.GroupPermissions.Remove(id);
if (result)
{
_logger.Information($"Deleted a group permission by id [group permission id: {id}]");
return Task.FromResult(RequestResult.Successful(null));
return Task.FromResult(RequestResult.Successful(permission));
}
_logger.Warning($"Group Permission Id does not exist [group permission id: {id}]");

View File

@@ -21,6 +21,15 @@ namespace HermesSocketServer.Requests
if (result)
{
var redemptions = channel.Redemptions.Get().Values
.Where(r => r.ActionName == name);
foreach (var redemption in redemptions)
{
var res = channel.Redemptions.Remove(redemption.Id, fromCascade: true);
if (!res)
_logger.Warning($"Failed to delete redemption by id from cascade [redemption id: {redemption.Id}]");
}
_logger.Information($"Deleted a redeemable action by name [name: {name}]");
return Task.FromResult(RequestResult.Successful(null));
}

View File

@@ -1,5 +1,3 @@
using HermesSocketLibrary.db;
using HermesSocketLibrary.Requests.Messages;
using HermesSocketServer.Models;
using ILogger = Serilog.ILogger;
@@ -9,24 +7,16 @@ namespace HermesSocketServer.Requests
{
public string Name => "get_emotes";
public string[] RequiredKeys => [];
private Database _database;
private ILogger _logger;
public GetEmotes(Database database, ILogger logger)
public GetEmotes(ILogger logger)
{
_database = database;
_logger = logger;
}
public async Task<RequestResult> Grant(Channel channel, IDictionary<string, object> data)
{
IList<EmoteInfo> emotes = new List<EmoteInfo>();
string sql = $"SELECT id, name FROM \"Emote\"";
await _database.Execute(sql, (IDictionary<string, object>?) null, (r) => emotes.Add(new EmoteInfo()
{
Id = r.GetString(0),
Name = r.GetString(1)
}));
var emotes = channel.Emotes.Get().Values;
_logger.Information($"Fetched all emotes for channel [channel: {channel.Id}]");
return RequestResult.Successful(emotes, notifyClientsOnAccount: false);
}

View File

@@ -31,6 +31,11 @@ namespace HermesSocketServer.Requests
Allow = allow,
};
if (!channel.GroupPermissions.Get().Values.Any(p => p.GroupId == groupId && p.Path == path))
{
return Task.FromResult(RequestResult.Failed("Permission does not exist."));
}
bool result = channel.GroupPermissions.Modify(id.ToString(), permission);
if (result)
{

View File

@@ -2,7 +2,6 @@ using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using CommonSocketLibrary.Common;
using HermesSocketLibrary.Requests.Messages;
using HermesSocketLibrary.Socket.Data;
using HermesSocketServer.Socket;
using ILogger = Serilog.ILogger;
@@ -51,12 +50,12 @@ namespace HermesSocketLibrary
continue;
if (message.OpCode != 0)
_logger.Information($"rxm: {messageString} [ip: {socket.IPAddress}][id: {socket.Id}][name: {socket.Name}][token: {socket.ApiKey}][uid: {socket.SessionId}]");
_logger.Information($"receive: {messageString} [ip: {socket.IPAddress}][id: {socket.Id}][name: {socket.Name}][token: {socket.ApiKey}][uid: {socket.SessionId}]");
if (message.OpCode < 0 || message.OpCode > 8 || message.OpCode == 2 || message.OpCode == 4)
{
await socket.Send(5, new LoggingMessage("Received an invalid message: " + messageString, HermesLoggingLevel.Error));
return;
break;
}
bool loggedIn = !string.IsNullOrEmpty(socket.Id);
@@ -64,13 +63,13 @@ namespace HermesSocketLibrary
if (!loggedIn && !nonProtectedOps.Contains(message.OpCode))
{
_logger.Warning($"An attempt was made to use protected routes while not logged in [ip: {socket.IPAddress}][id: {socket.Id}][name: {socket.Name}][token: {socket.ApiKey}][uid: {socket.SessionId}]");
return;
break;
}
int[] protectedOps = { 0, 3, 5, 6, 7, 8 };
if (loggedIn && !protectedOps.Contains(message.OpCode))
{
_logger.Warning($"An attempt was made to use non-protected routes while logged in [ip: {socket.IPAddress}][id: {socket.Id}][name: {socket.Name}][token: {socket.ApiKey}][uid: {socket.SessionId}]");
return;
break;
}
if (message.Data == null)
@@ -80,27 +79,7 @@ namespace HermesSocketLibrary
}
string data = message.Data.ToString()!;
if (message.OpCode == 0)
message.Data = JsonSerializer.Deserialize<HeartbeatMessage>(data, _options);
else if (message.OpCode == 1)
message.Data = JsonSerializer.Deserialize<HermesLoginMessage>(data, _options);
else if (message.OpCode == 3)
message.Data = JsonSerializer.Deserialize<RequestMessage>(data, _options);
else if (message.OpCode == 5)
message.Data = JsonSerializer.Deserialize<LoggingMessage>(data, _options);
else if (message.OpCode == 6)
message.Data = JsonSerializer.Deserialize<ChatterMessage>(data, _options);
else if (message.OpCode == 7)
message.Data = JsonSerializer.Deserialize<EmoteDetailsMessage>(data, _options);
else if (message.OpCode == 8)
message.Data = JsonSerializer.Deserialize<EmoteUsageMessage>(data, _options);
//else if (message.OpCode == 9)
// message.Data = JsonSerializer.Deserialize<SlaveMessage>(data, _options);
else
{
await socket.Send(5, new LoggingMessage("Received a message with invalid data: " + messageString, HermesLoggingLevel.Error));
continue;
}
message.Data = DeserializeData(message.OpCode, data);
await _handlers.Execute(socket, message.OpCode, message.Data);
}
catch (WebSocketException wse)
@@ -120,7 +99,7 @@ namespace HermesSocketLibrary
}
catch (Exception e)
{
_logger.Information(e, $"Client failed to disconnect [ip: {socket.IPAddress}][id: {socket.Id}][name: {socket.Name}][token: {socket.ApiKey}][uid: {socket.SessionId}]");
_logger.Warning(e, $"Client failed to disconnect [ip: {socket.IPAddress}][id: {socket.Id}][name: {socket.Name}][token: {socket.ApiKey}][uid: {socket.SessionId}]");
}
finally
{
@@ -130,7 +109,6 @@ namespace HermesSocketLibrary
_logger.Information($"Client disconnected [ip: {socket.IPAddress}][id: {socket.Id}][name: {socket.Name}][token: {socket.ApiKey}][uid: {socket.SessionId}]");
// Update slave status of another client from the same user if available.
// TODO: Ensure one of the clients is always a non-slave.
if (socket.Id != null && !socket.Slave)
{
var client = _sockets.GetSockets(socket.Id).Where(s => !s.WebLogin).FirstOrDefault();
@@ -138,8 +116,28 @@ namespace HermesSocketLibrary
{
await client.Send(9, new SlaveMessage() { Slave = false });
client.Slave = false;
socket.Slave = true;
}
}
}
private object? DeserializeData(int opcode, string data)
{
if (opcode == 0)
return JsonSerializer.Deserialize<HeartbeatMessage>(data, _options);
else if (opcode == 1)
return JsonSerializer.Deserialize<HermesLoginMessage>(data, _options);
else if (opcode == 3)
return JsonSerializer.Deserialize<RequestMessage>(data, _options);
else if (opcode == 5)
return JsonSerializer.Deserialize<LoggingMessage>(data, _options);
else if (opcode == 6)
return JsonSerializer.Deserialize<ChatterMessage>(data, _options);
else if (opcode == 7)
return JsonSerializer.Deserialize<EmoteDetailsMessage>(data, _options);
else if (opcode == 8)
return JsonSerializer.Deserialize<EmoteUsageMessage>(data, _options);
return null;
}
}
}

View File

@@ -14,7 +14,7 @@ namespace HermesSocketServer.Services
private readonly ServerConfiguration _configuration;
private readonly Serilog.ILogger _logger;
private readonly IDictionary<string, Channel> _channels;
private readonly object _lock;
private readonly Mutex _mutex;
public ChannelManager(IStore<string, User> users, Database database, IStore<string, TTSVoice> voices, ServerConfiguration configuration, Serilog.ILogger logger)
{
@@ -24,7 +24,7 @@ namespace HermesSocketServer.Services
_configuration = configuration;
_logger = logger;
_channels = new ConcurrentDictionary<string, Channel>();
_lock = new object();
_mutex = new Mutex();
}
@@ -36,15 +36,17 @@ namespace HermesSocketServer.Services
return await Task.Run(() =>
{
lock (_lock)
try
{
_mutex.WaitOne();
if (_channels.TryGetValue(userId, out var channel))
return Task.FromResult<Channel?>(channel);
return channel;
var actionTable = _configuration.Database.Tables["Action"];
var chatterTable = _configuration.Database.Tables["Chatter"];
var connectionTable = _configuration.Database.Tables["Connection"];
//var chatterGroupTable = _configuration.Database.Tables["ChatterGroup"];
var connectionStateTable = _configuration.Database.Tables["ConnectionState"];
var emoteTable = _configuration.Database.Tables["Emote"];
var groupTable = _configuration.Database.Tables["Group"];
var groupPermissionTable = _configuration.Database.Tables["GroupPermission"];
var policyTable = _configuration.Database.Tables["Policy"];
@@ -54,6 +56,7 @@ namespace HermesSocketServer.Services
var chatters = new ChatterStore(userId, chatterTable, _database, _logger);
var connections = new ConnectionStore(userId, connectionTable, _database, _logger);
var emotes = new EmoteStore(userId, emoteTable, _database, _logger);
var groups = new GroupStore(userId, groupTable, _database, _configuration, _logger);
var groupPermissions = new GroupPermissionStore(userId, groupPermissionTable, groups, _database, _logger);
var policies = new PolicyStore(userId, policyTable, groups, _database, _logger);
@@ -68,6 +71,7 @@ namespace HermesSocketServer.Services
User = user,
Chatters = chatters,
Connections = connections,
Emotes = emotes,
Groups = groups,
GroupPermissions = groupPermissions,
Policies = policies,
@@ -80,6 +84,7 @@ namespace HermesSocketServer.Services
Task.WaitAll([
channel.Actions.Load(),
channel.Chatters.Load(),
channel.Emotes.Load(),
channel.Connections.Load(),
channel.Groups.Load(),
channel.Filters.Load(),
@@ -93,27 +98,49 @@ namespace HermesSocketServer.Services
]);
_channels.Add(userId, channel);
return Task.FromResult<Channel?>(channel);
return channel;
}
finally
{
_mutex.ReleaseMutex();
}
});
}
public Channel? Get(string channelId)
{
if (_channels.TryGetValue(channelId, out var channel))
return channel;
return null;
try
{
_mutex.WaitOne();
if (_channels.TryGetValue(channelId, out var channel))
return channel;
return null;
}
finally
{
_mutex.ReleaseMutex();
}
}
public async Task Save(string userId)
{
if (!_channels.TryGetValue(userId, out var channel))
return;
Channel? channel;
try
{
_mutex.WaitOne();
if (!_channels.TryGetValue(userId, out channel))
return;
}
finally
{
_mutex.ReleaseMutex();
}
_logger.Debug($"Saving channel data to database [channel id: {channel.Id}][channel name: {channel.User.Name}]");
await Task.WhenAll([
channel.Chatters.Save(),
channel.Connections.Save(),
channel.Emotes.Save(),
channel.Groups.Save(),
channel.GroupPermissions.Save(),
channel.Policies.Save(),
@@ -132,6 +159,7 @@ namespace HermesSocketServer.Services
var genericTablesTask = Task.WhenAll([
channel.Chatters.Save(),
channel.Connections.Save(),
channel.Emotes.Save(),
channel.Filters.Save(),
channel.VoiceStates.Save(),
]).ConfigureAwait(false);

View File

@@ -6,32 +6,40 @@ namespace HermesSocketServer.Socket.Handlers
{
public class ChatterHandler : ISocketHandler
{
private const int CHATTER_BUFFER_SIZE = 2000;
public int OperationCode { get; } = 6;
private readonly Database _database;
private readonly HashSet<long> _chatters;
private readonly ChatterMessage[] _array;
private readonly long[] _array;
private readonly ILogger _logger;
private readonly object _lock;
private readonly Mutex _lock;
private int _index;
public ChatterHandler(Database database, ILogger logger)
{
_database = database;
_logger = logger;
_chatters = new HashSet<long>(1001);
_array = new ChatterMessage[1000];
_chatters = new HashSet<long>(CHATTER_BUFFER_SIZE);
_array = new long[CHATTER_BUFFER_SIZE];
_index = -1;
_lock = new object();
_lock = new Mutex();
}
public async Task Execute<T>(WebSocketUser sender, T message, HermesSocketManager sockets)
{
if (message is not ChatterMessage data || sender.Id == null)
return;
lock (_lock)
if (sender.Slave && !sender.WebLogin)
{
_logger.Warning($"Received message from a slave client [message type: chatter details][sender id: {sender.Id}][sender session: {sender.SessionId}]");
return;
}
try
{
_lock.WaitOne();
if (_chatters.Contains(data.Id))
return;
@@ -40,7 +48,16 @@ namespace HermesSocketServer.Socket.Handlers
if (_index == _array.Length - 1)
_index = -1;
_array[++_index] = data;
var previous = _array[++_index];
if (previous != 0)
{
_chatters.Remove(previous);
}
_array[_index] = data.Id;
}
finally
{
_lock.ReleaseMutex();
}
try

View File

@@ -1,36 +1,48 @@
using HermesSocketLibrary.db;
using HermesSocketLibrary.Requests.Messages;
using HermesSocketLibrary.Socket.Data;
using Npgsql;
using HermesSocketServer.Services;
using ILogger = Serilog.ILogger;
namespace HermesSocketServer.Socket.Handlers
{
public class EmoteDetailsHandler : ISocketHandler
{
public int OperationCode { get; } = 7;
private readonly Database _database;
private readonly HashSet<string> _emotes;
private readonly ILogger _logger;
private readonly object _lock;
private const int EMOTE_BUFFER_SIZE = 5000;
public EmoteDetailsHandler(Database database, ILogger logger)
public int OperationCode { get; } = 7;
private readonly ChannelManager _manager;
private readonly HashSet<string> _emotes;
private readonly string[] _array;
private readonly ILogger _logger;
private readonly Mutex _mutex;
private int _index;
public EmoteDetailsHandler(ChannelManager manager, ILogger logger)
{
_database = database;
_manager = manager;
_emotes = new HashSet<string>(EMOTE_BUFFER_SIZE);
_array = new string[EMOTE_BUFFER_SIZE];
_logger = logger;
_emotes = new HashSet<string>(501);
_lock = new object();
_mutex = new Mutex();
_index = -1;
}
public async Task Execute<T>(WebSocketUser sender, T message, HermesSocketManager sockets)
{
if (message is not EmoteDetailsMessage data || sender.Id == null)
return;
if (data.Emotes == null || !data.Emotes.Any())
return;
lock (_lock)
if (sender.Slave && !sender.WebLogin)
{
_logger.Warning($"Received message from a slave client [message type: emote details][sender id: {sender.Id}][sender session: {sender.SessionId}]");
return;
}
try
{
_mutex.WaitOne();
foreach (var entry in data.Emotes)
{
if (_emotes.Contains(entry.Key))
@@ -40,36 +52,32 @@ namespace HermesSocketServer.Socket.Handlers
}
_emotes.Add(entry.Key);
if (_index == _array.Length - 1)
_index = -1;
var previous = _array[++_index];
if (previous != null)
{
_emotes.Remove(previous);
}
_array[_index] = entry.Key;
}
}
finally
{
_mutex.ReleaseMutex();
}
if (!data.Emotes.Any())
return;
int rows = 0;
string sql = "INSERT INTO \"Emote\" (id, name) VALUES (@idd, @name)";
using (var connection = await _database.DataSource.OpenConnectionAsync())
{
using (var command = new NpgsqlCommand(sql, connection))
{
foreach (var entry in data.Emotes)
{
command.Parameters.Clear();
command.Parameters.AddWithValue("idd", entry.Key);
command.Parameters.AddWithValue("name", entry.Value);
await command.PrepareAsync();
try
{
rows += await command.ExecuteNonQueryAsync();
}
catch (Exception e)
{
_logger.Error(e, "Failed to add emote detail: " + entry.Key + " -> " + entry.Value);
}
}
}
}
var channel = _manager.Get(sender.Id);
if (channel == null)
return;
foreach (var entry in data.Emotes)
channel.Emotes.Set(entry.Key, new EmoteInfo() { Id = entry.Key, Name = entry.Value, UserId = channel.Id });
}
}
}

View File

@@ -13,7 +13,7 @@ namespace HermesSocketServer.Socket.Handlers
private readonly HashSet<string> _history;
private readonly EmoteUsageMessage[] _array;
private readonly ILogger _logger;
private readonly object _lock;
private readonly Mutex _mutex;
private int _index;
@@ -21,9 +21,9 @@ namespace HermesSocketServer.Socket.Handlers
{
_database = database;
_logger = logger;
_history = new HashSet<string>(101);
_array = new EmoteUsageMessage[100];
_lock = new object();
_history = new HashSet<string>(1001);
_array = new EmoteUsageMessage[1000];
_mutex = new Mutex();
_index = -1;
}
@@ -32,9 +32,15 @@ namespace HermesSocketServer.Socket.Handlers
{
if (message is not EmoteUsageMessage data || sender.Id == null)
return;
lock (_lock)
if (sender.Slave && !sender.WebLogin)
{
_logger.Warning($"Received message from a slave client [message type: emote usage][sender id: {sender.Id}][sender session: {sender.SessionId}]");
return;
}
try
{
_mutex.WaitOne();
if (_history.Contains(data.MessageId))
{
return;
@@ -50,7 +56,12 @@ namespace HermesSocketServer.Socket.Handlers
_array[_index] = data;
}
finally
{
_mutex.ReleaseMutex();
}
// TODO: multi-row inserts to increase database performance.
int rows = 0;
string sql = "INSERT INTO \"EmoteUsageHistory\" (timestamp, \"broadcasterId\", \"emoteId\", \"chatterId\") VALUES (@time, @broadcaster, @emote, @chatter)";
using (var connection = await _database.DataSource.OpenConnectionAsync())

View File

@@ -17,7 +17,7 @@ namespace HermesSocketServer.Socket.Handlers
private readonly Database _database;
private readonly HermesSocketManager _sockets;
private readonly ILogger _logger;
private readonly object _lock;
private readonly SemaphoreSlim _semaphore;
public HermesLoginHandler(ChannelManager manager, IStore<string, TTSVoice> voices, ServerConfiguration configuration, Database database, HermesSocketManager sockets, ILogger logger)
{
@@ -27,7 +27,7 @@ namespace HermesSocketServer.Socket.Handlers
_database = database;
_sockets = sockets;
_logger = logger;
_lock = new object();
_semaphore = new SemaphoreSlim(1);
}
@@ -42,119 +42,102 @@ namespace HermesSocketServer.Socket.Handlers
var result = await _database.ExecuteScalar(sql, new Dictionary<string, object>() { { "key", data.ApiKey } });
string? userId = result?.ToString();
if (userId == null)
if (string.IsNullOrWhiteSpace(userId))
return;
IEnumerable<WebSocketUser?> recipients = Enumerable.Empty<WebSocketUser?>();
lock (_lock)
try
{
await _semaphore.WaitAsync();
if (sender.Id != null)
throw new Exception("User already logged in.");
return;
sender.Id = userId;
if (string.IsNullOrWhiteSpace(sender.Id))
throw new Exception("Credentials do not match.");
recipients = _sockets.GetSockets(userId).ToList().Where(s => s.SessionId != sender.SessionId);
sender.Slave = data.WebLogin || recipients.Where(r => r?.WebLogin != true).Any();
}
sender.Slave = data.WebLogin || recipients.Any(r => r != null && !r.WebLogin);
sender.ApiKey = data.ApiKey;
sender.WebLogin = data.WebLogin;
sender.ApiKey = data.ApiKey;
sender.WebLogin = data.WebLogin;
var channel = _manager.Get(userId);
if (channel == null)
{
channel = await _manager.Add(userId);
// Fetch channel data.
var channel = _manager.Get(userId);
if (channel == null)
{
channel = await _manager.Add(userId);
if (channel == null)
throw new Exception("Channel does not exist.");
}
sender.Name = channel.User.Name;
sender.Admin = channel.User.Role == "ADMIN";
if (string.IsNullOrEmpty(sender.Name))
{
_logger.Error($"Could not find username for a certain user [user id: {userId}][api key: {data.ApiKey}]");
return;
}
sender.Name = channel.User.Name;
sender.Admin = channel.User.Role == "ADMIN";
if (string.IsNullOrEmpty(sender.Name))
{
_logger.Error($"Could not find username for a certain user [user id: {userId}][api key: {data.ApiKey}]");
return;
}
if (string.IsNullOrEmpty(channel.User.DefaultVoice))
_logger.Warning($"No default voice was set for an user [user id: {userId}][api key: {data.ApiKey}]");
sql = "select \"providerAccountId\" from \"Account\" where \"userId\" = @user and provider = @provider";
var result2 = await _database.ExecuteScalar(sql, new Dictionary<string, object>() { { "user", userId }, { "provider", "twitch" } });
var providerId = result2?.ToString();
if (providerId == null)
{
_logger.Warning($"Could not find the Provider Account Id [user id: {userId}][provider: twitch]");
return;
}
var ack = new LoginAckMessage()
{
UserId = userId,
ProviderAccountId = providerId,
SessionId = sender.SessionId,
UserName = channel.User.Name,
OwnerId = _configuration.Tts.OwnerId,
Admin = sender.Admin,
WebLogin = data.WebLogin,
WordFilters = channel.Filters.Get().Values,
DefaultTTSVoice = channel.User.DefaultVoice ?? _configuration.Tts.DefaultTtsVoice,
TTSVoicesAvailable = _voices.Get().ToDictionary(v => v.Key, v => v.Value.Name),
Slave = sender.Slave,
};
var userIdDict = new Dictionary<string, object>() { { "user", userId } };
ack.Connections = new List<Connection>();
string sql3 = "select \"name\", \"type\", \"clientId\", \"accessToken\", \"grantType\", \"scope\", \"expiresAt\", \"default\" from \"Connection\" where \"userId\" = @user";
await _database.Execute(sql3, userIdDict, sql =>
ack.Connections.Add(new Connection()
{
UserId = channel.Id,
Name = sql.GetString(0),
Type = sql.GetString(1),
ClientId = sql.GetString(2),
AccessToken = sql.GetString(3),
GrantType = sql.GetString(4),
Scope = sql.GetString(5),
ExpiresAt = sql.GetDateTime(6),
Default = sql.GetBoolean(7)
})
);
ack.EnabledTTSVoices = new List<string>();
string sql5 = $"SELECT v.name FROM \"TtsVoiceState\" s "
+ "INNER JOIN \"TtsVoice\" v ON s.\"ttsVoiceId\" = v.id "
+ "WHERE \"userId\" = @user AND state = true";
await _database.Execute(sql5, userIdDict, (r) => ack.EnabledTTSVoices.Add(r.GetString(0)));
await sender.Send(2, ack);
string version = $"{data.MajorVersion}.{data.MinorVersion}.{data.PatchVersion}";
_logger.Information($"Hermes client logged in {(sender.Admin ? "as administrator " : "")}[name: {sender.Name}][id: {userId}][ip: {sender.IPAddress}][version: {version}][web: {data.WebLogin}]");
ack = new LoginAckMessage()
{
AnotherClient = true,
UserId = userId,
OwnerId = _configuration.Tts.OwnerId,
WebLogin = data.WebLogin
};
var tasks = new List<Task>();
foreach (var socket in recipients)
{
try
{
tasks.Add(socket.Send(2, ack));
}
catch (Exception)
sql = "select \"providerAccountId\" from \"Account\" where \"userId\" = @user and provider = @provider";
var result2 = await _database.ExecuteScalar(sql, new Dictionary<string, object>() { { "user", userId }, { "provider", "twitch" } });
var providerId = result2?.ToString();
if (providerId == null)
{
_logger.Warning($"Could not find the Provider Account Id [user id: {userId}][provider: twitch]");
return;
}
var voices = _voices.Get();
var voiceStates = channel.VoiceStates.Get();
var voicesEnabled = voices.Values.Where(v => !voiceStates.TryGetValue(v.Id, out var voice) || voice.Enabled).Select(v => v.Name).ToList();
var ack = new LoginAckMessage()
{
UserId = userId,
ProviderAccountId = providerId,
SessionId = sender.SessionId,
UserName = channel.User.Name,
StreamElementsOverlayKey = channel.User.StreamElementsOverlayKey,
OwnerId = _configuration.Tts.OwnerId,
Admin = sender.Admin,
WebLogin = data.WebLogin,
WordFilters = channel.Filters.Get().Values,
DefaultTTSVoice = channel.User.DefaultVoice ?? _configuration.Tts.DefaultTtsVoice,
TTSVoicesAvailable = voices.ToDictionary(v => v.Key, v => v.Value.Name),
EnabledTTSVoices = voicesEnabled,
Connections = channel.Connections.Get().Values.ToList(),
Slave = sender.Slave,
};
await sender.Send(2, ack);
// Sending notification to other clients about another client logging in.
string version = $"{data.MajorVersion}.{data.MinorVersion}.{data.PatchVersion}";
_logger.Information($"Hermes client logged in {(sender.Admin ? "as administrator " : "")}[name: {sender.Name}][id: {userId}][ip: {sender.IPAddress}][version: {version}][web: {data.WebLogin}]");
ack = new LoginAckMessage()
{
AnotherClient = true,
UserId = userId,
OwnerId = _configuration.Tts.OwnerId,
WebLogin = data.WebLogin
};
var tasks = new List<Task>();
foreach (var socket in recipients)
{
try
{
tasks.Add(socket!.Send(2, ack));
}
catch (Exception)
{
}
}
await Task.WhenAll(tasks);
}
finally
{
_semaphore.Release();
}
await Task.WhenAll(tasks);
}
}
}

View File

@@ -23,6 +23,11 @@ namespace HermesSocketServer.Socket.Handlers
{
if (message is not RequestMessage data || sender.Id == null)
return;
if (sender.Slave && !sender.WebLogin && !data.Type.StartsWith("get"))
{
_logger.Warning($"Received a non-get request from a slave client [request type: {data.Type}][sender id: {sender.Id}][sender session: {sender.SessionId}]");
return;
}
RequestResult? result = null;
_logger.Debug("Executing request handler: " + data.Type);

View File

@@ -30,7 +30,8 @@ namespace HermesSocketServer.Store
await _database.Execute(sql, data, (reader) =>
{
var chatterId = reader.GetInt32(0).ToString();
lock (_lock)
_rwls.EnterWriteLock();
try
{
_store.Add(chatterId, new GroupChatter()
{
@@ -40,6 +41,10 @@ namespace HermesSocketServer.Store
ChatterLabel = reader.GetString(1),
});
}
finally
{
_rwls.ExitWriteLock();
}
});
_logger.Information($"Loaded {_store.Count} group chatters from database [group id: {_groupId}]");
}

View File

@@ -38,7 +38,7 @@ namespace HermesSocketServer.Store
Default = reader.GetBoolean(7),
});
});
_logger.Information($"Loaded {_store.Count} groups from database.");
_logger.Information($"Loaded {_store.Count} connections from database.");
}
protected override void OnInitialAdd(string key, Connection value)

62
Store/EmoteStore.cs Normal file
View File

@@ -0,0 +1,62 @@
using System.Collections.Concurrent;
using HermesSocketLibrary.db;
using HermesSocketLibrary.Requests.Messages;
using HermesSocketServer.Store.Internal;
namespace HermesSocketServer.Store
{
public class EmoteStore : AutoSavedStore<string, EmoteInfo>
{
private readonly string _userId;
private readonly Database _database;
private readonly Serilog.ILogger _logger;
public EmoteStore(string userId, DatabaseTable table, Database database, Serilog.ILogger logger)
: base(table, database, logger)
{
_userId = userId;
_database = database;
_logger = logger;
}
public override async Task Load()
{
var emotes = new List<EmoteInfo>();
var data = new Dictionary<string, object>() { { "user", _userId } };
string sql = $"SELECT id, name FROM \"Emote\" WHERE \"userId\" = @user";
await _database.Execute(sql, data, (reader) =>
{
var id = reader.GetString(0);
var name = reader.GetString(1);
_store.Add(id, new EmoteInfo()
{
Id = id,
UserId = _userId,
Name = name,
});
emotes.Add(new EmoteInfo() { Id = id, Name = name, UserId = _userId });
});
_logger.Information($"Loaded {_store.Count} emotes from database.");
}
protected override void OnInitialAdd(string key, EmoteInfo value)
{
ArgumentException.ThrowIfNullOrWhiteSpace(key, nameof(key));
ArgumentNullException.ThrowIfNull(value, nameof(value));
ArgumentException.ThrowIfNullOrWhiteSpace(value.UserId, nameof(value.UserId));
ArgumentException.ThrowIfNullOrWhiteSpace(value.Name, nameof(value.Name));
}
protected override void OnInitialModify(string key, EmoteInfo oldValue, EmoteInfo newValue)
{
ArgumentNullException.ThrowIfNull(newValue, nameof(newValue));
ArgumentException.ThrowIfNullOrWhiteSpace(newValue.UserId, nameof(newValue.UserId));
ArgumentException.ThrowIfNullOrWhiteSpace(newValue.Name, nameof(newValue.Name));
}
protected override void OnPostRemove(string key, EmoteInfo value)
{
}
}
}

View File

@@ -67,7 +67,7 @@ namespace HermesSocketServer.Store
{
List<Task> tasks = _chatters.Values.Select(c => c.Save()).ToList();
tasks.Add(base.Save());
await Task.WhenAll(base.Save());
await Task.WhenAll(tasks);
}
protected override void OnInitialAdd(string key, Group value)

View File

@@ -2,6 +2,7 @@ namespace HermesSocketServer.Store
{
public interface IStore<K, V>
{
bool Exists(K key);
V? Get(K key);
IDictionary<K, V> Get();
Task Load();

View File

@@ -46,13 +46,26 @@ namespace HermesSocketServer.Store.Internal
private async Task GenerateQuery(IList<K> keys, Func<int, string> generate, Func<string, IEnumerable<K>, IEnumerable<V>, Task<int>> execute)
{
ImmutableList<K>? list = null;
lock (_lock)
_rwls.EnterUpgradeableReadLock();
try
{
if (!keys.Any())
return;
list = keys.ToImmutableList();
keys.Clear();
_rwls.EnterWriteLock();
try
{
list = keys.ToImmutableList();
keys.Clear();
}
finally
{
_rwls.ExitWriteLock();
}
}
finally
{
_rwls.ExitUpgradeableReadLock();
}
var query = generate(list.Count);

View File

@@ -51,13 +51,26 @@ namespace HermesSocketServer.Store.Internal
private async Task GenerateQuery(IList<K> keys, Func<int, string> generate, Func<string, IEnumerable<K>, IEnumerable<V?>, Task<int>> execute)
{
ImmutableList<K>? list = null;
lock (_lock)
_rwls.EnterUpgradeableReadLock();
try
{
if (!keys.Any())
return;
list = keys.ToImmutableList();
keys.Clear();
_rwls.EnterWriteLock();
try
{
list = keys.ToImmutableList();
keys.Clear();
}
finally
{
_rwls.ExitWriteLock();
}
}
finally
{
_rwls.ExitUpgradeableReadLock();
}
var query = generate(list.Count);
@@ -77,15 +90,27 @@ namespace HermesSocketServer.Store.Internal
private async Task GenerateDeleteQuery(IList<K> keys, IList<V> values, Func<int, string> generate, Func<string, IEnumerable<V>, Task<int>> execute)
{
ImmutableList<V>? list = null;
lock (_lock)
_rwls.EnterUpgradeableReadLock();
try
{
if (!keys.Any() || !values.Any()) {
if (!keys.Any() || !values.Any())
return;
}
list = values.ToImmutableList();
values.Clear();
keys.Clear();
_rwls.EnterWriteLock();
try
{
list = values.ToImmutableList();
values.Clear();
keys.Clear();
}
finally
{
_rwls.ExitWriteLock();
}
}
finally
{
_rwls.ExitUpgradeableReadLock();
}
var query = generate(list.Count);

View File

@@ -95,15 +95,14 @@ namespace HermesSocketServer.Store.Internal
var ctp = columns.ToDictionary(c => c, c => _columnPropertyRelations[c]);
var sb = new StringBuilder();
var columnsLower = columns.Select(c => c.ToLower());
sb.Append($"INSERT INTO \"{table}\" (\"{string.Join("\", \"", columns)}\") VALUES ");
for (var row = 0; row < rows; row++)
{
sb.Append("(");
foreach (var column in columnsLower)
foreach (var column in columns)
{
sb.Append('@')
.Append(column)
.Append(column.ToLower())
.Append(row);
if (typeMapping.TryGetValue(column, out var type))

View File

@@ -10,7 +10,7 @@ namespace HermesSocketServer.Store.Internal
protected readonly IList<K> _added;
protected readonly IList<K> _modified;
protected readonly IList<K> _deleted;
protected readonly object _lock;
protected readonly ReaderWriterLockSlim _rwls;
public GroupSaveStore()
@@ -19,7 +19,7 @@ namespace HermesSocketServer.Store.Internal
_added = new List<K>();
_modified = new List<K>();
_deleted = new List<K>();
_lock = new object();
_rwls = new ReaderWriterLockSlim();
}
public abstract Task Load();
@@ -28,22 +28,46 @@ namespace HermesSocketServer.Store.Internal
protected abstract void OnPostRemove(K key, V value);
public abstract Task Save();
public bool Exists(K key)
{
_rwls.EnterReadLock();
try
{
return _store.ContainsKey(key);
}
finally
{
_rwls.ExitReadLock();
}
}
public V? Get(K key)
{
lock (_lock)
_rwls.EnterReadLock();
try
{
if (_store.TryGetValue(key, out var value))
return value;
}
finally
{
_rwls.ExitReadLock();
}
return null;
}
public IDictionary<K, V> Get()
{
lock (_lock)
_rwls.EnterReadLock();
try
{
return _store.ToImmutableDictionary();
}
finally
{
_rwls.ExitReadLock();
}
}
public bool Modify(K? key, V value)
@@ -51,19 +75,32 @@ namespace HermesSocketServer.Store.Internal
if (key == null)
return false;
lock (_lock)
_rwls.EnterUpgradeableReadLock();
try
{
if (_store.TryGetValue(key, out V? oldValue))
{
OnInitialModify(key, oldValue, value);
_store[key] = value;
if (!_added.Contains(key) && !_modified.Contains(key))
_rwls.EnterWriteLock();
try
{
_modified.Add(key);
OnInitialModify(key, oldValue, value);
_store[key] = value;
if (!_added.Contains(key) && !_modified.Contains(key))
{
_modified.Add(key);
}
return true;
}
finally
{
_rwls.ExitWriteLock();
}
return true;
}
}
finally
{
_rwls.ExitUpgradeableReadLock();
}
return false;
}
@@ -72,18 +109,31 @@ namespace HermesSocketServer.Store.Internal
if (key == null)
return false;
lock (_lock)
_rwls.EnterUpgradeableReadLock();
try
{
if (_store.TryGetValue(key, out V? value))
{
modify(value);
if (!_added.Contains(key) && !_modified.Contains(key))
_rwls.EnterWriteLock();
try
{
_modified.Add(key);
modify(value);
if (!_added.Contains(key) && !_modified.Contains(key))
{
_modified.Add(key);
}
return true;
}
finally
{
_rwls.ExitWriteLock();
}
return true;
}
}
finally
{
_rwls.ExitUpgradeableReadLock();
}
return false;
}
@@ -92,25 +142,28 @@ namespace HermesSocketServer.Store.Internal
if (key == null)
return false;
lock (_lock)
_rwls.EnterWriteLock();
try
{
if (_store.TryGetValue(key, out var value))
{
if (_store.Remove(key))
_store.Remove(key);
OnPostRemove(key, value);
if (!_added.Remove(key))
{
OnPostRemove(key, value);
if (!_added.Remove(key))
_modified.Remove(key);
if (!fromCascade && !_deleted.Contains(key))
{
_modified.Remove(key);
if (!fromCascade && !_deleted.Contains(key))
{
_deleted.Add(key);
}
_deleted.Add(key);
}
return true;
}
return true;
}
}
finally
{
_rwls.ExitWriteLock();
}
return false;
}
@@ -119,20 +172,18 @@ namespace HermesSocketServer.Store.Internal
if (key == null)
return false;
lock (_lock)
_rwls.EnterWriteLock();
try
{
if (_store.TryGetValue(key, out V? fetched))
{
if (fetched != value)
OnInitialModify(key, fetched, value);
_store[key] = value;
if (!_added.Contains(key) && !_modified.Contains(key))
{
OnInitialModify(key, fetched, value);
_store[key] = value;
if (!_added.Contains(key) && !_modified.Contains(key))
{
_modified.Add(key);
}
return true;
_modified.Add(key);
}
return true;
}
else
{
@@ -145,7 +196,10 @@ namespace HermesSocketServer.Store.Internal
return true;
}
}
return false;
finally
{
_rwls.ExitWriteLock();
}
}
}
}

View File

@@ -19,7 +19,7 @@ namespace HermesSocketServer.Store
public override async Task Load()
{
string sql = "SELECT id, name, email, role, \"ttsDefaultVoice\" FROM \"User\";";
string sql = "SELECT id, name, email, role, \"ttsDefaultVoice\", seolkey FROM \"User\";";
await _database.Execute(sql, new Dictionary<string, object>(), (reader) =>
{
string id = reader.GetString(0);
@@ -30,6 +30,7 @@ namespace HermesSocketServer.Store
Email = reader.GetString(2),
Role = reader.GetString(3),
DefaultVoice = reader.GetString(4),
StreamElementsOverlayKey = reader.GetString(5),
});
});
_logger.Information($"Loaded {_store.Count} users from database.");