Skip to content
This repository has been archived by the owner on Mar 26, 2019. It is now read-only.

Used Polly as execution engine for HTTP calls #28

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

<ItemGroup>
<PackageReference Include="App.Metrics.Abstractions" Version="$(AppMetricsCoreVersion)" />
<PackageReference Include="Polly-Signed" Version="5.8.0" />
</ItemGroup>

</Project>
153 changes: 68 additions & 85 deletions src/App.Metrics.Reporting.InfluxDB/Client/DefaultLineProtocolClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,25 @@
// </copyright>

using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using App.Metrics.Logging;
using Polly;
using Polly.CircuitBreaker;

namespace App.Metrics.Reporting.InfluxDB.Client
{
public class DefaultLineProtocolClient : ILineProtocolClient
{
private static readonly ILog Logger = LogProvider.For<DefaultLineProtocolClient>();

private static long _backOffTicks;
private static long _failureAttempts;
private static long _failuresBeforeBackoff;
private static TimeSpan _backOffPeriod;

private readonly HttpClient _httpClient;
private readonly InfluxDbOptions _influxDbOptions;
private readonly Policy<LineProtocolWriteResult> _executionPolicy;

public DefaultLineProtocolClient(
InfluxDbOptions influxDbOptions,
Expand All @@ -31,112 +30,96 @@ public DefaultLineProtocolClient(
{
_influxDbOptions = influxDbOptions ?? throw new ArgumentNullException(nameof(influxDbOptions));
_httpClient = httpClient;
_backOffPeriod = httpPolicy?.BackoffPeriod ?? throw new ArgumentNullException(nameof(httpPolicy));
_failuresBeforeBackoff = httpPolicy.FailuresBeforeBackoff;
_failureAttempts = 0;
var backOffPeriod = httpPolicy?.BackoffPeriod ?? throw new ArgumentNullException(nameof(httpPolicy));
var failuresBeforeBackoff = httpPolicy.FailuresBeforeBackoff;

var circutBreaker = Policy<LineProtocolWriteResult>
.Handle<Exception>()
.OrResult(result => !result.Success)
.CircuitBreakerAsync(failuresBeforeBackoff, backOffPeriod);
_executionPolicy = Policy<LineProtocolWriteResult>
.Handle<BrokenCircuitException>()
.FallbackAsync(LineProtocolWriteResult.Error("Too many failures in writing to InfluxDB, Circuit Opened"))
.WrapAsync(circutBreaker);
}

public async Task<LineProtocolWriteResult> WriteAsync(
public Task<LineProtocolWriteResult> WriteAsync(
string payload,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(payload))
{
return new LineProtocolWriteResult(true);
}

if (NeedToBackoff())
return _executionPolicy.ExecuteAsync(
async (context, cancelation) =>
{
return new LineProtocolWriteResult(false, "Too many failures in writing to InfluxDB, Circuit Opened");
}

try
{
var content = new StringContent(payload, Encoding.UTF8);

var response = await _httpClient.PostAsync(_influxDbOptions.Endpoint, content, cancellationToken);

if (response.StatusCode == HttpStatusCode.NotFound && _influxDbOptions.CreateDataBaseIfNotExists)
if (string.IsNullOrWhiteSpace((string)context["payload"]))
{
await TryCreateDatabase(cancellationToken);

response = await _httpClient.PostAsync(_influxDbOptions.Endpoint, content, cancellationToken);
return LineProtocolWriteResult.Ok();
}

if (!response.IsSuccessStatusCode)
try
{
Interlocked.Increment(ref _failureAttempts);
var content = new StringContent((string)context["payload"], Encoding.UTF8);

var errorMessage = $"Failed to write to InfluxDB - StatusCode: {response.StatusCode} Reason: {response.ReasonPhrase}";
Logger.Error(errorMessage);
var response = await _httpClient.PostAsync(_influxDbOptions.Endpoint, content, cancelation);

return new LineProtocolWriteResult(false, errorMessage);
}
if (response.StatusCode == HttpStatusCode.NotFound && _influxDbOptions.CreateDataBaseIfNotExists)
{
await TryCreateDatabase(cancelation);

Logger.Trace("Successful write to InfluxDB");
response = await _httpClient.PostAsync(_influxDbOptions.Endpoint, content, cancelation);
}

return new LineProtocolWriteResult(true);
}
catch (Exception ex)
{
Interlocked.Increment(ref _failureAttempts);
Logger.Error(ex, "Failed to write to InfluxDB");
return new LineProtocolWriteResult(false, ex.ToString());
}
}
if (!response.IsSuccessStatusCode)
{
var errorMessage = $"Failed to write to InfluxDB - StatusCode: {response.StatusCode} Reason: {response.ReasonPhrase}";
Logger.Error(errorMessage);

private async Task<LineProtocolWriteResult> TryCreateDatabase(CancellationToken cancellationToken = default)
{
try
{
Logger.Trace($"Attempting to create InfluxDB Database '{_influxDbOptions.Database}'");

var content = new StringContent(string.Empty, Encoding.UTF8);
return LineProtocolWriteResult.Error(errorMessage);
}

var response = await _httpClient.PostAsync($"query?q=CREATE DATABASE \"{Uri.EscapeDataString(_influxDbOptions.Database)}\"", content, cancellationToken);
Logger.Trace("Successful write to InfluxDB");

if (!response.IsSuccessStatusCode)
return LineProtocolWriteResult.Ok();
}
catch (Exception ex)
{
var errorMessage = $"Failed to create InfluxDB Database '{_influxDbOptions.Database}' - StatusCode: {response.StatusCode} Reason: {response.ReasonPhrase}";
Logger.Error(errorMessage);

return new LineProtocolWriteResult(false, errorMessage);
Logger.Error(ex, "Failed to write to InfluxDB");
throw;
}

Logger.Trace($"Successfully created InfluxDB Database '{_influxDbOptions.Database}'");

return new LineProtocolWriteResult(true);
}
catch (Exception ex)
{
Logger.Error(ex, $"Failed to create InfluxDB Database'{_influxDbOptions.Database}'");
return new LineProtocolWriteResult(false, ex.ToString());
}
},
new Dictionary<string, object> { ["payload"] = payload },
cancellationToken);
}

private bool NeedToBackoff()
private Task<LineProtocolWriteResult> TryCreateDatabase(CancellationToken cancellationToken = default)
{
if (Interlocked.Read(ref _failureAttempts) < _failuresBeforeBackoff)
{
return false;
}
return _executionPolicy.ExecuteAsync(
async (cancelation) =>
{
try
{
Logger.Trace($"Attempting to create InfluxDB Database '{_influxDbOptions.Database}'");

Logger.Error($"InfluxDB write backoff for {_backOffPeriod.Seconds} secs");
var content = new StringContent(string.Empty, Encoding.UTF8);

if (Interlocked.Read(ref _backOffTicks) == 0)
{
Interlocked.Exchange(ref _backOffTicks, DateTime.UtcNow.Add(_backOffPeriod).Ticks);
}
var response = await _httpClient.PostAsync($"query?q=CREATE DATABASE \"{Uri.EscapeDataString(_influxDbOptions.Database)}\"", content, cancelation);

if (DateTime.UtcNow.Ticks <= Interlocked.Read(ref _backOffTicks))
{
return true;
}
if (!response.IsSuccessStatusCode)
{
var errorMessage = $"Failed to create InfluxDB Database '{_influxDbOptions.Database}' - StatusCode: {response.StatusCode} Reason: {response.ReasonPhrase}";
Logger.Error(errorMessage);

return LineProtocolWriteResult.Error(errorMessage);
}

Interlocked.Exchange(ref _failureAttempts, 0);
Interlocked.Exchange(ref _backOffTicks, 0);
Logger.Trace($"Successfully created InfluxDB Database '{_influxDbOptions.Database}'");

return false;
return LineProtocolWriteResult.Ok();
}
catch (Exception ex)
{
Logger.Error(ex, $"Failed to create InfluxDB Database'{_influxDbOptions.Database}'");
throw;
}
}, cancellationToken);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,15 @@ public LineProtocolWriteResult(bool success, string errorMessage)
public string ErrorMessage { get; }

public bool Success { get; }

public static LineProtocolWriteResult Ok()
{
return new LineProtocolWriteResult(true);
}

public static LineProtocolWriteResult Error(string message)
{
return new LineProtocolWriteResult(false, message);
}
}
}