Sunday, February 17, 2019

ps fill


/* PROCESS_CONTROL-->Procedure-->PROCESS_CONTROL_PROC-->Job cntl */
 import java.sql as sql
import java.lang as lang
import java.lang.Integer
from com.ziclix.python.sql import zxJDBC
import thread
import synchronize
import os
import sys
import time
import java.util.Date as Date

# Connection name that job() looks for in the first tab-separated column of
# the decrypted credentials file.
conn_name='PROC_CNTL'
# Directory containing nic_etl_password.enc and .nic_edw_key; read from the
# ODI_SCRIPT environment variable (raises KeyError at import time if unset).
exe_path=os.environ['ODI_SCRIPT']
# function check_exception : Called when the Jython code hits an exception. It counts the jobs in job_queue that are still pending
#                            (job_status 0, 1 or 3), logs the exception in JOB_EVENTS and returns that count as a string; '0' means every job has completed.
def check_exception():
    """Record a process-control exception and report how many jobs are pending.

    Opens its own JDBC connection, counts the rows of nic_edw_stg.job_queue
    whose job_status is 0, 1 or 3, inserts a 'Process Control Exception' row
    into NIC_EDW_STG.JOB_EVENTS, commits, and closes the connection.

    Returns:
        The count as a string (it is read with getString), e.g. '0' when no
        job is pending. None if the count query yields no row.
    """
    # NOTE(review): the <@=...@> tag is presumably expanded by ODI before
    # Jython parses this text.  It is wrapped in single quotes so that the
    # double quotes inside the tag do not terminate the Python literal;
    # this assumes the password itself contains no single quote.
    myCon = sql.DriverManager.getConnection(
        'jdbc:oracle:thin:@(DESCRIPTION = '
        '(ADDRESS = (PROTOCOL = TCP)(HOST = nic-tuatou1a-dc-vip)(PORT = 1540))'
        '(ADDRESS = (PROTOCOL = TCP)(HOST = nic-tuatou1b-dc-vip)(PORT = 1540)) '
        '(LOAD_BALANCE=ON)'
        '(CONNECT_DATA = (SERVER = DEDICATED) (SERVICE_NAME = DWBIUAT)))',
        "NIC_EDW_STG",
        '<@=snpRef.getInfo("SRC_PASS") @>')
    job_cnt = None
    try:
        myStmt = myCon.createStatement()
        jobidstr = "SELECT count(job_id) as cnt  from nic_edw_stg.job_queue where job_status in (0,1,3)"
        jobidRs = myStmt.executeQuery(jobidstr)
        while jobidRs.next():
            job_cnt = jobidRs.getString("cnt")
            print(job_cnt)
        InsStr = "INSERT INTO NIC_EDW_STG.JOB_EVENTS VALUES(-101,-11,-99,-11,SYSTIMESTAMP,'Process Control Exception','Jython code reached Exception check method',NULL,NULL,2)"
        myStmt.executeUpdate(InsStr)
        myCon.commit()
    finally:
        # Release the connection even when the query or the insert fails.
        myCon.close()
    return job_cnt
# function startpath : Builds the startscen.sh command line that runs the scenario for the given queueid
def startpath(queueid):
    """Build the startscen.sh command line for the queue entry *queueid*.

    Looks up the package name and scenario version for the queue entry in
    JOB_QUEUE / JOB_MASTER and formats the shell command that passes the
    queue id and job id as NIC_APPLICATION variables.  If the query returns
    several rows, the command built from the last row is returned.

    Args:
        queueid: queue id; it is converted with str() and appended to the
            SQL text.

    Returns:
        The command line as a string.

    Raises:
        ValueError: if no JOB_QUEUE row matches *queueid* (previously this
            surfaced as an unbound-local error on ``return path``).
    """
    # NOTE(review): the <@=...@> tag is presumably expanded by ODI before
    # Jython parses this text.  It is wrapped in single quotes so that the
    # double quotes inside the tag do not terminate the Python literal;
    # this assumes the password itself contains no single quote.
    myCon = sql.DriverManager.getConnection(
        'jdbc:oracle:thin:@(DESCRIPTION = '
        '(ADDRESS = (PROTOCOL = TCP)(HOST = nic-tuatou1a-dc-vip)(PORT = 1540))'
        '(ADDRESS = (PROTOCOL = TCP)(HOST = nic-tuatou1b-dc-vip)(PORT = 1540)) '
        '(LOAD_BALANCE=ON)'
        '(CONNECT_DATA = (SERVER = DEDICATED) (SERVICE_NAME = DWBIUAT)))',
        "NIC_EDW_STG",
        '<@=snpRef.getInfo("SRC_PASS") @>')
    path = None
    try:
        myStmt = myCon.createStatement()
        jobidstr = "SELECT m.job_id job_id,q.queue_id,m.PKG_NAME,m.SCENARIO_VER FROM JOB_QUEUE q,JOB_MASTER m WHERE  m.JOB_ID(+)=q.JOB_ID AND q.QUEUE_ID=" + str(queueid)
        jobidRs = myStmt.executeQuery(jobidstr)
        while jobidRs.next():
            path = ("./startscen.sh " + jobidRs.getString("PKG_NAME")
                    + " " + jobidRs.getString("SCENARIO_VER")
                    + " DEV -v=5 -NAME=ROOT NIC_APPLICATION.SCH_QUEUE_ID="
                    + str(jobidRs.getString("queue_id"))
                    + " NIC_APPLICATION.SCH_JOB_ID="
                    + str(jobidRs.getString("job_id")))
    finally:
        # Release the connection even when the lookup fails.
        myCon.close()
    if path is None:
        raise ValueError("No JOB_QUEUE row found for queue_id " + str(queueid))
    return path


def ins(path):
    """Run the command line *path* through os.system.

    The exit status is discarded and nothing is returned; job() uses this
    function as the target of the thread it starts for each scenario.
    """
    os.system(path)
# function job selects the job to run based on the dependency and status of the job
# Oracle connect descriptor for the zxJDBC session opened by job().
_JOB_DB_URL = (
    "jdbc:oracle:thin:@(DESCRIPTION = "
    "(ADDRESS = (PROTOCOL = TCP)(HOST = nic-tuatou1a-dc-vip)(PORT = 1540))"
    "(ADDRESS = (PROTOCOL = TCP)(HOST = nic-tuatou1b-dc-vip)(PORT = 1540)) "
    "(LOAD_BALANCE=ON)"
    "(CONNECT_DATA = (SERVER = DEDICATED) (SERVICE_NAME = DWBIUAT)))"
)

# Scheduling query.  The source carried four pasted copies of this statement,
# three of them with fused keywords ("SELECTDISTINCT", "SELECTa.queue_id",
# "FROMNIC_EDW_STG", "NULLEND"); this is the one intact copy, split across
# adjacent literals.  Result columns, by index:
#   0 queue_id, 1 job_id, 2 job_parent_id, 3 job_status,
#   4 status (aggregated parent status), 5 job_fatal (of the parent),
#   6 job_start_time, 7 sdate (database SYSDATE as a timestamp)
_JOB_PENDING_SQL = (
    "SELECT queue_id , job_id  , MAX (job_parent_id) AS job_parent_id, "
    "MAX (job_status) AS job_status , "
    "CASE WHEN MAX (status1) = 3 THEN MAX (status1) ELSE MIN (status1)  END  AS status ,  "
    "MAX (job_fatal)  AS job_fatal  ,  MAX( JOB_START_TIME) AS JOB_START_TIME ,  "
    "TO_TIMESTAMP(TO_CHAR(SYSDATE,'dd/mm/yyyy')  ||TO_CHAR(SYSDATE,' HH24:MI:ss'),'dd/mm/yyyy HH24:MI:ss') sdate "
    "FROM  (SELECT a.queue_id, a.job_id , b.job_parent_id , a.job_status , b.status , b.job_fatal , a.job_start_time, "
    "CASE  WHEN b.job_fatal = 1  THEN b.status  "
    "WHEN b.job_fatal = 0  AND b.status <= 2  THEN b.status  "
    "WHEN b.job_fatal = 0  AND b.status = 3  THEN 2  "
    "ELSE NULL END AS status1 "
    "FROM (SELECT a.queue_id,  a.job_status ,  a.seq_id ,  a.job_id ,  b.job_parent_id ,  a.job_start_time "
    "FROM NIC_EDW_STG.job_queue a,  NIC_EDW_STG.job_dependency b  "
    "WHERE a.job_id  = b.job_id(+) AND a.job_status IN (0,1,3) "
    "AND a.seq_id  =  (SELECT MIN (seq_id) FROM NIC_EDW_STG.job_queue WHERE job_status IN(0,1,3)  ) ) a , "
    "(SELECT DISTINCT b.job_parent_id,  a.queue_id ,  a.seq_id  ,  d.job_id  ,  a.job_status status ,  d.job_fatal "
    "FROM NIC_EDW_STG.job_queue a,  NIC_EDW_STG.job_dependency b ,  NIC_EDW_STG.job_master d  "
    "WHERE a.job_id = b.job_parent_id AND a.job_id = d.job_id "
    "and  a.seq_id  = (SELECT MIN (seq_id) FROM NIC_EDW_STG.job_queue WHERE job_status IN (0,1,3)) ) b "
    "WHERE a.job_parent_id = b.job_parent_id(+)  AND a.seq_id = b.seq_id(+)) "
    "GROUP BY queue_id,  job_id ORDER BY queue_id,  job_id"
)

# Number of queue entries whose job_status is 1.
_JOB_RUNNING_SQL = "SELECT count(*) as cnt from nic_edw_stg.job_queue where job_status=1"

# Queue entries with job_status 0, 1 or 3 in the lowest such seq_id.
_JOB_LOWEST_SEQ_SQL = "SELECT * FROM NIC_EDW_STG.JOB_QUEUE a  WHERE  a.job_status IN (0,1,3)  AND a.seq_id = (SELECT MIN (seq_id) FROM NIC_EDW_STG.JOB_QUEUE WHERE job_status IN (0,1,3))"


def _job_read_credentials():
    """Decrypt the credentials file and return (username, password) for conn_name.

    Runs openssl to decrypt exe_path/nic_etl_password.enc into
    exe_path/nic_etl_password.dcy, then scans that tab-separated file for the
    line whose first field equals conn_name.  Returns ('', '') when no line
    matches or the decrypted file cannot be opened.
    """
    str_username = ''
    str_password = ''
    try:
        str_cmd_dcr = "openssl enc -d  -aes-256-cbc -in "+exe_path+"/nic_etl_password.enc -pass file:"+exe_path+"/.nic_edw_key -out "+exe_path+"/nic_etl_password.dcy"
        os.system(str_cmd_dcr)
        dcy_file = exe_path+"/nic_etl_password.dcy"
        # NOTE(review): the decrypted file is left on disk, as in the
        # original; confirm whether it should be removed after reading.
        fl_dcr_pwd = open(dcy_file, 'r')
        try:
            for str_main in fl_dcr_pwd.readlines():
                str_list = str_main.split('\t')
                if str_list[0] == conn_name:
                    str_username = str_list[1]
                    # NOTE(review): if the password is the last field on the
                    # line it still carries the trailing newline - confirm
                    # the file layout.
                    str_password = str_list[2]
        finally:
            # The original never closed this handle.
            fl_dcr_pwd.close()
    except IOError:
        print("Error: nic_etl_password.dcy file not found to obtain the connection details")
    return str_username, str_password


def _job_can_start(parent_id, parent_status, parent_fatal):
    """Return 1 when the dependency columns of a row allow the job to start, else 0.

    Same outcomes as the original if/elif chain:
      * no parent (parent_id is NULL)                      -> start
      * parent status 2, parent job_fatal 0 or 1           -> start
      * parent status 3 or 4 and parent job_fatal 0        -> start
      * anything else (parent status 0/1, or status 3/4
        with job_fatal 1, or NULL columns)                 -> do not start
    NOTE(review): status 2 presumably means "completed"; only 0, 1 and 3
    (WAITING, STARTED, FAILED) are named in the original comments.
    """
    if parent_id == None:
        return 1
    if parent_status == 2.0 and (parent_fatal == 1.0 or parent_fatal == 0.0):
        return 1
    if parent_fatal == 0.0 and (parent_status == 3.0 or parent_status == 4.0):
        return 1
    return 0


def _job_close_all(*resources):
    """Close every cursor/connection that was actually opened, in the given order."""
    for resource in resources:
        if resource:
            resource.close()


# function job selects the job to run based on the dependency and status of the job
def job():
    """Poll the job queue and launch every job whose dependencies allow it.

    Reads the database credentials for conn_name, connects through zxJDBC and
    loops over the scheduling query.  A row is started when its job_status is
    0, fewer than 10 queue entries have job_status 1, its job_start_time is
    earlier than the database time, and _job_can_start() accepts its parent
    columns.  Starting means: set job_status to 1 and job_start_time to
    sysdate in JOB_QUEUE, commit, build the command with startpath() and run
    it through ins() on a new thread.

    The loop ends when the scheduling query returns no row.  On any error the
    cursors and connection are closed, check_exception() is called, and the
    error is re-raised unless check_exception() returns '0'.

    Returns None.  Calls sys.exit() when no username/password could be read.

    NOTE(review): the source had lost all indentation inside this function;
    the nesting below is reconstructed from statement order and should be
    confirmed against the deployed procedure.
    """
    cnt_stop = 10   # upper bound on queue entries with job_status 1
    rec_count = 0   # consecutive passes in which no row was processed
    curr_cnt = 0

    # Credentials are read before the guarded section: the original handler
    # could not run at this point anyway because it referenced cursors that
    # were not yet bound.
    str_username, str_password = _job_read_credentials()
    if str_username == '' or str_password == '':
        print(" Error: Username or Password is null \nExiting from the Script")
        sys.exit()

    # Pre-bind so the cleanup below is safe even if connect() itself fails.
    db = c = d = e = cnt = None
    try:
        db = zxJDBC.connect(_JOB_DB_URL, str_username, str_password, "oracle.jdbc.driver.OracleDriver")
        c = db.cursor()
        d = db.cursor()
        e = db.cursor()
        cnt = db.cursor()

        while True:
            rec_count += 1
            # The query is re-executed at the same points as in the original.
            c.execute(_JOB_PENDING_SQL)
            if c.fetchone() is None:
                break
            c.execute(_JOB_PENDING_SQL)
            # sql to find the running and waiting jobs
            if c.fetchone() is not None:
                e.execute(_JOB_PENDING_SQL)
                if e.fetchone() is None:
                    break
                c.execute(_JOB_PENDING_SQL)
                for cur in c.fetchall():
                    rec_count = 0
                    # Reset per row so a decision taken for a previous row
                    # cannot leak into this one (the original reset only
                    # obs_status, not job_sat).
                    job_sat = 0
                    cnt.execute(_JOB_RUNNING_SQL)
                    if cnt.fetchone() is None:
                        break
                    cnt.execute(_JOB_RUNNING_SQL)
                    for cnt_c in cnt.fetchall():
                        curr_cnt = cnt_c[0]
                    # the below code checks the status and dependency of a job to execute
                    if (cur[3] == 0.0 and curr_cnt < cnt_stop
                            and Date.getTime(cur[6]) < Date.getTime(cur[7])):
                        job_sat = _job_can_start(cur[2], cur[4], cur[5])
                    if job_sat == 1:
                        # update the job id 's  status as 1 which is to be executed next
                        obs_status = 1
                        upd = 'UPDATE NIC_EDW_STG.JOB_QUEUE set job_start_time=sysdate,job_status='+str(obs_status)+' where queue_id='+str(cur[0])
                        d.execute(upd)
                        db.commit()
                        # cur[0] may print as e.g. "12.0"; keep the part
                        # before the decimal point.
                        queue_key = str(cur[0]).split(".")[0]
                        path = startpath(queue_key)
                        thread.start_new_thread(ins, (path,))

            # exits from the loop when there is no job with the status 0,1,3 (WAITING, STARTED,FAILED)
            # NOTE(review): nesting of the two checks below is reconstructed.
            if rec_count > 0:
                d.execute(_JOB_LOWEST_SEQ_SQL)
                if d.fetchone() is None:
                    time.sleep(30)
                if rec_count > 40:
                    break
    except:
        # Bare except kept from the original.
        # NOTE(review): presumably needed so Java exceptions raised through
        # Jython are caught as well - confirm.
        _job_close_all(c, d, e, cnt, db)
        print('Attrib Error catched')
        s = check_exception()
        if s == '0':
            print('caught Attrib error - No impact')
        else:
            print(sys.exc_info()[0])
            raise
    else:
        _job_close_all(c, d, e, cnt, db)

# Load the Oracle JDBC driver class before any database connection is opened.
lang.Class.forName('oracle.jdbc.driver.OracleDriver')
# Run the scheduler loop.  job() contains no return statement, so this prints
# None once the loop has finished.
print job()
# Subquery block A retrieves the job_id, job_status and job_start_time of the scheduled jobs from the tables job_queue and job_dependency
# Subquery block B retrieves each parent job (job_parent_id) with its status and job_fatal flag from job_queue, job_dependency and job_master
# The select block joins subquery A and subquery B and takes the job_id, queue_id, job_status and the status of the parent job
# The overall select block returns one row per queue_id and job_id, based on the parent status and job_fatal

No comments:

Post a Comment