Skip to content

Commit

Permalink
Standalone support for gc and shard splitting (#1222)
Browse files Browse the repository at this point in the history
* support gc and shard splitting with standalone

* fix

* fix

* also on delete

* refactor kb orm

* test fixes

* test fixes

* only 1 exception with that name

* exc for datamanagers/

* type hint
  • Loading branch information
vangheem authored Aug 18, 2023
1 parent 0158614 commit a4fc620
Show file tree
Hide file tree
Showing 19 changed files with 271 additions and 118 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 67 additions & 1 deletion nucliadb/nucliadb/common/cluster/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,15 @@
ShardsNotFound,
)
from nucliadb.common.datamanagers.cluster import ClusterDataManager
from nucliadb.common.datamanagers.kb import KnowledgeBoxDataManager
from nucliadb.common.maindb.driver import Transaction
from nucliadb.common.maindb.utils import get_driver
from nucliadb_protos import noderesources_pb2, nodewriter_pb2, writer_pb2
from nucliadb_protos import (
nodereader_pb2,
noderesources_pb2,
nodewriter_pb2,
writer_pb2,
)
from nucliadb_telemetry import errors
from nucliadb_utils.keys import KB_SHARDS
from nucliadb_utils.utilities import get_indexing, get_storage
Expand Down Expand Up @@ -310,8 +316,53 @@ async def add_resource(
indexpb.shard = replica_id
await indexing.index(indexpb, node_id)

def should_create_new_shard(self, shard_info: noderesources_pb2.Shard) -> bool:
    """Tell whether a shard has grown past the configured size limits.

    Returns True when the shard's paragraph count is above
    ``settings.max_shard_paragraphs`` or its field count is above
    ``settings.max_shard_fields``; False otherwise.
    """
    if shard_info.paragraphs > settings.max_shard_paragraphs:
        return True
    return shard_info.fields > settings.max_shard_fields

async def maybe_create_new_shard(
    self, kbid: str, shard_info: noderesources_pb2.Shard
):
    """Create a new shard for ``kbid`` if ``shard_info`` is over the limits.

    Does nothing when ``should_create_new_shard`` returns False. Otherwise
    it logs a warning, reads the knowledge box's semantic model metadata
    and creates the shard inside a transaction that is committed before
    returning.
    """
    if not self.should_create_new_shard(shard_info):
        return

    logger.warning({"message": "Adding shard", "kbid": kbid})
    # Fetch the model metadata first so the new shard is created with it.
    data_manager = KnowledgeBoxDataManager(get_driver())
    semantic_model = await data_manager.get_model_metadata(kbid)
    async with get_driver().transaction() as txn:
        await self.create_shard_by_kbid(txn, kbid, semantic_model=semantic_model)
        await txn.commit()


class StandaloneKBShardManager(KBShardManager):
max_ops_before_checks = 25

def __init__(self):
    """Initialise the base manager plus the per-shard change bookkeeping."""
    super().__init__()
    # Number of changes seen per (node_id, shard_id) since the last check.
    self._change_count: dict[tuple[str, str], int] = {}  # type: ignore
    # Held while the periodic shard check / GC in _resource_change_event runs.
    self._lock = asyncio.Lock()

async def _resource_change_event(
    self, kbid: str, node_id: str, shard_id: str
) -> None:
    """Record one change on a shard replica and periodically maintain it.

    Each call bumps the counter kept for ``(node_id, shard_id)``. Once the
    counter reaches ``max_ops_before_checks`` it is reset and, while holding
    the manager lock, the shard info is fetched from the node's reader, a
    new shard is created for ``kbid`` if the limits are exceeded, and GC is
    requested for the shard on the node's writer. If no index node is found
    for ``node_id`` the check is skipped.
    """
    counter_key = (node_id, shard_id)
    ops_so_far = self._change_count.get(counter_key, 0) + 1
    self._change_count[counter_key] = ops_so_far
    if ops_so_far < self.max_ops_before_checks:
        return

    # Threshold reached: start counting again before doing the checks.
    self._change_count[counter_key] = 0
    async with self._lock:
        index_node: Optional[ProxyStandaloneIndexNode] = get_index_node(node_id)  # type: ignore
        if index_node is None:
            return
        shard_request = nodereader_pb2.GetShardRequest(shard_id=shard_id)  # type: ignore
        shard_info: noderesources_pb2.Shard = await index_node.reader.GetShard(
            shard_request
        )
        await self.maybe_create_new_shard(kbid, shard_info)
        await index_node.writer.GC(noderesources_pb2.ShardId(id=shard_id))  # type: ignore

async def delete_resource(
self,
shard: writer_pb2.ShardObject,
Expand All @@ -328,6 +379,13 @@ async def delete_resource(
index_node = get_index_node(shardreplica.node)
await index_node.writer.RemoveResource(req) # type: ignore

if index_node is not None:
asyncio.create_task(
self._resource_change_event(
kb, shardreplica.node, shardreplica.shard.id
)
)

async def add_resource(
self,
shard: writer_pb2.ShardObject,
Expand All @@ -337,6 +395,7 @@ async def add_resource(
kb: str,
reindex_id: Optional[str] = None,
) -> None:
index_node = None
for shardreplica in shard.replicas:
resource.shard_id = resource.resource.shard_id = shardreplica.shard.id
index_node = get_index_node(shardreplica.node)
Expand All @@ -346,6 +405,13 @@ async def add_resource(
)
await index_node.writer.SetResource(resource) # type: ignore

if index_node is not None:
asyncio.create_task(
self._resource_change_event(
kb, shardreplica.node, shardreplica.shard.id
)
)


def choose_node(
shard: writer_pb2.ShardObject, target_replicas: Optional[list[str]] = None
Expand Down
18 changes: 17 additions & 1 deletion nucliadb/nucliadb/common/cluster/standalone/grpc_node_binding.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@
SuggestResponse,
TypeList,
)
from nucliadb_protos.noderesources_pb2 import EmptyQuery, Resource, ResourceID
from nucliadb_protos.noderesources_pb2 import (
EmptyQuery,
EmptyResponse,
Resource,
ResourceID,
)
from nucliadb_protos.noderesources_pb2 import Shard as NodeResourcesShard
from nucliadb_protos.noderesources_pb2 import (
ShardCleaned,
Expand Down Expand Up @@ -396,6 +401,16 @@ async def JoinGraph(self, request: SetGraph) -> OpStatus:
op_status.ParseFromString(pb_bytes)
return op_status

async def GC(self, request: ShardId) -> EmptyResponse:
    """Call the writer's ``gc`` for ``request`` and return its parsed response.

    The serialized request is passed to ``self.writer.gc`` through
    ``self.executor``; the bytes it returns are parsed into an
    ``EmptyResponse``.
    """
    running_loop = asyncio.get_running_loop()
    raw_response = await running_loop.run_in_executor(
        self.executor, self.writer.gc, request.SerializeToString()
    )
    response = EmptyResponse()
    response.ParseFromString(bytes(raw_response))
    return response


# supported marshalled reader methods for standalone node support
READER_METHODS = {
Expand All @@ -416,4 +431,5 @@ async def JoinGraph(self, request: SetGraph) -> OpStatus:
"SetResource": (Resource, OpStatus),
"RemoveResource": (ResourceID, OpStatus),
"JoinGraph": (SetGraph, OpStatus),
"GC": (ShardId, EmptyResponse),
}
21 changes: 21 additions & 0 deletions nucliadb/nucliadb/common/datamanagers/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Copyright (C) 2021 Bosutech XXI S.L.
#
# nucliadb is offered under the AGPL v3.0 and as commercial software.
# For commercial licensing, contact us at [email protected].
#
# AGPL:
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
class KnowledgeBoxNotFound(Exception):
    """Raised when a knowledge box cannot be found."""
55 changes: 55 additions & 0 deletions nucliadb/nucliadb/common/datamanagers/kb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Copyright (C) 2021 Bosutech XXI S.L.
#
# nucliadb is offered under the AGPL v3.0 and as commercial software.
# For commercial licensing, contact us at [email protected].
#
# AGPL:
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from nucliadb.common.cluster.exceptions import ShardsNotFound
from nucliadb.common.datamanagers.exceptions import KnowledgeBoxNotFound
from nucliadb.common.maindb.driver import Driver
from nucliadb_protos import knowledgebox_pb2, writer_pb2
from nucliadb_utils.keys import KB_SHARDS


class KnowledgeBoxDataManager:
    """Reads knowledge box data through a maindb driver."""

    def __init__(self, driver: Driver):
        self.driver = driver

    async def get_shards_object(self, kbid: str) -> writer_pb2.Shards:
        """Load the ``Shards`` protobuf stored for ``kbid``.

        The value is read in a transaction opened on ``self.driver``.

        Raises:
            ShardsNotFound: if no payload is stored under the KB's shards key.
        """
        key = KB_SHARDS.format(kbid=kbid)
        async with self.driver.transaction() as txn:
            payload = await txn.get(key)
            if not payload:
                raise ShardsNotFound(kbid)
            pb = writer_pb2.Shards()
            pb.ParseFromString(payload)
            return pb

    async def get_model_metadata(
        self, kbid: str
    ) -> knowledgebox_pb2.SemanticModelMetadata:
        """Return the semantic model metadata recorded for ``kbid``.

        Raises:
            KnowledgeBoxNotFound: if the KB has no shards object; the
                original ``ShardsNotFound`` is attached as the cause.
        """
        try:
            shards_obj = await self.get_shards_object(kbid)
        except ShardsNotFound as exc:
            # Chain explicitly so the traceback shows the lookup failure as the cause.
            raise KnowledgeBoxNotFound(kbid) from exc
        if shards_obj.HasField("model"):
            return shards_obj.model
        # Backward-compatibility path for old KBs that do not have the `model`
        # attribute set in the Shards object. Clean this up after a migration
        # unifies all fields under `model` (on-prem and cloud).
        return knowledgebox_pb2.SemanticModelMetadata(
            similarity_function=shards_obj.similarity
        )
21 changes: 4 additions & 17 deletions nucliadb/nucliadb/ingest/consumer/shard_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@
from functools import partial

from nucliadb.common.cluster.manager import choose_node
from nucliadb.common.cluster.settings import settings
from nucliadb.common.cluster.utils import get_shard_manager
from nucliadb.common.maindb.driver import Driver
from nucliadb.ingest.orm.knowledgebox import KnowledgeBox
from nucliadb_protos import noderesources_pb2, nodesidecar_pb2, writer_pb2
from nucliadb_utils import const
from nucliadb_utils.cache.pubsub import PubSubDriver
Expand Down Expand Up @@ -88,28 +86,17 @@ async def handle_message(self, raw_data) -> None:
)
metrics.total_messages.inc({"type": "shard_creator", "action": "scheduled"})

def should_create_new_shard(self, counter: nodesidecar_pb2.Counter) -> bool:
    """Tell whether the counted shard has grown past the configured limits.

    Returns True when the paragraph count is above
    ``settings.max_shard_paragraphs`` or the field count is above
    ``settings.max_shard_fields``; False otherwise.
    """
    if counter.paragraphs > settings.max_shard_paragraphs:
        return True
    return counter.fields > settings.max_shard_fields

@metrics.handler_histo.wrap({"type": "shard_creator"})
async def process_kb(self, kbid: str) -> None:
logger.info({"message": "Processing notification for kbid", "kbid": kbid})
kb_shards = await self.shard_manager.get_shards_by_kbid_inner(kbid)
current_shard: writer_pb2.ShardObject = kb_shards.shards[kb_shards.actual]

node, shard_id, _ = choose_node(current_shard)
# we could also call `reader.GetShard`; however, the `GetCount`
# is cached at the sidecar so we're cheating for now until
# the `reader.GetShard` is faster
shard_counter: nodesidecar_pb2.Counter = await node.sidecar.GetCount(
noderesources_pb2.ShardId(id=shard_id) # type: ignore
)
if self.should_create_new_shard(shard_counter):
logger.warning({"message": "Adding shard", "kbid": kbid})
async with self.driver.transaction() as txn:
kb = KnowledgeBox(txn, self.storage, kbid)
model = await kb.get_model_metadata()
await self.shard_manager.create_shard_by_kbid(
txn, kbid, semantic_model=model
)
await txn.commit()
await self.shard_manager.maybe_create_new_shard(kbid, shard_counter) # type: ignore
4 changes: 0 additions & 4 deletions nucliadb/nucliadb/ingest/orm/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ class KnowledgeBoxConflict(Exception):
pass


class KnowledgeBoxNotFound(NotFound):
    """Raised when a knowledge box cannot be found."""


class DeadletteredError(Exception):
    """Error type for dead-lettered messages.

    NOTE(review): the raise sites are not visible here; confirm the exact
    conditions against the callers.
    """

Expand Down
35 changes: 9 additions & 26 deletions nucliadb/nucliadb/ingest/orm/knowledgebox.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,15 @@
from nucliadb_protos.resources_pb2 import Basic

from nucliadb.common.cluster.base import AbstractIndexNode
from nucliadb.common.cluster.exceptions import ShardNotFound, ShardsNotFound
from nucliadb.common.cluster.exceptions import ShardNotFound
from nucliadb.common.cluster.manager import get_index_node
from nucliadb.common.cluster.utils import get_shard_manager
from nucliadb.common.datamanagers.exceptions import KnowledgeBoxNotFound
from nucliadb.common.datamanagers.kb import KnowledgeBoxDataManager
from nucliadb.common.maindb.driver import Driver, Transaction
from nucliadb.common.maindb.utils import get_driver
from nucliadb.ingest import SERVICE_NAME, logger
from nucliadb.ingest.orm.exceptions import KnowledgeBoxConflict, KnowledgeBoxNotFound
from nucliadb.ingest.orm.exceptions import KnowledgeBoxConflict
from nucliadb.ingest.orm.resource import (
KB_RESOURCE_SLUG,
KB_RESOURCE_SLUG_BASE,
Expand Down Expand Up @@ -78,6 +81,8 @@ def __init__(self, txn: Transaction, storage: Storage, kbid: str):
self.kbid = kbid
self._config: Optional[KnowledgeBoxConfig] = None
self.synonyms = Synonyms(self.txn, self.kbid)
# Kept for backward compatibility; long term this class is expected to change dramatically
self.data_manager = KnowledgeBoxDataManager(get_driver())

async def get_config(self) -> Optional[KnowledgeBoxConfig]:
if self._config is None:
Expand Down Expand Up @@ -264,7 +269,7 @@ async def update(
return uuid

async def iterate_kb_nodes(self) -> AsyncIterator[Tuple[AbstractIndexNode, str]]:
shards_obj = await self.get_shards_object()
shards_obj = await self.data_manager.get_shards_object(self.kbid)

for shard in shards_obj.shards:
for replica in shard.replicas:
Expand Down Expand Up @@ -424,34 +429,12 @@ async def delete_all_kb_keys(
async def get_resource_shard(
self, shard_id: str
) -> Optional[writer_pb2.ShardObject]:
pb = await self.get_shards_object()
pb = await self.data_manager.get_shards_object(self.kbid)
for shard in pb.shards:
if shard.shard == shard_id:
return shard
return None

async def get_shards_object(self) -> writer_pb2.Shards:
    """Read and parse this KB's ``Shards`` protobuf using ``self.txn``.

    When no payload is stored under the KB's shards key, the transaction is
    aborted and ``ShardsNotFound`` is raised.
    """
    payload = await self.txn.get(KB_SHARDS.format(kbid=self.kbid))
    if payload:
        shards = writer_pb2.Shards()
        shards.ParseFromString(payload)
        return shards
    await self.txn.abort()
    raise ShardsNotFound(self.kbid)

async def get_model_metadata(self) -> SemanticModelMetadata:
    """Return the semantic model metadata recorded for this knowledge box.

    Raises:
        KnowledgeBoxNotFound: if the KB has no shards object; the original
            ``ShardsNotFound`` is attached as the cause.
    """
    try:
        shards_obj = await self.get_shards_object()
    except ShardsNotFound as exc:
        # Chain explicitly so the traceback shows the lookup failure as the cause.
        raise KnowledgeBoxNotFound(self.kbid) from exc
    if shards_obj.HasField("model"):
        return shards_obj.model
    # Backward-compatibility path for old KBs that do not have the `model`
    # attribute set in the Shards object. Clean this up after a migration
    # unifies all fields under `model` (on-prem and cloud).
    return SemanticModelMetadata(similarity_function=shards_obj.similarity)

async def get(self, uuid: str) -> Optional[Resource]:
raw_basic = await get_basic(self.txn, self.kbid, uuid)
if raw_basic:
Expand Down
6 changes: 4 additions & 2 deletions nucliadb/nucliadb/ingest/orm/processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@
import aiohttp.client_exceptions

from nucliadb.common.cluster.utils import get_shard_manager
from nucliadb.common.datamanagers.exceptions import KnowledgeBoxNotFound
from nucliadb.common.datamanagers.kb import KnowledgeBoxDataManager
from nucliadb.common.maindb.driver import Driver, Transaction
from nucliadb.common.maindb.exceptions import ConflictError
from nucliadb.ingest.orm.exceptions import (
DeadletteredError,
KnowledgeBoxConflict,
KnowledgeBoxNotFound,
SequenceOrderViolation,
)
from nucliadb.ingest.orm.knowledgebox import KnowledgeBox
Expand Down Expand Up @@ -74,6 +75,7 @@ def __init__(
self.partition = partition
self.pubsub = pubsub
self.shard_manager = get_shard_manager()
self.kb_data_manager = KnowledgeBoxDataManager(driver)

async def process(
self,
Expand Down Expand Up @@ -318,7 +320,7 @@ async def index_resource(
shard = await self.shard_manager.get_current_active_shard(txn, kbid)
if shard is None:
# no shard available, create a new one
model = await kb.get_model_metadata()
model = await self.kb_data_manager.get_model_metadata(kbid)
shard = await self.shard_manager.create_shard_by_kbid(
txn, kbid, semantic_model=model
)
Expand Down
Loading

2 comments on commit a4fc620

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: a4fc620 Previous: b760df9 Ratio
nucliadb/search/tests/unit/search/test_fetch.py::test_highligh_error 4465.288993125828 iter/sec (stddev: 0.000003904942276966958) 5804.479338298567 iter/sec (stddev: 1.4505330313876097e-7) 1.30

This comment was automatically generated by workflow using github-action-benchmark.

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: a4fc620 Previous: a40981d Ratio
nucliadb/tests/benchmarks/test_search.py::test_search_returns_labels 79.31886786101556 iter/sec (stddev: 0.00020922160019282615) 60.779932309336715 iter/sec (stddev: 0.0019119907918232523) 0.77
nucliadb/tests/benchmarks/test_search.py::test_search_relations 183.0915662584169 iter/sec (stddev: 0.00019032119488819046) 182.57436721258293 iter/sec (stddev: 0.0002220745559283828) 1.00

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.