Skip to content

Commit

Permalink
Merge pull request data-integrations#27 from cloudsufi/patch/add_retr…
Browse files Browse the repository at this point in the history
…y_failsafe

 [PLUGIN-1754] Add Retry with dev.failsafe
  • Loading branch information
vikasrathee-cs authored Mar 28, 2024
2 parents 6fc4093 + 9358fe5 commit f2f426f
Show file tree
Hide file tree
Showing 14 changed files with 984 additions and 656 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
<guava.version>27.0.1-jre</guava.version>
<gcs.client.version>2.6.0</gcs.client.version>
<commons-codec.version>1.10</commons-codec.version>
<failsafe.version>3.3.2</failsafe.version>
</properties>

<repositories>
Expand All @@ -60,6 +61,11 @@

<dependencies>

<dependency>
<groupId>dev.failsafe</groupId>
<artifactId>failsafe</artifactId>
<version>${failsafe.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.cdap.etl.api.connector.Connector;
import io.cdap.plugin.ariba.source.config.AribaPluginConfig;
import io.cdap.plugin.ariba.source.connector.AribaConnector;
import io.cdap.plugin.ariba.source.exception.AribaException;
import io.cdap.plugin.ariba.source.util.AribaUtil;
import io.cdap.plugin.ariba.source.util.ResourceConstants;
Expand Down Expand Up @@ -69,7 +68,12 @@ public class AribaBatchSource extends BatchSource<NullWritable, StructuredRecord

public AribaBatchSource(AribaPluginConfig pluginConfig) {
this.pluginConfig = pluginConfig;
aribaServices = new AribaServices(pluginConfig.getConnection());
aribaServices = new AribaServices(pluginConfig.getConnection(),
pluginConfig.getMaxRetryCount(),
pluginConfig.getInitialRetryDuration(),
pluginConfig.getMaxRetryDuration(),
pluginConfig.getRetryMultiplier(),
false);
}

@Override
Expand Down
14 changes: 12 additions & 2 deletions src/main/java/io/cdap/plugin/ariba/source/AribaInputFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ public class AribaInputFormat extends InputFormat<NullWritable, StructuredRecord
@Override
public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
AribaPluginConfig pluginConfig = getPluginConfig(jobContext);
AribaServices aribaServices = new AribaServices(pluginConfig.getConnection());
AribaServices aribaServices = new AribaServices(pluginConfig.getConnection(),
pluginConfig.getMaxRetryCount(),
pluginConfig.getInitialRetryDuration(),
pluginConfig.getMaxRetryDuration(),
pluginConfig.getRetryMultiplier(),
true);
boolean previewEnabled = Boolean.parseBoolean(jobContext.getConfiguration().
get(ResourceConstants.IS_PREVIEW_ENABLED));

Expand All @@ -66,7 +71,12 @@ public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
public RecordReader<NullWritable, StructuredRecord> createRecordReader(InputSplit inputSplit, TaskAttemptContext
taskAttemptContext) throws IOException {
AribaPluginConfig pluginConfig = getPluginConfig(taskAttemptContext);
AribaServices aribaServices = new AribaServices(pluginConfig.getConnection());
AribaServices aribaServices = new AribaServices(pluginConfig.getConnection(),
pluginConfig.getMaxRetryCount(),
pluginConfig.getInitialRetryDuration(),
pluginConfig.getMaxRetryDuration(),
pluginConfig.getRetryMultiplier(),
true);
Schema outputSchema = Schema.parseJson(taskAttemptContext.getConfiguration().get(ResourceConstants.OUTPUT_SCHEMA));
return new AribaRecordReader(aribaServices, outputSchema, pluginConfig);
}
Expand Down
183 changes: 112 additions & 71 deletions src/main/java/io/cdap/plugin/ariba/source/AribaServices.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import dev.failsafe.Failsafe;
import dev.failsafe.FailsafeException;
import dev.failsafe.RetryPolicy;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.ariba.source.config.AribaPluginConfig;
import io.cdap.plugin.ariba.source.connector.AribaConnectorConfig;
import io.cdap.plugin.ariba.source.exception.AribaException;
import io.cdap.plugin.ariba.source.exception.AribaRetryableException;
import io.cdap.plugin.ariba.source.metadata.AribaColumnMetadata;
import io.cdap.plugin.ariba.source.metadata.AribaResponseContainer;
import io.cdap.plugin.ariba.source.metadata.AribaSchemaGenerator;
Expand Down Expand Up @@ -57,8 +61,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.ZipInputStream;
import javax.annotation.Nullable;
import javax.ws.rs.core.MediaType;
Expand Down Expand Up @@ -100,18 +102,36 @@ public class AribaServices {
private static final String TYPE = "type";
private static final String UTC = "UTC";
private static final Logger LOG = LoggerFactory.getLogger(AribaServices.class);
private static final int MAX_RETRIES = 5;
private final AribaConnectorConfig pluginConfig;
private final ObjectMapper objectMapper = new ObjectMapper();
private final Gson gson = new Gson();
private int availableLimit;
private boolean isDayLimitExhausted;
private boolean isHourLimitExhausted;
private boolean isMinuteLimitExhausted;
private boolean isSecondsLimitExhausted;
boolean isDayLimitExhausted;
boolean isHourLimitExhausted;
boolean isMinuteLimitExhausted;
boolean isSecondsLimitExhausted;

public AribaServices(AribaConnectorConfig pluginConfig) {
private final Integer initialRetryDuration;
private final Integer maxRetryDuration;
private final Integer maxRetryCount;
private final Integer retryMultiplier;

/**
* Determines if retry is required for the service call.
* If true, then the service call will be retried based on the retry configuration.
* If false, then the service call will not be retried.
*/
private final boolean retryRequired;

public AribaServices(AribaConnectorConfig pluginConfig, Integer maxRetryCount,
Integer initialRetryDuration, Integer maxRetryDuration, Integer retryMultiplier,
boolean retryRequired) {
this.pluginConfig = pluginConfig;
this.maxRetryCount = maxRetryCount;
this.initialRetryDuration = initialRetryDuration;
this.maxRetryDuration = maxRetryDuration;
this.retryMultiplier = retryMultiplier;
this.retryRequired = retryRequired;
}

/**
Expand Down Expand Up @@ -366,16 +386,7 @@ public List<AribaColumnMetadata> getMetadata(String accessToken, String template
public JsonNode createJob(AribaPluginConfig aribaPluginConfig, @Nullable String pageToken, String templateName)
throws AribaException, IOException, InterruptedException {
Request req = buildJobRequest(jobBuilder(pageToken).build().url(), aribaPluginConfig, templateName);
int count = 0;
Response response;
do {
response = executeRequest(req);
if (response.code() == ResourceConstants.API_LIMIT_EXCEED) {
checkAndThrowException(response);
count++;
}
} while (response.code() == ResourceConstants.API_LIMIT_EXCEED && count <= MAX_RETRIES);

Response response = executeRequest(req);
AribaResponseContainer responseContainer = tokenResponse(response);
InputStream responseStream = responseContainer.getResponseBody();
if (responseContainer.getHttpStatusCode() == HttpURLConnection.HTTP_OK) {
Expand All @@ -394,17 +405,11 @@ public JsonNode createJob(AribaPluginConfig aribaPluginConfig, @Nullable String
public JsonNode fetchJobStatus(String accessToken, String jobId)
throws IOException, AribaException, InterruptedException {
URL url = fetchDataBuilder(jobId).build().url();
int count = 0;
Request req = buildFetchRequest(url, accessToken);
Response response = null;
try {
do {
response = executeRequest(req);
checkAndThrowException(response);
count++;
} while (response.code() == ResourceConstants.API_LIMIT_EXCEED && count <= MAX_RETRIES);
response = executeRequest(req);
AribaResponseContainer responseContainer = tokenResponse(response);

if (responseContainer.getHttpStatusCode() == HttpURLConnection.HTTP_OK) {
InputStream responseStream = responseContainer.getResponseBody();
JsonNode responseNode = objectMapper.readTree(responseStream);
Expand All @@ -418,7 +423,9 @@ public JsonNode fetchJobStatus(String accessToken, String jobId)
}
throw new AribaException(response.message(), response.code());
} finally {
response.close();
if (response != null) {
response.close();
}
}
}

Expand Down Expand Up @@ -621,59 +628,74 @@ protected Request buildFetchRequest(URL endPoint, String accessToken) {
}

/**
* Executes the given Ariba request and returns the response.
*
* @param req request
* @return Response
* @throws AribaException AribaException
* @throws InterruptedException InterruptedException
* @throws IOException IOException
*/
public Response executeRequest(Request req) throws AribaException, InterruptedException, IOException {
OkHttpClient enhancedOkHttpClient = getConfiguredClient().build();
Response response = null;
int actualMaxRetryCount = retryRequired ? maxRetryCount : 0;
RetryPolicy<Object> retryPolicy = RetryPolicy.builder()
.handle(AribaRetryableException.class)
.withBackoff(initialRetryDuration, maxRetryDuration, ChronoUnit.SECONDS, retryMultiplier)
.withMaxRetries(actualMaxRetryCount)
.onRetry(event -> LOG.info("Retrying Ariba call with plugin. Retry count: " + event.getAttemptCount()))
.onSuccess(event -> LOG.debug("Ariba plugin call has been executed successfully."))
.onRetriesExceeded(event -> LOG.error("Retry limit for the Ariba plugin has been exceeded.",
event.getException()))
.build();

try {
response = enhancedOkHttpClient.newCall(req).execute();
if (response.code() != HttpURLConnection.HTTP_OK && AribaUtil.isNullOrEmpty(response.message())) {
AribaResponseContainer responseContainer = aribaResponse(response);
InputStream responseStream = responseContainer.getResponseBody();
JsonNode jsonNode = objectMapper.readTree(responseStream);
String errMsg = jsonNode.get(ResourceConstants.MESSAGE).asText() != null
? jsonNode.get(ResourceConstants.MESSAGE).asText() :
ResourceConstants.ERR_NOT_FOUND.getMsgForKey();
throw new AribaException(errMsg, response.code());
}
if (!isApiLimitExhausted(response)) {
checkAndThrowException(response);
return response;
} else {
response = enhancedOkHttpClient.newCall(req).execute();
return Failsafe.with(retryPolicy).get(() -> executeRetryableRequest(req, actualMaxRetryCount > 0));
} catch (FailsafeException fse) {
Throwable t = fse.getCause();
if (t instanceof AribaException) {
throw (AribaException) t;
} else if (t instanceof InterruptedException) {
throw (InterruptedException) t;
} else if (t instanceof IOException) {
throw (IOException) t;
} else {
throw new RuntimeException(t);
}
}
} catch (IOException e) {
throw new IOException("Endpoint is incorrect. Unable to validate the source with the provided.", e);
}

/**
* Calls given Ariba API.
* @param req request
* @param shouldWait Do we to wait for time defined in header if API limit is exhausted
* @return Response
* @throws AribaException AribaException
* @throws InterruptedException InterruptedException
* @throws IOException IOException
* @throws AribaRetryableException
*/
public Response executeRetryableRequest(Request req, boolean shouldWait)
throws AribaException, InterruptedException, IOException, AribaRetryableException {

LOG.debug("Retryable Ariba URL: " + req.url());
OkHttpClient enhancedOkHttpClient = getConfiguredClient().build();
Response response = enhancedOkHttpClient.newCall(req).execute();
checkAndThrowException(response, shouldWait);
return response;
}
}

/**
/**
* Calls given Ariba API.
* @param jobId Ariba Job Id
* @return JsonNode
*/
public JsonNode fetchData(String jobId, String fileName)
throws IOException, InterruptedException, AribaException {

OkHttpClient enhancedOkHttpClient = getConfiguredClient().build();
HttpUrl.Builder zipUrl = zipBuilder(jobId, fileName);
Response zipResponse;
do {
zipResponse = enhancedOkHttpClient
.newCall(fetchZipFileData(zipUrl.build().url(), getAccessToken())).execute();
if (zipResponse.code() == ResourceConstants.API_LIMIT_EXCEED) {
checkAndThrowException(zipResponse);
}
} while (zipResponse.code() == ResourceConstants.API_LIMIT_EXCEED);
Response zipResponse = executeRequest(fetchZipFileData(zipUrl.build().url(), getAccessToken()));

LOG.info("Fetch Data Response Code is: {} for Job Id: {} , and File: {}"
, zipResponse.code(), jobId, fileName);
LOG.info("Fetch Data Response Code is: {} for Job Id: {} , and File: {}", zipResponse.code(), jobId, fileName);

AribaResponseContainer responseContainer = tokenResponse(zipResponse);
try (InputStream responseStream = responseContainer.getResponseBody();
Expand All @@ -695,6 +717,7 @@ public JsonNode fetchData(String jobId, String fileName)
* @return boolean
*/
public boolean isApiLimitExhausted(Response response) {
isDayLimitExhausted = isHourLimitExhausted = isMinuteLimitExhausted = isSecondsLimitExhausted = false;
if (response.code() != HttpURLConnection.HTTP_OK &&
Integer.parseInt(Objects.requireNonNull(response.header(RATE_LIMIT_DAY))) < 1) {
isDayLimitExhausted = true;
Expand All @@ -719,14 +742,17 @@ public boolean isApiLimitExhausted(Response response) {
* Check for limit and status code than throws exception accordingly
*
* @param response response
* @param shouldWait Do we to wait for time defined in header if API limit is exhausted
* @throws AribaException
* @throws InterruptedException
*/
@VisibleForTesting
void checkAndThrowException(Response response) throws AribaException, InterruptedException {
void checkAndThrowException(Response response, boolean shouldWait) throws AribaException, InterruptedException,
AribaRetryableException {
if (response.code() == HttpURLConnection.HTTP_BAD_REQUEST && !AribaUtil.isNullOrEmpty(response.message())) {
throw new AribaException(response.message(), response.code());
}

boolean limitExhausted = isApiLimitExhausted(response);

if (limitExhausted && isDayLimitExhausted) {
Expand All @@ -736,20 +762,35 @@ void checkAndThrowException(Response response) throws AribaException, Interrupte
throw new AribaException(ResourceConstants.ERR_API_LIMIT_EXCEED_FOR_DAY.getMsgForKey(retryAfter),
ResourceConstants.LIMIT_EXCEED_ERROR_CODE);
} else if (limitExhausted && isHourLimitExhausted) {
int retryAfter =
(Integer.parseInt(Objects.requireNonNull(response.header(ResourceConstants.RETRY_AFTER))) / 60) + 1;
LOG.info("API rate limit exceeded for the Hour, waiting for {} min", retryAfter);
TimeUnit.MINUTES.sleep(retryAfter);
if (shouldWait) {
int retryAfter =
(Integer.parseInt(Objects.requireNonNull(response.header(ResourceConstants.RETRY_AFTER))) / 60) + 1;
LOG.info("API rate limit exceeded for the Hour, waiting for {} min", retryAfter);
TimeUnit.MINUTES.sleep(retryAfter);
}
String errorMsg = String.format("Call to Ariba failed. Status Code: %s, Root Cause: %s.", response.code(),
response.message());
throw new AribaRetryableException(errorMsg, response.code());
} else if (limitExhausted && isMinuteLimitExhausted) {
int retryAfter =
(Integer.parseInt(Objects.requireNonNull(response.header(ResourceConstants.RETRY_AFTER))));
LOG.debug("API rate limit exceeded for the Minute, waiting for {} Seconds", retryAfter);
TimeUnit.SECONDS.sleep(retryAfter);
if (shouldWait) {
int retryAfter =
(Integer.parseInt(Objects.requireNonNull(response.header(ResourceConstants.RETRY_AFTER))));
LOG.debug("API rate limit exceeded for the Minute, waiting for {} Seconds", retryAfter);
TimeUnit.SECONDS.sleep(retryAfter);
}
String errorMsg = String.format("Call to Ariba failed. Status Code: %s, Root Cause: %s.", response.code(),
response.message());
throw new AribaRetryableException(errorMsg, response.code());
} else if (limitExhausted && isSecondsLimitExhausted) {
int retryAfter =
(Integer.parseInt(Objects.requireNonNull(response.header(ResourceConstants.RETRY_AFTER))));
LOG.debug("API rate limit exceeded for the Second, waiting for {} Seconds", retryAfter);
TimeUnit.SECONDS.sleep(retryAfter);
if (shouldWait) {
int retryAfter =
(Integer.parseInt(Objects.requireNonNull(response.header(ResourceConstants.RETRY_AFTER))));
LOG.debug("API rate limit exceeded for the Second, waiting for {} Seconds", retryAfter);
TimeUnit.SECONDS.sleep(retryAfter);
}
String errorMsg = String.format("Call to Ariba failed. Status Code: %s, Root Cause: %s.", response.code(),
response.message());
throw new AribaRetryableException(errorMsg, response.code());
} else if (response.code() != HttpURLConnection.HTTP_OK) {
throw new AribaException(response.message(), response.code());
}
Expand Down
Loading

0 comments on commit f2f426f

Please sign in to comment.