Member since: 09-24-2015
Posts: 98
Kudos Received: 76
Solutions: 18
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
|  | 1077 | 08-29-2016 04:42 PM |
|  | 1849 | 08-09-2016 08:43 PM |
|  | 520 | 07-19-2016 04:08 PM |
|  | 772 | 07-07-2016 04:05 PM |
|  | 2681 | 06-29-2016 08:25 PM |
04-25-2016
07:24 PM
Hello Aru: According to the Datasets API, you now have a GroupedDataset, not a Dataset. To query the original Dataset (dss), you can first register a temp table and then write a SQL SELECT statement to pull records out into a result, like this:
dss.registerTempTable("people");
// SQL can be run over RDDs that have been registered as tables.
DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
Alternatively, you can use the built-in Dataset.select() method to pull the requested columns out, and then apply filter, groupBy, map, etc. You should be able to embed a condition (similar to a WHERE clause) within filter(), just as in the DataFrames API. Here is the signature of the Dataset filter() method:
public Dataset<T> filter(FilterFunction<T> func)
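For what it's worth, here is a minimal standalone Scala sketch of that second approach, using a hypothetical Person case class and made-up sample data (the Java form would use the FilterFunction signature quoted above):
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
// Hypothetical schema, for illustration only
case class Person(name: String, age: Int)
val sc = new SparkContext(new SparkConf().setAppName("dataset-filter-sketch").setMaster("local[2]"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
// Build a small sample Dataset; in your notebook you would already have dss
val dss = Seq(Person("Ann", 15), Person("Bob", 32), Person("Cam", 17)).toDS()
// The predicate inside filter() plays the role of a WHERE clause
val teenagers = dss.filter(p => p.age >= 13 && p.age <= 19).map(_.name)
teenagers.collect().foreach(println)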
... View more
04-18-2016
08:48 PM
Sorry to hear that. First, a few questions:
a) What did you change the "storm.version" to in your "pom.xml" file?
b) Did you get an error during "mvn clean package" (which compiles the target)?
c) Do you see the specified jar file ("Tutorial-1.0-SNAPSHOT.jar") in your target directory?
To troubleshoot: if you put the "pom.xml" file back to its original state, can you recompile and run successfully? If so, then there could be a problem with the "storm.version" string that you entered. Make sure it is a viable version; you can get the proper version from the command line using:
storm version
and look for this line in the output:
Storm 0.10.0.2.4.0.0-169
... View more
04-18-2016
08:27 PM
I am wondering if the zeppelin-server daemon is up and running; it may not be started by default. Assuming you can 'ssh' into the server where zeppelin is installed, try this command to see if it is running:
ps -ef |grep zeppelin-server
You should see one or more processes running, with at least one listed as interpreter.sh, which is the interpreter for the notebook. Also, I believe a notebook is not connected until you select it. To make sure the interpreter is ready to load the notebook, navigate to the Interpreter tab along the top, find the "spark" interpreter, and click its "restart" button. Wait about 5 seconds, then go back and reload the notebook to see if it is now connected. You can also try hitting the browser's "reload" button to kick-start the interpreter.
... View more
04-18-2016
05:33 PM
Thanks @Bernhard Walter - yes, that's exactly what I did.
... View more
04-18-2016
05:32 PM
You can add the following Scala code to any Spark-based Zeppelin notebook to calculate elapsed time (duration). Please note that this code uses (imports) the third-party Joda-Time library. You can place start_time and end_time at any two points between which you want to measure elapsed time.
import org.joda.time._
import org.joda.time.format._
import org.joda.time.format.DateTimeFormat
import org.joda.time.DateTime
import org.joda.time.Days
import org.joda.time.Duration
// this starts the clock
val start_time = DateTime.now()
def getElapsedSeconds(start: DateTime, end: DateTime): Int = {
val elapsed_time = new Duration(start.getMillis(), end.getMillis())
val elapsed_seconds = elapsed_time.toStandardSeconds().getSeconds()
(elapsed_seconds)
}
// this stops the clock
val end_time = DateTime.now()
val elapsed_secs = getElapsedSeconds(start_time, end_time)
// print out elapsed time
println(f"Elapsed time (seconds): ${elapsed_secs}%d")
Here is sample output from this code:
start_time: org.joda.time.DateTime = 2016-04-15T17:28:10.682Z
getElapsedSeconds: (start: org.joda.time.DateTime, end: org.joda.time.DateTime)Int
end_time: org.joda.time.DateTime = 2016-04-15T17:47:50.144Z
elapsed_secs: Int = 1179
Elapsed time (seconds): 1179
... View more
04-18-2016
05:25 PM
Frequently, when running a notebook in Zeppelin, I want to know how long the whole notebook took to run. This is important when tuning Spark, YARN, etc., to understand whether the tuning is successful. Zeppelin prints per-cell timings, but it does not provide the full elapsed time for the notebook. How can I add code to calculate this?
... View more
04-18-2016
05:06 PM
1 Kudo
Zeppelin comes with a long list of interpreters (including Spark/Scala, Python/PySpark, Hive, Cassandra, SparkSQL, Phoenix, Markdown, and Shell), which basically provide language bindings to run the code that you type into a notebook cell. Currently, the list of interpreters does not include Java, so you will need to compile your code and build a jar file first, which can then be submitted to Spark via spark-submit, as described here: http://spark.apache.org/docs/latest/submitting-applications.html
... View more
04-13-2016
07:22 PM
Yes, please include the error or stack trace. If I had to guess, I would say it's because you are not running the "ntpd" time daemon on your local Kafka node. I had trouble connecting to Twitter until I installed and enabled ntpd. Full instructions for installing and enabling it on CentOS are here: http://www.cyberciti.biz/faq/howto-install-ntp-to-synchronize-server-clock/
... View more
04-13-2016
07:07 PM
1 Kudo
Spark allocates memory based on option parameters, which can be passed in multiple ways:
1) via the command line (as you do)
2) via programmatic instructions
3) via the "spark-defaults.conf" file in the "conf" directory under your $SPARK_HOME
Second, there are separate config params for the driver and the executors. This is important because the main difference between "yarn-client" and "yarn-cluster" mode is where the driver lives (either on the client, or on the cluster within the ApplicationMaster). Therefore, we should look at your driver config parameters. It looks like these are your driver-related options from the command line:
--driver-memory 5000m
--driver-cores 2
--conf spark.yarn.driver.memoryOverhead=1024
--conf spark.driver.maxResultSize=5g
--driver-java-options "-XX:MaxPermSize=1000m"
It is possible that the ApplicationMaster is running on a node that does not have enough memory to support your option requests, e.g. the sum of driver memory (5G) and PermSize (1G), plus overhead (1G), does not fit on the node. I would try lowering --driver-memory in 1G steps until you no longer get the OOM error.
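As a rough sketch of option 2 (programmatic configuration), executor memory settings could be set on a SparkConf like this; the values are illustrative only, and driver memory itself generally still has to come from the command line or spark-defaults.conf, because the driver JVM is already running by the time this code executes:
import org.apache.spark.{SparkConf, SparkContext}
// Illustrative values; the master is assumed to be supplied via spark-submit --master
val conf = new SparkConf()
  .setAppName("memory-config-sketch")
  .set("spark.executor.memory", "4g")
  .set("spark.yarn.executor.memoryOverhead", "1024")
val sc = new SparkContext(conf)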
... View more
04-13-2016
06:58 PM
Another option you can try: click on the "Interpreters" link at the top of the Zeppelin page, find the "spark" interpreter, and click on the "restart" button on the right-hand side. Next, make sure that your notebook page shows "Connected" with a green dot, meaning it is talking successfully to the Spark driver.
... View more
03-31-2016
08:24 PM
1 Kudo
One caveat: in case you reboot (reset) your VM/Sandbox, you should enable the 'ntpd' daemon to start on bootup. I had trouble with GetTwitter as mentioned in the post above, even after following the steps to install and enable ntpd, because I had rebooted in the meantime, which turned it off. To enable it on system bootup, run this command:
chkconfig ntpd on
To verify that it took effect, run this command and make sure 'ntpd' is enabled in runlevels 2, 3, 4, and 5:
chkconfig --list | grep ntpd
... View more
03-30-2016
07:11 PM
Which version of the HDP Sandbox are you running (2.4 or 2.3.x)? There is a chance that Zeppelin can't connect to the Spark interpreter, so from the Interpreter page (shown), click on the [restart] button, wait about 5 seconds, then go back and refresh the notebook page. Also, from the notebook page in Zeppelin, click on the gear icon to make sure Spark is one of the listed interpreters.
... View more
03-28-2016
04:05 PM
1 Kudo
In order to run properly on a cluster (using one of the two described cluster modes), Spark needs to distribute any extra jars that are required at runtime. Normally, the Spark driver ships required jars to the nodes for use by the executors, but that doesn't happen by default for user-supplied or third-party jars (pulled in via import statements). Therefore, you have to set one or both of the following parameters, depending on whether the driver and/or the executors need those libs:
# Extra classpath jars
spark.driver.extraClassPath=/home/zeppelin/notebook/jars/guava-11.0.2.jar
spark.executor.extraClassPath=/home/zeppelin/notebook/jars/guava-11.0.2.jar
If you are not sure, set both. Finally, the actual jar files must be copied to the specified location. If that is on the local filesystem, you will have to copy the jar to each node's local filesystem. If you reference it from HDFS, then a single copy will suffice.
... View more
03-24-2016
08:26 PM
From the error message in the stack trace, it looks like you may have mistyped the spark-submit command line. The main class definition is provided by the --class <main-class> parameter, as shown in this syntax definition:
./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
If you have put the string "Dhdp.version=2.3.4.1-10" on the command line, it could lead to the error. The other possibility is that you have entered this string into the "spark-env.sh" file in the $SPARK_HOME/conf directory. Double-check this file and look for any parameter ending in "OPTS", such as "SPARK_DAEMON_JAVA_OPTS". That could be adding something wrong to the spark-submit argument list and lead to the error.
... View more
03-23-2016
05:54 PM
8 Kudos
Zeppelin stores all displayable information in a JSON-format file named "note.json" (by default), located under the notebook home directory, usually /user/zeppelin/notebook. This JSON file includes source code, markup, and output results. The easiest thing to do is:
1) ssh into the machine where the Zeppelin service is running
2) cd to the notebook directory (cd /user/zeppelin/notebook)
3) cd to the specific notebook sub-directory; each notebook is in a separate sub-directory (cd 2A94M5J1Z)
4) edit the note.json file and remove the unwanted results
If you use a good editor (like TextMate or vim) with a JSON plugin to format the contents, you can easily locate the results section and rip it out. Make sure you don't break the integrity of the JSON file itself; you just want to eliminate the inner JSON contents where the superfluous result is stored. Here is an example of a result field from note.json:
"result": {
"code": "SUCCESS",
"type": "HTML",
"msg": "\u003ch2\u003eWelcome to Zeppelin.\u003c/h2\u003e\n\u003ch5\u003eThis is a live tutorial, you can run the code yourself. (Shift-Enter to Run)\u003c/h5\u003e\n"
},
... View more
03-23-2016
12:14 PM
2 Kudos
I assume you are referring to using Spark's MLlib to train a machine learning model. If so, I'm betting people are saying that because you have to launch Spark where the client is installed, which is typically an edge node. The other reason is that if they are using Zeppelin to access Spark, the Zeppelin service and web client would likely be on the management node. However, when you run Spark in one of the cluster modes ("yarn-client" or "yarn-cluster"), the Spark job takes advantage of all the YARN nodes on the cluster. Tuning Spark properly to take advantage of these cluster resources can take some time, and many Spark jobs are not properly tuned. Hope that helps, and that I've understood the question.
... View more
03-22-2016
06:37 PM
Remember that the collector.emit() method makes the current tuple available downstream to other bolts consuming the stream. Therefore, generally speaking, emit() is the last thing your current bolt does (except maybe ack()). In the case you outlined, the current bolt is "feeding" (or routing to) multiple streams (boltReadingStreamId1, boltReadingStreamId2). So each of these downstream bolts will receive tuples in its own window, handed to it as the inputWindow parameter, and processed with:
public void execute(TupleWindow inputWindow) {
for(Tuple tuple: inputWindow.get()) {
// do the windowing computation ...
}
}
Once the downstream bolts receive their tuples in the window, they can access them via several methods, such as:
/*
* The inputWindow gives a view of
* (a) all the events in the window
* (b) events that expired since last activation of the window
* (c) events that newly arrived since last activation of the window
*/
List<Tuple> tuplesInWindow = inputWindow.get();
List<Tuple> newTuples = inputWindow.getNew();
List<Tuple> expiredTuples = inputWindow.getExpired();
... View more
03-17-2016
08:02 PM
Have you tried using the Spark syntax described here: http://phoenix.apache.org/phoenix_spark.html
import org.apache.spark.SparkContext
import org.apache.phoenix.spark._
val sc = new SparkContext("local", "phoenix-test")
val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))
sc
.parallelize(dataSet)
.saveToPhoenix(
"OUTPUT_TEST_TABLE",
Seq("ID","COL1","COL2"),
zkUrl = Some("phoenix-server:2181")
)
... View more
03-16-2016
09:03 PM
1 Kudo
I can only comment on question #1, but I would say you definitely want to run in one of the YARN modes (yarn-client or yarn-cluster), if only so Oozie can get information on container usage and job completion. You can do this by using the --master command-line argument:
spark-submit --master yarn-client --class com.mypackage.MyClass /user/hue/oozie/workspaces/_hue_-oozie-1-1457990185.41/my.jar
... View more
02-23-2016
03:19 PM
3 Kudos
You can either run Spark natively and declare a SparkR context, via sparkR.init(), or use RStudio for IDE access. Instructions for both are included here: https://spark.apache.org/docs/latest/sparkr.html
... View more
02-22-2016
10:49 PM
The UC Berkeley white paper on RDDs spells it out this way: "Although individual RDDs are immutable, it is possible to implement mutable state by having multiple RDDs to represent multiple versions of a dataset. We made RDDs immutable to make it easier to describe lineage graphs, but it would have been equivalent to have our abstraction be versioned datasets and track versions in lineage graphs." If you look at the output above, your collect() output is initially res6, and the second time it is res8. This, presumably, is a new version of the initial RDD and thus gets a different reference name.
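To illustrate the general idea, here is a rough standalone Scala sketch with made-up data (not the exact session from the question): each transformation produces a new RDD rather than mutating the original, which is how multiple "versions" arise.
import org.apache.spark.{SparkConf, SparkContext}
val sc = new SparkContext(new SparkConf().setAppName("rdd-versions-sketch").setMaster("local[2]"))
val v1 = sc.parallelize(Seq(1, 2, 3))   // original RDD
val v2 = v1.map(_ * 10)                 // a "new version": a separate RDD with its own lineage
// v1 is untouched; v2 is derived from it via the lineage graph
println(v1.collect().mkString(","))     // 1,2,3
println(v2.collect().mkString(","))     // 10,20,30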
... View more
02-17-2016
10:46 PM
2 Kudos
Spark has a PySpark API that acts as a wrapper around Spark's Scala-based libraries. It also provides a REPL interface for the Python interpreter. If you launch pyspark, you will be able to import whatever Python libraries you have installed locally, i.e. Python imports should work. Specifically (from the docs):
PySpark requires Python 2.6 or higher. PySpark applications are executed using a standard CPython interpreter in order to support Python modules that use C extensions. We have not tested PySpark with Python 3 or with alternative Python interpreters, such as PyPy or Jython. By default, PySpark requires python to be available on the system PATH and use it to run programs; an alternate Python executable may be specified by setting the PYSPARK_PYTHON environment variable in conf/spark-env.sh (or .cmd on Windows). All of PySpark's library dependencies, including Py4J, are bundled with PySpark and automatically imported. Standalone PySpark applications should be run using the bin/pyspark script, which automatically configures the Java and Python environment using the settings in conf/spark-env.sh or .cmd. The script automatically adds the bin/pyspark package to the PYTHONPATH.
... View more
02-17-2016
04:19 PM
1 Kudo
From the Storm TruckEvents tutorial, here is the HBase pom reference for including the jar files in the Storm deploy jar. Beware that this is from the HDP 2.2 example:
...
<hbase.version>0.98.0.2.1.1.0-385-hadoop2</hbase.version>
...
<!-- HBase Dependencies -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
Therefore, you need to include both the HBase jars in the uber Storm jar and the config file, as shown in Ali's answer above.
... View more
02-07-2016
01:38 AM
2 Kudos
Leave storm/lib vanilla and package the higher versions of those libs with your topology jar, using the Maven Shade plugin to relocate the necessary packages and avoid conflicts. Here is an example from a Storm topology that relocates Guava (com.google.common):
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${version.shade}</version>
<configuration>
<relocations>
<relocation>
<pattern>com.google.common</pattern>
<shadedPattern>com.cisco.com.google.common</shadedPattern>
</relocation>
</relocations>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<artifactSet>
<excludes>
<exclude>org.datanucleus</exclude>
</excludes>
</artifactSet>
<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"
/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
... View more
02-04-2016
08:03 PM
1 Kudo
This tutorial skipped one set of instructions for eliminating use of the YARN history server. These are the required steps: ensure "spark-defaults.conf" doesn't have any YARN history-service-related properties enabled. If this tech preview is installed on a node where Spark was already present, there may be Spark properties set related to YARN ATS. Make sure you have disabled the following properties in your "spark-defaults.conf" file by adding a '#' in front of each setting:
#spark.history.provider org.apache.spark.deploy.yarn.history.YarnHistoryProvider
#spark.history.ui.port 18080
#spark.yarn.historyServer.address sandbox.hortonworks.com:18080
#spark.yarn.services org.apache.spark.deploy.yarn.history.YarnHistoryService
... View more
01-27-2016
09:49 PM
3 Kudos
Workaround for Hive query OutOfMemory errors: please note that in some cases (such as when running the Hortonworks Sandbox on a Microsoft Azure VM with an 'A4' machine size), some of the Hive queries will produce OutOfMemory (Java heap) errors. As a workaround, you can adjust some Hive-Tez config parameters using the Ambari console. Go to the Services -> Hive page, click on the 'Configs' tab, and make the following changes:
1) Scroll down to the Optimization section and increase the Tez Container Size from 200 to 512:
Param: "hive.tez.container.size" Value: 512
2) Click on the "Advanced" tab to show extra settings, scroll down to find the parameter "hive.tez.java.opts", and increase the Java max heap size in Hive-Tez Java Opts from 200 MB to 512 MB:
Param: "hive.tez.java.opts" Value: "-server -Xmx512m -Djava.net.preferIPv4Stack=true"
... View more
01-27-2016
09:47 PM
1 Kudo
Some users are getting OutOfMemory errors when running the "Getting Started with HDP" tutorial on the Hortonworks website: http://hortonworks.com/hadoop-tutorial/hello-world-an-introduction-to-hadoop-hcatalog-hive-and-pig/#section_1
What is the suggested workaround, especially when running in a limited-memory environment like the Sandbox?
... View more
01-21-2016
10:52 PM
1 Kudo
This is not a complete answer, but I would like to add that, by default, Kafka brokers write to local storage (not HDFS) and therefore benefit from fast local disks (SSD) and/or multiple spindles to parallelize writes to partitions. I don't know of a formula to calculate this, but try to maximize I/O throughput to disk, and allocate up to as many spindles as there are available CPUs per node. Many Hadoop architectures don't really specify an allocation for local storage (beyond the OS disk), so it is something to be aware of.
... View more
01-07-2016
04:10 PM
Actually, many BI vendors, including Tableau, have announced a Spark connector over JDBC, which should presumably be able to leverage data loaded into RDDs in memory. If you load data via Spark Streaming into an RDD and then either schematize it (rdd.registerTempTable) or convert it to a DataFrame (rdd.toDF), you should be able to query that data over a JDBC connection and display it in a dashboard; a rough sketch of the schematize step is below. Here is info on the Tableau connector, including a video at the bottom of the page: https://www.google.com/url?sa=t&rct=j&q=&esrc=s&so...
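Here is a minimal standalone Scala sketch of that schematize step, with made-up data and names; the same toDF/registerTempTable pattern applies to RDDs produced inside a Spark Streaming foreachRDD block:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
// Hypothetical record type, for illustration only
case class Event(id: Long, value: String)
val sc = new SparkContext(new SparkConf().setAppName("jdbc-sketch").setMaster("local[2]"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
// Convert an RDD to a DataFrame and register it as a queryable table
val df = sc.parallelize(Seq(Event(1L, "a"), Event(2L, "b"))).toDF()
df.registerTempTable("events")
// A JDBC client (e.g. via the Spark SQL Thrift server) could then issue SQL like this
sqlContext.sql("SELECT id, value FROM events").show()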
... View more
01-05-2016
07:21 PM
1 Kudo
The error message seems to indicate that you cannot simply enable CBO for your session or connection (i.e. with a SET statement). Rather, it should be enabled "cluster-wide" for the HiveServer2 instance, not per user session. This could be due to how the Calcite optimizer is initialized or executed.
... View more