Member since
12-30-2015
6
Posts
0
Kudos Received
2
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
1923 | 05-30-2016 10:39 PM | |
13615 | 05-27-2016 05:12 AM |
05-30-2016
10:39 PM
Hi, I have the solution. Please check my post in stackoverflow: http://stackoverflow.com/questions/37466361/how-to-combine-two-dstreams-using-pyspark-similar-to-zip-on-normal-rdd/37537555#37537555 Thanks, Obaid
... View more
05-27-2016
05:19 AM
Hi, I need help on Dstream operation. In fact, I am using a MLlib randomforest model to predict using spark streaming. In the end, I want to combine the feature Dstream & prediction Dstream together for further downstream processing. I am predicting using below piece of code: predictions = texts.map( lambda x : getFeatures(x) ).map(lambda x : x.split(',')).map( lambda parts : [float(i) for i in parts] ).transform(lambda rdd: rf_model.predict(rdd)) Here texts is dstream having single line of text as records getFeatures generates a comma separated features extracted from each record I want the output as below tuple: ("predicted value", "original text") How can I achieve that ? or at least can I perform .zip like normal RDD operation on two Dstreams, like below: output = texts.zip(predictions) Note: I posted the same problem on spark user mailing list. Thanks, Obaid
... View more
Labels:
- Labels:
-
Apache Spark
05-27-2016
05:12 AM
Hi, Sorry guys for the reply whitch is too late. Anyways, I tried with different combinition(memory/disk channel etc.) and found flume is either failing of too slow to load larger files (more that 1G). So, I conclude that flume is not good for lage files. Instead, I am now using HDFS NFS gateways to dump file directly to HDFS using scp. Belive me, correctly configured NFS GW and NFS mount point are really cool old boys. Thanks, Obaid
... View more
12-31-2015
05:17 AM
Hi Tarek, Thanks again for your time. I tried with both, if i set to 0 it shows "batchSize must be greater than 0". If i set to a bigger value, then writing never starts and this will never gurantee me a full file(as bigger batchSize will always combine with other files). So, I think altering batchSize is not a viable idea or I failed to understand your point. Please correct me if I am wrong. -Obaid
... View more
12-31-2015
03:02 AM
Hi Tarek, Thanks for your suggestion. Unfortunately ther is no change. It is behaving exactly same. I think channel is not chocking. If channel choking there should be explicit error or warning message on flume log. However, in my case there is no indication of error or warning on log. I also checked on Cloudera Manager, the channel was less than 5% utilized. Please find below portion of flume log while I tested the scenerio: 2015-12-31 18:32:06,917 INFO org.mortbay.log: jetty-6.1.26.cloudera.4
2015-12-31 18:32:06,941 INFO org.mortbay.log: Started SelectChannelConnector@0.0.0.0:41414
2015-12-31 18:33:09,056 INFO org.apache.flume.sink.hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
2015-12-31 18:33:09,238 INFO org.apache.flume.sink.hdfs.BucketWriter: Creating hdfs://nameservice1/user/etl/temp/spool/bigfile03_2.csv-.1451557989057.tmp
2015-12-31 18:34:59,768 INFO org.apache.flume.node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
2015-12-31 18:34:59,783 INFO org.apache.flume.node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/var/run/cloudera-scm-agent/process/29180-flume-AGENT/flume.conf
2015-12-31 18:34:59,788 INFO org.apache.flume.conf.FlumeConfiguration: Processing:sink_to_hdfs1
2015-12-31 18:34:59,788 INFO org.apache.flume.conf.FlumeConfiguration: Processing:sink_to_hdfs1
2015-12-31 18:34:59,788 INFO org.apache.flume.conf.FlumeConfiguration: Processing:sink_to_hdfs1
2015-12-31 18:34:59,788 INFO org.apache.flume.conf.FlumeConfiguration: Processing:sink_to_hdfs1
2015-12-31 18:34:59,788 INFO org.apache.flume.conf.FlumeConfiguration: Processing:sink_to_hdfs1
2015-12-31 18:34:59,788 INFO org.apache.flume.conf.FlumeConfiguration: Added sinks: sink_to_hdfs1 Agent: spoolDir
2015-12-31 18:34:59,788 INFO org.apache.flume.conf.FlumeConfiguration: Processing:sink_to_hdfs1
2015-12-31 18:34:59,789 INFO org.apache.flume.conf.FlumeConfiguration: Processing:sink_to_hdfs1
2015-12-31 18:34:59,789 INFO org.apache.flume.conf.FlumeConfiguration: Processing:sink_to_hdfs1
2015-12-31 18:34:59,789 INFO org.apache.flume.conf.FlumeConfiguration: Processing:sink_to_hdfs1
2015-12-31 18:34:59,789 INFO org.apache.flume.conf.FlumeConfiguration: Processing:sink_to_hdfs1
2015-12-31 18:34:59,789 INFO org.apache.flume.conf.FlumeConfiguration: Processing:sink_to_hdfs1
2015-12-31 18:34:59,806 INFO org.apache.flume.conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [spoolDir]
2015-12-31 18:34:59,806 INFO org.apache.flume.node.AbstractConfigurationProvider: Creating channels
2015-12-31 18:34:59,812 INFO org.apache.flume.channel.DefaultChannelFactory: Creating instance of channel channel-1 type memory
2015-12-31 18:34:59,816 INFO org.apache.flume.node.AbstractConfigurationProvider: Created channel channel-1
2015-12-31 18:34:59,817 INFO org.apache.flume.source.DefaultSourceFactory: Creating instance of source src-1, type spooldir
2015-12-31 18:34:59,826 INFO org.apache.flume.sink.DefaultSinkFactory: Creating instance of sink: sink_to_hdfs1, type: hdfs
2015-12-31 18:34:59,835 INFO org.apache.flume.node.AbstractConfigurationProvider: Channel channel-1 connected to [src-1, sink_to_hdfs1]
2015-12-31 18:34:59,843 INFO org.apache.flume.node.Application: Starting new configuration:{ sourceRunners:{src-1=EventDrivenSourceRunner: { source:Spool Directory source src-1: { spoolDir: /stage/AIU/ETL/temp/spool } }} sinkRunners:{sink_to_hdfs1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2195b77d counterGroup:{ name:null counters:{} } }} channels:{channel-1=org.apache.flume.channel.MemoryChannel{name: channel-1}} }
2015-12-31 18:34:59,853 INFO org.apache.flume.node.Application: Starting Channel channel-1
2015-12-31 18:34:59,903 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: channel-1: Successfully registered new MBean.
2015-12-31 18:34:59,903 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: channel-1 started
2015-12-31 18:34:59,903 INFO org.apache.flume.node.Application: Starting Sink sink_to_hdfs1
2015-12-31 18:34:59,903 INFO org.apache.flume.node.Application: Starting Source src-1
2015-12-31 18:34:59,904 INFO org.apache.flume.source.SpoolDirectorySource: SpoolDirectorySource source starting with directory: /stage/AIU/ETL/temp/spool
2015-12-31 18:34:59,905 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: sink_to_hdfs1: Successfully registered new MBean.
2015-12-31 18:34:59,905 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Component type: SINK, name: sink_to_hdfs1 started
2015-12-31 18:34:59,928 INFO org.mortbay.log: Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
2015-12-31 18:34:59,929 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: src-1: Successfully registered new MBean.
2015-12-31 18:34:59,930 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: src-1 started
2015-12-31 18:34:59,962 INFO org.mortbay.log: jetty-6.1.26.cloudera.4
2015-12-31 18:34:59,995 INFO org.mortbay.log: Started SelectChannelConnector@0.0.0.0:41414
2015-12-31 18:35:00,614 INFO org.apache.flume.sink.hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
2015-12-31 18:35:00,875 INFO org.apache.flume.sink.hdfs.BucketWriter: Creating hdfs://nameservice1/user/etl/temp/spool/bigfile03_2.csv-.1451558100615.tmp
2015-12-31 18:35:06,132 INFO org.apache.flume.client.avro.ReliableSpoolingFileEventReader: Preparing to move file /stage/AIU/ETL/temp/spool/bigfile03_2.csv to /stage/AIU/ETL/temp/spool/bigfile03_2.csv.COMPLETED
2015-12-31 18:36:42,947 INFO org.apache.flume.sink.hdfs.BucketWriter: Closing idle bucketWriter hdfs://nameservice1/user/etl/temp/spool/bigfile03_2.csv-.1451558100615.tmp at 1451558202947
2015-12-31 18:36:42,947 INFO org.apache.flume.sink.hdfs.BucketWriter: Closing hdfs://nameservice1/user/etl/temp/spool/bigfile03_2.csv-.1451558100615.tmp
2015-12-31 18:36:42,975 INFO org.apache.flume.sink.hdfs.BucketWriter: Renaming hdfs://nameservice1/user/etl/temp/spool/bigfile03_2.csv-.1451558100615.tmp to hdfs://nameservice1/user/etl/temp/spool/bigfile03_2.csv-.1451558100615
2015-12-31 18:36:42,987 INFO org.apache.flume.sink.hdfs.HDFSEventSink: Writer callback called. "2015-12-31 18:32:06,917" and "2015-12-31 18:35:00,875". After that at "2015-12-31 18:36:42,975 " it is closing the current file, although the file has not fully written. The file with 1st attempt is 0-2MB in size and never renamed(kept .tmp). The file with 2nd attempt was renamed (removed .tmp) and it was 50MB in size. The source file was 100MB in size. Thanks, Obaid
... View more
12-30-2015
08:01 PM
Hi, I am trying to ingest using flume spooling directory to HDFS(SpoolDir > Memory Channel > HDFS). I am using CDH 5.4.2. It works well with smaller files, however it fails with larger files. Please find below my testing scenerio: 1. files with size Kbytes to 50-60MBytes, processed without issue. 2. files with greater than 50-60MB, it writes around 50MB to HDFS then I found flume agent unexpected exit. 3. There are no error message on flume log. I found that it is trying to create the ".tmp" file (HDFS) several times, and each time writes couple of megabytes (some time 2MB, some time 45MB ) before unexpected exit. After some time, the last tried ".tmp" file renamed as completed(".tmp" removed) and the file in source spoolDir also renamed as ".COMPLETED" although full file is not written to HDFS. In real scenerio, our files will be around 2GB in size. So, need some robust flume configuration to handle workload. Note: 1. Flume agent node is part of hadoop cluster and not a datanode (it is an edge node). 2. Spool directory is local filesystem on the same server running flume agent. 3. All are physical sever (not virtual). 4. In the same cluster, we have twitter datafeeding with flume running fine(although very small about of data). 5. Please find below flume.conf file I am using here: #############start flume.conf#################### spoolDir.sources = src-1 spoolDir.channels = channel-1 spoolDir.sinks = sink_to_hdfs1 spoolDir.sources.src-1.type = spooldir spoolDir.sources.src-1.channels = channel-1 spoolDir.sources.src-1.spoolDir = /stage/ETL/spool/ spoolDir.sources.src-1.fileHeader = true spoolDir.sources.src-1.basenameHeader =true spoolDir.sources.src-1.batchSize = 100000 spoolDir.channels.channel-1.type = memory spoolDir.channels.channel-1.transactionCapacity = 50000000 spoolDir.channels.channel-1.capacity = 60000000 spoolDir.channels.channel-1.byteCapacityBufferPercentage = 20 spoolDir.channels.channel-1.byteCapacity = 6442450944 spoolDir.sinks.sink_to_hdfs1.type = hdfs spoolDir.sinks.sink_to_hdfs1.channel = channel-1 spoolDir.sinks.sink_to_hdfs1.hdfs.fileType = DataStream spoolDir.sinks.sink_to_hdfs1.hdfs.path = hdfs://nameservice1/user/etl/temp/spool spoolDir.sinks.sink_to_hdfs1.hdfs.filePrefix = %{basename}- spoolDir.sinks.sink_to_hdfs1.hdfs.batchSize = 100000 spoolDir.sinks.sink_to_hdfs1.hdfs.rollInterval = 0 spoolDir.sinks.sink_to_hdfs1.hdfs.rollSize = 0 spoolDir.sinks.sink_to_hdfs1.hdfs.rollCount = 0 spoolDir.sinks.sink_to_hdfs1.hdfs.idleTimeout = 60 #############end flume.conf#################### Kindly suggest me whether there is any issue with my configuration or am I missing something. Or is it a known issue that Flume SpoolDir cannot handle with bigger files. Regards, Obaid
... View more
Labels: