Apache Calcite初体验

  |   0 评论   |   963 浏览

背景

如果想自己实现一个JDBC接口的话,我们可以从Apache Calcite来开始,而不是从头开始。

Apache Calcite没有实现数据存储,数据处理和元数据存储,把这三部分留给了用户来实现。但是Calcite实现了其它部分,包括SQL解析,SQL算子实现等等。

现在可以直接从下面的实例来看,也可以阅读参考1来进一步了解。

CSV使用示例

通过CSV使用示例,可以了解到:

  • 通过SchemaFactory和Schema接口来用户自定义Schema
  • 通过json文件模型来声明schema
  • 通过json文件模型来声明view
  • 通过Table接口来用户自定义table
  • 决定Table的字段类型
  • Table的简单实现:通过ScannableTable接口来枚举表内容
  • Table的高级实现:通过FilterableTable接口来预测截断表内容
  • Table的高级实现:通过TranslatableTable接口来通过计划规则(planner rules)来改变关系算子

建立环境

$ git clone https://github.com/apache/calcite.git
$ cd calcite
$ mvn install -DskipTests -Dcheckstyle.skip=true
$ cd example/csv

如果留心的话,可以看到构建的包有:

[INFO] Reactor Summary:
[INFO]
[INFO] Calcite ............................................ SUCCESS [  1.949 s]
[INFO] Calcite Linq4j ..................................... SUCCESS [  1.835 s]
[INFO] Calcite Core ....................................... SUCCESS [01:12 min]
[INFO] Calcite Cassandra .................................. SUCCESS [  7.165 s]
[INFO] Calcite Druid ...................................... SUCCESS [  6.347 s]
[INFO] Calcite Elasticsearch .............................. SUCCESS [ 17.064 s]
[INFO] Calcite Elasticsearch5 ............................. SUCCESS [ 14.316 s]
[INFO] Calcite Examples ................................... SUCCESS [  0.023 s]
[INFO] Calcite Example CSV ................................ SUCCESS [  1.541 s]
[INFO] Calcite Example Function ........................... SUCCESS [  0.965 s]
[INFO] Calcite File ....................................... SUCCESS [  4.427 s]
[INFO] Calcite Geode ...................................... SUCCESS [ 32.260 s]
[INFO] Calcite MongoDB .................................... SUCCESS [  3.839 s]
[INFO] Calcite Pig ........................................ SUCCESS [ 49.779 s]
[INFO] Calcite Piglet ..................................... SUCCESS [  1.238 s]
[INFO] Calcite Plus ....................................... SUCCESS [  3.705 s]
[INFO] Calcite Server ..................................... SUCCESS [  2.051 s]
[INFO] Calcite Spark ...................................... SUCCESS [ 49.440 s]
[INFO] Calcite Splunk ..................................... SUCCESS [  1.177 s]
[INFO] Calcite Ubenchmark ................................. SUCCESS [  7.500 s]

启动应用

$ ./sqlline
sqlline version 1.3.0
sqlline>

连接CSV数据源

sqlline> !connect jdbc:calcite:model=example/csv/target/test-classes/model.json admin admin
0: jdbc:calcite:model=example/csv/target/test>

table结构

0: jdbc:calcite:model=example/csv/target/test> !tables
+-----------+-------------+------------+------------+---------+----------+------------+-----------+---------------------------+----------------+
| TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_CAT | TYPE_SCHEM | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION |
+-----------+-------------+------------+------------+---------+----------+------------+-----------+---------------------------+----------------+
|           | SALES       | DEPTS      | TABLE      |         |          |            |           |                           |                |
|           | SALES       | EMPS       | TABLE      |         |          |            |           |                           |                |
|           | SALES       | SDEPTS     | TABLE      |         |          |            |           |                           |                |
|           | metadata    | COLUMNS    | SYSTEM TABLE |         |          |            |           |                           |                |
|           | metadata    | TABLES     | SYSTEM TABLE |         |          |            |           |                           |                |
+-----------+-------------+------------+------------+---------+----------+------------+-----------+---------------------------+----------------+

注:

!tables命令对应的是DatabaseMetaData.getTables()实现

!columns!describe也是同样的情况

上面的结果中,其中COLUMNSTABLES表是系统表,其余三张表DEPTS, EMPSSDEPTS分别对应于相应的CSV文件。

查询

0: jdbc:calcite:model=example/csv/target/test> SELECT * FROM emps;
+------------+------+------------+--------+------+------------+------------+---------+---------+----------+
|   EMPNO    | NAME |   DEPTNO   | GENDER | CITY |   EMPID    |    AGE     | SLACKER | MANAGER | JOINEDAT |
+------------+------+------------+--------+------+------------+------------+---------+---------+----------+
| 100        | Fred | 10         |        |      | 30         | 25         | true    | false   | 1996-08-03 |
| 110        | Eric | 20         | M      | San Francisco | 3          | 80         |         | false   | 2001-01-01 |
| 110        | John | 40         | M      | Vancouver | 2          | null       | false   | true    | 2002-05-03 |
| 120        | Wilma | 20         | F      |      | 1          | 5          |         | true    | 2005-09-07 |
| 130        | Alice | 40         | F      | Vancouver | 2          | null       | false   | true    | 2007-01-01 |
+------------+------+------------+--------+------+------------+------------+---------+---------+----------+
5 rows selected (0.064 seconds)

JOIN和GROUP BY

0: jdbc:calcite:model=example/csv/target/test> SELECT d.name, COUNT(*) FROM emps AS e JOIN depts AS d ON e.deptno = d.deptno GROUP BY d.name;
+------+---------------------+
| NAME |       EXPR$1        |
+------+---------------------+
| Sales | 1                   |
| Marketing | 2                   |
+------+---------------------+
2 rows selected (0.322 seconds)

CSV使用原理

Calcite对CSV格式一无所知,那是怎么感知表结构和内容的呢?

答案就在calcite-example-csv工程的代码中。

schema

Calcite是怎么得知CSV格式的Schema呢?答案是通过model文件。

在连接数据源的同时,需要指定model文件。Calcite先解析model文件,再通过model文件中的factory来生成对应的Schema。

在本例中,Csv文件格式的model文件如下:

{
  "version": "1.0",
  "defaultSchema": "SALES",
  "schemas": [
    {
      "name": "SALES",
      "type": "custom",
      "factory": "org.apache.calcite.adapter.csv.CsvSchemaFactory",
      "operand": {
        "directory": "sales"
      }
    }
  ]
}

可见model文件中定义了SALES表的schema。

创建schema和表

Calcite通过SchemaFactory创建Schema,再通过Schema创建若干个table,每个table通过扫描csv文件来得到数据。

创建schema

紧接上面的例子。Calcite通过调用org.apache.calcite.adapter.csv.CsvSchemaFactory工厂类的create方法,传入目录参数directory,本例中其值为sales,来创建得到Schema结果。具体如下:

  public Schema create(SchemaPlus parentSchema, String name,
      Map<String, Object> operand) {
    final String directory = (String) operand.get("directory");
    final File base =
        (File) operand.get(ModelHandler.ExtraOperand.BASE_DIRECTORY.camelName);
    File directoryFile = new File(directory);
    if (base != null && !directoryFile.isAbsolute()) {
      directoryFile = new File(base, directory);
    }
    String flavorName = (String) operand.get("flavor");
    CsvTable.Flavor flavor;
    if (flavorName == null) {
      flavor = CsvTable.Flavor.SCANNABLE;
    } else {
      flavor = CsvTable.Flavor.valueOf(flavorName.toUpperCase(Locale.ROOT));
    }
    return new CsvSchema(directoryFile, flavor);
  }

这里的Schema接口的实现类是org.apache.calcite.adapter.csv.CsvSchema

创建表

Schema的作用是得到table列表,即重写getTableMap方法。Schema也支持列出子Schema,和表函数,但是在本示例中没有涉及。

本示例中重写getTableMap方法如下:

private Map createTableMap() {
  // Look for files in the directory ending in ".csv", ".csv.gz", ".json",
 // ".json.gz".  final Source baseSource = Sources.of(directoryFile);
  File[] files = directoryFile.listFiles(
      new FilenameFilter() {
        public boolean accept(File dir, String name) {
          final String nameSansGz = trim(name, ".gz");
          return nameSansGz.endsWith(".csv")
              || nameSansGz.endsWith(".json");
        }
      });
  if (files == null) {
    System.out.println("directory " + directoryFile + " not found");
    files = new File[0];
  }
  // Build a map from table name to table; each file becomes a table.
  final ImmutableMap.Builder builder = ImmutableMap.builder();
  for (File file : files) {
    Source source = Sources.of(file);
    Source sourceSansGz = source.trim(".gz");
    final Source sourceSansJson = sourceSansGz.trimOrNull(".json");
    if (sourceSansJson != null) {
      JsonTable table = new JsonTable(source);
      builder.put(sourceSansJson.relative(baseSource).path(), table);
      continue;
    }
    final Source sourceSansCsv = sourceSansGz.trim(".csv");

    final Table table = createTable(source);
    builder.put(sourceSansCsv.relative(baseSource).path(), table);
  }
  return builder.build();
}

/** Creates different sub-type of table based on the "flavor" attribute. */
private Table createTable(Source source) {
  switch (flavor) {
  case TRANSLATABLE:
    return new CsvTranslatableTable(source, null);
  case SCANNABLE:
    return new CsvScannableTable(source, null);
  case FILTERABLE:
    return new CsvFilterableTable(source, null);
  default:
    throw new AssertionError("Unknown flavor " + this.flavor);
  }
}

Schema会找到目录下以.csv结尾的文件,然后分别建表。本例中,找到了文件DEPTS.csv, EMPS.csv.gzSDEPTS.csv,建了对应的表DEPTS, EMPSSDEPTS

注意:

  1. 我们在model中没有定义任何一个table, table是由Schema来自动发现的。
  2. 可以使用model中的tables关键字来定义外部表。
  3. SchemaFactoryDriverManager.getConnection时即会创建和调用
  4. table内容读取是在query时进行的

创建视图

视图在查询时,和表一模一样,但是实现上并不存储数据。

一个视图的例子model-with-view.json如下:

{
  "version": "1.0",
  "defaultSchema": "SALES",
  "schemas": [
    {
      "name": "SALES",
      "type": "custom",
      "factory": "org.apache.calcite.adapter.csv.CsvSchemaFactory",
      "operand": {
        "directory": "sales"
      },
      "tables": [
        {
          "name": "FEMALE_EMPS",
          "type": "view",
          "sql": "SELECT * FROM emps WHERE gender = 'F'"
        }
      ]
    }
  ]
}

其与tables的区别在于其中的typeview,并且包括一个sql部分。

查询测试视图如下

> !connect jdbc:calcite:model=example/csv/target/test-classes/model-with-view.json admin admin
1: jdbc:calcite:model=example/csv/target/test> SELECT e.name, d.name FROM female_emps AS e JOIN depts AS d on e.deptno = d.deptno;
+------+------+
| NAME | NAME |
+------+------+
| Wilma | Marketing |
+------+------+
1 row selected (0.981 seconds)

自定义表

示例见model-with-custom-table.json, 内容如下:

{
  "version": "1.0",
  "defaultSchema": "CUSTOM_TABLE",
  "schemas": [
    {
      "name": "CUSTOM_TABLE",
      "tables": [
        {
          "name": "EMPS",
          "type": "custom",
          "factory": "org.apache.calcite.adapter.csv.CsvTableFactory",
          "operand": {
            "file": "sales/EMPS.csv.gz",
            "flavor": "scannable"
          }
        }
      ]
    }
  ]
}

先进行下查询,试一下:

> !connect jdbc:calcite:model=example/csv/target/test-classes/model-with-custom-table.json admin admin
3: jdbc:calcite:model=example/csv/target/test> SELECT empno, name FROM custom_table.emps;
+------------+------+
|   EMPNO    | NAME |
+------------+------+
| 100        | Fred |
| 110        | Eric |
| 110        | John |
| 120        | Wilma |
| 130        | Alice |
+------------+------+
5 rows selected (0.076 seconds)

这个结果是符合我们对于Schema的预期的。

  1. 这里不再调用SchemaFactory来生成Schema
  2. 这里直接调用org.apache.calcite.adapter.csv.CsvTableFactory工厂, 在create方法中,通过传入file参数,来得到CsvScannableTable实例

具体如下:

public CsvTable create(SchemaPlus schema, String name,
    Map operand, RelDataType rowType) {
  String fileName = (String) operand.get("file");
  final File base =
      (File) operand.get(ModelHandler.ExtraOperand.BASE_DIRECTORY.camelName);
  final Source source = Sources.file(base, fileName);
  final RelProtoDataType protoRowType =
      rowType != null ? RelDataTypeImpl.proto(rowType) : null;
  return new CsvScannableTable(source, protoRowType);
}
  1. 得到table之后的过程是一样的:在本例中,通过Schema方式得到的table和通过CUSTOM_TABLE得到的table均为CsvScannableTable
  2. 定制table比定制Schema更容易:虽然最终都实现了Table接口,但是定制table不需要实现metadata发现,即本例中不需要扫描磁盘上的csv文件。
  3. 定制table开发量更大,也更灵活:定制table时,由于是以table的维度来开发,多个table时需要处理多个定制类。但是同时灵活性也更强一些,可以对于不同table有不同的参数控制。

注释语法

合法的注释有两种:

  • /* ... */
  • //

查询表

Calcite解析查询语句,生成对于这些表的执行计划,再调用执行计划来得到数据。

前面示例中,数据量不大,使用起来很友好。一旦数据量增加,比如有100列,100万行数据,直接查询就会变慢了。此时,我们不希望每个查询都遍历所有数据了,而是希望Calcite和adaptor一起,只访问所需要的数据。

这个过程就是一个最简单的查询优化过程。Calcite允许通过增加执行计划(planner rules)来进行优化。执行计划算子通过查询语法树,来将匹配的节点给替换掉,来实现优化。

见示例,我们有两个模型model.jsonsmart.json,使用如下:

3: jdbc:calcite:model=example/csv/target/test> !connect jdbc:calcite:model=example/csv/target/test-classes/model.json admin admin
4: jdbc:calcite:model=example/csv/target/test> explain plan for select name from emps;
+------+
| PLAN |
+------+
| EnumerableCalc(expr#0..9=[{inputs}], NAME=[$t1])
  EnumerableInterpreter
    BindableTableScan(table=[[SALES, EMPS]])
 |
+------+
1 row selected (0.017 seconds)

4: jdbc:calcite:model=example/csv/target/test> !connect jdbc:calcite:model=example/csv/target/test-classes/smart.json admin admin
5: jdbc:calcite:model=example/csv/target/test> explain plan for select name from emps;
+------+
| PLAN |
+------+
| CsvTableScan(table=[[SALES, EMPS]], fields=[[1]])
|
+------+
1 row selected (0.085 seconds)

是什么造成了两者的差异呢?在smart.json文件中,

{
  "version": "1.0",
  "defaultSchema": "SALES",
  "schemas": [
    {
      "name": "SALES",
      "type": "custom",
      "factory": "org.apache.calcite.adapter.csv.CsvSchemaFactory",
      "operand": {
        "directory": "sales",
        "flavor": "TRANSLATABLE"
      }
    }
  ]
}

可见多了一行:

flavor: "TRANSLATABLE"

这导致CsvSchema在构造的时候,会多传入参数flavor = TRANSLATABLE。这时createTable方法会创建CsvTranslatableTable对象来解析表,不再使用CsvScannableTable对象。

CsvTranslatableTable类实现了TranslatableTable.toRel接口,来得到CsvTableScan对象。TableScans处理语法树的叶子部分,默认实现为EnumerableTableScan,也可以定制为自己的实现。

具体来讲:

  1. smart模型中, flavor参数为TRANSLATABLE
  2. CsvSchemacreate方法中, 创建了CsvTranslatableTable对象
  3. CsvTranslatableTabletoRel方法中, 创建了CsvTableScan对象
  4. CsvTableScan中,注册了执行计划CsvProjectTableScanRule
  5. CsvProjectTableScanRule根据onMatch中的方法的传入参数RelOptRuleCall, 裁剪到只剩第一列.

解析查询语句

生成执行计划

Calcite中内置的执行计划有:

org.apache.calcite.plan.volcano.VolcanoPlanner

    addRule(FilterJoinRule.FILTER_ON_JOIN);
    addRule(FilterJoinRule.JOIN);
    addRule(AbstractConverter.ExpandConversionRule.INSTANCE);
    addRule(JoinCommuteRule.INSTANCE);
    addRule(SemiJoinRule.PROJECT);
    addRule(SemiJoinRule.JOIN);
    if (CalcitePrepareImpl.COMMUTE) {
      addRule(JoinAssociateRule.INSTANCE);
    }
    addRule(AggregateRemoveRule.INSTANCE);
    addRule(UnionToDistinctRule.INSTANCE);
    addRule(ProjectRemoveRule.INSTANCE);
    addRule(AggregateJoinTransposeRule.INSTANCE);
    addRule(AggregateProjectMergeRule.INSTANCE);
    addRule(CalcRemoveRule.INSTANCE);
    addRule(SortRemoveRule.INSTANCE);

org.apache.calcite.plan.RelOptUtil中:

    planner.addRule(AggregateProjectPullUpConstantsRule.INSTANCE2);
    planner.addRule(UnionPullUpConstantsRule.INSTANCE);
    planner.addRule(PruneEmptyRules.UNION_INSTANCE);
    planner.addRule(PruneEmptyRules.INTERSECT_INSTANCE);
    planner.addRule(PruneEmptyRules.MINUS_INSTANCE);
    planner.addRule(PruneEmptyRules.PROJECT_INSTANCE);
    planner.addRule(PruneEmptyRules.FILTER_INSTANCE);
    planner.addRule(PruneEmptyRules.SORT_INSTANCE);
    planner.addRule(PruneEmptyRules.AGGREGATE_INSTANCE);
    planner.addRule(PruneEmptyRules.JOIN_LEFT_INSTANCE);
    planner.addRule(PruneEmptyRules.JOIN_RIGHT_INSTANCE);
    planner.addRule(PruneEmptyRules.SORT_FETCH_ZERO_INSTANCE);
    planner.addRule(UnionMergeRule.INSTANCE);
    planner.addRule(UnionMergeRule.INTERSECT_INSTANCE);
    planner.addRule(UnionMergeRule.MINUS_INSTANCE);
    planner.addRule(ProjectToWindowRule.PROJECT);
    planner.addRule(FilterMergeRule.INSTANCE);
    planner.addRule(DateRangeRules.FILTER_INSTANCE);
    planner.addRule(IntersectToDistinctRule.INSTANCE);

还有一些别的执行计划,如MaterializedView相关Rule等,具体将另文分析。

在本示例中,在CsvTableScan中,注册了执行计划:CsvProjectTableScanRule.INSTANCE,来设置只扫描需要的列,内容如下:

public class CsvProjectTableScanRule extends RelOptRule {
  public static final CsvProjectTableScanRule INSTANCE =
      new CsvProjectTableScanRule();

  private CsvProjectTableScanRule() {
    super(
        operand(Project.class,
            operand(CsvTableScan.class, none())),
        "CsvProjectTableScanRule");
  }

  @Override
  public void onMatch(RelOptRuleCall call) {
    final Project project = call.rel(0);
    final CsvTableScan scan = call.rel(1);
    int[] fields = getProjectFields(project.getProjects());
    if (fields == null) {
      // Project contains expressions more complex than just field references.
      return;
    }
    call.transformTo(
        new CsvTableScan(
            scan.getCluster(),
            scan.getTable(),
            scan.csvTable,
            fields));
  }

  private int[] getProjectFields(List<RexNode> exps) {
    final int[] fields = new int[exps.size()];
    for (int i = 0; i < exps.size(); i++) {
      final RexNode exp = exps.get(i);
      if (exp instanceof RexInputRef) {
        fields[i] = ((RexInputRef) exp).getIndex();
      } else {
        return null; // not a simple projection
      }
    }
    return fields;
  }
}

其中通过onMatch方法来实现了附加条件,默认返回值为true。当规则成功运行时,返回RelOptRuleCall.transformTo()

调用执行计划

TBD.

查询优化过程

TBD.

JDBC适配器

读取外部的JDBC表。

目前仅支持table扫描的下推,不支持filtering, joins, aggregations的下推。

但是Calcite的目标是尽可能多的下推。如果是单一的JDBC数据源,则全部下推。如果是混合的数据源,则下推的尽可能多。

JDBC数据源的clone

TBD.

从头构建

参考

评论

发表评论

validate