• 欢迎访问搞代码网站,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站!
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏搞代码吧

Java API方式调用Kafka各种协议的方法

java 搞代码 4年前 (2022-01-05) 71次浏览 已收录 0个评论

本篇文章主要介绍了Java API方式调用Kafka各种协议的方法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

众所周知,Kafka自己实现了一套二进制协议(binary protocol)用于各种功能的实现,比如发送消息,获取消息,提交位移以及创建topic等。具体协议规范参见:Kafka协议  这套协议的具体使用流程为:

1.客户端创建对应协议的请求

2.客户端发送请求给对应的broker

3.broker处理请求,并发送response给客户端

虽然Kafka提供的大量的脚本工具用于各种功能的实现,但很多时候我们还是希望可以把某些功能以编程的方式嵌入到另一个系统中。这时使用Java API的方式就显得异常地灵活了。本文我将尝试给出Java API底层框架的一个范例,同时也会针对“创建topic”和“查看位移”这两个主要功能给出对应的例子。 需要提前说明的是,本文给出的范例并没有考虑Kafka集群开启安全的情况。另外Kafka的KIP4应该一直在优化命令行工具以及各种管理操作,有兴趣的读者可以关注这个KIP。

本文中用到的API依赖于kafka-clients,所以如果你使用Maven构建的话,请加上:

  org.apache.kafkakafka-clients0.10.2.0

如果是gradle,请加上:

 compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.2.0'

底层框架

 /** * 发送请求主方法 * @param host     目标broker的主机名 * @param port     目标broker的端口 * @param request    请求对象 * @param apiKey    请求类型 * @return       序列化后的response * @throws IOException */ public ByteBuffer send(String host, int port, AbstractRequest request, ApiKeys apiKey) throws IOException { Socket socket = connect(host, port); try { return send(request, apiKey, socket); } finally { socket.close(); } } /** * 发送序列化请求并等待response返回 * @param socket      连向目标broker的socket * @param request      序列化后的请求 * @return         序列化后的response * @throws IOException */ private byte[] issueRequestAndWaitForResponse(Socket socket, byte[] request) throws IOException { sendRequest(socket, request); return getResponse(socket); } /** * 发送序列化请求给socket * @param socket      连向目标broker的socket * @param request      序列化后的请求 * @throws IOException */ private void sendRequest(Socket socket, byte[] request) throws IOException { DataOutputStream dos = new DataOutputStream(socket.getOutputStream()); dos.writeInt(request.length); dos.write(request); dos.flush(); } /** * 从给定socket处获取response * @param socket      连向目标broker的socket * @return         获取到的序列化后的response * @throws IOException */ private byte[] getResponse(Socket socket) throws IOException { DataInputStream dis = null; try { dis = new DataInputStream(socket.getInputStream()); byte[] response = new byte[dis.readInt()]; dis.readFully(response); return response; } finally { if (dis != null) { dis.close(); } } } /** * 创建Socket连接 * @param hostName     目标broker主机名 * @param port       目标broker服务端口, 比如9092 * @return         创建的Socket连接 * @throws IOException */ private Socket connect(String hostName, int port) throws IOException { return new Socket(hostName, port); } /** * 向给定socket发送请求 * @param request    请求对象 * @param apiKey    请求类型, 即属于哪种请求 * @param socket    连向目标broker的socket * @return       序列化后的response * @throws IOException */ private ByteBuffer send(AbstractRequest request, ApiKeys apiKey, Socket socket) throws IOException { RequestHeader header = new RequestHeader(apiKey.id, request.version(), "client-id", 0); ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf()); header.writeTo(buffer); request.writeTo(buffer); byte[] serializedRequest = buffer.array(); byte[] response = issueRequestAndWaitForResponse(socket, serializedRequest); ByteBuffer responseBuffer = ByteBuffer.wrap(response); ResponseHeader.parse(responseBuffer); return responseBuffer; } 

有了这些方法的铺垫,我们就可以创建具体的请求了。

创建topic

 /** * 创建topic * 由于只是样例代码,有些东西就硬编码写到程序里面了(比如主机名和端口),各位看官自行修改即可 * @param topicName       topic名 * @param partitions      分区数 * @param replicationFactor   副本数 * @throws IOException */ public void createTopics(String topicName, int partitions, short replicationFactor) throws IOException { Map topics = new HashMap(); // 插入多个元素便可同时创建多个topic topics.put(topicName, new CreateTopicsRequest.TopicDetails(partitions, replicationFactor)); int creationTimeoutMs = 60000; CreateTopicsRequest request = new CreateTopicsRequest.Builder(topics, creationTimeoutMs).build(); ByteBuffer response = send("localhost", 9092, request, ApiKeys.CREATE_TOPICS); CreateTopicsResponse.parse(response, request.version()); } 

查看位移

 /** * 获取某个consumer group下的某个topic分区的位移 * @param groupID      group id * @param topic       topic名 * @param parititon     分区号 * @throws IOException */ public void getOffsetForPartition(String groupID, String topic, int parititon) throws IOException { TopicPartition tp = new TopicPartition(topic, parititon); OffsetFetchRequest request = new OffsetFetchRequest.Builder(groupID, singletonList(tp)) .setVersion((short)2).build(); ByteBuffer response = send("localhost", 9092, request, ApiKeys.OFFSET_FETCH); OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version()); OffsetFetchResponse.PartitionData partitionData = resp.responseData().get(tp); System.out.println(partitionData.offset); } 
 /** * 获取某个consumer group下所有topic分区的位移信息 * @param groupID      group id * @return         (topic分区 --> 分区信息)的map * @throws IOException */ public Map getAllOffsetsForGroup(String groupID) throws IOException { OffsetFetchRequest request = new OffsetFetchRequest.Builder(groupID, null).setVersion((short)2).build(); ByteBuffer response = send("localhost", 9092, request, ApiKeys.OFFSET_FETCH); OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version()); return resp.responseData(); } 

okay, 上面就是“创建topic”和“查看位移”的样例来源gaodai#ma#com搞*!代#%^码网代码,各位看官可以参考着这两个例子构建其他类型的请求。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持gaodaima搞代码网

以上就是Java API方式调用Kafka各种协议的方法的详细内容,更多请关注gaodaima搞代码网其它相关文章!


搞代码网(gaodaima.com)提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发送到邮箱[email protected],我们会在看到邮件的第一时间内为您处理,或直接联系QQ:872152909。本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:Java API方式调用Kafka各种协议的方法

喜欢 (0)
[搞代码]
分享 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址