起因
- 上周我们做的一次Oracle到MySQL迁移,在迁移完成后需要将MySQL数据反向同步到Oracle中,以便于没有迁移干净的原Oracle业务还能提供只读访问
- 反向同步用的是OGG,我们在OGG同步层面加了异常报警(但是被DBA给人为禁用了
- 除掉同步软件层面的报警外,我们需要一个偏业务层面的,针对表数据的报警和监控
需求
- 支持各种数据源的同步对比
- 支持表的行数对比,最大id对比,表最后更新时间对比
- 扩展性好,方便配置
- 异常报警
- 监控同步状态的页面
解决
拆解思路
- 需要一个建两张表:表1:存放任务配置信息,表2:存放采集到的数据
- 需要一个任务:定时(5分钟)去源库和目标库,运行一个SQL,取当前状态并存入到刚才建的history表中
- 需要一个页面:展示采集结果,可以方便的查看同步状态和延时
- 需要一个报警任务:异常数据时,发送报警
任务1:建表
CREATE TABLE `msync_config` (
`sync_name` varchar(50) NOT NULL,
`source_linkconnstr` varchar(100) NOT NULL,
`target_linkconnstr` varchar(100) NOT NULL,
`source_sqlstr` varchar(2000) NOT NULL,
`target_sqlstr` varchar(2000) NOT NULL,
`alert_count` int NOT NULL DEFAULT '5',
`alert_maxid` int NOT NULL DEFAULT '5',
`alert_delaysecond` int NOT NULL DEFAULT '60',
`alert_userlist` varchar(100) NOT NULL DEFAULT '',
`add_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`alert_type` varchar(50) NOT NULL DEFAULT 'count+time' COMMENT 'count,id,time三种组合,+代表and,-代表or'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3
CREATE TABLE `msync_history` (
`addtime` datetime NOT NULL,
`sync_name` varchar(50) NOT NULL,
`source_count` bigint NOT NULL DEFAULT '0',
`target_count` bigint NOT NULL DEFAULT '0',
`source_maxid` bigint NOT NULL DEFAULT '0',
`target_maxid` bigint NOT NULL DEFAULT '0',
`source_maxtime` datetime NOT NULL DEFAULT '0000-00-00 00:00:00',
`target_maxtime` datetime NOT NULL DEFAULT '0000-00-00 00:00:00',
PRIMARY KEY (`addtime`,`sync_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3
任务2:配置采集任务
<action result="allcount,okcount,errcount" type="sql_loop" >
<connstr>link:dboop_db</connstr>
<sqlstr>select sync_name,source_linkconnstr,target_linkconnstr,source_sqlstr,target_sqlstr from msync_config</sqlstr>
<action result="sync_name,source_linkconnstr,target_linkconnstr,source_sqlstr,target_sqlstr" type="sql_select" >
<connstr>link:dboop_db</connstr>
<sqlstr>select sync_name,source_linkconnstr,target_linkconnstr,source_sqlstr,target_sqlstr from msync_config where sync_name=%s</sqlstr>
<sqlpara>{__0}</sqlpara>
</action>
<!--target-->
<action result="isok,okcount,errcount,errmess" type="sql_to_sql">
<from_server>{target_linkconnstr}</from_server>
<to_server>link:dboop_db</to_server>
<from_sqlstr>{target_sqlstr}</from_sqlstr>
<to_sqlstr><![CDATA[
insert into msync_history set addtime=%s,sync_name=%s,target_count=%s,target_maxid=%s,target_maxtime=%s
]]></to_sqlstr>
<to_sqlpara>{addtime}</to_sqlpara>
<to_sqlpara>{sync_name}</to_sqlpara>
<to_sqlpara>{0}</to_sqlpara>
<to_sqlpara>{1}</to_sqlpara>
<to_sqlpara>{2}</to_sqlpara>
</action>
<!--source-->
<action result="isok,okcount,errcount,errmess" type="sql_to_sql">
<from_server>{source_linkconnstr}</from_server>
<to_server>link:dboop_db</to_server>
<from_sqlstr>{source_sqlstr}</from_sqlstr>
<to_sqlstr><![CDATA[
insert into msync_history set addtime=%s,sync_name=%s,source_count=%s,source_maxid=%s,source_maxtime=%s
on DUPLICATE KEY UPDATE source_count=%s,source_maxid=%s,source_maxtime=%s
;]]></to_sqlstr>
<to_sqlpara>{addtime}</to_sqlpara>
<to_sqlpara>{sync_name}</to_sqlpara>
<to_sqlpara>{0}</to_sqlpara>
<to_sqlpara>{1}</to_sqlpara>
<to_sqlpara>{2}</to_sqlpara>
<to_sqlpara>{0}</to_sqlpara>
<to_sqlpara>{1}</to_sqlpara>
<to_sqlpara>{2}</to_sqlpara>
</action>
</action>
- 将此任务设置成每5分钟运行一次
- 该任务会分别去目标库和源库执行一段msync_config里配置好的SQL
- 将结果存入到msync_history表中
- 完成数据采集工作
任务3:配置一张报表
<para>
<name>btime</name>
<title>开始时间</title>
<type>datetime</type>
<defaultvalue>getdate()-1d</defaultvalue>
<format>%Y-%m-%d %H:%M:00</format>
</para>
<para>
<name>etime</name>
<title>结束时间</title>
<type>datetime</type>
<defaultvalue>getdate()-1m</defaultvalue>
<format>%Y-%m-%d 23:59:59</format>
</para>
<para>
<name>syncname</name>
<title>表名</title>
<type>select</type>
<connstr>link:dboop_db</connstr>
<sqlstr><![CDATA[
select '' as qavalue,'----ALL----' as qatext union all
select `sync_name`,`sync_name` from msync_config
]]>
</sqlstr>
<defaultvalue></defaultvalue>
</para>
<!--每个报表都可以定义多个page用来展示和多个para用来接收用户输入-->
<page>
<viewtype>line</viewtype>
<width>1040px</width>
<height>280px</height>
<chart_title>title : {text: '异构数据同步延时监控(单位:行数)',x:'center',y:20},</chart_title>
<chart_datazoom>60</chart_datazoom>
<connstr>link:dboop_db</connstr>
<sqlstr><![CDATA[
select
date_format(addtime,'%%m-%%d %%H:%%i') as addtime
,sync_name,abs(source_count- target_count) as diffcount
from msync_history where addtime between %s and %s
and (%s ='' or sync_name=%s)
order by addtime
]]>
</sqlstr>
<sqlpara>#1</sqlpara>
<sqlpara>#2</sqlpara>
<sqlpara>#3</sqlpara>
<sqlpara>#3</sqlpara>
<tableformat>rowconvert 1 5</tableformat>
</page>
<page>
<viewtype>table</viewtype>
<width>980px</width>
<title2>异构数据同步对比明细</title2>
<connstr>link:dboop_db</connstr>
<sqlstr><![CDATA[
select a.addtime,a.sync_name
,concat('<span class="f_666">',source_count,'-',target_count,'=</span><span class="',case when diffcount>b.alert_count or diffcount<0 then 'f_red' when diffcount>0 then '' else 'f_green' end ,'">',diffcount,'</span>行') as '行数差异_count(*)'
,concat('<span class="f_666">',source_maxid,'-',target_maxid,'=</span><span class="',case when diffmaxid>b.alert_maxid or diffmaxid<0 then 'f_red' when diffmaxid>0 then '' else 'f_green' end ,'">',diffmaxid,'</span>') as 'maxid差异_max(id)'
,concat('<span class="f_666">',source_time,'-',target_time,'=</span><span class="',case when difftime>b.alert_delaysecond or difftime<0 then 'f_red' when difftime>0 then '' else 'f_green' end ,'">',difftime,'</span>秒') as '最后修改时间差异_max(mdf_time)'
from (
select addtime,sync_name,source_count,target_count,source_maxid,target_maxid,source_maxtime as source_time,target_maxtime as target_time
,source_count-target_count as diffcount
,source_maxid-target_maxid as diffmaxid
,TIMESTAMPDIFF(second,target_maxtime,source_maxtime) as difftime
from msync_history where addtime between %s and %s
and (%s ='' or sync_name=%s)
) a
join msync_config b on a.sync_name=b.sync_name
order by a.addtime desc
]]>
</sqlstr>
<sqlpara>#1</sqlpara>
<sqlpara>#2</sqlpara>
<sqlpara>#3</sqlpara>
<sqlpara>#3</sqlpara>
</page>
- 该报表会将收集到的数据用图形和表格的方式展示出来
任务4:增加报警
- 这里我们直接在采集任务的底部增加一个action即可
<action result="isok,okcount,errcount,errmess" type="sql_to_sql">
<from_server>link:********</from_server>
<to_server>link:********</to_server>
<from_sqlstr><![CDATA[
select a.sync_name,
concat(
case when diffcount>0 then concat('少',diffcount,'条数据.') else '' end ,
case when diffmaxid>0 then concat('ID差',diffmaxid,'.') else '' end ,
case when difftime>0 then concat('修改时间差',difftime,'秒') else '' end) as diffdisplay
from (
select sync_name,source_count,target_count,source_maxid,target_maxid,source_maxtime,target_maxtime
,source_count-target_count as diffcount
,source_maxid-target_maxid as diffmaxid
,source_maxtime-target_maxtime as difftime
from msync_history where addtime =%s
) a
join msync_config b on a.sync_name=b.sync_name
where
( b.alert_type='count+time' and (a.diffcount>b.alert_count or a.diffmaxid>b.alert_delaysecond ))
or ( b.alert_type='count-time' and (a.diffcount>b.alert_count or a.diffmaxid>b.alert_delaysecond ))
or ....
]]>
</from_sqlstr>
<from_sqlpara>{addtime}</from_sqlpara>
<to_sqlstr><![CDATA[insert into alert_log_db
set altertype='msync',
idtype='sync_name',
dbid=%s,
reciveuser='',
mess=%s,
messdetail=%s,
errorlevel=5
]]></to_sqlstr>
<to_sqlpara>{0}</to_sqlpara>
<to_sqlpara>异构数据同步异常报警{0}:{1}</to_sqlpara>
<to_sqlpara><![CDATA[
**异构数据同步异常报警**
- 表名:{0}
- 异常:{1}
- 来源:异构数据采集任务
- 触发时间:{addtime}
- 了解[慢SQL详细](view.html?action=v&report_action=v&report_ptname=di10vnxfejhostpkq4z58a2ry6)
]]></to_sqlpara>
</action>
小结
- 这里我们快速的配置了一个任务和报表,完成了异构数据同步过程中从数据层面进行对比的简易功能
- 可以和同步软件层的异常报警结合,一起来监控整个同步过程中是否有明显异常
- 设计思路时,从源库和目标库分别执行一个SQL,应该出来接近的结果。
>> Home