
Problem working with mergeRecord using multiple files

New Contributor

Hi All,

Below is my use case:
Flow:
1. I have multiple zip files that I read from a folder.
2. I use the CompressContent processor to unzip the content -> each zip contains multiple JSON files.
3. Each JSON file is an array of JSON objects; I use the SplitJson processor to extract the individual JSON objects.
4. Each JSON object contains a nested JSON array; I extract each nested array object and write them all to a single file using the MergeRecord processor.

I configured MergeRecord with the Defragment merge strategy, a CSVReader, a CSVRecordSetWriter, and a schema registry, and I set fragment.identifier to the filename (using an UpdateAttribute processor before MergeRecord) so that all records from a single seed file are kept in a single output file. My question is how to set fragment.count: giving a round figure, say 1000, creates multiple files of 1000 records each, but the remainder stays stuck in the queue. A rough sketch of the setup is below.
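(Sketch only; the reader/writer names, the ${filename} expression, and the 1000 figure are just the ones mentioned above, not recommendations.)

UpdateAttribute (before MergeRecord):
    fragment.identifier = ${filename}
    fragment.count      = 1000    <- the round figure; the leftover records never complete a group

MergeRecord:
    Merge Strategy = Defragment
    Record Reader  = CSVReader
    Record Writer  = CSVRecordSetWriter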

Also, how can I get summary stats, like the number of nested array records extracted across all JSON files?

3 REPLIES

Re: Problem working with mergeRecord using multiple files

Expert Contributor

You may be looking for the alternative algorithm for MergeRecord, the bin-packing algorithm. This will put your FlowFiles into bins (you specify how many bins to keep active) and save them off every 1000 (configurable) records. You can specify which attribute to key off of and how long to keep the bins alive before force-writing the remainder of the records to a file.
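As a rough sketch of what that configuration might look like (the property names are from the standard MergeRecord processor; the specific values are only placeholders to tune for your data volumes):

MergeRecord:
    Merge Strategy             = Bin-Packing Algorithm
    Correlation Attribute Name = filename
    Minimum Number of Records  = 1000
    Maximum Number of Records  = 1000
    Max Bin Age                = 5 min
    Maximum Number of Bins     = 10

With settings like these, FlowFiles that share the same filename attribute are binned together, a bin is written out once it holds 1000 records, and any partially filled bin is flushed after 5 minutes.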

Regarding summary stats, I think your best bet would be to look to Reporting Tasks to fill that role. Here's a good article on them:

https://pierrevillard.com/tag/reporting-task/

If your needs are simple enough, you may be able to get away with using the NiFi API to query for record counts. You may be looking for something slightly different, but here's something to get your mind thinking: https://community.hortonworks.com/articles/83610/nifi-rest-api-flowfile-count-monitoring.html
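As a hedged sketch of that idea in Python (the endpoint and field names are from the NiFi REST API as I recall them and may differ slightly by version; the URL and connection ID are placeholders):

import requests

NIFI_URL = "http://localhost:8080"      # placeholder NiFi host
CONNECTION_ID = "<connection-uuid>"     # placeholder: the connection you want to monitor

# Fetch the connection entity and read the aggregate queue snapshot
resp = requests.get(NIFI_URL + "/nifi-api/connections/" + CONNECTION_ID)
resp.raise_for_status()
snapshot = resp.json()["status"]["aggregateSnapshot"]

print("FlowFiles queued:", snapshot["flowFilesQueued"])
print("Bytes queued:", snapshot["bytesQueued"])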

And lastly, you can look into NiFi Counters, custom global variables you can increment and decrement. https://pierrevillard.com/2017/02/07/using-counters-in-apache-nifi/
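For example, a minimal sketch of a counter update, assuming an ExecuteScript processor (Jython engine) placed after MergeRecord and assuming the merged FlowFiles carry the standard record.count attribute:

# ExecuteScript (Jython) body: add each merged FlowFile's record count to a NiFi counter
flowFile = session.get()
if flowFile is not None:
    record_count = flowFile.getAttribute("record.count")
    if record_count is not None:
        # adjustCounter(name, delta, immediate); the total shows up under NiFi's Counters menu
        session.adjustCounter("nested-array-records-extracted", long(record_count), False)
    session.transfer(flowFile, REL_SUCCESS)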


Re: Problem working with mergeRecord using multiple files

New Contributor

@anarasimham Thanks for the quick reply.

The reason I was using MergeRecord with Defragment is that the requirement is to keep all the nested JSON objects extracted from a file in one output file. Using filename as the correlation attribute, the problem is:

1. Using the Defragment strategy: I don't know upfront how many JSON objects will be extracted from the nested arrays, which is the number I would need to use as fragment.count.

2. Using the bin-packing algorithm: again, I don't know ahead of time how long processing an input file will take, which is what I would need to use as the bin age.

I have multiple JSON files, each with the schema below:

[
    {
        "field1":"value1",
        "nested-array":[
            {
                "inner-field1":"inner-value1",
                "inner-field2":"inner-value2",
                ...
            },
            {
                "inner-field1":"inner-value3",
                "inner-field2":"inner-value4",
                ...
            },
            {
                "inner-field1":"inner-value5",
                "inner-field2":"inner-value6",
                ...
            }
        ]
        
    },
    {
        "field1":"value1",
        "nested-array":[
            {
                "inner-field1":"inner-value7",
                "inner-field2":"inner-value8",
                ...
            },
            {
                "inner-field1":"inner-value9",
                "inner-field2":"inner-value10",
                ...
            }
        ]
        
    }
]

I need to write the nested-array objects from ALL the JSON files to a single CSV file, like below:

"inner-field1","inner-field2",..

"inner-value1","inner-value2",..

"inner-value3","inner-value4",..

"inner-value5","inner-value6",..

"inner-value7","inner-value8",..

"inner-value9","inner-value10",..
...

regards,

Hemal


Re: Problem working with mergeRecord using multiple files

Expert Contributor

It looks like you're thinking about the problem as a batch process rather than a real-time one. In a real-time, streaming flow there is no start or end to a unit of work: things are always on, and the processors we are discussing are designed to carve user-defined chunks of work out of that continuous flow of data. What you're describing, however, is a very specific unit of work, defined by the set of files you are receiving. To window it properly, you have to understand to some degree the parameters of the work being done on those files. What is the upper bound on the number of records you can expect in the largest payload you may receive? How long will the largest payload take to process? The answers to those questions give you an idea of how to set the parameters (see the sketch below).
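For instance, a sketch under assumed numbers (say a worst case of roughly 50,000 nested records per input file and about 10 minutes to finish processing one file; both figures are purely illustrative):

MergeRecord:
    Merge Strategy             = Bin-Packing Algorithm
    Correlation Attribute Name = filename
    Minimum Number of Records  = 50000    <- at or above the largest payload you expect
    Maximum Number of Records  = 50000
    Max Bin Age                = 10 min   <- at or above the longest you expect one file to take

With the minimum set at or above the largest expected payload, a bin is not written out until either all of a file's records have arrived or the bin age expires, so the bin age acts as a safety valve rather than the usual trigger.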

Additionally, in a real-time workflow you should not have to combine everything back into one file; that is a serial process and will become a bottleneck. I would suggest taking a look at your downstream processes and considering whether they can be parallelized.
