Skip to content

Commit

Permalink
Merge pull request #56 from rabazit/adding-multi-tenant-support
Browse files Browse the repository at this point in the history
Adding multi tenant support
  • Loading branch information
VonDerBeck authored Dec 30, 2024
2 parents d16513b + 68529ae commit c4c528b
Show file tree
Hide file tree
Showing 12 changed files with 166 additions and 38 deletions.
3 changes: 2 additions & 1 deletion src/Zeebe.Client.Accelerator/Abstractions/IJobHandlerInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace Zeebe.Client.Accelerator.Abstractions
{
public interface IJobHandlerInfo
{
{
MethodInfo Handler { get; }
ServiceLifetime HandlerServiceLifetime { get; }
string JobType { get; }
Expand All @@ -17,5 +17,6 @@ public interface IJobHandlerInfo
TimeSpan? PollingTimeout { get; }
string[] FetchVariabeles { get; }
bool AutoComplete { get; }
public string[] TenantIds { get; }
}
}
20 changes: 20 additions & 0 deletions src/Zeebe.Client.Accelerator/Attributes/TenantIdsAttribute.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System;

using Zeebe.Client.Accelerator.Abstractions;

namespace Zeebe.Client.Accelerator.Attributes;

public class TenantIdsAttribute : AbstractWorkerAttribute
{
public TenantIdsAttribute(params string[] tenantIds)
{
if (tenantIds is null || tenantIds.Length == 0)
{
throw new ArgumentNullException(nameof(tenantIds));
}

TenantIds = tenantIds;
}

public string[] TenantIds { get; }
}
5 changes: 4 additions & 1 deletion src/Zeebe.Client.Accelerator/JobHandlerInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ public JobHandlerInfo(
TimeSpan? pollInterval = null,
TimeSpan? pollingTimeout = null,
string[] fetchVariabeles = null,
bool? autoComplete = null)
bool? autoComplete = null,
string[] tenantIds = null)
{
if (string.IsNullOrWhiteSpace(jobType))
throw new ArgumentException($"'{nameof(jobType)}' cannot be null or whitespace.", nameof(jobType));
Expand Down Expand Up @@ -48,6 +49,7 @@ public JobHandlerInfo(
this.PollingTimeout = pollingTimeout;
this.FetchVariabeles = fetchVariabeles ?? (new string[0]);
this.AutoComplete = autoComplete ?? true;
this.TenantIds = tenantIds ?? Array.Empty<string>();
}

public MethodInfo Handler { get; }
Expand All @@ -61,5 +63,6 @@ public JobHandlerInfo(
public TimeSpan? PollingTimeout { get; }
public string[] FetchVariabeles { get; }
public bool AutoComplete { get; }
public string[] TenantIds { get; }
}
}
36 changes: 24 additions & 12 deletions src/Zeebe.Client.Accelerator/JobHandlerInfoProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,20 +77,32 @@ private static IJobHandlerInfo CreateJobHandlerInfo(Type jobHandlerType, MethodI

return new JobHandlerInfo
(
jobHandlerMethod,
jobHandlerMethod,
GetServiceLifetime(jobHandlerMethod),
GetJobType(jobHandlerType),
GetWorkerName(jobHandlerType),
GetMaxJobsActive(jobHandlerType),
GetJobType(jobHandlerType),
GetWorkerName(jobHandlerType),
GetMaxJobsActive(jobHandlerType),
GetHandlerThreads(jobHandlerType),
GetTimeout(jobHandlerType),
GetPollInterval(jobHandlerType),
GetPollInterval(jobHandlerType),
GetPollingTimeout(jobHandlerType),
GetFetchVariables(jobType, jobHandlerType),
GetAutoComplete(jobHandlerType)
GetAutoComplete(jobHandlerType),
GetTenantIds(jobHandlerType)
);
}

private static string[] GetTenantIds(Type jobHandlerType)
{
var attr = jobHandlerType.GetCustomAttribute<TenantIdsAttribute>();
if (attr != null)
{
return attr.TenantIds;
}

return Array.Empty<string>();
}

private static ServiceLifetime GetServiceLifetime(MethodInfo handlerMethod)
{
var handler = handlerMethod.ReflectedType;
Expand Down Expand Up @@ -160,7 +172,7 @@ private static string[] GetFetchVariables(Type jobType, Type jobHandlerType)
var fetchVariables = GetFetchVariablesFromJobState(jobType);
if(fetchVariables != null)
return fetchVariables;

return null;
}

Expand All @@ -183,7 +195,7 @@ private static string[] GetFetchVariablesFromJobState(Type jobType)
return jobStateType.GetProperties()
.Where(p => p.CanWrite && (p.GetCustomAttribute<JsonIgnoreAttribute>() == null))
.Select(p => GetAttributeName(p))
.ToArray();
.ToArray();
}

private static string GetAttributeName(PropertyInfo p)
Expand All @@ -199,7 +211,7 @@ private static string GetAttributeName(PropertyInfo p)
private static bool IsZeebeWorker(Type t)
{
if (t.IsAbstract) return false;

var interfaces = t.GetInterfaces();
return
interfaces.Contains(typeof(IZeebeWorker)) ||
Expand All @@ -214,16 +226,16 @@ private static bool IsJobHandlerMethod(MethodInfo method, IEnumerable<MethodInfo
.Select(p => p.ParameterType)
.ToList();

return jobHandlerMethods.Any(h =>
return jobHandlerMethods.Any(h =>
h.Name.Equals(method.Name) &&
h.GetParameters().Select(p => p.ParameterType).SequenceEqual(methodParameters) &&
h.ReturnParameter.ParameterType.Equals(method.ReturnParameter.ParameterType)
);
}

private static bool IsZeebeWorkerInterface(Type i)
{
return
return
i.Equals(typeof(IZeebeWorker)) ||
i.Equals(typeof(IAsyncZeebeWorker)) ||
i.IsGenericType && GENERIC_HANDLER_TYPES.Any(h => i.GetGenericTypeDefinition().Equals(h));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@

namespace Zeebe.Client.Accelerator.Options
{
public class ZeebeClientAcceleratorOptions
public class ZeebeClientAcceleratorOptions
{
public virtual ClientOptions Client { get; set; } = new ClientOptions();
public virtual WorkerOptions Worker { get; set; }

public class ClientOptions
public class ClientOptions
{
private string _gatewayAddress;
public virtual string GatewayAddress {
get { return GetEnvironmentVariable("ZEEBE_ADDRESS", _gatewayAddress); }
set { _gatewayAddress = value; }
set { _gatewayAddress = value; }
}
public virtual TransportEncryptionOptions TransportEncryption { get; set; }
public virtual CloudOptions Cloud { get; set; }
Expand All @@ -41,8 +41,8 @@ public class CloudOptions
{
private string _clientId;
public virtual string ClientId {
get { return GetEnvironmentVariable("ZEEBE_CLIENT_ID", _clientId); }
set { _clientId = value; }
get { return GetEnvironmentVariable("ZEEBE_CLIENT_ID", _clientId); }
set { _clientId = value; }
}
private string _clientSecret;
public virtual string ClientSecret {
Expand All @@ -51,13 +51,13 @@ public virtual string ClientSecret {
}
private string _authorizationServerUrl = "https://login.cloud.camunda.io/oauth/token";
public virtual string AuthorizationServerUrl {
get { return GetEnvironmentVariable("ZEEBE_AUTHORIZATION_SERVER_URL", _authorizationServerUrl); }
set { _authorizationServerUrl = value; }
get { return GetEnvironmentVariable("ZEEBE_AUTHORIZATION_SERVER_URL", _authorizationServerUrl); }
set { _authorizationServerUrl = value; }
}
private string _tokenAudience = "zeebe.camunda.io";
public virtual string TokenAudience {
get { return GetEnvironmentVariable("ZEEBE_TOKEN_AUDIENCE", _tokenAudience); }
set { _tokenAudience = value; }
set { _tokenAudience = value; }
}
}
}
Expand All @@ -75,6 +75,7 @@ public class WorkerOptions
public virtual long RetryTimeoutInMilliseconds { get; set; }
public virtual TimeSpan RetryTimeout { get { return TimeSpan.FromMilliseconds(RetryTimeoutInMilliseconds); } }
public virtual string Name { get; set; }
public virtual string[] TenantIds { get; set; } = Array.Empty<string>();
}

public static string GetEnvironmentVariable(string name, string defaultValue)
Expand Down
1 change: 1 addition & 0 deletions src/Zeebe.Client.Accelerator/ZeebeHostedService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public Task StartAsync(CancellationToken cancellationToken)
.Handler((jobClient, job) => HandleJob(jobClient, job, cancellationTokenSource.Token))
.FetchVariables(jobHandlerInfo.FetchVariabeles)
.MaxJobsActive(jobHandlerInfo.MaxJobsActive ?? zeebeWorkerOptions.MaxJobsActive)
.TenantIds(jobHandlerInfo.TenantIds.Length > 0 ? jobHandlerInfo.TenantIds : zeebeWorkerOptions.TenantIds)
.Name(zeebeWorkerOptions.Name ?? jobHandlerInfo.WorkerName)
.PollingTimeout(jobHandlerInfo.PollingTimeout ?? zeebeWorkerOptions.PollingTimeout)
.PollInterval(jobHandlerInfo.PollInterval ?? zeebeWorkerOptions.PollInterval)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using System;

using Xunit;

using Zeebe.Client.Accelerator.Attributes;

namespace Zeebe.Client.Accelerator.Unit.Tests.Attributes;
public class TenantIdsAttributeTests
{
[Theory]
[InlineData(null)]
[InlineData(new object[] { new string[] { } })]
public void ThrowsArgumentExceptionWhenFetchVariablesIsNullOrEmptyOrWhiteSpace(string[] tenantIds)
{
Assert.Throws<ArgumentNullException>(nameof(tenantIds), () => new TenantIdsAttribute(tenantIds));
}

[Fact]
public void AllPropertiesAreSetWhenCreated()
{
string[] expectedTenantIds = new string[]
{
Guid.NewGuid().ToString(),
Guid.NewGuid().ToString()
};

var attribute = new TenantIdsAttribute(expectedTenantIds);
Assert.NotNull(attribute.TenantIds);
Assert.Equal(expectedTenantIds, attribute.TenantIds);
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public void ThrowsArgumentNullExceptionWhenAssemblyProviderIsNull()
[Fact]
public void AllJobHandlersAreFoundWhenCreated() {
var actual = Handlers();
var expected = 9;
var expected = 10;

Assert.Equal(expected, actual.Count());
}
Expand Down Expand Up @@ -149,6 +149,17 @@ public void PollingTimeoutPropertyIsSetCorrectlyWhenCreated()
Assert.Contains(TimeSpan.FromMilliseconds(int.MaxValue - 3), actual);
}

[Fact]
public void TenantIdsPropertyIsSetCorrectlyWhenCreated()
{
var expected = new string[] { "tenant1", "tenant2"};
var handlers = Handlers();

var actual = handlers.Select(h => h.TenantIds);
Assert.Contains(new string[0], actual);
Assert.Contains(expected, actual);
}

private JobHandlerInfoProvider Create()
{
return new JobHandlerInfoProvider(this.assembly);
Expand Down
42 changes: 29 additions & 13 deletions test/Zeebe.Client.Accelerator.Unit.Tests/JobHandlerInfoTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ public class JobHandlerInfoTests
private readonly TimeSpan pollingTimeout;
private readonly TimeSpan pollInterval;
private string[] fetchVariabeles;
private string[] tenantIds;

[Fact]
public void ThrowsArgumenNullExceptionWhenHandlerIsNull()
public void ThrowsArgumenNullExceptionWhenHandlerIsNull()
{
Assert.Throws<ArgumentNullException>("handler", () => new JobHandlerInfo(null, this.handlerServiceLifetime, this.jobType, this.workerName));
}
Expand All @@ -46,7 +47,7 @@ public void ThrowsArgumentExceptionWheWorkerNameIsNullOrEmptyOrWhiteSpace(string
[Theory]
[InlineData(0)]
[InlineData(-1)]
public void ThrowsArgumentOutOfRangeExceptionWhenMaxJobsActiveIsSmallerOrEqualThen0(int maxJobsActive)
public void ThrowsArgumentOutOfRangeExceptionWhenMaxJobsActiveIsSmallerOrEqualThen0(int maxJobsActive)
{
Assert.Throws<ArgumentOutOfRangeException>("maxJobsActive", () => new JobHandlerInfo(this.handler, this.handlerServiceLifetime, this.jobType, this.workerName, maxJobsActive, this.handlerThreads, this.timeout, this.pollInterval, this.pollingTimeout));
}
Expand All @@ -62,30 +63,30 @@ public void ThrowsArgumentOutOfRangeExceptionWhenHandlerThreadsActiveIs0(byte ha
[Theory]
[InlineData(0)]
[InlineData(-1)]
public void ThrowsArgumentOutOfRangeExceptionWhenTimeoutIsSmallerOrEqualThen0(int timeout)
public void ThrowsArgumentOutOfRangeExceptionWhenTimeoutIsSmallerOrEqualThen0(int timeout)
{
Assert.Throws<ArgumentOutOfRangeException>("timeout", () => new JobHandlerInfo(this.handler, this.handlerServiceLifetime, this.jobType, this.workerName, this.maxJobsActive, this.handlerThreads, TimeSpan.FromMilliseconds(timeout), this.pollInterval, this.pollingTimeout));
}

[Theory]
[InlineData(0)]
[InlineData(-1)]
public void ThrowsArgumentOutOfRangeExceptionWhenPollIntervalIsSmallerOrEqualThen0(int pollInterval)
public void ThrowsArgumentOutOfRangeExceptionWhenPollIntervalIsSmallerOrEqualThen0(int pollInterval)
{
Assert.Throws<ArgumentOutOfRangeException>("pollInterval", () => new JobHandlerInfo(this.handler, this.handlerServiceLifetime, this.jobType, this.workerName, this.maxJobsActive, this.handlerThreads, this.timeout, TimeSpan.FromMilliseconds(pollInterval), this.pollingTimeout));
}

[Theory]
[InlineData(0)]
[InlineData(-1)]
public void ThrowsArgumentOutOfRangeExceptionWhenPollingTimeoutIsSmallerOrEqualThen0(int pollingTimeout)
public void ThrowsArgumentOutOfRangeExceptionWhenPollingTimeoutIsSmallerOrEqualThen0(int pollingTimeout)
{
Assert.Throws<ArgumentOutOfRangeException>("pollingTimeout", () => new JobHandlerInfo(this.handler, this.handlerServiceLifetime, this.jobType, this.workerName, this.maxJobsActive, this.handlerThreads, this.timeout, this.pollInterval, TimeSpan.FromMilliseconds(pollingTimeout)));
}

[Fact]
public void AllPropertiesAreSetWhenCreated()
{
{
var actual = Create();
Assert.NotNull(actual);
Assert.Equal(this.handler, actual.Handler);
Expand All @@ -97,6 +98,7 @@ public void AllPropertiesAreSetWhenCreated()
Assert.Equal(this.pollInterval, actual.PollInterval);
Assert.Equal(this.pollingTimeout, actual.PollingTimeout);
Assert.Equal(this.fetchVariabeles, actual.FetchVariabeles);
Assert.Equal(this.tenantIds, actual.TenantIds);
}

[Fact]
Expand All @@ -107,6 +109,14 @@ public void EmptyStringArrayIsCreatedWhenFetchVariabelesIsNull()
Assert.Equal(new string[0], actual.FetchVariabeles);
}

[Fact]
public void EmptyStringArrayIsCreatedWhenTenantIdsIsNull()
{
this.tenantIds = null;
var actual = Create();
Assert.Equal(new string[0], actual.TenantIds);
}

public JobHandlerInfoTests()
{
Expression<Func<int, string>> expression = i => i.ToString();
Expand All @@ -122,13 +132,18 @@ public JobHandlerInfoTests()
this.handlerThreads = Convert.ToByte(random.Next(1, 255));
this.pollingTimeout = TimeSpan.FromMilliseconds(random.Next(1, int.MaxValue));
this.pollInterval = TimeSpan.FromMilliseconds(random.Next(1, int.MaxValue));
this.fetchVariabeles = new string[] {
Guid.NewGuid().ToString(),
Guid.NewGuid().ToString()
this.fetchVariabeles = new string[] {
Guid.NewGuid().ToString(),
Guid.NewGuid().ToString()
};

this.tenantIds = new string[] {
Guid.NewGuid().ToString(),
Guid.NewGuid().ToString()
};
}

private JobHandlerInfo Create()
private JobHandlerInfo Create()
{
return new JobHandlerInfo
(
Expand All @@ -141,8 +156,9 @@ private JobHandlerInfo Create()
this.timeout,
this.pollInterval,
this.pollingTimeout,
this.fetchVariabeles
);
}
this.fetchVariabeles,
tenantIds: this.tenantIds
);
}
}
}
Loading

0 comments on commit c4c528b

Please sign in to comment.