Querying MySQL with Apache Calcite


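Background

Why use Apache Calcite to query MySQL at all? Accessing MySQL directly over JDBC is far more convenient.

Fair enough. This post is really an experiment in using Apache Calcite to access a JDBC data source, with MySQL standing in as a typical JDBC data source.

Approach

The Calcite website provides an example of accessing MySQL, and in my testing it proved very easy to use.

POC

Preparing the MySQL environment

Find a MySQL instance and prepare the test data: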

CREATE DATABASE IF NOT EXISTS `hr` DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;
USE `hr`;

DROP TABLE IF EXISTS `emps`;
DROP TABLE IF EXISTS `depts`;

CREATE TABLE `depts` (
  `deptno` int(11) NOT NULL,
  `name` varchar(128) DEFAULT NULL,
  PRIMARY KEY (`deptno`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


CREATE TABLE `emps` (
  `empno` int(11) NOT NULL DEFAULT '0',
  `name` varchar(128) DEFAULT NULL,
  `deptno` int(11) DEFAULT '0',
  `gender` varchar(128) DEFAULT NULL,
  `city` varchar(128) DEFAULT NULL,
  `empid` varchar(128) NOT NULL DEFAULT '0',
  `age` int(11) DEFAULT '0',
  `slacker` int(1) DEFAULT '0',
  `manager` int(1) DEFAULT '0',
  `joinedat` datetime DEFAULT NULL,
  PRIMARY KEY (`empno`,`empid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


INSERT INTO `depts` (`deptno`, `name`) VALUES
	(10,'Sales'),
	(20,'Marketing'),
	(30,'Accounts');

INSERT INTO `emps` (`empno`, `name`, `deptno`, `gender`, `city`, `empid`, `age`, `slacker`, `manager`, `joinedat`) VALUES
	(100,'Fred',10,NULL,NULL,'30',25,1,1,'1996-08-03 00:00:00'),
	(110,'John',40,'M','Vancouver','2',0,0,1,'2002-05-03 00:00:00'),
	(110,'Eric',20,'M','San Francisco','3',80,0,0,'2001-01-01 00:00:00'),
	(120,'Wilma',20,'F',NULL,'1',5,0,1,'2005-09-07 00:00:00'),
	(130,'Alice',40,'F','Vancouver','2',0,0,1,'2007-01-01 00:00:00');

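MySQL dependency (the code below also needs calcite-core, plus commons-dbcp 1.x for org.apache.commons.dbcp.BasicDataSource, on the classpath):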

    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.46</version>
    </dependency>

The code

package com.abeffect.calcite.mysql;

import java.io.PrintStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.commons.dbcp.BasicDataSource;

public class MainTestSchema {

	public static void main(String[] args) throws ClassNotFoundException, SQLException {

		// Register the Calcite JDBC driver; lex=JAVA keeps identifier case and matches identifiers case-sensitively.
		Class.forName("org.apache.calcite.jdbc.Driver");

		Properties info = new Properties();
		info.setProperty("lex", "JAVA");
		try (Connection connection = DriverManager.getConnection("jdbc:calcite:", info)) {
			CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
			SchemaPlus rootSchema = calciteConnection.getRootSchema();

			// Alternative: an in-memory schema backed by Java objects (HrSchema is a user-defined class;
			// requires import org.apache.calcite.adapter.java.ReflectiveSchema):
			// rootSchema.add("hr", new ReflectiveSchema(new HrSchema()));

			// MySQL-backed schema: JdbcSchema.create(parentSchema, name, dataSource, catalog, schema)
			Class.forName("com.mysql.jdbc.Driver");
			BasicDataSource dataSource = new BasicDataSource();
			dataSource.setUrl("jdbc:mysql://localhost/hr");
			dataSource.setUsername("flowaters");
			dataSource.setPassword("abeffect");
			Schema schema = JdbcSchema.create(rootSchema, "hr", dataSource, null, "hr");
			rootSchema.add("hr", schema);

			// Statement and ResultSet are closed automatically, then the connection.
			try (Statement statement = calciteConnection.createStatement();
					ResultSet resultSet = statement.executeQuery(
							"select d.deptno, min(e.empid) from hr.emps as e join hr.depts as d on e.deptno = d.deptno "
									+ "group by d.deptno having count(*) > 1")) {
				output(resultSet, System.out);
			}
		}
	}

	private static void output(ResultSet resultSet, PrintStream out) throws SQLException {
		final ResultSetMetaData metaData = resultSet.getMetaData();
		final int columnCount = metaData.getColumnCount();
		// Print each row as comma-separated column values (JDBC columns are 1-based).
		while (resultSet.next()) {
			StringBuilder row = new StringBuilder();
			for (int i = 1; i <= columnCount; i++) {
				if (i > 1) {
					row.append(", ");
				}
				row.append(resultSet.getString(i));
			}
			out.println(row);
		}
	}
}

Running the code prints:

20, 1
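The output 20, 1 can be checked by hand. Below is a minimal, stdlib-only Java sketch (ExpectedResult is a hypothetical class name) that re-evaluates the same join, GROUP BY and HAVING over the rows inserted above, keeping only the deptno and empid columns the query uses. It is a cross-check of the result, not how Calcite executes the query.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Re-evaluates, in memory:
//   select d.deptno, min(e.empid) from hr.emps as e join hr.depts as d
//   on e.deptno = d.deptno group by d.deptno having count(*) > 1
class ExpectedResult {
    // deptno and empid of each emps row, in INSERT order; empid is varchar in the schema
    static final int[] EMP_DEPTNO = {10, 40, 20, 20, 40};
    static final String[] EMP_EMPID = {"30", "2", "3", "1", "2"};
    // deptno values present in depts
    static final Set<Integer> DEPTNOS = Set.of(10, 20, 30);

    static List<String> compute() {
        // Inner join + group by: employees in dept 40 have no matching depts row and drop out.
        Map<Integer, List<String>> empidsByDept = new TreeMap<>();
        for (int i = 0; i < EMP_DEPTNO.length; i++) {
            if (DEPTNOS.contains(EMP_DEPTNO[i])) {
                empidsByDept.computeIfAbsent(EMP_DEPTNO[i], k -> new ArrayList<>()).add(EMP_EMPID[i]);
            }
        }
        List<String> rows = new ArrayList<>();
        for (Map.Entry<Integer, List<String>> group : empidsByDept.entrySet()) {
            List<String> empids = group.getValue();
            if (empids.size() > 1) { // having count(*) > 1
                // String min is lexicographic, like MIN on a varchar column for these digit-only values.
                String minEmpid = empids.stream().min(String::compareTo).get();
                rows.add(group.getKey() + ", " + minEmpid);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        compute().forEach(System.out::println); // prints: 20, 1
    }
}
```

Dept 10 has a single matching employee, so HAVING filters it out; dept 20 has empids "3" and "1", whose minimum is "1".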

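Verifying the push-down

Here we verify that Calcite really does push the query down, i.e. that the join and the group by are executed inside MySQL.

The idea is to enable the MySQL slow query log and inspect what it records.

Enabling the MySQL slow query log

Check whether the slow query log is enabled and where its file is: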

mysql> show variables like '%query%';

Result (only the relevant rows shown):

+----------------------------------------+-----------------------------------+
| Variable_name                          | Value                             |
+----------------------------------------+-----------------------------------+
| long_query_time                        | 10.000000                         |
| slow_query_log                         | OFF                               |
| slow_query_log_file                    | /usr/local/var/mysql/fox-slow.log |
+----------------------------------------+-----------------------------------+

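Turn on the slow query log, lower long_query_time so that even very fast queries are logged, and write the log to a file: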

mysql> set global slow_query_log = ON;
Query OK, 0 rows affected (0.00 sec)

mysql> SET GLOBAL long_query_time = 0.0001;
Query OK, 0 rows affected (0.00 sec)

mysql> set global log_output='FILE';
Query OK, 0 rows affected (0.00 sec)

To write the log to the mysql.slow_log table instead, set:

mysql> set global log_output='TABLE';
Query OK, 0 rows affected (0.00 sec)

However, the sql_text column of mysql.slow_log is of type mediumblob, so it cannot be read directly as text. I chose to log to a file instead.
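If the TABLE output is used anyway, the mediumblob only holds raw bytes, so a JDBC client can decode it itself. A hedged sketch with plain java.sql: SlowLogTableReader and its methods are hypothetical names, and decoding as UTF-8 is an assumption about how the statements were sent.

```java
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Sketch: read the newest rows of mysql.slow_log and decode the mediumblob
// sql_text column on the client side. Requires log_output='TABLE'.
class SlowLogTableReader {
    static final String QUERY =
        "SELECT start_time, sql_text FROM mysql.slow_log ORDER BY start_time DESC LIMIT ?";

    // Decodes raw sql_text bytes, assuming they are UTF-8.
    static String decodeSqlText(byte[] raw) {
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }

    // Returns up to 'limit' logged statements, newest first.
    static List<String> latestStatements(Connection conn, int limit) throws SQLException {
        List<String> out = new ArrayList<>();
        try (PreparedStatement ps = conn.prepareStatement(QUERY)) {
            ps.setInt(1, limit);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    out.add(rs.getTimestamp("start_time") + "  " + decodeSqlText(rs.getBytes("sql_text")));
                }
            }
        }
        return out;
    }
}
```

Usage: pass a Connection obtained from DriverManager.getConnection("jdbc:mysql://localhost/hr", user, password) for an account that can read the mysql schema. Decoding on the server with CONVERT(sql_text USING utf8) in the SELECT is an alternative.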

Inspecting the slow query log: after running the program, it contains the following statements:

SET timestamp=1525481025;
SHOW FULL TABLES FROM `hr` LIKE '%';

SET timestamp=1525481025;
SHOW FULL TABLES FROM `hr` LIKE 'emps';

SET timestamp=1525481025;
SHOW FULL COLUMNS FROM `emps` FROM `hr` LIKE '%';

SET timestamp=1525481025;
SHOW FULL TABLES FROM `hr` LIKE 'depts';

SET timestamp=1525481025;
SHOW FULL COLUMNS FROM `depts` FROM `hr` LIKE '%';

SET timestamp=1525481025;
SELECT `t`.`deptno`, MIN(`t0`.`empid`)
FROM (SELECT `deptno`
FROM `hr`.`depts`) AS `t`
INNER JOIN (SELECT `deptno`, `empid`
FROM `hr`.`emps`) AS `t0` ON `t`.`deptno` = `t0`.`deptno`
GROUP BY `t`.`deptno`
HAVING COUNT(*) > 1;

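This confirms that the push-down happened: the join, GROUP BY and HAVING were all executed by MySQL, and Calcite also narrowed each subquery to the columns it needs (deptno and empid). The SHOW FULL TABLES / SHOW FULL COLUMNS statements before it are the table-metadata lookups Calcite makes through the MySQL driver.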
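The check can also be scripted. A rough, stdlib-only Java sketch (SlowLogScanner is a hypothetical name) that pulls the statements out of a slow log file, relying on each logged statement being preceded by a "SET timestamp=...;" line and ending with ';', and reports whether any statement contains both a JOIN and a GROUP BY. It is a text heuristic, not a SQL parser.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Extracts SQL statements from a MySQL slow query log file.
class SlowLogScanner {
    static List<String> statements(String log) {
        List<String> result = new ArrayList<>();
        StringBuilder current = null; // non-null while inside a statement
        for (String line : log.split("\\R")) {
            String t = line.trim();
            if (t.startsWith("SET timestamp=")) {
                current = new StringBuilder(); // the following lines form one statement
                continue;
            }
            if (current == null || t.isEmpty()) {
                continue; // file header, "# ..." comment lines, "use db;" lines, blanks
            }
            if (current.length() > 0) {
                current.append(' ');
            }
            current.append(t);
            if (t.endsWith(";")) {
                result.add(current.toString());
                current = null;
            }
        }
        return result;
    }

    // True if some logged statement has both a JOIN and a GROUP BY,
    // i.e. MySQL itself evaluated the join and the aggregation.
    static boolean joinAndGroupByPushedDown(String log) {
        for (String s : statements(log)) {
            String u = s.toUpperCase();
            if (u.contains(" JOIN ") && u.contains("GROUP BY")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) throws IOException {
        // args[0]: path to the slow log, e.g. the slow_query_log_file value shown earlier
        String log = new String(Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8);
        statements(log).forEach(System.out::println);
        System.out.println("join + group by pushed down: " + joinAndGroupByPushedDown(log));
    }
}
```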

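Discussion

  1. Calcite can use optimizer rules to push Join and Group By operations down to the data source.
  2. To support a new data source, you write an adapter that tells Calcite which data in the source constitutes a table.
  3. Calcite also lets you write your own optimizer rules, for example to:

1) access a new data source format
2) add new operators (e.g. a better Join operator)
3) optimize how a query is translated into operators
4) apply cost-based optimization (CBO) to produce a better execution plan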
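To make points 1 and 4 concrete, here is a deliberately tiny toy model in Java. It is not Calcite's API (Calcite's real JDBC rules live in org.apache.calcite.adapter.jdbc.JdbcRules and operate on RelNode trees with a much richer cost model). Every name below is hypothetical, and the "cost" is simply the number of rows shipped from the data sources to the engine. The push-down "rule" replaces a subtree with a single remote query when all of its inputs come from one source, and the "cost-based" step keeps whichever plan ships fewer rows.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Toy model only (NOT Calcite's API).
class ToyPushDown {
    interface Rel {
        Set<String> sources(); // data sources this subtree reads from
        long rowsShipped();    // rows transferred to the engine to evaluate it
    }

    // Table scan executed by the engine: every row is fetched from the source.
    static final class Scan implements Rel {
        final String source;
        final long rows;
        Scan(String source, long rows) { this.source = source; this.rows = rows; }
        public Set<String> sources() { return Set.of(source); }
        public long rowsShipped() { return rows; }
    }

    // Join/aggregate evaluated inside the engine: ships whatever its inputs ship.
    static final class LocalOp implements Rel {
        final List<Rel> inputs;
        LocalOp(Rel... inputs) { this.inputs = List.of(inputs); }
        public Set<String> sources() {
            Set<String> all = new HashSet<>();
            for (Rel r : inputs) all.addAll(r.sources());
            return all;
        }
        public long rowsShipped() {
            long total = 0;
            for (Rel r : inputs) total += r.rowsShipped();
            return total;
        }
    }

    // Whole subtree turned into one SQL query run by the source; only its result is shipped.
    static final class Remote implements Rel {
        final String source;
        final long resultRows;
        Remote(String source, long resultRows) { this.source = source; this.resultRows = resultRows; }
        public Set<String> sources() { return Set.of(source); }
        public long rowsShipped() { return resultRows; }
    }

    // The "rule": a subtree reading from exactly one source may become a Remote node.
    static Optional<Rel> pushDown(Rel rel, long estimatedResultRows) {
        Set<String> s = rel.sources();
        if (s.size() != 1) {
            return Optional.empty();
        }
        Rel remote = new Remote(s.iterator().next(), estimatedResultRows);
        return Optional.of(remote);
    }

    // The "cost-based" choice: keep whichever equivalent plan ships fewer rows.
    static Rel cheapest(Rel plan, long estimatedResultRows) {
        return pushDown(plan, estimatedResultRows)
            .filter(p -> p.rowsShipped() < plan.rowsShipped())
            .orElse(plan);
    }
}
```

With the test data, an aggregate over a join of the emps scan (5 rows) and the depts scan (3 rows) ships 8 rows when evaluated locally, while the pushed-down plan ships only the single result row, so cheapest picks the Remote plan. A join across two different sources (say MySQL and a CSV file) cannot be pushed down by this rule and stays local.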

Appendix

Data sources supported directly by Calcite
