Sunday, June 30, 2019

doc


1.       INTRODUCTION

This document describes the historical loading process for payment files. The historical payments load process is automated using shell and Python scripts.


2.       PROCESS FLOW
In general, loading payments is a two-step process:
1.       Standardization
2.       SAS Loading
     Standardization :
Standardization converts payment files into Parquet files and applies the pre-check evaluation that is required before any payment file can be processed. The pre-checks are carried out by Unix scripts, and the Parquet files are created by Spark code.
   SAS Loading:
SAS Load loads the Parquet files into SAS_ALL_PAYMENT using Spark code.
The automated payment loading follows the same two steps described above.
                                                                                                                                                                           
3.       SCRIPT DETAILS

Sunday, June 16, 2019

Py count thread

import threading
import time


class mythread(threading.Thread):
    """Worker thread that waits one second and then prints the value it was given."""

    def __init__(self, i):
        super(mythread, self).__init__()
        # Payload printed by run(); the launcher script passes one line of the input file.
        self.h = i

    def run(self):
        """Thread entry point, invoked by start()."""
        delay_seconds = 1
        time.sleep(delay_seconds)
        print("Value send ", self.h)


# Read the work items, one per line. A raw string keeps the backslashes of the
# Windows path literal, and the `with` block closes the file handle.
with open(r'C:\Senthil\SenStudy\Python\Date.txt') as date_file:
    f = date_file.readlines()

num = threading.activeCount()
x = 2
for i in f:
    # Refresh the count on every iteration so the message reflects the current state.
    num = threading.activeCount()
    print("Active threads are ", num)
    time.sleep(1)
    if x <= 3:
        # Start one worker per input line while the launch budget allows it.
        thread1 = mythread(i)
        thread1.start()
        x += 1
    else:
        # NOTE(review): x is never decremented, so once two workers have been
        # started every remaining line lands here; the message also mentions
        # "5" threads and "1 mint" while the code allows 2 launches and sleeps
        # 1 second. Confirm the intended throttling rule before changing it.
        print("Number of Threads are More than 5 .. going to sleep state for 1 mint ..." , threading.activeCount())
        time.sleep(1)

Monday, May 6, 2019

sparkread

// Reads a file whose fields are separated by the two-character delimiter "!~":
// every line is re-joined with tabs and the result is parsed by the CSV reader.
// The tab delimiter option must be active, otherwise the reader falls back to
// the default comma and each line ends up in a single column.
// Schema inference stays disabled on purpose (.option("inferSchema", "true"));
// the explicit `schema` is applied instead.
var df2 = spark.sqlContext.read
  .option("header", "true")
  .option("delimiter", "\t").option("encode","UTF-8").option("charset", "UTF-8")
  .schema(schema = schema)
  .csv(spark.sqlContext.read.textFile("C:\\Senthil\\SenStudy\\Scala\\Files\\multidelimiter_out")
    .map(line => line.split("\\!\\~").mkString("\t")))

Sunday, April 28, 2019

Multi delmiter spark

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

/** Converts a text file whose fields are separated by "!~" into a Parquet
  * dataset partitioned by the `input` column.
  *
  * The header is taken from the first line containing the text "input"; every
  * line containing "input" is excluded from the data.
  * NOTE(review): a data row that happens to contain "input" is dropped as well.
  */
object PaymentFile extends  App {

  implicit val spark: SparkSession = SparkSession.builder().appName("PaymentFile")
    .config("spark.master", "local")
    .getOrCreate()

  val rdd = spark.sparkContext.textFile("C:\\Senthil\\SenStudy\\Scala\\Files\\multidelimiter.txt")

  // split(regex, -1) keeps trailing empty fields. The one-argument split drops
  // them, which yields rows shorter than the schema when the last columns are empty.
  val header = rdd.filter(_.contains("input")).map(line => line.split("\\!\\~", -1)).first()

  // Every column is read as a string; column names come from the header line.
  val schema = StructType(header.map(cols => StructField(cols, StringType)).toSeq)

  val data = spark.createDataFrame(
    rdd.filter(!_.contains("input"))
      .map(line => Row.fromSeq(line.split("\\!\\~", -1).toSeq)),
    schema)

  data.write.partitionBy("input").mode("overwrite").parquet("C:\\Senthil\\SenStudy\\Scala\\Files\\multidelimiter")

}

Sunday, April 14, 2019

metadata manger

package Project
import java.text.SimpleDateFormat
import java.util.Calendar

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import Project.config.Settings._
/** Wraps a job body with start-of-run bookkeeping. */
object MetadataManager {

  implicit val spark: SparkSession = SparkSession.builder().appName("Job Master")
    .config("spark.master", "local")
    .getOrCreate()

  /** Prints a start message, shows the batch entry for the job and then runs `callback`.
    *
    * Exceptions thrown by `callback` are reported on stdout and swallowed.
    *
    * @param keyPrefix job name, e.g. "JobMaster"
    * @param params    values for COUNTRY_NM, PLUG_NM and ETL_NO, in that order
    * @param callback  the job body
    */
  def reportStatus(keyPrefix: String, params: List[String])(callback: => Unit)(implicit spark: SparkSession): Unit = {

    println("Started " + keyPrefix)
    insBatchEntry(keyPrefix, params)

    try {
      callback
    } catch {
      // Specific cases first: a leading `case e: Exception` would make them unreachable.
      case _: IllegalArgumentException => println("illegal arg. exception")
      case _: IllegalStateException    => println("illegal state exception")
      case e: Exception                => println("exception caught: " + e)
    }
    //Seq[String] = FileSystem.get(spark.sparkContext.hadoopConfiguration).listStatus(new Path(dir))
  }

  /** Builds the "IN PROGRESS" batch row(s) for a job and prints the first two.
    *
    * The input key is joined to the active rows of the job-master target file
    * on COUNTRY_NM and PLUG_NM.
    *
    * @param keyPrefix job name; "JobMaster" gets a source row count, others get 0
    * @param key       values for COUNTRY_NM, PLUG_NM and ETL_NO, in that order
    */
  def insBatchEntry(keyPrefix: String, key: List[String]): Unit = {
    import java.sql.Timestamp
    import org.apache.spark.sql.functions.{concat, lit, udf}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val row = Row.fromSeq(key)
    val rdd = spark.sparkContext.makeRDD(List(row))

    val fields = List(
      StructField("COUNTRY_NM", StringType, nullable = false),
      StructField("PLUG_NM", StringType, nullable = false),
      StructField("ETL_NO", StringType, nullable = false)
    )
    val df_inp = spark.createDataFrame(rdd, StructType(fields))
    df_inp.show(2)

    val df_jb = spark.read.format("csv").option("header", "true").load(s"$jobmaster_tar")

    val df_join_jb = df_jb.join(
      df_inp,
      df_jb("COUNTRY_NM") === df_inp("COUNTRY_NM") &&
        df_jb("PLUG_NM") === df_inp("PLUG_NM") &&
        df_jb("IS_ACTIVE") === "Y",
      "inner")

    // Current time truncated to whole seconds.
    def UDFgetCurrentdateTimeStamp(): Timestamp = {
      val today: java.util.Date = Calendar.getInstance.getTime
      val timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      Timestamp.valueOf(timeFormat.format(today))
    }

    // Counts the rows of a headered CSV file; the file is read once.
    def prepareRowCountfromParquet(jobmaster_pa: String)(implicit spark: SparkSession): Int = {
      print("The variable value is " + jobmaster_pa)
      val rowCount = spark.read.format("csv").option("header", "true").load(jobmaster_pa).count().toInt
      print("the count is " + rowCount)
      rowCount
    }

    // Source row count for a job name; only "JobMaster" has a source file here.
    def UDFjobSourceStats(jobname: String)(implicit spark: SparkSession): Int =
      jobname match {
        case "JobMaster" => prepareRowCountfromParquet(jobmaster)
        case _           => 0
      }

    // Kept so SQL text can still call SRCROWCNT(path).
    spark.udf.register("SRCROWCNT", prepareRowCountfromParquet _)
    val UDFCURDT = udf(UDFgetCurrentdateTimeStamp _)

    // Computed on the driver and attached as a literal: a UDF that issues its own
    // Spark read only works if the optimizer happens to fold it on the driver.
    val srcCount: Int = UDFjobSourceStats(keyPrefix)

    df_join_jb
      .drop(df_inp("PLUG_NM"))
      .drop(df_inp("COUNTRY_NM"))
      .select(df_jb("SITE_NM"), df_jb("COUNTRY_NM"), df_jb("SITE_DESC"), df_jb("PLUG_TYPE"),
        df_jb("PLUG_NM"), df_jb("SOURCE_NM"), df_jb("FILE_NM"))
      .withColumn("DAT_BEG", UDFCURDT())
      .withColumn("DAT_END", lit(null))
      .withColumn("TXT_JOB", concat(df_jb("SITE_NM"), lit("_"), df_jb("COUNTRY_NM"), lit("_"), df_jb("PLUG_NM")))
      .withColumn("STATUS", lit("IN PROGRESS"))
      .withColumn("SRC_COUNT", lit(srcCount))
      .show(2)

  }

}

Saturday, April 13, 2019

new jobmaster

package Project

import java.text.SimpleDateFormat
import java.util.Calendar

import org.apache.spark.sql.{DataFrame, SparkSession}
import Project.config.Settings._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DataTypes

/** Change-data-capture comparison of the job-master source file against the
  * job-master target file. New rows receive a MASTER_ID and a GROUP_ID, changed
  * rows receive an update timestamp; both result sets are printed.
  */
object JobMaster extends  App {

  implicit val spark: SparkSession = SparkSession.builder().appName("Job Master")
    .config("spark.master", "local")
    .getOrCreate()

  val key_list = List("MASTER_ID", "GROUP_ID", "SITE_NM", "COUNTRY_NM")
  val key = args.toList
  run()

  /** Runs the comparison inside MetadataManager.reportStatus. */
  def run(): Unit = MetadataManager.reportStatus("JobMaster", key) {

    spark.read.format("csv").option("header", "true").load(s"$jobmaster").createOrReplaceGlobalTempView("JobMaster_Source")
    spark.read.format("csv").option("header", "true").load(s"$jobmaster_tar").createOrReplaceGlobalTempView("JobMaster_Target")

    // CDC_FLG: 'N' = unchanged, 'I' = not in target (insert), 'U' = changed (update).
    // The query must stay multi-line: stripMargin only removes a '|' that starts a line.
    val df_cdc =
      spark.sql(
        """
          |select
          |JobMaster_Target.MASTER_ID as MASTER_ID,
          |case when JobMaster_Target.GROUP_ID is null then 0 else cast(JobMaster_Target.GROUP_ID as INT) end as GROUP_ID,
          |trim(JobMaster_Source.SITE_NM) as SITE_NM,
          |trim(JobMaster_Target.SITE_NM) as TAR_SITE_NM,
          |trim(JobMaster_Source.COUNTRY_NM) as COUNTRY_NM,
          |trim(JobMaster_Source.SITE_DESC) as SITE_DESC,
          |trim(JobMaster_Source.PLUG_TYPE) as PLUG_TYPE,
          |trim(JobMaster_Source.PLUG_NM) as PLUG_NM,
          |trim(JobMaster_Source.SOURCE_NM) as SOURCE_NM,
          |trim(JobMaster_Source.IS_ACTIVE) as IS_ACTIVE,
          |trim(JobMaster_Source.FILE_NM) as FILE_NM,
          |trim(JobMaster_Target.CRT_DATETIME) as CRT_DATETIME,
          |case when JobMaster_Target.MASTER_ID is not Null and (JobMaster_Source.SOURCE_NM =JobMaster_Target.SOURCE_NM and
          |JobMaster_Source.IS_ACTIVE = JobMaster_Target.IS_ACTIVE and
          |JobMaster_Source.FILE_NM =JobMaster_Target.FILE_NM ) then 'N'
          | when JobMaster_Target.MASTER_ID is Null then 'I' else 'U' end as CDC_FLG
          |from
          |global_temp.JobMaster_Source left join global_temp.JobMaster_Target on
          |trim(JobMaster_Source.SITE_NM)=trim(JobMaster_Target.SITE_NM) and
          |trim(JobMaster_Source.COUNTRY_NM)=trim(JobMaster_Target.COUNTRY_NM) and
          |trim(JobMaster_Source.PLUG_TYPE)=trim(JobMaster_Target.PLUG_TYPE) and
          |trim(JobMaster_Source.PLUG_NM)=trim(JobMaster_Target.PLUG_NM)
        """.stripMargin) //.createOrReplaceGlobalTempView("jb_cdc_data")
    df_cdc.createOrReplaceGlobalTempView("JmCdc")

    // Highest known GROUP_ID per site; 0 means the site has no group yet.
    val df_grp_jm = df_cdc
      .select(df_cdc("SITE_NM"), df_cdc("GROUP_ID"))
      .distinct
      .groupBy("SITE_NM")
      .max("GROUP_ID")
    df_grp_jm.withColumnRenamed("max(GROUP_ID)", "GROUP_ID").createOrReplaceGlobalTempView("find_grp")

    spark.
      read.format("csv")
      .option("header", "true")
      .load(s"$jobmaster_tar")
      .select("GROUP_ID")
      .withColumnRenamed("GROUP_ID", "MAX_GROUP_ID").createOrReplaceGlobalTempView("max_grp")

    // Sites without a group get max(existing GROUP_ID) + running number. The CSV
    // column is a string, so it is cast before max(): a string max is
    // lexicographic ("9" > "10"). coalesce covers an empty target file.
    val distinctValuesDF = spark.sql(
      """
        |select
        |trim(find_grp.SITE_NM) as SITE_NM,
        |cast(row_number() over ( order by find_grp.SITE_NM)  + mx.MAX_GROUP_ID as INT) as GROUP_ID
        |from
        |global_temp.find_grp cross join (select coalesce(max(cast(MAX_GROUP_ID as INT)), 0) as MAX_GROUP_ID from global_temp.max_grp) mx where find_grp.GROUP_ID = 0
        |union all
        |select trim(find_grp.SITE_NM) as SITE_NM, find_grp.GROUP_ID from global_temp.find_grp where find_grp.GROUP_ID  <> 0
      """.stripMargin)

    // Numeric maximum of the existing MASTER_IDs (0 when there are none).
    val maxValue = df_cdc.agg(
      coalesce(max(col("MASTER_ID").cast(DataTypes.IntegerType)), lit(0)).as("MAX_MASTER_ID"))
    val new_rec = df_cdc.filter(col("MASTER_ID").isNull).drop("MASTER_ID")

    import org.apache.spark.sql.expressions.Window

    // One global, deterministically ordered window so every new row gets a
    // distinct MASTER_ID regardless of its GROUP_ID.
    val newIdWindow = Window.orderBy("SITE_NM", "COUNTRY_NM", "PLUG_TYPE", "PLUG_NM")

    val new_with_master = new_rec
      .crossJoin(maxValue)
      .withColumn("MASTER_ID", (col("MAX_MASTER_ID") + row_number().over(newIdWindow)).cast(DataTypes.IntegerType))
      .drop("GROUP_ID")
      .drop("MAX_MASTER_ID")
      .drop("TAR_SITE_NM")

    import java.sql.Timestamp
    // Current time truncated to whole seconds.
    def UDFgetCurrentdateTimeStamp(): Timestamp = {
      val today: java.util.Date = Calendar.getInstance.getTime
      val timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      Timestamp.valueOf(timeFormat.format(today))
    }
    import org.apache.spark.sql.functions.udf

    val UDFCURDT = udf(UDFgetCurrentdateTimeStamp _)

    // Rows that exist in the target and differ from the source.
    val final_upd = df_cdc
      .drop("TAR_SITE_NM")
      .filter(col("MASTER_ID").isNotNull)
      .filter(col("CDC_FLG") === "U")
      .withColumn("UPD_DATETIME", UDFCURDT())
      .drop("CDC_FLG")

    final_upd.show(100)

    // Rows that are new in the source, with their MASTER_ID and site GROUP_ID.
    val final_new = new_with_master.join(
      distinctValuesDF
      , new_with_master("SITE_NM") === distinctValuesDF("SITE_NM")
      , "inner")
      .drop(distinctValuesDF("SITE_NM"))
      .drop("CDC_FLG")
      .withColumn("CRT_DATETIME", UDFCURDT())
      .withColumn("UPD_DATETIME", UDFCURDT())

    final_new.show(100)

  }
}

Sunday, April 7, 2019

IBAN

package Project

/** ISO 7064 mod-97 check for International Bank Account Numbers. */
object IBANAlgorithm extends App {

  print(apply("DE89370400440532013000"))

  /** Checks the mod-97 checksum of an IBAN.
    *
    * The first four characters are moved to the end, every letter is replaced
    * by its two-digit value (A = 10 ... Z = 35) and the resulting number must
    * leave remainder 1 when divided by 97.
    *
    * @param iban IBAN without separators, digits and upper-case letters only
    * @return true if the checksum matches; false for a wrong checksum and for
    *         malformed input (null, shorter than five characters, lower-case
    *         letters, blanks or any other character)
    */
  def apply(iban: String): Boolean = {

    def isIbanChar(c: Char): Boolean = (c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z')

    val wellFormed = iban != null && iban.length > 4 && iban.forall(isIbanChar)

    wellFormed && {
      val rearranged = iban.drop(4) + iban.take(4)

      val decimal = rearranged
        .map(c => if (c <= '9') (c - '0').toString else (c - 'A' + 10).toString)
        .mkString

      // BigInt is exact for any length. BigDecimal's default 34-digit context
      // makes % throw once the quotient needs more digits than that.
      BigInt(decimal) % 97 == 1
    }

  }

}

// ---------------------------------------------------------------------------
package Project

import scala.collection.immutable.SortedMap
import scala.math.BigInt
import scala.util.matching.Regex

/** The International Bank Account Number (IBAN) is an internationally agreed means of identifying
  *  bank accounts across national borders with a reduced risk of propagating transcription errors. The
  *  IBAN consists of up to 34 alphanumeric characters: first the two-letter ISO 3166-1 alpha-2 country
  *  code, then two check digits, and finally a country-specific Basic Bank Account Number (BBAN). The
  *  check digits enable a sanity check of the bank account number to confirm its integrity even before
  *  submitting a transaction.
  *
  *  @param iban the IBAN text to test; [[Iban.apply]] removes whitespace first
  */
class Iban(val iban: String) {

  // Isolated tests

  /** True when the text contains no lower-case letters. */
  def isAllUpperCase: Boolean = iban.toUpperCase() == iban

  /** True when some part of the text matches the general IBAN layout. */
  def isValidpattern: Boolean = (Iban.pattern findFirstIn iban).nonEmpty

  /** True when the length equals the length registered for the country code. */
  def isNationalSize: Boolean = Iban.ccVsLength.getOrElse(iban.take(2), 0) == iban.size

  /** True when the mod-97 checksum is 1; false for text that is too short or
    * contains anything other than digits and upper-case letters.
    */
  def isCheckNumberOK: Boolean = {
    def isIbanChar(ch: Char) = (ch >= '0' && ch <= '9') || (ch >= 'A' && ch <= 'Z')

    def rearrange = (iban.drop(4) + iban.take(4)). // Move left country code part to end
      // continue with each char converted to Int
      map(ch => if (ch.isDigit) ch.toInt - '0' else (ch - 'A' + 10)).mkString

    iban.length > 4 && iban.forall(isIbanChar) && (BigInt(rearrange) mod 97) == 1
  }

  /** All isolated tests combined. */
  def isValidIban: Boolean = {
    isAllUpperCase &&
      isValidpattern &&
      isNationalSize &&
      isCheckNumberOK
  }
}

object Iban {
  // IBAN length database: each token is a country code followed by the IBAN length.
  lazy val ccVsLength: SortedMap[String, Int] = SortedMap(
    """AD24 AE23 AL28 AO25 AT20 AZ28 BA20 BE16 BF27 BG22 BH22 BI16
      |BJ28 BR29 CG27 CH21 CI28 CM27 CR21 CV25 CY28 CZ24 DE22 DK18
      |DO28 DZ24 EE20 EG27 ES24 FI18 FO18 FR27 GA27 GB22 GE22 GI23
      |GL18 GR27 GT28 HR21 HU28 IE22 IL23 IR26 IS26 IT27 JO30 KW30
      |KZ20 LB28 LI21 LT20 LU20 LV21 MC27 MD24 ME22 MG27 MK19 ML28
      |MR27 MT31 MU30 MZ25 NL18 NO15 PK24 PL28 PS29 PT25 QA29 RO24
      |RS22 SA24 SE24 SI19 SK24 SM27 SN28 TN24 TR26 UA29 VG24""".
      stripMargin.split("""\s+""").
      filter(_.nonEmpty).
      map(v => (v.take(2), v.slice(2, 4).toInt)): _*)

  lazy val pattern = "([A-Z]{2})([0-9]{2})([A-Z0-9]{4})([A-Z0-9]{0,2})([0-9]{7})(([A-Z0-9]?){0,16})".r

  /** Creates an [[Iban]] from printed text, removing all whitespace. */
  def apply(s: String) = new Iban(s.replaceAll("""\s""", ""))
}

Sunday, February 24, 2019

package hermes

import hermes.config.Settings._
import scala.util.{Failure,Success,Try}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

/** Reads the log CSV found at `log_path`, separates well-formed rows from
  * corrupt ones and hands the well-formed rows to the post-run step.
  */
object Logger extends App {
  implicit val spark: SparkSession = SparkSession.builder().appName("Good Program")
    .config("spark.master", "local")
    .getOrCreate()

  // `bad_record` receives the raw text of rows that could not be parsed, so it
  // has to be a nullable string column; it is null for well-formed rows.
  val schema = StructType(
    Array(
      StructField("File_Date", StringType, false),
      StructField("Crt_Time", StringType, false),
      StructField("Source_Name", StringType, false),
      StructField("Site_Name", StringType, false),
      StructField("File_Name", StringType, false),
      StructField("FileRec_cnt", IntegerType, false),
      StructField("File_Path", StringType, false),
      StructField("bad_record", StringType, true)
    )
  )
  run(createDFfromCSV())

  /** Runs the emptiness check on `dataFrame` inside the insert wrapper. */
  def run(dataFrame: DataFrame): Unit = InsertMapRDB(dataFrame) {
    isDatasetEmpty(dataFrame)
    ()
  }

  /** True when `dataFrame` has no rows.
    *
    * Read or parse failures are not reported as "empty"; they propagate to the caller.
    */
  def isDatasetEmpty(dataFrame: DataFrame): Boolean =
    dataFrame.head(1).isEmpty

  // Loads the comma-separated, header-less log file with the fixed schema.
  private def createDFfromCSV(): DataFrame = {
    spark.read
      .format("CSV")
      .option("header", "false")
      .schema(schema).option("delimiter", ",")
      .option("columnNameOfCorruptRecord", "bad_record")
      .load(s"$log_path")
      .toDF()
  }

  /** Runs `callback` and then the post-run step with the rows whose
    * `bad_record` is null. A failure of `callback` is rethrown after the
    * post-run step has been executed.
    */
  def InsertMapRDB(dataFrame: DataFrame)(callback: => Unit)(implicit spark: SparkSession): Unit = {
    print("Insert to dB Started")
    val toIns = dataFrame.where(dataFrame.col("bad_record").isNull)

    Try(callback) match {
      case Success(_) => postRun(success = true, toIns)
      case Failure(e) =>
        postRun(success = false, toIns)
        throw e
    }
  }

  /** Post-run step: prints the rows that would be inserted. */
  def postRun(success: Boolean, dataFrame: DataFrame): Unit = {
    print("MapR DB started")
    dataFrame.show()
  }

}

Sunday, February 17, 2019

adoc

-- Lists queue entries whose job_status is 0, 1 or 3 together with their
-- dependency rows (inner query "a") and the queue status of each parent job
-- (inner query "b"). Rows are kept when the parent status, defaulted to 0 when
-- there is no parent row, is 0, 1 or 3.
-- The commented-out predicates would restrict both sides to the lowest seq_id
-- that still has a job in status 0, 1 or 3.
select a.*,b.*,isnull(b.status,0) as q_Statis from  (
SELECT
a.queue_id,
a.job_status ,
a.seq_id ,
a.job_id ,
b.job_parent_id,
a.job_start_time FROM Hermes.dbo.job_queue a left join  Hermes.dbo.job_dependency b
on
a.job_id  = b.job_id where
a.job_status IN (0,1,3) --AND
--a.seq_id  =  (SELECT MIN (seq_id) FROM Hermes.dbo.job_queue WHERE job_status IN (0,1,3)  )
) a
left join
(
  SELECT
  b.job_parent_id,  a.queue_id ,  a.seq_id  ,    a.job_status as status from
 Hermes.dbo.job_queue a join 
 Hermes.dbo.job_dependency b
 on a.job_id = b.job_parent_id --where   a.seq_id  = (SELECT MIN (seq_id) FROM Hermes.dbo.job_queue WHERE job_status IN (0,1,3))
)b

on  a.job_parent_id = b.job_parent_id

where isnull(b.status,0) in (0,1,3)

Process all

-- Job definitions: one row per job, with its package/scenario and schedule settings.
CREATE TABLE NIC_EDW_STG.JOB_MASTER
(
    JOB_ID            NUMBER,
    JOB_NAME          VARCHAR2(100),
    PKG_NAME          VARCHAR2(200),
    JOB_TYPE_ID       NUMBER,
    JOB_RUN_DAY       NUMBER(2,0),
    JOB_RUN_INTERVAL  NUMBER,
    JOB_RUN_TIME      TIMESTAMP (6),
    LAST_SCHED_TIME   TIMESTAMP (6),
    WAIT_TIME         TIMESTAMP (6),
    JOB_ACTIVE        NUMBER(2,0),
    JOB_FATAL         NUMBER(2,0),
    JOB_LOAD_TYPE     VARCHAR2(20),
    -- Quoted so the leading zeros are stored: an unquoted 001 is the number 1
    -- and would be saved as '1'.
    SCENARIO_VER      VARCHAR2(10) DEFAULT '001'
);

-- Queue of job runs: one row per queued job, with its start/end time and status.
CREATE TABLE NIC_EDW_STG.JOB_QUEUE
(
    QUEUE_ID        NUMBER,
    SEQ_ID          NUMBER,
    JOB_ID          NUMBER,
    JOB_START_TIME  TIMESTAMP (6),
    JOB_END_TIME    TIMESTAMP (6),
    JOB_STATUS      NUMBER(2,0)
)

-- Step-level event log: one row per logged step of a queued job.
CREATE TABLE NIC_EDW_STG.JOB_EVENTS
(
    EVENT_ID             NUMBER,
    SEQ_NO               NUMBER,
    QUEUE_ID             NUMBER,
    STEP_ID              NUMBER,
    LOG_TIME             TIMESTAMP (6),
    STEP_NAME            VARCHAR2(100),
    STEP_DESC            VARCHAR2(100),
    STEP_ERROR           VARCHAR2(250),
    TOTAL_AFFECTED_ROWS  NUMBER,
    STEP_STATUS          NUMBER(2,0)
)



-- Parent/child relation between jobs: JOB_ID depends on JOB_PARENT_ID.
CREATE TABLE NIC_EDW_STG.JOB_DEPENDENCY
(
    JOB_ID         NUMBER,
    JOB_PARENT_ID  NUMBER,
    PARENT_OLTP_F  CHAR(1)
)



/* ODI work repository table */



-- ODI work repository session table: one row per session.
CREATE TABLE DWPROD_ODIWORK_EXEC.SNP_SESSION
(
    SESS_NO            NUMBER(19,0)       NOT NULL ENABLE,
    SESS_NAME          VARCHAR2(436 CHAR) NOT NULL ENABLE,
    SCEN_VERSION       VARCHAR2(35 CHAR),
    LOG_LEVEL          NUMBER(4,0)        NOT NULL ENABLE,
    THREAD_ID          VARCHAR2(35 CHAR),
    THREAD_CREATION    VARCHAR2(35 CHAR),
    SESS_BEG           DATE,
    SESS_END           DATE,
    SESS_DUR           NUMBER(10,2),
    SESS_STATUS        VARCHAR2(2 CHAR)   NOT NULL ENABLE,
    SESS_RC            VARCHAR2(35 CHAR),
    SESS_MESS          VARCHAR2(250 CHAR),
    SESS_PARAMS        VARCHAR2(250 CHAR),
    SESS_KEYWORDS      VARCHAR2(250 CHAR),
    MASTER_AGENT_NAME  VARCHAR2(35 CHAR),
    SYNCHRO            VARCHAR2(1 CHAR),
    AGENT_NAME         VARCHAR2(35 CHAR),
    CONTEXT_CODE       VARCHAR2(35 CHAR)  NOT NULL ENABLE,
    PARENT_SESS_NO     NUMBER(19,0),
    NB_CHILD_RUN       NUMBER(10,0),
    NB_CHILD_OK        NUMBER(10,0),
    NB_CHILD_KO        NUMBER(10,0),
    SCEN_NAME          VARCHAR2(400 CHAR),
    USER_NAME          VARCHAR2(35 CHAR),
    I_TXT_SESS_MESS    NUMBER(19,0),
    I_TXT_SESS_PARAMS  NUMBER(19,0)
)
**************************************************************************

/* PROCESS_STATUS_PROC-->Procedure-->PROCESS_STATUS_PROC-->Update Status */
 DECLARE
-- NOTE(review): only the declaration section is present here; the BEGIN ... END
-- body that consumes these cursors is not part of this note.
-- SESS: status of the ODI session with the hard-coded number 96073565.
CURSOR SESS IS SELECT sess_status FROM odi_exc_repository.snp_session WHERE
SESS_NO=96073565 ;
-- STAT_CUR: one row per queue entry whose JOB_STATUS differs from the status of
-- its latest logged step, or whose JOB_STATUS is 0.
--   DIFF          : difference between SYSDATE and job_start_time, multiplied by
--                   1440 and rounded (presumably minutes - confirm)
--   STEP_STATUS   : STEP_STATUS of the latest job_events row for the queue entry, 0 if none
--   STEP_STATUS_C : the same status decoded as text (1 Running, 2 Succeeded, 3 Failed, else NR)
--   EVENT_D_C     : highest event_id logged for the queue entry
CURSOR STAT_CUR IS   SELECT QUEUE_ID,SEQ_ID,JOB_ID,JOB_START_TIME,DIFF,WAIT_TIME,JOB_STATUS,STEP_STATUS,STEP_STATUS_C,EVENT_D_C FROM (
 SELECT DISTINCT a.QUEUE_ID
   ,a.SEQ_ID
   ,a.JOB_ID
   ,a.JOB_START_TIME
   ,ROUND((TO_DATE(TO_CHAR(SYSDATE,'DD-MM-YYYY HH24:MI:SS'),'DD-MM-YYYY HH24:MI:SS')
   - TO_DATE(TO_CHAR(a.job_start_time,'DD-MM-YYYY HH24:MI:SS'),'DD-MM-YYYY HH24:MI:SS'))
   *1440) "DIFF"
   ,d.WAIT_TIME
   ,a.JOB_STATUS
   ,NVL(
   (SELECT STEP_STATUS FROM job_events WHERE (event_id,seq_no)=( SELECT MAX(event_id),MAX(seq_no)  FROM job_events WHERE event_id=
   (SELECT MAX(event_id) FROM job_events WHERE queue_id=a.QUEUE_ID))),0)  "STEP_STATUS"
    ,DECODE((SELECT NVL(STEP_STATUS,0) FROM job_events WHERE (event_id,seq_no)=( SELECT MAX(event_id),MAX(seq_no) FROM job_events WHERE event_id=
   (SELECT MAX(event_id) FROM job_events WHERE queue_id=a.QUEUE_ID))),1,'Running',2,'Succeeded',3,'Failed','NR') "STEP_STATUS_C"
  ,(SELECT MAX(event_id) FROM job_events WHERE queue_id=a.QUEUE_ID) "EVENT_D_C"
FROM    job_queue a
,job_dependency b
,job_events c
,job_master d
WHERE a.job_id=b.job_id(+) AND a.job_id=d.job_id AND a.queue_id=c.QUEUE_ID(+)  -- and a.job_status <=1
GROUP BY a.QUEUE_ID
   ,a.SEQ_ID
   ,a.JOB_ID
   ,a.JOB_START_TIME
       ,d.WAIT_TIME
   ,a.JOB_STATUS
ORDER BY queue_id
) WHERE JOB_STATUS <> STEP_STATUS OR job_status=0;
*********************************************************************


 
/* PROCESS_CONTROL-->Variable-->ETL_PROCESS_CNTL_START--> */ 
 -- Returns 999 when no session of user PROC_CNTL exists with a status other
 -- than 'KILLED'; otherwise returns a message listing the SID:SERIAL# pairs of
 -- those sessions.
 SELECT NVL2(WM_CONCAT(SID||':'||SERIAL#), 
'PREVIUOS RUN OF PROCESS_CONTROL IS RUNNING FOR SID:SERIAL# '||WM_CONCAT(SID||':'||SERIAL#),
999) STATUS FROM GV$SESSION WHERE USERNAME='PROC_CNTL' AND STATUS<>'KILLED' 
 
/* PROCESS_CONTROL-->Procedure-->JOB_UPDATE_JOB_MASTER-->CHECK PREV LOAD */ 
 DECLARE
CNT  NUMBER(2);
ETL_REF NUMBER(10);
NEW_ERROR EXCEPTION;
BEGIN
SELECT ETL_REF INTO ETL_REF FROM ETL_REF;
SELECT COUNT(DISTINCT A.TABLE_TYPE) INTO CNT FROM ETL_SRCSTG_EXT_STATUS A, ETL_REF B WHERE A.ETL_REF=B.ETL_REF;
IF CNT = 2 THEN
DBMS_OUTPUT.PUT_LINE('PREV LOAD COMPLETED');
ELSE
RAISE NEW_ERROR;
END IF;
EXCEPTION
WHEN NEW_ERROR THEN
RAISE_APPLICATION_ERROR(-20001,'PREVIOUS LOAD FOR THE ETL_REF '||ETL_REF||' NOT COMPLETED');
END; 
 
/* PROCESS_CONTROL-->Procedure-->JOB_UPDATE_JOB_MASTER-->TRUNC_JOB_QUEUE */ 
 -- Removes every row from JOB_QUEUE.
 TRUNCATE TABLE  JOB_QUEUE 
 
/* PROCESS_CONTROL-->Procedure-->JOB_UPDATE_JOB_MASTER-->ETL_REF_INCREMENT */ 
 DECLARE
CUR_ETL NUMBER(10);
TOT NUMBER(2);
CNT NUMBER(1);
BEGIN
SELECT TO_NUMBER(TO_CHAR(SYSDATE-1,'YYYYMMDD'))  INTO CUR_ETL FROM DUAL;
DBMS_OUTPUT.PUT_LINE('Todays Etl Ref'||CUR_ETL);
SELECT COUNT(1) INTO TOT  FROM ETL_REF WHERE SUBSTR(ETL_REF,1,8)=CUR_ETL;
DBMS_OUTPUT.PUT_LINE('Available Values '||TOT);
SELECT COUNT(1) INTO CNT FROM ETL_REF;
CUR_ETL:=CUR_ETL||'00';
IF CNT =0 THEN
   INSERT INTO ETL_REF VALUES(CUR_ETL,SYSDATE);
   COMMIT; 
ELSE
  IF TOT = 0 THEN
    UPDATE ETL_REF SET ETL_REF=CUR_ETL,START_TIME=SYSDATE;
    COMMIT;
    ELSE
    DBMS_OUTPUT.PUT_LINE('Already etl_Ref exists');
    SELECT ETL_REF INTO CUR_ETL FROM ETL_REF;
    CUR_ETL:=CUR_ETL+1;
    DBMS_OUTPUT.PUT_LINE('The Next value will be '||CUR_ETL);
    UPDATE ETL_REF SET ETL_REF=CUR_ETL,START_TIME=SYSDATE;
    COMMIT;
    END IF;
END IF;    
    INSERT INTO ETL_REF_HIS_VALUE 
    SELECT A.ETL_REF,SYSTIMESTAMP FROM ETL_REF A WHERE NOT EXISTS ( SELECT 'X' FROM ETL_REF_HIS_VALUE B WHERE A.ETL_REF=B.ETL_REF);COMMIT;
END; 
 
/* PROCESS_CONTROL-->Procedure-->JOB_UPDATE_JOB_MASTER-->UPDATE JOB MASTER */ 
 BEGIN
-- Sets JOB_RUN_TIME of every job with JOB_ACTIVE=1 to SYSDATE + 1/(24*60),
-- i.e. one minute after the current database time.
UPDATE JOB_MASTER SET JOB_RUN_TIME=SYSDATE+1/(24*60) WHERE JOB_ACTIVE=1;
COMMIT;
END; 
 
/* PROCESS_CONTROL-->Procedure-->JOB_UPDATE_JOB_MASTER-->TRUNC_JOB_EVENTS */ 
 -- Removes every row from JOB_EVENTS.
 TRUNCATE TABLE JOB_EVENTS 
 
/* PROCESS_CONTROL-->Procedure-->PROCESS_ETL_REF_LOAD_PROC-->PROCESS_ETL_REF_LOAD_PROC */ 
 BEGIN
/*EXECUTE IMMEDIATE 'UPDATE ETL_REF SET ETL_REF=(SELECT ETL_REF+1 FROM ETL_REF),START_TIME=SYSTIMESTAMP';  TO BE INCLUDEDE LATER */ 
EXECUTE IMMEDIATE 'TRUNCATE TABLE ETL_REF_LOAD';
EXECUTE IMMEDIATE 'INSERT INTO ETL_REF_LOAD SELECT B.SCENARIO_NAME,B.TABLE_NAME,B.CAT_VAL,A.ETL_REF ETL_REF,D.START_TIME FROM ETL_REF_HIS_VALUE A,
        ETL_DATE B, ETL_DATE C, ETL_REF D
  WHERE 
      B.TABLE_NAME=C.TABLE_NAME
      AND B.SCENARIO_NAME=C.SCENARIO_NAME
      AND NVL(B.CAT_VAL,'||'''1'''||')=NVL(C.CAT_VAL,'||'''1'''||')
      AND A.ETL_REF > NVL(B.ETL_REF,0)
      AND B.SCENARIO_NAME NOT LIKE '||'''SRCSTG%'''||'
      ORDER BY C.TABLE_NAME, C.SCENARIO_NAME, C.CAT_VAL,A.ETL_REF';
END; 


***********************************************************************

 
 
/* PROCESS_SCHEDULING-->Procedure-->PROCESS_SCHEDULING_PROC-->Process Scheduling */ 
 declare
CURSOR que_cur IS  
SELECT job_id,RUN_TIME,FLAG,job_run_interval FROM (
SELECT job_id,TO_DATE(TO_CHAR(SYSDATE,'dd/mm/yyyy')||TO_CHAR(JOB_RUN_TIME,' 
HH24:MI:ss'),'dd/mm/yyyy HH24:MI:ss') RUN_TIME,'X' FLAG,job_run_interval
FROM job_masterWHERE job_active=1  AND (
CASE job_run_day 
WHEN 1 THEN TO_DATE(TO_CHAR(SYSDATE,'dd/mm/yyyy')||TO_CHAR(JOB_RUN_TIME,' 
HH24:MI:ss'),'dd/mm/yyyy HH24:MI:ss')
WHEN 2 THEN TO_DATE(TO_CHAR(NEXT_DAY(SYSDATE,'SAT'),'dd/mm/yyyy')||TO_CHAR(JOB_RUN_TIME,' 
HH24:MI:ss'),'dd/mm/yyyy HH24:MI:ss')
WHEN 3 THEN TO_DATE(TO_CHAR(LAST_DAY(SYSDATE),'dd/mm/yyyy')||TO_CHAR(JOB_RUN_TIME,' 
HH24:MI:ss'),'dd/mm/yyyy HH24:MI:ss')
END )
BETWEEN SYSDATE  AND SYSDATE+(10/(24*60)) 
UNION
SELECT job_id,(CASE  WHENJOB_RUN_TIME >= LAST_SCHED_TIME THEN  JOB_RUN_TIME ELSE 
LAST_SCHED_TIME  END ) RUN_TIME,'I' FLAG,job_run_interval
FROM ( SELECT job_id,TO_DATE(TO_CHAR(SYSDATE,'dd/mm/yyyy')||TO_CHAR(JOB_RUN_TIME,' 
HH24:MI:ss'),'dd/mm/yyyy HH24:MI:ss') JOB_RUN_TIME,
NVL(LAST_SCHED_TIME,TO_DATE('01/01/1907','dd/mm/yyyy')) LAST_SCHED_TIME 
,job_run_interval
FROM job_master 
WHERE job_run_day=0 AND job_active=1)
WHERE ( CASE WHEN  JOB_RUN_TIME >= NVL(LAST_SCHED_TIME,TO_DATE('01/01/1907','dd/mm/yyyy')) 
THEN  JOB_RUN_TIME  ELSE LAST_SCHED_TIME END ) 
BETWEEN SYSDATE AND SYSDATE+(10/(24*60)));
QUEUE_JOB_ID INTEGER:=#NIC_APPLICATION.SCH_QUEUE_ID;
prc_job_id INTEGER:=#NIC_APPLICATION.SCH_JOB_ID;
queue_id_inc INTEGER;
seq_id_inc INTEGER;
src_count INTEGER :=0;
eve_id_inc integer;
no_of_job integer :=0;
err_flag integer :=0;
SESS_NO varchar2(60);
BEGIN
insert into job_events (EVENT_ID,SEQ_NO, QUEUE_ID, STEP_ID, LOG_TIME, STEP_NAME, STEP_DESC, STEP_ERROR, 
TOTAL_AFFECTED_ROWS, STEP_STATUS)values(96071565,(select nvl(max(seq_no),0)+1 from job_events where event_id=96071565) ,QUEUE_JOB_ID,10,systimestamp,'Job identification','Collect the job to put in job queue',null,null,1);
COMMIT;
SELECT NVL(MAX(queue_id),0),NVL(MAX(seq_id),0) INTO queue_id_inc,seq_id_inc FROM job_queue;
seq_id_inc:=seq_id_inc+1;
FOR cur IN que_cur LOOP
queue_id_inc:=queue_id_inc+1;
INSERT INTO job_queue (QUEUE_ID, SEQ_ID, JOB_ID, JOB_START_TIME, 
JOB_END_TIME, JOB_STATUS) 
VALUES (queue_id_inc,seq_id_inc,cur.job_id,cur.RUN_TIME,NULL,0);
no_of_job:=no_of_job+1;
IF cur.flag='I' AND CUR.job_run_interval>0 THEN
UPDATE job_master SET 
LAST_SCHED_TIME=cur.run_time+(cur.job_run_interval/(24*60)) WHERE job_id = cur.job_id ;
END IF;
END LOOP;
COMMIT;
insert into job_events (EVENT_ID,SEQ_NO, QUEUE_ID, STEP_ID, LOG_TIME, STEP_NAME, STEP_DESC, STEP_ERROR, 
TOTAL_AFFECTED_ROWS, STEP_STATUS)values(96071565,(select nvl(max(seq_no),0)+1 from job_events where event_id=96071565 ),QUEUE_JOB_ID,99,systimestamp,'Job Completed','Completed',null,null,2);
COMMIT;
END;

****************************************************************************

 
/* PROCESS_CONTROL-->Procedure-->PROCESS_CONTROL_PROC-->Ins into JOB_QUEUE */ 
 declare
sed_id_c integer:=0;
QUEUE_ID_C integer:=0;
begin
select nvl(max(seq_id),0) into sed_id_c from job_queue;
select nvl(max(queue_id),0) into QUEUE_ID_C from job_queue;
sed_id_c:=sed_id_c+1;
QUEUE_ID_C:=QUEUE_ID_C+1;
INSERT INTO JOB_QUEUE ( QUEUE_ID, SEQ_ID, JOB_ID, JOB_START_TIME, JOB_END_TIME,JOB_STATUS ) VALUES
(QUEUE_ID_C, sed_id_c, 1, SYSDATE, NULL, 0); 
COMMIT;
end; 

ps fill


/* PROCESS_CONTROL-->Procedure-->PROCESS_CONTROL_PROC-->Job cntl */
 import java.sql as sql
import java.lang as lang
import java.lang.Integer
from com.ziclix.python.sql import zxJDBC
import thread
import synchronize
import os
import sys
import time
import java.util.Date as Date

# Connection name looked up in the decrypted credentials file by job().
conn_name='PROC_CNTL'
# Directory holding the encrypted credentials file and its key; taken from the
# ODI_SCRIPT environment variable (a missing variable raises KeyError here).
exe_path=os.environ['ODI_SCRIPT']
# function  check_exception :  This function is called incase of Jython Exception . Upon exception, this checks  job_queue table if all the jobs are completed in job_queue,
#                                                    then exits the program successfully by returning 0, else return 1 noting failure
def check_exception():
    myCon = sql.DriverManager.getConnection('jdbc:oracle:thin:@(DESCRIPTION = (ADDRESS = (PROTOCOL = TCP)(HOST = nic-tuatou1a-dc-vip)(PORT = 1540))(ADDRESS = (PROTOCOL = TCP)(HOST = nic-tuatou1b-dc-vip)(PORT = 1540)) (LOAD_BALANCE=ON)(CONNECT_DATA = (SERVER = DEDICATED) (SERVICE_NAME = DWBIUAT)))',"NIC_EDW_STG","<@=snpRef.getInfo("SRC_PASS") @>")
    myStmt = myCon.createStatement()
    jobidstr="SELECT count(job_id) as cnt  from nic_edw_stg.job_queue where job_status in (0,1,3)"
    jobidRs = myStmt.executeQuery(jobidstr)
    while(jobidRs.next()):
print jobidRs.getString("cnt")
job_cnt=jobidRs.getString("cnt")
    InsStr = "INSERT INTO NIC_EDW_STG.JOB_EVENTS VALUES(-101,-11,-99,-11,SYSTIMESTAMP,'Process Control Exception','Jython code reached Exception check method',NULL,NULL,2)"
    myStmt.executeUpdate(InsStr)
    myCon.commit()
    myCon.close()
    return job_cnt
# functionstartpath  : Executes the scenario with the given queueid as parameter
def startpath(queueid):
    myCon = sql.DriverManager.getConnection('jdbc:oracle:thin:@(DESCRIPTION = (ADDRESS = (PROTOCOL = TCP)(HOST = nic-tuatou1a-dc-vip)(PORT = 1540))(ADDRESS = (PROTOCOL = TCP)(HOST = nic-tuatou1b-dc-vip)(PORT = 1540)) (LOAD_BALANCE=ON)(CONNECT_DATA = (SERVER = DEDICATED) (SERVICE_NAME = DWBIUAT)))',"NIC_EDW_STG","<@=snpRef.getInfo("SRC_PASS") @>")
    myStmt = myCon.createStatement()
    jobidstr="SELECT m.job_id job_id,q.queue_id,m.PKG_NAME,m.SCENARIO_VER FROM JOB_QUEUE q,JOB_MASTER m WHERE  m.JOB_ID(+)=q.JOB_ID AND q.QUEUE_ID="+str(queueid)
    jobidRs = myStmt.executeQuery(jobidstr)
    while(jobidRs.next()):
path="./startscen.sh "+jobidRs.getString("PKG_NAME")+" "+jobidRs.getString("SCENARIO_VER")+" DEV -v=5 -NAME=ROOT NIC_APPLICATION.SCH_QUEUE_ID="+str(jobidRs.getString("queue_id"))+" NIC_APPLICATION.SCH_JOB_ID="+str(jobidRs.getString("job_id"))
    myCon.close()
    return pathdef ins(path):
  list=os.system (path)
# function job selects the job to run based on the dependency and status of the job
def job():
try:
job_sat=0
obs_status=0
rec_count=0
curr_cnt=0
cnt_stop=10
str_username=''
str_password=''
# Decryption code to get the credentials of the schema using the encrypted file
try:
str_cmd_dcr="openssl enc -d  -aes-256-cbc -in "+exe_path+"/nic_etl_password.enc -pass file:"+exe_path+"/.nic_edw_key -out "+exe_path+"/nic_etl_password.dcy"
os.system (str_cmd_dcr)
dcy_file=exe_path+"/nic_etl_password.dcy"
fl_dcr_pwd = open (dcy_file,'r')
for str_main in fl_dcr_pwd.readlines():
str_list = str_main.split('\t')
if str_list[0] == conn_name:
str_username=str_list[1]str_password=str_list[2]
except IOError:
print "Error: nic_etl_password.dcy file not found to obtain the connection details"
except:
raise
if str_username == '' or str_password == '' :
print " Error: Username or Password is null \nExiting from the Script"
sys.exit()
db = zxJDBC.connect("jdbc:oracle:thin:@(DESCRIPTION = (ADDRESS = (PROTOCOL = TCP)(HOST = nic-tuatou1a-dc-vip)(PORT = 1540))(ADDRESS = (PROTOCOL = TCP)(HOST = nic-tuatou1b-dc-vip)(PORT = 1540)) (LOAD_BALANCE=ON)(CONNECT_DATA = (SERVER = DEDICATED) (SERVICE_NAME = DWBIUAT)))", str_username,str_password, "oracle.jdbc.driver.OracleDriver")
c = db.cursor()
d = db.cursor()
e = db.cursor()
cnt= db.cursor()
ext_var=1

while(ext_var==1):
rec_count+=1
c.execute("SELECT queue_id , job_id  , MAX (job_parent_id) AS job_parent_id, MAX (job_status) AS job_status , CASE WHEN MAX (status1) = 3 THEN MAX (status1) ELSE MIN (status1)  END  AS status ,  MAX (job_fatal)  AS job_fatal  ,  MAX( JOB_START_TIME) AS JOB_START_TIME ,  TO_TIMESTAMP(TO_CHAR(SYSDATE,'dd/mm/yyyy')  ||TO_CHAR(SYSDATE,' HH24:MI:ss'),'dd/mm/yyyy HH24:MI:ss') sdate FROM  (SELECT a.queue_id, a.job_id , b.job_parent_id , a.job_status , b.status , b.job_fatal , a.job_start_time, CASE  WHEN b.job_fatal = 1  THEN b.status  WHEN b.job_fatal = 0  AND b.status <= 2  THEN b.status  WHEN b.job_fatal = 0  AND b.status = 3  THEN 2  ELSE NULL END AS status1 FROM (SELECT a.queue_id,  a.job_status ,  a.seq_id ,  a.job_id ,  b.job_parent_id,  a.job_start_time FROM NIC_EDW_STG.job_queue a,  NIC_EDW_STG.job_dependency b  WHERE a.job_id  = b.job_id(+) AND a.job_status IN (0,1,3) AND a.seq_id  =  (SELECT MIN (seq_id) FROM NIC_EDW_STG.job_queue WHERE job_status IN (0,1,3)  ) ) a , (SELECTDISTINCT b.job_parent_id,  a.queue_id ,  a.seq_id  ,  d.job_id  ,  a.job_status status ,  d.job_fatal FROM NIC_EDW_STG.job_queue a,  NIC_EDW_STG.job_dependency b ,  NIC_EDW_STG.job_master d  WHERE a.job_id = b.job_parent_id AND a.job_id = d.job_id and  a.seq_id  = (SELECT MIN (seq_id) FROM NIC_EDW_STG.job_queue WHERE job_status IN (0,1,3)) ) b WHERE a.job_parent_id = b.job_parent_id(+)  AND a.seq_id = b.seq_id(+)) GROUP BY queue_id,  job_id ORDER BY queue_id,  job_id")
row_ex=c.fetchone()if row_ex==None:
break
c.execute("SELECT queue_id , job_id  , MAX (job_parent_id) AS job_parent_id, MAX (job_status) AS job_status , CASE WHEN MAX (status1) = 3 THEN MAX (status1) ELSE MIN (status1)  END  AS status ,  MAX (job_fatal)  AS job_fatal  ,  MAX( JOB_START_TIME) AS JOB_START_TIME ,  TO_TIMESTAMP(TO_CHAR(SYSDATE,'dd/mm/yyyy')  ||TO_CHAR(SYSDATE,' HH24:MI:ss'),'dd/mm/yyyy HH24:MI:ss') sdate FROM  (SELECT a.queue_id, a.job_id , b.job_parent_id , a.job_status , b.status , b.job_fatal , a.job_start_time, CASE  WHEN b.job_fatal = 1  THEN b.status  WHEN b.job_fatal = 0  AND b.status <= 2  THEN b.status  WHEN b.job_fatal = 0  AND b.status = 3  THEN 2  ELSE NULL END AS status1 FROM (SELECT a.queue_id,  a.job_status ,  a.seq_id ,  a.job_id ,  b.job_parent_id ,  a.job_start_time FROM NIC_EDW_STG.job_queue a,  NIC_EDW_STG.job_dependency b  WHERE a.job_id  = b.job_id(+) AND a.job_status IN (0,1,3) AND a.seq_id  =  (SELECT MIN (seq_id) FROM NIC_EDW_STG.job_queue WHERE job_status IN(0,1,3)  ) ) a , (SELECT DISTINCT b.job_parent_id,  a.queue_id ,  a.seq_id  ,  d.job_id  ,  a.job_status status ,  d.job_fatal FROM NIC_EDW_STG.job_queue a,  NIC_EDW_STG.job_dependency b ,  NIC_EDW_STG.job_master d  WHERE a.job_id = b.job_parent_id AND a.job_id = d.job_id and  a.seq_id  = (SELECT MIN (seq_id) FROM NIC_EDW_STG.job_queue WHERE job_status IN (0,1,3)) ) b WHERE a.job_parent_id = b.job_parent_id(+)  AND a.seq_id = b.seq_id(+)) GROUP BY queue_id,  job_id ORDER BY queue_id,  job_id")
#sql to find the running and waiting jobs
if c.fetchone()!=None:
e.execute("SELECT queue_id , job_id  , MAX (job_parent_id) AS job_parent_id, MAX (job_status) AS job_status , CASE WHEN MAX (status1) = 3 THEN MAX (status1) ELSE MIN (status1)END  AS status ,  MAX (job_fatal)  AS job_fatal  ,  MAX( JOB_START_TIME) AS JOB_START_TIME ,  TO_TIMESTAMP(TO_CHAR(SYSDATE,'dd/mm/yyyy')  ||TO_CHAR(SYSDATE,' HH24:MI:ss'),'dd/mm/yyyy HH24:MI:ss') sdate FROM  (SELECT a.queue_id, a.job_id , b.job_parent_id , a.job_status , b.status , b.job_fatal , a.job_start_time, CASE  WHEN b.job_fatal = 1  THEN b.status  WHEN b.job_fatal = 0  AND b.status <= 2  THEN b.status  WHEN b.job_fatal = 0  AND b.status = 3  THEN 2  ELSE NULL END AS status1 FROM (SELECTa.queue_id,  a.job_status ,  a.seq_id ,  a.job_id ,  b.job_parent_id ,  a.job_start_time FROM NIC_EDW_STG.job_queue a,  NIC_EDW_STG.job_dependency b  WHERE a.job_id  = b.job_id(+) AND a.job_status IN (0,1,3) AND a.seq_id  =  (SELECT MIN (seq_id) FROMNIC_EDW_STG.job_queue WHERE job_status IN (0,1,3)  ) ) a , (SELECT DISTINCT b.job_parent_id,  a.queue_id ,  a.seq_id  ,  d.job_id  ,  a.job_status status ,  d.job_fatal FROM NIC_EDW_STG.job_queue a,  NIC_EDW_STG.job_dependency b ,  NIC_EDW_STG.job_master d  WHERE a.job_id = b.job_parent_id AND a.job_id = d.job_id and  a.seq_id  = (SELECT MIN (seq_id) FROM NIC_EDW_STG.job_queue WHERE job_status IN (0,1,3)) ) b WHERE a.job_parent_id = b.job_parent_id(+)  AND a.seq_id = b.seq_id(+)) GROUP BY queue_id,  job_id ORDER BY queue_id,  job_id")
row=e.fetchone()
if row==None:
break
c.execute("SELECT queue_id , job_id  , MAX (job_parent_id) AS job_parent_id, MAX (job_status) AS job_status , CASE WHEN MAX (status1) = 3 THEN MAX (status1) ELSE MIN (status1)  END  AS status ,  MAX (job_fatal)  AS job_fatal  ,  MAX( JOB_START_TIME) AS JOB_START_TIME ,  TO_TIMESTAMP(TO_CHAR(SYSDATE,'dd/mm/yyyy')  ||TO_CHAR(SYSDATE,' HH24:MI:ss'),'dd/mm/yyyy HH24:MI:ss') sdate FROM  (SELECT a.queue_id, a.job_id , b.job_parent_id , a.job_status , b.status , b.job_fatal , a.job_start_time, CASE  WHEN b.job_fatal = 1  THEN b.status  WHEN b.job_fatal = 0  AND b.status <= 2  THEN b.status  WHEN b.job_fatal = 0  AND b.status = 3  THEN 2  ELSE NULLEND AS status1 FROM (SELECT a.queue_id,  a.job_status ,  a.seq_id ,  a.job_id ,  b.job_parent_id ,  a.job_start_time FROM NIC_EDW_STG.job_queue a,  NIC_EDW_STG.job_dependency b  WHERE a.job_id  = b.job_id(+) AND a.job_status IN (0,1,3) AND a.seq_id=  (SELECT MIN (seq_id) FROM NIC_EDW_STG.job_queue WHERE job_status IN (0,1,3)  ) ) a , (SELECT DISTINCT b.job_parent_id,  a.queue_id ,  a.seq_id  ,  d.job_id  ,  a.job_status status ,  d.job_fatal FROM NIC_EDW_STG.job_queue a,  NIC_EDW_STG.job_dependency b ,  NIC_EDW_STG.job_master d  WHERE a.job_id = b.job_parent_id AND a.job_id = d.job_id and  a.seq_id  = (SELECT MIN (seq_id) FROM NIC_EDW_STG.job_queue WHERE job_status IN (0,1,3)) ) b WHERE a.job_parent_id = b.job_parent_id(+)  AND a.seq_id= b.seq_id(+)) GROUP BY queue_id,  job_id ORDER BY queue_id,  job_id")
for cur in c.fetchall():
rec_count=0
obs_status=0
cnt.execute("SELECT count(*) as cnt from nic_edw_stg.job_queue where job_status=1")
rowcnt=cnt.fetchone() # the cursor cnt is used to exit from the Loop when there is no job with the status 1
if rowcnt==None:
break
cnt.execute("SELECT count(*) as cnt from nic_edw_stg.job_queue where job_status=1")
for cnt_c in cnt.fetchall():curr_cnt=cnt_c[0]
# the below code checks the status and dependency of a job to execute
if cur[3]==0.0:
if curr_cnt < cnt_stop:
if Date.getTime(cur[6]) < Date.getTime(cur[7]):
if cur[2]==None:
job_sat=1obs_status=1
else:
if(cur[5]==1.0 or cur[5]==0.0) and (cur[4]==1.0 or cur[4]==0.0):
job_sat=0
elif cur[5]==1.0 and cur[4]==2.0:
job_sat=1
obs_status=1
elif cur[5]==1.0 and (cur[4]==3.0 or cur[4]==4.0):
job_sat=0
obs_status=0
elif cur[5]==0.0 and cur[4]==2.0:
job_sat=1
obs_status=1
elif cur[5] == 0.0 and (cur[4]==2.0 or cur[4]==3.0 or cur[4]==4.0):
job_sat=1
obs_status=1
if job_sat==1: # update the job id 's  status as 1 which is to be executed next
upd='UPDATE NIC_EDW_STG.JOB_QUEUE set job_start_time=sysdate,job_status='+str(obs_status)+' where queue_id='+str(cur[0])d.execute(upd)
db.commit()
b=str(cur[0])
a=b.split(".")
path=startpath(a[0])
thread.start_new_thread(ins,(path,))

if rec_count>0: # exits from the loop when there is no job with the status 0,1,3 (WAITING, STARTED,FAILED)
d.execute("SELECT * FROM NIC_EDW_STG.JOB_QUEUE a  WHERE  a.job_status IN (0,1,3)  AND a.seq_id = (SELECT MIN (seq_id) FROM NIC_EDW_STG.JOB_QUEUE WHERE job_status IN (0,1,3))")
if d.fetchone()==None:
time.sleep(30)
if rec_count>40:
break
except:
if c:
c.close()
if d:
d.close()
if e:
e.close()
if cnt:
cnt.close()
if db:
db.close()
print 'Attrib Error catched'
s=check_exception() # funtion 'check_exception' tohandle the Attribexception
if s=='0':
print 'caught Attrib error - No impact'
else:
print sys.exc_info()[0] # raise the Exception
raise

else:
if c:
c.close()
if d:
d.close()
if e:
e.close()
if cnt:
cnt.close()
if db:
db.close()

# Load the Oracle JDBC driver class by its fully qualified name via java.lang.Class
# (presumably so that JDBC connections opened inside job() can locate the driver - confirm).
lang.Class.forName('oracle.jdbc.driver.OracleDriver')
# Run job() and write its return value to standard output.
print(job())
# Subquery block A retrieves the job_id, job_status and job_fatal of the jobs that are scheduled, using the tables job_queue, job_dependency and job_master
# Subquery block B retrieves the jobs and their job_parent_id along with the status
# The select block joins subquery A and subquery B and takes the job_id, queue_id, job_status and the job_status of the parent job
# The overall select block gets the queue_id and job_id based on the status of the parent job and job_fatal

Monday, February 11, 2019

table

Batch_Control
ETL_NO BATCH_Type BATCH_Date batch_Start_Time batch_End_Time IS_Daily IS_Reload is_part_load
1 Hermes Daily Load 05/02/2019 05/02/2019 5:05 05/02/2019 15:00 N N N
2 Hermes Daily Load 06/02/2019 06/02/2019 5:05 06/02/2019 15:00 Y N N
Job_Master
job_id Group_Id Site/Country COUNTRY Site_Description Plug_Type Plug_Name Source Mandatory_flag is_active Dependency_order File_Name CRT_datetime UPD_DATETIME
1 1 LATAM AR Latin America Referential Account ATLAS Y Y 1 AR_Account 05/02/2019 5:05 05/02/2019 5:05
2 1 LATAM AR Latin America Referential Client ATLAS Y Y 1 AR_Client 05/02/2019 5:05 05/02/2019 5:05
3 1 LATAM AR Latin America Referential National Client ATLAS Y Y 1 AR_National Client 05/02/2019 5:05 05/02/2019 5:05
4 1 LATAM AR Latin America Referential Account External ATLAS Y Y 1 AR_Account External 05/02/2019 5:05 05/02/2019 5:05
5 1 LATAM AR Latin America operational 09391ST SCT4CIB Y Y 2 BFI_OPE_09391ST 05/02/2019 5:05 05/02/2019 5:05
6 1 LATAM AR Latin America operational 09108SP ORION y Y 2 BFI_OPE_09108SP 05/02/2019 5:05 05/02/2019 5:05
7 1 LATAM AR Latin America operational 09888IP IP4CIB y Y 2 BFI_OPE_09888IP 05/02/2019 5:05 05/02/2019 5:05
8 1 LATAM BR Latin America Referential Account ATLAS Y Y 1 AR_Account 05/02/2019 5:05 05/02/2019 5:05
9 1 LATAM BR Latin America Referential Client ATLAS Y Y 1 AR_Client 05/02/2019 5:05 05/02/2019 5:05
10 1 LATAM BR Latin America Referential National Client ATLAS Y Y 1 AR_National Client 05/02/2019 5:05 05/02/2019 5:05
11 1 LATAM BR Latin America Referential Account External ATLAS Y Y 1 AR_Account External 05/02/2019 5:05 05/02/2019 5:05
12 1 LATAM BR Latin America operational 09391ST SCT4CIB Y Y 2 BFI_OPE_09391ST 05/02/2019 5:05 05/02/2019 5:05
13 1 LATAM BR Latin America operational 09108SP ORION y Y 2 BFI_OPE_09108SP 05/02/2019 5:05 05/02/2019 5:05
14 1 LATAM BR Latin America operational 09888IP IP4CIB y Y 2 BFI_OPE_09888IP 05/02/2019 5:05 05/02/2019 5:05
15 1 LATAM CO Latin America Referential Account ATLAS Y Y 1 AR_Account 05/02/2019 5:05 05/02/2019 5:05
16 1 LATAM CO Latin America Referential Client ATLAS Y Y 1 AR_Client 05/02/2019 5:05 05/02/2019 5:05
17 1 LATAM CO Latin America Referential National Client ATLAS Y Y 1 AR_National Client 05/02/2019 5:05 05/02/2019 5:05
18 1 LATAM CO Latin America Referential Account External ATLAS Y Y 1 AR_Account External 05/02/2019 5:05 05/02/2019 5:05
19 1 LATAM CO Latin America operational 09391ST SCT4CIB Y Y 2 BFI_OPE_09391ST 05/02/2019 5:05 05/02/2019 5:05
20 1 LATAM CO Latin America operational 09108SP ORION y Y 2 BFI_OPE_09108SP 05/02/2019 5:05 05/02/2019 5:05
21 1 LATAM CO Latin America operational 09888IP IP4CIB y Y 2 BFI_OPE_09888IP 05/02/2019 5:05 05/02/2019 5:05
22 2 CEM DE Central Europe Referential Account ATLAS y Y 1 AU_Account 05/02/2019 5:05 05/02/2019 5:05
23 2 CEM DE Central Europe Referential Client ATLAS y N 1 AU_Client 05/02/2019 5:05 06/02/2019 5:05
24 2 CEM DE Central Europe operational 06545SP ORION y Y 2 BFI_OPE_06545SP 05/02/2019 5:05 05/02/2019 5:05
25 2 CEM DE Central Europe operational 08541IP IP4CIB y Y 2 BFI_OPE_08541IP 05/02/2019 5:05 05/02/2019 5:05
Batch_Daily
BATCH_ID ETL_NO RUN_DATE JOB_ID GROUP_ID Plug_Name Source Site_Description Is_active is_completed ETL_START_TIME ETL_END_TIME ETL_UPD_DATE USER
1 2 06/02/2019 1 1 Account ATLAS Latin America Y N scsvcauxhmsd
2 2 06/02/2019 2 1 Client ATLAS Latin America Y N scsvcauxhmsd
3 2 06/02/2019 3 1 National Client ATLAS Latin America Y N scsvcauxhmsd
4 2 06/02/2019 4 1 Account External ATLAS Latin America Y N scsvcauxhmsd
5 2 06/02/2019 5 1 09391ST SCT4CIB Latin America Y N scsvcauxhmsd
6 2 06/02/2019 6 1 09108SP ORION Latin America Y N scsvcauxhmsd
7 2 06/02/2019 7 1 09888IP IP4CIB Latin America Y N scsvcauxhmsd
8 2 06/02/2019 8 2 Referential ATLAS Central Europe Y N scsvcauxhmsd
9 2 06/02/2019 10 2 operational ORION Central Europe Y N scsvcauxhmsd
10 2 06/02/2019 11 2 operational IP4CIB Central Europe Y N scsvcauxhmsd
Job_Flow
flow_id work_flow job_name site country is_active Crt_dt upd_Dt
1 Referential Load REF_ACCOUNT latam AR y 06/02/2019 06/02/2019
2 Referential Load REF_CLIENT latam AR y 06/02/2019 06/02/2019
3 Referential Load REF_RFRMPM latam AR y 06/02/2019 06/02/2019
4 Referential Load REF_ACT_TYP latam AR y 06/02/2019 06/02/2019
5 Referential Load REF_ATL_BRC latam AR y 06/02/2019 06/02/2019
6 operatial Load CM_VOL_LOAD latam AR y 06/02/2019 06/02/2019
7 File_watcher fw latam AR y