Member since: 10-07-2015
Posts: 107
Kudos Received: 73
Solutions: 23
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 2704 | 02-23-2017 04:57 PM
 | 2094 | 12-08-2016 09:55 AM
 | 9235 | 11-24-2016 07:24 PM
 | 4123 | 11-24-2016 02:17 PM
 | 9607 | 11-24-2016 09:50 AM
12-20-2019
01:15 PM
I have the following issue with this setup: I define a Livy service in a Knox topology with an authentication provider enabled. When I request a Livy session over the Knox URL, Knox requests the Livy session with doAs=myuser. So far so good: the Livy session is started with owner=knox and proxyUser=myuser. The problem appears when we post to the Livy statements API over the Knox URL. If we use the Knox URL for posting to the running Livy session, Knox again adds doAs=myuser, but now we get a Forbidden response. Because the Livy session is owned by knox, we cannot post a statement into the session over the Knox URL with doAs=myuser; in my setup, at least, only the knox user may post a statement to a Livy session owned by knox.
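To illustrate, here is a minimal sketch of the failing sequence using Python requests (the Knox gateway path for Livy and the credentials are hypothetical placeholders, not the actual values from my cluster):
import json, requests
base = "https://<<knox-server>>:8443/gateway/default/livy/v1"   # hypothetical Knox URL for Livy
auth = ("myuser", "password")
headers = {"Content-Type": "application/json", "X-Requested-By": "myuser"}
# 1) Create a session through Knox; Knox appends doAs=myuser,
#    so the session starts with owner=knox and proxyUser=myuser
r = requests.post(base + "/sessions", auth=auth, headers=headers,
                  data=json.dumps({"kind": "pyspark"}), verify=False)
session_id = r.json()["id"]
# 2) Post a statement through Knox; doAs=myuser is appended again,
#    but Livy answers 403 Forbidden because the session owner is knox
r = requests.post(base + "/sessions/%d/statements" % session_id, auth=auth, headers=headers,
                  data=json.dumps({"code": "1 + 1"}), verify=False)
print(r.status_code)   # 403 in this setup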
04-12-2017
08:09 AM
iris = spark.read.csv("/tmp/iris.csv", header=True, inferSchema=True)
iris.printSchema()
Result:
root
|-- sepalLength: double (nullable = true)
|-- sepalWidth: double (nullable = true)
|-- petalLength: double (nullable = true)
|-- petalWidth: double (nullable = true)
|-- species: string (nullable = true)
Write the parquet file:
iris.write.parquet("/tmp/iris.parquet")
... and create the Hive table:
spark.sql("""
create external table iris_p (
sepalLength double,
sepalWidth double,
petalLength double,
petalWidth double,
species string
)
STORED AS PARQUET
location "/tmp/iris.parquet"
""")
02-23-2017
11:03 AM
@Oriane Try the "Reward User" option.
10-25-2018
07:51 PM
Useful for getting SparkMagic to run with Jupyter. The images do not load for me either, but it is still a good how-to article for Jupyter.
11-28-2016
03:54 PM
Thank you! I was having difficulty with the replace function. I had not thought to first use the ExtractText processor.
08-22-2016
11:55 AM
1 Kudo
Hi Pedro, a Python API for Spark GraphX is still missing; however, there is a Git project with a higher-level API on top of GraphX called GraphFrames (GraphFrames). The project claims: "GraphX is to RDDs as GraphFrames are to DataFrames." I haven't worked with it much, but a quick test of their samples with Spark 1.6.2 worked. Start pyspark like this:
pyspark --packages graphframes:graphframes:0.2.0-spark1.6-s_2.10
or use Zeppelin and add the dependency to the interpreter configuration. Maybe this library has what you need.
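A minimal sketch of what using GraphFrames from pyspark could then look like (the vertex and edge data below are made up for illustration; sqlContext is the one predefined in the Spark 1.6 pyspark shell):
from graphframes import GraphFrame
vertices = sqlContext.createDataFrame(
    [("a", "Alice"), ("b", "Bob"), ("c", "Charlie")], ["id", "name"])
edges = sqlContext.createDataFrame(
    [("a", "b", "follows"), ("b", "c", "follows")], ["src", "dst", "relationship"])
g = GraphFrame(vertices, edges)
g.inDegrees.show()               # degree metrics come back as DataFrames
g.find("(x)-[e]->(y)").show()    # a simple motif query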
07-07-2016
06:17 PM
4 Kudos
In order to submit jobs to Spark, so-called "fat jars" (containing all dependencies) are quite useful. If you develop your code in Scala, "sbt" (http://www.scala-sbt.org) is a great choice to build your project. The following relies on the newest version, sbt 0.13. For fat jars you first need "sbt-assembly" (https://github.com/sbt/sbt-assembly). Assuming you have the standard sbt folder structure, the easiest way is to add a file "assembly.sbt" into the "project" folder containing one line
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
The project structure now looks like (most probably without the "target" folder which will be created upon building the project)
MyProject
+-- build.sbt
+-- project
| +-- assembly.sbt
+-- src
| +-- main
| +-- scala
| +-- MyProject.scala
+-- target
For building Spark Kafka Streaming jobs on HDP 2.4.2, this is the build file "build.sbt":
name := "MyProject"
version := "0.1"
scalaVersion := "2.10.6"
resolvers += "Hortonworks Repository" at "http://repo.hortonworks.com/content/repositories/releases/"
resolvers += "Hortonworks Jetty Maven Repository" at "http://repo.hortonworks.com/content/repositories/jetty-hadoop/"
libraryDependencies ++= Seq(
"org.apache.spark" % "spark-streaming_2.10" % "1.6.1.2.4.2.0-258" % "provided",
"org.apache.spark" % "spark-streaming-kafka-assembly_2.10" % "1.6.1.2.4.2.0-258"
)
assemblyMergeStrategy in assembly := {
case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last
case PathList("com", "squareup", xs @ _*) => MergeStrategy.last
case PathList("com", "sun", xs @ _*) => MergeStrategy.last
case PathList("com", "thoughtworks", xs @ _*) => MergeStrategy.last
case PathList("commons-beanutils", xs @ _*) => MergeStrategy.last
case PathList("commons-cli", xs @ _*) => MergeStrategy.last
case PathList("commons-collections", xs @ _*) => MergeStrategy.last
case PathList("commons-io", xs @ _*) => MergeStrategy.last
case PathList("io", "netty", xs @ _*) => MergeStrategy.last
case PathList("javax", "activation", xs @ _*) => MergeStrategy.last
case PathList("javax", "xml", xs @ _*) => MergeStrategy.last
case PathList("org", "apache", xs @ _*) => MergeStrategy.last
case PathList("org", "codehaus", xs @ _*) => MergeStrategy.last
case PathList("org", "fusesource", xs @ _*) => MergeStrategy.last
case PathList("org", "mortbay", xs @ _*) => MergeStrategy.last
case PathList("org", "tukaani", xs @ _*) => MergeStrategy.last
case PathList("xerces", xs @ _*) => MergeStrategy.last
case PathList("xmlenc", xs @ _*) => MergeStrategy.last
case "about.html" => MergeStrategy.rename
case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last
case "META-INF/mailcap" => MergeStrategy.last
case "META-INF/mimetypes.default" => MergeStrategy.last
case "plugin.properties" => MergeStrategy.last
case "log4j.properties" => MergeStrategy.last
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
}
1) The "resolvers" section adds the Hortonworks repositories.
2) In "libraryDependencies" you add the Spark-Streaming (which will also load Spark-Core) and Spark-Streaming-Kafka jars. To avoid problems with Kafka dependencies it is best to use the "spark-streaming-kafka-assembly" fat jar. Note that Spark-Streaming can be tagged as "provided" (it is omitted from the fat jar), since it is automatically available when you submit a job.
3) Unfortunately a lot of libraries are imported twice due to transitive dependencies, which leads to assembly errors. To overcome the issue, the "assemblyMergeStrategy" section tells sbt-assembly to always use the last one (which is from the Spark jars). This list is handcrafted and might change with a new version of HDP; the idea, however, should be clear.
4) Assemble the project (the first time it will "download the internet", like Maven):
sbt assembly
This will create "target/scala-2.10/myproject-assembly-0.1.jar".
5) You can now submit it to Spark:
spark-submit --master yarn --deploy-mode client \
--class my.package.MyProject target/scala-2.10/myproject-assembly-0.1.jar
05-06-2016
10:37 AM
Hi @Mike Vogt, thanks, and glad to hear it worked. Could you kindly accept the answer and thus help us manage answered questions? Thanks!
04-22-2016
08:26 AM
1 Kudo
There seems to be a length issue. If I compress your Avro schema to one line, it works in the Hive View and in beeline:
DROP TABLE metro;
CREATE TABLE metro
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
TBLPROPERTIES ('avro.schema.literal'='{"namespace":"ly.stealth.xmlavro","protocol":"xml","type":"record","name":"MetroBillType","fields":[{"name":"BillDate","type":"string"},{"name":"BillTime","type":"string"},{"name":"Remit_CompanyName","type":"string"},{"name":"Remit_Addr","type":"string"},{"name":"Remit_CityStZip","type":"string"},{"name":"Remit_Phone","type":"string"},{"name":"Remit_Fax","type":"string"},{"name":"Remit_TaxID","type":"string"},{"name":"BillAcct_Break","type":{"type":"record","name":"BillAcct_BreakType","fields":[{"name":"BillAcct","type":"string"},{"name":"Invoice_Number","type":"int"},{"name":"Acct_Break","type":{"type":"record","name":"Acct_BreakType","fields":[{"name":"Acct","type":"string"},{"name":"Items","type":{"type":"record","name":"ItemsType","fields":[{"name":"Item","type":{"type":"array","items":{"type":"record","name":"ItemType","fields":[{"name":"Account","type":"string"},{"name":"Claim_Number","type":"string"},{"name":"Insured_Name","type":"string"},{"name":"Price","type":"float"},{"name":"Control_Number","type":"int"},{"name":"State","type":"string"},{"name":"Report_Type_Code","type":"string"},{"name":"Report_Type_Desc","type":"string"},{"name":"Policy_Number","type":"string"},{"name":"Date_of_Loss","type":"string"},{"name":"Date_Received","type":"string"},{"name":"Date_Closed","type":"string"},{"name":"Days_to_Fill","type":"int"},{"name":"Police_Dept","type":"string"},{"name":"Attention","type":"string"},{"name":"RequestID","type":"int"},{"name":"ForceDup","type":"string"},{"name":"BillAcct","type":"string"},{"name":"BillCode","type":"string"}]}}}]}},{"name":"Acct_Total","type":"float"},{"name":"Acct_Count","type":"int"}]}},{"name":"Bill_Total","type":"float"},{"name":"Bill_Count","type":"int"}]}},{"name":"Previous_Balance","type":"int"}]}');
SELECT * FROM metro;
+-----------------+-----------------+--------------------------+-------------------+------------------------+--------------------+------------------+--------------------+-----------------------+-------------------------+--+
| metro.billdate | metro.billtime | metro.remit_companyname | metro.remit_addr | metro.remit_citystzip | metro.remit_phone | metro.remit_fax | metro.remit_taxid | metro.billacct_break | metro.previous_balance |
+-----------------+-----------------+--------------------------+-------------------+------------------------+--------------------+------------------+--------------------+-----------------------+-------------------------+--+
+-----------------+-----------------+--------------------------+-------------------+------------------------+--------------------+------------------+--------------------+-----------------------+-------------------------+--+
With the nicely formatted Avro schema I receive "Unexpected end-of-input within/between ARRAY entries", which indicates that there is a length restriction for this parameter. Otherwise, try the avro.schema.url approach, i.e. store the formatted schema file in HDFS and reference it with TBLPROPERTIES ('avro.schema.url'='hdfs:///path/to/metro.avsc') instead of the inline literal.
04-18-2016
05:52 PM
5 Kudos
This is an extension of Starting Spark jobs directly via YARN REST API. Assume the cluster is kerberized and the only access is via Knox. Further assume that Knox uses Basic Authentication and we have the user and password of the user who should start the Spark job. The overall idea is to call curl with <<user>>:<<password>> as Basic Authentication
==> Knox (verifying user:password against LDAP or AD)
==> Resource Manager (YARN REST API) with a Kerberos principal
1) Create and distribute a keytab
The user that should run the Spark job also needs to have a Kerberos principal. For this principal, create a keytab on one machine:
[root@KDC-HOST ~]$ kadmin
kadmin> xst -k /etc/security/keytabs/<<primaryName>>.keytab <<primaryName>>@<<REALM>>
Use your REALM and an appropriate primaryName for your principal. Then distribute this keytab to all other machines in the cluster, copy it to /etc/security/keytabs and set permissions:
[root@CLUSTER-HOST ~]$ chown <<user>>:hadoop /etc/security/keytabs/<<primaryName>>.keytab
[root@CLUSTER-HOST ~]$ chmod 400 /etc/security/keytabs/<<primaryName>>.keytab
Test the keytab on each machine:
[root@CLUSTER-HOST ~]$ kinit <<primaryName>>@<<REALM>> \
-k -t /etc/security/keytabs/<<primaryName>>.keytab
# There must be no password prompt!
[root@KDC-HOST ~]$ klist -l
# Principal name Cache name
# -------------- ----------
# <<primaryName>>@<<REALM>> FILE:/tmp/krb5cc_<<####>>
2) Test connection from the workstation outside the cluster
a) HDFS:
[MacBook simple-project]$ curl -s -k -u '<<user>>:<<password>>' \
https://$KNOX_SERVER:8443/gateway/default/webhdfs/v1/?op=GETFILESTATUS
# {
# "FileStatus": {
# "accessTime": 0,
# "blockSize": 0,
# "childrenNum": 9,
# "fileId": 16385,
# "group": "hdfs",
# "length": 0,
# "modificationTime": 1458070072105,
# "owner": "hdfs",
# "pathSuffix": "",
# "permission": "755",
# "replication": 0,
# "storagePolicy": 0,
# "type": "DIRECTORY"
# }
# }
b) YARN:
[MacBook simple-project]$ curl -s -k -u '<<user>>:<<password>>' -d '' \
https://$KNOX_SERVER:8443/gateway/default/resourcemanager/v1/cluster/apps/new-application
# {
# "application-id": "application_1460654399208_0004",
# "maximum-resource-capability": {
# "memory": 8192,
# "vCores": 3
# }
# }
3) Changes to spark-yarn.properties
The following values need to be changed or added compared to Starting Spark jobs directly via YARN REST API:
spark.history.kerberos.keytab=/etc/security/keytabs/spark.headless.keytabs
spark.history.kerberos.principal=spark-Demo@<<REALM>>
spark.yarn.keytab=/etc/security/keytabs/<<primaryName>>.keytab
spark.yarn.principal=<<primaryName>>@<<REALM>>
4) Changes to spark-yarn.json
The following properties need to be added to the command attribute (before org.apache.spark.deploy.yarn.ApplicationMaster) compared to Starting Spark jobs directly via YARN REST API:
-Dspark.yarn.keytab=/etc/security/keytabs/<<primaryName>>.keytab \
-Dspark.yarn.principal=<<primaryName>>@<<REALM>> \
-Dspark.yarn.credentials.file=hdfs://<<name-node>>:8020/tmp/simple-project/credentials_4b023f93-fbde-48ff-b2c8-516251aeed52 \
-Dspark.history.kerberos.keytab=/etc/security/keytabs/spark.headless.keytabs \
-Dspark.history.kerberos.principal=spark-Demo@<<REALM>> \
-Dspark.history.kerberos.enabled=true
credentials_4b023f93-fbde-48ff-b2c8-516251aeed52 is just a unique filename; the file does not need to exist. Concatenate "credentials" with a UUID4. This is the trigger for Spark to start a delegation token refresh thread; for details see the attachment.
5) Submit a job
Same as in Starting Spark jobs directly via YARN REST API, however one needs to provide -u <<user>>:<<password>> to the curl command to authenticate with Knox. After being authenticated by Knox, the keytabs for the following steps will be taken by YARN and Spark from the properties and the job JSON file.
6) Known issue
After the job finishes successfully, the log aggregation status will remain "RUNNING" until it gets a "TIME_OUT".
7) More details
Again, more details and a Python script to ease the whole process can be found in the Spark-Yarn-REST-API repo. Any comment to make this process easier is highly appreciated ...
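For reference, a minimal Python sketch of steps 2b) and 5) using requests instead of curl; the Knox host is a placeholder, verify=False mirrors curl's -k, and the layout of spark-yarn.json is the one described in the linked article:
import json, requests
knox = "https://<<knox-server>>:8443/gateway/default"
auth = ("<<user>>", "<<password>>")
# 2b) Ask YARN (through Knox) for a new application id
r = requests.post(knox + "/resourcemanager/v1/cluster/apps/new-application", auth=auth, verify=False)
app_id = r.json()["application-id"]
# 5) Patch the id into the job description and submit it
with open("spark-yarn.json") as f:
    job = json.load(f)
job["application-id"] = app_id
r = requests.post(knox + "/resourcemanager/v1/cluster/apps", auth=auth, verify=False,
                  headers={"Content-Type": "application/json"}, data=json.dumps(job))
print(r.status_code)   # 202 Accepted means the job was submitted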