
Exception in thread "dispatcher-event-loop-61" java.lang.OutOfMemoryError

Explorer

I have been stuck on this error for a month. I want to read a log file using PySpark, do some transformations, and then save the result to a Hive table, but I get this error every time I run the job:

Exception in thread "dispatcher-event-loop-61" java.lang.OutOfMemoryError
 
import re

read_file_log = sc.textFile("datalog2.log")
move_to_rdd = sc.parallelize("".join(read_file_log.collect()).split("time=")).filter(lambda x : x != "")
ReSymbol = move_to_rdd.map(lambda x : re.sub(r'\t', ' ', x)).map(lambda x : re.sub(r'\n', ' ', x)).map(lambda x : re.sub(r' +', ' ', x))

filter_log = ReSymbol.filter(lambda x :
   re.findall(r"LOG: statement:", x) != [] or
   re.findall(r"LOG: AUDIT", x) != [] or
   re.findall(r"LOG: execute", x) != [])

def transformData(data):
    def getKategori(value):
        if value.find("LOG:statement:") > -1:
            return value[value.find("LOG:statement:") + 4:value.find("nt:") + 2]
        elif value.find("LOG:AUDIT") > -1:
            return value[value.find("LOG:AUDIT") + 4:value.find(":S")]
        elif value.find("LOG:execute") > -1:
            return value[value.find("LOG:execute") + 4:value.find("ute") + 3]
        else:
            pass

    def getQuery(value):
        if value.find("LOG:statement:") > -1:
            return files[5].replace(" line=" + files[5].replace(" ", "")[5:files[5].replace(" ", "").find("LOG")] + " LOG: statement: ", "") + ",".join(files[6:])
        elif value.find("LOG:execute") > -1:
            return files[5].replace(" line=" + files[5].replace(" ", "")[5:files[5].replace(" ", "").find("LOG")] + " LOG: execute ", "") + ",".join(files[6:])
        else:
            return ",".join(files[6:])

    files = data.split(",")

    return (files[0].replace("WIB", ""),
            files[1].split(" ")[1][4:],
            files[1].split(" ")[2][3:],
            files[2].replace(" ", "")[4:],
            files[3].replace(" ", "")[7:],
            files[4].replace(" ", "")[4:],
            files[5].replace(" ", "")[5:files[5].replace(" ", "").find("LOG")],
            getKategori(str(files[5].replace(" ", ""))),
            getQuery(str(files[5].replace(" ", ""))))
 
to_rdd = filter_log.map(transformData)
to_df = spark.createDataFrame(to_rdd,schema=['tanggal', 'pid', 'db', 'user', 'client', 'app', 'line', 'kategori', 'query' ])

 

1 Reply

Master Collaborator

Hi @Yosieam 

 

Please avoid calling the read_file_log.collect() method. It brings the whole dataset to the driver, so the driver needs enough memory to hold all of that data, which is what causes the OutOfMemoryError.

 

Please check the modified code:

# flatMap splits each line on "time=" and flattens the pieces into a single RDD of strings,
# so nothing ever has to be collected on the driver
move_to_rdd = sc.textFile("datalog2.log").flatMap(lambda row : row.split("time=")).filter(lambda x : x != "")
ReSymbol = move_to_rdd.map(lambda x : re.sub(r'\t', ' ', x)).map(lambda x : re.sub(r'\n', ' ', x)).map(lambda x : re.sub(r' +', ' ', x))
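
The rest of the pipeline can stay distributed as well. Here is a minimal sketch of the remaining steps, assuming your transformData function stays as it is; the table name log_db.log_table is only a placeholder for your actual Hive database and table:

filter_log = ReSymbol.filter(lambda x :
    re.findall(r"LOG: statement:", x) != [] or
    re.findall(r"LOG: AUDIT", x) != [] or
    re.findall(r"LOG: execute", x) != [])

to_rdd = filter_log.map(transformData)
to_df = spark.createDataFrame(to_rdd, schema=['tanggal', 'pid', 'db', 'user', 'client', 'app', 'line', 'kategori', 'query'])

# Write straight to Hive from the executors; no collect() is needed anywhere
# ("log_db.log_table" is a placeholder - use your own database and table name)
to_df.write.mode("append").saveAsTable("log_db.log_table")

This way the data flows from the executors directly into the Hive table, and the driver only coordinates the job instead of holding the data in memory.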