Files
hermes-common-library/Abstract/SocketClient.cs
2026-01-03 05:13:01 +00:00

181 lines
5.9 KiB
C#

using CommonSocketLibrary.Backoff;
using Serilog;
using System.Collections;
using System.Net.WebSockets;
using System.Text.Json;
namespace CommonSocketLibrary.Abstract
{
public abstract class SocketClient<Message> : IDisposable where Message : class
{
protected ClientWebSocket? _socket;
private readonly int ReceiveBufferSize = 8192;
protected readonly ILogger _logger;
protected readonly JsonSerializerOptions _options;
private bool _disposed;
public event EventHandler<EventArgs> OnConnected;
public event EventHandler<SocketDisconnectionEventArgs> OnDisconnected;
public SocketClient(ILogger logger, JsonSerializerOptions options)
{
_logger = logger;
_options = options;
_disposed = false;
}
public abstract Task Connect();
protected async Task ConnectAsync(string url)
{
if (_socket != null)
{
if (_socket.State == WebSocketState.Open) return;
else if (!_disposed) _socket.Dispose();
}
_socket = new ClientWebSocket();
_socket.Options.RemoteCertificateValidationCallback = (o, c, ch, er) => true;
_socket.Options.UseDefaultCredentials = false;
_disposed = false;
await _socket.ConnectAsync(new Uri(url), CancellationToken.None);
await Task.Factory.StartNew(ReceiveLoop, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
OnConnected?.Invoke(this, EventArgs.Empty);
}
public async Task DisconnectAsync(SocketDisconnectionEventArgs args)
{
if (_disposed || _socket == null)
return;
if (_socket.State == WebSocketState.Open)
{
await _socket.CloseOutputAsync(WebSocketCloseStatus.Empty, "", CancellationToken.None);
await _socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
}
OnDisconnected?.Invoke(this, args);
_socket.Dispose();
_socket = null;
}
protected virtual async Task<T?> Deserialize<T>(Stream stream)
{
return await JsonSerializer.DeserializeAsync<T>(stream, _options);
}
public void Dispose()
{
if (_disposed)
return;
_disposed = true;
}
private async Task ReceiveLoop()
{
if (_socket == null) return;
MemoryStream? outputStream = null;
WebSocketReceiveResult? receiveResult = null;
var buffer = new byte[ReceiveBufferSize];
try
{
while (_socket.State == WebSocketState.Open || _socket.State == WebSocketState.CloseSent)
{
outputStream = new MemoryStream(ReceiveBufferSize);
do
{
receiveResult = await _socket.ReceiveAsync(buffer, CancellationToken.None);
outputStream.Write(buffer, 0, receiveResult.Count);
}
while (!receiveResult.EndOfMessage);
if (receiveResult.MessageType == WebSocketMessageType.Close) break;
outputStream.Position = 0;
await ResponseReceived(outputStream);
}
}
catch (WebSocketException wse)
{
string data = string.Join(string.Empty, wse.Data.Cast<DictionaryEntry>().Select(e => e.Key + "=" + e.Value));
_logger.Error(wse, $"Websocket connection problem while receiving data [state: {_socket.State}][code: {wse.ErrorCode}][data: {data}]");
}
catch (TaskCanceledException)
{
_logger.Error($"Socket's receive loop got canceled forcefully [state: {_socket.State}]");
}
finally
{
if (_socket.State.ToString().Contains("Close") || _socket.State == WebSocketState.Aborted)
await DisconnectAsync(new SocketDisconnectionEventArgs(_socket.CloseStatus.ToString()!, _socket.CloseStatusDescription ?? string.Empty));
}
}
protected async Task Reconnect(IBackoff backoff)
{
while (true)
{
try
{
TimeSpan delay = backoff.GetNextDelay();
await Task.Delay(delay);
await Connect();
backoff.Reset();
break;
}
catch
{
_logger.Error("Unable to reconnect to server.");
}
}
}
private async Task ResponseReceived(Stream stream)
{
Message? data = null;
try
{
data = await Deserialize<Message>(stream);
}
catch (Exception ex)
{
_logger.Error(ex, "Failed to read a websocket message.");
}
finally
{
stream.Dispose();
}
if (data == null)
{
_logger.Error("Failed to read a websocket message.");
return;
}
try
{
await OnResponseReceived(data);
}
catch (Exception ex)
{
_logger.Error(ex, "Failed to execute a websocket message.");
}
}
protected abstract Task OnResponseReceived(Message? content);
}
public class SocketDisconnectionEventArgs : EventArgs
{
public string Status { get; }
public string Reason { get; }
public SocketDisconnectionEventArgs(string status, string reason)
{
Status = status;
Reason = reason;
}
}
}