
Error spark job input is too large to fit in a byte array

Explorer
I am getting an error in my Spark job that I haven't been able to solve, and I'm stuck. This is the error I get:
 
22/05/19 09:32:40 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 1,5,main]
java.lang.OutOfMemoryError: input is too large to fit in a byte array
at org.spark_project.guava.io.ByteStreams.toByteArrayInternal(ByteStreams.java:194)
at org.spark_project.guava.io.ByteStreams.toByteArray(ByteStreams.java:220)
at org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:79)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at org.apache.spark.sql.execution.datasources.HadoopFileWholeTextReader.hasNext(HadoopFileWholeTextReader.scala:52)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:645)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:265)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:257)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1289)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

I want to process a log file that is 20 GB in size and save it to a Hive table. This is my code:

import re

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .config("spark.rpc.message.maxSize", "1024") \
    .appName("Spark Client") \
    .getOrCreate()

filepath = "/hdlog/dbgen2log/postgresql_2021-09-14_Tue.log"
# files = spark.read.text(filepath, wholetext=True)
# getData = files.select("value").collect()

# Read the whole file as a single value, collect it to the driver,
# and split it into one entry per "time=" marker.
dataGenerator = (i for i in spark.read.text(filepath, wholetext=True)
                 .select("value").collect()[0].value.split('time='))

tanggal, pid, db, user, client, app, line, kategori, query = \
    [], [], [], [], [], [], [], [], []

for i in dataGenerator:
    if i == "":
        continue

    def getData():
        # Classify the entry by the kind of log record it contains.
        stat = re.findall(r"LOG: statement:", i)
        aud = re.findall(r"LOG: AUDIT:", i)
        execu = re.findall(r"LOG: execute", i)
        if stat != []:
            return "statement"
        elif aud != []:
            return "AUDIT"
        elif execu != []:
            return "execute"
        else:
            return False

    def getKategori(value):
        if value.find("LOG:statement:") > -1:
            return value[value.find("LOG:statement:") + 4:value.find("nt:") + 2]
        elif value.find("LOG:AUDIT") > -1:
            return value[value.find("LOG:AUDIT") + 4:value.find(":S")]
        elif value.find("LOG:execute") > -1:
            return value[value.find("LOG:execute") + 4:value.find("ute") + 3]
        else:
            pass

    def getQuery(value):
        if value == "statement":
            query.append(ReSpace[5].replace(
                " line=" + line[-1] + " LOG: statement: ", "") + ",".join(ReSpace[6:]))
        elif value == "execute":
            query.append(ReSpace[5].replace(
                " line=" + line[-1] + " LOG: execute ", "") + ",".join(ReSpace[6:]))
        else:
            query.append(",".join(ReSpace[6:]))

    if getData():
        # Normalise whitespace, then split the entry into comma-separated fields.
        files = i
        ReSymbol1 = re.sub(r'\t', ' ', files)
        ReSymbol2 = re.sub(r'\n', ' ', ReSymbol1)
        ReSpace = re.sub(r' +', ' ', ReSymbol2).split(",")
        tanggal.append(ReSpace[0].replace("WIB", ""))
        pid.append(ReSpace[1].split(" ")[1][4:])
        db.append(ReSpace[1].split(" ")[2][3:])
        user.append(ReSpace[2].replace(" ", "")[4:])
        client.append(ReSpace[3].replace(" ", "")[7:])
        app.append(ReSpace[4].replace(" ", "")[4:])
        line.append(ReSpace[5].replace(" ", "")[
            5:ReSpace[5].replace(" ", "").find("LOG")])
        kategori.append(getKategori(str(ReSpace[5].replace(" ", ""))))
        getQuery(getData())

# Build a pandas DataFrame on the driver and convert it to a Spark DataFrame.
df = pd.DataFrame({
    'tanggal': tanggal,
    'pid': pid,
    'db': db,
    'user': user,
    'client': client,
    'app': app,
    'line': line,
    'kategori': kategori,
    'query': query
})
df = df[["tanggal", "pid", "db", "user",
         "client", "app", "line", "kategori", "query"]]
datas = spark.createDataFrame(df)
spark.sql("use log_analytics")
datas.registerTempTable("table_dbgen2log")
datas.write.format("parquet").mode(
    "overwrite").saveAsTable("test_dbgen2_log_query")
8 REPLIES


Hi @Yosieam , Using the "collect" method is not recommended, because it brings the data to the Spark driver and therefore the whole dataset has to fit into the driver's memory. Please rewrite your code to avoid "collect".
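
For example, here is a rough, untested sketch of one way to keep the read distributed (assuming the file is in HDFS): Hadoop's TextInputFormat honours a custom record delimiter, so Spark can split the file into one record per "time=" entry across the executors instead of pulling the whole 20 GB onto the driver:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Spark Client").getOrCreate()

# Each RDD element becomes the text between two "time=" markers,
# read and split in a distributed way by the executors.
records = spark.sparkContext.newAPIHadoopFile(
    "/hdlog/dbgen2log/postgresql_2021-09-14_Tue.log",
    "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "org.apache.hadoop.io.Text",
    conf={"textinputformat.record.delimiter": "time="},
).map(lambda kv: kv[1]).filter(lambda rec: rec.strip() != "")

From there you can parse each record with map() on the executors instead of a driver-side loop.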

Explorer

I want to collect them because I want to turn them into one string, split that string into an array, and store it as an RDD, because my log needs to be split into several pieces on a separator I specify.

Explorer

(screenshot of the updated code)

I have changed my code to not use collect; instead I use wholeTextFiles() and flatMap() to split the string, but now I get a different error:

 

java.lang.OutOfMemoryError: input is too large to fit in a byte array
at org.spark_project.guava.io.ByteStreams.toByteArrayInternal(ByteStreams.java:194)
at org.spark_project.guava.io.ByteStreams.toByteArray(ByteStreams.java:220)
at org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:79)


wholeTextFiles is also not a scalable solution.
https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.SparkContext.wholeTextFiles.htm...

"Small files are preferred, as each file will be loaded fully in memory."

Explorer

Now I am facing this error:

 

Error Exception in thread "dispatcher-event-loop-40" java.lang.OutOfMemoryError: Requested array size exceeds VM limit

 

And I have been stuck with this one for about a month. Every time I try to run the job it always gets stuck there. I have increased the driver and executor memory, but it didn't help.


Hi, 

The "Requested array size exceeds VM limit" means that your code tries to instantiate an array which has more than 2^31-1 elements (~2 billion) which is the max size of an array in Java. You cannot solve this with adding more memory. You need to split the work between executors and not process data on a single JVM (Driver side).

 

Explorer

Can you tell me where I should check whether the data is being processed in a single JVM? The purpose of my Spark job is to write the result to a Hive table, and the OOM happens when the job tries to write all the data into the Hive table.

Master Collaborator

Hi @Yosieam 

 

Thanks for sharing the code. You forgot to share the spark-submit/pyspark command. Please check what executor/driver memory is passed to spark-submit. Could you also confirm whether the file is on the local filesystem or in HDFS?
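
For reference, memory is usually passed on the command line like this (the numbers and the script name below are placeholders, not recommendations for your cluster):

spark-submit \
  --master yarn \
  --deploy-mode client \
  --driver-memory 8g \
  --executor-memory 8g \
  --num-executors 10 \
  your_job.py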