跳转至

KNN 实现 iris 数据集分类

手写 knn 分类实现 iris 数据分类,从软件工程的角度进行实现

1. 环境和数据准备

iris 数据集下载

curl -L -o iris.zip https://www.kaggle.com/api/v1/datasets/download/uciml/iris
  • python3: 3.7.0
  • numpy: 1.21.6
  • pandas: 1.3.5

2. 框架代码实现

主程序框架

def main():
    X_train, y_train, X_test, y_test = preprocess()

    model = KNNClassifier(k=3)
    model.train(X_train, y_train)
    y_pred = model.predict(X_test)
    accuracy=calc_accuracy(y_pred, y_test)

    print("accuracy: %f" % (accuracy))

knn 分类器框架

class KNNClassifier():
    def __init__(self, k=3):
        self.k = k

    def train(self, X_train, y_train):
        self.X_train = X_train
        self.y_train = y_train

    def predict(self, X_test):
        y_pred = []

        return y_pred

预处理框架

def preprocess():

    data = load_csv('iris.csv')
    X, y = split_xy(data)

    return split_test(X, y, test_size=0.2)

3. 算法实现

1. knn 分类器实现

由 l2 距离无循环的方法,在 predict 环节实现最临近需要三个步骤 1. 计算 [test, train] 距离矩阵 dists 2. 计算每一行前 n 个最小的值对应的 y[index] 3. 投票选择前 n 个值中出现最多的值 maxcout(y[index])

def l2_distance(x, y):
    # sum((m, p), axis=1) ->  (m, )
    x_square_sum = np.sum(np.square(x), axis=1)
    # sum((n, p), axis=1) ->  (n, )
    y_square_sum = np.sum(np.square(y), axis=1)
    # (m, 1) + (1, n) -> (m, n) + (m, n) = (m, n)
    x_y_square_sum = x_square_sum[:, np.newaxis] + y_square_sum[np.newaxis, :]

    # (m, p) dot (p, n) = (m, n)
    x_y_dot = np.dot(x, y.T)

    # (m, n) -2(m, n) = (m, n)
    d = np.square(x_y_square_sum - 2*x_y_dot)

    return d


class KNNClassifier():
    def __init__(self, k=3):
        self.k = k

    def train(self, X_train, y_train):
        self.X_train = X_train
        self.y_train = y_train

    def predict(self, X_test):
        N = X_test.shape[0]
        y_pred = np.zeros(N)

        dists = l2_distance(X_test, self.X_train)

        for i in range(X_test.shape[0]):
            index = np.argsort(dists[i])
            index = index[:self.k]
            closest_y = self.y_train[index]

            values, counts = np.unique(closest_y, return_counts=True)
            y_pred[i] = values[np.argmax(counts)]

        return y_pred

2. 预处理实现

def shuffle_data(X, y):
    idx = np.random.permutation(len(X))
    return X[idx], y[idx]


def split_data(X, y,test_size=0.9):
    """
    使用 numpy 分割数据为训练集和测试集
    """

    test_size = int(len(X) * test_size)

    X_train = X[test_size:]
    y_train = y[test_size:]
    X_test = X[:test_size]
    y_test = y[:test_size]

    return X_train, y_train, X_test, y_test

def calc_accuracy(y_pred, y):
    return np.mean(y == y_pred)


def preprocess():

    df = pd.read_csv('iris.csv')

    X_df = df.iloc[:, :-1] # 去掉最后一列
    y_df = df.iloc[:, -1] #  最后一列

    X = X_df.values
    y_label = y_df.values

    label_to_int = {label: idx for idx, label in enumerate(sorted(set(y_label)))}
    y = np.array([label_to_int[label] for label in y_label])

    # 打乱数据
    X, y = shuffle_data(X, y)

    # 分割训练测试集

    return split_data(X, y)