-
Notifications
You must be signed in to change notification settings - Fork 23
/
mqttpacket.h
196 lines (170 loc) · 7.01 KB
/
mqttpacket.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
/*
This file is part of FlashMQ (https://www.flashmq.org)
Copyright (C) 2021-2023 Wiebe Cazemier
FlashMQ is free software: you can redistribute it and/or modify
it under the terms of The Open Software License 3.0 (OSL-3.0).
See LICENSE for license details.
*/
#ifndef MQTTPACKET_H
#define MQTTPACKET_H
#include <unistd.h>
#include <memory>
#include <vector>
#include <exception>
#include "forward_declarations.h"
#include "types.h"
#include "cirbuf.h"
#include "logger.h"
#include "variablebyteint.h"
#include "mqtt5properties.h"
#include "packetdatatypes.h"
enum class HandleResult
{
Done,
Defer
};
/**
* @brief The MqttPacket class represents incoming and outgoing packets.
*
* Be sure to understand the 'externallyReceived' member. See in-code documentation.
*/
class MqttPacket
{
#ifdef TESTING
friend class MainTests;
#endif
std::vector<char> bites;
Publish publishData;
size_t fixed_header_length = 0; // if 0, this packet does not contain the bytes of the fixed header.
VariableByteInt remainingLength;
char first_byte = 0;
size_t pos = 0;
size_t packet_id_pos = 0;
uint16_t packet_id = 0;
ProtocolVersion protocolVersion = ProtocolVersion::None;
size_t payloadStart = 0;
size_t payloadLen = 0;
bool dontReuseBites = false;
// It's important to understand that this class is used for incoming packets as well as new outgoing packets. When we create
// new outgoing packets, we generally know exactly who it's for and the information is only stored in this->bites. So, the
// publishData and fields like hasTopicAlias are invalid in those cases.
bool externallyReceived = false;
Logger *logger = Logger::getInstance();
void advancePos(size_t len);
char *readBytes(size_t length);
char readByte();
uint8_t readUint8();
void writeByte(char b);
void writeUint16(uint16_t x);
void writeBytes(const char *b, size_t len);
template<typename T>
void writeProperties(T properties)
{
if (!properties)
{
writeByte(0);
return;
}
writeProperties(*properties);
}
void writeProperties(Mqtt5PropertyBuilder &properties);
void writeVariableByteInt(const VariableByteInt &v);
void writeString(const std::string &s);
void writeString(std::string_view s);
uint16_t readTwoBytesToUInt16();
uint32_t readFourBytesToUint32();
size_t remainingAfterPos();
size_t decodeVariableByteIntAtPos();
std::string readBytesToString(bool validateUtf8 = true, bool alsoCheckInvalidPublishChars = false);
void calculateRemainingLength();
void setPosToDataStart();
bool atEnd() const;
#ifndef TESTING
// In production, I want to be sure I don't accidentally copy packets, because it's slow.
MqttPacket(const MqttPacket &other) = delete;
#endif
public:
#ifdef TESTING
// In testing I need to copy packets for administrative purposes.
MqttPacket(const MqttPacket &other) = default;
#endif
PacketType packetType = PacketType::Reserved;
MqttPacket(CirBuf &buf, size_t packet_len, size_t fixed_header_length, std::shared_ptr<Client> &sender); // Constructor for parsing incoming packets.
MqttPacket(MqttPacket &&other) = default;
// Constructor for outgoing packets. These may not allocate room for the fixed header, because we don't (always) know the length in advance.
MqttPacket(const ConnAck &connAck);
MqttPacket(const SubAck &subAck);
MqttPacket(const UnsubAck &unsubAck);
MqttPacket(const ProtocolVersion protocolVersion, const Publish &_publish);
MqttPacket(const ProtocolVersion protocolVersion, const Publish &_publish, const uint8_t _qos, const uint16_t _topic_alias,
const bool _skip_topic, const uint32_t subscriptionIdentifier);
MqttPacket(const PubResponse &pubAck);
MqttPacket(const Disconnect &disconnect);
MqttPacket(const Auth &auth);
MqttPacket(const Connect &connect);
MqttPacket(const Subscribe &subscribe);
MqttPacket(const Unsubscribe &unsubscribe);
static void bufferToMqttPackets(CirBuf &buf, std::vector<MqttPacket> &packetQueueIn, std::shared_ptr<Client> &sender);
HandleResult handle(std::shared_ptr<Client> &sender);
AuthPacketData parseAuthData();
ConnectData parseConnectData(std::shared_ptr<Client> &sender);
ConnAckData parseConnAckData();
void handleConnect(std::shared_ptr<Client> &sender);
void handleConnAck(std::shared_ptr<Client> &sender);
void handleExtendedAuth(std::shared_ptr<Client> &sender);
DisconnectData parseDisconnectData();
void handleDisconnect(std::shared_ptr<Client> &sender);
void handleSubscribe(std::shared_ptr<Client> &sender);
void handleSubAck(std::shared_ptr<Client> &sender);
void handleUnsubscribe(std::shared_ptr<Client> &sender);
void handlePing(std::shared_ptr<Client> &sender);
void parsePublishData(std::shared_ptr<Client> &sender);
void handlePublish(std::shared_ptr<Client> &sender);
void parsePubAckData();
void handlePubAck(std::shared_ptr<Client> &sender);
PubRecData parsePubRecData();
void handlePubRec(std::shared_ptr<Client> &sender);
void parsePubRelData();
void handlePubRel(std::shared_ptr<Client> &sender);
void parsePubComp();
void handlePubComp(std::shared_ptr<Client> &sender);
SubAckData parseSubAckData();
uint8_t getFixedHeaderLength() const;
size_t getSizeIncludingNonPresentHeader() const;
const std::vector<char> &getBites() const { return bites; }
uint8_t getQos() const { return publishData.qos; }
void setQos(const uint8_t new_qos);
ProtocolVersion getProtocolVersion() const { return protocolVersion;}
const std::string &getTopic() const;
const std::vector<std::string> &getSubtopics();
bool containsFixedHeader() const;
void setPacketId(uint16_t packet_id);
uint16_t getPacketId() const;
void setDuplicate();
void readIntoBuf(CirBuf &buf) const;
std::string getPayloadCopy() const;
std::string_view getPayloadView() const;
bool getRetain() const;
void setRetain(bool val);
const Publish &getPublishData();
bool biteArrayCannotBeReused() const;
std::vector<std::pair<std::string, std::string>> *getUserProperties() const;
const std::optional<std::string> &getCorrelationData() const;
const std::optional<std::string> &getResponseTopic() const;
};
struct SubscriptionTuple
{
const std::string topic;
const std::vector<std::string> subtopics;
const uint8_t qos;
const bool noLocal;
const bool retainAsPublished;
const std::string shareName;
const AuthResult authResult;
const uint32_t subscriptionIdentifier = 0;
const RetainHandling retainHandling = RetainHandling::SendRetainedMessagesAtSubscribe;
SubscriptionTuple(const std::string &topic, const std::vector<std::string> &subtopics, uint8_t qos, bool noLocal, bool retainAsPublished,
const std::string &shareName, const AuthResult authResult, const uint32_t subscriptionIdentifier,
const RetainHandling retainHandling);
};
#endif // MQTTPACKET_H