Canal: Commonly Used Configuration Options

Configuration template

#################################################
# Instances that support GTID should have this enabled. We used to default it to false
canal.instance.gtidon=true

# Connection string for the source server
canal.instance.master.address=mysql3308.dboop.com:3308
canal.instance.dbUsername=canalreader
canal.instance.dbPassword={password}
canal.instance.connectionCharset = UTF-8
canal.instance.enableDruid=false

# Leave the following items blank. Set them only when data has been lost and you
# need to re-specify a binlog position manually; do not fill them in casually
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=



# Enable or disable the time-series database (TSDB) feature, used to store Canal's metadata.
# This one matters: we strongly recommend enabling it. It is needed when table structures
# change (DDL); look into how it works for the details.
# canal.instance.tsdb.url can be omitted; by default data is stored locally under
# ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal




# Filters: Perl-style regular expressions, comma-separated; multiple entries allowed
canal.instance.filter.regex=db01\\..*,db02\\..*
#canal.instance.filter.black.regex=

# Settings for pushing messages to Kafka
canal.mq.topic=secCanal3308
# We use a single partition; for multiple partitions, see the commented example
# below (inline comments are not valid on properties value lines)
canal.mq.partitionsNum=1
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*

# If rows contain large JSON values and you get errors for messages over 1 MB,
# increase maxRequestSize
#canal.mq.canalBatchSize = 500
#canal.mq.batchSize = 81920
#canal.mq.partitionsNum=1
#canal.mq.maxRequestSize = 2097152
#################################################
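The filter regexes above are matched against full `database.table` names. A quick way to sanity-check a filter before deploying it (a minimal Python sketch; Canal uses its own regex engine internally, so this only approximates its matching behavior):

```python
import re

# The patterns from canal.instance.filter.regex above (comma-separated).
filter_regex = r"db01\..*,db02\..*"
patterns = [re.compile(p) for p in filter_regex.split(",")]

def is_captured(db, table):
    """Return True if db.table matches any filter pattern (full match)."""
    name = f"{db}.{table}"
    return any(p.fullmatch(name) for p in patterns)

print(is_captured("db01", "users"))   # True: db01.* is captured
print(is_captured("db03", "orders"))  # False: db03 is not in the filter
```

This catches the common mistake of forgetting to escape the dot (`db01..*` vs `db01\..*`) before the instance ever starts.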

Configuration options we do not use

# We do not rely on Canal for master/standby switching, so these are unused;
# in practice these options do not work well anyway
# (or perhaps we simply never figured out how to use them properly)
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=



Error handling:

ERROR c.a.o.canal.connector.kafka.producer.CanalKafkaProducer - java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1333054 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

Fix: increase canal.mq.maxRequestSize. The default is 1 MB; try raising it to 2 MB:

canal.mq.maxRequestSize = 2097152
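The arithmetic behind that value: 2097152 is 2 × 1024 × 1024 bytes, and the rejected message in the log above (1333054 bytes) exceeds the 1 MB default but fits comfortably in 2 MB:

```python
# Default max.request.size vs. the rejected message from the log above.
default_max = 1 * 1024 * 1024      # 1048576 bytes (Kafka default)
message_size = 1333054             # from the RecordTooLargeException
print(message_size > default_max)  # True -> message was rejected

new_max = 2 * 1024 * 1024          # 2097152 = canal.mq.maxRequestSize
print(message_size <= new_max)     # True -> message now fits
```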

Another error, this one reported on the Kafka consumer side:

CommitFailedError: Commit cannot be completed since the group has already
rebalanced and assigned the partitions to another member. This means that
the time between subsequent calls to poll() was longer than the configured
max_poll_interval_ms, which typically implies that the poll loop is spending
too much time message processing. You can address this either by increasing
the rebalance timeout with max_poll_interval_ms, or by reducing the maximum
size of batches returned in poll() with max_poll_records.

Fix: set the Kafka topic's partition count and canal.mq.partitionsNum to the same value. This error usually means Canal's partitionsNum is greater than the topic's actual partition count.
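A minimal sketch of why the mismatch breaks things (this illustrates the arithmetic only, not Canal's actual hashing code): Canal routes each message to hash % partitionsNum, so any result at or above the topic's real partition count targets a partition that does not exist. The numbers below are hypothetical:

```python
# Hypothetical mismatch: canal.mq.partitionsNum=3 while the topic
# secCanal3308 actually has only 1 partition in Kafka.
canal_partitions_num = 3
topic_partitions = 1

# Canal-style routing sketch: partition = hash(key) % partitionsNum.
targets = {h % canal_partitions_num for h in range(100)}
invalid = sorted(p for p in targets if p >= topic_partitions)
print(invalid)  # [1, 2] -> partitions Canal targets that Kafka does not have
```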


51ak

2024/09/07

Categories: Ops, CentOS Tags: Notes

WeChat official account: 《数据库工作笔记》 (Database Work Notes)