Member since: 08-11-2017
Posts: 21
Kudos Received: 1
Solutions: 2
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 12650 | 12-06-2017 04:26 PM
 | 26603 | 08-15-2017 12:18 PM
01-09-2018
07:33 PM
3 Kudos
You have a few different kinds of transformations going on there:

1) Value -> Category, such as is_rain, season, and is_rushHour
2) Hoisting values from nested fields (possibly renaming the field), such as wind_speed
3) Fetch on match, such as PM10 and sensor_id, using nested values when a particular value is "P1"

In NiFi, some processors are better at different transformations than others. For the first kind, I was able to achieve this with EvaluateJsonPath followed by two UpdateAttribute processors:

- EvaluateJsonPath extracts the "timestamp" field into an attribute called "dt". I couldn't use the actual "dt" field because it appears to be a number of seconds since midnight or something; the date evaluates to 1/18/1970.
- UpdateAttribute 1 extracts the hour and month values from the timestamp. Note the use of the GMT time zone in the functions; you may need to set that to something different (or exclude it altogether).
- UpdateAttribute 2 performs the categorization logic by checking whether the month is in 12-2 (1 = winter), 3-5 (2 = spring), 6-8 (3 = summer), or 9-11 (4 = fall), and whether the hour of day is between 8-9 AM or 5-6 PM for is_rushHour.

The Expression for rush.hour is as follows:

${hour:ge(8):and(${hour:le(9)}):ifElse(1,${hour:ge(17):and(${hour:lt(19)}):ifElse(1,0)})}

The Expression for season is as follows:

${month:gt(11):or(${month:lt(3)}):ifElse(1,
${month:ge(3):and(${month:lt(6)}):ifElse(2,
${month:ge(6):and(${month:lt(9)}):ifElse(3,4)})})}

(A plain-Python restatement of these two expressions appears after the Jolt spec below.)

- JoltTransformJSON performs the other two types of transformation (with a "shift" spec), along with injecting the categorical fields using Expression Language (in a "default" spec), combined as a "Chain" spec:

[
{
"operation": "shift",
"spec": {
"timestamp": "timestamp",
"wind": {
"deg": "wind_deg",
"speed": "wind_speed"
},
"main": {
"humidity": "humidity",
"pressure": "pressure",
"temp": "temperature"
},
"location": {
"longitude": "long",
"latitude": "lat"
},
"sensordatavalues": {
"*": {
"value_type": {
"P1": {
"@(2,value)": "PM10",
"@(2,id)": "sensor_id"
}
}
}
},
"cod": {
"200": {
"#1": "isRain"
},
"*": {
"#0": "isRain"
}
}
}
},
{
"operation": "default",
"spec": {
"season": "${season}",
"is_rushHour": "${rush.hour}"
}
}
]

Note that I am assuming a "cod" value of 200 is rain and everything else is not. If there are other codes, you can add other blocks to the "cod" section of the spec after the 200 block. The * block handles anything not matched (basically an "else" clause). The flow is put together in the aforementioned order: EvaluateJsonPath -> UpdateAttribute -> UpdateAttribute -> JoltTransformJSON.
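For readers who find the two Expression Language statements above dense, here is a plain-Python restatement of the same categorization logic. This sketch is mine, not part of the original flow; it only assumes that month (1-12) and hour (0-23) are available as integers, as extracted by the UpdateAttribute processors.

# Same logic as the season and rush.hour expressions above (sketch only)
def season(month):
    if month > 11 or month < 3:
        return 1  # winter: Dec-Feb
    elif month < 6:
        return 2  # spring: Mar-May
    elif month < 9:
        return 3  # summer: Jun-Aug
    else:
        return 4  # fall: Sep-Nov

def is_rush_hour(hour):
    # 8-9 AM or 5-6 PM counts as rush hour
    return 1 if (8 <= hour <= 9) or (17 <= hour < 19) else 0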
12-22-2017
03:16 PM
1 Kudo
@Shu that did the trick! Many thanks for your help and your patience as well.
12-19-2017
03:27 PM
@Lukas Müller Use the EvaluateJsonPath processor with the configs below; this adds the longitude and latitude attributes to the flowfile. Then use the following URL in the InvokeHTTP processor: http://api.openweathermap.org/data/2.5/weather?lat=${location.latitude}&lon=${location.longitude}&APPID=myapikey If the answer helped to resolve your issue, click the Accept button below to accept the answer. That would be a great help to community users trying to find a solution quickly for these kinds of errors.
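(The original configuration screenshot is not preserved here. The following is a sketch of a likely EvaluateJsonPath setup, inferred from the attribute names used in the URL above; the exact JSON paths are an assumption about the weather payload, not something stated in this post.)

Destination: flowfile-attribute
Return Type: auto-detect
location.latitude: $.location.latitude    (assumed path)
location.longitude: $.location.longitude  (assumed path)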
05-22-2018
01:59 PM
I can understand Lukas' issue with "*-2"-named .repo files. My install is erroring out and giving me no clues, no breadcrumbs to follow. All my /var/lib/ambari-agent/data/errors* log files are either 0 bytes or 86 bytes long, the latter containing: "Server considered task failed and automatically aborted it." This is on CentOS 7.4 with Ambari 2.6.1.5. When I installed with an ambari-hdp.repo, Ambari complained and duplicated it as ambari-hdp-1.repo. Justin
08-12-2019
11:33 AM
I have a PySpark dataframe with 87 columns. I want to pass each row of the dataframe to a function and get a list for each row, so that I can create a column separately.

PySpark code (UDF):

def make_range_vector(row, categories, ledger):
    print(type(row), type(categories), type(ledger))
    category_vector = []
    for category in categories:
        if row[category] != 0:
            category_percentage = func.round(row[category]*100/row[ledger])
            category_vector.append(category_percentage)
        else:
            category_vector.append(0)
    category_vector = sqlCtx.createDataFrame(category_vector, IntegerType())
    return category_vector

Main function:

pivot_card.withColumn('category_debit_vector', make_range_vector(struct([pivot_card[x] for x in pivot_card.columns]), pivot_card.columns[3:], 'debit'))

I am a beginner in PySpark, and I can't find answers to the questions below.
1. The print statement outputs <class 'pyspark.sql.column.Column'> <class 'list'> <class 'str'>. Shouldn't it be StructType?
2. Can I pass a Row object and do something similar, like we do in Pandas?
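(Not part of the original question: a minimal sketch of one common way to get a per-row list without calling createDataFrame inside the function, using a UDF that returns an array column. The names pivot_card, the 'debit' column, and "skip the first three columns" are taken from the question; everything else here is assumed and illustrative.)

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, DoubleType

# Build the per-row percentage list inside a UDF that returns an array column.
def percentage_vector(values, ledger_value):
    return [float(round(v * 100.0 / ledger_value)) if v != 0 else 0.0
            for v in values]

vector_udf = F.udf(percentage_vector, ArrayType(DoubleType()))

category_cols = pivot_card.columns[3:]
result = pivot_card.withColumn(
    'category_debit_vector',
    vector_udf(F.array(*[pivot_card[c] for c in category_cols]),
               pivot_card['debit']))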
08-15-2017
12:18 PM
In case someone else wants to know: I've found something that works for me.

import json
from urllib.request import urlopen  # Python 3; on Python 2 use "from urllib2 import urlopen"

# Assumes an existing SparkContext (sc) and SQLContext (sqlContext), e.g. from the Spark shell.
def convert_single_object_per_line(json_list):
    json_string = ""
    for line in json_list:
        json_string += json.dumps(line) + "\n"
    return json_string

def parse_dataframe(json_data):
    r = convert_single_object_per_line(json_data)
    mylist = []
    for line in r.splitlines():
        mylist.append(line)
    rdd = sc.parallelize(mylist)
    df = sqlContext.jsonRDD(rdd)
    return df
url = "myurl.json"
response = urlopen(url)
data = response.read().decode("utf-8")
json_data = json.loads(data)
df = parse_dataframe(json_data)
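(Side note, not from the original answer: on newer Spark versions where jsonRDD is deprecated, the same load can be done through the DataFrameReader API. A one-line sketch, assuming the same sqlContext and rdd as above:)

df = sqlContext.read.json(rdd)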
08-11-2017
06:28 PM
@Lukas Müller Great, happy you got it working!