Member since
08-15-2016
189
Posts
63
Kudos Received
22
Solutions
My Accepted Solutions
Views | Posted |
---|---|
5888 | 01-02-2018 09:11 AM |
3177 | 12-04-2017 11:37 AM |
2219 | 10-03-2017 11:52 AM |
21934 | 09-20-2017 09:35 PM |
1672 | 09-12-2017 06:50 PM |
12-05-2018
04:14 PM
@Ramisetty Venkatesh It's easy: just execute
which kinit
on *nix; the output is what should go into this part of the hook script:
echo "The cluster is secure, calling kinit ..."
kinit_cmd="/usr/bin/kinit -kt $HDFS_KEYTAB $HDFS_PRINCIPAL"
But 9 times out of 10, /usr/bin/kinit will be just fine.
12-04-2018
12:09 PM
@Ricardo Junior Thanks for your answer to yourself 🙂 It helped me after many, many, many hours of Kafka debugging. BTW, in my case it was exactly the same scenario: Kerberos -> De-Kerberize -> Re-Kerberize. Thanks
11-21-2018
10:16 PM
@Arindam Choudhury I think the only way to do it is to set HiveServer2 to 'No authentication' mode (hive.server2.authentication=NONE). If you really want anyone to be able to connect anonymously, that is what you could do.
11-21-2018
09:02 PM
@Amit Nandi I almost gave up on performing the last step with Hive, thinking I needed Spark/Scala to do it. And then it just worked. The same can be done with Spark without a doubt. Maybe next time.
11-21-2018
06:47 PM
3 Kudos
THIS IS ACTUALLY AN ARTICLE, NOT A QUESTION
I want to show you the power of some built-in Hive functions to transform JSON data that is 'normalized' (optimized for transport) into a denormalized format that is much more suitable for data analysis. This demo has been tested on HDP-3.0.1.0 with Hive 3.1.0, but should be portable to lower Hive versions.
Suppose we have this inbound data, which might represent some experiment test data:
{
"data" : {
"receipt_time" : "2018-09-28T10:00:00.000Z",
"site" : "Los Angeles",
"measures" : [ {
"test_id" : "C23_PV",
"metrics" : [ {
"val1" : [ 0.76, 0.75, 0.71 ],
"temp" : [ 0, 2, 5 ],
"TS" : [ 1538128801336, 1538128810408, 1538128818420 ]
} ]
},
{
"test_id" : "HBI2_XX",
"metrics" : [ {
"val1" : [ 0.65, 0.71 ],
"temp" : [ 1, -7],
"TS" : [ 1538128828433, 1538128834541 ]
} ]
}]
}
}
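The table in this demo is fed one JSON record per line, so a pretty-printed record like the one above has to be collapsed onto a single line before loading. A minimal sketch of one way to do that, using only Python's standard json module; the trimmed-down record and the function name compact_json are illustrative assumptions, not part of the original demo. Unlike stripping all whitespace, this keeps spaces inside string values such as "Los Angeles".

```python
import json

def compact_json(pretty_text: str) -> str:
    """Collapse one pretty-printed JSON document onto a single line."""
    record = json.loads(pretty_text)
    # separators=(",", ":") removes the padding json.dumps adds by default;
    # whitespace inside string values is left untouched.
    return json.dumps(record, separators=(",", ":"))

# Trimmed-down version of the record above (assumption: shortened for brevity).
pretty = """
{
  "data" : {
    "site" : "Los Angeles",
    "measures" : [ { "test_id" : "C23_PV", "metrics" : [ { "temp" : [ 0, 2, 5 ] } ] } ]
  }
}
"""

one_line = compact_json(pretty)
print(one_line)
```

The resulting string can be written to the local source file as a single line.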
There are 3 nested arrays in this 1 JSON record. It is pretty-printed above to give a feel for the data structure, but remember that we need to feed it to Hive as 1 line per JSON record only:
{"data":{"receipt_time":"2018-09-28T10:00:00.000Z","site":"LosAngeles","measures":[{"test_id":"C23_PV","metrics":[{"val1":[0.76,0.75,0.71],"temp":[0,2,5],"TS":[1538128801336,1538128810408,1538128818420]}]},{"test_id":"HBI2_XX","metrics":[{"val1":[0.65,0.71],"temp":[1,-7],"TS":[1538128828433,1538128834541]}]}]}}
Note that this one-liner reads "LosAngeles" without the space, so that is the value the query results further down show.
The goal of the Hive transformations is to get to the layout below:
receipt_time | site | test_id | val1 | temp | TS
------------------------------------------------------------------------------------
2018-09-28T10:00:00.000Z | Los Angeles | C23_PV | 0.76 | 0 | 1538128801336
2018-09-28T10:00:00.000Z | Los Angeles | C23_PV | 0.75 | 2 | 1538128810408
2018-09-28T10:00:00.000Z | Los Angeles | C23_PV | 0.71 | 5 | 1538128818420
2018-09-28T10:00:00.000Z | Los Angeles | HBI2_XX | 0.65 | 1 | 1538128828433
2018-09-28T10:00:00.000Z | Los Angeles | HBI2_XX | 0.71 | -7 | 1538128834541
Note that 1 JSON record has been exploded into 5 rows (3 + 2, the sum of the sizes of the innermost 'val1'/'temp'/'TS' arrays across the two 'measures' entries) and that the innermost JSON keys (val1, temp, TS) have been transposed to top-level columns.
So how do we go about this? First we need a Hive table overlay that understands the JSON structure:
CREATE EXTERNAL TABLE IF NOT EXISTS ds.json_serde(
data struct<
receipt_time: STRING,
site: STRING,
measures: ARRAY<
struct< test_id: STRING,
metrics: ARRAY<
struct< val1: array<DOUBLE>,
temp: array<SMALLINT>,
TS: array<BIGINT>
> >
>
>
>
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION '/user/hive/external/json_serde'
TBLPROPERTIES ("transactional"="false");
It is a Hive EXTERNAL table because that makes it much easier to hand it (insert) a file containing the JSON strings. To do that, we create a local file that contains just the JSON one-liner (without line endings; copy it from above). Upload that file ('/home/cloudbreak/hive_json/source.json' in my case) to the folder of the external table we just created:
hdfs dfs -mkdir -p /user/hive/external/json_serde
hdfs dfs -put /home/cloudbreak/hive_json/source.json /user/hive/external/json_serde
Test the Hive table:
hive> select data.site, data.measures[0].metrics[0].temp from ds.json_serde;
It should return:
INFO : OK
+-------------+----------+
| site | temp |
+-------------+----------+
| LosAngeles | [0,2,5] |
+-------------+----------+
1 row selected (0.826 seconds)
0: jdbc:hive2://spark1-e0.zmq0bv3frkfuhfsbkcz>
Now we begin transforming the data:
SELECT b.*, a.data.receipt_time, a.data.site from ds.json_serde a LATERAL VIEW OUTER inline(a.data.measures) b;
The inline function does 2 things here:
1. Explode the JSON into as many rows as there are array members in a.data.measures, 2 rows in this case.
2. Create a new column for each JSON key that exists on the top level of the array members, in this case 'test_id' and 'metrics' of the 'measures' array objects.
* You can also try exchanging 'inline(a.data.measures)' for 'explode(a.data.measures)' in the statement above to see the difference.
The output should look like this:
INFO : OK
+------------+----------------------------------------------------+---------------------------+-------------+
| b.test_id | b.metrics | receipt_time | site |
+------------+----------------------------------------------------+---------------------------+-------------+
| C23_PV | [{"val1":[0.76,0.75,0.71],"temp":[0,2,5],"ts":[1538128801336,1538128810408,1538128818420]}] | 2018-09-28T10:00:00.000Z | LosAngeles |
| HBI2_XX | [{"val1":[0.65,0.71],"temp":[1,-7],"ts":[1538128828433,1538128834541]}] | 2018-09-28T10:00:00.000Z | LosAngeles |
+------------+----------------------------------------------------+---------------------------+-------------+
2 rows selected (0.594 seconds)
0: jdbc:hive2://spark1-e0.zmq0bv3frkfuhfsbkcz>
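To make the difference between explode and inline more concrete, here is a rough Python analogy (not Hive code). The helper functions explode and inline below are hypothetical stand-ins that only mimic the row/column shape the Hive functions produce, applied to a trimmed copy of the 'measures' array.

```python
# Trimmed copy of the 'measures' array (assumption: 'TS' left out for brevity).
measures = [
    {"test_id": "C23_PV", "metrics": [{"val1": [0.76, 0.75, 0.71], "temp": [0, 2, 5]}]},
    {"test_id": "HBI2_XX", "metrics": [{"val1": [0.65, 0.71], "temp": [1, -7]}]},
]

def explode(array):
    """One output row per array member; the whole member stays in one column."""
    return [(member,) for member in array]

def inline(array_of_structs):
    """One output row per struct; every struct field becomes its own column."""
    return [tuple(struct.values()) for struct in array_of_structs]

exploded = explode(measures)
inlined = inline(measures)

print(len(exploded), len(exploded[0]))  # rows, columns for explode
print(len(inlined), len(inlined[0]))    # rows, columns for inline
print([row[0] for row in inlined])      # the 'test_id' column after inline
```

Both variants yield 2 rows here; the difference is that explode leaves each struct in a single column, while inline spreads it over a 'test_id' and a 'metrics' column.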
Note that the 'receipt_time' and 'site' fields have been propagated (or denormalized) onto every row. That is something we wanted. Because of the nested arrays we need to take this 1 step further:
SELECT c.receipt_time, c.site, c.test_id, d.* FROM (SELECT b.*, a.data.receipt_time, a.data.site from ds.json_serde a LATERAL VIEW OUTER inline(a.data.measures) b) c LATERAL VIEW OUTER inline(c.metrics) d;
This statement might look daunting, but if you look carefully I am just doing the very same thing again, this time on a nested table (the subquery 'c'), which is exactly the previous query.
* You could also materialize the result of the first LATERAL query in a Hive table with a CTAS statement.
The result should look like this now:
INFO : OK
+---------------------------+-------------+------------+-------------------+----------+----------------------------------------------+
| c.receipt_time | c.site | c.test_id | d.val1 | d.temp | d.ts |
+---------------------------+-------------+------------+-------------------+----------+----------------------------------------------+
| 2018-09-28T10:00:00.000Z | LosAngeles | C23_PV | [0.76,0.75,0.71] | [0,2,5] | [1538128801336,1538128810408,1538128818420] |
| 2018-09-28T10:00:00.000Z | LosAngeles | HBI2_XX | [0.65,0.71] | [1,-7] | [1538128828433,1538128834541] |
+---------------------------+-------------+------------+-------------------+----------+----------------------------------------------+
2 rows selected (0.785 seconds)
0: jdbc:hive2://spark1-e0.zmq0bv3frkfuhfsbkcz>
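One way to picture the two chained lateral views is as two nested loops. The following Python sketch is only an analogy under that assumption (the variable names are made up and 'TS' is left out for brevity); it reproduces the shape of the 2-row result above, with the parent fields repeated on every child row.

```python
record = {
    "receipt_time": "2018-09-28T10:00:00.000Z",
    "site": "LosAngeles",
    "measures": [
        {"test_id": "C23_PV",
         "metrics": [{"val1": [0.76, 0.75, 0.71], "temp": [0, 2, 5]}]},
        {"test_id": "HBI2_XX",
         "metrics": [{"val1": [0.65, 0.71], "temp": [1, -7]}]},
    ],
}

rows = []
for measure in record["measures"]:       # first inline(): one row per 'measures' member
    for metric in measure["metrics"]:    # second inline(): one row per 'metrics' member
        # Parent fields are copied onto every child row (the denormalization).
        rows.append((record["receipt_time"], record["site"],
                     measure["test_id"], metric["val1"], metric["temp"]))

for row in rows:
    print(row)
```

The inner arrays are still intact at this point, which is exactly the remaining problem the next step deals with.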
It is beginning to look a lot better, but there is 1 last problem to solve: the arrays within 'metrics' are always of equal size, but we want the first member of the 'val1' array to be connected/merged with the first member of the 'temp' array, etc. There is a creative way to do this.
For readability I will now materialize the second query statement into an intermediary table named 'ds.json_serde_II':
CREATE TABLE ds.json_serde_II AS SELECT c.receipt_time, c.site, c.test_id, d.* FROM (SELECT b.*, a.data.receipt_time, a.data.site from ds.json_serde a LATERAL VIEW OUTER inline(a.data.measures) b) c LATERAL VIEW OUTER inline(c.metrics) d;
* Make sure you get the same result by running 'select * from ds.json_serde_II;'
From here it takes only 1 step to get to the desired end result:
SELECT a.receipt_time, a.site, a.test_id, a.temp[b.pos] as temp, a.TS[b.pos] as TS, b.* from ds.json_serde_II a LATERAL VIEW OUTER posexplode(a.val1) b;
It will result in:
INFO : OK
+---------------------------+-------------+------------+-------+----------------+--------+--------+
| a.receipt_time | a.site | a.test_id | temp | ts | b.pos | b.val |
+---------------------------+-------------+------------+-------+----------------+--------+--------+
| 2018-09-28T10:00:00.000Z | LosAngeles | C23_PV | 0 | 1538128801336 | 0 | 0.76 |
| 2018-09-28T10:00:00.000Z | LosAngeles | C23_PV | 2 | 1538128810408 | 1 | 0.75 |
| 2018-09-28T10:00:00.000Z | LosAngeles | C23_PV | 5 | 1538128818420 | 2 | 0.71 |
| 2018-09-28T10:00:00.000Z | LosAngeles | HBI2_XX | 1 | 1538128828433 | 0 | 0.65 |
| 2018-09-28T10:00:00.000Z | LosAngeles | HBI2_XX | -7 | 1538128834541 | 1 | 0.71 |
+---------------------------+-------------+------------+-------+----------------+--------+--------+
5 rows selected (1.716 seconds)
0: jdbc:hive2://spark1-e0.zmq0bv3frkfuhfsbkcz>
This needs some explaining. The function posexplode does the same thing as explode (creating as many rows as there are members in the array argument), but it also yields a positional number, which is the zero-based index of the array member. We can use this positional index to link to the corresponding members of the other arrays, 'temp' and 'TS' (all first members together on 1 row, all second members on the next row, and so on). The clause a.temp[b.pos] simply walks the JSON/Hive path to the corresponding value in the other arrays. The value of b.pos is apparently known and resolved correctly because Hive first takes care of the exploding and then joins the results back to the main query, where b.pos is needed.
Happy data processing!
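For readers who find the posexplode step easier to follow in a general-purpose language, here is a small Python analogy (not Hive code). The posexplode helper is a hypothetical stand-in built on enumerate, and the two input rows are copied from the intermediary result shown earlier.

```python
# (test_id, val1, temp, ts) as held in the intermediary table.
rows = [
    ("C23_PV", [0.76, 0.75, 0.71], [0, 2, 5],
     [1538128801336, 1538128810408, 1538128818420]),
    ("HBI2_XX", [0.65, 0.71], [1, -7],
     [1538128828433, 1538128834541]),
]

def posexplode(array):
    """Return (pos, val) pairs with a zero-based position for each member."""
    return list(enumerate(array))

flat = []
for test_id, val1, temp, ts in rows:
    for pos, val in posexplode(val1):
        # temp[pos] and ts[pos] play the role of a.temp[b.pos] and a.TS[b.pos]:
        # the position from 'val1' selects the matching member of the siblings.
        flat.append((test_id, temp[pos], ts[pos], pos, val))

for row in flat:
    print(row)
```

This relies on the same assumption as the Hive query: the sibling arrays are always the same length as 'val1'.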
Labels: Apache Hive
10-15-2018
12:06 PM
@dvillarreal Thanks for this, very useful! Changing the principal on the beeline connect string to "principal=HTTP/_HOST@SUPPORT.COM" is something I forgot when implementing this HiveServer2 access pattern.
10-12-2018
02:44 PM
This is complex. I believe your problem is that you need to forward the traffic to/from the KDC to your Mac. You can do this by SSH tunnelling. That alone is not enough, though, since SSH port forwarding only handles TCP traffic and KDC traffic is UDP by default.
10-11-2018
08:13 AM
I have the same problem on NiFi 1.5 and would be very interested in a solution that gets NiFi's En(De)crypt processor to work with PGP. In the meantime I turned to another solution: using the ExecuteStreamCommand processor and outsourcing the decryption to the CLI, which is verified to work. Just be aware that you have to import the public and private keys into the /home/nifi/.gnupg folder of the nifi user, since that is the user executing the stream command. So you might have to run these commands (on every NiFi node!) first:
gpg --import < pub_keys_armor.pgp
gpg --import < priv_key_armor.pgp
06-19-2018
07:42 AM
Hi, I wonder if there is a way to have the Kafka verifiable consumer consume from only 1 partition. The standard console consumer now has a parameter to support that (--partition 1), but the verifiable consumer doesn't. Can you achieve the same by playing with the consumer config options?
Labels: Apache Kafka
06-19-2018
07:36 AM
Hi, I run a verifiable consumer on HDF-3.1.1 but it never exits.
echo -e "max.poll.records=1\nenable.auto.commit=true\nauto.commit.interval.ms=1" > /tmp/consumer.config && /usr/hdf/3.1.1.0-35/kafka/bin/kafka-verifiable-consumer.sh --broker-list rjk-hdf-m:6667,rjk-hdf-s-01:6667,rjk-hdf-s-02:6667 --topic truck_speed_events_only_avro_keyed_non_transactional --reset-policy earliest --consumer.config /tmp/consumer.config --group-id test_group --verbose --max-messages 10
The output is according to expectations (at first):
{"timestamp":1529393105586,"name":"startup_complete"}
{"timestamp":1529393105824,"name":"partitions_revoked","partitions":[]}
{"timestamp":1529393108933,"name":"partitions_assigned","partitions":[{"topic":"truck_speed_events_only_avro_keyed_non_transactional","partition":2},{"topic":"truck_speed_events_only_avro_keyed_non_transactional","partition":1},{"topic":"truck_speed_events_only_avro_keyed_non_transactional","partition":0}]}
{"timestamp":1529393109005,"name":"record_data","key":"95","value":"\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0004\u0000\u0000\u0000\u0001.2018-05-28 19:43:05.689�����X\"truck_speed_event�\u00010\u001ANadeem Asghar\u0006:Saint Louis to Chicago Route2�\u0001","topic":"truck_speed_events_only_avro_keyed_non_transactional","partition":1,"offset":0}
{"timestamp":1529393109008,"name":"records_consumed","count":1,"partitions":[{"topic":"truck_speed_events_only_avro_keyed_non_transactional","partition":1,"count":1,"minOffset":0,"maxOffset":0}]}
{"timestamp":1529393109024,"name":"offsets_committed","offsets":[{"topic":"truck_speed_events_only_avro_keyed_non_transactional","partition":1,"offset":1}],"success":true}
{"timestamp":1529393109032,"name":"record_data","key":"65","value":"\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0004\u0000\u0000\u0000\u0001.2018-05-28 19:43:05.734�����X\"truck_speed_event�\u00014\u0016Don Hilborn\u00006Saint Louis to Tulsa Route2�\u0001","topic":"truck_speed_events_only_avro_keyed_non_transactional","partition":1,"offset":1}
{"timestamp":1529393109032,"name":"records_consumed","count":1,"partitions":[{"topic":"truck_speed_events_only_avro_keyed_non_transactional","partition":1,"count":1,"minOffset":1,"maxOffset":1}]}
{"timestamp":1529393109039,"name":"offsets_committed","offsets":[{"topic":"truck_speed_events_only_avro_keyed_non_transactional","partition":1,"offset":2}],"success":true}
ETC. ETC. ETC.
But after the 10 messages requested via --max-messages have been consumed, the util just keeps running forever with the following screen output:
{"timestamp":1529393109144,"name":"records_consumed","count":1,"partitions":[]}
{"timestamp":1529393109144,"name":"offsets_committed","offsets":[],"success":true}
{"timestamp":1529393109144,"name":"records_consumed","count":1,"partitions":[]}
{"timestamp":1529393109144,"name":"offsets_committed","offsets":[],"success":true}
{"timestamp":1529393109144,"name":"records_consumed","count":1,"partitions":[]}
{"timestamp":1529393109144,"name":"offsets_committed","offsets":[],"success":true}
{"timestamp":1529393109145,"name":"records_consumed","count":1,"partitions":[]}
Why wouldn't the verifiable consumer just stop after --max-messages is reached? In the source code of the verifiable consumer there is a method that should break the while loop:
private boolean isFinished() {
    return hasMessageLimit() && consumedMessages >= maxMessages;
}
Thanks
Labels: Apache Kafka