03-30-2020
06:39 AM
Was MinIO running? Did it crash? Is it running on a supported port? Does it need admin permissions? Have you tried a reboot? Is a firewall blocking it? This error means either it is down or it needs HTTPS. Can you connect from an Amazon S3 client, or with telnet on that port? Have you tried debugging with Wireshark?
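If telnet isn't handy, a minimal Python sketch like this does the same reachability check. It only tests whether a TCP connection to the host and port opens, so it separates "down or blocked by a firewall" from protocol problems such as an HTTP/HTTPS mismatch. The host and port are placeholders (MinIO commonly listens on 9000, but use whatever your server is configured with).

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds (like `telnet host port`)."""
    try:
        # create_connection resolves the name and tries each address in turn
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS failures
        return False


if __name__ == "__main__":
    # Placeholder endpoint: replace with your MinIO host and port
    host, port = "localhost", 9000
    if port_is_open(host, port):
        print(f"{host}:{port} is reachable; check HTTP vs HTTPS and credentials next")
    else:
        print(f"{host}:{port} is not reachable; check that MinIO is up and nothing is blocking it")
```

If the port is open but NiFi still fails, the problem is more likely the scheme (HTTPS) or credentials than the network.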
03-19-2020
09:12 AM
https://community.cloudera.com/t5/Community-Articles/Integrating-Apache-NiFi-with-AWS-S3-and-SQS/ta-p/246956
https://community.cloudera.com/t5/Community-Articles/Working-with-S3-Compatible-Data-Stores-via-Apache-NiFi/ta-p/244584
https://github.com/minio/nifi-minio
We don't explicitly support Ceph.
03-03-2020
09:19 AM
https://community.cloudera.com/t5/Community-Articles/ETL-With-Lookups-with-Apache-HBase-and-Apache-NiFi/ta-p/248243
That is an HBase example; just use a different lookup service. We also teach how to use the LookupRecord and UpdateRecord processors in our CDF Workshop roadshow.
03-03-2020
09:18 AM
Use the LookupRecord processor; it can use JDBC sources: http://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.11.3/org.apache.nifi.processors.standard.LookupRecord/
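Conceptually, LookupRecord takes a key from each record, asks a lookup service for a value, and writes the result back into the record. The sketch below mimics that pattern in plain Python, with an in-memory SQLite table standing in for a JDBC source and a matched/unmatched split roughly like the processor's routing. The `devices` table, its columns, and the field names are hypothetical; this is not NiFi's API.

```python
import sqlite3
from typing import Optional


def lookup_value(conn: sqlite3.Connection, key) -> Optional[str]:
    """Look up one value by key, the way a database lookup service would."""
    row = conn.execute("SELECT name FROM devices WHERE id = ?", (key,)).fetchone()
    return row[0] if row else None


def enrich(records: list, conn: sqlite3.Connection,
           key_field: str, result_field: str) -> tuple:
    """Split records into (matched, unmatched), adding the looked-up value to matches."""
    matched, unmatched = [], []
    for record in records:
        value = lookup_value(conn, record.get(key_field))
        if value is None:
            unmatched.append(record)
        else:
            # Copy the record so the input is not mutated
            matched.append({**record, result_field: value})
    return matched, unmatched
```

In NiFi itself the key and result locations are RecordPath expressions configured on the processor rather than plain field names.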
01-30-2020
01:46 PM
That is a custom processor I wrote. Download the NAR from https://github.com/tspannhw/nifi-corenlp-processor/releases, install it in the lib directory, and restart NiFi.
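The copy step can be scripted. This is a minimal sketch, assuming the NAR has already been downloaded and that `nifi_home` points at your NiFi install; `install_nar` is a hypothetical helper name. NiFi only loads the new NAR after a restart (for example `bin/nifi.sh restart`).

```python
import shutil
from pathlib import Path


def install_nar(nar_file: str, nifi_home: str) -> Path:
    """Copy a downloaded .nar bundle into NiFi's lib directory and return its new path."""
    nar = Path(nar_file)
    if nar.suffix != ".nar":
        raise ValueError(f"expected a .nar file, got {nar.name}")
    lib_dir = Path(nifi_home) / "lib"
    if not lib_dir.is_dir():
        raise FileNotFoundError(f"no lib directory under {nifi_home}")
    target = lib_dir / nar.name
    # copy2 keeps timestamps; NiFi picks the bundle up on its next restart
    shutil.copy2(nar, target)
    return target
```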
01-23-2020
11:46 AM
Okay, so I wrote an example NiFi processor to do it: https://www.datainmotion.dev/2020/01/flank-stack-nifi-processor-for-kafka.html
01-17-2020
08:50 AM
Exploring Apache NiFi 1.10: Stateless Engine and Parameters
12-27-2019
06:18 AM
Hive doesn't use Thrift anymore. The new Python APIs work fine: https://github.com/tebeka/hiver
11-18-2019
08:29 AM
Exploring Apache NiFi 1.10: Parameters and Stateless Engine
Apache NiFi 1.10 is now available!
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12316020&version=12344993
You can now use JDK 8 or JDK 11! I am running on JDK 11, and it seems a bit faster.
A huge feature is the addition of Parameters! And you can use these to pass parameters to Apache NiFi Stateless!
A few less commonly used processors have been moved out of the main download; see here for migration hints:
https://cwiki.apache.org/confluence/display/NIFI/Migration+Guidance
Release Notes: https://cwiki.apache.org/confluence/display/NIFI/Release+Notes#ReleaseNotes-Version1.10.0
Example Source Code: https://github.com/tspannhw/stateless-examples
More New Features:
ParquetReader/Writer (See: https://www.datainmotion.dev/2019/10/migrating-apache-flume-flows-to-apache_7.html)
Prometheus Reporting Task. Expect more Prometheus stuff coming.
Experimental Encrypted content repository. People asked me for this one before.
Parameters!! Time to replace Variables/Variable Registry. Parameters are better in every way.
Toolkit module to generate and build Swagger API library for NiFi
PostSlack processor
PublishKafka Partition Support
GeoEnrichIPRecord Processor
Remote Input Port in a Process Group
Command Line Diagnostics
RocksDB FlowFile Repository
PutBigQueryStreaming Processor
nifi.analytics.predict.enabled - Turn on Back Pressure Prediction
More Lookup Services for ETL/ELT: DatabaseRecordLookupService, KuduLookupService, HBase_2_ListLookupService
Stateless
First we will run from the command line, straight from the NiFi Registry; this is the easiest option. Then we will run on YARN! Yes, you can now run your Apache NiFi flows on your giant Cloudera CDH/HDP/CDP YARN clusters. Let's make use of your hundreds of Hadoop nodes.
Stateless Examples
Let's Build A Stateless Flow
The first thing to keep in mind is that anything that might change should be a parameter that we can pass in with our JSON file. It's very easy to set parameters, even for drop-downs: you even get prompted to pick a parameter from a selection list. Before parameters are available, you will need to add them to a Parameter Context and assign that Parameter Context to your Process Group.
A Parameter in a Processor Configuration is shown as #{broker}
Parameter Context Connected to a Process Group, Controller Service, ...
Apply those parameters
Param(eter) is now an option for properties
Pop-up Hint for Using Parameters
Edit a Parameter in a Parameter Context
We can configure parameters in Controller Services as well.
So easy to choose an existing one.
Use them for anything that can change or that you don't want to hardcode.
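To make the `#{broker}` syntax concrete, here is a rough sketch of how parameter references in a property value could be resolved against the values supplied in the JSON file. It is a simplified, hypothetical resolver for illustration only: it assumes parameter names made of letters, digits, spaces, `.`, `_` and `-`, and it ignores the escaping and Expression Language handling that NiFi's own parser performs.

```python
import re

# Assumed parameter-name pattern: letters, digits, space, '.', '_' and '-'
PARAM_REF = re.compile(r"#\{([A-Za-z0-9 ._-]+)\}")


def resolve(value: str, parameters: dict) -> str:
    """Replace each #{name} reference with its value; fail on undefined parameters."""
    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name not in parameters:
            raise KeyError(f"parameter '{name}' is not defined in the context")
        return parameters[name]

    return PARAM_REF.sub(substitute, value)
```

For example, `resolve("#{broker}", {"broker": "x.x.x.x:9092"})` gives `"x.x.x.x:9092"`, which is why one flow can be pointed at different brokers just by changing the JSON file.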
Apache Kafka Consumer to Sink
This is a simple two-step Apache NiFi flow that reads from Kafka and sends to a sink, for example a file.
Let's make sure we use that Parameter Context
To build your JSON configuration file, you will need the bucket ID and flow ID from your Apache NiFi Registry, as well as the URL for that registry. You can browse that registry at a URL similar to http://tspann-xx-xx:18080.
My Command Line Runner
/Users/tspann/Documents/nifi-1.10.0-SNAPSHOT/bin/nifi.sh stateless RunFromRegistry Continuous --file /Users/tspann/Documents/nifi-1.10.0-SNAPSHOT/logs/kafkaconsumer.json
RunFromRegistry [Once|Continuous] --file <File Name>
This is the basic use case: running from the command line using a file. The flow must exist in the referenced Apache NiFi Registry.
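When launching flows from scripts, the command above can be assembled programmatically. A minimal sketch, assuming the same `bin/nifi.sh` layout as the example; `stateless_command` and `run_stateless` are hypothetical helper names.

```python
import subprocess
from pathlib import Path

MODES = ("Once", "Continuous")


def stateless_command(nifi_home: str, mode: str, config_file: str) -> list:
    """Build the `nifi.sh stateless RunFromRegistry` argument list."""
    if mode not in MODES:
        raise ValueError(f"mode must be one of {MODES}, got {mode!r}")
    nifi_sh = str(Path(nifi_home) / "bin" / "nifi.sh")
    return [nifi_sh, "stateless", "RunFromRegistry", mode, "--file", config_file]


def run_stateless(nifi_home: str, mode: str, config_file: str) -> int:
    """Run the flow and return nifi.sh's exit code (Continuous keeps running until stopped)."""
    return subprocess.run(stateless_command(nifi_home, mode, config_file)).returncode
```

Passing an argument list (rather than one shell string) avoids quoting problems with paths that contain spaces.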
JSON Configuration File (kafkaconsumer.json)
{
"registryUrl": "http://tspann-xx-xx:18080",
"bucketId": "140b30f0-5a47-4747-9021-19d4fde7f993",
"flowId": "0540e1fd-c7ca-46fb-9296-e37632021945",
"ssl": {
"keystoreFile": "",
"keystorePass": "",
"keyPass": "",
"keystoreType": "",
"truststoreFile": "/Library/Java/JavaVirtualMachines/amazon-corretto-11.jdk/Contents/Home/lib/security/cacerts",
"truststorePass": "changeit",
"truststoreType": "JKS"
},
"parameters": {
"broker" : "x.x.x.x:9092",
"topic" : "iot",
"group_id" : "nifi-stateless-kafka-consumer",
"DestinationDirectory" : "/tmp/nifistateless/output2/",
"output_dir": "/Users/tspann/Documents/nifi-1.10.0-SNAPSHOT/logs/output"
}
}
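Because the file just pairs the Registry coordinates with parameter values, it can be generated rather than hand-edited, which keeps per-environment values (broker, topic, directories) out of the flow. A minimal sketch: the function names are hypothetical, and the keys simply mirror the two example files in this article.

```python
import json
from typing import Optional


def build_config(registry_url: str, bucket_id: str, flow_id: str, parameters: dict,
                 flow_version: Optional[str] = None, truststore_file: str = "",
                 truststore_pass: str = "") -> dict:
    """Assemble a config dict with the same keys as kafkaconsumer.json / kafka.json."""
    config = {
        "registryUrl": registry_url,
        "bucketId": bucket_id,
        "flowId": flow_id,
        "ssl": {
            "keystoreFile": "", "keystorePass": "", "keyPass": "", "keystoreType": "",
            "truststoreFile": truststore_file,
            "truststorePass": truststore_pass,
            "truststoreType": "JKS",
        },
        "parameters": dict(parameters),
    }
    if flow_version is not None:
        # Optional, as in kafka.json
        config["flowVersion"] = flow_version
    return config


def write_config(config: dict, path: str) -> None:
    """Write the config as pretty-printed JSON for use with --file."""
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(config, fh, indent=2)
```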
Example Run
12:25:38.725 [main] DEBUG org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0 - ConsumeKafka_2_0[id=e405df7f-87ca-305a-95a9-d25e3c5dbb56] Running ConsumeKafka_2_0.onTrigger with 0 FlowFiles
12:25:38.728 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Node 8 sent an incremental fetch response for session 1943199939 with 0 response partition(s), 10 implied partition(s)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-8 at offset 15 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-9 at offset 16 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-6 at offset 17 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-7 at offset 17 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-4 at offset 18 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-5 at offset 16 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-2 at offset 17 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-3 at offset 19 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-0 at offset 16 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-1 at offset 20 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Built incremental fetch (sessionId=1943199939, epoch=5) for node 8. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 10 partition(s)
12:25:38.729 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(iot-8, iot-9, iot-6, iot-7, iot-4, iot-5, iot-2, iot-3, iot-0, iot-1)) to broker ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.737 [main] DEBUG org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0 - ConsumeKafka_2_0[id=e405df7f-87ca-305a-95a9-d25e3c5dbb56] Running ConsumeKafka_2_0.onTrigger with 0 FlowFiles
Example Output
cat output/247361879273711.statelessFlowFile
{"id":"20191105113853_350b493f-9308-4eb2-b71f-6bcdbaf5d6c1_Timer-Driven Process Thread-13","te":"0.5343","diskusage":"0.2647115097153814.3 MB","memory":57,"cpu":132.87,"host":"192.168.1.249/tspann-xx-xx","temperature":"72","macaddress":"dd73eadf-1ac1-4f76-aecb-14be86ce46ce","end":"48400221819907","systemtime":"11/05/2019 11:38:53"}
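To check what the sink wrote, the output files can be read back. A small sketch, assuming (as in the example above) that each `.statelessFlowFile` holds exactly one JSON object; point it at whatever directory the sink writes to (the example reads from `output/`).

```python
import json
from pathlib import Path


def read_flowfiles(output_dir: str) -> list:
    """Parse every *.statelessFlowFile in output_dir as one JSON record, oldest first."""
    files = sorted(Path(output_dir).glob("*.statelessFlowFile"),
                   key=lambda p: p.stat().st_mtime)
    return [json.loads(path.read_text(encoding="utf-8")) for path in files]
```

Note that some fields in the sample, such as `temperature`, arrive as strings, so convert them before doing arithmetic.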
We can also run Once in this example to send one Kafka message.
Generator to Apache Kafka Producer
My Command Line Runner
/Users/tspann/Documents/nifi-1.10.0-SNAPSHOT/bin/nifi.sh stateless RunFromRegistry Once --file /Users/tspann/Documents/nifi-1.10.0-SNAPSHOT/logs/kafka.json
JSON Configuration File (kafka.json)
{
"registryUrl": "http://tspann-xx-xx:18080",
"bucketId": "140b30f0-5a47-4747-9021-19d4fde7f993",
"flowId": "402814a2-fb7a-4b19-a641-9f4bb191ed67",
"flowVersion": "1",
"ssl": {
"keystoreFile": "",
"keystorePass": "",
"keyPass": "",
"keystoreType": "",
"truststoreFile": "/Library/Java/JavaVirtualMachines/amazon-corretto-11.jdk/Contents/Home/lib/security/cacerts",
"truststorePass": "changeit",
"truststoreType": "JKS"
},
"parameters": {
"broker" : "x.x.x.x:9092"
}
}
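Placeholders like `x.x.x.x:9092` are easy to mistype when they are filled in, and a bad broker string only shows up at run time. A small pre-flight check can catch malformed values before launching the flow. This is a hypothetical helper, assuming the `broker` parameter holds one or more comma-separated `host:port` entries; IPv6 literals are not handled.

```python
import ipaddress


def check_broker(broker: str) -> tuple:
    """Split a 'host:port' broker string and reject obviously malformed values."""
    host, sep, port_text = broker.rpartition(":")
    if not sep or not host:
        raise ValueError(f"expected host:port, got {broker!r}")
    if not port_text.isdigit() or not 0 < int(port_text) < 65536:
        raise ValueError(f"bad port in {broker!r}")
    # A host made only of digits and dots must be a valid dotted-quad IPv4 address
    if all(part.isdigit() for part in host.split(".")):
        ipaddress.IPv4Address(host)  # raises a ValueError subclass if invalid
    return host, int(port_text)


def check_brokers(value: str) -> list:
    """Validate a comma-separated broker list."""
    return [check_broker(b.strip()) for b in value.split(",")]
```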
Example Output
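12:32:37.717 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Created socket with SO_RCVBUF = 33304, SO_SNDBUF = 131768, SO_TIMEOUT = 0 to node 8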
Flow Succeeded
Other Runtime Options:
RunYARNServiceFromRegistry <YARN RM URL> <Docker Image Name> <Service Name> <# of Containers> --file <File Name>
RunOpenwhiskActionServer <Port>
References:
Awesome Article on NiFi 1.10 Error Handling: https://medium.com/@abdelkrim.hadjidj/apache-nifi-1-10-series-simplifying-error-handling-7de86f130acd
https://www.datainmotion.dev/2019/08/find-cacerts-from-java-jre-lib-security.html
https://github.com/apache/nifi/tree/master/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-stateless
https://nifi.apache.org/docs/nifi-docs/html/user-guide.html
Parameters Added to API: https://nifi.apache.org/docs/nifi-docs/rest-api/index.html
http://bit.ly/cdf-platform
https://www.mtnfog.com/blog/apache-nifi-phi-processing
https://www.slideshare.net/BryanBende/apache-nifi-sdlc-improvements
https://nifi.apache.org/registry
Add A S2S Port Inside Process Group
ParquetReader
ParquetRecordSetWriter
© 2019 Timothy Spann