Skip to content

Commit

Permalink
Merge pull request #430 from serverlessworkflow/feat-local-function-c…
Browse files Browse the repository at this point in the history
…atalog

Added a new CustomFunction namespaced resource
  • Loading branch information
cdavernas authored Oct 21, 2024
2 parents c173756 + 7d27f01 commit 5cf98c0
Show file tree
Hide file tree
Showing 58 changed files with 1,888 additions and 66 deletions.
16 changes: 13 additions & 3 deletions src/api/Synapse.Api.Application/Services/CloudEventPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,21 @@ public class CloudEventPublisher(ILogger<CloudEventPublisher> logger, IConnectio
protected bool SupportsStreaming { get; private set; }

/// <inheritdoc/>
public virtual Task StartAsync(CancellationToken cancellationToken)
public virtual async Task StartAsync(CancellationToken cancellationToken)
{
if (options.Value.CloudEvents.Endpoint == null) logger.LogWarning("No endpoint configured for cloud events. Events will not be published.");
else _ = this.PublishEnqueuedEventsAsync();
var version = ((string)(this.Database.Execute("INFO", "server"))!).Split('\n').FirstOrDefault(line => line.StartsWith("redis_version:"))?[14..]?.Trim() ?? "undetermined";
try
{
this.Database.StreamInfo(SynapseDefaults.CloudEvents.Bus.StreamName);
try
{
await this.Database.StreamInfoAsync(SynapseDefaults.CloudEvents.Bus.StreamName).ConfigureAwait(false);
}
catch (RedisServerException ex) when (ex.Message.StartsWith("ERR no such key"))
{
this.Logger.LogWarning("The cloud event stream is currently unavailable, but it should be created when cloud events are published or when correlators are activated");
}
this.SupportsStreaming = true;
this.Logger.LogInformation("Redis server version '{version}' supports streaming commands. Streaming feature is enabled", version);
}
Expand All @@ -100,7 +107,10 @@ public virtual Task StartAsync(CancellationToken cancellationToken)
this.SupportsStreaming = false;
this.Logger.LogInformation("Redis server version '{version}' does not support streaming commands. Streaming feature is emulated using lists", version);
}
return Task.CompletedTask;
catch(Exception ex)
{
this.Logger.LogWarning("An error occurred while starting the cloud event publisher, possibly affecting the server's ability to publish cloud events to correlators: {ex}", ex);
}
}

/// <inheritdoc/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Neuroglia.Serialization;
using ServerlessWorkflow.Sdk.IO;
using ServerlessWorkflow.Sdk.Models;
using Synapse.Api.Application.Configuration;
Expand All @@ -28,9 +29,11 @@ namespace Synapse.Api.Application.Services;
/// </summary>
/// <param name="serviceProvider">The current <see cref="IServiceProvider"/></param>
/// <param name="logger">The service used to perform logging</param>
/// <param name="jsonSerializer">The service used to serialize/deserialize data to/from JSON</param>
/// <param name="yamlSerializer">The service used to serialize/deserialize data to/from YAML</param>
/// <param name="workflowDefinitionReader">The service used to read <see cref="WorkflowDefinition"/>s</param>
/// <param name="options">The service used to access the current <see cref="ApiServerOptions"/></param>
public class WorkflowDatabaseInitializer(IServiceProvider serviceProvider, ILogger<WorkflowDatabaseInitializer> logger, IWorkflowDefinitionReader workflowDefinitionReader, IOptions<ApiServerOptions> options)
public class WorkflowDatabaseInitializer(IServiceProvider serviceProvider, ILogger<WorkflowDatabaseInitializer> logger, IJsonSerializer jsonSerializer, IYamlSerializer yamlSerializer, IWorkflowDefinitionReader workflowDefinitionReader, IOptions<ApiServerOptions> options)
: IHostedService
{

Expand All @@ -44,6 +47,16 @@ public class WorkflowDatabaseInitializer(IServiceProvider serviceProvider, ILogg
/// </summary>
protected ILogger Logger { get; } = logger;

/// <summary>
/// Gets the service used to serialize/deserialize data to/from JSON
/// </summary>
protected IJsonSerializer JsonSerializer { get; } = jsonSerializer;

/// <summary>
/// Gets the service used to serialize/deserialize data to/from YAML
/// </summary>
protected IYamlSerializer YamlSerializer { get; } = yamlSerializer;

/// <summary>
/// Gets the service used to read <see cref="WorkflowDefinition"/>s
/// </summary>
Expand Down Expand Up @@ -80,15 +93,80 @@ public virtual async Task StartAsync(CancellationToken cancellationToken)
this.Logger.LogWarning("The directory '{directory}' does not exist or cannot be found. Skipping static resource import", directory.FullName);
return;
}
this.Logger.LogInformation("Starting importing static resources from directory '{directory}'...", directory.FullName);
await this.ProvisionNamespacesAsync(resources, cancellationToken).ConfigureAwait(false);
await this.ProvisionWorkflowsAsync(resources, cancellationToken).ConfigureAwait(false);
await this.ProvisionFunctionsAsync(resources, cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Provisions namespaces from statis resource files
/// </summary>
/// <param name="resources">The <see cref="IResourceRepository"/> used to manage <see cref="IResource"/>s</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>A new awaitable <see cref="Task"/></returns>
protected virtual async Task ProvisionNamespacesAsync(IResourceRepository resources, CancellationToken cancellationToken)
{
var stopwatch = new Stopwatch();
var directory = new DirectoryInfo(Path.Combine(this.Options.Seeding.Directory, "namespaces"));
if (!directory.Exists) return;
this.Logger.LogInformation("Starting importing namespaces from directory '{directory}'...", directory.FullName);
var files = directory.GetFiles(this.Options.Seeding.FilePattern, SearchOption.AllDirectories).Where(f => f.FullName.EndsWith(".json", StringComparison.OrdinalIgnoreCase) || f.FullName.EndsWith(".yml", StringComparison.OrdinalIgnoreCase) || f.FullName.EndsWith(".yaml", StringComparison.OrdinalIgnoreCase));
if (!files.Any())
{
this.Logger.LogWarning("No static resource files matching search pattern '{pattern}' found in directory '{directory}'. Skipping import.", this.Options.Seeding.FilePattern, directory.FullName);
this.Logger.LogWarning("No namespace static resource files matching search pattern '{pattern}' found in directory '{directory}'. Skipping import.", this.Options.Seeding.FilePattern, directory.FullName);
return;
}
stopwatch.Restart();
var count = 0;
foreach (var file in files)
{
try
{
var extension = file.FullName.Split('.', StringSplitOptions.RemoveEmptyEntries).LastOrDefault();
var serializer = extension?.ToLowerInvariant() switch
{
"json" => (ITextSerializer)this.JsonSerializer,
"yml" or "yaml" => this.YamlSerializer,
_ => throw new NotSupportedException($"The specified extension '{extension}' is not supported for static resource files")
};
using var stream = file.OpenRead();
using var streamReader = new StreamReader(stream);
var text = await streamReader.ReadToEndAsync(cancellationToken).ConfigureAwait(false);
var ns = serializer.Deserialize<NamespaceDefinition>(text)!;
await resources.AddAsync(ns, false, cancellationToken).ConfigureAwait(false);
this.Logger.LogInformation("Successfully imported namespace '{namespace}' from file '{file}'", $"{ns.Metadata.Name}", file.FullName);
count++;
}
catch (Exception ex)
{
this.Logger.LogError("An error occurred while reading a namespace from file '{file}': {ex}", file.FullName, ex);
continue;
}
}
stopwatch.Stop();
this.Logger.LogInformation("Completed importing {count} namespaces in {ms} milliseconds", count, stopwatch.Elapsed.TotalMilliseconds);
}

/// <summary>
/// Provisions workflows from statis resource files
/// </summary>
/// <param name="resources">The <see cref="IResourceRepository"/> used to manage <see cref="IResource"/>s</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>A new awaitable <see cref="Task"/></returns>
protected virtual async Task ProvisionWorkflowsAsync(IResourceRepository resources, CancellationToken cancellationToken)
{
var stopwatch = new Stopwatch();
var directory = new DirectoryInfo(Path.Combine(this.Options.Seeding.Directory, "workflows"));
if (!directory.Exists) return;
this.Logger.LogInformation("Starting importing workflows from directory '{directory}'...", directory.FullName);
var files = directory.GetFiles(this.Options.Seeding.FilePattern, SearchOption.AllDirectories).Where(f => f.FullName.EndsWith(".json", StringComparison.OrdinalIgnoreCase) || f.FullName.EndsWith(".yml", StringComparison.OrdinalIgnoreCase) || f.FullName.EndsWith(".yaml", StringComparison.OrdinalIgnoreCase));
if (!files.Any())
{
this.Logger.LogWarning("No workflow static resource files matching search pattern '{pattern}' found in directory '{directory}'. Skipping import.", this.Options.Seeding.FilePattern, directory.FullName);
return;
}
stopwatch.Restart();
var count = 0;
foreach (var file in files)
{
try
Expand All @@ -110,11 +188,6 @@ public virtual async Task StartAsync(CancellationToken cancellationToken)
Versions = [workflowDefinition]
}
};
if (await resources.GetAsync<Namespace>(workflow.GetNamespace()!, cancellationToken: cancellationToken).ConfigureAwait(false) == null)
{
await resources.AddAsync(new Namespace() { Metadata = new() { Name = workflow.GetNamespace()! } }, false, cancellationToken).ConfigureAwait(false);
this.Logger.LogInformation("Successfully created namespace '{namespace}'", workflow.GetNamespace());
}
await resources.AddAsync(workflow, false, cancellationToken).ConfigureAwait(false);
}
else
Expand All @@ -138,14 +211,63 @@ public virtual async Task StartAsync(CancellationToken cancellationToken)
this.Logger.LogInformation("Successfully imported workflow '{workflow}' from file '{file}'", $"{workflowDefinition.Document.Name}.{workflowDefinition.Document.Namespace}:{workflowDefinition.Document.Version}", file.FullName);
count++;
}
catch(Exception ex)
catch (Exception ex)
{
this.Logger.LogError("An error occurred while reading a workflow definition from file '{file}': {ex}", file.FullName, ex);
continue;
}
}
stopwatch.Stop();
this.Logger.LogInformation("Completed importing {count} static resources in {ms} milliseconds", count, stopwatch.Elapsed.TotalMilliseconds);
this.Logger.LogInformation("Completed importing {count} workflows in {ms} milliseconds", count, stopwatch.Elapsed.TotalMilliseconds);
}

/// <summary>
/// Provisions functions from statis resource files
/// </summary>
/// <param name="resources">The <see cref="IResourceRepository"/> used to manage <see cref="IResource"/>s</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>A new awaitable <see cref="Task"/></returns>
protected virtual async Task ProvisionFunctionsAsync(IResourceRepository resources, CancellationToken cancellationToken)
{
var stopwatch = new Stopwatch();
var directory = new DirectoryInfo(Path.Combine(this.Options.Seeding.Directory, "functions"));
if (!directory.Exists) return;
this.Logger.LogInformation("Starting importing custom functions from directory '{directory}'...", directory.FullName);
var files = directory.GetFiles(this.Options.Seeding.FilePattern, SearchOption.AllDirectories).Where(f => f.FullName.EndsWith(".json", StringComparison.OrdinalIgnoreCase) || f.FullName.EndsWith(".yml", StringComparison.OrdinalIgnoreCase) || f.FullName.EndsWith(".yaml", StringComparison.OrdinalIgnoreCase));
if (!files.Any())
{
this.Logger.LogWarning("No custom function static resource files matching search pattern '{pattern}' found in directory '{directory}'. Skipping import.", this.Options.Seeding.FilePattern, directory.FullName);
return;
}
stopwatch.Restart();
var count = 0;
foreach (var file in files)
{
try
{
var extension = file.FullName.Split('.', StringSplitOptions.RemoveEmptyEntries).LastOrDefault();
var serializer = extension?.ToLowerInvariant() switch
{
"json" => (ITextSerializer)this.JsonSerializer,
"yml" or "yaml" => this.YamlSerializer,
_ => throw new NotSupportedException($"The specified extension '{extension}' is not supported for static resource files")
};
using var stream = file.OpenRead();
using var streamReader = new StreamReader(stream);
var text = await streamReader.ReadToEndAsync(cancellationToken).ConfigureAwait(false);
var func = serializer.Deserialize<CustomFunction>(text)!;
await resources.AddAsync(func, false, cancellationToken).ConfigureAwait(false);
this.Logger.LogInformation("Successfully imported custom function '{customFunction}' from file '{file}'", func.GetQualifiedName(), file.FullName);
count++;
}
catch (Exception ex)
{
this.Logger.LogError("An error occurred while reading a custom function from file '{file}': {ex}", file.FullName, ex);
continue;
}
}
stopwatch.Stop();
this.Logger.LogInformation("Completed importing {count} custom functions in {ms} milliseconds", count, stopwatch.Elapsed.TotalMilliseconds);
}

/// <inheritdoc/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<NeutralLanguage>en</NeutralLanguage>
<GenerateDocumentationFile>True</GenerateDocumentationFile>
<VersionPrefix>1.0.0</VersionPrefix>
<VersionSuffix>alpha3.3</VersionSuffix>
<VersionSuffix>alpha4</VersionSuffix>
<AssemblyVersion>$(VersionPrefix)</AssemblyVersion>
<FileVersion>$(VersionPrefix)</FileVersion>
<Authors>The Synapse Authors</Authors>
Expand Down
5 changes: 5 additions & 0 deletions src/api/Synapse.Api.Client.Core/Services/ISynapseApiClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ public interface ISynapseApiClient
/// </summary>
INamespacedResourceApiClient<Correlator> Correlators { get; }

/// <summary>
/// Gets the Synapse API used to manage <see cref="CustomFunction"/>s
/// </summary>
INamespacedResourceApiClient<CustomFunction> CustomFunctions { get; }

/// <summary>
/// Gets the Synapse API used to manage <see cref="Document"/>s
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<NeutralLanguage>en</NeutralLanguage>
<GenerateDocumentationFile>True</GenerateDocumentationFile>
<VersionPrefix>1.0.0</VersionPrefix>
<VersionSuffix>alpha3.3</VersionSuffix>
<VersionSuffix>alpha4</VersionSuffix>
<AssemblyVersion>$(VersionPrefix)</AssemblyVersion>
<FileVersion>$(VersionPrefix)</FileVersion>
<Authors>The Synapse Authors</Authors>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ public SynapseHttpApiClient(IServiceProvider serviceProvider, ILoggerFactory log
/// <inheritdoc/>
public INamespacedResourceApiClient<Correlator> Correlators { get; private set; } = null!;

/// <inheritdoc/>
public INamespacedResourceApiClient<CustomFunction> CustomFunctions { get; private set; } = null!;

/// <inheritdoc/>
public IDocumentApiClient Documents { get; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<NeutralLanguage>en</NeutralLanguage>
<GenerateDocumentationFile>True</GenerateDocumentationFile>
<VersionPrefix>1.0.0</VersionPrefix>
<VersionSuffix>alpha3.3</VersionSuffix>
<VersionSuffix>alpha4</VersionSuffix>
<AssemblyVersion>$(VersionPrefix)</AssemblyVersion>
<FileVersion>$(VersionPrefix)</FileVersion>
<Authors>The Synapse Authors</Authors>
Expand Down
28 changes: 28 additions & 0 deletions src/api/Synapse.Api.Http/Controllers/CustomFunctionsController.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright © 2024-Present The Synapse Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

namespace Synapse.Api.Http.Controllers;

/// <summary>
/// Represents the <see cref="NamespacedResourceController{TResource}"/> used to manage <see cref="CustomFunction"/>s
/// </summary>
/// <param name="mediator">The service used to mediate calls</param>
/// <param name="jsonSerializer">The service used to serialize/deserialize objects to/from JSON</param>
[Route("api/v1/custom-functions")]
public class CustomFunctionsController(IMediator mediator, IJsonSerializer jsonSerializer)
: NamespacedResourceController<CustomFunction>(mediator, jsonSerializer)
{



}
2 changes: 1 addition & 1 deletion src/api/Synapse.Api.Http/Synapse.Api.Http.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<OutputType>Library</OutputType>
<GenerateDocumentationFile>True</GenerateDocumentationFile>
<VersionPrefix>1.0.0</VersionPrefix>
<VersionSuffix>alpha3.3</VersionSuffix>
<VersionSuffix>alpha4</VersionSuffix>
<AssemblyVersion>$(VersionPrefix)</AssemblyVersion>
<FileVersion>$(VersionPrefix)</FileVersion>
<Authors>The Synapse Authors</Authors>
Expand Down
2 changes: 1 addition & 1 deletion src/api/Synapse.Api.Server/Synapse.Api.Server.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<NeutralLanguage>en</NeutralLanguage>
<GenerateDocumentationFile>True</GenerateDocumentationFile>
<VersionPrefix>1.0.0</VersionPrefix>
<VersionSuffix>alpha3.3</VersionSuffix>
<VersionSuffix>alpha4</VersionSuffix>
<AssemblyVersion>$(VersionPrefix)</AssemblyVersion>
<FileVersion>$(VersionPrefix)</FileVersion>
<Authors>The Synapse Authors</Authors>
Expand Down
2 changes: 1 addition & 1 deletion src/cli/Synapse.Cli/Synapse.Cli.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<NeutralLanguage>en</NeutralLanguage>
<GenerateDocumentationFile>True</GenerateDocumentationFile>
<VersionPrefix>1.0.0</VersionPrefix>
<VersionSuffix>alpha3.3</VersionSuffix>
<VersionSuffix>alpha4</VersionSuffix>
<AssemblyVersion>$(VersionPrefix)</AssemblyVersion>
<FileVersion>$(VersionPrefix)</FileVersion>
<Authors>The Synapse Authors</Authors>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<NeutralLanguage>en</NeutralLanguage>
<GenerateDocumentationFile>True</GenerateDocumentationFile>
<VersionPrefix>1.0.0</VersionPrefix>
<VersionSuffix>alpha3.3</VersionSuffix>
<VersionSuffix>alpha4</VersionSuffix>
<AssemblyVersion>$(VersionPrefix)</AssemblyVersion>
<FileVersion>$(VersionPrefix)</FileVersion>
<Authors>The Synapse Authors</Authors>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<NeutralLanguage>en</NeutralLanguage>
<GenerateDocumentationFile>True</GenerateDocumentationFile>
<VersionPrefix>1.0.0</VersionPrefix>
<VersionSuffix>alpha3.3</VersionSuffix>
<VersionSuffix>alpha4</VersionSuffix>
<AssemblyVersion>$(VersionPrefix)</AssemblyVersion>
<FileVersion>$(VersionPrefix)</FileVersion>
<Authors>The Synapse Authors</Authors>
Expand Down
Loading

0 comments on commit 5cf98c0

Please sign in to comment.