Skip to content

Commit

Permalink
Resolve review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
RusJaI committed Sep 7, 2023
1 parent b750b1f commit deaf861
Showing 1 changed file with 2 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,14 @@

package org.wso2.carbon.inbound.endpoint.protocol.cdc;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseException;
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.inbound.InboundProcessorParams;
import org.apache.synapse.inbound.InboundTaskProcessor;
import org.apache.synapse.mediators.Value;
import org.apache.synapse.task.TaskStartupObserver;
import org.apache.synapse.util.xpath.SynapseXPath;
import org.jaxen.JaxenException;
import org.apache.synapse.util.resolver.SecureVaultResolver;
import org.wso2.carbon.inbound.endpoint.common.InboundRequestProcessorImpl;
import org.wso2.carbon.inbound.endpoint.common.InboundTask;
import org.wso2.carbon.inbound.endpoint.protocol.PollingConstants;
Expand All @@ -38,8 +34,6 @@
import java.io.IOException;
import java.util.List;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -67,9 +61,6 @@ public class CDCProcessor extends InboundRequestProcessorImpl implements TaskSta
private String injectingSeq;
private String onErrorSeq;
private boolean sequential;

private static final String SECURE_VAULT_REGEX = "(wso2:vault-lookup\\('(.*?)'\\))";
private static Pattern vaultLookupPattern = Pattern.compile(SECURE_VAULT_REGEX);
private static final String ENDPOINT_POSTFIX = "CDC" + COMMON_ENDPOINT_POSTFIX;
private static final String FILE_OFFSET_STORAGE_CLASS = "org.apache.kafka.connect.storage.FileOffsetBackingStore";
private static final String FILE_SCHEMA_HISTORY_STORAGE_CLASS = "io.debezium.storage.file.history.FileSchemaHistory";
Expand Down Expand Up @@ -124,9 +115,8 @@ private void setProperties () {

String passwordString = this.cdcProperties.getProperty(DEBEZIUM_DATABASE_PASSWORD);
SynapseEnvironment synapseEnvironment = this.synapseEnvironment;
MessageContext messageContext = synapseEnvironment.createMessageContext();

this.cdcProperties.setProperty(DEBEZIUM_DATABASE_PASSWORD, resolveSecureVault(messageContext, passwordString));
this.cdcProperties.setProperty(DEBEZIUM_DATABASE_PASSWORD, SecureVaultResolver.resolve(synapseEnvironment, passwordString));

if (this.cdcProperties.getProperty(DEBEZIUM_DATABASE_ALLOW_PUBLIC_KEY_RETRIEVAL) == null) {
this.cdcProperties.setProperty(DEBEZIUM_DATABASE_ALLOW_PUBLIC_KEY_RETRIEVAL, TRUE);
Expand Down Expand Up @@ -177,32 +167,6 @@ private void createFile (String filePath) throws IOException {
}
}

private static synchronized String resolveSecureVault(MessageContext messageContext, String passwordString) {
if (passwordString == null) {
return null;
}
Matcher lookupMatcher = vaultLookupPattern.matcher(passwordString);
String resolvedValue = "";
if (lookupMatcher.find()) {
Value expression;
String expressionStr = lookupMatcher.group(1);
try {
expression = new Value(new SynapseXPath(expressionStr));

} catch (JaxenException e) {
throw new SynapseException("Error while building the expression : " + expressionStr, e);
}
resolvedValue = expression.evaluateValue(messageContext);
if (StringUtils.isEmpty(resolvedValue)) {
LOGGER.warn("Found Empty value for expression : " + expression.getExpression());
resolvedValue = "";
}
} else {
resolvedValue = passwordString;
}
return resolvedValue;
}


/**
* This will be called at the time of synapse artifact deployment.
Expand Down

0 comments on commit deaf861

Please sign in to comment.