Member since: 11-16-2015
Posts: 911
Kudos Received: 668
Solutions: 249
My Accepted Solutions
| Views | Posted |
|---|---|
| 701 | 09-30-2025 05:23 AM |
| 1076 | 06-26-2025 01:21 PM |
| 931 | 06-19-2025 02:48 PM |
| 1101 | 05-30-2025 01:53 PM |
| 12281 | 02-22-2024 12:38 PM |
02-28-2018
11:58 PM
Sorry about that, I had a copy-paste error: each of those "*...." entries should be followed by ".&", so "*.*.*.*": "&(0,1)_&(0,2)_&(0,3)_&(0,4).&"
02-28-2018
09:58 PM
1 Kudo
If you knew the schema of the incoming content, I believe you could use schema aliases in conjunction with ConvertRecord, but since you mention the field names are unknown, I'm guessing you won't know the schema(s) either 🙂 You can do this with the JoltTransformJSON processor, although I don't think it supports an arbitrary number of periods, as its wildcard matching is as non-greedy as possible. Here is a spec that works for 1-3 periods:

```json
[
  {
    "operation": "shift",
    "spec": {
      "*.*.*.*": "&(0,1)_&(0,2)_&(0,3)_&(0,4)",
      "*.*.*": "&(0,1)_&(0,2)_&(0,3)",
      "*.*": "&(0,1)_&(0,2)",
      "*": "&"
    }
  }
]
```

Note that you could continue this pattern for any discrete number of periods. Also note that the above spec works for "flat" JSON files. For nested fields you'd have to go "one level deeper" and apply the same pattern; here's a spec that works for 1-3 periods, 1-2 fields deep:

```json
[
  {
    "operation": "shift",
    "spec": {
      "*.*.*.*": "&(0,1)_&(0,2)_&(0,3)_&(0,4)",
      "*.*.*": "&(0,1)_&(0,2)_&(0,3)",
      "*.*": "&(0,1)_&(0,2)",
      "*": {
        "*.*.*.*": "&(0,1)_&(0,2)_&(0,3)_&(0,4)",
        "*.*.*": "&(0,1)_&(0,2)_&(0,3)",
        "*.*": "&1.&(0,1)_&(0,2)",
        "*": "&"
      }
    }
  }
]
```

If the incoming JSON is truly "wild west", you could use Groovy in an ExecuteScript processor along with a JsonSlurper (and JsonOutput) to change the keys at arbitrary depths with arbitrary numbers of periods.
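For that last approach, here is a minimal Groovy sketch for ExecuteScript (not from the original thread, so treat it as an untested starting point); it walks the parsed JSON recursively and replaces every period in a map key with an underscore, at any depth and for any number of periods:

```groovy
import groovy.json.JsonSlurper
import groovy.json.JsonOutput
import org.apache.nifi.processor.io.StreamCallback

// Recursively rebuild maps/lists, replacing '.' with '_' in every map key
def renameKeys
renameKeys = { node ->
    if (node instanceof Map) {
        node.collectEntries { k, v -> [(k.toString().replace('.', '_')): renameKeys(v)] }
    } else if (node instanceof List) {
        node.collect { renameKeys(it) }
    } else {
        node
    }
}

def flowFile = session.get()
if (!flowFile) return

flowFile = session.write(flowFile, { inputStream, outputStream ->
    def json = new JsonSlurper().parse(inputStream)
    outputStream.write(JsonOutput.toJson(renameKeys(json)).getBytes('UTF-8'))
} as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)
```

The underscore replacement mirrors the Jolt specs above; swap in whatever key-fixing rule you actually need.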
02-28-2018
04:16 PM
2 Kudos
You can install HDF on an HDP cluster, but be mindful of which services are running on which nodes so you don't run into performance issues. To install HDF on a new HDP cluster, you can use these instructions; for an existing HDP cluster, use this.
02-28-2018
12:50 PM
1 Kudo
The scripting processors (ExecuteScript, e.g.) offer Jython, not Python, as a scripting engine. Jython can't use compiled (CPython) modules, or modules whose dependencies include compiled modules, and I suspect cx_Oracle or one of the other modules is (or depends on) a compiled module. Since your script uses "print" rather than the NiFi API (see my ExecuteScript Cookbook for examples of the latter), you could use ExecuteProcess or ExecuteStreamCommand to run your script with your native Python interpreter from the command line; the output will become the content of the flow file, which should work for your use case.
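For example (just a sketch; the interpreter and script paths are placeholders, not from your environment), ExecuteStreamCommand could be configured along these lines, with the script's standard output becoming the content of the outgoing flow file:

```
Command Path      = /usr/bin/python          # your native CPython, where cx_Oracle is installed
Command Arguments = /path/to/your_script.py
```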
02-27-2018
12:34 AM
1 Kudo
Do you know the path of the XML doc? I'm still looking at memory vs. temp disk storage; if you can fill in this blank, I hope to have an answer for you (in Groovy probably lol) tomorrow 🙂
02-26-2018
10:32 PM
What kinds of operations are you trying to perform on the files in the ZIP?
02-26-2018
07:09 PM
If you're updating the record structure, you'll need to update the schema as well. It sounds like the data coming out of ExecuteSQL may have different schemas based on which table is specified by the user. If you know all the possible schemas, you could enumerate them in a Schema Registry, where the key is some attribute (such as the table name, let's say it's called table.name); in the reader(s) you can then use ${table.name} as the Schema Name property and "Use Schema Name" as the access strategy. Then you could add "copies" of the input schemas, with the additional fields and any updated types, as output schemas. Those would be the ones used by the readers/writers after all your scripts have run. If the schemas truly are dynamic and you just need to add fields, then if the schema is in the avro.schema attribute, you could update it manually using a scripted processor such as ExecuteScript. For example, here is a Groovy script to be used in ExecuteScript for updating the avro.schema attribute by adding 2 extra entries to the "fields" array:

```groovy
import groovy.json.*

def flowFiles = session.get(1000)
if (!flowFiles) return

session.transfer(flowFiles.collect { flowFile ->
    // Read the current Avro schema (as JSON) from the avro.schema attribute
    def schema = flowFile.getAttribute('avro.schema')
    def json = new JsonSlurper().parseText(schema)
    // Append two optional string fields to the schema's "fields" array
    json.fields += ['name': 'processGroupName', 'type': ['null', 'string']]
    json.fields += ['name': 'processGroupId', 'type': ['null', 'string']]
    // putAttribute returns the updated flow file, which becomes the collect result
    session.putAttribute(flowFile, 'avro.schema', JsonOutput.toJson(json))
}, REL_SUCCESS)
```

The script grabs up to 1000 flow files; for each one, it reads the schema from the avro.schema attribute, parses the JSON, adds processGroupName and processGroupId (both optional strings) to the "fields" array, then converts the JSON back to a string and updates the avro.schema attribute. Once this is done for all flow files, they are transferred to success.
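To make the registry-based option a bit more concrete, here is a sketch of the reader side (the controller service name is just a placeholder; table.name is the attribute from the example above):

```
AvroSchemaRegistry           one schema per possible table, named after the table
Record Reader (e.g. AvroReader):
    Schema Access Strategy = Use 'Schema Name' Property
    Schema Registry        = AvroSchemaRegistry
    Schema Name            = ${table.name}
```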
02-26-2018
04:40 PM
1 Kudo
For hash algorithms that don't require secret information, I think it could be helpful to add such functions to NiFi Expression Language, although they don't exist right now (please feel free to file a Jira to add such a capability). With that capability you could use UpdateRecord to replace a field with its hashed value. In the meantime, if you are comfortable with a scripting language such as Groovy, Javascript, Jython, JRuby, Clojure, or Lua, you could write a ScriptedLookupService that "looks up" a specified field by returning the hash of its value, then use LookupRecord with that service to get the hashed values. Alternatively, you could implement a ScriptedRecordSetWriter that is configured to hash the values of the specified field(s), and then use ConvertRecord with that writer.
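Not from the original answer, just a rough, untested Groovy sketch of the ScriptedLookupService idea; it assumes LookupRecord is configured with a single user-defined coordinate named key (pointing at the field to hash), and that the script assigns its implementation to a variable named lookupService:

```groovy
import java.security.MessageDigest
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.lookup.LookupFailureException
import org.apache.nifi.lookup.StringLookupService

class HashLookupService extends AbstractControllerService implements StringLookupService {

    // Expects a single coordinate named "key" (the user-defined property in LookupRecord)
    Optional<String> lookup(Map coordinates) throws LookupFailureException {
        def value = coordinates?.get('key')
        if (value == null) return Optional.empty()
        // Return the SHA-256 digest of the field value, hex-encoded
        def digest = MessageDigest.getInstance('SHA-256').digest(value.toString().getBytes('UTF-8'))
        Optional.of(digest.encodeHex().toString())
    }

    Set<String> getRequiredKeys() {
        ['key'] as Set
    }
}

// ScriptedLookupService expects the script to assign an instance to this variable (assumption)
lookupService = new HashLookupService()
```

You would then use LookupRecord's Result RecordPath to decide whether the hash replaces the original field or lands in a new one.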
02-22-2018
05:16 PM
If that is your output schema, then the Result RecordPath in LookupRecord should be /COMPANY not /Company.
02-14-2018
01:24 AM
The post uses UpdateAttribute to set the CONNECTION_UUID attribute; if you want to monitor more, you can change the upstream flow to get the connection UUID for all connections. For the root process group, you should be able to do this with InvokeHTTP against the /process-groups/root/connections endpoint, then parse the JSON, probably split it with SplitJson, and get the URL into an attribute for each flow file with EvaluateJsonPath. Then you can use UpdateAttribute (or probably just EvaluateJsonPath) to set the UUID from the incoming JSON document, and let the rest of the flow "do its thing". You can route/merge later on the UUID if necessary.
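As a rough sketch of that upstream piece (the property names come from the standard processors, but the host/port and attribute wiring here are assumptions):

```
InvokeHTTP         HTTP Method = GET
                   Remote URL  = http://<nifi-host>:<port>/nifi-api/process-groups/root/connections
SplitJson          JsonPath Expression = $.connections
EvaluateJsonPath   Destination = flowfile-attribute
                   CONNECTION_UUID (dynamic property) = $.id
```

Each flow file coming out of EvaluateJsonPath then carries one connection's UUID in CONNECTION_UUID, and the rest of the monitoring flow can run per connection.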