From 7492da4c4904a8e52d10301c3fcc036451388231 Mon Sep 17 00:00:00 2001 From: Aidar Shaikhiev Date: Mon, 13 Nov 2023 19:55:49 +0600 Subject: [PATCH] Background cache update for ConsulServiceAddressCache.cs --- ATI.Services.Consul/ConsulAdapter.cs | 45 ++++++++ .../ConsulMetricsHttpClientWrapper.cs | 7 +- ATI.Services.Consul/ConsulServiceAddress.cs | 58 +++++----- .../ConsulServiceAddressCache.cs | 104 +++++++----------- 4 files changed, 122 insertions(+), 92 deletions(-) create mode 100644 ATI.Services.Consul/ConsulAdapter.cs diff --git a/ATI.Services.Consul/ConsulAdapter.cs b/ATI.Services.Consul/ConsulAdapter.cs new file mode 100644 index 0000000..ce7188a --- /dev/null +++ b/ATI.Services.Consul/ConsulAdapter.cs @@ -0,0 +1,45 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Threading.Tasks; +using Consul; +using NLog; + +namespace ATI.Services.Consul; + +internal class ConsulAdapter: IDisposable +{ + private readonly ILogger _logger = LogManager.GetCurrentClassLogger(); + private ConsulClient _consulClient; + + /// + /// Возвращает список живых сервисов + /// + /// + public async Task> GetPassingServiceInstancesAsync(string serviceName, string environment, bool passingOnly = true, ulong index = 0, TimeSpan? waitTime = null) + { + try + { + _consulClient = new ConsulClient(); + var fromConsul = await _consulClient.Health.Service(serviceName, environment, passingOnly, new QueryOptions { WaitIndex = index, WaitTime = waitTime}); + if (fromConsul.StatusCode == HttpStatusCode.OK) + { + return fromConsul.Response?.ToList(); + } + + _logger.Error($"По запросу в консул {serviceName}:{environment}, вернулся ответ со статусом: {fromConsul.StatusCode}"); + } + catch (Exception e) + { + _logger.Error(e); + } + + return new List(); + } + + public void Dispose() + { + _consulClient?.Dispose(); + } +} \ No newline at end of file diff --git a/ATI.Services.Consul/ConsulMetricsHttpClientWrapper.cs b/ATI.Services.Consul/ConsulMetricsHttpClientWrapper.cs index 9227696..96a3b5d 100644 --- a/ATI.Services.Consul/ConsulMetricsHttpClientWrapper.cs +++ b/ATI.Services.Consul/ConsulMetricsHttpClientWrapper.cs @@ -18,7 +18,7 @@ namespace ATI.Services.Consul /// Обертка, включающая в себя ConsulServiceAddress, TracingHttpClientWrapper и MetricsTracingFactory /// [PublicAPI] - public class ConsulMetricsHttpClientWrapper + public class ConsulMetricsHttpClientWrapper : IDisposable { private readonly BaseServiceOptions _serviceOptions; private readonly MetricsHttpClientWrapper _clientWrapper; @@ -297,5 +297,10 @@ private async Task> SendAsync(string url, } } } + + public void Dispose() + { + _serviceAddress?.Dispose(); + } } } \ No newline at end of file diff --git a/ATI.Services.Consul/ConsulServiceAddress.cs b/ATI.Services.Consul/ConsulServiceAddress.cs index 7ffa1a3..408c8ee 100644 --- a/ATI.Services.Consul/ConsulServiceAddress.cs +++ b/ATI.Services.Consul/ConsulServiceAddress.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Generic; -using System.Threading; using System.Threading.Tasks; using ATI.Services.Common.Extensions; using Consul; @@ -9,37 +8,45 @@ namespace ATI.Services.Consul { - public class ConsulServiceAddress: IDisposable + public class ConsulServiceAddress : IDisposable { private readonly string _environment; private readonly string _serviceName; private readonly ILogger _logger = LogManager.GetCurrentClassLogger(); - private readonly Timer _updateCacheTimer; - private ConsulServiceAddressCache CachedServices { get; } + private readonly Func>> _getServices; + private readonly ConsulServiceAddressCache _serviceAddressCache; + private readonly ConsulAdapter _consulAdapter; - public ConsulServiceAddress(string serviceName, string environment, TimeSpan? timeToReload = null, bool useCaching = true) + public ConsulServiceAddress(string serviceName, + string environment, + TimeSpan? timeToReload = null, + bool useCaching = true) { timeToReload ??= TimeSpan.FromSeconds(5); _environment = environment; _serviceName = serviceName; - CachedServices = new ConsulServiceAddressCache(useCaching, _serviceName, _environment); - - _updateCacheTimer = new Timer(_ => CachedServices.ReloadCache(), null, timeToReload.Value, - timeToReload.Value); + if (useCaching) + { + _serviceAddressCache = new ConsulServiceAddressCache(_serviceName, _environment, timeToReload.Value); + _getServices = () => Task.FromResult(_serviceAddressCache.GetCachedObjectsAsync()); + } + else + { + _consulAdapter = new ConsulAdapter(); + _getServices = async () => + await _consulAdapter.GetPassingServiceInstancesAsync(serviceName, environment); + } } - public async Task> GetAllAsync() - { - return await CachedServices.GetCachedObjectsAsync(); - } + public Task> GetAllAsync() => _getServices(); public async Task ToHttpAsync() { - var serviceInfo = (await CachedServices.GetCachedObjectsAsync()).RandomItem(); + var serviceInfo = (await _getServices()).RandomItem(); var address = string.IsNullOrWhiteSpace(serviceInfo?.Service?.Address) - ? serviceInfo?.Node.Address - : serviceInfo.Service.Address; + ? serviceInfo?.Node.Address + : serviceInfo.Service.Address; if (string.IsNullOrWhiteSpace(address) || serviceInfo.Service == null) { @@ -52,10 +59,10 @@ public async Task ToHttpAsync() public async Task<(string, int)> GetAddressAndPortAsync() { - var serviceInfo = (await CachedServices.GetCachedObjectsAsync()).RandomItem(); + var serviceInfo = (await _getServices()).RandomItem(); var address = string.IsNullOrWhiteSpace(serviceInfo?.Service?.Address) - ? serviceInfo?.Node.Address - : serviceInfo.Service.Address; + ? serviceInfo?.Node.Address + : serviceInfo.Service.Address; if (string.IsNullOrWhiteSpace(address) || serviceInfo.Service == null) { @@ -67,19 +74,19 @@ public async Task ToHttpAsync() } #region Obsolete - + [Obsolete("Method GetAll is deprecated, pls use GetAllAsync instead")] public List GetAll() { return GetAllAsync().GetAwaiter().GetResult(); } - + [Obsolete("Method ToHttp is deprecated, pls use ToHttpAsync instead")] public string ToHttp() { return ToHttpAsync().GetAwaiter().GetResult(); } - + [Obsolete("Method GetAddressAndPort is deprecated, pls use GetAddressAndPortAsync instead")] public (string, int) GetAddressAndPort() { @@ -87,10 +94,11 @@ public string ToHttp() } #endregion - + public void Dispose() { - _updateCacheTimer.Dispose(); + _serviceAddressCache?.Dispose(); + _consulAdapter?.Dispose(); } } -} +} \ No newline at end of file diff --git a/ATI.Services.Consul/ConsulServiceAddressCache.cs b/ATI.Services.Consul/ConsulServiceAddressCache.cs index a55bdc0..70ded29 100644 --- a/ATI.Services.Consul/ConsulServiceAddressCache.cs +++ b/ATI.Services.Consul/ConsulServiceAddressCache.cs @@ -1,79 +1,51 @@ using System; using System.Collections.Generic; -using System.Linq; -using System.Net; +using System.Threading; using System.Threading.Tasks; using Consul; -using NLog; -namespace ATI.Services.Consul +namespace ATI.Services.Consul; + +/// +/// Обеспечивает получение доступных сервисов от консула и их кеширование (опционально) +/// +internal class ConsulServiceAddressCache: IDisposable { + private readonly string _serviceName; + private readonly string _environment; + private List _cachedServices; + private readonly Timer _updateCacheTimer; + private Task> _updateCacheTask; + private readonly ConsulAdapter _consulAdapter; + + public ConsulServiceAddressCache(string serviceName, string environment, TimeSpan ttl) + { + _serviceName = serviceName; + _environment = environment; + _consulAdapter = new ConsulAdapter(); + _updateCacheTimer = new Timer(_ => ReloadCache(), null, ttl, ttl); + _cachedServices = _consulAdapter.GetPassingServiceInstancesAsync(serviceName, environment).GetAwaiter().GetResult(); + } + /// - /// Обеспечивает получение доступных сервисов от консула и их кеширование (опционально) + /// Возвращает коллекцию сервисов /// - public class ConsulServiceAddressCache - { - private Task> _reloadCacheTask; - private readonly ILogger _logger = LogManager.GetCurrentClassLogger(); - private readonly bool _useCaching; - private readonly string _serviceName; - private readonly string _environment; + /// + public List GetCachedObjectsAsync() => _cachedServices; - public ConsulServiceAddressCache(bool useCaching, string serviceName, string environment) - { - _useCaching = useCaching; - _serviceName = serviceName; - _environment = environment; - - if (!_useCaching) - return; - - _reloadCacheTask = GetServiceFromConsulAsync(); - } + /// + /// Запускает таску на обновление кеша + /// + private void ReloadCache() + { + if(_updateCacheTask.IsCompleted) + _updateCacheTask = _consulAdapter.GetPassingServiceInstancesAsync(_serviceName, _environment); - /// - /// Возвращает коллекцию сервисов - /// - /// - public Task> GetCachedObjectsAsync() - { - return _useCaching ? _reloadCacheTask : GetServiceFromConsulAsync(); - } - - /// - /// Запускает таску на обновление кеша, если кеширование включено - /// - public void ReloadCache() - { - if (!_useCaching) - return; - - if (!_reloadCacheTask.IsCompleted) - return; - - _reloadCacheTask = GetServiceFromConsulAsync(); - } + _cachedServices = _updateCacheTask.GetAwaiter().GetResult(); + } - /// - /// Возвращает список живых сервисов - /// - /// - private async Task> GetServiceFromConsulAsync() - { - try - { - using var cc = new ConsulClient(); - var fromConsul = await cc.Health.Service(_serviceName, _environment, true); - if (fromConsul.StatusCode == HttpStatusCode.OK && fromConsul.Response.Length > 0) - { - return fromConsul.Response.ToList(); - } - } - catch (Exception e) - { - _logger.Error(e); - } - return new List(); - } + public void Dispose() + { + _updateCacheTimer.Dispose(); } } \ No newline at end of file