Member since 05-16-2016 | 26 Posts | 20 Kudos Received | 0 Solutions
02-06-2018
04:59 PM
5 Kudos
In this tutorial we will walk through the steps to enable YARN node labels so that we can run LLAP on a specific set of nodes in the cluster (Stack Version HDP 2.6.3).

First we check whether any node labels have already been created on the cluster:

[$] yarn cluster --list-node-labels
18/02/05 10:32:29 INFO client.RMProxy: Connecting to ResourceManager at stocks.hdp/192.168.1.200:8050
18/02/05 10:32:31 INFO client.AHSProxy: Connecting to Application History server at stocks.hdp/192.168.1.200:10200
Node Labels:

The empty list shows that there are no node labels configured on the cluster. We now try to add a new node label, which we will assign to the LLAP queue so that LLAP runs only on that specific node (or nodes). For now we will keep the node label exclusive, so that only applications submitted to the LLAP queue can use these nodes.

[$] yarn rmadmin -addToClusterNodeLabels "interactive(exclusive=true)"
18/02/05 10:33:25 INFO client.RMProxy: Connecting to ResourceManager at stocks.hdp/192.168.1.200:8141
addToClusterNodeLabels: java.io.IOException: Node-label-based scheduling is disabled. Please check yarn.node-labels.enabled

We get an exception saying node-label-based scheduling is disabled. You may or may not hit this error depending on your cluster configuration; if you do, you will have to set the yarn.node-labels.enabled property to true in yarn-site.xml.
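If you are managing yarn-site.xml outside of Ambari, the relevant entries typically look like the sketch below; the fs-store path is a placeholder you would adjust for your cluster (it is where YARN persists the label mappings):

<property>
  <name>yarn.node-labels.enabled</name>
  <value>true</value>
</property>
<property>
  <name>yarn.node-labels.fs-store.root-dir</name>
  <value>hdfs://<namenode-host>:8020/yarn/node-labels</value>
</property>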
(Note: if you don't specify "(exclusive=…)", exclusivity defaults to true.)

In Ambari you can enable node labels from the YARN configuration; while we are at it, we will also enable preemption. With preemption enabled, higher-priority applications do not have to wait for lower-priority applications that have taken up the available capacity: under-served queues can begin to claim their allocated cluster resources almost immediately, without having to wait for applications in other queues to finish running.

We can now create the node labels:

[yarn@stocks ~]$ yarn rmadmin -addToClusterNodeLabels "interactive(exclusive=true)"
18/02/05 17:00:42 INFO client.RMProxy: Connecting to ResourceManager at stocks.hdp/192.168.1.200:8141
[yarn@stocks ~]$ yarn rmadmin -addToClusterNodeLabels "high-mem(exclusive=true)"
18/02/05 17:00:54 INFO client.RMProxy: Connecting to ResourceManager at stocks.hdp/192.168.1.200:8141
[yarn@stocks ~]$ yarn rmadmin -addToClusterNodeLabels "balanced(exclusive=true)"
18/02/05 17:02:37 INFO client.RMProxy: Connecting to ResourceManager at stocks.hdp/192.168.1.200:8141

We list the node labels again:

[yarn@stocks ~]$ yarn cluster --list-node-labels
18/02/06 09:41:34 INFO client.RMProxy: Connecting to ResourceManager at stocks.hdp/192.168.1.200:8050
18/02/06 09:41:35 INFO client.AHSProxy: Connecting to Application History server at stocks.hdp/192.168.1.200:10200
Node Labels: <high-mem:exclusivity=true>,<interactive:exclusivity=true>,<balanced:exclusivity=true>

We now have three node labels: high-mem, which we will use for Spark jobs; interactive, which will be used for LLAP; and balanced, which will be used for the rest of the jobs.
We can now go back to Ambari and open the YARN Queue Manager; the node labels will now be available for the YARN queues. We assign the interactive label to the llap queue, the high-mem label to the spark queue, and the balanced label to the default queue, giving 100% of each node label's capacity to its queue (the labels are exclusive; adjust this based on your workload or if multiple queues share a node label).

Node Label Assignment for LLAP Queue
Node Label Assignment for Spark Queue
Node Label Assignment for Default Queue

If you need to remove node labels you can use the command below:

[yarn@stocks ~]$ yarn rmadmin -removeFromClusterNodeLabels "<Node Label Name1>,<Node Label Name2>"

Make sure the node labels are not assigned to any queue, otherwise you will get an exception like "cannot remove nodes label=<label>, because queue=blahblah is using this label. Please remove label on queue before remove the label".
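Under the hood, the Queue Manager assignments above end up in capacity-scheduler.xml. For the llap queue and the interactive label they look roughly like the sketch below (the queue path and values are assumptions based on the 100% assignment described above, so cross-check the generated capacity-scheduler.xml on your cluster):

yarn.scheduler.capacity.root.accessible-node-labels.interactive.capacity=100
yarn.scheduler.capacity.root.llap.accessible-node-labels=interactive
yarn.scheduler.capacity.root.llap.accessible-node-labels.interactive.capacity=100
yarn.scheduler.capacity.root.llap.default-node-label-expression=interactive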
If we look at the YARN Resource Manager UI, we can now see the node labels. Note that none of the node labels have any active NodeManagers assigned to them yet and hence no resources (<memory:0, vCores:0>). Let's assign some nodes to each of the labels and revisit the YARN UI. Since there are only 3 nodes in this environment, we will assign a single node to each node label; you can decide how many nodes to assign to each label based on the amount of memory and vCores needed for each type of workload.

[yarn@stocks ~]$ yarn rmadmin -replaceLabelsOnNode "dn1.stocks.hdp=interactive"
18/02/06 10:10:26 INFO client.RMProxy: Connecting to ResourceManager at stocks.hdp/192.168.1.200:8141
[yarn@stocks ~]$ yarn rmadmin -replaceLabelsOnNode "dn2.stocks.hdp=balanced"
18/02/06 10:10:44 INFO client.RMProxy: Connecting to ResourceManager at stocks.hdp/192.168.1.200:8141
[yarn@stocks ~]$ yarn rmadmin -replaceLabelsOnNode "stocks.hdp=interactive"
18/02/06 10:10:55 INFO client.RMProxy: Connecting to ResourceManager at stocks.hdp/192.168.1.200:8141

When we revisit the YARN UI, we will see the resources assigned to each node label. We now start LLAP on the cluster, assigning it to the llap queue, which has the two nodes labeled interactive, with memory per daemon ~78 GB, in-memory cache per daemon ~29 GB, and 12 executors per daemon. If we look at the running applications in YARN, we will see one Tez AM container, one Slider application master, and two LLAP daemons running.
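As a follow-up, applications submitted to the spark queue can also target the high-mem label explicitly through Spark's YARN node-label setting (spark.yarn.am.nodeLabelExpression is the counterpart for the application master), assuming at least one node has been assigned to high-mem. A minimal sketch; the class and jar names are placeholders, not part of this article:

spark-submit --master yarn \
  --queue spark \
  --conf spark.yarn.executor.nodeLabelExpression=high-mem \
  --class com.example.MyApp my-app.jar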
11-19-2017
07:31 PM
It's probably a Spark config issue. Can you share the detailed log? The information you've shared isn't enough to identify the root cause.
11-19-2017
07:28 PM
Check this post https://community.hortonworks.com/questions/34815/zeppelin-unable-to-run-multiple-hive-queries.html
08-12-2017
05:10 AM
3 Kudos
You can modify the Hive View 2.0 settings, set "use.hive.interactive.mode" to true, and restart the Hive View to run in LLAP mode. (See attached screenshot: screen-shot-2017-08-12-at-10848-am.png)
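Separately, if you want to confirm that LLAP itself is reachable outside the view, one quick check is to connect with Beeline against HiveServer2 Interactive; the host below is a placeholder, and the binary port is usually 10500 on HDP, but verify both in Ambari:

beeline -u "jdbc:hive2://<hsi-host>:10500/default" -n <username>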
07-14-2017
01:55 AM
This article shows how to read CSV files that do not have header information in the first row. We will then assign column names and types to both DataFrames and join them together.
import org.apache.spark.sql.types._
val pathA = "hdfs:/tpc-ds/data/store_sales"
val pathB = "hdfs:/tpc-ds/data/store/"
// For Spark 2.x use -> val df = spark.read.option("header", true).csv(path)
val A_df = sqlContext.read.format("com.databricks.spark.csv")
.option("header","false")
.option("inferSchema","false")
.option("delimiter","|")
.load(pathA)
// Assign column names to the Store Sales dataframe
val storeSalesDF = A_df.select(
A_df("_c0").cast(IntegerType).as("SS_SOLD_DATE_SK"),
A_df("_c1").cast(IntegerType).as("SS_SOLD_TIME_SK"),
A_df("_c2").cast(IntegerType).as("SS_ITEM_SK"),
A_df("_c7").cast(IntegerType).as("SS_STORE_SK")
)
val B_df = sqlContext.read.format("com.databricks.spark.csv")
.option("header","false")
.option("inferSchema","false")
.option("delimiter","|")
.load(pathB)
// Assign column names to the Store dataframe
val storeDF = B_df.select(
B_df("_c0").cast(IntegerType).as("S_STORE_SK"),
B_df("_c1").cast(StringType).as("S_STORE_ID"),
B_df("_c5").cast(StringType).as("S_STORE_NAME")
)
val joinedDF = storeSalesDF.join(storeDF,
storeSalesDF("SS_STORE_SK") === storeDF("S_STORE_SK")
)
joinedDF.take(5)
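For Spark 2.x (as noted in the comment near the top), the CSV reader is built in and the spark-csv package is no longer needed. A minimal sketch of the same read for the store_sales side, reusing pathA and the types import from above:

// Spark 2.x: built-in CSV source; without a header the columns still come back as _c0, _c1, ...
val salesRaw = spark.read
  .option("header", "false")
  .option("inferSchema", "false")
  .option("delimiter", "|")
  .csv(pathA)
val storeSalesDF2 = salesRaw.select(
  salesRaw("_c0").cast(IntegerType).as("SS_SOLD_DATE_SK"),
  salesRaw("_c1").cast(IntegerType).as("SS_SOLD_TIME_SK"),
  salesRaw("_c2").cast(IntegerType).as("SS_ITEM_SK"),
  salesRaw("_c7").cast(IntegerType).as("SS_STORE_SK")
)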
07-14-2017
01:24 AM
Where is the data located (Hive or HDFS)? Can you share your cluster specs: how many nodes, and the number of cores and amount of RAM per node?
07-12-2017
07:28 PM
@Srinivasarao Daruna, can you give details of the hardware and the Spark config?
07-12-2017
07:26 PM
Have you tried caching the tables (or a subset) before executing the queries? Keep in mind that caching a DataFrame is lazy, which means only the data touched by the next action is cached. So if you run a query on that DataFrame and only scan 100 rows, only those rows are cached, not the entire table. If you run CACHE TABLE MyTableName in SQL, however, it defaults to eager caching and caches the entire table. You can choose lazy caching in SQL like so: CACHE LAZY TABLE Sales_Data_1998
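A minimal sketch of the two behaviours (the table name is just an example, and sqlContext is assumed to be the usual Spark SQL entry point):

// DataFrame caching is lazy: nothing is cached until an action runs.
val salesDF = sqlContext.table("Sales_Data_1998")
salesDF.cache()   // only marks the DataFrame as cacheable
salesDF.count()   // this action materializes the cache

// SQL CACHE TABLE is eager by default and caches the whole table immediately.
sqlContext.sql("CACHE TABLE Sales_Data_1998")

// Lazy variant in SQL: cached only when a later query touches the table.
sqlContext.sql("CACHE LAZY TABLE Sales_Data_1998")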
06-29-2017
05:48 PM
import re
from pyspark.sql import Row
APACHE_ACCESS_LOG_PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)'
# Returns a dictionary containing the parts of the Apache Access Log.
def parse_apache_log_line(logline):
match = re.search(APACHE_ACCESS_LOG_PATTERN, logline)
if match is None:
# Optionally, you can change this to just ignore if each line of data is not critical.
# For this example, we want to ensure that the format is consistent.
raise Exception("Invalid logline: %s" % logline)
return Row(
ipAddress = match.group(1),
clientIdentd = match.group(2),
userId = match.group(3),
dateTime = match.group(4),
method = match.group(5),
endpoint = match.group(6),
protocol = match.group(7),
responseCode = int(match.group(8)),
contentSize = long(match.group(9)))
log_files = "hdfs://dataset/apache_logs/"
raw_log_files = sc.textFile(log_files)
raw_log_files.count()
parsed_log_files = raw_log_files.map(parse_apache_log_line)
parsed_log_files.toDF().registerTempTable("log_data")
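If some malformed lines are acceptable (as the comment inside parse_apache_log_line suggests), a gentler alternative is to return None for lines that do not parse and filter them out instead of raising. A small sketch, assuming the same function and RDD names as above:

def parse_apache_log_line_safe(logline):
    # Returns a Row for well-formed lines, or None for lines that do not parse.
    try:
        return parse_apache_log_line(logline)
    except Exception:
        return None

# Drop the malformed lines instead of failing the whole job.
parsed_log_files_safe = raw_log_files.map(parse_apache_log_line_safe) \
                                     .filter(lambda row: row is not None)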
%scala
// HELPER FUNCTION - Register ParseDate UDF to use with queries later
def parseDate(rawDate:String):Long = {
val dtParser = new java.text.SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss") // HH = 24-hour clock, matching Apache access log timestamps
val splitted = rawDate.split(" ")
val futureDt = splitted(0)
val offset = splitted(1).toLong // already a String, e.g. "-0730" -> -730
val hourOffset = (offset.toInt / 100)
val minuteOffset = (offset - hourOffset * 100).toInt
val totalOffset = hourOffset * 60 * 60 + minuteOffset * 60
(dtParser.parse(futureDt).getTime() / 1000) + totalOffset
}
val example = "21/Jun/2014:10:00:00 -0730"
parseDate(example)
sqlContext.udf.register("parseDate", parseDate(_:String):Long)
%sql
-- Use the parsed logs and the parseDate helper UDF to execute a SQL query
select responseCode, ipAddress, to_date(cast(parseDate(dateTime) as timestamp)) as date, count(*) as NoOfRequests, sum(contentSize) as TotalContentSize
from log_data
group by responseCode,ipAddress,to_date(cast(parseDate(dateTime) as timestamp))
order by count(*) desc
06-19-2017
06:35 PM
2 Kudos
Hi Abhi, the tutorial below from Micheal walks through how to modify JSON using the ReplaceText and/or UpdateAttribute processors for Twitter data: https://community.hortonworks.com/articles/57803/using-nifi-gettwitter-updateattributes-and-replace.html Regards, Muji