diff --git a/Services/ChannelManager.cs b/Services/ChannelManager.cs index 94eafc2..3edef2c 100644 --- a/Services/ChannelManager.cs +++ b/Services/ChannelManager.cs @@ -14,7 +14,7 @@ namespace HermesSocketServer.Services private readonly ServerConfiguration _configuration; private readonly Serilog.ILogger _logger; private readonly IDictionary _channels; - private readonly object _lock; + private readonly Mutex _mutex; public ChannelManager(IStore users, Database database, IStore voices, ServerConfiguration configuration, Serilog.ILogger logger) { @@ -24,7 +24,7 @@ namespace HermesSocketServer.Services _configuration = configuration; _logger = logger; _channels = new ConcurrentDictionary(); - _lock = new object(); + _mutex = new Mutex(); } @@ -36,15 +36,16 @@ namespace HermesSocketServer.Services return await Task.Run(() => { - lock (_lock) + try { + _mutex.WaitOne(); if (_channels.TryGetValue(userId, out var channel)) - return Task.FromResult(channel); + return channel; var actionTable = _configuration.Database.Tables["Action"]; var chatterTable = _configuration.Database.Tables["Chatter"]; var connectionTable = _configuration.Database.Tables["Connection"]; - //var chatterGroupTable = _configuration.Database.Tables["ChatterGroup"]; + var connectionStateTable = _configuration.Database.Tables["ConnectionState"]; var groupTable = _configuration.Database.Tables["Group"]; var groupPermissionTable = _configuration.Database.Tables["GroupPermission"]; var policyTable = _configuration.Database.Tables["Policy"]; @@ -93,22 +94,43 @@ namespace HermesSocketServer.Services ]); _channels.Add(userId, channel); - return Task.FromResult(channel); + return channel; + } + finally + { + _mutex.ReleaseMutex(); } }); } public Channel? Get(string channelId) { - if (_channels.TryGetValue(channelId, out var channel)) - return channel; - return null; + try + { + _mutex.WaitOne(); + if (_channels.TryGetValue(channelId, out var channel)) + return channel; + return null; + } + finally + { + _mutex.ReleaseMutex(); + } } public async Task Save(string userId) { - if (!_channels.TryGetValue(userId, out var channel)) - return; + Channel? channel; + try + { + _mutex.WaitOne(); + if (!_channels.TryGetValue(userId, out channel)) + return; + } + finally + { + _mutex.ReleaseMutex(); + } _logger.Debug($"Saving channel data to database [channel id: {channel.Id}][channel name: {channel.User.Name}]"); await Task.WhenAll([ diff --git a/Socket/Handlers/ChatterHandler.cs b/Socket/Handlers/ChatterHandler.cs index 094a735..fc6956e 100644 --- a/Socket/Handlers/ChatterHandler.cs +++ b/Socket/Handlers/ChatterHandler.cs @@ -14,7 +14,7 @@ namespace HermesSocketServer.Socket.Handlers private readonly long[] _array; private readonly ILogger _logger; - private readonly object _lock; + private readonly Mutex _lock; private int _index; public ChatterHandler(Database database, ILogger logger) @@ -24,7 +24,7 @@ namespace HermesSocketServer.Socket.Handlers _chatters = new HashSet(CHATTER_BUFFER_SIZE); _array = new long[CHATTER_BUFFER_SIZE]; _index = -1; - _lock = new object(); + _lock = new Mutex(); } public async Task Execute(WebSocketUser sender, T message, HermesSocketManager sockets) @@ -32,8 +32,9 @@ namespace HermesSocketServer.Socket.Handlers if (message is not ChatterMessage data || sender.Id == null) return; - lock (_lock) + try { + _lock.WaitOne(); if (_chatters.Contains(data.Id)) return; @@ -43,11 +44,16 @@ namespace HermesSocketServer.Socket.Handlers _index = -1; var previous = _array[++_index]; - if (previous != 0) { + if (previous != 0) + { _chatters.Remove(previous); } _array[_index] = data.Id; } + finally + { + _lock.ReleaseMutex(); + } try { diff --git a/Socket/Handlers/EmoteDetailsHandler.cs b/Socket/Handlers/EmoteDetailsHandler.cs index e290e79..a244d88 100644 --- a/Socket/Handlers/EmoteDetailsHandler.cs +++ b/Socket/Handlers/EmoteDetailsHandler.cs @@ -14,7 +14,7 @@ namespace HermesSocketServer.Socket.Handlers private readonly HashSet _emotes; private readonly string[] _array; private readonly ILogger _logger; - private readonly object _lock; + private readonly Mutex _mutex; private int _index; @@ -24,7 +24,7 @@ namespace HermesSocketServer.Socket.Handlers _emotes = new HashSet(EMOTE_BUFFER_SIZE); _array = new string[EMOTE_BUFFER_SIZE]; _logger = logger; - _lock = new object(); + _mutex = new Mutex(); _index = -1; } @@ -36,8 +36,9 @@ namespace HermesSocketServer.Socket.Handlers if (data.Emotes == null || !data.Emotes.Any()) return; - lock (_lock) + try { + _mutex.WaitOne(); foreach (var entry in data.Emotes) { if (_emotes.Contains(entry.Key)) @@ -59,6 +60,10 @@ namespace HermesSocketServer.Socket.Handlers _array[_index] = entry.Key; } } + finally + { + _mutex.ReleaseMutex(); + } if (!data.Emotes.Any()) return; diff --git a/Socket/Handlers/EmoteUsageHandler.cs b/Socket/Handlers/EmoteUsageHandler.cs index dbd9157..cf8eaa2 100644 --- a/Socket/Handlers/EmoteUsageHandler.cs +++ b/Socket/Handlers/EmoteUsageHandler.cs @@ -13,7 +13,7 @@ namespace HermesSocketServer.Socket.Handlers private readonly HashSet _history; private readonly EmoteUsageMessage[] _array; private readonly ILogger _logger; - private readonly object _lock; + private readonly Mutex _mutex; private int _index; @@ -21,9 +21,9 @@ namespace HermesSocketServer.Socket.Handlers { _database = database; _logger = logger; - _history = new HashSet(101); - _array = new EmoteUsageMessage[100]; - _lock = new object(); + _history = new HashSet(1001); + _array = new EmoteUsageMessage[1000]; + _mutex = new Mutex(); _index = -1; } @@ -33,7 +33,7 @@ namespace HermesSocketServer.Socket.Handlers if (message is not EmoteUsageMessage data || sender.Id == null) return; - lock (_lock) + try { if (_history.Contains(data.MessageId)) { @@ -50,6 +50,10 @@ namespace HermesSocketServer.Socket.Handlers _array[_index] = data; } + finally + { + _mutex.ReleaseMutex(); + } int rows = 0; string sql = "INSERT INTO \"EmoteUsageHistory\" (timestamp, \"broadcasterId\", \"emoteId\", \"chatterId\") VALUES (@time, @broadcaster, @emote, @chatter)"; diff --git a/Socket/Handlers/HermesLoginHandler.cs b/Socket/Handlers/HermesLoginHandler.cs index e9a5931..a7d8b8f 100644 --- a/Socket/Handlers/HermesLoginHandler.cs +++ b/Socket/Handlers/HermesLoginHandler.cs @@ -17,7 +17,7 @@ namespace HermesSocketServer.Socket.Handlers private readonly Database _database; private readonly HermesSocketManager _sockets; private readonly ILogger _logger; - private readonly object _lock; + private readonly SemaphoreSlim _semaphore; public HermesLoginHandler(ChannelManager manager, IStore voices, ServerConfiguration configuration, Database database, HermesSocketManager sockets, ILogger logger) { @@ -27,7 +27,7 @@ namespace HermesSocketServer.Socket.Handlers _database = database; _sockets = sockets; _logger = logger; - _lock = new object(); + _semaphore = new SemaphoreSlim(1); } @@ -46,90 +46,95 @@ namespace HermesSocketServer.Socket.Handlers return; IEnumerable recipients = Enumerable.Empty(); - lock (_lock) + try { + await _semaphore.WaitAsync(); if (sender.Id != null) return; sender.Id = userId; - recipients = _sockets.GetSockets(userId).ToList().Where(s => s.SessionId != sender.SessionId); sender.Slave = data.WebLogin || recipients.Where(r => r != null && !r.WebLogin).Any(); - } + recipients = _sockets.GetSockets(userId).ToList().Where(s => s.SessionId != sender.SessionId); - sender.ApiKey = data.ApiKey; - sender.WebLogin = data.WebLogin; + sender.ApiKey = data.ApiKey; + sender.WebLogin = data.WebLogin; - // Fetch channel data. - var channel = _manager.Get(userId); - if (channel == null) - { - channel = await _manager.Add(userId); + // Fetch channel data. + var channel = _manager.Get(userId); if (channel == null) - throw new Exception("Channel does not exist."); - } - - sender.Name = channel.User.Name; - sender.Admin = channel.User.Role == "ADMIN"; - - if (string.IsNullOrEmpty(sender.Name)) - { - _logger.Error($"Could not find username for a certain user [user id: {userId}][api key: {data.ApiKey}]"); - return; - } - - sql = "select \"providerAccountId\" from \"Account\" where \"userId\" = @user and provider = @provider"; - var result2 = await _database.ExecuteScalar(sql, new Dictionary() { { "user", userId }, { "provider", "twitch" } }); - var providerId = result2?.ToString(); - if (providerId == null) - { - _logger.Warning($"Could not find the Provider Account Id [user id: {userId}][provider: twitch]"); - return; - } - - var voices = _voices.Get(); - var ack = new LoginAckMessage() - { - UserId = userId, - ProviderAccountId = providerId, - SessionId = sender.SessionId, - UserName = channel.User.Name, - OwnerId = _configuration.Tts.OwnerId, - Admin = sender.Admin, - WebLogin = data.WebLogin, - WordFilters = channel.Filters.Get().Values, - DefaultTTSVoice = channel.User.DefaultVoice ?? _configuration.Tts.DefaultTtsVoice, - TTSVoicesAvailable = _voices.Get().ToDictionary(v => v.Key, v => v.Value.Name), - EnabledTTSVoices = channel.VoiceStates.Get().Values.Where(v => v.Enabled && voices.ContainsKey(v.Id)).Select(v => voices[v.Id].Name).ToList(), - Connections = channel.Connections.Get().Values.ToList(), - Slave = sender.Slave, - }; - - await sender.Send(2, ack); - - // Sending notification to other clients about another client logging in. - string version = $"{data.MajorVersion}.{data.MinorVersion}.{data.PatchVersion}"; - _logger.Information($"Hermes client logged in {(sender.Admin ? "as administrator " : "")}[name: {sender.Name}][id: {userId}][ip: {sender.IPAddress}][version: {version}][web: {data.WebLogin}]"); - - ack = new LoginAckMessage() - { - AnotherClient = true, - UserId = userId, - OwnerId = _configuration.Tts.OwnerId, - WebLogin = data.WebLogin - }; - - var tasks = new List(); - foreach (var socket in recipients) - { - try { - tasks.Add(socket!.Send(2, ack)); + channel = await _manager.Add(userId); + if (channel == null) + throw new Exception("Channel does not exist."); } - catch (Exception) + + sender.Name = channel.User.Name; + sender.Admin = channel.User.Role == "ADMIN"; + + if (string.IsNullOrEmpty(sender.Name)) { + _logger.Error($"Could not find username for a certain user [user id: {userId}][api key: {data.ApiKey}]"); + return; } + + sql = "select \"providerAccountId\" from \"Account\" where \"userId\" = @user and provider = @provider"; + var result2 = await _database.ExecuteScalar(sql, new Dictionary() { { "user", userId }, { "provider", "twitch" } }); + var providerId = result2?.ToString(); + if (providerId == null) + { + _logger.Warning($"Could not find the Provider Account Id [user id: {userId}][provider: twitch]"); + return; + } + + var voices = _voices.Get(); + var ack = new LoginAckMessage() + { + UserId = userId, + ProviderAccountId = providerId, + SessionId = sender.SessionId, + UserName = channel.User.Name, + OwnerId = _configuration.Tts.OwnerId, + Admin = sender.Admin, + WebLogin = data.WebLogin, + WordFilters = channel.Filters.Get().Values, + DefaultTTSVoice = channel.User.DefaultVoice ?? _configuration.Tts.DefaultTtsVoice, + TTSVoicesAvailable = _voices.Get().ToDictionary(v => v.Key, v => v.Value.Name), + EnabledTTSVoices = channel.VoiceStates.Get().Values.Where(v => v.Enabled && voices.ContainsKey(v.Id)).Select(v => voices[v.Id].Name).ToList(), + Connections = channel.Connections.Get().Values.ToList(), + Slave = sender.Slave, + }; + + await sender.Send(2, ack); + + // Sending notification to other clients about another client logging in. + string version = $"{data.MajorVersion}.{data.MinorVersion}.{data.PatchVersion}"; + _logger.Information($"Hermes client logged in {(sender.Admin ? "as administrator " : "")}[name: {sender.Name}][id: {userId}][ip: {sender.IPAddress}][version: {version}][web: {data.WebLogin}]"); + + ack = new LoginAckMessage() + { + AnotherClient = true, + UserId = userId, + OwnerId = _configuration.Tts.OwnerId, + WebLogin = data.WebLogin + }; + + var tasks = new List(); + foreach (var socket in recipients) + { + try + { + tasks.Add(socket!.Send(2, ack)); + } + catch (Exception) + { + } + } + await Task.WhenAll(tasks); + } + finally + { + _semaphore.Release(); } - await Task.WhenAll(tasks); } } } \ No newline at end of file diff --git a/Store/ChatterGroupStore.cs b/Store/ChatterGroupStore.cs index 0d9ea74..3699f78 100644 --- a/Store/ChatterGroupStore.cs +++ b/Store/ChatterGroupStore.cs @@ -30,7 +30,8 @@ namespace HermesSocketServer.Store await _database.Execute(sql, data, (reader) => { var chatterId = reader.GetInt32(0).ToString(); - lock (_lock) + _rwls.EnterWriteLock(); + try { _store.Add(chatterId, new GroupChatter() { @@ -40,6 +41,10 @@ namespace HermesSocketServer.Store ChatterLabel = reader.GetString(1), }); } + finally + { + _rwls.ExitWriteLock(); + } }); _logger.Information($"Loaded {_store.Count} group chatters from database [group id: {_groupId}]"); } diff --git a/Store/IStore.cs b/Store/IStore.cs index 425a861..c448a55 100644 --- a/Store/IStore.cs +++ b/Store/IStore.cs @@ -2,6 +2,7 @@ namespace HermesSocketServer.Store { public interface IStore { + bool Exists(K key); V? Get(K key); IDictionary Get(); Task Load(); diff --git a/Store/Internal/AutoSavedStore.cs b/Store/Internal/AutoSavedStore.cs index dd0eb41..9f12412 100644 --- a/Store/Internal/AutoSavedStore.cs +++ b/Store/Internal/AutoSavedStore.cs @@ -46,13 +46,26 @@ namespace HermesSocketServer.Store.Internal private async Task GenerateQuery(IList keys, Func generate, Func, IEnumerable, Task> execute) { ImmutableList? list = null; - lock (_lock) + _rwls.EnterUpgradeableReadLock(); + try { if (!keys.Any()) return; - list = keys.ToImmutableList(); - keys.Clear(); + _rwls.EnterWriteLock(); + try + { + list = keys.ToImmutableList(); + keys.Clear(); + } + finally + { + _rwls.ExitWriteLock(); + } + } + finally + { + _rwls.ExitUpgradeableReadLock(); } var query = generate(list.Count); diff --git a/Store/Internal/ComplexAutoSavedStore.cs b/Store/Internal/ComplexAutoSavedStore.cs index 57eeeec..40bb74b 100644 --- a/Store/Internal/ComplexAutoSavedStore.cs +++ b/Store/Internal/ComplexAutoSavedStore.cs @@ -51,13 +51,26 @@ namespace HermesSocketServer.Store.Internal private async Task GenerateQuery(IList keys, Func generate, Func, IEnumerable, Task> execute) { ImmutableList? list = null; - lock (_lock) + _rwls.EnterUpgradeableReadLock(); + try { if (!keys.Any()) return; - list = keys.ToImmutableList(); - keys.Clear(); + _rwls.EnterWriteLock(); + try + { + list = keys.ToImmutableList(); + keys.Clear(); + } + finally + { + _rwls.ExitWriteLock(); + } + } + finally + { + _rwls.ExitUpgradeableReadLock(); } var query = generate(list.Count); @@ -77,16 +90,27 @@ namespace HermesSocketServer.Store.Internal private async Task GenerateDeleteQuery(IList keys, IList values, Func generate, Func, Task> execute) { ImmutableList? list = null; - lock (_lock) + _rwls.EnterUpgradeableReadLock(); + try { if (!keys.Any() || !values.Any()) - { return; - } - list = values.ToImmutableList(); - values.Clear(); - keys.Clear(); + _rwls.EnterWriteLock(); + try + { + list = values.ToImmutableList(); + values.Clear(); + keys.Clear(); + } + finally + { + _rwls.ExitWriteLock(); + } + } + finally + { + _rwls.ExitUpgradeableReadLock(); } var query = generate(list.Count); diff --git a/Store/Internal/GroupedSaveStore.cs b/Store/Internal/GroupedSaveStore.cs index d383291..af49ce5 100644 --- a/Store/Internal/GroupedSaveStore.cs +++ b/Store/Internal/GroupedSaveStore.cs @@ -10,7 +10,7 @@ namespace HermesSocketServer.Store.Internal protected readonly IList _added; protected readonly IList _modified; protected readonly IList _deleted; - protected readonly object _lock; + protected readonly ReaderWriterLockSlim _rwls; public GroupSaveStore() @@ -19,7 +19,7 @@ namespace HermesSocketServer.Store.Internal _added = new List(); _modified = new List(); _deleted = new List(); - _lock = new object(); + _rwls = new ReaderWriterLockSlim(); } public abstract Task Load(); @@ -28,22 +28,46 @@ namespace HermesSocketServer.Store.Internal protected abstract void OnPostRemove(K key, V value); public abstract Task Save(); + + public bool Exists(K key) + { + _rwls.EnterReadLock(); + try + { + return _store.ContainsKey(key); + } + finally + { + _rwls.ExitReadLock(); + } + } + public V? Get(K key) { - lock (_lock) + _rwls.EnterReadLock(); + try { if (_store.TryGetValue(key, out var value)) return value; } + finally + { + _rwls.ExitReadLock(); + } return null; } public IDictionary Get() { - lock (_lock) + _rwls.EnterReadLock(); + try { return _store.ToImmutableDictionary(); } + finally + { + _rwls.ExitReadLock(); + } } public bool Modify(K? key, V value) @@ -51,19 +75,32 @@ namespace HermesSocketServer.Store.Internal if (key == null) return false; - lock (_lock) + _rwls.EnterUpgradeableReadLock(); + try { if (_store.TryGetValue(key, out V? oldValue)) { - OnInitialModify(key, oldValue, value); - _store[key] = value; - if (!_added.Contains(key) && !_modified.Contains(key)) + _rwls.EnterWriteLock(); + try { - _modified.Add(key); + OnInitialModify(key, oldValue, value); + _store[key] = value; + if (!_added.Contains(key) && !_modified.Contains(key)) + { + _modified.Add(key); + } + return true; + } + finally + { + _rwls.ExitWriteLock(); } - return true; } } + finally + { + _rwls.ExitUpgradeableReadLock(); + } return false; } @@ -72,18 +109,31 @@ namespace HermesSocketServer.Store.Internal if (key == null) return false; - lock (_lock) + _rwls.EnterUpgradeableReadLock(); + try { if (_store.TryGetValue(key, out V? value)) { - modify(value); - if (!_added.Contains(key) && !_modified.Contains(key)) + _rwls.EnterWriteLock(); + try { - _modified.Add(key); + modify(value); + if (!_added.Contains(key) && !_modified.Contains(key)) + { + _modified.Add(key); + } + return true; + } + finally + { + _rwls.ExitWriteLock(); } - return true; } } + finally + { + _rwls.ExitUpgradeableReadLock(); + } return false; } @@ -92,25 +142,28 @@ namespace HermesSocketServer.Store.Internal if (key == null) return false; - lock (_lock) + _rwls.EnterWriteLock(); + try { if (_store.TryGetValue(key, out var value)) { - if (_store.Remove(key)) + _store.Remove(key); + OnPostRemove(key, value); + if (!_added.Remove(key)) { - OnPostRemove(key, value); - if (!_added.Remove(key)) + _modified.Remove(key); + if (!fromCascade && !_deleted.Contains(key)) { - _modified.Remove(key); - if (!fromCascade && !_deleted.Contains(key)) - { - _deleted.Add(key); - } + _deleted.Add(key); } - return true; } + return true; } } + finally + { + _rwls.ExitWriteLock(); + } return false; } @@ -119,20 +172,18 @@ namespace HermesSocketServer.Store.Internal if (key == null) return false; - lock (_lock) + _rwls.EnterWriteLock(); + try { if (_store.TryGetValue(key, out V? fetched)) { - if (fetched != value) + OnInitialModify(key, fetched, value); + _store[key] = value; + if (!_added.Contains(key) && !_modified.Contains(key)) { - OnInitialModify(key, fetched, value); - _store[key] = value; - if (!_added.Contains(key) && !_modified.Contains(key)) - { - _modified.Add(key); - } - return true; + _modified.Add(key); } + return true; } else { @@ -145,7 +196,10 @@ namespace HermesSocketServer.Store.Internal return true; } } - return false; + finally + { + _rwls.ExitWriteLock(); + } } } } \ No newline at end of file