From 81d8ee6bd781039875a0305eacbf0e782fe8d29a Mon Sep 17 00:00:00 2001 From: aelassas Date: Sat, 17 Aug 2024 17:12:01 +0100 Subject: [PATCH] Fix issue #81 --- Wexflow.sln | 30 ++ src/net/Wexflow.Core/Workflow.cs | 491 +++++++++--------- src/net/Wexflow.Server.Test/App.config | 11 + src/net/Wexflow.Server.Test/Program.cs | 40 ++ .../Properties/AssemblyInfo.cs | 36 ++ .../Wexflow.Server.Test.csproj | 100 ++++ src/net/Wexflow.Tasks.SevenZip/SevenZip.cs | 1 + src/netcore/Wexflow.Core/Workflow.cs | 488 ++++++++--------- src/netcore/Wexflow.Server.Test/Program.cs | 36 ++ .../Wexflow.Server.Test.csproj | 25 + .../Wexflow.Server.Test/appsettings.json | 5 + 11 files changed, 789 insertions(+), 474 deletions(-) create mode 100644 src/net/Wexflow.Server.Test/App.config create mode 100644 src/net/Wexflow.Server.Test/Program.cs create mode 100644 src/net/Wexflow.Server.Test/Properties/AssemblyInfo.cs create mode 100644 src/net/Wexflow.Server.Test/Wexflow.Server.Test.csproj create mode 100644 src/netcore/Wexflow.Server.Test/Program.cs create mode 100644 src/netcore/Wexflow.Server.Test/Wexflow.Server.Test.csproj create mode 100644 src/netcore/Wexflow.Server.Test/appsettings.json diff --git a/Wexflow.sln b/Wexflow.sln index ee60139c..a8bc29a6 100644 --- a/Wexflow.sln +++ b/Wexflow.sln @@ -594,6 +594,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution .editorconfig = .editorconfig EndProjectSection EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wexflow.Server.Test", "src\net\Wexflow.Server.Test\Wexflow.Server.Test.csproj", "{D9572452-7118-4570-ACF0-7BCAF6DC92B3}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wexflow.Server.Test", "src\netcore\Wexflow.Server.Test\Wexflow.Server.Test.csproj", "{40F36818-10CA-4EEC-A38F-A8BA43332BB4}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -3796,6 +3800,30 @@ Global {857EFFEA-7E97-48C9-8FA6-6FDFA46FF93E}.Release|x64.Build.0 = Release|x64 {857EFFEA-7E97-48C9-8FA6-6FDFA46FF93E}.Release|x86.ActiveCfg = Release|x86 {857EFFEA-7E97-48C9-8FA6-6FDFA46FF93E}.Release|x86.Build.0 = Release|x86 + {D9572452-7118-4570-ACF0-7BCAF6DC92B3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {D9572452-7118-4570-ACF0-7BCAF6DC92B3}.Debug|Any CPU.Build.0 = Debug|Any CPU + {D9572452-7118-4570-ACF0-7BCAF6DC92B3}.Debug|x64.ActiveCfg = Debug|Any CPU + {D9572452-7118-4570-ACF0-7BCAF6DC92B3}.Debug|x64.Build.0 = Debug|Any CPU + {D9572452-7118-4570-ACF0-7BCAF6DC92B3}.Debug|x86.ActiveCfg = Debug|Any CPU + {D9572452-7118-4570-ACF0-7BCAF6DC92B3}.Debug|x86.Build.0 = Debug|Any CPU + {D9572452-7118-4570-ACF0-7BCAF6DC92B3}.Release|Any CPU.ActiveCfg = Release|Any CPU + {D9572452-7118-4570-ACF0-7BCAF6DC92B3}.Release|Any CPU.Build.0 = Release|Any CPU + {D9572452-7118-4570-ACF0-7BCAF6DC92B3}.Release|x64.ActiveCfg = Release|x64 + {D9572452-7118-4570-ACF0-7BCAF6DC92B3}.Release|x64.Build.0 = Release|x64 + {D9572452-7118-4570-ACF0-7BCAF6DC92B3}.Release|x86.ActiveCfg = Release|Any CPU + {D9572452-7118-4570-ACF0-7BCAF6DC92B3}.Release|x86.Build.0 = Release|Any CPU + {40F36818-10CA-4EEC-A38F-A8BA43332BB4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {40F36818-10CA-4EEC-A38F-A8BA43332BB4}.Debug|Any CPU.Build.0 = Debug|Any CPU + {40F36818-10CA-4EEC-A38F-A8BA43332BB4}.Debug|x64.ActiveCfg = Debug|Any CPU + {40F36818-10CA-4EEC-A38F-A8BA43332BB4}.Debug|x64.Build.0 = Debug|Any CPU + {40F36818-10CA-4EEC-A38F-A8BA43332BB4}.Debug|x86.ActiveCfg = Debug|Any CPU + {40F36818-10CA-4EEC-A38F-A8BA43332BB4}.Debug|x86.Build.0 = Debug|Any CPU + {40F36818-10CA-4EEC-A38F-A8BA43332BB4}.Release|Any CPU.ActiveCfg = Release|Any CPU + {40F36818-10CA-4EEC-A38F-A8BA43332BB4}.Release|Any CPU.Build.0 = Release|Any CPU + {40F36818-10CA-4EEC-A38F-A8BA43332BB4}.Release|x64.ActiveCfg = Release|x64 + {40F36818-10CA-4EEC-A38F-A8BA43332BB4}.Release|x64.Build.0 = Release|x64 + {40F36818-10CA-4EEC-A38F-A8BA43332BB4}.Release|x86.ActiveCfg = Release|Any CPU + {40F36818-10CA-4EEC-A38F-A8BA43332BB4}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -4082,6 +4110,8 @@ Global {DCA26F4E-4E47-4945-A564-8CECBC864261} = {16281492-0F39-4C01-9FAA-281650F28ADB} {2F1EC7CE-A0EC-4F63-976C-83E75CDB725E} = {74678727-8612-4E24-B9FB-47BD5DAD5573} {857EFFEA-7E97-48C9-8FA6-6FDFA46FF93E} = {74678727-8612-4E24-B9FB-47BD5DAD5573} + {D9572452-7118-4570-ACF0-7BCAF6DC92B3} = {432B6823-FB53-4893-A5B0-FA49E9593275} + {40F36818-10CA-4EEC-A38F-A8BA43332BB4} = {49D1A283-E9F1-42BD-8A15-8BD1CCB5B775} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {694D69C8-AA13-4F54-BAAA-5B967F1A8F9C} diff --git a/src/net/Wexflow.Core/Workflow.cs b/src/net/Wexflow.Core/Workflow.cs index 8026a155..80cecad9 100644 --- a/src/net/Wexflow.Core/Workflow.cs +++ b/src/net/Wexflow.Core/Workflow.cs @@ -950,269 +950,281 @@ public bool StartSync(string startedBy, Guid instanceId, ref bool resultWarning) { var resultSuccess = true; - StartedOn = DateTime.Now; - StartedBy = startedBy; - InstanceId = instanceId; - Jobs.Add(InstanceId, this); - - // - // Parse the workflow definition (Global variables and local variables.) - // - var dest = Parse(Xml); - Load(dest); + try + { + lock (this) + { + StartedOn = DateTime.Now; + StartedBy = startedBy; + InstanceId = instanceId; + Jobs.Add(InstanceId, this); - _stopCalled = false; + // + // Parse the workflow definition (Global variables and local variables.) + // + var dest = Parse(Xml); + Load(dest); - Logs.Clear(); + _stopCalled = false; - if (WexflowEngine.LogLevel != LogLevel.None) - { - var msg = $"{LogTag} Workflow started - Instance Id: {InstanceId}"; - Logger.Info(msg); - Logs.Add($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture)} INFO - {msg}"); - } + Logs.Clear(); - Database.IncrementRunningCount(); + if (WexflowEngine.LogLevel != LogLevel.None) + { + var msg = $"{LogTag} Workflow started - Instance Id: {InstanceId}"; + Logger.Info(msg); + Logs.Add($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture)} INFO - {msg}"); + } - var entry = Database.GetEntry(Id, InstanceId); - if (entry == null) - { - var newEntry = new Entry - { - WorkflowId = Id, - JobId = InstanceId.ToString(), - Name = Name, - LaunchType = (Db.LaunchType)(int)LaunchType, - Description = Description, - Status = Db.Status.Running, - StatusDate = DateTime.Now, - Logs = string.Join("\r\n", Logs) - }; - Database.InsertEntry(newEntry); - } - else - { - entry.Status = Db.Status.Running; - entry.StatusDate = DateTime.Now; - entry.Logs = string.Join("\r\n", Logs); - Database.UpdateEntry(entry.GetDbId(), entry); - } - entry = Database.GetEntry(Id, InstanceId); + Database.IncrementRunningCount(); - _historyEntry = new HistoryEntry - { - WorkflowId = Id, - Name = Name, - LaunchType = (Db.LaunchType)(int)LaunchType, - Description = Description - }; + var entry = Database.GetEntry(Id, InstanceId); + if (entry == null) + { + var newEntry = new Entry + { + WorkflowId = Id, + JobId = InstanceId.ToString(), + Name = Name, + LaunchType = (Db.LaunchType)(int)LaunchType, + Description = Description, + Status = Db.Status.Running, + StatusDate = DateTime.Now, + Logs = string.Join("\r\n", Logs) + }; + Database.InsertEntry(newEntry); + } + else + { + entry.Status = Db.Status.Running; + entry.StatusDate = DateTime.Now; + entry.Logs = string.Join("\r\n", Logs); + Database.UpdateEntry(entry.GetDbId(), entry); + } + entry = Database.GetEntry(Id, InstanceId); - try - { - IsRunning = true; - IsRejected = false; + _historyEntry = new HistoryEntry + { + WorkflowId = Id, + Name = Name, + LaunchType = (Db.LaunchType)(int)LaunchType, + Description = Description + }; - // Create the temp folder - CreateTempFolder(); + try + { + IsRunning = true; + IsRejected = false; - // Run the tasks - if (ExecutionGraph == null) - { - var success = true; - var warning = false; - var error = true; - RunSequentialTasks(Tasks, ref success, ref warning, ref error); + // Create the temp folder + CreateTempFolder(); - if (!_stopCalled) - { - if (IsRejected) + // Run the tasks + if (ExecutionGraph == null) { - LogWorkflowFinished(); - Database.IncrementRejectedCount(); - entry.Status = Db.Status.Rejected; - entry.StatusDate = DateTime.Now; - entry.Logs = string.Join("\r\n", Logs); - Database.UpdateEntry(entry.GetDbId(), entry); - _historyEntry.Status = Db.Status.Rejected; + var success = true; + var warning = false; + var error = true; + RunSequentialTasks(Tasks, ref success, ref warning, ref error); + + if (!_stopCalled) + { + if (IsRejected) + { + LogWorkflowFinished(); + Database.IncrementRejectedCount(); + entry.Status = Db.Status.Rejected; + entry.StatusDate = DateTime.Now; + entry.Logs = string.Join("\r\n", Logs); + Database.UpdateEntry(entry.GetDbId(), entry); + _historyEntry.Status = Db.Status.Rejected; + } + else + { + if (success) + { + LogWorkflowFinished(); + Database.IncrementDoneCount(); + entry.Status = Db.Status.Done; + entry.StatusDate = DateTime.Now; + entry.Logs = string.Join("\r\n", Logs); + Database.UpdateEntry(entry.GetDbId(), entry); + _historyEntry.Status = Db.Status.Done; + } + else if (warning) + { + LogWorkflowFinished(); + Database.IncrementWarningCount(); + entry.Status = Db.Status.Warning; + entry.StatusDate = DateTime.Now; + entry.Logs = string.Join("\r\n", Logs); + Database.UpdateEntry(entry.GetDbId(), entry); + _historyEntry.Status = Db.Status.Warning; + resultWarning = true; + } + else if (error) + { + LogWorkflowFinished(); + Database.IncrementFailedCount(); + entry.Status = Db.Status.Failed; + entry.StatusDate = DateTime.Now; + entry.Logs = string.Join("\r\n", Logs); + Database.UpdateEntry(entry.GetDbId(), entry); + _historyEntry.Status = Db.Status.Failed; + resultSuccess = false; + } + } + } } else { - if (success) - { - LogWorkflowFinished(); - Database.IncrementDoneCount(); - entry.Status = Db.Status.Done; - entry.StatusDate = DateTime.Now; - entry.Logs = string.Join("\r\n", Logs); - Database.UpdateEntry(entry.GetDbId(), entry); - _historyEntry.Status = Db.Status.Done; - } - else if (warning) - { - LogWorkflowFinished(); - Database.IncrementWarningCount(); - entry.Status = Db.Status.Warning; - entry.StatusDate = DateTime.Now; - entry.Logs = string.Join("\r\n", Logs); - Database.UpdateEntry(entry.GetDbId(), entry); - _historyEntry.Status = Db.Status.Warning; - resultWarning = true; - } - else if (error) + var status = RunTasks(ExecutionGraph.Nodes, Tasks, false); + + if (!_stopCalled) { - LogWorkflowFinished(); - Database.IncrementFailedCount(); - entry.Status = Db.Status.Failed; - entry.StatusDate = DateTime.Now; - entry.Logs = string.Join("\r\n", Logs); - Database.UpdateEntry(entry.GetDbId(), entry); - _historyEntry.Status = Db.Status.Failed; - resultSuccess = false; + switch (status) + { + case Status.Success: + if (ExecutionGraph.OnSuccess != null) + { + var successTasks = NodesToTasks(ExecutionGraph.OnSuccess.Nodes); + _ = RunTasks(ExecutionGraph.OnSuccess.Nodes, successTasks, false); + } + LogWorkflowFinished(); + Database.IncrementDoneCount(); + entry.Status = Db.Status.Done; + entry.StatusDate = DateTime.Now; + entry.Logs = string.Join("\r\n", Logs); + Database.UpdateEntry(entry.GetDbId(), entry); + _historyEntry.Status = Db.Status.Done; + break; + case Status.Warning: + if (ExecutionGraph.OnWarning != null) + { + var warningTasks = NodesToTasks(ExecutionGraph.OnWarning.Nodes); + _ = RunTasks(ExecutionGraph.OnWarning.Nodes, warningTasks, false); + } + LogWorkflowFinished(); + Database.IncrementWarningCount(); + entry.Status = Db.Status.Warning; + entry.StatusDate = DateTime.Now; + entry.Logs = string.Join("\r\n", Logs); + Database.UpdateEntry(entry.GetDbId(), entry); + _historyEntry.Status = Db.Status.Warning; + resultWarning = true; + break; + case Status.Error: + if (ExecutionGraph.OnError != null) + { + var errorTasks = NodesToTasks(ExecutionGraph.OnError.Nodes); + _ = RunTasks(ExecutionGraph.OnError.Nodes, errorTasks, false); + } + LogWorkflowFinished(); + Database.IncrementFailedCount(); + entry.Status = Db.Status.Failed; + entry.StatusDate = DateTime.Now; + entry.Logs = string.Join("\r\n", Logs); + Database.UpdateEntry(entry.GetDbId(), entry); + _historyEntry.Status = Db.Status.Failed; + resultSuccess = false; + break; + case Status.Rejected: + if (ExecutionGraph.OnRejected != null) + { + var rejectedTasks = NodesToTasks(ExecutionGraph.OnRejected.Nodes); + _ = RunTasks(ExecutionGraph.OnRejected.Nodes, rejectedTasks, true); + } + LogWorkflowFinished(); + Database.IncrementRejectedCount(); + entry.Status = Db.Status.Rejected; + entry.StatusDate = DateTime.Now; + entry.Logs = string.Join("\r\n", Logs); + Database.UpdateEntry(entry.GetDbId(), entry); + _historyEntry.Status = Db.Status.Rejected; + break; + default: + break; + } } } - } - } - else - { - var status = RunTasks(ExecutionGraph.Nodes, Tasks, false); - if (!_stopCalled) + if (!_stopCalled) + { + _historyEntry.StatusDate = DateTime.Now; + _historyEntry.Logs = string.Join("\r\n", Logs); + Database.InsertHistoryEntry(_historyEntry); + Database.DecrementRunningCount(); + } + } + catch (ThreadAbortException) + { + _stopCalled = true; + } + catch (Exception e) { - switch (status) + if (WexflowEngine.LogLevel != LogLevel.None) { - case Status.Success: - if (ExecutionGraph.OnSuccess != null) - { - var successTasks = NodesToTasks(ExecutionGraph.OnSuccess.Nodes); - _ = RunTasks(ExecutionGraph.OnSuccess.Nodes, successTasks, false); - } - LogWorkflowFinished(); - Database.IncrementDoneCount(); - entry.Status = Db.Status.Done; - entry.StatusDate = DateTime.Now; - entry.Logs = string.Join("\r\n", Logs); - Database.UpdateEntry(entry.GetDbId(), entry); - _historyEntry.Status = Db.Status.Done; - break; - case Status.Warning: - if (ExecutionGraph.OnWarning != null) - { - var warningTasks = NodesToTasks(ExecutionGraph.OnWarning.Nodes); - _ = RunTasks(ExecutionGraph.OnWarning.Nodes, warningTasks, false); - } - LogWorkflowFinished(); - Database.IncrementWarningCount(); - entry.Status = Db.Status.Warning; - entry.StatusDate = DateTime.Now; - entry.Logs = string.Join("\r\n", Logs); - Database.UpdateEntry(entry.GetDbId(), entry); - _historyEntry.Status = Db.Status.Warning; - resultWarning = true; - break; - case Status.Error: - if (ExecutionGraph.OnError != null) - { - var errorTasks = NodesToTasks(ExecutionGraph.OnError.Nodes); - _ = RunTasks(ExecutionGraph.OnError.Nodes, errorTasks, false); - } - LogWorkflowFinished(); - Database.IncrementFailedCount(); - entry.Status = Db.Status.Failed; - entry.StatusDate = DateTime.Now; - entry.Logs = string.Join("\r\n", Logs); - Database.UpdateEntry(entry.GetDbId(), entry); - _historyEntry.Status = Db.Status.Failed; - resultSuccess = false; - break; - case Status.Rejected: - if (ExecutionGraph.OnRejected != null) - { - var rejectedTasks = NodesToTasks(ExecutionGraph.OnRejected.Nodes); - _ = RunTasks(ExecutionGraph.OnRejected.Nodes, rejectedTasks, true); - } - LogWorkflowFinished(); - Database.IncrementRejectedCount(); - entry.Status = Db.Status.Rejected; - entry.StatusDate = DateTime.Now; - entry.Logs = string.Join("\r\n", Logs); - Database.UpdateEntry(entry.GetDbId(), entry); - _historyEntry.Status = Db.Status.Rejected; - break; - default: - break; + var emsg = $"An error occured while running the workflow. Error: {this}"; + Logger.Error(emsg, e); + Logs.Add($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture)} ERROR - {emsg}\r\n{e}"); } + Database.DecrementRunningCount(); + Database.IncrementFailedCount(); + entry.Status = Db.Status.Failed; + entry.StatusDate = DateTime.Now; + entry.Logs = string.Join("\r\n", Logs); + Database.UpdateEntry(entry.GetDbId(), entry); + _historyEntry.Status = Db.Status.Failed; + _historyEntry.StatusDate = DateTime.Now; + _historyEntry.Logs = string.Join("\r\n", Logs); + Database.InsertHistoryEntry(_historyEntry); } - } - - if (!_stopCalled) - { - _historyEntry.StatusDate = DateTime.Now; - _historyEntry.Logs = string.Join("\r\n", Logs); - Database.InsertHistoryEntry(_historyEntry); - Database.DecrementRunningCount(); - } - } - catch (ThreadAbortException) - { - _stopCalled = true; - } - catch (Exception e) - { - if (WexflowEngine.LogLevel != LogLevel.None) - { - var emsg = $"An error occured while running the workflow. Error: {this}"; - Logger.Error(emsg, e); - Logs.Add($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture)} ERROR - {emsg}\r\n{e}"); - } - Database.DecrementRunningCount(); - Database.IncrementFailedCount(); - entry.Status = Db.Status.Failed; - entry.StatusDate = DateTime.Now; - entry.Logs = string.Join("\r\n", Logs); - Database.UpdateEntry(entry.GetDbId(), entry); - _historyEntry.Status = Db.Status.Failed; - _historyEntry.StatusDate = DateTime.Now; - _historyEntry.Logs = string.Join("\r\n", Logs); - Database.InsertHistoryEntry(_historyEntry); - } - finally - { - // Cleanup - if (!_stopCalled) - { - Logs.Clear(); - } - foreach (var files in FilesPerTask.Values) - { - files.Clear(); - } + finally + { + // Cleanup + if (!_stopCalled) + { + Logs.Clear(); + } + foreach (var files in FilesPerTask.Values) + { + files.Clear(); + } - foreach (var entities in EntitiesPerTask.Values) - { - entities.Clear(); - } + foreach (var entities in EntitiesPerTask.Values) + { + entities.Clear(); + } - IsRunning = false; - IsRejected = false; - GC.Collect(); + IsRunning = false; + IsRejected = false; + GC.Collect(); - JobId = ++ParallelJobId; - _ = Jobs.Remove(InstanceId); + JobId = ++ParallelJobId; + _ = Jobs.Remove(InstanceId); - if (_jobsQueue.Count > 0) - { - var job = _jobsQueue.Dequeue(); - _ = job.Workflow.StartAsync(startedBy); - } - else - { - if (!_stopCalled) - { - Load(Xml); // Reload the original workflow + if (_jobsQueue.Count > 0) + { + var job = _jobsQueue.Dequeue(); + _ = job.Workflow.StartAsync(startedBy); + } + else + { + if (!_stopCalled) + { + Load(Xml); // Reload the original workflow + } + RestVariables.Clear(); + } } - RestVariables.Clear(); + + return resultSuccess; + } } + catch (ThreadInterruptedException) + { + } return resultSuccess; } @@ -1699,10 +1711,15 @@ public bool Stop(string stoppedBy) Logs.Clear(); _ = Jobs.Remove(InstanceId); - if (_jobsQueue.Count > 0) + //if (_jobsQueue.Count > 0) + //{ + // var job = _jobsQueue.Dequeue(); + // _ = job.Workflow.StartAsync(StartedBy); + //} + + foreach (var job in _jobsQueue) { - var job = _jobsQueue.Dequeue(); - _ = job.Workflow.StartAsync(StartedBy); + _ = job.Workflow.Stop(stoppedBy); } Load(Xml); // Reload the original workflow diff --git a/src/net/Wexflow.Server.Test/App.config b/src/net/Wexflow.Server.Test/App.config new file mode 100644 index 00000000..e186c537 --- /dev/null +++ b/src/net/Wexflow.Server.Test/App.config @@ -0,0 +1,11 @@ + + + + + + + + + + + diff --git a/src/net/Wexflow.Server.Test/Program.cs b/src/net/Wexflow.Server.Test/Program.cs new file mode 100644 index 00000000..4b62ca56 --- /dev/null +++ b/src/net/Wexflow.Server.Test/Program.cs @@ -0,0 +1,40 @@ +using System; +using System.Collections.Generic; +using System.Configuration; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Wexflow.Core.Service.Client; + +namespace Wexflow.Server.Test +{ + internal class Program + { + static void Main(string[] args) + { + var client = new WexflowServiceClient(ConfigurationManager.AppSettings["WexflowWebServiceUri"]); + var username = ConfigurationManager.AppSettings["Username"]; + var password = ConfigurationManager.AppSettings["Password"]; + + Action startWorkflow = () => + { + Thread.CurrentThread.IsBackground = true; + var jobId = client.StartWorkflow(41, username, password); + Console.WriteLine(jobId); + }; + + new Thread(() => + { + startWorkflow(); + }).Start(); + + new Thread(() => + { + startWorkflow(); + }).Start(); + + Console.ReadKey(); + } + } +} diff --git a/src/net/Wexflow.Server.Test/Properties/AssemblyInfo.cs b/src/net/Wexflow.Server.Test/Properties/AssemblyInfo.cs new file mode 100644 index 00000000..27cf2a56 --- /dev/null +++ b/src/net/Wexflow.Server.Test/Properties/AssemblyInfo.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// Les informations générales relatives à un assembly dépendent de +// l'ensemble d'attributs suivant. Changez les valeurs de ces attributs pour modifier les informations +// associées à un assembly. +[assembly: AssemblyTitle("Wexflow.Server.Test")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("Wexflow.Server.Test")] +[assembly: AssemblyCopyright("Copyright © 2024")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// L'affectation de la valeur false à ComVisible rend les types invisibles dans cet assembly +// aux composants COM. Si vous devez accéder à un type dans cet assembly à partir de +// COM, affectez la valeur true à l'attribut ComVisible sur ce type. +[assembly: ComVisible(false)] + +// Le GUID suivant est pour l'ID de la typelib si ce projet est exposé à COM +[assembly: Guid("d9572452-7118-4570-acf0-7bcaf6dc92b3")] + +// Les informations de version pour un assembly se composent des quatre valeurs suivantes : +// +// Version principale +// Version secondaire +// Numéro de build +// Révision +// +// Vous pouvez spécifier toutes les valeurs ou indiquer les numéros de build et de révision par défaut +// en utilisant '*', comme indiqué ci-dessous : +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/src/net/Wexflow.Server.Test/Wexflow.Server.Test.csproj b/src/net/Wexflow.Server.Test/Wexflow.Server.Test.csproj new file mode 100644 index 00000000..86b85b58 --- /dev/null +++ b/src/net/Wexflow.Server.Test/Wexflow.Server.Test.csproj @@ -0,0 +1,100 @@ + + + + + Debug + AnyCPU + {D9572452-7118-4570-ACF0-7BCAF6DC92B3} + Exe + Wexflow.Server.Test + Wexflow.Server.Test + v4.8 + 512 + true + true + + + AnyCPU + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + AnyCPU + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + true + bin\x64\Debug\ + DEBUG;TRACE + full + x64 + 7.3 + prompt + true + + + bin\x64\Release\ + TRACE + true + pdbonly + x64 + 7.3 + prompt + true + + + true + bin\x86\Debug\ + DEBUG;TRACE + full + x86 + 7.3 + prompt + true + + + bin\x86\Release\ + TRACE + true + pdbonly + x86 + 7.3 + prompt + true + + + + + + + + + + + + + + + + + + + {3cfa5f19-d8e1-4af8-9b6a-dcb959c035db} + Wexflow.Core.Service.Client + + + + + + + \ No newline at end of file diff --git a/src/net/Wexflow.Tasks.SevenZip/SevenZip.cs b/src/net/Wexflow.Tasks.SevenZip/SevenZip.cs index da47e24d..553bfe62 100644 --- a/src/net/Wexflow.Tasks.SevenZip/SevenZip.cs +++ b/src/net/Wexflow.Tasks.SevenZip/SevenZip.cs @@ -2,6 +2,7 @@ using System; using System.IO; using System.Linq; +using System.Reflection; using System.Threading; using System.Xml.Linq; using Wexflow.Core; diff --git a/src/netcore/Wexflow.Core/Workflow.cs b/src/netcore/Wexflow.Core/Workflow.cs index 34c3b9e9..1c002dbd 100644 --- a/src/netcore/Wexflow.Core/Workflow.cs +++ b/src/netcore/Wexflow.Core/Workflow.cs @@ -1003,269 +1003,278 @@ public bool StartSync(string startedBy, Guid instanceId, ref bool resultWarning) { var resultSuccess = true; - StartedOn = DateTime.Now; - StartedBy = startedBy; - InstanceId = instanceId; - Jobs.Add(InstanceId, this); - - // - // Parse the workflow definition (Global variables and local variables.) - // - var dest = Parse(Xml); - Load(dest); + try + { + lock (this) + { + StartedOn = DateTime.Now; + StartedBy = startedBy; + InstanceId = instanceId; + Jobs.Add(InstanceId, this); - _stopCalled = false; + // + // Parse the workflow definition (Global variables and local variables.) + // + var dest = Parse(Xml); + Load(dest); - Logs.Clear(); + _stopCalled = false; - if (WexflowEngine.LogLevel != LogLevel.None) - { - var msg = $"{LogTag} Workflow started - Instance Id: {InstanceId}"; - Logger.Info(msg); - Logs.Add($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture)} INFO - {msg}"); - } + Logs.Clear(); - Database.IncrementRunningCount(); + if (WexflowEngine.LogLevel != LogLevel.None) + { + var msg = $"{LogTag} Workflow started - Instance Id: {InstanceId}"; + Logger.Info(msg); + Logs.Add($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture)} INFO - {msg}"); + } - var entry = Database.GetEntry(Id, InstanceId); - if (entry == null) - { - Entry newEntry = new() - { - WorkflowId = Id, - JobId = InstanceId.ToString(), - Name = Name, - LaunchType = (Db.LaunchType)(int)LaunchType, - Description = Description, - Status = Db.Status.Running, - StatusDate = DateTime.Now, - Logs = string.Join("\r\n", Logs) - }; - Database.InsertEntry(newEntry); - } - else - { - entry.Status = Db.Status.Running; - entry.StatusDate = DateTime.Now; - entry.Logs = string.Join("\r\n", Logs); - Database.UpdateEntry(entry.GetDbId(), entry); - } - entry = Database.GetEntry(Id, InstanceId); + Database.IncrementRunningCount(); - _historyEntry = new HistoryEntry - { - WorkflowId = Id, - Name = Name, - LaunchType = (Db.LaunchType)(int)LaunchType, - Description = Description - }; + var entry = Database.GetEntry(Id, InstanceId); + if (entry == null) + { + Entry newEntry = new() + { + WorkflowId = Id, + JobId = InstanceId.ToString(), + Name = Name, + LaunchType = (Db.LaunchType)(int)LaunchType, + Description = Description, + Status = Db.Status.Running, + StatusDate = DateTime.Now, + Logs = string.Join("\r\n", Logs) + }; + Database.InsertEntry(newEntry); + } + else + { + entry.Status = Db.Status.Running; + entry.StatusDate = DateTime.Now; + entry.Logs = string.Join("\r\n", Logs); + Database.UpdateEntry(entry.GetDbId(), entry); + } + entry = Database.GetEntry(Id, InstanceId); - try - { - IsRunning = true; - IsRejected = false; + _historyEntry = new HistoryEntry + { + WorkflowId = Id, + Name = Name, + LaunchType = (Db.LaunchType)(int)LaunchType, + Description = Description + }; - // Create the temp folder - CreateTempFolder(); + try + { + IsRunning = true; + IsRejected = false; - // Run the tasks - if (ExecutionGraph == null) - { - var success = true; - var warning = false; - var error = true; - RunSequentialTasks(Tasks, ref success, ref warning, ref error); + // Create the temp folder + CreateTempFolder(); - if (!_stopCalled) - { - if (IsRejected) + // Run the tasks + if (ExecutionGraph == null) { - LogWorkflowFinished(); - Database.IncrementRejectedCount(); - entry.Status = Db.Status.Rejected; - entry.StatusDate = DateTime.Now; - entry.Logs = string.Join("\r\n", Logs); - Database.UpdateEntry(entry.GetDbId(), entry); - _historyEntry.Status = Db.Status.Rejected; + var success = true; + var warning = false; + var error = true; + RunSequentialTasks(Tasks, ref success, ref warning, ref error); + + if (!_stopCalled) + { + if (IsRejected) + { + LogWorkflowFinished(); + Database.IncrementRejectedCount(); + entry.Status = Db.Status.Rejected; + entry.StatusDate = DateTime.Now; + entry.Logs = string.Join("\r\n", Logs); + Database.UpdateEntry(entry.GetDbId(), entry); + _historyEntry.Status = Db.Status.Rejected; + } + else + { + if (success) + { + LogWorkflowFinished(); + Database.IncrementDoneCount(); + entry.Status = Db.Status.Done; + entry.StatusDate = DateTime.Now; + entry.Logs = string.Join("\r\n", Logs); + Database.UpdateEntry(entry.GetDbId(), entry); + _historyEntry.Status = Db.Status.Done; + } + else if (warning) + { + LogWorkflowFinished(); + Database.IncrementWarningCount(); + entry.Status = Db.Status.Warning; + entry.StatusDate = DateTime.Now; + entry.Logs = string.Join("\r\n", Logs); + Database.UpdateEntry(entry.GetDbId(), entry); + _historyEntry.Status = Db.Status.Warning; + resultWarning = true; + } + else if (error) + { + LogWorkflowFinished(); + Database.IncrementFailedCount(); + entry.Status = Db.Status.Failed; + entry.StatusDate = DateTime.Now; + entry.Logs = string.Join("\r\n", Logs); + Database.UpdateEntry(entry.GetDbId(), entry); + _historyEntry.Status = Db.Status.Failed; + resultSuccess = false; + } + } + } } else { - if (success) - { - LogWorkflowFinished(); - Database.IncrementDoneCount(); - entry.Status = Db.Status.Done; - entry.StatusDate = DateTime.Now; - entry.Logs = string.Join("\r\n", Logs); - Database.UpdateEntry(entry.GetDbId(), entry); - _historyEntry.Status = Db.Status.Done; - } - else if (warning) - { - LogWorkflowFinished(); - Database.IncrementWarningCount(); - entry.Status = Db.Status.Warning; - entry.StatusDate = DateTime.Now; - entry.Logs = string.Join("\r\n", Logs); - Database.UpdateEntry(entry.GetDbId(), entry); - _historyEntry.Status = Db.Status.Warning; - resultWarning = true; - } - else if (error) + var status = RunTasks(ExecutionGraph.Nodes, Tasks, false); + + if (!_stopCalled) { - LogWorkflowFinished(); - Database.IncrementFailedCount(); - entry.Status = Db.Status.Failed; - entry.StatusDate = DateTime.Now; - entry.Logs = string.Join("\r\n", Logs); - Database.UpdateEntry(entry.GetDbId(), entry); - _historyEntry.Status = Db.Status.Failed; - resultSuccess = false; + switch (status) + { + case Status.Success: + if (ExecutionGraph.OnSuccess != null) + { + var successTasks = NodesToTasks(ExecutionGraph.OnSuccess.Nodes); + _ = RunTasks(ExecutionGraph.OnSuccess.Nodes, successTasks, false); + } + LogWorkflowFinished(); + Database.IncrementDoneCount(); + entry.Status = Db.Status.Done; + entry.StatusDate = DateTime.Now; + entry.Logs = string.Join("\r\n", Logs); + Database.UpdateEntry(entry.GetDbId(), entry); + _historyEntry.Status = Db.Status.Done; + break; + case Status.Warning: + if (ExecutionGraph.OnWarning != null) + { + var warningTasks = NodesToTasks(ExecutionGraph.OnWarning.Nodes); + _ = RunTasks(ExecutionGraph.OnWarning.Nodes, warningTasks, false); + } + LogWorkflowFinished(); + Database.IncrementWarningCount(); + entry.Status = Db.Status.Warning; + entry.StatusDate = DateTime.Now; + entry.Logs = string.Join("\r\n", Logs); + Database.UpdateEntry(entry.GetDbId(), entry); + _historyEntry.Status = Db.Status.Warning; + resultWarning = true; + break; + case Status.Error: + if (ExecutionGraph.OnError != null) + { + var errorTasks = NodesToTasks(ExecutionGraph.OnError.Nodes); + _ = RunTasks(ExecutionGraph.OnError.Nodes, errorTasks, false); + } + LogWorkflowFinished(); + Database.IncrementFailedCount(); + entry.Status = Db.Status.Failed; + entry.StatusDate = DateTime.Now; + entry.Logs = string.Join("\r\n", Logs); + Database.UpdateEntry(entry.GetDbId(), entry); + _historyEntry.Status = Db.Status.Failed; + resultSuccess = false; + break; + case Status.Rejected: + if (ExecutionGraph.OnRejected != null) + { + var rejectedTasks = NodesToTasks(ExecutionGraph.OnRejected.Nodes); + _ = RunTasks(ExecutionGraph.OnRejected.Nodes, rejectedTasks, true); + } + LogWorkflowFinished(); + Database.IncrementRejectedCount(); + entry.Status = Db.Status.Rejected; + entry.StatusDate = DateTime.Now; + entry.Logs = string.Join("\r\n", Logs); + Database.UpdateEntry(entry.GetDbId(), entry); + _historyEntry.Status = Db.Status.Rejected; + break; + default: + break; + } } } - } - } - else - { - var status = RunTasks(ExecutionGraph.Nodes, Tasks, false); - if (!_stopCalled) + if (!_stopCalled) + { + _historyEntry.StatusDate = DateTime.Now; + _historyEntry.Logs = string.Join("\r\n", Logs); + Database.InsertHistoryEntry(_historyEntry); + Database.DecrementRunningCount(); + } + } + catch (ThreadInterruptedException) + { + _stopCalled = true; + } + catch (Exception e) { - switch (status) + if (WexflowEngine.LogLevel != LogLevel.None) { - case Status.Success: - if (ExecutionGraph.OnSuccess != null) - { - var successTasks = NodesToTasks(ExecutionGraph.OnSuccess.Nodes); - _ = RunTasks(ExecutionGraph.OnSuccess.Nodes, successTasks, false); - } - LogWorkflowFinished(); - Database.IncrementDoneCount(); - entry.Status = Db.Status.Done; - entry.StatusDate = DateTime.Now; - entry.Logs = string.Join("\r\n", Logs); - Database.UpdateEntry(entry.GetDbId(), entry); - _historyEntry.Status = Db.Status.Done; - break; - case Status.Warning: - if (ExecutionGraph.OnWarning != null) - { - var warningTasks = NodesToTasks(ExecutionGraph.OnWarning.Nodes); - _ = RunTasks(ExecutionGraph.OnWarning.Nodes, warningTasks, false); - } - LogWorkflowFinished(); - Database.IncrementWarningCount(); - entry.Status = Db.Status.Warning; - entry.StatusDate = DateTime.Now; - entry.Logs = string.Join("\r\n", Logs); - Database.UpdateEntry(entry.GetDbId(), entry); - _historyEntry.Status = Db.Status.Warning; - resultWarning = true; - break; - case Status.Error: - if (ExecutionGraph.OnError != null) - { - var errorTasks = NodesToTasks(ExecutionGraph.OnError.Nodes); - _ = RunTasks(ExecutionGraph.OnError.Nodes, errorTasks, false); - } - LogWorkflowFinished(); - Database.IncrementFailedCount(); - entry.Status = Db.Status.Failed; - entry.StatusDate = DateTime.Now; - entry.Logs = string.Join("\r\n", Logs); - Database.UpdateEntry(entry.GetDbId(), entry); - _historyEntry.Status = Db.Status.Failed; - resultSuccess = false; - break; - case Status.Rejected: - if (ExecutionGraph.OnRejected != null) - { - var rejectedTasks = NodesToTasks(ExecutionGraph.OnRejected.Nodes); - _ = RunTasks(ExecutionGraph.OnRejected.Nodes, rejectedTasks, true); - } - LogWorkflowFinished(); - Database.IncrementRejectedCount(); - entry.Status = Db.Status.Rejected; - entry.StatusDate = DateTime.Now; - entry.Logs = string.Join("\r\n", Logs); - Database.UpdateEntry(entry.GetDbId(), entry); - _historyEntry.Status = Db.Status.Rejected; - break; - default: - break; + var emsg = $"An error occured while running the workflow. Error: {this}"; + Logger.Error(emsg, e); + Logs.Add($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture)} ERROR - {emsg}\r\n{e}"); } + Database.DecrementRunningCount(); + Database.IncrementFailedCount(); + entry.Status = Db.Status.Failed; + entry.StatusDate = DateTime.Now; + entry.Logs = string.Join("\r\n", Logs); + Database.UpdateEntry(entry.GetDbId(), entry); + _historyEntry.Status = Db.Status.Failed; + _historyEntry.StatusDate = DateTime.Now; + _historyEntry.Logs = string.Join("\r\n", Logs); + Database.InsertHistoryEntry(_historyEntry); } - } - - if (!_stopCalled) - { - _historyEntry.StatusDate = DateTime.Now; - _historyEntry.Logs = string.Join("\r\n", Logs); - Database.InsertHistoryEntry(_historyEntry); - Database.DecrementRunningCount(); - } - } - catch (ThreadInterruptedException) - { - _stopCalled = true; - } - catch (Exception e) - { - if (WexflowEngine.LogLevel != LogLevel.None) - { - var emsg = $"An error occured while running the workflow. Error: {this}"; - Logger.Error(emsg, e); - Logs.Add($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture)} ERROR - {emsg}\r\n{e}"); - } - Database.DecrementRunningCount(); - Database.IncrementFailedCount(); - entry.Status = Db.Status.Failed; - entry.StatusDate = DateTime.Now; - entry.Logs = string.Join("\r\n", Logs); - Database.UpdateEntry(entry.GetDbId(), entry); - _historyEntry.Status = Db.Status.Failed; - _historyEntry.StatusDate = DateTime.Now; - _historyEntry.Logs = string.Join("\r\n", Logs); - Database.InsertHistoryEntry(_historyEntry); - } - finally - { - // Cleanup - if (!_stopCalled) - { - Logs.Clear(); - } - foreach (var files in FilesPerTask.Values) - { - files.Clear(); - } + finally + { + // Cleanup + if (!_stopCalled) + { + Logs.Clear(); + } + foreach (var files in FilesPerTask.Values) + { + files.Clear(); + } - foreach (var entities in EntitiesPerTask.Values) - { - entities.Clear(); - } + foreach (var entities in EntitiesPerTask.Values) + { + entities.Clear(); + } - IsRunning = false; - IsRejected = false; - GC.Collect(); + IsRunning = false; + IsRejected = false; + GC.Collect(); - JobId = ++ParallelJobId; - _ = Jobs.Remove(InstanceId); + JobId = ++ParallelJobId; + _ = Jobs.Remove(InstanceId); - if (_jobsQueue.Count > 0) - { - var job = _jobsQueue.Dequeue(); - _ = job.Workflow.StartAsync(startedBy); - } - else - { - if (!_stopCalled) - { - Load(Xml); // Reload the original workflow + if (_jobsQueue.Count > 0) + { + var job = _jobsQueue.Dequeue(); + _ = job.Workflow.StartAsync(startedBy); + } + else + { + if (!_stopCalled) + { + Load(Xml); // Reload the original workflow + } + RestVariables.Clear(); + } } - RestVariables.Clear(); } } + catch (ThreadInterruptedException) + { + } return resultSuccess; } @@ -1752,10 +1761,15 @@ public bool Stop(string stoppedBy) Logs.Clear(); _ = Jobs.Remove(InstanceId); - if (_jobsQueue.Count > 0) + //if (_jobsQueue.Count > 0) + //{ + // var job = _jobsQueue.Dequeue(); + // _ = job.Workflow.StartAsync(StartedBy); + //} + + foreach(var job in _jobsQueue) { - var job = _jobsQueue.Dequeue(); - _ = job.Workflow.StartAsync(StartedBy); + _ = job.Workflow.Stop(stoppedBy); } Load(Xml); // Reload the original workflow diff --git a/src/netcore/Wexflow.Server.Test/Program.cs b/src/netcore/Wexflow.Server.Test/Program.cs new file mode 100644 index 00000000..a7295d7b --- /dev/null +++ b/src/netcore/Wexflow.Server.Test/Program.cs @@ -0,0 +1,36 @@ +using Microsoft.Extensions.Configuration; +using Wexflow.Core.Service.Client; + +try +{ + var config = new ConfigurationBuilder() + .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true) + .Build(); + + WexflowServiceClient client = new(config["WexflowWebServiceUri"]); + var username = config["Username"]; + var password = config["Password"]; + + Action startWorkflow = async () => + { + Thread.CurrentThread.IsBackground = true; + var jobId = await client.StartWorkflow(41, username, password); + Console.WriteLine(jobId); + }; + + new Thread(() => + { + startWorkflow(); + }).Start(); + + new Thread(() => + { + startWorkflow(); + }).Start(); +} +catch (Exception e) +{ + Console.WriteLine("An error occured: {0}", e); +} + +_ = Console.ReadKey(); \ No newline at end of file diff --git a/src/netcore/Wexflow.Server.Test/Wexflow.Server.Test.csproj b/src/netcore/Wexflow.Server.Test/Wexflow.Server.Test.csproj new file mode 100644 index 00000000..c8a7531f --- /dev/null +++ b/src/netcore/Wexflow.Server.Test/Wexflow.Server.Test.csproj @@ -0,0 +1,25 @@ + + + + Exe + net8.0 + enable + enable + AnyCPU;x64;x86 + + + + + + + + + + + + + Always + + + + diff --git a/src/netcore/Wexflow.Server.Test/appsettings.json b/src/netcore/Wexflow.Server.Test/appsettings.json new file mode 100644 index 00000000..21dc868b --- /dev/null +++ b/src/netcore/Wexflow.Server.Test/appsettings.json @@ -0,0 +1,5 @@ +{ + "WexflowWebServiceUri": "http://localhost:8000/api/v1/", + "Username": "admin", + "Password": "wexflow2018" +} \ No newline at end of file