本机IP
是10.30.6.24
,前面配置过程当中须要根据本人IP
信息配置批改
kafka
默认应用127.0.0.1
拜访
配置compose.yaml
文件如下
<code class="yaml">services: zookeeper: image: zookeeper container_name: demo-zookeeper ports: - "2181:2181" restart: always kafka: image: wurstmeister/kafka container_name: demo-kafka ports: - "9092:9092" ulimits: nofile: soft: 262144 hard: 262144 environment: DOCKER_API_VERSION: 1.41 KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://10.30.6.24:9092" KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092" KAFKA_BROKER_ID: 1 KAFKA_PORT: 9092 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_LOG_DIRS: /kafka/kafka-logs-backend depends_on: - zookeeper volumes: - /var/run/docker.sock:/var/run/docker.sock - kafka-data:/kafka command: /bin/sh -c "rm -f /kafka/kafka-logs-backend/meta.properties && start-kafka.sh" volumes: kafka-data: {}
启动命令
<code class="shell">$ docker compose up -d
配置参数解释
ulimits
- 操作系统提供限度可应用资源量的形式
linux
零碎默认是1024个,具体执行命令ulimit -a
查看- 因为音讯队列文件读写频繁,须要调大该配置,批改
kafka
的默认最大关上文件数量 - 限度能够是硬限度或软限度,但软限度不能超过硬限度
环境变量解释
DOCKER_API_VERSION
:docker version
命令的API Version
输入信息KAFKA_ADVERTISED_LISTENERS
: 把kafka
的地址端口注册给zookeeper
,这个中央的数据目前是PLAINTEXT://10.30.6.24:9092
,这个IP
地址须要根据具体机器IP
进行批改,指明客户端通过哪个IP
能够拜访到以后节点,如果网卡IP
有批改的话也须要批改这个中央的配置KAFKA_LISTENERS
: 配置kafka
的监听端口,指明kafka
以后节点监听本机的哪个网卡,这个中央的IP
地址能够填写为0.0.0.0
示意监听所有网卡的信息KAFKA_BROKER_ID
: 一个kafka
节点 就是一个broker
,一个集群外面的broker id
惟一KAFKA_PORT
: 配置kafka
凋谢端口KAFKA_ZOOKEEPER_CONNECT
: 配置对应的zookeeper
连贯信息,因为是在同一个docker compose
当中,所以能够应用服务名称作为host
连贯信息KAFKA_LOG_DIRS
: 保留日志数据的目录,默认是/tmp/kafka-logs
挂载卷解释
- /var/run/docker.sock:/var/run/docker.sock
: 把docker
的sock
挂在进去
- kafka-data:/kafka
: 把kafka
日志信息挂载进去进行长久化,如果不须要进行数据长久化,能够去掉这一步挂载
启动命令
/bin/sh -c "rm -f /kafka/kafka-logs-backend/meta.properties && start-kafka.sh"
因为挂载数据的时候会把kafka
的配置信息也挂载进去,并且保留在meta.properties
文件当中
文件内容如下,会保留一个cluster.id
,当容器销毁重建时候,kafka
会从新创立一个cluster.id
,同时也会去查看meta.properties
的信息
# #Mon Jun 27 06:38:03 GMT 2022 cluster.id=XMHTDGRvQ5yJnEfXKhuabg version=0 broker.id=1
当容器启动中会产生报错如下,次要是kafka
查看cluster.id
不统一导致
kafka.common.InconsistentClusterIdException: The Cluster ID 2Z7pfznDRmWeLJNp3nZm8A doesn't match stored clusterId Some(XMHTDGRvQ5yJnEfXKhuabg) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
所以须要配置在kafka
启动之前删除长久化保留的meta.properties
配置信息,这一步不影响长久化数据,次要是防止抵触报错
python
客户端操作
装置依赖库
$ pip install kafka-python
生产者
producer.py
import json from kafka import KafkaProducer # 配置value序列化办法,抉择kafka节点信息 # 如果是近程broker须要把127.0.0.1批改为对应IP地址 producer = KafkaProducer( value_serializer=lambda m: json.dumps(m).encode('ascii'), bootstrap_servers=['10.30.6.24:9092']) # 发送操作默认是异步的 for _ in range(100): producer.send('my-topic', {'key': 'value'}) # 阻塞直到操作理论send producer.flush()
消费者
consumer.py
import json from kafka import KafkaConsumer # consumer配置,topic信息和生产者雷同 consumer = KafkaConsumer('my-topic', group_id='my-group', auto_offset_reset='earliest', bootstrap_servers=['10.30.6.24:9092'], value_deserializer=lambda m: json.loads(m.decode('ascii'))) for message in consumer: # message value and key are raw bytes -- decode if necessary! # e.g., for unicode: `message.value.decode('utf-8')` print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
auto_offset_reset
可选参数如下
earliest
: 当各分区下有已提交的offset
时,从提交的offset
开始生产,无提交的offset
时,从头开始生产latest
: 当各分区下有已提交的offset
时,从提交的offset
开始生产,无提交的offset
时,生产新产生的该分区下的数据(默认选项是这个)none
:topic
各分区都存在已提交的offset
时,从offset
后开始生产,只有有一个分区不存在已提交的offset
,则抛出异样(不要应用这个)
开启两个终端别离执行
$ python producer.py
$ python consumer.py
拓展
如果python
客户端也是在容器外面,能够批改compose.yaml
的kafka
容器的环境变量配置
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092"
python
消费者和生产者能够应用kafka:9092
拜访broker
如果程序是在容器里面,也能够配置批改/etc/hosts
新增一行数据
127.0.0.1 kafka
最终实现根据kafka
这个host
参数进行拜访
浏览参考
kafka
官网文档
kafka
配置近程拜访
kafka python
第三方库文档
kafka
了解listeners
和advertised.listeners