diff --git a/api-docs/docs/classes/SchemaRegistry.md b/api-docs/docs/classes/SchemaRegistry.md index a13f255..00060a8 100644 --- a/api-docs/docs/classes/SchemaRegistry.md +++ b/api-docs/docs/classes/SchemaRegistry.md @@ -137,8 +137,10 @@ Deserializes the given data and schema into its original form. ▸ **getSchema**(`schema`): [`Schema`](../interfaces/Schema.md) -**`method`** -Get a schema from Schema Registry by version and subject. +**`method`** Get a schema from Schema Registry +* if only `schema.subject` is set: returns the latest schema for the given subject +* if `schema.subject` and `schema.schema` is set: returns the schema for the given schema string +* if `schema.subject` and `schema.version` is set: returns the schema for the given version #### Parameters diff --git a/api-docs/index.d.ts b/api-docs/index.d.ts index 5b69d44..7d79727 100644 --- a/api-docs/index.d.ts +++ b/api-docs/index.d.ts @@ -488,7 +488,8 @@ export class SchemaRegistry { constructor(schemaRegistryConfig: SchemaRegistryConfig); /** * @method - * Get a schema from Schema Registry by version and subject. + * Get latest schema from Schema Registry by subject. + * Alternatively a specific schema version can be fetched by either specifing schema.version of schema.schema * @param {Schema} schema - Schema configuration. * @returns {Schema} - Schema. */ diff --git a/schema_registry.go b/schema_registry.go index 379470d..165ad0f 100644 --- a/schema_registry.go +++ b/schema_registry.go @@ -274,12 +274,16 @@ func (k *Kafka) schemaRegistryClient(config *SchemaRegistryConfig) *srclient.Sch return srClient } -// getSchema returns the schema for the given subject and schema ID and version. +// getSchema returns either the latest schema for the given subject or a specific version (if given) or the schema for the given schema string (if given) func (k *Kafka) getSchema(client *srclient.SchemaRegistryClient, schema *Schema) *Schema { // If EnableCache is set, check if the schema is in the cache. if schema.EnableCaching { - if schema, ok := k.schemaCache[schema.Subject]; ok { - return schema + if cachedSchema, ok := k.schemaCache[schema.Subject]; ok { + // the cache should contain the latest version of a schema for the given subject + // we must not return the cached schema if it does not match the requested version or schema string + if (schema.Version == 0 && schema.Schema != "") || schema.Version == cachedSchema.Version || schema.Schema == cachedSchema.Schema { + return cachedSchema; + } } } @@ -287,12 +291,20 @@ func (k *Kafka) getSchema(client *srclient.SchemaRegistryClient, schema *Schema) // The client always caches the schema. var schemaInfo *srclient.Schema var err error - // Default version of the schema is the latest version. - if schema.Version == 0 { + var isLatestSchema = false; + if schema.Schema != "" { // fetch schema for given schema string + var schemaType srclient.SchemaType + if schema.SchemaType != nil { + schemaType = *schema.SchemaType + } else { + schemaType = srclient.Avro + } + schemaInfo, err = client.LookupSchema(schema.Subject, schema.Schema, schemaType, schema.References...) + } else if schema.Version == 0 { // fetch schema by version schemaInfo, err = client.GetLatestSchema(schema.Subject) - } else { - schemaInfo, err = client.GetSchemaByVersion( - schema.Subject, schema.Version) + } else { // fetch latest schema for given subject + schemaInfo, err = client.GetSchemaByVersion(schema.Subject, schema.Version) + isLatestSchema = true; } if err == nil { @@ -306,7 +318,7 @@ func (k *Kafka) getSchema(client *srclient.SchemaRegistryClient, schema *Schema) Subject: schema.Subject, } // If the Cache is set, cache the schema. - if wrappedSchema.EnableCaching { + if wrappedSchema.EnableCaching && isLatestSchema { k.schemaCache[wrappedSchema.Subject] = wrappedSchema } return wrappedSchema diff --git a/schema_registry_test.go b/schema_registry_test.go index c2ddea3..aff15cf 100644 --- a/schema_registry_test.go +++ b/schema_registry_test.go @@ -271,7 +271,8 @@ func TestGetSubjectNameCanUseRecordNameStrategyWithNamespace(t *testing.T) { // TestSchemaRegistryClientClass tests the schema registry client class. func TestSchemaRegistryClientClass(t *testing.T) { test := getTestModuleInstance(t) - avroSchema := `{"type":"record","name":"Schema","namespace":"com.example.person","fields":[{"name":"field","type":"string"}]}` + avroSchema1 := `{"type":"record","name":"Schema","namespace":"com.example.person","fields":[{"name":"field","type":"string"}]}` + avroSchema2 := `{"type":"record","name":"Schema","namespace":"com.example.person","fields":[{"name":"field","type":"string"}, {"name":"field2","type":"int"}]}` require.NoError(t, test.moveToVUCode()) assert.NotPanics(t, func() { @@ -287,21 +288,36 @@ func TestSchemaRegistryClientClass(t *testing.T) { }) assert.NotNil(t, client) - // Create a schema and send it to the registry. + // Create first schema and send it to the registry. createSchema := client.Get("createSchema").Export().(func(sobek.FunctionCall) sobek.Value) - newSchema := createSchema(sobek.FunctionCall{ + newSchema1 := createSchema(sobek.FunctionCall{ Arguments: []sobek.Value{ test.module.vu.Runtime().ToValue( map[string]interface{}{ "subject": "test-subject", - "schema": avroSchema, + "schema": avroSchema1, "schemaType": srclient.Avro, }, ), }, }).Export().(*Schema) - assert.Equal(t, "test-subject", newSchema.Subject) - assert.Equal(t, 0, newSchema.Version) + assert.Equal(t, "test-subject", newSchema1.Subject) + assert.Equal(t, 1, newSchema1.Version) + + // Create second schema and send it to the registry. + newSchema2 := createSchema(sobek.FunctionCall{ + Arguments: []sobek.Value{ + test.module.vu.Runtime().ToValue( + map[string]interface{}{ + "subject": "test-subject", + "schema": avroSchema2, + "schemaType": srclient.Avro, + }, + ), + }, + }).Export().(*Schema) + assert.Equal(t, "test-subject", newSchema2.Subject) + assert.Equal(t, 2, newSchema2.Version) // Get the latest version of the schema from the registry. getSchema := client.Get("getSchema").Export().(func(sobek.FunctionCall) sobek.Value) @@ -316,8 +332,38 @@ func TestSchemaRegistryClientClass(t *testing.T) { }, }).Export().(*Schema) assert.Equal(t, "test-subject", currentSchema.Subject) - assert.Equal(t, 1, currentSchema.Version) - assert.Equal(t, avroSchema, currentSchema.Schema) + assert.Equal(t, 2, currentSchema.Version) + assert.Equal(t, avroSchema2, currentSchema.Schema) + + // get schema by schema string + schemaByString := getSchema(sobek.FunctionCall{ + Arguments: []sobek.Value{ + test.module.vu.Runtime().ToValue( + map[string]interface{}{ + "subject": "test-subject", + "schema": avroSchema1, + }, + ), + }, + }).Export().(*Schema) + assert.Equal(t, "test-subject", schemaByString.Subject) + assert.Equal(t, 1, schemaByString.Version) + assert.Equal(t, avroSchema1, schemaByString.Schema) + + // get schema by version + schemaByVersion := getSchema(sobek.FunctionCall{ + Arguments: []sobek.Value{ + test.module.vu.Runtime().ToValue( + map[string]interface{}{ + "subject": "test-subject", + "version": 1, + }, + ), + }, + }).Export().(*Schema) + assert.Equal(t, "test-subject", schemaByVersion.Subject) + assert.Equal(t, 1, schemaByVersion.Version) + assert.Equal(t, avroSchema1, schemaByVersion.Schema) // Get the subject name based on the given subject name config. getSubjectName := client.Get("getSubjectName").Export().(func(sobek.FunctionCall) sobek.Value) @@ -325,7 +371,7 @@ func TestSchemaRegistryClientClass(t *testing.T) { Arguments: []sobek.Value{ test.module.vu.Runtime().ToValue( map[string]interface{}{ - "schema": avroSchema, + "schema": avroSchema1, "topic": "test-topic", "subjectNameStrategy": TopicRecordNameStrategy, "element": Value, diff --git a/scripts/test_avro_with_schema_registry.js b/scripts/test_avro_with_schema_registry.js index a91e042..2c7e5d3 100644 --- a/scripts/test_avro_with_schema_registry.js +++ b/scripts/test_avro_with_schema_registry.js @@ -93,6 +93,12 @@ const valueSchemaObject = schemaRegistry.createSchema({ schemaType: SCHEMA_TYPE_AVRO, }); +// if you want use a schema which has already been created you can fetch it by the schema string +const valueSchemaObjectExisting = schemaRegistry.get({ + subject: valueSubjectName, + schema: valueSchema, +}); + export default function () { for (let index = 0; index < 100; index++) { let messages = [