Skip to content
This repository has been archived by the owner on Jul 18, 2023. It is now read-only.

Add RetentionPolicy support #76

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ namespace InfluxDB.Collector.Configuration
{
public abstract class CollectorEmitConfiguration
{
public abstract CollectorConfiguration InfluxDB(Uri serverBaseAddress, string database, string username = null, string password = null);
public abstract CollectorConfiguration InfluxDB(Uri serverBaseAddress, string database, string username = null, string password = null, string retentionPolicy = null);

public CollectorConfiguration InfluxDB(string serverBaseAddress, string database, string username = null, string password = null)
public CollectorConfiguration InfluxDB(string serverBaseAddress, string database, string username = null, string password = null, string retentionPolicy = null)
{
return InfluxDB(new Uri(serverBaseAddress), database, username, password);
return InfluxDB(new Uri(serverBaseAddress), database, username, password, retentionPolicy);
}

public abstract CollectorConfiguration Emitter(Action<PointData[]> emitter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ public PipelinedCollectorEmitConfiguration(
_configuration = configuration;
}

public override CollectorConfiguration InfluxDB(Uri serverBaseAddress, string database, string username = null, string password = null)
public override CollectorConfiguration InfluxDB(Uri serverBaseAddress, string database, string username = null, string password = null, string retentionPolicy = null)
{
if (string.Compare(serverBaseAddress.Scheme, "udp", ignoreCase: true) == 0)
_client = new LineProtocolUdpClient(serverBaseAddress, database, username, password);
_client = new LineProtocolUdpClient(serverBaseAddress, database, username, password, retentionPolicy);
else
_client = new LineProtocolClient(serverBaseAddress, database, username, password);
_client = new LineProtocolClient(serverBaseAddress, database, username, password, retentionPolicy);
return _configuration;
}

Expand Down
11 changes: 7 additions & 4 deletions src/InfluxDB.LineProtocol/Client/LineProtocolClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ public class LineProtocolClient : LineProtocolClientBase
{
private readonly HttpClient _httpClient;

public LineProtocolClient(Uri serverBaseAddress, string database, string username = null, string password = null)
: this(new HttpClientHandler(), serverBaseAddress, database, username, password)
public LineProtocolClient(Uri serverBaseAddress, string database, string username = null, string password = null, string retentionPolicy = null)
: this(new HttpClientHandler(), serverBaseAddress, database, username, password, retentionPolicy)
{
}

Expand All @@ -24,8 +24,9 @@ protected LineProtocolClient(
Uri serverBaseAddress,
string database,
string username,
string password)
:base(serverBaseAddress, database, username, password)
string password,
string retentionPolicy)
:base(serverBaseAddress, database, username, password, retentionPolicy)
{
if (serverBaseAddress == null)
throw new ArgumentNullException(nameof(serverBaseAddress));
Expand All @@ -42,6 +43,8 @@ protected override async Task<LineProtocolWriteResult> OnSendAsync(
CancellationToken cancellationToken = default(CancellationToken))
{
var endpoint = $"write?db={Uri.EscapeDataString(_database)}";
if (!string.IsNullOrWhiteSpace(_retentionPolicy))
endpoint += $"&rp={Uri.EscapeDataString(_retentionPolicy)}";
if (!string.IsNullOrEmpty(_username))
endpoint += $"&u={Uri.EscapeDataString(_username)}&p={Uri.EscapeDataString(_password)}";

Expand Down
5 changes: 3 additions & 2 deletions src/InfluxDB.LineProtocol/Client/LineProtocolClientBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ namespace InfluxDB.LineProtocol.Client
{
public abstract class LineProtocolClientBase : ILineProtocolClient
{
protected readonly string _database, _username, _password;
protected readonly string _database, _username, _password, _retentionPolicy;

protected LineProtocolClientBase(Uri serverBaseAddress, string database, string username, string password)
protected LineProtocolClientBase(Uri serverBaseAddress, string database, string username, string password, string retentionPolicy)
{
if (serverBaseAddress == null)
throw new ArgumentNullException(nameof(serverBaseAddress));
Expand All @@ -25,6 +25,7 @@ protected LineProtocolClientBase(Uri serverBaseAddress, string database, string
_database = database;
_username = username;
_password = password;
_retentionPolicy = retentionPolicy;
}

public Task<LineProtocolWriteResult> WriteAsync(LineProtocolPayload payload, CancellationToken cancellationToken = default(CancellationToken))
Expand Down
5 changes: 3 additions & 2 deletions src/InfluxDB.LineProtocol/Client/LineProtocolUdpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ public LineProtocolUdpClient(
Uri serverBaseAddress,
string database,
string username = null,
string password = null)
:base(serverBaseAddress, database, username, password)
string password = null,
string retentionPolicy = null)
:base(serverBaseAddress, database, username, password, retentionPolicy)
{
if (serverBaseAddress == null)
throw new ArgumentNullException(nameof(serverBaseAddress));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class MockLineProtocolClient : LineProtocolClient
{
}

private MockLineProtocolClient(MockHttpMessageHandler handler, Uri serverBaseAddress, string database) : base(handler, serverBaseAddress, database, null, null)
private MockLineProtocolClient(MockHttpMessageHandler handler, Uri serverBaseAddress, string database) : base(handler, serverBaseAddress, database, null, null, null)
{
Handler = handler;
BaseAddress = serverBaseAddress;
Expand Down