Skip to content

Commit

Permalink
fix(task-framework): starting task fails when environment variables e…
Browse files Browse the repository at this point in the history
…xceed 2MB in Process Mode (#3545)

* write JobContext to file

* delete useless constant

* response to comments
  • Loading branch information
MarkPotato777 authored Sep 25, 2024
1 parent 4b4d16b commit 60c645e
Show file tree
Hide file tree
Showing 9 changed files with 119 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@
*/
package com.oceanbase.odc.service.task.caller;

import java.io.File;
import java.nio.charset.Charset;
import java.util.Map;

import org.apache.commons.io.FileUtils;

import com.oceanbase.odc.common.util.StringUtils;
import com.oceanbase.odc.service.task.config.JobConfigurationHolder;
import com.oceanbase.odc.service.task.config.TaskFrameworkProperties;
import com.oceanbase.odc.service.task.constants.JobEnvKeyConstants;
Expand All @@ -36,7 +41,23 @@ public class JobCallerBuilder {
public static JobCaller buildProcessCaller(JobContext context) {
Map<String, String> environments = new JobEnvironmentFactory().build(context, TaskRunMode.PROCESS);
JobUtils.encryptEnvironments(environments);

/**
* write JobContext to file in case of exceeding the environments size limit; set the file path in
* the environment instead
*/
String jobContextFilePath = JobUtils.getExecutorDataPath() + "/" + StringUtils.uuid() + ".enc";
try {
FileUtils.writeStringToFile(new File(jobContextFilePath),
JobUtils.encrypt(environments.get(JobEnvKeyConstants.ENCRYPT_KEY),
environments.get(JobEnvKeyConstants.ENCRYPT_SALT), JobUtils.toJson(context)),
Charset.defaultCharset());
} catch (Exception ex) {
FileUtils.deleteQuietly(new File(jobContextFilePath));
throw new RuntimeException("Failed to write job context to file: " + jobContextFilePath, ex);
}
environments.put(JobEnvKeyConstants.ODC_JOB_CONTEXT_FILE_PATH,
JobUtils.encrypt(environments.get(JobEnvKeyConstants.ENCRYPT_KEY),
environments.get(JobEnvKeyConstants.ENCRYPT_SALT), jobContextFilePath));
ProcessConfig config = new ProcessConfig();
config.setEnvironments(environments);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class JobEnvironmentEncryptor {
JobEnvKeyConstants.ODC_EXECUTOR_DATABASE_PASSWORD,
JobEnvKeyConstants.ODC_OBJECT_STORAGE_CONFIGURATION,
JobEnvKeyConstants.ODC_PROPERTY_ENCRYPTION_SALT,
JobEnvKeyConstants.ODC_JOB_CONTEXT_FILE_PATH,
JobEnvKeyConstants.ODC_JOB_CONTEXT);

private final AtomicBoolean encrypted = new AtomicBoolean(false);
Expand Down Expand Up @@ -85,5 +86,13 @@ public void decrypt(@NonNull Map<String, String> environments) {
});
}

public String encrypt(String key, String salt, String raw) {
TextEncryptor textEncryptor = Encryptors.aesBase64(key, salt);
return textEncryptor.encrypt(raw);
}

public String decrypt(String key, String salt, String encrypted) {
TextEncryptor textEncryptor = Encryptors.aesBase64(key, salt);
return textEncryptor.decrypt(encrypted);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ public class JobEnvironmentFactory {
public Map<String, String> build(JobContext context, TaskRunMode runMode) {
putEnv(JobEnvKeyConstants.ODC_BOOT_MODE, () -> JobConstants.ODC_BOOT_MODE_EXECUTOR);
putEnv(JobEnvKeyConstants.ODC_TASK_RUN_MODE, runMode::name);
putEnv(JobEnvKeyConstants.ODC_JOB_CONTEXT, () -> JobUtils.toJson(context));

if (runMode.isK8s()) {
putEnv(JobEnvKeyConstants.ODC_JOB_CONTEXT, () -> JobUtils.toJson(context));
}
JobCredentialProvider jobCredentialProvider = JobConfigurationHolder.getJobConfiguration()
.getJobCredentialProvider();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public class JobEnvKeyConstants {

public static final String ODC_JOB_CONTEXT = "ODC_JOB_CONTEXT";

public static final String ODC_JOB_CONTEXT_FILE_PATH = "ODC_JOB_CONTEXT_FILE_PATH";

public static final String ODC_TASK_RUN_MODE = "ODC_TASK_RUN_MODE";

public static final String ODC_BOOT_MODE = "ODC_BOOT_MODE";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import com.oceanbase.odc.common.trace.TaskContextHolder;
import com.oceanbase.odc.common.trace.TraceContextHolder;
import com.oceanbase.odc.common.util.StringUtils;
import com.oceanbase.odc.common.util.SystemUtils;
import com.oceanbase.odc.core.shared.Verify;
import com.oceanbase.odc.service.task.caller.JobContext;
Expand Down Expand Up @@ -90,7 +91,8 @@ private void init(String[] args) {
log.info("decrypt environment variables success.");

// 3 step: get JobContext from environment
context = JobContextProviderFactory.create().provide();
context = JobContextProviderFactory.create(SystemUtils.getEnvOrProperty(JobEnvKeyConstants.ODC_TASK_RUN_MODE))
.provide();
log.info("initial job context success.");

// 4 step: trace taskId in log4j2 context
Expand Down Expand Up @@ -151,9 +153,14 @@ private void setLog4JConfigXml() {
}

private void validEnvValues() {
validNotBlank(JobEnvKeyConstants.ODC_JOB_CONTEXT);
validNotBlank(JobEnvKeyConstants.ODC_BOOT_MODE);
validNotBlank(JobEnvKeyConstants.ODC_TASK_RUN_MODE);
if (StringUtils.equalsIgnoreCase("PROCESS",
SystemUtils.getEnvOrProperty(JobEnvKeyConstants.ODC_TASK_RUN_MODE))) {
validNotBlank(JobEnvKeyConstants.ODC_JOB_CONTEXT_FILE_PATH);
} else {
validNotBlank(JobEnvKeyConstants.ODC_JOB_CONTEXT);
}
validNotBlank(JobEnvKeyConstants.ODC_BOOT_MODE);
validNotBlank(JobEnvKeyConstants.ENCRYPT_SALT);
validNotBlank(JobEnvKeyConstants.ENCRYPT_KEY);
validNotBlank(JobEnvKeyConstants.ODC_EXECUTOR_USER_ID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,23 @@

package com.oceanbase.odc.service.task.executor.context;

import com.oceanbase.odc.service.task.enums.TaskRunMode;

import lombok.NonNull;

/**
* @author gaoda.xy
* @date 2023/11/23 13:55
*/
public class JobContextProviderFactory {

public static JobContextProvider create() {
return new DefaultJobContextProvider();
public static JobContextProvider create(@NonNull String taskRunMode) {
if (taskRunMode.equalsIgnoreCase(TaskRunMode.PROCESS.name())) {
return new ProcessJobContextProvider();
} else if (taskRunMode.equalsIgnoreCase(TaskRunMode.K8S.name())) {
return new K8sJobContextProvider();
} else {
throw new RuntimeException("Unsupported task run mode: " + taskRunMode);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* @author gaoda.xy
* @date 2023/11/22 20:21
*/
public class DefaultJobContextProvider implements JobContextProvider {
public class K8sJobContextProvider implements JobContextProvider {

@Override
public JobContext provide() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright (c) 2023 OceanBase.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.oceanbase.odc.service.task.executor.context;

import java.io.File;
import java.nio.charset.Charset;

import org.apache.commons.io.FileUtils;

import com.oceanbase.odc.common.json.JsonUtils;
import com.oceanbase.odc.common.util.SystemUtils;
import com.oceanbase.odc.service.task.caller.DefaultJobContext;
import com.oceanbase.odc.service.task.caller.JobContext;
import com.oceanbase.odc.service.task.constants.JobEnvKeyConstants;
import com.oceanbase.odc.service.task.util.JobUtils;

/**
* @Author: Lebie
* @Date: 2024/9/24 17:27
* @Description: []
*/
public class ProcessJobContextProvider implements JobContextProvider {
@Override
public JobContext provide() {
String encryptedJobContextJson;
try {
encryptedJobContextJson = FileUtils
.readFileToString(new File(System.getProperty(JobEnvKeyConstants.ODC_JOB_CONTEXT_FILE_PATH)),
Charset.defaultCharset());
} catch (Exception ex) {
throw new RuntimeException("read job context file failed, ex=", ex);
} finally {
FileUtils.deleteQuietly(new File(System.getProperty(JobEnvKeyConstants.ODC_JOB_CONTEXT_FILE_PATH)));
}
String rawJobContextJson = JobUtils.decrypt(SystemUtils.getEnvOrProperty(JobEnvKeyConstants.ENCRYPT_KEY),
SystemUtils.getEnvOrProperty(JobEnvKeyConstants.ENCRYPT_SALT), encryptedJobContextJson);
return JsonUtils.fromJson(rawJobContextJson, DefaultJobContext.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,4 +180,12 @@ public static void putEnvToSysProperties(String environmentKey) {
public static void encryptEnvironments(Map<String, String> environments) {
new JobEnvironmentEncryptor().encrypt(environments);
}

public static String encrypt(String key, String salt, String raw) {
return new JobEnvironmentEncryptor().encrypt(key, salt, raw);
}

public static String decrypt(String key, String salt, String encrypted) {
return new JobEnvironmentEncryptor().decrypt(key, salt, encrypted);
}
}

0 comments on commit 60c645e

Please sign in to comment.