Member since
10-08-2015
87
Posts
143
Kudos Received
23
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
1031 | 03-02-2017 03:39 PM | |
4698 | 02-09-2017 06:43 PM | |
14255 | 02-04-2017 05:38 PM | |
4525 | 01-10-2017 10:24 PM | |
3565 | 01-05-2017 06:58 PM |
08-23-2016
03:35 PM
2 Kudos
@learninghuman To maximize the capabilities of "Predicate Push Down" and the "Row Index", you should apply some type of secondary sorting to the data while inserting it. Where the "primary" filter is usually the "partition", sorting your data within the "partition" will increase the effectiveness of the row index structure, ensuring the clustering of records allows you to skip large sections of data. Apache Hive is a very fast moving project, and Hive 2 only recently came out. I expect that updated performance benchmarks will come out over the course of the next year. With that said, here is a relevant independent benchmark performed on Hive 0.14 by Yahoo! Japan in September of last year. Hive has only gotten more performant in the year since this benchmark, but the biggest reason I share this one with you is that focus of Yahoo! Japan (and many other enterprise customers of ours) is not flat-out speed on individual queries, but mainly performance across multiple types of queries (ETL, reporting, adhoc) at high-levels of concurrency - which is what you would have if using Hive as an Operational Data Store as you are thinking.
... View more
08-19-2016
12:30 PM
3 Kudos
@learninghuman For supporting adhoc reporting queries, we recommend storing the raw data in Apache ORC and using Apache Hive to achieve the desired performance. ORC is a self-describing type-aware columnar file format designed for Hadoop ecosystem workloads. The columnar format lets the reader read, decompress, and process only the columns that are required for the current query. In addition, it has support for ACID transactions and snapshot isolation, build-in indexes and complex types. Many large Hadoop deployments rely on ORC, including those at Yahoo! and Facebook. In addition to Partitioning and Bucketing with Hive, where each partition is physically a separate subdirectory under the table directory and each Bucket is physically a separate file within those subdirectories, ORC provides three level of indexes within each file: file level - statistics about the values in each column across the entire file stripe level - statistics about the values in each column for each stripe row level - statistics about the values in each column for each set of 10,000 rows within a stripe Bucket pruning is not yet available with Hive so there are two ways to reduce the amount of data that needs to be processed: Partitioning ( and partition pruning during joins). The limitation here is that too many partitions are hard on the hive server and the metastore so you shouldn't have more than a couple thousand partitions. Predicate Pushdown in ORC. Meant to enhance partitioning, this allows the map task to open the ORC index and skip all stripes and blocks that cannot contain relevant data based on min/max values (always on ) and bloom filters ( if configured).
... View more
08-17-2016
05:09 PM
7 Kudos
Great line of questioning @learninghuman! Yes - we see a lot of customers attacking the ODS first with Hadoop in respect to EDW offload. There are a few motivations with this: All data is not tabular. A lot of areas of the business, such as R&D, aren't able to conform their data to canonical tabular representations in today's ODS. The ODS needs to manage a variety of source data and represent it in different ways, not just through tabular views - but also through index / search, modeling, and graph views. Hadoop is able to handle the variety of source data that is out there, and present it to the analyst through a number of different views - by applying schema on read, not on write. A very common use case that we see in Insurance is 360 degree view of customer - bringing together structured data from source systems with unstructured data (such as social media) from the outside world to mine for life events that may drive changes in coverage. Another common use case is an actuarial data lake - bringing structured and unstructured data together for modeling / machine learning purposes to better price insurance products at an individualized level. Current architectures that move the data in batches from source system to ODS to EDW are inefficient and brittle. Time to insight is a real motivation for most businesses we see considering Hadoop. As a business owner, I just can't wait as long as I have to wait today to get the insights I need. Hadoop allows me to ingest the data into my ODS more efficiently - in its raw form, as fast as it's produced. Secondly, ETL-based architectures today are brittle in that they end up dropping a lot of data before it gets into the ODS. This "loss of signal" can be really difficult to overcome when the business starts asking new questions. Hadoop brings agility to the ODS, as I can go back and replay my processing over that raw source data, transforming, enriching, aggregating new answers for different questions. A great example of this is the increased use of geo-location fields embedded in raw source system data to gain new insights downstream with adhoc queries. Data governance is often an after-thought in today's ODS architecture ... a "write-only" database of metadata. With Hadoop, I can profile and tag all the data on ingest - allowing me to address issues of data quality much more easily than today. I can then use that profiling and tagging downstream to effect certain processing (such as consistent handling of nulls, address normalization, encryption of PII fields, etc.) as well as uniformly restricting access downstream to that data by associating tags to groups and permissions. Finally, Hadoop architectures for the ODS push the processing to the data, rather than the data to the processing. This makes data lineage a lot easier to track within the context of your operational reporting, and eliminates a lot of reconciliation issues caused by today's disjoint approaches to data governance. With less data movement, and by holding and processing more data in one place, I can put real "teeth" into my data governance approach. Perhaps the biggest con or anti-pattern here is "lift and shift" of current relational ODS architecture and processes to Hadoop. Yes - you can save money with Hadoop, but that doesn't excite the business nearly as much as becoming a more agile partner with them and helping them create new revenue opportunities along the way.
... View more
08-01-2016
07:37 PM
10 Kudos
Storage is Fundamental to Big Data Storages
can be chiefly evaluated on three classes of performance metrics:
Cost per Gigabyte Durability
- this is the measure of the permanence of data once it has been successfully
written to the medium. Modern hard disks are highly durable, however given a
large enough collection of disks, regular disk failures are a statistical
certainty. Annual Failure Rates of disks vary between 2 – 8% for 1 – 2 year old
disks as observed in a large-scale study of disk failure rates.[1] Performance
- there are two key measures of storage performance:
Throughput
- this is the maximum raw read/write rate that the storage can support and is
typically measured in MegaBits/second (MBps). This is the primary metric that
batch processing applications care about. IO operations per second (IOPS) - the number of IO operations per second is
affected by the workload and IO size. The rotational latency of spinning disks
limits the maximum IOPS for a random IO workload which can limit the
performance of interactive query applications. e.g. a 7200 RPM hard disk
(typical for commodity hardware) will be limited to a theoretical maximum of
240 IOPS for a purely random IO workload. The
following table summarizes the characteristics of a few common commodity storage types
based on the above metrics. Note: There are SSDs on the horizon that are better suited for Write-Once-Ready-Many (WORM) data (but you can write limited # of times) and the price point will become 15c/GB (so, about 5x vs. 10x HDD today). Also, SSD capacity already crossed HDD capacity in 2016 (16TB 2.5” drives available) and over time, you will see SSDs making inroads to hot and warm tiers as well. HDFS Provides a Proven, Rock-Solid File System We
approached the design of HDFS with the following goals:
HDFS will not know about the performance
characteristics of individual storage types. HDFS just provides a mechanism to
expose storage types to applications. The only exception we make is DISK i.e.
hard disk drives. This is the default fallback storage type. Even this may be
made configurable in the future. As a corollary we avoid using the terms Tiered
Storage or Hierarchical Storage. Pursuant to (1), we do not plan to
provide any APIs to enumerate or choose storage types based on their
characteristics. Storage types will be explicitly enumerated. Administrators
must be able to limit the usage of individual storage types by user.
Changes to HDFS Storage Architecture The
NameNode and HDFS clients have historically viewed each DataNode as a single
storage unit. The NameNode has not been aware of the number of storage volumes
on a given DataNode and their individual storage types and capacities. DataNodes
communicate their storage state through the following types of messages:
Storage Report. A storage report contains summary information about the state of a
storage including capacity and usage details. The Storage Report is contained
within a Heartbeat which is sent once every few seconds by default. Block Report. A block report is a detailed report of the individual block replicas
on a given DataNode. Block reports are split into two types: a. Incremental
block report sent periodically that lists the newly received and deleted blocks
i.e. delta since the last incremental report; and b. Full block report sent
less frequently that has a complete list of all block replicas currently on the
DataNode. Previously,
each DataNode sends a single storage report and a single block report
containing aggregate information about all attached storages. With
Heterogeneous Storage we have changed this picture so that the DataNode exposes
the types and usage statistics for each individual storage to the NameNode.
This is a fundamental change to the internals of HDFS and allows the NameNode
to choose not just a target DataNode when placing replicas, but also the
specific storage type on each target DataNode. Separating
the DataNode storages in this manner will also allow scaling the DataNode to
larger capacity by reducing the size of individual block reports which can be
processed faster by the NameNode.
Storage Types We changed
the datanode storage model from a single storage, which may correspond to
multiple physical storage medias, to a collection of storages with each storage
corresponding to a physical storage media. This change added the notion of
storage types: DISK and SSD, where DISK is the default storage type. An
additional storage type ARCHIVE, which has high storage density (petabyte of
storage) but little compute power, is added for supporting archival storage. Another
new storage type RAM_DISK is added for supporting writing single replica files
in memory.
Storage Policies A new
concept of storage policies is introduced in order to allow files to be stored
in different storage types according to the storage policy. HDFS now has the
following storage policies:
Hot
- for both storage and compute. The data that is popular and still being used
for processing will stay in this policy. When a block is hot, all replicas are
stored in DISK. Cold
- only for storage with limited compute. The data that is no longer being used,
or data that needs to be archived is moved from hot storage to cold storage.
When a block is cold, all replicas are stored in ARCHIVE. Warm
- partially hot and partially cold. When a block is warm, some of its replicas
are stored in DISK and the remaining replicas are stored in ARCHIVE. All_SSD
- for storing all replicas in SSD. One_SSD
- for storing one of the replicas in SSD. The remaining replicas are stored in
DISK. Lazy_Persist
- for writing blocks with single replica in memory. The replica is first
written in RAM_DISK and then it is lazily persisted in DISK. The
following is a typical storage policy table.
Policy ID
Policy Name
Block Placement (n replicas)
Fallback storages for creation
Fallback storages for replication
15
Lazy_Persist
RAM_DISK: 1, DISK: n-1
DISK
DISK
12
All_SSD
SSD: n
DISK
DISK
10
One_SSD
SSD: 1, DISK: n-1
SSD, DISK
SSD, DISK
7
Hot (default)
DISK: n
<none>
ARCHIVE
5
Warm
DISK: 1, ARCHIVE: n-1
ARCHIVE, DISK
ARCHIVE, DISK
2
Cold
ARCHIVE: n
<none>
<none>
When a
file or directory is created, its storage policy is unspecified. The effective
storage policy of a file or directory is resolved by the following rules:
If the file or directory is specified
with a storage policy, return it. For an unspecified file or directory, if
it is the root directory, return the default storage policy. Otherwise, return
its parent’s effective storage policy.
Erasure Coding Replication
is expensive – the default 3x replication scheme in HDFS has 200% overhead in
storage space and other resources (e.g., network bandwidth). However, for certain
warm and most cold datasets with relatively low I/O activities, additional
block replicas are rarely accessed during normal operations, but still consume
the same amount of resources as the first replica. Therefore,
a natural improvement is to use Erasure Coding (EC) in place of replication,
which provides the same level of fault-tolerance with much less storage space.
In typical EC setups, the storage overhead is no more than 50%. As an example,
a 3x replicated file with 6 blocks will consume 6*3 = 18 blocks of disk space.
But with EC (6 data, 3 parity) deployment, it will only consume 9 blocks of
disk space. To apply erasure coding, an EC Zone is created on an empty
directory. All files written under that zone are automatically erasure coded. In
typical HDFS clusters, small files can account for over 3/4 of total storage
consumption. To better support small files, in this first phase of work HDFS
supports EC with striping. In the context of EC, striping has several critical
advantages. First, it enables online EC (writing data immediately in EC
format), avoiding a conversion phase and immediately saving storage space.
Online EC also enhances sequential I/O performance by leveraging multiple disk
spindles in parallel; this is especially desirable in clusters with high end
networking. Second, it naturally distributes a small file to multiple DataNodes
and eliminates the need to bundle multiple files into a single coding group.
This greatly simplifies file operations such as deletion, quota reporting, and
migration between namespaces. Erasure
coding places additional demands on the cluster in terms of CPU and network. Encoding
and decoding work consumes additional CPU on both HDFS clients and DataNodes. Erasure
coded files are also spread across racks for rack fault-tolerance. This means
that when reading and writing striped files, most operations are off-rack.
Network bisection bandwidth is thus very important. For rack
fault-tolerance, it is also important to have at least as many racks as the
configured EC stripe width. For the default EC policy of RS (6,3), this means
minimally 9 racks, and ideally 10 or 11 to handle planned and unplanned
outages. For clusters with fewer racks than the stripe width, HDFS cannot
maintain rack fault-tolerance, but will still attempt to spread a striped file
across multiple nodes to preserve node-level fault-tolerance. EC is currently planned for HDP 3.0. Work remains to support Hive queries on EC data. Also, we are discussing a policy based migration policy where we can age data from warm tier to cold tier and convert from replica to erasure coding.
Cluster Planning – Hardware Recommendations for
Apache Hadoop Disk
space, I/O Bandwidth, and computational power are the most important parameters
for accurate hardware sizing of Apache Hadoop. Hadoop has been architected so
that Disk, Memory, and Computational Power can all be scaled horizontally in a
near-linear fashion as business requirements evolve. With the introduction of
Heterogeneous Storage Types now presented by the Datanode, enterprises can
begin planning their clusters in such a way that ARCHIVE, DISK, SSD, and RAM
can all be scaled uniformly as new nodes are added to the cluster.
Additionally, with the introduction of Erasure Coding for HDFS, enterprises can
begin planning their capacity needs for ARCHIVE using 1.5x replication, instead
of 3x. Our
recommended commodity server profiles, especially for the DataNode, have changed a bit
with these new advancements in HDFS. Instead of 12 2TB HDD per DataNode for DISK only, we
now recommend introducing and scaling the additional storage types (ARCHIVE, SSD) uniformly as
well. This
configuration provides roughly double the raw storage per server from the
previous configuration and triple the usable storage. Raw storage increases
from 24 TB to 48TB per node. Usable storage increases from 8 TB (24/3), to 26
TB per node. This is calculated as 1 TB hot (1/1 – using one_ssd storage
policy) + 4 TB warm (12/3 – using default storage policy) + 21 TB archive
(32/1.5 – using archive storage policy with erasure coding). [1] Eduardo Pinheiro et. al. 2007. Failure Trends in a Large Disk Drive Population.
... View more
Labels:
06-18-2016
06:11 PM
I can't wait to see this demo!
... View more
06-08-2016
01:47 PM
28 Kudos
While
traditional fraud detection systems have focused on looking for factors such as
bad IP addresses or unusual login times based on business rules and events, the
Connected Data Platform renovates such an approach by enabling machine learning
capabilities at scale. The Credit Fraud Prevention Demo is an excellent example
of a Modern Data Application running on the Hortonworks Connected Platform
(HDF/HDP).
Part I: Preparing the Demo Follow
the instructions in the README to install the Credit Card Transaction Monitor
Application on an
HDP 2.4 Sandbox. Note: There is a known bug with
the HAXM emulator used by the Android Studio for the Credit Card Transaction Monitor Mobile App that does not allow it to be launched while a Virtual Box virtual
machine is running. To avoid this, I am using a HDP 2.4 Sandbox instance that I
created in the Azure Cloud directly from the Microsoft Azure Marketplace. The install shell script handles the installation and
configuration of all the application artifacts necessary for the demo onto the
latest version of the Hortonworks Sandbox, including:
Setting YARN container memory size
using the Ambari ReST API Creating the NiFi service
configuration, installing and starting it using the Ambari ReST API Importing the NiFi template, instantiating and starting the
NiFi Flow using the NiFi ReST API Starting the Kafka Ambari service
using the Ambari ReST API and configuring the IncomingTransactions and
CustomerTransactionValidation topics using the kafka-topics shell script Starting the HBase service using
the Ambari ReST API Installing Docker, creating the
working folder with the Slider configuration for the
Transaction Monitor UI, starting the Docker service, and downloading the Docker images Starting the Storm service using
the Ambari ReST API, and building / deploying the Storm topology The startDemoServices shell script should be run each time the
Sandbox VM is (re)started, after all of the default Sandbox services come up
successfully. It handles the initialization of all of the application-specific
components of the demo, including:
Starting Kafka using the Ambari
ReST API Starting NiFi using the Ambari
ReST API Starting HBase using the Ambari
ReST API Starting Storm using the Ambari ReST
API Starting the Docker daemon using
Linux system Starting the UI Servlet and CometD
Server on YARN using Slider To
validate that the Slider Application for the Transaction Monitor UI started,
you can take a look at the YARN Resource Manager UI by selecting the YARN
service under Ambari and using the Quick Link for Resource Manager UI: Part II: Demo Code Walk-Through There are
three types of transactions managed by the NiFi data flow:
Incoming Transaction Fraud Notification Customer Transaction Validation Incoming
Transactions are generated by the TransactionSimulator class and posted as ReST calls over
HTTP to port 8082 of the Sandbox. The Transaction Monitor UI servlet generates Fraud Notifications and
posts them to the same Sandbox port. The NiFi
flow creates an HTTP Listener on the correct Sandbox port when it is deployed: The
Credit Card Transaction Monitor mobile application posts Customer Transaction
Validation messages to Amazon SQS. The NiFi Flow is configured to receive these
messages. After
being received, these transactions are all sent through a simple dataflow that
extracts the event source from the payload and determines the event destination
accordingly. Once the
event destination is determined, these transactions are routed based on the
value of that attribute.
Incoming Transaction Incoming
Transactions are routed to a Kafka topic. Incoming
Transactions posted to that Kafka Topic are then consumed by the CreditCardTransactionMonitor
Storm topology. The Storm
topology enriches the incoming transaction
with customer details from HBase, passes the enriched transaction
into a Spark model to detect fraud, and then either publishes a fraud alert or publishes a legitimate transaction to the TransactionMonitorUI web
application using CometD. While
reviewing the code for each of the bolts in the Storm topology above, in
addition to reading the execute method, be sure take a close look at the
prepare method as well. For
example, in the FraudDetector bolt: while the execute method runs the
model, updates the transaction with the result, and stores the updated
transaction to Phoenix; the prepare method creates the Phoenix
table to hold the transaction history. The Credit
Fraud Detection model uses Spark MLLib Logistic Regression against four primary features of
the credit card transaction to detect fraud:
transactionID latitude longitude transaction time stamp To view
the Zeppelin notebook showing how the model is built and trained, using a
browser go to sandbox port 9995 and select the Credit Fraud Detection ML link
under Notebooks.
Fraud Notification Fraud
Notifications are posted over HTTP to Google Cloud Messaging where the Credit
Card Transaction Mobile Application MessagingClient can pick them up. Customer Transaction Validation Customer
Transaction Validation events are routed to a Kafka topic. Customer
Transaction Validation event posted to that Kafka Topic are then consumed by
the CreditCardTransactionMonitor Storm topology. The Storm
topology processes the Customer Transaction
Validation event,
updating the HBase table for the customer account, as well as publishes
the Account Status Update to the TransactionMonitorUI web application using
CometD.
Part III: Running the Demo Before
running the demo, be sure to install and compile the Credit Card Transaction Mobile App by following the instructions in
the README. Once you’ve done that, follow these steps:
Run the Mobile App from Android
Studio.
Create / select any emulator that
supports the latest Google API (23).
Be sure to wait a few minutes
until the gradle build finishes and the application shows in the emulator.
Bring up the Fraud Analyst Inbox
UI in your browser: http://sandbox:8090/TransactionMonitorUI/CustomerOverview Start the simulator from the
CreditCardTransactionMonitor directory: java -jar CreditCardTransactionSimulator-0.0.1-SNAPSHOT-jar-with-dependencies.jar
Customer 1000 Simulation
When transactions start coming
through from the generator, the Inbox will start to fill up.
Single click on one of the
transactions to see a preview of the transaction statistics and reason for
being flagged. Double Click on the transaction to explore it in detail in the
context of the customer's profile.
After previewing the transaction,
select the command button to Notify Customer / Suspend Account. This will send
a Fraud Notification to the mobile application.
Inside the mobile application,
click the Yes command button to send the Customer Transaction Validation back
to the Fraud Analyst and clear the account suspension. The
Credit Fraud Prevention Demo shows the full power of developing a Modern Data
Application for the Connected Data Platform:
Ingest both data in motion
(customer transaction data, credit card swipes, online usage of credit cards
etc.) & data at rest (core banking data, years worth of historical card
data) using HDF. Perform Predictive Analytics. By
commingling all kinds of data using machine learning techniques to analyze and
predict cardholder behavior on HDP. Provide immediate fraud related
feedback to the Bank Fraud Analyst and the Customer. The
platform identifies and signals fraud in near real time. The result: an
improved customer experience, revenue loss prevention due to fraud and reduced
cost overall.
... View more
Labels:
05-26-2016
01:26 PM
1 Kudo
Sorry, this was the article I meant to point you to: https://community.hortonworks.com/articles/25578/how-to-access-data-files-stored-in-aws-s3-buckets.html
... View more
05-12-2016
11:34 AM
@Shikha Verma - did this answer help? If so, please accept. Otherwise, let me know if you need further assistance.
... View more
05-11-2016
02:22 PM
1 Kudo
By not specifying the hostname, you are attempting to connect to "localhost". When you connect to "localhost", the socket connector is used, which doesn't look like it was configured properly. To use the TCP/IP connector, you should connect to mysql using the hostname: mysql --host your.hostname.goes.here --user root -p
... View more
04-27-2016
03:03 PM
2 Kudos
In Hive, each partition is physically a separate subdirectory under the table directory. Buckets would then be physically represented as separate files within those subdirectories. Using your example above where you have 4 countries and 32 buckets, this would result in 4 subdirectories under the table directory, each containing 32 files.
... View more