From 1699e536d926edd8cb2c8692e1170129655d6a4c Mon Sep 17 00:00:00 2001 From: legalles Date: Mon, 21 Oct 2024 09:20:10 +0200 Subject: [PATCH] style: use file-scope namespace and primary constructor --- .../Api/Responses/IPublishMessageResponse.cs | 15 +- .../Api/Responses/IResolveIncidentResponse.cs | 15 +- Client/Api/Responses/ISetVariablesResponse.cs | 17 +- Client/Api/Responses/IThrowErrorResponse.cs | 15 +- Client/Api/Responses/ITopology.cs | 25 +- .../Responses/IUpdateJobTimeoutResponse.cs | 15 +- .../Api/Responses/IUpdateRetriesResponse.cs | 13 +- Client/Api/Responses/PartitionBrokerRole.cs | 19 +- Client/Api/Worker/IJobClient.cs | 189 ++++---- Client/Api/Worker/IJobWorker.cs | 25 +- Client/Api/Worker/IJobWorkerBuilderStep1.cs | 449 +++++++++--------- Client/ZeebeClient.cs | 329 ++++++------- 12 files changed, 556 insertions(+), 570 deletions(-) diff --git a/Client/Api/Responses/IPublishMessageResponse.cs b/Client/Api/Responses/IPublishMessageResponse.cs index f6ab308c..9ea5d5f5 100644 --- a/Client/Api/Responses/IPublishMessageResponse.cs +++ b/Client/Api/Responses/IPublishMessageResponse.cs @@ -1,9 +1,6 @@ -namespace Zeebe.Client.Api.Responses -{ - /// - /// Response for publishing a message. - /// - public interface IPublishMessageResponse - { - } -} \ No newline at end of file +namespace Zeebe.Client.Api.Responses; + +/// +/// Response for publishing a message. +/// +public interface IPublishMessageResponse; \ No newline at end of file diff --git a/Client/Api/Responses/IResolveIncidentResponse.cs b/Client/Api/Responses/IResolveIncidentResponse.cs index 7111f7ac..f82f20a0 100644 --- a/Client/Api/Responses/IResolveIncidentResponse.cs +++ b/Client/Api/Responses/IResolveIncidentResponse.cs @@ -1,9 +1,6 @@ -namespace Zeebe.Client.Api.Responses -{ - /// - /// Response for an resolve incident request. - /// - public interface IResolveIncidentResponse - { - } -} \ No newline at end of file +namespace Zeebe.Client.Api.Responses; + +/// +/// Response for an resolve incident request. +/// +public interface IResolveIncidentResponse; \ No newline at end of file diff --git a/Client/Api/Responses/ISetVariablesResponse.cs b/Client/Api/Responses/ISetVariablesResponse.cs index 60af656c..3fb7553b 100644 --- a/Client/Api/Responses/ISetVariablesResponse.cs +++ b/Client/Api/Responses/ISetVariablesResponse.cs @@ -1,11 +1,10 @@ -namespace Zeebe.Client.Api.Responses +namespace Zeebe.Client.Api.Responses; + +/// +/// Response for an set variables request. +/// +public interface ISetVariablesResponse { - /// - /// Response for an set variables request. - /// - public interface ISetVariablesResponse - { - /// The unique key of the command - long Key { get; } - } + /// The unique key of the command + long Key { get; } } \ No newline at end of file diff --git a/Client/Api/Responses/IThrowErrorResponse.cs b/Client/Api/Responses/IThrowErrorResponse.cs index 61a1940a..ea9888f6 100644 --- a/Client/Api/Responses/IThrowErrorResponse.cs +++ b/Client/Api/Responses/IThrowErrorResponse.cs @@ -1,9 +1,6 @@ -namespace Zeebe.Client.Api.Responses -{ - /// - /// Response for an throw error request. - /// - public interface IThrowErrorResponse - { - } -} \ No newline at end of file +namespace Zeebe.Client.Api.Responses; + +/// +/// Response for an throw error request. +/// +public interface IThrowErrorResponse; \ No newline at end of file diff --git a/Client/Api/Responses/ITopology.cs b/Client/Api/Responses/ITopology.cs index a130b8b4..169b0931 100644 --- a/Client/Api/Responses/ITopology.cs +++ b/Client/Api/Responses/ITopology.cs @@ -14,18 +14,17 @@ // limitations under the License. using System.Collections.Generic; -namespace Zeebe.Client.Api.Responses +namespace Zeebe.Client.Api.Responses; + +public interface ITopology { - public interface ITopology - { - /// - /// All (known) brokers of the cluster - /// - IList Brokers { get; } + /// + /// All (known) brokers of the cluster + /// + IList Brokers { get; } - /// - /// The gateway version or 'lower then 0.23' if none was found. - /// - string GatewayVersion { get; } - } -} + /// + /// The gateway version or 'lower then 0.23' if none was found. + /// + string GatewayVersion { get; } +} \ No newline at end of file diff --git a/Client/Api/Responses/IUpdateJobTimeoutResponse.cs b/Client/Api/Responses/IUpdateJobTimeoutResponse.cs index 6a0156a2..75662254 100644 --- a/Client/Api/Responses/IUpdateJobTimeoutResponse.cs +++ b/Client/Api/Responses/IUpdateJobTimeoutResponse.cs @@ -1,9 +1,6 @@ -namespace Zeebe.Client.Api.Responses -{ - /// - /// Response for an update job timeout request. - /// - public interface IUpdateJobTimeoutResponse - { - } -} \ No newline at end of file +namespace Zeebe.Client.Api.Responses; + +/// +/// Response for an update job timeout request. +/// +public interface IUpdateJobTimeoutResponse; \ No newline at end of file diff --git a/Client/Api/Responses/IUpdateRetriesResponse.cs b/Client/Api/Responses/IUpdateRetriesResponse.cs index 25ec13bc..3b634b68 100644 --- a/Client/Api/Responses/IUpdateRetriesResponse.cs +++ b/Client/Api/Responses/IUpdateRetriesResponse.cs @@ -1,9 +1,8 @@ -namespace Zeebe.Client.Api.Responses +namespace Zeebe.Client.Api.Responses; + +/// +/// Response for an update retries request. +/// +public interface IUpdateRetriesResponse { - /// - /// Response for an update retries request. - /// - public interface IUpdateRetriesResponse - { - } } \ No newline at end of file diff --git a/Client/Api/Responses/PartitionBrokerRole.cs b/Client/Api/Responses/PartitionBrokerRole.cs index a66373c9..7584cfa4 100644 --- a/Client/Api/Responses/PartitionBrokerRole.cs +++ b/Client/Api/Responses/PartitionBrokerRole.cs @@ -12,14 +12,13 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -namespace Zeebe.Client.Api.Responses +namespace Zeebe.Client.Api.Responses; + +/// +/// The RAFT role of the broker either LEADER or FOLLOWER. +/// +public enum PartitionBrokerRole { - /// - /// The RAFT role of the broker either LEADER or FOLLOWER. - /// - public enum PartitionBrokerRole - { - LEADER, - FOLLOWER - } -} + LEADER, + FOLLOWER +} \ No newline at end of file diff --git a/Client/Api/Worker/IJobClient.cs b/Client/Api/Worker/IJobClient.cs index acdcaacd..14431822 100644 --- a/Client/Api/Worker/IJobClient.cs +++ b/Client/Api/Worker/IJobClient.cs @@ -16,104 +16,103 @@ using Zeebe.Client.Api.Commands; using Zeebe.Client.Api.Responses; -namespace Zeebe.Client.Api.Worker +namespace Zeebe.Client.Api.Worker; + +/// +/// A client with access to all job-related operation: +/// +/// complete a job +/// mark a job as failed +/// update the retries of a job +/// +/// +public interface IJobClient { /// - /// A client with access to all job-related operation: - /// - /// complete a job - /// mark a job as failed - /// update the retries of a job - /// + /// Command to complete a job. /// - public interface IJobClient - { - /// - /// Command to complete a job. - /// - /// - /// - /// long jobKey = ..; - /// - /// jobClient - /// .NewCompleteJobCommand(jobKey) - /// .Variables(json) - /// .Send(); - /// - /// - /// - /// - /// The job is linked to a process instance, which means this command will complete the related - /// activity and continue the flow. - /// - /// - /// the key which identifies the job - /// a builder for the command - ICompleteJobCommandStep1 NewCompleteJobCommand(long jobKey); + /// + /// + /// long jobKey = ..; + /// + /// jobClient + /// .NewCompleteJobCommand(jobKey) + /// .Variables(json) + /// .Send(); + /// + /// + /// + /// + /// The job is linked to a process instance, which means this command will complete the related + /// activity and continue the flow. + /// + /// + /// the key which identifies the job + /// a builder for the command + ICompleteJobCommandStep1 NewCompleteJobCommand(long jobKey); - /// - /// Command to complete a job. - /// - /// - /// - /// - /// IJob activatedJob = ..; - /// - /// jobClient - /// .NewCompleteJobCommand(activatedJob) - /// .Variables(json) - /// .Send(); - /// - /// - /// - /// - /// The job is linked to a process instance, which means this command will complete the related - /// activity and continue the flow. - /// - /// - /// the job, which should be completed - /// a builder for the command - ICompleteJobCommandStep1 NewCompleteJobCommand(IJob activatedJob); + /// + /// Command to complete a job. + /// + /// + /// + /// + /// IJob activatedJob = ..; + /// + /// jobClient + /// .NewCompleteJobCommand(activatedJob) + /// .Variables(json) + /// .Send(); + /// + /// + /// + /// + /// The job is linked to a process instance, which means this command will complete the related + /// activity and continue the flow. + /// + /// + /// the job, which should be completed + /// a builder for the command + ICompleteJobCommandStep1 NewCompleteJobCommand(IJob activatedJob); - /// - /// Command to mark a job as failed. - /// - /// - /// - /// - /// long jobKey = ..; - /// - /// jobClient - /// .NewFailCommand(jobKey) - /// .Retries(3) - /// .Send(); - /// - /// - /// - /// - /// If the given retries are greater than zero then this job will be picked up again by a job - /// worker. Otherwise, an incident is created for this job. - /// - /// the key which identifies the job - /// a builder for the command - IFailJobCommandStep1 NewFailCommand(long jobKey); + /// + /// Command to mark a job as failed. + /// + /// + /// + /// + /// long jobKey = ..; + /// + /// jobClient + /// .NewFailCommand(jobKey) + /// .Retries(3) + /// .Send(); + /// + /// + /// + /// + /// If the given retries are greater than zero then this job will be picked up again by a job + /// worker. Otherwise, an incident is created for this job. + /// + /// the key which identifies the job + /// a builder for the command + IFailJobCommandStep1 NewFailCommand(long jobKey); - /// - /// Command to report a business error (i.e. non-technical) that occurs while processing a job. - /// - /// - /// - /// long jobKey = ...; - /// string code = ...; - /// jobClient - /// .NewThrowErrorCommand(jobKey) - /// .ErrorCode(code) - /// .ErrorMessage("Business error message") - /// .Send(); - /// - /// - /// the key which identifies the job - /// a builder for the command - IThrowErrorCommandStep1 NewThrowErrorCommand(long jobKey); - } -} + /// + /// Command to report a business error (i.e. non-technical) that occurs while processing a job. + /// + /// + /// + /// long jobKey = ...; + /// string code = ...; + /// jobClient + /// .NewThrowErrorCommand(jobKey) + /// .ErrorCode(code) + /// .ErrorMessage("Business error message") + /// .Send(); + /// + /// + /// the key which identifies the job + /// a builder for the command + IThrowErrorCommandStep1 NewThrowErrorCommand(long jobKey); +} \ No newline at end of file diff --git a/Client/Api/Worker/IJobWorker.cs b/Client/Api/Worker/IJobWorker.cs index 899e5e1f..4b8b324a 100644 --- a/Client/Api/Worker/IJobWorker.cs +++ b/Client/Api/Worker/IJobWorker.cs @@ -14,19 +14,18 @@ // limitations under the License. using System; -namespace Zeebe.Client.Api.Worker +namespace Zeebe.Client.Api.Worker; + +/// +/// Represents an job worker that performs jobs of a certain type. While a registration is +/// open, the worker continuously receives jobs from the broker and hands them to a registered +/// . +/// +public interface IJobWorker : IDisposable { - /// - /// Represents an job worker that performs jobs of a certain type. While a registration is - /// open, the worker continuously receives jobs from the broker and hands them to a registered - /// . - /// - public interface IJobWorker : IDisposable - { - /// true if this registration is currently active and work items are being received for it - bool IsOpen(); + /// true if this registration is currently active and work items are being received for it + bool IsOpen(); - /// true if this registration is not open and is not in the process of opening or closing - bool IsClosed(); - } + /// true if this registration is not open and is not in the process of opening or closing + bool IsClosed(); } \ No newline at end of file diff --git a/Client/Api/Worker/IJobWorkerBuilderStep1.cs b/Client/Api/Worker/IJobWorkerBuilderStep1.cs index ebf5ac8a..2c09b528 100644 --- a/Client/Api/Worker/IJobWorkerBuilderStep1.cs +++ b/Client/Api/Worker/IJobWorkerBuilderStep1.cs @@ -18,236 +18,235 @@ using Zeebe.Client.Api.Commands; using Zeebe.Client.Api.Responses; -namespace Zeebe.Client.Api.Worker +namespace Zeebe.Client.Api.Worker; + +public interface IJobWorkerBuilderStep1 +{ + /// + /// Set the type of jobs to work on. + /// + /// the type of jobs (e.g. "payment") + /// the builder for this worker + IJobWorkerBuilderStep2 JobType(string type); +} + +/// +/// The job handler which contains the business logic. +/// +/// the job client to complete or fail the job +/// the job, which was activated by the worker +public delegate void JobHandler(IJobClient client, IJob activatedJob); + +/// +/// The asynchronous job handler which contains the business logic. +/// +/// the job client to complete or fail the job +/// the job, which was activated by the worker +/// A representing the asynchronous operation. +public delegate Task AsyncJobHandler(IJobClient client, IJob activatedJob); + +public interface IJobWorkerBuilderStep2 { - public interface IJobWorkerBuilderStep1 - { - /// - /// Set the type of jobs to work on. - /// - /// the type of jobs (e.g. "payment") - /// the builder for this worker - IJobWorkerBuilderStep2 JobType(string type); - } + /// + /// Set the handler to process the jobs. At the end of the processing, the handler can + /// complete the job or mark it as failed. + /// + /// + /// + /// + /// Example JobHandler implementation: + /// + /// + /// + /// var handler = (client, job) => + /// { + /// String json = job.Variables; + /// // modify variables + /// + /// client + /// .CompleteCommand(job.Key) + /// .Variables(json) + /// .Send(); + /// }; + /// + /// + /// The handler must be thread-safe. + /// the handle to process the jobs + /// the builder for this worker + IJobWorkerBuilderStep3 Handler(JobHandler handler); + + /// + /// Set an async handler to process the jobs asynchronously. At the end of the processing, the handler can + /// complete the job or mark it as failed. + /// + /// + /// + /// + /// Example JobHandler implementation: + /// + /// + /// + /// var handler = async (client, job) => + /// { + /// String json = job.Variables; + /// // modify variables + /// + /// await client + /// .CompleteCommand(job.Key) + /// .Variables(json) + /// .Send(); + /// }; + /// + /// + /// The handler must be thread-safe. + /// the handle to process the jobs + /// the builder for this worker + IJobWorkerBuilderStep3 Handler(AsyncJobHandler handler); +} + +public interface IJobWorkerBuilderStep3 : ITenantIdsCommandStep +{ + /// + /// Set the time for how long a job is exclusively assigned for this worker. + /// + /// + /// In this time, the job can not be assigned by other workers to ensure that only one worker + /// work on the job. When the time is over then the job can be assigned again by this or other + /// worker if it's not completed yet. + /// + /// + /// the time as time span (e.g. "TimeSpan.FromMinutes(10)") + /// the builder for this worker + IJobWorkerBuilderStep3 Timeout(TimeSpan timeout); + + /// + /// Set the name of the worker owner. + /// + /// + /// + /// This name is used to identify the worker to which a job is exclusively assigned to. + /// + /// + /// the name of the worker (e.g. "payment-service") + /// the builder for this worker + IJobWorkerBuilderStep3 Name(string workerName); + + /// + /// Set the maximum number of jobs which will be exclusively activated for this worker at the same + /// time. + /// + /// + /// This is used to control the back pressure of the worker. When the maximum is reached then + /// the worker will stop activating new jobs in order to not overwhelm the client and give other + /// workers the chance to work on the jobs. The worker will try to activate new jobs again when + /// jobs are completed (or marked as failed). + /// + /// + /// Considerations: + /// + /// + /// + /// A greater value can avoid situations in which the client waits idle for the broker to + /// provide more jobs. This can improve the worker's throughput. + /// + /// + /// The memory used by the worker is linear with respect to this value. + /// + /// + /// The job's timeout starts to run down as soon as the broker pushes the job. Keep in mind + /// that the following must hold to ensure fluent job handling: + /// + /// + /// time spent in buffer + time job handler needs until job completion < job timeout + /// + /// + /// + /// + /// the maximum jobs active by this worker + /// the builder for this worker + IJobWorkerBuilderStep3 MaxJobsActive(int maxJobsActive); + + /// + /// Set a list of variable names which should be fetch on job activation. + /// + /// + /// The jobs which are activated by this command will only contain variables from this list. + /// + /// + /// + /// This can be used to limit the number of variables of the activated jobs. + /// + /// list of variables names to fetch on activation + /// the builder for this worker + IJobWorkerBuilderStep3 FetchVariables(IList fetchVariables); + + /// + /// Set a list of variable names which should be fetch on job activation. + /// + /// + /// The jobs which are activated by this command will only contain variables from this list. + /// + /// + /// + /// This can be used to limit the number of variables of the activated jobs. + /// + /// list of variables names to fetch on activation + /// the builder for this worker + IJobWorkerBuilderStep3 FetchVariables(params string[] fetchVariables); + + /// + /// Set the maximal interval between polling for new jobs. + /// + /// + /// A job worker will automatically try to always activate new jobs after completing jobs. If + /// no jobs can be activated after completing the worker will periodically poll for new jobs. + /// + /// + /// the maximal interval to check for new jobs + /// the builder for this worker + IJobWorkerBuilderStep3 PollInterval(TimeSpan pollInterval); + + /// + /// Set the polling timeout for the job activation. + /// + /// + /// + /// The activate jobs request will be completed when at least one job is activated or after the given requestTimeout. + /// + /// the polling timeout (e.g. "TimeSpan.FromMinutes(10)") + /// + /// the builder for this worker + IJobWorkerBuilderStep3 PollingTimeout(TimeSpan pollingTimeout); + + /// + /// Enables job worker auto completion. + /// + /// + /// + /// This means if the user does not complete or fails the activated job by himself + /// then the worker will do it. + /// + /// the builder for this worker + IJobWorkerBuilderStep3 AutoCompletion(); /// - /// The job handler which contains the business logic. + /// Specifies how many handler threads are used by this job worker. /// - /// the job client to complete or fail the job - /// the job, which was activated by the worker - public delegate void JobHandler(IJobClient client, IJob activatedJob); + /// + /// + /// The previous defined job handler can be called by multiple threads, to execute more jobs concurrently. + /// Per default one job handler thread is used by an job worker. + /// This means the job handler implementation needs to be thread safe. + /// + /// + /// Note: Job polling is done by a separate thread. + /// handler thread count, needs to be larger then zero + /// the builder for this worker + IJobWorkerBuilderStep3 HandlerThreads(byte threadCount); /// - /// The asynchronous job handler which contains the business logic. + /// Open the worker and start to work on available tasks. /// - /// the job client to complete or fail the job - /// the job, which was activated by the worker - /// A representing the asynchronous operation. - public delegate Task AsyncJobHandler(IJobClient client, IJob activatedJob); - - public interface IJobWorkerBuilderStep2 - { - /// - /// Set the handler to process the jobs. At the end of the processing, the handler can - /// complete the job or mark it as failed. - /// - /// - /// - /// - /// Example JobHandler implementation: - /// - /// - /// - /// var handler = (client, job) => - /// { - /// String json = job.Variables; - /// // modify variables - /// - /// client - /// .CompleteCommand(job.Key) - /// .Variables(json) - /// .Send(); - /// }; - /// - /// - /// The handler must be thread-safe. - /// the handle to process the jobs - /// the builder for this worker - IJobWorkerBuilderStep3 Handler(JobHandler handler); - - /// - /// Set an async handler to process the jobs asynchronously. At the end of the processing, the handler can - /// complete the job or mark it as failed. - /// - /// - /// - /// - /// Example JobHandler implementation: - /// - /// - /// - /// var handler = async (client, job) => - /// { - /// String json = job.Variables; - /// // modify variables - /// - /// await client - /// .CompleteCommand(job.Key) - /// .Variables(json) - /// .Send(); - /// }; - /// - /// - /// The handler must be thread-safe. - /// the handle to process the jobs - /// the builder for this worker - IJobWorkerBuilderStep3 Handler(AsyncJobHandler handler); - } - - public interface IJobWorkerBuilderStep3 : ITenantIdsCommandStep - { - /// - /// Set the time for how long a job is exclusively assigned for this worker. - /// - /// - /// In this time, the job can not be assigned by other workers to ensure that only one worker - /// work on the job. When the time is over then the job can be assigned again by this or other - /// worker if it's not completed yet. - /// - /// - /// the time as time span (e.g. "TimeSpan.FromMinutes(10)") - /// the builder for this worker - IJobWorkerBuilderStep3 Timeout(TimeSpan timeout); - - /// - /// Set the name of the worker owner. - /// - /// - /// - /// This name is used to identify the worker to which a job is exclusively assigned to. - /// - /// - /// the name of the worker (e.g. "payment-service") - /// the builder for this worker - IJobWorkerBuilderStep3 Name(string workerName); - - /// - /// Set the maximum number of jobs which will be exclusively activated for this worker at the same - /// time. - /// - /// - /// This is used to control the back pressure of the worker. When the maximum is reached then - /// the worker will stop activating new jobs in order to not overwhelm the client and give other - /// workers the chance to work on the jobs. The worker will try to activate new jobs again when - /// jobs are completed (or marked as failed). - /// - /// - /// Considerations: - /// - /// - /// - /// A greater value can avoid situations in which the client waits idle for the broker to - /// provide more jobs. This can improve the worker's throughput. - /// - /// - /// The memory used by the worker is linear with respect to this value. - /// - /// - /// The job's timeout starts to run down as soon as the broker pushes the job. Keep in mind - /// that the following must hold to ensure fluent job handling: - /// - /// - /// time spent in buffer + time job handler needs until job completion < job timeout - /// - /// - /// - /// - /// the maximum jobs active by this worker - /// the builder for this worker - IJobWorkerBuilderStep3 MaxJobsActive(int maxJobsActive); - - /// - /// Set a list of variable names which should be fetch on job activation. - /// - /// - /// The jobs which are activated by this command will only contain variables from this list. - /// - /// - /// - /// This can be used to limit the number of variables of the activated jobs. - /// - /// list of variables names to fetch on activation - /// the builder for this worker - IJobWorkerBuilderStep3 FetchVariables(IList fetchVariables); - - /// - /// Set a list of variable names which should be fetch on job activation. - /// - /// - /// The jobs which are activated by this command will only contain variables from this list. - /// - /// - /// - /// This can be used to limit the number of variables of the activated jobs. - /// - /// list of variables names to fetch on activation - /// the builder for this worker - IJobWorkerBuilderStep3 FetchVariables(params string[] fetchVariables); - - /// - /// Set the maximal interval between polling for new jobs. - /// - /// - /// A job worker will automatically try to always activate new jobs after completing jobs. If - /// no jobs can be activated after completing the worker will periodically poll for new jobs. - /// - /// - /// the maximal interval to check for new jobs - /// the builder for this worker - IJobWorkerBuilderStep3 PollInterval(TimeSpan pollInterval); - - /// - /// Set the polling timeout for the job activation. - /// - /// - /// - /// The activate jobs request will be completed when at least one job is activated or after the given requestTimeout. - /// - /// the polling timeout (e.g. "TimeSpan.FromMinutes(10)") - /// - /// the builder for this worker - IJobWorkerBuilderStep3 PollingTimeout(TimeSpan pollingTimeout); - - /// - /// Enables job worker auto completion. - /// - /// - /// - /// This means if the user does not complete or fails the activated job by himself - /// then the worker will do it. - /// - /// the builder for this worker - IJobWorkerBuilderStep3 AutoCompletion(); - - /// - /// Specifies how many handler threads are used by this job worker. - /// - /// - /// - /// The previous defined job handler can be called by multiple threads, to execute more jobs concurrently. - /// Per default one job handler thread is used by an job worker. - /// This means the job handler implementation needs to be thread safe. - /// - /// - /// Note: Job polling is done by a separate thread. - /// handler thread count, needs to be larger then zero - /// the builder for this worker - IJobWorkerBuilderStep3 HandlerThreads(byte threadCount); - - /// - /// Open the worker and start to work on available tasks. - /// - /// the worker - IJobWorker Open(); - } + /// the worker + IJobWorker Open(); } \ No newline at end of file diff --git a/Client/ZeebeClient.cs b/Client/ZeebeClient.cs index dcdc4a4b..265dfd1b 100644 --- a/Client/ZeebeClient.cs +++ b/Client/ZeebeClient.cs @@ -13,9 +13,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#nullable enable using System; -using System.Collections.Generic; -using System.Linq; using System.Net.Http; using System.Net.Security; using System.Security.Cryptography.X509Certificates; @@ -36,57 +35,70 @@ using Zeebe.Client.Impl.Misc; using Zeebe.Client.Impl.Worker; -namespace Zeebe.Client +namespace Zeebe.Client; + +/// +public sealed class ZeebeClient : IZeebeClient { - /// - public class ZeebeClient : IZeebeClient - { - internal static readonly int MaxWaitTimeInSeconds = 60; - internal static readonly Func DefaultWaitTimeProvider = - retryAttempt => TimeSpan.FromSeconds(Math.Min(Math.Pow(2, retryAttempt), MaxWaitTimeInSeconds)); - private static readonly TimeSpan DefaultKeepAlive = TimeSpan.FromSeconds(30); - - private readonly GrpcChannel channelToGateway; - private readonly ILoggerFactory loggerFactory; - private volatile Gateway.GatewayClient gatewayClient; - private readonly IAsyncRetryStrategy asyncRetryStrategy; - - internal ZeebeClient(string address, TimeSpan? keepAlive, Func sleepDurationProvider, ILoggerFactory loggerFactory = null) - : this(address, ChannelCredentials.Insecure, keepAlive, sleepDurationProvider, loggerFactory) - { } - - internal ZeebeClient(string address, - ChannelCredentials credentials, - TimeSpan? keepAlive, - Func sleepDurationProvider, - ILoggerFactory loggerFactory = null, - X509Certificate2 certificate = null, - bool allowUntrusted = false) + internal static readonly int MaxWaitTimeInSeconds = 60; + internal static readonly Func DefaultWaitTimeProvider = + retryAttempt => TimeSpan.FromSeconds(Math.Min(Math.Pow(2, retryAttempt), MaxWaitTimeInSeconds)); + private static readonly TimeSpan DefaultKeepAlive = TimeSpan.FromSeconds(30); + + private readonly GrpcChannel channelToGateway; + private readonly ILoggerFactory? loggerFactory; + private volatile Gateway.GatewayClient gatewayClient; + private readonly IAsyncRetryStrategy asyncRetryStrategy; + + internal ZeebeClient(string address, TimeSpan? keepAlive, Func sleepDurationProvider, ILoggerFactory? loggerFactory = null, + GrpcChannel? grpcChannel = null) + : this(address, ChannelCredentials.Insecure, keepAlive, sleepDurationProvider, loggerFactory, grpcChannel: grpcChannel) + { } + + internal ZeebeClient(string address, + ChannelCredentials credentials, + TimeSpan? keepAlive, + Func? sleepDurationProvider, + ILoggerFactory? loggerFactory = null, + X509Certificate2? certificate = null, + bool allowUntrusted = false, + GrpcChannel? grpcChannel = null) + { + this.loggerFactory = loggerFactory; + var logger = loggerFactory?.CreateLogger(); + logger?.LogDebug("Connect to {Address}", address); + + var sslOptions = new SslClientAuthenticationOptions { - this.loggerFactory = loggerFactory; - var logger = loggerFactory?.CreateLogger(); - logger?.LogDebug("Connect to {Address}", address); + ClientCertificates = new X509Certificate2Collection(certificate is null ? Array.Empty() : new X509Certificate2[] { certificate }) + }; + if (allowUntrusted) + { + // https://github.com/dotnet/runtime/issues/42482 + // https://docs.servicestack.net/grpc/csharp#c-protoc-grpc-ssl-example + // Allows untrusted certificates, used for testing. + sslOptions.RemoteCertificateValidationCallback = (_, _, _, _) => true; + } - var sslOptions = new SslClientAuthenticationOptions() - { - ClientCertificates = new X509Certificate2Collection(certificate is null ? Array.Empty() : new X509Certificate2[] { certificate }) - }; - if (allowUntrusted) - { - // https://github.com/dotnet/runtime/issues/42482 - // https://docs.servicestack.net/grpc/csharp#c-protoc-grpc-ssl-example - // Allows untrusted certificates, used for testing. - sslOptions.RemoteCertificateValidationCallback = (sender, x509Certificate, chain, errors) => true; - } + channelToGateway = grpcChannel ?? BuildChannelToGateway(); + + var callInvoker = channelToGateway.Intercept(new UserAgentInterceptor()); + gatewayClient = new Gateway.GatewayClient(callInvoker); - channelToGateway = GrpcChannel.ForAddress(address, new GrpcChannelOptions + asyncRetryStrategy = + new TransientGrpcErrorRetryStrategy(sleepDurationProvider ?? + DefaultWaitTimeProvider); + + GrpcChannel BuildChannelToGateway() + { + return GrpcChannel.ForAddress(address, new GrpcChannelOptions { - // https://learn.microsoft.com/en-us/dotnet/architecture/grpc-for-wcf-developers/channel-credentials#combine-channelcredentials-and-callcredentials + // https://learn.microsoft.com/en-us/dotnet/architecture/grpc-for-wcf-developers/channel-credentials#combine-channelcredentials-and-callcredentials Credentials = credentials, LoggerFactory = this.loggerFactory, DisposeHttpClient = true, // for keep alive configure sockets http handler - // https://learn.microsoft.com/en-us/aspnet/core/grpc/performance?view=aspnetcore-5.0#keep-alive-pings + // https://learn.microsoft.com/en-us/aspnet/core/grpc/performance?view=aspnetcore-5.0#keep-alive-pings HttpHandler = new SocketsHttpHandler { PooledConnectionIdleTimeout = Timeout.InfiniteTimeSpan, @@ -96,154 +108,147 @@ internal ZeebeClient(string address, SslOptions = sslOptions }, }); - - var callInvoker = channelToGateway.Intercept(new UserAgentInterceptor()); - gatewayClient = new Gateway.GatewayClient(callInvoker); - - asyncRetryStrategy = - new TransientGrpcErrorRetryStrategy(sleepDurationProvider ?? - DefaultWaitTimeProvider); } + } - public async Task Connect() - { - await channelToGateway.ConnectAsync(); - } + public async Task Connect() + { + await channelToGateway.ConnectAsync(); + } - /// - /// Intercept outgoing call to inject metadata - /// The "user-agent" is already filled by grpc-dotnet - /// typically something like: key=user-agent, value=grpc-dotnet/2.53.0 (.NET 7.0.5; CLR 7.0.5; net7.0; windows; x64) - /// We want to add our version and name. Unfortunately, this will always be appended to the end. - /// - private class UserAgentInterceptor : Interceptor + /// + /// Intercept outgoing call to inject metadata + /// The "user-agent" is already filled by grpc-dotnet + /// typically something like: key=user-agent, value=grpc-dotnet/2.53.0 (.NET 7.0.5; CLR 7.0.5; net7.0; windows; x64) + /// We want to add our version and name. Unfortunately, this will always be appended to the end. + /// + private class UserAgentInterceptor : Interceptor + { + public override AsyncUnaryCall AsyncUnaryCall(TRequest request, + ClientInterceptorContext context, + AsyncUnaryCallContinuation continuation) { - public override AsyncUnaryCall AsyncUnaryCall(TRequest request, - ClientInterceptorContext context, - AsyncUnaryCallContinuation continuation) + var clientVersion = typeof(ZeebeClient).Assembly.GetName().Version; + var userAgentString = $"zeebe-client-csharp/{clientVersion}"; + var headers = new Metadata { - var clientVersion = typeof(ZeebeClient).Assembly.GetName().Version; - var userAgentString = $"zeebe-client-csharp/{clientVersion}"; - var headers = new Metadata - { - { "user-agent", userAgentString } - }; - var newOptions = context.Options.WithHeaders(headers); - var newContext = - new ClientInterceptorContext(context.Method, context.Host, newOptions); - return base.AsyncUnaryCall(request, newContext, continuation); - } + { "user-agent", userAgentString } + }; + var newOptions = context.Options.WithHeaders(headers); + var newContext = + new ClientInterceptorContext(context.Method, context.Host, newOptions); + return base.AsyncUnaryCall(request, newContext, continuation); } + } //////////////////////////////////////////////////////////////////////// ///////////////////////////// JOBS ///////////////////////////////////// //////////////////////////////////////////////////////////////////////// - public IJobWorkerBuilderStep1 NewWorker() - { - return new JobWorkerBuilder(this, gatewayClient, loggerFactory); - } + public IJobWorkerBuilderStep1 NewWorker() + { + return new JobWorkerBuilder(this, gatewayClient, loggerFactory); + } - public IActivateJobsCommandStep1 NewActivateJobsCommand() - { - return new ActivateJobsCommand(gatewayClient, asyncRetryStrategy); - } + public IActivateJobsCommandStep1 NewActivateJobsCommand() + { + return new ActivateJobsCommand(gatewayClient, asyncRetryStrategy); + } - public ICompleteJobCommandStep1 NewCompleteJobCommand(long jobKey) - { - return new CompleteJobCommand(gatewayClient, asyncRetryStrategy, jobKey); - } + public ICompleteJobCommandStep1 NewCompleteJobCommand(long jobKey) + { + return new CompleteJobCommand(gatewayClient, asyncRetryStrategy, jobKey); + } - public ICompleteJobCommandStep1 NewCompleteJobCommand(IJob activatedJob) - { - return new CompleteJobCommand(gatewayClient, asyncRetryStrategy, activatedJob.Key); - } + public ICompleteJobCommandStep1 NewCompleteJobCommand(IJob activatedJob) + { + return new CompleteJobCommand(gatewayClient, asyncRetryStrategy, activatedJob.Key); + } - public IFailJobCommandStep1 NewFailCommand(long jobKey) - { - return new FailJobCommand(gatewayClient, asyncRetryStrategy, jobKey); - } + public IFailJobCommandStep1 NewFailCommand(long jobKey) + { + return new FailJobCommand(gatewayClient, asyncRetryStrategy, jobKey); + } - public IUpdateRetriesCommandStep1 NewUpdateRetriesCommand(long jobKey) - { - return new UpdateRetriesCommand(gatewayClient, asyncRetryStrategy, jobKey); - } + public IUpdateRetriesCommandStep1 NewUpdateRetriesCommand(long jobKey) + { + return new UpdateRetriesCommand(gatewayClient, asyncRetryStrategy, jobKey); + } - public IUpdateJobTimeoutCommandStep1 NewUpdateJobTimeoutCommand(long jobKey) - { - return new UpdateJobTimeoutCommand(gatewayClient, asyncRetryStrategy, jobKey); - } + public IUpdateJobTimeoutCommandStep1 NewUpdateJobTimeoutCommand(long jobKey) + { + return new UpdateJobTimeoutCommand(gatewayClient, asyncRetryStrategy, jobKey); + } - public IThrowErrorCommandStep1 NewThrowErrorCommand(long jobKey) - { - return new ThrowErrorCommand(gatewayClient, asyncRetryStrategy, jobKey); - } + public IThrowErrorCommandStep1 NewThrowErrorCommand(long jobKey) + { + return new ThrowErrorCommand(gatewayClient, asyncRetryStrategy, jobKey); + } - //////////////////////////////////////////////////////////////////////// - ///////////////////////////// Processes //////////////////////////////// - //////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////// + ///////////////////////////// Processes //////////////////////////////// + //////////////////////////////////////////////////////////////////////// - public IDeployResourceCommandStep1 NewDeployCommand() - { - return new DeployResourceCommand(gatewayClient, asyncRetryStrategy); - } + public IDeployResourceCommandStep1 NewDeployCommand() + { + return new DeployResourceCommand(gatewayClient, asyncRetryStrategy); + } - public IEvaluateDecisionCommandStep1 NewEvaluateDecisionCommand() - { - return new EvaluateDecisionCommand(gatewayClient, asyncRetryStrategy); - } + public IEvaluateDecisionCommandStep1 NewEvaluateDecisionCommand() + { + return new EvaluateDecisionCommand(gatewayClient, asyncRetryStrategy); + } - public ICreateProcessInstanceCommandStep1 NewCreateProcessInstanceCommand() - { - return new CreateProcessInstanceCommand(gatewayClient, asyncRetryStrategy); - } + public ICreateProcessInstanceCommandStep1 NewCreateProcessInstanceCommand() + { + return new CreateProcessInstanceCommand(gatewayClient, asyncRetryStrategy); + } - public ICancelProcessInstanceCommandStep1 NewCancelInstanceCommand(long processInstanceKey) - { - return new CancelProcessInstanceCommand(gatewayClient, asyncRetryStrategy, processInstanceKey); - } + public ICancelProcessInstanceCommandStep1 NewCancelInstanceCommand(long processInstanceKey) + { + return new CancelProcessInstanceCommand(gatewayClient, asyncRetryStrategy, processInstanceKey); + } - public ISetVariablesCommandStep1 NewSetVariablesCommand(long elementInstanceKey) - { - return new SetVariablesCommand(gatewayClient, asyncRetryStrategy, elementInstanceKey); - } + public ISetVariablesCommandStep1 NewSetVariablesCommand(long elementInstanceKey) + { + return new SetVariablesCommand(gatewayClient, asyncRetryStrategy, elementInstanceKey); + } - public IResolveIncidentCommandStep1 NewResolveIncidentCommand(long incidentKey) - { - return new ResolveIncidentCommand(gatewayClient, asyncRetryStrategy, incidentKey); - } + public IResolveIncidentCommandStep1 NewResolveIncidentCommand(long incidentKey) + { + return new ResolveIncidentCommand(gatewayClient, asyncRetryStrategy, incidentKey); + } - public IPublishMessageCommandStep1 NewPublishMessageCommand() - { - return new PublishMessageCommand(gatewayClient, asyncRetryStrategy); - } + public IPublishMessageCommandStep1 NewPublishMessageCommand() + { + return new PublishMessageCommand(gatewayClient, asyncRetryStrategy); + } - public IModifyProcessInstanceCommandStep1 NewModifyProcessInstanceCommand(long processInstanceKey) - { - return new ModifyProcessInstanceCommand(gatewayClient, asyncRetryStrategy, processInstanceKey); - } + public IModifyProcessInstanceCommandStep1 NewModifyProcessInstanceCommand(long processInstanceKey) + { + return new ModifyProcessInstanceCommand(gatewayClient, asyncRetryStrategy, processInstanceKey); + } - public ITopologyRequestStep1 TopologyRequest() => new TopologyRequestCommand(gatewayClient, asyncRetryStrategy); + public ITopologyRequestStep1 TopologyRequest() => new TopologyRequestCommand(gatewayClient, asyncRetryStrategy); - public void Dispose() + public void Dispose() + { + if (gatewayClient is ClosedGatewayClient) { - if (gatewayClient is ClosedGatewayClient) - { - return; - } - - gatewayClient = new ClosedGatewayClient(); - channelToGateway.Dispose(); + return; } - /// - /// Creates an new IZeebeClientBuilder. This builder need to be used to construct - /// a ZeebeClient. - /// - /// an builder to construct an ZeebeClient. - public static IZeebeClientBuilder Builder() - { - return new ZeebeClientBuilder(); - } + gatewayClient = new ClosedGatewayClient(); + channelToGateway.Dispose(); + } + + /// + /// Creates an new IZeebeClientBuilder. This builder need to be used to construct + /// a ZeebeClient. + /// + /// an builder to construct an ZeebeClient. + public static IZeebeClientBuilder Builder() + { + return new ZeebeClientBuilder(); } -} +} \ No newline at end of file