Created 04-11-2017 02:34 PM
Dear All,
I am trying to run FPGrowth from MLlib on my transactional data. The example creates the data like this:
data = [["a", "b", "c"], ["a", "b", "d", "e"], ["a", "c", "e"], ["a", "c", "f"]]
rdd = sc.parallelize(data, 2)
But what I have in my Hive table is a tall table with two integer columns, which I need to reshape first. My table looks like this:
+-----------+----------+
|customer_id|product_id|
+-----------+----------+
|        370|       154|
|         41|        40|
|        109|       173|
I retrieve it and cast my product id to a string:
df = sqlContext.sql("SELECT * from raw_transactions")
df = df.withColumn("p", df.product_id.cast("string"))
Afterwards I create a data frame where I concatenate my products, grouped by customer id:
g = df.groupBy("customer_id").agg(concat_ws(",", collect_list("p"))).collect()
df1 = sqlContext.createDataFrame(g)
df1 = df1.withColumnRenamed("concat_ws(,,'collect_list(p))", "products").drop("concat_ws(,,'collect_list(p))")
Which looks like this:
DataFrame[customer_id: bigint, products: string]
+-----------+--------------------+
|customer_id|            products|
+-----------+--------------------+
|         31|               1,145|
|        231|1,79,172,33,159,1...|
|         32|                 153|
|        232|109,5,19,168,60,1...|
What I am now struggling to do is get this comma-separated string of products into the format FPGrowth expects. I have tried working with flatMap and split, but I just get various errors.
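Roughly, the conversion I have been attempting looks like this (just a sketch against the df1 above; I may well be using flatMap where map is needed, which could explain the errors):

# Split the concatenated "products" string back into one list of product ids per customer
transactions = df1.rdd.map(lambda row: row.products.split(","))
transactions.take(2)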
Any help is very much appreciated.
Best,
Martin
Created 04-12-2017 08:40 AM
You don't need the string conversion and concat steps at all. Simply try:
from pyspark.sql.functions import collect_list, col

transactions = df.groupBy("customer_id")\
    .agg(collect_list("product_id").alias("product_ids"))\
    .rdd\
    .map(lambda x: x.product_ids)

transactions.collect()
Then
from pyspark.mllib.fpm import FPGrowth

model = FPGrowth.train(transactions, 0.2, 10)
result = model.freqItemsets().collect()
for fi in result:
    print(fi)
will work
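Here 0.2 is the minimum support and 10 the number of partitions passed to FPGrowth.train. If you are on Spark 2.2 or later, the DataFrame-based API in pyspark.ml.fpm can also be used directly on the grouped data; a minimal sketch with the same df as above (collect_set instead of collect_list, since FPGrowth requires the items in a transaction to be unique):

from pyspark.sql.functions import collect_set
from pyspark.ml.fpm import FPGrowth

# One array of distinct product ids per customer
baskets = df.groupBy("customer_id").agg(collect_set("product_id").alias("items"))

fp = FPGrowth(itemsCol="items", minSupport=0.2, minConfidence=0.6)
model = fp.fit(baskets)
model.freqItemsets.show()
model.associationRules.show()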
Created 08-03-2017 02:23 PM
Suppose we have multiple columns in the table shown above, like customer id, product id, timestamp of purchase, price of item, etc. Each customer can do multiple transactions. If we need to find a fraudulent customer (one with a different usage pattern than existing customers) based on the transaction patterns of customers (group by customer id over all transactions and find similar patterns among them), can we apply PySpark for this?
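The grouping step I have in mind would be something along these lines (a rough sketch only; column names such as purchase_ts and price are placeholders for the extra columns described above):

from pyspark.sql import functions as F

# Per-customer aggregates over all transactions (assumed columns: customer_id, product_id, purchase_ts, price)
per_customer = df.groupBy("customer_id").agg(
    F.count("*").alias("num_transactions"),
    F.countDistinct("product_id").alias("num_distinct_products"),
    F.sum("price").alias("total_spend"),
    F.avg("price").alias("avg_price"))
per_customer.show()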