go-bin2es — это сервис, синхронизирующий binlog с es.
go-mysql используется для фильтрации данных из указанной базы данных и таблицы. Затем отфильтрованные данные binlog обновляются в Elasticsearch 7 с помощью метода конфигурации (./config/binlog2es.json).
create database test;
create table Parent (
id int not null auto_increment primary key,
name varchar(64) not null,
sex char(1) not null
)comment = '父';
create table Child (
id int not null auto_increment primary key,
name varchar(64) not null,
sex char(1) not null,
parent_id int not null
)comment = '子';
Parent:
insert into Parent (name, sex) values ('Tom', "m"); -- id:1
Child:
insert into Child (name, sex, parent_id) values ('Tom1', 'm', 1); -- Tom's child1
insert into Child (name, sex, parent_id) values ('Tom2', 'm', 1); -- Tom's child2
insert into Child (name, sex, parent_id) values ('Tom3', 'm', 1); -- Tom's child3
Parent:
insert into Parent (name, sex) values ('Jerry', "m"); -- id:2
Child:
insert into Child (name, sex, parent_id) values ('Jerry1', 'm', 2); -- Jerry's child1
insert into Child (name, sex, parent_id) values ('Jerry2', 'f', 2); -- Jerry's child2
[bin2es]
sync_ch_len = 64
[es]
nodes = [
"http://localhost:9200" #es集群
]
user = "elastic"
passwd = "123456"
enable_authentication = false #开启认证
insecure_skip_verify = true #是否忽略证书校验
bulk_size = 1024 #批量刷新个数
flush_duration = 500 #批量刷新时间间隔, 单位:ms
[mysql]
addr = "localhost"
port = 3306
user = "bin2es"
pwd = "bin2es"
charset = "utf8mb4"
server_id = 3 #与其他slave节点的server_id不同即可
[master_info]
addr = "localhost"
port = 3306
user = "bin2es"
pwd = "bin2es"
charset = "utf8mb4"
schema = "bin2es"
table = "master_info"
flush_duration = 3 #binlog position 刷新时间间隔, 单位:s
[zk]
enable = false
lock_path = "/go-bin2es-lock"
session_timeout = 1
hosts = [
"localhost:2181"
]
[etcd]
enable = true
enable_tls = true
lock_path = "/go-bin2es-lock"
dial_timeout = 3
cert_path = "/etc/etcd/etcdSSL/etcd.pem"
key_path = "/etc/etcd/etcdSSL/etcd-key.pem"
ca_path = "/etc/etcd/etcdSSL/ca.pem"
endpoints = [
"127.0.0.1:2379",
]
[[source]]
schema = "test"
dump_table = "Parent"
tables = [
"Parent",
"Child"
]
[
{
"schema":"test",
"tables":[
"Parent",
"Child"
],
"actions":[
"insert",
"update"
],
"pipeline":[
{
"DoSQL":{
"sql":"SELECT Parent.id, Parent.name, Parent.sex, group_concat(concat_ws('_', Child.name, Child.sex) separator ',') as Childs FROM Parent join Child on Parent.id = Child.parent_id WHERE (?) GROUP BY Parent.id",
"placeholders":{ #若遇到Parent或Child, 自动将`?`替换为`$key.$value = {该表对应的$value对应的字段的值}`
"Parent":"id", #eg: 若遇到Parent, 则`?`被替换为`Parent.id = 行数据对应的`id`字段的值`
"Child":"parent_id" #eg: 若遇到Child, 则`?`被替换为`Child.parent_id = 行数据对应的`parent_id`字段的值`
}
}
},
{
"Object":{
"common":"profile",
"fields":{
"name":"es_name", #将查询到的结果的`name`字段放进`profile`的`es_name`下
"sex":"es_sex" #同理
}
}
},
{
"NestedArray":{
"sql_field":"Childs", #将查询到的结果的`Childs`字段解析放入到common指定的`childs`下
"common":"childs",
"pos2fields":{ #其中解析结果的第一个位置放入到es的`es_name`下, 第二й позиция помещается в `es_sex` ниже
"es_name":1,
"es_sex":2
},
``` ```
fields_seprator: _
group_separator: ,
}
},
{
SetDocID: {
_id: "id"
}
}
],
dest: {
index: "test_es" # es的索引名
}
execute ./bin/go-bin2es
then, you will get results in es by using following API: GET /test_es/_search
{
took: 0,
timed_out: false,
_shards: {
total: 1,
successful: 1,
skipped: 0,
failed: 0
},
hits: {
total: {
value: 2,
relation: "eq"
},
max_score: 1.0,
hits: [
{
_index: "test_es",
_type: "_doc",
_id: "1",
_score: 1.0,
_source: {
childs: [ #nested array
{
chl_name: "Tom1",
chl_sex: "f"
},
{
chl_name: "Tom2",
chl_sex: "f"
},
{
chl_name: "Tom3",
chl_sex: "m"
}
],
id: "1",
profile: { # object
es_name: "Tom",
es_sex: "m"
}
}
},
{
_index: "test_es",
_type: "_doc",
_id: "2",
_score: 1.0,
_source: {
childs: [ #nested array
{
chl_name: "Jerry1",
chl_sex: "m"
},
{
chl_name: "Jerry2",
chl_sex: "f"
}
],
id: "2",
profile: { # object
es_name: "Jerry",
es_sex: "m"
}
}
}
]
}
}
Если вы считаете это полезным, можете вознаградить меня бутылкой Colo.
You can comment after Login
Inappropriate content may be displayed here and will not be shown on the page. You can check and modify it through the relevant editing function
If you confirm that the content does not involve inappropriate language/advertisement redirection/violence/vulgar pornography/infringement/piracy/false/insignificant or illegal content related to national laws and regulations, you can click submit to make an appeal, and we will handle it as soon as possible.
Comments ( 0 )