Member since: 09-13-2017
Posts: 20
Kudos Received: 0
Solutions: 0
11-01-2018
06:15 PM
Hello Jackson, this is how we did it.

Step 1: Create an UpdateAttribute processor with the following three attributes: schemaStart, schemaEndTags, and recordDateSchemaElement.

schemaStart:
{ "type": "record", "name": "Test_Schema", "fields": [ {"name": "id", "type": ["string","null"]}, {"name": "operation", "type": ["string","null"]}

schemaEndTags:
]}

recordDateSchemaElement:
${record_date:isEmpty():ifElse('', ', {"name": "record_date", "type": ["string","null"]}')}

Step 2: Add another UpdateAttribute processor connected from the one above, with:

completeSchema:
${allAttributes("schemaStart", "recordDateSchemaElement", "schemaEndTags"):join(" ")}

Now your completeSchema will contain the record_date element only when record_date is NOT empty. Hope this helps.
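To make the conditional assembly concrete, here is a small Python sketch (purely illustrative; it mimics the isEmpty():ifElse() and join(" ") logic rather than using any NiFi API) showing that the three fragments form a valid Avro schema with or without record_date:

```python
import json

def build_schema(record_date):
    """Mimic the UpdateAttribute logic: append the optional
    record_date field only when the attribute is non-empty."""
    schema_start = ('{ "type": "record", "name": "Test_Schema", "fields": ['
                    ' {"name": "id", "type": ["string", "null"]},'
                    ' {"name": "operation", "type": ["string", "null"]}')
    schema_end_tags = ']}'
    # Equivalent of ${record_date:isEmpty():ifElse('', ', {...}')}
    record_date_element = ('' if not record_date
                           else ', {"name": "record_date", "type": ["string", "null"]}')
    # Equivalent of allAttributes(...):join(" ")
    return ' '.join([schema_start, record_date_element, schema_end_tags])

schema = json.loads(build_schema('2018-11-01'))
print([f['name'] for f in schema['fields']])
```

Parsing the joined string with json.loads confirms both branches produce well-formed schemas.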
08-03-2018
03:59 PM
Thanks for that reply, Matt. Here is the scenario: we have 20 NiFi flows, each using a PutHiveStreaming processor and each expected to handle around 25k flow files per minute (after a merge step ahead of the PutHiveStreaming processor). At that peak load, the PutHiveStreaming processor queues up a lot of flow files and ingestion into the target tables slows down. Is this because of the peak load on the Hive metastore? If so, how can I minimize the load that PutHiveStreaming puts on the metastore? Currently we have two metastore hosts.
08-02-2018
03:39 PM
We have multiple Hive metastore server instances, and we give the metastore URIs directly in the PutHiveStreaming processor in the form thrift://host1:port1,thrift://host2:port2. Does that mean the second instance takes over only when the first is down (failover / high availability), or is the load shared between the two instances all the time?
05-15-2018
04:04 PM
I have the target field type in Hive as timestamp, and from the source I get JSON that has either a proper timestamp field, or "" or null sometimes. I am converting the source with JsonToAvro before using the PutHiveStreaming processor. The records with a proper timestamp format get into my Hive target table successfully. But those with ""/null (empty string set) values show the error: Illegal format. Timestamp format should be "YYYY-MM-DD HH:MM:SS[.fffffffff]". I know it works if I default the field to some date when it is null/empty, but I do not want that. I want it to be null in my target table when it is null. How can I achieve this?
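One workaround (a sketch of the general idea, not a specific NiFi recipe) is to normalize the JSON before JsonToAvro - for example in an ExecuteScript step or an upstream job - so that empty-string timestamps become JSON null; with a nullable Avro union type, Hive can then store them as NULL. The field name event_ts below is hypothetical:

```python
import json

def nullify_empty_timestamps(record, timestamp_fields):
    """Replace ""/missing values in the given timestamp fields with None
    so they serialize as JSON null instead of an empty string."""
    for field in timestamp_fields:
        if record.get(field) in ('', None):
            record[field] = None
    return record

# Hypothetical source record with an empty timestamp field.
raw = '{"id": "1", "event_ts": ""}'
cleaned = nullify_empty_timestamps(json.loads(raw), ['event_ts'])
print(json.dumps(cleaned))
```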
Labels:
- Apache Hive
10-04-2017
02:30 AM
@bkosaraju I have these attributes in my custom property file, and I would like to know whether there is any way to use prop_3 directly, without an intermediate processor such as UpdateAttribute. Thanks
10-04-2017
02:24 AM
Please note that I have these properties defined in a custom property file.
10-04-2017
12:34 AM
How would I use one custom property within another in NiFi? @Pierre Villard @Matt Burgess

E.g., I have:
prop_1=/path/to/dir1
prop_2=/path/to/dir2
prop_3=${prop_1}/${prop_2}/file1
prop_4=${prop_1}/${prop_2}/file2

Now, within a NiFi processor property that supports Expression Language, when I give ${prop_3} or ${prop_4} I get the error "${prop_1} is not a file or directory". But if I provide the value as ${prop_1}/${prop_2}/file1 directly, it works. What is the problem when I give ${prop_3}? Thanks, John
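For illustration only (this is Python, not NiFi internals): the error described is what you see when property references are resolved in a single, non-recursive pass, so a nested reference like ${prop_3} surfaces its inner ${prop_1} literally instead of expanding it. A sketch of the difference between one-pass and recursive expansion:

```python
import re

# Stand-in for the custom property file from the question.
props = {
    'prop_1': '/path/to/dir1',
    'prop_2': '/path/to/dir2',
    'prop_3': '${prop_1}/${prop_2}/file1',
}

def expand_once(value, props):
    """Single-pass substitution: nested references survive as literals."""
    return re.sub(r'\$\{(\w+)\}',
                  lambda m: props.get(m.group(1), m.group(0)), value)

def expand_recursive(value, props, depth=10):
    """Keep substituting until nothing changes (with a depth guard)."""
    for _ in range(depth):
        new = expand_once(value, props)
        if new == value:
            return new
        value = new
    return value

print(expand_once('${prop_3}', props))      # still contains ${prop_1}, ${prop_2}
print(expand_recursive('${prop_3}', props)) # fully resolved path
```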
Labels:
- Apache NiFi
10-03-2017
05:09 PM
Yes Paras, it is clear now. Thanks. However, any input on the following is highly appreciated: currently I have NiFi running on an edge node that has 4 cores. Say I have 20 incoming flow files and I set Concurrent Tasks to 10 on an ExecuteStreamCommand processor; do I get only concurrent execution, or both concurrent and parallel execution?
10-02-2017
09:49 PM
Thanks for your reply, Paras. Currently I designed the flow as SelectHiveQL (reading as CSV instead of the default Avro) -> SplitText (by line) -> ExtractText (assigning the content of each split file to an attribute). This is good so far: every value of my query result is associated with a flow file attribute. I believe this is what you were also suggesting, just put differently.
Now the question is about the ExecuteStreamCommand processor, where I pass the flow file attribute to the command arguments. Could you please clarify whether one task handles one spark-submit command, with the attribute taken from one flow file at a time? Is my understanding correct? I remember reading somewhere that one task in NiFi can process multiple flow files at a time, so I wanted to understand how flow files are handled by NiFi processor tasks. Regards, John
10-02-2017
04:19 PM
Hi, I have a scenario and I would like your suggestions on how to achieve it in NiFi.

Step 1: Query a Hive table and get the list of values from a particular column.
Step 2: Run a Spark job for each value, passing the value as one of the parameters to the spark-submit job. These Spark jobs have to execute in parallel.

So, if today the query result gives me two values for the column, the flow should trigger two spark-submit jobs that run in parallel; if tomorrow the result gives me 10 values, 10 jobs should start in parallel. Of course, I understand that when resources are not available, it cannot start all the jobs. Please advise.

On a different note, I would like to know how a processor typically deals with incoming flow files. Does it process one flow file after another, or does it take a set of flow files and execute all of them in parallel? Thanks, John
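Outside of NiFi, the fan-out pattern described (one spark-submit per column value, all running in parallel) can be sketched with a thread pool. launch_job below is a hypothetical placeholder for the real spark-submit invocation, and column_values stands in for the Hive query result:

```python
from concurrent.futures import ThreadPoolExecutor

def launch_job(value):
    """Placeholder for invoking spark-submit with `value` as a parameter,
    e.g. via subprocess.run(["spark-submit", ..., value])."""
    return f"submitted job for {value}"

# Stand-in for the column values returned by the Hive query.
column_values = ["2017-10-01", "2017-10-02"]

# One worker per value, so all jobs are launched in parallel.
with ThreadPoolExecutor(max_workers=len(column_values)) as pool:
    results = list(pool.map(launch_job, column_values))

print(results)
```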
Labels:
- Apache NiFi
- Apache Spark
10-02-2017
02:55 PM
Hi Wyner, yes, after I disabled and re-enabled the MapCache service configuration, it was working fine. Thanks for that.
09-26-2017
06:40 PM
I am using the file name as my signal identifier. File names before Notify and Wait are all renamed to a specific value, and the count of these signals is what I am checking.
09-26-2017
06:37 PM
Thanks for your reply, Wyner. Here is a screenshot of the flow that I simulated from the actual ones we have, along with the configurations of the Wait and Notify processors I am using:

1. GenerateFlowFile generates a flow file every 5 seconds.
2. Flow files are renamed to 'release_signal' (this is what I am using as the release signal identifier in the Wait and Notify processors).
3. When there are 5 such signals, I want the Wait processor to push all 5 flow files to the downstream success relationship. The schedule on the Wait processor is 10 sec.

Given the above, I expect the success relationship from the Wait processor to receive flow files in steps of 5 (5, 10, 15, ...), but that is not what I see; you can see it in the screenshot too. There are 13 files in total pushed to the success relationship from Wait. Sometimes 5 flow files are pushed at once, but sometimes even a single flow file is pushed to success from Wait, and I don't understand why that happens. Since I could not get this working as expected, I switched to using MergeContent, and it works for the use case I have at hand.

(Screenshots attached: Flow, UpdateAttribute, Notify, and Wait configurations.)
09-18-2017
09:46 PM
Hi Wyner, see wait-processor.jpeg - you can see that I am waiting for 10 signals here.
09-18-2017
03:28 AM
Hi, could you please let me know how the resetting of the target signal count in the Wait processor works? Refer to my question: https://community.hortonworks.com/questions/138762/reset-of-target-signal-count-in-wait-processor.html Thanks, John
09-15-2017
07:33 PM
I have a flow in which I wait for 5 files to come out of the respective execute processors, after which I start another processor. I achieved this using the Wait and Notify processors with a target signal count of 5. When I stop and restart the flow, it works as expected. Now, my question is: when and how does this counter get reset? If the execute processors are scheduled to run every 30 minutes, do the signals from Notify keep increasing the counter? If so, how does my Wait processor ever match the signal count of 5 that I gave and proceed to the next processor every time? Thanks, John
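As a way to reason about the question, here is a toy model (not NiFi's actual implementation; the consume-on-release behavior is an assumption) in which Notify increments a counter and Wait releases once the target is reached, consuming the signals it released. Under that model the counter is reduced by the target count each time Wait fires, so leftover signals carry into the next round rather than growing without bound:

```python
class SignalCounter:
    """Toy model of Wait/Notify: notify() increments the signal count;
    wait() releases when the target is reached and consumes those signals.
    (Assumption: signals are consumed on release, not globally reset.)"""

    def __init__(self, target):
        self.target = target
        self.count = 0

    def notify(self):
        self.count += 1

    def wait(self):
        if self.count >= self.target:
            self.count -= self.target  # consume the signals that triggered the release
            return True
        return False

c = SignalCounter(target=5)
for _ in range(7):          # e.g. 7 notifications arrive over several runs
    c.notify()
released = c.wait()
print(released, c.count)    # the 2 extra signals remain for the next cycle
```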
Labels:
- Apache NiFi
09-15-2017
07:22 PM
Thanks Ajay. Would this be as good as having a sqoop export in a bash script and calling that from an ExecuteStreamCommand processor? I have millions of records to push to Postgres.
09-13-2017
07:27 PM
I have data in Hive tables that I would like to push to tables in Postgres. How can I do this using NiFi processors? What sequence of processors can I use for this use case? Please advise. Also, I would like to know whether NiFi is efficient for this when I have millions of records to write to Postgres. Thanks, John
Labels:
- Apache Hive
- Apache NiFi