Skip to content

Commit

Permalink
Map based converter for flat model (#847)
Browse files Browse the repository at this point in the history
* Map based converter for flat model

When object is not a tree, use a map of qualified keys to values
as converter source.
Filtering keys when cursor cursor selects field removes need to wrap the
return type into a Value

* Keep bigtable values as columns

* Add missing 2.12 import
  • Loading branch information
RustedBones authored Oct 24, 2023
1 parent 9ff1381 commit 44c044b
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 193 deletions.
141 changes: 39 additions & 102 deletions bigtable/src/main/scala/magnolify/bigtable/BigtableType.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,26 @@

package magnolify.bigtable

import java.nio.ByteBuffer
import java.util.UUID
import com.google.bigtable.v2.{Cell, Column, Family, Mutation, Row}
import com.google.bigtable.v2.Mutation.SetCell
import com.google.bigtable.v2.*
import com.google.protobuf.ByteString
import magnolia1._
import magnolify.shared._
import magnolify.shims._
import magnolia1.*
import magnolify.shared.*
import magnolify.shims.*

import java.nio.ByteBuffer
import java.util.UUID
import scala.annotation.implicitNotFound
import scala.jdk.CollectionConverters._
import scala.collection.compat._
import scala.jdk.CollectionConverters.*
import scala.collection.compat.*

sealed trait BigtableType[T] extends Converter[T, java.util.List[Column], Seq[SetCell.Builder]] {
sealed trait BigtableType[T] extends Converter[T, Map[String, Column], Seq[SetCell.Builder]] {
def apply(v: Row, columnFamily: String): T =
from(
v.getFamiliesList.asScala
.find(_.getName == columnFamily)
.map(_.getColumnsList)
.getOrElse(java.util.Collections.emptyList())
.map(_.getColumnsList.asScala.map(c => c.getQualifier.toStringUtf8 -> c).toMap)
.getOrElse(Map.empty)
)
def apply(v: T, columnFamily: String, timestampMicros: Long = 0L): Seq[Mutation] =
to(v).map { b =>
Expand All @@ -53,7 +53,7 @@ object BigtableType {
case r: BigtableField.Record[_] =>
new BigtableType[T] {
private val caseMapper: CaseMapper = cm
override def from(xs: java.util.List[Column]): T = r.get(xs, null)(caseMapper).get
override def from(xs: Map[String, Column]): T = r.get(xs, null)(caseMapper)
override def to(v: T): Seq[SetCell.Builder] = r.put(null, v)(caseMapper)
}
case _ =>
Expand All @@ -79,7 +79,7 @@ object BigtableType {
)
.build()
}
.toSeq
.toSeq // keep for java 2.12
Family.newBuilder().setName(familyName).addAllColumns(columns.asJava).build()
}
.toSeq
Expand All @@ -105,11 +105,20 @@ object BigtableType {
}

sealed trait BigtableField[T] extends Serializable {
def get(xs: java.util.List[Column], k: String)(cm: CaseMapper): Value[T]
def get(xs: Map[String, Column], k: String)(cm: CaseMapper): T
def put(k: String, v: T)(cm: CaseMapper): Seq[SetCell.Builder]
}

object BigtableField {

private def key(prefix: String, label: String): String =
if (prefix == null) label else s"$prefix.$label"

private def columnFilter(key: String): (String, Column) => Boolean = {
val recordKey = key + "."
(name: String, _: Column) => name == key || name.startsWith(recordKey)
}

sealed trait Record[T] extends BigtableField[T]

sealed trait Primitive[T] extends BigtableField[T] {
Expand All @@ -119,10 +128,8 @@ object BigtableField {

private def columnQualifier(k: String): ByteString = ByteString.copyFromUtf8(k)

override def get(xs: java.util.List[Column], k: String)(cm: CaseMapper): Value[T] = {
val v = Columns.find(xs, k)
if (v == null) Value.None else Value.Some(fromByteString(v.getCells(0).getValue))
}
override def get(xs: Map[String, Column], k: String)(cm: CaseMapper): T =
fromByteString(xs(k).getCells(0).getValue)

override def put(k: String, v: T)(cm: CaseMapper): Seq[SetCell.Builder] =
Seq(
Expand All @@ -142,34 +149,28 @@ object BigtableField {
val p = caseClass.parameters.head
val tc = p.typeclass
new BigtableField[T] {
override def get(xs: java.util.List[Column], k: String)(cm: CaseMapper): Value[T] =
tc.get(xs, k)(cm).map(x => caseClass.construct(_ => x))
override def get(xs: Map[String, Column], k: String)(cm: CaseMapper): T =
caseClass.construct(_ => tc.get(xs, k)(cm))
override def put(k: String, v: T)(cm: CaseMapper): Seq[SetCell.Builder] =
p.typeclass.put(k, p.dereference(v))(cm)
}
} else {
new Record[T] {
private def key(prefix: String, label: String): String =
if (prefix == null) label else s"$prefix.$label"

override def get(xs: java.util.List[Column], k: String)(cm: CaseMapper): Value[T] = {
var fallback = true
val r = caseClass.construct { p =>
val cq = key(k, cm.map(p.label))
val v = p.typeclass.get(xs, cq)(cm)
if (v.isSome) {
fallback = false
}
v.getOrElse(p.default)
override def get(xs: Map[String, Column], k: String)(cm: CaseMapper): T = {
caseClass.construct { p =>
val qualifier = key(k, cm.map(p.label))
val columns = xs.filter(columnFilter(qualifier).tupled)
// consider default value only if all fields are missing
p.default
.filter(_ => columns.isEmpty)
.getOrElse(p.typeclass.get(columns, qualifier)(cm))
}
// result is default if all fields are default
if (fallback) Value.Default(r) else Value.Some(r)
}

override def put(k: String, v: T)(cm: CaseMapper): Seq[SetCell.Builder] =
caseClass.parameters.flatMap(p =>
caseClass.parameters.flatMap { p =>
p.typeclass.put(key(k, cm.map(p.label)), p.dereference(v))(cm)
)
}
}
}
}
Expand Down Expand Up @@ -255,8 +256,8 @@ object BigtableField {

implicit def btfOption[T](implicit btf: BigtableField[T]): BigtableField[Option[T]] =
new BigtableField[Option[T]] {
override def get(xs: java.util.List[Column], k: String)(cm: CaseMapper): Value[Option[T]] =
Columns.findNullable(xs, k).map(btf.get(_, k)(cm).toOption).getOrElse(Value.Default(None))
override def get(xs: Map[String, Column], k: String)(cm: CaseMapper): Option[T] =
if (xs.isEmpty) None else Some(btf.get(xs, k)(cm))

override def put(k: String, v: Option[T])(cm: CaseMapper): Seq[SetCell.Builder] =
v.toSeq.flatMap(btf.put(k, _)(cm))
Expand Down Expand Up @@ -314,67 +315,3 @@ object BigtableField {
}
}
}

private object Columns {
private def find(
xs: java.util.List[Column],
columnQualifier: String,
matchPrefix: Boolean
): (Int, Int, Boolean) = {
val cq = ByteString.copyFromUtf8(columnQualifier)
val pre = if (matchPrefix) ByteString.copyFromUtf8(s"$columnQualifier.") else ByteString.EMPTY
var low = 0
var high = xs.size()
var idx = -1
var isNested = false
while (idx == -1 && low < high) {
val mid = (high + low) / 2
val current = xs.get(mid).getQualifier
if (matchPrefix && current.startsWith(pre)) {
idx = mid
isNested = true
} else {
val c = ByteStringComparator.INSTANCE.compare(current, cq)
if (c < 0) {
low = mid + 1
} else if (c == 0) {
idx = mid
low = mid + 1
} else {
high = mid
}
}
}

if (isNested) {
low = idx - 1
while (low >= 0 && xs.get(low).getQualifier.startsWith(pre)) {
low -= 1
}
high = idx + 1
while (high < xs.size() && xs.get(high).getQualifier.startsWith(pre)) {
high += 1
}
(low + 1, high, isNested)
} else {
(idx, idx, isNested)
}
}

def find(xs: java.util.List[Column], columnQualifier: String): Column = {
val (idx, _, _) = find(xs, columnQualifier, false)
if (idx == -1) null else xs.get(idx)
}

def findNullable(
xs: java.util.List[Column],
columnQualifier: String
): Option[java.util.List[Column]] = {
val (low, high, isNested) = find(xs, columnQualifier, true)
if (isNested) {
Some(xs.subList(low, high))
} else {
if (low == -1) None else Some(java.util.Collections.singletonList(xs.get(low)))
}
}
}
43 changes: 0 additions & 43 deletions shared/src/main/scala/magnolify/shared/Converter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,46 +20,3 @@ trait Converter[T, Reader, Writer] extends Serializable {
def from(v: Reader): T
def to(v: T): Writer
}

// Represent a value from an external source.
sealed trait Value[+T] {
def get: T = this match {
case Value.Some(v) => v
case Value.Default(v) => v
case Value.None => throw new NoSuchElementException
}

def isSome: Boolean = this.isInstanceOf[Value.Some[_]]
def isEmpty: Boolean = this eq Value.None

def map[U](f: T => U): Value[U] = this match {
case Value.Some(x) => Value.Some(f(x))
case Value.Default(x) => Value.Default(f(x))
case Value.None => Value.None
}

def getOrElse[U](fallback: Option[U])(implicit ev: T <:< U): U = (this, fallback) match {
case (Value.Some(x), _) => x
case (Value.Default(_), Some(x)) => x
case (Value.Default(x), None) => x
case (Value.None, Some(x)) => x
case _ => throw new NoSuchElementException
}

def toOption: Value[Option[T]] = this match {
case Value.Some(v) => Value.Some(Some(v))
case Value.Default(v) => Value.Default(Some(v))
case Value.None => Value.Default(None)
}
}

object Value {
// Value from the external source, e.g. Avro, BigQuery
case class Some[T](value: T) extends Value[T]

// Value from the case class default
case class Default[T](value: T) extends Value[T]

// Value missing from both the external source and the case class default
case object None extends Value[Nothing]
}
Loading

0 comments on commit 44c044b

Please sign in to comment.