Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Indexing: precompute counts of affected rows #3510

Merged
merged 1 commit into from
Aug 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 54 additions & 9 deletions src/nominatim_db/indexer/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"""
Main work horse for indexing (computing addresses) the database.
"""
from typing import cast, List, Any
from typing import cast, List, Any, Optional
import logging
import time

Expand Down Expand Up @@ -83,9 +83,30 @@ async def index_boundaries(self, minrank: int, maxrank: int) -> int:
LOG.warning("Starting indexing boundaries using %s threads",
self.num_threads)

minrank = max(minrank, 4)
maxrank = min(maxrank, 25)

# Precompute the number of rows to process for all ranks
with connect(self.dsn) as conn:
hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
if hstore_info is None:
raise RuntimeError('Hstore extension is requested but not installed.')
psycopg.types.hstore.register_hstore(hstore_info)

with conn.cursor() as cur:
cur = conn.execute(""" SELECT rank_search, count(*)
FROM placex
WHERE rank_search between %s and %s
AND class = 'boundary' and type = 'administrative'
AND indexed_status > 0
GROUP BY rank_search""",
(minrank, maxrank))
total_tuples = {row.rank_search: row.count for row in cur}

with self.tokenizer.name_analyzer() as analyzer:
for rank in range(max(minrank, 4), min(maxrank, 26)):
total += await self._index(runners.BoundaryRunner(rank, analyzer))
for rank in range(minrank, maxrank + 1):
total += await self._index(runners.BoundaryRunner(rank, analyzer),
total_tuples=total_tuples.get(rank, 0))

return total

Expand All @@ -101,6 +122,23 @@ async def index_by_rank(self, minrank: int, maxrank: int) -> int:
LOG.warning("Starting indexing rank (%i to %i) using %i threads",
minrank, maxrank, self.num_threads)

# Precompute the number of rows to process for all ranks
with connect(self.dsn) as conn:
hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
if hstore_info is None:
raise RuntimeError('Hstore extension is requested but not installed.')
psycopg.types.hstore.register_hstore(hstore_info)

with conn.cursor() as cur:
cur = conn.execute(""" SELECT rank_address, count(*)
FROM placex
WHERE rank_address between %s and %s
AND indexed_status > 0
GROUP BY rank_address""",
(minrank, maxrank))
total_tuples = {row.rank_address: row.count for row in cur}


with self.tokenizer.name_analyzer() as analyzer:
for rank in range(max(1, minrank), maxrank + 1):
if rank >= 30:
Expand All @@ -109,11 +147,12 @@ async def index_by_rank(self, minrank: int, maxrank: int) -> int:
batch = 5
else:
batch = 1
total += await self._index(runners.RankRunner(rank, analyzer), batch)
total += await self._index(runners.RankRunner(rank, analyzer),
batch=batch, total_tuples=total_tuples.get(rank, 0))

if maxrank == 30:
total += await self._index(runners.RankRunner(0, analyzer))
total += await self._index(runners.InterpolationRunner(analyzer), 20)
total += await self._index(runners.InterpolationRunner(analyzer), batch=20)

return total

Expand All @@ -123,7 +162,7 @@ async def index_postcodes(self) -> int:
"""
LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)

return await self._index(runners.PostcodeRunner(), 20)
return await self._index(runners.PostcodeRunner(), batch=20)


def update_status_table(self) -> None:
Expand All @@ -135,14 +174,20 @@ def update_status_table(self) -> None:

conn.commit()

async def _index(self, runner: runners.Runner, batch: int = 1) -> int:
async def _index(self, runner: runners.Runner, batch: int = 1,
total_tuples: Optional[int] = None) -> int:
""" Index a single rank or table. `runner` describes the SQL to use
for indexing. `batch` describes the number of objects that
should be processed with a single SQL statement
should be processed with a single SQL statement.

`total_tuples` may contain the total number of rows to process.
When not supplied, the value will be computed using the
appropriate runner function.
"""
LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)

total_tuples = self._prepare_indexing(runner)
if total_tuples is None:
total_tuples = self._prepare_indexing(runner)

progress = ProgressLogger(runner.name(), total_tuples)

Expand Down