Member since: 09-15-2017
Posts: 5
Kudos Received: 2
Solutions: 0
12-12-2017
03:10 AM
1 Kudo
Thanks @Shu
12-11-2017
07:20 AM
Thanks @Shu. (1) I want to drop a duplicate within a 1-second window if both attribute values (i.e., device_no and device_value) already exist in the past 1 second. If I set the Age Off Duration to 1 sec, how will that work? (2) For the Cache Entry Identifier, I am trying to detect duplicates based on the two attributes (device_no and device_value) separated by a double colon. Is this the correct way of doing it?
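For reference, a common way to key the cache on both attributes is to build the Cache Entry Identifier with NiFi Expression Language. A minimal sketch of the DetectDuplicate properties, assuming the attribute names device_no and device_value and the 1-second window described above (the exact values here are assumptions, not confirmed settings):

Cache Entry Identifier: ${device_no}::${device_value}
Age Off Duration: 1 sec
Distributed Cache Service: (a configured DistributedMapCacheClientService)

With settings like these, the first FlowFile carrying a given device_no/device_value pair should route to non-duplicate, any FlowFile with the same pair arriving within the next second should route to duplicate, and after 1 second the cache entry expires so the pair is treated as new again.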
12-11-2017
02:50 AM
1 Kudo
Hi, I am using the NiFi DetectDuplicate processor to filter out duplicates per second based on 2 attributes simultaneously. My FlowFile content is in CSV format. My DetectDuplicate processor looks like: detectduplicate.png. I am getting duplicates with exactly the same content in different FlowFiles when listing the non-duplicates queue. Help is appreciated. Thanks.
Labels:
- Apache NiFi
09-15-2017
03:30 PM
I have two dataframes:
First dataframe *ClassRecord* has 10 different entries like the following:
`Class, Calculation
first, Average
Second, Sum
Third, Average`
Second dataframe *StudentRecord* has around 50K entries like the following:
`Name, height, Camp, Class
Shae, 152, yellow, first
Joe, 140, yellow, first
Mike, 149, white, first
Anne, 142, red, first
Tim, 154, red, Second
Jake, 153, white, Second
Sherley, 153, white, Second`
From the second frame, based on the class type, I would like to perform a calculation on *height* (average for class first, sum for class Second, and so on), per *camp* separately (if the class is first: average of yellow, of white, and so on separately).
I tried the following:
`import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._

// function to calculate the average height per Name
def averageOnName(splitFrame: org.apache.spark.sql.DataFrame): Array[(String, Double)] = {
  val pairedRDD: RDD[(String, Double)] = splitFrame.select($"Name", $"height".cast("double")).as[(String, Double)].rdd
  val avg_by_key = pairedRDD.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).mapValues(y => 1.0 * y._1 / y._2).collect
  avg_by_key
}

// required schema for further modifications
val schema = StructType(
  StructField("name", StringType, false) ::
  StructField("avg", DoubleType, false) :: Nil)

// for-each loop over each class type
classRecord.rdd.foreach { classRow =>
  // filter students based on camps
  var campYellow = studentRecord.filter($"Camp" === "yellow")
  var campWhite = studentRecord.filter($"Camp" === "white")
  var campRed = studentRecord.filter($"Camp" === "red")
  // since I know the calculation for class first is average, representing the calculation only for class first
  val avgcampYellow = averageOnName(campYellow)
  val avgcampWhite = averageOnName(campWhite)
  val avgcampRed = averageOnName(campRed)
  // conversion of the yellow result to an RDD of Rows, then to a frame
  val rddYellow = sc.parallelize(avgcampYellow).map(x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue()))
  var dfYellow = sqlContext.createDataFrame(rddYellow, schema)
  // conversion of the white result, then union with yellow camp data
  val rddWhite = sc.parallelize(avgcampWhite).map(x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue()))
  var dfWhite = sqlContext.createDataFrame(rddWhite, schema)
  var dfYellWhite = dfYellow.union(dfWhite)
  // conversion of the red result, then union with yellow and white camp data
  val rddRed = sc.parallelize(avgcampRed).map(x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue()))
  var dfRed = sqlContext.createDataFrame(rddRed, schema)
  var dfYellWhiteRed = dfYellWhite.union(dfRed)
  // other modifications and final result to Hive
}`
Here I am struggling with: 1. hardcoding yellow, red, and white; there may be other camp types as well. 2. Filtering the same dataframe many times. 3. Not being able to figure out how to calculate differently according to the class's calculation type. Help is appreciated. Thanks.
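For comparison, all three pain points can usually be avoided by joining the two frames and using groupBy/agg instead of filtering per camp. A minimal sketch, assuming the ClassRecord and StudentRecord frames and column names shown above (untested against the real data; the `value` column name is my own choice):

```scala
import org.apache.spark.sql.functions._

// attach each student's Calculation type via a join on Class
val joined = studentRecord.join(classRecord, Seq("Class"))

// aggregate height per (Class, Camp) without hardcoding camp names;
// compute both avg and sum once, then keep whichever Calculation asks for
val result = joined
  .groupBy($"Class", $"Camp", $"Calculation")
  .agg(avg($"height").as("avg_h"), sum($"height").as("sum_h"))
  .withColumn("value",
    when(lower($"Calculation") === "average", $"avg_h").otherwise($"sum_h"))
  .select($"Class", $"Camp", $"value")
```

This scans StudentRecord once, handles any number of camps, and picks the calculation per class from the Calculation column instead of branching in driver code.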
Labels:
- Apache Hadoop
- Apache Spark