package Project

import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Calendar

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DataTypes

import Project.config.Settings._

/** Change-data-capture job for the job-master table.
  *
  * Loads the source CSV at `jobmaster` and the target CSV at `jobmaster_tar` (paths from
  * `Project.config.Settings`), left-joins them on the trimmed natural key
  * (SITE_NM, COUNTRY_NM, PLUG_TYPE, PLUG_NM) and flags each source row as new (`I`),
  * unchanged (`N`) or updated (`U`). New rows receive fresh MASTER_IDs and per-site GROUP_IDs
  * continuing after the current maxima in the target. The update set and the insert set are
  * printed with `show(100)`; nothing is written back by this job.
  */
object JobMaster extends App {

  implicit val spark: SparkSession = SparkSession.builder()
    .appName("Job Master")
    .config("spark.master", "local")
    .getOrCreate()

  val key_list = List("MASTER_ID", "GROUP_ID", "SITE_NM", "COUNTRY_NM")
  val key = args.toList

  run()

  /** Runs the CDC pass, wrapped in `MetadataManager.reportStatus` for the given `key`. */
  def run() = MetadataManager.reportStatus("JobMaster", key) {
    // Register both snapshots as global temp views so the SQL below can reference them.
    readCsv(s"$jobmaster").createOrReplaceGlobalTempView("JobMaster_Source")
    readCsv(s"$jobmaster_tar").createOrReplaceGlobalTempView("JobMaster_Target")

    // CDC classification. The 'N' test uses null-safe equality on trimmed values so that
    // NULL-vs-NULL or whitespace-only differences are not reported as updates.
    val df_cdc = spark.sql(
      """
        |select
        |  JobMaster_Target.MASTER_ID as MASTER_ID,
        |  case when JobMaster_Target.GROUP_ID is null then 0
        |       else cast(JobMaster_Target.GROUP_ID as INT) end as GROUP_ID,
        |  trim(JobMaster_Source.SITE_NM) as SITE_NM,
        |  trim(JobMaster_Target.SITE_NM) as TAR_SITE_NM,
        |  trim(JobMaster_Source.COUNTRY_NM) as COUNTRY_NM,
        |  trim(JobMaster_Source.SITE_DESC) as SITE_DESC,
        |  trim(JobMaster_Source.PLUG_TYPE) as PLUG_TYPE,
        |  trim(JobMaster_Source.PLUG_NM) as PLUG_NM,
        |  trim(JobMaster_Source.SOURCE_NM) as SOURCE_NM,
        |  trim(JobMaster_Source.IS_ACTIVE) as IS_ACTIVE,
        |  trim(JobMaster_Source.FILE_NM) as FILE_NM,
        |  trim(JobMaster_Target.CRT_DATETIME) as CRT_DATETIME,
        |  case
        |    when JobMaster_Target.MASTER_ID is null then 'I'
        |    when trim(JobMaster_Source.SOURCE_NM) <=> trim(JobMaster_Target.SOURCE_NM)
        |     and trim(JobMaster_Source.IS_ACTIVE) <=> trim(JobMaster_Target.IS_ACTIVE)
        |     and trim(JobMaster_Source.FILE_NM) <=> trim(JobMaster_Target.FILE_NM) then 'N'
        |    else 'U'
        |  end as CDC_FLG
        |from global_temp.JobMaster_Source
        |left join global_temp.JobMaster_Target
        |  on trim(JobMaster_Source.SITE_NM) = trim(JobMaster_Target.SITE_NM)
        | and trim(JobMaster_Source.COUNTRY_NM) = trim(JobMaster_Target.COUNTRY_NM)
        | and trim(JobMaster_Source.PLUG_TYPE) = trim(JobMaster_Target.PLUG_TYPE)
        | and trim(JobMaster_Source.PLUG_NM) = trim(JobMaster_Target.PLUG_NM)
        |""".stripMargin)
    df_cdc.createOrReplaceGlobalTempView("JmCdc")

    // One row per site carrying its highest GROUP_ID; 0 means the site has no target row yet.
    df_cdc
      .select("SITE_NM", "GROUP_ID")
      .distinct()
      .groupBy("SITE_NM")
      .agg(max("GROUP_ID").as("GROUP_ID"))
      .createOrReplaceGlobalTempView("find_grp")

    // Existing target GROUP_IDs as integers: CSV columns are strings, and a string max()
    // would rank "9" above "10".
    spark.table("global_temp.JobMaster_Target")
      .select(col("GROUP_ID").cast(DataTypes.IntegerType).as("MAX_GROUP_ID"))
      .createOrReplaceGlobalTempView("max_grp")

    // Sites without a group get consecutive ids after the current maximum (0 when the target
    // is empty); sites that already have a group keep it.
    val distinctValuesDF = spark.sql(
      """
        |select
        |  trim(find_grp.SITE_NM) as SITE_NM,
        |  cast(row_number() over (order by find_grp.SITE_NM) + mx.MAX_GROUP_ID as INT) as GROUP_ID
        |from global_temp.find_grp
        |cross join (select coalesce(max(MAX_GROUP_ID), 0) as MAX_GROUP_ID from global_temp.max_grp) mx
        |where find_grp.GROUP_ID = 0
        |union all
        |select trim(find_grp.SITE_NM) as SITE_NM, find_grp.GROUP_ID
        |from global_temp.find_grp
        |where find_grp.GROUP_ID <> 0
        |""".stripMargin)

    // Highest existing MASTER_ID, compared numerically; 0 when there is none, so new ids are
    // never null.
    val maxMasterId = df_cdc.agg(
      coalesce(max(col("MASTER_ID").cast(DataTypes.IntegerType)), lit(0)).as("MAX_MASTER_ID"))

    // New rows get MASTER_IDs numbered over ONE global window. Partitioning the window (as by
    // GROUP_ID) would restart numbering per partition and hand out duplicate ids.
    val newIdOrder = Window.orderBy("SITE_NM", "COUNTRY_NM", "PLUG_TYPE", "PLUG_NM")
    val new_with_master = df_cdc
      .filter(col("MASTER_ID").isNull)
      .drop("MASTER_ID")
      .crossJoin(maxMasterId)
      .withColumn(
        "MASTER_ID",
        (col("MAX_MASTER_ID") + row_number().over(newIdOrder)).cast(DataTypes.IntegerType))
      .drop("GROUP_ID", "MAX_MASTER_ID", "TAR_SITE_NM")

    // One load timestamp per run (whole seconds), so every row, and CRT/UPD on the same row,
    // carries the same value.
    val loadTs = lit(currentTimestampToSecond())

    val final_upd = df_cdc
      .filter(col("MASTER_ID").isNotNull && col("CDC_FLG") === "U")
      .withColumn("UPD_DATETIME", loadTs)
      .drop("TAR_SITE_NM", "CDC_FLG")
    final_upd.show(100)

    val final_new = new_with_master
      .join(distinctValuesDF, new_with_master("SITE_NM") === distinctValuesDF("SITE_NM"), "inner")
      .drop(distinctValuesDF("SITE_NM"))
      .drop("CDC_FLG")
      .withColumn("CRT_DATETIME", loadTs)
      .withColumn("UPD_DATETIME", loadTs)
    final_new.show(100)
  }

  /** Reads a headered CSV file; every column is loaded as a string (no schema inference). */
  private def readCsv(path: String): DataFrame =
    spark.read.format("csv").option("header", "true").load(path)

  /** Current local time truncated to whole seconds. */
  private def currentTimestampToSecond(): Timestamp = {
    val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    Timestamp.valueOf(format.format(Calendar.getInstance.getTime))
  }
}
Saturday, April 13, 2019
new jobmaster
Subscribe to:
Post Comments (Atom)
No comments:
Post a Comment