Skip to content

Commit

Permalink
Merge pull request #3510 from lonvia/indexing-precompute-count
Browse files Browse the repository at this point in the history
Indexing: precompute counts of affected rows
  • Loading branch information
lonvia authored Aug 12, 2024
2 parents bd0316b + 3905dd6 commit 043d528
Showing 1 changed file with 54 additions and 9 deletions.
63 changes: 54 additions & 9 deletions src/nominatim_db/indexer/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"""
Main work horse for indexing (computing addresses) the database.
"""
from typing import cast, List, Any
from typing import cast, List, Any, Optional
import logging
import time

Expand Down Expand Up @@ -83,9 +83,30 @@ async def index_boundaries(self, minrank: int, maxrank: int) -> int:
LOG.warning("Starting indexing boundaries using %s threads",
self.num_threads)

minrank = max(minrank, 4)
maxrank = min(maxrank, 25)

# Precompute number of rows to process for all rows
with connect(self.dsn) as conn:
hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
if hstore_info is None:
raise RuntimeError('Hstore extension is requested but not installed.')
psycopg.types.hstore.register_hstore(hstore_info)

with conn.cursor() as cur:
cur = conn.execute(""" SELECT rank_search, count(*)
FROM placex
WHERE rank_search between %s and %s
AND class = 'boundary' and type = 'administrative'
AND indexed_status > 0
GROUP BY rank_search""",
(minrank, maxrank))
total_tuples = {row.rank_search: row.count for row in cur}

with self.tokenizer.name_analyzer() as analyzer:
for rank in range(max(minrank, 4), min(maxrank, 26)):
total += await self._index(runners.BoundaryRunner(rank, analyzer))
for rank in range(minrank, maxrank + 1):
total += await self._index(runners.BoundaryRunner(rank, analyzer),
total_tuples=total_tuples.get(rank, 0))

return total

Expand All @@ -101,6 +122,23 @@ async def index_by_rank(self, minrank: int, maxrank: int) -> int:
LOG.warning("Starting indexing rank (%i to %i) using %i threads",
minrank, maxrank, self.num_threads)

# Precompute number of rows to process for all rows
with connect(self.dsn) as conn:
hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
if hstore_info is None:
raise RuntimeError('Hstore extension is requested but not installed.')
psycopg.types.hstore.register_hstore(hstore_info)

with conn.cursor() as cur:
cur = conn.execute(""" SELECT rank_address, count(*)
FROM placex
WHERE rank_address between %s and %s
AND indexed_status > 0
GROUP BY rank_address""",
(minrank, maxrank))
total_tuples = {row.rank_address: row.count for row in cur}


with self.tokenizer.name_analyzer() as analyzer:
for rank in range(max(1, minrank), maxrank + 1):
if rank >= 30:
Expand All @@ -109,11 +147,12 @@ async def index_by_rank(self, minrank: int, maxrank: int) -> int:
batch = 5
else:
batch = 1
total += await self._index(runners.RankRunner(rank, analyzer), batch)
total += await self._index(runners.RankRunner(rank, analyzer),
batch=batch, total_tuples=total_tuples.get(rank, 0))

if maxrank == 30:
total += await self._index(runners.RankRunner(0, analyzer))
total += await self._index(runners.InterpolationRunner(analyzer), 20)
total += await self._index(runners.InterpolationRunner(analyzer), batch=20)

return total

Expand All @@ -123,7 +162,7 @@ async def index_postcodes(self) -> int:
"""
LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)

return await self._index(runners.PostcodeRunner(), 20)
return await self._index(runners.PostcodeRunner(), batch=20)


def update_status_table(self) -> None:
Expand All @@ -135,14 +174,20 @@ def update_status_table(self) -> None:

conn.commit()

async def _index(self, runner: runners.Runner, batch: int = 1) -> int:
async def _index(self, runner: runners.Runner, batch: int = 1,
total_tuples: Optional[int] = None) -> int:
""" Index a single rank or table. `runner` describes the SQL to use
for indexing. `batch` describes the number of objects that
should be processed with a single SQL statement
should be processed with a single SQL statement.
`total_tuples` may contain the total number of rows to process.
When not supplied, the value will be computed using the
                 appropriate runner function.
"""
LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)

total_tuples = self._prepare_indexing(runner)
if total_tuples is None:
total_tuples = self._prepare_indexing(runner)

progress = ProgressLogger(runner.name(), total_tuples)

Expand Down

0 comments on commit 043d528

Please sign in to comment.