-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
POC for segment replication. #2075
Changes from 3 commits
507bdab
12bf217
fc7ca0b
a48d1f3
b1a0510
9728020
dac3a4a
db0baac
7d0c871
f619be7
f646a9e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -61,11 +61,11 @@ public class RefreshResponse extends BroadcastResponse { | |
declareBroadcastFields(PARSER); | ||
} | ||
|
||
RefreshResponse(StreamInput in) throws IOException { | ||
public RefreshResponse(StreamInput in) throws IOException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems like you only use the other constructor. Does this one need to be public? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is used to inside of
With that said - this was me being quick and dirty again for the poc. We shouldn't be reusing RefreshResponse here and instead create a new response type. Will make a separate task. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
super(in); | ||
} | ||
|
||
RefreshResponse(int totalShards, int successfulShards, int failedShards, List<DefaultShardOperationFailedException> shardFailures) { | ||
public RefreshResponse(int totalShards, int successfulShards, int failedShards, List<DefaultShardOperationFailedException> shardFailures) { | ||
super(totalShards, successfulShards, failedShards, shardFailures); | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -135,6 +135,7 @@ | |
import org.opensearch.action.support.master.AcknowledgedResponse; | ||
import org.opensearch.cluster.metadata.IndexMetadata.APIBlock; | ||
import org.opensearch.common.Nullable; | ||
import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest; | ||
|
||
/** | ||
* Administrative actions/operations against indices. | ||
|
@@ -405,6 +406,13 @@ public interface IndicesAdminClient extends OpenSearchClient { | |
*/ | ||
void refresh(RefreshRequest request, ActionListener<RefreshResponse> listener); | ||
|
||
/** | ||
* Publish the latest primary checkpoint to replica shards. | ||
* @param request {@link PublishCheckpointRequest} The PublishCheckpointRequest | ||
* @param listener A listener to be notified with a result | ||
*/ | ||
void publishCheckpoint(PublishCheckpointRequest request, ActionListener<RefreshResponse> listener); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a reason why you're re-using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 100%, I was cutting corners. Same as comment above making a separate issue to clean this up. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
||
/** | ||
* Explicitly refresh one or more indices (making the content indexed since the last refresh searchable). | ||
*/ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -75,6 +75,7 @@ | |
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; | ||
import org.opensearch.indices.mapper.MapperRegistry; | ||
import org.opensearch.indices.recovery.RecoveryState; | ||
import org.opensearch.indices.replication.checkpoint.TransportCheckpointPublisher; | ||
import org.opensearch.plugins.IndexStorePlugin; | ||
import org.opensearch.script.ScriptService; | ||
import org.opensearch.search.aggregations.support.ValuesSourceRegistry; | ||
|
@@ -466,7 +467,8 @@ public IndexService newIndexService( | |
IndicesFieldDataCache indicesFieldDataCache, | ||
NamedWriteableRegistry namedWriteableRegistry, | ||
BooleanSupplier idFieldDataEnabled, | ||
ValuesSourceRegistry valuesSourceRegistry | ||
ValuesSourceRegistry valuesSourceRegistry, | ||
TransportCheckpointPublisher checkpointPublisher | ||
) throws IOException { | ||
final IndexEventListener eventListener = freeze(); | ||
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper | ||
|
@@ -520,8 +522,8 @@ public IndexService newIndexService( | |
allowExpensiveQueries, | ||
expressionResolver, | ||
valuesSourceRegistry, | ||
recoveryStateFactory | ||
); | ||
recoveryStateFactory, | ||
checkpointPublisher); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ooof, we should really think about refactoring this constructor, and the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree - I think we should revisit this after we have separate implementations of IndexShard and address how to load things as a module. |
||
success = true; | ||
return indexService; | ||
} finally { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -94,6 +94,7 @@ | |
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; | ||
import org.opensearch.indices.mapper.MapperRegistry; | ||
import org.opensearch.indices.recovery.RecoveryState; | ||
import org.opensearch.indices.replication.checkpoint.TransportCheckpointPublisher; | ||
import org.opensearch.plugins.IndexStorePlugin; | ||
import org.opensearch.script.ScriptService; | ||
import org.opensearch.search.aggregations.support.ValuesSourceRegistry; | ||
|
@@ -165,6 +166,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust | |
private final IndexNameExpressionResolver expressionResolver; | ||
private final Supplier<Sort> indexSortSupplier; | ||
private final ValuesSourceRegistry valuesSourceRegistry; | ||
private final TransportCheckpointPublisher checkpointPublisher; | ||
|
||
public IndexService( | ||
IndexSettings indexSettings, | ||
|
@@ -195,8 +197,8 @@ public IndexService( | |
BooleanSupplier allowExpensiveQueries, | ||
IndexNameExpressionResolver expressionResolver, | ||
ValuesSourceRegistry valuesSourceRegistry, | ||
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory | ||
) { | ||
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory, | ||
TransportCheckpointPublisher checkpointPublisher) { | ||
super(indexSettings); | ||
this.allowExpensiveQueries = allowExpensiveQueries; | ||
this.indexSettings = indexSettings; | ||
|
@@ -206,6 +208,7 @@ public IndexService( | |
this.circuitBreakerService = circuitBreakerService; | ||
this.expressionResolver = expressionResolver; | ||
this.valuesSourceRegistry = valuesSourceRegistry; | ||
this.checkpointPublisher = checkpointPublisher; | ||
if (needsMapperService(indexSettings, indexCreationContext)) { | ||
assert indexAnalyzers != null; | ||
this.mapperService = new MapperService( | ||
|
@@ -520,8 +523,8 @@ public synchronized IndexShard createShard( | |
indexingOperationListeners, | ||
() -> globalCheckpointSyncer.accept(shardId), | ||
retentionLeaseSyncer, | ||
circuitBreakerService | ||
); | ||
circuitBreakerService, | ||
checkpointPublisher); | ||
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); | ||
eventListener.afterIndexShardCreated(indexShard); | ||
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap(); | ||
|
@@ -911,7 +914,9 @@ private void maybeRefreshEngine(boolean force) { | |
if (indexSettings.getRefreshInterval().millis() > 0 || force) { | ||
for (IndexShard shard : this.shards.values()) { | ||
try { | ||
shard.scheduledRefresh(); | ||
if (shard.routingEntry().primary()) { | ||
shard.scheduledRefresh(); | ||
} | ||
Comment on lines
+917
to
+919
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Retain code as-is and move the gating into There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ack - #2197 |
||
} catch (IndexShardClosedException | AlreadyClosedException ex) { | ||
// fine - continue; | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -508,7 +508,7 @@ public final class IndexSettings { | |
*/ | ||
public static final Setting<Boolean> INDEX_SEGMENT_REPLICATION_SETTING = Setting.boolSetting( | ||
"index.replication.segment_replication", | ||
false, | ||
true, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we want to commit this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will remove given our first goal is getting this working alongside docrep and we want to get gradle check to green. |
||
Property.IndexScope, | ||
Property.Final | ||
); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -249,6 +249,9 @@ public void verifyEngineBeforeIndexClosing() throws IllegalStateException { | |
} | ||
} | ||
|
||
public void updateCurrentInfos(byte[] infosBytes, long gen) throws IOException {}; | ||
|
||
|
||
/** | ||
* A throttling class that can be activated, causing the | ||
* {@code acquireThrottle} method to block on a lock when throttling | ||
|
@@ -1196,6 +1199,10 @@ public abstract void forceMerge( | |
*/ | ||
public abstract IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws EngineException; | ||
|
||
public SegmentInfosRef getLatestSegmentInfosSafe() { return null; }; | ||
|
||
public SegmentInfos getLatestSegmentInfos() { return null; }; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit - could you add documentation to all of these new APIs to clarify what they're expected to do? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
|
||
/** | ||
* Snapshots the most recent safe index commit from the engine. | ||
*/ | ||
|
@@ -1999,6 +2006,28 @@ public IndexCommit getIndexCommit() { | |
} | ||
} | ||
|
||
public static class SegmentInfosRef implements Closeable { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please move to top-level class. As a broader note, we should see if there's an opportunity to generalize this Decorator design pattern instead of having There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ack - #2202 |
||
private final AtomicBoolean closed = new AtomicBoolean(); | ||
private final CheckedRunnable<IOException> onClose; | ||
private final SegmentInfos segmentInfos; | ||
|
||
public SegmentInfosRef(SegmentInfos segmentInfos, CheckedRunnable<IOException> onClose) { | ||
this.segmentInfos = segmentInfos; | ||
this.onClose = onClose; | ||
} | ||
|
||
@Override | ||
public void close() throws IOException { | ||
if (closed.compareAndSet(false, true)) { | ||
onClose.run(); | ||
} | ||
} | ||
|
||
public SegmentInfos getSegmentInfos() { | ||
return segmentInfos; | ||
} | ||
} | ||
|
||
public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) { | ||
|
||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -95,6 +95,7 @@ public final class EngineConfig { | |
private final CircuitBreakerService circuitBreakerService; | ||
private final LongSupplier globalCheckpointSupplier; | ||
private final Supplier<RetentionLeases> retentionLeasesSupplier; | ||
private boolean isPrimary; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TODO: move this attribute to IndexShard There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IndexShard actually populates this when wiring up the engine so we can conditionally make InternalEngine readOnly for replicas. I think a better route here is wiring up a different engine class for replicas. |
||
|
||
/** | ||
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been | ||
|
@@ -169,8 +170,8 @@ public EngineConfig( | |
LongSupplier globalCheckpointSupplier, | ||
Supplier<RetentionLeases> retentionLeasesSupplier, | ||
LongSupplier primaryTermSupplier, | ||
TombstoneDocSupplier tombstoneDocSupplier | ||
) { | ||
TombstoneDocSupplier tombstoneDocSupplier, | ||
boolean isPrimary) { | ||
this( | ||
shardId, | ||
threadPool, | ||
|
@@ -193,7 +194,7 @@ public EngineConfig( | |
circuitBreakerService, | ||
globalCheckpointSupplier, | ||
retentionLeasesSupplier, | ||
primaryTermSupplier, | ||
isPrimary, primaryTermSupplier, | ||
tombstoneDocSupplier | ||
); | ||
} | ||
|
@@ -223,7 +224,7 @@ public EngineConfig( | |
CircuitBreakerService circuitBreakerService, | ||
LongSupplier globalCheckpointSupplier, | ||
Supplier<RetentionLeases> retentionLeasesSupplier, | ||
LongSupplier primaryTermSupplier, | ||
boolean isPrimary, LongSupplier primaryTermSupplier, | ||
TombstoneDocSupplier tombstoneDocSupplier | ||
) { | ||
this.shardId = shardId; | ||
|
@@ -237,6 +238,7 @@ public EngineConfig( | |
this.codecService = codecService; | ||
this.eventListener = eventListener; | ||
codecName = indexSettings.getValue(INDEX_CODEC_SETTING); | ||
this.isPrimary = isPrimary; | ||
// We need to make the indexing buffer for this shard at least as large | ||
// as the amount of memory that is available for all engines on the | ||
// local node so that decisions to flush segments to disk are made by | ||
|
@@ -458,6 +460,10 @@ public LongSupplier getPrimaryTermSupplier() { | |
return primaryTermSupplier; | ||
} | ||
|
||
public boolean isPrimary() { | ||
return isPrimary; | ||
} | ||
Comment on lines
+463
to
+465
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (See my other comment) The notion should of being a "primary" should be encapsulated in the |
||
|
||
/** | ||
* A supplier supplies tombstone documents which will be used in soft-update methods. | ||
* The returned document consists only _uid, _seqno, _term and _version fields; other metadata fields are excluded. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In order to not break document replication,
isSegmentReplicationEnabled
method from my PRisSegmentReplicationEnabled
within IndexShard's flush method and no-op itEven though the
flush
method returns a CommitId, I checked all invocations in the codebase and nowhere is the return value accessed/used. So it's safe to returnnull
or even change the method to have avoid
return type.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#2197