Member since
09-25-2015
33
Posts
49
Kudos Received
2
Solutions
09-11-2018
03:35 PM
Hi Aditya, You need to quote the schema here. It's a reserved word. This works. select * from "SYSTEM"."FUNCTION";
08-20-2018
09:40 PM
This thread is old but wanted to throw in my $.02. You can do this by creating a Hive external table over the Phoenix table using the Phoenix Storage Handler for Hive then export the data from that Hive table right into HDFS. https://phoenix.apache.org/hive_storage_handler.html https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-Writingdataintothefilesystemfromqueries
06-14-2018
06:16 PM
5 Kudos
I was recently working with a customer to deploy a Storm topology in their dev environment. I had built and tested the topology in my environment, but when we went to deploy it at the customer site, nothing happened - literally. We could see the topology was in fact running, but it wasn’t consuming any data from the Kafka topic to which it was subscribed. We verified that data was being written there using the kafka-console-consumer.sh utility, but still the topology wasn’t doing anything and, even more confusing, the Storm log for the topology didn’t show anything beyond the deployment steps.

Going into the Storm UI, we wanted to drill down to get the specifics of the topology and potentially turn on debug logging to see what was up. However, when we clicked on the topology, the Storm UI became unresponsive while trying to load the topology summary. Even more confusing was the fact that the Storm UI didn’t show any error messages.

It turns out that at some point in the past in this development environment, the originally installed Kafka brokers had been removed, then later reinstalled. The new brokers were given new broker IDs by the installer, but the metadata stored in ZK regarding the consumer offsets still had pointers to the old broker IDs. Running ./kafka-topics.sh --describe on the queue in question showed that it was referencing the newer broker IDs, but running the same command on the __consumer_offsets queue showed pointers to the old IDs.

The all-important __consumer_offsets queue is where consumers store the offset of the last record they read. This queue is automatically created by the system. In our case, ZK still stored the metadata pointing to the old broker IDs from when this topic was initially created. Because our clients (both the client running in the topology and the Storm UI client) were getting the wrong info for the broker IDs, they were unable to run, which manifested as the behavior we were seeing. To resolve this, we needed to go into ZK and delete this metadata.
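The broker ID mismatch described above can be checked mechanically. Here is a minimal Python sketch, assuming you have captured the text printed by kafka-topics.sh --describe and know the IDs of the brokers that are currently live (for example from the /brokers/ids znode). The function name, the regular expression and the sample output are illustrative assumptions; the exact describe layout varies between Kafka versions.

```python
import re

# Matches the per-partition lines printed by `kafka-topics.sh --describe`.
# The exact layout differs between Kafka versions, so this pattern is an assumption.
PARTITION_LINE = re.compile(
    r"Partition:\s*(\d+)\s+Leader:\s*(-?\d+)\s+Replicas:\s*([\d,]+)"
)


def stale_partitions(describe_output, live_broker_ids):
    """Return {partition: [broker IDs that are not live]} for affected partitions."""
    live = set(live_broker_ids)
    stale = {}
    for line in describe_output.splitlines():
        match = PARTITION_LINE.search(line)
        if not match:
            continue  # topic header line or unrelated output
        partition = int(match.group(1))
        leader = int(match.group(2))
        replicas = {int(b) for b in match.group(3).split(",") if b}
        # A leader of -1 means "no leader", which is not a broker ID.
        unknown = sorted((replicas | {leader}) - live - {-1})
        if unknown:
            stale[partition] = unknown
    return stale


# Hypothetical sample: broker 1001 was removed, 1003 and 1004 are the reinstalled brokers.
SAMPLE = """\
Topic:__consumer_offsets\tPartitionCount:2\tReplicationFactor:1\tConfigs:
\tTopic: __consumer_offsets\tPartition: 0\tLeader: 1001\tReplicas: 1001\tIsr: 1001
\tTopic: __consumer_offsets\tPartition: 1\tLeader: 1004\tReplicas: 1004\tIsr: 1004
"""

print(stale_partitions(SAMPLE, live_broker_ids=[1003, 1004]))
```

Any partition reported here still points at a broker ID that no longer exists, which is the condition we hit on __consumer_offsets.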
Run the zkCli.sh utility from any ZK client machine in your cluster. Once you’re in, delete the metadata for the __consumer_offsets topic by running the following command:

rmr /brokers/topics/__consumer_offsets

Exit out of ZK, then restart your Kafka brokers. After removing this info and restarting, we re-launched our Storm topology and navigated to it in our Storm UI. The hung state of the UI was resolved and we saw that our Kafka spout was now successfully consuming data. Here are some additional links for more information on offsets and the __consumer_offsets queue. Good luck!
http://wanwenli.com/kafka/2016/11/04/Kafka-Group-Coordinator.html
http://www.waitingforcode.com/apache-kafka/the-role-of-apache-zookeeper-in-apache-kafka/read
10-03-2017
02:26 PM
1 Kudo
Here's an example of what a Timestamp schema entry would look like (see https://avro.apache.org/docs/1.8.0/spec.html#Timestamp+%28millisecond+precision%29):
{
  "type": "record",
  "namespace": "test",
  "name": "test1",
  "fields": [
    { "name": "ID", "type": "string" },
    { "name": "V", "type": "float" },
    { "name": "T", "type": { "type": "long", "logicalType": "timestamp-millis" } }
  ]
}
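For reference, a timestamp-millis value is just a long counting milliseconds since the Unix epoch (UTC). A minimal Python sketch of producing a value for the T field, using only the standard library, might look like this. The helper names and the sample record values are hypothetical, and no Avro library is involved here.

```python
import json
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_timestamp_millis(dt):
    """Encode a timezone-aware datetime as milliseconds since the Unix epoch (UTC)."""
    if dt.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    delta = dt - EPOCH
    # Integer arithmetic avoids float rounding at the millisecond boundary.
    return delta.days * 86_400_000 + delta.seconds * 1000 + delta.microseconds // 1000


def from_timestamp_millis(millis):
    """Decode a timestamp-millis long back into a UTC datetime."""
    return EPOCH + timedelta(milliseconds=millis)


# Hypothetical record shaped like the test1 schema above.
record = {
    "ID": "sensor-1",
    "V": 21.5,
    "T": to_timestamp_millis(datetime(2017, 10, 3, 14, 26, tzinfo=timezone.utc)),
}
print(json.dumps(record))
```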
04-28-2017
06:19 PM
15 Kudos
Interest in time series databases and historian augmentation/replacement is
currently at an all-time high with the explosion of IoT use cases. So many
devices sending out unbounded streams of data provide data scientists with
opportunities for predictive failure analysis, but there needs to be a robust,
scalable architecture to land that data and at the same time provide a
responsive environment that allows a high number of concurrent users access to
it. This article discusses how the union of features that exist in Apache
Phoenix and Hive can be used to construct a robust time series oriented
architecture that enables a scalable streaming ingest layer while simultaneously
providing a low latency data exploration layer for data scientists and BI
reports.

The examples referenced in this article are a subset of a larger CDR processing
project I’ve been working with for a while now. This article will focus on the
downstream data collection dealing specifically with Phoenix and Hive. Data is
streamed into a cdr_fact table that contains references to various dimensions
such as geography, customer and vendor information and call termination
details. The ETL and data enrichment is handled by the upstream Storm topology,
which is not addressed in this article.

The architecture described here uses Phoenix/HBase as the high performance data
ingestion layer, and I’ll utilize some features here to boost read/write
performance for temporally oriented data. Hive with LLAP will act as the low
latency data serving layer that hosts aggregate rollups across dimensions. LLAP
facilitates caching of data and increases concurrency, and Hive offers superior
SQL semantics and a more mature UDF ecosystem. I use the Phoenix Storage Handler
for Hive to bridge the communication between these two services.

I started with a Phoenix table that looks like this:

CREATE TABLE IF NOT EXISTS CDR_FACT(
ID bigint not null,
CALL_TS timestamp not null,
GEO_ID integer,
CUST_ID smallint,
VEND_ID smallint,
CUST_REL_ID smallint,
VEND_REL_ID smallint,
RTE tinyint,
CNCT tinyint,
EARLY_EVENT tinyint,
DURATION double,
I_PDD bigint,
E_PDD bigint,
constraint pk primary key(ID, CALL_TS ROW_TIMESTAMP))
IMMUTABLE_ROWS=true,SALT_BUCKETS=6,COMPRESSION='Snappy';

The ROW_TIMESTAMP feature is an enhancement that takes advantage of various
optimizations that HBase provides for time ranges on the store files, as well as
various query optimization capabilities built within Phoenix. I chose to use
SALT_BUCKETS here to evenly distribute data across the region servers of my
cluster. This can improve write performance at the cost of read performance down
the line, but since I’m pushing ad-hoc browsing of data into the Hive layer, I
found this to be a fair trade-off.

In Hive, I’ll define an external table that overlays my Phoenix table using the
Phoenix Storage Handler for Hive. This allows Hive to read data directly from
Phoenix tables and utilizes some optimizations built into the Phoenix layer
while doing so. The storage handler .jar isn’t configured into Hive’s classpath
by default, so you will need to follow the instructions on the Phoenix website
to set this up.

create external table if not exists CDR_FACT (
id_col bigint,
call_ts timestamp,
geo_id int,
cust_id smallint,
vend_id smallint,
cust_rel_id smallint,
vend_rel_id smallint,
rte tinyint,
cnct tinyint,
early_event tinyint,
duration double,
i_pdd bigint,
e_pdd bigint)
STORED BY 'org.apache.phoenix.hive.PhoenixStorageHandler'
TBLPROPERTIES (
"phoenix.table.name" = "CDR_FACT",
"phoenix.zookeeper.quorum" = "zk1.field.hortonworks.com",
"phoenix.zookeeper.znode.parent" = "/hbase-unsecure",
"phoenix.zookeeper.client.port" = "2181",
"phoenix.rowkeys" = "ID, CALL_TS",
"phoenix.column.mapping" = "id_col:ID, call_ts:CALL_TS, geo_id:GEO_ID, cust_id:CUST_ID, vend_id:VEND_ID, cust_rel_id:CUST_REL_ID, vend_rel_id:VEND_REL_ID, rte:RTE, cnct:CNCT, early_event:EARLY_EVENT, duration:DURATION, i_pdd:I_PDD, e_pdd:E_PDD");

Note the case-sensitive column name mapping between Hive and Phoenix columns. If
you don’t get this right, you won’t see an exception when creating the table,
but you will get a ColumnNotFoundException when you go to query.

You can actually create a Phoenix table managed by Hive, but in my case the
Phoenix table was already in place and populated before I started working on
this. I also like the flexibility that comes with being able to drop the Hive
overlay without affecting the underlying Phoenix table. Doing it this way may
also offer additional Phoenix configuration options not exposed through the Hive
interface.

The next step here is to periodically fetch data from my Phoenix table and roll
up summaries across the various dimensions, then store those off in Hive managed
ORC tables to be served through LLAP. I created a Hive UDF called timemachine to
aid in creating these rollups. It also gives me a mechanism to poll Phoenix for
late records, so my rollups can pick up records which may have arrived late to
my Phoenix data store.

hive> create temporary function timemachine as
'com.github.rtempleton.HiveUDFs.TimeMachine' using jar
'hdfs:///udf/HiveUDFs-0.0.1-SNAPSHOT.jar';

Here I group records into 5 minute “buckets” by geography, customer id and
vendor id and store the results into a temporary table. This queries the
trailing 60 minutes of data to accommodate any records which may have arrived
late.

hive> create temporary table cdr_summary_temp as select
timemachine(call_ts, 5, 0) as call_ts,
geo_id,
cust_id,
vend_id,
count(*) as cnt,
sum(cnct) as sum_cnct,
sum(early_event) as sum_ee,
sum(rte) as sum_rte,
max(rte) as max_rte,
sum(duration) as sum_dur,
sum(i_pdd) as sum_ipdd,
sum(e_pdd) as sum_epdd
from cdr_fact where call_ts >= unix_timestamp(timemachine(current_timestamp, 5, -12))
group by timemachine(call_ts, 5, 0), geo_id, cust_id, vend_id;
Table default.cdr_summary_temp stats: [numFiles=1, numRows=68707, totalSize=661741, rawDataSize=4031686]
OK
Time taken: 33.769 seconds

Note the unix_timestamp() function needed to wrap our timemachine UDF call. This
is due to the way Phoenix serializes timestamps, so we need to account for that
when applying the predicate.

At this point my summary data is stored in a temp table which may or may not
contain different values from previously calculated rollups due to late arriving
records. I’ll use the new SQL MERGE functionality in Hive to move these records
into a permanent Hive managed table that is backed by ORC. Using ORC as the
storage format will allow this table to benefit from the caching features
offered by LLAP.

I know the users will tend to view this data along the geography dimension, but
due to the high cardinality of the geography id, I will bucket on this value
rather than partition my table by it.

hive> CREATE TABLE cdr_summary(
call_ts timestamp,
geo_id int,
cust_id smallint,
vend_id smallint,
cnt bigint,
sum_cnct bigint,
sum_ee bigint,
sum_rte bigint,
max_rte tinyint,
sum_dur double,
sum_ipdd bigint,
sum_epdd bigint)
CLUSTERED BY (geo_id) INTO 32 BUCKETS
stored as ORC
TBLPROPERTIES ("orc.compress"="ZLIB", "transactional"="true");

Obviously this create statement is only a one-time step, but now with this table
in place I can go ahead and execute my merge statement.

hive> merge into cdr_summary as t using cdr_summary_temp as s on s.call_ts = t.call_ts and s.geo_id = t.geo_id and s.cust_id = t.cust_id and s.vend_id = t.vend_id
when matched then update set
cnt = s.cnt,
sum_rte = s.sum_rte,
max_rte = s.max_rte,
sum_cnct = s.sum_cnct,
sum_ee = s.sum_ee,
sum_dur = s.sum_dur,
sum_ipdd = s.sum_ipdd,
sum_epdd = s.sum_epdd
when not matched then insert
values (s.call_ts, s.geo_id, s.cust_id, s.vend_id, s.cnt, s.sum_cnct, s.sum_ee, s.sum_rte, s.max_rte, s.sum_dur, s.sum_ipdd, s.sum_epdd);
Table default.cdr_summary stats: [numFiles=256, numRows=0, totalSize=3066701, rawDataSize=0]
OK
Time taken: 26.313 seconds

You can see the combined execution time of the rollup plus the merge statement
is right around a minute, so it’s conceivable to set this up on a schedule to
poll every few minutes in order to keep the summary table up to date. I can
encapsulate the UDF declaration, rollup and merge queries into a SQL script and
schedule this with Oozie or a local cron job. I used a temporary table in the
roll-up query specifically to avoid having to clean up after the fact.

Now that I have some data to work with, an interesting metric to track in our
summarized data set is Answer Seizure Ratio (ASR), which is a measure of
completed calls to total routing attempts. This can be used to determine how
efficient a telco routing network is and help determine when downstream vendors
aren’t completing calls. This info will allow admins to make adjustments to
routing tables in trouble areas. We can look at this over time and across area
codes. I will put the data in one hour buckets.

hive> select
timemachine(s.call_ts, 60, 0),
g.state,
g.npa,
sum(cnt) as totalRouteAttempts,
sum(s.sum_cnct)/sum(cnt) as ASR,
sum(s.sum_cnct)/(sum(cnt)-sum(s.sum_ee)) as AdjustedASR
from cdr_summary s inner join geography_dim g on s.geo_id = g.geo_id
where s.call_ts >= timemachine(current_timestamp, 60, -24)
group by timemachine(s.call_ts, 60, 0), g.state, g.npa;
2017-04-27 18:00:00 AB 403 34 0.20588235294117646 0.22580645161290322
2017-04-27 18:00:00 AB 780 117 0.358974358974359 0.4158415841584158
2017-04-27 18:00:00 AK 907 198 0.3383838383838384 0.6836734693877551
2017-04-27 18:00:00 AL 205 158 0.5189873417721519 0.6356589147286822
2017-04-27 18:00:00 AL 251 115 0.5391304347826087 0.6739130434782609
2017-04-27 18:00:00 AL 256 106 0.44339622641509435 0.5662650602409639
2017-04-27 18:00:00 AL 334 130 0.4307692307692308 0.5137614678899083
2017-04-27 18:00:00 AR 479 50 0.5 0.6097560975609756
2017-04-27 18:00:00 AR 501 83 0.4457831325301205 0.6166666666666667

This executes against the abbreviated dataset rather than the full fidelity fact
table, so the result is almost immediate. A BI report riding on top of this will
quickly be able to locate problem areas and allow us to drill in to identify the
troublesome vendor.

hive> select
s.vend_id,
sum(s.cnt) as totalRouteAttempts,
sum(s.sum_cnct) as totalConnects,
sum(s.sum_cnct)/sum(cnt) as ASR
from cdr_summary s inner join geography_dim g on s.geo_id = g.geo_id
where timemachine(s.call_ts, 60, 0) = '2017-04-27 18:00:00' and
g.npa = '907'
group by vend_id
order by totalRouteAttempts desc;
9154 89 15 0.16853932584269662
8573 41 29 0.7073170731707317
9172 27 0 0.0
8805 17 12 0.7058823529411765
9171 14 10 0.7142857142857143
8737 1 1 1.0
9149 1 0 0.0

I’ve left out the vendor details on purpose, but these could easily be added via
a second join. I’ll point out here too that the dimension tables referenced in
these queries are externally defined Hive tables over Phoenix, so it’s possible
to join between Hive managed tables and those residing in Phoenix.

In the example above we see that vendor 9154 was given 89 calls to route but
only succeeded in connecting 15 of those, and vendor 9172 connected 0 of 27
route attempts. With this information, the user can modify the call routing
table to temporarily yank the vendors or push them further down the list. This
should bring the ASR for the given area code back within an acceptable range.
The customer could also begin to analyze a vendor’s historic performance for
optimal route placement without ever having to drop down into the high fidelity
fact table. However, when such a query is necessary, the customer would have
well defined predicates to apply to efficiently execute against that data in
Phoenix.

There are a number of enhancements that exist in Phoenix (secondary indexes),
HBase (date tiered compaction, read/write buffer allocation) and Hive (table
partitioning) not addressed in this article, but I think this architecture
serves as a great jumping off point for building a time series oriented data
collection and processing solution. This design allows you to leverage the
strengths of both systems: HBase for fast and scalable ingest
and Hive with LLAP for high concurrency and low latency ad-hoc data access.
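For readers who want to reason about the rollups outside of Hive, here is a rough Python sketch of the two calculations they rely on: flooring a timestamp into fixed-width buckets with an optional bucket offset, and the ASR ratio. The time_bucket function only mirrors how timemachine is called in the queries in this article; the UDF's actual behaviour is assumed rather than taken from its source, and the sketch assumes the bucket width divides a day evenly.

```python
from datetime import datetime, timedelta


def time_bucket(ts, minutes, offset=0):
    """Floor ts to the start of its bucket, then shift by `offset` buckets.

    Assumed to behave like timemachine(ts, minutes, offset) as used in the
    queries; buckets are aligned to midnight of the same day.
    """
    width = timedelta(minutes=minutes)
    midnight = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    whole_buckets = (ts - midnight) // width  # integer number of full buckets
    return midnight + (whole_buckets + offset) * width


def asr(connects, attempts, early_events=0):
    """Answer Seizure Ratio; pass early_events to get the adjusted variant."""
    denominator = attempts - early_events
    return connects / denominator if denominator > 0 else None


call_ts = datetime(2017, 4, 27, 18, 7, 42)
print(time_bucket(call_ts, 5))        # start of the 5 minute bucket
print(time_bucket(call_ts, 5, -12))   # 12 buckets (60 minutes) earlier
print(round(asr(15, 89), 4))          # vendor 9154 from the example output
```

With a 5 minute width and an offset of -12, the lower bound lands 60 minutes before the current bucket, which is the trailing window the rollup query uses to pick up late records.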
03-24-2017
01:53 PM
10 Kudos
This past year Hadoop celebrated its 10th birthday. At Hadoop Summit we talked
about how far Hadoop has come from its origins as a Google whitepaper to its
initial implementation at Yahoo to where it is today, powering more than half of
Fortune 500 companies and being one of, if not the most, disruptive technologies
of this century so far. In that time, we’ve seen platform improvements such as
the elimination of the single point of failure in the Name Node, YARN opening up
the platform for mixed workloads, and the proliferation of new distributed
computing frameworks beyond MapReduce, while advances in hardware have increased
the speed and computing density of worker nodes.

For those of you not in the know, when you write to HDFS, Hadoop’s distributed
file system, your documents and data files are broken into blocks. For each of
those blocks, 2 additional copies are created and all the blocks are distributed
across the data nodes in the cluster. There is a scheme to how the blocks are
scattered but it’s not important to this topic. This 3x data replication is
designed to serve two purposes: 1) provide data redundancy in the event that
there’s a hard drive or node failure, and 2) provide availability for jobs to be
placed on the same node where a block of data resides. The downside of this
replication strategy is that we have to provision extra storage to compensate.
Because we need to maintain a balance of overall compute to storage, we can’t
simply triple the raw storage per node but usually have to triple the number of
nodes in the cluster overall, which drives up the cost of the cluster both in
terms of hardware and support.

So with the maturation of the platform and hardware advances over the last
decade, is HDFS 3x replication still necessary? Can we safely reduce replication
and still realize redundancy and availability?

Redundancy

Hadoop engineers early on eschewed expensive RAID solutions
for redundancy, opting instead to build replication directly into the file
system through block replication. Hard drives fail, and in clusters with lots of
disks, failures are bound to happen weekly or maybe even daily. For argument’s
sake we’ll skip the scenario where an entire node is unresponsive, which is most
likely due to a network issue, and just assume we’re talking about HDD failures.
When a drive becomes unresponsive, the Name Node assumes the blocks on that disk
are lost and kicks off processes to locate the backup copies throughout the
cluster and copy them to ensure the replication factor is maintained. Only one
backup copy is needed to execute this action, not two. With 2x replication, for
a block to be “permanently lost”, the disk that contained the only backup copy
would have to fail before that copy was re-replicated. The extra backup ensures
that a simultaneous failure of the drive where the first backup lives can still
be recovered from, but what’s the likelihood of such an event, and does it
justify the costs imposed? HDD resiliency has no doubt improved in the last 10
years, reducing the frequency of drive failures, and enterprise clusters
nowadays are typically networked with at least gigabit ethernet, so the
replication process is very fast, thus shrinking the window in which a
simultaneous disk failure would have to occur. Additionally, organizations more
and more are leveraging cloud services and backup features to snapshot data and
copy it off site in case of a data-center-wide outage, so “lost blocks”
aren’t necessarily truly lost.

Availability/Locality

When MapReduce or other frameworks execute against a block
of data, it’s preferable to do so directly on the node where the data lives.
When you have more copies of the data spread across the cluster, you increase
your chances of finding a node where that data lives AND has available
resources for your application to run. Alternatively, by increasing the number
of jobs that can execute concurrently on a node, you effectively increase
availability to all the blocks hosted on that node. With the growth of cores
per socket and cheap memory, typical enterprise nodes of today are at least 6x
larger than early Hadoop worker nodes. This increase in node-job density more
than offsets the effect of removing one additional block copy.

Benefits

What kind of benefits could you expect by reducing the
replication factor of HDFS?

Cost: By far this is the most recognizable savings. It’s reported that IoT and
other data sources are doubling the amount of data generated every year. With 3x
replication, each doubling of your data means provisioning raw storage equal to
6x the original data volume, compression notwithstanding. And again, you can’t
simply add more storage to a node to compensate, as this will skew the ratio of
compute to storage. To scale storage in a balanced manner, users end up having
to add worker nodes, which translates directly to hardware and support costs.

Performance: Every block write that occurs in the cluster effectively creates 2x
additional network traffic as a result of block replication. This write traffic
isn’t limited to the typical writes by clients; it affects all writes, including
application log files and intermediate job processes as well. You eliminate 33%
of write-related network traffic by decreasing replication from 3 to 2. This
also decreases memory pressure on the Name Node service, which is responsible
for assigning the location of and keeping track of each and every block in the
cluster.

Unfortunately, modifying the replication factor is an
all-or-nothing change. You can use the hdfs dfs -setrep command to adjust the
replication factor of an existing file or of files within a directory, but the
setting isn’t sticky. Any new files that get created going forward will fall
back to the current dfs.replication setting. JIRA HDFS-199 proposes a
directory-level replication factor, but the status of this feature is currently
unresolved. Undoubtedly this would be ideal for /tmp or other staging/working
directories, which could be optimized while still retaining the higher settings
for critical HBase and managed Hive directories. If you would like to see this
added, I encourage you to log into the JIRA site and up-vote this feature.

You can start by making this change in non-critical dev or test environments as
a way to evaluate hardware resiliency and measure the proposed benefits.
Additionally, consider dropping replication to 2 or even 1 in virtualized
environments where node “local” disks are actually backed by network attached
storage that may employ data redundancy features implicitly. In such cases the
availability argument goes out the window, as all disk reads are non-local
anyway.

In summary, I question whether HDFS 3x replication is still relevant today or
just a hold-over from days gone by. Advances in network speed and hardware
resiliency, along with backup strategies, allow us to safely reduce our reliance
on redundant blocks. Users can realize increased performance in their clusters,
and this further improves the ROI offered by Hadoop. In the next article, we’ll
look at some of the self-healing features of HDFS and new developments that will
help shrink storage demands in upcoming versions.
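To put rough numbers on the cost and write-traffic arguments in this article, here is a back-of-the-envelope Python sketch. The function names and the 100 TB starting volume are purely illustrative assumptions, and the model ignores compression, temporary files and filesystem overhead.

```python
def raw_storage_tb(logical_tb, replication):
    """Raw disk needed to hold logical_tb of data at a given replication factor."""
    return logical_tb * replication


def write_volume_savings(old_rf, new_rf):
    """Fraction of total block-write volume removed by lowering replication."""
    return (old_rf - new_rf) / old_rf


logical_tb = 100  # hypothetical amount of data before replication

print(raw_storage_tb(logical_tb, 3))       # raw storage at 3x replication
print(raw_storage_tb(logical_tb, 2))       # raw storage at 2x replication
print(raw_storage_tb(logical_tb * 2, 3))   # data doubles, still at 3x
print(round(write_volume_savings(3, 2), 2))
```

Under these assumptions, doubling the data at 3x replication needs six times the original data volume in raw storage, while dropping from 3 to 2 copies removes about a third of the bytes written per block.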
02-27-2017
08:43 PM
@Manish Anand it could be that in trying to re-run the ambari-server install you inadvertently corrupted the PostgreSQL database that was part of the Sandbox. It makes more sense to blow this away and reimport the Sandbox image rather than try to troubleshoot the problem.
01-16-2017
06:59 PM
2 Kudos
In addition to Josh's recommendations, the configuration details in this KB article are also relevant to setting up Spark-to-HBase connectivity in a secure environment.
11-14-2016
09:49 PM
FlowFile content is written to the content repository. When a node
goes down, the NiFi Cluster Manager (NCM) will route the data to another node.
However, NiFi does not replicate data like Kafka does. The data queued on
the failed node will remain queued for that node. That data must either
be manually sent over to a live node in the cluster, or you can just bring
the failed node back up. Any new data will automatically be routed to the other
nodes in the cluster by the NCM. http://alvincjin.blogspot.com/2016/08/fault-tolerant-in-apache-nifi-07-cluster.html