filebeat采集docker日志 (logstash怎么获取kafka日志)

想听听大道理跟理论?不存在的、文章都是干货实战。这里使用filebeat收集日志到logstash中,再由logstash再生产数据到kafka,如果kafka那边没有kerberos认证也可以直接收集到kafka中。

logstash怎么获取kafka日志,logstashkafka集群

使用方法

使用logstash的版本为6.2.2、使用Centos7系统作为演示

*载下**logstash

cd /usr/share
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.2.2.tar.gz
mv logstash-6.2.2 logstash

Filebeat

安装filebeat

yum install filebeat

filebeat配置文件 /etc/filebeat/filebeat.yml

filebeat.inputs:
- type: log
 enabled: true
 publish_async: true
 paths:
 ## 日志位置
 - /etc/filebeat/logs/*.log
filebeat.config.modules:
 path: ${path.config}/modules.d/*.yml
 reload.enabled: false
setup.template.settings:
 index.number_of_shards: 3
output.logstash:
 hosts: ["127.0.0.1:5044"]
processors:
 - drop_fields:
 fields: []

logstash怎么获取kafka日志,logstashkafka集群

Logstash

安装kafka插件

/usr/share/logstash/bin/logstash-plugin install logstash-output-kafka

配置logstash

mkdir /etc/logstash

logstash pipelines文件 /etc/logstash/pipelines.yml

 - pipeline.id: another_test
 queue.type: persisted
 path.config: "/etc/logstash/conf.d/*.conf"
 pipeline.id: mylogs
 pipeline.workers: 8
 pipeline.batch.size: 1000
 pipeline.batch.delay: 50
 pipeline.output.workers: 1

logstash 配置文件 /etc/logstash/logstash.yml

 pipeline:
 batch:
 size: 125
 delay: 5
 
path.data: /var/lib/logstash
pipeline.workers: 8
pipeline.batch.size: 1000
path.logs: /var/log/logstash

jvm 配置 /etc/logstash/jvm.options

-Xms10g
-Xmx10g
-XX:+UseParNewGC
-XX:+UseConcMarkSweepGC
-XX:CMSInitiatingOccupancyFraction=75
-XX:+UseCMSInitiatingOccupancyOnly
-Djava.awt.headless=true
-Dfile.encoding=UTF-8
-Djruby.compile.invokedynamic=true
-Djruby.jit.threshold=0
-XX:+HeapDumpOnOutOfMemoryError
-Djava.security.egd=file:/dev/urandom

/etc/logstash/conf.d/kafka/kafka_jaas.conf

KafkaClient {
 com.sun.security.auth.module.Krb5LoginModule required
 doNotPrompt=true
 useTicketCache=true
 principal="admin/admin@demo.com"
 useKeyTab=true
 serviceName="kafka"
 keyTab="/etc/security/keytabs/admin.keytab"
 client=true;
};

kafka认证文件 /etc/logstash/conf.d/logstash.conf

input {
 beats{
 port => 5044
 }
}
output{
 kafka {
 topic_id => "test_hello"
 bootstrap_servers => "t2.demo.com:9092,t3.demo.com:9092,t4.demo.com:9092"
 compression_type => "snappy"
 jaas_path => "/etc/logstash/conf.d/kafka/kafka_jaas.conf"
 kerberos_config => "/etc/krb5.conf"
 ##名字一定要跟kafka_jaas.conf里面的serviceName一样
 sasl_kerberos_service_name => "kafka"
 security_protocol => "SASL_PLAINTEXT"
 client_id => "log.demo.com"
 }
}

logstash怎么获取kafka日志,logstashkafka集群

运行服务

运行logstash

/usr/share/logstash/bin/logstash --path.settings /etc/logstash

运行filebeat

systemctl start filebeat

日志

logstash 正常运行日志

[root@log logstash]# /usr/share/logstash/bin/logstash --path.settings /etc/logstash
Could not find log4j2 configuration at path /etc/logstash/log4j2.properties. Using default config which logs errors to the console
[INFO ] 2026-03-13T04:15:58+00:00.758 [main] scaffold - Initializing module {:module_name=>"netflow", :directory=>"/usr/share/logstash/modules/netflow/configuration"}
[INFO ] 2026-03-13T04:15:58+00:00.772 [main] scaffold - Initializing module {:module_name=>"fb_apache", :directory=>"/usr/share/logstash/modules/fb_apache/configuration"}
[INFO ] 2026-03-13T04:15:58+00:00.657 [LogStash::Runner] runner - Starting Logstash {"logstash.version"=>"6.2.2"}
[INFO ] 2026-03-13T04:15:58+00:00.170 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600}
[INFO ] 2026-03-13T04:15:58+00:00.075 [Ruby-0-Thread-1: /usr/share/logstash/lib/bootstrap/environment.rb:6] pipeline - Starting pipeline {:pipeline_id=>"mylogs", "pipeline.workers"=>8, "pipeline.batch.size"=>1000, "pipeline.batch.delay"=>50}
[INFO ] 2026-03-13T04:15:58+00:00.199 [[mylogs]-pipeline-manager] ProducerConfig - ProducerConfig values: 
 acks = 1
 batch.size = 16384
 bootstrap.servers = [t2.demo.com:9092, t3.demo.com:9092, t4.demo.com:9092]
 buffer.memory = 33554432
 client.id = log.demo.com
 compression.type = snappy
 connections.max.idle.ms = 540000
 enable.idempotence = false
 interceptor.classes = null
 key.serializer = class org.apache.kafka.common.serialization.StringSerializer
 linger.ms = 0
 max.block.ms = 60000
 max.in.flight.requests.per.connection = 5
 max.request.size = 1048576
 metadata.max.age.ms = 300000
 metric.reporters = []
 metrics.num.samples = 2
 metrics.recording.level = INFO
 metrics.sample.window.ms = 30000
 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
 receive.buffer.bytes = 32768
 reconnect.backoff.max.ms = 10
 reconnect.backoff.ms = 10
 request.timeout.ms = 30000
 retries = 0
 retry.backoff.ms = 100
 sasl.jaas.config = null
 sasl.kerberos.kinit.cmd = /usr/bin/kinit
 sasl.kerberos.min.time.before.relogin = 60000
 sasl.kerberos.service.name = kafka
 sasl.kerberos.ticket.renew.jitter = 0.05
 sasl.kerberos.ticket.renew.window.factor = 0.8
 sasl.mechanism = GSSAPI
 security.protocol = SASL_PLAINTEXT
 send.buffer.bytes = 131072
 ssl.cipher.suites = null
 ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
 ssl.endpoint.identification.algorithm = null
 ssl.key.password = null
 ssl.keymanager.algorithm = SunX509
 ssl.keystore.location = null
 ssl.keystore.password = null
 ssl.keystore.type = JKS
 ssl.protocol = TLS
 ssl.provider = null
 ssl.secure.random.implementation = null
 ssl.trustmanager.algorithm = PKIX
 ssl.truststore.location = null
 ssl.truststore.password = null
 ssl.truststore.type = JKS
 transaction.timeout.ms = 60000
 transactional.id = null
 value.serializer = class org.apache.kafka.common.serialization.StringSerializer
[INFO ] 2026-03-13T04:15:58+00:00.311 [[mylogs]-pipeline-manager] AbstractLogin - Successfully logged in.
[INFO ] 2026-03-13T04:15:58+00:00.315 [kafka-kerberos-refresh-thread-admin/admin@demo.com] KerberosLogin - [Principal=admin/admin@demo.com]: TGT refresh thread started.
[INFO ] 2026-03-13T04:15:58+00:00.317 [kafka-kerberos-refresh-thread-admin/admin@demo.com] KerberosLogin - [Principal=admin/admin@demo.com]: TGT valid starting at: 2018-09-02T23:33:31.000+0800
[INFO ] 2026-03-13T04:15:58+00:00.317 [kafka-kerberos-refresh-thread-admin/admin@demo.com] KerberosLogin - [Principal=admin/admin@demo.com]: TGT expires: 2018-09-03T23:33:31.000+0800
[INFO ] 2026-03-13T04:15:58+00:00.317 [kafka-kerberos-refresh-thread-admin/admin@demo.com] KerberosLogin - [Principal=admin/admin@demo.com]: TGT refresh sleeping until: 2018-09-03T19:41:17.691+0800
[INFO ] 2026-03-13T04:15:58+00:00.338 [[mylogs]-pipeline-manager] AppInfoParser - Kafka version : 1.0.0
[INFO ] 2026-03-13T04:15:58+00:00.338 [[mylogs]-pipeline-manager] AppInfoParser - Kafka commitId : aaa7af6d4a11b29d
[INFO ] 2026-03-13T04:15:58+00:00.653 [[mylogs]-pipeline-manager] beats - Beats inputs: Starting input listener {:address=>"0.0.0.0:5044"}
[INFO ] 2026-03-13T04:15:58+00:00.705 [Ruby-0-Thread-1: /usr/share/logstash/lib/bootstrap/environment.rb:6] pipeline - Pipeline started succesfully {:pipeline_id=>"mylogs", :thread=>"#<Thread:0x49c5d932 run>"}
[INFO ] 2026-03-13T04:15:58+00:00.752 [[mylogs]<beats] Server - Starting server on port: 5044
[INFO ] 2026-03-13T04:15:58+00:00.780 [Ruby-0-Thread-1: /usr/share/logstash/lib/bootstrap/environment.rb:6] agent - Pipelines running {:count=>1, :pipelines=>["mylogs"]}

logstash怎么获取kafka日志,logstashkafka集群