Create analytics from HTTP using Spark Streaming

Contributor

Hi, my requirement is to create analytics from http://10.3.9.34:9900/messages: pull the data from http://10.3.9.34:9900/messages, put it in the HDFS location /user/cloudera/flume, and then build an analytics report from HDFS using Tableau or the Hue UI. I tried the code below in the Scala console (spark-shell) on CDH 5.5, but I am unable to fetch data from the HTTP link.

import org.apache.spark.SparkContext
val dataRDD = sc.textFile("http://10.3.9.34:9900/messages")
dataRDD.collect().foreach(println)
dataRDD.count()
dataRDD.saveAsTextFile("/user/cloudera/flume")

I get the error below at the Scala console:

java.io.IOException: No FileSystem for scheme: http
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2623)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2637)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:93)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2680)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2662)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:379)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)

1 ACCEPTED SOLUTION

Rising Star

You are getting this exception because "sc.textFile" reads a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. As the error shows, no FileSystem is registered for the "http" scheme in your setup.

Since you want to get the data from a URL and save it to HDFS, you should do the following:

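// Fetch the whole response body on the driver as one string
val data = scala.io.Source.fromURL("http://10.3.9.34:9900/messages").mkString
// Split into lines and drop the empty ones
val list = data.split("\n").filter(_ != "")
// Distribute the lines as an RDD and write them out
val rdds = sc.parallelize(list)
rdds.saveAsTextFile(outputDirectory) // outputDirectory: your HDFS path, e.g. "/user/cloudera/flume"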
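
A note on the snippet above: it never closes the Source and loads the full response into a single string. Below is a minimal sketch of a variant that closes the connection and reads line by line. The helper name fetchLines is hypothetical (it is not from this thread), UTF-8 is an assumed encoding, and the sketch still assumes the response is small enough to fit in driver memory.

```scala
import scala.io.Source

// Hypothetical helper: read a URL on the driver and return its non-empty lines.
// The encoding ("UTF-8") is an assumption; adjust it to match the server.
def fetchLines(url: String): List[String] = {
  val source = Source.fromURL(url, "UTF-8")
  try source.getLines().filter(_.nonEmpty).toList // materialize before closing
  finally source.close()                          // always release the connection
}
```

In spark-shell it could then be used as sc.parallelize(fetchLines("http://10.3.9.34:9900/messages")).saveAsTextFile("/user/cloudera/flume"), where sc is the SparkContext that the shell provides.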

4 REPLIES

Contributor

First of all, thanks Umesh. You solved half of my problem and I really appreciate that. The only remaining issue is that it is not saving to the HDFS location /user/cloudera/flume because of an illegal character:

scala> import org.apache.spark.SparkContext
import org.apache.spark.SparkContext

 

scala> val data = scala.io.Source.fromURL("http://10.3.9.34:9900/messages").mkString
data: String =
"Jul 31 03:38:01 MSAT-T8360-62-RHEL64-24-103934 kernel: imklog 4.6.2, log source = /proc/kmsg started.
Jul 31 03:38:01 MSAT-T8360-62-RHEL64-24-103934 rsyslogd: [origin software="rsyslogd" swVersion="4.6.2" x-pid="1342" x-info="http://www.rsyslog.com"] (re)start
Jul 31 03:38:01 MSAT-T8360-62-RHEL64-24-103934 rhsmd: This system is registered to RHN Classic
Aug 1 03:36:01 MSAT-T8360-62-RHEL64-24-103934 rhsmd: This system is registered to RHN Classic
Aug 2 03:16:01 MSAT-T8360-62-RHEL64-24-103934 rhsmd: This system is registered to RHN Classic
Aug 3 03:24:01 MSAT-T8360-62-RHEL64-24-103934 rhsmd: This system is registered to RHN Classic
"

scala> val list = data.split("\n").filter(_ != "")
list: Array[String] = Array(Jul 31 03:38:01 MSAT-T8360-62-RHEL64-24-103934 kernel: imklog 4.6.2, log source = /proc/kmsg started., Jul 31 03:38:01 MSAT-T8360-62-RHEL64-24-103934 rsyslogd: [origin software="rsyslogd" swVersion="4.6.2" x-pid="1342" x-info="http://www.rsyslog.com"] (re)start, Jul 31 03:38:01 MSAT-T8360-62-RHEL64-24-103934 rhsmd: This system is registered to RHN Classic, Aug 1 03:36:01 MSAT-T8360-62-RHEL64-24-103934 rhsmd: This system is registered to RHN Classic, Aug 2 03:16:01 MSAT-T8360-62-RHEL64-24-103934 rhsmd: This system is registered to RHN Classic, Aug 3 03:24:01 MSAT-T8360-62-RHEL64-24-103934 rhsmd: This system is registered to RHN Classic)

 

scala> val rdds = sc.parallelize(list)
rdds: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:26

 

scala> rdds.saveAsTextFile(“/user/cloudera/flume”)
<console>:1: error: illegal character '\u201c'
rdds.saveAsTextFile(“/user/cloudera/flume”)
^
<console>:1: error: illegal character '\u201d'
rdds.saveAsTextFile(“/user/cloudera/flume”)
^

scala>

 

Can you please help?

New Contributor

Hello Experts,

We are upgrading our Cloudera Hive from 1.3 to 2.0. Could you please let us know if there are any known issues related to this? I searched the Tableau and Cloudera communities but did not find any.

Thanks in advance!

Regards,
Muthu Venkatesh

Contributor

Awesome, here is the working code:

import org.apache.spark.SparkContext
val data = scala.io.Source.fromURL("http://10.3.9.34:9900/messages").mkString
val list = data.split("\n").filter(_ != "")
val rdds = sc.parallelize(list)
rdds.saveAsTextFile("/user/cloudera/spark/fromsource")
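
For anyone hitting the same error: the working call uses straight quotes. The '\u201c' and '\u201d' in the earlier error message are the curly quotes “ and ”, which typically sneak in when code is copied from a web page or word processor, and Scala does not accept them as string delimiters. A small sketch for cleaning pasted code before running it is below; straightenQuotes is a hypothetical helper, not part of Spark or the Scala library.

```scala
// Hypothetical helper: replace typographic quotes with plain ASCII quotes.
def straightenQuotes(code: String): String =
  code
    .replace('\u201c', '"')  // left double quotation mark
    .replace('\u201d', '"')  // right double quotation mark
    .replace('\u2018', '\'') // left single quotation mark
    .replace('\u2019', '\'') // right single quotation mark

// The failing line from this thread, with curly quotes written as escapes
val pasted = "rdds.saveAsTextFile(\u201c/user/cloudera/flume\u201d)"
println(straightenQuotes(pasted)) // prints: rdds.saveAsTextFile("/user/cloudera/flume")
```

Retyping the quotes by hand in the shell works just as well; the helper is only useful if you paste larger snippets regularly.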