Added policy messages for WS. Fixed DB changes via stores. Updated chat voices messages via WS to use stores.

This commit is contained in:
Tom 2024-10-21 20:44:20 +00:00
parent e3c78d96fa
commit 94e0d54c31
19 changed files with 255 additions and 137 deletions

View File

@ -2,7 +2,7 @@ namespace HermesSocketServer.Models
{ {
public class ChatterVoice public class ChatterVoice
{ {
public string ChatterId { get; set; } public long ChatterId { get; set; }
public string UserId { get; set; } public string UserId { get; set; }
public string VoiceId { get; set; } public string VoiceId { get; set; }
} }

View File

@ -1,12 +0,0 @@
namespace HermesSocketServer.Store
{
public class Policy
{
public string Id { get; set; }
public string UserId { get; set; }
public string GroupId { get; set; }
public string Path { get; set; }
public int Usage { get; set; }
public TimeSpan Span { get; set; }
}
}

49
Requests/CreatePolicy.cs Normal file
View File

@ -0,0 +1,49 @@
using HermesSocketServer.Models;
using HermesSocketServer.Services;
using ILogger = Serilog.ILogger;
namespace HermesSocketServer.Requests
{
public class CreatePolicy : IRequest
{
public string Name => "create_policy";
public string[] RequiredKeys => ["groupId", "path", "count", "span"];
private ChannelManager _channels;
private ILogger _logger;
public CreatePolicy(ChannelManager channels, ILogger logger)
{
_channels = channels;
_logger = logger;
}
public async Task<RequestResult> Grant(string sender, IDictionary<string, object>? data)
{
var id = Guid.NewGuid();
string groupId = data["groupId"].ToString()!;
string path = data["path"].ToString()!;
int count = int.Parse(data["count"].ToString()!);
int span = int.Parse(data["span"].ToString()!);
var policy = new PolicyMessage()
{
Id = id,
UserId = sender,
GroupId = Guid.Parse(groupId),
Path = path,
Usage = count,
Span = span,
};
var channel = _channels.Get(sender);
bool result = channel.Policies.Set(id.ToString(), policy);
if (result)
{
_logger.Information($"Added policy to channel [policy id: {id}][group id: {groupId}][path: {path}][count: {count}][span: {span}][channel: {sender}]");
return RequestResult.Successful(policy);
}
return RequestResult.Failed("Something went wrong when updating the cache.");
}
}
}

View File

@ -8,7 +8,6 @@ namespace HermesSocketServer.Requests
public class CreateTTSUser : IRequest public class CreateTTSUser : IRequest
{ {
public string Name => "create_tts_user"; public string Name => "create_tts_user";
public string[] RequiredKeys => ["chatter", "voice"]; public string[] RequiredKeys => ["chatter", "voice"];
private ChannelManager _channels; private ChannelManager _channels;
private Database _database; private Database _database;
@ -40,7 +39,7 @@ namespace HermesSocketServer.Requests
bool result = channel.Chatters.Set(chatterId.ToString(), new ChatterVoice() bool result = channel.Chatters.Set(chatterId.ToString(), new ChatterVoice()
{ {
UserId = sender, UserId = sender,
ChatterId = chatterId.ToString(), ChatterId = chatterId,
VoiceId = data["voice"].ToString()! VoiceId = data["voice"].ToString()!
}); });

View File

@ -35,7 +35,7 @@ namespace HermesSocketServer.Requests
Default = sql.GetBoolean(7) Default = sql.GetBoolean(7)
}) })
); );
return RequestResult.Successful(connections, false); return RequestResult.Successful(connections, notifyClientsOnAccount: false);
} }
} }
} }

28
Requests/GetPolicies.cs Normal file
View File

@ -0,0 +1,28 @@
using HermesSocketServer.Services;
using ILogger = Serilog.ILogger;
namespace HermesSocketServer.Requests
{
public class GetPolicies : IRequest
{
public string Name => "get_policies";
public string[] RequiredKeys => [];
private ChannelManager _channels;
private ILogger _logger;
public GetPolicies(ChannelManager channels, ILogger logger)
{
_channels = channels;
_logger = logger;
}
public async Task<RequestResult> Grant(string sender, IDictionary<string, object>? data)
{
var channel = _channels.Get(sender);
var results = channel.Policies.Get().Values;
_logger.Information($"Fetched policies for channel [policy size: {results.Count}][channel: {sender}]");
return RequestResult.Successful(results, notifyClientsOnAccount: false);
}
}
}

View File

@ -1,6 +1,5 @@
using HermesSocketLibrary.Socket.Data; using HermesSocketLibrary.Socket.Data;
using HermesSocketServer.Services; using HermesSocketServer.Services;
using Serilog;
namespace HermesSocketServer.Requests namespace HermesSocketServer.Requests
{ {
@ -21,11 +20,17 @@ namespace HermesSocketServer.Requests
public async Task<RequestResult> Grant(string sender, RequestMessage? message) public async Task<RequestResult> Grant(string sender, RequestMessage? message)
{ {
if (message == null || message.Type == null) if (message == null || message.Type == null)
{
_logger.Debug($"Request type does not exist [id: {message.RequestId}][nounce: {message.Nounce}]");
return RequestResult.Failed("Request type does not exist."); return RequestResult.Failed("Request type does not exist.");
}
var channel = _channels.Get(sender); var channel = _channels.Get(sender);
if (channel == null) if (channel == null)
{
_logger.Debug($"Channel does not exist [id: {message.RequestId}][nounce: {message.Nounce}]");
return RequestResult.Failed("Channel does not exist."); return RequestResult.Failed("Channel does not exist.");
}
if (!_requests.TryGetValue(message.Type, out IRequest? request) || request == null) if (!_requests.TryGetValue(message.Type, out IRequest? request) || request == null)
{ {
@ -36,12 +41,18 @@ namespace HermesSocketServer.Requests
if (request.RequiredKeys.Any()) if (request.RequiredKeys.Any())
{ {
if (message.Data == null) if (message.Data == null)
{
_logger.Debug($"Request is lacking data entries [id: {message.RequestId}][nounce: {message.Nounce}]");
return RequestResult.Failed($"Request is lacking data entries."); return RequestResult.Failed($"Request is lacking data entries.");
}
foreach (var key in request.RequiredKeys) foreach (var key in request.RequiredKeys)
{ {
if (!message.Data.ContainsKey(key)) if (!message.Data.ContainsKey(key))
{
_logger.Debug($"Request is missing '{key}' in its data entries [id: {message.RequestId}][nounce: {message.Nounce}]");
return RequestResult.Failed($"Request is missing '{key}' in its data entries."); return RequestResult.Failed($"Request is missing '{key}' in its data entries.");
}
} }
} }

View File

@ -15,12 +15,12 @@ namespace HermesSocketServer.Requests
public static RequestResult Successful(object? result, bool notifyClientsOnAccount = true) public static RequestResult Successful(object? result, bool notifyClientsOnAccount = true)
{ {
return RequestResult.Successful(result, notifyClientsOnAccount); return new RequestResult(true, result, notifyClientsOnAccount);
} }
public static RequestResult Failed(string error, bool notifyClientsOnAccount = true) public static RequestResult Failed(string error, bool notifyClientsOnAccount = true)
{ {
return RequestResult.Successful(error, notifyClientsOnAccount); return new RequestResult(false, error, notifyClientsOnAccount);
} }
} }
} }

48
Requests/UpdatePolicy.cs Normal file
View File

@ -0,0 +1,48 @@
using HermesSocketServer.Models;
using HermesSocketServer.Services;
using ILogger = Serilog.ILogger;
namespace HermesSocketServer.Requests
{
public class UpdatePolicy : IRequest
{
public string Name => "update_policy";
public string[] RequiredKeys => ["id", "groupId", "path", "count", "span"];
private ChannelManager _channels;
private ILogger _logger;
public UpdatePolicy(ChannelManager channels, ILogger logger)
{
_channels = channels;
_logger = logger;
}
public async Task<RequestResult> Grant(string sender, IDictionary<string, object>? data)
{
var id = Guid.Parse(data["id"].ToString()!);
string groupId = data["groupId"].ToString()!;
string path = data["path"].ToString()!;
int count = int.Parse(data["count"].ToString()!);
int span = int.Parse(data["span"].ToString()!);
var channel = _channels.Get(sender)!;
bool result = channel.Policies.Set(id.ToString(), new PolicyMessage()
{
Id = id,
UserId = sender,
GroupId = Guid.Parse(groupId),
Path = path,
Usage = count,
Span = span,
});
if (result)
{
var policy = channel.Policies.Get(id.ToString());
_logger.Information($"Updated policy to channel [policy id: {id}][group id: {groupId}][path: {path}][count: {count}][span: {span}][channel: {sender}]");
return RequestResult.Successful(policy);
}
return RequestResult.Failed("Something went wrong when updating the cache.");
}
}
}

View File

@ -28,6 +28,7 @@ namespace HermesSocketServer.Requests
if (long.TryParse(data["chatter"].ToString(), out long chatterId)) if (long.TryParse(data["chatter"].ToString(), out long chatterId))
data["chatter"] = chatterId; data["chatter"] = chatterId;
data["voice"] = data["voice"].ToString(); data["voice"] = data["voice"].ToString();
data["user"] = sender;
var check = await _database.ExecuteScalar("SELECT state FROM \"TtsVoiceState\" WHERE \"userId\" = @user AND \"ttsVoiceId\" = @voice", data) ?? false; var check = await _database.ExecuteScalar("SELECT state FROM \"TtsVoiceState\" WHERE \"userId\" = @user AND \"ttsVoiceId\" = @voice", data) ?? false;
if ((check is not bool state || !state) && chatterId != _configuration.Tts.OwnerId) if ((check is not bool state || !state) && chatterId != _configuration.Tts.OwnerId)
@ -37,7 +38,7 @@ namespace HermesSocketServer.Requests
var result = channel.Chatters.Set(chatterId.ToString(), new ChatterVoice() var result = channel.Chatters.Set(chatterId.ToString(), new ChatterVoice()
{ {
UserId = sender, UserId = sender,
ChatterId = chatterId.ToString(), ChatterId = chatterId,
VoiceId = data["voice"].ToString()! VoiceId = data["voice"].ToString()!
}); });
if (result) if (result)

View File

@ -1,3 +1,4 @@
using HermesSocketLibrary.db;
using HermesSocketServer.Store; using HermesSocketServer.Store;
namespace HermesSocketServer.Services namespace HermesSocketServer.Services
@ -29,15 +30,15 @@ namespace HermesSocketServer.Services
await Task.Run(async () => await Task.Run(async () =>
{ {
await Task.Delay(TimeSpan.FromSeconds(_configuration.Database.SaveDelayInSeconds)); await Task.Delay(TimeSpan.FromSeconds(_configuration.Database.SaveDelayInSeconds));
while (true) while (true)
{ {
await Task.WhenAll([ await Task.WhenAll([
_voices.Save(), _voices.Save(),
_users.Save(), _users.Save(),
_channels.Save(), _channels.Save(),
Task.Delay(TimeSpan.FromSeconds(_configuration.Database.SaveDelayInSeconds)),
]); ]);
await Task.Delay(TimeSpan.FromSeconds(_configuration.Database.SaveDelayInSeconds));
} }
}); });
} }

View File

@ -70,8 +70,6 @@ namespace HermesSocketServer.Socket.Handlers
} }
} }
} }
} }
} }
} }

View File

@ -84,14 +84,6 @@ s.AddSingleton<VoiceStore>();
s.AddSingleton<UserStore>(); s.AddSingleton<UserStore>();
// Request handlers // Request handlers
s.AddSingleton<IRequest, GetTTSUsers>();
s.AddSingleton<IRequest, GetTTSVoices>();
s.AddSingleton<IRequest, GetTTSWordFilters>();
s.AddSingleton<IRequest, CreateTTSUser>();
s.AddSingleton<IRequest, CreateTTSVoice>();
s.AddSingleton<IRequest, DeleteTTSVoice>();
s.AddSingleton<IRequest, UpdateTTSUser>();
s.AddSingleton<IRequest, UpdateTTSVoice>();
s.AddSingleton<IRequest, GetChatterIds>(); s.AddSingleton<IRequest, GetChatterIds>();
s.AddSingleton<IRequest, GetConnections>(); s.AddSingleton<IRequest, GetConnections>();
s.AddSingleton<IRequest, GetDefaultTTSVoice>(); s.AddSingleton<IRequest, GetDefaultTTSVoice>();
@ -100,8 +92,19 @@ s.AddSingleton<IRequest, GetEnabledTTSVoices>();
s.AddSingleton<IRequest, GetPermissions>(); s.AddSingleton<IRequest, GetPermissions>();
s.AddSingleton<IRequest, GetRedemptions>(); s.AddSingleton<IRequest, GetRedemptions>();
s.AddSingleton<IRequest, GetRedeemableActions>(); s.AddSingleton<IRequest, GetRedeemableActions>();
s.AddSingleton<IRequest, UpdateTTSVoiceState>(); s.AddSingleton<IRequest, GetPolicies>();
s.AddSingleton<IRequest, GetTTSUsers>();
s.AddSingleton<IRequest, GetTTSVoices>();
s.AddSingleton<IRequest, GetTTSWordFilters>();
s.AddSingleton<IRequest, CreatePolicy>();
s.AddSingleton<IRequest, CreateTTSUser>();
s.AddSingleton<IRequest, CreateTTSVoice>();
s.AddSingleton<IRequest, DeleteTTSVoice>();
s.AddSingleton<IRequest, UpdateTTSUser>();
s.AddSingleton<IRequest, UpdateTTSVoice>();
s.AddSingleton<IRequest, UpdateDefaultTTSVoice>(); s.AddSingleton<IRequest, UpdateDefaultTTSVoice>();
s.AddSingleton<IRequest, UpdateTTSVoiceState>();
s.AddSingleton<IRequest, UpdatePolicy>();
// Managers // Managers
s.AddSingleton<ChannelManager>(); s.AddSingleton<ChannelManager>();

View File

@ -32,8 +32,8 @@ namespace HermesSocketServer.Store
string sql = $"SELECT \"chatterId\", \"ttsVoiceId\" FROM \"TtsChatVoice\" WHERE \"userId\" = @user"; string sql = $"SELECT \"chatterId\", \"ttsVoiceId\" FROM \"TtsChatVoice\" WHERE \"userId\" = @user";
await _database.Execute(sql, data, (reader) => await _database.Execute(sql, data, (reader) =>
{ {
string chatterId = reader.GetInt64(0).ToString(); var chatterId = reader.GetInt64(0);
_store.Add(chatterId, new ChatterVoice() _store.Add(chatterId.ToString(), new ChatterVoice()
{ {
UserId = _userId, UserId = _userId,
ChatterId = chatterId, ChatterId = chatterId,
@ -70,7 +70,7 @@ namespace HermesSocketServer.Store
} }
_logger.Debug($"TtsChatVoice - Adding {count} rows to database: {sql}"); _logger.Debug($"TtsChatVoice - Adding {count} rows to database: {sql}");
await _database.ExecuteScalar(sql); await _database.ExecuteScalarTransaction(sql);
} }
if (_modified.Any()) if (_modified.Any())
{ {
@ -81,7 +81,7 @@ namespace HermesSocketServer.Store
_modified.Clear(); _modified.Clear();
} }
_logger.Debug($"TtsChatVoice - Modifying {count} rows in database: {sql}"); _logger.Debug($"TtsChatVoice - Modifying {count} rows in database: {sql}");
await _database.ExecuteScalar(sql); await _database.ExecuteScalarTransaction(sql);
} }
if (_deleted.Any()) if (_deleted.Any())
{ {
@ -92,7 +92,7 @@ namespace HermesSocketServer.Store
_deleted.Clear(); _deleted.Clear();
} }
_logger.Debug($"TtsChatVoice - Deleting {count} rows from database: {sql}"); _logger.Debug($"TtsChatVoice - Deleting {count} rows from database: {sql}");
await _database.ExecuteScalar(sql); await _database.ExecuteScalarTransaction(sql);
} }
return true; return true;
} }

View File

@ -86,7 +86,9 @@ namespace HermesSocketServer.Store
.Append("),"); .Append("),");
} }
sb.Remove(sb.Length - 1, 1) sb.Remove(sb.Length - 1, 1)
.Append($") AS c(\"{string.Join("\", \"", columns)}\") WHERE id = c.id;"); .Append($") AS c(\"{string.Join("\", \"", columns)}\") WHERE ")
.Append(string.Join(" AND ", keyColumns.Select(c => "t.\"" + c + "\" = c.\"" + c + "\"")))
.Append(";");
return sb.ToString(); return sb.ToString();
} }
@ -131,6 +133,12 @@ namespace HermesSocketServer.Store
sb.Append("'") sb.Append("'")
.Append(value) .Append(value)
.Append("'"); .Append("'");
else if (type == typeof(Guid))
sb.Append("'")
.Append(value?.ToString())
.Append("'");
else if (type == typeof(TimeSpan))
sb.Append(((TimeSpan)value).TotalMilliseconds);
else else
sb.Append(value); sb.Append(value);
} }

View File

@ -1,13 +1,14 @@
using HermesSocketLibrary.db; using HermesSocketLibrary.db;
using HermesSocketServer.Models;
namespace HermesSocketServer.Store namespace HermesSocketServer.Store
{ {
public class PolicyStore : GroupSaveStore<string, Policy> public class PolicyStore : GroupSaveStore<string, PolicyMessage>
{ {
private readonly string _userId; private readonly string _userId;
private readonly Database _database; private readonly Database _database;
private readonly Serilog.ILogger _logger; private readonly Serilog.ILogger _logger;
private readonly GroupSaveSqlGenerator<Policy> _generator; private readonly GroupSaveSqlGenerator<PolicyMessage> _generator;
public PolicyStore(string userId, Database database, Serilog.ILogger logger) : base(logger) public PolicyStore(string userId, Database database, Serilog.ILogger logger) : base(logger)
@ -25,7 +26,7 @@ namespace HermesSocketServer.Store
{ "count", "Usage" }, { "count", "Usage" },
{ "timespan", "Span" }, { "timespan", "Span" },
}; };
_generator = new GroupSaveSqlGenerator<Policy>(ctp); _generator = new GroupSaveSqlGenerator<PolicyMessage>(ctp);
} }
public override async Task Load() public override async Task Load()
@ -34,25 +35,25 @@ namespace HermesSocketServer.Store
string sql = $"SELECT id, \"groupId\", path, count, timespan FROM \"GroupPermissionPolicy\" WHERE \"userId\" = @user"; string sql = $"SELECT id, \"groupId\", path, count, timespan FROM \"GroupPermissionPolicy\" WHERE \"userId\" = @user";
await _database.Execute(sql, data, (reader) => await _database.Execute(sql, data, (reader) =>
{ {
string id = reader.GetString(0).ToString(); var id = reader.GetGuid(0);
_store.Add(id, new Policy() _store.Add(id.ToString(), new PolicyMessage()
{ {
Id = id, Id = id,
UserId = _userId, UserId = _userId,
GroupId = reader.GetString(1), GroupId = reader.GetGuid(1),
Path = reader.GetString(2), Path = reader.GetString(2),
Usage = reader.GetInt32(3), Usage = reader.GetInt32(3),
Span = TimeSpan.FromMilliseconds(reader.GetInt32(4)), Span = reader.GetInt32(4),
}); });
}); });
_logger.Information($"Loaded {_store.Count} policies from database."); _logger.Information($"Loaded {_store.Count} policies from database.");
} }
protected override void OnInitialAdd(string key, Policy value) protected override void OnInitialAdd(string key, PolicyMessage value)
{ {
} }
protected override void OnInitialModify(string key, Policy value) protected override void OnInitialModify(string key, PolicyMessage value)
{ {
} }
@ -75,7 +76,7 @@ namespace HermesSocketServer.Store
} }
_logger.Debug($"GroupPermissionPolicy - Adding {count} rows to database: {sql}"); _logger.Debug($"GroupPermissionPolicy - Adding {count} rows to database: {sql}");
await _database.ExecuteScalar(sql); await _database.ExecuteScalarTransaction(sql);
} }
if (_modified.Any()) if (_modified.Any())
{ {
@ -86,7 +87,7 @@ namespace HermesSocketServer.Store
_modified.Clear(); _modified.Clear();
} }
_logger.Debug($"GroupPermissionPolicy - Modifying {count} rows in database: {sql}"); _logger.Debug($"GroupPermissionPolicy - Modifying {count} rows in database: {sql}");
await _database.ExecuteScalar(sql); await _database.ExecuteScalarTransaction(sql);
} }
if (_deleted.Any()) if (_deleted.Any())
{ {
@ -97,7 +98,7 @@ namespace HermesSocketServer.Store
_deleted.Clear(); _deleted.Clear();
} }
_logger.Debug($"GroupPermissionPolicy - Deleting {count} rows from database: {sql}"); _logger.Debug($"GroupPermissionPolicy - Deleting {count} rows from database: {sql}");
await _database.ExecuteScalar(sql); await _database.ExecuteScalarTransaction(sql);
} }
return true; return true;
} }

View File

@ -70,7 +70,7 @@ namespace HermesSocketServer.Store
_added.Clear(); _added.Clear();
} }
_logger.Debug($"User - Adding {count} rows to database: {sql}"); _logger.Debug($"User - Adding {count} rows to database: {sql}");
await _database.ExecuteScalar(sql); await _database.ExecuteScalarTransaction(sql);
} }
if (_modified.Any()) if (_modified.Any())
{ {
@ -81,7 +81,7 @@ namespace HermesSocketServer.Store
_modified.Clear(); _modified.Clear();
} }
_logger.Debug($"User - Modifying {count} rows in database: {sql}"); _logger.Debug($"User - Modifying {count} rows in database: {sql}");
await _database.ExecuteScalar(sql); await _database.ExecuteScalarTransaction(sql);
} }
if (_deleted.Any()) if (_deleted.Any())
{ {
@ -92,7 +92,7 @@ namespace HermesSocketServer.Store
_deleted.Clear(); _deleted.Clear();
} }
_logger.Debug($"User - Deleting {count} rows from database: {sql}"); _logger.Debug($"User - Deleting {count} rows from database: {sql}");
await _database.ExecuteScalar(sql); await _database.ExecuteScalarTransaction(sql);
} }
return true; return true;
} }

View File

@ -73,7 +73,7 @@ namespace HermesSocketServer.Store
} }
_logger.Debug($"TtsVoice - Adding {count} rows to database: {sql}"); _logger.Debug($"TtsVoice - Adding {count} rows to database: {sql}");
await _database.ExecuteScalar(sql); await _database.ExecuteScalarTransaction(sql);
} }
if (_modified.Any()) if (_modified.Any())
{ {
@ -84,7 +84,7 @@ namespace HermesSocketServer.Store
_modified.Clear(); _modified.Clear();
} }
_logger.Debug($"TtsVoice - Modifying {count} rows in database: {sql}"); _logger.Debug($"TtsVoice - Modifying {count} rows in database: {sql}");
await _database.ExecuteScalar(sql); await _database.ExecuteScalarTransaction(sql);
} }
if (_deleted.Any()) if (_deleted.Any())
{ {
@ -95,7 +95,7 @@ namespace HermesSocketServer.Store
_deleted.Clear(); _deleted.Clear();
} }
_logger.Debug($"TtsVoice - Deleting {count} rows from database: {sql}"); _logger.Debug($"TtsVoice - Deleting {count} rows from database: {sql}");
await _database.ExecuteScalar(sql); await _database.ExecuteScalarTransaction(sql);
} }
return true; return true;
} }

View File

@ -5,9 +5,7 @@ namespace HermesSocketLibrary.db
{ {
public class Database public class Database
{ {
private NpgsqlDataSource _source; private readonly NpgsqlDataSource _source;
private ServerConfiguration _configuration;
public NpgsqlDataSource DataSource { get => _source; } public NpgsqlDataSource DataSource { get => _source; }
@ -19,111 +17,96 @@ namespace HermesSocketLibrary.db
public async Task Execute(string sql, IDictionary<string, object>? values, Action<NpgsqlDataReader> reading) public async Task Execute(string sql, IDictionary<string, object>? values, Action<NpgsqlDataReader> reading)
{ {
using (var connection = await _source.OpenConnectionAsync()) await using var connection = await _source.OpenConnectionAsync();
await using var command = new NpgsqlCommand(sql, connection);
if (values != null)
{ {
using (var command = new NpgsqlCommand(sql, connection)) foreach (var entry in values)
{ command.Parameters.AddWithValue(entry.Key, entry.Value);
if (values != null) }
{ await command.PrepareAsync();
foreach (var entry in values)
command.Parameters.AddWithValue(entry.Key, entry.Value);
}
await command.PrepareAsync();
using (var reader = await command.ExecuteReaderAsync()) await using var reader = await command.ExecuteReaderAsync();
{ while (await reader.ReadAsync())
while (await reader.ReadAsync()) {
{ reading(reader);
reading(reader);
}
}
}
} }
} }
public async Task Execute(string sql, Action<NpgsqlCommand> action, Action<NpgsqlDataReader> reading) public async Task Execute(string sql, Action<NpgsqlCommand> action, Action<NpgsqlDataReader> reading)
{ {
using (var connection = await _source.OpenConnectionAsync()) await using var connection = await _source.OpenConnectionAsync();
{ await using var command = new NpgsqlCommand(sql, connection);
using (var command = new NpgsqlCommand(sql, connection)) action(command);
{ await command.PrepareAsync();
action(command);
await command.PrepareAsync();
using (var reader = await command.ExecuteReaderAsync()) await using var reader = await command.ExecuteReaderAsync();
{ while (await reader.ReadAsync())
while (await reader.ReadAsync()) {
{ reading(reader);
reading(reader);
}
}
}
} }
} }
public async Task<int> Execute(string sql, IDictionary<string, object>? values) public async Task<int> Execute(string sql, IDictionary<string, object>? values)
{ {
using (var connection = await _source.OpenConnectionAsync()) await using var connection = await _source.OpenConnectionAsync();
await using var command = new NpgsqlCommand(sql, connection);
if (values != null)
{ {
using (var command = new NpgsqlCommand(sql, connection)) foreach (var entry in values)
{ command.Parameters.AddWithValue(entry.Key, entry.Value);
if (values != null)
{
foreach (var entry in values)
command.Parameters.AddWithValue(entry.Key, entry.Value);
}
await command.PrepareAsync();
return await command.ExecuteNonQueryAsync();
}
} }
await command.PrepareAsync();
return await command.ExecuteNonQueryAsync();
} }
public async Task<int> Execute(string sql, Action<NpgsqlCommand> prepare) public async Task<int> Execute(string sql, Action<NpgsqlCommand> prepare)
{ {
using (var connection = await _source.OpenConnectionAsync()) await using var connection = await _source.OpenConnectionAsync();
{ await using var command = new NpgsqlCommand(sql, connection);
using (var command = new NpgsqlCommand(sql, connection)) prepare(command);
{ await command.PrepareAsync();
prepare(command); return await command.ExecuteNonQueryAsync();
await command.PrepareAsync();
return await command.ExecuteNonQueryAsync();
}
}
} }
public async Task<object?> ExecuteScalar(string sql, IDictionary<string, object>? values = null) public async Task<object?> ExecuteScalar(string sql, IDictionary<string, object>? values = null)
{ {
using (var connection = await _source.OpenConnectionAsync()) await using var connection = await _source.OpenConnectionAsync();
await using var command = new NpgsqlCommand(sql, connection);
if (values != null)
{ {
using (var command = new NpgsqlCommand(sql, connection)) foreach (var entry in values)
{ command.Parameters.AddWithValue(entry.Key, entry.Value);
if (values != null)
{
foreach (var entry in values)
command.Parameters.AddWithValue(entry.Key, entry.Value);
}
await command.PrepareAsync();
return await command.ExecuteScalarAsync();
}
} }
await command.PrepareAsync();
return await command.ExecuteScalarAsync();
}
public async Task<object?> ExecuteScalarTransaction(string sql, IDictionary<string, object>? values = null)
{
await using var connection = await _source.OpenConnectionAsync();
await using var transaction = await connection.BeginTransactionAsync();
await using var command = new NpgsqlCommand(sql, connection, transaction);
if (values != null)
{
foreach (var entry in values)
command.Parameters.AddWithValue(entry.Key, entry.Value);
}
await command.PrepareAsync();
var results = await command.ExecuteScalarAsync();
await transaction.CommitAsync();
return results;
} }
public async Task<object?> ExecuteScalar(string sql, Action<NpgsqlCommand> action) public async Task<object?> ExecuteScalar(string sql, Action<NpgsqlCommand> action)
{ {
using (var connection = await _source.OpenConnectionAsync()) await using var connection = await _source.OpenConnectionAsync();
{ await using var command = new NpgsqlCommand(sql, connection);
using (var command = new NpgsqlCommand(sql, connection)) action(command);
{ await command.PrepareAsync();
action(command); return await command.ExecuteScalarAsync();
await command.PrepareAsync();
return await command.ExecuteScalarAsync();
}
}
} }
} }
} }