05-18-2017
08:50 PM
Background
Hadoop and its ecosystem projects are distributed systems. Hadoop provides foundational services, such as a distributed file system and cluster-wide resource management, that many other projects (e.g. Oozie, MR jobs, Tez, Hive queries, Spark jobs) leverage.
With this layered approach, a Hive query or a Spark job leads to a sequence of operations that spans multiple systems. For example, a Spark job may use YARN to get containers to run in, and it may also read from HDFS.
Tracing and debugging problems in distributed systems is hard because it involves tracing calls across many systems.
Hadoop addresses this problem with a log-tracing feature called caller context (HDFS-9184 and YARN-4349). It helps users diagnose and understand how specific applications affect parts of the Hadoop system and what problems they may be creating (e.g. overloading the NameNode). As noted in HDFS-9184, for a given HDFS operation it is very helpful to track which upper-level job issued it. The upper-level callers may be specific Oozie tasks, MR jobs, Hive queries, or Spark jobs.
Hadoop ecosystem projects such as MapReduce, Tez (TEZ-2851), Hive (HIVE-12249, HIVE-12254) and Pig (PIG-4714) have implemented their own caller contexts. These systems invoke HDFS/YARN APIs to set up their caller contexts, and also expose an API for passing in a caller context.
We have now implemented in Spark the ability to provide its caller context by invoking Hadoop APIs. We have also exposed an API for Spark applications to set up their caller contexts. In the end, Spark combines its own caller context with the caller contexts of its upstream applications and writes them into the YARN RM log and the HDFS audit log. This makes it possible to trace logs from Spark down to YARN and HDFS resources.
Usage
There are two ways to configure the `spark.log.callerContext` property.
In spark-defaults.conf:
    spark.log.callerContext infoSpecifiedByUpstreamApp
or in the app's source code (only works in client mode):
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession
      .builder
      .appName("SparkKMeans")
      .config("spark.log.callerContext", "infoSpecifiedByUpstreamApp")
      .getOrCreate()
When running in Spark YARN cluster mode, the driver cannot pass `spark.log.callerContext` to the YARN client and AM, because both have already started by the time the driver executes .config("spark.log.callerContext", "infoSpecifiedByUpstreamApp"). For YARN cluster mode, users should therefore configure the `spark.log.callerContext` property in spark-defaults.conf instead of in the app's source code.
Example
The following example shows the command line used to submit a SparkKMeans application and the corresponding records in the YARN RM log and HDFS audit log.
Command line:
./bin/spark-submit --verbose --executor-cores 3 --num-executors 1 --master yarn --deploy-mode client --class org.apache.spark.examples.SparkKMeans examples/target/original-spark-examples_2.11-2.1.0-SNAPSHOT.jar hdfs://localhost:9000/lr_big.txt 2 5
Spark caller contexts written into YARN RM log and HDFS audit log:
YARN RM log:
…
2017-02-08 16:04:14,856 INFO org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger: USER=wyang IP=127.0.0.1 OPERATION=Submit Application Request TARGET=ClientRMService RESULT=SUCCESS APPID=application_1486594207699_0011 CALLERCONTEXT=SPARK_CLIENT_application_1486594207699_0011_infoSpecifiedByUpstreamApp
HDFS audit log:
…
2017-02-08 16:04:14,806 INFO FSNamesystem.audit: allowed=true ugi=wyang (auth:SIMPLE) ip=/127.0.0.1 cmd=getfileinfo src=/.sparkStaging/application_1486594207699_0011/__spark_conf__.zip dst=null perm=null proto=rpc callerContext=SPARK_CLIENT_application_1486594207699_0011_infoSpecifiedByUpstreamApp
2017-02-08 16:04:14,807 INFO FSNamesystem.audit: allowed=true ugi=wyang (auth:SIMPLE) ip=/127.0.0.1 cmd=getfileinfo src=/.sparkStaging/application_1486594207699_0011/__spark_conf__.zip dst=null perm=null proto=rpc callerContext=SPARK_CLIENT_application_1486594207699_0011_infoSpecifiedByUpstreamApp
…
2017-02-08 16:04:22,725 INFO FSNamesystem.audit: allowed=true ugi=wyang (auth:SIMPLE) ip=/127.0.0.1 cmd=getfileinfo src=/spark-history dst=null perm=null proto=rpc callerContext=SPARK_APPMASTER_application_1486594207699_0011_1_infoSpecifiedByUpstreamApp
2017-02-08 16:04:22,742 INFO FSNamesystem.audit: allowed=true ugi=wyang (auth:SIMPLE) ip=/127.0.0.1 cmd=create src=/spark-history/application_1486594207699_0011_1.lz4.inprogress dst=null perm=wyang:supergroup:rw-r--r-- proto=rpc callerContext=SPARK_APPMASTER_application_1486594207699_0011_1_infoSpecifiedByUpstreamApp
…
2017-02-08 16:04:29,671 INFO FSNamesystem.audit: allowed=true ugi=wyang (auth:SIMPLE) ip=/127.0.0.1 cmd=open src=/lr_big.txt dst=null perm=null proto=rpc callerContext=SPARK_TASK_application_1486594207699_0011_1_JId_0_SId_0_0_TId_1_0_infoSpecifiedByUpstreamApp
2017-02-08 16:04:29,672 INFO FSNamesystem.audit: allowed=true ugi=wyang (auth:SIMPLE) ip=/127.0.0.1 cmd=open src=/lr_big.txt dst=null perm=null proto=rpc callerContext=SPARK_TASK_application_1486594207699_0011_1_JId_0_SId_0_0_TId_0_0_infoSpecifiedByUpstreamApp
2017-02-08 16:04:29,672 INFO FSNamesystem.audit: allowed=true ugi=wyang (auth:SIMPLE) ip=/127.0.0.1 cmd=open src=/lr_big.txt dst=null perm=null proto=rpc callerContext=SPARK_TASK_application_1486594207699_0011_1_JId_0_SId_0_0_TId_2_0_infoSpecifiedByUpstreamApp
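The caller context values in these records can be split back into their parts when searching the logs. The following is only a minimal sketch: the layout (role, application ID, optional attempt ID, optional JId/SId/TId fields, then the upstream value) is inferred from the sample records above and is not an official grammar, and `SparkCallerContext` with its fields is a hypothetical helper, not part of Spark or Hadoop.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for reading Spark caller contexts out of log lines. */
final class SparkCallerContext {
    // Assumption: layout inferred from the sample records only.
    // An upstream value that itself starts with digits would be ambiguous here.
    private static final Pattern CONTEXT = Pattern.compile(
        "SPARK_(CLIENT|APPMASTER|TASK)"
        + "_(application_\\d+_\\d+)"                              // application ID
        + "(?:_(\\d+))?"                                          // application attempt ID
        + "(?:_JId_(\\d+)_SId_(\\d+)_(\\d+)_TId_(\\d+)_(\\d+))?"  // job, stage, task
        + "(?:_(.*))?");                                          // upstream caller context

    // Matches "callerContext=" (HDFS audit log) and "CALLERCONTEXT=" (YARN RM log).
    private static final Pattern FIELD = Pattern.compile("(?i)callerContext=(\\S+)");

    final String role;        // CLIENT, APPMASTER or TASK
    final String appId;
    final Long appAttempt;    // null when absent, as in the CLIENT records
    final Long jobId;         // job/stage/task fields are null unless present
    final Long stageId;
    final Long stageAttempt;
    final Long taskId;
    final Long taskAttempt;
    final String upstream;    // value of spark.log.callerContext, null when absent

    private SparkCallerContext(Matcher m) {
        role = m.group(1);
        appId = m.group(2);
        appAttempt = num(m.group(3));
        jobId = num(m.group(4));
        stageId = num(m.group(5));
        stageAttempt = num(m.group(6));
        taskId = num(m.group(7));
        taskAttempt = num(m.group(8));
        upstream = m.group(9);
    }

    private static Long num(String s) {
        return s == null ? null : Long.valueOf(s);
    }

    /** Parses a bare caller context string; empty if it does not fit the assumed layout. */
    static Optional<SparkCallerContext> parse(String context) {
        Matcher m = CONTEXT.matcher(context);
        return m.matches() ? Optional.of(new SparkCallerContext(m)) : Optional.empty();
    }

    /** Finds the caller context field in a full log line and parses it. */
    static Optional<SparkCallerContext> fromAuditLine(String line) {
        Matcher m = FIELD.matcher(line);
        return m.find() ? parse(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        String line = "cmd=open src=/lr_big.txt dst=null perm=null proto=rpc "
            + "callerContext=SPARK_TASK_application_1486594207699_0011_1"
            + "_JId_0_SId_0_0_TId_1_0_infoSpecifiedByUpstreamApp";
        fromAuditLine(line).ifPresent(c -> System.out.println(
            c.role + " app=" + c.appId + " job=" + c.jobId
            + " stage=" + c.stageId + " task=" + c.taskId
            + " upstream=" + c.upstream));
    }
}
```

Grouping parsed records by `appId` or `upstream` would then show which application or upstream caller issued a given set of HDFS operations.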
01-01-2017
08:24 AM
The Apache Spark - Apache HBase Connector (SHC) is a library that lets Spark access HBase tables as an external data source or sink. It provides high-performance HBase access via Spark SQL and DataFrames. SHC implements the standard Spark data source APIs and leverages the Spark Catalyst engine for query optimization. It bridges the gap between the simple HBase key-value store and complex relational SQL queries, and enables users to perform complex data analytics on top of HBase using Spark. With DataFrame support, SHC leverages the optimization techniques in Catalyst and achieves data locality, partition pruning, predicate pushdown, scanning and BulkGet, etc. For detailed information, please refer to the README in the SHC GitHub repository, which is kept up to date.
12-20-2016
08:51 PM
'%1s' is a Java format specifier that I use to dynamically insert the HBase table name into the catalog for testing purposes. The other issue you mentioned (multiple columns) does not apply in my case, since I don't create a new table using the option 'HBaseTableCatalog.newTable -> "5"'; I use an existing pre-split table.
The code below is from 'Utils$.toBytes()'. If one of the fields in the DataFrame is null, toBytes() receives null as its first argument 'input', and null is not an instance of any type. So none of the instanceof checks match, execution reaches 'label323', and that error is thrown. The only workaround at this stage is to remove the null fields from the DataFrame or to populate them with some value, which is not always feasible.

    public byte[] toBytes(Object input, Field field) {
      if (field.schema().isDefined());
      Object record;
      Object localObject1 = input;
      Object localObject2;
      if (localObject1 instanceof Boolean) {
        boolean bool = BoxesRunTime.unboxToBoolean(localObject1);
        localObject2 = Bytes.toBytes(bool);
      } else if (localObject1 instanceof Byte) {
        int i = BoxesRunTime.unboxToByte(localObject1);
        localObject2 = new byte[] { i };
      } else if (localObject1 instanceof byte[]) {
        byte[] arrayOfByte = (byte[])localObject1;
        localObject2 = arrayOfByte;
      } else if (localObject1 instanceof Double) {
        double d = BoxesRunTime.unboxToDouble(localObject1);
        localObject2 = Bytes.toBytes(d);
      } else if (localObject1 instanceof Float) {
        float f = BoxesRunTime.unboxToFloat(localObject1);
        localObject2 = Bytes.toBytes(f);
      } else if (localObject1 instanceof Integer) {
        int j = BoxesRunTime.unboxToInt(localObject1);
        localObject2 = Bytes.toBytes(j);
      } else if (localObject1 instanceof Long) {
        long l = BoxesRunTime.unboxToLong(localObject1);
        localObject2 = Bytes.toBytes(l);
      } else if (localObject1 instanceof Short) {
        short s = BoxesRunTime.unboxToShort(localObject1);
        localObject2 = Bytes.toBytes(s);
      } else if (localObject1 instanceof UTF8String) {
        UTF8String localUTF8String = (UTF8String)localObject1;
        localObject2 = localUTF8String.getBytes();
      } else {
        if (!(localObject1 instanceof String)) break label323;
        String str = (String)localObject1;
        localObject2 = Bytes.toBytes(str);
      }
      return ((record = field.catalystToAvro().apply(input)) ? AvroSedes..MODULE$.serialize(record, (Schema)field.schema().get()) : (field.sedes().isDefined()) ? ((Sedes)field.sedes().get()).serialize(input) : localObject2);
      label323: throw new Exception(new StringContext(Predef..MODULE$.wrapRefArray((Object[])new String[] { "unsupported data type ", "" })).s(Predef..MODULE$.genericWrapArray(new Object[] { field.dt() })));
    }
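To make the failure mode easier to see, here is a small, self-contained sketch of the same kind of instanceof dispatch. It is a simplified stand-in and not SHC code: `NullSafeEncoder`, `toBytes` and `toBytesOrDefault` are hypothetical names, only a few types are handled, and the byte encodings come from the JDK rather than from HBase's `Bytes` class. It shows why a null input falls through every branch, and what "populate them with something" could look like as a guard.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical, simplified illustration of type dispatch with instanceof. */
final class NullSafeEncoder {

    /** Encodes a value by runtime type; null matches no branch and is rejected. */
    static byte[] toBytes(Object input) {
        if (input instanceof Integer) {
            return ByteBuffer.allocate(Integer.BYTES).putInt((Integer) input).array();
        }
        if (input instanceof Long) {
            return ByteBuffer.allocate(Long.BYTES).putLong((Long) input).array();
        }
        if (input instanceof String) {
            return ((String) input).getBytes(StandardCharsets.UTF_8);
        }
        // "null instanceof X" is false for every X, so null always ends up here.
        String type = (input == null) ? "null" : input.getClass().getName();
        throw new IllegalArgumentException("unsupported data type " + type);
    }

    /** Guarded variant: substitutes a caller-chosen placeholder when input is null. */
    static byte[] toBytesOrDefault(Object input, Object placeholder) {
        return toBytes(input == null ? placeholder : input);
    }

    public static void main(String[] args) {
        System.out.println(toBytes("abc").length);
        System.out.println(toBytesOrDefault(null, "").length);
        try {
            toBytes(null);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Whether an empty value is an acceptable placeholder depends on the table, which is why this kind of substitution is not always feasible.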