更强大更灵活更全面丨一文搞懂DolphinDB窗口计算

枕上诗书闲处好,门前风景雨来佳。这篇文章主要讲述更强大更灵活更全面丨一文搞懂DolphinDB窗口计算相关的知识,希望能为你提供帮助。
在时序数据的处理中经常需要使用?窗口计算??。在DolphinDB中,窗口计算不仅仅应用于全量的历史数据计算,还可以应用于增量的流计算。?窗口函数??既可应用于SQL(处理表中的列),也可应用于面板数据(处理?矩阵??中的列)。DolphinDB对于窗口计算进行了精心优化,与其它系统相比,拥有显著的性能优势。除此之外,DolphinDB的窗口函数使用上更加灵活,不仅内置的或自定义的?vector函数??都可用于窗口计算,而且可以多个函数嵌套使用。
本篇将系统的介绍DolphinDB的窗口计算,从概念划分、应用场景、指标计算等角度,帮助用户快速掌握和运用DolphinDB强大的窗口计算功能。
本篇所有代码支持DolphinDB 1.30.15,2.00.3及以上版本。
1.30.7,2.00.0以上版本支持绝大部分代码,细节部分会在小节内部详细说明。
1. 窗口的概念及分类DolphinDB内有四种窗口,分别是:滚动窗口、滑动窗口、累计窗口和不定长窗口(包括会话窗口和segment window)。
在DolphinDB中,窗口的度量标准有两种:数据行数和时间。 为了方便理解,可以参考下图:

本章节将介绍各个窗口类型的概念,具体的应用实例会在第2-4章节详细介绍。
1.1 滚动窗口
滚动窗口将每个数据分配到一个指定大小的窗口中。通常滚动窗口大小固定,且相邻两个窗口没有重复的元素。
滚动窗口根据度量标准的不同可以分为2种:

  • 以行数划分窗口
图1-1-1 指定窗口大小为3行记录,横坐标以时间为单位。从图上可以看出,按每三行记录划分为了一个窗口,窗口之间的元素没有重叠。
  • 以时间划分窗口
图1-1-2 指定窗口大小为3个时间单位,横坐标以时间为单位。与以行数划分窗口不同,按时间划分的窗口内记录数是不固定的。从两个图中可以看出,同一数据按不同单位类型划分窗口的结果是不同的。

1.2 滑动窗口
滑动窗口的模式是:一定长度的窗口,根据步长,进行滑动。与滚动窗口不同,滑动窗口相邻两个窗口可能包括重复的元素。滑动窗口根据步长和窗口采用不同的度量标准,可以分成以下4种:
  • 步长为1行, 窗口为n行
图1-2-1 指定窗口大小为6行记录,窗口每次向后滑动1行记录。
  • 步长为1行,窗口为指定时间
图1-2-2 指定窗口大小为3个时间单位,窗口以右边界为基准进行前向计算,窗口每次向后滑动1行记录。
  • 步长为时间,窗口为n个步长时间
图1-2-3 指定窗口大小为4个时间单位,每次向后滑动2个时间单位。
  • 步长为n行,窗口为m行
图1-2-4 指定窗口大小为5行记录,窗口每次向后滑动3行记录。

1.3 累计窗口
累计窗口,即窗口的起始边界固定,结束边界累计右移。根据数据的增加或时间的增长,窗口的大小会变大。 累计窗口根据度量标准的不同可以分为2种:
  • 步长为指定时间单位
图1-3-1 窗口右边界每次右移2个时间单位,窗口大小累计增加。
  • 步长为1行
图1-3-2 窗口右边界每次右移1行,窗口大小累计增加。

1.4 不定长窗口
1.4.1 会话窗口
会话窗口是根据指定时间长度(session gap)切分窗口:若某条数据之后指定时间长度内无数据进入,则该条数据为一个窗口的终点,之后第一条新数据为另一个窗口的起点。 会话窗口的窗口大小可变,窗口的度量方式为时间。

1.4.2 segment窗口
segment窗口是根据给定的数据来切分窗口,连续的相同元素为一个窗口。窗口大小可变,窗口的度量方式为行。

2. SQL中的窗口计算以及窗口连接计算SQL中的窗口计算一般涉及滚动窗口,滑动窗口,累计窗口以及segment窗口。DolphinDB中也有涉及窗口计算的window join窗口连接。本章将对上述几个窗口计算一一介绍。
2.1 SQL中的窗口计算
2.1.1 滚动窗口
2.1.1.1 时间维度的滚动窗口
在SQL中,可使用??interval??,  ??bar??,  ??dailyAlignedBar??等函数配合??group by??语句实现滚动窗口的聚合计算。
以??bar??函数为例,下面的例子是将10:00:00到10:05:59每秒更新的数据,每2分钟统计一次交易量之和:
t=table(2021.11.01T10:00:00..2021.11.01T10:05:59 as time, 1..360 as volume)
select sum(volume) from t group by bar(time, 2m)

# output

bar_timesum_volume
------------------- ----------
2021.11.01T10:00:00 7260
2021.11.01T10:02:00 21660
2021.11.01T10:04:00 36060

??bar??函数的分组规则是根据每条记录最近的能整除duration参数的时间作为开始时间的,但是对于一些开始时间不能直接被整除的场景,??bar??函数不适用。 在金融场景中,往往在交易时段之外也有一些数据输入,但是在做数据分析的时候并不会用到这些数据;在期货市场,通常涉及到两个时间段,有些交易时段会隔天。??dailyAlignedBar??函数可以设置每天的起始时间和结束时间,很好地解决这类场景的聚合计算问题。
以期货市场为例,数据模拟为国内期货市场两天的两个交易时段下午1:30-3:00和晚上9:00-凌晨2:30。使用??dailyAlignedBar??函数计算每个交易时段中的7分钟均价。
sessions = 13:30:00 21:00:00
ts = 2021.11.01T13:30:00..2021.11.01T15:00:00 join 2021.11.01T21:00:00..2021.11.02T02:30:00
ts = ts join (ts+60*60*24)
t = table(ts, rand(10.0, size(ts)) as price)

select avg(price) as price, count(*) as count from t group by dailyAlignedBar(ts, sessions, 7m) as k7

# output

k7pricecount
------------------- ----------------- -----
2021.11.01T13:30:00 4.815287529108381 420
2021.11.01T13:37:00 5.265409774828835 420
2021.11.01T13:44:00 4.984934388122167 420
...
2021.11.01T14:47:00 5.031795592230213 420
2021.11.01T14:54:00 5.201864532018313 361
2021.11.01T21:00:00 4.945093814017518 420


//如果使用bar函数会不达预期
select avg(price) as price, count(*) as count from t group by bar(ts, 7m) as k7

# output

k7pricecount
------------------- ----------------- -----
2021.11.01T13:26:00 5.220721067537347 180//时间从13:26:00开始,不符合预期
2021.11.01T13:33:00 4.836406542137931 420
2021.11.01T13:40:00 5.100716347573325 420
2021.11.01T13:47:00 5.041169475132067 420
2021.11.01T13:54:00 4.853431270784876 420
2021.11.01T14:01:00 4.826169502311608 420

??interval??函数的主要应用是插值。如期货市场中有一些不活跃的期货,一段时间内可能都没有报价,但是在数据分析的时候需要每2秒都需要输出该期货的数据,缺失的数据根据前面的值进行插值;如果这2秒内有重复的值,则用最后一个作为输出值。这个场景下就需要用到??interval??函数。
t=table(2021.01.01T01:00:00+(1..5 join 9..11) as time, take(`CLF1,8) as contract, 50..57 as price)

select last(contract) as contract, last(price) as price from t group by interval(time, 2s,"prev")

# output

interval_timecontract price
------------------- -------- -----
2021.01.01T01:00:00 CLF150
2021.01.01T01:00:02 CLF152
2021.01.01T01:00:04 CLF154
2021.01.01T01:00:06 CLF154
2021.01.01T01:00:08 CLF155
2021.01.01T01:00:10 CLF157

//如果使用bar函数会不达预期

select last(contract) as contract, last(price) as price from t group by bar(time, 2s)

bar_timecontract price
------------------- -------- -----
2021.01.01T01:00:00 CLF150
2021.01.01T01:00:02 CLF152
2021.01.01T01:00:04 CLF154
2021.01.01T01:00:08 CLF155
2021.01.01T01:00:10 CLF157

2.1.1.2 记录数维度的滚动窗口
除了时间维度可以做滚动窗口计算之外,记录数维度也可以做滚动窗口计算。 在股票市场临近收盘的时候,往往一分钟之内的交易量、笔数是非常大的,做策略时如果单从时间维度去触发可能会导致偏差。因此分析师有时会想要从每100笔交易而非每一分钟的角度去做策略,这个时候就可以用??rolling??函数实现。
下面是某天股票市场最后一分钟内对每100笔交易做成交量之和的例子:
t=table(2021.01.05T02:59:00.000+(1..2000)*100 as time, take(`CL,2000) as sym, 10* rand(50, 2000) as vol)

select rolling(last,time,100) as last_time,rolling(last,t.sym,100) as sym, rolling(sum,vol,100) as vol_100_sum from t

# output (每次结果会因为rand函数结果而不同)

last_timesym vol_100_sum
----------------------- --- -----------
2021.01.05T02:59:00.100 CL26480
2021.01.05T02:59:00.200 CL25250
2021.01.05T02:59:00.300 CL25910
2021.01.05T02:59:00.400 CL22890
2021.01.05T02:59:00.500 CL24000
...

2.1.2 滑动窗口
滑动窗口计算涉及步长与窗口长度两个维度:
  • 步长是指计算如何触发:每隔一定数量的行或者每隔一定时间长度;
  • 窗口长度是指每次计算时包含的数据量:一定数量的行的数据或者一定时间长度的数据。
2.1.2.1 步长为1行,窗口长度为n行
此类情况可使用m系列函数,??moving???函数,或者??rolling??。下面以????msum????为例,滑动计算窗口长度为5行的vol值之和。
t=table(2021.11.01T10:00:00 + 0 1 2 5 6 9 10 17 18 30 as time, 1..10 as vol)

select time, vol, msum(vol,5,1) from t

# output

timevol msum_vol
------------------- --- --------
2021.11.01T10:00:00 11
2021.11.01T10:00:01 23
2021.11.01T10:00:02 36
2021.11.01T10:00:05 410
2021.11.01T10:00:06 515
...

DolphinDB SQL可以通过??context by???对各个不同的symbol在组内进行窗口计算。??context by??是DolphinDB独有的功能,是对标准SQL语句的拓展,具体其他用法参照:????context by???
t=table(2021.11.01T10:00:00 + 0 1 2 5 6 9 10 17 18 30 join 0 1 2 5 6 9 10 17 18 30 as time, 1..20 as vol, take(`A,10) join take(`B,10) as sym)

select time, sym, vol, msum(vol,5,1) from t context by sym

# output

timesym vol msum_vol
------------------- --- --- --------
2021.11.01T10:00:00 A11
2021.11.01T10:00:01 A23
2021.11.01T10:00:02 A36
...
2021.11.01T10:00:30 A1040
2021.11.01T10:00:00 B1111
2021.11.01T10:00:01 B1223
...
2021.11.01T10:00:30 B2090

m系列函数是经过优化的窗口函数,如果想要使用自定义函数做窗口计算,DolphinDB支持在??moving??函数和??rolling??函数中使用自定义聚合函数。下面以??moving??嵌套自定义聚合函数为例: 以下的行情数据有四列(代码,日期,close和volume),按照代码分组,组内按日期排序。设定窗口大小为20,在窗口期内按照volume排序,取volume最大的五条数据的平均close的计算。
//t是模拟的四列数据
t = table(take(`IBM, 100) as code, 2020.01.01 + 1..100 as date, rand(100,100) + 20 as volume, rand(10,100) + 100.0 as close)

//1.30.15及以上版本可以用一行代码实现
//moving支持用户使用自定义匿名聚合函数(https://www.dolphindb.cn/cn/help/130/Functionalprogramming/AnonymousFunction.html)
select code, date, moving(defg(vol, close)return close[isort(vol, false).subarray(0:min(5,close.size()))].avg(), (volume, close), 20) from t context by code

//其他版本可以用自定义命名聚合函数实现:
defg top_5_close(vol,close)
return close[isort(vol, false).subarray(0:min(5,close.size()))].avg()

select code, date, moving(top_5_close,(volume, close), 20) from t context by code

在做数据分析的时候,还会经常用到窗口嵌套窗口的操作。 举一个更复杂的例子:在做  101 Formulaic Alphas中98号因子计算的时候,DolphinDB可以运用窗口嵌套窗口的方法,将原本在C#中需要几百行的代码,简化成几行代码,且计算性能也有接近三个数量级的提升。 trade表有需要可以自行模拟数据,或用sample数据??CNTRADE??。
// 输入表trade的schema如下,如需要可自行模拟数据。

nametypeString typeInt
---------- ---------- -------
ts_codeSYMBOL17
trade_date DATE6
openDOUBLE16
volDOUBLE16
amountDOUBLE16

// alpha 98 计算:

def normRank(x)
return rank(x)\\x.size()


def alpha98SQL(t)
update t set adv5 = mavg(vol, 5), adv15 = mavg(vol, 15) context by ts_code
update t set rank_open = normRank(open), rank_adv15 = normRank(adv15) context by trade_date
update t set decay7 = mavg(mcorr(vwap, msum(adv5, 26), 5), 1..7), decay8 = mavg(mrank(9 - mimin(mcorr(rank_open, rank_adv15, 21), 9), true, 7), 1..8) context by ts_code
return select ts_code, trade_date, normRank(decay7)-normRank(decay8) as a98 from t context by trade_date


input = select trade_date,ts_code,amount*1000/(vol*100 + 1) as vwap,vol,open from trade
timer alpha98DDBSql = alpha98SQL(input)

2.1.2.2 步长为1行,窗口为指定时间长度
此类情况可使用m系列或者tm系列函数。下面以??tmsum??为例,计算滑动窗口长度为5秒的vol值之和。
//1.30.14,2.00.2以上版本支持```tmsum```函数。
t=table(2021.11.01T10:00:00 + 0 1 2 5 6 9 10 17 18 30 as time, 1..10 as vol)
select time, vol, tmsum(time,vol,5s) from t

# output
timevol tmsum_time
------------------- --- ----------
2021.11.01T10:00:00 11
2021.11.01T10:00:01 23
2021.11.01T10:00:02 36
2021.11.01T10:00:05 49
2021.11.01T10:00:06 512
2021.11.01T10:00:09 615
2021.11.01T10:00:10 718
2021.11.01T10:00:17 88
2021.11.01T10:00:18 917
2021.11.01T10:00:30 1010

实际场景中,计算历史分位的时候也会广泛运用到这类情况的窗口计算,具体在3.1.1介绍。
2.1.2.3 步长为时间长度,窗口为n个步长时间
此类情况可使用??interval??函数配合??group by??语句。下面的例子以5秒为窗口步长,10秒为窗口大小,计算vol值之和。
【更强大更灵活更全面丨一文搞懂DolphinDB窗口计算】推荐使用1.30.14, 2.00.2及以上版本使用interval函数。
t=table(2021.11.01T10:00:00+0 3 5 6 7 8 15 18 20 29 as time, 1..10 as vol)
select sum(vol) from t group by interval(time, 10s, "null", 5s)

# output

interval_timesum_vol
------------------- -------
2021.11.01T10:00:00 21
2021.11.01T10:00:05 18
2021.11.01T10:00:10 15
2021.11.01T10:00:15 24
2021.11.01T10:00:20 19
2021.11.01T10:00:25 10

2.1.1.1中interval的场景可以看作是窗口大小与步长相等的特殊的滑动窗口,而本节则是窗口大小为n倍步长时间的滑动窗口。??interval??函数为数据分析提供了更便捷的工具。
2.1.2.4 步长为n行,窗口为k*n行
此类情况可使用高阶函数????rolling????。下面的例子计算步长为3行,窗口长度为6行的vol值之和。与??interval???函数不同的是,??rolling??不会对缺失值进行插值,如果窗口内的元素个数不足窗口大小,该窗口不会被输出。 该例子中,数据一共是10条,在前两个窗口计算完之后,第三个窗口因为只有4条数据,所以不输出第三个窗口的结果。
t=table(2021.11.01T10:00:00+0 3 5 6 7 8 15 18 20 29 as time, 1..10 as vol)
select rolling(last,time,6,3) as last_time, rolling(sum,vol,6,3) as sum_vol from t

# output

last_timesum_vol
------------------- -------
2021.11.01T10:00:08 21
2021.11.01T10:00:20 39

2.1.3 累计窗口
累计窗口有两种情况:一种是步长是1行,另一种是步长为指定时间长度。
2.1.3.1 步长为1行
步长为1行的累计窗口计算在SQL中通常直接用??cum??系列函数。下面的是累计求和??cumsum??的例子:
t=table(2021.11.01T10:00:00..2021.11.01T10:00:04 join 2021.11.01T10:00:06..2021.11.01T10:00:10 as time,1..10 as vol)
select *, cumsum(vol) from t

# output

timevol cum_vol
------------------- --- -------
2021.11.01T10:00:00 11
2021.11.01T10:00:01 23
2021.11.01T10:00:02 36
2021.11.01T10:00:03 410
2021.11.01T10:00:04 515
2021.11.01T10:00:06 621
2021.11.01T10:00:07 728
2021.11.01T10:00:08 836
2021.11.01T10:00:09 945
2021.11.01T10:00:10 1055

在实际场景中经常会用cum系列函数与??context by??连用,做分组内累计计算。比如行情数据中,根据各个不同股票的代码,做各自的累计成交量。
t=table(2021.11.01T10:00:00 + 0 1 2 5 6 9 10 17 18 30 join 0 1 2 5 6 9 10 17 18 30 as time, 1..20 as vol, take(`A,10) join take(`B,10) as sym)
select*, cumsum(vol) as cumsum_vol from t context by sym

# output

timevol sym cumsum_vol
------------------- --- --- ----------
2021.11.01T10:00:00 1A1
2021.11.01T10:00:01 2A3
...
2021.11.01T10:00:18 9A45
2021.11.01T10:00:30 10A55
2021.11.01T10:00:00 11B11
2021.11.01T10:00:01 12B23
...
2021.11.01T10:00:18 19B135
2021.11.01T10:00:30 20B155

2.1.3.2 步长为指定时间长度
要在SQL中实现步长为指定时间长度的累计窗口计算,可以使用??bar??类函数搭配??cgroup??来实现。
t=table(2021.11.01T10:00:00..2021.11.01T10:00:04 join 2021.11.01T10:00:06..2021.11.01T10:00:10 as time,1..10 as vol)
select sum(vol) from t cgroup by bar(time, 5s) as time order by time

# output

timesum_vol
------------------- -------
2021.11.01T10:00:00 15
2021.11.01T10:00:05 45
2021.11.01T10:00:10 55

2.1.4 segment窗口
以上所有例子中,窗口大小均固定。在DolphinDB中亦可将连续的相同元素做为一个窗口,用??segment??来实现。下面的例子是根据order_type中的数据进行窗口分割,进行累计求和计算。 实际场景中,??segment??经常用于逐笔数据中,连续相同的order_type做累计成交额。
vol = 0.1 0.2 0.1 0.2 0.1 0.2 0.1 0.2 0.1 0.2 0.1 0.2
order_type = 0 0 1 1 1 2 2 1 1 3 3 2;
t = table(vol,order_type);
select *, cumsum(vol) as cumsum_vol from t context by segment(order_type);

# output

vol order_type cumsum_vol
--- ---------- ----------
0.1 00.1
0.2 00.3
0.1 10.1
0.2 10.3
0.1 10.4
0.2 20.2
0.1 20.3
0.2 10.2
0.1 10.3
0.2 30.2
0.1 30.3
0.2 20.2

2.2 SQL中的窗口连接计算
在DolphinDB中,除了常规的窗口计算之外,还支持窗口连接计算。即在表连接的同时,进行窗口计算。这里用到的函数有??wj??和??pwj??。
window join在表连接的同时对右表进行步长为1行,窗口为时间长度的窗口计算。因为窗口的左右边界均可以指定,也可以为负数,所以也可以看作非常灵活的滑动窗口。详细用法参见用户手册????window join??。
//data
t1 = table(1 1 2 as sym, 09:56:06 09:56:07 09:56:06 as time, 10.6 10.7 20.6 as price)
t2 = table(take(1,10) join take(2,10) as sym, take(09:56:00+1..10,20) as time, (10+(1..10)\\10-0.05) join (20+(1..10)\\10-0.05) as bid, (10+(1..10)\\10+0.05) join (20+(1..10)\\10+0.05) as offer, take(100 300 800 200 600, 20) as volume);

//window join calculation
wj(t1, t2, -5s:0s, < avg(bid)> , `sym`time);

# output

sym timepriceavg_bid
--- -------- ----- -------
109:56:06 10.6 10.3
109:56:07 10.7 10.4
209:56:06 20.6 20.3

由于窗口可以灵活设置,所以不仅是多表连接的时候会用到,单表内部的窗口计算也可以用到window join。下面的例子可以看作是t2表中每一条数据做一个(time-6s)到(time+1s)的计算。
t2 = table(take(1,10) join take(2,10) as sym, take(09:56:00+1..10,20) as time, (10+(1..10)\\10-0.05) join (20+(1..10)\\10-0.05) as bid, (10+(1..10)\\10+0.05) join (20+(1..10)\\10+0.05) as offer, take(100 300 800 200 600, 20) as volume);

wj(t2, t2, -6s:1s, < avg(bid)> , `sym`time);

# output

sym timebidoffer volume avg_bid
--- -------- ---- ------ ------ --------
109:56:01 10.05 10.15 10010.1
...
109:56:08 10.75 10.85 80010.5
109:56:09 10.85 10.95 20010.6
109:56:10 10.95 11.05 60010.65
209:56:01 20.05 20.15 10020.1
209:56:02 20.15 20.25 30020.15
...
209:56:08 20.75 20.85 80020.5
209:56:09 20.85 20.920020.6
209:56:10 20.95 21.05 60020.65

3.面板数据使用窗口计算在DolphinDB中,面板数据可以是矩阵也可以是表。表的窗口计算在前一章节已经描述,所以在这一章节中着重讨论矩阵的计算。
3.1 面板数据的滑动窗口计算
滑动窗口m系列函数也可以适用于面板数据,即在矩阵每列内进行计算,返回一个与输入矩阵维度相同的矩阵。如果滑动维度为时间,则要先使用????setIndexedMatrix!????函数将矩阵的行与列标签设为索引。这里需要注意的是,行与列标签均须严格递增。在矩阵计算中,IndexedMatrix可以帮助对齐行与列的不同标签,非常实用。通常我们会使用??pivot by???语句配合??exec???或者??panel??函数将竖表转化为宽表(矩阵),因为这个操作会将矩阵的行与列按递增方式排列,方便我们设置索引矩阵以及后期的计算。
首先我们新建一个矩阵,并将其设为IndexedMatrix:
m=matrix(1..4 join 6, 11..13 join 8..9)
m.rename!(2020.01.01..2020.01.04 join 2020.01.06,`A`B)
m.setIndexedMatrix!();

面板数据的滑动窗口大小支持两种度量方式:记录数和时间。
3.1.1 步长为1行,窗口为n行
m系列函数的参数可以是一个正整数(记录数维度)或一个 duration(时间维度)。通过设定不同的参数,可以指定理想的滑动窗口类型。
以??msum??滑动求和为例。以下例子是对一个矩阵内部,每一列做窗口大小为3行的滑动求和。
msum(m,3,1)

# output

AB
-- --
2020.01.01|111
2020.01.02|323
2020.01.03|636
2020.01.04|933
2020.01.06|13 30

矩阵运算中,也可以做复杂的窗口嵌套。曾在2.1.2.1节中提到的98号因子也可以在矩阵中通过几行代码实现(trade表有需要可以自行模拟数据,或用sample数据??CNTRADE??):
// 输入表trade的schema如下,如需要可自行模拟数据:

nametypeString typeInt
---------- ---------- -------
ts_codeSYMBOL17
trade_date DATE6
openDOUBLE16
volDOUBLE16
amountDOUBLE16

// alpha 98 的矩阵计算

def prepareDataForDDBPanel()
t = select trade_date,ts_code,amount*1000/(vol*100 + 1) as vwap,vol,open from trade
return dict(`vwap`open`vol, panel(t.trade_date, t.ts_code, [t.vwap, t.open, t.vol]))


def myrank(x)
return rowRank(x)\\x.columns()


def alpha98Panel(vwap, open, vol)
return myrank(mavg(mcorr(vwap, msum(mavg(vol, 5), 26), 5), 1..7)) - myrank(mavg(mrank(9 - mimin(mcorr(myrank(open), myrank(mavg(vol, 15)), 21), 9), true, 7), 1..8))


input = prepareDataForDDBPanel()
alpha98DDBPanel = alpha98Panel(input.vwap, input.open, input.vol)

3.1.2 步长为1行,窗口为指定时间
以??msum??滑动求和为例。以下例子是对一个矩阵内部,每一列根据左边的时间列做窗口大小为3天的滑动求和。
msum(m,3d)

# output

AB
-- --
2020.01.01|111
2020.01.02|323
2020.01.03|636
2020.01.04|933
2020.01.06|10 17

在实际运用中,这类矩阵窗口运算是非常常见的。比如在做历史分位的计算中,将数据转化为IndexedMatrix之后,直接用一行代码就可以得到结果了。
下面例子是对m矩阵做10年的历史分位计算:
//推荐使用1.30.14, 2.00.2及以上版本来使用interval函数。
mrank(m, true, 10y, percent=true)

# output
A B
- ----
2020.01.01|1 1
2020.01.02|1 1
2020.01.03|1 1
2020.01.04|1 0.25
2020.01.06|1 0.4

3.2 面板数据的累计窗口计算
在面板数据中,累计函数??cum??系列也可以直接使用。 以??cumsum??为例:
cumsum(m)

# output

AB
-- --
2020.01.01|111
2020.01.02|323
2020.01.03|636
2020.01.04|10 44
2020.01.06|16 53

4.流式数据的窗口计算在DolphindDB中,设计了许多内置的流计算引擎。各类引擎都有不同的用法,有些支持聚合计算,有些则支持滑动窗口或者累计窗口计算,在此基础上也有针对于流数据的会话窗口引擎。可以满足不同的场景需求。下面根据不同窗口以及引擎分别介绍。
4.1 滚动窗口在流计算中的应用
实际场景中,滚动窗口计算在流数据中应用得最为广泛。比如5分钟k线,1分钟累计交易量等等的应用,都需要用到滚动窗口计算。滚动窗口在流计算中的应用是通过各种时间序列引擎实现的。
??createTimeSeriesEngine??时间序列引擎应用的很广泛,与他类似的引擎还有??createDailyTimeSeriesEngine??与??createSessionWindowEngine??。??createDailyTimeSeriesEngine??与??dailyAlignedBar??类似,可以指定自然日之内的时间段进行窗口计算,而非按照流入数据的时间窗口聚合计算。??createSessionWindowEngine??会在4.3中详细介绍。 本节以??createTimeSeriesEngine??为例。下例中,时间序列引擎timeSeries1订阅流数据表trades,实时计算表trades中过去1分钟内每只股票交易量之和。
share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
output1 = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
timeSeries1 = createTimeSeriesEngine(name="timeSeries1", windowSize=60000, step=60000, metrics=< [sum(volume)]> , dummyTable=trades, outputTable=output1, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50, useWindowStartTime=false)
subscribeTable(tableName="trades", actionName="timeSeries1", offset=0, handler=append!timeSeries1, msgAsTable=true);

insert into trades values(2018.10.08T01:01:01.785,`A,10)
insert into trades values(2018.10.08T01:01:02.125,`B,26)
insert into trades values(2018.10.08T01:01:10.263,`B,14)
insert into trades values(2018.10.08T01:01:12.457,`A,28)
insert into trades values(2018.10.08T01:02:10.789,`A,15)
insert into trades values(2018.10.08T01:02:12.005,`B,9)
insert into trades values(2018.10.08T01:02:30.021,`A,10)
insert into trades values(2018.10.08T01:04:02.236,`A,29)
insert into trades values(2018.10.08T01:04:04.412,`B,32)
insert into trades values(2018.10.08T01:04:05.152,`B,23)

sleep(10)

select * from output1;

# output

timesym sumVolume
----------------------- --- ---------
2018.10.08T01:02:00.000 A38
2018.10.08T01:02:00.000 B40
2018.10.08T01:03:00.000 A25
2018.10.08T01:03:00.000 B9


//to drop the time series engine
dropStreamEngine(`timeSeries1)
unsubscribeTable(tableName="trades", actionName="timeSeries1")
undef("trades",SHARED)

4.2 滑动、累计窗口在流计算中的应用
另一个常用的引擎是响应式状态引擎??createReactiveStateEngine??。在这个引擎中,我们可以使用经过优化的状态函数,其中包括累计窗口函数(cum系列函数)和滑动窗口函数(m系列函数以及tm系列函数)。
??createReactiveStateEngine??响应式状态引擎的功能非常强大,可以让流数据像SQL一样处理,实现批流一体。下面的例子同时展示了cum系列函数,m系列函数和tm系列函数在??createReactiveStateEngine??响应式状态引擎中的作用。
//1.30.14,2.00.2以上版本支持tmsum函数。
share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
output2 = table(10000:0, `sym`time`Volume`msumVolume`cumsumVolume`tmsumVolume, [ SYMBOL,TIMESTAMP,INT, INT,INT,INT])
reactiveState1= createReactiveStateEngine(name="reactiveState1", metrics=[< time> ,< Volume> ,< msum(volume,2,1)> ,< cumsum(volume)> ,< tmsum(time,volume,2m)> ], dummyTable=trades, outputTable=output2, keyColumn="sym")
subscribeTable(tableName="trades", actionName="reactiveState1", offset=0, handler=append!reactiveState1, msgAsTable=true);

insert into trades values(2018.10.08T01:01:01.785,`A,10)
insert into trades values(2018.10.08T01:01:02.125,`B,26)
insert into trades values(2018.10.08T01:01:10.263,`B,14)
insert into trades values(2018.10.08T01:01:12.457,`A,28)
insert into trades values(2018.10.08T01:02:10.789,`A,15)
insert into trades values(2018.10.08T01:02:12.005,`B,9)
insert into trades values(2018.10.08T01:02:30.021,`A,10)
insert into trades values(2018.10.08T01:04:02.236,`A,29)
insert into trades values(2018.10.08T01:04:04.412,`B,32)
insert into trades values(2018.10.08T01:04:05.152,`B,23)

sleep(10)

select * from output2

# output

sym timeVolume msumVolume cumsumVolume tmsumVolume
--- ----------------------- ------ ---------- ------------ -----------
A2018.10.08T01:01:01.785 10101010
B2018.10.08T01:01:02.125 26262626
A2018.10.08T01:01:12.457 28383838
B2018.10.08T01:01:10.263 14404040
A2018.10.08T01:02:10.789 15435353
B2018.10.08T01:02:12.005 9234949
A2018.10.08T01:02:30.021 10256363
A2018.10.08T01:04:02.236 29399254
B2018.10.08T01:04:04.412 32418141
B2018.10.08T01:04:05.152 235510464

//to drop the reactive state engine

dropAggregator(`reactiveState1)
unsubscribeTable(tableName="trades", actionName="reactiveState1")
undef("trades",SHARED)

4.3 会话窗口引擎
??createSessionWindowEngine??可以根据间隔时间(session gap)切分不同的窗口,即当一个窗口在大于session gap的时间内没有接收到新数据时i,窗口会关闭。所以这个引擎中的window size是会根据流入数据的情况发生变化的。
具体可以看以下例子:
share streamTable(1000:0, `time`volume, [TIMESTAMP, INT]) as trades
output1 = keyedTable(`time,10000:0, `time`sumVolume, [TIMESTAMP, INT])
engine_sw = createSessionWindowEngine(name = "engine_sw", sessionGap = 5, metrics = < sum(volume)> , dummyTable = trades, outputTable = output1, timeColumn = `time)
subscribeTable(tableName="trades", actionName="append_engine_sw", offset=0, handler=append!engine_sw, msgAsTable=true)

n = 5
timev = 2018.10.12T10:01:00.000 + (1..n)
volumev = (1..n)%1000
insert into trades values(timev, volumev)

n = 5
timev = 2018.10.12T10:01:00.010 + (1..n)
volumev = (1..n)%1000
insert into trades values(timev, volumev)

n = 3
timev = 2018.10.12T10:01:00.020 + (1..n)
volumev = (1..n)%1000
timev.append!(2018.10.12T10:01:00.027 + (1..n))
volumev.append!((1..n)%1000)
insert into trades values(timev, volumev)

select * from trades;

//传入数据如下:

timevolume
----------------------- ------
2018.10.12T10:01:00.001 1
2018.10.12T10:01:00.002 2
2018.10.12T10:01:00.003 3
2018.10.12T10:01:00.004 4
2018.10.12T10:01:00.005 5
2018.10.12T10:01:00.011 1
2018.10.12T10:01:00.012 2
2018.10.12T10:01:00.013 3
2018.10.12T10:01:00.014 4
2018.10.12T10:01:00.015 5
2018.10.12T10:01:00.021 1
2018.10.12T10:01:00.022 2
2018.10.12T10:01:00.023 3
2018.10.12T10:01:00.028 1
2018.10.12T10:01:00.029 2
2018.10.12T10:01:00.030 3


//经过createSessionWindowEngine会话窗口引擎后,根据session gap=5(ms)聚合形成的窗口计算结果为:
select * from output1

timesumVolume
----------------------- ---------
2018.10.12T10:01:00.001 15
2018.10.12T10:01:00.011 15
2018.10.12T10:01:00.021 6

// to drop SessionWindowEngine

unsubscribeTable(tableName="trades", actionName="append_engine_sw")
dropAggregator(`engine_sw)
undef("trades",SHARED)

5.窗口计算的空值处理规则在DolphinDB中,各个窗口函数的空值处理略有不同,此处分别讲述一下各个系列函数空值处理的规则:
5.1 moving,m系列函数,tm系列函数以及cum系列函数的空值处理
对于除了rank的m系列,tm系列以及cum系列窗口内的NULL值,与其聚合函数处理NULL值的规则一致,计算时忽略NULL值。 在??mrank??,??tmrank??以及??cumrank??函数中,可以指定NULL是否参与计算。
大部分moving以及m系列函数参数里都有一个可选参数 minPeriods。若没有指定 minPeriods,结果的前(window - 1)个元素为NULL;若指定了 minPeriods,结果的前( minPeriods - 1)个元素为NULL。如果窗口中的值全为NULL,该窗口的计算结果为NULL。minPeriods的默认值为window之值。
一个简单的例子:
m=matrix(1..5, 6 7 8 NULL 10)

//不指定minPeriods时,由于minPeriods默认值与window相等,所以结果的前二行均为NULL。

msum(m,3)

#0 #1
-- --


621
915
12 18

//若指定minPeriods=1,结果的前二行不是NULL值。

msum(m,3,1)

#0 #1
-- --
16
313
621
915
12 18

5.2 rolling的空值处理
与??moving??函数不同的是,??rolling??函数不输出前(window - 1)个元素的NULL值结果。可以通过下面的例子来感受:
t是一个包含NULL值的表,我们分别用??rolling??和??moving??对vol这一列做窗口为3行的窗口求和计算。
vol=1 2 3 4 NULL NULL NULL 6 7 8
t= table(vol)

//rolling做窗口为3行的滑动求和计算
rolling(sum,t.vol,3)

# output
[6,9,7,4,,6,13,21]

//moving做窗口为3行的滑动求和计算
moving(sum,t.vol,3)

# output
[,,6,9,7,4,,6,13,21]

//rolling做窗口为3行,步长为2行的窗口计算
rolling(sum,t.vol,3,2)

# output
[6,7,,13]///最后的窗口没有足够的元素时,不会输出

6. 常用指标的计算复杂度常用的m系列,tm系列函数都经过了优化,其时间复杂度为O(n),即每一次计算结果只会把位置0去掉,加入新的观察值。 而mrank与其他函数稍许不同,计算速度会比其他的慢,原因是其时间复杂度为O(mn),与其窗口大小有关,窗口越大,复杂度越高。即每一次都会将结果重置。
moving,tmoving,rolling这些高阶函数的复杂度与其参数内的func有关,是没有做过优化的。所以每一次滑动都是整个窗口对于func函数进行计算,而非m系列,tm系列函数的增量计算。
故相比于moving, tmoving, rolling, m系列和tm系列函数对于相同的计算功能会有更好的性能。
一个简单的例子:
n=1000000
x=norm(0,1, n);

//moving
timer moving(avg, x, 10);
Time elapsed:243.331 ms

//rolling
timer moving(avg, x, 10);
Time elapsed: 599.389ms

//mavg
timer mavg(x, 10);
Time elapsed: 3.501ms

7. 涉及到窗口计算的函数
聚合函数
m系列
ReactiveStateEngine 是否支持
tm系列

    推荐阅读