diff --git a/db/knex_migrations/2023-10-08-0000-mqtt-query.js b/db/knex_migrations/2023-10-08-0000-mqtt-query.js new file mode 100644 index 00000000..f37bab83 --- /dev/null +++ b/db/knex_migrations/2023-10-08-0000-mqtt-query.js @@ -0,0 +1,16 @@ +exports.up = function (knex) { + // Add new column monitor.mqtt_check_type + return knex.schema + .alterTable("monitor", function (table) { + table.string("mqtt_check_type", 255).notNullable().defaultTo("keyword"); + }); + +}; + +exports.down = function (knex) { + // Drop column monitor.mqtt_check_type + return knex.schema + .alterTable("monitor", function (table) { + table.dropColumn("mqtt_check_type"); + }); +}; diff --git a/server/model/monitor.js b/server/model/monitor.js index 3cf72d23..e0b52062 100644 --- a/server/model/monitor.js +++ b/server/model/monitor.js @@ -4,7 +4,7 @@ const { Prometheus } = require("../prometheus"); const { log, UP, DOWN, PENDING, MAINTENANCE, flipStatus, MAX_INTERVAL_SECOND, MIN_INTERVAL_SECOND, SQL_DATETIME_FORMAT } = require("../../src/util"); -const { tcping, ping, checkCertificate, checkStatusCode, getTotalClientInRoom, setting, mssqlQuery, postgresQuery, mysqlQuery, mqttAsync, setSetting, httpNtlm, radius, grpcQuery, +const { tcping, ping, checkCertificate, checkStatusCode, getTotalClientInRoom, setting, mssqlQuery, postgresQuery, mysqlQuery, setSetting, httpNtlm, radius, grpcQuery, redisPingAsync, mongodbPing, kafkaProducerAsync, getOidcTokenClientCredentials, rootCertificatesFingerprints, axiosAbortSignal } = require("../util-server"); const { R } = require("redbean-node"); @@ -134,6 +134,7 @@ class Monitor extends BeanModel { maintenance: await Monitor.isUnderMaintenance(this.id), mqttTopic: this.mqttTopic, mqttSuccessMessage: this.mqttSuccessMessage, + mqttCheckType: this.mqttCheckType, databaseQuery: this.databaseQuery, authMethod: this.authMethod, grpcUrl: this.grpcUrl, @@ -757,14 +758,6 @@ class Monitor extends BeanModel { } else { throw Error("Container State is " + res.data.State.Status); } - } else if (this.type === "mqtt") { - bean.msg = await mqttAsync(this.hostname, this.mqttTopic, this.mqttSuccessMessage, { - port: this.port, - username: this.mqttUsername, - password: this.mqttPassword, - interval: this.interval, - }); - bean.status = UP; } else if (this.type === "sqlserver") { let startTime = dayjs().valueOf(); diff --git a/server/monitor-types/mqtt.js b/server/monitor-types/mqtt.js new file mode 100644 index 00000000..cff5c93d --- /dev/null +++ b/server/monitor-types/mqtt.js @@ -0,0 +1,121 @@ +const { MonitorType } = require("./monitor-type"); +const { log, UP } = require("../../src/util"); +const mqtt = require("mqtt"); +const jsonata = require("jsonata"); + +class MqttMonitorType extends MonitorType { + + name = "mqtt"; + + /** + * Run the monitoring check on the MQTT monitor + * @param {Monitor} monitor Monitor to check + * @param {Heartbeat} heartbeat Monitor heartbeat to update + * @param {UptimeKumaServer} server Uptime Kuma server + * @returns {Promise} + */ + async check(monitor, heartbeat, server) { + const receivedMessage = await this.mqttAsync(monitor.hostname, monitor.mqttTopic, { + port: monitor.port, + username: monitor.mqttUsername, + password: monitor.mqttPassword, + interval: monitor.interval, + }); + + if (monitor.mqttCheckType == null || monitor.mqttCheckType === "") { + // use old default + monitor.mqttCheckType = "keyword"; + } + + if (monitor.mqttCheckType === "keyword") { + if (receivedMessage != null && receivedMessage.includes(monitor.mqttSuccessMessage)) { + heartbeat.msg = `Topic: ${monitor.mqttTopic}; Message: ${receivedMessage}`; + heartbeat.status = UP; + } else { + throw Error(`Message Mismatch - Topic: ${monitor.mqttTopic}; Message: ${receivedMessage}`); + } + } else if (monitor.mqttCheckType === "json-query") { + const parsedMessage = JSON.parse(receivedMessage); + + let expression = jsonata(monitor.jsonPath); + + let result = await expression.evaluate(parsedMessage); + + if (result?.toString() === monitor.expectedValue) { + heartbeat.msg = "Message received, expected value is found"; + heartbeat.status = UP; + } else { + throw new Error("Message received but value is not equal to expected value, value was: [" + result + "]"); + } + } else { + throw Error("Unknown MQTT Check Type"); + } + } + + /** + * Connect to MQTT Broker, subscribe to topic and receive message as String + * @param {string} hostname Hostname / address of machine to test + * @param {string} topic MQTT topic + * @param {object} options MQTT options. Contains port, username, + * password and interval (interval defaults to 20) + * @returns {Promise} Received MQTT message + */ + mqttAsync(hostname, topic, options = {}) { + return new Promise((resolve, reject) => { + const { port, username, password, interval = 20 } = options; + + // Adds MQTT protocol to the hostname if not already present + if (!/^(?:http|mqtt|ws)s?:\/\//.test(hostname)) { + hostname = "mqtt://" + hostname; + } + + const timeoutID = setTimeout(() => { + log.debug("mqtt", "MQTT timeout triggered"); + client.end(); + reject(new Error("Timeout, Message not received")); + }, interval * 1000 * 0.8); + + const mqttUrl = `${hostname}:${port}`; + + log.debug("mqtt", `MQTT connecting to ${mqttUrl}`); + + let client = mqtt.connect(mqttUrl, { + username, + password + }); + + client.on("connect", () => { + log.debug("mqtt", "MQTT connected"); + + try { + client.subscribe(topic, () => { + log.debug("mqtt", "MQTT subscribed to topic"); + }); + } catch (e) { + client.end(); + clearTimeout(timeoutID); + reject(new Error("Cannot subscribe topic")); + } + }); + + client.on("error", (error) => { + client.end(); + clearTimeout(timeoutID); + reject(error); + }); + + client.on("message", (messageTopic, message) => { + if (messageTopic === topic) { + client.end(); + clearTimeout(timeoutID); + resolve(message.toString("utf8")); + } + }); + + }); + } +} + +module.exports = { + MqttMonitorType, +}; diff --git a/server/server.js b/server/server.js index 0be9a972..b5665fa6 100644 --- a/server/server.js +++ b/server/server.js @@ -799,6 +799,7 @@ let needSetup = false; bean.mqttPassword = monitor.mqttPassword; bean.mqttTopic = monitor.mqttTopic; bean.mqttSuccessMessage = monitor.mqttSuccessMessage; + bean.mqttCheckType = monitor.mqttCheckType; bean.databaseConnectionString = monitor.databaseConnectionString; bean.databaseQuery = monitor.databaseQuery; bean.authMethod = monitor.authMethod; diff --git a/server/uptime-kuma-server.js b/server/uptime-kuma-server.js index f55dcde3..6a06f7da 100644 --- a/server/uptime-kuma-server.js +++ b/server/uptime-kuma-server.js @@ -118,6 +118,7 @@ class UptimeKumaServer { UptimeKumaServer.monitorTypeList["real-browser"] = new RealBrowserMonitorType(); UptimeKumaServer.monitorTypeList["tailscale-ping"] = new TailscalePing(); UptimeKumaServer.monitorTypeList["dns"] = new DnsMonitorType(); + UptimeKumaServer.monitorTypeList["mqtt"] = new MqttMonitorType(); this.io = new Server(this.httpServer); } @@ -436,3 +437,4 @@ module.exports = { const { RealBrowserMonitorType } = require("./monitor-types/real-browser-monitor-type"); const { TailscalePing } = require("./monitor-types/tailscale-ping"); const { DnsMonitorType } = require("./monitor-types/dns"); +const { MqttMonitorType } = require("./monitor-types/mqtt"); diff --git a/server/util-server.js b/server/util-server.js index 3e95e70a..3246925e 100644 --- a/server/util-server.js +++ b/server/util-server.js @@ -7,7 +7,6 @@ const { Resolver } = require("dns"); const childProcess = require("child_process"); const iconv = require("iconv-lite"); const chardet = require("chardet"); -const mqtt = require("mqtt"); const chroma = require("chroma-js"); const { badgeConstants } = require("./config"); const mssql = require("mssql"); @@ -173,73 +172,6 @@ exports.pingAsync = function (hostname, ipv6 = false, size = 56) { }); }; -/** - * MQTT Monitor - * @param {string} hostname Hostname / address of machine to test - * @param {string} topic MQTT topic - * @param {string} okMessage Expected result - * @param {object} options MQTT options. Contains port, username, - * password and interval (interval defaults to 20) - * @returns {Promise} Received MQTT message - */ -exports.mqttAsync = function (hostname, topic, okMessage, options = {}) { - return new Promise((resolve, reject) => { - const { port, username, password, interval = 20 } = options; - - // Adds MQTT protocol to the hostname if not already present - if (!/^(?:http|mqtt|ws)s?:\/\//.test(hostname)) { - hostname = "mqtt://" + hostname; - } - - const timeoutID = setTimeout(() => { - log.debug("mqtt", "MQTT timeout triggered"); - client.end(); - reject(new Error("Timeout")); - }, interval * 1000 * 0.8); - - const mqttUrl = `${hostname}:${port}`; - - log.debug("mqtt", `MQTT connecting to ${mqttUrl}`); - - let client = mqtt.connect(mqttUrl, { - username, - password - }); - - client.on("connect", () => { - log.debug("mqtt", "MQTT connected"); - - try { - log.debug("mqtt", "MQTT subscribe topic"); - client.subscribe(topic); - } catch (e) { - client.end(); - clearTimeout(timeoutID); - reject(new Error("Cannot subscribe topic")); - } - }); - - client.on("error", (error) => { - client.end(); - clearTimeout(timeoutID); - reject(error); - }); - - client.on("message", (messageTopic, message) => { - if (messageTopic === topic) { - client.end(); - clearTimeout(timeoutID); - if (okMessage != null && okMessage !== "" && message.toString() !== okMessage) { - reject(new Error(`Message Mismatch - Topic: ${messageTopic}; Message: ${message.toString()}`)); - } else { - resolve(`Topic: ${messageTopic}; Message: ${message.toString()}`); - } - } - }); - - }); -}; - /** * Monitor Kafka using Producer * @param {string[]} brokers List of kafka brokers to connect, host and diff --git a/src/lang/en.json b/src/lang/en.json index bcf9e220..1bacee31 100644 --- a/src/lang/en.json +++ b/src/lang/en.json @@ -246,8 +246,8 @@ "Current User": "Current User", "topic": "Topic", "topicExplanation": "MQTT topic to monitor", - "successMessage": "Success Message", - "successMessageExplanation": "MQTT message that will be considered as success", + "successKeyword": "Success Keyword", + "successKeywordExplanation": "MQTT Keyword that will be considered as success", "recent": "Recent", "Reset Token": "Reset Token", "Done": "Done", diff --git a/src/pages/EditMonitor.vue b/src/pages/EditMonitor.vue index 490e55ab..1a2d1c93 100644 --- a/src/pages/EditMonitor.vue +++ b/src/pages/EditMonitor.vue @@ -349,12 +349,34 @@
- - + + +
+ +
+ +
- {{ $t("successMessageExplanation") }} + {{ $t("successKeywordExplanation") }}
+ + +
+ + + + +
+
+
+ + + +