本文作者: CDY,观远算法团队工程师,毕业于伦敦帝国理工学院计算机系,主要研究方向为时间序列算法及其落地应用,热衷于尝试各类新奇工具库,深耕于零售消费品场景,解决供应链的优化问题,为客户提供基于机器学习的AI解决方案。概述 Kats是Facebook发布的一个专门为了时间序列服务的工具库。它作为一个Toolkit包,提供了四种简易且轻量化的API。
- 预测(封装了10+models,主要是传统的时间序列模型,这些模型有支持ensemble的API,结合时间序列特征的功能甚至可以做meta-learning)
- 检测(官方叫做detection,大致是对时间序列做类似于异常检测,change point detection之类的检测)
- 时间序列特征(API十分简单,可以得到65个时间序列相关的features)
- 模拟(simulator,可以按照某些时序特征比如seasonality去创造时间序列来方便实验)
下面的内容可能会比较细致,如果想直接看总结的优缺点建议拉到最后。
Basic Representation 首先这类库都会封装一个数据类型,用来作为它的API的调用输入。类似的库有AutoGluon的TabularDataset,fastai里的TabularDataLoaders。
- 在Kats里面,所有的pandas dataframe,series甚至numpy都可以先转成一个叫做TimeSeriesData的东西
- 由于传统时间序列并不是很关心表格类数据其他的特征,而更专注于目标列,所以其实TimeSeriesData只需要两样东西,时间和目标列
- 大致的调用方式如下,实验的dataset 是我司(Guandata)自己开发的一个机器学习工具库里的data,是一个销量预测的数据,包含的列是store,sku,date和sales。
from gaia.datasets.toy_datasets import load_interview_dataset
from kats.consts import TimeSeriesData
interview_df = load_interview_dataset()
# 首先简单的把某一个store,sku的数据拿出来做成时间序列
single_ts_df = interview_df[(interview_df['store']==0)&(interview_df['sku']==114)]
# 第一种变成Kats.TimeSeriesData的方法
# 作为时间序列只关心目标列和时间, TimeSeriesData(df), 其中df只包括两列,一列叫做time,一列叫做value
single_ts_df = single_ts_df[['date', 'sales']].rename(columns={'date':'time', 'sales':'value'})
single_kats_ts = TimeSeriesData(single_ts_df)
# 第二种是显式的传入对应列(传入的可以是dataframe,series或者numpy array)
single_kats_ts2 = TimeSeriesData(time = single_ts_df.time, value=https://www.it610.com/article/single_ts_df.value)
那么目前single_kats_ts就是一个kats.consts.TimeSeriesData的datatype的东西了,这个数据类型的大部分操作以及print之后的输出都是和pandas.DataFrame一模一样,它还比普通的pandas DataFrame多支持了一些操作,最有用也是用的最多的就是画图:
import matplotlib.pyplot as plt
%matplotlib inline
single_kats_ts.plot(cols=['value'])
plt.show()
BTW, kats当然支持multi-variate timeseries,方式就是在初始化的时候value传入一个dataframe,然后里面的列都成为variate value了。因为篇幅原因不再介绍,没有什么更特殊操作。
Simulator 在我们实际使用过程中,我们可能面对的更多的是一些表格类的数据,但在很多传统的时间序列任务中,会更关心目标列,并把它作为一条时间序列,所以我们首先先模拟出N条univariate的时间序列,模拟真实销量预测场景中by store by sku的每一条销量预测曲线。
from kats.utils.simulator import Simulator
# simulate 90 days of data
sim = Simulator(n=90, freq="D", start = "2021-01-01")
# 目前 simulator支持合成一些时间序列
# 合成的方法包括根据STL分解,或者Arima的系数产生ts,并且加入一些trend, noise等
# 简单介绍一下arima_sim是一个用ARIMA model来模拟ts的函数,参数包括ar的参数,ma的参数,d代表非平稳时间序列的单位根(unit root)数。
arima_sim_list = [sim.arima_sim(ar=[0.1, 0.05], ma = [0.04, 0.1], d = 1) for _ in range(10)]
# 通过simulator,我们还能在之前的序列基础上增加很多drift,trend,seasonality等
# 下面就是调用方式,强行在里面加一些changepoint
# generate 10 TimeSeriesData with trend shifts
trend_sim_list = [
sim.trend_shift_sim(
cp_arr = [30, 60, 75],
trend_arr=[3, 15, 2, 8],
intercept=30,
noise=50,
seasonal_period=7,
seasonal_magnitude=np.random.uniform(10, 100),
random_seed=random_seed
) for _ in range(10)
]
# generate 10 TimeSeriesData with level shifts
level_shift_list = [
sim.level_shift_sim(
cp_arr = [30, 60, 75],
level_arr=[1.35, 1.05, 1.35, 1.2],
noise=0.05,
seasonal_period=7,
seasonal_magnitude=np.random.uniform(0.1, 1.0),
random_seed=random_seed
) for _ in range(10)
]
【深度详解Facebook时序工具库Kats】通过源码可以看到,在arima_sim的时候,它模拟了arima的输出作为一个array,然后调用之前TimeSeriesData进行输出。
文章图片
类似地,Simulator还支持很多add_noise, add_trend, add_seasonality之类的函数,其实就是还在trend_shift_sim之类的函数里调用到了。
TsFeatures 这是我觉得这个库最爽的功能,靠这个功能我们可以做很多的事情。
众所周知,目前表格类数据的时序预测场景树模型是比较NB的,DL和传统时序分析似乎差点意思,我个人认为主要是在实际场景下,大部分数据和特征工程适应性更加匹配树模型。比如真实数据的seasonality可能不明显,data drift大,工业对训练模型的速度有一定消耗要求等。
但回头想,也有一部分数据,可能就用简单的arima训练就能达到不错的效果,或者在开始你需要一个不错的baseline作为基础,转发一篇大佬的文章,用这些时序的特征进行分类,观察,分析,是可以选取到不同的模型进行尝试的。那这时候,对tsFeatures的提取就异常关键了。
之前也接触过一些快速且自动提取time-series related features的库,比如tsfresh,但是在我有限的tsfresh使用尝试中,我的个人体验十分的差,总结为:
- 内存消耗大,速度慢
- 提取到大量特征(4000+)个特征,而且命名极其复杂,特征名很长。
在Kats这个库中,提取就是效率很高。
import warnings
warnings.filterwarnings('ignore')
model = TsFeatures()
# 还是之前的那一单条数据
ts_feats = model.transform(single_kats_ts)```
得到的ts_feats是一个包括了40个特征的dict:
文章图片
通过这个features的提取会发现,它其实有很多种不同类型的特征分组,比如length, mean, var是统计值特征, 大量的acf,pacf等自相关性特征,entropy, stability等稳定性特征,我们通过源码可以看到,其实Kats内部对特征的确进行了分组。
文章图片
并且根据分组,你还可以通过参数去控制你要选择提取什么类的特征,或者不提取什么类型的特征。同理还有很多参数去控制提取特征时候的window_size, lag_size, stl_period等,这些参数大部分要看源码注释才能发现,目前还没有官方API。
model = TsFeatures(selected_features=['stl_features'])
model.transform(single_kats_ts)
# 如果要反向选择,则是将这个组或者一个特征的value设为False
model = TsFeatures(stl_features = False)
model.transform(single_kats_ts)
那么这些特征对于我们分析时间序列有什么作用呢,比如我们可以根据这些统计值,对时间序列进行分类,对于不同的分类做不同的模型,下面是一个例子:
fcst_ts_list = []
fcst_name_list = []
for name, group in interview_df.groupby(['store', 'sku']):
fcst_df = group[['date', 'sales']].rename(columns={'date':'time', 'sales':'value'})
fcst_ts_df = TimeSeriesData(fcst_df)
fcst_ts_list.append(fcst_ts_df)
fcst_name_list.append(name)
model = TsFeatures(selected_features=['stl_features','statistics'])
# 生成对应每个pair的特征
output_features = [model.transform(ts) for ts in fcst_ts_list]
# 将生成的特征和store sku联系回去
df_features = pd.DataFrame(output_features)
fcst_name_df = pd.DataFrame(fcst_name_list)
fcst_name_df.columns = ['store', 'sku']
df_features = pd.concat([fcst_name_df, df_features], axis=1)```
文章图片
得到了每条时间序列的时序相关特征之后,我们就可以对它进行聚类操作:
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
X_feats = StandardScaler().fit_transform(df_features.values)
# 为了在一个坐标系里面画出这些时间序列降维后的结果,暂且把它降到2d
pca_res = PCA(n_components=2).fit_transform(X=X_feats)
df_features['pca_component_1'] = pca_res[:,0]
df_features['pca_component_2'] = pca_res[:,1]
# 画图
df_features.plot(x='pca_component_1', y='pca_component_2', kind='scatter')
# 聚类
kmeans = KMeans(n_clusters=3).fit(df_features[['pca_component_1', 'pca_component_2']])
# 看看每个类的value_counts
print(np.unique(kmeans.labels_, return_counts=True))```
Detection 之前的两大Features,Simulator是为了更好的创造模拟时间序列,但是在实际工程化项目中实用性比较低, tsFeature这块功能全面,但是很多项目可能不会这么细节的去分析每一条序列的特征。
而Detection这一块是Kats这个库花了最大量的代码量的一个功能点,它的主要作用是在异常值的检测,恢复,大幅度的changepoint的检测等。在之前如果在时序预测的场景中遇到要进行异常值检测的话,我一般都会用Pyod。现在在看完这个库之后,对于time-series specific的场景,可能就会多一种选择了。
文章图片
我们先从一个比较高的维度看一下Kats Detection实现了什么功能
从源码来看kats.detectors.detector里面实现了一个Detector抽象类:
文章图片
大量的具体实现的检测器都继承了这个类,通过方法和对应签名还是注释看应该就是detector()去检测对应你想要检测的异常点,然后remover()方法进行去除异常点操作,或者选择interpolate=True应该可以有一些插值操作。
从具体检测器的分类上来看,检测器分为两大类,一类是专门用来检测离群点异常点,一类是专门用来检测突变点(changepoint)
Outlier Detection Outlier Detection的用法是相对简单的,它的原理实际上就是调用了statsmodels.tsa.seasonal.seasonal_decompose对时间序列进行分解,如果剩下的序列seasonality或者trend很明显的话,再去除seasonality和trend,在剩下相对稳定,没有季节性,趋势性影响的序列中,如果还是一个超过3倍四分位距(inter quartile range)的点则为离群点。
# 该outlier detector只能处理univariate time series, decomp参数是指它被进行seasonal_decompose的方法, iqr_mult是大于几倍的iqr被认为是离群点
ts_outlierDetection = OutlierDetector(single_kats_ts, decomp = 'additive', iqr_mult= 3)
ts_outlierDetection.detector() # apply OutlierDetector
# 查看对应所有outliers 的time_index
print(ts_outlierDetection.outliers[0])
# 之前在基抽象类中看到的remover,如果选择interpolate的话,会是一个线性插值
ts_outliers_removed = ts_outlierDetection.remover(interpolate = False) # No interpolation
ts_outliers_interpolated = ts_outlierDetection.remover(interpolate = True) # With interpolation```
文章图片
类似的outlier detector应该还有:
# 暂时还没用过,用完之后如果觉得效果好补充相关代码
from kats.detectors.prophet_detector import ProphetDetectorModel
文章图片
可以从源码中看到ProphetDetectorModel继承的基类是DetectorModel,实现的方法是fit_predict,比如对于ProphetDetectorModel,需要输入的是historical_data和未来希望找anomaly point的data
self, data: TimeSeriesData, historical_data: Optional[TimeSeriesData] = None
) -> AnomalyResponse:
"""Trains a model, and returns the anomaly scores.Returns the AnomalyResponse, when data is passed to it.Args:
data: TimeSeriesData on which detection is run.
historical_data: TimeSeriesData corresponding to history. History ends exactly where
the data begins.Returns:
AnomalyResponse object. The length of this object is same as data. The score property
gives the score for anomaly.
"""# train on historical, then predict on all data.```
changepoint detection 所谓Changepoint Detection,是有定义的,Wikipedia:In statistical analysis, change detection or change point detection tries to identify times when the probability distribution of a stochastic process or time series changes,最简单的就可能是改变了整个时间序列的平均值。
文章图片
那么Kats这个库在这一块加入了很多模型,看了一些简单的介绍一下。
CusumDetector Cusum的全程是cumulative sum,是一种在信号处理领域检测信号突变的一种算法。
在Kats的实现里,它做了两件事情:
1、根据cusum的算法,找出一个changepoint,这个changepoint的满足,在它之前和之后的时间的累计和突变最大。这是一个迭代的寻找,初始化为整个序列最中心的点,然后iteratively开始移动寻找。
2、对这个点进行hypothesis test,这里的null hypothesis是这个找到的changepoint没有平均值上的突变,如果它reject null hypothesis 则视为通过假设判定。
值得注意的是,这个算法只能找出一个changpoint。
>>> # Univariate CUSUM
>>> timeseries = TimeSeriesData(...)>>> detector = CusumDetector(timeseries)
>>> #Run detector
>>> changepoints = detector.detector()
>>> # Plot the results
>>> detector.plot(changepoints)
文章图片
之前说了这个Detector只能找出一个changpoint,那如果一个时间序列里面有超过一个changepoint,或者你拿到一个真实销量曲线,发现2020年上半年都是疫情期间,销量很多异常,其实你在寻找突变点的时候并不想考虑那一段疫情的异常销量怎么办呢,这时候可以使用interest_window参数:
文章图片
但是,如果你将这个参数选取到[40,60],你将看到:
文章图片
这为什么没有找到changepoint呢,背后的原因还是和这个算法实现本身有关系的,在加入interest window之后,算法首先会在你的window内寻找changepoint,还记得之前的算法有两步,首先找到changepoint,然后通过hypothesis test,也就是reject null hypothesis。然后在window里面就有一个changepoint的结果了,接下来你还是要将这个结果在整个时间序列的数据里一起做一个hypothesis test,如果这个点在整个时间序列上都通过了hypothesis test, 才能视为changpoint,这也就是为什么在40-60的window上没有找到突变点。
使用interest point这个参数,我们拆分时间序列去找到多个changepoint。
文章图片
Forecasting 这一块是我认为比较鸡肋的一块,是对各种ts传统预测模型的封装(Prophet, Arima, LSTM, etc),但是可惜的是依然没有自适配多条时间序列并行预测的API,如果通过python multiprocessing去做多线程速度依然很慢(lgb 3分钟能跑完的数据跑了大概50mins),后续可能会尝试另一个似乎很厉害的Ray去试一试。\
总结 综合来看,总结一下Kats这个库的优缺点:
优点
- Easy to use
- 功能全面
缺点
- 只是封装了一堆模型并且加入了ensemble的API,感觉在运行速度上和单独调用Prophet/Arima并没有什么提升,也没有支持多条时间序列的并行。
- 目前还缺乏比较完备的官方API文档