10分钟教你写一个数据库

今天教大家借助一款框架快速实现一个数据库,这个框架就是 Calcite,下面会带大家通过两个例子快速教会大家怎么实现,一个是可以通过 SQL 语句的方式可以直接查询文件内容,第二个是模拟 Mysql 查询功能,以及最后告诉大家怎么实现 SQL 查询 Kafka 数据。

Calcite

Calcite 是一个用于优化异构数据源的查询处理的可插拔基础框架(他是一个框架),可以将任意数据(Any data, Anywhere)DML 转换成基于 SQL 的 DML 引擎,并且我们可以选择性的使用它的部分功能。

Calcite能干什么

  1. 使用 SQL 访问内存中某个数据
  2. 使用 SQL 访问某个文件的数据
  3. 跨数据源的数据访问、聚合、排序等(例如 Mysql 和 Redis 数据源中的数据进行join)

当我们需要自建一个数据库的时候,数据可以为任何格式的,比如text、word、xml、mysql、es、csv、第三方接口数据等等,我们只有数据,我们想让这些数据支持 SQL 形式动态增删改查。

另外,像Hive、Drill、Flink、Phoenix 和 Storm 等项目中,数据处理系统都是使用 Calcite 来做 SQL 解析和查询优化,当然,还有部分用来构建自己的 JDBC driver。

名词解释

Token

就是将标准 SQL(可以理解为Mysql)关键词以及关键词之间的字符串截取出来,每一个 token,会被封装为一个 SqlNodeSqlNode会衍生很多子类,比如 Select会被封装为 SqlSelect,当前 SqlNode 也能反解析为 SQL 文本。

RelDataTypeField

某个字段的名称和类型信息

RelDataType

多个 RelDataTypeField 组成了 RelDataType,可以理解为数据行

Table

一个完整的表的信息

Schema

所有元数据的组合,可以理解为一组 Table 或者库的概念

开始使用

1. 引入包


    org.apache.calcite
    calcite-core

    1.32.0

2. 创建model.json文件和表结构csv

model.json 里面主要描述或者说告诉 Calcite 如何创建 Schema,也就是告诉框架怎么创建出库。

{
"version": "1.0",//忽略
"defaultSchema": "CSV",//设置默认的schema
"schemas": [//可定义多个schema
        {
          "name": "CSV",//相当于namespace和上面的defaultSchema的值对应
          "type": "custom",//写死
          "factory": "csv.CsvSchemaFactory",//factory的类名必须是你自己实现的factory的包的全路径
          "operand": { //这里可以传递自定义参数,最终会以map的形式传递给factory的operand参数
          "directory": "csv"//directory代表calcite会在resources下面的csv目录下面读取所有的csv文件,factory创建的Schema会吧这些文件全部构建成Table,可以理解为读取数据文件的根目录,当然key的名称也不一定非得用directory,你可以随意指定
                }
        }
      ]
}

接下来还需要定义一个 csv 文件,用来定义表结构。

NAME:string,MONEY:string
aixiaoxian,10000万
xiaobai,10000万
adong,10000万
maomao,10000万
xixi,10000万
zizi,10000万
wuwu,10000万
kuku,10000万

整个项目的结构大概就是这样:

10分钟教你写一个数据库

3. 实现Schema的工厂类

在上述文件中指定的包路径下去编写 CsvSchemaFactory 类,实现 SchemaFactory 接口,并且实现里面唯一的方法 create 方法,创建 Schema(库)。

public class CsvSchemaFactory implements SchemaFactory {
    /**
     * parentSchema 父节点,一般为root
     * name 为model.json中定义的名字
     * operand 为model.json中定于的数据,这里可以传递自定义参数
     *
     * @param parentSchema Parent schema
     * @param name         Name of this schema
     * @param operand      The "operand" JSON property
     * @return
     */
    @Override
    public Schema create(SchemaPlus parentSchema, String name,
                         Map operand) {
        final String directory = (String) operand.get("directory");
        File directoryFile = new File(directory);
        return new CsvSchema(directoryFile, "scannable");
    }
}

4. 自定义Schma类

有了 SchemaFactory,接下来需要自定义 Schema 类。

自定义的 Schema 需要实现 Schema 接口,但是直接实现要实现的方法太多,我们去实现官方的 AbstractSchema 类,这样就只需要实现一个方法就行(如果有其他定制化需求可以实现原生接口)。

核心的逻辑就是 createTableMap方法,用于创建出 Table 表。

他会扫描指定的 Resource下面的所有 csv 文件,将每个文件映射成 Table对象,最终以 map形式返回, Schema接口的其他几个方法会用到这个对象。

//实现这一个方法就行了
    @Override
    protected Map getTableMap() {
        if (tableMap == null) {
            tableMap = createTableMap();
        }
        return tableMap;
    }
  private Map createTableMap() {
        // Look for files in the directory ending in ".csv"
        final Source baseSource = Sources.of(directoryFile);
        //会自动过滤掉非指定文件后缀的文件,我这里写的csv
        File[] files = directoryFile.listFiles((dir, name) -> {
            final String nameSansGz = trim(name, ".gz");
            return nameSansGz.endsWith(".csv");
        });
        if (files == null) {
            System.out.println("directory " + directoryFile + " not found");
            files = new File[0];
        }
        // Build a map from table name to table; each file becomes a table.
        final ImmutableMap.Builder builder = ImmutableMap.builder();
        for (File file : files) {
            Source source = Sources.of(file);
            final Source sourceSansCsv = source.trimOrNull(".csv");
            if (sourceSansCsv != null) {
                final Table table = createTable(source);
                builder.put(sourceSansCsv.relative(baseSource).path(), table);
            }
        }
        return builder.build();
    }

5. 自定义 Table

Schema 有了,并且数据文件 csv 也映射成 Table 了,一个 csv文件对应一个 Table

接下来我们去自定义 Table,自定义 Table 的核心是我们要定义字段的类型和名称,以及如何读取 csv文件。

  1. 先获取数据类型和名称,即单表结构,从 csv文件头中获取(当前文件头需要我们自己定义,包括规则我们也可以定制化)。
/**
 * Base class for table that reads CSV files.
 */
public abstract class CsvTable extends AbstractTable {
    protected final Source source;
    protected final @Nullable RelProtoDataType protoRowType;
    private @Nullable RelDataType rowType;
    private @Nullable List fieldTypes;

    /**
     * Creates a CsvTable.
     */
    CsvTable(Source source, @Nullable RelProtoDataType protoRowType) {
        this.source = source;
        this.protoRowType = protoRowType;
    }
  /**
  * 创建一个CsvTable,继承AbstractTable,需要实现里面的getRowType方法,此方法就是获取当前的表结构。
   Table的类型有很多种,比如还有视图类型,AbstractTable类中帮我们默认实现了Table接口的一些方法,比如getJdbcTableType   方法,默认为Table类型,如果有其他定制化需求可直接实现Table接口。
   和AbstractSchema很像
  */
    @Override
    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
        if (protoRowType != null) {
            return protoRowType.apply(typeFactory);
        }
        if (rowType == null) {
            rowType = CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source,
                    null);
        }
        return rowType;
    }

    /**
     * Returns the field types of this CSV table.
     */
    public List getFieldTypes(RelDataTypeFactory typeFactory) {
        if (fieldTypes == null) {
            fieldTypes = new ArrayList<>();
            CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source,
                    fieldTypes);
        }
        return fieldTypes;
    }

   public static RelDataType deduceRowType(JavaTypeFactory typeFactory,
                                            Source source, @Nullable List fieldTypes) {
        final List types = new ArrayList<>();
        final List names = new ArrayList<>();
        try (CSVReader reader = openCsv(source)) {
            String[] strings = reader.readNext();
            if (strings == null) {
                strings = new String[]{"EmptyFileHasNoColumns:boolean"};
            }
            for (String string : strings) {
                final String name;
                final RelDataType fieldType;
                //就是简单的读取字符串冒号前面是名称,冒号后面是类型
                final int colon = string.indexOf(':');
                if (colon >= 0) {
                    name = string.substring(0, colon);
                    String typeString = string.substring(colon + 1);
                    Matcher decimalMatcher = DECIMAL_TYPE_PATTERN.matcher(typeString);
                    if (decimalMatcher.matches()) {
                        int precision = Integer.parseInt(decimalMatcher.group(1));
                        int scale = Integer.parseInt(decimalMatcher.group(2));
                        fieldType = parseDecimalSqlType(typeFactory, precision, scale);
                    } else {
                        switch (typeString) {
                            case "string":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR);
                                break;
                            case "boolean":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BOOLEAN);
                                break;
                            case "byte":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TINYINT);
                                break;
                            case "char":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.CHAR);
                                break;
                            case "short":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.SMALLINT);
                                break;
                            case "int":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.INTEGER);
                                break;
                            case "long":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BIGINT);
                                break;
                            case "float":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.REAL);
                                break;
                            case "double":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DOUBLE);
                                break;
                            case "date":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DATE);
                                break;
                            case "timestamp":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIMESTAMP);
                                break;
                            case "time":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIME);
                                break;
                            default:
                                LOGGER.warn(
                                        "Found unknown type: {} in file: {} for column: {}. Will assume the type of "
                                                + "column is string.",
                                        typeString, source.path(), name);
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR);
                                break;
                        }
                    }
                } else {
                    //  如果没定义,默认都是String类型,字段名称也是string
                    name = string;
                    fieldType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
                }
                names.add(name);
                types.add(fieldType);
                if (fieldTypes != null) {
                    fieldTypes.add(fieldType);
                }
            }
        } catch (IOException e) {
            // ignore
        }
        if (names.isEmpty()) {
            names.add("line");
            types.add(typeFactory.createSqlType(SqlTypeName.VARCHAR));
        }
        return typeFactory.createStructType(Pair.zip(names, types));
    }
}
  1. 获取文件中的数据,上面把 Table的表结构字段名称和类型都获取到了以后,就剩最后一步了,获取文件中的数据。我们需要自定义一个类,实现 ScannableTable 接口,并且实现里面唯一的方法 scan 方法,其实本质上就是读文件,然后把文件的每一行的数据和上述获取的 fileType 进行匹配。
@Override
    public Enumerable scan(DataContext root) {
        JavaTypeFactory typeFactory = root.getTypeFactory();
        final List fieldTypes = getFieldTypes(typeFactory);
        final List fields = ImmutableIntList.identity(fieldTypes.size());
        final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
        return new AbstractEnumerable() {
            @Override
            public Enumerator enumerator() {
                //返回我们自定义的读取数据的类
                return new CsvEnumerator<>(source, cancelFlag, false, null,
                        CsvEnumerator.arrayConverter(fieldTypes, fields, false));
            }
        };
    }

 public CsvEnumerator(Source source, AtomicBoolean cancelFlag, boolean stream,
                         @Nullable String @Nullable [] filterValues, RowConverter rowConverter) {
        this.cancelFlag = cancelFlag;
        this.rowConverter = rowConverter;
        this.filterValues = filterValues == null ? null
                : ImmutableNullableList.copyOf(filterValues);
        try {

            this.reader = openCsv(source);
            //跳过第一行,因为第一行是定义类型和名称的
            this.reader.readNext(); // skip header row
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
//CsvEnumerator必须实现calcit自己的迭代器,里面有current、moveNext方法,current是返回当前游标所在的数据记录,moveNext是将游标指向下一个记录,官网中自己定义了一个类型转换器,是将csv文件中的数据转换成文件头指定的类型,这个需要我们自己来实现
     @Override
    public E current() {
        return castNonNull(current);
    }

    @Override
    public boolean moveNext() {
        try {
            outer:
            for (; ; ) {
                if (cancelFlag.get()) {
                    return false;
                }
                final String[] strings = reader.readNext();
                if (strings == null) {
                    current = null;
                    reader.close();
                    return false;
                }
                if (filterValues != null) {
                    for (int i = 0; i 

6. 最后

至此我们需要准备的东西:库、表名称、字段名称、字段类型都有了,接下来我们去写我们的 SQL 语句查询我们的数据文件。

创建好几个测试的数据文件,例如上面项目结构中我创建 2 个 csv 文件 USERINFO.csvASSET.csv,然后创建测试类。

这样跑起来,就可以通过 SQL 语句的方式直接查询数据了。

public class Test {
    public static void main(String[] args) throws SQLException {
        Connection connection = null;
        Statement statement = null;
        try {
            Properties info = new Properties();
            info.put("model", Sources.of(Test.class.getResource("/model.json")).file().getAbsolutePath());
            connection = DriverManager.getConnection("jdbc:calcite:", info);
            statement = connection.createStatement();
            print(statement.executeQuery("select * from asset "));

            print(statement.executeQuery(" select * from userinfo "));

            print(statement.executeQuery(" select age from userinfo where name ='aixiaoxian' "));

            print(statement.executeQuery(" select * from userinfo where age >60 "));

            print(statement.executeQuery(" select * from userinfo where name like 'a%' "));
        } finally {
            connection.close();
        }
    }

    private static void print(ResultSet resultSet) throws SQLException {
        final ResultSetMetaData metaData = resultSet.getMetaData();
        final int columnCount = metaData.getColumnCount();
        while (resultSet.next()) {
            for (int i = 1; ; i++) {
                System.out.print(resultSet.getString(i));
                if (i 

查询结果:

10分钟教你写一个数据库

这里在测试的时候踩到2个坑,大家如果自己实验的时候可以避免下。

  1. Calcite 默认会把你的 SQL 语句中的表名和类名全部转换为大写,因为默认的 csv(其他文件也一样)文件的名称就是表名,除非你自定义规则,所以你的文件名要写成大写。
  2. Calcite 有一些默认的关键字不能用作表名,不然会查询失败,比如我刚开始定的 user.csv就一直查不出来,改成 USERINFO就可以了,这点和 Mysql 的内置关键字差不多,也可以通过个性化配置去改。

演示Mysql

  1. 首先,还是先准备 Calcite 需要的东西:库、表名称、字段名称、字段类型。

如果数据源使用 Mysql的话,这些都不用我们去 JAVA 服务中去定义,直接在 Mysql 客户端创建好,这里直接创建两张表用于测试,就和我们的 csv文件一样。

CREATE TABLE USERINFO1 (
  NAME varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL,
  AGE int DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;

CREATE TABLE ASSET (
  NAME varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL,
  MONEY varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
  1. 上述 csv 案例中的 SchemaFactory 以及 Schema 这些都不需要创建,因为 Calcite 默认提供了 Mysql 的 Adapter适配器。
  2. 其实,上述两步都不需要做,我们真正要做的是,告诉 Calcite 你的 JDBC 的连接信息就行了,也是在 model.json 文件中定义。
{
  "version": "1.0",
  "defaultSchema": "Demo",
  "schemas": [
    {
      "name": "Demo",
      "type": "custom",
    //  这里是calcite默认的SchemaFactory,里面的流程和我们上述自己定义的相同,下面会简单看看源码。
      "factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",
      "operand": {
        //  我用的是mysql8以上版本,所以这里注意包的名称
        "jdbcDriver": "com.mysql.cj.jdbc.Driver",
        "jdbcUrl": "jdbc:mysql://localhost:3306/irving",
        "jdbcUser": "root",
        "jdbcPassword": "123456"
      }
    }
  ]
}
  1. 在项目中引入 Mysql 的驱动包

  mysql
  mysql-connector-java
  8.0.30
  1. 写好测试类,这样直接就相当于完成了所有的功能了。
public class TestMysql {
    public static void main(String[] args) throws SQLException {
        Connection connection = null;
        Statement statement = null;
        try {
            Properties info = new Properties();
            info.put("model", Sources.of(TestMysql.class.getResource("/mysqlmodel.json")).file().getAbsolutePath());
            connection = DriverManager.getConnection("jdbc:calcite:", info);
            statement = connection.createStatement();
            statement.executeUpdate(" insert into  userinfo1 values ('xxx',12) ");
            print(statement.executeQuery("select * from asset "));

            print(statement.executeQuery(" select * from userinfo1 "));

            print(statement.executeQuery(" select age from userinfo1 where name ='aixiaoxian' "));

            print(statement.executeQuery(" select * from userinfo1 where age >60 "));

            print(statement.executeQuery(" select * from userinfo1 where name like 'a%' "));
        } finally {
            connection.close();
        }

    }

    private static void print(ResultSet resultSet) throws SQLException {
        final ResultSetMetaData metaData = resultSet.getMetaData();
        final int columnCount = metaData.getColumnCount();
        while (resultSet.next()) {
            for (int i = 1; ; i++) {
                System.out.print(resultSet.getString(i));
                if (i 

查询结果:

10分钟教你写一个数据库

Mysql实现原理

上述我们在 model.json 文件中指定了 org.apache.calcite.adapter.jdbc.JdbcSchema$Factory类,可以看下这个类的代码。

这个类是把 FactorySchema 写在了一起,其实就是调用 schemafactory类的 create方法创建一个 schema 出来,和我们上面自定义的流程是一样的。

其中 JdbcSchema类也是 Schema 的子类,所以也会实现 getTable方法(这个我们上述也实现了,我们当时是获取表结构和表的字段类型以及名称,是从csv文件头中读文件的), JdbcSchema的实现是通过连接 Mysql 服务端查询元数据信息,再将这些信息封装成 Calcite需要的对象格式。

这里同样要注意 csv方式的2个注意点,大小写和关键字问题。

public static JdbcSchema create(
      SchemaPlus parentSchema,
      String name,
      Map operand) {
    DataSource dataSource;
    try {
      final String dataSourceName = (String) operand.get("dataSource");
      if (dataSourceName != null) {
        dataSource =
            AvaticaUtils.instantiatePlugin(DataSource.class, dataSourceName);
      } else {
        //会走在这里来,这里就是我们在model.json中指定的jdbc的连接信息,最终会创建一个datasource
        final String jdbcUrl = (String) requireNonNull(operand.get("jdbcUrl"), "jdbcUrl");
        final String jdbcDriver = (String) operand.get("jdbcDriver");
        final String jdbcUser = (String) operand.get("jdbcUser");
        final String jdbcPassword = (String) operand.get("jdbcPassword");
        dataSource = dataSource(jdbcUrl, jdbcDriver, jdbcUser, jdbcPassword);
      }
    } catch (Exception e) {
      throw new RuntimeException("Error while reading dataSource", e);
    }
    String jdbcCatalog = (String) operand.get("jdbcCatalog");
    String jdbcSchema = (String) operand.get("jdbcSchema");
    String sqlDialectFactory = (String) operand.get("sqlDialectFactory");

    if (sqlDialectFactory == null || sqlDialectFactory.isEmpty()) {
      return JdbcSchema.create(
          parentSchema, name, dataSource, jdbcCatalog, jdbcSchema);
    } else {
      SqlDialectFactory factory = AvaticaUtils.instantiatePlugin(
          SqlDialectFactory.class, sqlDialectFactory);
      return JdbcSchema.create(
          parentSchema, name, dataSource, factory, jdbcCatalog, jdbcSchema);
    }
  }

  @Override public @Nullable Table getTable(String name) {
    return getTableMap(false).get(name);
  }

  private synchronized ImmutableMap getTableMap(
      boolean force) {
    if (force || tableMap == null) {
      tableMap = computeTables();
    }
    return tableMap;
  }

  private ImmutableMap computeTables() {
    Connection connection = null;
    ResultSet resultSet = null;
    try {
      connection = dataSource.getConnection();
      final Pair catalogSchema = getCatalogSchema(connection);
      final String catalog = catalogSchema.left;
      final String schema = catalogSchema.right;
      final Iterable tableDefs;
      Foo threadMetadata = THREAD_METADATA.get();
      if (threadMetadata != null) {
        tableDefs = threadMetadata.apply(catalog, schema);
      } else {
        final List tableDefList = new ArrayList<>();
        //  获取元数据
        final DatabaseMetaData metaData = connection.getMetaData();
        resultSet = metaData.getTables(catalog, schema, null, null);
        while (resultSet.next()) {
        //获取库名,表明等信息
          final String catalogName = resultSet.getString(1);
          final String schemaName = resultSet.getString(2);
          final String tableName = resultSet.getString(3);
          final String tableTypeName = resultSet.getString(4);
          tableDefList.add(
              new MetaImpl.MetaTable(catalogName, schemaName, tableName,
                  tableTypeName));
        }
        tableDefs = tableDefList;
      }

      final ImmutableMap.Builder builder =
          ImmutableMap.builder();
      for (MetaImpl.MetaTable tableDef : tableDefs) {
        final String tableTypeName2 =
            tableDef.tableType == null
            ? null
            : tableDef.tableType.toUpperCase(Locale.ROOT).replace(' ', '_');
        final TableType tableType =
            Util.enumVal(TableType.OTHER, tableTypeName2);
        if (tableType == TableType.OTHER  && tableTypeName2 != null) {
          System.out.println("Unknown table type: " + tableTypeName2);
        }
        //  最终封装成JdbcTable对象
        final JdbcTable table =
            new JdbcTable(this, tableDef.tableCat, tableDef.tableSchem,
                tableDef.tableName, tableType);
        builder.put(tableDef.tableName, table);
      }
      return builder.build();
    } catch (SQLException e) {
      throw new RuntimeException(
          "Exception while reading tables", e);
    } finally {
      close(connection, null, resultSet);
    }
  }

SQL执行流程

OK,到这里基本上两个简单的案例已经演示好了,最后补充一下整个 Calcite架构和整个 SQL 的执行流程。

10分钟教你写一个数据库

整个流程如下:SQL解析(Parser)=> SQL校验(Validator)=> SQL查询优化(optimizer)=> SQL生成 => SQL执行

SQL Parser

所有的 SQL 语句在执行前都需要经历 SQL 解析器解析,解析器的工作内容就是将 SQL 中的 Token 解析成抽象语法树,每个树的节点都是一个 SqlNode,这个过程其实就是 Sql Text => SqlNode 的过程。

我们前面的 Demo 没有自定义 Parser,是因为 Calcite 采用了自己默认的 Parser(SqlParserImpl)。

10分钟教你写一个数据库
SqlNode

SqlNode是整个解析中的核心,比如图中你可以发现,对于每个比如 selectfromwhere关键字之后的内容其实都是一个 SqlNode

10分钟教你写一个数据库
parserConfig方法主要是设置 SqlParserFactory 的参数,比如我们上面所说的我本地测试的时候踩的大小写的坑,就可以在这里设置。

直接调用 setCaseSensitive=false即不会将 SQL 语句中的表名列名转为大写,下面是默认的,其他的参数可以按需配置。

10分钟教你写一个数据库

SQL Validator

SQL 语句先经过 Parser,然后经过语法验证器,注意 Parser 并不会验证语法的正确性。

10分钟教你写一个数据库

真正的校验在 validator 中,会去验证查询的表名是否存在,查询的字段是否存在,类型是否匹配,这个过程比较复杂,默认的 validatorSqlValidatorImpl

查询优化

比如关系代数,比如什么投影、笛卡尔积这些, Calcite提供了很多内部的优化器,也可以实现自己的优化器。

适配器

Calcite 是不包含存储层的,所以提供一种适配器的机制来访问外部的数据存储或者存储引擎。

最后,进阶

官网里面写了未来会支持 Kafka适配器到公共 Api中,到时候使用起来就和上述集成 Mysql一样方便,但是现在还没有支持,我这里给大家提供个自己实现的方式,这样就可以通过 SQL 的方式直接查询 Kafka 中的 Topic 数据等信息。

这里我们内部集成实现了 KSQL的能力,查询结果是OK的。

10分钟教你写一个数据库

还是像上述步骤一样,我们需要准备库、表名称、字段名称、字段类型、数据源(多出来的地方)。

  1. 自定义 Sql解析,之前我们都没有自定义解析,这里需要自定义解析,是因为我需要动态解析 sqlwhere条件里面的 partation

  2. 配置解析器,就是之前案例中提到的配置大小写之类的

  3. 创建解析器,使用的默认 SqlParseImpl
  4. 开始解析,生成 AST,我们可以基于生成的 SqlNode做一些业务相关的校验和参数解析

  5. 适配器获取数据源

public class KafkaConsumerAdapter {
       public static List executor(KafkaSqlInfo kafkaSql) {
           Properties props = new Properties();
           props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaSql.getSeeds());
           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
           props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
           KafkaConsumer consumer = new KafkaConsumer<>(props);
           List topics = new ArrayList<>();
           for (Integer partition : kafkaSql.getPartition()) {
               TopicPartition tp = new TopicPartition(kafkaSql.getTableName(), partition);
               topics.add(tp);
           }
           consumer.assign(topics);
           for (TopicPartition tp : topics) {
               Map offsets = consumer.endOffsets(Collections.singleton(tp));
               long position = 500;
               if (offsets.get(tp).longValue() > position) {
                   consumer.seek(tp, offsets.get(tp).longValue() - 500);
               } else {
                   consumer.seek(tp, 0);
               }
           }
           List results = new ArrayList<>();
           boolean flag = true;
           while (flag) {
               ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
               for (ConsumerRecord record : records) {
                   //转成我定义的对象集合
                   KafkaResult result = new KafkaResult();
                   result.setPartition(record.partition());
                   result.setOffset(record.offset());
                   result.setMsg(record.value());
                   result.setKey(record.key());
                   results.add(result);
               }
               if (!records.isEmpty()) {
                   flag = false;
               }
           }
           consumer.close();
           return results;
       }

   }
  1. 执行查询,就可以得到我们想要的效果了。
public class TestKafka {
       public static void main(String[] args) throws Exception {
           KafkaService kafkaService = new KafkaService();
           //把解析到的参数放在我自己定义的kafkaSqlInfo对象中
           KafkaSqlInfo sqlInfo = kafkaService.parseSql("select * from cmdb-calltopo where partition in (0,1,2) limit 1000 ");
           //适配器获取数据源,主要是从上述的sqlInfo对象中去poll数据
           List results = KafkaConsumerAdapter.executor(sqlInfo);
           //执行查询
           query(sqlInfo.getTableName(), results, sqlInfo.getSql());

           sqlInfo = kafkaService.parseSql("select * from cmdb-calltopo where partition in (0,1,2) AND msg like '%account%'  limit 1000 ");
           results = KafkaConsumerAdapter.executor(sqlInfo);
           query(sqlInfo.getTableName(), results, sqlInfo.getSql());

           sqlInfo = kafkaService.parseSql("select count(*) AS addad  from cmdb-calltopo where partition in (0,1,2) limit 1000 ");
           results = KafkaConsumerAdapter.executor(sqlInfo);
           query(sqlInfo.getTableName(), results, sqlInfo.getSql());
       }

       private static void query(String tableName, List results,
                                 String sql) throws Exception {
           //创建model.json,设置我的SchemaFactory,设置库名
           String model = createTempJson();
           //设置我的表结构,表名称和表字段名以及类型
           KafkaTableSchema.generateSchema(tableName, results);
           Properties info = new Properties();
           info.setProperty("lex", Lex.JAVA.toString());
           Connection connection = DriverManager.getConnection(Driver.CONNECT_STRING_PREFIX + "model=inline:" + model, info);
           Statement st = connection.createStatement();
           //执行
           ResultSet result = st.executeQuery(sql);
           ResultSetMetaData rsmd = result.getMetaData();
           List> ret = new ArrayList<>();
           while (result.next()) {
               Map map = new LinkedHashMap<>();
               for (int i = 1; i 
  • 生成临时的 model.json,之前是基于文件,现在基于 text字符串, mode=inline模式
  • 设置我的表结构、表名称、字段名、字段类型等,并放置在内存中,同时将适配器查询出来的数据也放进去 table里面
  • 获取连接,执行查询,完美!

10分钟教你写一个数据库

Original: https://blog.csdn.net/awl910213/article/details/127293683
Author: 艾小仙
Title: 10分钟教你写一个数据库

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/817338/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球