diff --git a/src/main/java/com/snowflake/kafka/connector/records/RecordService.java b/src/main/java/com/snowflake/kafka/connector/records/RecordService.java index 70ef9130e..3154f0478 100644 --- a/src/main/java/com/snowflake/kafka/connector/records/RecordService.java +++ b/src/main/java/com/snowflake/kafka/connector/records/RecordService.java @@ -287,14 +287,7 @@ private Map getMapFromJsonNodeForStreamingIngest(JsonNode node) String columnName = columnNames.next(); JsonNode columnNode = node.get(columnName); Object columnValue; - if (columnNode.isArray()) { - List itemList = new ArrayList<>(); - ArrayNode arrayNode = (ArrayNode) columnNode; - for (JsonNode e : arrayNode) { - itemList.add(e.isTextual() ? e.textValue() : MAPPER.writeValueAsString(e)); - } - columnValue = itemList; - } else if (columnNode.isTextual()) { + if (columnNode.isTextual()) { columnValue = columnNode.textValue(); } else if (columnNode.isNull()) { columnValue = null; diff --git a/src/test/java/com/snowflake/kafka/connector/records/RecordContentTest.java b/src/test/java/com/snowflake/kafka/connector/records/RecordContentTest.java index 822463b67..22344d26a 100644 --- a/src/test/java/com/snowflake/kafka/connector/records/RecordContentTest.java +++ b/src/test/java/com/snowflake/kafka/connector/records/RecordContentTest.java @@ -258,6 +258,24 @@ public void testSchematizationStringField() throws JsonProcessingException { assert got.get("\"ANSWER\"").equals("42"); } + @Test + public void testSchematizationArrayOfObject() throws JsonProcessingException { + RecordService service = new RecordService(); + SnowflakeJsonConverter jsonConverter = new SnowflakeJsonConverter(); + + service.setEnableSchematization(true); + String value = "{\"players\":[{\"name\":\"John Doe\",\"age\":30},{\"name\":\"Jane Doe\",\"age\":30}]}"; + byte[] valueContents = (value).getBytes(StandardCharsets.UTF_8); + SchemaAndValue sv = jsonConverter.toConnectData(topic, valueContents); + + SinkRecord record = + new SinkRecord( + topic, partition, Schema.STRING_SCHEMA, "string", sv.schema(), sv.value(), partition); + + Map got = service.getProcessedRecordForStreamingIngest(record); + assert got.get("\"PLAYERS\"").equals("[{\"name\":\"John Doe\",\"age\":30},{\"name\":\"Jane Doe\",\"age\":30}]"); + } + @Test public void testColumnNameFormatting() throws JsonProcessingException { RecordService service = new RecordService();