Member since
09-29-2015
871
Posts
723
Kudos Received
255
Solutions
05-29-2018
07:49 PM
1 Kudo
One way would be to define the schema ahead of time in one of the schema registries, set your CSVReader's Schema Access Strategy to "Schema Name" so that it uses the schema from the registry, and then tell it to ignore the first line of the CSV. The downside is that you have to define the schema rather than just using the column headers. Besides that, the next easiest option would probably be to use ExecuteScript to write a simple script that reads the first line, converts the spaces in the column names to underscores, and then writes the converted line back out along with all the other unmodified lines. There might be a way to do it with ReplaceText, but I'm not exactly sure how to modify only the first line.
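As a rough sketch of the transformation such a script would perform, here is the header rewrite on its own in plain Python. The function name `underscore_header` is made up for illustration, and the ExecuteScript wiring (reading the flow file content from the session and writing it back) is deliberately left out, so this only shows the text manipulation, not a complete NiFi script.

```python
def underscore_header(csv_text):
    """Replace spaces with underscores in the first line only.

    Hypothetical helper: every line after the header is passed
    through unchanged.
    """
    # partition() splits at the first newline only, so the data rows
    # in `rest` are never touched.
    header, newline, rest = csv_text.partition("\n")
    return header.replace(" ", "_") + newline + rest


if __name__ == "__main__":
    sample = "first name,last name\nJohn Paul,Van Dyke\n"
    print(underscore_header(sample))
```

Note that this replaces every space in the header line, including any that follow a comma, so headers like `a, b` would need trimming first.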
05-23-2018
06:11 PM
The variable registry is not the same thing as flow file attributes. What you are setting here are flow file attributes with the topic names, which will now be passed along with that specific flow file and are accessible through expression language only when that particular flow file is involved. There is no programmatic access to the variable registry from a processor or ExecuteScript; it would have to be done via NiFi's REST API.
05-23-2018
02:47 PM
What format is the data being picked up by GetFile? I'm assuming new-line delimited text? There are several options that could be used between GetFile and the RPG to split it up, but you would need to use an approach that splits correctly on the logical boundaries of the data in the file. Ideally whatever is producing the file before NiFi could produce multiple smaller files.
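If the data does turn out to be new-line delimited text, splitting on logical boundaries just means never cutting in the middle of a line. The sketch below illustrates that idea in plain Python; `split_lines` and its `lines_per_chunk` parameter are hypothetical names, and inside NiFi this is roughly the job a line-based splitter such as SplitText (with its Line Split Count property) would do.

```python
def split_lines(text, lines_per_chunk):
    """Split text into chunks of at most `lines_per_chunk` whole lines.

    Hypothetical helper: chunks always end on a line boundary, so no
    record is cut in half.
    """
    if lines_per_chunk < 1:
        raise ValueError("lines_per_chunk must be at least 1")
    # keepends=True preserves the original line terminators, so joining
    # the chunks back together reproduces the input exactly.
    lines = text.splitlines(keepends=True)
    return [
        "".join(lines[i:i + lines_per_chunk])
        for i in range(0, len(lines), lines_per_chunk)
    ]


if __name__ == "__main__":
    for chunk in split_lines("a\nb\nc\n", 2):
        print(repr(chunk))
```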
05-23-2018
02:20 PM
Increasing the back-pressure threshold will allow you to send more data to the input port, but it won't help make PublishKafka work. Where does the 1.2GB file come from and how does it get sent to the input port? I'm wondering if you can create multiple smaller files, instead of one huge file, and see if that helps at all.
05-23-2018
01:26 PM
Thanks for the info... From looking at the picture, the reason the input port stops working is that back-pressure has engaged on the queue between the input port and PublishKafka, which you can tell from the red indicator on the queue. When back-pressure engages, the component before the queue is no longer scheduled to run until the queue has gone below the threshold. You can configure the threshold by right-clicking on the queue; I believe it defaults to 10,000 flow files or 1GB of total size, which has been exceeded by the 1.2GB flow file. As for PublishKafka, as far as I can tell the thread dump does not show a stuck thread; it shows that a thread for PublishKafka is reading the 1.2 GB file from NiFi's content repository. How long have you waited to see if it finishes? I would assume it could take a while to read and send 1.2 GB to Kafka.
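To make the threshold logic concrete, here is a small Python model of the check described above. It is only an illustration, not NiFi's actual code: the names are invented, the defaults are the 10,000 flow files and 1 GB mentioned above, and treating "reached the threshold" as `>=` is an assumption.

```python
# Assumed defaults, taken from the values mentioned in the post.
OBJECT_THRESHOLD = 10_000
SIZE_THRESHOLD_BYTES = 1 * 1024 ** 3  # 1 GB


def back_pressure_engaged(queued_count, queued_bytes,
                          object_threshold=OBJECT_THRESHOLD,
                          size_threshold=SIZE_THRESHOLD_BYTES):
    """Return True if either threshold has been reached.

    Hypothetical model: the upstream component stops being scheduled
    as soon as the count OR the total size hits its limit.
    """
    return queued_count >= object_threshold or queued_bytes >= size_threshold


if __name__ == "__main__":
    one_big_file = int(1.2 * 1024 ** 3)  # a single 1.2 GB flow file
    print(back_pressure_engaged(1, one_big_file))
```

This is why a single flow file can engage back-pressure: the count is only 1, but the size limit alone is enough.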
05-22-2018
01:24 PM
When the flow file is in the queue before PublishKafka, is there a number like "1" in the top right corner of PublishKafka? If so, this means there is a thread processing the flow file, and we need to see what it is doing, which can be done by taking a thread dump:
./bin/nifi.sh dump dump.txt
Please provide the contents of dump.txt.
05-17-2018
03:43 PM
1 Kudo
The Run Schedule is the schedule on which the NiFi framework will execute a processor. The default of timer driven, 0 seconds means to execute as fast as possible when there is data available in the incoming queue; if no data is there, then it doesn't execute. The rate of the data depends on what the processor does during one execution. For example, let's say a queue has 100 flow files in it and you set the processor to run every 5 minutes. Some processors may grab a batch of flow files during one execution, so even though the processor executes once, it may grab 50 of those flow files. It also depends on whether your flow files have multiple logical messages in the content. If you have 1 record per flow file, and if the processor only grabs 1 flow file at a time (most only take one at a time), then the run schedule does control the rate. You can look at the ControlRate processor as well.
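The arithmetic behind that example can be sketched in a few lines of Python. The function and parameter names are invented for illustration, and the result is an upper bound that assumes the queue always has enough data for every execution.

```python
def max_records_per_minute(run_every_seconds, flow_files_per_run,
                           records_per_flow_file):
    """Upper bound on records processed per minute.

    Hypothetical helper: assumes every execution finds a full batch
    waiting in the queue.
    """
    if run_every_seconds <= 0:
        # A 0 second schedule means "run as fast as possible",
        # so there is no fixed rate to compute.
        raise ValueError("run_every_seconds must be positive")
    return 60 * flow_files_per_run * records_per_flow_file / run_every_seconds


if __name__ == "__main__":
    # Every 5 minutes, a batch of 50 flow files, 1 record each.
    print(max_records_per_minute(300, 50, 1))
    # Every 5 minutes, 1 flow file per run, 1 record each.
    print(max_records_per_minute(300, 1, 1))
```

Only in the second case, one flow file with one record per execution, does the run schedule by itself fix the rate.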
04-25-2018
06:42 PM
2 Kudos
You can bulk manipulate records using UpdateRecord and Record Path:
https://nifi.apache.org/docs/nifi-docs/html/record-path-guide.html
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.6.0/org.apache.nifi.processors.standard.UpdateRecord/index.html

Changing the name of a field is a little tricky because you are reading based on a schema, so the field name is part of the schema. It can be done if you define both field names in the schema and have the second field default to null; then you can move the value from the first field to the second field.

Regarding splitting a record into binary and metadata... One thing to understand is that even though your Avro data file has a schema embedded in it, you don't have to use this schema for reading/writing in NiFi. Imagine this scenario...

Embedded schema:

    {
      "name": "foo",
      "namespace": "nifi",
      "type": "record",
      "fields": [
        { "name": "field1", "type": "string" },
        { "name": "field2", "type": "bytes" }
      ]
    }

You could define two other schemas and put them in NiFi's schema registry...

Metadata only:

    {
      "name": "foo",
      "namespace": "nifi",
      "type": "record",
      "fields": [
        { "name": "field1", "type": "string" }
      ]
    }

Binary only:

    {
      "name": "foo",
      "namespace": "nifi",
      "type": "record",
      "fields": [
        { "name": "field2", "type": "bytes" }
      ]
    }

Then you can have a ConvertRecord that has the AvroReader using the embedded schema and the AvroWriter using a schema from the schema registry, so on the way out it would rewrite the Avro with only the fields from the writer schema. If you sent your original data to two different ConvertRecord processors, one for metadata only and one for binary only, then you can essentially select out the parts for each one.

ConvertRecord is preferred over other convert processors. How the schema is accessed is based on the Schema Access Strategy in the services. If a record reader says "Schema By Name" then it requires a schema registry to look up by name. If the Avro reader has "Embedded Schema" selected then you shouldn't need to use a schema registry because it will use the schema embedded in the Avro file.
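The effect of reading with one schema and writing with a narrower one can be imitated in plain Python. This is only a sketch of the idea, not how ConvertRecord is implemented: the `project` helper is hypothetical, records are modeled as dicts, and only the `fields` part of each writer schema from the example above is used.

```python
import json

# Writer schemas from the example above.
METADATA_ONLY = json.loads("""
{ "name": "foo", "namespace": "nifi", "type": "record",
  "fields": [ { "name": "field1", "type": "string" } ] }
""")
BINARY_ONLY = json.loads("""
{ "name": "foo", "namespace": "nifi", "type": "record",
  "fields": [ { "name": "field2", "type": "bytes" } ] }
""")


def project(record, writer_schema):
    """Keep only the fields that the writer schema names.

    Hypothetical helper: a field missing from the record becomes None.
    """
    return {f["name"]: record.get(f["name"]) for f in writer_schema["fields"]}


if __name__ == "__main__":
    original = {"field1": "some metadata", "field2": b"\x00\x01\x02"}
    print(project(original, METADATA_ONLY))
    print(project(original, BINARY_ONLY))
```

Sending the same record through both projections mirrors routing the original flow file to two ConvertRecord processors, one per writer schema.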
04-24-2018
05:17 PM
The main reason that there is no "EvaluateRecordPath" is that it only really makes sense when there is one record per flow file, and a major goal of NiFi's record processing is to have flow files with many, many records in them. It would still be valuable to have a processor like this, but it wasn't a primary use case, so it hasn't been developed yet. The conversion that is happening with the JSON array is the correct behavior for how NiFi's record handling works: the Avro type of "bytes" is treated as an "array of byte", and since JSON only really has string and numeric types, all it can really do is write an array of ints (bytes). Unfortunately, I think the best route for you would be a custom processor that reads the content of a flow file using Avro's Java library, then gets the byte[] of the binary field and writes it to the flow file's output stream to replace the content, and adds the other fields as attributes. There are lots of examples in the NiFi code of how to read an Avro datafile; here is one from the ConvertAvroToJSON processor: https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java#L177-L216
04-09-2018
01:36 PM
1 Kudo
The process for creating a custom processor with the archetype has not changed. Run the command here, but change the versions to the appropriate version, like 1.5.0: https://cwiki.apache.org/confluence/display/NIFI/Maven+Projects+for+Extensions#MavenProjectsforExtensions-MavenProcessorArchetype What errors are you getting?