Member since
10-08-2015
87
Posts
143
Kudos Received
23
Solutions
03-30-2018
04:19 PM
@Ajay - Thank you for this article! Can you please re-label this to Apache Hadoop HDFS Ozone, rather than Apache Ozone? The latter is not the proper use of Apache branding. Thanks. Tom
... View more
09-14-2016
12:52 AM
Thank you, Ameet! The google browser key was not required by gcm when we first published this. I have heard from others it is now necessary configuration.
... View more
08-01-2016
07:37 PM
10 Kudos
Storage is Fundamental to Big Data Storages
can be chiefly evaluated on three classes of performance metrics:
Cost per Gigabyte Durability
- this is the measure of the permanence of data once it has been successfully
written to the medium. Modern hard disks are highly durable, however given a
large enough collection of disks, regular disk failures are a statistical
certainty. Annual Failure Rates of disks vary between 2 – 8% for 1 – 2 year old
disks as observed in a large-scale study of disk failure rates.[1] Performance
- there are two key measures of storage performance:
Throughput
- this is the maximum raw read/write rate that the storage can support and is
typically measured in MegaBits/second (MBps). This is the primary metric that
batch processing applications care about. IO operations per second (IOPS) - the number of IO operations per second is
affected by the workload and IO size. The rotational latency of spinning disks
limits the maximum IOPS for a random IO workload which can limit the
performance of interactive query applications. e.g. a 7200 RPM hard disk
(typical for commodity hardware) will be limited to a theoretical maximum of
240 IOPS for a purely random IO workload. The
following table summarizes the characteristics of a few common commodity storage types
based on the above metrics. Note: There are SSDs on the horizon that are better suited for Write-Once-Ready-Many (WORM) data (but you can write limited # of times) and the price point will become 15c/GB (so, about 5x vs. 10x HDD today). Also, SSD capacity already crossed HDD capacity in 2016 (16TB 2.5” drives available) and over time, you will see SSDs making inroads to hot and warm tiers as well. HDFS Provides a Proven, Rock-Solid File System We
approached the design of HDFS with the following goals:
HDFS will not know about the performance
characteristics of individual storage types. HDFS just provides a mechanism to
expose storage types to applications. The only exception we make is DISK i.e.
hard disk drives. This is the default fallback storage type. Even this may be
made configurable in the future. As a corollary we avoid using the terms Tiered
Storage or Hierarchical Storage. Pursuant to (1), we do not plan to
provide any APIs to enumerate or choose storage types based on their
characteristics. Storage types will be explicitly enumerated. Administrators
must be able to limit the usage of individual storage types by user.
Changes to HDFS Storage Architecture The
NameNode and HDFS clients have historically viewed each DataNode as a single
storage unit. The NameNode has not been aware of the number of storage volumes
on a given DataNode and their individual storage types and capacities. DataNodes
communicate their storage state through the following types of messages:
Storage Report. A storage report contains summary information about the state of a
storage including capacity and usage details. The Storage Report is contained
within a Heartbeat which is sent once every few seconds by default. Block Report. A block report is a detailed report of the individual block replicas
on a given DataNode. Block reports are split into two types: a. Incremental
block report sent periodically that lists the newly received and deleted blocks
i.e. delta since the last incremental report; and b. Full block report sent
less frequently that has a complete list of all block replicas currently on the
DataNode. Previously,
each DataNode sends a single storage report and a single block report
containing aggregate information about all attached storages. With
Heterogeneous Storage we have changed this picture so that the DataNode exposes
the types and usage statistics for each individual storage to the NameNode.
This is a fundamental change to the internals of HDFS and allows the NameNode
to choose not just a target DataNode when placing replicas, but also the
specific storage type on each target DataNode. Separating
the DataNode storages in this manner will also allow scaling the DataNode to
larger capacity by reducing the size of individual block reports which can be
processed faster by the NameNode.
Storage Types We changed
the datanode storage model from a single storage, which may correspond to
multiple physical storage medias, to a collection of storages with each storage
corresponding to a physical storage media. This change added the notion of
storage types: DISK and SSD, where DISK is the default storage type. An
additional storage type ARCHIVE, which has high storage density (petabyte of
storage) but little compute power, is added for supporting archival storage. Another
new storage type RAM_DISK is added for supporting writing single replica files
in memory.
Storage Policies A new
concept of storage policies is introduced in order to allow files to be stored
in different storage types according to the storage policy. HDFS now has the
following storage policies:
Hot
- for both storage and compute. The data that is popular and still being used
for processing will stay in this policy. When a block is hot, all replicas are
stored in DISK. Cold
- only for storage with limited compute. The data that is no longer being used,
or data that needs to be archived is moved from hot storage to cold storage.
When a block is cold, all replicas are stored in ARCHIVE. Warm
- partially hot and partially cold. When a block is warm, some of its replicas
are stored in DISK and the remaining replicas are stored in ARCHIVE. All_SSD
- for storing all replicas in SSD. One_SSD
- for storing one of the replicas in SSD. The remaining replicas are stored in
DISK. Lazy_Persist
- for writing blocks with single replica in memory. The replica is first
written in RAM_DISK and then it is lazily persisted in DISK. The
following is a typical storage policy table.
Policy ID
Policy Name
Block Placement (n replicas)
Fallback storages for creation
Fallback storages for replication
15
Lazy_Persist
RAM_DISK: 1, DISK: n-1
DISK
DISK
12
All_SSD
SSD: n
DISK
DISK
10
One_SSD
SSD: 1, DISK: n-1
SSD, DISK
SSD, DISK
7
Hot (default)
DISK: n
<none>
ARCHIVE
5
Warm
DISK: 1, ARCHIVE: n-1
ARCHIVE, DISK
ARCHIVE, DISK
2
Cold
ARCHIVE: n
<none>
<none>
When a
file or directory is created, its storage policy is unspecified. The effective
storage policy of a file or directory is resolved by the following rules:
If the file or directory is specified
with a storage policy, return it. For an unspecified file or directory, if
it is the root directory, return the default storage policy. Otherwise, return
its parent’s effective storage policy.
Erasure Coding Replication
is expensive – the default 3x replication scheme in HDFS has 200% overhead in
storage space and other resources (e.g., network bandwidth). However, for certain
warm and most cold datasets with relatively low I/O activities, additional
block replicas are rarely accessed during normal operations, but still consume
the same amount of resources as the first replica. Therefore,
a natural improvement is to use Erasure Coding (EC) in place of replication,
which provides the same level of fault-tolerance with much less storage space.
In typical EC setups, the storage overhead is no more than 50%. As an example,
a 3x replicated file with 6 blocks will consume 6*3 = 18 blocks of disk space.
But with EC (6 data, 3 parity) deployment, it will only consume 9 blocks of
disk space. To apply erasure coding, an EC Zone is created on an empty
directory. All files written under that zone are automatically erasure coded. In
typical HDFS clusters, small files can account for over 3/4 of total storage
consumption. To better support small files, in this first phase of work HDFS
supports EC with striping. In the context of EC, striping has several critical
advantages. First, it enables online EC (writing data immediately in EC
format), avoiding a conversion phase and immediately saving storage space.
Online EC also enhances sequential I/O performance by leveraging multiple disk
spindles in parallel; this is especially desirable in clusters with high end
networking. Second, it naturally distributes a small file to multiple DataNodes
and eliminates the need to bundle multiple files into a single coding group.
This greatly simplifies file operations such as deletion, quota reporting, and
migration between namespaces. Erasure
coding places additional demands on the cluster in terms of CPU and network. Encoding
and decoding work consumes additional CPU on both HDFS clients and DataNodes. Erasure
coded files are also spread across racks for rack fault-tolerance. This means
that when reading and writing striped files, most operations are off-rack.
Network bisection bandwidth is thus very important. For rack
fault-tolerance, it is also important to have at least as many racks as the
configured EC stripe width. For the default EC policy of RS (6,3), this means
minimally 9 racks, and ideally 10 or 11 to handle planned and unplanned
outages. For clusters with fewer racks than the stripe width, HDFS cannot
maintain rack fault-tolerance, but will still attempt to spread a striped file
across multiple nodes to preserve node-level fault-tolerance. EC is currently planned for HDP 3.0. Work remains to support Hive queries on EC data. Also, we are discussing a policy based migration policy where we can age data from warm tier to cold tier and convert from replica to erasure coding.
Cluster Planning – Hardware Recommendations for
Apache Hadoop Disk
space, I/O Bandwidth, and computational power are the most important parameters
for accurate hardware sizing of Apache Hadoop. Hadoop has been architected so
that Disk, Memory, and Computational Power can all be scaled horizontally in a
near-linear fashion as business requirements evolve. With the introduction of
Heterogeneous Storage Types now presented by the Datanode, enterprises can
begin planning their clusters in such a way that ARCHIVE, DISK, SSD, and RAM
can all be scaled uniformly as new nodes are added to the cluster.
Additionally, with the introduction of Erasure Coding for HDFS, enterprises can
begin planning their capacity needs for ARCHIVE using 1.5x replication, instead
of 3x. Our
recommended commodity server profiles, especially for the DataNode, have changed a bit
with these new advancements in HDFS. Instead of 12 2TB HDD per DataNode for DISK only, we
now recommend introducing and scaling the additional storage types (ARCHIVE, SSD) uniformly as
well. This
configuration provides roughly double the raw storage per server from the
previous configuration and triple the usable storage. Raw storage increases
from 24 TB to 48TB per node. Usable storage increases from 8 TB (24/3), to 26
TB per node. This is calculated as 1 TB hot (1/1 – using one_ssd storage
policy) + 4 TB warm (12/3 – using default storage policy) + 21 TB archive
(32/1.5 – using archive storage policy with erasure coding). [1] Eduardo Pinheiro et. al. 2007. Failure Trends in a Large Disk Drive Population.
... View more
Labels:
06-08-2016
01:47 PM
28 Kudos
While
traditional fraud detection systems have focused on looking for factors such as
bad IP addresses or unusual login times based on business rules and events, the
Connected Data Platform renovates such an approach by enabling machine learning
capabilities at scale. The Credit Fraud Prevention Demo is an excellent example
of a Modern Data Application running on the Hortonworks Connected Platform
(HDF/HDP).
Part I: Preparing the Demo Follow
the instructions in the README to install the Credit Card Transaction Monitor
Application on an
HDP 2.4 Sandbox. Note: There is a known bug with
the HAXM emulator used by the Android Studio for the Credit Card Transaction Monitor Mobile App that does not allow it to be launched while a Virtual Box virtual
machine is running. To avoid this, I am using a HDP 2.4 Sandbox instance that I
created in the Azure Cloud directly from the Microsoft Azure Marketplace. The install shell script handles the installation and
configuration of all the application artifacts necessary for the demo onto the
latest version of the Hortonworks Sandbox, including:
Setting YARN container memory size
using the Ambari ReST API Creating the NiFi service
configuration, installing and starting it using the Ambari ReST API Importing the NiFi template, instantiating and starting the
NiFi Flow using the NiFi ReST API Starting the Kafka Ambari service
using the Ambari ReST API and configuring the IncomingTransactions and
CustomerTransactionValidation topics using the kafka-topics shell script Starting the HBase service using
the Ambari ReST API Installing Docker, creating the
working folder with the Slider configuration for the
Transaction Monitor UI, starting the Docker service, and downloading the Docker images Starting the Storm service using
the Ambari ReST API, and building / deploying the Storm topology The startDemoServices shell script should be run each time the
Sandbox VM is (re)started, after all of the default Sandbox services come up
successfully. It handles the initialization of all of the application-specific
components of the demo, including:
Starting Kafka using the Ambari
ReST API Starting NiFi using the Ambari
ReST API Starting HBase using the Ambari
ReST API Starting Storm using the Ambari ReST
API Starting the Docker daemon using
Linux system Starting the UI Servlet and CometD
Server on YARN using Slider To
validate that the Slider Application for the Transaction Monitor UI started,
you can take a look at the YARN Resource Manager UI by selecting the YARN
service under Ambari and using the Quick Link for Resource Manager UI: Part II: Demo Code Walk-Through There are
three types of transactions managed by the NiFi data flow:
Incoming Transaction Fraud Notification Customer Transaction Validation Incoming
Transactions are generated by the TransactionSimulator class and posted as ReST calls over
HTTP to port 8082 of the Sandbox. The Transaction Monitor UI servlet generates Fraud Notifications and
posts them to the same Sandbox port. The NiFi
flow creates an HTTP Listener on the correct Sandbox port when it is deployed: The
Credit Card Transaction Monitor mobile application posts Customer Transaction
Validation messages to Amazon SQS. The NiFi Flow is configured to receive these
messages. After
being received, these transactions are all sent through a simple dataflow that
extracts the event source from the payload and determines the event destination
accordingly. Once the
event destination is determined, these transactions are routed based on the
value of that attribute.
Incoming Transaction Incoming
Transactions are routed to a Kafka topic. Incoming
Transactions posted to that Kafka Topic are then consumed by the CreditCardTransactionMonitor
Storm topology. The Storm
topology enriches the incoming transaction
with customer details from HBase, passes the enriched transaction
into a Spark model to detect fraud, and then either publishes a fraud alert or publishes a legitimate transaction to the TransactionMonitorUI web
application using CometD. While
reviewing the code for each of the bolts in the Storm topology above, in
addition to reading the execute method, be sure take a close look at the
prepare method as well. For
example, in the FraudDetector bolt: while the execute method runs the
model, updates the transaction with the result, and stores the updated
transaction to Phoenix; the prepare method creates the Phoenix
table to hold the transaction history. The Credit
Fraud Detection model uses Spark MLLib Logistic Regression against four primary features of
the credit card transaction to detect fraud:
transactionID latitude longitude transaction time stamp To view
the Zeppelin notebook showing how the model is built and trained, using a
browser go to sandbox port 9995 and select the Credit Fraud Detection ML link
under Notebooks.
Fraud Notification Fraud
Notifications are posted over HTTP to Google Cloud Messaging where the Credit
Card Transaction Mobile Application MessagingClient can pick them up. Customer Transaction Validation Customer
Transaction Validation events are routed to a Kafka topic. Customer
Transaction Validation event posted to that Kafka Topic are then consumed by
the CreditCardTransactionMonitor Storm topology. The Storm
topology processes the Customer Transaction
Validation event,
updating the HBase table for the customer account, as well as publishes
the Account Status Update to the TransactionMonitorUI web application using
CometD.
Part III: Running the Demo Before
running the demo, be sure to install and compile the Credit Card Transaction Mobile App by following the instructions in
the README. Once you’ve done that, follow these steps:
Run the Mobile App from Android
Studio.
Create / select any emulator that
supports the latest Google API (23).
Be sure to wait a few minutes
until the gradle build finishes and the application shows in the emulator.
Bring up the Fraud Analyst Inbox
UI in your browser: http://sandbox:8090/TransactionMonitorUI/CustomerOverview Start the simulator from the
CreditCardTransactionMonitor directory: java -jar CreditCardTransactionSimulator-0.0.1-SNAPSHOT-jar-with-dependencies.jar
Customer 1000 Simulation
When transactions start coming
through from the generator, the Inbox will start to fill up.
Single click on one of the
transactions to see a preview of the transaction statistics and reason for
being flagged. Double Click on the transaction to explore it in detail in the
context of the customer's profile.
After previewing the transaction,
select the command button to Notify Customer / Suspend Account. This will send
a Fraud Notification to the mobile application.
Inside the mobile application,
click the Yes command button to send the Customer Transaction Validation back
to the Fraud Analyst and clear the account suspension. The
Credit Fraud Prevention Demo shows the full power of developing a Modern Data
Application for the Connected Data Platform:
Ingest both data in motion
(customer transaction data, credit card swipes, online usage of credit cards
etc.) & data at rest (core banking data, years worth of historical card
data) using HDF. Perform Predictive Analytics. By
commingling all kinds of data using machine learning techniques to analyze and
predict cardholder behavior on HDP. Provide immediate fraud related
feedback to the Bank Fraud Analyst and the Customer. The
platform identifies and signals fraud in near real time. The result: an
improved customer experience, revenue loss prevention due to fraud and reduced
cost overall.
... View more
Labels: