Member since: 11-16-2015
Posts: 892
Kudos Received: 649
Solutions: 245
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 5061 | 02-22-2024 12:38 PM
 | 1325 | 02-02-2023 07:07 AM
 | 2974 | 12-07-2021 09:19 AM
 | 4137 | 03-20-2020 12:34 PM
 | 13868 | 01-27-2020 07:57 AM
06-27-2016
06:33 PM
2 Kudos
Are you looking to use NiFi (on a different cluster) to move files within the Hadoop cluster? That may not be the most efficient approach, as you would be moving files off the cluster just to move them back again. If you can install NiFi on the Hadoop cluster (or on a node that has a Hadoop client), you could use ExecuteProcess or ExecuteStreamCommand to run something like "hadoop fs -cp /path/to/file /new/path/to/file".
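For example, a rough ExecuteStreamCommand configuration might look like the following (the paths are placeholders; adjust the command to your Hadoop installation):
Command Path: hadoop
Command Arguments: fs;-cp;/path/to/file;/new/path/to/file
Argument Delimiter: ;
ExecuteStreamCommand passes flow file content to the command's STDIN by default; set Ignore STDIN to true if you are only triggering the copy.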
06-22-2016
02:04 PM
1 Kudo
I believe this is usually a conflict between versions of Xerces. There is a Xerces JAR in the nifi-hadoop-libraries-nar; is your NAR declaring that as a dependency? If so, you may need to exclude Xerces from it (or from your own dependencies) to ensure there is only a single version/implementation on the classpath.
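For example, if the Xerces JAR is coming in transitively through one of your own dependencies, a Maven exclusion might look like the following (the hadoop-client artifact and version here are only illustrative; apply the exclusion to whichever dependency actually pulls in Xerces):
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.1</version>
    <exclusions>
        <exclusion>
            <groupId>xerces</groupId>
            <artifactId>xercesImpl</artifactId>
        </exclusion>
    </exclusions>
</dependency>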
06-10-2016
07:24 PM
So as not to make more work for you, I took the liberty of creating the Jira case (https://issues.apache.org/jira/browse/NIFI-1998) and doing the upgrade (https://github.com/apache/nifi/pull/521). Regards, Matt
06-10-2016
04:59 PM
There is no workaround, but the Cassandra 3.x driver claims to be backwards-compatible, so NiFi could be upgraded to use the Cassandra 3.x driver. Do you mind filing a Jira case for this at https://issues.apache.org/jira/browse/NIFI ? Thanks!
06-01-2016
06:44 PM
What are the data types of the fields in the LOGS table? Perhaps there is a type not accounted for by either Avro or the QueryDatabaseTable processor. If it is a timestamp format issue (specifically, if only the date and not the time of day can appear in a literal), try setting the SQL Preprocessing Strategy property to "Oracle". That will strip the time off and may fix the problem, although then you will not get new data until the following day.
05-03-2016
04:55 PM
1 Kudo
I can confirm this: the PutElasticsearch processor will take a batch of flow files (each assumed to contain a single record) and create a bulk insert from them. As @drussell mentioned, you can set the batch size to the desired number of flow files per batch, and depending on the throughput/latency of the flow, you may want to schedule PutElasticsearch differently. For example, if PutElasticsearch seems to handle only one flow file at a time (but your batch size is set to, say, 100), then flow files are not arriving fast enough, i.e. there is only one flow file available each time PutElasticsearch runs. If you want to batch them, schedule PutElasticsearch to run less often so that flow files queue up in the incoming connection; then each run will put more flow files into the bulk insert. If you already have a flow file containing multiple documents, and they are single-line JSON objects, you may be able to use SplitText to split them into individual documents and then apply the technique above to get the bulk-insert behavior you are looking for. Also, feel free to file an Improvement Jira to let the user specify (or possibly detect) whether a flow file contains multiple documents.
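As a rough sketch of the scheduling idea (the numbers are illustrative assumptions, not recommendations), on PutElasticsearch you might set:
Batch Size: 100
Run Schedule (Scheduling tab): 30 sec
With a 30-second run schedule, flow files accumulate in the incoming connection between runs, so each run has closer to a full batch of 100 documents to send in one bulk insert.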
04-27-2016
06:26 PM
3 Kudos
Dynamic properties (like your "testProperty") are passed in as variables, and they are PropertyValue objects. PropertyValue has a method called evaluateAttributeExpressions(), and if you want to resolve attributes from a FlowFile, you can pass in a reference to that flow file. Then you call getValue() (or just ".value" in Groovy) and the property will have been evaluated correctly. Since you're using the "filename" attribute, I assume you will be getting that from an incoming flow file. So you will need to get the flow file from the session, pass it into evaluateAttributeExpressions(), and then don't forget to transfer or remove the flow file. Here is an example in Groovy:
flowFile = session.get()
if(!flowFile) return
log.info("-----------------------" + testProperty.evaluateAttributeExpressions(flowFile).value)
session.transfer(flowFile, REL_SUCCESS)
04-27-2016
04:27 AM
Absolutely! With ExecuteScript (with, say, Groovy as the language), the script body could be something like:
import org.apache.commons.lang3.StringUtils
flowFile = session.get()
if(!flowFile) return
flowFile = session.putAttribute(flowFile, 'my_nifi_attribute', StringUtils.reverse(flowFile.getAttribute('my_nifi_attribute')))
session.transfer(flowFile, REL_SUCCESS)
04-27-2016
03:34 AM
1 Kudo
To @Artem Ervits' comment, if the Java class is available in the class loader (either by being part of the NiFi core or by being specified as a JAR in ExecuteScript's Module Directory property), then you can call it directly; you don't need reflection. If I'm not answering your question, can you provide more details? I'm happy to help 🙂
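For illustration, here is a minimal Groovy sketch for ExecuteScript; com.example.MyUtil is a hypothetical class assumed to be in a JAR listed in the Module Directory property:
import com.example.MyUtil // hypothetical class, provided via Module Directory

flowFile = session.get()
if(!flowFile) return
// call the Java class directly -- no reflection needed
def result = MyUtil.transform(flowFile.getAttribute('filename'))
flowFile = session.putAttribute(flowFile, 'transformed.value', result)
session.transfer(flowFile, REL_SUCCESS)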
04-05-2016
01:13 PM
8 Kudos
As of NiFi 0.6.0, there is a processor called QueryDatabaseTable that does something like this: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi.processors.standard.QueryDatabaseTable/index.html You can enter the columns you'd like the processor to keep track of, and it will store the maximum value it has "seen" so far for each of the specified columns. When the processor runs, it generates a SQL query that returns only rows whose values in those columns are greater than the observed maximums. So if you have a unique (and increasing) ID field, for example, you can specify that in the QueryDatabaseTable processor; the first execution will return all rows, and subsequent executions will not return any rows until one (or more) have a value for ID greater than the max. This can be used to do incremental fetching like Sqoop does for only added/updated rows, using columns that contain timestamps, IDs, or other increasing values.
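As a rough sketch (the table and column names are placeholders), with Table Name set to "users" and Maximum-value Columns set to "id", the generated queries would look something like:
-- first run: no maximum value observed yet, so all rows are returned
SELECT * FROM users
-- later runs: only rows past the highest id seen so far (say, 1000) are returned
SELECT * FROM users WHERE id > 1000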