掘金 后端 ( ) • 2024-03-28 17:29

Calcite元数据定义和获取

组织结构:

|-Model(数据模型)
|   |-Schema(数据模式)
|   |   |-Table(表/视图)
|   |   |-Function(函数)
|   |   |-Type(数据类型)

使用model.json定义Schema

{
    "version": "1.0",
    "defaultSchema": "FOODMART_CLONE",
    "schemas": [
      {
        "name": "FOODMART_CLONE",
        "type": "custom",
        "factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",
        "operand": {
          "jdbcDriver": "com.mysql.jdbc.Driver",
          "jdbcUrl": "jdbc:mysql://localhost/foodmart",
          "jdbcUser": "foodmart",
          "jdbcPassword": "foodmart"
        }
      }
    ]
  }

除了name、type、factory和operand(额外参数)之外,还有Table(表/视图)、Function(函数)以及Type(数据类型) Schema除了直接从json\yaml文件中读取出来之外,另一种方式是通过相应的XxxSchemaFactory进行创建,只需实现SchemaFactory这个接口即可。

public interface SchemaFactory {
  /** Creates a Schema.
   *
   * @param parentSchema Parent schema
   * @param name Name of this schema
   * @param operand The "operand" JSON property
   * @return Created schema
   */
  Schema create(
      SchemaPlus parentSchema,
      String name,
      Map<String, Object> operand);
}

例如:创建jdbcSchema的 JdbcSchema.Factory (jdbc)、创建FileSchema的FileSchemaFactory(文件) 以及 创建java对象Schema的ReflectiveSchema.Factory(内存)

public class JdbcSchema implements Schema {
    // jdbcSchema中独有的 java.sql.DataSource 以及 SqlDialect
  final DataSource dataSource;
  final @Nullable String catalog;
  final @Nullable String schema;
  public final SqlDialect dialect;
  final JdbcConvention convention;
  // JdbcSchema 中,不提供Function 和 Type,只有Table
  private @Nullable ImmutableMap<String, JdbcTable> tableMap;

    /**
    * 可以看到,其中特有的 jdbcUrl相关参数,以及 sqlDialectFactory
    */
  public static JdbcSchema create(
      SchemaPlus parentSchema,
      String name,
      Map<String, Object> operand) {
    DataSource dataSource;
    try {
      final String dataSourceName = (String) operand.get("dataSource");
      if (dataSourceName != null) {
        dataSource =
            AvaticaUtils.instantiatePlugin(DataSource.class, dataSourceName);
      } else {
        final String jdbcUrl = (String) requireNonNull(operand.get("jdbcUrl"), "jdbcUrl");
        final String jdbcDriver = (String) operand.get("jdbcDriver");
        final String jdbcUser = (String) operand.get("jdbcUser");
        final String jdbcPassword = (String) operand.get("jdbcPassword");
        dataSource = dataSource(jdbcUrl, jdbcDriver, jdbcUser, jdbcPassword);
      }
    } catch (Exception e) {
      throw new RuntimeException("Error while reading dataSource", e);
    }
    String jdbcCatalog = (String) operand.get("jdbcCatalog");
    String jdbcSchema = (String) operand.get("jdbcSchema");
    String sqlDialectFactory = (String) operand.get("sqlDialectFactory");

    if (sqlDialectFactory == null || sqlDialectFactory.isEmpty()) {
      return JdbcSchema.create(
          parentSchema, name, dataSource, jdbcCatalog, jdbcSchema);
    } else {
      SqlDialectFactory factory =
          AvaticaUtils.instantiatePlugin(SqlDialectFactory.class,
              sqlDialectFactory);
      return JdbcSchema.create(parentSchema, name, dataSource, factory,
          jdbcCatalog, jdbcSchema);
    }
  }
}

总之,要想接入一种数据库类型,需要自定义实现 SchemaFactory -> Schema/AbstractSchema , TableFactory -> Table ,它们之间的关系如图:

2024-03-28_15-26.png

实践

使用JdbcSchema读取并构建MySql数据库中的meta信息,能够输出任一表的全部字段类型。为后续Calcite校验器提供元数据支持。

准备工作

我们需要创建一个MySqlDatasource 或是 准备好jdbc的相关配置

        MysqlDataSource dataSource = new MysqlDataSource();
        dataSource.setUrl("jdbc:mysql://127.0.0.1:3306/test");
        dataSource.setUser("root");
        dataSource.setPassword("123456");

我们还需要一个辅助函数,帮助辨url中是否带有特定的catalog,也就是jdbc:mysql://127.0.0.1:3306/test中的test

    protected static boolean isReadFromCatalog(Connection conn) throws SQLException {
        return conn.getMetaData().getCatalogs().next();
    }

第一步,尝试遍历并取出该数据库应用下的所有表的名称

 protected static Set<String> readAllDatabase(Connection conn){
        Set<String> databases = new HashSet<>();
        try {
            boolean isCatalog = isReadFromCatalog(conn);
            String currentDataBase = isCatalog? conn.getCatalog() : conn.getSchema();
            if (StringUtils.isNoneEmpty(currentDataBase)){
                return Set.of(currentDataBase);
            }
            DatabaseMetaData metaData = conn.getMetaData();
            ResultSet rs = isCatalog? metaData.getCatalogs() : metaData.getSchemas();
            while (rs.next()) {
                String database = rs.getString(1);
                databases.add(database);
            }
            return databases;
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

第二步,构建RootSchema和JdbcSchema

    public static CalciteSchema buildCalciteSchema(DataSource datasource) throws SQLException {
        CalciteSchema rootSchema = CalciteSchema.createRootSchema(false, false);
        SchemaPlus rootSchemaPlus = rootSchema.plus();
        Connection connection = datasource.getConnection();
        // 遍历所有的表
        for (String database : readAllDatabase(connection)){
            String catalog = isReadFromCatalog(connection)?database:null;
            String schema =  isReadFromCatalog(connection)?connection.getSchema():database;
            // 创建jdbcSchema.也可以使用JdbcSchema.Factory.INSTANCE.create() 进行创建,后者可以传入sqlDialectFactory 如果项目中重写了一些SqlDialect的话。
            Schema schemaObj = JdbcSchema.create(rootSchemaPlus, database, datasource, catalog,schema);
            rootSchema.add(database, schemaObj);
        }
        return rootSchema;
    }

第三步,使用。我们可以通过如下用法,在控制台打印表t_user的全部字段结构

CalciteSchema rootSchema = buildCalciteSchema(dataSource);
RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();
System.out.println(rootSchema2.getSubSchema("test",false).getTable("t_user",false).getTable().getRowType(typeFactory));
// will print : RecordType(INTEGER id, VARCHAR(255) name, INTEGER age, BOOLEAN sex, TIMESTAMP(0) create_time)

深入探究

table map

获取数据库test中的所有表,最终会调用到org.apache.calcite.adapter.jdbc.JdbcSchema#computeTables方法。是一个懒加载模式,并做了一定的缓存。下面只贴出关键代码。

        final List<MetaImpl.MetaTable> tableDefList = new ArrayList<>();
        final DatabaseMetaData metaData = connection.getMetaData();
        // 本用例中 catalog= "test" schema= null
        resultSet = metaData.getTables(catalog, schema, null, null);
        while (resultSet.next()) {
          final String catalogName = resultSet.getString(1);
          final String schemaName = resultSet.getString(2);
          final String tableName = resultSet.getString(3);
          final String tableTypeName = resultSet.getString(4);
          tableDefList.add(
              new MetaImpl.MetaTable(catalogName, schemaName, tableName,
                  tableTypeName));
        }

RowType

获取表 test.t_user下的所有字段,从JdbcTable中绕了一圈org.apache.calcite.adapter.jdbc.JdbcTable#supplyProto,最终回到了org.apache.calcite.adapter.jdbc.JdbcSchema#getRelDataType(java.lang.String, java.lang.String, java.lang.String) 方法。关键代码如下

final ResultSet resultSet =
        metaData.getColumns(catalogName, schemaName, tableName, null);
while (resultSet.next()) {
      final String columnName = requireNonNull(resultSet.getString(4), "columnName");
      final int dataType = resultSet.getInt(5);
      final String typeString = resultSet.getString(6);
      final int precision;
      final int scale;
      switch (SqlType.valueOf(dataType)) {
      case TIMESTAMP:
      case TIME:
        precision = resultSet.getInt(9); // SCALE
        scale = 0;
        break;
      default:
        precision = resultSet.getInt(7); // SIZE
        scale = resultSet.getInt(9); // SCALE
        break;
      }
      RelDataType sqlType =
          sqlType(typeFactory, dataType, precision, scale, typeString);
      boolean nullable = resultSet.getInt(11) != DatabaseMetaData.columnNoNulls;
      fieldInfo.add(columnName, sqlType).nullable(nullable);
}

彩蛋

Apache Doris数据库,使用mysql-connector-j:8.0.31 建立连接,在获取部分视图表结构时,会遇到 varchar(*) 类型。无法正确转换为Int类型,进而抛出错误,中断整个表字段的扫描。 2024-03-28_16-50.png 经排查,是Doris 1.x版本创建时导致的问题,切换到2.x版本解决。