Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Capture all fields in Other class to enable more powerful roles #584

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 93 additions & 1 deletion docs/futures/define-custom-roles.rst
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,96 @@ Your topology file could look like this:
group: "foo"
bar:
- principal: "User:bandana"
group: "bar"
group: "bar"


More generic usage of roles
-----------

Lets assume you need to deploy multiple instances of an application that is not willing to use julie topic hierarchy,
requires multiple topics and custom acls. You could define a role for each instance, or for each group/topic,
but that would clutter the roles file quickly. You would however like to have these topics in julie, for acl management
and for documentation. For example kafka mirror maker needs many topics which would be nice to group together in julie
config.

For brevity example below gives too much permissions (ALL), but works as example how feature works.

.. code-block:: YAML

roles:
- name: "mirrorMaker"
acls:
- resourceType: "Topic"
resourceName: "{{statusTopic}}"
patternType: "LITERAL"
host: "*"
operation: "ALL"
permissionType: "ALLOW"
- resourceType: "Topic"
resourceName: "{{offsetTopic}}"
patternType: "LITERAL"
host: "*"
operation: "ALL"
permissionType: "ALLOW"
- resourceType: "Topic"
resourceName: "{{configTopic}}"
patternType: "LITERAL"
host: "*"
operation: "ALL"
permissionType: "ALLOW"
- resourceType: "Topic"
resourceName: "{{targetPrefix}}"
patternType: "PREFIXED"
host: "*"
operation: "ALL"
permissionType: "ALLOW"
- resourceType: "Topic"
resourceName: "{{offsetSyncTopic}}"
patternType: "LITERAL"
host: "*"
operation: "ALL"
permissionType: "ALLOW"
- resourceType: "Topic"
resourceName: "{{checkpointsTopic}}"
patternType: "LITERAL"
host: "*"
operation: "ALL"
permissionType: "ALLOW"
- resourceType: "Cluster"
resourceName: "kafka-cluster"
patternType: "LITERAL"
host: "*"
operation: "DESCRIBE"
permissionType: "ALLOW"
- resourceType: "Cluster"
resourceName: "kafka-cluster"
patternType: "LITERAL"
host: "*"
operation: "DESCRIBE_CONFIGS"
permissionType: "ALLOW"
- resourceType: "Group"
resourceName: "{{group}}"
patternType: "LITERAL"
host: "*"
operation: "ALL"
permissionType: "ALLOW"

With previous roles file mirror maker can be defined in a clutter free manner in a project.

.. code-block:: YAML

context: "contextOrg"
source: "source"
projects:
- name: "foo"
mirrorMaker:
- principal: "User:banana"
group: "foo"
statusTopic: "test-cluster-status"
offsetTopic: "test-cluster-offsets"
configTopic: "test-cluster-configs"
targetPrefix: "target-prefix."
offsetSyncTopic: "mm2-offset-syncs.test-mm.internal"
checkpointsTopic: "test-mm.checkpoints.internal"

Somewhat viable alternative to this would be to use special_topics, but they limit to topic and producer/consumer acl:s.
16 changes: 16 additions & 0 deletions src/main/java/com/purbon/kafka/topology/model/users/Other.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.purbon.kafka.topology.model.users;

import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.purbon.kafka.topology.model.User;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -8,6 +10,8 @@

public class Other extends User {

private Map<String, Object> unmappedFields = new HashMap<>();

private Optional<String> transactionId;
private Optional<Boolean> idempotence;
private Optional<String> group;
Expand All @@ -27,6 +31,7 @@ public Other() {

public Map<String, Object> asMap() {
Map<String, Object> map = new HashMap<>();
map.putAll(unmappedFields);
map.put("topic", topicString());
if (subject.isPresent()) {
map.put("subject", subjectString());
Expand All @@ -41,6 +46,17 @@ public Map<String, Object> asMap() {
return map;
}

// Capture all other fields that Jackson do not match other members
@JsonAnyGetter
public Map<String, Object> otherFields() {
return unmappedFields;
}

@JsonAnySetter
public void setOtherField(String name, Object value) {
unmappedFields.put(name, value);
}

public String groupString() {
return group.orElse("*");
}
Expand Down
28 changes: 28 additions & 0 deletions src/test/java/com/purbon/kafka/topology/JulieRolesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.Map;
import java.util.stream.Collectors;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

Expand Down Expand Up @@ -105,4 +106,31 @@ public void testTopologyValidationCorrect() throws IOException {
Topology topology = topologySerdes.deserialise(TestUtils.getResourceFile("/descriptor.yaml"));
roles.validateTopology(topology);
}

@Test
public void testMirrorMakerRole() throws IOException {
JulieRoles roles = parser.deserialise(TestUtils.getResourceFile("/roles-mirrormaker.yaml"));
TopologySerdes topologySerdes = new TopologySerdes();

Topology topology =
topologySerdes.deserialise(TestUtils.getResourceFile("/descriptor-mirrormaker.yaml"));
roles.validateTopology(topology);

var expected =
new String[] {
"test-cluster-status",
"test-cluster-offsets",
"test-cluster-configs",
"target-prefix.",
"mm2-offset-syncs.test-mm.internal",
"test-mm.checkpoints.internal"
};

var mirrorMaker = topology.getProjects().get(0).getOthers().get("mirrorMaker").get(0);
var topics = mirrorMaker.asMap().values();

for (String t : expected) {
Assert.assertTrue(topics.contains(t));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,46 @@ public void consumerAclsCreation() throws ExecutionException, InterruptedExcepti
verifyConsumerAcls(consumers);
}

@Test
public void mirrorMakerAclsCreation()
throws ExecutionException, InterruptedException, IOException {
Project project = new ProjectImpl("project");

Topology topology = new TopologyImpl();
topology.setContext("integration-test");
topology.addOther("source", "testMirrorMakerAclsCreation");

var other = new Other();
other.setPrincipal("User:mm2");
other.setGroup(Optional.of("testgroup"));
other.setOtherField("statusTopic", "test-mirrormaker2-cluster-status");
other.setOtherField("offsetTopic", "test-mirrormaker2-cluster-offsets");
other.setOtherField("configTopic", "test-mirrormaker2-cluster-configs");
other.setOtherField("targetPrefix", "test-mm.");
other.setOtherField("offsetSyncTopic", "mm2-offset-syncs.test-mm.internal");
other.setOtherField("checkpointsTopic", "test-mm.checkpoints.internal");

project.setOthers(Collections.singletonMap("mirrorMaker", Collections.singletonList(other)));

topology.addProject(project);

Properties props = new Properties();
props.put(JULIE_ROLES, TestUtils.getResourceFilename("/roles-mirrormaker.yaml"));

Map<String, String> cliOps = new HashMap<>();

Configuration config = new Configuration(cliOps, props);

accessControlManager =
new AccessControlManager(
aclsProvider, new AclsBindingsBuilder(config), config.getJulieRoles(), config);

accessControlManager.updatePlan(topology, plan);
plan.run(false);

verifyMirrorMakerAcls(other);
}

@Test(expected = IOException.class)
public void shouldDetectChangesInTheRemoteClusterBetweenRuns() throws IOException {

Expand Down Expand Up @@ -746,4 +786,41 @@ private void verifyConsumerAcls(List<Consumer> consumers)
Assert.assertTrue(ops.contains(AclOperation.READ));
}
}

private void verifyMirrorMakerAcls(Other other) throws InterruptedException, ExecutionException {
ResourcePatternFilter resourceFilter = ResourcePatternFilter.ANY;

AccessControlEntryFilter entryFilter =
new AccessControlEntryFilter(
other.getPrincipal(), null, AclOperation.ALL, AclPermissionType.ALLOW);

AclBindingFilter filter = new AclBindingFilter(resourceFilter, entryFilter);

Collection<AclBinding> acls = kafkaAdminClient.describeAcls(filter).values().get();

// 6 topics + 1 group with ALL operation
assertEquals(7, acls.size());

entryFilter =
new AccessControlEntryFilter(
other.getPrincipal(), null, AclOperation.DESCRIBE, AclPermissionType.ALLOW);

filter = new AclBindingFilter(resourceFilter, entryFilter);

acls = kafkaAdminClient.describeAcls(filter).values().get();

// 1 DESCRIBE permission for cluster
assertEquals(1, acls.size());

entryFilter =
new AccessControlEntryFilter(
other.getPrincipal(), null, AclOperation.DESCRIBE_CONFIGS, AclPermissionType.ALLOW);

filter = new AclBindingFilter(resourceFilter, entryFilter);

acls = kafkaAdminClient.describeAcls(filter).values().get();

// 1 DESCRIBE_CONFIGS permission for cluster
assertEquals(1, acls.size());
}
}
14 changes: 14 additions & 0 deletions src/test/resources/descriptor-mirrormaker.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
context: "contextOrg"
source: "source"
projects:
- name: "foo"
mirrorMaker:
- principal: "User:banana"
group: "test-group"
statusTopic: "test-cluster-status"
offsetTopic: "test-cluster-offsets"
configTopic: "test-cluster-configs"
targetPrefix: "target-prefix."
offsetSyncTopic: "mm2-offset-syncs.test-mm.internal"
checkpointsTopic: "test-mm.checkpoints.internal"
58 changes: 58 additions & 0 deletions src/test/resources/roles-mirrormaker.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
---
roles:
- name: "mirrorMaker"
acls:
- resourceType: "Topic"
resourceName: "{{statusTopic}}"
patternType: "LITERAL"
host: "*"
operation: "ALL"
permissionType: "ALLOW"
- resourceType: "Topic"
resourceName: "{{offsetTopic}}"
patternType: "LITERAL"
host: "*"
operation: "ALL"
permissionType: "ALLOW"
- resourceType: "Topic"
resourceName: "{{configTopic}}"
patternType: "LITERAL"
host: "*"
operation: "ALL"
permissionType: "ALLOW"
- resourceType: "Topic"
resourceName: "{{targetPrefix}}"
patternType: "PREFIXED"
host: "*"
operation: "ALL"
permissionType: "ALLOW"
- resourceType: "Topic"
resourceName: "{{offsetSyncTopic}}"
patternType: "LITERAL"
host: "*"
operation: "ALL"
permissionType: "ALLOW"
- resourceType: "Topic"
resourceName: "{{checkpointsTopic}}"
patternType: "LITERAL"
host: "*"
operation: "ALL"
permissionType: "ALLOW"
- resourceType: "Cluster"
resourceName: "kafka-cluster"
patternType: "LITERAL"
host: "*"
operation: "DESCRIBE"
permissionType: "ALLOW"
- resourceType: "Cluster"
resourceName: "kafka-cluster"
patternType: "LITERAL"
host: "*"
operation: "DESCRIBE_CONFIGS"
permissionType: "ALLOW"
- resourceType: "Group"
resourceName: "{{group}}"
patternType: "LITERAL"
host: "*"
operation: "ALL"
permissionType: "ALLOW"