idea运行 运行 hive模型
//查看hive目录
which hive
//通过sh -x /usr/local/hive/bin/hive查看执行了那个类
sh -x /hive目录
1.hiveRunner启动类
1.ctrl+N 搜索jar包中的启动类 org.apache.hadoop.hive.cli.CliDrive
public static void main(String[] args) throws Exception {
System.setProperty("jline.WindowsTerminal.directConsole", "false");
int ret = (new CliDriver()).run(args);
System.exit(ret);
}
2.建立mysql用户
-- 创建hive的数据库
create database hive_meta default charset utf8 collate utf8_general_ci;
-- 创建用户
CREATE USER 'hive'@'%' IDENTIFIED BY '000000';
CREATE USER 'hive'@'localhost' IDENTIFIED BY '000000';
-- 用户赋权并制定访问权限
grant all privileges on hive_meta.* to 'hive'@'%' identified by '000000';
grant all privileges on hive_meta.* to 'hive'@'localhost' identified by '000000';
-- 刷新权限
flush privileges;
运行本地sql文件,创建原始hive表
3.加入mysql驱动
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.35</version>
</dependency>
4.获取hive的两个配置文件
修改hive-site.xml
<configuration>
<!-- 数据库 start -->
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://127.0.0.1:3306/hive_meta</value>
<description>mysql连接</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>mysql驱动</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>hive</value>
<description>数据库使用用户名</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>000000</value>
<description>数据库密码</description>
</property>
<!-- 数据库 end -->
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/tmp/hive/warehouse</value>
<description>hive使用的HDFS目录</description>
</property>
<!--不校验hive的版本和metastore版本是否匹配 -->
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
<property>
<name>hive.cli.print.current.db</name>
<value>true</value>
</property>
<property>
<name>hive.cli.print.header</name>
<value>true</value>
</property>
</configuration>
5.采坑hive找不到文件
1.hive文件权限问题
https://blog.csdn.net/iamboluke/article/details/103878312
C:\Users\Ms.ma>echo %HADOOP_HOME%
D:\develop\hadoop
C:\Users\Ms.ma>%HADOOP_HOME%\bin\winutils.exe chmod 777 E:\tmp\hive
C:\Users\Ms.ma>%HADOOP_HOME%\bin\winutils.exe ls E:\tmp\hive
drwxrwxrwx 1 BUILTIN\Administrators ALIENWARE\None 0 Mar 24 2021 E:\tmp\hive
2.配置hadoop环境变量
3.idea的hive实在本地新建一个数据库,不是在线上服务器的数据库
一、UDF函数
1.编写udf
package com.bigdata.hive.function;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.lazy.LazyString;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
/**
* @Description(描述): UDF函数
* @Version(版本): 1.0
* @Author(创建者): ALIENWARE
* @Date(创建时间): Created in 2021/7/2.
* @ * * * * * * * * * * * * * @
*/
public class CountryCode2CountryNameUDF extends GenericUDF {
private static Map<String,String> countryMap = new HashMap<String,String>();
static{
// 将 字典文件数据写入内存
try(
InputStream is = CountryCode2CountryNameUDF.class.getResourceAsStream("/country_dict.txt");
BufferedReader reader = new BufferedReader(new InputStreamReader(is, "utf-8"));
){
String line = null;
while((line = reader.readLine()) != null){
String[] arr = line.split("\t");
String code = arr[0];
String name = arr[1];
countryMap.put(code, name);
}
System.out.println("countryMap.size: " + countryMap.size());
}catch(Exception e){
e.printStackTrace();
}
}
//初始化参数校验
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
System.out.println("initialize");
// 校验函数输入参数和 设置函数返回值类型
// 1) 校验入参个数
if(arguments.length != 1){
throw new UDFArgumentException("input params must one");
}
ObjectInspector inspector = arguments[0];
// 2) 校验参数的大类
// 大类有: PRIMITIVE(基本类型), LIST, MAP, STRUCT, UNION
if(! inspector.getCategory().equals(ObjectInspector.Category.PRIMITIVE)){
throw new UDFArgumentException("input params Category must PRIMITIVE");
}
// 3) 校验参数的小类
// VOID, BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, STRING,
// DATE, TIMESTAMP, BINARY, DECIMAL, VARCHAR, CHAR, INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME,
// UNKNOWN
if(! inspector.getTypeName().equalsIgnoreCase(PrimitiveObjectInspector.PrimitiveCategory.STRING.name())){
throw new UDFArgumentException("input params PRIMITIVE Category type must STRING");
}
// 4) 设置函数返回值类型
// writableStringObjectInspector 里面有 Text, Text 里面有String类型
return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
}
/**
* 定义函数输出对象
*/
Text output = new Text();
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
System.out.println("evaluate");
// 核心算法, 一行调用一次
// 获取入参数据
Object obj = arguments[0].get();
String code = null;
if(obj instanceof LazyString){
LazyString lz = (LazyString)obj;
Text t = lz.getWritableObject();
code = t.toString();
}else if(obj instanceof Text){
Text t = (Text)obj;
code = t.toString();
}else{
code = (String)obj;
}
// 翻译国家码
String countryName = countryMap.get(code);
countryName = countryName == null ? "某个小国" : countryName;
output.set(countryName);
return output;
}
//说明
@Override
public String getDisplayString(String[] strings) {
return Arrays.toString(strings);
}
}
------------------------------------------------------------------------------------
//可以实现 configure 获取 job对象
@Override
public void configure(MapredContext context) {
JobConf jobConf = context.getJobConf();
super.configure(context);
}
2.hive加载并使用自定义函数
-- 创建自定义函数
CREATE TEMPORARY FUNCTION f1 AS 'com\bigdata\hive\function\CountryCode2CountryNameUDF.java';
--使用自定义函数
select country,f1(country) as name from user_install_status limit 10;
二、Hive 编写 UDAF 函数实现
用户自定义聚合函数实现
适用于聚合函数 (列转行函数)
1)自定义UDAF类,需要继承AbstractGenericUDAFResolver;
2)自定义Evaluator类,需要继承GenericUDAFEvaluator,真正实现UDAF的逻辑;
3)自定义bean类,需要继承 AbstractAggregationBuffer,用于在mapper或reducer内部传递数据;
Mode
public static enum Mode {
/**
* PARTIAL1: 这个是mapreduce的map阶段:从原始数据到部分数据聚合
* 将会调用iterate()和terminatePartial()
*/
PARTIAL1,
/**
* PARTIAL2: 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据::从部分数据聚合到部分数据聚合:
* 将会调用merge() 和 terminatePartial()
*/
PARTIAL2,
/**
* FINAL: mapreduce的reduce阶段:从部分数据的聚合到完全聚合
* 将会调用merge()和terminate()
*/
FINAL,
/**
* COMPLETE: 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合
* 将会调用 iterate()和terminate()
*/
COMPLETE
};
代码实现
package com.bigdata.hive.function;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.lazy.LazyInteger;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
/**
public class SumUDAF extends AbstractGenericUDAFResolver{
@Override
public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) throws SemanticException {
// 校验函数输入参数 和 返回 具体能实现udaf函数的Evaluator对象
// 校验函数入参的数量
if(info.length != 1){
throw new UDFArgumentException("input param must one!");
}
// 校验函数入参的大类类型
ObjectInspector inspector = (ObjectInspector) info[0];
// 获取大类类型的方法: inspector.getCategory()
// 获取小类类型的方法:inspector.getTypeName()
if(! inspector.getCategory().equals(ObjectInspector.Category.PRIMITIVE)){
throw new UDFArgumentException("input param Category must PRIMITIVE!");
}
// 校验函数入参的小类类型
// PRIMITIVE大类类型有 VOID, BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, STRING,
// DATE, TIMESTAMP, BINARY, DECIMAL, VARCHAR, CHAR, INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME,UNKNOWN 小类类型
if(! inspector.getTypeName().equalsIgnoreCase(PrimitiveObjectInspector.PrimitiveCategory.INT.name())){
throw new UDFArgumentException("input param PRIMITIVE Category must INT type");
}
return new SumEvaluator();
}
/**
* 实现sum函数的核心类
*/
public static class SumEvaluator extends GenericUDAFEvaluator{
/**
* 用来存mapper内部(局部) 或 reducer内部聚合的结果(全局)
*/
public static class SumAgg extends AbstractAggregationBuffer{
private int sum = 0;
public int getSum() {
return sum;
}
public void setSum(int sum) {
this.sum = sum;
}
}
/**
* 用于mapper端和reducer端的输出
*/
IntWritable output = new IntWritable();
@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
// 设置各个阶段的返回值类型
// 因为 mapper阶段和reducer阶段都是输出int类型,那就不用分阶段了,这个比较特殊
super.init(m, parameters);
// writableIntObjectInspector(IntWritable)
return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
}
@Override
public AggregationBuffer getNewAggregationBuffer() throws HiveException {
// 获取新的bean对象
return new SumAgg();
}
@Override
public void reset(AggregationBuffer agg) throws HiveException {
// bean对象可能会重复应用,应用前清0
SumAgg sumAgg = (SumAgg)agg;
sumAgg.setSum(0);
}
@Override
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
// 把mapper中每行的数据汇总到一起,放到一个bean里面,这个bean在mapper和reducer的内部。
// 通俗理解: 将 每行的 parameters里面的金额,汇总到 bean对象里
Object object = parameters[0];
int num = 0;
if(object instanceof LazyInteger){
// LazyString ---> Text --> String
LazyInteger lz = (LazyInteger)object;
IntWritable t = lz.getWritableObject();
num = t.get();
}else if(object instanceof IntWritable){
IntWritable t = (IntWritable)object;
num = t.get();
}else{
num = (Integer)object;
}
SumAgg sumAgg = (SumAgg)agg;
// 之前的 + 当前的 = 汇总后的
sumAgg.setSum(sumAgg.getSum() + num);
}
@Override
public Object terminatePartial(AggregationBuffer agg) throws HiveException {
// 把带有中间结果的bean转换成能实现序列化在mapper 和 reducer 端传输的对象。
// 通俗理解: bean对象里的 汇总结果 序列化到 IntWritable, 用于输出到reducer
SumAgg sumAgg = (SumAgg)agg;
output.set(sumAgg.getSum());
return output;
}
@Override
public void merge(AggregationBuffer agg, Object partial) throws HiveException {
// reduce端把mapper端输出的数据进行全局合并,把合并的结果放到bean里
// 通俗理解: mapper 输出的局部数据 partial, 汇总到 bean对象里
int num = 0;
if(partial instanceof LazyInteger){
// LazyString ---> Text --> String
LazyInteger lz = (LazyInteger)partial;
IntWritable t = lz.getWritableObject();
num = t.get();
}else if(partial instanceof IntWritable){
IntWritable t = (IntWritable)partial;
num = t.get();
}else{
num = (Integer)partial;
}
SumAgg sumAgg = (SumAgg)agg;
// 之前的 + 当前的 = 汇总后的
sumAgg.setSum(sumAgg.getSum() + num);
}
@Override
public Object terminate(AggregationBuffer agg) throws HiveException {
// 把bean里的全局聚合结果,转换成能实现序列化的输出对象
SumAgg sumAgg = (SumAgg)agg;
output.set(sumAgg.getSum());
return output;
}
}
}
三、Hive 编写 UDTF 函数实现
用户自定义表函数实现
将 整行数据拆分成多行多列的数据 --> 一般成这个函数叫做 行转列函数
UDTF(User-Defined Table-Generating Functions) :接受零个或者多个输入,然后产生多列或者多行输出。
--原数据
id name_nickname
1 name1#n1;name2#n2
2 name3#n3;name4#n4;name5#n5
split_udtf 之后得到的结果
--结果数据
id name nickname
1 name1 n1
1 name2 n2
2 name3 n3
2 name4 n4
2 name5 n5
1.1 开发UDAF步骤
1)自定义UDAF类,需要继承AbstractGenericUDAFResolver;
2)自定义Evaluator类,需要继承GenericUDAFEvaluator,真正实现UDAF的逻辑;
3)自定义bean类,需要继承 AbstractAggregationBuffer,用于在mapper或reducer内部传递数据;
//开发步骤
1)继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF,实现initialize, process, close三个方法。
2)UDTF首先会调用initialize方法,此方法返回UDTF的返回行的信息(返回个数,类型)。
3)初始化完成后,会调用process方法,真正的处理过程在process函数中,在process中,每调用一次forward()产生一行;如果产生多列可以将多个列的值放在一个数组中,然后将该数组传入到forward()函数。
4)最后close()方法调用,对需要清理的方法进行清理。
package com.bigdata.hive.function;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.lazy.LazyString;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;
/**
* @Description(描述): UDTF函数
* @Version(版本): 1.0
* @Author(创建者): ALIENWARE
* @Date(创建时间): Created in 2021/7/2.
* @ * * * * * * * * * * * * * @
*/
public class SplitUDTF extends GenericUDTF{
@Override
public StructObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
// 校验函数输入参数和 设置函数返回值类型
// 1) 校验入参个数
if(arguments.length != 1){
throw new UDFArgumentException("input params must one");
}
ObjectInspector inspector = arguments[0];
// 2) 校验参数的大类
// 大类有: PRIMITIVE(基本类型), LIST, MAP, STRUCT, UNION
if(! inspector.getCategory().equals(ObjectInspector.Category.PRIMITIVE)){
throw new UDFArgumentException("input params Category must PRIMITIVE");
}
// 3) 校验参数的小类
// VOID, BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, STRING,
// DATE, TIMESTAMP, BINARY, DECIMAL, VARCHAR, CHAR, INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME,
// UNKNOWN
if(! inspector.getTypeName().equalsIgnoreCase(PrimitiveObjectInspector.PrimitiveCategory.STRING.name())){
throw new UDFArgumentException("input params PRIMITIVE Category type must STRING");
}
// 4) 设置函数返回值类型(struct<name:string, nickname:string>)
// struct内部字段的名称
List<String> names = new ArrayList<String>();
names.add("name");
names.add("nickname");
// struct内部字段的名称对应的类型
List<ObjectInspector> inspectors = new ArrayList<ObjectInspector>();
inspectors.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
inspectors.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors);
}
/**
* 函数输出类型
* 第一个参数:name
* 第二个参数:nickname
*/
Object[] outputs = new Object[]{new Text(), new Text()};
@Override
public void process(Object[] args) throws HiveException {
// 核心方法 一行调用一次
System.out.println("process()");
Object obj = args[0];
String data = null;
if(obj instanceof LazyString){
LazyString lz = (LazyString)obj;
Text t = lz.getWritableObject();
data = t.toString();
}else if(obj instanceof Text){
Text t = (Text)obj;
data = t.toString();
}else{
data = (String)obj;
}
// name1#n1;name2#n2
String[] arr1 = data.split(";");
// name1#n1
for(String data2 : arr1){
String[] arr2 = data2.split("#");
String name = arr2[0];
String nickname = arr2[1];
// 想输出就调用forward()
((Text)outputs[0]).set(name);
((Text)outputs[1]).set(nickname);
System.out.println("forward()");
forward(outputs);
}
}
@Override
public void close() throws HiveException {
}
}
2.创建函数
CREATE TEMPORARY FUNCTION udtf_split AS 'com\bigdata\hive\function\SplitUDTF.java';
3.执行函数测试
-- 带有表头字段
set hive.cli.print.header=true;
select udtf_split(name_nickname) from udtf_table;
select udtf_split(name_nickname) as (name1,nickname1) from udtf_table;
4.UDTF的两种使用方法
1 )直接select中使用
-- 可以自定义表头字段名称
select udtf_func(properties) as (col1,col2) from tablename;
-- 用UDTF代码里写的字段名称
select udtf_func(properties) from tablename;
2)UDTF函数不可以使用的场景
(1)不可以添加其他字段使用;
select values, udtf_split(valuse) as (col1,col2) from udtftest;
(2)不可以嵌套调用;
select udtf_split(udtf_split(strsplit)) from udtftest;
(3)不可以和group by/cluster by/distribute by/sort by一起使用;
select udtf_split(strsplit) as (col1,col2) from udtftest group by col1, col2;
5.和lateral view一起使用
通过Lateral view可以方便的将UDTF得到的行转列的结果集,合在一起提供服务;
lateral view用于和UDTF一起使用,为了解决UDTF不允许在select字段的问题;
它能够将一行数据拆成多行数据,在此基础上可以对拆分后的数据进行聚合。
lateral view首先为原始表的每行调用UDTF,UTDF会把一行拆分成一或者多行;
lateral view再把结果组合,产生一个支持别名表的虚拟表。
-- 首先table_name表每行调用udtf_func,会把一行拆分成一或者多行
-- 再把结果组合,产生一个支持别名表tableAlias的虚拟表
select table_name.id, tableAlias.col1, tableAlias.col2
from table_name
lateral view udtf_func(properties) tableAlias as col1,col2;
示例:利用lateral view 查看表的数据
select t1.id, t2.name, t2.nickname
from udtf_table t1
lateral view udtf_split(name_nickname) t2 as name,nickname;
在示例1的基础上,分组查询统计每个id有多少条记录
select t3.id, count(*) as n from
(select t1.id, t2.name, t2.nickname
from udtf_table t1
lateral view udtf_split(name_nickname) t2 as name,nickname) t3 group by t3.id;
HIVE优化
一、合理设置reduce的数量
//参数1:hive.exec.reducers.bytes.per.reducer(每个reduce任务处理的数据量,在Hive 0.14.0及更高版 本中默认为256M)
//参数2:hive.exec.reducers.max(每个任务最大的reduce数,在Hive 0.14.0及更高版本中默认为1009)
二、job并行运行设置
hive.exec.parallel (默认是false, true:开启并行运行)
hive.exec.parallel.thread.number (最多可以并行执行多少个作业, 默认是 8)
三、小文件的问题优化
1.如果小文件多,在map输入时
hive默认是将小文件合并成大文件。
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; (默认)
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;(关闭小文件合并大文件)
2.如果map输出的小文件过多hive 默认是开启map 输出合并
set hive.merge.mapfiles=true (默认是 true)
hive.merge.size.per.task(合并文件的大小,默认 256M)
hive.merge.smallfiles.avgsize(文件的平均大小小于该值时,会启动一个MR任务执行merge,默认16M )
3.如果reduce输出的小文件过多
--hive需要手动设置开启reduce输出合并。
set hive.merge.mapredfiles=true (默认是 false)
hive.merge.size.per.task(合并文件的大小,默认 256M)
hive.merge.smallfiles.avgsize(文件的平均大小小于该值时,会启动一个MR任务执行merge,默认16M )
四、 join 操作优化
1.多表join,如果join字段一样,--只生成一个job 任务; Join 的字段类型要一致。
2.如果你有一张表非常非常小,而另一张关联的表非常非常大的时候,
你可以使用mapjoin此Join 操作在 Map 阶段完成,不再需要Reduce,hive默认开启mapjoin。
-- 将小表刷入内存中,默认是true
set hive.auto.convert.join=true;
set hive.ignore.mapjoin.hint=true;
-- 刷入内存表的大小(字节),根据自己的数据集加大
set hive.mapjoin.smalltable.filesize=2500000;
-- 也可以手动设置 /*+mapjoin(c)+*/
五、hive的数据倾斜优化
1. 数据倾斜的原因
1)key分布不均匀
2)业务数据本身的特性
3)某些SQL语句本身就有数据倾斜
关键词 | 情形 | 后果 |
---|---|---|
Join | 其中一个表较小,但是key集中 | 分发到某一个或几个Reduce上的数据远高于平均值 |
大表与大表,但是分桶的判断字段0值或空值过多 | 这些空值都由一个reduce处理,非常慢 | |
group by | group by 维度过小,某值的数量过多 | 处理某值的reduce非常耗时 |
Count Distinct | 某特殊值过多 | 处理此特殊值的reduce耗时 |
2.解决方案——参数调节
--对于group by 产生倾斜的问题
set hive.map.aggr=true; (默认是 true)
--开启map端combiner,减少reduce 拉取的数据量。 --(设置了负载均衡)
set hive.groupby.skewindata=true; (默认是 false)
--示例
如果开启负载均衡:
执行:
set mapred.reduce.tasks=3;
set hive.groupby.skewindata=true;
select sum(1) as n from
( select country from panniu.user_install_status group by country) t ;
会有3个MapReduce任务
第一个:select country from user_install_status group by country, 将key放到不同的reduce里
第二个:将第一个任务生成的临时文件,按照key 进行group by,此时,会将数据放到一个reduce里。
第三个:用group by 的中间结果,进行sum。
六、SQL 语句调节
1.非法数据太多,比如null。
1.假如null值没有用处的话,可以将null值先过滤掉,再进行 union
2.把空值的key变成一随机数(随机值类型需要跟key的类型一致),
把倾斜的数据分到不同的reduce上,由于null值关联不上,处理后并不影响最终结果。
2.count distinct 数据倾斜
原sql:
select count (distinct country) from user_install_status_limit;
优化后的sql:
select count(*) from (select distinct country from user_install_status_limit)t;
2.1多个count distinct 优化
groupingsets