Skip to content

Commit

Permalink
Add metadata to segment tracking (airbytehq#8872)
Browse files Browse the repository at this point in the history
* Add metadata to segment tracking

* Add sync start time

* Fix test and format code
  • Loading branch information
ChristopheDuong authored Jan 14, 2022
1 parent 6fc9889 commit dbddd7b
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.segment.analytics.messages.IdentifyMessage;
import com.segment.analytics.messages.TrackMessage;
import io.airbyte.config.StandardWorkspace;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -43,6 +44,7 @@ public class SegmentTrackingClient implements TrackingClient {
private static final String SEGMENT_WRITE_KEY = "7UDdp5K55CyiGgsauOr2pNNujGvmhaeu";
private static final String AIRBYTE_VERSION_KEY = "airbyte_version";
private static final String AIRBYTE_ROLE = "airbyte_role";
private static final String AIRBYTE_TRACKED_AT = "tracked_at";

// Analytics is threadsafe.
private final Analytics analytics;
Expand Down Expand Up @@ -116,6 +118,7 @@ public void track(final UUID workspaceId, final String action, final Map<String,
// Always add these traits.
mapCopy.put(AIRBYTE_VERSION_KEY, trackingIdentity.getAirbyteVersion().serialize());
mapCopy.put(CUSTOMER_ID_KEY, trackingIdentity.getCustomerId());
mapCopy.put(AIRBYTE_TRACKED_AT, Instant.now().toString());
if (!metadata.isEmpty()) {
trackingIdentity.getEmail().ifPresent(email -> mapCopy.put("email", email));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,21 @@
package io.airbyte.analytics;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import com.segment.analytics.Analytics;
import com.segment.analytics.messages.IdentifyMessage;
import com.segment.analytics.messages.TrackMessage;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.Configs;
import io.airbyte.config.Configs.WorkerEnvironment;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -109,7 +112,7 @@ void testTrack() {
final TrackMessage actual = mockBuilder.getValue().build();
assertEquals("jump", actual.event());
assertEquals(IDENTITY.getCustomerId().toString(), actual.userId());
assertEquals(metadata, actual.properties());
assertEquals(metadata, filterTrackedAtProperty(Objects.requireNonNull(actual.properties())));
}

@Test
Expand All @@ -127,7 +130,18 @@ void testTrackWithMetadata() {
final TrackMessage actual = mockBuilder.getValue().build();
assertEquals("jump", actual.event());
assertEquals(IDENTITY.getCustomerId().toString(), actual.userId());
assertEquals(metadata, actual.properties());
assertEquals(metadata, filterTrackedAtProperty(Objects.requireNonNull(actual.properties())));
}

private static ImmutableMap<String, Object> filterTrackedAtProperty(final Map<String, ?> properties) {
assertTrue(properties.containsKey("tracked_at"));
final Builder<String, Object> builder = ImmutableMap.builder();
properties.forEach((key, value) -> {
if (!key.equals("tracked_at")) {
builder.put(key, value);
}
});
return builder.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ public static AzureBlobStorageDestinationConfig getAzureBlobStorageConfig(final
final JsonNode endpointFromConfig = config
.get("azure_blob_storage_endpoint_domain_name");
final JsonNode containerName = config.get("azure_blob_storage_container_name");
final int outputStreamBufferSizeFromConfig =
final int outputStreamBufferSizeFromConfig =
config.get("azure_blob_storage_output_buffer_size") != null
? config.get("azure_blob_storage_output_buffer_size").asInt(DEFAULT_STORAGE_OUTPUT_BUFFER_SIZE)
: DEFAULT_STORAGE_OUTPUT_BUFFER_SIZE;

final JsonNode blobName = config.get("azure_blob_storage_blob_name"); // streamId

final String endpointComputed = String.format(Locale.ROOT, DEFAULT_STORAGE_ENDPOINT_FORMAT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package io.airbyte.integrations.destination.azure_blob_storage.csv;

import com.azure.storage.blob.specialized.AppendBlobClient;
import com.azure.storage.blob.specialized.BlobOutputStream;
import io.airbyte.integrations.destination.azure_blob_storage.AzureBlobStorageDestinationConfig;
import io.airbyte.integrations.destination.azure_blob_storage.writer.AzureBlobStorageWriter;
import io.airbyte.integrations.destination.azure_blob_storage.writer.BaseAzureBlobStorageWriter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package io.airbyte.integrations.destination.azure_blob_storage.jsonl;

import com.azure.storage.blob.specialized.AppendBlobClient;
import com.azure.storage.blob.specialized.BlobOutputStream;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.node.ObjectNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public static ImmutableMap<String, Object> generateDestinationDefinitionMetadata
final Builder<String, Object> metadata = ImmutableMap.builder();
metadata.put("connector_destination", destinationDefinition.getName());
metadata.put("connector_destination_definition_id", destinationDefinition.getDestinationDefinitionId());
metadata.put("connector_destination_docker_repository", destinationDefinition.getDockerRepository());
final String imageTag = destinationDefinition.getDockerImageTag();
if (!Strings.isEmpty(imageTag)) {
metadata.put("connector_destination_version", imageTag);
Expand All @@ -77,6 +78,7 @@ public static ImmutableMap<String, Object> generateSourceDefinitionMetadata(fina
final Builder<String, Object> metadata = ImmutableMap.builder();
metadata.put("connector_source", sourceDefinition.getName());
metadata.put("connector_source_definition_id", sourceDefinition.getSourceDefinitionId());
metadata.put("connector_source_docker_repository", sourceDefinition.getDockerRepository());
final String imageTag = sourceDefinition.getDockerImageTag();
if (!Strings.isEmpty(imageTag)) {
metadata.put("connector_source_version", imageTag);
Expand All @@ -94,6 +96,7 @@ public static ImmutableMap<String, Object> generateJobAttemptMetadata(final Job
final JobOutput jobOutput = lastAttempt.getOutput().get();
if (jobOutput.getSync() != null) {
final StandardSyncSummary syncSummary = jobOutput.getSync().getStandardSyncSummary();
metadata.put("sync_start_time", syncSummary.getStartTime());
metadata.put("duration", Math.round((syncSummary.getEndTime() - syncSummary.getStartTime()) / 1000.0));
metadata.put("volume_mb", syncSummary.getBytesSynced());
metadata.put("volume_rows", syncSummary.getRecordsSynced());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,11 @@ void testFailJob() throws IOException, InterruptedException, JsonValidationExcep
metadata.put("connector_source_definition_id", sourceDefinition.getSourceDefinitionId());
metadata.put("connector_source", "source-test");
metadata.put("connector_source_version", TEST_DOCKER_TAG);
metadata.put("connector_source_docker_repository", sourceDefinition.getDockerRepository());
metadata.put("connector_destination_definition_id", destinationDefinition.getDestinationDefinitionId());
metadata.put("connector_destination", "destination-test");
metadata.put("connector_destination_version", TEST_DOCKER_TAG);
metadata.put("connector_destination_docker_repository", destinationDefinition.getDockerRepository());
metadata.put("notification_type", NotificationType.SLACK);
verify(trackingClient).track(WORKSPACE_ID, JobNotifier.FAILURE_NOTIFICATION, metadata.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ public void testOAuthFullInjectionBecauseNoOAuthSpec() throws JsonValidationExce
.thenReturn(new StandardSourceDefinition()
.withSourceDefinitionId(sourceDefinitionId)
.withName("test")
.withDockerRepository("test/test")
.withDockerImageTag("dev")
.withSpec(null));
setupOAuthParamMocks(oauthParameters);
Expand Down Expand Up @@ -222,6 +223,7 @@ private void setupStandardDefinitionMock(final AdvancedAuth advancedAuth) throws
when(configRepository.getStandardSourceDefinition(any())).thenReturn(new StandardSourceDefinition()
.withSourceDefinitionId(sourceDefinitionId)
.withName("test")
.withDockerRepository("test/test")
.withDockerImageTag("dev")
.withSpec(new ConnectorSpecification().withAdvancedAuth(advancedAuth)));
}
Expand Down Expand Up @@ -277,6 +279,7 @@ private void assertTracking(final UUID workspaceId) {
verify(trackingClient, times(1)).track(workspaceId, "OAuth Injection - Backend", Map.of(
"connector_source", "test",
"connector_source_definition_id", sourceDefinitionId,
"connector_source_docker_repository", "test/test",
"connector_source_version", "dev"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ class JobTrackerTest {
private static final UUID CONNECTION_ID = UUID.randomUUID();
private static final String SOURCE_DEF_NAME = "postgres";
private static final String DESTINATION_DEF_NAME = "bigquery";
public static final String CONNECTOR_VERSION = "test";
private static final String CONNECTOR_REPOSITORY = "test/test";
private static final String CONNECTOR_VERSION = "test";
private static final long SYNC_START_TIME = 1000L;
private static final long SYNC_END_TIME = 10000L;
private static final long SYNC_DURATION = 9L; // in sync between end and start time
Expand All @@ -84,6 +85,7 @@ class JobTrackerTest {
.put("attempt_completion_status", JobState.FAILED)
.build();
private static final ImmutableMap<String, Object> ATTEMPT_METADATA = ImmutableMap.<String, Object>builder()
.put("sync_start_time", SYNC_START_TIME)
.put("duration", SYNC_DURATION)
.put("volume_rows", SYNC_RECORDS_SYNC)
.put("volume_mb", SYNC_BYTES_SYNC)
Expand Down Expand Up @@ -122,13 +124,15 @@ void testTrackCheckConnectionSource() throws ConfigNotFoundException, IOExceptio
.put("attempt_id", 0)
.put("connector_source", SOURCE_DEF_NAME)
.put("connector_source_definition_id", UUID1)
.put("connector_source_docker_repository", CONNECTOR_REPOSITORY)
.put("connector_source_version", CONNECTOR_VERSION)
.build();

when(configRepository.getStandardSourceDefinition(UUID1))
.thenReturn(new StandardSourceDefinition()
.withSourceDefinitionId(UUID1)
.withName(SOURCE_DEF_NAME)
.withDockerRepository(CONNECTOR_REPOSITORY)
.withDockerImageTag(CONNECTOR_VERSION));
when(configRepository.getStandardWorkspace(WORKSPACE_ID, true))
.thenReturn(new StandardWorkspace().withWorkspaceId(WORKSPACE_ID).withName(WORKSPACE_NAME));
Expand All @@ -150,13 +154,15 @@ void testTrackCheckConnectionDestination() throws ConfigNotFoundException, IOExc
.put("attempt_id", 0)
.put("connector_destination", DESTINATION_DEF_NAME)
.put("connector_destination_definition_id", UUID2)
.put("connector_destination_docker_repository", CONNECTOR_REPOSITORY)
.put("connector_destination_version", CONNECTOR_VERSION)
.build();

when(configRepository.getStandardDestinationDefinition(UUID2))
.thenReturn(new StandardDestinationDefinition()
.withDestinationDefinitionId(UUID2)
.withName(DESTINATION_DEF_NAME)
.withDockerRepository(CONNECTOR_REPOSITORY)
.withDockerImageTag(CONNECTOR_VERSION));
when(configRepository.getStandardWorkspace(WORKSPACE_ID, true))
.thenReturn(new StandardWorkspace().withWorkspaceId(WORKSPACE_ID).withName(WORKSPACE_NAME));
Expand All @@ -178,13 +184,15 @@ void testTrackDiscover() throws ConfigNotFoundException, IOException, JsonValida
.put("attempt_id", 0)
.put("connector_source", SOURCE_DEF_NAME)
.put("connector_source_definition_id", UUID1)
.put("connector_source_docker_repository", CONNECTOR_REPOSITORY)
.put("connector_source_version", CONNECTOR_VERSION)
.build();

when(configRepository.getStandardSourceDefinition(UUID1))
.thenReturn(new StandardSourceDefinition()
.withSourceDefinitionId(UUID1)
.withName(SOURCE_DEF_NAME)
.withDockerRepository(CONNECTOR_REPOSITORY)
.withDockerImageTag(CONNECTOR_VERSION));
when(configRepository.getStandardWorkspace(WORKSPACE_ID, true))
.thenReturn(new StandardWorkspace().withWorkspaceId(WORKSPACE_ID).withName(WORKSPACE_NAME));
Expand Down Expand Up @@ -296,22 +304,26 @@ private Job getJobMock(final ConfigType configType, final long jobId) throws Con
.thenReturn(new StandardSourceDefinition()
.withSourceDefinitionId(UUID1)
.withName(SOURCE_DEF_NAME)
.withDockerRepository(CONNECTOR_REPOSITORY)
.withDockerImageTag(CONNECTOR_VERSION));
when(configRepository.getDestinationDefinitionFromConnection(CONNECTION_ID))
.thenReturn(new StandardDestinationDefinition()
.withDestinationDefinitionId(UUID2)
.withName(DESTINATION_DEF_NAME)
.withDockerRepository(CONNECTOR_REPOSITORY)
.withDockerImageTag(CONNECTOR_VERSION));

when(configRepository.getStandardSourceDefinition(UUID1))
.thenReturn(new StandardSourceDefinition()
.withSourceDefinitionId(UUID1)
.withName(SOURCE_DEF_NAME)
.withDockerRepository(CONNECTOR_REPOSITORY)
.withDockerImageTag(CONNECTOR_VERSION));
when(configRepository.getStandardDestinationDefinition(UUID2))
.thenReturn(new StandardDestinationDefinition()
.withDestinationDefinitionId(UUID2)
.withName(DESTINATION_DEF_NAME)
.withDockerRepository(CONNECTOR_REPOSITORY)
.withDockerImageTag(CONNECTOR_VERSION));

final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
Expand Down Expand Up @@ -368,9 +380,11 @@ private ImmutableMap<String, Object> getJobMetadata(final ConfigType configType,
.put("connection_id", CONNECTION_ID)
.put("connector_source", SOURCE_DEF_NAME)
.put("connector_source_definition_id", UUID1)
.put("connector_source_docker_repository", CONNECTOR_REPOSITORY)
.put("connector_source_version", CONNECTOR_VERSION)
.put("connector_destination", DESTINATION_DEF_NAME)
.put("connector_destination_definition_id", UUID2)
.put("connector_destination_docker_repository", CONNECTOR_REPOSITORY)
.put("connector_destination_version", CONNECTOR_VERSION)
.put("namespace_definition", NamespaceDefinitionType.SOURCE)
.put("table_prefix", false)
Expand Down

0 comments on commit dbddd7b

Please sign in to comment.