My Spark job fails with the error below, and I have been stuck on it without finding a solution. This is the error I get:
22/05/19 09:32:40 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 1,5,main]
java.lang.OutOfMemoryError: input is too large to fit in a byte array
at org.spark_project.guava.io.ByteStreams.toByteArrayInternal(ByteStreams.java:194)
at org.spark_project.guava.io.ByteStreams.toByteArray(ByteStreams.java:220)
at org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:79)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at org.apache.spark.sql.execution.datasources.HadoopFileWholeTextReader.hasNext(HadoopFileWholeTextReader.scala:52)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:645)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:265)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:257)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1289)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
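As far as I understand the stack trace, the failure happens in WholeTextFileRecordReader when Guava's ByteStreams.toByteArray tries to buffer the file: reading with wholetext = True turns the entire file into a single record, and a Java byte array can hold at most about 2 GB, so a 20 GB file can never fit no matter how much executor memory I give it.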
I want to process a log file that is 20 GB in size and save it to a Hive table. This is my code:
import re

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .config("spark.rpc.message.maxSize", "1024") \
    .appName("Spark Client") \
    .getOrCreate()

filepath = "/hdlog/dbgen2log/postgresql_2021-09-14_Tue.log"
# files = spark.read.text(filepath, wholetext = True)
# getData = files.select("value").collect()

# wholetext=True reads the whole file as one record; collect() then pulls
# that single 20 GB value onto the driver before splitting it on "time="
dataGenerator = (i for i in spark.read.text(filepath, wholetext=True)
                 .select("value").collect()[0].value.split("time="))

tanggal, pid, db, user, client, app, line, kategori, query = [], [], [], [], [], [], [], [], []
for i in dataGenerator:
    if i == "":
        continue

    def getData():
        # classify the entry by which LOG marker it contains
        if re.findall(r"LOG: statement:", i):
            return "statement"
        elif re.findall(r"LOG: AUDIT:", i):
            return "AUDIT"
        elif re.findall(r"LOG: execute", i):
            return "execute"
        else:
            return False

    def getKategori(value):
        # extract the category keyword from the whitespace-stripped LOG field
        if value.find("LOG:statement:") > -1:
            return value[value.find("LOG:statement:") + 4:value.find("nt:") + 2]
        elif value.find("LOG:AUDIT") > -1:
            return value[value.find("LOG:AUDIT") + 4:value.find(":S")]
        elif value.find("LOG:execute") > -1:
            return value[value.find("LOG:execute") + 4:value.find("ute") + 3]
        return None

    def getQuery(value):
        # everything after the LOG marker belongs to the query text
        if value == "statement":
            query.append(ReSpace[5].replace(
                " line=" + line[-1] + " LOG: statement: ", "") + ",".join(ReSpace[6:]))
        elif value == "execute":
            query.append(ReSpace[5].replace(
                " line=" + line[-1] + " LOG: execute ", "") + ",".join(ReSpace[6:]))
        else:
            query.append(",".join(ReSpace[6:]))

    if getData():
        # normalise whitespace, then split the entry into comma-separated fields
        ReSymbol1 = re.sub(r"\t", " ", i)
        ReSymbol2 = re.sub(r"\n", " ", ReSymbol1)
        ReSpace = re.sub(r" +", " ", ReSymbol2).split(",")
        tanggal.append(ReSpace[0].replace("WIB", ""))
        pid.append(ReSpace[1].split(" ")[1][4:])
        db.append(ReSpace[1].split(" ")[2][3:])
        user.append(ReSpace[2].replace(" ", "")[4:])
        client.append(ReSpace[3].replace(" ", "")[7:])
        app.append(ReSpace[4].replace(" ", "")[4:])
        line.append(ReSpace[5].replace(" ", "")[
            5:ReSpace[5].replace(" ", "").find("LOG")])
        kategori.append(getKategori(ReSpace[5].replace(" ", "")))
        getQuery(getData())
df = pd.DataFrame({
'tanggal': tanggal,
'pid': pid,
'db': db,
'user': user,
'client': client,
'app': app,
'line': line,
'kategori': kategori,
'query': query
})
df = df[["tanggal", "pid", "db", "user",
"client", "app", "line", "kategori", "query"]]
datas = spark.createDataFrame(df)
spark.sql("use log_analytics")
datas.createOrReplaceTempView("table_dbgen2log")  # registerTempTable is deprecated
datas.write.format("parquet").mode("overwrite").saveAsTable("test_dbgen2_log_query")