Skip to content

Commit

Permalink
Add opaque struct and functions to serialize it.
Browse files Browse the repository at this point in the history
  • Loading branch information
AniruddhaKanhere committed Oct 24, 2024
1 parent 2eda149 commit a643f7f
Show file tree
Hide file tree
Showing 5 changed files with 377 additions and 268 deletions.
2 changes: 2 additions & 0 deletions .github/.cSpellWords.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ DLIBRARY
DNDEBUG
DUNITTEST
DUNITY
getbytesinmqttvec
getpacketid
isystem
lcov
Expand All @@ -34,6 +35,7 @@ NONDET
pylint
pytest
pyyaml
serializemqttvec
sinclude
UNACKED
unpadded
Expand Down
105 changes: 49 additions & 56 deletions source/core_mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@

struct MQTTVec
{
TransportOutVector_t pVector; /**< Pointer to transport vector. USER SHOULD NOT ACCESS THIS DIRECTLY - IT IS AN INTERNAL DETAIL AND CAN CHANGE. */
TransportOutVector_t pVector; /**< Pointer to transport vector. USER SHOULD NOT ACCESS THIS DIRECTLY - IT IS AN INTERNAL DETAIL AND CAN CHANGE. */
};

/*-----------------------------------------------------------*/
Expand Down Expand Up @@ -450,8 +450,7 @@ static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext );
*
* @param[in] pContext Initialized MQTT context.
*
* @return #MQTTPublishClearAllFailed if clearing all the copied publishes fails;
* #MQTTSuccess otherwise.
* @return #MQTTSuccess always otherwise.
*/
static MQTTStatus_t handleCleanSession( MQTTContext_t * pContext );

Expand Down Expand Up @@ -1608,10 +1607,9 @@ static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext,
if( ( ackType == MQTTPuback ) || ( ackType == MQTTPubrec ) )
{
if( ( status == MQTTSuccess ) &&
( pContext->clearFunction != NULL ) &&
( pContext->clearFunction( pContext, packetIdentifier ) != true ) )
( pContext->clearFunction != NULL ) )
{
LogWarn( ( "Failed to clear copied publish on receiving an ack.\n" ) );
pContext->clearFunction( pContext, packetIdentifier );
}
}

Expand Down Expand Up @@ -2222,10 +2220,14 @@ static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext,

/* store a copy of the publish for retransmission purposes */
if( ( pPublishInfo->qos > MQTTQoS0 ) &&
( pContext->storeFunction != NULL ) &&
( pContext->storeFunction( pContext, packetId, pIoVector, ioVectorLength ) != true ) )
( pContext->storeFunction != NULL ) )
{
status = MQTTPublishStoreFailed;
MQTTVec_t * pMqttVec = ( MQTTVec_t * ) pIoVector;

if( pContext->storeFunction( pContext, packetId, pMqttVec, ioVectorLength ) != true )
{
status = MQTTPublishStoreFailed;
}
}

/* change the value of the dup flag to its original, if it was changed */
Expand Down Expand Up @@ -2524,9 +2526,8 @@ static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext )
MQTTStateCursor_t cursor = MQTT_STATE_CURSOR_INITIALIZER;
uint16_t packetId = MQTT_PACKET_ID_INVALID;
MQTTPublishState_t state = MQTTStateNull;
TransportOutVector_t * pIoVec, * pIoVectIterator;
size_t ioVecCount;
size_t totalMessageLength;
uint8_t * pMqttPacket;

assert( pContext != NULL );

Expand All @@ -2547,42 +2548,31 @@ static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext )
{
cursor = MQTT_STATE_CURSOR_INITIALIZER;

packetId = MQTT_PublishToResend( pContext, &cursor );

if( ( packetId != MQTT_PACKET_ID_INVALID ) &&
( pContext->retrieveFunction( pContext, packetId, &pIoVec, &ioVecCount ) != true ) )
{
status = MQTTPublishRetrieveFailed;
}

/* Resend all the PUBLISH for which PUBACK/PUBREC is not received
* after session is reestablished. */
while( ( packetId != MQTT_PACKET_ID_INVALID ) &&
( status == MQTTSuccess ) )
do
{
totalMessageLength = 0;

for( pIoVectIterator = pIoVec; pIoVectIterator <= &( pIoVec[ ioVecCount - 1U ] ); pIoVectIterator++ )
{
totalMessageLength += pIoVectIterator->iov_len;
}

MQTT_PRE_STATE_UPDATE_HOOK( pContext );
packetId = MQTT_PublishToResend( pContext, &cursor );

if( sendMessageVector( pContext, pIoVec, ioVecCount ) != ( int32_t ) totalMessageLength )
if( packetId != MQTT_PACKET_ID_INVALID )
{
status = MQTTSendFailed;
}
if( pContext->retrieveFunction( pContext, packetId, &pMqttPacket, &totalMessageLength ) != true )
{
status = MQTTPublishRetrieveFailed;
break;
}

MQTT_POST_STATE_UPDATE_HOOK( pContext );
MQTT_PRE_STATE_UPDATE_HOOK( pContext );

packetId = MQTT_PublishToResend( pContext, &cursor );
if( sendBuffer( pContext, pMqttPacket, totalMessageLength ) != ( int32_t ) totalMessageLength )
{
status = MQTTSendFailed;
}

if( pContext->retrieveFunction( pContext, packetId, &pIoVec, &ioVecCount ) != true )
{
status = MQTTPublishRetrieveFailed;
MQTT_POST_STATE_UPDATE_HOOK( pContext );
}
}
} while( ( packetId != MQTT_PACKET_ID_INVALID ) &&
( status == MQTTSuccess ) );
}

return status;
Expand All @@ -2591,13 +2581,33 @@ static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext )
static MQTTStatus_t handleCleanSession( MQTTContext_t * pContext )
{
MQTTStatus_t status = MQTTSuccess;
MQTTStateCursor_t cursor = MQTT_STATE_CURSOR_INITIALIZER;
uint16_t packetId = MQTT_PACKET_ID_INVALID;

assert( pContext != NULL );

/* Reset the index and clear the buffer when a new session is established. */
pContext->index = 0;
( void ) memset( pContext->networkBuffer.pBuffer, 0, pContext->networkBuffer.size );

if( pContext->clearFunction != NULL )
{
cursor = MQTT_STATE_CURSOR_INITIALIZER;

/* Resend all the PUBLISH for which PUBACK/PUBREC is not received
* after session is reestablished. */
do
{
packetId = MQTT_PublishToResend( pContext, &cursor );

if( packetId != MQTT_PACKET_ID_INVALID )
{
pContext->clearFunction( pContext, packetId );
}
} while( ( packetId != MQTT_PACKET_ID_INVALID ) &&
( status == MQTTSuccess ) );
}

if( pContext->outgoingPublishRecordMaxCount > 0U )
{
/* Clear any existing records if a new session is established. */
Expand All @@ -2613,12 +2623,6 @@ static MQTTStatus_t handleCleanSession( MQTTContext_t * pContext )
pContext->incomingPublishRecordMaxCount * sizeof( *pContext->incomingPublishRecords ) );
}

if( ( pContext->clearAllFunction != NULL ) &&
( pContext->clearAllFunction( pContext ) != true ) )
{
status = MQTTPublishClearAllFailed;
}

return status;
}

Expand Down Expand Up @@ -2787,8 +2791,7 @@ MQTTStatus_t MQTT_InitStatefulQoS( MQTTContext_t * pContext,
MQTTStatus_t MQTT_InitRetransmits( MQTTContext_t * pContext,
MQTTStorePacketForRetransmit storeFunction,
MQTTRetrievePacketForRetransmit retrieveFunction,
MQTTClearPacketForRetransmit clearFunction,
MQTTClearAllPacketsForRetransmit clearAllFunction )
MQTTClearPacketForRetransmit clearFunction )
{
MQTTStatus_t status = MQTTSuccess;

Expand All @@ -2813,17 +2816,11 @@ MQTTStatus_t MQTT_InitRetransmits( MQTTContext_t * pContext,
LogError( ( "Invalid parameter: clearFunction is NULL" ) );
status = MQTTBadParameter;
}
else if( clearAllFunction == NULL )
{
LogError( ( "Invalid parameter: clearAllFunction is NULL" ) );
status = MQTTBadParameter;
}
else
{
pContext->storeFunction = storeFunction;
pContext->retrieveFunction = retrieveFunction;
pContext->clearFunction = clearFunction;
pContext->clearAllFunction = clearAllFunction;
}

return status;
Expand Down Expand Up @@ -3718,10 +3715,6 @@ const char * MQTT_Status_strerror( MQTTStatus_t status )
str = "MQTTPublishRetrieveFailed";
break;

case MQTTPublishClearAllFailed:
str = "MQTTPublishClearAllFailed";
break;

default:
str = "Invalid MQTT Status code";
break;
Expand Down
Loading

0 comments on commit a643f7f

Please sign in to comment.