/* PROCESS_CONTROL-->Procedure-->PROCESS_CONTROL_PROC-->Job cntl */
import java.sql as sql
import java.lang as lang
import java.lang.Integer
from com.ziclix.python.sql import zxJDBC
import thread
import synchronize
import os
import sys
import time
import java.util.Date as Date
conn_name='PROC_CNTL'
exe_path=os.environ['ODI_SCRIPT']
# function check_exception : This function is called incase of Jython Exception . Upon exception, this checks job_queue table if all the jobs are completed in job_queue,
# then exits the program successfully by returning 0, else return 1 noting failure
def check_exception():
myCon = sql.DriverManager.getConnection('jdbc:oracle:thin:@(DESCRIPTION = (ADDRESS = (PROTOCOL = TCP)(HOST = nic-tuatou1a-dc-vip)(PORT = 1540))(ADDRESS = (PROTOCOL = TCP)(HOST = nic-tuatou1b-dc-vip)(PORT = 1540)) (LOAD_BALANCE=ON)(CONNECT_DATA = (SERVER = DEDICATED) (SERVICE_NAME = DWBIUAT)))',"NIC_EDW_STG","<@=snpRef.getInfo("SRC_PASS") @>")
myStmt = myCon.createStatement()
jobidstr="SELECT count(job_id) as cnt from nic_edw_stg.job_queue where job_status in (0,1,3)"
jobidRs = myStmt.executeQuery(jobidstr)
while(jobidRs.next()):
print jobidRs.getString("cnt")
job_cnt=jobidRs.getString("cnt")
InsStr = "INSERT INTO NIC_EDW_STG.JOB_EVENTS VALUES(-101,-11,-99,-11,SYSTIMESTAMP,'Process Control Exception','Jython code reached Exception check method',NULL,NULL,2)"
myStmt.executeUpdate(InsStr)
myCon.commit()
myCon.close()
return job_cnt
# functionstartpath : Executes the scenario with the given queueid as parameter
def startpath(queueid):
myCon = sql.DriverManager.getConnection('jdbc:oracle:thin:@(DESCRIPTION = (ADDRESS = (PROTOCOL = TCP)(HOST = nic-tuatou1a-dc-vip)(PORT = 1540))(ADDRESS = (PROTOCOL = TCP)(HOST = nic-tuatou1b-dc-vip)(PORT = 1540)) (LOAD_BALANCE=ON)(CONNECT_DATA = (SERVER = DEDICATED) (SERVICE_NAME = DWBIUAT)))',"NIC_EDW_STG","<@=snpRef.getInfo("SRC_PASS") @>")
myStmt = myCon.createStatement()
jobidstr="SELECT m.job_id job_id,q.queue_id,m.PKG_NAME,m.SCENARIO_VER FROM JOB_QUEUE q,JOB_MASTER m WHERE m.JOB_ID(+)=q.JOB_ID AND q.QUEUE_ID="+str(queueid)
jobidRs = myStmt.executeQuery(jobidstr)
while(jobidRs.next()):
path="./startscen.sh "+jobidRs.getString("PKG_NAME")+" "+jobidRs.getString("SCENARIO_VER")+" DEV -v=5 -NAME=ROOT NIC_APPLICATION.SCH_QUEUE_ID="+str(jobidRs.getString("queue_id"))+" NIC_APPLICATION.SCH_JOB_ID="+str(jobidRs.getString("job_id"))
myCon.close()
return pathdef ins(path):
list=os.system (path)
# function job selects the job to run based on the dependency and status of the job
def job():
try:
job_sat=0
obs_status=0
rec_count=0
curr_cnt=0
cnt_stop=10
str_username=''
str_password=''
# Decryption code to get the credentials of the schema using the encrypted file
try:
str_cmd_dcr="openssl enc -d -aes-256-cbc -in "+exe_path+"/nic_etl_password.enc -pass file:"+exe_path+"/.nic_edw_key -out "+exe_path+"/nic_etl_password.dcy"
os.system (str_cmd_dcr)
dcy_file=exe_path+"/nic_etl_password.dcy"
fl_dcr_pwd = open (dcy_file,'r')
for str_main in fl_dcr_pwd.readlines():
str_list = str_main.split('\t')
if str_list[0] == conn_name:
str_username=str_list[1]str_password=str_list[2]
except IOError:
print "Error: nic_etl_password.dcy file not found to obtain the connection details"
except:
raise
if str_username == '' or str_password == '' :
print " Error: Username or Password is null \nExiting from the Script"
sys.exit()
db = zxJDBC.connect("jdbc:oracle:thin:@(DESCRIPTION = (ADDRESS = (PROTOCOL = TCP)(HOST = nic-tuatou1a-dc-vip)(PORT = 1540))(ADDRESS = (PROTOCOL = TCP)(HOST = nic-tuatou1b-dc-vip)(PORT = 1540)) (LOAD_BALANCE=ON)(CONNECT_DATA = (SERVER = DEDICATED) (SERVICE_NAME = DWBIUAT)))", str_username,str_password, "oracle.jdbc.driver.OracleDriver")
c = db.cursor()
d = db.cursor()
e = db.cursor()
cnt= db.cursor()
ext_var=1
while(ext_var==1):
rec_count+=1
c.execute("SELECT queue_id , job_id , MAX (job_parent_id) AS job_parent_id, MAX (job_status) AS job_status , CASE WHEN MAX (status1) = 3 THEN MAX (status1) ELSE MIN (status1) END AS status , MAX (job_fatal) AS job_fatal , MAX( JOB_START_TIME) AS JOB_START_TIME , TO_TIMESTAMP(TO_CHAR(SYSDATE,'dd/mm/yyyy') ||TO_CHAR(SYSDATE,' HH24:MI:ss'),'dd/mm/yyyy HH24:MI:ss') sdate FROM (SELECT a.queue_id, a.job_id , b.job_parent_id , a.job_status , b.status , b.job_fatal , a.job_start_time, CASE WHEN b.job_fatal = 1 THEN b.status WHEN b.job_fatal = 0 AND b.status <= 2 THEN b.status WHEN b.job_fatal = 0 AND b.status = 3 THEN 2 ELSE NULL END AS status1 FROM (SELECT a.queue_id, a.job_status , a.seq_id , a.job_id , b.job_parent_id, a.job_start_time FROM NIC_EDW_STG.job_queue a, NIC_EDW_STG.job_dependency b WHERE a.job_id = b.job_id(+) AND a.job_status IN (0,1,3) AND a.seq_id = (SELECT MIN (seq_id) FROM NIC_EDW_STG.job_queue WHERE job_status IN (0,1,3) ) ) a , (SELECTDISTINCT b.job_parent_id, a.queue_id , a.seq_id , d.job_id , a.job_status status , d.job_fatal FROM NIC_EDW_STG.job_queue a, NIC_EDW_STG.job_dependency b , NIC_EDW_STG.job_master d WHERE a.job_id = b.job_parent_id AND a.job_id = d.job_id and a.seq_id = (SELECT MIN (seq_id) FROM NIC_EDW_STG.job_queue WHERE job_status IN (0,1,3)) ) b WHERE a.job_parent_id = b.job_parent_id(+) AND a.seq_id = b.seq_id(+)) GROUP BY queue_id, job_id ORDER BY queue_id, job_id")
row_ex=c.fetchone()if row_ex==None:
break
c.execute("SELECT queue_id , job_id , MAX (job_parent_id) AS job_parent_id, MAX (job_status) AS job_status , CASE WHEN MAX (status1) = 3 THEN MAX (status1) ELSE MIN (status1) END AS status , MAX (job_fatal) AS job_fatal , MAX( JOB_START_TIME) AS JOB_START_TIME , TO_TIMESTAMP(TO_CHAR(SYSDATE,'dd/mm/yyyy') ||TO_CHAR(SYSDATE,' HH24:MI:ss'),'dd/mm/yyyy HH24:MI:ss') sdate FROM (SELECT a.queue_id, a.job_id , b.job_parent_id , a.job_status , b.status , b.job_fatal , a.job_start_time, CASE WHEN b.job_fatal = 1 THEN b.status WHEN b.job_fatal = 0 AND b.status <= 2 THEN b.status WHEN b.job_fatal = 0 AND b.status = 3 THEN 2 ELSE NULL END AS status1 FROM (SELECT a.queue_id, a.job_status , a.seq_id , a.job_id , b.job_parent_id , a.job_start_time FROM NIC_EDW_STG.job_queue a, NIC_EDW_STG.job_dependency b WHERE a.job_id = b.job_id(+) AND a.job_status IN (0,1,3) AND a.seq_id = (SELECT MIN (seq_id) FROM NIC_EDW_STG.job_queue WHERE job_status IN(0,1,3) ) ) a , (SELECT DISTINCT b.job_parent_id, a.queue_id , a.seq_id , d.job_id , a.job_status status , d.job_fatal FROM NIC_EDW_STG.job_queue a, NIC_EDW_STG.job_dependency b , NIC_EDW_STG.job_master d WHERE a.job_id = b.job_parent_id AND a.job_id = d.job_id and a.seq_id = (SELECT MIN (seq_id) FROM NIC_EDW_STG.job_queue WHERE job_status IN (0,1,3)) ) b WHERE a.job_parent_id = b.job_parent_id(+) AND a.seq_id = b.seq_id(+)) GROUP BY queue_id, job_id ORDER BY queue_id, job_id")
#sql to find the running and waiting jobs
if c.fetchone()!=None:
e.execute("SELECT queue_id , job_id , MAX (job_parent_id) AS job_parent_id, MAX (job_status) AS job_status , CASE WHEN MAX (status1) = 3 THEN MAX (status1) ELSE MIN (status1)END AS status , MAX (job_fatal) AS job_fatal , MAX( JOB_START_TIME) AS JOB_START_TIME , TO_TIMESTAMP(TO_CHAR(SYSDATE,'dd/mm/yyyy') ||TO_CHAR(SYSDATE,' HH24:MI:ss'),'dd/mm/yyyy HH24:MI:ss') sdate FROM (SELECT a.queue_id, a.job_id , b.job_parent_id , a.job_status , b.status , b.job_fatal , a.job_start_time, CASE WHEN b.job_fatal = 1 THEN b.status WHEN b.job_fatal = 0 AND b.status <= 2 THEN b.status WHEN b.job_fatal = 0 AND b.status = 3 THEN 2 ELSE NULL END AS status1 FROM (SELECTa.queue_id, a.job_status , a.seq_id , a.job_id , b.job_parent_id , a.job_start_time FROM NIC_EDW_STG.job_queue a, NIC_EDW_STG.job_dependency b WHERE a.job_id = b.job_id(+) AND a.job_status IN (0,1,3) AND a.seq_id = (SELECT MIN (seq_id) FROMNIC_EDW_STG.job_queue WHERE job_status IN (0,1,3) ) ) a , (SELECT DISTINCT b.job_parent_id, a.queue_id , a.seq_id , d.job_id , a.job_status status , d.job_fatal FROM NIC_EDW_STG.job_queue a, NIC_EDW_STG.job_dependency b , NIC_EDW_STG.job_master d WHERE a.job_id = b.job_parent_id AND a.job_id = d.job_id and a.seq_id = (SELECT MIN (seq_id) FROM NIC_EDW_STG.job_queue WHERE job_status IN (0,1,3)) ) b WHERE a.job_parent_id = b.job_parent_id(+) AND a.seq_id = b.seq_id(+)) GROUP BY queue_id, job_id ORDER BY queue_id, job_id")
row=e.fetchone()
if row==None:
break
c.execute("SELECT queue_id , job_id , MAX (job_parent_id) AS job_parent_id, MAX (job_status) AS job_status , CASE WHEN MAX (status1) = 3 THEN MAX (status1) ELSE MIN (status1) END AS status , MAX (job_fatal) AS job_fatal , MAX( JOB_START_TIME) AS JOB_START_TIME , TO_TIMESTAMP(TO_CHAR(SYSDATE,'dd/mm/yyyy') ||TO_CHAR(SYSDATE,' HH24:MI:ss'),'dd/mm/yyyy HH24:MI:ss') sdate FROM (SELECT a.queue_id, a.job_id , b.job_parent_id , a.job_status , b.status , b.job_fatal , a.job_start_time, CASE WHEN b.job_fatal = 1 THEN b.status WHEN b.job_fatal = 0 AND b.status <= 2 THEN b.status WHEN b.job_fatal = 0 AND b.status = 3 THEN 2 ELSE NULLEND AS status1 FROM (SELECT a.queue_id, a.job_status , a.seq_id , a.job_id , b.job_parent_id , a.job_start_time FROM NIC_EDW_STG.job_queue a, NIC_EDW_STG.job_dependency b WHERE a.job_id = b.job_id(+) AND a.job_status IN (0,1,3) AND a.seq_id= (SELECT MIN (seq_id) FROM NIC_EDW_STG.job_queue WHERE job_status IN (0,1,3) ) ) a , (SELECT DISTINCT b.job_parent_id, a.queue_id , a.seq_id , d.job_id , a.job_status status , d.job_fatal FROM NIC_EDW_STG.job_queue a, NIC_EDW_STG.job_dependency b , NIC_EDW_STG.job_master d WHERE a.job_id = b.job_parent_id AND a.job_id = d.job_id and a.seq_id = (SELECT MIN (seq_id) FROM NIC_EDW_STG.job_queue WHERE job_status IN (0,1,3)) ) b WHERE a.job_parent_id = b.job_parent_id(+) AND a.seq_id= b.seq_id(+)) GROUP BY queue_id, job_id ORDER BY queue_id, job_id")
for cur in c.fetchall():
rec_count=0
obs_status=0
cnt.execute("SELECT count(*) as cnt from nic_edw_stg.job_queue where job_status=1")
rowcnt=cnt.fetchone() # the cursor cnt is used to exit from the Loop when there is no job with the status 1
if rowcnt==None:
break
cnt.execute("SELECT count(*) as cnt from nic_edw_stg.job_queue where job_status=1")
for cnt_c in cnt.fetchall():curr_cnt=cnt_c[0]
# the below code checks the status and dependency of a job to execute
if cur[3]==0.0:
if curr_cnt < cnt_stop:
if Date.getTime(cur[6]) < Date.getTime(cur[7]):
if cur[2]==None:
job_sat=1obs_status=1
else:
if(cur[5]==1.0 or cur[5]==0.0) and (cur[4]==1.0 or cur[4]==0.0):
job_sat=0
elif cur[5]==1.0 and cur[4]==2.0:
job_sat=1
obs_status=1
elif cur[5]==1.0 and (cur[4]==3.0 or cur[4]==4.0):
job_sat=0
obs_status=0
elif cur[5]==0.0 and cur[4]==2.0:
job_sat=1
obs_status=1
elif cur[5] == 0.0 and (cur[4]==2.0 or cur[4]==3.0 or cur[4]==4.0):
job_sat=1
obs_status=1
if job_sat==1: # update the job id 's status as 1 which is to be executed next
upd='UPDATE NIC_EDW_STG.JOB_QUEUE set job_start_time=sysdate,job_status='+str(obs_status)+' where queue_id='+str(cur[0])d.execute(upd)
db.commit()
b=str(cur[0])
a=b.split(".")
path=startpath(a[0])
thread.start_new_thread(ins,(path,))
if rec_count>0: # exits from the loop when there is no job with the status 0,1,3 (WAITING, STARTED,FAILED)
d.execute("SELECT * FROM NIC_EDW_STG.JOB_QUEUE a WHERE a.job_status IN (0,1,3) AND a.seq_id = (SELECT MIN (seq_id) FROM NIC_EDW_STG.JOB_QUEUE WHERE job_status IN (0,1,3))")
if d.fetchone()==None:
time.sleep(30)
if rec_count>40:
break
except:
if c:
c.close()
if d:
d.close()
if e:
e.close()
if cnt:
cnt.close()
if db:
db.close()
print 'Attrib Error catched'
s=check_exception() # funtion 'check_exception' tohandle the Attribexception
if s=='0':
print 'caught Attrib error - No impact'
else:
print sys.exc_info()[0] # raise the Exception
raise
else:
if c:
c.close()
if d:
d.close()
if e:
e.close()
if cnt:
cnt.close()
if db:
db.close()
lang.Class.forName('oracle.jdbc.driver.OracleDriver')
print job()
# subqry block A retrives the jobs_id,job_status,job_fatal of jobs that are scheduled using the tables job_queue, job_dependency,job_master
# subqry blocl B retrives the jobs and its job_parent_id with the status
# select block joins subqry A and subqry B takes the job_id and queue_id ,job_status,job_status of parent_job
# over all Select Block gets the queue_id and job_id based on the Status of parent_status and Job_fatel
No comments:
Post a Comment