Основной язык текста запроса — английский.
MQTTx разработан на основе протокола MQTT v3.1.1 и направлен на предоставление брокера MQTT с простым в использовании и превосходной производительностью.
Упаковка:
mvnw -P test -DskipTests=true clean package
.mvnw -P dev -DskipTests=true clean package
.Запуск:
java -jar mqttx-1.0.5.BETA.jar
.Быстрый старт в режиме тестирования:
Режим разработки:
Так называемые режимы тестирования и разработки предназначены только для того, чтобы студенты могли быстро запускать проекты и тестировать функциональные тесты. После ознакомления с проектом студенты могут изменить [6.1 Конфигурационный элемент](#61-Конфигурационный элемент) для включения или отключения функций, предоставляемых mqttx.
mqttx полагается на redis для обеспечения сохранения сообщений, кластеризации и других функций. Также это может быть реализовано с использованием другого промежуточного программного обеспечения (mysql, mongodb, kafka и т. д.), в то время как springboot имеет spring-boot-starter -*** и другие подключаемые компоненты, удобные для всех, кто хочет изменить реализацию по умолчанию.
Другие инструкции:
Рекомендуется использовать Intellij IDEA в качестве инструмента разработки:
Пример: idea необходимо установить плагин Lombok, settings > Build, Execution, Deployment > Compiler > Annotation Processor, чтобы включить Enable annotation processing.
Синглтон-сервис mqttx развёрнут в облаке для функционального тестирования:
mqttx поддерживает аутентификацию клиентов и функции аутентификации публикации/подписки тем. Если вам нужно использовать их вместе, рекомендуемая архитектура выглядит следующим образом: Аутентификация пользователей реализуется самими пользователями
Структура отношений внутренней реализации (перечислены только ключевые элементы):
Схема: https://s1.ax1x.com/2020/07/28/ak6KAO.png
Схема: https://s1.ax1x.com/2020/07/28/ak6mB6.png
├─java
│ └─com
│ └─jun
│ └─mqttx
│ ├─broker # реализация и обработка протокола mqtt
│ │ ├─codec # кодек
│ │ └─handler # обработчик сообщений (pub, sub, connn и т. д.)
│ ├─config # конфигурация, в основном объявление бина
│ ├─constants # константы
│ ├─consumer # потребитель кластерных сообщений
│ ├─entity # класс сущности
│ ├─exception # класс исключений
│ ├─service # интерфейс бизнес-сервиса (аутентификация пользователя, хранение сообщений и т.д.)
│ │ └─impl # реализация по умолчанию
│ └─utils # инструменты
└─resources # файл ресурсов (в этой папке находится application.yml)
└─tls # адрес хранилища ca
Чтобы упростить быстрое развёртывание проекта, используется docker.
- Прежде чем выполнять локальное развёртывание, необходимо сначала загрузить docker.
- Номера портов (
1883
,8083
) жёстко заданы в файле docker-compose. Если вы изменяете конфигурацию портовmqttx
, вам также следует изменить её вdocker-compose.yml
.
dockerfile
, и выполните docker build -t mqttx:v1.0.4.RELEASE
.docker-compose up
.Qos0 | Qos1 | Qos2 |
---|---|---|
Поддержка | Поддержка | Поддержка |
Для поддержки QoS1 и QoS2 в качестве уровня сохраняемости вводится redis
. Эта часть была инкапсулирована в интерфейс, который можно заменить самостоятельно (например, используя mysql
).
#
и одноуровневые подстановочные знаки +
./
, не поддерживаются, например a/b/
, пожалуйста, измените на a/b
.mqttx проверяет только тему подписки TopicFilter. Тема публикации не проверяется на достоверность. Вы можете включить [4.5 поддержку безопасности тем](#45-topic-security support), чтобы ограничить темы, которые клиент может публиковать.
Например:
topicFilter | match topics |
---|---|
/a/b/+ | /a/b/abc, /a/b/test |
a/b/# | a/b, a/b/abc, a/b/c/def |
a/+/b/# | a/nani/b/abc |
/+/a/b/+/c | /aaa/a/b/test/c |
Инструмент проверки — класс com.jun.mqttx.utils.TopicUtils
.
В проекте используется redis pub/sub
для распределения сообщений для поддержки функции кластера. Если вам нужно изменить его на kafka
или другой mq
, вам необходимо изменить класс конфигурации ClusterConfig
и заменить класс реализации InternalMessageServiceImpl
.
Схема: https://s1.ax1x.com/2020/07/28/ak6nHK.png
mqttx.cluster.enable
: переключатель функций, по умолчанию false
В версиях до
v1.0.5.RELEASE
есть ошибки в обработке сообщений кластера, и их нельзя использовать.
Чтобы включить SSL, у вас должен быть ca (самозаверяющий или купленный), а затем изменить несколько конфигураций в файле application.yml
:
mqttx.ssl.enable
: переключатель функций, по умолчанию false
, управляет как websocket
, так и socket
mqttx.ssl.key-store-location
: адрес сертификата, на основе classpath
mqttx.ssl.key-store-password
: пароль сертификатаmqttx.ssl.key-store-type
: тип хранилища ключей, такой как PKCS12
mqttx.ssl.client-auth
: требуется ли серверу проверять сертификат клиента, по умолчанию — false
mqttx.keystore
в каталогеresources/tls
предназначен только для тестирования. Пароль: 123456.
Чтобы ограничить подписки клиентов на темы, добавьте механизм аутентификации для подписки и публикации тем:
Поддерживаемые типы тем:
Общая подписка — это контент, предусмотренный протоколом mqtt5, и многие MQ (например, Kafka) были реализованы.
Следующее изображение показывает разницу между общими и обычными темами:
Стратегия распределения сообщений msg-a зависит от элемента конфигурации mqtx.share-topic.share-sub-strategy.
Вы можете сотрудничать с сеансом cleanSession = 1. После того как клиент, разделяющий тему, отключится, сервер удалит подписку, так что сообщение общей темы будет распространяться только на онлайн-клиентов.
Введение в CleanSession: протокол mqtt3.1.1 предусматривает, что при cleanSession = 1 все состояния (исключая сохранённые сообщения), связанные с сеансом, будут удалены после отключения соединения (в mqtt5 добавлено значение времени ожидания сеанса, заинтересованные студенты могут узнать). После версии mqtx v1.0.5.BETA (включительно) сообщения сеанса cleanSession = 1 хранятся в памяти, что обеспечивает чрезвычайно высокую производительность.
Если для CleanSession установлено значение 1, Клиент и Сервер ДОЛЖНЫ отказаться от любого предыдущего Сеанса и начать новый. Этот сеанс длится до тех пор, пока существует Сетевое Соединение. Состояние данных, связанное с этим сеансом НЕ ДОЛЖНО использоваться в любом последующем сеансе [MQTT-3.1.2-6].
Состояние сеанса Клиента состоит из:
- Сообщений QoS 1 и QoS 2, отправленных на Сервер, но ещё не полностью подтверждённых. -Сообщений QoS 2, полученных от Сервера, но ещё не полностью подтверждённых.
Состояние сеанса на Сервере состоит из:
- Существования сеанса, даже если остальная часть состояния сеанса пуста.
- Подписок Клиента.
- Сообщений QoS 1 и QoS 2, отправленных Клиенту, но ещё не полностью подтверждённых.
- Очереди сообщений QoS 1 и QoS 2 для передачи Клиенту.
- Сообщений QoS 2, полученных от Клиента, но ещё не полностью подтверждённых.
- Опционально, сообщений QoS 0 для передачи Клиенту.
Поддерживается.
Клиент может получить статус брокера, подписавшись на системные темы. В настоящее время система поддерживает следующие темы:
Тема | Повторить | Комментарий |
---|---|---|
$SYS/broker/status |
false |
Клиенты, подписавшиеся на эту тему, периодически (mqttx.sys-topic.interval ) будут получать статус брокера, который охватывает значения статуса всех тем ниже. Примечание: после отключения клиентского соединения подписка отменяется |
$SYS/broker/activeConnectCount |
true |
Немедленно возвращает текущее количество активных подключений |
$SYS/broker/time |
true |
Возвращает текущую временную метку немедленно |
$SYS/broker/version |
true |
Возвращает версию брокера немедленно |
repeat:
- repeat = false*: Только подписаться. Однажды брокер будет регулярно публиковать данные в эту тему.
repeat = true
: подписаться один раз, брокер публикует один раз и может подписаться несколько раз.Примечание:
- Механизм безопасности темы также повлияет на подписку клиента на системные темы, и неавторизованные клиенты не смогут подписаться на системные темы.
- Подписка на системную тему не будет постоянной.
Формат объекта ответа — строка JSON:
{
"activeConnectCount": 2,
"timestamp": "2020-09-18 15:13:46",
"version": "1.0.5.ALPHA"
}
Поле | Описание |
---|---|
ActiveConnectCount |
Текущее количество активных подключений |
timestamp |
Отметка времени; (yyyy-MM-dd HH:mm:ss ) |
version |
Версия mqttx |
Поддержка промежуточного программного обеспечения для сообщений:
Функция моста сообщений может удобно подключаться к середине очереди сообщений.
mqttx.message-bridge.enable
: включить функцию моста сообщений.mqttx.bridge-topics
: Темы, которые необходимо связать сообщениями.После того как mqttx получает сообщение от клиента, который его публикует, он сначала определяет, включена ли функция моста, а затем определяет, является ли тема темой, которую необходимо связать, и, наконец, публикует сообщение в MQ.
Поддерживает только односторонний мост: устройство (клиент) => mqttx => MQ
В состоянии кластера рассмотрите функцию интеграции регистрации служб, которая удобна для управления состоянием кластера. Вы можете использовать consul
. Смотрите мои дальнейшие мысли.
На самом деле я хочу представить
SpringCloud
, но чувствую, чтоspringcloud
немного тяжеловат, поэтому я могу открыть ветку для его реализации.
Исправление ошибок и оптимизация будут продолжаться, но в основном полагаются на студентов, использующих и изучающих mqttx
, чтобы сообщить мне о проблеме (если обратной связи нет, я буду считать, что её нет).
Это на самом деле очень важно, но пока мало студентов обращались ко мне за обратной связью. Я всё-таки человек с ограниченными возможностями.
Платформа управления mqttx-admin
, основанная на vue2.0
, element-ui
, сейчас разрабатывается. Обновление функций mqttx
будет приостановлено на некоторое время (недавно смотрел mqtt5
). В процессе разработки проекта было обнаружено, что некоторые изменения в mqttx
были необходимы, но эти изменения не должны быть перенесены в мастер mqttx
(например, аутентификация безопасности тем должна взаимодействовать с mqttx-platform
, я могу представить Retrofit
для обработки вызовов интерфейса, на самом деле вы можете использовать feign
, я думаю, что они похожи), я должен открыть бизнес-ветку для этого. Кстати, писать проекты на javascript
так круто, почему вы не подумали?
Изначально мне нужно было посвятить часть энергии производному проекту
mqttx-admin
, но позже я обнаружил, что уmqttx
всё ещё слишком много дел, и мне пришлось изменить план.
Показатели производительности mqttx
могут быть улучшены. Я изменю логику обработки pub/sub
в версии v1.1.0.RELEASE
.
В основном
StringRedisTemplate
=>ReactiveStringRedisTemplate
, изменение синхронного на асинхронное.
Введение в направление развития.
~~Версия v1.0.5.RELEASE
становится первой версией LTS mqttx
, и версия v1.0
будет поддерживаться и обновляться на её основе. Чтобы улучшить автономную производительность, версия v1.1
будет полностью асинхронной. Последующая поддержка протокола mqtt5
, возможно, начнётся с версии v1.0
. ~~
mqttx
создаёт две ветки:
com.jun.mqttx.service.impl
интерфейс синхронизации.com.jun.mqttx.service.impl
изменён на асинхронный интерфейс.
Поддерживается mqtt5
. Оптимизация механизма обработки сообщений cleanSession
x Оптимизация механизма обработки сообщений cleanSession.
Мост сообщений
x Мост сообщений.
Исправление ошибок и оптимизация
x Исправление ошибок и оптимизация.
v1.0.4.RELEASE
Поддержка websocket
x Поддержка websocket.
Самопроверка состояния кластера
x Самопроверка состояния кластера.
Исправления ошибок и оптимизации
x Исправления ошибок и оптимизации.
v1.0.3.RELEASE
Исправление ошибки
x Исправление ошибки.
v1.0.2.RELEASE
Добавление общих тем в стратегию опроса
x Добавление общих тем в стратегию опроса.
Исправления ошибок и оптимизаций
x Исправления ошибок и оптимизаций.
v1.0.1.RELEASE
Функция поддержки кластера на основе redis
x Функция поддержки кластера на основе redis.
Общая тема поддержки
x Общая тема поддержки.
Тема функции разрешения
x Тема функции разрешения.
Исправления ошибок и оптимизация
x Исправления ошибок и оптимизация.
v1.0.0.RELEASE
Полная реализация протокола mqttv3.1.1
v1.1.0.RELEASE (в разработке)
Синхронно-асинхронная реализация redis для повышения производительности
Условия тестирования просты, результаты приведены только для справки.
Версия: MQTTX v1.0.5.BETA
Инструменты: mqtt-bench
Машина:
Система | Процессор | Память |
---|---|---|
win10 | i5-4460 | 16G |
На самом деле, хранилище сообщений pub не использует redis, причина описана во введении cleanSession в разделе «Поддержка общих тем».
Выполнить команду java -jar -Xmx1g -Xms1g mqttx-1.0.5.BETA.jar.
— qos0:
C:\Users\Jun\go\windows_amd64>mqtt-bench.exe -broker=tcp://localhost:1883 -action=pub -clients=1000 -qos=0 -count=1000
2020-09-30 15:33:54.462089 +0800 CST Start benchmark
2020-09-30 15:34:33.6010217 +0800 CST End benchmark
Result: broker=tcp://localhost:1883, clients=1000, totalCount=1000000, duration=39134ms, throughput=25553.23messages/sec
— qos1:
C:\Users\Jun\go\windows_amd64>mqtt-bench.exe -broker=tcp://localhost:1883 -action=pub -clients=1000 -qos=1 -count=1000
2020-09-30 15:29:17.9027515 +0800 CST Start benchmark
2020-09-30 15:30:25.0316915 +0800 CST End benchmark
Result: broker=tcp://localhost:1883, clients=1000, totalCount=1000000, duration=67124ms, throughput=14897.80messages/sec
— qos2:
C:\Users\Jun\go\windows_amd64>mqtt-bench.exe -broker=tcp://localhost:1883 -action=pub -clients=1000 -qos=2 -count=1000
2020-09-30 15:37:00.0678207 +0800 CST Start benchmark
2020-09-30 15:38:55.4419847 +0800 CST End benchmark
Result: broker=tcp://localhost:1883, clients=1000, totalCount=1000000, duration=115369ms, throughput=8667.84messages/sec
Количество одновременных подключений | Поведение | Размер одного сообщения | Количество сообщений на одно подключение | Общее количество сообщений | qos | Время выполнения | qps |
---|---|---|---|---|---|---|---|
1000 | Отправка сообщения | 1024 байта | 1000 | Один миллион | 0 | 39,1 с | 25 553 |
1000 | Отправка сообщения | 1024 байта | 1000 | Один миллион | 1 | 67,1 с | 14 897 |
1000 | Отправка сообщения | 1024 байта | 1000 | Один миллион | 2 | 115,3 с | 8667 |
Потребление ресурсов: процессор — 25 %, память — 440 МБ
Выполнить команду java -jar -Xmx1g -Xms1g mqttx-1.0.5.BETA.jar.
— qos0:
C:\Users\Jun\go\windows_amd64>mqtt-bench.exe -broker=tcp://localhost:1883 -action=pub -clients=1000 -qos=0 -count=1000
2020-09-30 17:03:55.7560928 +0800 CST Start benchmark
2020-09-30 17:04:36.2080909 +0800 CST End benchmark
Result: broker=tcp://localhost:1883, clients=1000, totalCount=1000000, duration=40447ms, throughput=24723.71messages/sec
— qos1:
C:\Users\Jun\go\windows_amd64>mqtt-bench.exe -broker=tcp://localhost:1883 -action=pub -clients=1000 -qos=1 -count=1000
2020-09-30 17:06:18.9136484 +0800 CST Start benchmark
2020-09-30 17:08:20.9072865 +0800 CST End benchmark
Result: broker=tcp://localhost:1883, clients=1000, totalCount=1000000, duration=121992ms, throughput=8197.26messages/sec
— qos2: ``` C:\Users\Jun\go\windows_amd64>mqtt-bench.exe -broker=tcp://localhost:1883 -action=pub -clients=1000 -qos=2 -count=1000 2020-09-30 17:09:35.1314262 +0800 CST Start benchmark 2020-09-30 17:13:10.7914125 +0800 CST End benchmark
Результат: broker=tcp://localhost:1883, clients=1000, totalCount=1000000, duration=215656ms, throughput=4637.01messages/sec
| Количество одновременных подключений | Поведение | Размер одного сообщения | Количество сообщений в одном подключении | Общее количество сообщений | QoS | Время выполнения | QPS |
| ------------ | -------- | ------------ | -------------- | -------- | ---- | -------- | ------- |
| `1000` | Отправка сообщения | `1024 байта` | `1000` | Один миллион | `0` | `40.4 с` | `24723` |
| `1000` | Отправка сообщения | `1024 байта` | `1000` | Один миллион | `1` | `121.9 с` | `8197` |
| `1000` | Отправка сообщения | `1024 байта` | `1000` | Один миллион | `2` | `215.6 с` | `4637` |
**Потребление ресурсов: `CPU: 45%`, `память: 440 МБ`**
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарии ( 0 )