diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQuerySchemaUtils.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQuerySchemaUtils.scala index ea63d6a0..43180848 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQuerySchemaUtils.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQuerySchemaUtils.scala @@ -60,6 +60,8 @@ object BigQuerySchemaUtils { ddlField.fieldType match { case Type.Struct(nestedFields) => alterTableRequired(tableField.getMessageType, nestedFields.toVector).nonEmpty + case Type.Array(Type.Struct(nestedFields), _) => + alterTableRequired(tableField.getMessageType, nestedFields.toVector).nonEmpty + case _ => false } diff --git a/modules/core/src/test/resources/iglu-client-embedded/schemas/test_vendor/test_name/jsonschema/1-0-0 b/modules/core/src/test/resources/iglu-client-embedded/schemas/test_vendor/test_name/jsonschema/1-0-0 new file mode 100644 index 00000000..2a91f0b7 --- /dev/null +++ b/modules/core/src/test/resources/iglu-client-embedded/schemas/test_vendor/test_name/jsonschema/1-0-0 @@ -0,0 +1,14 @@ +{ + "$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#", + "self": { + "vendor": "test_vendor", + "name": "test_name", + "format": "jsonschema", + "version": "1-0-0" + }, + "properties": { + "myString": {"type": "string"} + }, + "additionalProperties": false, + "type": "object" +} diff --git a/modules/core/src/test/resources/iglu-client-embedded/schemas/test_vendor/test_name/jsonschema/1-0-1 b/modules/core/src/test/resources/iglu-client-embedded/schemas/test_vendor/test_name/jsonschema/1-0-1 new file mode 100644 index 00000000..f21ac304 --- /dev/null +++ b/modules/core/src/test/resources/iglu-client-embedded/schemas/test_vendor/test_name/jsonschema/1-0-1 @@ -0,0 +1,15 @@ +{ + "$schema": 
"http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#", + "self": { + "vendor": "test_vendor", + "name": "test_name", + "format": "jsonschema", + "version": "1-0-1" + }, + "properties": { + "myString": {"type": "string"}, + "myInteger": {"type": "integer"} + }, + "additionalProperties": false, + "type": "object" +} diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/AtomicDescriptor.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/AtomicDescriptor.scala index 957ab5c6..f4514679 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/AtomicDescriptor.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/AtomicDescriptor.scala @@ -32,6 +32,46 @@ object AtomicDescriptor { fromDescriptorProtoBuilder(descriptorProto) } + def withTestUnstruct100: Descriptors.Descriptor = { + val descriptorProto = DescriptorProtos.DescriptorProto.newBuilder + .addField(0, eventId.setNumber(1)) + .addField(1, testUnstruct.setNumber(2)) + .addNestedType(testNestedType100) + fromDescriptorProtoBuilder(descriptorProto) + } + + def withTestUnstruct101: Descriptors.Descriptor = { + val descriptorProto = DescriptorProtos.DescriptorProto.newBuilder + .addField(0, eventId.setNumber(1)) + .addField(1, testUnstruct.setNumber(2)) + .addNestedType(testNestedType101) + fromDescriptorProtoBuilder(descriptorProto) + } + + def withTestContext100: Descriptors.Descriptor = { + val descriptorProto = DescriptorProtos.DescriptorProto.newBuilder + .addField(0, eventId.setNumber(1)) + .addField(1, testContext.setNumber(2)) + .addNestedType(testNestedType100) + fromDescriptorProtoBuilder(descriptorProto) + } + + def withTestContext101: Descriptors.Descriptor = { + val descriptorProto = DescriptorProtos.DescriptorProto.newBuilder + .addField(0, eventId.setNumber(1)) + .addField(1, testContext.setNumber(2)) + .addNestedType(testNestedType101) + 
fromDescriptorProtoBuilder(descriptorProto) + } + + def withAdClickContext: Descriptors.Descriptor = { + val descriptorProto = DescriptorProtos.DescriptorProto.newBuilder + .addField(0, eventId.setNumber(1)) + .addField(1, adClickContext.setNumber(2)) + .addNestedType(adClickEventNestedType) + fromDescriptorProtoBuilder(descriptorProto) + } + /** A table which has been altered to add the ad_click_event unstruct event column */ def withWebPageAndAdClick: Descriptors.Descriptor = { val descriptorProto = DescriptorProtos.DescriptorProto.newBuilder @@ -65,22 +105,60 @@ object AtomicDescriptor { .setTypeName("web_page_1") .setName("unstruct_event_com_snowplowanalytics_snowplow_web_page_1") + private def testContext: DescriptorProtos.FieldDescriptorProto.Builder = DescriptorProtos.FieldDescriptorProto.newBuilder + .setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_REPEATED) + .setTypeName("test_1") + .setName("contexts_test_vendor_test_name_1") + + private def testUnstruct: DescriptorProtos.FieldDescriptorProto.Builder = DescriptorProtos.FieldDescriptorProto.newBuilder + .setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_OPTIONAL) + .setTypeName("test_1") + .setName("unstruct_event_test_vendor_test_name_1") + private def webPageId: DescriptorProtos.FieldDescriptorProto.Builder = DescriptorProtos.FieldDescriptorProto.newBuilder .setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_OPTIONAL) .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING) .setName("id") + private def myString: DescriptorProtos.FieldDescriptorProto.Builder = + DescriptorProtos.FieldDescriptorProto.newBuilder + .setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_OPTIONAL) + .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING) + .setName("my_string") + + private def myInteger: DescriptorProtos.FieldDescriptorProto.Builder = + DescriptorProtos.FieldDescriptorProto.newBuilder + .setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_OPTIONAL) + 
.setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT64) + .setName("my_integer") + private def webPageNestedType: DescriptorProtos.DescriptorProto.Builder = DescriptorProtos.DescriptorProto.newBuilder .addField(0, webPageId.setNumber(1)) .setName("web_page_1") + private def testNestedType100: DescriptorProtos.DescriptorProto.Builder = + DescriptorProtos.DescriptorProto.newBuilder + .addField(0, myString.setNumber(1)) + .setName("test_1") + + private def testNestedType101: DescriptorProtos.DescriptorProto.Builder = + DescriptorProtos.DescriptorProto.newBuilder + .addField(0, myString.setNumber(1)) + .addField(1, myInteger.setNumber(2)) + .setName("test_1") + private def adClickEvent: DescriptorProtos.FieldDescriptorProto.Builder = DescriptorProtos.FieldDescriptorProto.newBuilder .setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_OPTIONAL) .setTypeName("ad_click_event_1") .setName("unstruct_event_com_snowplowanalytics_snowplow_media_ad_click_event_1") + private def adClickContext: DescriptorProtos.FieldDescriptorProto.Builder = DescriptorProtos.FieldDescriptorProto.newBuilder + .setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_REPEATED) + .setTypeName("ad_click_event_1") + .setName("contexts_com_snowplowanalytics_snowplow_media_ad_click_event_1") + private def adClickEventPercentProgress: DescriptorProtos.FieldDescriptorProto.Builder = DescriptorProtos.FieldDescriptorProto.newBuilder .setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_OPTIONAL) diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/ProcessingSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/ProcessingSpec.scala index 16979859..c757d263 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/ProcessingSpec.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/ProcessingSpec.scala @@ -15,6 +15,7 @@ import 
cats.effect.testing.specs2.CatsEffect import cats.effect.testkit.TestControl import io.circe.literal._ import com.google.cloud.bigquery.{Field => BQField, FieldList, StandardSQLTypeName} +import com.snowplowanalytics.iglu.core.{SchemaKey, SelfDescribingData} import java.nio.charset.StandardCharsets import java.nio.ByteBuffer @@ -22,7 +23,7 @@ import java.time.Instant import scala.concurrent.duration.DurationLong import com.snowplowanalytics.snowplow.analytics.scalasdk.Event -import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent.{UnstructEvent, unstructEventDecoder} +import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent.{Contexts, UnstructEvent, unstructEventDecoder} import com.snowplowanalytics.snowplow.bigquery.{AtomicDescriptor, MockEnvironment} import com.snowplowanalytics.snowplow.bigquery.MockEnvironment.{Action, Mocks} import com.snowplowanalytics.snowplow.runtime.HealthProbe @@ -36,7 +37,10 @@ class ProcessingSpec extends Specification with CatsEffect { Insert events to Bigquery and ack the events $e1 Emit BadRows when there are badly formatted events $e2 Write good batches and bad events when input contains both $e3 - Alter the Bigquery table when the writer's protobuf Descriptor has missing columns $e4 + Alter the Bigquery table when the writer's protobuf Descriptor has missing columns - unstruct $e4_1 + Alter the Bigquery table when the writer's protobuf Descriptor has missing columns - contexts $e4_2 + Alter the Bigquery table when the writer's protobuf Descriptor has missing nested fields - unstruct $e4_3 + Alter the Bigquery table when the writer's protobuf Descriptor has missing nested fields - contexts $e4_4 Skip altering the table when the writer's protobuf Descriptor has relevant self-describing entitiy columns $e5 Emit BadRows when the WriterProvider reports a problem with the data $e6 Recover when the WriterProvider reports a server-side schema mismatch $e7 @@ -105,7 +109,7 @@ class ProcessingSpec extends 
Specification with CatsEffect { } } - def e4 = { + def e4_1 = { val unstructEvent: UnstructEvent = json""" { "schema": "iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0", @@ -153,6 +157,147 @@ class ProcessingSpec extends Specification with CatsEffect { } } + def e4_2 = { + val data = json"""{ "percentProgress": 50 }""" + val contexts = Contexts( + List( + SelfDescribingData( + SchemaKey.fromUri("iglu:com.snowplowanalytics.snowplow.media/ad_click_event/jsonschema/1-0-0").toOption.get, + data + ) + ) + ) + + val mocks = Mocks.default.copy( + addColumnsResponse = MockEnvironment.Response.Success( + FieldList.of( + BQField + .newBuilder( + "contexts_com_snowplowanalytics_snowplow_media_ad_click_event_1", + StandardSQLTypeName.STRUCT, + FieldList.of(BQField.of("percent_progress", StandardSQLTypeName.STRING)) + ) + .setMode(BQField.Mode.REPEATED) + .build() + ) + ), + descriptors = List( + AtomicDescriptor.initial, + AtomicDescriptor.initial, + AtomicDescriptor.withAdClickContext + ) + ) + runTest(inputEvents(count = 1, good(contexts = contexts)), mocks) { case (inputs, control) => + for { + _ <- Processing.stream(control.environment).compile.drain + state <- control.state.get + } yield state should beEqualTo( + Vector( + Action.CreatedTable, + Action.OpenedWriter, + Action.AlterTableAddedColumns(Vector("contexts_com_snowplowanalytics_snowplow_media_ad_click_event_1")), + Action.ClosedWriter, + Action.OpenedWriter, + Action.WroteRowsToBigQuery(2), + Action.AddedGoodCountMetric(2), + Action.AddedBadCountMetric(0), + Action.Checkpointed(List(inputs(0).ack)) + ) + ) + } + } + + def e4_3 = { + val data = json"""{ "myInteger": 100 }""" + val unstruct = UnstructEvent( + Some(SelfDescribingData(SchemaKey.fromUri("iglu:test_vendor/test_name/jsonschema/1-0-1").toOption.get, data)) + ) + + val mocks = Mocks.default.copy( + addColumnsResponse = MockEnvironment.Response.Success( + FieldList.of( + BQField.of( + "unstruct_event_test_vendor_test_name_1", + 
StandardSQLTypeName.STRUCT, + FieldList.of( + BQField.of("my_string", StandardSQLTypeName.STRING), + BQField.of("my_integer", StandardSQLTypeName.INT64) + ) + ) + ) + ), + descriptors = List( + AtomicDescriptor.withTestUnstruct100, + AtomicDescriptor.withTestUnstruct100, + AtomicDescriptor.withTestUnstruct101 + ) + ) + runTest(inputEvents(count = 1, good(ue = unstruct)), mocks) { case (inputs, control) => + for { + _ <- Processing.stream(control.environment).compile.drain + state <- control.state.get + } yield state should beEqualTo( + Vector( + Action.CreatedTable, + Action.OpenedWriter, + Action.AlterTableAddedColumns(Vector("unstruct_event_test_vendor_test_name_1")), + Action.ClosedWriter, + Action.OpenedWriter, + Action.WroteRowsToBigQuery(2), + Action.AddedGoodCountMetric(2), + Action.AddedBadCountMetric(0), + Action.Checkpointed(List(inputs(0).ack)) + ) + ) + } + } + + def e4_4 = { + val data = json"""{ "myInteger": 100}""" + val contexts = Contexts(List(SelfDescribingData(SchemaKey.fromUri("iglu:test_vendor/test_name/jsonschema/1-0-1").toOption.get, data))) + + val mocks = Mocks.default.copy( + addColumnsResponse = MockEnvironment.Response.Success( + FieldList.of( + BQField + .newBuilder( + "contexts_test_vendor_test_name_1", + StandardSQLTypeName.STRUCT, + FieldList.of( + BQField.of("my_string", StandardSQLTypeName.STRING), + BQField.of("my_integer", StandardSQLTypeName.INT64) + ) + ) + .setMode(BQField.Mode.REPEATED) + .build() + ) + ), + descriptors = List( + AtomicDescriptor.withTestContext100, + AtomicDescriptor.withTestContext100, + AtomicDescriptor.withTestContext101 + ) + ) + runTest(inputEvents(count = 1, good(contexts = contexts)), mocks) { case (inputs, control) => + for { + _ <- Processing.stream(control.environment).compile.drain + state <- control.state.get + } yield state should beEqualTo( + Vector( + Action.CreatedTable, + Action.OpenedWriter, + Action.AlterTableAddedColumns(Vector("contexts_test_vendor_test_name_1")), + Action.ClosedWriter, 
+ Action.OpenedWriter, + Action.WroteRowsToBigQuery(2), + Action.AddedGoodCountMetric(2), + Action.AddedBadCountMetric(0), + Action.Checkpointed(List(inputs(0).ack)) + ) + ) + } + } + def e5 = { val unstructEvent: UnstructEvent = json""" { @@ -350,14 +495,14 @@ object ProcessingSpec { .compile .toList - def good(ue: UnstructEvent = UnstructEvent(None)): IO[TokenedEvents] = + def good(ue: UnstructEvent = UnstructEvent(None), contexts: Contexts = Contexts(List.empty)): IO[TokenedEvents] = for { ack <- IO.unique eventId1 <- IO.randomUUID eventId2 <- IO.randomUUID collectorTstamp <- IO.realTimeInstant } yield { - val event1 = Event.minimal(eventId1, collectorTstamp, "0.0.0", "0.0.0").copy(unstruct_event = ue) + val event1 = Event.minimal(eventId1, collectorTstamp, "0.0.0", "0.0.0").copy(unstruct_event = ue).copy(contexts = contexts) val event2 = Event.minimal(eventId2, collectorTstamp, "0.0.0", "0.0.0") val serialized = Chunk(event1, event2).map { e => ByteBuffer.wrap(e.toTsv.getBytes(StandardCharsets.UTF_8))