22 kafka分布式消息队列
22.1 概述
Kafka是由LinkedIn开发的一个分布式的消息系统。它是一款开源的、轻量级的、分布式、可分区和具有复制备份的(Replicated)、基于ZooKeeper的协调管理的分布式流平台的功能强大的消息系统。与传统的消息系统相比,KafKa能够很好的处理活跃的流数据,使得数据在各个子系统中高性能、低延迟地不停流转。
Kafka使用Scala编写,它以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布式处理系统如Cloudera、Apache Storm、Spark都支持与Kafka集成。
22.1.1 特性
1)以时间复杂度为O(1)的方式(顺序读写)提供消息持久化能力,对TB级以上数据也能保证正常时间复杂度的访问性能;
2)高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100000条以上消息的传输;
3)支持Kafka Server间的消息分区,及分布式消费,同时保证每个Partition内的消息顺序传输;
4)同时支持离线数据处理(因为消息可以保存默认是168小时)和实时数据处理;
5)支持在线水平扩展;
22.1.2 为什么使用消息队列
解耦、扩展性、缓冲、灵活性 & 峰值处理能力、顺序保证、异步通信、冗余、可恢复性
22.2 基本的kafka系统术语
broker**:**
Kafka以集群方式运行,集群中每个服务器称为broker;
主题(Topic)**:**
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic;
分区(Partition)**:**
topic中的数据分割为一个或多个partition,每个topic至少有一个partition;
partition中的数据是有序的,如果topic有多个partition,消费数据时就不能保证数据的顺序;
生产者(Producer)**:**
生产者即数据的发布者,该角色将消息发布到Kafka的topic中;
消费者(Consumer)**:**
消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。
22.3 体系结构
集群有4个broker,主题有4个分区,每个分区有两个副本,一个leader角色,一个Follower角色。
KafKa利用ZooKeeper保存相应元数据信息,KafKa元数据包括如broker节点信息、KafKa集群信息、旧版消费者信息以及消费偏移量信息offset、topic信息、分区状态信息、分区副本分配方案信息、动态配置信息等。KafKa在启动或者运行过程中会在ZooKeeper上创建相关的节点来保存元数据信息,KafKa通过监听机制在这些节点注册相应监听器来监听节点元素据的变化,从而由ZooKeeper负责管理维护KafKa集群,同时通过ZooKeeper我们能够很方便的对KafKa集群进行水平扩展以及数据迁移;
22.4 基本原理
22.4.1 分区与副本
每个topic 有一个或多个分区,每个分区在物理上对应一个文件夹,分区命令规则:主题名称—分区编号,(分区编号从0开始)
任何发布到分区的消息会直接追加到日志文件(分区目录下以.log为文件名后缀的数据文件)的尾部,而每条消息在日志中的位置都会对应一个按顺序递增的偏移量。偏移量是一个分区下严格有序的逻辑值,它并不表示消息在磁盘上的物理位置。
为了能快速查找到对应offset的数据,需要两个索引文件:
.index 结尾:根据有序的offset进行查找;
.timeindex 结尾: 根据有序的时间戳进行查找;
每个分区有一到多个副本。其中有一个是leader角色,其他是Follower角色。
leader处理所有的针对这个partition的读写请求,而followers被动复制leader的结果。如果这个leader失效了,其中的一个follower将会自动的变成新的leader。每个broker自己所管理的partition的可以是leader,同时又是其他broker所管理partitions的followers,kafka通过这种方式来达到负载均衡。
一般情况下partition的数量大于等于broker的数量,并且所有partition的leader均匀分布在broker上。
分区能增,不能减;
副本能增,也能减;
22.4.2 消息生产
Producer将消息发布到它指定的topic中,并负责决定发布到哪个分区
主要两种方式:
1)round-robin做简单的负载均;
2)根据消息中的某一个关键字来进行区分;
Producer根据指定的partition方法(round-robin、hash等),将消息发布到指定topic的partition里面。
22.4.3 消费与消费者组
Kafka提供了一种consumer的抽象概念:Consumer Group。
一个消费组可以有多个消费实例,这个实例可以是进程,也可以是线程。
同一Topic 的一条消息,只能被同一个 Consumer Group 的一个 Consumer 消费;
但 多个Consumer Group可同时消费这一消息。
在一个Consumer Group内
1)如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数;
2)如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀,最好partiton数目是consumer数目的整数倍;
3)如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有所不同;
消费者多于partition
消费者少于和等于partition
多个消费者组
每个消费者组的某一个消费实例能消费同一同一个消息。
22.4.4 消费顺序
Kafka通过Topic中partition概念实现并行消费;
Kafka可以同时提供顺序性保证和多个consumer同时消费时的负载均衡;
实现的原理是通过将一个topic中的每个partition分配给一个consumer group中的不同consumer实例;
通过这种方式:
Kafka只在partition的范围内保证消息消费的局部顺序性,不能在同一个topic中的多个partition中保证总的消费顺序性;
22.4.5 kafka优化
kafka同hdfs一样实现了软件raid,支持多磁盘并且不做raid,就是为了充分利用多磁盘并发读写,又保证每个磁盘连续读写的特性。
合理设置topic的partition数量,保证并发度。
Jvm**内存不宜过大4G**左右即可
建议JVM Heap不要太大,在4GB以内就可以。JVM太大,导致Full GC产生的“stop the world”时间过长,导致broker和zk之间的session超时。
JVM也不能过小,否则会导致频繁地触发gc操作,也影响Kafka的吞吐量。
如何为Kafka**集群选择合适的Topic/Partitions**数量
1)越多的分区可以提供更高的吞吐量,因为一个Partitions可以对应一个Consumer,Partitions越多吞吐量越高,也就是Consumer会并行读取Partitions。
2)分区多了会消耗更多的系统资源比如文件的句柄,broker会为每个Partitions分配一个文件目录,目录中分索引文件和数据文件。所以分区多了需要打开的文件自然就多了。
3)更多分区会导致更高的不可用性,在有副本的情况下更多的分区会导致更多的复制操作,在brokers宕机时也需要恢复更多的Partition。
4)越多的分区可能增加端对端的延时,因为producer发布消息后consumer需要等待所有partition完成复本复制之后才能得到该消息,分区越多说明复制所花费的时间越长,因为从一个broker到别一个broker复制数据是由一个线程来完成的。
5)越多的Partition意味着需要客户端分配更多的内存,比如一个topic有2000个分区,当只有一个consumer时,则这个consumer需要在内存中记住这2000个分区信息。
日志保留策略设置
log.retention.hours:日志保留时间,默认7天;
log.segment.bytes:段文件配置1GB,有利于快速回收磁盘空间,重启kafka加载也会加快(不需要扫描过多的小文件);
文件刷盘策略
为了大幅度提高producer写入吞吐量,需要定期批量写文件。建议配置:
log.flush.interval.messages=10000(每当producer写入10000条消息时,刷数据到磁盘)
log.flush.interval.ms=1000(每间隔1秒钟时间,刷数据到磁盘)
网络和io**操作线程配置优化**
一般num.network.threads主要处理网络io,读写缓冲区数据,基本没有io等待,配置线程数量为cpu核数加1。
num.network.threads=xxx (broker处理消息的最大线程数)
num.io.threads(主要进行磁盘io操作,高峰期可能有些io等待,因此配置需要大些。配置线程数量为cpu核数2倍,最大不超过3倍)
queued.max.requests=5000 (加入队列的最大请求数,超过该值,network thread阻塞)
socket.send.buffer.bytes=1024000 (server使用的send buffer大小)
socket.receive.buffer.bytes=1024000 (server使用的receive buffer大小)
配置参数:
参数 | 说明(**解释)** |
---|---|
broker.id =0 | 每一个broker在集群中的唯一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况 |
log.dirs=/data/kafka-logs | kafka数据的存放地址,多个地址的话用逗号分割,多个目录分布在不同磁盘上可以提高读写性能 /data/kafka-logs-1,/data/kafka-logs-2 |
port =9092 | broker server服务端口 |
message.max.bytes =6525000 | 表示消息体的最大大小,单位是字节 |
num.network.threads =4 | broker处理消息的最大线程数,一般情况下数量为cpu核数 |
num.io.threads =8 | broker处理磁盘IO的线程数,数值为cpu核数2倍 |
background.threads =4 | 一些后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改 |
queued.max.requests =500 | 等待IO线程处理的请求队列最大数,若是等待IO的请求超过这个数值,那么会停止接受外部消息,应该是一种自我保护机制。 |
host.name | broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置 |
socket.send.buffer.bytes=100*1024 | socket的发送缓冲区,socket的调优参数SO_SNDBUFF |
socket.receive.buffer.bytes =100*1024 | socket的接受缓冲区,socket的调优参数SO_RCVBUFF |
socket.request.max.bytes =10010241024 | socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖 |
log.segment.bytes =102410241024 | topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖 |
log.roll.hours =24*7 | 这个参数会在日志segment没有达到log.segment.bytes设置的大小,也会强制新建一个segment会被 topic创建时的指定参数覆盖 |
log.cleanup.policy = delete | 日志清理策略选择有:delete和compact主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖 |
log.retention.minutes=300或log.retention.hours=24 | 数据文件保留多长时间, 存储的最大时间超过这个时间会根据log.cleanup.policy设置数据清除策略log.retention.bytes和log.retention.minutes或log.retention.hours任意一个达到要求,都会执行删除 有2删除数据文件方式:按照文件大小删除:log.retention.bytes 按照2中不同时间粒度删除:分别为分钟,小时 |
log.retention.bytes=-1 | topic每个分区的最大文件大小,一个topic的大小限制 = 分区数*log.retention.bytes。-1没有大小限log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖 |
log.retention.check.interval.ms=5minutes | 文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略 |
log.cleaner.enable=false | 是否开启日志清理 |
log.cleaner.threads = 2 | 日志清理运行的线程数 |
log.cleaner.io.max.bytes.per.second=None | 日志清理时候处理的最大大小 |
log.cleaner.dedupe.buffer.size=50010241024 | 日志清理去重时候的缓存空间,在空间允许的情况下,越大越好 |
log.cleaner.io.buffer.size=512*1024 | 日志清理时候用到的IO块大小一般不需要修改 |
log.cleaner.io.buffer.load.factor =0.9 | 日志清理中hash表的扩大因子一般不需要修改 |
log.cleaner.backoff.ms =15000 | 检查是否处罚日志清理的间隔 |
log.cleaner.min.cleanable.ratio=0.5 | 日志清理的频率控制,越大意味着更高效的清理,同时会存在一些空间上的浪费,会被topic创建时的指定参数覆盖 |
log.cleaner.delete.retention.ms =1day | 对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。会被topic创建时的指定参数覆盖 |
log.index.size.max.bytes =1010241024 | 对于segment日志的索引文件大小限制,会被topic创建时的指定参数覆盖 |
log.index.interval.bytes =4096 | 当执行一个fetch操作后,需要一定的空间来扫描最近的offset大小,设置越大,代表扫描速度越快,但是也更好内存,一般情况下不需要搭理这个参数 |
log.flush.interval.messages=None例如log.flush.interval.messages=1000表示每当消息记录数达到1000时flush一次数据到磁盘 | log文件”sync”到磁盘之前累积的消息条数,因为磁盘IO操作是一个慢操作,但又是一个”数据可靠性“的必要手段,所以此参数的设置,需要在"**数据可靠性**"与”性能“之间做必要的权衡.如果此值过大,将会导致每次”fsync"的时间较长(IO阻塞),如果此值过小,将会导致“fsync”的次数较多,这也意味着整体的client请求有一定的延迟.物理server故障,将会导致没有fsync的消息丢失. |
log.flush.scheduler.interval.ms =3000 | 检查是否需要固化到硬盘的时间间隔 |
log.flush.interval.ms = None例如:log.flush.interval.ms=1000表示每间隔1000毫秒flush一次数据到磁盘 | 仅仅通过interval来控制消息的磁盘写入时机,是不足的.此参数用于控制“fsync”的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔达到阀值,也将触发. |
log.delete.delay.ms =60000 | 文件在索引中清除后保留的时间一般不需要去修改 |
log.flush.offset.checkpoint.interval.ms =60000 | 控制上次固化硬盘的时间点,以便于数据恢复一般不需要去修改 |
auto.create.topics.enable =true | 是否允许自动创建topic,若是false,就需要通过命令创建topic |
default.replication.factor =1 | 是否允许自动创建topic,若是false,就需要通过命令创建topic |
num.partitions =1 | 每个topic的分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖 |
以下是kafka中Leader,replicas配置参数 | |
controller.socket.timeout.ms =30000 | partition leader与replicas之间通讯时,socket的超时时间 |
controller.message.queue.size=10 | partition leader与replicas数据同步时,消息的队列尺寸 |
replica.lag.time.max.ms =10000 | 如果leader发现flower超过10秒没有向它发起fech请求,那么leader考虑这个flower是不是程序出了点问题或者资源紧张调度不过来,它太慢了,不希望它拖慢后面的进度,就把它从ISR中移除。 |
replica.lag.max.messages =4000 | 如果follower落后与leader太多,将会认为此follower[或者说partition relicas]已经失效,相差4000条就移除##通常,在follower与leader通讯时,因为网络延迟或者链接断开,总会导致replicas中消息同步滞后##如果消息之后太多,leader将认为此follower网络延迟较大或者消息吞吐能力有限,将会把此replicas迁移##到其他follower中.##在broker数量较少,或者网络不足的环境中,建议提高此值. |
replica.socket.timeout.ms=30*1000 | follower与leader之间的socket超时时间 |
replica.socket.receive.buffer.bytes=64*1024 | leader复制时候的socket缓存大小 |
replica.fetch.max.bytes =1024*1024 | replicas每次获取数据的最大大小 |
replica.fetch.wait.max.ms =500 | replicas同leader之间通信的最大等待时间,失败了会重试 |
replica.fetch.min.bytes =1 | fetch的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会阻塞,直到满足条件 |
num.replica.fetchers=1 | leader进行复制的线程数,增大这个数值会增加follower的IO |
replica.high.watermark.checkpoint.interval.ms =5000 | 每个replica检查是否将最高水位进行固化的频率 |
controlled.shutdown.enable =false | 是否允许控制器关闭broker ,若是设置为true,会关闭所有在这个broker上的leader,并转移到其他broker |
controlled.shutdown.max.retries =3 | 控制器关闭的尝试次数 |
controlled.shutdown.retry.backoff.ms =5000 | 每次关闭尝试的时间间隔 |
leader.imbalance.per.broker.percentage =10 | leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡 |
leader.imbalance.check.interval.seconds =300 | 检查leader是否不平衡的时间间隔 |
offset.metadata.max.bytes | 客户端保留offset信息的最大空间大小 |
kafka中zookeeper参数配置 | |
zookeeper.connect = localhost:2181 | zookeeper集群的地址,可以是多个,多个之间用逗号分割hostname1:port1,hostname2:port2,hostname3:port3 |
zookeeper.session.timeout.ms=6000 | ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大 |
zookeeper.connection.timeout.ms =6000 | ZooKeeper的连接超时时间 |
zookeeper.sync.time.ms =2000 | ZooKeeper集群中leader和follower之间的同步实际那 |
22.5 kafka安装
安装规划:
kafka 是集群需要 zookeeper,在zookeeper 集群对应的节点上 安装kafka集群。
worker-1
kafka安装
1)添加服务
2)选择kafka
3)选择kafka Broker安装的主机
4)修改配置
5)等待安装
创建topic
/opt/cloudera/parcels/CDH/bin/kafka-topics --zookeeper worker-1:2181 --create --replication-factor 1 --partitions 1 --topic hainiutest
这里指定了1个副本,1个分区,topic名为hainiutest,并且指定zookeeper地址
查看zk**中的信息**
/opt/cloudera/parcels/CDH/lib/zookeeper/bin/zkCli.sh
查看已有的topic
/opt/cloudera/parcels/CDH/bin/kafka-topics --list --zookeeper worker-1:2181
查看topic**的详情**
/opt/cloudera/parcels/CDH/bin/kafka-topics --describe --zookeeper worker-1:2181 --topic hainiu_test
启动一个生产者,用于生产数据放入kafka
/opt/cloudera/parcels/CDH/bin/kafka-console-producer --broker-list worker-1:9092 --topic hainiu_test
启动一个消费者,用于从kafka 里取出数据消费
/opt/cloudera/parcels/CDH/bin/kafka-console-consumer --bootstrap-server worker-1:9092 --topic test --from-beginning #消费完的offset保存在kafka中
/opt/cloudera/parcels/CDH/bin/kafka-console-consumer --bootstrap-server worker-1:9092 --topic test --group group1 #启动一个消费者,消费者的组id为group1
查看consumer group**列表,使用--list参数**
kafka-consumer-groups --bootstrap-server worker-1:9092 --list
查看特定consumer group 详情,使用-group**与-describe参数**
kafka-consumer-groups --bootstrap-server worker-1:9092 -group group1 -describe
消费的topic名称、partition id、consumer group最后一次提交的offset、最后提交的生产消息offset、消费offset与生产offset之间的差值、当前消费topic-partition的group成员id
修改Partitions**只能增加(扩容)**
当增加新分区后,需要重启producer 才能识别新增加的分区。
/opt/cloudera/parcels/CDH/bin/kafka-topics -alter --zookeeper worker-1:2181 --topic hainiu_test -partitions 3
当producer生产数据,写不往新分区里写
当重启producer 后,可以往新分区里写
修改备份数量
因为单节点问题,所以没法演示,后期再学
/usr/local/kafka/bin/kafka-reassign-partitions.sh --zookeeper worker-1:2181 -reassignment-json-file /home/hadoop/topic.json -execute
查看topic的详情
/opt/cloudera/parcels/CDH/bin/kafka-topics -describe --zookeeper worker-1:2181 --topic hainiu_test
验证修改后的结果是否与json文件中描述的格式一致
/opt/cloudera/parcels/CDH/bin/kafka-reassign-partitions.sh --zookeeper worker-1:2181 -reassignment-json-file /home/hadoop/topic.json -verify
用于修改备份数量创建的topic.json文件格式
使用下面这个json
{"partitions":[{"topic":"hainiu_test","partition":0,"replicas":[0,1,2]},{"topic":"hainiu_test","partition":1,"replicas":[0,1,2]},{"topic":"hainiu_test","partition":2,"replicas":[0,1,2]}],"version":1}
格式化后:
{
"partitions":
[
{
"topic": "hainiu_test",
"partition": 0,
"replicas": [0,1,2] //副本所在brokerID,就是在server.properties中配置的broker.id
},
{
"topic": "hainiu_test",
"partition": 1,
"replicas": [0,1,2]
},
{
"topic": "hainiu_test",
"partition": 2,
"replicas": [0,1,2]
}
],
"version":1
}
删除topic
/opt/cloudera/parcels/CDH/bin/kafka--topics -delete --zookeeper worker-1:2181 --topic hainiu_test
手动永久删除topic**:**
删除kafka存储目录(server.properties文件log.dirs配置,默认为“/data/kafka-logs”)相关topic目录
删除zookeeper “/brokers/topics/”目录下相关topic节点
停止kafka**服务**
./ssh_all_zookeeper.sh /usr/local/kafka/bin/kafka-server-stop.sh
22.6 kafka-manager的安装与使用
安装在worker-1
通过kafka-manager 来管理 kafka。
1)先解压到/usr/local目录下
2)创建软链接
ln -s /usr/local/kafka-manager-1.3.3.7 /usr/local/kafka-manager
4)修改配置文件
vim /usr/local/kafka-manager/conf/application.conf
5)创建日志目录:mkdir -p /usr/local/kafka-manager/logs/
6)启动
指定配置启动并指定端口为9999
nohup /usr/local/kafka-manager/bin/kafka-manager -Dconfig.file=/usr/local/kafka-manager/conf/application.conf -Dhttp.port=9999 > /usr/local/kafka-manager/logs/kafka-manager.log 2>&1 &
6).使用参考
http://blog.csdn.net/isea533/article/details/73727485
7).海牛kafka-manager服务访问地址 http://worker-1:9999
22.7 kafka开发之通过zookeeper消费kafka数据
22.7.1 java版
//--导包
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.1-cdh6.3.2</version>
</dependency>
</dependencies>
//producer
@Test
public void putdata() throws InterruptedException {
Properties props = new Properties();
//设置kafka集群的地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "worker-1:9092");
//ack模式,all是最慢但最安全的
props.put(ProducerConfig.ACKS_CONFIG,"-1");
//失败重试次数
props.put(ProducerConfig.RETRIES_CONFIG, "3");
//每个分区未发送消息总字节大小(单位:字节),超过设置的值就会提交数据到服务端
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "10");
//消息在缓冲区保留的时间,超过设置的值就会被提交到服务端
props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
//整个Producer用到总内存的大小,如果缓冲区满了会提交数据到服务端
//buffer.memory要大于batch.size,否则会报申请内存不足的错误
//序列化器
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("hainiudemo1", Integer.toString(i), "dd:"+i));
Thread.sleep(10);
producer.close();
}
//下面的例子是一个consumer 消费 kafka topic 的所有分区的数据
@Test
public void autoCommit() {
Properties props = new Properties();
//设置kafka集群的地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "worker-1:9092");
//设置消费者组,组名字自定义,组名字相同的消费者在一个组
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group");
//开启offset自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
//自动提交时间间隔
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
//序列化器
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//实例化一个消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//消费者订阅主题,可以订阅多个主题
consumer.subscribe(Arrays.asList("hainiudemo"));
//死循环不停的从broker中拿数据
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s, partition=%d%n", record.offset(), record.key(), record.value(),record.partition());
}
}
//--下面的例子是一个生产者生产一个自定义的对象 消费者消费的所有分区的数据
public class MyHainiuSerializer implements Serializer<Hainiu>{
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, Hainiu data) {
return data.getName().getBytes();
}
@Override
public void close() {
}
}
class Hainiu{
public Hainiu() {
}
public Hainiu(String name) {
this.name = name;
}
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "name: "+name;
}
}
//--定义生产者和消费者
public class HainiuKafkaProducerWithSeri {
@Test
public void putdata() throws InterruptedException {
Properties props = new Properties();
//设置kafka集群的地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "worker-1:9092");
//ack模式,all是最慢但最安全的
props.put(ProducerConfig. ACKS_CONFIG, "all");
//失败重试次数
props.put(ProducerConfig.RETRIES_CONFIG, "3");
//每个分区未发送消息总字节大小(单位:字节),超过设置的值就会提交数据到服务端
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "10");
//消息在缓冲区保留的时间,超过设置的值就会被提交到服务端
props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
//整个Producer用到总内存的大小,如果缓冲区满了会提交数据到服务端
//buffer.memory要大于batch.size,否则会报申请内存不足的错误
//序列化器
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//--设置自动以序列化类
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.hainiu.MyHainiuSerializer");
Producer<String, Hainiu> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++){
Hainiu hainiu=new Hainiu();
hainiu.setName("hainiu_"+i);
producer.send(new ProducerRecord<String, Hainiu>("hainiudemo1",hainiu));
Thread.sleep(10);
}
}
//--定义反序列化器
public class MyHainiuDeSerializer implements Deserializer<Hainiu> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public Hainiu deserialize(String topic, byte[] data) {
Hainiu hainiu=new Hainiu(new String(data));
return hainiu;
}
@Override
public void close() {
}
}
@Test
public void autoCommit() {
Properties props = new Properties();
//设置kafka集群的地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "worker-1:9092");
//设置消费者组,组名字自定义,组名字相同的消费者在一个组
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group");
//开启offset自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
//自动提交时间间隔
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
//序列化器
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.hainiu.MyHainiuDeSerializer");
//实例化一个消费者
KafkaConsumer<String, Hainiu> consumer = new KafkaConsumer<>(props);
//消费者订阅主题,可以订阅多个主题
consumer.subscribe(Arrays.asList("hainiudemo1"));
//死循环不停的从broker中拿数据
while (true) {
ConsumerRecords<String, Hainiu> records = consumer.poll(100);
for (ConsumerRecord<String, Hainiu> record : records)
System.out.printf("offset = %d, key = %s, value = %s, partition=%d%n", record.offset(), record.key(), record.value(),record.partition());
}
}
}
//--生产者指定分区规则自定义分区器
public class Mypartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
return Integer.parseInt(value.toString().split(":")[1])%3;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
public class HainiuKafkaAutoPartitionProducer {
@Test
public void putdata() throws InterruptedException {
Properties props = new Properties();
//设置kafka集群的地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "worker-1:9092");
//ack模式,all是最慢但最安全的
props.put(ProducerConfig. ACKS_CONFIG, "all");
//失败重试次数
props.put(ProducerConfig.RETRIES_CONFIG, "3");
//每个分区未发送消息总字节大小(单位:字节),超过设置的值就会提交数据到服务端
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "10");
//消息在缓冲区保留的时间,超过设置的值就会被提交到服务端
props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
//整个Producer用到总内存的大小,如果缓冲区满了会提交数据到服务端
//buffer.memory要大于batch.size,否则会报申请内存不足的错误
//--设置自定义分区器
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,Mypartitioner.class);
//序列化器
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("hainiudemo1","dd:"+i));
Thread.sleep(10);
producer.close();
}
@Test
public void autoCommit() {
Properties props = new Properties();
//设置kafka集群的地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "worker-1:9092");
//设置消费者组,组名字自定义,组名字相同的消费者在一个组
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group");
//开启offset自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
//自动提交时间间隔
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
//序列化器
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//实例化一个消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//消费者订阅主题,可以订阅多个主题
consumer.subscribe(Arrays.asList("hainiudemo1"));
//死循环不停的从broker中拿数据
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s, partition=%d%n", record.offset(), record.key(), record.value(),record.partition());
}
}
}
手动提交offset
@Test
public void shoudongCommit() {
Properties props = new Properties();
//设置kafka集群的地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "worker-1:9092");
//设置消费者组,组名字自定义,组名字相同的消费者在一个组
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group");
//开启offset自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
//自动提交时间间隔
//props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
//序列化器
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//实例化一个消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//消费者订阅主题,可以订阅多个主题
consumer.subscribe(Arrays.asList("hainiutest"));
final int minBatchSize = 50;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
System.out.println(buffer.size());
//insertIntoDb(buffer);
for (ConsumerRecord bf : buffer) {
System.out.printf("offset = %d, key = %s, value = %s%n", bf.offset(), bf.key(), bf.value());
}
consumer.commitSync();
buffer.clear();
}
}
}
分区手动提交offset
@Test
public void munualCommitByPartition() {
Properties props = new Properties();
//设置kafka集群的地址
props.put("bootstrap.servers", "worker-1:9092");
//设置消费者组,组名字自定义,组名字相同的消费者在一个组
props.put("group.id", "my_group");
//开启offset自动提交
props.put("enable.auto.commit", "false");
//序列化器
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//实例化一个消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//消费者订阅主题,可以订阅多个主题
consumer.subscribe(Arrays.asList("hainiutest"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println("partition: " + partition.partition() + " , " + record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
/*
提交的偏移量应该始终是您的应用程序将要读取的下一条消息的偏移量。因此,在调用commitSync()时,
offset应该是处理的最后一条消息的偏移量加1
为什么这里要加上面不加喃?因为上面Kafka能够自动帮我们维护所有分区的偏移量设置,有兴趣的同学可以看看SubscriptionState.allConsumed()就知道
*/
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}
}
22.8 flume对接kafka
#Name
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = hainiudemo
a1.sinks.k1.brokerList = worker-1:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 10
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动flume进行测试
flume-ng agent -n a1 -f ./kafka.conf -Dflume.root.logger=info,console