Member since: 11-16-2015
Posts: 892
Kudos Received: 649
Solutions: 245
My Accepted Solutions
| Views | Posted |
|---|---|
| 5217 | 02-22-2024 12:38 PM |
| 1337 | 02-02-2023 07:07 AM |
| 3004 | 12-07-2021 09:19 AM |
| 4155 | 03-20-2020 12:34 PM |
| 13952 | 01-27-2020 07:57 AM |
08-18-2016
03:57 PM
What does your table look like? I'm wondering if there are special characters in the column names, complex datatypes, etc.
08-18-2016
01:44 AM
ConvertJSONtoSQL expects the JSON object (to be converted to an INSERT/UPDATE statement) to be in the flow file content, not the attributes. Between your SplitJson and your ConvertJSONtoSQL you have a couple of processors like EvaluateJsonPath and AttributesToJSON. What is your configuration for AttributesToJSON? What does that last UpdateAttribute processor do before it goes to ConvertJSONtoSQL? If you could share a template (perhaps a scrubbed version to remove your server-specific information?) that would be helpful as well.
08-16-2016
09:56 PM
21 Kudos
NiFi is most effectively used as an "always-on" system, meaning that the data flows are typically running at all times. Doing batch processing is a more difficult task and usually requires some user intervention (such as stopping a source processor).
For relational databases (RDBMS), a common use case is to migrate, replicate, or otherwise move data from a source RDBMS to some target. If ExecuteSQL is used to get all data from a table (for example), the processor will execute the query each time it is triggered to run and return whatever results correspond to the query.
If the goal is simply to move all rows to some target, ExecuteSQL could be started and then immediately stopped, so that it executes only once and all results are sent down the flow.
A more common use case, however, is that the source database is being updated by some external process (a user, web app, or ERP/CRM system). To pick up the new rows, the table needs to be queried again, but since the "old" rows have already been moved, querying the whole table would push many duplicate rows through the flow.
As an alternative, the QueryDatabaseTable processor lets you specify one or more columns in a table whose values are always increasing (such as an "ID" or "timestamp" column), and the processor will only retrieve rows whose values in those columns are greater than the maximum value(s) observed so far.
To illustrate, consider the following database table called "users":
| id | name | email | age |
|---|---|---|---|
| 1 | Joe | joe@fakemail.com | 42 |
| 2 | Mary | mary@fakemail.com | 24 |
| 3 | Matt | matt@fakemail.com | 38 |
Here, QueryDatabaseTable would be configured to use a table name of "users" and a "Maximum-Value Column" of "id". When the processor runs the first time, it will not have seen any values from the "id" column and thus all rows will be returned. The query executed is:
SELECT * FROM users
However after that query has completed, QueryDatabaseTable stores the maximum value for the "id" column that it has seen; namely, 3.
Now let's say QueryDatabaseTable has been scheduled to run every 5 minutes, and the next time it runs, the table looks like this:
| id | name | email | age |
|---|---|---|---|
| 1 | Joe | joe@fakemail.com | 42 |
| 2 | Mary | mary@fakemail.com | 24 |
| 3 | Matt | matt@fakemail.com | 38 |
| 4 | Armando | armando@fakemail.com | 20 |
| 5 | Jim | jeff@fakemail.com | 30 |
Because QueryDatabaseTable had stored the maximum value of 3 for the "id" column, this time when the processor runs it executes the following query:
SELECT * FROM users WHERE id > 3
Now you will see just the last two rows (the recently added ones) are returned, and QueryDatabaseTable has stored the new maximum value of 5.
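Outside of NiFi, the same incremental pattern boils down to remembering the largest value seen between runs. Here is a rough Groovy sketch of what QueryDatabaseTable does conceptually; the JDBC URL, credentials, and driver below are placeholders, not part of the processor:

```groovy
import groovy.sql.Sql

// Conceptual sketch only: connection details are placeholders
def sql = Sql.newInstance('jdbc:h2:mem:testdb', 'sa', '', 'org.h2.Driver')
def maxIdSeen = 3   // the "state" QueryDatabaseTable keeps between runs

def newMax = maxIdSeen
sql.eachRow('SELECT * FROM users WHERE id > ?', [maxIdSeen]) { row ->
    println "New row: id=${row.id}, name=${row.name}"
    newMax = Math.max(newMax, row.id as int)
}
maxIdSeen = newMax  // persisted for the next scheduled run
```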
Here is the concept applied to a NiFi flow:
For this example, I have a "users" table containing many attributes about randomly-generated users, and there are 100 records to start with:
If we run QueryDatabaseTable, we see that after the SplitAvro we get 100 separate flow files:
The flow remains this way until new data comes in (with an "id" value larger than 100). Adding one:
Once this row is added, you can see the additional record move through the flow:
This approach works for any column that has increasing values, such as timestamps. If you want to clear out the maximum value(s) from the processor's state, right-click on the processor, choose View State, and then click "Clear State" in the dialog.
Hopefully this article has shown how to use QueryDatabaseTable to do incremental fetching of database tables in NiFi. Please let me know how/if this works for you. Cheers!
08-16-2016
06:32 PM
1 Kudo
Do you have Groovy installed on your machine? If so, try:

grape install org.codehaus.groovy.modules.http-builder http-builder 0.7.1

If that doesn't work, then @Grab won't work either (lots of folks seem to have trouble grabbing that module). In that case you should put the http-builder JAR (and its dependencies) into some folder and use the Module Directory property as Bryan suggested.
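If the Grape install does succeed, a minimal ExecuteScript (Groovy) body using @Grab might look like the sketch below; the URL and response handling are made up for illustration:

```groovy
@Grab(group='org.codehaus.groovy.modules.http-builder', module='http-builder', version='0.7.1')
import groovyx.net.http.HTTPBuilder
import static groovyx.net.http.ContentType.JSON

// Hypothetical endpoint, just to show the dependency resolving and being used
def http = new HTTPBuilder('http://example.com')
http.get(path: '/api/status', contentType: JSON) { resp, json ->
    log.info("Status: ${resp.status}, body: ${json}")
}
```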
08-16-2016
04:10 PM
1 Kudo
Provenance fields are not available in Expression Language per se. Only the flow file's attributes (including the core attributes, some of which are pictured) are available to Expression Language (EL). The provenance event metadata is retrieved by a separate query, which is likely too expensive to perform during Expression Language evaluation. Also, if EL were used in a processor, which provenance event would be bound to it? The event(s) generated by the processor acting on the flow file have likely not been generated yet, and getting the previous event could be difficult (and slow).

Having said that, the above information is certainly available to data flows. IMO the best way to get the provenance event data is to use the SiteToSiteProvenanceReportingTask to send provenance events over Site-to-Site to your flow. There you can parse the events (they are in JSON), filter for the flow file UUID if you want, and/or extract fields such as Transit URI into attributes (using EvaluateJsonPath, for example).
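For example, once the events arrive over Site-to-Site, a Groovy step (e.g. ExecuteScript) could pull out the Transit URI along the lines of the sketch below; the sample event and its field names are from memory and may differ between NiFi versions:

```groovy
import groovy.json.JsonSlurper

// Hypothetical sample of the JSON produced by SiteToSiteProvenanceReportingTask;
// the exact field names may vary by NiFi version.
def eventsJson = '''
[
  { "eventId": 101, "eventType": "SEND", "componentType": "PutHDFS",
    "transitUri": "hdfs://namenode:8020/data/file1.avro" }
]
'''

def events = new JsonSlurper().parseText(eventsJson)
events.findAll { it.eventType == 'SEND' }.each { event ->
    println "Transit URI: ${event.transitUri}"
}
```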
08-13-2016
06:58 PM
1 Kudo
Absolutely correct! If you were to add any JARs to lib/, it would only be the Apache Ivy JAR, so that you could use the @Grab annotation as described here: http://funnifi.blogspot.com/2016/05/using-groovy-grab-with-executescript.html. Otherwise, use Bryan's suggestion for best results.
08-12-2016
01:44 PM
1 Kudo
The PutSQL processor issues statements and does not expect a ResultSet to be returned, so you can use it to issue DDL/DML commands.
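For example, a flow file whose content is a DDL statement like the following (the table and columns are made up for illustration) can be routed straight to PutSQL:

```sql
-- Hypothetical DDL; PutSQL executes the statement and no result set comes back
CREATE TABLE users (
  id    INTEGER PRIMARY KEY,
  name  VARCHAR(100),
  email VARCHAR(255)
);
```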
08-11-2016
03:09 PM
1 Kudo
In addition to @Andrew Grande's answer: since it is a custom processor, you could add a property to specify additional folders, JARs, etc., and have the processor build a classloader from those locations. Then you wouldn't have to worry about a NiFi-relative location, although the location(s) would still need to be accessible by each node running the processor (to Andrew's point). There are examples of this "modules" property in the ExecuteScript and JoltTransformJSON processors.
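A rough Groovy-style sketch of that idea, with the directory list hard-coded instead of coming from a real PropertyDescriptor (everything below is illustrative, not NiFi API):

```groovy
// Sketch only: build an extra classloader from a comma-separated list of module directories.
// In a real processor this string would come from a custom property read in @OnScheduled;
// the paths here are made up.
def moduleDirectories = '/opt/extra-libs,/opt/more-libs'

def jarUrls = moduleDirectories.split(',')
    .collect { new File(it.trim()) }
    .findAll { it.isDirectory() }
    .collectMany { dir -> dir.listFiles().findAll { it.name.endsWith('.jar') } }
    .collect { it.toURI().toURL() }

// Parent is the processor's own classloader so framework classes remain visible
def moduleClassLoader = new URLClassLoader(jarUrls as URL[], this.class.classLoader)
println "Loaded ${jarUrls.size()} JARs into the module classloader"
```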
08-10-2016
12:43 AM
2 Kudos
This is a bug in NiFi/HDF, recorded here. As you mentioned, given the limitations of the various JDBC drivers, the returned values are usually expected to adhere to the specification. In this case, a Timestamp value should support as much precision as feasible, hence the bug. For Time types, however, this is not as consistent. A possible workaround is to schedule the QueryDatabaseTable processor to run at intervals when you expect new data, and perhaps to add a DetectDuplicate processor somewhere in your flow.
08-03-2016
07:29 PM
4 Kudos
I've seen this too; I believe the errors are mostly red herrings. Somewhere in the stack trace there will likely be a statement that the client can't connect to the DataNode, and it will list the internal IP (e.g., 10.0.2.15) instead of 127.0.0.1. That causes the minReplication issue, etc.

This setting is supposed to fix it: https://hadoop.apache.org/docs/r2.7.2/hadoop-project-dist/hadoop-hdfs/HdfsMultihoming.html#Clients_use_Hostnames_when_connecting_to_DataNodes It is supposed to make the client(s) do DNS resolution, but I never got it working. If it does work, then when the NameNode (or whoever) tells the client where the DataNode is, it gives back a hostname instead of an IP, which should then resolve on the client (you may need to update /etc/hosts) to localhost (since the sandbox is using NAT). The ports also need to be directly forwarded (50010, I believe).

An alternative could be to switch the sandbox to a Host-Only Adapter, so it has its own IP. However, IIRC Hadoop hard-codes (or at least used to) the IP down in its bowels, so I'm not sure that would work by itself either. @bbende has some ideas in another HCC post as well: https://community.hortonworks.com/questions/47083/nifi-puthdfs-error.html
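For reference, the setting that page describes goes in hdfs-site.xml on the client side; a minimal snippet, assuming this is the property you want to toggle:

```xml
<!-- hdfs-site.xml (client side): ask for DataNode hostnames instead of IPs -->
<property>
  <name>dfs.client.use.datanode.hostname</name>
  <value>true</value>
</property>
```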