Huge CSV to Spark

Expert Contributor

Hi

I am currently running NiFi on my laptop and not in HDP.

I have a huge CSV file (2.7 GB) of New York taxi events, each containing basic information about a taxi trip. The header is as follows:

vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,surcharge,mta_tax,tip_amount,tolls_amount,total_amount

I ran the following command to get a subset of the file (Linux command line):

head -n 50000 taxi_sorted.csv > NifiSparkSample50k.csv

The resulting file is 5.6 MB.

The full file therefore contains approximately (2.5 GB * 1024 / 5.6 MB) * 50000 = 22857142 records.
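As a rough check of this estimate, here is a minimal Python sketch of the same scaling. The function name `estimate_records` is hypothetical, and the sketch assumes records are of roughly uniform size, so the sample's bytes-per-record ratio holds for the whole file. It evaluates both the 2.5 GB used in the formula and the 2.7 GB stated for the file.

```python
def estimate_records(full_size_gb: float, sample_size_mb: float, sample_records: int) -> int:
    """Scale the sample's record count by the ratio of file sizes."""
    full_size_mb = full_size_gb * 1024  # GB -> MB
    return round(full_size_mb / sample_size_mb * sample_records)


# Figures from the post: a 50,000-line sample occupies 5.6 MB.
print(estimate_records(2.5, 5.6, 50_000))  # size used in the estimate above
print(estimate_records(2.7, 5.6, 50_000))  # size stated for the full file
```

Either input gives a count in the low tens of millions (about 22.9 million and 24.7 million respectively), so the order of magnitude is the same.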

I read the file, split it into one FlowFile per record, and then send the records to Spark Streaming via the NiFi Spark receiver. Unfortunately (and predictably), the system gets stuck and NiFi stops responding.

I want to have the data streamed to Spark. What would be a better way to do this? Instead of splitting the file per record, should I split it into files of, say, 50,000 records each and then send those files to Spark?

Regards

The following are the log entries:

2017-06-20 16:00:11,363 ERROR [Timer-Driven Process Thread-4] o.a.nifi.processors.standard.SplitText SplitText[id=d8251eb7-102e-115c-ee77-109159d0b9ea] SplitText[id=d8251eb7-102e-115c-ee77-109159d0b9ea] failed to process due to java.lang.NullPointerException; rolling back session: {}
java.lang.NullPointerException: null
2017-06-20 16:00:11,380 ERROR [Timer-Driven Process Thread-4] o.a.nifi.processors.standard.SplitText SplitText[id=d8251eb7-102e-115c-ee77-109159d0b9ea] SplitText[id=d8251eb7-102e-115c-ee77-109159d0b9ea] failed to process session due to java.lang.NullPointerException: {}
java.lang.NullPointerException: null
2017-06-20 16:00:11,381 WARN [Timer-Driven Process Thread-4] o.a.nifi.processors.standard.SplitText SplitText[id=d8251eb7-102e-115c-ee77-109159d0b9ea] Processor Administratively Yielded for 1 sec due to processing failure
2017-06-20 16:00:11,381 WARN [Timer-Driven Process Thread-4] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding SplitText[id=d8251eb7-102e-115c-ee77-109159d0b9ea] due to uncaught Exception: java.lang.NullPointerException
2017-06-20 16:00:11,381 WARN [Timer-Driven Process Thread-4] o.a.n.c.t.ContinuallyRunProcessorTask 
java.lang.NullPointerException: null
2017-06-20 16:00:41,385 ERROR [Timer-Driven Process Thread-10] o.a.nifi.processors.standard.SplitText SplitText[id=d8251eb7-102e-115c-ee77-109159d0b9ea] SplitText[id=d8251eb7-102e-115c-ee77-109159d0b9ea] failed to process due to java.lang.NullPointerException; rolling back session: {}
java.lang.NullPointerException: null
2017-06-20 16:00:41,385 ERROR [Timer-Driven Process Thread-10] o.a.nifi.processors.standard.SplitText SplitText[id=d8251eb7-102e-115c-ee77-109159d0b9ea] SplitText[id=d8251eb7-102e-115c-ee77-109159d0b9ea] failed to process session due to java.lang.NullPointerException: {}
java.lang.NullPointerException: null
1 ACCEPTED SOLUTION

@Arsalan Siddiqi You might get some mileage out of running NiFi in cluster mode (you don't say whether it's a single instance or not). I can't imagine being able to pull 22M records over the Spark receiver in a timely manner otherwise. Also, I'd use a cascade of Splits, breaking down the file in steps so 22M -> 1M -> 100k -> 1. This will help a lot with memory utilization. And if you haven't seen it before, there's a trick to re-balance FlowFiles across a cluster by doing a self site-to-site transmission with a Remote Process Group (RPG) pointing at the current cluster.
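To illustrate the cascade idea outside NiFi, here is a minimal Python sketch. It is not NiFi code: in a flow, the equivalent would be several SplitText processors chained together, each with its own Line Split Count. The function names `split_lines` and `cascade` are hypothetical. The point it shows is that each stage only ever holds one chunk of its own size, rather than materializing every single-record split at once.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def split_lines(lines: Iterable[str], size: int) -> Iterator[List[str]]:
    """Yield consecutive chunks of at most `size` lines."""
    it = iter(lines)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def cascade(lines: Iterable[str], sizes: List[int]) -> Iterator[List[str]]:
    """Apply the split stages in order, e.g. sizes=[1_000_000, 100_000, 1]."""
    if not sizes:
        # No stages left: emit the current chunk as a final split.
        yield list(lines)
        return
    for chunk in split_lines(lines, sizes[0]):
        # Each chunk is split further by the remaining stages.
        yield from cascade(chunk, sizes[1:])


# Small stand-in for the taxi file: 25 records split 10 -> 5 -> 1.
records = [f"row{i}" for i in range(25)]
singles = list(cascade(records, [10, 5, 1]))
print(len(singles), singles[0])
```

Because the stages are generators, a 22M-line input passed as a file object would be consumed lazily, one first-stage chunk at a time.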

That said, for this kind of workload you will also be better served by delivering the data to Kafka and using the Kafka receiver. You might still need to consider a cluster for NiFi, however.

Last, make sure that you have chosen an appropriate batch interval on the Spark Streaming side. Start with something large, e.g., 10 seconds, and work down from there when you're tuning your app.

Expert Contributor

I am running NiFi locally on my laptop. Breaking the file down in steps works, thanks. I will consider Kafka 🙂