I'm getting several java.io.IOException errors when trying to run the code from exercise 3 in the spark-shell. Any ideas? The full transcript is below.
scala> // First we're going to import the classes we need
scala> import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.Job
scala> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
scala> import org.apache.avro.generic.GenericRecord
import org.apache.avro.generic.GenericRecord
scala> import parquet.hadoop.ParquetInputFormat
import parquet.hadoop.ParquetInputFormat
scala> import parquet.avro.AvroReadSupport
import parquet.avro.AvroReadSupport
scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD
scala> // Then we create RDD's for 2 of the files we imported from MySQL with Sqoop
scala> // RDD's are Spark's data structures for working with distributed datasets
scala> def rddFromParquetHdfsFile(path: String): RDD[GenericRecord] = {
| val job = new Job()
| FileInputFormat.setInputPaths(job, path)
| ParquetInputFormat.setReadSupportClass(job,
| classOf[AvroReadSupport[GenericRecord]])
| return sc.newAPIHadoopRDD(job.getConfiguration,
| classOf[ParquetInputFormat[GenericRecord]],
| classOf[Void],
| classOf[GenericRecord]).map(x => x._2)
| }
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
rddFromParquetHdfsFile: (path: String)org.apache.spark.rdd.RDD[org.apache.avro.generic.GenericRecord]
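Side note on the "warning: there were 1 deprecation warning(s)" line above: I'm fairly sure it comes from new Job(), which Hadoop 2 deprecates in favor of the Job.getInstance() factory, and it's unrelated to the errors further down. For reference, a sketch of the same helper without the warning, reusing the imports above and the shell's sc:

def rddFromParquetHdfsFile(path: String): RDD[GenericRecord] = {
  // Job.getInstance replaces the deprecated new Job(); seeding it with the
  // shell's Hadoop configuration also pulls in fs.defaultFS and related settings
  val job = Job.getInstance(sc.hadoopConfiguration)
  FileInputFormat.setInputPaths(job, path)
  ParquetInputFormat.setReadSupportClass(job, classOf[AvroReadSupport[GenericRecord]])
  // newAPIHadoopRDD yields (Void, GenericRecord) pairs; keep only the records
  sc.newAPIHadoopRDD(job.getConfiguration,
    classOf[ParquetInputFormat[GenericRecord]],
    classOf[Void],
    classOf[GenericRecord]).map(_._2)
}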
scala>
scala> val warehouse = "hdfs://{{cluster_data.manager_node_hostname}}/user/hive/warehouse/"
warehouse: String = hdfs://{{cluster_data.manager_node_hostname}}/user/hive/warehouse/
scala> val order_items = rddFromParquetHdfsFile(warehouse + "order_items");
java.io.IOException: Incomplete HDFS URI, no host: hdfs://%7B%7Bcluster_data.manager_node_hostname%7D%7D/user/hive/warehouse/order_items
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2800)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:98)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2837)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2819)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:387)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(FileInputFormat.java:507)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(FileInputFormat.java:476)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.rddFromParquetHdfsFile(<console>:42)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:44)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:53)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:57)
at $iwC$$iwC$$iwC.<init>(<console>:59)
at $iwC$$iwC.<init>(<console>:61)
at $iwC.<init>(<console>:63)
at <init>(<console>:65)
at .<init>(<console>:69)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1045)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1326)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:821)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:852)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:800)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1064)
at org.apache.spark.repl.Main$.main(Main.scala:35)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
scala> val products = rddFromParquetHdfsFile(warehouse + "products");
java.io.IOException: Incomplete HDFS URI, no host: hdfs://%7B%7Bcluster_data.manager_node_hostname%7D%7D/user/hive/warehouse/products
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2800)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:98)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2837)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2819)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:387)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(FileInputFormat.java:507)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(FileInputFormat.java:476)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.rddFromParquetHdfsFile(<console>:42)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:44)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:53)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:57)
at $iwC$$iwC$$iwC.<init>(<console>:59)
at $iwC$$iwC.<init>(<console>:61)
at $iwC.<init>(<console>:63)
at <init>(<console>:65)
at .<init>(<console>:69)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1045)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1326)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:821)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:852)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:800)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1064)
at org.apache.spark.repl.Main$.main(Main.scala:35)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
scala> // Next, we extract the fields from order_items and products that we care about
scala> // and get a list of every product, its name and quantity, grouped by order
scala> val orders = order_items.map { x => (
| x.get("order_item_product_id"),
| (x.get("order_item_order_id"), x.get("order_item_quantity")))
| }.join(
| products.map { x => (
| x.get("product_id"),
| (x.get("product_name")))
| }
| ).map(x => (
| scala.Int.unbox(x._2._1._1), // order_id
| (
| scala.Int.unbox(x._2._1._2), // quantity
| x._2._2.toString // product_name
| )
| )).groupByKey()
<console>:38: error: not found: value order_items
val orders = order_items.map { x => (
^
scala> // Finally, we tally how many times each combination of products appears
scala> // together in an order, then we sort them and take the 10 most common
scala> val cooccurrences = orders.map(order =>
| (
| order._1,
| order._2.toList.combinations(2).map(order_pair =>
| (
| if (order_pair(0)._2 < order_pair(1)._2)
| (order_pair(0)._2, order_pair(1)._2)
| else
| (order_pair(1)._2, order_pair(0)._2),
| order_pair(0)._1 * order_pair(1)._1
| )
| )
| )
| )
<console>:38: error: not found: value orders
val cooccurrences = orders.map(order =>
^
scala> val combos = cooccurrences.flatMap(x => x._2).reduceByKey((a, b) => a + b)
<console>:38: error: not found: value cooccurrences
val combos = cooccurrences.flatMap(x => x._2).reduceByKey((a, b) => a + b)
^
scala> val mostCommon = combos.map(x => (x._2, x._1)).sortByKey(false).take(10)
<console>:38: error: not found: value combos
val mostCommon = combos.map(x => (x._2, x._1)).sortByKey(false).take(10)
^
scala> // We print our results, 1 per line, and exit the Spark shell
scala> println(mostCommon.deep.mkString("\n"))
<console>:39: error: not found: value mostCommon
println(mostCommon.deep.mkString("\n"))
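Looking at the stack traces again, I think the actual problem is the warehouse path: the {{cluster_data.manager_node_hostname}} placeholder from the tutorial was never replaced with a real host name, so HDFS sees it percent-encoded (hdfs://%7B%7Bcluster_data.manager_node_hostname%7D%7D/...) and fails with "Incomplete HDFS URI, no host". All of the later "not found: value order_items / orders / cooccurrences / combos / mostCommon" errors just cascade from the two RDD definitions that failed. A sketch of what I expect the fix to look like; the host name below is only a stand-in for whatever your cluster's manager/NameNode host actually is:

// Option 1: put the real NameNode host into the URI
// (your-manager-node.example.com is a placeholder, not a real host)
val warehouse = "hdfs://your-manager-node.example.com/user/hive/warehouse/"

// Option 2: drop the scheme and host and let fs.defaultFS supply them,
// assuming the shell picked up the cluster's core-site.xml
// (check with: sc.hadoopConfiguration.get("fs.defaultFS"))
val warehouse = "/user/hive/warehouse/"

val order_items = rddFromParquetHdfsFile(warehouse + "order_items")
val products = rddFromParquetHdfsFile(warehouse + "products")

Once order_items and products are created successfully, the rest of the exercise (orders, cooccurrences, combos and mostCommon) should run as written.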