Spark-SQL基础知识

  • A+
所属分类:大数据分析
摘要这一篇文章介绍一下Spark SQL的基本概念,并且会举一个例子;会涉及文件的导出导入,查询操作,注册临时表并进行操作;

简介

这一篇简单介绍一下Spark SQL的基本概念,并且简单利用SQL Context进行统计分析。这一篇会直接使用一个例子进行分析;

例子分析

对于Spark SQL来说,程序的入口便是SQL Context类(或是该类的子类)。创建一个SQLContext对象,需要使用SparkContext

Spark-SQL基础知识

要分析的文件我们已经准备好并且放在 /home/hadoop/SPY_2016.csv中;

文件中的字段如下所示:

  • Date:交易日期
  • Open:开盘价
  • High:交易日最高价
  • Low:交易日最低价
  • Close:收盘价
  • Volume:成交量
  • Adj Close:调整后的收盘价

定义Case Class

首先我们定义一个Case Class来映射csv中每个字段;

  1. case class Stock(date: String, openPrice: Double, highPrice: Double, lowPrice: Double, closePrice: Double, volume: Double, adjClosePrice: Double)
Spark-SQL基础知识

定义解析csv的函数

接着我们定义一个用于解析csv的函数,原理即是使用逗号将csv的每一行进行分割,然后转换为Stock数据类型;

  1. def parseDataset(str: String): Stock = {
  2.     val line = str.split(",")
  3.     Stock(line(0), line(1).toDouble, line(2).toDouble, line(3).toDouble, line(4).toDouble, line(5).toDouble, line(6).toDouble)
  4. }
Spark-SQL基础知识

定义转换为RDD的函数

这里继续定义一个函数,将paraseData函数的返回值转换为RDD,并缓存下来;

  1. // map函数会将 parseData 函数作用于文件中的每一行
  2. def parseRDD(rdd: RDD[String]): RDD[Stock] = {
  3.     rdd.map(parseDataset).cache()
  4. }
Spark-SQL基础知识

将文件导入DataFrame

  1. val spyDF = parseRDD(sc.textFile("/home/hadoop/SPY_2016.csv")).toDF.cache()

查看一下是否导入成功,使用spyDF.show()进行展示;

Spark-SQL基础知识

下面进行简单的统计操作;

查找部分与分类汇总

  1. val spyQueryResult = spyDF.select(year($"date").alias("year"), $"adjClosePrice")

首先是查询的代码,在这里$表示用于标记列名alias为查询结果的字段起一个别名;

Spark-SQL基础知识

接下来我们使用groupby进行分类汇总,最后进行排序(desc为降序,asc为升序)

  1. spyQueryResult.groupBy("year").avg("adjClosePrice").orderBy(desc("year")).show()
Spark-SQL基础知识

注册临时表

在上面的操作中,我们对DataFrame使用了各种SQL的API,但是这样做在大量查询时需要编写很多的代码;

我们可以将DataFrame注册成一个临时表(使用registerTempTable进行临时表的注册),然后使用完整的SQL语句进行分析;

  1. spyDF.registerTempTable("spy")

接下来我们就可以通过SQL语句进行查看了;

  1. var spyQueryResult = sqlContext.sql("SELECT spy.date, spy.openPrice, spy.closePrice 
  2.     FROM spy 
  3.     WHERE abs(spy.closePrice - spy.openPrice) > 5 ")
Spark-SQL基础知识

结果的储存

对于上面查询的结果,我们可以永久储存在硬盘上:

常见的保存格式有csv, json, parquet(这个是可以通过Apache Parquet进行管理)等;

  1. spyQueryResult.write.format("parquet").save("/home/hadoop/test.parquet")

 

  • 微信公众号
  • 关注微信公众号
  • weinxin
  • QQ群
  • 我们的QQ群号
  • weinxin
王 茂南

发表评论

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: