Skip to content

Commit

Permalink
Added Grafana support for EF
Browse files Browse the repository at this point in the history
  • Loading branch information
randersenYB committed Jul 16, 2024
1 parent c47132a commit 21d639c
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 14 deletions.
4 changes: 3 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
"cwd": "${workspaceFolder}/aerospike",
"args": [
"--dataset", "glove-100-angular",
"--idxdrop",
"--concurrency", "10000"
],
"console": "integratedTerminal"
Expand Down Expand Up @@ -171,7 +172,8 @@
"--dataset", "glove-100-angular",
"--logfile", "./hdfquery.log",
"--check",
"-r", "10"
"-r", "10",
"--searchparams", "{\"ef\":10}"
],
"console": "integratedTerminal"
},
Expand Down
29 changes: 20 additions & 9 deletions aerospike/aerospikehdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,12 +365,15 @@ async def put_vector(self, key, embedding, i: int, client: vectorASyncClient, re
self._exception_counter.add(1, {"exception_type":f"upsert: {e}", "handled_by_user":False,"ns":self._namespace,"set":self._setName})
self._pausePuts = False
raise
async def index_exist(self, adminClient: "vectorASyncAdminClient") -> Union[dict, None]:
    '''
    Look up this instance's vector index (matched on self._idx_namespace
    and self._idx_name) in the server's index list.

    Returns the matching index-information dict, or None when no such
    index exists.
    '''
    existingIndexes = await adminClient.index_list()
    # next() with an explicit None default: the previous
    # next(i for i in indexInfo if i is not None) raised StopIteration
    # when the server had indexes but none matched, instead of returning
    # None as the signature promises and as populate()/query() expect.
    return next((index for index in existingIndexes
                    if index["id"]["namespace"] == self._idx_namespace
                        and index["id"]["name"] == self._idx_name),
                None)

async def populate(self) -> None:
'''
Expand All @@ -388,20 +391,25 @@ async def populate(self) -> None:
is_loadbalancer=self._useloadbalancer
) as adminClient:

#If exists, no sense to try creation...
if await self.index_exist(adminClient):
#If exists, no sense to try creation...
idxinfo = await self.index_exist(adminClient)
if idxinfo is not None:
self.print_log(f'Index {self._idx_namespace}.{self._idx_name} Already Exists')

#since this can be an external DB (not in a container), we need to clean up from prior runs
#if the index name is in this list, we know it was created in this run group and don't need to drop the index.
#If it is a fresh run, this list will not contain the index and we know it needs to be dropped.
if self._idx_name in aerospikeIdxNames:
self.print_log(f'Index {self._idx_name} being reused (updated)')
self._idx_hnswparams = BaseAerospike.set_hnsw_params_attrs(vectorTypes.HnswParams(),
idxinfo)
elif self._idx_drop:
await self.drop_index(adminClient)
await self.create_index(adminClient)
else:
self.print_log(f'Index {self._idx_namespace}.{self._idx_name} being updated')
self._idx_hnswparams = BaseAerospike.set_hnsw_params_attrs(vectorTypes.HnswParams(),
idxinfo)
else:
await self.create_index(adminClient)

Expand Down Expand Up @@ -496,11 +504,14 @@ async def query(self) -> None:
is_loadbalancer=self._useloadbalancer
) as adminClient:

if not await self.index_exist(adminClient):
idxinfo = await self.index_exist(adminClient)
if idxinfo is None:
self.print_log(f'Query: Vector Index: {self._idx_namespace}.{self._idx_name}, not found')
self._exception_counter.add(1, {"exception_type":"Index not found", "handled_by_user":False,"ns":self._idx_namespace,"set":self._idx_name})
raise FileNotFoundError(f"Vector Index {self._idx_namespace}.{self._idx_name} not found")

self._idx_hnswparams = BaseAerospike.set_hnsw_params_attrs(vectorTypes.HnswParams(),
idxinfo)

self.print_log(f'Starting Query Runs ({self._query_runs}) on {self._idx_namespace}.{self._idx_name}')
metricfunc = None
distancemetric : DistanceMetric= None
Expand Down
24 changes: 20 additions & 4 deletions aerospike/baseaerospike.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,12 @@ def prometheus_status(self, i:int, done:bool = False) -> None:
pausestate = "Running"
else:
pausestate = "Idle"


if self._query_hnswparams is None:
queryef = '' if self._idx_hnswparams is None else str(self._idx_hnswparams.ef)
else:
queryef = self._query_hnswparams.ef

self._prometheus_heartbeat_gauge.set(i, {"ns":self._namespace,
"set":self._setName,
"idxns":self._idx_namespace,
Expand All @@ -387,7 +392,9 @@ def prometheus_status(self, i:int, done:bool = False) -> None:
"remainingquerynbrs" : self._remainingquerynbrs,
"querymetric": None if self._query_metric is None else self._query_metric["type"],
"querymetricvalue": self._query_metric_value,
"querymetricaerospikevalue": self._aerospike_metric_value
"querymetricaerospikevalue": self._aerospike_metric_value,
"hnswparams": self.hnswstr(),
"queryef": queryef
})

def _prometheus_heartbeat(self) -> None:
Expand Down Expand Up @@ -454,9 +461,18 @@ def populate_index(self, train: np.array) -> None:
def query(self, query: np.array, limit: int) -> List[vectorTypes.Neighbor]:
pass

def hnswstr(self) -> str:
    '''
    Render the index HNSW parameters as a compact display string.

    Returns '' when no index parameters are set; otherwise a string of
    the form "m:.., efconst:.., ef:.., batching:{..}" where the batching
    section is empty when no batching parameters are present.
    '''
    params = self._idx_hnswparams
    if params is None:
        return ''
    bp = params.batching_params
    batchingparams = (
        ''
        if bp is None
        else f"maxrecs:{bp.max_records}, interval:{bp.interval}, disabled:{bp.disabled}"
    )
    return f"m:{params.m}, efconst:{params.ef_construction}, ef:{params.ef}, batching:{{{batchingparams}}}"

def basestring(self) -> str:
batchingparams = f"maxrecs:{self._idx_hnswparams.batching_params.max_records}, interval:{self._idx_hnswparams.batching_params.interval}, disabled:{self._idx_hnswparams.batching_params.disabled}"
hnswparams = f"m:{self._idx_hnswparams.m}, efconst:{self._idx_hnswparams.ef_construction}, ef:{self._idx_hnswparams.ef}, batching:{{{batchingparams}}}"
hnswparams = self.hnswstr()

if self._query_hnswparams is None:
searchhnswparams = ""
else:
Expand Down

0 comments on commit 21d639c

Please sign in to comment.