Archives of Support Questions (Read Only)

This is an archived board for historical reference. Information and links may no longer be available or relevant.

Oozie SparkAction: a simple job that extracts, transforms and loads from HDFS to Hive; temp files written to the local FS even when the log says HDFS

Super Collaborator

Hello, I have a simple Spark job that reads a file from HDFS, does some transformations and writes the result out as an ORC file.

The process then loads the ORC file into Hive.

If I run this from the command line using spark-submit, everything runs fine and the resources/files on HDFS are referenced correctly.

 spark-submit  --master local[4]  --driver-memory 1024m --executor-memory 1024m --class mypackage.myMain --verbose myjar-0.3-SNAPSHOT.jar

However, when I try to do this in an Oozie job using:

nameNode=hdfs://sandbox.hortonworks.com:8020
jobTracker=sandbox.hortonworks.com:8032
master=local[4]
queueName=default
applicationRoot=myApp
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/${user.name}/${applicationRoot}

and this workflow:

<workflow-app xmlns='uri:oozie:workflow:0.5' name='MlcTransform'>
    <start to='spark-node' />
    <action name='spark-node'>
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                 <delete path="${nameNode}/user/${wf:user()}/scv/mlc-processed"/>
            </prepare>
            <configuration>
                <property>
                    <name>oozie.launcher.mapred.map.child.java.opts</name>
                    <value>-Xmx2048m</value>
                </property>
                <property>
                      <name>oozie.launcher.mapred.job.map.memory.mb</name>
                      <value>1024</value>
                </property> 
            </configuration>
            <master>${master}</master>
            <mode>client</mode>
            <name>MlcTransform</name>
            <class>mypackage.myMain</class>
            <jar>${nameNode}/user/${wf:user()}/${applicationRoot}/lib/myjar-0.3-SNAPSHOT.jar</jar>
        </spark>
        <ok to="end" />
        <error to="fail" />
    </action>
    <kill name="fail">
        <message>Workflow failed, error
            message[${wf:errorMessage(wf:lastErrorNode())}]
        </message>
    </kill>
    <end name='end' />
</workflow-app>
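Oozie expands the ${...} references in the job properties above before the workflow runs, which is why oozie.wf.application.path ends up as a fully qualified hdfs:// URI. A minimal Scala sketch of that plain-key substitution follows. It is not Oozie's EL engine (functions such as ${wf:user()} in the workflow are not handled), and it assumes the job is submitted as user "spark", consistent with the usercache/spark paths in the logs.

```scala
// Simplified illustration of ${name} expansion in job.properties.
// NOT Oozie's EL engine: only plain keys are substituted, EL functions are ignored.
object JobPropertiesSketch {
  private val Ref = """\$\{([^}]+)\}""".r

  // Replaces every known ${name}; unknown references are left untouched.
  // Recurses until nothing changes, so values may themselves contain references.
  def expand(value: String, props: Map[String, String]): String = {
    val next = Ref.replaceAllIn(value, m =>
      java.util.regex.Matcher.quoteReplacement(props.getOrElse(m.group(1), m.matched)))
    if (next == value) value else expand(next, props)
  }

  def main(args: Array[String]): Unit = {
    val props = Map(
      "nameNode" -> "hdfs://sandbox.hortonworks.com:8020",
      "applicationRoot" -> "myApp",
      "user.name" -> "spark" // assumption: submitted as user "spark"
    )
    println(expand("${nameNode}/user/${user.name}/${applicationRoot}", props))
  }
}
```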

I get the error below, even though I have already changed the permissions on hdfs:/tmp/hive to a+rwx:

Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.SparkMain], main() threw exception, java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rwx------
java.lang.RuntimeException: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rwx------
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
at org.apache.spark.sql.hive.client.ClientWrapper.<init>(ClientWrapper.scala:193)
at org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveContext.scala:164)
at org.apache.spark.sql.hive.HiveContext.executionHive(HiveContext.scala:162)
at org.apache.spark.sql.hive.HiveContext.functionRegistry$lzycompute(HiveContext.scala:415)
at org.apache.spark.sql.hive.HiveContext.functionRegistry(HiveContext.scala:414)
at org.apache.spark.sql.UDFRegistration.<init>(UDFRegistration.scala:40)
at org.apache.spark.sql.SQLContext.<init>(SQLContext.scala:296)
at org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:74)
at com.lbg.markets.scv.csv.main.CSVTransformMain$.processComFiles(CSVTransformMain.scala:90)
at com.lbg.markets.scv.csv.main.CSVTransformMain$.main(CSVTransformMain.scala:39)
at com.lbg.markets.scv.csv.main.CSVTransformMain.main(CSVTransformMain.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:685)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
at org.apache.oozie.action.hadoop.SparkMain.runSpark(SparkMain.java:104)
at org.apache.oozie.action.hadoop.SparkMain.run(SparkMain.java:95)
at org.apache.oozie.action.hadoop.LauncherMain.run(LauncherMain.java:47)
at org.apache.oozie.action.hadoop.SparkMain.main(SparkMain.java:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.oozie.action.hadoop.LauncherMapper.map(LauncherMapper.java:241)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rwx------
at org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(SessionState.java:612)
at org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:554)
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:508)
... 37 more
log4j:WARN No appenders could be found for logger (org.apache.spark.SparkContext).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
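Because hdfs:/tmp/hive had already been opened up with a+rwx, a failure reporting rwx------ hints that the check was looking at a different /tmp/hive than the one on HDFS. For reference, here is a sketch of the check itself. It assumes, as in Hive 1.x SessionState, that every bit of mode 733 (rwx-wx-wx) must be set on the root scratch dir, so rwx------ fails while 777 passes. The symbolic-to-octal conversion is a simplification that treats any non-'-' character as a set bit.

```scala
// Sketch of the root scratch dir writability test.
// Assumption: like Hive 1.x, all bits of mode 733 must be present.
object ScratchDirCheck {
  private val Required = Integer.parseInt("733", 8)

  // Converts a 9-character symbolic permission such as "rwx------" to its bits.
  def toBits(symbolic: String): Int = {
    require(symbolic.length == 9, s"expected 9 permission characters, got '$symbolic'")
    symbolic.zipWithIndex.foldLeft(0) { case (acc, (c, i)) =>
      if (c == '-') acc else acc | (1 << (8 - i))
    }
  }

  def isWritableScratchDir(symbolic: String): Boolean =
    (toBits(symbolic) & Required) == Required

  def main(args: Array[String]): Unit =
    for (p <- Seq("rwx------", "rwx-wx-wx", "rwxrwxrwx"))
      println(s"$p -> ${isWritableScratchDir(p)}")
}
```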
                  


The scratch dir error above goes away if I chmod /tmp/hive on the local file system, but further down the line I get something similar, with the process again trying to access the local file system:

Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.SparkMain], main() threw exception, Job aborted.
org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:156)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1840)
at com.lbg.markets.scv.util.hive.OrcWriter.writeToOrc(OrcWriter.scala:48)
at com.lbg.markets.scv.csv.main.CSVTransformMain$.processComFiles(CSVTransformMain.scala:99)
at com.lbg.markets.scv.csv.main.CSVTransformMain$.main(CSVTransformMain.scala:39)
at com.lbg.markets.scv.csv.main.CSVTransformMain.main(CSVTransformMain.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:685)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
at org.apache.oozie.action.hadoop.SparkMain.runSpark(SparkMain.java:104)
at org.apache.oozie.action.hadoop.SparkMain.run(SparkMain.java:95)
at org.apache.oozie.action.hadoop.LauncherMain.run(LauncherMain.java:47)
at org.apache.oozie.action.hadoop.SparkMain.main(SparkMain.java:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.oozie.action.hadoop.LauncherMapper.map(LauncherMapper.java:241)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.io.IOException: Mkdirs failed to create file:/user/spark/scv/mlc-processed/_temporary/0/_temporary/attempt_201601211123_0000_m_000000_0 (exists=false, cwd=file:/hadoop/yarn/local/usercache/spark/appcache/application_1453374154950_0005/container_e20_1453374154950_0005_01_000002)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:449)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:435)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:909)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:890)
at org.apache.hadoop.hive.ql.io.orc.WriterImpl.getStream(WriterImpl.java:2103)
at org.apache.hadoop.hive.ql.io.orc.WriterImpl.flushStripe(WriterImpl.java:2120)
at org.apache.hadoop.hive.ql.io.orc.WriterImpl.close(WriterImpl.java:2425)
at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.close(OrcOutputFormat.java:106)
at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.close(OrcOutputFormat.java:91)
at org.apache.spark.sql.hive.orc.OrcOutputWriter.close(OrcRelation.scala:144)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.abortTask$1(WriterContainer.scala:272)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:249)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1914)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150)
... 47 more
Caused by: java.io.IOException: Mkdirs failed to create file:/user/spark/scv/mlc-processed/_temporary/0/_temporary/attempt_201601211123_0000_m_000000_0 (exists=false, cwd=file:/hadoop/yarn/local/usercache/spark/appcache/application_1453374154950_0005/container_e20_1453374154950_0005_01_000002)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:449)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:435)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:909)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:890)
at org.apache.hadoop.hive.ql.io.orc.WriterImpl.getStream(WriterImpl.java:2103)
at org.apache.hadoop.hive.ql.io.orc.WriterImpl.flushStripe(WriterImpl.java:2120)
at org.apache.hadoop.hive.ql.io.orc.WriterImpl.close(WriterImpl.java:2425)
at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.close(OrcOutputFormat.java:106)
at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.close(OrcOutputFormat.java:91)
at org.apache.spark.sql.hive.orc.OrcOutputWriter.close(OrcRelation.scala:144)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.abortTask$1(WriterContainer.scala:272)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:249)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Oozie Launcher failed, finishing Hadoop job gracefully
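The file: prefix in the Mkdirs failure is the telling detail. In Hadoop, a path without a scheme is qualified against the default file system (fs.defaultFS, whose built-in default is file:///). If the Hadoop configuration seen inside the launcher container does not point at the NameNode, such a path lands on the local disk of the node running the task. The following is a simplified model of that qualification using only java.net.URI, not Hadoop's own Path code. It assumes, as one possible explanation, that the output path reached the ORC writer without its hdfs:// prefix.

```scala
import java.net.URI

// Simplified model of qualifying a path against the default file system.
// Uses only java.net.URI; this is NOT Hadoop's Path.makeQualified.
object QualifyPathSketch {
  def qualify(path: String, defaultFs: String): String = {
    val uri = new URI(path)
    if (uri.getScheme != null) path // already fully qualified: keep as-is
    else {
      val fs = new URI(defaultFs)
      // For "file:///" the authority is null, which yields "file:/..."
      new URI(fs.getScheme, fs.getAuthority, uri.getPath, null, null).toString
    }
  }

  def main(args: Array[String]): Unit = {
    val target = "/user/spark/scv/mlc-processed"
    println(qualify(target, "file:///")) // local fallback
    println(qualify(target, "hdfs://sandbox.hortonworks.com:8020")) // sandbox NameNode
  }
}
```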

Is there a flag or setting somewhere in Oozie / HDFS / YARN / Spark / Hive that dictates whether the SparkAction refers to the local file system or to HDFS?

Accepted solution

Super Collaborator

Thanks @Gangadhar Kadam

After posting the message I tried a few things and finally got it to work. Here is a summary of what I did:

  • Changed the ORC file writer code:
import org.apache.spark.sql.SaveMode

// Old code, using the Spark 1.3.1 API (DataFrame.save, deprecated since Spark 1.4.0).
// Note: this did not work even when targetDir = hdfs://sandbox.hortonworks.com:8020/user/spark/scv/mlc-processed/
//    dataFrame.save(targetDir, "org.apache.spark.sql.hive.orc", SaveMode.Overwrite)

// Changed to this; dataFrame.write (DataFrameWriter) is only available since Spark 1.4.0.
// Saves ORC files into the target directory, overwriting any existing output.
// Note: targetDir = hdfs://sandbox.hortonworks.com:8020/user/spark/scv/mlc-processed/
dataFrame.write.format("orc").mode(SaveMode.Overwrite).save(targetDir)
  • Added the following to the HiveContext setup. With spark-submit / spark-shell this is already configured for you, but under Oozie you need to set it up yourself:
// hdfsPath presumably holds the NameNode URI (e.g. hdfs://sandbox.hortonworks.com:8020);
// sc is the SparkContext, hc the HiveContext and logger the job's logger.
val hiveMetaStoreDir = hdfsPath + "/user/" + sc.sparkUser + "/warehouse"
val hiveMetaStoreUris = "thrift://sandbox.hortonworks.com:9083"

logger.info("setting hive.metastore.warehouse.dir to " + hiveMetaStoreDir)
logger.info("setting hive.metastore.uris to " + hiveMetaStoreUris)

hc.setConf("hive.metastore.warehouse.dir", hiveMetaStoreDir)
hc.setConf("hive.metastore.uris", hiveMetaStoreUris)
  • Added a policy in Ranger giving user "yarn" write access to "/user/spark/scv/mlc-processed/" on HDFS.
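The two HiveContext settings from the second step can also be built in one place. Below is a sketch with a hypothetical helper, hiveSettings (not part of the original post), that derives the same values from the NameNode URI and user name. It rejects a NameNode string without a scheme, since a scheme-less warehouse dir would again be resolved against the default file system.

```scala
import java.net.URI

// Hypothetical helper (not from the original post) building the two
// HiveContext settings used in the accepted solution.
object HiveSettingsSketch {
  def hiveSettings(nameNode: String, user: String, metastoreUri: String): Map[String, String] = {
    // A scheme-less value would be resolved against fs.defaultFS, possibly file:///.
    require(new URI(nameNode).getScheme != null,
      s"nameNode must be a fully qualified URI, got '$nameNode'")
    Map(
      "hive.metastore.warehouse.dir" -> (nameNode.stripSuffix("/") + "/user/" + user + "/warehouse"),
      "hive.metastore.uris" -> metastoreUri
    )
  }

  def main(args: Array[String]): Unit = {
    val settings = hiveSettings(
      "hdfs://sandbox.hortonworks.com:8020", "spark", "thrift://sandbox.hortonworks.com:9083")
    // Inside the job (hc = HiveContext, sc = SparkContext) this would presumably become:
    //   hiveSettings(hdfsPath, sc.sparkUser, "thrift://sandbox.hortonworks.com:9083")
    //     .foreach { case (k, v) => hc.setConf(k, v) }
    settings.toSeq.sortBy(_._1).foreach { case (k, v) => println(s"$k=$v") }
  }
}
```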


Replies

Super Collaborator

Just to add more information: the sandbox I am using is 2.3.4 (manually upgraded from the generally available 2.3.2).

The Oozie Spark sharelib (/user/oozie/share/lib/lib_20160115164104/spark) contains these files:

datanucleus-api-jdo-3.2.6.jar
datanucleus-core-3.2.10.jar
datanucleus-rdbms-3.2.9.jar
oozie-sharelib-spark-4.2.0.2.3.4.0-3485.jar
spark-1.5.2.2.3.4.0-3485-yarn-shuffle.jar
spark-assembly-1.5.2.2.3.4.0-3485-hadoop2.7.1.2.3.4.0-3485.jar
spark-examples-1.5.2.2.3.4.0-3485-hadoop2.7.1.2.3.4.0-3485.jar

I got similar errors on 2.3.2, hence the upgrade.

Expert Contributor

Try specifying the paths as fully qualified HDFS URIs in <arg> elements, e.g. <arg>outputpath=hdfs://localhost/output/file.txt</arg>, as below:

            <master>local[*]</master>
            <mode>client</mode>
            <name>Spark Example</name>
            <class>org.apache.spark.examples.mllib.JavaALS</class>
            <jar>/lib/spark-examples_2.10-1.1.0.jar</jar>
            <spark-opts>--executor-memory 20G --num-executors 50</spark-opts>
            <arg>inputpath=hdfs://localhost/input/file.txt</arg>
            <arg>outputpath=hdfs://localhost/output/file.txt</arg>
            <arg>value=2</arg>
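The reply does not show how the main class consumes these key=value arguments. Here is a hypothetical Scala sketch of one way to parse them. It splits each argument on its first '=' only, so values that themselves contain '=' survive intact.

```scala
// Hypothetical sketch: assumes every <arg> has the form key=value.
object KeyValueArgs {
  def parse(args: Array[String]): Map[String, String] =
    args.map { arg =>
      val i = arg.indexOf('=')
      require(i > 0, s"expected key=value but got '$arg'")
      arg.substring(0, i) -> arg.substring(i + 1) // split on the first '=' only
    }.toMap

  def main(args: Array[String]): Unit = {
    val opts = parse(Array(
      "inputpath=hdfs://localhost/input/file.txt",
      "outputpath=hdfs://localhost/output/file.txt",
      "value=2"))
    println(opts("outputpath"))
  }
}
```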
