Oozie SparkAction: a simple job that extracts, transforms and loads from HDFS to Hive - temp files written to local fs even when log says hdfs

Super Collaborator

Hello, I have a simple Spark job that reads a file from HDFS, applies some transformations and writes the result out as an ORC file.

The process then loads the ORC file into Hive.

If I run this from the command line with spark-submit, everything runs fine and the resources / files on HDFS are referenced correctly.

 spark-submit  --master local[4]  --driver-memory 1024m --executor-memory 1024m --class mypackage.myMain --verbose myjar-0.3-SNAPSHOT.jar

However, when I try to do this in an Oozie job using:

nameNode=hdfs://sandbox.hortonworks.com:8020
jobTracker=sandbox.hortonworks.com:8032
master=local[4]
queueName=default
applicationRoot=myApp
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/${user.name}/${applicationRoot}

and this workflow:

<workflow-app xmlns='uri:oozie:workflow:0.5' name='MlcTransform'>
    <start to='spark-node' />
    <action name='spark-node'>
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                 <delete path="${nameNode}/user/${wf:user()}/scv/mlc-processed"/>
            </prepare>
            <configuration>
                <property>
                    <name>oozie.launcher.mapred.map.child.java.opts</name>
                    <value>-Xmx2048m</value>
                </property>
                <property>
                    <name>oozie.launcher.mapred.job.map.memory.mb</name>
                    <value>1024</value>
                </property>
            </configuration>
            <master>${master}</master>
            <mode>client</mode>
            <name>MlcTransform</name>
            <class>mypackage.myMain</class>
            <jar>${nameNode}/user/${wf:user()}/${applicationRoot}/lib/myjar-0.3-SNAPSHOT.jar</jar>
        </spark>
        <ok to="end" />
        <error to="fail" />
    </action>
    <kill name="fail">
        <message>Workflow failed, error
            message[${wf:errorMessage(wf:lastErrorNode())}]
        </message>
    </kill>
    <end name='end' />
</workflow-app>

I get the error below. I have already changed the permissions on hdfs:/tmp/hive to a+rxw.

Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.SparkMain], main() threw exception, java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rwx------
java.lang.RuntimeException: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rwx------
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
at org.apache.spark.sql.hive.client.ClientWrapper.<init>(ClientWrapper.scala:193)
at org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveContext.scala:164)
at org.apache.spark.sql.hive.HiveContext.executionHive(HiveContext.scala:162)
at org.apache.spark.sql.hive.HiveContext.functionRegistry$lzycompute(HiveContext.scala:415)
at org.apache.spark.sql.hive.HiveContext.functionRegistry(HiveContext.scala:414)
at org.apache.spark.sql.UDFRegistration.<init>(UDFRegistration.scala:40)
at org.apache.spark.sql.SQLContext.<init>(SQLContext.scala:296)
at org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:74)
at com.lbg.markets.scv.csv.main.CSVTransformMain$.processComFiles(CSVTransformMain.scala:90)
at com.lbg.markets.scv.csv.main.CSVTransformMain$.main(CSVTransformMain.scala:39)
at com.lbg.markets.scv.csv.main.CSVTransformMain.main(CSVTransformMain.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:685)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
at org.apache.oozie.action.hadoop.SparkMain.runSpark(SparkMain.java:104)
at org.apache.oozie.action.hadoop.SparkMain.run(SparkMain.java:95)
at org.apache.oozie.action.hadoop.LauncherMain.run(LauncherMain.java:47)
at org.apache.oozie.action.hadoop.SparkMain.main(SparkMain.java:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.oozie.action.hadoop.LauncherMapper.map(LauncherMapper.java:241)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rwx------
at org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(SessionState.java:612)
at org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:554)
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:508)
... 37 more
log4j:WARN No appenders could be found for logger (org.apache.spark.SparkContext).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
                  


This goes away if I chmod /tmp/hive on the local file system, but further down the line I get a similar failure, where the process again tries to access the local file system:

Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.SparkMain], main() threw exception, Job aborted.
org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:156)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1840)
at com.lbg.markets.scv.util.hive.OrcWriter.writeToOrc(OrcWriter.scala:48)
at com.lbg.markets.scv.csv.main.CSVTransformMain$.processComFiles(CSVTransformMain.scala:99)
at com.lbg.markets.scv.csv.main.CSVTransformMain$.main(CSVTransformMain.scala:39)
at com.lbg.markets.scv.csv.main.CSVTransformMain.main(CSVTransformMain.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:685)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
at org.apache.oozie.action.hadoop.SparkMain.runSpark(SparkMain.java:104)
at org.apache.oozie.action.hadoop.SparkMain.run(SparkMain.java:95)
at org.apache.oozie.action.hadoop.LauncherMain.run(LauncherMain.java:47)
at org.apache.oozie.action.hadoop.SparkMain.main(SparkMain.java:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.oozie.action.hadoop.LauncherMapper.map(LauncherMapper.java:241)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.io.IOException: Mkdirs failed to create file:/user/spark/scv/mlc-processed/_temporary/0/_temporary/attempt_201601211123_0000_m_000000_0 (exists=false, cwd=file:/hadoop/yarn/local/usercache/spark/appcache/application_1453374154950_0005/container_e20_1453374154950_0005_01_000002)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:449)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:435)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:909)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:890)
at org.apache.hadoop.hive.ql.io.orc.WriterImpl.getStream(WriterImpl.java:2103)
at org.apache.hadoop.hive.ql.io.orc.WriterImpl.flushStripe(WriterImpl.java:2120)
at org.apache.hadoop.hive.ql.io.orc.WriterImpl.close(WriterImpl.java:2425)
at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.close(OrcOutputFormat.java:106)
at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.close(OrcOutputFormat.java:91)
at org.apache.spark.sql.hive.orc.OrcOutputWriter.close(OrcRelation.scala:144)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.abortTask$1(WriterContainer.scala:272)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:249)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1914)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150)
... 47 more
Caused by: java.io.IOException: Mkdirs failed to create file:/user/spark/scv/mlc-processed/_temporary/0/_temporary/attempt_201601211123_0000_m_000000_0 (exists=false, cwd=file:/hadoop/yarn/local/usercache/spark/appcache/application_1453374154950_0005/container_e20_1453374154950_0005_01_000002)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:449)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:435)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:909)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:890)
at org.apache.hadoop.hive.ql.io.orc.WriterImpl.getStream(WriterImpl.java:2103)
at org.apache.hadoop.hive.ql.io.orc.WriterImpl.flushStripe(WriterImpl.java:2120)
at org.apache.hadoop.hive.ql.io.orc.WriterImpl.close(WriterImpl.java:2425)
at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.close(OrcOutputFormat.java:106)
at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.close(OrcOutputFormat.java:91)
at org.apache.spark.sql.hive.orc.OrcOutputWriter.close(OrcRelation.scala:144)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.abortTask$1(WriterContainer.scala:272)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:249)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Oozie Launcher failed, finishing Hadoop job gracefully

Is there a flag or setting somewhere in Oozie / HDFS / YARN / Spark / Hive that dictates which file system (local or HDFS) the SparkAction should use?
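
A note for anyone reading the second trace: the `file:` prefix in the `Mkdirs failed to create file:/user/spark/...` message shows that the output directory was resolved against the local file system rather than HDFS. In Hadoop, a path without a scheme is qualified against `fs.defaultFS`, which falls back to `file:///` when no cluster configuration is loaded. The sketch below only models that rule with `java.net.URI`. It is not Hadoop's implementation, and `PathSchemeCheck` and `qualify` are hypothetical names chosen for illustration.

```scala
import java.net.URI

object PathSchemeCheck {
  // Illustration only (assumption, not Hadoop code): a path that already has a
  // scheme is kept as-is, otherwise it inherits scheme and authority from the
  // default file system URI.
  def qualify(path: String, defaultFs: String): URI = {
    val uri = new URI(path)
    if (uri.getScheme != null) uri else new URI(defaultFs).resolve(uri)
  }

  def main(args: Array[String]): Unit = {
    val out = "/user/spark/scv/mlc-processed"
    // Default file system as it would be with no cluster configuration loaded.
    println(qualify(out, "file:///").getScheme)
    // Default file system taken from the nameNode value in the job properties.
    println(qualify(out, "hdfs://sandbox.hortonworks.com:8020").getScheme)
  }
}
```

Logging the qualified output path in the driver just before the write is a cheap way to see which file system the job will actually hit.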

1 ACCEPTED SOLUTION

Super Collaborator

Thanks @Gangadhar Kadam

After posting the message I tried a few things and finally got it to work. Here is a summary of what I did:

  • Change the ORC file writer code:
// The old code below uses the Spark 1.3.1 API.
// Note that it did not work even when targetDir = hdfs://sandbox.hortonworks.com:8020/user/spark/scv/mlc-processed/
//    dataFrame.save(targetDir, "org.apache.spark.sql.hive.orc", SaveMode.Overwrite)

// Change it to this, which is only available since Spark 1.4.0.
// Save as an ORC file in the target directory with a specified file label.
// Note that targetDir = hdfs://sandbox.hortonworks.com:8020/user/spark/scv/mlc-processed/
dataFrame.write.format("orc").mode(SaveMode.Overwrite).save(targetDir)
  • Set the following on the HiveContext. With spark-submit / spark-shell this is already set up, but under Oozie you have to set it yourself:
val hiveMetaStoreDir = hdfsPath + "/user/" + sc.sparkUser + "/warehouse"
val hiveMetaStoreUris = "thrift://sandbox.hortonworks.com:9083"

logger.info("setting hive.metastore.warehouse.dir to " + hiveMetaStoreDir)
logger.info("setting hive.metastore.uris to " + hiveMetaStoreUris)

hc.setConf("hive.metastore.warehouse.dir", hiveMetaStoreDir)
hc.setConf("hive.metastore.uris", hiveMetaStoreUris)
  • Add a policy in Ranger for user "yarn" to have write access to "/user/spark/scv/mlc-processed/" on HDFS
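
The first two points can be folded into a small helper, sketched here under assumptions: `OozieSparkSettings`, `requireHdfs` and `hiveSettings` are hypothetical names, not part of Spark or Oozie, and the host names and warehouse layout are simply the ones from this post. It uses only the Scala standard library, so it can be tried outside a cluster.

```scala
import java.net.URI

object OozieSparkSettings {
  // Hypothetical guard: fail fast if the output directory is not an explicit
  // hdfs:// URI, instead of letting the write fall through to the local fs.
  def requireHdfs(targetDir: String): String = {
    val scheme = Option(new URI(targetDir).getScheme)
    require(scheme.exists(_ == "hdfs"), s"expected an hdfs:// URI but got: $targetDir")
    targetDir
  }

  // Builds the two Hive settings from the post as a plain map, so they can be
  // logged and applied in one place.
  def hiveSettings(nameNode: String, user: String, metastoreUri: String): Map[String, String] =
    Map(
      "hive.metastore.warehouse.dir" -> s"$nameNode/user/$user/warehouse",
      "hive.metastore.uris" -> metastoreUri
    )
}
```

In the driver the map would then be applied with `hiveSettings(...).foreach { case (k, v) => hc.setConf(k, v) }`, where `hc` is the HiveContext from the snippet above, and `requireHdfs(targetDir)` would be called just before `dataFrame.write`.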

3 REPLIES

Super Collaborator

Just to add more information: the sandbox I am using is 2.3.4 (manually upgraded from the generally available 2.3.2).

The Oozie Spark sharelib (/user/oozie/share/lib/lib_20160115164104/spark) contains these files:

datanucleus-api-jdo-3.2.6.jar
datanucleus-core-3.2.10.jar
datanucleus-rdbms-3.2.9.jar
oozie-sharelib-spark-4.2.0.2.3.4.0-3485.jar
spark-1.5.2.2.3.4.0-3485-yarn-shuffle.jar
spark-assembly-1.5.2.2.3.4.0-3485-hadoop2.7.1.2.3.4.0-3485.jar
spark-examples-1.5.2.2.3.4.0-3485-hadoop2.7.1.2.3.4.0-3485.jar

I got a similar error on 2.3.2, hence the upgrade.

Expert Contributor

Try specifying the paths as arguments, like <arg>outputpath=hdfs://localhost/output/file.txt</arg>, as below:

            <master>local[*]</master>
            <mode>client</mode>
            <name>Spark Example</name>
            <class>org.apache.spark.examples.mllib.JavaALS</class>
            <jar>/lib/spark-examples_2.10-1.1.0.jar</jar>
            <spark-opts>--executor-memory 20G --num-executors 50</spark-opts>
            <arg>inputpath=hdfs://localhost/input/file.txt</arg>
            <arg>outputpath=hdfs://localhost/output/file.txt</arg>
            <arg>value=2</arg>
