1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/venus-suite-rocketmq-with-delivery-time

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Это зеркальный репозиторий, синхронизируется ежедневно с исходного репозитория.
Клонировать/Скачать
Внести вклад в разработку кода
Синхронизировать код
Отмена
Подсказка: Поскольку Git не поддерживает пустые директории, создание директории приведёт к созданию пустого файла .keep.
Loading...
README.md

Концепция

В открытой версии RocketMQ только поддерживает сообщения с задержкой, и только 18 уровней задержки. В платных версиях MQ-продукты на платформах Alibaba Cloud и Tencent Cloud поддерживают точность задержки в секундах.

  • Сообщение по расписанию: производитель отправляет сообщение в очередь сообщений RocketMQ, но не ожидает немедленной доставки сообщения, а откладывает доставку до определённого момента времени после текущего момента. Это сообщение называется сообщением по расписанию.
  • Отложенное сообщение: производитель отправляет сообщение в очередь сообщений RocketMQ, но не ждёт немедленной доставки, а задерживает доставку на некоторое время. Это сообщение называется отложенным сообщением.

Сообщения по расписанию и отложенные сообщения имеют некоторые различия в конфигурации кода, но конечный эффект одинаков: сообщение не доставляется сразу после отправки в очередь RocketMQ, а доставляется через фиксированный промежуток времени в соответствии с атрибутами сообщения.

Реализация (4 варианта)

  1. Реализация через прокси [ссылка].
  2. Реализация с использованием временного цикла и delay-commit-log [ссылка].
  3. Реализация с использованием временного цикла и временного файла [ссылка].
  4. Модификация на основе 18 уровней RocketMQ [ссылка].

Сценарии использования

Сообщения по расписанию и отложенные сообщения подходят для следующих сценариев:

  • В транзакциях электронной коммерции, если заказ не оплачен в течение определённого периода времени, отправляется отложенное сообщение. Это сообщение будет доставлено потребителю через 30 минут. Потребитель должен проверить, был ли заказ оплачен. Если нет, то заказ закрывается. Если да, то сообщение игнорируется.
  • Некоторые задачи могут быть запущены по расписанию с помощью сообщений. Например, отправка напоминаний пользователям в определённое время.

Использование

При использовании сообщений по расписанию и отложенных сообщений есть небольшие различия в коде:

Для отправки сообщения по расписанию необходимо указать момент времени после текущей точки отправки как момент доставки сообщения. Для отправки отложенного сообщения необходимо установить задержку. Сообщение будет доставлено через определённый промежуток времени после отправки.

Примечание

Точность сообщений по расписанию составляет около 1–2 секунд. Параметры msg.setStartDeliverTime для сообщений по расписанию и отложенных должны быть установлены после текущей временной метки (в миллисекундах). Если они установлены до текущей временной метки, сообщение будет доставлено немедленно. Параметры msg.setStartDeliverTime могут быть установлены на любое время в пределах 40 дней (в миллисекунах), иначе отправка сообщения завершится неудачно. StartDeliverTime — это момент, когда сервер начинает доставлять сообщение потребителю. Если у потребителя есть накопленные сообщения, сообщения по расписанию и отложенные будут доставлены после них и не будут строго следовать установленному времени доставки. Из-за разницы во времени между клиентом и сервером фактическое время доставки может отличаться от установленного времени.

Как использовать

Рекомендуется использовать версию RocketMQ от Alibaba Cloud.

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.8.4.Final</version>
</dependency>

Отправка сообщения

import com.aliyun.openservices.ons.api.*;
import com.aliyun.openservices.shade.org.apache.commons.lang3.time.DateFormatUtils;

import java.util.Date;
import java.util.Properties;

public class ProducerDelayTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // AccessKey ID阿里云身份验证,在阿里云RAM控制台创建。
        properties.put(PropertyKeyConst.AccessKey, "XXX");
        // AccessKey Secret阿里云身份验证,在阿里云RAM控制台创建。
        properties.put(PropertyKeyConst.SecretKey, "XXX");
        // 设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "localhost:9876");

        Producer producer = ONSFactory.createProducer(properties);
        // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
        producer.start();

        {
            Message msg = new Message(
                    // 您在消息队列RocketMQ版控制台创建的Topic。
                    "TopicTest",
                    // Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版服务器过滤。
                    "TagA",
                    // Message Body可以是任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
                    "演示15秒钟>>> ".getBytes());
            // 设置代表消息的业务关键属性,请尽可能全局唯一。
            // 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。
``` **Настройка Topic в консоли RocketMQ**

Message msg = new Message("YOUR TOPIC",  
    /*设置消息的Tag。*/
    "YOUR MESSAGE TAG",
    /*消息内容。*/
    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

/* Отправка отложенного сообщения, необходимо задать время задержки в миллисекундах (ms), сообщение будет доставлено через указанное время задержки, например, сообщение будет доставлено через 3 секунды. */
long delayTime = System.currentTimeMillis() + 3000;
msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));

/**
* Если необходимо отправить сообщение по расписанию, то нужно задать время расписания, сообщение будет отправлено в указанное время, например, сообщение будет отправлено 10 августа 2021 года в 18:45:00.
* Формат времени расписания: yyyy-MM-dd HH:mm:ss, если заданное время метки раньше текущего времени, то сообщение будет немедленно доставлено потребителю.
* long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-10 18:45:00").getTime();
* msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(timeStamp));
*/
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);

## **Совершенный UI-менеджмент**
### **Управление темами**
![](docs/img/a.png)

### **Групповое управление**
![](docs/img/f.png)

### **Управление сообщениями**

#### **Мощный поиск сообщений (поддерживает различные измерения)**

![](docs/img/search.png)

![](docs/img/result.png)

#### **Подробное представление сообщений (просмотр сообщений в формате JSON, поиск истории сообщений)**

![](docs/img/detail.png)

### **Свяжитесь с нами**

![](docs/img/qrcode.jpg)

Комментарии ( 0 )

Вы можете оставить комментарий после Вход в систему

Введение

Описание недоступно Расширить Свернуть
Java и 6 других языков
Apache-2.0
Отмена

Обновления

Пока нет обновлений

Участники

все

Недавние действия

Загрузить больше
Больше нет результатов для загрузки
1
https://gitlife.ru/oschina-mirror/venus-suite-rocketmq-with-delivery-time.git
git@gitlife.ru:oschina-mirror/venus-suite-rocketmq-with-delivery-time.git
oschina-mirror
venus-suite-rocketmq-with-delivery-time
venus-suite-rocketmq-with-delivery-time
main