Member since: 02-17-2018

2 Posts
0 Kudos Received
0 Solutions
02-27-2018 12:46 PM
For those who are interested in the answer, please refer to https://stackoverflow.com/questions/48847660/spark-parquet-snappy-overall-compression-ratio-loses-after-spark-shuffles-d
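The short version: a plain repartition(n) redistributes rows round-robin, which destroys whatever ordering the ingested data had, so Parquet's dictionary and run-length encodings (and Snappy on top of them) become far less effective. Below is a minimal sketch of one way to restore that locality before writing; product_type and product_id are placeholder column names, not columns from the real schema, and the output path is made up as well.

val productDF = spark.read.parquet("/ingest/product/20180202/22-43/")

// Cluster similar values together: partition by a low-cardinality column and
// sort inside each partition so dictionary/RLE encoding stays effective.
productDF
  .repartition(100, org.apache.spark.sql.functions.col("product_type"))
  .sortWithinPartitions("product_type", "product_id")
  .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
  .option("compression", "snappy")
  .parquet("/processed/product/sorted_example")

Coalescing instead of repartitioning avoids the shuffle entirely, which matches the observation in the question below that coalesce keeps the output close to the ingest size.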
						
					
    
	
		
		
02-17-2018 05:29 PM
Community! Please help me understand how to get a better compression ratio with Spark.

Let me describe the case:

1. I have a dataset, let's call it product, on HDFS, which was imported using the Sqoop ImportTool as-parquet-file with the snappy codec. As a result of the import, I have 100 files totalling 46.4 GB (du), with varying file sizes (min 11 MB, max 1.5 GB, avg ~500 MB). The total record count is a little over 8 billion, with 84 columns.

2. I'm doing a simple read/repartition/write with Spark, also using snappy, and as a result I'm getting ~100 GB of output with the same file count, same codec, same record count and same columns.

Code snippet:

val productDF = spark.read.parquet("/ingest/product/20180202/22-43/")

productDF
  .repartition(100)
  .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
  .option("compression", "snappy")
  .parquet("/processed/product/20180215/04-37/read_repartition_write/general")

3. Using parquet-tools I have looked into random files from both ingest and processed, and they look as below:

ingest:

creator:                        parquet-mr version 1.5.0-cdh5.11.1 (build ${buildNumber})
extra:                          parquet.avro.schema = {"type":"record","name":"AutoGeneratedSchema","doc":"Sqoop import of QueryResult","fields"
and almost all columns look like
AVAILABLE: OPTIONAL INT64 R:0 D:1
row group 1:                    RC:3640100 TS:36454739 OFFSET:4 
AVAILABLE:                       INT64 SNAPPY DO:0 FPO:172743 SZ:370515/466690/1.26 VC:3640100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 126518400000, max: 1577692800000, num_nulls: 2541633]
processed:

creator:                        parquet-mr version 1.5.0-cdh5.12.0 (build ${buildNumber})
extra:                          org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields"
AVAILABLE:                      OPTIONAL INT64 R:0 D:1
...
row group 1:                    RC:6660100 TS:243047789 OFFSET:4 
AVAILABLE:                       INT64 SNAPPY DO:0 FPO:4122795 SZ:4283114/4690840/1.10 VC:6660100 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE ST:[min: -2209136400000, max: 10413820800000, num_nulls: 4444993]

On the other hand, without repartitioning, or when using coalesce, the size remains close to the ingest data size.

4. Going further, I did the following:

- read the dataset and write it back with:

productDF
  .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
  .option("compression", "none")
  .parquet("/processed/product/20180215/04-37/read_repartition_write/nonewithoutshuffle")

- read the dataset, repartition and write it back with:

productDF
  .repartition(500)
  .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
  .option("compression", "none")
  .parquet("/processed/product/20180215/04-37/read_repartition_write/nonewithshuffle")

As a result: 80 GB without repartitioning and 283 GB with repartitioning, with the same number of output files.

80 GB parquet meta example:

AVAILABLE:                       INT64 UNCOMPRESSED DO:0 FPO:456753 SZ:1452623/1452623/1.00 VC:11000100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: -1735747200000, max: 2524550400000, num_nulls: 7929352]

283 GB parquet meta example:

AVAILABLE:                       INT64 UNCOMPRESSED DO:0 FPO:2800387 SZ:2593838/2593838/1.00 VC:3510100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: -2209136400000, max: 10413820800000, num_nulls: 2244255]

It seems that Parquet itself (with its encodings?) greatly reduces the data size, even with compression disabled. How? 🙂

I also tried to read the uncompressed 80 GB, repartition and write it back - and I got my 283 GB again.

The first question is: why am I getting a bigger size after the Spark repartitioning/shuffle?

The second is: how can I efficiently shuffle data in Spark so that it still benefits from Parquet encoding/compression, if there is a way?

In general, I don't want my data size to grow after Spark processing, even if I didn't change anything.

Also, I failed to find out whether there is any configurable compression level for snappy, e.g. -1 ... -9? As far as I know, gzip has this, but what is the way to control this level in the Spark/Parquet writer?

I appreciate any help!

Thanks!
						
					
		
			
				
						
Labels:
- Apache Spark
- Apache Sqoop