Skip to content

Commit

Permalink
Merge branch 'async' into release/3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
MarkLark86 committed Nov 12, 2024
2 parents 4dbc63e + dc55efd commit 1b694a7
Show file tree
Hide file tree
Showing 19 changed files with 205 additions and 233 deletions.
2 changes: 1 addition & 1 deletion newsroom/agenda/agenda_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ async def enhance_coverages(self, coverages: list[dict[str, Any]]):
wire_items = await WireSearchServiceAsync().get_items_by_id(text_delivery_ids)
if await wire_items.count():
async for item in wire_items:
coverage = [c for c in completed_coverages if c.get("delivery_id") == item.get("_id")][0]
coverage = [c for c in completed_coverages if c.get("delivery_id") == item.id][0]
coverage["publish_time"] = item.publish_schedule or item.firstpublished

async def set_delivery(self, wire_item: dict[str, Any]) -> list[dict[str, Any]]:
Expand Down
116 changes: 56 additions & 60 deletions newsroom/agenda/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,72 +138,68 @@ async def notify_agenda_update(
!= (agenda.get("dates") or {}).get("end").replace(tzinfo=None)
)

if agenda.get("state") and agenda.get("state") != original_agenda.get("state"):
state_changed = agenda.get("state") in notify_states

if state_changed:
_fill_all_coverages(
agenda,
original_agenda,
coverage_updates,
cancelled=False if agenda.get("state") == WORKFLOW_STATE.SCHEDULED else True,
use_original_agenda=True,
)
if agenda.get("state") and agenda.get("state") != original_agenda.get("state"):
state_changed = agenda.get("state") in notify_states

if state_changed:
_fill_all_coverages(
agenda,
original_agenda,
coverage_updates,
cancelled=False if agenda.get("state") == WORKFLOW_STATE.SCHEDULED else True,
use_original_agenda=True,
)
else:
if time_updated:
_fill_all_coverages(agenda, original_agenda, coverage_updates)
else:
if time_updated:
_fill_all_coverages(agenda, original_agenda, coverage_updates)
else:
for coverage in agenda.get("coverages") or []:
existing_coverage = next(
for coverage in agenda.get("coverages") or []:
existing_coverage = next(
(
c
for c in original_agenda.get("coverages") or []
if c["coverage_id"] == coverage["coverage_id"]
),
None,
)
detailed_coverage = _get_detailed_coverage(agenda, original_agenda, coverage)
if detailed_coverage:
if not existing_coverage:
if coverage["workflow_status"] != WORKFLOW_STATE.CANCELLED:
coverage_updates["modified_coverages"].append(detailed_coverage)
elif coverage.get("workflow_status") == WORKFLOW_STATE.CANCELLED and existing_coverage.get(
"workflow_status"
) != coverage.get("workflow_status"):
coverage_updates["cancelled_coverages"].append(detailed_coverage)
elif (
(
coverage.get("delivery_state") != existing_coverage.get("delivery_state")
and coverage.get("delivery_state") == "published"
)
or (
coverage.get("workflow_status") != existing_coverage.get("workflow_status")
and coverage.get("workflow_status") == "completed"
)
or (existing_coverage.get("scheduled") != coverage.get("scheduled"))
):
coverage_updates["modified_coverages"].append(detailed_coverage)
only_new_coverages = False
elif detailed_coverage["coverage_id"] != (coverage_updated or {}).get("coverage_id"):
coverage_updates["unaltered_coverages"].append(detailed_coverage)

# Check for removed coverages - show it as cancelled
if item and item.get("type") == "planning":
for original_cov in original_agenda.get("coverages") or []:
updated_cov = next(
(
c
for c in original_agenda.get("coverages") or []
if c["coverage_id"] == coverage["coverage_id"]
for c in (agenda.get("coverages") or [])
if c.get("coverage_id") == original_cov.get("coverage_id")
),
None,
)
detailed_coverage = _get_detailed_coverage(agenda, original_agenda, coverage)
if detailed_coverage:
if not existing_coverage:
if coverage["workflow_status"] != WORKFLOW_STATE.CANCELLED:
coverage_updates["modified_coverages"].append(detailed_coverage)
elif coverage.get(
"workflow_status"
) == WORKFLOW_STATE.CANCELLED and existing_coverage.get(
"workflow_status"
) != coverage.get(
"workflow_status"
):
coverage_updates["cancelled_coverages"].append(detailed_coverage)
elif (
(
coverage.get("delivery_state") != existing_coverage.get("delivery_state")
and coverage.get("delivery_state") == "published"
)
or (
coverage.get("workflow_status") != existing_coverage.get("workflow_status")
and coverage.get("workflow_status") == "completed"
)
or (existing_coverage.get("scheduled") != coverage.get("scheduled"))
):
coverage_updates["modified_coverages"].append(detailed_coverage)
only_new_coverages = False
elif detailed_coverage["coverage_id"] != (coverage_updated or {}).get("coverage_id"):
coverage_updates["unaltered_coverages"].append(detailed_coverage)

# Check for removed coverages - show it as cancelled
if item and item.get("type") == "planning":
for original_cov in original_agenda.get("coverages") or []:
updated_cov = next(
(
c
for c in (agenda.get("coverages") or [])
if c.get("coverage_id") == original_cov.get("coverage_id")
),
None,
)
if not updated_cov:
coverage_updates["cancelled_coverages"].append(original_cov)
if not updated_cov:
coverage_updates["cancelled_coverages"].append(original_cov)

if len(coverage_updates["modified_coverages"]) > 0 or len(coverage_updates["cancelled_coverages"]) > 0:
coverage_modified = True
Expand Down
12 changes: 7 additions & 5 deletions newsroom/email.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ def _send_email(to, subject, text_body, html_body=None, sender=None, sender_name
return app.mail.send(msg)


def send_email(to, subject, text_body, html_body=None, sender=None, sender_name=None, attachments_info=None, cc=None):
async def send_email(
to, subject, text_body, html_body=None, sender=None, sender_name=None, attachments_info=None, cc=None
):
"""
Sends the email
:param to: List of recipients
Expand All @@ -132,7 +134,7 @@ def send_email(to, subject, text_body, html_body=None, sender=None, sender_name=
"sender_name": sender_name or get_app_config("EMAIL_DEFAULT_SENDER_NAME"),
"attachments_info": attachments_info,
}
_send_email.apply_async(kwargs=kwargs)
await _send_email.apply_async(kwargs=kwargs)


async def send_new_signup_email(company: Company, user: User, is_new_company: bool):
Expand Down Expand Up @@ -286,7 +288,7 @@ async def _send_localized_email(
text_body = await render_template(text_template, **template_kwargs)
html_body = await render_template(html_template, **template_kwargs)

send_email(
await send_email(
to=to,
cc=cc,
subject=subject,
Expand Down Expand Up @@ -518,7 +520,7 @@ async def _send_wire_killed_notification_email(user: UserResourceModel, item: di
subject = gettext("Kill/Takedown notice")
text_body = to_text(await formatter.format_item(item))

send_email(to=recipients, subject=subject, text_body=text_body)
await send_email(to=recipients, subject=subject, text_body=text_body)


async def _send_agenda_killed_notification_email(user: UserResourceModel, item: dict[str, Any]) -> None:
Expand All @@ -527,7 +529,7 @@ async def _send_agenda_killed_notification_email(user: UserResourceModel, item:
subject = gettext("%(section)s cancelled notice", section=get_app_config("AGENDA_SECTION"))
text_body = to_text(await formatter.format_item(item, item_type="agenda"))

send_email(to=recipients, subject=subject, text_body=text_body)
await send_email(to=recipients, subject=subject, text_body=text_body)


def to_text(output: Union[str, bytes]) -> str:
Expand Down
2 changes: 1 addition & 1 deletion newsroom/push/agenda_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,4 +201,4 @@ async def set_item_reference(self, coverage):
"coverage_id": coverage.get("coverage_id"),
},
)
notify_new_wire_item.delay(item.id, check_topics=False)
await notify_new_wire_item.delay(item.id, check_topics=False)
4 changes: 2 additions & 2 deletions newsroom/push/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ async def _send_notification(section: str, users_ids: set[ObjectId]):
dict(
user=user_id,
item=item["_id"],
resource=item.get("type"),
resource=item.get("type") or item.get("_type"),
action="history_match",
data=None,
)
Expand Down Expand Up @@ -291,7 +291,7 @@ async def notify_agenda_topic_matches(
users_dict: dict[str, User] = {str(user.id): user.to_dict() for user in users.values()}
topic_matches |= set(
[
topic
topic["_id"]
for topic in await get_agenda_notification_topics_for_query_by_id(item, users_dict)
if topic.get("_id") not in topic_matches
]
Expand Down
2 changes: 1 addition & 1 deletion newsroom/push/publishing.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ async def publish_item(self, doc: Item, original: Item):
if doc.get("coverage_id"):
agenda_items = await AgendaItemService().set_delivery(doc)
if agenda_items:
[notify_new_agenda_item.delay(item["_id"], check_topics=False) for item in agenda_items]
[await notify_new_agenda_item.delay(item["_id"], check_topics=False) for item in agenda_items]
except Exception as ex:
logger.info("Failed to notify new wire item for Agenda watches")
logger.exception(ex)
Expand Down
6 changes: 3 additions & 3 deletions newsroom/push/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
async def handle_publish_event(item):
orig = await AgendaItemService().find_by_id(item["guid"])
event_id = await publisher.publish_event(item, orig.to_dict() if orig else None)
notify_new_agenda_item.delay(event_id, check_topics=True, is_new=orig is None)
await notify_new_agenda_item.delay(event_id, check_topics=True, is_new=orig is None)


async def handle_publish_planning(item):
Expand All @@ -49,15 +49,15 @@ async def handle_publish_planning(item):

    # Prefer parent Event when sending notifications
_id = event_id or plan_id
notify_new_agenda_item.delay(_id, check_topics=True, is_new=orig is None)
await notify_new_agenda_item.delay(_id, check_topics=True, is_new=orig is None)


async def handle_publish_text_item(item):
orig = await WireSearchServiceAsync().service.find_by_id(item["guid"])
item["_id"] = await publisher.publish_item(item, orig.to_dict() if orig else None)

if not item.get("nextversion"):
notify_new_wire_item.delay(
await notify_new_wire_item.delay(
item["_id"], check_topics=orig is None or get_app_config("WIRE_NOTIFICATIONS_ON_CORRECTIONS")
)

Expand Down
23 changes: 15 additions & 8 deletions newsroom/reports/content_activity.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, cast
from typing import Any, cast, TypedDict
from copy import deepcopy

from quart_babel import gettext
Expand All @@ -15,6 +15,7 @@
from newsroom.wire import WireItemService
from newsroom.agenda.filters import get_date_filters, apply_item_type_filter as apply_agenda_type_filter
from newsroom.agenda import AgendaItemService
from newsroom.history_async import HistoryService

from newsroom.utils import query_resource, MAX_TERMS_SIZE

Expand All @@ -23,7 +24,7 @@


async def get_query_source(args: dict[str, Any], source: dict[str, Any]) -> dict[str, Any]:
search_request = NewshubSearchRequest[BaseSearchRequestArgs](section=args["section"])
search_request = NewshubSearchRequest[BaseSearchRequestArgs](section=SectionEnum(args["section"]))
query = search_request.search.query

if args.get("genre"):
Expand Down Expand Up @@ -84,16 +85,22 @@ async def get_items(args):

while True:
cursor = await service.search(source)
items = await cursor.to_list_raw()
items = await cursor.to_list()

if not len(items):
break

source["from"] += CHUNK_SIZE
yield items
yield [item.to_dict() for item in items]


def get_aggregations(args, ids):
class ItemAggregation(TypedDict):
total: int
actions: dict[str, int]
companies: list[str]


async def get_aggregations(args: dict[str, Any], ids: list[str]) -> dict[str, ItemAggregation]:
"""Get action and company aggregations for the items provided"""

if not args.get("section"):
Expand Down Expand Up @@ -122,8 +129,8 @@ def get_aggregations(args, ids):
},
}

results = get_resource_service("history").fetch_history(source)
aggs = (results.get("hits") or {}).get("aggregations") or {}
results = cast(ElasticsearchResourceCursorAsync, await HistoryService().search(source))
aggs = (results.hits or {}).get("aggregations") or {}
buckets = (aggs.get("items") or {}).get("buckets") or []

return {
Expand Down Expand Up @@ -302,7 +309,7 @@ async def get_content_activity_report():

async for items in get_items(args):
item_ids = [item.get("_id") for item in items]
aggs = get_aggregations(args, item_ids)
aggs = await get_aggregations(args, item_ids)

for item in items:
item_id = item["_id"]
Expand Down
8 changes: 2 additions & 6 deletions newsroom/tests/markers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,12 @@
# Sets ``app.config["FORCE_ENABLE_GOOGLE_OAUTH"]`` to True
enable_google_login = mark.enable_google_login


# Adds ``newsroom.auth.saml`` to ``app.config["INSTALLED_APPS"]``
# This registers SAML views to the auth blueprint
enable_saml = mark.enable_saml


# Skips the test, due to known issue with async changes
# Change this to `requires_async_celery = mark.requires_async_celery` to run these tests
requires_async_celery = mark.skip(reason="Requires celery to support async tasks")

# Mark tests that require celery
requires_async_celery = mark.requires_async_celery

skip_auto_wire_items = mark.skip_auto_wire_items
skip_auto_agenda_items = mark.skip_auto_agenda_items
6 changes: 3 additions & 3 deletions newsroom/types/history.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Annotated, Any
from datetime import datetime

from superdesk.core.resources.fields import ObjectId as ObjectIdField, Keyword
from superdesk.core.resources.fields import ObjectId as ObjectIdField, Keyword, keyword_mapping
from superdesk.core.resources.validators import validate_data_relation_async

from newsroom.core.resources.model import NewshubResourceModel
Expand All @@ -10,8 +10,8 @@
class HistoryResourceModel(NewshubResourceModel):
action: Keyword
versioncreated: datetime
user: Annotated[ObjectIdField | None, validate_data_relation_async("users")] = None
company: Annotated[ObjectIdField | None, validate_data_relation_async("companies")] = None
user: Annotated[ObjectIdField | None, keyword_mapping(), validate_data_relation_async("users")] = None
company: Annotated[ObjectIdField | None, keyword_mapping(), validate_data_relation_async("companies")] = None
item: Keyword
version: str
section: Keyword
Expand Down
2 changes: 2 additions & 0 deletions newsroom/types/wire.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Annotated
from datetime import datetime

from pydantic import Field

Expand All @@ -14,6 +15,7 @@ class PublishedProduct:

class WireItem(ContentAPIItem):
products: Annotated[list[PublishedProduct], Field(default_factory=list)]
publish_schedule: datetime | None = None

bookmarks: Annotated[list[fields.ObjectId], fields.keyword_mapping(), Field(default_factory=list)]
downloads: Annotated[list[fields.ObjectId], fields.keyword_mapping(), Field(default_factory=list)]
Expand Down
8 changes: 7 additions & 1 deletion newsroom/wire/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,13 @@ def apply_item_type_filter(request: NewshubSearchRequest) -> None:
"""Applies item type filter(s) based on the request args"""

request.search.query.must_not.append({"term": {"type": "composite"}})
if not request.args.ignore_latest:
try:
ignore_latest = request.args.ignore_latest
except AttributeError:
# This can happen when ``BaseSearchRequestArgs`` is used for search args
ignore_latest = False

if not ignore_latest:
request.search.query.must_not.append({"constant_score": {"filter": {"exists": {"field": "nextversion"}}}})


Expand Down
Loading

0 comments on commit 1b694a7

Please sign in to comment.