FlinkSQL Metadata Validation
Flink 1.9 introduced the CatalogManager to manage Catalogs and CatalogBaseTables. When a DDL statement is executed, the table information is wrapped into a CatalogBaseTable and stored in the CatalogManager. Flink also extends Calcite's Schema interface so that Calcite can read the table information held by the CatalogManager during the validate phase.
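As a rough mental model (class and method names here are illustrative, not Flink's actual implementation), the CatalogManager acts as a registry keyed by a three-part identifier: the DDL path writes table metadata in, and validation reads it back out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the registry role CatalogManager plays; names are illustrative.
public class MiniCatalogManager {
    private final Map<String, String> tables = new HashMap<>();

    // Flink keys tables by an ObjectIdentifier: catalog.database.table
    public static String identifier(String catalog, String database, String table) {
        return catalog + "." + database + "." + table;
    }

    /** Called by the DDL path: store the metadata produced from CREATE TABLE. */
    public void createTable(String identifier, String tableMetadata) {
        tables.put(identifier, tableMetadata);
    }

    /** Called during validation: look the table back up by its full identifier. */
    public Optional<String> getTable(String identifier) {
        return Optional.ofNullable(tables.get(identifier));
    }
}
```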
Writing the CatalogTable
By executing a DDL statement, let's look at how the Blink planner parses it and stores the result in the CatalogManager, focusing on how the computed column **proctime AS PROCTIME()** is parsed.
```sql
CREATE TABLE user_address (
    userId BIGINT,
    addressInfo VARCHAR,
    proctime AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = 'localhost:9092',
    'topic' = 'tp02',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
)
```
The call chain for createCatalogTable:
```
org.apache.flink.table.api.internal.TableEnvironmentImpl#executeSql
org.apache.flink.table.planner.delegation.ParserImpl#parse
org.apache.flink.table.planner.operations.SqlToOperationConverter#convert
org.apache.flink.table.planner.operations.SqlCreateTableConverter#convertCreateTable
// extract the CatalogTable from the SqlCreateTable statement
org.apache.flink.table.planner.operations.SqlCreateTableConverter#createCatalogTable
```
The TableSchema, partition keys, primary key, and comment are extracted from the SqlCreateTable node to build a CatalogTableImpl.
```java
private CatalogTable createCatalogTable(SqlCreateTable sqlCreateTable) {
    final TableSchema sourceTableSchema;
    final List<String> sourcePartitionKeys;
    final List<SqlTableLike.SqlTableLikeOption> likeOptions;
    final Map<String, String> sourceProperties;
    // handle CREATE TABLE ... LIKE ...
    if (sqlCreateTable.getTableLike().isPresent()) {
        SqlTableLike sqlTableLike = sqlCreateTable.getTableLike().get();
        CatalogTable table = lookupLikeSourceTable(sqlTableLike);
        sourceTableSchema = table.getSchema();
        sourcePartitionKeys = table.getPartitionKeys();
        likeOptions = sqlTableLike.getOptions();
        sourceProperties = table.getProperties();
    } else {
        sourceTableSchema = TableSchema.builder().build();
        sourcePartitionKeys = Collections.emptyList();
        likeOptions = Collections.emptyList();
        sourceProperties = Collections.emptyMap();
    }
    // handle the SqlTableLike options: INCLUDING ALL, OVERWRITING OPTIONS, EXCLUDING PARTITIONS, etc.
    Map<SqlTableLike.FeatureOption, SqlTableLike.MergingStrategy> mergingStrategies =
            mergeTableLikeUtil.computeMergingStrategies(likeOptions);
    Map<String, String> mergedOptions = mergeOptions(sqlCreateTable, sourceProperties, mergingStrategies);

    // extract the primary key
    Optional<SqlTableConstraint> primaryKey = sqlCreateTable.getFullConstraints()
            .stream()
            .filter(SqlTableConstraint::isPrimaryKey)
            .findAny();

    // derive the TableSchema
    TableSchema mergedSchema = mergeTableLikeUtil.mergeTables(
            mergingStrategies,
            sourceTableSchema, // for a non-LIKE statement this is an empty schema
            sqlCreateTable.getColumnList().getList(),
            sqlCreateTable.getWatermark().map(Collections::singletonList).orElseGet(Collections::emptyList),
            primaryKey.orElse(null)
    );

    // partition keys
    List<String> partitionKeys = mergePartitions(
            sourcePartitionKeys,
            sqlCreateTable.getPartitionKeyList(),
            mergingStrategies
    );
    verifyPartitioningColumnsExist(mergedSchema, partitionKeys);

    // comment
    String tableComment = sqlCreateTable.getComment()
            .map(comment -> comment.getNlsString().getValue())
            .orElse(null);

    return new CatalogTableImpl(mergedSchema, partitionKeys, mergedOptions, tableComment);
}
```
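To make the option-merge step concrete, here is a simplified sketch of the INCLUDING vs OVERWRITING OPTIONS semantics for CREATE TABLE ... LIKE. It is a toy stand-in for mergeOptions/MergeTableLikeUtil, not the real implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified sketch of LIKE option merging; not Flink's MergeTableLikeUtil.
public class LikeOptionsMerge {
    public static Map<String, String> merge(
            Map<String, String> sourceOptions,   // options inherited from the LIKE source table
            Map<String, String> derivedOptions,  // options declared on the new table
            boolean overwriting) {               // true ~ OVERWRITING OPTIONS
        Map<String, String> merged = new LinkedHashMap<>(sourceOptions);
        for (Map.Entry<String, String> e : derivedOptions.entrySet()) {
            if (!overwriting && merged.containsKey(e.getKey())) {
                // without OVERWRITING, a duplicate key is a conflict
                throw new IllegalArgumentException("Duplicate option: " + e.getKey());
            }
            merged.put(e.getKey(), e.getValue());
        }
        return merged;
    }
}
```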
When the TableSchema is extracted, the Calcite column types are converted to Flink's internal data types. If the table contains a computed column, e.g. PROCTIME(), the expression is validated; the FlinkSqlOperatorTable class contains all of FlinkSQL's built-in functions.
```java
/**
 * Function used to access a processing time attribute.
 */
public static final SqlFunction PROCTIME =
    new CalciteSqlFunction(
        "PROCTIME",
        SqlKind.OTHER_FUNCTION,
        PROCTIME_TYPE_INFERENCE,
        null,
        OperandTypes.NILADIC,
        SqlFunctionCategory.TIMEDATE,
        false
    );
```
The TableColumn generation process:

1. Convert the physical (non-computed) columns' types and store them in the physicalFieldNamesToTypes map.
2. For computed columns generated from an expression, verify that the function exists and obtain the RelDataType it returns.
3. Convert each field's RelDataType to a LogicalType, then to a DataType, and build a TableColumn. The differences between the type systems are worth a closer look.
```java
private void appendDerivedColumns(
        Map<FeatureOption, MergingStrategy> mergingStrategies,
        List<SqlNode> derivedColumns) {
    // convert the physical columns and store them in physicalFieldNamesToTypes
    collectPhysicalFieldsTypes(derivedColumns);

    for (SqlNode derivedColumn : derivedColumns) {
        final SqlTableColumn tableColumn = (SqlTableColumn) derivedColumn;
        final TableColumn column;
        if (tableColumn.isGenerated()) {
            String fieldName = tableColumn.getName().toString();
            // validate the expression, e.g. whether PROCTIME() is registered in FlinkSqlOperatorTable
            SqlNode validatedExpr = sqlValidator.validateParameterizedExpression(
                    tableColumn.getExpr().get(),
                    physicalFieldNamesToTypes);
            // validated return type: for PROCTIME() this is Flink's TimeIndicatorRelDataType
            final RelDataType validatedType = sqlValidator.getValidatedNodeType(validatedExpr);
            // RelDataType ---> LogicalType ---> DataType
            column = TableColumn.of(
                    fieldName,
                    fromLogicalToDataType(toLogicalType(validatedType)),
                    escapeExpressions.apply(validatedExpr));
            computedFieldNamesToTypes.put(fieldName, validatedType);
        } else {
            // convert physical columns to Flink's internal data types
            String name = tableColumn.getName().getSimple();
            // RelDataType --> LogicalType --> DataType
            LogicalType logicalType = FlinkTypeFactory.toLogicalType(physicalFieldNamesToTypes.get(name));
            column = TableColumn.of(name, TypeConversions.fromLogicalToDataType(logicalType));
        }
        columns.put(column.getName(), column);
    }
}
```
For the proctime computed column, it is worth looking at the type bound in the PROCTIME function definition above.
Reading Metadata During Validate
FlinkSchema has three subclasses: CatalogManagerCalciteSchema, CatalogCalciteSchema, and DatabaseCalciteSchema. During Calcite's validation, the overridden getSubSchema method is called to resolve the Catalog and Database in turn, and finally the Table information is fetched from the catalogManager.
To see how Calcite obtains table schema information, we can create a custom Schema. Test case:
```java
// this schema contains two tables: USERS and JOBS
public class CatalogManagerCalciteSchema implements Schema {
    static Map<String, Table> TABLES = Maps.newHashMap();

    static {
        TABLES.put("USERS", new AbstractTable() { // note: add a table
            @Override
            public RelDataType getRowType(final RelDataTypeFactory typeFactory) {
                RelDataTypeFactory.Builder builder = typeFactory.builder();
                builder.add("ID", new BasicSqlType(new RelDataTypeSystemImpl() {}, SqlTypeName.INTEGER));
                builder.add("NAME", new BasicSqlType(new RelDataTypeSystemImpl() {}, SqlTypeName.CHAR));
                builder.add("AGE", new BasicSqlType(new RelDataTypeSystemImpl() {}, SqlTypeName.INTEGER));
                return builder.build();
            }
        });

        TABLES.put("JOBS", new AbstractTable() {
            @Override
            public RelDataType getRowType(final RelDataTypeFactory typeFactory) {
                RelDataTypeFactory.Builder builder = typeFactory.builder();
                builder.add("ID", new BasicSqlType(new RelDataTypeSystemImpl() {}, SqlTypeName.INTEGER));
                builder.add("NAME", new BasicSqlType(new RelDataTypeSystemImpl() {}, SqlTypeName.CHAR));
                builder.add("COMPANY", new BasicSqlType(new RelDataTypeSystemImpl() {}, SqlTypeName.CHAR));
                return builder.build();
            }
        });
    }

    @Override
    public Table getTable(String name) {
        return TABLES.get(name);
    }

    @Override
    public Set<String> getTableNames() {
        return null;
    }

    @Override
    public RelProtoDataType getType(String name) {
        return null;
    }

    @Override
    public Set<String> getTypeNames() {
        return null;
    }

    @Override
    public Collection<Function> getFunctions(String name) {
        return null;
    }

    @Override
    public Set<String> getFunctionNames() {
        return Collections.emptySet();
    }

    @Override
    public Schema getSubSchema(String name) {
        return null;
    }

    @Override
    public Set<String> getSubSchemaNames() {
        return null;
    }

    @Override
    public Expression getExpression(SchemaPlus parentSchema, String name) {
        return null;
    }

    @Override
    public boolean isMutable() {
        return false;
    }

    @Override
    public Schema snapshot(SchemaVersion version) {
        return this;
    }
}
```
```java
public static void main(String[] args) throws SqlParseException {
    // CatalogManagerCalciteSchema here is our custom class, not Flink's
    CalciteSchema rootSchema = CalciteSchemaBuilder.asRootSchema(new CatalogManagerCalciteSchema());
    SchemaPlus schemaPlus = rootSchema.plus();
    SqlTypeFactoryImpl factory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
    // the CalciteCatalogReader reads metadata from the SimpleCalciteSchema during the rel phase
    CalciteCatalogReader calciteCatalogReader = new CalciteCatalogReader(
            CalciteSchema.from(schemaPlus),
            CalciteSchema.from(schemaPlus).path(null),
            factory,
            new CalciteConnectionConfigImpl(new Properties()));

    String sql = "select u.id as user_id, u.name as user_name, j.company as user_company, u.age as user_age \n"
            + "from users u join jobs j on u.name=j.name";
    SqlParser parser = SqlParser.create(sql, SqlParser.Config.DEFAULT);
    SqlNode sqlNode = parser.parseStmt();
    SqlValidator validator = SqlValidatorUtil.newValidator(
            SqlStdOperatorTable.instance(), calciteCatalogReader, factory, SqlConformanceEnum.DEFAULT);
    SqlNode validateSqlNode = validator.validate(sqlNode);
    System.out.println(validateSqlNode);
}
```
This shows the call chain through which Calcite's validation visits CatalogManagerCalciteSchema: when getSubSchema returns null, there is no child schema, so the Table information is read from the current schema.
Flink defines three levels of schemas; getSubSchema is called level by level to read the Catalog, Database, and Table from the CatalogManager. For the exact call path, see org.apache.calcite.sql.validate.EmptyScope#resolve_.
CatalogManagerCalciteSchema#getSubSchema: uses the catalog part of the table name to obtain a CatalogCalciteSchema from the catalogManager.
```java
@Override
public Schema getSubSchema(String name) {
    if (catalogManager.schemaExists(name)) {
        return new CatalogCalciteSchema(name, catalogManager, isStreamingMode);
    } else {
        return null;
    }
}
```
CatalogCalciteSchema#getSubSchema: uses the database part of the table name to obtain a DatabaseCalciteSchema from the catalogManager.
```java
/**
 * Look up a sub-schema (database) by the given sub-schema name.
 *
 * @param schemaName name of sub-schema to look up
 * @return the sub-schema with a given database name, or null
 */
@Override
public Schema getSubSchema(String schemaName) {
    if (catalogManager.schemaExists(catalogName, schemaName)) {
        return new DatabaseCalciteSchema(schemaName, catalogName, catalogManager, isStreamingMode);
    } else {
        return null;
    }
}
```
DatabaseCalciteSchema has no sub-schemas, so the Table information is fetched from it directly.
```java
public Table getTable(String tableName) {
    ObjectIdentifier identifier = ObjectIdentifier.of(catalogName, databaseName, tableName);
    return catalogManager.getTable(identifier)
            .map(result -> {
                CatalogBaseTable table = result.getTable();
                FlinkStatistic statistic = getStatistic(result.isTemporary(), table, identifier);
                return new CatalogSchemaTable(
                        identifier,
                        result,
                        statistic,
                        catalogManager.getCatalog(catalogName).orElseThrow(IllegalStateException::new),
                        isStreamingMode);
            })
            .orElse(null);
}
```
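The three-level lookup can be sketched with plain Java (a simplified stand-in for Calcite's resolution, not EmptyScope#resolve_ itself): each leading name in catalog.database.table selects a sub-schema, and a null result aborts the lookup at that level.

```java
import java.util.List;
import java.util.Map;

// Toy model of Calcite's sub-schema walk; interface and helper are illustrative.
public class SchemaResolver {
    public interface Schema {
        Schema getSubSchema(String name);
        String getTable(String name); // table description, or null if absent
    }

    public static String resolveTable(Schema root, List<String> path) {
        Schema current = root;
        for (int i = 0; i < path.size() - 1; i++) {
            Schema sub = current.getSubSchema(path.get(i));
            if (sub == null) {
                return null; // getSubSchema == null: lookup fails at this level
            }
            current = sub;
        }
        return current.getTable(path.get(path.size() - 1));
    }

    // Map-backed schema for demonstration.
    public static Schema mapSchema(final Map<String, Schema> subs, final Map<String, String> tables) {
        return new Schema() {
            public Schema getSubSchema(String name) { return subs.get(name); }
            public String getTable(String name) { return tables.get(name); }
        };
    }
}
```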
Proctime Field Validation
When FlinkSQL reads a table schema during validation, the types of computed columns (rowtime, proctime) are converted into RelDataTypes that Calcite can recognize. First, the type-conversion code for computed columns:
```java
// org.apache.flink.table.catalog.CatalogManager#getTable
public Optional<TableLookupResult> getTable(ObjectIdentifier objectIdentifier) {
    CatalogBaseTable temporaryTable = temporaryTables.get(objectIdentifier);
    if (temporaryTable != null) {
        TableSchema resolvedSchema = resolveTableSchema(temporaryTable);
        return Optional.of(TableLookupResult.temporary(temporaryTable, resolvedSchema));
    } else {
        return getPermanentTable(objectIdentifier);
    }
}
```
```java
// org.apache.flink.table.api.internal.CatalogTableSchemaResolver#resolve
/**
 * Resolve the computed column's type for the given schema.
 *
 * @param tableSchema Table schema to derive table field names and data types
 * @return the resolved TableSchema
 */
public TableSchema resolve(TableSchema tableSchema) {
    final String rowtime;
    String[] fieldNames = tableSchema.getFieldNames();
    DataType[] fieldTypes = tableSchema.getFieldDataTypes();

    TableSchema.Builder builder = TableSchema.builder();
    for (int i = 0; i < tableSchema.getFieldCount(); ++i) {
        TableColumn tableColumn = tableSchema.getTableColumns().get(i);
        DataType fieldType = fieldTypes[i];
        if (tableColumn.isGenerated()) {
            // derive the DataType from the computed column's expression
            fieldType = resolveExpressionDataType(tableColumn.getExpr().get(), tableSchema);
            if (isProctime(fieldType)) {
                if (fieldNames[i].equals(rowtime)) {
                    throw new TableException("Watermark can not be defined for a processing time attribute column.");
                }
            }
        }
        ......
        if (tableColumn.isGenerated()) {
            builder.field(fieldNames[i], fieldType, tableColumn.getExpr().get());
        } else {
            builder.field(fieldNames[i], fieldType);
        }
    }

    tableSchema.getWatermarkSpecs().forEach(builder::watermark);
    tableSchema.getPrimaryKey().ifPresent(
            pk -> builder.primaryKey(pk.getName(), pk.getColumns().toArray(new String[0])));
    return builder.build();
}
```
```java
// org.apache.flink.table.api.internal.CatalogTableSchemaResolver#resolveExpressionDataType
private DataType resolveExpressionDataType(String expr, TableSchema tableSchema) {
    ResolvedExpression resolvedExpr = parser.parseSqlExpression(expr, tableSchema);
    if (resolvedExpr == null) {
        throw new ValidationException("Could not resolve field expression: " + expr);
    }
    return resolvedExpr.getOutputDataType();
}

// org.apache.flink.table.planner.delegation.ParserImpl#parseSqlExpression
public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema) {
    SqlExprToRexConverter sqlExprToRexConverter = sqlExprToRexConverterCreator.apply(inputSchema);
    RexNode rexNode = sqlExprToRexConverter.convertToRexNode(sqlExpression);
    // [[RelDataType]] ----> [[LogicalType]]
    LogicalType logicalType = FlinkTypeFactory.toLogicalType(rexNode.getType());
    return new RexNodeExpression(rexNode, TypeConversions.fromLogicalToDataType(logicalType));
}

// org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl#convertToRexNodes
public RexNode[] convertToRexNodes(String[] exprs) {
    // build a temporary-table query to obtain the RexNodes
    String query = String.format(QUERY_FORMAT, String.join(",", exprs));
    SqlNode parsed = planner.parser().parse(query);
    SqlNode validated = planner.validate(parsed);
    // convert to a RelNode
    RelNode rel = planner.rel(validated).rel;
    // The plan should be the following tree:
    // LogicalProject
    // +- TableScan
    if (rel instanceof LogicalProject
            && rel.getInput(0) != null
            && rel.getInput(0) instanceof TableScan) {
        return ((LogicalProject) rel).getProjects().toArray(new RexNode[0]);
    } else {
        throw new IllegalStateException("The root RelNode should be LogicalProject, but is " + rel.toString());
    }
}
```
The rough flow of PROCTIME() type extraction:

1. If the table contains computed columns, the TableSchema from the CREATE TABLE statement is registered as a temporary table, temp_table.
2. From each computed column's expression, e.g. PROCTIME(), a temporary query is built: select PROCTIME() from temp_table. PROCTIME() is a Flink built-in function.
3. The query is validated and converted to a RelNode, and the row expressions (RexNodes) are extracted from it.
4. The RelDataType of PROCTIME() is extracted from the RexNode and finally converted to a DataType.
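The query-construction part of this trick (splicing the expression into a SELECT over a temporary table so the planner's normal validate/rel pipeline derives its type) can be sketched as follows; the table name and format string here are assumptions for illustration, not Flink's exact constants.

```java
// Sketch of the temporary-query trick used by SqlExprToRexConverterImpl;
// __temp_table__ and QUERY_FORMAT are illustrative, not Flink's actual values.
public class TempTableQuery {
    private static final String QUERY_FORMAT = "SELECT %s FROM __temp_table__";

    public static String buildQuery(String[] exprs) {
        return String.format(QUERY_FORMAT, String.join(",", exprs));
    }
}
```

Feeding the resulting query through parse/validate/rel yields a LogicalProject whose projection list holds one RexNode per expression, from which the type is read.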
For comparison, the handling of computed columns before [FLINK-18378] distinguished proctime and rowtime separately:
Open question: the DDL phase has already converted proctime to a DataType, so when validation fetches the table schema it could seemingly use fieldType directly. Why parse the expression again?
```java
for (int i = 0; i < tableSchema.getFieldCount(); ++i) {
    TableColumn tableColumn = tableSchema.getTableColumns().get(i);
    DataType fieldType = fieldTypes[i];
    if (tableColumn.isGenerated() && isProctimeType(tableColumn.getExpr().get(), tableSchema)) {
        if (fieldNames[i].equals(rowtime)) {
            throw new TableException("Watermark can not be defined for a processing time attribute column.");
        }
        TimestampType originalType = (TimestampType) fieldType.getLogicalType();
        LogicalType proctimeType = new TimestampType(
                originalType.isNullable(),
                TimestampKind.PROCTIME,
                originalType.getPrecision());
        fieldType = TypeConversions.fromLogicalToDataType(proctimeType);
    } else if (isStreamingMode && fieldNames[i].equals(rowtime)) {
        TimestampType originalType = (TimestampType) fieldType.getLogicalType();
        LogicalType rowtimeType = new TimestampType(
                originalType.isNullable(),
                TimestampKind.ROWTIME,
                originalType.getPrecision());
        fieldType = TypeConversions.fromLogicalToDataType(rowtimeType);
    }
    if (tableColumn.isGenerated()) {
        builder.field(fieldNames[i], fieldType, tableColumn.getExpr().get());
    } else {
        builder.field(fieldNames[i], fieldType);
    }
}
```
Author: todd5167
Link: https://www.jianshu.com/p/30c40c7aae30