2. kafka安装
这篇文章主要讲解kafka单机安装,以及集群安装。kafka的单机安装非常简单,只需按照下一段落提到的几步操作即可。
单机安装
- 下载
首先从官方下载安装包,官方地址:http://kafka.apache.org/downloads
。kafka安装包和一般安装包的命名方式不一样,我们看一个kafka包命名:kafka_2.11-1.1.0.tgz
,其中2.11是scala的版本,1.1.0才是kafka的版本。官方强烈建议scala版本和服务器上的scala版本保持一致,避免引发一些不可预知的问题。官方原文如下:
We build for multiple versions of Scala. This only matters if you are using Scala and you want a version built for the same Scala version you use. Otherwise any version should work (2.11 is recommended). - 解压
将安装包放到服务器上某个位置,执行如下命令:
[afei@kafka opt]$ tar -xzf kafka_2.11-1.1.0.tgz
[afei@kafka opt]$ cd kafka_2.11-1.1.0
- 启动zk
kafka依赖zookeeper,zookeeper集群可以自己搭建,也可以用kafka安装包中内置的shell脚本启动zookeeper。
如果已经安装了zk单机或者zk集群(单机或者集群都可以,取决于你对高可用的要求),那么可以跳过这一步:
[afei@kafka kafka_2.11-1.1.0]$ bin/zookeeper-server-start.sh config/zookeeper.properties
[2018-06-21 15:10:14,449] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2018-06-21 15:10:14,466] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2018-06-21 15:10:14,466] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2018-06-21 15:10:14,467] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
... ...
这种启动方式如果执行ctrl+z后会退出,启动的进程也会退出,所以建议增加
-daemon
参数启动:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
说明:笔者后面的测试,假设在10.0.55.209上部署了独立zk,而不是通过kafka安装包中的脚本部署zk,且端口为默认端口2181。
- 配置
启动kafka之前,需要确认一些配置信息,kafka的配置信息在config/server.properties文件中,笔者配置kafka直接显示指定绑定的ip为10.0.55.229
,所以需要增加如下一下配置:
host.name=10.0.55.229
port=9092
listeners=PLAINTEXT://10.0.55.229:9092
advertised.listeners=PLAINTEXT://10.0.55.229:9092
- 启动
执行如下脚本即可:
[afei@izwz91rhzhed2c54e6yx87z kafka_2.11-1.1.0]$ bin/kafka-server-start.sh config/server.properties
[2018-06-21 15:15:41,830] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2018-06-21 15:15:42,373] INFO starting (kafka.server.KafkaServer)
[2018-06-21 15:15:42,374] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
... ...
这种启动方式如果执行ctrl+z后会退出,启动的进程也会退出,所以建议增加
-daemon
参数启动:
bin/kafka-server-start.sh -daemon config/server.properties
- -daemon运行原理
为什么启动kafka时增加-daemon
参数就能实现后台进程方式启动呢?我们首先看一下kafka-server-start.sh
最后一行:exec base_dir/kafka-run-class.shEXTRA_ARGS kafka.Kafka "$@"
,其本质是调用了kafka-run-class.sh
,这个脚本的部分代码如下,DAEMON_MODE为true,就能通过nohup ... &
方式启动,而DAEMON_MODE为true的前提是COMMAND 为-daemon,COMMAND就是第一个参数:COMMAND=$1
,所以启动脚本增加-daemon
就能实现后台进程方式启动:
while [ $# -gt 0 ]; do
COMMAND=$1
case $COMMAND in
-name)
DAEMON_NAME=$2
CONSOLE_OUTPUT_FILE=$LOG_DIR/$DAEMON_NAME.out
shift 2
;;
-loggc)
if [ -z "$KAFKA_GC_LOG_OPTS" ]; then
GC_LOG_ENABLED="true"
fi
shift
;;
-daemon)
DAEMON_MODE="true"
shift
;;
*)
break
;;
esac
done
# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then
nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
fi
- 发送消息
发送消息之前,先创建topic(kafka能自动创建topic,通过配置auto.create.topics.enable
决定,但是一般生产环境建议关闭该特性):
-- 创建topic
[afei@kafka kafka_2.11-1.1.0]$ bin/kafka-topics.sh --create --zookeeper 10.0.55.209:2181 --replication-factor 1 --partitions 1 --topic TOPIC-TEST-AFEI
Created topic "TOPIC-TEST-AFEI".
-- 查看所有创建的topic集合(结果中包含了刚刚创建的名为TOPIC-TEST-AFEI的topic)
[afei@kafka kafka_2.11-1.1.0]$ bin/kafka-topics.sh --list --zookeeper 10.0.55.209:2181
TOPIC-TEST-AFEI
使用kafka提供的kafka-console-producer.sh
发送消息:
[afei@kafka kafka_2.11-1.1.0]$ bin/kafka-console-producer.sh --broker-list 10.0.55.229:9092 --topic TOPIC-TEST-AFEI
>hello+world
>my+name+is+afei
- 接收消息
使用kafka提供的kafka-console-consumer.sh
接收消息:
[afei@kafka kafka_2.11-1.1.0]$ bin/kafka-console-consumer.sh --bootstrap-server 10.0.55.229:9092 --topic TOPIC-TEST-AFEI --from-beginning
firt
hello+world
my+name+is+afei
集群安装
单机版kafka安装并可以成功发送&接收消息后,搭建kafka集群就比较简单了。
- 配置文件
首先基于conf目录下的server.propreties文件,复制两个文件server-1.propreties,server-2.propreties
server-1.propreties做如下变更:
broker.id=1
port=9093
log.dirs=/tmp/kafka-logs-1
server-2.properties做类似的变更:
broker.id=2
port=9094
log.dirs=/tmp/kafka-logs-2
由于笔者将三个kafka broker放在一台linux服务器上,所以
port
不能一样(如果每个broker一台服务器,那么port可以一致,生产环节肯定是每个broker部署在一台独立的服务器上)。但是无论三个broker是否在一台服务器上,broker.id
一定是整个集群下全局唯一。否则无法正常启动broker。其原理是在zk上以broker.id的值写入对应的节点,如果写入失败,broker启动就失败。所以整个集群的broker.id要保证全局唯一。
- 启动kafka
执行如下两个命令,分别再启动两个kafka broker:
bin/kafka-server-start.sh -daemon config/server-1.properties
bin/kafka-server-start.sh -daemon config/server-2.properties
- 集群验证
由于总计部署了3个broker,所以创建topic时能指定--replication-factor 3
,如果不能指定,说明集群部署有问题:
[afei@kafka kafka_2.11-1.1.0]$ bin/kafka-topics.sh --create --zookeeper 10.0.55.209:2181 --replication-factor 3 --partitions 5 --topic TOPIC-TEST-CLUSTER
Created topic "TOPIC-TEST-CLUSTER".
查看刚才创建的topic详情,可以看到总计5个分区(PartitionCount),且副本数为3(ReplicationFactor),且每个分区上有3个副本(通过Replicas的值可以得出),另外最后一列Isr(In-Sync Replicas)即表示处理同步状态的副本集合,这些副本与leader副本保持同步,没有任何同步延迟,另外Leader,Replicas Isr中的0,1,2就是broker id,对应配置文件conf/server.properties中的broker.id
:
[afei@kafka kafka_2.11-1.1.0]$ bin/kafka-topics.sh --zookeeper 10.0.55.209:2181 --topic TOPIC-TEST-CLUSTER --describe
Topic:TOPIC-TEST-CLUSTER PartitionCount:5 ReplicationFactor:3 Configs:
Topic: TOPIC-TEST-CLUSTER Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: TOPIC-TEST-CLUSTER Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: TOPIC-TEST-CLUSTER Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: TOPIC-TEST-CLUSTER Partition: 3 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: TOPIC-TEST-CLUSTER Partition: 4 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
作者:阿飞的博客
来源:https://www.jianshu.com/p/f3a327ab2998
看完两件小事
如果你觉得这篇文章对你挺有启发,我想请你帮我两个小忙:
- 把这篇文章分享给你的朋友 / 交流群,让更多的人看到,一起进步,一起成长!
- 关注公众号 「方志朋」,公众号后台回复「666」 免费领取我精心整理的进阶资源教程