随着数据仓库逐渐迁移到大数据集群,慢慢开始学习新的技术。本系列主要来源于对官网的一些翻译,以及补充自己遇到的一直知识点
phoenix 入门概述
Phoenix是一个开源的HBASE SQL层。它不仅可以使用标准的JDBC API替代HBASE client API创建表,插入和查询HBASE,也支持二级索引,事物以及多种SQL层优化。
访问phoenix的方式如下:
- 通过jdbc连接到hbase集群方式如下
Connection conn = DriverManager.getConnection(“jdbc:phoenix:server1,server2:3333”,props);
- 使用Python编写的命令行工具(sqlline, sqlline-thin和psql等)
下载phoenix jar包解压到适合的目录下,进入bin目录,执行如下命令
./sqlline.py zk1,zk2,zk3:2181 - SQuirrel
提供客户端界面,配置步骤自行百度
建表测试
CREATE TABLE my_schema.my_table (id BIGINT not null primary key, date Date)
CREATE TABLE stats.prod_metrics ( host char(50) not null, created_date date not null,
txn_count bigint CONSTRAINT pk PRIMARY KEY (host, created_date) )
看到建表语句,也许你会疑惑。my_schema是什么鬼!这边解释一下,schema在phoenix中有点像数据仓库中的db一样,schema对应的是hbase的namespace。
在phoenix表中必须指定PRIMARY KEY,它可以是一个字段或者多个字段组成。
增删改查测试
SELECT (与常用sql基本一直)
从一张或者多张表查询。
UNION ALL 组合行从多个select语句
ORDER BY 根据给定的表达式进行排序
LIMIT (FETCH FIRST) 限制返回的行数
OFFSET 查询结果中跳过n行返回,常与limit配合使用。
Example:
SELECT * FROM TEST LIMIT 1000;
SELECT * FROM TEST LIMIT 1000 OFFSET 100;
SELECT full_name FROM SALES_PERSON WHERE ranking >= 5.0
UNION ALL SELECT reviewer_name FROM CUSTOMER_REVIEW WHERE score >= 8.0
upsert values
此处upsert语义有异于标准SQL中的Insert,当写入值不存在时,表示写入数据,否则更新数据。
Example:
UPSERT INTO TEST VALUES('foo','bar',3);
UPSERT INTO TEST(NAME,ID) VALUES('foo',123);
UPSERT INTO TEST(ID, COUNTER) VALUES(123, 0) ON DUPLICATE KEY UPDATE COUNTER = COUNTER + 1;
UPSERT INTO TEST(ID, MY_COL) VALUES(123, 0) ON DUPLICATE KEY IGNORE;
upsert select
从另外一张表中读取数据写入到目标表中,如果数据存在则更新,否则插入数据。插入目标表的值顺序和查询表指定查询字段一致。当auto commit被打开并且select子句没有聚合时,写入目标表这个过程是在server端完成的,否则查询的数据会先缓存在客户端再写入目标表中(phoenix.mutate.upsertBatchSize表示从客户端一次commit的行数,默认10000行)。
UPSERT INTO test.targetTable(col1, col2) SELECT col3, col4 FROM test.sourceTable WHERE col5 < 100
UPSERT INTO foo SELECT * FROM bar;
delete
删除where条件选中的行
Example:
DELETE FROM TEST;
DELETE FROM TEST WHERE ID=123;
DELETE FROM TEST WHERE NAME LIKE 'foo%';
访问方式测试示例
jdbc api
通过meavn创建工程,添加依赖包:
<dependencies>
<dependency>
<groupId>com.aliyun.phoenix</groupId>
<artifactId>ali-phoenix-core</artifactId>
<version>${version}</version>
</dependency>
</dependencies>
创建测试文件:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.PreparedStatement;
import java.sql.Statement;
public class test {
public static void main(String[] args) throws SQLException {
Statement stmt = null;
ResultSet rset = null;
Connection con = DriverManager.getConnection("jdbc:phoenix:[zookeeper]");
stmt = con.createStatement();
stmt.executeUpdate("create table test (mykey integer not null primary key, mycolumn varchar)");
stmt.executeUpdate("upsert into test values (1,'Hello')");
stmt.executeUpdate("upsert into test values (2,'World!')");
con.commit();
PreparedStatement statement = con.prepareStatement("select * from test");
rset = statement.executeQuery();
while (rset.next()) {
System.out.println(rset.getString("mycolumn"));
}
statement.close();
con.close();
}
}
python QueryServer
import phoenixdb
import phoenixdb.cursor
# 使用python需在环境中安装phoenix-python驱动,参见https://python-phoenixdb.readthedocs.io/en/latest/
if __name__ == "__main__":
# url修改为运行环境的QueryServer连接地址
database_url = 'http://localhost:8765'
# python客户端仅支持自动提交方式
conn = phoenixdb.connect(database_url, autocommit=True)
cursor = conn.cursor()
cursor.execute("DROP TABLE IF EXISTS users")
# 创建测试表
cursor.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username VARCHAR)")
# 创建索引
cursor.execute("CREATE INDEX users_index ON users (username)")
# 插入一条数据
cursor.execute("UPSERT INTO users VALUES (?, ?)", (11, 'admin'))
# 批量插入数据
param = ((1, "tom"), (2, "alice"), (3, "bob"))
cursor.executemany("UPSERT INTO users VALUES (?, ?)", param)
# 查询数据
cursor.execute("SELECT * FROM users")
print cursor.fetchall()
cursor = conn.cursor(cursor_factory=phoenixdb.cursor.DictCursor)
# query data use index
cursor.execute("SELECT * FROM users WHERE username='admin'")
print cursor.fetchone()['ID']