1 SMB Join (Sort Merge Bucket)
SMB Join stands for sort merge bucket join: data is first sorted, then merged, and then written into the corresponding bucket. Bucketing is a Hive technique similar to table partitioning: rows are hashed on a key, and rows with the same hash value land in the same bucket. When two tables are joined, bucketing them first can improve join performance dramatically.
Bucketing guarantees that all rows with the same key are placed in the same bucket, so the join no longer needs to scan the whole table; it only has to scan the corresponding bucket on each side (rows with equal keys are always in the same bucket). SMB join was designed for joins between two large tables: the core idea is to break the large tables into small sorted buckets and then resolve the join with a map-side join, a classic divide-and-conquer approach.
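As a rough illustration of how rows are mapped to buckets, the query below approximates the assignment with Hive's built-in hash() and pmod() functions; Hive's internal bucketing hash may differ in detail, and the table and column names refer to the user_buckets example table created later in this article.
-- Rough sketch of bucket assignment for a table bucketed INTO 20 BUCKETS on country.
-- Hive uses its own bucketing hash internally; hash() is only an approximation here.
select
  country,
  pmod(hash(country), 20) as bucket_id  -- pmod keeps the result non-negative
from user_buckets
limit 10;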
2 Prerequisites for a Hive SMB join
1) Both tables must be bucketed tables, and the bucketing column must match the in-bucket sort column. This is specified when the table is created:
CREATE TABLE(……) CLUSTERED BY (col_1) SORTED BY (col_1) INTO buckets_Nums BUCKETS
2) The bucketing column of both tables must be the join key.
3) Set the bucket-related parameters. They default to false; setting them to true enables SMB join:
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
4) The two bucketed tables being joined may have the same number of buckets, or one bucket count may be a multiple of the other (a quick way to verify these settings is shown in the sketch after this list).
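To check conditions 1) and 4) on an existing table, you can inspect its metadata; DESCRIBE FORMATTED prints the bucket count, bucket columns and sort columns (the table name below is the example table created in the next section).
-- Inspect the bucketing metadata of a table
describe formatted user_buckets;
-- Check the "Num Buckets:", "Bucket Columns:" and "Sort Columns:" lines in the output.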
3 Verification with examples
3.1 Join of two tables with the same number of buckets
1) Create the bucketed table (bucketed by country, with files inside each bucket sorted by country)
CREATE TABLE user_buckets(
`aid` string,
`pkgname` string,
`uptime` bigint,
`type` int,
`country` string,
`gpcategory` string)
COMMENT 'This is the buckets_table table'
CLUSTERED BY(country) SORTED BY(country) INTO 20 BUCKETS;
2) Load the data
insert overwrite table user_buckets
select
aid,
pkgname,
uptime,
type,
country,
gpcategory
from user_install_status_txt
where dt='20141228';
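A hedged note on loading: on Hive 1.x the settings below would typically have to be enabled before the INSERT so that exactly 20 sorted bucket files are written (Hive 2.x enforces them by default); the dfs check afterwards assumes the default warehouse layout and is only a sanity check.
-- On Hive 1.x these would typically be required before the INSERT (Hive 2.x enforces them by default):
-- set hive.enforce.bucketing=true;
-- set hive.enforce.sorting=true;
-- Sanity check: the table directory should contain 20 bucket files
-- (the path below assumes the default warehouse location for the c30pan database):
dfs -ls /user/hive/warehouse/c30pan.db/user_buckets;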
3) Set the bucket-related parameters and compare the execution plans
-- With all three parameters set to true, the execution plan has no Reduce Operator Tree: SMB join is used
hive (c30pan)> set hive.auto.convert.sortmerge.join=true;
hive (c30pan)> set hive.optimize.bucketmapjoin = true;
hive (c30pan)> set hive.optimize.bucketmapjoin.sortedmerge = true;
hive (c30pan)> explain select t2.* from user_buckets t1
> inner join user_buckets t2 on t1.country=t2.country;
OK
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: t1
Statistics: Num rows: 9208046 Data size: 612991442 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: country is not null (type: boolean)
Statistics: Num rows: 9208046 Data size: 612991442 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: country (type: string)
outputColumnNames: _col0
Statistics: Num rows: 9208046 Data size: 612991442 Basic stats: COMPLETE Column stats: NONE
-- Mappers only, no reducer: the join used is Sorted Merge Bucket Map Join
Sorted Merge Bucket Map Join Operator
condition map:
Inner Join 0 to 1
keys:
0 _col0 (type: string)
1 _col4 (type: string)
outputColumnNames: _col1, _col2, _col3, _col4, _col5, _col6
Select Operator
expressions: _col1 (type: string), _col2 (type: string), _col3 (type: bigint), _col4 (type: int), _col5 (type: string), _col6 (type: string)
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
Time taken: 0.146 seconds, Fetched: 41 row(s)
--------------------------------------
-- With all three parameters set to false, the plan contains a Reduce Operator Tree: a common (shuffle) join is used
hive (c30pan)> set hive.auto.convert.sortmerge.join=false;
hive (c30pan)> set hive.optimize.bucketmapjoin = false;
hive (c30pan)> set hive.optimize.bucketmapjoin.sortedmerge = false;
hive (c30pan)> explain select t2.* from user_buckets t1
> inner join user_buckets t2 on t1.country=t2.country;
OK
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: t1
Statistics: Num rows: 9208046 Data size: 612991442 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: country is not null (type: boolean)
Statistics: Num rows: 9208046 Data size: 612991442 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: country (type: string)
outputColumnNames: _col0
Statistics: Num rows: 9208046 Data size: 612991442 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: _col0 (type: string)
sort order: +
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 9208046 Data size: 612991442 Basic stats: COMPLETE Column stats: NONE
TableScan
alias: t1
Statistics: Num rows: 9208046 Data size: 612991442 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: country is not null (type: boolean)
Statistics: Num rows: 9208046 Data size: 612991442 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: aid (type: string), pkgname (type: string), uptime (type: bigint), type (type: int), country (type: string), gpcategory (type: string)
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5
Statistics: Num rows: 9208046 Data size: 612991442 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: _col4 (type: string)
sort order: +
Map-reduce partition columns: _col4 (type: string)
Statistics: Num rows: 9208046 Data size: 612991442 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: bigint), _col3 (type: int), _col5 (type: string)
-- note the reducer stage
Reduce Operator Tree:
Join Operator
condition map:
Inner Join 0 to 1
keys:
0 _col0 (type: string)
1 _col4 (type: string)
outputColumnNames: _col1, _col2, _col3, _col4, _col5, _col6
Statistics: Num rows: 10128850 Data size: 674290600 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: _col1 (type: string), _col2 (type: string), _col3 (type: bigint), _col4 (type: int), _col5 (type: string), _col6 (type: string)
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5
Statistics: Num rows: 10128850 Data size: 674290600 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 10128850 Data size: 674290600 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
Time taken: 0.095 seconds, Fetched: 66 row(s)
3.2 Join of two tables whose bucket counts are multiples of each other
1) Create the bucketed table (bucketed by country, with files inside each bucket sorted by country)
Its bucket count (40) is twice that of the user_buckets table (20).
CREATE TABLE `country_dict_buckets`(
`country` string,
`name` string,
`region` string)
COMMENT 'This is the buckets_table table'
CLUSTERED BY (country) SORTED BY (country ASC) INTO 40 BUCKETS;
2) Load the data
The table contains 1,551,475 rows.
insert overwrite table country_dict_buckets
select code, name, region from country_dict;
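To confirm the row count quoted above, a simple sanity check like the following can be run after the load:
select count(*) from country_dict_buckets;
-- should return 1551475 if the load matches the figure above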
3) Run the SMB join
-- With all three parameters set to true, the execution plan has no Reduce Operator Tree: SMB join is used
hive (c30pan)> set hive.auto.convert.sortmerge.join=true;
hive (c30pan)> set hive.optimize.bucketmapjoin.sortedmerge = true;
hive (c30pan)> set hive.optimize.bucketmapjoin = true;
hive (c30pan)> explain select t2.* from user_buckets t1
> inner join country_dict_buckets t2 on t1.country=t2.country;
OK
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: t1
Statistics: Num rows: 9208046 Data size: 612991442 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: country is not null (type: boolean)
Statistics: Num rows: 9208046 Data size: 612991442 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: country (type: string)
outputColumnNames: _col0
Statistics: Num rows: 9208046 Data size: 612991442 Basic stats: COMPLETE Column stats: NONE
Sorted Merge Bucket Map Join Operator -- SMB join
condition map:
Inner Join 0 to 1
keys:
0 _col0 (type: string)
1 _col0 (type: string)
outputColumnNames: _col1, _col2, _col3
Select Operator
expressions: _col1 (type: string), _col2 (type: string), _col3 (type: string)
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
Time taken: 0.119 seconds, Fetched: 41 row(s)
3.3 Run the SQL and compare the results
1) Run with SMB join enabled
hive (c30pan)> set hive.auto.convert.sortmerge.join=true;
hive (c30pan)> set hive.optimize.bucketmapjoin.sortedmerge = true;
hive (c30pan)> set hive.optimize.bucketmapjoin = true;
hive (c30pan)> select count(*) from
> (
> select t2.* from country_dict_buckets t1
> inner join country_dict_buckets t2 on t1.country=t2.country
> ) t3;
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
Query ID = panniu_20210630231053_20dc1a24-6faa-4ac6-b7bc-5b8319603aa7
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1623410979404_5850, Tracking URL = http://nn1.hadoop:8041/proxy/application_1623410979404_5850/
Kill Command = /usr/local/hadoop/bin/hadoop job -kill job_1623410979404_5850
Hadoop job information for Stage-1: number of mappers: 40; number of reducers: 1
2021-06-30 23:11:14,545 Stage-1 map = 0%, reduce = 0%
2021-06-30 23:11:25,163 Stage-1 map = 4%, reduce = 0%, Cumulative CPU 183.3 sec
2021-06-30 23:11:26,216 Stage-1 map = 5%, reduce = 0%, Cumulative CPU 247.74 sec
2021-06-30 23:11:27,272 Stage-1 map = 7%, reduce = 0%, Cumulative CPU 338.36 sec
2021-06-30 23:11:28,323 Stage-1 map = 14%, reduce = 0%, Cumulative CPU 473.21 sec
2021-06-30 23:11:29,373 Stage-1 map = 18%, reduce = 0%, Cumulative CPU 561.52 sec
2021-06-30 23:11:30,414 Stage-1 map = 22%, reduce = 0%, Cumulative CPU 611.33 sec
2021-06-30 23:11:31,480 Stage-1 map = 25%, reduce = 0%, Cumulative CPU 670.88 sec
2021-06-30 23:11:32,528 Stage-1 map = 29%, reduce = 0%, Cumulative CPU 708.38 sec
2021-06-30 23:11:33,580 Stage-1 map = 30%, reduce = 0%, Cumulative CPU 735.09 sec
2021-06-30 23:11:34,633 Stage-1 map = 34%, reduce = 0%, Cumulative CPU 786.53 sec
2021-06-30 23:11:35,682 Stage-1 map = 37%, reduce = 0%, Cumulative CPU 823.27 sec
2021-06-30 23:11:36,735 Stage-1 map = 38%, reduce = 0%, Cumulative CPU 852.77 sec
2021-06-30 23:12:07,634 Stage-1 map = 61%, reduce = 0%, Cumulative CPU 942.41 sec
2021-06-30 23:12:08,672 Stage-1 map = 88%, reduce = 0%, Cumulative CPU 2268.99 sec
2021-06-30 23:12:17,050 Stage-1 map = 88%, reduce = 29%, Cumulative CPU 2270.15 sec
2021-06-30 23:12:27,958 Stage-1 map = 89%, reduce = 29%, Cumulative CPU 2297.12 sec
2021-06-30 23:12:37,066 Stage-1 map = 90%, reduce = 29%, Cumulative CPU 2360.78 sec
2021-06-30 23:12:40,221 Stage-1 map = 91%, reduce = 29%, Cumulative CPU 2443.65 sec
2021-06-30 23:12:43,337 Stage-1 map = 92%, reduce = 29%, Cumulative CPU 2460.19 sec
2021-06-30 23:12:46,466 Stage-1 map = 93%, reduce = 29%, Cumulative CPU 2476.57 sec
2021-06-30 23:12:48,577 Stage-1 map = 94%, reduce = 29%, Cumulative CPU 2521.11 sec
2021-06-30 23:12:54,863 Stage-1 map = 96%, reduce = 29%, Cumulative CPU 2329.68 sec
2021-06-30 23:12:58,013 Stage-1 map = 96%, reduce = 30%, Cumulative CPU 2344.81 sec
2021-06-30 23:12:59,057 Stage-1 map = 97%, reduce = 30%, Cumulative CPU 2345.71 sec
2021-06-30 23:13:00,105 Stage-1 map = 99%, reduce = 30%, Cumulative CPU 2351.01 sec
2021-06-30 23:13:13,680 Stage-1 map = 100%, reduce = 30%, Cumulative CPU 2368.0 sec
2021-06-30 23:13:23,074 Stage-1 map = 100%, reduce = 31%, Cumulative CPU 2186.1 sec
2021-06-30 23:13:26,294 Stage-1 map = 100%, reduce = 33%, Cumulative CPU 2186.17 sec
2021-06-30 23:13:27,518 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 2189.62 sec
MapReduce Total cumulative CPU time: 36 minutes 29 seconds 620 msec
Ended Job = job_1623410979404_5850
MapReduce Jobs Launched:
Stage-Stage-1: Map: 40 Reduce: 1 Cumulative CPU: 2189.62 sec HDFS Read: 73474091 HDFS Write: 111 SUCCESS
Total MapReduce CPU Time Spent: 36 minutes 29 seconds 620 msec
OK
13153396573
Time taken: 168.857 seconds, Fetched: 1 row(s)
2) Run with SMB join disabled
hive (c30pan)> set hive.auto.convert.sortmerge.join=false;
hive (c30pan)> set hive.optimize.bucketmapjoin.sortedmerge = false;
hive (c30pan)> set hive.optimize.bucketmapjoin = false;
hive (c30pan)> select count(*) from
> (
> select t2.* from country_dict_buckets t1
> inner join country_dict_buckets t2 on t1.country=t2.country
> ) t3;
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
Query ID = panniu_20210630231634_c7714429-e51c-4c17-9d7c-e4e123fcb5c2
Total jobs = 2
Stage-1 is selected by condition resolver.
Launching Job 1 out of 2
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1623410979404_5851, Tracking URL = http://nn1.hadoop:8041/proxy/application_1623410979404_5851/
Kill Command = /usr/local/hadoop/bin/hadoop job -kill job_1623410979404_5851
Hadoop job information for Stage-1: number of mappers: 6; number of reducers: 1
2021-06-30 23:16:41,732 Stage-1 map = 0%, reduce = 0%
2021-06-30 23:16:44,883 Stage-1 map = 17%, reduce = 0%, Cumulative CPU 12.27 sec
2021-06-30 23:16:45,946 Stage-1 map = 33%, reduce = 0%, Cumulative CPU 14.45 sec
2021-06-30 23:16:49,092 Stage-1 map = 50%, reduce = 0%, Cumulative CPU 16.33 sec
2021-06-30 23:16:50,145 Stage-1 map = 67%, reduce = 0%, Cumulative CPU 17.37 sec
2021-06-30 23:16:51,200 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 19.2 sec
2021-06-30 23:16:57,504 Stage-1 map = 100%, reduce = 67%, Cumulative CPU 28.56 sec
2021-06-30 23:17:15,436 Stage-1 map = 100%, reduce = 68%, Cumulative CPU 49.85 sec
2021-06-30 23:17:42,729 Stage-1 map = 100%, reduce = 69%, Cumulative CPU 80.65 sec
2021-06-30 23:18:04,763 Stage-1 map = 100%, reduce = 70%, Cumulative CPU 104.0 sec
2021-06-30 23:18:31,964 Stage-1 map = 100%, reduce = 71%, Cumulative CPU 133.74 sec
2021-06-30 23:18:52,841 Stage-1 map = 100%, reduce = 72%, Cumulative CPU 157.84 sec
2021-06-30 23:19:17,937 Stage-1 map = 100%, reduce = 73%, Cumulative CPU 184.7 sec
2021-06-30 23:19:38,835 Stage-1 map = 100%, reduce = 74%, Cumulative CPU 208.15 sec
2021-06-30 23:20:02,796 Stage-1 map = 100%, reduce = 75%, Cumulative CPU 235.21 sec
2021-06-30 23:20:24,703 Stage-1 map = 100%, reduce = 76%, Cumulative CPU 258.85 sec
2021-06-30 23:20:51,844 Stage-1 map = 100%, reduce = 77%, Cumulative CPU 289.24 sec
2021-06-30 23:21:12,753 Stage-1 map = 100%, reduce = 78%, Cumulative CPU 312.97 sec
2021-06-30 23:21:36,801 Stage-1 map = 100%, reduce = 79%, Cumulative CPU 340.32 sec
2021-06-30 23:21:58,681 Stage-1 map = 100%, reduce = 80%, Cumulative CPU 364.56 sec
2021-06-30 23:22:28,814 Stage-1 map = 100%, reduce = 81%, Cumulative CPU 398.69 sec
2021-06-30 23:22:56,973 Stage-1 map = 100%, reduce = 82%, Cumulative CPU 429.31 sec
2021-06-30 23:23:24,096 Stage-1 map = 100%, reduce = 83%, Cumulative CPU 459.95 sec
2021-06-30 23:23:44,904 Stage-1 map = 100%, reduce = 84%, Cumulative CPU 483.95 sec
2021-06-30 23:24:08,933 Stage-1 map = 100%, reduce = 85%, Cumulative CPU 511.2 sec
2021-06-30 23:24:30,809 Stage-1 map = 100%, reduce = 86%, Cumulative CPU 535.3 sec
2021-06-30 23:24:54,815 Stage-1 map = 100%, reduce = 87%, Cumulative CPU 562.95 sec
2021-06-30 23:25:16,677 Stage-1 map = 100%, reduce = 88%, Cumulative CPU 586.94 sec
2021-06-30 23:25:43,808 Stage-1 map = 100%, reduce = 89%, Cumulative CPU 617.6 sec
2021-06-30 23:26:07,827 Stage-1 map = 100%, reduce = 90%, Cumulative CPU 644.62 sec
2021-06-30 23:26:34,944 Stage-1 map = 100%, reduce = 91%, Cumulative CPU 675.18 sec
2021-06-30 23:26:59,904 Stage-1 map = 100%, reduce = 92%, Cumulative CPU 702.32 sec
2021-06-30 23:27:23,912 Stage-1 map = 100%, reduce = 93%, Cumulative CPU 729.76 sec
2021-06-30 23:27:51,029 Stage-1 map = 100%, reduce = 94%, Cumulative CPU 760.19 sec
2021-06-30 23:28:19,232 Stage-1 map = 100%, reduce = 95%, Cumulative CPU 790.8 sec
2021-06-30 23:28:49,489 Stage-1 map = 100%, reduce = 96%, Cumulative CPU 824.96 sec
2021-06-30 23:29:13,442 Stage-1 map = 100%, reduce = 97%, Cumulative CPU 852.12 sec
2021-06-30 23:29:40,543 Stage-1 map = 100%, reduce = 98%, Cumulative CPU 882.45 sec
2021-06-30 23:30:07,723 Stage-1 map = 100%, reduce = 99%, Cumulative CPU 913.02 sec
2021-06-30 23:30:52,674 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 963.8 sec
MapReduce Total cumulative CPU time: 16 minutes 3 seconds 800 msec
Ended Job = job_1623410979404_5851
Launching Job 2 out of 2
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1623410979404_5857, Tracking URL = http://nn1.hadoop:8041/proxy/application_1623410979404_5857/
Kill Command = /usr/local/hadoop/bin/hadoop job -kill job_1623410979404_5857
Hadoop job information for Stage-2: number of mappers: 1; number of reducers: 1
2021-06-30 23:31:01,492 Stage-2 map = 0%, reduce = 0%
2021-06-30 23:31:02,557 Stage-2 map = 100%, reduce = 0%, Cumulative CPU 3.35 sec
2021-06-30 23:31:03,613 Stage-2 map = 100%, reduce = 100%, Cumulative CPU 4.74 sec
MapReduce Total cumulative CPU time: 4 seconds 740 msec
Ended Job = job_1623410979404_5857
MapReduce Jobs Launched:
Stage-Stage-1: Map: 6 Reduce: 1 Cumulative CPU: 963.8 sec HDFS Read: 193649423 HDFS Write: 2250716 SUCCESS
Stage-Stage-2: Map: 1 Reduce: 1 Cumulative CPU: 4.74 sec HDFS Read: 7018 HDFS Write: 607107 SUCCESS
Total MapReduce CPU Time Spent: 16 minutes 8 seconds 540 msec
OK
13153396573
Time taken: 870.771 seconds, Fetched: 1 row(s)
Conclusion: with SMB join enabled the query finishes much faster than without it (168.857 s vs. 870.771 s in this test), so it is a worthwhile optimization for joins between large tables.