-- Job definitions read by the scheduling and status snippets in this file.
-- Column notes describe only how this file uses each column; columns without
-- a note are not referenced by any snippet here.
CREATE TABLE NIC_EDW_STG.JOB_MASTER
(
    JOB_ID            NUMBER,         -- joined to JOB_QUEUE.JOB_ID by the status cursor
    JOB_NAME          VARCHAR2(100),
    PKG_NAME          VARCHAR2(200),
    JOB_TYPE_ID       NUMBER,
    JOB_RUN_DAY       NUMBER(2,0),    -- scheduler: 0 = interval-driven, 1 = today,
                                      -- 2 = NEXT_DAY(SYSDATE,'SAT'), 3 = LAST_DAY(SYSDATE)
    JOB_RUN_INTERVAL  NUMBER,         -- minutes (the scheduler divides it by 24*60)
    JOB_RUN_TIME      TIMESTAMP (6),  -- the scheduler uses only the time-of-day part
    LAST_SCHED_TIME   TIMESTAMP (6),  -- advanced by the scheduler for interval-driven jobs
    WAIT_TIME         TIMESTAMP (6),  -- selected by the status cursor
    JOB_ACTIVE        NUMBER(2,0),    -- only rows with JOB_ACTIVE = 1 are scheduled
    JOB_FATAL         NUMBER(2,0),
    JOB_LOAD_TYPE     VARCHAR2(20),
    -- The default must be a string literal: an unquoted 001 is a numeric
    -- literal, so the value stored in this VARCHAR2 column would be '1'.
    SCENARIO_VER      VARCHAR2(10) DEFAULT '001'
)
-- Queue of job runs. The scheduling and control snippets in this file insert
-- rows here; the status cursor reads them back.
CREATE TABLE NIC_EDW_STG.JOB_QUEUE
(
    QUEUE_ID        NUMBER,        -- allocated as MAX(QUEUE_ID) + 1 by the inserting snippets
    SEQ_ID          NUMBER,        -- allocated as MAX(SEQ_ID) + 1; shared by all rows of one scheduling pass
    JOB_ID          NUMBER,        -- matched against JOB_MASTER.JOB_ID
    JOB_START_TIME  TIMESTAMP(6),
    JOB_END_TIME    TIMESTAMP(6),  -- inserted as NULL
    JOB_STATUS      NUMBER(2,0)    -- inserted as 0; compared with JOB_EVENTS.STEP_STATUS by the status cursor
)
-- Step-level log written by the scheduling snippet and read by the status cursor.
CREATE TABLE NIC_EDW_STG.JOB_EVENTS
(
    EVENT_ID             NUMBER,         -- the status cursor picks MAX(EVENT_ID) per QUEUE_ID
    SEQ_NO               NUMBER,         -- allocated as MAX(SEQ_NO) + 1 within one EVENT_ID
    QUEUE_ID             NUMBER,         -- matched against JOB_QUEUE.QUEUE_ID
    STEP_ID              NUMBER,         -- the scheduler logs 10 at start and 99 at completion
    LOG_TIME             TIMESTAMP(6),   -- written as SYSTIMESTAMP
    STEP_NAME            VARCHAR2(100),
    STEP_DESC            VARCHAR2(100),
    STEP_ERROR           VARCHAR2(250),
    TOTAL_AFFECTED_ROWS  NUMBER,
    STEP_STATUS          NUMBER(2,0)     -- decoded by the status cursor: 1 Running, 2 Succeeded, 3 Failed
)
-- Job-to-parent-job links. In this file the table is only outer-joined on
-- JOB_ID by the status cursor; none of its columns are selected there.
CREATE TABLE NIC_EDW_STG.JOB_DEPENDENCY
(
    JOB_ID         NUMBER,
    JOB_PARENT_ID  NUMBER,
    PARENT_OLTP_F  CHAR(1)   -- NOTE(review): single-character flag; its values are not shown in this file
)
/* ODI work repository table */
-- NOTE(review): the status cursor in this file reads SESS_STATUS from
-- odi_exc_repository.snp_session, not from this schema - confirm which
-- repository is the live one.
CREATE TABLE DWPROD_ODIWORK_EXEC.SNP_SESSION
(
    SESS_NO            NUMBER(19,0)        NOT NULL ENABLE,
    SESS_NAME          VARCHAR2(436 CHAR)  NOT NULL ENABLE,
    SCEN_VERSION       VARCHAR2(35 CHAR),
    LOG_LEVEL          NUMBER(4,0)         NOT NULL ENABLE,
    THREAD_ID          VARCHAR2(35 CHAR),
    THREAD_CREATION    VARCHAR2(35 CHAR),
    SESS_BEG           DATE,
    SESS_END           DATE,
    SESS_DUR           NUMBER(10,2),
    SESS_STATUS        VARCHAR2(2 CHAR)    NOT NULL ENABLE,
    SESS_RC            VARCHAR2(35 CHAR),
    SESS_MESS          VARCHAR2(250 CHAR),
    SESS_PARAMS        VARCHAR2(250 CHAR),
    SESS_KEYWORDS      VARCHAR2(250 CHAR),
    MASTER_AGENT_NAME  VARCHAR2(35 CHAR),
    SYNCHRO            VARCHAR2(1 CHAR),
    AGENT_NAME         VARCHAR2(35 CHAR),
    CONTEXT_CODE       VARCHAR2(35 CHAR)   NOT NULL ENABLE,
    PARENT_SESS_NO     NUMBER(19,0),
    NB_CHILD_RUN       NUMBER(10,0),
    NB_CHILD_OK        NUMBER(10,0),
    NB_CHILD_KO        NUMBER(10,0),
    SCEN_NAME          VARCHAR2(400 CHAR),
    USER_NAME          VARCHAR2(35 CHAR),
    I_TXT_SESS_MESS    NUMBER(19,0),
    I_TXT_SESS_PARAMS  NUMBER(19,0)
)
**************************************************************************
/* PROCESS_STATUS_PROC-->Procedure-->PROCESS_STATUS_PROC-->Update Status */
/*
 * Declaration section of the status-update block: two cursors, no executable
 * part. NOTE(review): the BEGIN ... END body that would consume these cursors
 * is not present in this file.
 *
 * SESS     - reads SESS_STATUS of one ODI session.
 *            NOTE(review): 96073565 looks like a session number substituted at
 *            generation time, and the schema (odi_exc_repository) differs from
 *            the DWPROD_ODIWORK_EXEC.SNP_SESSION DDL in this file - confirm.
 * STAT_CUR - one row per JOB_QUEUE entry with its latest logged step status,
 *            restricted to entries whose JOB_STATUS differs from that step
 *            status or whose JOB_STATUS is 0.
 */
DECLARE
CURSOR SESS IS SELECT sess_status FROM odi_exc_repository.snp_session WHERE
SESS_NO=96073565 ;
CURSOR STAT_CUR IS SELECT QUEUE_ID,SEQ_ID,JOB_ID,JOB_START_TIME,DIFF,WAIT_TIME,JOB_STATUS,STEP_STATUS,STEP_STATUS_C,EVENT_D_C FROM (
SELECT DISTINCT a.QUEUE_ID
,a.SEQ_ID
,a.JOB_ID
,a.JOB_START_TIME
-- DIFF: minutes between JOB_START_TIME and now. Both sides are reduced to
-- whole seconds through the TO_CHAR/TO_DATE round trip, and the day
-- difference is multiplied by 1440 and rounded.
,ROUND((TO_DATE(TO_CHAR(SYSDATE,'DD-MM-YYYY HH24:MI:SS'),'DD-MM-YYYY HH24:MI:SS')
- TO_DATE(TO_CHAR(a.job_start_time,'DD-MM-YYYY HH24:MI:SS'),'DD-MM-YYYY HH24:MI:SS'))
*1440) "DIFF"
,d.WAIT_TIME
,a.JOB_STATUS
-- STEP_STATUS: status of the JOB_EVENTS row holding the highest SEQ_NO of the
-- highest EVENT_ID logged for this queue entry; 0 when no such row exists.
-- NOTE(review): the middle subquery filters on EVENT_ID only, not QUEUE_ID,
-- and the scalar subquery raises ORA-01427 if (EVENT_ID, SEQ_NO) is not unique.
,NVL(
(SELECT STEP_STATUS FROM job_events WHERE (event_id,seq_no)=( SELECT MAX(event_id),MAX(seq_no) FROM job_events WHERE event_id=
(SELECT MAX(event_id) FROM job_events WHERE queue_id=a.QUEUE_ID))),0) "STEP_STATUS"
-- STEP_STATUS_C: the same lookup as a label; any value other than 1/2/3,
-- including "no row found", becomes 'NR'.
,DECODE((SELECT NVL(STEP_STATUS,0) FROM job_events WHERE (event_id,seq_no)=( SELECT MAX(event_id),MAX(seq_no) FROM job_events WHERE event_id=
(SELECT MAX(event_id) FROM job_events WHERE queue_id=a.QUEUE_ID))),1,'Running',2,'Succeeded',3,'Failed','NR') "STEP_STATUS_C"
-- EVENT_D_C: highest EVENT_ID logged for this queue entry.
,(SELECT MAX(event_id) FROM job_events WHERE queue_id=a.QUEUE_ID) "EVENT_D_C"
FROM job_queue a
,job_dependency b
,job_events c
,job_master d
-- b and c are outer-joined but contribute no selected columns; the
-- DISTINCT / GROUP BY below collapses the row multiplication they cause.
WHERE a.job_id=b.job_id(+) AND a.job_id=d.job_id AND a.queue_id=c.QUEUE_ID(+) -- and a.job_status <=1
GROUP BY a.QUEUE_ID
,a.SEQ_ID
,a.JOB_ID
,a.JOB_START_TIME
,d.WAIT_TIME
,a.JOB_STATUS
ORDER BY queue_id
-- Keep entries whose queue status disagrees with the latest step status, plus
-- every entry still at JOB_STATUS 0.
) WHERE JOB_STATUS <> STEP_STATUS OR job_status=0;
*********************************************************************
/* PROCESS_CONTROL-->Variable-->ETL_PROCESS_CNTL_START--> */
/*
 * Returns one row with a STATUS value:
 *   - 999 (converted to text by NVL2) when no session of user PROC_CNTL exists
 *     in GV$SESSION with a status other than 'KILLED';
 *   - otherwise a message listing the matching sessions as comma-separated
 *     SID:SERIAL# pairs.
 *
 * LISTAGG replaces WM_CONCAT, which is an undocumented WMSYS function that is
 * no longer available from Oracle 12c onwards. LISTAGG needs Oracle 11gR2 or
 * later. The ',' separator matches what WM_CONCAT produced; the list is now
 * ordered by SID, SERIAL#.
 *
 * NOTE(review): if this query itself runs as PROC_CNTL, its own session is
 * part of the result - confirm which user evaluates the variable.
 */
SELECT NVL2(
           LISTAGG(SID || ':' || SERIAL#, ',') WITHIN GROUP (ORDER BY SID, SERIAL#),
           'PREVIOUS RUN OF PROCESS_CONTROL IS RUNNING FOR SID:SERIAL# '
               || LISTAGG(SID || ':' || SERIAL#, ',') WITHIN GROUP (ORDER BY SID, SERIAL#),
           999
       ) STATUS
FROM GV$SESSION
WHERE USERNAME = 'PROC_CNTL'
  AND STATUS <> 'KILLED'
/* PROCESS_CONTROL-->Procedure-->JOB_UPDATE_JOB_MASTER-->CHECK PREV LOAD */
/*
 * Guard step: succeeds only when ETL_SRCSTG_EXT_STATUS holds exactly two
 * distinct TABLE_TYPE values for the ETL_REF currently stored in ETL_REF.
 * Otherwise it raises ORA-20001 naming that ETL_REF.
 *
 * The single-row SELECT ... INTO propagates NO_DATA_FOUND / TOO_MANY_ROWS
 * unchanged when ETL_REF does not hold exactly one row.
 */
DECLARE
    c_expected_types CONSTANT PLS_INTEGER := 2;
    l_type_count     NUMBER(2);
    l_current_ref    NUMBER(10);
BEGIN
    SELECT r.ETL_REF
      INTO l_current_ref
      FROM ETL_REF r;

    SELECT COUNT(DISTINCT s.TABLE_TYPE)
      INTO l_type_count
      FROM ETL_SRCSTG_EXT_STATUS s
     INNER JOIN ETL_REF r
        ON r.ETL_REF = s.ETL_REF;

    IF l_type_count <> c_expected_types THEN
        RAISE_APPLICATION_ERROR(
            -20001,
            'PREVIOUS LOAD FOR THE ETL_REF ' || l_current_ref || ' NOT COMPLETED'
        );
    END IF;

    DBMS_OUTPUT.PUT_LINE('PREV LOAD COMPLETED');
END;
/* PROCESS_CONTROL-->Procedure-->JOB_UPDATE_JOB_MASTER-->TRUNC_JOB_QUEUE */
-- Removes every row from JOB_QUEUE. TRUNCATE is DDL in Oracle: it commits
-- implicitly and cannot be rolled back. The snippets that insert into
-- JOB_QUEUE allocate ids as MAX(...) + 1, so numbering restarts at 1 afterwards.
TRUNCATE TABLE JOB_QUEUE
/* PROCESS_CONTROL-->Procedure-->JOB_UPDATE_JOB_MASTER-->ETL_REF_INCREMENT */
/*
 * Advances the ETL reference held in ETL_REF and records it in
 * ETL_REF_HIS_VALUE.
 *
 * The day key is yesterday's date (SYSDATE - 1) as YYYYMMDD:
 *   - ETL_REF empty                    -> insert <day key>00
 *   - no row starts with the day key   -> set ETL_REF to <day key>00
 *   - a row already starts with it     -> set ETL_REF to current value + 1
 * Any ETL_REF value missing from ETL_REF_HIS_VALUE is then appended to it
 * with SYSTIMESTAMP.
 *
 * Changes from the previous version:
 *   - One COMMIT at the end instead of a COMMIT per branch, so the ETL_REF
 *     change and its history row become durable together. Before, a failure
 *     in the history insert left ETL_REF advanced with no history row.
 *   - Row counters are PLS_INTEGER; NUMBER(1) / NUMBER(2) raised ORA-06502
 *     as soon as a count exceeded 9 / 99.
 *   - The ETL_REF insert names its columns. These are the two columns the
 *     UPDATE statements set, in the order the positional insert relied on.
 *
 * NOTE(review): the +1 branch carries into the date part once the two-digit
 * suffix passes 99 - confirm that more than 100 runs per day cannot happen.
 */
DECLARE
    l_day_key    NUMBER(10);    -- yesterday as YYYYMMDD
    l_new_ref    NUMBER(10);    -- value written to ETL_REF.ETL_REF
    l_day_rows   PLS_INTEGER;   -- ETL_REF rows whose first 8 characters equal the day key
    l_total_rows PLS_INTEGER;   -- all ETL_REF rows
BEGIN
    l_day_key := TO_NUMBER(TO_CHAR(SYSDATE - 1, 'YYYYMMDD'));
    DBMS_OUTPUT.PUT_LINE('Todays Etl Ref' || l_day_key);

    SELECT COUNT(*)
      INTO l_day_rows
      FROM ETL_REF
     WHERE SUBSTR(ETL_REF, 1, 8) = l_day_key;
    DBMS_OUTPUT.PUT_LINE('Available Values ' || l_day_rows);

    SELECT COUNT(*)
      INTO l_total_rows
      FROM ETL_REF;

    IF l_total_rows = 0 THEN
        l_new_ref := l_day_key * 100;   -- same value as appending '00'
        INSERT INTO ETL_REF (ETL_REF, START_TIME)
        VALUES (l_new_ref, SYSDATE);
    ELSIF l_day_rows = 0 THEN
        l_new_ref := l_day_key * 100;
        -- Unfiltered on purpose: the SELECT ... INTO below depends on ETL_REF
        -- holding a single row.
        UPDATE ETL_REF
           SET ETL_REF    = l_new_ref,
               START_TIME = SYSDATE;
    ELSE
        DBMS_OUTPUT.PUT_LINE('Already etl_Ref exists');
        SELECT ETL_REF
          INTO l_new_ref
          FROM ETL_REF;
        l_new_ref := l_new_ref + 1;
        DBMS_OUTPUT.PUT_LINE('The Next value will be ' || l_new_ref);
        UPDATE ETL_REF
           SET ETL_REF    = l_new_ref,
               START_TIME = SYSDATE;
    END IF;

    -- Positional insert kept: only the ETL_REF column of ETL_REF_HIS_VALUE is
    -- visible in this file, so the second column cannot be named here.
    INSERT INTO ETL_REF_HIS_VALUE
    SELECT A.ETL_REF,
           SYSTIMESTAMP
      FROM ETL_REF A
     WHERE NOT EXISTS (SELECT 'X'
                         FROM ETL_REF_HIS_VALUE B
                        WHERE A.ETL_REF = B.ETL_REF);

    COMMIT;
END;
/* PROCESS_CONTROL-->Procedure-->JOB_UPDATE_JOB_MASTER-->UPDATE JOB MASTER */
BEGIN
    -- Move the run time of every active job to one minute from now
    -- (1/1440 of a day), then make the change durable.
    UPDATE job_master jm
       SET jm.job_run_time = SYSDATE + (1 / 1440)
     WHERE jm.job_active = 1;

    COMMIT;
END;
/* PROCESS_CONTROL-->Procedure-->JOB_UPDATE_JOB_MASTER-->TRUNC_JOB_EVENTS */
-- Removes every row from JOB_EVENTS. TRUNCATE is DDL in Oracle: it commits
-- implicitly and cannot be rolled back, so the whole step log is discarded.
TRUNCATE TABLE JOB_EVENTS
/* PROCESS_CONTROL-->Procedure-->PROCESS_ETL_REF_LOAD_PROC-->PROCESS_ETL_REF_LOAD_PROC */
/*
 * Rebuilds ETL_REF_LOAD: truncates it, then inserts one row per ETL_DATE entry
 * (scenario, table, category) and per ETL_REF_HIS_VALUE reference that is
 * greater than the reference stored on that entry (NULL counts as 0).
 * Scenarios whose name starts with 'SRCSTG' are excluded.
 *
 * Notes on the statement, which is executed unchanged:
 *   - ETL_DATE is joined to itself (B, C) on table, scenario and category.
 *   - ETL_REF (D) has no join predicate, so each of its rows is combined with
 *     every result row; it supplies START_TIME.
 *   - No COMMIT follows the insert.
 *     NOTE(review): presumably the calling session commits - confirm.
 *
 * Disabled step kept from the original, to be included later:
 *   EXECUTE IMMEDIATE 'UPDATE ETL_REF SET ETL_REF=(SELECT ETL_REF+1 FROM ETL_REF),START_TIME=SYSTIMESTAMP';
 */
DECLARE
    c_reload_sql CONSTANT VARCHAR2(1000) := q'[INSERT INTO ETL_REF_LOAD SELECT B.SCENARIO_NAME,B.TABLE_NAME,B.CAT_VAL,A.ETL_REF ETL_REF,D.START_TIME FROM ETL_REF_HIS_VALUE A,
ETL_DATE B, ETL_DATE C, ETL_REF D
WHERE
B.TABLE_NAME=C.TABLE_NAME
AND B.SCENARIO_NAME=C.SCENARIO_NAME
AND NVL(B.CAT_VAL,'1')=NVL(C.CAT_VAL,'1')
AND A.ETL_REF > NVL(B.ETL_REF,0)
AND B.SCENARIO_NAME NOT LIKE 'SRCSTG%'
ORDER BY C.TABLE_NAME, C.SCENARIO_NAME, C.CAT_VAL,A.ETL_REF]';
BEGIN
    EXECUTE IMMEDIATE 'TRUNCATE TABLE ETL_REF_LOAD';
    EXECUTE IMMEDIATE c_reload_sql;
END;
***********************************************************************
/* PROCESS_SCHEDULING-->Procedure-->PROCESS_SCHEDULING_PROC-->Process Scheduling */
/*
 * Queues every active job that is due within the next 10 minutes.
 *
 * Steps:
 *   1. Log a "Job identification" event (STEP_ID 10, STEP_STATUS 1) and commit.
 *   2. Read the current MAX(queue_id) / MAX(seq_id) from JOB_QUEUE. All jobs
 *      queued by this pass share one new SEQ_ID; each gets its own QUEUE_ID.
 *   3. For each due job insert a JOB_QUEUE row with JOB_STATUS 0. For
 *      interval-driven jobs (flag 'I') with a positive interval, move
 *      LAST_SCHED_TIME to run time + interval minutes.
 *   4. Commit, log a "Job Completed" event (STEP_ID 99, STEP_STATUS 2), commit.
 *
 * que_cur combines two kinds of job:
 *   flag 'X' - JOB_RUN_DAY 1/2/3: due when today / NEXT_DAY(SYSDATE,'SAT') /
 *              LAST_DAY(SYSDATE), combined with the time of day of
 *              JOB_RUN_TIME, falls inside the 10-minute window. The queued
 *              RUN_TIME always uses today's date.
 *   flag 'I' - JOB_RUN_DAY 0: due when the later of today's JOB_RUN_TIME and
 *              LAST_SCHED_TIME (1907-01-01 when NULL) falls inside the window.
 *
 * Fixes relative to the previous text:
 *   - "job_masterWHERE" and "WHENJOB_RUN_TIME" were fused tokens (syntax
 *     errors); the missing spaces are restored.
 *   - The time format models contained a line break instead of the single
 *     space that the TO_DATE mask 'dd/mm/yyyy HH24:MI:ss' expects.
 *   - Locals that were never referenced are removed; the event id literal
 *     used by both log inserts is declared once.
 *
 * NOTE(review): NEXT_DAY(SYSDATE,'SAT') depends on NLS_DATE_LANGUAGE and
 * always returns a date after today, so JOB_RUN_DAY = 2 can only match when
 * the window crosses midnight into Saturday - confirm this is intended.
 * NOTE(review): #NIC_APPLICATION.* are ODI variable references substituted
 * before execution; 96071565 looks like a substituted session number.
 */
DECLARE
    CURSOR que_cur IS
        SELECT job_id, run_time, flag, job_run_interval
          FROM (
                -- Calendar-driven jobs (JOB_RUN_DAY 1, 2 or 3).
                SELECT job_id,
                       TO_DATE(TO_CHAR(SYSDATE, 'dd/mm/yyyy') || TO_CHAR(job_run_time, ' HH24:MI:ss'),
                               'dd/mm/yyyy HH24:MI:ss') run_time,
                       'X' flag,
                       job_run_interval
                  FROM job_master
                 WHERE job_active = 1
                   AND (CASE job_run_day
                            WHEN 1 THEN TO_DATE(TO_CHAR(SYSDATE, 'dd/mm/yyyy')
                                                || TO_CHAR(job_run_time, ' HH24:MI:ss'),
                                                'dd/mm/yyyy HH24:MI:ss')
                            WHEN 2 THEN TO_DATE(TO_CHAR(NEXT_DAY(SYSDATE, 'SAT'), 'dd/mm/yyyy')
                                                || TO_CHAR(job_run_time, ' HH24:MI:ss'),
                                                'dd/mm/yyyy HH24:MI:ss')
                            WHEN 3 THEN TO_DATE(TO_CHAR(LAST_DAY(SYSDATE), 'dd/mm/yyyy')
                                                || TO_CHAR(job_run_time, ' HH24:MI:ss'),
                                                'dd/mm/yyyy HH24:MI:ss')
                        END)
                       BETWEEN SYSDATE AND SYSDATE + (10 / (24 * 60))
                UNION
                -- Interval-driven jobs (JOB_RUN_DAY 0).
                SELECT job_id,
                       (CASE WHEN job_run_time >= last_sched_time
                             THEN job_run_time
                             ELSE last_sched_time
                        END) run_time,
                       'I' flag,
                       job_run_interval
                  FROM (SELECT job_id,
                               TO_DATE(TO_CHAR(SYSDATE, 'dd/mm/yyyy')
                                       || TO_CHAR(job_run_time, ' HH24:MI:ss'),
                                       'dd/mm/yyyy HH24:MI:ss') job_run_time,
                               NVL(last_sched_time, TO_DATE('01/01/1907', 'dd/mm/yyyy')) last_sched_time,
                               job_run_interval
                          FROM job_master
                         WHERE job_run_day = 0
                           AND job_active = 1)
                 WHERE (CASE WHEN job_run_time >= NVL(last_sched_time, TO_DATE('01/01/1907', 'dd/mm/yyyy'))
                             THEN job_run_time
                             ELSE last_sched_time
                        END)
                       BETWEEN SYSDATE AND SYSDATE + (10 / (24 * 60))
               );

    c_event_id   CONSTANT INTEGER := 96071565;   -- EVENT_ID used for both log rows
    QUEUE_JOB_ID INTEGER := #NIC_APPLICATION.SCH_QUEUE_ID;
    prc_job_id   INTEGER := #NIC_APPLICATION.SCH_JOB_ID;   -- not read below; kept so the ODI variable reference remains
    queue_id_inc INTEGER;
    seq_id_inc   INTEGER;
    no_of_job    INTEGER := 0;   -- number of jobs queued by this pass
BEGIN
    INSERT INTO job_events (event_id, seq_no, queue_id, step_id, log_time, step_name, step_desc,
                            step_error, total_affected_rows, step_status)
    VALUES (c_event_id,
            (SELECT NVL(MAX(seq_no), 0) + 1 FROM job_events WHERE event_id = c_event_id),
            QUEUE_JOB_ID, 10, SYSTIMESTAMP,
            'Job identification', 'Collect the job to put in job queue',
            NULL, NULL, 1);
    COMMIT;

    SELECT NVL(MAX(queue_id), 0), NVL(MAX(seq_id), 0)
      INTO queue_id_inc, seq_id_inc
      FROM job_queue;

    seq_id_inc := seq_id_inc + 1;

    FOR cur IN que_cur LOOP
        queue_id_inc := queue_id_inc + 1;

        INSERT INTO job_queue (queue_id, seq_id, job_id, job_start_time, job_end_time, job_status)
        VALUES (queue_id_inc, seq_id_inc, cur.job_id, cur.run_time, NULL, 0);

        no_of_job := no_of_job + 1;

        -- Interval jobs: remember when the next run is due.
        IF cur.flag = 'I' AND cur.job_run_interval > 0 THEN
            UPDATE job_master
               SET last_sched_time = cur.run_time + (cur.job_run_interval / (24 * 60))
             WHERE job_id = cur.job_id;
        END IF;
    END LOOP;
    COMMIT;

    INSERT INTO job_events (event_id, seq_no, queue_id, step_id, log_time, step_name, step_desc,
                            step_error, total_affected_rows, step_status)
    VALUES (c_event_id,
            (SELECT NVL(MAX(seq_no), 0) + 1 FROM job_events WHERE event_id = c_event_id),
            QUEUE_JOB_ID, 99, SYSTIMESTAMP,
            'Job Completed', 'Completed',
            NULL, NULL, 2);
    COMMIT;
END;
****************************************************************************
/* PROCESS_CONTROL-->Procedure-->PROCESS_CONTROL_PROC-->Ins into JOB_QUEUE */
/*
 * Appends one JOB_QUEUE row for JOB_ID 1, starting now, with JOB_STATUS 0.
 * QUEUE_ID and SEQ_ID are each allocated as the current maximum + 1
 * (1 when the table is empty). The insert is committed immediately.
 */
DECLARE
    l_last_seq_id   INTEGER := 0;
    l_last_queue_id INTEGER := 0;
BEGIN
    SELECT NVL(MAX(seq_id), 0),
           NVL(MAX(queue_id), 0)
      INTO l_last_seq_id,
           l_last_queue_id
      FROM job_queue;

    INSERT INTO job_queue (queue_id, seq_id, job_id, job_start_time, job_end_time, job_status)
    VALUES (l_last_queue_id + 1, l_last_seq_id + 1, 1, SYSDATE, NULL, 0);

    COMMIT;
END;