Member since: 03-24-2016
Posts: 184
Kudos Received: 239
Solutions: 39

My Accepted Solutions
Title | Views | Posted
---|---|---
 | 2606 | 10-21-2017 08:24 PM
 | 1607 | 09-24-2017 04:06 AM
 | 5699 | 05-15-2017 08:44 PM
 | 1729 | 01-25-2017 09:20 PM
 | 5696 | 01-22-2017 11:51 PM
04-03-2016
06:45 PM
2 Kudos
@John Cod Spark shell attempts to start a SQL context by default. The first thing I would check is whether you are pointing Spark at your existing Hive metastore. In your {SPARK_HOME}/conf folder you should have a hive-site.xml file. Make sure it contains the following configuration:
<property>
  <name>hive.metastore.uris</name>
  <value>thrift://{IP of metastore host}:{port the metastore is listening on}</value>
</property>
This tells the Spark shell to connect to your existing metastore instead of trying to create a default one, which is what it appears to be doing now. The SQL context should then start up, and you should be able to query Hive through the default SQLContext:
val result = sqlContext.sql("SELECT * FROM {hive table name}")
result.show
If a Hive context was not created by default, create one and retry the query:
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
03-31-2016
07:01 PM
@dconnolly My pleasure. Would you mind accepting the answer and upvoting it?
03-31-2016
12:03 PM
1 Kudo
There is certainly no reason why you could not use NiFi for one-off movement of small amounts of data or for repeated enrichment of events from an RDBMS. But just as you would not use a hand axe to chop down a redwood, you generally would not use an event/message stream-processing tool to handle a large, static ETL job such as a database export. It's simply a matter of using each tool for the purpose it was designed for.
03-31-2016
11:45 AM
@Sridhar Babu M Glad it worked out. Would you mind accepting this answer and the one from the other thread? https://community.hortonworks.com/questions/24518/spark-sql-query-to-modify-values.html
03-31-2016
01:21 AM
groupId: com.databricks
artifactId: spark-csv_2.10
version: 1.4.0
@Sridhar Babu M Make sure that you add the dependency, either in Zeppelin with %dep z.load("com.databricks:spark-csv_2.10:1.4.0") or with spark-shell --packages com.databricks:spark-csv_2.10:1.4.0
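Once the package is loaded you can read CSV data straight into a DataFrame. A minimal sketch, assuming Spark 1.4+ with the spark-csv package available as above; the file path and the header/inferSchema options are placeholders for your own data:
// sqlContext is the SQLContext that spark-shell/Zeppelin already provides
val peopleDF = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")       // treat the first line as column names
  .option("inferSchema", "true")  // let spark-csv guess the column types
  .load("/user/spark/people.csv") // hypothetical HDFS path
peopleDF.printSchema()
peopleDF.show()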
03-30-2016
10:19 PM
So I have now had a chance to do some reading and experimentation on my own, and Ignite seems very impressive in terms of the capabilities it provides. Besides the standard abilities of an in-memory data grid (caching data across many remote JVMs, distributing processing to where the data lives, executing functions on individual requests based on operation type, live event notification when data changes, and read/write-through and write-behind to a backing data store, etc.), Ignite has some excellent built-in integration with Hadoop. Most data grids let you plug in a persistence layer by writing code; Ignite allows configuration-based integration with HDFS, with no code. This means it is possible to support OLTP-style access to the data in order to enable new kinds of applications. It could also help speed up long-running jobs by ensuring that really hot data is read from memory, or perhaps by using in-memory scratch space instead of local disk (this seemed possible but was not entirely clear). It also seems to do all of the same things that Tachyon does. The other key integration point is the ability to run Ignite on YARN, so the compute aspect of the acceleration features can be managed by the cluster as if it were native. There definitely seems to be some great synergy between Ignite and Hadoop.
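For anyone who wants to kick the tires on the data grid side, here is a minimal sketch in Scala, assuming only that the ignite-core artifact is on the classpath; the cache name and values are arbitrary placeholders, and the HDFS/IGFS integration mentioned above would be configured separately in XML:
import org.apache.ignite.Ignition
// Start a local Ignite node with the default configuration
val ignite = Ignition.start()
// Create (or get) a distributed cache keyed by Int
val cache = ignite.getOrCreateCache[Int, String]("hotData") // hypothetical cache name
cache.put(1, "some hot value")
println(cache.get(1)) // served from memory by whichever node owns the key
Ignition.stop(true) // shut the local node down when finished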
03-30-2016
09:50 PM
1 Kudo
@Adam Doyle If I understand your question correctly, you could try to use a state management function with updateStateByKey (http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations-on-dstreams) where the key is the recordType field (I am assuming this is a String). You would need to have all of the target HBase tables initialized at startup, and you would put the handle object for each table into a map keyed by the same recordType string. The function itself would just have the logic to persist the record: look up the HBase Table object in the map you created earlier, build an HBase Put by looping through the columns and values of the record, and then execute the Put against the table you retrieved from the table map. The stateful function is typically used to keep a running aggregate; however, because it partitions the DStream by the key you provide (I believe by creating separate DStreams), it should allow you to write generic logic where you look up the specifics (like target table and columns) at run time.
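As an illustration of that routing idea, here is a minimal sketch in Scala, assuming the HBase 1.x client API, a hypothetical Record(recordType, rowKey, columns) case class, a column family named "cf", and an existing DStream called recordStream; for simplicity it opens the connection per partition inside foreachRDD rather than building the table map up front:
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

case class Record(recordType: String, rowKey: String, columns: Map[String, String])

// recordStream: DStream[Record] built from your streaming source (assumed to exist)
recordStream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // Open the connection on the executor so it is not serialized with the closure
    val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
    try {
      records.foreach { rec =>
        // Route each record to the table named after its recordType
        val table = conn.getTable(TableName.valueOf(rec.recordType))
        val put = new Put(Bytes.toBytes(rec.rowKey))
        rec.columns.foreach { case (qualifier, value) =>
          put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(qualifier), Bytes.toBytes(value))
        }
        table.put(put)
        table.close()
      }
    } finally {
      conn.close()
    }
  }
}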
03-30-2016
07:59 PM
14 Kudos
@Randy Gelhausen For transferring data out of a relational database, Sqoop is still the way to go. The main reason is that NiFi will not parallelize the loading of the data automatically. When you create an ExecuteSQL processor, it starts pulling data from the DB on the thread that the processor is running on. The only way to get parallelism at that point is to create multiple processors, each pulling a partition of the target data. Sqoop will take the SQL statement and create workers that each pull their own partition of the data, automatically providing parallelism and ensuring the fastest, most efficient load of RDBMS data. Don't get me wrong, NiFi is great for just about any simple event-processing use case you can conceive of, but for a pure large-scale RDBMS ingest use case, Sqoop is the way to go for now.
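For example, a typical Sqoop invocation spells that parallelism out directly; the connection string, credentials, table, and split column below are hypothetical placeholders:
sqoop import \
  --connect jdbc:mysql://dbhost:3306/sales \
  --username etl_user -P \
  --table orders \
  --split-by order_id \
  --num-mappers 8 \
  --target-dir /data/raw/orders
Each of the eight mappers pulls its own range of order_id values, which is exactly the partitioned, parallel load described above.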
03-30-2016
06:33 PM
1 Kudo
@dconnolly The simplest approach is to build a REST web service on a servlet container like Tomcat or Jetty. Strictly speaking, you could publish that web service anywhere; however, I would recommend that you leverage the existing resources of the Hadoop cluster and use Slider to run the web service on YARN. Try this:
1. Create the web service and build it as a runnable jar.
2. Put that jar into a Linux Docker container, then create the configuration files needed to run the Docker container with Slider. https://slider.incubator.apache.org/docs/slider_specs/application_pkg_docker.html http://www.slideshare.net/hortonworks/docker-on-slider-45493303
3. Use Slider to start the Docker container on YARN and secure the listener endpoint with Knox.
This would allow you to leverage the resources of the cluster, manage the sync service's resources with YARN, and provide security for the API endpoint with Knox.
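To make the first step concrete, here is a minimal sketch of a runnable-jar style endpoint using only the JDK's built-in HTTP server; in practice you would more likely use JAX-RS or Spring on Tomcat/Jetty as described above, and the port and /sync path are arbitrary placeholders:
import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}
import java.net.InetSocketAddress

object SyncService {
  def main(args: Array[String]): Unit = {
    val server = HttpServer.create(new InetSocketAddress(8080), 0)
    server.createContext("/sync", new HttpHandler {
      override def handle(exchange: HttpExchange): Unit = {
        val body = """{"status":"ok"}""".getBytes("UTF-8")
        exchange.getResponseHeaders.add("Content-Type", "application/json")
        exchange.sendResponseHeaders(200, body.length)
        exchange.getResponseBody.write(body)
        exchange.close()
      }
    })
    server.start() // package as a runnable jar, then launch it inside the Docker container
  }
}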
03-30-2016
04:04 PM
@Sridhar Babu M If you actually need to change the values in the file, then you will need to export the resulting DataFrame back to a file. The save function that is part of the DataFrame class creates a file for each partition. If you need a single file, convert back to an RDD and use coalesce(1) to get everything down to a single partition, so you get one file. Make sure that you add the dependency, either in Zeppelin with %dep z.load("com.databricks:spark-csv_2.10:1.4.0") or with spark-shell --packages com.databricks:spark-csv_2.10:1.4.0
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SaveMode
import sqlContext.implicits._ // needed for toDF() if your environment does not pre-import it

case class Person(name: String, age: Int)

// Load the text file and turn each comma-separated line into a Person
var personRDD = sc.textFile("/user/spark/people.txt")
var personDF = personRDD.map(x => x.split(",")).map(x => Person(x(0), x(1).trim.toInt)).toDF()
personDF.registerTempTable("people")
var peopleDF = sqlContext.sql("SELECT * FROM people") // optional sanity check of the original data

// Bump Justin's age by 2 and leave everyone else unchanged
var agedPerson = personDF.map(x =>
  if (x.getAs[String]("name") == "Justin") Person(x.getAs[String]("name"), x.getAs[Int]("age") + 2)
  else Person(x.getAs[String]("name"), x.getAs[Int]("age"))).toDF()
agedPerson.registerTempTable("people")
var agedPeopleDF = sqlContext.sql("SELECT * FROM people")
agedPeopleDF.show

// Write with spark-csv (one output file per partition)...
agedPeopleDF.select("name", "age").write.format("com.databricks.spark.csv").mode(SaveMode.Overwrite).save("agedPeople")

// ...or coalesce to a single partition to get a single output file
var agedPeopleRDD = agedPeopleDF.rdd
agedPeopleRDD.coalesce(1).saveAsTextFile("agedPeopleSingleFile")