Logstash是一个开源数据收集引擎,具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地。

首先下载与es版本对应的logstash,解压到/usr/local/logstash-6.4.0

1.jdbc-connector的jar包

maven 仓库下载 mysql-connector-java

mysql-connector-java-6.0.6.jar包放在

/usr/local/logstash-6.4.0/lib/mysql-connector-java-6.0.6.jar

[========]

2.mysql文件下创建文件

/usr/local/logstash-6.4.0/bin 目录下创建mysql文件夹

1.执行mysql操作的.sql文件 jdbc.conf

input {
    stdin {
    }
      jdbc {
          # mysql 数据库链接
          jdbc_connection_string => "jdbc:mysql://127.0.0.1/video?characterEncoding=UTF-8&useSSL=false&autoReconnect=true&serverTimezone=Asia/Shanghai"

          # 用户名和密码
          jdbc_user => "root"
          jdbc_password => "root"
          # 驱动
          jdbc_driver_library => "/usr/local/logstash-6.4.0/lib/mysql-connector-java-6.0.6.jar"
          # 驱动类名
          jdbc_driver_class => "com.mysql.jdbc.Driver"
          jdbc_paging_enabled => "true"
          jdbc_page_size => "50000"
          # 执行的sql 文件路径+名称
          statement_filepath => "mysql/jdbc.sql"
          # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
          schedule => "* * * * *"
          #schedule => "0 1 * * *"

          #处理中文乱码问题
          codec => plain { charset => "UTF-8"}
          # 索引类型
          type => "product"
          lowercase_column_names => false
          #是否记录最后一次运行内容
          record_last_run => true
          # 是否适用列元素
          use_column_value => true
          # 追踪的元素名,对应保存到es上面的字段名而不是数据库字段名
          tracking_column => "updated_at"
          # 默认为number,如果为日期必须声明为timestamp
          tracking_column_type => "timestamp"
          # 设置记录的路径
          last_run_metadata_path => "mysql/prodcut_last_time"
          # 每次运行是否清除
          clean_run => false
        }
}

filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}

output {
   if [type]=="product"{
        elasticsearch {
            hosts => ["211.147.6.230:9200"]
           # 索引名称
            index => "collect"
          # type名称
            document_type => "product"
          # 文档id,inquiryId为sql文件中查询出的字段名
            document_id => "%{id}"
        }
        stdout {
            # JSON格式输出
            codec => json_lines
        }
   }
}

2.jdbc.sql 配置文件.conf

SELECT `a`.`id`,`a`.`catid`,`a`.`catids`,`a`.`title`,`f`.`name`, `a`.`status`,a.updated_at FROM `c_product` `a` LEFT JOIN `c_company` `f` ON f.id = a.cid WHERE a.updated_at > :sql_last_value order by a.updated_at
启动logstash
./logstash -f ./mysql/jdbc.conf 
Last modification:September 18, 2019
如果觉得我的文章对你有用,请随意赞赏