Commands:
- provision from a spec.yml file
- export existing Kafka resources to a spec when providing a domain-id to filter against
- capture production/consumption (chargeback metrics) for a given spec and build chargeback reporting/billing
- flatten to prefix the 'id' to each channel, allowing existing tools to be used - for example: https://microcks.io/, Spring Boot code generators, and many more
This page also contains a simple Docker guide for local testing - see further down the page for setting up the environment.
This command provisions Kafka resources from an AsyncAPI spec (aka an app, or data product) and publishes them to the configured cluster and Schema Registry environment. It can be run manually, as part of a GitOps workflow, and/or during build promotion of the spec into different environments, where the cluster and Schema Registry endpoints are configured as environment variables.
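A promotion step like this can be sketched as a small wrapper that switches the endpoints per environment - the host names below (kafka-dev, sr-prod, etc.) are illustrative placeholders, not real defaults:

```shell
#!/usr/bin/env bash
# Sketch of a build-promotion step: the same spec is provisioned into each
# environment by switching cluster and Schema Registry endpoints.
# All host names here are hypothetical examples.
ENVIRONMENT="${ENVIRONMENT:-dev}"
case "$ENVIRONMENT" in
  dev)  BOOTSTRAP=kafka-dev:9092;  SCHEMA_REGISTRY=http://sr-dev:8081 ;;
  prod) BOOTSTRAP=kafka-prod:9092; SCHEMA_REGISTRY=http://sr-prod:8081 ;;
  *)    echo "unknown environment: $ENVIRONMENT" >&2; exit 1 ;;
esac
# In a real pipeline this would be passed to the dockerised provision command.
PROVISION_ARGS="-bs $BOOTSTRAP -sr $SCHEMA_REGISTRY -spec /app/simple_schema_demo-api.yaml"
echo "provision $PROVISION_ARGS"
```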
provision will look for a provision.properties file in the docker /app/ folder (i.e. /app/provision.properties). A default config file can optionally be used for managing/accessing common properties.

File provision.properties (automatically loaded from docker /app/provision.properties):
spec=/app/simple_schema_demo-api.yaml
acl.enabled=true
sr.enabled=true
dry.run=true
bootstrap.server=broker1:9092
username=admin
secret=nothing
schema.registry=http://schema-registry:8081
schema.path=/app/1
sr.api.key=admin
sr.api.secret=nothing
% docker run --rm --network kafka_network -v "$(pwd)/resources:/app" ghcr.io/specmesh/specmesh-build-cli provision -bs kafka:9092 -sr http://schema-registry:8081 -spec /app/simple_schema_demo-api.yaml -schemaPath /app
Long form
Usage: provision [-aclDisabled] [-clean] [-dry] [-srDisabled] [-bs=<brokerUrl>]
[-s=<secret>] [-schemaPath=<schemaPath>] [-spec=<spec>]
[-sr=<schemaRegistryUrl>] [-srKey=<srApiKey>]
[-srSecret=<srApiSecret>] [-u=<username>]
[-D=<String=String>]...
Apply a specification.yaml to provision kafka resources on a cluster.
Use 'provision.properties' for common arguments
Explicit properties file location /app/provision.properties
-aclDisabled, --acl-disabled
Ignore ACL related operations
-bs, --bootstrap-server=<brokerUrl>
Kafka bootstrap server url
-clean, --clean-unspecified
Compares the cluster resources against the spec,
outputting proposed set of resources that are
unexpected (not specified). Use with '-dry-run'
for non-destructive checks. This operation will
not create resources, it will only remove
unspecified resources
-D, --property=<String=String>
Specify Java runtime properties for Apache Kafka.
-dry, --dry-run Compares the cluster resources against the spec,
outputting proposed changes if compatible. If
the spec incompatible with the cluster then will
fail with a descriptive error message. A return
value of '0' = indicates no changes needed; '1'
= changes needed; '-1' not compatible
-du, --domain-user=<domainUserAlias>
optional custom domain user, to be used when
creating ACLs. By default, specmesh expects the
principle used to authenticate with Kafka to
have the same name as the domain id. For
example, given a domain id of 'urn:acme.
products', specmesh expects the user to be
called 'acme.products', and creates ACLs
accordingly. In some situations, e.g. Confluent
Cloud Service Accounts, the username is system
generated or outside control of administrators.
In these situations, use this option to provide
the generated username and specmesh will
provision ACLs accordingly.
-s, --secret=<secret> secret credential for the cluster connection
-schemaPath, --schema-path=<schemaPath>
schemaPath where the set of referenced schemas
will be loaded
-spec, --spec=<spec> specmesh specification file
-sr, --schema-registry=<schemaRegistryUrl>
schemaRegistryUrl
-srDisabled, --sr-disabled
Ignore schema related operations
-srKey, --sr-api-key=<srApiKey>
srApiKey for schema registry
-srSecret, --sr-api-secret=<srApiSecret>
srApiSecret for schema secret
-u, --username=<username> username or api key for the cluster connection
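The -dry-run return values documented above ('0' no changes needed, '1' changes needed, '-1' not compatible) lend themselves to a CI gate. A minimal sketch - the wrapper function is hypothetical, and note that an exit code of -1 surfaces as 255 in POSIX shells:

```shell
#!/usr/bin/env bash
# Hypothetical CI gate around 'provision -dry-run': map its documented
# exit codes to a human-readable decision.
interpret_dry_run() {
  case "$1" in
    0)   echo "no-changes" ;;      # spec and cluster already match
    1)   echo "changes-needed" ;;  # provisioning would apply changes
    255) echo "incompatible" ;;    # spec is incompatible with the cluster (-1)
    *)   echo "error" ;;
  esac
}
# In a real pipeline, capture the exit code of the dockerised command, e.g.:
#   docker run ... provision -dry-run ...; interpret_dry_run $?
interpret_dry_run 0
```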
This demonstrates provisioning a spec into a docker environment on a network called kafka_network, where kafka is the container running the Kafka broker and schema-registry is the Schema Registry container.
{
"topics" : [ {
"name" : "simple.spec_demo._public.user_signed_up",
"state" : "CREATED",
"partitions" : 3,
"replication" : 1,
"config" : {
"cleanup.policy" : "delete"
},
"exception" : null,
"messages" : ""
}, {
"schemas" : null,
"acls" : [ {
"name" : "(pattern=ResourcePattern(resourceType=TOPIC, name=simple.spec_demo._public, patternType=PREFIXED), entry=(principal=User:*, host=*, operation=READ, permissionType=ALLOW))",
"state" : "CREATED",
"aclBinding" : {
"pattern" : {
"resourceType" : "TOPIC",
"name" : "simple.spec_demo._public",
"patternType" : "PREFIXED",
"unknown" : false
},
"entry" : {
"data" : {
"principal" : "User:*",
"host" : "*",
"operation" : "READ",
"permissionType" : "ALLOW",
"clusterLinkIds" : [ ]
}
<<SNIP>>
Long form
{
"topics" : [ {
"name" : "simple.spec_demo._public.user_signed_up",
"state" : "CREATED",
"partitions" : 3,
"replication" : 1,
"config" : {
"cleanup.policy" : "delete"
},
"exception" : null,
"messages" : ""
}, {
"name" : "simple.spec_demo._private.user_checkout",
"state" : "CREATED",
"partitions" : 3,
"replication" : 1,
"config" : {
"cleanup.policy" : "delete"
},
"exception" : null,
"messages" : ""
}, {
"name" : "simple.spec_demo._protected.purchased",
"state" : "CREATED",
"partitions" : 3,
"replication" : 1,
"config" : { },
"exception" : null,
"messages" : ""
} ],
"schemas" : null,
"acls" : [ {
"name" : "(pattern=ResourcePattern(resourceType=TOPIC, name=simple.spec_demo._protected.purchased, patternType=LITERAL), entry=(principal=User:some.other.domain.root, host=*, operation=READ, permissionType=ALLOW))",
"state" : "CREATED",
"aclBinding" : {
"pattern" : {
"resourceType" : "TOPIC",
"name" : "simple.spec_demo._protected.purchased",
"patternType" : "LITERAL",
"unknown" : false
},
"entry" : {
"data" : {
"principal" : "User:some.other.domain.root",
"host" : "*",
"operation" : "READ",
"permissionType" : "ALLOW",
"clusterLinkIds" : [ ]
},
"unknown" : false
},
"unknown" : false
},
"exception" : null,
"messages" : ""
}, {
"name" : "(pattern=ResourcePattern(resourceType=TOPIC, name=simple.spec_demo._private, patternType=PREFIXED), entry=(principal=User:simple.spec_demo, host=*, operation=CREATE, permissionType=ALLOW))",
"state" : "CREATED",
"aclBinding" : {
"pattern" : {
"resourceType" : "TOPIC",
"name" : "simple.spec_demo._private",
"patternType" : "PREFIXED",
"unknown" : false
},
"entry" : {
"data" : {
"principal" : "User:simple.spec_demo",
"host" : "*",
"operation" : "CREATE",
"permissionType" : "ALLOW",
"clusterLinkIds" : [ ]
},
"unknown" : false
},
"unknown" : false
},
"exception" : null,
"messages" : ""
}, {
"name" : "(pattern=ResourcePattern(resourceType=GROUP, name=simple.spec_demo, patternType=PREFIXED), entry=(principal=User:simple.spec_demo, host=*, operation=READ, permissionType=ALLOW))",
"state" : "CREATED",
"aclBinding" : {
"pattern" : {
"resourceType" : "GROUP",
"name" : "simple.spec_demo",
"patternType" : "PREFIXED",
"unknown" : false
},
"entry" : {
"data" : {
"principal" : "User:simple.spec_demo",
"host" : "*",
"operation" : "READ",
"permissionType" : "ALLOW",
"clusterLinkIds" : [ ]
},
"unknown" : false
},
"unknown" : false
},
"exception" : null,
"messages" : ""
}, {
"name" : "(pattern=ResourcePattern(resourceType=TOPIC, name=simple.spec_demo._public, patternType=PREFIXED), entry=(principal=User:*, host=*, operation=READ, permissionType=ALLOW))",
"state" : "CREATED",
"aclBinding" : {
"pattern" : {
"resourceType" : "TOPIC",
"name" : "simple.spec_demo._public",
"patternType" : "PREFIXED",
"unknown" : false
},
"entry" : {
"data" : {
"principal" : "User:*",
"host" : "*",
"operation" : "READ",
"permissionType" : "ALLOW",
"clusterLinkIds" : [ ]
},
"unknown" : false
},
"unknown" : false
},
"exception" : null,
"messages" : ""
}, {
"name" : "(pattern=ResourcePattern(resourceType=TRANSACTIONAL_ID, name=simple.spec_demo, patternType=PREFIXED), entry=(principal=User:simple.spec_demo, host=*, operation=DESCRIBE, permissionType=ALLOW))",
"state" : "CREATED",
"aclBinding" : {
"pattern" : {
"resourceType" : "TRANSACTIONAL_ID",
"name" : "simple.spec_demo",
"patternType" : "PREFIXED",
"unknown" : false
},
"entry" : {
"data" : {
"principal" : "User:simple.spec_demo",
"host" : "*",
"operation" : "DESCRIBE",
"permissionType" : "ALLOW",
"clusterLinkIds" : [ ]
},
"unknown" : false
},
"unknown" : false
},
"exception" : null,
"messages" : ""
}, {
"name" : "(pattern=ResourcePattern(resourceType=TOPIC, name=simple.spec_demo, patternType=PREFIXED), entry=(principal=User:simple.spec_demo, host=*, operation=WRITE, permissionType=ALLOW))",
"state" : "CREATED",
"aclBinding" : {
"pattern" : {
"resourceType" : "TOPIC",
"name" : "simple.spec_demo",
"patternType" : "PREFIXED",
"unknown" : false
},
"entry" : {
"data" : {
"principal" : "User:simple.spec_demo",
"host" : "*",
"operation" : "WRITE",
"permissionType" : "ALLOW",
"clusterLinkIds" : [ ]
},
"unknown" : false
},
"unknown" : false
},
"exception" : null,
"messages" : ""
}, {
"name" : "(pattern=ResourcePattern(resourceType=TOPIC, name=simple.spec_demo._protected.purchased, patternType=LITERAL), entry=(principal=User:some.other.domain.root, host=*, operation=DESCRIBE, permissionType=ALLOW))",
"state" : "CREATED",
"aclBinding" : {
"pattern" : {
"resourceType" : "TOPIC",
"name" : "simple.spec_demo._protected.purchased",
"patternType" : "LITERAL",
"unknown" : false
},
"entry" : {
"data" : {
"principal" : "User:some.other.domain.root",
"host" : "*",
"operation" : "DESCRIBE",
"permissionType" : "ALLOW",
"clusterLinkIds" : [ ]
},
"unknown" : false
},
"unknown" : false
},
"exception" : null,
"messages" : ""
}, {
"name" : "(pattern=ResourcePattern(resourceType=TOPIC, name=simple.spec_demo, patternType=PREFIXED), entry=(principal=User:simple.spec_demo, host=*, operation=READ, permissionType=ALLOW))",
"state" : "CREATED",
"aclBinding" : {
"pattern" : {
"resourceType" : "TOPIC",
"name" : "simple.spec_demo",
"patternType" : "PREFIXED",
"unknown" : false
},
"entry" : {
"data" : {
"principal" : "User:simple.spec_demo",
"host" : "*",
"operation" : "READ",
"permissionType" : "ALLOW",
"clusterLinkIds" : [ ]
},
"unknown" : false
},
"unknown" : false
},
"exception" : null,
"messages" : ""
}, {
"name" : "(pattern=ResourcePattern(resourceType=TRANSACTIONAL_ID, name=simple.spec_demo, patternType=PREFIXED), entry=(principal=User:simple.spec_demo, host=*, operation=WRITE, permissionType=ALLOW))",
"state" : "CREATED",
"aclBinding" : {
"pattern" : {
"resourceType" : "TRANSACTIONAL_ID",
"name" : "simple.spec_demo",
"patternType" : "PREFIXED",
"unknown" : false
},
"entry" : {
"data" : {
"principal" : "User:simple.spec_demo",
"host" : "*",
"operation" : "WRITE",
"permissionType" : "ALLOW",
"clusterLinkIds" : [ ]
},
"unknown" : false
},
"unknown" : false
},
"exception" : null,
"messages" : ""
}, {
"name" : "(pattern=ResourcePattern(resourceType=CLUSTER, name=kafka-cluster, patternType=PREFIXED), entry=(principal=User:simple.spec_demo, host=*, operation=IDEMPOTENT_WRITE, permissionType=ALLOW))",
"state" : "CREATED",
"aclBinding" : {
"pattern" : {
"resourceType" : "CLUSTER",
"name" : "kafka-cluster",
"patternType" : "PREFIXED",
"unknown" : false
},
"entry" : {
"data" : {
"principal" : "User:simple.spec_demo",
"host" : "*",
"operation" : "IDEMPOTENT_WRITE",
"permissionType" : "ALLOW",
"clusterLinkIds" : [ ]
},
"unknown" : false
},
"unknown" : false
},
"exception" : null,
"messages" : ""
}, {
"name" : "(pattern=ResourcePattern(resourceType=TOPIC, name=simple.spec_demo, patternType=PREFIXED), entry=(principal=User:simple.spec_demo, host=*, operation=DESCRIBE, permissionType=ALLOW))",
"state" : "CREATED",
"aclBinding" : {
"pattern" : {
"resourceType" : "TOPIC",
"name" : "simple.spec_demo",
"patternType" : "PREFIXED",
"unknown" : false
},
"entry" : {
"data" : {
"principal" : "User:simple.spec_demo",
"host" : "*",
"operation" : "DESCRIBE",
"permissionType" : "ALLOW",
"clusterLinkIds" : [ ]
},
"unknown" : false
},
"unknown" : false
},
"exception" : null,
"messages" : ""
}, {
"name" : "(pattern=ResourcePattern(resourceType=TOPIC, name=simple.spec_demo._public, patternType=PREFIXED), entry=(principal=User:*, host=*, operation=DESCRIBE, permissionType=ALLOW))",
"state" : "CREATED",
"aclBinding" : {
"pattern" : {
"resourceType" : "TOPIC",
"name" : "simple.spec_demo._public",
"patternType" : "PREFIXED",
"unknown" : false
},
"entry" : {
"data" : {
"principal" : "User:*",
"host" : "*",
"operation" : "DESCRIBE",
"permissionType" : "ALLOW",
"clusterLinkIds" : [ ]
},
"unknown" : false
},
"unknown" : false
},
"exception" : null,
"messages" : ""
} ]
}
Reports app-id => topic-partition level metrics for storage bytes and offsets
docker run --rm --network kafka_network -v "$(pwd)/resources:/app" ghcr.io/specmesh/specmesh-build-cli storage -bs kafka:9092 -spec /app/simple_spec_demo-api.yaml
Long form
Usage: storage [-bs=<brokerUrl>] [-s=<secret>] [-spec=<spec>]
[-u=<username>]
Given a spec, break down the storage volume (including replication) against
each of its topic in bytes
-bs, --bootstrap-server=<brokerUrl>
Kafka bootstrap server url
-s, --secret=<secret> secret credential for the cluster connection
-spec, --spec=<spec> specmesh specification file
-u, --username=<username> username or api key for the cluster connection
Topic data that matches the app-id (i.e. topics prefixed with simple.spec_demo in the api.yaml):
{"simple.spec_demo._private.user_checkout":{"storage-bytes":1590,"offset-total":6},"simple.spec_demo._protected.purchased":{"storage-bytes":0,"offset-total":0},"simple.spec_demo._public.user_signed_up":{"storage-bytes":9185,"offset-total":57}}
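For chargeback reporting the per-topic figures can be rolled up into a single total. A dependency-free sketch that sums the storage-bytes fields from the output above (grep-based rather than a proper JSON parser; in practice you would pipe the command's stdout in):

```shell
#!/usr/bin/env bash
# Sum the "storage-bytes" values from the storage command's JSON output.
# The JSON shown in the docs above is inlined here for illustration.
storage_json='{"simple.spec_demo._private.user_checkout":{"storage-bytes":1590,"offset-total":6},"simple.spec_demo._protected.purchased":{"storage-bytes":0,"offset-total":0},"simple.spec_demo._public.user_signed_up":{"storage-bytes":9185,"offset-total":57}}'
total=0
for bytes in $(grep -o '"storage-bytes":[0-9]*' <<<"$storage_json" | cut -d: -f2); do
  total=$((total + bytes))
done
echo "total-storage-bytes=$total"   # 1590 + 0 + 9185 = 10775
```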
Reports active consumer groups (with offsets) that are consuming data from an app-id's set of topics.
% docker run --rm --network kafka_network -v "$(pwd)/resources:/app" ghcr.io/specmesh/specmesh-build-cli consumption -bs kafka:9092 -spec /app/simple_spec_demo-api.yaml
Long form
Usage: consumption [-bs=<brokerUrl>] [-s=<secret>] [-spec=<spec>]
[-u=<username>]
Given a spec, break down the consumption volume against each of its topic
-bs, --bootstrap-server=<brokerUrl>
Kafka bootstrap server url
-s, --secret=<secret> secret credential for the cluster connection
-spec, --spec=<spec> specmesh specification file
-u, --username=<username> username or api key for the cluster connection
A consumer group some.other.app, with member id console-consumer..., is actively consuming data:
{"simple.spec_demo._public.user_signed_up":{"id":"some.other.app","members":[{"id":"console-consumer-7f9d23c7-a627-41cd-ade9-3919164bc363","clientId":"console-consumer","host":"/172.30.0.3","partitions":[{"id":0,"topic":"simple.spec_demo._public.user_signed_up","offset":57,"timestamp":-1}]}],"offsetTotal":57}}
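The consumption output can be reduced in the same way, for example pulling out the group id and its offsetTotal for usage tracking. Another grep-based sketch, run here against an abridged copy of the output above (only the fields used are kept):

```shell
#!/usr/bin/env bash
# Extract the consumer group's id and total consumed offset from the
# consumption command's output (abridged to the two fields used here).
consumption_json='{"simple.spec_demo._public.user_signed_up":{"id":"some.other.app","offsetTotal":57}}'
group_id=$(grep -o '"id":"[^"]*"' <<<"$consumption_json" | head -1 | cut -d'"' -f4)
offset_total=$(grep -o '"offsetTotal":[0-9]*' <<<"$consumption_json" | cut -d: -f2)
echo "$group_id consumed up to offset $offset_total"
```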
docker run --rm --network kafka_network -v "$(pwd)/resources:/app" ghcr.io/specmesh/specmesh-build-cli export -bs kafka:9092 -domainid simple:spec_demo
Long form
Usage: export [-domainid=<domain>] [-bs=<brokerUrl>] [-s=<secret>] [-u=<username>]
Build an incomplete spec from a running Cluster
-domainid, --domain-id=<domain>
specmesh - domain-id/prefix - domain/context identified
(app-id) to export against
-bs, --bootstrap-server=<brokerUrl>
Kafka bootstrap server url
-s, --secret=<secret> secret credential for the cluster connection
-u, --username=<username>
username or api key for the cluster connection
{
"id": "urn:simple.spec_demo",
"version": "2023-05-06",
"asyncapi": "2.5.0",
"channels": {
"_public.user_signed_up": {
"bindings": {
"kafka": {
"partitions": 1,
"replicas": 1,
"configs": {
"compression.type": "producer",
"leader.replication.throttled.replicas": "",
"message.downconversion.enable": "true",
"min.insync.replicas": "1",
"segment.jitter.ms": "0",
"cleanup.policy": "delete",
"flush.ms": "9223372036854775807",
"follower.replication.throttled.replicas": "",
"segment.bytes": "1073741824",
"retention.ms": "604800000",
<<SNIP>>
Full JSON
{"id":"urn:simple:spec_demo","version":"2023-05-06","asyncapi":"2.5.0","channels":{"_private.user_checkout":{"description":null,"bindings":{"kafka":{"envs":null,"partitions":1,"replicas":1,"configs":{"compression.type":"producer","leader.replication.throttled.replicas":"","message.downconversion.enable":"true","min.insync.replicas":"1","segment.jitter.ms":"0","cleanup.policy":"delete","flush.ms":"9223372036854775807","follower.replication.throttled.replicas":"","segment.bytes":"1073741824","retention.ms":"604800000","flush.messages":"9223372036854775807","message.format.version":"3.0-IV1","max.compaction.lag.ms":"9223372036854775807","file.delete.delay.ms":"60000","max.message.bytes":"1048588","min.compaction.lag.ms":"0","message.timestamp.type":"CreateTime","preallocate":"false","min.cleanable.dirty.ratio":"0.5","index.interval.bytes":"4096","unclean.leader.election.enable":"false","retention.bytes":"-1","delete.retention.ms":"86400000","segment.ms":"604800000","message.timestamp.difference.max.ms":"9223372036854775807","segment.index.bytes":"10485760"},"groupId":null,"schemaIdLocation":null,"schemaLookupStrategy":null,"bindingVersion":"unknown"}},"publish":null,"subscribe":null},"_public.user_signed_up":{"description":null,"bindings":{"kafka":{"envs":null,"partitions":1,"replicas":1,"configs":{"compression.type":"producer","leader.replication.throttled.replicas":"","message.downconversion.enable":"true","min.insync.replicas":"1","segment.jitter.ms":"0","cleanup.policy":"delete","flush.ms":"9223372036854775807","follower.replication.throttled.replicas":"","segment.bytes":"1073741824","retention.ms":"604800000","flush.messages":"9223372036854775807","message.format.version":"3.0-IV1","max.compaction.lag.ms":"9223372036854775807","file.delete.delay.ms":"60000","max.message.bytes":"1048588","min.compaction.lag.ms":"0","message.timestamp.type":"CreateTime","preallocate":"false","min.cleanable.dirty.ratio":"0.5","index.interval.bytes":"4096","unclean.leader.election.enable":"false","retention.bytes":"-1","delete.retention.ms":"86400000","segment.ms":"604800000","message.timestamp.difference.max.ms":"9223372036854775807","segment.index.bytes":"10485760"},"groupId":null,"schemaIdLocation":null,"schemaLookupStrategy":null,"bindingVersion":"unknown"}},"publish":null,"subscribe":null}}}
Run a local Kafka environment to manually test against. Note: security is not configured.
Docker network
docker network create kafka_network
ZooKeeper
docker run --name zookeeper -p 2181:2181 --network kafka_network -d zookeeper:latest
Kafka
docker run --name kafka -p 9092:9092 --network kafka_network -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 -d confluentinc/cp-kafka:latest
Schema Registry
docker run --name schema-registry -p 8081:8081 --network kafka_network -e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092 -e SCHEMA_REGISTRY_HOST_NAME=localhost -e SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081 -d confluentinc/cp-schema-registry:latest
docker network inspect kafka_network
Create a topic
docker exec -it kafka /bin/bash -c "/usr/bin/kafka-topics --create --bootstrap-server kafka:9092 --replication-factor 1 --partitions 1 --topic test"
List topics
docker exec -it kafka /bin/bash -c "/usr/bin/kafka-topics --list --bootstrap-server kafka:9092"
Notice that the --bootstrap-server parameter now points to kafka:9092 instead of localhost:9092, as the Kafka container is now referred to by its container name within the kafka_network network.
Produce messages
docker exec -it kafka /bin/bash -c "/usr/bin/kafka-console-producer --broker-list kafka:9092 --topic test"
Or run from its own container rather than exec-ing into the 'kafka' container:
% docker run --name test-listing --network kafka_network -it confluentinc/cp-kafka:latest /bin/bash -c "/usr/bin/kafka-topics --list --bootstrap-server kafka:9092"
Consume messages
docker exec -it kafka /bin/bash -c "/usr/bin/kafka-console-consumer --bootstrap-server kafka:9092 --topic test --from-beginning"
Stop the containers
docker stop schema-registry
docker stop kafka
docker stop zookeeper
Remove the network
docker network rm kafka_network
Remove containers
docker rm schema-registry
docker rm kafka
docker rm zookeeper