Sunday, April 28, 2019

Multi delimiter spark

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Converts a text file whose columns are separated by the two-character
  * delimiter `!~` into Parquet, partitioned by the `input` column.
  *
  * The header is taken to be the first line containing the text "input"; every
  * line that does not contain "input" is treated as data. All columns are
  * written as strings.
  *
  * An explicit `main` is used instead of `extends App`: the Spark documentation
  * advises against `scala.App` for Spark applications.
  */
object PaymentFile {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("PaymentFile")
      .config("spark.master", "local")
      .getOrCreate()

    // Regex for the literal two-character delimiter "!~".
    val delimiter = "\\!\\~"

    val lines = spark.sparkContext.textFile("C:\\Senthil\\SenStudy\\Scala\\Files\\multidelimiter.txt")

    // Column names come from the header line; first() fails fast if no header exists.
    val header = lines.filter(_.contains("input")).map(line => line.split(delimiter)).first()
    val schema = StructType(header.map(name => StructField(name, StringType)).toSeq)
    val width = header.length

    val rows = lines.filter(!_.contains("input")).map { line =>
      // limit = -1 keeps trailing empty fields, which the one-argument split
      // silently drops (leaving the Row shorter than the schema). take/padTo
      // then normalise every row to exactly the header width.
      val fields = line.split(delimiter, -1).take(width).padTo(width, null: String)
      Row.fromSeq(fields.toSeq)
    }

    spark.createDataFrame(rows, schema)
      .write
      .partitionBy("input")
      .mode("overwrite")
      .parquet("C:\\Senthil\\SenStudy\\Scala\\Files\\multidelimiter")
  }

}

Sunday, April 14, 2019

metadata manager

package Project
import java.text.SimpleDateFormat
import java.util.Calendar

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import Project.config.Settings._
/**
  * Bookkeeping wrapper around a job body: announces the job, shows the
  * job-master rows matching the supplied keys, then runs the job while
  * reporting (not re-throwing) exceptions.
  */
object MetadataManager {

  implicit val spark: SparkSession = SparkSession.builder().appName("Job Master")
    .config("spark.master", "local")
    .getOrCreate()

  /**
    * Prints a start message, shows the batch entry for the job and then
    * evaluates `callback`.
    *
    * Exceptions thrown by `callback` are printed to stdout and swallowed.
    * Exceptions thrown by [[insBatchEntry]] are not caught here.
    *
    * @param keyPrefix job name, e.g. "JobMaster"
    * @param params    key values passed on to [[insBatchEntry]]
    * @param callback  the job body, evaluated once
    * @param spark     session supplied by the caller (the batch entry itself
    *                  uses this object's own session)
    */
  def reportStatus(keyPrefix: String, params: List[String])(callback: => Unit)(implicit spark: SparkSession): Unit = {

    println("Started " + keyPrefix)
    insBatchEntry(keyPrefix, params)

    try {
      callback
    } catch {
      // Specific cases must precede the general one; with `Exception` listed
      // first these two branches could never be reached.
      case _: IllegalArgumentException => println("illegal arg. exception")
      case _: IllegalStateException    => println("illegal state exception")
      case e: Exception                => println("exception caught: " + e)
    }
  }

  /**
    * Builds and displays (via `show`) the batch entry for a job run: the active
    * job-master target rows matching the given country and plug name, extended
    * with start time, status and source row count. Nothing is persisted.
    *
    * @param keyPrefix job name; only "JobMaster" gets a non-zero SRC_COUNT
    * @param key       exactly three values, in order: COUNTRY_NM, PLUG_NM, ETL_NO
    */
  def insBatchEntry(keyPrefix: String, key: List[String]): Unit = {
    import java.sql.Timestamp
    import org.apache.spark.sql.functions.{concat, lit, udf}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val row = Row.fromSeq(key)
    val rdd = spark.sparkContext.makeRDD(List(row))

    val fields = List(
      StructField("COUNTRY_NM", StringType, nullable = false),
      StructField("PLUG_NM", StringType, nullable = false),
      StructField("ETL_NO", StringType, nullable = false)
    )
    val df_inp = spark.createDataFrame(rdd, StructType(fields))
    df_inp.show(2)

    val df_jb = spark.read.format("csv").option("header", "true").load(s"$jobmaster_tar")

    val df_join_jb = df_jb.join(
      df_inp,
      df_jb("COUNTRY_NM") === df_inp("COUNTRY_NM") &&
        df_jb("PLUG_NM") === df_inp("PLUG_NM") &&
        df_jb("IS_ACTIVE") === "Y",
      "inner")

    /** Current wall-clock time, truncated to whole seconds. */
    def UDFgetCurrentdateTimeStamp(): Timestamp = {
      val today: java.util.Date = Calendar.getInstance.getTime
      val timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      val now: String = timeFormat.format(today)
      java.sql.Timestamp.valueOf(now)
    }

    /** Number of data rows in the header-bearing CSV at `jobmaster_pa`. */
    def prepareRowCountfromParquet(jobmaster_pa: String)(implicit spark: SparkSession): Int = {
      print("The variable value is " + jobmaster_pa)
      // Count once and reuse; the file was previously read twice.
      val rowCount = spark.read.format("csv").option("header", "true").load(jobmaster_pa).count().toInt
      print("the count is " + rowCount)
      rowCount
    }

    // Kept so the function stays reachable by name on this session.
    // NOTE(review): it uses the SparkSession inside a UDF, which only works
    // when evaluated on the driver - confirm whether any query still needs it.
    spark.udf.register("SRCROWCNT", prepareRowCountfromParquet _)
    val UDFCURDT = udf(UDFgetCurrentdateTimeStamp _)

    // Compute the source row count on the driver and attach it as a literal,
    // rather than launching a Spark read from inside a UDF.
    val srcCount: Int = keyPrefix match {
      case "JobMaster" => prepareRowCountfromParquet(jobmaster)
      case _           => 0
    }

    df_join_jb
      .drop(df_inp("PLUG_NM"))
      .drop(df_inp("COUNTRY_NM"))
      .select(df_jb("SITE_NM"), df_jb("COUNTRY_NM"), df_jb("SITE_DESC"), df_jb("PLUG_TYPE"), df_jb("PLUG_NM"), df_jb("SOURCE_NM"), df_jb("FILE_NM"))
      .withColumn("DAT_BEG", UDFCURDT())
      .withColumn("DAT_END", lit(null))
      .withColumn("TXT_JOB", concat(df_jb("SITE_NM"), lit("_"), df_jb("COUNTRY_NM"), lit("_"), df_jb("PLUG_NM")))
      .withColumn("STATUS", lit("IN PROGRESS"))
      .withColumn("SRC_COUNT", lit(srcCount))
      .show(2)

  }

}

Saturday, April 13, 2019

new jobmaster

package Project

import java.text.SimpleDateFormat
import java.util.Calendar

import org.apache.spark.sql.{DataFrame, SparkSession}
import Project.config.Settings._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DataTypes

/**
  * Change-data-capture job for the job-master table.
  *
  * Compares the source CSV (`jobmaster`) with the target CSV (`jobmaster_tar`),
  * flags every source row as new ('I'), updated ('U') or unchanged ('N'),
  * assigns MASTER_ID / GROUP_ID values to new rows and displays the updated
  * and new rows. Nothing is written back.
  *
  * NOTE(review): the Spark documentation advises against `extends App`; it is
  * kept here because `run()` and `key` are members of this object.
  */
object JobMaster extends  App {

  implicit val spark: SparkSession = SparkSession.builder().appName("Job Master")
    .config("spark.master", "local")
    .getOrCreate()

  val key_list = List("MASTER_ID", "GROUP_ID", "SITE_NM", "COUNTRY_NM")
  val key = args.toList
  run()

  /** Runs the CDC comparison inside [[MetadataManager.reportStatus]]. */
  def run(): Unit = MetadataManager.reportStatus("JobMaster", key) {
    import java.sql.Timestamp
    import org.apache.spark.sql.expressions.Window

    /** Loads a header-bearing CSV; every column is read as a string. */
    def readCsv(path: String): DataFrame =
      spark.read.format("csv").option("header", "true").load(path)

    readCsv(s"$jobmaster").createOrReplaceGlobalTempView("JobMaster_Source")
    readCsv(s"$jobmaster_tar").createOrReplaceGlobalTempView("JobMaster_Target")

    // Every margin line must sit on its own source line: stripMargin only
    // removes a '|' that follows a line break, so a collapsed literal leaves
    // '|' characters inside the SQL text.
    val df_cdc =
      spark.sql(
        """
          |select
          |JobMaster_Target.MASTER_ID as MASTER_ID,
          |case when JobMaster_Target.GROUP_ID is null then 0 else cast(JobMaster_Target.GROUP_ID as INT) end as GROUP_ID,
          |trim(JobMaster_Source.SITE_NM) as SITE_NM,
          |trim(JobMaster_Target.SITE_NM) as TAR_SITE_NM,
          |trim(JobMaster_Source.COUNTRY_NM) as COUNTRY_NM,
          |trim(JobMaster_Source.SITE_DESC) as SITE_DESC,
          |trim(JobMaster_Source.PLUG_TYPE) as PLUG_TYPE,
          |trim(JobMaster_Source.PLUG_NM) as PLUG_NM,
          |trim(JobMaster_Source.SOURCE_NM) as SOURCE_NM,
          |trim(JobMaster_Source.IS_ACTIVE) as IS_ACTIVE,
          |trim(JobMaster_Source.FILE_NM) as FILE_NM,
          |trim(JobMaster_Target.CRT_DATETIME) as CRT_DATETIME,
          |case when JobMaster_Target.MASTER_ID is not Null and (JobMaster_Source.SOURCE_NM =JobMaster_Target.SOURCE_NM and
          |JobMaster_Source.IS_ACTIVE = JobMaster_Target.IS_ACTIVE and
          |JobMaster_Source.FILE_NM =JobMaster_Target.FILE_NM ) then 'N'
          | when JobMaster_Target.MASTER_ID is Null then 'I' else 'U' end as CDC_FLG
          |from
          |global_temp.JobMaster_Source left join global_temp.JobMaster_Target on
          |trim(JobMaster_Source.SITE_NM)=trim(JobMaster_Target.SITE_NM) and
          |trim(JobMaster_Source.COUNTRY_NM)=trim(JobMaster_Target.COUNTRY_NM) and
          |trim(JobMaster_Source.PLUG_TYPE)=trim(JobMaster_Target.PLUG_TYPE) and
          |trim(JobMaster_Source.PLUG_NM)=trim(JobMaster_Target.PLUG_NM)
        """.stripMargin)
    df_cdc.createOrReplaceGlobalTempView("JmCdc")

    // Highest known GROUP_ID per site; 0 means the site has no group yet.
    val df_grp_jm = df_cdc
      .select(df_cdc("SITE_NM"), df_cdc("GROUP_ID"))
      .distinct
      .groupBy("SITE_NM")
      .max("GROUP_ID")
    df_grp_jm.withColumnRenamed("max(GROUP_ID)", "GROUP_ID").createOrReplaceGlobalTempView("find_grp")

    // CSV columns are strings; cast so the max below is numeric, not
    // lexicographic (where "9" would sort above "10").
    readCsv(s"$jobmaster_tar")
      .select(col("GROUP_ID").cast(DataTypes.IntegerType).as("MAX_GROUP_ID"))
      .createOrReplaceGlobalTempView("max_grp")

    // Sites without a group get the next free GROUP_ID; others keep theirs.
    // coalesce(..., 0) covers an empty target, where max() yields null.
    val distinctValuesDF = spark.sql(
      """
        |select
        |trim(find_grp.SITE_NM) as SITE_NM,
        |cast(row_number() over ( order by find_grp.SITE_NM)  + mx.MAX_GROUP_ID as INT) as GROUP_ID
        |from
        |global_temp.find_grp cross join (select coalesce(max(MAX_GROUP_ID), 0) as MAX_GROUP_ID from global_temp.max_grp) mx where find_grp.GROUP_ID = 0
        |union all
        |select trim(find_grp.SITE_NM) as SITE_NM, find_grp.GROUP_ID from global_temp.find_grp where find_grp.GROUP_ID  <> 0
      """.stripMargin)

    // Numeric maximum of the existing MASTER_IDs (0 when there are none).
    val maxValue = df_cdc.agg(
      coalesce(max(col("MASTER_ID").cast(DataTypes.IntegerType)), lit(0)).as("MAX_MASTER_ID"))
    val new_rec = df_cdc.filter(col("MASTER_ID").isNull).drop("MASTER_ID")

    // One global window ordered by the natural key: ids are unique across all
    // new rows and reproducible between runs. Partitioning by GROUP_ID would
    // restart the numbering in every partition.
    val idWindow = Window.orderBy("SITE_NM", "COUNTRY_NM", "PLUG_TYPE", "PLUG_NM")

    val new_with_master = new_rec
      .crossJoin(maxValue)
      .withColumn("MASTER_ID", (col("MAX_MASTER_ID") + row_number().over(idWindow)).cast(DataTypes.IntegerType))
      .drop("GROUP_ID")
      .drop("MAX_MASTER_ID")
      .drop("TAR_SITE_NM")

    /** Current wall-clock time, truncated to whole seconds. */
    def UDFgetCurrentdateTimeStamp(): Timestamp = {
      val today: java.util.Date = Calendar.getInstance.getTime
      val timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      val now: String = timeFormat.format(today)
      java.sql.Timestamp.valueOf(now)
    }

    val UDFCURDT = udf(UDFgetCurrentdateTimeStamp _)

    // Rows that exist in the target but differ from the source.
    val final_upd = df_cdc
      .drop("TAR_SITE_NM")
      .filter(col("MASTER_ID").isNotNull)
      .filter(col("CDC_FLG") === "U")
      .withColumn("UPD_DATETIME", UDFCURDT())
      .drop("CDC_FLG")

    final_upd.show(100)

    // New rows, completed with their GROUP_ID and audit timestamps. Kept as a
    // DataFrame; binding the result of show() would only capture Unit.
    val final_new = new_with_master.join(
      distinctValuesDF,
      new_with_master("SITE_NM") === distinctValuesDF("SITE_NM"),
      "inner")
      .drop(distinctValuesDF("SITE_NM"))
      .drop("CDC_FLG")
      .withColumn("CRT_DATETIME", UDFCURDT())
      .withColumn("UPD_DATETIME", UDFCURDT())

    final_new.show(100)

  }
}

Sunday, April 7, 2019

IBAN

package Project

/**
  * IBAN checksum validation using the mod-97 rule.
  *
  * `apply` checks only the checksum and basic character set; it does not check
  * country-specific lengths.
  */
object IBANAlgorithm {

  def main(args: Array[String]): Unit =
    print(apply("DE89370400440532013000"))

  /**
    * Tells whether `iban` passes the mod-97 check.
    *
    * Whitespace is ignored. Malformed input (null, fewer than five characters,
    * or anything other than ASCII digits and upper-case A-Z) yields `false`
    * rather than an exception.
    *
    * @param iban candidate IBAN, with or without grouping spaces
    * @return `true` when the rearranged numeric form is congruent to 1 mod 97
    */
  def apply(iban: String): Boolean = {
    def isAsciiDigit(c: Char): Boolean = c >= '0' && c <= '9'
    def isUpperLetter(c: Char): Boolean = c >= 'A' && c <= 'Z'

    val compact = Option(iban).map(_.filterNot(_.isWhitespace)).getOrElse("")
    val wellFormed = compact.length > 4 && compact.forall(c => isAsciiDigit(c) || isUpperLetter(c))

    wellFormed && {
      // Move country code + check digits to the end, then expand letters:
      // A -> 10, B -> 11, ... Z -> 35.
      val rearranged = compact.drop(4) + compact.take(4)
      val numeric = rearranged.flatMap(c => if (isAsciiDigit(c)) c.toString else (c - 'A' + 10).toString)

      BigInt(numeric) % BigInt(97) == BigInt(1)
    }
  }

}

/*/*/*///////////////////////////////////************
package Project

import scala.collection.immutable.SortedMap
import scala.math.BigInt
import scala.util.matching.Regex

/**
  * The International Bank Account Number (IBAN) is an internationally agreed means of identifying
  * bank accounts across national borders with a reduced risk of propagating transcription errors. The
  * IBAN consists of up to 34 alphanumeric characters: first the two-letter ISO 3166-1 alpha-2 country
  * code, then two check digits, and finally a country-specific Basic Bank Account Number (BBAN). The
  * check digits enable a sanity check of the bank account number to confirm its integrity even before
  * submitting a transaction.
  *
  * @param iban the candidate IBAN; [[Iban.apply]] strips whitespace before constructing
  */
class Iban(val iban: String) {

  // Isolated tests

  /** True when the text contains no lower-case letters. */
  def isAllUpperCase: Boolean = iban.toUpperCase() == iban

  /** True when the WHOLE text matches [[Iban.pattern]] (a partial match is not enough). */
  def isValidpattern: Boolean = Iban.pattern.pattern.matcher(iban).matches()

  /** True when the length equals the one registered for the leading country code. */
  def isNationalSize: Boolean = Iban.ccVsLength.getOrElse(iban.take(2), 0) == iban.size

  /** True when the rearranged numeric form is congruent to 1 modulo 97. */
  def isCheckNumberOK: Boolean = {
    // Move the leading country code + check digits to the end, then convert
    // every character to a number: digits as-is, A -> 10 ... Z -> 35.
    def rearrange = (iban.drop(4) + iban.take(4))
      .map(ch => if (ch.isDigit) ch.toInt - '0' else ch - 'A' + 10)
      .mkString

    (BigInt(rearrange) mod 97) == 1
  }

  /** All checks combined; evaluated left to right, stopping at the first failure. */
  def isValidIban: Boolean =
    isAllUpperCase &&
      isValidpattern &&
      isNationalSize &&
      isCheckNumberOK
}

object Iban {

  // IBAN length database: country code -> total IBAN length.
  lazy val ccVsLength: SortedMap[String, Int] = SortedMap[String, Int]() ++
    """AD24 AE23 AL28 AO25 AT20 AZ28 BA20 BE16 BF27 BG22 BH22 BI16
      |BJ28 BR29 CG27 CH21 CI28 CM27 CR21 CV25 CY28 CZ24 DE22 DK18
      |DO28 DZ24 EE20 EG27 ES24 FI18 FO18 FR27 GA27 GB22 GE22 GI23
      |GL18 GR27 GT28 HR21 HU28 IE22 IL23 IR26 IS26 IT27 JO30 KW30
      |KZ20 LB28 LI21 LT20 LU20 LV21 MC27 MD24 ME22 MG27 MK19 ML28
      |MR27 MT31 MU30 MZ25 NL18 NO15 PK24 PL28 PS29 PT25 QA29 RO24
      |RS22 SA24 SE24 SI19 SK24 SM27 SN28 TN24 TR26 UA29 VG24"""
      .stripMargin
      .split("""\s+""")
      .filter(_.nonEmpty)
      .map(v => (v.take(2), v.slice(2, 4).toInt))

  lazy val pattern: Regex = "([A-Z]{2})([0-9]{2})([A-Z0-9]{4})([A-Z0-9]{0,2})([0-9]{7})(([A-Z0-9]?){0,16})".r

  /** Builds an [[Iban]] from text that may contain grouping whitespace. */
  def apply(s: String) = new Iban(s.replaceAll("""\s""", ""))
}