掘金 后端 ( ) • 2024-03-27 09:52

很多时候,我们需要快速的处理一些与时间相关性比较高的海量数据,比如IOT、监控、事件、告警等等,这一类数据我们称之为时序数据,传统数据库可以解决此类场景,但是当数据规模达到一定程度之后,开始变得没有那么好用了。

TimescaleDB是时序数据库中的一员,比较与众不同的是,他只是PostgreSQL的一个插件,对于PostgreSQL的爱好者来说,这是对pg非常好的一个增强。这篇文章会带你快速了解TimescaleDB的使用方法。

PS1:虽然是快速入门教程,但内容还是有点多。

PS2:最快的学习方法是边学边练,如果觉得自己部署环境麻烦,可以使用一些免费在线数据库

〇、时序表Hypertable

TimescaleDB的三个核心特性:Hypertable、持续聚合、压缩。最基础的是Hypertable。

Hypertable是自动按时间对数据进行分区的 PostgreSQL 表。使用Hypertable的方式与常规的 PostgreSQL 表相同,但增加了一些额外功能,使管理时序数据更加轻松。

0.1 创建Hypertable

先创建一个常规PostgreSQL表:

CREATE TABLE conditions (
   time        TIMESTAMPTZ       NOT NULL,
   location    TEXT              NOT NULL,
   device      TEXT              NOT NULL,
   temperature DOUBLE PRECISION  NULL,
   humidity    DOUBLE PRECISION  NULL
);

然后将其转换成Hypertable:

SELECT create_hypertable('conditions', by_range('time'));

0.2 调整Hypertable的chuck interval

调整chuck interval可以提升数据库的性能,查询当前chuck interval设置:

SELECT h.table_name, c.interval_length
  FROM _timescaledb_catalog.dimension c
  JOIN _timescaledb_catalog.hypertable h
    ON h.id = c.hypertable_id;

设置chuck interval的方法:

SELECT create_hypertable(
  'conditions',
  by_range('time', INTERVAL '1 day')
);

对已经存在的Hypertable调整chuck interval:

SELECT set_chunk_time_interval('conditions', INTERVAL '24 hours');

0.3 创建索引

  • 在device_id上创建唯一索引,以time作为分区键。
CREATE UNIQUE INDEX idx_deviceid_time
  ON hypertable_example(device_id, time);
  • 在time、user_id、device_id上创建唯一索引
CREATE UNIQUE INDEX idx_userid_deviceid_time
  ON hypertable_example(user_id, device_id, time);

在创建索引的时候,必须包含time字段,因为time字段是分区键。

0.4 删除Hypertable

DROP TABLE <TABLE_NAME>;

一、写入数据

Timescale使PostgreSQL的扩展,因此支持以与PostgreSQL相同的方式写入数据,使用INSERT、UPDATE、INSERT ... ON CONFLICT、DELETE等命令进行数据的插入、更新和删除操作。

1.1 Insert

  • 插入一行数据
INSERT INTO conditions(time, location, temperature, humidity)
  VALUES (NOW(), 'office', 70.0, 50.0);
  • 插入多行数据
INSERT INTO conditions
  VALUES
    (NOW(), 'office', 70.0, 50.0),
    (NOW(), 'basement', 66.5, 60.0),
    (NOW(), 'garage', 77.0, 65.2);
  • 插入数据并返回插入的数据行
INSERT INTO conditions
  VALUES (NOW(), 'office', 70.1, 50.1)
  RETURNING *;

上面的语句的返回结果:

time                          | location | temperature | humidity
------------------------------+----------+-------------+----------
2017-07-28 11:42:42.846621+00 | office   |        70.1 |     50.1
(1 row)

1.2 Update

  • 修改一行数据
UPDATE conditions
  SET temperature = 70.2, humidity = 50.0
  WHERE time = '2017-07-28 11:42:42.846621+00'
    AND location = 'office';
  • 修改多行数据
UPDATE conditions
  SET temperature = temperature + 0.1
  WHERE time >= '2017-07-28 11:40'
    AND time < '2017-07-28 11:50';

1.3 Upsert

Upsert的语义是:

  • 如果没有匹配(冲突)的数据,则插入一行新数据
  • 如果有匹配(冲突)的数据,则更新数据或者什么也不做

下面是冲突则更新的写法(DO UPDATE):

INSERT INTO conditions
  VALUES ('2017-07-28 11:42:42.846621+00', 'office', 70.2, 50.1)
  ON CONFLICT (time, location) DO UPDATE
    SET temperature = excluded.temperature,
        humidity = excluded.humidity;

下面是冲突则什么也不做的写法(DO NOTHING):

INSERT INTO conditions
  VALUES ('2017-07-28 11:42:42.846621+00', 'office', 70.1, 50.0)
  ON CONFLICT DO NOTHING;

1.4 Delete

使用pg语法删除数据:

DELETE FROM conditions WHERE temperature < 35 OR humidity < 60;

1.5 数据保留策略

在时序应用程序中,数据往往随着时间的增长而变得不那么有用。如果你不需要过期的历史数据,你可以在它达到一定年龄后将其删除。Timescale可以设置自动数据保留策略以丢弃旧数据。你也可以通过手动删除数据块来微调数据。

通常,我们只需要保留历史数据的统计汇总结果,但不需要保留原始数据。Timescale为开发者提供了内置的功能,允许你将持续聚合和数据保留策略合并使用。

1.5.1 按区块删除数据

Timescale数据保留适用于块,而不是行。逐行删除数据(例如使用PostgreSQL DELETE命令)会很慢,但是按块删除数据会更快,因为它会从磁盘中删除整个文件。它不需要垃圾收集和碎片整理。

无论是使用保留策略还是手动删除块,Timescale都会按块删除数据。它只删除所有数据都在指定时间范围内的块,如果该块还包含了其他时间段的数据,则不会被删除。

例如,我们可能存在如下三块数据:

  • 数据块1:超过36小时
  • 数据块2:在12到36小时之间
  • 数据块3:过去12小时

此时如果我们手动drop超过24小时的数据块,则实际上只会删除数据块1数据块2会被保留,因为它包含一些24小时内的数据,此时该数据块中的任何行都不会被删除。

1.5.2 创建数据保留策略

语法如下:

SELECT add_retention_policy('conditions', INTERVAL '24 hours');

上面的语句为conditions表设置的数据保留策略为保留24小时内的数据,超过24小时的数据块会被删除。

数据保留策略只能用来删除历史数据,要想删除未来时间的数据,可以手动删除:

-- 删除超过24小时的数据
SELECT drop_chunks('conditions', INTERVAL '24 hours');

-- 删除一个范围内的数
SELECT drop_chunks(
  'conditions',
  older_than => INTERVAL '3 months',
  newer_than => INTERVAL '4 months'
)

-- 删除大于当前时间3个月的数据
SELECT drop_chunks(
  'conditions',
  newer_than => now() + INTERVAL '3 months'
);

1.5.3 删除数据保留策略

SELECT remove_retention_policy('conditions');

1.5.4 持续聚合和数据保留

你可以通过将数据保留策略与持续聚合结合起来对数据进行降采样。如果正确设置刷新策略,你可以从Hypertable中删除旧数据,而不会从持续聚合中删除它。这样一来,你就可以节省原始数据存储空间,同时保留用于历史分析的汇总数据。

注意 :当刷新持续聚合数据时,Timescale会根据刷新窗口中原始数据的更改来更新聚合数据。如果发现原始数据已被删除,它也会删除聚合数据。为了防止这种情况发生,请确保聚合数据的刷新窗口不会与任何已删除的数据重叠。

下面是一个示例:

CREATE MATERIALIZED VIEW conditions_summary_daily (day, device, temp)
WITH (timescaledb.continuous) AS
  SELECT time_bucket('1 day', time), device, avg(temperature)
  FROM conditions
  GROUP BY (1, 2);

SELECT add_continuous_aggregate_policy('conditions_summary_daily', '7 days', '1 day', '1 day');

上面的SQL创建了一个名为 conditions_summary_daily 的持续聚合,其中存储了每个设备的每日温度数据。该聚合每天刷新一次。每次刷新时,它会更新从 7 天前到 1 天前的任何数据更改。

你不应该在 conditions 表上设置 24 小时的保留策略。如果你这样做了,超过 1 天的分块将被删除。然后,根据数据更改,聚合会进行刷新。由于数据更改是删除超过 1 天的数据,聚合也会删除数据。最终导致 conditions_summary_daily 表中没有数据。

此时可以将conditions 表的保留策略设置的长一些,比如30天:

SELECT add_retention_policy('conditions', INTERVAL '30 days');

持续聚合表业可以设置数据保留策略,比如我们保留聚合数据600天。

SELECT add_retention_policy('conditions_summary_daily ', INTERVAL '600 days');

二、查询数据

Timescale支持PostgreSQL的所有查询语法,因此你可以像使用pg一样使用Timescale。同时Timescale还提供了一些额外的特性来帮助开发者更高效的分析数据:

  • 使用SkipScan 加速 DISTINCT 查询
  • 使用Hyperfunction 可以改进数据分析场景的查询体验
  • 使用Function pipelines为SQL语言增加函数编程能力,使得对数据进行复杂转换变的简单。

2.1 简单SELECT查询

  • 从表中查询最近100条数据
SELECT * FROM conditions ORDER BY time DESC LIMIT 100;
  • 查询过去12小时写入到表中的数据
SELECT COUNT(*) FROM conditions
  WHERE time > NOW() - INTERVAL '12 hours';
  • 统计conditions表中过去一天的数据中有空调的地方的数量
SELECT COUNT(DISTINCT location) FROM conditions
  JOIN locations
    ON conditions.location = locations.location
  WHERE locations.air_conditioning = True
    AND time > NOW() - INTERVAL '1 day';

2.2 使用Time bucket

Time bucket可以让我们更方便的按时间段对数据进行聚合。例如,可以将数据分组为5分钟、1小时、3天、1周的Time bucket,以计算统计数据,下面是一些例子:

  • 按Time bucket分组进行数据统计

在 weather_conditions 的表中计算每日平均温度:

SELECT time_bucket('1 day', time) AS bucket,
  avg(temperature) AS avg_temp
FROM weather_conditions
GROUP BY bucket
ORDER BY bucket ASC;
  • 按Time bucket分组进行数据统计,且显示Time bucket的结束时间

默认情况下,time_bucket 列的显示值为time bucket的开始时间,如果希望显示接入时间,可以如下方式写SQL:

SELECT time_bucket('5 min', time) + '5 min' AS bucket,
  min(cpu),
  max(cpu)
FROM metrics
GROUP BY bucket
ORDER BY bucket DESC;
  • 按Time bucket分组进行数据统计,并修改时间范围

要更改Time bucket所覆盖的时间范围,可以使用偏移参数,该参数接受一个 INTERVAL 值。正偏移将Time bucket的起始和结束时间推后。负偏移将Time bucket的起始和结束时间提前。

例如,可以计算每5小时的平均 CPU 使用率,并将所有Time bucket的起始和结束时间推后1小时:

SELECT time_bucket('5 hours', time, '1 hour'::INTERVAL) AS bucket,
  avg(cpu)
FROM metrics
GROUP BY bucket
ORDER BY bucket DESC;
  • 计算一个特定时间的Time Bucket
  • 获取过去3小时内每个位置每15分钟时段的天气状况信息,并计算采集的测量次数、最高温度和最高湿度。按最高温度对结果进行排序:
SELECT time_bucket('15 minutes', time) AS fifteen_min,
    location,
    COUNT(*),
    MAX(temperature) AS max_temp,
    MAX(humidity) AS max_hum
  FROM conditions
  WHERE time > NOW() - INTERVAL '3 hours'
  GROUP BY fifteen_min, location
  ORDER BY fifteen_min DESC, max_temp DESC;

一些工作原理解释

Time bucket是一个时间段的描述,比如1小时,起始点则决定了Time bucket的开始和结束时间。默认情况下,Time bucket并不从数据中最早的时间戳开始。例如,第一个数据点是在00:37收到的,但我们通常希望每天的Time bucket从0点开始。同样的,我们可能在星期三收集到第一个数据点,但通常希望每周的Time bucket从星期日或星期一开始计算。

假设我们的数据中最早的时间戳是2020年4月24日。如果我们的Time bucket间隔设置为两周,则第一个Time bucket不会从4月24日开始,因为那是一个星期五。它也不会从4月20日开始,因为那是前一周的星期一。它会从2020年4月13日开始,因为您可以通过从2000年1月3日开始以两周为单位计数来到达2020年4月13日,这里Timescale使用2000年1月3日作为默认起始点进行推导。

  • 关于默认起始点的约定

对于不涉及月份或年份的间隔,默认起始点是2000年1月3日。对于月份、年份或世纪的间隔,默认起始点是2000年1月1日。对于整数时间值, 默认起始点是0。

这些选择使得Time bucket的时间范围更加直观。由于2000年1月3日是星期一,所以周Time bucket从星期一开始。这符合ISO标准计算日历周的要求。月度和年度Time bucket使用2000年1月1日作为起始点。这使它们能够从日历月或年的第一天开始。

  • 时区

起始时间取决于您的时间值的数据类型。

如果您使用 TIMESTAMP,默认情况下,Time bucket的开始时间与 00:00:00 对齐。每日和每周的Time bucket从 00:00:00 开始。较短的Time bucket从您可以通过从原始日期的 00:00:00 开始按Time bucket增量计数到达的时间开始。

如果您使用 TIMESTAMPTZ,默认情况下,Time bucket的开始时间与 00:00:00 UTC 对齐。要将Time bucket与另一个时区对齐,需要设置时区参数。

  • 常见错误
ERROR:  cannot create continuous aggregate with incompatible bucket width
DETAIL:  Time bucket width of "<BUCKET>" [1 year] should be multiple of the time bucket width of "<BUCKET>" [1 day].

如果尝试创建分层持续聚合,必须使用兼容的Time bucket。不能在具有可变宽度Time bucket的持续聚合上创建具有固定宽度Time bucket的持续聚合。

2.3 使用SkipScan来加速DISTINCT查询

注意:SkipScan 目前不能在压缩数据上执行。

SkipScan 可以加速 DISTINCT 查询的查询时间。它适用于 PostgreSQL 表、Timescale Hypertable。SkipScan 包含在 TimescaleDB 2.2.1 及更高版本中。

要查询数据库并找到某项的最新值,您可以使用 DISTINCT 查询。例如,您可能想要找到每项投资的最新股票或加密货币价格。或者您可能有一个可视化图表和警报,周期性查询每个设备或监控项的最新值。

随着表变得越来越大,DISTINCT 查询往往会变得更慢。这是因为 PostgreSQL 目前没有很好的机制来从有序索引中获取唯一值列表。即使您有一个与此类查询的确切顺序和列匹配的索引,PostgreSQL 也会扫描整个索引以找到所有唯一值。随着表的增长,此操作变得越来越慢。

SkipScan 允许查询在不读取中间所有行的情况下逐步从一个有序值跳到下一个值。如果不支持此功能,数据库引擎必须扫描整个有序索引,然后在结束时去重,这是一个非常慢的过程。

SkipScan 是对形式为 SELECT DISTINCT ON column_name 的查询进行优化的。从概念上讲,SkipScan 是一个常规的 IndexScan,它在索引中跳过查找下一个大于当前值的值。

使用SkipScan,需要创建需要满足如下条件的索引:

  • DISTINCT 字段必须是索引的第一个字段
  • 使用BTREE索引
  • 索引的排序规则与查询语句保持一致

查询语句必须满足如下条件:

  • 只能在一列上使用DISTINCT

例如:

CREATE INDEX "cpu_customer_tags_id_time_idx" 
ON readings (customer_id, tags_id, time DESC)

2.4 高级分析

  • 2.4.1 计算百分位数和中位数(一种特殊的百分位:50%)
SELECT percentile_cont(0.5)
  WITHIN GROUP (ORDER BY temperature)
  FROM conditions;
  • 2.4.2 计算累计总和

使用 sum(sum(column)) OVER(ORDER BY group) 计算累计总和:

SELECT location, sum(sum(temperature)) OVER(ORDER BY location)
  FROM conditions
  GROUP BY location;

上面的窗口函数 sum(sum(temperature)) OVER(ORDER BY location) 计算的方式如下:

  1. 首先,内部的 SUM(temperature) 函数会计算每个 location 的温度总和。
  2. 然后,外部的 SUM() 窗口函数将对上述计算的温度总和进行累加。
  3. OVER(ORDER BY location) 部分指定了窗口的排序方式,即按照 location 的顺序进行排序。

因此,这个查询会返回一个按照 location 排序的结果集,其中每行的值表示该行之前所有行中对应位置的温度总和的累加值。

  • 2.4.3 计算移动平均值

要计算简单移动平均值,使用 OVER 窗口函数跨越一定数量的行,然后在这些行上计算聚合函数。例如,要通过对最近十次读数进行平均来找出设备的平滑温度值:

SELECT time, AVG(temperature) OVER(ORDER BY time
      ROWS BETWEEN 9 PRECEDING AND CURRENT ROW)
    AS smooth_temp
  FROM conditions
  WHERE location = 'garage' and time > NOW() - INTERVAL '1 day'
  ORDER BY time DESC;

上面的SQL从 conditions 表中查询时间(time)和温度的平均值,其中平均值是基于时间窗口计算的。时间窗口包括当前行及其之前的最近的 9 行数据。这样可以计算出每个时间点(当前行)及其之前 9 个时间点的温度的平均值。

  • 2.4.4 计算值的增加

要计算值的增加,您需要考虑计数器重置。如果主机重新启动或容器重新启动,则可能会发生计数器重置。此示例查找发送的字节数,并将计数器重置考虑在内:

SELECT
  time,
  (
    CASE
      WHEN bytes_sent >= lag(bytes_sent) OVER w
        THEN bytes_sent - lag(bytes_sent) OVER w
      WHEN lag(bytes_sent) OVER w IS NULL THEN NULL
      ELSE bytes_sent
    END
  ) AS "bytes"
  FROM net
  WHERE interface = 'eth0' AND time > NOW() - INTERVAL '1 day'
  WINDOW w AS (ORDER BY time)
  ORDER BY time

上面的SQL从 net 表中查询时间(time)和字节数("bytes"),其中字节数是根据前一行字节数计算得出的。

在 CASE 语句中:

  1. 如果当前行的字节数(bytes_sent)大于或等于前一行的字节数,则计算当前行字节数与前一行字节数之间的差值,并将结果作为当前行的字节数。
  2. 如果前一行的字节数为 NULL,则将当前行的字节数设为 NULL。
  3. 否则,将当前行的字节数设为当前行的字节数(bytes_sent)。
  • 2.4.5 计算变化率

与增量类似,变化率适用于具有单调增加计数器的情况。如果您的采样间隔是可变的,或者在不同系列之间使用不同的采样间隔,将值归一化为公共时间间隔以使计算出的值可比较将会很有帮助。此示例查找每秒发送的字节数,并考虑计数器重置:

SELECT
  time,
  (
    CASE
      WHEN bytes_sent >= lag(bytes_sent) OVER w
        THEN bytes_sent - lag(bytes_sent) OVER w
      WHEN lag(bytes_sent) OVER w IS NULL THEN NULL
      ELSE bytes_sent
    END
  ) / extract(epoch from time - lag(time) OVER w) AS "bytes_per_second"
  FROM net
  WHERE interface = 'eth0' AND time > NOW() - INTERVAL '1 day'
  WINDOW w AS (ORDER BY time)
  ORDER BY time

上面的SQL从 net 表中查询时间(time)和传输速率("bytes_per_second")的数据。

对于每一行数据,使用 CASE 语句来判断:

  1. 如果当前行的 bytes_sent 大于或等于上一行的 bytes_sent,则计算两行之间的字节差值,然后除以两行之间的时间间隔,得到字节传输速率。
  2. 如果上一行的 bytes_sent 为 NULL,则返回 NULL。
  3. 否则,将当前行的 bytes_sent 作为字节传输速率。

在这个SQL中,WINDOW 子句定义了一个窗口(window),命名为 w,并指定了排序规则,即按照时间(time)字段的顺序进行排序(ORDER BY time),这个窗口 w 被用于两个地方:

  1. 在两个 OVER 子句中使用了窗口 w。这些 OVER 子句是用来执行窗口函数的,即 lag() 函数,用于获取前一行的数据。通过指定 w,我们告诉数据库应该按照窗口 w 中定义的排序规则来获取前一行的数据。
  2. 在主查询中,将窗口 w 用作 window function 的参数。在 CASE 语句中,通过指定 OVER w,我们指示数据库在计算 lag() 函数时应该按照窗口 w 中定义的排序规则来获取前一行的数据。这样可以确保计算字节传输速率时使用的是正确的时间间隔。

总之,WINDOW 子句在这个 SQL 中的作用是定义了一个命名窗口 w,并指定了排序规则,以确保在窗口函数中正确地获取前一行的数据,从而计算出正确的字节传输速率。

  • 2.4.6 计算差量

在许多监控和物联网(IoT)应用场景中,设备或传感器报告的指标不经常变化,任何变化都被视为异常。当您查询随时间变化的这些值时,通常不希望传输所有值,而只希望传输观察到变化的值。这有助于最小化发送的数据量。您可以使用窗口函数和子查询的组合来实现此目的。此示例使用差异来过滤未更改值的行,并仅传输发生更改的行:

SELECT time, value FROM (
  SELECT time,
    value,
    value - LAG(value) OVER (ORDER BY time) AS diff
  FROM hypertable) ht
WHERE diff IS NULL OR diff != 0;
  • 2.4.7 分组计算指标的变化

要根据某个字段对数据进行分组,并计算每个组内指标的变化量,可以使用 LAG ... OVER (PARTITION BY ...)。例如,给定一些天气数据,计算每个城市的温度变化量:

SELECT ts, city_name, temp_delta
FROM (
  SELECT
    ts,
    city_name,
    avg_temp - LAG(avg_temp) OVER (PARTITION BY city_name ORDER BY ts) as temp_delta
  FROM weather_metrics_daily
) AS temp_change
WHERE temp_delta IS NOT NULL
ORDER BY bucket;

上面的SQL从 weather_metrics_daily 表中查询时间戳(ts)、城市名称(city_name)和温度变化量(temp_delta)的数据。

在内部查询中,对每个城市按时间戳排序,计算每个城市平均温度与上一个时间戳的平均温度之间的差值(temp_delta)。

最后,从内部查询中选择非空的温度变化量数据,并按照 bucket 字段进行排序(这里的 bucket 字段没有在内部查询中定义)。

这个查询的目的是找出每个城市在相邻时间戳之间的平均温度变化量。

  • 2.4.8 查询一个列的第一个和最后一个值

Timescale提供了first和last函数,允许你按另一列的顺序获取一列的值。

SELECT location, last(temperature, time)
  FROM conditions
  GROUP BY location;

SELECT time_bucket('5 minutes', time) five_min, location, last(temperature, time)
  FROM conditions
  GROUP BY five_min, location
  ORDER BY five_min DESC LIMIT 12;
  • 2.4.9 生成直方图

Timescale的 [histogram](https://docs.timescale.com/api/latest/hyperfunctions/histogram/) 函数允许你生成数据的直方图。此示例定义了一个直方图,其中有五个桶,定义在范围 60 到 85 之间。生成的直方图有七个箱子:第一个箱子用于值低于最小阈值 60,中间的五个箱子用于指定范围内的值,最后一个箱子用于值高于 85 的值:

SELECT location, COUNT(*),
    histogram(temperature, 60.0, 85.0, 5)
   FROM conditions
   WHERE time > NOW() - INTERVAL '7 days'
   GROUP BY location;

结果如下:

location   | count |        histogram
------------+-------+-------------------------
 office     | 10080 | {0,0,3860,6220,0,0,0}
 basement   | 10080 | {0,6056,4024,0,0,0,0}
 garage     | 10080 | {0,2679,957,2420,2150,1874,0}
  • 2.4.10 填补时序数据的间隙

您可以显示所选时间范围内的记录,即使部分时间范围内没有数据也可以。这称为填补间隙,通常涉及对缺失数据记录空值的操作。

下面示例中,使用包含时间戳、正在交易的资产代码、资产价格和交易的资产量的交易数据。

创建一个查询,用于获取九月份每天交易的资产“TIMS”的交易量:

SELECT
    time_bucket('1 day', time) AS date,
    sum(volume) AS volume
  FROM trades
  WHERE asset_code = 'TIMS'
    AND time >= '2021-09-01' AND time < '2021-10-01'
  GROUP BY date
  ORDER BY date DESC;

输出如下:

date          | volume
------------------------+--------
 2021-09-29 00:00:00+00 |  11315
 2021-09-28 00:00:00+00 |   8216
 2021-09-27 00:00:00+00 |   5591
 2021-09-26 00:00:00+00 |   9182
 2021-09-25 00:00:00+00 |  14359
 2021-09-22 00:00:00+00 |   9855

可以看到,上面的输出缺少了一些日期的数据,可以通过time_bucket_gapfill 函数来补全数据,以方便结果的使用:

SELECT
  time_bucket_gapfill('1 day', time) AS date,
  sum(volume) AS volume
FROM trades
WHERE asset_code = 'TIMS'
  AND time >= '2021-09-01' AND time < '2021-10-01'
GROUP BY date
ORDER BY date DESC;

输出如下:

date          | volume
------------------------+--------
 2021-09-30 00:00:00+00 |
 2021-09-29 00:00:00+00 |  11315
 2021-09-28 00:00:00+00 |   8216
 2021-09-27 00:00:00+00 |   5591
 2021-09-26 00:00:00+00 |   9182
 2021-09-25 00:00:00+00 |  14359
 2021-09-24 00:00:00+00 |
 2021-09-23 00:00:00+00 |
 2021-09-22 00:00:00+00 |   9855

还可以使用 Timescale 的 time_bucket_gapfill 函数生成包含时间戳的数据点。这对于图表展示非常有用,便于它们可以准确地在图表中绘制间隙。下面这个例子,在过去两周内生成了 1080 个数据点,填充了空缺值,并为每个空值分配了一个时间戳。


SELECT
  time_bucket_gapfill(INTERVAL '2 weeks' / 1080, time, now() - INTERVAL '2 weeks', now()) AS btime,
  sum(volume) AS volume
FROM trades
WHERE asset_code = 'TIMS'
  AND time >= now() - INTERVAL '2 weeks' AND time < now()
GROUP BY btime
ORDER BY btime;

结果如下:

btime          | volume
------------------------+----------
 2021-03-09 17:28:00+00 |  1085.25
 2021-03-09 17:46:40+00 |  1020.42
 2021-03-09 18:05:20+00 |
 2021-03-09 18:24:00+00 |  1031.25
 2021-03-09 18:42:40+00 |  1049.09
 2021-03-09 19:01:20+00 |  1083.80
 2021-03-09 19:20:00+00 |  1092.66
 2021-03-09 19:38:40+00 |
 2021-03-09 19:57:20+00 |  1048.42
 2021-03-09 20:16:00+00 |  1063.17
 2021-03-09 20:34:40+00 |  1054.10
 2021-03-09 20:53:20+00 |  1037.78
  • 2.4.11 通过向前推进最后一次观测来填补空白

如果您的数据集仅在实际值发生变化时记录一行数据,但您的可视化界面可能仍然需要所有数据点才能正确显示结果。在这种情况下,可以将上一次观察到的值传递到缺失的数据点中。例如:

SELECT
  time_bucket_gapfill(INTERVAL '5 min', time, now() - INTERVAL '2 weeks', now()) as 5min,
  meter_id,
  locf(avg(data_value)) AS data_value
FROM my_hypertable
WHERE
  time > now() - INTERVAL '2 weeks'
  AND meter_id IN (1,2,3,4)
GROUP BY 5min, meter_id
  • 2.4.12 找到每个唯一项的最后一条数据

您可以在数据库中找到每个唯一项目的最后一个数据点。例如,每个物联网设备的最后记录的测量值。减少搜索最后一个数据点所需数据量的标准方法是使用时间谓词来严格限制要遍历的时间量或块的数量。除非所有项目在时间范围内至少有一条记录,否则此方法不起作用。更健壮的方法是使用最后一个数据点查询来确定每个唯一项目的最后一条记录。

我们看一个例子:

CREATE TABLE vehicles (
  vehicle_id INTEGER PRIMARY KEY,
  vin_number CHAR(17),
  last_checkup TIMESTAMP
);

CREATE TABLE location (
  time TIMESTAMP NOT NULL,
  vehicle_id INTEGER REFERENCES vehicles (vehicle_id),
  latitude FLOAT,
  longitude FLOAT
);

SELECT create_hypertable('location', by_range('time'));

可以使用第一个表来对location表执行 LATERAL JOIN ,该表提供了一组不同的车辆:

SELECT data.* FROM vehicles v
  INNER JOIN LATERAL (
    SELECT * FROM location l
      WHERE l.vehicle_id = v.vehicle_id
      ORDER BY time DESC LIMIT 1
  ) AS data
ON true
ORDER BY v.vehicle_id, data.time DESC;

            time            | vehicle_id | latitude  |  longitude
----------------------------+------------+-----------+-------------
 2017-12-19 20:58:20.071784 |         72 | 40.753690 |  -73.980340
 2017-12-20 11:19:30.837041 |        156 | 40.729265 |  -73.993611
 2017-12-15 18:54:01.185027 |        231 | 40.350437 |  -74.651954

这种方法需要保留一个单独的表来存储唯一项目的标识符或名称。可以如上所示,通过 REFERENCES 定义Hypertable到元数据表的外键来实现这一点。

元数据表(本例中即:vehicles )可以通过业务逻辑进行填充,例如当车辆首次在系统中注册时。另外,您还可以在对Hypertable执行插入或更新时使用触发器动态填充它。例如:

CREATE OR REPLACE FUNCTION create_vehicle_trigger_fn()
  RETURNS TRIGGER LANGUAGE PLPGSQL AS
$BODY$
BEGIN
  INSERT INTO vehicles VALUES(NEW.vehicle_id, NULL, NULL) ON CONFLICT DO NOTHING;
  RETURN NEW;
END
$BODY$;

CREATE TRIGGER create_vehicle_trigger
  BEFORE INSERT OR UPDATE ON location
  FOR EACH ROW EXECUTE PROCEDURE create_vehicle_trigger_fn();

三、数据压缩

TimescaleDB以更高效的格式存储数据,与普通PostgreSQL表相比,压缩率高达20倍。TimescaleDB压缩是在PostgreSQL中原生实现的,不需要特殊的存储格式。

列存压缩的一个额外好处是,某些查询明显更快,因为需要读取到内存中的数据更少。

3.1 设置压缩策略

以下面的表为例,我们演示一下如何使用压缩:

CREATE TABLE "metrics"(
    created timestamp with time zone default now() not null,
    type_id integer                                not null,
    value   double precision                       not null
);
SELECT create_hypertable('metrics', by_range('created'));
  • 使用Alter table命令,对表启用压缩,并设置segmentby字段和orderby字段:
ALTER TABLE metrics 
SET (
    timescaledb.compress, 
    timescaledb.compress_segmentby='type_id', 
    timescaledb.compress_orderby='created DESC'
);

segmentby和orderby的选择不同,性能和压缩比会不一样,如何正确的选中列,参考这里

  • 开启压缩后,可以手动压缩数据
SELECT compress_chunk(c) from show_chunks('metrics') c;

也可以配置自动压缩策略,在下一节中会详细介绍。

  • 查看压缩效果
SELECT 
    pg_size_pretty(before_compression_total_bytes) as before,
    pg_size_pretty(after_compression_total_bytes) as after
 FROM hypertable_compression_stats('metrics');

结果:

before | after 
--------+-------
 180 MB | 16 MB
(1 row)

3.2 设置自动压缩策略

为了避免每次有数据要压缩时都手动运行压缩,可以设置压缩策略。压缩策略允许您压缩超过特定时间的数据,例如,压缩超过8天的所有数据块:

SELECT add_compression_policy('metrics', INTERVAL '8 days');

压缩策略定期运行,默认情况下每天运行一次,这意味着使用上述设置,可能有长达9天的未压缩数据。

更多关于压缩策略的信息可以查看这里

3.3 查询加速

前面我们将压缩设置为按type_id列值进行分段(segmentby),这意味着通过对该列进行过滤或分组来获取数据将更加高效。同时,我们按created字段进行降序排序,在查询语句中使用该字段进行降序排序查询性能会更好。

下面是一个利用上述规则加速查询的例子:

SELECT time_bucket('1 day', created, 'Europe/Berlin') AS "time",
        round((last(value, created) - first(value, created)) * 
100.) / 100. AS value
FROM metrics                                   
WHERE type_id = 5
GROUP BY 1;

在压缩和解压的情况下,分别执行上述SQL,会看到相当大的性能差异。

解压数据的方法:

SELECT decompress_chunk(c) from show_chunks('metrics') c;

四、持续聚合

数据规模随着时间推移变得非常庞大是时序数据的典型特点,持续聚合旨在加快对非常大型数据集的查询速度。Timescale 持续聚合使用 PostgreSQL 的雾化视图,并以持续和增量方式在后台刷新数据,因此当您运行查询时,只需计算已更改的数据,而不是整个数据集。

4.1 创建持续聚合

创建一个持续聚合分两步:

  1. 创建一个视图

持续聚合依赖Time bucket,下面是一个例子:

CREATE MATERIALIZED VIEW conditions_summary_daily
WITH (timescaledb.continuous) AS
SELECT device,
   time_bucket(INTERVAL '1 day', time) AS bucket,
   AVG(temperature),
   MAX(temperature),
   MIN(temperature)
FROM conditions
GROUP BY device, bucket;
  1. 为该视图设置刷新策略,以持续更新视图
SELECT add_continuous_aggregate_policy('conditions_summary_daily',
  start_offset => INTERVAL '1 month',
  end_offset => INTERVAL '1 day',
  schedule_interval => INTERVAL '1 hour');
  1. 使用 WITH NO DATA 选项

默认情况下,当您首次创建视图时,它会被填充数据。这是为了使聚合可以跨整个超表进行计算。如果您不希望发生这种情况,例如如果表非常大,或者正在不断添加新数据,您可以控制数据刷新的顺序。您可以通过在持续聚合策略中使用 WITH NO DATA 选项来添加手动刷新。

WITH NO DATA 选项允许立即创建持续聚合,因此您无需等待数据被聚合。数据只有在策略开始运行时才开始填充。这意味着只有新于 start_offset 时间的数据才开始填充持续聚合。如果您有比 start_offset 时间间隔更早的历史数据,则需要手动刷新历史数据,直到当前的 start_offset,以便实时查询能够高效运行。

CREATE MATERIALIZED VIEW cagg_rides_view
WITH (timescaledb.continuous) AS
SELECT vendor_id,
time_bucket('1h', pickup_datetime) AS hour,
  count(*) total_rides,
  avg(fare_amount) avg_fare,
  max(trip_distance) as max_trip_distance,
  min(trip_distance) as min_trip_distance
FROM rides
GROUP BY vendor_id, time_bucket('1h', pickup_datetime)
WITH NO DATA;

手动刷新视图:

CALL refresh_continuous_aggregate('cagg_rides_view', NULL, localtimestamp - INTERVAL '1 week');

添加刷新策略:

SELECT add_continuous_aggregate_policy('cagg_rides_view',
  start_offset => INTERVAL '1 week',
  end_offset   => INTERVAL '1 hour',
  schedule_interval => INTERVAL '30 minutes');
  1. 使用JOIN查询
CREATE MATERIALIZED VIEW conditions_summary_daily_3
WITH (timescaledb.continuous) AS
SELECT time_bucket(INTERVAL '1 day', day) AS bucket,
   AVG(temperature),
   MAX(temperature),
   MIN(temperature),
   name
FROM devices JOIN conditions USING (device_id)
GROUP BY name, bucket;

4.2 查询持续聚合的数据

跟查询普通表和视图一样:

SELECT *
  FROM conditions_summary_hourly
  WHERE device = 5
  AND bucket >= '2020-01-01'
  AND bucket < '2020-04-01';

4.3 实时聚合

持续聚合是周期性刷新的,因此通常不包括来自底层Hypertable的最新数据块。实时聚合会使用持续聚合数据,并将最新的原始数据添加到其中,以提供准确和最新的结果,而无需在写入数据时进行聚合。

在 Timescale 的版本 1.7 到 2.12 中,默认情况下启用了实时聚合;当您创建一个持续聚合视图时,对该视图的查询将包括最新的数据,即使它尚未被聚合。在 Timescale 2.13 及更高版本中,默认情况下禁用了实时聚合。

  • 启用实时聚合
ALTER MATERIALIZED VIEW table_name set (timescaledb.materialized_only = false);
  • 关闭实时聚合
ALTER MATERIALIZED VIEW table_name set (timescaledb.materialized_only = true);

开启实时聚合后,在查询持续聚合时会自动添加最新的数据。换句话说,它们包括比您上次物化的数据更新的数据。

如果您向已物化的数据中添加新的历史数据,则这些数据不会反映在实时聚合中。这些历史数据需要等到下一轮刷新,或者通过调用 refresh_continuous_aggregate 进行手动刷新才能被查询到。您可以将实时聚合视为对于历史数据的最终一致性。

4.4 与窗口函数一起使用

目前,持续聚合不支持窗口函数。您可以通过以下方式解决此问题:

  • 首先为查询的其他部分创建持续聚合,
  • 然后在查询时对您的持续聚合使用窗口函数。

例如,假设您有一个名为 example 的Hypertable,其中包含一个time列和一个value列。您将数据按时间进行分段,并使用 lag 窗口函数计算time bucket之间的增量:

WITH t AS (
  SELECT
    time_bucket('10 minutes', time) as bucket,
    first(value, time) as value
  FROM example GROUP BY bucket
)
SELECT
  bucket,
  value - lag(value, 1) OVER (ORDER BY bucket) delta
  FROM t;

你无法直接使用上面的SQL创建持续聚合,因为上面的SQL使用了窗口函数lag。你可以先把窗口函数排除,创建一个持续聚合:

CREATE MATERIALIZED VIEW example_aggregate
  WITH (timescaledb.continuous) AS
    SELECT
      time_bucket('10 minutes', time) AS bucket,
      first(value, time) AS value
    FROM example GROUP BY bucket;

然后在查询时使用lag函数在持续聚合上计算差量:

SELECT
  bucket,
  value - lag(value, 1) OVER (ORDER BY bucket) AS delta
FROM example_aggregate;

这可以利用持续聚合的能力加速查询。

4.5 级联/多层持续聚合

Timescale允许在其他持续聚合之上创建持续聚合,这样可以让你以不同的粒度汇总数据。例如,你可能有一个按小时汇总逐分钟数据的持续聚合。要获取每日摘要,您可以在小时级聚合的基础上创建一个新的持续聚合。这比在原始Hypertable之上创建每日聚合更高效,因为您可以重用小时级聚合的计算结果。

在持续聚合上创建持续聚合跟在Hypertable上创建持续聚合的语法一模一样,就不再赘述。

实时聚合

默认情况下,所有持续聚合都使用实时聚合,这意味着对持续聚合的所有查询得到的总是实时最新的数据。Timescale通过将持续聚合产生的物化视图与源表或未雾化的原始数据进行连接来实现这一点。

当持续聚合被级联时,每个持续聚合仅了解其下一层。未物化数据的连接会递归进行,直到到达底层,从而确保在任何层级都能访问到最新的实时数据。

如果您将级联中的所有持续聚合都保持为实时聚合,底层是源Hypertable,这意味着级联中的每个持续聚合都可以访问所有最近的数据。

如果级联中的某个位置存在非实时持续聚合,递归JOIN将在该非实时持续聚合处停止。较高层的持续聚合不会从较低层接收任何未物化的数据。

例如,假设您有以下持续聚合:

  1. 在源Hypertable上进行实时每小时持续聚合
  2. 在每小时持续聚合上进行实时每日持续聚合
  3. 在每日持续聚合上进行非实时(仅雾化)每月持续聚合
  4. 在每月持续聚合上进行实时每年持续聚合

每小时和每日持续聚合的查询包括来自源Hypertable的实时、未物化的数据。每月持续聚合的查询仅返回已物化的数据。每年持续聚合的查询返回来自每年持续聚合本身的物化数据,以及来自每月持续聚合的近期的数据。但是,数据受限于每月持续聚合中已物化的内容,并且不会从源Hypertable获取最新的数据。

4.6 在持续聚合上创建索引

  • 自动创建索引

当您创建连续聚合时,会自动为每个 GROUP BY 列创建一个索引。该索引是一个复合索引,将 GROUP BY 列与 time_bucket 列结合起来。

例如,如果您定义了一个带有 GROUP BY 设备、位置、桶的连续聚合视图,那么会创建两个复合索引:一个在 {device, bucket} 上,另一个在 {location, bucket} 上。

可以在创建持续聚合时,将 timescaledb.create_group_indexes 设置为 false 来关闭自动创建索引的功能:

CREATE MATERIALIZED VIEW conditions_daily
  WITH (timescaledb.continuous, timescaledb.create_group_indexes=false)
  AS
  ...
  • 手动创建索引

Timescale 2.7开始,允许在任意字段上创建索引,就跟pg索引的语法一样:

CREATE INDEX avg_temp_idx ON weather_daily (avg_temp);

4.7 数据压缩

持续聚合通常是对历史数据的采样,因此如果数据只是用来查询分析用,不会进行修改,则可以开启压缩以节省空间。

连续聚合上的压缩与Hypertable上的压缩工作方式类似。当启用压缩且没有提供其他选项时,segment_by 值将自动设置为连续聚合的 group by 列,并且 time_bucket 列将用作压缩配置中的 order_by 列。

  • 对已经存在的持续聚合开启压缩
ALTER MATERIALIZED VIEW cagg_name set (timescaledb.compress = true);
  • 关闭压缩
ALTER MATERIALIZED VIEW cagg_name set (timescaledb.compress = false);

如果持续聚合上有压缩的块,关闭压缩会失败,此时需要先解压数据

SELECT decompress_chunk(c, true) FROM show_chunks('cagg_name') c;

设置压缩策略

在为连续聚合设置压缩策略之前,需要先设置一个刷新策略。压缩策略的间隔应设置得不会对正在刷新的区域进行压缩。这是为了防止刷新策略失败。

以下面的刷新策略为例:

SELECT add_continuous_aggregate_policy('cagg_name',
  start_offset => INTERVAL '30 days',
  end_offset => INTERVAL '1 day',
  schedule_interval => INTERVAL '1 hour');

压缩策略需要设置compress_after 参数的值大于刷新策略的start_offset 的值:

SELECT add_compression_policy('cagg_name', compress_after=>'45 days'::interval);

4.8 关于时间和时区

在连续聚合中,不支持依赖于本地时区设置的函数,无法调整到本地时间,因为时区设置会因用户而异。

为了解决这个问题,可以在视图定义中使用显式时区。另外,还可以为使用整数时间列的表创建自己的自定义聚合方案。

  • 在创建持续聚合时显示声明时区
CREATE MATERIALIZED VIEW device_summary
WITH (timescaledb.continuous)
AS
SELECT
  time_bucket('1 hour', observation_time) AS bucket,
  min(observation_time AT TIME ZONE 'EST') AS min_time,
  device_id,
  avg(metric) AS metric_avg,
  max(metric) - min(metric) AS metric_spread
FROM
  device_readings
GROUP BY bucket, device_id;
  • 在数字类型的时间上创建持续聚合

日期和时间通常以年-月-日小时:分钟:秒表示。大多数 Timescale 数据库使用日期/时间类型列来表示日期和时间。然而,在某些情况下,您可能需要将这些常见的时间和日期格式转换为使用整数的格式。最常见的整数时间是 Unix 时间戳,它是自 1970-01-01 Unix 纪元以来的秒数,但也有可能使用其他类型的基于整数的时间格式。

要创建一个使用整数列作为时间的Hypertable,需要提供块时间间隔。下面的示例使用每个块10 分钟作为间隔:

-- 基础表,time字段是整数
CREATE TABLE devices(
  time BIGINT,        -- Time in minutes since epoch
  cpu_usage INTEGER,  -- Total CPU usage
  disk_usage INTEGER, -- Total disk usage
  PRIMARY KEY (time)
);

-- Hypertable,间隔时间为10分钟
SELECT create_hypertable('devices', by_range('time', 10));

要在使用整数型时间的Hypertable上定义一个连续聚合,您需要一个函数以正确的格式获取当前时间,并将其设置为Hypertable。您可以使用 set_integer_now_func 函数来实现这一点。它可以被定义为一个普通的 PostgreSQL 函数,但必须是 STABLE,不带参数,并返回与表中时间列相同类型的整数值。当您设置好时间处理后,就可以创建连续聚合了。

创建一个函数将事件转换为Unix时间戳:

CREATE FUNCTION current_epoch() RETURNS BIGINT
LANGUAGE SQL STABLE AS $$
SELECT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)::bigint;$$;

 SELECT set_integer_now_func('devices', 'current_epoch');

devices表创建持续聚合:

CREATE MATERIALIZED VIEW devices_summary
WITH (timescaledb.continuous) AS
SELECT time_bucket('500', time) AS bucket,
   avg(cpu_usage) AS avg_cpu,
   avg(disk_usage) AS avg_disk
FROM devices
GROUP BY bucket;

插入测试数据:

CREATE EXTENSION tablefunc;

INSERT INTO devices(time, cpu_usage, disk_usage)
SELECT time,
   normal_rand(1,70,10) AS cpu_usage,
  normal_rand(1,2,1) * (row_number() over()) AS disk_usage
FROM generate_series(1,10000) AS time;

上面的SQL使用 tablefunc 扩展来生成正态分布的数据,并使用 row_number 函数将其转换为累积序列。