Member since: 09-19-2016
Posts: 23
Kudos Received: 11
Solutions: 2
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 1235 | 12-12-2017 11:23 AM
 | 1369 | 11-07-2016 07:41 PM
02-15-2019
02:45 PM
Hi @Raj Zalavadia
As described in SPARK-16996 and SPARK-15348, Spark currently doesn't support Hive ACID tables (v1 on Hive 1.x or v2 on Hive 3.x).
To circumvent that you can use the Hive Warehouse Connector.
It creates the necessary link between the two components by having Spark connect through HiveServer2. I'm not sure if it's directly bundled into HDI (it should be). In any case, it's available publicly at:
https://github.com/hortonworks/hive-warehouse-connector-release/tree/HDP-3.0.1.10-7-tag
You'll find the documentation here:
https://docs.hortonworks.com/HDPDocuments/HDP3/HDP-3.1.0/integrating-hive/content/hive_hivewarehouseconnector_for_handling_apache_spark_data.html
Here's another HCC article that gives you a concrete example of how to use it: https://community.hortonworks.com/articles/223626/integrating-apache-hive-with-apache-spark-hive-war.html
To get you started, here's a quick example of how to use it:
1. The Hive Warehouse Connector must be given as a dependency to Spark:
spark.jars=[path to the Hive warehouse connector]
usually : /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-[Build version].jar
2. It also requires a few more configuration items, basically describing where the Hive Metastore and HiveServer2 instances reside:
spark.datasource.hive.warehouse.metastoreUri=thrift://[YOUR METASTORE URI]:9083
spark.hadoop.hive.llap.daemon.service.hosts=@llap0
spark.hadoop.hive.zookeeper.quorum=[YOUR HIVE ZOOKEEPER QUORUM]:2181
spark.sql.hive.hiveserver2.jdbc.url=[YOUR HIVE LLAP JDBC URL]
These can be passed as Spark conf items:
--conf spark.hadoop.hive.llap.daemon.service.hosts=@llap0
or as extra configuration parameters for Spark notebooks (e.g. Zeppelin).
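If you'd rather set them in code, here's a minimal sketch (placeholder values; assuming the connector jar is already on the driver and executor classpath) of applying the same settings while building the SparkSession. Setting them in spark-defaults or via --conf as above remains the documented route:
import org.apache.spark.sql.SparkSession
// hedged sketch - replace the bracketed placeholders with your own values
val spark = SparkSession.builder()
  .appName("hwc-example")
  .config("spark.datasource.hive.warehouse.metastoreUri", "thrift://[YOUR METASTORE URI]:9083")
  .config("spark.hadoop.hive.llap.daemon.service.hosts", "@llap0")
  .config("spark.hadoop.hive.zookeeper.quorum", "[YOUR HIVE ZOOKEEPER QUORUM]:2181")
  .config("spark.sql.hive.hiveserver2.jdbc.url", "[YOUR HIVE LLAP JDBC URL]")
  .getOrCreate()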
3. Create a HiveWarehouse session
import com.hortonworks.hwc.HiveWarehouseSession
import com.hortonworks.hwc.HiveWarehouseSession._
val hive = HiveWarehouseSession.session(spark).build()
//set a database
hive.setDatabase("airline_ontime")
//show table
hive.showTables().show(100)
4. Query data
val flights_df = hive.executeQuery("SELECT * FROM flights WHERE year = 1989")
flights_df.createOrReplaceTempView("flights_1989")
+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+----+
|month|dayofmonth|dayofweek|deptime|crsdeptime|arrtime|crsarrtime|uniquecarrier|flightnum|tailnum|actualelapsedtime|crselapsedtime|airtime|arrdelay|depdelay|origin|dest|distance|taxiin|taxiout|cancelled|cancellationcode|diverted|carrierdelay|weatherdelay|nasdelay|securitydelay|lateaircraftdelay|year|
+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+----+
| 12| 25| 1| 1415| 1415| 1547| 1552| US| 1478| NA| 92| 97| null| -5| 0| TPA| CLT| 508| null| null| 0| NA| 0| null| null| null| null| null|1989|
5. Write data back to Hive (in ACID format)
hive.table("flights").filter("month = 01")
.write
.format(HiveWarehouseSession.HIVE_WAREHOUSE_CONNECTOR)
.option("table", "flight_2019_01")
.save()
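As a quick sanity check, here's a hedged sketch (reusing the hive session and the table name from the example above) that reads the freshly written table back through the connector:
// read the table written above back through the Hive Warehouse Connector
hive.table("flight_2019_01").show(10)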
11-07-2018
07:05 PM
Hi @Slim
Thanks, yep that was it. Another little quirk identified with the help of Charles Bernard:
- All names in the JSON object must be in lower case for them to be parsed
- As a corollary, all column names must also be in lower case
11-04-2018
11:18 PM
Hi,
I'm trying to ingest event data from Kafka into Druid using the new Hive/Druid/Kafka integration in Hive 3 (see https://cwiki.apache.org/confluence/display/Hive/Druid+Integration, section "Druid Kafka Ingestion from Hive").
I've got events in JSON format in a Kafka topic with the following structure:
{
"timestamp": "2018-11-04T22:43:10Z",
"machine1": "RXI901",
"machine2": "RXI902",
"priority": "74",
"level": "[e.warning]",
"machine3": "RXI900",
"Protocol": "TCP",
"SrcIP": "109.26.211.73",
"OriginalClientIP": "::",
"DstIP": "192.168.104.96",
"SrcPort": "36711",
"DstPort": "54",
"TCPFlags": "0x0",
"IngressInterface": "s3p4",
"EgressInterface": "s3p3",
"IngressZone": "INUTILISE",
"EgressZone": "INUTILISE",
"DE": "Primary Detection Engine (f77608a0-0e20-11e6-91d7-88d7e001637c)",
"Policy": "Default Access Control",
"ConnectType": "Start",
"AccessControlRuleName": "Unknown",
"AccessControlRuleAction": "Allow",
"PrefilterPolicy": "Unknown",
"UserName": "No Authentication Required",
"InitiatorPackets": 1,
"ResponderPackets": 0,
"InitiatorBytes": 80,
"ResponderBytes": 0,
"NAPPolicy": "Network Analysis",
"DNSResponseType": "No Error",
"Sinkhole": "Unknown",
"URLCategory": "Unknown",
"URLReputation": "Risk unknown"
}
To ingest them from Kafka, I've created the following external table in Hive, matching the JSON structure of the messages:
CREATE EXTERNAL TABLE ssh_druid_kafka (
`__time` timestamp,
`machine1` string,
`machine2` string,
`priority` string,
`level` string,
`machine3` string,
`Protocol` string,
`SrcIP` string,
`OriginalClientIP` string,
`DstIP` string,
`SrcPort` string,
`DstPort` string,
`TCPFlags` string,
`IngressInterface` string,
`EgressInterface` string,
`IngressZone` string,
`EgressZone` string,
`DE` string,
`Policy` string,
`ConnectType` string,
`AccessControlRuleName` string,
`AccessControlRuleAction` string,
`PrefilterPolicy` string,
`UserName` string,
`InitiatorPackets` int,
`ResponderPackets` int,
`InitiatorBytes` int,
`ResponderBytes` int,
`NAPPolicy` string,
`DNSResponseType` string,
`Sinkhole` string,
`URLCategory` string,
`URLReputation` string
)
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
TBLPROPERTIES (
"kafka.bootstrap.servers" = "[kakfa host]:6667",
"kafka.topic" = "log_schema_raw",
"druid.kafka.ingestion.useEarliestOffset" = "true",
"druid.kafka.ingestion.maxRowsInMemory" = "20",
"druid.kafka.ingestion.startDelay" = "PT5S",
"druid.kafka.ingestion.period" = "PT30S",
"druid.kafka.ingestion.consumer.retries" = "2"
);
ALTER TABLE ssh_druid_kafka SET TBLPROPERTIES("druid.kafka.ingestion" = 'START');
I'm getting an indexing task in the Druid supervisor... but no data source in the Druid broker 😞
Upon closer look at the task logs in the Druid supervisor, I see parsing errors:
2018-11-04T23:06:06,305 ERROR [MonitorScheduler-0] io.druid.segment.realtime.RealtimeMetricsMonitor - [60] Unparseable events! Turn on debug logging to see exception stack trace.
2018-11-04T23:09:06,306 ERROR [MonitorScheduler-0] io.druid.segment.realtime.RealtimeMetricsMonitor - [60] Unparseable events! Turn on debug logging to see exception stack trace.
...
Questions:
1. How do I enable debug logging on tasks? I've tried setting the log4j level to DEBUG in the Ambari Druid tab. That does affect the log levels of the components, but doesn't seem to affect the indexing tasks.
2. What is the format expected by Druid for using the Kafka Indexing Service? Am I missing something?
Thanks for your help
09-26-2018
08:14 AM
1 Kudo
@Srikanth t
The easiest approach is to use lateral views.
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LateralView
It allows you to split an array into multiple lines.
1. Let's create an array from the items in your column "items":
select key, split(items, ',') as valArray
from test
result
+------+---------------------------------------+--+
| key | _c1 |
+------+---------------------------------------+--+
| 22 | ["1001 abc"," 1002 pqr"," 1003 tuv"] |
| 33 | ["1004 def"," 1005 xyz"] |
+------+---------------------------------------+--+
2. Now let's use lateral view to split these items into lines (using "trim" to clean up the spaces):
select key, trim(uniqueVal)
from(
select key, split(items, ',') as valArray
from test ) a lateral view explode(a.valArray) exploded as uniqueVal ;
+------+-----------+--+
| key | _c1 |
+------+-----------+--+
| 22 | 1001 abc |
| 22 | 1002 pqr |
| 22 | 1003 tuv |
| 33 | 1004 def |
| 33 | 1005 xyz |
+------+-----------+--+
3. Finally let's use split again to get separate values.
select key, split(trim(uniqueVal), ' ')[0], split(trim(uniqueVal), ' ')[1]
from(
select key, split(items, ',') as valArray
from test
) a lateral view explode(a.valArray) exploded as uniqueVal ;
+------+-------+------+--+
| key | _c1 | _c2 |
+------+-------+------+--+
| 22 | 1001 | abc |
| 22 | 1002 | pqr |
| 22 | 1003 | tuv |
| 33 | 1004 | def |
| 33 | 1005 | xyz |
+------+-------+------+--+
Note: I used the following to create the test table:
create table test (
key string,
items string )
STORED AS ORC ;
INSERT INTO test (key, items )
VALUES (22, '1001 abc, 1002 pqr, 1003 tuv'),
(33, '1004 def, 1005 xyz');
03-02-2018
03:38 PM
Hi @Guilherme Braccialli
Late answer to your question, but I'm hoping it will help others.
The issue at hand is that the CN of the Knox certificate does not match the name of the machine you are connecting to. This often happens when the Knox TLS certificate was issued against an internal name, but you're connecting from an external connection with a different name. The CN of the certificate must match the DNS name used to connect.
Most ODBC connections (e.g. Windows ODBC) allow you to ignore this with settings such as "ignore Common Name mismatch". Unfortunately Tableau doesn't let you do that...
Two solutions:
1. Recreate your Knox gateway certificate (preferred)
## 1. Regenerate the self-signed certificate
#Where
# $gateway-hostname is the FQDN of the Knox Gateway
# $knox_dir is the knox install dir (usually /usr/hdp/current/knox-server/)
cd $knox_dir
./bin/knoxcli.sh create-cert --hostname $gateway-hostname
## 2. Export the certificate in PEM format:
keytool -export -alias gateway-identity -rfc -file data/security/keystores/knox.crt -keystore data/security/keystores/gateway.jks
## 3. Restart Knox server
./bin/gateway.sh stop
./bin/gateway.sh start
https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.4/bk_security/content/knox_self_signed_certificate_specific_hostname_evaluations.html
2. Modify the hostname of the Knox server on your machine (modify /etc/hosts)
The idea is to "trick" your browser into thinking it's connected to the correct machine. On a MacOS/Linux machine:
## 1. Get the IP of the Knox gateway
nslookup $KnoxGatewayFQDN
## 2. modify /etc/hosts
sudo vim /etc/hosts
# insert
$KnoxGatewayIP (ex. XX.XX.XX.XX)   $ExpectedCN (the expected CN name of the Knox gateway)
12-12-2017
11:23 AM
2 Kudos
Hi @Fernando Lopez Bello
Sharing of interpreter processes is easily adjustable. Go to the interpreter settings page and scroll down to the Spark interpreter.
By default, interpreters are shared globally, i.e. all notes/users share the same interpreter instance (hence the same Spark context).
Change the setting to either "per note" or "per user" depending on your use case:
- Per note: each note instantiates a separate interpreter process
- Per user: each user instantiates a separate interpreter process (which is shared amongst the notes for which he/she has ownership)
Below is an article written by one of the original developers of Zeppelin describing the interpreter modes:
https://medium.com/@leemoonsoo/apache-zeppelin-interpreter-mode-explained-bae0525d0555
Zeppelin documentation:
https://zeppelin.apache.org/docs/latest/manual/interpreters.html#interpreter-binding-mode
11-29-2017
06:35 PM
1 Kudo
Hi @Michael Bronson
Spark has a number of parameters to deal with job history rotation. In particular:
spark.history.fs.cleaner.enabled true
spark.history.fs.cleaner.maxAge 12h
spark.history.fs.cleaner.interval 1h
Source: https://spark.apache.org/docs/latest/monitoring.html
In the example above:
- Rotation is active
- All job histories older than 12 hours will be deleted
- Deletion happens at 1-hour intervals
Note these parameters need to be set at the environment level (not on a job level). They are usually placed in the spark-defaults file.
Matthieu
11-21-2017
06:53 PM
Hi @Michael DeGuzis
All events linked to the Ambari LDAP sync are written to: /var/log/ambari-server/ambari-server.log
cf. https://docs.hortonworks.com/HDPDocuments/Ambari-2.6.0.0/bk_ambari-security/content/synchronizing_ldap_users_and_groups.html
Search the logs for events containing:
- FilterBasedLdapUserSearch => search filter criteria
- AmbariLdapAuthenticationProvider => e.g. warnings for failed authentication
11-12-2017
10:33 PM
Hi @Raymond Pfaff
Ambari MPACKs are only compatible with the version of Ambari they were written against. You can only upgrade to the next release of each product that coincides with Ambari support (e.g. HDP 2.6.1 and HDF 3.0 against Ambari 2.5.1.0).
HDF 3.0.2 has just been released and supports Ambari 2.6.0: https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.0.2/bk_support-matrices/content/ch_matrices-hdf.html
That should take care of the problem.
Regards,
10-02-2017
03:20 PM
1 Kudo
Hello @Raju
The thing to know when handling dataframes in Zeppelin is that the result set is imported into the notebook when queried.
That table is also read into memory when a particular notebook is accessed. To prevent a query from bringing back too much data and crashing the server (this can happen very quickly), each interpreter sets a limit on the number of rows that are brought back to Zeppelin (the default for most interpreters is ~1000). This does not mean that queries returning more than a thousand rows will not be executed, just that only the first 1000 rows are actually shown in Zeppelin.
You can adjust the number of rows interpreter by interpreter; look for the "maxResult" properties on the interpreter configuration page (upper right-hand corner). For example:
- Spark: zeppelin.spark.maxResult
- Livy: zeppelin.livy.spark.sql.maxResult
- JDBC (there's always an exception to the rule 🙂): common.max_count
When using Zeppelin's dataframe export-to-CSV feature, you simply export what has been pushed back to Zeppelin.
If the max number of rows is set to a thousand, then you'll never have more than a thousand rows in your CSV; the actual number of rows in your result set may be larger, it simply hasn't been fully read back into Zeppelin. This feature is great when working with small result sets. It can however be deceiving, as the results are silently truncated once the max number of rows has been reached.
If you're looking to export an entire table or a large subset, you should probably do it programmatically, for example by saving the table to a file system such as HDFS. For Spark (2.0):
dataframe.coalesce(1)
.write
.option("header", "true")
.csv("/path/to/sample_file.csv")
//Note the coalesce(1) => brings all results into a single file, otherwise you'll have 1 file per executor
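Putting it together, here's a minimal self-contained sketch (Spark 2.x; the table name and output path are hypothetical examples) that pulls the full result set from Hive and writes it out to HDFS as a single CSV, bypassing the notebook's maxResult limit entirely:
import org.apache.spark.sql.SparkSession
// hedged sketch - replace the table name and output path with your own
val spark = SparkSession.builder()
  .appName("full-table-export")
  .enableHiveSupport()
  .getOrCreate()
val fullDf = spark.sql("SELECT * FROM default.sample_07")
fullDf.coalesce(1)
  .write
  .mode("overwrite")
  .option("header", "true")
  .csv("/tmp/sample_07_export")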
FYI: notebooks are simply JSON objects organized by paragraph. Just open up any notebook to get a sense of the structure. In HDP they are saved in: /usr/hdp/current/zeppelin-server/notebook/[id of notebook]/note.json
09-04-2017
01:36 PM
Hi,
I've got the following existing table created in HBase:
#HBase table definition
create 'WEB_STAT_INFO', 'info', 'usage'
#Populate HBase table
put 'WEB_STAT_INFO', 'row1', 'info:host', 'Host A'
put 'WEB_STAT_INFO', 'row1', 'info:domain', 'Domain A'
put 'WEB_STAT_INFO', 'row1', 'usage:core', '15'
#view hbase data
scan 'WEB_STAT_INFO'
I would like to create a Phoenix view on top of it. Mapping all columns to VARCHAR works fine:
0: jdbc:phoenix:localhost:2181/hbase> CREATE VIEW "WEB_STAT_INFO" ( ROWKEY VARCHAR PRIMARY KEY, "info"."host" VARCHAR, "info"."domain" VARCHAR, "usage"."core" VARCHAR ) ;
No rows affected (0.069 seconds)
0: jdbc:phoenix:localhost:2181/hbase> select * from WEB_STAT_INFO ;
+---------+---------+-----------+-------+
| ROWKEY | host | domain | core |
+---------+---------+-----------+-------+
| row1 | Host A | Domain A | 15 |
+---------+---------+-----------+-------+
1 row selected (0.081 seconds)
Mapping to anything else results in an error... e.g.:
0: jdbc:phoenix:localhost:2181/hbase> CREATE VIEW "WEB_STAT_INFO" ( ROWKEY VARCHAR PRIMARY KEY, "info"."host" VARCHAR, "info"."domain" VARCHAR, "usage"."core" INTEGER ) ;
No rows affected (0.064 seconds)
0: jdbc:phoenix:localhost:2181/hbase> select * from WEB_STAT_INFO ;
Error: ERROR 201 (22000): Illegal data. Expected length of at least 4 bytes, but had 2 (state=22000,code=201)
java.sql.SQLException: ERROR 201 (22000): Illegal data. Expected length of at least 4 bytes, but had 2
at org.apache.phoenix.exception.SQLExceptionCode$Factory$1.newException(SQLExceptionCode.java:442)
at org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionInfo.java:145)
at org.apache.phoenix.schema.KeyValueSchema.next(KeyValueSchema.java:211)
at org.apache.phoenix.expression.ProjectedColumnExpression.evaluate(ProjectedColumnExpression.java:115)
at org.apache.phoenix.compile.ExpressionProjector.getValue(ExpressionProjector.java:69)
at org.apache.phoenix.jdbc.PhoenixResultSet.getObject(PhoenixResultSet.java:524)
at sqlline.Rows$Row.<init>(Rows.java:157)
at sqlline.BufferedRows.<init>(BufferedRows.java:38)
at sqlline.SqlLine.print(SqlLine.java:1650)
at sqlline.Commands.execute(Commands.java:833)
at sqlline.Commands.sql(Commands.java:732)
at sqlline.SqlLine.dispatch(SqlLine.java:808)
at sqlline.SqlLine.begin(SqlLine.java:681)
at sqlline.SqlLine.start(SqlLine.java:398)
at sqlline.SqlLine.main(SqlLine.java:292)
Examples based on HDP 2.6.1.
Note: I know that best practices recommend creating tables directly through Phoenix, but in this particular use case the HBase table is used by an existing application. The objective is to facilitate data access without any rewrites to the existing application.
08-14-2017
10:55 AM
Correction for the syntax using beeline.
Formatted data for reference:
{
"user": {
"location": "",
"id": 1514008171,
"name": "Auzzie Jet",
"screenname": "metalheadgrunge",
"geoenabled": false
},
"tweetmessage": "Anthrax - Black - Sunglasses hell yah\n http://t.co/qCNjba57Dm",
"createddate": "2013-06-20T12:08:44",
"geolocation": null
}
From here on I refer to:
- `user`, `tweetmessage`, `createddate`, `geolocation` => level 1 fields
- `location`, `id`, `name`, `screenname`, `geoenabled` => level 2 fields
Corrected select query:
SELECT t2.name, t1.tweetmessage
FROM tweets t
LATERAL VIEW json_tuple(t.tweet, 'user', 'tweetmessage' ) t1 as `user`, `tweetmessage`
LATERAL VIEW json_tuple(t1.`user`, 'name', 'location') t2 as `name`, `location`
where t2.`name`="Auzzie Jet"
;
Other examples:
Select level 1 fields (`user`, `tweetmessage`, `createddate`, `geolocation`):
SELECT t1.`user`, t1.tweetmessage, t1.createddate, t1.geolocation
FROM tweets t
LATERAL VIEW json_tuple(t.tweet, 'user', 'tweetmessage', 'createddate', 'geolocation' ) t1
as `user`, `tweetmessage`, `createddate`, `geolocation`
;
Select level 1 and 2 fields => flatten everything:
SELECT t2.location, t2.id, t2.name, t2.screenname, t2.geoenabled, t1.tweetmessage, t1.createddate, t1.geolocation
FROM tweets t
LATERAL VIEW json_tuple(t.tweet, 'user', 'tweetmessage', 'createddate', 'geolocation' ) t1
as `user`, `tweetmessage`, `createddate`, `geolocation`
LATERAL VIEW json_tuple(t1.`user`, 'location', 'id', 'name', 'screenname', 'geoenabled' ) t2
as `location`, `id`, `name`, `screenname`, `geoenabled`
;
01-02-2017
02:35 PM
1 Kudo
Hi @Rambabu Chamakuri
It might be easier to express it as an SQL statement:
// sc is an existing JavaSparkContext
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
JavaRDD<PageData> rddX = sc.parallelize(pageData,20);
// Apply the schema to the RDD to create a dataFrame
DataFrame processedData= sqlContext.createDataFrame(rddX, PageData.class);
// Register the DataFrame as a table.
processedData.registerTempTable("data");
//Use SQL to express your Queries
DataFrame result = sqlContext.sql("SELECT date_publication, date_application, id_ref, id_unite, libelle, valeur, zone, tableau, dense_rank() OVER (PARTITION BY tableau ORDER BY libelle DESC) as ordre FROM data");
You may have already read them, but here are a few good resources to help you out:
Databricks' "Introducing Window Functions in Spark SQL" blog article:
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
Apache Spark programming guide:
http://spark.apache.org/docs/1.6.2/sql-programming-guide.html
12-30-2016
07:49 PM
1 Kudo
Hi,
I've encountered an issue while using the spark-llap connector (https://github.com/hortonworks-spark/spark-llap). The connector does not seem to be able to read table schemas that have been created using the saveAsTable Spark dataframe method, even though the same table is perfectly readable via Hive.
Consider the following CSV file:
1,Mackenzy,Smith,US,1993-12-18,123-456-7890
2,Sherlyn,Miller,US,1975-03-22,234-567-8901
3,Khiana,Wilson,US,1989-08-14,345-678-9012
4,Jack,Thompson,US,1962-10-28,456-789-0123
5,Audrey,Taylor,UK,1985-01-11,12-3456-7890
6,Ruford,Walker,UK,1976-05-19,23-4567-8901
7,Marta,Lloyd,UK,1981-07-23,34-5678-9012
8,Derick,Schneider,DE,1982-04-17,12-345-67890
9,Anna,Richter,DE,1995-09-07,23-456-78901
10,Raina,Graf,DE,1999-02-06,34-567-89012
11,Felix,Lee,CA,1982-04-17,321-654-0987
12,Adam,Brown,CA,1995-09-07,432-765-1098
13,Lucas,Jones,CA,1999-02-06,543-876-2109
14,Yvonne,Dupont,FR,1982-04-17,01-23-45-67-89
15,Pascal,Fournier,FR,1995-09-07,23-45-67-89-01
16,Ariel,Simon,FR,1999-02-06,34-56-78-90-12
This file is used to create Hive tables with Spark, both via the saveAsTable method and via HiveQL:
// read CSV file
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};
val hdfsNameNode = {nameNodeURI}
val hdfsLocation = "/tmp/llap_spark_test/customers.csv"
val csv_uri = "hdfs://%s:8020/%s".format(hdfsNameNode,hdfsLocation)
val schemaString = "id,name_first,name_last,addr_country,date_of_birth,phone_num"
val schema = StructType(schemaString.split(",")
.map(fieldName => StructField(fieldName, StringType, true)))
val customerDf = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "false")
.option("delimiter",",")
.schema(schema)
.load(csv_uri)
//read back schema
customerDf.printSchema
customerDf.show(2)
// Save as Hive table “customers_llap” Spark saveAsTable
customerDf.write.format("orc").mode("overwrite").saveAsTable("employees.customers_llap")
// Save as Hive table “customers_llap_bis” HiveQL
customerDf.registerTempTable("customers_llap_tmp")
sqlContext.sql("CREATE TABLE IF NOT EXISTS employees.customers_llap_bis (id int, name_first string, name_last string, addr_country string, date_of_birth string, phone_num string) stored as ORC")
sqlContext.sql("INSERT OVERWRITE TABLE employees.customers_llap_bis SELECT * FROM customers_llap_tmp")
Reading the tables back using the Spark HiveContext works fine.
Table customers_llap:
sqlContext.sql("SELECT * FROM employees.customers_llap").show
Result:
16/12/30 19:20:36 INFO DAGScheduler: Job 1 finished: show at <console>:26, took 6.418023 s
+---+----------+---------+------------+-------------+--------------+
| id|name_first|name_last|addr_country|date_of_birth| phone_num|
+---+----------+---------+------------+-------------+--------------+
| 1| Mackenzy| Smith| US| 1993-12-18| 123-456-7890|
| 2| Sherlyn| Miller| US| 1975-03-22| 234-567-8901|
| 3| Khiana| Wilson| US| 1989-08-14| 345-678-9012|
| 4| Jack| Thompson| US| 1962-10-28| 456-789-0123|
| 5| Audrey| Taylor| UK| 1985-01-11| 12-3456-7890|
| 6| Ruford| Walker| UK| 1976-05-19| 23-4567-8901|
| 7| Marta| Lloyd| UK| 1981-07-23| 34-5678-9012|
| 8| Derick|Schneider| DE| 1982-04-17| 12-345-67890|
| 9| Anna| Richter| DE| 1995-09-07| 23-456-78901|
| 10| Raina| Graf| DE| 1999-02-06| 34-567-89012|
| 11| Felix| Lee| CA| 1982-04-17| 321-654-0987|
| 12| Adam| Brown| CA| 1995-09-07| 432-765-1098|
| 13| Lucas| Jones| CA| 1999-02-06| 543-876-2109|
| 14| Yvonne| Dupont| FR| 1982-04-17|01-23-45-67-89|
| 15| Pascal| Fournier| FR| 1995-09-07|23-45-67-89-01|
| 16| Ariel| Simon| FR| 1999-02-06|34-56-78-90-12|
+---+----------+---------+------------+-------------+--------------+
Table customers_llap_bis:
sqlContext.sql("SELECT * FROM employees.customers_llap_bis").show
Result:
16/12/30 19:41:34 INFO DAGScheduler: Job 5 finished: show at <console>:27, took 0.086317 s
+---+----------+---------+------------+-------------+--------------+
| id|name_first|name_last|addr_country|date_of_birth| phone_num|
+---+----------+---------+------------+-------------+--------------+
| 1| Mackenzy| Smith| US| 1993-12-18| 123-456-7890|
| 2| Sherlyn| Miller| US| 1975-03-22| 234-567-8901|
| 3| Khiana| Wilson| US| 1989-08-14| 345-678-9012|
| 4| Jack| Thompson| US| 1962-10-28| 456-789-0123|
| 5| Audrey| Taylor| UK| 1985-01-11| 12-3456-7890|
| 6| Ruford| Walker| UK| 1976-05-19| 23-4567-8901|
| 7| Marta| Lloyd| UK| 1981-07-23| 34-5678-9012|
| 8| Derick|Schneider| DE| 1982-04-17| 12-345-67890|
| 9| Anna| Richter| DE| 1995-09-07| 23-456-78901|
| 10| Raina| Graf| DE| 1999-02-06| 34-567-89012|
| 11| Felix| Lee| CA| 1982-04-17| 321-654-0987|
| 12| Adam| Brown| CA| 1995-09-07| 432-765-1098|
| 13| Lucas| Jones| CA| 1999-02-06| 543-876-2109|
| 14| Yvonne| Dupont| FR| 1982-04-17|01-23-45-67-89|
| 15| Pascal| Fournier| FR| 1995-09-07|23-45-67-89-01|
| 16| Ariel| Simon| FR| 1999-02-06|34-56-78-90-12|
+---+----------+---------+------------+-------------+--------------+
But it fails when trying to read back the table created via the saveAsTable Spark dataframe method when using an LlapContext:
//Create an LlapContext
import org.apache.spark.sql.hive.llap.LlapContext
var llapContext = new LlapContext(sc)
//Read back the table created using saveAsTable ("customers_llap”)
llapContext.sql("SELECT * FROM employees.customers_llap").show(5)
Result:
scala> llapContext.sql("SELECT * FROM employees.customers_llap").show(5)
16/12/30 19:41:54 INFO ParseDriver: Parsing command: SELECT * FROM employees.customers_llap
16/12/30 19:41:54 INFO ParseDriver: Parse Completed
java.lang.Exception: Expected MetastoreRelation
at org.apache.spark.sql.hive.llap.LlapCatalog.lookupRelation(LlapContext.scala:116)
at org.apache.spark.sql.hive.llap.LlapContext$$anon$1.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(LlapContext.scala:47)
at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:161)
at org.apache.spark.sql.hive.llap.LlapContext$$anon$1.lookupRelation(LlapContext.scala:47)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:302)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$9.applyOrElse(Analyzer.scala:314)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$9.applyOrElse(Analyzer.scala:309)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:54)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:309)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:299)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:36)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:36)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:817)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:40)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:42)
at $iwC$$iwC$$iwC.<init>(<console>:44)
at $iwC$$iwC.<init>(<console>:46)
at $iwC.<init>(<console>:48)
at <init>(<console>:50)
at .<init>(<console>:54)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Reading back the table created using HiveQL ("customers_llap_bis") works fine...
//Read back the table created using HiveQL ("customers_llap_bis")
llapContext.sql("SELECT * FROM employees.customers_llap_bis").show(5) Result : 16/12/30 19:42:53 INFO DAGScheduler: Job 6 finished: show at <console>:31, took 2.519766 s
+---+----------+---------+------------+-------------+------------+
| id|name_first|name_last|addr_country|date_of_birth| phone_num|
+---+----------+---------+------------+-------------+------------+
| 1| Mackenzy| Smith| US| 1993-12-18|123-456-7890|
| 2| Sherlyn| Miller| US| 1975-03-22|234-567-8901|
| 3| Khiana| Wilson| US| 1989-08-14|345-678-9012|
| 4| Jack| Thompson| US| 1962-10-28|456-789-0123|
| 5| Audrey| Taylor| UK| 1985-01-11|12-3456-7890|
+---+----------+---------+------------+-------------+------------+
only showing top 5 rows
Reading the table created using the saveAsTable Spark dataframe method through Hive works fine too:
beeline -u jdbc:hive2://{LLAP JDBC URL}:10500/ -n spark -e "SELECT * FROM employees.customers_llap limit 5"
Result:
Connected to: Apache Hive (version 2.1.0.2.5.3.0-37)
Driver: Hive JDBC (version 1.2.1000.2.5.3.0-37)
Transaction isolation: TRANSACTION_REPEATABLE_READ
INFO : Compiling command(queryId=hive_20161230194324_5ac93edb-bf0b-4a6b-8246-76d33374a096): SELECT * FROM employees.customers_llap limit 5
INFO : Semantic Analysis Completed
INFO : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:customers_llap.id, type:string, comment:null), FieldSchema(name:customers_llap.name_first, type:string, comment:null), FieldSchema(name:customers_llap.name_last, type:string, comment:null), FieldSchema(name:customers_llap.addr_country, type:string, comment:null), FieldSchema(name:customers_llap.date_of_birth, type:string, comment:null), FieldSchema(name:customers_llap.phone_num, type:string, comment:null)], properties:null)
INFO : Completed compiling command(queryId=hive_20161230194324_5ac93edb-bf0b-4a6b-8246-76d33374a096); Time taken: 0.198 seconds
INFO : Concurrency mode is disabled, not creating a lock manager
INFO : Executing command(queryId=hive_20161230194324_5ac93edb-bf0b-4a6b-8246-76d33374a096): SELECT * FROM employees.customers_llap limit 5
INFO : Completed executing command(queryId=hive_20161230194324_5ac93edb-bf0b-4a6b-8246-76d33374a096); Time taken: 0.003 seconds
INFO : OK
+--------------------+----------------------------+---------------------------+------------------------------+-------------------------------+---------------------------+--+
| customers_llap.id | customers_llap.name_first | customers_llap.name_last | customers_llap.addr_country | customers_llap.date_of_birth | customers_llap.phone_num |
+--------------------+----------------------------+---------------------------+------------------------------+-------------------------------+---------------------------+--+
| 1 | Mackenzy | Smith | US | 1993-12-18 | 123-456-7890 |
| 2 | Sherlyn | Miller | US | 1975-03-22 | 234-567-8901 |
| 3 | Khiana | Wilson | US | 1989-08-14 | 345-678-9012 |
| 4 | Jack | Thompson | US | 1962-10-28 | 456-789-0123 |
| 5 | Audrey | Taylor | UK | 1985-01-11 | 12-3456-7890 |
+--------------------+----------------------------+---------------------------+------------------------------+-------------------------------+---------------------------+--+
5 rows selected (0.332 seconds)
11-09-2016
06:19 PM
Hi @Sivasaravanakumar K
The write API was only introduced in Spark 1.4.1... Try simply:
df.saveAsTable("default.sample_07_new_schema")
It will be saved as Parquet (default format for Spark)
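If you later need a specific format (e.g. ORC) and are on Spark 1.4 or later, here's a hedged one-liner sketch using the write API:
// requires Spark >= 1.4 (DataFrameWriter) and a HiveContext for the ORC data source
df.write.format("orc").saveAsTable("default.sample_07_new_schema")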
11-09-2016
05:23 PM
Um, which version of Spark?
- 1.3.1 => HDP 2.3.0
- 1.4.1 => HDP 2.3.2
- 1.5.2 => HDP 2.3.4
I have a feeling it's Spark 1.3; they made some major improvements in Spark <=> Hive integration starting with Spark 1.4.1.
11-08-2016
11:37 AM
Hi @Sivasaravanakumar K
I've simplified my answer a bit.
What version of Spark are you using? This was tested on Spark 1.6.2 on an HDP 2.5 sandbox.
Note: when using spark-shell, did you import:
import org.apache.spark.sql.hive.orc._
import org.apache.spark.sql._
11-07-2016
07:41 PM
1 Kudo
Hi @Sivasaravanakumar K
Here's one way of going about this.
Note the example below is based on the sample data available on the hortonworks sandbox.
Just change the database, table, and column names to suit your needs.
0. Get database and table info
//show databases in Hive
sqlContext.sql("show databases").show
//show table in a database
sqlContext.sql("show tables in default").show
//read the table headers
sqlContext.sql("select * from default.sample_07").printSchema
Result:
+--------+
| result|
+--------+
| default|
|foodmart|
| xademo|
+--------+
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
|sample_07| false|
|sample_08| false|
+---------+-----------+
root
|-- code: string (nullable = true)
|-- description: string (nullable = true)
|-- total_emp: integer (nullable = true)
|-- salary: integer (nullable = true)
1. Read table data into a DataFrame:
// read data from Hive
val df = sqlContext.sql("select * from default.sample_07")
//Show Table Schema
df.printSchema
Result:
root
|-- code: string (nullable = true)
|-- description: string (nullable = true)
|-- total_emp: integer (nullable = true)
|-- salary: integer (nullable = true)
2. Change column names
Change a single column name with the withColumnRenamed function:
val df_renamed = df.withColumnRenamed("salary", "money")
df_renamed.printSchema
Or change them all at once using a list of headers:
val newNames = Seq("code_1", "description_1", "total_emp_1", "money_1")
val df_renamed = df.toDF(newNames: _*)
df_renamed.printSchema
Note you can combine this with the read so as not to create 2 sets of data in memory:
val newNames = Seq("code_1", "description_1", "total_emp_1", "money_1")
val df = sqlContext.sql("select * from default.sample_07").toDF(newNames: _*)
Or do it all at once using SQL aliases (** preferred):
val df = sqlContext.sql("select code as code_1, description as description_1, total_emp as total_emp_1, salary as money from default.sample_07")
df.printSchema
Result (using SQL aliases):
df: org.apache.spark.sql.DataFrame = [code_1: string, description_1: string, total_emp_1: int, money: int]
root
|-- code_1: string (nullable = true)
|-- description_1: string (nullable = true)
|-- total_emp_1: integer (nullable = true)
|-- money: integer (nullable = true)
3. Save back to Hive
//write to Hive (in ORC format)
df.write.format("orc").saveAsTable("default.sample_07_new_schema")
//read back and check new_schema
sqlContext.sql("select * from default.sample_07_new_schema").printSchema
Result:
root
|-- code_1: string (nullable = true)
|-- description_1: string (nullable = true)
|-- total_emp_1: integer (nullable = true)
|-- money: integer (nullable = true)
10-19-2016
02:31 PM
Hi @Dhana Shekar
Having your NameNode running in HA is not enough; your ResourceManager (which handles YARN resource management) also needs to be configured in HA (cf. the first link in my answer above). Having your NameNode in HA allows you to keep access to HDFS if the active NameNode fails. However, it doesn't handle resource allocation for applications; that's what the ResourceManager (YARN) is for.
Let's go through a few failure scenarios:
1. The ResourceManager fails but the containers (application master + slaves + driver) linked to that particular application are unaffected.
=> Your application continues to run as if nothing happened. You won't be able to submit new apps until the ResourceManager is back up or the standby has been brought to active (in case of HA).
2. One of the slave containers fails.
=> The application master spawns a new container to take over. The Spark task it was handling might fail but it will be replayed.
3. The application master container fails.
=> The application fails but will be re-spawned by the ResourceManager.
10-17-2016
04:18 PM
1 Kudo
@Dhana Shekar
What's your cluster setup like?
Are you using YARN? Is it set up to function in HA?
http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/ResourceManagerHA.html
If so, then you can rely on YARN to run your application in HA and to handle all the resource allocation for you. That's the beauty of using YARN to run your Hadoop applications. All you need to do is tell Spark to run your applications against YARN:
http://spark.apache.org/docs/latest/running-on-yarn.html
09-30-2016
08:59 PM
1 Kudo
Hi @Peter Smyth
In order to have access to a particular port from outside of Docker, you need to expose it when the Docker container is started. A couple of ways to get around that:
0. First let's make sure that RStudio Server is indeed up and running:
curl localhost:8787
If you get:
curl: (7) couldn't connect to host
then check your RStudio Server configs.
1. First option - Configure RStudio Server to use a port that is already exposed.
E.g. if you're not using Zeppelin notebooks (i.e. the service is stopped), you can use port 9995.
1. Go to your RStudio Server install directory and modify rserver.conf: change www-port=80 to www-port=9995
https://support.rstudio.com/hc/en-us/articles/200552316-Configuring-the-Server
2. Restart RStudio
3. Go to the new RStudio login page - http://localhost:9995
2. Second option - Launch a new container with the new port exposed (a bit more work, but cleaner...)
Note: by instantiating a new container you revert to the original configuration...
This needs to be done on the Docker host machine, in this case the VMware virtual machine.
1. Log in to the VM:
ssh -p 2122 root@localhost
Default password: hadoop
2. Stop the current container:
docker stop sandbox
3. Start a new container:
docker run -v hadoop:/hadoop --name sandbox2 --hostname "sandbox.hortonworks.com" --privileged -d \
-p 1000:1000 \
-p 10000:10000 \
-p 10001:10001 \
-p 10500:10500 \
-p 1100:1100 \
-p 11000:11000 \
-p 1220:1220 \
-p 15000:15000 \
-p 16010:16010 \
-p 16030:16030 \
-p 18080:18080 \
-p 1988:1988 \
-p 19888:19888 \
-p 2100:2100 \
-p 21000:21000 \
-p 2222:22 \
-p 4040:4040 \
-p 4200:4200 \
-p 42111:42111 \
-p 5007:5007 \
-p 50070:50070 \
-p 50075:50075 \
-p 50095:50095 \
-p 5011:5011 \
-p 50111:50111 \
-p 60000:60000 \
-p 6001:6001 \
-p 6003:6003 \
-p 6008:6008 \
-p 60080:60080 \
-p 6080:6080 \
-p 6188:6188 \
-p 61888:61888 \
-p 8000:8000 \
-p 8005:8005 \
-p 8020:8020 \
-p 8040:8040 \
-p 8042:8042 \
-p 8050:8050 \
-p 8080:8080 \
-p 8082:8082 \
-p 8086:8086 \
-p 8088:8088 \
-p 8090:8090 \
-p 8091:8091 \
-p 8188:8188 \
-p 8443:8443 \
-p 8744:8744 \
-p 8765:8765 \
-p 8886:8886 \
-p 8888:8888 \
-p 8889:8889 \
-p 8983:8983 \
-p 8993:8993 \
-p 9000:9000 \
-p 9090:9090 \
-p 9995:9995 \
-p 9996:9996 \
-p 8787:8787 \
sandbox /usr/sbin/sshd -D
4. Add a new route to the NAT settings of the VM.
The docker container is running on a VM which is itself running on a physical host.
A route must be created through both layers of virtualization to access the service.
Go to (Edit > Virtual Network Settings > NAT) and add a route for port 8787 following the same format as those already set.
5. Re-install R core and RStudio.
6. Go to the RStudio Server login page: http://localhost:8787
Alternative - Use the R interpreter for Zeppelin to run R code...
09-19-2016
09:25 PM
1 Kudo
Hi AnjiReddy
Like Dan said, using some type of notebook would probably make your life a lot easier.
It will allow you to work interactively with Spark, with visual outputs to work with.
Check out :
- Zeppelin - https://zeppelin.apache.org/
Note: you can find a pre-installed version of Zeppelin in the HDP sandbox (if you don't want to go through the hassle of installing and configuring the tool)
- Jupyter - http://jupyter.org/
Note: when working with Spark, you'll probably want to install the Spark (Scala) kernel: https://github.com/ibm-et/spark-kernel
That said, you can make it work using only the spark-shell.
I assume that there is a file already available in hdfs at /user/zeppelin/yahoo_stocks.csv
csv file:
date,open,high,low,close,volume,adj_close
2015-04-28,44.34,44.57,43.94,44.34,7188300,44.34
2015-04-27,44.65,45.10,44.25,44.36,10840900,44.36
2015-04-24,43.73,44.71,43.69,44.52,11267500,44.52
2015-04-23,43.92,44.06,43.58,43.70,14274900,43.70
2015-04-22,44.58,44.85,43.67,43.98,32241200,43.98
2015-04-21,45.15,45.18,44.45,44.49,16103700,44.49
2015-04-20,44.73,44.91,44.41,44.66,10052900,44.66
2015-04-17,45.30,45.44,44.25,44.45,13305700,44.45
2015-04-16,45.82,46.13,45.53,45.78,13800300,45.78
2015-04-15,45.46,45.83,45.23,45.73,15033500,45.73
2015-04-14,44.82,45.64,44.79,45.53,12365800,45.53
2015-04-13,45.25,45.59,44.72,44.77,8837300,44.77
2015-04-10,45.79,45.79,45.00,45.18,8436400,45.18
2015-04-09,45.70,46.17,45.16,45.63,13678000,45.63
2015-04-08,43.86,45.19,43.80,45.17,16071000,45.17
2015-04-07,43.79,44.22,43.56,43.61,11382000,43.61
2015-04-06,43.82,44.03,43.61,43.67,10717000,43.67
2015-04-02,44.24,44.36,43.68,44.15,12229400,44.15
2015-04-01,44.45,44.60,43.95,44.13,14722300,44.13
2015-03-31,44.82,45.20,44.42,44.44,10415500,44.44
2015-03-30,45.36,45.42,44.82,44.95,8884300,44.95
2015-03-27,45.20,45.67,45.01,45.10,20563500,45.10
2015-03-26,43.78,44.67,43.68,44.47,16162900,44.47
2015-03-25,44.59,44.93,44.13,44.20,14036900,44.20
2015-03-24,44.64,44.78,44.28,44.42,7559100,44.42
2015-03-23,45.25,45.54,44.71,44.72,8268800,44.72
2015-03-20,45.37,45.58,44.91,45.04,14194200,45.04
2015-03-19,44.90,45.45,44.81,44.98,14758000,44.98
2015-03-18,43.58,44.71,43.43,44.67,18919900,44.67
Launch the Spark shell.
Note: the spark-csv package makes reading csv files a lot easier - https://github.com/databricks/spark-csv
spark-shell --master yarn --packages com.databricks:spark-csv_2.10:1.5.0
Code:
// create a DataFrame from the file
val input_df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("delimiter",",").load("hdfs://sandbox.hortonworks.com:8020/user/zeppelin/yahoo_stocks.csv")
// save file to hive (the spark way)
input_df.write.saveAsTable("%s.%s".format( "default" , "stockPrice" ))
// save the file the sql way
input_df.registerTempTable("stock_price")
sqlContext.sql("create table hivestockprice as select * from stockPrice")
//You can read back the data from Hive in the same way
sqlContext.sql("Select * from hivestockprice").show(10)
The Zeppelin way:
Paragraph 1: read data and save to Hive
%spark
//read file
val input_df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("delimiter",",").load("hdfs://sandbox.hortonworks.com:8020/user/zeppelin/yahoo_stocks.csv")
//save to Hive
input_df.write.saveAsTable("%s.%s".format( "default" , "stockPrice" ))
Paragraph 2: read back from Hive using SQL
%sql
Select * from stockPrice