New Contributor
Posts: 5
Registered: 08-28-2018
Accepted Solution

Envelope: UDF does not work in YARN mode and throws an error

SQL: select 'tb_user_info' as tableName, count(1) as totalNum, statistics(result) as statis, DATE_FORMAT(register_time, '%Y-%m') as date, home_city as city from jdbcProcess group by DATE_FORMAT(register_time, '%Y-%m'), home_city, statis

Here statistics is a custom UDF. When I run the program in local mode, there is no error, as shown below:

=== Result of Batch Resolution ===
!'Aggregate ['DATE_FORMAT('register_time, %Y-%m), 'home_city, 'stat], [tb_user_info AS tableName#118, 'count(1) AS totalNum#119, 'statistics('result) AS stat#120, 'DATE_FORMAT('register_time, %Y-%m) AS date#121, 'home_city AS city#122] Aggregate [date_format(register_time#67, %Y-%m, Some(Asia/Shanghai)), home_city#64, UDF(result#68) AS stat#120], [tb_user_info AS tableName#118, count(1) AS totalNum#119L, UDF(result#68) AS stat#120, date_format(register_time#67, %Y-%m, Some(Asia/Shanghai)) AS date#121, home_city#64 AS city#122]
!+- 'UnresolvedRelation jdbcProcess +- SubqueryAlias jdbcprocess
! +- SerializeFromObject [if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, user_id), IntegerType) AS user_id#60, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, user_name), StringType), true) AS user_name#61, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, user_sex), IntegerType) AS user_sex#62, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, phone), StringType), true) AS phone#63, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, home_city), StringType), true) AS home_city#64, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, age), IntegerType) AS age#65, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, birthday), TimestampType), true) AS birthday#66, if (assertnotnull(input[0, org.apache.spark.sql.Row, 
true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, register_time), TimestampType), true) AS register_time#67, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, result), StringType), true) AS result#68, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, jobId), StringType), true) AS jobId#69]
! +- MapElements com.cloudera.labs.envelope.derive.DataQualityDeriver$CheckRowRules@1d921d, interface org.apache.spark.sql.Row, [StructField(user_id,IntegerType,true), StructField(user_name,StringType,true), StructField(user_sex,IntegerType,true), StructField(phone,StringType,true), StructField(home_city,StringType,true), StructField(age,IntegerType,true), StructField(birthday,TimestampType,true), StructField(register_time,TimestampType,true)], obj#59: org.apache.spark.sql.Row
! +- DeserializeToObject createexternalrow(user_id#0, user_name#1.toString, user_sex#2, phone#3.toString, home_city#4.toString, age#5, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, birthday#6, true), staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, register_time#7, true), StructField(user_id,IntegerType,true), StructField(user_name,StringType,true), StructField(user_sex,IntegerType,true), StructField(phone,StringType,true), StructField(home_city,StringType,true), StructField(age,IntegerType,true), StructField(birthday,TimestampType,true), StructField(register_time,TimestampType,true)), obj#58: org.apache.spark.sql.Row
! +- Project [user_id#0, user_name#1, user_sex#2, phone#3, home_city#4, age#5, birthday#6, register_time#7]
! +- Filter (age#5 > 10)
! +- SubqueryAlias jdbcinput
!
But when I run the program in YARN mode, I get the following error:

18/08/28 22:56:59 INFO SparkSqlParser: Parsing command: statsProcess3
Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.spark.sql.AnalysisException: cannot resolve 'statis' given input columns: [jobId, age, birthday, user_name, user_sex, result, home_city, user_id, register_time, phone]; line 1 pos 217;
'Aggregate [date_format(register_time#47, %Y-%m), home_city#44, 'statis], [tb_user_info AS tableName#98, count(1) AS totalNum#99L, UDF(result#48) AS statis#100, date_format(register_time#47, %Y-%m) AS date#101, home_city#44 AS city#102]
+- SubqueryAlias jdbcprocess
+- SerializeFromObject [if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, user_id), IntegerType) AS user_id#40, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, user_name), StringType), true) AS user_name#41, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, user_sex), IntegerType) AS user_sex#42, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, phone), StringType), true) AS phone#43, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, home_city), StringType), true) AS home_city#44, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, age), IntegerType) AS age#45, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, birthday), TimestampType), true) AS birthday#46, if (assertnotnull(input[0, org.apache.spark.sql.Row, 
true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, register_time), TimestampType), true) AS register_time#47, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, result), StringType), true) AS result#48, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, jobId), StringType), true) AS jobId#49]
+- MapElements com.cloudera.labs.envelope.derive.DataQualityDeriver$CheckRowRules@4a6d6e4, interface org.apache.spark.sql.Row, [StructField(user_id,IntegerType,true), StructField(user_name,StringType,true), StructField(user_sex,IntegerType,true), StructField(phone,StringType,true), StructField(home_city,StringType,true), StructField(age,IntegerType,true), StructField(birthday,TimestampType,true), StructField(register_time,TimestampType,true)], obj#39: org.apache.spark.sql.Row
+- DeserializeToObject createexternalrow(user_id#0, user_name#1.toString, user_sex#2, phone#3.toString, home_city#4.toString, age#5, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, birthday#6, true), staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, register_time#7, true), StructField(user_id,IntegerType,true), StructField(user_name,StringType,true), StructField(user_sex,IntegerType,true), StructField(phone,StringType,true), StructField(home_city,StringType,true), StructField(age,IntegerType,true), StructField(birthday,TimestampType,true), StructField(register_time,TimestampType,true)), obj#38: org.apache.spark.sql.Row
+- Project [user_id#0, user_name#1, user_sex#2, phone#3, home_city#4, age#5, birthday#6, register_time#7]
+- Filter (age#5 > 10)
+- SubqueryAlias jdbcinput
+- Relation[user_id#0,user_name#1,user_sex#2,phone#3,home_city#4,age#5,birthday#6,register_time#7] JDBCRelation(tb_user_info) [numPartitions=1]

at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.cloudera.labs.envelope.run.Runner.awaitAllOffMainThreadsFinished(Runner.java:332)
at com.cloudera.labs.envelope.run.Runner.runBatch(Runner.java:300)
at com.cloudera.labs.envelope.run.Runner.run(Runner.java:93)
at com.cloudera.labs.envelope.EnvelopeMain.main(EnvelopeMain.java:46)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:750)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Caused by: org.apache.spark.sql.AnalysisException: cannot resolve 'statis' given input columns: [jobId, age, birthday, user_name, user_sex, result, home_city, user_id, register_time, phone]; line 1 pos 217;
'Aggregate [date_format(register_time#47, %Y-%m), home_city#44, 'statis], [tb_user_info AS tableName#98, count(1) AS totalNum#99L, UDF(result#48) AS statis#100, date_format(register_time#47, %Y-%m) AS date#101, home_city#44 AS city#102]
+- SubqueryAlias jdbcprocess
+- SerializeFromObject [if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, user_id), IntegerType) AS user_id#40, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, user_name), StringType), true) AS user_name#41, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, user_sex), IntegerType) AS user_sex#42, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, phone), StringType), true) AS phone#43, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, home_city), StringType), true) AS home_city#44, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, age), IntegerType) AS age#45, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, birthday), TimestampType), true) AS birthday#46, if (assertnotnull(input[0, org.apache.spark.sql.Row, 
true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, register_time), TimestampType), true) AS register_time#47, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, result), StringType), true) AS result#48, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, jobId), StringType), true) AS jobId#49]
+- MapElements com.cloudera.labs.envelope.derive.DataQualityDeriver$CheckRowRules@4a6d6e4, interface org.apache.spark.sql.Row, [StructField(user_id,IntegerType,true), StructField(user_name,StringType,true), StructField(user_sex,IntegerType,true), StructField(phone,StringType,true), StructField(home_city,StringType,true), StructField(age,IntegerType,true), StructField(birthday,TimestampType,true), StructField(register_time,TimestampType,true)], obj#39: org.apache.spark.sql.Row
+- DeserializeToObject createexternalrow(user_id#0, user_name#1.toString, user_sex#2, phone#3.toString, home_city#4.toString, age#5, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, birthday#6, true), staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, register_time#7, true), StructField(user_id,IntegerType,true), StructField(user_name,StringType,true), StructField(user_sex,IntegerType,true), StructField(phone,StringType,true), StructField(home_city,StringType,true), StructField(age,IntegerType,true), StructField(birthday,TimestampType,true), StructField(register_time,TimestampType,true)), obj#38: org.apache.spark.sql.Row
+- Project [user_id#0, user_name#1, user_sex#2, phone#3, home_city#4, age#5, birthday#6, register_time#7]
+- Filter (age#5 > 10)
+- SubqueryAlias jdbcinput
+- Relation[user_id#0,user_name#1,user_sex#2,phone#3,home_city#4,age#5,birthday#6,register_time#7] JDBCRelation(tb_user_info) [numPartitions=1]

at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:86)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:83)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:290)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:290)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:255)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:255)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:266)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:276)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:280)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:280)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$6.apply(QueryPlan.scala:285)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:285)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:255)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:83)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:76)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:76)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:57)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:63)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:592)
at com.cloudera.labs.envelope.derive.SQLDeriver.derive(SQLDeriver.java:69)
at com.cloudera.labs.envelope.run.BatchStep.submit(BatchStep.java:84)
at com.cloudera.labs.envelope.run.Runner$2.call(Runner.java:324)
at com.cloudera.labs.envelope.run.Runner$2.call(Runner.java:321)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

This is the Java code I use to register the UDF:

        String name = udfConfig.getString("name");
        String className = udfConfig.getString("class");

        // null third argument means that registerJava will infer the return type
        Contexts.getSparkSession().udf().registerJava(name, className, null);

The Java code for the UDF is below:

    public class Statistics implements UDF1<String, Integer>, ProvidesAlias {

        @Override
        public Integer call(String s) {
            String[] units = s.split(",", -1);
            return units.length;
        }

        @Override
        public String getAlias() {
            return "statistics";
        }
    }
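As an aside, the call() logic itself can be sanity-checked outside Spark entirely, which helps confirm the failure is in SQL analysis rather than in the UDF body. This is a minimal sketch; the class name StatisticsCheck and the helper countUnits are hypothetical, introduced only to mirror Statistics.call(String):

```java
// Spark-free sanity check of the UDF's call() logic.
// Shows how String.split(",", -1) counts units, including empty trailing ones.
public class StatisticsCheck {

    // Mirrors Statistics.call(String): number of comma-separated units
    static int countUnits(String s) {
        return s.split(",", -1).length;
    }

    public static void main(String[] args) {
        System.out.println(countUnits("a,b,c")); // 3
        System.out.println(countUnits("a,,"));   // 3: the -1 limit keeps empty trailing units
    }
}
```

The -1 limit argument matters here: with the default split(","), trailing empty strings are dropped, so "a,," would count as 1 instead of 3.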
I hope someone can tell me what the reason is. Thanks!

Cloudera Employee
Posts: 34
Registered: 08-26-2015

Re: Envelope: UDF does not work in YARN mode and throws an error

I am not sure why it works in local mode but not in YARN mode. In general, I have found that SQL does not always allow column aliases in a GROUP BY clause, so you might be able to get it to work by repeating the expression instead of the alias:

select 'tb_user_info' as tableName, count(1) as totalNum, statistics(result) as statis, DATE_FORMAT(register_time, '%Y-%m') as date, home_city as city from jdbcProcess group by DATE_FORMAT(register_time, '%Y-%m'), home_city, statistics(result)