Skip to content

Commit

Permalink
[Enhancement] Ignore union type tag when converting avro to json
Browse files Browse the repository at this point in the history
Signed-off-by: wyb <[email protected]>
  • Loading branch information
wyb committed Nov 19, 2024
1 parent f28f98a commit ecf1eb6
Show file tree
Hide file tree
Showing 4 changed files with 669 additions and 40 deletions.
4 changes: 4 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1512,4 +1512,8 @@ CONF_mInt32(batch_write_rpc_request_retry_interval_ms, "500");
CONF_mInt32(batch_write_rpc_reqeust_timeout_ms, "10000");
CONF_mInt32(batch_write_poll_load_status_interval_ms, "200");
CONF_mBool(batch_write_trace_log_enable, "false");

// ignore union type tag in avro kafka routine load
CONF_mBool(avro_ignore_union_type_tag, "false");

} // namespace starrocks::config
275 changes: 259 additions & 16 deletions be/src/formats/avro/binary_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@

#include "binary_column.h"

#include <rapidjson/document.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>

#include "column/binary_column.h"
#include "column/json_column.h"
#include "common/status.h"
Expand Down Expand Up @@ -199,15 +203,238 @@ static Status add_column_with_boolean_value(BinaryColumn* column, const TypeDesc
return Status::OK();
}

static Status avro_value_to_rapidjson(const avro_value_t& value, rapidjson::Document::AllocatorType& allocator,
rapidjson::Value& out) {
switch (avro_value_get_type(&value)) {
case AVRO_STRING: {
const char* in;
size_t size;
if (avro_value_get_string(&value, &in, &size) != 0) {
return Status::InvalidArgument(strings::Substitute("Get string value error $0", avro_strerror()));
}
out.SetString(in, allocator);
return Status::OK();
}
case AVRO_BYTES: {
const char* in;
size_t size;
if (avro_value_get_fixed(&value, (const void**)&in, &size) != 0) {
return Status::InvalidArgument(strings::Substitute("Get string value error $0", avro_strerror()));
}
out.SetString(in, allocator);
return Status::OK();
}
case AVRO_INT32: {
int32_t in;
if (avro_value_get_int(&value, &in) != 0) {
return Status::InvalidArgument(strings::Substitute("Get int32 value error $0", avro_strerror()));
}
out.SetInt(in);
return Status::OK();
}
case AVRO_INT64: {
int64_t in;
if (avro_value_get_long(&value, &in) != 0) {
return Status::InvalidArgument(strings::Substitute("Get int64 value error $0", avro_strerror()));
}
out.SetInt64(in);
return Status::OK();
}
case AVRO_FLOAT: {
float in;
if (avro_value_get_float(&value, &in) != 0) {
return Status::InvalidArgument(strings::Substitute("Get float value error $0", avro_strerror()));
}
out.SetFloat(in);
return Status::OK();
}
case AVRO_DOUBLE: {
double in;
if (avro_value_get_double(&value, &in) != 0) {
return Status::InvalidArgument(strings::Substitute("Get double value error $0", avro_strerror()));
}
out.SetDouble(in);
return Status::OK();
}
case AVRO_BOOLEAN: {
int in;
if (avro_value_get_boolean(&value, &in) != 0) {
return Status::InvalidArgument(strings::Substitute("Get boolean value error $0", avro_strerror()));
}
out.SetBool(in);
return Status::OK();
}
case AVRO_NULL: {
out.SetNull();
return Status::OK();
}
case AVRO_RECORD: {
size_t field_count = 0;
if (avro_value_get_size(&value, &field_count) != 0) {
return Status::InvalidArgument(strings::Substitute("Get record field count error $0", avro_strerror()));
}

out.SetObject();
for (size_t i = 0; i < field_count; ++i) {
avro_value_t field_value;
const char* field_name;
if (avro_value_get_by_index(&value, i, &field_value, &field_name) != 0) {
return Status::InvalidArgument(strings::Substitute("Get record field error $0", avro_strerror()));
}

rapidjson::Value field_name_val;
field_name_val.SetString(field_name, allocator);
rapidjson::Value field_value_val;
RETURN_IF_ERROR(avro_value_to_rapidjson(field_value, allocator, field_value_val));
out.AddMember(field_name_val, field_value_val, allocator);
}
return Status::OK();
}
case AVRO_ENUM: {
avro_schema_t enum_schema;
int symbol_value;
if (avro_value_get_enum(&value, &symbol_value) != 0) {
return Status::InvalidArgument(strings::Substitute("Get enum value error $0", avro_strerror()));
}

enum_schema = avro_value_get_schema(&value);
const char* symbol_name;
symbol_name = avro_schema_enum_get(enum_schema, symbol_value);
out.SetString(symbol_name, allocator);
return Status::OK();
}
case AVRO_FIXED: {
const char* in;
size_t size;
if (avro_value_get_fixed(&value, (const void**)&in, &size) != 0) {
return Status::InvalidArgument(strings::Substitute("Get fixed value error $0", avro_strerror()));
}
out.SetString(in, allocator);
return Status::OK();
}
case AVRO_MAP: {
size_t map_size = 0;
if (avro_value_get_size(&value, &map_size) != 0) {
return Status::InvalidArgument(strings::Substitute("Get map size error $0", avro_strerror()));
}

out.SetObject();
for (int i = 0; i < map_size; ++i) {
const char* key;
avro_value_t map_value;
if (avro_value_get_by_index(&value, i, &map_value, &key) != 0) {
return Status::InvalidArgument(strings::Substitute("Get map key value error $0", avro_strerror()));
}

rapidjson::Value key_val;
key_val.SetString(key, allocator);
rapidjson::Value value_val;
RETURN_IF_ERROR(avro_value_to_rapidjson(map_value, allocator, value_val));
out.AddMember(key_val, value_val, allocator);
}
return Status::OK();
}
case AVRO_ARRAY: {
size_t array_size = 0;
if (avro_value_get_size(&value, &array_size) != 0) {
return Status::InvalidArgument(strings::Substitute("Get array size error $0", avro_strerror()));
}

out.SetArray();
for (int i = 0; i < array_size; ++i) {
avro_value_t element;
if (avro_value_get_by_index(&value, i, &element, nullptr) != 0) {
return Status::InvalidArgument(strings::Substitute("Get array element error $0", avro_strerror()));
}

rapidjson::Value element_value;
RETURN_IF_ERROR(avro_value_to_rapidjson(element, allocator, element_value));
out.PushBack(element_value, allocator);
}
return Status::OK();
}
case AVRO_UNION: {
avro_value_t union_value;
if (avro_value_get_current_branch(&value, &union_value) != 0) {
return Status::InvalidArgument(strings::Substitute("Get union value error $0", avro_strerror()));
}
RETURN_IF_ERROR(avro_value_to_rapidjson(union_value, allocator, out));
return Status::OK();
}
default:
return Status::InvalidArgument("Unsupported avro type");
}
}

// Convert an avro value to a json object using rapidjson.
// Different from avro `avro_value_to_json`, this function will ignore the union type tags.
//
// schema:
// {
// "type": "record",
// "name": "User",
// "fields": [
// {"name": "id", "type": "int"},
// {"name": "name", "type": "string"},
// {"name": "email", "type": ["null",
// {
// "type": "record",
// "name": "email2",
// "fields": [
// {
// "name": "x",
// "type" : ["null", "int"]
// },
// {
// "name": "y",
// "type": ["null", "string"]
// }
// ]
// }
// ]
// }
// ]
// }
//
// avro `avro_value_to_json` result:
// {"id": 1, "name": "Alice", "email": {"email2": {"x": {"int": 1}, "y": {"string": "[email protected]"}}}}
//
// this function result:
// {"id":1,"name":"Alice","email":{"x":1,"y":"[email protected]"}}
static Status avro_value_to_json_str(const avro_value_t& value, std::string* json_str) {
rapidjson::Document doc;
auto& allocator = doc.GetAllocator();
rapidjson::Value root;
RETURN_IF_ERROR(avro_value_to_rapidjson(value, allocator, root));

rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
root.Accept(writer);
json_str->append(buffer.GetString(), buffer.GetSize());
return Status::OK();
}

static Status add_column_with_array_object_value(BinaryColumn* column, const TypeDescriptor& type_desc,
const std::string& name, const avro_value_t& value) {
char* as_json;
if (avro_value_to_json(&value, 1, &as_json)) {
LOG(ERROR) << "avro to json failed: %s" << avro_strerror();
return Status::InternalError("avro to json failed");
if (config::avro_ignore_union_type_tag) {
std::string json_str;
auto st = avro_value_to_json_str(value, &json_str);
if (!st.ok()) {
return Status::InternalError(
strings::Substitute("avro to json failed. column=$0, err=$1", name, st.message()));
}

column->append(Slice(json_str));
} else {
char* as_json;
if (avro_value_to_json(&value, 1, &as_json)) {
LOG(WARNING) << "avro to json failed: %s" << avro_strerror();
return Status::InternalError(
strings::Substitute("avro to json failed. column=$0, err=$1", name, avro_strerror()));
}
DeferOp json_deleter([&] { free(as_json); });
column->append(Slice(as_json));
}
column->append(Slice(as_json));
free(as_json);
return Status::OK();
}

Expand Down Expand Up @@ -251,18 +478,34 @@ Status add_binary_column(Column* column, const TypeDescriptor& type_desc, const

Status add_native_json_column(Column* column, const TypeDescriptor& type_desc, const std::string& name,
const avro_value_t& value) {
auto json_column = down_cast<JsonColumn*>(column);
char* as_json;
if (avro_value_to_json(&value, 1, &as_json)) {
LOG(ERROR) << "avro to json failed: %s" << avro_strerror();
return Status::InternalError("avro to json failed");
}
DeferOp json_deleter([&] { free(as_json); });
JsonValue json_value;
Status s = JsonValue::parse(as_json, &json_value);
if (!s.ok()) {
return Status::InternalError("parse json failed");
Status st;
if (config::avro_ignore_union_type_tag) {
std::string json_str;
st = avro_value_to_json_str(value, &json_str);
if (!st.ok()) {
return Status::InternalError(
strings::Substitute("avro to json failed. column=$0, err=$1", name, st.message()));
}

st = JsonValue::parse(Slice(json_str), &json_value);
} else {
char* as_json;
if (avro_value_to_json(&value, 1, &as_json)) {
LOG(WARNING) << "avro to json failed: %s" << avro_strerror();
return Status::InternalError(
strings::Substitute("avro to json failed. column=$0, err=$1", name, avro_strerror()));
}

DeferOp json_deleter([&] { free(as_json); });
st = JsonValue::parse(as_json, &json_value);
}

if (!st.ok()) {
return Status::InternalError(strings::Substitute("parse json failed. column=$0, err=$1", name, st.message()));
}

auto json_column = down_cast<JsonColumn*>(column);
json_column->append(std::move(json_value));
return Status::OK();
}
Expand Down
Loading

0 comments on commit ecf1eb6

Please sign in to comment.