Sunday, February 17, 2019

Process all

-- Job catalogue read by the scheduling and status blocks elsewhere in this file.
-- Column notes below state only what those blocks visibly do with each column.
CREATE TABLE NIC_EDW_STG.JOB_MASTER
(
    JOB_ID            NUMBER,            -- matched against JOB_QUEUE.JOB_ID and JOB_DEPENDENCY.JOB_ID
    JOB_NAME          VARCHAR2(100),
    PKG_NAME          VARCHAR2(200),
    JOB_TYPE_ID       NUMBER,
    JOB_RUN_DAY       NUMBER(2,0),       -- scheduler: 0 = interval-driven, 1 = today, 2 = NEXT_DAY 'SAT', 3 = last day of month
    JOB_RUN_INTERVAL  NUMBER,            -- scheduler divides this by 24*60 before adding to a date, i.e. minutes
    JOB_RUN_TIME      TIMESTAMP (6),     -- scheduler uses only the HH24:MI:SS part
    LAST_SCHED_TIME   TIMESTAMP (6),     -- advanced by the scheduler for interval-driven jobs
    WAIT_TIME         TIMESTAMP (6),
    JOB_ACTIVE        NUMBER(2,0),       -- scheduler only considers rows where this is 1
    JOB_FATAL         NUMBER(2,0),
    JOB_LOAD_TYPE     VARCHAR2(20),
    -- Quoted on purpose: an unquoted 001 is the number 1 and would be stored as '1'.
    SCENARIO_VER      VARCHAR2(10) DEFAULT '001'
)

-- Run queue. The scheduling blocks in this file insert rows here with
-- JOB_STATUS = 0 and JOB_END_TIME = NULL.
CREATE TABLE NIC_EDW_STG.JOB_QUEUE (
    QUEUE_ID        NUMBER,          -- assigned as MAX(QUEUE_ID) + 1 per queued row
    SEQ_ID          NUMBER,          -- assigned as MAX(SEQ_ID) + 1 per scheduling pass
    JOB_ID          NUMBER,          -- matched against JOB_MASTER.JOB_ID
    JOB_START_TIME  TIMESTAMP (6),
    JOB_END_TIME    TIMESTAMP (6),
    JOB_STATUS      NUMBER(2,0)
)

-- Step-level event log written by the scheduling block and read by the
-- status cursor in this file.
CREATE TABLE NIC_EDW_STG.JOB_EVENTS (
    EVENT_ID             NUMBER,
    SEQ_NO               NUMBER,          -- writers use MAX(SEQ_NO) + 1 within one EVENT_ID
    QUEUE_ID             NUMBER,          -- matched against JOB_QUEUE.QUEUE_ID
    STEP_ID              NUMBER,
    LOG_TIME             TIMESTAMP (6),
    STEP_NAME            VARCHAR2(100),
    STEP_DESC            VARCHAR2(100),
    STEP_ERROR           VARCHAR2(250),
    TOTAL_AFFECTED_ROWS  NUMBER,
    STEP_STATUS          NUMBER(2,0)      -- status cursor decodes 1 = Running, 2 = Succeeded, 3 = Failed
)



-- Job-to-parent mapping; the status cursor in this file outer-joins it to
-- JOB_QUEUE on JOB_ID.
CREATE TABLE NIC_EDW_STG.JOB_DEPENDENCY (
    JOB_ID         NUMBER,
    JOB_PARENT_ID  NUMBER,
    PARENT_OLTP_F  CHAR(1)    -- presumably a one-character flag; allowed values are not shown here
)



/* ODI work repository table */



-- ODI work-repository session table (one row per session, keyed by SESS_NO).
-- Columns are listed in their original order; only the layout has changed.
CREATE TABLE DWPROD_ODIWORK_EXEC.SNP_SESSION (
    SESS_NO            NUMBER(19,0)        NOT NULL ENABLE,
    SESS_NAME          VARCHAR2(436 CHAR)  NOT NULL ENABLE,
    SCEN_VERSION       VARCHAR2(35 CHAR),
    LOG_LEVEL          NUMBER(4,0)         NOT NULL ENABLE,
    THREAD_ID          VARCHAR2(35 CHAR),
    THREAD_CREATION    VARCHAR2(35 CHAR),
    SESS_BEG           DATE,
    SESS_END           DATE,
    SESS_DUR           NUMBER(10,2),
    SESS_STATUS        VARCHAR2(2 CHAR)    NOT NULL ENABLE,
    SESS_RC            VARCHAR2(35 CHAR),
    SESS_MESS          VARCHAR2(250 CHAR),
    SESS_PARAMS        VARCHAR2(250 CHAR),
    SESS_KEYWORDS      VARCHAR2(250 CHAR),
    MASTER_AGENT_NAME  VARCHAR2(35 CHAR),
    SYNCHRO            VARCHAR2(1 CHAR),
    AGENT_NAME         VARCHAR2(35 CHAR),
    CONTEXT_CODE       VARCHAR2(35 CHAR)   NOT NULL ENABLE,
    PARENT_SESS_NO     NUMBER(19,0),
    NB_CHILD_RUN       NUMBER(10,0),
    NB_CHILD_OK        NUMBER(10,0),
    NB_CHILD_KO        NUMBER(10,0),
    SCEN_NAME          VARCHAR2(400 CHAR),
    USER_NAME          VARCHAR2(35 CHAR),
    I_TXT_SESS_MESS    NUMBER(19,0),
    I_TXT_SESS_PARAMS  NUMBER(19,0)
)
**************************************************************************

/* PROCESS_STATUS_PROC-->Procedure-->PROCESS_STATUS_PROC-->Update Status */
 -- Declaration section of the "Update Status" step: two cursors only.
 -- NOTE(review): no BEGIN ... END body is visible after these declarations,
 -- so this fragment is incomplete as shown and will not compile on its own.
 DECLARE
-- SESS: status of one ODI session, looked up by a hard-coded SESS_NO.
-- NOTE(review): 96073565 is presumably a session number substituted when the
-- step was generated, and the schema here (odi_exc_repository) differs from the
-- DWPROD_ODIWORK_EXEC schema used in the CREATE TABLE above -- confirm both.
CURSOR SESS IS SELECT sess_status FROM odi_exc_repository.snp_session WHERE
SESS_NO=96073565 ;
-- STAT_CUR: one row per queue entry (DISTINCT plus GROUP BY on the job_queue /
-- job_master columns), keeping only entries whose JOB_STATUS differs from the
-- latest logged STEP_STATUS, or whose JOB_STATUS is 0 (see the final WHERE).
CURSOR STAT_CUR IS   SELECT QUEUE_ID,SEQ_ID,JOB_ID,JOB_START_TIME,DIFF,WAIT_TIME,JOB_STATUS,STEP_STATUS,STEP_STATUS_C,EVENT_D_C FROM (
 SELECT DISTINCT a.QUEUE_ID
   ,a.SEQ_ID
   ,a.JOB_ID
   ,a.JOB_START_TIME
   -- DIFF: SYSDATE minus job_start_time, both cut to whole seconds via the
   -- TO_CHAR/TO_DATE round trip; the day difference * 1440 gives minutes, rounded.
   ,ROUND((TO_DATE(TO_CHAR(SYSDATE,'DD-MM-YYYY HH24:MI:SS'),'DD-MM-YYYY HH24:MI:SS')
   - TO_DATE(TO_CHAR(a.job_start_time,'DD-MM-YYYY HH24:MI:SS'),'DD-MM-YYYY HH24:MI:SS'))
   *1440) "DIFF"
   ,d.WAIT_TIME
   ,a.JOB_STATUS
   -- STEP_STATUS: status of the job_events row at (highest event_id for this
   -- queue entry, highest seq_no within that event); 0 when nothing is found.
   ,NVL(
   (SELECT STEP_STATUS FROM job_events WHERE (event_id,seq_no)=( SELECT MAX(event_id),MAX(seq_no)  FROM job_events WHERE event_id=
   (SELECT MAX(event_id) FROM job_events WHERE queue_id=a.QUEUE_ID))),0)  "STEP_STATUS"
    -- STEP_STATUS_C: the same lookup rendered as text; any value other than
    -- 1/2/3 (including no row at all) yields 'NR'.
    ,DECODE((SELECT NVL(STEP_STATUS,0) FROM job_events WHERE (event_id,seq_no)=( SELECT MAX(event_id),MAX(seq_no) FROM job_events WHERE event_id=
   (SELECT MAX(event_id) FROM job_events WHERE queue_id=a.QUEUE_ID))),1,'Running',2,'Succeeded',3,'Failed','NR') "STEP_STATUS_C"
  -- EVENT_D_C: highest event_id logged for this queue entry.
  ,(SELECT MAX(event_id) FROM job_events WHERE queue_id=a.QUEUE_ID) "EVENT_D_C"
FROM    job_queue a
,job_dependency b
,job_events c
,job_master d
-- b and c are outer-joined ((+)) but none of their columns are selected; they
-- only multiply rows, which the DISTINCT / GROUP BY then collapse again.
WHERE a.job_id=b.job_id(+) AND a.job_id=d.job_id AND a.queue_id=c.QUEUE_ID(+)  -- and a.job_status <=1
GROUP BY a.QUEUE_ID
   ,a.SEQ_ID
   ,a.JOB_ID
   ,a.JOB_START_TIME
       ,d.WAIT_TIME
   ,a.JOB_STATUS
ORDER BY queue_id
) WHERE JOB_STATUS <> STEP_STATUS OR job_status=0;
*********************************************************************


 
/* PROCESS_CONTROL-->Variable-->ETL_PROCESS_CNTL_START--> */ 
 SELECT NVL2(WM_CONCAT(SID||':'||SERIAL#), 
'PREVIUOS RUN OF PROCESS_CONTROL IS RUNNING FOR SID:SERIAL# '||WM_CONCAT(SID||':'||SERIAL#),
999) STATUS FROM GV$SESSION WHERE USERNAME='PROC_CNTL' AND STATUS<>'KILLED' 
 
/* PROCESS_CONTROL-->Procedure-->JOB_UPDATE_JOB_MASTER-->CHECK PREV LOAD */ 
 DECLARE
CNT  NUMBER(2);
ETL_REF NUMBER(10);
NEW_ERROR EXCEPTION;
BEGIN
SELECT ETL_REF INTO ETL_REF FROM ETL_REF;
SELECT COUNT(DISTINCT A.TABLE_TYPE) INTO CNT FROM ETL_SRCSTG_EXT_STATUS A, ETL_REF B WHERE A.ETL_REF=B.ETL_REF;
IF CNT = 2 THEN
DBMS_OUTPUT.PUT_LINE('PREV LOAD COMPLETED');
ELSE
RAISE NEW_ERROR;
END IF;
EXCEPTION
WHEN NEW_ERROR THEN
RAISE_APPLICATION_ERROR(-20001,'PREVIOUS LOAD FOR THE ETL_REF '||ETL_REF||' NOT COMPLETED');
END; 
 
/* PROCESS_CONTROL-->Procedure-->JOB_UPDATE_JOB_MASTER-->TRUNC_JOB_QUEUE */ 
 TRUNCATE TABLE  JOB_QUEUE 
 
/* PROCESS_CONTROL-->Procedure-->JOB_UPDATE_JOB_MASTER-->ETL_REF_INCREMENT */ 
 DECLARE
CUR_ETL NUMBER(10);
TOT NUMBER(2);
CNT NUMBER(1);
BEGIN
SELECT TO_NUMBER(TO_CHAR(SYSDATE-1,'YYYYMMDD'))  INTO CUR_ETL FROM DUAL;
DBMS_OUTPUT.PUT_LINE('Todays Etl Ref'||CUR_ETL);
SELECT COUNT(1) INTO TOT  FROM ETL_REF WHERE SUBSTR(ETL_REF,1,8)=CUR_ETL;
DBMS_OUTPUT.PUT_LINE('Available Values '||TOT);
SELECT COUNT(1) INTO CNT FROM ETL_REF;
CUR_ETL:=CUR_ETL||'00';
IF CNT =0 THEN
   INSERT INTO ETL_REF VALUES(CUR_ETL,SYSDATE);
   COMMIT; 
ELSE
  IF TOT = 0 THEN
    UPDATE ETL_REF SET ETL_REF=CUR_ETL,START_TIME=SYSDATE;
    COMMIT;
    ELSE
    DBMS_OUTPUT.PUT_LINE('Already etl_Ref exists');
    SELECT ETL_REF INTO CUR_ETL FROM ETL_REF;
    CUR_ETL:=CUR_ETL+1;
    DBMS_OUTPUT.PUT_LINE('The Next value will be '||CUR_ETL);
    UPDATE ETL_REF SET ETL_REF=CUR_ETL,START_TIME=SYSDATE;
    COMMIT;
    END IF;
END IF;    
    INSERT INTO ETL_REF_HIS_VALUE 
    SELECT A.ETL_REF,SYSTIMESTAMP FROM ETL_REF A WHERE NOT EXISTS ( SELECT 'X' FROM ETL_REF_HIS_VALUE B WHERE A.ETL_REF=B.ETL_REF);COMMIT;
END; 
 
/* PROCESS_CONTROL-->Procedure-->JOB_UPDATE_JOB_MASTER-->UPDATE JOB MASTER */ 
 BEGIN
UPDATE JOB_MASTER SET JOB_RUN_TIME=SYSDATE+1/(24*60) WHERE JOB_ACTIVE=1;
COMMIT;
END; 
 
/* PROCESS_CONTROL-->Procedure-->JOB_UPDATE_JOB_MASTER-->TRUNC_JOB_EVENTS */ 
 TRUNCATE TABLE JOB_EVENTS 
 
/* PROCESS_CONTROL-->Procedure-->PROCESS_ETL_REF_LOAD_PROC-->PROCESS_ETL_REF_LOAD_PROC */ 
 BEGIN
/*EXECUTE IMMEDIATE 'UPDATE ETL_REF SET ETL_REF=(SELECT ETL_REF+1 FROM ETL_REF),START_TIME=SYSTIMESTAMP';  TO BE INCLUDEDE LATER */ 
EXECUTE IMMEDIATE 'TRUNCATE TABLE ETL_REF_LOAD';
EXECUTE IMMEDIATE 'INSERT INTO ETL_REF_LOAD SELECT B.SCENARIO_NAME,B.TABLE_NAME,B.CAT_VAL,A.ETL_REF ETL_REF,D.START_TIME FROM ETL_REF_HIS_VALUE A,
        ETL_DATE B, ETL_DATE C, ETL_REF D
  WHERE 
      B.TABLE_NAME=C.TABLE_NAME
      AND B.SCENARIO_NAME=C.SCENARIO_NAME
      AND NVL(B.CAT_VAL,'||'''1'''||')=NVL(C.CAT_VAL,'||'''1'''||')
      AND A.ETL_REF > NVL(B.ETL_REF,0)
      AND B.SCENARIO_NAME NOT LIKE '||'''SRCSTG%'''||'
      ORDER BY C.TABLE_NAME, C.SCENARIO_NAME, C.CAT_VAL,A.ETL_REF';
END; 


***********************************************************************

 
 
/* PROCESS_SCHEDULING-->Procedure-->PROCESS_SCHEDULING_PROC-->Process Scheduling */ 
 declare
CURSOR que_cur IS  
SELECT job_id,RUN_TIME,FLAG,job_run_interval FROM (
SELECT job_id,TO_DATE(TO_CHAR(SYSDATE,'dd/mm/yyyy')||TO_CHAR(JOB_RUN_TIME,' 
HH24:MI:ss'),'dd/mm/yyyy HH24:MI:ss') RUN_TIME,'X' FLAG,job_run_interval
FROM job_masterWHERE job_active=1  AND (
CASE job_run_day 
WHEN 1 THEN TO_DATE(TO_CHAR(SYSDATE,'dd/mm/yyyy')||TO_CHAR(JOB_RUN_TIME,' 
HH24:MI:ss'),'dd/mm/yyyy HH24:MI:ss')
WHEN 2 THEN TO_DATE(TO_CHAR(NEXT_DAY(SYSDATE,'SAT'),'dd/mm/yyyy')||TO_CHAR(JOB_RUN_TIME,' 
HH24:MI:ss'),'dd/mm/yyyy HH24:MI:ss')
WHEN 3 THEN TO_DATE(TO_CHAR(LAST_DAY(SYSDATE),'dd/mm/yyyy')||TO_CHAR(JOB_RUN_TIME,' 
HH24:MI:ss'),'dd/mm/yyyy HH24:MI:ss')
END )
BETWEEN SYSDATE  AND SYSDATE+(10/(24*60)) 
UNION
SELECT job_id,(CASE  WHENJOB_RUN_TIME >= LAST_SCHED_TIME THEN  JOB_RUN_TIME ELSE 
LAST_SCHED_TIME  END ) RUN_TIME,'I' FLAG,job_run_interval
FROM ( SELECT job_id,TO_DATE(TO_CHAR(SYSDATE,'dd/mm/yyyy')||TO_CHAR(JOB_RUN_TIME,' 
HH24:MI:ss'),'dd/mm/yyyy HH24:MI:ss') JOB_RUN_TIME,
NVL(LAST_SCHED_TIME,TO_DATE('01/01/1907','dd/mm/yyyy')) LAST_SCHED_TIME 
,job_run_interval
FROM job_master 
WHERE job_run_day=0 AND job_active=1)
WHERE ( CASE WHEN  JOB_RUN_TIME >= NVL(LAST_SCHED_TIME,TO_DATE('01/01/1907','dd/mm/yyyy')) 
THEN  JOB_RUN_TIME  ELSE LAST_SCHED_TIME END ) 
BETWEEN SYSDATE AND SYSDATE+(10/(24*60)));
QUEUE_JOB_ID INTEGER:=#NIC_APPLICATION.SCH_QUEUE_ID;
prc_job_id INTEGER:=#NIC_APPLICATION.SCH_JOB_ID;
queue_id_inc INTEGER;
seq_id_inc INTEGER;
src_count INTEGER :=0;
eve_id_inc integer;
no_of_job integer :=0;
err_flag integer :=0;
SESS_NO varchar2(60);
BEGIN
insert into job_events (EVENT_ID,SEQ_NO, QUEUE_ID, STEP_ID, LOG_TIME, STEP_NAME, STEP_DESC, STEP_ERROR, 
TOTAL_AFFECTED_ROWS, STEP_STATUS)values(96071565,(select nvl(max(seq_no),0)+1 from job_events where event_id=96071565) ,QUEUE_JOB_ID,10,systimestamp,'Job identification','Collect the job to put in job queue',null,null,1);
COMMIT;
SELECT NVL(MAX(queue_id),0),NVL(MAX(seq_id),0) INTO queue_id_inc,seq_id_inc FROM job_queue;
seq_id_inc:=seq_id_inc+1;
FOR cur IN que_cur LOOP
queue_id_inc:=queue_id_inc+1;
INSERT INTO job_queue (QUEUE_ID, SEQ_ID, JOB_ID, JOB_START_TIME, 
JOB_END_TIME, JOB_STATUS) 
VALUES (queue_id_inc,seq_id_inc,cur.job_id,cur.RUN_TIME,NULL,0);
no_of_job:=no_of_job+1;
IF cur.flag='I' AND CUR.job_run_interval>0 THEN
UPDATE job_master SET 
LAST_SCHED_TIME=cur.run_time+(cur.job_run_interval/(24*60)) WHERE job_id = cur.job_id ;
END IF;
END LOOP;
COMMIT;
insert into job_events (EVENT_ID,SEQ_NO, QUEUE_ID, STEP_ID, LOG_TIME, STEP_NAME, STEP_DESC, STEP_ERROR, 
TOTAL_AFFECTED_ROWS, STEP_STATUS)values(96071565,(select nvl(max(seq_no),0)+1 from job_events where event_id=96071565 ),QUEUE_JOB_ID,99,systimestamp,'Job Completed','Completed',null,null,2);
COMMIT;
END;

****************************************************************************

 
/* PROCESS_CONTROL-->Procedure-->PROCESS_CONTROL_PROC-->Ins into JOB_QUEUE */ 
 declare
sed_id_c integer:=0;
QUEUE_ID_C integer:=0;
begin
select nvl(max(seq_id),0) into sed_id_c from job_queue;
select nvl(max(queue_id),0) into QUEUE_ID_C from job_queue;
sed_id_c:=sed_id_c+1;
QUEUE_ID_C:=QUEUE_ID_C+1;
INSERT INTO JOB_QUEUE ( QUEUE_ID, SEQ_ID, JOB_ID, JOB_START_TIME, JOB_END_TIME,JOB_STATUS ) VALUES
(QUEUE_ID_C, sed_id_c, 1, SYSDATE, NULL, 0); 
COMMIT;
end; 

No comments:

Post a Comment