kafka消息队列安装配置

版权说明:本文系博主通过各种渠道学习整理发表或者全文转载自其他平台,作为学习笔记,不能保证所有知识点是完全正确以及表达无误,用于生产环境配置时请斟酌。如有错误或建议请联系。转载请注明出处,侵删联系:linuxops@foxmail.com。感谢各位!

一、前言

Kafka 是一个高吞吐量、分布式的发布—订阅消息系统。据 Kafka 官方网站介绍,当前的Kafka 已经定位为一个分布式流式处理平台(a distributed streaming platform),它最初由 LinkedIn公司开发,后来成为 Apache 项目的一部分。 Kafka 核心模块使用 Scala 语言开发,支持多语言(如 Java、 C/C++、 Python、 Go、 Erlang、 Node.js 等)客户端,它以可水平扩展和具有高吞吐量等特性而被广泛使用。目前越来越多的开源分布式处理系统(如 Flume、 Apache Storm、 Spark、 Flink等)支持与 Kafka 集成。

Kafka 是一款开源的、轻量级的、分布式、可分区和具有复制备份的(Replicated)、基于ZooKeeper 协调管理的分布式流平台的功能强大的消息系统。与传统的消息系统相比, Kafka能够很好地处理活跃的流数据,使得数据在各个子系统中高性能、低延迟地不停流转。


二、kafka基本概念

本节我们对 Kafka 的基本概念进行详细阐述。

1.主题

Kafka 将一组消息抽象归纳为一个主题(Topic),也就是说,一个主题就是对消息的一个分类。生产者将消息发送到特定主题,消费者订阅主题或主题的某些分区进行消费。

2.消息

消息是 Kafka 通信的基本单位,由一个固定长度的消息头和一个可变长度的消息体构成。在老版本中,每一条消息称为 Message;在由 Java 重新实现的客户端中,每一条消息称为 Record。

3.分区和副本

Kafka 将一组消息归纳为一个主题,而每个主题又被分成一个或多个分区(Partition)。每个分区由一系列有序、不可变的消息组成,是一个有序队列。每个分区在物理上对应为一个文件夹,分区的命名规则为主题名称后接“—”连接符,之后再接分区编号,分区编号从 0 开始,编号最大值为分区的总数减 1。每个分区又有一至多个副本(Replica),分区的副本分布在集群的不同代理上,以提高可用性。从存储角度上分析,分区的每个副本在逻辑上抽象为一个日志(Log)对象,即分区的副本与日志对象是一一对应的。每个主题对应的分区数可以在 Kafka 启动时所加载的配置文件中配置,也可以在创建主题时指定。当然,客户端还可以在主题创建后修改主题的分区数。分区使得 Kafka 在并发处理上变得更加容易,理论上来说,分区数越多吞吐量越高,但这要根据集群实际环境及业务场景而定。同时,分区也是 Kafka 保证消息被顺序消费以及对消息进行负载均衡的基础。 Kafka 只能保证一个分区之内消息的有序性,并不能保证跨分区消息的有序性。每条消息被追加到相应的分区中,是顺序写磁盘,因此效率非常高,这是 Kafka 高吞吐率的一个重要保证。同时与传统消息系统不同的是, Kafka 并不会立即删除已被消费的消息,由于磁盘的限制消息也不会一直被存储(事实上这也是没有必要的),因此 Kafka 提供两种删除老数据的策略,一是基于消息已存储的时间长度,二是基于分区的大小。这两种策略都能通过配置文件进行配置

4. Leader 副本和 Follower 副本

由于 Kafka 副本的存在,就需要保证一个分区的多个副本之间数据的一致性, Kafka 会选择该分区的一个副本作为 Leader 副本,而该分区其他副本即为 Follower 副本,只有 Leader 副本才负责处理客户端读/写请求, Follower 副本从 Leader 副本同步数据。如果没有 Leader 副本,那就需要所有的副本都同时负责读/写请求处理,同时还得保证这些副本之间数据的一致性,假设有 n 个副本则需要有 n×n 条通路来同步数据,这样数据的一致性和有序性就很难保证。引入 Leader 副本后客户端只需与 Leader 副本进行交互,这样数据一致性及顺序性就有了保证。 Follower 副本从 Leader 副本同步消息,对于 n 个副本只需 n-1 条通路即可,这样就使得系统更加简单而高效。副本 Follower 与 Leader 的角色并不是固定不变的,如果 Leader 失效,通过相应的选举算法将从其他 Follower 副本中选出新的 Leader 副本。

5.偏移量

任何发布到分区的消息会被直接追加到日志文件(分区目录下以“.log”为文件名后缀的数据文件)的尾部,而每条消息在日志文件中的位置都会对应一个按序递增的偏移量。偏移量是一个分区下严格有序的逻辑值,它并不表示消息在磁盘上的物理位置。由于 Kafka 几乎不允许对消息进行随机读写,因此 Kafka 并没有提供额外索引机制到存储偏移量,也就是说并不会给偏移量再提供索引。消费者可以通过控制消息偏移量来对消息进行消费,如消费者可以指定消费的起始偏移量。为了保证消息被顺序消费,消费者已消费的消息对应的偏移量也需要保存。需要说明的是,消费者对消息偏移量的操作并不会影响消息本身的偏移量。旧版消费者将消费偏移量保存到 ZooKeeper 当中,而新版消费者是将消费偏移量保存到 Kafka 内部一个主题当中。当然,消费者也可以自己在外部系统保存消费偏移量,而无需保存到 Kafka 中。

6.日志段

一个日志又被划分为多个日志段(LogSegment),日志段是 Kafka 日志对象分片的最小单位。与日志对象一样,日志段也是一个逻辑概念,一个日志段对应磁盘上一个具体日志文件和两个索引文件。日志文件是以“.log”为文件名后缀的数据文件,用于保存消息实际数据。两个索引文件分别以“.index”和“.timeindex”作为文件名后缀,分别表示消息偏移量索引文件和消息时间戳索引文件。

7.代理

在 Kafka 基本体系结构中我们提到了 Kafka 集群。 Kafka 集群就是由一个或多个 Kafka 实例构成,我们将每一个 Kafka 实例称为代理(Broker),通常也称代理为 Kafka 服务器(KafkaServer)。在生产环境中 Kafka 集群一般包括一台或多台服务器,我们可以在一台服务器上配置一个或多个代理。每一个代理都有唯一的标识 id,这个 id 是一个非负整数。在一个 Kafka集群中,每增加一个代理就需要为这个代理配置一个与该集群中其他代理不同的 id, id 值可以选择任意非负整数即可,只要保证它在整个 Kafka 集群中唯一,这个 id 就是代理的名字,也就是在启动代理时配置的 broker.id 对应的值,因此在本书中有时我们也称为 brokerId。由于给每个代理分配了不同的 brokerId,这样对代理进行迁移就变得更方便,从而对消费者来说是透明的,不会影响消费者对消息的消费。代理有很多个参数配置,由于在本节只是对其概念进行阐述,因此不做深入展开,对于代理相关配置将穿插在本书具体组件实现原理、流程分析及相关实战操作章节进行介绍。

8.生产者

生产者(Producer)负责将消息发送给代理,也就是向 Kafka 代理发送消息的客户端。

9.消费者和消费组

消费者(Comsumer)以拉取(pull)方式拉取数据,它是消费的客户端。在 Kafka 中每一个消费者都属于一个特定消费组(ConsumerGroup),我们可以为每个消费者指定一个消费组,以 groupId 代表消费组名称,通过 group.id 配置设置。如果不指定消费组,则该消费者属于默认消费组 test-consumer-group。同时,每个消费者也有一个全局唯一的 id,通过配置项 client.id指定,如果客户端没有指定消费者的 id, Kafka 会自动为该消费者生成一个全局唯一的 id,格式为${groupId}-${hostName}-${timestamp}-${UUID 前 8 位字符}。同一个主题的一条消息只能被同一个消费组下某一个消费者消费,但不同消费组的消费者可同时消费该消息。消费组是 Kafka用来实现对一个主题消息进行广播和单播的手段,实现消息广播只需指定各消费者均属于不同的消费组,消息单播则只需让各消费者属于同一个消费组。

10. ISR

Kafka 在 ZooKeeper 中动态维护了一个 ISR(In-sync Replica),即保存同步的副本列表,该列表中保存的是与 Leader 副本保持消息同步的所有副本对应的代理节点 id。如果一个 Follower副本宕机(本书用宕机来特指某个代理失效的情景,包括但不限于代理被关闭,如代理被人为关闭或是发生物理故障、心跳检测过期、网络延迟、进程崩溃等)或是落后太多,则该 Follower副本节点将从 ISR 列表中移除。

11. ZooKeeper

这里我们并不打算介绍 ZooKeeper 的相关知识,只是简要介绍 ZooKeeper 在 Kafka 中的作用。 Kafka 利用 ZooKeeper 保存相应元数据信息, Kafka 元数据信息包括如代理节点信息、 Kafka集群信息、旧版消费者信息及其消费偏移量信息、主题信息、分区状态信息、分区副本分配方案信息、动态配置信息等。 Kafka 在启动或运行过程当中会在 ZooKeeper 上创建相应节点来保存元数据信息, Kafka 通过监听机制在这些节点注册相应监听器来监听节点元数据的变化,从而由 ZooKeeper 负责管理维护 Kafka 集群,同时通过 ZooKeeper 我们能够很方便地对 Kafka 集群进行水平扩展及数据迁移。

以上【一、前言】和【二、kafka的基本概念】两节摘自 牟大恩 的《kafka入门与实践》购书地址:当当


三、依赖环境配置

kafka是基于Scala语言开发的,运行再JVM上,所以安装kafka之前需要先安装JDK。

kafka需要通过Zookeeper来实现同步服务、配置维护、命名服务等功能,所以除了基本的java环境以外需要先安装Zookeeper。

kafka官方二进制包集成了Zookeeper,但是为了更深入了解kafka集群的原理,我们不使用kafka官方集成的Zookeeper。

为了保证集群的健壮性,我们需要配置3台Zookeeper集群和3台的kafka集群(事实上如果需要集群架构,至少需要三台,为了节省成本可以将zookeeps安装再kafka服务器上,有条件的用户建议单独安装)

本文使用的软件版本如下:

  • 云平台:华为云
  • 系统:centos 7.4
  • JDK版本:1.8.0_151
  • zookeeper版本:3.4.10
  • kafka版本:kafka_2.11-1.0.0

软件请移步到各自官网下载。

本文使用到的资源如下:

服务名称 IP地址 域名 CPU 内存 说明
zookeeper 192.168.0.218 zk1.linuxops.org 8核 16G myid:1
zookeeper 192.168.0.52 zk2.linuxops.org 8核 16G myid:2
zookeeper 192.168.0.186 zk3.linuxops.org 8核 16G myid:3
kafka 192.168.0.219 kafka1.linuxops.org 8核 16G broker.id=1
kafka 192.168.0.36 kafka2.linuxops.org 8核 16G broker.id=2
kafka 192.168.0.184 kafka3.linuxops.org 8核 16G broker.id=3
kafka manager 192.168.0.172 manager.linuxops.org 8核 16G

1、修改hosts文件

在配置之前,为了方便使用我们修改一下hosts文件,以解析IP。

[root@ZK1 ~]# vim /etc/hosts

添加如下内容:

192.168.0.218 zk1.linuxops.org
192.168.0.52  zk2.linuxops.org
192.168.0.186 zk3.linuxops.org
192.168.0.219 kafka1.linuxops.org
192.168.0.36  kafka2.linuxops.org
192.168.0.184 kafka3.linuxops.org
192.168.0.172 manager.linuxops.org

保存退出,使用ping命令确认生效:

[root@ZK1 ~]# ping zk1.linuxops.org
PING zk1.linuxops.org (192.168.0.218) 56(84) bytes of data.
64 bytes from zk1.linuxops.org (192.168.0.218): icmp_seq=1 ttl=64 time=0.023 ms

注意:7台服务器均修改/etc/hosts

2、安装JDK

2.1、解压二进制包

[root@ZK1 ~]# tar -zxvf jdk-8u151-linux-x64.tar.gz

2.2、移动到指定的目录

[root@ZK1 ~]# mv jdk1.8.0_151/ /usr/local/jdk

2.3、配置环境变量

[root@ZK1 ~]# vim /etc/profile

在文件末尾添加如下内容:

#set java environment
JAVA_HOME=/usr/local/jdk
JRE_HOME=/usr/local/jdk/jre
CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
export JAVA_HOME JRE_HOME CLASS_PATH PATH

使其生效

[root@ZK1 ~]# source /etc/profile

2.4、检查是否安装成功

[root@ZK1 ~]# java -version
java version "1.8.0_151"
Java(TM) SE Runtime Environment (build 1.8.0_151-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.151-b12, mixed mode)
[root@ZK1 ~]#

至此基本的依赖环境已经配置完毕。

注意:7台服务器均需要java环境


四、zookeep安装配置

1、解压zookeeper二进制包

[root@ZK1 ~]# tar -zxvf zookeeper-3.4.10.tar.gz

2、移动到指定目录

[root@ZK1 ~]# mv zookeeper-3.4.10 /usr/local/zookeeper

3、修改配置文件

进入配置文件目录

[root@ZK1 ~]# cd /usr/local/zookeeper/conf
[root@ZK1 conf]# ls
configuration.xsl  log4j.properties  zoo_sample.cfg
[root@ZK1 conf]#

zookeeper提供了一个示例配置文件 zoo_sample.cfg 复制一份

[root@ZK1 conf]# cp zoo_sample.cfg zoo.cfg

打开文件看一下内容

[root@ZK1 conf]# vim zoo.cfg

# The number of milliseconds of each tick
tickTime=2000
#zookeeper中的一个时间单元,zookeeper中所有的时间都是以这个时间单元为准,进行整倍数的调整,默认是2S

# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
#Follower在启动过程中,会从Leader同步所有最新的数据,确定自己能够对外服务的起始状态。
#当Follower在initLimit个tickTime还没有完成数据同步时,则Leader仍为Follower连接失败。

# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
#Leader于Follower之间通信请求和应答的时间长度。
#若Leader在syncLimit个tickTime还没有收到Follower应答,则认为该Lwader已经下线。

# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
#dataDir=/tmp/zookeeper
dataDir=/data/zookeeper
#存储快照文件的目录,默认情况下事务日志也会存储在该目录上。
#由于事务日志的写性能直接影响zookeeper性能,因此建议同时配置dataLogDir
#在生产环境中,一般我们要修改此目录,我们将修改为dataDir=/tmp/zookeeper

dataLogDir=/data/zookeeper
#事务日志输入目录
# the port at which the clients will connect
clientPort=2181
#zookeeper的对外服务端口

# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

server.1=zk1.linuxops.org:2888:3888
server.2=zk2.linuxops.org:2888:3888
server.3=zk3.linuxops.org:2888:3888
#以上配置zookeeper集群的服务地址,需要手动添加。
#其中server.1为第1台服务器,zk1.linuxops.org为第一台服务器解析的域名
#2888为该服务器于集群中Leader交换信息的端口,3888为选举时服务器通信端口。

4、设置myid

在dataDir指定的数据目录(/data/zookeeper)下,创建文件myid,文件内容为一个正整数值,用来唯一标识当前机器,因此不同zookeeper的数值不能相同,建议从1开始递增标识,以方便记忆和管理。本文的myid约定依照第二节依赖环境配置说明配置。

准备目录,配置myid

[root@ZK1 conf]# mkdir -p /data/zookeeper/log
[root@ZK1 conf]# echo '1' > /data/zookeeper/myid
[root@ZK1 conf]# cat /data/zookeeper/myid 
1
[root@ZK1 conf]#

注意:不同zookeeper的myid不允许相同

5、修改zkEnv.sh文件

设置环境变量ZOO_LOG_DIR为zookeeper的日志存放目录

[root@ZK1 bin]# vim /usr/local/zookeeper/bin/zkEnv.sh

在有效配置范围为的第一行增加如下配置:

export ZOO_LOG_DIR=/data/zookeeper/log

这一行配置是将rzookeeper日志文件zookeeper.out输出到/data/zookeeper/log目录下。默认输出到启动zookeeper时候所在的目录。

6、修改环境变量

为了管理方便,可以将zookeeper的bin目录添加到环境变量

[root@ZK1 bin]# vim /etc/profile
#在文件末尾添加 export PATH=$PATH:/usr/local/zookeeper/bin
[root@ZK1 bin]# source /etc/profile

7、启动、停止和状态管理

启动zookeeper

[root@ZK1 bin]# zkServer.sh start 
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... already running as process 4115.
[root@ZK1 bin]#

停止zookeeper

[root@ZK1 log]# zkServer.sh stop
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED
[root@ZK1 log]#

zookeeper状态

通过zkServer.sh status查看角色状态

ZK1状态:

[root@ZK1 conf]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: follower

ZK2状态:

[root@ZK2 conf]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: follower

ZK3状态:

[root@ZK3 conf]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: leader

由上看出Leader是ZK3。

至此zookeeper安装配置完成。

启动zookeeper之后,可以通过/data/zookeeper/log/zookeeper.out 查看到zookeeper日志。

说明:如果不需要zookeeper集群,不需要配server.1=zk1.linuxops.org:2888:3888 以及 myid文件即可


五、kafka安装配置

1、kafka安装

kafka的安装非常简单,下载二进制包,解压移动道你想的目录中即可。

如果有需要,可以设置环境变量,以便于使用。

[root@kafka1 ~]# wget http://mirrors.shu.edu.cn/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz 
[root@kafka1 ~]# tar -zxvf kafka_2.11-1.0.0.tgz 
[root@kafka1 ~]# mv kafka_2.11-1.0.0 /usr/local/kafka
[root@kafka1 bin]# vim /etc/profile
#在文件末尾添加"export PATH=$PATH:/usr/local/kafka/bin/"

kafka就安装好了。

我们看一下bin目录中有那些文件:


[root@kafka1 bin]# cd /usr/local/kafka/bin/ && ls
connect-distributed.sh        kafka-console-consumer.sh    kafka-log-dirs.sh                    kafka-replay-log-producer.sh   kafka-simple-consumer-shell.sh      trogdor.sh     zookeeper-shell.sh
connect-standalone.sh         kafka-console-producer.sh    kafka-mirror-maker.sh                kafka-replica-verification.sh  kafka-streams-application-reset.sh  windows
kafka-acls.sh                 kafka-consumer-groups.sh     kafka-preferred-replica-election.sh  kafka-run-class.sh             kafka-topics.sh                     zookeeper-security-migration.sh
kafka-broker-api-versions.sh  kafka-consumer-perf-test.sh  kafka-producer-perf-test.sh          kafka-server-start.sh          kafka-verifiable-consumer.sh        zookeeper-server-start.sh
kafka-configs.sh              kafka-delete-records.sh      kafka-reassign-partitions.sh         kafka-server-stop.sh           kafka-verifiable-producer.sh        zookeeper-server-stop.sh
[root@kafka1 bin]#

kafka-console-consumer.sh :官方控制台消费者

kafka-console-producer.sh : 官方控制台生产者

kafka-server-start.sh :kafka启动脚本

kafka-server-stop.sh:kafka停止脚本

kafka-topics.sh : topics管理脚本

zookeeper-server-start.sh : zookeeper启动脚本(本文不使用官方的zookeeper,所以用不到这个脚本)

zookeeper-server-stop.sh : zookeeper停止脚本(本文不使用官方的zookeeper,所以用不到这个脚本)

2、kafka配置

kafka的配置文件位于/usr/local/kafka/config/下,server.properties是kafka的配置文件,zookeeper.properties是官方基础zookeeper的配置文件,配置项目和zookeeper官方包的配置一样,但是本文没有使用kafka集成的zookeeper,所以不需要理会这个文件,我们早点看一下zookeeper.properties

kafka默认配置如下:

broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

在一般的集群环境中,我们需要修改broker.idzookeeper.connect、以及listeners这三个参数,为了管理方便,也建议修改log.dir参数。

listeners配置kafka监听的地址,在默认配置中被注释,需要手动取消注释。

配置参数说明如下:

参数 说明
broker.id =0 每一个broker在集群中的唯一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况
listeners=PLAINTEXT://:9092 配置kafka监听地址,如果没有配置的话就使用 java.net.InetAddress.getCanonicalHostName() 方法返回的值。
此处一定要指定IP或者hostname并且能通过IP或者hostname访问,否则会报 NoBrokersAvailable
advertised.listeners=PLAINTEXT://your.host.name:9092 advertised.listeners 是 broker 给 producer 和 consumer 连接使用的,如果没有设置,就使用 listeners,而如果 host_name 没有设置的话,就使用 java.net.InetAddress.getCanonicalHostName() 方法返回的主机名。
num.network.threads=3 broker处理消息的最大线程数,一般情况下数量为cpu核数
num.io.threads=8 broker处理磁盘IO的线程数,数值为cpu核数2倍
socket.send.buffer.bytes=102400 socket的发送缓冲区,socket的调优参数SO_SNDBUFF
socket.receive.buffer.bytes=102400 socket的接受缓冲区,socket的调优参数SO_RCVBUFF
socket.request.max.bytes=104857600 这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
log.dirs=/tmp/kafka-logs kafka数据的存放地址,多个地址的话用逗号分割,多个目录分布在不同磁盘上可以提高读写性能 /data/kafka-logs-1,/data/kafka-logs-2
num.partitions=1 每个topic的分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖
num.recovery.threads.per.data.dir=1 用于在启动时,用于日志恢复的线程个数
offsets.topic.replication.factor=1 用于配置offset记录的topic的partition的副本个数
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168 默认消息的最大持久化时间,168小时,7天
log.segment.bytes=1073741824 topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
log.retention.check.interval.ms=300000 每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
zookeeper.connect=localhost:2181 zookeeper集群的地址,可以是多个,多个之间用逗号分割 hostname1:port1,hostname2:port2,hostname3:port3
zookeeper.connection.timeout.ms=6000 ZooKeeper的连接超时时间
group.initial.rebalance.delay.ms=0

根据机器配置,我们设置参数如下(注释项目为修改项):

[root@kafka1 config]# mkdir -p /data/kafka
[root@kafka1 config]# cp /usr/local/kafka/config/server.properties /usr/local/kafka/config/server.properties.bak
[root@kafka1 config]# cat > /usr/local/kafka/config/server.properties << EOF
> broker.id=0
> #不同机器配置不同ID
> listeners=PLAINTEXT://:9092
> #监听所有地址
> num.network.threads=8
> #设置为CPU核心数
> num.io.threads=16
> #设置为CPU核心数的两倍
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> log.dirs=/data/kafka
> #修改log文件目录
> num.partitions=20
> #修改默认partitions数量
> num.recovery.threads.per.data.dir=1
> offsets.topic.replication.factor=1
> transaction.state.log.replication.factor=1
> transaction.state.log.min.isr=1
> log.retention.hours=168
> log.segment.bytes=1073741824
> log.retention.check.interval.ms=300000
> zookeeper.connect=zk1.linuxops.org:2181,zk2.linuxops.org:2181,zk3.linuxops.org:2181
> #修改zookeeper的地址
> zookeeper.connection.timeout.ms=6000
> group.initial.rebalance.delay.ms=0
> EOF

三台kafka服务器均需要配置好,并且broker.id不能相同,具体请参考依赖环境配置中的规划。

3、启动kafka服务

kafka启动也很简单

[root@kafka1 kafka]# kafka-server-start.sh /usr/local/kafka/config/server.properties

通过以上命令就能启动kafka了,但是启动之后会在终端运行,并且打印出日志信息,这在调试的时候很方便,但是显然生产环境不能这样运行,终端退出后服务也会被终止。

如何将kafka以deamon的方式运行?很简单,通过指定-deamon参数即可

[root@kafka1 kafka]# kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

可以通过netstat -ntlp可以看到是否监听了端口,也可以通过jps查看kafka进程是否运行。

启动三台kafka服务器,到此为止kafka集群搭建完毕。


六、kafka基本命令和topic管理

1、创建topic

kafka创建topic比较感觉比较随意,可以通过kafka-topics.sh脚本创建,也可以通过生产者向一个不存在的topic发送消息来创建topic,不过依然建议事先创建好topic后再使用。

[root@kafka3 ~]# kafka-topics.sh --zookeeper zk1.linuxops.org:2181 --create --topic test --partitions 20 --replication-factor 3
Created topic "test".

如上命令是完整的创建topic的命令,其中:

--zookeeper 指定了zookeeper的地址,如果zookeeper是集群,可以指定多个zookeeper,也可以只指定一个可用的zookeeper服务地址

--create : 创建命令

--topic:指定topic的名称

--partitions :指定partitions 数量

--replication-factor : 设置消息保存在几个broker上,一般情况下和brocker数量相同

2、查看所有topic名称

[root@kafka3 ~]# kafka-topics.sh --zookeeper zk1.linuxops.org:2181  --list 
test
test1

3、查看topic详细信息

[root@kafka3 ~]# kafka-topics.sh --zookeeper zk1.linuxops.org:2181 --describe --topic test
Topic:test  PartitionCount:20  ReplicationFactor:3  Configs:
Topic: test  Partition: 0      Leader: 3            Replicas: 3,0,2Isr: 3,0,2
Topic: test  Partition: 1      Leader: 0            Replicas: 0,2,3Isr: 0,2,3
Topic: test  Partition: 2      Leader: 2            Replicas: 2,3,0Isr: 2,3,0
Topic: test  Partition: 3      Leader: 3            Replicas: 3,2,0Isr: 3,2,0
Topic: test  Partition: 4      Leader: 0            Replicas: 0,3,2Isr: 0,3,2
Topic: test  Partition: 5      Leader: 2            Replicas: 2,0,3Isr: 2,0,3
Topic: test  Partition: 6      Leader: 3            Replicas: 3,0,2Isr: 3,0,2
Topic: test  Partition: 7      Leader: 0            Replicas: 0,2,3Isr: 0,2,3
Topic: test  Partition: 8      Leader: 2            Replicas: 2,3,0Isr: 2,3,0
Topic: test  Partition: 9      Leader: 3            Replicas: 3,2,0Isr: 3,2,0
Topic: test  Partition: 10     Leader: 0            Replicas: 0,3,2Isr: 0,3,2
Topic: test  Partition: 11     Leader: 2            Replicas: 2,0,3Isr: 2,0,3
Topic: test  Partition: 12     Leader: 3            Replicas: 3,0,2Isr: 3,0,2
Topic: test  Partition: 13     Leader: 0            Replicas: 0,2,3Isr: 0,2,3
Topic: test  Partition: 14     Leader: 2            Replicas: 2,3,0Isr: 2,3,0
Topic: test  Partition: 15     Leader: 3            Replicas: 3,2,0Isr: 3,2,0
Topic: test  Partition: 16     Leader: 0            Replicas: 0,3,2Isr: 0,3,2
Topic: test  Partition: 17     Leader: 2            Replicas: 2,0,3Isr: 2,0,3
Topic: test  Partition: 18     Leader: 3            Replicas: 3,0,2Isr: 3,0,2
Topic: test  Partition: 19     Leader: 0            Replicas: 0,2,3Isr: 0,2,3

第一行,列出了topic的名称,分区数(PartitionCount),副本数(ReplicationFactor)以及其他的配置(Configs)

Leader:1 表示为做为读写的broker的编号

Replicas:表示该topic的每个分区在那些borker中保存

Isr:表示当前有效的broker, Isr是Replicas的子集

4、增加partitions分区数

[root@kafka3 ~]# kafka-topics.sh --zookeeper zk1.linuxops.org:2181 --alter --topic test --partitions 40
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

5、删除topic

[root@kafka3 ~]# kafka-topics.sh --zookeeper zk1.linuxops.org:2181 --delete --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

如上提示,删除操作在默认情况下只是打上一个删除的标记,在重新启动kafka 后才删除。如果需要立即删除,则需要在server.properties中配置 delete.topic.enable=true

6、查看topic消费到的offset

[root@kafka3 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka1.linuxops.org:9092 --topic test0 --time -1
test0:17:0
test0:8:0
test0:11:0
test0:2:0
test0:5:0
test0:14:0
test0:13:0
test0:4:0
test0:16:0
test0:7:0
test0:10:0
test0:1:0
test0:19:0
test0:18:0
test0:9:0
test0:3:0
test0:12:0
test0:15:0
test0:6:0
test0:0:

还没有生产消费,所以数据为0。可以通过官方自带的控制台生产者喝消费者来测试一下kafka,在第七节会介绍python的生产者和消费者代码,这里就不演示官方自带的控制台工具了。

kafka的常用命令就介绍到此,以后再添加完善。


七、kafka manager安装配置

一般为了方便管理,我们会选择一款kafka的管理工具,kafka manager就是其中一个优秀的kafka管理工具,它是yahoo开发并且开源的。

市面上有还有另外两款管理工具Kafka Web Conslole、KafkaOffsetMonitor也是很不错的工具,有兴趣可以自行下载体验。

1、下载

[root@kafka-manager ~]# git clone https://github.com/yahoo/kafka-manager
Cloning into 'kafka-manager'...
remote: Counting objects: 4555, done.
remote: Total 4555 (delta 0), reused 0 (delta 0), pack-reused 4555
Receiving objects: 100% (4555/4555), 2.81 MiB | 1.05 MiB/s, done.
Resolving deltas: 100% (2914/2914), done.

2、编译打包

[root@kafka-manager ~]# cd kafka-manager/
[root@kafka-manager kafka-manager]# ./sbt clean dist

由于众所周知的原因,编译异常慢,有过编译一个多小时的经历,也有过失败的经历......万分忧伤。

编译成功了以后,在 target/universal下有有个kafka-manager-1.3.3.16.zip文件,这就是我们需要的。

3、解压安装

解压 kafka-manager-1.3.3.16.zip,移动到目标路径中。

[root@kafka-manager universal]# unzip kafka-manager-1.3.3.16.zip
 mv kafka-manager-1.3.3.16 /usr/local/kafka-manager

设置一下环境变量,安装完成。

4、配置kafka-manager

kafka-manager配置文件在/usr/local/kafka-manager/conf中,修改application.conf文件。

[root@kafka-manager conf]# vim application.conf 
#修改kafka-manager.zkhosts配置项为我们自己的zookeeper地址即可。

5、启动kafka-manager

通过./bin/kafka-manager命令可以启动kafka-manager,默认kafka-manager使用的是9000端口,如果需要更改端口,请指定-Dhttp.port

例如:

./bin/kafka-manager -Dconfig.flie=/usr/local/kafka-manager/conf/application.conf -Dhttp.port=8011

启动后可以通过ip:8011访问kafka-manager页面。

关于kafka-manager的使用还是比较简单的,这里不展开说明了。


八、kafak身份以及权限认证

以上配置完成之后即可使用kafka了,但是没有提供任何的权限控制,所有能访问kafka的均可读取写入任意的队列消息,这样非常不安全,在生产环境中我们需要配置认证和权限,本章主要介绍kafka的SASL/PLAIN认证和ACL权限控制。

Kafka 的安全机制主要分为两个部分

  • 身份认证(Authentication):对client 与服务器的连接进行身份认证。
  • 权限控制(Authorization):实现对于TOPIC的权限控制

Kafka 目前支持SSL、SASL/Kerberos、SASL/PLAIN三种认证机制。可以支持 客户端与brokers之间的认证,可以支持brokers与zookeeper之间的认证。因为SASL/PLAIN认证的用户名密码均是明文传书,所以可以使用SSL加密传输,而ACL基于用户对topic的读写权限进行控制。

在客户端与brokers、brokers与zookeeper之间的认证可以只做客户端与brokers,这并不影响brokers与zookeeper的之间的通讯,当然为了安全我们可以做brokers与zookeeper的认证。

下面开始配置

1、修改server.properties配置文件

在之前的server.properties配置中,我们配置了listeners=PLAINTEXT://172.30.26.221:9092以PLAINTEXT协议监听172.30.26.221的9092端口上提供服务,我们需要修改这一行,修如下:

listeners=SASL_PLAINTEXT://172.30.26.221:9092 #修改监听协议为SASL_PLAINTEXT

security.inter.broker.protocol=SASL_PLAINTEXT #配置安全协议为SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN # 使用PLAIN做broker之间通信的协议
sasl.enabled.mechanisms=PLAIN #启用SASL机制
authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer # 配置java认证类
super.users=User:admin #设置超级用户为:admin
allow.everyone.if.no.acl.found=true #如果topic找不到acl配置是否运行操作,true为允许

如上配置中,我们修改了listeners配置,在这个配置中我们配置多个监听,例如listeners=PLAINTEXT://172.30.26.221:9093,SASL_PLAINTEXT://172.30.26.221:9092,只需要注意每一个监听的端口和不重复即可。

super.users配置了一个超级用户,这个超级用户不受ACL的限制可以自由访问任何的TOPIC,通常不对外使用,仅仅做管理使用,一般而言和JAAS配置的username一致。 allow.everyone.if.no.acl.found 配置了在TOPIC上没有找到ACL如何授权,配置true允许操作,配置false不允许操作,此配默认值为false,如果为false,TOPIC必须指定ACL,并且客户端使用指定的用户名才能访问成功。

注意:每一台brokers都需要配置

2、jaas文件配置

在config目录中添加kafka_server_jaas.conf,内容如下:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin@2017"
    user_alice="alice@2017";
};

其中,KafkaServer一定是要KafkaServer,否则启动会报在jaas文件中找不到KafkaServer。 username,password是broker内部使用的账号和密码。 user_alice="alice@2017" 配置了一个帐号为alice,密码为alice@2017的用户,这个帐号提供给客户端连接认证以及ACL权限控制时使用的。

本次配置没有使用SASL做为brokers与zookeeper之间的通信,所以没有配置Client,如果需要使用SASL做为brokers与zookeeper之间的通信,需要配置jaas中的Client。

注意:每一台brokers都需要配置

3、添加kafka_server_jaas.conf到jvm的环境变量

kafka启动时,会运行 bin/kafka-run-class.sh,将变量传给JVM。我们需要修改kafka-run-class.sh,将kafka_server_jaas.conf传递给JVM


KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf'
if [ $# -lt 1 ];
then
  echo "USAGE: $0 [-daemon] [-name servicename] [-loggc] classname [opts]"
  exit 1
fi

.
.
.

# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then
  nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS 【$KAFKA_SASL_OPTS】 $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
  exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS 【$KAFKA_SASL_OPTS】 $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
fi

如上,我们需要在bin/kafka-run-class.sh中有效配置的第一行添加KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf',kafka_server_jaas.conf的路径根据实际的情况修改。

然后在配置文件最后一段的Launch mode中,添加 $KAFKA_SASL_OPTS即可。

注意:每一台brokers都需要配置
为了方便识别,添加了【】符号,在实际配置中不能出现此符号。

4、重启kafka

配置完成后重启kafka。 在日志中我们可以看到这样的提示:

[2018-05-09 10:58:09,225] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/usr/local/kafka/config/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)

这个提示告诉我们,brokers和Zookeeper通信没有启用SASL,如果Zookeeper服务器允许的话,将继续连接到Zookeeper服务器。

所以即便brokers和Zookeeper通信没有启用SASL,也是可以连接的。

5、ACL的使用

1. kafka提供了一个ACL的功能用来控制TOPIC的权限,权限如下:

权限 说明
READ 读取topic
WRITE 写入topic
DELETE 删除topic
CREATE 创建topic
ALTER 修改topic
DESCRIBE 获取topic的信息
ClusterAction
ALL 所有权限

访问控制列表ACL存储在zk上,路径为/kafka-acl

2. kafka提供了一个bin/kafka-acls.sh脚本来设置权限

Kafka 提供的命令如下表所示:

Option Description Default Option type
–add Indicates to the script that user is trying to add an acl. Action
–remove Indicates to the script that user is trying to remove an acl. Action
–list Indicates to the script that user is trying to list acts. Action
–authorizer Fully qualified class name of the authorizer. kafka.security.auth.SimpleAclAuthorizer Configuration
–authorizer-properties key=val pairs that will be passed to authorizer for initialization. For the default authorizer the example values are: zookeeper.connect=localhost:2181 Configuration
–cluster Specifies cluster as resource. Resource
–topic [topic-name] Specifies the topic as resource. Resource
–group [group-name] Specifies the consumer-group as resource. Resource
–allow-principal Principal is in PrincipalType:name format that will be added to ACL with Allow permission. You can specify multiple –allow-principal in a single command. Principal
–deny-principal Principal is in PrincipalType:name format that will be added to ACL with Deny permission. You can specify multiple –deny-principal in a single command. Principal
–allow-host IP address from which principals listed in –allow-principal will have access. if –allow-principal is specified defaults to * which translates to “all hosts” Host
–deny-host IP address from which principals listed in –deny-principal will be denied access. if –deny-principal is specified defaults to * which translates to “all hosts” Host
–operation Operation that will be allowed or denied. Valid values are : Read, Write, Create, Delete, Alter, Describe, ClusterAction, All All Operation
–producer Convenience option to add/remove acls for producer role. This will generate acls that allows WRITE, DESCRIBE on topic and CREATE on cluster. Convenience
–consumer Convenience option to add/remove acls for consumer role. This will generate acls that allows READ, DESCRIBE on topic and READ on consumer-group. Convenience

3. 权限设置

通过几个例子介绍一下如何进行权限设置。

add 操作

# 为用户 alice 在 test(topic)上添加读写的权限
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zk1:2181/kafka_test10 --add --allow-principal User:alice --operation Read --operation Write --topic test
# 对于 topic 为 test 的消息队列,拒绝来自 ip 为198.51.100.3账户为 BadBob  进行 read 操作,其他用户都允许
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zk1:2181/kafka_test10 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic test
# 为bob 和 alice 添加all,以允许来自 ip 为198.51.100.0或者198.51.100.1的读写请求
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zk1:2181/kafka_test10 --add --allow-principal User:bob --allow-principal User:alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic test

list 操作

# 列出 topic 为 test 的所有权限账户
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zk1:2181/kafka_test10 --list --topic test

输出信息为:

Current ACLs for resource `Topic:test`:
    User:alice has Allow permission for operations: Describe from hosts: *
    User:alice has Allow permission for operations: Read from hosts: *
    User:alice has Allow permission for operations: Write from hosts: *

remove 操作

# 移除 acl
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zk1:2181/kafka_test10 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic test

producer 和 consumer 的操作

# producer
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zk1:2181/kafka_test10 --add --allow-principal User:alice --producer --topic test
#consumer
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zk1:2181/kafka_test10 --add 

其他具体的操作请参考官方文档。


九、消费者和生产者示例

kafka支持多种语言,我们以python为例。 python使用kafka是需要安装kafka的扩展,通过pip install kafka命令安装。

1、生产者

#!/usr/bin/python
# -*- coding: utf-8 -*-

import time
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers = ["172.30.26.221:9092"],security_protocol="SASL_PLAINTEXT",sasl_mechanism="PLAIN",sasl_plain_username="alice",sasl_plain_password="alice@2017")
topic = 'test'

for i in range(10):
    message=str(time.time())
    producer.send("test",message)
    print(str(i),message)

运行生产者,结果如下:

[root@kafka ~]# python py-KafkaProducer.py 
('0', '1525658172.98')
('1', '1525658172.98')
('2', '1525658172.98')
('3', '1525658172.98')
('4', '1525658172.98')
('5', '1525658172.98')
('6', '1525658172.98')
('7', '1525658172.98')
('8', '1525658172.98')
('9', '1525658172.98')

2、消费者

#!/usr/bin/python
# -*- coding: utf-8 -*-
from kafka import KafkaConsumer
consumer = KafkaConsumer('test', bootstrap_servers = ['172.30.26.221:9092'],security_protocol="SASL_PLAINTEXT",sasl_mechanism="PLAIN",sasl_plain_username="alice",sasl_plain_password="alice@2017")
for msg in consumer:
    print msg

运行消费者结果如下:

[root@kafka ~]# python py-KafkaConsumer.py 
ConsumerRecord(topic=u'test', partition=6, offset=18603, timestamp=1525658573162, timestamp_type=0, key=None, value='1525658573.16', checksum=-484640801, serialized_key_size=-1, serialized_value_size=13)
ConsumerRecord(topic=u'test', partition=6, offset=18604, timestamp=1525658573163, timestamp_type=0, key=None, value='1525658573.16', checksum=-197881057, serialized_key_size=-1, serialized_value_size=13)
ConsumerRecord(topic=u'test', partition=6, offset=18605, timestamp=1525658573163, timestamp_type=0, key=None, value='1525658573.16', checksum=-197881057, serialized_key_size=-1, serialized_value_size=13)
ConsumerRecord(topic=u'test', partition=6, offset=18606, timestamp=1525658573164, timestamp_type=0, key=None, value='1525658573.16', checksum=-1846694561, serialized_key_size=-1, serialized_value_size=13)
ConsumerRecord(topic=u'test', partition=6, offset=18607, timestamp=1525658573175, timestamp_type=0, key=None, value='1525658573.17', checksum=-819569464, serialized_key_size=-1, serialized_value_size=13)

从结果上看,kafka返回了很多信息,包括topic、value、时间戳、partition、offset等等信息。

以上是简单的python示例,更高级的应用请参考官方文档。

这里如果配置了ACL,服务器验证用户名通过过即可读取写入,如果没有配置ACL,也没有配置allow.everyone.if.no.acl.found=true将会被拒绝。

3、客户端一些问题

kafka中有消费者和消费组的概念,但是这两个均不在kafka服务器上配置,而和客户端相关。

一、关于消费者和生产者

kafka中,每一个消费者进程都是一个消费者,而消费组需要在消费者代码中进行指定,例如以下代码通过group_id指定了一个消费组:

consumer = KafkaConsumer('test', group_id='kafka_gid',bootstrap_servers = ['172.30.26.221:9092'],security_protocol="SASL_PLAINTEXT",sasl_mechanism="PLAIN",sasl_plain_username="alice",sasl_plain_password="alice@2017")

而生产者就没有这个概念,每一个生产进程都是一个生产者,均可写入有权限的队列中。

二、关于TOPIC

生产者向kafka写入一个不存在的TOPIC,那么kafka会自动在服务器上次创建这个TOPIC(前提条件是有权限),创建出的TOPIC的partitions、replication会使用配置文件配置的默认值。