Member since: 05-17-2016
Posts: 190
Kudos Received: 46
Solutions: 11
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 1388 | 09-07-2017 06:24 PM |
| | 1793 | 02-24-2017 06:33 AM |
| | 2583 | 02-10-2017 09:18 PM |
| | 7071 | 01-11-2017 08:55 PM |
| | 4712 | 12-15-2016 06:16 PM |
01-26-2017 07:57 PM
@Timothy Spann: I tried editing the Avro schema manually, replacing "type": "string" with "type": "timestamp-millis" for the timestamp field. However, the processor does not accept this and reports a "Schema Validation Failure".
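A note on the schema edit above: in the Avro specification, timestamp-millis is a logical type that annotates a long, written as {"type": "long", "logicalType": "timestamp-millis"}, so it is not a valid value for "type" on its own. A field declared that way holds milliseconds since the Unix epoch, which means the string value would need converting first. Below is a minimal sketch of that conversion, assuming the input always carries a UTC offset as in the sample value. The object and method names are made up for illustration, and whether the NiFi processor in use accepts logical types is not something this sketch settles.

```scala
import java.time.OffsetDateTime

// Hypothetical helper, not part of NiFi or Avro.
object TimestampMillis {
  // Parses an ISO-8601 timestamp that includes a UTC offset
  // (for example 2017-01-26T00:00:00-05:00) and returns the
  // number of milliseconds since the Unix epoch.
  def toEpochMillis(iso: String): Long =
    OffsetDateTime.parse(iso).toInstant.toEpochMilli
}
```

For the sample value, TimestampMillis.toEpochMillis("2017-01-26T00:00:00-05:00") yields 1485406800000, which is 2017-01-26 05:00:00 UTC.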
01-26-2017 06:31 PM
My sample JSON file is:
{
  "timestamp": "2017-01-26T00:00:00-05:00",
  "c1": 73.0,
  "c2": 36.5,
  "c3": 43.8,
  "c4": 0.1,
  "c5": 75.4,
  "c6": 997.8,
  "c7": 0.5,
  "c8": 4.58,
  "c9": 43.8,
  "c10": 1.5,
  "c11": 40.6,
  "postal_code": "08863",
  "country": "us"
}
And the Avro schema inferred by NiFi is:
{
  "type": "record",
  "name": "date",
  "fields": [
    {"name": "timestamp", "type": "string"},
    {"name": "c1", "type": "double", "doc": "Type inferred from '73'"},
    {"name": "c2", "type": "double", "doc": "Type inferred from '36.5'"},
    {"name": "c3", "type": "double", "doc": "Type inferred from '43.8'"},
    {"name": "c4", "type": "double", "doc": "Type inferred from '0'"},
    {"name": "c5", "type": "double", "doc": "Type inferred from '75.4'"},
    {"name": "c6", "type": "double", "doc": "Type inferred from '997.8'"},
    {"name": "c7", "type": "double", "doc": "Type inferred from '0'"},
    {"name": "c8", "type": "double", "doc": "Type inferred from '4.58'"},
    {"name": "c9", "type": "double", "doc": "Type inferred from '43.8'"},
    {"name": "c10", "type": "double", "doc": "Type inferred from '1.5'"},
    {"name": "c11", "type": "double", "doc": "Type inferred from '40.6'"},
    {"name": "postal_code", "type": "string", "doc": "Type inferred from '\"08863\"'"},
    {"name": "country", "type": "string", "doc": "Type inferred from '\"us\"'"}
  ]
}
01-26-2017 05:37 PM
1 Kudo
Hi All,
What is the best approach to convert JSON to Avro while preserving the source data types?
My source JSON has a timestamp field (a value looks like 2017-01-26T00:00:00-05:00) that I eventually need to insert into a Hive table column of type timestamp.
When I infer the schema, the timestamp field comes out as a string. Is there some pre-formatting I can apply to the timestamp field so that it is inferred as a timestamp?
The current flow is:
JSON >> Avro (infer or manually add schema) >> streaming insert into Hive
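One possible pre-formatting step, offered as an assumption rather than a confirmed fix: Hive's text form for timestamp values is yyyy-MM-dd HH:mm:ss, so the ISO-8601 value could be rewritten into that shape before the Avro conversion and left as a string for Hive to interpret. The sketch below does this with java.time. The object and method names are hypothetical, and normalising to UTC is a choice made for this example, not something the flow above requires.

```scala
import java.time.{OffsetDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical helper, not part of NiFi or Hive.
object HiveTimestamp {
  // Hive's textual timestamp layout, without fractional seconds.
  private val hiveFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Parses an ISO-8601 timestamp with an offset, shifts it to UTC
  // (an assumption of this sketch), and renders it in Hive's layout.
  def toHiveString(iso: String): String =
    OffsetDateTime.parse(iso)
      .withOffsetSameInstant(ZoneOffset.UTC)
      .format(hiveFormat)
}
```

With the sample value, HiveTimestamp.toHiveString("2017-01-26T00:00:00-05:00") gives "2017-01-26 05:00:00". If the local wall-clock time should be kept instead, the withOffsetSameInstant call can be dropped.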
Labels: Apache NiFi
01-11-2017 08:57 PM
In addition, you could then convert the RDD to a DataFrame if required.
01-11-2017 08:55 PM
1 Kudo
Something similar using RDDs.
Steps:
1. Read the file as an RDD.
2. Create a new RDD: for each line of the file, build a list of (id, date) tuples, one for each date between d1 and d2.
3. Flatten the lists to generate the final RDD, with one (id, date) combination per row.

import java.text.SimpleDateFormat
import java.util.{Calendar, Date, GregorianCalendar}
import scala.collection.mutable.ListBuffer
import org.apache.spark.SparkContext

object ExplodeDates {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[*]", "app1")
    val fileRdd = sc.textFile("inFile")
    // Build the (id, date) list for each line and flatten in one step
    val explodedRdd = fileRdd.flatMap(getRddList)
    explodedRdd.saveAsTextFile("outDir")
    sc.stop()
  }

  // Returns every day from startdate to enddate, rendered with Date.toString
  def getDaysBetweenDates(startdate: Date, enddate: Date): ListBuffer[String] = {
    val dateList = new ListBuffer[String]()
    val calendar = new GregorianCalendar()
    calendar.setTime(startdate)
    while (calendar.getTime().before(enddate)) {
      dateList += calendar.getTime().toString()
      calendar.add(Calendar.DATE, 1)
    }
    // Add the final date so the end of the range is included
    dateList += calendar.getTime().toString()
    dateList
  }

  // Expects a line of the form id,startDate,endDate with dates as yyyy-MM-dd
  def getRddList(a: String): ListBuffer[(String, String)] = {
    val allDates = new ListBuffer[(String, String)]()
    val format = new SimpleDateFormat("yyyy-MM-dd")
    val fields = a.split(",")
    for (x <- getDaysBetweenDates(format.parse(fields(1)), format.parse(fields(2)))) {
      allDates += ((fields(0), x))
    }
    allDates
  }
}
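To check the expansion logic without starting a Spark context, the per-line step can be tried on its own. This is a minimal sketch using java.time.LocalDate in place of Calendar, assuming the same id,startDate,endDate line layout. The object and method names are hypothetical, and unlike the code above it emits dates as yyyy-MM-dd and returns no rows when the start date is after the end date.

```scala
import java.time.LocalDate

// Hypothetical stand-alone version of the per-line expansion.
object ExplodeDatesLocal {
  // Expands "id,start,end" into one (id, date) pair per day,
  // with both the start and end dates included.
  def explode(line: String): List[(String, String)] = {
    val Array(id, startStr, endStr) = line.split(",").map(_.trim)
    val first = LocalDate.parse(startStr)
    val last = LocalDate.parse(endStr)
    Iterator.iterate(first)(_.plusDays(1))
      .takeWhile(d => !d.isAfter(last))
      .map(d => (id, d.toString))
      .toList
  }
}
```

For example, ExplodeDatesLocal.explode("a,2017-01-30,2017-02-01") returns three pairs, for 2017-01-30, 2017-01-31 and 2017-02-01. The same function could be passed to flatMap on the RDD.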
12-27-2016 06:29 PM
To add to @milind pandit's answer: I tried opening the AirPassengers file, and the first column is enclosed in quotes. The same is true for BJsales.csv.
12-15-2016 06:16 PM
2 Kudos
Thanks @Karthik Narayanan. I was able to resolve the issue. Before getting into the solutions, one caveat: with NiFi 1.0 and 1.1, LZO compression cannot be achieved using the PutHDFS processor. The only supported compression codecs are the ones listed in the compression codec drop-down. When the LZO-related classes are present in core-site.xml, the NiFi processor fails to run. The suggestion from the previous HCC post was to remove those classes, but I needed to retain them so that NiFi's copy and HDP's copy of core-site.xml always stay in sync.
NiFi 1.0
I built the hadoop-lzo jar from source, added it to the NiFi lib directory, and restarted NiFi. This resolved the issue, and I can now use PutHDFS without it erroring out.
NiFi 1.1
Configure the processor's additional classpath to point to the jar file. No restart is required.
Note: This does not provide LZO compression. It only lets the processor run without errors even when the LZO classes are in core-site.xml.
UNSATISFIED LINK ERROR WITH SNAPPY
I also had an issue with the Snappy compression codec in NiFi. I was able to resolve it by setting the path to the .so file. This did not work on the ambari-vagrant boxes, but I got it working on an OpenStack cloud instance. The issue on VirtualBox could be systemic.
To resolve the link error, I copied the .so files from the HDP cluster and recreated the links. Then, as @Karthik Narayanan suggested, I added the directory containing the .so files to the Java library path. Below is the list of .so files and links.
And below is the bootstrap configuration change.
12-13-2016 06:20 PM
@Karthik Narayanan: Just to give you an update, this did not work for me. I tried the same with Snappy, and even Snappy does not seem to work. It throws an unsatisfied link error even though I have added the ".so" to bootstrap.conf.
12-12-2016 08:50 PM
Thanks @Karthik Narayanan. I have yet to try this. Could you also help with the compression codec to use in this case? I haven't been able to find out what NONE, AUTOMATIC and DEFAULT mean.