Skip to content
This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Commit

Permalink
[Chakravarthy|dV2_BQ_API] - Enables config for setting cleint connect…
Browse files Browse the repository at this point in the history
… and read timeouts for BQ client (#43)

Google v2 streaming backend allows a increased bandwidth than the previous versions upto 1 GB/Sec at project level. Moving to v2 has produced intermittent
connection failures on timeouts. Based on the recommendation from google, this feature provides a configurable setting for read timeout and connection timeout
  • Loading branch information
chakravarthyvp authored Mar 2, 2020
1 parent f856537 commit 304883e
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 2 deletions.
8 changes: 8 additions & 0 deletions src/main/java/com/gojek/beast/config/BQConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,12 @@ public interface BQConfig extends Config {
@DefaultValue("true")
@Key("ENABLE_BQ_ROW_INSERTID")
Boolean isBQRowInsertIdEnabled();

@DefaultValue("-1")
@Key("BQ_CLIENT_READ_TIMEOUT")
String getBqClientReadTimeout();

@DefaultValue("-1")
@Key("BQ_CLIENT_CONNECT_TIMEOUT")
String getBqClientConnectTimeout();
}
7 changes: 7 additions & 0 deletions src/main/java/com/gojek/beast/factory/BeastFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.gojek.beast.worker.WorkerState;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.TransportOptions;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.TableId;
Expand Down Expand Up @@ -94,6 +95,7 @@ public List<Worker> createBqWorkers() {
Integer bqWorkerPoolSize = appConfig.getBqWorkerPoolSize();
List<Worker> threads = new ArrayList<>(bqWorkerPoolSize);
Acknowledger acknowledger = createAcknowledger();
log.info("BQ Row InsertId is: {}", (bqConfig.isBQRowInsertIdEnabled()) ? "Enabled" : "Disabled");
for (int i = 0; i < bqWorkerPoolSize; i++) {
Worker bqQueueWorker = new BqQueueWorker("bq-worker-" + i, createBigQuerySink(), new QueueConfig(appConfig.getBqWorkerPollTimeoutMs()), acknowledger, readQueue, workerState);
threads.add(bqQueueWorker);
Expand Down Expand Up @@ -125,7 +127,12 @@ public BQErrorHandler createOOBErrorHandler() {
}

private BigQuery getBigQueryInstance() {
final TransportOptions transportOptions = BigQueryOptions.getDefaultHttpTransportOptions().toBuilder()
.setConnectTimeout(Integer.parseInt(bqConfig.getBqClientConnectTimeout()))
.setReadTimeout(Integer.parseInt(bqConfig.getBqClientReadTimeout()))
.build();
return BigQueryOptions.newBuilder()
.setTransportOptions(transportOptions)
.setCredentials(getGoogleCredentials())
.setProjectId(bqConfig.getGCPProject())
.build().getService();
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/gojek/beast/models/FailureStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public Optional<Exception> getException() {
@Override
public String toString() {
return "FailureStatus{"
+ "cause=" + cause.getMessage()
+ ", message='" + message + '\'' + '}';
+ "cause=" + cause.getCause()
+ ", message='" + ((message != null) ? message : cause.getMessage()) + '\'' + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public Executor execute() {
try {
status = sink.push(records);
} catch (Exception e) {
statsClient.increment("retrysink.exec.failure.count," + statsClient.getBqTags());
status = new FailureStatus(e);
}
ifFailure();
Expand Down

0 comments on commit 304883e

Please sign in to comment.