diff --git a/src/main/java/org/apache/pulsar/ecosystem/io/lakehouse/SinkConnectorConfig.java b/src/main/java/org/apache/pulsar/ecosystem/io/lakehouse/SinkConnectorConfig.java
index 59943c1..625075c 100644
--- a/src/main/java/org/apache/pulsar/ecosystem/io/lakehouse/SinkConnectorConfig.java
+++ b/src/main/java/org/apache/pulsar/ecosystem/io/lakehouse/SinkConnectorConfig.java
@@ -97,6 +97,12 @@ public abstract class SinkConnectorConfig implements Serializable {
     )
     List<String> partitionColumns = Collections.emptyList();
 
+    @FieldContext(
+        category = CATEGORY_SINK,
+        doc = "Override the default field name used for primitive schema messages. Default is 'message'"
+    )
+    String overrideFieldName = "";
+
     static SinkConnectorConfig load(Map<String, Object> map) throws IOException, IncorrectParameterException {
         properties.putAll(map);
         String type = (String) map.get("type");
diff --git a/src/main/java/org/apache/pulsar/ecosystem/io/lakehouse/sink/PrimitiveFactory.java b/src/main/java/org/apache/pulsar/ecosystem/io/lakehouse/sink/PrimitiveFactory.java
new file mode 100644
index 0000000..abf47c8
--- /dev/null
+++ b/src/main/java/org/apache/pulsar/ecosystem/io/lakehouse/sink/PrimitiveFactory.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.ecosystem.io.lakehouse.sink;
+
+import java.nio.ByteBuffer;
+import org.apache.avro.Schema;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * PrimitiveFactory provides a way to get the proper {@link PulsarObject} for the given primitive schema type.
+ */
+public class PrimitiveFactory {
+
+    public static PulsarObject getPulsarPrimitiveObject(SchemaType schemaType, Object value,
+                                                        String overrideFieldName) {
+        PulsarObject object;
+        switch (schemaType) {
+            case BYTES:
+                object = new PulsarObject<>(ByteBuffer.wrap((byte[]) value), Schema.create(Schema.Type.BYTES));
+                break;
+            case STRING:
+                object = new PulsarObject<>((String) value, Schema.create(Schema.Type.STRING));
+                break;
+            default:
+                throw new RuntimeException("Failed to build pulsar object, the given type '" + schemaType + "' "
+                    + "is not supported yet.");
+        }
+        if (StringUtils.isNotEmpty(overrideFieldName)) {
+            object.overrideFieldName(overrideFieldName);
+        }
+        return object;
+    }
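+
+    // Illustrative usage (not part of the connector API; assumes the defaults above):
+    //   PulsarObject obj = PrimitiveFactory.getPulsarPrimitiveObject(SchemaType.STRING, "hello", "");
+    //   GenericRecord rec = obj.getRecord();  // Avro record with two fields: "message" and "uuid"
+    //   Schema schema = obj.getSchema();      // record schema { message: string, uuid: string }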
+}
diff --git a/src/main/java/org/apache/pulsar/ecosystem/io/lakehouse/sink/PulsarObject.java b/src/main/java/org/apache/pulsar/ecosystem/io/lakehouse/sink/PulsarObject.java
new file mode 100644
index 0000000..11732d6
--- /dev/null
+++ b/src/main/java/org/apache/pulsar/ecosystem/io/lakehouse/sink/PulsarObject.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.ecosystem.io.lakehouse.sink;
+
+import java.util.UUID;
+import lombok.EqualsAndHashCode;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+
+/**
+ * PulsarObject represents a Pulsar message with a primitive schema.
+ * The object is serialized with Avro and then written to the lakehouse table.
+ *
+ * @param <T> the type of the wrapped value.
+ */
+@EqualsAndHashCode
+public class PulsarObject<T> {
+
+    private static String defaultFieldname = "message";
+    private final Schema valueSchema;
+    T value;
+    String uuid;
+
+    public PulsarObject(T value, Schema schema) {
+        this.value = value;
+        valueSchema = schema;
+        this.uuid = UUID.randomUUID().toString();
+    }
+
+    private PulsarObject(T value, Schema schema, String uuid) {
+        this.value = value;
+        valueSchema = schema;
+        this.uuid = uuid;
+    }
+
+    public static void overrideFieldName(String fieldName) {
+        defaultFieldname = fieldName;
+    }
+
+    public Schema getSchema() {
+        return SchemaBuilder.record("PulsarObject")
+            .fields()
+            .name(defaultFieldname).type(valueSchema).noDefault()
+            .name("uuid").type(Schema.create(Schema.Type.STRING)).noDefault()
+            .endRecord();
+    }
+
+    public GenericRecord getRecord() {
+        GenericRecord record = new GenericData.Record(getSchema());
+        record.put(defaultFieldname, value);
+        record.put("uuid", uuid);
+        return record;
+    }
+
+    public static PulsarObject fromGenericRecord(GenericRecord record) {
+        if (!record.hasField(defaultFieldname) || !record.hasField("uuid")) {
+            throw new RuntimeException("Unexpected record when parsing to the PulsarObject");
+        }
+        return new PulsarObject(record.get(defaultFieldname),
+            record.getSchema().getField(defaultFieldname).schema(),
+            record.get("uuid").toString());
+    }
+}
diff --git a/src/main/java/org/apache/pulsar/ecosystem/io/lakehouse/sink/SinkWriter.java b/src/main/java/org/apache/pulsar/ecosystem/io/lakehouse/sink/SinkWriter.java
index f472ea9..2b1bc39 100644
--- a/src/main/java/org/apache/pulsar/ecosystem/io/lakehouse/sink/SinkWriter.java
+++ b/src/main/java/org/apache/pulsar/ecosystem/io/lakehouse/sink/SinkWriter.java
@@ -175,8 +175,14 @@ public Optional<GenericRecord> convertToAvroGenericData(PulsarSinkRecord record,
                     .jsonDecoder(schema, record.getNativeObject().toString());
                 return Optional.of(datumReader.read(null, decoder));
             default:
-                log.error("not support this kind of schema: {}", record.getSchemaType());
-                return Optional.empty();
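+                // Primitive schema types (such as BYTES and STRING) are wrapped into a synthetic
+                // PulsarObject Avro record; schema types the factory cannot handle still fail and
+                // are logged below.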
+                try {
+                    GenericRecord gr = PrimitiveFactory.getPulsarPrimitiveObject(record.getSchemaType(),
+                        record.getNativeObject(), sinkConnectorConfig.getOverrideFieldName()).getRecord();
+                    return Optional.of(gr);
+                } catch (Exception e) {
+                    log.error("Unsupported schema type: {}", record.getSchemaType(), e);
+                    return Optional.empty();
+                }
         }
     }
diff --git a/src/test/java/org/apache/pulsar/ecosystem/io/lakehouse/SinkConnectorConfigTest.java b/src/test/java/org/apache/pulsar/ecosystem/io/lakehouse/SinkConnectorConfigTest.java
new file mode 100644
index 0000000..57f29b7
--- /dev/null
+++ b/src/test/java/org/apache/pulsar/ecosystem/io/lakehouse/SinkConnectorConfigTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.ecosystem.io.lakehouse;
+
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.fail;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pulsar.ecosystem.io.lakehouse.exception.IncorrectParameterException;
+import org.testng.annotations.Test;
+
+public class SinkConnectorConfigTest {
+
+    @Test
+    public void testDefaultValues() throws Exception {
+        Map<String, Object> properties = new HashMap<>();
+        properties.put("type", "hudi");
+        SinkConnectorConfig config = SinkConnectorConfig.load(properties);
+        assertEquals(SinkConnectorConfig.DEFAULT_MAX_COMMIT_INTERVAL, config.getMaxCommitInterval());
+        assertEquals(SinkConnectorConfig.DEFAULT_MAX_RECORDS_PER_COMMIT, config.getMaxRecordsPerCommit());
+        assertEquals(SinkConnectorConfig.DEFAULT_MAX_COMMIT_FAILED_TIMES, config.getMaxCommitFailedTimes());
+        assertEquals(SinkConnectorConfig.DEFAULT_SINK_CONNECTOR_QUEUE_SIZE, config.getSinkConnectorQueueSize());
+        assertEquals(Collections.emptyList(), config.getPartitionColumns());
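+        // an empty overrideFieldName keeps the default "message" field for primitive messages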
+ } +} diff --git a/src/test/java/org/apache/pulsar/ecosystem/io/lakehouse/sink/PrimitiveFactoryTest.java b/src/test/java/org/apache/pulsar/ecosystem/io/lakehouse/sink/PrimitiveFactoryTest.java new file mode 100644 index 0000000..b453681 --- /dev/null +++ b/src/test/java/org/apache/pulsar/ecosystem/io/lakehouse/sink/PrimitiveFactoryTest.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.ecosystem.io.lakehouse.sink; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import org.apache.avro.Schema; +import org.apache.pulsar.common.schema.SchemaType; +import org.testng.annotations.Test; + +public class PrimitiveFactoryTest { + + @Test(expectedExceptions = RuntimeException.class) + public void testUnsupportedType() { + PrimitiveFactory.getPulsarPrimitiveObject(SchemaType.AVRO, null, ""); + } + + @Test + public void testPrimitiveBytes() { + byte[] message = "test".getBytes(StandardCharsets.UTF_8); + PulsarObject object = PrimitiveFactory.getPulsarPrimitiveObject(SchemaType.BYTES, message, ""); + Object value = object.getRecord().get("message"); + assertTrue(value instanceof ByteBuffer); + assertEquals(Schema.Type.BYTES, object.getSchema().getField("message").schema().getType()); + ByteBuffer byteBufferValue = (ByteBuffer) value; + assertEquals(byteBufferValue.array(), message); + } + + @Test + public void testPrimitiveString() { + String message = "test"; + PulsarObject object = PrimitiveFactory.getPulsarPrimitiveObject(SchemaType.STRING, message, ""); + Object value = object.getRecord().get("message"); + assertTrue(value instanceof String); + assertEquals(Schema.Type.STRING, object.getSchema().getField("message").schema().getType()); + assertEquals(value.toString(), message); + } +} diff --git a/src/test/java/org/apache/pulsar/ecosystem/io/lakehouse/sink/delta/DeltaWriterTest.java b/src/test/java/org/apache/pulsar/ecosystem/io/lakehouse/sink/delta/DeltaWriterTest.java index d87500f..f205347 100644 --- a/src/test/java/org/apache/pulsar/ecosystem/io/lakehouse/sink/delta/DeltaWriterTest.java +++ b/src/test/java/org/apache/pulsar/ecosystem/io/lakehouse/sink/delta/DeltaWriterTest.java @@ -26,12 +26,14 @@ import io.delta.standalone.actions.AddFile; import io.delta.standalone.types.StructType; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -43,6 
+}
diff --git a/src/test/java/org/apache/pulsar/ecosystem/io/lakehouse/sink/delta/DeltaWriterTest.java b/src/test/java/org/apache/pulsar/ecosystem/io/lakehouse/sink/delta/DeltaWriterTest.java
index d87500f..f205347 100644
--- a/src/test/java/org/apache/pulsar/ecosystem/io/lakehouse/sink/delta/DeltaWriterTest.java
+++ b/src/test/java/org/apache/pulsar/ecosystem/io/lakehouse/sink/delta/DeltaWriterTest.java
@@ -26,12 +26,14 @@
 import io.delta.standalone.actions.AddFile;
 import io.delta.standalone.types.StructType;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -43,6 +45,8 @@
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.ecosystem.io.lakehouse.common.SchemaConverter;
 import org.apache.pulsar.ecosystem.io.lakehouse.parquet.DeltaParquetWriter;
+import org.apache.pulsar.ecosystem.io.lakehouse.sink.PrimitiveFactory;
+import org.apache.pulsar.ecosystem.io.lakehouse.sink.PulsarObject;
 import org.apache.pulsar.ecosystem.io.lakehouse.sink.SinkConnectorUtils;
 import org.apache.pulsar.functions.api.Record;
 import org.testng.annotations.AfterMethod;
@@ -138,6 +142,47 @@ public void testCreateDeltaTable() {
         assertEquals(deltaLog.snapshot().getMetadata().getPartitionColumns().size(), 0);
     }
 
+    @Test
+    public void testWriteBytesToNonPartitionedDeltaTable() {
+        final int maxNumber = 10;
+        List<PulsarObject<?>> writeSet = new ArrayList<>(maxNumber);
+        for (int i = 0; i < maxNumber; i++) {
+            String message = "message-" + i;
+            byte[] value = message.getBytes(StandardCharsets.UTF_8);
+            PulsarObject<?> obj = PrimitiveFactory.getPulsarPrimitiveObject(SchemaType.BYTES, value, "");
+            writeSet.add(obj);
+        }
+        DeltaWriter writer = new DeltaWriter(config, writeSet.get(0).getSchema());
+        try {
+            for (PulsarObject<?> pulsarObject : writeSet) {
+                writer.writeAvroRecord(pulsarObject.getRecord());
+            }
+
+            List<DeltaParquetWriter.FileStat> fileStats = writer.getWriter().closeAndFlush();
+            writer.commitFiles(fileStats);
+            writer.close();
+
+            DeltaLog deltaLog = writer.getDeltaLog();
+
+            // validate current snapshot
+            Snapshot snapshot = deltaLog.snapshot();
+            assertEquals(snapshot.getVersion(), 1);
+            assertEquals(snapshot.getAllFiles().size(), 1);
+
+            AddFile addFile = snapshot.getAllFiles().get(0);
+            DeltaParquetWriter.FileStat fileStat = fileStats.get(0);
+            assertEquals(addFile.getPath(), fileStat.getFilePath());
+            assertEquals(addFile.getPartitionValues(), fileStat.getPartitionValues());
+            assertEquals(addFile.getSize(), fileStat.getFileSize().longValue());
+
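+            // DeltaWriter records COMMIT_INFO plus the Delta Standalone version as the commit's
+            // engine info, so the data commit (version 1) is expected to report exactly this value.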
", e); + fail(); + } + } + @Test public void testWriteNonPartitionedDeltaTable() { DeltaWriter writer = new DeltaWriter(config, schema); diff --git a/src/test/java/org/apache/pulsar/ecosystem/io/lakehouse/sink/hudi/HoodieWriterTest.java b/src/test/java/org/apache/pulsar/ecosystem/io/lakehouse/sink/hudi/HoodieWriterTest.java index eb9d251..0b2e01c 100644 --- a/src/test/java/org/apache/pulsar/ecosystem/io/lakehouse/sink/hudi/HoodieWriterTest.java +++ b/src/test/java/org/apache/pulsar/ecosystem/io/lakehouse/sink/hudi/HoodieWriterTest.java @@ -20,11 +20,13 @@ import java.io.IOException; import java.net.URI; +import java.nio.charset.StandardCharsets; import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.security.SecureRandom; +import java.util.ArrayList; import java.util.Collection; import java.util.LinkedList; import java.util.List; @@ -49,8 +51,11 @@ import org.apache.hudi.common.config.LockConfiguration; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; +import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.ecosystem.io.lakehouse.SinkConnectorConfig; import org.apache.pulsar.ecosystem.io.lakehouse.common.Utils; +import org.apache.pulsar.ecosystem.io.lakehouse.sink.PrimitiveFactory; +import org.apache.pulsar.ecosystem.io.lakehouse.sink.PulsarObject; import org.intellij.lang.annotations.Language; import org.testng.Assert; import org.testng.annotations.BeforeMethod; @@ -117,6 +122,54 @@ private Optional setupFileSystem(String storage, HoodieWriter hoodie return Optional.empty(); } + @Test + public void testHoodieWritePulsarPrimitiveTypeMessages() throws Exception { + final SinkConnectorConfig sinkConnectorConfig = sinkConfig; + sinkConnectorConfig.getProperties().remove("hoodie.datasource.write.recordkey.field"); + sinkConnectorConfig.setProperty("hoodie.datasource.write.partitionpath.field", "uuid"); + sinkConnectorConfig.setProperty("hoodie.datasource.write.recordkey.field", "uuid"); + final int maxNumber = 10; + List> writeSet = new ArrayList<>(maxNumber); + for (int i = 0; i < maxNumber; i++) { + String message = "message-" + i; + byte[] value = message.getBytes(StandardCharsets.UTF_8); + PulsarObject obj = PrimitiveFactory.getPulsarPrimitiveObject(SchemaType.BYTES, value, ""); + writeSet.add(obj); + } + + HoodieWriter hoodieWriter = new HoodieWriter(sinkConnectorConfig, writeSet.get(0).getSchema()); + Configuration hadoopConf = hoodieWriter.writer.getContext().getHadoopConf().get(); + + for (PulsarObject testDatum : writeSet) { + hoodieWriter.writeAvroRecord(testDatum.getRecord()); + } + + hoodieWriter.flush(); + List readSet = getCommittedFiles(testPath, STORAGE_LOCAL, Optional.empty()) + .map(p -> { + try { + return readRecordsFromFile(p, hadoopConf); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .flatMap(Collection::stream) + .map(PulsarObject::fromGenericRecord) + .collect(Collectors.toList()); + + Assert.assertEquals(readSet.size(), writeSet.size()); + for (PulsarObject byteBufferPulsarObject : writeSet) { + System.out.println(byteBufferPulsarObject.hashCode()); + } + + for (PulsarObject object : readSet) { + System.out.println(object.hashCode()); + } + Assert.assertTrue(writeSet.removeAll(readSet)); + Assert.assertEquals(writeSet.size(), 0); + hoodieWriter.close(); + } + @Test(dataProvider = "storage", timeOut = 10 * 60 * 1000) public void testHoodieWriteAndRead(String storage) throws Exception { 
+        List<PulsarObject> readSet = getCommittedFiles(testPath, STORAGE_LOCAL, Optional.empty())
+            .map(p -> {
+                try {
+                    return readRecordsFromFile(p, hadoopConf);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            })
+            .flatMap(Collection::stream)
+            .map(PulsarObject::fromGenericRecord)
+            .collect(Collectors.toList());
+
+        Assert.assertEquals(readSet.size(), writeSet.size());
+        for (PulsarObject<?> byteBufferPulsarObject : writeSet) {
+            System.out.println(byteBufferPulsarObject.hashCode());
+        }
+
+        for (PulsarObject object : readSet) {
+            System.out.println(object.hashCode());
+        }
+        Assert.assertTrue(writeSet.removeAll(readSet));
+        Assert.assertEquals(writeSet.size(), 0);
+        hoodieWriter.close();
+    }
+
     @Test(dataProvider = "storage", timeOut = 10 * 60 * 1000)
     public void testHoodieWriteAndRead(String storage) throws Exception {
         setCloudProperties(storage);