Member since 10-07-2015

Posts: 107
Kudos Received: 73
Solutions: 23
        My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 3220 | 02-23-2017 04:57 PM |
| | 2559 | 12-08-2016 09:55 AM |
| | 10034 | 11-24-2016 07:24 PM |
| | 4846 | 11-24-2016 02:17 PM |
| | 10307 | 11-24-2016 09:50 AM |
			
    
	
		
		
05-03-2017 02:09 PM

Maybe check whether you can access WebHDFS via Knox to see if your kinit user is accepted by Knox.
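For reference, a minimal sketch of such a check from Python (the gateway host, port 8443 and the "default" topology are assumptions, so adjust them to your Knox setup; if the topology authenticates via LDAP basic auth instead of Kerberos, pass auth=(user, password) instead):

# Minimal sketch: list /tmp through the Knox WebHDFS service using the ticket
# obtained via kinit. Gateway host, port and topology name are assumptions.
# Requires the requests and requests-kerberos packages.
import requests
from requests_kerberos import HTTPKerberosAuth, OPTIONAL

url = "https://knox.example.com:8443/gateway/default/webhdfs/v1/tmp?op=LISTSTATUS"
resp = requests.get(url, auth=HTTPKerberosAuth(mutual_authentication=OPTIONAL), verify=False)

print(resp.status_code)   # 200 means Knox accepted the kinit user
print(resp.text)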
				
			
			
			
			
			
			
			
			
			
		
			
    
	
		
		
04-12-2017 08:09 AM

iris = spark.read.csv("/tmp/iris.csv", header=True, inferSchema=True)
iris.printSchema()

Result:

root
 |-- sepalLength: double (nullable = true)
 |-- sepalWidth: double (nullable = true)
 |-- petalLength: double (nullable = true)
 |-- petalWidth: double (nullable = true)
 |-- species: string (nullable = true)

Write the parquet file ...

iris.write.parquet("/tmp/iris.parquet")

... and create the Hive table:

spark.sql("""
create external table iris_p (
    sepalLength double,
    sepalWidth double,
    petalLength double,
    petalWidth double,
    species string
)
STORED AS PARQUET
location "/tmp/iris.parquet"
""")
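As a quick sanity check (a minimal follow-up sketch using the same SparkSession as above), the new table can be read back through the metastore:

# Verify that the external table is registered and readable
spark.sql("describe iris_p").show()
spark.sql("select * from iris_p limit 5").show()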
				
			
			
			
			
			
			
			
			
			
		
			
    
	
		
		
03-24-2017 08:11 AM

Hard to say from the info you gave. Since you load the data into a DataFrame, the Oracle part should be abstracted, as long as the schema fits - and I guess you checked the schema of "tran1". You could try to select from tran1 into another DataFrame tran2 to control all columns, check its schema, and try to write tran2.
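For illustration, a rough PySpark sketch of that suggestion - the column names, JDBC URL and credentials below are placeholders, not details from your setup:

# Hypothetical sketch: select the columns explicitly, verify the schema,
# then write via JDBC. Replace URL, table name and credentials with your own.
tran2 = tran1.select("col1", "col2", "col3")
tran2.printSchema()   # compare against the Oracle table definition
tran2.write.jdbc(
    url="jdbc:oracle:thin:@//dbhost:1521/ORCL",
    table="TRAN2",
    mode="append",
    properties={"user": "scott", "password": "tiger",
                "driver": "oracle.jdbc.OracleDriver"})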
				
			
			
			
			
			
			
			
			
			
		
			
    
	
		
		
03-22-2017 04:33 PM

Hive:

create table tran_spark_part (
  id String,
  amount BigInt
)
partitioned by (date1 string);

This works in Spark (tested on 1.6.2):

> case class Person(id: String, amount: Integer, date1: String)
> val df = Seq(Person("1", 32, "2017")).toDF()
> sqlContext.setConf("hive.exec.dynamic.partition", "true")
> sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
> df.write.mode("overwrite").partitionBy("date1").insertInto("tran_spark_part")
> sqlContext.sql("select * from tran_spark_part").show()
+---+------+-----+
| id|amount|date1|
+---+------+-----+
|  1|    32| 2017|
+---+------+-----+

This doesn't:

> case class Person2(id: String, amount: Integer, t_date: String)
> val df2 = Seq(Person("2", 42, "2017")).toDF()
> df2.write.mode("overwrite").partitionBy("t_date").insertInto("tran_part")
org.apache.spark.sql.AnalysisException: Partition column t_date not found in existing columns (id, amount, date1);
				
			
			
			
			
			
			
			
			
			
		
			
    
	
		
		
03-21-2017 11:13 AM

partitionBy uses column names. The Hive table has "date1", while in Spark "t_date" is used. Have you tried renaming the DataFrame column to date1 so that it matches the Hive schema?
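For illustration, the rename itself is a one-liner (shown here in PySpark; withColumnRenamed exists in the Scala API as well, and df2 stands for the DataFrame from your code):

# Hypothetical: align the DataFrame's partition column name with the Hive table
df2_renamed = df2.withColumnRenamed("t_date", "date1")
df2_renamed.printSchema()   # should now list date1, matching the Hive schema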
				
			
			
			
			
			
			
			
			
			
		
			
    
	
		
		
02-24-2017 09:30 AM (2 Kudos)

Assumptions: all files have the same columns, and in each file the first line is the header.

This is a solution in PySpark:

- I load every file via the "com.databricks.spark.csv" class, respecting the header and inferring the schema
- Then I use Python's reduce to union them all

from functools import reduce
files = ["/tmp/test_1.csv", "/tmp/test_2.csv", "/tmp/test_3.csv"]
df = reduce(lambda x, y: x.unionAll(y),
            [sqlContext.read.format('com.databricks.spark.csv')
                       .load(f, header="true", inferSchema="true")
             for f in files])
df.show()
				
			
			
			
			
			
			
			
			
			
		
			
    
	
		
		
02-24-2017 09:13 AM

Try "explode":

import org.apache.spark.sql.functions.{udf, array, explode, col}
import org.joda.time.DateTime

case class Result(date: String, usage: Double)

// "format" is assumed to be a date formatter (e.g. a SimpleDateFormat) defined elsewhere
def splitUsage = udf { (datediff: Integer, startdate: String, usage: Integer) =>
    if (datediff == 32) {
        val date = new DateTime(format.parse(startdate))
        // one row per day, offset by i days, with the usage split evenly
        (for (i <- 0 to datediff) yield Result(format.format(date.plusDays(i).toDate()),
                                               usage.toDouble / datediff.toDouble)).toArray
    } else {
        Array(Result(startdate, usage.toDouble))
    }
}

val df2 = df.withColumn("dayusage", splitUsage($"datediff", $"startdate", $"usage"))
val df3 = df2.select($"*", explode($"dayusage"))
val result = df3.select($"Id", $"startdate", $"enddate", $"datediff", $"did",
                        col("col")("date").alias("date"), col("col")("usage").alias("usage"))
				
			
			
			
			
			
			
			
			
			
		
			
    
	
		
		
02-24-2017 07:33 AM

Fine-grained permissions (row level, column masking, ...) are created in Ranger for any Hive table - whether it was created via HiveQL or SparkSQL. So if you create a new table in Hive via SparkSQL that should be used by others under access control, you need to create the appropriate policies in Ranger afterwards.

For less fine-grained permissions (update, insert, delete) you can also use the SQL commands of https://cwiki.apache.org/confluence/display/Hive/SQL+Standard+Based+Hive+Authorization#SQLStandardBasedHiveAuthorization-ManagingObjectPrivileges with SparkSQL.
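For illustration, such a coarse-grained grant might look like the sketch below (table and user names are placeholders; whether the statement is passed through to Hive depends on your Spark version and on SQL Standard Based Authorization being enabled):

# Hypothetical: grant SELECT on a table to a user, following the Hive
# SQL Standard Based Authorization syntax linked above. Placeholders only.
sqlContext.sql("GRANT SELECT ON TABLE some_table TO USER some_user")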
				
			
			
			
			
			
			
			
			
			
		
			
    
	
		
		
02-23-2017 04:57 PM (1 Kudo)

For Hive one would use Apache Ranger for this. You can allow or deny access to tables, columns and even rows.

Now, what to do with Spark: with the normal HiveContext, Spark reads the schema from the Metastore and then reads the files directly from HDFS, so no Hive Ranger plugin kicks in. However, with LLAP it will be possible, see e.g. https://hortonworks.com/blog/sparksql-ranger-llap-via-spark-thrift-server-bi-scenarios-provide-row-column-level-security-masking/

If you additionally disable HDFS access for "others" on the Hive table files, the data is access controlled.
				
			
			
			
			
			
			
			
			
			
		
			
    
	
		
		
02-23-2017 04:33 PM

You could also do it in the Spark code:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]) = {
  // print only ERROR messages from the root logger
  Logger.getRootLogger.setLevel(Level.ERROR)
  val conf = new SparkConf().setAppName("KafkaToHdfs")
  val sc = new SparkContext(conf)
  // ...
}
				
			
			
			
			
			
			
			
			
			
		 
        













