Skip to content

Commit

Permalink
Add DocValuesProducers for releasing memory when close index (#1946)
Browse files Browse the repository at this point in the history
Add DocValuesProducers for releasing memory when close index #1946

(cherry picked from commit 004fcc0)
  • Loading branch information
luyuncheng authored and github-actions[bot] committed Sep 14, 2024
1 parent cddbded commit c165756
Show file tree
Hide file tree
Showing 14 changed files with 569 additions and 29 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 3.0](https://github.com/opensearch-project/k-NN/compare/2.x...HEAD)
### Features
### Enhancements
### Bug Fixes
### Bug Fixes
* Add DocValuesProducers for releasing memory when close index [#1946](https://github.com/opensearch-project/k-NN/pull/1946)
### Infrastructure
### Documentation
### Maintenance
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.index.codec.KNN80Codec;

import lombok.Getter;
import org.apache.lucene.codecs.CompoundDirectory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.opensearch.knn.index.engine.KNNEngine;

import java.io.IOException;
import java.util.Set;

public class KNN80CompoundDirectory extends CompoundDirectory {

@Getter
private CompoundDirectory delegate;
@Getter
private Directory dir;

public KNN80CompoundDirectory(CompoundDirectory delegate, Directory dir) {
this.delegate = delegate;
this.dir = dir;
}

@Override
public void checkIntegrity() throws IOException {
delegate.checkIntegrity();
}

@Override
public String[] listAll() throws IOException {
return delegate.listAll();
}

@Override
public long fileLength(String name) throws IOException {
return delegate.fileLength(name);
}

@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
if (KNNEngine.getEnginesThatCreateCustomSegmentFiles().stream().anyMatch(engine -> name.endsWith(engine.getCompoundExtension()))) {
return dir.openInput(name, context);
}
return delegate.openInput(name, context);
}

@Override
public void close() throws IOException {
delegate.close();
}

@Override
public Set<String> getPendingDeletions() throws IOException {
return delegate.getPendingDeletions();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public KNN80CompoundFormat(CompoundFormat delegate) {

@Override
public CompoundDirectory getCompoundReader(Directory dir, SegmentInfo si, IOContext context) throws IOException {
return delegate.getCompoundReader(dir, si, context);
return new KNN80CompoundDirectory(delegate.getCompoundReader(dir, si, context), dir);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,6 @@ public DocValuesConsumer fieldsConsumer(SegmentWriteState state) throws IOExcept

@Override
public DocValuesProducer fieldsProducer(SegmentReadState state) throws IOException {
return delegate.fieldsProducer(state);
return new KNN80DocValuesProducer(delegate.fieldsProducer(state), state);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.knn.index.codec.KNN80Codec;

import lombok.NonNull;
import lombok.extern.log4j.Log4j2;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SegmentReadState;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.FilterDirectory;
import org.opensearch.common.io.PathUtils;
import org.opensearch.knn.common.FieldInfoExtractor;
import org.opensearch.knn.index.codec.util.KNNCodecUtil;
import org.opensearch.knn.index.engine.KNNEngine;
import org.opensearch.knn.index.memory.NativeMemoryCacheManager;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.opensearch.knn.common.KNNConstants.MODEL_ID;
import static org.opensearch.knn.index.mapper.KNNVectorFieldMapper.KNN_FIELD;

@Log4j2
public class KNN80DocValuesProducer extends DocValuesProducer {

private final SegmentReadState state;
private final DocValuesProducer delegate;
private final NativeMemoryCacheManager nativeMemoryCacheManager;
private final Map<String, String> indexPathMap = new HashMap();

public KNN80DocValuesProducer(DocValuesProducer delegate, SegmentReadState state) {
this.delegate = delegate;
this.state = state;
this.nativeMemoryCacheManager = NativeMemoryCacheManager.getInstance();

Directory directory = state.directory;
// directory would be CompoundDirectory, we need get directory firstly and then unwrap
if (state.directory instanceof KNN80CompoundDirectory) {
directory = ((KNN80CompoundDirectory) state.directory).getDir();
}

Directory dir = FilterDirectory.unwrap(directory);
if (!(dir instanceof FSDirectory)) {
log.warn("{} can not casting to FSDirectory", directory);
return;
}
String directoryPath = ((FSDirectory) dir).getDirectory().toString();
for (FieldInfo field : state.fieldInfos) {
if (!field.attributes().containsKey(KNN_FIELD)) {
continue;
}
// Only Native Engine put into indexPathMap
KNNEngine knnEngine = getNativeKNNEngine(field);
if (knnEngine == null) {
continue;
}
List<String> engineFiles = KNNCodecUtil.getEngineFiles(knnEngine.getExtension(), field.name, state.segmentInfo);
Path indexPath = PathUtils.get(directoryPath, engineFiles.get(0));
indexPathMap.putIfAbsent(field.getName(), indexPath.toString());
}
}

@Override
public BinaryDocValues getBinary(FieldInfo field) throws IOException {
return delegate.getBinary(field);
}

@Override
public NumericDocValues getNumeric(FieldInfo field) throws IOException {
return delegate.getNumeric(field);
}

@Override
public SortedDocValues getSorted(FieldInfo field) throws IOException {
return delegate.getSorted(field);
}

@Override
public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException {
return delegate.getSortedNumeric(field);
}

@Override
public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException {
return delegate.getSortedSet(field);
}

@Override
public void checkIntegrity() throws IOException {
delegate.checkIntegrity();
}

@Override
public void close() throws IOException {
for (String path : indexPathMap.values()) {
nativeMemoryCacheManager.invalidate(path);
}
delegate.close();
}

public final List<String> getOpenedIndexPath() {
return new ArrayList<>(indexPathMap.values());
}

/**
* Get KNNEngine From FieldInfo
*
* @param field which field we need produce from engine
* @return if and only if Native Engine we return specific engine, else return null
*/
private KNNEngine getNativeKNNEngine(@NonNull FieldInfo field) {

final String modelId = field.attributes().get(MODEL_ID);
if (modelId != null) {
return null;
}
KNNEngine engine = FieldInfoExtractor.extractKNNEngine(field);
if (KNNEngine.getEnginesThatCreateCustomSegmentFiles().contains(engine)) {
return engine;
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,15 @@
package org.opensearch.knn.index.codec.util;

import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.SegmentInfo;
import org.opensearch.knn.common.KNNConstants;
import org.opensearch.knn.index.VectorDataType;
import org.opensearch.knn.index.codec.KNN80Codec.KNN80BinaryDocValues;
import org.opensearch.knn.index.engine.KNNEngine;

import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class KNNCodecUtil {
// Floats are 4 bytes in size
Expand Down Expand Up @@ -53,4 +60,28 @@ public static long getTotalLiveDocsCount(final BinaryDocValues binaryDocValues)
}
return totalLiveDocs;
}

/**
* Get Engine Files from segment with specific fieldName and engine extension
*
* @param extension Engine extension comes from {@link KNNEngine#getExtension()}}
* @param fieldName Filed for knn field
* @param segmentInfo {@link SegmentInfo} One Segment info to use for compute.
* @return List of engine files
*/
public static List<String> getEngineFiles(String extension, String fieldName, SegmentInfo segmentInfo) {
/*
* In case of compound file, extension would be <engine-extension> + c otherwise <engine-extension>
*/
String engineExtension = segmentInfo.getUseCompoundFile() ? extension + KNNConstants.COMPOUND_EXTENSION : extension;
String engineSuffix = fieldName + engineExtension;
String underLineEngineSuffix = "_" + engineSuffix;

List<String> engineFiles = segmentInfo.files()
.stream()
.filter(fileName -> fileName.endsWith(underLineEngineSuffix))
.sorted(Comparator.comparingInt(String::length))
.collect(Collectors.toList());
return engineFiles;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import lombok.Getter;
import org.apache.lucene.index.LeafReaderContext;
import org.opensearch.knn.common.featureflags.KNNFeatureFlags;
import org.opensearch.common.concurrent.RefCountedReleasable;
import org.opensearch.knn.index.VectorDataType;
import org.opensearch.knn.index.query.KNNWeight;
import org.opensearch.knn.jni.JNIService;
Expand Down Expand Up @@ -79,6 +80,26 @@ public interface NativeMemoryAllocation {
*/
int getSizeInKB();

/**
* Increments the refCount of this instance.
*
* @see #decRef
* @throws IllegalStateException iff the reference counter can not be incremented.
*/
default void incRef() {}

/**
* Decreases the refCount of this instance. If the refCount drops to 0, then this
* instance is considered as closed and should not be used anymore.
*
* @see #incRef
*
* @return returns {@code true} if the ref count dropped to 0 as a result of calling this method
*/
default boolean decRef() {
return true;
}

/**
* Represents native indices loaded into memory. Because these indices are backed by files, they should be
* freed when file is deleted.
Expand All @@ -100,6 +121,7 @@ class IndexAllocation implements NativeMemoryAllocation {
private final SharedIndexState sharedIndexState;
@Getter
private final boolean isBinaryIndex;
private final RefCountedReleasable<IndexAllocation> refCounted;

/**
* Constructor
Expand Down Expand Up @@ -158,10 +180,10 @@ class IndexAllocation implements NativeMemoryAllocation {
this.watcherHandle = watcherHandle;
this.sharedIndexState = sharedIndexState;
this.isBinaryIndex = isBinaryIndex;
this.refCounted = new RefCountedReleasable<>("IndexAllocation-Reference", this, this::closeInternal);
}

@Override
public void close() {
protected void closeInternal() {
Runnable onClose = () -> {
writeLock();
cleanup();
Expand All @@ -177,6 +199,13 @@ public void close() {
}
}

@Override
public void close() {
if (!closed && refCounted.refCount() > 0) {
refCounted.close();
}
}

private void cleanup() {
if (this.closed) {
return;
Expand Down Expand Up @@ -240,6 +269,16 @@ public void writeUnlock() {
public int getSizeInKB() {
return size;
}

@Override
public void incRef() {
refCounted.incRef();
}

@Override
public boolean decRef() {
return refCounted.decRef();
}
}

/**
Expand Down
Loading

0 comments on commit c165756

Please sign in to comment.