目标表集成类型为增量同步
加载知识模块为LKM SQL to Oracle
集成知识模块IKM Oracle Incremental Update
源表
目标表
将源表的数据修改
运行接口
可以看到,接口运行完成后,修改的数据正常同步,但删除的数据没有起作用,有些接口为了处理这种情况,就在运行时先将目标表数据全部删除掉,再插入数据
将DELETE ALL设置为true,可以在接口插入数据前将目标表数据全部删除;理论上将TRUNCATE设置为true也可以达到同样目的,但实际测试没有效果
运行过程如下
步骤详解
接口同步数据时会在目标端数据库创建工作表,表名以C$_0开头。创建之前先尝试删除上次运行可能遗留的工作表,这条删除命令在知识模块中配置了忽略错误,所以即使出错也不会造成接口错误
/* Remove the C$ work table if it is still present; PURGE is requested on the drop. */
DROP TABLE TESTUSER.C$_0ODI_WLS_JMS_INC_SOURCE PURGE
创建工作表,工作表字段基于源表
/* Create the C$ work table; every column is nullable and the table is NOLOGGING. */
CREATE TABLE TESTUSER.C$_0ODI_WLS_JMS_INC_SOURCE
(
    ID                      NUMBER        NULL,
    HOST                    VARCHAR2(200) NULL,
    POST                    VARCHAR2(200) NULL,
    JMS_SERVICE_NAME        VARCHAR2(200) NULL,
    JMS_NAME                VARCHAR2(200) NULL,
    JMS_SERVICE_TARGET      VARCHAR2(200) NULL,
    JMS_SERVICE_HEALTH      VARCHAR2(200) NULL,
    MESSAGES_CURRENT_COUNT  NUMBER        NULL,
    MESSAGES_PENDING_COUNT  NUMBER        NULL,
    CONSUMERS_CURRENT_COUNT NUMBER        NULL,
    CONSUMERS_HIGH_COUNT    NUMBER        NULL,
    CONSUMERS_TOTAL_COUNT   NUMBER        NULL,
    MESSAGES_HIGH_COUNT     NUMBER        NULL,
    MESSAGES_RECEIVED_COUNT NUMBER        NULL,
    OBJECT_VERSION_NUMBER   NUMBER        NULL,
    CREATION_DATE           DATE          NULL,
    CREATED_BY              VARCHAR2(200) NULL,
    LAST_UPDATE_DATE        DATE          NULL,
    LAST_UPDATED_BY         VARCHAR2(200) NULL,
    DATA_STATUS             VARCHAR2(200) NULL
)
NOLOGGING
读取源表的数据,并插入到工作表中
目标代码
/* Load one row into the C$ work table; each value is supplied through the bind variable named after its column. */
INSERT INTO TESTUSER.C$_0ODI_WLS_JMS_INC_SOURCE
(
    ID,
    HOST,
    POST,
    JMS_SERVICE_NAME,
    JMS_NAME,
    JMS_SERVICE_TARGET,
    JMS_SERVICE_HEALTH,
    MESSAGES_CURRENT_COUNT,
    MESSAGES_PENDING_COUNT,
    CONSUMERS_CURRENT_COUNT,
    CONSUMERS_HIGH_COUNT,
    CONSUMERS_TOTAL_COUNT,
    MESSAGES_HIGH_COUNT,
    MESSAGES_RECEIVED_COUNT,
    OBJECT_VERSION_NUMBER,
    CREATION_DATE,
    CREATED_BY,
    LAST_UPDATE_DATE,
    LAST_UPDATED_BY,
    DATA_STATUS
)
VALUES
(
    :ID,
    :HOST,
    :POST,
    :JMS_SERVICE_NAME,
    :JMS_NAME,
    :JMS_SERVICE_TARGET,
    :JMS_SERVICE_HEALTH,
    :MESSAGES_CURRENT_COUNT,
    :MESSAGES_PENDING_COUNT,
    :CONSUMERS_CURRENT_COUNT,
    :CONSUMERS_HIGH_COUNT,
    :CONSUMERS_TOTAL_COUNT,
    :MESSAGES_HIGH_COUNT,
    :MESSAGES_RECEIVED_COUNT,
    :OBJECT_VERSION_NUMBER,
    :CREATION_DATE,
    :CREATED_BY,
    :LAST_UPDATE_DATE,
    :LAST_UPDATED_BY,
    :DATA_STATUS
)
源代码
/* Read every row of the source table; the output column names match the bind variables of the C$ insert. */
SELECT
    SRC.ID                      AS ID,
    SRC.HOST                    AS HOST,
    SRC.POST                    AS POST,
    SRC.JMS_SERVICE_NAME        AS JMS_SERVICE_NAME,
    SRC.JMS_NAME                AS JMS_NAME,
    SRC.JMS_SERVICE_TARGET      AS JMS_SERVICE_TARGET,
    SRC.JMS_SERVICE_HEALTH      AS JMS_SERVICE_HEALTH,
    SRC.MESSAGES_CURRENT_COUNT  AS MESSAGES_CURRENT_COUNT,
    SRC.MESSAGES_PENDING_COUNT  AS MESSAGES_PENDING_COUNT,
    SRC.CONSUMERS_CURRENT_COUNT AS CONSUMERS_CURRENT_COUNT,
    SRC.CONSUMERS_HIGH_COUNT    AS CONSUMERS_HIGH_COUNT,
    SRC.CONSUMERS_TOTAL_COUNT   AS CONSUMERS_TOTAL_COUNT,
    SRC.MESSAGES_HIGH_COUNT     AS MESSAGES_HIGH_COUNT,
    SRC.MESSAGES_RECEIVED_COUNT AS MESSAGES_RECEIVED_COUNT,
    SRC.OBJECT_VERSION_NUMBER   AS OBJECT_VERSION_NUMBER,
    SRC.CREATION_DATE           AS CREATION_DATE,
    SRC.CREATED_BY              AS CREATED_BY,
    SRC.LAST_UPDATE_DATE        AS LAST_UPDATE_DATE,
    SRC.LAST_UPDATED_BY         AS LAST_UPDATED_BY,
    SRC.DATA_STATUS             AS DATA_STATUS
FROM ODI.ODI_WLS_JMS_INC_SOURCE SRC
WHERE (1 = 1)
收集工作表的统计信息(调用DBMS_STATS.GATHER_TABLE_STATS),供优化器使用
/* Gather table statistics on the C$ work table, letting Oracle pick the sample size. */
BEGIN
    DBMS_STATS.GATHER_TABLE_STATS(
        ownname          => 'TESTUSER',
        tabname          => 'C$_0ODI_WLS_JMS_INC_SOURCE',
        estimate_percent => DBMS_STATS.AUTO_SAMPLE_SIZE
    );
END;
删除数据插入表,数据插入表是另一张同步中间表,以I$_开头,每次接口执行时执行删除操作,避免上次运行后没有删除造成问题,由于配置了忽略错误,所以出错也不会造成接口问题
/* Remove the I$ flow table if it is still present. */
DROP TABLE TESTUSER.I$_ODI_WLS_JMS_INC_TARGET
创建数据插入表
/* Create the I$ flow table: the data columns plus the IND_UPDATE flag column. */
CREATE TABLE TESTUSER.I$_ODI_WLS_JMS_INC_TARGET
(
    ID                      NUMBER        NULL,
    HOST                    VARCHAR2(200) NULL,
    POST                    VARCHAR2(200) NULL,
    JMS_SERVICE_NAME        VARCHAR2(200) NULL,
    JMS_NAME                VARCHAR2(200) NULL,
    JMS_SERVICE_TARGET      VARCHAR2(200) NULL,
    JMS_SERVICE_HEALTH      VARCHAR2(200) NULL,
    MESSAGES_CURRENT_COUNT  NUMBER        NULL,
    MESSAGES_PENDING_COUNT  NUMBER        NULL,
    CONSUMERS_CURRENT_COUNT NUMBER        NULL,
    CONSUMERS_HIGH_COUNT    NUMBER        NULL,
    CONSUMERS_TOTAL_COUNT   NUMBER        NULL,
    MESSAGES_HIGH_COUNT     NUMBER        NULL,
    MESSAGES_RECEIVED_COUNT NUMBER        NULL,
    OBJECT_VERSION_NUMBER   NUMBER        NULL,
    CREATION_DATE           DATE          NULL,
    CREATED_BY              VARCHAR2(200) NULL,
    LAST_UPDATE_DATE        DATE          NULL,
    LAST_UPDATED_BY         VARCHAR2(200) NULL,
    DATA_STATUS             VARCHAR2(400) NULL,
    IND_UPDATE              CHAR(1)
)
NOLOGGING
删除目标表中的全部数据。上面配置DELETE ALL后,执行接口时会有该步骤,在数据插入前删除目标表的所有数据;如果DELETE ALL选择否,就不会执行这条语句,结果就是接口不会同步源表中已删除的数据
/* Intentionally unfiltered: removes every row of the target table before the load. */
DELETE FROM TESTUSER.ODI_WLS_JMS_INC_TARGET
向I$_表中插入数据,使用NOT EXISTS将目标表中已存在且各字段完全相同的数据过滤掉,向I$_表中插入所有不匹配的数据,标识符都为I
/* DETECTION_STRATEGY = NOT_EXISTS */
/*
 * Copy into the I$ flow table every C$ row that has no identical row in the
 * target, flagged IND_UPDATE = 'I'. ID and HOST are compared with plain
 * equality; every other column is compared NULL-safely.
 */
INSERT INTO TESTUSER.I$_ODI_WLS_JMS_INC_TARGET
(
    ID,
    HOST,
    POST,
    JMS_SERVICE_NAME,
    JMS_NAME,
    JMS_SERVICE_TARGET,
    JMS_SERVICE_HEALTH,
    MESSAGES_CURRENT_COUNT,
    MESSAGES_PENDING_COUNT,
    CONSUMERS_CURRENT_COUNT,
    CONSUMERS_HIGH_COUNT,
    CONSUMERS_TOTAL_COUNT,
    MESSAGES_HIGH_COUNT,
    MESSAGES_RECEIVED_COUNT,
    OBJECT_VERSION_NUMBER,
    CREATION_DATE,
    CREATED_BY,
    LAST_UPDATE_DATE,
    LAST_UPDATED_BY,
    DATA_STATUS,
    IND_UPDATE
)
SELECT
    ID,
    HOST,
    POST,
    JMS_SERVICE_NAME,
    JMS_NAME,
    JMS_SERVICE_TARGET,
    JMS_SERVICE_HEALTH,
    MESSAGES_CURRENT_COUNT,
    MESSAGES_PENDING_COUNT,
    CONSUMERS_CURRENT_COUNT,
    CONSUMERS_HIGH_COUNT,
    CONSUMERS_TOTAL_COUNT,
    MESSAGES_HIGH_COUNT,
    MESSAGES_RECEIVED_COUNT,
    OBJECT_VERSION_NUMBER,
    CREATION_DATE,
    CREATED_BY,
    LAST_UPDATE_DATE,
    LAST_UPDATED_BY,
    DATA_STATUS,
    IND_UPDATE
FROM (
    SELECT
        W.ID                      AS ID,
        W.HOST                    AS HOST,
        W.POST                    AS POST,
        W.JMS_SERVICE_NAME        AS JMS_SERVICE_NAME,
        W.JMS_NAME                AS JMS_NAME,
        W.JMS_SERVICE_TARGET      AS JMS_SERVICE_TARGET,
        W.JMS_SERVICE_HEALTH      AS JMS_SERVICE_HEALTH,
        W.MESSAGES_CURRENT_COUNT  AS MESSAGES_CURRENT_COUNT,
        W.MESSAGES_PENDING_COUNT  AS MESSAGES_PENDING_COUNT,
        W.CONSUMERS_CURRENT_COUNT AS CONSUMERS_CURRENT_COUNT,
        W.CONSUMERS_HIGH_COUNT    AS CONSUMERS_HIGH_COUNT,
        W.CONSUMERS_TOTAL_COUNT   AS CONSUMERS_TOTAL_COUNT,
        W.MESSAGES_HIGH_COUNT     AS MESSAGES_HIGH_COUNT,
        W.MESSAGES_RECEIVED_COUNT AS MESSAGES_RECEIVED_COUNT,
        W.OBJECT_VERSION_NUMBER   AS OBJECT_VERSION_NUMBER,
        W.CREATION_DATE           AS CREATION_DATE,
        W.CREATED_BY              AS CREATED_BY,
        W.LAST_UPDATE_DATE        AS LAST_UPDATE_DATE,
        W.LAST_UPDATED_BY         AS LAST_UPDATED_BY,
        W.DATA_STATUS             AS DATA_STATUS,
        'I'                       AS IND_UPDATE
    FROM TESTUSER.C$_0ODI_WLS_JMS_INC_SOURCE W
    WHERE (1 = 1)
) S
WHERE NOT EXISTS (
    SELECT 1
    FROM TESTUSER.ODI_WLS_JMS_INC_TARGET T
    WHERE T.ID = S.ID
      AND T.HOST = S.HOST
      AND ((T.POST = S.POST) OR (T.POST IS NULL AND S.POST IS NULL))
      AND ((T.JMS_SERVICE_NAME = S.JMS_SERVICE_NAME) OR (T.JMS_SERVICE_NAME IS NULL AND S.JMS_SERVICE_NAME IS NULL))
      AND ((T.JMS_NAME = S.JMS_NAME) OR (T.JMS_NAME IS NULL AND S.JMS_NAME IS NULL))
      AND ((T.JMS_SERVICE_TARGET = S.JMS_SERVICE_TARGET) OR (T.JMS_SERVICE_TARGET IS NULL AND S.JMS_SERVICE_TARGET IS NULL))
      AND ((T.JMS_SERVICE_HEALTH = S.JMS_SERVICE_HEALTH) OR (T.JMS_SERVICE_HEALTH IS NULL AND S.JMS_SERVICE_HEALTH IS NULL))
      AND ((T.MESSAGES_CURRENT_COUNT = S.MESSAGES_CURRENT_COUNT) OR (T.MESSAGES_CURRENT_COUNT IS NULL AND S.MESSAGES_CURRENT_COUNT IS NULL))
      AND ((T.MESSAGES_PENDING_COUNT = S.MESSAGES_PENDING_COUNT) OR (T.MESSAGES_PENDING_COUNT IS NULL AND S.MESSAGES_PENDING_COUNT IS NULL))
      AND ((T.CONSUMERS_CURRENT_COUNT = S.CONSUMERS_CURRENT_COUNT) OR (T.CONSUMERS_CURRENT_COUNT IS NULL AND S.CONSUMERS_CURRENT_COUNT IS NULL))
      AND ((T.CONSUMERS_HIGH_COUNT = S.CONSUMERS_HIGH_COUNT) OR (T.CONSUMERS_HIGH_COUNT IS NULL AND S.CONSUMERS_HIGH_COUNT IS NULL))
      AND ((T.CONSUMERS_TOTAL_COUNT = S.CONSUMERS_TOTAL_COUNT) OR (T.CONSUMERS_TOTAL_COUNT IS NULL AND S.CONSUMERS_TOTAL_COUNT IS NULL))
      AND ((T.MESSAGES_HIGH_COUNT = S.MESSAGES_HIGH_COUNT) OR (T.MESSAGES_HIGH_COUNT IS NULL AND S.MESSAGES_HIGH_COUNT IS NULL))
      AND ((T.MESSAGES_RECEIVED_COUNT = S.MESSAGES_RECEIVED_COUNT) OR (T.MESSAGES_RECEIVED_COUNT IS NULL AND S.MESSAGES_RECEIVED_COUNT IS NULL))
      AND ((T.OBJECT_VERSION_NUMBER = S.OBJECT_VERSION_NUMBER) OR (T.OBJECT_VERSION_NUMBER IS NULL AND S.OBJECT_VERSION_NUMBER IS NULL))
      AND ((T.CREATION_DATE = S.CREATION_DATE) OR (T.CREATION_DATE IS NULL AND S.CREATION_DATE IS NULL))
      AND ((T.CREATED_BY = S.CREATED_BY) OR (T.CREATED_BY IS NULL AND S.CREATED_BY IS NULL))
      AND ((T.LAST_UPDATE_DATE = S.LAST_UPDATE_DATE) OR (T.LAST_UPDATE_DATE IS NULL AND S.LAST_UPDATE_DATE IS NULL))
      AND ((T.LAST_UPDATED_BY = S.LAST_UPDATED_BY) OR (T.LAST_UPDATED_BY IS NULL AND S.LAST_UPDATED_BY IS NULL))
      AND ((T.DATA_STATUS = S.DATA_STATUS) OR (T.DATA_STATUS IS NULL AND S.DATA_STATUS IS NULL))
)
为I$_表创建索引
/* Index the I$ flow table on the (ID, HOST) key columns. */
CREATE INDEX TESTUSER.I$_ODI_WLS_JMS_INC_TARGET_UK
    ON TESTUSER.I$_ODI_WLS_JMS_INC_TARGET (ID, HOST)
    NOLOGGING
收集I$_表的统计信息(调用dbms_stats.gather_table_stats),供优化器使用
/* Gather table statistics on the I$ flow table, letting Oracle pick the sample size. */
BEGIN
    DBMS_STATS.GATHER_TABLE_STATS(
        ownname          => 'TESTUSER',
        tabname          => 'I$_ODI_WLS_JMS_INC_TARGET',
        estimate_percent => DBMS_STATS.AUTO_SAMPLE_SIZE
    );
END;
创建校验汇总表SNP_CHECK_TAB,用来记录各项约束检查的错误信息及对应的错误行数
/* Create the check summary table: one row per error type / constraint, with an error count. */
CREATE TABLE TESTUSER.SNP_CHECK_TAB
(
    CATALOG_NAME  VARCHAR2(100 CHAR) NULL,
    SCHEMA_NAME   VARCHAR2(100 CHAR) NULL,
    RESOURCE_NAME VARCHAR2(100 CHAR) NULL,
    FULL_RES_NAME VARCHAR2(100 CHAR) NULL,
    ERR_TYPE      VARCHAR2(1 CHAR)   NULL,
    ERR_MESS      VARCHAR2(250 CHAR) NULL,
    CHECK_DATE    DATE               NULL,
    ORIGIN        VARCHAR2(250 CHAR) NULL,
    CONS_NAME     VARCHAR2(128 CHAR) NULL,
    CONS_TYPE     VARCHAR2(2 CHAR)   NULL,
    ERR_COUNT     NUMBER(10)         NULL
)
删除以前的校验数据
/* Clear the 'F' check rows previously recorded for this schema and origin. */
DELETE FROM TESTUSER.SNP_CHECK_TAB
WHERE SCHEMA_NAME = 'TESTUSER'
  AND ORIGIN = '(171)mdsProject.My_increment'
  AND ERR_TYPE = 'F'
创建错误表,用来记录插入错误的数据
/* Create the E$ error table: ODI bookkeeping columns around a copy of the data columns; ODI_PK is the primary key. */
CREATE TABLE TESTUSER.E$_ODI_WLS_JMS_INC_TARGET
(
    ODI_ROW_ID              UROWID,
    ODI_ERR_TYPE            VARCHAR2(1 CHAR)   NULL,
    ODI_ERR_MESS            VARCHAR2(250 CHAR) NULL,
    ODI_CHECK_DATE          DATE               NULL,
    ID                      NUMBER             NULL,
    HOST                    VARCHAR2(200)      NULL,
    POST                    VARCHAR2(200)      NULL,
    JMS_SERVICE_NAME        VARCHAR2(200)      NULL,
    JMS_NAME                VARCHAR2(200)      NULL,
    JMS_SERVICE_TARGET      VARCHAR2(200)      NULL,
    JMS_SERVICE_HEALTH      VARCHAR2(200)      NULL,
    MESSAGES_CURRENT_COUNT  NUMBER             NULL,
    MESSAGES_PENDING_COUNT  NUMBER             NULL,
    CONSUMERS_CURRENT_COUNT NUMBER             NULL,
    CONSUMERS_HIGH_COUNT    NUMBER             NULL,
    CONSUMERS_TOTAL_COUNT   NUMBER             NULL,
    MESSAGES_HIGH_COUNT     NUMBER             NULL,
    MESSAGES_RECEIVED_COUNT NUMBER             NULL,
    OBJECT_VERSION_NUMBER   NUMBER             NULL,
    CREATION_DATE           DATE               NULL,
    CREATED_BY              VARCHAR2(200)      NULL,
    LAST_UPDATE_DATE        DATE               NULL,
    LAST_UPDATED_BY         VARCHAR2(200)      NULL,
    DATA_STATUS             VARCHAR2(400)      NULL,
    ODI_ORIGIN              VARCHAR2(250 CHAR) NULL,
    ODI_CONS_NAME           VARCHAR2(128 CHAR) NULL,
    ODI_CONS_TYPE           VARCHAR2(2 CHAR)   NULL,
    ODI_PK                  VARCHAR2(32 CHAR)  PRIMARY KEY,
    ODI_SESS_NO             VARCHAR2(36 CHAR)
)
删除错误表之前数据
/*
 * Clear earlier error rows. The first branch can never match because
 * 'F' = 'S' is false, so only 'F' rows with this origin are removed.
 */
DELETE FROM TESTUSER.E$_ODI_WLS_JMS_INC_TARGET
WHERE (ODI_ERR_TYPE = 'S' AND 'F' = 'S')
   OR (ODI_ERR_TYPE = 'F' AND ODI_ORIGIN = '(171)mdsProject.My_increment')
为I$_表的主键列(ID, HOST)创建索引,用于流控制(FLOW CONTROL)的主键检查
/* FLOW CONTROL CREATE THE INDEX ON I$TABLE */
/* Index the I$ flow table on the (ID, HOST) primary-key columns. */
CREATE INDEX TESTUSER.I$_ODI_WLS_JMS_INC_TARGET_PK
    ON TESTUSER.I$_ODI_WLS_JMS_INC_TARGET (ID, HOST)
向错误表E$_中插入I$_表里主键(ID, HOST)不唯一的行
/*
 * Record in the E$ error table every I$ row whose (ID, HOST) pair occurs more
 * than once in I$. Each error row keeps the ROWID of the offending I$ row.
 */
INSERT INTO TESTUSER.E$_ODI_WLS_JMS_INC_TARGET
(
    ODI_PK,
    ODI_SESS_NO,
    ODI_ROW_ID,
    ODI_ERR_TYPE,
    ODI_ERR_MESS,
    ODI_ORIGIN,
    ODI_CHECK_DATE,
    ODI_CONS_NAME,
    ODI_CONS_TYPE,
    ID,
    HOST,
    POST,
    JMS_SERVICE_NAME,
    JMS_NAME,
    JMS_SERVICE_TARGET,
    JMS_SERVICE_HEALTH,
    MESSAGES_CURRENT_COUNT,
    MESSAGES_PENDING_COUNT,
    CONSUMERS_CURRENT_COUNT,
    CONSUMERS_HIGH_COUNT,
    CONSUMERS_TOTAL_COUNT,
    MESSAGES_HIGH_COUNT,
    MESSAGES_RECEIVED_COUNT,
    OBJECT_VERSION_NUMBER,
    CREATION_DATE,
    CREATED_BY,
    LAST_UPDATE_DATE,
    LAST_UPDATED_BY,
    DATA_STATUS
)
SELECT
    SYS_GUID(),
    'abb01f39-16b9-41ba-9820-7733e137f237',
    ROWID,
    'F',
    'ODI-15064: 主键 ODI_WLS_JMS_INC_TARGET_PK 不是唯一的。',
    '(171)mdsProject.My_increment',
    SYSDATE,
    'ODI_WLS_JMS_INC_TARGET_PK',
    'PK',
    FLW.ID,
    FLW.HOST,
    FLW.POST,
    FLW.JMS_SERVICE_NAME,
    FLW.JMS_NAME,
    FLW.JMS_SERVICE_TARGET,
    FLW.JMS_SERVICE_HEALTH,
    FLW.MESSAGES_CURRENT_COUNT,
    FLW.MESSAGES_PENDING_COUNT,
    FLW.CONSUMERS_CURRENT_COUNT,
    FLW.CONSUMERS_HIGH_COUNT,
    FLW.CONSUMERS_TOTAL_COUNT,
    FLW.MESSAGES_HIGH_COUNT,
    FLW.MESSAGES_RECEIVED_COUNT,
    FLW.OBJECT_VERSION_NUMBER,
    FLW.CREATION_DATE,
    FLW.CREATED_BY,
    FLW.LAST_UPDATE_DATE,
    FLW.LAST_UPDATED_BY,
    FLW.DATA_STATUS
FROM TESTUSER.I$_ODI_WLS_JMS_INC_TARGET FLW
WHERE EXISTS (
    SELECT
        SUB.ID,
        SUB.HOST
    FROM TESTUSER.I$_ODI_WLS_JMS_INC_TARGET SUB
    WHERE SUB.ID = FLW.ID
      AND SUB.HOST = FLW.HOST
    GROUP BY
        SUB.ID,
        SUB.HOST
    HAVING COUNT(1) > 1
)
向错误表E$_中插入主键字段为空的行;如果主键包含多个字段,该步骤会执行多次,每次检查一个主键字段
/* Record in the E$ error table every I$ row whose ID is NULL, as a not-null ('NN') violation on column ID. */
INSERT INTO TESTUSER.E$_ODI_WLS_JMS_INC_TARGET
(
    ODI_PK,
    ODI_SESS_NO,
    ODI_ROW_ID,
    ODI_ERR_TYPE,
    ODI_ERR_MESS,
    ODI_CHECK_DATE,
    ODI_ORIGIN,
    ODI_CONS_NAME,
    ODI_CONS_TYPE,
    ID,
    HOST,
    POST,
    JMS_SERVICE_NAME,
    JMS_NAME,
    JMS_SERVICE_TARGET,
    JMS_SERVICE_HEALTH,
    MESSAGES_CURRENT_COUNT,
    MESSAGES_PENDING_COUNT,
    CONSUMERS_CURRENT_COUNT,
    CONSUMERS_HIGH_COUNT,
    CONSUMERS_TOTAL_COUNT,
    MESSAGES_HIGH_COUNT,
    MESSAGES_RECEIVED_COUNT,
    OBJECT_VERSION_NUMBER,
    CREATION_DATE,
    CREATED_BY,
    LAST_UPDATE_DATE,
    LAST_UPDATED_BY,
    DATA_STATUS
)
SELECT
    SYS_GUID(),
    'abb01f39-16b9-41ba-9820-7733e137f237',
    ROWID,
    'F',
    'ODI-15066: 列ID不能为空值。',
    SYSDATE,
    '(171)mdsProject.My_increment',
    'ID',
    'NN',
    ID,
    HOST,
    POST,
    JMS_SERVICE_NAME,
    JMS_NAME,
    JMS_SERVICE_TARGET,
    JMS_SERVICE_HEALTH,
    MESSAGES_CURRENT_COUNT,
    MESSAGES_PENDING_COUNT,
    CONSUMERS_CURRENT_COUNT,
    CONSUMERS_HIGH_COUNT,
    CONSUMERS_TOTAL_COUNT,
    MESSAGES_HIGH_COUNT,
    MESSAGES_RECEIVED_COUNT,
    OBJECT_VERSION_NUMBER,
    CREATION_DATE,
    CREATED_BY,
    LAST_UPDATE_DATE,
    LAST_UPDATED_BY,
    DATA_STATUS
FROM TESTUSER.I$_ODI_WLS_JMS_INC_TARGET
WHERE ID IS NULL
为E$_表创建索引
/* FLOW CONTROL CREATE INDEX ON THE E$TABLE */
/* Index the E$ error table on ODI_ROW_ID, the column the error-row cleanup of I$ joins on. */
CREATE INDEX TESTUSER.E$_ODI_WLS_JMS_INC_TARGET_IDX
    ON TESTUSER.E$_ODI_WLS_JMS_INC_TARGET (ODI_ROW_ID)
从I$_表中删除错误的行
/* Remove from the I$ flow table every row that this session recorded in the E$ error table (matched by ROWID). */
DELETE FROM TESTUSER.I$_ODI_WLS_JMS_INC_TARGET T
WHERE EXISTS (
    SELECT 1
    FROM TESTUSER.E$_ODI_WLS_JMS_INC_TARGET E
    WHERE ODI_SESS_NO = 'abb01f39-16b9-41ba-9820-7733e137f237'
      AND T.ROWID = E.ODI_ROW_ID
)
向TESTUSER.SNP_CHECK_TAB中记录错误汇总信息(按错误类型、错误信息、约束等分组统计错误行数)
/* Summarise this origin's 'F' rows from the E$ error table into SNP_CHECK_TAB, one row per group with its row count. */
INSERT INTO TESTUSER.SNP_CHECK_TAB
(
    SCHEMA_NAME,
    RESOURCE_NAME,
    FULL_RES_NAME,
    ERR_TYPE,
    ERR_MESS,
    CHECK_DATE,
    ORIGIN,
    CONS_NAME,
    CONS_TYPE,
    ERR_COUNT
)
SELECT
    'TESTUSER',
    'ODI_WLS_JMS_INC_TARGET',
    'TESTUSER.ODI_WLS_JMS_INC_TARGET',
    E.ODI_ERR_TYPE,
    E.ODI_ERR_MESS,
    E.ODI_CHECK_DATE,
    E.ODI_ORIGIN,
    E.ODI_CONS_NAME,
    E.ODI_CONS_TYPE,
    COUNT(1)
FROM TESTUSER.E$_ODI_WLS_JMS_INC_TARGET E
WHERE E.ODI_ERR_TYPE = 'F'
  AND E.ODI_ORIGIN = '(171)mdsProject.My_increment'
GROUP BY
    E.ODI_ERR_TYPE,
    E.ODI_ERR_MESS,
    E.ODI_CHECK_DATE,
    E.ODI_ORIGIN,
    E.ODI_CONS_NAME,
    E.ODI_CONS_TYPE
将I$_表中主键已在目标表中存在的行的IND_UPDATE设置为U,表示这些行需要更新;IND_UPDATE仍为I的行表示需要插入
/* DETECTION_STRATEGY = NOT_EXISTS */
/* Flag as 'U' every I$ row whose (ID, HOST) key is already present in the target table. */
UPDATE TESTUSER.I$_ODI_WLS_JMS_INC_TARGET
SET IND_UPDATE = 'U'
WHERE (ID, HOST) IN (
    SELECT
        ID,
        HOST
    FROM TESTUSER.ODI_WLS_JMS_INC_TARGET
)
更新已存在的行
/* DETECTION_STRATEGY = NOT_EXISTS */
/*
 * Overwrite the non-key columns of each target row whose (ID, HOST) key is
 * flagged 'U' in I$, taking the values from the I$ row with the same key.
 */
UPDATE TESTUSER.ODI_WLS_JMS_INC_TARGET T
SET (
    T.POST,
    T.JMS_SERVICE_NAME,
    T.JMS_NAME,
    T.JMS_SERVICE_TARGET,
    T.JMS_SERVICE_HEALTH,
    T.MESSAGES_CURRENT_COUNT,
    T.MESSAGES_PENDING_COUNT,
    T.CONSUMERS_CURRENT_COUNT,
    T.CONSUMERS_HIGH_COUNT,
    T.CONSUMERS_TOTAL_COUNT,
    T.MESSAGES_HIGH_COUNT,
    T.MESSAGES_RECEIVED_COUNT,
    T.OBJECT_VERSION_NUMBER,
    T.CREATION_DATE,
    T.CREATED_BY,
    T.LAST_UPDATE_DATE,
    T.LAST_UPDATED_BY,
    T.DATA_STATUS
) = (
    SELECT
        S.POST,
        S.JMS_SERVICE_NAME,
        S.JMS_NAME,
        S.JMS_SERVICE_TARGET,
        S.JMS_SERVICE_HEALTH,
        S.MESSAGES_CURRENT_COUNT,
        S.MESSAGES_PENDING_COUNT,
        S.CONSUMERS_CURRENT_COUNT,
        S.CONSUMERS_HIGH_COUNT,
        S.CONSUMERS_TOTAL_COUNT,
        S.MESSAGES_HIGH_COUNT,
        S.MESSAGES_RECEIVED_COUNT,
        S.OBJECT_VERSION_NUMBER,
        S.CREATION_DATE,
        S.CREATED_BY,
        S.LAST_UPDATE_DATE,
        S.LAST_UPDATED_BY,
        S.DATA_STATUS
    FROM TESTUSER.I$_ODI_WLS_JMS_INC_TARGET S
    WHERE T.ID = S.ID
      AND T.HOST = S.HOST
)
WHERE (ID, HOST) IN (
    SELECT
        ID,
        HOST
    FROM TESTUSER.I$_ODI_WLS_JMS_INC_TARGET
    WHERE IND_UPDATE = 'U'
)
插入新的行
/* DETECTION_STRATEGY = NOT_EXISTS */
/* Append to the target table every I$ row still flagged 'I'. */
INSERT INTO TESTUSER.ODI_WLS_JMS_INC_TARGET T
(
    ID,
    HOST,
    POST,
    JMS_SERVICE_NAME,
    JMS_NAME,
    JMS_SERVICE_TARGET,
    JMS_SERVICE_HEALTH,
    MESSAGES_CURRENT_COUNT,
    MESSAGES_PENDING_COUNT,
    CONSUMERS_CURRENT_COUNT,
    CONSUMERS_HIGH_COUNT,
    CONSUMERS_TOTAL_COUNT,
    MESSAGES_HIGH_COUNT,
    MESSAGES_RECEIVED_COUNT,
    OBJECT_VERSION_NUMBER,
    CREATION_DATE,
    CREATED_BY,
    LAST_UPDATE_DATE,
    LAST_UPDATED_BY,
    DATA_STATUS
)
SELECT
    ID,
    HOST,
    POST,
    JMS_SERVICE_NAME,
    JMS_NAME,
    JMS_SERVICE_TARGET,
    JMS_SERVICE_HEALTH,
    MESSAGES_CURRENT_COUNT,
    MESSAGES_PENDING_COUNT,
    CONSUMERS_CURRENT_COUNT,
    CONSUMERS_HIGH_COUNT,
    CONSUMERS_TOTAL_COUNT,
    MESSAGES_HIGH_COUNT,
    MESSAGES_RECEIVED_COUNT,
    OBJECT_VERSION_NUMBER,
    CREATION_DATE,
    CREATED_BY,
    LAST_UPDATE_DATE,
    LAST_UPDATED_BY,
    DATA_STATUS
FROM TESTUSER.I$_ODI_WLS_JMS_INC_TARGET S
WHERE IND_UPDATE = 'I'
删除工作表
/* Clean up: drop the C$ work table; PURGE is requested on the drop. */
DROP TABLE TESTUSER.C$_0ODI_WLS_JMS_INC_SOURCE PURGE
删除I$_表
/* Clean up: drop the I$ flow table. */
DROP TABLE TESTUSER.I$_ODI_WLS_JMS_INC_TARGET
在将标识符标记为U的步骤之前,先把目标表中存在而源数据中已不存在的行(即需要删除的数据)按更新键插入I$_表,标识符为D
Insert deleted rows
/* DETECTION_STRATEGY = NOT_EXISTS */
/*
 * Insert deleted rows: add to the I$ flow table, flagged IND_UPDATE = 'D',
 * the update-key (UK) columns of every target row that has no matching row
 * in the source data set.
 * The getColList pattern and separator must use the escape sequences \t and
 * \n; a literal "/t" or "/n" would be copied into the generated SQL as text
 * and make the statement invalid.
 */
insert into <%=odiRef.getTable("L","INT_NAME","A")%>
(
    <%=odiRef.getColList("", "[COL_NAME]", ", ", "", "UK")%>,
    IND_UPDATE
)
select
    <%=odiRef.getColList("", "[COL_NAME]", ", ", "", "UK")%>,
    'D'
from <%=odiRef.getTable("L", "TARG_NAME", "A")%> T2
where NOT EXISTS (
    select 'X'
    from <%=odiRef.getFrom(0)%>
    where (<%=odiRef.getColList("","[EXPRESSION]\t= T2.[COL_NAME]", "\n\tand\t", "", "UK")%>
    )
)
Synchronize deletions
从目标表删除I$_表中标识符为D的数据
/*
 * Synchronize deletions: delete from the target table every row whose
 * update-key (UK) columns match an I$ row flagged IND_UPDATE = 'D'.
 * The getColList separator must use the escape sequences \n and \t; a literal
 * "/n/t/tand /t" would be copied between the key predicates as text and make
 * the statement invalid whenever the update key has more than one column.
 */
delete from <%=odiRef.getTable("L","TARG_NAME","A")%>
where exists (
    select 'X'
    from <%=odiRef.getTable("L","INT_NAME","A")%> <%=odiRef.getInfo("DEST_TAB_ALIAS_WORD")%> I
    where <%=odiRef.getColList("", odiRef.getTable("L","TARG_NAME","A") + ".[COL_NAME] = I.[COL_NAME]", "\n\t\tand \t", "", "UK")%>
      and IND_UPDATE = 'D'
)