clickhouse(一)环境安装&操作分布式表

clickhouse是当下最流行的OLAP产品,我总结其代表能力体现为:

  • 存储数据:与presto等直接读取外部数据进行计算的方式不同,clickhouse大部分情况是冗余存储一份数据的,所以clickhouse需要提供多种数据集成的方案与生态。
  • 即席查询:典型ad-hoc产品,海量数据秒出数据,计算能力可以扩充计算节点实现,可以用作实时数仓(其他常见方案是hbase->hive->presto)。

在学习过程中,我发现clickhouse架构的自动化程度比较有限(可能还需要一段时间的发展才能成熟),需要用户理解的概念比较庞杂,属于入门门槛挺高的一款产品,所以我会尝试通过几篇博客来拎清这些概念之间的关系,帮助大家克服学习困难。


本篇博客我们完成2个任务:

  • 搭建只包含1个node的clickhouse集群,虽然只有1个节点但是我们会按照分布式集群的搭建原理来展示这个过程,扩展到多个节点也是毫无问题的。
  • 通过1个简单案例快速理解clickhouse的建表语句、插入语句、查询语句,重点理解clickhouse本地表与分布式表的架构关系。

安装clickhouse

根据官方文档(中文)利用apt/yum安装即可:https://clickhouse.tech/docs/zh/getting-started/install/,安装完利用systemctl工具管理它:

(注:apt/yum安装过程中会让你输入clickhouse默认密码,大家有可能敲错密码,没关系后面会带大家重置)

安装zookeeper

clickhouse集群架构方案非常的”原始“,简单说一个集群逻辑上划分为多个sharding,每个sharding内的数据为做replica副本。

zookeeper在集群中做2件事情:

  • 将DDL(create/alter…)操作先写入到ZK,然后通过ZK触发生效到所有Node。
  • 以Shard0为例,写入可以发生在Replica0和Replica1的任意实例上,承载客户端写入的node会向ZK写入新数据块的信息,从而通知sharding内其他replica根据ZK信息来同步这份数据。

DDL与ZK的关系我们展开解释:

  • 本地DDL:用客户端直连某个node提交DDL建表语句,默认情况下这个表只在这个node可见,其他Node不可见,这意味着其实clickhouse没有全局的元数据管理,这是很奇怪的一点。
  • 分布式DDL:为了让某个DDL操作生效到所有node,执行DDL时需要指定为分布式类型,则DDL语句先写入ZK,通过ZK触发集群中所有Node在自己的本地执行DDL。

Replica与ZK的关系我们展开解释:

  • 同Shard内的多个Replica没有主次关系,每一个都可以写入,数据互相同步。
  • 同步是依靠ZK通知的,因此每次写入数据都要同时写ZK,所以ZK会成为高频写入的瓶颈,这也是clickhouse要求应用批量写入的重要原因。(另外,clickhouse允许为不同的表使用不同的ZK集群进行写入,这样可以解决ZK瓶颈问题)

我们下载ZK:https://zookeeper.apache.org/releases.html

拷贝conf/zoo_sample.cfg到conf/zoo.cfg,修改一下里面的zk数据存储目录即可(具体路径大家自行指定):

dataDir=/root/clickhouse/apache-zookeeper-3.7.0-bin/data

然后启动ZK:

bin/zkServer.sh start

配置clickhouse

clickhouse配置文件所在位置:

config.xml是配置主文件,config.d下面的xml会被合并到config.xml结构中,我们应该去config.d配置xml片段,而不应直接修改config.xml,这样管理起来更加清晰方便。

users.xml是用户配置文件,users.d下面的xml会被合并到users.xml中,我们同样应该去users.d下面配置xml片段。

配置default用户密码

默认clickhouse用default用户登陆,密码在apt/yum安装中可能误敲,所以这里我们重置一下。

编辑users.d/default-password.xml,设置上自己的密码为123:

default-password.xml中的XML结构会合并到users.xml中,大家自己看一下就懂了。

配置zookeeper地址

编辑config.d/zk.xml,填入zk地址:

这个XML结构会合并到config.xml的XML结构中。

配置cluster集群

clickhouse的集群概念比较”原始“,我们需要区分看待”物理集群“和”逻辑集群“的差异。

  • 首先clickhouse是若干等价的物理机组成的”物理集群“,这些物理机之间彼此并不认识。
  • 其次clickhouse在”物理集群“之上,可以通过config.xml配置出一个”逻辑集群“,让这些节点彼此认识对方。

一套”物理集群“理论上可以配置N套”逻辑集群“,每套”逻辑集群“可以由不同的node组成,选择不同的sharding和replica策略,比如:

  • 配置cluster1,它用node1和node2两个节点组成1个shard的2个replica。
  • 配置cluster2,它用node1和node2成为shard0,用node3和node4成为shard1,然后node1和node2互为replica,node3和node4互为replica。

有了多个可选的”逻辑集群“,我们在建库建表的时候就可以指定采用哪个cluster,从而决定分片个数和副本个数,满足不同的可靠性和性能需求。

出于简单考虑,我们就做一套标准的逻辑集群即可,编辑config.d/cluster.xml:

这里配置了一个mycluster逻辑集群,因为我们就1个node,所以就配置成1个shard的单replica集群了,9000端口是clickhouse的内部通讯端口。

如果我们有多台node,这样的cluster.xml配置文件需要分发到所有node,让大家互相认识彼此。

配置macro宏

除了用cluster.xml来配置逻辑集群的构成结构之外,我们还需要为每个node分发不同的身份标识,这对于clickhouse进行数据分片和副本复制是至关重要的。

编辑config.d/macro.xml,为我们唯一的node写入它在逻辑集群mycluster中的身份:

连接clickhouse

现在重启clickhouse让上述配置生效:

systemctl restart clickhouse-server

然后命令行访问:

clickhouse-client –host 127.0.0.1 –password 123

大部分命令和mysql一样,大家可以自行体验一下show database;use等语句。

实践clickhouse分布式表

clickhouse的SQL语句默认情况下都是单机本地的,除非我们显式指定生效到cluster的所有Node,否则是不会通过ZK通知到集群所有Node的。

大部分情况我们肯定希望库表是分布式的,具备分片并行能力,具备分片内的多replica冗余能力,否则单点故障数据丢了谁也受不起。

因此,下面我们就来创建一套分布式的数据库表,并完成分布式的写入和查询(意味着数据自动分布到多sharding进行写入和计算)。

创建database

CREATE DATABASE IF NOT EXISTS db1 ON CLUSTER mycluster

这是一条DDL操作,因为我们带了ON CLUSTER mycluster,所以会通过ZK下发到mycluster中的每一个node(虽然我们只部署了1个node),这叫做一次分布式DDL:

返回的信息包含了这次分布式DDL影响到的所有node,这里127.0.0.1:9000就是我们当前客户端所连接的node。

我们在节点127.0.0.1:9000上执行show databases,看到的是本地的database列表,因为刚才是分布式DDL所以本地也有了这个database,分布式DDL和本地database出现之间的关系需要大家理解明白,clickhouse没有全局的元数据管理,只是将操作扇出到所有node都做了一遍而已,元数据在每台node自己本地。

我们看到database的engine叫做atomic,其大概含义就是能够支持对该database下面的table进行原子性的rename之类的操作,是默认的database引擎,了解即可。

创建本地table(向集群中每个node)

接下来,我们要在mycluster的所有node上创建同样的table,并且让同一个sharding内的replica之间互相复制,这个就要求我们创建一个带复制能力的table:

ReplicatedMergeTree是带有复制能力的MergeTree(LSM树)引擎,两个参数是指定ZK中的路径用途。

填充符{shard}和{replica}是每个node上marco.xml决定的,因此这个分布式DDL建表语句通过ZK推到各个node之后,各个node本地执行时都会用各自的值填充进去。

总结一下,每个node都有自己独立的ZK路径可以写入,当向user表某个node写入数据时,该node查看本地表中/clickhouse/tables/{shard}/db1/user和{replica}的实际值,然后向ZK的/clickhouse/tables/{shard}/db1/user/replicas/{replica}下面写入自己的新数据块信息,这样同shard下的其他replica就会监听到该路径的变化,取出新数据块的元信息,然后调用写入node接口来拉取新数据块本地,这就完成了数据复制。

我们可以看一下ZK的路径关系就懂了:

至于ORDER BY(id)是指user表的数据文件中按id字段排序存储并生成索引文件,这样如果查找id的话走索引二分查找就会很快,其他查询条件则扫表计算,这是clickhouse主要的查询原理。(可以把ORDER BY暂时理解为主键PRIMARY KEY,但其实只是为了加速查询用的,并没有去重功能)

创建分布式table(向集群中每个node)

当集群中每个node都有了这个table定义之后,我们希望插入数据时能负载均衡数据到各个node,查询时能自动从各个node聚合结果,这时候就需要配置一个分布式table,就有点像反向代理一样。

分布式表的特性是:

  • 写入时会按某字段打散,这样数据可以均衡到各个shard。
  • 读取时会从所有shard读取,然后聚合结果。

因为读取是所有shard都读,所以写入时即便是随机打散也没有关系,clickhouse分布式表会完成多shard的二次聚合,保证统计结果正确。

所以,我们就建一个随机打散的分布式表,这样数据最均衡:

CREATE TABLE db1.dis_user ON CLUSTER mycluster AS db1.user ENGINE = Distributed(mycluster, db1, user, rand());

这个语句的意思是创建一个Distributed引擎的表,这个表通过ON CLUSTER mycluster语法会推送到所有node上创建,因此后续我们无论访问哪个node都可以访问到这个分布式表dis_user。

AS db1.user表示和db1.user表结构一致。

同时,Distributed(mycluster, db1, user, rand())指明了这个分布式表背后的本地表是mycluster集群的db1数据库的user表,写入时通过rand()随机打散写入到各个shard。

创建成功后列出了其背后各个node,我们只有1台。

插入数据到分布式表

连接任意clickhouse节点,执行SQL:

insert into db1.dis_user values(1,’hello’);

insert into dis_user values(2,’goodbye’);

然后查询:

select * from dis_user;

可以看到数据:

查询分布式表

执行SQL:

select count(*) from dis_user;

返回:

再次执行:

insert into dis_user values(2,’building’);

你会发现插入成功了,也就是说虽然id是主键,但是id=2的重复记录还是能插入成功:

实际上clickhouse的MergeTree引擎是类似LSM的数据库,满足高吞吐写入,但根据网上说法clickhouse的删除和更新操作并不能通过LSM追加的方式实现,具体原因没细看。

总之,

  • clickhouse默认MergeTree引擎是没有主键去重功能的,主键只是为了加速查询索引用的。
  • 不建议使用UPDATE更新和DELETE删除,因为这些操作对clickhouse实现来说性能很差,应该通过追加新记录的方式实现伪更新和伪删除,在这篇博客就不展开了,免得大家糊涂,在下一篇博客我会通过一个案例讲一下clickhouse的这块思想。

总结

本文关键是理解如何通过ON CLUSTER关键字完成分布式DDL,记住clickhouse没有全局元信息,因此所有的DDL操作都应该分发到每个Node上执行。

通过在所有node上创建一份分布式表的方式,可以确保客户端访问任意节点均能访问到该分布式表,并且该分布式表能够代理完成对所有node上本地表的查询和写入负载。

最后,我们应该也意识到了clickhouse在分布式架构上的简陋性,对其底层LSM模型有个基本的了解。

如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~