Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NHUB-583] fix(celery): Await when sending task to celery #1157

Merged
merged 3 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion newsroom/agenda/agenda_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ async def enhance_coverages(self, coverages: list[dict[str, Any]]):
wire_items = await WireSearchServiceAsync().get_items_by_id(text_delivery_ids)
if await wire_items.count():
async for item in wire_items:
coverage = [c for c in completed_coverages if c.get("delivery_id") == item.get("_id")][0]
coverage = [c for c in completed_coverages if c.get("delivery_id") == item.id][0]
coverage["publish_time"] = item.publish_schedule or item.firstpublished

async def set_delivery(self, wire_item: dict[str, Any]) -> list[dict[str, Any]]:
Expand Down
116 changes: 56 additions & 60 deletions newsroom/agenda/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,72 +138,68 @@ async def notify_agenda_update(
!= (agenda.get("dates") or {}).get("end").replace(tzinfo=None)
)

if agenda.get("state") and agenda.get("state") != original_agenda.get("state"):
state_changed = agenda.get("state") in notify_states

if state_changed:
_fill_all_coverages(
agenda,
original_agenda,
coverage_updates,
cancelled=False if agenda.get("state") == WORKFLOW_STATE.SCHEDULED else True,
use_original_agenda=True,
)
if agenda.get("state") and agenda.get("state") != original_agenda.get("state"):
state_changed = agenda.get("state") in notify_states

if state_changed:
_fill_all_coverages(
agenda,
original_agenda,
coverage_updates,
cancelled=False if agenda.get("state") == WORKFLOW_STATE.SCHEDULED else True,
use_original_agenda=True,
)
else:
if time_updated:
_fill_all_coverages(agenda, original_agenda, coverage_updates)
else:
if time_updated:
_fill_all_coverages(agenda, original_agenda, coverage_updates)
else:
for coverage in agenda.get("coverages") or []:
existing_coverage = next(
for coverage in agenda.get("coverages") or []:
existing_coverage = next(
(
c
for c in original_agenda.get("coverages") or []
if c["coverage_id"] == coverage["coverage_id"]
),
None,
)
detailed_coverage = _get_detailed_coverage(agenda, original_agenda, coverage)
if detailed_coverage:
if not existing_coverage:
if coverage["workflow_status"] != WORKFLOW_STATE.CANCELLED:
coverage_updates["modified_coverages"].append(detailed_coverage)
elif coverage.get("workflow_status") == WORKFLOW_STATE.CANCELLED and existing_coverage.get(
"workflow_status"
) != coverage.get("workflow_status"):
coverage_updates["cancelled_coverages"].append(detailed_coverage)
elif (
(
coverage.get("delivery_state") != existing_coverage.get("delivery_state")
and coverage.get("delivery_state") == "published"
)
or (
coverage.get("workflow_status") != existing_coverage.get("workflow_status")
and coverage.get("workflow_status") == "completed"
)
or (existing_coverage.get("scheduled") != coverage.get("scheduled"))
):
coverage_updates["modified_coverages"].append(detailed_coverage)
only_new_coverages = False
elif detailed_coverage["coverage_id"] != (coverage_updated or {}).get("coverage_id"):
coverage_updates["unaltered_coverages"].append(detailed_coverage)

# Check for removed coverages - show it as cancelled
if item and item.get("type") == "planning":
for original_cov in original_agenda.get("coverages") or []:
updated_cov = next(
(
c
for c in original_agenda.get("coverages") or []
if c["coverage_id"] == coverage["coverage_id"]
for c in (agenda.get("coverages") or [])
if c.get("coverage_id") == original_cov.get("coverage_id")
),
None,
)
detailed_coverage = _get_detailed_coverage(agenda, original_agenda, coverage)
if detailed_coverage:
if not existing_coverage:
if coverage["workflow_status"] != WORKFLOW_STATE.CANCELLED:
coverage_updates["modified_coverages"].append(detailed_coverage)
elif coverage.get(
"workflow_status"
) == WORKFLOW_STATE.CANCELLED and existing_coverage.get(
"workflow_status"
) != coverage.get(
"workflow_status"
):
coverage_updates["cancelled_coverages"].append(detailed_coverage)
elif (
(
coverage.get("delivery_state") != existing_coverage.get("delivery_state")
and coverage.get("delivery_state") == "published"
)
or (
coverage.get("workflow_status") != existing_coverage.get("workflow_status")
and coverage.get("workflow_status") == "completed"
)
or (existing_coverage.get("scheduled") != coverage.get("scheduled"))
):
coverage_updates["modified_coverages"].append(detailed_coverage)
only_new_coverages = False
elif detailed_coverage["coverage_id"] != (coverage_updated or {}).get("coverage_id"):
coverage_updates["unaltered_coverages"].append(detailed_coverage)

# Check for removed coverages - show it as cancelled
if item and item.get("type") == "planning":
for original_cov in original_agenda.get("coverages") or []:
updated_cov = next(
(
c
for c in (agenda.get("coverages") or [])
if c.get("coverage_id") == original_cov.get("coverage_id")
),
None,
)
if not updated_cov:
coverage_updates["cancelled_coverages"].append(original_cov)
if not updated_cov:
coverage_updates["cancelled_coverages"].append(original_cov)

if len(coverage_updates["modified_coverages"]) > 0 or len(coverage_updates["cancelled_coverages"]) > 0:
coverage_modified = True
Expand Down
12 changes: 7 additions & 5 deletions newsroom/email.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ def _send_email(to, subject, text_body, html_body=None, sender=None, sender_name
return app.mail.send(msg)


def send_email(to, subject, text_body, html_body=None, sender=None, sender_name=None, attachments_info=None, cc=None):
async def send_email(
to, subject, text_body, html_body=None, sender=None, sender_name=None, attachments_info=None, cc=None
):
"""
Sends the email
:param to: List of recipients
Expand All @@ -132,7 +134,7 @@ def send_email(to, subject, text_body, html_body=None, sender=None, sender_name=
"sender_name": sender_name or get_app_config("EMAIL_DEFAULT_SENDER_NAME"),
"attachments_info": attachments_info,
}
_send_email.apply_async(kwargs=kwargs)
await _send_email.apply_async(kwargs=kwargs)


async def send_new_signup_email(company: Company, user: User, is_new_company: bool):
Expand Down Expand Up @@ -286,7 +288,7 @@ async def _send_localized_email(
text_body = await render_template(text_template, **template_kwargs)
html_body = await render_template(html_template, **template_kwargs)

send_email(
await send_email(
to=to,
cc=cc,
subject=subject,
Expand Down Expand Up @@ -518,7 +520,7 @@ async def _send_wire_killed_notification_email(user: UserResourceModel, item: di
subject = gettext("Kill/Takedown notice")
text_body = to_text(await formatter.format_item(item))

send_email(to=recipients, subject=subject, text_body=text_body)
await send_email(to=recipients, subject=subject, text_body=text_body)


async def _send_agenda_killed_notification_email(user: UserResourceModel, item: dict[str, Any]) -> None:
Expand All @@ -527,7 +529,7 @@ async def _send_agenda_killed_notification_email(user: UserResourceModel, item:
subject = gettext("%(section)s cancelled notice", section=get_app_config("AGENDA_SECTION"))
text_body = to_text(await formatter.format_item(item, item_type="agenda"))

send_email(to=recipients, subject=subject, text_body=text_body)
await send_email(to=recipients, subject=subject, text_body=text_body)


def to_text(output: Union[str, bytes]) -> str:
Expand Down
2 changes: 1 addition & 1 deletion newsroom/push/agenda_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,4 +201,4 @@ async def set_item_reference(self, coverage):
"coverage_id": coverage.get("coverage_id"),
},
)
notify_new_wire_item.delay(item.id, check_topics=False)
await notify_new_wire_item.delay(item.id, check_topics=False)
4 changes: 2 additions & 2 deletions newsroom/push/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ async def _send_notification(section: str, users_ids: set[ObjectId]):
dict(
user=user_id,
item=item["_id"],
resource=item.get("type"),
resource=item.get("type") or item.get("_type"),
action="history_match",
data=None,
)
Expand Down Expand Up @@ -291,7 +291,7 @@ async def notify_agenda_topic_matches(
users_dict: dict[str, User] = {str(user.id): user.to_dict() for user in users.values()}
topic_matches |= set(
[
topic
topic["_id"]
for topic in await get_agenda_notification_topics_for_query_by_id(item, users_dict)
if topic.get("_id") not in topic_matches
]
Expand Down
2 changes: 1 addition & 1 deletion newsroom/push/publishing.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ async def publish_item(self, doc: Item, original: Item):
if doc.get("coverage_id"):
agenda_items = await AgendaItemService().set_delivery(doc)
if agenda_items:
[notify_new_agenda_item.delay(item["_id"], check_topics=False) for item in agenda_items]
[await notify_new_agenda_item.delay(item["_id"], check_topics=False) for item in agenda_items]
except Exception as ex:
logger.info("Failed to notify new wire item for Agenda watches")
logger.exception(ex)
Expand Down
6 changes: 3 additions & 3 deletions newsroom/push/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
async def handle_publish_event(item):
orig = await AgendaItemService().find_by_id(item["guid"])
event_id = await publisher.publish_event(item, orig.to_dict() if orig else None)
notify_new_agenda_item.delay(event_id, check_topics=True, is_new=orig is None)
await notify_new_agenda_item.delay(event_id, check_topics=True, is_new=orig is None)


async def handle_publish_planning(item):
Expand All @@ -49,15 +49,15 @@ async def handle_publish_planning(item):

# Prefer parent Event when sending notificaitons
_id = event_id or plan_id
notify_new_agenda_item.delay(_id, check_topics=True, is_new=orig is None)
await notify_new_agenda_item.delay(_id, check_topics=True, is_new=orig is None)


async def handle_publish_text_item(item):
orig = await WireSearchServiceAsync().service.find_by_id(item["guid"])
item["_id"] = await publisher.publish_item(item, orig.to_dict() if orig else None)

if not item.get("nextversion"):
notify_new_wire_item.delay(
await notify_new_wire_item.delay(
item["_id"], check_topics=orig is None or get_app_config("WIRE_NOTIFICATIONS_ON_CORRECTIONS")
)

Expand Down
15 changes: 8 additions & 7 deletions newsroom/reports/content_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from newsroom.wire import WireItemService
from newsroom.agenda.filters import get_date_filters, apply_item_type_filter as apply_agenda_type_filter
from newsroom.agenda import AgendaItemService
from newsroom.history_async import HistoryService

from newsroom.utils import query_resource, MAX_TERMS_SIZE

Expand All @@ -23,7 +24,7 @@


async def get_query_source(args: dict[str, Any], source: dict[str, Any]) -> dict[str, Any]:
search_request = NewshubSearchRequest[BaseSearchRequestArgs](section=args["section"])
search_request = NewshubSearchRequest[BaseSearchRequestArgs](section=SectionEnum(args["section"]))
query = search_request.search.query

if args.get("genre"):
Expand Down Expand Up @@ -84,16 +85,16 @@ async def get_items(args):

while True:
cursor = await service.search(source)
items = await cursor.to_list_raw()
items = await cursor.to_list()

if not len(items):
break

source["from"] += CHUNK_SIZE
yield items
yield [item.to_dict() for item in items]


def get_aggregations(args, ids):
async def get_aggregations(args, ids):
MarkLark86 marked this conversation as resolved.
Show resolved Hide resolved
"""Get action and company aggregations for the items provided"""

if not args.get("section"):
Expand Down Expand Up @@ -122,8 +123,8 @@ def get_aggregations(args, ids):
},
}

results = get_resource_service("history").fetch_history(source)
aggs = (results.get("hits") or {}).get("aggregations") or {}
results = cast(ElasticsearchResourceCursorAsync, await HistoryService().search(source))
aggs = (results.hits or {}).get("aggregations") or {}
buckets = (aggs.get("items") or {}).get("buckets") or []

return {
Expand Down Expand Up @@ -302,7 +303,7 @@ async def get_content_activity_report():

async for items in get_items(args):
item_ids = [item.get("_id") for item in items]
aggs = get_aggregations(args, item_ids)
aggs = await get_aggregations(args, item_ids)

for item in items:
item_id = item["_id"]
Expand Down
8 changes: 2 additions & 6 deletions newsroom/tests/markers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,12 @@
# Sets ``app.config["FORCE_ENABLE_GOOGLE_OAUTH"]`` to True
enable_google_login = mark.enable_google_login


# Adds ``newsroom.auth.saml`` to ``app.config["INSTALLED_APPS"]``
# This registers SAML views to the auth blueprint
enable_saml = mark.enable_saml


# Skips the test, due to known issue with async changes
# Change this to `requires_async_celery = mark.requires_async_celery` to run these tests
requires_async_celery = mark.skip(reason="Requires celery to support async tasks")

# Mark tests that require celery
requires_async_celery = mark.requires_async_celery

skip_auto_wire_items = mark.skip_auto_wire_items
skip_auto_agenda_items = mark.skip_auto_agenda_items
6 changes: 3 additions & 3 deletions newsroom/types/history.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Annotated, Any
from datetime import datetime

from superdesk.core.resources.fields import ObjectId as ObjectIdField, Keyword
from superdesk.core.resources.fields import ObjectId as ObjectIdField, Keyword, keyword_mapping
from superdesk.core.resources.validators import validate_data_relation_async

from newsroom.core.resources.model import NewshubResourceModel
Expand All @@ -10,8 +10,8 @@
class HistoryResourceModel(NewshubResourceModel):
action: Keyword
versioncreated: datetime
user: Annotated[ObjectIdField | None, validate_data_relation_async("users")] = None
company: Annotated[ObjectIdField | None, validate_data_relation_async("companies")] = None
user: Annotated[ObjectIdField | None, keyword_mapping(), validate_data_relation_async("users")] = None
company: Annotated[ObjectIdField | None, keyword_mapping(), validate_data_relation_async("companies")] = None
item: Keyword
version: str
section: Keyword
Expand Down
2 changes: 2 additions & 0 deletions newsroom/types/wire.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Annotated
from datetime import datetime

from pydantic import Field

Expand All @@ -14,6 +15,7 @@ class PublishedProduct:

class WireItem(ContentAPIItem):
products: Annotated[list[PublishedProduct], Field(default_factory=list)]
publish_schedule: datetime | None = None

bookmarks: Annotated[list[fields.ObjectId], fields.keyword_mapping(), Field(default_factory=list)]
downloads: Annotated[list[fields.ObjectId], fields.keyword_mapping(), Field(default_factory=list)]
Expand Down
8 changes: 7 additions & 1 deletion newsroom/wire/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,13 @@ def apply_item_type_filter(request: NewshubSearchRequest) -> None:
"""Applies item type filter(s) based on the request args"""

request.search.query.must_not.append({"term": {"type": "composite"}})
if not request.args.ignore_latest:
try:
ignore_latest = request.args.ignore_latest
except AttributeError:
# This can happen when ``BaseSearchRequestArgs`` is used for search args
ignore_latest = False

if not ignore_latest:
request.search.query.must_not.append({"constant_score": {"filter": {"exists": {"field": "nextversion"}}}})


Expand Down
Loading
Loading