diff --git a/CHANGELOG.md b/CHANGELOG.md index 636792cb91..b01878ee78 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,6 @@ The main work (all changes without a GitHub username in brackets in the below li ## __WORK IN PROGRESS__ - @matter/node - - Enhancement: Matter protocol initialization now runs independently of and after behavior initialization, giving behaviors more flexibility in participating in protocol setup - Enhancement: Each new PASE session now automatically arms the failsafe timer for 60s as required by specs - Fix: Fixes withBehaviors() method on endpoints @@ -22,6 +21,7 @@ The main work (all changes without a GitHub username in brackets in the below li - @matter/protocol - Feature: Reworks Event server handling and optionally allow Non-Volatile event storage (currently mainly used in tests) - Enhancement: Adds a too-fast-resubmission guard for Unicast MDNS messages + - Enhancement: Optimized Logging for messages in various places - Fix: Corrects some Batch invoke checks and logic - Fix: Fixes MDNS discovery duration for retransmission cases to be 5s - Fix: Processes all TXT/SRV records in MDNS messages and optimized the processing diff --git a/packages/general/src/log/Diagnostic.ts b/packages/general/src/log/Diagnostic.ts index 22089631e7..e896e36cb8 100644 --- a/packages/general/src/log/Diagnostic.ts +++ b/packages/general/src/log/Diagnostic.ts @@ -60,6 +60,11 @@ export namespace Diagnostic { */ Weak = "weak", + /** + * A keylike diagnostic to list flags. The key gets suppressed and the value is rendered as a key. + */ + Flag = "flag", + /** * An error message diagnostic. */ @@ -179,6 +184,13 @@ export namespace Diagnostic { return Diagnostic(Diagnostic.Presentation.Weak, value); } + /** + * Create a value presented as key + */ + export function flag(value: string) { + return Diagnostic(Diagnostic.Presentation.Flag, value); + } + /** * Create a value identifying the source of a diagnostic event. */ @@ -237,11 +249,19 @@ export namespace Diagnostic { /** * Create a K/V map that presents with formatted keys. */ - export function dict(entries: object): Record & Diagnostic { - return { + export function dict(entries: object, suppressUndefinedValues = true): Record & Diagnostic { + const result: any = { ...entries, [presentation]: Diagnostic.Presentation.Dictionary, }; + if (suppressUndefinedValues) { + for (const key in result) { + if (result[key] === undefined) { + delete result[key]; + } + } + } + return result; } /** @@ -353,6 +373,23 @@ export namespace Diagnostic { export function hex(value: number | bigint) { return `0x${value.toString(16)}`; } + + /** + * Convert an object with keys to a flag list listing the truthy keys in a keylike/flag presentation. + */ + export function asFlags(flags: Record) { + return Diagnostic.flag(Diagnostic.toFlagString(flags)); + } + + /** + * Convert an object with keys to a space-separated list of truthy keys. + */ + export function toFlagString(flags: Record) { + return Object.entries(flags) + .filter(([, value]) => !!value) + .map(([key]) => key) + .join(" "); + } } function formatError(error: any, options: { messagePrefix?: string; parentStack?: string[] } = {}) { diff --git a/packages/general/src/log/LogFormat.ts b/packages/general/src/log/LogFormat.ts index 1f449b8954..40ed612417 100644 --- a/packages/general/src/log/LogFormat.ts +++ b/packages/general/src/log/LogFormat.ts @@ -8,6 +8,7 @@ import { ImplementationError, InternalError, MatterError } from "../MatterError. import { Bytes } from "../util/Bytes.js"; import { Lifecycle } from "../util/Lifecycle.js"; import { serialize } from "../util/String.js"; +import { isObject } from "../util/Type.js"; import { Diagnostic } from "./Diagnostic.js"; import { LogLevel } from "./LogLevel.js"; @@ -67,6 +68,7 @@ interface Formatter { indent(producer: DiagnosticProducer): string; break(): string; key(text: string): string; + keylike(text: string): string; value(producer: DiagnosticProducer): string; strong(producer: DiagnosticProducer): string; weak(producer: DiagnosticProducer): string; @@ -133,6 +135,7 @@ function formatPlain(diagnostic: unknown, indents = 0) { } ${message.facility} ${message.prefix}${formattedValues}`; }, key: text => creator.text(`${text}: `), + keylike: text => creator.text(`${text}`), value: producer => creator.text(producer()), strong: producer => creator.text(`*${producer()}*`), weak: producer => creator.text(producer()), @@ -270,6 +273,8 @@ function formatAnsi(diagnostic: unknown, indents = 0) { key: text => creator.text(style("key", `${text}: `)), + keylike: text => creator.text(style("key", `${text}`)), + value: producer => { styles.push("value"); const result = producer(); @@ -433,6 +438,7 @@ function formatHtml(diagnostic: unknown) { break: () => "
", indent: producer => htmlSpan("indent", producer()), key: text => htmlSpan("key", `${escape(text)}:`) + " ", + keylike: text => htmlSpan("key", `${escape(text)}`), value: producer => htmlSpan("value", producer()), strong: producer => `${producer()}`, weak: producer => htmlSpan("weak", producer()), @@ -523,8 +529,19 @@ function renderDictionary(value: object, formatter: Formatter) { if (parts.length) { parts.push(" "); } - parts.push(formatter.key(k)); - parts.push(formatter.value(() => renderDiagnostic(v, formatter))); + const suppressKey = isObject(v) && (v as Diagnostic)[Diagnostic.presentation] === Diagnostic.Presentation.Flag; + if (!suppressKey) { + parts.push(formatter.key(k)); + } + const formattedValue = formatter.value(() => renderDiagnostic(v, formatter)); + if (!suppressKey || formattedValue.length) { + parts.push(formattedValue); + } else { + // if flag but the value is empty we need to remove the last space if added above + if (parts.length && parts[parts.length - 1] === " ") { + parts.pop(); + } + } } return parts.join(""); @@ -598,6 +615,9 @@ function renderDiagnostic(value: unknown, formatter: Formatter): string { case Diagnostic.Presentation.Deleted: return formatter.deleted(() => renderDiagnostic(value, formatter)); + case Diagnostic.Presentation.Flag: + return (value as string).length ? formatter.keylike(value as string) : ""; + case Diagnostic.Presentation.Error: return formatter.error(() => renderDiagnostic(value, formatter)); diff --git a/packages/nodejs/test/IntegrationTest.ts b/packages/nodejs/test/IntegrationTest.ts index 1370a6d138..a39cc7e9c4 100644 --- a/packages/nodejs/test/IntegrationTest.ts +++ b/packages/nodejs/test/IntegrationTest.ts @@ -413,7 +413,7 @@ describe("Integration Test", () => { { timedRequestTimeoutMs: 1 }, ); await assert.rejects(async () => await promise, { - message: "(148) Received error status: 148", // Timeout expired + message: "(148) Received error status: 148 (InvokeResponse)", // Timeout expired }); }); @@ -431,7 +431,7 @@ describe("Integration Test", () => { ); const promise = onoffCluster.toggle(undefined, { timedRequestTimeoutMs: 1 }); await assert.rejects(async () => await promise, { - message: "(148) Received error status: 148", // Timeout expired + message: "(148) Received error status: 148 (InvokeResponse)", // Timeout expired }); }); diff --git a/packages/protocol/src/codec/MessageCodec.ts b/packages/protocol/src/codec/MessageCodec.ts index 4b5ead176b..38071d283d 100644 --- a/packages/protocol/src/codec/MessageCodec.ts +++ b/packages/protocol/src/codec/MessageCodec.ts @@ -5,7 +5,9 @@ */ import { Bytes, DataReader, DataWriter, Diagnostic, Endian, NotImplementedError, UnexpectedDataError } from "#general"; -import { GroupId, NodeId } from "#types"; +import { ExchangeLogContext } from "#protocol/index.js"; +import { GroupId, INTERACTION_PROTOCOL_ID, NodeId, SECURE_CHANNEL_PROTOCOL_ID, SecureMessageType } from "#types"; +import { MessageType } from "../interaction/InteractionMessenger.js"; export interface PacketHeader { sessionId: number; @@ -84,6 +86,22 @@ const enum SecurityFlag { HasMessageExtension = 0b00100000, } +function mapProtocolAndMessageType(protocolId: number, messageType: number): { type: string; for?: string } { + const msgTypeHex = Diagnostic.hex(messageType); + const type = `${Diagnostic.hex(protocolId)}/${msgTypeHex}`; + switch (protocolId) { + case SECURE_CHANNEL_PROTOCOL_ID: { + return { type, for: `SC/${SecureMessageType[messageType] ?? msgTypeHex}` }; + } + case INTERACTION_PROTOCOL_ID: { + return { type, for: `I/${MessageType[messageType] ?? msgTypeHex}` }; + } + // TODO Add BDX and UDC once we support it + default: + return { type }; + } +} + export class MessageCodec { static decodePacket(data: Uint8Array): DecodedPacket { const reader = new DataReader(data, Endian.Little); @@ -238,16 +256,30 @@ export class MessageCodec { payloadHeader: { exchangeId, messageType, protocolId, ackedMessageId, requiresAck }, payload, }: Message, - isDuplicate = false, + logContext?: ExchangeLogContext, ) { - return Diagnostic.dict({ - id: `${sessionId}/${exchangeId}/${messageId}`, - type: `${protocolId}/${messageType}`, - acked: ackedMessageId, - reqAck: requiresAck, - duplicate: isDuplicate, - payload: payload, - }); + const duplicate = !!logContext?.duplicate; + const forInfo = logContext?.for; + const log = { ...logContext }; + delete log.duplicate; + delete log.for; + const { type, for: forType } = mapProtocolAndMessageType(protocolId, messageType); + return Diagnostic.dict( + { + for: forInfo ?? forType, + ...log, + msgId: `${sessionId}/${exchangeId}/${messageId}`, + type, + acked: ackedMessageId, + msgFlags: Diagnostic.asFlags({ + reqAck: requiresAck, + dup: duplicate, + }), + size: payload.length ? payload.length : undefined, + payload: payload.length ? payload : undefined, + }, + true, + ); } private static encodePayloadHeader({ diff --git a/packages/protocol/src/interaction/InteractionClient.ts b/packages/protocol/src/interaction/InteractionClient.ts index 77439bbd61..13e8e06835 100644 --- a/packages/protocol/src/interaction/InteractionClient.ts +++ b/packages/protocol/src/interaction/InteractionClient.ts @@ -621,7 +621,6 @@ export class InteractionClient { updateReceived?.(); if (!Array.isArray(dataReport.attributeReports) || !dataReport.attributeReports.length) { - logger.debug(`Subscription result empty for subscription ID ${dataReport.subscriptionId}`); return; } @@ -715,7 +714,6 @@ export class InteractionClient { updateReceived?.(); if (!Array.isArray(dataReport.eventReports) || !dataReport.eventReports.length) { - logger.debug(`Subscription result empty for subscription ID ${dataReport.subscriptionId}`); return; } @@ -889,7 +887,6 @@ export class InteractionClient { (!Array.isArray(dataReport.attributeReports) || !dataReport.attributeReports.length) && (!Array.isArray(dataReport.eventReports) || !dataReport.eventReports.length) ) { - logger.debug(`Subscription result empty for subscription ID ${dataReport.subscriptionId}`); return; } const { attributeReports, eventReports } = dataReport; diff --git a/packages/protocol/src/interaction/InteractionMessenger.ts b/packages/protocol/src/interaction/InteractionMessenger.ts index b0494aee99..629aa552e6 100644 --- a/packages/protocol/src/interaction/InteractionMessenger.ts +++ b/packages/protocol/src/interaction/InteractionMessenger.ts @@ -4,7 +4,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { Logger, MatterFlowError, NoResponseTimeoutError, UnexpectedDataError } from "#general"; +import { Diagnostic, Logger, MatterFlowError, NoResponseTimeoutError, UnexpectedDataError } from "#general"; import { Specification } from "#model"; import { Status, @@ -85,22 +85,64 @@ class InteractionMessenger { return this.send( MessageType.StatusResponse, TlvStatusResponse.encode({ status, interactionModelRevision: Specification.INTERACTION_MODEL_REVISION }), - options, + { + ...options, + logContext: { + for: options?.logContext?.for ? `I/Status-${options?.logContext?.for}` : undefined, + status: `${StatusCode[status] ?? "unknown"}(${Diagnostic.hex(status)})`, + ...options?.logContext, + }, + }, ); } - async waitForSuccess(expectedProcessingTimeMs?: number) { + async waitForSuccess( + expectedMessageInfo: string, + options?: { expectedProcessingTimeMs?: number; timeoutMs?: number }, + ) { // If the status is not Success, this would throw an Error. - await this.nextMessage(MessageType.StatusResponse, expectedProcessingTimeMs); + await this.nextMessage(MessageType.StatusResponse, options, `Success-${expectedMessageInfo}`); + } + + async nextMessage( + expectedMessageType: number, + options?: { + expectedProcessingTimeMs?: number; + timeoutMs?: number; + }, + expectedMessageInfo?: string, + ) { + return this.#nextMessage(expectedMessageType, options, expectedMessageInfo); + } + + async anyNextMessage( + expectedMessageInfo: string, + options?: { + expectedProcessingTimeMs?: number; + timeoutMs?: number; + }, + ) { + return this.#nextMessage(undefined, options, expectedMessageInfo); } - async nextMessage(expectedMessageType?: number, expectedProcessingTimeMs?: number) { - const message = await this.exchange.nextMessage(expectedProcessingTimeMs); + async #nextMessage( + expectedMessageType?: number, + options?: { + expectedProcessingTimeMs?: number; + timeoutMs?: number; + }, + expectedMessageInfo?: string, + ) { + const { expectedProcessingTimeMs, timeoutMs } = options ?? {}; + const message = await this.exchange.nextMessage({ expectedProcessingTimeMs, timeoutMs }); const messageType = message.payloadHeader.messageType; - this.throwIfErrorStatusMessage(message); + if (expectedMessageType !== undefined && expectedMessageInfo === undefined) { + expectedMessageInfo = MessageType[expectedMessageType]; + } + this.throwIfErrorStatusMessage(message, expectedMessageInfo); if (expectedMessageType !== undefined && messageType !== expectedMessageType) { throw new UnexpectedDataError( - `Received unexpected message type: ${messageType}, expected: ${expectedMessageType}`, + `Received unexpected message for ${expectedMessageInfo} type: ${messageType}, expected: ${expectedMessageType}`, ); } return message; @@ -110,7 +152,7 @@ class InteractionMessenger { await this.exchange.close(); } - protected throwIfErrorStatusMessage(message: Message) { + protected throwIfErrorStatusMessage(message: Message, logHint?: string) { const { payloadHeader: { messageType }, payload, @@ -118,7 +160,8 @@ class InteractionMessenger { if (messageType !== MessageType.StatusResponse) return; const { status } = TlvStatusResponse.decode(payload); - if (status !== StatusCode.Success) throw new StatusResponseError(`Received error status: ${status}`, status); + if (status !== StatusCode.Success) + throw new StatusResponseError(`Received error status: ${status}${logHint ? ` (${logHint})` : ""}`, status); } getExchangeChannelName() { @@ -193,7 +236,9 @@ export class InteractionServerMessenger extends InteractionMessenger { case MessageType.TimedRequest: { const timedRequest = TlvTimedRequest.decode(message.payload); recipient.handleTimedRequest(this.exchange, timedRequest, message); - await this.sendStatus(StatusCode.Success); + await this.sendStatus(StatusCode.Success, { + logContext: { for: "TimedRequest" }, + }); continueExchange = true; break; } @@ -226,7 +271,7 @@ export class InteractionServerMessenger extends InteractionMessenger { * Handle DataReportPayload with the content of a DataReport to send, split them into multiple DataReport * messages and send them out based on the size. */ - async sendDataReport(dataReportPayload: DataReportPayload, forFabricFilteredRead: boolean) { + async sendDataReport(dataReportPayload: DataReportPayload, forFabricFilteredRead: boolean, waitForAck = true) { const { subscriptionId, attributeReportsPayload, @@ -255,7 +300,7 @@ export class InteractionServerMessenger extends InteractionMessenger { let firstAttributeAddedToReportMessage = false; let firstEventAddedToReportMessage = false; const sendAndResetReport = async () => { - await this.sendDataReportMessage(dataReport); + await this.sendDataReportMessage(dataReport, waitForAck); dataReport.attributeReports = undefined; dataReport.eventReports = undefined; messageSize = emptyDataReportBytes.length; @@ -321,10 +366,10 @@ export class InteractionServerMessenger extends InteractionMessenger { } } - await this.sendDataReportMessage(dataReport); + await this.sendDataReportMessage(dataReport, waitForAck); } - async sendDataReportMessage(dataReport: TypeFromSchema) { + async sendDataReportMessage(dataReport: TypeFromSchema, waitForAck = true) { const dataReportToSend = { ...dataReport, suppressResponse: dataReport.moreChunkedMessages ? false : dataReport.suppressResponse, // always false when moreChunkedMessages is true @@ -337,17 +382,24 @@ export class InteractionServerMessenger extends InteractionMessenger { )}`, ); } - logger.debug( - `Sending DataReport chunk with ${dataReportToSend.attributeReports?.length ?? 0} attributes and ${ - dataReportToSend.eventReports?.length ?? 0 - } events: ${encodedMessage.length} bytes (moreChunkedMessages: ${dataReportToSend.moreChunkedMessages ?? false}, suppressResponse: ${dataReportToSend.suppressResponse})`, - ); + + const logContext = { + subId: dataReportToSend.subscriptionId, + interactionFlags: Diagnostic.asFlags({ + suppressResponse: dataReportToSend.suppressResponse ?? false, + moreChunkedMessages: dataReportToSend.moreChunkedMessages ?? false, + }), + attr: dataReportToSend.attributeReports?.length, + ev: dataReportToSend.eventReports?.length, + }; if (dataReportToSend.suppressResponse) { // We do not expect a response other than a Standalone Ack, so if we receive anything else, we throw an error try { await this.exchange.send(MessageType.ReportData, encodedMessage, { expectAckOnly: true, + disableMrpLogic: !waitForAck, + logContext, }); } catch (e) { UnexpectedMessageError.accept(e); @@ -356,15 +408,19 @@ export class InteractionServerMessenger extends InteractionMessenger { this.throwIfErrorStatusMessage(receivedMessage); } } else { - await this.exchange.send(MessageType.ReportData, encodedMessage); - await this.waitForSuccess(); + await this.exchange.send(MessageType.ReportData, encodedMessage, { + disableMrpLogic: !waitForAck, + logContext, + }); + // We wait for a Success Message - when we don't request an Ack only wait 500ms + await this.waitForSuccess("DataReport", { timeoutMs: waitForAck ? undefined : 500 }); } } } export class IncomingInteractionClientMessenger extends InteractionMessenger { - async waitFor(messageType: number, timeoutMs?: number) { - const message = await this.nextMessage(timeoutMs); + async waitFor(expectedMessageInfo: string, messageType: number, timeoutMs?: number) { + const message = await this.anyNextMessage(expectedMessageInfo, { timeoutMs }); const { payloadHeader: { messageType: receivedMessageType }, } = message; @@ -389,11 +445,16 @@ export class IncomingInteractionClientMessenger extends InteractionMessenger { const eventValues: TypeFromSchema[] = []; while (true) { - const dataReportMessage = await this.waitFor(MessageType.ReportData); + const dataReportMessage = await this.waitFor("DataReport", MessageType.ReportData); const report = TlvDataReport.decode(dataReportMessage.payload); if (expectedSubscriptionIds !== undefined) { if (report.subscriptionId === undefined || !expectedSubscriptionIds.includes(report.subscriptionId)) { - await this.sendStatus(StatusCode.InvalidSubscription, { multipleMessageInteraction: true }); + await this.sendStatus(StatusCode.InvalidSubscription, { + multipleMessageInteraction: true, + logContext: { + subId: report.subscriptionId, + }, + }); throw new UnexpectedDataError( report.subscriptionId === undefined ? "Invalid Data report without Subscription ID" @@ -411,9 +472,15 @@ export class IncomingInteractionClientMessenger extends InteractionMessenger { throw new UnexpectedDataError(`Invalid subscription ID ${report.subscriptionId} received`); } - logger.debug( - `Received DataReport chunk with ${report.attributeReports?.length ?? 0} attributes and ${report.eventReports?.length ?? 0} events, suppressResponse: ${report.suppressResponse}, moreChunkedMessages: ${report.moreChunkedMessages}${report.subscriptionId !== undefined ? `, subscriptionId: ${report.subscriptionId}` : ""}`, - ); + const logContext = { + subId: report.subscriptionId, + dataReportFlags: Diagnostic.asFlags({ + suppressResponse: report.suppressResponse, + moreChunkedMessages: report.moreChunkedMessages, + }), + attr: report.attributeReports?.length, + ev: report.eventReports?.length, + }; if (Array.isArray(report.attributeReports) && report.attributeReports.length > 0) { attributeValues.push(...report.attributeReports); @@ -423,11 +490,17 @@ export class IncomingInteractionClientMessenger extends InteractionMessenger { } if (report.moreChunkedMessages) { - await this.sendStatus(StatusCode.Success, { multipleMessageInteraction: true }); + await this.sendStatus(StatusCode.Success, { + multipleMessageInteraction: true, + logContext, + }); } else if (!report.suppressResponse) { // We received the last message and need to send a final Success, but we do not need to wait for it and // also don't care if it fails - this.sendStatus(StatusCode.Success, { multipleMessageInteraction: true }).catch(error => + this.sendStatus(StatusCode.Success, { + multipleMessageInteraction: true, + logContext, + }).catch(error => logger.info("Error while sending final Success after receiving all DataReport chunks", error), ); } @@ -619,6 +692,11 @@ export class InteractionClientMessenger extends IncomingInteractionClientMesseng await this.send(requestMessageType, requestSchema.encode(request), { expectAckOnly: true, expectedProcessingTimeMs, + logContext: { + invokeFlags: Diagnostic.asFlags({ + suppressResponse: true, + }), + }, }); } @@ -634,7 +712,11 @@ export class InteractionClientMessenger extends IncomingInteractionClientMesseng expectAckOnly: false, expectedProcessingTimeMs, }); - const responseMessage = await this.nextMessage(responseMessageType, expectedProcessingTimeMs); + const responseMessage = await this.nextMessage( + responseMessageType, + { expectedProcessingTimeMs }, + MessageType[responseMessageType] ?? `Response-${Diagnostic.hex(responseMessageType)}`, + ); return responseSchema.decode(responseMessage.payload); } } diff --git a/packages/protocol/src/interaction/InteractionServer.ts b/packages/protocol/src/interaction/InteractionServer.ts index dfca915db8..8355b7d3e3 100644 --- a/packages/protocol/src/interaction/InteractionServer.ts +++ b/packages/protocol/src/interaction/InteractionServer.ts @@ -1072,7 +1072,11 @@ export class InteractionServer implements ProtocolHandler, InteractionRecipient await subscription.close(); // Cleanup if (error instanceof StatusResponseError) { logger.info(`Sending status response ${error.code} for interaction error: ${error.message}`); - await messenger.sendStatus(error.code); + await messenger.sendStatus(error.code, { + logContext: { + for: "I/SubscriptionSeed-Status", + }, + }); } await messenger.close(); return; // Make sure to not bubble up the exception @@ -1092,6 +1096,12 @@ export class InteractionServer implements ProtocolHandler, InteractionRecipient maxInterval, interactionModelRevision: Specification.INTERACTION_MODEL_REVISION, }), + { + logContext: { + subId: subscriptionId, + maxInterval, + }, + }, ); // When an error occurs while sending the response, the subscription is not yet active and will be cleaned up by GC @@ -1230,12 +1240,21 @@ export class InteractionServer implements ProtocolHandler, InteractionRecipient `Send ${lastMessageProcessed ? "final " : ""}invoke response for ${invokeResponseMessage.invokeResponses} commands`, ); } + const moreChunkedMessages = lastMessageProcessed ? undefined : true; await messenger.send( MessageType.InvokeResponse, TlvInvokeResponseForSend.encode({ ...invokeResponseMessage, - moreChunkedMessages: lastMessageProcessed ? undefined : true, + moreChunkedMessages, }), + { + logContext: { + invokeMsgFlags: Diagnostic.asFlags({ + suppressResponse, + moreChunkedMessages, + }), + }, + }, ); invokeResponseMessage.invokeResponses = []; messageSize = emptyInvokeResponseBytes.length; diff --git a/packages/protocol/src/interaction/ServerSubscription.ts b/packages/protocol/src/interaction/ServerSubscription.ts index 6314a5d32f..ee2632922d 100644 --- a/packages/protocol/src/interaction/ServerSubscription.ts +++ b/packages/protocol/src/interaction/ServerSubscription.ts @@ -896,6 +896,7 @@ export class ServerSubscription extends Subscription { interactionModelRevision: Specification.INTERACTION_MODEL_REVISION, }, this.criteria.isFabricFiltered, + !this.isClosed, // Do not wait for ack when closed ); } else { await messenger.sendDataReport( @@ -928,6 +929,7 @@ export class ServerSubscription extends Subscription { }), }, this.criteria.isFabricFiltered, + !this.isClosed, // Do not wait for ack when closed ); } } catch (error) { diff --git a/packages/protocol/src/mdns/MdnsScanner.ts b/packages/protocol/src/mdns/MdnsScanner.ts index bac0bd4019..094bd77d24 100644 --- a/packages/protocol/src/mdns/MdnsScanner.ts +++ b/packages/protocol/src/mdns/MdnsScanner.ts @@ -1206,27 +1206,30 @@ export class MdnsScanner implements Scanner { } static discoveryDataDiagnostics(data: DiscoveryData) { - return Diagnostic.dict({ - SII: data.SII, - SAI: data.SAI, - SAT: data.SAT, - T: data.T, - DT: data.DT, - PH: data.PH, - ICD: data.ICD, - VP: data.VP, - DN: data.DN, - RI: data.RI, - PI: data.PI, - }); + return Diagnostic.dict( + { + SII: data.SII, + SAI: data.SAI, + SAT: data.SAT, + T: data.T, + DT: data.DT, + PH: data.PH, + ICD: data.ICD, + VP: data.VP, + DN: data.DN, + RI: data.RI, + PI: data.PI, + }, + true, + ); } static deviceAddressDiagnostics(addresses: Map) { return Array.from(addresses.values()).map(address => Diagnostic.dict({ + type: address.type, ip: address.ip, port: address.port, - type: address.type, }), ); } diff --git a/packages/protocol/src/protocol/ExchangeManager.ts b/packages/protocol/src/protocol/ExchangeManager.ts index a927ea1c16..60be4b21ad 100644 --- a/packages/protocol/src/protocol/ExchangeManager.ts +++ b/packages/protocol/src/protocol/ExchangeManager.ts @@ -27,7 +27,7 @@ import { SecureSession } from "../session/SecureSession.js"; import { Session } from "../session/Session.js"; import { SessionManager, UNICAST_UNSECURE_SESSION_ID } from "../session/SessionManager.js"; import { ChannelManager } from "./ChannelManager.js"; -import { MessageExchange, MessageExchangeContext } from "./MessageExchange.js"; +import { ExchangeLogContext, MessageExchange, MessageExchangeContext } from "./MessageExchange.js"; import { DuplicateMessageError } from "./MessageReceptionState.js"; import { ProtocolHandler } from "./ProtocolHandler.js"; @@ -70,8 +70,8 @@ export class MessageChannel implements Channel { return this.channel.maxPayloadSize; } - send(message: Message): Promise { - logger.debug("Message »", MessageCodec.messageDiagnostics(message)); + send(message: Message, logContext?: ExchangeLogContext): Promise { + logger.debug("Message »", MessageCodec.messageDiagnostics(message, logContext)); const packet = this.session.encode(message); const bytes = MessageCodec.encodePacket(packet); if (bytes.length > this.maxPayloadSize) { diff --git a/packages/protocol/src/protocol/MessageExchange.ts b/packages/protocol/src/protocol/MessageExchange.ts index bc01808db4..506db825b9 100644 --- a/packages/protocol/src/protocol/MessageExchange.ts +++ b/packages/protocol/src/protocol/MessageExchange.ts @@ -42,6 +42,8 @@ export class UnexpectedMessageError extends MatterError { } } +export type ExchangeLogContext = Record; + export type ExchangeSendOptions = { /** * The response to this send should be an ack only and no StatusResponse or such. If a StatusResponse is returned @@ -66,6 +68,14 @@ export type ExchangeSendOptions = { /** Use the provided acknowledge MessageId instead checking the latest to send one */ includeAcknowledgeMessageId?: number; + + /** + * Disables the MRP logic which means that no retransmissions are done and receiving an ack is not awaited. + */ + disableMrpLogic?: boolean; + + /** Additional context information for logging to be included at the beginning of the Message log. */ + logContext?: ExchangeLogContext; }; /** @@ -214,14 +224,16 @@ export class MessageExchange { Diagnostic.dict({ channel: channel.name, protocol: this.#protocolId, - id: this.#exchangeId, - session: session.name, - peerSessionId: this.#peerSessionId, - "active threshold ms": this.#activeThresholdMs, - "active interval ms": this.#activeIntervalMs, - "idle interval ms": this.#idleIntervalMs, - maxTransmissions: this.#maxTransmissions, - useMrp: this.#useMRP, + exId: this.#exchangeId, + sess: session.name, + peerSess: this.#peerSessionId, + SAT: this.#activeThresholdMs, + SAI: this.#activeIntervalMs, + SII: this.#idleIntervalMs, + maxTrans: this.#maxTransmissions, + exchangeFlags: Diagnostic.asFlags({ + MRP: this.#useMRP, + }), }), ); } @@ -266,8 +278,8 @@ export class MessageExchange { await this.send(SecureMessageType.StandaloneAck, new Uint8Array(0), { includeAcknowledgeMessageId: messageId }); } - async onMessageReceived(message: Message, isDuplicate = false) { - logger.debug("Message «", MessageCodec.messageDiagnostics(message, isDuplicate)); + async onMessageReceived(message: Message, duplicate = false) { + logger.debug("Message «", MessageCodec.messageDiagnostics(message, { duplicate })); // Adjust the incoming message when ack was required but this exchange do not use it to skip all relevant logic if (message.payloadHeader.requiresAck && !this.#useMRP) { @@ -289,7 +301,7 @@ export class MessageExchange { this.session.notifyActivity(true); - if (isDuplicate) { + if (duplicate) { // Received a message retransmission but the reply is not ready yet, ignoring if (requiresAck) { await this.sendStandaloneAckForMessage(message); @@ -354,9 +366,11 @@ export class MessageExchange { const { expectAckOnly = false, + disableMrpLogic, expectedProcessingTimeMs = DEFAULT_EXPECTED_PROCESSING_TIME_MS, requiresAck, includeAcknowledgeMessageId, + logContext, } = options ?? {}; if (!this.#useMRP && includeAcknowledgeMessageId !== undefined) { throw new InternalError("Cannot include an acknowledge message ID when MRP is not used"); @@ -409,10 +423,10 @@ export class MessageExchange { }; let ackPromise: Promise | undefined; - if (this.#useMRP && message.payloadHeader.requiresAck) { + if (this.#useMRP && message.payloadHeader.requiresAck && !disableMrpLogic) { this.#sentMessageToAck = message; this.#retransmissionTimer = Time.getTimer( - "Message retransmission", + `Message retransmission ${message.packetHeader.messageId}`, this.#getResubmissionBackOffTime(0), () => this.#retransmitMessage(message, expectedProcessingTimeMs), ); @@ -422,7 +436,7 @@ export class MessageExchange { this.#sentMessageAckFailure = rejecter; } - await this.channel.send(message); + await this.channel.send(message, logContext); if (ackPromise !== undefined) { this.#retransmissionCounter = 0; @@ -441,30 +455,37 @@ export class MessageExchange { } } - nextMessage(expectedProcessingTimeMs?: number) { + nextMessage(options?: { expectedProcessingTimeMs?: number; timeoutMs?: number }) { let timeout: number; - switch (this.channel.type) { - case "tcp": - // TCP uses 30s timeout according to chip sdk implementation, so do the same - timeout = 30_000; - break; - case "udp": - // UDP normally uses MRP, if not we have Group communication which normally have no responses - if (!this.#useMRP) { - throw new MatterFlowError("No response expected for this message exchange because UDP and no MRP."); - } - timeout = this.calculateMaximumPeerResponseTime(expectedProcessingTimeMs); - break; - case "ble": - // chip sdk uses BTP_ACK_TIMEOUT_MS which is wrong in my eyes, so we use static 30s as like TCP here - timeout = 30_000; - break; - default: - throw new MatterFlowError( - `Can not calculate expected timeout for unknown channel type: ${this.channel.type}`, - ); + if (options?.timeoutMs !== undefined) { + timeout = options.timeoutMs; + } else { + switch (this.channel.type) { + case "tcp": + // TCP uses 30s timeout according to chip sdk implementation, so do the same + timeout = 30_000; + break; + case "udp": + // UDP normally uses MRP, if not we have Group communication which normally have no responses + if (!this.#useMRP) { + throw new MatterFlowError( + "No response expected for this message exchange because UDP and no MRP.", + ); + } + const { expectedProcessingTimeMs } = options ?? {}; + timeout = this.calculateMaximumPeerResponseTime(expectedProcessingTimeMs); + break; + case "ble": + // chip sdk uses BTP_ACK_TIMEOUT_MS which is wrong in my eyes, so we use static 30s as like TCP here + timeout = 30_000; + break; + default: + throw new MatterFlowError( + `Can not calculate expected timeout for unknown channel type: ${this.channel.type}`, + ); + } + timeout += PEER_RESPONSE_TIME_BUFFER_MS; } - timeout += PEER_RESPONSE_TIME_BUFFER_MS; return this.#messagesQueue.read(timeout); } @@ -517,10 +538,10 @@ export class MessageExchange { if (finalWaitTime > 0) { this.#retransmissionCounter--; // We will not resubmit the message again logger.debug( - `Wait additional ${finalWaitTime}ms for processing time and peer resubmissions after all our resubmissions`, + `Message ${message.packetHeader.messageId}: Wait additional ${finalWaitTime}ms for processing time and peer resubmissions after all our resubmissions`, ); this.#retransmissionTimer = Time.getTimer( - "Message wait time after resubmissions", + `Message wait time after resubmissions ${message.packetHeader.messageId}`, finalWaitTime, () => this.#retransmitMessage(message), ).start(); diff --git a/packages/protocol/src/securechannel/SecureChannelMessenger.ts b/packages/protocol/src/securechannel/SecureChannelMessenger.ts index f49ae868ff..41ced8ad11 100644 --- a/packages/protocol/src/securechannel/SecureChannelMessenger.ts +++ b/packages/protocol/src/securechannel/SecureChannelMessenger.ts @@ -4,7 +4,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { MatterError, UnexpectedDataError } from "#general"; +import { Diagnostic, MatterError, UnexpectedDataError } from "#general"; import { GeneralStatusCode, ProtocolStatusCode, @@ -43,17 +43,35 @@ export class SecureChannelMessenger { this.#defaultExpectedProcessingTimeMs = defaultExpectedProcessingTimeMs; } + async nextMessage( + expectedMessageType: number, + expectedProcessingTimeMs = this.#defaultExpectedProcessingTimeMs, + expectedMessageInfo?: string, + ) { + return this.#nextMessage(expectedMessageType, expectedProcessingTimeMs, expectedMessageInfo); + } + + async anyNextMessage( + expectedMessageInfo: string, + expectedProcessingTimeMs = this.#defaultExpectedProcessingTimeMs, + ) { + return this.#nextMessage(undefined, expectedProcessingTimeMs, expectedMessageInfo); + } + /** * Waits for the next message and returns it. * When no expectedProcessingTimeMs is provided, the default value of EXPECTED_CRYPTO_PROCESSING_TIME_MS is used. */ - async nextMessage( - expectedMessageInfo: string, + async #nextMessage( expectedMessageType?: number, expectedProcessingTimeMs = this.#defaultExpectedProcessingTimeMs, + expectedMessageInfo?: string, ) { - const message = await this.exchange.nextMessage(expectedProcessingTimeMs); + const message = await this.exchange.nextMessage({ expectedProcessingTimeMs }); const messageType = message.payloadHeader.messageType; + if (expectedMessageType !== undefined && expectedMessageInfo === undefined) { + expectedMessageInfo = SecureMessageType[expectedMessageType]; + } this.throwIfErrorStatusReport(message, expectedMessageInfo); if (expectedMessageType !== undefined && messageType !== expectedMessageType) throw new UnexpectedDataError( @@ -69,12 +87,9 @@ export class SecureChannelMessenger { async nextMessageDecoded( expectedMessageType: number, schema: TlvSchema, - expectedMessageInfo: string, expectedProcessingTimeMs = this.#defaultExpectedProcessingTimeMs, ) { - return schema.decode( - (await this.nextMessage(expectedMessageInfo, expectedMessageType, expectedProcessingTimeMs)).payload, - ); + return schema.decode((await this.nextMessage(expectedMessageType, expectedProcessingTimeMs)).payload); } /** @@ -86,7 +101,7 @@ export class SecureChannelMessenger { expectedProcessingTimeMs = this.#defaultExpectedProcessingTimeMs, ) { // If the status is not Success, this would throw an Error. - await this.nextMessage(expectedMessageInfo, SecureMessageType.StatusReport, expectedProcessingTimeMs); + await this.nextMessage(SecureMessageType.StatusReport, expectedProcessingTimeMs, expectedMessageInfo); } /** @@ -136,7 +151,13 @@ export class SecureChannelMessenger { protocolId: SECURE_CHANNEL_PROTOCOL_ID, protocolStatus, }), - { requiresAck }, + { + requiresAck, + logContext: { + generalStatus: GeneralStatusCode[generalStatus] ?? Diagnostic.hex(generalStatus), + protocolStatus: ProtocolStatusCode[protocolStatus] ?? Diagnostic.hex(protocolStatus), + }, + }, ); } diff --git a/packages/protocol/src/session/SecureSession.ts b/packages/protocol/src/session/SecureSession.ts index d70bb2be9c..4b929c5ed6 100644 --- a/packages/protocol/src/session/SecureSession.ts +++ b/packages/protocol/src/session/SecureSession.ts @@ -156,16 +156,23 @@ export class SecureSession extends Session { logger.debug( `Created secure ${this.isPase ? "PASE" : "CASE"} session for fabric index ${fabric?.fabricIndex}`, this.name, - Diagnostic.dict({ - idleIntervalMs: this.idleIntervalMs, - activeIntervalMs: this.activeIntervalMs, - activeThresholdMs: this.activeThresholdMs, - dataModelRevision: this.dataModelRevision, - interactionModelRevision: this.interactionModelRevision, - specificationVersion: this.specificationVersion, - maxPathsPerInvoke: this.maxPathsPerInvoke, - caseAuthenticatedTags: this.#caseAuthenticatedTags, - }), + this.parameterDiagnostics(), + ); + } + + parameterDiagnostics() { + return Diagnostic.dict( + { + SII: this.idleIntervalMs, + SAI: this.activeIntervalMs, + SAT: this.activeThresholdMs, + DMRev: this.dataModelRevision, + IMRev: this.interactionModelRevision, + spec: Diagnostic.hex(this.specificationVersion), + maxPaths: this.maxPathsPerInvoke, + CATs: this.#caseAuthenticatedTags, + }, + true, ); } diff --git a/packages/protocol/src/session/case/CaseClient.ts b/packages/protocol/src/session/case/CaseClient.ts index 98dade528b..319ae08537 100644 --- a/packages/protocol/src/session/case/CaseClient.ts +++ b/packages/protocol/src/session/case/CaseClient.ts @@ -4,7 +4,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { Bytes, Crypto, Diagnostic, Logger, PublicKey, UnexpectedDataError } from "#general"; +import { Bytes, Crypto, Logger, PublicKey, UnexpectedDataError } from "#general"; import { SessionManager } from "#session/SessionManager.js"; import { NodeId } from "#types"; import { TlvIntermediateCertificate, TlvOperationalCertificate } from "../../certificate/CertificateManager.js"; @@ -106,8 +106,8 @@ export class CaseClient { }); await messenger.sendSuccess(); logger.info( - `Case client: session resumed with ${messenger.getChannelName()} and parameters`, - Diagnostic.dict(secureSession.parameters), + `Case client: Session resumed with ${messenger.getChannelName()} and parameters`, + secureSession.parameterDiagnostics(), ); resumptionRecord.resumptionId = resumptionId; /* update resumptionId */ @@ -196,7 +196,7 @@ export class CaseClient { const encryptedData = TlvEncryptedDataSigma3.encode({ nodeOpCert, intermediateCACert, signature }); const encrypted = Crypto.encrypt(sigma3Key, encryptedData, TBE_DATA3_NONCE); const sigma3Bytes = await messenger.sendSigma3({ encrypted }); - await messenger.waitForSuccess("Success after CASE Sigma3"); + await messenger.waitForSuccess("Sigma3-Success"); // All good! Create secure session const secureSessionSalt = Bytes.concat( @@ -216,7 +216,7 @@ export class CaseClient { }); logger.info( `Case client: Paired successfully with ${messenger.getChannelName()} and parameters`, - Diagnostic.dict(secureSession.parameters), + secureSession.parameterDiagnostics(), ); resumptionRecord = { fabric, diff --git a/packages/protocol/src/session/case/CaseMessenger.ts b/packages/protocol/src/session/case/CaseMessenger.ts index 050af3df63..c7f5b3ba00 100644 --- a/packages/protocol/src/session/case/CaseMessenger.ts +++ b/packages/protocol/src/session/case/CaseMessenger.ts @@ -11,7 +11,7 @@ import { TlvCaseSigma1, TlvCaseSigma2, TlvCaseSigma2Resume, TlvCaseSigma3 } from export class CaseServerMessenger extends SecureChannelMessenger { async readSigma1() { - const { payload } = await this.nextMessage("CASE Sigma1", SecureMessageType.Sigma1); + const { payload } = await this.nextMessage(SecureMessageType.Sigma1); return { sigma1Bytes: payload, sigma1: TlvCaseSigma1.decode(payload) }; } @@ -24,7 +24,7 @@ export class CaseServerMessenger extends SecureChannelMessenger { } async readSigma3() { - const { payload } = await this.nextMessage("CASE Sigma3", SecureMessageType.Sigma3); + const { payload } = await this.nextMessage(SecureMessageType.Sigma3); return { sigma3Bytes: payload, sigma3: TlvCaseSigma3.decode(payload) }; } } @@ -38,7 +38,7 @@ export class CaseClientMessenger extends SecureChannelMessenger { const { payload, payloadHeader: { messageType }, - } = await this.nextMessage("CASE Sigma2 or Sigma2Resume"); + } = await this.anyNextMessage("Sigma2(Resume)"); switch (messageType) { case SecureMessageType.Sigma2: return { sigma2Bytes: payload, sigma2: TlvCaseSigma2.decode(payload) }; diff --git a/packages/protocol/src/session/case/CaseServer.ts b/packages/protocol/src/session/case/CaseServer.ts index 41998aba03..0400ff2afc 100644 --- a/packages/protocol/src/session/case/CaseServer.ts +++ b/packages/protocol/src/session/case/CaseServer.ts @@ -130,7 +130,7 @@ export class CaseServer implements ProtocolHandler { } logger.info( - `session ${secureSession.id} resumed with ${messenger.getChannelName()} for Fabric ${NodeId.toHexString( + `Session ${secureSession.id} resumed with ${messenger.getChannelName()} for Fabric ${NodeId.toHexString( fabric.nodeId, )}(index ${fabric.fabricIndex}) and PeerNode ${NodeId.toHexString(peerNodeId)}`, "with CATs", @@ -139,7 +139,7 @@ export class CaseServer implements ProtocolHandler { resumptionRecord.resumptionId = resumptionId; /* Update the ID */ // Wait for success on the peer side - await messenger.waitForSuccess("Success after CASE Sigma2Resume"); + await messenger.waitForSuccess("Sigma2Resume-Success"); await messenger.close(); await this.#sessions.saveResumptionRecord(resumptionRecord); diff --git a/packages/protocol/src/session/pase/PaseClient.ts b/packages/protocol/src/session/pase/PaseClient.ts index 996ebfb6ff..624e3cb89e 100644 --- a/packages/protocol/src/session/pase/PaseClient.ts +++ b/packages/protocol/src/session/pase/PaseClient.ts @@ -80,7 +80,7 @@ export class PaseClient { await messenger.sendPasePake3({ verifier: hAY }); // All good! Creating the secure session - await messenger.waitForSuccess("Success after PASE Pake3"); + await messenger.waitForSuccess("PasePake3-Success"); const secureSession = await this.#sessions.createSecureSession({ sessionId: initiatorSessionId, fabric: undefined, diff --git a/packages/protocol/src/session/pase/PaseMessenger.ts b/packages/protocol/src/session/pase/PaseMessenger.ts index ca170e096a..99d23bd8c8 100644 --- a/packages/protocol/src/session/pase/PaseMessenger.ts +++ b/packages/protocol/src/session/pase/PaseMessenger.ts @@ -30,7 +30,6 @@ type PasePake3 = TypeFromSchema; export class PaseServerMessenger extends SecureChannelMessenger { async readPbkdfParamRequest() { const { payload } = await this.nextMessage( - "PASE PbkdfParamRequest", SecureMessageType.PbkdfParamRequest, DEFAULT_NORMAL_PROCESSING_TIME_MS, ); @@ -44,7 +43,7 @@ export class PaseServerMessenger extends SecureChannelMessenger { } readPasePake1() { - return this.nextMessageDecoded(SecureMessageType.PasePake1, TlvPasePake1, "PASE Pake1"); + return this.nextMessageDecoded(SecureMessageType.PasePake1, TlvPasePake1); } sendPasePake2(pasePake2: PasePake2) { @@ -52,7 +51,7 @@ export class PaseServerMessenger extends SecureChannelMessenger { } readPasePake3() { - return this.nextMessageDecoded(SecureMessageType.PasePake3, TlvPasePake3, "PASE Pake3"); + return this.nextMessageDecoded(SecureMessageType.PasePake3, TlvPasePake3); } } @@ -65,7 +64,6 @@ export class PaseClientMessenger extends SecureChannelMessenger { async readPbkdfParamResponse() { const { payload } = await this.nextMessage( - "PASE PbkdfParamResponse", SecureMessageType.PbkdfParamResponse, DEFAULT_NORMAL_PROCESSING_TIME_MS, ); @@ -77,7 +75,7 @@ export class PaseClientMessenger extends SecureChannelMessenger { } readPasePake2() { - return this.nextMessageDecoded(SecureMessageType.PasePake2, TlvPasePake2, "PASE Pake2"); + return this.nextMessageDecoded(SecureMessageType.PasePake2, TlvPasePake2); } sendPasePake3(pasePake3: PasePake3) { diff --git a/packages/react-native/src/net/UdpChannelReactNative.ts b/packages/react-native/src/net/UdpChannelReactNative.ts index 3b3eff29a6..d5f9a662f9 100644 --- a/packages/react-native/src/net/UdpChannelReactNative.ts +++ b/packages/react-native/src/net/UdpChannelReactNative.ts @@ -122,9 +122,9 @@ export class UdpChannelReactNative implements UdpChannel { logger.debug( "Initialize multicast", Diagnostic.dict({ + type: type, address: `${multicastInterface}:${listeningPort}`, interface: netInterface, - type: type, }), ); socket.setMulticastInterface(multicastInterface);