Changed various locking mechanisms.

This commit is contained in:
Tom
2025-03-29 20:28:36 +00:00
parent eddd9e6403
commit fb04f4003f
11 changed files with 272 additions and 155 deletions

View File

@ -102,7 +102,7 @@ namespace TwitchChatTTS.Chat.Commands.Limits
private IDictionary<T, UserUsageData> _usages { get; } private IDictionary<T, UserUsageData> _usages { get; }
private IList<UsagePolicyNode<T>> _children { get; } private IList<UsagePolicyNode<T>> _children { get; }
private ILogger _logger; private ILogger _logger;
private object _lock { get; } private ReaderWriterLockSlim _rwls { get; }
public UsagePolicyNode(string name, UsagePolicyLimit? data, UsagePolicyNode<T>? parent, ILogger logger, bool root = false) public UsagePolicyNode(string name, UsagePolicyLimit? data, UsagePolicyNode<T>? parent, ILogger logger, bool root = false)
{ {
@ -114,11 +114,14 @@ namespace TwitchChatTTS.Chat.Commands.Limits
_usages = new Dictionary<T, UserUsageData>(); _usages = new Dictionary<T, UserUsageData>();
_children = new List<UsagePolicyNode<T>>(); _children = new List<UsagePolicyNode<T>>();
_logger = logger; _logger = logger;
_lock = new object(); _rwls = new ReaderWriterLockSlim();
} }
public UsagePolicyNode<T>? Get(IEnumerable<string> path) public UsagePolicyNode<T>? Get(IEnumerable<string> path)
{
_rwls.EnterReadLock();
try
{ {
if (!path.Any()) if (!path.Any())
return this; return this;
@ -129,8 +132,16 @@ namespace TwitchChatTTS.Chat.Commands.Limits
return this; return this;
return next.Get(path.Skip(1)); return next.Get(path.Skip(1));
} }
finally
{
_rwls.ExitReadLock();
}
}
public UsagePolicyNode<T>? Remove(IEnumerable<string> path) public UsagePolicyNode<T>? Remove(IEnumerable<string> path)
{
_rwls.EnterWriteLock();
try
{ {
if (!path.Any()) if (!path.Any())
{ {
@ -148,8 +159,16 @@ namespace TwitchChatTTS.Chat.Commands.Limits
return null; return null;
return next.Remove(path.Skip(1)); return next.Remove(path.Skip(1));
} }
finally
{
_rwls.ExitWriteLock();
}
}
public void Set(IEnumerable<string> path, int count, TimeSpan span) public void Set(IEnumerable<string> path, int count, TimeSpan span)
{
_rwls.EnterWriteLock();
try
{ {
if (!path.Any()) if (!path.Any())
{ {
@ -167,8 +186,16 @@ namespace TwitchChatTTS.Chat.Commands.Limits
} }
next.Set(path.Skip(1), count, span); next.Set(path.Skip(1), count, span);
} }
finally
{
_rwls.ExitWriteLock();
}
}
public bool TryUse(T key, DateTime timestamp) public bool TryUse(T key, DateTime timestamp)
{
_rwls.EnterUpgradeableReadLock();
try
{ {
if (_parent == null) if (_parent == null)
return false; return false;
@ -176,23 +203,35 @@ namespace TwitchChatTTS.Chat.Commands.Limits
return _parent.TryUse(key, timestamp); return _parent.TryUse(key, timestamp);
UserUsageData? usage; UserUsageData? usage;
lock (_lock)
{
if (!_usages.TryGetValue(key, out usage)) if (!_usages.TryGetValue(key, out usage))
{
_rwls.EnterWriteLock();
try
{ {
usage = new UserUsageData(Limit.Count, 1 % Limit.Count); usage = new UserUsageData(Limit.Count, 1 % Limit.Count);
usage.Uses[0] = timestamp; usage.Uses[0] = timestamp;
_usages.Add(key, usage); _usages.Add(key, usage);
}
finally
{
_rwls.ExitWriteLock();
}
_logger.Debug($"internal use node create"); _logger.Debug($"internal use node create");
return true; return true;
} }
if (usage.Uses.Length != Limit.Count) if (usage.Uses.Length != Limit.Count)
{
_rwls.EnterWriteLock();
try
{ {
var sizeDiff = Math.Max(0, usage.Uses.Length - Limit.Count); var sizeDiff = Math.Max(0, usage.Uses.Length - Limit.Count);
var temp = usage.Uses.Skip(sizeDiff); var temp = usage.Uses.Skip(sizeDiff);
var tempSize = usage.Uses.Length - sizeDiff; var tempSize = usage.Uses.Length - sizeDiff;
usage.Uses = temp.Union(new DateTime[Math.Max(0, Limit.Count - tempSize)]).ToArray(); usage.Uses = temp.Union(new DateTime[Math.Max(0, Limit.Count - tempSize)]).ToArray();
finally
{
_rwls.ExitWriteLock();
} }
} }
@ -204,11 +243,21 @@ namespace TwitchChatTTS.Chat.Commands.Limits
} }
_logger.Debug($"internal use node normal [span: {(timestamp - usage.Uses[usage.Index]).TotalMilliseconds}][index: {usage.Index}]"); _logger.Debug($"internal use node normal [span: {(timestamp - usage.Uses[usage.Index]).TotalMilliseconds}][index: {usage.Index}]");
lock (_lock) _rwls.EnterWriteLock();
try
{ {
usage.Uses[usage.Index] = timestamp; usage.Uses[usage.Index] = timestamp;
usage.Index = (usage.Index + 1) % Limit.Count; usage.Index = (usage.Index + 1) % Limit.Count;
} }
finally
{
_rwls.ExitWriteLock();
}
}
finally
{
_rwls.ExitWriteLock();
}
return true; return true;
} }

View File

@ -3,7 +3,6 @@ namespace TwitchChatTTS.Chat.Emotes
public class EmoteDatabase : IEmoteDatabase public class EmoteDatabase : IEmoteDatabase
{ {
private readonly IDictionary<string, string> _emotes; private readonly IDictionary<string, string> _emotes;
public IDictionary<string, string> Emotes { get => _emotes.AsReadOnly(); }
public EmoteDatabase() public EmoteDatabase()
{ {

View File

@ -54,8 +54,8 @@ namespace TwitchChatTTS.Hermes.Socket.Handlers
_user.DefaultTTSVoice = message.DefaultTTSVoice; _user.DefaultTTSVoice = message.DefaultTTSVoice;
_user.VoicesAvailable = new ConcurrentDictionary<string, string>(message.TTSVoicesAvailable); _user.VoicesAvailable = new ConcurrentDictionary<string, string>(message.TTSVoicesAvailable);
_user.VoicesEnabled = new HashSet<string>(message.EnabledTTSVoices); _user.VoicesEnabled = new HashSet<string>(message.EnabledTTSVoices);
_user.TwitchConnection = message.Connections.FirstOrDefault(c => c.Default && c.Type == "twitch"); _user.TwitchConnection = message.Connections.FirstOrDefault(c => c.Default && c.Type == "twitch") ?? message.Connections.FirstOrDefault(c => c.Type == "twitch");
_user.NightbotConnection = message.Connections.FirstOrDefault(c => c.Default && c.Type == "nightbot"); _user.NightbotConnection = message.Connections.FirstOrDefault(c => c.Default && c.Type == "nightbot") ?? message.Connections.FirstOrDefault(c => c.Type == "nightbot");
if (_user.TwitchConnection != null) if (_user.TwitchConnection != null)
{ {
_logger.Information("Twitch connection: " + _user.TwitchConnection.Name + " / " + _user.TwitchConnection.AccessToken); _logger.Information("Twitch connection: " + _user.TwitchConnection.Name + " / " + _user.TwitchConnection.AccessToken);

View File

@ -28,7 +28,7 @@ namespace TwitchChatTTS.Hermes.Socket
public string? UserId { get; set; } public string? UserId { get; set; }
private readonly System.Timers.Timer _heartbeatTimer; private readonly System.Timers.Timer _heartbeatTimer;
private readonly IBackoff _backoff; private readonly IBackoff _backoff;
private readonly object _lock; private readonly ReaderWriterLockSlim _rwls;
public bool Connected { get; set; } public bool Connected { get; set; }
public bool LoggedIn { get; set; } public bool LoggedIn { get; set; }
@ -62,7 +62,7 @@ namespace TwitchChatTTS.Hermes.Socket
LastHeartbeatReceived = LastHeartbeatSent = DateTime.UtcNow; LastHeartbeatReceived = LastHeartbeatSent = DateTime.UtcNow;
URL = $"wss://{BASE_URL}"; URL = $"wss://{BASE_URL}";
_lock = new object(); _rwls = new ReaderWriterLockSlim();
var ttsCreateUserVoice = _bus.GetTopic("tts.user.voice.create"); var ttsCreateUserVoice = _bus.GetTopic("tts.user.voice.create");
ttsCreateUserVoice.Subscribe(async data => await Send(3, new RequestMessage() ttsCreateUserVoice.Subscribe(async data => await Send(3, new RequestMessage()
@ -82,26 +82,36 @@ namespace TwitchChatTTS.Hermes.Socket
public override async Task Connect() public override async Task Connect()
{ {
lock (_lock) _rwls.EnterWriteLock();
try
{ {
if (Connected) if (Connected)
return; return;
}
_logger.Debug($"Attempting to connect to {URL}"); _logger.Debug($"Attempting to connect to {URL}");
await ConnectAsync(URL); await ConnectAsync(URL);
} }
finally
{
_rwls.ExitWriteLock();
}
}
private async Task Disconnect() private async Task Disconnect()
{ {
lock (_lock) _rwls.EnterWriteLock();
try
{ {
if (!Connected) if (!Connected)
return; return;
}
await DisconnectAsync(new SocketDisconnectionEventArgs("Normal disconnection", "Disconnection was executed")); await DisconnectAsync(new SocketDisconnectionEventArgs("Normal disconnection", "Disconnection was executed"));
} }
finally
{
_rwls.ExitWriteLock();
}
}
public async Task CreateTTSVoice(string voiceName) public async Task CreateTTSVoice(string voiceName)
{ {
@ -250,13 +260,11 @@ namespace TwitchChatTTS.Hermes.Socket
_logger.Information("Initializing Tom to Speech websocket client."); _logger.Information("Initializing Tom to Speech websocket client.");
OnConnected += async (sender, e) => OnConnected += async (sender, e) =>
{
lock (_lock)
{ {
if (Connected) if (Connected)
return; return;
Connected = true; Connected = true;
}
_logger.Information("Tom to Speech websocket client connected."); _logger.Information("Tom to Speech websocket client connected.");
_heartbeatTimer.Enabled = true; _heartbeatTimer.Enabled = true;
@ -272,8 +280,6 @@ namespace TwitchChatTTS.Hermes.Socket
}; };
OnDisconnected += async (sender, e) => OnDisconnected += async (sender, e) =>
{
lock (_lock)
{ {
if (!Connected) if (!Connected)
return; return;
@ -282,7 +288,6 @@ namespace TwitchChatTTS.Hermes.Socket
LoggedIn = false; LoggedIn = false;
Ready = false; Ready = false;
_user.Slave = true; _user.Slave = true;
}
_logger.Warning("Tom to Speech websocket client disconnected."); _logger.Warning("Tom to Speech websocket client disconnected.");
@ -423,6 +428,9 @@ namespace TwitchChatTTS.Hermes.Socket
} }
public new async Task Send<T>(int opcode, T message) public new async Task Send<T>(int opcode, T message)
{
_rwls.EnterReadLock();
try
{ {
if (!Connected) if (!Connected)
{ {
@ -432,5 +440,10 @@ namespace TwitchChatTTS.Hermes.Socket
await base.Send(opcode, message); await base.Send(opcode, message);
} }
finally
{
_rwls.ExitReadLock();
}
}
} }
} }

View File

@ -23,7 +23,7 @@ namespace TwitchChatTTS.Hermes.Socket.Requests
return; return;
} }
if (long.TryParse(requestData["chatter"].ToString(), out var chatterId)) if (!long.TryParse(requestData["chatter"].ToString(), out var chatterId))
{ {
_logger.Warning($"Chatter Id is invalid [chatter id: {chatterId}]"); _logger.Warning($"Chatter Id is invalid [chatter id: {chatterId}]");
return; return;

View File

@ -9,25 +9,28 @@ namespace TwitchChatTTS.Seven.Socket.Handlers
{ {
public class DispatchHandler : IWebSocketHandler public class DispatchHandler : IWebSocketHandler
{ {
public int OperationCode { get; } = 0;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly IEmoteDatabase _emotes; private readonly IEmoteDatabase _emotes;
private readonly object _lock = new object(); private readonly Mutex _lock;
public int OperationCode { get; } = 0;
public DispatchHandler(IEmoteDatabase emotes, ILogger logger) public DispatchHandler(IEmoteDatabase emotes, ILogger logger)
{ {
_emotes = emotes; _emotes = emotes;
_logger = logger; _logger = logger;
_lock = new Mutex();
} }
public Task Execute<Data>(SocketClient<WebSocketMessage> sender, Data data) public Task Execute<Data>(SocketClient<WebSocketMessage> sender, Data data)
{ {
if (data is not DispatchMessage message || message == null) if (data is not DispatchMessage message || message == null || message.Body == null)
return Task.CompletedTask; return Task.CompletedTask;
ApplyChanges(message?.Body?.Pulled, cf => cf.OldValue, true);
ApplyChanges(message?.Body?.Pushed, cf => cf.Value, false); ApplyChanges(message.Body.Pulled, cf => cf.OldValue, true);
ApplyChanges(message?.Body?.Removed, cf => cf.OldValue, true); ApplyChanges(message.Body.Pushed, cf => cf.Value, false);
ApplyChanges(message?.Body?.Updated, cf => cf.OldValue, false, cf => cf.Value); ApplyChanges(message.Body.Removed, cf => cf.OldValue, true);
ApplyChanges(message.Body.Updated, cf => cf.OldValue, false, cf => cf.Value);
return Task.CompletedTask; return Task.CompletedTask;
} }
@ -42,7 +45,7 @@ namespace TwitchChatTTS.Seven.Socket.Handlers
if (value == null) if (value == null)
continue; continue;
var o = JsonSerializer.Deserialize<EmoteField>(value.ToString(), new JsonSerializerOptions() var o = JsonSerializer.Deserialize<EmoteField>(value.ToString()!, new JsonSerializerOptions()
{ {
PropertyNameCaseInsensitive = false, PropertyNameCaseInsensitive = false,
PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower
@ -50,8 +53,9 @@ namespace TwitchChatTTS.Seven.Socket.Handlers
if (o == null) if (o == null)
continue; continue;
lock (_lock) try
{ {
_lock.WaitOne();
if (removing) if (removing)
{ {
if (_emotes.Get(o.Name) != o.Id) if (_emotes.Get(o.Name) != o.Id)
@ -71,8 +75,10 @@ namespace TwitchChatTTS.Seven.Socket.Handlers
} }
_emotes.Remove(o.Name); _emotes.Remove(o.Name);
var update = updater(val); var update = updater(val);
if (update == null)
continue;
var u = JsonSerializer.Deserialize<EmoteField>(update.ToString(), new JsonSerializerOptions() var u = JsonSerializer.Deserialize<EmoteField>(update.ToString()!, new JsonSerializerOptions()
{ {
PropertyNameCaseInsensitive = false, PropertyNameCaseInsensitive = false,
PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower
@ -94,6 +100,10 @@ namespace TwitchChatTTS.Seven.Socket.Handlers
_logger.Information($"Added 7tv emote [name: {o.Name}][id: {o.Id}]"); _logger.Information($"Added 7tv emote [name: {o.Name}][id: {o.Id}]");
} }
} }
finally
{
_lock.ReleaseMutex();
}
} }
} }
} }

View File

@ -1,4 +1,3 @@
using System.Net.WebSockets;
using CommonSocketLibrary.Abstract; using CommonSocketLibrary.Abstract;
using CommonSocketLibrary.Common; using CommonSocketLibrary.Common;
using TwitchChatTTS.Seven.Socket.Data; using TwitchChatTTS.Seven.Socket.Data;

View File

@ -27,7 +27,7 @@ namespace TwitchChatTTS.Twitch.Redemptions
private readonly AudioPlaybackEngine _playback; private readonly AudioPlaybackEngine _playback;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly Random _random; private readonly Random _random;
private readonly object _lock; private readonly ReaderWriterLockSlim _rwls;
public RedemptionManager( public RedemptionManager(
@ -50,7 +50,7 @@ namespace TwitchChatTTS.Twitch.Redemptions
_playback = playback; _playback = playback;
_logger = logger; _logger = logger;
_random = new Random(); _random = new Random();
_lock = new object(); _rwls = new ReaderWriterLockSlim();
var topic = _bus.GetTopic("redemptions_initiation"); var topic = _bus.GetTopic("redemptions_initiation");
topic.Subscribe(data => topic.Subscribe(data =>
@ -110,16 +110,15 @@ namespace TwitchChatTTS.Twitch.Redemptions
private void Add(string twitchRedemptionId, string redemptionId) private void Add(string twitchRedemptionId, string redemptionId)
{ {
lock (_lock) _rwls.EnterWriteLock();
try
{ {
if (!_redeems.TryGetValue(twitchRedemptionId, out var redeems)) if (!_redeems.TryGetValue(twitchRedemptionId, out var redeems))
_redeems.Add(twitchRedemptionId, redeems = new List<string>()); _redeems.Add(twitchRedemptionId, redeems = new List<string>());
var item = _redemptions.TryGetValue(redemptionId, out var r) ? r : null; var item = _redemptions.TryGetValue(redemptionId, out var r) ? r : null;
if (item == null) if (item == null)
{
return; return;
}
var redemptions = redeems.Select(r => _redemptions.TryGetValue(r, out var rr) ? rr : null); var redemptions = redeems.Select(r => _redemptions.TryGetValue(r, out var rr) ? rr : null);
bool added = false; bool added = false;
@ -138,12 +137,17 @@ namespace TwitchChatTTS.Twitch.Redemptions
if (!added) if (!added)
redeems.Add(redemptionId); redeems.Add(redemptionId);
} }
finally
{
_rwls.ExitWriteLock();
}
_logger.Debug($"Added redemption action [redemption id: {redemptionId}][twitch redemption id: {twitchRedemptionId}]"); _logger.Debug($"Added redemption action [redemption id: {redemptionId}][twitch redemption id: {twitchRedemptionId}]");
} }
private void Add(string twitchRedemptionId, Redemption item) private void Add(string twitchRedemptionId, Redemption item)
{ {
lock (_lock) _rwls.EnterWriteLock();
try
{ {
if (!_redeems.TryGetValue(twitchRedemptionId, out var redemptionNames)) if (!_redeems.TryGetValue(twitchRedemptionId, out var redemptionNames))
_redeems.Add(twitchRedemptionId, redemptionNames = new List<string>()); _redeems.Add(twitchRedemptionId, redemptionNames = new List<string>());
@ -165,6 +169,10 @@ namespace TwitchChatTTS.Twitch.Redemptions
if (!added) if (!added)
redemptionNames.Add(item.Id); redemptionNames.Add(item.Id);
} }
finally
{
_rwls.ExitWriteLock();
}
_logger.Debug($"Added redemption action [redemption id: {item.Id}][twitch redemption id: {twitchRedemptionId}]"); _logger.Debug($"Added redemption action [redemption id: {item.Id}][twitch redemption id: {twitchRedemptionId}]");
} }
@ -419,7 +427,8 @@ namespace TwitchChatTTS.Twitch.Redemptions
public IEnumerable<RedeemableAction> Get(string twitchRedemptionId) public IEnumerable<RedeemableAction> Get(string twitchRedemptionId)
{ {
lock (_lock) _rwls.EnterReadLock();
try
{ {
if (_redeems.TryGetValue(twitchRedemptionId, out var redemptionIds)) if (_redeems.TryGetValue(twitchRedemptionId, out var redemptionIds))
return redemptionIds.Select(r => _redemptions.TryGetValue(r, out var redemption) ? redemption : null) return redemptionIds.Select(r => _redemptions.TryGetValue(r, out var redemption) ? redemption : null)
@ -427,15 +436,19 @@ namespace TwitchChatTTS.Twitch.Redemptions
.Select(r => _actions.TryGetValue(r!.ActionName, out var action) ? action : null) .Select(r => _actions.TryGetValue(r!.ActionName, out var action) ? action : null)
.Where(a => a != null)!; .Where(a => a != null)!;
} }
finally
{
_rwls.ExitReadLock();
}
return []; return [];
} }
public void Initialize() public void Initialize()
{ {
_logger.Debug($"Redemption manager is about to initialize [redemption count: {_redemptions.Count()}][action count: {_actions.Count}]"); _rwls.EnterWriteLock();
try
lock (_lock)
{ {
_logger.Debug($"Redemption manager is about to initialize [redemption count: {_redemptions.Count()}][action count: {_actions.Count}]");
_redeems.Clear(); _redeems.Clear();
var ordered = _redemptions.Select(r => r.Value).Where(r => r != null).OrderBy(r => r.Order); var ordered = _redemptions.Select(r => r.Value).Where(r => r != null).OrderBy(r => r.Order);
@ -463,18 +476,31 @@ namespace TwitchChatTTS.Twitch.Redemptions
} }
} }
} }
finally
{
_rwls.ExitWriteLock();
}
_logger.Debug("All redemptions added. Redemption Manager is ready."); _logger.Debug("All redemptions added. Redemption Manager is ready.");
} }
public bool RemoveAction(string actionName) public bool RemoveAction(string actionName)
{
_rwls.EnterWriteLock();
try
{ {
return _actions.Remove(actionName); return _actions.Remove(actionName);
} }
finally
{
_rwls.ExitWriteLock();
}
}
public bool RemoveRedemption(string redemptionId) public bool RemoveRedemption(string redemptionId)
{ {
lock (_lock) _rwls.EnterWriteLock();
try
{ {
if (!_redemptions.TryGetValue(redemptionId, out var redemption)) if (!_redemptions.TryGetValue(redemptionId, out var redemption))
{ {
@ -490,6 +516,10 @@ namespace TwitchChatTTS.Twitch.Redemptions
return true; return true;
} }
} }
finally
{
_rwls.ExitWriteLock();
}
return false; return false;
} }
@ -507,7 +537,8 @@ namespace TwitchChatTTS.Twitch.Redemptions
public bool Update(Redemption redemption) public bool Update(Redemption redemption)
{ {
lock (_lock) _rwls.EnterWriteLock();
try
{ {
if (_redemptions.TryGetValue(redemption.Id, out var r)) if (_redemptions.TryGetValue(redemption.Id, out var r))
{ {
@ -548,12 +579,19 @@ namespace TwitchChatTTS.Twitch.Redemptions
return true; return true;
} }
} }
finally
{
_rwls.ExitWriteLock();
}
_logger.Warning($"Cannot find redemption by name [redemption id: {redemption.Id}][redemption action: {redemption.ActionName}]"); _logger.Warning($"Cannot find redemption by name [redemption id: {redemption.Id}][redemption action: {redemption.ActionName}]");
return false; return false;
} }
public bool Update(RedeemableAction action) public bool Update(RedeemableAction action)
{
_rwls.EnterWriteLock();
try
{ {
if (_actions.TryGetValue(action.Name, out var a)) if (_actions.TryGetValue(action.Name, out var a))
{ {
@ -562,6 +600,11 @@ namespace TwitchChatTTS.Twitch.Redemptions
_logger.Debug($"Updated redeemable action in redemption manager [action name: {action.Name}]"); _logger.Debug($"Updated redeemable action in redemption manager [action name: {action.Name}]");
return true; return true;
} }
}
finally
{
_rwls.ExitWriteLock();
}
_logger.Warning($"Cannot find redeemable action by name [action name: {action.Name}]"); _logger.Warning($"Cannot find redeemable action by name [action name: {action.Name}]");
return false; return false;

View File

@ -39,7 +39,7 @@ namespace TwitchChatTTS.Twitch.Socket.Handlers
_logger.Warning($"Twitch connection has {timeLeft} before it is revoked. Refreshing the token is soon required."); _logger.Warning($"Twitch connection has {timeLeft} before it is revoked. Refreshing the token is soon required.");
else else
{ {
_logger.Error("Twitch connection has its permissions revoked. Refresh the token. Twitch client will not be connecting."); _logger.Error($"Twitch connection has its permissions revoked. Refresh the token. Twitch client will not be connecting. [expired at: {twitchConnection.ExpiresAt}]");
return; return;
} }

View File

@ -18,21 +18,22 @@ namespace TwitchChatTTS.Twitch.Socket
private readonly IServiceProvider _serviceProvider; private readonly IServiceProvider _serviceProvider;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly object _lock; private readonly Mutex _mutex;
public TwitchConnectionManager(IServiceProvider serviceProvider, ILogger logger) public TwitchConnectionManager(IServiceProvider serviceProvider, ILogger logger)
{ {
_serviceProvider = serviceProvider; _serviceProvider = serviceProvider;
_logger = logger; _logger = logger;
_lock = new object(); _mutex = new Mutex();
} }
public TwitchWebsocketClient GetBackupClient() public TwitchWebsocketClient GetBackupClient()
{ {
lock (_lock) try
{ {
_mutex.WaitOne();
if (_identified == null) if (_identified == null)
throw new InvalidOperationException("Cannot get backup Twitch client yet. Waiting for identification."); throw new InvalidOperationException("Cannot get backup Twitch client yet. Waiting for identification.");
if (_backup != null) if (_backup != null)
@ -40,12 +41,17 @@ namespace TwitchChatTTS.Twitch.Socket
return CreateNewClient(); return CreateNewClient();
} }
finally
{
_mutex.ReleaseMutex();
}
} }
public TwitchWebsocketClient GetWorkingClient() public TwitchWebsocketClient GetWorkingClient()
{ {
lock (_lock) try
{ {
_mutex.WaitOne();
if (_identified == null) if (_identified == null)
{ {
return CreateNewClient(); return CreateNewClient();
@ -53,6 +59,10 @@ namespace TwitchChatTTS.Twitch.Socket
return _identified; return _identified;
} }
finally
{
_mutex.ReleaseMutex();
}
} }
private TwitchWebsocketClient CreateNewClient() private TwitchWebsocketClient CreateNewClient()
@ -74,8 +84,9 @@ namespace TwitchChatTTS.Twitch.Socket
private async Task OnDisconnection(TwitchWebsocketClient client) private async Task OnDisconnection(TwitchWebsocketClient client)
{ {
bool reconnecting = false; bool reconnecting = false;
lock (_lock) try
{ {
_mutex.WaitOne();
if (_identified?.UID == client.UID) if (_identified?.UID == client.UID)
{ {
_logger.Debug($"Identified Twitch client has disconnected [client: {client.UID}][main: {_identified.UID}][backup: {_backup?.UID}]"); _logger.Debug($"Identified Twitch client has disconnected [client: {client.UID}][main: {_identified.UID}][backup: {_backup?.UID}]");
@ -92,19 +103,25 @@ namespace TwitchChatTTS.Twitch.Socket
else else
_logger.Warning($"Twitch client disconnected from unknown source [client: {client.UID}][main: {_identified?.UID}][backup: {_backup?.UID}]"); _logger.Warning($"Twitch client disconnected from unknown source [client: {client.UID}][main: {_identified?.UID}][backup: {_backup?.UID}]");
} }
finally
{
_mutex.ReleaseMutex();
}
if (reconnecting) if (reconnecting)
{ {
var newClient = GetWorkingClient(); var newClient = GetWorkingClient();
await newClient.Reconnect(); await newClient.Reconnect();
} }
} }
private async Task OnIdentified(TwitchWebsocketClient client) private async Task OnIdentified(TwitchWebsocketClient client)
{ {
bool clientDisconnect = false; bool clientDisconnect = false;
lock (_lock) try
{ {
_mutex.WaitOne();
if (_identified == null || _identified.ReceivedReconnecting) if (_identified == null || _identified.ReceivedReconnecting)
{ {
if (_backup != null && _backup.UID == client.UID) if (_backup != null && _backup.UID == client.UID)
@ -125,10 +142,14 @@ namespace TwitchChatTTS.Twitch.Socket
_logger.Warning($"Twitch client has been identified, but isn't main or backup [client: {client.UID}][main: {_identified.UID}][backup: {_backup?.UID}]"); _logger.Warning($"Twitch client has been identified, but isn't main or backup [client: {client.UID}][main: {_identified.UID}][backup: {_backup?.UID}]");
clientDisconnect = true; clientDisconnect = true;
} }
}
if (clientDisconnect) if (clientDisconnect)
await client.DisconnectAsync(new SocketDisconnectionEventArgs("Closed", "No need for a tertiary client.")); client.DisconnectAsync(new SocketDisconnectionEventArgs("Closed", "No need for a tertiary client.")).Wait();
}
finally
{
_mutex.ReleaseMutex();
}
} }
} }
} }

View File

@ -17,15 +17,11 @@ namespace TwitchChatTTS.Twitch.Socket
private readonly IDictionary<string, string> _subscriptions; private readonly IDictionary<string, string> _subscriptions;
private readonly IBackoff _backoff; private readonly IBackoff _backoff;
private readonly Configuration _configuration; private readonly Configuration _configuration;
private bool _disconnected;
private readonly object _lock;
public event EventHandler<EventArgs>? OnIdentified; public event EventHandler<EventArgs>? OnIdentified;
public string UID { get; } public string UID { get; }
public string URL; public string URL;
public bool Connected { get; private set; }
public bool Identified { get; private set; }
public string? SessionId { get; private set; } public string? SessionId { get; private set; }
public bool ReceivedReconnecting { get; set; } public bool ReceivedReconnecting { get; set; }
public bool TwitchReconnected { get; set; } public bool TwitchReconnected { get; set; }
@ -46,13 +42,14 @@ namespace TwitchChatTTS.Twitch.Socket
_backoff = backoff; _backoff = backoff;
_configuration = configuration; _configuration = configuration;
_subscriptions = new Dictionary<string, string>(); _subscriptions = new Dictionary<string, string>();
_lock = new object();
_messageTypes = new Dictionary<string, Type>(); _messageTypes = new Dictionary<string, Type>
_messageTypes.Add("session_keepalive", typeof(object)); {
_messageTypes.Add("session_welcome", typeof(SessionWelcomeMessage)); { "session_keepalive", typeof(object) },
_messageTypes.Add("session_reconnect", typeof(SessionWelcomeMessage)); { "session_welcome", typeof(SessionWelcomeMessage) },
_messageTypes.Add("notification", typeof(NotificationMessage)); { "session_reconnect", typeof(SessionWelcomeMessage) },
{ "notification", typeof(NotificationMessage) }
};
UID = Guid.NewGuid().ToString("D"); UID = Guid.NewGuid().ToString("D");
@ -88,25 +85,12 @@ namespace TwitchChatTTS.Twitch.Socket
_logger.Information($"Initializing Twitch websocket client."); _logger.Information($"Initializing Twitch websocket client.");
OnConnected += (sender, e) => OnConnected += (sender, e) =>
{ {
Connected = true;
_logger.Information("Twitch websocket client connected."); _logger.Information("Twitch websocket client connected.");
_disconnected = false;
}; };
OnDisconnected += (sender, e) => OnDisconnected += (sender, e) =>
{ {
lock (_lock)
{
if (_disconnected)
return;
_disconnected = true;
}
_logger.Information($"Twitch websocket client disconnected [status: {e.Status}][reason: {e.Reason}][client: {UID}]"); _logger.Information($"Twitch websocket client disconnected [status: {e.Status}][reason: {e.Reason}][client: {UID}]");
Connected = false;
Identified = false;
}; };
} }
@ -126,7 +110,6 @@ namespace TwitchChatTTS.Twitch.Socket
public void Identify(string sessionId) public void Identify(string sessionId)
{ {
Identified = true;
SessionId = sessionId; SessionId = sessionId;
OnIdentified?.Invoke(this, EventArgs.Empty); OnIdentified?.Invoke(this, EventArgs.Empty);
} }