• 欢迎访问搞代码网站,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站!
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏搞代码吧

python操作docker-kafka

python 搞java代码 3年前 (2022-07-03) 44次浏览 已收录 0个评论
文章目录[隐藏]

本机IP10.30.6.24,前面配置过程当中须要根据本人IP信息配置批改

kafka默认应用127.0.0.1拜访

配置compose.yaml文件如下

<code class="yaml">services:
  zookeeper:
    image: zookeeper
    container_name: demo-zookeeper
    ports:
      - "2181:2181"
    restart: always

  kafka:
    image: wurstmeister/kafka
    container_name: demo-kafka
    ports:
      - "9092:9092"
    ulimits:
      nofile:
        soft: 262144
        hard: 262144
    environment:
      DOCKER_API_VERSION: 1.41
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://10.30.6.24:9092"
      KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092"
      KAFKA_BROKER_ID: 1
      KAFKA_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LOG_DIRS: /kafka/kafka-logs-backend
    depends_on:
      - zookeeper
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - kafka-data:/kafka
    command: /bin/sh -c "rm -f /kafka/kafka-logs-backend/meta.properties && start-kafka.sh"


volumes:
  kafka-data: {}

启动命令

<code class="shell">$ docker compose up -d

配置参数解释

ulimits

  • 操作系统提供限度可应用资源量的形式
  • linux零碎默认是1024个,具体执行命令ulimit -a查看
  • 因为音讯队列文件读写频繁,须要调大该配置,批改kafka的默认最大关上文件数量
  • 限度能够是硬限度或软限度,但软限度不能超过硬限度

环境变量解释

  • DOCKER_API_VERSION: docker version命令的API Version输入信息
  • KAFKA_ADVERTISED_LISTENERS: 把kafka的地址端口注册给zookeeper,这个中央的数据目前是PLAINTEXT://10.30.6.24:9092,这个IP地址须要根据具体机器IP进行批改,指明客户端通过哪个 IP 能够拜访到以后节点,如果网卡IP有批改的话也须要批改这个中央的配置
  • KAFKA_LISTENERS: 配置kafka的监听端口,指明 kafka 以后节点监听本机的哪个网卡,这个中央的IP地址能够填写为0.0.0.0示意监听所有网卡的信息
  • KAFKA_BROKER_ID: 一个 kafka节点 就是一个 broker,一个集群外面的broker id惟一
  • KAFKA_PORT: 配置kafka凋谢端口
  • KAFKA_ZOOKEEPER_CONNECT: 配置对应的zookeeper连贯信息,因为是在同一个docker compose当中,所以能够应用服务名称作为host连贯信息
  • KAFKA_LOG_DIRS: 保留日志数据的目录,默认是/tmp/kafka-logs

挂载卷解释

- /var/run/docker.sock:/var/run/docker.sock: 把dockersock挂在进去

- kafka-data:/kafka: 把kafka日志信息挂载进去进行长久化,如果不须要进行数据长久化,能够去掉这一步挂载

启动命令

/bin/sh -c "rm -f /kafka/kafka-logs-backend/meta.properties && start-kafka.sh"

因为挂载数据的时候会把kafka的配置信息也挂载进去,并且保留在meta.properties文件当中

文件内容如下,会保留一个cluster.id,当容器销毁重建时候,kafka会从新创立一个cluster.id,同时也会去查看meta.properties的信息

#
#Mon Jun 27 06:38:03 GMT 2022
cluster.id=XMHTDGRvQ5yJnEfXKhuabg
version=0
broker.id=1

当容器启动中会产生报错如下,次要是kafka查看cluster.id不统一导致

kafka.common.InconsistentClusterIdException: The Cluster ID 2Z7pfznDRmWeLJNp3nZm8A doesn't match stored clusterId Some(XMHTDGRvQ5yJnEfXKhuabg) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.

所以须要配置在kafka启动之前删除长久化保留的meta.properties配置信息,这一步不影响长久化数据,次要是防止抵触报错

python客户端操作

装置依赖库

$ pip install kafka-python

生产者

producer.py

import json

from kafka import KafkaProducer

# 配置value序列化办法,抉择kafka节点信息
# 如果是近程broker须要把127.0.0.1批改为对应IP地址
producer = KafkaProducer(
    value_serializer=lambda m: json.dumps(m).encode('ascii'),
    bootstrap_servers=['10.30.6.24:9092'])

# 发送操作默认是异步的
for _ in range(100):
    producer.send('my-topic', {'key': 'value'})

# 阻塞直到操作理论send
producer.flush()

消费者

consumer.py

import json

from kafka import KafkaConsumer

# consumer配置,topic信息和生产者雷同
consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         auto_offset_reset='earliest',
                         bootstrap_servers=['10.30.6.24:9092'],
                         value_deserializer=lambda m: json.loads(m.decode('ascii')))
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

auto_offset_reset可选参数如下

  • earliest: 当各分区下有已提交的offset时,从提交的offset开始生产,无提交的offset时,从头开始生产
  • latest: 当各分区下有已提交的offset时,从提交的offset开始生产,无提交的offset时,生产新产生的该分区下的数据(默认选项是这个)
  • none: topic各分区都存在已提交的offset时,从offset后开始生产,只有有一个分区不存在已提交的offset,则抛出异样(不要应用这个)

开启两个终端别离执行

$ python producer.py
$ python consumer.py

拓展

如果python客户端也是在容器外面,能够批改compose.yamlkafka容器的环境变量配置

KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092"

python消费者和生产者能够应用kafka:9092拜访broker

如果程序是在容器里面,也能够配置批改/etc/hosts新增一行数据

127.0.0.1 kafka

最终实现根据kafka这个host参数进行拜访

浏览参考

kafka官网文档

kafka配置近程拜访

kafka python第三方库文档

kafka了解listenersadvertised.listeners


搞代码网(gaodaima.com)提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发送到邮箱[email protected],我们会在看到邮件的第一时间内为您处理,或直接联系QQ:872152909。本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:python操作docker-kafka

喜欢 (0)
[搞代码]
分享 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址