Skip to content

Commit

Permalink
Support transactional producers (#66)
Browse files Browse the repository at this point in the history
...and also fix `IDEMPOTENT_WRITE` ACL for < v3.0 clusters that still need this.

fixes: #62
  • Loading branch information
big-andy-coates authored Feb 4, 2023
1 parent d658132 commit e61f28f
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.kafka.common.acl.AclOperation.ALL;
import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
import static org.apache.kafka.common.resource.PatternType.LITERAL;
import static org.apache.kafka.common.resource.Resource.CLUSTER_NAME;
import static org.apache.kafka.common.resource.ResourceType.CLUSTER;
import static org.apache.kafka.common.resource.ResourceType.GROUP;
import static org.apache.kafka.streams.kstream.Produced.with;
Expand Down Expand Up @@ -353,7 +354,7 @@ private static Set<AclBinding> aclsForOtherDomain() {
final String principal = "User:" + DIFFERENT_USER;
return Set.of(
new AclBinding(
new ResourcePattern(CLUSTER, "kafka-cluster", LITERAL),
new ResourcePattern(CLUSTER, CLUSTER_NAME, LITERAL),
new AccessControlEntry(principal, "*", ALL, ALLOW)),
new AclBinding(
new ResourcePattern(GROUP, DIFFERENT_USER, LITERAL),
Expand Down
9 changes: 9 additions & 0 deletions kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ making it trivial to trace back the topic name to its owning domain.

## Authorisation

### Group authorisation

The provisioner will set ACLs to allow the domain to use any consumer group prefixed with the domain id.
It is recommended that, in most cases, domain services use a consumer group name of `<domain-id>-<service-name>`.

### Transaction id authorisation

The provisioner will set ACLs to allow the domain to use any transaction id prefixed with the domain id.

### Topic authorisation

The provisioner will set ACLs to enforce the following topic authorisation:
Expand Down
11 changes: 10 additions & 1 deletion kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@
import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
import static org.apache.kafka.common.acl.AclOperation.READ;
import static org.apache.kafka.common.acl.AclOperation.WRITE;
import static org.apache.kafka.common.resource.Resource.CLUSTER_NAME;
import static org.apache.kafka.common.resource.ResourceType.CLUSTER;
import static org.apache.kafka.common.resource.ResourceType.GROUP;
import static org.apache.kafka.common.resource.ResourceType.TOPIC;
import static org.apache.kafka.common.resource.ResourceType.TRANSACTIONAL_ID;

import io.specmesh.apiparser.model.ApiSpec;
import io.specmesh.apiparser.model.SchemaInfo;
Expand Down Expand Up @@ -101,10 +104,11 @@ public List<AclBinding> listACLsForDomainOwnedTopics() {

final List<AclBinding> topicAcls = new ArrayList<>();
topicAcls.addAll(ownTopicAcls());
topicAcls.addAll(ownTransactionIdsAcls());
topicAcls.addAll(publicTopicAcls());
topicAcls.addAll(protectedTopicAcls());
topicAcls.addAll(privateTopicAcls());
topicAcls.addAll(prefixedAcls(TOPIC, id(), principal(), IDEMPOTENT_WRITE));
topicAcls.addAll(prefixedAcls(CLUSTER, CLUSTER_NAME, principal(), IDEMPOTENT_WRITE));
return topicAcls;
}

Expand All @@ -119,6 +123,7 @@ public List<AclBinding> listACLsForDomainOwnedTopics() {
* <li>The spec's domain to be able to create ad-hoc private topics
* <li>The spec's domain to produce and consume its topics
* <li>The spec's domain to use its own consumer groups
* <li>The spec's domain to use its own transaction ids
* </ul>
*
* @return returns the set of required acls.
Expand Down Expand Up @@ -198,6 +203,10 @@ private Set<AclBinding> ownTopicAcls() {
return prefixedAcls(TOPIC, id(), principal(), DESCRIBE, READ, WRITE);
}

private Set<AclBinding> ownTransactionIdsAcls() {
    // Transactional producers require DESCRIBE and WRITE on the transaction id.
    // ACLs are prefixed, so any transaction id starting with the domain id is covered.
    final String domainId = id();
    return prefixedAcls(TRANSACTIONAL_ID, domainId, principal(), DESCRIBE, WRITE);
}

private Set<AclBinding> publicTopicAcls() {
    // Public topics are world-readable: any authenticated principal ("User:*")
    // may DESCRIBE and READ topics under the domain's "._public" prefix.
    final String publicPrefix = id() + "._public";
    return prefixedAcls(TOPIC, publicPrefix, "User:*", DESCRIBE, READ);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@
import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
import static org.apache.kafka.common.resource.PatternType.LITERAL;
import static org.apache.kafka.common.resource.PatternType.PREFIXED;
import static org.apache.kafka.common.resource.Resource.CLUSTER_NAME;
import static org.apache.kafka.common.resource.ResourceType.CLUSTER;
import static org.apache.kafka.common.resource.ResourceType.GROUP;
import static org.apache.kafka.common.resource.ResourceType.TRANSACTIONAL_ID;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
Expand All @@ -42,12 +45,12 @@
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.acl.AccessControlEntry;
Expand Down Expand Up @@ -149,6 +152,8 @@ void shouldHaveCorrectProduceAndConsumeAcls(
try (KafkaConsumer<Long, String> domainConsumer = domainConsumer(consumerDomain);
KafkaProducer<Long, String> domainProducer = domainProducer(producerDomain)) {

domainProducer.initTransactions();

domainConsumer.subscribe(List.of(topic.topicName));

final Executable poll = () -> domainConsumer.poll(Duration.ofMillis(500));
Expand All @@ -159,6 +164,8 @@ void shouldHaveCorrectProduceAndConsumeAcls(
assertThat((Throwable) e, instanceOf(TopicAuthorizationException.class));
}

domainProducer.beginTransaction();

final Executable send =
() ->
domainProducer
Expand All @@ -167,9 +174,11 @@ void shouldHaveCorrectProduceAndConsumeAcls(

if (canProduce) {
send.execute();
domainProducer.commitTransaction();
} else {
final Throwable e = assertThrows(ExecutionException.class, send).getCause();
assertThat(e, instanceOf(TopicAuthorizationException.class));
domainProducer.abortTransaction();
}

if (canConsume && canProduce) {
Expand Down Expand Up @@ -219,7 +228,8 @@ private Admin adminClient(final Domain domain) {
private KafkaProducer<Long, String> domainProducer(final Domain domain) {
final Map<String, Object> props = clientProperties();
props.putAll(saslAuthProperties(domain.domainId));
props.put(AdminClientConfig.CLIENT_ID_CONFIG, domain.domainId + ".producer");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, domain.domainId + ".txId");

return new KafkaProducer<>(props, Serdes.Long().serializer(), Serdes.String().serializer());
}
Expand All @@ -229,8 +239,6 @@ private KafkaConsumer<Long, String> domainConsumer(final Domain domain) {
props.putAll(saslAuthProperties(domain.domainId));
props.putAll(
Map.of(
ConsumerConfig.CLIENT_ID_CONFIG,
domain.domainId + ".consumer",
ConsumerConfig.GROUP_ID_CONFIG,
domain.domainId,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
Expand Down Expand Up @@ -272,10 +280,13 @@ private static Set<AclBinding> aclsForOtherDomain(final Domain domain) {
final String principal = "User:" + domain.domainId;
return Set.of(
new AclBinding(
new ResourcePattern(CLUSTER, "kafka-cluster", LITERAL),
new ResourcePattern(CLUSTER, CLUSTER_NAME, LITERAL),
new AccessControlEntry(principal, "*", IDEMPOTENT_WRITE, ALLOW)),
new AclBinding(
new ResourcePattern(GROUP, domain.domainId, LITERAL),
new AccessControlEntry(principal, "*", ALL, ALLOW)),
new AclBinding(
new ResourcePattern(TRANSACTIONAL_ID, domain.domainId, PREFIXED),
new AccessControlEntry(principal, "*", ALL, ALLOW)));
}
}
160 changes: 114 additions & 46 deletions kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
package io.specmesh.kafka;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

import io.specmesh.apiparser.model.SchemaInfo;
import io.specmesh.test.TestSpecLoader;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -35,6 +37,80 @@ public class KafkaAPISpecTest {
private static final KafkaApiSpec API_SPEC =
TestSpecLoader.loadFromClassPath("bigdatalondon-api.yaml");

/**
 * The complete set of ACL bindings the spec under test is expected to produce,
 * captured as the exact {@code AclBinding.toString()} text compared in the tests below.
 */
private enum ExpectedAcl {
// Anyone ("User:*") may read/describe topics under the domain's "._public" prefix:
READ_PUBLIC_TOPICS(
"(pattern=ResourcePattern(resourceType=TOPIC,"
+ " name=london.hammersmith.olympia.bigdatalondon._public,"
+ " patternType=PREFIXED), entry=(principal=User:*, host=*,"
+ " operation=READ, permissionType=ALLOW))"),
DESCRIBE_PUBLIC_TOPICS(
"(pattern=ResourcePattern(resourceType=TOPIC,"
+ " name=london.hammersmith.olympia.bigdatalondon._public,"
+ " patternType=PREFIXED), entry=(principal=User:*, host=*,"
+ " operation=DESCRIBE, permissionType=ALLOW))"),
// Only the specific principal granted in the spec may read/describe a protected topic
// (LITERAL pattern — a single topic, not a prefix):
READ_PROTECTED_TOPICS(
"(pattern=ResourcePattern(resourceType=TOPIC,"
+ " name=london.hammersmith.olympia.bigdatalondon._protected.retail.subway.food.purchase,"
+ " patternType=LITERAL), entry=(principal=User:.some.other.domain.root,"
+ " host=*, operation=READ, permissionType=ALLOW))"),
DESCRIBE_PROTECTED_TOPICS(
"(pattern=ResourcePattern(resourceType=TOPIC,"
+ " name=london.hammersmith.olympia.bigdatalondon._protected.retail.subway.food.purchase,"
+ " patternType=LITERAL), entry=(principal=User:.some.other.domain.root,"
+ " host=*, operation=DESCRIBE, permissionType=ALLOW))"),
// The domain's own principal has full produce/consume rights on its own topic prefix:
DESCRIBE_OWN_TOPICS(
"(pattern=ResourcePattern(resourceType=TOPIC,"
+ " name=london.hammersmith.olympia.bigdatalondon,"
+ " patternType=PREFIXED),"
+ " entry=(principal=User:london.hammersmith.olympia.bigdatalondon,"
+ " host=*, operation=DESCRIBE, permissionType=ALLOW))"),
READ_OWN_TOPICS(
"(pattern=ResourcePattern(resourceType=TOPIC,"
+ " name=london.hammersmith.olympia.bigdatalondon,"
+ " patternType=PREFIXED),"
+ " entry=(principal=User:london.hammersmith.olympia.bigdatalondon,"
+ " host=*, operation=READ, permissionType=ALLOW))"),
WRITE_OWN_TOPICS(
"(pattern=ResourcePattern(resourceType=TOPIC,"
+ " name=london.hammersmith.olympia.bigdatalondon,"
+ " patternType=PREFIXED),"
+ " entry=(principal=User:london.hammersmith.olympia.bigdatalondon,"
+ " host=*, operation=WRITE, permissionType=ALLOW))"),
// Ad-hoc private topics may be created under the "._private" prefix:
CREATE_OWN_PRIVATE_TOPICS(
"(pattern=ResourcePattern(resourceType=TOPIC,"
+ " name=london.hammersmith.olympia.bigdatalondon._private,"
+ " patternType=PREFIXED),"
+ " entry=(principal=User:london.hammersmith.olympia.bigdatalondon,"
+ " host=*, operation=CREATE, permissionType=ALLOW))"),
// Consumer groups prefixed with the domain id are usable by the domain:
READ_OWN_GROUPS(
"(pattern=ResourcePattern(resourceType=GROUP,"
+ " name=london.hammersmith.olympia.bigdatalondon,"
+ " patternType=PREFIXED),"
+ " entry=(principal=User:london.hammersmith.olympia.bigdatalondon,"
+ " host=*, operation=READ, permissionType=ALLOW))"),
// Transactional producers: WRITE + DESCRIBE on transaction ids prefixed with the domain id:
WRITE_OWN_TX_IDS(
"(pattern=ResourcePattern(resourceType=TRANSACTIONAL_ID,"
+ " name=london.hammersmith.olympia.bigdatalondon, patternType=PREFIXED),"
+ " entry=(principal=User:london.hammersmith.olympia.bigdatalondon, host=*,"
+ " operation=WRITE, permissionType=ALLOW))"),
DESCRIBE_OWN_TX_IDS(
"(pattern=ResourcePattern(resourceType=TRANSACTIONAL_ID,"
+ " name=london.hammersmith.olympia.bigdatalondon, patternType=PREFIXED),"
+ " entry=(principal=User:london.hammersmith.olympia.bigdatalondon, host=*,"
+ " operation=DESCRIBE, permissionType=ALLOW))"),
// IDEMPOTENT_WRITE cluster ACL, still required by pre-v3.0 brokers.
// NOTE(review): patternType=PREFIXED on the "kafka-cluster" CLUSTER resource is unusual —
// cluster ACLs are normally LITERAL. This mirrors what prefixedAcls() emits; confirm intended.
OWN_IDEMPOTENT_WRITE(
"(pattern=ResourcePattern(resourceType=CLUSTER, name=kafka-cluster,"
+ " patternType=PREFIXED),"
+ " entry=(principal=User:london.hammersmith.olympia.bigdatalondon, host=*,"
+ " operation=IDEMPOTENT_WRITE, permissionType=ALLOW))");

// The exact AclBinding.toString() form this constant represents.
final String text;

ExpectedAcl(final String text) {
this.text = text;
}
}

@Test
public void shouldListAppOwnedTopics() {
final List<NewTopic> newTopics = API_SPEC.listDomainOwnedTopics();
Expand All @@ -48,14 +124,8 @@ public void shouldGenerateAclToAllowAnyOneToConsumePublicTopics() {
assertThat(
acls.stream().map(Object::toString).collect(Collectors.toSet()),
hasItems(
"(pattern=ResourcePattern(resourceType=TOPIC,"
+ " name=london.hammersmith.olympia.bigdatalondon._public,"
+ " patternType=PREFIXED), entry=(principal=User:*, host=*,"
+ " operation=READ, permissionType=ALLOW))",
"(pattern=ResourcePattern(resourceType=TOPIC,"
+ " name=london.hammersmith.olympia.bigdatalondon._public,"
+ " patternType=PREFIXED), entry=(principal=User:*, host=*,"
+ " operation=DESCRIBE, permissionType=ALLOW))"));
ExpectedAcl.READ_PUBLIC_TOPICS.text,
ExpectedAcl.DESCRIBE_PUBLIC_TOPICS.text));
}

@Test
Expand All @@ -65,59 +135,57 @@ public void shouldGenerateAclToAllowSpecificUsersToConsumeProtectedTopics() {
assertThat(
acls.stream().map(Object::toString).collect(Collectors.toSet()),
hasItems(
"(pattern=ResourcePattern(resourceType=TOPIC,"
+ " name=london.hammersmith.olympia.bigdatalondon._protected.retail.subway.food.purchase,"
+ " patternType=LITERAL),"
+ " entry=(principal=User:.some.other.domain.root, host=*,"
+ " operation=READ, permissionType=ALLOW))",
"(pattern=ResourcePattern(resourceType=TOPIC,"
+ " name=london.hammersmith.olympia.bigdatalondon._protected.retail.subway.food.purchase,"
+ " patternType=LITERAL),"
+ " entry=(principal=User:.some.other.domain.root, host=*,"
+ " operation=DESCRIBE, permissionType=ALLOW))"));
ExpectedAcl.READ_PROTECTED_TOPICS.text,
ExpectedAcl.DESCRIBE_PROTECTED_TOPICS.text));
}

@Test
public void shouldGenerateAclToAllowSelfControlOfPrivateTopics() {
public void shouldGenerateAclToAllowControlOfOwnPrivateTopics() {
final Set<AclBinding> acls = API_SPEC.requiredAcls();

assertThat(
acls.stream().map(Object::toString).collect(Collectors.toSet()),
hasItems(
"(pattern=ResourcePattern(resourceType=TOPIC,"
+ " name=london.hammersmith.olympia.bigdatalondon,"
+ " patternType=PREFIXED),"
+ " entry=(principal=User:london.hammersmith.olympia.bigdatalondon,"
+ " host=*, operation=DESCRIBE, permissionType=ALLOW))",
"(pattern=ResourcePattern(resourceType=TOPIC,"
+ " name=london.hammersmith.olympia.bigdatalondon,"
+ " patternType=PREFIXED),"
+ " entry=(principal=User:london.hammersmith.olympia.bigdatalondon,"
+ " host=*, operation=READ, permissionType=ALLOW))",
"(pattern=ResourcePattern(resourceType=TOPIC,"
+ " name=london.hammersmith.olympia.bigdatalondon,"
+ " patternType=PREFIXED),"
+ " entry=(principal=User:london.hammersmith.olympia.bigdatalondon,"
+ " host=*, operation=WRITE, permissionType=ALLOW))",
"(pattern=ResourcePattern(resourceType=TOPIC,"
+ " name=london.hammersmith.olympia.bigdatalondon._private,"
+ " patternType=PREFIXED),"
+ " entry=(principal=User:london.hammersmith.olympia.bigdatalondon,"
+ " host=*, operation=CREATE, permissionType=ALLOW))"));
ExpectedAcl.DESCRIBE_OWN_TOPICS.text,
ExpectedAcl.READ_OWN_TOPICS.text,
ExpectedAcl.WRITE_OWN_TOPICS.text,
ExpectedAcl.CREATE_OWN_PRIVATE_TOPICS.text));
}

@Test
public void shouldGenerateAclsToAllowSelfToUseConsumerGroups() {
public void shouldGenerateAclsToAllowToUseOwnConsumerGroups() {
final Set<AclBinding> acls = API_SPEC.requiredAcls();

assertThat(
acls.stream().map(Object::toString).collect(Collectors.toSet()),
hasItems(
"(pattern=ResourcePattern(resourceType=GROUP,"
+ " name=london.hammersmith.olympia.bigdatalondon,"
+ " patternType=PREFIXED),"
+ " entry=(principal=User:london.hammersmith.olympia.bigdatalondon,"
+ " host=*, operation=READ, permissionType=ALLOW))"));
hasItems(ExpectedAcl.READ_OWN_GROUPS.text));
}

@Test
public void shouldGenerateAclsToAllowToUseOwnTransactionId() {
    // The domain should get DESCRIBE and WRITE on transaction ids prefixed with its id.
    final Set<String> aclText =
            API_SPEC.requiredAcls().stream()
                    .map(Object::toString)
                    .collect(Collectors.toSet());

    assertThat(
            aclText,
            hasItems(ExpectedAcl.WRITE_OWN_TX_IDS.text, ExpectedAcl.DESCRIBE_OWN_TX_IDS.text));
}

@Test
public void shouldGenerateAclsToAllowIdempotentWriteOnOlderClusters() {
    // Pre-v3.0 brokers still require an explicit IDEMPOTENT_WRITE cluster ACL.
    final Set<String> aclText =
            API_SPEC.requiredAcls().stream()
                    .map(Object::toString)
                    .collect(Collectors.toSet());

    assertThat(aclText, hasItems(ExpectedAcl.OWN_IDEMPOTENT_WRITE.text));
}

@Test
void shouldNotHaveAnyAdditionalAcls() {
    // Guard test: requiredAcls() must produce exactly the bindings enumerated in
    // ExpectedAcl — nothing more, nothing less.
    final Object[] expected =
            Arrays.stream(ExpectedAcl.values()).map(e -> e.text).toArray();

    final Set<String> actual =
            API_SPEC.requiredAcls().stream()
                    .map(Object::toString)
                    .collect(Collectors.toSet());

    assertThat(actual, containsInAnyOrder(expected));
}

@Test
Expand Down

0 comments on commit e61f28f

Please sign in to comment.