消息队列kafka的应用场景 (kafka集群队列分布)

消息队列 - MQ

什么是消息队列

消息队列不知道大家看到这个词的时候,会不会觉得它是一个比较高端的技术。消息队列,一般会简称为 MQ(Message Queue)。消息队列是一种帮助开发人员解决系统间异步通信的中间件,常用于解决系统解耦和请求的削峰平谷的问题。

队列(Queue):Queue 是一种先进先出的数据结构,容器。

消息(Message):不同应用之间传送的数据。

消息队列:可以把消息队列比作是一个存放消息的容器,当需要使用消息的时候可以取出消息使用。消息队列是分布式系统中重要的组件,使用消息队列主要是为了通过异步处理提高系统性能和削峰、降低系统耦合性。队列 Queue 是一种先进先出的数据结构,所以消费消息时也是按照顺序来消费的。比如生产者发送消息 1, 2, 3,对于消费者就会按照 1, 2, 3 的顺序来消费。

上游系统 --发送消息--> MQ --发送消息--> [下游系统1,下游系统2]

添加完成 MQ 可以实现上下游之间的解耦合,异步调用,流量削峰填谷。

下游系统可以灵活决定什么时候执行,执行几次,执行的时长。

消息队列的应用场景

消息队列在实际应用中包括如下四个场景:

1) 应用耦合:多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败。

2) 异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间。

3) 限流削峰:广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况。

4) 消息驱动的系统:系统分为消息队列、消息生产者、消息消费者,生产者负责产生消息,消费者 (可能有多个) 负责对消息进行处理。

异步处理

具体场景:用户为了使用某个应用,进行注册,系统需要发送注册邮件并验证短信。对这两个子系统操作的处理方式有两种:串行及并行。涉及到三个子系统:注册系统、邮件系统、短信系统。

1)串行方式:新注册信息生成后,先发送注册邮件,再发送验证短信。在这种方式下,需要最终发送验证短信后再返回给客户端。

用户注册 ----> 注册信息写入 ----> 发送注册邮件 ----> 发送注册短信

2) 并行处理:新注册信息写入后,由发短信和发邮件并行处理。在这种方式下,发短信和发邮件 需处理完成后再返回给客户端。

用户注册 ----> 注册信息写入 ----> [发送注册短信,发送注册邮件]

假设以上三个子系统处理的时间均为 50 ms,且不考虑网络延迟,则总的处理时间:

  • 串行:50 + 50 + 50 = 150 ms
  • 并行:50 + 50 = 100 ms

如果引入消息队列,在来看整体的执行效率:

用户注册 ----> 注册信息写入 ----> 消息队列 ----> [发送注册短信,发送注册邮件]

在写入消息队列后立即返回成功给客户端,则总的响应时间依赖于写入消息队列的时间,而写入消息队列的时间本身是可以很快的,基本可以忽略不计,因此总的处理时间为 50 ms,相比串行提高了 2 倍,相比并行提高了一倍。

应用耦合

具体场景:用户使用 QQ 相册上传一张图片,人脸识别系统会对该图片进行人脸识别。

一般的做法是,服务器接收到图片后,图片上传系统立即调用人脸识别系统,调用完成后再返回成功。调用方式:webService、Http 协议(HttpClient、RestTemplate)、TCP 协议(Dubbo)。

图片上传系统 --调用接口--> 人脸识别系统

该方法有如下缺点:

  • 人脸识别系统被调失败,导致图片上传失败。
  • 延迟高,需要人脸识别系统处理完成后,再返回给客户端,即使用户并不需要立即知道结果。
  • 图片上传系统与人脸识别系统之间互相调用,需要做耦合。

若使用消息队列:

图片上传系统 --生产--> 消息队列 <--消费-- 人脸识别系统
  • 客户端上传图片后,图片上传系统将图片信息批次写入消息队列,直接返回成功。
  • 人脸识别系统则定时从消息队列中取数据,完成对新增图片的识别。
  • 图片上传系统并不需要关心人脸识别系统是否对这些图片信息的处理、以及何时对这些图片信息进行处理。
  • 事实上,由于用户并不需要立即知道人脸识别结果,人脸识别系统可以选择不同的调度策略,按照闲时、忙时、正常时间,对队列中的图片信息进行处理。

限流削峰

具体场景:购物网站开展秒杀活动,一般由于瞬时访问量过大,服务器接收过大,会导致流量暴增,相关系统无法处理请求甚至崩溃;而加入消息队列后,系统可以从消息队列中取数据,相当于消息队列做了一次缓冲。

用户请求 --写入--> 消息队列 <--读取-- 业务处理系统

该方法有如下优点:

  • 请求先入消息队列,而不是由业务处理系统直接处理,做了一次缓冲,极大地减少了业务处理系统的压力。
  • 队列长度可以做限制,事实上,秒杀时,后入队列的用户无法秒杀到商品,这些请求可以直接被抛弃,返回活动已结束或商品已售完信息。

消息事件驱动的系统

具体场景: 用户新上传了一批照片 ->人脸识别系统需要对这个用户的所有照片进行聚类 -> 由对账系统重新生成用户的人脸索引(加快查询)。

这三个子系统间由消息队列连接起来,前一个阶段的处理结果放入队列中,后一个阶段从队列中获取消息继续处理。

图片上传系统 
--生产--> 消息队列 
<--消费-- 人脸识别系统 
--生产--> 消息队列 
<--消费-- 对账系统

该方法有如下优点:

  • 避免了直接调用下一个系统导致当前系统失败。
  • 每个子系统对于消息的处理方式可以更为灵活,可以选择收到消息时就处理,可以选择定时处理,也可以划分时间段按不同处理速度处理。

消息队列的两种模式

消息队列包括两种模式,点对点模式(point to point,queue)和发布/订阅模式(publish/subscribe,topic)。

点对点模式

点对点模式下包括三个角色:

  • 消息队列
  • 发送者 (生产者)
  • 接收者(消费者)

每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,可以放在内存中也可以持久化,直到他们被消费或超时。

[Producer, Producer, Producer] 
---sendMsg---> Queue 
<---receive&ack--- [Consumer, Cosumer, Consumer]

点对点模式特点:

  • 每个消息只有一个消费者,一旦被消费,消息就不再在消息队列中。
  • 发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息。
  • 接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息。

发布 / 订阅模式

发布 / 订阅模式下包括三个角色:

  • 角色主题(Topic)
  • 发布者(Publisher)
  • 订阅者(Subscriber)

消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息;和点对点方式不同,发布到 topic 的消息会被多个订阅者消费。

[Producer, Producer, Producer] 
---publishMsg---> Topic 
<---subscrib&deliver--- [Subscriber, Subscriber, Subscriber]

发布 / 订阅模式特点:

  • 每个消息可以有多个订阅者。
  • 发布者和订阅者之间有时间上的依赖性。
  • 为了消费消息,订阅者必须保持在线运行。

消息队列实现机制

JMS

JMS(JAVA Message Service,Java 消息服务)是一个 Java 平台中关于面向消息中间件的 API。

允许应用程序组件基于 JavaEE 平台创建、发送、接收和读取消息。

是一个消息服务的标准或者说是规范,是 Java 平台上有关面向消息中间件的技术规范。

便于消息系统中的 Java 应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口,简化企业应用的开发。

JMS 消息机制主要分为两种模型:PTP 模型和 Pub/Sub 模型。

实现产品:Apache ActiveMQ。

AMQP

AMQP,即 Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端 / 中间件不同产品,不同的开发语言等条件的限制。Erlang 中的实现有 RabbitMQ 等。

队列可以绑定交换机,只要发送消息到交换机上,那么对应的队列就可以接受到。

JMS vs AMQP

JMS

  • Java api,非跨语言,非跨平台;
  • 提供两种消息模型 / 模式(Peer-2-Peer 和 Pub/sub);
  • 多种消息类型:TextMessage、MapMessage、BytesMessage、StreamMessage、ObjectMessage、Message(只有消息头和属性);
  • JMS 定义了 JAVA API 层面的标准;在 Java 体系中,多个 client 均可以通过 JMS 进行交 互,不需要应用修改代码,但是其对跨平台的支持较差。

AMQP

  • Wire-protocol,跨语言,跨平台;
  • 提供了五种消息模型 1)direct exchange、2)fanout exchange、3)topic change、4)headers exchange、5)system exchange,本质来讲,后四种和 JMS 的 pub/sub 模型没有太大差别,仅是在路由机制上做了更详细的划分;
  • byte[] 当实际应用时,有复杂的消息,可以将消息序列化后发送;
  • AMQP 定义了 wire-level 层的协议标准,天然具有跨平台、跨语言特性。

常见的消息队列产品

1)中小型软件公司:

建议选 RabbitMQ。一方面,erlang 语言天生具备高并发的特性,而且管理界面用起来十分方便。但它的弊端也在这里,虽然 RabbitMQ 是开源的,然而国内缺少能定制化开发 erlang 的程序员。所幸,RabbitMQ 的社区十分活跃,可以解决开发过程中遇到的 bug,这点对于中小型公司来说十分重要。不考虑 RocketMQ 和 Kafka 的原因是,一方面中小型软件公司不如互联网公司,数据量没那么大,选消息中间件,应首选功能比较完备的,所以 kafka 排除。不考虑RocketMQ 的原因是,RocketMQ 是阿里出品,如果阿里放弃维护 RocketMQ,中小型公司一般抽不出人来进行 RocketMQ 的定制化开发,因此不推荐。

2)大型软件公司:

根据具体使用在 RocketMQ 和 Kafka 之间二选一。一方面,大型软件公司,具备足够的资金搭建分布式环境,也具备足够大的数据量。针对 RocketMQ,大型软件公司也可以抽出人手对RocketMQ 进行定制化开发,毕竟国内有能力改 JAVA 源码的人,还是相当多的。至于 Kafka,根据业务场景选择,如果有日志采集功能,肯定是首选 Kafka 了。

RabbitMQ

RabbitMQ 2007 年发布,是一个在 AMQP (高级消息队列协议) 基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。

开发语言为 Erlang,单机吞吐量为万级,时效性为 us 级,可用性高(主从架构);

并发能力很强,性能极其好,延迟很低,管理界面较丰富。

ActiveMQ

ActiveMQ 是由 Apache 出品,ActiveMQ 是一个完全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现。它非常快速,支持多种语言的客户端和协议,而且可以非常容易的嵌入到企业的应用环境中,并有许多高级功能。

开发语言为 Java,单机吞吐量为万级,时效性为 ms 级,可用性高(主从架构);

成熟产品,在很多公司得到应用,有较多的文档,各种协议支持较好。

RocketMQ

RocketMQ 出自阿里公司的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进,消息可靠性上比 Kafka 更好。RocketMQ 在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理等。

开发语言为 Java,单机吞吐量为十万级,时效性为 ms 级,可用性非常高(分布式架构);

MQ 功能比较完备,扩展性佳。

Kafka

Apache Kafka 是一个分布式消息发布订阅系统。它最初由 LinkedIn 公司基于独特的设计实现为一个分布式的提交日志系统 (a distributed commit log),之后成为 Apache 项目的一部分。Kafka 系统快速、可扩展并且可持久化。它的分区特性,可复制和可容错都是其不错的特性。

开发语言为 scala,单机吞吐量为十万级,时效性为 ms 级以内,可用性非常高(分布式架构);

只支持主要的 MQ 功能,像一些消息查询、消息回溯等功能都没有提供,毕竟是为大数据准备的,在大数据领域应用广。

kafka 的基本介绍

什么是 Kafka

官网:http://kafka.apache.org/

Kafka 是最初由 Linkedin 公司开发,是一个分布式、分区的、多副本的、多订阅者,基于 Zookeeper 协调的分布式日志系统(也可以当做 MQ 系统),常见可以用于 web/nginx 日志、访问日志,消息服务等等,Linkedin 于 2010 年贡献给了 Apache 基金会并成为顶级开源项目。

主要应用场景是:日志收集系统和消息系统。

Kafka 主要设计目标:

以时间复杂度为 O(1) 的方式提供消息持久化能力,即使对 TB 级以上数据也能保证常数时间的访问性能。

  • 算法复杂度 - 时间复杂度和空间复杂度。
  • 以时间复杂度为 O(1) 的方式 - 常数时间运行和数据量的增长无关,假如操作一个链表,那么无论链表的大还是小,操作时间是一样的。

高吞吐率,即使在非常廉价的商用机器上也能做到单机支持每秒 100K 条消息的传输。

  • 支持普通服务器每秒百万级写入请求。
  • Memory mapped Files。

支持 Kafka Server 间的消息分区,及分布式消费,同时保证每个 partition 内的消息顺序传输。

同时支持离线数据处理和实时数据处理。

Scale out - 支持在线水平扩展。

Kafka 的特点

1)解耦。Kafka 具备消息系统的优点,只要生产者和消费者数据两端遵循接口约束,就可以自行扩展或修改数据处理的业务过程。

2)高吞吐量、低延迟。即使在非常廉价的机器上,Kafka 也能做到每秒处理几十万条消息,而它的延迟最低只有几毫秒。

3)持久性。Kafka 可以将消息直接持久化在普通磁盘上,且磁盘读写性能优异。

4)扩展性。Kafka 集群支持热扩展,Kafka 集群启动运行后,用户可以直接向集群添加。

5)容错性。Kafka 会将数据备份到多台服务器节点中,即使 Kafka 集群中的某一台 Kafka 服务节点宕机,也不会影响整个系统的功能。

6)支持多种客户端语言。Kafka 支持 Java、.NET、PHP、Python 等多种语言。

7) 支持多生产者和多消费者。

Kafka 的主要应用场景

消息处理(MQ)。

KafKa 可以代替传统的消息队列软件,使用 KafKa 来实现队列有如下优点:

  • KafKa 的 append 来实现消息的追加,保证消息都是有序的有先来后到的顺序。
  • 稳定性强,队列在使用中最怕丢失数据,KafKa 能做到理论上的写成功不丢失。
  • 分布式容灾好。
  • 容量大相对于内存队列,KafKa 的容量受硬盘影响。
  • 数据量不会影响到 KafKa 的速度。

分布式日志系统 (Log):

在很多时候需要对一些庞大的数据进行存留,日志存储这块会遇到巨大的问题,日志不能丢,日志存文件不好找,定位一条消息成本高(遍历当天日志文件),实时显示给用户难,这几类问题 KafKa 都能游刃有余。

  • KafKa 的集群备份机制能做到 n/2 的可用,当 n/2 以下的机器宕机时存储的日志不会丢失。
  • KafKa 可以对消息进行分组分片。
  • KafKa 非常容易做到实时日志查询。

流式处理:

流式处理就是指实时地处理一个或多个事件流。

流式的处理框架 (spark、storm、flink) 从主题中读取数据,对其进行处理,并将处理后的结果数据写入新的主题,供用户和应用程序使用,Kafka 的强耐久性在流处理的上下文中也非常的有用。

Kafka 的架构

架构案例

producer01 --订单消息--> Kafka集群

producer02 --用户注册的消息--> Kafka集群

Kafka集群:存储消息
{
  [broker0 ip port (订单的 topic)], 
  [broker1 ip port (用户注册的 topic)], 
  [broker2 ip port]
}

Kafka集群:可以有很多个节点实例,每一个实例都是一个 Broker。

Consumer 消费者 -----> Kafka集群

[Producer,Kafka集群,Consumer] ----> Zookeeper存储集群的元数据信息

不同的消费生产者生产不同的消息

Kafka Cluster - 由多个服务器组成,每个服务器单独的名字 broker。

Kafka Broker - Kafka 集群中包含的服务器。

Kafka Producer - 消息生产者,发布消息到 Kafka 集群的终端或服务。

Kafka Consumer - 消息消费者,负责消费数据。

Kafka Topic - 主题,一类消息的名称;存储数据时将一类数据存放在某个 Topic 下,消费数据也是消费一类数据。

  • 订单系统 - 创建一个 topic,叫做 order。
  • 用户系统 - 创建一个 topic,叫做 user。
  • 商品系统 - 创建一个 topic,叫做 product。

注意:Kafka 的元数据都是存放在 Zookeeper 中。

架构剖析

Kafka 支持消息持久化,消费端为 pull 模型来拉取数据,消费状态和订阅关系有客户端负责维护,消息消费完后,不会立即删除,会保留历史消息。因此支持多订阅时,消息只会存储一份就可以了。

  • Broker - Kafka 集群中包含一个或者多个服务实例,这种服务实例被称为 Broker。
  • Topic - 每条发布到 Kafka 集群的消息都有一个类别,这个类别就叫做 Topic。
  • Partition - 分区,物理上的概念;每个 topic 包含一个或多个 partition,一个 partition 对应一个文件夹,这个文件夹下存储 partition 的数据和索引文件,每个 partition 内部是有序的。

关系解释

Topic & Partition 主题和分区:

  • Topic 就是数据主题,是数据记录发布的地方,可以用来区分业务系统。
  • Kafka 中的 Topics 总是多订阅者模式,一个 topic 可以拥有一个或者多个消费者来订阅它的数据。
  • 一个 topic 为一类消息,每条消息必须指定一个 topic。
  • 对于每一个 topic,Kafka 集群都会维持一个分区日志。
  • 每个分区都是有序且顺序不可变的记录集,并且不断地追加到结构化的 commit log 文件。
  • 分区中的每一个记录都会分配一个 id 号来表示顺序,称之为 offset,offset 用来唯一的标识分区中每一条记录。

Topic --> 多个分区 --> 分区日志。

每个消息都是在分区日志进行追加。

ID 就是 offset,为 long 类型,每次加一,有序性,唯一表示和位置。

消费者可以选择性消费消息。

在每一个消费者中唯一保存的元数据是 offset(偏移量)即消费在 log 中的位置,偏移量由消费者所控制:通常在读取记录后,消费者会以线性的方式增加偏移量,但是实际上,由于这个位置由消费者控制,所以消费者可以采用任何顺序来消费记录。例如,一个消费者可以重置到一个旧的偏移量,从而重新处理旧的数据;也可以跳过最近的记录,从"现在"开始消费。

这些细节说明 Kafka 消费者是非常廉价的 ---- 消费者的增加和减少,对集群或者其他消费者没有多大的影响。

Kafka 集群环境搭建

ZooKeeper 作为给分布式系统提供协调服务的工具被 Kafka 所依赖。在分布式系统中,消费者需要知道有哪些生产者是可用的,而如果每次消费者都需要和生产者建立连接并测试是否成功连接,那效率也太低了,显然是不可取的。而通过使用 ZooKeeper 协调服务,Kafka 就能将 Producer,Consumer,Broker 等结合在一起,同时借助 ZooKeeper,Kafka 就能够将所有组件在无状态的条件下建立起生产者和消费者的订阅关系,实现负载均衡。

虚拟机中搭建 Kafka 集群

准备工作

环境准备:准备三台服务器,安装 jdk1.8,其中每一台 Linux 服务器的 hosts 文件中都需要配置如下的内容。

192.168.186.11 node1
192.168.186.12 node2
192.168.186.13 node3

安装目录创建:

安装包存放的目录:/export/software
安装程序存放的目录:/export/servers
数据目录:/export/data
日志目录:/export/logs

创建各级目录命令:
mkdir -p /export/servers/
mkdir -p /export/software/
mkdir -p /export/data/
mkdir -p /export/logs/

修改 host:

  • 执行命令 cd /etc/ 进入服务器 etc 目录。
  • 执行命令 vi hosts 编辑 hosts 文件。
  • 输入要修改的内容。
127.0.0.1  localhost localhost.localdomain localhost4 localhost4.localdomain4
::1     localhost localhost.localdomain localhost6 localhost6.localdomain6

192.168.186.11  node1
192.168.186.12  node2
192.168.186.13  node3
  • 执行命令 /etc/init.d/network restart 重启 hosts。
  • 执行命令 cat /etc/hosts 可以查看到 hosts 文件修改成功。

Zookeeper 集群搭建

Linux 安装 JDK,三台 Linux 都安装。

上传 JDK 到 linux:使用 SSH 方式、使用 CRT 方式。

使用 CRT 需要先在 Linux 虚拟机上安装 lrzsz 上传工具,安装方式: yum install -y lrzsz

安装 lrzsz 之后,只需要在 Linux 命令行中输入:rz,就可以弹出一个文件上传窗口。

安装并配置 JDK:

# 使用 rpm 安装 JDK
rpm -ivh jdk-8u261-linux-x64.rpm

# 默认的安装路径是 /usr/java/jdk1.8.0_261-amd64
# 配置 JAVA_HOME
vi /etc/profile

# 文件最后添加两行
export JAVA_HOME=/usr/java/jdk1.8.0_261-amd64
export PATH=$PATH:$JAVA_HOME/bin

# 退出 vi,使配置生效
source /etc/profile

查看 JDK 是否正确安装

java -version

Linux 安装 Zookeeper,三台 Linux 都安装,以搭建 Zookeeper 集群。

上传 zookeeper-3.4.14.tar.gz

解压并配置 Zookeeper(配置 data 目录,集群节点)。

# node1操作

# 解压到/opt 目录
tar -zxf zookeeper-3.4.14.tar.gz -C /opt

# 配置
cd /opt/zookeeper-3.4.14/conf

# 配置文件重命名后生效
cp zoo_sample.cfg zoo.cfg

#编辑
vi zoo.cfg

# 设置数据目录
dataDir=/var/renda/zookeeper/data
# 添加配置 Zookeeper 集群节点
server.1=node1:2881:3881
server.2=node2:2881:3881
server.3=node3:2881:3881

# 退出 vim
mkdir -p /var/renda/zookeeper/data
echo 1 > /var/renda/zookeeper/data/myid

# 配置环境变量
vi /etc/profile

# 添加
export ZOOKEEPER_PREFIX=/opt/zookeeper-3.4.14
export PATH=$PATH:$ZOOKEEPER_PREFIX/bin
export ZOO_LOG_DIR=/var/renda/zookeeper/log

# 退出 vim,让配置生效
source /etc/profile

node2 配置与 node1 基本一样,下面为不同之处:

echo 2 > /var/renda/zookeeper/data/myid

node3 配置与 node1 基本一样,下面为不同之处:

echo 3 > /var/renda/zookeeper/data/myid

启动 Zookeeper:

# 在三台 Linux 上启动 Zookeeper
[root@node1 ~]# zkServer.sh start
[root@node2 ~]# zkServer.sh start
[root@node3 ~]# zkServer.sh start

# 在三台 Linux 上查看 Zookeeper 的状态
[root@node1 ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: follower

[root@node2 ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: leader

[root@node3 ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: follower

*载下** Kafka 安装包

中文网站:https://kafka.apachecn.org/

英文网站:https://kafka.apache.org/

由于 kafka 是 scala 语言编写的,基于 scala 的多个版本,kafka 发布了多个版本。

其中 2.11 是推荐版本。

上传 Kafka 安装包并解压

# 使用 rz 命令将安装包上传至  /export/software

# 1)  切换目录上传安装包
cd /export/software
rz
# 选择对应安装包上传即可

# 2) 解压安装包到指定目录下
tar -zxvf kafka_2.11-1.0.0.tgz -C /export/servers/
cd /export/servers/

# 3) 重命名 (由于名称太长)
mv kafka_2.11-1.0.0 kafka

修改 kafka 的核心配置文件

cd  /export/servers/kafka/config/
vi  server.properties

# 主要修改一下 6 个地方:
#   1) broker.id       需要保证每一台 kafka 都有一个独立的 broker
#   2) log.dirs       数据存放的目录
#   3) zookeeper.connect   zookeeper 的连接地址信息
#   4) delete.topic.enable  是否直接删除 topic
#   5) host.name       主机的名称
#   6) 修改: listeners=PLAINTEXT://node1:9092

# broker.id 标识了 kafka 集群中一个唯一 broker
broker.id=0
listeners=PLAINTEXT://node1:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# 存放生产者生产的数据,数据一般以 topic 的方式存放
log.dirs=/export/data/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

# zk 的信息
zookeeper.connect=node1:2181,node2:2181,node3:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

delete.topic.enable=true
host.name=node1

将配置好的 Kafka 分发到其他二台主机

cd /export/servers
scp -r kafka/ node2:$PWD
scp -r kafka/ node3:$PWD

Linux scp 命令用于 Linux 之间复制文件和目录。

scp 是 secure copy 的缩写,scp 是 linux 系统下基于 ssh 登陆进行安全的远程文件拷贝命令。

拷贝后,需要修改每一台服务器的 Kafka 配置文件的 broker.idhost.namelisteners

# ip 为 192.168.186.11 的服务器:
broker.id=0
host.name=node1
listeners=PLAINTEXT://node1:9092

# ip 为 192.168.186.12 的服务器:
broker.id=1
host.name=node2
listeners=PLAINTEXT://node2:9092

# ip 为 192.168.186.13 的服务器:
broker.id=2
host.name=node3
listeners=PLAINTEXT://node3:9092

在每一台的服务器执行创建数据文件的命令:

mkdir -p /export/data/kafka

启动 Kafka 集群

注意事项:在 kafka 启动前,一定要让 zookeeper 启动起来。

cd /export/servers/kafka/bin 

# 前台启动
./kafka-server-start.sh /export/servers/kafka/config/server.properties

# 后台启动
nohup ./kafka-server-start.sh /export/servers/kafka/config/server.properties 2>&1 &
# 注意:可以启动一台 broker,单机版。也可以同时启动三台 broker,组成一个 kafka 集群版

# kafka 停止
./kafka-server-stop.sh

登录的前提是,通过 jps 是可以看到 Kafka 的进程。登录 zookeeper /opt/zookeeper-3.4.14/bin/zkCli.sh;然后执行 ls /brokers/ids,可以看到输出为 [0, 1, 2]

Docker 环境下的 Kafka 集群搭建

  • zoo1 - 192.168.0.11 - 2184:2181
  • zoo2 - 192.168.0.12 - 2185:2181
  • zoo3 - 192.168.0.13 - 2186:2181
  • kafka1 - 192.168.0.14 - 9092:9092 - kafka1
  • kafka2 - 192.168.0.15 - 9093:9092 - kafka1
  • kafka3 - 192.168.0.16 - 9094:9092 - kafka1
  • kafka-manager - 192.168.0.17 - 9000:9000
  • 宿主机 - 192.168.186.20

准备工作

1)宿主机 IP 地址为 192.168.186.20

修改网络配置:vi /etc/sysconfig/network-scrpits/ifcfg-ens33

TYPE=Ethernet
PROXY_METHOD=none
BROWSER_ONLY=no
BOOTPROTO=static
DEFROUTE=yes
IPV4_FAILURE_FATAL=no
IPV6INIT=yes
IPV6_AUTOCONF=yes
IPV6_DEFROUTE=yes
IPV6_FAILURE_FATAL=no
IPV6_ADDR_GEN_MODE=stable-privacy
NAME=ens33
UUID=b8fd5718-51f5-48f8-979b-b9f1f7a5ebf2
DEVICE=ens33
ONBOOT=yes
IPADDR=192.168.186.20
GATEWAY=192.168.186.2
NETMASK=255.255.255.0
NM_CONTROLLED=no
DNS1=8.8.8.8
DNS2=8.8.4.4

2)安装 docker - compose

Compose 是用于定义和运行多容器 Docker 应用程序的工具。

如果还是使用原来的方式操作 docker,那么就需要*载下**三个镜像:Zookeeper、Kafka、Kafka-Manager,需要对 Zookeeper 安装三次并配置集群、需要对 Kafka 安装三次,修改配置文件,Kafka-Manager 安装一次,但是需要配置端口映射机器 Zookeeper、Kafka 容器的信息。

但是引入 Compose 之后可以使用 yaml 格式的配置文件配置好这些信息,每个 image 只需要编写一个 yaml 文件,可以在文件中定义集群信息、端口映射等信息,运行该文件即可创建完成集群。

通过 Compose,可以使用 YML 文件来配置应用程序需要的所有服务。然后,使用一个命令,就可以从 YML 文件配置中创建并启动所有服务。

Compose 使用的两个步骤:

  • 使用 docker-compose.yml 定义构成应用程序的服务,这样它们可以在隔离环境中一起运行。
  • 执行 docker-compose up 命令来启动并运行整个应用程序。
# curl 是一种命令行工具,作用是发出网络请求,然后获取数据
curl -L https://github.com/docker/compose/releases/download/1.8.0/run.sh > /usr/local/bin/docker-compose

# chmod(change mode)命令是控制用户对文件的权限的命令
chmod +x /usr/local/bin/docker-compose

# 查看版本
docker-compose --version

3)拉取镜像

# 拉取 Zookeeper 镜像
docker pull zookeeper:3.4

# 拉取 kafka 镜像
docker pull wurstmeister/kafka

# 拉取 kafka-manager 镜像
docker pull sheepkiller/kafka-manager:latest

4)创建集群网络

基于 Linux 宿主机而工作的,也是在 Linux 宿主机创建,创建之后 Docker 容器中的各个应用程序可以使用该网络。

# 创建
docker network create --driver bridge --subnet 192.168.0.0/24 --gateway 192.168.0.1 kafka

# 查看
docker network ls

5)网络设置

新建网段之后可能会出现:WARNING: IPv4 forwarding is disabled. Networking will not work.

解决方式:

  • 在宿主机上执行 - echo "net.ipv4.ip_forward=1" >>/usr/lib/sysctl.d/00-system.conf
  • 重启 network 和 docker 服务 - systemctl restart network && systemctl restart docker

搭建过程

每个镜像一个 yml 文件,Zookeeper、Kafka、Kafka-Manager 一个;编写 yml 文件。

1)docker-compose-zookeeper.yml

Zookeeper 各个节点的信息,端口映射,集群信息,网络配置:

# 指定 compose 文件的版本
version: '2'
# 通过镜像安装容器的配置
services:
  zoo1:
  # 使用的镜像
   image: zookeeper:3.4
  # 当 Docker 重启时,该容器重启
   restart: always
  # 类似于在基于 Linux 虚拟机 Kafka 集群中 hosts 文件的值
   hostname: zoo1
   container_name: zoo1
   ports:
  # 端口映射
   - 2184:2181
  # 集群环境
   environment:
   # 当前 Zookeeper 实例的 id
    ZOO_MY_ID: 1
   # 集群节点
    ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
  # 使用的网络配置
   networks:
    kafka:
     ipv4_address: 192.168.0.11
  zoo2:
   image: zookeeper:3.4
   restart: always
   hostname: zoo2
   container_name: zoo2
   ports:
   - 2185:2181
   environment:
    ZOO_MY_ID: 2
    ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=0.0.0.0:2888:3888 server.3=zoo3:2888:3888
   networks:
    kafka:
     ipv4_address: 192.168.0.12
  zoo3:
   image: zookeeper:3.4
   restart: always
   hostname: zoo3
   container_name: zoo3
   ports:
   - 2186:2181
   environment:
    ZOO_MY_ID: 3
    ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=0.0.0.0:2888:3888
   networks:
    kafka:
     ipv4_address: 192.168.0.13
networks:
  kafka:
   external:
    name: kafka

2)docker-compose-kafka.yml

version: '2'
services:
  kafka1:
   image: wurstmeister/kafka
   restart: always
   hostname: kafka1
   container_name: kafka1
   privileged: true
   ports:
   - 9092:9092
  # 集群环境配置
   environment:
    KAFKA_ADVERTISED_HOST_NAME: kafka1
    KAFKA_LISTENERS: PLAINTEXT://kafka1:9092
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
    KAFKA_ADVERTISED_PORT: 9092
    KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
  # 配置 Zookeeper 集群的地址
   external_links:
   - zoo1
   - zoo2
   - zoo3
   networks:
    kafka:
     ipv4_address: 192.168.0.14
  kafka2:
   image: wurstmeister/kafka
   restart: always
   hostname: kafka2
   container_name: kafka2
   privileged: true
   ports:
   - 9093:9093
   environment:
    KAFKA_ADVERTISED_HOST_NAME: kafka2
    KAFKA_LISTENERS: PLAINTEXT://kafka2:9093
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9093
    KAFKA_ADVERTISED_PORT: 9093
    KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
   external_links:
   - zoo1
   - zoo2
   - zoo3
   networks:
    kafka:
     ipv4_address: 192.168.0.15
  kafka3:
   image: wurstmeister/kafka
   restart: always
   hostname: kafka3
   container_name: kafka3
   privileged: true
   ports:
   - 9094:9094
   environment:
    KAFKA_ADVERTISED_HOST_NAME: kafka3
    KAFKA_LISTENERS: PLAINTEXT://kafka3:9094
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9094
    KAFKA_ADVERTISED_PORT: 9094
    KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
   external_links:
   - zoo1
   - zoo2
   - zoo3
   networks:
    kafka:
     ipv4_address: 192.168.0.16
networks:
  kafka:
   external:
    name: kafka

3)docker-compose-manager.yml

version: '2'
services:
  kafka-manager:
   image: sheepkiller/kafka-manager:latest
   restart: always
   container_name: kafka-manager
   hostname: kafka-manager
   ports:
   - 9000:9000
  # 可以管理 zoo 集群和 kafka 集群
   environment:
   ZK_HOSTS: zoo1:2181,zoo2:2181,zoo3:2181
   KAFKA_BROKERS: kafka1:9092,kafka2:9092,kafka3:9092
   APPLICATION_SECRET: letmein
   KM_ARGS: -Djava.net.preferIPv4Stack=true
   networks:
   kafka:
    ipv4_address: 192.168.0.17
networks:
  kafka:
   external:
    name: kafka

将 yaml 文件上传到 Docker 宿主机中。

开始部署:

# 使用命令:
# 参数说明:up 表示启动,-d 表示后台运行。
docker-compose up -d

# 参数说明:  -f:表示加载指定位置的yaml文件 
docker-compose -f /home/docker-compose-zookeeper.yml up -d
docker-compose -f /home/docker-compose-kafka.yml up -d
docker-compose -f /home/docker-compose-manager.yml up -d

测试:浏览器访问宿主机 http://192.168.186.20:9000/

Kafka 的基本操作

在 docker 环境中操作。

1) 创建 topic

创建一个名字为 test 的主题, 有一个分区,有三个副本。一个主题下可以有多个分区,每个分区可以用对应的副本。

Docker:

# 登录到 Kafka 容器
docker exec -it a44b97cb4f00 /bin/bash

# 切换到 bin 目录
cd opt/kafka/bin/

# 执行创建
kafka-topics.sh --create --zookeeper zoo1:2181 --replication-factor 3 --partitions 1 --topic test
  • --create:新建命令。
  • --zookeeper:Zookeeper节点,一个或多个。
  • --replication-factor:指定副本,每个分区有三个副本。
  • --partitions:1

2) 查看主题命令

查看 kafka 当中存在的主题:

kafka-topics.sh --list --zookeeper zoo1:2181,zoo2:2181,zoo3:2181

__consumer_offsets 这个 topic 是由 kafka 自动创建的,默认 50 个分区,存储消费位移信息(offset),老版本架构中是存储在 Zookeeper 中。

3) 生产者生产数据

模拟生产者来生产数据:

Kafka 自带一个命令行客户端,它从文件或标准输入中获取输入,并将其作为 message(消息)发送到 Kafka 集群。

默认情况下,每行将作为单独的 message 发送。

运行 producer,然后在控制台输入一些消息以发送到服务器。

bash-4.4# kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9093,kafka3:9094 --topic test
>This is a new Message
>This is another new Message

4) 消费者消费数据

bash-4.4# kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9093,kafka3:9094 --topic test --from-beginning
This is a new Message
This is another new Message

在使用的时候会用到 bootstrap-serverbroker-list 其实是实现一个功能,broker-list 是旧版本命令。

确保消费者消费的消息是顺序的,需要把消息存放在同一个 topic 的同一个分区。

一个主题多个分区,分区内消息有序。

5) 运行 describe 的命令

运行 describe 查看 topic 的相关详细信息。

# 查看 topic 主题详情,Zookeeper 节点写一个和全部写,效果一致
kafka-topics.sh --describe --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --topic test

# 结果列表
Topic: test1   PartitionCount: 3    ReplicationFactor: 3   Configs: 
     Topic: test1   Partition: 0   Leader: 1001   Replicas: 1001,1003,1002     Isr: 1001,1003,1002
     Topic: test1   Partition: 1   Leader: 1002   Replicas: 1002,1001,1003     Isr: 1002,1001,1003
     Topic: test1   Partition: 2   Leader: 1003   Replicas: 1003,1002,1001     Isr: 1003,1002,1001

结果说明:这是输出的解释。第一行给出了所有分区的摘要,每个附加行提供有关一个分区的信息。有几个分区,下面就显示几行。

  • Leader:是负责给定分区的所有读取和写入的节点。每个节点将成为随机选择的分区部分的领导者。
  • Replicas:显示给定 partiton 所有副本所存储节点的节点列表,不管该节点是否是 leader 或者是否存活。
  • Isr:副本都已同步的的节点集合,这个集合中的所有节点都是存活状态,并且跟 leader 同步。

6) 增加 topic 分区数

任意 kafka 服务器执行以下命令可以增加 topic 分区数:

bash-4.4# kafka-topics.sh --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --alter --topic test --partitions 8
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

7) 增加配置

flush.messages:此项配置指定时间间隔,强制进行 fsync 日志,默认值为 None。

例如,如果这个选项设置为 1,那么每条消息之后都需要进行 fsync,如果设置为 5,则每 5 条消息就需要进行一次 fsync

一般来说,建议不要设置这个值。此参数的设置需要在"数据可靠性"与"性能"之间做必要的权衡。

如果此值过大将会导致每次 fsync 的时间较长 (IO 阻塞)。

如果此值过小将会导致 fsync 的次数较多,这也意味着整体的 client 请求有一定的延迟,物理 server 故障,将会导致没有 fsync 的消息丢失。

动态修改 Kakfa 的配置示例:

bash-4.4# kafka-topics.sh --zookeeper zoo1:2181 --alter --topic test --config flush.messages=1
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
     Going forward, please use kafka-configs.sh for this functionality
Updated config for topic test.

8) 删除配置

动态删除 Kafka 集群配置:

bash-4.4# kafka-topics.sh --zookeeper zoo1:2181 --alter --topic test --delete-config flush.messages
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
     Going forward, please use kafka-configs.sh for this functionality
Updated config for topic test.

9) 删除 topic

目前删除 topic 在默认情况只是打上一个删除的标记,在重新启动 kafka 后才删除。如果需要立即删除,则需要在 server.properties 文件中配置:delete.topic.enable=true(集群中的所有实例节点),一个主题会在不同的 Kafka 节点中分配分组信息和副本信息。

然后执行以下命令进行删除 topic:

bash-4.4# kafka-topics.sh --zookeeper zoo1:2181 --delete --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

Java API 操作 Kafka

Kafka 集群之间是通过 Host Name 进行通讯,HostName --> IP 地址。

Kafka 容器各个节点的 Host Name 为 kafka1, kafka2, kafka3。

192.168.186.20:9092 -> kafka1
192.168.186.20:9093 -> kafka2
192.168.186.20:9094 -> kafka3

Compose.yml:指定了端口映射,将 Kafka 各个容器的端口映射到宿主机 CentOS 7,因此可以通过访问 CentOS 7 的 IP 地址访问 Kafka 容器。

将 Kafka 的 hostname 定义一份在 windows 的 host 文件中。

如果通过 Java API 访问 Kafka 的集群,那么需要在 Windows 的 host 文件添加本地解析。

修改 Windows 的 Host 文件:

192.168.186.20 kafka1
192.168.186.20 kafka2
192.168.186.20 kafka3

创建 maven 的工程, 导入 kafka 相关的依赖:

<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
  </dependency>
</dependencies>

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.2</version>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
        <encoding>UTF-8</encoding>
      </configuration>
    </plugin>
  </plugins>
</build>

生产者代码

com.renda.ProducerDemo

public class ProducerDemo {

  // 定义主题
  public static String topic = "renda";

  public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
        "192.168.186.20:9092,192.168.186.20:9093,192.168.186.20:9094");
    // 网络传输, 对 key 和 value 进行序列化
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // 创建消息生产对象,需要从 properties 对象或者从 properties 文件中加载信息
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

    try {
      while (true) {
        // 设置消息内容
        String msg = "Hello, " + new Random().nextInt(100);
        // 将消息内容封装到 ProducerRecord 中
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, msg);
        kafkaProducer.send(producerRecord);
        System.out.println("Message Sent Successfully: " + msg);
        Thread.sleep(500);
       }
     } catch (Exception e) {
      e.printStackTrace();
     } finally {
      kafkaProducer.close();
     }
   }

}

消费者代码

com.renda.CosumerDemo

public class ConsumerDemo {

  public static String topic = "renda";

  public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
        "192.168.186.20:9092,192.168.186.20:9093,192.168.186.20:9094");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // 指定组名
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "renda-1");
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    // 订阅消息
    kafkaConsumer.subscribe(Collections.singletonList(topic));

    while (true) {
      ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
        System.out.println(String.format("topic: %s, offset: %d. msg: %s",
            record.topic(), record.offset(), record.value()));
       }

     }
   }

}