Member since: 09-29-2015
Posts: 871
Kudos Received: 723
Solutions: 255
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 4275 | 12-03-2018 02:26 PM |
| | 3224 | 10-16-2018 01:37 PM |
| | 4336 | 10-03-2018 06:34 PM |
| | 3193 | 09-05-2018 07:44 PM |
| | 2442 | 09-05-2018 07:31 PM |
06-09-2017 03:08 PM
I think the problem is that the HTTP client used by NiFi expects the response to be GZIP'd, but the service isn't returning a GZIP'd response. Can you edit nifi_home/conf/logback.xml and turn on debug logging for InvokeHTTP by adding:

<logger name="org.apache.nifi.processors.standard.InvokeHTTP" level="DEBUG"/>

Then wait 30 seconds, run some data through the processor, and look in nifi-app.log for a line like:

Response from remote service

After that line there should be a list of all the headers from the response; if you could paste them here, that would be helpful for debugging this.
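If you want to check what the service actually returns independent of NiFi, here's a minimal sketch using OkHttp (the same client library InvokeHTTP uses under the hood); the URL is a placeholder for your endpoint:

```java
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

public class CheckResponseHeaders {
    public static void main(String[] args) throws Exception {
        OkHttpClient client = new OkHttpClient();
        // Explicitly ask for gzip, the way the failing request appears to
        Request request = new Request.Builder()
                .url("http://your-service:8080/endpoint") // placeholder URL
                .header("Accept-Encoding", "gzip")
                .build();
        try (Response response = client.newCall(request).execute()) {
            // Prints one header per line; Content-Encoding is the one to check
            System.out.println(response.headers());
        }
    }
}
```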
06-09-2017 02:44 PM
The difference between event-driven and timer-driven is the following: with timer-driven, the framework is always checking whether there is work to do (i.e., data available in a queue) and triggering the processor when there is. With event-driven, the framework would take a flow file and pass it directly from the previous processor to the next one. So in both cases onTrigger is only called when a flow file is available, but event-driven would be more efficient for the framework.
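As a side note, even with timer-driven scheduling, processors with incoming connections typically guard against being triggered with nothing to do; a sketch of the standard pattern at the top of onTrigger:

```java
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;

// Inside a processor extending AbstractProcessor:
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
    final FlowFile flowFile = session.get();
    if (flowFile == null) {
        // Nothing in the incoming queue; yield back to the framework
        return;
    }
    // ... process the flow file ...
}
```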
06-09-2017 02:38 PM
If you want to get the HBaseClientService outside of onTrigger you can do the following:

private volatile HBaseClientService service;

@OnScheduled
public void onScheduled(final ProcessContext context) {
    this.service = context.getProperty(HBASE_CLIENT_SERVICE)
        .asControllerService(HBaseClientService.class);
}

A method annotated with @OnScheduled will be called once when the processor is started, before any calls to onTrigger. Also, keep in mind that getting the HBaseClientService every time in onTrigger is totally fine because it's not making a new connection every time. The connection is made when the HBaseClientService is first started, and each call in onTrigger just gives you access to that existing connection.
06-09-2017 01:35 PM
1 Kudo
It's trying to read the response from the POST in order to write it to the content of the flow file, but it's getting an End-Of-File exception. The response is expected to be gzip-compressed, which requires a fixed 10-byte header, and the exception is thrown because those 10 bytes don't exist.
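To see why a missing body produces exactly this error, here's a minimal sketch in plain Java, outside NiFi: GZIPInputStream reads the fixed 10-byte gzip header in its constructor, so an empty response body fails immediately.

```java
import java.io.ByteArrayInputStream;
import java.util.zip.GZIPInputStream;

public class GzipEofDemo {
    public static void main(String[] args) throws Exception {
        // The gzip format starts with a fixed 10-byte header (magic bytes 0x1f 0x8b, etc.).
        // Constructing a GZIPInputStream over an empty stream throws java.io.EOFException
        // because those 10 header bytes are never there to read.
        new GZIPInputStream(new ByteArrayInputStream(new byte[0]));
    }
}
```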
06-09-2017 01:24 PM
1 Kudo
I'm not sure I understand the question... There is a controller service called HBaseClientService which would be the connection to HBase, and then your processor can declare a property like:

static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
    .name("HBase Client Service")
    .description("Specifies the Controller Service to use for accessing HBase.")
    .required(true)
    .identifiesControllerService(HBaseClientService.class)
    .build();

Then in your onTrigger method you get the service by doing:

HBaseClientService clientService = context.getProperty(HBASE_CLIENT_SERVICE)
    .asControllerService(HBaseClientService.class);

You can look at how the existing HBase processors work: https://github.com/apache/nifi/tree/master/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors
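One detail the snippet above assumes: the property also has to be exposed by the processor. A minimal sketch of registering it via the standard AbstractProcessor override:

```java
import java.util.Collections;
import java.util.List;

import org.apache.nifi.components.PropertyDescriptor;

// Inside your processor class, which extends AbstractProcessor:
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    // Makes the HBase Client Service property visible in the processor's configuration
    return Collections.singletonList(HBASE_CLIENT_SERVICE);
}
```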
06-09-2017 01:21 PM
1 Kudo
I believe event-driven is still somewhat experimental... For processors that have an incoming queue, you generally schedule them as timer-driven with a run schedule of 0, which means run continuously; the framework then only runs them when the queue has data in it. So if your queue is always empty it won't be using any CPU cycles. Is there something about your processor that would cause the above approach to not work?
06-08-2017 05:38 PM
1 Kudo
In Apache NiFi 1.2, there is PublishKafkaRecord_0_10, which uses a RecordReader service for reading incoming flow files as records and a RecordWriter service for serializing records to the bytes that will be published to a Kafka topic. Each RecordReader and RecordWriter can be configured with a strategy for accessing a schema; one strategy is to get a schema by name from a schema registry. If using that strategy, you configure a schema registry service. Apache NiFi provides AvroSchemaRegistry as a local schema registry that lives within NiFi, and a HortonworksSchemaRegistry service for use with an external Hortonworks schema registry. In addition, anyone can implement their own schema registry service to talk to any other schema registry or backing storage mechanism.
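For example, with the AvroSchemaRegistry you'd add a property to the registry service whose name is the schema name and whose value is the Avro schema text, then set the reader/writer's schema access strategy to look it up by that name. A hypothetical schema for illustration:

```
{ "name": "user",
  "namespace": "nifi",
  "type": "record",
  "fields": [
    { "name": "id", "type": "long" },
    { "name": "email", "type": "string" }
  ]
}
```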
06-07-2017 02:10 PM
5 Kudos
I agree with what Matt said above, and I had been working on a template to achieve this before I saw his answer, so I figured I would post it anyway... I made up the following two schemas:

{"name": "shortSchema",
 "namespace": "nifi",
 "type": "record",
 "fields": [
   { "name": "a", "type": "string" },
   { "name": "b", "type": "string" }
]}

{"name": "fullSchema",
 "namespace": "nifi",
 "type": "record",
 "fields": [
   { "name": "c", "type": ["null", "string"], "default" : "default value for field c" },
   { "name": "d", "type": ["null", "string"], "default" : "default value for field d" },
   { "name": "a", "type": "string" },
   { "name": "b", "type": "string" }
]}

Then I made a flow that does the following:

1. Generates a CSV with two rows and the columns a,b
2. Reads the CSV using the shortSchema and writes it as Avro with the shortSchema
3. Updates the schema.name attribute to fullSchema
4. Reads the Avro using the embedded schema (shortSchema) and writes it using the schema from schema.name (fullSchema)
5. Reads the Avro using the embedded schema (fullSchema) and writes it back to CSV

At the end, the printed CSV has the new fields with default values filled in. The CSV part is just for easily seeing what is going on and is obviously not required if you already have Avro data. Here is a template of the flow: convert-avro-short-to-full.xml
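To make the before/after concrete, here's what that looks like with hypothetical row values (assuming the output column order follows fullSchema's field order):

```
Input CSV (shortSchema):
a,b
1,2
3,4

Output CSV (fullSchema, defaults filled in):
c,d,a,b
default value for field c,default value for field d,1,2
default value for field c,default value for field d,3,4
```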
06-06-2017 01:53 PM
1 Kudo
The session provides methods to read and write the flow file content:

- If you are reading only, session.read with an InputStreamCallback gives you an InputStream to the flow file content.
- If you are writing only, session.write with an OutputStreamCallback gives you an OutputStream to the flow file content.
- If you are reading and writing at the same time, a StreamCallback gives you access to both an InputStream and an OutputStream.

In your case, if you are just looking to extract a value, then you likely need an InputStreamCallback, and you would use the InputStream to read the content and parse it appropriately for your data (see the sketch below). You can look at examples in the existing processors: https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractText.java#L313-L318 Keep in mind, that example reads the whole content of the flow file into memory, which can be dangerous with very large flow files, so whenever possible it is best to process the content in chunks.
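Here's a minimal sketch of the reading-only case, assuming the value you want is on the first line of the content (the parsing is hypothetical; adapt it to your format):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.nifi.processor.io.InputStreamCallback;

// Inside onTrigger, after session.get() returned a non-null flowFile:
final AtomicReference<String> value = new AtomicReference<>();
session.read(flowFile, new InputStreamCallback() {
    @Override
    public void process(final InputStream in) throws IOException {
        // Stream the content rather than buffering it all into memory
        final BufferedReader reader = new BufferedReader(
                new InputStreamReader(in, StandardCharsets.UTF_8));
        value.set(reader.readLine()); // hypothetical: value is on the first line
    }
});
// value.get() now holds the extracted value, e.g. for setting an attribute
```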
06-01-2017 03:15 PM
1 Kudo
You could add something specific to the host machine into the filename through expression language. One option would be to update the filename to ${hostname(false)}${filename}. Or you could define a variable in the bootstrap.conf of each node, like java.arg.16=-Dsystem.id=<SOME_NUMBER>, and then set the filename to ${system.id}-${filename}. Also, appending to zip files will not work even if everything were running on a single NiFi node. An HDFS append just writes raw bytes to the end of a file, so you'd end up with a single file that actually contains the bytes of multiple zip files and can't be processed by any tools that expect to read zip files.