server (deployer项目,其中canal.properties 用来配置canal启动后的port端口信息,已经canal.admin的一些配置,也就是一些整个server端的配置在这里面,包括如果要配置一些kafka的信息,也是在这里面,更改canal.serverMode=kafka然后配置一些其它的kafka信息即可)
instance(对应的就是deployer中instance.properties,通过更改instance.properties的一些信息,用来配置所监听的DB信息,DB url,password等,该instance就相当于一个server中的一个数据队列,通过接入数据源后,模拟slave协议和master进行交互,然后进行实时的协议解析,)
客户端client*(实时订阅对应的instance然后进行相关的实时交互)
1、(example 已经被写好了的client客户端测试用)
2、(client项目是顶级API,需要自己写一个客户端来接instance数据时,则需要引用该jar包;启动Connection链接时,需要指定server的ip)
3、client-adapter(适配器client项目)
(目前client adapter数据订阅的方式支持两种,直连canal server 或者 订阅kafka/RocketMQ的消息)
https://github.com/alibaba/canal/wiki/ClientAdapter
针对canal当下的几个问题做下说明:
1、adapter Client客户端适配器项目下有个application.yml,可以用来配置当前所要链接的canal Server端的ip和port,以及所要链接的server端的具体哪一个instance的名称等信息
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp # kafka rocketMQ
# 配置所要链接的 canal Server端的IP和Port(对应的就是 admin页面中的server管理中ip和port信息,可以直接从页面取即可)
canalServerHost: 10.0.3.14:11111
# 配置所要链接的 canal server的ip和port,建议直接使用zookeeperHosts而不是上面的canalServerHost,上述的canalServerHost是直接链接到某台canal server上,而配置该zookeeper地址,则可以保证如果某台server挂掉了,则会直接使用未down机的server,也就是一个HA的功能;
#zookeeperHosts: slave1:2181
# mqServers: 127.0.0.1:9092 #or rocketmq
# flatMessage: true
batchSize: 500
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
username:
password:
vhost:
srcDataSources:
# 配置当前该client端适配器,所要链接的jdbc地址;此处配置jdbc地址的原因是,如果需要使用初始化表数据到es中,则默认是直接查询此处所配置的链接进行读取,然后导入,而不是 instance中所配置的链接;(instance中所配置的链接是etl也就是实时进行数据读取的时候是使用的instance队列中所配置的jdbc链接的库信息)(如果需要更改或者新增一个jdbc地址的话则直接在此处 srcDataSources 下新增jdbc链接即可,可以建立一个新的名字不叫 defaultDS,但是下面的url和username这些key是固定的,也就说此处的yml配置,srcDataSources其实是可以配置为集合的;)
defaultDS:
url: jdbc:mysql://40.73.82.1:3306/gangtise_db?useUnicode=true
username: gangtise@gangtisedb
password: Qwer!234
canalAdapters:
# 此处配置所要连接的 instance的名称;(一般情况下,我们可能会存在当前客户端适配器链接多个instance的情况,毕竟可能会存在每个instance配置的jdbc库不同的情况,所以如果需要配置多个instance的名称,则此处也是直接新增一个 -instance:insName即可,也就是说,此处的canalAdapters也是支持集合的形式进行配置的,需要几个instance就配置几个instance即可;)
- instance: jydb # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
配置完上述的 client端适配器的application.yml配置后,就是配置我们要进行数据导入的表关系了;
此处我们配置并实现:mysql 表到 es6中进行导入;
(base) [root@cdh5 es6]# pwd
/opt/canal/adapter/conf/es6
(base) [root@cdh5 es6]# vim mytest_user_test.yml# 此处的defaultDS就是上述application中所配置的默认初始化全表是JDBC所对应的名称;
dataSourceKey: defaultDS
# 表示所要使用的 instance的名称;
destination: jydb
outerAdapterKey: es6Key
groupId: g1
esMapping:
_index: arnold_test_index
_type: _doc
_id: _id
upsert: true
# pk: id
# sql: "select a.ID as _id, a.ID as id, a.Trade_Date as tradedate, a.S_Security_ID as ssecurityid, a.Security_ID as securityid, a.Gangtise_Code as gangtisecode,a.Security_Code as securitycode, a.Security_Abbr as securityabbr, a.Pre_Close_Price as precloseprice, a.Open_Price as openprice, a.High_Price as highprice,a.Low_Price as lowprice, a.Close_Price as closeprice, a.`Change` as `change`, a.Change_Ratio as changeratio, a.Turnover_Volume as turnovervolume,a.Turnover_Value as turnovervalue, a.Turnover_Ratio as turnoverratio, a.Amplitude as amplitude, a.Turnover_Deal as turnoverdeal, a.Entry_Time as entrytime,a.Update_Time as updatetime, a.Update_ID as updateid from trans_quote_daily a"
sql: "select a.ID as _id, a.ID as id from test_etl.secumain a"
# objFields:
# _labels: array:;
etlCondition: ""
commitBatch: 300异常情况:
无法连接 test_tcp_40的instance,则说明是适配器中application.yml中所配置的要连接的 instance 无法连接,此时可以在admin下检查下对应的 instance是否开启等情况;(需要说明的是,此处异常是说明adapter下配置的instance无法连接,跟其他的一些子的配置,比如es6下面所指向的 instance信息是没有任何关系的;)
2020-07-23 09:26:10.454 [Thread-4] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterWorker - =============> Start to connect destination: test_tcp_40 <=============
2020-07-23 09:26:10.455 [Thread-4] ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterWorker - process error!
com.alibaba.otter.canal.protocol.exception.CanalClientException: java.net.ConnectException: Connection refused
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:198) ~[canal.client-1.1.5-SNAPSHOT.jar:na]
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.connect(SimpleCanalConnector.java:115) ~[canal.client-1.1.5-SNAPSHOT.jar:na]
at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterWorker.process(CanalAdapterWorker.java:88) ~[client-adapter.launcher-1.1.5-SNAPSHOT.jar:na]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_131]
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect0(Native Method) ~[na:1.8.0_131]
at sun.nio.ch.Net.connect(Net.java:454) ~[na:1.8.0_131]
at sun.nio.ch.Net.connect(Net.java:446) ~[na:1.8.0_131]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648) ~[na:1.8.0_131]
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:150) ~[canal.client-1.1.5-SNAPSHOT.jar:na]
... 3 common frames omitted
2020-07-23 09:26:10.455 [Thread-4] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterWorker - =============> Disconnect destination: test_tcp_40 <=============
2020-07-23 09:26:11.455 [Thread-4] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterWorker - =============> Start to connect destination: test_tcp_40 <=============
2020-07-23 09:26:11.456 [Thread-4] ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterWorker - process error!具体详情,还是可以接着看github官方文档吧,
一个adapter下只能配置一个server连接,也就是后续所配置的多个instance必须是在该canal server下的instance也才行,
所以,因为一个adapter只能配置一个canal server连接,那如果要同时配其他的server连接然后进行运行的话,只能重新copay后,重新启动一个新的adapter,进行单独跑才行;
除了有server端的HA高可用外,由于客户端也会存在多进程的情况,所以,canal 也支持基于zookeper的客户端HA高可用效果;