• 欢迎访问搞代码网站,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站!
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏搞代码吧

关于java:DataX-调度流程

java 搞代码 4年前 (2022-01-27) 42次浏览 已收录 0个评论

工作切分

DataX实现单个数据同步的作业,称之为Job,这个Job会做split阶段,拆分成一个个的Task,每个Task都会有一个Reader和Writer的工作。

拆分成多个Task是为了并发的执行,这个并发数,就是之前的Channel数量。

咱们假如这个Job在split阶段,有100个Task,而Channel数量是5,那理论的Channel数量取100和5的最小值,也就是5。

每个并发执行的Task数量并不是简略地100/5=20,而是先把Task放入TaskGroup中,而后把并发分给TaskGroup。

比方每个TaskGroup的并发数量是5,通过core.container.taskGroup.channel进行设置,那TaskGroup的数量就是总的Channel数/TaskGroup须要的数量,所有TaskGroup的数量就是20/5=4。

因为有100个Task,一共4个TaskGroup,所有每个TaskGroup就有100/4=25个Task。

工作启动

首先是启动线程池,这个线程池具备一个固定的数量,即taskGroup的数量。每个线程都启动一个TaskGroupContainer,每个TaskGroupContainer都蕴含JobId、taskGroupId、channelClazz以及configuration等信息。

TaskGroupContainer启动的时候,就会把25个工作,寄存在taskQueue汇合中。另外会创立一个大小为5的runTasks汇合中,寄存着Task执行器TaskExecutor。

Task执行器的创立,依赖着taskQueue汇合,每次从taskQueue汇合中拿出一个task创立Task执行器并启动,就须要从taskQueue汇合移除这个task。

因为每个TaskGroup的并发数量是5,所以Task执行器最多同时存在5个,也就是说,25个task,先执行5个,等某个或者多个执行完后,再执行剩下的task。直至taskQueue汇合为空以及每个Task执行器都执行完,那这个25个task也就执行完了。

Task执行器

每个Task都对应一个Task执行器,Task执行器包含工作的运行配置、taskId、channel、用来读写的线程等。

读线程启动的是ReaderRunner,外面有插件(比方StreamReader$Task)、RecordSender等属性。

写线程启动的是WriterRunner,外面有插件(比方StreamWriter$Task)、RecordReceiver等属性。

Task执行器启动的时候,首先是启动WriterRunner的线程,而后是ReaderRunner的线程。

WriterRunner和ReaderRunner的执行程序如下,右边是Reader左边是Writer。

次要的局部是Reader的startRead和Writer的startWriter。

首先是Reader把数据读取进去,而后封装在Record中,这个Record蕴含了数据汇合Column、byteSize、以及须要的内存memorySize。Column就是咱们读取的每条数据,包含数据的类型,数据的内容等。

读取进去的数据,发送给Channel,目前默认的是基于内存的MemoryChannel,也能够本人扩大。MemoryChannel保护着Record的阻塞队列queue。当有数据存进的queue时候,就会唤醒Writer的读取操作。

因为Channel是共用的,所以Writer就会从Channel的queue里读取Reader存入的数据,进行业务操作。


搞代码网(gaodaima.com)提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发送到邮箱[email protected],我们会在看到邮件的第一时间内为您处理,或直接联系QQ:872152909。本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:关于java:DataX-调度流程

喜欢 (0)
[搞代码]
分享 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址