SparkSQL Study Notes


1. spark thriftserver

1. Start the service

# Start the thriftserver service; if it reports that the port is already in use, someone has started it before
/usr/local/spark/sbin/start-thriftserver.sh --master yarn --queue hainiu

# Start beeline and connect to the thriftserver on op.hadoop
/usr/local/spark/bin/beeline
!connect jdbc:hive2://op.hadoop:20000

1. Syntax for caching table data:

cache table <table name>;
cache table <dataset alias> as <query SQL>

2. cache table on a dataset

cache table <dataset alias> as <query SQL>

-- cache the query result first
cache table groupbydata as select count(*) as num from user_install_status_other group by country;
-- query against the cached dataset
select sum(num) from groupbydata;

3. Clear the cache with uncache

uncache table <table name or dataset alias>
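
The same cache/uncache statements can also be issued from Scala code instead of beeline. A minimal sketch, assuming the spark-hive dependency and hive-site.xml are in place so the metastore table user_install_status_other from the example above is visible:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object CacheTableSketch {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("CacheTableSketch").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        // HiveContext so the metastore table is visible (assumes hive-site.xml is configured)
        val hqlc: HiveContext = new HiveContext(sc)
        // cache the aggregated dataset under an alias, query it, then drop the cache
        hqlc.sql("cache table groupbydata as select count(*) as num from user_install_status_other group by country")
        hqlc.sql("select sum(num) from groupbydata").show()
        hqlc.sql("uncache table groupbydata")
    }
}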

2. spark-webUI

-- Is the job's parallelism reasonable?
As a rule of thumb, plan for roughly 2-3 tasks per CPU core.
-- How much storage will an RDD actually use when cached?
If the table data is in text format, size the cache from the table's size on HDFS.
If the table data is in ORC format, the cached size ≈ the size on HDFS * 3 (ORC is compressed on disk, so it expands in memory).
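
The web UI's Storage tab lists each cached dataset's memory and disk size, which is where these estimates can be checked. A minimal sketch of the same check from code, reusing the ORC input path from the examples later in these notes; sc.getRDDStorageInfo is a developer API, so treat it as illustrative:

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object StorageSizeSketch {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("StorageSizeSketch").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        val sqlc: SQLContext = new SQLContext(sc)
        // cache an ORC-backed DataFrame and force materialization with an action
        val df: DataFrame = sqlc.read.orc("/tmp/sparksql/input_orc").cache()
        df.count()
        // the same memory/disk numbers appear on the web UI Storage tab
        sc.getRDDStorageInfo.foreach(info =>
            println(s"${info.name}  memory=${info.memSize}  disk=${info.diskSize}"))
    }
}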

3. Connecting to the thriftserver via JDBC

<dependency>
    <groupId>org.spark-project.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>1.2.1.spark2</version>
    <scope>${project.build.scope}</scope>
</dependency>

Accessing the spark-sql server via JDBC:

/**
 *  @Description: Access the spark-sql server via JDBC
 *  @Version: 1.0
 *  @Author: ALIENWARE
 *  @Date:
 */

import java.sql.{Connection, DriverManager, ResultSet, Statement}
import org.apache.hive.jdbc.HiveDriver

object ThriftServerClient {
    def main(args: Array[String]): Unit = {
        // Load the Hive JDBC driver
        classOf[HiveDriver]
        var conn: Connection = null
        var stmt: Statement = null
        try {
            conn = DriverManager.getConnection("jdbc:hive2://op.hadoop:20000/panniu", "", "")
            stmt = conn.createStatement()
            // Keep group by from producing too many shuffle partitions
            stmt.execute("set spark.sql.shuffle.partitions=2")
            val sql: String =
                """
                   |select sum(num) from (select count(*) as num
                   |from user_install_status_other group by country) a
               """.stripMargin
            // Run the query
            val rs: ResultSet = stmt.executeQuery(sql)
            while (rs.next()) {
                val num: Long = rs.getLong(1)
                println(s"num:${num}")
            }
        } catch {
            case e: Exception => e.printStackTrace()
        } finally {
            if (stmt != null) stmt.close()
            if (conn != null) conn.close()
        }
    }
}

4. sparksql: read JSON and write to HDFS

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.1</version>
    <scope>${project.build.scope}</scope>
</dependency>

Notes on the spark-sql JSON example:

1. Set two partitions (two methods, both shown in the code below)
  //conf.set("spark.sql.shuffle.partitions", "2")
  //val coalesceRdd: RDD[String] = rdd2.coalesce(2)
2. Converting a DataFrame to an RDD returns RDD[Row]

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
 *  @Description: Read a JSON file
 *  @Version: 1.0
 *  @Author: ALIENWARE
 *  @Date:
 */

object SparkSQLReadJson {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkSQLReadJson").setMaster("local[*]")

        val sc: SparkContext = new SparkContext(conf)
        // Create the SQLContext
        val sqlc: SQLContext = new SQLContext(sc)
        // Read the JSON file into a DataFrame
        val df: DataFrame = sqlc.read.json("/tmp/sparksql/input_json/data_json.txt")

        // select country, count(*) from xxx group by country;
        val groupByDF: DataFrame = df.groupBy(df("country")).count()
        groupByDF.printSchema()
        groupByDF.show()
        // Save the aggregated result to HDFS
        // Converting the DataFrame to an RDD is a direct call
        val rdd: RDD[Row] = groupByDF.rdd
        val rdd2: RDD[String] = rdd.map(row => {
            // Extract fields from an RDD[Row] like this
            val countryName: String = row.getString(0)
            val num: Long = row.getLong(1)
            s"${countryName}\t${num}"
        })
        // Coalesce to two partitions
        val coalesceRdd: RDD[String] = rdd2.coalesce(2)
        val outputPath:String = "/tmp/sparksql/output_text"
        //import com.hainiu.util.MyPredef.string2HdfsUtil
        //outputPath.deleteHdfs
        coalesceRdd.saveAsTextFile(outputPath)
    }
}

5. SQLContext, HiveContext, and SparkSession: the differences

1. // If you need the Hive configuration and Hive features, use HiveContext; otherwise SQLContext is enough;
2. // In Spark 2.x, both SQLContext and HiveContext are replaced by SparkSession (see the sketch below);
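
A minimal sketch of the Spark 2.x replacement: a plain SparkSession covers the SQLContext functionality, and enableHiveSupport() (with hive-site.xml on the classpath) covers what HiveContext used to do:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SparkSessionSketch {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkSessionSketch").setMaster("local[*]")
        // plain SparkSession ~= SQLContext; enableHiveSupport() ~= HiveContext
        val spark: SparkSession = SparkSession.builder()
            .config(conf)
            .enableHiveSupport()
            .getOrCreate()
        spark.sql("show databases").show()
        // the old-style context is still reachable if legacy code needs it: spark.sqlContext
        spark.stop()
    }
}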

6. spark-sql with a custom schema

Map this dataset onto a table structure with 4 fields, then filter the rows from the table containing CN and count the records.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer
/**
  *  @Description: Build a DataFrame from RDD[Row]
  *  @Version: 1.0
  *  @Author: ALIENWARE
  *  @Date:
  */
object SparkSQLForSchema {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkSQL02").setMaster("local[*]")
            conf.set("spark.sql.shuffle.partitions", "2")
        val sc: SparkContext = new SparkContext(conf)
        val sqlc: SQLContext = new SQLContext(sc)
        val rdd: RDD[String] = sc.textFile("/tmp/sparksql/input_text")

      // RDD[String] --> RDD[Row]
        val rowRdd: RDD[Row] = rdd.map(f => {
            val arr: Array[String] = f.split("\t")
            val country: String = arr(0)
            val gpcategory: String = arr(1)
            val pkgname: String = arr(2)
            val num: Long = arr(3).toLong
            Row(country, gpcategory, pkgname, num)
        })

        // Declare the concrete type of each field in the Row
        val fields: ArrayBuffer[StructField] = new ArrayBuffer[StructField]
        fields += new StructField("country", DataTypes.StringType, true)
        fields += new StructField("gpcategory", DataTypes.StringType, true)
        fields += new StructField("pkgname", DataTypes.StringType, true)
        fields += new StructField("num", DataTypes.LongType, true)
        val structType: StructType = StructType(fields)
        // Build the DataFrame from the Row RDD and the schema
        val df: DataFrame = sqlc.createDataFrame(rowRdd, structType)
        df.printSchema()
        df.show()
        // select country, count(*) from xxx group by country
        df.groupBy(df("country")).count().show()
    }
}

7. spark-sql: defining a schema with a bean object

// Define a data bean; through reflection the bean maps out the table structure and is used to build the dataset
// Create a temporary view from the bean objects, query the view to get the result of example 2, then convert the result to an RDD and print it

1. RDD[String] --> RDD[DFBean]
  //val beanRdd: RDD[DFBean] = rdd.map
2. Create a temporary view from the DataFrame
  //df.createOrReplaceTempView("dftable")

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
  *  @Description: spark-sql schema defined through a bean object
  *  @Version: 1.0
  *  @Author: ALIENWARE
  *  @Date: Created in 2021-09-01.
  */
case class DFBean(val country: String, val gpcategory: String, val pkgname: String, val num: Long) {
    // Through reflection the field name country maps to the getCountry method used to read the data
    def getCountry = this.country
    def getGpcategory = this.gpcategory
    def getPkgname = this.pkgname
    def getNum = this.num
}

// Build a DataFrame from RDD[Bean]
object SparkSQL03 {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkSQL03").setMaster("local[*]")
        conf.set("spark.sql.shuffle.partitions", "2")
        val sc: SparkContext = new SparkContext(conf)
        val sqlc: SQLContext = new SQLContext(sc)
        val rdd: RDD[String] = sc.textFile("/tmp/sparksql/input_text")
        // RDD[String] --> RDD[DFBean]
        val beanRdd: RDD[DFBean] = rdd.map(f => {
            val arr: Array[String] = f.split("\t")
            val country: String = arr(0)
            val gpcategory: String = arr(1)
            val pkgname: String = arr(2)
            val num: Long = arr(3).toLong
          DFBean(country, gpcategory, pkgname, num)
        })
        val df: DataFrame = sqlc.createDataFrame(beanRdd, classOf[DFBean])
        df.printSchema()
        df.show()

        // select country, count(*) from xxx group by country
        // Register the DataFrame as a temporary view under the given name
        df.createOrReplaceTempView("dftable")
        // Query the view directly with SQL
        val groupByDF: DataFrame = sqlc.sql("select country, count(*) as num1 from dftable group by country")
        groupByDF.printSchema()
        groupByDF.show()
    }
}

8. spark-sql: reading Hive ORC files

<dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-hive_2.11</artifactId>
     <version>2.1.1</version>
     <scope>${project.build.scope}</scope>
</dependency>

Read the ORC files, get the query result through the DataFrame API, and write it out as ORC and JSON files.

1. // The default storage level of a sparksql cache is StorageLevel.MEMORY_AND_DISK (a sketch of choosing a different level follows the example below)

import org.apache.spark.sql._
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @Description: Read ORC files into a DataFrame, query with the DataFrame API,
  *               and write the filtered result to ORC and JSON files
  * @Version: 1.0
  * @Author: ALIENWARE
  * @Date:
  */

object SparkSQLReadORC {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkSQL04").setMaster("local[*]")
        conf.set("spark.sql.shuffle.partitions", "1")
        val sc: SparkContext = new SparkContext(conf)
        // Create the SQLContext
        val sqlc: SQLContext = new SQLContext(sc)
        // Read the ORC files into a DataFrame
        val df: DataFrame = sqlc.read.orc("/tmp/sparksql/input_orc")

        // select country, num from
        // (select country, count(*) as num from xxx group by country) t
        //  where t.num > 5
        // Build the query with the DataFrame API
        val groupByDF: DataFrame = df.groupBy(df("country")).count()
        val filterDF: Dataset[Row] = groupByDF.filter(groupByDF("count") > 5)

        // The default storage level of a sparksql cache is StorageLevel.MEMORY_AND_DISK
        val cacheDS: Dataset[Row] = filterDF.cache()
        // Write the result to ORC and JSON files
        // Overwrite mode, ORC format, output to /tmp/sparksql/output_orc
        cacheDS.write.mode(SaveMode.Overwrite).format("orc").save("/tmp/sparksql/output_orc")
        cacheDS.write.mode(SaveMode.Overwrite).format("json").save("/tmp/sparksql/output_json")
    }
}
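
If MEMORY_AND_DISK is not what you want, persist() accepts an explicit storage level instead of cache(). A minimal sketch on the same ORC input; the MEMORY_ONLY level is just an example:

import org.apache.spark.sql.{Dataset, Row, SQLContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

object PersistLevelSketch {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("PersistLevelSketch").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        val sqlc: SQLContext = new SQLContext(sc)
        val df = sqlc.read.orc("/tmp/sparksql/input_orc")
        val groupByDF = df.groupBy(df("country")).count()
        // persist() lets you pick the level explicitly; cache() is persist(MEMORY_AND_DISK)
        val cached: Dataset[Row] = groupByDF.persist(StorageLevel.MEMORY_ONLY)
        cached.show()
        cached.unpersist()
    }
}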

9. Using Hive SQL to read ORC files and query the result through a view

// To write a DataFrame to a text file, the dataset must have exactly one column; otherwise the write fails

import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
/**
  *  @Description: Read ORC files into a DataFrame, build a temporary view,
  *                and write the SQL-filtered result to a text file
  *  @Version: 1.0
  *  @Author: ALIENWARE
  *  @Date:
  */
object SparkSQL05 {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkSQL05").setMaster("local[*]")
        conf.set("spark.sql.shuffle.partitions", "1")
        val sc: SparkContext = new SparkContext(conf)
        val hqlc: HiveContext = new HiveContext(sc)
        // Read the ORC files into a DataFrame
        val df: DataFrame = hqlc.read.orc("/tmp/sparksql/input_orc")
        // Create a temporary view
        df.createOrReplaceTempView("dftable")

        // select country, num from
        // (select country, count(*) as num from xxx group by country) t
        //  where t.num > 5
        val sql: String =
            """
               |select concat(country, '\t',num) as country_num from
               |(select country, count(*) as num from dftable group by country) t where t.num>5
            """.stripMargin
        val df2: DataFrame = hqlc.sql(sql)
        df2.write.mode(SaveMode.Overwrite).format("text").save("/tmp/sparksql/output_text")
    }
}

10. Using Hive SQL to create a database, create a table, query the table, and export a text file

// Requires hive-site.xml to be configured (on the classpath)

import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
/**
  *  @Description: Read the ORC data, build a Hive database and Hive table,
  *                and filter the result with Hive queries
  *  @Version: 1.0
  *  @Author: ALIENWARE
  *  @Date: Created in 2021-09-01.
  */
object SparkSQL06 {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkSQL06").setMaster("local[*]")
        conf.set("spark.sql.shuffle.partitions", "1")
        val sc: SparkContext = new SparkContext(conf)
        // Create the HiveContext
        val hqlc: HiveContext = new HiveContext(sc)
        // Create the database
        hqlc.sql("create database if not exists c30pan")
        // Switch to the database
        hqlc.sql("use c30pan")
        // Create the table
        hqlc.sql(
            """
              |CREATE TABLE if not exists `spark_user_orc`(
              |  `aid` string COMMENT 'from deserializer',
              |  `pkgname` string COMMENT 'from deserializer',
              |  `uptime` bigint COMMENT 'from deserializer',
              |  `type` int COMMENT 'from deserializer',
              |  `country` string COMMENT 'from deserializer')
              |ROW FORMAT SERDE
              |  'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
              |STORED AS INPUTFORMAT
              |  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
              |OUTPUTFORMAT
              |  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
           """.stripMargin)
        // Load the data
        hqlc.sql("load data local inpath '/tmp/sparksql/input_orc' overwrite into table spark_user_orc")
        // Query the result with SQL
        val df: DataFrame = hqlc.sql("select * from spark_user_orc")
        df.printSchema()
        df.show()
    }
}

11. Using SparkSession

import java.util.Properties
import com.mysql.jdbc.Driver
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * @Description: Access MySQL through spark-sql JDBC, using SparkSession
  * @Version: 1.0
  * @Author: ALIENWARE
  * @Date: Created in 2021-09-01.
  */
object SparkSQL08 {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkSQL08").setMaster("local[*]")
        conf.set("spark.sql.shuffle.partitions", "2")
        // Create the SparkSession
        val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
        // the underlying SparkContext is reachable as sparkSession.sparkContext
        // Set the database user and password
        val prop: Properties = new Properties()
        prop.setProperty("user", "root")
        prop.setProperty("password", "111111")
        // Connect to MySQL over JDBC and build the DataFrames
        val df1: DataFrame = sparkSession.read.jdbc("jdbc:mysql://localhost:3306/hainiu_test", "student", prop)
        val df2: DataFrame = sparkSession.read.jdbc("jdbc:mysql://localhost:3306/hainiu_test", "student_course", prop)
        // An alternative way to read the same table (kept below for reference):
        /**
            val df2: DataFrame = sparkSession.read.format("jdbc")
            .option("driver", classOf[Driver].getName)
            .option("url", "jdbc:mysql://localhost:3306/hainiu_test")
            .option("dbtable", "student_course")
            .option("user", "root")
            .option("password", "111111").load()
        */
        df1.createOrReplaceTempView("s")
        df2.createOrReplaceTempView("sc")
        // Inner join the student and student_course tables
        val joinDF: DataFrame = sparkSession.sql("select * from s, sc where s.S_ID=sc.SC_S_ID")
        joinDF.printSchema()
        joinDF.show()
    }
}

12. Converting between RDD, DataFrame, and Dataset with SparkSession

// The conversions need the implicit conversions in scope:
import sparkSession.implicits._

Conversions

// DataFrame/Dataset to RDD:
DataFrame/Dataset.rdd
// RDD to DataFrame:
import sparkSession.implicits._
val testDF = rdd.map { line =>
    (line._1, line._2)
}.toDF("col1", "col2")
// RDD to Dataset
import sparkSession.implicits._
val testDS = rdd.map { line =>
    (line._1, line._2)
}.toDS()
SparkSession
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
// Create an RDD from the SparkSession's SparkContext
import sparkSession.implicits._
val rdd: RDD[String] = sparkSession.sparkContext.parallelize(List("a 1", "b 2", "a 3"), 2)

Code (a DataFrame <-> Dataset sketch follows the full example)

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
/**
  *  @Description: Convert between RDD, DataFrame, and Dataset with SparkSession
  *  @Version: 1.0
  *  @Author: ALIENWARE
  *  @Date: Created in 2021-09-01.
  */
object SparkSQL09 {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkSQL09").setMaster("local[*]")
        conf.set("spark.sql.shuffle.partitions", "2")
        val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
        // Create an RDD from the SparkSession's SparkContext
        import sparkSession.implicits._
        val rdd: RDD[String] = sparkSession.sparkContext.parallelize(List("a 1", "b 2"), 2)
        val rdd2: RDD[(String, Int)] = rdd.map(f => {
            val arr: Array[String] = f.split(" ")
            (arr(0), arr(1).toInt)
        })

        // RDD to DataFrame: the toDF function yields a DataFrame
        val df: DataFrame = rdd2.toDF("word", "num")

        // RDD to Dataset: the toDS function yields a Dataset
        val ds: Dataset[(String, Int)] = rdd2.toDS()
        ds.printSchema()
        val ds2: Dataset[ColClass] = ds.map(f => {
            ColClass(f._1, f._2)
        })
        val ds3: Dataset[String] = ds2.map {
            case ColClass(word, num) => {
                s"${word}\t${num}"
            }
        }
        // df --> rdd
        // ds --> rdd
        val dfRdd: RDD[Row] = df.rdd
        val ds3Rdd: RDD[String] = ds3.rdd
        val arr: Array[String] = ds3Rdd.collect()
        println(arr.toBuffer)
        val arr2: Array[Row] = dfRdd.collect()
        println(arr2.toBuffer)
    }
}

case class ColClass(val word: String, val num: Int)
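
The example above covers RDD -> DataFrame/Dataset and back to RDD. The remaining two directions, DataFrame <-> Dataset, also go through the implicits. A minimal sketch reusing the ColClass case class (column names must match the case class fields):

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object DfDsConvertSketch {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("DfDsConvertSketch").setMaster("local[*]")
        val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
        import sparkSession.implicits._
        val df: DataFrame = Seq(("a", 1), ("b", 2)).toDF("word", "num")
        // DataFrame -> Dataset: as[T] needs an Encoder, provided by the implicits for case classes
        val ds: Dataset[ColClass] = df.as[ColClass]
        // Dataset -> DataFrame: toDF drops the typed view but keeps the schema
        val backToDf: DataFrame = ds.toDF()
        ds.show()
        backToDf.printSchema()
    }
}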

13. spark-sql UDFs

import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * @Description: Access MySQL through spark-sql JDBC with SparkSession; define and use a UDF
  * @Version: 1.0
  * @Author: ALIENWARE
  * @Date: Created in 2021-09-01.
  */
object SparkSQL10 {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkSQL10").setMaster("local[*]")
        conf.set("spark.sql.shuffle.partitions", "2")
        // Create the SparkSession
        val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
        // Register a function: given a string ==> return its length
        sparkSession.udf.register("len", (name: String) => name.length)
        // Set the database user and password
        val prop: Properties = new Properties()
        prop.setProperty("user", "root")
        prop.setProperty("password", "111111")
        // Connect to MySQL over JDBC and build a DataFrame
        val df1: DataFrame = sparkSession.read.jdbc("jdbc:mysql://localhost:3306/hainiu_test", "student", prop)
        df1.createOrReplaceTempView("s")
        // Use the function in SQL
        val df2: DataFrame = sparkSession.sql("select s_name, len(s_name) as namelen from s")
        df2.printSchema()
        df2.show()
    }
}
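
Besides register() + SQL, the same function can be applied through the DataFrame API with org.apache.spark.sql.functions.udf. A minimal sketch, with the table and credentials as in the example above:

import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.{DataFrame, SparkSession}

object SparkSQLUdfApi {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkSQLUdfApi").setMaster("local[*]")
        val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
        val prop: Properties = new Properties()
        prop.setProperty("user", "root")
        prop.setProperty("password", "111111")
        val df1: DataFrame = sparkSession.read.jdbc("jdbc:mysql://localhost:3306/hainiu_test", "student", prop)
        // wrap the Scala function as a column expression instead of registering it for SQL
        val lenUdf = udf((name: String) => name.length)
        df1.select(col("s_name"), lenUdf(col("s_name")).as("namelen")).show()
    }
}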
Copyright notice: original work. Reposting is permitted provided the source and author are credited with a hyperlink; otherwise legal liability will be pursued. From the Hainiu community - 123456789987654321, http://hainiubl.com/topics/75789