Fixed several socket issues. Added backoff for reconnection.

This commit is contained in:
Tom
2024-08-10 19:31:08 +00:00
parent aa9e3dbcd7
commit ab90d47b89
8 changed files with 207 additions and 106 deletions

View File

@@ -2,41 +2,55 @@
namespace CommonSocketLibrary.Abstract
{
public abstract class HandlerTypeManager<Client, Handler>
public interface ICodedOperation
{
int OperationCode { get; }
}
public interface IMessageTypeManager
{
Type? GetMessageTypeByCode(int code);
}
public abstract class MessageTypeManager<Handler> : IMessageTypeManager where Handler : ICodedOperation
{
private readonly IDictionary<int, Type> _types;
public IDictionary<int, Type> HandlerTypes { get => _types; }
protected readonly ILogger _logger;
public HandlerTypeManager(ILogger logger, HandlerManager<Client, Handler> handlers)
public MessageTypeManager(IEnumerable<Handler> handlers, ILogger logger)
{
_types = new Dictionary<int, Type>();
_logger = logger;
GenerateHandlerTypes(handlers.Handlers);
GenerateHandlerTypes(handlers);
}
private void GenerateHandlerTypes(IDictionary<int, Handler> handlers)
public Type? GetMessageTypeByCode(int code)
{
foreach (var entry in handlers)
_types.TryGetValue(code, out Type? type);
return type;
}
private void GenerateHandlerTypes(IEnumerable<Handler> handlers)
{
foreach (var handler in handlers)
{
if (entry.Value == null)
if (handler == null)
{
_logger.Error($"Failed to link websocket handler #{entry.Key} due to null value.");
_logger.Error($"Failed to link websocket handler due to null value.");
continue;
}
var type = entry.Value.GetType();
var type = handler.GetType();
var target = FetchMessageType(type);
if (target == null)
{
_logger.Error($"Failed to link websocket handler #{entry.Key} due to no match for {target}.");
_logger.Error($"Failed to link websocket handler #{handler.OperationCode} due to no match for {target}.");
continue;
}
_types.Add(entry.Key, target);
_logger.Debug($"Linked websocket handler #{entry.Key} to type {target.AssemblyQualifiedName}.");
_types.Add(handler.OperationCode, target);
_logger.Debug($"Linked websocket handler #{handler.OperationCode} to type {target.AssemblyQualifiedName}.");
}
}

View File

@@ -1,51 +1,56 @@
using CommonSocketLibrary.Backoff;
using Serilog;
using System.Collections;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
namespace CommonSocketLibrary.Abstract
{
public abstract class SocketClient<Message> : IDisposable
public abstract class SocketClient<Message> : IDisposable where Message : class
{
private ClientWebSocket? _socket;
private CancellationTokenSource? _cts;
protected ClientWebSocket? _socket;
protected CancellationTokenSource? _cts;
private readonly int ReceiveBufferSize = 8192;
protected readonly ILogger _logger;
protected readonly JsonSerializerOptions _options;
private bool _disposed;
public bool Connected { get; set; }
public int ReceiveBufferSize { get; } = 8192;
public event EventHandler<EventArgs> OnConnected;
public event EventHandler<SocketDisconnectionEventArgs> OnDisconnected;
public SocketClient(ILogger logger, JsonSerializerOptions options)
{
_logger = logger;
_options = options;
Connected = false;
_disposed = false;
}
public async Task ConnectAsync(string url)
protected async Task ConnectAsync(string url)
{
if (_socket != null)
{
if (_socket.State == WebSocketState.Open) return;
else _socket.Dispose();
else if (!_disposed) _socket.Dispose();
}
_socket = new ClientWebSocket();
_socket.Options.RemoteCertificateValidationCallback = (o, c, ch, er) => true;
_socket.Options.UseDefaultCredentials = false;
_disposed = false;
if (_cts != null) _cts.Dispose();
_cts = new CancellationTokenSource();
await _socket.ConnectAsync(new Uri(url), _cts.Token);
await Task.Factory.StartNew(ReceiveLoop, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
await OnConnection();
OnConnected?.Invoke(this, EventArgs.Empty);
}
public async Task DisconnectAsync()
public async Task DisconnectAsync(SocketDisconnectionEventArgs args)
{
if (_socket == null || _cts == null) return;
// TODO: requests cleanup code, sub-protocol dependent.
if (_disposed || _socket == null || _cts == null)
return;
if (_socket.State == WebSocketState.Open)
{
_cts.CancelAfter(TimeSpan.FromMilliseconds(500));
@@ -53,14 +58,20 @@ namespace CommonSocketLibrary.Abstract
await _socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
}
Connected = false;
OnDisconnected?.Invoke(this, args);
_socket.Dispose();
_socket = null;
_cts.Dispose();
_cts = null;
}
public void Dispose() => DisconnectAsync().Wait();
public void Dispose()
{
if (_disposed)
return;
_disposed = true;
}
private async Task ReceiveLoop()
{
@@ -87,78 +98,84 @@ namespace CommonSocketLibrary.Abstract
await ResponseReceived(outputStream);
}
}
catch (TaskCanceledException) { }
catch (WebSocketException wse)
{
string data = string.Join(string.Empty, wse.Data.Cast<DictionaryEntry>().Select(e => e.Key + "=" + e.Value));
_logger.Error($"Websocket connection problem while receiving data [state: {_socket.State}][code: {wse.ErrorCode}][data: {data}]");
}
catch (TaskCanceledException)
{
_logger.Error($"Socket's receive loop got canceled forcefully [state: {_socket.State}]");
}
finally
{
outputStream?.Dispose();
if (_socket.State.ToString().Contains("Close") || _socket.State == WebSocketState.Aborted)
await DisconnectAsync(new SocketDisconnectionEventArgs(_socket.CloseStatus.ToString()!, _socket.CloseStatusDescription ?? string.Empty));
}
}
public async Task SendRaw(string content)
protected async Task Reconnect(IBackoff backoff, Action reconnect)
{
if (!Connected) return;
var bytes = new byte[1024 * 4];
var array = new ArraySegment<byte>(bytes);
var total = Encoding.UTF8.GetBytes(content).Length;
var current = 0;
while (current < total)
while (true)
{
var size = Encoding.UTF8.GetBytes(content.Substring(current), array);
await _socket.SendAsync(array, WebSocketMessageType.Text, true, _cts.Token);
current += size;
}
await OnMessageSend(-1, content);
}
public async Task Send<T>(int opcode, T data)
{
try
{
var message = GenerateMessage(opcode, data);
var content = JsonSerializer.Serialize(message, _options);
var bytes = Encoding.UTF8.GetBytes(content);
var array = new ArraySegment<byte>(bytes);
var total = bytes.Length;
var current = 0;
while (current < total)
try
{
var size = Encoding.UTF8.GetBytes(content.Substring(current), array);
await _socket.SendAsync(array, WebSocketMessageType.Text, current + size >= total, _cts.Token);
current += size;
TimeSpan delay = backoff.GetNextDelay();
await Task.Delay(delay);
reconnect.Invoke();
backoff.Reset();
break;
}
catch (Exception)
{
_logger.Error("Unable to reconnect to server.");
}
await OnMessageSend(opcode, content);
}
catch (Exception e)
{
Connected = false;
_logger.Error(e, "Failed to send a message: " + opcode);
}
}
private async Task ResponseReceived(Stream stream)
{
Message? data = null;
try
{
var data = await JsonSerializer.DeserializeAsync<Message>(stream);
await OnResponseReceived(data);
data = await JsonSerializer.DeserializeAsync<Message>(stream, _options);
}
catch (Exception ex)
{
_logger.Error(ex, "Failed to read or execute a websocket message.");
_logger.Error(ex, "Failed to read a websocket message.");
}
finally
{
stream.Dispose();
}
if (data == null)
{
_logger.Error("Failed to read a websocket message.");
return;
}
try
{
await OnResponseReceived(data);
}
catch (Exception ex)
{
_logger.Error(ex, "Failed to execute a websocket message.");
}
}
protected abstract Message GenerateMessage<T>(int opcode, T data);
protected abstract Task OnResponseReceived(Message? content);
protected abstract Task OnMessageSend(int opcode, string? content);
protected abstract Task OnConnection();
}
public class SocketDisconnectionEventArgs : EventArgs
{
public string Status { get; }
public string Reason { get; }
public SocketDisconnectionEventArgs(string status, string reason)
{
Status = status;
Reason = reason;
}
}
}