
How to prepare transactional data set in pySpark for FP Growth?



Dear All,

I am trying to run FPGrowth from MLlib on my transactional data. The documentation example creates the data like this:

# Each element of the RDD is one transaction: a list of items.
data = [["a", "b", "c"], ["a", "b", "d", "e"], ["a", "c", "e"], ["a", "c", "f"]]
rdd = sc.parallelize(data, 2)

But what I have is a long, narrow Hive table with two integer columns, which I need to prepare. My table looks like this:

+-----------+----------+
|customer_id|product_id|
+-----------+----------+
|        370|       154|
|         41|        40|
|        109|       173|

I retrieve it and cast my product_id to a string:

df = sqlContext.sql("SELECT * from raw_transactions")
df = df.withColumn("p",df.product_id.cast("string"))

Afterwards I create a DataFrame where I concatenate the products grouped by customer_id:

from pyspark.sql.functions import concat_ws, collect_list

# Concatenate each customer's product IDs into one comma-separated string.
df1 = df.groupBy("customer_id") \
        .agg(concat_ws(",", collect_list("p")).alias("products"))

The result looks like this:

DataFrame[customer_id: bigint, products: string]
+-----------+--------------------+
|customer_id|            products|
+-----------+--------------------+
|         31|               1,145|
|        231|1,79,172,33,159,1...|
|         32|                 153|
|        232|109,5,19,168,60,1...|

What I am now struggling to do is get this comma-separated string of products into the format FPGrowth expects. I have tried working with flatMap and split, but I just get various errors.

Any help is much appreciated.

Best,

Martin

2 REPLIES

Re: How to prepare transactional data set in pySpark for FP Growth?

You don't need the string conversion and concatenation steps at all. Simply try:

from pyspark.sql.functions import collect_list

# Collect each customer's product IDs into a list, then pull the plain
# Python lists FPGrowth expects out of the underlying RDD.
transactions = df.groupBy("customer_id") \
    .agg(collect_list("product_id").alias("product_ids")) \
    .rdd \
    .map(lambda x: x.product_ids)

transactions.collect()
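With the sample rows from the question, transactions.collect() should return one list of product IDs per customer, e.g. [[154], [40], [173]].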

Then

from pyspark.mllib.fpm import FPGrowth

# Train with minSupport=0.2 and numPartitions=10.
model = FPGrowth.train(transactions, 0.2, 10)
result = model.freqItemsets().collect()
for fi in result:
    print(fi)

will work.
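As an aside, if you are on Spark 2.2 or later, the DataFrame-based FP-Growth in pyspark.ml.fpm avoids the RDD round-trip entirely. A minimal sketch, assuming the same df as above (the minSupport and minConfidence values are just illustrative):

from pyspark.ml.fpm import FPGrowth
from pyspark.sql.functions import collect_set

# Group each customer's purchases into an array column. collect_set
# also deduplicates, since FP-Growth requires unique items per transaction.
baskets = df.groupBy("customer_id") \
            .agg(collect_set("product_id").alias("items"))

fp = FPGrowth(itemsCol="items", minSupport=0.2, minConfidence=0.6)
model = fp.fit(baskets)

model.freqItemsets.show()
model.associationRules.show()

The same uniqueness caveat applies to the RDD-based code above: if a customer can buy the same product more than once, deduplicate the lists before training, or FPGrowth.train will reject the transaction.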

Re: How to prepare transactional data set in pySpark for FP Growth?


Suppose we have more columns in the table shown above, such as customer ID, product ID, timestamp of purchase, price of item, etc., and each customer can make multiple transactions. If we need to find a fraudulent customer (one with a different usage pattern than existing customers) based on their transaction patterns (group all transactions by customer ID and find similar patterns among them), can we apply PySpark for this?
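In principle, yes. One possible approach (entirely a sketch, with hypothetical column and variable names) is to aggregate per-customer behavioural features and cluster them with Spark ML, then treat customers far from every cluster centre as having a different usage pattern. Assuming a txns DataFrame with the columns you describe:

from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# Hypothetical per-customer features derived from the transaction table.
features = txns.groupBy("customer_id").agg(
    F.count("*").alias("n_txns"),
    F.countDistinct("product_id").alias("n_products"),
    F.avg("price").alias("avg_price"),
    F.max("price").alias("max_price"))

# Assemble the features into a vector and cluster the customers.
assembler = VectorAssembler(
    inputCols=["n_txns", "n_products", "avg_price", "max_price"],
    outputCol="features")
km = KMeans(k=5, seed=42)  # k is arbitrary here; tune it for real data
assembled = assembler.transform(features)
model = km.fit(assembled)
clustered = model.transform(assembled)

Customers whose feature vectors lie far from their assigned centre in model.clusterCenters() would then be the ones to investigate.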