评论

收藏

[Linux] ELK+kafka+filebeat搭建生产ELFK集群 --wukong编辑器

服务系统 服务系统 发布于:2021-12-04 20:41 | 阅读数:362 | 评论:0


  • - -
作者:[SRE运维博客](https://www.cnsre.cn/ )
博客地址: [https://www.cnsre.cn/](https://www.cnsre.cn/ )
文章地址:​​https://www.cnsre.cn/posts/210325135520/​​
相关话题:​​https://www.cnsre.cn/tags/elk/​​


  • - -
ELK+kafka+filebeat搭建生产ELFK集群
ELK 架构介绍
DSC0000.png

集群服务版本
   服务|版本  -|-  java|1.8.0_221  elasticsearch|7.10.1  filebeat|7.10.1  kibana|7.10.1  logstash|7.10.1  cerebro|0.9.2-1  kafka|2.12-2.3.0  zookeeper|3.5.6
服务器环境说明
   IP地址|主机名|配置|角色  -|-|-|-

  • 0.11.172|elk-master|4C16G|es-master、kafka+zookeeper1
  • 0.21.117|elk-node1|4C16G|es-node1、kafka+zookeeper2
  • 0.11.208|elk-node2|4C16G|es-node2、kafka+zookeeper3
  • 0.10.242|elk-kibana|4C16G|logstash、kibana、cerebro

系统参数优化
   {{< notice warning "注意" >}}    三个节点都需要执行  {{< /notice >}}
修改主机名
shellhostnamectl set-hostname elk-masterhostnamectl set-hostname elk-node1hostnamectl set-hostname elk-node2
增加文件描述符
shellcat >>/etc/security/limits.conf<< EOF*         soft    nofile      65536*         hard    nofile      65536*         soft    nproc       65536*         hard    nproc       65536*         hard    memlock     unlimited*         soft    memlock     unlimitedEOF
DSC0001.png

修改默认限制内存
shellcat >>/etc/systemd/system.conf<< EOFDefaultLimitNOFILE=65536DefaultLimitNPROC=32000DefaultLimitMEMLOCK=infinityEOF
DSC0002.png

优化内核,对es支持
cat >>/etc/sysctl.conf<< EOF# 关闭交换内存vm.swappiness =0# 影响java线程数量,建议修改为262144或者更高vm.max_map_count= 262144# 优化内核listen连接net.core.somaxconn=65535# 最大打开文件描述符数,建议修改为655360或者更高fs.file-max=655360# 开启ipv4转发net.ipv4.ip_forward= 1EOF
DSC0003.png

修改Hostname配置文件
shellcat >>/etc/hosts<< EOFelk-master  10.0.11.172elk-node1   10.0.21.117elk-node2   10.0.11.208EOF
DSC0004.png

重启使配置生效
shellreboot
部署Zookeeper
   {{< notice warning "注意" >}}    三个节点都需要执行  {{< /notice >}}
创建Zookeeper项目目录
shell#存放快照日志mkdir zkdata  #存放事物日志mkdir zklogs
下载解压zookeeper
shellwget  http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.6/apache-zookeeper-3.5.6-bin.tar.gztar -zxvf apache-zookeeper-3.5.6-bin.tar.gz  mv apache-zookeeper-3.5.6-bin zookeeper
修改配置文件
shell[root@elk-master zookeeper]# cat conf/zoo.cfg  |grep  -v ^## 服务器之间或客户端与服务器之间维持心跳的时间间隔# tickTime以毫秒为单位。tickTime=2000  # 集群中的follower服务器(F)与leader服务器(L)之间的初始连接心跳数initLimit=10# 集群中的follower服务器与leader服务器之间请求和应答之间能容忍的最多心跳数syncLimit=5# 数据保存目录dataDir=../zkdata# 日志保存目录dataLogDir=../zklogs# 客户端连接端口clientPort=2181# 客户端最大连接数。# 根据自己实际情况设置,默认为60个maxClientCnxns=60# 三个接点配置,格式为: server.服务编号=服务地址、LF通信端口、选举端口server.1=10.0.11.172:2888:3888server.2=10.0.21.117:2888:3888server.3=10.0.11.208:2888:3888
写入节点标记
   {{< notice warning "注意" >}}    分别在三个节点​​/home/tools/zookeeper/zkdata/myid​​写入节点标记  {{< /notice >}}    {{< tabs master节点 node1节点 node2节点 >}}    {{< tab >}}
master的操作
shellecho "1" > /home/tools/zookeeper/zkdata/myid
{{< /tab >}}  {{< tab >}}
node1的操作
shellecho "2" > /home/tools/zookeeper/zkdata/myid
{{< /tab >}}  {{< tab >}}
node2的操作
shellecho "3" > /home/tools/zookeeper/zkdata/myid
{{< /tab >}}  {{< /tabs >}}
启动zookeeper集群
shell[root@elk-master zookeeper]# cd /home/tools/zookeeper/bin/[root@elk-master bin]# ./zkServer.sh start  ZooKeeper JMX enabled by defaultUsing config: /home/tools/zookeeper/bin/../conf/zoo.cfgStarting zookeeper ... STARTED
检查集群状态
[root@elk-master bin]# sh /home/tools/zookeeper/bin/zkServer.sh status  ZooKeeper JMX enabled by defaultUsing config: /home/tools/zookeeper/bin/../conf/zoo.cfgClient port found: 2181. Client address: localhost.Mode: leader
DSC0005.png

设置全局变量
shellcat >>/etc/profile<< EOFexport ZOOKEEPER_INSTALL=/home/tools/zookeeper/export PATH=$PATH:$ZOOKEEPER_INSTALL/binexport PATHEOF

  • 使配置生效
shellsource /etc/profile
这样就可以全局使用​​zkServer.sh​​命令了

部署 Kafka
   {{< notice warning "注意" >}}    三个节点都需要执行  {{< /notice >}}
下载解压kafka压缩包
[root@elk-master tools]# mkdir kafka[root@elk-master tools]# cd kafka/[root@elk-master kafka]# wget https://www-eu.apache.org/dist/kafka/2.3.0/kafka_2.12-2.3.0.tgz[root@elk-master kafka]# tar xf kafka_2.12-2.3.0.tgz  [root@elk-master kafka]# mv kafka_2.12-2.3.0 kafka[root@elk-master kafka]# cd kafka/config/
配置kafka
shell  [root@elk-master config]# cat /home/tools/kafka/kafka/config/server.properties############################# Server Basics #############################  # broker的id,值为整数,且必须唯一,在一个集群中不能重复broker.id=1############################# Socket Server Se:ttings #############################  # kafka默认监听的端口为9092 (默认与主机名进行连接)listeners=PLAINTEXT://:9092# 处理网络请求的线程数量,默认为3个num.network.threads=3# 执行磁盘IO操作的线程数量,默认为8个  num.io.threads=8# socket服务发送数据的缓冲区大小,默认100KBsocket.send.buffer.bytes=102400# socket服务接受数据的缓冲区大小,默认100KBsocket.receive.buffer.bytes=102400# socket服务所能接受的一个请求的最大大小,默认为100Msocket.request.max.bytes=104857600############################# Log Basics #############################  # kafka存储消息数据的目录log.dirs=../kfkdata# 每个topic默认的partition数量num.partitions=3# 在启动时恢复数据和关闭时刷新数据时每个数据目录的线程数量num.recovery.threads.per.data.dir=1############################# Log Flush Policy #############################  # 消息刷新到磁盘中的消息条数阈值#log.flush.interval.messages=10000# 消息刷新到磁盘中的最大时间间隔,1s#log.flush.interval.ms=1000############################# Log Retention Policy #############################  # 日志保留小时数,超时会自动删除,默认为7天log.retention.hours=168# 日志保留大小,超出大小会自动删除,默认为1G#log.retention.bytes=1073741824# 日志分片策略,单个日志文件的大小最大为1G,超出后则创建一个新的日志文件log.segment.bytes=1073741824# 每隔多长时间检测数据是否达到删除条件,300slog.retention.check.interval.ms=300000############################# Zookeeper #############################  # Zookeeper连接信息,如果是zookeeper集群,则以逗号隔开zookeeper.connect=10.0.11.172,10.0.21.117,10.0.11.208# 连接zookeeper的超时时间,6szookeeper.connection.timeout.ms=6000
创建数据存储的目录
shell[root@elk-master config]# mkdir ../kfkdata
修改broker.id
   {{< notice warning "注意" >}}    分别在三个节点依次修改​​/home/tools/kafka/kafka/config/server.properties​​配置文件  {{< /notice >}}  {{< tabs master节点 node1节点 node2节点 >}}    {{< tab >}}
master的配置
shellbroker.id=1
{{< /tab >}}  {{< tab >}}
node1的配置
shellbroker.id=2
{{< /tab >}}  {{< tab >}}
node2的配置
shellbroker.id=3
{{< /tab >}}  {{< /tabs >}}
DSC0006.png

启动kafka集群
shellcd /home/tools/kafka/kafka/bin/#启动测试./kafka-server-start.sh ../config/server.properties#放入后台./kafka-server-start.sh -daemon ../config/server.properties
测试
   {{< notice warning "注意" >}}    任意节点均可执行  {{< /notice >}}  在创建topic在集群中的任意节点 发布消息订阅消息验证结果
DSC0007.png

    {{< tabs 创建topic 消息发布 topic消息订阅 >}}    {{< tab >}}
shell[root@elk-master bin]# ./kafka-topics.sh \--create \--zookeeper 10.0.11.172:2181,10.0.21.117:2181,10.0.11.208:2181 \--partitions 3 \--replication-factor 1 \--topic logs
  {{< /tab >}}  {{< tab >}}
shell[root@elk-master bin]# ./kafka-console-producer.sh \--broker-list 10.0.11.172:9092,10.0.21.117:9092,10.0.11.208:9092 \--topic logs
{{< /tab >}}  {{< tab >}}
shell[root@elk-master bin]#  ./kafka-console-consumer.sh \--bootstrap-server 10.0.11.172:9092,10.0.21.117:9092,10.0.11.208:9092 \--topic logs \--from-beginning
{{< /tab >}}  {{< /tabs >}}
DSC0008.png


部署elasticsearch
   {{< notice warning "注意" >}}    三个节点都需要执行  {{< /notice >}}
下载安装elasticsearch
shellwget https://pan.cnsre.cn/d/Package/Linux/ELK/elasticsearch-7.10.1-x86_64.rpm[root@elk-master package]#  rpm -ivh elasticsearch-7.10.1-x86_64.rpm
备份配置文件
shellcd /etc/elasticsearchcp elasticsearch.yml  elasticsearch.yml.bak
修改配置文件
shellcat >/etc/elasticsearch/elasticsearch.yml << EOF#集群名cluster.name: elk-cluster#node名node.name: elk-1#数据存储路径path.data: /home/elasticsearch/esdata#数据快照路径path.repo: /home/backup/essnapshot#日志存储路径path.logs: /home/elasticsearch/eslogs#es绑定的ip地址,根据自己机器ip进行修改network.host: 0.0.0.0#服务端口http.port: 9200#集群master需要和node名设置一致discovery.seed_hosts: ["10.0.11.172", "10.0.21.117", "10.0.11.208"]cluster.initial_master_nodes: ["10.0.11.172","10.0.21.117","10.0.11.208"]#允许跨域请求http.cors.enabled: true#* 表示支持所有域名http.cors.allow-origin: "*"#添加请求headerhttp.cors.allow-headers: Authorization,X-Requested-With,Content-Length,Content-Type#生产必须为true,内存锁定检查,目的是内存地址直接映射,减少一次copy时间bootstrap.memory_lock: true#系统过滤检查,防止数据损坏,考虑集群安全,生产设置成falsebootstrap.system_call_filter: false#xpack配置xpack.security.enabled: truexpack.security.transport.ssl.enabled: truexpack.security.transport.ssl.verification_mode: certificatexpack.security.transport.ssl.keystore.path: /etc/elasticsearch/elastic-certificates.p12xpack.security.transport.ssl.truststore.path: /etc/elasticsearch/elastic-certificates.p12EOF
修改JVM

  • 将​​jvm.options​​​文件中22-23行的​​8g​​设置为你的服务内存的一半
shell[root@elk-node1 elasticsearch]# cat -n  jvm.options |grep 8g  22  -Xms8g  23  -Xmx8g
修改其他节点配置
   {{< notice warning "注意" >}}    分别在三个节点修改​​/etc/elasticsearch/elasticsearch.yml​​配置文件  {{< /notice >}}    {{< tabs master节点 node1节点 node2节点 >}}    {{< tab >}}
master的配置
shellnode.name: "es-master"
{{< /tab >}}  {{< tab >}}
node1的配置
shellnode.name: "es-node1"
{{< /tab >}}  {{< tab >}}
node2的配置
shellnode.name: "es-node2"
{{< /tab >}}  {{< /tabs >}}  最终展示
DSC0009.png

分配权限
   因为自定义数据、日志存储目录,所以要把权限给到目录
mkdir  -p /home/elasticsearch/{esdata,eslogs}chown  elasticsearch:elasticsearch  /home/elasticsearch/*mkdir  -p /home/backup/essnapshotchown elasticsearch:elasticsearch /home/backup/essnapshot
启动服务
   三个节点全部启动并加入开机启动
systemctl start elasticsearchsystemctl enable  elasticsearch
使用xpack进行安全认证
xpack的安全功能

  • TLS 功能。 可对通信进行加密
  • 文件和原生 Realm。 可用于创建和管理用户
  • 基于角色的访问控制。 可用于控制用户对集群 API 和索引的访问权限
  • 通过针对 Kibana Spaces 的安全功能,还可允许在Kibana 中实现多租户。
  在我配置过程中,发现集群认证需要首先配置秘钥才行,否则在给内置用户创建秘钥的时候将会报错。  {{< notice warning "error" >}}    Cause: Cluster state has not been recovered yet, cannot write to the [null] index  {{< /notice >}}
shellUnexpected response code [503] from calling PUT http://10.0.11.172:9200/_security/user/apm_system/_password?prettyCause: Cluster state has not been recovered yet, cannot write to the [null] indexPossible next steps:* Try running this tool again.* Try running with the --verbose parameter for additional messages.* Check the elasticsearch logs for additional error details.* Use the change password API manually.  ERROR: Failed to set password for user [apm_system].
申请证书
   {{< notice warning "注意" >}}    下面的操作,在其中一个节点操作即可  {{< /notice >}}
shell/usr/share/elasticsearch/bin/elasticsearch-certutil ca/usr/share/elasticsearch/bin/elasticsearch-certutil cert --ca elastic-stack-ca.p12
  两条命令均一路回车即可,不需要给秘钥再添加密码  证书创建完成之后,默认在es的数据目录。  将证书拷贝到etc下,并给上权限。
[root@elk-master ~]# ls /usr/share/elasticsearch/elastic-*/usr/share/elasticsearch/elastic-certificates.p12/usr/share/elasticsearch/elastic-stack-ca.p12cp /usr/share/elasticsearch/elastic-* /etc/elasticsearch/chown elasticsearch.elasticsearch /etc/elasticsearch/elastic*
做完之后,将证书拷贝到其他节点
为内置账号添加密码
   ES中内置了几个管理其他集成组件的账号​​apm_system, beats_system, elastic, kibana, logstash_system, remote_monitoring_user​​使用之前,首先需要设置下密码。
shell/usr/share/elasticsearch/bin/elasticsearch-setup-passwords interactiveInitiating the setup of passwords for reserved users elastic,apm_system,kibana,logstash_system,beats_system,remote_monitoring_user.You will be prompted to enter passwords as the process progresses.Please confirm that you would like to continue [y/N]yEnter password for [elastic]:Reenter password for [elastic]:Enter password for [apm_system]:Reenter password for [apm_system]:Enter password for [kibana]:Reenter password for [kibana]:Enter password for [logstash_system]:Reenter password for [logstash_system]:Enter password for [beats_system]:Reenter password for [beats_system]:Enter password for [remote_monitoring_user]:Reenter password for [remote_monitoring_user]:Changed password for user [apm_system]Changed password for user [kibana]Changed password for user [logstash_system]Changed password for user [beats_system]Changed password for user [remote_monitoring_user]Changed password for user [elastic]
部署 Cerebro
下载安装
shellwget https://pan.cnsre.cn/d/Package/Linux/ELK/cerebro-0.9.2-1.noarch.rpmrpm -ivh cerebro-0.9.2-1.noarch.rpm
修改配置文件
   修改​​/etc/cerebro/application.conf​​​配置文件  找到对应配置修改为以下内容   {{< codes 修改内容一 修改内容二>}}   {{​​}}  ​​
shelldata.path: "/var/lib/cerebro/cerebro.db"#data.path = "./cerebro.db"
​​  {{​​​}}  {{​​}}
shellhosts = [  #{  #  host = "http://localhost:9200"  #  name = "Localhost cluster"  #  headers-whitelist = [ "x-proxy-user", "x-proxy-roles", "X-Forwarded-For" ]  #}  # Example of host with authentication  {  host = "http://10.0.11.172:9200"  name = "elk-cluster"  auth = {    username = "elastic"    password = "123"  }  }]
{{​​}}  {{}}
报错
   {{< notice warning "error" >}}    cerebro[8073]: No java installations was detected.  {{< /notice >}}  启动服务后报错​​No java​​​,但是我的环境是有​​JAVA​​的。也做了全局变量  感觉很奇怪...
解决方法
   {{< notice success "解决方法" >}}    在启动服务文件中加入​​JAVA_HOME​​  {{< /notice >}}

  • 找到服务启动文件​​/usr/share/cerebro/bin/cerebro​​
  • 修改​​/usr/share/cerebro/bin/cerebro​​​中的​​JAVA_HOME​​
  具体如下,根据自己的​​JAVA_HOME​​​填写路径   {{< codes 修改前 修改后>}}   {{​​}}  ​​
shell  if [[ -n "$bundled_jvm" ]];  then  echo "$bundled_jvm/bin/java"  elif [[ -n "$JAVA_HOME" ]] && [[ -x "$JAVA_HOME/bin/java" ]];  then  echo "$JAVA_HOME/bin/java"  else  echo "java"  fi
​​  {{​​​}}  {{​​}}
shell  if [[ -n "$bundled_jvm" ]];  then  echo "$bundled_jvm/bin/java"  elif [[ -n "/home/tools/jdk1.8.0_221" ]] && [[ -x "/home/tools/jdk1.8.0_221/bin/java" ]];  then  echo "/home/tools/jdk1.8.0_221/bin/java"  else  echo "java"  fi
{{​​}}  {{}}
启动服务
shellsystemctl  start cerebro.servicesystemctl  enable cerebro.servicesystemctl  status  cerebro.service
可以看到监听的是​​9000​​端口  访问试下
DSC00010.png


部署Kibana
下载安装
https://artifacts.elastic.co/downloads/kibana/kibana-7.10.1-x86_64.rpmrpm -ivh kibana-7.10.1-x86_64.rpm
修改备份配置文件

  • 备份配置文件
cd /etc/kibana/mv kibana.yml kibana.yml.bak

  • 修改配置文件
vim kibana.ymlserver.port: 5601server.host: 0.0.0.0elasticsearch.hosts: ["http://10.0.11.172:9200/","http://10.0.21.117:9200/","http://10.0.11.208:9200/"]elasticsearch.username: "elastic"elasticsearch.password: "123"i18n.locale: "zh-CN"
启动服务器
shellsystemctl  start   kibana.servicesystemctl  enable  kibana.servicesystemctl  status  kibana.service
访问WEB
   访问​​http://IP:5601​​
DSC00011.png


部署filebeat
shellwget https://pan.cnsre.cn/d/Package/Linux/ELK/filebeat-7.10.1-x86_64.rpmrpm -ivh filebeat-7.10.1-x86_64.rpm  cd /etc/filebeat/cp filebeat.yml filebeat.yml.bak
修改配置文件
   修改filebeat配置文件,把日志推送到kafka
yaml#=========================== Filebeat inputs =============================max_procs: 1           #限制filebeat的进程数量,其实就是内核数,避免过多抢占业务资源queue.mem.events: 256      # 存储于内存队列的事件数,排队发送 (默认4096)queue.mem.flush.min_events: 128  # 小于 queue.mem.events ,增加此值可提高吞吐量 (默认值2048)filebeat.inputs:         # inputs为复数,表名type可以有多个- type: log            # 输入类型  enable: true           # 启用这个type配置  paths:  - /home/homeconnect/logs/AspectLog/aspect.log  # 监控tomcat  的业务日志  json.keys_under_root: true   #默认Flase,还会将json解析的日志存储至messages字段  json.overwrite_keys: true    #覆盖默认的key,使用自定义json格式的key  max_bytes: 20480         # 单条日志的大小限制,建议限制(默认为10M,queue.mem.events * max_bytes 将是占有内存的一部)  fields:            # 额外的字段   source: test-prod-tomcat-aspect-a  # 自定义source字段,用于es建议索引(字段名小写,我记得大写好像不行)- type: log    # 输入类型  enable: true   # 启用这个type配置  paths:  - /home/tools/apache-tomcat-8.5.23/logs/localhost_access_log.*.log  # 监控tomcat access日志  json.keys_under_root: true   #默认Flase,还会将json解析的日志存储至messages字段  json.overwrite_keys: true  #覆盖默认的key,使用自定义json格式的key  max_bytes: 20480       # 单条日志的大小限制,建议限制(默认为10M,queue.mem.events * max_bytes 将是占有内存的一部分)  fields:            # 额外的字段   source: test-prod-tomcat-access-a  # 自定义source字段,用于es建议索引# 自定义es的索引需要把ilm设置为falsesetup.ilm.enabled: false#=============================== output ===============================output.kafka:     # 输出到kafka  enabled: true   # 该output配置是否启用  hosts: ["10.0.11.172:9092","10.0.21.117:9092","10.0.11.208:9092"] # kafka节点列表  topic: 'logstash-%{[fields.source]}'  # kafka会创建该topic,然后logstash(可以过滤修改)会传给es作为索引名称  partition.hash:    reachable_only: true # 是否只发往可达分区  compression: gzip    # 压缩  max_message_bytes: 1000000  # Event最大字节数。默认1000000。应小于等于kafka broker message.max.bytes值  required_acks: 1  # kafka ack等级  worker: 1  # kafka output的最大并发数  bulk_max_size: 2048  # 单次发往kafka的最大事件数  logging.to_files: true   # 输出所有日志到file,默认true, 达到日志文件大小限制时,日志文件会自动限制替换#=============================== other ===============================close_older: 30m     # 如果文件在某个时间段内没有发生过更新,则关闭监控的文件handle。默认1hforce_close_files: false # 这个选项关闭一个文件,当文件名称的变化。只在window建议为true# 没有新日志采集后多长时间关闭文件句柄,默认5分钟,设置成1分钟,加快文件句柄关闭close_inactive: 1m# 传输了3h后荏没有传输完成的话就强行关闭文件句柄,这个配置项是解决以上案例问题的key pointclose_timeout: 3h# 这个配置项也应该配置上,默认值是0表示不清理,不清理的意思是采集过的文件描述在registry文件里永不清理,在运行一段时间后,registry会变大,可能会带来问题clean_inactive: 72h# 设置了clean_inactive后就需要设置ignore_older,且要保证ignore_older < clean_inactiveignore_older: 70h
启动服务
shellsystemctl  start  filebeat.servicesystemctl  enable  filebeat.servicesystemctl  status  filebeat.service
部署logstash
下载安装
shellwget https://pan.cnsre.cn/d/Package/Linux/ELK/logstash-7.10.1-x86_64.rpmrpm -ivh logstash-7.10.1-x86_64.rpmmv logstash.yml logstash.yml.bak
修改配置文件
   修改​​logstash.yml​​
ymlvim logstash.ymlhttp.host: "0.0.0.0"# 指发送到Elasticsearch的批量请求的大小,值越大,处理则通常更高效,但增加了内存开销pipeline.batch.size: 3000# 指调整Logstash管道的延迟,过了该时间则logstash开始执行过滤器和输出pipeline.batch.delay: 200
    修改配置文件,从kafka获取日志
shell[root@elk-kibana conf.d]# cat /etc/logstash/conf.d/get-kafka-logs.conf  input {                   # 输入组件  kafka {                  # 从kafka消费数据  bootstrap_servers => ["10.0.11.172:9092,10.0.21.117:9092,10.0.11.208:9092"]  codec => "json"            # 数据格式  #topics => ["3in1-topi"]           # 使用kafka传过来的topic  topics_pattern => "logstash-.*"       # 使用正则匹配topic  consumer_threads => 3          # 消费线程数量  decorate_events => true        # 可向事件添加Kafka元数据,比如主题、消息大小的选项,这将向logstash事件中添加一个名为kafka的字段  auto_offset_reset => "latest"      # 自动重置偏移量到最新的偏移量  #group_id => "logstash-node"      # 消费组ID,多个有相同group_id的logstash实例为一个消费组  #client_id => "logstash1"         # 客户端ID  fetch_max_wait_ms => "1000"      # 指当没有足够的数据立即满足fetch_min_bytes时,服务器在回答fetch请求之前将阻塞的最长时间  }}filter{   # 当非业务字段时,无traceId则移除   #if ([message] =~ "traceId=null") {      # 过滤组件,这里只是展示,无实际意义,根据自己的业务需求进行过滤   #   drop {}   #}mutate {  convert => ["Request time", "float"]  }    if [ip] != "-" {    geoip {             source => "ip"            target => "geoip"             # database => "/usr/share/GeoIP/GeoIPCity.dat"            add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]            add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]        }           mutate {            convert => [ "[geoip][coordinates]", "float"]        }    } }output {       # 输出组件  elasticsearch {   # Logstash输出到es  hosts => ["10.0.11.172:9200","10.0.21.117:9200","10.0.11.208:9200"]  index => "logstash-%{[fields][source]}-%{+YYYY-MM-dd}"  # 直接在日志中匹配  #index => "%{[@metadata][topic]}-%{+YYYY-MM-dd}"  # 以日期建索引  user => "elastic"  password => "123"  }  #stdout {  #  codec => rubydebug #}}
测试接收日志
   测试是否能接收到数据
shell/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/get-kafka-logs.conf
  下边把​​logstash​​​设置为使用​​systemd​​​启动  修改​​/etc/systemd/system/logstash.service​​文件
shell[Unit]Description=root[Service]Type=simpleUser=rootGroup=root# Load env vars from /etc/default/ and /etc/sysconfig/ if they exist.# Prefixing the path with '-' makes it try to load, but if the file doesn't# exist, it continues onward.EnvironmentFile=-/etc/default/logstashEnvironmentFile=-/etc/sysconfig/logstashExecStart=/usr/share/logstash/bin/logstash "--path.settings" "/etc/logstash"Restart=alwayWorkingDirectort=/Nice=19LimitNOFILE=16384[Install]WantedBy=multi-user.target
在启动程序​​/usr/share/logstash/bin/logstash.lib.sh​​​中加入​​JAVA_HOME​​​  在文件86行左右的​​if [ -z "$JAVACMD" ]; then​​​代码上方插入一行​​JAVACMD="/home/tools/jdk1.8.0_221/bin/java"​​​ 具体的路径需要你根据自己的​​JAVA​​来修改。
shell[root@elk-kibana ~]# cat  -n /usr/share/logstash/bin/logstash.lib.sh |grep  JAVACMD  85  # set the path to java into JAVACMD which will be picked up by JRuby to launch itself  86  JAVACMD="/home/tools/jdk1.8.0_221/bin/java"  87  if [ -z "$JAVACMD" ]; then
启动服务
shellsystemctl reload logstash.servicesystemctl restart  logstash.servicesystemctl enable  logstash.service
最后检查
登录kibana创建索引
   选择​​管理​​​--​​索引模式​​​--​​创建索引模式​​
DSC00012.png

DSC00013.png

  输入​​索引名称​​​--​​下一步​​
DSC00014.png

  选择​​@timestamp​​​--​​创建索引模式​​
DSC00015.png

  然后就可以看到日志了


  • - -
作者:[SRE运维博客](https://www.cnsre.cn/ )
博客地址: [https://www.cnsre.cn/](https://www.cnsre.cn/ )
文章地址:​​https://www.cnsre.cn/posts/210325135520/​​
相关话题:​​https://www.cnsre.cn/tags/elk/​​


  • - -



关注下面的标签,发现更多相似文章