赵志浩
Published on 2020-08-20 / 17 Visits
0
0

Canal开发:20200820-待验证问题

1、canal 新建db与es的映射文件时,如果有两个名字不同的文件,但是配置内容完全相同的情况,sql不同,对应的处理情况是如何?(会去加载两次?然后一条数据,会读取两次文件,然后同步两次?还是?规则具体是什么?如果是名字不同,但内容也不同,但是内容中使用的key是相同的,处理情况是如何的)

answer:
1、不可以,再对应的ESAdapter的dbTableEsSyncConfig 私有属性中,绑定yml的关联关系时,根据yml中所配置的groupid,database库名,table名称,instance名称,来确定出唯一的key值,如果上述四项在一个yml中全部相同,则会被判断为是一个文件,所以,如果的确存在类似的需求时,在定义sql时,记得定义为不同的表即可;及使用相同的group和db库,以及相同的instance topic,但是要操作的表不同,则也是ok的;

2、canal 修改topic outerAdapter信息时,需要先停止掉该topic的消费者线程,然后再进行修改,修改完成后,重新动态增加该消费者线程?再同步,读取一下所有的db - es 映射数据?

1、新增 topic outerAdapter信息时,在对应的CanalAdapterLoader类中的静态属性canalAdapterProcessors,会以当前新增配置的 instance名称 + “|” + GroupId为key,绑定一个对应的适配处理器AdapterProcessor,processor对象则不断的拉取kafka数据,然后推送给ES的适配器;**一个AdapterProcessor处理器是处理一个instance下的一个Group下的所有的适配器,一个instance下是可以配置多个group**,一个group下又可以配置多个不同的适配器,比如es,logger,db三个适配器,处理器对象在获取到kafka的消费数据后,会根据当前instance的group的数量来开启对应的线程数,一个线程会负责处理一个group的同步;也就是多个group之间是并行运行,但是group内部的多个适配器,则是串行处理;

2、以instance名称和groupId可以确定一个唯一的处理器,并且在实例化AdapterProcessor时也会将对应的instance名称groupId作为初始化对象参数进行传递;处理器在获取到kafka数据后,调用相对应的适配器进行数据同步处理,此时数据同步时,适配器则是根据处理器所给的instance名称,groupid,database名,以及table名称,可以得到一个唯一的Map<key,config>  此处的map key则是文件名称,config则是文件内容,也就是可以确定出唯一的db-es的yml配置信息;然后再进行相关的同步操作;

answer:
不需要,instance topic的消费者处理器和db-es.yml配置没有具体关联,消费者处理器在获取到数据后,会交由适配器处理,此时会去一个全局的MAP中通过key值去获取匹配的xml配置文件对象(key就是上述的规则),如果此时匹配不上,则会异常提示:gangtise log:configmap 获取异常...;获取异常后,也就是不会再做任何处理,所以 instance 的消费和启停,对适配器的配置是完全不影响的,而适配器的业务crud操作,则只是影响到最终是否能被业务上提交映射;仅此而已,并没有强制性的关联;所以,动态的启停消费者线程后,无需重新读取db-es yml的配置信息,两者并不强制关联;

3、canal 多服务的情况下处理方式

4、canal 现有的zk链接问题;

5、测试db > db的映射效果

6、处理器停止后,对应的文件监听也会停止,那么是否会存在当前处理器不再监听交叉监听的情况,比如当前处理器虽然停止了对文件变化的监听,但是其他处理器仍然再监听文件变化,此时如果更改文件是否存在影响?

不会,每一个处理器下面都会对应一批适配器,每一个适配器在初始化的时候,都会读取对应配置目录下的所有的yml配置文件,然后进行过滤,最后只会获取与当前适配器的key值所对应的yml文件,然后进行关联,和监听;我们在配置新增适配器的时候,每一个适配器的key值只要要求是不能重复的即可解决该重复监听的问题,即,无论是否是相同的 instance,都不允许有重复的适配器key值;
所以,由于每一个适配器都只会关联自身所匹配的key的yml文件,所以,当前适配器停止后,关闭对所对应目录的监听,并不影其他业务;另外,如果此时再修改其他文件,或者文件夹下新增yml映射的情况下,此时会被其他适配器监听到,然后会将所对应的文件修改,以及新增的文件信息,都会配置到对应的适配器的map中,尽管和它当前的适配器的key并不匹配,但是也会保存进去,尽管后续处理器通知适配器进行数据同步的时候,由于key值等并不匹配,所以不会使用,但是还是会保存进去;

也就是说,适配器在初始化的时候,会过滤掉不想关的yml配置,然后并不会存储,但是后续再文件产生修改或者新增等变动时,此时会将对应的文件直接保存到对应的适配器的关联关系中,并不会再做区分,尽管并不会影响具体的业务,但也基本算是一个小的阿里的bug了,不过可以修复,后续看时间解决下该非匹配值导入的问题; Arnold.zhao 2020-08-20

总结一下:

一个group是对应一个处理器,处理器是被统一管理的,CanalAdapterLoader下的静态全局属性Map,canalAdapterProcessors 维护所有的处理器,获取处理器的key值,则是instance的名称+“|”+groupId;

然后,一个处理器下会维护一个thread线程,以及维护多个适配器(都是对象的私有属性,并非静态属性),thread线程在处理器启动时候就进行运作,所以,停止该处理器的操作,就是停止掉该thread的执行即可;然后该thread的主要作用则是消费对应的kafka中的数据,然后拿到数据后,串行执行对应的适配器进行数据同步,此处执行适配器时也是单独开了一个线程池进行执行,目的是不阻塞上层thread线程的执行;所以,停止处理器就是销毁线程池,以及停止该thread的状态即可;

然后处理器下维护了多个适配器,但是多个适配器下则是维护了对应的xml的配置关系(也是对象的私有属性进行维护),以及对应的文件变化的监听对象;即每个适配器只维护一套与自己的适配器key关联的yml的配置信息;

7、canal adapter处理器CRUD的操作

新增时:
大胆的新增adapter数据即可,然后重新初始化对应的处理器,以及对应的适配器即可,重新初始化处理器和适配器时,不会有任何其他的影响;
更新时:
先停止掉该处理器,然后也会停止掉该处理下所对应的适配器的监听对象,这些都没有关系;然后再重新进行数据更新,然后保存时,重新初始化该对应的处理器即可;
删除时:
直接停止掉该处理器,然后删除对应的数据,(并不删除对应的适配器数据,这个在适配器管理中手动的进行处理)

8、outer adapter 适配器CRUD的操作

1、新增时,直接新增该适配器,然后存储信息到DB以及保存数据流到yml文件中即可,此时所有在启动的适配器的监听,在获取到文件后,都会将所对应的文件, 添加到对应的适配器对象的关联关系中;
2、修改适配器,直接修改即可,所有监听的适配器,都会获取到文件后,重新将对应的数据yml内容关联到适配器的对象属性中;
3、删除适配器,直接删除,然后所有在运行的适配器,都会直接删除掉对应的关联

这样的话,适配器的操作和处理器完全分开,互相之间没有任何影响;直接尽管进行操作即可;


阿里代码内部的几个问题
1、ExtensionLoader > SPI动态加载问题
2、处理器多线程处理消息推送到适配器时,此处的处理器默认初始化时所指定的线程池的数量永远是1,所以,刻意为之,还是的确有点问题。。
3、适配器在消费对应的数据时,明明根据instance名称以及groupid,database,&table名称,可以拼接出一个唯一的key值,然后通过dbTableEsSyncConfig.get(key)的时候,明明获取的结果只能有一个,


原创声明:作者:赵志浩、个人博客地址:https://zhaozhihao.com

原创声明:笔名:陈咬金、 博客园地址:https://www.cnblogs.com/zh94/


Comment