8.spark-sql

教程 DER ⋅ 于 2023-04-15 17:55:31 ⋅ 2313 阅读

spark-sql

20.1 SparkSQL的发展历程

20.1.1 Hive and Shark

SparkSQL的前身是Shark,是给熟悉RDBMS但又不理解MapReduce的技术人员提供快速上手的工具,hive应运而生,它是运行在Hadoop上的SQL-on-hadoop工具。但是MapReduce计算过程中大量的中间磁盘落地过程消耗了大量的I/O,运行效率低。为了提高SQL-on-Hadoop的效率,shark 应运而生。它修改了下图所示的右下角的内存管理、物理计划、执行三个模块,并使之能运行在Spark引擎上,从而使得SQL查询的速度得到10-100倍的提升。

file

随着Spark的发展,对于野心勃勃的Spark团队来说,Shark对于Hive的太多依赖(如采用Hive的语法解析器、查询优化器等等),制约了Spark各个组件的相互集成,所以提出了SparkSQL项目。

SparkSQL抛弃原有Shark的代码,摆脱了对Hive的依赖性,SparkSQL无论在数据兼容、性能优化、组件扩展方面都得到了极大的方便。

hive与spark-sql的对比

file

20.2 实验室集群配置

1)安装hive

#以nn1机器为例
#切换用户为root,安装的目录是/usr/local
tar -zxvf /public/software/bigdata/apache-hive-3.1.3-bin.tar.gz -C /usr/local/
#修改权限
chown hadoop:hadoop -R /usr/local/apache-hive-3.1.3-bin/
#创建软连接
ln -s /usr/local/apache-hive-3.1.3-bin/ /usr/local/hive
# 配置环境变量 /etc/profile
export HIVE_HOME=/usr/local/hive
export PATH=$PATH:$HIVE_HOME/bin
source /etc/profile
# 基础准备工作完毕

2) 安装mysql

wget https://dev.mysql.com/get/mysql57-community-release-el7-11.noarch.rpm

安装mysql 源
rpm -ivh mysql57-community-release-el7-11.noarch.rpm

修改阿里云的镜像为原生镜像
rm -rf CentOS-Base.repo
mv CentOS-Base.repo.back CentOS-Base.repo

检查mysql源是否安装成功
yum repolist enabled | grep "mysql.*-community.*"

用 yum 命令安装mysql
yum -y install mysql-community-server

出现问题
rpm --import https://repo.mysql.com/RPM-GPG-KEY-mysql-2022
重新执行安装

查看MYSQL配置文件加载顺序: 
mysqld --help --verbose|grep -A1 -B1 cnf 

修改/etc/my.cnf 配置文件内的文件目录
datadir=/data/mysql/data
sql_mode=STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION
log-error=/data/mysql/log/mysqld.log

创建mysql 文件目录
[root@localhost jar]# mkdir -p /data/mysql/data
[root@localhost jar]# mkdir -p /data/mysql/log

生成首次登录随机密码
mysqld --initialize

修改mysql 文件目录所有者为 mysql 用户
chown -R mysql:mysql /data/mysql

systemctl start mysqld.service
如果systemctl不能使用,我们需要进行替换
文件在/public/software/other/systemctl
cp /public/software/other/systemctl /usr/bin/systemctl

如果说启动mysql后使用
ps -ef|grep mysql
如果没有任何进程
chmod 777 -R /data/mysql
systemctl start mysqld.service

用生成的随机密码登录mysql
随机密码的位置 /data/mysql/log/mysqld.log
mysql -uroot -p'/FJThgDrD6Il'

修改ROOT用户密码

set password=PASSWORD('123456'); 

3)创建用户和数据库

mysql -uroot -p'123456'                            

--创建hive用户
CREATE USER 'hive'@'%' IDENTIFIED BY '12345678';   
--在mysql中创建hive_meta数据库
create database hive_meta default charset utf8 collate utf8_general_ci;
--给hive用户增加hive_meta数据库权限
grant all privileges on hive_meta.* to 'hive'@'%' identified by '12345678';
--更新
flush privileges; 

拷贝mysql驱动jar 到/usr/local/hive/lib/
cp /public/software/other/mysql-connector-java-5.1.35.jar /usr/local/hive/lib/

删除冲突jar包
rm -f /usr/local/hive/lib/log4j-slf4j-impl-2.4.1.jar

创建文件夹
su - hadoop
hadoop fs -mkdir -p /hive/warehouse
hadoop fs -chmod -R 777 /hive
hadoop fs -ls /hive

初始化元数据仓库
schematool -dbType mysql -initSchema

初始化的时候出现jar包冲突,对比换成版本比较高的
hive中guava.jar位置/hive/lib/
hadoop中guava.jar位置/hadoop/share/hadoop/common/lib/

命令如下
rm -rf /usr/local/hive/lib/guava-19.0.jar 
cp /usr/local/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar /usr/local/hive/lib/

4) 修改后的hive-site.xml 内容

/usr/local/hive/conf/

<configuration>
    <!-- 数据库 start -->
    <property>
      <name>javax.jdo.option.ConnectionURL</name>
      <value>jdbc:mysql://nn1:3306/hive_meta</value>
      <description>mysql连接</description>
    </property>

    <property>
      <name>javax.jdo.option.ConnectionDriverName</name>
      <value>com.mysql.jdbc.Driver</value>
      <description>mysql驱动</description>
    </property>

    <property>
      <name>javax.jdo.option.ConnectionUserName</name>
      <value>hive</value>
      <description>数据库使用用户名</description>
    </property>

    <property>
      <name>javax.jdo.option.ConnectionPassword</name>
      <value>12345678</value>
      <description>数据库密码</description>
    </property>
    <!-- 数据库 end -->
    <!-- HDFS start -->
    <property> 
      <name>hive.metastore.warehouse.dir</name>
      <value>/hive/warehouse</value>
      <description>hive使用的HDFS目录</description>
    </property>
    <!-- HDFS end -->
    <!-- metastore start 在客户端使用时,mysql连接和metastore同时出现在配置文件中,客户端会选择使用metastore -->
    <property>
      <name>hive.metastore.schema.verification</name>
      <value>false</value>
    </property>

    <property>
      <name>hive.metastore.uris</name>
      <value>thrift://nn1:9083</value>
    </property>
    <!-- metastore end -->
    <!-- hiveserver start -->
    <property>
      <name>hive.server2.authentication</name>
      <value>NONE</value>
    </property>

    <property>
      <name>hive.server2.thrift.bind.host</name>
      <value>nn1</value>
      <description>hive开启的thriftServer地址</description>
    </property>

    <property>
      <name>hive.server2.thrift.port</name>
      <value>10000</value>
      <description>hive开启的thriftServer端口</description>
    </property>

    <property>
      <name>hive.server2.enable.doAs</name>
      <value>true</value>
    </property>
    <!-- hiveserver end -->
    <!-- 其它 start -->
    <property>
      <name>datanucleus.autoCreateSchema</name>
      <value>false</value>
    </property>

    <property>
      <name>datanucleus.fixedDatastore</name>
      <value>true</value>
    </property>

    <property>
      <name>datanucleus.autoStartMechanism</name> 
      <value>SchemaTable</value>
    </property>

    <property> 
      <name>hive.cli.print.current.db</name>
      <value>true</value>
    </property>
    <!-- 其它 end -->
</configuration>

到此为止hive安装完毕

#启动元数据服务
nohup hive --service metastore >> /tmp/hive.log 2>&1 &

配置spark的hive-site.xml

<configuration>
    <!-- HDFS start -->
    <property> 
      <name>hive.metastore.warehouse.dir</name>
      <value>/hive/warehouse</value>
      <description>hive使用的HDFS目录</description>
    </property>
    <!-- HDFS end -->
    <!-- metastore start 在客户端使用时,mysql连接和metastore同时出现在配置文件中,客户端会选择使用metastore -->
    <property>
      <name>hive.metastore.schema.verification</name>
      <value>false</value>
      <description>校验metastore版本信息是否与sparkjar 版本一致;true:校验;false:不校验</description>
    </property>
    <property>
      <name>hive.metastore.uris</name>
      <value>thrift://nn1:9083</value>
    </property>
    <!-- metastore end -->
    <!-- hiveserver start -->
    <property>
          <name>hive.server2.thrift.min.worker.threads</name>
          <value>5</value>
      <description>Minimum number of Thrift worker threads</description>
    </property>
    <property>
          <name>hive.server2.thrift.max.worker.threads</name>
          <value>500</value>
          <description>Maximum number of Thrift worker threads</description>
    </property>
    <property>
      <name>hive.server2.thrift.bind.host</name>
      <value>nn1</value>
      <description>hive开启的thriftServer地址</description>
    </property>
    <property>
      <name>hive.server2.thrift.port</name>
      <value>20000</value>
      <description>开启spark的thriftServer端口</description>
    </property>
    <!-- hiveserver end -->
</configuration>

其中:

hive.metastore.schema.verification,用于校验 metastore版本信息是否与spark jar 版本一致;true:校验;false:不校验;

hive 有个hiveserver2服务,端口是10000;而spark 用的hiveserver2服务,配置的端口是20000,不冲突。

file

2)spark-env.sh

使得spark 能与hadoop关联。

file

3)减少spark sql 日志输出,修改spark conf 目录下的 log4j.properties

file

20.3 spark-sql shell(自己玩)

20.3.1 启动spark-SQL shell 步骤

# 启动yarn集群
#启动hive服务
nohup hive --service metastore > /dev/null 2>&1 &

#执行sparkSQL
spark-sql –master yarn –queue root.hainiu –num-executors 2 –executor-memory 1G --executor-cores 2

这种方式每个人一个driver彼此之间的数据无法共享;

启动任务后,发现还没跑 任务,就已经占用了 资源,因为现在还没有机制能计算出跑SQL任务会用多少内存。而hive是只有跑任务才去算占用多少资源。

file

20.3.2 运行sparkSQL

写hive命令即可。

-- 查看数据库
show databases;
create database hainiu;
-- 进入数据库
use hainiu; 
--创建表
create table student(id int,name string,age int)
row format delimited fields terminated by ' ';

1)查询统计student表记录数

select count(1) from student;

file

执行带有shuffle 的SQL,会产生200 partition。

select count(1),id from student group by id;

file

可以通过 set spark.sql.shuffle.partitions=20; 进行设置partition的个数,这样可以减少shuffle的次数。

set spark.sql.shuffle.partitions=20;

select count(1),id from student group by id;

file

20.3.3 通过bin/spark-sql –help可以查看CLI命令参数

spark-sql –help

file

spark-shell -h 查看帮助

20.4 spark thriftserver(共享玩)

ThriftServer是一个JDBC/ODBC接口,用户可以通过JDBC/ODBC连接ThriftServer来访问SparkSQL的数据。ThriftServer在启动的时候,会启动了一个SparkSQL的应用程序,而通过JDBC/ODBC连接进来的客户端共同分享这个SparkSQL应用程序的资源,也就是说不同的用户之间可以共享数据;ThriftServer启动时还开启一个侦听器,等待JDBC客户端的连接和提交查询。所以,在配置ThriftServer的时候,至少要配置ThriftServer的主机名和端口,如果要使用Hive数据的话,还要提供Hive Metastore的uris。

这种方式所有人可以通过driver连接,彼此之间的数据可以共享。

file

20.4.1 启动spark thriftserver

#启动yarn集群

#启动hive服务
nohup hive --service metastore > /dev/null 2>&1 &

#启动thriftserver服务
/usr/local/spark/sbin/start-thriftserver.sh --master yarn --queue hainiu

在 op.hadoop 机器启动thriftserver 服务

集群只要启动一个即可,如果报端口被占用,说明有人已经启动过。

file

20.4.2 使用spark的beeline 连接 thriftserver

beeline 分为hive 和 spark的。

hive 的 beeline :/usr/local/hive/bin/beeline

spark 的 beeline : /usr/local/spark/bin/beeline

在op.hadoop上启动beelie, 连接 nn1 的thriftserver 服务

/usr/local/spark/bin/beeline

!connect jdbc:hive2://nn1:20000

file

yarn页面看见启动了sparkSQL任务

file

file

20.4.3 在thriftsever 上跑 sparkSQL

select count(1),id from student group by id;

file

首先准备数据

vim /home/hadoop/a.txt
2 lisi 30
3 wangwu 40
4 zhaosi 43
5 liuneng 44
6 guangkun 45
7 dajiao 46
8 daguo 47

插入数据

load data local inpath '/home/hadoop/a.txt' into table  student;
cache table 表名;
uncache table student;
cache table 数据集别名 as 查询SQL
cache table cnt_table as select id,count(1) from student group by id;
去除缓存
uncache table tablename;

内存缓存

file

uncache

file

20.5 spark-webUI

怎么合理的运用并行化,比如要处理的数据最终生成的partition是30个,那你的job设置的资源就应该是10到15个cores。为什么呢?因为官方推荐的设置是(2\~3)*cores = parttions,这样设置的主要原因是executor不会太闲置或者太繁忙。

模拟数据

# linux中存在python环境的
# 进入到nn1机器中
python
with open("/tmp/abc.txt","a") as f:
    for i in range(100000):
        f.write("%s,hainiu_%s,%s\n" % (i,i,i))

file

退出python

exit()

file

然后加载数据到sparksql的表中

create table stu(id int,name string, age int)
row format delimited fields terminated by ',';

--加载数据
load data local inpath '/tmp/abc.txt' into table stu;

file

多次load数据

file

如何判断任务并行化是否合理?

先看你的RDD会有多少个task,也就是有多少个partition

file

因为读取的数据个数10个文件,对应存在10个block块,分区就是10个

单独统计每个文件的元素的个数,然后整体统计所有的元素的个数

再看你的任务总cores资源是多少

file

job运行11个task,提供的CPU核数2个, 相对比较合理,但CPU核有空转的。

如何知道自己使用的RDD到底会使用多大的存储空间?

直接缓存表方式:

如果表数据是txt格式,可以根据表对应hdfs的大小来设定。

如果表数据是orc格式文件,那缓存的大小 = 对应hdfs的大小 * 3。

file

file

file

20.6 spark-sql执行过程

file

20.7 通过JDBC连接thriftserver

pom里添加spark的hive-jdbc,之前已经添加过

file

使用JDBC访问spark-sql server 程序:

package com.hainiu.spark

import org.apache.hive.jdbc.HiveDriver

import java.sql.{DriverManager, ResultSet}

object TestBeeline {
  def main(args: Array[String]): Unit = {
    //scala jdbc -> sql -->sparksql
    classOf[HiveDriver]
    val con = DriverManager.getConnection("jdbc:hive2://nn1:20000","hadoop",null)
    val prp = con.prepareStatement("select count(1) as cnt from stu")
    val set: ResultSet = prp.executeQuery()
    while(set.next()){
      val cnt = set.getLong("cnt")
      println("stu表的总条数是 :"+cnt)
    }
    con.close()
  }
}

结果:

file

20.8 spark-sql 编程

file

20.8.1 dataFrame对象

DataFrame:

DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这样的数据集可以用SQL查询。

file

创建方式

准备数据

1 zhangsan 20 male
2 lisi 30 female
3 wangwu 35 male
4 zhaosi 40 female

toDF方式

package com.hainiu.spark

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object TestSparkSql{
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("test sql")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlSc = new SQLContext(sc)
     //环境对象包装
    import sqlSc.implicits._
      //引入环境信息
    val rdd = sc.textFile("data/a.txt")
      .map(t => {
        val strs = t.split(" ")
        (strs(0).toInt, strs(1), strs(2).toInt)
      })
    //增加字段信息
    val df = rdd.toDF("id", "name", "age")
    df.show() //展示表数据
    df.printSchema() //展示表格字段信息
  }
}

使用类定义schema

object TestSparkSql{
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("test sql")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlSc = new SQLContext(sc)
    import sqlSc.implicits._
    val rdd = sc.textFile("data/a.txt")
      .map(t => {
        val strs = t.split(" ")
         Student(strs(0).toInt, strs(1), strs(2).toInt)
      })

//    val df = rdd.toDF("id", "name", "age")
    val df = rdd.toDF()
    df.show() //打印数据,以表格的形式打印数据
    df.printSchema() //打印表的结构信息
  }
}
case class Student(id:Int,name:String,age:Int)

createDataFrame方式

这种方式需要将rdd和schema信息进行合并,得出一个新的DataFrame对象

package com.hainiu.spark

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object TestSparkSqlWithCreate {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("test create")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlSc = new SQLContext(sc)
    val rdd = sc.textFile("data/a.txt")
      .map(t => {
        val strs = t.split(" ")
        Row(strs(0).toInt, strs(1), strs(2).toInt)
      })
//    rdd + schema
    val schema = StructType(
      Array(
        StructField("id",IntegerType),
        StructField("name",StringType),
        StructField("age",IntegerType)
      )
    )
    val df = sqlSc.createDataFrame(rdd, schema)
    df.show()
    df.printSchema()
  }
}

20.8.2 sparksql的查询方式

file

第二个部分关于df的查询

第一种sql api的方式查询

  • 使用的方式方法的形式编程
  • 但是思想还是sql形式
  • 和rdd编程特别相似的一种写法
object TestSql {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("test sql")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlSc = new SQLContext(sc)
    import sqlSc.implicits._
    val rdd = sc.textFile("data/a.txt")
      .map(t => {
        val strs = t.split(" ")
        (strs(0).toInt, strs(1), strs(2).toInt,strs(3))
      })
    val df = rdd.toDF("id", "name", "age","gender")
    //select * from student where age >20
    //df.where("age >20")
     //分组聚合
      //df.groupby("gender").sum("age")
    //几个问题
   //聚合函数不能增加别名 聚合函数不能多次聚合  orderby不识别desc 
   // df.groupBy("gender").agg(count("id").as("id"),sum("age").as("age")).orderBy($"age".desc) 
   //字段标识可以是字符串,也可以是字段对象
   //df.orderBy($"age".desc)   
   //df.orderBy(col("age").desc) 
   //df.orderBy(df("age").desc) 
   //增加字段对象可以实现高端操作
   //df.select($"age".+(1)) 
   //join问题
   //val df1 = sc.makeRDD(Array(
    //   (1,100,98),
     //  (2,100,95),
      // (3,90,92),
       //(4,90,93)
   //)).toDF("id","chinese","math")

   //df.join(df1,"id") //字段相同   
   //df.join(df1,df("id")===df1("id"))   
    //窗口函数
    //普通函数 聚合函数  窗口函数 sum|count|rowkey over (partition by gender order by age desc)
    //按照条件分割完毕进行数据截取
    //班级的前两名 每个性别年龄最高的前两个
    //select *,row_number() over (partition by gender order by age desc) rn from table
    import sqlSc.implicits._
    import org.apache.spark.sql.functions._
    df.withColumn("rn",row_number().over(Window.partitionBy("gender").orderBy($"age".desc)))
      .where("rn = 1")
      .show()
  }
}

第二种纯sql形式的查询

  • 首先注册表
  • 然后使用sql查询
  • 最终得出的还是dataFrame的对象
  • 其中和rdd的编程没有任何的区别,只不过现在使用sql形式进行处理了而已
package com.hainiu.spark

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object TestSparkSqlWithCreate {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("test create")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlSc = new SQLContext(sc)
    val rdd = sc.textFile("data/a.txt")
      .map(t => {
        val strs = t.split(" ")
        Row(strs(0).toInt, strs(1), strs(2).toInt,strs(3))
      })
//    rdd + schema
    val schema = StructType(
      Array(
        StructField("id",IntegerType),
        StructField("name",StringType),
        StructField("age",IntegerType),
        StructField("gender",StringType),
      )
    )
    val df = sqlSc.createDataFrame(rdd, schema)
    //sql形式查询
    //select col from table
    df.createTempView("student")
    val df1 = sqlSc.sql(
      """
        |select count(1) cnt,gender from student group by gender
        |""".stripMargin)
    df1.createTempView("student1")
    val df2 = sqlSc.sql(
      """
        |select * from student1 where cnt>1
        |""".stripMargin)
    df2.show()
    df2.printSchema()
  }
}

函数查询练习

#电影推荐的练习
#首先将数据从 /public/data/movie_data中将数据复制到idea的data文件夹中
# movie中的数据是
# mid name type
# ratings中的数据是
# userid mid score time

整体代码如下:

package com.hainiu.spark

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.{SparkConf, SparkContext}

object TestMovieWithSql {
  def main(args: Array[String]): Unit = {
    //??movie???
    //1.id  middle=name  last=type
    val conf = new SparkConf()
    conf.setAppName("movie")
    conf.setMaster("local[*]")
    conf.set("spark.shuffle.partitions","20")
    val sc = new SparkContext(conf)
    val sqlSc = new SQLContext(sc)

    import sqlSc.implicits._
    //deal data
    val df = sc.textFile("data/movies.txt")
      .flatMap(t => {
        val strs = t.split(",")
        val mid = strs(0)
        val types = strs.reverse.head
        val name = strs.tail.reverse.tail.reverse.mkString(" ")
        types.split("\\|").map((mid, name, _))
      }).toDF("mid", "mname", "type")

    df.limit(1).show()

    val df1 = sc.textFile("data/ratings.txt")
      .map(t=>{
        val strs = t.split(",")
        (strs(0),strs(1),strs(2).toDouble)
      }).toDF("userid","mid","score")
    df1.limit(1).show()

    import org.apache.spark.sql.functions._
    df.join(df1,"mid").groupBy("userid","type")
      .agg(count("userid").as("cnt"))
      .withColumn("rn",row_number().over(Window.partitionBy("userid").orderBy($"cnt".desc)))
      .where("rn = 1")
      .show()

//    df.createTempView("movie")
//    df1.createTempView("ratings")
//  import org.apache.spark.sql.functions._
//
//
//    sqlSc.sql(
//      """
//        |select userid,type,cnt
//        |from
//        |(select *,row_number() over (partition by userid order by cnt desc) rn
//        |from
//        |(select count(*) cnt,userid,type
//        |from
//        |(select userid,type
//        |from
//        |movie m join ratings r
//        |on m.mid = r.mid)t
//        |group by userid,type)t1)t2
//        |where rn = 1
//        |""".stripMargin)
//      .show(200,false)
  }
}

作业题

每个用户最喜欢哪个类型的电影
每个类型中最受欢迎的前三个电影?
然后给用户推荐
    val df11 = df.join(df1, "mid").groupBy("userid", "type")
      .agg(count("userid").as("cnt"))
      .withColumn("rn", row_number().over(Window.partitionBy("userid").orderBy($"cnt".desc)))
      .where("rn = 1")
      .select("userid", "type")

    val df22 = df.join(df1, "mid").groupBy("type", "mname")
      .agg(avg("score").as("avg"))
      .withColumn("rn", row_number().over(Window.partitionBy("type").orderBy($"avg".desc)))
      .where("rn<4")
      .select("type", "mname")

    df11.join(df22,"type")
      .show()

修改idea的内存参数

file

设定jvm的最大内存 -Xmx2048M

20.8.3 sparksql读写数据

我们使用sparksql进行编程,编程的过程我们需要创建dataframe对象,这个对象的创建方式我们是先创建RDD然后再转换rdd变成为DataFrame对象

但是sparksql给大家提供了多种便捷读取数据的方式

//原始读取数据方式
sc.textFile().toRDD
sqlSc.createDataFrame(rdd,schema)
//更便捷的使用方式
sqlSc.read.text|orc|parquet|jdbc|csv|json
df.write.text|orc|parquet|jdbc|csv|json

write存储数据的时候也是文件夹的,而且文件夹不能存在

csv是一个介于文本和excel之间的一种格式,如果是文本打开用逗号分隔的

text文本普通文本,但是这个文本必须只能保存一列内容

以上两个文本都是只有内容的,没有列的

json是一种字符串结构,本质就是字符串,但是存在kv,例子 {"name":"zhangsan","age":20}

多平台解析方便,带有格式信息

orc格式一个列式存储格式,hive专有的

parquet列式存储,顶级项目

以上都是列式存储问题,优点(1.列式存储,检索效率高,防止冗余查询 2.带有汇总信息,查询特别快 3.带有轻量级索引,可以跳过大部分数据进行检索),他们都是二进制文件,带有格式信息

jdbc 方式,它是一种协议,只要符合jdbc规范的服务都可以连接,mysql,oracle,hive,sparksql

整体代码:

package com.hainiu.spark

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.{SparkConf, SparkContext}

import java.util.Properties

object TestMovieWithSql {
  def main(args: Array[String]): Unit = {
    //??movie???
    //1.id  middle=name  last=type
    val conf = new SparkConf()
    conf.setAppName("movie")
    conf.setMaster("local[*]")
    conf.set("spark.shuffle.partitions","20")
    val sc = new SparkContext(conf)
    val sqlSc = new SQLContext(sc)

    import sqlSc.implicits._
    //deal data
    val df = sc.textFile("data/movies.txt")
      .flatMap(t => {
        val strs = t.split(",")
        val mid = strs(0)
        val types = strs.reverse.head
        val name = strs.tail.reverse.tail.reverse.mkString(" ")
        types.split("\\|").map((mid, name, _))
      }).toDF("mid", "mname", "type")

    df.limit(1).show()

    val df1 = sc.textFile("data/ratings.txt")
      .map(t=>{
        val strs = t.split(",")
        (strs(0),strs(1),strs(2).toDouble)
      }).toDF("userid","mid","score")
    df1.limit(1).show()

    import org.apache.spark.sql.functions._
    val df11 = df.join(df1, "mid").groupBy("userid", "type")
      .agg(count("userid").as("cnt"))
      .withColumn("rn", row_number().over(Window.partitionBy("userid").orderBy($"cnt".desc)))
      .where("rn = 1")
      .select("userid", "type")

    val df22 = df.join(df1, "mid").groupBy("type", "mname")
      .agg(avg("score").as("avg"))
      .withColumn("rn", row_number().over(Window.partitionBy("type").orderBy($"avg".desc)))
      .where("rn<4")
      .select("type", "mname")

    val df33 = df11.join(df22, "type")

    //spark3.1.2?? spark2.x

//    df33.write.csv()
    df33.write
      .format("csv")
      .save("data/csv")

//    df33.write.
//      csv("data/csv")
//    df33.write.json("data/json")

//    df33.write.parquet("data/parquet")
//    df33.write.orc("data/orc")
//    val pro = new Properties()
//    pro.put("user","root")
//    pro.put("password","hainiu")
//    df33.write.jdbc("jdbc:mysql://11.99.173.24:3306/hainiu","movie",pro)
  }
}

为了简化存储的计算方式

package com.hainiu.spark

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object TestSink {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("test sink")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlSc = new SQLContext(sc)
    import sqlSc.implicits._
    import org.apache.spark.sql.functions._
    val df = sc.textFile("data/a.txt")
      .map(t=>{
        val strs = t.split(" ")
        (strs(0),strs(1),strs(2),strs(3))
      }).toDF("id","name","age","gender")
      .withColumn("all",concat_ws(" ",$"id",$"name",$"age",$"gender"))
      .select("all")
//    df.write.csv("data/csv")
//    df.write.format("org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2")
//      .save("data/csv")
//    df.write.parquet("data/parquet")
//    df.write.format("org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2")
//      .save("data/parquet")
//    df.write.format("org.apache.spark.sql.execution.datasources.v2.json.JsonDataSourceV2")
//      .save("data/json")
    df.write.format("org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2")
      .save("data/text")
  }
}

读取数据代码:

package com.hainiu.spark

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

import java.util.Properties

object TestReadData {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("movie")
    conf.setMaster("local[*]")
    conf.set("spark.shuffle.partitions", "20")
    val sc = new SparkContext(conf)
    val sqlSc = new SQLContext(sc)
//    sqlSc.read.text("data/text").show()
//    sqlSc.read.csv("data/csv").show()
//  
//    sqlSc.read.parquet("data/parquet").show()
//    sqlSc.read.json("data/json").show()

    sqlSc.read.format("org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2").load("data/text").show()
    sqlSc.read.format("org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2").load("data/csv").show()
    sqlSc.read.format("org.apache.spark.sql.execution.datasources.v2.json.JsonDataSourceV2").load("data/json").show()
    sqlSc.read.format("org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2").load("data/parquet").show()

    sqlSc.read.orc("data/orc").show()
    val pro = new Properties()
    pro.put("user","root")
    pro.put("password","hainiu")
    sqlSc.read.jdbc("jdbc:mysql://11.99.173.24:3306/hainiu","movie",pro).show()
  }
}

20.8.4 读取hive数据

hive作为sparksql中的一个数据源,可以直接操作hive

准备工作

将hive-site.xml,core-site.xml,hdfs-site.xml放入到src/main/resources

直接在nn1机器远程发送配置文件到远程桌面的机器中

所有机器的root用户的密码是hainiu

scp /usr/local/hive/conf/hive-site.xml root@11.99.173.36:/headless/workspace/spark/src/main/resources
scp /usr/local/hadoop/etc/hadoop/hdfs-site.xml root@11.99.173.36:/headless/workspace/spark/src/main/resources
scp /usr/local/hadoop/etc/hadoop/core-site.xml root@11.99.173.36:/headless/workspace/spark/src/main/resources

file

读取代码;

首先修改hdfs的权限

hdfs dfs -chmod -R 777  /
#如果遇见内存不足我们换机器执行
package com.hainiu.spark

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object TestHive {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("movie")
    conf.setMaster("local[*]")
    conf.set("spark.shuffle.partitions", "20")
    val sc = new SparkContext(conf)
    //sparksql -->sqlSc
    //hive -->hiveSc(sqlSc)
    val hsc = new HiveContext(sc)
    hsc.sql(
      """
        |create table stu(id int,name string,age int)
        |row format delimited fields terminated by ','
        |""".stripMargin)

    hsc.sql(
      """
        |insert into stu(id,name,age)
        |values(1,'zhangsan',20)
        |""".stripMargin)

    hsc.sql(
      """
        |select * from stu
        |""".stripMargin).show()
  }
}

file

hdfs中的文件

file

20.8.5 sparksession

之前使用的操作对象有三个

  • sparkContext主要是为了rdd编程而产生的一个操作对象
  • sqlContext主要是为了sparksql的编程而产生的
  • hiveContext主要是操作hive的对象

归一化的对象

sparkSession对象融合了sc,sqlSc,hsc三种为一个整体

package com.hainiu.spark

import org.apache.spark.sql.SparkSession

object TestSession {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .enableHiveSupport() //hive
      .master("local[*]").appName("test").getOrCreate()
    //session --> sc.sqlsc.hivesc
    //sparkContext
    val sc = session.sparkContext
    session //sqlSc
    import session.implicits._
    val df = sc.textFile("file:///headless/workspace/spark/data/a.txt")
      .map(t => {
        val strs = t.split(" ")
        (strs(0), strs(1), strs(2), strs(3))
      }).toDF("id", "name", "age", "gender")
    df.show()
    df.createTempView("student")
    session.sql("select * from student").show()

    session.sql(
      """
        |select count(1) from stu
        |""".stripMargin)
      .show()
  }
}

20.8.6 dataset

dataset是dataFrame的升级版对象,dataframe是一个传统的sql编程对象,如果要想使用dataframe进行灵活开发的比较复杂的

file

dataset和dataFrame是一个类别的对象,都是可以进行sql查询数据的,并且可以支持rdd上面的方法

当我们需要对一个表对象进行二次处理的话建议大家转换为dataset而不是dataframe

package com.hainiu.spark

import org.apache.spark.sql.{Dataset, SparkSession}

object TestDSAndDF {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
    import session.implicits._
    val ds: Dataset[String] = session.read.textFile("file:///headless/workspace/spark/data/a.txt")
    ds.map(t=>{
      val strs = t.split(" ")
      (strs(0), strs(1), strs(2), strs(3))
    })

//    val df = session.read.format("org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2")
//      .load("file:///headless/workspace/spark/data/a.txt")
//    
//    val ds: Dataset[(String, String, String, String)] = df.map(row => {
//      val line = row.getAs[String]("value")
//      val strs = line.split(" ")
//      (strs(0), strs(1), strs(2), strs(3))
//    })
  }
}

20.9 RDD、DataFrame、Dataset

20.9.1 概念

RDD:

弹性分布式数据集;

DataFrame:

DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这样的数据集可以用SQL查询。

file

DataFrame 是 DataSet[Row]

DataSet:

Dataset是一个强类型的特定领域的对象,Dataset也被称为DataFrame的类型化视图,这种DataFrame是Row类型的Dataset,即Dataset[Row]。

file

20.9.2 三者之间的转换

RDD、DataFrame、Dataset三者有许多共性,有各自适用的场景常常需要在三者之间转换;

    val ds: Dataset[String] = session.read.textFile("file:///headless/workspace/spark/data/a.txt")
    ds.map(t=>{
      val strs = t.split(" ")
      (strs(0), strs(1), strs(2), strs(3))
    })

    val df1 = ds.toDF("id","name","age","gender")

    val df: Dataset[Row] = session.read.format("org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2")
      .load("file:///headless/workspace/spark/data/a.txt")

    val rdd = session.sparkContext.textFile("file:///headless/workspace/spark/data/a.txt")
    rdd.toDS()
    rdd.toDF()

    df.rdd
    ds.rdd

20.10 spark-sql的UDF

20.10.1 udf

数据库中的系统函数
oracle mysql impala hive sparksql flinksql
udf 一进一出,函数接受的是一行中的一个或者多个字段值,返回一个值
udaf聚合函数,多行作为参数输出的结果是一个
udtf拆分函数,单行进来输出的结果是多行

首先自定义udf

准备数据

1 zhangsan 20000 10000
2 lisi 21000 20000
3 wangwu 22000 21000

定义udf统计每个人的年终总收入情况

程序:

package com.hainiu.spark

import org.apache.spark.sql.{SparkSession}

object TestUDF {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local[*]").appName("testUDF").getOrCreate()
    import session.implicits._
    val df = session.sparkContext.textFile("file:///headless/workspace/spark/data/salary.txt")
      .map(t => {
        val strs = t.split(" ")
        (strs(0), strs(1), strs(2).toInt, strs(3).toInt)
      }).toDF("id", "name", "salary", "bonus")

    session.udf.register("all_income",(sal:Int,bonus:Int)=>{
      sal*12 + bonus
    })

    import org.apache.spark.sql.functions
    df.withColumn("all",functions.callUDF("all_income",$"salary",$"bonus"))
      .select("id","name","all")
      .show()

//
//    df.createTempView("salary")
//    session.sql(
//      """
//        |select id,name,all_income(salary,bonus) all from salary
//        |""".stripMargin)
//      .show()
  }
}

file

20.10.2 udaf 聚合函数

多进一出的函数

系统自带的聚合函数 count avg sum max min

以学生信息为主进行统计,所有人员的年龄的总和

或者每个性别的年龄的平均值

求和的整体代码:

package com.hainiu.spark

import org.apache.spark.sql.{Encoder, Encoders, SparkSession, functions}
import org.apache.spark.sql.expressions.{Aggregator, UserDefinedFunction}

object TestUDAF {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName("test udaf").master("local[*]").getOrCreate()
    import session.implicits._
    val df = session.sparkContext.textFile("file:///headless/workspace/spark/data/a.txt")
      .map(t => {
        val strs = t.split(" ")
        (strs(0), strs(1), strs(2).toInt, strs(3))
      }).toDF("id", "name", "age", "gender")

    //avg(age)
    import org.apache.spark.sql.functions._
    //    val df1 = df.agg(avg("age"))
    //    val df2 = df.groupBy("gender").avg("age")
        //sum(age)
    import org.apache.spark.sql.functions._
//    df.agg(Mysum($"age")).show()
    val mysum = functions.udaf(MySum)
    df.agg(mysum($"age")).show

//    session.udf.register("mysum",functions.udaf(sum))
//
//    df.createTempView("student")
//    session.sql(
//      """
//        |select mysum(age) from student
//        |""".stripMargin)
//      .show()
  }
}
object MySum extends Aggregator[Int,Int,Int]{
  //初始化
  override def zero: Int = 0
  //聚合逻辑
  override def reduce(b: Int, a: Int): Int = a+b
  //整体聚合
  override def merge(b1: Int, b2: Int): Int = b1+b2
  //最终返回值
  override def finish(reduction: Int): Int = reduction
  //累加值的类型
  override def bufferEncoder: Encoder[Int] = Encoders.scalaInt
  //输出结果的类型
  override def outputEncoder: Encoder[Int] = Encoders.scalaInt
}

求平均值的整体代码:

package com.hainiu.spark

import org.apache.spark.sql.{Encoder, Encoders, SparkSession, functions}
import org.apache.spark.sql.expressions.{Aggregator, UserDefinedFunction}

object TestUDAF {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName("test udaf").master("local[*]").getOrCreate()
    import session.implicits._
    val df = session.sparkContext.textFile("file:///headless/workspace/spark/data/a.txt")
      .map(t => {
        val strs = t.split(" ")
        (strs(0), strs(1), strs(2).toInt, strs(3))
      }).toDF("id", "name", "age", "gender")

    //select avg(age)
    val myavg = functions.udaf(MyAvg)
    df.agg(myavg($"age")).show()
  }
}
case class AggragateVo(var cnt:Int,var sum:Int)
object MyAvg extends Aggregator[Int,AggragateVo,Double]{
  override def zero: AggragateVo = AggragateVo(0,0)

  override def reduce(b: AggragateVo, a: Int): AggragateVo = {
    b.cnt += 1
    b.sum += a
    b
  }

  override def merge(b1: AggragateVo, b2: AggragateVo): AggragateVo = {
    b1.cnt += b2.cnt
    b1.sum += b2.sum
    b1
  }

  override def finish(reduction: AggragateVo): Double = {
    reduction.sum.toDouble /reduction.cnt
  }

  override def bufferEncoder: Encoder[AggragateVo] = Encoders.product

  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

20.10.3 udtf

拆分函数,进入的是一行内容出现的结果是多行内容

spark中不能定义拆分函数

但是可以使用hive中的udtf=>explode

首先准备文件

m.txt

1,wujiandao,liangchaowei|chenguanxi|liudehua
2,fenggou,chenguanxi|liudehua
3,dushen,zhourunfa|liudehua
4,shanghaitan,zhourunfa|liangchaowei
package com.hainiu.spark

import org.apache.spark.sql.SparkSession

object TestUDTF {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName("test udtf").master("local[*]").getOrCreate()
    import session.implicits._
    val df = session.sparkContext.textFile("file:///headless/workspace/spark/data/m.txt")
      .map(t => {
        val strs = t.split(",")
        (strs(0), strs(1), strs(2))
      }).toDF("id", "name", "actors")
    //explode map array
    df.createTempView("movies")
    session.sql(
      """
        |select id,name,actor  from movies lateral view explode(split(actors,'\\|')) t as actor
        |""".stripMargin)
      .createTempView("movies1")

    session.sql(
      """
        |select count(1),actor from movies1 group by actor
        |""".stripMargin)
      .show()

  }
}

file

版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海汼部落-DER,http://hainiubl.com/topics/76301
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter