Member since: 01-14-2019
Posts: 144
Kudos Received: 48
Solutions: 17
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 576 | 10-05-2018 01:28 PM
 | 559 | 07-23-2018 12:16 PM
 | 807 | 07-23-2018 12:13 PM
 | 4850 | 06-25-2018 03:01 PM
 | 2209 | 06-20-2018 12:15 PM
01-22-2019
07:40 PM
2 Kudos
The Data Load Process
We looked at the performance of these engines in the last article; now it's time to look at how the data got loaded in. There are trade-offs to be aware of when loading data into each of these engines, as they use different mechanisms to accomplish the task.
Hive Load
There is no immediate need for a schema-on-write data load when you are using Hive with your native file format. Your only "load" operation is copying the files from your local file system to HDFS. With schema-on-read functionality, Hive can access data as soon as its underlying file is loaded into HDFS. In our case, the real data load step was converting this schema-on-read external Hive table data into the optimized ORC format, i.e., loading it from an external table into a Hive-managed table. This was a relatively short process, coming in well under an hour.
HBase Load
Contrast that with HBase, where a bulk data load for our sample data set of 200M rows (around 30GB on disk in CSV format) took 4+ hours using a single-threaded Java application running in the cluster. In this case, HBase went through a process of taking several columns of the CSV data and concatenating them together to build a composite key. This, along with the fact that the inserts were causing hot-spotting within the Region Servers, slowed things down. One way to improve this performance would be to pre-split the regions so your inserts aren't all going to one region to start with. We could also have parallelized the data load to improve performance, writing a MapReduce job to distribute the work.
Druid Load
Let's also contrast that with the Druid load, which took about 2 hours. Druid bulk loads data using a MapReduce job; this is a fairly efficient way of doing things since it distributes the work across the cluster, and it is why we're seeing a lower time relative to HBase. Druid still has to do the work of adding its own indexes on top of the data and optionally pre-aggregating the data to a user-defined level, so it doesn't have a trivial path to getting the data in either. Although we didn't choose to pre-aggregate this data, this is what allows Druid to save a lot of space; instead of storing the raw data, Druid can roll the data up to, say, minute-level granularity if you think your users will not query deeper than that. But remember: once you aggregate the data, you no longer have the raw data.
Space Considerations
Another interesting way to slice this data is by how much space it takes up in each of the three columnar formats.
Engine | Size on Disk with Replication
---|---
Hive - ORC w/ Zlib | 28.4GB
HBase - Snappy compression | 89.5GB
Druid | 31.5GB
Hive and Druid have compressed the data very efficiently considering the initial data size was 90GB with replication, but HBase is sitting right around the raw data size. At this point, we've covered both the relative loading times for the three engines and their storage requirements. These numbers may change as you use different compression formats or load different kinds of data, but this is intended as a general reference for understanding the relative strengths of the three.
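To make the Hive side of this concrete, here is a minimal sketch of that load path. The paths, connection string, and table schema below are illustrative placeholders (the exact DDL used in this test isn't shown here), but the pattern is the same: copy raw CSVs into HDFS, expose them through a schema-on-read external table, then convert into an ORC-backed managed table.
# Copy the raw CSV files into HDFS - this is the only "load" step needed for schema-on-read
hdfs dfs -mkdir -p /data/raw/transactions
hdfs dfs -put transactions_*.csv /data/raw/transactions
# Point an external table at the raw files
beeline -u "jdbc:hive2://<hiveserver2-host>:10000/default" -e "
CREATE EXTERNAL TABLE transactions_ext (trxn_time TIMESTAMP, trxn_amt DOUBLE, rep_id INT, qty INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/data/raw/transactions';"
# Convert it into an ORC-backed, Hive-managed table (Zlib compression, as used above)
beeline -u "jdbc:hive2://<hiveserver2-host>:10000/default" -e "
CREATE TABLE transactions STORED AS ORC TBLPROPERTIES ('orc.compress'='ZLIB')
AS SELECT * FROM transactions_ext;"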
- Find more articles tagged with:
- Data Ingestion & Streaming
- data-ingestion
- data-processing
- druid
- HBase
- Hive
- How-ToTutorial
01-03-2019
06:55 PM
It looks like you're thinking about the problem as a batch process rather than a real-time process. In a real-time, streaming flow, there is no start or end to a unit of work. Things are always on, and the processors we are discussing are designed to create user-defined chunks of work to segment the real-time flow of data in some way. What you're describing, however, is a very specific unit of work, defined by the set of files you are receiving. You have to understand, to some degree, the parameters of the work being done on those files in order to window it properly: What is the upper bound of records you can expect in the largest payload you may get? How long will the largest payload take to process? The answers can give you an idea of how to set those parameters. Additionally, in a real-time workflow you should not have to combine everything back into one file, as that is a serial process and will be a bottleneck. I would suggest taking a look at your downstream processes and considering parallelizing them.
01-03-2019
04:55 PM
5 Kudos
This article series expands on the technical details behind the Big Data Processing Engines blog post: https://hortonworks.com/blog/big-data-processing-engines-which-one-do-i-use-part-1/
Intro to performance analysis
Here, we will be deep diving into the performance side of the three Big Data Processing Engines discussed in the above blog post: Druid, HBase, and Hive. I ran a number of query types to represent the various workloads generally executed on these processing engines, and measured their performance in a consistent manner. I ran a few tests on each of the three engines to showcase their different strengths, and show where some are less effective (but could still fill the gap in a pinch).
We will be executing the following queries:
Simple count of all records in a table, highlighting aggregation capabilities
A select with a where clause, highlighting drill-down and "needle in the haystack" OLTP queries
A join, showcasing ad-hoc analysis across the dataset
Updates, showcasing scenarios in which data is constantly changing and our dataset needs to stay up to date
An aggregation much like an analyst would do, such as summing data on a column
Performance Analysis
A few notes about setup:
Data size: 200 million rows, 30GB on disk (90GB after replication)
Cluster size: 8 nodes, broken down into 2 masters and 6 workers
Node size: 8 core, 16 GB RAM machines in a virtualized environment
Method of querying: Hive on Tez+LLAP was used to query Hive-managed and Druid-managed data. Phoenix was used to query HBase-managed data. A reasonable configuration was used for each of the engines
Cache was taken out of the picture in order to get accurate estimates for initial query execution. Query execution and re-execution times will be much faster with cache in place for each of these engines
Note that this is not an ideal setup for Hive+HBase+Druid; dedicated nodes for each of these services would yield better numbers, but we decided to keep it approachable so you can reproduce these results on your own small cluster.
As I laid out, the three processing engines performed about how you would expect given their relative strengths and weaknesses. Take a look at the table below.
Query | HBase/Phoenix (seconds) | Hive (seconds) | Druid (seconds)
---|---|---|---
Count(*) | 281.44 | 4.72 | 0.71
Select with filter | 1.35 | 8.71 | 0.34
Select with join and filter | 365.41 | 9.16 | N/A
Update with filter | 1.52 | 9.75 | N/A
Aggregation with filter | 353.07 | 8.66 | 1.72
Here is that same information in a graph format, with HBase capped at 15s to keep the scale readable.
As expected, HBase outshone the other two when it came to ACID operations, averaging around 1.5 seconds on the updates; Druid is not capable of them, and Hive took a bit longer. HBase, however, is not great at aggregation queries, as seen in the ~6-minute query times. Druid is extremely efficient at everything it does support, returning nothing above 2 seconds and mostly sub-second results. Lastly, Hive with its latest updates has become a real-time database, servicing every query thrown at it in under 10 seconds.
Queries
Here are all of the queries that were run, multiple times each, to arrive at the results above.
--queries
select count(*) from transactions;
select count(*) from transactions_hbase;
select count(*) from transactions_druid;
select trxn_amt,rep_id from transactions_partitioned where trxn_date="2018-10-09" and trxn_hour=0 and trxn_time="2018-10-09 00:33:59.07";
select * from transactions_hbase_simple where row_key>="2018-09-11 12:03:05.860" and row_key<"2018-09-11 12:03:05.861";
select * from transactions_druid where `__time`='2018-09-11 12:03:05.85 UTC';
select distinct(b.name) from transactions_partitioned a join rep b on a.rep_id=b.id where rep_id in (1,2,3) and trxn_amt > 180;
select distinct b."name" from "transactions_hbase" a join "rep_hbase" b on a."rep_id"=b."ROW_KEY" where b."ROW_KEY" in (1,2,3) and a."trxn_amt">180.0;
update transactions_partitioned set qty=10 where trxn_date="2018-10-09" and trxn_hour=0 and trxn_time>="2018-10-09 00:33:59.07" and trxn_time<"2018-10-09 00:33:59.073";
insert into table transactions_hbase_simple values ('2018-09-11 12:03:05.860~xxx-xxx~xxx xxx~1~2017-02-09', null,null,null,10,null,null,null);
select sum(trxn_amt),rep_id from transactions_partitioned group by rep_id;
select sum("trxn_amt"),"rep_id" from "transactions_hbase" group by "rep_id";
select sum(trxn_amt),rep_id from transactions_druid group by rep_id;
- Find more articles tagged with:
- Data Processing
- data-processing
- druid
- HBase
- Hive
- How-ToTutorial
- performance
01-02-2019
06:25 PM
You may be looking for the alternative algorithm for MergeRecord, the bin-packing algorithm. This will put your FlowFiles into bins (you specify how many bins to keep active) and save them off every 1000 (configurable) records. You can specify which attribute to key off of and how long to keep the bins alive before force-writing the remainder of the records to a file.
Regarding summary stats, I think your best bet would be to look to Reporting Tasks to fill that role. Here's a good article on them: https://pierrevillard.com/tag/reporting-task/
If your needs are simple enough, you may be able to get away with using the NiFi API to query for record counts. You may be looking for something slightly different, but here's something to get your mind thinking: https://community.hortonworks.com/articles/83610/nifi-rest-api-flowfile-count-monitoring.html
And lastly, you can look into NiFi Counters, custom global variables you can increment and decrement: https://pierrevillard.com/2017/02/07/using-counters-in-apache-nifi/
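If you go the REST API route, here's a quick sketch of pulling queued FlowFile counts. The host, port, and process group ID are placeholders, jq is assumed to be installed, and the endpoint paths are from memory, so verify them against your NiFi version's REST API docs.
# Overall controller status, which includes a total queued FlowFile count
curl -s "http://<nifi-host>:8080/nifi-api/flow/status" | jq '.controllerStatus.flowFilesQueued'
# Or scope it to one process group (grab the group's ID from the NiFi UI)
curl -s "http://<nifi-host>:8080/nifi-api/flow/process-groups/<group-id>/status" | jq '.processGroupStatus.aggregateSnapshot.flowFilesQueued'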
01-02-2019
04:55 PM
You most likely have an issue either with root privileges on the destination machine or with networking blocking you from reaching the machine altogether. Can you try pinging the machine from the Ambari server host? Can you also try SSH'ing to it from there and then escalating to root?
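For example, from the Ambari server host (hostname and user are placeholders):
# Basic reachability check
ping -c 3 <target-host>
# Confirm SSH access and root escalation
ssh <your-user>@<target-host>
sudo su -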
11-13-2018
09:24 PM
You should be able to use the HDF management pack (mpack) to install NiFi on top of your existing HDP cluster. See the following documentation for details: https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.0.1.1/bk_installing-hdf-on-hdp/content/ch_install-mpack.html
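As a rough outline of what's involved on the Ambari server host (the mpack tarball path/version below is a placeholder - use the exact one for your HDF version from the linked docs):
# Install the HDF management pack into Ambari, then restart so the new stack definitions are picked up
ambari-server install-mpack --mpack=/tmp/hdf-ambari-mpack-<version>.tar.gz --verbose
ambari-server restart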
11-13-2018
09:20 PM
@satish pujari Specific HDF-related service recommendations can be found at the following link; this article has been extremely useful for me and hopefully will be for you as well: https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.1.1/bk_planning-your-deployment/content/ch_hardware-sizing.html
Here's a good general read for HDP: https://community.hortonworks.com/articles/16763/cheat-sheet-and-tips-for-a-custom-install-of-horto.html
Regarding some of the specific components you've called out:
- NameNode - the HDFS master service needs to be on a node with enough cores (probably 16 in your case, though it can get by with 8). RAM requirements are at least 32GB but preferably 64GB; you can probably stay with 32GB given a 7-node cluster, but the Java heap will grow as your NameNode keeps track of a larger number of files distributed across the data nodes. For disk, separate the OS disk from the data disks and follow the HDP guide above.
- Data node - multiple disks for parallel I/O and enough cores for parallel block processing are the main requirements.
- Hive/YARN/Spark - both Hive and Spark have computationally intensive workloads. Higher core counts (at least 16) and more RAM (at least 64GB, but 128GB recommended) are important here. YARN will be co-located with the data nodes, so you will have lots of disk space on these nodes.
- HBase - enough RAM to maintain temp space for CRUD operations (the memstore) as well as to cache results to serve; RAM needs scale up as the number of regions on the node grows. The recommendations are similar to the Hive workloads: more cores for more parallel processing and enough disk to store the regions. A node should not exceed 200 regions at 10GB/region, so you don't need more than ~3TB spread across multiple disks.
General recommendations: looking at the number of nodes you've got versus the number of services planned, I'd recommend at least a 12-node cluster (if not 16, which is preferable) to create more compute capacity, or reducing the number of workloads to start with. Hope that helps.
11-13-2018
08:00 PM
It all depends on whether there were any architectural changes to the parts of the OS that HDP interacts with. You can try installing it and testing it out to know for sure, and I would imagine it is possible, but I have not tried it myself, nor would I recommend it until it is certified.
11-13-2018
07:56 PM
It's hard to tell why from this exception, but you'll need to run the command hdfs dfs -chown kafka:hdfs /user/kafka and possibly do it recursively (hdfs dfs -chown -R kafka:hdfs /user/kafka) if you have files in that directory created with the incorrect permissions. That should fix the immediate issue for you.
However, I would recommend you use Beeline rather than the Hive CLI - the Hive CLI is deprecated in HDP 2.6 and is fully removed starting with HDP 3, so you won't be able to use it if you upgrade. Beeline performance is faster, and it also respects the security controls that have been put in place by Ranger, whereas the Hive CLI does not. To use Beeline, go to the Hive service in Ambari to get the JDBC connection string to connect with. Here's some documentation on how to use that tool: https://community.hortonworks.com/articles/155890/working-with-beeline.html
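Putting that together (the HiveServer2 host and credentials are placeholders for your environment):
# Fix ownership of the kafka user's HDFS home directory, recursively if needed
hdfs dfs -chown -R kafka:hdfs /user/kafka
# Then connect with Beeline instead of the deprecated Hive CLI
beeline -u "jdbc:hive2://<hiveserver2-host>:10000/default" -n <user> -p <password>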
11-02-2018
10:15 PM
3 Kudos
With the arrival of the Hive3Streaming processors, performance has never been better between NiFi and Hive 3. Below, we'll be taking a look at the PutHive3Streaming processor (separate from the PutHiveStreaming processor) and how it can fit into a basic Change Data Capture workflow.
We will be performing only inserts on the source Hive table and then carrying those inserts over into a destination Hive table. Updates are also supported through this process with the addition of a 'last_updated_datetime' timestamp, but that is out of scope for this article. This is meant to simulate copying data from one HDP cluster to another - we're using the same cluster as both the source and destination here, however.
Here is the completed flow. Take note that most of this flow is just keeping track of the latest ID that we've seen, so that we can pull that back out of HDFS periodically and query Hive for records that were added beyond that latest ID.
The last two processors, SelectHive3QL and PutHive3Streaming, are the ones doing the heavy lifting: first getting the data from the source table (based on the pre-determined latest ID) and then inserting the retrieved data into the destination Hive table.
Here's the configuration for the SelectHive3QL processor. Note the ${curr_id} variable used in the HiveQL Select Query field; that ensures our query will be dynamic.
This is the configuration for the PutHive3Streaming processor. Nothing special here - we've configured the Hive Configuration Resources with the hive-site.xml file and used the Avro format (above) to retrieve data and (below) to write it back out.
Here's the state of the table as we first check the destination table, then insert a record into the source table, and then check the new state of the destination table.
Here is the full NiFi flow: puthive3streaming-cdc-flow.xml
In conclusion, we've shown one of the ways you can use the new PutHive3Streaming processor to perform quick inserts into Hive and, at a broader level, perform Change Data Capture.
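As an approximation of the incremental pull the SelectHive3QL processor performs on each run (the table and column names here are illustrative; in the actual flow the latest ID is read back from HDFS and injected through the ${curr_id} attribute):
# Shell stand-in for the processor's HiveQL Select Query, with CURR_ID playing the role of ${curr_id}
CURR_ID=12345
beeline -u "jdbc:hive2://<hiveserver2-host>:10000/default" -e "SELECT * FROM transactions_source WHERE id > ${CURR_ID};"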
- Find more articles tagged with:
- cdc
- Data Ingestion & Streaming
- hive3
- How-ToTutorial
- NiFi
- puthive3streaming
11-02-2018
09:10 PM
Take a look at HDFS quotas; that may be what you're after: https://hadoop.apache.org/docs/r2.7.2/hadoop-project-dist/hadoop-hdfs/HdfsQuotaAdminGuide.html
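For example (paths and limits are placeholders):
# Cap the number of names (files + directories) under a directory
hdfs dfsadmin -setQuota 100000 /user/project_a
# Cap the raw space (replication included) under a directory
hdfs dfsadmin -setSpaceQuota 500g /user/project_a
# Check current quota usage
hdfs dfs -count -q -h /user/project_a
# Remove the quotas again if needed
hdfs dfsadmin -clrQuota /user/project_a
hdfs dfsadmin -clrSpaceQuota /user/project_a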
11-02-2018
09:06 PM
Unfortunately, each table in HBase requires at least one region backing it. To resolve your issue, you will have to merge the tables before loading into HBase. Hive is another component that would probably fit your use case better: there are no regions in Hive; rather, the data is stored in the file system, so lots of small tables are possible.
11-02-2018
08:36 PM
NiFi doesn't do joins in-stream. To accomplish this, I'd recommend processing the first CSV file and storing it in HBase with key=id. You can then process the second csv file, creating a flow to use the id as a query to HBase and get back the reference data from the first CSV. If you'd like to do windowed joins, take a look at Streaming Analytics Manager inside of the HDF platform.
10-05-2018
01:28 PM
If you are running MapReduce on a single node, it will take more time than a sequential application due to the job creation overhead that MapReduce must undertake. There is extra time taken in the case of MapReduce to submit the job, copy the code and dependencies into a YARN container, and start the job. As you scale out to several nodes and more, you will see the performance benefits of MapReduce. In general however, MapReduce is used less often now on the platform - Hive runs on Tez now rather than MapReduce and I've only seen MapReduce of late being used for things like bulk loading data into HBase/Druid. In-memory processing, the likes of which both Hive/LLAP and Spark provide, can net you a significant performance boost depending on what you're trying to accomplish and the tool best suited for the job.
08-04-2018
01:37 PM
@Banshidhar Sahoo I'm unclear on what you mean by Reports Manager Working Directory. Could you explain what that is? FSImage is a file that HDFS uses to store metadata about the blocks on the cluster. In general you should not change or move this file as HDFS relies on it to perform its functions. See the HDFS Architecture documentation: https://hadoop.apache.org/docs/r3.1.0/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html
07-23-2018
12:25 PM
@JAy PaTel It looks like you don't have properly formatted JSON. Try running your JSON through an online validator like this one before you submit to Druid: https://jsonformatter.curiousconcept.com/ I've identified the issue to be an extra comma in your spec; here's the corrected JSON:
[
  {
    "dataSource":[
      {
        "spec":{
          "dataSchema":{
            "granularitySpec":{
              "queryGranularity":"none",
              "type":"uniform",
              "segmentGranularity":"hour"
            },
            "dataSource":"stockexchange",
            "parser":{
              "type":"string",
              "parseSpec":{
                "format":"csv",
                "timestampSpec":{
                  "format":"auto",
                  "column":"timestamp"
                },
                "columns":[
                  "timestamp",
                  "open",
                  "high",
                  "low",
                  "close",
                  "volume"
                ],
                "dimensionsSpec":{
                  "dimensions":[
                    "open",
                    "high",
                    "low",
                    "close",
                    "volume"
                  ]
                }
              }
            }
          },
          "ioConfig":{
            "type":"realtime"
          },
          "tuningConfig":{
            "type":"realtime",
            "intermediatePersistPeriod":"PT10M",
            "windowPeriod":"PT10M",
            "maxRowsInMemory":75000
          }
        },
        "properties":{
          "task.partitions":"2",
          "task.replicants":"2",
          "topicPattern":"stockexchange.*",
          "topicPattern.priority":"1"
        }
      }
    ],
    "properties":{
      "zookeeper.connect":"ip-xxx-xx-xxxx-xx.ec2.internal:2181",
      "zookeeper.timeout":"PT20S",
      "druid.selectors.indexing.serviceName":"druid/overlord",
      "druid.discovery.curator.path":"/druid/discovery",
      "kafka.zookeeper.connect":"ip-xxx-xx-xxxx-xx.ec2.internal:2181",
      "kafka.group.id":"xxxx-xxxxx-xxxx",
      "consumer.numThreads":"2",
      "commit.periodMillis":"15000",
      "reportDropsAsExceptions":"false"
    }
  }
]
07-23-2018
12:16 PM
1 Kudo
@laiju cbabu You may be using a region where the practice exam image is not available. Please try the US East (Northern Virginia) region; I'm able to find it there.
07-23-2018
12:13 PM
@kanna k You will have to use the Ambari REST API in conjunction with a custom application that you create to measure the total uptime of the system through restarts. You can also use Ambari Alerts + notifications to get push notifications when services are stopped or started. That way, you can have an email with a timestamp of when a service was stopped/started and calculate the total uptime based on those numbers.
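As a sketch of the polling side (host, cluster name, and credentials are placeholders), the service state is what you'd record with a timestamp each time you poll:
# Current state of a service (e.g. HDFS) - STARTED, INSTALLED (stopped), etc.
curl -s -u admin:<password> -H "X-Requested-By: ambari" "http://<ambari-host>:8080/api/v1/clusters/<cluster-name>/services/HDFS?fields=ServiceInfo/state"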
07-20-2018
07:54 PM
If you are trying to stream records from Kafka to HBase, I'd recommend giving Apache NiFi a look. It's part of our stack and is geared towards moving data from one place to another.
07-20-2018
07:38 PM
You could do this in NiFi using the following steps:
- Store the "1" and the "r1" in variables using the UpdateAttribute processor
- Substring the record after the first semicolon and before the last semicolon - ExtractText processor
- Use SplitText to split on every comma. You will now have 1 FlowFile for each param
- Re-structure the data as you need - add the "1" to the beginning and the "r1" to the end. This uses the ExtractText processor again
All of the above requires the use of the NiFi Expression Language, documented here: https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html
07-20-2018
07:27 PM
@Harry Li If you do not already have Zookeeper installed on your cluster then yes you will need to install it. The Ambari UI gives you the default setup for installing Zookeeper, so use that and it will go smoothly. Zookeeper by default installs on 3 separate nodes of your cluster, so you don't need it on all of them but you do need it on at least 1 node (and the default is 3).
07-20-2018
07:17 PM
Yes, Ambari has a REST API, documented here: https://github.com/apache/ambari/blob/trunk/ambari-server/docs/api/v1/index.md
However, I don't think it has exactly what you are looking for - you will not be able to retrieve the IP address of the user performing any actions. It is also a polling mechanism rather than a publish/subscribe mechanism, so you will have to ask it what the state of a particular service is to receive an answer. This is in contrast to your ask, which is to be notified when a service changes state. You can probably accomplish what you are asking for through Ambari Alerts paired with the Ambari REST API.
Here are some examples of common actions: https://community.hortonworks.com/articles/81353/popular-ambari-rest-api-commands.html
Here's another example, using the REST API with Ambari Alerts: https://community.hortonworks.com/questions/9124/ambari-alert-rest-api-to-filter-out-services-on-ma.html
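If you pair alerts with polling, here's a sketch of pulling current alert states over the REST API (host, cluster name, and credentials are placeholders):
# List the cluster's current alerts with their label and state
curl -s -u admin:<password> -H "X-Requested-By: ambari" "http://<ambari-host>:8080/api/v1/clusters/<cluster-name>/alerts?fields=Alert/label,Alert/state"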
07-13-2018
12:39 PM
@Satya Nittala The Atlas API functionality is documented here: https://atlas.apache.org/api/v2/index.html I've used it before and it is straightforward to get started with. You can also refer to the following GitHub repo for many examples: https://github.com/vspw/atlas-custom-types
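As a starting point, here are a couple of read-only calls against the Atlas v2 API (host and credentials are placeholders; the endpoint paths are from memory, so double-check them against the docs linked above):
# List all type definitions currently registered in Atlas
curl -s -u admin:<password> "http://<atlas-host>:21000/api/atlas/v2/types/typedefs"
# Basic search for entities of a given type, e.g. Hive tables
curl -s -u admin:<password> "http://<atlas-host>:21000/api/atlas/v2/search/basic?typeName=hive_table"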
06-29-2018
08:26 PM
That is interesting - in my testing it receives the value of the last modified time from the filesystem you are pulling from. Can you double-check the file you're pulling on your FTP server? Or perhaps try creating a new file and testing? I tested using this public FTP test site: ftp://speedtest.tele2.net/ and my date matches the date shown on the FTP site.
06-29-2018
02:16 AM
The response headers all come back as attributes on the FlowFile that goes to the downstream queue. As an example, I made a sample request to a test website on the internet and then viewed the 'response' queue; when I viewed the attributes on the FlowFile, I could access all of the HTTP headers that came back with the response.
06-28-2018
11:27 PM
I would recommend you deploy the cluster directly onto your 3 machines (VMs). You shouldn't have to do anything related to Docker to set up an HDF cluster using Ambari. The step-by-step guide for how to set it up is here: https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.1.1/bk_installing-hdf/content/ch_install-ambari.html
One of the things you will need is to be able to connect to the internet from your nodes so that you can download Ambari and then set up the rest of the services on the cluster. Once you get Ambari set up, the rest is a click-through, UI-based interface.
For your 3-node cluster, you will need to designate one node as both a 'management' node and a 'worker' node. That node will run Ambari as well as any other master/management services you end up installing as part of HDF (such as Storm Nimbus). Yes, you can install MySQL alongside HDF on several of those nodes if you want a MySQL HA configuration, but be aware that this may impact performance; services like NiFi and Kafka require dedicated disks for maximum throughput.
For a production deployment, bare metal (rather than VMs or Docker) is recommended for highest performance, so yes, you will have to install HDF from scratch (using the instructions linked above).
06-28-2018
04:12 PM
Upon further research, I see what you're running into. The processor does not accept incoming connections, so you cannot give it any dynamic properties. Unfortunately, because of the way that processor keeps track of state and of which files it has already retrieved, it does not support dynamically changing the properties of its source. See this question from 2016 on the same topic: http://apache-nifi.1125220.n5.nabble.com/ListSFTP-processor-with-dynamic-hostname-td12495.html You can try the suggestions outlined there. There is also the option of extending the processor to do what you want.
06-27-2018
11:28 PM
I haven't had time to try this out yet, but you could try exporting the tag-based policy to get the general JSON structure. Then modify if necessary and use the REST API to send the JSON back to Ranger to create policies. I don't see it documented anywhere either.
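A rough sketch of that round trip with the Ranger public REST API - the host, credentials, service name, and policy name are placeholders, and I haven't verified this against a tag-based service specifically, so treat it as a starting point only:
# Export an existing policy as JSON to see the structure
curl -s -u admin:<password> "http://<ranger-host>:6080/service/public/v2/api/service/<service-name>/policy/<policy-name>" -o policy.json
# Edit policy.json as needed, then create a new policy from it
curl -s -u admin:<password> -X POST -H "Content-Type: application/json" -d @policy.json "http://<ranger-host>:6080/service/public/v2/api/policy"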
06-27-2018
11:05 PM
@Rangareddy Y Is there a reason you have decided to use the NiFi API to perform this task? You could read from a configuration the list of SFTP endpoints (along with username, password, etc.), parse that, and store those values as FlowFile attributes that are passed into the ListSFTP processor. You can re-use the ListSFTP processor that way, much like you are envisioning. NiFi has the NiFi expression language which can be used to reference FlowFile attributes inside of processor configuration values.
06-27-2018
10:51 PM
@Jonathan Bell ListSFTP doesn't have any built-in way to do this, but to accomplish your task I would suggest using the file.lastModifiedTime attribute that the processor writes into the FlowFiles it creates. That way, you can add a RouteOnAttribute processor immediately downstream of this one that filters out any dates you are not interested in, based on the last time the file was modified. This also allows you to filter on other attributes if you desire; the full list of written attributes is in the NiFi ListSFTP processor documentation: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.6.0/org.apache.nifi.processors.standard.ListSFTP/index.html