
Error on avro M/R job with Tez: missing configuration property


Hello,

I am trying Tez to improve the performance of an existing MapReduce Avro job (hundreds of them, actually, but they all share the same issue).

Before submitting the job, I set the "avro.map.output.schema" configuration property by calling org.apache.avro.mapred.AvroJob.setMapOutputSchema. During the shuffle, Tez compares record keys using org.apache.avro.mapred.AvroKeyComparator (I actually subclassed it, but that is unrelated to my issue), which needs to retrieve the map output schema from the configuration. The problem is that the configuration instance available at that point appears to be incomplete: the "avro.map.output.schema" property is missing, and the job fails with the following trace:

org.apache.tez.runtime.library.common.shuffle.orderedgrouped.Shuffle$ShuffleError: Error while doing final merge
at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.Shuffle$RunShuffleCallable.callInternal(Shuffle.java:378)
at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.Shuffle$RunShuffleCallable.callInternal(Shuffle.java:337)
at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.NullPointerException
at java.io.StringReader.<init>(StringReader.java:50)
at org.apache.avro.Schema$Parser.parse(Schema.java:917)
at org.apache.avro.Schema.parse(Schema.java:966)
at org.apache.avro.mapred.AvroJob.getMapOutputSchema(AvroJob.java:78)
at com.afklm.karma.core.common.hadoop.fwk.comparator.AvroKeyComparator.getKeySchema(AvroKeyComparator.java:100)
at com.afklm.karma.core.common.hadoop.fwk.comparator.AvroKeyComparator.compare(AvroKeyComparator.java:77)
at org.apache.tez.runtime.library.common.sort.impl.TezMerger$MergeQueue.lessThan(TezMerger.java:612)
at org.apache.hadoop.util.PriorityQueue.upHeap(PriorityQueue.java:128)
at org.apache.hadoop.util.PriorityQueue.put(PriorityQueue.java:55)
at org.apache.tez.runtime.library.common.sort.impl.TezMerger$MergeQueue.merge(TezMerger.java:711)
at org.apache.tez.runtime.library.common.sort.impl.TezMerger$MergeQueue.merge(TezMerger.java:622)
at org.apache.tez.runtime.library.common.sort.impl.TezMerger.merge(TezMerger.java:147)
at org.apache.tez.runtime.library.common.sort.impl.TezMerger.merge(TezMerger.java:130)
at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeManager.finalMerge(MergeManager.java:1036)
at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeManager.close(MergeManager.java:540)
at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.Shuffle$RunShuffleCallable.callInternal(Shuffle.java:376)
... 6 more
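
To illustrate why a missing property surfaces as a NullPointerException inside StringReader, here is a minimal sketch. It uses a plain java.util.Map as a stand-in for the Hadoop Configuration (an assumption, so that it runs without Hadoop or Avro on the classpath), and the class and method names are hypothetical. Looking up an absent key returns null, and passing that null to StringReader fails in the same way as the top frame of the "Caused by" section above. The guarded variant only shows how the failure could be reported more clearly; it does not fix the underlying problem.

```java
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in: a Map plays the role of the job configuration.
final class MissingSchemaDemo {
    static final String MAP_OUTPUT_SCHEMA = "avro.map.output.schema";

    // Unguarded lookup: a missing key yields null, and
    // new StringReader(null) throws NullPointerException.
    static StringReader openUnguarded(Map<String, String> conf) {
        return new StringReader(conf.get(MAP_OUTPUT_SCHEMA));
    }

    // Guarded lookup: fail early with a message naming the missing property.
    static String requireSchemaJson(Map<String, String> conf) {
        String json = conf.get(MAP_OUTPUT_SCHEMA);
        if (json == null) {
            throw new IllegalStateException(
                "Missing configuration property: " + MAP_OUTPUT_SCHEMA);
        }
        return json;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>(); // property deliberately absent
        try {
            openUnguarded(conf);
        } catch (NullPointerException e) {
            System.out.println("unguarded lookup: NullPointerException");
        }
        try {
            requireSchemaJson(conf);
        } catch (IllegalStateException e) {
            System.out.println("guarded lookup: " + e.getMessage());
        }
    }
}
```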

I experimented with storing the output schema under another key that I observed to be available during the shuffle (for example "ha.zookeeper.acl", which I guessed my job would not need for its intended purpose) and reading it back in my subclass of AvroKeyComparator, and this works! Of course I can't keep this ugly workaround, but it suggests that Tez is somehow filtering the configuration, and that this is what stops my job from working.
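
The shape of that workaround can be sketched as follows. This is an illustration only: a java.util.Map stands in for the Hadoop Configuration, the filtering step is a hypothetical model of what I suspect happens (not Tez's actual code), and all class and method names are made up for the example. Only the two property names come from my job.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the "carrier key" workaround, with a Map
// standing in for the job configuration.
final class CarrierKeyWorkaround {
    static final String SCHEMA_KEY = "avro.map.output.schema";
    // A key observed to survive into the shuffle; misused here as a carrier.
    static final String CARRIER_KEY = "ha.zookeeper.acl";

    // Driver side: store the schema JSON under both keys before submitting.
    static void storeSchema(Map<String, String> conf, String schemaJson) {
        conf.put(SCHEMA_KEY, schemaJson);
        conf.put(CARRIER_KEY, schemaJson);
    }

    // Assumed model of the suspected filtering: only keys listed in
    // 'visible' are copied into the configuration seen by the task.
    static Map<String, String> filtered(Map<String, String> conf, Set<String> visible) {
        Map<String, String> out = new HashMap<>(conf);
        out.keySet().retainAll(visible);
        return out;
    }

    // Task side (e.g. in a comparator subclass): prefer the regular key,
    // fall back to the carrier key when the regular one is missing.
    static String readSchema(Map<String, String> conf) {
        String json = conf.get(SCHEMA_KEY);
        return json != null ? json : conf.get(CARRIER_KEY);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        storeSchema(conf, "\"string\"");
        Map<String, String> seenByTask = filtered(conf, Collections.singleton(CARRIER_KEY));
        System.out.println("regular key present: " + seenByTask.containsKey(SCHEMA_KEY));
        System.out.println("schema recovered: " + readSchema(seenByTask));
    }
}
```

In the real job the value read back this way would be parsed into an Avro schema instead of being returned as a string.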

The workaround also got me one step further, where the same kind of issue occurs again: deserializing Avro records fails when Tez builds the ValuesIterator for the reduce phase:

java.lang.NullPointerException
at java.io.StringReader.<init>(StringReader.java:50)
at org.apache.avro.Schema$Parser.parse(Schema.java:917)
at org.apache.avro.Schema.parse(Schema.java:966)
at org.apache.avro.mapred.AvroJob.getMapOutputSchema(AvroJob.java:78)
at org.apache.avro.mapred.AvroSerialization.getDeserializer(AvroSerialization.java:53)
at org.apache.hadoop.io.serializer.SerializationFactory.getDeserializer(SerializationFactory.java:90)
at org.apache.tez.runtime.library.common.ValuesIterator.<init>(ValuesIterator.java:80)
at org.apache.tez.runtime.library.input.OrderedGroupedKVInput.createValuesIterator(OrderedGroupedKVInput.java:287)
at org.apache.tez.runtime.library.input.OrderedGroupedKVInput.waitForInputReady(OrderedGroupedKVInput.java:178)
at org.apache.tez.runtime.library.input.OrderedGroupedKVInput.getReader(OrderedGroupedKVInput.java:242)
at org.apache.tez.mapreduce.processor.reduce.ReduceProcessor.run(ReduceProcessor.java:130)
at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:344)
at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:181)
at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:172)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1709)
at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.callInternal(TezTaskRunner.java:172)
at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.callInternal(TezTaskRunner.java:168)
at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

What should I do to have the "avro.map.output.schema" property available in all parts of the job?

I am using HDP 2.4 (thus a 0.7.0.x version of Tez).

Thanks for your help!