kafka集群部署

下载安装包

http://kafka.apache.org/downloads.html

我下载的版本是 kafka_2.12-2.0.0.tgz

解压安装包

1
2
3
$ tar -zxvf /export/software/ kafka_2.12-2.0.0.tgz -C /export/servers/
$ cd /export/servers/
$ ln -s kafka_2.12-2.0.0 kafka

修改配置文件

1
2
$ cp /export/servers/kafka/config/server.properties /export/servers/kafka/config/server.properties.bak
$ vi /export/servers/kafka/config/server.properties

参考的配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62

// broker的全局唯一标识

broker.id=0

// 用来监听链接的端口,producer或consumer将在此端口建立链接

port=9092

// 如果不配 使用终端broker list 发送数据会报错
listeners=PLAINTEXT://192.168.1.10:9092

// 处理网络请求的线程数量

num.network.threads=3

// 处理磁盘io的线程数量

num.io.threads=8

// 发送套接字的缓冲区大小

socket.send.buffer.bytes=102400

// 接收套接字的缓冲区域大小

socket.receive.buffer.bytes=102400

// 请求套接字的缓冲区大小

socket.request.max.bytes=104857600

// 日志存放目录

log.dirs=/export/servers/logs/kafka

// topic在当前broker上的分片数

num.partitions=2

// 用来恢复和清理data下数据的线程数量

num.recovery.threads.per.data.dir=1

// segment文件保留的最长时间,超时将会被删除

offsets.topic.replication.factor=1

// 滚动生成新的segment文件的最大时间

log.retention.hours=168
log.roll.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

// zokeeper的链接端口

zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000

# false时执行topic delete 时进行逻辑删除
delete.topic.enable=true

分发安装包

1
2
3
4
5
6
7

$ scp -r /export/servers/ kafka_2.12-2.0.0 kafka02:/export/servers

// 然后分别在各机器上创建软连

$ cd /export/servers/
$ ln -s kafka_2.12-2.0.0 kafka

再次修改配置文件依次修改各服务器上配置文件的的broker.id,分别是0,1,2不得重复。

运行下面指令启动kafka集群

1
$ bin/kafka-server-start.sh  config/server.properties

server-start.sh config/server.properties

1
2
3
4
5
6
7

### Kafka常用操作命令

查看当前服务器中的所有topic

```shell
$ bin/kafka-topics.sh --list --zookeeper zk01:2181

创建topic

1
$ ./kafka-topics.sh --create --zookeeper mini1:2181 --replication-factor 1 --partitions 3 --topic first

删除topic

1
$ sh bin/kafka-topics.sh --delete --zookeeper zk01:2181 --topic test

需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。

通过shell命令发送消息

1
$ kafka-console-producer.sh --broker-list 192.168.1.12:9092 --topic first

image

如果报错参考文章最下面的报错信息

通过shell消费消息

1
$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.12:9092 --from-beginning --topic first

image

查看消费位置

1
$ sh kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zk01:2181 --group testGroup

查看某个Topic的详情

1
$ sh kafka-topics.sh --topic test --describe --zookeeper zk01:2181

困难记录

运行bin/kafka-console-producer.sh --broker-list cor3:9092 --topic first发送消息时

错误:ERROR Error when sending message to topic first with key: null, value: 0 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for first-0: 1625 ms has passed since batch creation plus linger time

解决办法:

修改配置文件中

1
listeners=PLAINTEXT://192.168.1.11:9092

运行

1
bin/kafka-console-producer.sh --broker-list 192.168.1.11:9092 --topic first

参考blog https://blog.csdn.net/lvbinibnsb/article/details/81542893