分布式高性能消息系统(Kafka MQ)的原理与实践
2016年04月18日

一、关于Kafka的一些概念和理解


Kafka是一个分布式的数据流平台,它基于独特日志文件形式,提供了高性能消息系统功能。也可以用于大数据流管道。


Kafka维护了按目录划分的消息订阅源,称之为 Topic

称发布消息到Topic的工程为生产者

称订阅Topic和处理发布的消息的订阅源的工程为消费者

Kafka以一个或者多个服务器组成的集群的形式运行,每个服务器被称为broker


Kafka客户端和服务器端通过TCP协议连接,并提供了Java客户端,许多其他语言的客户端也有。


对于每个Topic,Kafka集群维护了分区的日志文件(分区1、分区2、分区3),每个分区(partition)是顺序的、不可改变的、一直不停地往后面追加的消息队列,称之为提交日志(commit log),每个在其中的消息都有一个称之为offset的序列号,来唯一的标识在分区里的每条消息。


Kafka集群保存了所有发布的消息,不管他们是否被消费,保存时间期限是可以配置的。Kafka对于性能表现对于数据的数量是恒定的,所以它处理大数据量没有任何问题。


消息系统通常有两个模型:排队模式和广播模式,排队模式是许多消费者同时去服务器争夺数据,但是一条数据只分发给一个消费者,广播模式是消息广播给所有消费者,每个消费者都可以拿到消息。Kafka通过consumer group统一概括了这两种模式。


消费者们都给自己定了一个group name(id) 的标签,每条发布到topic的消息都会发给每个订阅的consumer group里面的一个且仅一个成员。consumers可以分布在不同的进程或者服务器上。



message、partition和consumer的关系

1、message按一定hash逻辑分发到topic的某个partition;

2、一个consumer可以连接多个partition;

3、所有partition都会有consumer线程去连接,这个consumer的分配是自动的,无法指定某个consumer连接哪一个partition;

4、consumer连接的partitions是固定的,不会中途自动变更,比如consumer1连接的是partition1和partition3,consumer2连接的是partition2,这个分配中途不会自己变化。

5、consumer如果多于partition数,则多余的那部分consumer会连不到partition而空闲。



Kafka服务器常用脚本命令


启动kafka:

bin/kafka-server-start.sh config/server.properties &


停止kafka:

bin/kafka-server-stop.sh


1、Topic操作

创建topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic TEST2

删除topic:

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topicname

查看所有topic:

bin/kafka-topics.sh --list --zookeeper localhost:2181

查看某个topic详情:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic_name

修改topic:

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic TEST2 --partitions 2


2、消费消息:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning


3、生产消息:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

This is a message

This is another message

按ctrl+c结束(^C)


consumer_group

1、查看有哪些consumer groups

./kafka-consumer-groups.sh --bootstrap-server 172.16.1.170:9092,172.16.1.171:9092,172.16.172:9092 --list --new-consumer

2、查看指定consumer groups的消费情况(可以看到topic的offset)

./kafka-consumer-groups.sh --bootstrap-server 172.16.1.170:9092,172.16.1.171:9092,172.16.172:9092 --describe --group PushConsumer_qAbA7b --new-consumer

GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
ztest-group, ZTEST2, 6, 4987, 4987, 0, consumer-7_/172.19.15.113
ztest-group, ZTEST2, 0, 4876, 4936, 60, consumer-1_/172.19.15.113
ztest-group, ZTEST2, 3, 5008, 5062, 54, consumer-4_/172.19.15.113
ztest-group, ZTEST2, 4, 4963, 4992, 29, consumer-5_/172.19.15.113
ztest-group, ZTEST2, 1, 4900, 4949, 49, consumer-2_/172.19.15.113
ztest-group, ZTEST2, 2, 5046, 5046, 0, consumer-3_/172.19.15.113
ztest-group, ZTEST2, 7, 5051, 5051, 0, consumer-8_/172.19.15.113
ztest-group, ZTEST2, 5, 5010, 5010, 0, consumer-6_/172.19.15.113


参考官方文档如下:

Managing Consumer Groups

With the ConsumerGroupCommand tool, we can list, delete, or describe consumer groups. For example, to list all consumer groups across all topics:

 > bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list

test-consumer-group

To view offsets as in the previous example with the ConsumerOffsetChecker, we "describe" the consumer group like this:

 > bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group test-consumer-group

GROUP                          TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             OWNER
test-consumer-group            test-foo                       0          1               3               2               test-consumer-group_postamac.local-1456198719410-29ccd54f-0

When you're using the new consumer API where the broker handles coordination of partition handling and rebalance, you can manage the groups with the "--new-consumer" flags:

 > bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server broker1:9092 --list


Checking consumer position

Sometimes it's useful to see the position of your consumers. We have a tool that will show the position of all consumers in a consumer group as well as how far behind the end of the log they are. To run this tool on a consumer group named my-group consuming a topic named my-topic would look like this:

 > bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test

Note, however, after 0.9.0, the kafka.tools.ConsumerOffsetChecker tool is deprecated and you should use the kafka.admin.ConsumerGroupCommand (or the bin/kafka-consumer-groups.sh script) to manage consumer groups, including consumers created with the new consumer API.


查看topic的最大和最小offset

bin/kafka-run-class.sh kafka.tools.GetOffsetShell



官方文档:

1、官方网站http://kafka.apache.org/documentation

2、官方WIKIhttps://cwiki.apache.org/confluence/display/KAFKA/Index

3、issues情况(JIRA):https://issues.apache.org/jira/browse/KAFKA


Kafka集群配置

kafka集群配置非常简单,在不同服务器上的kafka server只要连接同一个zookeeper就可以组成集群

在server.properties配置 zookeeper.connect=172.16.1.6:2181,172.16.1.7:2181,172.16.1.8:2181

实例配置如下(kafka 0.9版本),供参考:

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

listeners=PLAINTEXT://:9092

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=172.16.1.170

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured.  Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=172.16.1.170

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
advertised.port=9092

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
# add by zollty
num.partitions=3

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
# use 2 factors add by zollty
default.replication.factor=2
############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=172.16.1.6:2181,172.16.1.7:2181,172.16.1.8:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

#############################################
delete.topic.enable=true


Kafka 服务器生产配置

num.network.threads=3-8

queued.max.requests=500-16

fetch.purgatory.purge.interval.requests=1000-100

producer.purgatory.purge.interval.requests=1000-100

num.replica.fetchers=1-4

default.replication.factor=1-3

replication.factor=1-3

controlled.shutdown.enable=true

另外:

From a security perspective, we recommend you use the latest released version of JDK 1.8 as older freely available versions have disclosed security vulnerabilities. LinkedIn is currently running JDK 1.8 u5 (looking to upgrade to a newer version) with the G1 collector. If you decide to use the G1 collector (the current default) and you are still on JDK 1.7, make sure you are on u51 or newer. LinkedIn tried out u21 in testing, but they had a number of problems with the GC implementation in that version. LinkedIn's tuning looks like this:

-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC

-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M

-XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80