Skip to content

Commit

Permalink
Fix the bug of using push time to identify new created segment (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Sep 15, 2023
1 parent 14f3246 commit cf460aa
Show file tree
Hide file tree
Showing 11 changed files with 197 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.common.utils.SegmentUtils;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -60,19 +61,19 @@
* selection. When it is selected, we don't serve the new segment.
* <p>
* Definition of new segment:
* 1) Segment pushed more than 5 minutes ago.
* - If we first see a segment via initialization, we look up segment push time from zookeeper.
* 1) Segment created more than 5 minutes ago.
* - If we first see a segment via initialization, we look up segment creation time from zookeeper.
* - If we first see a segment via onAssignmentChange initialization, we use the calling time of onAssignmentChange
* as approximation.
* 2) We retire new segment as old when:
* - The push time is more than 5 minutes ago
* - The creation time is more than 5 minutes ago
* - Any instance for new segment is in ERROR state
* - External view for segment converges with ideal state
*
* Note that this implementation means:
* 1) Inconsistent selection of new segments across queries (some queries will serve new segments and others won't).
* 2) When there is no state update from helix, new segments won't be retired because of the time passing (those with
* push time more than 5 minutes ago).
* creation time more than 5 minutes ago).
* TODO: refresh new/old segment state where there is no update from helix for long time.
*/
abstract class BaseInstanceSelector implements InstanceSelector {
Expand Down Expand Up @@ -109,8 +110,9 @@ abstract class BaseInstanceSelector implements InstanceSelector {
public void init(Set<String> enabledInstances, IdealState idealState, ExternalView externalView,
Set<String> onlineSegments) {
_enabledInstances = enabledInstances;
Map<String, Long> newSegmentPushTimeMap = getNewSegmentPushTimeMapFromZK(idealState, externalView, onlineSegments);
updateSegmentMaps(idealState, externalView, onlineSegments, newSegmentPushTimeMap);
Map<String, Long> newSegmentCreationTimeMap =
getNewSegmentCreationTimeMapFromZK(idealState, externalView, onlineSegments);
updateSegmentMaps(idealState, externalView, onlineSegments, newSegmentCreationTimeMap);
refreshSegmentStates();
}

Expand All @@ -122,9 +124,9 @@ static boolean isOnlineForRouting(@Nullable String state) {
}

/**
* Returns a map from new segment to their push time based on the ZK metadata.
* Returns a map from new segment to their creation time based on the ZK metadata.
*/
Map<String, Long> getNewSegmentPushTimeMapFromZK(IdealState idealState, ExternalView externalView,
Map<String, Long> getNewSegmentCreationTimeMapFromZK(IdealState idealState, ExternalView externalView,
Set<String> onlineSegments) {
List<String> potentialNewSegments = new ArrayList<>();
Map<String, Map<String, String>> idealStateAssignment = idealState.getRecord().getMapFields();
Expand All @@ -136,9 +138,8 @@ Map<String, Long> getNewSegmentPushTimeMapFromZK(IdealState idealState, External
}
}

// Use push time in ZK metadata to determine whether the potential new segment is newly pushed
Map<String, Long> newSegmentPushTimeMap = new HashMap<>();
long nowMillis = _clock.millis();
Map<String, Long> newSegmentCreationTimeMap = new HashMap<>();
long currentTimeMs = _clock.millis();
String segmentZKMetadataPathPrefix =
ZKMetadataProvider.constructPropertyStorePathForResource(_tableNameWithType) + "/";
List<String> segmentZKMetadataPaths = new ArrayList<>(potentialNewSegments.size());
Expand All @@ -151,14 +152,14 @@ Map<String, Long> getNewSegmentPushTimeMapFromZK(IdealState idealState, External
continue;
}
SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(record);
long pushTimeMillis = segmentZKMetadata.getPushTime();
if (InstanceSelector.isNewSegment(pushTimeMillis, nowMillis)) {
newSegmentPushTimeMap.put(segmentZKMetadata.getSegmentName(), pushTimeMillis);
long creationTimeMs = SegmentUtils.getSegmentCreationTimeMs(segmentZKMetadata);
if (InstanceSelector.isNewSegment(creationTimeMs, currentTimeMs)) {
newSegmentCreationTimeMap.put(segmentZKMetadata.getSegmentName(), creationTimeMs);
}
}
LOGGER.info("Got {} new segments: {} for table: {} by reading ZK metadata, current time: {}",
newSegmentPushTimeMap.size(), newSegmentPushTimeMap, _tableNameWithType, nowMillis);
return newSegmentPushTimeMap;
newSegmentCreationTimeMap.size(), newSegmentCreationTimeMap, _tableNameWithType, currentTimeMs);
return newSegmentCreationTimeMap;
}

/**
Expand Down Expand Up @@ -220,36 +221,36 @@ static SortedMap<String, String> convertToSortedMap(Map<String, String> map) {
* ONLINE/CONSUMING instances in the ideal state and pre-selected by the {@link SegmentPreSelector}) and new segments.
* After this update:
* - Old segments' online instances should be tracked in _oldSegmentCandidatesMap
* - New segments' state (push time and candidate instances) should be tracked in _newSegmentStateMap
* - New segments' state (creation time and candidate instances) should be tracked in _newSegmentStateMap
*/
void updateSegmentMaps(IdealState idealState, ExternalView externalView, Set<String> onlineSegments,
Map<String, Long> newSegmentPushTimeMap) {
Map<String, Long> newSegmentCreationTimeMap) {
_oldSegmentCandidatesMap.clear();
_newSegmentStateMap = new HashMap<>(HashUtil.getHashMapCapacity(newSegmentPushTimeMap.size()));
_newSegmentStateMap = new HashMap<>(HashUtil.getHashMapCapacity(newSegmentCreationTimeMap.size()));

Map<String, Map<String, String>> idealStateAssignment = idealState.getRecord().getMapFields();
Map<String, Map<String, String>> externalViewAssignment = externalView.getRecord().getMapFields();
for (String segment : onlineSegments) {
Map<String, String> idealStateInstanceStateMap = idealStateAssignment.get(segment);
Long newSegmentPushTimeMillis = newSegmentPushTimeMap.get(segment);
Long newSegmentCreationTimeMs = newSegmentCreationTimeMap.get(segment);
Map<String, String> externalViewInstanceStateMap = externalViewAssignment.get(segment);
if (externalViewInstanceStateMap == null) {
if (newSegmentPushTimeMillis != null) {
if (newSegmentCreationTimeMs != null) {
// New segment
List<SegmentInstanceCandidate> candidates = new ArrayList<>(idealStateInstanceStateMap.size());
for (Map.Entry<String, String> entry : convertToSortedMap(idealStateInstanceStateMap).entrySet()) {
if (isOnlineForRouting(entry.getValue())) {
candidates.add(new SegmentInstanceCandidate(entry.getKey(), false));
}
}
_newSegmentStateMap.put(segment, new NewSegmentState(newSegmentPushTimeMillis, candidates));
_newSegmentStateMap.put(segment, new NewSegmentState(newSegmentCreationTimeMs, candidates));
} else {
// Old segment
_oldSegmentCandidatesMap.put(segment, Collections.emptyList());
}
} else {
TreeSet<String> onlineInstances = getOnlineInstances(idealStateInstanceStateMap, externalViewInstanceStateMap);
if (newSegmentPushTimeMillis != null) {
if (newSegmentCreationTimeMs != null) {
// New segment
List<SegmentInstanceCandidate> candidates = new ArrayList<>(idealStateInstanceStateMap.size());
for (Map.Entry<String, String> entry : convertToSortedMap(idealStateInstanceStateMap).entrySet()) {
Expand All @@ -258,7 +259,7 @@ void updateSegmentMaps(IdealState idealState, ExternalView externalView, Set<Str
candidates.add(new SegmentInstanceCandidate(instance, onlineInstances.contains(instance)));
}
}
_newSegmentStateMap.put(segment, new NewSegmentState(newSegmentPushTimeMillis, candidates));
_newSegmentStateMap.put(segment, new NewSegmentState(newSegmentCreationTimeMs, candidates));
} else {
// Old segment
List<SegmentInstanceCandidate> candidates = new ArrayList<>(onlineInstances.size());
Expand Down Expand Up @@ -358,44 +359,44 @@ public void onInstancesChange(Set<String> enabledInstances, List<String> changed
*/
@Override
public void onAssignmentChange(IdealState idealState, ExternalView externalView, Set<String> onlineSegments) {
Map<String, Long> newSegmentPushTimeMap =
getNewSegmentPushTimeMapFromExistingStates(idealState, externalView, onlineSegments);
updateSegmentMaps(idealState, externalView, onlineSegments, newSegmentPushTimeMap);
Map<String, Long> newSegmentCreationTimeMap =
getNewSegmentCreationTimeMapFromExistingStates(idealState, externalView, onlineSegments);
updateSegmentMaps(idealState, externalView, onlineSegments, newSegmentCreationTimeMap);
refreshSegmentStates();
}

/**
* Returns a map from new segment to their push time based on the existing in-memory states.
* Returns a map from new segment to their creation time based on the existing in-memory states.
*/
Map<String, Long> getNewSegmentPushTimeMapFromExistingStates(IdealState idealState, ExternalView externalView,
Map<String, Long> getNewSegmentCreationTimeMapFromExistingStates(IdealState idealState, ExternalView externalView,
Set<String> onlineSegments) {
Map<String, Long> newSegmentPushTimeMap = new HashMap<>();
long nowMillis = _clock.millis();
Map<String, Long> newSegmentCreationTimeMap = new HashMap<>();
long currentTimeMs = _clock.millis();
Map<String, Map<String, String>> idealStateAssignment = idealState.getRecord().getMapFields();
Map<String, Map<String, String>> externalViewAssignment = externalView.getRecord().getMapFields();
for (String segment : onlineSegments) {
NewSegmentState newSegmentState = _newSegmentStateMap.get(segment);
long pushTimeMillis = 0;
long creationTimeMs = 0;
if (newSegmentState != null) {
// It was a new segment before, check the push time and segment state to see if it is still a new segment
if (InstanceSelector.isNewSegment(newSegmentState.getPushTimeMillis(), nowMillis)) {
pushTimeMillis = newSegmentState.getPushTimeMillis();
// It was a new segment before, check the creation time and segment state to see if it is still a new segment
if (InstanceSelector.isNewSegment(newSegmentState.getCreationTimeMs(), currentTimeMs)) {
creationTimeMs = newSegmentState.getCreationTimeMs();
}
} else if (!_oldSegmentCandidatesMap.containsKey(segment)) {
// This is the first time we see this segment, use the current time as the push time
pushTimeMillis = nowMillis;
// This is the first time we see this segment, use the current time as the creation time
creationTimeMs = currentTimeMs;
}
// For recently pushed segment, check if it is qualified as new segment
if (pushTimeMillis > 0) {
// For recently created segment, check if it is qualified as new segment
if (creationTimeMs > 0) {
assert idealStateAssignment.containsKey(segment);
if (isPotentialNewSegment(idealStateAssignment.get(segment), externalViewAssignment.get(segment))) {
newSegmentPushTimeMap.put(segment, pushTimeMillis);
newSegmentCreationTimeMap.put(segment, creationTimeMs);
}
}
}
LOGGER.info("Got {} new segments: {} for table: {} by processing existing states, current time: {}",
newSegmentPushTimeMap.size(), newSegmentPushTimeMap, _tableNameWithType, nowMillis);
return newSegmentPushTimeMap;
newSegmentCreationTimeMap.size(), newSegmentCreationTimeMap, _tableNameWithType, currentTimeMs);
return newSegmentCreationTimeMap;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
public interface InstanceSelector {
long NEW_SEGMENT_EXPIRATION_MILLIS = TimeUnit.MINUTES.toMillis(5);

static boolean isNewSegment(long pushMillis, long nowMillis) {
return nowMillis - pushMillis <= NEW_SEGMENT_EXPIRATION_MILLIS;
static boolean isNewSegment(long creationTimeMs, long currentTimeMs) {
return creationTimeMs > 0 && currentTimeMs - creationTimeMs <= NEW_SEGMENT_EXPIRATION_MILLIS;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,22 @@
*/
@Immutable
public class NewSegmentState {
// Segment push time. This could be
// Segment creation time. This could be
// 1) From ZK if we first see this segment via init call.
// 2) Use wall time, if first see this segment from onAssignmentChange call.
private final long _pushTimeMillis;
// 2) Use wall time if we first see this segment from onAssignmentChange call.
private final long _creationTimeMs;

// List of SegmentInstanceCandidate: which contains instance name and online flags.
// The candidates have to be in instance sorted order.
private final List<SegmentInstanceCandidate> _candidates;

public NewSegmentState(long pushTimeMillis, List<SegmentInstanceCandidate> candidates) {
_pushTimeMillis = pushTimeMillis;
public NewSegmentState(long creationTimeMs, List<SegmentInstanceCandidate> candidates) {
_creationTimeMs = creationTimeMs;
_candidates = candidates;
}

public long getPushTimeMillis() {
return _pushTimeMillis;
public long getCreationTimeMs() {
return _creationTimeMs;
}

public List<SegmentInstanceCandidate> getCandidates() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ public StrictReplicaGroupInstanceSelector(String tableNameWithType, ZkHelixPrope
*/
@Override
void updateSegmentMaps(IdealState idealState, ExternalView externalView, Set<String> onlineSegments,
Map<String, Long> newSegmentPushTimeMap) {
Map<String, Long> newSegmentCreationTimeMap) {
_oldSegmentCandidatesMap.clear();
int newSegmentMapCapacity = HashUtil.getHashMapCapacity(newSegmentPushTimeMap.size());
int newSegmentMapCapacity = HashUtil.getHashMapCapacity(newSegmentCreationTimeMap.size());
_newSegmentStateMap = new HashMap<>(newSegmentMapCapacity);

Map<String, Map<String, String>> idealStateAssignment = idealState.getRecord().getMapFields();
Expand All @@ -113,7 +113,7 @@ void updateSegmentMaps(IdealState idealState, ExternalView externalView, Set<Str
} else {
onlineInstances = getOnlineInstances(idealStateInstanceStateMap, externalViewInstanceStateMap);
}
if (newSegmentPushTimeMap.containsKey(segment)) {
if (newSegmentCreationTimeMap.containsKey(segment)) {
newSegmentToOnlineInstancesMap.put(segment, onlineInstances);
} else {
oldSegmentToOnlineInstancesMap.put(segment, onlineInstances);
Expand Down Expand Up @@ -170,7 +170,7 @@ void updateSegmentMaps(IdealState idealState, ExternalView externalView, Set<Str
candidates.add(new SegmentInstanceCandidate(instance, onlineInstances.contains(instance)));
}
}
_newSegmentStateMap.put(segment, new NewSegmentState(newSegmentPushTimeMap.get(segment), candidates));
_newSegmentStateMap.put(segment, new NewSegmentState(newSegmentCreationTimeMap.get(segment), candidates));
}
}
}
Loading

0 comments on commit cf460aa

Please sign in to comment.