Logstash是一个开源数据收集引擎,具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地。
首先下载与es版本对应的logstash,解压到/usr/local/logstash-6.4.0
1.jdbc-connector的jar包
maven 仓库下载 mysql-connector-java
把mysql-connector-java-6.0.6.jar
包放在
/usr/local/logstash-6.4.0/lib/mysql-connector-java-6.0.6.jar
[========]
2.mysql文件下创建文件
在/usr/local/logstash-6.4.0/bin
目录下创建mysql文件夹
1.执行mysql操作的.sql文件 jdbc.conf
input {
stdin {
}
jdbc {
# mysql 数据库链接
jdbc_connection_string => "jdbc:mysql://127.0.0.1/video?characterEncoding=UTF-8&useSSL=false&autoReconnect=true&serverTimezone=Asia/Shanghai"
# 用户名和密码
jdbc_user => "root"
jdbc_password => "root"
# 驱动
jdbc_driver_library => "/usr/local/logstash-6.4.0/lib/mysql-connector-java-6.0.6.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
# 执行的sql 文件路径+名称
statement_filepath => "mysql/jdbc.sql"
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
#schedule => "0 1 * * *"
#处理中文乱码问题
codec => plain { charset => "UTF-8"}
# 索引类型
type => "product"
lowercase_column_names => false
#是否记录最后一次运行内容
record_last_run => true
# 是否适用列元素
use_column_value => true
# 追踪的元素名,对应保存到es上面的字段名而不是数据库字段名
tracking_column => "updated_at"
# 默认为number,如果为日期必须声明为timestamp
tracking_column_type => "timestamp"
# 设置记录的路径
last_run_metadata_path => "mysql/prodcut_last_time"
# 每次运行是否清除
clean_run => false
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
if [type]=="product"{
elasticsearch {
hosts => ["211.147.6.230:9200"]
# 索引名称
index => "collect"
# type名称
document_type => "product"
# 文档id,inquiryId为sql文件中查询出的字段名
document_id => "%{id}"
}
stdout {
# JSON格式输出
codec => json_lines
}
}
}
2.jdbc.sql 配置文件.conf
SELECT `a`.`id`,`a`.`catid`,`a`.`catids`,`a`.`title`,`f`.`name`, `a`.`status`,a.updated_at FROM `c_product` `a` LEFT JOIN `c_company` `f` ON f.id = a.cid WHERE a.updated_at > :sql_last_value order by a.updated_at
启动logstash
./logstash -f ./mysql/jdbc.conf