1. Spark ThriftServer
1. Start the service
# Start the thriftserver service; if it reports the port is already in use, someone has started it before
/usr/local/spark/sbin/start-thriftserver.sh --master yarn --queue hainiu
# Start beeline and connect to the thriftserver on op.hadoop
/usr/local/spark/bin/beeline
!connect jdbc:hive2://op.hadoop:20000
1. Syntax for caching table data:
cache table table_name;
cache table dataset_alias as query_sql
2. cache table with a dataset
cache table dataset_alias as query_sql
-- Cache the query result first
cache table groupbydata as select count(*) as num from user_install_status_other group by country;
-- Query against the cached dataset
select sum(num) from groupbydata;
3. Clear the cache with uncache
uncache table table_name_or_dataset_alias
2. Spark web UI
-- Judging whether task parallelism is reasonable
As a rule of thumb, one task for every two cores.
-- How much storage space an RDD will actually use
If the table data is in text format, the cache size can be estimated from the table's size on HDFS.
If the table data is in ORC format, the cached size ≈ the size on HDFS * 3.
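A minimal sketch of checking these numbers on the web UI, assuming a local run, the /tmp/sparksql/input_orc path and the spark-hive dependency used later in these notes (the object name CacheSizeCheck is made up for illustration):
import org.apache.spark.sql.SparkSession
object CacheSizeCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("CacheSizeCheck").master("local[*]").getOrCreate()
    val df = spark.read.orc("/tmp/sparksql/input_orc")
    // cache() is lazy; an action is needed before the Storage tab shows the real in-memory size
    df.cache()
    df.count()
    // Compare "Size in Memory" on the Storage tab (http://localhost:4040) with the file size on HDFS,
    // and the number of tasks per stage on the Stages tab with the available cores
    Thread.sleep(60000) // keep the application (and its web UI) alive long enough to look
    spark.stop()
  }
}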
3. Connecting to the thriftserver via JDBC
<dependency>
<groupId>org.spark-project.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>1.2.1.spark2</version>
<scope>${project.build.scope}</scope>
</dependency>
Accessing the spark-sql server via JDBC
/**
 * @Description: Access the spark-sql server via JDBC
 * @Version: 1.0
 * @Author: ALIENWARE
 * @Date:
 * @ * * * * * * * * * * * * * @
 */
import java.sql.{Connection, DriverManager, ResultSet, Statement}
import org.apache.hive.jdbc.HiveDriver
object ThriftServerClient {
def main(args: Array[String]): Unit = {
// Load and register the Hive JDBC driver (classOf alone does not run the class's static initializer)
Class.forName(classOf[HiveDriver].getName)
var conn: Connection = null
var stmt: Statement = null
try {
conn = DriverManager.getConnection("jdbc:hive2://op.hadoop:20000/panniu", "", "")
stmt = conn.createStatement()
// Keep group by from producing too many shuffle partitions
stmt.execute("set spark.sql.shuffle.partitions=2")
val sql: String =
"""
|select sum(num) from (select count(*) as num
|from user_install_status_other group by country) a
""".stripMargin
// Execute the query
val rs: ResultSet = stmt.executeQuery(sql)
while (rs.next()) {
val num: Long = rs.getLong(1)
println(s"num:${num}")
}
} catch {
case e: Exception => e.printStackTrace()
} finally {
if (stmt != null) stmt.close()
if (conn != null) conn.close()
}
}
}
4. Spark SQL: reading JSON and writing to HDFS
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.1</version>
<scope>${project.build.scope}</scope>
</dependency>
Spark SQL with JSON
1. Set two partitions (two ways)
//conf.set("spark.sql.shuffle.partitions", "2")
//val coalesceRdd: RDD[String] = rdd2.coalesce(2)
2. Converting a DataFrame to an RDD yields RDD[Row]
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
 * @Description: Read a JSON file
 * @Version: 1.0
 * @Author: ALIENWARE
 * @Date:
 * @ * * * * * * * * * * * * * @
 */
object SparkSQLReadJson {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkSQLReadJson").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
// Create the SQLContext
val sqlc: SQLContext = new SQLContext(sc)
// Read the JSON file into a DataFrame
val df: DataFrame = sqlc.read.json("/tmp/sparksql/input_json/data_json.txt")
// select country, count(*) from xxx group by country;
val groupByDF: DataFrame = df.groupBy(df("country")).count()
groupByDF.printSchema()
groupByDF.show()
// Save the aggregated result to HDFS
// Convert the DataFrame to an RDD directly via .rdd
val rdd: RDD[Row] = groupByDF.rdd
val rdd2: RDD[String] = rdd.map(row => {
// Extract fields from the Row like this
val countryName: String = row.getString(0)
val num: Long = row.getLong(1)
s"${countryName}\t${num}"
})
//Coalesce to two partitions
val coalesceRdd: RDD[String] = rdd2.coalesce(2)
val outputPath:String = "/tmp/sparksql/output_text"
//import com.hainiu.util.MyPredef.string2HdfsUtil
//outputPath.deleteHdfs
coalesceRdd.saveAsTextFile(outputPath)
}
}
5. Differences between SQLContext, HiveContext, and SparkSession
1.//If you need the Hive configuration and Hive features, use HiveContext; otherwise use SQLContext;
2.//In Spark 2.x, both SQLContext and HiveContext are replaced by SparkSession;
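A minimal sketch of the Spark 2.x entry point, assuming a plain local setup (enableHiveSupport() plays the role of HiveContext; leave it out for SQLContext-style behavior):
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
val conf: SparkConf = new SparkConf().setAppName("SessionDemo").setMaster("local[*]")
// One entry point replaces both SQLContext and HiveContext
val spark: SparkSession = SparkSession.builder()
  .config(conf)
  .enableHiveSupport() // omit this line if Hive support is not needed
  .getOrCreate()
// The old contexts are still reachable for legacy code
val sqlContext = spark.sqlContext
val sc = spark.sparkContext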
6. Spark SQL with a custom schema
Map this dataset to a 4-field table structure, then filter the rows containing CN and count the records (the CN filter itself is sketched after the code below).
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
/**
 * @Description: Build a DataFrame from RDD[Row]
 * @Version: 1.0
 * @Author: ALIENWARE
 * @Date:
 * @ * * * * * * * * * * * * * @
 */
object SparkSQLForSchema {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkSQL02").setMaster("local[*]")
conf.set("spark.sql.shuffle.partitions", "2")
val sc: SparkContext = new SparkContext(conf)
val sqlc: SQLContext = new SQLContext(sc)
val rdd: RDD[String] = sc.textFile("/tmp/sparksql/input_text")
// RDD[String] --> RDD[Row]
val rowRdd: RDD[Row] = rdd.map(f => {
val arr: Array[String] = f.split("\t")
val country: String = arr(0)
val gpcategory: String = arr(1)
val pkgname: String = arr(2)
val num: Long = arr(3).toLong
Row(country, gpcategory, pkgname, num)
})
// Declare the concrete type of each field in the Row
val fields: ArrayBuffer[StructField] = new ArrayBuffer[StructField]
fields += new StructField("country", DataTypes.StringType, true)
fields += new StructField("gpcategory", DataTypes.StringType, true)
fields += new StructField("pkgname", DataTypes.StringType, true)
fields += new StructField("num", DataTypes.LongType, true)
val structType: StructType = StructType(fields)
// Build a DataFrame from the row RDD and the schema
val df: DataFrame = sqlc.createDataFrame(rowRdd, structType)
df.printSchema()
df.show()
// select country, count(*) from xxx group by country
df.groupBy(df("country")).count().show()
}
}
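The CN filter called for in the task description is not in the code above; a minimal sketch of it, to be placed inside main after the group-by (column name taken from the schema defined above):
// Filter the rows whose country is CN and count the records
val cnCount: Long = df.filter(df("country") === "CN").count()
println(s"CN records: ${cnCount}")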
7. Spark SQL with a schema defined by an object
//Define a data bean; through reflection it maps out the table structure and produces the dataset
//Create a temporary view from the bean objects, filter the view to get the result of example 2, then convert the result to an RDD and print it (that last step is sketched after the code below)
1.RDD[String] --> RDD[DFBean]
//val beanRdd: RDD[DFBean] = rdd.map
2.Create a temporary view from the DataFrame
//df.createOrReplaceTempView("dftable")
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
 * @Description: Spark SQL with a schema defined by an object
 * @Version: 1.0
 * @Author: ALIENWARE
 * @Date: Created in 2021-09-01.
 * @ * * * * * * * * * * * * * @
 */
case class DFBean(val country: String, val gpcategory: String, val pkgname: String, val num: Long) {
// Reflection picks up the field country through its getCountry-style getter
def getCountry = this.country
def getGpcategory = this.gpcategory
def getPkgname = this.pkgname
def getNum = this.num
}
// Build a DataFrame from RDD[Bean]
object SparkSQL03 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkSQL03").setMaster("local[*]")
conf.set("spark.sql.shuffle.partitions", "2")
val sc: SparkContext = new SparkContext(conf)
val sqlc: SQLContext = new SQLContext(sc)
val rdd: RDD[String] = sc.textFile("/tmp/sparksql/input_text")
// RDD[String] --> RDD[DFBean]
val beanRdd: RDD[DFBean] = rdd.map(f => {
val arr: Array[String] = f.split("\t")
val country: String = arr(0)
val gpcategory: String = arr(1)
val pkgname: String = arr(2)
val num: Long = arr(3).toLong
DFBean(country, gpcategory, pkgname, num)
})
val df: DataFrame = sqlc.createDataFrame(beanRdd, classOf[DFBean])
df.printSchema()
df.show()
// select country, count(*) from xxx group by country
// Create a temporary view over the DataFrame and give it a name
df.createOrReplaceTempView("dftable")
// Write SQL directly against the view
val groupByDF: DataFrame = sqlc.sql("select country, count(*) as num1 from dftable group by country")
groupByDF.printSchema()
groupByDF.show()
}
}
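The RDD-conversion and print step mentioned in the description is not in the code above; a minimal sketch of it, to be placed inside main after groupByDF.show() (column order follows the SQL above):
// Convert the query result to an RDD and print it
val resultRdd: RDD[Row] = groupByDF.rdd // requires importing org.apache.spark.sql.Row
resultRdd.collect().foreach(row => println(s"${row.getString(0)}\t${row.getLong(1)}"))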
8. Spark SQL reading Hive ORC files
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.1</version>
<scope>${project.build.scope}</scope>
</dependency>
Read the ORC file, fetch the query result through the API, and write it out as ORC and JSON files.
1.//The default storage level of Spark SQL cache: storageLevel: StorageLevel = MEMORY_AND_DISK
import org.apache.spark.sql._
import org.apache.spark.{SparkConf, SparkContext}
/**
 * @Description: Read the ORC file (as ORC) to build a DataFrame
 * @Description: Query and filter with the DataFrame API, writing the result to ORC and JSON files
 * @Version: 1.0
 * @Author: ALIENWARE
 * @Date:
 * @ * * * * * * * * * * * * * @
 */
object SparkSQLReadORC {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkSQL04").setMaster("local[*]")
conf.set("spark.sql.shuffle.partitions", "1")
val sc: SparkContext = new SparkContext(conf)
// Create the SQLContext
val sqlc: SQLContext = new SQLContext(sc)
// Read the ORC file into a DataFrame
val df: DataFrame = sqlc.read.orc("/tmp/sparksql/input_orc")
// select country, num from
// (select country, count(*) as num from xxx group by country) t
// where t.num > 5
// Express the SQL above with the DataFrame API
val groupByDF: DataFrame = df.groupBy(df("country")).count()
val filterDF: Dataset[Row] = groupByDF.filter(groupByDF("count") > 5)
// The default storage level of Spark SQL cache: storageLevel: StorageLevel = MEMORY_AND_DISK
val cacheDS: Dataset[Row] = filterDF.cache()
// Write the result to ORC and JSON files
// Overwrite mode, ORC format, output to /tmp/sparksql/output_orc
cacheDS.write.mode(SaveMode.Overwrite).format("orc").save("/tmp/sparksql/output_orc")
cacheDS.write.mode(SaveMode.Overwrite).format("json").save("/tmp/sparksql/output_json")
}
}
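If a storage level other than the MEMORY_AND_DISK default is wanted, persist can be used instead of cache(); a minimal sketch against the same filterDF as above:
import org.apache.spark.storage.StorageLevel
// Pick the storage level explicitly rather than relying on the cache() default
val cachedDS: Dataset[Row] = filterDF.persist(StorageLevel.MEMORY_ONLY)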
9. Reading an ORC file with HiveQL and querying the result through a view
// To write DataFrame data to a text file, the dataset must have exactly one column, otherwise the write fails
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
/**
 * @Description: Read the ORC file (as ORC) to build a DataFrame
 * @Description: Build a temporary view from the DataFrame and write the filtered SQL result to a text file
 * @Version: 1.0
 * @Author: ALIENWARE
 * @Date:
 * @ * * * * * * * * * * * * * @
 */
object SparkSQL05 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkSQL05").setMaster("local[*]")
conf.set("spark.sql.shuffle.partitions", "1")
val sc: SparkContext = new SparkContext(conf)
val hqlc: HiveContext = new HiveContext(sc)
// Read the ORC file into a DataFrame
val df: DataFrame = hqlc.read.orc("/tmp/sparksql/input_orc")
// Create a temporary view
df.createOrReplaceTempView("dftable")
// select country, num from
// (select country, count(*) as num from xxx group by country) t
// where t.num > 5
val sql: String =
"""
|select concat(country, '\t',num) as country_num from
|(select country, count(*) as num from dftable group by country) t where t.num>5
""".stripMargin
val df2: DataFrame = hqlc.sql(sql)
df2.write.mode(SaveMode.Overwrite).format("text").save("/tmp/sparksql/output_text")
}
}
10. Using HiveQL to create a database and a table, query the table, and export a text file
//The hive-site.xml file needs to be configured
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
/**
 * @Description: Read the ORC file (as ORC) to build a DataFrame
 * @Description: Create a Hive database and table, then filter the result with Hive SQL queries
 * @Version: 1.0
 * @Author: ALIENWARE
 * @Date: Created in 2021-09-01.
 * @ * * * * * * * * * * * * * @
 */
object SparkSQL06 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkSQL06").setMaster("local[*]")
conf.set("spark.sql.shuffle.partitions", "1")
val sc: SparkContext = new SparkContext(conf)
// Create the HiveContext
val hqlc: HiveContext = new HiveContext(sc)
// Create the database
hqlc.sql("create database if not exists c30pan")
// Switch to the database
hqlc.sql("use c30pan")
// Create the table
hqlc.sql(
"""
|CREATE TABLE if not exists `spark_user_orc`(
| `aid` string COMMENT 'from deserializer',
| `pkgname` string COMMENT 'from deserializer',
| `uptime` bigint COMMENT 'from deserializer',
| `type` int COMMENT 'from deserializer',
| `country` string COMMENT 'from deserializer')
|ROW FORMAT SERDE
| 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
|STORED AS INPUTFORMAT
| 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
|OUTPUTFORMAT
| 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
""".stripMargin)
// Load the data
hqlc.sql("load data local inpath '/tmp/sparksql/input_orc' overwrite into table spark_user_orc")
// Query the result with SQL
val df: DataFrame = hqlc.sql("select * from spark_user_orc")
df.printSchema()
df.show()
}
}
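The text-file export promised in the heading is not in the code above; a minimal sketch of it, to be placed at the end of main, reusing the single-column concat trick from section 9 (the output path /tmp/sparksql/output_text_hive is made up for illustration):
// text format only accepts a single string column, so concat the fields first
val exportDF: DataFrame = hqlc.sql("select concat(aid, '\t', pkgname, '\t', country) as line from spark_user_orc")
exportDF.write.mode(SaveMode.Overwrite).format("text").save("/tmp/sparksql/output_text_hive")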
11. Using SparkSession
import java.util.Properties
import com.mysql.jdbc.Driver
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * @Description: Access MySQL through Spark SQL JDBC, using SparkSession
 * @Version: 1.0
 * @Author: ALIENWARE
 * @Date: Created in 2021-09-01.
 * @ * * * * * * * * * * * * * @
 */
object SparkSQL08 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkSQL08").setMaster("local[*]")
conf.set("spark.sql.shuffle.partitions", "2")
// Create the SparkSession
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
// sparkSession.sparkContext.xxx
// Set the database user name and password
val prop: Properties = new Properties()
prop.setProperty("user", "root")
prop.setProperty("password", "111111")
// Connect to MySQL over JDBC to build DataFrames
val df1: DataFrame = sparkSession.read.jdbc("jdbc:mysql://localhost:3306/hainiu_test", "student", prop)
val df2: DataFrame = sparkSession.read.jdbc("jdbc:mysql://localhost:3306/hainiu_test", "student_course", prop)
//Alternative way of writing the same read (shown in the block comment below)
/**
val df2: DataFrame = sparkSession.read.format("jdbc")
.option("driver", classOf[Driver].getName)
.option("url", "jdbc:mysql://localhost:3306/hainiu_test")
.option("dbtable", "student_course")
.option("user", "root")
.option("password", "111111").load()
*/
df1.createOrReplaceTempView("s")
df2.createOrReplaceTempView("sc")
// Inner join the student and student_course tables
val joinDF: DataFrame = sparkSession.sql("select * from s, sc where s.S_ID=sc.SC_S_ID")
joinDF.printSchema()
joinDF.show()
}
}
12. Converting between RDD, DataFrame, and Dataset with SparkSession
//The conversions require importing the implicit conversions
import sparkSession.implicits._
Conversions
// DataFrame/Dataset to RDD:
dataFrame.rdd / dataset.rdd
//RDD to DataFrame:
import sparkSession.implicits._
val testDF = rdd.map(line => (line._1, line._2)).toDF("col1", "col2")
// RDD to Dataset
import sparkSession.implicits._
val testDS = rdd.map(line => (line._1, line._2)).toDS()
SparkSession
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
//Create an RDD from the SparkSession's SparkContext
import sparkSession.implicits._
val rdd: RDD[String] = sparkSession.sparkContext.parallelize(List("a 1", "b 2", "a 3"), 2)
Code
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
/**
 * @Description: Converting between RDD, DataFrame, and Dataset with SparkSession
 * @Version: 1.0
 * @Author: ALIENWARE
 * @Date: Created in 2021-09-01.
 * @ * * * * * * * * * * * * * @
 */
object SparkSQL09 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkSQL09").setMaster("local[*]")
conf.set("spark.sql.shuffle.partitions", "2")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
//Create an RDD from the SparkSession's SparkContext
import sparkSession.implicits._
val rdd: RDD[String] = sparkSession.sparkContext.parallelize(List("a 1", "b 2"), 2)
val rdd2: RDD[(String, Int)] = rdd.map(f => {
val arr: Array[String] = f.split(" ")
(arr(0), arr(1).toInt)
})
// RDD to DataFrame via the toDF function
val df: DataFrame = rdd2.toDF("word", "num")
// RDD to Dataset via the toDS function
val ds: Dataset[(String, Int)] = rdd2.toDS()
ds.printSchema()
val ds2: Dataset[ColClass] = ds.map(f => {
ColClass(f._1, f._2)
})
val ds3: Dataset[String] = ds2.map {
case ColClass(word, num) => {
s"${word}\t${num}"
}
}
// df --> rdd
// ds --> rdd
val dfRdd: RDD[Row] = df.rdd
val ds3Rdd: RDD[String] = ds3.rdd
val arr: Array[String] = ds3Rdd.collect()
println(arr.toBuffer)
val arr2: Array[Row] = dfRdd.collect()
println(arr2.toBuffer)
}
}
case class ColClass(val word: String, val num: Int)
13. Spark SQL UDFs
import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * @Description: Access MySQL through Spark SQL JDBC with SparkSession; define and use a custom UDF
 * @Version: 1.0
 * @Author: ALIENWARE
 * @Date: Created in 2021-09-01.
 * @ * * * * * * * * * * * * * @
 */
object SparkSQL10 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkSQL10").setMaster("local[*]")
conf.set("spark.sql.shuffle.partitions", "2")
// Create the SparkSession
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
//Register a function: takes a string ==> returns its length
sparkSession.udf.register("len", (name: String) => name.length)
// Set the database user name and password
val prop: Properties = new Properties()
prop.setProperty("user", "root")
prop.setProperty("password", "111111")
// Connect to MySQL over JDBC to build a DataFrame
val df1: DataFrame = sparkSession.read.jdbc("jdbc:mysql://localhost:3306/hainiu_test", "student", prop)
df1.createOrReplaceTempView("s")
// Use the UDF in SQL
val df2: DataFrame = sparkSession.sql("select s_name, len(s_name) as namelen from s")
df2.printSchema()
df2.show()
}
}
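The same logic can also be applied through the DataFrame API instead of SQL; a minimal sketch, to be placed after df2.show() in main, assuming the same df1 as above (the lenUdf name is made up for illustration):
import org.apache.spark.sql.functions.udf
// DataFrame-API variant of the length UDF
val lenUdf = udf((name: String) => name.length)
val df3: DataFrame = df1.withColumn("namelen", lenUdf(df1("s_name")))
df3.show()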