Saturday, April 13, 2019

new jobmaster

package Project

import java.text.SimpleDateFormat
import java.util.Calendar

import org.apache.spark.sql.{DataFrame, SparkSession}
import Project.config.Settings._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DataTypes

/**
 * Change-data-capture (CDC) job for the job-master table.
 *
 * Reads the incoming source CSV (`jobmaster`) and the current target CSV
 * (`jobmaster_tar`), both taken from `Project.config.Settings`, compares them on
 * the natural key (SITE_NM, COUNTRY_NM, PLUG_TYPE, PLUG_NM) and derives:
 *
 *  - the rows to update (`CDC_FLG = 'U'`), stamped with a fresh UPD_DATETIME;
 *  - the rows to insert (`CDC_FLG = 'I'`), which receive a newly generated
 *    MASTER_ID and a GROUP_ID shared by all rows of the same SITE_NM.
 *
 * Both result sets are only printed with `show(100)`; nothing is written back.
 * The whole run is wrapped in `MetadataManager.reportStatus`, keyed by the
 * command-line arguments.
 */
object JobMaster extends App {

  implicit val spark: SparkSession = SparkSession.builder().appName("Job Master")
    .config("spark.master", "local")
    .getOrCreate()

  val key_list = List("MASTER_ID", "GROUP_ID", "SITE_NM", "COUNTRY_NM")
  val key = args.toList
  run()

  /** Executes the CDC comparison and prints the update and insert candidates. */
  def run() = MetadataManager.reportStatus("JobMaster", key) {
    import org.apache.spark.sql.expressions.Window

    // Every CSV column is loaded as a string (no schema inference is requested),
    // so numeric columns are cast explicitly before any numeric aggregation below.
    def readCsv(path: String): DataFrame =
      spark.read.format("csv").option("header", "true").load(path)

    val targetDf = readCsv(s"$jobmaster_tar")
    readCsv(s"$jobmaster").createOrReplaceGlobalTempView("JobMaster_Source")
    targetDf.createOrReplaceGlobalTempView("JobMaster_Target")

    // CDC_FLG: 'N' = matched and SOURCE_NM/IS_ACTIVE/FILE_NM unchanged,
    //          'I' = no matching target row, 'U' = matched but changed.
    // GROUP_ID is 0 for rows that have no target group yet.
    val cdcDf =
      spark.sql(
        """
          |select
          |JobMaster_Target.MASTER_ID as MASTER_ID,
          |case when JobMaster_Target.GROUP_ID is null then 0 else cast(JobMaster_Target.GROUP_ID as INT) end as GROUP_ID,
          |trim(JobMaster_Source.SITE_NM) as SITE_NM,
          |trim(JobMaster_Target.SITE_NM) as TAR_SITE_NM,
          |trim(JobMaster_Source.COUNTRY_NM) as COUNTRY_NM,
          |trim(JobMaster_Source.SITE_DESC) as SITE_DESC,
          |trim(JobMaster_Source.PLUG_TYPE) as PLUG_TYPE,
          |trim(JobMaster_Source.PLUG_NM) as PLUG_NM,
          |trim(JobMaster_Source.SOURCE_NM) as SOURCE_NM,
          |trim(JobMaster_Source.IS_ACTIVE) as IS_ACTIVE,
          |trim(JobMaster_Source.FILE_NM) as FILE_NM,
          |trim(JobMaster_Target.CRT_DATETIME) as CRT_DATETIME,
          |case when JobMaster_Target.MASTER_ID is not Null and (JobMaster_Source.SOURCE_NM =JobMaster_Target.SOURCE_NM and
          |JobMaster_Source.IS_ACTIVE = JobMaster_Target.IS_ACTIVE and
          |JobMaster_Source.FILE_NM =JobMaster_Target.FILE_NM ) then 'N'
          | when JobMaster_Target.MASTER_ID is Null then 'I' else 'U' end as CDC_FLG
          |from
          |global_temp.JobMaster_Source left join global_temp.JobMaster_Target on
          |trim(JobMaster_Source.SITE_NM)=trim(JobMaster_Target.SITE_NM) and
          |trim(JobMaster_Source.COUNTRY_NM)=trim(JobMaster_Target.COUNTRY_NM) and
          |trim(JobMaster_Source.PLUG_TYPE)=trim(JobMaster_Target.PLUG_TYPE) and
          |trim(JobMaster_Source.PLUG_NM)=trim(JobMaster_Target.PLUG_NM)
        """.stripMargin)
    cdcDf.createOrReplaceGlobalTempView("JmCdc")

    // Highest known GROUP_ID per site (0 when the site has no group yet).
    cdcDf
      .select(cdcDf("SITE_NM"), cdcDf("GROUP_ID"))
      .distinct
      .groupBy("SITE_NM")
      .max("GROUP_ID")
      .withColumnRenamed("max(GROUP_ID)", "GROUP_ID")
      .createOrReplaceGlobalTempView("find_grp")

    // Cast to INT so that max() below is numeric rather than lexicographic.
    targetDf
      .select(col("GROUP_ID").cast(DataTypes.IntegerType).as("MAX_GROUP_ID"))
      .createOrReplaceGlobalTempView("max_grp")

    // Sites without a group get consecutive ids after the current maximum
    // (0 when the target is empty); sites with a group keep theirs.
    val groupIdsDf = spark.sql(
      """
        |select
        |trim(find_grp.SITE_NM) as SITE_NM,
        |cast(row_number() over ( order by find_grp.SITE_NM)  + mx.MAX_GROUP_ID as INT) as GROUP_ID
        |from
        |global_temp.find_grp cross join (select coalesce(max(MAX_GROUP_ID), 0) as MAX_GROUP_ID from global_temp.max_grp) mx where find_grp.GROUP_ID = 0
        |union all
        |select trim(find_grp.SITE_NM) as SITE_NM, find_grp.GROUP_ID from global_temp.find_grp where find_grp.GROUP_ID  <> 0
      """.stripMargin)

    // Numeric maximum of the existing MASTER_IDs; 0 when there are none.
    val maxMasterIdDf = cdcDf.agg(
      coalesce(max(col("MASTER_ID").cast(DataTypes.IntegerType)), lit(0)).as("MAX_MASTER_ID"))

    // A single global window ordered by the natural key keeps the generated
    // MASTER_IDs unique across all new rows and stable between runs.
    val newIdWindow = Window.orderBy("SITE_NM", "COUNTRY_NM", "PLUG_TYPE", "PLUG_NM")

    val newWithMasterDf = cdcDf
      .filter(col("MASTER_ID").isNull)
      .drop("MASTER_ID")
      .crossJoin(maxMasterIdDf)
      .withColumn(
        "MASTER_ID",
        (col("MAX_MASTER_ID") + row_number().over(newIdWindow)).cast(DataTypes.IntegerType))
      .drop("GROUP_ID")
      .drop("MAX_MASTER_ID")
      .drop("TAR_SITE_NM")

    // One timestamp per query, truncated to whole seconds (date_trunc needs Spark 2.3+).
    val nowTs = date_trunc("second", current_timestamp())

    val finalUpdDf = cdcDf
      .drop("TAR_SITE_NM")
      .filter(col("MASTER_ID").isNotNull && col("CDC_FLG") === "U")
      .withColumn("UPD_DATETIME", nowTs)
      .drop("CDC_FLG")

    finalUpdDf.show(100)

    // Attach the GROUP_ID resolved per site to every new row.
    val finalNewDf = newWithMasterDf
      .join(groupIdsDf, newWithMasterDf("SITE_NM") === groupIdsDf("SITE_NM"), "inner")
      .drop(groupIdsDf("SITE_NM"))
      .drop("CDC_FLG")
      .withColumn("CRT_DATETIME", nowTs)
      .withColumn("UPD_DATETIME", nowTs)

    finalNewDf.show(100)
  }
}

No comments:

Post a Comment