Skip to content

Commit

Permalink
Merge pull request #39 from conductor-sdk/compatibility-fixes
Browse files Browse the repository at this point in the history
Compatibility fixes
  • Loading branch information
gardusig authored Dec 15, 2022
2 parents 0739481 + ba125e1 commit 9fb1eab
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 54 deletions.
40 changes: 21 additions & 19 deletions Conductor/Client/Extensions/DependencyInjectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,41 +9,36 @@ namespace Conductor.Client.Extensions
{
public static class DependencyInjectionExtensions
{
public static IServiceCollection WithConductorWorker<T>(this IServiceCollection services) where T : IWorkflowTask
public static IServiceCollection AddConductorWorkflowTask<T>(this IServiceCollection services) where T : IWorkflowTask
{
services.AddTransient(typeof(IWorkflowTask), typeof(T));
services.AddTransient(typeof(T));
return services;
}

public static IServiceCollection WithOrkesApiClient(this IServiceCollection services, OrkesApiClient orkesApiClient)
public static IServiceCollection AddConductorWorker(this IServiceCollection services, Configuration configuration = null, Action<IServiceProvider, HttpClient> configureHttpClient = null)
{
services.AddHttpClient();
services.AddOptions();
if (configuration == null)
{
configuration = new Configuration();
}
services.AddSingleton(configuration);
OrkesApiClient orkesApiClient = new OrkesApiClient(configuration);
services.AddSingleton(orkesApiClient);
services.AddTransient<IConductorWorkerRestClient, ConductorWorkerRestClient>();
services.AddSingleton(new ConductorWorkerRestClient(orkesApiClient));
services.AddSingleton<IWorkflowTaskCoordinator, WorkflowTaskCoordinator>();
services.AddTransient<IWorkflowTaskExecutor, WorkflowTaskExecutor>();
services.AddSingleton(orkesApiClient);
return services;
}

public static IServiceCollection WithHostedService<T>(this IServiceCollection services) where T : BackgroundService
{
services.AddHostedService<T>();
return services;
return services.AddConductorClient(configureHttpClient);
}

public static IServiceCollection WithConfiguration(this IServiceCollection services, Configuration configuration)
public static IServiceCollection AddConductorClient(this IServiceCollection services, Func<IServiceProvider, string> serverUrl)
{
if (configuration == null)
services.AddHttpClient<IConductorWorkerRestClient, ConductorWorkerRestClient>((provider, client) =>
{
services.AddSingleton<Configuration>();
}
else
{
services.AddSingleton(configuration);
}
client.BaseAddress = new Uri(serverUrl(provider));
});
return services;
}

Expand All @@ -59,5 +54,12 @@ public static IServiceCollection AddConductorClient(this IServiceCollection serv
}
return services;
}

public static IServiceCollection WithHostedService<T>(this IServiceCollection services) where T : BackgroundService
{
services.AddHostedService<T>();
return services;
}

}
}
12 changes: 7 additions & 5 deletions Conductor/Client/OrkesApiClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,24 @@ public class OrkesApiClient
{
private Configuration _configuration;
private MemoryCache _memoryCache;

private OrkesAuthenticationSettings _authenticationSettings;
private TokenResourceApi _tokenClient;
private OrkesAuthenticationSettings _authenticationSettings;

private OrkesApiClient()
{
_memoryCache = new MemoryCache(new MemoryCacheOptions());
_configuration = null;
_authenticationSettings = null;
_tokenClient = null;
_authenticationSettings = null;
}

public OrkesApiClient(Configuration configuration = null, OrkesAuthenticationSettings authenticationSettings = null) : this()
public OrkesApiClient(Configuration configuration = null) : this()
{
_authenticationSettings = authenticationSettings;
_configuration = configuration;
if (_configuration != null && !string.IsNullOrEmpty(_configuration.keyId) && !string.IsNullOrEmpty(_configuration.keySecret))
{
_authenticationSettings = new OrkesAuthenticationSettings(_configuration.keyId, _configuration.keySecret);
}
RefreshAuthenticationHeader();
}

Expand Down
2 changes: 1 addition & 1 deletion Conductor/Client/Worker/ConductorWorkerRestClient.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using Conductor.Api;
using Conductor.Client.Interfaces;
using Conductor.Client.Models;
using System.Threading.Tasks;
using Conductor.Api;

namespace Conductor.Client
{
Expand Down
7 changes: 3 additions & 4 deletions Conductor/Client/Worker/WorkflowTaskCoordinator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@ namespace Conductor.Client.Worker
internal class WorkflowTaskCoordinator : IWorkflowTaskCoordinator
{
private int _concurrentWorkers;
private IServiceProvider _serviceProvider;
private ILogger<WorkflowTaskCoordinator> _logger;
private IWorkflowTaskExecutor _workflowTaskExecutor;
private HashSet<Type> _workerDefinitions;
private TaskResourceApi _client;

public WorkflowTaskCoordinator(IServiceProvider serviceProvider, ILogger<WorkflowTaskCoordinator> logger, OrkesApiClient orkesApiClient, int? concurrentWorkers = null)
{
_serviceProvider = serviceProvider;
_logger = logger;
_workerDefinitions = new HashSet<Type>();
if (concurrentWorkers == null)
Expand All @@ -27,6 +26,7 @@ public WorkflowTaskCoordinator(IServiceProvider serviceProvider, ILogger<Workflo
}
_concurrentWorkers = concurrentWorkers.Value;
_client = orkesApiClient.GetClient<TaskResourceApi>();
_workflowTaskExecutor = serviceProvider.GetService(typeof(IWorkflowTaskExecutor)) as IWorkflowTaskExecutor;
}

public async Task Start()
Expand All @@ -35,8 +35,7 @@ public async Task Start()
var pollers = new List<Task>();
for (var i = 0; i < _concurrentWorkers; i++)
{
var executor = _serviceProvider.GetService(typeof(IWorkflowTaskExecutor)) as IWorkflowTaskExecutor;
pollers.Add(executor.StartPoller(_workerDefinitions.ToList()));
pollers.Add(_workflowTaskExecutor.StartPoller(_workerDefinitions.ToList()));
}
await Task.WhenAll(pollers);
}
Expand Down
4 changes: 2 additions & 2 deletions Conductor/Client/Worker/WorkflowTaskExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace Conductor.Client.Worker
{
internal class WorkflowTaskExecutor : IWorkflowTaskExecutor
{
private IServiceProvider _serviceProvider;
private List<Type> workers;
private ILogger<WorkflowTaskExecutor> logger;
private readonly Configuration configuration;
Expand All @@ -24,12 +25,11 @@ internal class WorkflowTaskExecutor : IWorkflowTaskExecutor
private readonly string workerId = Environment.MachineName + "_" + new Random(epoch).Next();

public WorkflowTaskExecutor(
IConductorWorkerRestClient taskClient,
IServiceProvider serviceProvider,
ILogger<WorkflowTaskExecutor> logger,
IOptions<Configuration> configuration)
{
this.taskClient = taskClient;
this.taskClient = serviceProvider.GetService(typeof(ConductorWorkerRestClient)) as ConductorWorkerRestClient;
this.serviceProvider = serviceProvider;
this.logger = logger;
this.configuration = configuration.Value;
Expand Down
1 change: 1 addition & 0 deletions Conductor/conductor-csharp.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@
<PackageReference Include="System.ComponentModel.Annotations" Version="5.0.0" />
<PackageReference Include="System.Net.Http.Json" Version="6.0.0" />
<PackageReference Include="RestSharp.Serializers.NewtonsoftJson" Version="106.13.0" />
<None Include="README.md" Pack="true" PackagePath="\"/>
</ItemGroup>
</Project>
28 changes: 7 additions & 21 deletions Tests/Util/ApiUtil.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using Conductor.Api;
using Conductor.Client;
using Conductor.Client.Authentication;
using Conductor.Executor;
using System;
using System.Diagnostics;
Expand Down Expand Up @@ -34,30 +33,17 @@ public static WorkflowExecutor GetWorkflowExecutor()

public static T GetClient<T>() where T : IApiAccessor, new()
{
OrkesApiClient apiClient = GetApiClient();
OrkesApiClient apiClient = new OrkesApiClient(GetConfiguration());
return apiClient.GetClient<T>();
}

public static OrkesApiClient GetApiClient()
public static Configuration GetConfiguration()
{
return GetApiClient(
basePath: _basePath,
keyId: _keyId,
keySecret: _keySecret
);
}

private static OrkesApiClient GetApiClient(string basePath, string keyId, string keySecret)
{
return new OrkesApiClient(
configuration: new Configuration()
{
BasePath = basePath
},
authenticationSettings: new OrkesAuthenticationSettings(
keyId, keySecret
)
);
Configuration configuration = new Configuration();
configuration.keyId = _keyId;
configuration.keySecret = _keySecret;
configuration.BasePath = _basePath;
return configuration;
}

private static string GetEnvironmentVariable(string variable)
Expand Down
7 changes: 5 additions & 2 deletions Tests/Worker/WorkerTests.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
using Conductor.Api;
using Conductor.Client;
using Conductor.Client.Extensions;
using Conductor.Client.Interfaces;
using Conductor.Definition;
using Conductor.Definition.TaskType;
using Conductor.Executor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
Expand Down Expand Up @@ -83,8 +86,8 @@ private IHost GetWorkerHost()
.ConfigureServices(
(ctx, services) =>
{
services.WithOrkesApiClient(ApiUtil.GetApiClient());
services.WithConductorWorker<SimpleWorker>();
services.AddConductorWorker(ApiUtil.GetConfiguration());
services.AddConductorWorkflowTask<SimpleWorker>();
services.WithHostedService<WorkerService>();
}
).ConfigureLogging(
Expand Down

0 comments on commit 9fb1eab

Please sign in to comment.