NiFi batch read every k-th row from a source

How can I batch read data from a source (let's say CSV or JDBC) into NiFi to query a web service?

I want to read only every k-th row (let's say every 10th row) and collect 50,000 records into a single batch. However, I must keep state, i.e. know which records have already been processed by NiFi.

Re: NiFi batch read every k-th row from a source

Unless you are running a command-line tool (for CSV) or executing a SQL statement that allows sampling (for JDBC), you won't be able to read every k-th row at the source. Instead you will likely want SplitText (for CSV) or SplitAvro (for JDBC); these processors add (among other attributes) a "fragment.index" attribute, which is basically a zero-based row number. You can send the resulting flow files to a RouteOnAttribute processor which checks whether the index is divisible by k (in this case 10):

${fragment.index:toNumber():plus(1):mod(10):equals(0)}

So the flow file with fragment.index set to 9 (which is the 10th one, since counting starts at zero), or 19, or 29, etc., will be routed to the "matched" relationship, and all others will be routed to "unmatched". If you want to discard the unmatched flow files, auto-terminate the "unmatched" relationship via the processor's configuration dialog.
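
For reference, the same selection logic looks like this outside of NiFi (a minimal Python sketch, not part of the flow; the file name and k value are just placeholders):

import csv

K = 10  # keep every k-th row

# Placeholder input; in the flow this is whatever feeds SplitText.
with open("input.csv", newline="") as f:
    for index, row in enumerate(csv.reader(f)):
        # Mirrors ${fragment.index:toNumber():plus(1):mod(10):equals(0)}:
        # zero-based indexes 9, 19, 29, ... are the 10th, 20th, 30th rows.
        if (index + 1) % K == 0:
            print(row)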

Now that you have every 10th row, you can check for duplicates using the DetectDuplicate processor. The trick here is to pick a Cache Entry Identifier that will be unique across multiple files (for CSV) or ResultSets (for JDBC). For this example I'm assuming we're working on a single file (whose 10th row might, for instance, change the next time you look at it), so you can set the Cache Entry Identifier to

${fragment.index}

That should remove all duplicate rows. Your use case might be more complicated than this, but the approach is the same. Now that you have unique rows, you can merge them back together using MergeContent, setting the Maximum Number of Entries to 50000 and the Merge Format to Binary Concatenation (for CSV; also make sure your rows have newlines at the end!) or Avro (for JDBC; if you used ExecuteSQL or another NiFi database processor, the output is likely in Avro and must be merged accordingly).
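
If it helps to see that last part spelled out, here is a minimal Python sketch of the dedup-and-batch step (again outside of NiFi; the key and batch size are stand-ins for the DetectDuplicate and MergeContent settings above):

BATCH_SIZE = 50000  # mirrors MergeContent's Maximum Number of Entries

def batches(rows):
    """Skip rows whose key was already seen, then emit batches of 50,000."""
    seen = set()    # stands in for DetectDuplicate's distributed cache
    batch = []
    for index, row in enumerate(rows):
        # Stands in for the ${fragment.index} cache identifier; with
        # multiple files you'd include a file name in the key as well.
        key = index
        if key in seen:
            continue
        seen.add(key)
        batch.append(row)
        if len(batch) == BATCH_SIZE:
            yield batch
            batch = []
    if batch:       # emit the final, partially filled batch
        yield batch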

Please let me know if I've misunderstood your use case, or if you have any questions about these processors or this sample flow.
