Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring inbox processing to smaller tasks #647

Merged
merged 15 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions activities/models/post.py
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,7 @@ def by_ap(cls, data, create=False, update=False, fetch_author=False) -> "Post":
except IntegrityError:
# despite previous checks, a parallel thread managed
# to create the same object already
post = cls.by_object_uri(object_uri=data["id"])
raise TryAgainLater()
else:
raise cls.DoesNotExist(f"No post with ID {data['id']}", data)
if update or created:
Expand Down Expand Up @@ -1014,7 +1014,7 @@ def by_object_uri(cls, object_uri, fetch=False) -> "Post":
response = SystemActor().signed_request(
method="get", uri=object_uri
)
except (httpx.HTTPError, ssl.SSLCertVerificationError):
except (httpx.HTTPError, ssl.SSLCertVerificationError, ValueError):
raise cls.DoesNotExist(f"Could not fetch {object_uri}")
if response.status_code in [404, 410]:
raise cls.DoesNotExist(f"No post at {object_uri}")
Expand Down Expand Up @@ -1072,7 +1072,7 @@ def handle_create_ap(cls, data):
if data["actor"] != data["object"]["attributedTo"]:
raise ValueError("Create actor does not match its Post object", data)
# Create it, stator will fan it out locally
cls.by_ap(data["object"], create=True, update=True)
cls.by_ap(data["object"], create=True, update=True, fetch_author=True)

@classmethod
def handle_update_ap(cls, data):
Expand Down
4 changes: 3 additions & 1 deletion core/signatures.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,8 @@ def verify_signature(cls, document: dict, public_key: str) -> None:
Verifies a document
"""
try:
# causing side effects to the original document is bad form
document = document.copy()
# Strip out the signature from the incoming document
signature = document.pop("signature")
# Create the options document
Expand Down Expand Up @@ -322,7 +324,7 @@ def verify_signature(cls, document: dict, public_key: str) -> None:
hashes.SHA256(),
)
except InvalidSignature:
raise VerificationError("Signature mismatch")
raise VerificationError("LDSignature mismatch")

@classmethod
def create_signature(
Expand Down
8 changes: 6 additions & 2 deletions tests/core/test_signatures.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ def test_sign_ld(keypair):
"""
# Create the signature
document = {
"@context": ["https://www.w3.org/ns/activitystreams"],
"id": "https://example.com/test-create",
"type": "Create",
"actor": "https://example.com/test-actor",
Expand All @@ -38,20 +39,23 @@ def test_verifying_ld(keypair):
Tests verifying JSON-LD signatures from a known-good document
"""
document = {
"@context": ["https://www.w3.org/ns/activitystreams"],
"id": "https://example.com/test-create",
"type": "Create",
"actor": "https://example.com/test-actor",
"object": {"id": "https://example.com/test-object", "type": "Note"},
"signature": {
"@context": "https://w3id.org/identity/v1",
"creator": "https://example.com/test-actor#test-key",
"created": "2022-11-12T21:41:47Z",
"signatureValue": "nTHfkHqG4hegfnjpHucXtXDLDaIKi2Duk+NeCzqTtkjf4NneXsofbZY2tGew4uAooEe1UeM23PIyjWYnR16KwcD4YY8nMj8L3xY2czwQPScMM9n+KhSHzkWfX+iI4FWKbjpPI8M53EtTRJU+1qEjjmGUx03Ip0vfvT5821etIgvY4wLNhg3y7R8fevnNux+BeytcEV6gM4awJJ6RK0xrWGLyTgDNon5V5aNUjwcV/UVPy9UAQi1KYWtA74/F0Y4oPzL5CTudPpyiViyVHZQaal4r+ExzgSvGztqKxQeT1ya6gLXxbm1YQ+8UiGVSS8zoGhMFDEZWVsRPv7e0jm5wfA==",
"created": "2023-10-25T08:08:47.702Z",
"signatureValue": "ajg4ukZzCtBWjflO1u6MlTc4tBVO6MsqzBr/L+kO5VI2ucutFaUdDa/Kx4W12ZCm9oYvTyMQMnoeELx5BifslRWEeMmo1wWMPXmg2/BMKgm8Spt+Zanq68uTlYGyKvuw1Q0FyNq84N2PbRZRXu2Yhlj2KnAVTRtKrsfEiCg3yNfVQ7lbUpDtlXvXLAq2yBN8H/BnZDoynjaDlafFW9Noq8025q1K/lz5jNzBEL22CSrKsD2qYWq1TK3s3h6SJ+j3J+5s0Ni3F/TH7W/5VeGBpzx4z6MSjmn7aHAS3JNCnAWDW9Rf6yKLg2y5htj6FpexiGcoEjO3VqjLoIP4f/115Q==",
"type": "RsaSignature2017",
},
}
# Ensure it verifies with correct data
LDSignature.verify_signature(document, keypair["public_key"])
# signature should remain in document if it was valid
assert "signature" in document
# Mutate it slightly and ensure it does not verify
with pytest.raises(VerificationError):
document["actor"] = "https://example.com/evil-actor"
Expand Down
1 change: 1 addition & 0 deletions tests/users/models/test_identity.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ def test_fetch_actor(httpx_mock, config_system):
identity.featured_collection_uri
== "https://example.com/test-actor/collections/featured/"
)
identity.fetch_pinned_post_uris(identity.featured_collection_uri)
assert identity.icon_uri == "https://example.com/icon.jpg"
assert identity.image_uri == "https://example.com/image.jpg"
assert identity.summary == "<p>A test user</p>"
Expand Down
40 changes: 23 additions & 17 deletions users/models/identity.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from stator.exceptions import TryAgainLater
from stator.models import State, StateField, StateGraph, StatorModel
from users.models.domain import Domain
from users.models.inbox_message import InboxMessage
from users.models.system_actor import SystemActor


Expand Down Expand Up @@ -743,7 +744,7 @@ def fetch_webfinger(cls, handle: str) -> tuple[str | None, str | None]:
except (httpx.HTTPError, ssl.SSLCertVerificationError) as ex:
response = getattr(ex, "response", None)
if isinstance(ex, httpx.TimeoutException) or (
response and response.status_code in [408, 504]
response and response.status_code in [408, 429, 504]
):
raise TryAgainLater() from ex
elif (
Expand Down Expand Up @@ -800,7 +801,7 @@ def fetch_pinned_post_uris(cls, uri: str) -> list[str]:
except (httpx.HTTPError, ssl.SSLCertVerificationError) as ex:
response = getattr(ex, "response", None)
if isinstance(ex, httpx.TimeoutException) or (
response and response.status_code in [408, 504]
response and response.status_code in [408, 429, 504]
):
raise TryAgainLater() from ex
elif (
Expand Down Expand Up @@ -847,7 +848,6 @@ def fetch_actor(self) -> bool:
webfinger if it's available.
"""
from activities.models import Emoji
from users.services import IdentityService

if self.local:
raise ValueError("Cannot fetch local identities")
Expand All @@ -866,30 +866,24 @@ def fetch_actor(self) -> bool:
return False
status_code = response.status_code
if status_code >= 400:
if status_code in [408, 504]:
if status_code in [408, 429, 504]:
raise TryAgainLater()
if status_code == 410 and self.pk:
# Their account got deleted, so let's do the same.
Identity.objects.filter(pk=self.pk).delete()
if status_code < 500 and status_code not in [401, 403, 404, 406, 410]:
logging.info(
f"Client error fetching actor at {self.actor_uri}: {status_code}",
extra={
"identity": self.pk,
"domain": self.domain_id,
"content": response.content,
},
"Client error fetching actor: %d %s", status_code, self.actor_uri
)
return False
try:
document = canonicalise(response.json(), include_security=True)
except ValueError:
# servers with empty or invalid responses are inevitable
logging.info(
f"Invalid response fetching actor at {self.actor_uri}",
"Invalid response fetching actor %s",
self.actor_uri,
extra={
"identity": self.pk,
"domain": self.domain_id,
"content": response.content,
},
)
Expand Down Expand Up @@ -948,7 +942,16 @@ def fetch_actor(self) -> bool:
self.domain = Domain.get_remote_domain(webfinger_domain)
except TryAgainLater:
# continue with original domain when webfinger times out
logging.info("WebFinger timed out: %s", self.actor_uri)
pass
except ValueError as exc:
logging.info(
"Can't parse WebFinger: %s %s",
exc.args[0],
self.actor_uri,
exc_info=exc,
)
return False
# Emojis (we need the domain so we do them here)
for tag in get_list(document, "tag"):
if tag["type"].lower() in ["toot:emoji", "emoji"]:
Expand All @@ -973,11 +976,14 @@ def fetch_actor(self) -> bool:
with transaction.atomic():
self.save()

# Fetch pinned posts after identity has been fetched and saved
# Fetch pinned posts in a followup task
if self.featured_collection_uri:
featured = self.fetch_pinned_post_uris(self.featured_collection_uri)
service = IdentityService(self)
service.sync_pins(featured)
InboxMessage.create_internal(
{
"type": "SyncPins",
"identity": self.pk,
}
)

return True

Expand Down
4 changes: 4 additions & 0 deletions users/models/inbox_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ def handle_received(cls, instance: "InboxMessage"):
IdentityService.handle_internal_add_follow(
instance.message["object"]
)
case "syncpins":
IdentityService.handle_internal_sync_pins(
instance.message["object"]
)
case unknown:
return cls.errored
case unknown:
Expand Down
28 changes: 26 additions & 2 deletions users/services/identity.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import logging

from django.core.exceptions import MultipleObjectsReturned
from django.db import models, transaction
from django.template.defaultfilters import linebreaks_filter

Expand Down Expand Up @@ -222,12 +225,14 @@ def sync_pins(self, object_uris):
post=post,
state__in=PostInteractionStates.group_active(),
)
except MultipleObjectsReturned as exc:
logging.exception("%s on %s", exc, object_uri)
pass
except Post.DoesNotExist:
# ignore 404s...
pass
except TryAgainLater:
# when fetching a post -> author -> post we can
# get into a state. Ignore this round.
# don't wait for it now, it'll be synced on next refresh
pass
for removed in PostInteraction.objects.filter(
type=PostInteraction.Types.pin,
Expand Down Expand Up @@ -319,3 +324,22 @@ def handle_internal_add_follow(cls, payload):
raise ValueError(f"Cannot find identity to follow: {target_identity}")
# Follow!
self.follow(target_identity=target_identity, boosts=payload.get("boosts", True))

@classmethod
def handle_internal_sync_pins(cls, payload):
"""
Handles an inbox message saying we need to sync featured posts

Message format:
{
"type": "SyncPins",
"identity": "90310938129083",
}
"""
# Retrieve ourselves
actor = Identity.objects.get(pk=payload["identity"])
self = cls(actor)
# Get the remote end (may need a fetch)
if actor.featured_collection_uri:
featured = actor.fetch_pinned_post_uris(actor.featured_collection_uri)
self.sync_pins(featured)
104 changes: 73 additions & 31 deletions users/views/activitypub.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import logging
from urllib.parse import urldefrag, urlparse

from django.conf import settings
from django.http import Http404, HttpResponse, HttpResponseBadRequest, JsonResponse
Expand All @@ -20,9 +21,9 @@
VerificationFormatError,
)
from core.views import StaticContentView
from stator.exceptions import TryAgainLater
from takahe import __version__
from users.models import Identity, InboxMessage, SystemActor
from users.models.domain import Domain
from users.shortcuts import by_handle_or_404


Expand Down Expand Up @@ -153,50 +154,91 @@ def post(self, request, handle=None):
# We don't have an Identity record for the user. No-op
return HttpResponse(status=202)

if not identity.public_key:
# See if we can fetch it right now
try:
identity.fetch_actor()
except TryAgainLater:
logging.warning(
f"Inbox error: timed out fetching actor {document['actor']}"
)
return HttpResponse(status=504)

if not identity.public_key:
logging.warning(f"Inbox error: cannot fetch actor {document['actor']}")
return HttpResponseBadRequest("Cannot retrieve actor")

# See if it's from a blocked user or domain
if identity.blocked or identity.domain.recursively_blocked():
# See if it's from a blocked user or domain - without calling
# fetch_actor, which would fetch data from potentially bad actor
domain = identity.domain
if not domain:
actor_url_parts = urlparse(document["actor"])
domain = Domain.get_remote_domain(actor_url_parts.hostname)
if identity.blocked or domain.recursively_blocked():
# I love to lie! Throw it away!
logging.warning(f"Inbox: Discarded message from {identity.actor_uri}")
logging.info(
"Inbox: Discarded message from blocked %s %s",
"domain" if domain.recursively_blocked() else "user",
identity.actor_uri,
)
return HttpResponse(status=202)

# If there's a "signature" payload, verify against that
if "signature" in document:
# authenticate HTTP signature first, if one is present and the actor
# is already known to us. An invalid signature is an error and message
# should be discarded. NOTE: for previously unknown actors, we
# don't have their public key yet!
if "signature" in request:
try:
LDSignature.verify_signature(document, identity.public_key)
if identity.public_key:
HttpSignature.verify_request(
request,
identity.public_key,
)
logging.debug(
"Inbox: %s from %s has good HTTP signature",
document["type"],
identity,
)
else:
logging.info(
"Inbox: New actor, no key available: %s",
document["actor"],
)
except VerificationFormatError as e:
logging.warning(f"Inbox error: Bad LD signature format: {e.args[0]}")
logging.warning("Inbox error: Bad HTTP signature format: %s", e.args[0])
return HttpResponseBadRequest(e.args[0])
except VerificationError:
logging.warning("Inbox error: Bad LD signature")
logging.warning("Inbox error: Bad HTTP signature from %s", identity)
return HttpResponseUnauthorized("Bad signature")

# Otherwise, verify against the header (assuming it's the same actor)
else:
# Mastodon advices not implementing LD Signatures, but
# they're widely deployed today. Validate it if one exists.
# https://docs.joinmastodon.org/spec/security/#ld
if "signature" in document:
try:
HttpSignature.verify_request(
request,
identity.public_key,
# signatures are identified by the signature block
creator = urldefrag(document["signature"]["creator"]).url
creator_identity = Identity.by_actor_uri(
creator, create=True, transient=True
)
if not creator_identity.public_key:
logging.info("Inbox: New actor, no key available: %s", creator)
# if we can't verify it, we don't keep it
document.pop("signature")
else:
LDSignature.verify_signature(document, creator_identity.public_key)
logging.debug(
"Inbox: %s from %s has good LD signature",
document["type"],
creator_identity,
)
except VerificationFormatError as e:
logging.warning(f"Inbox error: Bad HTTP signature format: {e.args[0]}")
logging.warning("Inbox error: Bad LD signature format: %s", e.args[0])
return HttpResponseBadRequest(e.args[0])
except VerificationError:
logging.warning("Inbox error: Bad HTTP signature")
return HttpResponseUnauthorized("Bad signature")
# An invalid LD Signature might also indicate nothing but
# a syntactical difference between implementations.
# Strip it out if we can't verify it.
if "signature" in document:
document.pop("signature")
logging.info(
"Inbox: Stripping invalid LD signature from %s %s",
creator_identity,
document["id"],
)

if not ("signature" in request or "signature" in document):
logging.debug(
"Inbox: %s from %s is unauthenticated. That's OK.",
document["type"],
identity,
)

# Don't allow injection of internal messages
if document["type"].startswith("__"):
Expand Down
Loading