security patches and upgrades (#33)
v1r3n authored Sep 16, 2023
1 parent 7309ba6 commit 64edea1
Showing 2 changed files with 2 additions and 204 deletions.
65 changes: 2 additions & 63 deletions build.gradle
@@ -1,6 +1,6 @@
buildscript {
dependencies {
classpath "org.springframework.boot:spring-boot-gradle-plugin:2.5.7"
classpath "org.springframework.boot:spring-boot-gradle-plugin:2.5.15"
classpath 'com.diffplug.spotless:spotless-plugin-gradle:6.+'
}
}
@@ -12,7 +12,7 @@ plugins {
ext {
group = 'io.orkes.conductor'
appVersion = '0.0.1-SNAPSHOT'
springBootVersion = '2.5.6'
springBootVersion = '2.7.6'

versions = [
revConductor : '3.13.8',
@@ -148,65 +148,4 @@ subprojects {
}
}
build.dependsOn(spotlessApply)


publishing {
publications {
mavenJava(MavenPublication) {
from components.java
pom {
name = 'Orkes Conductor Community'
description = 'Orkes supported version of open source Netflix Conductor'
url = 'https://github.com/orkes-io/orkes-conductor-community'
scm {
connection = 'scm:git://github.com/orkes-io/orkes-conductor-community.git'
developerConnection = 'scm:git://github.com/orkes-io/orkes-conductor-community.git'
url = 'https://github.com/orkes-io/orkes-conductor-community'
}
licenses {
license {
name = 'Apache 2.0'
url = 'http://www.apache.org/licenses/LICENSE-2.0'
}
}
developers {
developer {
organization = 'Orkes'
organizationUrl = 'https://orkes.io'
name = 'Orkes Development Team'
email = '[email protected]'
}
}
}
}
}

repositories {
maven {
if (project.hasProperty("mavenCentral")) {
println "Publishing to Sonatype Repository"
url = "https://s01.oss.sonatype.org/${project.version.endsWith('-SNAPSHOT') ? "content/repositories/snapshots/" : "service/local/staging/deploy/maven2/"}"
credentials {
username project.properties.username
password project.properties.password
}
}
}
}

def signingKeyId = findProperty('signingKeyId')
if(signingKeyId) {
println 'Signing the artifact with keys'
signing {
def signingKey = findProperty('signingKey')
def signingPassword = findProperty('signingPassword')
if (signingKeyId && signingKey && signingPassword) {
useInMemoryPgpKeys(signingKeyId, signingKey, signingPassword)
}

sign publishing.publications
}
}

}
}
141 changes: 0 additions & 141 deletions OrkesWorkflowExecutor.java
@@ -16,10 +16,7 @@
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Lazy;
@@ -29,11 +26,7 @@
import com.netflix.conductor.common.metadata.workflow.RerunWorkflowRequest;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.dal.ExecutionDAOFacade;
import com.netflix.conductor.core.exception.NonTransientException;
import com.netflix.conductor.core.exception.NotFoundException;
import com.netflix.conductor.core.exception.TerminateWorkflowException;
import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.core.listener.TaskStatusListener;
import com.netflix.conductor.core.listener.WorkflowStatusListener;
import com.netflix.conductor.core.metadata.MetadataMapperService;
@@ -42,15 +35,13 @@
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.dao.MetadataDAO;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;
import com.netflix.conductor.redis.dao.RedisExecutionDAO;
import com.netflix.conductor.service.ExecutionLockService;

import io.orkes.conductor.id.TimeBasedUUIDGenerator;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;

import static com.netflix.conductor.core.utils.Utils.DECIDER_QUEUE;
@@ -66,7 +57,6 @@ public class OrkesWorkflowExecutor extends WorkflowExecutor {

private final ExecutionDAOFacade orkesExecutionDAOFacade;
private final SystemTaskRegistry systemTaskRegistry;
private final ExecutorService taskUpdateExecutor;
private final RedisExecutionDAO executionDAO;

public OrkesWorkflowExecutor(
@@ -103,26 +93,6 @@ public OrkesWorkflowExecutor(
this.orkesExecutionDAOFacade = executionDAOFacade;
this.systemTaskRegistry = systemTaskRegistry;
this.executionDAO = executionDAO;

int threadPoolSize = Runtime.getRuntime().availableProcessors() * 10;
this.taskUpdateExecutor =
new ThreadPoolExecutor(
threadPoolSize,
threadPoolSize,
0,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(threadPoolSize) {
@Override
public boolean offer(Runnable runnable) {
try {
return super.offer(runnable, 100, TimeUnit.MILLISECONDS);
} catch (InterruptedException ie) {
return false;
}
}
},
new ThreadFactoryBuilder().setNameFormat("task-update-thread-%d").build());

log.info("OrkesWorkflowExecutor initialized");
}

@@ -142,117 +112,6 @@ public String rerun(RerunWorkflowRequest request) {
return super.rerun(request);
}

@Override
boolean scheduleTask(WorkflowModel workflow, List<TaskModel> tasks) {
List<TaskModel> tasksToBeQueued;
boolean startedSystemTasks = false;

try {
if (tasks == null || tasks.isEmpty()) {
return false;
}

// Get the highest seq number
int count = workflow.getTasks().stream().mapToInt(TaskModel::getSeq).max().orElse(0);

for (TaskModel task : tasks) {
if (task.getSeq() == 0) { // Set only if the seq was not set
task.setSeq(++count);
}
}

// metric to track the distribution of number of tasks within a workflow
Monitors.recordNumTasksInWorkflow(
workflow.getTasks().size() + tasks.size(),
workflow.getWorkflowName(),
String.valueOf(workflow.getWorkflowVersion()));

// Save the tasks in the DAO
orkesExecutionDAOFacade.createTasks(tasks);

List<TaskModel> systemTasks =
tasks.stream()
.filter(task -> systemTaskRegistry.isSystemTask(task.getTaskType()))
.collect(Collectors.toList());

tasksToBeQueued =
tasks.stream()
.filter(task -> !systemTaskRegistry.isSystemTask(task.getTaskType()))
.collect(Collectors.toList());

// Traverse through all the system tasks, start the sync tasks, in case of async, queue
// the tasks
List<Future<TaskModel>> futureExecutions = new ArrayList<>();
for (TaskModel task : systemTasks) {
WorkflowSystemTask workflowSystemTask = systemTaskRegistry.get(task.getTaskType());
if (workflowSystemTask == null) {
throw new NotFoundException(
"No system task found by name %s", task.getTaskType());
}
if (task.getStatus() != null
&& !task.getStatus().isTerminal()
&& task.getStartTime() == 0) {
task.setStartTime(System.currentTimeMillis());
}
if (!workflowSystemTask.isAsync()) {
Future<TaskModel> future =
taskUpdateExecutor.submit(
() -> {
workflowSystemTask.start(workflow, task, this);
return task;
});
futureExecutions.add(future);
startedSystemTasks = true;
} else {
tasksToBeQueued.add(task);
}
}

futureExecutions.forEach(
future -> {
try {
TaskModel task = future.get();
orkesExecutionDAOFacade.updateTask(task);
} catch (Exception e) {
throw new NonTransientException(e.getMessage(), e);
}
});

} catch (Exception e) {
List<String> taskIds =
tasks.stream().map(TaskModel::getTaskId).collect(Collectors.toList());
String errorMsg =
String.format(
"Error scheduling tasks: %s, for workflow: %s",
taskIds, workflow.getWorkflowId());
log.error(errorMsg, e);
Monitors.error(OrkesWorkflowExecutor.class.getSimpleName(), "scheduleTask");
throw new TerminateWorkflowException(errorMsg);
}

// On addTaskToQueue failures, ignore the exceptions and let WorkflowRepairService take care
// of republishing the messages to the queue.
try {
addTaskToQueue(tasksToBeQueued);
} catch (Exception e) {
List<String> taskIds =
tasksToBeQueued.stream().map(TaskModel::getTaskId).collect(Collectors.toList());
String errorMsg =
String.format(
"Error pushing tasks to the queue: %s, for workflow: %s",
taskIds, workflow.getWorkflowId());
log.warn(errorMsg, e);
Monitors.error(OrkesWorkflowExecutor.class.getSimpleName(), "scheduleTask");
}
return startedSystemTasks;
}

private void addTaskToQueue(final List<TaskModel> tasks) {
for (TaskModel task : tasks) {
addTaskToQueue(task);
}
}

@Override
public void addTaskToQueue(TaskModel task) {
// put in queue
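Note on the removed task-update executor: the constructor code deleted above built a fixed-size ThreadPoolExecutor backed by an ArrayBlockingQueue whose single-argument offer() is overridden to wait up to 100 ms for queue space, so a saturated pool briefly applies backpressure to submitters before the rejection policy kicks in. That executor was only used by the removed scheduleTask override to run synchronous system tasks. Below is a minimal, self-contained sketch of the same pattern for reference; it is not part of this commit, the names BoundedExecutorSketch and newBlockingPool are illustrative only, and Guava's ThreadFactoryBuilder (used in the original to name the threads) is omitted to keep the sketch dependency-free.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public final class BoundedExecutorSketch {

    /** Fixed-size pool whose submissions wait briefly for queue space instead of failing fast. */
    public static ExecutorService newBlockingPool(int threads, long offerTimeoutMillis) {
        // ThreadPoolExecutor calls queue.offer(Runnable) when all workers are busy.
        // Overriding it with a timed offer makes a full queue block the caller for up
        // to offerTimeoutMillis; only if the queue is still full afterwards does the
        // pool's default AbortPolicy reject the task.
        ArrayBlockingQueue<Runnable> queue =
                new ArrayBlockingQueue<>(threads) {
                    @Override
                    public boolean offer(Runnable runnable) {
                        try {
                            return super.offer(runnable, offerTimeoutMillis, TimeUnit.MILLISECONDS);
                        } catch (InterruptedException ie) {
                            Thread.currentThread().interrupt();
                            return false;
                        }
                    }
                };
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.SECONDS, queue);
    }

    public static void main(String[] args) throws Exception {
        // The removed constructor sized the pool at availableProcessors() * 10
        // and used a 100 ms offer timeout.
        ExecutorService pool =
                newBlockingPool(Runtime.getRuntime().availableProcessors() * 10, 100);
        pool.submit(() -> System.out.println("task executed on " + Thread.currentThread().getName()));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}

The trade-off behind the pattern: a plain bounded queue rejects new work the moment it fills, while the timed offer absorbs short bursts at the cost of momentarily stalling the submitting thread, keeping memory bounded either way.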
