1.phoenix

教程 阿布都的都 ⋅ 于 2023-01-06 20:01:16 ⋅ 1292 阅读

1 phoenix原理

1.1 phoenix定位

​ Phoenix最早是saleforce的一个开源项目,后来成为Apache基金的顶级项目。Phoenix是一个HBASE SQL层(即为HBase的一个SQL引擎),用作应用层和HBASE之间的中间件。Phoeinx可以用标准的JDBC API替代HBASE client API来创建表、插入与查询HBASE中的数据。

1.2 phoenix在大数据应用处于什么环节

file

etl工具:
kettle:免费,开源产品,纯 java 编写。
informatica:第二,商业软件,相当专业的etl工具,适合大部分etl场景。
datastage:大哥,商业软件,最专业的etl工具,比 informatica 贵,适合大部分etl场景。

phoenix的出现解决了hbase filter的弱点,内置二级索引查询效率较高,实现SQL on nosql。

1.3 应用场景

​ Phoenix已与Hadoop其他服务完全集成,如Spark,Hive,Flume和Map Reduce。适合高并发、低延迟、简单查询、二级索引的场景。Phoenix通过结合HBase与SQL两者的优点,在Hadoop中为低延迟应用程序启用OLTP和运营分析。

1.4 SQL支持

​ Apache Phoenix支持所有标准SQL查询构造,包括SELECT,FROM,WHERE,GROUP BY,HAVING,ORDER BY等。它还支持一整套DML命令以及通过DDL命令创建表和版本化增量更改。

1.5 架构设计

file

​ Phoenix 分为 client 和 server,其中 client 又分为 thin(本质上是一个 JDBC 驱动,所依赖的第三方类较少)和非 thin (所依赖的第三方类较多)两种;server 是针对 thin client 而言的,为 standalone 模式,是由一台 Java 服务器组成,代表客户端管理 Phoenix 的连接,可以进行横向扩展。

​ Phoenix 在hbase中引入了一个中间层,将 HBase 非关系型形式转换成关系型数据模型,在创建表时默认会将 PK 与 HBase 中表的 Rowkey 映射起来,PK 支持多字段组合,剩下的列可以根据需求进行选择,列簇如果未显式定义,则会被忽略,Qualifier 会转换成表的字段名。如下图所示:

file

file

2. cdh集成phoenix

2.1 下载安装包

​ 上传安装包服务器,如/opt目录下,然后创建httpd的phoenix目录,然后将phoenix包复制到httpd目录下,用于本地parcel安装。

mkdir /opt/phoenix/
tar -xvf phoenix.tar -C /opt/phoenix/
mkdir /var/www/html/phoenix
cp PHOENIX-5.0.0-cdh6.2.0.p0.1308267-el7.parcel* manifest.json /var/www/html/phoenix
cp PHOENIX-1.0.jar /opt/cloudera/csd/

检查http服务是否可以访问

http://worker-1/phoenix/

file

2.2 cdh页面操作

2.2.1 配置parcel

file

file

2.2.2 下载激活

file

file

分配的位置:

file

点 激活,再点确定,直到激活完成

file

2.2.3 重启服务、配置生效

file

重启过期配置后,集成完成。

3 phoenix登录

3.1 phoenix目录

file

3.2 配置path

将Phoenix 的bin 目录添加的 /etc/profile 文件的path中,目的是可在任意目录执行。

file

source /etc/profile 使配置生效

file

3.3 登录phoenix

# hbase认证
kinit hbase
# 访问zookeeper客户端的 /hbase znode, 读取hbase元数据信息
phoenix-sqlline worker-1:2181:/hbase

file

4 phoenix实操

4.1 创建测试表

在phoenix中创建测试表,必须指定主键,主键对应hbase的rowkey。

-- 表名不带双引号,默认转成大写
create table phtest1(
    pk varchar not null primary key,
    col1 varchar,
    col2 varchar,
    col3 varchar
);
-- 表名带双引号,不转大写
create table "phtest2"(
    pk varchar not null primary key,
    col1 varchar,
    col2 varchar,
    col3 varchar
);

-- 查看表列表
!tables

-- 查看表结构
!describe "phtest1"

file

在hbase shell中查询(phoenix严格区分大小写,所有小写在phoenix中都会被翻译为大写)

file

4.2 插入/查询数据

-- upsert插入时如果主键已经存在则更新,如果不存在则插入。
-- 数据插入与hbase shell插入数据性质一致,如果插入相同主键的值,则保持最新的一条数据。
-- 直接 values 插入或更新
upsert into PHTEST1 values ('x0001','1','2','3');
upsert into PHTEST1 values ('x0001','1','22','3');

upsert into PHTEST1 values ('x0002','1','2','3');

-- 指定字段插入更新
upsert into PHTEST1 (pk,col1,col2,col3) values ('x0003','1','2','3');

# 查询
select * from PHTEST1;
select col1 from PHTEST1;
select t.col1 from PHTEST1 t;

file

在hbase shell中查询

file

4.3 测试删除

插入多行,删除其中某一行

-- 插入多行,一次只能插入一行,不能插入多行
upsert into PHTEST1 values ('x0005','2','3','4');
upsert into PHTEST1 values ('x0006','3','4','5');
upsert into PHTEST1 values ('x0007','4','5','6');
-- 查询验证
select * from PHTEST1;
-- 删除一行
delete from PHTEST1 where col1='2';
-- 查询验证
select * from PHTEST1;

file

4.4 批量更新

-- 使用select查询结果集批量更新表

-- 创建一张临时表PHTEST2
create table PHTEST2(
    pk varchar not null primary key,
    col1 varchar,  
    col2 varchar,
    col3 varchar
);
-- 临时表插入数据,比phtest1表多了'x0005'、'x0006'和'x0002'三行,其中'x0003'、'x0004'与phtest1的一致
upsert into PHTEST2 values ('x0001','newvalue','newvalue','newvalue');
upsert into PHTEST2 values ('x0002','newvalue','newvalue','newvalue');
upsert into PHTEST2 values ('x0003','3','4','5');
upsert into PHTEST2 values ('x0004','4','5','6');
upsert into PHTEST2 values ('x0005','newvalue','newvalue','newvalue');
upsert into PHTEST2 values ('x0006','newvalue','newvalue','newvalue');

-- 执行批量更新, 将PHTEST2表的数据覆盖到PHTEST1表
upsert into PHTEST1 select * from PHTEST2;

file

4.5 删除表

drop table PHTEST2;

4.6 统计查询

select count(1) from PHTEST1;
select count(distinct col1) from PHTEST1;
select sum(num) from (select col1, count(*) as num from PHTEST1 group by col1) t1;

select col1, count(*) as num from PHTEST1 group by col1 order by num desc;

file

4.7 数据导入

使用官方提供的数据样例,phoenix数据导入只支持csv文件格式。

# 在客户端外
# 执行SQL文件
# 对标hive的-f test.sql ${hiveconf:batch_date}
phoenix-sqlline worker-1:2181:/hbase STOCK_SYMBOL.sql 

# 加载csv文件数据到 STOCK_SYMBOL 表
phoenix-psql -t STOCK_SYMBOL worker-1:2181:/hbase STOCK_SYMBOL.csv
# 注意:
#   1)phoenix数据导入只支持后缀为.csv的文件, csv文件名称不需要和表名称一致,文件名可以小写
#   2)指定的表必须是大写,小写就报错

file

执行SQL文件内的SQL语句:

file

加载csv文件数据到 STOCK_SYMBOL 表后查询:

file

file

4.8 在phoenix建表时指定列族

-- 用 列族名.字段名
create table "cftest" (
    pk varchar not null primary key,
    cf1.col1 varchar,
    cf2.col2 varchar);

-- 查询时可以不用列族
select col1 from "cftest"

file

4.9 在phoenix建表时指定压缩格式

-- 在后面可指定压缩格式
create table "comptest" (
    pk varchar not null primary key,
    cf1.col1 varchar,
    cf2.col2 varchar) 
    compression='snappy';

file

4.10 在phoenix建表时预分region

-- 用 split on ('x0001','x0002','x0003','x0004','x0005') 来进行预分region
-- 其中 on 里面的 是 splitkey
create table "split_region_test" (
    pk varchar not null primary key,
    cf1.col1 varchar,
    cf2.col2 varchar) 
    compression='snappy' 
    split on ('x0001','x0002','x0003','x0004','x0005');

file

查看hbase web ui:

file

4.11 phoenix与hbase表关联

1)在hbase中创建带有命名空间的表,并添加数据

# 已知hbase命名空间 xinniu

# 创建带有命名空间的表
create 'xinniu:relatetable',{NAME => 'cf1',COMPRESSION => 'snappy'},{NAME => 'cf2',COMPRESSION => 'snappy'}

# 添加数据
put 'xinniu:relatetable','x0001','cf1:name','user1'
put 'xinniu:relatetable','x0002','cf1:name','user2'
put 'xinniu:relatetable','x0001','cf1:age','20'
put 'xinniu:relatetable','x0002','cf1:age','21'
put 'xinniu:relatetable','x0001','cf2:address','beijing'
put 'xinniu:relatetable','x0002','cf2:address','shanghai'

2)在phoenix中创建schema(schema相当于命名空间)

-- 先在phoenix中创建schema,对应hbase的namespace
create schema if not exists "xinniu";

执行报错:

file

在phoenix中创建schema报错解决方式:在hbase的hbase-site.xml中添加phoenix.schema.isNamespaceMappingEnabled=true和phoenix.schema.mapSystemTablesToNamespace=true

建议在服务端和客户端都添加,用hbase web ui添加配置

file

file

并重启过期配置。

重新进入Phoenix 客户端

-- 退出客户端
!q
-- 进入客户端
phoenix-sqlline worker-1:2181:/hbase

-- 先在phoenix中创建schema,对应hbase的namespace
create schema if not exists "xinniu";

3)创建带有命名空间的表

-- 在phoenix创建'xinniu:relatetable'的关联表
-- 其中: column_encoded_bytes=0 是把字段名转成字符串,而不是原来的byte数组
create table "xinniu"."relatetable"(
    id varchar not null primary key,
    "cf1"."name" varchar,
    "cf1"."age" varchar,
    "cf2"."address" varchar
) column_encoded_bytes=0;

-- 在phoenix中插入一条数据测试
upsert into "xinniu"."relatetable" (id,"cf1"."name","cf1"."age","cf2"."address") values ('x0003','user3','22','guangzhou');

select * from "xinniu"."relatetable";

select "cf1"."name" from "xinniu"."relatetable";

-- 没有给进行BYTES.tostring
create table "xinniu"."relatetable1"(
    id varchar not null primary key,
    "cf1"."name" varchar,
    "cf1"."age" varchar,
    "cf2"."address" varchar
);
upsert into "xinniu"."relatetable1" (id,"cf1"."name","cf1"."age","cf2"."address") values ('x0003','user3','22','guangzhou');

建表语句中带有 column_encoded_bytes=0, 从hbase查询,字段名能看得懂:

file

建表语句中不带有 column_encoded_bytes=0, 从hbase查询,字段名看不懂:

file

4.12 phoenix建表时指定组合rowkey

-- 通过 CONSTRAINT pk primary key ( prefix,id )  设定联合主键,作为rowkey
-- 当prefix和id作为联合主键, 只在hbase的rowkey中存在, column里没有
-- 建表语句
create table "xinniu"."combinationkey_table" (
    prefix varchar not null,
    id varchar not null,
    col1 varchar,
    col2 varchar
    CONSTRAINT pk primary key ( prefix,id ) 
) 
    column_encoded_bytes=0, 
    compression='snappy'  
    split on ('1','2','|');

-- 插入数据
upsert into "xinniu"."combinationkey_table" (prefix,id,col1,col2) values ('1','001','user1','20');
upsert into "xinniu"."combinationkey_table" (prefix,id,col1,col2) values ('1','002','user2','21');

-- 查看表结构
!describe "xinniu"."combinationkey_table"

file

添加数据后:

file

查看hbase表:

file

4.13 phoenix实现动态列

-- 创建表
create table "xinniu"."dynamic_table"(
    pk varchar not null primary key,
    col1 varchar,
    col2 varchar
)column_encoded_bytes=0;

-- 插入数据
upsert into "xinniu"."dynamic_table"  (pk,col1,col2) values ('x0001','user1','20');
upsert into "xinniu"."dynamic_table"  (pk,col1,col2) values ('x0002','user1','21');
upsert into "xinniu"."dynamic_table"  (pk,col1,col2) values ('x0003','user1','22');
upsert into "xinniu"."dynamic_table"  (pk,col1,col2) values ('x0004','user1','23');

-- 动态插入列
-- 动态插入 col3 和 col4 列
upsert into "xinniu"."dynamic_table" (pk,col1,col2,col3 varchar,col4 varchar) values ('x0005','user1','23','beijing','hainiu');
-- 动态插入 col4 和 col5 列
upsert into "xinniu"."dynamic_table" (pk,col1,col2,col4 varchar,col5 varchar) values ('x0006','user2','32','huawei','30K');

-- 动态插入 col3、col4、col5 列
upsert into "xinniu"."dynamic_table" (pk,col1,col2,col3 varchar,col4 varchar,col5 varchar) values ('x0007','user3','33','shanghai','ali','22K');
-- 动态插入 col3、col4、col5、col6 列 
upsert into "xinniu"."dynamic_table" (pk,col1,col2,col3 varchar,col4 varchar,col5 varchar,col6 varchar) values ('x0008','user4','35','shanghai','baidu','12K','false');

-- phoenix中查询动态列
select * from "xinniu"."dynamic_table"(col3 varchar,col4 varchar);
select * from "xinniu"."dynamic_table"(col3 varchar,col4 varchar,col5 varchar) ;
select * from "xinniu"."dynamic_table"(col3 varchar,col4 varchar,col5 varchar,col6 varchar) ;

select * from "xinniu"."dynamic_table"(col3 varchar,col4 varchar);

file

select * from "xinniu"."dynamic_table"(col3 varchar,col4 varchar,col5 varchar) ;

file

select * from "xinniu"."dynamic_table"(col3 varchar,col4 varchar,col5 varchar,col6 varchar) ;

file

5 索引

5.1 开启索引

配置hbase-site.xml文件(服务端与客户端),用hbase web ui添加配置hbase.regionserver.wal.codec=org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec

搜索“hbase-site”

在服务端添加

file

在客户端添加

file

重启过期配置后,重新进入phoenix 客户端。

5.2 数据准备

-- 创建测试表
create table "xinniu"."testindex"(
    pk varchar not null primary key,
    col1 varchar
)column_encoded_bytes=0;

-- 插入数据
upsert into "xinniu"."testindex" values ('x1','1');
……
upsert into "xinniu"."testindex" values ('x20000','20000');
-- 编写脚本,生成SQL文件
[root@worker-1 hdfs_test]# vim s1.sh 
#! /bin/bash

for((i=1;i<=20000;i++))
do
        echo "upsert into \"xinniu\".\"testindex\" values ('x${i}','${i}');" >> testindex.sql
done

-- 执行SQL文件导入表
phoenix-sqlline worker-1:2181:/hbase testindex.sql 
--通过导入csv文件的方式将数据导入到表
phoenix-psql -t "xinniu"."testindex" worker-1:2181:/hbase t1.csv

file

5.3 索引开启前查询

-- 查看执行计划,发现全表扫描
explain select * from "xinniu"."testindex" where COL1 = '200';
-- 查询
select * from "xinniu"."testindex" where COL1 = '200';

file

​ 通过执行计划可以发现,查询为FULL SCAN。

5.4 索引操作

-- 基于 COL1字段 创建索引, 当创建完后,索引里存的是已经排序好的COL1数据
create local index myindex on "xinniu"."testindex" (COL1);

-- 查看执行计划,发现不全表扫描
explain select * from "xinniu"."testindex" where COL1 = '200';
select * from "xinniu"."testindex" where COL1 = '200';

-- 删除索引
drop index myindex on "xinniu"."testindex";

file

6 api 操作Phoenix

6.1 环境版本信息

cdh6.3.2版本,phoenix5.0.0版本,hbase2.1.0版本

6.2 pom中添加Phoenix依赖

<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-core</artifactId>
    <version>5.0.0-HBase-2.0</version>
</dependency>

6.3 相关配置文件

file

  • kerberos配置文件在集群环境下载,krb5.conf为kerberos安装配置时的配置文件,对应路径为/etc/krb5.conf,hbase.keytab为票据文件。
  • hbase相关配置文件,在cm界面hbase→操作→下载客户端配置

file

如果在hbase-site.xml中没有 上面两个配置,需要添加到hbase-site.xml文件中

  <property>
    <name>phoenix.schema.isNamespaceMappingEnabled</name>
    <value>true</value>
    <description>命名空间开启</description>
  </property>

  <property>
    <name>hbase.regionserver.wal.codec</name>
    <value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
    <description>二级索引支持</description>
  </property>

6.4 创建测试表

-- 创建测试表
create table "xinniu"."testindex2"(
    pk varchar not null primary key,
    col1 varchar
)column_encoded_bytes=0;

6.5 代码

package com.hainiu;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class HbaseOpForPhoenix {
    static Configuration conf = HBaseConfiguration.create();

    public static void main(String[] args) {
        String krb5Path = "src/main/conf/krb5.conf";
        String principal = "hbase@HAINIU.COM";
        String keytabPath = "src/main/conf/hbase.keytab";
        kerberosAuth(conf,krb5Path,principal,keytabPath);

        writeDataToHbaseByPhoenix();
    }

    /**
     * 通过Phoenix写入hbase表
     */
    private static void writeDataToHbaseByPhoenix() {
        String url = "jdbc:phoenix:worker-1:2181";
        try {
            Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
            try(
                    Connection conn = DriverManager.getConnection(url);
                    Statement stmt = conn.createStatement();
            ){
                for(int i = 100; i < 999; i++){
                    String sql = "upsert into \"xinniu\".\"testindex2\" values ('x"+ i + "','" + i + "') ";
                    stmt.addBatch(sql);
//                    System.out.println(sql);
                }
                stmt.executeBatch();
                // 提交事务
                conn.commit();

            }catch (SQLException e){
                e.printStackTrace();
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * kerberos认证
     * @param configuration hadoop配置文件
     * @param krb5Path kerberos配置文件(krb5.conf)路径
     * @param principal 认证主体名
     * @param keytabPath keytab文件路径
     */
    public static void kerberosAuth(Configuration configuration, String krb5Path, String principal, String keytabPath){
        // 通过系统设置参数设置krb5.conf
        System.setProperty("java.security.krb5.conf",krb5Path);
        // 指定kerberos 权限认证
        configuration.set("hadoop.security.authentication","Kerberos");

        // 用 UserGroupInformation 类做kerberos认证
        UserGroupInformation.setConfiguration(configuration);
        try {
            // 用于刷新票据,当票据过期的时候自动刷新
            UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();

            // 通过 keytab 登录
            // 参数1:认证主体
            // 参数2:认证文件
            UserGroupInformation.loginUserFromKeytab(principal,keytabPath);

            UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
            System.out.println("loginUser:" + loginUser);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}

运行时报这样的错误,可忽略

file

执行插入后,查看表,说明数据已经导入

file

版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海汼部落-阿布都的都,http://hainiubl.com/topics/76064
成为第一个点赞的人吧 :bowtie:
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter