Sunday, April 14, 2019

Metadata manager

package Project
import java.text.SimpleDateFormat
import java.util.Calendar

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import Project.config.Settings._
object MetadataManager {

  import java.sql.Timestamp

  import org.apache.spark.sql.functions.{concat, lit}
  import org.apache.spark.sql.types.{StringType, StructField, StructType}

  /** Spark session used by this object: app name "Job Master", local master. */
  implicit val spark: SparkSession = SparkSession.builder().appName("Job Master")
    .config("spark.master", "local")
    .getOrCreate()

  /** Schema of the single input row built from the `key` list given to [[insBatchEntry]]. */
  private val InputSchema: StructType = StructType(List(
    StructField("COUNTRY_NM", StringType, nullable = false),
    StructField("PLUG_NM", StringType, nullable = false),
    StructField("ETL_NO", StringType, nullable = false)
  ))

  /**
   * Prints a start message for `keyPrefix`, builds and previews its batch entry, then runs `callback`.
   *
   * Any `Exception` thrown by `callback` is printed and swallowed, so the caller never sees it.
   * Failures raised while building the batch entry are NOT caught.
   *
   * @param keyPrefix job identifier, forwarded to [[insBatchEntry]]
   * @param params    COUNTRY_NM, PLUG_NM and ETL_NO values, forwarded to [[insBatchEntry]]
   * @param callback  work to run after the batch entry has been built
   * @param spark     currently unused here; [[insBatchEntry]] uses this object's own session
   */
  def reportStatus(keyPrefix: String, params: List[String])(callback: => Unit)(implicit spark: SparkSession): Unit = {
    println("Started " + keyPrefix)
    insBatchEntry(keyPrefix, params)

    try {
      callback
    } catch {
      // Most specific types first: a leading `case e: Exception` made these two cases unreachable.
      case _: IllegalArgumentException => println("illegal arg. exception")
      case _: IllegalStateException    => println("illegal state exception")
      case e: Exception                => println("exception caught: " + e)
    }
  }

  /**
   * Builds the "IN PROGRESS" batch-status rows for one job run and prints the first two.
   *
   * The `key` values are inner-joined with the rows of the CSV at `jobmaster_tar` whose IS_ACTIVE
   * is "Y", matching on COUNTRY_NM and PLUG_NM. Each match is projected to the job-master columns
   * plus DAT_BEG, DAT_END, TXT_JOB, STATUS and SRC_COUNT.
   *
   * NOTE(review): the result is only shown, never written anywhere — confirm whether a write is missing.
   *
   * @param keyPrefix job name; "JobMaster" fills SRC_COUNT with the row count of the CSV at
   *                  `jobmaster`, any other value yields 0
   * @param key       exactly three values, in order: COUNTRY_NM, PLUG_NM, ETL_NO
   * @throws IllegalArgumentException if `key` does not hold exactly three values
   */
  def insBatchEntry(keyPrefix: String, key: List[String]): Unit = {
    // Fail fast with a clear message instead of a late row/schema mismatch inside a Spark job.
    require(key.size == InputSchema.size,
      s"expected ${InputSchema.size} values (COUNTRY_NM, PLUG_NM, ETL_NO) but got ${key.size}")

    val inputRdd = spark.sparkContext.makeRDD(List(Row.fromSeq(key)))
    val inputDf = spark.createDataFrame(inputRdd, InputSchema)
    inputDf.show(2)

    val jobMasterDf = spark.read.format("csv").option("header", "true").load(s"$jobmaster_tar")

    val matchedDf = jobMasterDf.join(
      inputDf,
      jobMasterDf("COUNTRY_NM") === inputDf("COUNTRY_NM") &&
        jobMasterDf("PLUG_NM") === inputDf("PLUG_NM") &&
        jobMasterDf("IS_ACTIVE") === "Y",
      "inner")

    // Kept so SQL callers can still use SRCROWCNT. NOTE(review): the function reads through the
    // SparkSession, so it should only ever run on the driver, never per-row on executors.
    spark.udf.register("SRCROWCNT", countCsvRows _)

    // Computed once on the driver: every row of this batch shares one start time, and the source
    // file is counted once rather than from inside a per-row UDF.
    val batchStart = currentTimestampSeconds()
    val sourceCount = sourceRowCount(keyPrefix)

    matchedDf
      .select(
        jobMasterDf("SITE_NM"), jobMasterDf("COUNTRY_NM"), jobMasterDf("SITE_DESC"), jobMasterDf("PLUG_TYPE"),
        jobMasterDf("PLUG_NM"), jobMasterDf("SOURCE_NM"), jobMasterDf("FILE_NM"))
      .withColumn("DAT_BEG", lit(batchStart))
      .withColumn("DAT_END", lit(null))
      .withColumn("TXT_JOB",
        concat(jobMasterDf("SITE_NM"), lit("_"), jobMasterDf("COUNTRY_NM"), lit("_"), jobMasterDf("PLUG_NM")))
      .withColumn("STATUS", lit("IN PROGRESS"))
      .withColumn("SRC_COUNT", lit(sourceCount))
      .show(2)
  }

  /** Current wall-clock time truncated to whole seconds. */
  private def currentTimestampSeconds(): Timestamp = {
    val nowMillis = System.currentTimeMillis()
    new Timestamp(nowMillis - nowMillis % 1000)
  }

  /**
   * Counts the data rows of the headered CSV at `path`, reading it only once.
   *
   * @throws ArithmeticException if the count does not fit in an `Int` (rather than silently wrapping)
   */
  private def countCsvRows(path: String)(implicit spark: SparkSession): Int = {
    println("The variable value is " + path)
    val rows = Math.toIntExact(spark.read.format("csv").option("header", "true").load(path).count())
    println("the count is " + rows)
    rows
  }

  /** Source row count for a job: the `jobmaster` CSV count for "JobMaster", 0 for anything else. */
  private def sourceRowCount(jobName: String)(implicit spark: SparkSession): Int = jobName match {
    case "JobMaster" => countCsvRows(jobmaster)
    case _           => 0
  }

}

No comments:

Post a Comment