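Apache Spark Exercise 6: Analyzing Music Album Data with Spark
I. Source Data
The data analyzed in this chapter is a publicly available, synthetic dataset of music album releases from Kaggle (https://www.kaggle.com/datasets/revilrosa/music-label-dataset). Below, we analyze only the albums.csv file, which contains the following fields: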
- id: the album identifier;
- artist_id: the artist identifier;
- album_title: the title of the album;
- genre: the genre of the album. An artist can release albums of different genres;
- year_of_pub: the year the album was published;
- num_of_tracks: how many tracks there are in the album (a small number can mean longer tracks);
- num_of_sales: how many sales the album has made in the first month after the release;
- rolling_stone_critic: how magazine Rolling Stone has rated the album;
- mtv_critic: how MTV has rated the album;
- music_maniac_critic: how review site Music Maniac has rated the album.
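The field list above can also be written down as an explicit Spark schema. This is a minimal sketch: the name `albumSchema` is hypothetical, and the column types are assumptions, since the source names the fields but does not state their types.

```scala
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}

// Assumed types: integers for identifiers and counts, doubles for critic ratings.
val albumSchema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("artist_id", IntegerType),
  StructField("album_title", StringType),
  StructField("genre", StringType),
  StructField("year_of_pub", IntegerType),
  StructField("num_of_tracks", IntegerType),
  StructField("num_of_sales", IntegerType),
  StructField("rolling_stone_critic", DoubleType),
  StructField("mtv_critic", DoubleType),
  StructField("music_maniac_critic", DoubleType)
))
```

Such a schema could be passed to `spark.read.schema(albumSchema)` in place of schema inference, which avoids an extra pass over the file, provided the assumed types match the actual data.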
II. Exercises
0. Data Preprocessing
First, we read the CSV file from HDFS and load it into a Spark DataFrame.
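import org.apache.spark.sql.SparkSession

// Create (or reuse) the SparkSession for this application.
val spark = SparkSession
  .builder()
  .appName("Albums")
  .getOrCreate()
// Enables the $"column" syntax used in the exercises below.
import spark.implicits._

// Read albums.csv from HDFS, treating the first row as the header
// and letting Spark infer the column types.
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("hdfs:///SparkLearning/albums.csv")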
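To try the exercises without an HDFS cluster, a small in-memory DataFrame can stand in for albums.csv. The following is a minimal sketch, assuming Spark runs in local mode; the application name `AlbumsSample`, the value `sample`, and the rows themselves are invented for illustration, and only a subset of the columns is included.

```scala
import org.apache.spark.sql.SparkSession

// Local-mode session for experimentation; no cluster or HDFS required.
val spark = SparkSession
  .builder()
  .appName("AlbumsSample")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Invented sample rows (not taken from albums.csv).
val sample = Seq(
  (1, "Rock", 2001, 10, 5000),
  (2, "Rock", 2005, 12, 7000),
  (3, "Jazz", 2001, 8, 3000)
).toDF("id", "genre", "year_of_pub", "num_of_tracks", "num_of_sales")

// Inspect the column types and the contents.
sample.printSchema()
sample.show()
```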
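1. Count the number of albums in each genre
// Group by genre, count the rows in each group, and list the largest genres first.
val res = df
  .select($"genre")
  .groupBy($"genre")
  .count()
  .orderBy($"count".desc)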
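The same per-genre count can be expressed in Spark SQL by registering the DataFrame as a temporary view. This is a sketch on invented rows; the view name `albums`, the alias `album_count`, and the value `genreCounts` are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("GenreCountSql").master("local[*]").getOrCreate()
import spark.implicits._

// Invented sample data with only the genre column.
val albums = Seq("Rock", "Rock", "Jazz").toDF("genre")
albums.createOrReplaceTempView("albums")

// SQL equivalent of groupBy("genre").count() ordered by the count, descending.
val genreCounts = spark.sql(
  """SELECT genre, COUNT(*) AS album_count
    |FROM albums
    |GROUP BY genre
    |ORDER BY album_count DESC""".stripMargin)
```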
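2. Compute the total sales of each genre
// Sum num_of_sales per genre, rename the generated column, and sort by total sales.
val res = df
  .select($"genre", $"num_of_sales")
  .groupBy($"genre")
  .sum("num_of_sales")
  .withColumnRenamed("sum(num_of_sales)", "total_sales")
  .orderBy($"total_sales".desc)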
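An alternative to renaming the generated `sum(num_of_sales)` column afterwards is to name the aggregate directly with `agg` and `as`. A minimal sketch on invented rows follows; the values `albums` and `totals` are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

val spark = SparkSession.builder().appName("GenreSales").master("local[*]").getOrCreate()
import spark.implicits._

// Invented sample data.
val albums = Seq(
  ("Rock", 5000),
  ("Rock", 7000),
  ("Jazz", 3000)
).toDF("genre", "num_of_sales")

// Name the aggregate column at the point where it is computed.
val totals = albums
  .groupBy($"genre")
  .agg(sum($"num_of_sales").as("total_sales"))
  .orderBy($"total_sales".desc)
```

This form does not depend on the exact text of the auto-generated column name.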
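3. Count the albums and tracks released each year over the last 20 years (2000 onward)
val res = df
  .select("year_of_pub", "num_of_tracks")
  // Keep albums published in 2000 or later.
  .filter($"year_of_pub" >= 2000)
  .groupBy($"year_of_pub")
  // One row per album, so counting rows gives the album count;
  // summing num_of_tracks gives the track count.
  .agg("num_of_tracks" -> "count", "num_of_tracks" -> "sum")
  .withColumnRenamed("count(num_of_tracks)", "total_albums")
  .withColumnRenamed("sum(num_of_tracks)", "total_tracks")
  .orderBy("year_of_pub")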
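Counting a specific column skips rows where that column is null, so `count(num_of_tracks)` equals the album count only if every album has a track count. Whether albums.csv contains such gaps is not stated in the source; the sketch below uses invented rows (including one missing value) to show the difference, and the names `albums`, `perYear`, and `albums_with_tracks` are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{count, lit, sum}

val spark = SparkSession.builder().appName("YearlyCounts").master("local[*]").getOrCreate()
import spark.implicits._

// Invented rows; None models an album whose num_of_tracks is missing.
val albums = Seq[(Int, Option[Int])](
  (2001, Some(10)),
  (2001, None),
  (2005, Some(12))
).toDF("year_of_pub", "num_of_tracks")

val perYear = albums
  .groupBy($"year_of_pub")
  .agg(
    count(lit(1)).as("total_albums"),                 // counts every row
    count($"num_of_tracks").as("albums_with_tracks"), // skips null values
    sum($"num_of_tracks").as("total_tracks")          // nulls are ignored in the sum
  )
  .orderBy($"year_of_pub")
```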
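4. Analyze the yearly sales of the five genres with the highest total sales
// t1: the five genres with the highest total sales.
val res = df
  .select($"genre", $"num_of_sales")
  .groupBy("genre")
  .sum("num_of_sales")
  .withColumnRenamed("sum(num_of_sales)", "total_sales")
  .orderBy($"total_sales".desc)
  .limit(5)
  .alias("t1")
  // t2: the per-album rows; the join keeps only albums in the top five genres.
  .join(
    df.select($"genre", $"num_of_sales", $"year_of_pub").alias("t2"),
    $"t1.genre" === $"t2.genre"
  )
  // Sum the sales per genre and publication year.
  .groupBy("t2.genre", "t2.year_of_pub")
  .sum("t2.num_of_sales")
  .orderBy($"genre", $"year_of_pub")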
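Because the join above only filters albums down to the top genres, it can also be written as a left semi join, which keeps the left-hand columns only and so needs no table aliases. This is a sketch on invented rows that takes the top two genres instead of five; the names `albums`, `topGenres`, `salesByYear`, and `yearly_sales` are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

val spark = SparkSession.builder().appName("TopGenresByYear").master("local[*]").getOrCreate()
import spark.implicits._

// Invented sample data.
val albums = Seq(
  ("Rock", 2001, 5000),
  ("Rock", 2001, 1000),
  ("Rock", 2005, 7000),
  ("Jazz", 2001, 3000),
  ("Pop", 2003, 100)
).toDF("genre", "year_of_pub", "num_of_sales")

// The genres with the highest total sales (top 2 here; the exercise uses 5).
val topGenres = albums
  .groupBy("genre")
  .agg(sum("num_of_sales").as("total_sales"))
  .orderBy($"total_sales".desc)
  .limit(2)

// Keep only albums whose genre appears in topGenres, then aggregate per year.
val salesByYear = albums
  .join(topGenres, Seq("genre"), "left_semi")
  .groupBy("genre", "year_of_pub")
  .agg(sum("num_of_sales").as("yearly_sales"))
  .orderBy("genre", "year_of_pub")
```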
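5. Analyze the average rating, under each rating system, of the five genres with the highest total sales
// t1: the five genres with the highest total sales.
val res = df
  .select($"genre", $"num_of_sales")
  .groupBy("genre")
  .sum("num_of_sales")
  .withColumnRenamed("sum(num_of_sales)", "total_sales")
  .orderBy($"total_sales".desc)
  .limit(5)
  .alias("t1")
  // t2: the per-album ratings; the join keeps only albums in the top five genres.
  .join(
    df.select(
      $"genre",
      $"rolling_stone_critic",
      $"mtv_critic",
      $"music_maniac_critic"
    ).alias("t2"),
    $"t1.genre" === $"t2.genre"
  )
  // Average each critic's rating per genre.
  .groupBy("t2.genre")
  .agg(
    "rolling_stone_critic" -> "avg",
    "mtv_critic" -> "avg",
    "music_maniac_critic" -> "avg"
  )
  .orderBy($"genre")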
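For a more readable result, the averages can be given explicit names and rounded using the `avg` and `round` functions. The following is a sketch on invented ratings; the value names, the output column names, and the choice of two decimal places are assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, round}

val spark = SparkSession.builder().appName("GenreRatings").master("local[*]").getOrCreate()
import spark.implicits._

// Invented ratings; the real rating scale is not described in the source.
val reviews = Seq(
  ("Rock", 4.0, 3.0, 5.0),
  ("Rock", 5.0, 4.0, 4.0),
  ("Jazz", 2.5, 3.5, 3.0)
).toDF("genre", "rolling_stone_critic", "mtv_critic", "music_maniac_critic")

// Average each critic's rating per genre, rounded to two decimal places.
val avgScores = reviews
  .groupBy($"genre")
  .agg(
    round(avg($"rolling_stone_critic"), 2).as("avg_rolling_stone"),
    round(avg($"mtv_critic"), 2).as("avg_mtv"),
    round(avg($"music_maniac_critic"), 2).as("avg_music_maniac")
  )
  .orderBy($"genre")
```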