基本概念:

连续查询(Continuous Query)简称CQ,是influxdb中的一种查询类型,其会按照用户指定的查询规则,自动的、周期性的查询实时数据并执行指定运算,然后将结果保存到指定的表(Measurement)。

持续查询中用户可以指定的查询规则包括:查询时间间隔、单次查询时间范围、查询规则。InlfuxDB会根据用户指定的规则,定期的将过去一段时间内的原始数据以用户所期望的方式保存至用户新的结果表(Measurement)中,极大的减少了新表中的数据量。并且,新数据表中数据是跟用户业务高度贴近的方式存储,在新表中查询数据会提升查询速度,降低复杂度。

创建连续查询语法:

创建连续查询的基础语法如下

CREWATE CONTINUOUS QUERY <cq_name> ON <database_name>
BEGIN
    <cq_query>
END
  • cq_name: 连续查询名称
  • database_name: 连续查询所在数据库的名字
  • cq_query: 连续查询规则语句,该块的语法是 InfluxQL格式具体格式如下

    SELECT <functions[s]>
    INTO <destination_measurement>
    FROM <source_measurement>
    [WHERE <stuff>]
    GROUP BY time(<interval>)[,<tag_key[s]>]
    

其中SELECT、INTO、FROM、GROUP BY time()字句是必选设置项,具体参数如下

  • <functions[s]>: 要查询的字段以及数据处理的内置函数
  • <destinaion_measurement>: 用于保存查询结果的目标数据表,如果表不存在InfluxDB会自动创建。
  • <source_measurement>: 连续查询语句所要查询的数据表
  • <stuff>: 具体的查询条件,可选
  • <interval>: 连续查询语句执行的时间间隔和查询的时间范围,实际使用中要注意,不应在where条件中指定时间范围,即使指定也会被系统忽略。
  • <tag_key[s]>: 归类的标签字段,可选参数

实战

业务场景介绍:

结合流量统计的业务场景看下具体如何使用连续查询。
在的业务系统中,终端设备会间隔持续上报流量,主要tag和field有如下几个信息

device_id  设备标识
sim_card_id  流量卡标识
flow  当前sim卡累计流量

接收数据的业务会将接收的数据按如下方式存储到influxdb中,具体的写入语句如下

flow_raw_rx,device_id=xxxxxx,sim_card_id=aaaaaa flow=1234
flow_raw_tx,device_id=xxxxxx,sim_card_id=aaaaaa flow=5678

比如要实现如下几个具体需求:

  • 查询SIM卡小时、天、月、年的流量
  • 查询SIM卡总流量
  • 查询某个设备卡小时、天、月、年的流量
  • 查询某个设备总流量

通过分析,我们的原始数据是巨大的,按1K台活跃设备计算,每5s上报一次数据,每天8h来计算,每年产生20亿条数据,如此庞大的数据直接查询是难以想象的。
流量统计并不需要如此精细粒度的数据,有必要进行二次处理。我们以半小时为单位,将原始数据做聚合。原始数据只保留当天数据,通过二次聚合处理的数据和当天的数据就能准确满足我们的需求。

创建连续查询,二次聚合数据处理:

#创建数据
create database multiwan
create database multiwan_inf

#创建连续查询
create continuous query flow_tx_cq on multiwan 
begin 

select sum(value) 
into multiwan_inf.autogen.flow_tx_cq_result 
from multiwan_raw_tx 
group by time(30m), device_id, sim_card_id

end

multiwan 是原始数据,multiwan_inf是连续查询的结果数据
同样对于上行流量也是一样的语句。创建成功之后可以通过命令行直接查看创建的连续查询

> show continuous queries;
name: _internal
name query
---- -----

name: telegraf
name query
---- -----

name: multiwan
name       query
----       -----
flow_rx_cq CREATE CONTINUOUS QUERY flow_rx_cq ON multiwan BEGIN SELECT sum(value) INTO multiwan_inf.autogen.flow_rx_cq_result FROM multiwan.autogen.multiwan_raw_rx GROUP BY time(30m), device_id, sim_card_id END
flow_tx_cq CREATE CONTINUOUS QUERY flow_tx_cq ON multiwan BEGIN SELECT sum(value) INTO multiwan_inf.autogen.flow_tx_cq_result FROM multiwan.autogen.multiwan_raw_tx GROUP BY time(30m), device_id, sim_card_id END

如果创建的连续查询有问题则可以执行drop操作,直接删除该条连续查询

drop continuous query flow_rx_cq on multiwan
drop continuous query flow_tx_cq on multiwan

创建数据保留策略:

为减少数据量,会保留n-2之后的数据。因此需要将旧的数据删除,只保留我们想要的数据。
保留策略是InfluxDB的一个重要组成部分,InfluxDB通过保留策略决定数据保留时长。InfluxDB通过计算本地服务器时间和存储数据的时间戳差值,决定是否保留数据,过期的数据则直接删除。

创建保留策略的基本语法如下

CREATE RETENTION POLICY <retention_policy_name> ON <database name>
DURATION <duration> REPLICATION <n> [SHARED DURATION <duration>]
[DEFAULT]
  • DURATION子句指定了influxDB需要保留的数据时长。DURATION所指定的值<duration>为保留时间长度或者未INF(标识无限长,infinite)。其最小保留时间间隔是一小时。
  • REPLICATION子句指定集群中每条数据有多少个不相关的副本,其主要在集群版本中应用,单机开源版无效。
  • SHARED DURATION:决定一个分片组对应的时间范围,它所设置的<duration>值也是一个时间长度,但不支持无限长。

在本应用我们保留时间设置为2天,因为是开源版本。具体操作语句如下

#创建2天数据保留策略
create retention policy "rp_two_day" on multiwan duration 2d replication 1

总结

multiwan 存储原始数据,其保留策略为只包括最近两天的数据。 multiwan_inf是持续查询后的结果,其数据会永久保存按之前的量估算每年的数据量在500w级别,而且该粒度能满足我们的需求。
这样极大降低了数据复杂度。

Tags: influxdb

Related Posts:

Leave a Comment