Exception in thread "dispatcher-event-loop-61" java.lang.OutOfMemoryError
Labels: Apache Spark
Explorer
Created 06-02-2022 08:38 PM
I have been stuck on this error for a month. I want to read a log file using PySpark, do some transformations, and then save the result to a Hive table, but I get this error every time I run the job:
Exception in thread "dispatcher-event-loop-61" java.lang.OutOfMemoryError
import re

read_file_log = sc.textFile("datalog2.log")
move_to_rdd = sc.parallelize("".join(read_file_log.collect()).split("time=")).filter(lambda x : x != "")
ReSymbol = move_to_rdd.map(lambda x : re.sub(r'\t', ' ', x)).map(lambda x : re.sub(r'\n', ' ', x)).map(lambda x : re.sub(r' +', ' ', x))
filter_log = ReSymbol.filter(lambda x :
    re.findall(r"LOG: statement:", x) != [] or
    re.findall(r"LOG: AUDIT", x) != [] or
    re.findall(r"LOG: execute", x) != [])
def transformData(data):
    def getKategori(value):
        if value.find("LOG:statement:") > -1:
            return value[value.find("LOG:statement:") + 4:value.find("nt:") + 2]
        elif value.find("LOG:AUDIT") > -1:
            return value[value.find("LOG:AUDIT") + 4:value.find(":S")]
        elif value.find("LOG:execute") > -1:
            return value[value.find("LOG:execute") + 4:value.find("ute") + 3]
        else:
            pass

    def getQuery(value):
        if value.find("LOG:statement:") > -1:
            return files[5].replace(" line=" + files[5].replace(" ", "")[5:files[5].replace(" ", "").find("LOG")] + " LOG: statement: ", "") + ",".join(files[6:])
        elif value.find("LOG:execute") > -1:
            return files[5].replace(" line=" + files[5].replace(" ", "")[5:files[5].replace(" ", "").find("LOG")] + " LOG: execute ", "") + ",".join(files[6:])
        else:
            return ",".join(files[6:])

    files = data.split(",")
    return (files[0].replace("WIB", ""),
            files[1].split(" ")[1][4:],
            files[1].split(" ")[2][3:],
            files[2].replace(" ", "")[4:],
            files[3].replace(" ", "")[7:],
            files[4].replace(" ", "")[4:],
            files[5].replace(" ", "")[5:files[5].replace(" ", "").find("LOG")],
            getKategori(str(files[5].replace(" ", ""))),
            getQuery(str(files[5].replace(" ", ""))))
to_rdd = filter_log.map(transformData)
to_df = spark.createDataFrame(to_rdd,schema=['tanggal', 'pid', 'db', 'user', 'client', 'app', 'line', 'kategori', 'query' ])
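The post mentions saving the result to a Hive table, which the code above stops just short of. A minimal sketch of that final step, assuming a Hive-enabled SparkSession and a hypothetical target table name log_parsed, might be:

# Hedged sketch: persist the parsed DataFrame to Hive.
# "log_parsed" is a hypothetical table name; adjust the table name and write mode to your setup.
to_df.write.mode("append").saveAsTable("log_parsed")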
1 REPLY
Master Collaborator
Created 08-31-2022 10:53 PM
Hi @Yosieam
Please avoid calling the read_file_log.collect() method. It brings the whole dataset to the driver, and the driver would need enough memory to hold all of that data.
Please check the modified code:
# flatMap splits each line on "time=" and flattens the pieces into a single RDD of strings,
# so the data stays distributed instead of being collected on the driver
move_to_rdd = sc.textFile("datalog2.log").flatMap(lambda row : row.split("time=")).filter(lambda x : x != "")
ReSymbol = move_to_rdd.map(lambda x : re.sub(r'\t', ' ', x)).map(lambda x : re.sub(r'\n', ' ', x)).map(lambda x : re.sub(r' +', ' ', x))
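With the input kept as a distributed RDD, the rest of the original pipeline can run unchanged and the DataFrame can be built without ever collecting to the driver. A minimal sketch, reusing the transformData function and schema from the post above:

# Hedged continuation of the modified code: everything stays distributed until the DataFrame is created.
filter_log = ReSymbol.filter(lambda x :
    re.findall(r"LOG: statement:", x) != [] or
    re.findall(r"LOG: AUDIT", x) != [] or
    re.findall(r"LOG: execute", x) != [])
to_rdd = filter_log.map(transformData)
to_df = spark.createDataFrame(to_rdd, schema=['tanggal', 'pid', 'db', 'user', 'client', 'app', 'line', 'kategori', 'query'])

If the driver still runs out of memory after this change, the job may simply need a larger driver heap (spark.driver.memory), but removing the collect() call is the first thing to try.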
