Tuesday, March 24, 2020

flask

from flask import Flask, render_template
import pandas as pd

app = Flask(__name__)


@app.route('/')
def home_page():
    """Render the landing page template ``home.html``."""
    return render_template("home.html")


# Served on its own URL: when this view was also registered on '/', Flask always
# dispatched '/' to home_page (registered first), so this page was unreachable.
@app.route('/dataset')
def another_page():
    """Render ``gender_submission.csv`` as an HTML table inside ``dataset.html``."""
    # DataFrame.from_csv was removed in pandas 1.0; read_csv with index_col=0 and
    # parse_dates=True reproduces from_csv's defaults (first column as a parsed index).
    table = pd.read_csv("gender_submission.csv", index_col=0, parse_dates=True)
    return render_template("dataset.html", data=table.to_html())


if __name__ == '__main__':
    # NOTE(review): debug=True enables the Werkzeug interactive debugger. Binding it to a
    # LAN address lets anyone on that network run code in this process -- development only.
    app.run(debug=True, use_reloader=True, host='192.168.1.7')

Sunday, March 8, 2020

python

import numpy as np
import pandas as pd
import pyarrow as pa
#df = pd.DataFrame({'one': [20, np.nan, 2.5],'two': ['january', 'february', 'march'],'three': [True, False, True]},index=list('abc'))#table = pa.Table.from_pandas(df)
import pyarrow.parquet as pq
# Convert app.csv to Parquet via an Arrow table, then read a Parquet file back with pandas.
DATA_DIR = 'C:\\Users\\senthil\\PycharmProjects\\s3aws\\'

d = pd.read_csv(DATA_DIR + 'app.csv')
t = pa.Table.from_pandas(d)
print(t)
pq.write_table(t, DATA_DIR + 'app.parquet')

# Alternative reader:
# pdf = pq.read_pandas('C:\\Users\\senthil\\PycharmProjects\\s3aws\\gender_submission.parquet').to_pandas()
# The assignment below had been fused onto the end of that comment line, so it never ran
# and `print(pdf)` raised NameError. to_csv() with no path returns the CSV text as a str.
pdf = pd.read_parquet(DATA_DIR + 'userdata1.parquet', engine='pyarrow').to_csv()
print(pdf)

# table2 = pq.read_table('C:\\Users\\senthil\\PycharmProjects\\s3aws\\gen4der_submission.parquet')
# print(table2)

Sunday, June 30, 2019

doc


1.       INTRODUCTION

This document describes the historical loading process for payment files. The historical payment load process is automated using shell and Python scripts.


2.       PROCESS FLOW
In general, the payment loading process has two steps:
1.       Standardization
2.       SAS Loading
     Standardization :
Standardization converts payment files into Parquet files and applies the pre-check evaluation that every payment file must pass before it can be processed. The pre-checks are carried out by Unix scripts, and the Parquet files are created by Spark code.
   SAS Loading:
SAS loading loads the Parquet files into SAS_ALL_PAYMENT using Spark code.
The automated payment load follows the same two steps described above.
                                                                                                                                                                           
3.       SCRIPT DETAILS

Sunday, June 16, 2019

Py count thread

import threading
import time


class mythread(threading.Thread):
    """Worker thread that pauses one second, then prints the value it was built with."""

    def __init__(self, i):
        super().__init__()
        # Payload echoed by run().
        self.h = i

    def run(self):
        """Thread body: sleep for a second, then report the stored value."""
        time.sleep(1)
        print("Value send ", self.h)


# Upper bound on concurrently running workers. The original warning always said "More than 5",
# while its counter actually allowed only two workers to ever start.
MAX_THREADS = 5

# Raw string: '\S', '\P', '\D' are invalid escapes in a normal literal (SyntaxWarning on 3.12+).
# The `with` block closes the file, which the original left open.
with open(r'C:\Senthil\SenStudy\Python\Date.txt') as fh:
    f = fh.readlines()

for i in f:
    # Sampled on every pass; active_count() includes the main thread.
    print("Active threads are ", threading.active_count())
    time.sleep(1)
    # Wait for a free slot rather than skipping the line: previously every line after the
    # second fell into the else-branch and was never handed to a worker.
    while threading.active_count() - 1 >= MAX_THREADS:
        print(f"Number of threads reached {MAX_THREADS} .. going to sleep state for 1 sec ...",
              threading.active_count())
        time.sleep(1)
    thread1 = mythread(i)
    thread1.start()

Monday, May 6, 2019

sparkread

// Spark's CSV reader takes a single delimiter, so every `!~`-separated line is first rewritten
// with tabs and the reader is told to split on "\t".
// NOTE(review): `.map` on a Dataset[String] needs an Encoder[String] in scope
// (e.g. `import spark.implicits._`), and `schema` is assumed to be a StructType defined earlier.
val df2 = spark.sqlContext.read
  .option("header", "true")
  //.option("inferSchema", "true")
  // The delimiter used to share one commented-out line with inferSchema, so the tab-joined
  // lines were parsed with the default comma delimiter.
  .option("delimiter", "\t")
  .option("charset", "UTF-8")
  .schema(schema)
  .csv(spark.sqlContext.read.textFile("C:\\Senthil\\SenStudy\\Scala\\Files\\multidelimiter_out")
    .map(line => line.split("\\!\\~").mkString("\t")))

Sunday, April 28, 2019

Multi delimiter spark

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

object PaymentFile {

  /**
   * Converts a `!~`-delimited text file into Parquet, partitioned by its `input` column.
   *
   * The header is the first line containing "input"; every column is read as a string.
   * Uses an explicit `main` because Spark documents that `scala.App` subclasses may not
   * work correctly (App's delayed initialization).
   */
  def main(args: Array[String]): Unit = {
    implicit val spark: SparkSession = SparkSession.builder().appName("PaymentFile")
      .config("spark.master", "local")
      .getOrCreate()

    // Regex for the literal two-character field separator `!~`.
    val delimiter = "\\!\\~"

    try {
      val rdd = spark.sparkContext.textFile("C:\\Senthil\\SenStudy\\Scala\\Files\\multidelimiter.txt")
      // Keep the raw header line so that only that line is excluded from the data; filtering
      // out everything that *contains* "input" also dropped data rows holding that text.
      val headerLine = rdd.filter(_.contains("input")).first()
      val header = headerLine.split(delimiter)
      val schema = StructType(header.map(name => StructField(name, StringType)).toSeq)
      val rows = rdd
        .filter(_ != headerLine)
        // limit -1 keeps trailing empty fields, so each Row has as many values as the schema.
        .map(line => Row.fromSeq(line.split(delimiter, -1).toSeq))
      val data = spark.createDataFrame(rows, schema)
      data.write.partitionBy("input").mode("overwrite").parquet("C:\\Senthil\\SenStudy\\Scala\\Files\\multidelimiter")
    } finally {
      spark.stop()
    }
  }
}

Sunday, April 14, 2019

metadata manager

package Project
import java.text.SimpleDateFormat
import java.util.Calendar

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import Project.config.Settings._
object MetadataManager {

  /** Session used by this object's methods; Spark runs in local mode. */
  implicit val spark: SparkSession = SparkSession.builder().appName("Job Master")
    .config("spark.master", "local")
    .getOrCreate()

  /**
   * Records a batch entry for `keyPrefix`, then runs `callback`.
   *
   * Exceptions thrown by `callback` are printed and swallowed (best effort). Exceptions from
   * `insBatchEntry` are not caught and reach the caller.
   *
   * @param keyPrefix job name; "JobMaster" additionally computes a source row count
   * @param params    batch key values in order: COUNTRY_NM, PLUG_NM, ETL_NO
   * @param callback  job body to run after the entry is recorded
   */
  def reportStatus(keyPrefix: String, params: List[String])(callback: => Unit)(implicit spark: SparkSession): Unit = {
    println("Started " + keyPrefix)
    insBatchEntry(keyPrefix, params)

    try {
      callback
    } catch {
      // The specific cases must come before `Exception`; in the original order they were unreachable.
      case _: IllegalArgumentException => println("illegal arg. exception")
      case _: IllegalStateException => println("illegal state exception")
      case e: Exception => println("exception caught: " + e)
    }
  }

  /**
   * Builds the "IN PROGRESS" job-status rows for one batch and prints the first two.
   *
   * The batch key is joined with rows of the job-master CSV at `jobmaster_tar` on COUNTRY_NM
   * and PLUG_NM, keeping only rows whose IS_ACTIVE is "Y".
   *
   * @param keyPrefix job name; for "JobMaster", SRC_COUNT is the row count of the CSV at
   *                  `jobmaster`, otherwise 0
   * @param key       exactly three values: COUNTRY_NM, PLUG_NM, ETL_NO
   * @throws IllegalArgumentException if `key` does not contain exactly three values
   */
  def insBatchEntry(keyPrefix: String, key: List[String]): Unit = {
    import org.apache.spark.sql.functions.{concat, lit}
    import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}

    val fields = List(
      StructField("COUNTRY_NM", StringType, nullable = false),
      StructField("PLUG_NM", StringType, nullable = false),
      StructField("ETL_NO", StringType, nullable = false)
    )
    // Fail fast with a clear message instead of a Row/schema mismatch inside a Spark job.
    require(key.size == fields.size,
      s"expected ${fields.size} key values (COUNTRY_NM, PLUG_NM, ETL_NO), got ${key.size}")

    val rdd = spark.sparkContext.makeRDD(List(Row.fromSeq(key)))
    val df_inp = spark.createDataFrame(rdd, StructType(fields))
    df_inp.show(2)

    val df_jb = spark.read.format("csv").option("header", "true").load(s"$jobmaster_tar")

    val df_join_jb = df_jb.join(
      df_inp,
      df_jb("COUNTRY_NM") === df_inp("COUNTRY_NM") && df_jb("PLUG_NM") === df_inp("PLUG_NM") && df_jb("IS_ACTIVE") === "Y",
      "inner")

    // Both values are computed once on the driver. The count used to call spark.read inside a
    // UDF, which cannot work: tasks run on executors, where the session has no SparkContext.
    // A driver-side start time is also identical for every row of the batch.
    val srcCount: Int = keyPrefix match {
      case "JobMaster" => countCsvRows(jobmaster)
      case _ => 0
    }
    val datBeg = currentTimestampToSecond()

    // The select keeps only job-master columns, so the key columns of df_inp need no explicit drop.
    df_join_jb
      .select(df_jb("SITE_NM"), df_jb("COUNTRY_NM"), df_jb("SITE_DESC"), df_jb("PLUG_TYPE"),
        df_jb("PLUG_NM"), df_jb("SOURCE_NM"), df_jb("FILE_NM"))
      .withColumn("DAT_BEG", lit(datBeg))
      // Typed null: a bare lit(null) is NullType, which file sinks such as Parquet cannot store.
      .withColumn("DAT_END", lit(null).cast(TimestampType))
      .withColumn("TXT_JOB", concat(df_jb("SITE_NM"), lit("_"), df_jb("COUNTRY_NM"), lit("_"), df_jb("PLUG_NM")))
      .withColumn("STATUS", lit("IN PROGRESS"))
      .withColumn("SRC_COUNT", lit(srcCount))
      .show(2)
  }

  /** Current local time truncated to whole seconds, via a "yyyy-MM-dd HH:mm:ss" round trip. */
  private def currentTimestampToSecond(): java.sql.Timestamp = {
    val timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    java.sql.Timestamp.valueOf(timeFormat.format(Calendar.getInstance.getTime))
  }

  /**
   * Counts the data rows of the headered CSV at `path`, reading it once (it used to be read
   * twice: once for the log line and once for the result).
   * NOTE(review): `toInt` overflows beyond Int.MaxValue rows; kept as Int to preserve SRC_COUNT's type.
   */
  private def countCsvRows(path: String): Int = {
    print("The variable value is " + path)
    val count = spark.read.format("csv").option("header", "true").load(path).count().toInt
    print("the count is " + count)
    count
  }
}