Skip to content

Commit

Permalink
Require alter table when schema is evolved for contexts
Browse files Browse the repository at this point in the history
When the schema evolves, e.g. when a new nested field is added to an entity, the loader should explicitly try to alter the underlying BigQuery schema. This works correctly for self-describing events (unstruct columns), but as it turns out, in the case of contexts the schema is not modified — the alter is skipped, which results in bad data. This commit fixes that problem.
  • Loading branch information
pondzix committed May 9, 2024
1 parent 771f821 commit c1951fe
Show file tree
Hide file tree
Showing 5 changed files with 259 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ object BigQuerySchemaUtils {
ddlField.fieldType match {
case Type.Struct(nestedFields) =>
alterTableRequired(tableField.getMessageType, nestedFields.toVector).nonEmpty
case Type.Array(Type.Struct(nestedFields), _) =>
alterTableRequired(tableField.getMessageType, nestedFields.toVector).nonEmpty
case _ =>
false
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"self": {
"vendor": "test_vendor",
"name": "test_schema",
"format": "jsonschema",
"version": "1-0-0"
},
"properties": {
"myString": {"type": "string"}
},
"additionalProperties": false,
"type": "object"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"self": {
"vendor": "test_vendor",
"name": "test_schema",
"format": "jsonschema",
"version": "1-0-0"
},
"properties": {
"myString": {"type": "string"},
"myInteger": {"type": "integer"}
},
"additionalProperties": false,
"type": "object"
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,46 @@ object AtomicDescriptor {
fromDescriptorProtoBuilder(descriptorProto)
}

/** Descriptor with event_id plus the test unstruct column in its 1-0-0 shape (my_string only). */
def withTestUnstruct100: Descriptors.Descriptor =
  fromDescriptorProtoBuilder(
    DescriptorProtos.DescriptorProto.newBuilder
      .addField(0, eventId.setNumber(1))
      .addField(1, testUnstruct.setNumber(2))
      .addNestedType(testNestedType100)
  )

/** Descriptor with event_id plus the test unstruct column in its 1-0-1 shape (my_string and my_integer). */
def withTestUnstruct101: Descriptors.Descriptor =
  fromDescriptorProtoBuilder(
    DescriptorProtos.DescriptorProto.newBuilder
      .addField(0, eventId.setNumber(1))
      .addField(1, testUnstruct.setNumber(2))
      .addNestedType(testNestedType101)
  )

/** Descriptor with event_id plus the repeated test contexts column in its 1-0-0 shape (my_string only). */
def withTestContext100: Descriptors.Descriptor =
  fromDescriptorProtoBuilder(
    DescriptorProtos.DescriptorProto.newBuilder
      .addField(0, eventId.setNumber(1))
      .addField(1, testContext.setNumber(2))
      .addNestedType(testNestedType100)
  )

/** Descriptor with event_id plus the repeated test contexts column in its 1-0-1 shape (my_string and my_integer). */
def withTestContext101: Descriptors.Descriptor =
  fromDescriptorProtoBuilder(
    DescriptorProtos.DescriptorProto.newBuilder
      .addField(0, eventId.setNumber(1))
      .addField(1, testContext.setNumber(2))
      .addNestedType(testNestedType101)
  )

/** Descriptor with event_id plus the repeated ad_click_event contexts column. */
def withAdClickContext: Descriptors.Descriptor =
  fromDescriptorProtoBuilder(
    DescriptorProtos.DescriptorProto.newBuilder
      .addField(0, eventId.setNumber(1))
      .addField(1, adClickContext.setNumber(2))
      .addNestedType(adClickEventNestedType)
  )

/** A table which has been altered to add the ad_click_event unstruct event column */
def withWebPageAndAdClick: Descriptors.Descriptor = {
val descriptorProto = DescriptorProtos.DescriptorProto.newBuilder
Expand Down Expand Up @@ -65,22 +105,60 @@ object AtomicDescriptor {
.setTypeName("web_page_1")
.setName("unstruct_event_com_snowplowanalytics_snowplow_web_page_1")

/** REPEATED field for the test contexts column, typed by the `test_1` nested message. */
private def testContext: DescriptorProtos.FieldDescriptorProto.Builder =
  DescriptorProtos.FieldDescriptorProto.newBuilder
    .setName("contexts_test_vendor_test_name_1")
    .setTypeName("test_1")
    .setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_REPEATED)

/** OPTIONAL field for the test unstruct column, typed by the `test_1` nested message. */
private def testUnstruct: DescriptorProtos.FieldDescriptorProto.Builder =
  DescriptorProtos.FieldDescriptorProto.newBuilder
    .setName("unstruct_event_test_vendor_test_name_1")
    .setTypeName("test_1")
    .setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_OPTIONAL)

/** Optional STRING field `id`, used inside the web_page nested type. */
private def webPageId: DescriptorProtos.FieldDescriptorProto.Builder =
  DescriptorProtos.FieldDescriptorProto.newBuilder
    .setName("id")
    .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING)
    .setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_OPTIONAL)

/** Optional STRING field `my_string` — present in both 1-0-0 and 1-0-1 test schemas. */
private def myString: DescriptorProtos.FieldDescriptorProto.Builder =
  DescriptorProtos.FieldDescriptorProto.newBuilder
    .setName("my_string")
    .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING)
    .setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_OPTIONAL)

/** Optional INT64 field `my_integer` — only added by the 1-0-1 test schema. */
private def myInteger: DescriptorProtos.FieldDescriptorProto.Builder =
  DescriptorProtos.FieldDescriptorProto.newBuilder
    .setName("my_integer")
    .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT64)
    .setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_OPTIONAL)

/** Nested message type `web_page_1`, containing only the `id` field. */
private def webPageNestedType: DescriptorProtos.DescriptorProto.Builder =
  DescriptorProtos.DescriptorProto.newBuilder
    .setName("web_page_1")
    .addField(0, webPageId.setNumber(1))

/** Nested message type `test_1` matching schema 1-0-0: my_string only. */
private def testNestedType100: DescriptorProtos.DescriptorProto.Builder =
  DescriptorProtos.DescriptorProto.newBuilder
    .setName("test_1")
    .addField(0, myString.setNumber(1))

/** Nested message type `test_1` matching schema 1-0-1: my_string plus my_integer. */
private def testNestedType101: DescriptorProtos.DescriptorProto.Builder =
  DescriptorProtos.DescriptorProto.newBuilder
    .setName("test_1")
    .addField(0, myString.setNumber(1))
    .addField(1, myInteger.setNumber(2))

/** OPTIONAL field for the ad_click_event unstruct column, typed by `ad_click_event_1`. */
private def adClickEvent: DescriptorProtos.FieldDescriptorProto.Builder =
  DescriptorProtos.FieldDescriptorProto.newBuilder
    .setName("unstruct_event_com_snowplowanalytics_snowplow_media_ad_click_event_1")
    .setTypeName("ad_click_event_1")
    .setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_OPTIONAL)

/** REPEATED field for the ad_click_event contexts column, typed by `ad_click_event_1`. */
private def adClickContext: DescriptorProtos.FieldDescriptorProto.Builder =
  DescriptorProtos.FieldDescriptorProto.newBuilder
    .setName("contexts_com_snowplowanalytics_snowplow_media_ad_click_event_1")
    .setTypeName("ad_click_event_1")
    .setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_REPEATED)

private def adClickEventPercentProgress: DescriptorProtos.FieldDescriptorProto.Builder =
DescriptorProtos.FieldDescriptorProto.newBuilder
.setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_OPTIONAL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ import cats.effect.testing.specs2.CatsEffect
import cats.effect.testkit.TestControl
import io.circe.literal._
import com.google.cloud.bigquery.{Field => BQField, FieldList, StandardSQLTypeName}
import com.snowplowanalytics.iglu.core.{SchemaKey, SelfDescribingData}

import java.nio.charset.StandardCharsets
import java.nio.ByteBuffer
import java.time.Instant
import scala.concurrent.duration.DurationLong

import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent.{UnstructEvent, unstructEventDecoder}
import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent.{Contexts, UnstructEvent, unstructEventDecoder}
import com.snowplowanalytics.snowplow.bigquery.{AtomicDescriptor, MockEnvironment}
import com.snowplowanalytics.snowplow.bigquery.MockEnvironment.{Action, Mocks}
import com.snowplowanalytics.snowplow.runtime.HealthProbe
Expand All @@ -36,7 +37,10 @@ class ProcessingSpec extends Specification with CatsEffect {
Insert events to Bigquery and ack the events $e1
Emit BadRows when there are badly formatted events $e2
Write good batches and bad events when input contains both $e3
Alter the Bigquery table when the writer's protobuf Descriptor has missing columns $e4
Alter the Bigquery table when the writer's protobuf Descriptor has missing columns - unstruct $e4_1
Alter the Bigquery table when the writer's protobuf Descriptor has missing columns - contexts $e4_2
Alter the Bigquery table when the writer's protobuf Descriptor has missing nested fields - unstruct $e4_3
Alter the Bigquery table when the writer's protobuf Descriptor has missing nested fields - contexts $e4_4
Skip altering the table when the writer's protobuf Descriptor has relevant self-describing entitiy columns $e5
Emit BadRows when the WriterProvider reports a problem with the data $e6
Recover when the WriterProvider reports a server-side schema mismatch $e7
Expand Down Expand Up @@ -105,7 +109,7 @@ class ProcessingSpec extends Specification with CatsEffect {
}
}

def e4 = {
def e4_1 = {
val unstructEvent: UnstructEvent = json"""
{
"schema": "iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0",
Expand Down Expand Up @@ -153,6 +157,147 @@ class ProcessingSpec extends Specification with CatsEffect {
}
}

/** Column-level evolution for a contexts column: the writer's descriptor starts without the
  * ad_click_event contexts column, so the loader must ALTER TABLE to add it, reopen the
  * writer against the evolved descriptor, and then write the batch.
  */
def e4_2 = {
  val adClickKey =
    SchemaKey.fromUri("iglu:com.snowplowanalytics.snowplow.media/ad_click_event/jsonschema/1-0-0").toOption.get
  val contexts = Contexts(List(SelfDescribingData(adClickKey, json"""{ "percentProgress": 50 }""")))

  // Column the mocked BigQuery reports back after the ALTER adds the contexts column.
  val addedColumn = BQField
    .newBuilder(
      "contexts_com_snowplowanalytics_snowplow_media_ad_click_event_1",
      StandardSQLTypeName.STRUCT,
      FieldList.of(BQField.of("percent_progress", StandardSQLTypeName.STRING))
    )
    .setMode(BQField.Mode.REPEATED)
    .build()

  val mocks = Mocks.default.copy(
    addColumnsResponse = MockEnvironment.Response.Success(FieldList.of(addedColumn)),
    // Descriptor sequence: stale, stale, then evolved — presumably one per writer open,
    // matching the Opened/Closed/Opened actions asserted below.
    descriptors = List(
      AtomicDescriptor.initial,
      AtomicDescriptor.initial,
      AtomicDescriptor.withAdClickContext
    )
  )

  runTest(inputEvents(count = 1, good(contexts = contexts)), mocks) { case (inputs, control) =>
    for {
      _ <- Processing.stream(control.environment).compile.drain
      recorded <- control.state.get
    } yield recorded should beEqualTo(
      Vector(
        Action.CreatedTable,
        Action.OpenedWriter,
        Action.AlterTableAddedColumns(Vector("contexts_com_snowplowanalytics_snowplow_media_ad_click_event_1")),
        Action.ClosedWriter,
        Action.OpenedWriter,
        Action.WroteRowsToBigQuery(2),
        Action.AddedGoodCountMetric(2),
        Action.AddedBadCountMetric(0),
        Action.Checkpointed(List(inputs(0).ack))
      )
    )
  }
}

/** Nested-field evolution for an unstruct column: the writer's descriptor only knows the
  * 1-0-0 shape (my_string) while the event carries my_integer from 1-0-1, so the loader
  * must ALTER TABLE, reopen the writer, and then write the batch.
  */
def e4_3 = {
  val unstruct = UnstructEvent(
    Some(
      SelfDescribingData(
        SchemaKey.fromUri("iglu:test_vendor/test_name/jsonschema/1-0-1").toOption.get,
        json"""{ "myInteger": 100 }"""
      )
    )
  )

  // Evolved column shape returned by the mocked BigQuery after the ALTER.
  val evolvedColumn = BQField.of(
    "unstruct_event_test_vendor_test_name_1",
    StandardSQLTypeName.STRUCT,
    FieldList.of(
      BQField.of("my_string", StandardSQLTypeName.STRING),
      BQField.of("my_integer", StandardSQLTypeName.INT64)
    )
  )

  val mocks = Mocks.default.copy(
    addColumnsResponse = MockEnvironment.Response.Success(FieldList.of(evolvedColumn)),
    // Descriptor sequence: 1-0-0, 1-0-0, then 1-0-1 — presumably one per writer open,
    // matching the Opened/Closed/Opened actions asserted below.
    descriptors = List(
      AtomicDescriptor.withTestUnstruct100,
      AtomicDescriptor.withTestUnstruct100,
      AtomicDescriptor.withTestUnstruct101
    )
  )

  runTest(inputEvents(count = 1, good(ue = unstruct)), mocks) { case (inputs, control) =>
    for {
      _ <- Processing.stream(control.environment).compile.drain
      recorded <- control.state.get
    } yield recorded should beEqualTo(
      Vector(
        Action.CreatedTable,
        Action.OpenedWriter,
        Action.AlterTableAddedColumns(Vector("unstruct_event_test_vendor_test_name_1")),
        Action.ClosedWriter,
        Action.OpenedWriter,
        Action.WroteRowsToBigQuery(2),
        Action.AddedGoodCountMetric(2),
        Action.AddedBadCountMetric(0),
        Action.Checkpointed(List(inputs(0).ack))
      )
    )
  }
}

/** Nested-field evolution for a contexts column — the scenario this commit fixes: the
  * writer's descriptor only knows the 1-0-0 context shape (my_string) while the event
  * carries my_integer from 1-0-1, so the loader must ALTER TABLE, reopen the writer,
  * and then write the batch.
  */
def e4_4 = {
  val contexts = Contexts(
    List(
      SelfDescribingData(
        SchemaKey.fromUri("iglu:test_vendor/test_name/jsonschema/1-0-1").toOption.get,
        json"""{ "myInteger": 100}"""
      )
    )
  )

  // Evolved repeated column shape returned by the mocked BigQuery after the ALTER.
  val evolvedColumn = BQField
    .newBuilder(
      "contexts_test_vendor_test_name_1",
      StandardSQLTypeName.STRUCT,
      FieldList.of(
        BQField.of("my_string", StandardSQLTypeName.STRING),
        BQField.of("my_integer", StandardSQLTypeName.INT64)
      )
    )
    .setMode(BQField.Mode.REPEATED)
    .build()

  val mocks = Mocks.default.copy(
    addColumnsResponse = MockEnvironment.Response.Success(FieldList.of(evolvedColumn)),
    // Descriptor sequence: 1-0-0, 1-0-0, then 1-0-1 — presumably one per writer open,
    // matching the Opened/Closed/Opened actions asserted below.
    descriptors = List(
      AtomicDescriptor.withTestContext100,
      AtomicDescriptor.withTestContext100,
      AtomicDescriptor.withTestContext101
    )
  )

  runTest(inputEvents(count = 1, good(contexts = contexts)), mocks) { case (inputs, control) =>
    for {
      _ <- Processing.stream(control.environment).compile.drain
      recorded <- control.state.get
    } yield recorded should beEqualTo(
      Vector(
        Action.CreatedTable,
        Action.OpenedWriter,
        Action.AlterTableAddedColumns(Vector("contexts_test_vendor_test_name_1")),
        Action.ClosedWriter,
        Action.OpenedWriter,
        Action.WroteRowsToBigQuery(2),
        Action.AddedGoodCountMetric(2),
        Action.AddedBadCountMetric(0),
        Action.Checkpointed(List(inputs(0).ack))
      )
    )
  }
}

def e5 = {
val unstructEvent: UnstructEvent = json"""
{
Expand Down Expand Up @@ -350,14 +495,14 @@ object ProcessingSpec {
.compile
.toList

def good(ue: UnstructEvent = UnstructEvent(None)): IO[TokenedEvents] =
def good(ue: UnstructEvent = UnstructEvent(None), contexts: Contexts = Contexts(List.empty)): IO[TokenedEvents] =
for {
ack <- IO.unique
eventId1 <- IO.randomUUID
eventId2 <- IO.randomUUID
collectorTstamp <- IO.realTimeInstant
} yield {
val event1 = Event.minimal(eventId1, collectorTstamp, "0.0.0", "0.0.0").copy(unstruct_event = ue)
val event1 = Event.minimal(eventId1, collectorTstamp, "0.0.0", "0.0.0").copy(unstruct_event = ue).copy(contexts = contexts)
val event2 = Event.minimal(eventId2, collectorTstamp, "0.0.0", "0.0.0")
val serialized = Chunk(event1, event2).map { e =>
ByteBuffer.wrap(e.toTsv.getBytes(StandardCharsets.UTF_8))
Expand Down

0 comments on commit c1951fe

Please sign in to comment.