Can I parallelize `numpy.bincount` using `xarray.apply_ufunc`?

I want to parallelize the `numpy.bincount` function using xarray's `apply_ufunc` API. This is the code I have tried:

```python
import numpy as np
import xarray as xr

da = xr.DataArray(np.random.rand(2, 16, 32),
                  dims=['time', 'y', 'x'],
                  coords={'time': np.array(['2019-04-18', '2019-04-19'], dtype='datetime64'),
                          'y': np.arange(16), 'x': np.arange(32)})

f = xr.DataArray(da.data.reshape((2, 512)), dims=['time', 'idx'])
x = da.x.values
y = da.y.values
r = np.sqrt(x[np.newaxis, :]**2 + y[:, np.newaxis]**2)
nbins = 4
if x.max() > y.max():
    ri = np.linspace(0., y.max(), nbins)
else:
    ri = np.linspace(0., x.max(), nbins)

ridx = np.digitize(np.ravel(r), ri)

func = lambda a, b: np.bincount(a, weights=b)
xr.apply_ufunc(func, xr.DataArray(ridx, dims=['idx']), f)
```

But I get the following error:
```
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-203-974a8f0a89e8> in <module>()
     12 
     13 func = lambda a, b: np.bincount(a, weights=b)
---> 14 xr.apply_ufunc(func, xr.DataArray(ridx,dims=['idx']), f)

~/anaconda/envs/uptodate/lib/python3.6/site-packages/xarray/core/computation.py in apply_ufunc(func, *args, **kwargs)
    979                                      signature=signature,
    980                                      join=join,
--> 981                                      exclude_dims=exclude_dims)
    982     elif any(isinstance(a, Variable) for a in args):
    983         return variables_ufunc(*args)

~/anaconda/envs/uptodate/lib/python3.6/site-packages/xarray/core/computation.py in apply_dataarray_ufunc(func, *args, **kwargs)
    208 
    209     data_vars = [getattr(a, 'variable', a) for a in args]
--> 210     result_var = func(*data_vars)
    211 
    212     if signature.num_outputs > 1:

~/anaconda/envs/uptodate/lib/python3.6/site-packages/xarray/core/computation.py in apply_variable_ufunc(func, *args, **kwargs)
    558             raise ValueError('unknown setting for dask array handling in '
    559                              'apply_ufunc: {}'.format(dask))
--> 560     result_data = func(*input_data)
    561 
    562     if signature.num_outputs == 1:

<ipython-input-203-974a8f0a89e8> in <lambda>(a, b)
     11 ridx = np.digitize(np.ravel(r), ri)
     12 
---> 13 func = lambda a, b: np.bincount(a, weights=b)
     14 xr.apply_ufunc(func, xr.DataArray(ridx,dims=['idx']), f)

ValueError: object too deep for desired array
```

I'm a bit lost as to where the error is coming from. Any help would be much appreciated...
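Answer

The immediate error comes from `np.bincount` itself: it only accepts 1-D arrays, but `apply_ufunc` called without `input_core_dims` hands `func` the whole arrays, and `f` is 2-D (`time`, `idx`); hence "object too deep for desired array". The usual way to apply a 1-D function along one axis is `np.apply_along_axis`, but the catch is that `apply_along_axis` iterates over 1-D slices of only the first argument to the applied function, not any of the others. If I understand your use case correctly, you actually want to iterate over 1-D slices of the weights (`weights` in the `np.bincount` signature), not of the integer array (`x` in the `np.bincount` signature).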
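Both points can be checked in plain NumPy. The sketch below uses tiny made-up arrays in place of `ridx` and `f`; the helper names `bincount_rejects_2d` and `arg_ndims` are hypothetical and exist only for this illustration. It shows that `np.bincount` rejects 2-D weights, and that `np.apply_along_axis` slices only its array argument while forwarding extra arguments unchanged.

```python
import numpy as np

# Tiny stand-ins for the question's arrays (hypothetical values):
idx = np.array([0, 1, 1, 2])                    # shared 1-D bin indices, like ridx
weights = np.array([[1.0, 2.0, 3.0, 4.0],       # 2-D weights (time, idx), like f
                    [10.0, 20.0, 30.0, 40.0]])


def bincount_rejects_2d():
    """Return True if np.bincount raises ValueError for 2-D weights."""
    try:
        np.bincount(idx, weights=weights)
    except ValueError:
        return True
    return False


def arg_ndims(first, second):
    """Report the dimensionality of each argument as func1d receives it."""
    return np.array([first.ndim, np.ndim(second)])


# apply_along_axis slices only its array argument; extra args pass through whole.
# With idx as the sliced array, the 2-D weights reach func1d unchanged:
ndims_idx_first = np.apply_along_axis(arg_ndims, 0, idx, weights)
# With weights as the sliced array, every call sees one 1-D row of weights:
ndims_weights_first = np.apply_along_axis(arg_ndims, 1, weights, idx)

print(bincount_rejects_2d())           # True
print(ndims_idx_first)                 # [1 2]
print(ndims_weights_first.tolist())    # [[1, 1], [1, 1]]
```

So putting the weights in the sliced position is what makes every `np.bincount` call receive 1-D inputs.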
One way to work around this is to write a thin wrapper function around `np.bincount` that simply switches the order of the arguments:
```python
def wrapped_bincount(weights, x):
    # weights comes first, so np.apply_along_axis slices it
    return np.bincount(x, weights=weights)
```

Then we can use `np.apply_along_axis` with this function for your use case:
```python
def apply_bincount_along_axis(x, weights, axis=-1):
    # Slice weights along `axis`; the shared index array x is passed through whole
    return np.apply_along_axis(wrapped_bincount, axis, weights, x)
```
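A quick NumPy-only check of the two helpers, using a small made-up index array and weights (hypothetical values, not the question's data). Each row of the output should be the weighted bincount of the matching row of `weights`, which is the same as looping over the leading axis by hand.

```python
import numpy as np


def wrapped_bincount(weights, x):
    # Swap the argument order so that weights is the sliced argument.
    return np.bincount(x, weights=weights)


def apply_bincount_along_axis(x, weights, axis=-1):
    # Weighted bincount of every 1-D slice of `weights` along `axis`,
    # all sharing the same integer bin indices `x`.
    return np.apply_along_axis(wrapped_bincount, axis, weights, x)


x = np.array([0, 1, 1, 2])
weights = np.array([[1.0, 2.0, 3.0, 4.0],
                    [10.0, 20.0, 30.0, 40.0]])

result = apply_bincount_along_axis(x, weights)
# row 0: bin 0 -> 1, bin 1 -> 2 + 3 = 5, bin 2 -> 4
# row 1: bin 0 -> 10, bin 1 -> 20 + 30 = 50, bin 2 -> 40
print(result.tolist())  # [[1.0, 5.0, 4.0], [10.0, 50.0, 40.0]]

# Same answer as an explicit Python loop over the leading axis.
loop = np.stack([np.bincount(x, weights=row) for row in weights])
print(np.allclose(result, loop))  # True
```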

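Finally, we can wrap this new function for use with xarray via `apply_ufunc`; note that it can then be parallelized automatically with dask. Also note that we don't need to pass the `axis` argument, because xarray automatically moves the input core dimension `dim` to the last position of the `weights` array before applying the function: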
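```python
def xbincount(x, weights):
    if len(x.dims) != 1:
        raise ValueError('x must be one-dimensional')
    dim, = x.dims
    # Convert to a plain int rather than passing a 0-d DataArray
    nbins = int(x.max()) + 1
    return xr.apply_ufunc(apply_bincount_along_axis, x, weights,
                          input_core_dims=[[dim], [dim]],
                          output_core_dims=[['bin']],
                          dask='parallelized',
                          # np.float was removed in NumPy 1.24; use float
                          output_dtypes=[float],
                          # newer xarray versions prefer passing this via
                          # dask_gufunc_kwargs={'output_sizes': ...}
                          output_sizes={'bin': nbins})
```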
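To see what the core-dimension reordering buys us, the NumPy-only sketch below does it by hand. It assumes, hypothetically, that the weights were stored as (`idx`, `time`) rather than (`time`, `idx`); moving `idx` to the last axis with `np.moveaxis` lets `apply_bincount_along_axis` run with its default `axis=-1`. This only illustrates the effect; it is not xarray's internal code.

```python
import numpy as np


def wrapped_bincount(weights, x):
    return np.bincount(x, weights=weights)


def apply_bincount_along_axis(x, weights, axis=-1):
    return np.apply_along_axis(wrapped_bincount, axis, weights, x)


x = np.array([0, 1, 1, 2])
# Hypothetical weights with the core dimension "idx" FIRST: shape (idx=4, time=2).
weights_idx_first = np.array([[1.0, 10.0],
                              [2.0, 20.0],
                              [3.0, 30.0],
                              [4.0, 40.0]])

# Move "idx" to the last position, mimicking what apply_ufunc does for
# input_core_dims, so the default axis=-1 lines up with the core dimension.
weights_core_last = np.moveaxis(weights_idx_first, 0, -1)  # shape (time=2, idx=4)
binned = apply_bincount_along_axis(x, weights_core_last)   # shape (time=2, bin=3)

# Without the move we would need axis=0; apply_along_axis then puts the
# new "bin" axis where axis 0 was, so the result is the transpose.
explicit = apply_bincount_along_axis(x, weights_idx_first, axis=0)  # shape (3, 2)

print(binned.tolist())                  # [[1.0, 5.0, 4.0], [10.0, 50.0, 40.0]]
print(np.allclose(explicit, binned.T))  # True
```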

Applying this function to your example then looks like this:
```
>>> xbincount(xr.DataArray(ridx, dims=['idx']), f)
<xarray.DataArray (time: 2, bin: 5)>
array([[  0.      ,   7.934821,  34.066872,  51.118065, 152.769169],
       [  0.      ,  11.692989,  33.262936,  44.993856, 157.642972]])
Dimensions without coordinates: time, bin
```
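The output's five bins and its all-zero first column follow from the bin edges, not from the random data. The NumPy-only check below re-derives `ridx` from the question's 16 x 32 grid (the `if`/`else` that picks the smaller of `x.max()` and `y.max()` is condensed into `min` here). Since every radius is at least `ri[0] = 0`, `np.digitize` never returns 0, so bin 0 is always empty, and `ridx.max() + 1` gives 5 bins.

```python
import numpy as np

# Same geometry as in the question: a 16 x 32 grid and 4 bin edges.
x = np.arange(32)
y = np.arange(16)
r = np.sqrt(x[np.newaxis, :]**2 + y[:, np.newaxis]**2)
nbins = 4
ri = np.linspace(0., min(x.max(), y.max()), nbins)  # edges [0, 5, 10, 15]
ridx = np.digitize(np.ravel(r), ri)

# r >= ri[0] everywhere, so np.digitize never returns 0 ...
print(ridx.min(), ridx.max())   # 1 4
# ... and the bincount length is ridx.max() + 1 == 5, with bin 0 empty.
counts = np.bincount(ridx)
print(len(counts), counts[0])   # 5 0
```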

It also works with dask arrays, as desired:
```
>>> xbincount(xr.DataArray(ridx, dims=['idx']), f.chunk({'time': 1}))
<xarray.DataArray (time: 2, bin: 5)>
dask.array<shape=(2, 5), dtype=float64, chunksize=(1, 5)>
Dimensions without coordinates: time, bin
```

