
Using CrunchIndexerTool with Spark as the pipeline Type is not working

Hi, We have a CDH 5.5.1 cluster on Cisco UCS. We have been using MapReduceIndexerTool over the past year. Based on its performance, we would like to move to the Spark indexer. The tutorial http://www.cloudera.com/documentation/enterprise/5-3-x/topics/search_spark_index_ref.html helped us build the indexer, and the following was the command:

"spark-submit \
--master yarn \
--deploy-mode cluster \
--jars $myDependencyJarFiles \
--executor-memory 32G \
--class org.apache.solr.crunch.CrunchIndexerTool \
--files $(ls ${LOCAL_PROP_DIR}/log4j.properties),$(ls $MORPHLINE_FILE),$(ls ~/tokenFile.txt),$(ls ${LOCAL_SCHEMA_DIR}/AVRO_SCHEMA.avsc) \
$myDriverJar \
-D hadoop.tmp.dir=$HDFS_SOLR_TEMP_OUTPUT_DIR \
-D morphlineVariable.ZK_HOST=$ZK_HOST \
-D 'mapred.child.java.opts=-Xmx4G' \
-D tokenFile=tokenFile.txt \
--input-file-format avro \
--input-file-reader-schema AVRO_SCHEMA.avsc \
--morphline-file morphline.conf \
--pipeline-type spark \
--chatty \
--log4j log4j.properties \
$HDFS_FILE"

$HDFS_FILE is the file which would be loaded. If we don't use

--input-file-format avro \
--input-file-reader-schema AVRO_SCHEMA.avsc \

it takes approximately 17-21 minutes to do the indexing into Solr. Our input data is Avro files, and AVRO_SCHEMA.avsc is the schema. My understanding is that by using --input-file-format and --input-file-reader-schema you are able to create more partitions, which would help with parallel processing. With the above two included, I get a failure. Could you advise what is going on? I've already created a Cloudera ticket.

3 REPLIES

Re: Using CrunchIndexerTool with Spark as the pipeline Type is not working

Expert Contributor
Try setting the number of Spark executors, cores, etc. For example, see http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
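
As a minimal sketch (the executor count, core count, and memory size below are illustrative assumptions, not tuned values for this cluster), the parallelism flags could be added to the existing spark-submit invocation like this:

# Illustrative parallelism settings; adjust to the cluster's capacity.
# Note: an explicit --num-executors conflicts with dynamic allocation,
# so spark.dynamicAllocation.enabled is disabled here.
spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 6 \
--executor-cores 5 \
--executor-memory 8G \
--conf spark.dynamicAllocation.enabled=false \
--class org.apache.solr.crunch.CrunchIndexerTool \
... # remaining arguments as in the original command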

Re: Using CrunchIndexerTool with Spark as the pipeline Type is not working

I added --executor-cores 5 to my spark-submit, and for the number of executors the spark.dynamicAllocation.enabled property is set to true in my cluster. Even after these changes it runs as a single thread. If I set "--input-file-format avro" and pass "--input-file-reader-schema $SCHEMA.avsc", there are 6 executors running and 22 tasks, but the job fails. When I look at the logs, it has the following:

org.kitesdk.morphline.api.MorphlineRuntimeException: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.io.InputStream
at org.kitesdk.morphline.base.FaultTolerance.handleException(FaultTolerance.java:73)
at org.apache.solr.crunch.MorphlineFnBuilder$MorphlineFn.process(MorphlineFnBuilder.java:280)
at org.apache.crunch.util.DoFnIterator.hasNext(DoFnIterator.java:55)
at org.apache.crunch.util.DoFnIterator.hasNext(DoFnIterator.java:54)
at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:41)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply$mcV$sp(PairRDDFunctions.scala:1034)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1034)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1034)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1042)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1014)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.io.InputStream
at org.kitesdk.morphline.stdio.AbstractParser.getAttachmentInputStream(AbstractParser.java:184)
at org.kitesdk.morphline.stdio.AbstractParser.doProcess(AbstractParser.java:94)
at org.kitesdk.morphline.base.AbstractCommand.process(AbstractCommand.java:156)
at org.kitesdk.morphline.base.AbstractCommand.doProcess(AbstractCommand.java:181)
at org.kitesdk.morphline.base.AbstractCommand.process(AbstractCommand.java:156)
at org.apache.solr.crunch.MorphlineFnBuilder$MorphlineFn.process(MorphlineFnBuilder.java:274)
... 17 more

Re: Using CrunchIndexerTool with Spark as the pipeline Type is not working

Expert Contributor

When using "--input-file-format avro" then your morphline should start with a extractAvroPaths or extractAvroTree command, rather than start with a readAvroContainer or readAvro morphline command. This is because --input-file-format already takes care of parsing the input stream into Java main memory Avro objects.
