logstash 增量定时同步mysql数据到elasticsearch集群

近期文章:单机器如何实现 Nginx 百万并发连接

线上搜索业务,使用logstash将 mysql数据同步到阿里云 es集群中,一开始数据量小,基本上都是全量同步,后面数据量大了,跟研发沟通改用增量,且定时同步(搜索不要求实时同步)mysql数据到es集群中,减少数据库查询和 es压力写入压力

安装 logstash

cd /usr/local/data/
wget https://mirrors.huaweicloud.com/logstash/7.10.0/logstash-7.10.0-linux-x86_64.tar.gz
tar zxvf logstash-7.10.0-linux-x86_64.tar.gz
#下载mysql.jdbc.Driver
wget https://cdn.mysql.com/archives/mysql-connector-java-8.0/mysql-connector-j-8.0.33.zip
unzip mysql-connector-j-8.0.33.zip
#下面只会用到目录下mysql-connector-j-8.0.33.jar

logstash 配置文件

#第一部分字段详解,看下面内容
input {
    jdbc {
        jdbc_connection_string => "jdbc:mysql://数据库连接地址:3306/数据库库名?characterEncoding=UTF-8&useSSL=false&autoReconnect=true&characterSetResults=UTF-8&zeroDateTimeBehavior=convertToNull"
        jdbc_user => "your-username"
        jdbc_password => "your-password"
        jdbc_validate_connection => true
        #配置mysql.jdbc.Driver
        jdbc_driver_library => "/usr/local/data/mysql-connector-j-8.0.33/mysql-connector-j-8.0.33.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "10000"
        jdbc_default_timezone => "Asia/Shanghai"
	statement => "SELECT * FROM your_table WHERE updated_at > :sql_last_value"
	tracking_column_type => "timestamp"
        tracking_column => "updated_at"
        use_column_value => true
        #clean_run => true
        #last_run_metadata_path => "syncpoint_v1"
	codec => plain { charset => "UTF-8"}
	#schedule => "0 */1 * * *" #测试时先关闭定时,不然 logstash 启动之后一直没有执行同步,比如我这里要等一小时才执行一次
    }
}
#第二部分字段详解,看下面内容
filter {
	json {
	       	source => "message"
        	remove_field => ["message"]
    	}
	mutate {
                split => { "tags" => "," }
    		split => { "type_ids" => "," }
    		split => { "style_ids" => "," }
    		split => { "scene_ids" => "," }
    		split => { "language_ids" => "," }
    		split => { "system_ids" => "," }
    		split => { "character_ids" => "," }
	}
        #因为时区问题需要修正时间
        ruby {
                code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
        }
        ruby {
                code => "event.set('@timestamp',event.get('timestamp'))"
        }
       ruby {
               code => "event.set('save_time', event.get('save_time').time.localtime + 8*60*60)"
       }
       ruby {
               code => "event.set('up_time', event.get('up_time').time.localtime + 8*60*60)"
       }
       ruby {
               code => "event.set('edit_time', event.get('edit_time').time.localtime + 8*60*60)"
       }
	#移除timestamp
        mutate {
                remove_field => ["timestamp"]
        }
}

#第三部分字段详解,看下面内容
output {
    stdout {
#       codec => json_lines
        codec => rubydebug
}


#第四部分字段详解,看下面内容
elasticsearch {
	codec => plain { charset => "UTF-8"}
	action => "index"
        index => "font_v1" #索引名称
        document_type => "font"
        document_id => "%{id}"
        hosts => "http://es请求地址:9200"
	user => ""
        password => ""
    }
}

第一部分字段解释

jdbc_connection_string:MySQL 数据库的连接字符串。
jdbc_user 和 jdbc_password:数据库的用户名和密码。
jdbc_driver_library 和 jdbc_driver_class:指定 MySQL 驱动的路径和类。
jdbc_paging_enabled 和 jdbc_page_size:启用分页,并设置每页的数据量。
jdbc_default_timezone:数据库的时区设置。
statement:SQL 查询语句,用于从数据库中选择数据。这里使用了增量同步,只选择更新时间在上次同步时间之后的数据。
tracking_column_type 和 tracking_column:配置增量同步追踪列,用于记录上次同步时数据库表中哪一列的数值,以便下一次同步时只选择该列数值发生变化的数据。
last_run_metadata_path:记录同步时间的文件路径。
use_column_value:使用 tracking_column 列的当前值来确定同步的起始点,进行更准确的增量同步。
clean_run:如果设置为 true,将在启动时清除 last_run_metadata_path,重新同步所有数据。
schedule:定时同步的 CRON 表达式,这里设置为每小时执行一次。

备注:
last_run_metadata_path 和 use_column_value 在 Logstash 中都用于实现增量同步,但它们的实现机制略有不同
last_run_metadata_path:
作用: 用于记录上一次同步的时间戳,通常存储在一个文件中。
使用场景: 适用于需要根据时间戳进行增量同步的场景,比如 WHERE updated_at > last_sync_timestamp。
优点: 实现简单,易于理解。
缺点: 如果同步过程中出现异常,可能会导致时间戳记录不准确,需要手动处理。
use_column_value:
作用: 使用指定列的当前值来确定同步的起始点。
使用场景: 适用于具有递增或唯一性的列,比如自增主键或时间戳,用于基于列值进行增量同步。
优点: 更精确,不容易受到异常情况的影响。
缺点: 需要确保指定的列在同步过程中不会出现重复、缺失或异常值。

选择建议:
如果你的表中有一个递增或唯一性的列,而且这个列的值在同步过程中是可靠的,那么使用 use_column_value 是一个更稳妥的选择。
如果时间戳是你的同步参考,同时你可以容忍在异常情况下手动修复时间戳,那么 last_run_metadata_path 也是一个不错的选择。

第二部分字段详解

filter更多内容请看官方文档

https://www.elastic.co/guide/en/logstash/7.10/filter-plugins.html

json 插件: 这个插件用于解析输入中的 JSON 数据。
source => "message" 指定要解析的字段是 "message",而 remove_field => ["message"] 表示解析后移除原始的 "message" 字段。
mutate 插件: 这个插件提供了对事件进行修改的功能。在这里,使用了 split 操作,将一些字段按照逗号分割为数组。这样,原始数据中逗号分割的字符串就被拆分成了数组形式。
ruby 插件: 在这里,使用了 Ruby 代码来修正时间戳的时区event.set('timestamp',event.get('@timestamp').time.localtime + 8*60*60) 将 @timestamp 字段的时间戳由 UTC 转换为东八区时间。
mutate 插件: 又一次使用 mutate 插件来移除原始的 "timestamp" 字段,因为这个字段在经过 Ruby 插件的处理后,已经被更新到了 "@timestamp" 字段中。

第三部分字段详解

output 部分,用于指定 Logstash 处理完数据后的输出目的地,
stdout 插件: 这个插件用于将事件输出到控制台,主要用于测试和调试。
JSON Lines 格式: 每行是一个独立的 JSON 对象,这在处理大量日志时非常有用。每个日志事件以一行 JSON 形式表示,这样可以逐行读取和处理,而不需要将整个文件加载到内存中,这样的格式适合于各种分析工具和数据存储系统,使其更容易处理和查询
codec => rubydebug 表示使用 Ruby 格式进行调试输出,这样输出的信息更易于阅读。
如果你启动 Logstash 并查看输出,你将在控制台上看到经过处理的事件以更易读的形式显示。
json格式输出
ruby格式输出

第四部分字段详解

在 Logstash 的 Elasticsearch 输出插件中,action 参数指定要执行的 Elasticsearch 操作。主要的取值有:
action => "index": 这是最常用的操作。它表示要将文档添加到 Elasticsearch 索引中。如果文档的 ID 已经存在,它将被替换。如果不存在,则会创建一个新文档。这是通常的写入操作。
action => "update": 表示要在 Elasticsearch 中更新文档。这通常涉及到根据文档的 ID 更新部分字段的值,而不是替换整个文档。
action => "delete": 表示要从 Elasticsearch 中删除文档。这是删除文档的操作。
action => "create": 表示只在文档的 ID 不存在时创建文档。如果文档 ID 已经存在,它将不执行任何操作。
这些操作对应于 Elasticsearch 的 Index API、Update API、Delete API 和 Create API。在 Logstash 配置中,通常使用默认的 "index" 操作,因为它能满足大多数场景的需求。

启动logstash

注意:启动前es中先建好索引,我这边es索引是研发同学建的

/usr/local/data/logstash-7.10.0/bin/logstash \
-f /usr/local/data/logstash-7.10.0/bin/mysql/test.conf \
--path.data=/usr/local/data/logstash-7.10.0/data/test
#下面是logstash控制台输出的一小部分结果
ruby输出的结果

控制台正常输出数据后去es那边看下索引,是否有数据,如果es没数据,就是索引没建好,让研发同学重建

使用5年过程中遇到的问题总结:

1、如果多索引多个配置文件,下面的数据目录不要重复

–path.data=/usr/local/data/logstash-7.10.0/data/test #这个目录不要重启,否则将无法启动 logstash

2、如果数据量大 logstash默认的内存不够,可适当增加jvm内存

jvm内存

3、如果数据量大logstash同步到 es很慢, 可以增加配置文件中这个参数 jdbc_page_size => “50000” #值更加实际情况修改

4、服务器不需要很高的配置,我这里20 多个索引同步,配置使用的是 2c,16G(后续加的内存)

服务器配置

Comments

No comments yet. Why don’t you start the discussion?

发表评论