Skip to content

Commit

Permalink
[DSIP-52][Stage apache#1]split tenant config from worker config
Browse files Browse the repository at this point in the history
  • Loading branch information
pegasas committed Jun 28, 2024
1 parent c7a0994 commit 3059aa4
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.config.TenantConfig;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
Expand All @@ -32,12 +33,12 @@
public class DefaultWorkerTaskExecutor extends WorkerTaskExecutor {

public DefaultWorkerTaskExecutor(@NonNull TaskExecutionContext taskExecutionContext,
@NonNull WorkerConfig workerConfig,
@NonNull TenantConfig tenantConfig,
@NonNull WorkerMessageSender workerMessageSender,
@Nullable StorageOperator storageOperator,
@NonNull WorkerRegistryClient workerRegistryClient) {
super(taskExecutionContext,
workerConfig,
tenantConfig,
workerMessageSender,
storageOperator,
workerRegistryClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.dolphinscheduler.plugin.storage.api.StorageOperator;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.config.TenantConfig;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
Expand All @@ -32,18 +33,18 @@ public class DefaultWorkerTaskExecutorFactory
WorkerTaskExecutorFactory<DefaultWorkerTaskExecutor> {

private final @NonNull TaskExecutionContext taskExecutionContext;
private final @NonNull WorkerConfig workerConfig;
private final @NonNull TenantConfig tenantConfig;
private final @NonNull WorkerMessageSender workerMessageSender;
private final @Nullable StorageOperator storageOperator;
private final @NonNull WorkerRegistryClient workerRegistryClient;

public DefaultWorkerTaskExecutorFactory(@NonNull TaskExecutionContext taskExecutionContext,
@NonNull WorkerConfig workerConfig,
@NonNull TenantConfig tenantConfig,
@NonNull WorkerMessageSender workerMessageSender,
@Nullable StorageOperator storageOperator,
@NonNull WorkerRegistryClient workerRegistryClient) {
this.taskExecutionContext = taskExecutionContext;
this.workerConfig = workerConfig;
this.tenantConfig = tenantConfig;
this.workerMessageSender = workerMessageSender;
this.storageOperator = storageOperator;
this.workerRegistryClient = workerRegistryClient;
Expand All @@ -53,7 +54,7 @@ public DefaultWorkerTaskExecutorFactory(@NonNull TaskExecutionContext taskExecut
public DefaultWorkerTaskExecutor createWorkerTaskExecutor() {
return new DefaultWorkerTaskExecutor(
taskExecutionContext,
workerConfig,
tenantConfig,
workerMessageSender,
storageOperator,
workerRegistryClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.config.TenantConfig;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
Expand All @@ -74,7 +75,7 @@ public abstract class WorkerTaskExecutor implements Runnable {
protected static final Logger log = LoggerFactory.getLogger(WorkerTaskExecutor.class);

protected final TaskExecutionContext taskExecutionContext;
protected final WorkerConfig workerConfig;
protected final TenantConfig tenantConfig;
protected final WorkerMessageSender workerMessageSender;
protected final @Nullable StorageOperator storageOperator;
protected final WorkerRegistryClient workerRegistryClient;
Expand All @@ -83,12 +84,12 @@ public abstract class WorkerTaskExecutor implements Runnable {

protected WorkerTaskExecutor(
@NonNull TaskExecutionContext taskExecutionContext,
@NonNull WorkerConfig workerConfig,
@NonNull TenantConfig tenantConfig,
@NonNull WorkerMessageSender workerMessageSender,
@Nullable StorageOperator storageOperator,
@NonNull WorkerRegistryClient workerRegistryClient) {
this.taskExecutionContext = taskExecutionContext;
this.workerConfig = workerConfig;
this.tenantConfig = tenantConfig;
this.workerMessageSender = workerMessageSender;
this.storageOperator = storageOperator;
this.workerRegistryClient = workerRegistryClient;
Expand Down Expand Up @@ -207,10 +208,7 @@ protected void beforeExecute() {
log.info("Send task status {} master: {}", TaskExecutionStatus.RUNNING_EXECUTION.name(),
taskExecutionContext.getHost());

// In most of case the origin tenant is the same as the current tenant
// Except `default` tenant. The originTenant is used to download the resources
String originTenant = taskExecutionContext.getTenantCode();
taskExecutionContext.setTenantCode(TenantUtils.getOrCreateActualTenant(workerConfig, taskExecutionContext));
taskExecutionContext.setTenantCode(TenantUtils.getOrCreateActualTenant(tenantConfig, taskExecutionContext));
log.info("TenantCode: {} check successfully", taskExecutionContext.getTenantCode());

TaskExecutionContextUtils.createTaskInstanceWorkingDirectory(taskExecutionContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public WorkerTaskExecutorFactoryBuilder(

public WorkerTaskExecutorFactory<? extends WorkerTaskExecutor> createWorkerTaskExecutorFactory(TaskExecutionContext taskExecutionContext) {
return new DefaultWorkerTaskExecutorFactory(taskExecutionContext,
workerConfig,
workerConfig.getTenantConfig(),
workerMessageSender,
storageOperator,
workerRegistryClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ public static boolean isTenantEnable() {
* <p>
* If sudo is not enabled, will not check the tenant code.
*/
public static String getOrCreateActualTenant(WorkerConfig workerConfig, TaskExecutionContext taskExecutionContext) {
TenantConfig tenantConfig = workerConfig.getTenantConfig();

public static String getOrCreateActualTenant(TenantConfig tenantConfig, TaskExecutionContext taskExecutionContext) {
if (!isTenantEnable()) {
log.info("Tenant is not enabled, will use the bootstrap: {} user as tenant", getBootstrapTenant());
return getBootstrapTenant();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperator;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.worker.config.TenantConfig;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
Expand All @@ -33,7 +34,7 @@ public class DefaultWorkerTaskExecutorTest {

private TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class);

private WorkerConfig workerConfig = Mockito.mock(WorkerConfig.class);
private TenantConfig tenantConfig = Mockito.mock(TenantConfig.class);

private String masterAddress = "localhost:5678";

Expand All @@ -53,7 +54,7 @@ public void testDryRun() {
.build();
WorkerTaskExecutor workerTaskExecutor = new DefaultWorkerTaskExecutor(
taskExecutionContext,
workerConfig,
tenantConfig,
workerMessageSender,
storageOperator,
workerRegistryClient);
Expand All @@ -76,7 +77,7 @@ public void testErrorboundTestDataSource() {
.build();
WorkerTaskExecutor workerTaskExecutor = new DefaultWorkerTaskExecutor(
taskExecutionContext,
workerConfig,
tenantConfig,
workerMessageSender,
storageOperator,
workerRegistryClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.config.TaskExecuteThreadsFullPolicy;
import org.apache.dolphinscheduler.server.worker.config.TenantConfig;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
Expand Down Expand Up @@ -146,7 +147,7 @@ static class MockWorkerTaskExecutor extends WorkerTaskExecutor {

protected MockWorkerTaskExecutor(Runnable runnable) {
super(TaskExecutionContext.builder().taskInstanceId((int) System.nanoTime()).build(),
new WorkerConfig(),
new TenantConfig(),
new WorkerMessageSender(),
null,
new WorkerRegistryClient());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.config.TenantConfig;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
Expand All @@ -58,6 +59,8 @@ public class TaskInstanceOperationFunctionTest {

private WorkerConfig workerConfig = Mockito.mock(WorkerConfig.class);

private TenantConfig tenantConfig = Mockito.mock(TenantConfig.class);

private TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class);

private WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool =
Expand Down Expand Up @@ -182,6 +185,8 @@ public void testTaskInstancePauseOperationFunction() {

@Test
public void testTaskInstanceDispatchOperationFunction() {
given(workerConfig.getTenantConfig()).willReturn(tenantConfig);

WorkerTaskExecutorFactoryBuilder workerTaskExecutorFactoryBuilder = new WorkerTaskExecutorFactoryBuilder(
workerConfig,
workerMessageSender,
Expand Down

0 comments on commit 3059aa4

Please sign in to comment.