1973 Posts | 1225 Kudos Received | 124 Solutions

My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 2468 | 04-03-2024 06:39 AM |
| | 3814 | 01-12-2024 08:19 AM |
| | 2061 | 12-07-2023 01:49 PM |
| | 3044 | 08-02-2023 07:30 AM |
| | 4181 | 03-29-2023 01:22 PM |
05-12-2020 11:47 AM
You should not use Flume. Flume and its connectors are deprecated. This flow, and any Flume flow, can easily move to NiFi. https://dev.to/tspannhw/migrating-apache-flume-flows-to-apache-nifi-jms-to-x-and-x-to-jms-1g02
05-08-2020 12:03 PM
Awesome. Good luck with NiFi.
04-27-2020 05:22 PM
1 Kudo
In the use case solved for this webinar, I am a Streaming Engineer at an airline, CloudAir. I need to find, filter, and clean Twitter streams, then perform sentiment analysis.
Score Models in the Stream to Act
As the Streaming Engineer at CloudAir, I am responsible for ingesting data from thousands of sources, operationalizing machine learning models as part of our streams, running real-time ELT/ETL processes, and building event processing systems that run on devices, servers, and edge nodes. For today's use case, one of our Machine Learning engineers has given me a model deployed in one of our production Cloudera Machine Learning (CML) environments. I logged into Cloudera, found the model, tested it, and then extracted the information I needed to add this model to our streaming ingest flow for the social media team.
I have been given permissions to access the airline-sentiment workshop in CDP Public Cloud.
I can see all the models deployed in the project I have access to, and predict-sentiment is the one I am to use. It is deployed with 8 GB of RAM and 2 vCPUs.
I can see that it has been running successfully for a while and I can test it right from the project.
From the test screen you can see the model's URL (after the POST) and the accessKey in the JSON request body.
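To make the call concrete, here is a small sketch of scoring one tweet against a CML model endpoint. The URL, accessKey value, and the "sentence" input field are hypothetical placeholders; substitute the values shown on your model's test page.

```python
import json

# Hypothetical values -- copy the real URL and accessKey from the model page.
MODEL_URL = "https://modelservice.example.cloudera.site/model"
ACCESS_KEY = "your-access-key"

def build_score_request(tweet_text):
    """Build the JSON body the model endpoint expects: the accessKey
    plus a 'request' payload holding the model's input (field name
    'sentence' is an assumption for this sentiment model)."""
    return {"accessKey": ACCESS_KEY, "request": {"sentence": tweet_text}}

body = build_score_request("my flight was delayed again")
print(json.dumps(body))

# To actually score (requires network access to the CML workspace):
# import requests
# resp = requests.post(MODEL_URL, json=body, timeout=30)
# print(resp.json())
```

The same URL and accessKey are what we will feed to the streaming flow below.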
Ingesting & Pre-processing Data from Twitter
Using Cloudera Flow Management (CFM) I am ingesting real-time Twitter streams which I filter for only airline specific data. I then clean and transform these records in a few simple steps. The next pieces I will need are those two critical values from CML: the Access Key and URL for the model. I will add them to an instance of an ExecuteClouderaML processor.
I am also sending the raw tweet (large JSON files) to a Kafka topic for further processing by other teams.
I also need to store this data to tables for ad-hoc queries. So I quickly spin up a virtual warehouse with Impala for reporting uses. I will put my data into S3 buckets as Parquet files, with an external Impala table on top, for these reports.
Defining the Impala Table for Supporting Queries
Once my environment is ready, which will only take a few minutes, I will launch Hue to create a table.
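As a sketch of what that table definition in Hue might look like, here is a small helper that renders a CREATE EXTERNAL TABLE statement for Parquet files in S3. The table name, bucket, and columns are hypothetical; adapt them to your actual tweet schema.

```python
def external_parquet_ddl(table, s3_location, columns):
    """Render an Impala CREATE EXTERNAL TABLE statement over
    Parquet files at an S3 location (columns as (name, type) pairs)."""
    cols = ",\n  ".join(f"{name} {ctype}" for name, ctype in columns)
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {table} (\n  {cols}\n)\n"
        "STORED AS PARQUET\n"
        f"LOCATION '{s3_location}'"
    )

ddl = external_parquet_ddl(
    "airline_sentiment",                 # hypothetical table name
    "s3a://my-bucket/sentiment/",        # hypothetical bucket path
    [("tweet_id", "BIGINT"), ("text", "STRING"), ("sentiment", "STRING")],
)
print(ddl)
```

Running the generated DDL in Hue gives you an external table the reporting queries can hit immediately, with the data itself staying in S3.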
From the virtual warehouse I can grab the JDBC URL that I will need to add to my Impala Connection pool in CFM for connecting to the warehouse. I will also need the JDBC driver.
From CFM I add a JDBC Controller and copy in the URL, the Impala driver name and a link to that JDBC jar. I will also set my user and password, or Kerberos credentials, to connect.
Using the Scores
After calling CML from CFM, I can see the scoring results and use them to augment my Twitter data. The scores are added to each event's attributes and do not affect the current flowfile content.
Now that data is streaming into Impala, I can run ad-hoc queries and build charts on my sentiment-enriched, cleaned-up Twitter data.
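The attributes-versus-content behavior can be sketched with a toy model of a flowfile (field names here are hypothetical, not NiFi API calls):

```python
# Toy model of a NiFi flowfile: scoring results land in attributes,
# while the flowfile content (the tweet JSON) is untouched.
flowfile = {
    "content": '{"text": "my flight was great"}',
    "attributes": {"filename": "tweet-1.json"},
}

def add_score_attributes(ff, score):
    """Return a copy of the flowfile with the model score merged into
    its attributes; content passes through unchanged."""
    return {
        "content": ff["content"],
        "attributes": {**ff["attributes"], "sentiment": score},
    }

scored = add_score_attributes(flowfile, "POSITIVE")
```

Because downstream processors read attributes cheaply, enriching attributes instead of rewriting content keeps the flow fast.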
For those of you who love the command line, we can also grab a link to the Impala command-line tool for the virtual warehouse and query from there. It is good for quick checks.
Storing the Twitter Data in a Kudu Table
In another section of our flow, we are also storing our enriched tweets in a CDP Data Center (CDP-DC) Kudu table for additional analytics that we run in Hue and in a Jupyter notebook spun up with our CDP-DC CML.
Jupyter notebooks spun up from Cloudera Machine Learning let me explore my data and do some charting, graphs and SQL work in Python3.
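As a flavor of that notebook exploration, here is a minimal Python 3 sketch aggregating sentiment counts, the kind of summary you would then chart. The rows are hypothetical stand-ins for the enriched tweets in Kudu.

```python
from collections import Counter

# Hypothetical enriched tweets, shaped like the Kudu rows.
rows = [
    {"airline": "CloudAir", "sentiment": "POSITIVE"},
    {"airline": "CloudAir", "sentiment": "NEGATIVE"},
    {"airline": "CloudAir", "sentiment": "NEGATIVE"},
    {"airline": "CloudAir", "sentiment": "NEUTRAL"},
]

# The aggregation behind a sentiment-distribution chart.
by_sentiment = Counter(r["sentiment"] for r in rows)
print(by_sentiment.most_common())
```

In the real notebook the rows would come from a SQL query against the Kudu table rather than an inline list.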
Assuring Data Governance and Lineage
One of the amazing features that comes in handy when you have a complex flow spanning a hybrid environment is built-in data management and governance, which we get with Apache Atlas.
We can navigate and search through Atlas to see how data travels through Apache NiFi, Apache Kafka, tables and Cloudera Machine Learning model activities like deployment.
Final DataFlow For Scoring
We have a QueryRecord processor in CFM that analyzes the streaming events and looks for negative sentiment from influencers; we then push those events to a Slack channel for our social media team to handle.
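In CFM this filter is SQL inside QueryRecord; as a rough Python sketch of the same logic (the follower threshold and field names are hypothetical):

```python
def negative_influencers(events, follower_threshold=10000):
    """Keep negative-sentiment tweets from accounts whose follower
    count exceeds a threshold -- mimics the QueryRecord filter."""
    return [
        e for e in events
        if e["sentiment"] == "NEGATIVE" and e["followers"] >= follower_threshold
    ]

events = [
    {"user": "a", "followers": 50,     "sentiment": "NEGATIVE"},
    {"user": "b", "followers": 250000, "sentiment": "NEGATIVE"},
    {"user": "c", "followers": 300000, "sentiment": "POSITIVE"},
]
alerts = negative_influencers(events)
```

Only the matching events continue down the relationship that feeds the Slack notification.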
As we have seen, we are sending several different streams of data to Kafka topics for further processing by Spark Streaming, Flink, NiFi, Java, and Kafka Streams applications. Using Cloudera Streams Messaging Manager, we can see all the components of our Kafka cluster and where our events are as they travel through topics in various brokers. You can see messages in all of the partitions, and you can build alerts for any part of your Kafka system. Importantly, you can trace messages from all of the consumers back to all of the producers and see any lag or latency that occurs in clients.
We can also push to our Operational Database (HBase) and easily scan through the rapidly inserted rows.
This demo was presented in the webinar, Harnessing the Data Lifecycle for Customer Experience Optimization.
Source Code Resources
- Queries, Python, Models, Notebooks
- Example Cloudera Machine Learning Connector
- SQL
04-03-2020 03:08 PM
Thanks for this writeup. I was having a heck of a time authenticating; I was expecting the authentication keys to be separate fields in the processor configuration. I didn't realize you could add all that right in the connection string, although that leaves your passwords in plain text. Seems like an opportunity for improvement.
03-30-2020 11:45 AM
Can I use pyhive to connect to Hive using a Hive JDBC string instead of a single hostname? The following doesn't work for me:

```python
from pyhive import hive

hive_conn = hive.Connection(
    host='<JDBC STRING>',
    configuration={'serviceDiscoveryMode': 'zooKeeper',
                   'zooKeeperNamespace': 'hiveserver2'},
)
```
03-13-2020 12:14 AM
Can you please help me with this: after I successfully upload the template and try to use it, I get the following error:

Error org.apache.nifi.processors.kite.InferAvroSchema is not known to this NiFi instance.

My version: 1.11.3, 02/21/2020 21:06:05 EST, tagged nifi-1.11.3-RC1
03-03-2020 09:19 AM
https://community.cloudera.com/t5/Community-Articles/ETL-With-Lookups-with-Apache-HBase-and-Apache-NiFi/ta-p/248243 That is an HBase example; just use a different one. We also teach the lookup/update record processors in our CDF Workshop roadshow.
01-30-2020 02:54 PM
Thank you. This worked!
01-17-2020 08:50 AM
1 Kudo
Exploring Apache NiFi 1.10: Stateless Engine and Parameters

Apache NiFi 1.10 is now available! https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12316020&version=12344993

You can now use JDK 8 or JDK 11. I am running on JDK 11, and it seems a bit faster. A huge feature is the addition of Parameters, and you can use them to pass values to Apache NiFi Stateless! A few lesser-used processors have been moved out of the main download; see here for migration hints: https://cwiki.apache.org/confluence/display/NIFI/Migration+Guidance

Release Notes: https://cwiki.apache.org/confluence/display/NIFI/Release+Notes#ReleaseNotes-Version1.10.0
Example Source Code: https://github.com/tspannhw/stateless-examples

More New Features:
- ParquetReader/Writer (see: https://www.datainmotion.dev/2019/10/migrating-apache-flume-flows-to-apache_7.html)
- Prometheus Reporting Task. Expect more Prometheus features coming.
- Experimental encrypted content repository. People have asked me for this one before.
- Parameters!! Time to replace Variables/Variable Registry. Parameters are better in every way.
- Toolkit module to generate and build a Swagger API library for NiFi
- PostSlack processor
- PublishKafka partition support
- GeoEnrichIPRecord processor
- Remote Input Port in a Process Group
- Command-line diagnostics
- RocksDB FlowFile repository
- PutBigQueryStreaming processor
- nifi.analytics.predict.enabled - turn on back pressure prediction
- More Lookup Services for ETL/ELT: DatabaseRecordLookupService, KuduLookupService, HBase_2_ListLookupService

Stateless

First we will run from the command line, straight from the NiFi Registry; this is easiest. Then we will run from YARN! Yes, you can now run your Apache NiFi flows on your giant Cloudera CDH/HDP/CDP YARN clusters. Let's make use of your hundreds of Hadoop nodes.

Stateless Examples

Let's Build A Stateless Flow

The first thing to keep in mind is that we will want anything that might change to be a parameter that we can pass with our JSON file.
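NiFi references a parameter inside a property value as #{name}. As a rough illustration (not NiFi code, and simplified: real parameter names may contain characters beyond word characters), here is how that substitution behaves:

```python
import re

def resolve_parameters(value, params):
    """Resolve NiFi-style #{name} parameter references in a property
    value, using a simple word-character name pattern."""
    return re.sub(r"#\{(\w+)\}", lambda m: params[m.group(1)], value)

# Parameters like those we will pass in the stateless JSON file.
params = {"broker": "localhost:9092", "topic": "iot"}

prop = resolve_parameters("#{broker}", params)
```

This is why parameterizing anything changeable matters: the same versioned flow runs against different brokers or topics just by swapping the JSON values.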
It's very easy to set parameters, even for drop-downs: you get prompted to pick a parameter from a selection list. Before parameters are available, you need to add them to a Parameter Context and assign that context to your Process Group. In a processor configuration, a parameter is referenced as #{broker}.

(Screenshots from the original post: a Parameter Context connected to a Process Group and Controller Service; applying parameters; "Param(eter)" as an option for properties; the pop-up hint for using parameters; editing a parameter in a Parameter Context.)

We can configure parameters in Controller Services as well, and it is easy to choose an existing one. Use them for anything that can change, or anything you don't want to hardcode.

Apache Kafka Consumer to Sink

This is a simple two-step Apache NiFi flow that reads from Kafka and sends to a sink, for example a file. Let's make sure we use that Parameter Context.

To build your JSON configuration file, you will need the bucket ID and flow ID from your Apache NiFi Registry, plus the URL for that registry. You can browse the registry at a URL similar to http://tspann-mbp15-hw14277:18080.

My Command Line Runner

```
/Users/tspann/Documents/nifi-1.10.0-SNAPSHOT/bin/nifi.sh stateless RunFromRegistry Continuous --file /Users/tspann/Documents/nifi-1.10.0-SNAPSHOT/logs/kafkaconsumer.json
```

The general form is: RunFromRegistry [Once|Continuous] --file <File Name>

This is the basic use case of running from the command line using a file. The flow must exist in the referenced Apache NiFi Registry.
JSON Configuration File (kafkaconsumer.json)

```json
{
  "registryUrl": "http://tspann-mbp15-hw14277:18080",
  "bucketId": "140b30f0-5a47-4747-9021-19d4fde7f993",
  "flowId": "0540e1fd-c7ca-46fb-9296-e37632021945",
  "ssl": {
    "keystoreFile": "",
    "keystorePass": "",
    "keyPass": "",
    "keystoreType": "",
    "truststoreFile": "/Library/Java/JavaVirtualMachines/amazon-corretto-11.jdk/Contents/Home/lib/security/cacerts",
    "truststorePass": "changeit",
    "truststoreType": "JKS"
  },
  "parameters": {
    "broker": "4.317.852.100:9092",
    "topic": "iot",
    "group_id": "nifi-stateless-kafka-consumer",
    "DestinationDirectory": "/tmp/nifistateless/output2/",
    "output_dir": "/Users/tspann/Documents/nifi-1.10.0-SNAPSHOT/logs/output"
  }
}
```

Example Run

```
12:25:38.725 [main] DEBUG org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0 - ConsumeKafka_2_0[id=e405df7f-87ca-305a-95a9-d25e3c5dbb56] Running ConsumeKafka_2_0.onTrigger with 0 FlowFiles
12:25:38.728 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Node 8 sent an incremental fetch response for session 1943199939 with 0 response partition(s), 10 implied partition(s)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-8 at offset 15 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-9 at offset 16 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-6 at offset 17 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-7 at offset 17 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-4 at offset 18 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-5 at offset 16 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-2 at offset 17 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-3 at offset 19 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-0 at offset 16 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-1 at offset 20 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Built incremental fetch (sessionId=1943199939, epoch=5) for node 8. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 10 partition(s)
12:25:38.729 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(iot-8, iot-9, iot-6, iot-7, iot-4, iot-5, iot-2, iot-3, iot-0, iot-1)) to broker ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.737 [main] DEBUG org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0 - ConsumeKafka_2_0[id=e405df7f-87ca-305a-95a9-d25e3c5dbb56] Running ConsumeKafka_2_0.onTrigger with 0 FlowFiles
```

Example Output

```
cat output/247361879273711.statelessFlowFile
{"id":"20191105113853_350b493f-9308-4eb2-b71f-6bcdbaf5d6c1_Timer-Driven Process Thread-13","te":"0.5343","diskusage":"0.2647115097153814.3 MB","memory":57,"cpu":132.87,"host":"192.168.1.249/tspann-MBP15-HW14277","temperature":"72","macaddress":"dd73eadf-1ac1-4f76-aecb-14be86ce46ce","end":"48400221819907","systemtime":"11/05/2019 11:38:53"}
```

We can also run Once, as in the next example, to send one Kafka message.
Generator to Apache Kafka Producer

My Command Line Runner

```
/Users/tspann/Documents/nifi-1.10.0-SNAPSHOT/bin/nifi.sh stateless RunFromRegistry Once --file /Users/tspann/Documents/nifi-1.10.0-SNAPSHOT/logs/kafka.json
```

JSON Configuration File (kafka.json)

```json
{
  "registryUrl": "http://tspann-mbp15-hw14277:18080",
  "bucketId": "140b30f0-5a47-4747-9021-19d4fde7f993",
  "flowId": "402814a2-fb7a-4b19-a641-9f4bb191ed67",
  "flowVersion": "1",
  "ssl": {
    "keystoreFile": "",
    "keystorePass": "",
    "keyPass": "",
    "keystoreType": "",
    "truststoreFile": "/Library/Java/JavaVirtualMachines/amazon-corretto-11.jdk/Contents/Home/lib/security/cacerts",
    "truststorePass": "changeit",
    "truststoreType": "JKS"
  },
  "parameters": {
    "broker": "3.218.152.236:9092"
  }
}
```

Example Output

```
12:32:37.717 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Created socket with SO_RCVBUF = 33304, SO_SNDBUF = 131768, SO_TIMEOUT = 0 to node 8
12:32:37.717 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Completed connection to node 8. Fetching API versions.
12:32:37.717 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating API versions fetch from node 8.
12:32:37.732 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Recorded API versions for node 8: (Produce(0): 0 to 7 [usable: 6], Fetch(1): 0 to 10 [usable: 8], ListOffsets(2): 0 to 5 [usable: 3], Metadata(3): 0 to 7 [usable: 6], LeaderAndIsr(4): 0 to 2 [usable: 1], StopReplica(5): 0 to 1 [usable: 0], UpdateMetadata(6): 0 to 5 [usable: 4], ControlledShutdown(7): 0 to 2 [usable: 1], OffsetCommit(8): 0 to 6 [usable: 4], OffsetFetch(9): 0 to 5 [usable: 4], FindCoordinator(10): 0 to 2 [usable: 2], JoinGroup(11): 0 to 4 [usable: 3], Heartbeat(12): 0 to 2 [usable: 2], LeaveGroup(13): 0 to 2 [usable: 2], SyncGroup(14): 0 to 2 [usable: 2], DescribeGroups(15): 0 to 2 [usable: 2], ListGroups(16): 0 to 2 [usable: 2], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 2 [usable: 2], CreateTopics(19): 0 to 3 [usable: 3], DeleteTopics(20): 0 to 3 [usable: 2], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 1 [usable: 1], OffsetForLeaderEpoch(23): 0 to 2 [usable: 1], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 2 [usable: 1], DescribeAcls(29): 0 to 1 [usable: 1], CreateAcls(30): 0 to 1 [usable: 1], DeleteAcls(31): 0 to 1 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 1 [usable: 0], CreatePartitions(37): 0 to 1 [usable: 1], CreateDelegationToken(38): 0 to 1 [usable: 1], RenewDelegationToken(39): 0 to 1 [usable: 1], ExpireDelegationToken(40): 0 to 1 [usable: 1], DescribeDelegationToken(41): 0 to 1 [usable: 1], DeleteGroups(42): 0 to 1 [usable: 1], UNKNOWN(43): 0)
12:32:37.739 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.iot.records-per-batch
12:32:37.739 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.iot.bytes
12:32:37.739 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.iot.compression-rate
12:32:37.739 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.iot.record-retries
12:32:37.740 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.iot.record-errors
12:32:37.745 [main] DEBUG org.apache.nifi.parameter.ExpressionLanguageAwareParameterParser - For input iot found 0 Parameter references: []
12:32:37.745 [main] DEBUG org.apache.nifi.parameter.ExpressionLanguageAwareParameterParser - For input iot found 0 Parameter references: []
Flow Succeeded
```

Other Runtime Options:

```
RunYARNServiceFromRegistry <YARN RM URL> <Docker Image Name> <Service Name> <# of Containers> --file <File Name>
RunOpenwhiskActionServer <Port>
```

References:
- Awesome article on NiFi 1.10 error handling: https://medium.com/@abdelkrim.hadjidj/apache-nifi-1-10-series-simplifying-error-handling-7de86f130acd
- https://www.datainmotion.dev/2019/08/find-cacerts-from-java-jre-lib-security.html
- https://github.com/apache/nifi/tree/master/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-stateless
- https://nifi.apache.org/docs/nifi-docs/html/user-guide.html
- Parameters added to the API: https://nifi.apache.org/docs/nifi-docs/rest-api/index.html
- http://bit.ly/cdf-platform
- https://www.mtnfog.com/blog/apache-nifi-phi-processing
- https://www.slideshare.net/BryanBende/apache-nifi-sdlc-improvements
- https://nifi.apache.org/registry

(Screenshots from the original post: Add an S2S Port inside a Process Group; ParquetReader; ParquetRecordSetWriter.)