Cloudera Labs

Envelope: UDF in YARN mode does not work, and I get an error

SOLVED

New Contributor

SQL:

select 'tb_user_info' as tableName,
       count(1) as totalNum,
       statistics(result) as statis,
       DATE_FORMAT(register_time,'%Y-%m') as date,
       home_city as city
from jdbcProcess
group by DATE_FORMAT(register_time,'%Y-%m'), home_city, statis

Here statistics is a custom UDF. When I run the program in local mode, there is no error, as shown below:

=== Result of Batch Resolution ===
!'Aggregate ['DATE_FORMAT('register_time, %Y-%m), 'home_city, 'stat], [tb_user_info AS tableName#118, 'count(1) AS totalNum#119, 'statistics('result) AS stat#120, 'DATE_FORMAT('register_time, %Y-%m) AS date#121, 'home_city AS city#122] Aggregate [date_format(register_time#67, %Y-%m, Some(Asia/Shanghai)), home_city#64, UDF(result#68) AS stat#120], [tb_user_info AS tableName#118, count(1) AS totalNum#119L, UDF(result#68) AS stat#120, date_format(register_time#67, %Y-%m, Some(Asia/Shanghai)) AS date#121, home_city#64 AS city#122]
!+- 'UnresolvedRelation jdbcProcess +- SubqueryAlias jdbcprocess
! +- SerializeFromObject [if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, user_id), IntegerType) AS user_id#60, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, user_name), StringType), true) AS user_name#61, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, user_sex), IntegerType) AS user_sex#62, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, phone), StringType), true) AS phone#63, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, home_city), StringType), true) AS home_city#64, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, age), IntegerType) AS age#65, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, birthday), TimestampType), true) AS birthday#66, if (assertnotnull(input[0, org.apache.spark.sql.Row, 
true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, register_time), TimestampType), true) AS register_time#67, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, result), StringType), true) AS result#68, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, jobId), StringType), true) AS jobId#69]
! +- MapElements com.cloudera.labs.envelope.derive.DataQualityDeriver$CheckRowRules@1d921d, interface org.apache.spark.sql.Row, [StructField(user_id,IntegerType,true), StructField(user_name,StringType,true), StructField(user_sex,IntegerType,true), StructField(phone,StringType,true), StructField(home_city,StringType,true), StructField(age,IntegerType,true), StructField(birthday,TimestampType,true), StructField(register_time,TimestampType,true)], obj#59: org.apache.spark.sql.Row
! +- DeserializeToObject createexternalrow(user_id#0, user_name#1.toString, user_sex#2, phone#3.toString, home_city#4.toString, age#5, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, birthday#6, true), staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, register_time#7, true), StructField(user_id,IntegerType,true), StructField(user_name,StringType,true), StructField(user_sex,IntegerType,true), StructField(phone,StringType,true), StructField(home_city,StringType,true), StructField(age,IntegerType,true), StructField(birthday,TimestampType,true), StructField(register_time,TimestampType,true)), obj#58: org.apache.spark.sql.Row
! +- Project [user_id#0, user_name#1, user_sex#2, phone#3, home_city#4, age#5, birthday#6, register_time#7]
! +- Filter (age#5 > 10)
! +- SubqueryAlias jdbcinput
!
But when I run the program in YARN mode, I get the following error:

18/08/28 22:56:59 INFO SparkSqlParser: Parsing command: statsProcess3
Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.spark.sql.AnalysisException: cannot resolve 'statis' given input columns: [jobId, age, birthday, user_name, user_sex, result, home_city, user_id, register_time, phone]; line 1 pos 217;
'Aggregate [date_format(register_time#47, %Y-%m), home_city#44, 'statis], [tb_user_info AS tableName#98, count(1) AS totalNum#99L, UDF(result#48) AS statis#100, date_format(register_time#47, %Y-%m) AS date#101, home_city#44 AS city#102]
+- SubqueryAlias jdbcprocess
+- SerializeFromObject [if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, user_id), IntegerType) AS user_id#40, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, user_name), StringType), true) AS user_name#41, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, user_sex), IntegerType) AS user_sex#42, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, phone), StringType), true) AS phone#43, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, home_city), StringType), true) AS home_city#44, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, age), IntegerType) AS age#45, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, birthday), TimestampType), true) AS birthday#46, if (assertnotnull(input[0, org.apache.spark.sql.Row, 
true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, register_time), TimestampType), true) AS register_time#47, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, result), StringType), true) AS result#48, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, jobId), StringType), true) AS jobId#49]
+- MapElements com.cloudera.labs.envelope.derive.DataQualityDeriver$CheckRowRules@4a6d6e4, interface org.apache.spark.sql.Row, [StructField(user_id,IntegerType,true), StructField(user_name,StringType,true), StructField(user_sex,IntegerType,true), StructField(phone,StringType,true), StructField(home_city,StringType,true), StructField(age,IntegerType,true), StructField(birthday,TimestampType,true), StructField(register_time,TimestampType,true)], obj#39: org.apache.spark.sql.Row
+- DeserializeToObject createexternalrow(user_id#0, user_name#1.toString, user_sex#2, phone#3.toString, home_city#4.toString, age#5, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, birthday#6, true), staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, register_time#7, true), StructField(user_id,IntegerType,true), StructField(user_name,StringType,true), StructField(user_sex,IntegerType,true), StructField(phone,StringType,true), StructField(home_city,StringType,true), StructField(age,IntegerType,true), StructField(birthday,TimestampType,true), StructField(register_time,TimestampType,true)), obj#38: org.apache.spark.sql.Row
+- Project [user_id#0, user_name#1, user_sex#2, phone#3, home_city#4, age#5, birthday#6, register_time#7]
+- Filter (age#5 > 10)
+- SubqueryAlias jdbcinput
+- Relation[user_id#0,user_name#1,user_sex#2,phone#3,home_city#4,age#5,birthday#6,register_time#7] JDBCRelation(tb_user_info) [numPartitions=1]

at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.cloudera.labs.envelope.run.Runner.awaitAllOffMainThreadsFinished(Runner.java:332)
at com.cloudera.labs.envelope.run.Runner.runBatch(Runner.java:300)
at com.cloudera.labs.envelope.run.Runner.run(Runner.java:93)
at com.cloudera.labs.envelope.EnvelopeMain.main(EnvelopeMain.java:46)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:750)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Caused by: org.apache.spark.sql.AnalysisException: cannot resolve 'statis' given input columns: [jobId, age, birthday, user_name, user_sex, result, home_city, user_id, register_time, phone]; line 1 pos 217;
'Aggregate [date_format(register_time#47, %Y-%m), home_city#44, 'statis], [tb_user_info AS tableName#98, count(1) AS totalNum#99L, UDF(result#48) AS statis#100, date_format(register_time#47, %Y-%m) AS date#101, home_city#44 AS city#102]
+- SubqueryAlias jdbcprocess
+- SerializeFromObject [if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, user_id), IntegerType) AS user_id#40, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, user_name), StringType), true) AS user_name#41, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, user_sex), IntegerType) AS user_sex#42, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, phone), StringType), true) AS phone#43, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, home_city), StringType), true) AS home_city#44, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, age), IntegerType) AS age#45, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, birthday), TimestampType), true) AS birthday#46, if (assertnotnull(input[0, org.apache.spark.sql.Row, 
true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, register_time), TimestampType), true) AS register_time#47, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, result), StringType), true) AS result#48, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, jobId), StringType), true) AS jobId#49]
+- MapElements com.cloudera.labs.envelope.derive.DataQualityDeriver$CheckRowRules@4a6d6e4, interface org.apache.spark.sql.Row, [StructField(user_id,IntegerType,true), StructField(user_name,StringType,true), StructField(user_sex,IntegerType,true), StructField(phone,StringType,true), StructField(home_city,StringType,true), StructField(age,IntegerType,true), StructField(birthday,TimestampType,true), StructField(register_time,TimestampType,true)], obj#39: org.apache.spark.sql.Row
+- DeserializeToObject createexternalrow(user_id#0, user_name#1.toString, user_sex#2, phone#3.toString, home_city#4.toString, age#5, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, birthday#6, true), staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, register_time#7, true), StructField(user_id,IntegerType,true), StructField(user_name,StringType,true), StructField(user_sex,IntegerType,true), StructField(phone,StringType,true), StructField(home_city,StringType,true), StructField(age,IntegerType,true), StructField(birthday,TimestampType,true), StructField(register_time,TimestampType,true)), obj#38: org.apache.spark.sql.Row
+- Project [user_id#0, user_name#1, user_sex#2, phone#3, home_city#4, age#5, birthday#6, register_time#7]
+- Filter (age#5 > 10)
+- SubqueryAlias jdbcinput
+- Relation[user_id#0,user_name#1,user_sex#2,phone#3,home_city#4,age#5,birthday#6,register_time#7] JDBCRelation(tb_user_info) [numPartitions=1]

at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:86)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:83)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:290)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:290)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:255)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:255)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:266)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:276)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:280)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:280)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$6.apply(QueryPlan.scala:285)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:285)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:255)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:83)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:76)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:76)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:57)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:63)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:592)
at com.cloudera.labs.envelope.derive.SQLDeriver.derive(SQLDeriver.java:69)
at com.cloudera.labs.envelope.run.BatchStep.submit(BatchStep.java:84)
at com.cloudera.labs.envelope.run.Runner$2.call(Runner.java:324)
at com.cloudera.labs.envelope.run.Runner$2.call(Runner.java:321)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

This is the Java code that registers the UDF:

        String name = udfConfig.getString("name");
        String className = udfConfig.getString("class");

        // null third argument means that registerJava will infer the return type
        Contexts.getSparkSession().udf().registerJava(name, className, null);

The Java code for the UDF is below:

public class Statistics implements UDF1<String, Integer>, ProvidesAlias {

    @Override
    public Integer call(String s) {
        String[] units = s.split(",", -1);
        return units.length;
    }

    @Override
    public String getAlias() {
        return "statistics";
    }
}
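For what it's worth, the UDF body is simple enough to sanity-check outside Spark. This is a plain-Java sketch with the Spark UDF1 and Envelope ProvidesAlias interfaces stripped away (the class and method names here are illustrative, not part of the original code); note that split with a limit of -1 keeps trailing empty strings, so empty units are counted too:

```java
// Plain-Java sketch of the UDF logic, with the Spark/Envelope
// interfaces removed so it can run standalone.
public class StatisticsSketch {

    // Counts comma-delimited units; limit -1 keeps trailing empty strings.
    public static int countUnits(String s) {
        return s.split(",", -1).length;
    }

    public static void main(String[] args) {
        System.out.println(countUnits("a,b,c")); // 3
        System.out.println(countUnits("a,,"));   // 3 (empty units counted)
    }
}
```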
I hope someone can tell me what the reason is. Thanks!

1 ACCEPTED SOLUTION


Re: Envelope: UDF in YARN mode does not work, and I get an error

Rising Star

I am not sure why it works in local mode but not in YARN mode. Generally with SQL I have found that it is not always possible to use column aliases in a GROUP BY, so you might be able to get it to work by repeating the expression instead of the alias:

 

select 'tb_user_info' as tableName,
       count(1) as totalNum,
       statistics(result) as statis,
       DATE_FORMAT(register_time,'%Y-%m') as date,
       home_city as city
from jdbcProcess
group by DATE_FORMAT(register_time,'%Y-%m'), home_city, statistics(result)