基于 PySpark 的电影推荐系统:深入解析 ALS 算法

引言

协同过滤算法是推荐系统中应用最广泛的算法之一,而交替最小二乘法 (ALS) 则是协同过滤算法中的一种经典算法。本篇文章将以微软 Recommenders 工具中的 als_movielens.ipynb 示例为例,详细介绍如何使用 PySpark 实现基于 ALS 算法的电影推荐系统,并对模型性能进行评估。

数据准备

本示例使用经典的 MovieLens 数据集,该数据集包含了用户对电影的评分信息。首先,需要使用 recommenders.datasets.movielens 模块加载数据集,并指定使用的数据集大小和数据格式。

# 定义数据格式
schema = StructType(
    (
        StructField(COL_USER, IntegerType()),
        StructField(COL_ITEM, IntegerType()),
        StructField(COL_RATING, FloatType()),
        StructField(COL_TIMESTAMP, LongType()),
    )
)

# 加载 MovieLens 100k 数据集
data = movielens.load_spark_df(spark, size=MOVIELENS_DATA_SIZE, schema=schema)
data.show()

接下来,使用 spark_random_split 函数将数据集划分为训练集和测试集,划分比例为 75:25。

train, test = spark_random_split(data, ratio=0.75, seed=123)

模型构建

使用 PySpark MLlib 中的 ALS 模块构建 ALS 模型。

# 设置模型参数
als = ALS(
    rank=10,
    maxIter=15,
    implicitPrefs=False,
    regParam=0.05,
    coldStartStrategy='drop',
    nonnegative=False,
    seed=42,
    **header
)

# 训练模型
with Timer() as train_time:
    model = als.fit(train)

在 ALS 模型中,rank 参数表示隐含特征维度,maxIter 参数表示最大迭代次数,regParam 参数表示正则化系数。

生成推荐结果

为了避免推荐用户已经评分过的电影,需要将训练集中出现过的用户-电影组合从推荐结果中剔除。

with Timer() as test_time:
    # 生成所有用户-电影组合
    users = train.select(COL_USER).distinct()
    items = train.select(COL_ITEM).distinct()
    user_item = users.crossJoin(items)

    # 预测评分
    dfs_pred = model.transform(user_item)

    # 剔除训练集中出现过的用户-电影组合
    dfs_pred_exclude_train = dfs_pred.alias("pred").join(
        train.alias("train"),
        (dfs_pred[COL_USER] == train[COL_USER]) & (dfs_pred[COL_ITEM] == train[COL_ITEM]),
        how='outer'
    )

    # 获取最终推荐结果
    top_all = dfs_pred_exclude_train.filter(dfs_pred_exclude_train[f"train.{COL_RATING}"].isNull()) \
        .select('pred.' + COL_USER, 'pred.' + COL_ITEM, 'pred.' + "prediction")

    # 触发计算
    top_all.cache().count()

模型评估

使用 SparkRankingEvaluationSparkRatingEvaluation 模块对模型进行评估。

# 排序指标评估
rank_eval = SparkRankingEvaluation(test, top_all, k = TOP_K, col_user=COL_USER, col_item=COL_ITEM,
                                    col_rating=COL_RATING, col_prediction="prediction",
                                    relevancy_method="top_k")

# 评分预测指标评估
rating_eval = SparkRatingEvaluation(test, prediction, col_user=COL_USER, col_item=COL_ITEM,
                                    col_rating=COL_RATING, col_prediction="prediction")

# 打印评估指标
print(f"Model:\tALS\n"
      f"Top K:\t{rank_eval.k}\n"
      f"MAP:\t{rank_eval.map_at_k():.6f}\n"
      f"NDCG:\t{rank_eval.ndcg_at_k():.6f}\n"
      f"Precision@K:\t{rank_eval.precision_at_k():.6f}\n"
      f"Recall@K:\t{rank_eval.recall_at_k():.6f}\n"
      f"RMSE:\t{rating_eval.rmse():.6f}\n"
      f"MAE:\t{rating_eval.mae():.6f}\n"
      f"Explained variance:\t{rating_eval.exp_var():.6f}\n"
      f"R squared:\t{rating_eval.rsquared():.6f}")

总结

本篇文章以 als_movielens.ipynb 为例,详细介绍了如何使用 PySpark 实现基于 ALS 算法的电影推荐系统,并对模型性能进行了评估。 ALS 算法原理简单,易于实现,并且可以通过调节参数来控制模型的复杂度,适用于各种规模的数据集。

Leave a Comment