2.4.10 广义ufunc函数

与本节内容对应的Notebook为:02-numpy/numpy-470-gufuncs.ipynb。

从NumPy 1.8开始正式支持广义ufunc函数(generalized ufunc,以下简称gufunc)。gufunc是对ufunc的推广,所谓ufunc就是将对单个数值的运算通过广播运用到整个数组中的所有元素之上,而gufunc则是将对单个矩阵的运算通过广播运用到整个数组之上。例如numpy.linalg.inv( )是求逆矩阵的gufunc函数。在其文档中描述其输入输出数组的形状如下:

    ainv = inv(a)
    a : (..., M, M)
    ainv : (..., M, M)

输入数组a的形状中带有“...”,它表示0到任意多个轴。当它为空时,就是对单个矩阵求逆,gufunc函数将对这些轴进行广播运算。最后两个轴的长度为M,表示任意大小的方形矩阵。

NumPy中的线性代数模块linalg中提供的函数大都为广义ufunc函数。在SciPy中也提供了线性代数模块linalg,但其中的函数都是一般函数,只能对单个矩阵进行计算。关于线性代数函数库的用法将在下一章进行详细介绍。

在输出数组ainv中,由于逆矩阵的形状与原矩阵相同,因此ainv的最后两轴的形状也是(M,M)。“...”表示广播运算之后的形状,而由于矩阵求逆只对一个矩阵进行运算,因此“...”的形状和a中的“...”的形状相同。

在下面的例子中,a的形状为(10, 20, 3, 3),其中(10, 20)与“...”对应,3与M对应。而inv( )通过广播运算对10×20个形状为(3, 3)的矩阵求逆。得到的结果ainv的形状与a相同,也是(10, 20, 3, 3)。

    a = np.random.rand(10, 20, 3, 3)
    ainv = np.linalg.inv(a)
    ainv.shape
    (10, 20, 3, 3)

下面的程序验证第i行、第j列的矩阵及其逆矩阵的乘积,应该近似等于3阶单位矩阵:

    i, j = 3, 4
    np.allclose(np.dot(a[i, j], ainv[i, j]), np.eye(3))
    True

numpy.linalg.det( )计算矩阵的行列式,它也是一个gufunc函数。它的输入输出的形状为:

    adet = det(a)
    a : (..., M, M)
    adet : (...)

由于矩阵的行列式是将一个M×M的矩阵映射到一个标量,因此输出adet的形状中只包含“...”。

    adet = np.linalg.det(a)
    adet.shape
    (10, 20)

下面以多个二次函数的数据拟合为例,介绍如何使用gufunc函数提高运算效率。首先通过随机数函数创建测试用的数据x和y,这两个数组的形状都为(n,10)。其中的每行数据(x[i]和y[i])构成一个曲线拟合的数据集,它们的关系为:y=β21 x+β0 x2。现在需要计算每对数据所对应的系数β。

    n = 10000
    np.random.seed(0)
    beta = np.random.rand(n, 3)
    x = np.random.rand(n, 10)
    y = beta[:,2, None] + x*
beta[:, 1, None] + x**
2*
beta[:, 0, None]

显然使用前面介绍过的numpy.polyfit( )可以很方便地完成这个任务,下面的程序输出第42组的实际系数以及拟合的结果:

    print beta[42]
    print np.polyfit(x[42], y[42], 2)
    [ 0.0191932   0.30157482  0.66017354]
    [ 0.0191932   0.30157482  0.66017354]

只需要循环调用n次numpy.polyfit( )即可得到所需的结果,但是它的运算速度有些慢:

    %time beta2 = np.vstack([np.polyfit(x[i], y[i], 2) for i in range(n)])
    Wall time: 1.52 s
    np.allclose(beta, beta2)
    True

在numpy.polyfit( )内部实际上是通过调用最小二乘法函数numpy.linalg.lstsq( )来实现多项式拟合的,我们也可以直接调用lstsq( )计算系数:

    xx = np.column_stack(([x[42]**
2, x[42], np.ones_like(x[42])]))
    print np.linalg.lstsq(xx, y[42])[0]
    [ 0.0191932   0.30157482  0.66017354]

但遗憾的是,目前numpy.linalg.lstsq( )还不是gufunc函数,因此无法直接使用它计算所有数据组的拟合系数。然而numpy.linalg中对线性方程组求解的函数solve( )是一个gufunc函数。并且根据最小二乘法的公式:

只需要求出XT XXT y,就可以使用numpy.linalg.solve( )计算出。为了实现这个运算,还需要计算矩阵乘积的gufunc函数。然而dot( )并不是一个gufunc函数,因为它不遵循广播规则。NumPy中目前还没有正式提供计算矩阵乘积的gufunc函数,不过在umath_tests模块中提供了一个测试用的函数:matrix_multiply( )。下面的程序使用它和solve( )实现高速多项式拟合运算,它所需的时间约为polyfit( )版本的五十分之一。

    %%time
    X = np.dstack([x**
2, x, np.ones_like(x)])
    Xt = X.swapaxes(-1, -2)
    
    import numpy.core.umath_tests as umath
    A = umath.matrix_multiply(Xt, X)
    b = umath.matrix_multiply(Xt, y[..., None]).squeeze( )
    
    beta3 = np.linalg.solve(A, b)
    
    print np.allclose(beta3, beta2)
    True
    Wall time: 30 ms

在上面的运算中,X的形状为(10000, 10, 3),Xt的形状为(10000, 3, 10)。matrix_multiply( )的各个参数和返回值的形状如下:

    c = matrix_multiply(a, b)
    a : (..., M, N)
    b : (..., N, K)
    c : (..., M, K)

调用matrix_multiply( )对Xt和X中的每对矩阵进行乘积运算,得到的结果A的形状为(10000, 3, 3)。而为了计算XT y,需要通过y[..., None]将y变成形状为(10000,10,1)的数组。matrix_multiply(Xt, y[..., None])所得到的形状为(10000, 3, 1)。调用其squeeze( ),删除长度为1的轴。这样b的形状为(10000, 3)。

solve( )的参数b支持两种形状,其中第一种情况的形状如下:

    x = solve(a, b)
    a : (..., M, M)
    b : (..., M)
    x : (..., M)

因此solve( )的返回值beta3的形状为(10000, 3)。

前面的例子中,使用的都是最简单的广播规则。实际上gufunc函数支持所有的ufunc函数的广播规则。因此形状分别为(a, m, n)和(b, 1, n, k)的两个数组通过matrix_multiply( )乘积之后得到的数组的形状为(b, a, m, k)。下面看一个使用gufunc函数广播运算的例子:

在二维平面上的旋转矩阵为:

它能将平面上的某点的坐标围绕原点旋转θ。对于形状为(N, 2)的矩阵P,可以表示平面上N个点的坐标。而矩阵乘积得到的则是将这N个点绕坐标原点旋转θ之后的坐标。下面的程序使用matrix_multiply( )将3条曲线上的坐标点分别旋转4个角度,得到12条曲线。

调用matrix_multiply( )时两个参数数组的形状分别为(3, 100, 2)和(4, 1, 2, 2),其中广播轴的形状分别为(3,)和(4, 1),运算轴的形状分别为(100, 2)和(2, 2)。广播轴进行广播之后的形状为(4, 3),而运算轴进行矩阵乘积之后的形状为(100, 2),因此结果rpoints的形状为(4, 3, 100, 2)。

    M = np.array([[[np.cos(t), -np.sin(t)], 
                   [np.sin(t), np.cos(t)]]
                 for t in np.linspace(0, np.pi, 4, endpoint=False)])
    
    x = np.linspace(-1, 1, 100)
    points = np.array((np.c_[x, x], np.c_[x, x**
3], np.c_[x**
3, x]))
    rpoints = umath.matrix_multiply(points, M[:, None, ...])
    
    print points.shape, M.shape, rpoints.shape
    (3, 100, 2) (4, 2, 2) (4, 3, 100, 2)

将这12条曲线绘制成图表之后的效果如图2-15所示。

图2-15 使用矩阵乘积的广播运算将3条曲线分别旋转4个角度