3.Kudu_API

教程 阿布都的都 ⋅ 于 2023-01-06 20:43:33 ⋅ 928 阅读

1 项目pom添加kudu依赖

    <!-- Use the Cloudera (CDH) Maven repository -->
    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.0.0-cdh6.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.0.0-cdh6.3.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client</artifactId>
            <version>1.10.0-cdh6.3.2</version>
        </dependency>
    </dependencies>

2 资源配置文件

在cdh webui hbase或yarn中下载客户端配置,并把配置文件上传到工程资源目录下

file

3 kerberos.properties 配置文件

将集群的 krb5.conf 和 impala.keytab 文件下载到本地并放到 src/main/conf 目录下

file

在资源目录下创建 kerberos.properties 文件,内部配置内容如下,目的是通过读取properties文件获取kerberos 认证信息

krb5.conf.path=E:/workspaces/scala/class33/hainiuhbase/src/main/conf/krb5.conf
kerberos.user=impala@HAINIU.COM
kerberos.keytab.path=E:/workspaces/scala/class33/hainiuhbase/src/main/conf/impala.keytab

4 kerberos 认证代码

    /**
     * Performs Kerberos authentication for this JVM using a keytab.
     *
     * <p>Reads {@code krb5.conf.path}, {@code kerberos.user} and
     * {@code kerberos.keytab.path} from {@code /kerberos.properties} on the
     * classpath, points the JVM at the krb5.conf file, switches Hadoop security
     * to Kerberos and logs in from the keytab. Failures are reported on stderr
     * and the method returns without throwing.
     *
     * @param flag whether to print the Kerberos authentication debug output
     */
    private static void kerberosAuth(boolean flag) {
        Properties prop = new Properties();
        // try-with-resources so the classpath stream is always closed.
        try (InputStream in = KuduAPI.class.getResourceAsStream("/kerberos.properties")) {
            if (in == null) {
                System.err.println("kerberos.properties not found on the classpath");
                return;
            }
            prop.load(in);

            String krb5Path = prop.getProperty("krb5.conf.path");
            String principal = prop.getProperty("kerberos.user");
            String keytabPath = prop.getProperty("kerberos.keytab.path");
            if (krb5Path == null || principal == null || keytabPath == null) {
                System.err.println("kerberos.properties must define krb5.conf.path, "
                        + "kerberos.user and kerberos.keytab.path");
                return;
            }

            // Tell the JVM where krb5.conf lives.
            System.setProperty("java.security.krb5.conf", krb5Path);
            // Optionally print the Kerberos authentication debug output.
            if (flag) {
                System.setProperty("sun.security.krb5.debug", "true");
            }

            // Hadoop configuration with Kerberos authentication enabled.
            Configuration configuration = new Configuration();
            configuration.set("hadoop.security.authentication", "Kerberos");
            UserGroupInformation.setConfiguration(configuration);

            // Log in from the keytab FIRST (arg 1: principal, arg 2: keytab file).
            // Asking for the login user before this point would attempt a
            // ticket-cache login, which fails when no ticket cache exists.
            UserGroupInformation.loginUserFromKeytab(principal, keytabPath);

            UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
            // Re-login from the keytab if the TGT has expired or is about to.
            loginUser.checkTGTAndReloginFromKeytab();
            System.out.println("loginUser:" + loginUser);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

5 kudu api操作

5.1 获取 KuduClient 对象

    /**
     * Builds a Kudu client as the currently logged-in (Kerberos) user.
     *
     * <p>The master address list is taken from the {@code kuduMasters} system
     * property and defaults to {@code worker-1:7051}.
     *
     * @return the Kudu client, or {@code null} if it could not be created
     */
    private static KuduClient getKuduClient() {
        // Kudu master address list.
        final String kuduMasters = System.getProperty("kuduMasters", "worker-1:7051");
        KuduClient kuduClient = null;
        try {
            // PrivilegedExceptionAction (not PrivilegedAction) is required here:
            // only its run() is allowed to declare a checked exception.
            kuduClient = UserGroupInformation.getLoginUser().doAs(
                    new PrivilegedExceptionAction<KuduClient>() {
                        @Override
                        public KuduClient run() throws Exception {
                            return new KuduClient.KuduClientBuilder(kuduMasters).build();
                        }
                    });
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return kuduClient;
    }

5.2 创建表

    /**
     * Creates a Kudu table with the columns id, name and age.
     * id is the primary key and the table is hash-partitioned on id.
     * Nothing is created when a table with that name already exists.
     *
     * @param kuduClient client used to talk to the Kudu cluster
     * @param tableName  name of the table to create
     */
    private static void createTable(KuduClient kuduClient, String tableName) {
        // Column definitions, all SNAPPY-compressed; id is the primary key.
        List<ColumnSchema> cols = new ArrayList<ColumnSchema>();
        cols.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT64)
                .key(true)
                .compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY)
                .build());
        cols.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING)
                .compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY)
                .build());
        cols.add(new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32)
                .compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY)
                .build());

        // Hash partitioning on id.
        List<String> partitionCols = new ArrayList<String>();
        partitionCols.add("id");

        // Table options: one replica, three hash buckets.
        final CreateTableOptions opts = new CreateTableOptions()
                .setNumReplicas(1)
                .addHashPartitions(partitionCols, 3);

        Schema tableSchema = new Schema(cols);
        try {
            if (!kuduClient.tableExists(tableName)) {
                // Create the table.
                kuduClient.createTable(tableName, tableSchema, opts);
                System.out.println(tableName + ": create finished");
            } else {
                System.out.println(tableName + " is exists!");
            }
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }

执行创建后,在kudu webui 查看:

file

file

5.2.1 按照id hash分区

// Hash-partition on the id column.
List<String> hashKeys = new ArrayList<>();
hashKeys.add("id");

// One replica, three hash partitions on id.
CreateTableOptions createTableOptions = new CreateTableOptions()
        .setNumReplicas(1)
        .addHashPartitions(hashKeys, 3);

5.2.2 按照age range分区


/**
     * Creates a table with columns id, age and name, where (id, age) is the
     * primary key and the table is range-partitioned on age. Nothing is
     * created when the table already exists.
     *
     * <p>Equivalent Impala DDL:
     * <pre>
     * CREATE TABLE xinniu.kudu_api_range_t1
     * (
     *   id bigint,
     *   age int,
     *   name string,
     *   PRIMARY KEY(id, age)
     * )
     * partition by range (age)
     * (
     *     partition 10 <= VALUES < 15,
     *     partition 15 <= VALUES < 25,
     *     partition 25 <= VALUES < 30
     * )
     * STORED AS KUDU
     * TBLPROPERTIES ('kudu.num_tablet_replicas' = '1');
     * </pre>
     *
     * @param kuduClient client used to talk to the Kudu cluster
     */
    private static void createKuduTableWithRange(KuduClient kuduClient) {
        String tableNameRange = "xinniu.kudu_api_range_t1";
        List<ColumnSchema> columns = new ArrayList<ColumnSchema>();

        // id: bigint, primary key, SNAPPY-compressed.
        final ColumnSchema idcs = new ColumnSchema.ColumnSchemaBuilder("id", Type.INT64)
                .key(true)
                .compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY)
                .build();

        // age: int, primary key, SNAPPY-compressed.
        final ColumnSchema agecs = new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32)
                .key(true)
                .compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY)
                .build();

        // name: string, SNAPPY-compressed.
        final ColumnSchema namecs = new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING)
                .compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY)
                .build();

        columns.add(idcs);
        columns.add(agecs);
        columns.add(namecs);

        final Schema schema = new Schema(columns);

        final CreateTableOptions options = new CreateTableOptions();
        // One replica.
        options.setNumReplicas(1);

        // Range partitioning on age.
        List<String> keys = new ArrayList<String>();
        keys.add("age");
        options.setRangePartitionColumns(keys);

        // {lower, upper} bounds of each range partition, kept as pairs so the
        // two bounds of a partition cannot get out of step with each other.
        final int[][] ageRanges = {{10, 15}, {15, 25}, {25, 30}};
        for (int[] range : ageRanges) {
            final PartialRow lowerRow = new PartialRow(schema);
            lowerRow.addInt("age", range[0]);
            final PartialRow upperRow = new PartialRow(schema);
            upperRow.addInt("age", range[1]);
            options.addRangePartition(lowerRow, upperRow);
        }

        try {
            // Same guard as createTable: do not try to re-create an existing table.
            if (kuduClient.tableExists(tableNameRange)) {
                System.out.println(tableNameRange + " is exists!");
                return;
            }
            // arg 1: table name, arg 2: schema (the column set), arg 3: table options.
            kuduClient.createTable(tableNameRange, schema, options);
            System.out.println(tableNameRange + ": create finished");
        } catch (KuduException e) {
            e.printStackTrace();
        }

    }

5.2.3 先按照id hash分区,后按age range分区

 /**
  * Creates table xinniu.kudu_api_range_t2 with columns id, age and name,
  * where (id, age) is the primary key. The table is hash-partitioned on id
  * (5 buckets) and range-partitioned on age with the bounds 10-15, 15-25
  * and 25-30. Nothing is created when the table already exists.
  *
  * @param kuduClient client used to talk to the Kudu cluster
  */
 private static void createKuduTableWithHashAndRange(KuduClient kuduClient) {
        String tableNameRange = "xinniu.kudu_api_range_t2";
        List<ColumnSchema> columns = new ArrayList<ColumnSchema>();

        // id: bigint, primary key, SNAPPY-compressed.
        final ColumnSchema idcs = new ColumnSchema.ColumnSchemaBuilder("id", Type.INT64)
                .key(true)
                .compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY)
                .build();

        // age: int, primary key, SNAPPY-compressed.
        final ColumnSchema agecs = new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32)
                .key(true)
                .compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY)
                .build();

        // name: string, SNAPPY-compressed.
        final ColumnSchema namecs = new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING)
                .compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY)
                .build();

        columns.add(idcs);
        columns.add(agecs);
        columns.add(namecs);

        final Schema schema = new Schema(columns);

        final CreateTableOptions options = new CreateTableOptions();
        // One replica.
        options.setNumReplicas(1);

        // Hash partitioning on id, 5 buckets.
        List<String> hashKeys = new ArrayList<String>();
        hashKeys.add("id");
        options.addHashPartitions(hashKeys, 5);

        // Range partitioning on age.
        List<String> rangeKeys = new ArrayList<String>();
        rangeKeys.add("age");
        options.setRangePartitionColumns(rangeKeys);

        // {lower, upper} bounds of each range partition, kept as pairs so the
        // two bounds of a partition cannot get out of step with each other.
        final int[][] ageRanges = {{10, 15}, {15, 25}, {25, 30}};
        for (int[] range : ageRanges) {
            final PartialRow lowerRow = new PartialRow(schema);
            lowerRow.addInt("age", range[0]);
            final PartialRow upperRow = new PartialRow(schema);
            upperRow.addInt("age", range[1]);
            options.addRangePartition(lowerRow, upperRow);
        }

        try {
            // Same guard as createTable: do not try to re-create an existing table.
            if (kuduClient.tableExists(tableNameRange)) {
                System.out.println(tableNameRange + " is exists!");
                return;
            }
            // arg 1: table name, arg 2: schema (the column set), arg 3: table options.
            kuduClient.createTable(tableNameRange, schema, options);
            System.out.println(tableNameRange + ": create finished");
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }

5.3 添加数据


    /**
     * Upserts two sample rows (id=1/user1/11 and id=2/user2/12) into the table
     * using a manually flushed session.
     *
     * <p>Exceptions are printed, not rethrown. Per-row errors reported by the
     * flush are printed to stderr.
     *
     * @param kuduClient client used to talk to the Kudu cluster
     * @param tableName  name of the table to write to
     */
    private static void insertData(KuduClient kuduClient, String tableName) {
        KuduSession kuduSession = null;
        try {
            // Open the Kudu table.
            KuduTable kuduTable = kuduClient.openTable(tableName);
            // Open a Kudu session.
            kuduSession = kuduClient.newSession();
            // Disable automatic flushing; operations are buffered until flush().
            kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);

            // Build one upsert per row.
            final Upsert upsert1 = kuduTable.newUpsert();
            final PartialRow row1 = upsert1.getRow();
            row1.addLong("id", 1L);
            row1.addString("name", "user1");
            row1.addInt("age", 11);

            final Upsert upsert2 = kuduTable.newUpsert();
            final PartialRow row2 = upsert2.getRow();
            row2.addLong("id", 2L);
            row2.addString("name", "user2");
            row2.addInt("age", 12);

            // Buffer the operations.
            kuduSession.apply(upsert1);
            kuduSession.apply(upsert2);

            // Flush manually and surface per-row failures instead of dropping them.
            final List<OperationResponse> responses = kuduSession.flush();
            for (OperationResponse response : responses) {
                if (response.hasRowError()) {
                    System.err.println("upsert failed: " + response.getRowError());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // The session is null when openTable/newSession failed; guarding here
            // keeps a NullPointerException from masking the original error.
            if (kuduSession != null && !kuduSession.isClosed()) {
                try {
                    kuduSession.close();
                } catch (KuduException e) {
                    e.printStackTrace();
                }
            }
        }
    }

添加完成后,创建impala表指向kudu表,用impala来查询

-- Create the Impala table
create EXTERNAL table xinniu.impala_kuduapi
stored as kudu
TBLPROPERTIES (
  'kudu.table_name' = 'xinniu.kudu_api_t'
);

-- Query through the Impala table
select * from xinniu.impala_kuduapi;

file

Kudu 会话(KuduSession)的三种刷新模式(FlushMode),决定写操作何时发送到服务器:

AUTO_FLUSH_SYNC(默认):
每次调用 apply() 都会在写操作刷新到服务器之后才返回,不会进行批处理。在这种模式下,flush() 不会产生任何效果,因为每次 apply() 调用在返回之前都已经刷新了缓冲区。

AUTO_FLUSH_BACKGROUND:
每次调用 KuduSession.apply() 都会很快返回,写操作会交给后台发送,并可能与同一会话的其他写操作一起批量发送。如果缓冲空间不足,KuduSession.apply() 会阻塞,直到有可用的缓冲空间。由于写操作是在后台执行的,任何错误都会保存在会话本地的缓冲区中。注意:这种模式可能导致数据乱序写入,因为在这种模式下多个写操作可以并发地发送到服务器。

MANUAL_FLUSH:
每次调用 KuduSession.apply() 都会很快返回,但写操作不会被发送,直到用户调用 flush()。如果缓冲区超过了配置的空间限制,KuduSession.apply() 会返回错误。

5.4 查询表

   /**
     * Scans the whole table and prints the id, name and age of every row.
     * Exceptions are printed, not rethrown.
     *
     * @param kuduClient client used to talk to the Kudu cluster
     * @param tableName  name of the table to scan
     */
    private static void scanTable(KuduClient kuduClient, String tableName) {
        KuduScanner scanner = null;
        try {
            // Open the Kudu table.
            final KuduTable kuduTable = kuduClient.openTable(tableName);
            // Build a scanner over the table.
            scanner = kuduClient.newScannerBuilder(kuduTable).build();
            // Walk the result rows.
            for (RowResult rowResult : scanner) {
                long id = rowResult.getLong("id");
                String name = rowResult.getString("name");
                int age = rowResult.getInt("age");
                System.out.println("id:" + id + ", name:" + name + ", age: " + age);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close in finally so the scanner is released even when iteration fails.
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (KuduException e) {
                    e.printStackTrace();
                }
            }
        }
    }

查询结果:

file

5.5 带有条件的查询

    /**
     * Scans the table with predicates and a column projection.
     * Equivalent to:
     * {@code select id, age from kudu_table where id >= 1 and name = 'user1';}
     * Exceptions are printed, not rethrown.
     *
     * @param kuduClient client used to talk to the Kudu cluster
     * @param tableName  name of the table to scan
     */
    private static void scanTableBy(KuduClient kuduClient, String tableName) {
        KuduScanner kuduScanner = null;
        try {
            // Open the Kudu table.
            final KuduTable kuduTable = kuduClient.openTable(tableName);
            final KuduScanner.KuduScannerBuilder builder = kuduClient.newScannerBuilder(kuduTable);

            // Columns returned by the scan.
            List<String> names = new ArrayList<>();
            names.add("id");
            names.add("age");
            builder.setProjectedColumnNames(names);

            // Predicate: id >= 1L
            final ColumnSchema idColumnSchema = kuduTable.getSchema().getColumn("id");
            final KuduPredicate idPredicate = KuduPredicate.newComparisonPredicate(idColumnSchema,
                    KuduPredicate.ComparisonOp.GREATER_EQUAL,
                    1L);
            builder.addPredicate(idPredicate);

            // Predicate: name = "user1"
            final ColumnSchema nameColumnSchema = kuduTable.getSchema().getColumn("name");
            final KuduPredicate namePredicate = KuduPredicate.newComparisonPredicate(nameColumnSchema,
                    KuduPredicate.ComparisonOp.EQUAL,
                    "user1");
            builder.addPredicate(namePredicate);

            kuduScanner = builder.build();
            for (RowResult rowResult : kuduScanner) {
                // name is used only as a filter; it is not in the projection,
                // so only id and age can be read from the row.
                final long id = rowResult.getLong("id");
                final int age = rowResult.getInt("age");
                System.out.println("id:" + id  + ", age: " + age);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close in finally so the scanner is released even when iteration fails.
            if (kuduScanner != null) {
                try {
                    kuduScanner.close();
                } catch (KuduException e) {
                    e.printStackTrace();
                }
            }
        }
    }

查询结果:

file

5.6 删除表

    /**
     * Deletes the given Kudu table if it exists.
     * Exceptions are printed, not rethrown.
     *
     * @param kuduClient client used to talk to the Kudu cluster
     * @param tableName  name of the table to delete
     */
    private static void deleteTable(KuduClient kuduClient, String tableName) {
        try {
            // Same guard style as createTable: report a missing table with a
            // message instead of a stack trace.
            if (!kuduClient.tableExists(tableName)) {
                System.out.println(tableName + " does not exist, nothing to delete");
                return;
            }
            kuduClient.deleteTable(tableName);
            System.out.println(tableName + " :删除完成!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

通过kudu webui 查看,发现列表里已经删除:

file

5.7 全部代码

package com.hainiu;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;

import java.io.IOException;
import java.io.InputStream;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * Sample program for the Kudu Java client on a Kerberos-secured cluster:
 * authenticate from a keytab, then create, write, scan and delete a table.
 */
public class KuduAPI {

    public static void main(String[] args) {
        kerberosAuth(false);

        KuduClient kuduClient = getKuduClient();
        if (kuduClient == null) {
            // getKuduClient() has already printed the cause.
            System.err.println("Failed to create the Kudu client, aborting");
            return;
        }

        String tableName = "xinniu.kudu_api_t";
        try {
            createTable(kuduClient, tableName);
            // Enable the steps below one at a time to walk through the tutorial.
//            insertData(kuduClient, tableName);
//            scanTable(kuduClient, tableName);
//            scanTableBy(kuduClient, tableName);
//            deleteTable(kuduClient, tableName);
        } finally {
            // Release the client's resources before the JVM exits.
            try {
                kuduClient.close();
            } catch (KuduException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Scans the table with predicates and a column projection.
     * Equivalent to:
     * {@code select id, age from kudu_table where id >= 1 and name = 'user1';}
     * Exceptions are printed, not rethrown.
     *
     * @param kuduClient client used to talk to the Kudu cluster
     * @param tableName  name of the table to scan
     */
    private static void scanTableBy(KuduClient kuduClient, String tableName) {
        KuduScanner kuduScanner = null;
        try {
            // Open the Kudu table.
            final KuduTable kuduTable = kuduClient.openTable(tableName);
            final KuduScanner.KuduScannerBuilder builder = kuduClient.newScannerBuilder(kuduTable);

            // Columns returned by the scan.
            List<String> names = new ArrayList<>();
            names.add("id");
            names.add("age");
            builder.setProjectedColumnNames(names);

            // Predicate: id >= 1L
            final ColumnSchema idColumnSchema = kuduTable.getSchema().getColumn("id");
            final KuduPredicate idPredicate = KuduPredicate.newComparisonPredicate(idColumnSchema,
                    KuduPredicate.ComparisonOp.GREATER_EQUAL,
                    1L);
            builder.addPredicate(idPredicate);

            // Predicate: name = "user1"
            final ColumnSchema nameColumnSchema = kuduTable.getSchema().getColumn("name");
            final KuduPredicate namePredicate = KuduPredicate.newComparisonPredicate(nameColumnSchema,
                    KuduPredicate.ComparisonOp.EQUAL,
                    "user1");
            builder.addPredicate(namePredicate);

            kuduScanner = builder.build();
            for (RowResult rowResult : kuduScanner) {
                // name is used only as a filter; it is not in the projection,
                // so only id and age can be read from the row.
                final long id = rowResult.getLong("id");
                final int age = rowResult.getInt("age");
                System.out.println("id:" + id  + ", age: " + age);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close in finally so the scanner is released even when iteration fails.
            if (kuduScanner != null) {
                try {
                    kuduScanner.close();
                } catch (KuduException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Scans the whole table and prints the id, name and age of every row.
     * Exceptions are printed, not rethrown.
     *
     * @param kuduClient client used to talk to the Kudu cluster
     * @param tableName  name of the table to scan
     */
    private static void scanTable(KuduClient kuduClient, String tableName) {
        KuduScanner scanner = null;
        try {
            // Open the Kudu table.
            final KuduTable kuduTable = kuduClient.openTable(tableName);
            // Build a scanner over the table.
            scanner = kuduClient.newScannerBuilder(kuduTable).build();
            // Walk the result rows.
            for (RowResult rowResult : scanner) {
                long id = rowResult.getLong("id");
                String name = rowResult.getString("name");
                int age = rowResult.getInt("age");
                System.out.println("id:" + id + ", name:" + name + ", age: " + age);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close in finally so the scanner is released even when iteration fails.
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (KuduException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Builds a Kudu client as the currently logged-in (Kerberos) user.
     *
     * <p>The master address list is taken from the {@code kuduMasters} system
     * property and defaults to {@code worker-1:7051}.
     *
     * @return the Kudu client, or {@code null} if it could not be created
     */
    private static KuduClient getKuduClient() {
        // Kudu master address list.
        final String kuduMasters = System.getProperty("kuduMasters", "worker-1:7051");
        KuduClient kuduClient = null;
        try {
            // PrivilegedExceptionAction (not PrivilegedAction) is required here:
            // only its run() is allowed to declare a checked exception.
            kuduClient = UserGroupInformation.getLoginUser().doAs(
                    new PrivilegedExceptionAction<KuduClient>() {
                        @Override
                        public KuduClient run() throws Exception {
                            return new KuduClient.KuduClientBuilder(kuduMasters).build();
                        }
                    });
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return kuduClient;
    }

    /**
     * Deletes the given Kudu table if it exists.
     * Exceptions are printed, not rethrown.
     *
     * @param kuduClient client used to talk to the Kudu cluster
     * @param tableName  name of the table to delete
     */
    private static void deleteTable(KuduClient kuduClient, String tableName) {
        try {
            // Same guard style as createTable: report a missing table with a
            // message instead of a stack trace.
            if (!kuduClient.tableExists(tableName)) {
                System.out.println(tableName + " does not exist, nothing to delete");
                return;
            }
            kuduClient.deleteTable(tableName);
            System.out.println(tableName + " :删除完成!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Upserts two sample rows (id=1/user1/11 and id=2/user2/12) into the table
     * using a manually flushed session.
     *
     * <p>Exceptions are printed, not rethrown. Per-row errors reported by the
     * flush are printed to stderr.
     *
     * @param kuduClient client used to talk to the Kudu cluster
     * @param tableName  name of the table to write to
     */
    private static void insertData(KuduClient kuduClient, String tableName) {
        KuduSession kuduSession = null;
        try {
            // Open the Kudu table.
            KuduTable kuduTable = kuduClient.openTable(tableName);
            // Open a Kudu session.
            kuduSession = kuduClient.newSession();
            // Disable automatic flushing; operations are buffered until flush().
            kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);

            // Build one upsert per row.
            final Upsert upsert1 = kuduTable.newUpsert();
            final PartialRow row1 = upsert1.getRow();
            row1.addLong("id", 1L);
            row1.addString("name", "user1");
            row1.addInt("age", 11);

            final Upsert upsert2 = kuduTable.newUpsert();
            final PartialRow row2 = upsert2.getRow();
            row2.addLong("id", 2L);
            row2.addString("name", "user2");
            row2.addInt("age", 12);

            // Buffer the operations.
            kuduSession.apply(upsert1);
            kuduSession.apply(upsert2);

            // Flush manually and surface per-row failures instead of dropping them.
            final List<OperationResponse> responses = kuduSession.flush();
            for (OperationResponse response : responses) {
                if (response.hasRowError()) {
                    System.err.println("upsert failed: " + response.getRowError());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // The session is null when openTable/newSession failed; guarding here
            // keeps a NullPointerException from masking the original error.
            if (kuduSession != null && !kuduSession.isClosed()) {
                try {
                    kuduSession.close();
                } catch (KuduException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Creates a Kudu table with the columns id, name and age.
     * id is the primary key and the table is hash-partitioned on id.
     * Nothing is created when a table with that name already exists.
     *
     * @param kuduClient client used to talk to the Kudu cluster
     * @param tableName  name of the table to create
     */
    private static void createTable(KuduClient kuduClient, String tableName) {
        // Column definitions.
        List<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();

        // id: primary key, SNAPPY-compressed.
        final ColumnSchema idColumnSchema = new ColumnSchema
                .ColumnSchemaBuilder("id", Type.INT64)
                .key(true)
                .compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY)
                .build();
        // name: SNAPPY-compressed.
        final ColumnSchema nameColumnSchema = new ColumnSchema
                .ColumnSchemaBuilder("name", Type.STRING)
                .compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY)
                .build();

        // age: SNAPPY-compressed.
        final ColumnSchema ageColumnSchema = new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32)
                .compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY)
                .build();

        columnSchemas.add(idColumnSchema);
        columnSchemas.add(nameColumnSchema);
        columnSchemas.add(ageColumnSchema);

        // Hash partitioning on id.
        List<String> hashKeys = new ArrayList<String>();
        hashKeys.add("id");

        final CreateTableOptions tableOptions = new CreateTableOptions();

        // One replica.
        tableOptions.setNumReplicas(1);
        // Hash-partition on id into three buckets.
        tableOptions.addHashPartitions(hashKeys, 3);

        Schema schema = new Schema(columnSchemas);
        try {

            if (kuduClient.tableExists(tableName)) {
                System.out.println(tableName + " is exists!");
                return;
            }

            // Create the table.
            kuduClient.createTable(tableName, schema, tableOptions);
            System.out.println(tableName + ": create finished");
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }

    /**
     * Performs Kerberos authentication for this JVM using a keytab.
     *
     * <p>Reads {@code krb5.conf.path}, {@code kerberos.user} and
     * {@code kerberos.keytab.path} from {@code /kerberos.properties} on the
     * classpath, points the JVM at the krb5.conf file, switches Hadoop security
     * to Kerberos and logs in from the keytab. Failures are reported on stderr
     * and the method returns without throwing.
     *
     * @param flag whether to print the Kerberos authentication debug output
     */
    private static void kerberosAuth(boolean flag) {
        Properties prop = new Properties();
        // try-with-resources so the classpath stream is always closed.
        try (InputStream in = KuduAPI.class.getResourceAsStream("/kerberos.properties")) {
            if (in == null) {
                System.err.println("kerberos.properties not found on the classpath");
                return;
            }
            prop.load(in);

            String krb5Path = prop.getProperty("krb5.conf.path");
            String principal = prop.getProperty("kerberos.user");
            String keytabPath = prop.getProperty("kerberos.keytab.path");
            if (krb5Path == null || principal == null || keytabPath == null) {
                System.err.println("kerberos.properties must define krb5.conf.path, "
                        + "kerberos.user and kerberos.keytab.path");
                return;
            }

            // Tell the JVM where krb5.conf lives.
            System.setProperty("java.security.krb5.conf", krb5Path);
            // Optionally print the Kerberos authentication debug output.
            if (flag) {
                System.setProperty("sun.security.krb5.debug", "true");
            }

            // Hadoop configuration with Kerberos authentication enabled.
            Configuration configuration = new Configuration();
            configuration.set("hadoop.security.authentication", "Kerberos");
            UserGroupInformation.setConfiguration(configuration);

            // Log in from the keytab FIRST (arg 1: principal, arg 2: keytab file).
            // Asking for the login user before this point would attempt a
            // ticket-cache login, which fails when no ticket cache exists.
            UserGroupInformation.loginUserFromKeytab(principal, keytabPath);

            UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
            // Re-login from the keytab if the TGT has expired or is about to.
            loginUser.checkTGTAndReloginFromKeytab();
            System.out.println("loginUser:" + loginUser);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

6 impala关联kudu表(非impala创建)

-- To map Impala onto an existing Kudu table that was not created through Impala,
-- the kudu.table_name property must be specified
create EXTERNAL table xinniu.impala_kuduapi
stored as kudu
TBLPROPERTIES (
  'kudu.table_name' = 'xinniu.kudu_api_t'
);

file

插入数据后,可查询结果

file

删除impala表,kudu表还存在

file

版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海汼部落-阿布都的都,http://hainiubl.com/topics/76078
成为第一个点赞的人吧 :bowtie:
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter