Member since: 08-18-2017
Posts: 6
Kudos Received: 1
Solutions: 1

My Accepted Solutions
Title | Views | Posted |
---|---|---|
 | 1856 | 08-18-2017 06:24 PM |
11-29-2017 06:19 PM
@nkumar I'm interested in changing the default filesystem for the entire HDP cluster to NFS. Can you please share how you did this?
09-10-2017 09:23 PM
@Geoffrey Shelton Okot you're absolutely right! Thank you very much. I was struggling to understand why I was losing containers the first time I used "/fast_nfs/yarn/local" (I was getting "file not found" error messages), and then tiredness made me think of using hdfs:/// instead, and I ended up asking this question. My initial problem was caused by the fact that I was using the same NFS filesystem and path for all the nodes. After mounting /fast_nfs/nodeXXX/ on each node (where nodeXXX contains the yarn/local subdirectory) and adjusting permissions, it worked perfectly. Note that node1 gets /fast_nfs/node001 mounted as /scratch, node2 gets /fast_nfs/node002 mounted as /scratch, and so on, so I'm using /fast_nfs as the NFS filesystem and mounting a subdirectory for each node, and I defined yarn.nodemanager.local-dirs as "/scratch/yarn/local".
09-10-2017 01:24 AM
Hi, I'm trying to change "yarn.nodemanager.local-dirs" to point to "file:///fast_nfs/yarn/local". This is a high-performance NFS mount point that all the nodes in my cluster have. When I try to change it in Ambari I can't: the message "Must be a slash or drive at the start, and must not contain white spaces" is displayed. If I manually change /etc/hadoop/conf/yarn-site.xml on all the nodes, the "file:///" prefix is removed from that option after restarting YARN. I want all the shuffle to happen on my high-performance NFS array instead of in HDFS. How can I change this behaviour in HDP?
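As a side note (and not the Ambari-level fix I'm after), for Spark jobs specifically the scratch space can also be pointed at the NFS mount via spark.local.dir; a minimal sketch, with a made-up application name, is below. As far as I know this setting is ignored when running on YARN, where yarn.nodemanager.local-dirs takes precedence, which is exactly why I need the change above.

```python
from pyspark.sql import SparkSession

# Sketch only: spark.local.dir sets Spark's scratch space (shuffle output, disk spill)
# outside YARN; on YARN it is overridden by the NodeManager's yarn.nodemanager.local-dirs.
spark = (SparkSession.builder
         .appName("shuffle-on-nfs")
         .config("spark.local.dir", "/fast_nfs/yarn/local")
         .getOrCreate())
```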
Labels:
- Apache Spark
- Apache YARN
09-01-2017 04:37 PM
Since it doesn't seem you can easily transform a dataframe into an RDD in Spark's Structured Streaming, I found a way to manipulate the dataset to fit my needs: I used the split function from the pyspark.sql.functions module to split the contents of the dataframe's column (a string containing the independent variables for my ML model) into several new columns, and then I used the VectorAssembler class from pyspark.ml.feature to merge the new columns into a single vector column.
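For reference, a minimal sketch of that approach (the column names and the number of fields are made up here; "value" is the Kafka message already cast to a string of space-separated numbers):

```python
from pyspark.sql.functions import split, col
from pyspark.ml.feature import VectorAssembler

# Split the space-separated string into an array column, then pull out each element
# as its own double column (four fields is just an example).
parts = df.select(split(col("value"), " ").alias("fields"))
numeric = parts.select(*[col("fields").getItem(i).cast("double").alias("x" + str(i))
                         for i in range(4)])

# Merge the numeric columns into a single vector column for the ML model.
assembler = VectorAssembler(inputCols=["x" + str(i) for i in range(4)], outputCol="features")
features = assembler.transform(numeric)
```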
08-30-2017 11:10 PM
Hi, I need to consume messages from a Kafka topic which are just lines of values separated by spaces, like "0.00 2.000 10.000 37.000 ...". So I use readStream as follows:

stream_df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "myhost1:6667,myhost2:6667") \
  .option("subscribe", "pruebas") \
  .option("startingOffsets", "earliest") \
  .load()

df = stream_df.selectExpr("CAST(value AS STRING)")

Then I need to manipulate this single column in the dataframe so I can transform its contents into a dense Vector for a Machine Learning model, so I use the following:

rdd = df.rdd.map(list).map(lambda lista: lista[0].split())

But I get the following:

>>> rdd = df.rdd.map(list).map(lambda lista: lista[0].split())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py", line 84, in rdd
    jrdd = self._jdf.javaToPython()
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'Queries with streaming sources must be executed with writeStream.start();;\nkafka'

How do I do that? I have tried this, but it does not work:

query2 = df.rdd.map(list).map(lambda lista: lista[0].split()) \
  .writeStream \
  .outputMode('append') \
  .format("console") \
  .start()

Any idea on how to manipulate the contents of a dataframe's column in a better way under Structured Streaming? Any help is highly appreciated.
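(In the end I solved this by staying in the DataFrame API instead of using .rdd, as described in my later post above; a rough sketch of that direction, with the split expressed as a DataFrame operation and the result sent to the console sink, looks like this:)

```python
from pyspark.sql.functions import split, col

# Rough sketch: the split is expressed as a DataFrame transformation, which keeps the
# query streamable, and the result is written out with writeStream instead of .rdd.
splitted = df.select(split(col("value"), " ").alias("fields"))

query = (splitted.writeStream
         .outputMode("append")
         .format("console")
         .start())
```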
Labels:
- Apache Kafka
08-18-2017 06:24 PM
1 Kudo
That should be doable. I have been performing some tests with a high-performance enterprise NFSv3 storage array and Spark, and it worked like a charm. I still kept an HDFS filesystem for logs and historical data (as a kind of tier 2) and used the high-performance NFS storage for the tier-1 datasets that needed more performance and lower response times. Ironically, I found that this NFS storage solution performed similarly to or slightly better than HDFS for massive reads, but it clearly outperformed HDFS for writes, especially when the jobs had a lot of shuffle and spill to disk.

The key to using an external, high-performance NFS storage is to make sure all the nodes in the cluster have a persistent mount to the NFS filesystem and all of them use the same mountpoint. When you submit your Spark jobs, you simply use "file:///" paths instead, for example "file:///mnt_bigdata/datasets/x".

The big questions here are: (1) Does Hortonworks support this? (2) Is there any kind of generic NFS integration/deployment/best-practices guide? (3) Is there a procedure to completely move the cluster services' and resources' file dependencies out of HDFS and onto NFS?
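To make the "file:///" part concrete, here is a minimal PySpark sketch; the path is the example above, and the Parquet format is only an assumption about how the dataset is stored:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("nfs-read").getOrCreate()

# Read straight from the shared NFS mount instead of HDFS. This only works if every
# node in the cluster has /mnt_bigdata mounted at exactly the same path.
df = spark.read.parquet("file:///mnt_bigdata/datasets/x")
print(df.count())
```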