Member since
06-12-2017
16
Posts
9
Kudos Received
1
Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
1485 | 06-28-2017 07:55 AM |
02-04-2020
11:57 AM
@stevenmatison I'm not 100% sure, but looking at NiFi 1.11.0, I can see the following list of JAR files:
./nifi-parquet-nar-1.11.0.nar-unpacked/NAR-INF/bundled-dependencies/parquet-column-1.10.0.jar
./nifi-parquet-nar-1.11.0.nar-unpacked/NAR-INF/bundled-dependencies/parquet-format-2.4.0.jar
./nifi-parquet-nar-1.11.0.nar-unpacked/NAR-INF/bundled-dependencies/parquet-encoding-1.10.0.jar
./nifi-parquet-nar-1.11.0.nar-unpacked/NAR-INF/bundled-dependencies/parquet-common-1.10.0.jar
./nifi-parquet-nar-1.11.0.nar-unpacked/NAR-INF/bundled-dependencies/parquet-avro-1.10.0.jar
./nifi-parquet-nar-1.11.0.nar-unpacked/NAR-INF/bundled-dependencies/parquet-jackson-1.10.0.jar
./nifi-parquet-nar-1.11.0.nar-unpacked/NAR-INF/bundled-dependencies/parquet-hadoop-1.10.0.jar
12-04-2019
05:54 AM
1 Kudo
Introduction
Knowing the performance of Kafka, in general or on your own hardware, is an important part of capacity planning. Sizing can be hard to calculate, with different message sizes, retention periods, partition counts, replication factors, network speeds and even synchronous vs asynchronous replication to pick from. There are so many decisions to make, but what type of performance can you realistically expect?
Kafka has been benchmarked before, however those results are a couple of years out of date. There are newer versions of Kafka, newer hardware and faster networking, all of which have improved performance by many factors. But by how much? This article runs the same tests as before with the latest and greatest to see what the improvements are.
Hardware specification:
Component | Specification |
---|---|
Cloud Provider | AWS, 6 x m5.4xlarge machines |
OS | CentOS 7 |
CPU | 16 cores |
RAM | 64 GB |
Disk | 50 GB OS disk, 6 x 20 GB IO-optimised SSDs for the Kafka log dirs |
Networking | 10 Gbit/s |
Kafka Version | CDH 6.3 with Kafka 2.3 |
Message size | 100 bytes per message |
Three of the six machines participate as Kafka brokers in a single cluster, and the remaining three nodes act as producers and consumers. Given that we have 10 Gbit/s networking per node, we should be able to push almost 30 Gbit/s across the three Kafka brokers, and 60 Gbit/s across all six nodes.
To make the testing simpler, I've set an environment variable to re-use in each command:
BOOTSTRAP=10.0.0.4:9092,10.0.0.5:9092,10.0.0.6:9092
First we need to create the various topics, depending on the partition count and replicas:
kafka-topics --bootstrap-server ${BOOTSTRAP} --create --topic test-rep-one --partitions 6 --replication-factor 1
kafka-topics --bootstrap-server ${BOOTSTRAP} --create --topic test-rep-three --partitions 6 --replication-factor 3
kafka-topics --bootstrap-server ${BOOTSTRAP} --create --topic test-7k --partitions 18 --replication-factor 3
Kafka ships with two handy scripts you can use to test your cluster: kafka-producer-perf-test and kafka-consumer-perf-test.
Test results
Single producer, no consumer, no replication
1 628 081 msg/sec (155.27 MB/sec)
Single producer, no consumer, 3x async replication
1 463 136 msg/sec (140.46 MB/sec)
Single producer, no consumer, 3x synchronous replication
1 226 439 msg/sec (125.11 MB/sec)
3 producers, no consumer, 3x async replication
3 960 110 msg/sec (377.69 MB/sec)
No producer, single consumer
4 096 100 msg/sec (390.63 MB/sec)
No producer, three consumers
11 813 321 msg/sec (1125 MB/sec)
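The MB/sec figures are essentially the message rate multiplied by the record size. Here is a minimal sketch of that conversion, assuming the perf tools count one MB as 1024 * 1024 bytes. That assumption reproduces the first producer result and the single consumer result above; some of the other rows do not line up exactly, so treat it as an approximation. The function name is my own and not part of Kafka.

```python
BYTES_PER_MB = 1024 * 1024  # assumption: the perf tools report binary megabytes


def mb_per_sec(msgs_per_sec: float, record_size_bytes: int) -> float:
    """Convert a message rate into a data rate in MB/sec."""
    return msgs_per_sec * record_size_bytes / BYTES_PER_MB


# Single producer, no replication, 100 byte records
print(round(mb_per_sec(1_628_081, 100), 2))
# Single consumer, 100 byte records
print(round(mb_per_sec(4_096_100, 100), 2))
```

The same function is handy for sizing: plug in your own expected message rate and average record size to get a first estimate of the bandwidth you need.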
At this point we've repeated the original tests, hopefully with much improved numbers. However, those tests used 100-byte records, so they were re-run with 7 KB records and optimised Kafka settings (a larger heap size of 8 GB, larger batch sizes and snappy compression).
Optimised Kafka results, 7k records
6 producers, no consumers, 3x async replication (larger batch sizes, snappy compression)
1 070 970 msg/sec (7321 MB/sec)
0 producers, 6 consumers
963 071 msg/sec (6896 MB/sec)
Around 1 million messages a second for both reading and writing, at 7 KB per message. We've reached the networking limit!
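As a rough sanity check on the networking limit, the sketch below compares the 7 KB results against the combined line rate of the six nodes. It assumes 10 Gbit/s per node, counts one MB as 1 000 000 bytes, and ignores compression and protocol overhead, so it is only a ballpark figure. The function name is hypothetical.

```python
def aggregate_ceiling_mb_per_sec(nodes: int, gbit_per_node: float) -> float:
    """Upper bound on aggregate throughput in MB/sec (1 MB = 10**6 bytes here)."""
    bits_per_sec = nodes * gbit_per_node * 1_000_000_000
    return bits_per_sec / 8 / 1_000_000


# Assumption: six nodes, each with a 10 Gbit/s network interface
ceiling = aggregate_ceiling_mb_per_sec(nodes=6, gbit_per_node=10)
producer_share = round(7321 / ceiling, 3)  # 6 producers, 7 KB records
consumer_share = round(6896 / ceiling, 3)  # 6 consumers, 7 KB records
print(ceiling, producer_share, consumer_share)
```

Under these assumptions both results sit above 90% of the theoretical ceiling, which fits the conclusion that the network is the bottleneck.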
Here are the commands used for each test, if you would like to reproduce the results yourself:
Test 1:
kafka-producer-perf-test --topic test-rep-one --num-records 50000000 --record-size 100 --throughput -1 --producer-props acks=0 bootstrap.servers=${BOOTSTRAP}
Test 2:
kafka-producer-perf-test --topic test-rep-three --num-records 50000000 --record-size 100 --throughput -1 --producer-props acks=0 bootstrap.servers=${BOOTSTRAP}
Test 3:
kafka-producer-perf-test --topic test-rep-three --num-records 50000000 --record-size 100 --throughput -1 --producer-props acks=1 bootstrap.servers=${BOOTSTRAP}
Test 4 (run three instances in parallel, one on each node):
kafka-producer-perf-test --topic test-rep-three --num-records 50000000 --record-size 100 --throughput -1 --producer-props acks=0 bootstrap.servers=${BOOTSTRAP}
Test 5:
kafka-consumer-perf-test --broker-list ${BOOTSTRAP} --messages 50000000 --topic test-rep-three --threads 1 --timeout 60000 --print-metrics --num-fetch-threads 6
Test 6 (run three instances in parallel, one on each node):
kafka-consumer-perf-test --broker-list ${BOOTSTRAP} --messages 50000000 --topic test-rep-three --threads 1 --timeout 60000 --print-metrics --num-fetch-threads 6
Test 7 (run a producer on each node, including the Kafka brokers):
kafka-producer-perf-test --topic test-7k --num-records 50000000 --record-size 7168 --throughput -1 --producer-props acks=0 bootstrap.servers=${BOOTSTRAP} linger.ms=100 compression.type=snappy
Test 8 (run a consumer on each node, including the Kafka brokers):
kafka-consumer-perf-test --broker-list ${BOOTSTRAP} --messages 50000000 --topic test-7k --threads 1 --timeout 60000 --print-metrics --num-fetch-threads 18
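If you want to script the producer tests rather than paste each command, a small helper can assemble the argument list. This is only a sketch: the helper name and its parameters are my own, and it just reproduces the flags already shown in the commands above.

```python
import shlex


def producer_perf_cmd(topic, record_size, acks, bootstrap,
                      num_records=50_000_000, extra_props=()):
    """Build a kafka-producer-perf-test argument list like the ones used above."""
    args = [
        "kafka-producer-perf-test",
        "--topic", topic,
        "--num-records", str(num_records),
        "--record-size", str(record_size),
        "--throughput", "-1",
        "--producer-props", f"acks={acks}", f"bootstrap.servers={bootstrap}",
    ]
    # Additional producer properties (for example linger.ms) go at the end
    args.extend(extra_props)
    return args


BOOTSTRAP = "10.0.0.4:9092,10.0.0.5:9092,10.0.0.6:9092"
test1 = producer_perf_cmd("test-rep-one", 100, 0, BOOTSTRAP)
test7 = producer_perf_cmd("test-7k", 7168, 0, BOOTSTRAP,
                          extra_props=["linger.ms=100", "compression.type=snappy"])
print(shlex.join(test1))
```

On a host with the Kafka command line tools installed, the returned list can be passed to subprocess.run(args, check=True).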
11-26-2019
04:09 AM
1 Kudo
In CM, the bootstrap file is auto-generated when you start NiFi, so it is not really something you can edit. However, you can set the same parameters as name=value pairs in the Java args in CM:
11-25-2019
03:57 AM
Apache NiFi flow to recursively call the HDFS Namenode WebHDFS API to track disk consumption per user across the cluster for graphing, monitoring, capacity planning and alerting.
It can often be hard to track the disk consumption per user on your HDFS filesystem. There isn't really a single command to tell you how much disk space a specific user is using across the filesystem (over many directories), nor can you easily track this over time to predict future consumption.
This article attempts to solve both problems by using a NiFi flow to recursively pull the status of each file stored on HDFS, and storing that in a database for visualising through Superset and/or Grafana.
You can run REST queries against the active HDFS Namenode, and get a listing of all the files and directories at a specific location. Here is an example of a query against the Namenode for '/user/willie':
curl -s 'http://localhost:50070/webhdfs/v1/user/willie/?op=liststatus' | python -m json.tool
{
    "FileStatuses": {
        "FileStatus": [
            {
                "accessTime": 1574672355184,
                "blockSize": 134217728,
                "childrenNum": 0,
                "fileId": 34947,
                "group": "willie",
                "length": 2621440000,
                "modificationTime": 1574672392189,
                "owner": "willie",
                "pathSuffix": "bigfile.dat",
                "permission": "644",
                "replication": 1,
                "storagePolicy": 0,
                "type": "FILE"
            },
            {
                "accessTime": 0,
                "blockSize": 0,
                "childrenNum": 4,
                "fileId": 34000,
                "group": "willie",
                "length": 0,
                "modificationTime": 1574612331505,
                "owner": "willie",
                "pathSuffix": "dir1",
                "permission": "755",
                "replication": 0,
                "storagePolicy": 0,
                "type": "DIRECTORY"
            },
            {
                "accessTime": 0,
                "blockSize": 0,
                "childrenNum": 1,
                "fileId": 34001,
                "group": "willie",
                "length": 0,
                "modificationTime": 1574603188421,
                "owner": "willie",
                "pathSuffix": "dir2",
                "permission": "755",
                "replication": 0,
                "storagePolicy": 0,
                "type": "DIRECTORY"
            },
            {
                "accessTime": 0,
                "blockSize": 0,
                "childrenNum": 1,
                "fileId": 34002,
                "group": "willie",
                "length": 0,
                "modificationTime": 1574603197686,
                "owner": "willie",
                "pathSuffix": "dir3",
                "permission": "755",
                "replication": 0,
                "storagePolicy": 0,
                "type": "DIRECTORY"
            }
        ]
    }
}
This is very well documented at: https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#List_a_Directory
There are two types of outputs in the sample JSON result, type=FILE and type=DIRECTORY. If type=FILE, then an attribute called length will tell us the filesize, and we know the path this file belongs to, as well as the owner of the file. With this we have enough information to store in a database, and over time we can track the consumption of all the files owned by a specific user.
Unfortunately the liststatus API call will only return the results for a specific location. If we want to query a subdirectory (like say /user/willie/dir1), we have to issue another REST API call, and so on for each directory on the filesystem. This means many API calls, recursively down each path until you reach the end.
Provided in this article is an example NiFi flow which does exactly that. It will recursively query the Namenode API through each directory path, grab a listing of all the files with their owners and file sizes, and store it in a relational database (MySQL in this example). It will construct a JSON string for each file, and by merging all the JSON strings together as a NiFi record, we can do very large bulk inserts into the database.
An example of a JSON string will look like this, and we merge thousands of them together in bulk:
{
    "_owner" : "nifi",
    "_group" : "hdfs",
    "length" : 2205,
    "basepath" : "/tmp/oxygenating2020_tweets/",
    "filename" : "68b8531b-0caa-4b2e-942e-4d84dc5f6ebf.json",
    "fullpath" : "/tmp/oxygenating2020_tweets/68b8531b-0caa-4b2e-942e-4d84dc5f6ebf.json",
    "replication" : 3,
    "type" : "FILE"
}
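To make the recursion concrete, here is a minimal Python sketch of the same idea outside of NiFi. It is not the attached flow: the BASEURL value, the function names and the injectable lister argument are my own assumptions, and error handling, authentication and HA Namenode failover are left out.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen

BASEURL = "http://localhost:50070"  # assumption: same Namenode address as the curl example


def list_status(path):
    """Call the WebHDFS LISTSTATUS operation for a single directory."""
    url = f"{BASEURL}/webhdfs/v1{quote(path)}?op=LISTSTATUS"
    with urlopen(url) as resp:
        return json.load(resp)["FileStatuses"]["FileStatus"]


def walk(path, lister=list_status):
    """Yield one record per file below path, descending into every directory."""
    base = path if path.endswith("/") else path + "/"
    for entry in lister(path):
        full = base + entry["pathSuffix"]
        if entry["type"] == "DIRECTORY":
            # One extra API call per sub-directory, all the way down
            yield from walk(full, lister)
        elif entry["type"] == "FILE":
            yield {
                "_owner": entry["owner"],
                "_group": entry["group"],
                "length": entry["length"],
                "basepath": base,
                "filename": entry["pathSuffix"],
                "fullpath": full,
                "replication": entry["replication"],
                "type": entry["type"],
            }
```

Calling list(walk("/user/willie")) against a live Namenode would return records shaped like the JSON example above. Passing a different lister makes it possible to try the recursion against canned data without a cluster.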
Using the following record schema, we then merge and push this to a MySQL table:
{
    "namespace": "nifi",
    "name": "filerecord",
    "type": "record",
    "fields": [
        { "name": "_owner", "type": "string" },
        { "name": "_group", "type": "string" },
        { "name": "length", "type": "long" },
        { "name": "basepath", "type": "string" },
        { "name": "filename", "type": ["null", "string"] },
        { "name": "fullpath", "type": "string" },
        { "name": "replication", "type": "int" },
        { "name": "type", "type": "string" }
    ]
}
The structure of the MySQL table can be defined as:
CREATE TABLE filestats (
    `pk` bigint PRIMARY KEY AUTO_INCREMENT,
    `_owner` varchar(255),
    `_group` varchar(255),
    `length` bigint,
    `basepath` varchar(4096),
    `filename` varchar(4096),
    `fullpath` varchar(4096),
    `replication` int,
    `type` char(32),
    `ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    `dt` DATE AS (DATE(ts))
);
CREATE INDEX idx_filestats_owner_length on filestats(_owner,length);
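Once the rows are in the table, the consumption per user is a simple aggregate over _owner and length. The sketch below shows the kind of query I have in mind, using an in-memory SQLite database purely as a stand-in for MySQL so that it runs anywhere. The table is cut down to three columns and the third row is made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for the MySQL database
conn.execute("CREATE TABLE filestats (_owner TEXT, length INTEGER, fullpath TEXT)")
conn.executemany(
    "INSERT INTO filestats (_owner, length, fullpath) VALUES (?, ?, ?)",
    [
        ("willie", 2621440000, "/user/willie/bigfile.dat"),
        ("nifi", 2205, "/tmp/oxygenating2020_tweets/68b8531b-0caa-4b2e-942e-4d84dc5f6ebf.json"),
        ("nifi", 1000, "/tmp/example/hypothetical.json"),  # made-up row
    ],
)

# Total bytes per owner, largest consumer first
usage = conn.execute(
    "SELECT _owner, SUM(length) AS bytes_used FROM filestats "
    "GROUP BY _owner ORDER BY bytes_used DESC"
).fetchall()
print(usage)
```

Note that length is the logical file size. If you want the raw disk usage across the cluster, you would probably multiply each length by its replication value before summing.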
To make the flow generic, we have defined one variable called "baseurl" which will be the URL of your Name Node:
You also need to make adjustments to your database provider, database url and username & password. You can find the example flow here.
Using Superset, you can connect it to a relational database, and build various chart types to represent your data. One such chart type is the "Sunburst", which allows you to easily and visually navigate your disk consumption. Here is an example:
There's also a video to show a live visualisation which you can download to watch.
You can use Grafana to display the same data, but as a time series. With a time series display, you can see the growth of disk consumption over a period of time and predict when a user will cross some threshold. This aids in monitoring and capacity planning. You can also set up Grafana to display and send alerts when thresholds are crossed.
An example of a Grafana dashboard:
You can also download a sample Grafana dashboard JSON file and import it for easy setup.
11-20-2019
03:45 AM
2 Kudos
Apache Ranger is a framework to enable, monitor and manage comprehensive data security across the Big Data platform. Ranger allows you to centralize security administration and manage all security-related tasks in a central UI or through REST APIs. You can set up fine-grained authorization for specific actions and/or operations on the platform's components and tools through role-based and/or attribute-based access control. Ranger also supports central auditing of all events across the cluster for access/edit/update/delete operations.
Ranger has support for the following components through plugins:
HDFS
HBase
Hive
YARN
KNOX
Storm
SOLR
Kafka
NiFi
NiFi-Registry
Atlas
This article will provide an example of setting up Ranger database policies as well as column masking rules. The example will also show what happens when you have two tables, with one table having column masking rules applied for a specific user, and trying to join the two tables.
Scenario:
Two users: user_sales, user_hr
Two tables: table_a, table_b
There is a common column between the two, allowing you to join the two tables together.
table_a has a column masking policy for user_sales, who is not allowed to see the values in this column.
We join the two tables together, and compare what user_sales sees with what user_hr sees.
Ranger Policy Setup
In this screenshot, you can see that user_sales and user_hr have been given permission to access table_a and table_b:
However, in the following screenshot, user_sales is being prevented from seeing the col_masked values by a masking policy in table_a:
As user_hr, I can see both tables:
select * from table_a;
+---------------+---------------------+
| table_a.name | table_a.col_masked |
+---------------+---------------------+
| Mark | 1 |
| Jane | 2 |
| Steve | 3 |
+---------------+---------------------+
select * from table_b;
+-------------------+-----------------------+
| table_b.location | table_b.col_unmasked |
+-------------------+-----------------------+
| London | 1 |
| Amsterdam | 2 |
| Paris | 3 |
+-------------------+-----------------------+
user_hr can also join both tables together on the shared column without any issue:
select a.*, b.* from table_a a, table_b b where a.col_masked = b.col_unmasked;
+---------+---------------+-------------+-----------------+
| a.name | a.col_masked | b.location | b.col_unmasked |
+---------+---------------+-------------+-----------------+
| Mark | 1 | London | 1 |
| Jane | 2 | Amsterdam | 2 |
| Steve | 3 | Paris | 3 |
+---------+---------------+-------------+-----------------+
As user_sales, I'm allowed to see everything in table_b, but I'm not allowed to see the masked column in table_a:
select * from table_a;
+---------------+---------------------+
| table_a.name | table_a.col_masked |
+---------------+---------------------+
| Mark | NULL |
| Jane | NULL |
| Steve | NULL |
+---------------+---------------------+
select * from table_b;
+-------------------+-----------------------+
| table_b.location | table_b.col_unmasked |
+-------------------+-----------------------+
| London | 1 |
| Amsterdam | 2 |
| Paris | 3 |
+-------------------+-----------------------+
When I run the same join as user_sales, Hive returns no rows, because the masked values in table_a.col_masked (NULL) never match the values in table_b.col_unmasked:
+---------+---------------+-------------+-----------------+
| a.name | a.col_masked | b.location | b.col_unmasked |
+---------+---------------+-------------+-----------------+
+---------+---------------+-------------+-----------------+
When running the join operation in Hive, Hive will apply the masking operation before performing the join, thus preventing the rows from matching each other and generating a result set.
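The effect of masking before joining can be reproduced with a few lines of Python. This is only an illustration of the behaviour described above, not how Hive or Ranger implement it. The function names are my own, and the masking policy is assumed to nullify the column, as in the user_sales output.

```python
table_a = [("Mark", 1), ("Jane", 2), ("Steve", 3)]
table_b = [("London", 1), ("Amsterdam", 2), ("Paris", 3)]


def mask_to_null(rows):
    """Replace the second column with None, as the masking policy does for user_sales."""
    return [(name, None) for name, _ in rows]


def inner_join(left, right):
    """Equi-join on the second column; NULL (None) never equals anything, as in SQL."""
    return [
        (name, key_a, location, key_b)
        for name, key_a in left
        for location, key_b in right
        if key_a is not None and key_b is not None and key_a == key_b
    ]


hr_rows = inner_join(table_a, table_b)                   # user_hr: no masking
sales_rows = inner_join(mask_to_null(table_a), table_b)  # user_sales: masked first
print(len(hr_rows), len(sales_rows))
```

Because the mask is applied before the join condition is evaluated, user_sales cannot use the join to infer the hidden values from table_b.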
11-07-2019
09:16 AM
2 Kudos
With the release of Apache NiFi 1.10 (http://nifi.apache.org/download.html), a new feature is available whereby you can configure a Parquet Record reader to read incoming Parquet formatted files.
Apache NiFi Record processing allows you to read and deal with a set of data as a single unit. By supplying a schema that matches the incoming data, you can perform record processing as if the data is a "table", and easily convert between formats (CSV, JSON, Avro, Parquet, XML). Before record processing was added to Apache NiFi, you would have had to split the file line by line, perform the transformation and then merge everything back together into a single output. This is extremely inefficient. With Record processing, we can do everything to a large unit of data as a single step which improves the speed by many factors.
Apache Parquet is a columnar storage format (https://parquet.apache.org/documentation/latest/), and the format includes the schema for the data that it stores. This is a great feature that we can use with Record processing in Apache NiFi to supply the schema automatically to the flow.
Here is an example flow:
In this flow, I am reading my incoming .parquet files, and passing them through my QueryRecord processor. The processor has been configured with a ParquetReader. I'm using the AvroRecordSetWriter for output, but you can also use a CSV, JSON or XML record writer instead:
The QueryRecord is a special processor that allows you to run SQL queries against your Flowfile, where the output is a new Flowfile with the output of the SQL query:
The raw SQL code:
Select first_name, last_name, birth_date from FLOWFILE where gender = 'M' and birth_date like '1965%'
The input in my Parquet file looks like this:
You can see it has rows for years other than 1965, Males and Females, as well as other columns not listed in the SQL query.
After running it through my flow, I am left with the result of the SQL query, matching my search criteria (birth year = 1965 and only Males), with the three columns selected (first_name, last_name, birth_date):
Depending on your RecordWriter, you can format the output as JSON, CSV, XML or Avro, and carry on with further processing.
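If you want to try the query logic without a NiFi instance, the sketch below runs the same SQL against an in-memory SQLite table named FLOWFILE. SQLite is only a stand-in here (QueryRecord uses its own SQL engine inside NiFi), and the sample rows and the hire_date column are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE FLOWFILE (first_name TEXT, last_name TEXT, gender TEXT, "
    "birth_date TEXT, hire_date TEXT)"
)
# Hypothetical records standing in for the contents of the Parquet file
conn.executemany(
    "INSERT INTO FLOWFILE VALUES (?, ?, ?, ?, ?)",
    [
        ("Alex", "Example", "M", "1965-03-12", "1990-01-01"),
        ("Sam", "Sample", "F", "1965-07-30", "1991-05-20"),
        ("Chris", "Placeholder", "M", "1958-11-02", "1988-09-15"),
    ],
)

result = conn.execute(
    "Select first_name, last_name, birth_date from FLOWFILE "
    "where gender = 'M' and birth_date like '1965%'"
).fetchall()
print(result)
```

Only the male record born in 1965 survives the filter, and only the three selected columns are returned, which mirrors what the QueryRecord processor emits in the new Flowfile.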
10-10-2019
08:28 AM
Ranger audit logs are stored in HDFS for long-term storage, and can grow to a significant size after a while. It might be useful to turn those logs into Hive tables using ORC as the backing store for compression and query speed. This will allow you to connect any external tool via ODBC to Hive and view your audit logs as SQL tables.
Each plugin for Ranger that you have enabled will log to a different sub-directory within /ranger/audit on HDFS:
[hdfs@ip-10-0-1-75 ~]$ hdfs dfs -ls /ranger/audit/
Found 2 items
drwx------ - hdfs hdfs 0 2019-10-10 14:19 /ranger/audit/hdfs
drwxr-xr-x - hive hive 0 2019-10-10 14:12 /ranger/audit/hiveServer2
And then within each of these, you will have a sub-directory for audit logs for every day:
[hdfs@ip-10-0-1-75 ~]$ hdfs dfs -ls /ranger/audit/hdfs
Found 16 items
drwx------ - hdfs hdfs 0 2019-09-25 09:55 /ranger/audit/hdfs/20190925
drwxr-xr-x - hdfs hdfs 0 2019-09-26 00:00 /ranger/audit/hdfs/20190926
drwxr-xr-x - hdfs hdfs 0 2019-09-27 00:00 /ranger/audit/hdfs/20190927
drwxr-xr-x - hdfs hdfs 0 2019-09-28 00:00 /ranger/audit/hdfs/20190928
drwxr-xr-x - hdfs hdfs 0 2019-09-29 00:00 /ranger/audit/hdfs/20190929
drwxr-xr-x - hdfs hdfs 0 2019-09-30 00:00 /ranger/audit/hdfs/20190930
drwxr-xr-x - hdfs hdfs 0 2019-10-01 00:00 /ranger/audit/hdfs/20191001
drwxr-xr-x - hdfs hdfs 0 2019-10-02 00:00 /ranger/audit/hdfs/20191002
drwxr-xr-x - hdfs hdfs 0 2019-10-03 14:43 /ranger/audit/hdfs/20191003
drwxr-xr-x - hdfs hdfs 0 2019-10-04 00:00 /ranger/audit/hdfs/20191004
drwxr-xr-x - hdfs hdfs 0 2019-10-05 00:00 /ranger/audit/hdfs/20191005
drwxr-xr-x - hdfs hdfs 0 2019-10-06 00:00 /ranger/audit/hdfs/20191006
drwxr-xr-x - hdfs hdfs 0 2019-10-07 00:00 /ranger/audit/hdfs/20191007
drwxr-xr-x - hdfs hdfs 0 2019-10-08 00:00 /ranger/audit/hdfs/20191008
drwxr-xr-x - hdfs hdfs 0 2019-10-09 00:00 /ranger/audit/hdfs/20191009
drwxr-xr-x - hdfs hdfs 0 2019-10-10 00:00 /ranger/audit/hdfs/20191010
Looking at one of the sample logs, we can see that each log message is just a JSON string per line:
[hdfs@ip-10-0-1-75 ~]$ hdfs dfs -cat /ranger/audit/hdfs/20191002/hdfs_ranger_audit_ip-10-0-1-75.eu-west-3.compute.internal.log | tail -n 1 | python -m json.tool
{
    "access": "READ_EXECUTE",
    "action": "execute",
    "agentHost": "ip-10-0-1-75.eu-west-3.compute.internal",
    "cliIP": "10.0.1.75",
    "cluster_name": "singlenode",
    "enforcer": "hadoop-acl",
    "event_count": 1,
    "event_dur_ms": 0,
    "evtTime": "2019-10-02 23:59:56.356",
    "id": "bc54faf1-7afc-49d9-b32c-45e1f9ba8b47-333762",
    "logType": "RangerAudit",
    "policy": -1,
    "reason": "/user/ams/hbase/oldWALs",
    "repo": "singlenode_hadoop",
    "repoType": 1,
    "reqUser": "ams",
    "resType": "path",
    "resource": "/user/ams/hbase/oldWALs",
    "result": 1,
    "seq_num": 644489,
    "tags": []
}
We can use a JSON serde for Hive to create external tables on top of these directories, and view these logs as if they are Hive tables in effect. First, you need to download the serde and place it somewhere in HDFS:
wget "https://github.com/cdamak/Twitter-Hive/raw/master/json-serde-1.3.8-jar-with-dependencies.jar"
And then upload to HDFS:
hdfs dfs -put json-serde-1.3.8-jar-with-dependencies.jar /tmp/
Next step, open up Beeline, and import the JAR file:
add jar hdfs:///tmp/json-serde-1.3.8-jar-with-dependencies.jar;
You are now ready to create an external Hive table for each of the Ranger audit logs. Here is the one for the HDFS audit logs:
CREATE EXTERNAL TABLE ext_ranger_audit_hdfs (
access string,
action string,
agentHost string,
cliIP string,
cluster_name string,
enforcer string,
event_count string,
event_dur_ms string,
evtTime string,
id string,
logType string,
policy string,
reason string,
repo string,
repoType string,
reqUser string,
resType string,
resource string,
result string,
seq_num string,
tags string
) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' LOCATION '/ranger/audit/hdfs/';
And one for Hive audit logs:
CREATE EXTERNAL TABLE ext_ranger_audit_hive (
access string,
action string,
additional_info string,
agentHost string,
cliIP string,
cliType string,
cluster_name string,
enforcer string,
event_count string,
event_dur_ms string,
evtTime string,
id string,
logType string,
policy string,
repo string,
repoType string,
reqData string,
reqUser string,
resType string,
resource string,
result string,
seq_num string,
sess string,
tags string
) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' LOCATION '/ranger/audit/hiveServer2';
You can now run your favourite SQL queries on top of your tables, for example:
0: jdbc:hive2://ip-10-0-1-75.eu-west-3.comput> select count(*) from ext_ranger_audit_hdfs;
0: jdbc:hive2://ip-10-0-1-75.eu-west-3.comput> select count(*) from ext_ranger_audit_hive;
The next step is to create an internal Hive table, which is ORC based and will give you fast query speeds through compression and partitioning. First, the internal HDFS audit table:
CREATE TABLE ranger_audit_hdfs (
access string,
action string,
agentHost string,
cliIP string,
cluster_name string,
enforcer string,
event_count string,
event_dur_ms string,
evtTime string,
id string,
logType string,
policy string,
reason string,
repo string,
repoType string,
reqUser string,
resType string,
resource string,
result string,
seq_num string,
tags string
) partitioned by (date_year int, date_month int, date_day int)
stored as orc
TBLPROPERTIES ( 'transactional'='true' );
And the internal Hive audit log table:
CREATE TABLE ranger_audit_hive (
access string,
action string,
additional_info string,
agentHost string,
cliIP string,
cliType string,
cluster_name string,
enforcer string,
event_count string,
event_dur_ms string,
evtTime string,
id string,
logType string,
policy string,
repo string,
repoType string,
reqData string,
reqUser string,
resType string,
resource string,
result string,
seq_num string,
sess string,
tags string
) partitioned by (date_year int, date_month int, date_day int)
stored as orc
TBLPROPERTIES ( 'transactional'='true' );
At this point, both the internal tables will be empty, so we can populate them from our externally mapped JSON tables. Load the internal HDFS audit table:
set hive.exec.dynamic.partition=true;
INSERT INTO ranger_audit_hdfs PARTITION (date_year, date_month, date_day)
select *, cast(substr(a.evttime, 0, 4) as int),
cast(substr(a.evttime, 6, 2) as int),
cast(substr(a.evttime, 9, 2) as int)
from ext_ranger_audit_hdfs a where a.seq_num not in (select b.seq_num from ranger_audit_hdfs b);
Load the internal Hive audit table:
set hive.exec.dynamic.partition=true;
INSERT INTO ranger_audit_hive PARTITION (date_year, date_month, date_day)
select *, cast(substr(a.evttime, 0, 4) as int),
cast(substr(a.evttime, 6, 2) as int),
cast(substr(a.evttime, 9, 2) as int)
from ext_ranger_audit_hive a where a.seq_num not in (select b.seq_num from ranger_audit_hive b);
At the end of each day, you can re-run the two statements above to load the new day's data from the external table into the internal table for fast querying. Once you have loaded the data, you can go back to /ranger/audit/<service> on HDFS, and drop the old data.
Your tables are now stored in ORC, with compression and partitioned by YYYY/MM/DD:
[hdfs@ip-10-0-1-75 ~]$ hdfs dfs -ls /warehouse/tablespace/managed/hive/ranger_audit_hdfs/date_year=2019/date_month=10
Found 10 items
drwxrwx---+ - hive hadoop 0 2019-10-08 10:59 /warehouse/tablespace/managed/hive/ranger_audit_hdfs/date_year=2019/date_month=10/date_day=1
drwxrwx---+ - hive hadoop 0 2019-10-10 14:16 /warehouse/tablespace/managed/hive/ranger_audit_hdfs/date_year=2019/date_month=10/date_day=10
drwxrwx---+ - hive hadoop 0 2019-10-08 10:59 /warehouse/tablespace/managed/hive/ranger_audit_hdfs/date_year=2019/date_month=10/date_day=2
drwxrwx---+ - hive hadoop 0 2019-10-08 10:59 /warehouse/tablespace/managed/hive/ranger_audit_hdfs/date_year=2019/date_month=10/date_day=3
drwxrwx---+ - hive hadoop 0 2019-10-08 10:59 /warehouse/tablespace/managed/hive/ranger_audit_hdfs/date_year=2019/date_month=10/date_day=4
drwxrwx---+ - hive hadoop 0 2019-10-08 10:59 /warehouse/tablespace/managed/hive/ranger_audit_hdfs/date_year=2019/date_month=10/date_day=5
drwxrwx---+ - hive hadoop 0 2019-10-08 10:59 /warehouse/tablespace/managed/hive/ranger_audit_hdfs/date_year=2019/date_month=10/date_day=6
drwxrwx---+ - hive hadoop 0 2019-10-08 10:59 /warehouse/tablespace/managed/hive/ranger_audit_hdfs/date_year=2019/date_month=10/date_day=7
drwxrwx---+ - hive hadoop 0 2019-10-10 14:16 /warehouse/tablespace/managed/hive/ranger_audit_hdfs/date_year=2019/date_month=10/date_day=8
drwxrwx---+ - hive hadoop 0 2019-10-10 14:16 /warehouse/tablespace/managed/hive/ranger_audit_hdfs/date_year=2019/date_month=10/date_day=9
We brought the space consumed down from 493.5 MB to 7.2 MB (about 1.5% of the original size):
[hdfs@ip-10-0-1-75 ~]$ hdfs dfs -du -h -s /ranger/audit/hdfs/;
365.5 M 493.5 M /ranger/audit/hdfs
[hdfs@ip-10-0-1-75 ~]$ hdfs dfs -du -h -s /warehouse/tablespace/managed/hive/ranger_audit_hdfs
7.2 M 7.2 M /warehouse/tablespace/managed/hive/ranger_audit_hdfs
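The partition columns in the INSERT statements above are cut out of the evtTime string with substr(). Here is the same slicing in Python, as a quick way to check which partition a given audit event will land in. The function name is hypothetical, and the sample line is a shortened version of the audit record shown earlier.

```python
import json


def partition_values(evt_time: str):
    """Mirror the substr() calls in the INSERT statements: year, month, day as ints."""
    return int(evt_time[0:4]), int(evt_time[5:7]), int(evt_time[8:10])


# Shortened audit record, one JSON string per line as in the raw logs
line = '{"evtTime": "2019-10-02 23:59:56.356", "reqUser": "ams", "seq_num": 644489}'
event = json.loads(line)
year, month, day = partition_values(event["evtTime"])
partition_path = f"date_year={year}/date_month={month}/date_day={day}"
print(partition_path)
```

Casting to int drops the leading zero, which is why the directories in the listing above are named date_day=2 rather than date_day=02.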
01-16-2019
01:00 PM
2 Kudos
Note: this article only deals with the disk space of each format, not the performance comparison.
When using Hive as your engine for SQL queries, you might want to consider using ORC or Parquet file formats for your data. There are numerous advantages to consider when choosing ORC or Parquet.
Firstly, both will give you columnar compression of the data within, whereas a plain text file will have no compression at all. Secondly, indexes within ORC or Parquet will help with query speed, as some basic statistics are stored inside the files, such as min and max values, number of rows etc. Both formats can also express complex data structures (such as hierarchies), which a plain CSV file cannot do. The combination of these can boost your query speed many times over. Have a look at the article here to read more about the underlying mechanics of both formats: https://www.datanami.com/2018/05/16/big-data-file-formats-demystified/
More information on the ORC file format: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC
More information on the Parquet file format: https://parquet.apache.org/documentation/latest/
However, how much space on disk do these formats use in Hive? Saving on disk space (and, by extension, IO) is always a good thing, but it can be hard to calculate exactly how much space you will be using with compression. Obviously, every file and data set is different, and the data inside will always be a determining factor for what type of compression you'll get. Text will compress better than binary data. Repeating values and strings will compress better than pure random data, and so forth.
As a simple test, I took the 2008 data set from http://stat-computing.org/dataexpo/2009/the-data.html
The compressed bz2 download measures 108.5 MB, and 657.5 MB uncompressed.
I then uploaded the data to HDFS, and created an external table on top of the uncompressed data set:
Create external table flight_arrivals (
year int,
month int,
DayofMonth int,
DayOfWeek int,
DepTime int,
CRSDepTime int,
ArrTime int,
CRSArrTime int,
UniqueCarrier string,
FlightNum int,
TailNum string,
ActualElapsedTime int,
CRSElapsedTime int,
AirTime int,
ArrDelay int,
DepDelay int,
Origin string,
Dest string,
Distance int,
TaxiIn int,
TaxiOut int,
Cancelled int,
CancellationCode int,
Diverted int,
CarrierDelay string,
WeatherDelay string,
NASDelay string,
SecurityDelay string,
LateAircraftDelay string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
location '/flight_arrivals';
Counting the rows:
select count(*) from flight_arrivals;
+----------+
| _c0 |
+----------+
| 7009728 |
+----------+
1 row selected (3.341 seconds)
Converting to an ORC and Parquet table, using default settings for both formats:
Create external table flight_arrivals_external_orc stored as ORC as select * from flight_arrivals;
Create external table flight_arrivals_external_parquet stored as Parquet as select * from flight_arrivals;
Let's have a look at the disk usage for both:
93.5 M /warehouse/tablespace/external/hive/flight_arrivals_external_orc
146.6 M /warehouse/tablespace/external/hive/flight_arrivals_external_parquet
In summary:
Format | Size | Size vs. uncompressed CSV |
---|---|---|
bz2 | 108.5 MB | 16.5% |
CSV (Text) | 657.5 MB | - |
ORC | 93.5 MB | 14.2% |
Parquet | 146.6 MB | 22.3% |
Keep in mind that default settings and values were used to create the ORC and Parquet tables, and no other optimizations were applied to either format. Both ORC and Parquet ship with many options and optimizations to compress your data; only the defaults that ship with HDP 3.1 were used. Consider using a sample or subset of your data set, and experiment with the various compression algorithms and options for each format to find the one that suits your use case best.
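The percentages in the summary are simply each format's size divided by the uncompressed CSV size. A small sketch to recompute them, or to plug in the numbers from your own data set (the names are my own):

```python
UNCOMPRESSED_MB = 657.5  # size of the uncompressed CSV data set

sizes_mb = {"bz2": 108.5, "ORC": 93.5, "Parquet": 146.6}


def percent_of_original(size_mb, original_mb=UNCOMPRESSED_MB):
    """Size as a percentage of the uncompressed data, rounded to one decimal."""
    return round(size_mb / original_mb * 100, 1)


ratios = {fmt: percent_of_original(size) for fmt, size in sizes_mb.items()}
print(ratios)
```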
09-06-2018
07:41 PM
4 Kudos
Short Description:
Often there will be a need to ingest binary files into Hadoop (like PDF, JPG, PNG) where you will want to store them in HBase directly and not on HDFS itself. This article describes an example of how this could be achieved.
Article
The maximum number of files in HDFS depends on the amount of memory available for the NameNode. Each file object and each block object takes about 150 bytes of memory. For example, if you have 10 million files and each file has one block, then you would need about 3 GB of memory for the NameNode (20 million objects at roughly 150 bytes each).
This could pose a problem if you would like to store trillions of files, as you will run out of RAM on the NameNode trying to track all of these files.
One option to resolve this issue is to store your blobs in an object store. There are many to choose from, especially if you use your favorite cloud provider.
Another alternative would be to look at Ozone: https://hortonworks.com/blog/ozone-object-store-hdfs/
Below is an example of how you can use HDF to ingest these blobs into HBase directly, as another field.
HBase has support for MOB (Medium Objects), which is a way to store objects of around 10 MB or so in HBase directly. This article describes MOB in more detail: https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.5/bk_data-access/content/ch_MOB-support.html
Hortonworks Dataflow (HDF) can be used to visually design a flow whereby you can ingest from a directory continuously, add extra fields, compress, encrypt, hash and then store this data in HBase. Any application can then connect to HBase and retrieve these objects at high speed, for example a document system or the images for a website's catalog.
Here is a high-level overview of the flow (also attached as a template file at the end of this article):
Let's break this down, step by step:
ListFile: Watches a configured directory, and generates a list of filenames when new files arrive
FetchFile: Uses the list generated by the first processor, reads those files from disk and streams them into HDF
HashFile: (Optional step) Hashes the contents of the file, with md5, sha1, sha2 etc
UpdateAttribute: (Optional step) Adds additional attributes to the file read, for example author name, date ingested etc
CompressContent: Compresses the file, using bzip, gz, snappy etc
Base64EncodeContent: Changes the binary data to a base64 representation for easier storage in HBase
AttributesToJSON: Converts all attributes of the FlowFile (like filename, date, extra attributes etc) to a JSON file
PutHBaseJSON: Takes the JSON from the previous step, and stores it as key=>value in a column family
Also, one last processor splits out from Base64EncodeContent to PutHBaseCell, which stores the actual file/object in HBase, also as part of the column family.
To create your HBase table (called 't1' in this example):
create 't1', {NAME => 'f1', IS_MOB => true, MOB_THRESHOLD => 102400}
As an example, here is the output you can expect from a sample PDF file stored in HBase:
For the HBase processors, you will need to configure a controller service to define where your Zookeeper is in order to find your HBase servers.
PutHBaseCell:
Which in turn points to the controller service (HBase_1_1_2_ClientService):
Additionally, here is an example to read the same objects from HBase, and store them back to the file system:
As you can see, it's pretty much the reverse of writing to HBase initially:
Read from HBase
Extract the original filename from the HBase read
Decode from Base64 back to binary
Decompress
Write to disk, with the original filename
Have a look at the attached hbasewriteexample.xml template, which you can import into your HDF environment to play with.
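To illustrate the compress and Base64 steps and their reverse outside of HDF, here is a small Python sketch of the round trip. It does not talk to HBase and is not the attached template: gzip is just one of the compression options mentioned, the function names are my own, and the sample bytes are made up.

```python
import base64
import gzip
import hashlib


def encode_for_hbase(raw: bytes) -> bytes:
    """Compress, then Base64 encode, in the same order as the write flow."""
    return base64.b64encode(gzip.compress(raw))


def decode_from_hbase(cell: bytes) -> bytes:
    """Reverse order for the read flow: Base64 decode, then decompress."""
    return gzip.decompress(base64.b64decode(cell))


# Made-up stand-in for the contents of a binary file such as a PDF
original = b"%PDF-1.4 hypothetical file contents " * 100
digest = hashlib.sha256(original).hexdigest()  # like the optional hashing step

cell = encode_for_hbase(original)
restored = decode_from_hbase(cell)
print(len(original), len(cell), hashlib.sha256(restored).hexdigest() == digest)
```

Storing the hash as an attribute next to the object gives you a cheap way to verify, after reading it back, that the file survived the round trip unchanged.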
09-05-2018
12:54 PM
Test if I can publish an article. This errors out for me, and I want to know if this works now.