Skip to content

Commit

Permalink
Consul cache background update (#21)
Browse files Browse the repository at this point in the history
Background cache update for ConsulServiceAddressCache.cs
  • Loading branch information
muphblu authored Nov 21, 2023
1 parent 923bd19 commit 89a6a62
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 92 deletions.
54 changes: 54 additions & 0 deletions ATI.Services.Consul/ConsulAdapter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using ATI.Services.Common.Behaviors;
using ATI.Services.Common.Metrics;
using Consul;
using NLog;

namespace ATI.Services.Consul;

internal class ConsulAdapter: IDisposable
{
private readonly ILogger _logger = LogManager.GetCurrentClassLogger();
private readonly ConsulClient _consulClient = new();
private readonly MetricsFactory _metricsFactory = MetricsFactory.CreateHttpClientMetricsFactory(nameof(ConsulAdapter), "consul");

/// <summary>
/// Возвращает список живых инстансов сервиса
/// </summary>
/// <returns></returns>
public async Task<OperationResult<List<ServiceEntry>>> GetPassingServiceInstancesAsync(
string serviceName,
string environment,
bool passingOnly = true)
{
try
{
using (_metricsFactory.CreateMetricsTimer("/health/service/:service"))
{
var fromConsul = await _consulClient.Health.Service(serviceName, environment, passingOnly);
if (fromConsul.StatusCode == HttpStatusCode.OK)
{
return new(fromConsul.Response?.ToList());
}

_logger.Error(
$"По запросу в консул {serviceName}:{environment}, вернулся ответ со статусом: {fromConsul.StatusCode}");
}
}
catch (Exception e)
{
_logger.Error(e);
}

return new(ActionStatus.InternalServerError);
}

public void Dispose()
{
_consulClient?.Dispose();
}
}
7 changes: 6 additions & 1 deletion ATI.Services.Consul/ConsulMetricsHttpClientWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace ATI.Services.Consul
/// Обертка, включающая в себя ConsulServiceAddress, TracingHttpClientWrapper и MetricsTracingFactory
/// </summary>
[PublicAPI]
public class ConsulMetricsHttpClientWrapper
public class ConsulMetricsHttpClientWrapper : IDisposable
{
private readonly BaseServiceOptions _serviceOptions;
private readonly MetricsHttpClientWrapper _clientWrapper;
Expand Down Expand Up @@ -328,5 +328,10 @@ private async Task<OperationResult<T>> SendAsync<T, TBody>(string url,
}
}
}

public void Dispose()
{
_serviceAddress?.Dispose();
}
}
}
62 changes: 37 additions & 25 deletions ATI.Services.Consul/ConsulServiceAddress.cs
Original file line number Diff line number Diff line change
@@ -1,45 +1,56 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using ATI.Services.Common.Behaviors;
using ATI.Services.Common.Extensions;
using Consul;
using NLog;


namespace ATI.Services.Consul
{
public class ConsulServiceAddress: IDisposable
public class ConsulServiceAddress : IDisposable
{
private readonly string _environment;
private readonly string _serviceName;
private readonly ILogger _logger = LogManager.GetCurrentClassLogger();
private readonly Timer _updateCacheTimer;
private ConsulServiceAddressCache CachedServices { get; }
private readonly Func<Task<List<ServiceEntry>>> _getServices;
private readonly ConsulServiceAddressCache _serviceAddressCache;
private readonly ConsulAdapter _consulAdapter;

public ConsulServiceAddress(string serviceName, string environment, TimeSpan? timeToReload = null, bool useCaching = true)
public ConsulServiceAddress(string serviceName,
string environment,
TimeSpan? timeToReload = null,
bool useCaching = true,
bool passingOnly = true)
{
timeToReload ??= TimeSpan.FromSeconds(5);
_environment = environment;
_serviceName = serviceName;

CachedServices = new ConsulServiceAddressCache(useCaching, _serviceName, _environment);

_updateCacheTimer = new Timer(_ => CachedServices.ReloadCache(), null, timeToReload.Value,
timeToReload.Value);
if (useCaching)
{
_serviceAddressCache = new ConsulServiceAddressCache(_serviceName, _environment, timeToReload.Value, passingOnly);
_getServices = () => Task.FromResult(_serviceAddressCache.GetCachedObjectsAsync());
}
else
{
_consulAdapter = new ConsulAdapter();
_getServices = async () =>
await _consulAdapter.GetPassingServiceInstancesAsync(serviceName, environment, passingOnly) is var result && result.Success
? result.Value
: new List<ServiceEntry>();
}
}

public async Task<List<ServiceEntry>> GetAllAsync()
{
return await CachedServices.GetCachedObjectsAsync();
}
public async Task<List<ServiceEntry>> GetAllAsync() => await _getServices();

public async Task<string> ToHttpAsync()
{
var serviceInfo = (await CachedServices.GetCachedObjectsAsync()).RandomItem();
var serviceInfo = (await GetAllAsync()).RandomItem();
var address = string.IsNullOrWhiteSpace(serviceInfo?.Service?.Address)
? serviceInfo?.Node.Address
: serviceInfo.Service.Address;
? serviceInfo?.Node.Address
: serviceInfo.Service.Address;

if (string.IsNullOrWhiteSpace(address) || serviceInfo.Service == null)
{
Expand All @@ -52,10 +63,10 @@ public async Task<string> ToHttpAsync()

public async Task<(string, int)> GetAddressAndPortAsync()
{
var serviceInfo = (await CachedServices.GetCachedObjectsAsync()).RandomItem();
var serviceInfo = (await GetAllAsync()).RandomItem();
var address = string.IsNullOrWhiteSpace(serviceInfo?.Service?.Address)
? serviceInfo?.Node.Address
: serviceInfo.Service.Address;
? serviceInfo?.Node.Address
: serviceInfo.Service.Address;

if (string.IsNullOrWhiteSpace(address) || serviceInfo.Service == null)
{
Expand All @@ -67,30 +78,31 @@ public async Task<string> ToHttpAsync()
}

#region Obsolete

[Obsolete("Method GetAll is deprecated, pls use GetAllAsync instead")]
public List<ServiceEntry> GetAll()
{
return GetAllAsync().GetAwaiter().GetResult();
}

[Obsolete("Method ToHttp is deprecated, pls use ToHttpAsync instead")]
public string ToHttp()
{
return ToHttpAsync().GetAwaiter().GetResult();
}

[Obsolete("Method GetAddressAndPort is deprecated, pls use GetAddressAndPortAsync instead")]
public (string, int) GetAddressAndPort()
{
return GetAddressAndPortAsync().GetAwaiter().GetResult();
}

#endregion

public void Dispose()
{
_updateCacheTimer.Dispose();
_serviceAddressCache?.Dispose();
_consulAdapter?.Dispose();
}
}
}
}
118 changes: 52 additions & 66 deletions ATI.Services.Consul/ConsulServiceAddressCache.cs
Original file line number Diff line number Diff line change
@@ -1,79 +1,65 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using ATI.Services.Common.Behaviors;
using ATI.Services.Common.Extensions;
using Consul;
using NLog;

namespace ATI.Services.Consul
namespace ATI.Services.Consul;

/// <summary>
/// Обеспечивает получение доступных сервисов от консула и их кеширование (опционально)
/// </summary>
internal class ConsulServiceAddressCache: IDisposable
{
private readonly string _serviceName;
private readonly string _environment;
private readonly bool _passingOnly;
private List<ServiceEntry> _cachedServices;
private readonly Timer _updateCacheTimer;
private Task<OperationResult<List<ServiceEntry>>> _updateCacheTask;
private readonly ConsulAdapter _consulAdapter;

public ConsulServiceAddressCache(string serviceName,
string environment,
TimeSpan ttl,
bool passingOnly = true)
{
_serviceName = serviceName;
_environment = environment;
_passingOnly = passingOnly;
_consulAdapter = new ConsulAdapter();
_updateCacheTask = _consulAdapter.GetPassingServiceInstancesAsync(_serviceName, _environment, passingOnly);
_cachedServices = _updateCacheTask.GetAwaiter().GetResult() is var result && result.Success
? result.Value
: new List<ServiceEntry>();

_updateCacheTimer = new Timer(_ => ReloadCache().Forget(), null, ttl, ttl);
}

/// <summary>
/// Обеспечивает получение доступных сервисов от консула и их кеширование (опционально)
/// Возвращает коллекцию сервисов
/// </summary>
public class ConsulServiceAddressCache
/// <returns></returns>
public List<ServiceEntry> GetCachedObjectsAsync() => _cachedServices;

/// <summary>
/// Запускает таску на обновление кеша
/// </summary>
private async Task ReloadCache()
{
private Task<List<ServiceEntry>> _reloadCacheTask;
private readonly ILogger _logger = LogManager.GetCurrentClassLogger();
private readonly bool _useCaching;
private readonly string _serviceName;
private readonly string _environment;

public ConsulServiceAddressCache(bool useCaching, string serviceName, string environment)
{
_useCaching = useCaching;
_serviceName = serviceName;
_environment = environment;

if (!_useCaching)
return;

_reloadCacheTask = GetServiceFromConsulAsync();
}

/// <summary>
/// Возвращает коллекцию сервисов
/// </summary>
/// <returns></returns>
public Task<List<ServiceEntry>> GetCachedObjectsAsync()
{
return _useCaching ? _reloadCacheTask : GetServiceFromConsulAsync();
}
if(_updateCacheTask == null || _updateCacheTask.IsCompleted)
_updateCacheTask = _consulAdapter.GetPassingServiceInstancesAsync(_serviceName, _environment, _passingOnly);

/// <summary>
/// Запускает таску на обновление кеша, если кеширование включено
/// </summary>
public void ReloadCache()
{
if (!_useCaching)
return;

if (!_reloadCacheTask.IsCompleted)
return;

_reloadCacheTask = GetServiceFromConsulAsync();
}
_cachedServices = await _updateCacheTask is var result && result.Success
? result.Value
: _cachedServices;
}

/// <summary>
/// Возвращает список живых сервисов
/// </summary>
/// <returns></returns>
private async Task<List<ServiceEntry>> GetServiceFromConsulAsync()
{
try
{
using var cc = new ConsulClient();
var fromConsul = await cc.Health.Service(_serviceName, _environment, true);
if (fromConsul.StatusCode == HttpStatusCode.OK && fromConsul.Response.Length > 0)
{
return fromConsul.Response.ToList();
}
}
catch (Exception e)
{
_logger.Error(e);
}
return new List<ServiceEntry>();
}
public void Dispose()
{
_updateCacheTimer.Dispose();
_consulAdapter.Dispose();
}
}

0 comments on commit 89a6a62

Please sign in to comment.