Skip to content

Commit

Permalink
Split CBService into separate classes
Browse files Browse the repository at this point in the history
Splits the CBService into 2 separate classes. One contains the logic for
querying information about the circuit breaker (i.e. whether or not it
is tripped). The other contains the logic for running the periodic
monitoring workflow.

Signed-off-by: John Mazanec <[email protected]>
  • Loading branch information
jmazanec15 committed Mar 28, 2023
1 parent ec68514 commit 791b489
Show file tree
Hide file tree
Showing 22 changed files with 555 additions and 530 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import org.junit.Before;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;
import org.opensearch.knn.index.memory.breaker.NativeMemoryCircuitBreakerService;
import org.opensearch.knn.index.memory.breaker.NativeMemoryCircuitBreaker;
import org.opensearch.knn.plugin.stats.KNNStats;

import java.util.Collections;
Expand All @@ -24,7 +24,7 @@ public class StatsIT extends AbstractRollingUpgradeTestCase {
@Before
public void setUp() throws Exception {
super.setUp();
NativeMemoryCircuitBreakerService nativeMemoryCircuitBreakerService = mock(NativeMemoryCircuitBreakerService.class);
NativeMemoryCircuitBreaker nativeMemoryCircuitBreakerService = mock(NativeMemoryCircuitBreaker.class);
this.knnStats = new KNNStats(nativeMemoryCircuitBreakerService);
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/knn/index/KNNSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ public static ByteSizeValue parseknnMemoryCircuitBreakerValue(String sValue, Str
*/
public synchronized void updateBooleanSetting(String settingName, boolean value) {
ClusterUpdateSettingsRequest clusterUpdateSettingsRequest = new ClusterUpdateSettingsRequest();
Settings circuitBreakerSettings = Settings.builder().put("unregistered-setting-lets-see-what-happens", value).build();
Settings circuitBreakerSettings = Settings.builder().put(settingName, value).build();
clusterUpdateSettingsRequest.persistentSettings(circuitBreakerSettings);
client.admin().cluster().updateSettings(clusterUpdateSettingsRequest, new ActionListener<>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.opensearch.index.mapper.ValueFetcher;
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.index.query.QueryShardException;
import org.opensearch.knn.index.memory.breaker.NativeMemoryCircuitBreakerService;
import org.opensearch.knn.index.memory.breaker.NativeMemoryCircuitBreaker;
import org.opensearch.knn.index.KNNMethodContext;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.KNNVectorIndexFieldData;
Expand Down Expand Up @@ -145,12 +145,12 @@ public static class Builder extends ParametrizedFieldMapper.Builder {
protected String efConstruction;

protected ModelDao modelDao;
protected NativeMemoryCircuitBreakerService nativeMemoryCircuitBreakerService;
protected NativeMemoryCircuitBreaker nativeMemoryCircuitBreaker;

public Builder(String name, ModelDao modelDao, NativeMemoryCircuitBreakerService nativeMemoryCircuitBreakerService) {
public Builder(String name, ModelDao modelDao, NativeMemoryCircuitBreaker nativeMemoryCircuitBreaker) {
super(name);
this.modelDao = modelDao;
this.nativeMemoryCircuitBreakerService = nativeMemoryCircuitBreakerService;
this.nativeMemoryCircuitBreaker = nativeMemoryCircuitBreaker;
}

/**
Expand All @@ -167,13 +167,13 @@ public Builder(
String spaceType,
String m,
String efConstruction,
NativeMemoryCircuitBreakerService nativeMemoryCircuitBreakerService
NativeMemoryCircuitBreaker nativeMemoryCircuitBreaker
) {
super(name);
this.spaceType = spaceType;
this.m = m;
this.efConstruction = efConstruction;
this.nativeMemoryCircuitBreakerService = nativeMemoryCircuitBreakerService;
this.nativeMemoryCircuitBreaker = nativeMemoryCircuitBreaker;
}

@Override
Expand Down Expand Up @@ -227,7 +227,7 @@ public KNNVectorFieldMapper build(BuilderContext context) {
.stored(stored.get())
.hasDocValues(hasDocValues.get())
.knnMethodContext(knnMethodContext)
.nativeMemoryCircuitBreakerService(nativeMemoryCircuitBreakerService)
.nativeMemoryCircuitBreakerService(nativeMemoryCircuitBreaker)
.build();
return new LuceneFieldMapper(createLuceneFieldMapperInput);
}
Expand All @@ -239,7 +239,7 @@ public KNNVectorFieldMapper build(BuilderContext context) {
ignoreMalformed,
stored.get(),
hasDocValues.get(),
nativeMemoryCircuitBreakerService,
nativeMemoryCircuitBreaker,
knnMethodContext
);
}
Expand All @@ -259,7 +259,7 @@ public KNNVectorFieldMapper build(BuilderContext context) {
ignoreMalformed,
stored.get(),
hasDocValues.get(),
nativeMemoryCircuitBreakerService,
nativeMemoryCircuitBreaker,
modelDao,
modelIdAsString
);
Expand All @@ -286,7 +286,7 @@ public KNNVectorFieldMapper build(BuilderContext context) {
ignoreMalformed,
stored.get(),
hasDocValues.get(),
nativeMemoryCircuitBreakerService,
nativeMemoryCircuitBreaker,
spaceType,
m,
efConstruction
Expand Down Expand Up @@ -318,23 +318,16 @@ public static class TypeParser implements Mapper.TypeParser {
// Use a supplier here because in {@link org.opensearch.knn.KNNPlugin#getMappers()} the ModelDao has not yet
// been initialized
private final Supplier<ModelDao> modelDaoSupplier;
private final Supplier<NativeMemoryCircuitBreakerService> nativeMemoryCircuitBreakerServiceSupplier;
private final Supplier<NativeMemoryCircuitBreaker> nativeMemoryCircuitBreakerSupplier;

public TypeParser(
Supplier<ModelDao> modelDaoSupplier,
Supplier<NativeMemoryCircuitBreakerService> knnCircuitBreakerServiceSupplier
) {
public TypeParser(Supplier<ModelDao> modelDaoSupplier, Supplier<NativeMemoryCircuitBreaker> knnCircuitBreakerServiceSupplier) {
this.modelDaoSupplier = modelDaoSupplier;
this.nativeMemoryCircuitBreakerServiceSupplier = knnCircuitBreakerServiceSupplier;
this.nativeMemoryCircuitBreakerSupplier = knnCircuitBreakerServiceSupplier;
}

@Override
public Mapper.Builder<?> parse(String name, Map<String, Object> node, ParserContext parserContext) throws MapperParsingException {
Builder builder = new KNNVectorFieldMapper.Builder(
name,
modelDaoSupplier.get(),
nativeMemoryCircuitBreakerServiceSupplier.get()
);
Builder builder = new KNNVectorFieldMapper.Builder(name, modelDaoSupplier.get(), nativeMemoryCircuitBreakerSupplier.get());
builder.parse(name, parserContext, node);

// All <a
Expand Down Expand Up @@ -416,7 +409,7 @@ public IndexFieldData.Builder fielddataBuilder(String fullyQualifiedIndexName, S
// subclass (if it is unique).
protected KNNMethodContext knnMethod;
protected String modelId;
protected NativeMemoryCircuitBreakerService nativeMemoryCircuitBreakerService;
protected NativeMemoryCircuitBreaker nativeMemoryCircuitBreaker;

public KNNVectorFieldMapper(
String simpleName,
Expand All @@ -426,14 +419,14 @@ public KNNVectorFieldMapper(
Explicit<Boolean> ignoreMalformed,
boolean stored,
boolean hasDocValues,
NativeMemoryCircuitBreakerService nativeMemoryCircuitBreakerService
NativeMemoryCircuitBreaker nativeMemoryCircuitBreaker
) {
super(simpleName, mappedFieldType, multiFields, copyTo);
this.ignoreMalformed = ignoreMalformed;
this.stored = stored;
this.hasDocValues = hasDocValues;
this.dimension = mappedFieldType.getDimension();
this.nativeMemoryCircuitBreakerService = nativeMemoryCircuitBreakerService;
this.nativeMemoryCircuitBreaker = nativeMemoryCircuitBreaker;
updateEngineStats();
}

Expand Down Expand Up @@ -472,7 +465,7 @@ protected void parseCreateField(ParseContext context, int dimension) throws IOEx
}

void validateIfCircuitBreakerIsNotTriggered() {
if (nativeMemoryCircuitBreakerService.isCircuitBreakerTriggered()) {
if (nativeMemoryCircuitBreaker.isTripped()) {
throw new IllegalStateException(
"Indexing knn vector fields is rejected as circuit breaker triggered. Check _opendistro/_knn/stats for detailed state"
);
Expand Down Expand Up @@ -545,7 +538,7 @@ protected boolean docValuesByDefault() {

@Override
public ParametrizedFieldMapper.Builder getMergeBuilder() {
return new KNNVectorFieldMapper.Builder(simpleName(), modelDao, nativeMemoryCircuitBreakerService).init(this);
return new KNNVectorFieldMapper.Builder(simpleName(), modelDao, nativeMemoryCircuitBreaker).init(this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import org.opensearch.common.Explicit;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.mapper.ParametrizedFieldMapper;
import org.opensearch.knn.index.memory.breaker.NativeMemoryCircuitBreakerService;
import org.opensearch.knn.index.memory.breaker.NativeMemoryCircuitBreaker;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.util.KNNEngine;

Expand Down Expand Up @@ -46,12 +46,12 @@ public class LegacyFieldMapper extends KNNVectorFieldMapper {
Explicit<Boolean> ignoreMalformed,
boolean stored,
boolean hasDocValues,
NativeMemoryCircuitBreakerService nativeMemoryCircuitBreakerService,
NativeMemoryCircuitBreaker nativeMemoryCircuitBreaker,
String spaceType,
String m,
String efConstruction
) {
super(simpleName, mappedFieldType, multiFields, copyTo, ignoreMalformed, stored, hasDocValues, nativeMemoryCircuitBreakerService);
super(simpleName, mappedFieldType, multiFields, copyTo, ignoreMalformed, stored, hasDocValues, nativeMemoryCircuitBreaker);

this.spaceType = spaceType;
this.m = m;
Expand All @@ -72,13 +72,8 @@ public class LegacyFieldMapper extends KNNVectorFieldMapper {

@Override
public ParametrizedFieldMapper.Builder getMergeBuilder() {
return new KNNVectorFieldMapper.Builder(
simpleName(),
this.spaceType,
this.m,
this.efConstruction,
this.nativeMemoryCircuitBreakerService
).init(this);
return new KNNVectorFieldMapper.Builder(simpleName(), this.spaceType, this.m, this.efConstruction, this.nativeMemoryCircuitBreaker)
.init(this);
}

static String getSpaceType(Settings indexSettings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import org.apache.lucene.index.VectorSimilarityFunction;
import org.opensearch.common.Explicit;
import org.opensearch.index.mapper.ParseContext;
import org.opensearch.knn.index.memory.breaker.NativeMemoryCircuitBreakerService;
import org.opensearch.knn.index.memory.breaker.NativeMemoryCircuitBreaker;
import org.opensearch.knn.index.KNNMethodContext;
import org.opensearch.knn.index.VectorField;
import org.opensearch.knn.index.util.KNNEngine;
Expand Down Expand Up @@ -45,7 +45,7 @@ public class LuceneFieldMapper extends KNNVectorFieldMapper {
input.getIgnoreMalformed(),
input.isStored(),
input.isHasDocValues(),
input.getNativeMemoryCircuitBreakerService()
input.getNativeMemoryCircuitBreaker()
);

this.knnMethod = input.getKnnMethodContext();
Expand Down Expand Up @@ -131,6 +131,6 @@ static class CreateLuceneFieldMapperInput {
@NonNull
KNNMethodContext knnMethodContext;
@NonNull
NativeMemoryCircuitBreakerService nativeMemoryCircuitBreakerService;
NativeMemoryCircuitBreaker nativeMemoryCircuitBreaker;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import org.opensearch.common.Explicit;
import org.opensearch.common.Strings;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.knn.index.memory.breaker.NativeMemoryCircuitBreakerService;
import org.opensearch.knn.index.memory.breaker.NativeMemoryCircuitBreaker;
import org.opensearch.knn.index.KNNMethodContext;
import org.opensearch.knn.index.util.KNNEngine;

Expand All @@ -33,11 +33,11 @@ public class MethodFieldMapper extends KNNVectorFieldMapper {
Explicit<Boolean> ignoreMalformed,
boolean stored,
boolean hasDocValues,
NativeMemoryCircuitBreakerService nativeMemoryCircuitBreakerService,
NativeMemoryCircuitBreaker nativeMemoryCircuitBreaker,
KNNMethodContext knnMethodContext
) {

super(simpleName, mappedFieldType, multiFields, copyTo, ignoreMalformed, stored, hasDocValues, nativeMemoryCircuitBreakerService);
super(simpleName, mappedFieldType, multiFields, copyTo, ignoreMalformed, stored, hasDocValues, nativeMemoryCircuitBreaker);

this.knnMethod = knnMethodContext;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import org.apache.lucene.document.FieldType;
import org.opensearch.common.Explicit;
import org.opensearch.index.mapper.ParseContext;
import org.opensearch.knn.index.memory.breaker.NativeMemoryCircuitBreakerService;
import org.opensearch.knn.index.memory.breaker.NativeMemoryCircuitBreaker;
import org.opensearch.knn.indices.ModelDao;
import org.opensearch.knn.indices.ModelMetadata;

Expand All @@ -29,11 +29,11 @@ public class ModelFieldMapper extends KNNVectorFieldMapper {
Explicit<Boolean> ignoreMalformed,
boolean stored,
boolean hasDocValues,
NativeMemoryCircuitBreakerService nativeMemoryCircuitBreakerService,
NativeMemoryCircuitBreaker nativeMemoryCircuitBreaker,
ModelDao modelDao,
String modelId
) {
super(simpleName, mappedFieldType, multiFields, copyTo, ignoreMalformed, stored, hasDocValues, nativeMemoryCircuitBreakerService);
super(simpleName, mappedFieldType, multiFields, copyTo, ignoreMalformed, stored, hasDocValues, nativeMemoryCircuitBreaker);

this.modelId = modelId;
this.modelDao = modelDao;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.knn.common.exception.OutOfNativeMemoryException;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.memory.breaker.NativeMemoryCircuitBreakerService;
import org.opensearch.knn.index.memory.breaker.NativeMemoryCircuitBreaker;
import org.opensearch.knn.plugin.stats.StatNames;

import java.io.Closeable;
Expand All @@ -43,7 +43,7 @@ public class NativeMemoryCacheManager implements Closeable {

private static final Logger logger = LogManager.getLogger(NativeMemoryCacheManager.class);
private static NativeMemoryCacheManager INSTANCE;
private static NativeMemoryCircuitBreakerService nativeMemoryCircuitBreakerService;
private static NativeMemoryCircuitBreaker nativeMemoryCircuitBreaker;
private Cache<String, NativeMemoryAllocation> cache;
private final ExecutorService executor;
private AtomicBoolean cacheCapacityReached;
Expand Down Expand Up @@ -71,8 +71,8 @@ public static synchronized NativeMemoryCacheManager getInstance() {
private void initialize() {
initialize(
NativeMemoryCacheManagerDto.builder()
.isWeightLimited(nativeMemoryCircuitBreakerService.isCircuitBreakerEnabled())
.maxWeight(nativeMemoryCircuitBreakerService.getCircuitBreakerLimit().getKb())
.isWeightLimited(nativeMemoryCircuitBreaker.isEnabled())
.maxWeight(nativeMemoryCircuitBreaker.getLimit().getKb())
.isExpirationLimited(KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_ENABLED))
.expiryTimeInMin(
((TimeValue) KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES)).getMinutes()
Expand Down Expand Up @@ -101,8 +101,8 @@ private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) {
cache = cacheBuilder.build();
}

public static void initialize(NativeMemoryCircuitBreakerService nativeMemoryCircuitBreakerService) {
NativeMemoryCacheManager.nativeMemoryCircuitBreakerService = nativeMemoryCircuitBreakerService;
public static void initialize(NativeMemoryCircuitBreaker nativeMemoryCircuitBreaker) {
NativeMemoryCacheManager.nativeMemoryCircuitBreaker = nativeMemoryCircuitBreaker;
}

/**
Expand All @@ -111,8 +111,8 @@ public static void initialize(NativeMemoryCircuitBreakerService nativeMemoryCirc
public synchronized void rebuildCache() {
rebuildCache(
NativeMemoryCacheManagerDto.builder()
.isWeightLimited(nativeMemoryCircuitBreakerService.isCircuitBreakerEnabled())
.maxWeight(nativeMemoryCircuitBreakerService.getCircuitBreakerLimit().getKb())
.isWeightLimited(nativeMemoryCircuitBreaker.isEnabled())
.maxWeight(nativeMemoryCircuitBreaker.getLimit().getKb())
.isExpirationLimited(KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_ENABLED))
.expiryTimeInMin(
((TimeValue) KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES)).getMinutes()
Expand Down Expand Up @@ -372,15 +372,15 @@ private void onRemoval(RemovalNotification<String, NativeMemoryAllocation> remov
nativeMemoryAllocation.close();

if (RemovalCause.SIZE == removalNotification.getCause()) {
nativeMemoryCircuitBreakerService.setCircuitBreaker(true);
nativeMemoryCircuitBreaker.set(true);
setCacheCapacityReached(true);
}

logger.debug("[KNN] Cache evicted. Key {}, Reason: {}", removalNotification.getKey(), removalNotification.getCause());
}

private Float getSizeAsPercentage(long size) {
long cbLimit = nativeMemoryCircuitBreakerService.getCircuitBreakerLimit().getKb();
long cbLimit = nativeMemoryCircuitBreaker.getLimit().getKb();
if (cbLimit == 0) {
return 0.0F;
}
Expand Down
Loading

0 comments on commit 791b489

Please sign in to comment.