Skip to content

Commit

Permalink
KAFKA-16610 Replace "Map#entrySet#forEach" by "Map#forEach" (#15795)
Browse files Browse the repository at this point in the history
Reviewers: Apoorv Mittal <[email protected]>, Igor Soarez <[email protected]>
  • Loading branch information
frankvicky authored Apr 25, 2024
1 parent a7ceacd commit 864744f
Show file tree
Hide file tree
Showing 12 changed files with 58 additions and 53 deletions.
20 changes: 10 additions & 10 deletions core/src/main/scala/kafka/server/ControllerApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -530,12 +530,12 @@ class ControllerApis(
if (exception != null) {
requestHelper.handleError(request, exception)
} else {
controllerResults.entrySet().forEach(entry => response.responses().add(
controllerResults.forEach((key, value) => response.responses().add(
new OldAlterConfigsResourceResponse().
setErrorCode(entry.getValue.error().code()).
setErrorMessage(entry.getValue.message()).
setResourceName(entry.getKey.name()).
setResourceType(entry.getKey.`type`().id())))
setErrorCode(value.error().code()).
setErrorMessage(value.message()).
setResourceName(key.name()).
setResourceType(key.`type`().id())))
requestHelper.sendResponseMaybeThrottle(request, throttleMs =>
new AlterConfigsResponse(response.setThrottleTimeMs(throttleMs)))
}
Expand Down Expand Up @@ -771,12 +771,12 @@ class ControllerApis(
if (exception != null) {
requestHelper.handleError(request, exception)
} else {
controllerResults.entrySet().forEach(entry => response.responses().add(
controllerResults.forEach((key, value) => response.responses().add(
new AlterConfigsResourceResponse().
setErrorCode(entry.getValue.error().code()).
setErrorMessage(entry.getValue.message()).
setResourceName(entry.getKey.name()).
setResourceType(entry.getKey.`type`().id())))
setErrorCode(value.error().code()).
setErrorMessage(value.message()).
setResourceName(key.name()).
setResourceType(key.`type`().id())))
brokerLoggerResponses.forEach(r => response.responses().add(r))
requestHelper.sendResponseMaybeThrottle(request, throttleMs =>
new IncrementalAlterConfigsResponse(response.setThrottleTimeMs(throttleMs)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu
validateTopicName(resource.name())
val properties = new Properties()
val nullTopicConfigs = new mutable.ArrayBuffer[String]()
config.entrySet().forEach(e => {
if (e.getValue == null) {
nullTopicConfigs += e.getKey
config.forEach((key, value) => {
if (value == null) {
nullTopicConfigs += key
} else {
properties.setProperty(e.getKey, e.getValue)
properties.setProperty(key, value)
}
})
if (nullTopicConfigs.nonEmpty) {
Expand All @@ -111,7 +111,7 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu
case BROKER => validateBrokerName(resource.name())
case CLIENT_METRICS =>
val properties = new Properties()
config.entrySet().forEach(e => properties.setProperty(e.getKey, e.getValue))
config.forEach((key, value) => properties.setProperty(key, value))
ClientMetricsConfigs.validate(resource.name(), properties)
case _ => throwExceptionForUnknownResourceType(resource)
}
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/server/metadata/AclPublisher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ class AclPublisher(
try {
// Because the changes map is a LinkedHashMap, the deltas will be returned in
// the order they were performed.
aclsDelta.changes().entrySet().forEach(e =>
if (e.getValue.isPresent) {
authorizer.addAcl(e.getKey, e.getValue.get())
aclsDelta.changes().forEach((key, value) =>
if (value.isPresent) {
authorizer.addAcl(key, value.get())
} else {
authorizer.removeAcl(e.getKey)
authorizer.removeAcl(key)
})
} catch {
case t: Throwable => faultHandler.handleFault("Error loading " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManag
private[metadata] val connectionQuotas: ConnectionQuotas) extends Logging {

def update(quotasDelta: ClientQuotasDelta): Unit = {
quotasDelta.changes().entrySet().forEach { e =>
update(e.getKey, e.getValue)
quotasDelta.changes().forEach { (key, value) =>
update(key, value)
}
}

Expand Down Expand Up @@ -96,8 +96,8 @@ class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManag
ClientIdEntity(clientIdVal)
}
}
quotaDelta.changes().entrySet().forEach { e =>
handleUserClientQuotaChange(userClientEntity, e.getKey, e.getValue.asScala)
quotaDelta.changes().forEach { (key, value) =>
handleUserClientQuotaChange(userClientEntity, key, value.asScala)
}
} else {
warn(s"Ignoring unsupported quota entity $entity.")
Expand All @@ -116,10 +116,10 @@ class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManag
case _ => throw new IllegalStateException("Should only handle IP quota entities here")
}

quotaDelta.changes().entrySet().forEach { e =>
quotaDelta.changes().forEach { (key, value) =>
// The connection quota only understands the connection rate limit
val quotaName = e.getKey
val quotaValue = e.getValue
val quotaName = key
val quotaValue = value
if (!quotaName.equals(QuotaConfigs.IP_CONNECTION_RATE_OVERRIDE_CONFIG)) {
warn(s"Ignoring unexpected quota key $quotaName for entity $ipEntity")
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,9 +488,9 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
val internalTopics = new util.HashSet[String]

image.topics().topicsByName().values().forEach { topic =>
topic.partitions().entrySet().forEach { entry =>
val partitionId = entry.getKey
val partition = entry.getValue
topic.partitions().forEach { (key, value) =>
val partitionId = key
val partition = value
partitionInfos.add(new PartitionInfo(topic.name(),
partitionId,
node(partition.leader),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,8 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
def getTopicNames(): Map[Uuid, String] = {
if (isKRaftTest()) {
val result = new util.HashMap[Uuid, String]()
controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().entrySet().forEach {
e => result.put(e.getValue(), e.getKey())
controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().forEach {
(key, value) => result.put(value, key)
}
result.asScala.toMap
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public boolean equals(Object o) {
@Override
public String toString() {
List<String> features = new ArrayList<>();
localSupportedFeatures.entrySet().forEach(f -> features.add(f.getKey() + ": " + f.getValue()));
localSupportedFeatures.forEach((key, value) -> features.add(key + ": " + value));
features.sort(String::compareTo);
List<String> nodeIds = new ArrayList<>();
quorumNodeIds.forEach(id -> nodeIds.add("" + id));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -729,8 +729,7 @@ private ApiError createTopic(ControllerRequestContext context,
}
ApiError error = maybeCheckCreateTopicPolicy(() -> {
Map<Integer, List<Integer>> assignments = new HashMap<>();
newParts.entrySet().forEach(e -> assignments.put(e.getKey(),
Replicas.toList(e.getValue().replicas)));
newParts.forEach((key, value) -> assignments.put(key, Replicas.toList(value.replicas)));
return new CreateTopicPolicy.RequestMetadata(
topic.name(), null, null, assignments, creationConfigs);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -607,8 +607,7 @@ public String toString() {
TreeMap<String, String> outputMap = new TreeMap<>();
emptyLogDirs.forEach(e -> outputMap.put(e, "EMPTY"));
errorLogDirs.forEach(e -> outputMap.put(e, "ERROR"));
logDirProps.entrySet().forEach(
e -> outputMap.put(e.getKey(), e.getValue().toString()));
logDirProps.forEach((key, value) -> outputMap.put(key, value.toString()));
return "MetaPropertiesEnsemble" +
"(metadataLogDir=" + metadataLogDir +
", dirs={" + outputMap.entrySet().stream().
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,9 +304,12 @@ CreatableTopicResult createTestTopic(String name, int[][] replicas,
topic.assignments().add(new CreatableReplicaAssignment().
setPartitionIndex(i).setBrokerIds(Replicas.toList(replicas[i])));
}
configs.entrySet().forEach(e -> topic.configs().add(
new CreateTopicsRequestData.CreateableTopicConfig().setName(e.getKey()).
setValue(e.getValue())));
configs.forEach((key, value) -> topic.configs().add(
new CreateTopicsRequestData.CreateableTopicConfig()
.setName(key)
.setValue(value)
)
);
request.topics().add(topic);
ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS);
ControllerResult<CreateTopicsResponseData> result =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,10 +314,9 @@ synchronized void onChange(final Map<HostInfo, Set<TopicPartition>> activePartit
final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap,
final Map<TopicPartition, PartitionInfo> topicPartitionInfo) {
this.partitionsByTopic = new HashMap<>();
topicPartitionInfo.entrySet().forEach(entry -> this.partitionsByTopic
.computeIfAbsent(entry.getKey().topic(), topic -> new ArrayList<>())
.add(entry.getValue())
);
topicPartitionInfo.forEach((key, value) -> this.partitionsByTopic
.computeIfAbsent(key.topic(), topic -> new ArrayList<>())
.add(value));

rebuildMetadata(activePartitionHostMap, standbyPartitionHostMap);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ static void run(Duration timeoutMs, String... args) throws Exception {
ElectionType electionType = commandOptions.getElectionType();
Optional<Set<TopicPartition>> jsonFileTopicPartitions =
Optional.ofNullable(commandOptions.getPathToJsonFile())
.map(path -> parseReplicaElectionData(path));
.map(LeaderElectionCommand::parseReplicaElectionData);

Optional<String> topicOption = Optional.ofNullable(commandOptions.getTopic());
Optional<Integer> partitionOption = Optional.ofNullable(commandOptions.getPartition());
Expand Down Expand Up @@ -144,16 +144,15 @@ private static void electLeaders(Admin client, ElectionType electionType, Option
Set<TopicPartition> noop = new HashSet<>();
Map<TopicPartition, Throwable> failed = new HashMap<>();

electionResults.entrySet().stream().forEach(entry -> {
Optional<Throwable> error = entry.getValue();
electionResults.forEach((key, error) -> {
if (error.isPresent()) {
if (error.get() instanceof ElectionNotNeededException) {
noop.add(entry.getKey());
noop.add(key);
} else {
failed.put(entry.getKey(), error.get());
failed.put(key, error.get());
}
} else {
succeeded.add(entry.getKey());
succeeded.add(key);
}
});

Expand All @@ -175,10 +174,16 @@ private static void electLeaders(Admin client, ElectionType electionType, Option
if (!failed.isEmpty()) {
AdminCommandFailedException rootException =
new AdminCommandFailedException(String.format("%s replica(s) could not be elected", failed.size()));
failed.entrySet().forEach(entry -> {
System.out.println(String.format("Error completing leader election (%s) for partition: %s: %s",
electionType, entry.getKey(), entry.getValue()));
rootException.addSuppressed(entry.getValue());
failed.forEach((key, value) -> {
System.out.println(
String.format(
"Error completing leader election (%s) for partition: %s: %s",
electionType,
key,
value
)
);
rootException.addSuppressed(value);
});
throw rootException;
}
Expand Down

0 comments on commit 864744f

Please sign in to comment.