Member since: 05-18-2022
Posts: 6
Kudos Received: 0
Solutions: 0
06-24-2022
12:54 AM
Can you tell me where I should check whether the data is being processed in a single JVM? The purpose of my Spark job is to write the result to a Hive table, and the OOM happens when the job tries to write all of the data into the Hive table.
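A quick way to see whether everything is funnelled through one JVM is the Executors tab of the Spark UI: if all the activity sits on the driver while the executors stay idle, the data is being materialised on the driver, which is what collect() does. Below is a minimal sketch of keeping the write distributed; the partition count and target table name are illustrative, not taken from the original job.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.enableHiveSupport().getOrCreate()

    # Keep the data partitioned across executors; avoid collect(), which
    # copies every record into the single driver JVM.
    df = spark.read.text("datalog2.log")

    # Each task writes its own partitions to the Hive table; the driver
    # never holds the full dataset.
    (df.repartition(64)                                # illustrative partition count
       .write
       .mode("overwrite")
       .format("parquet")
       .saveAsTable("log_analytics.parsed_log"))       # hypothetical table name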
06-24-2022
12:32 AM
Now I am facing this error:

Exception in thread "dispatcher-event-loop-40" java.lang.OutOfMemoryError: Requested array size exceeds VM limit

I have been stuck on this for about a month; every time I try to run the job it gets stuck there. I have increased the driver and executor memory, but it did not help.
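For reference, "Requested array size exceeds VM limit" means a single JVM array would need more than roughly Integer.MAX_VALUE elements (about 2 GB), so raising memory settings alone cannot fix it; the job has to stop building one giant string or collected result. The sketch below only shows where the relevant knobs live; the values are illustrative.

    from pyspark.sql import SparkSession

    # spark.driver.memory must be set before the driver JVM starts (e.g. via
    # spark-submit or spark-defaults.conf); setting it inside an already
    # running driver has no effect.
    spark = (SparkSession.builder
             .config("spark.executor.memory", "8g")        # illustrative value
             .config("spark.driver.maxResultSize", "4g")   # caps collect() results
             .enableHiveSupport()
             .getOrCreate())

Even with these raised, any single record or concatenated string larger than roughly 2 GB will still hit the array-size limit.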
06-22-2022
08:59 PM
I have changed my code to stop using collect(); instead I use wholeTextFiles() and flatMap() to split the string, but now I get a different error:

java.lang.OutOfMemoryError: input is too large to fit in a byte array
at org.spark_project.guava.io.ByteStreams.toByteArrayInternal(ByteStreams.java:194)
at org.spark_project.guava.io.ByteStreams.toByteArray(ByteStreams.java:220)
at org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:79)
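wholeTextFiles() materialises each file as one (path, content) record, so a 20 GB file can never fit in the single byte array it tries to build, regardless of executor memory. A minimal sketch of the line-based alternative; the "time=" separator is the one mentioned in this thread, and if one record spans many lines the custom-delimiter read sketched further down is the better fit.

    # One record per line instead of one record per file, so no single
    # byte array has to hold the whole 20 GB file.
    lines = sc.textFile("datalog2.log")
    pieces = lines.flatMap(lambda line: line.split("time="))
    pieces = pieces.filter(lambda x: x.strip() != "")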
06-22-2022
07:34 PM
I want to collect them because I want to make them into one string, split it into an array, and store it as an RDD; my log needs to be split into several pieces on a separator I provide.
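Collecting to the driver just to re-split the text is exactly what overloads the single driver JVM. Hadoop's TextInputFormat can split the input on a custom delimiter instead, so each "time=..." block arrives as its own distributed record and no collect() is needed. A minimal sketch, assuming the separator is the literal string "time=":

    # Split the file into records on a custom delimiter while reading;
    # each RDD element is one "time=..." block.
    conf = {"textinputformat.record.delimiter": "time="}
    records = (sc.newAPIHadoopFile(
                   "datalog2.log",
                   "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
                   "org.apache.hadoop.io.LongWritable",
                   "org.apache.hadoop.io.Text",
                   conf=conf)
               .map(lambda kv: kv[1])                  # drop the byte-offset key
               .filter(lambda rec: rec.strip() != ""))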
06-02-2022
08:38 PM
I have been stuck with this error for a month. I want to read a log file using PySpark, do some transformations, and save the result to a Hive table, but I get this error every time I run the job:

Exception in thread "dispatcher-event-loop-61" java.lang.OutOfMemoryError

This is my code:

    import re

    read_file_log = sc.textFile("datalog2.log")
    move_to_rdd = sc.parallelize("".join(read_file_log.collect()).split("time=")).filter(lambda x: x != "")
    ReSymbol = move_to_rdd.map(lambda x: re.sub(r'\t', ' ', x)) \
                          .map(lambda x: re.sub(r'\n', ' ', x)) \
                          .map(lambda x: re.sub(r' +', ' ', x))
    filter_log = ReSymbol.filter(lambda x: re.findall(r"LOG: statement:", x) != []
                                 or re.findall(r"LOG: AUDIT", x) != []
                                 or re.findall(r"LOG: execute", x) != [])

    def transformData(data):
        def getKategori(value):
            if value.find("LOG:statement:") > -1:
                return value[value.find("LOG:statement:") + 4:value.find("nt:") + 2]
            elif value.find("LOG:AUDIT") > -1:
                return value[value.find("LOG:AUDIT") + 4:value.find(":S")]
            elif value.find("LOG:execute") > -1:
                return value[value.find("LOG:execute") + 4:value.find("ute") + 3]
            else:
                pass

        def getQuery(value):
            if value.find("LOG:statement:") > -1:
                return files[5].replace("line=" + files[5].replace(" ", "")[5:files[5].replace(" ", "").find("LOG")] + " LOG: statement: ", "") + ",".join(files[6:])
            elif value.find("LOG:execute") > -1:
                return files[5].replace("line=" + files[5].replace(" ", "")[5:files[5].replace(" ", "").find("LOG")] + " LOG: execute ", "") + ",".join(files[6:])
            else:
                return ",".join(files[6:])

        files = data.split(",")
        return (files[0].replace("WIB", ""),
                files[1].split(" ")[1][4:],
                files[1].split(" ")[2][3:],
                files[2].replace(" ", "")[4:],
                files[3].replace(" ", "")[7:],
                files[4].replace(" ", "")[4:],
                files[5].replace(" ", "")[5:files[5].replace(" ", "").find("LOG")],
                getKategori(str(files[5].replace(" ", ""))),
                getQuery(str(files[5].replace(" ", ""))))

    to_rdd = filter_log.map(transformData)
    to_df = spark.createDataFrame(to_rdd,
                                  schema=['tanggal', 'pid', 'db', 'user', 'client',
                                          'app', 'line', 'kategori', 'query'])
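The stated goal is to land the result in a Hive table, but the snippet stops at createDataFrame. A minimal sketch of the missing write step, reusing the database and table names from the 05-18-2022 post in this thread:

    # Write the parsed DataFrame to a Hive-managed Parquet table. The write
    # itself is distributed, so the driver never holds the full dataset.
    spark.sql("USE log_analytics")
    (to_df.write
          .format("parquet")
          .mode("overwrite")
          .saveAsTable("test_dbgen2_log_query"))

The collect()-and-parallelize() step at the top is still the part that drags the whole file through the driver; the custom-delimiter read sketched above avoids it.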
Labels:
- Apache Spark
05-18-2022
07:35 PM
I found an error in my Spark job that I cannot solve yet, and I am stuck. This is the error I got:

22/05/19 09:32:40 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 1,5,main]
java.lang.OutOfMemoryError: input is too large to fit in a byte array
at org.spark_project.guava.io.ByteStreams.toByteArrayInternal(ByteStreams.java:194)
at org.spark_project.guava.io.ByteStreams.toByteArray(ByteStreams.java:220)
at org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:79)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at org.apache.spark.sql.execution.datasources.HadoopFileWholeTextReader.hasNext(HadoopFileWholeTextReader.scala:52)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:645)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:265)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:257)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1289)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

I want to process a log file that is 20 GB in size and save it to a Hive table. This is my code:

    import re
    import pandas as pd
    from pyspark.sql import SparkSession

    spark = SparkSession \
        .builder \
        .config("spark.rpc.message.maxSize", "1024") \
        .appName("Spark Client") \
        .getOrCreate()

    filepath = "/hdlog/dbgen2log/postgresql_2021-09-14_Tue.log"
    # files = spark.read.text(filepath, wholetext = True)
    # getData = files.select("value").collect()
    dataGenerator = (i for i in spark.read.text(filepath, wholetext=True)
                                    .select("value").collect()[0].value.split('time='))

    tanggal, pid, db, user, client, app, line, kategori, query = [], [], [], [], [], [], [], [], []

    for i in dataGenerator:
        if i == "":
            continue
        else:
            def getData():
                stat = re.findall(r"LOG: statement:", i)
                aud = re.findall(r"LOG: AUDIT:", i)
                execu = re.findall(r"LOG: execute", i)
                if stat != []:
                    return "statement"
                elif aud != []:
                    return "AUDIT"
                elif execu != []:
                    return "execute"
                else:
                    return False

            def getKategori(value):
                if value.find("LOG:statement:") > -1:
                    return value[value.find("LOG:statement:") + 4:value.find("nt:") + 2]
                elif value.find("LOG:AUDIT") > -1:
                    return value[value.find("LOG:AUDIT") + 4:value.find(":S")]
                elif value.find("LOG:execute") > -1:
                    return value[value.find("LOG:execute") + 4:value.find("ute") + 3]
                else:
                    pass

            def getQuery(value):
                if value == "statement":
                    query.append(ReSpace[5].replace("line=" + line[-1] + " LOG: statement: ", "") + ",".join(ReSpace[6:]))
                elif value == "execute":
                    query.append(ReSpace[5].replace("line=" + line[-1] + " LOG: execute ", "") + ",".join(ReSpace[6:]))
                else:
                    query.append(",".join(ReSpace[6:]))

            if getData():
                files = i
                ReSymbol1 = re.sub(r'\t', ' ', files)
                ReSymbol2 = re.sub(r'\n', ' ', ReSymbol1)
                ReSpace = re.sub(r' +', ' ', ReSymbol2).split(",")
                tanggal.append(ReSpace[0].replace("WIB", ""))
                pid.append(ReSpace[1].split(" ")[1][4:])
                db.append(ReSpace[1].split(" ")[2][3:])
                user.append(ReSpace[2].replace(" ", "")[4:])
                client.append(ReSpace[3].replace(" ", "")[7:])
                app.append(ReSpace[4].replace(" ", "")[4:])
                line.append(ReSpace[5].replace(" ", "")[5:ReSpace[5].replace(" ", "").find("LOG")])
                kategori.append(getKategori(str(ReSpace[5].replace(" ", ""))))
                getQuery(getData())
            else:
                pass

    df = pd.DataFrame({'tanggal': tanggal, 'pid': pid, 'db': db, 'user': user,
                       'client': client, 'app': app, 'line': line,
                       'kategori': kategori, 'query': query})
    df = df[["tanggal", "pid", "db", "user", "client", "app", "line", "kategori", "query"]]
    datas = spark.createDataFrame(df)
    spark.sql("use log_analytics")
    datas.registerTempTable("table_dbgen2log")
    datas.write.format("parquet").mode("Overwrite").saveAsTable("test_dbgen2_log_query")
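One small API note on the last lines: registerTempTable has been deprecated since Spark 2.0 (createOrReplaceTempView is the replacement), and neither is needed just to call saveAsTable. A short sketch for verifying the write after the job finishes, using the database and table names from the code above:

    # Read the written table back to confirm the schema and a few rows landed.
    spark.sql("USE log_analytics")
    spark.table("test_dbgen2_log_query").printSchema()
    spark.table("test_dbgen2_log_query").show(5, truncate=False)

The bigger issue remains the collect()[0] read at the top: it pulls the entire 20 GB file into the driver before any parsing starts, which is what the custom-delimiter read sketched in the replies above in this thread is meant to avoid.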
Labels:
- Apache Spark