Created 01-21-2016 12:57 PM
Hello, I have a simple Spark job that reads a file from HDFS, does some transformation, and writes the result out as an ORC file.
The process then loads the ORC file into Hive.
If I run this from the command line using spark-submit, everything runs fine and the job refers to the resources/files on HDFS correctly.
spark-submit --master local[4] --driver-memory 1024m --executor-memory 1024m --class mypackage.myMain --verbose myjar-0.3-SNAPSHOT.jar
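In outline, the job does something like the following (the class name, schema, and paths below are simplified placeholders rather than the real code):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext

object MyMainSketch {
  case class Record(id: String, value: String)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("MlcTransform"))
    val hc = new HiveContext(sc)
    import hc.implicits._

    // Read the raw file from HDFS and apply a simple transformation.
    val input  = "hdfs://sandbox.hortonworks.com:8020/user/spark/scv/mlc-raw"
    val output = "hdfs://sandbox.hortonworks.com:8020/user/spark/scv/mlc-processed"
    val df = sc.textFile(input)
      .map(_.split(","))
      .map(a => Record(a(0), a(1)))
      .toDF()

    // Write the result as ORC; a later step loads it into Hive.
    df.write.format("orc").mode(SaveMode.Overwrite).save(output)
  }
}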
However, when I try to run this as an Oozie job using this job.properties:
nameNode=hdfs://sandbox.hortonworks.com:8020
jobTracker=sandbox.hortonworks.com:8032
master=local[4]
queueName=default
applicationRoot=myApp
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/${user.name}/${applicationRoot}
and this workflow:
<workflow-app xmlns='uri:oozie:workflow:0.5' name='MlcTransform'>
    <start to='spark-node' />
    <action name='spark-node'>
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/${wf:user()}/scv/mlc-processed"/>
            </prepare>
            <configuration>
                <property>
                    <name>oozie.launcher.mapred.map.child.java.opts</name>
                    <value>-Xmx2048m</value>
                </property>
                <property>
                    <name>oozie.launcher.mapred.job.map.memory.mb</name>
                    <value>1024</value>
                </property>
            </configuration>
            <master>${master}</master>
            <mode>client</mode>
            <name>MlcTransform</name>
            <class>mypackage.myMain</class>
            <jar>${nameNode}/user/${wf:user()}/${applicationRoot}/lib/myjar-0.3-SNAPSHOT.jar</jar>
        </spark>
        <ok to="end" />
        <error to="fail" />
    </action>
    <kill name="fail">
        <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name='end' />
</workflow-app>
I get the error below, even though I have already changed the permissions on /tmp/hive on HDFS to a+rwx.
Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.SparkMain], main() threw exception, java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rwx------
java.lang.RuntimeException: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rwx------
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
    at org.apache.spark.sql.hive.client.ClientWrapper.<init>(ClientWrapper.scala:193)
    at org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveContext.scala:164)
    at org.apache.spark.sql.hive.HiveContext.executionHive(HiveContext.scala:162)
    at org.apache.spark.sql.hive.HiveContext.functionRegistry$lzycompute(HiveContext.scala:415)
    at org.apache.spark.sql.hive.HiveContext.functionRegistry(HiveContext.scala:414)
    at org.apache.spark.sql.UDFRegistration.<init>(UDFRegistration.scala:40)
    at org.apache.spark.sql.SQLContext.<init>(SQLContext.scala:296)
    at org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:74)
    at com.lbg.markets.scv.csv.main.CSVTransformMain$.processComFiles(CSVTransformMain.scala:90)
    at com.lbg.markets.scv.csv.main.CSVTransformMain$.main(CSVTransformMain.scala:39)
    at com.lbg.markets.scv.csv.main.CSVTransformMain.main(CSVTransformMain.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:685)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    at org.apache.oozie.action.hadoop.SparkMain.runSpark(SparkMain.java:104)
    at org.apache.oozie.action.hadoop.SparkMain.run(SparkMain.java:95)
    at org.apache.oozie.action.hadoop.LauncherMain.run(LauncherMain.java:47)
    at org.apache.oozie.action.hadoop.SparkMain.main(SparkMain.java:38)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.oozie.action.hadoop.LauncherMapper.map(LauncherMapper.java:241)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rwx------
    at org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(SessionState.java:612)
    at org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:554)
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:508)
    ... 37 more
log4j:WARN No appenders could be found for logger (org.apache.spark.SparkContext).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
This goes away if I chmod /tmp/hive on the local file system, but then further down the line I get something similar - the process is trying to access the local file system:
Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.SparkMain], main() threw exception, Job aborted.
org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:156)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
    at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1840)
    at com.lbg.markets.scv.util.hive.OrcWriter.writeToOrc(OrcWriter.scala:48)
    at com.lbg.markets.scv.csv.main.CSVTransformMain$.processComFiles(CSVTransformMain.scala:99)
    at com.lbg.markets.scv.csv.main.CSVTransformMain$.main(CSVTransformMain.scala:39)
    at com.lbg.markets.scv.csv.main.CSVTransformMain.main(CSVTransformMain.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:685)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    at org.apache.oozie.action.hadoop.SparkMain.runSpark(SparkMain.java:104)
    at org.apache.oozie.action.hadoop.SparkMain.run(SparkMain.java:95)
    at org.apache.oozie.action.hadoop.LauncherMain.run(LauncherMain.java:47)
    at org.apache.oozie.action.hadoop.SparkMain.main(SparkMain.java:38)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.oozie.action.hadoop.LauncherMapper.map(LauncherMapper.java:241)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.io.IOException: Mkdirs failed to create file:/user/spark/scv/mlc-processed/_temporary/0/_temporary/attempt_201601211123_0000_m_000000_0 (exists=false, cwd=file:/hadoop/yarn/local/usercache/spark/appcache/application_1453374154950_0005/container_e20_1453374154950_0005_01_000002)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:449)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:435)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:909)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:890)
    at org.apache.hadoop.hive.ql.io.orc.WriterImpl.getStream(WriterImpl.java:2103)
    at org.apache.hadoop.hive.ql.io.orc.WriterImpl.flushStripe(WriterImpl.java:2120)
    at org.apache.hadoop.hive.ql.io.orc.WriterImpl.close(WriterImpl.java:2425)
    at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.close(OrcOutputFormat.java:106)
    at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.close(OrcOutputFormat.java:91)
    at org.apache.spark.sql.hive.orc.OrcOutputWriter.close(OrcRelation.scala:144)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.abortTask$1(WriterContainer.scala:272)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:249)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1914)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150)
    ... 47 more
Caused by: java.io.IOException: Mkdirs failed to create file:/user/spark/scv/mlc-processed/_temporary/0/_temporary/attempt_201601211123_0000_m_000000_0 (exists=false, cwd=file:/hadoop/yarn/local/usercache/spark/appcache/application_1453374154950_0005/container_e20_1453374154950_0005_01_000002)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:449)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:435)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:909)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:890)
    at org.apache.hadoop.hive.ql.io.orc.WriterImpl.getStream(WriterImpl.java:2103)
    at org.apache.hadoop.hive.ql.io.orc.WriterImpl.flushStripe(WriterImpl.java:2120)
    at org.apache.hadoop.hive.ql.io.orc.WriterImpl.close(WriterImpl.java:2425)
    at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.close(OrcOutputFormat.java:106)
    at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.close(OrcOutputFormat.java:91)
    at org.apache.spark.sql.hive.orc.OrcOutputWriter.close(OrcRelation.scala:144)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.abortTask$1(WriterContainer.scala:272)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:249)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Oozie Launcher failed, finishing Hadoop job gracefully
Is there a flag or setting somewhere in Oozie / HDFS / YARN / Spark / Hive that dictates whether the Spark action should refer to the local file system or HDFS?
Created 01-21-2016 04:37 PM
Just to add more information: the sandbox I am using is 2.3.4 (a manual upgrade from the generally available 2.3.2).
The Oozie Spark sharelib (/user/oozie/share/lib/lib_20160115164104/spark) contains these files:
datanucleus-api-jdo-3.2.6.jar
datanucleus-core-3.2.10.jar
datanucleus-rdbms-3.2.9.jar
oozie-sharelib-spark-4.2.0.2.3.4.0-3485.jar
spark-1.5.2.2.3.4.0-3485-yarn-shuffle.jar
spark-assembly-1.5.2.2.3.4.0-3485-hadoop2.7.1.2.3.4.0-3485.jar
spark-examples-1.5.2.2.3.4.0-3485-hadoop2.7.1.2.3.4.0-3485.jar
I got a similar error on 2.3.2 as well - hence the upgrade.
Created 01-21-2016 08:59 PM
Try specifying the input and output locations as fully qualified HDFS URIs passed in via <arg>, e.g. <arg>outputpath=hdfs://localhost/output/file.txt</arg>, as below:
<master>local[*]</master>
<mode>client</mode>
<name>Spark Example</name>
<class>org.apache.spark.examples.mllib.JavaALS</class>
<jar>/lib/spark-examples_2.10-1.1.0.jar</jar>
<spark-opts>--executor-memory 20G --num-executors 50</spark-opts>
<arg>inputpath=hdfs://localhost/input/file.txt</arg>
<arg>outputpath=hdfs://localhost/output/file.txt</arg>
<arg>value=2</arg>
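If you take that approach, the main class just needs to read those fully qualified HDFS URIs out of its arguments instead of relying on whatever the default file system happens to be. A rough sketch (the argument names and parsing here are only an example, not your actual job):

object ArgPaths {
  // Turn the key=value pairs passed via <arg> into a simple lookup map.
  def parse(args: Array[String]): Map[String, String] =
    args.map(_.split("=", 2)).collect { case Array(k, v) => k -> v }.toMap

  def main(args: Array[String]): Unit = {
    val opts = parse(args)
    val inputPath  = opts("inputpath")   // e.g. hdfs://sandbox.hortonworks.com:8020/user/spark/scv/mlc-raw
    val outputPath = opts("outputpath")  // e.g. hdfs://sandbox.hortonworks.com:8020/user/spark/scv/mlc-processed
    println(s"reading from $inputPath, writing ORC to $outputPath")
  }
}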
Created 01-22-2016 03:55 PM
Thanks @Gangadhar Kadam
After posting the message I tried a few things and finally got it to work. Here is a summary of what I did:
// The following is the old code, written against the Spark 1.3.1 API.
// Note that this did not work even when
// targetDir = hdfs://sandbox.hortonworks.com:8020/user/spark/scv/mlc-processed/
// dataFrame.save(targetDir, "org.apache.spark.sql.hive.orc", SaveMode.Overwrite)

// I changed it to the following, which is only available since Spark 1.4.0.
// It saves the DataFrame as ORC files in the target directory.
// Again, targetDir = hdfs://sandbox.hortonworks.com:8020/user/spark/scv/mlc-processed/
dataFrame.write.format("orc").mode(SaveMode.Overwrite).save(targetDir)
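For the subsequent "load into Hive" step, one option (not necessarily what my job does) is to expose the output directory as an external ORC table; the table name and columns below are only illustrative, not my actual schema:

// Hypothetical table name and columns -- adjust to the real schema.
hc.sql(
  """CREATE EXTERNAL TABLE IF NOT EXISTS mlc_processed (
    |  id STRING,
    |  value STRING
    |) STORED AS ORC
    |LOCATION 'hdfs://sandbox.hortonworks.com:8020/user/spark/scv/mlc-processed/'
  """.stripMargin)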
val hiveMetaStoreDir = hdfsPath + "/user/" + sc.sparkUser + "/warehouse"
val hiveMetaStoreUris = "thrift://sandbox.hortonworks.com:9083"
logger.info("setting hive.metastore.warehouse.dir to " + hiveMetaStoreDir)
logger.info("setting hive.metastore.uris to " + hiveMetaStoreUris)
hc.setConf("hive.metastore.warehouse.dir", hiveMetaStoreDir)
hc.setConf("hive.metastore.uris", hiveMetaStoreUris)
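The hdfsPath prefix above can be hard-coded, but one way to derive it (assuming the cluster's default file system is HDFS) is to read fs.defaultFS from the job's Hadoop configuration:

// Assumption: derive the hdfs://host:port prefix from the Hadoop configuration
// instead of hard-coding it in the job.
val hdfsPath = sc.hadoopConfiguration.get("fs.defaultFS")
// e.g. "hdfs://sandbox.hortonworks.com:8020"

With the warehouse directory fully qualified and the metastore URIs set explicitly, the job now runs from Oozie and writes to HDFS as expected.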