From Python to Numpy 读书笔记

原文:From Python to Numpy

4 Code vectorization

4.1 Introduction

Code vectorization 是一些本身就适合向量化的问题,只是需要一些 NumPy 的技巧或经验,在形式上做一些优化,而有些 Problem vectorization 的问题则不是这么简单了。

一个简单的例子,两个矩阵 Z1 和 Z2,相加,Python 的实现

def add_python(Z1,Z2):
    return [z1+z2 for (z1,z2) in zip(Z1,Z2)]

而 NumPy 的实现则要优雅得多

def add_numpy(Z1,Z2):
    return np.add(Z1,Z2)

代码测试显示,使用 NumPy 的方法要快的多

>>> Z1 = random.sample(range(1000), 100)
>>> Z2 = random.sample(range(1000), 100)
>>> timeit("add_python(Z1, Z2)", globals())
1000 loops, best of 3: 68 usec per loop
>>> timeit("add_numpy(Z1, Z2)", globals())
10000 loops, best of 3: 1.14 usec per loop

之所以 NumPy 解决方案中没有直接使用 + 运算符,是因为如果 Z1 和 Z2 是 list 的话,Python 会自动采用内置方法进行运算。

4.2 Uniform vectorization

Uniform vectorization 是当所有元素共用计算规则时最简单的向量化方法。一个典型的例子时 Game of Life,或许这是最早的细胞机器人例子。

The Game of Life

The Game of Life is a cellular automaton devised by the British mathematician John Horton Conway in 1970. It is the best-known example of a cellular automaton. The "game" is actually a zero-player game, meaning that its evolution is determined by its initial state, needing no input from human players. One interacts with the Game of Life by creating an initial configuration and observing how it evolves.

The universe of the Game of Life is an infinite two-dimensional orthogonal grid of square cells, each of which is in one of two possible states, live or dead. Every cell interacts with its eight neighbours, which are the cells that are directly horizontally, vertically, or diagonally adjacent. At each step in time, the following transitions occur:

1.Any live cell with fewer than two live neighbours dies, as if by needs caused by underpopulation.
2.Any live cell with more than three live neighbours dies, as if by overcrowding.
3.Any live cell with two or three live neighbours lives, unchanged, to the next generation.
4.Any dead cell with exactly three live neighbours becomes a live cell.

四条规则:

1.任何邻居个数少于两个的存活细胞死亡
2.任何邻居个数多于三个的存活细胞死亡
3.任何具有两个或三个邻居的存活细胞继续存活
4.任何邻居个数正好为 3 的死亡细胞复活。

初始状态由“种子”产生,以后每一步都重复以上的规则产生下一代。

Python implementation

使用 list 实现,为了避免不必要的条件检测,在 4*4 的范围外圈加一层 0 边界。

Z = [[0,0,0,0,0,0],
     [0,0,0,1,0,0],
     [0,1,0,1,0,0],
     [0,0,1,1,0,0],
     [0,0,0,0,0,0],
     [0,0,0,0,0,0]]

计算邻居个数

def compute_neighbours(Z):
    shape = len(Z), len(Z[0])
    N  = [[0,]*(shape[0]) for i in range(shape[1])]
    for x in range(1,shape[0]-1):
        for y in range(1,shape[1]-1):
            N[x][y] = Z[x-1][y-1]+Z[x][y-1]+Z[x+1][y-1] \
                    + Z[x-1][y]            +Z[x+1][y]   \
                    + Z[x-1][y+1]+Z[x][y+1]+Z[x+1][y+1]
    return N

接下来根据邻居个数,重复应用四条规则:

def iterate(Z):
    N = compute_neighbours(Z)
    for x in range(1,shape[0]-1):
        for y in range(1,shape[1]-1):
             if Z[x][y] == 1 and (N[x][y] < 2 or N[x][y] > 3):
                 Z[x][y] = 0
             elif Z[x][y] == 0 and N[x][y] == 3:
                 Z[x][y] = 1
    return Z

有一种初始状态叫做 glider,以此为初始状态后续的变化很有意思。

NumPy implementation

Game of Life 主要分为两部分,其一是计算邻居数量,其二是基于四条规则的状态更新。利用 NumPy 的特性,其实可以分开分别计算上下左右等方位的邻居数量。

N = np.zeros(Z.shape, dtype=int)
N[1:-1,1:-1] += (Z[ :-2, :-2] + Z[ :-2,1:-1] + Z[ :-2,2:] +
                 Z[1:-1, :-2]                + Z[1:-1,2:] +
                 Z[2:  , :-2] + Z[2:  ,1:-1] + Z[2:  ,2:])

然后使用 NumPy 的 argwhere 方法来应用规则

# Flatten arrays
N_ = N.ravel()
Z_ = Z.ravel()

# Apply rules
R1 = np.argwhere( (Z_==1) & (N_ < 2) )
R2 = np.argwhere( (Z_==1) & (N_ > 3) )
R3 = np.argwhere( (Z_==1) & ((N_==2) | (N_==3)) )
R4 = np.argwhere( (Z_==0) & (N_==3) )

# Set new values
Z_[R1] = 0
Z_[R2] = 0
Z_[R3] = Z_[R3]
Z_[R4] = 1

# Make sure borders stay null
Z[0,:] = Z[-1,:] = Z[:,0] = Z[:,-1] = 0

尽管这个版本已经不使用循环了,但是 argwhere 依然可能很慢,可以利用 NumPy 的 boolean 特性再改写一下。这一特性相当有用

birth = (N==3)[1:-1,1:-1] & (Z[1:-1,1:-1]==0)
survive = ((N==2) | (N==3))[1:-1,1:-1] & (Z[1:-1,1:-1]==1)
Z[...] = 0
Z[1:-1,1:-1][birth | survive] = 1

源代码

练习

化学反应有一些很有趣的特性,Gray-Scott 模型就是其中一个比较有趣的。 Let's consider two chemical species U and V with respective concentrations u and v and diffusion rates Du and Dv. V is converted into P with a rate of conversion k. f represents the rate of the process that feeds U and drains U, V and P. This can be written as:

Chemical reaction   Equations
U + 2V → 3V         u̇ = Du∇2u − uv2 + f(1 − u)
V → P               v̇ = Dv∇2v + uv2 − (f + k)v

以下是一些有趣的参数

Name        Du      Dv      f       k
Bacteria 1  0.16    0.08    0.035   0.065
Bacteria 2  0.14    0.06    0.035   0.065
Coral   0.16    0.08    0.060   0.062
Fingerprint 0.19    0.05    0.060   0.062
Spirals 0.10    0.10    0.018   0.050
Spirals Dense   0.12    0.08    0.020   0.050
Spirals Fast    0.10    0.16    0.020   0.050
Unstable    0.16    0.08    0.020   0.055
Worms 1 0.16    0.08    0.050   0.065
Worms 2 0.16    0.08    0.054   0.063
Zebrafish   0.16    0.08    0.035   0.060

实现源码

4.3 Temporal vectorization

The Mandelbrot set is the set of complex numbers c for which the function fc(z) = z2 + c does not diverge when iterated from z = 0, i.e., for which the sequence fc(0), fc(fc(0)), etc., remains bounded in absolute value. It is very easy to compute, but it can take a very long time because you need to ensure a given number does not diverge. This is generally done by iterating the computation up to a maximum number of iterations, after which, if the number is still within some bounds, it is considered non-divergent. Of course, the more iterations you do, the more precision you get.

曼德布洛特复数集合(Mandelbrot set,或译为曼德博集合)是一种在复平面上组成分形的点的集合,以数学家本华·曼德博的名字命名。曼德博集合与朱利亚集合有些相似的地方,例如使用相同的复二次多项式来进行迭代。曼德布洛特集合可以用复二次多项式:f_c(z) = z^2 +c来定义 其中c是一个复参数。对于每一个c,从开始对fc(z)进行迭代。序列的值或者延伸到无限大,或者只停留在有限半径的圆盘内。曼德布洛特集合就是使以上序列不延伸至无限大的所有c点的集合。从数学上来讲,曼德布洛特集合是一个复数的集合。一个给定的复数c或者属于曼德布洛特集合M,或者不是。

Python implementation

def mandelbrot_python(xmin, xmax, ymin, ymax, xn, yn, maxiter, horizon=2.0):
    def mandelbrot(z, maxiter):
        c = z
        for n in range(maxiter):
            if abs(z) > horizon:
                return n
            z = z*z + c
        return maxiter
    r1 = [xmin+i*(xmax-xmin)/xn for i in range(xn)]
    r2 = [ymin+i*(ymax-ymin)/yn for i in range(yn)]
    return [mandelbrot(complex(r, i),maxiter) for r in r1 for i in r2]

有趣的部分是 mandelbrot 函数计算序列 fc(fc(fc...))) 的部分,如何用 NumPy 实现呢。

NumPy implementation

关键是在每一步迭代时寻找尚未发散的值,并且只更新与这些值相关的信息。为了到达目的,使用 NumPy 的 less(x1, x2) 函数,其返回 x1 < x2 的真假。

def mandelbrot_numpy(xmin, xmax, ymin, ymax, xn, yn, maxiter, horizon=2.0):
    X = np.linspace(xmin, xmax, xn, dtype=np.float32)
    Y = np.linspace(ymin, ymax, yn, dtype=np.float32)
    C = X + Y[:,None]*1j
    N = np.zeros(C.shape, dtype=int)
    Z = np.zeros(C.shape, np.complex64)
    for n in range(maxiter):
        I = np.less(abs(Z), horizon)
        N[I] = n
        Z[I] = Z[I]**2 + C[I]
    N[N == maxiter-1] = 0
    return Z, N

计时

>>> xmin, xmax, xn = -2.25, +0.75, int(3000/3)
>>> ymin, ymax, yn = -1.25, +1.25, int(2500/3)
>>> maxiter = 200
>>> timeit("mandelbrot_python(xmin, xmax, ymin, ymax, xn, yn, maxiter)", globals())
1 loops, best of 3: 6.1 sec per loop
>>> timeit("mandelbrot_numpy(xmin, xmax, ymin, ymax, xn, yn, maxiter)", globals())
1 loops, best of 3: 1.15 sec per loop

Faster NumPy implementation

上一个版本获得了 5x 的速度,但并不如期待,主要是 less 函数每一步都需要计算 xn*yn 次,而有些值是已知发散了的,这一版本维护一个动态数组,只存储尚未发散的值,相对 Python 版本获得了 10x 的速度。

def mandelbrot_numpy_2(xmin, xmax, ymin, ymax, xn, yn, itermax, horizon=2.0):
    Xi, Yi = np.mgrid[0:xn, 0:yn]
    Xi, Yi = Xi.astype(np.uint32), Yi.astype(np.uint32)
    X = np.linspace(xmin, xmax, xn, dtype=np.float32)[Xi]
    Y = np.linspace(ymin, ymax, yn, dtype=np.float32)[Yi]
    C = X + Y*1j
    N_ = np.zeros(C.shape, dtype=np.uint32)
    Z_ = np.zeros(C.shape, dtype=np.complex64)
    Xi.shape = Yi.shape = C.shape = xn*yn

    Z = np.zeros(C.shape, np.complex64)
    for i in range(itermax):
        if not len(Z): break

        # Compute for relevant points only
        np.multiply(Z, Z, Z)
        np.add(Z, C, Z)

        # Failed convergence
        I = abs(Z) > horizon
        N_[Xi[I], Yi[I]] = i+1
        Z_[Xi[I], Yi[I]] = Z[I]

        # Keep going with those who have not diverged yet
        np.negative(I,I)
        Z = Z[I]
        Xi, Yi = Xi[I], Yi[I]
        C = C[I]
    return Z_.T, N_.T

Visualization

In order to visualize our results, we could directly display the N array using the matplotlib imshow command, but this would result in a "banded" image that is a known consequence of the escape count algorithm that we've been using. Such banding can be eliminated by using a fractional escape count. This can be done by measuring how far the iterated point landed outside of the escape cutoff. See the reference below about the renormalization of the escape count. Here is a picture of the result where we use recount normalization, and added a power normalized color map (gamma=0.3) as well as light shading.

练习

4.4 Spatial vectorization

Spatial vectorization 指的是这样的情况,个体共用计算规则但只和总体的一部分交互,而且这一部分还是动态变化的一部分,Boids 模型就是一个很好的例子。

Boids

As with most artificial life simulations, Boids is an example of emergent behavior; that is, the complexity of Boids arises from the interaction of individual agents (the boids, in this case) adhering to a set of simple rules. The rules applied in the simplest Boids world are as follows:

  • separation: steer to avoid crowding local flock-mates
  • alignment: steer towards the average heading of local flock-mates
  • cohesion: steer to move toward the average position (center of mass) of local flock-mates

Python implementation

很自然想到通过类来进行处理

import math
import random
from vec2 import vec2

class Boid:
    def __init__(self, x=0, y=0):
        self.position = vec2(x, y)
        angle = random.uniform(0, 2*math.pi)
        self.velocity = vec2(math.cos(angle), math.sin(angle))
        self.acceleration = vec2(0, 0)

vec2 是一个很简单的向量对象,可以节省一些时间。Boids 对于 Python 来说是个棘手的问题,因为模型有和邻居的交互,因此需要计算和每一个邻居的距离,一个典型的写法如下:

def separation(self, boids):
    count = 0
    for other in boids:
        d = (self.position - other.position).length()
        if 0 < d < desired_separation:
            count += 1
            ...
    if count > 0:
        ...

 def alignment(self, boids): ...
 def cohesion(self, boids): ...

完整代码见附录,太长,下面再新建一个 Flock 类

class Flock:
    def __init__(self, count=150):
        self.boids = []
        for i in range(count):
            boid = Boid()
            self.boids.append(boid)

    def run(self):
        for boid in self.boids:
            boid.run(self.boids)

基于这种方式,可以跑大概 50 个个体,之后计算就开始变得缓慢。没错,使用 NumPy 可以让计算变得快得多,但是首先需要指出 Python 实现的问题所在,如果仔细看代码的话,可以轻易发现有很多冗余的部分,欧氏距离是具有反身性的,也就是说两个个体之间的距离只需要计算一次,另外每次规则计算还要计算一次距离而没有缓存结果,实际上我们计算了 3n^2 次距离而不是 n^2/2次。

NumPy implementation

NumPy 实现和 Python 采取完全不同的策略,将所有个体的位置和速度都放在一个 ndarray 里。

n = 500
velocity = np.zeros((n, 2), dtype=np.float32)
position = np.zeros((n, 2), dtype=np.float32)

第一步是计算距离,我们要计算所有成对的距离

dx = np.subtract.outer(position[:, 0], position[:, 0])
dy = np.subtract.outer(position[:, 1], position[:, 1])
distance = np.hypot(dx, dy)

实际上可以使用 cdist 函数,但是 dx 和 dy 在后面的计算中需要用到,知道 dx dy 的情况下,用 hypot 就快一些了。distance 的 shape 是 n*n。