Skip to content

Commit

Permalink
Reformat
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberdelia committed Oct 9, 2023
1 parent ee6a4d6 commit 48b1931
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ package com.lapanthere.flink.api.kotlin.api
import com.lapanthere.flink.api.kotlin.typeutils.createTypeInformation
import org.apache.flink.util.OutputTag

public inline fun <reified T : Any> OutputTag(id: String): OutputTag<T> =
OutputTag(id, createTypeInformation())
@Suppress("ktlint:standard:function-naming")
public inline fun <reified T : Any> OutputTag(id: String): OutputTag<T> = OutputTag(id, createTypeInformation())
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ internal class DataClassTypeComparator<T>(
}
}

override fun compare(first: T?, second: T?): Int {
override fun compare(
first: T?,
second: T?,
): Int {
keyPositions.zip(comparators).forEach { (position, comparator) ->
try {
val cmp = comparator.compare(first?.component(position), second?.component(position))
Expand All @@ -56,7 +59,12 @@ internal class DataClassTypeComparator<T>(
return 0
}

override fun putNormalizedKey(record: T?, target: MemorySegment, offset: Int, numBytes: Int) {
override fun putNormalizedKey(
record: T?,
target: MemorySegment,
offset: Int,
numBytes: Int,
) {
var localNumBytes = numBytes
var localOffset = offset
var i = 0
Expand All @@ -82,7 +90,11 @@ internal class DataClassTypeComparator<T>(
return comparator
}

override fun extractKeys(record: Any?, target: Array<out Any>, index: Int): Int {
override fun extractKeys(
record: Any?,
target: Array<out Any>,
index: Int,
): Int {
var localIndex = index
keyPositions.zip(comparators).forEach { (position, comparator) ->
localIndex += comparator.extractKeys(position, target, localIndex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,21 @@ public class DataClassTypeInformation<T : Any>(
kotlinFieldTypes: Array<TypeInformation<*>>,
private val kotlinFieldNames: Array<String>,
) : TupleTypeInfoBase<T>(klass, *kotlinFieldTypes) {
override fun toString(): String = buildString {
append(klass.simpleName)
if (types.isNotEmpty()) {
append(types.joinToString(", ", "<", ">"))
override fun toString(): String =
buildString {
append(klass.simpleName)
if (types.isNotEmpty()) {
append(types.joinToString(", ", "<", ">"))
}
}
}

public override fun equals(other: Any?): Boolean = when (other) {
is DataClassTypeInformation<*> -> other.canEqual(this) && super.equals(other) &&
genericParameters == other.genericParameters && fieldNames.contentEquals(other.fieldNames)
else -> false
}
public override fun equals(other: Any?): Boolean =
when (other) {
is DataClassTypeInformation<*> ->
other.canEqual(this) && super.equals(other) &&
genericParameters == other.genericParameters && fieldNames.contentEquals(other.fieldNames)
else -> false
}

public override fun canEqual(obj: Any): Boolean = obj is DataClassTypeInformation<*>

Expand All @@ -58,8 +61,9 @@ public class DataClassTypeInformation<T : Any>(
offset: Int,
result: MutableList<FlatFieldDescriptor>,
) {
val match = PATTERN_NESTED_FIELDS_WILDCARD.matchEntire(fieldExpression)
?: throw InvalidFieldReferenceException("""Invalid tuple field reference "$fieldExpression".""")
val match =
PATTERN_NESTED_FIELDS_WILDCARD.matchEntire(fieldExpression)
?: throw InvalidFieldReferenceException("""Invalid tuple field reference "$fieldExpression".""")
var field = match.groups[0]?.value!!
if (field == Keys.ExpressionKeys.SELECT_ALL_CHAR) {
var keyPosition = 0
Expand All @@ -82,7 +86,10 @@ public class DataClassTypeInformation<T : Any>(

val tail = match.groups[3]?.value
if (tail == null) {
fun extractFlatFields(index: Int, pos: Int) {
fun extractFlatFields(
index: Int,
pos: Int,
) {
if (index >= fieldNames.size) {
throw InvalidFieldReferenceException("""Unable to find field "$field" in type "$this".""")
} else if (field == fieldNames[index]) {
Expand All @@ -96,13 +103,18 @@ public class DataClassTypeInformation<T : Any>(
}
extractFlatFields(0, offset)
} else {
fun extractFlatFields(index: Int, pos: Int) {
fun extractFlatFields(
index: Int,
pos: Int,
) {
if (index >= fieldNames.size) {
throw InvalidFieldReferenceException("""Unable to find field "$field" in type "$this".""")
} else if (field == fieldNames[index]) {
when (val fieldType = fieldTypes[index]) {
is CompositeType<*> -> fieldType.getFlatFields(tail, pos, result)
else -> throw InvalidFieldReferenceException("""Nested field expression "$tail" not possible on atomic type "$fieldType".""")
else -> throw InvalidFieldReferenceException(
"""Nested field expression "$tail" not possible on atomic type "$fieldType".""",
)
}
} else {
extractFlatFields(index + 1, pos + fieldTypes[index].totalFields)
Expand All @@ -114,12 +126,13 @@ public class DataClassTypeInformation<T : Any>(
}

override fun <X : Any?> getTypeAt(fieldExpression: String): TypeInformation<X> {
val match = PATTERN_NESTED_FIELDS.matchEntire(fieldExpression)
?: if (fieldExpression.startsWith(Keys.ExpressionKeys.SELECT_ALL_CHAR)) {
throw InvalidFieldReferenceException("Wildcard expressions are not allowed here.")
} else {
throw InvalidFieldReferenceException("""Invalid format of data class field expression "$fieldExpression".""")
}
val match =
PATTERN_NESTED_FIELDS.matchEntire(fieldExpression)
?: if (fieldExpression.startsWith(Keys.ExpressionKeys.SELECT_ALL_CHAR)) {
throw InvalidFieldReferenceException("Wildcard expressions are not allowed here.")
} else {
throw InvalidFieldReferenceException("""Invalid format of data class field expression "$fieldExpression".""")
}

var field = match.groups[1]?.value!!
val tail = match.groups[3]?.value
Expand All @@ -135,7 +148,9 @@ public class DataClassTypeInformation<T : Any>(
} else {
when (fieldType) {
is CompositeType<*> -> fieldType.getTypeAt(i)
else -> throw InvalidFieldReferenceException("""Nested field expression "$tail" not possible on atomic type "$fieldType".""")
else -> throw InvalidFieldReferenceException(
"""Nested field expression "$tail" not possible on atomic type "$fieldType".""",
)
}
}
}
Expand All @@ -151,15 +166,19 @@ public class DataClassTypeInformation<T : Any>(

override fun initializeTypeComparatorBuilder(size: Int) {}

override fun addComparatorField(fieldId: Int, comparator: TypeComparator<*>) {
override fun addComparatorField(
fieldId: Int,
comparator: TypeComparator<*>,
) {
fieldComparators += comparator
logicalKeyFields += fieldId
}

override fun createTypeComparator(config: ExecutionConfig): TypeComparator<T> = DataClassTypeComparator(
logicalKeyFields.toIntArray(),
fieldComparators.toTypedArray(),
types.take(logicalKeyFields.max() + 1).map { it.createSerializer(config) }.toTypedArray(),
)
override fun createTypeComparator(config: ExecutionConfig): TypeComparator<T> =
DataClassTypeComparator(
logicalKeyFields.toIntArray(),
fieldComparators.toTypedArray(),
types.take(logicalKeyFields.max() + 1).map { it.createSerializer(config) }.toTypedArray(),
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,47 +12,59 @@ public class DataClassTypeSerializer<T : Any>(
type: Class<T>?,
fieldSerializers: Array<TypeSerializer<*>>,
) : TupleSerializerBase<T>(type, fieldSerializers) {
override fun duplicate(): TypeSerializer<T> = DataClassTypeSerializer(
tupleClass,
fieldSerializers.map { it.duplicate() }.toTypedArray(),
)

override fun createInstance(fields: Array<out Any?>): T? = try {
tupleClass.kotlin.primaryConstructor?.call(*fields)
} catch (e: Throwable) {
null
}
override fun duplicate(): TypeSerializer<T> =
DataClassTypeSerializer(
tupleClass,
fieldSerializers.map { it.duplicate() }.toTypedArray(),
)

override fun createInstance(fields: Array<out Any?>): T? =
try {
tupleClass.kotlin.primaryConstructor?.call(*fields)
} catch (e: Throwable) {
null
}

override fun createInstance(): T? =
createInstance(fieldSerializers.map { it.createInstance() }.toTypedArray())
override fun createInstance(): T? = createInstance(fieldSerializers.map { it.createInstance() }.toTypedArray())

override fun deserialize(source: DataInputView): T? =
createInstance(fieldSerializers.map { it.deserialize(source) }.toTypedArray())
override fun deserialize(source: DataInputView): T? = createInstance(fieldSerializers.map { it.deserialize(source) }.toTypedArray())

override fun snapshotConfiguration(): TypeSerializerSnapshot<T> = DataClassTypeSerializerSnapshot(this)

override fun createOrReuseInstance(fields: Array<out Any>, reuse: T): T? = createInstance(fields)
override fun createOrReuseInstance(
fields: Array<out Any>,
reuse: T,
): T? = createInstance(fields)

override fun deserialize(reuse: T?, source: DataInputView): T? = deserialize(source)
override fun deserialize(
reuse: T?,
source: DataInputView,
): T? = deserialize(source)

override fun serialize(record: T?, target: DataOutputView) {
override fun serialize(
record: T?,
target: DataOutputView,
) {
fieldSerializers.forEachIndexed { i, serializer ->
serializer.serialize(record?.component(i), target)
}
}

override fun copy(from: T?, reuse: T): T? = copy(from)
override fun copy(
from: T?,
reuse: T,
): T? = copy(from)

override fun copy(from: T?): T? = if (from == null) {
null
} else {
createInstance(
fieldSerializers.mapIndexed { i, serializer ->
serializer.copy(from.component(i))
}.toTypedArray(),
)
}
override fun copy(from: T?): T? =
if (from == null) {
null
} else {
createInstance(
fieldSerializers.mapIndexed { i, serializer ->
serializer.copy(from.component(i))
}.toTypedArray(),
)
}
}

internal fun <T : Any> T.component(i: Int): Any? =
this::class.functions.first { it.name == "component${i + 1}" }.call(this)
internal fun <T : Any> T.component(i: Int): Any? = this::class.functions.first { it.name == "component${i + 1}" }.call(this)
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,4 @@ public fun createTypeInformation(type: KType): TypeInformation<*> {
}

@Suppress("UNCHECKED_CAST")
public inline fun <reified T : Any> createTypeInformation(): TypeInformation<T> =
createTypeInformation(typeOf<T>()) as TypeInformation<T>
public inline fun <reified T : Any> createTypeInformation(): TypeInformation<T> = createTypeInformation(typeOf<T>()) as TypeInformation<T>
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ internal class DataClassTypeComparatorTest : ComparatorTestBase<DataClass>() {

override fun createSerializer(): TypeSerializer<DataClass> = type.createSerializer(ExecutionConfig())

override fun getSortedTestData(): Array<DataClass> = arrayOf(
DataClass("abc", 1, Nested("abc")),
DataClass("def", 2, Nested("def")),
DataClass("xyz", 3, Nested("xyz")),
)
override fun getSortedTestData(): Array<DataClass> =
arrayOf(
DataClass("abc", 1, Nested("abc")),
DataClass("def", 2, Nested("def")),
DataClass("xyz", 3, Nested("xyz")),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package com.lapanthere.flink.api.kotlin.typeutils
import org.apache.flink.api.common.typeutils.TypeInformationTestBase

internal class DataClassTypeInformationTest : TypeInformationTestBase<DataClassTypeInformation<*>>() {
override fun getTestData(): Array<DataClassTypeInformation<*>> = arrayOf(
createTypeInformation<DataClass>() as DataClassTypeInformation<DataClass>,
createTypeInformation<Order>() as DataClassTypeInformation<Order>,
createTypeInformation<ParameterizedClass<Int>>() as DataClassTypeInformation<ParameterizedClass<Int>>,
createTypeInformation<Pair<String, Int>>() as DataClassTypeInformation<Pair<String, Int>>,
createTypeInformation<Triple<String, String, Int>>() as DataClassTypeInformation<Triple<String, String, Int>>,
)
override fun getTestData(): Array<DataClassTypeInformation<*>> =
arrayOf(
createTypeInformation<DataClass>() as DataClassTypeInformation<DataClass>,
createTypeInformation<Order>() as DataClassTypeInformation<Order>,
createTypeInformation<ParameterizedClass<Int>>() as DataClassTypeInformation<ParameterizedClass<Int>>,
createTypeInformation<Pair<String, Int>>() as DataClassTypeInformation<Pair<String, Int>>,
createTypeInformation<Triple<String, String, Int>>() as DataClassTypeInformation<Triple<String, String, Int>>,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer
internal abstract class AbstractDataClassTypeSerializerTest<T : Any> : SerializerTestBase<T>() {
protected abstract val typeInformation: TypeInformation<T>

override fun createSerializer(): TypeSerializer<T> =
typeInformation.createSerializer(ExecutionConfig())
override fun createSerializer(): TypeSerializer<T> = typeInformation.createSerializer(ExecutionConfig())

override fun getLength(): Int = -1

Expand All @@ -19,44 +18,49 @@ internal abstract class AbstractDataClassTypeSerializerTest<T : Any> : Serialize
internal class DataClassTypeSerializerTest : AbstractDataClassTypeSerializerTest<DataClass>() {
override val typeInformation: TypeInformation<DataClass> = createTypeInformation()

override fun getTestData(): Array<DataClass> = arrayOf(
DataClass("string", 1, Nested("string")),
DataClass("string", 123, Nested("123")),
)
override fun getTestData(): Array<DataClass> =
arrayOf(
DataClass("string", 1, Nested("string")),
DataClass("string", 123, Nested("123")),
)
}

internal class ParameterizedTypeSerializerTest : AbstractDataClassTypeSerializerTest<ParameterizedClass<Int>>() {
override val typeInformation: TypeInformation<ParameterizedClass<Int>> = createTypeInformation()

override fun getTestData(): Array<ParameterizedClass<Int>> = arrayOf(
ParameterizedClass("string", 1),
ParameterizedClass("string", 4),
)
override fun getTestData(): Array<ParameterizedClass<Int>> =
arrayOf(
ParameterizedClass("string", 1),
ParameterizedClass("string", 4),
)
}

internal class OrderTypeSerializerTest : AbstractDataClassTypeSerializerTest<Order>() {
override val typeInformation: TypeInformation<Order> = createTypeInformation()

override fun getTestData(): Array<Order> = arrayOf(
Order(Purchase(2.0), Purchase(1.0)),
Order(Purchase(20.0), Purchase(15.0)),
)
override fun getTestData(): Array<Order> =
arrayOf(
Order(Purchase(2.0), Purchase(1.0)),
Order(Purchase(20.0), Purchase(15.0)),
)
}

internal class PairTypeSerializerTest : AbstractDataClassTypeSerializerTest<Pair<String, Int>>() {
override val typeInformation: TypeInformation<Pair<String, Int>> = createTypeInformation()

override fun getTestData(): Array<Pair<String, Int>> = arrayOf(
Pair("Hello", 1),
Pair("World", 2),
)
override fun getTestData(): Array<Pair<String, Int>> =
arrayOf(
Pair("Hello", 1),
Pair("World", 2),
)
}

internal class TripleTypeSerializerTest : AbstractDataClassTypeSerializerTest<Triple<String, String, Int>>() {
override val typeInformation: TypeInformation<Triple<String, String, Int>> = createTypeInformation()

override fun getTestData(): Array<Triple<String, String, Int>> = arrayOf(
Triple("Hello", "World", 1),
Triple("Super", "Mario", 2),
)
override fun getTestData(): Array<Triple<String, String, Int>> =
arrayOf(
Triple("Hello", "World", 1),
Triple("Super", "Mario", 2),
)
}
Loading

0 comments on commit 48b1931

Please sign in to comment.