
CDH 5.7.0 Spark Streaming S3 Error

Expert Contributor

I am testing CDH 5.7.0 and found that Spark Streaming no longer works with S3; it turns out it doesn't work with SQS either. Using the AWS SDK 1.11.0 jar fixed the SQS problem but not the S3 one. I also tried dropping in the latest jets3t 0.9.4 jar, but that didn't work either.
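For reference, a minimal sketch of the kind of streaming job that hits this (the app name, batch interval, and bucket are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("S3StreamTest")
val ssc = new StreamingContext(conf, Seconds(30))

// s3n:// resolves to NativeS3FileSystem, whose jets3t-backed store class
// fails bytecode verification when a mismatched jets3t jar is picked up.
val lines = ssc.textFileStream("s3n://amg-events-out/")
lines.print()

ssc.start()
ssc.awaitTermination()

The job then dies as soon as the first batch tries to list the bucket: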

Exception in thread "JobGenerator" java.lang.VerifyError: Bad type on operand stack
Exception Details:
Location:
org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.copy(Ljava/lang/String;Ljava/lang/String;)V @155: invokevirtual
Reason:
Type 'org/jets3t/service/model/S3Object' (current frame, stack[4]) is not assignable to 'org/jets3t/service/model/StorageObject'
Current Frame:
bci: @155
flags: { }
locals: { 'org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore', 'java/lang/String', 'java/lang/String', 'org/jets3t/service/model/S3Object' }
stack: { 'org/jets3t/service/S3Service', 'java/lang/String', 'java/lang/String', 'java/lang/String', 'org/jets3t/service/model/S3Object', integer }
Bytecode:
0x0000000: b200 fcb9 0190 0100 9900 39b2 00fc bb01
0x0000010: 5659 b701 5713 0192 b601 5b2b b601 5b13
0x0000020: 0194 b601 5b2c b601 5b13 0196 b601 5b2a
0x0000030: b400 7db6 00e7 b601 5bb6 015e b901 9802
0x0000040: 002a b400 5799 0030 2ab4 0047 2ab4 007d
0x0000050: 2b01 0101 01b6 019b 4e2a b400 6b09 949e
0x0000060: 0016 2db6 019c 2ab4 006b 949e 000a 2a2d
0x0000070: 2cb6 01a0 b1bb 00a0 592c b700 a14e 2d2a
0x0000080: b400 73b6 00b0 2ab4 0047 2ab4 007d b600
0x0000090: e72b 2ab4 007d b600 e72d 03b6 01a4 57a7
0x00000a0: 000a 4e2a 2d2b b700 c7b1
Exception Handler Table:
bci [0, 116] => handler: 162
bci [117, 159] => handler: 162
Stackmap Table:
same_frame_extended(@65)
same_frame(@117)
same_locals_1_stack_item_frame(@162,Object[#139])
same_frame(@169)

at org.apache.hadoop.fs.s3native.NativeS3FileSystem.createDefaultStore(NativeS3FileSystem.java:338)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:328)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2696)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2733)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2715)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:382)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$fs(FileInputDStream.scala:297)
at org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:198)
at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:149)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

I hope a fix for this is coming soon.

Thanks,

Ben

1 ACCEPTED SOLUTION

Expert Contributor

I got it to work; I found the solution in another thread. The way to access S3 has changed: use the s3a:// scheme instead of s3n://.

// Put the S3 credentials into the shared Hadoop configuration
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", accessKey)
hadoopConf.set("fs.s3a.secret.key", secretKey)

// Monitor the bucket through the s3a:// connector instead of s3n://
val lines = ssc.textFileStream("s3a://amg-events-out/")

Cheers,

Ben

4 REPLIES

Master Collaborator

That looks like an S3 library problem. The JVM is saying the bytecode itself is invalid, so it's nothing to do with Spark per se.

CDH does not support S3, although there's no particular reason it wouldn't work if you had the right libraries in place.
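One way to confirm it's a library mismatch is to ask the classloader which jar the class named in the VerifyError actually comes from, e.g. from spark-shell (a quick diagnostic sketch, not CDH-specific):

// Print the jar that S3Object is loaded from at runtime. The VerifyError
// above suggests Hadoop's jets3t glue was compiled against a newer jets3t
// than the one winning on the classpath (e.g. jets3t-0.6.1.jar instead of
// jets3t-0.9.0.jar).
println(classOf[org.jets3t.service.model.S3Object]
  .getProtectionDomain.getCodeSource.getLocation)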

Expert Contributor

It looks like the AWS jar files changed between CDH 5.4.8 and CDH 5.5.2.

In CDH 5.4.8:

/opt/cloudera/parcels/CDH/jars/aws-java-sdk-1.7.14.jar

In CDH 5.5.2+, these replaced aws-java-sdk-1.7.14.jar:

/opt/cloudera/parcels/CDH/jars/aws-java-sdk-core-1.10.6.jar
/opt/cloudera/parcels/CDH/jars/aws-java-sdk-kms-1.10.6.jar
/opt/cloudera/parcels/CDH/jars/aws-java-sdk-s3-1.10.6.jar

But the jets3t jars are the same in both:

/opt/cloudera/parcels/CDH/jars/jets3t-0.9.0.jar
/opt/cloudera/parcels/CDH/jars/jets3t-0.6.1.jar

I don't know if it's related, but for hadoop-aws the only difference is the CDH version suffix:

/opt/cloudera/parcels/CDH/jars/hadoop-aws-2.6.0-cdh5.x.x.jar

I wonder if any of these is the problem.

Thanks,

Ben


Master Collaborator

Yes, you will certainly need to provide access keys for S3 access to work. I don't think (?) that would be a solution to a VerifyError, though, which is a much lower-level error indicating corrupted builds. Yes, it's expected that the AWS SDK dependencies were updated along with the new Spark version in CDH 5.7. I think the current version should depend on jets3t 0.9, which is the one you want.
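If you'd rather not hard-code the keys, the same settings can also go through Spark configuration, since "spark.hadoop."-prefixed properties are copied into the Hadoop Configuration. A sketch, assuming the app name is a placeholder and the credentials live in the standard AWS environment variables:

import org.apache.spark.SparkConf

// Any property prefixed with "spark.hadoop." is copied into the Hadoop
// Configuration, so this is equivalent to hadoopConf.set("fs.s3a.…", …)
val conf = new SparkConf()
  .setAppName("S3aStream")
  .set("spark.hadoop.fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
  .set("spark.hadoop.fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))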