目录结构
pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.hainiu</groupId>
<artifactId>kuduAPI</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-core</artifactId>
<version>5.0.0-HBase-2.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
配置文件
- kerberos configuration
krb5.conf.path=/Users/fujie/hainiu/krb5.conf
kerberos.user=impala@HAINIU.COM
kerberos.keytab.path=/Users/fujie/hainiu/impala.keytab
工具类
package utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import java.util.Properties;
/**
* @author : xiniu
* @date : 2021/6/3
* @describe :kerberos init
*/
public class KerberosInit {
public static void kbAuth(Boolean flag){
try {
Properties properties = new Properties();
properties.load(KerberosInit.class.getClass().getResourceAsStream("/kerberos.properties"));
System.setProperty("java.security.krb5.conf", properties.getProperty("krb5.conf.path"));
System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
if (flag) System.setProperty("sun.security.krb5.debug", "true");
Configuration configuration = new Configuration();
UserGroupInformation.setConfiguration(configuration);
UserGroupInformation.loginUserFromKeytab(properties.getProperty("kerberos.user"), properties.getProperty("kerberos.keytab.path"));
} catch(Exception e) {
e.printStackTrace();
}
}
}
api操作
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.apache.log4j.Logger;
import utils.KerberosInit;
import java.security.PrivilegedExceptionAction;
import java.util.*;
/**
* @author : xiniu
* @date : 2021/6/3
* @describe :使用api操作kerberos安全认证环境下kudu
*/
public class kuduAPI {
static Logger logger = Logger.getLogger(kuduAPI.class);
public static void main(String[] args) {
//kerberos 初始化
KerberosInit.kbAuth(false);
//设置kudu master server地址
String kuduMasters = System.getProperty("kuduMasters", "worker-2:7051");
try {
try {
//使用kerberos认证获得kudu客户端
KuduClient kuduClient = UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<KuduClient>() {
@Override
public KuduClient run() throws Exception {
KuduClient kuduClient = new KuduClient.KuduClientBuilder(kuduMasters).build();
return kuduClient;
}
});
//操作表表名
String tableName = "xiniu.testbyapi";
// String tableName = "impala::xiniu.my_first_table";
//创建表
// createTable(kuduClient,tableName);
//删除表
// deleteTable(kuduClient,tableName);
//插入数据
HashMap<Integer, String> data = new HashMap<>();
data.put(1,"user1");
data.put(2,"user2");
// upsertData(kuduClient,tableName,data);
//查询数据
// scanTable(kuduClient,tableName);
//条件查询
scanTableByColumn(kuduClient,tableName);
//show tables
// showTables(kuduClient);
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 查询所有表
* @param kuduClient 客户端
*/
private static void showTables(KuduClient kuduClient) {
try {
ListTablesResponse tablesList = kuduClient.getTablesList();
List<String> tables = tablesList.getTablesList();
for (String table : tables) {
logger.info("table name : " + table);
}
} catch (KuduException e) {
logger.error(e);
}
}
/**
* 条件查询
* @param kuduClient 客户端
* @param tableName 表名
*/
private static void scanTableByColumn(KuduClient kuduClient, String tableName) {
try {
//开表
KuduTable kuduTable = kuduClient.openTable(tableName);
//表的列名<所有的列都要添加进来>
ArrayList<String> projectColumns = new ArrayList<>();
projectColumns.add("id");
projectColumns.add("name");
KuduScanner.KuduScannerBuilder scanner = kuduClient.newScannerBuilder(kuduTable).setProjectedColumnNames(projectColumns);
KuduPredicate id = KuduPredicate.newComparisonPredicate(kuduTable.getSchema().getColumn("id"), KuduPredicate.ComparisonOp.EQUAL, 1L);
scanner.addPredicate(id);
KuduScanner build = scanner.build();
//遍历结果
while (build.hasMoreRows()){
RowResultIterator rowResults = build.nextRows();
while (rowResults.hasNext()){
RowResult next = rowResults.next();
long idResult = next.getLong("id");
String nameResult = next.getString("name");
logger.info("query result : <id:" + idResult + ",name:" + nameResult + ">");
}
}
build.close();
} catch (KuduException e) {
logger.error(e);
}
}
/**
* 扫描表所有数据
* @param kuduClient 客户端
* @param tableName 表名
*/
private static void scanTable(KuduClient kuduClient, String tableName) {
try {
//开表
KuduTable kuduTable = kuduClient.openTable(tableName);
//扫描
KuduScanner scanner = kuduClient.newScannerBuilder(kuduTable).build();
//遍历结果
while (scanner.hasMoreRows()){
RowResultIterator rowResults = scanner.nextRows();
while (rowResults.hasNext()){
RowResult next = rowResults.next();
long id = next.getLong("id");
String name = next.getString("name");
logger.info("query result : <id:" + id + ",name:" + name + ">");
}
}
scanner.close();
} catch (KuduException e) {
logger.error(e);
}
}
/**
* upsert插入或者更新数据
* @param kuduClient 客户端
* @param tableName 表名
* @param data 插入的数据
*/
private static void upsertData(KuduClient kuduClient, String tableName, HashMap<Integer, String> data) {
KuduTable kuduTable = null;
KuduSession kuduSession = null;
Set<Map.Entry<Integer, String>> entries = data.entrySet();
for (Map.Entry<Integer, String> entry : entries) {
long id = entry.getKey().longValue();
String name = entry.getValue();
try {
//打开表
kuduTable = kuduClient.openTable(tableName);
//开会话窗口,用于事务管理,默认是自动提交的
kuduSession = kuduClient.newSession();
//关掉自动提交
kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
//创建upsert操作对象
Upsert upsert = kuduTable.newUpsert();
//创建rowset对象
PartialRow row = upsert.getRow();
row.addLong("id",id);
row.addString("name",name);
//提交事务
kuduSession.apply(upsert);
logger.info("upsert data successful:<id:" + id + ",name:" + name + ">");
} catch (Exception e) {
logger.error(e);
}
}
try {
kuduSession.flush();
kuduClient.close();
} catch (KuduException e) {
logger.error(e);
}
}
/**
* 删除表
* @param kuduClient 客户端
* @param tableName 表名
*/
private static void deleteTable(KuduClient kuduClient, String tableName) {
try {
kuduClient.deleteTable(tableName);
logger.info(tableName + " delete successful!");
} catch (KuduException e) {
logger.error(e);
}
}
/**
* 创建表
* @param kuduClient 客户端
* @param tableName 表名
*/
private static void createTable(KuduClient kuduClient, String tableName) {
//创建列
ArrayList<ColumnSchema> columns = new ArrayList<>();
ColumnSchema id = new ColumnSchema.ColumnSchemaBuilder("id", Type.INT64).key(true)
.compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build();
ColumnSchema name = new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).comment("name")
.compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build();
columns.add(id);
columns.add(name);
//创建schema
Schema schema = new Schema(columns);
//hashkeys
ArrayList<String> hashkeys = new ArrayList<>();
hashkeys.add("id");
//创建表操作对象
CreateTableOptions createTableOptions = new CreateTableOptions();
//添加hash分区信息
createTableOptions.addHashPartitions(hashkeys, 3);
//使用client创建表
try {
if (kuduClient.tableExists(tableName)){
logger.info(tableName + " is exists!");
return;
}
kuduClient.createTable(tableName, schema, createTableOptions);
logger.info("create table " + tableName + "successful");
} catch (KuduException e) {
logger.error(e);
}
}
}