Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CNDB-11680: Add source sstable/memtable id to vector traces #1411

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ public static class VectorOnHeapSegmentBuilder extends SegmentBuilder
public VectorOnHeapSegmentBuilder(IndexComponents.ForWrite components, long rowIdOffset, long keyCount, NamedMemoryLimiter limiter)
{
super(components, rowIdOffset, limiter);
graphIndex = new CassandraOnHeapGraph<>(components.context(), false);
graphIndex = new CassandraOnHeapGraph<>(components.context(), false, null);
totalBytesAllocated = graphIndex.ramBytesUsed();
totalBytesAllocatedConcurrent.add(totalBytesAllocated);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public IndexSearcher newIndexSearcher(SSTableContext sstableContext,
SegmentMetadata segmentMetadata) throws IOException
{
if (indexContext.isVector())
return new V3VectorIndexSearcher(sstableContext.primaryKeyMapFactory(), indexFiles, segmentMetadata, indexContext);
return new V3VectorIndexSearcher(sstableContext, indexFiles, segmentMetadata, indexContext);
if (indexContext.isLiteral())
return new V3InvertedIndexSearcher(sstableContext, indexFiles, segmentMetadata, indexContext);
return super.newIndexSearcher(sstableContext, indexContext, indexFiles, segmentMetadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Optional;

import org.apache.cassandra.index.sai.IndexContext;
import org.apache.cassandra.index.sai.SSTableContext;
import org.apache.cassandra.index.sai.disk.PrimaryKeyMap;
import org.apache.cassandra.index.sai.disk.v1.PerIndexFiles;
import org.apache.cassandra.index.sai.disk.v1.SegmentMetadata;
Expand All @@ -33,15 +34,15 @@
*/
public class V3VectorIndexSearcher extends V2VectorIndexSearcher
{
public V3VectorIndexSearcher(PrimaryKeyMap.Factory primaryKeyMapFactory,
public V3VectorIndexSearcher(SSTableContext sstableContext,
PerIndexFiles perIndexFiles,
SegmentMetadata segmentMetadata,
IndexContext indexContext) throws IOException
{
super(primaryKeyMapFactory,
super(sstableContext.primaryKeyMapFactory(),
perIndexFiles,
segmentMetadata,
indexContext,
new CassandraDiskAnn(segmentMetadata.componentMetadatas, perIndexFiles, indexContext, V2OnDiskOrdinalsMap::new));
new CassandraDiskAnn(sstableContext, segmentMetadata.componentMetadatas, perIndexFiles, indexContext, V2OnDiskOrdinalsMap::new));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public IndexSearcher newIndexSearcher(SSTableContext sstableContext,
SegmentMetadata segmentMetadata) throws IOException
{
if (indexContext.isVector())
return new V5VectorIndexSearcher(sstableContext.primaryKeyMapFactory(), indexFiles, segmentMetadata, indexContext);
return new V5VectorIndexSearcher(sstableContext, indexFiles, segmentMetadata, indexContext);
return super.newIndexSearcher(sstableContext, indexContext, indexFiles, segmentMetadata);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;

import org.apache.cassandra.index.sai.IndexContext;
import org.apache.cassandra.index.sai.SSTableContext;
import org.apache.cassandra.index.sai.disk.PrimaryKeyMap;
import org.apache.cassandra.index.sai.disk.v1.PerIndexFiles;
import org.apache.cassandra.index.sai.disk.v1.SegmentMetadata;
Expand All @@ -31,16 +32,16 @@
*/
public class V5VectorIndexSearcher extends V2VectorIndexSearcher
{
public V5VectorIndexSearcher(PrimaryKeyMap.Factory primaryKeyMapFactory,
public V5VectorIndexSearcher(SSTableContext sstableContext,
PerIndexFiles perIndexFiles,
SegmentMetadata segmentMetadata,
IndexContext indexContext) throws IOException
{
// inherits from V2 instead of V3 because the difference between V5 and V3 is the OnDiskOrdinalsMap that they use
super(primaryKeyMapFactory,
super(sstableContext.primaryKeyMapFactory(),
perIndexFiles,
segmentMetadata,
indexContext,
new CassandraDiskAnn(segmentMetadata.componentMetadatas, perIndexFiles, indexContext, V5OnDiskOrdinalsMap::new));
new CassandraDiskAnn(sstableContext, segmentMetadata.componentMetadatas, perIndexFiles, indexContext, V5OnDiskOrdinalsMap::new));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class AutoResumingNodeScoreIterator extends AbstractIterator<SearchResult
private final int limit;
private final int rerankK;
private final boolean inMemory;
private final String source;
private final IntConsumer nodesVisitedConsumer;
private Iterator<SearchResult.NodeScore> nodeScores;
private int cumulativeNodesVisited;
Expand All @@ -54,14 +55,16 @@ public class AutoResumingNodeScoreIterator extends AbstractIterator<SearchResult
* @param limit the limit to pass to the {@link GraphSearcher} when resuming search
* @param rerankK the rerankK to pass to the {@link GraphSearcher} when resuming search
* @param inMemory whether the graph is in memory or on disk (used for trace logging)
* @param source the source of the search (used for trace logging)
*/
public AutoResumingNodeScoreIterator(GraphSearcher searcher,
GraphSearcherAccessManager accessManager,
SearchResult result,
IntConsumer nodesVisitedConsumer,
int limit,
int rerankK,
boolean inMemory)
boolean inMemory,
String source)
{
this.searcher = searcher;
this.accessManager = accessManager;
Expand All @@ -71,6 +74,7 @@ public AutoResumingNodeScoreIterator(GraphSearcher searcher,
this.limit = max(1, limit / 2); // we shouldn't need as many results on resume
this.rerankK = rerankK;
this.inMemory = inMemory;
this.source = source;
}

@Override
Expand All @@ -89,11 +93,9 @@ protected SearchResult.NodeScore computeNext()

private void maybeLogTrace(SearchResult result)
{
if (!Tracing.isTracing())
return;
String msg = inMemory ? "ANN resume for {}/{} visited {} nodes, reranked {} to return {} results"
: "DiskANN resume for {}/{} visited {} nodes, reranked {} to return {} results";
Tracing.trace(msg, limit, rerankK, result.getVisitedCount(), result.getRerankedCount(), result.getNodes().length);
String msg = inMemory ? "ANN resume for {}/{} visited {} nodes, reranked {} to return {} results from {}"
: "DiskANN resume for {}/{} visited {} nodes, reranked {} to return {} results from {}";
Tracing.trace(msg, limit, rerankK, result.getVisitedCount(), result.getRerankedCount(), result.getNodes().length, source);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,15 @@
import io.github.jbellis.jvector.vector.types.VectorFloat;
import org.apache.cassandra.index.sai.IndexContext;
import org.apache.cassandra.index.sai.QueryContext;
import org.apache.cassandra.index.sai.SSTableContext;
import org.apache.cassandra.index.sai.disk.format.IndexComponentType;
import org.apache.cassandra.index.sai.disk.v1.PerIndexFiles;
import org.apache.cassandra.index.sai.disk.v1.SegmentMetadata;
import org.apache.cassandra.index.sai.disk.v3.V3OnDiskFormat;
import org.apache.cassandra.index.sai.disk.v5.V5VectorPostingsWriter.Structure;
import org.apache.cassandra.index.sai.disk.vector.CassandraOnHeapGraph.PQVersion;
import org.apache.cassandra.index.sai.utils.RowIdWithScore;
import org.apache.cassandra.io.sstable.SSTableId;
import org.apache.cassandra.io.util.FileHandle;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.tracing.Tracing;
Expand All @@ -65,6 +67,7 @@ public class CassandraDiskAnn
protected final PerIndexFiles indexFiles;
protected final SegmentMetadata.ComponentMetadataMap componentMetadatas;

private final SSTableId<?> source;
private final FileHandle graphHandle;
private final OnDiskOrdinalsMap ordinalsMap;
private final Set<FeatureId> features;
Expand All @@ -79,8 +82,9 @@ public class CassandraDiskAnn

private final ExplicitThreadLocal<GraphSearcherAccessManager> searchers;

public CassandraDiskAnn(SegmentMetadata.ComponentMetadataMap componentMetadatas, PerIndexFiles indexFiles, IndexContext context, OrdinalsMapFactory omFactory) throws IOException
public CassandraDiskAnn(SSTableContext sstableContext, SegmentMetadata.ComponentMetadataMap componentMetadatas, PerIndexFiles indexFiles, IndexContext context, OrdinalsMapFactory omFactory) throws IOException
{
this.source = sstableContext.sstable().getId();
this.componentMetadatas = componentMetadatas;
this.indexFiles = indexFiles;

Expand Down Expand Up @@ -253,8 +257,8 @@ else if (compressedVectors == null)
var result = searcher.search(ssp, limit, rerankK, threshold, context.getAnnRerankFloor(), ordinalsMap.ignoringDeleted(acceptBits));
if (V3OnDiskFormat.ENABLE_RERANK_FLOOR)
context.updateAnnRerankFloor(result.getWorstApproximateScoreInTopK());
Tracing.trace("DiskANN search for {}/{} visited {} nodes, reranked {} to return {} results",
limit, rerankK, result.getVisitedCount(), result.getRerankedCount(), result.getNodes().length);
Tracing.trace("DiskANN search for {}/{} visited {} nodes, reranked {} to return {} results from {}",
limit, rerankK, result.getVisitedCount(), result.getRerankedCount(), result.getNodes().length, source);
if (threshold > 0)
{
// Threshold based searches are comprehensive and do not need to resume the search.
Expand All @@ -265,7 +269,7 @@ else if (compressedVectors == null)
}
else
{
var nodeScores = new AutoResumingNodeScoreIterator(searcher, graphAccessManager, result, nodesVisitedConsumer, limit, rerankK, false);
var nodeScores = new AutoResumingNodeScoreIterator(searcher, graphAccessManager, result, nodesVisitedConsumer, limit, rerankK, false, source.toString());
return new NodeScoreToRowIdWithScoreIterator(nodeScores, ordinalsMap.getRowIdsView());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.agrona.collections.IntHashSet;
import org.apache.cassandra.db.compaction.CompactionSSTable;
import org.apache.cassandra.db.marshal.VectorType;
import org.apache.cassandra.db.memtable.Memtable;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.index.sai.IndexContext;
import org.apache.cassandra.index.sai.QueryContext;
Expand Down Expand Up @@ -109,6 +110,8 @@ public enum PQVersion {
private static final Logger logger = LoggerFactory.getLogger(CassandraOnHeapGraph.class);
private static final VectorTypeSupport vts = VectorizationProvider.getInstance().getVectorTypeSupport();

// We use the memtable reference for easier tracing.
private final String source;
private final ConcurrentVectorValues vectorValues;
private final GraphIndexBuilder builder;
private final VectorType.VectorSerializer serializer;
Expand All @@ -128,8 +131,11 @@ public enum PQVersion {
/**
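* @param forSearching if true, vectorsByKey will be initialized and populated with vectors as they are added
* @param memtable the memtable this graph belongs to, used only to build the source label shown in trace messages; may be null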
*/
public CassandraOnHeapGraph(IndexContext context, boolean forSearching)
public CassandraOnHeapGraph(IndexContext context, boolean forSearching, Memtable memtable)
{
this.source = memtable == null
? "null"
: memtable.getClass().getSimpleName() + '@' + Integer.toHexString(memtable.hashCode());
var indexConfig = context.getIndexWriterConfig();
var termComparator = context.getValidator();
serializer = (VectorType.VectorSerializer) termComparator.getSerializer();
Expand Down Expand Up @@ -321,16 +327,16 @@ public CloseableIterator<SearchResult.NodeScore> search(QueryContext context, Ve
var ssf = SearchScoreProvider.exact(queryVector, similarityFunction, vectorValues);
var rerankK = sourceModel.rerankKFor(limit, VectorCompression.NO_COMPRESSION);
var result = searcher.search(ssf, limit, rerankK, threshold, 0.0f, bits);
Tracing.trace("ANN search for {}/{} visited {} nodes, reranked {} to return {} results",
limit, rerankK, result.getVisitedCount(), result.getRerankedCount(), result.getNodes().length);
Tracing.trace("ANN search for {}/{} visited {} nodes, reranked {} to return {} results from {}",
limit, rerankK, result.getVisitedCount(), result.getRerankedCount(), result.getNodes().length, source);
context.addAnnNodesVisited(result.getVisitedCount());
if (threshold > 0)
{
// Threshold based searches do not support resuming the search.
graphAccessManager.release();
return CloseableIterator.wrap(Arrays.stream(result.getNodes()).iterator());
}
return new AutoResumingNodeScoreIterator(searcher, graphAccessManager, result, context::addAnnNodesVisited, limit, rerankK, true);
return new AutoResumingNodeScoreIterator(searcher, graphAccessManager, result, context::addAnnNodesVisited, limit, rerankK, true, source);
}
catch (Throwable t)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public class VectorMemtableIndex implements MemtableIndex
public VectorMemtableIndex(IndexContext indexContext, Memtable mt)
{
this.indexContext = indexContext;
this.graph = new CassandraOnHeapGraph<>(indexContext, true);
this.graph = new CassandraOnHeapGraph<>(indexContext, true, mt);
this.mt = mt;
}

Expand Down