From a3708ba0b27f0d354b257338ce875a15814572b6 Mon Sep 17 00:00:00 2001 From: kellen Date: Tue, 17 Sep 2024 15:33:48 -0400 Subject: [PATCH] Add support for Beam schemas (#1027) --- .github/workflows/ci.yml | 4 +- .../magnolify/avro/logical/package.scala | 98 ++----- .../main/scala/magnolify/beam/RowType.scala | 277 ++++++++++++++++++ .../magnolify/beam/logical/package.scala | 149 ++++++++++ .../scala/magnolify/beam/unsafe/package.scala | 24 ++ .../scala/magnolify/beam/RowTypeSuite.scala | 201 +++++++++++++ build.sbt | 47 ++- .../test/scala/magnolify/cats/TestEq.scala | 12 +- docs/beam.md | 62 ++++ docs/index.md | 2 + docs/mapping.md | 71 ++--- .../magnolify/parquet/logical/TimeTypes.scala | 28 ++ .../magnolify/parquet/logical/package.scala | 70 ++--- .../magnolify/parquet/ParquetTypeSuite.scala | 42 ++- .../magnolify/scalacheck/TestArbitrary.scala | 48 +-- .../main/scala/magnolify/shared/Time.scala | 155 ++++++++++ .../magnolify/shared/TimeArbitrary.scala | 63 ++++ .../scala/magnolify/shared/TimeSpec.scala | 141 +++++++++ test/src/test/scala/magnolify/test/ADT.scala | 3 + 19 files changed, 1283 insertions(+), 214 deletions(-) create mode 100644 beam/src/main/scala/magnolify/beam/RowType.scala create mode 100644 beam/src/main/scala/magnolify/beam/logical/package.scala create mode 100644 beam/src/main/scala/magnolify/beam/unsafe/package.scala create mode 100644 beam/src/test/scala/magnolify/beam/RowTypeSuite.scala create mode 100644 docs/beam.md create mode 100644 parquet/src/main/scala/magnolify/parquet/logical/TimeTypes.scala create mode 100644 shared/src/main/scala/magnolify/shared/Time.scala create mode 100644 shared/src/test/scala/magnolify/shared/TimeArbitrary.scala create mode 100644 shared/src/test/scala/magnolify/shared/TimeSpec.scala diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index cc701ceb5..d667a91b0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -99,11 +99,11 @@ jobs: - name: Make target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: mkdir -p bom/target refined/target shared/target tensorflow/target parquet/target tools/target protobuf/target jmh/target bigquery/target avro/target scalacheck/target datastore/target neo4j/target cats/target bigtable/target guava/target project/target + run: mkdir -p bom/target refined/target shared/target tensorflow/target parquet/target tools/target protobuf/target jmh/target bigquery/target avro/target scalacheck/target beam/target datastore/target neo4j/target cats/target bigtable/target guava/target project/target - name: Compress target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: tar cf targets.tar bom/target refined/target shared/target tensorflow/target parquet/target tools/target protobuf/target jmh/target bigquery/target avro/target scalacheck/target datastore/target neo4j/target cats/target bigtable/target guava/target project/target + run: tar cf targets.tar bom/target refined/target shared/target tensorflow/target parquet/target tools/target protobuf/target jmh/target bigquery/target avro/target scalacheck/target beam/target datastore/target neo4j/target cats/target bigtable/target guava/target project/target - name: Upload target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') diff --git a/avro/src/main/scala/magnolify/avro/logical/package.scala b/avro/src/main/scala/magnolify/avro/logical/package.scala index a6d0d7473..bd0d070c1 100644 --- a/avro/src/main/scala/magnolify/avro/logical/package.scala +++ b/avro/src/main/scala/magnolify/avro/logical/package.scala @@ -22,109 +22,63 @@ import org.joda.{time => joda} import java.time._ import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder} -import java.util.concurrent.TimeUnit package object logical { + import magnolify.shared.Time._ // Duplicate implementation from org.apache.avro.data.TimeConversions // to support both 1.8 (joda-time based) and 1.9+ (java-time based) object micros { - private def toTimestampMicros(microsFromEpoch: Long): Instant = { - val epochSeconds = microsFromEpoch / 1000000L - val nanoAdjustment = (microsFromEpoch % 1000000L) * 1000L; - Instant.ofEpochSecond(epochSeconds, nanoAdjustment) - } - - private def fromTimestampMicros(instant: Instant): Long = { - val seconds = instant.getEpochSecond - val nanos = instant.getNano - if (seconds < 0 && nanos > 0) { - val micros = Math.multiplyExact(seconds + 1, 1000000L) - val adjustment = (nanos / 1000L) - 1000000 - Math.addExact(micros, adjustment) - } else { - val micros = Math.multiplyExact(seconds, 1000000L) - Math.addExact(micros, nanos / 1000L) - } - } - implicit val afTimestampMicros: AvroField[Instant] = - AvroField.logicalType[Long](LogicalTypes.timestampMicros())(toTimestampMicros)( - fromTimestampMicros + AvroField.logicalType[Long](LogicalTypes.timestampMicros())(microsToInstant)( + microsFromInstant ) implicit val afTimeMicros: AvroField[LocalTime] = - AvroField.logicalType[Long](LogicalTypes.timeMicros()) { us => - LocalTime.ofNanoOfDay(TimeUnit.MICROSECONDS.toNanos(us)) - } { time => - TimeUnit.NANOSECONDS.toMicros(time.toNanoOfDay) - } + AvroField.logicalType[Long](LogicalTypes.timeMicros())(microsToLocalTime)(microsFromLocalTime) // `LogicalTypes.localTimestampMicros()` is Avro 1.10 implicit val afLocalTimestampMicros: AvroField[LocalDateTime] = - AvroField.logicalType[Long](new LogicalType("local-timestamp-micros")) { microsFromEpoch => - val instant = toTimestampMicros(microsFromEpoch) - LocalDateTime.ofInstant(instant, ZoneOffset.UTC) - } { timestamp => - val instant = timestamp.toInstant(ZoneOffset.UTC) - fromTimestampMicros(instant) - } + AvroField.logicalType[Long](new LogicalType("local-timestamp-micros"))(microsToLocalDateTime)( + microsFromLocalDateTime + ) // avro 1.8 uses joda-time implicit val afJodaTimestampMicros: AvroField[joda.DateTime] = - AvroField.logicalType[Long](LogicalTypes.timestampMicros()) { microsFromEpoch => - new joda.DateTime(microsFromEpoch / 1000, joda.DateTimeZone.UTC) - } { timestamp => - 1000 * timestamp.getMillis - } + AvroField.logicalType[Long](LogicalTypes.timestampMicros())(microsToJodaDateTime)( + microsFromJodaDateTime + ) implicit val afJodaTimeMicros: AvroField[joda.LocalTime] = - AvroField.logicalType[Long](LogicalTypes.timeMicros()) { microsFromMidnight => - joda.LocalTime.fromMillisOfDay(microsFromMidnight / 1000) - } { time => - // from LossyTimeMicrosConversion - 1000L * time.millisOfDay().get() - } + AvroField.logicalType[Long](LogicalTypes.timeMicros())(microsToJodaLocalTime)( + microsFromJodaLocalTime + ) } object millis { implicit val afTimestampMillis: AvroField[Instant] = - AvroField.logicalType[Long](LogicalTypes.timestampMillis()) { millisFromEpoch => - Instant.ofEpochMilli(millisFromEpoch) - } { timestamp => - timestamp.toEpochMilli - } + AvroField.logicalType[Long](LogicalTypes.timestampMillis())(millisToInstant)( + millisFromInstant + ) implicit val afTimeMillis: AvroField[LocalTime] = - AvroField.logicalType[Int](LogicalTypes.timeMillis()) { millisFromMidnight => - LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos(millisFromMidnight.toLong)) - } { time => - TimeUnit.NANOSECONDS.toMillis(time.toNanoOfDay).toInt - } + AvroField.logicalType[Int](LogicalTypes.timeMillis())(millisToLocalTime)(millisFromLocalTime) // `LogicalTypes.localTimestampMillis` is Avro 1.10.0+ implicit val afLocalTimestampMillis: AvroField[LocalDateTime] = - AvroField.logicalType[Long](new LogicalType("local-timestamp-millis")) { millisFromEpoch => - val instant = Instant.ofEpochMilli(millisFromEpoch) - LocalDateTime.ofInstant(instant, ZoneOffset.UTC) - } { timestamp => - val instant = timestamp.toInstant(ZoneOffset.UTC) - instant.toEpochMilli - } + AvroField.logicalType[Long](new LogicalType("local-timestamp-millis"))(millisToLocalDateTime)( + millisFromLocalDateTime + ) // avro 1.8 uses joda-time implicit val afJodaTimestampMillis: AvroField[joda.DateTime] = - AvroField.logicalType[Long](LogicalTypes.timestampMillis()) { millisFromEpoch => - new joda.DateTime(millisFromEpoch, joda.DateTimeZone.UTC) - } { timestamp => - timestamp.getMillis - } + AvroField.logicalType[Long](LogicalTypes.timestampMillis())(millisToJodaDateTime)( + millisFromJodaDateTime + ) implicit val afJodaTimeMillis: AvroField[joda.LocalTime] = - AvroField.logicalType[Int](LogicalTypes.timeMillis()) { millisFromMidnight => - joda.LocalTime.fromMillisOfDay(millisFromMidnight.toLong) - } { time => - time.millisOfDay().get() - } + AvroField.logicalType[Int](LogicalTypes.timeMillis())(millisToJodaLocalTime)( + millisFromJodaLocalTime + ) } object bigquery { diff --git a/beam/src/main/scala/magnolify/beam/RowType.scala b/beam/src/main/scala/magnolify/beam/RowType.scala new file mode 100644 index 000000000..94479eedc --- /dev/null +++ b/beam/src/main/scala/magnolify/beam/RowType.scala @@ -0,0 +1,277 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package magnolify.beam + +import magnolia1.* +import magnolify.shared.* +import org.apache.beam.sdk.schemas.Schema +import org.apache.beam.sdk.schemas.Schema.{Field, FieldType} +import org.apache.beam.sdk.values.Row +import com.google.protobuf.ByteString +import magnolify.shims.FactoryCompat +import org.apache.beam.sdk.schemas.logicaltypes + +import java.nio.ByteBuffer +import java.util as ju +import scala.collection.compat.* +import scala.collection.concurrent +import scala.jdk.CollectionConverters.* + +// https://beam.apache.org/documentation/programming-guide/#schema-definition +sealed trait RowType[T] extends Converter[T, Row, Row] { + def schema: Schema + def apply(r: Row): T = from(r) + def apply(t: T): Row = to(t) +} + +object RowType { + implicit def apply[T: RowField]: RowType[T] = RowType[T](CaseMapper.identity) + + def apply[T](cm: CaseMapper)(implicit f: RowField[T]): RowType[T] = { + f match { + case r: RowField.Record[_] => + val mappedSchema = r.schema(cm) // fail fast on bad annotations + new RowType[T] { + private val caseMapper: CaseMapper = cm + override lazy val schema: Schema = mappedSchema + + override def from(v: Row): T = r.from(v)(caseMapper) + override def to(v: T): Row = r.to(v)(caseMapper) + } + case _ => + throw new IllegalArgumentException( + s"RowType can only be created from Record. Got $f" + ) + } + } +} + +sealed trait RowField[T] extends Serializable { + type FromT + type ToT + def fieldType(cm: CaseMapper): FieldType + def from(v: FromT)(cm: CaseMapper): T + def to(v: T)(cm: CaseMapper): ToT + def fromAny(v: Any)(cm: CaseMapper): T = from(v.asInstanceOf[FromT])(cm) +} + +object RowField { + sealed trait Aux[T, From, To] extends RowField[T] { + override type FromT = From + override type ToT = To + } + + private[magnolify] def aux[T, From, To]( + ft: CaseMapper => FieldType + )(fromFn: From => T)(toFn: T => To): RowField[T] = + new Aux[T, From, To] { + override def fieldType(cm: CaseMapper): FieldType = ft(cm) + override def from(v: FromT)(cm: CaseMapper): T = fromFn(v) + override def to(v: T)(cm: CaseMapper): ToT = toFn(v) + } + + private[magnolify] def id[T](ft: CaseMapper => FieldType): RowField[T] = + aux[T, T, T](ft)(identity)(identity) + + def from[T]: FromWord[T] = new FromWord[T] + + class FromWord[T] { + def apply[U](f: T => U)(g: U => T)(implicit rf: RowField[T]): RowField[U] = + new Aux[U, rf.FromT, rf.ToT] { + override def fieldType(cm: CaseMapper): FieldType = rf.fieldType(cm) + override def from(v: FromT)(cm: CaseMapper): U = f(rf.from(v)(cm)) + override def to(v: U)(cm: CaseMapper): ToT = rf.to(g(v))(cm) + } + } + + sealed trait Record[T] extends Aux[T, Row, Row] { + @transient private lazy val schemaCache: concurrent.Map[ju.UUID, Schema] = + concurrent.TrieMap.empty + protected def buildSchema(cm: CaseMapper): Schema + def schema(cm: CaseMapper): Schema = schemaCache.getOrElseUpdate(cm.uuid, buildSchema(cm)) + } + + // //////////////////////////////////////////////// + + type Typeclass[T] = RowField[T] + implicit def gen[T]: RowField[T] = macro Magnolia.gen[T] + + def split[T]( + sealedTrait: SealedTrait[Typeclass, T] + )(implicit r: shapeless.Refute[EnumType[T]]): RowField[T] = + new RowField[T] { + override type FromT = logicaltypes.OneOfType.Value + override type ToT = logicaltypes.OneOfType.Value + + private def enumName(sub: Subtype[Typeclass, T]): String = + s"${sub.typeName.owner}.${sub.typeName.short}" + + @transient private lazy val beamOneOfTypeCache + : concurrent.Map[ju.UUID, logicaltypes.OneOfType] = concurrent.TrieMap.empty + private def beamOneOfType(cm: CaseMapper): logicaltypes.OneOfType = + beamOneOfTypeCache.getOrElseUpdate( + cm.uuid, + logicaltypes.OneOfType.create( + sealedTrait.subtypes.map { sub => + Field.of(enumName(sub), sub.typeclass.fieldType(cm)) + }.asJava + ) + ) + + override def fieldType(cm: CaseMapper): FieldType = + FieldType.logicalType(beamOneOfType(cm)) + def from(v: logicaltypes.OneOfType.Value)(cm: CaseMapper): T = { + val idx = v.getCaseType.getValue + sealedTrait.subtypes.find(_.index == idx) match { + case None => throw new IllegalArgumentException(s"OneOf index not found: [$idx]") + case Some(sub) => sub.typeclass.fromAny(v.getValue)(cm) + } + } + + def to(v: T)(cm: CaseMapper): logicaltypes.OneOfType.Value = + sealedTrait.split(v)(sub => + beamOneOfType(cm).createValue(enumName(sub), sub.typeclass.to(sub.cast(v))(cm)) + ) + } + + def join[T](caseClass: CaseClass[Typeclass, T]): RowField[T] = { + if (caseClass.isValueClass) { + val p = caseClass.parameters.head + val tc = p.typeclass + new RowField[T] { + override type FromT = tc.FromT + override type ToT = tc.ToT + override def fieldType(cm: CaseMapper): FieldType = tc.fieldType(cm) + override def from(v: FromT)(cm: CaseMapper): T = caseClass.construct(_ => tc.fromAny(v)(cm)) + override def to(v: T)(cm: CaseMapper): ToT = tc.to(p.dereference(v))(cm) + } + } else { + new Record[T] { + override def fieldType(cm: CaseMapper): FieldType = FieldType.row(schema(cm)) + + override protected def buildSchema(cm: CaseMapper): Schema = + caseClass.parameters + .foldLeft(Schema.builder()) { case (s, p) => + s.addField(cm.map(p.label), p.typeclass.fieldType(cm)) + } + .build() + + override def from(v: Row)(cm: CaseMapper): T = + caseClass.construct(p => p.typeclass.fromAny(v.getValue[Any](p.index))(cm)) + + override def to(v: T)(cm: CaseMapper): Row = { + val values = caseClass.parameters.map { p => + p.typeclass.to(p.dereference(v))(cm).asInstanceOf[Object] + } + Row.withSchema(schema(cm)).addValues(values.asJava).build() + } + } + } + } + + // BYTE An 8-bit signed value + implicit val rfByte: RowField[Byte] = id[Byte](_ => FieldType.BYTE) + // INT16 A 16-bit signed value + implicit val rfShort: RowField[Short] = id[Short](_ => FieldType.INT16) + implicit val rfChar: RowField[Char] = from[Short](_.toChar)(_.toShort) + // INT32 A 32-bit signed value + implicit val rfInt: RowField[Int] = id[Int](_ => FieldType.INT32) + // INT64 A 64-bit signed value + implicit val rfLong: RowField[Long] = id[Long](_ => FieldType.INT64) + // FLOAT A 32-bit IEEE 754 floating point number + implicit val rfFloat: RowField[Float] = id[Float](_ => FieldType.FLOAT) + // DOUBLE A 64-bit IEEE 754 floating point number + implicit val rfDouble: RowField[Double] = id[Double](_ => FieldType.DOUBLE) + // STRING A string + implicit val rfString: RowField[String] = id[String](_ => FieldType.STRING) + // BOOLEAN A boolean value + implicit val rfBoolean: RowField[Boolean] = id[Boolean](_ => FieldType.BOOLEAN) + // BYTES A raw byte array + implicit val rfByteArray: RowField[Array[Byte]] = id[Array[Byte]](_ => FieldType.BYTES) + implicit val rfByteBuffer: RowField[ByteBuffer] = + from[Array[Byte]](x => ByteBuffer.wrap(x))(_.array()) + implicit val rfByteString: RowField[ByteString] = + from[Array[Byte]](x => ByteString.copyFrom(x))(_.toByteArray) + // DECIMAL An arbitrary-precision decimal type + implicit val rfDecimal: RowField[BigDecimal] = + aux[BigDecimal, java.math.BigDecimal, java.math.BigDecimal](_ => FieldType.DECIMAL)( + BigDecimal.apply + )(_.bigDecimal) + + implicit val rfUUID: RowField[ju.UUID] = + id[ju.UUID](_ => FieldType.logicalType(new logicaltypes.UuidLogicalType)) + + implicit def rfEnum[T](implicit et: EnumType[T], lp: shapeless.LowPriority): RowField[T] = + new RowField[T] { + type FromT = logicaltypes.EnumerationType.Value + type ToT = logicaltypes.EnumerationType.Value + + @transient private lazy val enumTypeCache: concurrent.Map[ju.UUID, EnumType[T]] = + concurrent.TrieMap.empty + @transient private lazy val beamEnumTypeCache + : concurrent.Map[ju.UUID, logicaltypes.EnumerationType] = + concurrent.TrieMap.empty + + private def enumType(cm: CaseMapper): EnumType[T] = + enumTypeCache.getOrElseUpdate(cm.uuid, et.map(cm)) + private def beamEnumType(cm: CaseMapper): logicaltypes.EnumerationType = + beamEnumTypeCache.getOrElseUpdate( + cm.uuid, + logicaltypes.EnumerationType.create(enumType(cm).values.asJava) + ) + override def fieldType(cm: CaseMapper): FieldType = FieldType.logicalType(beamEnumType(cm)) + override def to(v: T)(cm: CaseMapper): ToT = beamEnumType(cm).valueOf(enumType(cm).to(v)) + override def from(v: FromT)(cm: CaseMapper): T = + enumType(cm).from(beamEnumType(cm).toString(v)) + } + + implicit def rfMap[K, V](implicit rfK: RowField[K], rfV: RowField[V]): RowField[Map[K, V]] = + new Aux[Map[K, V], ju.Map[rfK.FromT, rfV.FromT], ju.Map[rfK.ToT, rfV.ToT]] { + override def fieldType(cm: CaseMapper): FieldType = + FieldType.map(rfK.fieldType(cm), rfV.fieldType(cm)) + override def from(v: ju.Map[rfK.FromT, rfV.FromT])(cm: CaseMapper): Map[K, V] = + v.asScala.map { case (k, v) => rfK.from(k)(cm) -> rfV.from(v)(cm) }.toMap + override def to(v: Map[K, V])(cm: CaseMapper): ju.Map[rfK.ToT, rfV.ToT] = + v.map { case (k, v) => rfK.to(k)(cm) -> rfV.to(v)(cm) }.asJava + } + + implicit def rfIterable[T, C[_]](implicit + f: RowField[T], + ti: C[T] => Iterable[T], + fc: FactoryCompat[T, C[T]] + ): RowField[C[T]] = { + new Aux[C[T], ju.List[f.FromT], ju.List[f.ToT]] { + override def from(v: ju.List[f.FromT])(cm: CaseMapper): C[T] = + fc.fromSpecific(v.asScala.iterator.map(p => f.from(p)(cm))) + override def to(v: C[T])(cm: CaseMapper): ju.List[f.ToT] = + v.iterator.map(f.to(_)(cm)).toList.asJava + override def fieldType(cm: CaseMapper): FieldType = FieldType.iterable(f.fieldType(cm)) + } + } + + implicit def rfOption[T](implicit f: RowField[T]): RowField[Option[T]] = { + new Aux[Option[T], f.FromT, f.ToT] { + override def from(v: f.FromT)(cm: CaseMapper): Option[T] = + if (v == null) None else Some(f.from(v)(cm)) + override def to(v: Option[T])(cm: CaseMapper): f.ToT = v match { + case None => null.asInstanceOf[f.ToT] + case Some(x) => f.to(x)(cm) + } + override def fieldType(cm: CaseMapper): FieldType = f.fieldType(cm).withNullable(true) + } + } +} diff --git a/beam/src/main/scala/magnolify/beam/logical/package.scala b/beam/src/main/scala/magnolify/beam/logical/package.scala new file mode 100644 index 000000000..6e97c3356 --- /dev/null +++ b/beam/src/main/scala/magnolify/beam/logical/package.scala @@ -0,0 +1,149 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package magnolify.beam + +import org.apache.beam.sdk.schemas.logicaltypes +import org.apache.beam.sdk.schemas.Schema.FieldType +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes +import org.joda.time as joda +import org.joda.time.chrono.ISOChronology + +import java.time as jt +import java.time.temporal.ChronoField + +package object logical { + import magnolify.shared.Time._ + + object date { + implicit val rfLocalDate: RowField[jt.LocalDate] = + RowField.id[jt.LocalDate](_ => FieldType.logicalType(new logicaltypes.Date)) + private lazy val EpochJodaDate = new joda.LocalDate(1970, 1, 1) + implicit val rfJodaLocalDate: RowField[joda.LocalDate] = + RowField.from[jt.LocalDate](jtld => + EpochJodaDate.plusDays(jtld.getLong(ChronoField.EPOCH_DAY).toInt) + )(d => jt.LocalDate.ofEpochDay(joda.Days.daysBetween(EpochJodaDate, d).getDays.toLong)) + } + + object millis { + implicit lazy val rfInstantMillis: RowField[jt.Instant] = + RowField.from[joda.Instant](i => millisToInstant(millisFromJodaInstant(i)))(i => + millisToJodaInstant(millisFromInstant(i)) + ) + implicit val rfJodaInstantMillis: RowField[joda.Instant] = + RowField.id[joda.Instant](_ => FieldType.DATETIME) + implicit val rfJodaDateTimeMillis: RowField[joda.DateTime] = + RowField.from[joda.Instant](_.toDateTime(ISOChronology.getInstanceUTC))(_.toInstant) + + implicit val rfLocalTimeMillis: RowField[jt.LocalTime] = + RowField.from[Int](millisToLocalTime)(millisFromLocalTime) + implicit val rfJodaLocalTimeMillis: RowField[joda.LocalTime] = + RowField.from[Int](millisToJodaLocalTime)(millisFromJodaLocalTime) + + implicit val rfLocalDateTimeMillis: RowField[jt.LocalDateTime] = + RowField.id[jt.LocalDateTime](_ => FieldType.logicalType(new logicaltypes.DateTime())) + implicit val rfJodaLocalDateTimeMillis: RowField[joda.LocalDateTime] = + RowField.from[jt.LocalDateTime](ldt => + millisToJodaLocalDateTime(millisFromLocalDateTime(ldt)) + )(ldt => millisToLocalDateTime(millisFromJodaLocalDateTime(ldt))) + + implicit val rfDurationMillis: RowField[jt.Duration] = + RowField.from[Long](millisToDuration)(millisFromDuration) + implicit val rfJodaDurationMillis: RowField[joda.Duration] = + RowField.from[Long](millisToJodaDuration)(millisFromJodaDuration) + } + + object micros { + // NOTE: logicaltypes.MicrosInstant() cannot be used as it throws assertion + // errors when greater-than-microsecond precision data is used + implicit val rfInstantMicros: RowField[jt.Instant] = + RowField.from[Long](microsToInstant)(microsFromInstant) + // joda.Instant has millisecond precision, excess precision discarded + implicit val rfJodaInstantMicros: RowField[joda.Instant] = + RowField.from[Long](microsToJodaInstant)(microsFromJodaInstant) + // joda.DateTime only has millisecond resolution, so excess precision is discarded + implicit val rfJodaDateTimeMicros: RowField[joda.DateTime] = + RowField.from[Long](microsToJodaDateTime)(microsFromJodaDateTime) + + implicit val rfLocalTimeMicros: RowField[jt.LocalTime] = + RowField.from[Long](microsToLocalTime)(microsFromLocalTime) + // joda.LocalTime only has millisecond resolution, so excess precision is discarded + implicit val rfJodaLocalTimeMicros: RowField[joda.LocalTime] = + RowField.from[Long](microsToJodaLocalTime)(microsFromJodaLocalTime) + + implicit val rfLocalDateTimeMicros: RowField[jt.LocalDateTime] = + RowField.from[Long](microsToLocalDateTime)(microsFromLocalDateTime) + // joda.LocalDateTime has millisecond precision, excess precision discarded + implicit val rfJodaLocalDateTimeMicros: RowField[joda.LocalDateTime] = + RowField.from[Long](microsToJodaLocalDateTime)(microsFromJodaLocalDateTime) + + implicit val rfDurationMicros: RowField[jt.Duration] = + RowField.from[Long](microsToDuration)(microsFromDuration) + // joda.Duration has millisecond precision, excess precision discarded + implicit val rfJodaDurationMicros: RowField[joda.Duration] = + RowField.from[Long](microsToJodaDuration)(microsFromJodaDuration) + } + + object nanos { + implicit val rfInstantNanos: RowField[jt.Instant] = + RowField.id[jt.Instant](_ => FieldType.logicalType(new logicaltypes.NanosInstant())) + // joda.Instant has millisecond precision, excess precision discarded + implicit val rfJodaInstantNanos: RowField[joda.Instant] = + RowField.from[jt.Instant](i => nanosToJodaInstant(nanosFromInstant(i)))(i => + nanosToInstant(nanosFromJodaInstant(i)) + ) + // joda.DateTime only has millisecond resolution + implicit val rfJodaDateTimeNanos: RowField[joda.DateTime] = + RowField.from[jt.Instant](i => nanosToJodaDateTime(nanosFromInstant(i)))(i => + nanosToInstant(nanosFromJodaDateTime(i)) + ) + + implicit val rfLocalTimeNanos: RowField[jt.LocalTime] = + RowField.id[jt.LocalTime](_ => FieldType.logicalType(new logicaltypes.Time())) + // joda.LocalTime only has millisecond resolution, so excess precision is discarded + implicit val rfJodaLocalTimeNanos: RowField[joda.LocalTime] = + RowField.from[jt.LocalTime](lt => nanosToJodaLocalTime(nanosFromLocalTime(lt)))(lt => + nanosToLocalTime(nanosFromJodaLocalTime(lt)) + ) + + implicit val rfLocalDateTimeNanos: RowField[jt.LocalDateTime] = + RowField.from[Long](nanosToLocalDateTime)(nanosFromLocalDateTime) + // joda.LocalDateTime has millisecond precision, excess precision discarded + implicit val rfJodaLocalDateTimeMicros: RowField[joda.LocalDateTime] = + RowField.from[jt.LocalDateTime](ldt => nanosToJodaLocalDateTime(nanosFromLocalDateTime(ldt)))( + ldt => nanosToLocalDateTime(nanosFromJodaLocalDateTime(ldt)) + ) + + implicit val rfDurationNanos: RowField[jt.Duration] = + RowField.id[jt.Duration](_ => FieldType.logicalType(new logicaltypes.NanosDuration())) + // joda.Duration has millisecond precision, excess precision discarded + implicit val rfJodaDurationNanos: RowField[joda.Duration] = + RowField.from[jt.Duration](d => nanosToJodaDuration(nanosFromDuration(d)))(d => + nanosToDuration(nanosFromJodaDuration(d)) + ) + } + + object sql { + implicit val rfSqlLocalTime: RowField[jt.LocalTime] = + RowField.id(_ => FieldType.logicalType(SqlTypes.TIME)) + implicit val rfSqlInstant: RowField[jt.Instant] = + RowField.id(_ => FieldType.logicalType(SqlTypes.TIMESTAMP)) + implicit val rfSqlLocalDateTime: RowField[jt.LocalDateTime] = + RowField.id(_ => FieldType.logicalType(SqlTypes.DATETIME)) + implicit val rfSqlLocalDate: RowField[jt.LocalDate] = + RowField.id(_ => FieldType.logicalType(SqlTypes.DATE)) + } +} diff --git a/beam/src/main/scala/magnolify/beam/unsafe/package.scala b/beam/src/main/scala/magnolify/beam/unsafe/package.scala new file mode 100644 index 000000000..72ea47908 --- /dev/null +++ b/beam/src/main/scala/magnolify/beam/unsafe/package.scala @@ -0,0 +1,24 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package magnolify.beam + +import magnolify.shared.* + +package object unsafe { + implicit def rfUnsafeEnum[T: EnumType]: RowField[UnsafeEnum[T]] = + RowField.from[String](UnsafeEnum.from[T])(UnsafeEnum.to[T]) +} diff --git a/beam/src/test/scala/magnolify/beam/RowTypeSuite.scala b/beam/src/test/scala/magnolify/beam/RowTypeSuite.scala new file mode 100644 index 000000000..4f7a54836 --- /dev/null +++ b/beam/src/test/scala/magnolify/beam/RowTypeSuite.scala @@ -0,0 +1,201 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package magnolify.beam + +import cats.* +import com.google.protobuf.ByteString +import magnolify.cats.auto.* +import magnolify.cats.TestEq.* +import magnolify.scalacheck.auto.* +import magnolify.scalacheck.TestArbitrary.* +import magnolify.shared.CaseMapper +import magnolify.test.ADT +import magnolify.test.MagnolifySuite +import magnolify.test.Simple.* +import org.apache.beam.sdk.schemas.Schema +import org.joda.time as joda +import org.scalacheck.{Arbitrary, Gen, Prop} + +import java.nio.ByteBuffer +import java.time.{Duration, Instant, LocalDate, LocalDateTime, LocalTime} +import java.util.UUID +import scala.reflect.ClassTag +import scala.jdk.CollectionConverters.* + +class RowTypeSuite extends MagnolifySuite { + private def test[T: Arbitrary: ClassTag](implicit + bst: RowType[T], + eq: Eq[T] + ): Unit = testNamed[T](className[T]) + + private def testNamed[T: Arbitrary](name: String)(implicit + bst: RowType[T], + eq: Eq[T] + ): Unit = { + // Ensure serializable even after evaluation of `schema` + bst.schema: Unit + ensureSerializable(bst) + + property(name) { + Prop.forAll { (t: T) => + val converted = bst.apply(t) + val roundtripped = bst.apply(converted) + Prop.all(eq.eqv(t, roundtripped)) + } + } + } + + implicit val arbByteString: Arbitrary[ByteString] = + Arbitrary(Gen.alphaNumStr.map(ByteString.copyFromUtf8)) + implicit val arbBigDecimal: Arbitrary[BigDecimal] = + Arbitrary(Gen.chooseNum(0, Int.MaxValue).map(BigDecimal(_))) + implicit val eqByteString: Eq[ByteString] = Eq.instance(_ == _) + + test[Integers] + test[Floats] + test[Required] + test[Nullable] + test[Repeated] + test[Nested] + test[Collections] + test[MoreCollections] + + test[Empty] + test[Others] + test[Maps] + test[Logical] + test[Decimal] + + { + import magnolify.shared.TestEnumType._ + test[SealedTest] + } + + { + import magnolify.beam.unsafe._ + import magnolify.shared.TestEnumType._ + test[Enums] + test[UnsafeEnums] + } + + { + import magnolify.beam.logical.date.* + test[JavaDate] + test[JodaDate] + } + + { + import magnolify.beam.logical.millis.* + testNamed[JavaTime]("JavaMillis") + testNamed[JodaTime]("JodaMillis") + } + + { + import magnolify.beam.logical.micros.* + testNamed[JavaTime]("JavaMicros") + testNamed[JodaTime]("JodaMicros") + } + + { + import magnolify.beam.logical.nanos.* + testNamed[JavaTime]("JavaNanos") + testNamed[JodaTime]("JodaNanos") + } + + { + implicit val bst: RowType[LowerCamel] = + RowType[LowerCamel](CaseMapper(_.toUpperCase)) + test[LowerCamel] + + { + val schema = bst.schema + val fields = LowerCamel.fields.map(_.toUpperCase) + assertEquals(schema.getFields.asScala.map(_.getName()).toSeq, fields) + assertEquals( + schema.getField("INNERFIELD").getType.getRowSchema.getFields.asScala.map(_.getName()).toSeq, + Seq("INNERFIRST") + ) + } + } + + { + // value classes should act only as fields + intercept[IllegalArgumentException] { + RowType[ValueClass] + } + + implicit val bst: RowType[HasValueClass] = RowType[HasValueClass] + test[HasValueClass] + + assert(bst.schema.getField("vc").getType == Schema.FieldType.STRING) + val record = bst(HasValueClass(ValueClass("String"))) + assert(record.getValue[String]("vc").equals("String")) + } + + { + import magnolify.beam.logical.sql.* + test[Sql] + } +} + +case class Empty() +case class Others(bs: ByteString, bb: ByteBuffer, c: Char) +case class Decimal(bd: BigDecimal, bdo: Option[BigDecimal]) +case class Logical( + u: UUID, + uo: Option[UUID], + ul: List[UUID], + ulo: List[Option[UUID]] +) + +case class Sql( + i: Instant, + dt: LocalDateTime, + t: LocalTime, + d: LocalDate +) +case class JavaDate(d: LocalDate) +case class JodaDate(jd: joda.LocalDate) +case class JavaTime( + i: Instant, + dt: LocalDateTime, + t: LocalTime, + d: Duration +) +case class JodaTime( + i: joda.Instant, + dt: joda.DateTime, + lt: joda.LocalTime, + d: joda.Duration, + ldt: joda.LocalDateTime +) +case class Maps( + ms: Map[String, String], + mi: Map[Int, Int], + ml: Map[Long, Long], + md: Map[Double, Double], + mf: Map[Float, Float], + mb: Map[Byte, Byte], + msh: Map[Short, Short], + mba: Map[Byte, Array[Byte]], + mbs: Map[ByteString, Array[Byte]], + mso: Map[Option[String], Option[String]], + mu: Map[UUID, UUID], + mlo: Map[Option[UUID], Option[UUID]] +) + +case class SealedTest(shape: ADT.Shape, point: ADT.Rect, enumColor: ADT.Color) diff --git a/build.sbt b/build.sbt index cece6ea58..c76915fb1 100644 --- a/build.sbt +++ b/build.sbt @@ -24,6 +24,7 @@ val magnoliaScala3Version = "1.3.7" val algebirdVersion = "0.13.10" val avroVersion = Option(sys.props("avro.version")).getOrElse("1.11.3") +val beamVersion = "2.57.0" val bigqueryVersion = "v2-rev20240229-2.0.0" val bigtableVersion = "2.43.0" val catsVersion = "2.12.0" @@ -225,7 +226,23 @@ ThisBuild / githubWorkflowAddedJobs ++= Seq( // mima ThisBuild / mimaBinaryIssueFilters ++= Seq( // genFunnelMacro should not be available to users - ProblemFilters.exclude[DirectMissingMethodProblem]("magnolify.guava.auto.package.genFunnelMacro") + ProblemFilters.exclude[DirectMissingMethodProblem]("magnolify.guava.auto.package.genFunnelMacro"), + // incorrectly named implicit + ProblemFilters.exclude[DirectMissingMethodProblem]( + "magnolify.parquet.logical.package#micros.pfTimestampMillis" + ), + // incorrectly named implicit + ProblemFilters.exclude[DirectMissingMethodProblem]( + "magnolify.parquet.logical.package#micros.pfLocalDateTimeMillis" + ), + // incorrectly named implicit + ProblemFilters.exclude[DirectMissingMethodProblem]( + "magnolify.parquet.logical.package#nanos.pfTimestampMillis" + ), + // incorrectly named implicit + ProblemFilters.exclude[DirectMissingMethodProblem]( + "magnolify.parquet.logical.package#nanos.pfLocalDateTimeMillis" + ) ) ThisBuild / tlVersionIntroduced := Map("3" -> "0.8.0") @@ -326,6 +343,7 @@ lazy val root = tlCrossRootProject ) .aggregate( avro, + beam, bigquery, bigtable, bom, @@ -379,7 +397,8 @@ lazy val shared = project commonSettings, crossScalaVersions := Seq(scala3, scala213, scala212), moduleName := "magnolify-shared", - description := "Shared code for Magnolify" + description := "Shared code for Magnolify", + libraryDependencies += "org.scalacheck" %% "scalacheck" % scalacheckVersion % Test ) // shared code for unit tests @@ -400,7 +419,7 @@ lazy val test = project lazy val scalacheck = project .in(file("scalacheck")) .dependsOn( - shared, + shared % "compile,test->test", test % "test->test" ) .settings( @@ -490,6 +509,27 @@ lazy val avro = project ) ) +lazy val beam = project + .in(file("beam")) + .dependsOn( + shared, + cats % "test->test", + scalacheck % "test->test", + test % "test->test" + ) + .settings( + commonSettings, + protobufSettings, + moduleName := "magnolify-beam", + description := "Magnolia add-on for Apache Beam", + libraryDependencies ++= Seq( + "org.apache.beam" % "beam-sdks-java-core" % beamVersion % Provided, + "com.google.protobuf" % "protobuf-java" % protobufVersion % Provided + ), + // TODO remove this line after release + tlMimaPreviousVersions := Set.empty + ) + lazy val bigquery = project .in(file("bigquery")) .dependsOn( @@ -745,6 +785,7 @@ lazy val site = project ) .dependsOn( avro % "compile->compile,provided", + beam % "compile->compile,provided", bigquery % "compile->compile,provided", bigtable % "compile->compile,provided", cats % "compile->compile,provided", diff --git a/cats/src/test/scala/magnolify/cats/TestEq.scala b/cats/src/test/scala/magnolify/cats/TestEq.scala index 152cccbd8..db0c76ce5 100644 --- a/cats/src/test/scala/magnolify/cats/TestEq.scala +++ b/cats/src/test/scala/magnolify/cats/TestEq.scala @@ -45,7 +45,7 @@ object TestEq { // Can only be used as a key value list m.map { case (k, v) => k.toString -> v } } - implicit val eqByteBuffer: Eq[ByteBuffer] = Eq.by(_.array()) + implicit lazy val eqByteBuffer: Eq[ByteBuffer] = Eq.by(_.array()) // java-time implicit lazy val eqInstant: Eq[Instant] = Eq.fromUniversalEquals @@ -56,10 +56,12 @@ object TestEq { implicit lazy val eqDuration: Eq[Duration] = Eq.fromUniversalEquals // joda-time - implicit val eqJodaDate: Eq[joda.LocalDate] = Eq.fromUniversalEquals - implicit val eqJodaDateTime: Eq[joda.DateTime] = Eq.fromUniversalEquals - implicit val eqJodaLocalTime: Eq[joda.LocalTime] = Eq.fromUniversalEquals - implicit val eqJodaLocalDateTime: Eq[joda.LocalDateTime] = Eq.fromUniversalEquals + implicit lazy val eqJodaDate: Eq[joda.LocalDate] = Eq.fromUniversalEquals + implicit lazy val eqJodaDateTime: Eq[joda.DateTime] = Eq.fromUniversalEquals + implicit lazy val eqJodaLocalTime: Eq[joda.LocalTime] = Eq.fromUniversalEquals + implicit lazy val eqJodaLocalDateTime: Eq[joda.LocalDateTime] = Eq.fromUniversalEquals + implicit lazy val eqJodaDuration: Eq[joda.Duration] = Eq.fromUniversalEquals + implicit lazy val eqJodaInstant: Eq[joda.Instant] = Eq.fromUniversalEquals // enum implicit lazy val eqJavaEnum: Eq[JavaEnums.Color] = Eq.fromUniversalEquals diff --git a/docs/beam.md b/docs/beam.md new file mode 100644 index 000000000..5c47a227c --- /dev/null +++ b/docs/beam.md @@ -0,0 +1,62 @@ +# Beam + +`RowType[T]` provides conversion between Scala type `T` and a Beam Row, backed by a [Beam Schema](https://beam.apache.org/documentation/programming-guide/#schema-definition). Custom support for type `T` can be added with an implicit instance of `RowField[T]`. + +```scala mdoc:compile-only +import java.net.URI + +case class Inner(long: Long, str: String, uri: URI) +case class Outer(inner: Inner) +val record = Outer(Inner(1L, "hello", URI.create("https://www.spotify.com"))) + +import magnolify.beam.* +// Encode custom type URI as String +implicit val uriField: RowField[URI] = RowField.from[String](URI.create)(_.toString) + +val rowType = RowType[Outer] +val row = rowType.to(record) +val copy: Outer = rowType.from(row) + +// Beam Schema +val schema = rowType.schema +``` + +## Enums +Enum-like types map to the Beam logical [Enum type]((https://beam.apache.org/documentation/programming-guide/#enumerationtype)). See @ref:[EnumType](enums.md) for more details. `UnsafeEnum[T]` instances are available from `import magnolify.beam.unsafe.*`. + +## Time and dates + +Java and joda `LocalDate` types are available via `import magnolify.beam.logical.date.*` + +For date-time, instants, and durations, use `import magnolify.beam.logical.millis.*`, `import magnolify.beam.logical.micros.*` or `import magnolify.beam.logical.nanos.*` as appropriate for your use-case. +Note that joda types have only millisecond resolution, so excess precision will be discarded when used with `micros` or `nanos`. + +Where possible, Beam logical types are used and joda types defer to these implementations: + +* Beam's `DATETIME` primitive type maps to the millisecond-precision java and joda `Instant`s and the joda `DateTime`. +* The `DateTime` logical type is used for millisecond-precision java and joda `LocalDateTime` +* The `NanosInstant` logical type is used for nanosecond-precision java and joda `Instant` +* The `Time` logical type is used for nanosecond-precision java and joda `LocalTime` +* The `NanosDuration` logical type is used for java and joda `Duration` + +Beam's `MicrosInstant` should not be used as it throws exceptions when presented with greater-than-microsecond precision data. + +## SQL types + +SQL-compatible logical types are supported via `import magnolify.beam.logical.sql.*` + +## Case mapping + +To use a different field case format in target records, add an optional `CaseMapper` argument to `RowType`: + +```scala mdoc:compile-only +import magnolify.beam.* +import magnolify.shared.CaseMapper +import com.google.common.base.CaseFormat + +case class LowerCamel(firstName: String, lastName: String) + +val toSnakeCase = CaseFormat.LOWER_CAMEL.converterTo(CaseFormat.LOWER_UNDERSCORE).convert _ +val rowType = RowType[LowerCamel](CaseMapper(toSnakeCase)) +rowType.to(LowerCamel("John", "Doe")) // Row(first_name: John, last_name: Doe) +``` \ No newline at end of file diff --git a/docs/index.md b/docs/index.md index 537a022fe..c89bfb3fa 100644 --- a/docs/index.md +++ b/docs/index.md @@ -11,6 +11,7 @@ A collection of [Magnolia](https://github.com/propensive/magnolia) add-ons for c This library includes the following modules. - @ref:[`magnolify-avro`](avro.md) - conversion between Scala types and [Apache Avro](https://github.com/apache/avro) `GenericRecord` +- @ref:[`magnolify-beam`](beam.md) - conversion between Scala types and [Apache Beam](https://beam.apache.org/) [schema types](https://beam.apache.org/documentation/programming-guide/#schemas) - @ref:[`magnolify-bigquery`](bigquery.md) - conversion between Scala types and [Google Cloud BigQuery](https://cloud.google.com/bigquery/) `TableRow` - @ref:[`magnolify-bigtable`](bigtable.md) - conversion between Scala types and [Google Cloud Bigtable](https://cloud.google.com/bigtable) to `Mutation`, from `Row` - @ref:[`magnolify-cats`](cats.md) - type class derivation for [Cats](https://github.com/typelevel/cats), specifically @@ -35,6 +36,7 @@ Complete type mapping @ref:[here](mapping.md). @@@ index - @ref:[Avro](avro.md) +- @ref:[Beam](beam.md) - @ref:[BigQuery](bigquery.md) - @ref:[Bigtable](bigtable.md) - @ref:[Cats](cats.md) diff --git a/docs/mapping.md b/docs/mapping.md index bafde4b65..7f5450ed1 100644 --- a/docs/mapping.md +++ b/docs/mapping.md @@ -1,38 +1,38 @@ # Type Mapping -| Scala | Avro | BigQuery | Bigtable7 | Datastore | Parquet | Protobuf | TensorFlow | -|-----------------------------------|------------------------------|------------------------|---------------------------------|-----------------------|-----------------------------------|-------------------------|---------------------| -| `Unit` | `null` | x | x | `Null` | x | x | x | -| `Boolean` | `boolean` | `BOOL` | `Byte` | `Boolean` | `BOOLEAN` | `Boolean` | `INT64`3 | -| `Char` | `int`3 | `INT64`3 | `Char` | `Integer`3 | `INT32`3 | `Int`3 | `INT64`3 | -| `Byte` | `int`3 | `INT64`3 | `Byte` | `Integer`3 | `INT32`9 | `Int`3 | `INT64`3 | -| `Short` | `int`3 | `INT64`3 | `Short` | `Integer`3 | `INT32`9 | `Int`3 | `INT64`3 | -| `Int` | `int` | `INT64`3 | `Int` | `Integer`3 | `INT32`9 | `Int` | `INT64`3 | -| `Long` | `long` | `INT64` | `Long` | `Integer` | `INT64`9 | `Long` | `INT64` | -| `Float` | `float` | `FLOAT64`3 | `Float` | `Double`3 | `FLOAT` | `Float` | `FLOAT` | -| `Double` | `double` | `FLOAT64` | `Double` | `Double` | `DOUBLE` | `Double` | `FLOAT`3 | -| `CharSequence` | `string` | x | x | x | x | x | x | -| `String` | `string` | `STRING` | `String` | `String` | `BINARY` | `String` | `BYTES`3 | -| `Array[Byte]` | `bytes` | `BYTES` | `ByteString` | `Blob` | `BINARY` | `ByteString` | `BYTES` | -| `ByteString` | x | x | `ByteString` | `Blob` | x | `ByteString` | `BYTES` | -| `ByteBuffer` | `bytes` | x | x | | x | x | x | -| Enum1 | `enum` | `STRING`3 | `String` | `String`3 | `BINARY`/`ENUM`9 | Enum | `BYTES`3 | -| `BigInt` | x | x | `BigInt` | x | x | x | x | -| `BigDecimal` | `bytes`4 | `NUMERIC`6 | `Int` scale + unscaled `BigInt` | x | `LOGICAL[DECIMAL]`9,14 | x | x | -| `Option[T]` | `union[null, T]`5 | `NULLABLE` | Empty as `None` | Absent as `None` | `OPTIONAL` | `optional`10 | Size <= 1 | -| `Iterable[T]`2 | `array[T]` | `REPEATED` | x | `Array` | `REPEATED`13 | `repeated` | Size >= 0 | -| Nested | `record` | `STRUCT` | Flat8 | `Entity` | Group | `Message` | Flat8 | -| `Map[K, V]` | `map[V]`15 | x | x | x | x | `map` | x | -| `java.time.Instant` | `long`11 | `TIMESTAMP` | x | `Timestamp` | `LOGICAL[TIMESTAMP]`9 | x | x | -| `java.time.LocalDateTime` | `long`11 | `DATETIME` | x | x | `LOGICAL[TIMESTAMP]`9 | x | x | -| `java.time.OffsetTime` | x | x | x | x | `LOGICAL[TIME]`9 | x | x | -| `java.time.LocalTime` | `long`11 | `TIME` | x | x | `LOGICAL[TIME]`9 | x | x | -| `java.time.LocalDate` | `int`11 | `DATE` | x | x | `LOGICAL[DATE]`9 | x | x | -| `org.joda.time.LocalDate` | `int`11 | x | x | x | x | x | x | -| `org.joda.time.DateTime` | `int`11 | x | x | x | x | x | x | -| `org.joda.time.LocalTime` | `int`11 | x | x | x | x | x | x | -| `java.util.UUID` | `string`4 | x | ByteString (16 bytes) | x | `FIXED[16]` | x | x | -| `(Long, Long, Long)`12 | `fixed[12]` | x | x | x | x | x | x | +| Scala | Avro | Beam | BigQuery | Bigtable7 | Datastore | Parquet | Protobuf | TensorFlow | +|-----------------------------------|------------------------------|-----------------------------------------|------------------------|---------------------------------|-----------------------|-----------------------------------|-------------------------|---------------------| +| `Unit` | `null` | x | x | x | `Null` | x | x | x | +| `Boolean` | `boolean` | `BOOLEAN` | `BOOL` | `Byte` | `Boolean` | `BOOLEAN` | `Boolean` | `INT64`3 | +| `Char` | `int`3 | `BYTE` | `INT64`3 | `Char` | `Integer`3 | `INT32`3 | `Int`3 | `INT64`3 | +| `Byte` | `int`3 | `BYTE` | `INT64`3 | `Byte` | `Integer`3 | `INT32`9 | `Int`3 | `INT64`3 | +| `Short` | `int`3 | `INT16` | `INT64`3 | `Short` | `Integer`3 | `INT32`9 | `Int`3 | `INT64`3 | +| `Int` | `int` | `INT32` | `INT64`3 | `Int` | `Integer`3 | `INT32`9 | `Int` | `INT64`3 | +| `Long` | `long` | `INT64` | `INT64` | `Long` | `Integer` | `INT64`9 | `Long` | `INT64` | +| `Float` | `float` | `FLOAT` | `FLOAT64`3 | `Float` | `Double`3 | `FLOAT` | `Float` | `FLOAT` | +| `Double` | `double` | `DOUBLE` | `FLOAT64` | `Double` | `Double` | `DOUBLE` | `Double` | `FLOAT`3 | +| `CharSequence` | `string` | `STRING` | x | x | x | x | x | x | +| `String` | `string` | `STRING` | `STRING` | `String` | `String` | `BINARY` | `String` | `BYTES`3 | +| `Array[Byte]` | `bytes` | `BYTES` | `BYTES` | `ByteString` | `Blob` | `BINARY` | `ByteString` | `BYTES` | +| `ByteString` | x | `BYTES` | x | `ByteString` | `Blob` | x | `ByteString` | `BYTES` | +| `ByteBuffer` | `bytes` | `BYTES` | x | x | | x | x | x | +| Enum1 | `enum` | `STRING`16 | `STRING`3 | `String` | `String`3 | `BINARY`/`ENUM`9 | Enum | `BYTES`3 | +| `BigInt` | x | x | x | `BigInt` | x | x | x | x | +| `BigDecimal` | `bytes`4 | `DECIMAL` | `NUMERIC`6 | `Int` scale + unscaled `BigInt` | x | `LOGICAL[DECIMAL]`9,14 | x | x | +| `Option[T]` | `union[null, T]`5 | Empty as `null` | `NULLABLE` | Empty as `None` | Absent as `None` | `OPTIONAL` | `optional`10 | Size <= 1 | +| `Iterable[T]`2 | `array[T]` | `ITERABLE` | `REPEATED` | x | `Array` | `REPEATED`13 | `repeated` | Size >= 0 | +| Nested | `record` | `ROW` | `STRUCT` | Flat8 | `Entity` | Group | `Message` | Flat8 | +| `Map[K, V]` | `map[V]`15 | `MAP` | x | x | x | x | `map` | x | +| `java.time.Instant` | `long`11 | `DATETIME`, `INT64`, `ROW`17 | `TIMESTAMP` | x | `Timestamp` | `LOGICAL[TIMESTAMP]`9 | x | x | +| `java.time.LocalDateTime` | `long`11 | `ROW`, `INT64`17 | `DATETIME` | x | x | `LOGICAL[TIMESTAMP]`9 | x | x | +| `java.time.OffsetTime` | x | x | x | x | x | `LOGICAL[TIME]`9 | x | x | +| `java.time.LocalTime` | `long`11 | `INT32`, `INT64`17 | `TIME` | x | x | `LOGICAL[TIME]`9 | x | x | +| `java.time.LocalDate` | `int`11 | `INT64`17 | `DATE` | x | x | `LOGICAL[DATE]`9 | x | x | +| `org.joda.time.LocalDate` | `int`11 | `INT64`17 | x | x | x | x | x | x | +| `org.joda.time.DateTime` | `int`11 | `DATETIME`, `INT64`, `ROW`17 | x | x | x | x | x | x | +| `org.joda.time.LocalTime` | `int`11 | `INT32`, `INT64`17 | x | x | x | x | x | x | +| `java.util.UUID` | `string`4 | `ROW`18 | x | ByteString (16 bytes) | x | `FIXED[16]` | x | x | +| `(Long, Long, Long)`12 | `fixed[12]` | x | x | x | x | x | x | x | 1. Those wrapped in`UnsafeEnum` are encoded as strings, see [enums.md](https://github.com/spotify/magnolify/blob/master/docs/enums.md) for more @@ -58,4 +58,7 @@ format: `required group $FIELDNAME (LIST) { repeated $FIELDTYPE array ($FIELDSCHEMA); }`. 14. Parquet's Decimal logical format supports multiple representations, and are not implicitly scoped by default. Import one of: `magnolify.parquet.ParquetField.{decimal32, decimal64, decimalFixed, decimalBinary}`. -15. Map key type in avro is fixed to string. Scala Map key type must be either `String` or `CharSequence`. \ No newline at end of file +15. Map key type in avro is fixed to string. Scala Map key type must be either `String` or `CharSequence`. +16. Beam logical [Enumeration type](https://beam.apache.org/documentation/programming-guide/#enumerationtype) +17. See [beam.md](https://github.com/spotify/magnolify/blob/master/docs/beam.md) for details +18. Beam logical [UUID type](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/schemas/logicaltypes/UuidLogicalType.html) diff --git a/parquet/src/main/scala/magnolify/parquet/logical/TimeTypes.scala b/parquet/src/main/scala/magnolify/parquet/logical/TimeTypes.scala new file mode 100644 index 000000000..1e9b76a5c --- /dev/null +++ b/parquet/src/main/scala/magnolify/parquet/logical/TimeTypes.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package magnolify.parquet.logical + +import org.apache.parquet.schema.LogicalTypeAnnotation +import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit + +trait TimeTypes { + protected def unit: TimeUnit + protected def ts(adjusted: Boolean): LogicalTypeAnnotation = + LogicalTypeAnnotation.timestampType(adjusted, unit) + protected def time(adjusted: Boolean): LogicalTypeAnnotation = + LogicalTypeAnnotation.timeType(adjusted, unit) +} diff --git a/parquet/src/main/scala/magnolify/parquet/logical/package.scala b/parquet/src/main/scala/magnolify/parquet/logical/package.scala index 9223c4342..d2fcc5bae 100644 --- a/parquet/src/main/scala/magnolify/parquet/logical/package.scala +++ b/parquet/src/main/scala/magnolify/parquet/logical/package.scala @@ -19,88 +19,64 @@ package magnolify.parquet import java.time._ import magnolify.parquet.ParquetField.Primitive -import org.apache.parquet.schema.LogicalTypeAnnotation import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit package object logical { + import magnolify.shared.Time._ // TIME (millis i32, micros i64, nanos, i64), UTC true/false // TIMESTAMP (millis, micros, nanos), UTC true/false - object millis { - private val unit = TimeUnit.MILLIS + object millis extends TimeTypes { + protected val unit = TimeUnit.MILLIS // TIMESTAMP implicit val pfTimestampMillis: Primitive[Instant] = - ParquetField.logicalType[Long](LogicalTypeAnnotation.timestampType(true, unit))( - Instant.ofEpochMilli - )(_.toEpochMilli) + ParquetField.logicalType[Long](ts(true))(millisToInstant)(millisFromInstant) implicit val pfLocalDateTimeMillis: Primitive[LocalDateTime] = - ParquetField.logicalType[Long](LogicalTypeAnnotation.timestampType(false, unit))(ms => - LocalDateTime.ofInstant(Instant.ofEpochMilli(ms), ZoneOffset.UTC) - )( - _.toInstant(ZoneOffset.UTC).toEpochMilli - ) + ParquetField.logicalType[Long](ts(false))(millisToLocalDateTime)(millisFromLocalDateTime) // TIME implicit val pfOffsetTimeMillis: Primitive[OffsetTime] = - ParquetField.logicalType[Int](LogicalTypeAnnotation.timeType(true, unit))(ms => + ParquetField.logicalType[Int](time(true))(ms => LocalTime.ofNanoOfDay(ms * 1000000L).atOffset(ZoneOffset.UTC) )(t => (t.toLocalTime.toNanoOfDay / 1000000).toInt) implicit val pfLocalTimeMillis: Primitive[LocalTime] = - ParquetField.logicalType[Int](LogicalTypeAnnotation.timeType(false, unit))(ms => - LocalTime.ofNanoOfDay(ms * 1000000L) - )(t => (t.toNanoOfDay / 1000000).toInt) + ParquetField.logicalType[Int](time(false))(millisToLocalTime)(millisFromLocalTime) } - object micros { - private val unit = TimeUnit.MICROS + object micros extends TimeTypes { + override protected val unit = TimeUnit.MICROS // TIMESTAMP - implicit val pfTimestampMillis: Primitive[Instant] = - ParquetField.logicalType[Long](LogicalTypeAnnotation.timestampType(true, unit))(us => - Instant.ofEpochMilli(us / 1000) - )(_.toEpochMilli * 1000) - implicit val pfLocalDateTimeMillis: Primitive[LocalDateTime] = - ParquetField.logicalType[Long](LogicalTypeAnnotation.timestampType(false, unit))(us => - LocalDateTime.ofInstant(Instant.ofEpochMilli(us / 1000), ZoneOffset.UTC) - )( - _.toInstant(ZoneOffset.UTC).toEpochMilli * 1000 - ) + implicit val pfTimestampMicros: Primitive[Instant] = + ParquetField.logicalType[Long](ts(true))(microsToInstant)(microsFromInstant) + implicit val pfLocalDateTimeMicros: Primitive[LocalDateTime] = + ParquetField.logicalType[Long](ts(false))(microsToLocalDateTime)(microsFromLocalDateTime) // TIME implicit val pfOffsetTimeMicros: Primitive[OffsetTime] = - ParquetField.logicalType[Long](LogicalTypeAnnotation.timeType(true, unit))(us => + ParquetField.logicalType[Long](time(true))(us => LocalTime.ofNanoOfDay(us * 1000).atOffset(ZoneOffset.UTC) )(_.toLocalTime.toNanoOfDay / 1000) implicit val pfLocalTimeMicros: Primitive[LocalTime] = - ParquetField.logicalType[Long](LogicalTypeAnnotation.timeType(false, unit))(us => - LocalTime.ofNanoOfDay(us * 1000) - )(_.toNanoOfDay / 1000) + ParquetField.logicalType[Long](time(false))(microsToLocalTime)(microsFromLocalTime) } - object nanos { - private val unit = TimeUnit.NANOS + object nanos extends TimeTypes { + override protected val unit = TimeUnit.NANOS // TIMESTAMP - implicit val pfTimestampMillis: Primitive[Instant] = - ParquetField.logicalType[Long](LogicalTypeAnnotation.timestampType(true, unit))(ns => - Instant.ofEpochMilli(ns / 1000000) - )(_.toEpochMilli * 1000000) - implicit val pfLocalDateTimeMillis: Primitive[LocalDateTime] = - ParquetField.logicalType[Long](LogicalTypeAnnotation.timestampType(false, unit))(ns => - LocalDateTime.ofInstant(Instant.ofEpochMilli(ns / 1000000), ZoneOffset.UTC) - )( - _.toInstant(ZoneOffset.UTC).toEpochMilli * 1000000 - ) + implicit val pfTimestampNanos: Primitive[Instant] = + ParquetField.logicalType[Long](ts(true))(nanosToInstant)(nanosFromInstant) + implicit val pfLocalDateTimeNanos: Primitive[LocalDateTime] = + ParquetField.logicalType[Long](ts(false))(nanosToLocalDateTime)(nanosFromLocalDateTime) // TIME implicit val pfOffsetTimeNanos: Primitive[OffsetTime] = - ParquetField.logicalType[Long](LogicalTypeAnnotation.timeType(true, unit))(ns => + ParquetField.logicalType[Long](time(true))(ns => LocalTime.ofNanoOfDay(ns).atOffset(ZoneOffset.UTC) )(_.toLocalTime.toNanoOfDay) implicit val pfLocalTimeNanos: Primitive[LocalTime] = - ParquetField.logicalType[Long](LogicalTypeAnnotation.timeType(false, unit))( - LocalTime.ofNanoOfDay - )(_.toNanoOfDay) + ParquetField.logicalType[Long](time(false))(nanosToLocalTime)(nanosFromLocalTime) } } diff --git a/parquet/src/test/scala/magnolify/parquet/ParquetTypeSuite.scala b/parquet/src/test/scala/magnolify/parquet/ParquetTypeSuite.scala index dbbb7bb8f..53357fb8a 100644 --- a/parquet/src/test/scala/magnolify/parquet/ParquetTypeSuite.scala +++ b/parquet/src/test/scala/magnolify/parquet/ParquetTypeSuite.scala @@ -43,12 +43,17 @@ class ParquetTypeSuite extends MagnolifySuite { private def test[T: Arbitrary: ClassTag](implicit t: ParquetType[T], eq: Eq[T] + ): Unit = testNamed[T](className[T]) + + private def testNamed[T: Arbitrary](name: String)(implicit + t: ParquetType[T], + eq: Eq[T] ): Unit = { // Ensure serializable even after evaluation of `schema` t.schema: Unit val tpe = ensureSerializable(t) - property(className[T]) { + property(name) { Prop.forAll { (t: T) => val out = new TestOutputFile val writer = tpe.writeBuilder(out).build() @@ -138,46 +143,67 @@ class ParquetTypeSuite extends MagnolifySuite { Arbitrary(Gen.choose(-max, max).map(BigDecimal.apply)) } + test("Decimal range") { + intercept[IllegalArgumentException] { + ParquetField.decimal32(0, 0) + } + intercept[IllegalArgumentException] { + ParquetField.decimal32(1, 10) + } + intercept[IllegalArgumentException] { + ParquetField.decimal64(0, 0) + } + intercept[IllegalArgumentException] { + ParquetField.decimal64(1, 19) + } + intercept[IllegalArgumentException] { + ParquetField.decimalFixed(0, 1) + } + intercept[IllegalArgumentException] { + ParquetField.decimalFixed(2, 5) // capacity = 4 + } + } + { implicit val arbBigDecimal: Arbitrary[BigDecimal] = decimal(9) implicit val pfBigDecimal: ParquetField[BigDecimal] = ParquetField.decimal32(9, 0) - test[Decimal] + testNamed[Decimal]("Decimal32") } { implicit val arbBigDecimal: Arbitrary[BigDecimal] = decimal(18) implicit val pfBigDecimal: ParquetField[BigDecimal] = ParquetField.decimal64(18, 0) - test[Decimal] + testNamed[Decimal]("Decimal64") } { implicit val arbBigDecimal: Arbitrary[BigDecimal] = decimal(18) // math.floor(math.log10(math.pow(2, 8*8-1) - 1)) = 18 digits implicit val pfBigDecimal: ParquetField[BigDecimal] = ParquetField.decimalFixed(8, 18, 0) - test[Decimal] + testNamed[Decimal]("DecimalFixed") } { implicit val arbBigDecimal: Arbitrary[BigDecimal] = decimal(20) implicit val pfBigDecimal: ParquetField[BigDecimal] = ParquetField.decimalBinary(20, 0) - test[Decimal] + testNamed[Decimal]("DecimalBinary") } test[Logical] { import magnolify.parquet.logical.millis._ - test[Time] + testNamed[Time]("TimeMillis") } { import magnolify.parquet.logical.micros._ - test[Time] + testNamed[Time]("TimeMicros") } { import magnolify.parquet.logical.nanos._ - test[Time] + testNamed[Time]("TimeNanos") } { diff --git a/scalacheck/src/test/scala/magnolify/scalacheck/TestArbitrary.scala b/scalacheck/src/test/scala/magnolify/scalacheck/TestArbitrary.scala index c0d9de75e..c6452152a 100644 --- a/scalacheck/src/test/scala/magnolify/scalacheck/TestArbitrary.scala +++ b/scalacheck/src/test/scala/magnolify/scalacheck/TestArbitrary.scala @@ -17,18 +17,16 @@ package magnolify.scalacheck import magnolify.scalacheck.semiauto.ArbitraryDerivation -import magnolify.shared.UnsafeEnum -import magnolify.test.ADT._ +import magnolify.shared.{TimeArbitrary, UnsafeEnum} +import magnolify.test.ADT.* import magnolify.test.JavaEnums -import magnolify.test.Simple._ -import org.joda.{time => joda} -import org.scalacheck._ +import magnolify.test.Simple.* +import org.scalacheck.* import java.net.URI import java.nio.ByteBuffer -import java.time._ -object TestArbitrary { +object TestArbitrary extends TimeArbitrary { // null implicit lazy val arbNull: Arbitrary[Null] = Arbitrary(Gen.const(null)) @@ -44,42 +42,6 @@ object TestArbitrary { Arbitrary.arbitrary[Array[Byte]].map(ByteBuffer.wrap) } - // java-time - implicit lazy val arbInstant: Arbitrary[Instant] = - Arbitrary(Gen.posNum[Long].map(Instant.ofEpochMilli)) - implicit lazy val arbLocalDate: Arbitrary[LocalDate] = - Arbitrary(Gen.chooseNum(0L, 365L * 100).map(LocalDate.ofEpochDay)) - implicit lazy val arbLocalTime: Arbitrary[LocalTime] = - Arbitrary(arbInstant.arbitrary.map(_.atZone(ZoneOffset.UTC).toLocalTime)) - implicit lazy val arbLocalDateTime: Arbitrary[LocalDateTime] = - Arbitrary(arbInstant.arbitrary.map(_.atZone(ZoneOffset.UTC).toLocalDateTime)) - implicit lazy val arbOffsetTime: Arbitrary[OffsetTime] = - Arbitrary(arbInstant.arbitrary.map(_.atOffset(ZoneOffset.UTC).toOffsetTime)) - implicit lazy val arbDuration: Arbitrary[Duration] = - Arbitrary(Gen.posNum[Long].map(Duration.ofMillis)) - - // joda-time - implicit val arbJodaDate: Arbitrary[joda.LocalDate] = Arbitrary { - Arbitrary.arbitrary[LocalDate].map { ld => - new joda.LocalDate(ld.getYear, ld.getMonthValue, ld.getDayOfMonth) - } - } - implicit val arbJodaDateTime: Arbitrary[joda.DateTime] = Arbitrary { - Arbitrary.arbitrary[Instant].map { i => - new joda.DateTime(i.toEpochMilli, joda.DateTimeZone.UTC) - } - } - implicit val arbJodaLocalTime: Arbitrary[joda.LocalTime] = Arbitrary { - Arbitrary.arbitrary[LocalTime].map { lt => - joda.LocalTime.fromMillisOfDay(lt.toNanoOfDay / 1000) - } - } - implicit val arbJodaLocalDateTime: Arbitrary[joda.LocalDateTime] = Arbitrary { - Arbitrary.arbitrary[LocalDateTime].map { ldt => - joda.LocalDateTime.parse(ldt.toString) - } - } - // enum implicit lazy val arbJavaEnum: Arbitrary[JavaEnums.Color] = Arbitrary(Gen.oneOf(JavaEnums.Color.values.toSeq)) diff --git a/shared/src/main/scala/magnolify/shared/Time.scala b/shared/src/main/scala/magnolify/shared/Time.scala new file mode 100644 index 000000000..7cad70ee5 --- /dev/null +++ b/shared/src/main/scala/magnolify/shared/Time.scala @@ -0,0 +1,155 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package magnolify.shared + +import org.joda.time as joda +import java.time.{Duration, Instant, LocalDateTime, LocalTime, ZoneOffset} +import java.util.concurrent.TimeUnit + +object Time { + // millis ///////////////////////////////////////////////////// + @inline def millisToInstant(millisFromEpoch: Long): Instant = + Instant.ofEpochMilli(millisFromEpoch) + @inline def millisFromInstant(instant: Instant): Long = instant.toEpochMilli + @inline def millisToJodaInstant(millisFromEpoch: Long): joda.Instant = + joda.Instant.ofEpochMilli(millisFromEpoch) + @inline def millisFromJodaInstant(instant: joda.Instant): Long = instant.getMillis + + @inline def millisToLocalTime(millisFromMidnight: Int): LocalTime = + LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos(millisFromMidnight.toLong)) + @inline def millisFromLocalTime(lt: LocalTime): Int = + TimeUnit.NANOSECONDS.toMillis(lt.toNanoOfDay).toInt + @inline def millisToJodaLocalTime(millisFromMidnight: Int): joda.LocalTime = + joda.LocalTime.fromMillisOfDay(millisFromMidnight.toLong) + @inline def millisFromJodaLocalTime(lt: joda.LocalTime): Int = lt.millisOfDay().get() + + @inline def millisToJodaDateTime(millisFromEpoch: Long): joda.DateTime = + new joda.DateTime(millisFromEpoch, joda.DateTimeZone.UTC) + @inline def millisFromJodaDateTime(dt: joda.DateTime): Long = dt.getMillis + + @inline def millisToLocalDateTime(millisFromEpoch: Long): LocalDateTime = + LocalDateTime.ofInstant(millisToInstant(millisFromEpoch), ZoneOffset.UTC) + @inline def millisFromLocalDateTime(ldt: LocalDateTime): Long = + millisFromInstant(ldt.toInstant(ZoneOffset.UTC)) + @inline def millisToJodaLocalDateTime(millisFromEpoch: Long): joda.LocalDateTime = + new joda.LocalDateTime(millisFromEpoch, joda.DateTimeZone.UTC) + @inline def millisFromJodaLocalDateTime(ldt: joda.LocalDateTime): Long = + ldt.toDateTime(joda.DateTimeZone.UTC).getMillis + + @inline def millisToDuration(millis: Long): Duration = Duration.ofMillis(millis) + @inline def millisFromDuration(d: Duration): Long = + TimeUnit.SECONDS.toMillis(d.getSeconds) + TimeUnit.NANOSECONDS.toMillis(d.getNano.toLong) + @inline def millisToJodaDuration(millis: Long): joda.Duration = joda.Duration.millis(millis) + @inline def millisFromJodaDuration(d: joda.Duration): Long = d.getMillis + + // micros ///////////////////////////////////////////////////// + @inline def microsToInstant(microsFromEpoch: Long): Instant = { + val epochSeconds = TimeUnit.MICROSECONDS.toSeconds(microsFromEpoch) + val nanoAdjustment = TimeUnit.MICROSECONDS.toNanos(microsFromEpoch % 1000000L) + Instant.ofEpochSecond(epochSeconds, nanoAdjustment) + } + @inline def microsFromInstant(instant: Instant): Long = { + val seconds = instant.getEpochSecond + val nanos = instant.getNano + if (seconds < 0 && nanos > 0) { + val micros = Math.multiplyExact(seconds + 1, 1000000L) + val adjustment = (nanos / 1000L) - 1000000 + Math.addExact(micros, adjustment) + } else { + val micros = Math.multiplyExact(seconds, 1000000L) + Math.addExact(micros, nanos / 1000L) + } + } + @inline def microsToJodaInstant(microsFromEpoch: Long): joda.Instant = + joda.Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(microsFromEpoch)) + @inline def microsFromJodaInstant(instant: joda.Instant): Long = + TimeUnit.MILLISECONDS.toMicros(instant.getMillis) + + @inline def microsToJodaDateTime(microsFromEpoch: Long): joda.DateTime = + new joda.DateTime(TimeUnit.MICROSECONDS.toMillis(microsFromEpoch), joda.DateTimeZone.UTC) + @inline def microsFromJodaDateTime(dt: joda.DateTime): Long = + TimeUnit.MILLISECONDS.toMicros(dt.getMillis) + + @inline def microsToLocalTime(microsFromMidnight: Long): LocalTime = + LocalTime.ofNanoOfDay(TimeUnit.MICROSECONDS.toNanos(microsFromMidnight)) + @inline def microsFromLocalTime(lt: LocalTime): Long = + TimeUnit.NANOSECONDS.toMicros(lt.toNanoOfDay) + @inline def microsToJodaLocalTime(microsFromMidnight: Long): joda.LocalTime = + joda.LocalTime.fromMillisOfDay(TimeUnit.MICROSECONDS.toMillis(microsFromMidnight)) + @inline def microsFromJodaLocalTime(lt: joda.LocalTime): Long = + TimeUnit.MILLISECONDS.toMicros(lt.millisOfDay().get().toLong) + + @inline def microsToLocalDateTime(microsFromEpoch: Long): LocalDateTime = + LocalDateTime.ofInstant(microsToInstant(microsFromEpoch), ZoneOffset.UTC) + @inline def microsFromLocalDateTime(ldt: LocalDateTime): Long = + microsFromInstant(ldt.toInstant(ZoneOffset.UTC)) + @inline def microsToJodaLocalDateTime(microsFromEpoch: Long): joda.LocalDateTime = + new joda.LocalDateTime(TimeUnit.MICROSECONDS.toMillis(microsFromEpoch), joda.DateTimeZone.UTC) + @inline def microsFromJodaLocalDateTime(ldt: joda.LocalDateTime): Long = + TimeUnit.MILLISECONDS.toMicros(ldt.toDateTime(joda.DateTimeZone.UTC).getMillis) + + @inline def microsToDuration(micros: Long): Duration = + Duration.ofMillis(TimeUnit.MICROSECONDS.toMillis(micros)) + @inline def microsFromDuration(d: Duration): Long = + TimeUnit.SECONDS.toMicros(d.getSeconds) + TimeUnit.NANOSECONDS.toMicros(d.getNano.toLong) + @inline def microsToJodaDuration(micros: Long): joda.Duration = + joda.Duration.millis(TimeUnit.MICROSECONDS.toMillis(micros)) + @inline def microsFromJodaDuration(d: joda.Duration): Long = + TimeUnit.MILLISECONDS.toMicros(d.getMillis) + + // nanos ///////////////////////////////////////////////////// + // Long does not technically have enough range for Instant + @inline def nanosToInstant(epochNanos: Long): Instant = + Instant.ofEpochSecond(TimeUnit.NANOSECONDS.toSeconds(epochNanos), epochNanos % 1000000000L) + @inline def nanosFromInstant(instant: Instant): Long = + TimeUnit.SECONDS.toNanos(instant.getEpochSecond) + instant.getNano + @inline def nanosToJodaInstant(nanosFromEpoch: Long): joda.Instant = + joda.Instant.ofEpochMilli(TimeUnit.NANOSECONDS.toMillis(nanosFromEpoch)) + @inline def nanosFromJodaInstant(instant: joda.Instant): Long = + TimeUnit.MILLISECONDS.toNanos(instant.getMillis) + + @inline def nanosToJodaDateTime(nanosFromEpoch: Long): joda.DateTime = + new joda.DateTime(TimeUnit.NANOSECONDS.toMillis(nanosFromEpoch), joda.DateTimeZone.UTC) + @inline def nanosFromJodaDateTime(dt: joda.DateTime): Long = + TimeUnit.MILLISECONDS.toNanos(dt.getMillis) + + @inline def nanosToLocalTime(nanosFromMidnight: Long): LocalTime = + LocalTime.ofNanoOfDay(nanosFromMidnight) + @inline def nanosFromLocalTime(lt: LocalTime): Long = lt.toNanoOfDay + @inline def nanosToJodaLocalTime(nanosFromMidnight: Long): joda.LocalTime = + joda.LocalTime.fromMillisOfDay(TimeUnit.NANOSECONDS.toMillis(nanosFromMidnight)) + @inline def nanosFromJodaLocalTime(lt: joda.LocalTime): Long = + TimeUnit.MILLISECONDS.toNanos(lt.millisOfDay().get().toLong) + + @inline def nanosToLocalDateTime(nanosFromEpoch: Long): LocalDateTime = + LocalDateTime.ofInstant(nanosToInstant(nanosFromEpoch), ZoneOffset.UTC) + @inline def nanosFromLocalDateTime(ldt: LocalDateTime): Long = + nanosFromInstant(ldt.toInstant(ZoneOffset.UTC)) + @inline def nanosToJodaLocalDateTime(nanosFromEpoch: Long): joda.LocalDateTime = + new joda.LocalDateTime(TimeUnit.NANOSECONDS.toMillis(nanosFromEpoch), joda.DateTimeZone.UTC) + @inline def nanosFromJodaLocalDateTime(ldt: joda.LocalDateTime): Long = + TimeUnit.MILLISECONDS.toNanos(ldt.toDateTime(joda.DateTimeZone.UTC).getMillis) + + @inline def nanosToDuration(nanos: Long): Duration = + Duration.ofNanos(nanos) + @inline def nanosFromDuration(d: Duration): Long = + TimeUnit.SECONDS.toNanos(d.getSeconds) + d.getNano + @inline def nanosToJodaDuration(nanos: Long): joda.Duration = + joda.Duration.millis(TimeUnit.NANOSECONDS.toMillis(nanos)) + @inline def nanosFromJodaDuration(d: joda.Duration): Long = + TimeUnit.MILLISECONDS.toNanos(d.getMillis) +} diff --git a/shared/src/test/scala/magnolify/shared/TimeArbitrary.scala b/shared/src/test/scala/magnolify/shared/TimeArbitrary.scala new file mode 100644 index 000000000..892f7e651 --- /dev/null +++ b/shared/src/test/scala/magnolify/shared/TimeArbitrary.scala @@ -0,0 +1,63 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package magnolify.shared + +import org.joda.time as joda +import org.scalacheck.{Arbitrary, Gen} + +import java.time.{Duration, Instant, LocalDate, LocalDateTime, LocalTime, OffsetTime, ZoneOffset} + +trait TimeArbitrary { + implicit lazy val arbInstant: Arbitrary[Instant] = + Arbitrary(Gen.posNum[Long].map(Instant.ofEpochMilli)) + implicit lazy val arbLocalDate: Arbitrary[LocalDate] = + Arbitrary(Gen.chooseNum(0L, 365L * 100).map(LocalDate.ofEpochDay)) + implicit lazy val arbLocalTime: Arbitrary[LocalTime] = + Arbitrary(arbInstant.arbitrary.map(_.atZone(ZoneOffset.UTC).toLocalTime)) + implicit lazy val arbLocalDateTime: Arbitrary[LocalDateTime] = + Arbitrary(arbInstant.arbitrary.map(_.atZone(ZoneOffset.UTC).toLocalDateTime)) + implicit lazy val arbOffsetTime: Arbitrary[OffsetTime] = + Arbitrary(arbInstant.arbitrary.map(_.atOffset(ZoneOffset.UTC).toOffsetTime)) + implicit lazy val arbDuration: Arbitrary[Duration] = + Arbitrary(Gen.posNum[Long].map(Duration.ofMillis)) + + implicit val arbJodaDate: Arbitrary[joda.LocalDate] = Arbitrary { + Arbitrary.arbitrary[LocalDate].map { ld => + new joda.LocalDate(ld.getYear, ld.getMonthValue, ld.getDayOfMonth) + } + } + implicit val arbJodaDateTime: Arbitrary[joda.DateTime] = Arbitrary { + Arbitrary.arbitrary[Instant].map { i => + new joda.DateTime(i.toEpochMilli, joda.DateTimeZone.UTC) + } + } + implicit val arbJodaLocalTime: Arbitrary[joda.LocalTime] = Arbitrary { + Arbitrary.arbitrary[LocalTime].map { lt => + joda.LocalTime.fromMillisOfDay(lt.toNanoOfDay / 1000) + } + } + implicit val arbJodaLocalDateTime: Arbitrary[joda.LocalDateTime] = Arbitrary { + Arbitrary.arbitrary[LocalDateTime].map { ldt => + joda.LocalDateTime.parse(ldt.toString) + } + } + implicit val arbJodaDuration: Arbitrary[joda.Duration] = + Arbitrary(Gen.posNum[Long].map(joda.Duration.millis)) + implicit val arbJodaInstant: Arbitrary[joda.Instant] = + Arbitrary(Gen.posNum[Long].map(l => new joda.Instant(l))) + +} diff --git a/shared/src/test/scala/magnolify/shared/TimeSpec.scala b/shared/src/test/scala/magnolify/shared/TimeSpec.scala new file mode 100644 index 000000000..36c5a2fef --- /dev/null +++ b/shared/src/test/scala/magnolify/shared/TimeSpec.scala @@ -0,0 +1,141 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package magnolify.shared + +import org.joda.time as joda +import org.scalacheck.* +import org.scalacheck.Prop.forAll + +class TimeSpec extends Properties("Time") with TimeArbitrary { + import Time._ + + case class Convert[T, U: Arbitrary, V: Arbitrary]( + name: String, + javaTo: T => U, + javaFrom: U => T, + jodaTo: T => V, + jodaFrom: V => T + ) { + def java = + property(name) = forAll((u: U) => (javaFrom andThen javaTo)(u) == u) + def joda = + property(s"$name-joda") = forAll((v: V) => (jodaFrom andThen jodaTo)(v) == v) + def roundtrip = + property(s"$name-roundtrip") = + forAll((u: U) => (javaFrom andThen jodaTo andThen jodaFrom andThen javaTo)(u) == u) + } + + val conversions: List[Convert[?, ?, ?]] = List( + Convert( + "millis-instant", + millisToInstant, + millisFromInstant, + millisToJodaInstant, + millisFromJodaInstant + ), + Convert( + "millis-localtime", + millisToLocalTime, + millisFromLocalTime, + millisToJodaLocalTime, + millisFromJodaLocalTime + ), + Convert( + "millis-localdatetime", + millisToLocalDateTime, + millisFromLocalDateTime, + millisToJodaLocalDateTime, + millisFromJodaLocalDateTime + ), + Convert( + "millis-duration", + millisToDuration, + millisFromDuration, + millisToJodaDuration, + millisFromJodaDuration + ), + Convert( + "micros-instant", + microsToInstant, + microsFromInstant, + microsToJodaInstant, + microsFromJodaInstant + ), + Convert( + "micros-localtime", + microsToLocalTime, + microsFromLocalTime, + microsToJodaLocalTime, + microsFromJodaLocalTime + ), + Convert( + "micros-localdatetime", + microsToLocalDateTime, + microsFromLocalDateTime, + microsToJodaLocalDateTime, + microsFromJodaLocalDateTime + ), + Convert( + "micros-duration", + microsToDuration, + microsFromDuration, + microsToJodaDuration, + microsFromJodaDuration + ), + Convert( + "nanos-instant", + nanosToInstant, + nanosFromInstant, + nanosToJodaInstant, + nanosFromJodaInstant + ), + Convert( + "nanos-localtime", + nanosToLocalTime, + nanosFromLocalTime, + nanosToJodaLocalTime, + nanosFromJodaLocalTime + ), + Convert( + "nanos-localdatetime", + nanosToLocalDateTime, + nanosFromLocalDateTime, + nanosToJodaLocalDateTime, + nanosFromJodaLocalDateTime + ), + Convert( + "nanos-duration", + nanosToDuration, + nanosFromDuration, + nanosToJodaDuration, + nanosFromJodaDuration + ) + ) + + conversions.foreach { c => + c.java + c.joda + c.roundtrip + } + + property(s"millis-datetime-joda") = + forAll((v: joda.DateTime) => (millisFromJodaDateTime _ andThen millisToJodaDateTime)(v) == v) + property(s"micros-datetime-joda") = + forAll((v: joda.DateTime) => (microsFromJodaDateTime _ andThen microsToJodaDateTime)(v) == v) + property(s"nanos-datetime-joda") = + forAll((v: joda.DateTime) => (nanosFromJodaDateTime _ andThen nanosToJodaDateTime)(v) == v) +} diff --git a/test/src/test/scala/magnolify/test/ADT.scala b/test/src/test/scala/magnolify/test/ADT.scala index 084b54db7..bea9295a0 100644 --- a/test/src/test/scala/magnolify/test/ADT.scala +++ b/test/src/test/scala/magnolify/test/ADT.scala @@ -31,6 +31,9 @@ object ADT { case object Space extends Shape case class Point(x: Int, y: Int) extends Shape case class Circle(r: Int) extends Shape + sealed trait Quadrilateral extends Shape + case class Square(origin: Point, sideLength: Int) extends Quadrilateral + case class Rect(origin: Point, width: Int, length: Int) extends Quadrilateral @ScalaAnnotation("Color") sealed trait Color