diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/caller/JobCallerBuilder.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/caller/JobCallerBuilder.java index 91c9b828f7..bf21acf357 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/caller/JobCallerBuilder.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/caller/JobCallerBuilder.java @@ -15,8 +15,13 @@ */ package com.oceanbase.odc.service.task.caller; +import java.io.File; +import java.nio.charset.Charset; import java.util.Map; +import org.apache.commons.io.FileUtils; + +import com.oceanbase.odc.common.util.StringUtils; import com.oceanbase.odc.service.task.config.JobConfigurationHolder; import com.oceanbase.odc.service.task.config.TaskFrameworkProperties; import com.oceanbase.odc.service.task.constants.JobEnvKeyConstants; @@ -36,7 +41,23 @@ public class JobCallerBuilder { public static JobCaller buildProcessCaller(JobContext context) { Map environments = new JobEnvironmentFactory().build(context, TaskRunMode.PROCESS); JobUtils.encryptEnvironments(environments); - + /** + * write JobContext to file in case of exceeding the environments size limit; set the file path in + * the environment instead + */ + String jobContextFilePath = JobUtils.getExecutorDataPath() + "/" + StringUtils.uuid() + ".enc"; + try { + FileUtils.writeStringToFile(new File(jobContextFilePath), + JobUtils.encrypt(environments.get(JobEnvKeyConstants.ENCRYPT_KEY), + environments.get(JobEnvKeyConstants.ENCRYPT_SALT), JobUtils.toJson(context)), + Charset.defaultCharset()); + } catch (Exception ex) { + FileUtils.deleteQuietly(new File(jobContextFilePath)); + throw new RuntimeException("Failed to write job context to file: " + jobContextFilePath, ex); + } + environments.put(JobEnvKeyConstants.ODC_JOB_CONTEXT_FILE_PATH, + JobUtils.encrypt(environments.get(JobEnvKeyConstants.ENCRYPT_KEY), + environments.get(JobEnvKeyConstants.ENCRYPT_SALT), jobContextFilePath)); ProcessConfig config = new ProcessConfig(); config.setEnvironments(environments); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/caller/JobEnvironmentEncryptor.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/caller/JobEnvironmentEncryptor.java index 47efeaf0b5..63328a7136 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/caller/JobEnvironmentEncryptor.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/caller/JobEnvironmentEncryptor.java @@ -42,6 +42,7 @@ public class JobEnvironmentEncryptor { JobEnvKeyConstants.ODC_EXECUTOR_DATABASE_PASSWORD, JobEnvKeyConstants.ODC_OBJECT_STORAGE_CONFIGURATION, JobEnvKeyConstants.ODC_PROPERTY_ENCRYPTION_SALT, + JobEnvKeyConstants.ODC_JOB_CONTEXT_FILE_PATH, JobEnvKeyConstants.ODC_JOB_CONTEXT); private final AtomicBoolean encrypted = new AtomicBoolean(false); @@ -85,5 +86,13 @@ public void decrypt(@NonNull Map environments) { }); } + public String encrypt(String key, String salt, String raw) { + TextEncryptor textEncryptor = Encryptors.aesBase64(key, salt); + return textEncryptor.encrypt(raw); + } + public String decrypt(String key, String salt, String encrypted) { + TextEncryptor textEncryptor = Encryptors.aesBase64(key, salt); + return textEncryptor.decrypt(encrypted); + } } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/caller/JobEnvironmentFactory.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/caller/JobEnvironmentFactory.java index dc64dfe467..290e0ffad1 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/caller/JobEnvironmentFactory.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/caller/JobEnvironmentFactory.java @@ -44,8 +44,9 @@ public class JobEnvironmentFactory { public Map build(JobContext context, TaskRunMode runMode) { putEnv(JobEnvKeyConstants.ODC_BOOT_MODE, () -> JobConstants.ODC_BOOT_MODE_EXECUTOR); putEnv(JobEnvKeyConstants.ODC_TASK_RUN_MODE, runMode::name); - putEnv(JobEnvKeyConstants.ODC_JOB_CONTEXT, () -> JobUtils.toJson(context)); - + if (runMode.isK8s()) { + putEnv(JobEnvKeyConstants.ODC_JOB_CONTEXT, () -> JobUtils.toJson(context)); + } JobCredentialProvider jobCredentialProvider = JobConfigurationHolder.getJobConfiguration() .getJobCredentialProvider(); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/constants/JobEnvKeyConstants.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/constants/JobEnvKeyConstants.java index ab53f505f1..be733708e3 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/constants/JobEnvKeyConstants.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/constants/JobEnvKeyConstants.java @@ -27,6 +27,8 @@ public class JobEnvKeyConstants { public static final String ODC_JOB_CONTEXT = "ODC_JOB_CONTEXT"; + public static final String ODC_JOB_CONTEXT_FILE_PATH = "ODC_JOB_CONTEXT_FILE_PATH"; + public static final String ODC_TASK_RUN_MODE = "ODC_TASK_RUN_MODE"; public static final String ODC_BOOT_MODE = "ODC_BOOT_MODE"; diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/TaskApplication.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/TaskApplication.java index 0e78ca0b09..84b5a3758f 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/TaskApplication.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/TaskApplication.java @@ -27,6 +27,7 @@ import com.oceanbase.odc.common.trace.TaskContextHolder; import com.oceanbase.odc.common.trace.TraceContextHolder; +import com.oceanbase.odc.common.util.StringUtils; import com.oceanbase.odc.common.util.SystemUtils; import com.oceanbase.odc.core.shared.Verify; import com.oceanbase.odc.service.task.caller.JobContext; @@ -90,7 +91,8 @@ private void init(String[] args) { log.info("decrypt environment variables success."); // 3 step: get JobContext from environment - context = JobContextProviderFactory.create().provide(); + context = JobContextProviderFactory.create(SystemUtils.getEnvOrProperty(JobEnvKeyConstants.ODC_TASK_RUN_MODE)) + .provide(); log.info("initial job context success."); // 4 step: trace taskId in log4j2 context @@ -151,9 +153,14 @@ private void setLog4JConfigXml() { } private void validEnvValues() { - validNotBlank(JobEnvKeyConstants.ODC_JOB_CONTEXT); - validNotBlank(JobEnvKeyConstants.ODC_BOOT_MODE); validNotBlank(JobEnvKeyConstants.ODC_TASK_RUN_MODE); + if (StringUtils.equalsIgnoreCase("PROCESS", + SystemUtils.getEnvOrProperty(JobEnvKeyConstants.ODC_TASK_RUN_MODE))) { + validNotBlank(JobEnvKeyConstants.ODC_JOB_CONTEXT_FILE_PATH); + } else { + validNotBlank(JobEnvKeyConstants.ODC_JOB_CONTEXT); + } + validNotBlank(JobEnvKeyConstants.ODC_BOOT_MODE); validNotBlank(JobEnvKeyConstants.ENCRYPT_SALT); validNotBlank(JobEnvKeyConstants.ENCRYPT_KEY); validNotBlank(JobEnvKeyConstants.ODC_EXECUTOR_USER_ID); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/context/JobContextProviderFactory.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/context/JobContextProviderFactory.java index 01ccbd813d..c285caf3fa 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/context/JobContextProviderFactory.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/context/JobContextProviderFactory.java @@ -16,14 +16,23 @@ package com.oceanbase.odc.service.task.executor.context; +import com.oceanbase.odc.service.task.enums.TaskRunMode; + +import lombok.NonNull; + /** * @author gaoda.xy * @date 2023/11/23 13:55 */ public class JobContextProviderFactory { - public static JobContextProvider create() { - return new DefaultJobContextProvider(); + public static JobContextProvider create(@NonNull String taskRunMode) { + if (taskRunMode.equalsIgnoreCase(TaskRunMode.PROCESS.name())) { + return new ProcessJobContextProvider(); + } else if (taskRunMode.equalsIgnoreCase(TaskRunMode.K8S.name())) { + return new K8sJobContextProvider(); + } else { + throw new RuntimeException("Unsupported task run mode: " + taskRunMode); + } } - } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/context/DefaultJobContextProvider.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/context/K8sJobContextProvider.java similarity index 94% rename from server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/context/DefaultJobContextProvider.java rename to server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/context/K8sJobContextProvider.java index 02c1ea299a..03eaad455e 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/context/DefaultJobContextProvider.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/context/K8sJobContextProvider.java @@ -25,7 +25,7 @@ * @author gaoda.xy * @date 2023/11/22 20:21 */ -public class DefaultJobContextProvider implements JobContextProvider { +public class K8sJobContextProvider implements JobContextProvider { @Override public JobContext provide() { diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/context/ProcessJobContextProvider.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/context/ProcessJobContextProvider.java new file mode 100644 index 0000000000..c76f02739a --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/context/ProcessJobContextProvider.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.task.executor.context; + +import java.io.File; +import java.nio.charset.Charset; + +import org.apache.commons.io.FileUtils; + +import com.oceanbase.odc.common.json.JsonUtils; +import com.oceanbase.odc.common.util.SystemUtils; +import com.oceanbase.odc.service.task.caller.DefaultJobContext; +import com.oceanbase.odc.service.task.caller.JobContext; +import com.oceanbase.odc.service.task.constants.JobEnvKeyConstants; +import com.oceanbase.odc.service.task.util.JobUtils; + +/** + * @Author: Lebie + * @Date: 2024/9/24 17:27 + * @Description: [] + */ +public class ProcessJobContextProvider implements JobContextProvider { + @Override + public JobContext provide() { + String encryptedJobContextJson; + try { + encryptedJobContextJson = FileUtils + .readFileToString(new File(System.getProperty(JobEnvKeyConstants.ODC_JOB_CONTEXT_FILE_PATH)), + Charset.defaultCharset()); + } catch (Exception ex) { + throw new RuntimeException("read job context file failed, ex=", ex); + } finally { + FileUtils.deleteQuietly(new File(System.getProperty(JobEnvKeyConstants.ODC_JOB_CONTEXT_FILE_PATH))); + } + String rawJobContextJson = JobUtils.decrypt(SystemUtils.getEnvOrProperty(JobEnvKeyConstants.ENCRYPT_KEY), + SystemUtils.getEnvOrProperty(JobEnvKeyConstants.ENCRYPT_SALT), encryptedJobContextJson); + return JsonUtils.fromJson(rawJobContextJson, DefaultJobContext.class); + } +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/util/JobUtils.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/util/JobUtils.java index d8c4dc708d..9f02bff787 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/util/JobUtils.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/util/JobUtils.java @@ -180,4 +180,12 @@ public static void putEnvToSysProperties(String environmentKey) { public static void encryptEnvironments(Map environments) { new JobEnvironmentEncryptor().encrypt(environments); } + + public static String encrypt(String key, String salt, String raw) { + return new JobEnvironmentEncryptor().encrypt(key, salt, raw); + } + + public static String decrypt(String key, String salt, String encrypted) { + return new JobEnvironmentEncryptor().decrypt(key, salt, encrypted); + } }