阅读 22 SEO

ClickHouse - 06

9、框架整合


9.1、Java 读写 ClickHouse API


Maven 依赖

<!-- https://mvnrepository.com/artifact/ru.yandex.clickhouse/clickhouse-jdbc -->
<dependency>
  <groupId>ru.yandex.clickhouse</groupId>
  <artifactId>clickhouse-jdbc</artifactId>
  <version>0.2.6</version>
</dependency>

Java 读取 ClickHouse 数据

ClickHouseProperties props = new ClickHouseProperties();
props.setUser("");
props.setPassword("");
BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node01:8123,node02:8123,node03:8123/default", props);
ClickHouseConnection conn = dataSource.getConnection();
ClickHouseStatement statement = conn.createStatement();
ResultSet rs = statement.executeQuery("select id, name from users");
while (rs.next()) {
  int id = rs.getInt("id");
  String name = rs.getString("name");
  System.out.println("id = " + id + ",name = " + name);
}
rs.close();
statement.close();
conn.close();

Java 向 ClickHouse 写入数据

ClickHouseProperties props = new ClickHouseProperties();
props.setUser("");
props.setPassword("");
BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://192.168.110.150:8123/default", props);
ClickHouseConnection conn = dataSource.getConnection();
PreparedStatement statement = conn.prepareStatement("insert into test values(?, ?, ?)");
statement.setInt(1, 100);
statement.setString(2, "张三");
statement.setInt(3, 30);
boolean success = statement.execute();
System.out.println(success);
statement.close();
conn.close();

9.2、Spark 写入 ClickHouse API


SparkCore 写入 ClickHouse,可以直接采用写入方式。下面案例是使用 SparkSQL 将结果存入 ClickHouse 对应的表中。在 ClickHouse 中需要预先创建好对应的结果表。

Maven 依赖

<dependency>
  <groupId>ru.yandex.clickhouse</groupId>
  <artifactId>clickhouse-jdbc</artifactId>
  <version>0.2.6</version>
  <exclusions>
    <exclusion>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
    </exclusion>
    <exclusion>
      <groupId>net.jpountz.lz4</groupId>
      <artifactId>lz4</artifactId>
    </exclusion>
  </exclusions>
</dependency>

Spark 代码

import org.apache.spark.sql._
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import java.util.Properties

object ClickHouse {

  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().master("local").appName("test").getOrCreate()
    val jsonList: Seq[String] = List[String](
      "{\"id\":1,\"name\":\"张三\",\"age\":18}",
      "{\"id\":2,\"name\":\"李四\",\"age\":19}",
      "{\"id\":3,\"name\":\"王五\",\"age\":20}"
    )

    //将jsonList数据转换成DataSet
    import session.implicits._
    val ds: Dataset[String] = jsonList.toDS()

    val df: DataFrame = session.read.json(ds)
    df.show()

    //将结果写往ClickHouse
    val url = "jdbc:clickhouse://node01:8123/default"
    val table = "test"

    val properties = new Properties()
    properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
    properties.put("user", "default")
    properties.put("password", "")
    properties.put("socket_timeout", "300000")

    df.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url, table, properties)
  }

}

9.3、Flink 写入 ClickHouse API


可以通过 Flink 原生 JDBC Connector 包将 Flink 结果写入 ClickHouse 中,Flink在 1.11.0 版本对其 JDBC Connnector 进行了重构:

  • 重构之前(1.10.x 及之前版本),包名为 flink-jdbc
  • 重构之后(1.11.x 及之后版本),包名为 flink-connector-jdbc

二者对 Flink 中以不同方式写入 ClickHouse Sink 的支持情况如下:

API名称 flink-jdbc flink-connector-jdbc
DataStream API 不支持 支持
Table API & SQL 支持 不支持

flink-jdbc

Maven 依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.9.1</version>
</dependency>

<!--添加 Flink JDBC 以及 Clickhouse JDBC Driver 相关的依赖 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.4</version>
</dependency>

Flink 代码

/**
  *  通过 flink-jdbc API 将 Flink 数据结果写入到ClickHouse中,只支持Table API
  *
  *  注意:
  *   1.由于 ClickHouse 单次插入的延迟比较高,我们需要设置 BatchSize 来批量插入数据,提高性能。
  *   2.在 JDBCAppendTableSink 的实现中,若最后一批数据的数目不足 BatchSize,则不会插入剩余数据。
  */

case class PersonInfo(id:Int,name:String,age:Int)

object FlinkWriteToClickHouse1 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //设置并行度为1,后期每个并行度满批次需要的条数时,会插入click中
    env.setParallelism(1)
    val settings: EnvironmentSettings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env,settings)

    //导入隐式转换
    import org.apache.flink.streaming.api.scala._

    //读取Socket中的数据
    val sourceDS: DataStream[String] = env.socketTextStream("node5",9999)
    val ds: DataStream[PersonInfo] = sourceDS.map(line => {
      val arr: Array[String] = line.split(",")
      PersonInfo(arr(0).toInt, arr(1), arr(2).toInt)
    })

    //将 ds 转换成 table 对象
    import org.apache.flink.table.api.scala._
    val table: Table = tableEnv.fromDataStream(ds,'id,'name,'age)

    //将table 对象写入ClickHouse中
    //需要在ClickHouse中创建表:create table flink_result(id Int,name String,age Int) engine = MergeTree() order by id;
    val insertIntoCkSql = "insert into flink_result (id,name,age) values (?,?,?)"

    //准备ClickHouse table sink
    val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
      .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
      .setDBUrl("jdbc:clickhouse://node1:8123/default")
      .setUsername("default")
      .setPassword("")
      .setQuery(insertIntoCkSql)
      .setBatchSize(2) //设置批次量,默认5000条
      .setParameterTypes(Types.INT, Types.STRING, Types.INT)
      .build()

    //注册ClickHouse table Sink,设置sink 数据的字段及Schema信息
    tableEnv.registerTableSink("ck-sink",
      sink.configure(Array("id", "name", "age"),Array(Types.INT, Types.STRING, Types.INT)))

    //将数据插入到 ClickHouse Sink 中
    tableEnv.insertInto(table,"ck-sink")

    //触发以上执行
    env.execute("Flink Table API to ClickHouse Example")

  }
}

flink-connector-jdbc

Maven 依赖

<!-- Flink1.11 后需要 Flink-client包-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.11.3</version>
</dependency>
<!--添加 Flink Table API 相关的依赖 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.11.3</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>1.11.3</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.11.3</version>
</dependency>
<!--添加 Flink JDBC Connector 以及 Clickhouse JDBC Driver 相关的依赖 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.11.3</version>
</dependency>
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.4</version>
</dependency>

Flink 代码

/**
  *  Flink 通过 flink-connector-jdbc 将数据写入ClickHouse ,目前只支持DataStream API
  */
object FlinkWriteToClickHouse2 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //设置并行度为1
    env.setParallelism(1)
    import org.apache.flink.streaming.api.scala._

    val ds: DataStream[String] = env.socketTextStream("node5",9999)

    val result: DataStream[(Int, String, Int)] = ds.map(line => {
      val arr: Array[String] = line.split(",")
      (arr(0).toInt, arr(1), arr(2).toInt)
    })

    //准备向ClickHouse中插入数据的sql
    val insetIntoCkSql = "insert into flink_result (id,name,age) values (?,?,?)"

    //设置ClickHouse Sink
    val ckSink: SinkFunction[(Int, String, Int)] = JdbcSink.sink(
      //插入数据SQL
      insetIntoCkSql,

      //设置插入ClickHouse数据的参数
      new JdbcStatementBuilder[(Int, String, Int)] {
        override def accept(ps: PreparedStatement, tp: (Int, String, Int)): Unit = {
          ps.setInt(1, tp._1)
          ps.setString(2, tp._2)
          ps.setInt(3, tp._3)
        }
      },
      //设置批次插入数据
      new JdbcExecutionOptions.Builder().withBatchSize(5).build(),

      //设置连接ClickHouse的配置
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
        .withUrl("jdbc:clickhouse://node1:8123/default")
        .withUsername("default")
        .withUsername("")
        .build()
    )

    //针对数据加入sink
    result.addSink(ckSink)

    env.execute("Flink DataStream to ClickHouse Example")

  }
}

10、可视化工具


TABiX 支持通过浏览器直接连接 ClickHouse,不需要安装其他软件,就可以访问 ClickHouse。有两种使用方式,一种是直接浏览器访问配置。另一种是使用ClickHouse 内嵌方式。

Tabix具有以下特点:

  • ⾼亮语法的编辑器。
  • ⾃动命令补全。
  • 查询命令执⾏的图形分析⼯具。
  • 配⾊⽅案选项。

直接浏览器访问

打开http://ui.tabix.io/,配置 ClickHouse。

连接 ClickHouse 的用户名默认为 default,密码默认为空。

访问 ClickHouse 端口默认为 8123

ClickHouse内嵌方式

ClickHouse 自带了配置连接 TABiX,这里通过 ClickHouse Server 节点访问http://ui.tabix.io/网址进行 ClickHouse 界面化操作,可以进入 ClickHouse Server 节点路径 /etc/clickhouse-server,配置 config.xml 解开“<http_server_default_response>”标签。

重启 ClickHouse Server,可以通过http://node01:8123来访问 TABiX。

文章分类
后端
文章标签
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 gxwowoo@163.com 举报,一经查实,本站将立刻删除。
相关推荐