Commit 22b767d
legacy columns - amendments 1
istreeter committed Jul 8, 2024
1 parent 8465a17 commit 22b767d
Showing 3 changed files with 114 additions and 18 deletions.
@@ -31,22 +31,75 @@ import com.snowplowanalytics.snowplow.badrows.{
Processor => BadRowProcessor
}

/**
* The methods needed to support the legacy column style of the v1 loader
*
* These methods are largely copied from the `loaders-common` module of common-streams, but adapted
* for the bigquery types in schema-ddl
*
* If we ever end support for legacy columns, then everything in this file can be deleted.
*/
object LegacyColumns {

/**
* The analog of NonAtomicFields.Result from common-streams, but adapted for the legacy bigquery
* types in schema-ddl
*
* @param fields
* field type information about each self-describing entity to be loaded
* @param igluFailures
* details of schemas that were present in the batch but could not be looked up by the Iglu
* resolver.
*/
case class Result(fields: Vector[FieldForEntity], igluFailures: List[ColumnFailure])

/**
* Field type information for a self-describing entity to be loaded
*
* @param field
* The schema-ddl Field describing the entity to be loaded. Note this is a v2 Field from the
* parquet package of schema-ddl, so it must be converted from the legacy v1 Field.
* @param key
* The Iglu schema key for this entity
* @param entityType
* Whether this is for an unstruct_event or a context
*/
case class FieldForEntity(
field: V2Field,
key: SchemaKey,
entityType: TabledEntity.EntityType
)
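
To make the shape concrete, here is a sketch of one such value for a hypothetical context entity. The vendor, name, and column contents are invented; it assumes the `V2Field` alias points at schema-ddl's parquet `Field` (with a matching `V2Type` alias for its `Type`), and that `TabledEntity.Context` is the context member of the common-streams `EntityType` ADT:

  import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}

  // Sketch only: invented values, v2-style column name (major-version suffix)
  val exampleField = FieldForEntity(
    field = V2Field(
      "contexts_com_acme_product_view_1",
      V2Type.Json,
      V2Type.Nullability.Nullable
    ),
    key        = SchemaKey("com.acme", "product_view", "jsonschema", SchemaVer.Full(1, 0, 0)),
    entityType = TabledEntity.Context
  )

A batch that resolved just this entity with no lookup failures would then be `Result(Vector(exampleField), Nil)`.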

/**
* Describes a failure to look up an Iglu schema
*
* @param schemaKey
* The Iglu schema key of the failure
* @param entityType
* Whether the lookup was done for an unstruct_event or a context
* @param failure
* Why the lookup failed
*/
case class ColumnFailure(
schemaKey: SchemaKey,
entityType: TabledEntity.EntityType,
failure: FailureDetails.LoaderIgluError
)
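
A caller will usually fan these failures out into logs or bad rows. A minimal sketch of the reporting side (the `println` is illustrative only; `toSchemaUri` is iglu-core's standard rendering of a `SchemaKey`):

  def logFailures(result: Result): Unit =
    result.igluFailures.foreach { f =>
      println(s"Iglu lookup failed for ${f.schemaKey.toSchemaUri} (${f.entityType}): ${f.failure}")
    }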

/**
* The analog of `NonAtomicFields.resolveTypes` from common-streams, but adapted for the legacy
* bigquery types in schema-ddl
*
* @param resolver
* The Iglu resolver
* @param entities
* The self-describing entities in this batch for which we need to load data
* @return
* The typed fields for the entities to be loaded. These are v2 fields (parquet fields).
*
* This method works by deriving the legacy schema-ddl fields and then converting to the
* equivalent v2 (parquet) schema-ddl field.
*/
def resolveTypes[F[_]: Sync: RegistryLookup](
resolver: Resolver[F],
entities: Map[TabledEntity, Set[SchemaSubVersion]]
@@ -77,6 +130,21 @@ object LegacyColumns {
Result(good, failures.toList)
}
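
The collapsed body above reduces, per entity, to the two-step recipe in the scaladoc: derive a legacy field, then convert it. A sketch of that step, assuming schema-ddl's legacy `Field.build(name, topSchema, required)` behind the `LegacyField` alias; `legacyColumnName` is a hypothetical stand-in for the v1-style column naming:

  private def deriveOneField(
    topSchema: Schema,                 // the resolved Iglu schema (schema-ddl jsonschema type)
    key: SchemaKey,
    entityType: TabledEntity.EntityType
  ): FieldForEntity = {
    val name   = legacyColumnName(entityType, key)       // hypothetical helper, e.g. ..._1_0_0
    val legacy = LegacyField.build(name, topSchema, required = false)
    FieldForEntity(legacyFieldToV2Field(legacy), key, entityType)
  }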

/**
* The analog of `Transform.transformEvent` from common-streams
*
* @param processor
* Details about this loader, only used for generating a bad row
* @param event
* The Snowplow event received from the stream
* @param batchInfo
* Pre-calculated Iglu types for a batch of events, used to guide the transformation
* @param loadTstamp
* The timestamp to use for the `load_tstamp` column
* @return
* If event entities are valid, then returns a Map that is compatible with the BigQuery Write
* client. If event entities are invalid then returns a bad row.
*/
def transformEvent(
processor: BadRowProcessor,
event: Event,
@@ -91,6 +159,12 @@
}.toEither
} yield atomic ++ nonAtomic + ("load_tstamp" -> loadTstamp)
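
For orientation, a successful transform yields a column-name-keyed `Map`. It might be shaped like the sketch below (all keys and values invented; note the legacy `_1_0_0` suffix, since this file implements the v1 naming):

  val illustrativeRow: Map[String, AnyRef] = Map(
    "event_id"     -> "ba553b7f-63d5-47ad-8697-06016b472c34",
    "app_id"       -> "my-app",
    "contexts_com_acme_product_view_1_0_0" -> java.util.Collections.singletonMap("sku", "abc-123"),
    "load_tstamp"  -> java.lang.Long.valueOf(1720396800000L)
  )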

/**
* Convert from the legacy Field of the old v1 loader into the new style Field.
*
* This is needed because this loader only deals with v2 Fields, but we want to use the legacy
* methods from schema-ddl, which only return legacy Fields.
*/
private def legacyFieldToV2Field(legacyField: LegacyField): V2Field = {
val fieldType = legacyTypeToV2Type(legacyField.fieldType)
legacyField.mode match {
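      // The remaining cases are collapsed in this diff. A plausible continuation --
      // a sketch, not the committed code -- assuming the legacy Mode ADT
      // (Nullable / Required / Repeated) and that the v2 parquet Type models
      // repetition with Array:
      //
      //   case Mode.Nullable => V2Field(legacyField.name, fieldType, V2Type.Nullability.Nullable)
      //   case Mode.Required => V2Field(legacyField.name, fieldType, V2Type.Nullability.Required)
      //   case Mode.Repeated =>
      //     V2Field(legacyField.name, V2Type.Array(fieldType, V2Type.Nullability.Required), V2Type.Nullability.Required)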
@@ -57,10 +57,16 @@ object MockEnvironment {
* Input events to send into the environment.
* @param mocks
* Responses we want the `Writer` to return when someone uses the mocked services
* @param legacyColumns
* Whether to use the legacy column style of BigQuery Loader version 1
* @return
* An environment and a Ref that records the actions made by the environment
*/
-  def build(inputs: List[TokenedEvents], mocks: Mocks): Resource[IO, MockEnvironment] =
+  def build(
+    inputs: List[TokenedEvents],
+    mocks: Mocks,
+    legacyColumns: Boolean
+  ): Resource[IO, MockEnvironment] =
for {
state <- Resource.eval(Ref[IO].of(Vector.empty[Action]))
writerResource <- Resource.eval(testWriter(state, mocks.writerResponses, mocks.descriptors))
@@ -86,7 +92,7 @@
),
badRowMaxSize = 1000000,
schemasToSkip = List.empty,
-      legacyColumns = false
+      legacyColumns = legacyColumns
)
MockEnvironment(state, env)
}
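
A sketch of how a test drives the new flag end to end (mirroring the `runTest` helper in `ProcessingSpec` below; `events` stands for any prepared input):

  def exampleUse(events: List[TokenedEvents]): IO[Vector[Action]] =
    MockEnvironment.build(events, Mocks.default, legacyColumns = true).use { control =>
      for {
        _     <- Processing.stream(control.environment).compile.drain
        state <- control.state.get
      } yield state
    }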
@@ -37,10 +37,12 @@ class ProcessingSpec extends Specification with CatsEffect {
Insert events to Bigquery and ack the events $e1
Emit BadRows when there are badly formatted events $e2
Write good batches and bad events when input contains both $e3
-    Alter the Bigquery table when the writer's protobuf Descriptor has missing columns - unstruct $e4_1
-    Alter the Bigquery table when the writer's protobuf Descriptor has missing columns - contexts $e4_2
-    Alter the Bigquery table when the writer's protobuf Descriptor has missing nested fields - unstruct $e4_3
-    Alter the Bigquery table when the writer's protobuf Descriptor has missing nested fields - contexts $e4_4
+    Alter the Bigquery table when the writer's protobuf Descriptor has missing columns - unstruct $alter1
+    Alter the Bigquery table when the writer's protobuf Descriptor has missing columns - unstruct with legacy columns $alter1_legacy
+    Alter the Bigquery table when the writer's protobuf Descriptor has missing columns - contexts $alter2
+    Alter the Bigquery table when the writer's protobuf Descriptor has missing columns - contexts with legacy columns $alter2_legacy
+    Alter the Bigquery table when the writer's protobuf Descriptor has missing nested fields - unstruct $alter3
+    Alter the Bigquery table when the writer's protobuf Descriptor has missing nested fields - contexts $alter4
Skip altering the table when the writer's protobuf Descriptor has relevant self-describing entity columns $e5
Emit BadRows when the WriterProvider reports a problem with the data $e6
Recover when the WriterProvider reports a server-side schema mismatch $e7
@@ -109,7 +111,7 @@
}
}

-  def e4_1 = {
+  def alter1_base(legacyColumns: Boolean) = {
val unstructEvent: UnstructEvent = json"""
{
"schema": "iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0",
@@ -121,11 +123,14 @@
}
}
""".as[UnstructEvent].fold(throw _, identity)
val expectedColumnName =
if (legacyColumns) "unstruct_event_com_snowplowanalytics_snowplow_media_ad_click_event_1_0_0"
else "unstruct_event_com_snowplowanalytics_snowplow_media_ad_click_event_1"
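    // The two spellings differ only in the version suffix: the v1 legacy style pins
    // the full schema version (_1_0_0), while the v2 style tracks the major version only (_1).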
val mocks = Mocks.default.copy(
addColumnsResponse = MockEnvironment.Response.Success(
FieldList.of(
BQField.of(
"unstruct_event_com_snowplowanalytics_snowplow_media_ad_click_event_1",
expectedColumnName,
StandardSQLTypeName.STRUCT,
FieldList.of(BQField.of("percent_progress", StandardSQLTypeName.STRING))
)
@@ -137,15 +142,16 @@
AtomicDescriptor.withWebPageAndAdClick
)
)
-    runTest(inputEvents(count = 1, good(unstructEvent)), mocks) { case (inputs, control) =>
+
+    runTest(inputEvents(count = 1, good(unstructEvent)), mocks, legacyColumns) { case (inputs, control) =>
for {
_ <- Processing.stream(control.environment).compile.drain
state <- control.state.get
} yield state should beEqualTo(
Vector(
Action.CreatedTable,
Action.OpenedWriter,
Action.AlterTableAddedColumns(Vector("unstruct_event_com_snowplowanalytics_snowplow_media_ad_click_event_1")),
Action.AlterTableAddedColumns(Vector(expectedColumnName)),
Action.ClosedWriter,
Action.OpenedWriter,
Action.WroteRowsToBigQuery(2),
@@ -157,7 +163,10 @@
}
}

-  def e4_2 = {
+  def alter1 = alter1_base(legacyColumns = false)
+  def alter1_legacy = alter1_base(legacyColumns = true)
+
+  def alter2_base(legacyColumns: Boolean) = {
val data = json"""{ "percentProgress": 50 }"""
val contexts = Contexts(
List(
@@ -167,13 +176,16 @@
)
)
)
val expectedColumnName =
if (legacyColumns) "contexts_com_snowplowanalytics_snowplow_media_ad_click_event_1_0_0"
else "contexts_com_snowplowanalytics_snowplow_media_ad_click_event_1"

val mocks = Mocks.default.copy(
addColumnsResponse = MockEnvironment.Response.Success(
FieldList.of(
BQField
.newBuilder(
"contexts_com_snowplowanalytics_snowplow_media_ad_click_event_1",
expectedColumnName,
StandardSQLTypeName.STRUCT,
FieldList.of(BQField.of("percent_progress", StandardSQLTypeName.STRING))
)
@@ -187,15 +199,15 @@
AtomicDescriptor.withAdClickContext
)
)
-    runTest(inputEvents(count = 1, good(contexts = contexts)), mocks) { case (inputs, control) =>
+    runTest(inputEvents(count = 1, good(contexts = contexts)), mocks, legacyColumns) { case (inputs, control) =>
for {
_ <- Processing.stream(control.environment).compile.drain
state <- control.state.get
} yield state should beEqualTo(
Vector(
Action.CreatedTable,
Action.OpenedWriter,
Action.AlterTableAddedColumns(Vector("contexts_com_snowplowanalytics_snowplow_media_ad_click_event_1")),
Action.AlterTableAddedColumns(Vector(expectedColumnName)),
Action.ClosedWriter,
Action.OpenedWriter,
Action.WroteRowsToBigQuery(2),
@@ -207,7 +219,10 @@
}
}

-  def e4_3 = {
+  def alter2 = alter2_base(legacyColumns = false)
+  def alter2_legacy = alter2_base(legacyColumns = true)
+
+  def alter3 = {
val data = json"""{ "myInteger": 100 }"""
val unstruct = UnstructEvent(
Some(SelfDescribingData(SchemaKey.fromUri("iglu:test_vendor/test_name/jsonschema/1-0-1").toOption.get, data))
@@ -252,7 +267,7 @@
}
}

-  def e4_4 = {
+  def alter4 = {
val data = json"""{ "myInteger": 100}"""
val contexts = Contexts(List(SelfDescribingData(SchemaKey.fromUri("iglu:test_vendor/test_name/jsonschema/1-0-1").toOption.get, data)))

@@ -477,12 +492,13 @@ object ProcessingSpec {

def runTest[A](
toInputs: IO[List[TokenedEvents]],
-    mocks: Mocks = Mocks.default
+    mocks: Mocks = Mocks.default,
+    legacyColumns: Boolean = false
)(
f: (List[TokenedEvents], MockEnvironment) => IO[A]
): IO[A] =
toInputs.flatMap { inputs =>
-      MockEnvironment.build(inputs, mocks).use { control =>
+      MockEnvironment.build(inputs, mocks, legacyColumns).use { control =>
f(inputs, control)
}
}
