NiFi: Elasticsearch JSON to Parquet to be stored in HDFS.

Contributor

All - thanks in advance for any help that can be provided. So, big picture: I have stood up a Hadoop/Spark cluster using Ambari (HDP 2.6.2/Hadoop 2.7.3/Spark 2.1.1) and want to do some advanced machine learning/analytics on some data. The first use case is anomaly detection in syslog, where we get the data from our Elasticsearch cluster. I was pointed to NiFi as a solution for automating our data movement from ES to HDFS. Each day in ES is a unique index (e.g. logstash-syslog-2017.11.28) and my goal is to set up NiFi to grab those indexes and save them to HDFS in Parquet format. Since everything in HDFS is going to be processed with MapReduce or Spark, this is a natural choice.
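For reference, the daily index naming described above can be sketched in a few lines of Python. This is only an illustration of the naming pattern; the function names and the default prefix argument are assumptions made for the example, not part of NiFi or Elasticsearch.

```python
from datetime import date, timedelta
from typing import List


def daily_index(day: date, prefix: str = "logstash-syslog-") -> str:
    """Build the index name for one day, in the form prefix + YYYY.MM.DD."""
    return prefix + day.strftime("%Y.%m.%d")


def index_range(start: date, end: date) -> List[str]:
    """Index names for every day from start to end, inclusive."""
    days = (end - start).days
    return [daily_index(start + timedelta(days=i)) for i in range(days + 1)]


if __name__ == "__main__":
    print(daily_index(date(2017, 11, 28)))
    print(index_range(date(2017, 11, 27), date(2017, 11, 29)))
```

A list like this could drive one fetch per index, so that each day's data lands in its own HDFS directory.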

I have a basic flow set up with some help from StackOverflow and Reddit (see flow.png). For now, this uses a GenerateFlowFile processor, since I already know how my data comes back. I took a single returned document and put it in as custom text so I wouldn't have to pound my ES cluster during testing. Note that I have three outputs (PutFile, PutHDFS and PutParquet). The first two work just fine: they output files locally and to HDFS with no problem. The problem comes in on the third output, which is the one I need: PutParquet.

I get an error:

"Failed to write due to org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException: Unable to create RecordReader: Unable to create RecordReader."

So, I thought, maybe it's due to all of the nested JSON I get back from Elasticsearch. I decided to go back to basics and get something working - so I followed the example from:

https://community.hortonworks.com/articles/140422/convert-data-from-jsoncsvavro-to-parquet-with-nifi...

I still get the same error back, even when I change the JSON example and the Avro schema to the ones specified in this example. So, I think I must have something unnecessary in my flow or some odd setting that I am unaware I need to set up. Things must have changed between whatever version of NiFi was used in the above URL and 1.4: the example shows the schema settings as part of the PutParquet processor options, whereas in my version they are part of the JsonTreeReader and its associated AvroSchema.

I am new to all of this and, while I get the gist of NiFi, I start to get lost around the JsonTreeReader and AvroSchema stuff. What I'd like is to read in JSON from Elastic, convert that JSON to Parquet and store it. Do I need to define a schema for this, or is there some automated way to have NiFi convert the JSON it reads in into something that can be stored as Parquet?

Here is an example of how my data looks coming back from ES (some fields have been masked for obvious reasons). Any help on sorting this out and getting a working flow from ES to a Parquet file would be amazing. I've been working on this for around a week or so and am starting to come to the end of my rope on this... Thanks so much!

[
  {
    "hits": [
      {
        "app": {
          "threadID": "6DE2CB70",
          "environment": "DEV",
          "service": "commonvpxLro",
          "opID": "opID=HB-host-3009@205149-7520446b-5c",
          "service_info": "VpxLRO"
        },
        "severity": "info",
        "hostIP_geo": {
          "location": {
            "lon": XXXX,
            "lat": XXXX
          },
          "postal_code": "Location 1"
        },
        "hostname": "DEV3-02",
        "@timestamp": "2017-11-27T22:20:51.617Z",
        "hostIP": "10.10.0.1",
        "meta": {
          "grok_match": "grok_match_1",
          "received_at_indexer": "2017-11-27T22:20:51.727Z",
          "received_from": "10.10.0.1",
          "processed_at_indexer": "xvzzpaXXXXc",
          "kafka_topic": "syslog",
          "received_at_shipper": "2017-11-27T22:20:51.661Z",
          "processed_at_shipper": "xvzzpaXXXXb"
        },
        "@version": "1",
        "syslog": {
          "program": "Vpxa",
          "type": "vmware_esxi",
          "priority": "166"
        },
        "message": "-- BEGIN session[938e0611-282b-22c4-8c93-776436e326c7]52dd2640-f406-2da1-6931-24930920b5db --  -- vpxapi.VpxaService.retrieveChanges -- 938e0611-282b-22c4-8c93-776436e326c7\n",
        "type": "syslog",
        "tags": [
          "syslog",
          "vmware",
          "esxi",
          "index_static",
          "geoip"
        ]
      }
    ]
  }
]
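To make the schema question concrete, here is a rough Python sketch that derives an Avro-style schema from one JSON document. It is an illustration only, not how NiFi works internally and not the fix from this thread. The helper names (`avro_name`, `infer`) and the record name `syslog_hit` are made up for the example, and the sample is a trimmed copy of a few unmasked fields from the document above. One point it does rely on is from the Avro specification: names must start with a letter or underscore and contain only letters, digits and underscores, so keys such as "@timestamp" and "@version" cannot be used as Avro field names unchanged.

```python
import json
import re


def avro_name(key):
    """Replace characters Avro does not allow in names with underscores."""
    name = re.sub(r"[^A-Za-z0-9_]", "_", key)
    # The first character must be a letter or underscore.
    return name if re.match(r"[A-Za-z_]", name) else "_" + name


def infer(value, name):
    """Map one JSON value to an Avro type; nested objects become named records."""
    if isinstance(value, bool):  # check bool before int: bool is an int subclass
        return "boolean"
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, list):
        # Assumption: arrays are homogeneous; an empty array falls back to strings.
        items = infer(value[0], name + "_item") if value else "string"
        return {"type": "array", "items": items}
    if isinstance(value, dict):
        fields = [
            {"name": avro_name(k), "type": infer(v, name + "_" + avro_name(k))}
            for k, v in value.items()
        ]
        return {"type": "record", "name": name, "fields": fields}
    # JSON null: the real type is unknown, so assume a nullable string.
    return ["null", "string"]


# Trimmed sample built from unmasked fields of the document above.
sample = {
    "severity": "info",
    "@timestamp": "2017-11-27T22:20:51.617Z",
    "app": {"environment": "DEV", "service": "commonvpxLro"},
    "tags": ["syslog", "vmware"],
}
schema = infer(sample, "syslog_hit")

if __name__ == "__main__":
    print(json.dumps(schema, indent=2))
```

Note that the sketch only renames fields in the schema. The incoming JSON would need the same renaming before a reader could match fields by name, and a schema inferred from a single document will miss any field that document happens not to contain.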

43735-flow.png

1 ACCEPTED SOLUTION

Contributor

All - just an update. I was able to get help resolving this on StackOverflow. See the post here:

https://stackoverflow.com/questions/47399391/using-nifi-to-pull-elasticsearch-indexes?noredirect=1#c...

3 REPLIES

Master Guru

Can you look in nifi-app.log and find the full stacktrace and error that you are seeing in the UI and provide it here?
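If the log is large, a small script can pull out just the relevant entries together with their stack traces. The sketch below is one way to do that, assuming each log entry starts with a "YYYY-MM-DD HH:MM:SS" timestamp and that continuation lines (the stack trace) do not. The path `logs/nifi-app.log` is a placeholder to adjust for your install, and `entries_matching` is a helper name invented for this example.

```python
import re
from pathlib import Path

# Assumption: every new log entry begins with a "YYYY-MM-DD HH:MM:SS" timestamp.
ENTRY_START = re.compile(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}")


def entries_matching(lines, needle):
    """Yield whole log entries (first line plus stack trace) that contain needle."""
    entry = []
    for line in lines:
        if ENTRY_START.match(line) and entry:
            # A new entry begins: emit the previous one if it matched.
            if any(needle in part for part in entry):
                yield "".join(entry)
            entry = []
        entry.append(line)
    # Emit the final entry if it matched.
    if entry and any(needle in part for part in entry):
        yield "".join(entry)


if __name__ == "__main__":
    log = Path("logs/nifi-app.log")  # placeholder path, adjust as needed
    if log.exists():
        with log.open(errors="replace") as fh:
            for found in entries_matching(fh, "RecordReaderFactoryException"):
                print(found)
```

The "Caused by:" lines near the bottom of each printed trace usually name the underlying problem more precisely than the message shown in the UI.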

Master Guru

It would also be helpful if you could provide the schema you created for your example data.
