1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/amazingJun-mqttx

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Это зеркальный репозиторий, синхронизируется ежедневно с исходного репозитория.
Клонировать/Скачать
Внести вклад в разработку кода
Синхронизировать код
Отмена
Подсказка: Поскольку Git не поддерживает пустые директории, создание директории приведёт к созданию пустого файла .keep.
Loading...
readme_en.md

MQTTX Project

Основной язык текста запроса — английский.

1 Введение

MQTTx разработан на основе протокола MQTT v3.1.1 и направлен на предоставление брокера MQTT с простым в использовании и превосходной производительностью.

1.1 Быстрый старт

  1. Упаковка:

    • Режим тестирования: запустите mvnw -P test -DskipTests=true clean package.
    • Режим разработки:
      • Запустите экземпляр redis.
      • Выполните команду mvnw -P dev -DskipTests=true clean package.
  2. Запуск:

    • Выполните команду: java -jar mqttx-1.0.5.BETA.jar.

Быстрый старт в режиме тестирования:

  • Функция кластера принудительно закрыта.
  • Сообщение хранится в памяти вместо redis.

Режим разработки:

  • Сообщения будут сохраняться в redis, соединение по умолчанию — localhost:6376 без пароля.

Так называемые режимы тестирования и разработки предназначены только для того, чтобы студенты могли быстро запускать проекты и тестировать функциональные тесты. После ознакомления с проектом студенты могут изменить [6.1 Конфигурационный элемент](#61-Конфигурационный элемент) для включения или отключения функций, предоставляемых mqttx.

mqttx полагается на redis для обеспечения сохранения сообщений, кластеризации и других функций. Также это может быть реализовано с использованием другого промежуточного программного обеспечения (mysql, mongodb, kafka и т. д.), в то время как springboot имеет spring-boot-starter -*** и другие подключаемые компоненты, удобные для всех, кто хочет изменить реализацию по умолчанию.

1.2 Проектные зависимости

  • redis: кластерное сообщение, сохранение сообщений.
  • kafka: поддержка моста сообщений.

Другие инструкции:

  1. В проекте используется lombok, пожалуйста, установите соответствующий плагин для использования ide.

Рекомендуется использовать Intellij IDEA в качестве инструмента разработки:

Пример: idea необходимо установить плагин Lombok, settings > Build, Execution, Deployment > Compiler > Annotation Processor, чтобы включить Enable annotation processing.

1.3 Онлайн-примеры

Синглтон-сервис mqttx развёрнут в облаке для функционального тестирования:

  1. Не поддерживает ssl.
  2. Websocket включён, он может пройти тест http://tools.emqx.io/, нужно только изменить доменное имя на: 119.45.158.51 (порт и адрес остаются неизменными).
  3. Поддерживает функцию общей подписки.
  4. Версия развёртывания v1.0.5.BETA.

2 Архитектура

mqttx поддерживает аутентификацию клиентов и функции аутентификации публикации/подписки тем. Если вам нужно использовать их вместе, рекомендуемая архитектура выглядит следующим образом: Аутентификация пользователей реализуется самими пользователями

Структура отношений внутренней реализации (перечислены только ключевые элементы):

Схема: https://s1.ax1x.com/2020/07/28/ak6KAO.png

Схема: https://s1.ax1x.com/2020/07/28/ak6mB6.png

2.1 Структура каталогов

├─java
│  └─com
│      └─jun
│          └─mqttx
│              ├─broker         # реализация и обработка протокола mqtt
│              │  ├─codec       # кодек
│              │  └─handler     # обработчик сообщений (pub, sub, connn и т. д.)
│              ├─config         # конфигурация, в основном объявление бина
│              ├─constants      # константы
│              ├─consumer       # потребитель кластерных сообщений
│              ├─entity         # класс сущности
│              ├─exception      # класс исключений
│              ├─service        # интерфейс бизнес-сервиса (аутентификация пользователя, хранение сообщений и т.д.)
│              │  └─impl        # реализация по умолчанию
│              └─utils          # инструменты
└─resources                     # файл ресурсов (в этой папке находится application.yml)
    └─tls                       # адрес хранилища ca

3 Развёртывание в контейнерах

Чтобы упростить быстрое развёртывание проекта, используется docker.

  1. Прежде чем выполнять локальное развёртывание, необходимо сначала загрузить docker.
  2. Номера портов (1883, 8083) жёстко заданы в файле docker-compose. Если вы изменяете конфигурацию портов mqttx, вам также следует изменить её в docker-compose.yml.
  1. Упакуйте проект как target/*.jar с помощью функции упаковки, предоставляемой IDE.
  2. Войдите в каталог на том же уровне, что и dockerfile, и выполните docker build -t mqttx:v1.0.4.RELEASE .
  3. Выполните docker-compose up.

4 Описание функций

4.1 Поддержка QoS

Qos0 Qos1 Qos2
Поддержка Поддержка Поддержка

Для поддержки QoS1 и QoS2 в качестве уровня сохраняемости вводится redis. Эта часть была инкапсулирована в интерфейс, который можно заменить самостоятельно (например, используя mysql).

4.2 Поддержка TopicFilter

  1. Поддерживаются многоуровневые подстановочные знаки # и одноуровневые подстановочные знаки +.
  2. Темы, оканчивающиеся на /, не поддерживаются, например a/b/, пожалуйста, измените на a/b.
  3. Другие правила см. в mqtt v3.1.1 4.7 Topic Names and Topic Filters

mqttx проверяет только тему подписки TopicFilter. Тема публикации не проверяется на достоверность. Вы можете включить [4.5 поддержку безопасности тем](#45-topic-security support), чтобы ограничить темы, которые клиент может публиковать.

Например:

topicFilter match topics
/a/b/+ /a/b/abc, /a/b/test
a/b/# a/b, a/b/abc, a/b/c/def
a/+/b/# a/nani/b/abc
/+/a/b/+/c /aaa/a/b/test/c

Инструмент проверки — класс com.jun.mqttx.utils.TopicUtils.

4.3 Поддержка кластера

В проекте используется redis pub/sub для распределения сообщений для поддержки функции кластера. Если вам нужно изменить его на kafka или другой mq, вам необходимо изменить класс конфигурации ClusterConfig и заменить класс реализации InternalMessageServiceImpl.

Схема: https://s1.ax1x.com/2020/07/28/ak6nHK.png

  1. mqttx.cluster.enable: переключатель функций, по умолчанию false

В версиях до v1.0.5.RELEASE есть ошибки в обработке сообщений кластера, и их нельзя использовать.

4.4 Поддержка SSL

Чтобы включить SSL, у вас должен быть ca (самозаверяющий или купленный), а затем изменить несколько конфигураций в файле application.yml:

  1. mqttx.ssl.enable: переключатель функций, по умолчанию false, управляет как websocket, так и socket
  2. mqttx.ssl.key-store-location: адрес сертификата, на основе classpath
  3. mqttx.ssl.key-store-password: пароль сертификата
  4. mqttx.ssl.key-store-type: тип хранилища ключей, такой как PKCS12
  5. mqttx.ssl.client-auth: требуется ли серверу проверять сертификат клиента, по умолчанию — false

mqttx.keystore в каталоге resources/tls предназначен только для тестирования. Пароль: 123456.

4.5 Механизм аутентификации при подписке и публикации тем

Чтобы ограничить подписки клиентов на темы, добавьте механизм аутентификации для подписки и публикации тем:

  1. mqttx.enable-topic-sub-pub-secure — функция переключения, по умолчанию false.
  2. При использовании необходимо реализовать интерфейс AuthenticationService. Возвращаемый объект этого интерфейса содержит authorizedSub и authorizedPub для хранения списка тем, на которые клиент имеет право подписываться и публиковать.
  3. Брокер проверяет разрешения клиента во время подписки на сообщения и их публикации.

Поддерживаемые типы тем:

  • Обычная тема;
  • Общая тема;
  • Системная тема.

4.6 Поддержка общих тем

Общая подписка — это контент, предусмотренный протоколом mqtt5, и многие MQ (например, Kafka) были реализованы.

  1. mqtx.share-topic.enable — функция переключения, по умолчанию true.
  2. Формат: $share/{ShareName}/{filter}, где $share — префикс, ShareName — имя общей подписки, а filter — фильтр тем необщей подписки.
  3. В настоящее время поддерживаются три правила: hash, random и round.

Следующее изображение показывает разницу между общими и обычными темами:

share-topic

Стратегия распределения сообщений msg-a зависит от элемента конфигурации mqtx.share-topic.share-sub-strategy.

Вы можете сотрудничать с сеансом cleanSession = 1. После того как клиент, разделяющий тему, отключится, сервер удалит подписку, так что сообщение общей темы будет распространяться только на онлайн-клиентов.

Введение в CleanSession: протокол mqtt3.1.1 предусматривает, что при cleanSession = 1 все состояния (исключая сохранённые сообщения), связанные с сеансом, будут удалены после отключения соединения (в mqtt5 добавлено значение времени ожидания сеанса, заинтересованные студенты могут узнать). После версии mqtx v1.0.5.BETA (включительно) сообщения сеанса cleanSession = 1 хранятся в памяти, что обеспечивает чрезвычайно высокую производительность.

Если для CleanSession установлено значение 1, Клиент и Сервер ДОЛЖНЫ отказаться от любого предыдущего Сеанса и начать новый. Этот сеанс длится до тех пор, пока существует Сетевое Соединение. Состояние данных, связанное с этим сеансом НЕ ДОЛЖНО использоваться в любом последующем сеансе [MQTT-3.1.2-6].

Состояние сеанса Клиента состоит из:

  • Сообщений QoS 1 и QoS 2, отправленных на Сервер, но ещё не полностью подтверждённых. -Сообщений QoS 2, полученных от Сервера, но ещё не полностью подтверждённых.

Состояние сеанса на Сервере состоит из:

  • Существования сеанса, даже если остальная часть состояния сеанса пуста.
  • Подписок Клиента.
  • Сообщений QoS 1 и QoS 2, отправленных Клиенту, но ещё не полностью подтверждённых.
  • Очереди сообщений QoS 1 и QoS 2 для передачи Клиенту.
  • Сообщений QoS 2, полученных от Клиента, но ещё не полностью подтверждённых.
  • Опционально, сообщений QoS 0 для передачи Клиенту.

4.7 Поддержка WebSocket

Поддерживается.

4.8 Системные темы

Клиент может получить статус брокера, подписавшись на системные темы. В настоящее время система поддерживает следующие темы:

Тема Повторить Комментарий
$SYS/broker/status false Клиенты, подписавшиеся на эту тему, периодически (mqttx.sys-topic.interval) будут получать статус брокера, который охватывает значения статуса всех тем ниже.
Примечание: после отключения клиентского соединения подписка отменяется
$SYS/broker/activeConnectCount true Немедленно возвращает текущее количество активных подключений
$SYS/broker/time true Возвращает текущую временную метку немедленно
$SYS/broker/version true Возвращает версию брокера немедленно

repeat:

  • repeat = false*: Только подписаться. Однажды брокер будет регулярно публиковать данные в эту тему.
  • repeat = true: подписаться один раз, брокер публикует один раз и может подписаться несколько раз.

Примечание:

  1. Механизм безопасности темы также повлияет на подписку клиента на системные темы, и неавторизованные клиенты не смогут подписаться на системные темы.
  2. Подписка на системную тему не будет постоянной.

Формат объекта ответа — строка JSON:

{
"activeConnectCount": 2,
"timestamp": "2020-09-18 15:13:46",
"version": "1.0.5.ALPHA"
}
Поле Описание
ActiveConnectCount Текущее количество активных подключений
timestamp Отметка времени; (yyyy-MM-dd HH:mm:ss)
version Версия mqttx

4.9 Поддержка мостов сообщений

Поддержка промежуточного программного обеспечения для сообщений:

  • kafka

Функция моста сообщений может удобно подключаться к середине очереди сообщений.

  1. mqttx.message-bridge.enable: включить функцию моста сообщений.
  2. mqttx.bridge-topics: Темы, которые необходимо связать сообщениями.

После того как mqttx получает сообщение от клиента, который его публикует, он сначала определяет, включена ли функция моста, а затем определяет, является ли тема темой, которую необходимо связать, и, наконец, публикует сообщение в MQ.

Поддерживает только односторонний мост: устройство (клиент) => mqttx => MQ

5 Разработчик говорит

  1. В состоянии кластера рассмотрите функцию интеграции регистрации служб, которая удобна для управления состоянием кластера. Вы можете использовать consul. Смотрите мои дальнейшие мысли.

    На самом деле я хочу представить SpringCloud, но чувствую, что springcloud немного тяжеловат, поэтому я могу открыть ветку для его реализации.

  2. Исправление ошибок и оптимизация будут продолжаться, но в основном полагаются на студентов, использующих и изучающих mqttx, чтобы сообщить мне о проблеме (если обратной связи нет, я буду считать, что её нет).

    Это на самом деле очень важно, но пока мало студентов обращались ко мне за обратной связью. Я всё-таки человек с ограниченными возможностями.

  3. Платформа управления mqttx-admin, основанная на vue2.0, element-ui, сейчас разрабатывается. Обновление функций mqttx будет приостановлено на некоторое время (недавно смотрел mqtt5). В процессе разработки проекта было обнаружено, что некоторые изменения в mqttx были необходимы, но эти изменения не должны быть перенесены в мастер mqttx (например, аутентификация безопасности тем должна взаимодействовать с mqttx-platform, я могу представить Retrofit для обработки вызовов интерфейса, на самом деле вы можете использовать feign, я думаю, что они похожи), я должен открыть бизнес-ветку для этого. Кстати, писать проекты на javascript так круто, почему вы не подумали?

    Изначально мне нужно было посвятить часть энергии производному проекту mqttx-admin, но позже я обнаружил, что у mqttx всё ещё слишком много дел, и мне пришлось изменить план.

  4. Показатели производительности mqttx могут быть улучшены. Я изменю логику обработки pub/sub в версии v1.1.0.RELEASE.

    В основном StringRedisTemplate => ReactiveStringRedisTemplate, изменение синхронного на асинхронное.

  5. Введение в направление развития.

    ~~Версия v1.0.5.RELEASE становится первой версией LTS mqttx, и версия v1.0 будет поддерживаться и обновляться на её основе. Чтобы улучшить автономную производительность, версия v1.1 будет полностью асинхронной. Последующая поддержка протокола mqtt5, возможно, начнётся с версии v1.0. ~~

    mqttx создаёт две ветки:

    • v1.0: com.jun.mqttx.service.impl интерфейс синхронизации.
    • v1.1: com.jun.mqttx.service.impl изменён на асинхронный интерфейс. Поддерживается mqtt5. Оптимизация механизма обработки сообщений cleanSession

x Оптимизация механизма обработки сообщений cleanSession.

Мост сообщений

x Мост сообщений.

Исправление ошибок и оптимизация

x Исправление ошибок и оптимизация.

v1.0.4.RELEASE

Поддержка websocket

x Поддержка websocket.

Самопроверка состояния кластера

x Самопроверка состояния кластера.

Исправления ошибок и оптимизации

x Исправления ошибок и оптимизации.

v1.0.3.RELEASE

Исправление ошибки

x Исправление ошибки.

v1.0.2.RELEASE

Добавление общих тем в стратегию опроса

x Добавление общих тем в стратегию опроса.

Исправления ошибок и оптимизаций

x Исправления ошибок и оптимизаций.

v1.0.1.RELEASE

Функция поддержки кластера на основе redis

x Функция поддержки кластера на основе redis.

Общая тема поддержки

x Общая тема поддержки.

Тема функции разрешения

x Тема функции разрешения.

Исправления ошибок и оптимизация

x Исправления ошибок и оптимизация.

v1.0.0.RELEASE

Полная реализация протокола mqttv3.1.1

6.2.2 v1.1

v1.1.0.RELEASE (в разработке)

Синхронно-асинхронная реализация redis для повышения производительности

6.3 Benchmark

Условия тестирования просты, результаты приведены только для справки.

Версия: MQTTX v1.0.5.BETA

Инструменты: mqtt-bench

Машина:

Система Процессор Память
win10 i5-4460 16G

6.3.1 CleanSessionTrue

  1. Включить redis.
  2. CleanSession: true.

На самом деле, хранилище сообщений pub не использует redis, причина описана во введении cleanSession в разделе «Поддержка общих тем».

Выполнить команду java -jar -Xmx1g -Xms1g mqttx-1.0.5.BETA.jar.

qos0:

C:\Users\Jun\go\windows_amd64>mqtt-bench.exe -broker=tcp://localhost:1883 -action=pub -clients=1000 -qos=0 -count=1000
2020-09-30 15:33:54.462089 +0800 CST Start benchmark
2020-09-30 15:34:33.6010217 +0800 CST End benchmark

Result: broker=tcp://localhost:1883, clients=1000, totalCount=1000000, duration=39134ms, throughput=25553.23messages/sec

qos1:

C:\Users\Jun\go\windows_amd64>mqtt-bench.exe -broker=tcp://localhost:1883 -action=pub -clients=1000 -qos=1 -count=1000
2020-09-30 15:29:17.9027515 +0800 CST Start benchmark
2020-09-30 15:30:25.0316915 +0800 CST End benchmark

Result: broker=tcp://localhost:1883, clients=1000, totalCount=1000000, duration=67124ms, throughput=14897.80messages/sec

qos2:

C:\Users\Jun\go\windows_amd64>mqtt-bench.exe -broker=tcp://localhost:1883 -action=pub -clients=1000 -qos=2 -count=1000
2020-09-30 15:37:00.0678207 +0800 CST Start benchmark
2020-09-30 15:38:55.4419847 +0800 CST End benchmark

Result: broker=tcp://localhost:1883, clients=1000, totalCount=1000000, duration=115369ms, throughput=8667.84messages/sec
Количество одновременных подключений Поведение Размер одного сообщения Количество сообщений на одно подключение Общее количество сообщений qos Время выполнения qps
1000 Отправка сообщения 1024 байта 1000 Один миллион 0 39,1 с 25 553
1000 Отправка сообщения 1024 байта 1000 Один миллион 1 67,1 с 14 897
1000 Отправка сообщения 1024 байта 1000 Один миллион 2 115,3 с 8667

Потребление ресурсов: процессор — 25 %, память — 440 МБ

6.3.2 CleanSessionFalse

  1. Включить redis.
  2. CleanSession: false.

Выполнить команду java -jar -Xmx1g -Xms1g mqttx-1.0.5.BETA.jar.

qos0:

C:\Users\Jun\go\windows_amd64>mqtt-bench.exe -broker=tcp://localhost:1883 -action=pub -clients=1000 -qos=0 -count=1000
2020-09-30 17:03:55.7560928 +0800 CST Start benchmark
2020-09-30 17:04:36.2080909 +0800 CST End benchmark

Result: broker=tcp://localhost:1883, clients=1000, totalCount=1000000, duration=40447ms, throughput=24723.71messages/sec

qos1:

C:\Users\Jun\go\windows_amd64>mqtt-bench.exe -broker=tcp://localhost:1883 -action=pub -clients=1000 -qos=1 -count=1000
2020-09-30 17:06:18.9136484 +0800 CST Start benchmark
2020-09-30 17:08:20.9072865 +0800 CST End benchmark

Result: broker=tcp://localhost:1883, clients=1000, totalCount=1000000, duration=121992ms, throughput=8197.26messages/sec

qos2: ``` C:\Users\Jun\go\windows_amd64>mqtt-bench.exe -broker=tcp://localhost:1883 -action=pub -clients=1000 -qos=2 -count=1000 2020-09-30 17:09:35.1314262 +0800 CST Start benchmark 2020-09-30 17:13:10.7914125 +0800 CST End benchmark

Результат: broker=tcp://localhost:1883, clients=1000, totalCount=1000000, duration=215656ms, throughput=4637.01messages/sec

| Количество одновременных подключений | Поведение | Размер одного сообщения | Количество сообщений в одном подключении | Общее количество сообщений | QoS | Время выполнения | QPS |
| ------------ | -------- | ------------ | -------------- | -------- | ---- | -------- | ------- |
| `1000` | Отправка сообщения | `1024 байта` | `1000` | Один миллион | `0` | `40.4 с` | `24723` |
| `1000` | Отправка сообщения | `1024 байта` | `1000` | Один миллион | `1` | `121.9 с` | `8197` |
| `1000` | Отправка сообщения | `1024 байта` | `1000` | Один миллион | `2` | `215.6 с` | `4637` |

**Потребление ресурсов: `CPU: 45%`, `память: 440 МБ`**

Комментарии ( 0 )

Вы можете оставить комментарий после Вход в систему

Введение

Проект MQTTX полностью реализует протокол mqtt v3.1.1 и направлен на предоставление простого в использовании и высокопроизводительного брокера mqtt. Расширить Свернуть
Java
Apache-2.0
Отмена

Обновления

Пока нет обновлений

Участники

все

Недавние действия

Загрузить больше
Больше нет результатов для загрузки
1
https://gitlife.ru/oschina-mirror/amazingJun-mqttx.git
git@gitlife.ru:oschina-mirror/amazingJun-mqttx.git
oschina-mirror
amazingJun-mqttx
amazingJun-mqttx
v1.2