Member since
10-07-2015
107
Posts
73
Kudos Received
23
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
2549 | 02-23-2017 04:57 PM | |
1994 | 12-08-2016 09:55 AM | |
8911 | 11-24-2016 07:24 PM | |
3970 | 11-24-2016 02:17 PM | |
9347 | 11-24-2016 09:50 AM |
05-03-2017
02:09 PM
Maybe check whether you can access WebHDFS via Knox to see if your kinit user is accepted by Knox
... View more
04-12-2017
08:09 AM
iris = spark.read.csv("/tmp/iris.csv", header=True, inferSchema=True)
iris.printSchema()
Result:
root
|-- sepalLength: double (nullable = true)
|-- sepalWidth: double (nullable = true)
|-- petalLength: double (nullable = true)
|-- petalWidth: double (nullable = true)
|-- species: string (nullable = true)
Write parquet file ... iris.write.parquet("/tmp/iris.parquet")
... and create hive table spark.sql("""
create external table iris_p (
sepalLength double,
sepalWidth double,
petalLength double,
petalWidth double,
species string
)
STORED AS PARQUET
location "/tmp/iris.parquet"
""")
... View more
03-24-2017
08:11 AM
Hard to say from the info you gave. Since you load the data into a DataFrame, the Oracle part should be abstracted, as long as the schema fits - and I guess you checked the schema of "tran1" You could try to select from tran1 into another dataframe trans2 to control all columns, check schema and try to write tran2
... View more
03-22-2017
04:33 PM
Hive: create table tran_spark_part (
id String,
amount BigInt
)
partitioned by (date1 string); This works in Spark (tested on 1.6.2): > case class Person(id: String, amount: Integer, date1: String)
> val df = Seq(Person("1", 32, "2017")).toDF()
> sqlContext.setConf("hive.exec.dynamic.partition", "true")
> sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
> df.write.mode("overwrite").partitionBy("date1").insertInto("tran_spark_part")
> sqlContext.sql("select * from tran_spark_part").show()
+---+------+-----+
| id|amount|date1|
+---+------+-----+
| 1| 32| 2017|
+---+------+-----+
This doesn't: > case class Person2(id: String, amount: Integer, t_date: String)
> val df2 = Seq(Person("2", 42, "2017")).toDF()
> df2.write.mode("overwrite").partitionBy("t_date").insertInto("tran_part")
org.apache.spark.sql.AnalysisException: Partition column t_date not found in existing columns (id, amount, date1);
... View more
03-21-2017
11:13 AM
partitionBy uses column names. Hive table has "date1" and in Spark "t_date" is used. Have you tried to rename the dataframe column to date1 so that it matches the Hive schema?
... View more
02-24-2017
09:30 AM
2 Kudos
Assumption: all files have the same columns and in each file the first line is the header
This is a solution in PySpark
I load every file via "com.databricks.spark.csv" class respecting header and inferring schema
Then I use python reduce to union them all
from functools import reduce
files = ["/tmp/test_1.csv", "/tmp/test_2.csv", "/tmp/test_3.csv"]
df = reduce(lambda x,y: x.unionAll(y),
[sqlContext.read.format('com.databricks.spark.csv')
.load(f, header="true", inferSchema="true")
for f in files])
df.show()
... View more
02-24-2017
09:13 AM
Try "explode": import org.apache.spark.sql.functions.{udf, array, explode, col}
case class Result ( date: String, usage: Double )
def splitUsage = udf { (datediff:Integer, startdate: String, usage:Integer) =>
if (datediff == 32) {
val date = new DateTime(format.parse(startdate))
(for (i <- 0 to datediff) yield Result(format.format(date.plusDays(2).toDate()),
usage.toDouble / datediff.toDouble)).toArray
} else {
Array(Result(startdate, usage.toDouble))
}
}
val df2 = df.withColumn("dayusage", splitUsage($"datediff", $"startdate", $"usage"))
val df3 = df2.select($"*", explode($"dayusage"))
val result = df3.select($"Id", $"startdate", $"enddate", $"datediff", $"did",
col("col")("date").alias("date"), col("col")("usage").alias("usage"))
... View more
02-24-2017
07:33 AM
Fine grained permissions (row level, column masking, ...) are created in Ranger for any Hive table - whether created by HiveQL or SparkQL So if you create a new table in Hive via SparkSQL that should be used by others with access control, you need to create the appropriate policies afterwards in Ranger. For less fine grained permissions (delete update, insert delete) you can also use the SQL commands of https://cwiki.apache.org/confluence/display/Hive/SQL+Standard+Based+Hive+Authorization#SQLStandardBasedHiveAuthorization-ManagingObjectPrivileges with SparkSQL
... View more
02-23-2017
04:57 PM
1 Kudo
For Hive one would use Apache Ranger for this. You can allow or deny access to tables, columns and even rows. Now, what to do with Spark: For the normal HiveContext Spark would read the Schema from Metastore and then read the the file directly from HDFS. So no Hive Ranger plugin would kick in. However, with LLAP it will be possible, see e.g. https://hortonworks.com/blog/sparksql-ranger-llap-via-spark-thrift-server-bi-scenarios-provide-row-column-level-security-masking/ If you additionally disable HDFS access for "others" for Hive tables, data is access controlled
... View more
02-23-2017
04:33 PM
You could also do in the Spark code: import org.apache.log4j.{Level, Logger}
def main(args: Array[String]) = {
Logger.getRootLogger.setLevel(Level.ERROR)
var conf = new SparkConf().setAppName("KafkaToHdfs")
val sc = new SparkContext(conf)
... View more