New Contributor
Posts: 2
Registered: 12-04-2017

Kudu is failing when loading data using Envelope


Versions

------------

Kudu 1.5.0

Envelope 0.4.0

 

Stack Trace:

----------------

java.lang.NullPointerException
at org.apache.kudu.client.KuduPredicate.newInListPredicate(KuduPredicate.java:435)
at com.cloudera.labs.envelope.output.KuduOutput.scannerForFilters(KuduOutput.java:272)
at com.cloudera.labs.envelope.output.KuduOutput.getExistingForFilters(KuduOutput.java:126)
at com.cloudera.labs.envelope.run.DataStep$JoinExistingForKeysFunction.call(DataStep.java:460)
at com.cloudera.labs.envelope.run.DataStep$JoinExistingForKeysFunction.call(DataStep.java:419)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

 

Envelope Properties:

---------------------------

application {
  name = Load customers
}
steps {
  load_customers {
    input {
      type = hive
      table = sfmta_raw
    }
    planner {
      type = bitemporal
      fields.key = [report_time, vehicle_tag]
      fields.values = [speed]
      field.timestamp = report_time
      field.event.time.effective.from = as_of_start_date
      field.event.time.effective.to = as_of_end_date
      field.system.time.effective.from = system_start_ts
      field.system.time.effective.to = system_end_ts
      field.current.flag = current_flag
    }
    output {
      type = kudu
      connection = "quickstart.cloudera:7051"
      table.name = "impala::default.sfmta1"
    }
  }
}

 

Table DDLs:

----------------

CREATE TABLE sfmta
PRIMARY KEY (report_time, vehicle_tag)
PARTITION BY HASH(report_time) PARTITIONS 8
STORED AS KUDU
TBLPROPERTIES (
  'kudu.master_addresses' = 'quickstart.cloudera:7051',
  'kudu.num_tablet_replicas' = '1'
)
AS SELECT
  UNIX_TIMESTAMP(report_time, 'MM/dd/yyyy HH:mm:ss') AS report_time,
  vehicle_tag,
  longitude,
  latitude,
  speed,
  heading
FROM sfmta_raw;

CREATE TABLE sfmta1 (
  report_time STRING,
  vehicle_tag INT,
  as_of_start_date BIGINT,
  system_start_ts BIGINT,
  revision INT,
  longitude FLOAT,
  latitude FLOAT,
  speed FLOAT,
  heading FLOAT,
  as_of_end_date BIGINT,
  system_end_ts BIGINT,
  current_flag STRING,
  PRIMARY KEY (report_time, vehicle_tag, as_of_start_date, system_start_ts)
)
PARTITION BY HASH(report_time) PARTITIONS 8
STORED AS KUDU
TBLPROPERTIES (
  'kudu.master_addresses' = 'quickstart.cloudera:7051',
  'kudu.num_tablet_replicas' = '1'
);

 

Cloudera Employee
Posts: 23
Registered: 08-26-2015

Re: Kudu is failing when loading data using Envelope

Hello,

 

The immediate issue is that, unfortunately, you've hit a bug that occurs when the natural key of the incoming records (defined by 'fields.key') has multiple fields and the bitemporal planner is used with Kudu.

 

But, looking at your data model, it seems like your natural key is just 'vehicle_tag'. If you want to maintain the history of a 'vehicle_tag' over time (where each version in that history is for a different 'report_time'), then 'fields.key' should just be '[vehicle_tag]', and 'report_time' should not be in the primary key of the Kudu table. Can you try that change? The planner block would then look something like the sketch below.
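
A minimal sketch of just the revised planner block, with the field names copied from your config above (everything else would stay the same):

planner {
  type = bitemporal
  fields.key = [vehicle_tag]
  fields.values = [speed]
  field.timestamp = report_time
  field.event.time.effective.from = as_of_start_date
  field.event.time.effective.to = as_of_end_date
  field.system.time.effective.from = system_start_ts
  field.system.time.effective.to = system_end_ts
  field.current.flag = current_flag
}

The Kudu table's primary key would then presumably be (vehicle_tag, as_of_start_date, system_start_ts), without report_time.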

 

Jeremy

New Contributor
Posts: 2
Registered: 12-04-2017

Re: Kudu is failing when loading data using Envelope

Thanks Jeremy for the quick response.

I have tried your suggestion, but it is now failing with a different exception.

 

Stack Trace

----------------

17/12/05 08:52:29 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, quickstart.cloudera, executor 1): java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
at com.cloudera.labs.envelope.utils.RowUtils.compareTimestamp(RowUtils.java:791)
at com.cloudera.labs.envelope.plan.BitemporalHistoryPlanner$ArrivingTimestampComparator.compare(BitemporalHistoryPlanner.java:406)
at com.cloudera.labs.envelope.plan.BitemporalHistoryPlanner$ArrivingTimestampComparator.compare(BitemporalHistoryPlanner.java:397)
at java.util.TimSort.countRunAndMakeAscending(TimSort.java:355)
at java.util.TimSort.sort(TimSort.java:234)
at java.util.Arrays.sort(Arrays.java:1512)
at java.util.ArrayList.sort(ArrayList.java:1460)
at java.util.Collections.sort(Collections.java:175)
at com.cloudera.labs.envelope.plan.BitemporalHistoryPlanner.planMutationsForKey(BitemporalHistoryPlanner.java:106)
at com.cloudera.labs.envelope.run.DataStep$PlanForKeyFunction.call(DataStep.java:558)
at com.cloudera.labs.envelope.run.DataStep$PlanForKeyFunction.call(DataStep.java:530)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
at com.google.common.collect.Lists.newArrayList(Lists.java:138)
at com.cloudera.labs.envelope.run.DataStep$ApplyMutationsForPartitionFunction.call(DataStep.java:593)
at com.cloudera.labs.envelope.run.DataStep$ApplyMutationsForPartitionFunction.call(DataStep.java:571)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:219)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

 

Envelope Config

----------------------

application {
  name = Load customers
}
steps {
  load_customers {
    input {
      type = hive
      table = sfmta_raw
    }
    planner {
      type = bitemporal
      fields.key = [vehicle_tag]
      fields.values = [speed]
      field.timestamp = report_time
      field.event.time.effective.from = as_of_start_date
      field.event.time.effective.to = as_of_end_date
      field.system.time.effective.from = system_start_ts
      field.system.time.effective.to = system_end_ts
      field.current.flag = current_flag
    }
    output {
      type = kudu
      connection = "quickstart.cloudera:7051"
      table.name = "impala::default.sfmta1"
    }
  }
}

 

New Table DDL

---------------------


CREATE TABLE sfmta1 (
  vehicle_tag INT,
  as_of_start_date BIGINT,
  system_start_ts BIGINT,
  report_time STRING,
  revision INT,
  longitude FLOAT,
  latitude FLOAT,
  speed FLOAT,
  heading FLOAT,
  as_of_end_date BIGINT,
  system_end_ts BIGINT,
  current_flag STRING,
  PRIMARY KEY (vehicle_tag, as_of_start_date, system_start_ts)
)
PARTITION BY HASH(vehicle_tag) PARTITIONS 8
STORED AS KUDU
TBLPROPERTIES (
  'kudu.master_addresses' = 'quickstart.cloudera:7051',
  'kudu.num_tablet_replicas' = '1'
);

Cloudera Employee
Posts: 23
Registered: 08-26-2015

Re: Kudu is failing when loading data using Envelope

It looks like you made the recommended change correctly, but you have now run into the current limitation that, when using the bitemporal planner, timestamp fields must be of the BIGINT data type. From your first post it looks like you have already done that conversion in the 'sfmta' table, so perhaps you could read your input from there instead. If you need to use 'sfmta_raw', you could use a SQL deriver step to run that same 'UNIX_TIMESTAMP' logic within the Envelope pipeline, for example as in the sketch below.
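
A rough sketch of what the deriver approach could look like (the step name 'read_raw' and the exact query are illustrative only, not tested):

steps {
  read_raw {
    input {
      type = hive
      table = sfmta_raw
    }
  }
  load_customers {
    dependencies = [read_raw]
    deriver {
      type = sql
      query.literal = "SELECT UNIX_TIMESTAMP(report_time, 'MM/dd/yyyy HH:mm:ss') AS report_time, vehicle_tag, longitude, latitude, speed, heading FROM read_raw"
    }
    # planner and output blocks stay the same as in your current config
  }
}

The SQL deriver can reference its dependency steps by name, which is why the query selects from 'read_raw'. You would likely also want 'report_time' in the Kudu table to be BIGINT so it matches the converted value.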

 

In the next release we will have more data types and encodings you can select to represent timestamps, such as formatted strings, and also the ability to plug in your own custom timestamp-encoding logic.
