Help with libserdes example program #32

Open
aamirrashid opened this issue Jul 9, 2020 · 9 comments

Comments

@aamirrashid

I'm trying to use the ./serdes-kafka-avro-client example shipped with the latest version of libserdes, but I cannot get it to work. Could someone please tell me whether it's working code? Or maybe I'm not using it as intended. Here's my output:

1). The schema is already registered in schemaregistry:

curl -X GET localhost:60002/subjects/car/versions/latest | jq .
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 258 100 258 0 0 15246 0 --:--:-- --:--:-- --:--:-- 16125
{
"subject": "car",
"version": 1,
"id": 22,
"schema": "{\"type\":\"record\",\"name\":\"Car\",\"fields\":[{\"name\":\"Make\",\"type\":\"string\"},{\"name\":\"Model\",\"type\":\"string\"},{\"name\":\"Year\",\"type\":\"string\"},{\"name\":\"MSRP\",\"type\":\"int\"}]}"
}

I start the consumer
./serdes-kafka-avro-client -C -b localhost:60000 -t aamir -p 0 -r localhost:60002
I start the producer
./serdes-kafka-avro-client -P -b localhost:60000 -t aamir -r localhost:60002 -s 22
% Using schema (null) with id 22
% Use "schema: " to specify a new schema
% Use "str: " to produce an Avro-encoded string
% Ctrl-D to exit

I enter just one record from producer
str: {"Make":"BMW","Model":"M-4","Year":"2014","MSRP":55000}

The consumer complains that it cannot deserialize
% serdes_deserialize_avro failed: Failed to read avro value: Cannot read string value: Cannot read string length: Cannot read 1 bytes from memory bufferCannot read string value: Cannot read string length: Cannot read 1 bytes from memory bufferCannot read string value: Cannot read string length: Cannot read 1 bytes from memory bufferCannot read string value: Cannot read string length: Cannot read 1 bytes from memory bufferCannot read string value: Cannot read string length: Cannot read 1 bytes from memory bufferCannot read string value: Can

Can someone please help?

Thanks!

@xmcqueen

I was wondering about this example the other day.

The example encodes the input as a single Avro string (https://github.com/confluentinc/libserdes/blob/master/examples/serdes-kafka-avro-client.c#L215) and uses the schema only to populate the framing (magic byte and schema ID). Maybe you should input just the values: https://avro.apache.org/docs/current/spec.html#binary_encoding

maybe instead of:
str: {"Make":"BMW","Model":"M-4","Year":"2014","MSRP":55000}

try:
str: "BMW","M-4","2014",???

I don't know how to handle that MSRP being an "int", but supplied as a string.
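
For reference, here is a rough sketch of what the consumer presumably expects on the wire for schema id 22. It assumes the Confluent framing (magic byte 0 followed by a 4-byte big-endian schema ID) and the Avro binary encoding from the spec linked above: record fields are written back to back in schema order, strings carry a length prefix, and ints are zigzag varints. The function names (put_long, put_string, encode_car) are hypothetical and not part of libserdes or avro-c. It also suggests why the consumer stops with "Cannot read 1 bytes": the whole JSON text is decoded as the Make string, and nothing is left in the buffer for Model.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Avro int/long: zigzag-encode, then write as a base-128 varint
 * (least significant group first). Returns bytes written (at most 10). */
size_t put_long(uint8_t *out, int64_t v)
{
    uint64_t z = ((uint64_t)v << 1) ^ (v < 0 ? ~(uint64_t)0 : 0);
    size_t n = 0;

    while (z >= 0x80) {
        out[n++] = (uint8_t)(z | 0x80);
        z >>= 7;
    }
    out[n++] = (uint8_t)z;
    return n;
}

/* Avro string: byte length as a long, followed by the raw bytes. */
size_t put_string(uint8_t *out, const char *s)
{
    size_t len = strlen(s);
    size_t n = put_long(out, (int64_t)len);

    memcpy(out + n, s, len);
    return n + len;
}

/* Hypothetical encoder for the Car record from this issue.
 * Returns the payload size, or 0 if the buffer may be too small. */
size_t encode_car(uint8_t *out, size_t cap, uint32_t schema_id,
                  const char *make, const char *model,
                  const char *year, int32_t msrp)
{
    /* Worst case: 5 framing bytes, 10-byte length prefixes, 10-byte int */
    size_t need = 5 + (10 + strlen(make)) + (10 + strlen(model)) +
                  (10 + strlen(year)) + 10;
    size_t n = 0;

    assert(out != NULL);
    if (cap < need)
        return 0;

    /* Framing (assumed): magic byte 0, then big-endian schema ID */
    out[n++] = 0;
    out[n++] = (uint8_t)(schema_id >> 24);
    out[n++] = (uint8_t)(schema_id >> 16);
    out[n++] = (uint8_t)(schema_id >> 8);
    out[n++] = (uint8_t)schema_id;

    /* Record body: field values only, no names and no separators */
    n += put_string(out + n, make);
    n += put_string(out + n, model);
    n += put_string(out + n, year);
    n += put_long(out + n, msrp);
    return n;
}
```

Calling encode_car(buf, sizeof buf, 22, "BMW", "M-4", "2014", 55000) with a 64-byte buffer fills in the framing followed by the four field values. Note that there is no text form like "BMW","M-4","2014" that the str: command could turn into this layout, because str: always produces one Avro string.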

@aamirrashid
Author

I truly don't understand the value in shipping example code that does not work out of the box. I believe there is a shortcoming in both libserdes and the example code: neither has the functionality to convert JSON to Avro. Surprisingly enough, libserdes does have the code to do the reverse, i.e. Avro to JSON! Go figure!

The right way to fix it would be to add functionality in libserdes to convert from json to avro, and then the example code should be modified to call that functionality.

For my purposes, I coded a function to convert from json to avro in example code, and then it all worked.

@aamirrashid
Author

Here are the 2 functions that I added to convert from json to avro. I simply called json_to_avro() from run_producer() after } else if (!strncmp(buf, "str: ", 5)) { and it just works.

`int schema_traverse(const avro_schema_t schema, json_t *json,
avro_value_t *current_val, int quiet, int strjson, size_t max_str_sz)
{
assert(current_val != NULL);

/* json is NULL when the field is missing from the input, so report it
   instead of asserting */
if (!json) {
    fprintf(stderr, "ERROR: Avro schema does not match JSON\n");
    return 1;
}

switch (schema->type) {
case AVRO_RECORD:
{
    if (!json_is_object(json)) {
        if (!quiet)
            fprintf(stderr, "ERROR: Expecting JSON object for Avro record, got something else\n");
        return 1;
    }

    int len = avro_schema_record_size(schema), i;
    for (i=0; i<len; i++) {

        const char *name = avro_schema_record_field_name(schema, i);
        avro_schema_t field_schema = avro_schema_record_field_get_by_index(schema, i);

        json_t *json_val = json_object_get(json, name);

        avro_value_t field;
        avro_value_get_by_index(current_val, i, &field, NULL);

        if (schema_traverse(field_schema, json_val, &field, quiet, strjson, max_str_sz))
            return 1;
    }
} break;

case AVRO_LINK:
    /* TODO */
    fprintf(stderr, "ERROR: AVRO_LINK is not implemented\n");
    return 1;
    break;

case AVRO_STRING:
    if (!json_is_string(json)) {
        if (json && strjson) {
            /* -j specified, just dump the remaining json as string */
            char * js = json_dumps(json, JSON_COMPACT|JSON_SORT_KEYS|JSON_ENCODE_ANY);
            if (max_str_sz && (strlen(js) > max_str_sz))
                js[max_str_sz] = 0; /* truncate the string - this will result in invalid JSON! */
            avro_value_set_string(current_val, js);
            free(js);
            break;
        }
        if (!quiet)
            fprintf(stderr, "ERROR: Expecting JSON string for Avro string, got something else\n");
        return 1;
    } else {
        const char *js = json_string_value(json);
        if (max_str_sz && (strlen(js) > max_str_sz)) {
            /* truncate the string */
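            char *jst = malloc(max_str_sz + 1); /* +1 for the terminator */
            if (!jst) {
                fprintf(stderr, "ERROR: Out of memory\n");
                return 1;
            }
            memcpy(jst, js, max_str_sz);
            jst[max_str_sz] = 0;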
            avro_value_set_string(current_val, jst);
            free(jst);
        } else
            avro_value_set_string(current_val, js);
    }
    break;

case AVRO_BYTES:
    if (!json_is_string(json)) {
        if (!quiet)
            fprintf(stderr, "ERROR: Expecting JSON string for Avro bytes, got something else\n");
        return 1;
    }
    /* NB: Jansson uses null-terminated strings, so embedded nulls are NOT
       supported, not even escaped ones */
    const char *s = json_string_value(json);
    avro_value_set_bytes(current_val, (void *)s, strlen(s));
    break;

case AVRO_INT32:
    if (!json_is_integer(json)) {
        if (!quiet)
            fprintf(stderr, "ERROR: Expecting JSON integer for Avro int, got something else\n");
        return 1;
    }
    avro_value_set_int(current_val, json_integer_value(json));
    break;

case AVRO_INT64:
    if (!json_is_integer(json)) {
        if (!quiet)
            fprintf(stderr, "ERROR: Expecting JSON integer for Avro long, got something else\n");
        return 1;
    }
    avro_value_set_long(current_val, json_integer_value(json));
    break;

case AVRO_FLOAT:
    if (!json_is_number(json)) {
        if (!quiet)
            fprintf(stderr, "ERROR: Expecting JSON number for Avro float, got something else\n");
        return 1;
    }
    avro_value_set_float(current_val, json_number_value(json));
    break;

case AVRO_DOUBLE:
    if (!json_is_number(json)) {
        if (!quiet)
            fprintf(stderr, "ERROR: Expecting JSON number for Avro double, got something else\n");
        return 1;
    }
    avro_value_set_double(current_val, json_number_value(json));
    break;

case AVRO_BOOLEAN:
    if (!json_is_boolean(json)) {
        if (!quiet)
            fprintf(stderr, "ERROR: Expecting JSON boolean for Avro boolean, got something else\n");
        return 1;
    }
    avro_value_set_boolean(current_val, json_is_true(json));
    break;

case AVRO_NULL:
    if (!json_is_null(json)) {
        if (!quiet)
            fprintf(stderr, "ERROR: Expecting JSON null for Avro null, got something else\n");
        return 1;
    }
    avro_value_set_null(current_val);
    break;

case AVRO_ENUM:
    // TODO ???
    break;

case AVRO_ARRAY:
    if (!json_is_array(json)) {
        if (!quiet)
            fprintf(stderr, "ERROR: Expecting JSON array for Avro array, got something else\n");
        return 1;
    } else {
        int i, len = json_array_size(json);
        avro_schema_t items = avro_schema_array_items(schema);
        avro_value_t val;
        for (i=0; i<len; i++) {
            avro_value_append(current_val, &val, NULL);
            if (schema_traverse(items, json_array_get(json, i), &val, quiet, strjson, max_str_sz))
                return 1;
        }
    }
    break;

case AVRO_MAP:
    if (!json_is_object(json)) {
        if (!quiet)
            fprintf(stderr, "ERROR: Expecting JSON object for Avro map, got something else\n");
        return 1;
    } else {
        avro_schema_t values = avro_schema_map_values(schema);
        void *iter = json_object_iter(json);
        avro_value_t val;
        while (iter) {
            avro_value_add(current_val, json_object_iter_key(iter), &val, 0, 0);
            if (schema_traverse(values, json_object_iter_value(iter), &val, quiet, strjson, max_str_sz))
                return 1;
            iter = json_object_iter_next(json, iter);
        }
    }
    break;

case AVRO_UNION:
{
    unsigned int i;
    avro_value_t branch;
    for (i=0; i<avro_schema_union_size(schema); i++) {
        avro_value_set_branch(current_val, i, &branch);
        avro_schema_t type = avro_schema_union_branch(schema, i);
        if (!schema_traverse(type, json, &branch, 1, strjson, max_str_sz))
            break;
    }
    if (i==avro_schema_union_size(schema)) {
        fprintf(stderr, "ERROR: No type in the Avro union matched the JSON type we got\n");
        return 1;
    }
    break;
}
case AVRO_FIXED:
    if (!json_is_string(json)) {
        if (!quiet)
            fprintf(stderr, "ERROR: Expecting JSON string for Avro fixed, got something else\n");
        return 1;
    }
    /* NB: Jansson uses null-terminated strings, so embedded nulls are NOT
       supported, not even escaped ones */
    const char *f = json_string_value(json);
    if (avro_value_set_fixed(current_val, (void *)f, strlen(f))) {
        fprintf(stderr, "ERROR: Setting Avro fixed value FAILED\n");
        return 1;
    }
    break;

default:
    fprintf(stderr, "ERROR: Unknown type: %d\n", schema->type);
    return 1;
}
return 0;

}

/* Returns 0 on success, -1 if the JSON cannot be parsed or does not match the schema */
int json_to_avro(char *buffer, int buffer_len, avro_schema_t schema, avro_value_t *val)
{
json_error_t err;
json_t *json;
size_t max_str_sz = 1024; /* truncate longer strings; 0 disables truncation */
int strjson = 1;          /* dump non-string JSON into Avro string fields */
int ret = 0;

assert(buffer != NULL);
assert(val != NULL);

json = json_loadb(buffer, buffer_len, 0, &err);
if (!json) {
        fprintf(stderr, "JSON error on line %d, column %d, pos %d: %s\n", err.line, err.column, err.position, err.text);
        return -1; /* nothing to traverse */
}

if (schema_traverse(schema, json, val, 0, strjson, max_str_sz)) {
        fprintf(stderr, "Error processing record %s, skipping...\n", buffer);
        ret = -1;
}

json_decref(json);
return ret;

}`
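
The truncation branch for AVRO_STRING has to copy at most max_str_sz bytes and still leave room for the terminating NUL. Here is a standalone sketch of just that step, using only the C standard library. The helper name truncate_copy is hypothetical; it is not part of libserdes, avro-c or Jansson.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper: returns a newly allocated copy of src that holds
 * at most max_len bytes plus the terminating NUL. A max_len of 0 means
 * "no limit". Returns NULL if the allocation fails; the caller frees the
 * result. Cutting at a byte offset can split a multi-byte UTF-8
 * character, which this sketch does not try to avoid. */
char *truncate_copy(const char *src, size_t max_len)
{
    size_t len;
    char *dst;

    assert(src != NULL);

    len = strlen(src);
    if (max_len && len > max_len)
        len = max_len;

    dst = malloc(len + 1); /* +1 for the terminator */
    if (!dst)
        return NULL;

    memcpy(dst, src, len);
    dst[len] = '\0';
    return dst;
}
```

With a helper like this, the string case could pass the returned copy to avro_value_set_string() and free it afterwards, for both the json_dumps() path and the plain string path.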

@aamirrashid aamirrashid mentioned this issue Jul 22, 2020
@xmcqueen

This sounds great. I'm going to try out your code.

@xmcqueen

I can't get it to fit together properly. Your code uses an avro_schema_t, but we are given a serdes_schema_t. Where did you get the avro_schema_t for the call to json_to_avro?

@aamirrashid
Author

aamirrashid commented Jul 24, 2020

Here is how you do it:

`
} else if (!strncmp(buf, "str: ", 5)) {
/* Emit a single Avro string */
avro_value_t val;
void *ser_buf = NULL;
size_t ser_buf_size;
char *input;
int input_len;
avro_schema_t avro_schema = NULL;
avro_value_iface_t *iface = NULL;

#if 0
avro_generic_string_new(&val, buf+5);
#else
avro_schema = serdes_schema_avro(schema);
iface = avro_generic_class_from_schema(avro_schema);
avro_generic_value_new(iface, &val);
input = buf+5;
input_len = strlen(input);
json_to_avro(input, input_len, avro_schema, &val);
#endif
`

@xmcqueen

Yes, that's good. You should get a PR going for this.

@bhuvanracham

It works. Thanks much.

At the moment I'm running into an issue with supporting ENUM types in the Avro schema. I noticed the value always comes out as the first symbol, because the following case statement (in the schema_traverse routine) has no implementation.

case AVRO_ENUM:
// TODO ???
break;

Has anyone gotten AVRO_ENUM to work? If so, please post that part of the code.

I am trying out a few things too to get the AVRO_ENUM case block working and will post my results here.

Thanks.

@bhuvanracham

Got it working. Here is the snippet.

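            case AVRO_ENUM:
                    {
                    int symbol_value;

                    if (!json_is_string(json)) {
                            if (!quiet)
                                    fprintf(stderr, "ERROR: Expecting JSON string for Avro enum, got something else\n");
                            return 1;
                    }
                    /* negative result: the symbol is not in the schema */
                    symbol_value = avro_schema_enum_get_by_name(schema, json_string_value(json));
                    if (symbol_value < 0)
                            return 1;
                    avro_value_set_enum(current_val, symbol_value);
                    }
                    break;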
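
For anyone wondering why the empty case always produced the first symbol: per the Avro spec, an enum is encoded as an int holding the zero-based position of the symbol in the schema, so a value that is never set presumably stays at index 0. The lookup itself amounts to something like the sketch below, which uses only the C standard library. The function name enum_index and the symbol list are hypothetical and only illustrate the idea; in the real code avro_schema_enum_get_by_name() does this against the schema.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for the schema lookup: returns the zero-based
 * position of name in symbols, or -1 if it is missing or name is NULL. */
int enum_index(const char *const *symbols, size_t count, const char *name)
{
    size_t i;

    assert(symbols != NULL);

    if (!name)
        return -1; /* e.g. the JSON value was not a string */

    for (i = 0; i < count; i++) {
        if (strcmp(symbols[i], name) == 0)
            return (int)i;
    }
    return -1; /* unknown symbol: the caller should reject the record */
}
```

The important part is the -1 path: an unknown symbol should fail the record instead of being written as some arbitrary index.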
