Posts: 1973
Kudos Received: 1225
Solutions: 124

My Accepted Solutions

| Title | Views | Posted |
|---|---|---|
|  | 1824 | 04-03-2024 06:39 AM |
|  | 2835 | 01-12-2024 08:19 AM |
|  | 1570 | 12-07-2023 01:49 PM |
|  | 2321 | 08-02-2023 07:30 AM |
|  | 3204 | 03-29-2023 01:22 PM |

05-19-2016 01:06 PM (3 Kudos)

It is very easy to connect various tools to Hive. Apache Zeppelin is my favorite tool for querying Hive via HiveQL, but you have a ton of options.

First, DBVisualizer; I am using the Free Edition for Mac OS X. For tools like this you will need to copy (or acquire) two Hive JARs:

hadoop-common.jar
hive-jdbc-1.2.1000.2.4.0.0-169-standalone.jar

On HDP 2.4 they are located at:

/usr/hdp/current/hive-client/lib/hive-jdbc-1.2.1000.2.4.0.0-169-standalone.jar
/usr/hdp/current/hadoop-client/hadoop-common.jar

Another tool is DBeaver, which has a unique twist: it finds and downloads the JARs itself via Maven, though the initial driver install takes a while. It also supports Phoenix, Drill and GemfireXD.

I also tried SquirrelSQL; for SquirrelSQL setup details, check out the official Hive docs. The connection settings are the standard Hive JDBC URL and driver class:

jdbc:hive2://localhost:10000/default
org.apache.hive.jdbc.HiveDriver

For the Hive URL format, see the Hive docs. I also connected from SQL Workbench/J, which is a free SQL tool.
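
To sanity-check the driver and URL outside of any GUI tool, here is a minimal JDBC sketch in Scala (my assumptions: HiveServer2 is running on localhost:10000 and the two JARs above are on the classpath; adjust host, port, and credentials for your cluster):

import java.sql.DriverManager

object HiveJdbcCheck {
  def main(args: Array[String]): Unit = {
    // Driver class shipped in the standalone Hive JDBC JAR
    Class.forName("org.apache.hive.jdbc.HiveDriver")

    // Same URL as used in the GUI tools above; user/password depend on your setup
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "hive", "")
    try {
      val rs = conn.createStatement().executeQuery("SHOW TABLES")
      while (rs.next()) println(rs.getString(1))
    } finally {
      conn.close()
    }
  }
}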
						
					

05-19-2016 10:40 AM

Having run a bunch of Spark jobs locally, in Spark Standalone clusters, and in HDP YARN clusters, I have found a few JVM settings that help with debugging non-production jobs and assist with better garbage collection. This is important even with off-heap storage and bare-metal optimizations.

spark-submit --driver-java-options "-XX:+PrintGCDetails -XX:+UseG1GC -XX:MaxGCPauseMillis=400"

You can also set extra options in the runtime environment (see the Spark documentation). For HDP / Spark, you can add these from Ambari.

In your Scala Spark program:

sparkConf.set("spark.cores.max", "4")
sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
sparkConf.set("spark.sql.tungsten.enabled", "true")
sparkConf.set("spark.eventLog.enabled", "true")
sparkConf.set("spark.app.id", "MyAppIWantToFind")
sparkConf.set("spark.io.compression.codec", "snappy")
sparkConf.set("spark.rdd.compress", "false")
sparkConf.set("spark.shuffle.compress", "true")

Make sure you have Tungsten on, use the KryoSerializer, enable the event log, and use logging:

Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
val log = Logger.getLogger("com.hortonworks.myapp")
log.info("Started Logs Analysis")

Also, whenever possible, include relevant filters on your datasets, for example: filter(!_.clientIp.equals("Empty")).
						
					

05-19-2016 02:13 AM (1 Kudo)

Updated for 2.4: https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.2/bk_spark-guide/content/ch_tuning-spark.html
						
					

05-16-2016 02:54 PM

Make sure you have Tungsten enabled, and take a look at the history after the run.

sparkConf.set("spark.cores.max", "24")
sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
sparkConf.set("spark.sql.tungsten.enabled", "true")
sparkConf.set("spark.eventLog.enabled", "true")
sparkConf.set("spark.app.id", "YourApp")
sparkConf.set("spark.io.compression.codec", "snappy")
sparkConf.set("spark.rdd.compress", "true")
sparkConf.set("spark.streaming.backpressure.enabled", "true")
sparkConf.set("spark.sql.parquet.compression.codec", "snappy")
sparkConf.set("spark.sql.parquet.mergeSchema", "true")
sparkConf.set("spark.sql.parquet.binaryAsString", "true")
val sc = new SparkContext(sparkConf)
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

What does the query look like? What else is running in YARN? What kind of tables? Joins? If you look at the Spark History UI, you can see exactly what Spark was doing, so make sure the event log is enabled. Also take a look at the Spark logs; there might be an error in there. In the latest version of HDP you get Spark 1.6, which has some performance enhancements.
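
Since the first question is what the query looks like, one quick way to see what Spark SQL is actually doing with it is to print the query plan. A small sketch (the query and table name here are hypothetical; explain(true) on a DataFrame prints the parsed, analyzed, optimized, and physical plans in Spark 1.6):

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Hypothetical query; substitute your own tables and predicates
sqlContext.sql("SELECT customer_id, SUM(amount) FROM orders GROUP BY customer_id").explain(true)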
						
					

05-16-2016 02:10 PM

Are the Ambari agents running everywhere? Is this an SSH-less setup? Are the firewalls configured? Are you running as the correct user? Are the permissions on the directories and files correct? Sometimes a reboot helps.
						
					

05-15-2016 12:07 PM

You can also use HDF or Spark if you need to do some interesting things with it.
						
					

05-13-2016 07:50 PM (4 Kudos)

Below are a short Scala program and an SBT build script for a Spark application that is submitted to YARN and runs on the HDP Sandbox with Spark 1.6. We consume Kafka messages in micro-batches of 2 seconds. We set a number of parameters for the performance of the Spark SQL write to HDFS, including enabling Tungsten, Snappy compression, backpressure, and the KryoSerializer. We set up the Spark Context and the Spark Streaming Context, then use KafkaUtils to prepare a stream of generic Avro data records from a byte array. From there we convert to a Scala case class that models the Twitter tweet from our source. We convert to a DataFrame, register it as a temp table, and then save to ORC, Avro, Parquet, and JSON files.

To build this application:

sbt clean assembly
Create the Kafka topic:

cd /usr/hdp/current/kafka-broker
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic meetup

Prepare the Hadoop environment:

# Create HDFS directories (also avro, orc, json, ...)
hdfs dfs -mkdir /parquetresults
hdfs dfs -chmod -R 777 /parquetresults
hdfs dfs -ls /

Deploy to YARN:

# Check YARN
yarn application --list
# If need be
# yarn application --kill <id from list>
spark-submit --class "com.sparkdeveloper.receiver.KafkaConsumer" \
  --master yarn --deploy-mode client --driver-memory 512m --executor-memory 512m \
  --conf spark.ui.port=4244 kafkaconsumer.jar

The Spark Scala program:

package com.sparkdeveloper.receiver

import java.io.{ByteArrayOutputStream, File, IOException}
import java.util.HashMap
import org.apache.avro._
import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord, GenericRecordBuilder}
import org.apache.avro.io._
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.streaming.kafka._
import kafka.serializer._
import com.databricks.spark.avro._
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
case class HashtagEntities(text: String, start: Double, end: Double)
case class User(id: Double, name: String,
                screenName: String, location: String, description: String, url: String, statusesCount: Double)
case class Tweet(text: String, createdAt: String, lang: String, source: String, expandedURL: String,
                 url: String, screenName: String, description: String, name: String, retweetCount: Double, timestamp: Long,
                 favoriteCount: Double, user: Option[User], hashtags: HashtagEntities)
/**
  * Created by timothyspann
  */
object KafkaConsumer {
  val tweetSchema = SchemaBuilder
    .record("tweet")
    .fields
    .name("tweet").`type`().stringType().noDefault()
    .name("timestamp").`type`().longType().noDefault()
    .endRecord
  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
    val logger: Logger = Logger.getLogger("com.sparkdeveloper.receiver.KafkaConsumer")
    val sparkConf = new SparkConf().setAppName("Avro to Kafka Consumer")
    sparkConf.set("spark.cores.max", "24") // For my sandbox
    sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
    sparkConf.set("spark.sql.tungsten.enabled", "true")
    sparkConf.set("spark.eventLog.enabled", "true")
    sparkConf.set("spark.app.id", "KafkaConsumer") // want to know your app in the UI
    sparkConf.set("spark.io.compression.codec", "snappy")
    sparkConf.set("spark.rdd.compress", "true")
    sparkConf.set("spark.streaming.backpressure.enabled", "true")
    sparkConf.set("spark.sql.parquet.compression.codec", "snappy")
    sparkConf.set("spark.sql.parquet.mergeSchema", "true")
    sparkConf.set("spark.sql.parquet.binaryAsString", "true")
    val sc = new SparkContext(sparkConf)
    sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
    val ssc = new StreamingContext(sc, Seconds(2))
    try {
      val kafkaConf = Map(
        "metadata.broker.list" -> "sandbox.hortonworks.com:6667",
        "zookeeper.connect" -> "sandbox.hortonworks.com:2181", // Default zookeeper location
        "group.id" -> "KafkaConsumer",
        "zookeeper.connection.timeout.ms" -> "1000")
      val topicMaps = Map("meetup" -> 1)
      // Create a new stream which can decode byte arrays.
      val tweets = KafkaUtils.createStream[String, Array[Byte], DefaultDecoder, DefaultDecoder](
        ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER)
      try {
        tweets.foreachRDD((rdd, time) => {
          if (rdd != null) {
            try {
              val sqlContext = new SQLContext(sc)
              import sqlContext.implicits._
              val rdd2 = rdd.map { case (k, v) => parseAVROToString(v) }
              try {
                val result = rdd2.mapPartitions(records => {
                  val mapper = new ObjectMapper()
                  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
                  mapper.registerModule(DefaultScalaModule)
                  records.flatMap(record => {
                    try {
                      Some(mapper.readValue(record, classOf[Tweet]))
                    } catch {
                      case e: Exception => None;
                    }
                  })
                }, true)
                val df1 = result.toDF()
                logger.error("Registered tweets: " + df1.count())
                df1.registerTempTable("tweets")
		
                // To show how easy it is to write multiple formats
                df1.write.format("orc").mode(org.apache.spark.sql.SaveMode.Append).orc("orcresults")
                df1.write.format("avro").mode(org.apache.spark.sql.SaveMode.Append).avro("avroresults")
                df1.write.format("parquet").mode(org.apache.spark.sql.SaveMode.Append).parquet("parquetresults")
                df1.write.format("json").mode(org.apache.spark.sql.SaveMode.Append).json("jsonresults")
              } catch {
                case e: Exception => None;
              }
            }
            catch {
              case e: Exception => None;
            }
          }
        })
      } catch {
        case e: Exception =>
          println("Writing files after job. Exception:" + e.getMessage);
          e.printStackTrace();
      }
    } catch {
      case e: Exception =>
        println("Kafka Stream. Writing files after job. Exception:" + e.getMessage);
        e.printStackTrace();
    }
    ssc.start()
    ssc.awaitTermination()
  }
  def parseAVROToString(rawTweet: Array[Byte]): String = {
    try {
      if (rawTweet.isEmpty) {
        println("Rejected Tweet")
        "Empty"
      }
      else {
        deserializeTwitter(rawTweet).get("tweet").toString
      }
    } catch {
      case e: Exception =>
        println("Exception:" + e.getMessage);
        "Empty"
    }
  }
  def deserializeTwitter(tweet: Array[Byte]): GenericRecord = {
    try {
      val reader = new GenericDatumReader[GenericRecord](tweetSchema)
      val decoder = DecoderFactory.get.binaryDecoder(tweet, null)
      reader.read(null, decoder)
    } catch {
      case e: Exception => null
    }
    }
  }
// scalastyle:on println
build.sbt
name := "KafkaConsumer"
version := "1.0"
scalaVersion := "2.10.6"
jarName in assembly := "kafkaconsumer.jar"
libraryDependencies  += "org.apache.spark" % "spark-core_2.10" % "1.6.0" % "provided"
libraryDependencies  += "org.apache.spark" % "spark-sql_2.10" % "1.6.0" % "provided"
libraryDependencies  += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0" % "provided"
libraryDependencies  += "com.databricks" %% "spark-avro" % "2.0.1"
libraryDependencies  += "org.apache.avro" % "avro" % "1.7.6" % "provided"
libraryDependencies  += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.0"
libraryDependencies  += "org.codehaus.jackson" % "jackson-mapper-asl" % "1.9.13"
libraryDependencies  += "com.google.code.gson" % "gson" % "2.3"
mergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf")          => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$")      => MergeStrategy.discard
  case "log4j.properties"                                  => MergeStrategy.discard
  case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines
  case "reference.conf"                                    => MergeStrategy.concat
  case _                                                  => MergeStrategy.first
}
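
Since the program registers each micro-batch DataFrame as a temp table named "tweets", you could also query it before (or instead of) writing the files. A small sketch, to be placed right after registerTempTable in the foreachRDD block (the column names come from the Tweet case class above; the specific query is just an example):

// Hypothetical ad-hoc query against the registered temp table
val topRetweets = sqlContext.sql(
  "SELECT screenName, text, retweetCount FROM tweets ORDER BY retweetCount DESC LIMIT 10")
topRetweets.show()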
 
						
					

05-13-2016 06:45 PM

As long as logging is on, a lot will show up in the history and in the logs. For Spark job setup:

sparkConf.set("spark.eventLog.enabled", "true")

Then check the Spark History Server. You can also use old-fashioned Java logging:

import org.apache.log4j.{Level, Logger}

val logger: Logger = Logger.getLogger("My.Example.Code.Rules")
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
logger.setLevel(Level.INFO)

You can set it to INFO, but expect a lot of junk.
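
If the History Server does not pick up your application, the companion setting is the event log directory. A minimal sketch (spark.eventLog.dir is a standard Spark property; the HDFS path here is an assumption, so point it at whatever directory your Spark History Server actually reads):

sparkConf.set("spark.eventLog.enabled", "true")
// Assumed path; must match the History Server's configured log directory
sparkConf.set("spark.eventLog.dir", "hdfs:///spark-history")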
						
					

05-13-2016 03:01 PM

In preparation, gather the version number, OS version, number of nodes, the RAM/CPU/HDs on each node, and what you are using the nodes for. Are you using HAWQ? Greenplum? Gemfire? GemfireXD? Spring XD? Definitely engage your Pivotal and Hortonworks support teams.
						
					

05-11-2016 07:47 PM (1 Kudo)

Can you show a Java dump? What are your memory settings? What does the JVM heap dump show? What are your Flume settings and versions? Is there space on /var/log/flume/data? See here: https://issues.apache.org/jira/browse/FLUME-2403. Do the Twitter messages before that one look okay? Normal JSON? Have you tried this in NiFi? My Twitter-to-NiFi flow worked without issue.
						
					