• 欢迎访问搞代码网站,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站!
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏搞代码吧

Java kafka如何实现自定义分区类和拦截器

java 搞代码 4年前 (2022-01-09) 40次浏览 已收录 0个评论

生产者发送到对应的分区有以下几种方式:

(1)指定了patition,则直接使用;(可以查阅对应的java api, 有多种参数)

(2)未指定patition但指定key,通过对key的value进行hash出一个patition;

(3)patition和key都未指定,使用轮询选出一个patition。

但是kafka提供了,自定义分区算法的功能,由业务手动实现分布:

1、实现一个自定义分区类,CustomPartitioner实现Partitioner

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class CustomPartitioner implements Partitioner {

  /**
   *
   * @param topic 当前的发送的topic
   * @param key  当前的key值
   * @param keyBytes 当前的key的字节数组
   * @param value 当前的value值
   * @param valueBytes 当前的value的字节数组
   * @param cluster
   * @return
   */
  @Override
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    //这边根据返回值就是分区号, 这边就是固定发送到三号分区
    return 3;
  }

  @Override
  public void close() {

  }
  @Override
  public void configure(Map<String, ?> configs) {

  }

}

2、producer配置文件指定,具体的分区类

// 具体的分区类
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, “kafka.CustomPartitioner”);

技巧:可以使用ProducerConfig中提供的配置ProducerConfig

kafka producer拦截器

拦截器(interceptor)是在Kafka 0.10版本被引入的。

interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。

许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。

所使用的类为:

org.apache.kafka.clients.producer.ProducerInterceptor

我们可以编码测试下:

1、定义消息拦截器,实现消息处理(可以是加时间戳等等,unid等等。)

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;
import java.util.UUID;

public class MessageInterceptor implements ProducerInterceptor<String, String> {

  @Override
  public void configure(Map<String, ?> configs) {
    System.out.println("这是MessageInterceptor的configure方法");
  }

  /**
   * 这个是消息发送之前进行处理
   *
   * @param record
   * @return
   */
  @Override
  public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
    // 创建一个新的record,把uuid入消息体的最前部
    System.out.println("为消息添加uuid");
    return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
        UUID.randomUUID().toString().replace("-", "") + "," + record.value());
  }

  /**
   * 这个是生产者回调函数调用之前处理
   * @param metadata
   * @param exception
   */
  @Override
  public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    System.out.println("MessageInterceptor拦截器的onAcknowledgement方法");
  }

  @Override
  public void close() {
    System.out.println("MessageInterceptor close 方法");
  }
}

2、定义计数拦截器

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CounterInterceptor implements ProducerInterceptor<String, String>{
  private int errorCounter = 0;
  private int successCounter = 0;

  @Override
  public void configure(Map<String, ?> configs) {
    System.out.println("这是CounterInterceptor的configure方法");
  }

  @Override
  public ProducerRecord<String, String> onSend(ProducerRecord<String, String> reco<div style="color:transparent">本文来源gaodai.ma#com搞##代!^码网(</div>rd) {
    System.out.println("CounterInterceptor计数过滤器不对消息做任何操作");
    return record;
  }

  @Override
  public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    // 统计成功和失败的次数
    System.out.println("CounterInterceptor过滤器执行统计失败和成功数量");
    if (exception == null) {
      successCounter++;
    } else {
      errorCounter++;
    }
  }

  @Override
  public void close() {
    // 保存结果
    System.out.println("Successful sent: " + successCounter);
    System.out.println("Failed sent: " + errorCounter);
  }
}

搞代码网(gaodaima.com)提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发送到邮箱[email protected],我们会在看到邮件的第一时间内为您处理,或直接联系QQ:872152909。本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:Java kafka如何实现自定义分区类和拦截器

喜欢 (0)
[搞代码]
分享 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址