Bump common-streams to 0.6.0
istreeter committed Apr 20, 2024
1 parent 8ae991c commit fd9d253
Showing 5 changed files with 16 additions and 13 deletions.
@@ -8,6 +8,7 @@
*/
package com.snowplowanalytics.snowplow.bigquery.processing

+ import cats.data.NonEmptyVector
import io.circe.Json

import java.time.{Instant, LocalDate}
@@ -32,11 +33,11 @@ private[processing] object BigQueryCaster extends Caster[AnyRef] {
new java.math.BigDecimal(unscaled.bigInteger, details.scale)
override def timestampValue(v: Instant): java.lang.Long = Long.box(v.toEpochMilli * 1000) // Microseconds
override def dateValue(v: LocalDate): java.lang.Long = Long.box(v.toEpochDay)
- override def arrayValue(vs: List[AnyRef]): JSONArray =
+ override def arrayValue(vs: Vector[AnyRef]): JSONArray =
// BigQuery does not permit nulls in a repeated field
new JSONArray(vs.filterNot(_ == null).asJava)
- override def structValue(vs: List[Caster.NamedValue[AnyRef]]): JSONObject = {
- val map = vs
+ override def structValue(vs: NonEmptyVector[Caster.NamedValue[AnyRef]]): JSONObject = {
+ val map = vs.iterator
.map { case Caster.NamedValue(k, v) =>
(k, v)
}
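Aside (not part of the commit): the new structValue parameter type comes from cats. Below is a minimal standalone sketch of the NonEmptyVector operations the updated signature relies on; the field names are made up for illustration.

import cats.data.NonEmptyVector

object NonEmptyVectorSketch extends App {
  // A struct row always has at least one named value; NonEmptyVector encodes that in the type.
  val fields: NonEmptyVector[(String, AnyRef)] =
    NonEmptyVector.of("app_id" -> "redirect", "platform" -> "web")

  // `.iterator` mirrors the diff above: traverse once, e.g. to build a Map or a JSONObject.
  val asMap: Map[String, AnyRef] = fields.iterator.toMap

  // Going from an ordinary Vector back to a NonEmptyVector forces the empty case to be handled.
  val maybeFields: Option[NonEmptyVector[(String, AnyRef)]] =
    NonEmptyVector.fromVector(Vector("event" -> "page_view"))

  println(asMap)
  println(maybeFields.map(_.toVector))
}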
@@ -16,7 +16,7 @@ import scala.jdk.CollectionConverters._

object BigQuerySchemaUtils {

- def alterTableRequired(tableDescriptor: Descriptors.Descriptor, ddlFields: Seq[Field]): Seq[Field] =
+ def alterTableRequired(tableDescriptor: Descriptors.Descriptor, ddlFields: Vector[Field]): Vector[Field] =
ddlFields.filter { field =>
Option(tableDescriptor.findFieldByName(field.name)) match {
case Some(fieldDescriptor) =>
@@ -32,15 +32,15 @@ object BigQuerySchemaUtils {
case Descriptors.FieldDescriptor.Type.MESSAGE =>
ddlField.fieldType match {
case Type.Struct(nestedFields) =>
- alterTableRequired(tableField.getMessageType, nestedFields).nonEmpty
+ alterTableRequired(tableField.getMessageType, nestedFields.toVector).nonEmpty
case _ =>
false
}
case _ =>
false
}

- def mergeInColumns(bqFields: FieldList, ddlFields: Seq[Field]): FieldList = {
+ def mergeInColumns(bqFields: FieldList, ddlFields: Vector[Field]): FieldList = {
val ddlFieldsByName = ddlFields.map(f => f.name -> f).toMap
val bqFieldNames = bqFields.asScala.map(f => f.getName).toSet
val alteredExisting = bqFields.asScala.map { bqField =>
@@ -65,7 +65,7 @@ object BigQuerySchemaUtils {
Option(bqField.getSubFields) match {
case Some(bqNestedFields) =>
bqField.toBuilder
- .setType(StandardSQLTypeName.STRUCT, mergeInColumns(bqNestedFields, ddlNestedFields))
+ .setType(StandardSQLTypeName.STRUCT, mergeInColumns(bqNestedFields, ddlNestedFields.toVector))
.build
case None =>
bqField
@@ -86,7 +86,7 @@
.setMode(BQField.Mode.REPEATED)
.build
case Type.Struct(nestedFields) =>
- val nested = FieldList.of(nestedFields.map(bqFieldOf).asJava)
+ val nested = FieldList.of(nestedFields.map(bqFieldOf).toVector.asJava)
BQField
.newBuilder(ddlField.name, StandardSQLTypeName.STRUCT, nested)
.setMode(bqModeOf(ddlField.nullability))
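Aside (not part of the commit): the `.toVector` calls above suggest the DDL field collections coming out of common-streams 0.6.0 are NonEmptyVectors. A minimal conversion sketch under that assumption; the Field case class here is a stand-in, not the real schema-ddl type.

import cats.data.NonEmptyVector
import scala.jdk.CollectionConverters._

object FieldConversionSketch extends App {
  // Stand-in for schema-ddl's Field; the real type also carries a field type and nullability.
  final case class Field(name: String)

  val nestedFields: NonEmptyVector[Field] = NonEmptyVector.of(Field("a"), Field("b"))

  // Same shape as the bqFieldOf change above: map over the NonEmptyVector, drop to a Vector,
  // then to a java.util.List for the BigQuery builder API.
  val javaNames: java.util.List[String] = nestedFields.map(_.name).toVector.asJava

  println(javaNames) // [a, b]
}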
@@ -109,8 +109,10 @@ class ProcessingSpec extends Specification with CatsEffect {
{
"schema": "iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0",
"data": {
"schema": "iglu:com.snowplowanalytics.snowplow.media/ad_start_event/jsonschema/1-0-0",
"data": {}
"schema": "iglu:com.snowplowanalytics.snowplow.media/ad_click_event/jsonschema/1-0-0",
"data": {
"percentProgress": 50
}
}
}
""".as[UnstructEvent].fold(throw _, identity)
@@ -122,7 +124,7 @@ class ProcessingSpec extends Specification with CatsEffect {
Vector(
Action.CreatedTable,
Action.OpenedWriter,
- Action.AlterTableAddedColumns(Vector("unstruct_event_com_snowplowanalytics_snowplow_media_ad_start_event_1")),
+ Action.AlterTableAddedColumns(Vector("unstruct_event_com_snowplowanalytics_snowplow_media_ad_click_event_1")),
Action.ClosedWriter,
Action.OpenedWriter,
Action.WroteRowsToBigQuery(2),
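Aside (not part of the commit): a rough sketch of how the expected column name in the test above relates to the schema key. This is illustrative only; the real naming logic also snake-cases camelCase vendors and names.

object ColumnNameSketch extends App {
  // Column for a self-describing event: prefix, vendor with dots replaced, name, major model version.
  def unstructColumnName(vendor: String, name: String, model: Int): String =
    s"unstruct_event_${vendor.replace('.', '_')}_${name}_$model"

  // iglu:com.snowplowanalytics.snowplow.media/ad_click_event/jsonschema/1-0-0
  println(unstructColumnName("com.snowplowanalytics.snowplow.media", "ad_click_event", 1))
  // unstruct_event_com_snowplowanalytics_snowplow_media_ad_click_event_1
}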
2 changes: 1 addition & 1 deletion project/BuildSettings.scala
@@ -44,7 +44,7 @@ object BuildSettings {
Test / igluUris := Seq(
// Iglu schemas used in unit tests
"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-0",
"iglu:com.snowplowanalytics.snowplow.media/ad_start_event/jsonschema/1-0-0"
"iglu:com.snowplowanalytics.snowplow.media/ad_click_event/jsonschema/1-0-0"
)
) ++ commonSettings

2 changes: 1 addition & 1 deletion project/Dependencies.scala
@@ -30,7 +30,7 @@ object Dependencies {
val bigquery = "2.34.2"

// Snowplow
- val streams = "0.5.2"
+ val streams = "0.6.0-M1"
val igluClient = "3.1.0"

// tests
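Aside (not part of the commit): a hypothetical sketch of how a bumped version constant like this is typically consumed elsewhere in Dependencies.scala. The organisation and artifact names are placeholders, not the repository's actual common-streams module IDs.

import sbt._

object DependenciesSketch {
  object V {
    val streams = "0.6.0-M1"
  }

  // Placeholder coordinates only; check the real Dependencies.scala for the actual modules.
  val streamsCore: ModuleID = "com.example.snowplow" %% "streams-core-example" % V.streams
}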