
71.2. ElasticSearch + Logstash + Kibana

Official site: https://www.elastic.co

71.2.1. Installation

71.2.1.1. 8.x

One-step install with dnf
				
curl -s https://raw.githubusercontent.com/netkiller/shell/master/search/elastic/elastic-8.x.sh | bash
				
				

Manual installation

			
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

cat >> /etc/yum.repos.d/logstash.repo <<EOF
[logstash-8.x]
name=Elastic repository for 8.x packages
baseurl=https://artifacts.elastic.co/packages/8.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF

dnf install -y logstash

cp /etc/logstash/logstash.yml{,.original}
chown logstash:logstash -R /etc/logstash

systemctl daemon-reload
systemctl enable logstash.service
systemctl start logstash.service
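
To confirm the installation, a quick sanity check is to print the version:

/usr/share/logstash/bin/logstash --version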
			
				

Change the user the service runs as, otherwise Logstash will fail to start:

			
[root@netkiller ~]# vim /usr/lib/systemd/system/logstash.service

User=logstash
Group=logstash

# change the two lines above to:

User=root
Group=root
			
				
Docker installation
				
docker run --rm -it -v ~/pipeline/:/usr/share/logstash/pipeline/ docker.elastic.co/logstash/logstash:8.5.1				
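
A minimal ~/pipeline/logstash.conf to exercise the container with (assumed content; it simply echoes stdin back as rubydebug output):

input { stdin {} }
output { stdout { codec => rubydebug } }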
				
				

71.2.1.2. Collecting logs in Kubernetes

Run filebeat as a sidecar: the application container and filebeat mount the same emptyDir volume at /tmp, and filebeat ships whatever is written there to Logstash. First put filebeat.yml into a ConfigMap, then mount it in the Deployment below.

			 
apiVersion: v1
data:
  filebeat.yml: |-
    filebeat.inputs:
    - type: log
      paths:
      - /tmp/*
      fields:
        project: test
        group: test
        stage: test
        format: json

      multiline:
        pattern: '^\[[^stacktrace]'
        negate: true
        match: after
    processors:
      - add_cloud_metadata:
      - add_host_metadata:

    output.logstash:
      hosts: ["172.18.200.10:5044"]
kind: ConfigMap
metadata:
  name: filebeat
  namespace: default
			
			
			 
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: bottleneck
  name: bottleneck
  namespace: default
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: bottleneck
  strategy:
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: bottleneck
    spec:
      affinity: {}
      containers:
      - env:
        - name: TZ
          value: Asia/Shanghai
        - name: JAVA_OPTS
          value: -Xms2048m -Xmx4096m
        - name: SPRING_OPTS
          value: --spring.profiles.active=dev --server.undertow.worker-threads=5000
        image: nginx:latest
        imagePullPolicy: IfNotPresent
        name: nginx
        ports:
        - containerPort: 80
          name: http
          protocol: TCP
        resources: {}
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /tmp
          name: tmp
      - args:
        - -c
        - /usr/share/filebeat/filebeat.yml
        - -e
        env:
        - name: TZ
          value: Asia/Shanghai
        - name: JAVA_OPTS
        - name: SPRING_OPTS
        image: docker.elastic.co/beats/filebeat:8.6.1
        imagePullPolicy: IfNotPresent
        name: filebeat
        resources: {}
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /usr/share/filebeat/filebeat.yml
          name: config
          readOnly: true
          subPath: filebeat.yml
        - mountPath: /tmp
          name: tmp
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      terminationGracePeriodSeconds: 30
      volumes:
      - configMap:
          defaultMode: 420
          name: filebeat
        name: config
      - emptyDir: {}
        name: tmp
			
			

71.2.2. Basic usage of the logstash command

71.2.2.1. -e: run a pipeline from the command line

logstash -e "input {stdin{}} output {stdout{}}"

			
/usr/share/logstash/bin/logstash  -e 'input{file {path => "/etc/centos-release" start_position => "beginning"}} output { stdout {}}'			
			
			

71.2.2.2. -f: specify a configuration file

			
/usr/share/logstash/bin/logstash -f stdin.conf		

/usr/share/logstash/bin/logstash -f jdbc.conf --path.settings /etc/logstash --path.data /tmp	
			
			

71.2.2.3. -t: test whether the configuration file is valid, then exit

			
root@netkiller ~/logstash % /usr/share/logstash/bin/logstash -t -f test.conf
WARNING: Default JAVA_OPTS will be overridden by the JAVA_OPTS defined in the environment. Environment JAVA_OPTS are -server -Xms2048m -Xmx4096m
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
Configuration OK
			
			

71.2.2.4. -l: where to write Logstash's own logs

By default they go to stdout, printed directly in the console.
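
For example, a minimal sketch that writes Logstash's own logs to a directory instead (-l is the short form of --path.logs):

/usr/share/logstash/bin/logstash -f test.conf --path.logs /var/log/logstash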

71.2.2.5. --log.level: start in debug mode

			
% /usr/share/logstash/bin/logstash -f nginx.conf --path.settings /etc/logstash --log.level debug			
			
			

71.2.3. Configuring logstash

71.2.3.1. JVM configuration

			
## JVM configuration

# Xms represents the initial size of total heap space
# Xmx represents the maximum size of total heap space

-Xms1g
-Xmx4g

################################################################
## Expert settings
################################################################
##
## All settings below this section are considered
## expert settings. Don't tamper with them unless
## you understand what you are doing
##
################################################################

## GC configuration
11-13:-XX:+UseConcMarkSweepGC
11-13:-XX:CMSInitiatingOccupancyFraction=75
11-13:-XX:+UseCMSInitiatingOccupancyOnly

## Locale
# Set the locale language
-Duser.language=zh

# Set the locale country
-Duser.country=CN

# Set the locale variant, if any
#-Duser.variant=

## basic

# set the I/O temp directory
#-Djava.io.tmpdir=$HOME

# set to headless, just in case
-Djava.awt.headless=true

# ensure UTF-8 encoding by default (e.g. filenames)
-Dfile.encoding=UTF-8

# use our provided JNA always versus the system one
#-Djna.nosys=true

# Turn on JRuby invokedynamic
-Djruby.compile.invokedynamic=true

## heap dumps

# generate a heap dump when an allocation from the Java heap fails
# heap dumps are created in the working directory of the JVM
-XX:+HeapDumpOnOutOfMemoryError

# specify an alternative path for heap dumps
# ensure the directory exists and has sufficient space
#-XX:HeapDumpPath=${LOGSTASH_HOME}/heapdump.hprof

## GC logging
#-Xlog:gc*,gc+age=trace,safepoint:file=@loggc@:utctime,pid,tags:filecount=32,filesize=64m

# log GC status to a file with time stamps
# ensure the directory exists
#-Xloggc:${LS_GC_LOG_FILE}

# Entropy source for randomness
-Djava.security.egd=file:/dev/urandom

# Copy the logging context from parent threads to children
-Dlog4j2.isThreadContextMapInheritable=true			
			
			

71.2.3.2. Multiple pipeline configuration

			
[root@netkiller ~]# cat /etc/logstash/pipelines.yml
# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
#   https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html

- pipeline.id: main
  path.config: "/etc/logstash/conf.d/*.conf"			
			
			

Configure the pipelines.yml file:

			
- pipeline.id: main
  path.config: "/etc/logstash/conf.d/*.conf"
- pipeline.id: finance
  path.config: "/etc/logstash/conf.finance/*.conf"
- pipeline.id: market
  path.config: "/etc/logstash/conf.market/*.conf"
- pipeline.id: customer
  path.config: "/etc/logstash/conf.customer/*.conf"  
			
			

71.2.3.3. input

Standard input and output
root@netkiller ~ % /usr/share/logstash/bin/logstash -e "input {stdin{}} output {stdout{}}"
Helloworld
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path //usr/share/logstash/config/log4j2.properties. Using default config which logs to console
18:03:38.340 [[main]-pipeline-manager] INFO logstash.pipeline - Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
18:03:38.356 [[main]-pipeline-manager] INFO logstash.pipeline - Pipeline main started
The stdin plugin is now waiting for input:
2017-08-03T10:03:38.375Z localhost Helloworld
18:03:38.384 [Api Webserver] INFO logstash.agent - Successfully started Logstash API endpoint {:port=>9601}				
				
				
rubydebug

The rubydebug codec pretty-prints each event to the screen as a JSON-like hash:

								
root@netkiller ~ % /usr/share/logstash/bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
My name is neo
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path //usr/share/logstash/config/log4j2.properties. Using default config which logs to console
18:05:02.734 [[main]-pipeline-manager] INFO logstash.pipeline - Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
18:05:02.747 [[main]-pipeline-manager] INFO logstash.pipeline - Pipeline main started
The stdin plugin is now waiting for input:
{
"@timestamp" => 2017-08-03T10:05:02.764Z,
"@version" => "1",
"host" => "localhost",
"message" => "My name is neo"
}
18:05:02.782 [Api Webserver] INFO logstash.agent - Successfully started Logstash API endpoint {:port=>9601}
				
				
Local files
				
input {
  file {
    type => "syslog"
    path => [ "/var/log/maillog", "/var/log/messages", "/var/log/secure" ]
    start_position => "beginning"
  }
}
output {
  stdout { codec => rubydebug }
  elasticsearch { 
    hosts => ["127.0.0.1:9200"] 
  }
}		
				
				

start_position => "beginning" reads the file from the start; without this option, only data appended after Logstash starts is read.

Specifying the file type
					
input { 
 file { path =>"/var/log/messages" type =>"syslog"} 
 file { path =>"/var/log/apache/access.log" type =>"apache"} 
}					 
					
					
Nginx
						
input {
        file {
                type => "nginx_access"
                path => ["/usr/share/nginx/logs/test.access.log"]
        }
}
output {
        redis {
                host => "localhost"
                data_type => "list"
                key => "logstash:redis"
        }
}						
						
						
TCP/UDP
				
input {
  file {
    type => "syslog"
    path => [ "/var/log/secure", "/var/log/messages", "/var/log/syslog" ]
  }
  tcp {
    port => "5145"
    type => "syslog-network"
  }
  udp {
    port => "5145"
    type => "syslog-network"
  }
}
output {
  elasticsearch { 
    hosts => ["127.0.0.1:9200"] 
  }
}
				
				
Redis
				
input {
  redis {
    host => "127.0.0.1"
    port => "6379" 
    key => "logstash:demo"
    data_type => "list"
    codec  => "json"
    type => "logstash-redis-demo"
    tags => ["logstashdemo"]
  }
}

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
  }
}
				
				

Specifying Redis database 10

				
root@netkiller /etc/logstash/conf.d % cat spring-boot-redis.conf 
input {
 redis {
  codec => json
  host => "localhost"
  port => 6379
  db => 10
  key => "logstash:redis"
  data_type => "list"
 }
}

output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "logstash-api"
  }
}
				
				
Kafka

				
input {
  kafka {
   zk_connect => "kafka:2181"
   group_id => "logstash"
   topic_id => "apache_logs"
   consumer_threads => 16
  }
}		
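
Note that zk_connect belongs to the old, pre-5.x kafka input plugin. Current plugin versions take the broker list directly; a hedged sketch (broker address assumed):

input {
  kafka {
    bootstrap_servers => "kafka:9092"
    group_id => "logstash"
    topics => ["apache_logs"]
    consumer_threads => 16
  }
}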
				
				
jdbc
				
root@netkiller /etc/logstash/conf.d % cat jdbc.conf 
input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "123456"
    schedule => "* * * * *"
    statement => "select * from article where id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric" 
    record_last_run => true
    last_run_metadata_path => "/var/tmp/article.last"
  }
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "123456"
    schedule => "* * * * *"	# cron expression; here it runs once per minute
    statement => "select * from article where ctime > :sql_last_value"
    use_column_value => true
    tracking_column => "ctime"
    tracking_column_type => "timestamp" 
    record_last_run => true
    last_run_metadata_path => "/var/tmp/article-ctime.last"
  }

}
output {
    elasticsearch {
    	hosts => "localhost:9200"
        index => "information"
        document_type => "article"
        document_id => "%{id}"
        action => "update"
        doc_as_upsert => true
    }
}				
				
				

71.2.3.4. filter

Date formatting

Timestamps default to ISO8601. To parse fields in yyyy-MM-dd HH:mm:ss format, use:

				

filter {
  date {
    match => [ "ctime", "yyyy-MM-dd HH:mm:ss" ]
    locale => "cn"
  }
 date {
    match => [ "mtime", "yyyy-MM-dd HH:mm:ss" ]
    locale => "cn"
  } 
}
				
				
				
date {
    locale => "zh-CN"
    #match => ["@timestamp", "yyyy-MM-dd HH:mm:ss"]
    match => ["@timestamp", "ISO8601"]
    timezone => "Asia/Shanghai"
    target => ["@timestamp"]
}
				
				
patterns

Create a patterns directory and a custom pattern file:

mkdir /usr/share/logstash/patterns
vim /usr/share/logstash/patterns/nginx

NGUSERNAME [a-zA-Z\.\@\-\+_%]+
NGUSER %{NGUSERNAME}
NGINXACCESS %{IPORHOST:clientip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{URI:referrer}|-)"|%{QS:referrer}) %{QS:agent}
				
				
				
filter {
  if [type] == "nginx-access" {
    grok {
      patterns_dir => ["/usr/share/logstash/patterns"]
      match => { "message" => "%{NGINXACCESS}" }
    }
  }
}
				
				
syslog
				
input {
  file {
    type => "syslog" 
    path => [ "/var/log/*.log", "/var/log/messages", "/var/log/syslog" ]
    sincedb_path => "/opt/logstash/sincedb-access"
  }
  syslog {
    type => "syslog"
    port => "5544"
  }
}
 
filter {
  if [type] == "syslog" {
    grok {
      match => [ "message", "%{SYSLOGBASE2}" ]
      add_tag => [ "syslog", "grokked" ]
    }
  }
}

output {
  elasticsearch { hosts => ["elk.netkiller.cn:9200"] }
}
				
				
csv
				
input {
    file {
        type => "SSRCode"
        path => "/SD/2015*/01*/*.csv"
        start_position => "beginning"
    }
}
 
filter {
	csv {
		columns => ["Code","Source"]
		separator => ","
	}
	kv {
		source => "uri"
		field_split => "&?"
		value_split => "="
	}
 
}
 
# output logs to console and to elasticsearch
output {
    stdout {}
    elasticsearch {
        hosts => ["172.16.1.1:9200"]
    }
}				
				
				
Processing CSV files with ruby
				
input {
    stdin {}
}
filter {
    ruby {
        init => "
            begin
                @@csv_file    = 'output.csv'
                @@csv_headers = ['A','B','C']
                if File.zero?(@@csv_file) || !File.exist?(@@csv_file)
                    CSV.open(@@csv_file, 'w') do |csv|
                        csv << @@csv_headers
                    end
                end
            end
        "
        code => "
            begin
                event.set('[@metadata][csv_file]', @@csv_file)
                event.set('[@metadata][csv_headers]', @@csv_headers)
            end
        "
    }
    csv {
        columns => ["a", "b", "c"]
    }
}
output {
    csv {
        fields => ["a", "b", "c"]
        path   => "%{[@metadata][csv_file]}"
    }
    stdout {
        codec => rubydebug {
            metadata => true
        }
    }
}				
				
				

Test

				
echo "1,2,3\n4,5,6\n7,8,9" | ./bin/logstash -f csv-headers.conf				
				
				

Output

				
A,B,C
1,2,3
4,5,6
7,8,9
				
				
Running ruby code

Date formatting: convert ISO 8601 timestamps to %Y-%m-%d %H:%M:%S.

Save the following to a configuration file, date.conf:

				
input {
	stdin{}
}
filter {

	ruby {
		init => "require 'time'"
        code => "event.set('ctime', event.get('ctime').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
    }

	ruby {
		init => "require 'time'"
        code => "event.set('mtime', event.get('mtime').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
    }
}
output {

	stdout {
		codec => rubydebug
	}

}
				
				

/usr/share/logstash/bin/logstash -f date.conf

Data cleansing

Drop events whose message contains the string MonthShardingAlgorithm:

				
root@logging /o/l/p/e/03# cat /srv/logstash/pipeline/filebeat.conf
input {
  beats {
    port => 5044
  }
}
filter{
	if "MonthShardingAlgorithm" in [message] {
		drop{}
	}
}
output {
    file {
        path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/%{+MM}/spring.%{+yyyy}-%{+MM}-%{+dd}.log"
        codec => line { format => "%{message}"}
    }
    #file {
    #    path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/%{+MM}/spring.%{+yyyy}-%{+MM}-%{+dd}.log.json.gz"
    #    codec => json_lines
    #    gzip => true
    #}
    redis {
        host => ["r-bp1d17217fa77e14756.redis.rds.aliyuncs.com"]
        password => "Ejy2016redis"
        key => "filebeat2"
        codec => json_lines
        data_type => "list"
    }
}
				
				
grok debug tool

http://grokdebug.herokuapp.com

grok-patterns

filebeat ships log lines as plain text; after grok matches a line, the captured values are placed into named fields:

				
input {
  beats {
    port => 5044   
  }
}
filter{
    if "MonthShardingAlgorithm" in [message] {
        drop{}
    }

    grok{
        match => ["message", "\[%{TIMESTAMP_ISO8601:timestamp}\] \[%{NOTSPACE:hostname}\] \[%{LOGLEVEL:level}\] \[%{NOTSPACE:thread-id}\] %{NOTSPACE:class} - %{JAVALOGMESSAGE:msg}"]
        #target => "result"
    }
}
output {
    file {
        path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/%{+MM}/spring.%{+yyyy}-%{+MM}-%{+dd}.log"
        codec => line { format => "%{message}"}
    }
    file {
        path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/%{+MM}/%{+dd}/spring.%{level}.log"
        codec => line { format => "%{message}"}
    }

    if "ERROR" == [level] {
        file {
            path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/beats.log"
            codec => line { format => "%{message}"}
        }
    }
    file {
        path => "/opt/log/beats.%{+yyyy}-%{+MM}-%{+dd}.log.gz"
        codec => json_lines
        gzip => true
    }
}				
				
				

71.2.3.5. output

stdout
				
output {
	stdout { codec => rubydebug }
}				
				
				
file: write to a file

/etc/logstash/conf.d/file.conf

				
output {
    file {
        path => "/path/to/%{host}/%{+yyyy}/%{+MM}/%{+dd}.log.gz"
        codec => line { format => "%{message}" }
        gzip => true
    }
}
				
				

Generate one log file per tag

				
input {
  tcp {
	port => 4567 
	codec => json_lines
  }
}

filter {
	ruby {
	    code => "event.set('datetime', event.get('@timestamp').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
	}
}

output {

	if "finance" in [tags] { 
		file {
			path => "/opt/log/%{app}.finance.%{+yyyy}-%{+MM}-%{+dd}.log"
			codec => line { format => "[%{datetime}] %{level} %{message}"}
		}
	 } else if "market" in [tags] {
		file {
			path => "/opt/log/%{app}.market.%{+yyyy}-%{+MM}-%{+dd}.log"
			codec => line { format => "[%{datetime}] %{level} %{message}"}
		}
	} else {
		file {
			path => "/opt/log/%{app}.unknow.%{+yyyy}-%{+MM}-%{+dd}.log"
			codec => line { format => "[%{datetime}] %{level} %{message}"}
		}
	
	}
    file {
        path => "/opt/log/%{app}.%{+yyyy}-%{+MM}-%{+dd}.log.gz"
		codec => json_lines
        gzip => true
    }

}				
				
				
elasticsearch
				
output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "logging"
  }
}				
				
				
Custom index

This configuration creates a new index each day:

					
index => "logstash-%{+YYYY.MM.dd}"

"_index" : "logstash-2017.03.22"	
					
					

Customizing the index name as logstash-%{type}-%{+YYYY.MM.dd}

					
input {

    redis {
        data_type => "list"
        key => "logstash:redis"
        host => "127.0.0.1"
        port => 6379
        threads => 5
        codec => "json"
    }
}
filter {

}
output {

    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "logstash-%{type}-%{+YYYY.MM.dd}"
        document_type => "%{type}"
        workers => 1
        flush_size => 20
        idle_flush_time => 1
        template_overwrite => true
    }
    stdout{}
}					
					
					
exec: run an external command
				
output {
    exec {
        command => "sendsms.php \"%{message}\" -t %{user}"
    }
}
				
				
http
				
[root@netkiller log]# cat /etc/logstash/conf.d/file.conf 
input {
  tcp {
	port => 4567 
	codec => json_lines
  }
  gelf {
    port => 12201
    use_udp => true
    #use_tcp => true
  }
}

filter {
	ruby {
	    code => "event.set('datetime', event.get('@timestamp').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
	}
}

output {

    file {
    	path => "/opt/log/%{marker}.%{+yyyy}-%{+MM}-%{+dd}.log"
    	codec => line { format => "[%{datetime}] %{level} %{message}"}
    }
   
    file {
        path => "/opt/log/origin.%{+yyyy}-%{+MM}-%{+dd}.log.gz"
		codec => json_lines
        gzip => true
    }

    if "ERROR" in [level] {
        http {
            url => "https://oapi.dingtalk.com/robot/send?access_token=56c27cb761c4a16473db02d9d28734a56cf549f6977ecc281d008f9a239ba3e0"
            http_method => "post"
            content_type => "application/json; charset=utf-8"
            format => "message"
            message => '{"msgtype":"text","text":{"content":"Monitor: %{message}"}}'
        }
    }
}
				
				

71.2.4. Example

https://github.com/kmtong/logback-redis-appender
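
The com.cwbase.logback.RedisAppender used below comes from this project. A Maven dependency sketch (the version number is an assumption; check the project page):

<dependency>
	<groupId>com.cwbase</groupId>
	<artifactId>logback-redis-appender</artifactId>
	<version>1.1.5</version>
</dependency>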

71.2.4.1. Configuring the broker (Redis)

indexer

/etc/logstash/conf.d/indexer.conf

			
input {
  redis {
    host => "127.0.0.1"
    port => "6379" 
    key => "logstash:demo"
    data_type => "list"
    codec  => "json"
    type => "logstash-redis-demo"
    tags => ["logstashdemo"]
  }
}

output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
  }
}	
			
			

Test

			
# redis-cli 
127.0.0.1:6379> RPUSH logstash:demo "{\"time\": \"2012-01-01T10:20:00\", \"message\": \"logstash demo message\"}"
(integer) 1
127.0.0.1:6379> exit
			
			

On success, the log looks like this:

			
# cat /var/log/logstash/logstash-plain.log 
[2017-03-22T15:54:36,491][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://127.0.0.1:9200/]}}
[2017-03-22T15:54:36,496][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://127.0.0.1:9200/, :path=>"/"}
[2017-03-22T15:54:36,600][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>#<URI::HTTP:0x20dae6aa URL:http://127.0.0.1:9200/>}
[2017-03-22T15:54:36,601][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2017-03-22T15:54:36,686][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>50001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "norms"=>false}, "dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword"}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date", "include_in_all"=>false}, "@version"=>{"type"=>"keyword", "include_in_all"=>false}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2017-03-22T15:54:36,693][INFO ][logstash.outputs.elasticsearch] Installing elasticsearch template to _template/logstash
[2017-03-22T15:54:36,780][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>[#<URI::Generic:0x2f9efc89 URL://127.0.0.1>]}
[2017-03-22T15:54:36,787][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
[2017-03-22T15:54:36,792][INFO ][logstash.inputs.redis    ] Registering Redis {:identity=>"redis://@127.0.0.1:6379/0 list:logstash:demo"}
[2017-03-22T15:54:36,793][INFO ][logstash.pipeline        ] Pipeline main started
[2017-03-22T15:54:36,838][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2017-03-22T15:55:10,018][WARN ][logstash.runner          ] SIGTERM received. Shutting down the agent.
[2017-03-22T15:55:10,024][WARN ][logstash.agent           ] stopping pipeline {:id=>"main"}			
			
			
shipper
			
input {
  file {
    path => [ "/var/log/nginx/access.log" ]
    start_position => "beginning"
  }
}

filter {
  grok {
    match => { "message" => "%{NGINXACCESS}" }
    add_field => { "type" => "access" }
  }
  date {
    match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
  }
  geoip {
    source => "clientip"
  }
}

output {
  redis {
    host => "127.0.0.1"
    port => 6379
    data_type => "list"
    key => "logstash:demo"
  }
}
			
			

71.2.4.2. Spring boot logback

Example 71.2. Spring Boot logback

				
root@netkiller /etc/logstash/conf.d % cat spring-boot-redis.conf 
input {
 redis {
  codec => json
  host => "localhost"
  port => 6379
  key => "logstash:redis"
  data_type => "list"
 }
}

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "logstash-api"
  }
}
				
				

src/main/resources/logback.xml

				
neo@MacBook-Pro ~/deployment % cat api.netkiller.cn/src/main/resources/logback.xml 
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
	<include resource="org/springframework/boot/logging/logback/defaults.xml" />
	<include resource="org/springframework/boot/logging/logback/file-appender.xml" />
	<property name="type.name" value="test" />
	<appender name="LOGSTASH" class="com.cwbase.logback.RedisAppender">
		<source>mySource</source>
		<sourcePath>mySourcePath</sourcePath>
		<type>myApplication</type>
		<tags>production</tags>
		<host>localhost</host>
		<port>6379</port>
		<database>0</database>
		<key>logstash:api</key>
	</appender>
	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
		<encoder>
			<pattern>%date{yyyy-MM-dd HH:mm:ss} %-4relative [%thread] %-5level %logger{35} : %msg %n</pattern>
		</encoder>
	</appender>
	<root level="INFO">
		<appender-ref ref="STDOUT" />
		<appender-ref ref="FILE" />
		<appender-ref ref="LOGSTASH" />
	</root>
</configuration>
				
				

			
[root@netkiller ~]# cat /etc/logstash/conf.d/file.conf
input {
  tcp {
    port => 4567 
    codec => json_lines
  }
}

filter {
    #ruby {
    #      code => "event.set('@timestamp', LogStash::Timestamp.at(event.get('@timestamp').time.localtime + 8*60*60))"
    #}
    ruby {
		code => "event.set('datetime', event.get('@timestamp').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
    }
}

output {

    file {
        path => "/opt/log/%{app}.%{+yyyy}-%{+MM}-%{+dd}.log.gz"
		codec => line { format => "[%{datetime}] %{level} %{message}"}
		#codec => json_lines
        gzip => true
    }

}
			
			

One file per tag

			
[root@netkiller ~]# cat /etc/logstash/conf.d/file.conf 
input {
  tcp {
    port => 4567 
    codec => json_lines
  }
}

filter {
    ruby {
	code => "event.set('datetime', event.get('@timestamp').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
    }
}

output {

	if "finance" in [tags] { 
	    file {
	        path => "/opt/log/%{app}.finance.%{+yyyy}-%{+MM}-%{+dd}.log"
			codec => line { format => "[%{datetime}] %{level} %{message} %{tags}"}
		}
	 
	 } else if "market" in [tags] {
	
	    file {
			path => "/opt/log/%{app}.market.%{+yyyy}-%{+MM}-%{+dd}.log"
			codec => line { format => "[%{datetime}] %{level} %{message} %{tags}"}
		}
	} else {
	
	    file {
			path => "/opt/log/%{app}.unknow.%{+yyyy}-%{+MM}-%{+dd}.log"
			codec => line { format => "[%{datetime}] %{level} %{message} %{tags}"}
		}
    }

}			
			
			

71.2.4.3. Index rotation example

Example 71.3. Elasticsearch index rotation

				
root@netkiller /opt/api.netkiller.cn % cat /etc/logstash/conf.d/spring-boot-redis.conf 
input {
 redis {
  codec => json
  host => "localhost"
  port => 6379
  db => 10
  key => "logstash:redis"
  data_type => "list"
 }
}

output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "logstash-%{type}-%{+YYYY.MM.dd}"
  }
}

				
				
				
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
	<include resource="org/springframework/boot/logging/logback/defaults.xml" />
	<include resource="org/springframework/boot/logging/logback/file-appender.xml" />
	<property name="logstash.type" value="api" />
	<property name="logstash.tags" value="springboot" />
	<appender name="LOGSTASH" class="com.cwbase.logback.RedisAppender">
		<source>application.properties</source>
		<type>${logstash.type}</type>
		<tags>${logstash.tags}</tags>

		<host>localhost</host>
		<database>10</database>
		<key>logstash:redis</key>

		<mdc>true</mdc>
		<location>true</location>
		<callerStackIndex>0</callerStackIndex>

	</appender>
	<appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
		<appender-ref ref="LOGSTASH" />
	</appender>

	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
		<encoder>
			<pattern>%date{yyyy-MM-dd HH:mm:ss} %-4relative [%thread] %-5level %logger{35} : %msg %n</pattern>
		</encoder>
	</appender>
	<root level="INFO">
		<appender-ref ref="STDOUT" />
		<appender-ref ref="FILE" />
		<appender-ref ref="LOGSTASH" />
	</root>
</configuration>
				
				

71.2.4.4. CSV file example

			
			
input {
    file {
        path => ["/home/test/data.csv"]
        start_position => "beginning"  # where to start reading; "beginning" imports existing data
        sincedb_path => "/test/111"
        type => "csv"
        tags => ["optical", "gather"]
    }
}

filter {
    if [type] == "csv" {  # distinguishes events when several config files run in one pipeline
        csv {
            columns => ["name","device_id"]
            separator => "^"
            quote_char => "‰"
            remove_field => ["device_id","branch_id","area_type"]
        }
    }
}

output {
}
			
			
			

71.2.4.5. Separating environments

			 
root@logging ~# find /srv/logstash/ -type f
/srv/logstash/pipeline/config.conf
/srv/logstash/bin/logstash
/srv/logstash/config/logstash.yml
			
			
			 
root@logging ~# cat /srv/logstash/bin/logstash
#!/usr/bin/python3
# -*- coding: utf-8 -*-
##############################################
# Home	: http://netkiller.github.io
# Author: Neo <netkiller@msn.com>
# Upgrade: 2023-01-11
##############################################
import os
import sys
try:
	module = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
	sys.path.insert(0, module)
	from netkiller.docker import *
except ImportError as err:
	print("%s" % (err))

project = 'logstash'

# extra_hosts = [
#    'mongo.netkiller.cn:172.17.195.17', 'eos.netkiller.cn:172.17.15.17',
#    'cfca.netkiller.cn:172.17.15.17'
# ]

dockerfile = Dockerfile()
dockerfile.image('docker.elastic.co/logstash/logstash:8.6.0').run(
	['apk add -U tzdata', 'rm -f /usr/share/logstash/pipeline/logstash.conf']
).copy('pipeline/', '/usr/share/logstash/pipeline/').copy('config/', '/usr/share/logstash/config/').workdir('/usr/share/logstash')

logstash = Services(project)
# logstash.image('logstash/logstash:alpine')
# logstash.build(dockerfile)
logstash.image('docker.elastic.co/logstash/logstash:8.6.0')
logstash.container_name(project)
logstash.restart('always')
# logstash.hostname('www.netkiller.cn')
# logstash.extra_hosts(extra_hosts)
logstash.extra_hosts(['elasticsearch:127.0.0.1'])
logstash.environment(['TZ=Asia/Shanghai','XPACK_MONITORING_ENABLED=false','LOG_LEVEL=info'])
logstash.ports(['12201:12201/udp', '12201:12201/tcp'])
#logstash.ports(['12201:12201','4567:4567'])
# logstash.depends_on('test')
logstash.working_dir('/usr/share/logstash')
logstash.user('root')
logstash.volumes(
	[
		'/srv/logstash/pipeline/:/usr/share/logstash/pipeline/',
		#'/srv/logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:rw',
		'/srv/logstash/logs/:/usr/share/logstash/logs/',
		'/opt/log/:/opt/log/',
		'/proc:/proc','/sys:/sys'
	]
).privileged()

development = Composes('development')
development.workdir('/var/tmp/development')
development.version('3.9')
development.services(logstash)

if __name__ == '__main__':
	try:
		docker = Docker(
			# {'DOCKER_HOST': 'ssh://root@192.168.30.11'}
		)
		# docker.sysctl({'neo': '1'})
		docker.environment(development)
		docker.main()
	except KeyboardInterrupt:
		print("Crtl+C Pressed. Shutting down.")

			
			
			 
root@logging ~# cat /srv/logstash/pipeline/config.conf
input {
	tcp {
	port => 4567
	codec => json_lines
	}
	gelf {
	port => 12201
	use_udp => true
	use_tcp => true
	}
}

filter {
	ruby {
		code => "event.set('datetime', event.get('@timestamp').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
	}
}

output {
	if [marker] {
	file {
		path => "/opt/log/%{environment}/%{service}/%{marker}.%{+yyyy}-%{+MM}-%{+dd}.log"
		codec => line { format => "[%{datetime}] %{level} %{message}"}
	}
	} else {
		file {
		path => "/opt/log/%{environment}/%{service}/spring.%{+yyyy}-%{+MM}-%{+dd}.log"
		codec => line { format => "[%{datetime}] [%{host}:%{source_host}] [%{level}] (%{class}.%{method}:%{line}) - %{message}"}
		}
	}
	file {
		path => "/opt/log/%{environment}/%{service}/spring.%{+yyyy}-%{+MM}-%{+dd}.json.gz"
		codec => json_lines
		gzip => true
	}
	if [environment] =~ /(prod|grey)/ {
		if "ERROR" in [level] {
			http {
				url => "https://oapi.dingtalk.com/robot/send?access_token=f9257740a3f084b0160ec06ae40f95b0b052e69c699400eaa5db316612de90f8"
				http_method => "post"
				content_type => "application/json; charset=utf-8"
				format => "message"
				message => '{"msgtype":"text","text":{"content":"时间:%{datetime}\n主机:%{host}[%{source_host}]\n环境:%{environment}\n服务:%{service}\n消息:%{message}"}}'
			}
		}
		if "WARN" in [level] {
			http {
				url => "https://oapi.dingtalk.com/robot/send?access_token=d6602c6fb6b47250f38d31f791968a12201a6980f3a1175829a57e6afca7678b"
				http_method => "post"
				content_type => "application/json; charset=utf-8"
				format => "message"
				message => '{"msgtype":"text","text":{"content":"时间:%{datetime}\n主机:%{host}[%{source_host}]\n环境:%{environment}\n服务:%{service}\n消息:%{message}"}}'
			}
		}
	}

	if [environment] =~ /(stage|test|dev)/ {
		if ("ERROR" in [level] or "WARN" in [level]) {
			http {
				url => "https://oapi.dingtalk.com/robot/send?access_token=9501f8d983188517fcbd204c89bf5f47b9dfdac2a788bda85bd353d8e266fb5f"
				http_method => "post"
				content_type => "application/json; charset=utf-8"
				format => "message"
				message => '{"msgtype":"text","text":{"content":"时间:%{datetime}\n主机:%{host}[%{source_host}]\n环境:%{environment}\n服务:%{service}\n消息:%{message}"}}'
			}
		}
	}
}
			
			

71.2.4.6. Integrating Logstash with Zentao

Send DingTalk alerts from the logs and, at the same time, create a Zentao task to follow up on the fault:

			
input {
  tcp {
	port => 4567
	codec => json_lines
  }
  gelf {
    port => 12201
    use_udp => true
    use_tcp => true
  }
}

filter {
	ruby {
	    code => "event.set('datetime', event.get('@timestamp').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
	}
}

output {
    if [marker] {
    file {
    	path => "/opt/log/%{environment}/%{service}/%{+MM}/%{marker}.%{+yyyy}-%{+MM}-%{+dd}.log"
    	codec => line { format => "[%{datetime}] %{level} %{message}"}
    }
    } else {
        file {
        path => "/opt/log/%{environment}/%{service}/%{+MM}/unknow.%{+yyyy}-%{+MM}-%{+dd}.log"
    	codec => line { format => "[%{datetime}] [%{host}:%{source_host}] [%{level}] (%{class}.%{method}:%{line}) - %{message}"}
        }
    }
    file {
        path => "/opt/log/%{environment}/%{service}/%{+MM}/unknow.%{+yyyy}-%{+MM}-%{+dd}.json.gz"
		codec => json_lines
        gzip => true
    }
    if [environment] =~ /(prod|grey)/ {
	    if "ERROR" in [level] {
	        http {
	            url => "https://oapi.dingtalk.com/robot/send?access_token=f9257740a0ec06ae40f316613f084b095b0b052e69c699400eaa5db162de90f8"
	            http_method => "post"
	            content_type => "application/json; charset=utf-8"
	            format => "message"
	            message => '{"msgtype":"text","text":{"content":"时间:%{datetime}\n主机:%{host}[%{source_host}]\n环境:%{environment}\n服务:%{service}\n消息:%{message}"}}'
	        }
	    }
	    if "WARN" in [level] {
	        http {
	            url => "https://oapi.dingtalk.com/robot/send?access_token=d66029a57e68d31f791968a12201a6980f3ac6fb6b47250f3117582afca7678b"
	            http_method => "post"
	            content_type => "application/json; charset=utf-8"
	            format => "message"
	            message => '{"msgtype":"text","text":{"content":"时间:%{datetime}\n主机:%{host}[%{source_host}]\n环境:%{environment}\n服务:%{service}\n消息:%{message}"}}'
	        }
	    }
    }

    if "compute" in [marker] and "prod" in [environment] {
        http {
            url => "https://oapi.dingtalk.com/robot/send?access_token=324ab12a36bcb2bb788720c974486218f2517de5a8f5fa009b52297934310c7f"
            http_method => "post"
            content_type => "application/json; charset=utf-8"
            format => "message"
            message => '{"msgtype":"text","text":{"content":"时间:%{datetime}\n主机:%{host}[%{source_host}]\n环境:%{environment}\n服务:%{service}\n消息:%{message}"}}'
        }
        http {
            url => "http://zentao.netkiller.cn/zentao/gitlab.php?type=task&func=create&name=服务%{service}环境%{environment}"
            http_method => "post"
            format => "form"
	    	mapping => {"message" => "时间:%{datetime}</br>主机:%{host}[%{source_host}]</br>环境:%{environment}</br>服务:%{service}</br>消息:%{message}"}
        }
    }

    if [environment] =~ /(pre|test|dev|office)/ {
	    if ("ERROR" in [level] or "WARN" in [level]) {
	        http {
	            url => "https://oapi.dingtalk.com/robot/send?access_token=9501f8d9b9dfda204c89bf5f47788bda85bc2a83188517fcbdd353d8e266fb5f"
	            http_method => "post"
	            content_type => "application/json; charset=utf-8"
	            format => "message"
	            message => '{"msgtype":"text","text":{"content":"时间:%{datetime}\n主机:%{host}[%{source_host}]\n环境:%{environment}\n服务:%{service}\n消息:%{message}"}}'
	        }
	    }
    }

}			
			
			

71.2.5. Beats

Beats is a free and open platform of single-purpose data shippers. They send data from hundreds or thousands of machines and systems to Logstash or Elasticsearch.

71.2.5.1. Installing Beats

Beats 6.x installation
        
curl -s https://raw.githubusercontent.com/netkiller/shell/master/search/elastic/elastic-6.x.sh | bash
curl -s https://raw.githubusercontent.com/netkiller/shell/master/search/elastic/beats/beats.sh | bash
        
			
Beats 5.x installation
				curl -s https://raw.githubusercontent.com/netkiller/shell/master/log/beats/beats-5.x.sh | bash
			

71.2.5.2. Filebeat

Module management
            
filebeat modules list				
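
Modules can then be switched on and off (nginx is just an assumed example):

filebeat modules enable nginx
filebeat modules disable nginx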
            
			
File to file
            
filebeat.inputs:
- type: log
  paths:
  - /data/logs/*
  fields:
    project: ${PROJECT}
    group: ${GROUP}
    stage: ${STAGE}
    format: ${FORMAT}

processors:
- add_cloud_metadata:
- add_host_metadata:

output.file:
  path: "/tmp"
  filename: filebeat
            
			
TCP
            
[docker@netkiller ~]$ cat filebeat.tcp.yml 
filebeat.inputs:
- type: tcp
  max_message_size: 10MiB
  host: "localhost:9000"

output.file:
  path: "/tmp"
  filename: filebeat.log
            
			

            
[docker@netkiller ~]$ sudo chmod go-w /home/docker/filebeat.tcp.yml				
            
			
            
[docker@netkiller ~]$ ss -lnt | grep 9000
LISTEN 0      1024       127.0.0.1:9000       0.0.0.0:*  				
            
			
            
[docker@netkiller ~]$ echo "Hello world!!!" | nc localhost 9000
echo "Hello worldss -lnt | grep 9000!" | nc localhost 9000

[docker@netkiller ~]$ cat /etc/filesystems | nc localhost 9000

[docker@netkiller ~]$ sudo cat /tmp/filebeat.log-20220728.ndjson |jq | grep message
"message": "Hello worldss -lnt | grep 9000!"
"message": "ext4",
"message": "ext3",
"message": "ext2",
"message": "nodev proc",
"message": "nodev devpts",
"message": "iso9660"
"message": "vfat",
"message": "hfs",
"message": "hfsplus",
"message": "*",				
            
			

71.2.5.3. Configuration examples

From filebeat to redis

filebeat.yml

        
filebeat.inputs:
- type: log
  paths:
  - /tmp/*
  fields:
    project: www
    group: netkiller.cn
    stage: dev
    format: json

  multiline:
    pattern: '^\[[^stacktrace]'
    negate: true
    match: after

processors:
  - add_cloud_metadata:
  - add_host_metadata:

output.logstash:
  hosts: ["172.18.200.10:5044"]        
        
			

logstash configuration

			 
input {
	beats {
		port => 5044
	}
}
output {
	file {
		path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/spring.%{+yyyy}-%{+MM}-%{+dd}.log"
		codec => line { format => "%{message}"}
	}
	file {
		path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/spring.%{+yyyy}-%{+MM}-%{+dd}.log.json.gz"
		codec => json_lines
		gzip => true
	}
	redis {
		host => ["redis.netkiller.cn"]
		password => "passw0rd"
		key => "filebeat"
		codec => json_lines
		data_type => "channel"
	}
}
			
			
Log level handling

filebeat collects log files and ships them to logstash, which receives plain text, line by line, with no level field. To split INFO, WARN and ERROR output, match on substrings of the message:

			
input {
  beats {
    port => 5044   
  }
}
filter{
	if "MonthShardingAlgorithm" in [message] {
		drop{}
	}
}
output {
    file {
        path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/%{+MM}/spring.%{+yyyy}-%{+MM}-%{+dd}.log"
        codec => line { format => "%{message}"}
    }
    if "netkiller-service" == [fields][service] and "ERROR" in [message] {
	    file {
	        path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/netkiller.beats.log"
	        codec => line { format => "%{message}"}
	    }
    }
    file {
        path => "/opt/log/beats.%{+yyyy}-%{+MM}-%{+dd}.log.gz"
        codec => json_lines
        gzip => true
    }
}			
			
			

			
input {
  beats {
    port => 5044
  }
}
filter{
    if "MonthShardingAlgorithm" in [message] {
	drop{}
    }
    grok{
        match => ["message", "\[%{TIMESTAMP_ISO8601:timestamp}\] \[%{NOTSPACE:hostname}\] \[%{LOGLEVEL:level}\] \[%{NOTSPACE:thread-id}\] %{NOTSPACE:class} - %{JAVALOGMESSAGE:msg}"]
    }
}
output {
    file {
        path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/%{+MM}/spring.%{+yyyy}-%{+MM}-%{+dd}.log"
        codec => line { format => "%{message}"}
    }
    if "compute-service" == [fields][service] and "ERROR" == [level] {
        file {
            path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/compute.error.log"
            codec => line { format => "%{message}"}
        }
    }
}
			
			

71.2.6. FAQ

71.2.6.1. Logstash CPU usage too high

If top shows CPU usage persistently at 80% or even 100%, edit the jvm.options configuration file and raise -Xmx1g to -Xmx4g; a value of 4 to 8 GB is recommended for -Xmx.

			
## JVM configuration

# Xms represents the initial size of total heap space
# Xmx represents the maximum size of total heap space

-Xms1g
-Xmx4g

################################################################
## Expert settings
################################################################
##
## All settings below this section are considered
## expert settings. Don't tamper with them unless
## you understand what you are doing
##
################################################################

## GC configuration
11-13:-XX:+UseConcMarkSweepGC
11-13:-XX:CMSInitiatingOccupancyFraction=75
11-13:-XX:+UseCMSInitiatingOccupancyOnly

## Locale
# Set the locale language
#-Duser.language=en

# Set the locale country
#-Duser.country=US

# Set the locale variant, if any
#-Duser.variant=

## basic

# set the I/O temp directory
#-Djava.io.tmpdir=$HOME

# set to headless, just in case
-Djava.awt.headless=true

# ensure UTF-8 encoding by default (e.g. filenames)
-Dfile.encoding=UTF-8

# use our provided JNA always versus the system one
#-Djna.nosys=true

# Turn on JRuby invokedynamic
-Djruby.compile.invokedynamic=true

## heap dumps

# generate a heap dump when an allocation from the Java heap fails
# heap dumps are created in the working directory of the JVM
-XX:+HeapDumpOnOutOfMemoryError

# specify an alternative path for heap dumps
# ensure the directory exists and has sufficient space
#-XX:HeapDumpPath=${LOGSTASH_HOME}/heapdump.hprof

## GC logging
#-Xlog:gc*,gc+age=trace,safepoint:file=@loggc@:utctime,pid,tags:filecount=32,filesize=64m

# log GC status to a file with time stamps
# ensure the directory exists
#-Xloggc:${LS_GC_LOG_FILE}

# Entropy source for randomness
-Djava.security.egd=file:/dev/urandom

# Copy the logging context from parent threads to children
-Dlog4j2.isThreadContextMapInheritable=true
			
			

71.2.6.2. Inspecting the Kibana index

			
# curl 'http://localhost:9200/_search?pretty'
{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : ".kibana",
        "_type" : "config",
        "_id" : "5.2.2",
        "_score" : 1.0,
        "_source" : {
          "buildNum" : 14723
        }
      }
    ]
  }
}			
			
			

71.2.6.3. logstash cannot write to elasticsearch

Do not omit port 9200 in the elasticsearch output configuration; without it logstash cannot connect to elasticsearch:

			
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
  }			
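
A quick way to verify that elasticsearch is actually reachable on that port:

curl http://127.0.0.1:9200/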
			
			

71.2.6.4. Standard output

			
#cd /etc/logstash/conf.d
#vim logstash_server.conf
input {
    redis {
        port => "6379"
        host => "127.0.0.1"
        data_type => "list"
        key => "logstash-redis"
        type => "redis-input"
   }
}
output {
    stdout {
    	codec => rubydebug
    }
}			
			
			

71.2.6.5. Changes when upgrading from 5.x to 6.x

In 5.x, a field whose type was date defaulted to the ISO8601 format. 6.x fixed this, so values such as "ctime": "2017-12-18 11:21:57" are handled correctly.
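
On 5.x you can work around it with an explicit date filter; a minimal sketch, assuming a ctime field in that format:

filter {
  date {
    match => [ "ctime", "yyyy-MM-dd HH:mm:ss" ]
    target => "ctime"
    timezone => "Asia/Shanghai"
  }
}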

71.2.6.6. Debugging log delivery

UDP debugging:

			 
[root@netkiller log]# cat test.json 
{"facility":"logstash-gelf","source_host":"172.18.0.186","@version":"1","method":"init","message":"Test","class":"Application","host":"macbook-pro-m2.local","@timestamp":"2023-01-07T03:32:28.368Z","timestamp":"2023-01-07 11:32:28.368","marker":"spring","datetime":"2023-01-07 11:32:28","logger":"cn.netkiller.Application","level":"WARN","line":21,"version":"1.1"}

[root@netkiller log]# cat test.json | nc -u 127.0.0.1 12202
			
			

71.2.6.7. 6.x

			
curl -s https://raw.githubusercontent.com/netkiller/shell/master/search/elastic/elastic-6.x.sh | bash
			
			

71.2.6.8. ElasticSearch + Logstash + Kibana installation

Environment:

Operating system: CentOS 7

Java 1.8

Redis

ElasticSearch + Logstash + Kibana, all at version 5.2

Every install below uses a Netkiller OSCM one-step script.

ElasticSearch installation

Paste the command below into a Linux console for a one-step install:

				
curl -s https://raw.githubusercontent.com/netkiller/shell/master/search/elasticsearch/elasticsearch-5.x.sh | bash
				
				
Kibana installation
				
curl -s https://raw.githubusercontent.com/netkiller/shell/master/log/kibana/kibana-5.x.sh | bash
				
				
Logstash installation
					curl -s https://raw.githubusercontent.com/netkiller/shell/master/log/kibana/logstash-5.x.sh | bash
				
Upgrading from 5.x to 6.x

Update the repository:

				
curl -s https://raw.githubusercontent.com/netkiller/shell/master/search/elastic/elastic-6.x.sh | bash		
				
				
				
yum update logstash