1973 Posts
1225 Kudos Received
124 Solutions
05-19-2016
01:06 PM
3 Kudos
It is very easy to connect various tools to Hive. Apache Zeppelin is my favorite tool for querying Hive via HiveQL, but you have a ton of options.
First, DBVisualizer; I am using the Free Edition for macOS. For both tools, you will need to copy (or acquire) two JARs:
hadoop-common.jar
hive-jdbc-1.2.1000.2.4.0.0-169-standalone.jar
On HDP 2.4 they live at:
/usr/hdp/current/hive-client/lib/hive-jdbc-1.2.1000.2.4.0.0-169-standalone.jar
/usr/hdp/current/hadoop-client/hadoop-common.jar
Another tool is DBeaver, which has a unique twist: it finds and downloads the JARs itself via Maven, though the initial driver install takes a bit. It also supports Phoenix, Drill and GemfireXD. I also set up SquirrelSQL; for the SquirrelSQL setup details, check out the official Hive docs.
The JDBC URL and driver class to use:
jdbc:hive2://localhost:10000/default
org.apache.hive.jdbc.HiveDriver
For the Hive URL format, see the Hive docs. I also connected from SQL Workbench/J, a free SQL tool.
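If you want to hit the same endpoint programmatically, here is a minimal Scala sketch using the same driver class and URL (the object name and the hive/empty-password credentials are placeholders for illustration, not from the post; adjust for your cluster's security setup):
import java.sql.DriverManager
object HiveJdbcCheck {
  def main(args: Array[String]): Unit = {
    // Register the Hive JDBC driver shipped in the standalone JAR above
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    // Same URL as the GUI tools
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "hive", "")
    try {
      val rs = conn.createStatement().executeQuery("SHOW TABLES")
      while (rs.next()) println(rs.getString(1))
    } finally {
      conn.close()
    }
  }
}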
05-19-2016
10:40 AM
Having run a bunch of Spark jobs locally, in Spark Standalone clusters, and in HDP YARN clusters, I have found a few JVM settings that help with debugging non-production jobs and assist with better garbage collection. This is important even with off-heap storage and bare-metal optimizations.
spark-submit --driver-java-options "-XX:+PrintGCDetails -XX:+UseG1GC -XX:MaxGCPauseMillis=400"
You can also set extra options in the runtime environment (see the Spark documentation); a sketch follows after the code below. For HDP Spark, you can add this from Ambari. In your Scala Spark program:
sparkConf.set("spark.cores.max", "4")
sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
sparkConf.set("spark.sql.tungsten.enabled", "true")
sparkConf.set("spark.eventLog.enabled", "true")
sparkConf.set("spark.app.id", "MyAppIWantToFind")
sparkConf.set("spark.io.compression.codec", "snappy")
sparkConf.set("spark.rdd.compress", "false")
sparkConf.set("spark.suffle.compress", "true")
Make sure you have Tungsten on, the KryoSerializer, and eventLog enabled, and use logging:
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
val log = Logger.getLogger("com.hortonworks.myapp")
log.info("Started Logs Analysis") Also, whenever possible include relevant filters on your datasets: "filter(!_.clientIp.equals("Empty"))".
05-19-2016
02:13 AM
1 Kudo
Updated for 2.4 https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.2/bk_spark-guide/content/ch_tuning-spark.html
05-16-2016
02:54 PM
Make sure you have Tungsten enabled. Take a look at the history after the run.
sparkConf.set("spark.cores.max", "24")
sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
sparkConf.set("spark.sql.tungsten.enabled", "true")
sparkConf.set("spark.eventLog.enabled", "true")
sparkConf.set("spark.app.id", "YourApp")
sparkConf.set("spark.io.compression.codec", "snappy")
sparkConf.set("spark.rdd.compress", "true")
sparkConf.set("spark.streaming.backpressure.enabled", "true")
sparkConf.set("spark.sql.parquet.compression.codec", "snappy")
sparkConf.set("spark.sql.parquet.mergeSchema", "true")
sparkConf.set("spark.sql.parquet.binaryAsString", "true")
val sc = new SparkContext(sparkConf)
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
What does the query look like? What else is running in YARN? What kind of tables? Joins? If you look at the Spark History UI, you can see exactly what Spark was doing; make sure eventLog is enabled. Take a look at the Spark logs; there might be an error in there. In the latest version of HDP you get Spark 1.6, which has some performance enhancements.
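For the event log specifically, a minimal sketch (the HDFS directory here is an assumption, not from the original reply; it must match what your Spark History Server reads):
sparkConf.set("spark.eventLog.enabled", "true")
sparkConf.set("spark.eventLog.dir", "hdfs:///spark-history")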
05-16-2016
02:10 PM
Are Ambari agents running everywhere? Is passwordless SSH set up? Are firewalls configured? Are you running as the correct user? Are the permissions correct on directories and files? Sometimes a reboot helps.
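A couple of quick checks of agent and server state, assuming a typical HDP install (commands may vary slightly by version):
ambari-agent status
ambari-server status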
05-15-2016
12:07 PM
You can also use HDF or Spark if you need to do some interesting things with it.
05-13-2016
07:50 PM
4 Kudos
Below is a short Scala program and SBT build script to generate a Spark application to submit to YARN that will run on the HDP Sandbox with Spark 1.6. We consume Kafka messages in microbatches of 2 seconds. We set a number of parameters for performance of the SparkSQL write to HDFS, including enabling Tungsten, Snappy compression, backpressure, and the KryoSerializer.
We set up the Spark Context and the Spark Streaming Context, then use KafkaUtils to prepare a stream of generic Avro data records from a byte array. From there we convert to a Scala case class that models the Twitter tweet from our source. We convert to a DataFrame, register it as a temp table, and then save to ORC, Avro, Parquet, and JSON files.
To build this application:
sbt clean assembly
Create the Kafka topic:
cd /usr/hdp/current/kafka-broker
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic meetup
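To double-check that the topic exists, the same CLI can list topics (an optional verification step, assuming the default local ZooKeeper):
bin/kafka-topics.sh --list --zookeeper localhost:2181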
To prepare the Hadoop environment:
# Create HDFS directories (also avro, orc, json, ... )
hdfs dfs -mkdir /parquetresults
hdfs dfs -chmod -R 777 /parquetresults
hdfs dfs -ls /
To deploy to YARN:
# Check YARN
yarn application --list
# If need be
# yarn application --kill <id from list>
spark-submit --class "com.sparkdeveloper.receiver.KafkaConsumer" \
--master yarn --deploy-mode client --driver-memory 512m --executor-memory 512m --conf spark.ui.port=4244 kafkaconsumer.jar
Spark Scala program:
package com.sparkdeveloper.receiver
import java.io.{ByteArrayOutputStream, File, IOException}
import java.util.HashMap
import kafka.serializer._
import org.apache.avro._
import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord, GenericRecordBuilder}
import org.apache.avro.io._
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.streaming.kafka._
import com.databricks.spark.avro._
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
case class HashtagEntities(text: String, start: Double, end: Double)
case class User(id: Double, name: String,
screenName: String, location: String, description: String, url: String, statusesCount: Double)
case class Tweet(text: String, createdAt: String, lang: String, source: String, expandedURL: String,
url: String, screenName: String, description: String, name: String, retweetCount: Double, timestamp: Long,
favoriteCount: Double, user: Option[User], hashtags: HashtagEntities)
/**
* Created by timothyspann
*/
object KafkaConsumer {
val tweetSchema = SchemaBuilder
.record("tweet")
.fields
.name("tweet").`type`().stringType().noDefault()
.name("timestamp").`type`().longType().noDefault()
.endRecord
def main(args: Array[String]) {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
val logger: Logger = Logger.getLogger("com.sparkdeveloper.receiver.KafkaConsumer")
val sparkConf = new SparkConf().setAppName("Avro to Kafka Consumer")
sparkConf.set("spark.cores.max", "24") // For my sandbox
sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
sparkConf.set("spark.sql.tungsten.enabled", "true")
sparkConf.set("spark.eventLog.enabled", "true")
sparkConf.set("spark.app.id", "KafkaConsumer") // want to know your app in the UI
sparkConf.set("spark.io.compression.codec", "snappy")
sparkConf.set("spark.rdd.compress", "true")
sparkConf.set("spark.streaming.backpressure.enabled", "true")
sparkConf.set("spark.sql.parquet.compression.codec", "snappy")
sparkConf.set("spark.sql.parquet.mergeSchema", "true")
sparkConf.set("spark.sql.parquet.binaryAsString", "true")
val sc = new SparkContext(sparkConf)
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
val ssc = new StreamingContext(sc, Seconds(2))
try {
val kafkaConf = Map(
"metadata.broker.list" -> "sandbox.hortonworks.com:6667",
"zookeeper.connect" -> "sandbox.hortonworks.com:2181", // Default zookeeper location
"group.id" -> "KafkaConsumer",
"zookeeper.connection.timeout.ms" -> "1000")
val topicMaps = Map("meetup" -> 1)
// Create a new stream which can decode byte arrays.
val tweets = KafkaUtils.createStream[String, Array[Byte], DefaultDecoder, DefaultDecoder]
(ssc, kafkaConf,topicMaps, StorageLevel.MEMORY_ONLY_SER)
try {
tweets.foreachRDD((rdd, time) => {
if (rdd != null) {
try {
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val rdd2 = rdd.map { case (k, v) => parseAVROToString(v) }
try {
val result = rdd2.mapPartitions(records => {
val mapper = new ObjectMapper()
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
mapper.registerModule(DefaultScalaModule)
records.flatMap(record => {
try {
Some(mapper.readValue(record, classOf[Tweet]))
} catch {
case e: Exception => None;
}
})
}, true)
val df1 = result.toDF()
logger.error("Registered tweets: " + df1.count())
df1.registerTempTable("tweets")
// To show how easy it is to write multiple formats
df1.write.format("orc").mode(org.apache.spark.sql.SaveMode.Append).orc("orcresults")
df1.write.format("avro").mode(org.apache.spark.sql.SaveMode.Append).avro("avroresults")
df1.write.format("parquet").mode(org.apache.spark.sql.SaveMode.Append).parquet("parquetresults")
df1.write.format("json").mode(org.apache.spark.sql.SaveMode.Append).json("jsonresults")
} catch {
case e: Exception => None;
}
}
catch {
case e: Exception => None;
}
}
})
} catch {
case e: Exception =>
println("Writing files after job. Exception:" + e.getMessage);
e.printStackTrace();
}
} catch {
case e: Exception =>
println("Kafka Stream. Writing files after job. Exception:" + e.getMessage);
e.printStackTrace();
}
ssc.start()
ssc.awaitTermination()
}
def parseAVROToString(rawTweet: Array[Byte]): String = {
try {
if (rawTweet.isEmpty) {
println("Rejected Tweet")
"Empty"
}
else {
deserializeTwitter(rawTweet).get("tweet").toString
}
} catch {
case e: Exception =>
println("Exception:" + e.getMessage);
"Empty"
}
}
def deserializeTwitter(tweet: Array[Byte]): GenericRecord = {
try {
val reader = new GenericDatumReader[GenericRecord](tweetSchema)
val decoder = DecoderFactory.get.binaryDecoder(tweet, null)
reader.read(null, decoder)
} catch {
case e: Exception => None;
null;
}
}
}
// scalastyle:on println
build.sbt
name := "KafkaConsumer"
version := "1.0"
scalaVersion := "2.10.6"
jarName in assembly := "kafkaconsumer.jar"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0" % "provided"
libraryDependencies += "com.databricks" %% "spark-avro" % "2.0.1"
libraryDependencies += "org.apache.avro" % "avro" % "1.7.6" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.0"
libraryDependencies += "org.codehaus.jackson" % "jackson-mapper-asl" % "1.9.13"
libraryDependencies += "com.google.code.gson" % "gson" % "2.3"
mergeStrategy in assembly := {
case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
case "log4j.properties" => MergeStrategy.discard
case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines
case "reference.conf" => MergeStrategy.concat
case _ => MergeStrategy.first
}
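Note that sbt clean assembly needs the sbt-assembly plugin on the build; a typical project/assembly.sbt matching this key style (jarName in assembly, mergeStrategy in assembly) would look roughly like the following, with the exact plugin version depending on your sbt setup:
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")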
05-13-2016
06:45 PM
As long as logging is on, a lot will show in the history and in the logs. For the Spark job setup:
sparkConf.set("spark.eventLog.enabled", "true")
Then check the Spark History Server. You can also use old-fashioned Java logging:
import org.apache.log4j.{Level, Logger}
val logger: Logger = Logger.getLogger("My.Example.Code.Rules")
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
logger.setLevel(Level.INFO)
You can set it to info, but expect a lot of junk.
05-13-2016
03:01 PM
In preparation, get together the version number, OS version, number of nodes, RAM/CPU/HDs on each node, and what you are using the nodes for. Are you using HAWQ? Greenplum? Gemfire? GemfireXD? Spring XD? Definitely engage your Pivotal and Hortonworks support teams.
05-11-2016
07:47 PM
1 Kudo
Can you show a Java thread dump? What are your memory settings? What does the JVM heap dump show? What are your Flume settings and versions? Is there space on /var/log/flume/data? See here: https://issues.apache.org/jira/browse/FLUME-2403 Do the Twitter messages before that look okay? Normal JSON? Have you tried this in NiFi? My Twitter-to-NiFi flow worked without issue.
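For the Java dump questions, the standard JDK tools are enough; a hedged example, where <pid> is whatever jps reports for the Flume agent process:
jps -lvm
jstack <pid> > flume-threads.txt
jmap -dump:live,format=b,file=flume-heap.hprof <pid>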