Created 06-20-2017 02:11 PM
Hi
I am currently running NiFi on my laptop and not in HDP.
I have a huge CSV file (2.7 GB) of New York taxi events; each record holds basic information about a single taxi trip. The header is as follows:
vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,surcharge,mta_tax,tip_amount,tolls_amount,total_amount
I ran the following command (on the Linux command line) to get a subset of the file:
head -n 50000 taxi_sorted.csv > NifiSparkSample50k.csv
and the file size turns out to be 5.6 MB.
Scaling that 50,000-record, 5.6 MB sample up to the full file, it contains approximately (2.5 GB × 1024 / 5.6 MB) × 50,000 ≈ 22,857,142 records.
What I do is read the file, split it into one FlowFile per record, and then send the records to Spark Streaming via the NiFi Spark receiver. Unfortunately (and perhaps unsurprisingly) the system gets stuck and NiFi stops responding.
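In case it helps, this is roughly how I wired up the receiver on the Spark side (a simplified sketch; the NiFi URL, output port name, and batch interval are placeholders for my local setup):

import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.spark.NiFiDataPacket;
import org.apache.nifi.spark.NiFiReceiver;
import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class TaxiNiFiStream {
    public static void main(String[] args) throws Exception {
        // Site-to-site config pointing at the local NiFi instance and its output port
        // (URL and port name below are placeholders for my setup)
        SiteToSiteClientConfig config = new SiteToSiteClient.Builder()
                .url("http://localhost:8080/nifi")
                .portName("Data For Spark")
                .buildConfig();

        SparkConf sparkConf = new SparkConf().setAppName("TaxiNiFiStream").setMaster("local[2]");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(10000)); // 10 s batches

        // Each NiFiDataPacket carries the content and attributes of one FlowFile pulled from NiFi
        JavaReceiverInputDStream<NiFiDataPacket> packets =
                ssc.receiverStream(new NiFiReceiver(config, StorageLevel.MEMORY_ONLY()));

        packets.map(p -> new String(p.getContent())).print(); // just print the taxi records for now

        ssc.start();
        ssc.awaitTermination();
    }
}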
I want to have the data streamed to Spark. What would be a better way to do this? Instead of splitting the file per record, should I split it into chunks of, say, 50,000 records and then send those files to Spark?
Regards
The following are the log entries:
2017-06-20 16:00:11,363 ERROR [Timer-Driven Process Thread-4] o.a.nifi.processors.standard.SplitText SplitText[id=d8251eb7-102e-115c-ee77-109159d0b9ea] SplitText[id=d8251eb7-102e-115c-ee77-109159d0b9ea] failed to process due to java.lang.NullPointerException; rolling back session: {}
java.lang.NullPointerException: null
2017-06-20 16:00:11,380 ERROR [Timer-Driven Process Thread-4] o.a.nifi.processors.standard.SplitText SplitText[id=d8251eb7-102e-115c-ee77-109159d0b9ea] SplitText[id=d8251eb7-102e-115c-ee77-109159d0b9ea] failed to process session due to java.lang.NullPointerException: {}
java.lang.NullPointerException: null
2017-06-20 16:00:11,381 WARN [Timer-Driven Process Thread-4] o.a.nifi.processors.standard.SplitText SplitText[id=d8251eb7-102e-115c-ee77-109159d0b9ea] Processor Administratively Yielded for 1 sec due to processing failure
2017-06-20 16:00:11,381 WARN [Timer-Driven Process Thread-4] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding SplitText[id=d8251eb7-102e-115c-ee77-109159d0b9ea] due to uncaught Exception: java.lang.NullPointerException
2017-06-20 16:00:11,381 WARN [Timer-Driven Process Thread-4] o.a.n.c.t.ContinuallyRunProcessorTask java.lang.NullPointerException: null
2017-06-20 16:00:41,385 ERROR [Timer-Driven Process Thread-10] o.a.nifi.processors.standard.SplitText SplitText[id=d8251eb7-102e-115c-ee77-109159d0b9ea] SplitText[id=d8251eb7-102e-115c-ee77-109159d0b9ea] failed to process due to java.lang.NullPointerException; rolling back session: {}
java.lang.NullPointerException: null
2017-06-20 16:00:41,385 ERROR [Timer-Driven Process Thread-10] o.a.nifi.processors.standard.SplitText SplitText[id=d8251eb7-102e-115c-ee77-109159d0b9ea] SplitText[id=d8251eb7-102e-115c-ee77-109159d0b9ea] failed to process session due to java.lang.NullPointerException: {}
java.lang.NullPointerException: null
Created 06-20-2017 02:21 PM
@Arsalan Siddiqi You might get some mileage out of running NiFi in cluster mode (you don't say whether it's a single instance or not). I can't imagine being able to pull 22M records over the Spark receiver in a timely manner otherwise. Also, I'd use a cascade of Splits, breaking the file down in steps, e.g., 22M -> 1M -> 100k -> 1; this helps a lot with memory utilization (see the sketch below). And if you haven't seen it before, there's a trick to re-balance FlowFiles across a cluster by doing a self site-to-site transmission with an RPG pointing at the current cluster.
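A rough sketch of that cascade, with each SplitText producing smaller chunks for the next (the Line Split Count values are just illustrative; pick sizes that fit your heap):

GetFile
  -> SplitText #1 (Line Split Count = 1000000)   22M-line file -> ~22 FlowFiles
  -> SplitText #2 (Line Split Count = 100000)    each chunk -> 10 smaller chunks
  -> SplitText #3 (Line Split Count = 1)         one FlowFile per record
  -> Output Port (site-to-site to the Spark receiver)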
That said, for this kind of workload you will also be better served by delivering the data to Kafka and using the Kafka receiver. You might still need to consider a cluster for NiFi, however.
Last, make sure that you have chosen an appropriate batch interval on the Spark Streaming side. Start with something large, e.g., 10 seconds, and work down from there as you tune your app.
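If you go the Kafka route, the Spark side would look roughly like this with the direct stream API (a sketch only; the broker address, topic name, and group id are placeholders, and on the NiFi side a PublishKafka processor would write the records to the topic). Note the 10-second batch interval as a starting point:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class TaxiKafkaStream {
    public static void main(String[] args) throws Exception {
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "taxi-stream");             // placeholder group id
        kafkaParams.put("auto.offset.reset", "latest");

        JavaStreamingContext ssc = new JavaStreamingContext(
                new SparkConf().setAppName("TaxiKafkaStream"), Durations.seconds(10)); // 10 s batches

        // Pull records directly from the Kafka topic NiFi publishes to ("taxi-events" is a placeholder)
        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        ssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(
                                Collections.singletonList("taxi-events"), kafkaParams));

        stream.map(ConsumerRecord::value).print(); // one CSV record per Kafka message

        ssc.start();
        ssc.awaitTermination();
    }
}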
Created 06-21-2017 07:36 AM
I am running NiFi locally on my laptop... breaking the file down in steps works... thanks... will consider Kafka 🙂