意识 kafka
kafka简介
Kafka 是一个分布式流媒体平台,kafka官网:http://kafka.apache.org/
- (1)流媒体平台有三个要害性能:
- 公布和订阅记录流,相似于音讯队列或企业消息传递零碎。
- 以容错的长久形式存储记录流。
- 记录产生时解决流。
- (2)Kafka通常用于两大类利用:
- 构建可在零碎或应用程序之间牢靠获取数据的实时流数据管道
- 构建转换或响应数据流的实时流应用程序
- (3)首先是几个概念:
- Kafka作为一个集群运行在一个或多个可跨多个数据中心的服务器上。
- Kafka集群以称为 topics主题的类别存储记录流。
- 每条记录都蕴含一个键,一个值和一个工夫戳。
- (4)Kafka有四个外围API:
- Producer API(生产者API)容许应用程序公布记录流至一个或多个kafka的topics(主题)。
- Consumer API(消费者API)容许应用程序订阅一个或多个topics(主题),并解决所产生的对他们记录的数据流。
- Streams API(流API)容许应用程序充当流处理器,从一个或多个topics(主题)耗费的输出流,并产生一个输入流至一个或多个输入的topics(主题),无效地变换所述输出流,以输入流。
- Connector API(连接器API)容许构建和运行kafka topics(主题)连贯到现有的应用程序或数据系统中重用生产者或消费者。例如,关系数据库的连接器可能捕捉对表的每个更改。
在Kafka中,客户端和服务器之间的通信是通过简略,高性能,语言无关的TCP协定实现的。此协定已版本化并放弃与旧版本的向后兼容性。Kafka提供Java客户端,但客户端有多种语言版本。
1.2 Topics主题 和 partitions分区
咱们首先深刻理解 Kafka 为记录流提供的外围形象 – 主题topics
一个Topic能够认为是一类音讯,每个topic将被分成多个partition(区),每个partition在存储层面是append log文件
主题是公布记录的类别或订阅源名称。Kafka的主题总是多用户; 也就是说,一个主题能够有零个,一个或多个消费者订阅写入它的数据。
对于每个主题,Kafka群集都保护一个如下所示的分区日志:
每个分区都是一个有序的,不可变的记录序列,一直附加到结构化的提交日志中。分区中的记录每个都调配了一个称为偏移的程序ID号,它惟一地标识分区中的每个记录。
Kafka集群长久保留所有已公布的记录 – 无论是否已应用 – 应用可配置的保留期。例如,如果保留策略设置为两天,则在公布记录后的两天内,它可供使用,之后将被抛弃以开释空间。Kafka的性能在数据大小方面实际上是恒定的,因而长时间存储数据不是问题。实际上,基于每个消费者保留的惟一元数据是该消费者在日志中的偏移或地位。这种偏移由消费者管制:通常消费者在读取记录时会线性地进步其偏移量,但事实上,因为该地位由消费者管制,因而它能够依照本人喜爱的任何程序生产记录。例如,消费者能够重置为较旧的偏移量来重新处理过来的数据,或者跳到最近的记录并从“当初”开始生产。
这些性能组合意味着Kafka 消费者consumers 十分cheap – 他们能够来来往往对集群或其余消费者没有太大影响。例如,您能够应用咱们的命令行工具“tail”任何主题的内容,而无需更改任何现有使用者所耗费的内容。
日志中的分区有多种用处。首先,它们容许日志扩大到超出适宜单个服务器的大小。每个独自的分区必须适宜托管它的服务器,但主题可能有许多分区,因而它能够解决任意数量的数据。其次,它们充当了并行性的单位 – 更多的是它。
1.3 Distribution调配
一个Topic的多个partitions,被散布在kafka集群中的多个server上;每个server(kafka实例)负责partitions中音讯的读写操作;此外kafka还能够配置partitions须要备份的个数(replicas),每个partition将会被备份到多台机器上,以进步可用性.
基于replicated计划,那么就意味着须要对多个备份进行调度;每个partition都有一个server为”leader”;leader负责所有的读写操作,如果leader生效,那么将会有其余follower来接管(成为新的leader);follower只是枯燥的和leader跟进,同步音讯即可..由此可见作为leader的server承载了全副的申请压力,因而从集群的整体思考,有多少个partitions就意味着有多少个”leader”,kafka会将”leader”平衡的扩散在每个实例上,来确保整体的性能稳固。
1.4 Producers生产者 和 Consumers消费者
1.4.1 Producers生产者
Producers 将数据公布到指定的topics 主题。同时Producer 也能决定将此音讯归属于哪个partition;比方基于”round-robin”形式或者通过其余的一些算法等。
1.4.2 Consumers
- 实质上kafka只反对Topic.每个consumer属于一个consumer group;反过来说,每个group中能够有多个consumer.发送到Topic的音讯,只会被订阅此Topic的每个group中的一个consumer生产。
- 如果所有使用者实例具备雷同的使用者组,则记录将无效地在使用者实例上进行负载平衡。
- 如果所有消费者实例具备不同的消费者组,则每个记录将播送到所有消费者过程。剖析:两个服务器Kafka群集,托管四个分区(P0-P3),蕴含两个使用者组。消费者组A有两个消费者实例,B组有四个消费者实例。
在Kafka中实现生产consumption 的形式是通过在消费者实例上划分日志中的分区,以便每个实例在任何工夫点都是调配的“偏心份额”的独占消费者。保护组中成员资格的过程由Kafka协定动静解决。如果新实例退出该组,他们将从该组的其余成员接管一些分区; 如果实例死亡,其分区将分发给其余实例。
Kafka仅提供分区内记录的总订单,而不是主题中不同分区之间的记录。对于大多数应用程序而言,按分区排序与按键分区数据的能力相结合就足够了。然而,如果您须要对记录进行总订单,则能够应用仅蕴含一个分区的主题来实现,但这将意味着每个使用者组只有一个使用者过程。
1.5 Consumers kafka确保
- 发送到partitions中的音讯将会依照它接管的程序追加到日志中。也就是说,如果记录M1由与记录M2雷同的生成者发送,并且首先发送M1,则M1将具备比M2更低的偏移并且在日志中更早呈现。
- 消费者实例依照它们存储在日志中的程序查看记录。对于消费者而言,它们生产音讯的程序和日志中音讯程序统一。
- 如果Topic的”replicationfactor”为N,那么容许N-1个kafka实例生效,咱们将容忍最多N-1个服务器故障,而不会失落任何提交到日志的记录。
1.6 kafka作为音讯零碎
Kafka的流概念与传统的企业邮件系统相比如何?
(1)传统音讯零碎
音讯传统上有两种模型:queuing排队 and publish-subscribe公布 – 订阅。在队列中,消费者池能够从服务器读取并且每个记录转到其中一个; 在公布 – 订阅中,记录被播送给所有消费者。这两种模型中的每一种都有长处和毛病。排队的劣势在于它容许您在多个消费者实例上划分数据处理,从而能够扩大您的解决。可怜的是,一旦一个过程读取它曾经隐没的数据,队列就不是多用户。公布 – 订阅容许您将数据播送到多个过程,但因为每条音讯都发送给每个订阅者,因而无奈进行扩大解决。
卡夫卡的消费者群体概念概括了这两个概念。与队列一样,使用者组容许您将解决划分为一组过程(使用者组的成员)。与公布 – 订阅一样,Kafka容许您向多个消费者组播送音讯。
(2)kafka 的劣势
Kafka模型的劣势在于每个主题都具备这些属性 – 它能够扩大解决并且也是多用户 – 不须要抉择其中一个。
与传统的音讯零碎相比,Kafka具备更强的订购保障。
传统队列在服务器上按程序保留记录,如果多个消费者从队列中耗费,则服务器依照存储程序散发记录。然而,尽管服务器按程序散发记录,然而记录是异步传递给消费者的,因而它们可能会在不同的消费者处呈现故障。这实际上意味着在存在并行耗费的状况下失落记录的程序。消息传递零碎通常通过具备“独占消费者”概念来解决这个问题,该概念只容许一个过程从队列中耗费,但当然这意味着解决中没有并行性。
kafka做得更好。通过在主题中具备并行性概念 – 分区 – ,Kafka可能在消费者流程池中提供订购保障和负载平衡。这是通过将主题中的分区调配给使用者组中的使用者来实现的,以便每个分区仅由该组中的一个使用者应用。通过这样做,咱们确保使用者是该分区的惟一读者并按程序应用数据。因为有许多分区,这依然能够均衡许多消费者实例的负载。但请留神,消费者组中的消费者实例不能超过分区。
1.7 kafka作为存储系统
- 任何容许公布与生产音讯拆散的音讯的音讯队列实际上充当了正在进行的音讯的存储系统。Kafka的不同之处在于它是一个十分好的存储系统。
- 写入Kafka的数据将写入磁盘并进行复制以实现容错。Kafka容许生产者期待确认,以便在齐全复制之前写入不被认为是残缺的,并且即便写入的服务器失败也保障写入依然存在。
- 磁盘构造Kafka很好地应用了规模 – 无论服务器上有50 KB还是50 TB的持久数据,Kafka都会执行雷同的操作。
- 因为认真对待存储并容许客户端管制其读取地位,您能够将Kafka视为一种专用于高性能,低提早提交日志存储,复制和流传的专用分布式文件系统。
1.8 kafka用于流解决
- 仅仅读取,写入和存储数据流是不够的,目标是实现流的实时处理。
- 在Kafka中,流处理器是指从输出主题获取间断数据流,对此输出执行某些解决以及生成间断数据流以输入主题的任何内容。
- 例如,批发应用程序可能会接管销售和发货的输出流,并输入从新排序流和依据此数据计算的价格调整。
- 能够应用生产者和消费者API间接进行简略解决。然而,对于更简单的转换,Kafka提供了齐全集成的Streams API。这容许构建执行非平庸解决的应用程序,这些应用程序能够计算流的聚合或将流连贯在一起。
- 此工具有助于解决此类应用程序面临的难题:解决无序数据,在代码更改时重新处理输出,执行有状态计算等。
- 流API构建在Kafka提供的外围原语上:它应用生产者和消费者API进行输出,应用Kafka进行有状态存储,并在流处理器实例之间应用雷同的组机制来实现容错。
2、kafka应用场景
2.1 音讯Messaging
Kafka能够代替更传统的音讯代理。音讯代理的应用有多种起因(将解决与数据生成器拆散,缓冲未解决的音讯等)。与大多数消息传递零碎相比,Kafka具备更好的吞吐量,内置分区,复制和容错性能,这使其成为大规模音讯解决应用程序的现实解决方案。
依据教训,消息传递的应用通常绝对较低,但可能须要较低的端到端提早,并且通常取决于Kafka提供的弱小的耐用性保障。
在这个畛域,Kafka可与传统的消息传递零碎(如ActiveMQ或 RabbitMQ)相媲美。
2.2 网站流动跟踪
Kafka的原始用例是可能将用户流动跟踪管道重建为一组实时公布 – 订阅源。这意味着站点流动(页面查看,搜寻或用户可能采取的其余操作)将公布到核心主题,每个流动类型蕴含一个主题。这些源可用于订购一系列用例,包含实时处理,实时监控以及加载到Hadoop或离线数据仓库零碎以进行脱机解决和报告。
流动跟踪通常十分高,因为为每个用户页面视图生成了许多流动音讯。
2.3 度量Metrics
Kafka通常用于经营监控数据。这波及从分布式应用程序聚合统计信息以生成操作数据的集中式提要。
2.4 日志聚合
许多人应用Kafka作为日志聚合解决方案的替代品。日志聚合通常从服务器收集物理日志文件,并将它们放在地方地位(可能是文件服务器或HDFS)进行解决。Kafka形象出文件的细节,并将日志或事件数据作为音讯流更清晰地形象进去。这容许更低提早的解决并更容易反对多个数据源和分布式数据耗费。与Scribe或Flume等以日志为核心的零碎相比,Kafka提供了同样杰出的性能,因为复制而具备更强的耐用性保障,以及更低的端到端提早。
2.5 流解决
许多Kafka用户在解决由多个阶段组成的管道时解决数据,其中原始输出数据从Kafka主题中生产,而后聚合,丰盛或以其余形式转换为新主题以供进一步生产或后续解决。
例如,用于举荐新闻文章的解决管道能够从RSS订阅源抓取文章内容并将其公布到“文章”主题; 进一步解决可能会对此内容进行规范化或反复数据删除,并将已清理的文章内容公布到新主题; 最终解决阶段可能会尝试向用户举荐此内容。此类解决管道基于各个主题创立实时数据流的图形。从0.10.0.0开始,这是一个轻量级但功能强大的流解决库,名为Kafka Streams 在Apache Kafka中可用于执行如上所述的此类数据处理。除了Kafka Streams之外,其余开源流解决工具包含Apache Storm和 Apache Samza。
2.6 Event Sourcing
Event Sourcing是一种利用程序设计格调,其中状态更改记录为按工夫排序的记录序列。Kafka对十分大的存储日志数据的反对使其成为以这种格调构建的应用程序的杰出后端。
2.7 提交日志
Kafka能够作为分布式系统的一种内部提交日志。该日志有助于在节点之间复制数据,并充当故障节点复原其数据的从新同步机制。Kafka中的日志压缩性能有助于反对此用法。在这种用法中,Kafka相似于Apache BookKeeper我的项目。
3、kafka装置
3.1 下载安装
到官网http://kafka.apache.org/downl…。
注:因为Kafka控制台脚本对于基于Unix和Windows的平台是不同的,因而在Windows平台上应用binwindows 而不是bin/ 将脚本扩展名更改为.bat。
[root@along ~]# wget http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz [root@along ~]# tar -C /data/ -xvf kafka_2.11-2.1.0.tgz [root@along ~]# cd /data/kafka_2.11-2.1.0/
3.2 配置启动zookeeper
kafka失常运行,必须配置zookeeper,否则无论是kafka集群还是客户端的生存者和消费者都无奈失常的工作的;所以须要配置启动zookeeper服务。
(1)zookeeper须要java环境
[root@along ~]# yum -y install java-1.8.0
(2)这里kafka下载包曾经包含zookeeper服务,所以只需批改配置文件,启动即可。
如果须要下载指定zookeeper版本;能够独自去zookeeper官网http://mirrors.shu.edu.cn/apa…。
[root@along ~]# cd /data/kafka_2.11-2.1.0/ [root@along kafka_2.11-2.1.0]# grep "^[^#]" config/zookeeper.properties dataDir=/tmp/zookeeper #数据存储目录 clientPort=2181 #zookeeper端口 maxClientCnxns=0
注:可自行添加批改zookeeper配置
3.3 配置kafka
(1)批改配置文件
[root@along kafka_2.11-2.1.0]# grep "^[^#]" config/server.properties broker.id=0 listeners=PLAINTEXT://localhost:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0
注:可依据本人需要批改配置文件
broker.id:#惟一标识ID listeners=PLAINTEXT://localhost:9092:#kafka服务监听地址和端口 log.dirs:#日志存储目录 zookeeper.connect:#指定zookeeper服务
(2)配置环境变量
[root@along ~]# vim /etc/profile.d/kafka.sh export KAFKA_HOME="/data/kafka_2.11-2.1.0" export PATH="${KAFKA_HOME}/bin:$PATH" [root@along ~]# source /etc/profile.d/kafka.sh
(3)配置服务启动脚本
[root@along ~]# vim /etc/init.d/kafka #!/bin/sh # # chkconfig: 345 99 01 # description: Kafka # # File : Kafka # # Description: Starts and stops the Kafka server # source /etc/rc.d/init.d/functions KAFKA_HOME=/data/kafka_2.11-2.1.0 KAFKA_USER=root export LOG_DIR=/tmp/kafka-logs [ -e /etc/sysconfig/kafka ] && . /etc/sysconfig/kafka # See how we were called. case "$1" in start) echo -n "Starting Kafka:" /sbin/runuser -s /bin/sh $KAFKA_USER -c "nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties > $LOG_DIR/server.out 2> $LOG_DIR/server.err &" echo " done." exit 0 ;; stop) echo -n "Stopping Kafka: " /sbin/runuser -s /bin/sh $KAFKA_USER -c "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print $2}' | xargs kill -9" echo " done." exit 0 ;; hardstop) echo -n "Stopping (hard) Kafka: " /sbin/runuser -s /bin/sh $KAFKA_USER -c "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print $2}' | xargs kill -9" echo " done." exit 0 ;; status) c_pid=`ps -ef | grep kafka.Kafka | grep -v grep | awk '{print $2}'` if [ "$c_pid" = "" ] ; then echo "Stopped" exit 3 else echo "Running $c_pid" exit 0 fi ;; restart) stop start ;; *) echo "Usage: kafka {start|stop|hardstop|status|restart}" exit 1 ;; esac
3.4 启动kafka服务
(1)后盾启动zookeeper服务
[root@along ~]# nohup zookeeper-server-start.sh /data/kafka_2.11-2.1.0/config/zookeeper.properties &
(2)启动kafka服务
[root@along ~]# service kafka start Starting kafka (via systemctl): [ OK ] [root@along ~]# service kafka status Running 86018 [root@along ~]# ss -nutl Netid State Recv-Q Send-Q Local Address:Port Peer Address:Port tcp LISTEN 0 50 :::9092 :::* tcp LISTEN 0 50 :::2181 :::*
4、kafka应用简略入门
4.1 创立主题topics
创立一个名为“along”的主题,它只蕴含一个分区,只有一个正本:
[root@along ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic along Created topic "along".
如果咱们运行list topic命令,咱们当初能够看到该主题:
[root@along ~]# kafka-topics.sh --list --zookeeper localhost:2181 along
4.2 发送一些音讯
Kafka附带一个命令行客户端,它将从文件或规范输出中获取输出,并将其作为音讯发送到Kafka集群。默认状况下,每行将作为独自的音讯发送。
运行生产者,而后在控制台中键入一些音讯以发送到服务器。
[root@along ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic along >This is a message >This is another message
4.3 启动消费者
Kafka还有一个命令行使用者,它会将音讯转储到规范输入。
[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic along --from-beginning This is a message This is another message
所有命令行工具都有其余选项; 运行不带参数的命令将显示更具体地记录它们的应用信息。
5、设置多代理kafka群集
到目前为止,咱们始终在与一个broker运行,但这并不好玩。对于Kafka,单个代理只是一个大小为1的集群,因而除了启动一些代理实例之外没有太多变动。然而为了感触它,让咱们将咱们的集群扩大到三个节点(依然在咱们的本地机器上)。
5.1 筹备配置文件
[root@along kafka_2.11-2.1.0]# cd /data/kafka_2.11-2.1.0/ [root@along kafka_2.11-2.1.0]# cp config/server.properties config/server-1.properties [root@along kafka_2.11-2.1.0]# cp config/server.properties config/server-2.properties [root@along kafka_2.11-2.1.0]# vim config/server-1.properties broker.id=1 listeners=PLAINTEXT://:9093 log.dirs=/tmp/kafka-logs-1 [root@along kafka_2.11-2.1.0]# vim config/server-2.properties broker.id=2 listeners=PLAINTEXT://:9094 log.dirs=/tmp/kafka-logs-2
注:该broker.id 属性是群集中每个节点的惟一且永恒的名称。咱们必须笼罩端口和日志目录,因为咱们在同一台机器上运行这些,并且咱们心愿让所有代理尝试在同一端口上注册或笼罩彼此的数据。
5.2 开启集群另2个kafka服务
[root@along ~]# nohup kafka-server-start.sh /data/kafka_2.11-2.1.0/config/server-1.properties & [root@along ~]# nohup kafka-server-start.sh /data/kafka_2.11-2.1.0/config/server-2.properties & [root@along ~]# ss -nutl Netid State Recv-Q Send-Q Local Address:Port Peer Address:Port tcp LISTEN 0 50 ::ffff:127.0.0.1:9092 :::* tcp LISTEN 0 50 ::ffff:127.0.0.1:9093 :::* tcp LISTEN 0 50 ::ffff:127.0.0.1:9094 :::*
5.3 在集群中进行操作
(1)当初创立一个复制因子为3的新主题my-replicated-topic
[root@along ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic Created topic "my-replicated-topic".
(2)在一个集群中,运行“describe topics”命令查看哪个broker正在做什么
[root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 #正文:第一行给出了所有分区的摘要,每个附加行提供无关一个分区的信息。因为咱们只有一个分区用于此主题,因而只有一行。 #“leader”是负责给定分区的所有读取和写入的节点。每个节点将成为随机抉择的分区局部的领导者。 #“replicas”是复制此分区日志的节点列表,无论它们是否为领导者,或者即便它们以后处于活动状态。 # “isr”是“同步”复制品的汇合。这是正本列表的子集,该列表以后处于沉闷状态并且曾经被领导者捕捉。 #请留神,Leader: 2,在我的示例中,节点2 是该主题的惟一分区的Leader。
(3)能够在咱们创立的原始主题上运行雷同的命令,以查看它的地位
[root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic along Topic:along PartitionCount:1 ReplicationFactor:1 Configs: Topic: along Partition: 0 Leader: 0 Replicas: 0 Isr: 0
(4)向咱们的新主题公布一些音讯:
[root@along ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic >my test message 1 >my test message 2 >^C
(5)当初让咱们应用这些音讯:
[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic my test message 1 my test message 2
5.4 测试集群的容错性
(1)当初让咱们测试一下容错性。Broker 2 充当leader 所以让咱们杀了它:
[root@along ~]# ps aux | grep server-2.properties |awk '{print $2}' 106737 [root@along ~]# kill -9 106737 [root@along ~]# ss -nutl tcp LISTEN 0 50 ::ffff:127.0.0.1:9092 :::* tcp LISTEN 0 50 ::ffff:127.0.0.1:9093 :::*
(2)leader 已切换到其中一个隶属节点,节点2不再位于同步正本集中:
[root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 2,0,1 Isr: 0,1
(3)即便最后承受写入的leader 曾经失败,这些音讯仍可供生产:
[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic my test message 1 my test message 2
6、应用Kafka Connect导入/导出数据
从控制台写入数据并将其写回控制台是一个不便的终点,但有时候可能心愿应用其余起源的数据或将数据从Kafka导出到其余零碎。对于许多零碎,您能够应用Kafka Connect导入或导出数据,而不是编写自定义集成代码。
Kafka Connect是Kafka附带的工具,用于向Kafka导入和导出数据。它是一个可扩大的工具,运行连接器,实现与内部零碎交互的自定义逻辑。在本疾速入门中,咱们将理解如何应用简略的连接器运行Kafka Connect,这些连接器将数据从文件导入Kafka主题并将数据从Kafka主题导出到文件。
(1)首先创立一些种子数据进行测试:
[root@along ~]# echo -e "foonbar" > test.txt 或者在Windows上: > echo foo> test.txt > echo bar>> test.txt
(2)接下来,启动两个以独立模式运行的连接器,这意味着它们在单个本地专用过程中运行。提供三个配置文件作为参数。
第一个始终是Kafka Connect流程的配置,蕴含常见配置,例如要连贯的Kafka代理和数据的序列化格局。
其余配置文件均指定要创立的连接器。这些文件包含惟一的连接器名称,要实例化的连接器类以及连接器所需的任何其余配置。
[root@along ~]# connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties [2019-01-16 16:16:31,884] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:67) [2019-01-16 16:16:31,903] INFO WorkerInfo values: ... ... #注:Kafka附带的这些示例配置文件应用您之前启动的默认本地群集配置并创立两个连接器:第一个是源连接器,它从输出文件读取行并生成每个Kafka主题,第二个是宿连接器从Kafka主题读取音讯并将每个音讯生成为输入文件中的一行。
(3)验证是否导入胜利(另起终端)
在启动过程中,您将看到许多日志音讯,包含一些批示正在实例化连接器的日志音讯。
① 一旦Kafka Connect过程启动,源连接器应该开始从test.txt主题读取行并将其生成到主题connect-test,并且接收器连接器应该开始从主题读取音讯connect-test 并将它们写入文件test.sink.txt。咱们能够通过查看输入文件的内容来验证数据是否已通过整个管道传递:
[root@along ~]# cat test.sink.txt foo bar
② 请留神,数据存储在Kafka主题中connect-test,因而咱们还能够运行控制台使用者来查看主题中的数据(或应用自定义使用者代码来解决它):
[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning {"schema":{"type":"string","optional":false},"payload":"foo"} {"schema":{"type":"string","optional":false},"payload":"bar"}
(4)持续追加数据,验证
[root@along ~]# echo Another line>> test.txt [root@along ~]# cat test.sink.txt foo bar Another line [root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning {"schema":{"type":"string","optional":false},"payload":"foo"} {"schema":{"type":"string","optional":false},"payload":"bar"} {"schema":{"type":"string","optional":false},"payload":"Another line"}
起源:https://www.cnblogs.com/along…