Member since: 03-08-2016
Posts: 33
Kudos Received: 0
Solutions: 2
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 12315 | 04-25-2016 05:45 AM
 | 13387 | 04-07-2016 04:13 AM
03-29-2017
09:17 AM
Now Cloudera works fine, but in the Host Monitor log file I get an error: Could not fetch descriptor after 5 tries, exiting. I can't restart this service, and when I try to restart the Cloudera Management Service I get: Cannot restart service when Host Monitor (master) is in STOPPING state.
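For reference, a minimal shell sketch of checking whether the stuck Host Monitor process is still alive on its host and restarting the Cloudera Manager agent that supervises it (commands assume the usual CM 5 / CentOS 7 defaults; the grep pattern is just an assumption about the role's process name):

```bash
# Check whether the Host Monitor (firehose) process is actually still running on its host
ps aux | grep -i '[f]irehose'

# Restarting the CM agent makes it re-supervise its roles, which often clears
# a role stuck in STOPPING state (cloudera-scm-agent is the default service name)
sudo systemctl restart cloudera-scm-agent
sudo systemctl status cloudera-scm-agent
```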
... View more
03-29-2017
08:45 AM
I do not know why the number of mgmt_mgmt-NAVIGATORMETASERVER* files keeps increasing (e.g. mgmt_mgmt-NAVIGATORMETASERVER-9a89af62abe8393b48c78926720ffe2c_pid19656.hprof), even though I have increased the Java heap size.
... View more
03-29-2017
07:40 AM
After configuring the Navigator Metadata Server heap, I am trying to restart the Cloudera Management Service but I can't. I get: Cannot restart service when Host Monitor (master) is in STOPPING state. In the Host Monitor log file:
mars 29, 14:05:55.133 ERROR com.cloudera.cmon.firehose.Main
Could not fetch descriptor after 5 tries, exiting.
And the number of mgmt_mgmt-NAVIGATORMETASERVER* files keeps increasing.
... View more
03-29-2017
06:28 AM
Hi Jim, how do I change the Navigator configuration to allocate enough memory to the JVM?
... View more
03-29-2017
12:57 AM
I found the files using that space:
-rw-------. 1 cloudera-scm cloudera-scm 359M Mar 27 14:40 mgmt_mgmt-NAVIGATOR-9a89af62abe8393b48c78926720ffe2c_pid28766.hprof
(repeated 40 times), and:
-rw-------. 1 cloudera-scm cloudera-scm 761M Mar 27 15:10 mgmt_mgmt-NAVIGATORMETASERVER-9a89af62abe8393b48c78926720ffe2c_pid11739.hprof
(repeated 12 times). How can I resolve this?
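Those .hprof files are JVM heap dumps written when the Navigator roles run out of memory. A small shell sketch for finding and clearing them once the heap size has been raised (the search directory is an assumption — adjust to wherever the dumps actually live on your host):

```bash
# List all Cloudera Management Service heap dumps and their total size
find /var/log -name 'mgmt_mgmt-*hprof' -printf '%s %p\n' 2>/dev/null \
  | awk '{total += $1; print $2} END {print "total: " total/1024/1024 " MB"}'

# After the out-of-memory cause is addressed, old dumps can be deleted to free the disk
find /var/log -name 'mgmt_mgmt-*hprof' -delete
```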
... View more
03-28-2017
07:46 AM
When I run du -sh / I get:
du: cannot access ‘/proc/4982/task/4982/fd/4’: No such file or directory
du: cannot access ‘/proc/4982/task/4982/fdinfo/4’: No such file or directory
du: cannot access ‘/proc/4982/fd/4’: No such file or directory
du: cannot access ‘/proc/4982/fdinfo/4’: No such file or directory
34G /
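The /proc errors are harmless (those are transient kernel pseudo-files that disappear while du scans them). A sketch of narrowing down which top-level directory is actually using the 34G:

```bash
# Stay on the root filesystem only (-x), one level deep, human-readable, sorted by size
du -xh --max-depth=1 / 2>/dev/null | sort -h
```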
... View more
03-28-2017
06:46 AM
I installed Cloudera using the Path B installation on 4 machines (CentOS 7 VMs): 1 master and 3 slaves. After installation I got a clock synchronization error on every slave, which I resolved by running: systemctl start ntpd. After a few minutes I get an error on the master node and I can't display the Cloudera Manager page (master:7180), even though the cloudera-scm-server status is running. I noticed afterwards that the master node's hard drive is full. When I run df -h I get:
[root@master ~]# df -h
Filesystem Size Used Avail Use% Mounted on
/dev/mapper/centos-root 34G 34G 20K 100% /
devtmpfs 4.1G 0 4.1G 0% /dev
tmpfs 4.1G 0 4.1G 0% /dev/shm
tmpfs 4.1G 8.7M 4.1G 1% /run
tmpfs 4.1G 0 4.1G 0% /sys/fs/cgroup
/dev/sda1 497M 212M 286M 43% /boot
/dev/mapper/centos-home 17G 36M 17G 1% /home
tmpfs 833M 0 833M 0% /run/user/0
I thought that maybe the ntpd log is behind all that. If the / directory is full (Use% = 100%), the master can't display anything. Any help to resolve this and to avoid the master node's disk filling up again would be appreciated. This is the third time I am trying to install Cloudera and every time I have the same problem.
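Two shell commands worth running in this situation: make the time service persistent so the clock-sync warning does not come back after a reboot, and check the usual log locations for whatever is filling /. Paths are common defaults, not confirmed from this output:

```bash
# Keep ntpd enabled across reboots instead of starting it by hand each time
sudo systemctl enable ntpd
sudo systemctl start ntpd

# See which log directories are growing (Cloudera Manager logs live under /var/log by default)
sudo du -sh /var/log/* 2>/dev/null | sort -h | tail -n 10
```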
... View more
- Tags:
- Installation
02-15-2017
05:43 AM
So to resolve this issue, I need to remove openjdk?
... View more
02-14-2017
01:46 AM
Hi, I want to install Cloudera on three nodes (master, slave1, slave2), but I get an error in the log file. Any help please, and thanks in advance.
... View more
10-19-2016
07:13 AM
Dear all, I am working on setting up a new cluster (1 master node and 2 data nodes). I can run select * from sample_Table, but when I try to run select count(*) from sample_Table I get an error and the MR job fails. Below is a summary from the job history. Thanks in advance.
Log Type: syslog Log Upload Time: mer. oct. 19 09:58:27 -0400 2016 Log Length: 5571 Showing 4096 bytes of 5571 total. Click here for the full log.
WithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
2016-10-19 09:57:26,199 INFO [main] org.apache.hadoop.ipc.Client: Retrying connect to server: slave1/192.168.1.33:40912. Already tried 3 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
2016-10-19 09:57:27,200 INFO [main] org.apache.hadoop.ipc.Client: Retrying connect to server: slave1/192.168.1.33:40912. Already tried 4 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
2016-10-19 09:57:29,203 INFO [main] org.apache.hadoop.ipc.Client: Retrying connect to server: slave1/192.168.1.33:40912. Already tried 5 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
2016-10-19 09:57:30,205 INFO [main] org.apache.hadoop.ipc.Client: Retrying connect to server: slave1/192.168.1.33:40912. Already tried 6 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
2016-10-19 09:57:31,206 INFO [main] org.apache.hadoop.ipc.Client: Retrying connect to server: slave1/192.168.1.33:40912. Already tried 7 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
2016-10-19 09:57:32,208 INFO [main] org.apache.hadoop.ipc.Client: Retrying connect to server: slave1/192.168.1.33:40912. Already tried 8 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
2016-10-19 09:57:33,209 INFO [main] org.apache.hadoop.ipc.Client: Retrying connect to server: slave1/192.168.1.33:40912. Already tried 9 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
2016-10-19 09:57:33,213 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : java.net.NoRouteToHostException: No Route to Host from master1/192.168.1.30 to slave1:40912 failed on socket timeout exception: java.net.NoRouteToHostException: Aucun chemin d'accès pour atteindre l'hôte cible; For more details see: http://wiki.apache.org/hadoop/NoRouteToHost
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:791)
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:757)
at org.apache.hadoop.ipc.Client.call(Client.java:1475)
at org.apache.hadoop.ipc.Client.call(Client.java:1408)
at org.apache.hadoop.ipc.WritableRpcEngine$Invoker.invoke(WritableRpcEngine.java:243)
at com.sun.proxy.$Proxy9.getTask(Unknown Source)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:132)
Caused by: java.net.NoRouteToHostException: Aucun chemin d'accès pour atteindre l'hôte cible
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:530)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:494)
at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:614)
at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:713)
at org.apache.hadoop.ipc.Client$Connection.access$2900(Client.java:375)
at org.apache.hadoop.ipc.Client.getConnection(Client.java:1524)
at org.apache.hadoop.ipc.Client.call(Client.java:1447)
... 4 more
2016-10-19 09:57:33,214 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Stopping MapTask metrics system...
2016-10-19 09:57:33,215 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: MapTask metrics system stopped.
2016-10-19 09:57:33,215 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: MapTask metrics system shutdown complete.
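java.net.NoRouteToHostException between cluster hosts is very often a firewall rather than a DNS problem. A sketch of what checking that looks like on CentOS 7 (host names come from the log above; disabling the firewall is only appropriate on a test cluster):

```bash
# On slave1: is firewalld blocking the ephemeral YARN/MapReduce ports?
sudo systemctl status firewalld

# Quick check for a lab cluster: stop and disable it, then rerun the query
sudo systemctl stop firewalld
sudo systemctl disable firewalld

# From master1: verify basic reachability of the failing host
ping -c 3 slave1
```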
... View more
10-13-2016
08:16 AM
Hello everyone, I installed CDH 5.8.2 using the Cloudera Manager installer bin on a three-node VM cluster (master, slave1-2). I was able to bring up the HDFS service with the NameNode role instance and 2 DataNode role instances, but the NameNode sees the DataNodes as dead and shows 0 bytes available in the cluster: Healthy DataNode: 0. Concerning DataNode: 0. Total DataNode: 2. Percent healthy: 0.00%. Percent healthy or concerning: 0.00%. Critical threshold: 90.00%. When I look at the DataNode logs, I find the following error:
getting attribute VolumeInfo of Hadoop:service=DataNode,name=DataNodeInfo threw an exception
javax.management.RuntimeMBeanException: java.lang.NullPointerException: Storage not yet initialized
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.rethrow(DefaultMBeanServerInterceptor.java:839)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.rethrowMaybeMBeanException(DefaultMBeanServerInterceptor.java:852)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getAttribute(DefaultMBeanServerInterceptor.java:651)
at com.sun.jmx.mbeanserver.JmxMBeanServer.getAttribute(JmxMBeanServer.java:678)
at org.apache.hadoop.jmx.JMXJsonServlet.writeAttribute(JMXJsonServlet.java:346)
at org.apache.hadoop.jmx.JMXJsonServlet.listBeans(JMXJsonServlet.java:324)
at org.apache.hadoop.jmx.JMXJsonServlet.doGet(JMXJsonServlet.java:217)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)
at org.mortbay.jetty.servlet.ServletHolder.handle(ServletHolder.java:511)
at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1221)
at org.apache.hadoop.http.lib.StaticUserWebFilter$StaticUserFilter.doFilter(StaticUserWebFilter.java:109)
at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
at org.apache.hadoop.http.HttpServer2$QuotingInputFilter.doFilter(HttpServer2.java:1296)
at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
at org.apache.hadoop.http.NoCacheFilter.doFilter(NoCacheFilter.java:45)
at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
at org.apache.hadoop.http.NoCacheFilter.doFilter(NoCacheFilter.java:45)
at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
at org.mortbay.jetty.servlet.ServletHandler.handle(ServletHandler.java:399)
at org.mortbay.jetty.security.SecurityHandler.handle(SecurityHandler.java:216)
at org.mortbay.jetty.servlet.SessionHandler.handle(SessionHandler.java:182)
at org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:767)
at org.mortbay.jetty.webapp.WebAppContext.handle(WebAppContext.java:450)
at org.mortbay.jetty.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:230)
at org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:152)
at org.mortbay.jetty.Server.handle(Server.java:326)
at org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:542)
at org.mortbay.jetty.HttpConnection$RequestHandler.headerComplete(HttpConnection.java:928)
at org.mortbay.jetty.HttpParser.parseNext(HttpParser.java:549)
at org.mortbay.jetty.HttpParser.parseAvailable(HttpParser.java:212)
at org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:404)
at org.mortbay.io.nio.SelectChannelEndPoint.run(SelectChannelEndPoint.java:410)
at org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)
Caused by: java.lang.NullPointerException: Storage not yet initialized
at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
at org.apache.hadoop.hdfs.server.datanode.DataNode.getVolumeInfo(DataNode.java:2805)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at sun.reflect.misc.Trampoline.invoke(MethodUtil.java:75)
at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at sun.reflect.misc.MethodUtil.invoke(MethodUtil.java:279)
at com.sun.jmx.mbeanserver.ConvertingMethod.invokeWithOpenReturn(ConvertingMethod.java:193)
at com.sun.jmx.mbeanserver.ConvertingMethod.invokeWithOpenReturn(ConvertingMethod.java:175)
at com.sun.jmx.mbeanserver.MXBeanIntrospector.invokeM2(MXBeanIntrospector.java:117)
at com.sun.jmx.mbeanserver.MXBeanIntrospector.invokeM2(MXBeanIntrospector.java:54)
at com.sun.jmx.mbeanserver.MBeanIntrospector.invokeM(MBeanIntrospector.java:237)
at com.sun.jmx.mbeanserver.PerInterface.getAttribute(PerInterface.java:83)
at com.sun.jmx.mbeanserver.MBeanSupport.getAttribute(MBeanSupport.java:206)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getAttribute(DefaultMBeanServerInterceptor.java:647)
Any help is very much appreciated. I have been sitting with this issue for a long time now; it looks like I am missing something simple, but I couldn't figure it out. Thank you!
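"Storage not yet initialized" together with dead DataNodes usually means the DataNodes never managed to register with the NameNode. A shell sketch of two common checks; the /dfs/dn and /dfs/nn paths are Cloudera Manager defaults and may differ on your cluster:

```bash
# 1. Can the DataNode host reach the NameNode RPC port? (8020 is the usual default;
#    a host firewall is a frequent culprit on fresh CentOS 7 VMs)
sudo systemctl status firewalld
telnet master 8020    # any port-check tool works here

# 2. Do the DataNode and NameNode agree on the clusterID in their VERSION files?
cat /dfs/dn/current/VERSION    # on a DataNode host
cat /dfs/nn/current/VERSION    # on the NameNode host
```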
... View more
05-12-2016
03:10 AM
I am using Spark Streaming (the event count example) with Flume as the source. This is my config file:
FileFileAgent.sources = File
FileFileAgent.channels = MemChannel
FileFileAgent.sinks = spark
#configuring the souce
FileFileAgent.sources.File.type = spooldir
FileFileAgent.sources.File.spoolDir = /usr/lib/flume/spooldir
#FileFileAgent.sources.File.fileHeader = true
# Describing/Configuring the sink
FileAgent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
FileAgent.sinks.spark.hostname = 192.168.1.31
FileAgent.sinks.spark.port = 18088
FileAgent.sinks.spark.channel = MemoryChannel
# Describing/Configuring the channel
FileFileAgent.channels.MemChannel.type = memory
FileFileAgent.channels.MemChannel.capacity = 10000
FileFileAgent.channels.MemChannel.transactionCapacity = 100
# Binding the source and sink to the channel
FileFileAgent.sources.File.channels = MemChannel
When I run the command bin/flume-ng agent --conf conf --conf-file flumeSpark.conf --name FileAgent -Dflume.root.logger=INFO,console I get these warnings:
16/05/12 10:44:52 WARN conf.FlumeConfiguration: Agent configuration for 'FileAgent' does not contain any channels. Marking it as invalid.
16/05/12 10:44:52 WARN conf.FlumeConfiguration: Agent configuration invalid for agent 'FileAgent'. It will be removed.
16/05/12 10:44:52 WARN conf.FlumeConfiguration: no context for sinkspark
16/05/12 10:44:52 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [FileFileAgent]
16/05/12 10:44:52 WARN node.AbstractConfigurationProvider: No configuration found for this host:FileAgent
16/05/12 10:44:52 INFO node.Application: Starting new configuration:{ sourceRunners:{} sinkRunners:{} channels:{} }
And when I execute my Spark application:
val conf = new SparkConf()
.setAppName("File Count")
.setMaster("local[2]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
val flumeStream = FlumeUtils.createPollingStream(ssc, "192.168.1.31", 18088)
flumeStream.count().map(cnt => "Received " + cnt + " flume events." ).print()
ssc.start()
ssc.awaitTermination()
I get this warning:
16/05/12 10:46:59 WARN FlumeBatchFetcher: Error while receiving data from Flume
java.io.IOException: NettyTransceiver closed
at org.apache.avro.ipc.NettyTransceiver.disconnect(NettyTransceiver.java:363)
at org.apache.avro.ipc.NettyTransceiver.access$200(NettyTransceiver.java:61)
at org.apache.avro.ipc.NettyTransceiver$NettyClientAvroHandler.handleUpstream(NettyTransceiver.java:580)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
at org.jboss.netty.handler.codec.frame.FrameDecoder.cleanup(FrameDecoder.java:493)
at org.jboss.netty.handler.codec.frame.FrameDecoder.channelClosed(FrameDecoder.java:371)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:88)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
at org.jboss.netty.channel.Channels.fireChannelClosed(Channels.java:468)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:375)
at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:58)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:779)
at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54)
at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591)
at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:582)
at org.jboss.netty.channel.Channels.close(Channels.java:812)
at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:197)
at org.apache.avro.ipc.NettyTransportCodec$NettyFrameDecoder.decodePackHeader(NettyTransportCodec.java:166)
at org.apache.avro.ipc.NettyTransportCodec$NettyFrameDecoder.decode(NettyTransportCodec.java:139)
at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:425)
at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Any help please: how can I run my application? I don't know how to resolve this problem.
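For what it's worth, the pasted warnings line up with a naming mismatch in the config above: the properties define an agent called FileFileAgent while flume-ng is started with --name FileAgent, and the sink binds to a channel called MemoryChannel while the channel is defined as MemChannel. A minimal consistent sketch of the same config (names kept from the post, chosen only for illustration):

```properties
# One agent name used everywhere, and the sink bound to the same channel it defines
FileAgent.sources = File
FileAgent.channels = MemChannel
FileAgent.sinks = spark

FileAgent.sources.File.type = spooldir
FileAgent.sources.File.spoolDir = /usr/lib/flume/spooldir
FileAgent.sources.File.channels = MemChannel

FileAgent.channels.MemChannel.type = memory
FileAgent.channels.MemChannel.capacity = 10000
FileAgent.channels.MemChannel.transactionCapacity = 100

FileAgent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
FileAgent.sinks.spark.hostname = 192.168.1.31
FileAgent.sinks.spark.port = 18088
FileAgent.sinks.spark.channel = MemChannel
```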
... View more
05-11-2016
03:56 AM
Yes it's working now, thanks for your help.
... View more
05-11-2016
03:15 AM
Thanks, but I have another error with the sink machine hostname. When I do FlumeUtils.createPollingStream(ssc,198.168.1.31,8020) the error is: overloaded method value createPollingStream with alternatives. The official site shows: val flumeStream = FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port]). How should I pass the sink machine hostname?
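The overload error comes from passing the host as a bare number: createPollingStream expects the hostname as a String and the port as an Int. A minimal sketch (the IP and port are the ones that appear elsewhere in this thread, adjust to your SparkSink's actual bind address):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

val conf = new SparkConf().setAppName("File Count").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10))

// hostname is a String, port an Int — both must match the Flume SparkSink settings
val flumeStream = FlumeUtils.createPollingStream(ssc, "192.168.1.31", 18088)
```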
... View more
05-11-2016
01:20 AM
Hi, I'm using Spark Streaming to analyse data arriving from Flume, but I get an error with FlumeUtils: not found: value FlumeUtils. This is my code:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume._
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{ Seconds, StreamingContext }
object WordCount {
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("File Count")
.setMaster("local[2]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
val flumeStream = FlumeUtils.createPollingStream(ssc,198.168.1.31,8020) // not found value : FlumeUtils
............
ssc.start()
ssc.awaitTermination()
}
}
And these are my pom.xml dependencies:
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.4</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume-sink_2.10</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.3.2</version>
</dependency>
Thanks in advance for your reply!
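A note on the dependencies above: FlumeUtils.createPollingStream lives in the spark-streaming-flume artifact, not only in spark-streaming-flume-sink, so a sketch of the extra dependency that appears to be missing, with the version aligned to the Spark version used elsewhere in the pom:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-flume_2.10</artifactId>
  <version>1.6.1</version>
</dependency>
```

Keeping spark-streaming and spark-core on the same version (1.6.1 here) also avoids the kind of runtime NoSuchMethodError mismatch mentioned in an earlier thread on this page.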
... View more
04-25-2016
05:45 AM
I have found the solution:
var addedRDD : org.apache.spark.rdd.RDD[(String,Int)] = sc.emptyRDD
... View more
04-25-2016
04:53 AM
I want to take the union of the RDDs that I receive in streaming. This is my code:
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
val file = ssc.textFileStream("hdfs://192.168.1.20:8020/user/sparkStreaming/input")
var test = file.map(x => (x.split(";")(0)+";"+x.split(";")(1), 1)).reduceByKey((x,y) => x+y)
var addedRDD = sc.emptyRDD
test.foreachRDD{ rdd =>
addedRDD = addedRDD union rdd
addedRDD.cache()
}
But I get this error: type mismatch; found: org.apache.spark.rdd.RDD[(String, Int)] required: org.apache.spark.rdd.RDD[Nothing]. And when I try to create an empty RDD with a given type, I get this error: type mismatch; found: org.apache.spark.rdd.RDD[(String, Int)] required: org.apache.spark.rdd.EmptyRDD[(String, Int)]. How can I fix this problem? Thanks in advance!
... View more
04-14-2016
07:50 AM
I'm new to Spark. I want to process files in streaming; CSV files arrive continuously. Example CSV file:
world world
count world
world earth
count world
I want to apply two treatments to them. The first treatment should produce a result like this:
(world,2,2) // "world" appears twice in the first column, with distinct values (world, earth) in the second, hence (2,2)
(count,2,1) // "count" appears twice in the first column, with non-distinct values (world, world) in the second, hence (2,1)
For the second treatment, I want to get this result once every hour; in our example:
(world,1) // 1 = 2/2
(count,2) // 2 = 2/1
This is my code:
val conf = new SparkConf()
.setAppName("File Count")
.setMaster("local[2]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10m))
val file = ssc.textFileStream("hdfs://192.168.1.31:8020/user/sparkStreaming/input")
var result = file.map(x => (x.split(" ")(0)+";"+x.split(" ")(1), 1)).reduceByKey((x,y) => x+y)
val window = result.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(60), Seconds(20))
val result1 = window.map(x => x.toString )
val result2 = result1.map(line => line.split(";")(0)+","+line.split(",")(1))
val result3 = result2.map(line => line.substring(1, line.length-1))
val result4 = result3.map(line => (line.split(",")(0),line.split(",")(1).toInt ) )
val result5 = result4.reduceByKey((x,y) => x+y )
val result6 = result3.map(line => (line.split(",")(0), 1 ))
val result7 = result6.reduceByKey((x,y) => x+y )
val result8 = result7.join(result5) // (world,2,2)
val finalResult = result8.mapValues(x => x._1.toFloat / x._2 ) // (world,1), I want this result after every one hour
ssc.start()
ssc.awaitTermination()
Thanks in advance!
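For the "result after each hour" part, a minimal sketch of a one-hour window using the same reduceByKeyAndWindow pattern as the code above (only the durations change; whether the window should also slide hourly is an assumption):

```scala
import org.apache.spark.streaming.Minutes

// Aggregate counts over the last hour, emitted once per hour
// ("result" is the keyed DStream built earlier in the post)
val hourly = result.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Minutes(60), Minutes(60))
hourly.print()
```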
... View more
04-12-2016
07:26 AM
Hi, I have a simple Spark Streaming job that does some processing on CSV files:
val conf = new SparkConf()
.setAppName("File Count")
.setMaster("local[2]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
val file = ssc.textFileStream("hdfs://192.168.1.31:8020/user/sparkStreaming/input")
val test = file.map(x => (x.split(" ")(0)+";"+x.split(" ")(1), 1)).reduceByKey((x,y) => x+y)
val windowed = test.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(60), Seconds(20))
windowed.foreachRDD(rdd=>{
rdd.saveAsTextFile("hdfs://192.168.1.31:8020/user/sparkStreaming/output")
})
ssc.start()
ssc.awaitTermination()
}
}
In the output directory I only have the result of the last file processed. I want to keep the results of all the files, not only the last one.
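One way to keep every batch's output instead of overwriting the same directory is to include the batch time in the output path. A sketch using the (RDD, Time) variant of foreachRDD, with the path prefix taken from the post:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

// Each batch is written to its own directory, e.g. .../output-1460000000000
windowed.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
  if (!rdd.isEmpty()) {
    rdd.saveAsTextFile(s"hdfs://192.168.1.31:8020/user/sparkStreaming/output-${time.milliseconds}")
  }
}
```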
... View more
04-07-2016
04:13 AM
I don't know why, but I re-ran it and it works. However, I have an empty _SUCCESS file in the directory file1. Here is the complete code:
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("File Count")
.setMaster("local[2]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))
val file = ssc.textFileStream("/root/file/test/file")
file.foreachRDD(t=> {
val test = t.map(x => (x.split(" ")(0)+";"+x.split(" ")(1), 1)).reduceByKey((x,y) => x+y)
test.saveAsTextFile("/root/file/file1")
})
ssc.start()
ssc.awaitTermination()
}
... View more
04-07-2016
03:34 AM
16/04/06 14:09:52 INFO FileInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@4bf57335
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.util.ThreadUtils$.runInNewThread$default$2()Z
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:606)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
at com.org.file.filecount.FileCount$.main(FileCount.scala:52)
at com.org.file.filecount.FileCount.main(FileCount.scala)
There was a mismatch between the dependency versions and the runtime, so I changed the dependencies to:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.1</version>
</dependency>
And now I am getting an error like the following:
16/04/07 11:23:56 WARN FileInputDStream: Error finding new files
java.io.IOException: Incomplete HDFS URI, no host: "/root/file/test"
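The second error means an hdfs:// scheme is being resolved without a NameNode host. A sketch of the two forms that work (the host and port are placeholders taken from earlier posts in this thread, not verified):

```scala
// Fully-qualified HDFS path: scheme + NameNode host + port + path
val fromHdfs = ssc.textFileStream("hdfs://192.168.1.31:8020/root/file/test")

// Or a plain path, which resolves against the default filesystem from core-site.xml
val fromDefaultFs = ssc.textFileStream("/root/file/test")
```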
... View more
04-06-2016
06:20 AM
I tried it, and I get:
16/04/06 14:09:52 INFO FileInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@4bf57335
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.util.ThreadUtils$.runInNewThread$default$2()Z
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:606)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
at com.org.file.filecount.FileCount$.main(FileCount.scala:52)
at com.org.file.filecount.FileCount.main(FileCount.scala)
... View more
04-06-2016
04:59 AM
This is my code:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.scheduler.SparkListenerStageCompleted
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}
object FileCount {
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("File Count")
.setMaster("local")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
val file = ssc.textFileStream("/root/file/test/f3")
file.foreachRDD(t=> {
val test = t.map(x => (x.split(" ")(0)+";"+x.split(" ")(1), 1)).reduceByKey((x,y) => x+y)
test.saveAsTextFile("/root/file/file1")
})
sc.stop()
}
}
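A likely reason nothing happens with this version: the streaming context is never started — the code calls sc.stop() right away. A sketch of the ending a streaming job needs, following the same structure as the working example that appears earlier on this page:

```scala
// Instead of sc.stop():
ssc.start()            // begin receiving files and processing batches
ssc.awaitTermination() // block until the streaming job is stopped externally
```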
... View more
04-06-2016
04:51 AM
It does not work; what is the problem? Here is my console output:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/04/06 12:44:42 INFO SparkContext: Running Spark version 1.5.0
16/04/06 12:44:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/04/06 12:44:48 INFO SecurityManager: Changing view acls to: root
16/04/06 12:44:48 INFO SecurityManager: Changing modify acls to: root
16/04/06 12:44:48 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
16/04/06 12:45:00 INFO Slf4jLogger: Slf4jLogger started
16/04/06 12:45:00 INFO Remoting: Starting remoting
16/04/06 12:45:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.31:38825]
16/04/06 12:45:04 INFO Utils: Successfully started service 'sparkDriver' on port 38825.
16/04/06 12:45:04 INFO SparkEnv: Registering MapOutputTracker
16/04/06 12:45:04 INFO SparkEnv: Registering BlockManagerMaster
16/04/06 12:45:05 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-1b896884-d84a-4c39-b9dd-93decdb6ee0b
16/04/06 12:45:05 INFO MemoryStore: MemoryStore started with capacity 1027.3 MB
16/04/06 12:45:06 INFO HttpFileServer: HTTP File server directory is /tmp/spark-14a1c553-e160-4b93-8822-3b943e27edd1/httpd-849fa48d-e2de-46de-845a-a68a02f76b94
16/04/06 12:45:06 INFO HttpServer: Starting HTTP Server
16/04/06 12:45:08 INFO Utils: Successfully started service 'HTTP file server' on port 50992.
16/04/06 12:45:08 INFO SparkEnv: Registering OutputCommitCoordinator
16/04/06 12:45:11 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/04/06 12:45:11 INFO SparkUI: Started SparkUI at http://192.168.1.31:4040
16/04/06 12:45:12 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
16/04/06 12:45:12 INFO Executor: Starting executor ID driver on host localhost
16/04/06 12:45:15 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 42498.
16/04/06 12:45:15 INFO NettyBlockTransferService: Server created on 42498
16/04/06 12:45:15 INFO BlockManagerMaster: Trying to register BlockManager
16/04/06 12:45:15 INFO BlockManagerMasterEndpoint: Registering block manager localhost:42498 with 1027.3 MB RAM, BlockManagerId(driver, localhost, 42498)
16/04/06 12:45:15 INFO BlockManagerMaster: Registered BlockManager
16/04/06 12:45:18 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
16/04/06 12:45:22 INFO FileInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@11fb9657
16/04/06 12:45:23 INFO SparkUI: Stopped Spark web UI at http://192.168.1.31:4040
16/04/06 12:45:23 INFO DAGScheduler: Stopping DAGScheduler
16/04/06 12:45:23 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/04/06 12:45:23 INFO MemoryStore: MemoryStore cleared
16/04/06 12:45:23 INFO BlockManager: BlockManager stopped
16/04/06 12:45:23 INFO BlockManagerMaster: BlockManagerMaster stopped
16/04/06 12:45:23 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/04/06 12:45:23 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/04/06 12:45:23 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/04/06 12:45:23 INFO SparkContext: Successfully stopped SparkContext
16/04/06 12:45:23 INFO ShutdownHookManager: Shutdown hook called
16/04/06 12:45:23 INFO ShutdownHookManager: Deleting directory /tmp/spark-14a1c553-e160-4b93-8822-3b943e27edd1
The file is never created; nothing happens. What's wrong?
... View more
04-06-2016
01:22 AM
I tried:
val file = ssc.textFileStream("/root/file/test")
file.foreachRDD(t=> {
var test = file.map(x => (x.split(" ")(0)+";"+x.split(" ")(1), 1)).reduceByKey((x,y) => x+y)
test.saveAsTextFiles("/root/file/file1")
})
sc.stop()
But it doesn't work.
... View more
04-05-2016
08:26 AM
Hi, it is simple to display the result of an RDD, for example:
val sc = new SparkContext(conf)
val textFile = sc.textFile("/root/file/test")
val apps = textFile.map (line => line.split(";")(0))
.map(p=>(p,1)) // convert to countable tuples
.reduceByKey(_+_) // count keys
.collect() // collect the result
apps.foreach(println)
And I get the result in my console. If I want to save the output to a file, I do: apps.saveAsTextFiles("/root/file/file1"). But how can I do this now with a DStream? This is my code:
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
val file = ssc.textFileStream("/root/file/test")
var test = file.map(x => (x.split(" ")(0)+";"+x.split(" ")(1), 1)).reduceByKey((x,y) => x+y)
test.saveAsTextFiles("/root/file/file1")
sc.stop()
}
}
But it doesn't work. Any help please!
... View more
03-08-2016
08:18 AM
I am trying to display the characteristics of CSV files (file name, MD5). I don't know how to do it; is it even possible? Any help please.
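Computing an MD5 per file is straightforward with the JDK. A minimal local-filesystem sketch (the directory path is an assumption; it reads whole files into memory, so it only suits small CSVs — HDFS files would need the Hadoop FileSystem API instead):

```scala
import java.io.File
import java.nio.file.Files
import java.security.MessageDigest

// Turn a byte array into a lowercase hex MD5 string
def md5Hex(bytes: Array[Byte]): String =
  MessageDigest.getInstance("MD5").digest(bytes).map("%02x".format(_)).mkString

// Print "<file name> <md5>" for every .csv file in a local directory
val dir = new File("/root/file/test")   // hypothetical path, adjust as needed
for (f <- dir.listFiles() if f.getName.endsWith(".csv"))
  println(s"${f.getName} ${md5Hex(Files.readAllBytes(f.toPath))}")
```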
... View more