ELK 5.5 + Kafka tarball deployment

This article installs ELK 5.5 (single-node) from release tarballs.

elasticsearch official docs: https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html
logstash official docs: https://www.elastic.co/guide/en/logstash/current/index.html
filebeat official docs: https://www.elastic.co/guide/en/beats/filebeat/7.x/index.html
kafka official docs: http://kafka.apache.org/21/documentation.html

Download the deployment packages:

yum install -y wget
mkdir -p /export/install_packages && cd /export/install_packages
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.5.0.tar.gz
wget https://artifacts.elastic.co/downloads/logstash/logstash-5.5.0.tar.gz
wget https://artifacts.elastic.co/downloads/kibana/kibana-5.5.0-linux-x86_64.tar.gz
wget https://archive.apache.org/dist/kafka/2.3.0/kafka_2.12-2.3.0.tgz
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.5/apache-zookeeper-3.5.5-bin.tar.gz
# jdk-8u40-linux-x64.tar.gz must be fetched from Oracle's Java archive (login required)

Extract to the install directory (server side):

tar -zxvf kafka_2.12-2.3.0.tgz -C /export/servers/
tar -zxvf elasticsearch-5.5.0.tar.gz -C /export/servers/
tar -zxvf logstash-5.5.0.tar.gz -C /export/servers/
tar -zxvf kibana-5.5.0-linux-x86_64.tar.gz -C /export/servers/
tar -zxvf apache-zookeeper-3.5.5-bin.tar.gz -C /export/servers/
tar -zxvf jdk-8u40-linux-x64.tar.gz -C /export/servers/

Rename the directories:

cd /export/servers
mv apache-zookeeper-3.5.5-bin/ zookeeper
mv kafka_2.12-2.3.0/ kafka
mv elasticsearch-5.5.0/ elasticsearch
mv logstash-5.5.0/ logstash
mv kibana-5.5.0-linux-x86_64/ kibana

Configure the JDK environment:

vim /etc/profile
#JDK config
JAVA_HOME=/export/servers/jdk1.8.0_40
CLASSPATH=$JAVA_HOME/jre/lib/rt.jar:.
PATH=$JAVA_HOME/bin:$PATH
export JAVA_HOME PATH CLASSPATH
source /etc/profile
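After sourcing /etc/profile, it's worth confirming that the intended JDK is the one on PATH before starting any of the Java services below. A minimal sketch; the version-matching helper is our own, not part of any package:

```shell
# Report whether a `java -version` banner matches the JDK 1.8 installed above.
jdk_matches() {
  case "$1" in
    *1.8.*) echo "ok" ;;        # e.g. java version "1.8.0_40"
    *)      echo "wrong-jdk" ;;
  esac
}

# Live check (uncomment on the server):
# jdk_matches "$(java -version 2>&1 | head -n 1)"
jdk_matches 'java version "1.8.0_40"'   # → ok
```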

System tuning:

vim /etc/sysctl.conf
fs.file-max = 6815744
net.ipv4.ip_local_port_range = 9000 65500
fs.aio-max-nr = 1048576
net.core.rmem_default = 1048576
net.core.wmem_default = 524288
net.core.rmem_max = 16777216
net.core.wmem_max = 16777216
net.ipv4.tcp_rmem = 4096 87380 16777216
net.ipv4.tcp_wmem = 4096 65536 16777216
net.core.netdev_max_backlog = 2500
net.ipv4.tcp_max_syn_backlog = 8192
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_tw_recycle = 1
net.ipv4.tcp_fin_timeout = 30
vm.max_map_count=655360

sysctl -p

vim /etc/security/limits.conf
* soft nofile 65536
* hard nofile 65536
* soft memlock unlimited
* hard memlock unlimited

vim /etc/security/limits.d/90-nproc.conf
#Default limit for number of user's processes to prevent
#accidental fork bombs.
#See rhbz #432903 for reasoning.
* soft nproc 65536
root soft nproc unlimited

Reboot for these limits to take effect.
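After the reboot, the applied values can be verified against what was configured. A small sketch (the comparison helper is our own; the two live checks assume the values set above):

```shell
# Compare a live kernel/limit value against the expected one.
check_setting() {
  key="$1"; want="$2"; got="$3"
  if [ "$got" = "$want" ]; then
    echo "$key OK"
  else
    echo "$key MISMATCH (got '$got', want '$want')"
  fi
}

# Live checks (uncomment on the server):
# check_setting vm.max_map_count 655360 "$(sysctl -n vm.max_map_count)"
# check_setting nofile 65536 "$(ulimit -n)"
check_setting vm.max_map_count 655360 655360   # → vm.max_map_count OK
```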

Configure ZooKeeper:

cd /export/servers
mkdir -p zookeeper/data zookeeper/logs
vim zookeeper/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/export/servers/zookeeper/data
dataLogDir=/export/servers/zookeeper/logs
clientPort=2181

Start ZooKeeper:

/export/servers/zookeeper/bin/zkServer.sh start
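A quick liveness check is ZooKeeper's four-letter `ruok` command, which a healthy server answers with `imok`. Note that ZooKeeper 3.5 restricts four-letter commands by default, so `4lw.commands.whitelist=ruok` may need to be added to zoo.cfg first; `zkServer.sh status` works without it. The reply-parsing helper below is our own sketch:

```shell
# Interpret the reply to ZooKeeper's four-letter "ruok" command.
zk_state() {
  if [ "$1" = "imok" ]; then echo "healthy"; else echo "unhealthy"; fi
}

# Live checks (uncomment on the server; the first requires nc):
# zk_state "$( (printf 'ruok'; sleep 1) | nc 127.0.0.1 2181 )"
# /export/servers/zookeeper/bin/zkServer.sh status
zk_state "imok"   # → healthy
```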

Configure Kafka:

cd /export/servers
vim kafka/config/server.properties
# broker.id uniquely identifies the broker as an integer. The default is -1;
# if unset, kafka auto-generates a unique value.
broker.id=0
port=9092
advertised.host.name=172.20.103.55
# Threads handling network I/O (reading/writing socket buffers; almost no I/O
# wait). Recommended: CPU core count + 1.
num.network.threads=3
# Threads handling disk I/O; there may be some I/O wait at peak, so size it
# larger. Recommended: 2x CPU cores, at most 3x.
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
# Maximum request size the socket server accepts (guards against OOM):
socket.request.max.bytes=104857600
message.max.bytes=10000000
# Very important: the directory where kafka persists messages. Multiple
# directories can be given, comma-separated, and kafka spreads load evenly
# across them; with each directory on a separate disk, overall write
# throughput improves too. The default is /tmp/kafka-logs.
log.dirs=/export/servers/kafka/logs
# Default partition count (1) used when a topic is created without an explicit
# partition count. Partition count directly affects cluster throughput; too
# few hurts consumer performance. 5 is recommended here.
num.partitions=5
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# Flush to disk after every 10000 produced messages
log.flush.interval.messages=10000
# Flush to disk every 1 second
log.flush.interval.ms=1000
# Retain for three days; can be shorter
log.retention.hours=72
# 1GB segment files help reclaim disk space quickly and speed up restarts
# (small segments mean many files, and at startup kafka scans all data files
# under log.dirs with a single thread)
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# No default; must be set. May be a CSV list. If one zookeeper ensemble
# manages multiple kafka clusters, the zookeeper chroot must be specified.
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
# Replica fetch configuration
num.replica.fetchers=3
replica.fetch.min.bytes=1
replica.fetch.max.bytes=5242880
# Each follower pulls messages from the leader to stay in sync; follower sync
# performance is governed by:
# - fetcher thread count (num.replica.fetchers): more fetchers raise follower
#   I/O concurrency but also the leader's request load; balance against the
#   hardware, a moderate increase is recommended;
# - minimum bytes (replica.fetch.min.bytes): the default is usually fine;
# - maximum bytes (replica.fetch.max.bytes): the 1MB default is too small; 5MB
#   is recommended, adjust to the workload;
# - maximum wait (replica.fetch.wait.max.ms): the follower fetch interval. Too
#   frequent and the leader piles up useless requests, data sync stalls, and
#   CPU spikes. Use with care; the default is recommended.

vim kafka/bin/kafka-server-start.sh
# Adjust the value of KAFKA_HEAP_OPTS="-Xmx16G -Xms16G"

Recommended: in general, the heap size should not exceed 50% of the host's memory.

Start Kafka:

nohup bash /export/servers/kafka/bin/kafka-server-start.sh /export/servers/kafka/config/server.properties &
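A smoke test confirms the broker is serving requests: create a throwaway topic and check it shows up in the topic list (`kafka-topics.sh --bootstrap-server` is available since Kafka 2.2, so it works on 2.3.0). The `smoke-test` topic name and the list-parsing helper are our own:

```shell
BIN=/export/servers/kafka/bin

# Does a topic name appear (as an exact line) in `kafka-topics.sh --list` output?
topic_listed() {
  printf '%s\n' "$1" | grep -Fqx "$2" && echo "yes" || echo "no"
}

# Live smoke test (uncomment on the server):
# $BIN/kafka-topics.sh --create --bootstrap-server localhost:9092 \
#     --replication-factor 1 --partitions 5 --topic smoke-test
# topic_listed "$($BIN/kafka-topics.sh --list --bootstrap-server localhost:9092)" smoke-test
topic_listed "$(printf 'smoke-test\n__consumer_offsets')" "smoke-test"   # → yes
```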

Configure elasticsearch:

groupadd admin
useradd -g admin admin
mkdir -p /export/data/elasticsearch /export/logs/elasticsearch
chown admin:admin /export/data/elasticsearch /export/logs/elasticsearch
cd /export/servers
vim elasticsearch/config/elasticsearch.yml
cluster.name: elasticsearch-cluster
path.data: /export/data/elasticsearch/
path.logs: /export/logs/elasticsearch/
bootstrap.memory_lock: true
network.host: 172.20.103.55
http.port: 9200
# requires the x-pack plugin: elasticsearch/bin/elasticsearch-plugin install x-pack
xpack.security.enabled: true

vim elasticsearch/config/jvm.options
# Recommended: half of the server's memory
-Xms8g
-Xmx8g
# GC configuration
#-XX:+UseConcMarkSweepGC
#-XX:CMSInitiatingOccupancyFraction=75
#-XX:+UseCMSInitiatingOccupancyOnly
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200

chown admin:admin -R /export/servers/elasticsearch

Start elasticsearch:

su - admin
nohup /export/servers/elasticsearch/bin/elasticsearch &
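Once the node is up, `_cluster/health` reports the cluster status (a fresh single node typically reports yellow once indices with replicas exist). The JSON-parsing helper is our own sketch; `elastic`/`changeme` are the 5.x x-pack defaults used throughout this article:

```shell
# Pull the "status" field out of a _cluster/health JSON response.
es_status() {
  printf '%s' "$1" | sed -n 's/.*"status":"\([a-z]*\)".*/\1/p'
}

# Live check (uncomment on the server):
# es_status "$(curl -s -u elastic:changeme http://172.20.103.55:9200/_cluster/health)"
es_status '{"cluster_name":"elasticsearch-cluster","status":"yellow","number_of_nodes":1}'   # → yellow
```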

Configure logstash:

cd /export/servers
vim logstash/config/logstash.yml
## Pipeline worker threads; officially recommended to equal the CPU core count
pipeline.workers: 8
# Thread count actually used for outputs
pipeline.output.workers: 8
## Events sent per batch (as batch.size grows, es handles fewer, larger bulk
## requests, so writes get faster)
pipeline.batch.size: 10000
# Batch delay (test upward from the actual data volume to find the optimum)
pipeline.batch.delay: 10
queue.type: persisted

# Persist the queue to disk. On abnormal events such as a logstash restart,
# in-flight data can be lost; with the persisted queue, a restart no longer
# loses data. The specific settings:
path.queue: /export/data/logstash/data   # queue storage path; effective when queue.type is persisted
queue.page_capacity: 250mb               # size of a single page of the persisted queue
queue.max_events: 0                      # max unread events in the queue; 0 = unlimited
queue.max_bytes: 1024mb                  # maximum queue capacity
queue.checkpoint.acks: 1024              # max acked events before forcing a checkpoint; 0 = unlimited
queue.checkpoint.writes: 1024            # max written events before forcing a checkpoint; 0 = unlimited
queue.checkpoint.interval: 1000          # interval (ms) between forced checkpoints on the head page

log.level: info
path.logs: /export/servers/logstash/

vim logstash/config/jvm.options
-Xms4g
-Xmx4g
## GC configuration
#-XX:+UseConcMarkSweepGC
#-XX:CMSInitiatingOccupancyFraction=75
#-XX:+UseCMSInitiatingOccupancyOnly
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200

mkdir logstash/conf.d
vim logstash/conf.d/service-manager.conf
input {
  kafka {
    bootstrap_servers => "172.20.103.55:9092"
    request_timeout_ms => "40000"
    session_timeout_ms => "30000"
    max_partition_fetch_bytes => "52428700"
    max_poll_records => "1000"
    consumer_threads => "5"
    client_id => "service-manager-info.log"
    group_id => "service-manager-info.log"
    auto_offset_reset => "latest"
    topics => ["service-manager-info.log"]
    type => "service-manager-info.log"
    codec => 'json'
  }
  kafka {
    bootstrap_servers => "172.20.103.55:9092"
    request_timeout_ms => "40000"
    session_timeout_ms => "30000"
    max_partition_fetch_bytes => "52428700"
    max_poll_records => "1000"
    consumer_threads => "5"
    client_id => "service-manager-error.log"
    group_id => "service-manager-error.log"
    auto_offset_reset => "latest"
    topics => ["service-manager-error.log"]
    type => "service-manager-error.log"
    codec => 'json'
  }
}

filter {
  mutate {
    remove_field => ["@version"]
    remove_field => ["prospector"]
    remove_field => ["beat"]
  }
}

output {
  if [type] == "service-manager-info.log" {
    elasticsearch {
      user => "elastic"
      password => "changeme"
      hosts => ["172.20.103.55:9200"]
      index => "logstash-service-manager-info.log-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "service-manager-error.log" {
    elasticsearch {
      user => "elastic"
      password => "changeme"
      hosts => ["172.20.103.55:9200"]
      index => "logstash-service-manager-error.log-%{+YYYY.MM.dd}"
    }
    #exec {
    #  command => "/usr/bin/python /export/script/send_mail.py %{type} '%{message}'"
    #}
    #exec {
    #  command => "/usr/bin/python /export/script/wechat.py yuhulin %{type} '%{message}'"
    #}
  }
}

vim logstash/conf.d/service-server.conf
input {
  kafka {
    bootstrap_servers => "172.20.103.55:9092"
    request_timeout_ms => "40000"
    session_timeout_ms => "30000"
    max_partition_fetch_bytes => "52428700"
    max_poll_records => "1000"
    consumer_threads => "5"
    client_id => "service-server-debug.log"
    group_id => "service-server-debug.log"
    auto_offset_reset => "latest"
    topics => ["service-server-debug.log"]
    type => "service-server-debug.log"
    codec => 'json'
  }
}

filter {
  mutate {
    remove_field => ["@version"]
    remove_field => ["prospector"]
    remove_field => ["beat"]
  }
}

output {
  #if [type] == "service-server-debug.log" {
  elasticsearch {
    user => "elastic"
    password => "changeme"
    hosts => ["172.20.103.55:9200"]
    index => "logstash-service-server-debug.log-%{+YYYY.MM.dd}"
  }
  #exec {
  #  command => "/usr/bin/python /export/script/send_mail.py %{type} '%{message}'"
  #}
  #exec {
  #  command => "/usr/bin/python /export/script/wechat.py yuhulin %{type} '%{message}'"
  #}
  #}
}

chown admin:admin -R /export/servers/logstash

Start logstash:

su - admin
nohup /export/servers/logstash/bin/logstash -f /export/servers/logstash/conf.d/ &
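The configs can be validated with a dry run before backgrounding logstash, and once running, the monitoring API (port 9600 by default in 5.x) shows whether events are flowing. The counter-parsing helper and the sample JSON below are our own sketch:

```shell
# Dry-run the pipeline configs first (exits non-zero on syntax errors):
# /export/servers/logstash/bin/logstash -f /export/servers/logstash/conf.d/ -t

# Extract the pipeline "out" event counter from a node-stats JSON response.
events_out() {
  printf '%s' "$1" | sed -n 's/.*"out":\([0-9]*\).*/\1/p'
}

# Live check (uncomment on the server):
# events_out "$(curl -s http://localhost:9600/_node/stats/pipeline)"
events_out '{"pipeline":{"events":{"in":1200,"filtered":1200,"out":1200}}}'   # → 1200
```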

Configure kibana:

cd /export/servers
mkdir -p /export/data/logs/kibana
vim kibana/config/kibana.yml
server.port: 5601
server.host: "172.20.103.55"
server.name: "kibana"
elasticsearch.url: "http://172.20.103.55:9200"
kibana.index: ".kibana"
elasticsearch.username: "elastic"
elasticsearch.password: "changeme"
logging.dest: /export/data/logs/kibana/kibana.log
i18n.defaultLocale: "en"

Start kibana:

nohup /export/servers/kibana/bin/kibana &
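Kibana exposes a status endpoint at /api/status; a green overall state means it connected to elasticsearch successfully. The state-parsing helper and sample response are our own sketch:

```shell
# Extract the overall state from Kibana's /api/status JSON response.
kibana_state() {
  printf '%s' "$1" | sed -n 's/.*"state":"\([a-z]*\)".*/\1/p' | head -n 1
}

# Live check (uncomment on the server):
# kibana_state "$(curl -s -u elastic:changeme http://172.20.103.55:5601/api/status)"
kibana_state '{"status":{"overall":{"state":"green","title":"Green"}}}'   # → green
```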

Configure filebeat (client side):

cd /export/install_packages
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-5.5.0-linux-x86_64.tar.gz
tar -zxvf filebeat-5.5.0-linux-x86_64.tar.gz -C /export/servers/
cd /export/servers/
mv filebeat-5.5.0 filebeat
vim filebeat/filebeat.yml
filebeat.prospectors:
- input_type: log
  enabled: true
  paths:
    - /export/data/logs/debug/service-exchange-debug.log
  fields:
    log_topics: service-exchange-debug.log
- input_type: log
  enabled: true
  paths:
    - /export/data/logs/debug/service-server-debug.log
  fields:
    log_topics: service-server-debug.log
  multiline.pattern: '^[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{3}'
  multiline.negate: true
  multiline.match: after

output.kafka:
  enabled: true
  hosts: ["172.20.103.55:9092"]
  topic: '%{[fields][log_topics]}'

Start filebeat:

nohup /export/servers/filebeat/filebeat -c /export/servers/filebeat/filebeat.yml -e &
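To confirm the end-to-end path, consume one record from a topic on the server side and check it carries both the log line and the `fields.log_topics` value filebeat adds. The record-checking helper and the sample record (including its field order) are our own sketch:

```shell
# Check that a consumed record contains the fields filebeat is expected to add.
record_ok() {
  case "$1" in
    *'"message"'*'"log_topics"'*) echo "ok" ;;
    *)                            echo "incomplete" ;;
  esac
}

# Live check (uncomment on the server; reads one record from the topic):
# record_ok "$(/export/servers/kafka/bin/kafka-console-consumer.sh \
#     --bootstrap-server 172.20.103.55:9092 \
#     --topic service-server-debug.log --from-beginning --max-messages 1)"
record_ok '{"@timestamp":"2019-07-01T00:00:00.000Z","message":"08:15:42.123 DEBUG boot","fields":{"log_topics":"service-server-debug.log"}}'   # → ok
```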