Created on 05-21-2016 07:04 AM - edited 09-16-2022 03:21 AM
I am testing CDH 5.7.0 and found that Spark Streaming no longer works with S3. It turns out it doesn't work with SQS either. I tried using the AWS SDK 1.11.0 jar, but that only fixed the SQS problem. I also tried pulling in the latest jets3t 0.9.4 jar, but that didn't work either.
Exception in thread "JobGenerator" java.lang.VerifyError: Bad type on operand stack
Exception Details:
Location:
org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.copy(Ljava/lang/String;Ljava/lang/String;)V @155: invokevirtual
Reason:
Type 'org/jets3t/service/model/S3Object' (current frame, stack[4]) is not assignable to 'org/jets3t/service/model/StorageObject'
Current Frame:
bci: @155
flags: { }
locals: { 'org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore', 'java/lang/String', 'java/lang/String', 'org/jets3t/service/model/S3Object' }
stack: { 'org/jets3t/service/S3Service', 'java/lang/String', 'java/lang/String', 'java/lang/String', 'org/jets3t/service/model/S3Object', integer }
Bytecode:
0x0000000: b200 fcb9 0190 0100 9900 39b2 00fc bb01
0x0000010: 5659 b701 5713 0192 b601 5b2b b601 5b13
0x0000020: 0194 b601 5b2c b601 5b13 0196 b601 5b2a
0x0000030: b400 7db6 00e7 b601 5bb6 015e b901 9802
0x0000040: 002a b400 5799 0030 2ab4 0047 2ab4 007d
0x0000050: 2b01 0101 01b6 019b 4e2a b400 6b09 949e
0x0000060: 0016 2db6 019c 2ab4 006b 949e 000a 2a2d
0x0000070: 2cb6 01a0 b1bb 00a0 592c b700 a14e 2d2a
0x0000080: b400 73b6 00b0 2ab4 0047 2ab4 007d b600
0x0000090: e72b 2ab4 007d b600 e72d 03b6 01a4 57a7
0x00000a0: 000a 4e2a 2d2b b700 c7b1
Exception Handler Table:
bci [0, 116] => handler: 162
bci [117, 159] => handler: 162
Stackmap Table:
same_frame_extended(@65)
same_frame(@117)
same_locals_1_stack_item_frame(@162,Object[#139])
same_frame(@169)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.createDefaultStore(NativeS3FileSystem.java:338)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:328)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2696)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2733)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2715)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:382)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$fs(FileInputDStream.scala:297)
at org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:198)
at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:149)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
I hope a fix for this is coming soon.
Thanks,
Ben
Created 05-21-2016 07:11 AM
That looks like an S3 library problem. The JVM is saying the bytecode itself is invalid, so it has nothing to do with Spark per se.
CDH does not support S3, although there's no particular reason it wouldn't work if you had the right libraries in place.
Created on 05-21-2016 07:57 AM - edited 05-21-2016 08:02 AM
It looks like the AWS jar files have changed from CDH 5.4.8 to CDH 5.5.2.
In CDH 5.4.8:
/opt/cloudera/parcels/CDH/jars/aws-java-sdk-1.7.14.jar
In CDH 5.5.2+, these replaced aws-java-sdk-1.7.14.jar:
/opt/cloudera/parcels/CDH/jars/aws-java-sdk-core-1.10.6.jar
/opt/cloudera/parcels/CDH/jars/aws-java-sdk-kms-1.10.6.jar
/opt/cloudera/parcels/CDH/jars/aws-java-sdk-s3-1.10.6.jar
But the jets3t files are the same:
/opt/cloudera/parcels/CDH/jars/jets3t-0.9.0.jar
/opt/cloudera/parcels/CDH/jars/jets3t-0.6.1.jar
I don't know if this has anything to do with it, but for this jar the only difference is the CDH version suffix:
/opt/cloudera/parcels/CDH/jars/hadoop-aws-2.6.0-cdh5.x.x.jar
I wonder if any of these are the problem.
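One quick way to check is to print which jar a suspect class is actually loaded from. A small diagnostic sketch (StorageObject is the class named in the VerifyError above; runnable from spark-shell):

// Print the jar a class was loaded from, to spot classpath version mismatches.
// getCodeSource can return null for bootstrap classes, so this is best-effort.
val cls = Class.forName("org.jets3t.service.model.StorageObject")
println(cls.getProtectionDomain.getCodeSource.getLocation)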
Thanks,
Ben
Created 05-21-2016 08:30 AM
I got it to work. I found the solution in another thread: the way to access S3 has changed.
// Put the S3 credentials into the Hadoop configuration under the s3a keys
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", accessKey)
hadoopConf.set("fs.s3a.secret.key", secretKey)

// Read from the bucket with the s3a:// scheme instead of s3n://
val lines = ssc.textFileStream("s3a://amg-events-out/")
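For reference, here is the same thing as a minimal end-to-end sketch. The app name and batch interval are just illustrative, and accessKey and secretKey are assumed to be defined elsewhere:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("s3a-streaming-test") // illustrative name
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(30)) // illustrative batch interval

// s3a credentials live in the Hadoop configuration, not in SparkConf
sc.hadoopConfiguration.set("fs.s3a.access.key", accessKey)
sc.hadoopConfiguration.set("fs.s3a.secret.key", secretKey)

// Note the s3a:// scheme; the old s3n:// path is what hit the jets3t VerifyError
val lines = ssc.textFileStream("s3a://amg-events-out/")
lines.print()

ssc.start()
ssc.awaitTermination()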
Cheers,
Ben
Created 05-21-2016 10:42 AM
Yes you will certainly need to provide access keys for S3 access to work. I don't think (?) that would be a solution to a VerifyError, which is a much lower-level error indicating corrupted builds. Yes, it's expected that AWS SDK dependencies were updated along with the new Spark version in CDH 5.7. I think the current version should depend on jets3t 0.9, which is the one you want.