Cleaned up request acks. Added internal service bus for internal messaging.

This commit is contained in:
Tom
2024-11-08 15:32:42 +00:00
parent fe2eb86a08
commit 66f2bf7ec6
33 changed files with 1326 additions and 415 deletions

View File

@@ -0,0 +1,10 @@
using HermesSocketLibrary.Requests.Messages;
namespace TwitchChatTTS.Bus.Data
{
public class RedemptionInitiation
{
public required IEnumerable<Redemption> Redemptions { get; set; }
public required IDictionary<string, RedeemableAction> Actions { get; set; }
}
}

83
Bus/ServiceBusCentral.cs Normal file
View File

@@ -0,0 +1,83 @@
using System.Collections.Immutable;
using Serilog;
namespace TwitchChatTTS.Bus
{
public class ServiceBusCentral
{
private readonly IDictionary<string, ServiceBusObservable> _topics;
private readonly IDictionary<string, ISet<IObserver<ServiceBusData>>> _receivers;
private readonly ILogger _logger;
private readonly object _lock;
public ServiceBusCentral(ILogger logger)
{
_topics = new Dictionary<string, ServiceBusObservable>();
_receivers = new Dictionary<string, ISet<IObserver<ServiceBusData>>>();
_logger = logger;
_lock = new object();
}
public void Add(string topic, IObserver<ServiceBusData> observer)
{
lock (_lock)
{
if (!_receivers.TryGetValue(topic, out var observers))
{
observers = new HashSet<IObserver<ServiceBusData>>();
_receivers.Add(topic, observers);
}
observers.Add(observer);
}
}
public ServiceBusObservable GetTopic(string topic)
{
lock (_lock)
{
if (!_topics.TryGetValue(topic, out var bus))
{
bus = new ServiceBusObservable(topic, this);
_topics.Add(topic, bus);
}
return bus;
}
}
public IEnumerable<IObserver<ServiceBusData>> GetObservers(string topic)
{
lock (_lock)
{
if (_receivers.TryGetValue(topic, out var observers))
return observers.ToImmutableArray();
}
return [];
}
public bool RemoveObserver(string topic, IObserver<ServiceBusData> observer)
{
lock (_lock)
{
if (_receivers.TryGetValue(topic, out var observers))
return observers.Remove(observer);
}
return false;
}
public void Send(object sender, string topic, object value)
{
var observers = GetObservers(topic);
foreach (var consumer in observers)
{
try
{
consumer.OnNext(new ServiceBusData(sender, topic, value));
}
catch (Exception ex)
{
_logger.Error(ex, "Failed to execute observer on send.");
}
}
}
}
}

18
Bus/ServiceBusData.cs Normal file
View File

@@ -0,0 +1,18 @@
namespace TwitchChatTTS.Bus
{
public class ServiceBusData
{
public string Topic { get; }
public object? Sender { get; }
public object? Value { get; }
public DateTime Timestamp { get; }
public ServiceBusData(object sender, string topic, object value)
{
Topic = topic;
Sender = sender;
Value = value;
Timestamp = DateTime.UtcNow;
}
}
}

View File

@@ -0,0 +1,41 @@
using System.Reactive;
namespace TwitchChatTTS.Bus
{
public class ServiceBusObservable : ObservableBase<ServiceBusData>
{
private readonly string _topic;
private readonly ServiceBusCentral _central;
public ServiceBusObservable(string topic, ServiceBusCentral central)
{
_topic = topic;
_central = central;
}
protected override IDisposable SubscribeCore(IObserver<ServiceBusData> observer)
{
_central.Add(_topic, observer);
return new ServiceBusUnsubscriber(_topic, _central, observer);
}
private sealed class ServiceBusUnsubscriber : IDisposable
{
private readonly string _topic;
private readonly ServiceBusCentral _central;
private readonly IObserver<ServiceBusData> _receiver;
public ServiceBusUnsubscriber(string topic, ServiceBusCentral central, IObserver<ServiceBusData> receiver)
{
_topic = topic;
_central = central;
_receiver = receiver;
}
public void Dispose()
{
_central.RemoveObserver(_topic, _receiver);
}
}
}
}

31
Bus/ServiceBusObserver.cs Normal file
View File

@@ -0,0 +1,31 @@
using System.Reactive;
using Serilog;
namespace TwitchChatTTS.Bus
{
public class ServiceBusObserver : ObserverBase<ServiceBusData>
{
private readonly Action<ServiceBusData> _action;
private readonly ILogger _logger;
public ServiceBusObserver(Action<ServiceBusData> action, ILogger logger)
{
_action = action;
_logger = logger;
}
protected override void OnCompletedCore()
{
}
protected override void OnErrorCore(Exception error)
{
_logger.Error(error, "Error occurred.");
}
protected override void OnNextCore(ServiceBusData value)
{
_action.Invoke(value);
}
}
}