Skip to content

Commit

Permalink
MINOR: Various cleanups in metadata (#16610)
Browse files Browse the repository at this point in the history
Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
mimaison authored Jul 22, 2024
1 parent 7a8f89e commit 90b779b
Show file tree
Hide file tree
Showing 42 changed files with 216 additions and 258 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ object KafkaRaftServer {
// Load and verify the original ensemble.
val loader = new MetaPropertiesEnsemble.Loader()
loader.addMetadataLogDir(config.metadataLogDir)
config.logDirs.foreach(loader.addLogDir)
.addLogDirs(config.logDirs.asJava)
val initialMetaPropsEnsemble = loader.load()
val verificationFlags = util.EnumSet.of(REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR)
initialMetaPropsEnsemble.verify(Optional.empty(), OptionalInt.of(config.nodeId), verificationFlags)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ class KafkaServer(
/* load metadata */
val initialMetaPropsEnsemble = {
val loader = new MetaPropertiesEnsemble.Loader()
config.logDirs.foreach(loader.addLogDir)
loader.addLogDirs(config.logDirs.asJava)
if (config.migrationEnabled) {
loader.addMetadataLogDir(config.metadataLogDir)
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/tools/StorageTool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ object StorageTool extends Logging {
throw new TerseFailure("No log directories found in the configuration.")
}
val loader = new MetaPropertiesEnsemble.Loader()
directories.foreach(loader.addLogDir)
.addLogDirs(directories.asJava)
val metaPropertiesEnsemble = loader.load()
metaPropertiesEnsemble.verify(metaProperties.clusterId(), metaProperties.nodeId(),
util.EnumSet.noneOf(classOf[VerificationFlag]))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.ListBuffer
import scala.collection.{Seq, immutable}
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._

trait QuorumImplementation {
def createBroker(
Expand Down Expand Up @@ -103,7 +104,7 @@ class KRaftQuorumImplementation(
): KafkaBroker = {
val metaPropertiesEnsemble = {
val loader = new MetaPropertiesEnsemble.Loader()
config.logDirs.foreach(loader.addLogDir)
.addLogDirs(config.logDirs.asJava)
loader.addMetadataLogDir(config.metadataLogDir)
val ensemble = loader.load()
val copier = new MetaPropertiesEnsemble.Copier(ensemble)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,10 @@ private void remove(int brokerId, Uuid topicId, int removedPartition) {
}
} else {
int[] newPartitions = new int[partitions.length - 1];
int j = 0;
for (int i = 0; i < partitions.length; i++) {
int partition = partitions[i];
int i = 0;
for (int partition : partitions) {
if (partition != removedPartition) {
newPartitions[j++] = partition;
newPartitions[i++] = partition;
}
}
topicMap.put(topicId, newPartitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,11 +257,10 @@ private void remove(int brokerId, Uuid topicId, int removedPartition, boolean le
}
} else {
int[] newPartitions = new int[partitions.length - 1];
int j = 0;
for (int i = 0; i < partitions.length; i++) {
int partition = partitions[i];
int i = 0;
for (int partition : partitions) {
if (partition != removedPartition) {
newPartitions[j++] = partition;
newPartitions[i++] = partition;
}
}
topicMap.put(topicId, newPartitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ static boolean isValidIpEntity(String ip) {
}

private ApiError validateEntity(ClientQuotaEntity entity, Map<String, String> validatedEntityMap) {
// Given a quota entity (which is a mapping of entity type to entity name), validate it's types
// Given a quota entity (which is a mapping of entity type to entity name), validate its types
if (entity.entries().isEmpty()) {
return new ApiError(Errors.INVALID_REQUEST, "Invalid empty client quota entity");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,12 +464,12 @@ ControllerResult<Void> registerController(ControllerRegistrationRequestData requ
}
ListenerInfo listenerInfo = ListenerInfo.fromControllerRegistrationRequest(request.listeners());
ControllerFeatureCollection features = new ControllerFeatureCollection();
request.features().forEach(feature -> {
request.features().forEach(feature ->
features.add(new RegisterControllerRecord.ControllerFeature().
setName(feature.name()).
setMaxSupportedVersion(feature.maxSupportedVersion()).
setMinSupportedVersion(feature.minSupportedVersion()));
});
setMinSupportedVersion(feature.minSupportedVersion()))
);
List<ApiMessageAndVersion> records = new ArrayList<>();
records.add(new ApiMessageAndVersion(new RegisterControllerRecord().
setControllerId(request.controllerId()).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -472,10 +471,7 @@ public Map<ConfigResource, ResultOrError<Map<String, String>>> describeConfigs(
if (configs != null) {
Collection<String> targetConfigs = resourceEntry.getValue();
if (targetConfigs.isEmpty()) {
Iterator<Entry<String, String>> iter =
configs.entrySet(lastCommittedOffset).iterator();
while (iter.hasNext()) {
Entry<String, String> entry = iter.next();
for (Entry<String, String> entry : configs.entrySet(lastCommittedOffset)) {
foundConfigs.put(entry.getKey(), entry.getValue());
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public PartitionChangeBuilder setTargetIsrWithBrokerStates(List<BrokerState> tar
return setTargetIsr(
targetIsrWithEpoch
.stream()
.map(brokerState -> brokerState.brokerId())
.map(BrokerState::brokerId)
.collect(Collectors.toList())
);
}
Expand Down Expand Up @@ -285,7 +285,7 @@ private ElectionResult electAnyLeader() {
if (election == Election.UNCLEAN) {
// Attempt unclean leader election
Optional<Integer> uncleanLeader = targetReplicas.stream()
.filter(replica -> isAcceptableLeader.test(replica))
.filter(isAcceptableLeader::test)
.findFirst();
if (uncleanLeader.isPresent()) {
return new ElectionResult(uncleanLeader.get(), true);
Expand Down Expand Up @@ -422,9 +422,6 @@ void triggerLeaderEpochBumpForIsrShrinkIfNeeded(PartitionChangeRecord record) {
}
}

/**
* @return true if the reassignment was completed; false otherwise.
*/
private void completeReassignmentIfNeeded() {
PartitionReassignmentReplicas reassignmentReplicas =
new PartitionReassignmentReplicas(
Expand Down Expand Up @@ -572,15 +569,15 @@ private void maybePopulateTargetElr() {
// To do that, we first union the current ISR and current elr, then filter out the target ISR and unclean shutdown
// Replicas.
Set<Integer> candidateSet = new HashSet<>(targetElr);
Arrays.stream(partition.isr).forEach(ii -> candidateSet.add(ii));
Arrays.stream(partition.isr).forEach(candidateSet::add);
targetElr = candidateSet.stream()
.filter(replica -> !targetIsrSet.contains(replica))
.filter(replica -> uncleanShutdownReplicas == null || !uncleanShutdownReplicas.contains(replica))
.collect(Collectors.toList());

// Calculate the new last known ELR. Includes any ISR members since the ISR size drops below min ISR.
// In order to reduce the metadata usage, the last known ELR excludes the members in ELR and current ISR.
targetLastKnownElr.forEach(ii -> candidateSet.add(ii));
candidateSet.addAll(targetLastKnownElr);
targetLastKnownElr = candidateSet.stream()
.filter(replica -> !targetIsrSet.contains(replica))
.filter(replica -> !targetElr.contains(replica))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* All of these except MetadataErrorCount are managed by ControllerMetadataMetricsPublisher.
*
* IMPORTANT: Metrics which are managed by the QuorumController class itself should go in
* @link{org.apache.kafka.controller.metrics.QuorumControllerMetrics}, not here.
* {@link org.apache.kafka.controller.metrics.QuorumControllerMetrics}, not here.
*/
public final class ControllerMetadataMetrics implements AutoCloseable {
private static final MetricName FENCED_BROKER_COUNT = getMetricName(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public class QuorumControllerMetrics implements AutoCloseable {
private Consumer<Long> newHistogram(MetricName name, boolean biased) {
if (registry.isPresent()) {
Histogram histogram = registry.get().newHistogram(name, biased);
return e -> histogram.update(e);
return histogram::update;
} else {
return __ -> { };
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public AclsImage apply() {
public String toString() {
return "AclsDelta(" +
", changes=" + changes.entrySet().stream().
map(e -> "" + e.getKey() + "=" + e.getValue()).
map(e -> e.getKey() + "=" + e.getValue()).
collect(Collectors.joining(", ")) + ")";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void write(ImageWriter writer, ImageWriterOptions options) {
StringBuilder scramImageString = new StringBuilder("ScramImage({");
for (Entry<ScramMechanism, Map<String, ScramCredentialData>> mechanismEntry : mechanisms.entrySet()) {
if (!mechanismEntry.getValue().isEmpty()) {
scramImageString.append(mechanismEntry.getKey() + ":");
scramImageString.append(mechanismEntry.getKey()).append(":");
List<String> users = new ArrayList<>(mechanismEntry.getValue().keySet());
scramImageString.append(String.join(", ", users));
scramImageString.append("},{");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,7 @@ public LocalReplicaChanges localChanges(int brokerId) {
}
topicIds.putIfAbsent(name(), id());
}
} else if (
entry.getValue().leader != brokerId &&
Replicas.contains(entry.getValue().replicas, brokerId)
) {
} else {
PartitionRegistration prevPartition = image.partitions().get(entry.getKey());
if (prevPartition == null || prevPartition.partitionEpoch != entry.getValue().partitionEpoch) {
followers.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public int hashCode() {
* Like TopicsImage itself, this map is immutable.
*/
public Map<String, Uuid> topicNameToIdView() {
return new TranslatedValueMapView<>(topicsByName, image -> image.id());
return new TranslatedValueMapView<>(topicsByName, TopicImage::id);
}

/**
Expand All @@ -113,7 +113,7 @@ public Map<String, Uuid> topicNameToIdView() {
* Like TopicsImage itself, this map is immutable.
*/
public Map<Uuid, String> topicIdToNameView() {
return new TranslatedValueMapView<>(topicsById, image -> image.name());
return new TranslatedValueMapView<>(topicsById, TopicImage::name);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public static class Builder {
private String threadNamePrefix = "";
private Time time = Time.SYSTEM;
private LogContext logContext = null;
private FaultHandler faultHandler = (m, e) -> new FaultHandlerException(m, e);
private FaultHandler faultHandler = FaultHandlerException::new;
private MetadataLoaderMetrics metrics = null;
private Supplier<OptionalLong> highWaterMarkAccessor = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public MetadataLoaderMetrics(
registry.ifPresent(r -> r.newGauge(CURRENT_METADATA_VERSION, new Gauge<Integer>() {
@Override
public Integer value() {
return Integer.valueOf(currentMetadataVersion().featureLevel());
return (int) currentMetadataVersion().featureLevel();
}
}));
registry.ifPresent(r -> r.newGauge(CURRENT_CONTROLLER_ID, new Gauge<Integer>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ public Builder(RegisterControllerRecord record) {
});
this.listeners = Collections.unmodifiableMap(newListeners);
Map<String, VersionRange> newSupportedFeatures = new HashMap<>();
record.features().forEach(feature -> {
record.features().forEach(feature ->
newSupportedFeatures.put(feature.name(), VersionRange.of(
feature.minSupportedVersion(), feature.maxSupportedVersion()));
});
feature.minSupportedVersion(), feature.maxSupportedVersion()))
);
this.supportedFeatures = Collections.unmodifiableMap(newSupportedFeatures);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ public BootstrapDirectory(
}

public BootstrapMetadata read() throws Exception {
if (!Files.isDirectory(Paths.get(directoryPath))) {
if (Files.exists(Paths.get(directoryPath))) {
Path path = Paths.get(directoryPath);
if (!Files.isDirectory(path)) {
if (Files.exists(path)) {
throw new RuntimeException("Path " + directoryPath + " exists, but is not " +
"a directory.");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,14 +243,13 @@ public void visitPartition(TopicIdPartition topicIdPartition, PartitionRegistrat
);
});

deletedTopics.forEach((topicId, topicName) -> {
deletedTopics.forEach((topicId, topicName) ->
operationConsumer.accept(
DELETE_TOPIC,
"Deleted Topic " + topicName + ", ID " + topicId,
migrationState -> migrationClient.topicClient().deleteTopic(topicName, migrationState)
);
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
});
)
);

newPartitions.forEach((topicId, partitionMap) -> {
TopicImage topic = topicsImage.getTopic(topicId);
Expand All @@ -272,14 +271,14 @@ public void visitPartition(TopicIdPartition topicIdPartition, PartitionRegistrat
migrationState));
});

extraneousPartitionsInZk.forEach((topicName, partitions) -> {
extraneousPartitionsInZk.forEach((topicName, partitions) ->
operationConsumer.accept(
DELETE_PARTITION,
"Deleted extraneous Partitions " + partitions + " for Topic " + topicName,
migrationState -> migrationClient.topicClient().deleteTopicPartitions(
Collections.singletonMap(topicName, partitions),
migrationState));
});
migrationState))
);
}

void handleTopicsDelta(
Expand Down Expand Up @@ -516,9 +515,9 @@ void handleClientQuotasDelta(MetadataImage metadataImage, MetadataDelta metadata

// Populate list with users with scram changes
if (metadataDelta.scramDelta() != null) {
metadataDelta.scramDelta().changes().forEach((scramMechanism, changes) -> {
changes.forEach((userName, changeOpt) -> users.add(userName));
});
metadataDelta.scramDelta().changes().forEach((scramMechanism, changes) ->
changes.forEach((userName, changeOpt) -> users.add(userName))
);
}

// Populate list with users with quota changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,7 @@ public static class Loader {
private Optional<String> metadataLogDir = Optional.empty();

public Loader addLogDirs(Collection<String> logDirs) {
for (String logDir : logDirs) {
this.logDirs.add(logDir);
}
return this;
}

public Loader addLogDir(String logDir) {
this.logDirs.add(logDir);
this.logDirs.addAll(logDirs);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,15 +246,15 @@ public void testActivationMessageForNonEmptyLogWithMigrations() {

assertEquals(
"Should not have ZK migrations enabled on a cluster that was created in KRaft mode.",
assertThrows(RuntimeException.class, () -> {
assertThrows(RuntimeException.class, () ->
ActivationRecordsGenerator.recordsForNonEmptyLog(
logMsg -> fail(),
-1L,
true,
buildFeatureControl(MetadataVersion.IBP_3_4_IV0, Optional.empty()),
MetadataVersion.IBP_3_4_IV0
);
}).getMessage()
)
).getMessage()
);

result = ActivationRecordsGenerator.recordsForNonEmptyLog(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,7 @@ public void testActivateAndDeactivate() {
public void testDeactivateFailsIfNotActive() {
OffsetControlManager offsetControl = new OffsetControlManager.Builder().build();
assertEquals("Can't deactivate inactive OffsetControlManager.",
assertThrows(RuntimeException.class,
() -> offsetControl.deactivate()).
getMessage());
assertThrows(RuntimeException.class, offsetControl::deactivate).getMessage());
}

private static Batch<ApiMessageAndVersion> newFakeBatch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,20 +98,20 @@ public void testMonotonic() {
assertEquals(42, range.firstProducerId());

// Can't go backwards in Producer IDs
assertThrows(RuntimeException.class, () -> {
assertThrows(RuntimeException.class, () ->
producerIdControlManager.replay(
new ProducerIdsRecord()
.setBrokerId(1)
.setBrokerEpoch(100)
.setNextProducerId(40));
}, "Producer ID range must only increase");
assertThrows(RuntimeException.class, () -> {
.setNextProducerId(40)),
"Producer ID range must only increase");
assertThrows(RuntimeException.class, () ->
producerIdControlManager.replay(
new ProducerIdsRecord()
.setBrokerId(2)
.setBrokerEpoch(100)
.setNextProducerId(42));
}, "Producer ID range must only increase");
.setNextProducerId(42)),
"Producer ID range must only increase");
range = producerIdControlManager.generateNextProducerId(3, 100).response();
assertEquals(42, range.firstProducerId());

Expand Down
Loading

0 comments on commit 90b779b

Please sign in to comment.