Member since: 05-02-2016
Posts: 74
Kudos Received: 41
Solutions: 14
My Accepted Solutions
Views | Posted
---|---
3780 | 07-11-2018 01:40 PM
7524 | 01-05-2017 02:43 PM
1697 | 12-20-2016 01:17 PM
1570 | 12-02-2016 07:19 PM
2383 | 10-06-2016 01:29 PM
08-04-2016
02:13 PM
Broadcasting is done in blocks of data. See the spark.broadcast.blockSize property here. This explains why the value in the log output grows. How big is the file you are broadcasting? You can use the SizeEstimator to get a sense of how much memory your object will really occupy. Then make sure your "--driver-memory" and "--executor-memory" settings have enough breathing room. Guidance for tuning can be found here: https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.2/bk_spark-guide/content/ch_tuning-spark.html
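As a rough illustration of the block-wise transfer, the sketch below computes how many pieces a broadcast gets split into. It assumes spark.broadcast.blockSize is left at its Spark 1.x default of 4m, and the byte count it takes is the serialized (and, with spark.broadcast.compress on, compressed) size, which can differ from SizeEstimator's in-memory estimate. The function name is hypothetical.

```python
BYTES_PER_MB = 1024 * 1024

# Assumption: spark.broadcast.blockSize is left at its default of 4m.
DEFAULT_BLOCK_SIZE = 4 * BYTES_PER_MB


def broadcast_block_count(serialized_bytes: int, block_size: int = DEFAULT_BLOCK_SIZE) -> int:
    """Hypothetical helper: number of blocks a broadcast of this size is split into."""
    if serialized_bytes <= 0 or block_size <= 0:
        raise ValueError("sizes must be positive")
    # Integer ceiling division: a partial last block still counts as a whole block.
    return -(-serialized_bytes // block_size)


if __name__ == "__main__":
    for size_mb in (1, 4, 10, 100):
        blocks = broadcast_block_count(size_mb * BYTES_PER_MB)
        print(f"{size_mb} MB -> {blocks} block(s)")
```

Under these assumptions, a 100 MB serialized object goes out as 25 blocks, so the transfer progresses piece by piece rather than all at once.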
07-28-2016
04:22 PM
1 Kudo
Please keep in mind (from the HDP 2.4 docs): Dynamic Resource Allocation does not work with Spark Streaming.
07-28-2016
03:02 PM
1 Kudo
If you use the second approach, you could use sc.wholeTextFiles instead of sc.textFile. It returns one (path, content) element per file, so a map call can strip each file's header, and flatMap can then split each file's content (one whole text file per element) into individual records. From there, leverage the spark-csv capabilities.
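A minimal sketch of the map and flatMap steps, assuming each file's first line is its header and that no record contains an embedded newline (quoted multi-line CSV fields would break this naive line split). The function names are hypothetical; they are plain Python so they can be tested locally and then handed to PySpark.

```python
def strip_header(pair):
    """(path, whole_file_text) -> (path, text without its first line)."""
    path, text = pair
    # partition splits on the first newline only; everything after it is the body.
    _header, _sep, body = text.partition("\n")
    return path, body


def to_records(pair):
    """(path, body) -> list of non-empty record lines from that file."""
    _path, body = pair
    return [line for line in body.splitlines() if line.strip()]


if __name__ == "__main__":
    files = [("a.csv", "id,name\n1,x\n2,y\n"), ("b.csv", "id,name\n3,z")]
    print([rec for f in files for rec in to_records(strip_header(f))])
```

With PySpark this would plug in as sc.wholeTextFiles("hdfs:///path/to/csv_dir").map(strip_header).flatMap(to_records), where the path is a placeholder. Because wholeTextFiles reads each file as a single element, this suits many small-to-medium files rather than a few very large ones.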
07-27-2016
06:47 PM
1 Kudo
Here's one way: edit ZEPPELIN_JAVA_OPTS in zeppelin-env.sh. You can also leverage dynamic allocation (read this) if you want dynamic scaling.
[root@craig-a-1 conf]# grep "ZEPPELIN_JAVA_OPTS" /usr/hdp/current/zeppelin-server/lib/conf/zeppelin-env.sh
# Additional jvm options. for example, export ZEPPELIN_JAVA_OPTS="-Dspark.executor.memory=8g -Dspark.cores.max=16"
export ZEPPELIN_JAVA_OPTS="-Dhdp.version=2.4.2.0-258 -Dspark.executor.memory=512m -Dspark.executor.instances=2 -Dspark.yarn.queue=default"
# zeppelin interpreter process jvm options. Default = ZEPPELIN_JAVA_OPTS
07-26-2016
07:17 PM
You could use Spark's data federation capabilities. For example, keep a Hive table for Cluster-1's data in Cluster-1. Then, in Spark (running on Cluster-1), set up a temporary table that refers to a table in Cluster-2 over JDBC. For instance:
CREATE TEMPORARY TABLE jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
url "jdbc:<URL for Cluster-2's Spark thrift server>",
dbtable "schema.tablename"
)
So in Cluster-1's spark-sql session, create the TEMPORARY table (pointing to Cluster-2) and join it with the Hive table in Cluster-1. This can be done via spark-sql or via the APIs. More info here: http://spark.apache.org/docs/1.6.1/sql-programming-guide.html#jdbc-to-other-databases
07-26-2016
03:21 PM
Some thoughts/questions: What does the key distribution look like? If it's skewed, perhaps a repartition would help. Looking at the Spark UI might give some insight into the bottleneck. Bumping spark.sql.autoBroadcastJoinThreshold up to about 300 MB (the property takes a value in bytes) might help ensure that the map-side join (broadcast join) happens. Check here, though, because it notes "...that currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan has been run." Other answers:
Right now I'm running Spark 1.6.0. Would moving to Spark 2.0 DataSet (or even DataFrames) be much better?
Doubt it. There are probably tuning opportunities with what you have now, and you would need them in 2.0 anyway.

What if I used RDDs instead? I know that reduceByKey is better than groupByKey, and DataFrames don't have that method.
If you post more of your code, we can comment on that. Without the bigger picture, it's hard to tell whether the RDD API's more granular control would help you.

I think I can do a broadcast join and have set a threshold. Do I need to set it above my second DataFrame size? Do I need to explicitly call broadcast(df2)?
Yes, the threshold matters and must be above the estimated size of the smaller table. Think of this like a map-side join. No, you should not need to call broadcast explicitly. Note that a broadcast variable (SparkContext.broadcast) cannot hold the DataFrame itself; it would have to be a collection loaded up in the driver. (The broadcast(df) function in org.apache.spark.sql.functions is different: it only marks the DataFrame as a broadcast-join hint.) Check here for info on broadcast variables: https://spark.apache.org/docs/0.8.1/scala-programming-guide.html#broadcast-variables

What's the point of driver memory?
Actions such as "collect" bring results back to the driver. If you're collecting a lot of results, you'll need to size the driver-memory setting accordingly.

Can anyone point out something wrong with my tuning numbers, or any additional parameters worth checking out?
They look good, but we could give more assistance if we had the full code. Also, look at the Spark UI and walk the DAG to see where the bottleneck is.
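Since spark.sql.autoBroadcastJoinThreshold is expressed in bytes, "300M" has to be converted. The sketch below does that conversion and models, in simplified form, the size comparison the planner makes against a table's estimated size. It assumes a size estimate exists (per the note above, that requires ANALYZE TABLE for Hive tables) and ignores everything else that affects join selection. The function names are hypothetical.

```python
BYTES_PER_MB = 1024 * 1024

# Documented default for spark.sql.autoBroadcastJoinThreshold: 10 MB, in bytes.
DEFAULT_THRESHOLD = 10 * BYTES_PER_MB


def threshold_bytes(megabytes: int) -> int:
    """Convert a size in MB to the byte count the property expects."""
    return megabytes * BYTES_PER_MB


def would_auto_broadcast(estimated_size_bytes: int, threshold: int = DEFAULT_THRESHOLD) -> bool:
    """Simplified model of the size check only.

    A non-positive threshold (e.g. -1) disables automatic broadcast joins;
    otherwise the table qualifies when its estimated size fits under the threshold.
    """
    if threshold <= 0:
        return False
    return estimated_size_bytes <= threshold


if __name__ == "__main__":
    table_bytes = 200 * BYTES_PER_MB
    print(threshold_bytes(300))
    print(would_auto_broadcast(table_bytes))                        # default threshold
    print(would_auto_broadcast(table_bytes, threshold_bytes(300)))  # raised threshold
```

From PySpark you could then apply it with sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", str(threshold_bytes(300))), or in a spark-sql session with SET spark.sql.autoBroadcastJoinThreshold=314572800;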
07-25-2016
07:20 PM
1 Kudo
@slachterman - In zeppelin-site.xml, try changing "zeppelin.server.addr" to the actual IP address of the host where Zeppelin is running. Restart Zeppelin and let us know how it goes!
07-25-2016
04:38 PM
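You could pull out the keys, boil them down to distinct values, and then index them. Would something like this work?
# rddA: your main dataset, an RDD of (key, value) pairs
rddAKeys = rddA.keys()
rddAUniqKeys = rddAKeys.distinct()
# zipWithIndex pairs each distinct key with an index 0..n-1: (key, index)
rddAKeyed = rddAUniqKeys.zipWithIndex()
# join on key: yields (key, (value, index))
rddAIndexed = rddA.join(rddAKeyed)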
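To make the shape of the result concrete, here is a plain-Python model of the same three steps (no Spark needed). It is a hypothetical helper, not part of any API. It sorts the distinct keys so its output is deterministic, whereas Spark's distinct() makes no ordering promise, so the index Spark assigns to each key may differ. It also assumes the keys are sortable.

```python
def index_by_distinct_key(pairs):
    """Local model of rdd.keys().distinct().zipWithIndex() joined back onto rdd.

    pairs: list of (key, value) tuples.
    Returns a list of (key, (value, index)), mirroring the join output shape.
    """
    # distinct(): one entry per key (sorted here only for determinism).
    distinct_keys = sorted({key for key, _value in pairs})
    # zipWithIndex(): number the distinct keys 0..n-1.
    key_index = {key: index for index, key in enumerate(distinct_keys)}
    # join(): attach each row's key index to its value.
    return [(key, (value, key_index[key])) for key, value in pairs]


if __name__ == "__main__":
    print(index_by_distinct_key([("b", 1), ("a", 2), ("b", 3)]))
```

Every row that shares a key carries the same index, which is why the index is built on the distinct keys and joined back rather than zipped onto rddA directly.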
07-25-2016
02:57 PM
2 Kudos
You can use the zipWithIndex method to get a sequence number. If you need the key to be a primary key, you could grab the max key of the existing dataset in a separate RDD and then use the map method on the zipped RDD to offset the new indices past that max.
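A minimal local sketch of the offset idea, assuming integer keys and that the current maximum key has already been computed. The helper name is hypothetical; enumerate stands in for zipWithIndex.

```python
def assign_primary_keys(new_records, existing_max_key=-1):
    """Hypothetical local model of zipWithIndex plus an offset.

    new_records: iterable of records that need keys.
    existing_max_key: largest key already in use (-1 for an empty dataset,
    so numbering starts at 0).
    Returns a list of (key, record) with keys existing_max_key + 1, + 2, ...
    """
    offset = existing_max_key + 1
    # enumerate plays the role of zipWithIndex; adding the offset keeps keys unique.
    return [(offset + index, record) for index, record in enumerate(new_records)]


if __name__ == "__main__":
    print(assign_primary_keys(["x", "y"], existing_max_key=41))
```

In PySpark the equivalent would be roughly offset = existing_rdd.keys().max() + 1 followed by new_rdd.zipWithIndex().map(lambda pair: (pair[1] + offset, pair[0])), where existing_rdd and new_rdd are placeholders. zipWithIndex yields (element, index) pairs, hence the swap in the lambda.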
06-24-2016
12:16 PM
1 Kudo
Will this work? https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/sql/DataFrame.html#rdd()