BBoss — хороший Java-клиент для работы с Elasticsearch по протоколу REST. Он работает с Elasticsearch аналогично тому, как это делает mybatis.
Требуется версия 1.8 или выше.
Поддерживаются версии:
Поддерживаются версии:
Демонстрация инструмента подходит для новых версий клиента Kafka и позволяет быстро написать, упаковать и опубликовать инструмент для импорта данных, который можно запустить в контейнерной среде.
Поддерживаемые версии Kafka:
Поддерживаемые версии Elasticsearch:
Инструмент поддерживает функцию импорта больших объёмов данных уровня PB.
Подробнее см. в справочном документе.
<dependency>
<groupId>com.bbossgroups.plugins</groupId>
<artifactId>bboss-datatran-kafka2x</artifactId>
<version>7.2.0</version>
<scope>compile</scope>
</dependency>
В зависимости от версии сервера Kafka необходимо импортировать и настроить версию клиента Kafka.
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>2.4.0</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-tools</artifactId>
<version>2.4.0</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.4.0</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
<scope>compile</scope>
</dependency>
Для сборки и публикации версии требуется Gradle. Подробные инструкции по установке и настройке Gradle см. здесь.
Исходный код проекта можно скачать с этого адреса и импортировать в IDE, например, IDEA или Eclipse. Затем можно изменить логику программы в соответствии со своими потребностями.
Чтобы протестировать и отладить функцию импорта, запустите метод main класса Kafka2ESdemo:
public class Kafka2ESdemo {
public static void main(String[] args){
Kafka2ESdemo dbdemo = new Kafka2ESdemo();
boolean dropIndice = true;//CommonLauncher.getBooleanAttribute("dropIndice",false);//同时指定了默认值
dbdemo.scheduleTimestampImportData(dropIndice);
}
.....
}
Например, для MySQL, но также поддерживаются другие базы данных и функции изменения данных. Чтобы протестировать и отладить функцию импорта, запустите метод main класса Kafka2DBdemo:
public class Kafka2DBdemo {
public static void main(String[] args){
Kafka2DBdemo dbdemo = new Kafka2DBdemo();
boolean dropIndice =
``` **Модификация es конфигурации — kafka2x-elasticsearch\src\main\resources\application.properties**
После завершения модификации конфигурации можно приступать к функциональной отладке.
После успешного тестирования и отладки можно приступить к сборке и выпуску рабочей версии: перейти в режим командной строки, в корневом каталоге исходного кода kafka2x-elasticsearch выполнить следующую команду gradle для упаковки и выпуска версии:
release.bat
## Выполнение задания
После успешной сборки gradle в каталоге build/distributions будет создан исполняемый zip-пакет, который можно распаковать и запустить.
Linux:
chmod +x restart.sh
./restart.sh
Windows: restart.bat
## Конфигурация JVM задания
Измените файл jvm.options, чтобы установить размер памяти и другие параметры JVM:
-Xms1g
-Xmx1g
## Настройка параметров задания
При использовании [kafka2x-elasticsearch ](https://github.com/bbossgroups/kafka2x-elasticsearch) во избежание постоянного перекомпоновки и выпуска данных синхронизации инструментов в процессе отладки, можно настроить некоторые параметры управления в файле запуска resources/application.properties, а затем получить эти параметры в коде следующим образом:
```properties
# Инструмент основной программы
mainclass=org.frameworkset.elasticsearch.imp.Kafka2ESdemo
# mainclass=org.frameworkset.elasticsearch.imp.Kafka2DBdemo
# Настройка параметров
# Метод получения в коде: CommonLauncher.getBooleanAttribute("dropIndice",false);// одновременно указано значение по умолчанию false
dropIndice=false
В коде метод получения параметра dropIndice:
boolean dropIndice = CommonLauncher.getBooleanAttribute("dropIndice",false); // одновременно указано значение по умолчанию false
Также можно настроить некоторые параметры выполнения задания в resources/application.properties, такие как количество рабочих потоков, размер очереди ожидания и размер пакета обработки и т. д.:
queueSize=50
workThreads=10
batchSize=20
Метод получения и использования этих параметров в задании:
int batchSize = CommonLauncher.getIntProperty("batchSize",10); // одновременно указано значение по умолчанию
int queueSize = CommonLauncher.getIntProperty("queueSize",50); // одновременно указано значение по умолчанию
int workThreads = CommonLauncher.getIntProperty("workThreads",10); // одновременно указано значение по умолчанию
importBuilder.setBatchSize(batchSize);
importBuilder.setQueue(queueSize); // установить длину очереди пула потоков пакетного импорта
importBuilder.setThreadCount(workThreads); // установить количество рабочих потоков пула потоков пакетного импорта
Команда bin/kafka-console-consumer.sh --zookeeper 10.19.85.65:2185 --topic mysqlbinlog
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарии ( 0 )