Skip to content

Add message filtering ability #157

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions examples/FullyFeatured-ESP32/FullyFeatured-ESP32.ino
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,16 @@ void onMqttConnect(bool sessionPresent) {
uint16_t packetIdPub2 = mqttClient.publish("test/lol", 2, true, "test 3");
Serial.print("Publishing at QoS 2, packetId: ");
Serial.println(packetIdPub2);

packetIdSub = mqttClient.subscribe("Topic3/Example/Test", 2, onSpecificMqttMessage);
Serial.print("Subscribing at QoS 2 with specified callback, packetId: ");
Serial.println(packetIdSub);
mqttClient.publish("Topic1/Example/Test", 0, true, "Test");
Serial.println("Publishing \"Test\" to \"Topic1/Example/Test\" at QoS 0");
mqttClient.publish("Topic2/Example/Test", 0, true, "Test");
Serial.println("Publishing \"Test\" to \"Topic2/Example/Test\" at QoS 0");
mqttClient.publish("Topic3/Example/Test", 0, true, "Test");
Serial.println("Publishing \"Test\" to \"Topic3/Example/Test\" at QoS 0");
}

void onMqttDisconnect(AsyncMqttClientDisconnectReason reason) {
Expand Down Expand Up @@ -104,6 +114,23 @@ void onMqttMessage(char* topic, char* payload, AsyncMqttClientMessageProperties
Serial.println(total);
}

void onSpecificMqttMessage(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
Serial.print("Publish received for specific topic: ");
Serial.println(topic);
Serial.print(" qos: ");
Serial.println(properties.qos);
Serial.print(" dup: ");
Serial.println(properties.dup);
Serial.print(" retain: ");
Serial.println(properties.retain);
Serial.print(" len: ");
Serial.println(len);
Serial.print(" index: ");
Serial.println(index);
Serial.print(" total: ");
Serial.println(total);
}

void onMqttPublish(uint16_t packetId) {
Serial.println("Publish acknowledged.");
Serial.print(" packetId: ");
Expand All @@ -125,11 +152,12 @@ void setup() {
mqttClient.onSubscribe(onMqttSubscribe);
mqttClient.onUnsubscribe(onMqttUnsubscribe);
mqttClient.onMessage(onMqttMessage);
mqttClient.onMessage(onSpecificMqttMessage, "Topic1/#"); // Optional - Overloaded example
mqttClient.onFilteredMessage(onSpecificMqttMessage, "Topic2/+/Test"); // Optional - Method for verbose reading
mqttClient.onPublish(onMqttPublish);
mqttClient.setServer(MQTT_HOST, MQTT_PORT);

connectToWifi();
}

void loop() {
}
void loop() {}
33 changes: 31 additions & 2 deletions examples/FullyFeatured-ESP8266/FullyFeatured-ESP8266.ino
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ void onMqttConnect(bool sessionPresent) {
Serial.println("Connected to MQTT.");
Serial.print("Session present: ");
Serial.println(sessionPresent);

uint16_t packetIdSub = mqttClient.subscribe("test/lol", 2);
Serial.print("Subscribing at QoS 2, packetId: ");
Serial.println(packetIdSub);
Expand All @@ -51,6 +52,16 @@ void onMqttConnect(bool sessionPresent) {
uint16_t packetIdPub2 = mqttClient.publish("test/lol", 2, true, "test 3");
Serial.print("Publishing at QoS 2, packetId: ");
Serial.println(packetIdPub2);

packetIdSub = mqttClient.subscribe("Topic3/Example/Test", 2, onSpecificMqttMessage);
Serial.print("Subscribing at QoS 2 with specified callback, packetId: ");
Serial.println(packetIdSub);
mqttClient.publish("Topic1/Example/Test", 0, true, "Test");
Serial.println("Publishing \"Test\" to \"Topic1/Example/Test\" at QoS 0");
mqttClient.publish("Topic2/Example/Test", 0, true, "Test");
Serial.println("Publishing \"Test\" to \"Topic2/Example/Test\" at QoS 0");
mqttClient.publish("Topic3/Example/Test", 0, true, "Test");
Serial.println("Publishing \"Test\" to \"Topic3/Example/Test\" at QoS 0");
}

void onMqttDisconnect(AsyncMqttClientDisconnectReason reason) {
Expand Down Expand Up @@ -93,6 +104,23 @@ void onMqttMessage(char* topic, char* payload, AsyncMqttClientMessageProperties
Serial.println(total);
}

void onSpecificMqttMessage(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
Serial.print("Publish received for specific topic: ");
Serial.println(topic);
Serial.print(" qos: ");
Serial.println(properties.qos);
Serial.print(" dup: ");
Serial.println(properties.dup);
Serial.print(" retain: ");
Serial.println(properties.retain);
Serial.print(" len: ");
Serial.println(len);
Serial.print(" index: ");
Serial.println(index);
Serial.print(" total: ");
Serial.println(total);
}

void onMqttPublish(uint16_t packetId) {
Serial.println("Publish acknowledged.");
Serial.print(" packetId: ");
Expand All @@ -112,11 +140,12 @@ void setup() {
mqttClient.onSubscribe(onMqttSubscribe);
mqttClient.onUnsubscribe(onMqttUnsubscribe);
mqttClient.onMessage(onMqttMessage);
mqttClient.onMessage(onSpecificMqttMessage, "Topic1/#"); // Optional - Overloaded example
mqttClient.onFilteredMessage(onSpecificMqttMessage, "Topic2/+/Test"); // Optional - Method for verbose reading
mqttClient.onPublish(onMqttPublish);
mqttClient.setServer(MQTT_HOST, MQTT_PORT);

connectToWifi();
}

void loop() {
}
void loop() {}
1 change: 1 addition & 0 deletions keywords.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ onDisconnect KEYWORD2
onSubscribe KEYWORD2
onUnsubscribe KEYWORD2
onMessage KEYWORD2
onFilteredMessage KEYWORD2
onPublish KEYWORD2

connected KEYWORD2
Expand Down
60 changes: 57 additions & 3 deletions src/AsyncMqttClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ AsyncMqttClient::AsyncMqttClient()
AsyncMqttClient::~AsyncMqttClient() {
delete _currentParsedPacket;
delete[] _parsingInformation.topicBuffer;
for (auto callback : _onMessageUserCallbacks) {
delete callback.first;
}
#ifdef ESP32
vSemaphoreDelete(_xSemaphore);
#endif
Expand Down Expand Up @@ -140,8 +143,14 @@ AsyncMqttClient& AsyncMqttClient::onUnsubscribe(AsyncMqttClientInternals::OnUnsu
return *this;
}

AsyncMqttClient& AsyncMqttClient::onMessage(AsyncMqttClientInternals::OnMessageUserCallback callback) {
_onMessageUserCallbacks.push_back(callback);
AsyncMqttClient& AsyncMqttClient::onMessage(AsyncMqttClientInternals::OnMessageUserCallback callback, const char* _userTopic) {
onFilteredMessage(callback, _userTopic);
return *this;
}

AsyncMqttClient& AsyncMqttClient::onFilteredMessage(AsyncMqttClientInternals::OnMessageUserCallback callback, const char* _userTopic) {
// _onMessageUserCallbacks.push_back(AsyncMqttClientInternals::onFilteredMessageUserCallback(_userTopic, callback));
_onMessageUserCallbacks.push_back(AsyncMqttClientInternals::onFilteredMessageUserCallback(strdup(_userTopic), callback)); // leakage issue
return *this;
}

Expand Down Expand Up @@ -520,7 +529,47 @@ void AsyncMqttClient::_onMessage(char* topic, char* payload, uint8_t qos, bool d
properties.dup = dup;
properties.retain = retain;

for (auto callback : _onMessageUserCallbacks) callback(topic, payload, properties, len, index, total);
for (auto callback : _onMessageUserCallbacks) {
bool mqttTopicMatch = false;

if (strcmp(callback.first, "#") == 0 || strcmp(callback.first, topic) == 0) {
mqttTopicMatch = true;
}
else {
char* messageTopic = strdup(topic);
char* userTopic = strdup(callback.first);
char* messageSubTopic = strtok_r (messageTopic, "/", &messageTopic);
char* userSubTopic = strtok_r (userTopic, "/", &userTopic);

while (messageSubTopic != NULL || userSubTopic != NULL) {
if (messageSubTopic != NULL && userSubTopic == NULL) {
mqttTopicMatch = false;
break;
}
else if (messageSubTopic == NULL && userSubTopic != NULL) {
mqttTopicMatch = false;
break;
}
else if (mqttTopicMatch && strcmp(userSubTopic, "#") == 0) {
mqttTopicMatch = true;
break;
}
else if (strcmp(messageSubTopic, userSubTopic) == 0 || strcmp(userSubTopic, "+") == 0) {
messageSubTopic = strtok_r (messageTopic, "/", &messageTopic);
userSubTopic = strtok_r (userTopic, "/", &userTopic);
mqttTopicMatch = true;
}
else {
mqttTopicMatch = false;
break;
}
}
}

if (mqttTopicMatch) {
callback.second(topic, payload, properties, len, index, total);
}
}
}
}

Expand Down Expand Up @@ -765,6 +814,11 @@ uint16_t AsyncMqttClient::subscribe(const char* topic, uint8_t qos) {
return packetId;
}

uint16_t AsyncMqttClient::subscribe(const char* topic, uint8_t qos, AsyncMqttClientInternals::OnMessageUserCallback callback) {
onFilteredMessage(callback, topic);
subscribe(topic, qos);
}

uint16_t AsyncMqttClient::unsubscribe(const char* topic) {
if (!_connected) return 0;

Expand Down
6 changes: 4 additions & 2 deletions src/AsyncMqttClient.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,15 @@ class AsyncMqttClient {
AsyncMqttClient& onDisconnect(AsyncMqttClientInternals::OnDisconnectUserCallback callback);
AsyncMqttClient& onSubscribe(AsyncMqttClientInternals::OnSubscribeUserCallback callback);
AsyncMqttClient& onUnsubscribe(AsyncMqttClientInternals::OnUnsubscribeUserCallback callback);
AsyncMqttClient& onMessage(AsyncMqttClientInternals::OnMessageUserCallback callback);
AsyncMqttClient& onMessage(AsyncMqttClientInternals::OnMessageUserCallback callback, const char* _userTopic = "#");
AsyncMqttClient& onFilteredMessage(AsyncMqttClientInternals::OnMessageUserCallback callback, const char* _userTopic);
AsyncMqttClient& onPublish(AsyncMqttClientInternals::OnPublishUserCallback callback);

bool connected() const;
void connect();
void disconnect(bool force = false);
uint16_t subscribe(const char* topic, uint8_t qos);
uint16_t subscribe(const char* topic, uint8_t qos, AsyncMqttClientInternals::OnMessageUserCallback callback);
uint16_t unsubscribe(const char* topic);
uint16_t publish(const char* topic, uint8_t qos, bool retain, const char* payload = nullptr, size_t length = 0, bool dup = false, uint16_t message_id = 0);

Expand Down Expand Up @@ -116,7 +118,7 @@ class AsyncMqttClient {
std::vector<AsyncMqttClientInternals::OnDisconnectUserCallback> _onDisconnectUserCallbacks;
std::vector<AsyncMqttClientInternals::OnSubscribeUserCallback> _onSubscribeUserCallbacks;
std::vector<AsyncMqttClientInternals::OnUnsubscribeUserCallback> _onUnsubscribeUserCallbacks;
std::vector<AsyncMqttClientInternals::OnMessageUserCallback> _onMessageUserCallbacks;
std::vector<AsyncMqttClientInternals::onFilteredMessageUserCallback> _onMessageUserCallbacks;
std::vector<AsyncMqttClientInternals::OnPublishUserCallback> _onPublishUserCallbacks;

AsyncMqttClientInternals::ParsingInformation _parsingInformation;
Expand Down
2 changes: 2 additions & 0 deletions src/AsyncMqttClient/Callbacks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ typedef std::function<void(AsyncMqttClientDisconnectReason reason)> OnDisconnect
typedef std::function<void(uint16_t packetId, uint8_t qos)> OnSubscribeUserCallback;
typedef std::function<void(uint16_t packetId)> OnUnsubscribeUserCallback;
typedef std::function<void(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total)> OnMessageUserCallback;
// typedef std::pair<std::string, std::function<void(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total)>> onFilteredMessageUserCallback;
typedef std::pair<char*, std::function<void(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total)>> onFilteredMessageUserCallback;
typedef std::function<void(uint16_t packetId)> OnPublishUserCallback;

// internal callbacks
Expand Down