
8.11. Migrating MySQL Data into Elasticsearch using Logstash

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html

8.11.1. Installing Logstash

Install the JDBC driver and Logstash:

curl -s https://raw.githubusercontent.com/oscm/shell/master/database/mysql/5.7/mysql-connector-java.sh | bash
curl -s https://raw.githubusercontent.com/oscm/shell/master/search/logstash/logstash-5.x.sh | bash
			

The MySQL JDBC driver is installed at /usr/share/java/mysql-connector-java.jar.
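
The jdbc input plugin is bundled with Logstash 5.x. If your installation is missing it, it can be installed manually (a sketch, assuming the default package installation path):

/usr/share/logstash/bin/logstash-plugin install logstash-input-jdbc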

8.11.2. Configuring Logstash

Create the configuration file /etc/logstash/conf.d/jdbc-mysql.conf. The article table to be indexed has the following structure:

			
mysql> desc article;
+-------------+--------------+------+-----+---------+-------+
| Field       | Type         | Null | Key | Default | Extra |
+-------------+--------------+------+-----+---------+-------+
| id          | int(11)      | NO   |     | 0       |       |
| title       | mediumtext   | NO   |     | NULL    |       |
| description | mediumtext   | YES  |     | NULL    |       |
| author      | varchar(100) | YES  |     | NULL    |       |
| source      | varchar(100) | YES  |     | NULL    |       |
| ctime       | datetime     | NO   |     | NULL    |       |
| content     | longtext     | YES  |     | NULL    |       |
+-------------+--------------+------+-----+---------+-------+
7 rows in set (0.00 sec)

The corresponding Logstash pipeline configuration:

input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from article"
  }
}
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "information"
    document_type => "article"
    document_id => "%{id}"
  }
}
			
			

8.11.3. Starting Logstash

			
root@netkiller /var/log/logstash % systemctl restart logstash

root@netkiller /var/log/logstash % systemctl status logstash
● logstash.service - logstash
   Loaded: loaded (/etc/systemd/system/logstash.service; enabled; vendor preset: disabled)
   Active: active (running) since Mon 2017-07-31 09:35:00 CST; 11s ago
 Main PID: 10434 (java)
   CGroup: /system.slice/logstash.service
           └─10434 /usr/bin/java -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+DisableExplicitGC -Djava.awt.headless=true -Dfi...

Jul 31 09:35:00 netkiller systemd[1]: Started logstash.
Jul 31 09:35:00 netkiller systemd[1]: Starting logstash...
			
root@netkiller /var/log/logstash % cat logstash-plain.log 
[2017-07-31T09:35:28,169][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2017-07-31T09:35:28,172][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2017-07-31T09:35:28,298][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>#<Java::JavaNet::URI:0x453a18e9>}
[2017-07-31T09:35:28,299][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2017-07-31T09:35:28,337][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>50001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "norms"=>false}, "dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date", "include_in_all"=>false}, "@version"=>{"type"=>"keyword", "include_in_all"=>false}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2017-07-31T09:35:28,344][INFO ][logstash.outputs.elasticsearch] Installing elasticsearch template to _template/logstash
[2017-07-31T09:35:28,465][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>[#<Java::JavaNet::URI:0x66df34ae>]}
[2017-07-31T09:35:28,483][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
[2017-07-31T09:35:29,562][INFO ][logstash.pipeline        ] Pipeline main started
[2017-07-31T09:35:29,700][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2017-07-31T09:36:01,019][INFO ][logstash.inputs.jdbc     ] (0.006000s) select * from article	
			
			

8.11.4. Verification

Query Elasticsearch to confirm the documents have been indexed:

			
% curl -XGET 'http://localhost:9200/_all/_search?pretty'
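
To look only at the data imported above (the information index and article type from the configuration), two optional checks:

curl -XGET 'http://localhost:9200/information/_search?pretty'
curl -XGET 'http://localhost:9200/information/article/_count?pretty'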
			
			

8.11.5. Configuration Templates

8.11.5.1. Full import

Suitable for archived data that never changes, or for data that is only ever appended and never updated.

				
input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from article"
  }
}
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "information"
    document_type => "article"
    document_id => "%{id}"
  }
}
				
				

8.11.5.2. Importing multiple tables

Import several tables into Elasticsearch.

				
# multiple inputs on logstash jdbc

input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from article"
    type => "article"
  }
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from comment"
    type => "comment"
  } 
}
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "information"
    document_type => "%{type}"
    document_id => "%{id}"
  }
}
				
				

Add a type setting to each jdbc input, then set document_type => "%{type}" in the elasticsearch output so that each table is indexed under its own type.

8.11.5.3. Incremental replication using the ID primary key

				
input {
  jdbc {
    statement => "SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric"
    # ... other configuration bits
  }
}
				
				

tracking_column_type => "numeric" declares the data type of the id column. If it is not specified, the tracking column is treated as a date by default, which produces queries like this:

[2017-07-31T11:08:00,193][INFO ][logstash.inputs.jdbc     ] (0.020000s) select * from article where id > '2017-07-31 02:47:00'
				

If the replicated data gets out of sync, you can add the clean_run => true option to discard the saved tracking state and start over.
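
A minimal sketch of where the option goes (the rest of the jdbc input stays as above; clean_run simply tells the plugin to ignore the previously saved sql_last_value):

input {
  jdbc {
    # ... other configuration bits as above ...
    clean_run => true   # discard the saved state and start from the beginning
  }
}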

8.11.5.4. Incremental replication using a date column

				
input {
  jdbc {
    statement => "SELECT * FROM my_table WHERE create_date > :sql_last_value"
    use_column_value => true
    tracking_column => "create_date"
    # ... other configuration bits
  }
}
				
				

As above, if the replicated data gets out of sync, add clean_run => true to discard the saved tracking state.

8.11.5.5. Using a SQL file

statement_filepath points to a SQL file. When the SQL is too complex to maintain comfortably inside the statement option, write it to a text file and reference that file with statement_filepath.

				
input {
    jdbc {
        jdbc_driver_library => "/path/to/driver.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        jdbc_connection_string => "jdbc:postgresql://localhost:5432/mydb"
        jdbc_user => "neo"
        jdbc_password => "password"
        statement_filepath => "query.sql"
    }
}				
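
The referenced query.sql file contains just the SQL text. A hypothetical example matching the incremental queries above:

SELECT * FROM article WHERE id > :sql_last_value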
				
				

8.11.5.6. Passing parameters

Put the query's condition values in the parameters option and reference them from the statement:

				
input {
  jdbc {
    jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "mysql"
    parameters => { "favorite_artist" => "Beethoven" }
    schedule => "* * * * *"
    statement => "SELECT * from songs where artist = :favorite_artist"
  }
}				
				
				

8.11.5.7. Controlling how much data JDBC returns

	jdbc_fetch_size => 1000      # JDBC fetch size (rows fetched from the database per round trip)
	jdbc_page_size => 1000       # page size used when paging is enabled
	jdbc_paging_enabled => true  # together with jdbc_page_size, splits the statement into multiple queries, equivalent to: SELECT * FROM table LIMIT 1000 OFFSET 4000
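
Put together, a paged input might look like this sketch (connection settings as in the earlier examples; Logstash wraps the statement in LIMIT/OFFSET queries itself):

input {
  jdbc {
    # ... connection settings as above ...
    jdbc_paging_enabled => true
    jdbc_page_size => 1000
    jdbc_fetch_size => 1000
    statement => "select * from article"
  }
}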
				

8.11.5.8. Sending output to different Elasticsearch clusters

Use conditionals such as if [type] == "news" to run different output blocks, so that each type is written to the host and index you choose.

output {
  if [type] == "news" {
    elasticsearch {
      hosts => "node1.netkiller.cn:9200"
      index => "information"
      document_id => "%{id}"
    }
  }

  if [type] == "comment" {
    elasticsearch {
      hosts => "node2.netkiller.cn:9200"
      index => "information"
      document_id => "%{id}"
    }
  }
}
				
				

8.11.5.9. Date format conversion

Format dates: convert the ISO 8601 date format into %Y-%m-%d %H:%M:%S.

				
input {
	jdbc {
		jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/cms"
		jdbc_user => "cms"
		jdbc_password => "123456"
		schedule => "* * * * *"
		statement => "select * from article limit 5"
	}

}
filter {
	ruby {
		init => "require 'time'"
		code => "event.set('ctime', event.get('ctime').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
	}

	ruby {
		init => "require 'time'"
		code => "event.set('mtime', event.get('mtime').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
	}
}
output {

	stdout {
		codec => rubydebug
	}

}				
				
				

8.11.5.10. Example

The following example copies new rows and updates rows that have changed.

				
input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"	# cron expression; here the query runs once per minute
    statement => "select id, title, description, author, source, ctime, content from article where id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric" 
    record_last_run => true
    last_run_metadata_path => "/var/tmp/article.last"
  }
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"	# cron expression; here the query runs once per minute
    statement => "select * from article where ctime > :sql_last_value"
    use_column_value => true
    tracking_column => "ctime"
    tracking_column_type => "timestamp" 
    record_last_run => true
    last_run_metadata_path => "/var/tmp/article-ctime.last"
  }
}
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "information"
    document_type => "article"
    document_id => "%{id}"
    action => "update"      # action to perform; valid values are ["index", "delete", "create", "update"]
    doc_as_upsert => true   # upsert mode: update the document if it exists, otherwise create it
  }
}
				
				

8.11.6. Handling out-of-sync data (deletes and updates)

The jdbc input plugin can only append: it incrementally writes new rows into Elasticsearch. But the source database frequently deletes or updates rows as well, so the database and the search engine drift out of sync.

If you have a development team, they can of course write code that updates the search engine whenever a row is deleted or updated. If that is not an option, try the approach below.

Here the article table defines its mtime column with ON UPDATE CURRENT_TIMESTAMP, so mtime changes every time the row is updated:

			
mysql> desc article;
+-------------+--------------+------+-----+--------------------------------+-------+
| Field       | Type         | Null | Key | Default                        | Extra |
+-------------+--------------+------+-----+--------------------------------+-------+
| id          | int(11)      | NO   |     | 0                              |       |
| title       | mediumtext   | NO   |     | NULL                           |       |
| description | mediumtext   | YES  |     | NULL                           |       |
| author      | varchar(100) | YES  |     | NULL                           |       |
| source      | varchar(100) | YES  |     | NULL                           |       |
| content     | longtext     | YES  |     | NULL                           |       |
| status      | enum('Y','N')| NO   |     | 'N'                            |       |
| ctime       | timestamp    | NO   |     | CURRENT_TIMESTAMP              |       |
| mtime       | timestamp    | YES  |     | ON UPDATE CURRENT_TIMESTAMP    |       |
+-------------+--------------+------+-----+--------------------------------+-------+
9 rows in set (0.00 sec)
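
If a table does not yet have such a column, it can be added along these lines (a sketch; adjust the column definition to your schema):

ALTER TABLE article
  ADD COLUMN mtime timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;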
			
			

Add an mtime-based query to the Logstash configuration:

			
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"	# cron expression; here the query runs once per minute
    statement => "select * from article where mtime > :sql_last_value"
    use_column_value => true
    tracking_column => "mtime"
    tracking_column_type => "timestamp" 
    record_last_run => true
    last_run_metadata_path => "/var/tmp/article-mtime.last"
  }
			
			
			

Create a recycle-bin table. It is used to handle rows that are deleted from the database or disabled with status = 'N'.

			
CREATE TABLE `elasticsearch_trash` (
  `id` int(11) NOT NULL,
  `ctime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
			
			

Create triggers on the article table:

			
DELIMITER $$

CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` BEFORE UPDATE ON `article` FOR EACH ROW
BEGIN
	-- When the article status changes to 'N', the corresponding document must be removed
	-- from the search engine, so record its id in the trash table.
	IF NEW.status = 'N' THEN
		insert into elasticsearch_trash(id) values(OLD.id);
	END IF;
	-- When the status changes back to 'Y', the article id may still be sitting in
	-- elasticsearch_trash and would be deleted by mistake, so remove the trash record.
	IF NEW.status = 'Y' THEN
		delete from elasticsearch_trash where id = OLD.id;
	END IF;
END$$

CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` BEFORE DELETE ON `article` FOR EACH ROW
BEGIN
	-- When an article is deleted, put its id into the search-engine trash table as well.
	insert into elasticsearch_trash(id) values(OLD.id);
END$$

DELIMITER ;
			
			

Next we need a simple shell script, run once a minute, that reads the ids from the elasticsearch_trash table and calls the Elasticsearch RESTful API with curl to delete the recycled documents; a sketch is shown below.
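
A minimal sketch of such a cleanup script (it reuses the cms database, the information/article index and the credentials from the examples above; adapt the mysql login options to your environment):

#!/bin/bash
# Delete documents listed in elasticsearch_trash from Elasticsearch,
# then remove the processed ids from the trash table.

for id in $(mysql -N -B -u cms -ppassword cms -e "SELECT id FROM elasticsearch_trash"); do
    curl -s -XDELETE "http://localhost:9200/information/article/${id}"
    mysql -u cms -ppassword cms -e "DELETE FROM elasticsearch_trash WHERE id = ${id}"
done

Run it from cron, for example: * * * * * /usr/local/bin/elasticsearch-trash.sh (the path is only an example).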

8.11.7. Modifying the Mapping

The goal is to change the Elasticsearch date format from ISO 8601 to yyyy-MM-dd HH:mm:ss. First stop Logstash and remove the recorded tracking state:
			
systemctl stop logstash

rm -rf /var/tmp/article* 			
			
			

Edit the /etc/logstash/conf.d/jdbc.conf configuration file:

			
input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "123456"
    schedule => "* * * * *"
    statement => "select * from article where id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric" 
    record_last_run => true
    last_run_metadata_path => "/var/tmp/article.last"
  }
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "123456"
    schedule => "* * * * *"	# cron expression; here the query runs once per minute
    statement => "select * from article where ctime > :sql_last_value"
    use_column_value => true
    tracking_column => "ctime"
    tracking_column_type => "timestamp" 
    record_last_run => true
    last_run_metadata_path => "/var/tmp/article-ctime.last"
  }

}

filter {

    ruby {
        code => "event.set('ctime', event.get('[ctime]').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
    }

    ruby {
        code => "event.set('mtime', event.get('[mtime]').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
    }

}

output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "information"
    document_type => "article"
    document_id => "%{id}"
    action => "update"
    doc_as_upsert => true
  }
}
			
			
			

Delete the old index, recreate it, and configure the mapping:

			


curl -XDELETE http://localhost:9200/information

curl -XPUT http://localhost:9200/information
			
curl -XPOST http://localhost:9200/information/article/_mapping -d'
{
        "properties": {
            "title": {
                "type": "text",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_max_word"
            },
            "description": {
                "type": "text",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_max_word"
            },
            "content": {
                "type": "text",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_max_word"
            },
            "ctime": {
               "type":   "date",
               "format": "yyyy-MM-dd HH:mm:ss"
           	},
 			"mtime": {
               "type":   "date",
               "format": "yyyy-MM-dd HH:mm:ss"
           	}
        }
}'

curl "http://localhost:9200/information/article/_mapping?pretty"
			
			

Start Logstash and re-copy the data:

			
rm -rf /var/log/logstash/*
systemctl start logstash
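
Once the pipeline has run again, a quick check (assuming the information index and article type used above) confirms that ctime and mtime are now stored in the yyyy-MM-dd HH:mm:ss format:

curl "http://localhost:9200/information/article/_search?size=1&pretty"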