Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consul cache background update #21

Merged
merged 8 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions ATI.Services.Consul/ConsulAdapter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using ATI.Services.Common.Behaviors;
using ATI.Services.Common.Metrics;
using Consul;
using NLog;

namespace ATI.Services.Consul;

internal class ConsulAdapter: IDisposable
{
private readonly ILogger _logger = LogManager.GetCurrentClassLogger();
private readonly ConsulClient _consulClient = new();
private readonly MetricsFactory _metricsFactory = MetricsFactory.CreateHttpClientMetricsFactory(nameof(ConsulAdapter), "consul");

/// <summary>
/// Возвращает список живых инстансов сервиса
/// </summary>
/// <returns></returns>
public async Task<OperationResult<List<ServiceEntry>>> GetPassingServiceInstancesAsync(
string serviceName,
string environment,
bool passingOnly = true)
{
try
{
using (_metricsFactory.CreateMetricsTimer("/health/service/:service"))
{
var fromConsul = await _consulClient.Health.Service(serviceName, environment, passingOnly);
if (fromConsul.StatusCode == HttpStatusCode.OK)
{
return new(fromConsul.Response?.ToList());
}

_logger.Error(
$"По запросу в консул {serviceName}:{environment}, вернулся ответ со статусом: {fromConsul.StatusCode}");
}
}
catch (Exception e)
{
_logger.Error(e);
}

return new(ActionStatus.InternalServerError);
}

public void Dispose()
{
_consulClient?.Dispose();
}
}
7 changes: 6 additions & 1 deletion ATI.Services.Consul/ConsulMetricsHttpClientWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace ATI.Services.Consul
/// Обертка, включающая в себя ConsulServiceAddress, TracingHttpClientWrapper и MetricsTracingFactory
/// </summary>
[PublicAPI]
public class ConsulMetricsHttpClientWrapper
public class ConsulMetricsHttpClientWrapper : IDisposable
{
private readonly BaseServiceOptions _serviceOptions;
private readonly MetricsHttpClientWrapper _clientWrapper;
Expand Down Expand Up @@ -297,5 +297,10 @@ private async Task<OperationResult<T>> SendAsync<T, TBody>(string url,
}
}
}

public void Dispose()
{
_serviceAddress?.Dispose();
}
}
}
62 changes: 37 additions & 25 deletions ATI.Services.Consul/ConsulServiceAddress.cs
Original file line number Diff line number Diff line change
@@ -1,45 +1,56 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using ATI.Services.Common.Behaviors;
using ATI.Services.Common.Extensions;
using Consul;
using NLog;


namespace ATI.Services.Consul
{
public class ConsulServiceAddress: IDisposable
public class ConsulServiceAddress : IDisposable
{
private readonly string _environment;
private readonly string _serviceName;
private readonly ILogger _logger = LogManager.GetCurrentClassLogger();
private readonly Timer _updateCacheTimer;
private ConsulServiceAddressCache CachedServices { get; }
private readonly Func<Task<List<ServiceEntry>>> _getServices;
private readonly ConsulServiceAddressCache _serviceAddressCache;
private readonly ConsulAdapter _consulAdapter;

public ConsulServiceAddress(string serviceName, string environment, TimeSpan? timeToReload = null, bool useCaching = true)
public ConsulServiceAddress(string serviceName,
string environment,
TimeSpan? timeToReload = null,
bool useCaching = true,
bool passingOnly = true)
{
timeToReload ??= TimeSpan.FromSeconds(5);
_environment = environment;
_serviceName = serviceName;

CachedServices = new ConsulServiceAddressCache(useCaching, _serviceName, _environment);

_updateCacheTimer = new Timer(_ => CachedServices.ReloadCache(), null, timeToReload.Value,
timeToReload.Value);
if (useCaching)
{
_serviceAddressCache = new ConsulServiceAddressCache(_serviceName, _environment, timeToReload.Value, passingOnly);
_getServices = () => Task.FromResult(_serviceAddressCache.GetCachedObjectsAsync());
}
else
{
_consulAdapter = new ConsulAdapter();
_getServices = async () =>
await _consulAdapter.GetPassingServiceInstancesAsync(serviceName, environment, passingOnly) is var result && result.Success
? result.Value
: new List<ServiceEntry>();
}
}

public async Task<List<ServiceEntry>> GetAllAsync()
{
return await CachedServices.GetCachedObjectsAsync();
}
public async Task<List<ServiceEntry>> GetAllAsync() => await _getServices();

public async Task<string> ToHttpAsync()
{
var serviceInfo = (await CachedServices.GetCachedObjectsAsync()).RandomItem();
var serviceInfo = (await GetAllAsync()).RandomItem();
var address = string.IsNullOrWhiteSpace(serviceInfo?.Service?.Address)
? serviceInfo?.Node.Address
: serviceInfo.Service.Address;
? serviceInfo?.Node.Address
: serviceInfo.Service.Address;

if (string.IsNullOrWhiteSpace(address) || serviceInfo.Service == null)
{
Expand All @@ -52,10 +63,10 @@ public async Task<string> ToHttpAsync()

public async Task<(string, int)> GetAddressAndPortAsync()
{
var serviceInfo = (await CachedServices.GetCachedObjectsAsync()).RandomItem();
var serviceInfo = (await GetAllAsync()).RandomItem();
var address = string.IsNullOrWhiteSpace(serviceInfo?.Service?.Address)
? serviceInfo?.Node.Address
: serviceInfo.Service.Address;
? serviceInfo?.Node.Address
: serviceInfo.Service.Address;

if (string.IsNullOrWhiteSpace(address) || serviceInfo.Service == null)
{
Expand All @@ -67,30 +78,31 @@ public async Task<string> ToHttpAsync()
}

#region Obsolete

[Obsolete("Method GetAll is deprecated, pls use GetAllAsync instead")]
public List<ServiceEntry> GetAll()
{
return GetAllAsync().GetAwaiter().GetResult();
}

[Obsolete("Method ToHttp is deprecated, pls use ToHttpAsync instead")]
public string ToHttp()
{
return ToHttpAsync().GetAwaiter().GetResult();
}

[Obsolete("Method GetAddressAndPort is deprecated, pls use GetAddressAndPortAsync instead")]
public (string, int) GetAddressAndPort()
{
return GetAddressAndPortAsync().GetAwaiter().GetResult();
}

#endregion

public void Dispose()
{
_updateCacheTimer.Dispose();
_serviceAddressCache?.Dispose();
_consulAdapter?.Dispose();
}
}
}
}
118 changes: 52 additions & 66 deletions ATI.Services.Consul/ConsulServiceAddressCache.cs
Original file line number Diff line number Diff line change
@@ -1,79 +1,65 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using ATI.Services.Common.Behaviors;
using ATI.Services.Common.Extensions;
using Consul;
using NLog;

namespace ATI.Services.Consul
namespace ATI.Services.Consul;

/// <summary>
/// Обеспечивает получение доступных сервисов от консула и их кеширование (опционально)
/// </summary>
internal class ConsulServiceAddressCache: IDisposable
{
private readonly string _serviceName;
private readonly string _environment;
private readonly bool _passingOnly;
private List<ServiceEntry> _cachedServices;
private readonly Timer _updateCacheTimer;
private Task<OperationResult<List<ServiceEntry>>> _updateCacheTask;
private readonly ConsulAdapter _consulAdapter;

public ConsulServiceAddressCache(string serviceName,
string environment,
TimeSpan ttl,
bool passingOnly = true)
{
_serviceName = serviceName;
_environment = environment;
_passingOnly = passingOnly;
_consulAdapter = new ConsulAdapter();
_updateCacheTask = _consulAdapter.GetPassingServiceInstancesAsync(_serviceName, _environment, passingOnly);
_cachedServices = _updateCacheTask.GetAwaiter().GetResult() is var result && result.Success
? result.Value
: new List<ServiceEntry>();

_updateCacheTimer = new Timer(_ => ReloadCache().Forget(), null, ttl, ttl);
}

/// <summary>
/// Обеспечивает получение доступных сервисов от консула и их кеширование (опционально)
/// Возвращает коллекцию сервисов
/// </summary>
public class ConsulServiceAddressCache
/// <returns></returns>
public List<ServiceEntry> GetCachedObjectsAsync() => _cachedServices;

/// <summary>
/// Запускает таску на обновление кеша
/// </summary>
private async Task ReloadCache()
{
private Task<List<ServiceEntry>> _reloadCacheTask;
private readonly ILogger _logger = LogManager.GetCurrentClassLogger();
private readonly bool _useCaching;
private readonly string _serviceName;
private readonly string _environment;

public ConsulServiceAddressCache(bool useCaching, string serviceName, string environment)
{
_useCaching = useCaching;
_serviceName = serviceName;
_environment = environment;

if (!_useCaching)
return;

_reloadCacheTask = GetServiceFromConsulAsync();
}

/// <summary>
/// Возвращает коллекцию сервисов
/// </summary>
/// <returns></returns>
public Task<List<ServiceEntry>> GetCachedObjectsAsync()
{
return _useCaching ? _reloadCacheTask : GetServiceFromConsulAsync();
}
if(_updateCacheTask == null || _updateCacheTask.IsCompleted)
_updateCacheTask = _consulAdapter.GetPassingServiceInstancesAsync(_serviceName, _environment, _passingOnly);

/// <summary>
/// Запускает таску на обновление кеша, если кеширование включено
/// </summary>
public void ReloadCache()
{
if (!_useCaching)
return;

if (!_reloadCacheTask.IsCompleted)
return;

_reloadCacheTask = GetServiceFromConsulAsync();
}
_cachedServices = await _updateCacheTask is var result && result.Success
? result.Value
: _cachedServices;
}

/// <summary>
/// Возвращает список живых сервисов
/// </summary>
/// <returns></returns>
private async Task<List<ServiceEntry>> GetServiceFromConsulAsync()
{
try
{
using var cc = new ConsulClient();
var fromConsul = await cc.Health.Service(_serviceName, _environment, true);
if (fromConsul.StatusCode == HttpStatusCode.OK && fromConsul.Response.Length > 0)
{
return fromConsul.Response.ToList();
}
}
catch (Exception e)
{
_logger.Error(e);
}
return new List<ServiceEntry>();
}
public void Dispose()
{
_updateCacheTimer.Dispose();
Ba-Ski marked this conversation as resolved.
Show resolved Hide resolved
_consulAdapter.Dispose();
}
}
Loading