Skip to content

Commit

Permalink
refactor: cleanup and reformat code
Browse files Browse the repository at this point in the history
  • Loading branch information
Guilherme Ferreira committed Dec 12, 2023
1 parent ab262f8 commit 415231d
Show file tree
Hide file tree
Showing 269 changed files with 7,496 additions and 7,000 deletions.
11 changes: 5 additions & 6 deletions samples/KafkaFlow.Retry.API.Sample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ public static void Main(string[] args)
CreateHostBuilder(args).Build().Run();
}

public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureWebHostDefaults(webBuilder =>
{
webBuilder.UseStartup<Startup>();
});
public static IHostBuilder CreateHostBuilder(string[] args)
{
return Host.CreateDefaultBuilder(args)
.ConfigureWebHostDefaults(webBuilder => { webBuilder.UseStartup<Startup>(); });
}
}
57 changes: 27 additions & 30 deletions samples/KafkaFlow.Retry.API.Sample/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,49 +11,46 @@ public class Startup
{
public Startup(IConfiguration configuration)
{
Configuration = configuration;
}
Configuration = configuration;
}

public IConfiguration Configuration { get; }

// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}

app.UseHttpsRedirection();
app.UseHttpsRedirection();

app.UseRouting();
app.UseRouting();

app.UseAuthorization();
app.UseAuthorization();

app.UseKafkaFlowRetryEndpoints();
app.UseKafkaFlowRetryEndpoints();

app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
});
}
app.UseEndpoints(endpoints => { endpoints.MapControllers(); });
}

// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton(sp =>
new MongoDbDataProviderFactory()
.TryCreate(
new MongoDbSettings
{
ConnectionString = "mongodb://localhost:27017/SVC_KAFKA_FLOW_RETRY_DURABLE",
DatabaseName = "SVC_KAFKA_FLOW_RETRY_DURABLE",
RetryQueueCollectionName = "RetryQueues",
RetryQueueItemCollectionName = "RetryQueueItems"
}
).Result
);

services.AddControllers();
}
services.AddSingleton(sp =>
new MongoDbDataProviderFactory()
.TryCreate(
new MongoDbSettings
{
ConnectionString = "mongodb://localhost:27017/SVC_KAFKA_FLOW_RETRY_DURABLE",
DatabaseName = "SVC_KAFKA_FLOW_RETRY_DURABLE",
RetryQueueCollectionName = "RetryQueues",
RetryQueueItemCollectionName = "RetryQueueItems"
}
).Result
);

services.AddControllers();
}
}
20 changes: 12 additions & 8 deletions samples/KafkaFlow.Retry.Common.Sample/Helpers/KafkaHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

namespace KafkaFlow.Retry.Common.Sample.Helpers;

public static class KafkaHelper
{
public static async Task CreateKafkaTopics(string kafkaBrokers, string[] topics)
{
using (var adminClient = new Confluent.Kafka.AdminClientBuilder(new Confluent.Kafka.AdminClientConfig { BootstrapServers = kafkaBrokers }).Build())
using (var adminClient =
new AdminClientBuilder(new AdminClientConfig { BootstrapServers = kafkaBrokers }).Build())
{
foreach (var topic in topics)
{
Expand All @@ -19,13 +22,14 @@ public static async Task CreateKafkaTopics(string kafkaBrokers, string[] topics)
try
{
var deleteTopicRecords = new List<Confluent.Kafka.TopicPartitionOffset>();
for (int i = 0; i < topicMetadata.Topics.First().Partitions.Count; i++)
for (var i = 0; i < topicMetadata.Topics.First().Partitions.Count; i++)
{
deleteTopicRecords.Add(new Confluent.Kafka.TopicPartitionOffset(topic, i, Confluent.Kafka.Offset.End));
deleteTopicRecords.Add(new Confluent.Kafka.TopicPartitionOffset(topic, i, Offset.End));
}

await adminClient.DeleteRecordsAsync(deleteTopicRecords).ConfigureAwait(false);
}
catch (Confluent.Kafka.Admin.DeleteRecordsException e)
catch (DeleteRecordsException e)
{
Console.WriteLine($"An error occured deleting topic records: {e.Results[0].Error.Reason}");
}
Expand All @@ -36,19 +40,19 @@ public static async Task CreateKafkaTopics(string kafkaBrokers, string[] topics)
{
await adminClient
.CreatePartitionsAsync(
new List<Confluent.Kafka.Admin.PartitionsSpecification>
new List<PartitionsSpecification>
{
new Confluent.Kafka.Admin.PartitionsSpecification
new()
{
Topic = topic,
IncreaseTo = 6
}
})
.ConfigureAwait(false);
}
catch (Confluent.Kafka.Admin.CreateTopicsException e)
catch (CreateTopicsException e)
{
if (e.Results[0].Error.Code != Confluent.Kafka.ErrorCode.UnknownTopicOrPart)
if (e.Results[0].Error.Code != ErrorCode.UnknownTopicOrPart)
{
Console.WriteLine($"An error occured creating a topic: {e.Results[0].Error.Reason}");
}
Expand Down
19 changes: 10 additions & 9 deletions samples/KafkaFlow.Retry.Common.Sample/Helpers/SqlServerHelper.cs
Original file line number Diff line number Diff line change
@@ -1,29 +1,30 @@
using System.Collections.Generic;
using Microsoft.Data.SqlClient;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using Microsoft.Data.SqlClient;

namespace KafkaFlow.Retry.Common.Sample.Helpers;

public static class SqlServerHelper
{
public static async Task RecreateSqlSchema(string databaseName, string connectionString)
{
using (SqlConnection openCon = new SqlConnection(connectionString))
using (var openCon = new SqlConnection(connectionString))
{
openCon.Open();

foreach (var script in GetScriptsForSchemaCreation())
{
string[] batches = script.Split(new[] { "GO\r\n", "GO\t", "GO\n" }, System.StringSplitOptions.RemoveEmptyEntries);
var batches = script.Split(new[] { "GO\r\n", "GO\t", "GO\n" }, StringSplitOptions.RemoveEmptyEntries);

foreach (var batch in batches)
{
string replacedBatch = batch.Replace("@dbname", databaseName);
var replacedBatch = batch.Replace("@dbname", databaseName);

using (SqlCommand queryCommand = new SqlCommand(replacedBatch))
using (var queryCommand = new SqlCommand(replacedBatch))
{
queryCommand.Connection = openCon;

Expand All @@ -36,15 +37,15 @@ public static async Task RecreateSqlSchema(string databaseName, string connectio

private static IEnumerable<string> GetScriptsForSchemaCreation()
{
Assembly sqlServerAssembly = Assembly.LoadFrom("KafkaFlow.Retry.SqlServer.dll");
var sqlServerAssembly = Assembly.LoadFrom("KafkaFlow.Retry.SqlServer.dll");
return sqlServerAssembly
.GetManifestResourceNames()
.OrderBy(x => x)
.Select(script =>
{
using (Stream s = sqlServerAssembly.GetManifestResourceStream(script))
using (var s = sqlServerAssembly.GetManifestResourceStream(script))
{
using (StreamReader sr = new StreamReader(s))
using (var sr = new StreamReader(s))
{
return sr.ReadToEnd();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ namespace KafkaFlow.Retry.Sample.Exceptions;
public class RetryDurableTestException : Exception
{
public RetryDurableTestException(string message) : base(message)
{ }
{
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using System.Threading.Tasks;
using KafkaFlow;
using KafkaFlow.Retry.Sample.Exceptions;
using KafkaFlow.Retry.Sample.Messages;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using System.Threading.Tasks;
using KafkaFlow;
using KafkaFlow.Retry.Sample.Exceptions;
using KafkaFlow.Retry.Sample.Messages;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using System.Threading.Tasks;
using KafkaFlow;
using KafkaFlow.Retry.Sample.Exceptions;
using KafkaFlow.Retry.Sample.Messages;

Expand Down
Loading

0 comments on commit 415231d

Please sign in to comment.