Member since: 11-16-2015
Posts: 892
Kudos Received: 649
Solutions: 245
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 5397 | 02-22-2024 12:38 PM |
| | 1348 | 02-02-2023 07:07 AM |
| | 3020 | 12-07-2021 09:19 AM |
| | 4165 | 03-20-2020 12:34 PM |
| | 13986 | 01-27-2020 07:57 AM |
04-08-2019
06:06 PM
As of NiFi 1.9.0 (HDF 3.4), the XMLReader can be configured to infer the schema. If you can't upgrade, you could download NiFi 1.9.0 and run it once to infer the schema and write it to an attribute, then inspect the flow file and copy off the schema for use in your operational NiFi instance. There may also be libraries and/or websites that will infer the Avro schema from the XML file for you.
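To illustrate what you'd end up with (the XML and the resulting schema below are made up for illustration; the exact names and types depend on what the inference sees), an input like:

<record>
  <name>Alice</name>
  <age>34</age>
</record>

should produce an inferred Avro schema along the lines of:

{
  "type": "record",
  "name": "nifiRecord",
  "namespace": "org.apache.nifi",
  "fields": [
    { "name": "name", "type": [ "null", "string" ] },
    { "name": "age", "type": [ "null", "int" ] }
  ]
}

With the reader's Schema Access Strategy set to Infer Schema and the writer's Schema Write Strategy set to "Set 'avro.schema' Attribute", the inferred schema should show up in the avro.schema attribute of the output flow file, where you can copy it off.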
04-08-2019
01:50 PM
Since you want to change the array of values into key/value pairs, you'll need to put them in an object inside the variables array, so I'm guessing you want a single-element array "variables" containing an object that has the key/value pairs. If that's correct, you can use JoltTransformJSON with the following spec; it adds keys for each value in the array based on its order:

[
{
"operation": "shift",
"spec": {
"variables": {
"0": "variables[0].username",
"1": "variables[0].active",
"2": "variables[0].temperature",
"3": "variables[0].age"
},
"*": "&"
}
}
]

This gave me the following output:

{
"id" : 123456,
"ip" : "*",
"t" : -12.9,
"T" : -23.8,
"variables" : [ {
"username" : "user1",
"active" : 0,
"temperature" : 12.97,
"age" : 23
} ]
}
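For reference, the input this spec expects looks something like the following (reconstructed from the spec and the output above, so treat it as illustrative):

{
  "id" : 123456,
  "ip" : "*",
  "t" : -12.9,
  "T" : -23.8,
  "variables" : [ "user1", 0, 12.97, 23 ]
}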
04-03-2019
06:27 PM
1 Kudo
I believe the flow file is entering the processor and just taking a very long time to process. In the meantime it will show up in the connection on the UI (although if you try to remove it while it's being processed, you will get a message that zero flow files were removed). The indicator that a flow file is being processed is the grid of light/dark dots on the right side of the processor; while that is shown, the processor is executing, presumably on one or more flow files from the incoming queue.

For your script, I think the reason for the long processing time (which I would expect to be followed by errors on the processor and in the log) is that you're reading the entire file into a String and then calling PDDocument.load() on that String, when there is no method for that (you need a byte[] or an InputStream). The very unfortunate part here is that Groovy will try to print out the value of your String, and for some unknown reason calling toString() on a PDDocument gives the entire content, which for large PDFs you can imagine is quite cumbersome. Luckily you can skip the representation as a String altogether, since the ProcessSession API gives you an InputStream and/or OutputStream, which you can use with the load() and save() methods on a PDDocument.

I took the liberty of refactoring your script above; mine's not super sophisticated (especially in terms of error handling), but it should give you the gist of the approach:

// NiFi API classes used by the callbacks below
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback
import org.apache.pdfbox.pdmodel.PDDocument
import org.apache.pdfbox.multipdf.Splitter
flowFile = session.get()
if(!flowFile) return
def flowFiles = [] as List<FlowFile>
try {
def document
session.read(flowFile, {inputStream ->
document = PDDocument.load(inputStream)
} as InputStreamCallback)
def splitter = new Splitter()
splitter.setSplitAtPage(2)
try {
def forms = splitter.split(document)
forms.each { form ->
def newFlowFile = session.write(session.create(flowFile), {outputStream ->
form.save(outputStream)
} as OutputStreamCallback)
flowFiles << newFlowFile
form.close()
}
} catch(e) {
log.error('Error writing splits', e)
throw e
} finally {
document?.close()
}
session.transfer(flowFiles, REL_SUCCESS)
} catch(Exception e) {
log.error('Error processing incoming PDF', e)
session.remove(flowFiles)
}
session.remove(flowFile)
04-02-2019
01:29 PM
You have specified the SQL Statement property but haven't supplied any values. I recommend replacing PutSQL with PutDatabaseRecord with a Statement Type of INSERT; this should do what you are trying to do.
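As a rough sketch (the reader, table name, and record below are placeholders), PutDatabaseRecord would be configured along these lines:

Record Reader: JsonTreeReader (or whichever reader matches your data)
Statement Type: INSERT
Database Connection Pooling Service: your DBCPConnectionPool
Table Name: my_table

For a record like {"id": 1, "name": "test"}, the processor builds the parameterized INSERT (INSERT INTO my_table (id, name) VALUES (?, ?)) and binds the values from the record itself, so you don't have to construct the statement or sql.args.N.* attributes yourself.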
04-02-2019
01:27 PM
1 Kudo
You can use MergeContent or MergeRecord for this; either can take flow files that each contain a single record and combine them into one flow file containing many Avro records. Then you can use ConvertAvroToParquet or PutParquet.
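As a rough starting point for MergeRecord (the numbers below are just examples to tune for your data volume and latency requirements):

Record Reader: AvroReader
Record Writer: AvroRecordSetWriter
Merge Strategy: Bin-Packing Algorithm
Minimum Number of Records: 1000
Maximum Number of Records: 10000
Max Bin Age: 5 min

The merged Avro flow files can then go straight into ConvertAvroToParquet or PutParquet.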
03-29-2019
07:20 PM
That's a great idea, thanks! I've been meaning to update it, hopefully sooner than later 🙂
03-26-2019
01:58 PM
Once a flow file has been created in a session, it must be removed or transferred before the session is committed (which happens at the end of ExecuteScript). Since your try block is outside the loop that creates new flow files, you'll want to remove all of the created ones, namely those in the flowFiles list. You can do that simply with session.remove(flowFiles) rather than the loop you have in your catch block.
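A minimal sketch of that pattern (the variable names here are illustrative):

def flowFiles = []
try {
    // ... loop that creates child flow files and adds them to flowFiles ...
} catch(Exception e) {
    log.error('Error creating splits', e)
    // one call removes every flow file created so far in this session
    session.remove(flowFiles)
}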
03-21-2019
12:55 PM
Instead of extracting the content into attributes (which will effectively turn the JSON objects into Strings), you should be able to add the core attributes using UpdateRecord or JoltTransformJSON/JoltTransformRecord. For the record processors, it looks like you could use the following schema for the reader:

{
"namespace": "nifi",
"name": "myRecord",
"type": "record",
"fields": [
{"name": "headers","type": {"type": "map","values": "string"}},
{"name": "info","type": {"type": "map","values": "string"}},
{"name": "payload","type": {"type": "map","values": "string"}}
]
}

For the subflow to add core attributes, you'd want the writer schema to include the additional fields such as kafka.partition; then in UpdateRecord, for example, you could add a user-defined property /kafka.partition with value 1 (and Replacement Value Strategy "Literal Value"). If in the core attribute subflow you want to remove the payload field, you can just remove it from the writer schema and it won't be included in the output. For example:

{
"namespace": "nifi",
"name": "myRecord",
"type": "record",
"fields": [
{"name": "headers","type": {"type": "map","values": "string"}},
{"name": "info","type": {"type": "map","values": "string"}},
{"name": "uuid","type": "string"},
{"name": "kafka.topic","type": "string"},
{"name": "kafka.partition","type": "string"},
{"name": "kafka.offset","type": "long"}
]
}

If I understand correctly, your headers+info+payload content would remain the same as the original. If instead you are trying to collapse all the key/value pairs from headers, info, etc. and possibly add core attributes, then JoltTransformJSON is probably the best choice. If that's what you meant, please let me know and I'll help with the Jolt spec to do the transformation.
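To illustrate (the values below are made up), a record written with that second schema after UpdateRecord might look like:

{
  "headers" : { "header1" : "value1" },
  "info" : { "source" : "sensorA" },
  "uuid" : "0f8fad5b-d9cb-469f-a165-70867728950e",
  "kafka.topic" : "my-topic",
  "kafka.partition" : "1",
  "kafka.offset" : 42
}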
02-25-2019
04:57 PM
It sounds like you're using ExecuteSQL as a source processor, meaning there are no incoming connections. In that case, when an error occurs there is no flow file to route to failure. Instead, use GenerateFlowFile upstream from ExecuteSQL. You can schedule GenerateFlowFile to run every 10 minutes and set the content of the flow file to the SQL statement you want to execute. Then in ExecuteSQL you can schedule for 0 sec (as fast as possible, which is fine since the upstream processor only runs every 10 mins) and remove the SQL Select Query property. At that point ExecuteSQL will expect the flow file content to contain SQL (which it will) and will execute that. If an error occurs, the incoming flow file should be routed to failure, and you can use a PutEmail processor or something else downstream for notification.
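As a sketch (the query, schedules, and service name are placeholders):

GenerateFlowFile
  Run Schedule: 10 min
  Custom Text: SELECT * FROM my_table

ExecuteSQL
  Run Schedule: 0 sec
  SQL Select Query: (removed/empty, so the flow file content is executed)
  Database Connection Pooling Service: MyDBCPConnectionPool

failure -> PutEmail (or whatever notification mechanism you prefer)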
02-20-2019
10:44 PM
1 Kudo
I left an answer to your SO post, basically there is a bug or two preventing BLOB transfer from source to target DBs, but I presented a possible workaround.