Skip to content

Commit

Permalink
Add container network support (#353)
Browse files Browse the repository at this point in the history
Co-authored-by: CORPORATE\ewalsh <[email protected]>
  • Loading branch information
walshe and CORPORATE\ewalsh authored Jul 29, 2024
1 parent 801fe5b commit 90f0f13
Showing 1 changed file with 27 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public final class DockerKafkaEnvironment
private final Map<String, String> srEnv;
private final Set<AclBinding> aclBindings;
private final Optional<Credentials> adminUser;
private final Optional<Network> explicitNetwork;

private Network network;
private KafkaContainer kafkaBroker;
Expand All @@ -108,7 +109,8 @@ private DockerKafkaEnvironment(
final Optional<DockerImageName> srDockerImage,
final Map<String, String> srEnv,
final Set<AclBinding> aclBindings,
final Optional<Credentials> adminUser) {
final Optional<Credentials> adminUser,
final Optional<Network> explicitNetwork) {
this.startUpTimeout = requireNonNull(startUpTimeout, "startUpTimeout");
this.startUpAttempts = startUpAttempts;
this.kafkaDockerImage = requireNonNull(kafkaDockerImage, "kafkaDockerImage");
Expand All @@ -117,6 +119,7 @@ private DockerKafkaEnvironment(
this.srEnv = Map.copyOf(requireNonNull(srEnv, "srEnv"));
this.aclBindings = Set.copyOf(requireNonNull(aclBindings, "aclBindings"));
this.adminUser = requireNonNull(adminUser, "credentials");
this.explicitNetwork = requireNonNull(explicitNetwork, "explicitNetwork");
tearDown();
}

Expand Down Expand Up @@ -187,7 +190,7 @@ private void setUp() {
return;
}

network = Network.newNetwork();
network = explicitNetwork.orElseGet(Network::newNetwork);

kafkaBroker =
new KafkaContainer(kafkaDockerImage)
Expand Down Expand Up @@ -232,7 +235,9 @@ private void tearDown() {
}

if (network != null) {
network.close();
if (explicitNetwork.isEmpty()) {
network.close();
}
network = null;
}

Expand All @@ -247,6 +252,17 @@ private void installAcls() {
}
}

public String testNetworkKafkaBootstrapServers() {
return "PLAINTEXT://" + kafkaBroker.getNetworkAliases().get(0) + ":9092";
}

public String testNetworkSchemeRegistryServer() {
return "http://"
+ schemaRegistry.getNetworkAliases().get(0)
+ ":"
+ SchemaRegistryContainer.SCHEMA_REGISTRY_PORT;
}

/** Builder of {@link DockerKafkaEnvironment}. */
public static final class Builder {

Expand All @@ -271,6 +287,12 @@ public static final class Builder {
private final Map<String, String> userPasswords = new LinkedHashMap<>();
private boolean enableAcls = false;
private final Set<AclBinding> aclBindings = new HashSet<>();
private Optional<Network> explicitNetwork = Optional.empty();

public Builder withNetwork(final Network network) {
this.explicitNetwork = Optional.of(network);
return this;
}

/**
* Customise the startup count.
Expand Down Expand Up @@ -436,7 +458,8 @@ public DockerKafkaEnvironment build() {
srImage,
srEnv,
aclBindings,
adminUser());
adminUser(),
explicitNetwork);
}

private Optional<Credentials> adminUser() {
Expand Down

0 comments on commit 90f0f13

Please sign in to comment.