Member since: 04-05-2016
Posts: 130
Kudos Received: 93
Solutions: 29
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 3791 | 06-05-2018 01:00 AM
 | 5142 | 04-10-2018 08:23 AM
 | 5650 | 07-18-2017 02:16 AM
 | 2914 | 07-11-2017 01:02 PM
 | 3342 | 07-10-2017 02:10 AM
07-11-2017 07:08 PM
Wow, the solution to remove the extra lines was creating a giant bottleneck... Hmm
07-07-2017 08:43 AM
1 Kudo
Hello @M R I was able to reproduce this behavior. The cause is that the timestamp-millis logicalType is not applied as expected. When PutDatabaseRecord executes the SQL, 'UPDATED_DATE' is set as its raw Long representation, so Oracle complains about it: Oracle expects a Date type. Debugging further, I found that Avro doesn't read logicalType information if the type is defined as a String JSON node. Your schema text defines the field as follows:

{
  "name": "UPDATED_DATE",
  "type": "long",
  "logicalType": "timestamp-millis"
}

Written this way, 'logicalType' exists on the Field object, not on the 'type'. Since the 'type' element is textual, the Avro parser doesn't decorate it with the logical type. To correctly annotate the type with the logicalType, it has to be:

{
  "name": "UPDATED_DATE",
  "type": {
    "type": "long",
    "logicalType": "timestamp-millis"
  }
}

Now the 'type' element is a JSON object, the Avro parser uses the 'logicalType' definition, and it works as expected.
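If it helps, here is a quick structural check with jq; a sketch, assuming the full record schema is saved as schema.json (adjust the field name for your schema):

# If the field's "type" parses as a plain string, Avro ignores the sibling logicalType.
$ jq '.fields[] | select(.name == "UPDATED_DATE") | .type | type' schema.json
"string"   # printed for the broken form above; the fixed form prints "object"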
06-07-2017 04:03 PM
Thanks for the help. Your estimate for the Run Schedule was a bit high, though: even at 30 seconds it bottlenecked badly right before MergeContent. You were right about going lower - at 1 sec there is very little bottleneck and the error is gone.
06-08-2017 08:19 AM
Hello @rian amrodin
As @Wynner suggested, you should be able to route FlowFiles based on file size using the 'gt()' EL function. Surprisingly, however, ListFTP and ListSFTP don't write the 'file.size' attribute where they are supposed to. I submitted a JIRA, NIFI-4041, along with a fix; I hope it will be merged and available soon. Thanks for sharing your experience with Apache NiFi! https://issues.apache.org/jira/browse/NIFI-4041
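Once 'file.size' is populated, the routing itself is a one-liner in RouteOnAttribute. A minimal sketch, assuming a 1 MB threshold (the dynamic property name 'large.file' is just an example):

# RouteOnAttribute dynamic property; matching FlowFiles go to the 'large.file' relationship
large.file = ${file.size:gt(1048576)}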
05-29-2017 08:45 AM
@Pierre Leroy when you refer to downloading content from a URL, GET is generally the method a web service supports for retrieving content, so the POST request may simply be the wrong method here. "HTTP error 405 Method Not Allowed" means the URL exists on the web service, but the resource does not allow the HTTP method you used (authorization failures would normally return 401 or 403 instead). I would suggest getting more details about the URL - the exact URI, the expected HTTP method, whether it is SSL encrypted, and any authorization fields required to access it - and configuring the NiFi processor to match.
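If it helps, a quick way to probe the endpoint from a shell (https://example.com/service is a placeholder for your URL):

# Many servers list the supported methods in the Allow header of an OPTIONS (or 405) response.
$ curl -i -X OPTIONS https://example.com/service
# Then retry with the method the endpoint actually expects, e.g. GET:
$ curl -i https://example.com/service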
03-10-2017 12:47 AM
1 Kudo
How to connect GetKafka to Kafka through Stunnel

Stunnel is a proxy that can make an insecure network connection secure by wrapping it with SSL. This article contains examples and illustrations describing how it works and how to configure it. Most of it is derived from this informative Git comment; I wouldn't have been able to set it up without it. Thank you for sharing such a detailed example.
How does it work?

Let's see how it can be applied to NiFi GetKafka.
I used two servers for this experiment: 0.server.aws.mine and 1.server.aws.mine. A single Zookeeper and a single Kafka broker are running on 0.server.
A GetKafka NiFi processor on 1.server consumes messages through Stunnel:

- The Kafka Broker joins the Kafka cluster and declares its address as 127.0.0.1:9092.
- GetKafka's Zookeeper Connection String is set to 127.0.0.1:2181, which the local Stunnel is listening on. The local Stunnel on 1.server proxies the request to 0.server:2181 over SSL. At 0.server, the request is proxied again by the Stunnel running there, and finally arrives at Zookeeper.
- Since the Kafka Broker running on 0.server declares its address as 127.0.0.1:9092, GetKafka (the Kafka client) sends requests to 127.0.0.1:9092, and those requests are eventually transferred to the Broker through the Stunnel pair.
- If Zookeeper is on a different server (recommended) and you need to secure that connection via Stunnel as well, apply the same method as the one used between GetKafka and Zookeeper.

Here are the relevant configurations in 1.server's stunnel.conf (the entire file is available here):

client = yes
[zookeeper]
accept = 127.0.0.1:2181
connect = 0.server.aws.mine:2181
[kafka]
accept = 127.0.0.1:9092
connect = 0.server.aws.mine:9092
And this is for 0.server (the entire file is available here):

client = no
[zookeeper]
accept = 0.server.aws.mine:2181
connect = 127.0.0.1:2181
[kafka]
accept = 0.server.aws.mine:9092
connect = 127.0.0.1:9092
Kafka server.properties:

host.name=127.0.0.1
zookeeper.connect=127.0.0.1:2181
Zookeeper zookeeper.properties:

clientPort=2181
clientPortAddress=127.0.0.1
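With the tunnels and configs in place, a quick sanity check from 1.server; a sketch using ZooKeeper's 'ruok' four-letter command ('imok' is the expected reply):

$ echo ruok | nc 127.0.0.1 2181
imok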
How to authorize client access?

Each Stunnel server has to have its own pem file containing a private key and a certificate. A CA certificate file (or directory) is also needed to authorize client access. I used tls-toolkit.sh, available in the NiFi toolkit, to generate the required files. The toolkit generates three files for each server: keystore.jks, truststore.jks and nifi.properties. The server's key and cert can be extracted from keystore.jks. To do so, convert keystore.jks into a keystore.p12 file with the following commands (credit goes to this Stackoverflow answer):

# It's not important which server to run the toolkit on.
$ ./bin/tls-toolkit.sh standalone -n [0-1].server.aws.mine -C 'CN=server,OU=mine'
# Password for keystore.jks can be found in generated nifi.properties 'nifi.security.keystorePasswd'.
$ keytool -importkeystore -srckeystore keystore.jks -destkeystore keystore.p12 -srcstoretype jks -deststoretype pkcs12
Then extract key and cert from the p12 file:
$ openssl pkcs12 -in keystore.p12 -nokeys -out cert.pem
$ openssl pkcs12 -in keystore.p12 -nodes -nocerts -out key.pem
Concatenate key and cert to create stunnel.pem, and deploy stunnel.pem to servers:
$ cat key.pem cert.pem >> stunnel.pem

I used cert.pem as the CAFile for Stunnel on 0.server. In stunnel.conf on 0.server, the following settings are needed to enable client cert verification:

verify = 3
CAFile = /etc/stunnel/certs
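To confirm that the Stunnel on 0.server accepts your client cert, you can test the TLS handshake directly; a sketch with illustrative paths (stunnel.pem contains both the key and the cert, so it can serve as both arguments):

$ openssl s_client -connect 0.server.aws.mine:2181 -cert /etc/stunnel/stunnel.pem -key /etc/stunnel/stunnel.pem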
Refer to the Stunnel manual for further descriptions of these configurations. I confirmed that GetKafka running on 1.server can consume messages through Stunnel. When I used a cert which was not configured in the certs file on 0.server, GetKafka got a timeout exception as follows:

2017-03-09 06:50:48,690 WARN [Timer-Driven Process Thread-5] o.apache.nifi.processors.kafka.GetKafka GetKafka[id=b0a21b5d-015a-1000-fbba-2648095ae625] Executor did not stop in 30 sec. Terminated.
2017-03-09 06:50:48,691 WARN [Timer-Driven Process Thread-5] o.apache.nifi.processors.kafka.GetKafka GetKafka[id=b0a21b5d-015a-1000-fbba-2648095ae625] Timed out after 60000 milliseconds while waiting to get connection
java.util.concurrent.TimeoutException: null
at java.util.concurrent.FutureTask.get(FutureTask.java:205) [na:1.8.0_121]
at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:348) ~[nifi-kafka-0-8-processors-1.1.2.jar:1.1.2]
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) [nifi-api-1.1.2.jar:1.1.2]
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1099) [nifi-framework-core-1.1.2.jar:1.1.2]
Stunnel commands

# Install
sudo yum -y install stunnel
# Edit config
sudo vi /etc/stunnel/stunnel.conf
# Start
sudo stunnel
# Stop
sudo kill `cat /var/run/stunnel.pid`
Conclusion

Although Stunnel works with GetKafka and Kafka 0.8.x, I recommend using a newer version of Kafka and the ConsumeKafka NiFi processor with SSL if possible. As noted in the Git comment, this workaround is complicated and does not scale well in terms of the administration tasks required.
03-05-2017 11:43 PM
Glad to hear you figured it out. Thanks!
02-27-2017 11:41 PM
1 Kudo
Hello @Faruk Berksoz In the screenshot, TailFile's Task/Time is shown as 30 (times) in the last 5 min (300 secs). If you scheduled TailFile to run every 10 secs, the stats are correct. It seems TailFile is scheduled correctly, but no new lines have been found, so no FlowFile is produced. When new lines are added to the file being watched by TailFile, they will be picked up and passed to PublishKafka. If you are sure that new lines are appended but you don't see any data ingested into NiFi, please elaborate on the issue. Regards, Koji
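A quick way to test, assuming the tailed file is /var/log/app.log (an illustrative path; use the file configured in your TailFile processor):

$ echo "test line $(date)" >> /var/log/app.log
# A new FlowFile should come out of TailFile within one run schedule.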
02-20-2017 11:27 PM
Thanks for the update, glad to hear you managed to make it work!
02-13-2017 09:29 AM
4 Kudos
Hi @Avijeet Dash What @Jobin George suggested would help to share common static configurations at various parts of a NiFi flow. In addition to that, if you'd like to know how to put/get values to/from the distributed cache, and how to enrich FlowFiles with cached values, this example might be helpful. The template file is available here: https://gist.github.com/ijokarumawak/8ba9a2a1b224603f877e960a942a6f2b Thanks, Koji
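For reference, a minimal sketch of the enrichment pattern (the processor and property names are from the standard NiFi distribution; the '${customer.id}' key is just an example):

# PutDistributedMapCache (writer side):
#   Cache Entry Identifier = ${customer.id}    <- stores the FlowFile content under this key
# FetchDistributedMapCache (reader side):
#   Cache Entry Identifier = ${customer.id}
#   Put Cache Value In Attribute = enriched.value
# Both processors reference the same DistributedMapCacheClientService controller service.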