Calcite元数据定义和获取
组织结构:
|-Model(数据模型)
| |-Schema(数据模式)
| | |-Table(表/视图)
| | |-Function(函数)
| | |-Type(数据类型)
使用model.json定义Schema
{
"version": "1.0",
"defaultSchema": "FOODMART_CLONE",
"schemas": [
{
"name": "FOODMART_CLONE",
"type": "custom",
"factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",
"operand": {
"jdbcDriver": "com.mysql.jdbc.Driver",
"jdbcUrl": "jdbc:mysql://localhost/foodmart",
"jdbcUser": "foodmart",
"jdbcPassword": "foodmart"
}
}
]
}
除了name、type、factory和operand(额外参数)之外,还有Table(表/视图)、Function(函数)以及Type(数据类型) Schema除了直接从json\yaml文件中读取出来之外,另一种方式是通过相应的XxxSchemaFactory进行创建,只需实现SchemaFactory这个接口即可。
public interface SchemaFactory {
/** Creates a Schema.
*
* @param parentSchema Parent schema
* @param name Name of this schema
* @param operand The "operand" JSON property
* @return Created schema
*/
Schema create(
SchemaPlus parentSchema,
String name,
Map<String, Object> operand);
}
例如:创建jdbcSchema的 JdbcSchema.Factory (jdbc)、创建FileSchema的FileSchemaFactory(文件) 以及 创建java对象Schema的ReflectiveSchema.Factory(内存)
public class JdbcSchema implements Schema {
// jdbcSchema中独有的 java.sql.DataSource 以及 SqlDialect
final DataSource dataSource;
final @Nullable String catalog;
final @Nullable String schema;
public final SqlDialect dialect;
final JdbcConvention convention;
// JdbcSchema 中,不提供Function 和 Type,只有Table
private @Nullable ImmutableMap<String, JdbcTable> tableMap;
/**
* 可以看到,其中特有的 jdbcUrl相关参数,以及 sqlDialectFactory
*/
public static JdbcSchema create(
SchemaPlus parentSchema,
String name,
Map<String, Object> operand) {
DataSource dataSource;
try {
final String dataSourceName = (String) operand.get("dataSource");
if (dataSourceName != null) {
dataSource =
AvaticaUtils.instantiatePlugin(DataSource.class, dataSourceName);
} else {
final String jdbcUrl = (String) requireNonNull(operand.get("jdbcUrl"), "jdbcUrl");
final String jdbcDriver = (String) operand.get("jdbcDriver");
final String jdbcUser = (String) operand.get("jdbcUser");
final String jdbcPassword = (String) operand.get("jdbcPassword");
dataSource = dataSource(jdbcUrl, jdbcDriver, jdbcUser, jdbcPassword);
}
} catch (Exception e) {
throw new RuntimeException("Error while reading dataSource", e);
}
String jdbcCatalog = (String) operand.get("jdbcCatalog");
String jdbcSchema = (String) operand.get("jdbcSchema");
String sqlDialectFactory = (String) operand.get("sqlDialectFactory");
if (sqlDialectFactory == null || sqlDialectFactory.isEmpty()) {
return JdbcSchema.create(
parentSchema, name, dataSource, jdbcCatalog, jdbcSchema);
} else {
SqlDialectFactory factory =
AvaticaUtils.instantiatePlugin(SqlDialectFactory.class,
sqlDialectFactory);
return JdbcSchema.create(parentSchema, name, dataSource, factory,
jdbcCatalog, jdbcSchema);
}
}
}
总之,要想接入一种数据库类型,需要自定义实现 SchemaFactory -> Schema/AbstractSchema , TableFactory -> Table ,它们之间的关系如图:
实践
使用JdbcSchema读取并构建MySql数据库中的meta信息,能够输出任一表的全部字段类型。为后续Calcite校验器提供元数据支持。
准备工作
我们需要创建一个MySqlDatasource 或是 准备好jdbc的相关配置
MysqlDataSource dataSource = new MysqlDataSource();
dataSource.setUrl("jdbc:mysql://127.0.0.1:3306/test");
dataSource.setUser("root");
dataSource.setPassword("123456");
我们还需要一个辅助函数,帮助辨url中是否带有特定的catalog,也就是jdbc:mysql://127.0.0.1:3306/test
中的test
protected static boolean isReadFromCatalog(Connection conn) throws SQLException {
return conn.getMetaData().getCatalogs().next();
}
第一步,尝试遍历并取出该数据库应用下的所有表的名称
protected static Set<String> readAllDatabase(Connection conn){
Set<String> databases = new HashSet<>();
try {
boolean isCatalog = isReadFromCatalog(conn);
String currentDataBase = isCatalog? conn.getCatalog() : conn.getSchema();
if (StringUtils.isNoneEmpty(currentDataBase)){
return Set.of(currentDataBase);
}
DatabaseMetaData metaData = conn.getMetaData();
ResultSet rs = isCatalog? metaData.getCatalogs() : metaData.getSchemas();
while (rs.next()) {
String database = rs.getString(1);
databases.add(database);
}
return databases;
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
第二步,构建RootSchema和JdbcSchema
public static CalciteSchema buildCalciteSchema(DataSource datasource) throws SQLException {
CalciteSchema rootSchema = CalciteSchema.createRootSchema(false, false);
SchemaPlus rootSchemaPlus = rootSchema.plus();
Connection connection = datasource.getConnection();
// 遍历所有的表
for (String database : readAllDatabase(connection)){
String catalog = isReadFromCatalog(connection)?database:null;
String schema = isReadFromCatalog(connection)?connection.getSchema():database;
// 创建jdbcSchema.也可以使用JdbcSchema.Factory.INSTANCE.create() 进行创建,后者可以传入sqlDialectFactory 如果项目中重写了一些SqlDialect的话。
Schema schemaObj = JdbcSchema.create(rootSchemaPlus, database, datasource, catalog,schema);
rootSchema.add(database, schemaObj);
}
return rootSchema;
}
第三步,使用。我们可以通过如下用法,在控制台打印表t_user的全部字段结构
CalciteSchema rootSchema = buildCalciteSchema(dataSource);
RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();
System.out.println(rootSchema2.getSubSchema("test",false).getTable("t_user",false).getTable().getRowType(typeFactory));
// will print : RecordType(INTEGER id, VARCHAR(255) name, INTEGER age, BOOLEAN sex, TIMESTAMP(0) create_time)
深入探究
table map
获取数据库test中的所有表,最终会调用到org.apache.calcite.adapter.jdbc.JdbcSchema#computeTables
方法。是一个懒加载模式,并做了一定的缓存。下面只贴出关键代码。
final List<MetaImpl.MetaTable> tableDefList = new ArrayList<>();
final DatabaseMetaData metaData = connection.getMetaData();
// 本用例中 catalog= "test" schema= null
resultSet = metaData.getTables(catalog, schema, null, null);
while (resultSet.next()) {
final String catalogName = resultSet.getString(1);
final String schemaName = resultSet.getString(2);
final String tableName = resultSet.getString(3);
final String tableTypeName = resultSet.getString(4);
tableDefList.add(
new MetaImpl.MetaTable(catalogName, schemaName, tableName,
tableTypeName));
}
RowType
获取表 test.t_user下的所有字段,从JdbcTable中绕了一圈org.apache.calcite.adapter.jdbc.JdbcTable#supplyProto
,最终回到了org.apache.calcite.adapter.jdbc.JdbcSchema#getRelDataType(java.lang.String, java.lang.String, java.lang.String)
方法。关键代码如下
final ResultSet resultSet =
metaData.getColumns(catalogName, schemaName, tableName, null);
while (resultSet.next()) {
final String columnName = requireNonNull(resultSet.getString(4), "columnName");
final int dataType = resultSet.getInt(5);
final String typeString = resultSet.getString(6);
final int precision;
final int scale;
switch (SqlType.valueOf(dataType)) {
case TIMESTAMP:
case TIME:
precision = resultSet.getInt(9); // SCALE
scale = 0;
break;
default:
precision = resultSet.getInt(7); // SIZE
scale = resultSet.getInt(9); // SCALE
break;
}
RelDataType sqlType =
sqlType(typeFactory, dataType, precision, scale, typeString);
boolean nullable = resultSet.getInt(11) != DatabaseMetaData.columnNoNulls;
fieldInfo.add(columnName, sqlType).nullable(nullable);
}
彩蛋
Apache Doris数据库,使用mysql-connector-j:8.0.31 建立连接,在获取部分视图表结构时,会遇到 varchar(*)
类型。无法正确转换为Int类型,进而抛出错误,中断整个表字段的扫描。
经排查,是Doris 1.x版本创建时导致的问题,切换到2.x版本解决。