Skip to content

Commit

Permalink
fix cursor traversal when indexing (#1180)
Browse files Browse the repository at this point in the history
  • Loading branch information
marwoodandrew authored Dec 14, 2023
1 parent 69fb324 commit 59fde73
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 15 deletions.
15 changes: 10 additions & 5 deletions manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,20 @@ def elastic_init():
@manager.option('-c', '--collection', dest='collection', default=None)
@manager.option('-t', '--timestamp', dest='timestamp', default=None)
@manager.option('-d', '--direction', dest='direction', choices=['older', 'newer'], default='older')
def index_from_mongo(hours, collection, timestamp, direction):
print('Checking if elastic index exists, a new one will be created if not')
app.data.init_elastic(app)
print('Elastic index check has been completed')
@manager.option('-s', '--start_id', dest='start_id', default=None)
@manager.option('-i', '--skip_init', dest='skip_init', default=False)
def index_from_mongo(hours, collection, timestamp, direction, start_id, skip_init):
if not skip_init:
print('Checking if elastic index exists, a new one will be created if not')
app.data.init_elastic(app)
print('Elastic index check has been completed')
else:
print('Skipping index initialisation')

if timestamp:
index_elastic_from_mongo_from_timestamp(collection, timestamp, direction)
else:
index_elastic_from_mongo(hours=hours, collection=collection)
index_elastic_from_mongo(hours=hours, collection=collection, start_id=start_id)


@manager.command
Expand Down
30 changes: 20 additions & 10 deletions newsroom/mongo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import time
import pymongo
import superdesk
from bson import ObjectId
from datetime import timedelta, datetime

from flask import current_app as app
Expand All @@ -13,7 +14,7 @@
default_page_size = 500


def index_elastic_from_mongo(hours=None, collection=None):
def index_elastic_from_mongo(hours=None, collection=None, start_id=None):
print('Starting indexing from mongodb for "{}" collection hours={}'.format(collection, hours))

resources = app.data.get_elastic_resources()
Expand All @@ -25,7 +26,7 @@ def index_elastic_from_mongo(hours=None, collection=None):
for resource in resources:
print('Starting indexing collection {}'.format(resource))

for items in _get_mongo_items(resource, hours):
for items in _get_mongo_items(resource, hours, start_id):
print('{} Inserting {} items'.format(time.strftime('%X %x %Z'), len(items)))
s = time.time()

Expand Down Expand Up @@ -94,15 +95,16 @@ def index_elastic_from_mongo_from_timestamp(collection, timestamp_str, direction
print('Finished indexing collection {}'.format(collection))


def _get_mongo_items(mongo_collection_name, hours=None):
def _get_mongo_items(mongo_collection_name, hours=None, start_id=None):
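"""Yield successive pages of items from the given mongo collection.
Each page is a list of at most ``default_page_size`` items, fetched in ascending id order;
every new page asks for ids greater than the last id of the previous page.
:param mongo_collection_name: Name of the collection to get the items
:param hours: Optional number of hours to restrict the items by (the filter itself is built
in a part of the function not shown here)
:param start_id: Optional id string, converted to ``ObjectId``; only items with an id greater
than it are fetched, and items whose ``_id`` is not an ``ObjectId`` are left out of the pages
:return: generator of lists of items
"""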
print('Indexing data from mongo/{} to elastic/{} for hours={}'.format(mongo_collection_name,
mongo_collection_name,
hours))
print('Indexing data from mongo/{} to elastic/{} for hours={} from id>{}'.format(mongo_collection_name,
mongo_collection_name,
hours,
start_id))

db = app.data.get_mongo_collection(mongo_collection_name)
args = {'limit': default_page_size, 'sort': [(config.ID_FIELD, pymongo.ASCENDING)]}
Expand All @@ -113,16 +115,24 @@ def _get_mongo_items(mongo_collection_name, hours=None):
now = utcnow()
args['filter'] = {}

last_id = None
if start_id:
last_id = ObjectId(start_id)
else:
last_id = None
while True:
if last_id:
args['filter'].update({config.ID_FIELD: {'$gt': last_id}})
cursor = db.find(**args)
if not cursor.count():
break
items = list(cursor)
if not len(items):
break

last_id = items[-1][config.ID_FIELD]
yield items

if start_id:
yield [item for item in items if isinstance(item.get('_id'), ObjectId)]
else:
yield items


def _get_mongo_items_from_timestamp(collection, timestamp, direction):
Expand Down

0 comments on commit 59fde73

Please sign in to comment.