生产者发送到对应的分区有以下几种方式:
(1)指定了patition,则直接使用;(可以查阅对应的java api, 有多种参数)
(2)未指定patition但指定key,通过对key的value进行hash出一个patition;
(3)patition和key都未指定,使用轮询选出一个patition。
但是kafka提供了,自定义分区算法的功能,由业务手动实现分布:
1、实现一个自定义分区类,CustomPartitioner实现Partitioner
import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map; public class CustomPartitioner implements Partitioner { /** * * @param topic 当前的发送的topic * @param key 当前的key值 * @param keyBytes 当前的key的字节数组 * @param value 当前的value值 * @param valueBytes 当前的value的字节数组 * @param cluster * @return */ @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //这边根据返回值就是分区号, 这边就是固定发送到三号分区 return 3; } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } }
2、producer配置文件指定,具体的分区类
// 具体的分区类
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, “kafka.CustomPartitioner”);
技巧:可以使用ProducerConfig中提供的配置ProducerConfig
kafka producer拦截器
拦截器(interceptor)是在Kafka 0.10版本被引入的。
interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。
许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。
所使用的类为:
org.apache.kafka.clients.producer.ProducerInterceptor
我们可以编码测试下:
1、定义消息拦截器,实现消息处理(可以是加时间戳等等,unid等等。)
import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Map; import java.util.UUID; public class MessageInterceptor implements ProducerInterceptor<String, String> { @Override public void configure(Map<String, ?> configs) { System.out.println("这是MessageInterceptor的configure方法"); } /** * 这个是消息发送之前进行处理 * * @param record * @return */ @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { // 创建一个新的record,把uuid入消息体的最前部 System.out.println("为消息添加uuid"); return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(), UUID.randomUUID().toString().replace("-", "") + "," + record.value()); } /** * 这个是生产者回调函数调用之前处理 * @param metadata * @param exception */ @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { System.out.println("MessageInterceptor拦截器的onAcknowledgement方法"); } @Override public void close() { System.out.println("MessageInterceptor close 方法"); } }
2、定义计数拦截器
import java.util.Map; import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class CounterInterceptor implements ProducerInterceptor<String, String>{ private int errorCounter = 0; private int successCounter = 0; @Override public void configure(Map<String, ?> configs) { System.out.println("这是CounterInterceptor的configure方法"); } @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> reco<div style="color:transparent">本文来源gaodai.ma#com搞##代!^码网(</div>rd) { System.out.println("CounterInterceptor计数过滤器不对消息做任何操作"); return record; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { // 统计成功和失败的次数 System.out.println("CounterInterceptor过滤器执行统计失败和成功数量"); if (exception == null) { successCounter++; } else { errorCounter++; } } @Override public void close() { // 保存结果 System.out.println("Successful sent: " + successCounter); System.out.println("Failed sent: " + errorCounter); } }