Apache Spark Exercise 6: Analyzing Music Album Data with Spark
- id: the album identifier;
- artist_id: the artist identifier;
- album_title: the title of the album;
- genre: the genre of the album. An artist can release albums of different genres;
- year_of_pub: the year the album was published;
- num_of_tracks: how many tracks there are in the album (a small number can mean longer tracks);
- num_of_sales: how many sales the album has made in the first month after the release;
- rolling_stone_critic: how magazine Rolling Stone has rated the album;
- mtv_critic: how MTV has rated the album;
- music_maniac_critic: how review site Music Maniac has rated the album.
0. Data preprocessing
Below, we read the CSV file from HDFS and load it as a Spark DataFrame.
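import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("album-analysis").getOrCreate() // placeholder app name
import spark.implicits._
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("hdfs:///path/to/albums.csv") // placeholder; the original HDFS path is not shown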
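As a complement to `inferSchema`, here is a minimal sketch of reading the same file with an explicit schema, which avoids the extra pass Spark makes over the data to infer types. The column types, the `ExplicitSchemaSketch` object, the `readAlbums` helper, and the sample rows are all assumptions made for illustration; the source lists only the column names and their meanings.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._

object ExplicitSchemaSketch {
  // Assumed column types; adjust them to the real data.
  val albumSchema: StructType = StructType(Seq(
    StructField("id", IntegerType),
    StructField("artist_id", IntegerType),
    StructField("album_title", StringType),
    StructField("genre", StringType),
    StructField("year_of_pub", IntegerType),
    StructField("num_of_tracks", IntegerType),
    StructField("num_of_sales", LongType),
    StructField("rolling_stone_critic", DoubleType),
    StructField("mtv_critic", DoubleType),
    StructField("music_maniac_critic", DoubleType)
  ))

  // Reads a CSV file that has a header row, applying the fixed schema above.
  def readAlbums(spark: SparkSession, path: String): DataFrame =
    spark.read.option("header", "true").schema(albumSchema).csv(path)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("explicit-schema-sketch")
      .master("local[*]")
      .getOrCreate()

    // Made-up sample rows, written to a temp file so the sketch runs without HDFS.
    val csv =
      """id,artist_id,album_title,genre,year_of_pub,num_of_tracks,num_of_sales,rolling_stone_critic,mtv_critic,music_maniac_critic
        |1,10,First Album,Rock,2001,12,1000,4.0,3.5,4.5
        |2,11,Second Album,Jazz,1998,8,500,3.0,2.5,3.5
        |""".stripMargin
    val tmp = Files.createTempFile("albums", ".csv")
    Files.write(tmp, csv.getBytes(StandardCharsets.UTF_8))

    val df = readAlbums(spark, tmp.toUri.toString)
    df.printSchema()
    df.show()
    spark.stop()
  }
}
```

With a fixed schema, a type mismatch in the file shows up as null values (in the default permissive mode) rather than as a silently different column type, so it is worth checking a few rows after loading.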
1. Count the number of albums per genre
val res = df
  .groupBy("genre").count() // one row per genre, with the album count in column "count"
2. Compute the total sales per genre
val res = df
  .select($"genre", $"num_of_sales")
  .groupBy("genre")
  .sum("num_of_sales")
  .withColumnRenamed("sum(num_of_sales)", "total_sales")
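The rename from `sum(num_of_sales)` to `total_sales` can also be expressed by aliasing the aggregate directly. The following is a minimal sketch on a made-up in-memory sample; the `TotalSalesSketch` object and the `totalSalesByGenre` helper are hypothetical names, not part of the original exercise.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{desc, sum}

object TotalSalesSketch {
  // Sums num_of_sales per genre and names the result column in the same step.
  def totalSalesByGenre(albums: DataFrame): DataFrame =
    albums
      .groupBy("genre")
      .agg(sum("num_of_sales").as("total_sales"))
      .orderBy(desc("total_sales"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("total-sales-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Made-up rows: only the two columns this query needs.
    val sample = Seq(("Rock", 100L), ("Rock", 50L), ("Jazz", 70L))
      .toDF("genre", "num_of_sales")

    totalSalesByGenre(sample).show()
    spark.stop()
  }
}
```

Aliasing inside `agg` keeps the output column name independent of how Spark names generated aggregate columns.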
3. Count the albums and tracks released each year over the last 20 years
val res = df
  .select("year_of_pub", "num_of_tracks")
  .filter($"year_of_pub" >= 2000)
  .groupBy("year_of_pub")
  .agg("num_of_tracks" -> "count", "num_of_tracks" -> "sum")
  .withColumnRenamed("count(num_of_tracks)", "total_albums")
  .withColumnRenamed("sum(num_of_tracks)", "total_tracks")
  .orderBy("year_of_pub")
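Counting `num_of_tracks` only counts albums whose track count is not null. As a hedged alternative, the sketch below counts rows with `count(lit(1))` and takes the start year as a parameter instead of hard-coding 2000. The `PerYearSketch` object, the `albumsAndTracksPerYear` helper, and the sample rows are assumptions for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, count, lit, sum}

object PerYearSketch {
  // Per publication year from startYear on: number of albums and total number of tracks.
  def albumsAndTracksPerYear(albums: DataFrame, startYear: Int): DataFrame =
    albums
      .filter(col("year_of_pub") >= startYear)
      .groupBy("year_of_pub")
      .agg(
        count(lit(1)).as("total_albums"),        // counts every row in the group
        sum("num_of_tracks").as("total_tracks")  // ignores null track counts
      )
      .orderBy("year_of_pub")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("per-year-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Made-up rows: (year_of_pub, num_of_tracks).
    val sample = Seq((1999, 10), (2000, 12), (2000, 8), (2005, 9))
      .toDF("year_of_pub", "num_of_tracks")

    albumsAndTracksPerYear(sample, startYear = 2000).show()
    spark.stop()
  }
}
```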
4. Analyze the yearly sales of the five genres with the highest total sales
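val res = df
  .select($"genre", $"num_of_sales")
  .groupBy("genre").sum("num_of_sales")
  .withColumnRenamed("sum(num_of_sales)", "total_sales")
  .orderBy($"total_sales".desc).limit(5).alias("t1") // top five genres by total sales
  .join(
    df.select($"genre", $"num_of_sales", $"year_of_pub").alias("t2"),
    $"t1.genre" === $"t2.genre"
  )
  .groupBy("t2.genre", "t2.year_of_pub")
  .sum("num_of_sales") // yearly sales within each top genre
  .orderBy($"genre", $"year_of_pub")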
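The aliased self-join on `t1.genre` and `t2.genre` only serves to keep the rows whose genre is among the top sellers. A left-semi join on the `genre` column expresses that filter without aliases. This is a minimal sketch, not the original author's code; the `TopGenresSketch` object, the `yearlySalesOfTopGenres` helper, the `yearly_sales` column name, and the sample rows are assumptions.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{desc, sum}

object TopGenresSketch {
  // Yearly sales for the n genres with the highest total sales.
  def yearlySalesOfTopGenres(albums: DataFrame, n: Int): DataFrame = {
    // Step 1: the n best-selling genres (ties at the cutoff are broken arbitrarily).
    val topGenres = albums
      .groupBy("genre")
      .agg(sum("num_of_sales").as("total_sales"))
      .orderBy(desc("total_sales"))
      .limit(n)
      .select("genre")

    // Step 2: keep only albums in those genres, then aggregate per genre and year.
    albums
      .join(topGenres, Seq("genre"), "left_semi")
      .groupBy("genre", "year_of_pub")
      .agg(sum("num_of_sales").as("yearly_sales"))
      .orderBy("genre", "year_of_pub")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("top-genres-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Made-up rows: (genre, year_of_pub, num_of_sales).
    val sample = Seq(
      ("Rock", 2000, 100L), ("Rock", 2001, 50L),
      ("Jazz", 2000, 70L), ("Pop", 2000, 10L)
    ).toDF("genre", "year_of_pub", "num_of_sales")

    yearlySalesOfTopGenres(sample, n = 2).show()
    spark.stop()
  }
}
```

A left-semi join returns only columns from the left side, so the result has no duplicate `genre` column to disambiguate.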
5. Analyze the average rating, under each rating system, of the five genres with the highest total sales
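val res = df
  .select($"genre", $"num_of_sales")
  .groupBy("genre").sum("num_of_sales")
  .withColumnRenamed("sum(num_of_sales)", "total_sales")
  .orderBy($"total_sales".desc).limit(5).alias("t1") // top five genres by total sales
  .join(df.alias("t2"), $"t1.genre" === $"t2.genre")
  .groupBy("t2.genre")
  .agg(
    "rolling_stone_critic" -> "avg",
    "mtv_critic" -> "avg",
    "music_maniac_critic" -> "avg"
  )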