02-16-2017
04:28 PM
1 Kudo
ExecuteScript creates a new ScriptEngine for each of the tasks specified in the Max Concurrent Tasks property, and reuses those engines across flow files. ExecuteScript basically lets you implement an onTrigger() method in a scripting language, so it doesn't provide for other lifecycle hooks like setup and shutdown. For that you can use InvokeScriptedProcessor; there's a bit more boilerplate, since you must implement the Processor interface, but in return you can override the initialize() method to connect to the DMC once, as well as expose any number of extra properties and relationships to the "parent" InvokeScriptedProcessor for configuration. I have some examples on my blog, including this one. In general, I should mention that the Jython engine is relatively slow anyway, so you won't see great performance from it. You can get better performance by porting to Groovy or Javascript if possible.
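To make that concrete, here is a minimal Groovy skeleton of the InvokeScriptedProcessor pattern. It is a sketch, not a drop-in implementation: the dmcClient field and the connection code in initialize() are placeholders for whatever DistributedMapCache client setup you need.

import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSessionFactory
import org.apache.nifi.processor.Processor
import org.apache.nifi.processor.ProcessorInitializationContext
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException

class DmcProcessor implements Processor {

    def REL_SUCCESS = new Relationship.Builder().name('success').description('Files processed successfully').build()

    def dmcClient   // placeholder for your cache client, created once

    void initialize(ProcessorInitializationContext context) {
        // Called once when the script is (re)loaded -- connect to the DMC here
        // dmcClient = ... (your connection code, an assumption for this sketch)
    }

    Set<Relationship> getRelationships() {
        [REL_SUCCESS] as Set
    }

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        def session = sessionFactory.createSession()
        try {
            def flowFile = session.get()
            if (!flowFile) return
            // ... use dmcClient per flow file here ...
            session.transfer(flowFile, REL_SUCCESS)
            session.commit()
        } catch (e) {
            session.rollback(true)
            throw e
        }
    }

    // Remaining Processor methods, stubbed out for brevity
    Collection<ValidationResult> validate(ValidationContext context) { null }
    PropertyDescriptor getPropertyDescriptor(String name) { null }
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
    List<PropertyDescriptor> getPropertyDescriptors() { null }
    String getIdentifier() { null }
}

// InvokeScriptedProcessor looks for a variable named "processor"
processor = new DmcProcessor()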
02-13-2017
06:44 PM
1 Kudo
For this, I am assuming that you have a property called "nifi.prefix.cat" defined in your Variable Registry (e.g., custom.properties):

nifi.prefix.cat=my_filename.txt

Then, assuming a flow file comes into UpdateAttribute with its "suffix" attribute set to "cat", you can add a dynamic property called "nifi.filename" set to:

${${literal('nifi.prefix.'):append(${suffix})}}

This should give you an attribute called "nifi.filename" set to "my_filename.txt". Please let me know if I've understood what you are trying to do, and I'll edit this as needed.
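To spell out how the nested expression evaluates (using the example values above):

literal('nifi.prefix.')   -> the string "nifi.prefix."
:append(${suffix})        -> "nifi.prefix." + "cat" = "nifi.prefix.cat"
outer ${...}              -> the value of the property named "nifi.prefix.cat", i.e. "my_filename.txt"

In other words, the inner expression builds the name of the property, and the outer ${...} looks that name up in the Variable Registry.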
02-10-2017
09:30 PM
Try explicitly setting the Return Type as 'json' rather than 'auto-detect' or 'scalar'.
02-03-2017
07:55 PM
1 Kudo
I was able to get such a script working with those json-lib classes. I had different versions of some of those libraries, though; your issue might come from using commons-collections-3.2.2 instead of 3.2.1. I started with the dependencies (and versions) listed here, and only downloaded what I needed to get things compiling. I set the Module Directory property to the folder containing all the JARs (versus an entry for each JAR):

/Users/mburgess/Downloads/json-lib

Here is the sample script I used (it's not pretty, but it compiles and runs):

import net.sf.json.*
import net.sf.json.xml.*

// A plain Groovy object to serialize
class POGO {
    String a
    List<String> b
    Map<String, Integer> c
}

def js = new JSONSerializer()
def xs = new XMLSerializer()

def flowFile = session.get()
if (!flowFile) return

// Build a sample object, convert it to JSON, then render the JSON as XML
def p = new POGO(a: "Hello", b: ["I", "am", "a", "list"], c: ['k1': 1, 'k2': 2])
def j = js.toJSON(p)
def x = xs.write(j)

// Put the XML into an attribute (to keep the example simple) and send the flow file on
flowFile = session.putAttribute(flowFile, 'new.value', x)
session.transfer(flowFile, REL_SUCCESS)

This ignores the incoming flow file content and creates an object which is transformed to JSON and then to XML (I wanted to exercise the toJSON() and write() methods), then puts the XML in an attribute (to make the example easier) and sends the flow file on.
02-03-2017
02:53 PM
Correct, the encode() method will return a bytearray already.
01-27-2017
10:11 PM
3 Kudos
If the CSV doesn't need any work done to it and you just want to put a Hive table over the CSV(s), you can use the following flow:

GetFile -> PutHDFS -> ReplaceText -> PutHiveQL

GetFile: Retrieves the CSV file(s). You could also use ListFile -> FetchFile for this.
PutHDFS: Puts the CSV file(s) onto the HDFS file system.
ReplaceText: Replaces the content of the flow file with a HiveQL DDL statement, such as "LOAD DATA INPATH ..." or "CREATE TABLE IF NOT EXISTS ..." (see the sketch at the end of this answer).
PutHiveQL: Executes the DDL command.

Alternatively, if you want to insert each row individually (note: this is slower), and you know the schema of the CSV file, you could use the following flow:

GetFile -> SplitText -> ExtractText -> ReplaceText -> PutHiveQL

GetFile: Retrieves the CSV file(s). You could also use ListFile -> FetchFile for this.
SplitText: Splits the CSV file into one line/row per flow file.
ExtractText: Extracts each column value into an attribute. There is an example of this in the Working_With_CSV template.
ReplaceText: Replaces the content of the flow file with a HiveQL statement, using NiFi Expression Language to insert the column values, such as a Replacement Value of "INSERT INTO myTable VALUES ('${col1}', '${col2}', ${col3})". Note the use of quotes to surround columns whose values are string literals. You could also use JDBC parameters and flow file attributes; see the PutHiveQL documentation for more details (i.e., your Replacement Value would be INSERT INTO myTable VALUES (?,?,?) and you'd need attributes for the JDBC types and values of your columns).
PutHiveQL: Executes the INSERT command(s).

If instead you need the data in a different format (Avro, JSON, ORC, etc.), then your flow will be more complex (as in your example above). NiFi is highly modular, so although a flow to do something "simple" like getting CSV into Hive sounds like one step, there are actually a number of smaller operations to be performed (conversions, input/output, etc.), and thus there may be several processors in your flow. Your example illustrates this modularity in terms of what format(s) the processors expect: if you want to auto-generate the SQL (versus hand-crafting it with ReplaceText), then ConvertJSONToSQL is your option, but that requires JSON, and there's no ConvertCSVtoJSON processor at present, so you need the additional conversion processors. There is a Jira case to add the ability to do arbitrary format/type conversions, to avoid the need for multiple conversion processors in a chain (as you have above).
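For the first flow, here is a sketch of what the ReplaceText output (i.e., the flow file content sent to PutHiveQL) might look like; the table name, column names/types, and HDFS location are all assumptions for illustration:

-- Put a table over CSV files that PutHDFS wrote to /tmp/csv (hypothetical path)
CREATE EXTERNAL TABLE IF NOT EXISTS my_csv_table (col1 STRING, col2 STRING, col3 INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/tmp/csv'

-- Or, to load a single file into an existing table (one statement per flow file)
LOAD DATA INPATH '/tmp/csv/my_file.csv' INTO TABLE my_csv_table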
01-26-2017
04:32 PM
You may want to use ListFile -> FetchFile rather than GetFile. ListFile will keep track of the files it has found and will not list them again unless they have been updated (and still satisfy the other filters you specify in the properties). Can you describe your use case a bit more? Is it the case that many files may be placed in the directory "at once" but you only want the latest one? Also do the files need to remain in that directory? If so, I think ListFile -> FetchFile is your best bet, but if not, you can set GetFile to remove the file on read. Then only "new" files will be found by GetFile (because any files processed would be removed).
01-26-2017
02:07 PM
2 Kudos
The errors look similar to the ones in some other HCC posts: https://community.hortonworks.com/questions/50301/call-for-help-fail-to-run-puthdfshbase-1-1-2-clien.html https://community.hortonworks.com/questions/66756/spark-hbase-connection-issue.html Do the suggestions there help at all? If there is an issue with adding JARs to the classpath, you can do this via the "Database driver location(s)" property. If there is an issue with including Hadoop configuration files, you can try adding them to the Database driver location(s) property as well, although I don't know if that will work.
01-25-2017
08:42 PM
Correct, SelectHiveQL is for statements that return ResultSets (like SELECT *); those results are converted to Avro records. PutHiveQL is for executing statements (except callable statements like stored procedures) that do not return results, such as your ALTER TABLE example.
01-18-2017
08:02 PM
If there is a fixed mapping of names, you can use JoltTransformJSON to rename the fields. For more complex things (dynamic attribute creation), you can use ExecuteScript.
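For the fixed-mapping case, a JoltTransformJSON "shift" spec along these lines would do it; the field names oldName and newName are placeholders for illustration:

[
  {
    "operation": "shift",
    "spec": {
      "oldName": "newName",
      "*": "&"
    }
  }
]

The "oldName": "newName" entry moves that field's value to the new key, and "*": "&" passes every other field through unchanged.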