Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consul cache background update #21

Merged
merged 8 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions ATI.Services.Consul/ConsulAdapter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using ATI.Services.Common.Metrics;
using Consul;
using NLog;

namespace ATI.Services.Consul;

internal class ConsulAdapter: IDisposable
{
private readonly ILogger _logger = LogManager.GetCurrentClassLogger();
private readonly ConsulClient _consulClient = new();
private readonly MetricsFactory _metricsFactory = MetricsFactory.CreateHttpClientMetricsFactory(nameof(ConsulAdapter), "consul");

/// <summary>
/// Возвращает список живых сервисов
/// </summary>
/// <returns></returns>
public async Task<List<ServiceEntry>> GetPassingServiceInstancesAsync(string serviceName, string environment, bool passingOnly = true)
{

CptnSnail marked this conversation as resolved.
Show resolved Hide resolved

try
{
using (_metricsFactory.CreateMetricsTimer(nameof(GetPassingServiceInstancesAsync)))
CptnSnail marked this conversation as resolved.
Show resolved Hide resolved
{
var fromConsul = await _consulClient.Health.Service(serviceName, environment, passingOnly);
if (fromConsul.StatusCode == HttpStatusCode.OK)
{
return fromConsul.Response?.ToList();
}

_logger.Error($"По запросу в консул {serviceName}:{environment}, вернулся ответ со статусом: {fromConsul.StatusCode}");
}
}
catch (Exception e)
{
_logger.Error(e);
}

return new List<ServiceEntry>();
}

public void Dispose()
{
_consulClient?.Dispose();
}
}
7 changes: 6 additions & 1 deletion ATI.Services.Consul/ConsulMetricsHttpClientWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace ATI.Services.Consul
/// Обертка, включающая в себя ConsulServiceAddress, TracingHttpClientWrapper и MetricsTracingFactory
/// </summary>
[PublicAPI]
public class ConsulMetricsHttpClientWrapper
public class ConsulMetricsHttpClientWrapper : IDisposable
{
private readonly BaseServiceOptions _serviceOptions;
private readonly MetricsHttpClientWrapper _clientWrapper;
Expand Down Expand Up @@ -297,5 +297,10 @@ private async Task<OperationResult<T>> SendAsync<T, TBody>(string url,
}
}
}

public void Dispose()
{
_serviceAddress?.Dispose();
}
}
}
59 changes: 34 additions & 25 deletions ATI.Services.Consul/ConsulServiceAddress.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using ATI.Services.Common.Extensions;
using Consul;
Expand All @@ -9,37 +8,46 @@

namespace ATI.Services.Consul
{
public class ConsulServiceAddress: IDisposable
public class ConsulServiceAddress : IDisposable
{
private readonly string _environment;
private readonly string _serviceName;
private readonly ILogger _logger = LogManager.GetCurrentClassLogger();
private readonly Timer _updateCacheTimer;
private ConsulServiceAddressCache CachedServices { get; }
private readonly Func<Task<List<ServiceEntry>>> _getServices;
private readonly ConsulServiceAddressCache _serviceAddressCache;
private readonly ConsulAdapter _consulAdapter;

public ConsulServiceAddress(string serviceName, string environment, TimeSpan? timeToReload = null, bool useCaching = true)
public ConsulServiceAddress(string serviceName,
string environment,
TimeSpan? timeToReload = null,
bool useCaching = true,
bool passingOnly = true)
{
timeToReload ??= TimeSpan.FromSeconds(5);
_environment = environment;
_serviceName = serviceName;

CachedServices = new ConsulServiceAddressCache(useCaching, _serviceName, _environment);

_updateCacheTimer = new Timer(_ => CachedServices.ReloadCache(), null, timeToReload.Value,
timeToReload.Value);
if (useCaching)
{
_serviceAddressCache = new ConsulServiceAddressCache(_serviceName, _environment, timeToReload.Value, passingOnly);
_getServices = () => Task.FromResult(_serviceAddressCache.GetCachedObjectsAsync());
}
else
{
_consulAdapter = new ConsulAdapter();
_getServices = async () =>
await _consulAdapter.GetPassingServiceInstancesAsync(serviceName, environment, passingOnly);
}
}

public async Task<List<ServiceEntry>> GetAllAsync()
{
return await CachedServices.GetCachedObjectsAsync();
}
public Task<List<ServiceEntry>> GetAllAsync() => _getServices();

public async Task<string> ToHttpAsync()
{
var serviceInfo = (await CachedServices.GetCachedObjectsAsync()).RandomItem();
var serviceInfo = (await _getServices()).RandomItem();
var address = string.IsNullOrWhiteSpace(serviceInfo?.Service?.Address)
? serviceInfo?.Node.Address
: serviceInfo.Service.Address;
? serviceInfo?.Node.Address
: serviceInfo.Service.Address;

if (string.IsNullOrWhiteSpace(address) || serviceInfo.Service == null)
{
Expand All @@ -52,10 +60,10 @@ public async Task<string> ToHttpAsync()

public async Task<(string, int)> GetAddressAndPortAsync()
{
var serviceInfo = (await CachedServices.GetCachedObjectsAsync()).RandomItem();
var serviceInfo = (await _getServices()).RandomItem();
var address = string.IsNullOrWhiteSpace(serviceInfo?.Service?.Address)
? serviceInfo?.Node.Address
: serviceInfo.Service.Address;
? serviceInfo?.Node.Address
: serviceInfo.Service.Address;

if (string.IsNullOrWhiteSpace(address) || serviceInfo.Service == null)
{
Expand All @@ -67,30 +75,31 @@ public async Task<string> ToHttpAsync()
}

#region Obsolete

[Obsolete("Method GetAll is deprecated, pls use GetAllAsync instead")]
public List<ServiceEntry> GetAll()
{
return GetAllAsync().GetAwaiter().GetResult();
}

[Obsolete("Method ToHttp is deprecated, pls use ToHttpAsync instead")]
public string ToHttp()
{
return ToHttpAsync().GetAwaiter().GetResult();
}

[Obsolete("Method GetAddressAndPort is deprecated, pls use GetAddressAndPortAsync instead")]
public (string, int) GetAddressAndPort()
{
return GetAddressAndPortAsync().GetAwaiter().GetResult();
}

#endregion

public void Dispose()
{
_updateCacheTimer.Dispose();
_serviceAddressCache?.Dispose();
_consulAdapter?.Dispose();
}
}
}
}
110 changes: 44 additions & 66 deletions ATI.Services.Consul/ConsulServiceAddressCache.cs
Original file line number Diff line number Diff line change
@@ -1,79 +1,57 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Consul;
using NLog;

namespace ATI.Services.Consul
namespace ATI.Services.Consul;

/// <summary>
/// Обеспечивает получение доступных сервисов от консула и их кеширование (опционально)
/// </summary>
internal class ConsulServiceAddressCache: IDisposable
{
private readonly string _serviceName;
private readonly string _environment;
private readonly bool _passingOnly;
private List<ServiceEntry> _cachedServices;
private readonly Timer _updateCacheTimer;
private Task<List<ServiceEntry>> _updateCacheTask;
private readonly ConsulAdapter _consulAdapter;

public ConsulServiceAddressCache(string serviceName,
string environment,
TimeSpan ttl,
bool passingOnly = true)
{
_serviceName = serviceName;
_environment = environment;
_passingOnly = passingOnly;
_consulAdapter = new ConsulAdapter();
_updateCacheTask = _consulAdapter.GetPassingServiceInstancesAsync(_serviceName, _environment, passingOnly);
_cachedServices = _updateCacheTask.GetAwaiter().GetResult();
_updateCacheTimer = new Timer(_ => ReloadCache(), null, ttl, ttl);
Ba-Ski marked this conversation as resolved.
Show resolved Hide resolved
}

/// <summary>
/// Обеспечивает получение доступных сервисов от консула и их кеширование (опционально)
/// Возвращает коллекцию сервисов
/// </summary>
public class ConsulServiceAddressCache
{
private Task<List<ServiceEntry>> _reloadCacheTask;
private readonly ILogger _logger = LogManager.GetCurrentClassLogger();
private readonly bool _useCaching;
private readonly string _serviceName;
private readonly string _environment;
/// <returns></returns>
public List<ServiceEntry> GetCachedObjectsAsync() => _cachedServices;

public ConsulServiceAddressCache(bool useCaching, string serviceName, string environment)
{
_useCaching = useCaching;
_serviceName = serviceName;
_environment = environment;

if (!_useCaching)
return;

_reloadCacheTask = GetServiceFromConsulAsync();
}
/// <summary>
/// Запускает таску на обновление кеша
/// </summary>
private void ReloadCache()
muphblu marked this conversation as resolved.
Show resolved Hide resolved
{
if(_updateCacheTask == null || _updateCacheTask.IsCompleted)
_updateCacheTask = _consulAdapter.GetPassingServiceInstancesAsync(_serviceName, _environment, _passingOnly);

/// <summary>
/// Возвращает коллекцию сервисов
/// </summary>
/// <returns></returns>
public Task<List<ServiceEntry>> GetCachedObjectsAsync()
{
return _useCaching ? _reloadCacheTask : GetServiceFromConsulAsync();
}

/// <summary>
/// Запускает таску на обновление кеша, если кеширование включено
/// </summary>
public void ReloadCache()
{
if (!_useCaching)
return;

if (!_reloadCacheTask.IsCompleted)
return;

_reloadCacheTask = GetServiceFromConsulAsync();
}
_cachedServices = _updateCacheTask.GetAwaiter().GetResult();
}

/// <summary>
/// Возвращает список живых сервисов
/// </summary>
/// <returns></returns>
private async Task<List<ServiceEntry>> GetServiceFromConsulAsync()
{
try
{
using var cc = new ConsulClient();
var fromConsul = await cc.Health.Service(_serviceName, _environment, true);
if (fromConsul.StatusCode == HttpStatusCode.OK && fromConsul.Response.Length > 0)
{
return fromConsul.Response.ToList();
}
}
catch (Exception e)
{
_logger.Error(e);
}
return new List<ServiceEntry>();
}
public void Dispose()
{
_updateCacheTimer.Dispose();
Ba-Ski marked this conversation as resolved.
Show resolved Hide resolved
}
}
Loading