Member since 09-19-2016
23 Posts, 12 Kudos Received, 2 Solutions
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 3082 | 12-12-2017 11:23 AM
 | 5209 | 11-07-2016 07:41 PM
02-15-2019
02:45 PM
Hi @Raj Zalavadia
As described in SPARK-16996 and SPARK-15348, Spark currently doesn't support Hive ACID tables (neither v1, i.e. Hive 1.x, nor v2, i.e. Hive 3.x).
To work around that, you can use the Hive Warehouse Connector.
It creates the necessary link between the two components by having Spark connect through HiveServer2. I'm not sure whether it's bundled directly into HDI (it should be). In any case, it's publicly available at:
https://github.com/hortonworks/hive-warehouse-connector-release/tree/HDP-3.0.1.10-7-tag
You'll find the documentation here:
https://docs.hortonworks.com/HDPDocuments/HDP3/HDP-3.1.0/integrating-hive/content/hive_hivewarehouseconnector_for_handling_apache_spark_data.html
Here's another HCC article that gives a concrete example of how to use it: https://community.hortonworks.com/articles/223626/integrating-apache-hive-with-apache-spark-hive-war.html
To get you started, here's a quick example of how to use it:
1. The Hive Warehouse Connector must be given to Spark as a dependency:
spark.jars=[path to the Hive Warehouse Connector]
usually: /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-[Build version].jar
2. It also requires a few more configuration properties, basically describing where the Hive Metastore and HiveServer2 instances reside:
spark.datasource.hive.warehouse.metastoreUri=thrift://[YOUR METASTORE URI]:9083
spark.hadoop.hive.llap.daemon.service.hosts=@llap0
spark.hadoop.hive.zookeeper.quorum=[YOUR HIVE ZOOKEEPER QUORUM]:2181
spark.sql.hive.hiveserver2.jdbc.url=[YOUR HIVE LLAP JDBC URL]
These can be passed as Spark conf items:
--conf spark.hadoop.hive.llap.daemon.service.hosts=@llap0
or as extra configuration parameters for Spark notebooks (e.g. Zeppelin).
3. Create a Hive Warehouse session:
import com.hortonworks.hwc.HiveWarehouseSession
import com.hortonworks.hwc.HiveWarehouseSession._
val hive = HiveWarehouseSession.session(spark).build()
// set the current database
hive.setDatabase("airline_ontime")
// show tables
hive.showTables().show(100)
4. Query data:
val flights_df = hive.executeQuery("SELECT * FROM flights WHERE year = 1989")
flights_df.createOrReplaceTempView("flights_1989")
Sample output:
+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+----+
|month|dayofmonth|dayofweek|deptime|crsdeptime|arrtime|crsarrtime|uniquecarrier|flightnum|tailnum|actualelapsedtime|crselapsedtime|airtime|arrdelay|depdelay|origin|dest|distance|taxiin|taxiout|cancelled|cancellationcode|diverted|carrierdelay|weatherdelay|nasdelay|securitydelay|lateaircraftdelay|year|
+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+----+
| 12| 25| 1| 1415| 1415| 1547| 1552| US| 1478| NA| 92| 97| null| -5| 0| TPA| CLT| 508| null| null| 0| NA| 0| null| null| null| null| null|1989|
5. Write data back to Hive (in ACID format):
hive.table("flights").filter("month = 01")
.write
.format(HiveWarehouseSession.HIVE_WAREHOUSE_CONNECTOR)
.option("table", "flight_2019_01")
.save()
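If you launch Spark from the command line rather than from a notebook, a small script can assemble the properties from steps 1 and 2 into `--conf` flags. A minimal Python sketch, assuming you start `spark-shell` (or `spark-submit`), which accepts `--conf key=value`; the bracketed values are the same placeholders as above, the JDBC URL key is written `spark.sql.hive.hiveserver2.jdbc.url` as in the HWC documentation, and `to_conf_args` is a hypothetical helper:

```python
import shlex

# Properties from steps 1 and 2; bracketed values are placeholders to fill in.
hwc_conf = {
    "spark.jars": "/usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-[Build version].jar",
    "spark.datasource.hive.warehouse.metastoreUri": "thrift://[YOUR METASTORE URI]:9083",
    "spark.hadoop.hive.llap.daemon.service.hosts": "@llap0",
    "spark.hadoop.hive.zookeeper.quorum": "[YOUR HIVE ZOOKEEPER QUORUM]:2181",
    "spark.sql.hive.hiveserver2.jdbc.url": "[YOUR HIVE LLAP JDBC URL]",
}

def to_conf_args(conf):
    """Turn a property dict into spark-shell / spark-submit style --conf arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args

# shlex.quote protects placeholders that contain spaces or brackets.
print("spark-shell " + " ".join(shlex.quote(a) for a in to_conf_args(hwc_conf)))
```

Keeping the properties in one dict makes it easy to reuse the same values for a notebook interpreter configuration.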
11-07-2018
07:05 PM
Hi @Slim, thanks, yep, that was it. Another little quirk identified with the help of Charles Bernard:
- All names in the JSON object must be in lower case for them to be parsed.
- As a corollary, all column names must also be in lower case.
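If you can't change the producer's schema, one option is to normalise the keys before the events reach the topic. A minimal Python sketch, assuming you control some step where each event is available as a parsed JSON object; `lowercase_keys` is a hypothetical helper, not part of Hive, Druid or Kafka:

```python
import json

def lowercase_keys(obj):
    """Recursively return a copy of obj with every dict key lower-cased."""
    if isinstance(obj, dict):
        # Keys differing only by case collide here; the last one wins.
        return {k.lower(): lowercase_keys(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [lowercase_keys(v) for v in obj]
    return obj

event = {"timestamp": "2018-11-04T22:43:10Z", "SrcIP": "109.26.211.73", "InitiatorPackets": 1}
print(json.dumps(lowercase_keys(event)))
# {"timestamp": "2018-11-04T22:43:10Z", "srcip": "109.26.211.73", "initiatorpackets": 1}
```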
11-04-2018
11:18 PM
Hi, I'm trying to ingest event data from Kafka to Druid using the new Hive/Druid/Kafka integration in Hive 3 (see https://cwiki.apache.org/confluence/display/Hive/Druid+Integration, section "Druid Kafka Ingestion from Hive").
I've got events in JSON format in a Kafka topic, with the following structure:
{
"timestamp": "2018-11-04T22:43:10Z",
"machine1": "RXI901",
"machine2": "RXI902",
"priority": "74",
"level": "[e.warning]",
"machine3": "RXI900",
"Protocol": "TCP",
"SrcIP": "109.26.211.73",
"OriginalClientIP": "::",
"DstIP": "192.168.104.96",
"SrcPort": "36711",
"DstPort": "54",
"TCPFlags": "0x0",
"IngressInterface": "s3p4",
"EgressInterface": "s3p3",
"IngressZone": "INUTILISE",
"EgressZone": "INUTILISE",
"DE": "Primary Detection Engine (f77608a0-0e20-11e6-91d7-88d7e001637c)",
"Policy": "Default Access Control",
"ConnectType": "Start",
"AccessControlRuleName": "Unknown",
"AccessControlRuleAction": "Allow",
"PrefilterPolicy": "Unknown",
"UserName": "No Authentication Required",
"InitiatorPackets": 1,
"ResponderPackets": 0,
"InitiatorBytes": 80,
"ResponderBytes": 0,
"NAPPolicy": "Network Analysis",
"DNSResponseType": "No Error",
"Sinkhole": "Unknown",
"URLCategory": "Unknown",
"URLReputation": "Risk unknown"
}
To ingest them from Kafka, I've created the following external table in Hive, matching the JSON structure of the messages:
CREATE EXTERNAL TABLE ssh_druid_kafka (
`__time` timestamp,
`machine1` string,
`machine2` string,
`priority` string,
`level` string,
`machine3` string,
`Protocol` string,
`SrcIP` string,
`OriginalClientIP` string,
`DstIP` string,
`SrcPort` string,
`DstPort` string,
`TCPFlags` string,
`IngressInterface` string,
`EgressInterface` string,
`IngressZone` string,
`EgressZone` string,
`DE` string,
`Policy` string,
`ConnectType` string,
`AccessControlRuleName` string,
`AccessControlRuleAction` string,
`PrefilterPolicy` string,
`UserName` string,
`InitiatorPackets` int,
`ResponderPackets` int,
`InitiatorBytes` int,
`ResponderBytes` int,
`NAPPolicy` string,
`DNSResponseType` string,
`Sinkhole` string,
`URLCategory` string,
`URLReputation` string
)
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
TBLPROPERTIES (
"kafka.bootstrap.servers" = "[kafka host]:6667",
"kafka.topic" = "log_schema_raw",
"druid.kafka.ingestion.useEarliestOffset" = "true",
"druid.kafka.ingestion.maxRowsInMemory" = "20",
"druid.kafka.ingestion.startDelay" = "PT5S",
"druid.kafka.ingestion.period" = "PT30S",
"druid.kafka.ingestion.consumer.retries" = "2"
);
ALTER TABLE ssh_druid_kafka SET TBLPROPERTIES("druid.kafka.ingestion" = 'START');
I'm getting an indexing task in the Druid supervisor... but no data source in the Druid Broker 😞
Upon closer inspection of the task logs in the Druid supervisor, I see parsing errors:
2018-11-04T23:06:06,305 ERROR [MonitorScheduler-0] io.druid.segment.realtime.RealtimeMetricsMonitor - [60] Unparseable events! Turn on debug logging to see exception stack trace.
2018-11-04T23:09:06,306 ERROR [MonitorScheduler-0] io.druid.segment.realtime.RealtimeMetricsMonitor - [60] Unparseable events! Turn on debug logging to see exception stack trace.
...
Questions:
1. How do I enable debug logging on tasks? I've tried setting the log4j level to DEBUG in the Ambari Druid tab. That does affect the log levels of the components, but doesn't seem to affect the indexing tasks.
2. What format does Druid expect when using the Kafka Indexing Service? Am I missing something?
Thanks for your help!
Labels: Apache Hive, Apache Kafka
09-26-2018
08:14 AM
2 Kudos
@Srikanth t
The easiest approach is to use lateral views:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LateralView
A lateral view lets you split an array into multiple rows.
1. Let's create an array from the items in your column "items":
select key, split(items, ',') as valArray
from test
Result:
+------+---------------------------------------+--+
| key | _c1 |
+------+---------------------------------------+--+
| 22 | ["1001 abc"," 1002 pqr"," 1003 tuv"] |
| 33 | ["1004 def"," 1005 xyz"] |
+------+---------------------------------------+--+
2. Now let's use a lateral view to split these items into rows (using "trim" to clean up the leading space):
select key, trim(uniqueVal)
from(
select key, split(items, ',') as valArray
from test ) a lateral view explode(a.valArray) exploded as uniqueVal ;
+------+-----------+--+
| key | _c1 |
+------+-----------+--+
| 22 | 1001 abc |
| 22 | 1002 pqr |
| 22 | 1003 tuv |
| 33 | 1004 def |
| 33 | 1005 xyz |
+------+-----------+--+
3. Finally, let's use split again to get separate values:
select key, split(trim(uniqueVal), ' ')[0], split(trim(uniqueVal), ' ')[1]
from(
select key, split(items, ',') as valArray
from test
) a lateral view explode(a.valArray) exploded as uniqueVal ;
+------+-------+------+--+
| key | _c1 | _c2 |
+------+-------+------+--+
| 22 | 1001 | abc |
| 22 | 1002 | pqr |
| 22 | 1003 | tuv |
| 33 | 1004 | def |
| 33 | 1005 | xyz |
+------+-------+------+--+
Note: I used the following to create the table:
create table test (
key string,
items string )
STORED AS ORC ;
INSERT INTO test (key, items )
VALUES (22, '1001 abc, 1002 pqr, 1003 tuv'),
(33, '1004 def, 1005 xyz');
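To see what each step does to the data, here is a small pure-Python model of the final query (split, explode, trim, split). It's only an illustration: Python's `str.split` takes a literal separator whereas Hive's `split` takes a regex, which makes no difference for ',' and ' ', and an out-of-range index would raise here instead of returning NULL as in Hive. The function name `split_explode` is hypothetical:

```python
def split_explode(rows):
    """Mimic: split(items, ',') -> LATERAL VIEW explode -> trim -> split(..., ' ')."""
    out = []
    for key, items in rows:
        for unique_val in items.split(","):          # split(items, ',') + explode: one row per element
            parts = unique_val.strip().split(" ")     # trim(uniqueVal), then split(..., ' ')
            out.append((key, parts[0], parts[1]))     # [0] and [1], as in the query
    return out

test = [("22", "1001 abc, 1002 pqr, 1003 tuv"), ("33", "1004 def, 1005 xyz")]
for row in split_explode(test):
    print(*row)
```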
12-12-2017
11:23 AM
2 Kudos
Hi @Fernando Lopez Bello
Sharing of interpreter processes is easily adjustable.
Go to the interpreter settings page and scroll down to the Spark interpreter.
By default, interpreters are shared globally, i.e. all notes/users share the same interpreter instance (hence the same Spark context).
Change the setting to either "per note" or "per user" depending on your use case:
- Per Note: each note instantiates a separate interpreter process.
- Per User: each user instantiates a separate interpreter process (which is shared amongst the notes he/she owns).
Below is an article written by one of the original developers of Zeppelin describing interpreter modes: https://medium.com/@leemoonsoo/apache-zeppelin-interpreter-mode-explained-bae0525d0555
Zeppelin documentation: https://zeppelin.apache.org/docs/latest/manual/interpreters.html#interpreter-binding-mode
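A toy Python model of the three binding modes described above, counting how many interpreter instances (and therefore Spark contexts) a few sessions would end up using. It is not Zeppelin code: `interpreter_instance` is hypothetical, and it assumes each note belongs to the user running it:

```python
def interpreter_instance(mode, user, note):
    """Return a key identifying the interpreter instance a (user, note) pair would use."""
    if mode == "shared":
        return "global"            # everyone shares one instance / one Spark context
    if mode == "per note":
        return f"note:{note}"      # one instance per note
    if mode == "per user":
        return f"user:{user}"      # one instance per user, shared by that user's notes
    raise ValueError(f"unknown mode: {mode}")

sessions = [("alice", "note1"), ("alice", "note2"), ("bob", "note3")]
counts = {
    mode: len({interpreter_instance(mode, u, n) for u, n in sessions})
    for mode in ("shared", "per note", "per user")
}
print(counts)  # {'shared': 1, 'per note': 3, 'per user': 2}
```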
11-29-2017
06:35 PM
1 Kudo
Hi @Michael Bronson
Spark has a few parameters to deal with job history rotation, in particular:
spark.history.fs.cleaner.enabled true
spark.history.fs.cleaner.maxAge 12h
spark.history.fs.cleaner.interval 1h
Source: https://spark.apache.org/docs/latest/monitoring.html
In the example above:
- Rotation is active.
- All job history files older than 12 hours will be deleted.
- Deletion happens at 1-hour intervals.
Note that these parameters need to be set at the environment level (not at the job level). They are usually placed in the spark-defaults.conf file.
Matthieu
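The retention rule above boils down to "delete anything last modified more than maxAge ago, and check every interval". A simplified Python model of that rule, not Spark's actual implementation; the log names and timestamps are hypothetical:

```python
from datetime import datetime, timedelta

MAX_AGE = timedelta(hours=12)   # spark.history.fs.cleaner.maxAge 12h
INTERVAL = timedelta(hours=1)   # spark.history.fs.cleaner.interval 1h

def logs_to_delete(logs, now, max_age=MAX_AGE):
    """Return the sorted names of logs last modified more than max_age before now."""
    return sorted(name for name, mtime in logs.items() if now - mtime > max_age)

now = datetime(2017, 11, 29, 18, 0)
logs = {
    "app-001": now - timedelta(hours=13),  # older than 12h -> deleted
    "app-002": now - timedelta(hours=2),   # recent -> kept
    "app-003": now - timedelta(days=3),    # older than 12h -> deleted
}
print(logs_to_delete(logs, now))            # ['app-001', 'app-003']
print("next cleaner run:", now + INTERVAL)  # next cleaner run: 2017-11-29 19:00:00
```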
10-02-2017
03:20 PM
1 Kudo
Hello @Raju
The thing to know when handling dataframes in Zeppelin is that a result set is imported into the notebook when queried.
That table is also read into memory when a particular notebook is accessed. To prevent a query from bringing back too much data and crashing the server (this can happen very quickly), each interpreter sets a limit on the number of rows that are brought back to Zeppelin (the default for most interpreters is ~1000).
=> This does not mean that queries returning more than a thousand rows will not be executed, just that only the first 1000 rows are actually shown in Zeppelin.
You can adjust the number of rows interpreter by interpreter: go to the interpreter configuration page (upper right-hand corner) and look for the "maxResult" properties, e.g.:
- for Spark: zeppelin.spark.maxResult
- for Livy: zeppelin.livy.spark.sql.maxResult
- for the JDBC interpreter (there's always an exception to the rule 🙂): common.max_count
When using Zeppelin's dataframe export to CSV feature, you simply export what has been pushed back to Zeppelin.
If the max number of rows is set to a thousand, then you'll never have more than a thousand rows in your CSV => the actual number of rows in your result set may be larger; it simply hasn't been fully read back into Zeppelin.
This feature is great when working with small result sets. It can, however, be deceiving, as the results can be arbitrarily truncated when the max number of rows has been reached.
If you're looking to export an entire table or a large subset, you should probably do it programmatically, for example by saving a table to a file system such as HDFS. For Spark (2.0):
dataframe.coalesce(1)
.write
.option("header", "true")
.csv("/path/to/sample_file.csv")
// Note the coalesce(1): it brings all results into a single file; otherwise you'll get one file per partition (the path is created as a directory containing the part file)
FYI, notebooks are simply JSON objects organized by paragraph. Just open any notebook to get a sense of the structure. In HDP they are saved in: /usr/hdp/current/zeppelin-server/notebook/[id of notebook]/note.json
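To make the CSV-export truncation pitfall concrete, here's a toy Python model: only the first `max_result` rows ever reach the notebook, so that's all the export can contain. It is an illustration, not Zeppelin code; `export_like_zeppelin` and the 2500-row result are hypothetical:

```python
import csv
import io

def export_like_zeppelin(result_rows, max_result=1000):
    """Toy model: only the first max_result rows reach the notebook, so only those get exported."""
    shown = result_rows[:max_result]      # what the interpreter pushes back to Zeppelin
    buf = io.StringIO()
    csv.writer(buf).writerows(shown)      # the "export to CSV" of what is shown
    return buf.getvalue()

full_result = [(i, f"row {i}") for i in range(2500)]  # the query really returns 2500 rows
exported = export_like_zeppelin(full_result)
print(len(full_result), len(exported.splitlines()))    # 2500 1000
```

Raising the limit (e.g. to 5000) would let all 2500 rows through, at the cost of pulling more data into the Zeppelin server's memory.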
09-04-2017
01:36 PM
Hi, I've got the following existing table created in HBase:
# HBase table definition
create 'WEB_STAT_INFO', 'info', 'usage'
#Populate HBase table
put 'WEB_STAT_INFO', 'row1', 'info:host', 'Host A'
put 'WEB_STAT_INFO', 'row1', 'info:domain', 'Domain A'
put 'WEB_STAT_INFO', 'row1', 'usage:core', '15'
#view hbase data
scan 'WEB_STAT_INFO'
I would like to create a Phoenix view on top of it. Mapping all data to VARCHAR works fine:
0: jdbc:phoenix:localhost:2181/hbase> CREATE VIEW "WEB_STAT_INFO" ( ROWKEY VARCHAR PRIMARY KEY, "info"."host" VARCHAR, "info"."domain" VARCHAR, "usage"."core" VARCHAR ) ;
No rows affected (0.069 seconds)
0: jdbc:phoenix:localhost:2181/hbase> select * from WEB_STAT_INFO ;
+---------+---------+-----------+-------+
| ROWKEY | host | domain | core |
+---------+---------+-----------+-------+
| row1 | Host A | Domain A | 15 |
+---------+---------+-----------+-------+
1 row selected (0.081 seconds)
Mapping to anything else results in an error, e.g.:
0: jdbc:phoenix:localhost:2181/hbase> CREATE VIEW "WEB_STAT_INFO" ( ROWKEY VARCHAR PRIMARY KEY, "info"."host" VARCHAR, "info"."domain" VARCHAR, "usage"."core" INTEGER ) ;
No rows affected (0.064 seconds)
0: jdbc:phoenix:localhost:2181/hbase> select * from WEB_STAT_INFO ;
Error: ERROR 201 (22000): Illegal data. Expected length of at least 4 bytes, but had 2 (state=22000,code=201)
java.sql.SQLException: ERROR 201 (22000): Illegal data. Expected length of at least 4 bytes, but had 2
at org.apache.phoenix.exception.SQLExceptionCode$Factory$1.newException(SQLExceptionCode.java:442)
at org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionInfo.java:145)
at org.apache.phoenix.schema.KeyValueSchema.next(KeyValueSchema.java:211)
at org.apache.phoenix.expression.ProjectedColumnExpression.evaluate(ProjectedColumnExpression.java:115)
at org.apache.phoenix.compile.ExpressionProjector.getValue(ExpressionProjector.java:69)
at org.apache.phoenix.jdbc.PhoenixResultSet.getObject(PhoenixResultSet.java:524)
at sqlline.Rows$Row.<init>(Rows.java:157)
at sqlline.BufferedRows.<init>(BufferedRows.java:38)
at sqlline.SqlLine.print(SqlLine.java:1650)
at sqlline.Commands.execute(Commands.java:833)
at sqlline.Commands.sql(Commands.java:732)
at sqlline.SqlLine.dispatch(SqlLine.java:808)
at sqlline.SqlLine.begin(SqlLine.java:681)
at sqlline.SqlLine.start(SqlLine.java:398)
at sqlline.SqlLine.main(SqlLine.java:292)
Examples based on HDP 2.6.1.
Note: I know that best practices recommend creating tables directly through Phoenix, but in this particular use case the HBase table is used by an existing application. The objective is to facilitate data access without having to rewrite the existing application.
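The error message itself hints at what's going on: the HBase shell `put` stored `'15'` as a 2-byte string, while Phoenix's INTEGER type reads a fixed 4-byte value ("Expected length of at least 4 bytes, but had 2"). A small Python illustration of the two byte layouts; the sign-bit flip in the hypothetical `phoenix_style_int` reflects my understanding of Phoenix's INTEGER serialization and should be checked against the Phoenix datatype documentation before relying on it:

```python
import struct

# What the HBase shell stored: put ... 'usage:core', '15' writes the string's bytes.
stored = "15".encode("utf-8")
print(len(stored), stored)            # 2 b'15'

def phoenix_style_int(value):
    """Assumed Phoenix INTEGER layout: 4-byte big-endian two's complement, sign bit flipped."""
    raw = bytearray(struct.pack(">i", value))
    raw[0] ^= 0x80                    # flipping the sign bit keeps byte order == numeric order
    return bytes(raw)

encoded = phoenix_style_int(15)
print(len(encoded), encoded.hex())    # 4 8000000f
```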
Labels: Apache HBase, Apache Phoenix
08-14-2017
10:55 AM
Correction for the syntax using beeline:
Formatted data for reference:
{
"user": {
"location": "",
"id": 1514008171,
"name": "Auzzie Jet",
"screenname": "metalheadgrunge",
"geoenabled": false
},
"tweetmessage": "Anthrax - Black - Sunglasses hell yah\n http://t.co/qCNjba57Dm",
"createddate": "2013-06-20T12:08:44",
"geolocation": null
}
From here on, I refer to:
`user`, `tweetmessage`, `createddate`, `geolocation` => Level 1 fields
`location`, `id`, `name`, `screenname`, `geoenabled` => Level 2 fields
Corrected select query:
SELECT t2.name, t1.tweetmessage
FROM tweets t
LATERAL VIEW json_tuple(t.tweet, 'user', 'tweetmessage' ) t1 as `user`, `tweetmessage`
LATERAL VIEW json_tuple(t1.`user`, 'name', 'location') t2 as `name`, `location`
where t2.`name`="Auzzie Jet"
;
Other examples:
Select level 1: `user`, `tweetmessage`, `createddate`, `geolocation`
SELECT t1.`user`, t1.tweetmessage, t1.createddate, t1.geolocation
FROM tweets t
LATERAL VIEW json_tuple(t.tweet, 'user', 'tweetmessage', 'createddate', 'geolocation' ) t1
as `user`, `tweetmessage`, `createddate`, `geolocation`
;
Select levels 1 and 2 => flatten everything:
SELECT t2.location, t2.id, t2.name, t2.screenname, t2.geoenabled, t1.tweetmessage, t1.createddate, t1.geolocation
FROM tweets t
LATERAL VIEW json_tuple(t.tweet, 'user', 'tweetmessage', 'createddate', 'geolocation' ) t1
as `user`, `tweetmessage`, `createddate`, `geolocation`
LATERAL VIEW json_tuple(t1.`user`, 'location', 'id', 'name', 'screenname', 'geoenabled' ) t2
as `location`, `id`, `name`, `screenname`, `geoenabled`
;
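To see how the two chained LATERAL VIEW json_tuple calls peel the levels apart, here's a rough pure-Python model of the corrected query. It's an approximation, not Hive's implementation: like json_tuple it returns every value as a string (nested objects as JSON text, JSON null as None), and the `json_tuple` function below is a hypothetical stand-in:

```python
import json

def json_tuple(json_str, *keys):
    """Approximate Hive's json_tuple: parse once, return the requested top-level keys as strings."""
    obj = json.loads(json_str) if json_str is not None else {}
    values = []
    for key in keys:
        value = obj.get(key)
        if value is None or isinstance(value, str):
            values.append(value)              # JSON null / missing -> None (NULL), strings as-is
        else:
            values.append(json.dumps(value))  # objects, numbers, booleans -> JSON text
    return tuple(values)

tweet = json.dumps({
    "user": {"location": "", "id": 1514008171, "name": "Auzzie Jet",
             "screenname": "metalheadgrunge", "geoenabled": False},
    "tweetmessage": "Anthrax - Black - Sunglasses hell yah\n http://t.co/qCNjba57Dm",
    "createddate": "2013-06-20T12:08:44",
    "geolocation": None,
})

user, tweetmessage = json_tuple(tweet, "user", "tweetmessage")   # level 1 (t1)
name, location = json_tuple(user, "name", "location")             # level 2 (t2), fed from t1.user
if name == "Auzzie Jet":                                          # WHERE t2.`name`="Auzzie Jet"
    print(name, "|", tweetmessage.splitlines()[0])
```

The key point is that the second json_tuple only works because the first one hands back the nested `user` object as a JSON string.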
11-09-2016
06:19 PM
Hi @Sivasaravanakumar K
The write function was implemented in 1.4.1... Try simply:
df.saveAsTable("default.sample_07_new_schema")
It will be saved as Parquet (the default format for Spark).