09-10-2019
09:00 AM
When developing NiFi flows with Python, it can be a headache to build customized configurations for your processors. Every processor is different and has many parameters to customize depending on your flow's needs and requirements. To accelerate development, I use the developer tools in my Chrome browser to extract the configuration payloads from the REST calls the NiFi application makes.
Open your browser and point it to your NiFi instance. Once in the UI, right-click on the gray bar and select "Inspect". This opens a developer view of the UI. Next, click on the Network tab to view and debug network activity; all network-related traffic is displayed here. Notice that you can see all the REST calls NiFi makes behind the scenes. This can be quite helpful when customizing processors. When you drag a processor onto the canvas and parameterize it, a REST call is made that creates the processor. Along with that processor-creation REST call is a configuration payload that we can capture and use in our NiFi Python development.
Example: create a simple GenerateFlowFile processor. When you drag it onto the canvas, you'll see an entry posted in the Network tab; it is the REST call made to create the processor. If you scroll down to the request payload, you'll get a view of the configuration that was submitted with the processor. Next, update the processor by changing the flow file custom text. You'll immediately see a new event posted in the Network tab, and this event gives you the JSON representation of the processor's configuration.
Using this tool you can easily extract the configuration properties for any processor and use them when designing programmatic flows with Python.
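For example, once you've grabbed a request payload, the interesting piece is usually the "config" section (properties, scheduling, relationships). Below is a rough sketch of dropping captured values into a nipyapi call; the host, location, and property values are placeholders, and my nipyapi article covers the full walkthrough.
from nipyapi import canvas, config

config.nifi_config.host = 'http://localhost:8080/nifi-api'

##values copied from the captured request payload in the browser's Network tab
captured_config = {"properties": {"generate-ff-custom-text": "hello world"},
                   "schedulingPeriod": "10 sec",
                   "schedulingStrategy": "TIMER_DRIVEN"}

##reuse the captured configuration when creating the processor programmatically
root_pg = canvas.get_process_group(canvas.get_root_pg_id(), 'id')
proc_type = canvas.get_processor_type('org.apache.nifi.processors.standard.GenerateFlowFile', identifier_type='name')
canvas.create_processor(root_pg, proc_type, (0.0, 0.0), name='captured_config_example', config=captured_config)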
09-10-2019
08:06 AM
1 Kudo
Building Basic Flows with Nipyapi
If building flows in NiFi wasn't easy enough, developers can use the nipyapi library to build flows with Python. With this model, one can programmatically build flows without having access to the canvas. This comes in handy when deploying highly configurable and repeatable flows. In the simple example below, we'll walk through using the Python libraries to build a very simple flow in which a GenerateFlowFile processor writes data to the local file system. It's a contrived example, but it illustrates the flexibility afforded by the nipyapi libraries. For more information on the Python libraries discussed here, please go to: https://nipyapi.readthedocs.io/en/latest/index.html
First things first, you'll need to import the required library and classes for the example.
from nipyapi import canvas, config
from random import randrange
Next, set the configuration details of your NiFi environment. Here, I'm setting the API connection details for both NiFi and NiFi Registry.
config.nifi_config.host = 'http://localhost:8080/nifi-api'
config.registry_config.host = 'http://localhost:18080/nifi-registry-api'
With NiFi, the canvas is where you visually place the various processors you use in your flows. With Python, you can't visually place anything, so we must give the coordinates of where we want the processors to be placed. Here, I'm using a simple random function to derive a coordinate point between 0 and 4000. This will randomly place the processors on the canvas.
location_x = randrange(0, 4000)
location_y = randrange(0, 4000)
location = (location_x, location_y)
NiFi has a ton of configuration properties for each individual processor. Some are required while others are optional. To cover the required properties for this example, we must pass in a configuration that tells NiFi how to set up each processor. See my article on using the Chrome browser's developer tools to help generate these processor configurations.
##send in configuration details for customizing the processors
processor_GenFlowFile_config = {"properties": {"generate-ff-custom-text": "hello world"}, "schedulingPeriod": "10 sec", "schedulingStrategy": "TIMER_DRIVEN"}
processor_PutFile_config = {"properties": {"Directory": "${dir}", "Conflict Resolution Strategy": "replace", "Maximum File Count": 100}, "autoTerminatedRelationships": ["failure", "success"]}
In order to place a new process group on the canvas, we need to find the root process group, which happens to be the canvas you see when you log into NiFi. This is quite simple:
##get the root process group ID
root_id = canvas.get_root_pg_id()
Next, we'll need to pull the metadata about this process group in order to create process groups on the canvas.
##get the root process group
root_process_group = canvas.get_process_group(root_id, 'id')
With all this information, we can now create a new process group that our test flow will live in.
##create the process group
new_processor_group = canvas.create_process_group(root_process_group, 'test_process_group', location, 'this is a test')
Next, we'll need to get the processors we'd like to implement. This code searches the list of processor types and retrieves the processors we selected. The search criterion is the Java processor class name.
##get the processors
processor_GenFlowFile = canvas.get_processor_type('org.apache.nifi.processors.standard.GenerateFlowFile', identifier_type='name')
processor_PutFile = canvas.get_processor_type('org.apache.nifi.processors.standard.PutFile', identifier_type='name')
In this step, we place the processors in our newly created process group. For each processor, I'm passing in the process group object, the processor object, the location where I want the processor to live, as well as the configuration for each.
##put processors on canvas in specified process group
GenFlowFile = canvas.create_processor(new_processor_group, processor_GenFlowFile, (randrange(0, 4000), randrange(0, 4000)), name=None, config=processor_GenFlowFile_config)
PutFile = canvas.create_processor(new_processor_group, processor_PutFile, (randrange(0, 4000), randrange(0, 4000)), name=None, config=processor_PutFile_config)
Once the processors have been created, linkages between them can be established. Here, the processors that were created in the previous step are used to establish the relationship between them.
##create connection between processors
canvas.create_connection(GenFlowFile, PutFile, relationships=None, name="linkage")
The second-to-last step involves setting any variables for the process group. This method allows you to customize the configurations you defined in the prior steps. If you look at the step where the configurations were defined, you'll see the PutFile Directory property is set to a NiFi variable, ${dir}. All that needs to be done is to set that variable in the registry to parameterize your flow.
##get variable registry
var_reg = canvas.get_variable_registry(new_processor_group, True)
##set new variables
canvas.update_variable_registry(new_processor_group, ([('dir', '/tmp/test')]))
Finally, start the flow.
##start process group
canvas.schedule_process_group(new_processor_group.id, True)
To see the newly running flow, go to the canvas. The process group should be running. Mine has an error, and that's due to the file limit I built into my configuration (no more than 100 files in a directory). Going into the process group, you should see the two processors we defined. Given the random nature of placing the processors on the canvas, they may be spread far apart, so you may need to adjust them on the canvas to see them clearly.
There you have it! A simple flow built with nipyapi! I hope this demonstrates the powerful capabilities of nipyapi and how it can be used to create customizable flows...it certainly beats having to write REST calls!
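As a final note, the same library can stop and tear down the example when you're done with it. Here is a minimal sketch, assuming the connection details set earlier in this article:
##stop the process group, then remove it and everything inside it
pg = canvas.get_process_group('test_process_group', 'name')
canvas.schedule_process_group(pg.id, False)
canvas.delete_process_group(pg, force=True)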
08-28-2019
07:43 AM
Nice piece. Can you snapshot to a location outside the repository?
08-19-2019
01:04 PM
1 Kudo
HDFS Snapshots
HDFS snapshots are a great way to back up important data on HDFS. They're extremely easy to implement and they help safeguard your data in cases where a user or admin accidentally deletes it. In the article below, we'll walk through some simple examples of using snapshots and some of the gotchas to look out for when implementing them.
Part 1: Understanding Snapshots
First, let's create some files and directories for testing:
echo "Hello World" > file1.txt
echo "How are you" > file2.txt
echo "hdfs snapshots are great" > file3.txt
hdfs dfs -mkdir /tmp/snapshot_dir
hdfs dfs -mkdir /tmp/snapshot_dir/dir1
Next, let's put file1.txt in the directory:
hdfs dfs -put file1.txt /tmp/snapshot_dir/dir1
Creating a snapshot is a really simple process. You will need a user with superuser permissions to make a directory snapshottable. This action enables the directory to have snapshots; it does not explicitly create a snapshot.
hdfs dfsadmin -allowSnapshot /tmp/snapshot_dir/dir1
Next, let's take a look at the size of the directory and the files we loaded:
hdfs dfs -du -h /tmp/snapshot_dir
The output contains two numbers: the first is the size of the file, and the second is the size of the file multiplied by the replication factor. HDFS uses 3x replication by default, so that second number is three times the first.
Next, let's create the snapshot. To do this, we'll need to identify the directory we want to snapshot as well as a name for the snapshot. I'd recommend a date format like the example below to easily keep track of when the snapshot was taken. Note that a snapshot covers all of the files and directories under the snapshotted directory. Also, you can't create nested snapshots, so be judicious in your selection.
hdfs dfs -createSnapshot /tmp/snapshot_dir/dir1 20190819v1
Once that is completed, let's check the size of the directory again. As you can see, the directory size didn't increase. That's because snapshots are point-in-time logical backups of your data. A pointer is created in the NameNode that links the snapshot to the files on disk. If a deletion happens, the NameNode drops its logical reference to that data, but the data physically remains. The snapshot acts as a secondary reference that can be used to recover the files and restore the NameNode reference. To test this, let's delete a file:
hdfs dfs -rm /tmp/snapshot_dir/dir1/file1.txt
First, make sure the file is removed:
hdfs dfs -ls /tmp/snapshot_dir/dir1/
Now check the directory size:
hdfs dfs -du -h /tmp/snapshot_dir
Notice that while the file size is 0 because the file doesn't logically exist, the second number (replication size) is still populated. That's because while the file doesn't logically exist, it is still physically present. This is important to remember: files deleted under a snapshot aren't physically removed until the snapshot holding them is deleted. This can result in lots of "phantom" data in the system taking up valuable HDFS real estate.
Now let's restore the file. In this process, you navigate to the hidden .snapshot folder that holds the individual snapshots. It is located in the directory where the snapshot was taken:
hdfs dfs -ls /tmp/snapshot_dir/dir1/.snapshot
In here you should see the snapshot we took previously. Now we can copy the file out of the snapshot to restore it. Note that the -ptopax flags restore the file with the same timestamps, ownership, permissions, ACLs, and XAttrs as the original.
hdfs dfs -cp -ptopax /tmp/snapshot_dir/dir1/.snapshot/20190819v1/file1.txt /tmp/snapshot_dir/dir1
Now that the data has been copied, let's take a look at the directory sizes:
hdfs dfs -du -h /tmp/snapshot_dir
hdfs dfs -ls /tmp/snapshot_dir/dir1/
The file sizes look good, but the system sizes are off the charts. We expect 36 bytes but it comes in at 72. That's because while we copied the file out of the snapshot, the snapshot's copy of the file still remains on disk...so now we have two copies of the same file, one referenced directly by the NameNode and the other by the snapshot. To remove this phantom data, we must delete the snapshot:
hdfs dfs -deleteSnapshot /tmp/snapshot_dir/dir1 20190819v1
With the snapshot deleted, take a look at the size of the directory:
hdfs dfs -du -h /tmp/snapshot_dir
Phantom data no more! Everything is back in order and has the expected number of bytes. Look for Part II, where we'll examine what to look out for when managing multiple snapshots.
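In the meantime, if you'd like to script the date-style snapshot names recommended above, here is a minimal Python sketch; it assumes the hdfs client is available on the node running it, and the path and version suffix are just examples:
import subprocess
from datetime import datetime

def create_dated_snapshot(path, version="v1"):
    # builds a name like 20190819v1 and shells out to the hdfs client
    snapshot_name = datetime.now().strftime("%Y%m%d") + version
    subprocess.run(["hdfs", "dfs", "-createSnapshot", path, snapshot_name], check=True)
    return snapshot_name

create_dated_snapshot("/tmp/snapshot_dir/dir1")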
05-08-2019
04:34 AM
@jtaras, thank you for this wonderful article! I have a follow-up question (just curious): do Spark workers use the same custom Python path as the Zeppelin driver? If yes, how do they know it? (The Zeppelin UI setting seems to apply only to the Zeppelin driver application and its SparkContext.) If no, why is there no version conflict between the driver's Python and the workers' Python? (I tested with the default 2.x and the custom Anaconda Python 3.7.) NOTE: I had to additionally set the PYSPARK_PYTHON environment variable in Spark > Config in Ambari to the same path in order to use all the Python libraries in scripts submitted via "spark-submit", but this does not affect the Zeppelin functionality in any way.
11-02-2018
03:55 AM
6 Kudos
Druid Kafka Integration Service + Hive
This tutorial will focus on ways of ingesting raw streaming data into Druid and exposing the data in Hive for high-performance analytics.
For reference, this tutorial was built with CentOS 7 and HDP 3.0.1, with LLAP enabled and Kafka installed and running.
This tutorial is divided into six sections:
1. Ambari Configuration
2. Kafka Setup
3. Setup Data Stream
4. Generating and Running the Supervisor Spec
5. Hive Integration
6. Create Streaming Druid Data Sources through Hive
Kafka Integration Service
With the Kafka Ingestion Service, Druid can guarantee that all records received will be processed and stored. That's in contrast to the older push method of Tranquility: if an event was received after the window period with Tranquility, the record was dropped. This meant that a secondary batch process needed to be run to true up the data. With the Kafka Ingestion Service, if an event is received late, a segment will be created for it regardless of when it was received. This is hugely powerful, as it removes the need to run a secondary batch process.
Section I: Ambari Configuration
The Kafka Ingestion Service is still in Technical Preview as of today, so it doesn't come bundled as part of the platform. As such, some minor configuration changes will need to be applied.
In Ambari, navigate to the Druid service and click on the Configs tab. Using the filter, search for "druid.extensions.loadList".
Add "druid-kafka-indexing-service" to this list. The parameter tells Druid which extensions to load on startup across the cluster.
Click "Save" and restart any necessary services.
Section II: Setup Kafka
Next, create a Kafka topic for the tutorial. On one of the available Kafka brokers, navigate to the directory containing the client utilities. Use the utility kafka-topics.sh to create a topic on your Kafka cluster.
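If you'd rather script the topic creation than run the shell utility, here is a rough equivalent using the kafka-python client; the topic name, broker address, and partition/replication settings below are placeholders, so adjust them for your cluster:
from kafka.admin import KafkaAdminClient, NewTopic

# placeholder broker address (HDP Kafka brokers typically listen on port 6667)
admin = KafkaAdminClient(bootstrap_servers="kafka-broker:6667")

# create the tutorial topic with example partition/replication settings
admin.create_topics(new_topics=[NewTopic(name="sales_data", num_partitions=1, replication_factor=1)])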
Section III: Setup Data Stream
The data generator can be downloaded from the following link. It is already configured to generate the data set for the tutorial; you only need to update the topic name and add the Kafka broker address. Once that is done, run the Python script to begin generating events.
Data Generator
If everything is configured properly, events should be written to the topic; you can watch them with the kafka-console-consumer utility.
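For reference, a stripped-down sketch of what such a generator might look like is shown below; the field names are based on the queries later in this article, and the broker address and topic name are placeholders, so the actual downloadable generator may differ:
import json
import random
import time
from datetime import datetime
from kafka import KafkaProducer

# placeholder broker address and topic name, update these for your cluster
producer = KafkaProducer(bootstrap_servers="kafka-broker:6667",
                         value_serializer=lambda v: json.dumps(v).encode("utf-8"))

while True:
    # field names mirror the columns queried later in this article
    event = {"timestamp": datetime.utcnow().isoformat(),
             "rep_id": str(random.randint(0, 15)),
             "qty": random.randint(1, 10),
             "trxn_amt": round(random.uniform(10, 500), 2),
             "discount_amt": round(random.uniform(0, 25), 2)}
    producer.send("sales_data", value=event)
    time.sleep(1)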
Section IV: Generating and Running the Supervisor Spec
Since the middle managers are conducting the actual indexing of incoming Kafka events, a configuration file needs to be delivered to the overlord node. The overlord node will take the configuration file and create tasks that will be executed on the middle manager nodes.
To submit the Kafka ingestion spec, the JSON configuration file must be posted to the overlord via its REST API. If successful, the overlord returns a JSON response containing the ID of the KIS application.
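For example, with Python and the requests library, submitting the spec can look something like this (the overlord host and spec file name are placeholders; 8090 is the default overlord port):
import json
import requests

# the Kafka ingestion spec for this tutorial, saved as a local JSON file
with open("sales_data_30min_spec.json") as f:
    spec = json.load(f)

# POST the spec to the overlord's supervisor endpoint
resp = requests.post("http://overlord-host:8090/druid/indexer/v1/supervisor", json=spec)
resp.raise_for_status()
print(resp.json())  # on success, a JSON response such as {"id": "sales_data_30min"}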
Once the ingestion spec is submitted, navigate to the druid overlord node. A new record is now displayed under Running Supervisors with the name of the data source that was defined in the ingestion spec. There is also a record under Running Tasks.
The coordinator console, however, tells a different story: there isn't an entry for the data source that's seen in the overlord (sales_data_30min is missing).
This is because the coordinator console only shows data managed by historical nodes. Since the Kafka ingestion service was just started and the taskDuration is set to 30 minutes, there won't be an entry in here for at least that amount of time. The taskDuration controls how long a task spends reading a Kafka topic before publishing the indexed data to the historical nodes. The data segments are available, they just aren't managed by the historical nodes yet; they are under the domain of the middle managers until that taskDuration expires. The data, however, is still accessible even though it isn't on historical nodes. In fact, the data segments can be seen by logging into the middle managers themselves and navigating to /apps/druid/tasks.
This shows the current task that is running. Going into that directory, navigate to work/persist to view the index files. Whenever a new query is submitted, instead of going only to the historical nodes, the query is also routed to the middle managers and this directory to get the latest streamed data.
After 30 minutes have elapsed, log back into the coordinator console to view the newly added segments on the historical nodes.
Section V: Hive Integration
Now that data is being streamed into Kafka and collected and indexed in Druid, it can be accessed in Hive. Log onto Hive and create a table for the newly created Druid data source.
CREATE EXTERNAL TABLE sales_data
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
TBLPROPERTIES ("druid.datasource" = "sales_data_30min");
Next, run a series of quick count(*) queries on the table. Watch how the result set increases with each query! All the new data streaming into Druid is immediately accessible. Behind the scenes, the query is directed to two locations for the count: the historical nodes and the middle managers. The historical nodes hold the committed index segments while the middle managers hold the most recent data within the given taskDuration.
Even simple aggregations are applied across both historical and real-time data sets; one such aggregation came back in 0.014 seconds!
Section VI: Create Streaming Druid Data Sources through Hive
Like in the previous tutorial, Hive has many core integrations with Druid. One such integration is through external tables. Through external tables, Hive can mimic the Druid supervisor spec that was created earlier, all without having to code any complex JSON documents or interact with a REST API.
The DDL for the Kafka Ingestion Service follows the same external table model as the standard Hive-on-Druid table, with some small differences.
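As a rough sketch (the column list mirrors the queries below, and the Kafka broker and topic values are illustrative placeholders rather than the exact DDL from this tutorial), the table definition looks something like this:
CREATE EXTERNAL TABLE sales_data_kafka (
  `__time` timestamp,
  rep_id string,
  qty int,
  trxn_amt double,
  discount_amt double
)
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
TBLPROPERTIES (
  "kafka.bootstrap.servers" = "kafka-broker:6667",
  "kafka.topic" = "sales_data"
);
Additional ingestion tuning settings can also be supplied as druid.kafka.ingestion.* table properties.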
Although the DDL has been executed, there is still an additional step to deploy the Kafka ingestion service: once initially deployed, it isn't running. An ALTER TABLE command must be executed to "turn on" the service.
-- to start the service
ALTER TABLE sales_data_kafka SET TBLPROPERTIES('druid.kafka.ingestion' = 'START');
-- to stop the service
ALTER TABLE sales_data_kafka SET TBLPROPERTIES('druid.kafka.ingestion' = 'STOP');
Once the ALTER TABLE command to start the Kafka ingestion service has run, navigate to the overlord console to see the newly running supervisor and the running middle manager task.
Next, query the table to see the events being added to the real time indexes.
select count(*) from sales_data_kafka;
Run some simple aggregates and watch the data increase with each query.
select sum(discount_amt),sum(qty),rep_id from sales_data_kafka group by rep_id;
One limitation of Druid is that it doesn't do joins. There is some very limited support for runtime lookups, but joining multiple data sources together is not possible today. This is why the integration with Hive is so powerful: any dataset accessible to Hive can be joined with data persisted in Druid. Hive's intelligent cost-based optimizer will push as much processing as possible down to Druid to maximize performance and accelerate query processing. A small example follows. Below is a lookup table for the sales rep_id found in the streaming dataset that was created in the previous step.
create table sales_reps (rep_id int, rep_name string, quota int);
insert into sales_reps (rep_id, rep_name, quota ) values (0 , "Matt Ryan", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (1 , "Matt Bryant", 150000);
insert into sales_reps (rep_id, rep_name, quota ) values (2 , "Julio Jones", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (3 , "Calvin Ridley", 120000);
insert into sales_reps (rep_id, rep_name, quota ) values (4 , "Mohammed Sanu", 200000);
insert into sales_reps (rep_id, rep_name, quota ) values (5 , "Deion Jones", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (6 , "Tevin Coleman", 300000);
insert into sales_reps (rep_id, rep_name, quota ) values (7 , "Keanu Neal", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (8 , "Alex Mac", 120000);
insert into sales_reps (rep_id, rep_name, quota ) values (9 , "Ryan Schraeder", 200000);
insert into sales_reps (rep_id, rep_name, quota ) values (10 , "Robert Alford", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (11 , "Desmond Trufant", 300000);
insert into sales_reps (rep_id, rep_name, quota ) values (12 , "Ricardo Allen", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (13 , "De'Vondre Cambell", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (14 , "Grady Jarett", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (15 , "Takkarist Mickinley", 100000);
The following query joins the Druid data source with a local Hive table to determine each sales rep's name and their quota attainment.
select
sales.qty_sold,
sales.sale_amt,
sr.rep_name,
(sales.sale_amt/quota) as quota_attainment
from (
select sum(qty) as qty_sold,sum(trxn_amt) as sale_amt,cast(rep_id as int)
as rep_id
from sales_data_kafka
group by rep_id) sales
join sales_reps sr on
sr.rep_id=sales.rep_id;
The query ran in 1.4 seconds, and further runs of the same query brought the runtime below 1 second. LLAP keeps frequently accessed data in a hot cache, which helps accelerate data acquisition.
Conclusion
The integration between Kafka and Druid has greatly simplified real-time streaming architectures. Instead of requiring multiple tools to build a complex real-time architecture, the combination of Druid + Hive + Kafka simplifies development and streamlines integration.