Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added a new CustomFunction namespaced resource #430

Merged
merged 5 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions src/api/Synapse.Api.Application/Services/CloudEventPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,21 @@ public class CloudEventPublisher(ILogger<CloudEventPublisher> logger, IConnectio
protected bool SupportsStreaming { get; private set; }

/// <inheritdoc/>
public virtual Task StartAsync(CancellationToken cancellationToken)
public virtual async Task StartAsync(CancellationToken cancellationToken)
{
if (options.Value.CloudEvents.Endpoint == null) logger.LogWarning("No endpoint configured for cloud events. Events will not be published.");
else _ = this.PublishEnqueuedEventsAsync();
var version = ((string)(this.Database.Execute("INFO", "server"))!).Split('\n').FirstOrDefault(line => line.StartsWith("redis_version:"))?[14..]?.Trim() ?? "undetermined";
try
{
this.Database.StreamInfo(SynapseDefaults.CloudEvents.Bus.StreamName);
try
{
await this.Database.StreamInfoAsync(SynapseDefaults.CloudEvents.Bus.StreamName).ConfigureAwait(false);
}
catch (RedisServerException ex) when (ex.Message.StartsWith("ERR no such key"))
{
this.Logger.LogWarning("The cloud event stream is currently unavailable, but it should be created when cloud events are published or when correlators are activated");
}
this.SupportsStreaming = true;
this.Logger.LogInformation("Redis server version '{version}' supports streaming commands. Streaming feature is enabled", version);
}
Expand All @@ -100,7 +107,10 @@ public virtual Task StartAsync(CancellationToken cancellationToken)
this.SupportsStreaming = false;
this.Logger.LogInformation("Redis server version '{version}' does not support streaming commands. Streaming feature is emulated using lists", version);
}
return Task.CompletedTask;
catch(Exception ex)
{
this.Logger.LogWarning("An error occurred while starting the cloud event publisher, possibly affecting the server's ability to publish cloud events to correlators: {ex}", ex);
}
}

/// <inheritdoc/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Neuroglia.Serialization;
using ServerlessWorkflow.Sdk.IO;
using ServerlessWorkflow.Sdk.Models;
using Synapse.Api.Application.Configuration;
Expand All @@ -28,9 +29,11 @@ namespace Synapse.Api.Application.Services;
/// </summary>
/// <param name="serviceProvider">The current <see cref="IServiceProvider"/></param>
/// <param name="logger">The service used to perform logging</param>
/// <param name="jsonSerializer">The service used to serialize/deserialize data to/from JSON</param>
/// <param name="yamlSerializer">The service used to serialize/deserialize data to/from YAML</param>
/// <param name="workflowDefinitionReader">The service used to read <see cref="WorkflowDefinition"/>s</param>
/// <param name="options">The service used to access the current <see cref="ApiServerOptions"/></param>
public class WorkflowDatabaseInitializer(IServiceProvider serviceProvider, ILogger<WorkflowDatabaseInitializer> logger, IWorkflowDefinitionReader workflowDefinitionReader, IOptions<ApiServerOptions> options)
public class WorkflowDatabaseInitializer(IServiceProvider serviceProvider, ILogger<WorkflowDatabaseInitializer> logger, IJsonSerializer jsonSerializer, IYamlSerializer yamlSerializer, IWorkflowDefinitionReader workflowDefinitionReader, IOptions<ApiServerOptions> options)
: IHostedService
{

Expand All @@ -44,6 +47,16 @@ public class WorkflowDatabaseInitializer(IServiceProvider serviceProvider, ILogg
/// </summary>
protected ILogger Logger { get; } = logger;

/// <summary>
/// Gets the service used to serialize/deserialize data to/from JSON
/// </summary>
protected IJsonSerializer JsonSerializer { get; } = jsonSerializer;

/// <summary>
/// Gets the service used to serialize/deserialize data to/from YAML
/// </summary>
protected IYamlSerializer YamlSerializer { get; } = yamlSerializer;

/// <summary>
/// Gets the service used to read <see cref="WorkflowDefinition"/>s
/// </summary>
Expand Down Expand Up @@ -80,15 +93,80 @@ public virtual async Task StartAsync(CancellationToken cancellationToken)
this.Logger.LogWarning("The directory '{directory}' does not exist or cannot be found. Skipping static resource import", directory.FullName);
return;
}
this.Logger.LogInformation("Starting importing static resources from directory '{directory}'...", directory.FullName);
await this.ProvisionNamespacesAsync(resources, cancellationToken).ConfigureAwait(false);
await this.ProvisionWorkflowsAsync(resources, cancellationToken).ConfigureAwait(false);
await this.ProvisionFunctionsAsync(resources, cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Provisions namespaces from statis resource files
/// </summary>
/// <param name="resources">The <see cref="IResourceRepository"/> used to manage <see cref="IResource"/>s</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>A new awaitable <see cref="Task"/></returns>
protected virtual async Task ProvisionNamespacesAsync(IResourceRepository resources, CancellationToken cancellationToken)
{
var stopwatch = new Stopwatch();
var directory = new DirectoryInfo(Path.Combine(this.Options.Seeding.Directory, "namespaces"));
if (!directory.Exists) return;
this.Logger.LogInformation("Starting importing namespaces from directory '{directory}'...", directory.FullName);
var files = directory.GetFiles(this.Options.Seeding.FilePattern, SearchOption.AllDirectories).Where(f => f.FullName.EndsWith(".json", StringComparison.OrdinalIgnoreCase) || f.FullName.EndsWith(".yml", StringComparison.OrdinalIgnoreCase) || f.FullName.EndsWith(".yaml", StringComparison.OrdinalIgnoreCase));
if (!files.Any())
{
this.Logger.LogWarning("No static resource files matching search pattern '{pattern}' found in directory '{directory}'. Skipping import.", this.Options.Seeding.FilePattern, directory.FullName);
this.Logger.LogWarning("No namespace static resource files matching search pattern '{pattern}' found in directory '{directory}'. Skipping import.", this.Options.Seeding.FilePattern, directory.FullName);
return;
}
stopwatch.Restart();
var count = 0;
foreach (var file in files)
{
try
{
var extension = file.FullName.Split('.', StringSplitOptions.RemoveEmptyEntries).LastOrDefault();
var serializer = extension?.ToLowerInvariant() switch
{
"json" => (ITextSerializer)this.JsonSerializer,
"yml" or "yaml" => this.YamlSerializer,
_ => throw new NotSupportedException($"The specified extension '{extension}' is not supported for static resource files")
};
using var stream = file.OpenRead();
using var streamReader = new StreamReader(stream);
var text = await streamReader.ReadToEndAsync(cancellationToken).ConfigureAwait(false);
var ns = serializer.Deserialize<NamespaceDefinition>(text)!;
await resources.AddAsync(ns, false, cancellationToken).ConfigureAwait(false);
this.Logger.LogInformation("Successfully imported namespace '{namespace}' from file '{file}'", $"{ns.Metadata.Name}", file.FullName);
count++;
}
catch (Exception ex)
{
this.Logger.LogError("An error occurred while reading a namespace from file '{file}': {ex}", file.FullName, ex);
continue;
}
}
stopwatch.Stop();
this.Logger.LogInformation("Completed importing {count} namespaces in {ms} milliseconds", count, stopwatch.Elapsed.TotalMilliseconds);
}

/// <summary>
/// Provisions workflows from statis resource files
/// </summary>
/// <param name="resources">The <see cref="IResourceRepository"/> used to manage <see cref="IResource"/>s</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>A new awaitable <see cref="Task"/></returns>
protected virtual async Task ProvisionWorkflowsAsync(IResourceRepository resources, CancellationToken cancellationToken)
{
var stopwatch = new Stopwatch();
var directory = new DirectoryInfo(Path.Combine(this.Options.Seeding.Directory, "workflows"));
if (!directory.Exists) return;
this.Logger.LogInformation("Starting importing workflows from directory '{directory}'...", directory.FullName);
var files = directory.GetFiles(this.Options.Seeding.FilePattern, SearchOption.AllDirectories).Where(f => f.FullName.EndsWith(".json", StringComparison.OrdinalIgnoreCase) || f.FullName.EndsWith(".yml", StringComparison.OrdinalIgnoreCase) || f.FullName.EndsWith(".yaml", StringComparison.OrdinalIgnoreCase));
if (!files.Any())
{
this.Logger.LogWarning("No workflow static resource files matching search pattern '{pattern}' found in directory '{directory}'. Skipping import.", this.Options.Seeding.FilePattern, directory.FullName);
return;
}
stopwatch.Restart();
var count = 0;
foreach (var file in files)
{
try
Expand All @@ -110,11 +188,6 @@ public virtual async Task StartAsync(CancellationToken cancellationToken)
Versions = [workflowDefinition]
}
};
if (await resources.GetAsync<Namespace>(workflow.GetNamespace()!, cancellationToken: cancellationToken).ConfigureAwait(false) == null)
{
await resources.AddAsync(new Namespace() { Metadata = new() { Name = workflow.GetNamespace()! } }, false, cancellationToken).ConfigureAwait(false);
this.Logger.LogInformation("Successfully created namespace '{namespace}'", workflow.GetNamespace());
}
await resources.AddAsync(workflow, false, cancellationToken).ConfigureAwait(false);
}
else
Expand All @@ -138,14 +211,63 @@ public virtual async Task StartAsync(CancellationToken cancellationToken)
this.Logger.LogInformation("Successfully imported workflow '{workflow}' from file '{file}'", $"{workflowDefinition.Document.Name}.{workflowDefinition.Document.Namespace}:{workflowDefinition.Document.Version}", file.FullName);
count++;
}
catch(Exception ex)
catch (Exception ex)
{
this.Logger.LogError("An error occurred while reading a workflow definition from file '{file}': {ex}", file.FullName, ex);
continue;
}
}
stopwatch.Stop();
this.Logger.LogInformation("Completed importing {count} static resources in {ms} milliseconds", count, stopwatch.Elapsed.TotalMilliseconds);
this.Logger.LogInformation("Completed importing {count} workflows in {ms} milliseconds", count, stopwatch.Elapsed.TotalMilliseconds);
}

/// <summary>
/// Provisions functions from statis resource files
/// </summary>
/// <param name="resources">The <see cref="IResourceRepository"/> used to manage <see cref="IResource"/>s</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>A new awaitable <see cref="Task"/></returns>
protected virtual async Task ProvisionFunctionsAsync(IResourceRepository resources, CancellationToken cancellationToken)
{
var stopwatch = new Stopwatch();
var directory = new DirectoryInfo(Path.Combine(this.Options.Seeding.Directory, "functions"));
if (!directory.Exists) return;
this.Logger.LogInformation("Starting importing custom functions from directory '{directory}'...", directory.FullName);
var files = directory.GetFiles(this.Options.Seeding.FilePattern, SearchOption.AllDirectories).Where(f => f.FullName.EndsWith(".json", StringComparison.OrdinalIgnoreCase) || f.FullName.EndsWith(".yml", StringComparison.OrdinalIgnoreCase) || f.FullName.EndsWith(".yaml", StringComparison.OrdinalIgnoreCase));
if (!files.Any())
{
this.Logger.LogWarning("No custom function static resource files matching search pattern '{pattern}' found in directory '{directory}'. Skipping import.", this.Options.Seeding.FilePattern, directory.FullName);
return;
}
stopwatch.Restart();
var count = 0;
foreach (var file in files)
{
try
{
var extension = file.FullName.Split('.', StringSplitOptions.RemoveEmptyEntries).LastOrDefault();
var serializer = extension?.ToLowerInvariant() switch
{
"json" => (ITextSerializer)this.JsonSerializer,
"yml" or "yaml" => this.YamlSerializer,
_ => throw new NotSupportedException($"The specified extension '{extension}' is not supported for static resource files")
};
using var stream = file.OpenRead();
using var streamReader = new StreamReader(stream);
var text = await streamReader.ReadToEndAsync(cancellationToken).ConfigureAwait(false);
var func = serializer.Deserialize<CustomFunction>(text)!;
await resources.AddAsync(func, false, cancellationToken).ConfigureAwait(false);
this.Logger.LogInformation("Successfully imported custom function '{customFunction}' from file '{file}'", func.GetQualifiedName(), file.FullName);
count++;
}
catch (Exception ex)
{
this.Logger.LogError("An error occurred while reading a custom function from file '{file}': {ex}", file.FullName, ex);
continue;
}
}
stopwatch.Stop();
this.Logger.LogInformation("Completed importing {count} custom functions in {ms} milliseconds", count, stopwatch.Elapsed.TotalMilliseconds);
}

/// <inheritdoc/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<NeutralLanguage>en</NeutralLanguage>
<GenerateDocumentationFile>True</GenerateDocumentationFile>
<VersionPrefix>1.0.0</VersionPrefix>
<VersionSuffix>alpha3.3</VersionSuffix>
<VersionSuffix>alpha4</VersionSuffix>
<AssemblyVersion>$(VersionPrefix)</AssemblyVersion>
<FileVersion>$(VersionPrefix)</FileVersion>
<Authors>The Synapse Authors</Authors>
Expand Down
5 changes: 5 additions & 0 deletions src/api/Synapse.Api.Client.Core/Services/ISynapseApiClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ public interface ISynapseApiClient
/// </summary>
INamespacedResourceApiClient<Correlator> Correlators { get; }

/// <summary>
/// Gets the Synapse API used to manage <see cref="CustomFunction"/>s
/// </summary>
INamespacedResourceApiClient<CustomFunction> CustomFunctions { get; }

/// <summary>
/// Gets the Synapse API used to manage <see cref="Document"/>s
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<NeutralLanguage>en</NeutralLanguage>
<GenerateDocumentationFile>True</GenerateDocumentationFile>
<VersionPrefix>1.0.0</VersionPrefix>
<VersionSuffix>alpha3.3</VersionSuffix>
<VersionSuffix>alpha4</VersionSuffix>
<AssemblyVersion>$(VersionPrefix)</AssemblyVersion>
<FileVersion>$(VersionPrefix)</FileVersion>
<Authors>The Synapse Authors</Authors>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ public SynapseHttpApiClient(IServiceProvider serviceProvider, ILoggerFactory log
/// <inheritdoc/>
public INamespacedResourceApiClient<Correlator> Correlators { get; private set; } = null!;

/// <inheritdoc/>
public INamespacedResourceApiClient<CustomFunction> CustomFunctions { get; private set; } = null!;

/// <inheritdoc/>
public IDocumentApiClient Documents { get; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<NeutralLanguage>en</NeutralLanguage>
<GenerateDocumentationFile>True</GenerateDocumentationFile>
<VersionPrefix>1.0.0</VersionPrefix>
<VersionSuffix>alpha3.3</VersionSuffix>
<VersionSuffix>alpha4</VersionSuffix>
<AssemblyVersion>$(VersionPrefix)</AssemblyVersion>
<FileVersion>$(VersionPrefix)</FileVersion>
<Authors>The Synapse Authors</Authors>
Expand Down
28 changes: 28 additions & 0 deletions src/api/Synapse.Api.Http/Controllers/CustomFunctionsController.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright © 2024-Present The Synapse Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

namespace Synapse.Api.Http.Controllers;

/// <summary>
/// Represents the <see cref="NamespacedResourceController{TResource}"/> used to manage <see cref="CustomFunction"/>s
/// </summary>
/// <param name="mediator">The service used to mediate calls</param>
/// <param name="jsonSerializer">The service used to serialize/deserialize objects to/from JSON</param>
[Route("api/v1/custom-functions")]
public class CustomFunctionsController(IMediator mediator, IJsonSerializer jsonSerializer)
: NamespacedResourceController<CustomFunction>(mediator, jsonSerializer)
{



}
2 changes: 1 addition & 1 deletion src/api/Synapse.Api.Http/Synapse.Api.Http.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<OutputType>Library</OutputType>
<GenerateDocumentationFile>True</GenerateDocumentationFile>
<VersionPrefix>1.0.0</VersionPrefix>
<VersionSuffix>alpha3.3</VersionSuffix>
<VersionSuffix>alpha4</VersionSuffix>
<AssemblyVersion>$(VersionPrefix)</AssemblyVersion>
<FileVersion>$(VersionPrefix)</FileVersion>
<Authors>The Synapse Authors</Authors>
Expand Down
2 changes: 1 addition & 1 deletion src/api/Synapse.Api.Server/Synapse.Api.Server.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<NeutralLanguage>en</NeutralLanguage>
<GenerateDocumentationFile>True</GenerateDocumentationFile>
<VersionPrefix>1.0.0</VersionPrefix>
<VersionSuffix>alpha3.3</VersionSuffix>
<VersionSuffix>alpha4</VersionSuffix>
<AssemblyVersion>$(VersionPrefix)</AssemblyVersion>
<FileVersion>$(VersionPrefix)</FileVersion>
<Authors>The Synapse Authors</Authors>
Expand Down
2 changes: 1 addition & 1 deletion src/cli/Synapse.Cli/Synapse.Cli.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<NeutralLanguage>en</NeutralLanguage>
<GenerateDocumentationFile>True</GenerateDocumentationFile>
<VersionPrefix>1.0.0</VersionPrefix>
<VersionSuffix>alpha3.3</VersionSuffix>
<VersionSuffix>alpha4</VersionSuffix>
<AssemblyVersion>$(VersionPrefix)</AssemblyVersion>
<FileVersion>$(VersionPrefix)</FileVersion>
<Authors>The Synapse Authors</Authors>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<NeutralLanguage>en</NeutralLanguage>
<GenerateDocumentationFile>True</GenerateDocumentationFile>
<VersionPrefix>1.0.0</VersionPrefix>
<VersionSuffix>alpha3.3</VersionSuffix>
<VersionSuffix>alpha4</VersionSuffix>
<AssemblyVersion>$(VersionPrefix)</AssemblyVersion>
<FileVersion>$(VersionPrefix)</FileVersion>
<Authors>The Synapse Authors</Authors>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<NeutralLanguage>en</NeutralLanguage>
<GenerateDocumentationFile>True</GenerateDocumentationFile>
<VersionPrefix>1.0.0</VersionPrefix>
<VersionSuffix>alpha3.3</VersionSuffix>
<VersionSuffix>alpha4</VersionSuffix>
<AssemblyVersion>$(VersionPrefix)</AssemblyVersion>
<FileVersion>$(VersionPrefix)</FileVersion>
<Authors>The Synapse Authors</Authors>
Expand Down
Loading
Loading