package Project

import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Calendar

import scala.util.control.NonFatal

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.{concat, lit}
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}

import Project.config.Settings._

/**
 * Prints job-start metadata for a batch run.
 *
 * The object owns a local-mode [[SparkSession]] and exposes two entry points:
 * [[reportStatus]], which wraps a unit of work, and [[insBatchEntry]], which
 * joins the caller-supplied batch key against the job-master CSV and shows the
 * resulting "IN PROGRESS" row(s) on stdout.
 */
object MetadataManager {

  /** Session used by every read in this object; configured with master "local". */
  implicit val spark: SparkSession = SparkSession
    .builder()
    .appName("Job Master")
    .config("spark.master", "local")
    .getOrCreate()

  /** Schema of the single-row DataFrame built from the `key` list passed to [[insBatchEntry]]. */
  private val BatchKeySchema: StructType = StructType(
    List(
      StructField("COUNTRY_NM", StringType, nullable = false),
      StructField("PLUG_NM", StringType, nullable = false),
      StructField("ETL_NO", StringType, nullable = false)
    )
  )

  /**
   * Prints a start message, shows the batch entry for the job, then runs `callback`.
   *
   * Exceptions thrown by `callback` are printed and NOT rethrown, so the caller
   * always regains control. Failures inside [[insBatchEntry]] are not caught
   * here and propagate to the caller.
   *
   * @param keyPrefix job name; also selects how the source row count is computed
   * @param params    batch key values, forwarded unchanged to [[insBatchEntry]]
   * @param callback  the work to run, evaluated once
   * @param spark     implicit session kept for interface compatibility; the body
   *                  does not use it ([[insBatchEntry]] reads through the
   *                  object-level session)
   */
  def reportStatus(keyPrefix: String, params: List[String])(callback: => Unit)(implicit
      spark: SparkSession
  ): Unit = {
    println("Started " + keyPrefix)
    insBatchEntry(keyPrefix, params)
    try callback
    catch {
      // Specific handlers must come before the general one; listed after a
      // catch-all they can never be selected.
      case _: IllegalArgumentException => println("illegal arg. exception")
      case _: IllegalStateException    => println("illegal state exception")
      // NonFatal lets InterruptedException and VM errors propagate.
      case NonFatal(e) => println("exception caught: " + e)
    }
  }

  /**
   * Builds the "IN PROGRESS" metadata row(s) for a batch and prints up to two of them.
   *
   * The `key` values are turned into a one-row DataFrame, inner-joined with the
   * job-master CSV at `jobmaster_tar` on COUNTRY_NM and PLUG_NM (keeping only rows
   * whose IS_ACTIVE is "Y"), and decorated with DAT_BEG, DAT_END, TXT_JOB, STATUS
   * and SRC_COUNT columns. Nothing is persisted; the result is only shown.
   *
   * @param keyPrefix job name; "JobMaster" makes SRC_COUNT the row count of the
   *                  `jobmaster` CSV, any other value yields 0
   * @param key       values for COUNTRY_NM, PLUG_NM and ETL_NO, in that order
   * @throws IllegalArgumentException if `key` has fewer values than the schema has fields
   */
  def insBatchEntry(keyPrefix: String, key: List[String]): Unit = {
    require(
      key.size >= BatchKeySchema.size,
      s"expected at least ${BatchKeySchema.size} key values (COUNTRY_NM, PLUG_NM, ETL_NO), got ${key.size}"
    )

    val inputDf = spark.createDataFrame(
      spark.sparkContext.makeRDD(List(Row.fromSeq(key))),
      BatchKeySchema
    )
    inputDf.show(2)

    val jobMasterDf = spark.read
      .format("csv")
      .option("header", "true")
      .load(s"$jobmaster_tar")

    val joinCondition =
      jobMasterDf("COUNTRY_NM") === inputDf("COUNTRY_NM") &&
        jobMasterDf("PLUG_NM") === inputDf("PLUG_NM") &&
        jobMasterDf("IS_ACTIVE") === "Y"
    val activeJobs = jobMasterDf.join(inputDf, joinCondition, "inner")

    // Kept so SQL that refers to SRCROWCNT on this session keeps resolving.
    // NOTE(review): this UDF starts a Spark read from inside a UDF body, which is
    // not a supported pattern; the pipeline below no longer depends on it.
    spark.udf.register("SRCROWCNT", csvRowCount _)

    // Both values are computed once on the driver and embedded as literals, so no
    // UDF has to touch the SparkSession while the query runs.
    val startedAt: Timestamp = currentTimestamp()
    val srcCount: Int        = sourceRowCount(keyPrefix)

    activeJobs
      .select(
        jobMasterDf("SITE_NM"),
        jobMasterDf("COUNTRY_NM"),
        jobMasterDf("SITE_DESC"),
        jobMasterDf("PLUG_TYPE"),
        jobMasterDf("PLUG_NM"),
        jobMasterDf("SOURCE_NM"),
        jobMasterDf("FILE_NM")
      )
      .withColumn("DAT_BEG", lit(startedAt))
      // Typed null: an untyped lit(null) produces a NullType column.
      .withColumn("DAT_END", lit(null).cast(TimestampType))
      .withColumn(
        "TXT_JOB",
        concat(
          jobMasterDf("SITE_NM"),
          lit("_"),
          jobMasterDf("COUNTRY_NM"),
          lit("_"),
          jobMasterDf("PLUG_NM")
        )
      )
      .withColumn("STATUS", lit("IN PROGRESS"))
      .withColumn("SRC_COUNT", lit(srcCount))
      .show(2)
  }

  /** Current wall-clock time truncated to whole seconds. */
  private def currentTimestamp(): Timestamp =
    new Timestamp((System.currentTimeMillis() / 1000L) * 1000L)

  /**
   * Counts the data rows of the header-bearing CSV at `path`, printing the path
   * and the count. The file is read once.
   *
   * @throws ArithmeticException if the count does not fit in an Int
   */
  private def csvRowCount(path: String): Int = {
    println("The variable value is " + path)
    val rows = spark.read.format("csv").option("header", "true").load(path).count()
    println("the count is " + rows)
    Math.toIntExact(rows)
  }

  /** Source row count for a job: the `jobmaster` CSV for "JobMaster", otherwise 0. */
  private def sourceRowCount(jobName: String): Int =
    jobName match {
      case "JobMaster" => csvRowCount(jobmaster)
      case _           => 0
    }
}
Sunday, April 14, 2019
metadata manager
Subscribe to:
Post Comments (Atom)
No comments:
Post a Comment