12-31-2017
11:57 PM
Hive is very powerful, but sometimes you need to add some procedural code for a
special circumstance, such as complex parsing of a field. Hive provides the
ability to easily create User Defined Table Functions (UDTFs). These allow you
to transform your Hive results, pass them through the UDTF and return data as a
set of rows that can then be used like any other Hive result set. These can be
written in Java or Python, and we will use Python for this article. However,
the techniques here are applicable to both with some syntax changes.

There are a lot of great articles on building these, such as https://community.hortonworks.com/articles/72414/how-to-create-a-custom-udf-for-hive-using-python.html. These
pretty much work as advertised, but don’t get into how to debug or troubleshoot
your code. When you get into any significant logic (and sometimes not
so significant!), you are likely to create a few bugs. This can be an issue
with parsing or, in the case of Python, even a syntax error in your code! So
we are going to look at two techniques that can be used to debug these UDTFs.

The problem

When Hive encounters an error in the UDTF it simply blows up with a pretty confusing
error. The underlying error is hidden, and you are left scratching your head.
The error will probably look something like:

Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: [Error 20003]: An error occurred when trying to close the Operator running your custom script.

Or from a Yarn perspective:

Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143

NOT VERY HELPFUL!!!

Our test scenario

We need to parse json text stored in a column of a Hive table into columns. Hive
serdes can handle most json formats, but there are still a few outlier
situations where you need a more flexible json parser.

Quick UDTF Review

First, two important things to remember about Hive UDTFs:

- Hive sends data in to your UDTF through stdin as strings of column values separated by tabs
- Your UDTF sends rows back to Hive through stdout as column values separated by tabs

So, this means that if you are getting an error in your UDTF, you can’t just print debug statements to stdout. Hive expects only tab-separated rows there, so debug text would not be displayed as a message; it would be read as row data and cause a format error.

The Table

CREATE EXTERNAL TABLE `default.events`(`json_text` string)
STORED AS TEXTFILE
LOCATION '/tmp/events';

The Data

{ "deviceId": "13a46b21-9528-4eb1-93bd-303a3b3e6b6a", "events": [ { "Process_Started": { "timestamp": "2017-06-01T18:26:24.444Z" } }, { "Process_Stopped": { "timestamp": "2017-06-01T18:26:24.444Z", "errorReason": "-1", "errorMsg": "The operation couldn’t be completed." } } ] }
{ "deviceId": "9cd57d50-4d0e-457e-9fd3-05b9e56644e6", "events": [ { "Process_Started": { "timestamp": "2017-06-02T00:20:20.400Z" } }, { "Process_Completed": { "timestamp": "2017-06-02T02:20:29.020" } } ] }

The Query

We will save this in select_json.hql:

DELETE FILE /home/<your id>/parse_events.py;
ADD FILE /home/<your id>/parse_events.py;
SELECT TRANSFORM (json_text)
USING 'python parse_events.py'
AS deviceId, eventType, eventTime, errorReason, errorMsg
FROM default.events;

The UDTF

#!/usr/bin/python
##################################################################################################
# Hive UDTF to parse json data (Python 2)
##################################################################################################
import sys
import json

reload(sys)
sys.setdefaultencoding('utf8')


def parse_json(json_string):
    j = json.loads(json_string)
    deviceId = j["deviceId"]
    events = j["events"]

    # Force a stupid error!
    x = 1
    y = 0
    z = x / y

    # Flatten Events Array
    for evt in events:
        try:
            eventType = evt.keys()[0]
            e = evt[eventType]
            # deviceId goes first so the row matches the five columns in the AS clause
            edata = [deviceId]
            edata.append(eventType)
            edata.append(e.get("timestamp", u''))
            edata.append(e.get("errorReason", u''))
            edata.append(e.get("errorMsg", u''))
            # Send a tab-separated string back to Hive
            print u'\t'.join(edata)
        except Exception as ex:
            sys.stderr.write('AN ERROR OCCURRED IN PYTHON UDTF\n %s\n' % ex.message)


def main(argv):
    # Parse each line sent from Hive (note we are only receiving 1 column, so no split needed)
    for line in sys.stdin:
        parse_json(line)


if __name__ == "__main__":
    main(sys.argv[1:])

Let's Run It!

Here's a hint: Python should throw an error, "ZeroDivisionError: integer division or modulo by zero". Assuming you have saved the query in select_json.hql, this would go something like this:

hive -f select_json.hql
...
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
...
Task with the most failures(4):
-----
Task ID:
task_1514310228021_3433_m_000000
-----
Diagnostic Messages for this Task:
Error: java.lang.RuntimeException: Hive Runtime Error while closing operators
at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.close(ExecMapper.java:210)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: [Error 20003]: An error occurred when trying to close the Operator running your custom script.
at org.apache.hadoop.hive.ql.exec.ScriptOperator.close(ScriptOperator.java:560)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:631)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:631)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:631)
at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.close(ExecMapper.java:192)
... 8 more

Nothing about divide by zero anywhere. Ugh!

TECHNIQUE 1 - Forget Hive!

You are writing a Hive UDTF, but you are also just writing a program that reads from stdin and writes to stdout. So, it is a great idea to develop your logic completely outside of Hive, and once you have tested it adequately you can plug it in and continue development. The easiest way to do this, which also allows you to test later with no changes, is to pull out the data that Hive would send your UDTF and feed it to stdin. Given our table, it could be done like this:

hive -e "INSERT OVERWRITE LOCAL DIRECTORY '/tmp/events'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
SELECT json_text FROM default.events;"
-bash-4.1$ ls -l /tmp/events
total 4
-rw-r--r-- 1 screamingweasel screamingweasel 486 Dec 31 23:02 000000_0
cat /tmp/events/*
{ "deviceId": "13a46b21-9528-4eb1-93bd-303a3b3e6b6a", "events": [ { "Process_Started": { "timestamp": "2017-06-01T18:26:24.444Z" } }, { "Process_Stopped": { "timestamp": "2017-06-01T18:26:24.444Z", "errorReason": "-1", "errorMsg": "The operation couldn’t be completed." } } ] }
{ "deviceId": "9cd57d50-4d0e-457e-9fd3-05b9e56644e6", "events": [ { "Process_Started": { "timestamp": "2017-06-02T00:20:20.400Z" } }, { "Process_Completed": { "timestamp": "2017-06-02T02:20:29.020" } } ] }
# DO THE ACTUAL TEST (note there may be >1 file in the directory)
cat /tmp/events/* | python parse_events.py
Traceback (most recent call last):
File "parse_events.py", line 42, in <module>
main(sys.argv[1:])
File "parse_events.py", line 39, in main
parse_json(line)
File "parse_events.py", line 18, in parse_json
z=x/y
ZeroDivisionError: integer division or modulo by zero

Simple as that! Export the columns you will be passing to the UDTF to a tab-separated file and pipe it into your UDTF. This will simulate Hive calling your UDTF, but doesn't bury any error messages. In addition, you can print whatever debug messages you like to stdout or stderr to help in debugging.

TECHNIQUE 2 - stderr is your friend!

As noted, Hive expects the results from the UDTF to be in stdout. The stderr file is fair game for writing debug statements. This is pretty old school debugging, but it's still effective. Print out values and locations in your code to help you determine where the error occurs or what values are in variables at certain times. For example, you might add the following to the UDTF script to help identify where the issue is happening:

sys.stderr.write("Before stupid error\n")
x=1
y=0
z=x/y
sys.stderr.write("After stupid error!\n")

The trick is to find these in the logs when running on a Yarn cluster. These scripts are set to use mapreduce, which makes it a little easier, but basically, you find the Yarn job, drill down on one of the failed containers and examine its stderr. Attached are some screen prints from the Yarn RM showing this process. Winner, winner, here are our debugging statements!

SUPPORTING FILES FOR THIS ARTICLE ARE AVAILABLE ON GITHUB AT https://github.com/screamingweasel/articles/tree/master/udtf_debugging
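The local-testing idea from Technique 1 can also be exercised without any files at all. The following is a minimal sketch, written in Python 3 (the article's script is Python 2), that feeds sample lines through in-memory streams. The names parse_event_line and run are hypothetical helpers introduced for illustration, not part of the original script, and the sample input is made up.

```python
import io
import json
import traceback


def parse_event_line(line):
    """Turn one JSON line into a list of tab-separated output rows."""
    record = json.loads(line)
    rows = []
    for evt in record["events"]:
        # Each event is a single-key dict: {event type: details}
        event_type, details = next(iter(evt.items()))
        rows.append("\t".join([
            record["deviceId"],
            event_type,
            details.get("timestamp", ""),
            details.get("errorReason", ""),
            details.get("errorMsg", ""),
        ]))
    return rows


def run(stdin, stdout, stderr):
    """Process every input line; rows go to stdout, failures to stderr."""
    for line_no, line in enumerate(stdin, start=1):
        try:
            for row in parse_event_line(line):
                stdout.write(row + "\n")
        except Exception:
            # The full traceback goes to stderr so stdout stays clean for Hive
            stderr.write("line %d failed: %r\n" % (line_no, line))
            stderr.write(traceback.format_exc())


# Local test: in-memory streams stand in for Hive's stdin/stdout/stderr
sample = io.StringIO(
    '{"deviceId": "d1", "events": [{"Process_Started": {"timestamp": "2017-06-01T18:26:24.444Z"}}]}\n'
    'this is not json\n'
)
out, err = io.StringIO(), io.StringIO()
run(sample, out, err)
```

When used as a real script, the same run function would be called with sys.stdin, sys.stdout and sys.stderr, so the logic tested locally is the logic Hive executes.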
11-16-2017
08:24 PM
First of all, Nic Swart's comment is VERY IMPORTANT! Thanks much. Without the extra parentheses it just doesn't work (and it doesn't bother to tell you that the syntax of the ldap query is wrong, grrr!). Second, to be clear, the ldapRealm.xxx settings are recommended for use with Active Directory starting with Zeppelin 0.7. This can be a little confusing.
10-25-2017
05:03 PM
One thing I wish I had known when starting with Python UDFs is that you can write to stderr to assist in debugging. Then look in the Yarn RM for the logs.

import sys
sys.stderr.write('>>>> Read a line \n' + line + '\n')
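A small wrapper keeps these debug lines consistent and easy to search for in the container logs. This is only a sketch in Python 3; debug and split_columns are hypothetical helper names, and the ">>>>" prefix is simply borrowed from the line above.

```python
import sys


def debug(message, stream=None):
    """Write a tagged debug line to stderr (or a given stream) and return it."""
    stream = stream if stream is not None else sys.stderr
    text = ">>>> %s\n" % message
    stream.write(text)
    stream.flush()  # flush so the line is not lost if the script dies right after
    return text


def split_columns(line):
    """Split one tab-separated input line from Hive into column values."""
    columns = line.rstrip("\n").split("\t")
    debug("Read a line with %d column(s): %r" % (len(columns), columns))
    return columns
```

Because everything goes to stderr, stdout still carries only the rows Hive expects.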
10-07-2017
05:52 PM
1 Kudo
Thanks! Very subtle difference, but obviously important to Spark! For everyone's reference, this tar command can be used to create a tar.gz with the jars in the root of the archive:

cd /usr/hdp/current/spark2-client/jars/
tar -zcvf /tmp/spark2-hdp-yarn-archive.tar.gz *
# List the files in the archive. Note that they are in the root!
tar -tvf /tmp/spark2-hdp-yarn-archive.tar.gz
-rw-r--r-- root/root 69409 2016-11-30 03:31 activation-1.1.1.jar
-rw-r--r-- root/root 445288 2016-11-30 03:31 antlr-2.7.7.jar
-rw-r--r-- root/root 302248 2016-11-30 03:31 antlr4-runtime-4.5.3.jar
-rw-r--r-- root/root 164368 2016-11-30 03:31 antlr-runtime-3.4.jar
...
# Then upload to hdfs, fix ownership and permissions if needed, and good to go!
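For anyone scripting this step, the same "jars in the root of the archive" layout can be produced with Python's standard tarfile module. This is a sketch in Python 3 rather than the method used above; build_flat_archive and list_members are hypothetical helper names, and the demo uses throwaway placeholder files instead of the real jars directory.

```python
import os
import tarfile
import tempfile


def build_flat_archive(jar_dir, archive_path):
    """Create a .tar.gz whose members sit at the archive root (no directory prefix)."""
    with tarfile.open(archive_path, "w:gz") as tar:
        for name in sorted(os.listdir(jar_dir)):
            full_path = os.path.join(jar_dir, name)
            if os.path.isfile(full_path):
                # arcname drops the leading directories from the stored path
                tar.add(full_path, arcname=name)


def list_members(archive_path):
    """Return the member names stored in the archive."""
    with tarfile.open(archive_path, "r:gz") as tar:
        return tar.getnames()


# Demo with placeholder files (stand-ins for the real jars directory)
src_dir = tempfile.mkdtemp()
for name in ("a.jar", "b.jar"):
    with open(os.path.join(src_dir, name), "w") as handle:
        handle.write("placeholder")
archive = os.path.join(tempfile.mkdtemp(), "demo-archive.tar.gz")
build_flat_archive(src_dir, archive)
members = list_members(archive)
```

Checking that no member name contains a directory separator is a quick way to confirm the layout before uploading to HDFS.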
10-04-2017
10:25 PM
I was getting a zero-length error on /usr/hdp/apps/spark2/spark2-hdp-yarn-archive.tar.gz, which is documented as an issue
after some upgrades. So I created and uploaded the file to hdfs using the following commands:

tar -zcvf spark2-hdp-yarn-archive.tar.gz /usr/hdp/current/spark2-client/jars/*
hadoop fs -put spark2-hdp-yarn-archive.tar.gz /hdp/apps/2.5.3.0-37/spark2/

Now when running any spark job in yarn (say the example pi app), I get the following error:

Error: 'Could not find or load main class org.apache.spark.deploy.yarn.ApplicationMaster'

Other info:
This is HDP 2.5.3
Running Spark 2.1
Upgraded from HDP 2.2.8 -> 2.4.3 -> 2.5.3

I believe the missing class is in spark/lib/spark-hdp-assembly.jar, but this does not exist.

HERE'S THE WEIRD PART - If I completely remove the spark2-hdp-yarn-archive.tar.gz from HDFS then Spark jobs start to run again!

So, here are the questions:
- Is this file (spark2-hdp-yarn-archive.tar.gz) needed?
- If so, any direction on correcting this error?

Thanks in advance!
Labels: Apache Spark
09-30-2017
05:28 AM
Bridging the Process Time – Event Time gap with Hive (Part 1)

Synopsis

Reconciling the difference between event time and
collection/processing time is critical to understand for any system that
analyses event data. This is important whether events are processed in batch or
near real-time streams. This post focuses on batch processing with Hive, and
demonstrates easily replicable mechanisms for bridging this gap.

We will look at the issues surrounding this and present two repeatable
solution patterns using Hive and Hive ACID. This first post will look at the
issue and present the solution using Hive only, and the follow-up article will
introduce Hive ACID and a solution using that technology.

Overview

One of the most common big data
ingestion cases is event data, and as IoT becomes more important, so does this
use case. This is one of the most common
Hadoop use cases, but I have not found many detailed step by step patterns for
implementing it. In addition, I think it is important to understand some of the
thinking around events, and specifically, the gap between event time and
processing times.

One of the key considerations in
event analysis is the difference between data collection time (process time)
and the time that the event occurred (event time). A more formal definition might be:

Event Time: The time that the event occurred
Processing Time: The time that the event was observed in the processing system

In an ideal world, these two times would be the same or very
close. However, in the real world there is always some time lag or “skew”. This skew may be significant, and it
exists whether you are processing events in batches or in near real-time.

This skew can be caused by many different factors, including:

Resource Limitations: Bandwidth, CPU, etc. may not allow events to
be immediately forwarded and processed.

Software Features/Limitations: Software may be intentionally
programmed to queue events and send them at predetermined times. For example,
cable TV boxes that report information once or twice a day, or fitness trackers
that send some information, such as sleep data, only daily.

Network Discontinuity: Any mobile application needs to plan for
disruptions in Internet connectivity. Whether because of dead spots in wireless
coverage, airplane mode, or dead batteries, these interruptions can happen
regularly. To mitigate these, any good mobile app will queue event messages for
sending the next time that a connection is available, which may be minutes or
months!

Time Windows

Much of the current interest is around near real-time
ingestion of event data. There are many advantages to this, but a lot of use
cases only require event data to be processed in larger windows of data. That
is the focus of the remainder of this article.

I was surprised to find a lack of posts about the mechanics
of dealing with event skew and reporting by event time in batch systems. So, I
wanted to lay out some repeatable patterns that can be used for this.

As you probably know, event streams are essentially unbounded
streams of logs. We often deal with this as a series of bounded datasets, each
representing some time period. Our main consideration here is a batched process
that deals with large windows (15 min to 1 hour), but it applies down to any
level, since we almost always analyze event data by time in the target system.

The Problems

There are two main issues in dealing with this: Completeness
and Restatement.

Completeness: When
event data can come in for some time past the end of a time window, it is very
difficult to assess the completeness of the data. Most of the data may arrive
within a period (hour or day) of the time window. However, data may continue to
trickle in for quite some time afterwards. This presents issues of:

- Processing and combining data that arrives over time
- Determining a cutoff when data is considered complete

As we can see in this figure, most event data is received in
the few windows after the event time. However, data continues to trickle in,
and in fact, 100% theoretical completeness may never be achieved! So, if we
were to report on the event data at day 3 and at day 7 the results would be
very different.

Restatement: By
this we mean the restatement of data that has arrived and been grouped by
process time into our desired dimension of event time. This would not be an
issue if we could simply scan through all the data each time we want to analyze
it, but this becomes unworkable as the historical data grows. We need to find a
way to process just the newly arrived data and combine it with the existing
data.

Other Requirements

In addition to
dealing with our two main issues, we want a solution that will:

Be Scalable: Any solution must be able to scale to large volumes
of data, particularly as event history grows over time. Any solution that
relies on completely reprocessing the entire set of data will quickly become
unworkable.

Provide the ability to reprocess data: Restating event data by event
time is pretty straightforward if everything goes right. However, if we
determine that source data was corrupt or needs to be reloaded for any reason,
things get messy. In that case, we potentially have data from multiple
processing periods co-mingled for the same event time partition. So, to
reprocess a process period, we need to separate out those rows for the process
period and replace them, while leaving the other rows in the partition intact.
Not always an easy task with HDFS!

As an aside, to reprocess data, you need to keep the source around for a while. Pretty
obvious, but just saying!

Sample Use Case and Data

For an example use case we will use events arriving from a
mobile device representing streaming video viewing events. For this use case,
we will receive a set of files hourly and place them in a landing folder in
HDFS with an external Hive table laid on top. The processing (collection) time
is stamped into the filename using the format YYYYMMDDHH-nnnnn.txt. This
external table will contain one period’s data at a time and serves as an
initial landing zone.

We are also going to assume that we need to save this data
in detail, and that analysis will be done directly on the detailed data. Thus,
we need to restate the data by event time in the detail store.

Raw Input Source Format

Of particular interest is the event_time column, which is an ISO timestamp in the form YYYY-MM-DDTHH:MM:SS.sssZ

CREATE EXTERNAL TABLE video_events_stg (
device_id string,
event_type string,
event_time string,
play_time_ms bigint,
buffer_time_ms bigint)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/landing/video_events_stg';
https://raw.githubusercontent.com/screamingweasel/sample-data/master/schema/video_events_stg.hql

Detailed Table Format

CREATE TABLE video_events (
device_id string,
event_type string,
event_time string,
play_time_ms bigint,
buffer_time_ms bigint)
PARTITIONED BY (
event_year string,
event_month string,
event_day string,
event_hour string,
process_time string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
wget https://raw.githubusercontent.com/screamingweasel/sample-data/master/schema/video_events.hql

Sample Data

I have put together three files, each containing one hour of
processing data. You can pull them from GitHub and load the first hour into
HDFS.

mkdir -p /tmp/video
cd /tmp/video
wget https://raw.githubusercontent.com/screamingweasel/sample-data/master/video/2017011200-00001.txt
wget https://raw.githubusercontent.com/screamingweasel/sample-data/master/video/2017011201-00001.txt
wget https://raw.githubusercontent.com/screamingweasel/sample-data/master/video/2017011202-00001.txt
hadoop fs -rm -r /landing/video_events_stg
hadoop fs -mkdir -p /landing/video_events_stg
hadoop fs -put /tmp/video/2017011200-00001.txt /landing/video_events_stg/

Solutions

Let’s look at two possible solutions that meet our criteria
above. The first utilizes Hive without the newer ACID features. The second post
in this series details how to solve this using Hive ACID. Per our requirements,
both will have to restate the data as it is ingested into the detailed Hive
table and both must support reprocessing of data.

Solution 1

This solution uses pure Hive and does not rely on the newer
ACID transaction feature. As noted one hour’s worth of raw input may contain
data from any number of event times. We want to reorganize this and store it in
the detailed table partitioned by event time for easy reporting. This can be
visualized as:

Loading Restatement

We are going to achieve this through Hive Dynamic
Partitioning. Later versions of Hive (0.13+) support efficient dynamic partitioning
that can accomplish this. Dynamic partitioning is, unfortunately, a bit slower
than inserting to a static fixed partition. Our approach of incrementally
ingesting should mitigate this, but you would need to benchmark this with your
volume. The commands further down run this statement from a file named video_events_insert.hql.

set hive.exec.dynamic.partition.mode=nonstrict;
set hive.optimize.sort.dynamic.partition=true;
INSERT INTO TABLE default.video_events
PARTITION (event_year, event_month, event_day, event_hour, process_time)
SELECT device_id,
    event_type,
    CAST(regexp_replace(regexp_replace(event_time,'Z',''),'T',' ') as timestamp) as event_time,
    play_time_ms,
    buffer_time_ms,
    substr(event_time,1,4) AS event_year,
    substr(event_time,6,2) AS event_month,
    substr(event_time,9,2) AS event_day,
    substr(event_time,12,2) AS event_hour,
    substr(regexp_extract(input__file__name, '.*\/(.*)', 1),1,10) AS process_time
FROM default.video_events_stg;

You can see from a “show partitions” that three partitions
were created, one for each event time period.

show partitions default.video_events;
event_year=2017/event_month=01/event_day=11/event_hour=21/process_time=2017011200
event_year=2017/event_month=01/event_day=11/event_hour=22/process_time=2017011200
event_year=2017/event_month=01/event_day=11/event_hour=23/process_time=2017011200
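To sanity-check which partition a given row should land in, the substr and regexp_extract expressions from the INSERT can be mirrored outside Hive. This is a minimal sketch in Python; partition_values and partition_path are hypothetical helper names, and the sample event_time and file URI are made up for illustration.

```python
import posixpath


def partition_values(event_time, input_file_name):
    """Mirror the substr/regexp_extract logic used in the INSERT statement."""
    return {
        "event_year": event_time[0:4],     # substr(event_time, 1, 4)
        "event_month": event_time[5:7],    # substr(event_time, 6, 2)
        "event_day": event_time[8:10],     # substr(event_time, 9, 2)
        "event_hour": event_time[11:13],   # substr(event_time, 12, 2)
        # First 10 characters of the file name: YYYYMMDDHH
        "process_time": posixpath.basename(input_file_name)[0:10],
    }


def partition_path(values):
    """Format the values the way SHOW PARTITIONS prints them."""
    keys = ["event_year", "event_month", "event_day", "event_hour", "process_time"]
    return "/".join("%s=%s" % (key, values[key]) for key in keys)


example = partition_values(
    "2017-01-11T21:15:30.123Z",  # made-up event_time
    "hdfs://namenode/landing/video_events_stg/2017011200-00001.txt",  # made-up URI
)
example_path = partition_path(example)
```

Note that Hive's substr is 1-based while Python slices are 0-based, which is why each offset is shifted by one.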
Now let’s process the rest of the data and see the results:

hadoop fs -rm -skipTrash /landing/video_events_stg/*
hadoop fs -put /tmp/video/2017011201-00001.txt /landing/video_events_stg/
hive -f video_events_insert.hql
hadoop fs -rm -skipTrash /landing/video_events_stg/*
hadoop fs -put /tmp/video/2017011202-00001.txt /landing/video_events_stg/
hive -f video_events_insert.hql

show partitions default.video_events;
event_year=2017/event_month=01/event_day=11/event_hour=21/process_time=2017011200
event_year=2017/event_month=01/event_day=11/event_hour=22/process_time=2017011200
event_year=2017/event_month=01/event_day=11/event_hour=22/process_time=2017011201
event_year=2017/event_month=01/event_day=11/event_hour=23/process_time=2017011200
event_year=2017/event_month=01/event_day=11/event_hour=23/process_time=2017011201
event_year=2017/event_month=01/event_day=11/event_hour=23/process_time=2017011202
event_year=2017/event_month=01/event_day=12/event_hour=00/process_time=2017011201
event_year=2017/event_month=01/event_day=12/event_hour=00/process_time=2017011202
event_year=2017/event_month=01/event_day=12/event_hour=01/process_time=2017011202

select count(*) from default.video_events;
3000

So, we can see that our new data is being nicely added by
event time. Note that now there are multiple partitions for the event hour,
each corresponding to a processing period. We will see how that is used in the
next section.

Reprocessing

In order to reprocess input data for a specific process
period, we need to be able to identify that data in the restated detail and
remove it before reprocessing. The approach we are going to take here is to
keep the process period as part of the partition scheme, so that those partitions
can be easily identified. In this case, the partitioning would be:

- Event Year
- Event Month
- Event Day
- Event Hour
- Process Timestamp (concatenated)

Ex.
year=2017/month=01/day=10/hour=01/process_date=2017011202
year=2017/month=01/day=12/hour=01/process_date=2017011202
year=2017/month=01/day=12/hour=02/process_date=2017011202

This makes it fairly simple to reprocess a period of source
data.

1. List all the partitions of the table and identify ones from the specific processing hour to be reprocessed.
2. Manually drop those partitions.
3. Restore the input data and reprocess the input data as normal.

Let’s assume that the data for hour 2017-01-12 01 was
incorrect and needs to be reprocessed. From the show partitions statement, we can see
that there are three partitions containing data from that processing time.

event_year=2017/event_month=01/event_day=11/event_hour=22/process_time=2017011201
event_year=2017/event_month=01/event_day=11/event_hour=23/process_time=2017011201
event_year=2017/event_month=01/event_day=12/event_hour=00/process_time=2017011201

Let’s drop ‘em and see what we get.

ALTER TABLE default.video_events DROP PARTITION (event_year='2017',event_month='01',event_day='11',event_hour='22',process_time='2017011201');
ALTER TABLE default.video_events DROP PARTITION (event_year='2017',event_month='01',event_day='11',event_hour='23',process_time='2017011201');
ALTER TABLE default.video_events DROP PARTITION (event_year='2017',event_month='01',event_day='12',event_hour='00',process_time='2017011201');

show partitions video_events;
event_year=2017/event_month=01/event_day=11/event_hour=21/process_time=2017011200
event_year=2017/event_month=01/event_day=11/event_hour=22/process_time=2017011200
event_year=2017/event_month=01/event_day=11/event_hour=23/process_time=2017011200
event_year=2017/event_month=01/event_day=11/event_hour=23/process_time=2017011202
event_year=2017/event_month=01/event_day=12/event_hour=00/process_time=2017011202
event_year=2017/event_month=01/event_day=12/event_hour=01/process_time=2017011202

select count(*) from default.video_events;
2000

Now, finally let’s put that data back and reprocess it.

hadoop fs -rm -skipTrash /landing/video_events_stg/*
hadoop fs -put /tmp/video/2017011201-00001.txt /landing/video_events_stg/
hive -f video_events_insert.hql

show partitions default.video_events;
event_year=2017/event_month=01/event_day=11/event_hour=21/process_time=2017011200
event_year=2017/event_month=01/event_day=11/event_hour=22/process_time=2017011200
event_year=2017/event_month=01/event_day=11/event_hour=22/process_time=2017011201
event_year=2017/event_month=01/event_day=11/event_hour=23/process_time=2017011200
event_year=2017/event_month=01/event_day=11/event_hour=23/process_time=2017011201
event_year=2017/event_month=01/event_day=11/event_hour=23/process_time=2017011202
event_year=2017/event_month=01/event_day=12/event_hour=00/process_time=2017011201
event_year=2017/event_month=01/event_day=12/event_hour=00/process_time=2017011202
event_year=2017/event_month=01/event_day=12/event_hour=01/process_time=2017011202

select count(*) from default.video_events;
3000

Comments on this Solution

One drawback of this solution is that you may end up with
small files as events trickle in for older event times. For example, if you only
get a handful of events that come in 4 weeks after the event time, you are
going to get some very small files, indeed! Our next solution will overcome
that issue by using Hive ACID.

Conclusion

When handling event data, we must always be aware of the
skew between event time and processing time in order to provide accurate
analytics. Our solution to restating the
data in terms of event time must be scalable, performant, and allow for
reprocessing of data. We looked at one solution using plain Hive and partitioning.
In the next post of this series we will look at Hive ACID transactions to develop a
more advanced and simpler solution.

Accompanying files can be found at: https://github.com/screamingweasel/articles/tree/master/event_processing_part_1
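The manual "list the partitions, pick the ones for a process period, drop them" step from the Reprocessing section lends itself to a little scripting, since hand-typed partition specs are easy to get wrong. The following is a sketch only, in Python; parse_partition and drop_statements are hypothetical helpers, and the partition list is a shortened copy of the show partitions output rather than something read from Hive.

```python
def parse_partition(spec):
    """Split 'k1=v1/k2=v2/...' (one line of SHOW PARTITIONS output) into ordered pairs."""
    return [tuple(part.split("=", 1)) for part in spec.strip().split("/")]


def drop_statements(table, partition_specs, process_time):
    """Build one ALTER TABLE ... DROP PARTITION statement per matching partition."""
    statements = []
    for spec in partition_specs:
        pairs = parse_partition(spec)
        if dict(pairs).get("process_time") != process_time:
            continue  # belongs to a different processing period; leave it alone
        clause = ",".join("%s='%s'" % (key, value) for key, value in pairs)
        statements.append("ALTER TABLE %s DROP PARTITION (%s);" % (table, clause))
    return statements


partitions = [
    "event_year=2017/event_month=01/event_day=11/event_hour=22/process_time=2017011200",
    "event_year=2017/event_month=01/event_day=11/event_hour=22/process_time=2017011201",
    "event_year=2017/event_month=01/event_day=12/event_hour=00/process_time=2017011201",
]
statements = drop_statements("default.video_events", partitions, "2017011201")
```

The generated statements could be written to a file and run with hive -f, after which the source data for that period is reloaded as usual.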
02-23-2017
04:46 PM
Yes, if you want to be more restrictive you could use the user hdfs or @hadoop to indicate any user in the hadoop group.
02-17-2017
09:23 PM
1 Kudo
Centralized Cache Management in HDFS is a mechanism that
explicitly caches specific files or directories in memory for improved
performance. This is useful for relatively small files that are accessed
repeatedly. For example, reference/lookup tables or fact tables that are used
in many joins. Once enabled, HDFS will automatically cache selected files, and
periodically check for changes and recache the files.

While HDFS and the underlying file system do some caching of
files when memory is available, explicit caching using Centralized Cache
Management prevents the data from being evicted from memory when processes
consume all of the physical memory. As a corollary of this, if you ARE working
on a lightly loaded system where there is free memory, you may not see any
performance improvement from this method, as the data was already in disk
cache. So, your performance testing needs to stress the system.

Let’s look at some key terms and concepts:

Cache Pools

A cache pool is an administrative entity
used to manage groups of cache directives. One of the key attributes of the
pool is the maximum number of bytes that can be cached for all directives in
this pool.

Cache pools can be managed from the command line using the
hdfs cacheadmin utility. Some common commands include:

hdfs cacheadmin -addPool <name> [-owner <owner>] [-group <group>] [-mode <mode>] [-limit <limit>] [-maxTtl <maxTtl>]
hdfs cacheadmin -removePool <name>
hdfs cacheadmin -listPools [-stats] [<name>]

Cache Directives

A cache directive defines a path that
should be cached. This can be either a specific file or a single directory.
Note that directives are not recursive: they apply to a single directory only,
not any sub-directories. So, they would usually be applied to the lowest level
directory that contains the actual data files.

Cache directives can be managed from the command line using
the hdfs cacheadmin utility. Some common commands include:

hdfs cacheadmin -addDirective -path <path> -pool <pool-name> [-force] [-replication <replication>] [-ttl <time-to-live>]
hdfs cacheadmin -removeDirective <id>
hdfs cacheadmin -listDirectives [-stats] [-path <path>] [-pool <pool>]

HDFS Configuration Settings

There is really only
one Hadoop configuration setting that is required to turn on Centralized Caching.
There are a few others to control the frequency that caching looks for new
files, which you can usually leave at default. The following, which is added to
the custom hdfs-site.xml, specifies the maximum number of bytes that can be
cached on each datanode.

dfs.datanode.max.locked.memory

Remember
that this value is in bytes, in contrast with the OS limits which are set in
KB.

OS Limits

Before you implement Centralized Caching, you need to ensure
that the locked memory setting on each of the datanodes is set to a value equal to
or greater than the memory specified in the HDFS dfs.datanode.max.locked.memory setting. On
each datanode run the following to determine the limit for locked memory. This
will return a value in KB or “unlimited”.

ulimit -l

To set this, you need to add the following to /etc/security/limits.conf.
This is for CentOS/Red Hat and may be different if you are using another Linux
distro. The setting will take effect when you log out of a terminal session and
log back in.

* hard memlock 1048576
* soft memlock 1048576

How to Implement

Let’s walk through an example.

1. Set memlock limits on each datanode. This will take effect after you log out and log in again.

# On each datanode (max cacheable memory in KB) example for 1.0 GB
echo "* hard memlock 1048576" >> /etc/security/limits.conf
echo "* soft memlock 1048576" >> /etc/security/limits.conf
2. Create a folder to be cached

hadoop fs -mkdir -p /cache/usstates
hadoop fs -chmod -R 777 /cache

3. Create a Cache Pool

hdfs cacheadmin -addPool testPool -mode 0777 -maxTtl never

4. Create one or more Cache Directives

hdfs cacheadmin -addDirective -path /cache/usstates -pool testPool -replication 3 -ttl never

5. Change HDFS configurations to support Caching. Add the following to HDFS configs, Custom hdfs-site.xml in
Ambari. This example is for 0.5 GB (in bytes)

dfs.datanode.max.locked.memory=536870912

6. Restart HDFS using Ambari

7. Get some test data and load into cached directory

wget http://www.fonz.net/blog/wp-content/uploads/2008/04/states.csv
# Strip the column headers and double quotes
tail -n +2 states.csv > usstates.csv
sed -i 's/\"//g' usstates.csv
hadoop fs -put -f usstates.csv /cache/usstates

8. Look for cached data. You should see a value in BYTES_CACHED and
FILES_CACHED.

hdfs cacheadmin -listDirectives -stats
Found 1 entry
ID POOL REPL EXPIRY PATH BYTES_NEEDED BYTES_CACHED FILES_NEEDED FILES_CACHED
1 testPool 3 never /cache/usstates 2550 2550 1 1

9. Query the data with Hive (Optional)

CREATE EXTERNAL TABLE states (state_name string, state_cd string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/cache/usstates';
select * from states;

After that you can update the file (add some dummy states)
and re-upload to hadoop, and verify that the changes are picked up, add
additional folders, files, etc. and generally experiment. You can performance
test on your particular system, remembering that you may not see much
difference unless the memory is used, forcing the normal disk cache to be
evicted from memory.

References:
https://hadoop.apache.org/docs/r2.7.2/hadoop-project-dist/hadoop-hdfs/CentralizedCacheManagement.html
https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.5.3/bk_hdfs-administration/content/ch03.html
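Because the OS memlock limit is expressed in KB while dfs.datanode.max.locked.memory is in bytes, it is easy to mix up the units. A quick arithmetic check, sketched here in Python with hypothetical helper names, confirms that the walkthrough's values are compatible.

```python
KB = 1024


def memlock_kb_to_bytes(limit_kb):
    """Convert a memlock limit as reported by `ulimit -l` (KB) to bytes."""
    return limit_kb * KB


def cache_setting_fits(dfs_max_locked_bytes, memlock_limit_kb):
    """True when the OS limit (KB, or None for 'unlimited') covers the HDFS setting (bytes)."""
    if memlock_limit_kb is None:
        return True
    return memlock_kb_to_bytes(memlock_limit_kb) >= dfs_max_locked_bytes


# Values from the walkthrough: 1048576 KB memlock vs. 536870912 bytes for HDFS
fits = cache_setting_fits(536870912, 1048576)
# A small limit such as 64 KB (an assumed example value) would not be enough
too_small = cache_setting_fits(536870912, 64)
```

Here 1048576 KB works out to 1073741824 bytes (1 GB), which comfortably covers the 0.5 GB configured for HDFS.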
12-16-2016
12:53 AM
1 Kudo
We have two clusters, H6 and H7, that have been linked for HA
distcp as well as HBase sharing as described in http://henning.kropponline.de/2015/03/15/distcp-two-ha-cluster/.

When running an hdfs rebalancer command on one of the
clusters, it attempts to run the command ON BOTH CLUSTERS.

- Is this expected behavior?
- If this is expected, will the rebalancer command be run separately, as if we were executing it on two separate clusters? Specifically, we want to make sure that it is not going to try to balance ACROSS the clusters and mix up data blocks.
- Is there any way to prevent this behavior? We are thinking of a custom hdfs-site.xml with only one cluster's info in the path when running rebalancer. Thoughts?

Here is the configuration, assuming two clusters named h6 and h7.

Component | H6 | H7 |
---|---|---|
Name Service | nn-h6 | nn-h7 |
HA Namenode Name | nn-1, nn2 | nn-1, nn2 |

On H6 the following have been added to the hdfs-site.xml file:

dfs.nameservices=nn-h6,nn-h7
dfs.ha.namenodes.nn-h7=nn1,nn2
dfs.client.failover.proxy.provider.nn-h7=org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
dfs.namenode.http-address.nn-h7.nn1=hdpnn-h7-example.com:50070
dfs.namenode.http-address.nn-h7.nn2=hdpnn-h7-awsw02.example.com:50070
dfs.namenode.https-address.nn-h7.nn1=hdpnn-h7-example.com:50470
dfs.namenode.https-address.nn-h7.nn2=hdpnn-h7-awsw02.example.com:50470
dfs.namenode.rpc-address.nn-h7.nn1=hdpnn-h7-example.com:8020
dfs.namenode.rpc-address.nn-h7.nn2=hdpnn-h7-awsw02.example.com:8020

On H7 the following have been added to the hdfs-site.xml file:

dfs.nameservices=nn-h7,nn-h6
dfs.ha.namenodes.nn-h6=nn1,nn2
dfs.client.failover.proxy.provider.nn-h6=org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
dfs.namenode.http-address.nn-h6.nn1=hdpnn-h6-example.com:50070
dfs.namenode.http-address.nn-h6.nn2=hdpnn-h6-awsw02.example.com:50070
dfs.namenode.https-address.nn-h6.nn1=hdpnn-h6-example.com:50470
dfs.namenode.https-address.nn-h6.nn2=hdpnn-h6-awsw02.example.com:50470
dfs.namenode.rpc-address.nn-h6.nn1=hdpnn-h6-example.com:8020
dfs.namenode.rpc-address.nn-h6.nn2=hdpnn-h6-awsw02.example.com:8020
Labels: Apache Hadoop