diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 735aa8bafd2b..1b9f4d716444 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -2353,9 +2353,9 @@ public void assignTableSegment(String tableNameWithType, String segmentName) { // Assign a list of segments in batch mode public void assignTableSegments(String tableNameWithType, List segmentNames) { Map segmentZKMetadataPathMap = new HashMap<>(); - for (String segmentName: segmentNames) { - String segmentZKMetadataPath = ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, - segmentName); + for (String segmentName : segmentNames) { + String segmentZKMetadataPath = + ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, segmentName); segmentZKMetadataPathMap.put(segmentName, segmentZKMetadataPath); } // Assign instances for the segment and add it into IdealState @@ -2370,11 +2370,12 @@ public void assignTableSegments(String tableNameWithType, List segmentNa if (_enableTieredSegmentAssignment && CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) { List sortedTiers = TierConfigUtils.getSortedTiersForStorageType(tableConfig.getTierConfigsList(), TierFactory.PINOT_SERVER_STORAGE_TYPE, _helixZkManager); - for (String segmentName: segmentNames) { + for (String segmentName : segmentNames) { // Update segment tier to support direct assignment for multiple data directories updateSegmentTargetTier(tableNameWithType, segmentName, sortedTiers); - InstancePartitions tierInstancePartitions = TierConfigUtils.getTieredInstancePartitionsForSegment( - tableNameWithType, segmentName, sortedTiers, _helixZkManager); + InstancePartitions tierInstancePartitions = + TierConfigUtils.getTieredInstancePartitionsForSegment(tableNameWithType, segmentName, sortedTiers, + _helixZkManager); if (tierInstancePartitions != null && TableNameBuilder.isOfflineTableResource(tableNameWithType)) { // Override instance partitions for offline table LOGGER.info("Overriding with tiered instance partitions: {} for segment: {} of table: {}", @@ -2386,35 +2387,33 @@ public void assignTableSegments(String tableNameWithType, List segmentNa SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig, _controllerMetrics); - synchronized (getIdealStateUpdaterLock(tableNameWithType)) { - long segmentAssignmentStartTs = System.currentTimeMillis(); - Map finalInstancePartitionsMap = instancePartitionsMap; - HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, idealState -> { - assert idealState != null; - for (String segmentName: segmentNames) { - Map> currentAssignment = idealState.getRecord().getMapFields(); - if (currentAssignment.containsKey(segmentName)) { - LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName, - tableNameWithType); - } else { - List assignedInstances = - segmentAssignment.assignSegment(segmentName, currentAssignment, finalInstancePartitionsMap); - LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances, - tableNameWithType); - currentAssignment.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, - SegmentStateModel.ONLINE)); - } + long segmentAssignmentStartMs = System.currentTimeMillis(); + Map finalInstancePartitionsMap = instancePartitionsMap; + HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, idealState -> { + assert idealState != null; + for (String segmentName : segmentNames) { + Map> currentAssignment = idealState.getRecord().getMapFields(); + if (currentAssignment.containsKey(segmentName)) { + LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName, + tableNameWithType); + } else { + List assignedInstances = + segmentAssignment.assignSegment(segmentName, currentAssignment, finalInstancePartitionsMap); + LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances, + tableNameWithType); + currentAssignment.put(segmentName, + SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE)); } - return idealState; - }); - LOGGER.info("Added {} segments: {} to IdealState for table: {} in {} ms", segmentNames.size(), segmentNames, - tableNameWithType, System.currentTimeMillis() - segmentAssignmentStartTs); - } + } + return idealState; + }); + LOGGER.info("Added {} segments: {} to IdealState for table: {} in {} ms", segmentNames.size(), segmentNames, + tableNameWithType, System.currentTimeMillis() - segmentAssignmentStartMs); } catch (Exception e) { LOGGER.error( "Caught exception while adding segments: {} to IdealState for table: {}, deleting segments ZK metadata", segmentNames, tableNameWithType, e); - for (Map.Entry segmentZKMetadataPathEntry: segmentZKMetadataPathMap.entrySet()) { + for (Map.Entry segmentZKMetadataPathEntry : segmentZKMetadataPathMap.entrySet()) { String segmentName = segmentZKMetadataPathEntry.getKey(); String segmentZKMetadataPath = segmentZKMetadataPathEntry.getValue(); if (_propertyStore.remove(segmentZKMetadataPath, AccessOption.PERSISTENT)) {