工作切分
DataX实现单个数据同步的作业,称之为Job,这个Job会做split阶段,拆分成一个个的Task,每个Task都会有一个Reader和Writer的工作。
拆分成多个Task是为了并发的执行,这个并发数,就是之前的Channel数量。
咱们假如这个Job在split阶段,有100个Task,而Channel数量是5,那理论的Channel数量取100和5的最小值,也就是5。
每个并发执行的Task数量并不是简略地100/5=20,而是先把Task放入TaskGroup中,而后把并发分给TaskGroup。
比方每个TaskGroup的并发数量是5,通过core.container.taskGroup.channel进行设置,那TaskGroup的数量就是总的Channel数/TaskGroup须要的数量,所有TaskGroup的数量就是20/5=4。
因为有100个Task,一共4个TaskGroup,所有每个TaskGroup就有100/4=25个Task。
工作启动
首先是启动线程池,这个线程池具备一个固定的数量,即taskGroup的数量。每个线程都启动一个TaskGroupContainer,每个TaskGroupContainer都蕴含JobId、taskGroupId、channelClazz以及configuration等信息。
TaskGroupContainer启动的时候,就会把25个工作,寄存在taskQueue汇合中。另外会创立一个大小为5的runTasks汇合中,寄存着Task执行器TaskExecutor。
Task执行器的创立,依赖着taskQueue汇合,每次从taskQueue汇合中拿出一个task创立Task执行器并启动,就须要从taskQueue汇合移除这个task。
因为每个TaskGroup的并发数量是5,所以Task执行器最多同时存在5个,也就是说,25个task,先执行5个,等某个或者多个执行完后,再执行剩下的task。直至taskQueue汇合为空以及每个Task执行器都执行完,那这个25个task也就执行完了。
Task执行器
每个Task都对应一个Task执行器,Task执行器包含工作的运行配置、taskId、channel、用来读写的线程等。
读线程启动的是ReaderRunner,外面有插件(比方StreamReader$Task)、RecordSender等属性。
写线程启动的是WriterRunner,外面有插件(比方StreamWriter$Task)、RecordReceiver等属性。
Task执行器启动的时候,首先是启动WriterRunner的线程,而后是ReaderRunner的线程。
WriterRunner和ReaderRunner的执行程序如下,右边是Reader左边是Writer。
次要的局部是Reader的startRead和Writer的startWriter。
首先是Reader把数据读取进去,而后封装在Record中,这个Record蕴含了数据汇合Column、byteSize、以及须要的内存memorySize。Column就是咱们读取的每条数据,包含数据的类型,数据的内容等。
读取进去的数据,发送给Channel,目前默认的是基于内存的MemoryChannel,也能够本人扩大。MemoryChannel保护着Record的阻塞队列queue。当有数据存进的queue时候,就会唤醒Writer的读取操作。
因为Channel是共用的,所以Writer就会从Channel的queue里读取Reader存入的数据,进行业务操作。