Member since: 11-16-2015
905 Posts
666 Kudos Received
249 Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 484 | 09-30-2025 05:23 AM |
| | 809 | 06-26-2025 01:21 PM |
| | 738 | 06-19-2025 02:48 PM |
| | 911 | 05-30-2025 01:53 PM |
| | 11628 | 02-22-2024 12:38 PM |
02-26-2018
07:09 PM
If you're updating the record structure, you'll need to update the schema as well. It sounds like the data coming out of ExecuteSQL may have different schemas based on which table is specified by the user. If you know all the possible schemas, you could enumerate them in a Schema Registry, where the key is some attribute (such as the table name; let's say it's called table.name), and in the reader(s) you can use ${table.name} as the Schema Name property and "Use Schema Name" as the access strategy. Then you could add "copies" of the input schemas, with the additional fields and any updated types, as output schemas; those would be the ones used by the readers/writers after all your scripts have run.

If the schemas truly are dynamic and you just need to add fields, and the schema is in the avro.schema attribute, you could update it manually using a scripted processor such as ExecuteScript. For example, here is a Groovy script to be used in ExecuteScript that updates the avro.schema attribute by adding two extra entries to the "fields" array:

```groovy
import groovy.json.*

// Grab up to 1000 flow files from the incoming queue
def flowFiles = session.get(1000)
if (!flowFiles) return

session.transfer(flowFiles.collect { flowFile ->
    // Parse the Avro schema (JSON) out of the avro.schema attribute
    def schema = flowFile.getAttribute('avro.schema')
    def json = new JsonSlurper().parseText(schema)
    // Add two optional string fields to the schema
    json.fields += ['name': 'processGroupName', 'type': ['null', 'string']]
    json.fields += ['name': 'processGroupId', 'type': ['null', 'string']]
    // Write the updated schema back; putAttribute returns the updated flow file
    session.putAttribute(flowFile, 'avro.schema', JsonOutput.toJson(json))
}, REL_SUCCESS)
```

The script grabs up to 1000 flow files; for each one, it reads the schema from the avro.schema attribute, parses the JSON, adds processGroupName and processGroupId (both optional strings) to the "fields" array, then converts the JSON back to a string and updates the avro.schema attribute. Once this is done for all flow files, they are transferred to success.
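For illustration, here's what a (hypothetical) avro.schema attribute value might look like before and after the script runs:

```json
{"type": "record", "name": "myTable", "fields": [
  {"name": "id", "type": "int"}
]}
```

becomes:

```json
{"type": "record", "name": "myTable", "fields": [
  {"name": "id", "type": "int"},
  {"name": "processGroupName", "type": ["null", "string"]},
  {"name": "processGroupId", "type": ["null", "string"]}
]}
```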
02-26-2018
04:40 PM
1 Kudo
For algorithms that don't require secret information, I think it could be helpful to add such hash functions to NiFi Expression Language, although they don't exist right now (please feel free to file a Jira to add that capability). With it, you could use UpdateRecord to replace a field with its hashed value. In the meantime, if you are comfortable with a scripting language such as Groovy, JavaScript, Jython, JRuby, Clojure, or Lua, you could write a ScriptedLookupService that "looks up" a specified field by returning the hash of its value; then you can use LookupRecord with that service to get the hashed values. Alternatively, you could implement a ScriptedRecordSetWriter that is configured to hash the values of the specified field(s), and then use ConvertRecord with that writer.
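As a rough, untested sketch of the ScriptedLookupService approach (assuming the StringLookupService interface available around NiFi 1.5; the coordinate key name "key" is just an illustration), a Groovy script could hash whatever value is passed in as the lookup coordinate:

```groovy
import java.security.MessageDigest
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.lookup.StringLookupService

// Hypothetical service: "looks up" a value by returning its SHA-256 hash
class HashLookupService extends AbstractControllerService implements StringLookupService {

    @Override
    Optional<String> lookup(Map<String, Object> coordinates) {
        def value = coordinates?.get('key')?.toString()
        if (!value) return Optional.empty()
        // Hex-encode the SHA-256 digest of the incoming field value
        def digest = MessageDigest.getInstance('SHA-256').digest(value.getBytes('UTF-8'))
        Optional.of(digest.encodeHex().toString())
    }

    @Override
    Set<String> getRequiredKeys() {
        ['key'] as Set
    }

    @Override
    Class<?> getValueType() { String }
}

// ScriptedLookupService picks up the service instance from this variable
lookupService = new HashLookupService()
```

In LookupRecord you'd then map the field you want hashed to the "key" coordinate (a user-defined property whose value is a RecordPath) and point the Result RecordPath back at that field to overwrite it.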
02-22-2018
05:16 PM
If that is your output schema, then the Result RecordPath in LookupRecord should be /COMPANY not /Company.
02-14-2018
01:24 AM
The post uses UpdateAttribute to set the CONNECTION_UUID attribute; if you want to monitor more connections, you can change the upstream flow to get the connection UUID for all of them. For the root process group, you should be able to do this with InvokeHttp against the /process-groups/root/connections endpoint, then parse the JSON, split it with SplitJson, and get the URL into an attribute for each flow file with EvaluateJsonPath. Then you can use UpdateAttribute (or probably just EvaluateJsonPath, as sketched below) to set the UUID from the incoming JSON document, and then let the rest of the flow "do its thing". You can route/merge later on the UUID if necessary.
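As a sketch (assuming each post-split flow file holds a single connection entity from the REST response, and that the second property name is illustrative), one EvaluateJsonPath with Destination set to flowfile-attribute could set both attributes in a single step:

```
CONNECTION_UUID => $.id
CONNECTION_URL  => $.uri
```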
02-12-2018
04:52 PM
1 Kudo
Perhaps we could add "CREATE TABLE" and other DDL commands to ConvertJsonToSQL? The hardest part of the exercise is inferring the types correctly; it's such a difficult problem that there's often a "why bother?" attitude, or the type inference is only "good enough" for 80% of the use cases, etc.
02-12-2018
04:42 PM
1 Kudo
NIFI-978 addresses this capability; I have a Pull Request up with the improvement, and perhaps it will make it into NiFi 1.6.0.
02-12-2018
04:14 PM
4 Kudos
It is possible with ExecuteScript and Groovy, as long as there is no SecurityManager set on the JVM that would prevent Groovy from getting at the ProcessorNode, which is a private member variable of the StandardProcessContext. The following script can be used in ExecuteScript to add the parent process group ID as an attribute to an incoming flow file:

```groovy
def flowFile = session.get()
if (!flowFile) return

// Reach into the (private) ProcessorNode to get the parent process group ID
def processGroupId = context.procNode?.processGroupIdentifier ?: 'unknown'

flowFile = session.putAttribute(flowFile, 'processGroupId', processGroupId)
session.transfer(flowFile, REL_SUCCESS)
```
02-08-2018
05:35 PM
3 Kudos
In the article you mention, the ID of the connection is specified as an attribute, and the InvokeHttp processor uses it to create a URL to poll for status. In your case, you'd just need the list of connection IDs, and you can create a flow file for each connection.

You can use the "/process-groups/root/connections" REST endpoint to get a list of connections at the root process group. If you want to get connections for child process groups, you'd first need to get the child process groups' IDs and use those in the connections endpoint to get the connections for each process group. That can recurse down through each child process group and may be unwieldy, so for this example I'm going to assume you simply want to monitor your queues in the root process group.

With InvokeHttp and the aforementioned REST endpoint, you'll get a JSON object back with a field called "connections", which is an array of connections. Use SplitJson with a JSONPath of "$.connections" to create a flow file for each of the connections, then EvaluateJsonPath to extract the connection's URL using a JSONPath of "$.uri". From there you can continue with the flow described in the other article, and for each of the flow files it will retrieve the status for that connection.
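For reference, here is an abridged (and hypothetical) response from the /process-groups/root/connections endpoint; SplitJson on "$.connections" yields one flow file per entry, and "$.uri" then resolves to that connection's URL:

```json
{
  "connections": [
    {
      "id": "015a1b2c-0123-1000-a1b2-c3d4e5f60789",
      "uri": "https://nifi-host:8443/nifi-api/connections/015a1b2c-0123-1000-a1b2-c3d4e5f60789"
    }
  ]
}
```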
02-07-2018
08:19 PM
1 Kudo
Avro schemas can be confusing the first couple of times you create them 🙂 In your case you could use the following:

```json
{
  "namespace": "nifi",
  "name": "cesarPipeDelimitedRecord",
  "type": "record",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "sequence", "type": "int"},
    {"name": "category", "type": "int"},
    {"name": "text", "type": "string"}
  ]
}
```

If you can have missing values, then you can replace the type with a union. For example, if "category" can be missing, its field entry becomes:

```json
{"name": "category", "type": ["null", "int"]}
```
02-07-2018
05:59 PM
2 Kudos
You don't have to extract the fields to attributes if you are converting the contents to a different format; instead you can use ConvertRecord with a CSVReader configured with a custom format (a pipe delimiter, for instance) and name your fields in the Avro schema. Then in ConvertRecord you can set a JsonRecordSetWriter to convert to JSON. This same approach works for any supported output format, or you can even write your own ScriptedRecordSetWriter if you need a custom format. If you do need to extract to attributes, you can use ExtractText, adding a user-defined property per field: the property name is the field name (such as "id" or "sequence"), and the value is a regular expression whose capture group matches that field, as sketched below.
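A minimal sketch, assuming the four pipe-delimited fields from the schema above (the regexes are illustrative, not the only way to write them). Each user-defined property on ExtractText is a regular expression, and the first capture group's value lands in an attribute named after the property:

```
id       => ^([^|]*)\|
sequence => ^(?:[^|]*\|){1}([^|]*)
category => ^(?:[^|]*\|){2}([^|]*)
text     => ^(?:[^|]*\|){3}(.*)$
```

For a flow file containing "a1|3|7|hello world", this would produce the attributes id=a1, sequence=3, category=7, and text=hello world.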