Member since: 07-01-2015
Posts: 460
Kudos Received: 78
Solutions: 43
My Accepted Solutions
| Title | Views | Posted |
| --- | --- | --- |
| | 1346 | 11-26-2019 11:47 PM |
| | 1304 | 11-25-2019 11:44 AM |
| | 9471 | 08-07-2019 12:48 AM |
| | 2175 | 04-17-2019 03:09 AM |
| | 3484 | 02-18-2019 12:23 AM |
01-16-2017
01:35 AM
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, TimestampType, LongType, DoubleType, DataType}
import org.apache.spark.sql.Row
import java.io.File
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.sql.hive.HiveContext

// Lazily creates a single HiveContext per JVM so it can be reused
// across micro-batches (and recreated after driver recovery).
object SQLHiveContextSingleton {
  @transient private var instance: HiveContext = _
  def getInstance(sparkContext: SparkContext): HiveContext = {
    synchronized {
      if (instance == null) {
        instance = new HiveContext(sparkContext)
      }
      instance
    }
  }
}

......

// transform each incoming record into a Row matching the target schema (details elided)
val mydataSplitted = mydataDStream.map( .... )

// save the content of the mydataSplitted DStream's RDDs into a Hive table
mydataSplitted.foreachRDD( rdd => {
  println("Processing mydata RDD")
  val sqlContext = SQLHiveContextSingleton.getInstance( rdd.sparkContext )
  val mydataDF = sqlContext.createDataFrame( rdd, mydataStruct )
  mydataDF.registerTempTable("mydata")
  val mydataTrgPart = sqlContext.sql(mydataSQL)
  // enable dynamic partitioning before writing into the partitioned target table
  sqlContext.sql("SET hive.exec.dynamic.partition = true;")
  sqlContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict;")
  mydataTrgPart.write.mode(SaveMode.Append).partitionBy(partCol).saveAsTable(mydataTable)
} )
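For context, a rough sketch of what the elided schema and parsing step could look like. This is a hypothetical illustration only; mydataStruct and the field layout below are made up, not taken from the original code:

// Hypothetical schema for the Rows produced below (illustration only)
val mydataStruct = StructType(Array(
  StructField("file_name", StringType, true),
  StructField("rc", IntegerType, true)
))
// Hypothetical parsing step: split each CSV line into a Row matching the schema
val mydataSplitted = mydataDStream.map { line =>
  val fields = line.split(",")
  Row(fields(0), fields(1).toInt)
}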
01-12-2017
07:06 AM
The problem is probably that this CDH release ships an old version of Sentry (sentry-1.5.1+cdh5.9.0+261). The Kafka broker needs the sentry-binding-kafka.jar file, which is only available starting with Sentry 1.7: "Starting from 1.7.0 release, Apache Sentry has Kafka binding that can be used to enable authorization in Apache Kafka with Apache Sentry." Confirmed here: https://cwiki.apache.org/confluence/display/SENTRY/Apache+Kafka+Authorization+with+Apache+Sentry
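For reference, once a Sentry build with the Kafka binding is on the broker classpath, the broker is pointed at it through its configuration, roughly like this (the property values are examples based on the wiki page above, not from a verified CDH setup):

# broker config: delegate authorization decisions to the Sentry Kafka binding
authorizer.class.name=org.apache.sentry.kafka.authorizer.SentryKafkaAuthorizer
# example location of the Sentry client configuration
sentry.kafka.site.url=file:///etc/kafka/conf/sentry-site.xml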
01-12-2017
05:42 AM
Hi, on CDH 5.9.0 (from parcel 5.9.0-1.cdh5.9.0.p0.23) I tried to enable Kafka from parcel 2.0.2-1.2.0.2.p0.5. The Kafka brokers cannot start up because the class SentryKafkaAuthorizer is missing. I tried to copy the Sentry JARs to the Kafka lib directory under /opt/cloudera/parcels/KAFKA/lib/kafka/libs/, but it did not help. The Kafka configuration in CM seems to be OK: the Sentry dependency is set, and a Sentry configuration is present in /etc/kafka/conf. Does anybody have an idea what could cause the problem? Thanks, Tomas

Fatal error during KafkaServerStartable startup. Prepare to shutdown
java.lang.ClassNotFoundException: org.apache.sentry.kafka.authorizer.SentryKafkaAuthorizer
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
Labels:
- Apache Kafka
01-04-2017
02:49 AM
I would go directly.
11-21-2016
11:58 PM
1 Kudo
Hi Harsh, I had the same issue: I added the -Dzookeeper.skipACL=true option to the Java Configuration Options for ZooKeeper Server and restarted the ZooKeeper service, but I am still getting this error message:

[zk: localhost:2181(CONNECTED) 0] rmr /hbase
Authentication is not valid : /hbase/backup-masters

Tomas
10-05-2016
02:34 AM
1 Kudo
Update: the explain on the problematic query fails as well:

explain insert into tmp.tab partition (day_id)
select t.fn, cast( t.ROW_COUNT as int ), cast(from_unixtime(unix_timestamp(now()), 'yyyyMMdd') as int) as DAY_ID
from ( select count(*) as row_count, max( file_name ) as fn from tmp.tab group by file_name ) t;

Query: explain insert into tmp.tab partition (day_id)
select t.fn, cast( t.ROW_COUNT as int ), cast(from_unixtime(unix_timestamp(now()), 'yyyyMMdd') as int) as DAY_ID
from ( select count(*) as row_count, max( file_name ) as fn from tmp.tab group by file_name ) t
ERROR: IllegalStateException: null

But the explain on the working query succeeds:

explain insert into tmp.tab partition (day_id)
select t.fn, cast( t.ROW_COUNT as int ), cast(from_unixtime(unix_timestamp(now()), 'yyyyMMdd') as int) as DAY_ID
from ( select rc as ROW_COUNT, file_name as fn from tmp.tab ) t;
Query: explain insert into tmp.tab partition (day_id)
select t.fn, cast( t.ROW_COUNT as int ), cast(from_unixtime(unix_timestamp(now()), 'yyyyMMdd') as int) as DAY_ID
from ( select rc as ROW_COUNT, file_name as fn from tmp.tab ) t
+--------------------------------------------------------------------------------------------------------------------------+
| Explain String |
+--------------------------------------------------------------------------------------------------------------------------+
| WARNING: The following tables are missing relevant table and/or column statistics. |
| tmp.tab |
| |
| WRITE TO HDFS [tmp.tab, OVERWRITE=false, PARTITION-KEYS=(CAST(from_unixtime(unix_timestamp(now()), 'yyyyMMdd') AS INT))] |
| | partitions=1 |
| | |
| 00:SCAN HDFS [tmp.tab] |
| partitions=0/0 files=0 size=0B |
+--------------------------------------------------------------------------------------------------------------------------+
Fetched 8 row(s) in 0.03s

I noticed the warning about missing statistics on the empty table, so I ran COMPUTE STATS on tmp.tab. This helped, so the problem is solved!

Query: explain insert into tmp.tab partition (day_id)
select t.fn, cast( t.ROW_COUNT as int ), cast(from_unixtime(unix_timestamp(now()), 'yyyyMMdd') as int) as DAY_ID
from ( select count(*) as row_count, max( file_name ) as fn from tmp.tab group by file_name ) t
+--------------------------------------------------------------------------------------------------------------------------+
| Explain String |
+--------------------------------------------------------------------------------------------------------------------------+
| Estimated Per-Host Requirements: Memory=0B VCores=0 |
| |
| WRITE TO HDFS [tmp.tab, OVERWRITE=false, PARTITION-KEYS=(CAST(from_unixtime(unix_timestamp(now()), 'yyyyMMdd') AS INT))] |
| | partitions=1 |
| | |
| 01:AGGREGATE [FINALIZE] |
| | output: count(*), max(file_name) |
| | group by: file_name |
| | |
| 00:SCAN HDFS [tmp.tab] |
| partitions=0/0 files=0 size=0B |
+--------------------------------------------------------------------------------------------------------------------------+
Fetched 11 row(s) in 0.03s
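For completeness, the statement that resolved it, as described above:

-- computing statistics on the (empty) source table fixed the IllegalStateException on insert
compute stats tmp.tab;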
10-04-2016
02:50 AM
Hi, I am inserting the result of a simple aggregation, and the query fails when the aggregation returns an empty record set. The results are inserted into a partitioned table. The insert fails with this message: ERROR: IllegalStateException: null. The query which prepares the data contains aggregations. I thought the error occurred because the query returns no rows, but if I change the subquery from a GROUP BY to a simple select, then it works and inserts 0 rows.

create table tmp.tab ( file_name string, rc int ) partitioned by ( day_id int ) stored as parquet;

-- this fails:
insert into tmp.tab partition (day_id)
select t.fn, cast( t.ROW_COUNT as int ), cast(from_unixtime(unix_timestamp(now()), 'yyyyMMdd') as int) as DAY_ID
from ( select count(*) as row_count, max( file_name ) as fn from tmp.tab group by file_name ) t;

-- this works:
insert into tmp.tab partition (day_id)
select t.fn, cast( t.ROW_COUNT as int ), cast(from_unixtime(unix_timestamp(now()), 'yyyyMMdd') as int) as DAY_ID
from ( select rc as ROW_COUNT, file_name as fn from tmp.tab ) t;

Using Impala 2.2.0-cdh5.4.8
Tomas
Labels:
- Apache Impala
08-17-2016
01:13 AM
Yes, you are right: there has to be an explicit grant on that URI, not just HDFS access to the given directory. I don't understand why the documentation does not explain this more clearly. Thanks
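For anyone hitting the same thing, the URI grant looks roughly like this (the role name and path below are made-up examples):

-- grant the role access to the HDFS location itself, in addition to the HDFS permissions
GRANT ALL ON URI 'hdfs://nameservice1/user/example/data' TO ROLE example_role;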
08-09-2016
10:04 AM
Toad for Hadoop.
08-09-2016
07:31 AM
Yes, it works with a kerberized cluster.