phoenix 入门

随着数据仓库逐渐迁移到大数据集群,慢慢开始学习新的技术。本系列主要来源于对官网的一些翻译,以及补充自己遇到的一直知识点

phoenix 入门概述

Phoenix是一个开源的HBASE SQL层。它不仅可以使用标准的JDBC API替代HBASE client API创建表,插入和查询HBASE,也支持二级索引,事物以及多种SQL层优化。

访问phoenix的方式如下:

  • 通过jdbc连接到hbase集群方式如下

    Connection conn = DriverManager.getConnection(“jdbc:phoenix:server1,server2:3333”,props);

  • 使用Python编写的命令行工具(sqlline, sqlline-thin和psql等)

    下载phoenix jar包解压到适合的目录下,进入bin目录,执行如下命令
    ./sqlline.py zk1,zk2,zk3:2181

  • SQuirrel

    提供客户端界面,配置步骤自行百度

建表测试

CREATE TABLE my_schema.my_table (id BIGINT not null primary key, date Date)

CREATE TABLE stats.prod_metrics ( host char(50) not null, created_date date not null,
txn_count bigint CONSTRAINT pk PRIMARY KEY (host, created_date) )

看到建表语句,也许你会疑惑。my_schema是什么鬼!这边解释一下,schema在phoenix中有点像数据仓库中的db一样,schema对应的是hbase的namespace。

在phoenix表中必须指定PRIMARY KEY,它可以是一个字段或者多个字段组成。

增删改查测试

SELECT (与常用sql基本一直)

从一张或者多张表查询。
UNION ALL 组合行从多个select语句
ORDER BY 根据给定的表达式进行排序
LIMIT (FETCH FIRST) 限制返回的行数 OFFSET 查询结果中跳过n行返回,常与limit配合使用。

Example:

SELECT * FROM TEST LIMIT 1000;
SELECT * FROM TEST LIMIT 1000 OFFSET 100;
SELECT full_name FROM SALES_PERSON WHERE ranking >= 5.0
    UNION ALL SELECT reviewer_name FROM CUSTOMER_REVIEW WHERE score >= 8.0

upsert values

此处upsert语义有异于标准SQL中的Insert,当写入值不存在时,表示写入数据,否则更新数据。

Example:

UPSERT INTO TEST VALUES('foo','bar',3);
UPSERT INTO TEST(NAME,ID) VALUES('foo',123);
UPSERT INTO TEST(ID, COUNTER) VALUES(123, 0) ON DUPLICATE KEY UPDATE COUNTER = COUNTER + 1;
UPSERT INTO TEST(ID, MY_COL) VALUES(123, 0) ON DUPLICATE KEY IGNORE;

upsert select

从另外一张表中读取数据写入到目标表中,如果数据存在则更新,否则插入数据。插入目标表的值顺序和查询表指定查询字段一致。当auto commit被打开并且select子句没有聚合时,写入目标表这个过程是在server端完成的,否则查询的数据会先缓存在客户端再写入目标表中(phoenix.mutate.upsertBatchSize表示从客户端一次commit的行数,默认10000行)。

UPSERT INTO test.targetTable(col1, col2) SELECT col3, col4 FROM test.sourceTable WHERE col5 < 100
UPSERT INTO foo SELECT * FROM bar;

delete

删除where条件选中的行

Example:
DELETE FROM TEST;
DELETE FROM TEST WHERE ID=123;
DELETE FROM TEST WHERE NAME LIKE 'foo%';

访问方式测试示例

jdbc api

通过meavn创建工程,添加依赖包:

<dependencies>
    <dependency>
        <groupId>com.aliyun.phoenix</groupId>
        <artifactId>ali-phoenix-core</artifactId>
        <version>${version}</version>
    </dependency>
</dependencies>

创建测试文件:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.PreparedStatement;
import java.sql.Statement;

public class test {

    public static void main(String[] args) throws SQLException {
        Statement stmt = null;
        ResultSet rset = null;
        
        Connection con = DriverManager.getConnection("jdbc:phoenix:[zookeeper]");
        stmt = con.createStatement();
        
        stmt.executeUpdate("create table test (mykey integer not null primary key, mycolumn varchar)");
        stmt.executeUpdate("upsert into test values (1,'Hello')");
        stmt.executeUpdate("upsert into test values (2,'World!')");
        con.commit();
        
        PreparedStatement statement = con.prepareStatement("select * from test");
        rset = statement.executeQuery();
        while (rset.next()) {
            System.out.println(rset.getString("mycolumn"));
        }
        statement.close();
        con.close();
    }
}

python QueryServer

import phoenixdb
import phoenixdb.cursor

# 使用python需在环境中安装phoenix-python驱动,参见https://python-phoenixdb.readthedocs.io/en/latest/
if __name__ == "__main__":
    # url修改为运行环境的QueryServer连接地址
    database_url = 'http://localhost:8765'
    # python客户端仅支持自动提交方式
    conn = phoenixdb.connect(database_url, autocommit=True)

    cursor = conn.cursor()
    cursor.execute("DROP TABLE IF EXISTS users")
    # 创建测试表
    cursor.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username VARCHAR)")
    # 创建索引
    cursor.execute("CREATE INDEX users_index ON users (username)")

    # 插入一条数据
    cursor.execute("UPSERT INTO users VALUES (?, ?)", (11, 'admin'))
    # 批量插入数据
    param = ((1, "tom"), (2, "alice"), (3, "bob"))
    cursor.executemany("UPSERT INTO users VALUES (?, ?)", param)

    # 查询数据
    cursor.execute("SELECT * FROM users")
    print cursor.fetchall()
    cursor = conn.cursor(cursor_factory=phoenixdb.cursor.DictCursor)
    # query data use index
    cursor.execute("SELECT * FROM users WHERE username='admin'")
    print cursor.fetchone()['ID']

相关资源连接

phoenix 官网
git phoenix访问示例

Life is more than the present.