Skip to content

Commit

Permalink
Add more log and optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
baegjae committed Feb 16, 2024
1 parent d2f626d commit ec4aea6
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Connection request handler."""
import logging

from .....connections.models.conn_record import ConnRecord
from .....messaging.base_handler import BaseHandler, BaseResponder, RequestContext
Expand All @@ -7,6 +8,8 @@
from ..messages.connection_request import ConnectionRequest
from ..messages.problem_report import ConnectionProblemReport

LOGGER = logging.getLogger(__name__)


class ConnectionRequestHandler(BaseHandler):
"""Handler class for connection requests."""
Expand All @@ -19,6 +22,7 @@ async def handle(self, context: RequestContext, responder: BaseResponder):
context: Request context
responder: Responder callback
"""
LOGGER.info("TEMP_LOG_0629 " + "start handle")

self._logger.debug(f"ConnectionRequestHandler called with context {context}")
assert isinstance(context.message, ConnectionRequest)
Expand All @@ -34,13 +38,17 @@ async def handle(self, context: RequestContext, responder: BaseResponder):
)
mediation_id = mediation_metadata.get(MediationManager.METADATA_ID)

LOGGER.info("TEMP_LOG_0629 " + "after connection_record.metadata_get")

try:
connection = await mgr.receive_request(
context.message,
context.message_receipt,
mediation_id=mediation_id,
)

LOGGER.info("TEMP_LOG_0629 " + "after mgr.receive_request")

if connection.accept == ConnRecord.ACCEPT_AUTO:
response = await mgr.create_response(
connection, mediation_id=mediation_id
Expand All @@ -50,6 +58,9 @@ async def handle(self, context: RequestContext, responder: BaseResponder):
)
else:
self._logger.debug("Connection request will await acceptance")

LOGGER.info("TEMP_LOG_0629 " + "after responder.send_reply")

except ConnectionManagerError as e:
self._logger.exception("Error receiving connection request")
if e.error_code:
Expand Down
60 changes: 40 additions & 20 deletions aries_cloudagent/protocols/connections/v1_0/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,8 @@ async def receive_request(
The new or updated `ConnRecord` instance
"""

LOGGER.info("TEMP_LOG_0629 " + "start receive_request")
ConnRecord.log_state(
"Receiving connection request",
{"request": request},
Expand Down Expand Up @@ -539,6 +541,8 @@ async def receive_request(
"a prior connection request may have updated the connection state"
)

LOGGER.info("TEMP_LOG_0629 " + "after get_local_did or retrieve_by_invitation_key")

invitation = None
if connection:
async with self.profile.session() as session:
Expand All @@ -550,6 +554,8 @@ async def receive_request(
settings=self.profile.settings,
)

LOGGER.info("TEMP_LOG_0629 " + "after retrieve_invitation")

if connection.is_multiuse_invitation:
async with self.profile.session() as session:
wallet = session.inject(BaseWallet)
Expand Down Expand Up @@ -594,6 +600,9 @@ async def receive_request(
keylist_updates = await mediation_mgr.remove_key(
connection_key, keylist_updates
)

LOGGER.info("TEMP_LOG_0629 " + "after is_multiuse_invitation")

conn_did_doc = request.connection.did_doc
if not conn_did_doc:
raise ConnectionManagerError(
Expand All @@ -606,6 +615,8 @@ async def receive_request(
)
await self.store_did_document(conn_did_doc)

LOGGER.info("TEMP_LOG_0629 " + "after store_did_document")

if connection:
connection.their_label = request.label
connection.their_did = request.connection.did
Expand All @@ -616,25 +627,28 @@ async def receive_request(
)
elif not self.profile.settings.get("public_invites"):
raise ConnectionManagerError("Public invitations are not enabled")
else: # request from public did
# SKT: reuse connection if exist
async with self.profile.session() as session:
tag_filter = {"their_did": request.connection.did}
conn_records = await ConnRecord.query(
session,
tag_filter=tag_filter,
)
if conn_records:
if len(conn_records) > 1:
self._logger.warning("delete duplicate connections for their_did %s", request.connection.did)
conn_del_records = conn_records[1:] # delete records except first
for conn_record in conn_del_records:
await conn_record.delete_record(session)
connection = conn_records[0] # use first record
connection.state = ConnRecord.State.REQUEST.rfc160
connection.invitation_msg_id = (request._thread and request._thread.pthid) or None
connection.their_label = request.label
await connection.save(session, reason="connection is reused")
# SKT: (temporal) do not reuse connection to avoid slow query
# else: # request from public did
# # SKT: reuse connection if exist
# async with self.profile.session() as session:
# tag_filter = {"their_did": request.connection.did}
# conn_records = await ConnRecord.query(
# session,
# tag_filter=tag_filter,
# )
# if conn_records:
# if len(conn_records) > 1:
# self._logger.warning("delete duplicate connections for their_did %s", request.connection.did)
# conn_del_records = conn_records[1:] # delete records except first
# for conn_record in conn_del_records:
# await conn_record.delete_record(session)
# connection = conn_records[0] # use first record
# connection.state = ConnRecord.State.REQUEST.rfc160
# connection.invitation_msg_id = (request._thread and request._thread.pthid) or None
# connection.their_label = request.label
# await connection.save(session, reason="connection is reused")

LOGGER.info("TEMP_LOG_0629 " + "after if connection")

if not connection: # request from public did and need new connection
async with self.profile.session() as session:
Expand Down Expand Up @@ -680,6 +694,8 @@ async def receive_request(
session, reason="Received connection request from public DID"
)

LOGGER.info("TEMP_LOG_0629 " + "after if not connection")

async with self.profile.session() as session:
# Attach the connection request so it can be found and responded to
await connection.attach_request(session, request)
Expand All @@ -696,6 +712,8 @@ async def receive_request(
oob_processor = self.profile.inject(OobMessageProcessor)
await oob_processor.clean_finished_oob_record(self.profile, request)

LOGGER.info("TEMP_LOG_0629 " + "end receive_request")

return connection

async def create_response(
Expand Down Expand Up @@ -1160,7 +1178,9 @@ async def resolve_inbound_connection(self, receipt: MessageReceipt) -> ConnRecor
if "connections/1.0/request" in receipt.raw_message:
LOGGER.info("TEMP_LOG_0629 " + "start resolve_inbound_connection")

if receipt.sender_verkey:
# SKT: (temporal) do not reuse connection to avoid slow query
# if receipt.sender_verkey:
if receipt.sender_verkey and "connections/1.0/request" not in receipt.raw_message:
try:
receipt.sender_did = await self.find_did_for_key(receipt.sender_verkey)
except StorageNotFoundError:
Expand Down

0 comments on commit ec4aea6

Please sign in to comment.