Skip to content

Commit

Permalink
Background cache update for ConsulServiceAddressCache.cs
Browse files Browse the repository at this point in the history
  • Loading branch information
muphblu committed Nov 13, 2023
1 parent 294c363 commit 7492da4
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 92 deletions.
45 changes: 45 additions & 0 deletions ATI.Services.Consul/ConsulAdapter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using Consul;
using NLog;

namespace ATI.Services.Consul;

internal class ConsulAdapter: IDisposable
{
private readonly ILogger _logger = LogManager.GetCurrentClassLogger();
private ConsulClient _consulClient;

/// <summary>
/// Возвращает список живых сервисов
/// </summary>
/// <returns></returns>
public async Task<List<ServiceEntry>> GetPassingServiceInstancesAsync(string serviceName, string environment, bool passingOnly = true, ulong index = 0, TimeSpan? waitTime = null)
{
try
{
_consulClient = new ConsulClient();
var fromConsul = await _consulClient.Health.Service(serviceName, environment, passingOnly, new QueryOptions { WaitIndex = index, WaitTime = waitTime});
if (fromConsul.StatusCode == HttpStatusCode.OK)
{
return fromConsul.Response?.ToList();
}

_logger.Error($"По запросу в консул {serviceName}:{environment}, вернулся ответ со статусом: {fromConsul.StatusCode}");
}
catch (Exception e)
{
_logger.Error(e);
}

return new List<ServiceEntry>();
}

public void Dispose()
{
_consulClient?.Dispose();
}
}
7 changes: 6 additions & 1 deletion ATI.Services.Consul/ConsulMetricsHttpClientWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace ATI.Services.Consul
/// Обертка, включающая в себя ConsulServiceAddress, TracingHttpClientWrapper и MetricsTracingFactory
/// </summary>
[PublicAPI]
public class ConsulMetricsHttpClientWrapper
public class ConsulMetricsHttpClientWrapper : IDisposable
{
private readonly BaseServiceOptions _serviceOptions;
private readonly MetricsHttpClientWrapper _clientWrapper;
Expand Down Expand Up @@ -297,5 +297,10 @@ private async Task<OperationResult<T>> SendAsync<T, TBody>(string url,
}
}
}

public void Dispose()
{
_serviceAddress?.Dispose();
}
}
}
58 changes: 33 additions & 25 deletions ATI.Services.Consul/ConsulServiceAddress.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using ATI.Services.Common.Extensions;
using Consul;
Expand All @@ -9,37 +8,45 @@

namespace ATI.Services.Consul
{
public class ConsulServiceAddress: IDisposable
public class ConsulServiceAddress : IDisposable
{
private readonly string _environment;
private readonly string _serviceName;
private readonly ILogger _logger = LogManager.GetCurrentClassLogger();
private readonly Timer _updateCacheTimer;
private ConsulServiceAddressCache CachedServices { get; }
private readonly Func<Task<List<ServiceEntry>>> _getServices;
private readonly ConsulServiceAddressCache _serviceAddressCache;
private readonly ConsulAdapter _consulAdapter;

public ConsulServiceAddress(string serviceName, string environment, TimeSpan? timeToReload = null, bool useCaching = true)
public ConsulServiceAddress(string serviceName,
string environment,
TimeSpan? timeToReload = null,
bool useCaching = true)
{
timeToReload ??= TimeSpan.FromSeconds(5);
_environment = environment;
_serviceName = serviceName;

CachedServices = new ConsulServiceAddressCache(useCaching, _serviceName, _environment);

_updateCacheTimer = new Timer(_ => CachedServices.ReloadCache(), null, timeToReload.Value,
timeToReload.Value);
if (useCaching)
{
_serviceAddressCache = new ConsulServiceAddressCache(_serviceName, _environment, timeToReload.Value);
_getServices = () => Task.FromResult(_serviceAddressCache.GetCachedObjectsAsync());
}
else
{
_consulAdapter = new ConsulAdapter();
_getServices = async () =>
await _consulAdapter.GetPassingServiceInstancesAsync(serviceName, environment);
}
}

public async Task<List<ServiceEntry>> GetAllAsync()
{
return await CachedServices.GetCachedObjectsAsync();
}
public Task<List<ServiceEntry>> GetAllAsync() => _getServices();

public async Task<string> ToHttpAsync()
{
var serviceInfo = (await CachedServices.GetCachedObjectsAsync()).RandomItem();
var serviceInfo = (await _getServices()).RandomItem();
var address = string.IsNullOrWhiteSpace(serviceInfo?.Service?.Address)
? serviceInfo?.Node.Address
: serviceInfo.Service.Address;
? serviceInfo?.Node.Address
: serviceInfo.Service.Address;

if (string.IsNullOrWhiteSpace(address) || serviceInfo.Service == null)
{
Expand All @@ -52,10 +59,10 @@ public async Task<string> ToHttpAsync()

public async Task<(string, int)> GetAddressAndPortAsync()
{
var serviceInfo = (await CachedServices.GetCachedObjectsAsync()).RandomItem();
var serviceInfo = (await _getServices()).RandomItem();
var address = string.IsNullOrWhiteSpace(serviceInfo?.Service?.Address)
? serviceInfo?.Node.Address
: serviceInfo.Service.Address;
? serviceInfo?.Node.Address
: serviceInfo.Service.Address;

if (string.IsNullOrWhiteSpace(address) || serviceInfo.Service == null)
{
Expand All @@ -67,30 +74,31 @@ public async Task<string> ToHttpAsync()
}

#region Obsolete

[Obsolete("Method GetAll is deprecated, pls use GetAllAsync instead")]
public List<ServiceEntry> GetAll()
{
return GetAllAsync().GetAwaiter().GetResult();
}

[Obsolete("Method ToHttp is deprecated, pls use ToHttpAsync instead")]
public string ToHttp()
{
return ToHttpAsync().GetAwaiter().GetResult();
}

[Obsolete("Method GetAddressAndPort is deprecated, pls use GetAddressAndPortAsync instead")]
public (string, int) GetAddressAndPort()
{
return GetAddressAndPortAsync().GetAwaiter().GetResult();
}

#endregion

public void Dispose()
{
_updateCacheTimer.Dispose();
_serviceAddressCache?.Dispose();
_consulAdapter?.Dispose();
}
}
}
}
104 changes: 38 additions & 66 deletions ATI.Services.Consul/ConsulServiceAddressCache.cs
Original file line number Diff line number Diff line change
@@ -1,79 +1,51 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Consul;
using NLog;

namespace ATI.Services.Consul
namespace ATI.Services.Consul;

/// <summary>
/// Обеспечивает получение доступных сервисов от консула и их кеширование (опционально)
/// </summary>
internal class ConsulServiceAddressCache: IDisposable
{
private readonly string _serviceName;
private readonly string _environment;
private List<ServiceEntry> _cachedServices;
private readonly Timer _updateCacheTimer;
private Task<List<ServiceEntry>> _updateCacheTask;
private readonly ConsulAdapter _consulAdapter;

public ConsulServiceAddressCache(string serviceName, string environment, TimeSpan ttl)
{
_serviceName = serviceName;
_environment = environment;
_consulAdapter = new ConsulAdapter();
_updateCacheTimer = new Timer(_ => ReloadCache(), null, ttl, ttl);
_cachedServices = _consulAdapter.GetPassingServiceInstancesAsync(serviceName, environment).GetAwaiter().GetResult();
}

/// <summary>
/// Обеспечивает получение доступных сервисов от консула и их кеширование (опционально)
/// Возвращает коллекцию сервисов
/// </summary>
public class ConsulServiceAddressCache
{
private Task<List<ServiceEntry>> _reloadCacheTask;
private readonly ILogger _logger = LogManager.GetCurrentClassLogger();
private readonly bool _useCaching;
private readonly string _serviceName;
private readonly string _environment;
/// <returns></returns>
public List<ServiceEntry> GetCachedObjectsAsync() => _cachedServices;

public ConsulServiceAddressCache(bool useCaching, string serviceName, string environment)
{
_useCaching = useCaching;
_serviceName = serviceName;
_environment = environment;

if (!_useCaching)
return;

_reloadCacheTask = GetServiceFromConsulAsync();
}
/// <summary>
/// Запускает таску на обновление кеша
/// </summary>
private void ReloadCache()
{
if(_updateCacheTask.IsCompleted)
_updateCacheTask = _consulAdapter.GetPassingServiceInstancesAsync(_serviceName, _environment);

/// <summary>
/// Возвращает коллекцию сервисов
/// </summary>
/// <returns></returns>
public Task<List<ServiceEntry>> GetCachedObjectsAsync()
{
return _useCaching ? _reloadCacheTask : GetServiceFromConsulAsync();
}

/// <summary>
/// Запускает таску на обновление кеша, если кеширование включено
/// </summary>
public void ReloadCache()
{
if (!_useCaching)
return;

if (!_reloadCacheTask.IsCompleted)
return;

_reloadCacheTask = GetServiceFromConsulAsync();
}
_cachedServices = _updateCacheTask.GetAwaiter().GetResult();
}

/// <summary>
/// Возвращает список живых сервисов
/// </summary>
/// <returns></returns>
private async Task<List<ServiceEntry>> GetServiceFromConsulAsync()
{
try
{
using var cc = new ConsulClient();
var fromConsul = await cc.Health.Service(_serviceName, _environment, true);
if (fromConsul.StatusCode == HttpStatusCode.OK && fromConsul.Response.Length > 0)
{
return fromConsul.Response.ToList();
}
}
catch (Exception e)
{
_logger.Error(e);
}
return new List<ServiceEntry>();
}
public void Dispose()
{
_updateCacheTimer.Dispose();
}
}

0 comments on commit 7492da4

Please sign in to comment.