import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Converts a text file whose fields are separated by the two-character delimiter "!~" into
 * Parquet.
 *
 * The first line containing "input" is taken as the header (column names). All other lines
 * become string-typed rows. The output is partitioned by the "input" column.
 *
 * Uses an explicit `main` instead of `extends App`: the Spark docs warn that `scala.App`
 * subclasses may not work correctly as Spark applications.
 */
object PaymentFile {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("PaymentFile")
      .config("spark.master", "local")
      .getOrCreate()

    try {
      // Regex for the literal "!~"; kept local so the RDD closures capture only a String.
      val delimiter = "\\!\\~"
      val rdd = spark.sparkContext.textFile("C:\\Senthil\\SenStudy\\Scala\\Files\\multidelimiter.txt")

      val headerLine = rdd.filter(_.contains("input")).take(1).headOption.getOrElse(
        throw new IllegalArgumentException("no header line containing \"input\" found"))
      // Limit -1 keeps trailing empty fields, so every row has as many values as the schema.
      val header = headerLine.split(delimiter, -1)
      val schema = StructType(header.map(cols => StructField(cols, StringType)).toSeq)

      // Drop only header lines (including repeated copies), not data rows that happen to
      // contain the substring "input".
      val data = spark.createDataFrame(
        rdd.filter(_ != headerLine).map(line => Row.fromSeq(line.split(delimiter, -1).toSeq)),
        schema)

      data.write.partitionBy("input").mode("overwrite")
        .parquet("C:\\Senthil\\SenStudy\\Scala\\Files\\multidelimiter")
    } finally {
      spark.stop()
    }
  }
}
Sunday, April 28, 2019
Multi-delimiter Spark
Sunday, April 14, 2019
Metadata manager
package Project

import java.text.SimpleDateFormat
import java.util.Calendar
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import scala.util.control.NonFatal
import Project.config.Settings._

/** Records a batch entry for a job run and wraps the job body with status reporting. */
object MetadataManager {
  implicit val spark: SparkSession = SparkSession.builder().appName("Job Master")
    .config("spark.master", "local")
    .getOrCreate()

  /**
   * Prints a start message, records the batch entry for `params`, then runs `callback`.
   *
   * Non-fatal failures thrown by `callback` are printed and not rethrown, so status
   * reporting stays best-effort. Failures from `insBatchEntry` are not caught.
   *
   * @param keyPrefix job identifier; "JobMaster" also selects how SRC_COUNT is computed
   * @param params    batch key values in the order COUNTRY_NM, PLUG_NM, ETL_NO
   * @param callback  the job body to execute
   */
  def reportStatus(keyPrefix: String, params: List[String])(callback: => Unit)(implicit spark: SparkSession): Unit = {
    println("Started " + keyPrefix)
    insBatchEntry(keyPrefix, params)
    try {
      callback
    } catch {
      // Specific cases must come before the general one; previously they were unreachable.
      case e: IllegalArgumentException => println("illegal arg. exception")
      case e: IllegalStateException => println("illegal state exception")
      case NonFatal(e) => println("exception caught: " + e)
    }
  }

  /**
   * Builds a one-row DataFrame from `key` and inner-joins it with the job-master target rows
   * that share COUNTRY_NM and PLUG_NM and have IS_ACTIVE = "Y". Prints (show(2)) the
   * resulting "IN PROGRESS" batch rows.
   *
   * @param keyPrefix "JobMaster" sets SRC_COUNT to the row count of `jobmaster`; any other
   *                  value sets it to 0
   * @param key       exactly three values: COUNTRY_NM, PLUG_NM, ETL_NO
   */
  def insBatchEntry(keyPrefix: String, key: List[String]): Unit = {
    import java.sql.Timestamp
    import org.apache.spark.sql.functions.{concat, lit}
    import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}

    val fields = List(
      StructField("COUNTRY_NM", StringType, nullable = false),
      StructField("PLUG_NM", StringType, nullable = false),
      StructField("ETL_NO", StringType, nullable = false)
    )
    // Fail fast on a malformed key instead of at the first Spark action.
    require(key.size == fields.size,
      s"insBatchEntry expects ${fields.size} key values (COUNTRY_NM, PLUG_NM, ETL_NO) but got ${key.size}: $key")

    val rdd = spark.sparkContext.makeRDD(List(Row.fromSeq(key)))
    val df_inp = spark.createDataFrame(rdd, StructType(fields))
    df_inp.show(2)

    val df_jb = spark.read.format("csv").option("header", "true").load(s"$jobmaster_tar")
    val df_join_jb = df_jb.join(
      df_inp,
      df_jb("COUNTRY_NM") === df_inp("COUNTRY_NM") &&
        df_jb("PLUG_NM") === df_inp("PLUG_NM") &&
        df_jb("IS_ACTIVE") === "Y",
      "inner")

    // Both values are computed once on the driver. SRC_COUNT used to come from a UDF that
    // called spark.read inside an executor task, which Spark does not support.
    val srcCount: Int = sourceRowCount(keyPrefix)
    val batchStart: Timestamp = currentTimestampToSecond()

    df_join_jb
      .select(df_jb("SITE_NM"), df_jb("COUNTRY_NM"), df_jb("SITE_DESC"), df_jb("PLUG_TYPE"),
        df_jb("PLUG_NM"), df_jb("SOURCE_NM"), df_jb("FILE_NM"))
      .withColumn("DAT_BEG", lit(batchStart))
      // Typed null so the column matches DAT_BEG and can be written out later.
      .withColumn("DAT_END", lit(null).cast(TimestampType))
      .withColumn("TXT_JOB", concat(df_jb("SITE_NM"), lit("_"), df_jb("COUNTRY_NM"), lit("_"), df_jb("PLUG_NM")))
      .withColumn("STATUS", lit("IN PROGRESS"))
      .withColumn("SRC_COUNT", lit(srcCount))
      .show(2)
  }

  /** Current wall-clock time truncated to whole seconds (same precision as a "yyyy-MM-dd HH:mm:ss" round-trip). */
  private def currentTimestampToSecond(): java.sql.Timestamp = {
    val now = Calendar.getInstance
    now.set(Calendar.MILLISECOND, 0)
    new java.sql.Timestamp(now.getTimeInMillis)
  }

  /**
   * Row count of the headered CSV at `jobmaster_pa`, read once.
   * Throws ArithmeticException instead of silently wrapping when the count exceeds Int range.
   */
  private def prepareRowCountfromParquet(jobmaster_pa: String)(implicit spark: SparkSession): Int = {
    println("The variable value is " + jobmaster_pa)
    val count = Math.toIntExact(spark.read.format("csv").option("header", "true").load(jobmaster_pa).count())
    println("the count is " + count)
    count
  }

  /** SRC_COUNT for `keyPrefix`: the `jobmaster` row count for "JobMaster", otherwise 0. */
  private def sourceRowCount(keyPrefix: String)(implicit spark: SparkSession): Int = keyPrefix match {
    case "JobMaster" => prepareRowCountfromParquet(jobmaster)
    case _ => 0
  }
}
Saturday, April 13, 2019
New JobMaster
package Project

import java.text.SimpleDateFormat
import java.util.Calendar
import org.apache.spark.sql.{DataFrame, SparkSession}
import Project.config.Settings._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DataTypes

/**
 * Change-data-capture pass for the job-master table.
 *
 * Left-joins the source CSV (`jobmaster`) to the target CSV (`jobmaster_tar`) on the trimmed
 * SITE_NM / COUNTRY_NM / PLUG_TYPE / PLUG_NM. Each source row is flagged 'I' (no target row),
 * 'N' (unchanged) or 'U' (changed). The rows to update and the rows to insert are then printed.
 * Command-line arguments are passed to MetadataManager.reportStatus as the batch key.
 */
object JobMaster extends App {
  implicit val spark: SparkSession = SparkSession.builder().appName("Job Master")
    .config("spark.master", "local")
    .getOrCreate()

  // NOTE(review): not referenced in this object; kept in case other code reads it.
  val key_list = List("MASTER_ID", "GROUP_ID", "SITE_NM", "COUNTRY_NM")
  val key = args.toList

  run()

  /** Runs the CDC comparison wrapped in MetadataManager status reporting. */
  def run(): Unit = MetadataManager.reportStatus("JobMaster", key) {
    import java.sql.Timestamp
    import org.apache.spark.sql.expressions.Window

    spark.read.format("csv").option("header", "true").load(s"$jobmaster")
      .createOrReplaceGlobalTempView("JobMaster_Source")
    spark.read.format("csv").option("header", "true").load(s"$jobmaster_tar")
      .createOrReplaceGlobalTempView("JobMaster_Target")

    // CDC_FLG: 'I' = no matching target row, 'N' = SOURCE_NM / IS_ACTIVE / FILE_NM unchanged,
    // 'U' = otherwise. `<=>` is null-safe, so two empty (null) values count as unchanged
    // instead of forcing an update on every run.
    val df_cdc = spark.sql(
      """
        |select
        |JobMaster_Target.MASTER_ID as MASTER_ID,
        |case when JobMaster_Target.GROUP_ID is null then 0 else cast(JobMaster_Target.GROUP_ID as INT) end as GROUP_ID,
        |trim(JobMaster_Source.SITE_NM) as SITE_NM,
        |trim(JobMaster_Target.SITE_NM) as TAR_SITE_NM,
        |trim(JobMaster_Source.COUNTRY_NM) as COUNTRY_NM,
        |trim(JobMaster_Source.SITE_DESC) as SITE_DESC,
        |trim(JobMaster_Source.PLUG_TYPE) as PLUG_TYPE,
        |trim(JobMaster_Source.PLUG_NM) as PLUG_NM,
        |trim(JobMaster_Source.SOURCE_NM) as SOURCE_NM,
        |trim(JobMaster_Source.IS_ACTIVE) as IS_ACTIVE,
        |trim(JobMaster_Source.FILE_NM) as FILE_NM,
        |trim(JobMaster_Target.CRT_DATETIME) as CRT_DATETIME,
        |case when JobMaster_Target.MASTER_ID is not Null and (JobMaster_Source.SOURCE_NM <=> JobMaster_Target.SOURCE_NM and
        |JobMaster_Source.IS_ACTIVE <=> JobMaster_Target.IS_ACTIVE and
        |JobMaster_Source.FILE_NM <=> JobMaster_Target.FILE_NM ) then 'N'
        | when JobMaster_Target.MASTER_ID is Null then 'I' else 'U' end as CDC_FLG
        |from
        |global_temp.JobMaster_Source left join global_temp.JobMaster_Target on
        |trim(JobMaster_Source.SITE_NM)=trim(JobMaster_Target.SITE_NM) and
        |trim(JobMaster_Source.COUNTRY_NM)=trim(JobMaster_Target.COUNTRY_NM) and
        |trim(JobMaster_Source.PLUG_TYPE)=trim(JobMaster_Target.PLUG_TYPE) and
        |trim(JobMaster_Source.PLUG_NM)=trim(JobMaster_Target.PLUG_NM)
      """.stripMargin)
    df_cdc.createOrReplaceGlobalTempView("JmCdc")

    // Highest existing GROUP_ID per site. It is 0 when none of the site's rows is in the target yet.
    df_cdc
      .groupBy("SITE_NM")
      .agg(max("GROUP_ID").as("GROUP_ID"))
      .createOrReplaceGlobalTempView("find_grp")

    // Sites still at GROUP_ID 0 get consecutive ids after the largest target GROUP_ID.
    // GROUP_ID is a CSV string, so it is cast before max(); a string max is lexicographic
    // ("9" > "10"). An empty target starts numbering at 1.
    val distinctValuesDF = spark.sql(
      """
        |select
        |trim(find_grp.SITE_NM) as SITE_NM,
        |cast(row_number() over (order by find_grp.SITE_NM) + mx.MAX_GROUP_ID as INT) as GROUP_ID
        |from
        |global_temp.find_grp cross join
        |(select coalesce(max(cast(GROUP_ID as INT)), 0) as MAX_GROUP_ID from global_temp.JobMaster_Target) mx
        |where find_grp.GROUP_ID = 0
        |union all
        |select trim(find_grp.SITE_NM) as SITE_NM, find_grp.GROUP_ID from global_temp.find_grp where find_grp.GROUP_ID <> 0
      """.stripMargin)

    // Largest MASTER_ID in the whole target table, not just the rows that matched the source.
    // It is cast to INT so max() is numeric, and is 0 for an empty target.
    val maxMasterId = spark.table("global_temp.JobMaster_Target")
      .agg(coalesce(max(col("MASTER_ID").cast(DataTypes.IntegerType)), lit(0)).as("MAX_MASTER_ID"))

    // One global, deterministically ordered numbering, so new MASTER_IDs are unique.
    val new_with_master = df_cdc
      .filter(col("MASTER_ID").isNull)
      .drop("MASTER_ID")
      .crossJoin(maxMasterId)
      .withColumn("MASTER_ID",
        (col("MAX_MASTER_ID") + row_number().over(Window.orderBy("SITE_NM", "COUNTRY_NM", "PLUG_TYPE", "PLUG_NM")))
          .cast(DataTypes.IntegerType))
      .drop("GROUP_ID", "MAX_MASTER_ID", "TAR_SITE_NM")

    // One timestamp for the whole run, truncated to whole seconds.
    val runTimestamp: Timestamp = {
      val now = Calendar.getInstance
      now.set(Calendar.MILLISECOND, 0)
      new Timestamp(now.getTimeInMillis)
    }

    val final_upd = df_cdc
      .drop("TAR_SITE_NM")
      .filter(col("MASTER_ID").isNotNull && col("CDC_FLG") === "U")
      .withColumn("UPD_DATETIME", lit(runTimestamp))
      .drop("CDC_FLG")
    final_upd.show(100)

    val final_new = new_with_master
      .join(distinctValuesDF, new_with_master("SITE_NM") === distinctValuesDF("SITE_NM"), "inner")
      .drop(distinctValuesDF("SITE_NM"))
      .drop("CDC_FLG")
      .withColumn("CRT_DATETIME", lit(runTimestamp))
      .withColumn("UPD_DATETIME", lit(runTimestamp))
    final_new.show(100)
  }
}
Sunday, April 7, 2019
IBAN
package Project

/**
 * Standalone IBAN validator based on the ISO 7064 MOD 97-10 check.
 * Running the object prints the result for a sample German IBAN.
 */
object IBANAlgorithm extends App {
  print(apply("DE89370400440532013000"))

  /**
   * Returns true when `iban` is a well-formed IBAN whose check digits verify.
   *
   * Whitespace is ignored. The compacted value must be 5 to 34 characters, start with two
   * upper-case letters followed by two digits, and contain only upper-case letters and digits.
   * Any other input returns false rather than throwing.
   */
  def apply(iban: String): Boolean = {
    // Everything is local: object-level vals in an `App` are not initialised unless main runs.
    def isUpper(c: Char): Boolean = c >= 'A' && c <= 'Z'
    def isDigit(c: Char): Boolean = c >= '0' && c <= '9'

    val compact = iban.filterNot(_.isWhitespace)
    val wellFormed =
      compact.length > 4 && compact.length <= 34 &&
        compact.take(2).forall(isUpper) &&
        compact.slice(2, 4).forall(isDigit) &&
        compact.forall(c => isUpper(c) || isDigit(c))

    wellFormed && {
      // Move country code and check digits to the end. Each letter becomes two digits
      // (A=10 ... Z=35). Folding the remainder avoids building a huge number.
      val rearranged = compact.drop(4) + compact.take(4)
      val remainder = rearranged.foldLeft(0) { (rem, c) =>
        if (isDigit(c)) (rem * 10 + (c - '0')) % 97
        else (rem * 100 + (c - 'A' + 10)) % 97
      }
      remainder == 1
    }
  }
}
// ---------------------------------------------------------------------------
package Project

import scala.collection.immutable.SortedMap
import scala.math.BigInt
import scala.util.matching.Regex

/** The International Bank Account Number (IBAN) is an internationally agreed means of identifying
  * bank accounts across national borders with a reduced risk of propagating transcription errors. The
  * IBAN consists of up to 34 alphanumeric characters: first the two-letter ISO 3166-1 alpha-2 country
  * code, then two check digits, and finally a country-specific Basic Bank Account Number (BBAN). The
  * check digits enable a sanity check of the bank account number to confirm its integrity even before
  * submitting a transaction.
  */
class Iban(val iban: String) {
  // Isolated tests. Each one is total: it returns false rather than throwing.

  /** True when `iban` contains no lower-case characters. */
  def isAllUpperCase: Boolean = iban.toUpperCase() == iban

  /** True when the WHOLE string matches `Iban.pattern`; a matching substring is not enough. */
  def isValidpattern: Boolean = Iban.pattern.pattern.matcher(iban).matches()

  /** True when the length equals the registered length for the country code; unknown country gives false. */
  def isNationalSize: Boolean = Iban.ccVsLength.get(iban.take(2)).contains(iban.size)

  /**
   * ISO 7064 MOD 97-10 check. The first four characters are moved to the end, each letter
   * becomes two digits (A=10 ... Z=35), and the result must be 1 modulo 97.
   * Characters other than 0-9 and A-Z, or an empty string, give false.
   */
  def isCheckNumberOK: Boolean = {
    def value(ch: Char): Int =
      if (ch >= '0' && ch <= '9') ch - '0'
      else if (ch >= 'A' && ch <= 'Z') ch - 'A' + 10
      else -1

    val values = (iban.drop(4) + iban.take(4)).map(value)
    iban.nonEmpty && values.forall(_ >= 0) &&
      values.foldLeft(0)((rem, v) => (if (v < 10) rem * 10 + v else rem * 100 + v) % 97) == 1
  }

  /** All isolated tests combined, cheapest first. */
  def isValidIban: Boolean = {
    isAllUpperCase && isValidpattern && isNationalSize && isCheckNumberOK
  }
}

object Iban {
  /**
   * IBAN length per country code.
   * NOTE(review): this table predates the current SWIFT IBAN registry (e.g. EG is listed as 27);
   * verify entries against the registry before relying on isNationalSize.
   */
  lazy val ccVsLength: SortedMap[String, Int] = {
    val table =
      """AD24 AE23 AL28 AO25 AT20 AZ28 BA20 BE16 BF27 BG22 BH22 BI16
        |BJ28 BR29 CG27 CH21 CI28 CM27 CR21 CV25 CY28 CZ24 DE22 DK18
        |DO28 DZ24 EE20 EG27 ES24 FI18 FO18 FR27 GA27 GB22 GE22 GI23
        |GL18 GR27 GT28 HR21 HU28 IE22 IL23 IR26 IS26 IT27 JO30 KW30
        |KZ20 LB28 LI21 LT20 LU20 LV21 MC27 MD24 ME22 MG27 MK19 ML28
        |MR27 MT31 MU30 MZ25 NL18 NO15 PK24 PL28 PS29 PT25 QA29 RO24
        |RS22 SA24 SE24 SI19 SK24 SM27 SN28 TN24 TR26 UA29 VG24""".stripMargin
    // Each entry is a two-letter country code followed by a two-digit length.
    SortedMap[String, Int]() ++ table.split("""\s+""").collect {
      case entry if entry.nonEmpty => entry.take(2) -> entry.slice(2, 4).toInt
    }
  }

  lazy val pattern: Regex = "([A-Z]{2})([0-9]{2})([A-Z0-9]{4})([A-Z0-9]{0,2})([0-9]{7})(([A-Z0-9]?){0,16})".r

  /** Builds an [[Iban]] from `s` after removing all whitespace (printed IBANs use space groups). */
  def apply(s: String): Iban = new Iban(s.replaceAll("""\s""", ""))
}
Subscribe to:
Posts (Atom)