using System.Collections.Immutable;
using Serilog;

namespace TwitchChatTTS.Bus
{
    /// <summary>
    /// Central hub of a topic-based message bus. It keeps one
    /// <see cref="ServiceBusObservable"/> per topic name and a set of observers per
    /// topic name, and delivers values sent to a topic to that topic's observers.
    /// </summary>
    /// <remarks>
    /// Both dictionaries are only accessed while holding a single private lock.
    /// Observers are invoked outside of that lock, on a snapshot of the observer set.
    /// </remarks>
    public class ServiceBusCentral
    {
        private readonly IDictionary<string, ServiceBusObservable> _topics;
        private readonly IDictionary<string, ISet<IObserver<ServiceBusData>>> _receivers;
        private readonly ILogger _logger;
        private readonly object _lock;

        /// <summary>
        /// Creates an empty bus with no topics and no observers.
        /// </summary>
        /// <param name="logger">Logger used to report exceptions thrown by observers during <see cref="Send"/>.</param>
        public ServiceBusCentral(ILogger logger)
        {
            _topics = new Dictionary<string, ServiceBusObservable>();
            _receivers = new Dictionary<string, ISet<IObserver<ServiceBusData>>>();
            _logger = logger;
            _lock = new object();
        }

        /// <summary>
        /// Registers an observer for a topic, creating the topic's observer set on first use.
        /// </summary>
        /// <param name="topic">Name of the topic to observe.</param>
        /// <param name="observer">Observer to register. Adding the same instance twice has no further effect because observers are stored in a set.</param>
        public void Add(string topic, IObserver<ServiceBusData> observer)
        {
            lock (_lock)
            {
                if (!_receivers.TryGetValue(topic, out var observers))
                {
                    observers = new HashSet<IObserver<ServiceBusData>>();
                    _receivers.Add(topic, observers);
                }

                observers.Add(observer);
            }
        }

        /// <summary>
        /// Returns the observable for a topic, creating and caching it on first request.
        /// </summary>
        /// <param name="topic">Name of the topic.</param>
        /// <returns>The single <see cref="ServiceBusObservable"/> instance cached for <paramref name="topic"/>.</returns>
        public ServiceBusObservable GetTopic(string topic)
        {
            lock (_lock)
            {
                if (!_topics.TryGetValue(topic, out var bus))
                {
                    bus = new ServiceBusObservable(topic, this);
                    _topics.Add(topic, bus);
                }

                return bus;
            }
        }

        /// <summary>
        /// Returns a snapshot of the observers currently registered for a topic.
        /// </summary>
        /// <param name="topic">Name of the topic.</param>
        /// <returns>
        /// An immutable copy of the topic's observers, or an empty sequence when nothing
        /// has been registered for <paramref name="topic"/>.
        /// </returns>
        public IEnumerable<IObserver<ServiceBusData>> GetObservers(string topic)
        {
            lock (_lock)
            {
                if (_receivers.TryGetValue(topic, out var observers))
                {
                    // Copy while holding the lock so callers can enumerate the result
                    // without racing against Add/RemoveObserver.
                    return observers.ToImmutableArray();
                }
            }

            return [];
        }

        /// <summary>
        /// Unregisters an observer from a topic.
        /// </summary>
        /// <param name="topic">Name of the topic.</param>
        /// <param name="observer">Observer to remove.</param>
        /// <returns>
        /// <c>true</c> when the observer was registered for <paramref name="topic"/> and has
        /// been removed; otherwise <c>false</c>.
        /// </returns>
        public bool RemoveObserver(string topic, IObserver<ServiceBusData> observer)
        {
            lock (_lock)
            {
                if (_receivers.TryGetValue(topic, out var observers))
                {
                    return observers.Remove(observer);
                }
            }

            return false;
        }

        /// <summary>
        /// Delivers a value to every observer currently registered for a topic.
        /// </summary>
        /// <param name="sender">Object passed to <see cref="ServiceBusData"/> as the sender.</param>
        /// <param name="topic">Name of the topic to publish on.</param>
        /// <param name="value">Payload passed to <see cref="ServiceBusData"/>.</param>
        /// <remarks>
        /// Each observer receives its own <see cref="ServiceBusData"/> instance. An exception
        /// thrown by one observer is logged and does not prevent delivery to the remaining ones.
        /// </remarks>
        public void Send(object sender, string topic, object value)
        {
            // GetObservers returns a snapshot, so observers run without the lock held.
            var observers = GetObservers(topic);
            foreach (var consumer in observers)
            {
                try
                {
                    consumer.OnNext(new ServiceBusData(sender, topic, value));
                }
                catch (Exception ex)
                {
                    _logger.Error(ex, "Failed to execute observer on send.");
                }
            }
        }
    }
}