def transformData(data):
    """Parse one comma-separated PostgreSQL log line into a 9-field tuple.

    Returns (tanggal, pid, db, user, client, app, line, kategori, query),
    matching the DataFrame schema used by the caller
    (['tanggal', 'pid', 'db', 'user', 'client', 'app', 'line', 'kategori', 'query']).

    Expected field layout after split(","), e.g.:
        "<ts> WIB,<x> pid=<pid> db=<db>, user=<u>, client=<c>, app=<a>,
         line=<n> LOG: statement: <sql...>"
    Because split(",") also splits inside the SQL text, the query is
    reassembled from files[5:] at the end.
    """
    files = data.split(",")
    # files[5] with every space removed, e.g. "line=7LOG:statement:select..."
    compact = files[5].replace(" ", "")
    # Text between "line=" and "LOG" is the log line number.
    line_no = compact[5:compact.find("LOG")]

    def get_kategori(value):
        """Classify the entry as 'statement', 'AUDIT' or 'execute'.

        The original computed these via index arithmetic on the space-stripped
        field, but the slices always reduce to these fixed literals.
        Returns None (explicitly, instead of the original bare `pass`) when
        no known marker is present.
        """
        if "LOG:statement:" in value:
            return "statement"
        if "LOG:AUDIT" in value:
            return "AUDIT"
        if "LOG:execute" in value:
            return "execute"
        return None

    def get_query(value):
        """Reconstruct the SQL text from files[5:].

        Strips the " line=N LOG: statement: " / " line=N LOG: execute "
        prefix from files[5], then re-joins the fragments that split(",")
        produced.

        Bug fix: the original concatenated the files[5] remainder and
        ",".join(files[6:]) WITHOUT the separating comma that split(",")
        consumed, corrupting any query containing a comma
        ("select a,b" became "select ab").
        """
        tail = ",".join(files[6:])
        if "LOG:statement:" in value:
            head = files[5].replace(" line=" + line_no + " LOG: statement: ", "")
        elif "LOG:execute" in value:
            head = files[5].replace(" line=" + line_no + " LOG: execute ", "")
        else:
            # AUDIT (and unknown) entries: query text lives in the trailing fields.
            return tail
        return head + "," + tail if tail else head

    return (
        files[0].replace("WIB", ""),       # tanggal: timestamp with "WIB" zone tag removed
        files[1].split(" ")[1][4:],        # pid: strip "pid="
        files[1].split(" ")[2][3:],        # db: strip "db="
        files[2].replace(" ", "")[4:],     # user: strip first 4 chars of "user=..."
        files[3].replace(" ", "")[7:],     # client: strip "client="
        files[4].replace(" ", "")[4:],     # app: strip "app="
        line_no,                           # line: log line number
        get_kategori(compact),
        get_query(compact),
    )
# Parse every filtered log line into a 9-field tuple, then materialize the
# result as a Spark DataFrame whose column names match transformData's output.
to_rdd = filter_log.map(transformData)
to_df = spark.createDataFrame(
    to_rdd,
    schema=['tanggal', 'pid', 'db', 'user', 'client', 'app', 'line',
            'kategori', 'query'],
)