Fixed 7tv & Twitch reconnection. Added adbreak, follow, subscription handlers for Twitch. Added multi-chat support. Added support to unsubscribe from Twitch event subs.

This commit is contained in:
Tom
2024-08-06 19:29:29 +00:00
parent 75fcb8e0f8
commit 8014c12bc5
60 changed files with 1064 additions and 672 deletions

View File

@@ -6,26 +6,32 @@ using System.Net.WebSockets;
using TwitchChatTTS.Twitch.Socket.Messages;
using System.Text;
using TwitchChatTTS.Twitch.Socket.Handlers;
using CommonSocketLibrary.Backoff;
namespace TwitchChatTTS.Twitch.Socket
{
public class TwitchWebsocketClient : SocketClient<TwitchWebsocketMessage>
{
private readonly IDictionary<string, ITwitchSocketHandler> _handlers;
private readonly IDictionary<string, Type> _messageTypes;
private readonly IDictionary<string, string> _subscriptions;
private readonly IBackoff _backoff;
private DateTime _lastReceivedMessageTimestamp;
private bool _disconnected;
private readonly object _lock;
public event EventHandler<EventArgs> OnIdentified;
public string URL;
private IDictionary<string, ITwitchSocketHandler> _handlers;
private IDictionary<string, Type> _messageTypes;
private readonly Configuration _configuration;
private System.Timers.Timer _reconnectTimer;
public bool Connected { get; set; }
public bool Identified { get; set; }
public string SessionId { get; set; }
public bool Connected { get; private set; }
public bool Identified { get; private set; }
public string SessionId { get; private set; }
public bool ReceivedReconnecting { get; set; }
public TwitchWebsocketClient(
Configuration configuration,
[FromKeyedServices("twitch")] IEnumerable<ITwitchSocketHandler> handlers,
[FromKeyedServices("twitch")] IBackoff backoff,
ILogger logger
) : base(logger, new JsonSerializerOptions()
{
@@ -34,14 +40,12 @@ namespace TwitchChatTTS.Twitch.Socket
})
{
_handlers = handlers.ToDictionary(h => h.Name, h => h);
_configuration = configuration;
_reconnectTimer = new System.Timers.Timer(TimeSpan.FromSeconds(30));
_reconnectTimer.AutoReset = false;
_reconnectTimer.Elapsed += async (sender, e) => await Reconnect();
_reconnectTimer.Enabled = false;
_backoff = backoff;
_subscriptions = new Dictionary<string, string>();
_lock = new object();
_messageTypes = new Dictionary<string, Type>();
_messageTypes.Add("session_keepalive", typeof(object));
_messageTypes.Add("session_welcome", typeof(SessionWelcomeMessage));
_messageTypes.Add("session_reconnect", typeof(SessionWelcomeMessage));
_messageTypes.Add("notification", typeof(NotificationMessage));
@@ -50,23 +54,56 @@ namespace TwitchChatTTS.Twitch.Socket
}
public void AddSubscription(string broadcasterId, string type, string id)
{
if (_subscriptions.ContainsKey(broadcasterId + '|' + type))
_subscriptions[broadcasterId + '|' + type] = id;
else
_subscriptions.Add(broadcasterId + '|' + type, id);
}
public string? GetSubscriptionId(string broadcasterId, string type)
{
if (_subscriptions.TryGetValue(broadcasterId + '|' + type, out var id))
return id;
return null;
}
public void RemoveSubscription(string broadcasterId, string type)
{
_subscriptions.Remove(broadcasterId + '|' + type);
}
public void Initialize()
{
_logger.Information($"Initializing OBS websocket client.");
_logger.Information($"Initializing Twitch websocket client.");
OnConnected += (sender, e) =>
{
Connected = true;
_reconnectTimer.Enabled = false;
_logger.Information("Twitch websocket client connected.");
_disconnected = false;
};
OnDisconnected += (sender, e) =>
OnDisconnected += async (sender, e) =>
{
_reconnectTimer.Enabled = Identified;
_logger.Information($"Twitch websocket client disconnected [status: {e.Status}][reason: {e.Reason}] " + (Identified ? "Will be attempting to reconnect every 30 seconds." : "Will not be attempting to reconnect."));
lock (_lock)
{
if (_disconnected)
return;
_disconnected = true;
}
_logger.Information($"Twitch websocket client disconnected [status: {e.Status}][reason: {e.Reason}]");
Connected = false;
Identified = false;
if (!ReceivedReconnecting)
{
_logger.Information("Attempting to reconnect to Twitch websocket server.");
await Reconnect(_backoff, async () => await Connect());
}
};
}
@@ -79,42 +116,14 @@ namespace TwitchChatTTS.Twitch.Socket
}
_logger.Debug($"Twitch websocket client attempting to connect to {URL}");
try
{
await ConnectAsync(URL);
}
catch (Exception)
{
_logger.Warning("Connecting to twitch failed. Skipping Twitch websockets.");
}
await ConnectAsync(URL);
}
private async Task Reconnect()
public void Identify(string sessionId)
{
if (Connected)
{
try
{
await DisconnectAsync(new SocketDisconnectionEventArgs(WebSocketCloseStatus.Empty.ToString(), ""));
}
catch (Exception)
{
_logger.Error("Failed to disconnect from Twitch websocket server.");
}
}
try
{
await Connect();
}
catch (WebSocketException wse) when (wse.Message.Contains("502"))
{
_logger.Error("Twitch websocket server cannot be found.");
}
catch (Exception ex)
{
_logger.Error(ex, "Failed to reconnect to Twitch websocket server.");
}
Identified = true;
SessionId = sessionId;
OnIdentified?.Invoke(this, EventArgs.Empty);
}
protected TwitchWebsocketMessage GenerateMessage<T>(string messageType, T data)
@@ -134,14 +143,17 @@ namespace TwitchChatTTS.Twitch.Socket
protected override async Task OnResponseReceived(TwitchWebsocketMessage? message)
{
if (message == null || message.Metadata == null) {
if (message == null || message.Metadata == null)
{
_logger.Information("Twitch message is null");
return;
}
_lastReceivedMessageTimestamp = DateTime.UtcNow;
string content = message.Payload?.ToString() ?? string.Empty;
if (message.Metadata.MessageType != "session_keepalive")
_logger.Information("Twitch RX #" + message.Metadata.MessageType + ": " + content);
_logger.Debug("Twitch RX #" + message.Metadata.MessageType + ": " + content);
if (!_messageTypes.TryGetValue(message.Metadata.MessageType, out var type) || type == null)
{
@@ -156,6 +168,11 @@ namespace TwitchChatTTS.Twitch.Socket
}
var data = JsonSerializer.Deserialize(content, type, _options);
if (data == null)
{
_logger.Warning("Twitch websocket message payload is null.");
return;
}
await handler.Execute(this, data);
}
@@ -180,7 +197,7 @@ namespace TwitchChatTTS.Twitch.Socket
await _socket!.SendAsync(array, WebSocketMessageType.Text, current + size >= total, _cts!.Token);
current += size;
}
_logger.Information("TX #" + type + ": " + content);
_logger.Debug("Twitch TX #" + type + ": " + content);
}
catch (Exception e)
{