1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/happyjoejoe-go-bin2es

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Это зеркальный репозиторий, синхронизируется ежедневно с исходного репозитория.
Клонировать/Скачать
Внести вклад в разработку кода
Синхронизировать код
Отмена
Подсказка: Поскольку Git не поддерживает пустые директории, создание директории приведёт к созданию пустого файла .keep.
Loading...
README.md

go-bin2es — это сервис, синхронизирующий binlog с es.

Архитектура:

go-mysql используется для фильтрации данных из указанной базы данных и таблицы. Затем отфильтрованные данные binlog обновляются в Elasticsearch 7 с помощью метода конфигурации (./config/binlog2es.json).

Особенности:

  • Поддержка последней версии Elasticsearch 7.
  • Поддержка полного и инкрементального импорта данных.
  • Лёгкая и компактная архитектура, занимающая мало памяти и ресурсов процессора.
  • Встроенная поддержка объектов и вложенных массивов в Elasticsearch.
  • Высокая скорость и низкая задержка.
  • Гарантия окончательной согласованности данных.
  • Основан на etcd, поддерживает главный-подчиненный режим.
  • Не поддерживает операцию delete.
  • Поддерживает пользовательские функции UserDefinedFunc для обработки данных Elasticsearch, расширяя возможности.

Пример

create database test;

create table Parent (
    id int not null auto_increment primary key,
    name varchar(64) not null,
    sex char(1) not null
)comment = '父';

create table Child (
    id int not null auto_increment primary key,
    name varchar(64) not null,
    sex char(1) not null, 
    parent_id int not null
)comment = '子';

Parent:
insert into Parent (name, sex) values ('Tom', "m"); -- id:1 
Child:
insert into Child (name, sex, parent_id) values ('Tom1', 'm', 1); -- Tom's child1 
insert into Child (name, sex, parent_id) values ('Tom2', 'm', 1); -- Tom's child2 
insert into Child (name, sex, parent_id) values ('Tom3', 'm', 1); -- Tom's child3 

Parent:
insert into Parent (name, sex) values ('Jerry', "m"); -- id:2 
Child:
insert into Child (name, sex, parent_id) values ('Jerry1', 'm', 2); -- Jerry's child1 
insert into Child (name, sex, parent_id) values ('Jerry2', 'f', 2); -- Jerry's child2

Настройка config.toml

[bin2es]
sync_ch_len = 64

[es]
nodes = [
    "http://localhost:9200" #es集群
]
user = "elastic"
passwd = "123456"
enable_authentication = false #开启认证
insecure_skip_verify = true #是否忽略证书校验
bulk_size = 1024  #批量刷新个数
flush_duration = 500  #批量刷新时间间隔, 单位:ms

[mysql]
addr = "localhost"
port = 3306
user = "bin2es"
pwd = "bin2es"
charset = "utf8mb4"
server_id = 3 #与其他slave节点的server_id不同即可

[master_info]
addr = "localhost"
port = 3306
user = "bin2es"
pwd = "bin2es"
charset = "utf8mb4"
schema = "bin2es"
table = "master_info"
flush_duration = 3  #binlog position 刷新时间间隔, 单位:s

[zk]
enable = false
lock_path = "/go-bin2es-lock"
session_timeout = 1
hosts = [
    "localhost:2181"
]

[etcd]
enable = true
enable_tls = true
lock_path = "/go-bin2es-lock"
dial_timeout = 3
cert_path = "/etc/etcd/etcdSSL/etcd.pem"
key_path = "/etc/etcd/etcdSSL/etcd-key.pem"
ca_path = "/etc/etcd/etcdSSL/ca.pem"
endpoints = [
    "127.0.0.1:2379",
]

[[source]]
schema = "test"
dump_table = "Parent"
tables = [
    "Parent",
    "Child"
]

Настойка bin2es.json

[
    {
        "schema":"test",
        "tables":[
            "Parent",
            "Child"
        ],
        "actions":[
            "insert",
            "update"
        ],
        "pipeline":[
            {
                "DoSQL":{
                    "sql":"SELECT Parent.id, Parent.name, Parent.sex, group_concat(concat_ws('_', Child.name, Child.sex) separator ',') as Childs FROM Parent join Child on Parent.id = Child.parent_id WHERE (?) GROUP BY Parent.id",
                    "placeholders":{              #若遇到Parent或Child, 自动将`?`替换为`$key.$value = {该表对应的$value对应的字段的值}`
                        "Parent":"id",      #eg: 若遇到Parent, 则`?`被替换为`Parent.id = 行数据对应的`id`字段的值`
                        "Child":"parent_id" #eg: 若遇到Child, 则`?`被替换为`Child.parent_id = 行数据对应的`parent_id`字段的值`
                    }
                }
            },
            {
                "Object":{
                    "common":"profile",
                    "fields":{
                        "name":"es_name",   #将查询到的结果的`name`字段放进`profile`的`es_name`下
                        "sex":"es_sex"      #同理
                    }
                }
            },
            {
                "NestedArray":{
                    "sql_field":"Childs",     #将查询到的结果的`Childs`字段解析放入到common指定的`childs`下
                    "common":"childs",
                    "pos2fields":{            #其中解析结果的第一个位置放入到es的`es_name`下, 第二й позиция помещается в `es_sex` ниже
                        "es_name":1,
                        "es_sex":2
                    },

``` ```
fields_seprator: _
group_separator: ,
}
},
{
SetDocID: {
_id: "id"
}
}
],
dest: {
index: "test_es" # es的索引名
}

execute ./bin/go-bin2es then, you will get results in es by using following API: GET /test_es/_search

{
took: 0,
timed_out: false,
_shards: {
total: 1,
successful: 1,
skipped: 0,
failed: 0
},
hits: {
total: {
value: 2,
relation: "eq"
},
max_score: 1.0,
hits: [
{
_index: "test_es",
_type: "_doc",
_id: "1",
_score: 1.0,
_source: {
childs: [  #nested array
{
chl_name: "Tom1",
chl_sex: "f"
},
{
chl_name: "Tom2",
chl_sex: "f"
},
{
chl_name: "Tom3",
chl_sex: "m"
}
],
id: "1",
profile: { # object
es_name: "Tom",
es_sex: "m"
}
}
},
{
_index: "test_es",
_type: "_doc",
_id: "2",
_score: 1.0,
_source: {
childs: [  #nested array
{
chl_name: "Jerry1",
chl_sex: "m"
},
{
chl_name: "Jerry2",
chl_sex: "f"
}
],
id: "2",
profile: { # object
es_name: "Jerry",
es_sex: "m"
}
}
}
]
}
}

Если вы считаете это полезным, можете вознаградить меня бутылкой Colo.


Комментарии ( 0 )

Вы можете оставить комментарий после Вход в систему

Введение

Мониторинг данных binlog MySQL и их парсинг в промежуточное ПО Elasticsearch 7. Поддерживаются полное и инкрементальное импортирование. Есть возможность настраивать функции фильтрации для обработки данных binlog. Высокая доступность и хорошая расширяемость! Расширить Свернуть
MIT
Отмена

Обновления

Пока нет обновлений

Участники

все

Недавние действия

Загрузить больше
Больше нет результатов для загрузки
1
https://gitlife.ru/oschina-mirror/happyjoejoe-go-bin2es.git
git@gitlife.ru:oschina-mirror/happyjoejoe-go-bin2es.git
oschina-mirror
happyjoejoe-go-bin2es
happyjoejoe-go-bin2es
master