Skip to content

Commit

Permalink
Track per-stream record counts and records committed, and other sync …
Browse files Browse the repository at this point in the history
…summary metadata (airbytehq#9327)

* StateDeltaTracker class and tests

* working prototype implementation of per-stream record tracking

* misc stuff to get build working

* add new fields to replicationAttemptSummary

* update AirbyteMessageTracker to use StateDeltaTracker, and new interface methods

* finish implementation and tests for stateDeltaTracker and all new ReplicationAttemptSummary fields

* undo temporary changes to files that I accidentally committed

* simplify interactions with byte buffers (airbytehq#9331)

* define a map instead of generic object for counts by stream

* follow convention of keyToValue instead of valueByKey for maps

* use synchronized blocks instead of synchronized methods

* add totalBytesEmitted field to eventually replace bytesSynced

* misc PR feedback nits

* additionalProperties probably should still be false

* javadoc formatting

* define syncStats and use it for total and per-stream stats

* change per-stream stats map to a list, and set stats in standardSyncSummary

* wrap entire method bodies in synchronized block

* use a long instead of a Long for required fields

* remove extraneous 'this'

* set committed records to emitted records if sync has success status

* throw checked exception if a state is committed before it is added; simplify exception handling throughout

* set delta tracker memory limit to 20MiB

* log error message that was thrown instead of assumed cause

* StreamSyncStats wrapper, add test case for populating stats on failure, misc formatting

Co-authored-by: Charles <[email protected]>
  • Loading branch information
pmossman and cgardens authored Jan 11, 2022
1 parent 9cff110 commit bf9e9ca
Show file tree
Hide file tree
Showing 13 changed files with 987 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,25 @@ required:
- bytesSynced
- startTime
- endTime
- totalStats
- streamStats
additionalProperties: false
properties:
status:
"$ref": ReplicationStatus.yaml
recordsSynced:
recordsSynced: # TODO (parker) remove in favor of totalRecordsEmitted
type: integer
minValue: 0
bytesSynced:
bytesSynced: # TODO (parker) remove in favor of totalBytesEmitted
type: integer
minValue: 0
startTime:
type: integer
endTime:
type: integer
totalStats:
"$ref": SyncStats.yaml
streamStats:
type: array
items:
"$ref": StreamSyncStats.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,25 @@ required:
- bytesSynced
- startTime
- endTime
- totalStats
- streamStats
additionalProperties: false
properties:
status:
"$ref": ReplicationStatus.yaml
recordsSynced:
recordsSynced: # TODO (parker) remove in favor of totalRecordsEmitted
type: integer
minValue: 0
bytesSynced:
bytesSynced: # TODO (parker) remove in favor of totalBytesEmitted
type: integer
minValue: 0
startTime:
type: integer
endTime:
type: integer
totalStats:
"$ref": SyncStats.yaml
streamStats:
type: array
items:
"$ref": StreamSyncStats.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/StreamSyncStats.yaml
title: StreamSyncStats
description: Sync stats for a particular stream.
type: object
required:
- streamName
- stats
additionalProperties: false
properties:
streamName:
type: string
stats:
"$ref": SyncStats.yaml
19 changes: 19 additions & 0 deletions airbyte-config/models/src/main/resources/types/SyncStats.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/SyncStats.yaml
title: SyncStats
description: sync stats.
type: object
required:
- recordsEmitted
- bytesEmitted
additionalProperties: false
properties:
recordsEmitted:
type: integer
bytesEmitted:
type: integer
stateMessagesEmitted: # TODO make required once per-stream state messages are supported in V2
type: integer
recordsCommitted:
type: integer # if unset, committed records could not be computed
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ public void runJob() throws Exception {
airbyteSource,
new NamespacingMapper(syncInput.getNamespaceDefinition(), syncInput.getNamespaceFormat(), syncInput.getPrefix()),
new DefaultAirbyteDestination(workerConfigs, destinationLauncher),
new AirbyteMessageTracker(),
new AirbyteMessageTracker());

log.info("Running replication worker...");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import io.airbyte.config.StandardSyncInput;
import io.airbyte.config.StandardSyncSummary.ReplicationStatus;
import io.airbyte.config.State;
import io.airbyte.config.StreamSyncStats;
import io.airbyte.config.SyncStats;
import io.airbyte.config.WorkerDestinationConfig;
import io.airbyte.config.WorkerSourceConfig;
import io.airbyte.protocol.models.AirbyteMessage;
Expand All @@ -17,6 +19,7 @@
import io.airbyte.workers.protocols.airbyte.AirbyteSource;
import io.airbyte.workers.protocols.airbyte.MessageTracker;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -55,8 +58,7 @@ public class DefaultReplicationWorker implements ReplicationWorker {
private final AirbyteSource source;
private final AirbyteMapper mapper;
private final AirbyteDestination destination;
private final MessageTracker sourceMessageTracker;
private final MessageTracker destinationMessageTracker;
private final MessageTracker messageTracker;

private final ExecutorService executors;
private final AtomicBoolean cancelled;
Expand All @@ -67,15 +69,13 @@ public DefaultReplicationWorker(final String jobId,
final AirbyteSource source,
final AirbyteMapper mapper,
final AirbyteDestination destination,
final MessageTracker sourceMessageTracker,
final MessageTracker destinationMessageTracker) {
final MessageTracker messageTracker) {
this.jobId = jobId;
this.attempt = attempt;
this.source = source;
this.mapper = mapper;
this.destination = destination;
this.sourceMessageTracker = sourceMessageTracker;
this.destinationMessageTracker = destinationMessageTracker;
this.messageTracker = messageTracker;
this.executors = Executors.newFixedThreadPool(2);

this.cancelled = new AtomicBoolean(false);
Expand Down Expand Up @@ -120,11 +120,11 @@ public ReplicationOutput run(final StandardSyncInput syncInput, final Path jobRo
source.start(sourceConfig, jobRoot);

final CompletableFuture<?> destinationOutputThreadFuture = CompletableFuture.runAsync(
getDestinationOutputRunnable(destination, cancelled, destinationMessageTracker, mdc),
getDestinationOutputRunnable(destination, cancelled, messageTracker, mdc),
executors);

final CompletableFuture<?> replicationThreadFuture = CompletableFuture.runAsync(
getReplicationRunnable(source, destination, cancelled, mapper, sourceMessageTracker, mdc),
getReplicationRunnable(source, destination, cancelled, mapper, messageTracker, mdc),
executors);

LOGGER.info("Waiting for source and destination threads to complete.");
Expand Down Expand Up @@ -155,10 +155,45 @@ else if (hasFailed.get()) {
outputStatus = ReplicationStatus.COMPLETED;
}

final SyncStats totalSyncStats = new SyncStats()
.withRecordsEmitted(messageTracker.getTotalRecordsEmitted())
.withBytesEmitted(messageTracker.getTotalBytesEmitted())
.withStateMessagesEmitted(messageTracker.getTotalStateMessagesEmitted());

if (outputStatus == ReplicationStatus.COMPLETED) {
totalSyncStats.setRecordsCommitted(totalSyncStats.getRecordsEmitted());
} else if (messageTracker.getTotalRecordsCommitted().isPresent()) {
totalSyncStats.setRecordsCommitted(messageTracker.getTotalRecordsCommitted().get());
} else {
LOGGER.warn("Could not reliably determine committed record counts, committed record stats will be set to null");
totalSyncStats.setRecordsCommitted(null);
}

// assume every stream with stats is in streamToEmittedRecords map
final List<StreamSyncStats> streamSyncStats = messageTracker.getStreamToEmittedRecords().keySet().stream().map(stream -> {
final SyncStats syncStats = new SyncStats()
.withRecordsEmitted(messageTracker.getStreamToEmittedRecords().get(stream))
.withBytesEmitted(messageTracker.getStreamToEmittedBytes().get(stream))
.withStateMessagesEmitted(null); // TODO (parker) populate per-stream state messages emitted once supported in V2

if (outputStatus == ReplicationStatus.COMPLETED) {
syncStats.setRecordsCommitted(messageTracker.getStreamToEmittedRecords().get(stream));
} else if (messageTracker.getStreamToCommittedRecords().isPresent()) {
syncStats.setRecordsCommitted(messageTracker.getStreamToCommittedRecords().get().get(stream));
} else {
syncStats.setRecordsCommitted(null);
}
return new StreamSyncStats()
.withStreamName(stream)
.withStats(syncStats);
}).collect(Collectors.toList());

final ReplicationAttemptSummary summary = new ReplicationAttemptSummary()
.withStatus(outputStatus)
.withRecordsSynced(sourceMessageTracker.getRecordCount())
.withBytesSynced(sourceMessageTracker.getBytesCount())
.withRecordsSynced(messageTracker.getTotalRecordsEmitted()) // TODO (parker) remove in favor of totalRecordsEmitted
.withBytesSynced(messageTracker.getTotalBytesEmitted()) // TODO (parker) remove in favor of totalBytesEmitted
.withTotalStats(totalSyncStats)
.withStreamStats(streamSyncStats)
.withStartTime(startTime)
.withEndTime(System.currentTimeMillis());

Expand All @@ -168,15 +203,15 @@ else if (hasFailed.get()) {
.withReplicationAttemptSummary(summary)
.withOutputCatalog(destinationConfig.getCatalog());

if (sourceMessageTracker.getOutputState().isPresent()) {
if (messageTracker.getSourceOutputState().isPresent()) {
LOGGER.info("Source output at least one state message");
} else {
LOGGER.info("Source did not output any state messages");
}

if (destinationMessageTracker.getOutputState().isPresent()) {
LOGGER.info("State capture: Updated state to: {}", destinationMessageTracker.getOutputState());
final State state = destinationMessageTracker.getOutputState().get();
if (messageTracker.getDestinationOutputState().isPresent()) {
LOGGER.info("State capture: Updated state to: {}", messageTracker.getDestinationOutputState());
final State state = messageTracker.getDestinationOutputState().get();
output.withState(state);
} else if (syncInput.getState() != null) {
LOGGER.warn("State capture: No new state, falling back on input state: {}", syncInput.getState());
Expand All @@ -196,7 +231,7 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,
final AirbyteDestination destination,
final AtomicBoolean cancelled,
final AirbyteMapper mapper,
final MessageTracker sourceMessageTracker,
final MessageTracker messageTracker,
final Map<String, String> mdc) {
return () -> {
MDC.setContextMap(mdc);
Expand All @@ -208,7 +243,7 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,
if (messageOptional.isPresent()) {
final AirbyteMessage message = mapper.mapMessage(messageOptional.get());

sourceMessageTracker.accept(message);
messageTracker.acceptFromSource(message);
destination.accept(message);
recordsRead += 1;

Expand All @@ -235,7 +270,7 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,

private static Runnable getDestinationOutputRunnable(final AirbyteDestination destination,
final AtomicBoolean cancelled,
final MessageTracker destinationMessageTracker,
final MessageTracker messageTracker,
final Map<String, String> mdc) {
return () -> {
MDC.setContextMap(mdc);
Expand All @@ -245,7 +280,7 @@ private static Runnable getDestinationOutputRunnable(final AirbyteDestination de
final Optional<AirbyteMessage> messageOptional = destination.attemptRead();
if (messageOptional.isPresent()) {
LOGGER.info("state in DefaultReplicationWorker from Destination: {}", messageOptional.get());
destinationMessageTracker.accept(messageOptional.get());
messageTracker.acceptFromDestination(messageOptional.get());
}
}
if (!cancelled.get() && destination.getExitValue() != 0) {
Expand Down
Loading

0 comments on commit bf9e9ca

Please sign in to comment.