From 44c044bd573e35427451885512b805c0edb51757 Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Tue, 24 Oct 2023 16:38:13 +0200 Subject: [PATCH] Map based converter for flat model (#847) * Map based converter for flat model When object is not a tree, use a map of qualified keys to values as converter source. Filtering keys when cursor selects field removes need to wrap the return type into a Value * Keep bigtable values as columns * Add missing 2.12 import --- .../magnolify/bigtable/BigtableType.scala | 141 +++++------------- .../scala/magnolify/shared/Converter.scala | 43 ------ .../magnolify/tensorflow/ExampleType.scala | 84 +++++------ 3 files changed, 75 insertions(+), 193 deletions(-) diff --git a/bigtable/src/main/scala/magnolify/bigtable/BigtableType.scala b/bigtable/src/main/scala/magnolify/bigtable/BigtableType.scala index 9575304e6..b630611d7 100644 --- a/bigtable/src/main/scala/magnolify/bigtable/BigtableType.scala +++ b/bigtable/src/main/scala/magnolify/bigtable/BigtableType.scala @@ -16,26 +16,26 @@ package magnolify.bigtable -import java.nio.ByteBuffer -import java.util.UUID -import com.google.bigtable.v2.{Cell, Column, Family, Mutation, Row} import com.google.bigtable.v2.Mutation.SetCell +import com.google.bigtable.v2.* import com.google.protobuf.ByteString -import magnolia1._ -import magnolify.shared._ -import magnolify.shims._ +import magnolia1.* +import magnolify.shared.* +import magnolify.shims.* +import java.nio.ByteBuffer +import java.util.UUID import scala.annotation.implicitNotFound -import scala.jdk.CollectionConverters._ -import scala.collection.compat._ +import scala.jdk.CollectionConverters.* +import scala.collection.compat.* -sealed trait BigtableType[T] extends Converter[T, java.util.List[Column], Seq[SetCell.Builder]] { +sealed trait BigtableType[T] extends Converter[T, Map[String, Column], Seq[SetCell.Builder]] { def apply(v: Row, columnFamily: String): T = from( v.getFamiliesList.asScala .find(_.getName == columnFamily) - 
.map(_.getColumnsList) - .getOrElse(java.util.Collections.emptyList()) + .map(_.getColumnsList.asScala.map(c => c.getQualifier.toStringUtf8 -> c).toMap) + .getOrElse(Map.empty) ) def apply(v: T, columnFamily: String, timestampMicros: Long = 0L): Seq[Mutation] = to(v).map { b => @@ -53,7 +53,7 @@ object BigtableType { case r: BigtableField.Record[_] => new BigtableType[T] { private val caseMapper: CaseMapper = cm - override def from(xs: java.util.List[Column]): T = r.get(xs, null)(caseMapper).get + override def from(xs: Map[String, Column]): T = r.get(xs, null)(caseMapper) override def to(v: T): Seq[SetCell.Builder] = r.put(null, v)(caseMapper) } case _ => @@ -79,7 +79,7 @@ object BigtableType { ) .build() } - .toSeq + .toSeq // keep for java 2.12 Family.newBuilder().setName(familyName).addAllColumns(columns.asJava).build() } .toSeq @@ -105,11 +105,20 @@ object BigtableType { } sealed trait BigtableField[T] extends Serializable { - def get(xs: java.util.List[Column], k: String)(cm: CaseMapper): Value[T] + def get(xs: Map[String, Column], k: String)(cm: CaseMapper): T def put(k: String, v: T)(cm: CaseMapper): Seq[SetCell.Builder] } object BigtableField { + + private def key(prefix: String, label: String): String = + if (prefix == null) label else s"$prefix.$label" + + private def columnFilter(key: String): (String, Column) => Boolean = { + val recordKey = key + "." 
+ (name: String, _: Column) => name == key || name.startsWith(recordKey) + } + sealed trait Record[T] extends BigtableField[T] sealed trait Primitive[T] extends BigtableField[T] { @@ -119,10 +128,8 @@ object BigtableField { private def columnQualifier(k: String): ByteString = ByteString.copyFromUtf8(k) - override def get(xs: java.util.List[Column], k: String)(cm: CaseMapper): Value[T] = { - val v = Columns.find(xs, k) - if (v == null) Value.None else Value.Some(fromByteString(v.getCells(0).getValue)) - } + override def get(xs: Map[String, Column], k: String)(cm: CaseMapper): T = + fromByteString(xs(k).getCells(0).getValue) override def put(k: String, v: T)(cm: CaseMapper): Seq[SetCell.Builder] = Seq( @@ -142,34 +149,28 @@ object BigtableField { val p = caseClass.parameters.head val tc = p.typeclass new BigtableField[T] { - override def get(xs: java.util.List[Column], k: String)(cm: CaseMapper): Value[T] = - tc.get(xs, k)(cm).map(x => caseClass.construct(_ => x)) + override def get(xs: Map[String, Column], k: String)(cm: CaseMapper): T = + caseClass.construct(_ => tc.get(xs, k)(cm)) override def put(k: String, v: T)(cm: CaseMapper): Seq[SetCell.Builder] = p.typeclass.put(k, p.dereference(v))(cm) } } else { new Record[T] { - private def key(prefix: String, label: String): String = - if (prefix == null) label else s"$prefix.$label" - - override def get(xs: java.util.List[Column], k: String)(cm: CaseMapper): Value[T] = { - var fallback = true - val r = caseClass.construct { p => - val cq = key(k, cm.map(p.label)) - val v = p.typeclass.get(xs, cq)(cm) - if (v.isSome) { - fallback = false - } - v.getOrElse(p.default) + override def get(xs: Map[String, Column], k: String)(cm: CaseMapper): T = { + caseClass.construct { p => + val qualifier = key(k, cm.map(p.label)) + val columns = xs.filter(columnFilter(qualifier).tupled) + // consider default value only if all fields are missing + p.default + .filter(_ => columns.isEmpty) + .getOrElse(p.typeclass.get(columns, 
qualifier)(cm)) } - // result is default if all fields are default - if (fallback) Value.Default(r) else Value.Some(r) } override def put(k: String, v: T)(cm: CaseMapper): Seq[SetCell.Builder] = - caseClass.parameters.flatMap(p => + caseClass.parameters.flatMap { p => p.typeclass.put(key(k, cm.map(p.label)), p.dereference(v))(cm) - ) + } } } } @@ -255,8 +256,8 @@ object BigtableField { implicit def btfOption[T](implicit btf: BigtableField[T]): BigtableField[Option[T]] = new BigtableField[Option[T]] { - override def get(xs: java.util.List[Column], k: String)(cm: CaseMapper): Value[Option[T]] = - Columns.findNullable(xs, k).map(btf.get(_, k)(cm).toOption).getOrElse(Value.Default(None)) + override def get(xs: Map[String, Column], k: String)(cm: CaseMapper): Option[T] = + if (xs.isEmpty) None else Some(btf.get(xs, k)(cm)) override def put(k: String, v: Option[T])(cm: CaseMapper): Seq[SetCell.Builder] = v.toSeq.flatMap(btf.put(k, _)(cm)) @@ -314,67 +315,3 @@ object BigtableField { } } } - -private object Columns { - private def find( - xs: java.util.List[Column], - columnQualifier: String, - matchPrefix: Boolean - ): (Int, Int, Boolean) = { - val cq = ByteString.copyFromUtf8(columnQualifier) - val pre = if (matchPrefix) ByteString.copyFromUtf8(s"$columnQualifier.") else ByteString.EMPTY - var low = 0 - var high = xs.size() - var idx = -1 - var isNested = false - while (idx == -1 && low < high) { - val mid = (high + low) / 2 - val current = xs.get(mid).getQualifier - if (matchPrefix && current.startsWith(pre)) { - idx = mid - isNested = true - } else { - val c = ByteStringComparator.INSTANCE.compare(current, cq) - if (c < 0) { - low = mid + 1 - } else if (c == 0) { - idx = mid - low = mid + 1 - } else { - high = mid - } - } - } - - if (isNested) { - low = idx - 1 - while (low >= 0 && xs.get(low).getQualifier.startsWith(pre)) { - low -= 1 - } - high = idx + 1 - while (high < xs.size() && xs.get(high).getQualifier.startsWith(pre)) { - high += 1 - } - (low + 1, high, 
isNested) - } else { - (idx, idx, isNested) - } - } - - def find(xs: java.util.List[Column], columnQualifier: String): Column = { - val (idx, _, _) = find(xs, columnQualifier, false) - if (idx == -1) null else xs.get(idx) - } - - def findNullable( - xs: java.util.List[Column], - columnQualifier: String - ): Option[java.util.List[Column]] = { - val (low, high, isNested) = find(xs, columnQualifier, true) - if (isNested) { - Some(xs.subList(low, high)) - } else { - if (low == -1) None else Some(java.util.Collections.singletonList(xs.get(low))) - } - } -} diff --git a/shared/src/main/scala/magnolify/shared/Converter.scala b/shared/src/main/scala/magnolify/shared/Converter.scala index 6e1df1088..e0808f192 100644 --- a/shared/src/main/scala/magnolify/shared/Converter.scala +++ b/shared/src/main/scala/magnolify/shared/Converter.scala @@ -20,46 +20,3 @@ trait Converter[T, Reader, Writer] extends Serializable { def from(v: Reader): T def to(v: T): Writer } - -// Represent a value from an external source. -sealed trait Value[+T] { - def get: T = this match { - case Value.Some(v) => v - case Value.Default(v) => v - case Value.None => throw new NoSuchElementException - } - - def isSome: Boolean = this.isInstanceOf[Value.Some[_]] - def isEmpty: Boolean = this eq Value.None - - def map[U](f: T => U): Value[U] = this match { - case Value.Some(x) => Value.Some(f(x)) - case Value.Default(x) => Value.Default(f(x)) - case Value.None => Value.None - } - - def getOrElse[U](fallback: Option[U])(implicit ev: T <:< U): U = (this, fallback) match { - case (Value.Some(x), _) => x - case (Value.Default(_), Some(x)) => x - case (Value.Default(x), None) => x - case (Value.None, Some(x)) => x - case _ => throw new NoSuchElementException - } - - def toOption: Value[Option[T]] = this match { - case Value.Some(v) => Value.Some(Some(v)) - case Value.Default(v) => Value.Default(Some(v)) - case Value.None => Value.Default(None) - } -} - -object Value { - // Value from the external source, e.g. 
Avro, BigQuery - case class Some[T](value: T) extends Value[T] - - // Value from the case class default - case class Default[T](value: T) extends Value[T] - - // Value missing from both the external source and the case class default - case object None extends Value[Nothing] -} diff --git a/tensorflow/src/main/scala/magnolify/tensorflow/ExampleType.scala b/tensorflow/src/main/scala/magnolify/tensorflow/ExampleType.scala index 2a9559cb1..34e413c0d 100644 --- a/tensorflow/src/main/scala/magnolify/tensorflow/ExampleType.scala +++ b/tensorflow/src/main/scala/magnolify/tensorflow/ExampleType.scala @@ -16,26 +16,26 @@ package magnolify.tensorflow -import java.{lang => jl, util => ju} - import com.google.protobuf.ByteString -import magnolia1._ -import magnolify.shared._ +import magnolia1.* +import magnolify.shared.* import magnolify.shims.FactoryCompat -import org.tensorflow.metadata.v0.{Annotation, Feature => FeatureSchema, FeatureType, Schema} -import org.tensorflow.proto.example._ +import org.tensorflow.metadata.v0.{Annotation, Feature as FeatureSchema, FeatureType, Schema} +import org.tensorflow.proto.example.* + +import java.{lang as jl, util as ju} import scala.annotation.{implicitNotFound, StaticAnnotation} import scala.collection.concurrent -import scala.jdk.CollectionConverters._ -import scala.collection.compat._ +import scala.jdk.CollectionConverters.* +import scala.collection.compat.* class doc(msg: String) extends StaticAnnotation with Serializable { override def toString: String = msg } -sealed trait ExampleType[T] extends Converter[T, Example, Example.Builder] { +sealed trait ExampleType[T] extends Converter[T, Map[String, Feature], Example.Builder] { val schema: Schema - def apply(v: Example): T = from(v) + def apply(v: Example): T = from(v.getFeatures.getFeatureMap.asScala.toMap) def apply(v: T): Example = to(v).build() } @@ -46,7 +46,8 @@ object ExampleType { case r: ExampleField.Record[_] => new ExampleType[T] { @transient override lazy val schema: Schema 
= r.schema(cm) - override def from(v: Example): T = r.get(v.getFeatures, null)(cm).get + override def from(v: Map[String, Feature]): T = + r.get(v, null)(cm) override def to(v: T): Example.Builder = Example.newBuilder().setFeatures(r.put(Features.newBuilder(), null, v)(cm)) } @@ -63,26 +64,26 @@ sealed trait ExampleField[T] extends Serializable { def schema(cm: CaseMapper): Schema = schemaCache.getOrElseUpdate(cm.uuid, buildSchema(cm)) - def get(f: Features, k: String)(cm: CaseMapper): Value[T] + def get(fs: Map[String, Feature], k: String)(cm: CaseMapper): T def put(f: Features.Builder, k: String, v: T)(cm: CaseMapper): Features.Builder } object ExampleField { + private def key(prefix: String, label: String): String = + if (prefix == null) label else s"$prefix.$label" + + private def featureFilter(key: String): (String, Feature) => Boolean = { + val recordKey = key + "." + (name: String, _: Feature) => name == key || name.startsWith(recordKey) + } + sealed trait Primitive[T] extends ExampleField[T] { type ValueT def fromFeature(v: Feature): ju.List[T] def toFeature(v: Iterable[T]): Feature - override def get(f: Features, k: String)(cm: CaseMapper): Value[T] = { - val feature = f.getFeatureOrDefault(k, null) - if (feature == null) { - Value.None - } else { - val values = fromFeature(feature) - require(values.size() == 1) - Value.Some(values.get(0)) - } - } + override def get(fs: Map[String, Feature], k: String)(cm: CaseMapper): T = + fromFeature(fs(k)).ensuring(_.size() == 1).get(0) override def put(f: Features.Builder, k: String, v: T)(cm: CaseMapper): Features.Builder = f.putFeature(k, toFeature(Iterable(v))) @@ -106,28 +107,22 @@ object ExampleField { new ExampleField[T] { override protected def buildSchema(cm: CaseMapper): Schema = tc.buildSchema(cm) - override def get(f: Features, k: String)(cm: CaseMapper): Value[T] = - tc.get(f, k)(cm).map(x => caseClass.construct(_ => x)) + override def get(fs: Map[String, Feature], k: String)(cm: CaseMapper): T = + 
caseClass.construct(_ => tc.get(fs, k)(cm)) override def put(f: Features.Builder, k: String, v: T)(cm: CaseMapper): Features.Builder = tc.put(f, k, p.dereference(v))(cm) } } else { new Record[T] { - private def key(prefix: String, label: String): String = - if (prefix == null) label else s"$prefix.$label" - - override def get(f: Features, k: String)(cm: CaseMapper): Value[T] = { - var fallback = true - val r = caseClass.construct { p => + override def get(fs: Map[String, Feature], k: String)(cm: CaseMapper): T = { + caseClass.construct { p => val fieldKey = key(k, cm.map(p.label)) - val fieldValue = p.typeclass.get(f, fieldKey)(cm) - if (fieldValue.isSome) { - fallback = false - } - fieldValue.getOrElse(p.default) + val fieldsFeatures = fs.filter(featureFilter(fieldKey).tupled) + // consider default value only if all fields are missing + p.default + .filter(_ => fieldsFeatures.isEmpty) + .getOrElse(p.typeclass.get(fieldsFeatures, fieldKey)(cm)) } - // result is default if all fields are default - if (fallback) Value.Default(r) else Value.Some(r) } override def put(f: Features.Builder, k: String, v: T)(cm: CaseMapper): Features.Builder = @@ -259,12 +254,8 @@ object ExampleField { implicit def efOption[T](implicit ef: ExampleField[T]): ExampleField[Option[T]] = new ExampleField[Option[T]] { - override def get(f: Features, k: String)(cm: CaseMapper): Value[Option[T]] = - if (f.containsFeature(k) || f.getFeatureMap.keySet().asScala.exists(_.startsWith(s"$k."))) { - ef.get(f, k)(cm).toOption - } else { - Value.Default(None) - } + override def get(fs: Map[String, Feature], k: String)(cm: CaseMapper): Option[T] = + if (fs.isEmpty) None else Some(ef.get(fs, k)(cm)) override def put(f: Features.Builder, k: String, v: Option[T])( cm: CaseMapper @@ -282,11 +273,8 @@ object ExampleField { ti: C[T] => Iterable[T], fc: FactoryCompat[T, C[T]] ): ExampleField[C[T]] = new ExampleField[C[T]] { - override def get(f: Features, k: String)(cm: CaseMapper): Value[C[T]] = { - val v = 
f.getFeatureOrDefault(k, null) - if (v == null) Value.Default(fc.newBuilder.result()) - else Value.Some(fc.fromSpecific(ef.fromFeature(v).asScala)) - } + override def get(fs: Map[String, Feature], k: String)(cm: CaseMapper): C[T] = + if (fs.isEmpty) fc.newBuilder.result() else fc.fromSpecific(ef.fromFeature(fs(k)).asScala) override def put(f: Features.Builder, k: String, v: C[T])(cm: CaseMapper): Features.Builder = if (v.isEmpty) f else f.putFeature(k, ef.toFeature(v))