Skip to content

Commit

Permalink
Add support for Beam schemas (#1027)
Browse files Browse the repository at this point in the history
  • Loading branch information
kellen authored Sep 17, 2024
1 parent b27bf21 commit a3708ba
Show file tree
Hide file tree
Showing 19 changed files with 1,283 additions and 214 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ jobs:

- name: Make target directories
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main')
run: mkdir -p bom/target refined/target shared/target tensorflow/target parquet/target tools/target protobuf/target jmh/target bigquery/target avro/target scalacheck/target datastore/target neo4j/target cats/target bigtable/target guava/target project/target
run: mkdir -p bom/target refined/target shared/target tensorflow/target parquet/target tools/target protobuf/target jmh/target bigquery/target avro/target scalacheck/target beam/target datastore/target neo4j/target cats/target bigtable/target guava/target project/target

- name: Compress target directories
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main')
run: tar cf targets.tar bom/target refined/target shared/target tensorflow/target parquet/target tools/target protobuf/target jmh/target bigquery/target avro/target scalacheck/target datastore/target neo4j/target cats/target bigtable/target guava/target project/target
run: tar cf targets.tar bom/target refined/target shared/target tensorflow/target parquet/target tools/target protobuf/target jmh/target bigquery/target avro/target scalacheck/target beam/target datastore/target neo4j/target cats/target bigtable/target guava/target project/target

- name: Upload target directories
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main')
Expand Down
98 changes: 26 additions & 72 deletions avro/src/main/scala/magnolify/avro/logical/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,109 +22,63 @@ import org.joda.{time => joda}

import java.time._
import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder}
import java.util.concurrent.TimeUnit

package object logical {
import magnolify.shared.Time._
// Duplicate implementation from org.apache.avro.data.TimeConversions
// to support both 1.8 (joda-time based) and 1.9+ (java-time based)
object micros {
private def toTimestampMicros(microsFromEpoch: Long): Instant = {
val epochSeconds = microsFromEpoch / 1000000L
val nanoAdjustment = (microsFromEpoch % 1000000L) * 1000L;
Instant.ofEpochSecond(epochSeconds, nanoAdjustment)
}

private def fromTimestampMicros(instant: Instant): Long = {
val seconds = instant.getEpochSecond
val nanos = instant.getNano
if (seconds < 0 && nanos > 0) {
val micros = Math.multiplyExact(seconds + 1, 1000000L)
val adjustment = (nanos / 1000L) - 1000000
Math.addExact(micros, adjustment)
} else {
val micros = Math.multiplyExact(seconds, 1000000L)
Math.addExact(micros, nanos / 1000L)
}
}

implicit val afTimestampMicros: AvroField[Instant] =
AvroField.logicalType[Long](LogicalTypes.timestampMicros())(toTimestampMicros)(
fromTimestampMicros
AvroField.logicalType[Long](LogicalTypes.timestampMicros())(microsToInstant)(
microsFromInstant
)

implicit val afTimeMicros: AvroField[LocalTime] =
AvroField.logicalType[Long](LogicalTypes.timeMicros()) { us =>
LocalTime.ofNanoOfDay(TimeUnit.MICROSECONDS.toNanos(us))
} { time =>
TimeUnit.NANOSECONDS.toMicros(time.toNanoOfDay)
}
AvroField.logicalType[Long](LogicalTypes.timeMicros())(microsToLocalTime)(microsFromLocalTime)

// `LogicalTypes.localTimestampMicros()` is Avro 1.10
implicit val afLocalTimestampMicros: AvroField[LocalDateTime] =
AvroField.logicalType[Long](new LogicalType("local-timestamp-micros")) { microsFromEpoch =>
val instant = toTimestampMicros(microsFromEpoch)
LocalDateTime.ofInstant(instant, ZoneOffset.UTC)
} { timestamp =>
val instant = timestamp.toInstant(ZoneOffset.UTC)
fromTimestampMicros(instant)
}
AvroField.logicalType[Long](new LogicalType("local-timestamp-micros"))(microsToLocalDateTime)(
microsFromLocalDateTime
)

// avro 1.8 uses joda-time
implicit val afJodaTimestampMicros: AvroField[joda.DateTime] =
AvroField.logicalType[Long](LogicalTypes.timestampMicros()) { microsFromEpoch =>
new joda.DateTime(microsFromEpoch / 1000, joda.DateTimeZone.UTC)
} { timestamp =>
1000 * timestamp.getMillis
}
AvroField.logicalType[Long](LogicalTypes.timestampMicros())(microsToJodaDateTime)(
microsFromJodaDateTime
)

implicit val afJodaTimeMicros: AvroField[joda.LocalTime] =
AvroField.logicalType[Long](LogicalTypes.timeMicros()) { microsFromMidnight =>
joda.LocalTime.fromMillisOfDay(microsFromMidnight / 1000)
} { time =>
// from LossyTimeMicrosConversion
1000L * time.millisOfDay().get()
}
AvroField.logicalType[Long](LogicalTypes.timeMicros())(microsToJodaLocalTime)(
microsFromJodaLocalTime
)
}

object millis {
implicit val afTimestampMillis: AvroField[Instant] =
AvroField.logicalType[Long](LogicalTypes.timestampMillis()) { millisFromEpoch =>
Instant.ofEpochMilli(millisFromEpoch)
} { timestamp =>
timestamp.toEpochMilli
}
AvroField.logicalType[Long](LogicalTypes.timestampMillis())(millisToInstant)(
millisFromInstant
)

implicit val afTimeMillis: AvroField[LocalTime] =
AvroField.logicalType[Int](LogicalTypes.timeMillis()) { millisFromMidnight =>
LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos(millisFromMidnight.toLong))
} { time =>
TimeUnit.NANOSECONDS.toMillis(time.toNanoOfDay).toInt
}
AvroField.logicalType[Int](LogicalTypes.timeMillis())(millisToLocalTime)(millisFromLocalTime)

// `LogicalTypes.localTimestampMillis` is Avro 1.10.0+
implicit val afLocalTimestampMillis: AvroField[LocalDateTime] =
AvroField.logicalType[Long](new LogicalType("local-timestamp-millis")) { millisFromEpoch =>
val instant = Instant.ofEpochMilli(millisFromEpoch)
LocalDateTime.ofInstant(instant, ZoneOffset.UTC)
} { timestamp =>
val instant = timestamp.toInstant(ZoneOffset.UTC)
instant.toEpochMilli
}
AvroField.logicalType[Long](new LogicalType("local-timestamp-millis"))(millisToLocalDateTime)(
millisFromLocalDateTime
)

// avro 1.8 uses joda-time
implicit val afJodaTimestampMillis: AvroField[joda.DateTime] =
AvroField.logicalType[Long](LogicalTypes.timestampMillis()) { millisFromEpoch =>
new joda.DateTime(millisFromEpoch, joda.DateTimeZone.UTC)
} { timestamp =>
timestamp.getMillis
}
AvroField.logicalType[Long](LogicalTypes.timestampMillis())(millisToJodaDateTime)(
millisFromJodaDateTime
)

implicit val afJodaTimeMillis: AvroField[joda.LocalTime] =
AvroField.logicalType[Int](LogicalTypes.timeMillis()) { millisFromMidnight =>
joda.LocalTime.fromMillisOfDay(millisFromMidnight.toLong)
} { time =>
time.millisOfDay().get()
}
AvroField.logicalType[Int](LogicalTypes.timeMillis())(millisToJodaLocalTime)(
millisFromJodaLocalTime
)
}

object bigquery {
Expand Down
Loading

0 comments on commit a3708ba

Please sign in to comment.