This prompt, along with the seaplane prompt, seems to have been showing up rather frequently lately, so it has probably been deployed to production.

Naturally, that means it's time to take it on. The dataset can be found here.

First, some analysis. There are only three styles in total: images made of leaves, images made of flower petals, and some dark, murky things made of who knows what. As for the subjects, it's either an elephant or a horse, so this is just a binary classification problem.

其实除了 "Please select all the elephants drawn with lеaves" 这个 prompt 之外,还有一个类似的 "Please select all the horses drawn with flowers",但是这个 prompt 几乎没有见到过,不知道提这个 issue 的人是怎么刷出来的,其实我觉得更大的原因是这个 flower 的图片困惑度太高了,让人很难区分,可能会极大降低用户体验。但是相较于人眼来提取特征,我是觉得机器提取特征更快 - -。

So the plan takes shape. First, classifying the style could hardly be simpler: just extract the image's dominant color. I used k-means to cluster the pixel colors, with k=3 cluster centers: one each for the highlights and the shadows, and the remaining one for the dominant color. Then all that is left is a threshold on the distance between that color and pure green. For instance, a leafy green such as BGR (40, 160, 60) sits at a distance of about 119 from pure green, while a mid-grey (128, 128, 128) sits at about 221. With a threshold of 200, the classification accuracy is 100%.

# excerpt: a method of ElephantSolution (the full class is at the end of the post)
import numpy as np
from scipy.cluster.vq import kmeans2


def _style_classification(self, img):
    # OpenCV image (BGR ndarray)
    img = np.array(img)
    # flatten (H, W, 3) -> (H * W, 3) so each pixel is one sample
    img = img.reshape((img.shape[0] * img.shape[1], img.shape[2])).astype(np.float64)
    centroid, _ = kmeans2(img, k=3)

    # pure green; the value is the same in BGR and RGB, so channel order does not matter
    green_centroid = np.array([0.0, 255.0, 0.0])

    # distance from the closest cluster center to pure green
    min_dis = np.inf
    for i in range(len(centroid)):
        min_dis = min(min_dis, np.linalg.norm(centroid[i] - green_centroid))

    # "drawn with leaves" iff the dominant color is close enough to green
    return bool(min_dis < 200)
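
A quick way to try it on a single tile, reusing the import path from the labeling script below (the image path is just a placeholder):

import cv2
from src.services.hcaptcha_challenger.solutions.elephant_solution import ElephantSolution

img = cv2.imread('sample_tile.png')  # hypothetical path to one challenge tile
print(ElephantSolution()._style_classification(img))  # True => "drawn with leaves"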

Telling elephants from horses, though, is genuinely difficult. Or rather, not difficult at all. With image-processing methods, say clustering again, or computing weight or superpixel counts as in the two previous posts [1][2], separating them really is hard: elephants and horses have about the same bulk, and both have 5-6 contact points along the bottom (the elephant has a trunk and a tail, the horse a tail and a muzzle), so they are hard to tell apart. Looked at the other way, though, it's trivial: isn't this just "Dog vs Cat", the entry-level deep-learning image classification task…

First comes labeling, and the style classifier from above can be used as a filter, which saves a lot of labeling work.

import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import cv2
from src.services.hcaptcha_challenger.solutions.elephant_solution import ElephantSolution

img_path = os.path.join('elephants_drawn_with_leaves')
label_file = open('label.txt', 'w')
os.makedirs(os.path.join(img_path, 'elephant'), exist_ok=True)
os.makedirs(os.path.join(img_path, 'horse'), exist_ok=True)

if __name__ == '__main__':
    # 0 for horse, 1 for elephant
    imgs = os.listdir(img_path)
    edwls = ElephantSolution()
    for idx, img_ in enumerate(imgs):
        img_path_ = os.path.join(img_path, img_)
        if os.path.isdir(img_path_):
            continue
        img = cv2.imread(img_path_)
        cv2.imshow("img", img)

        print(f'{img_path_}: {idx}')

        if edwls._style_classification(img):
            key = cv2.waitKey(0)
            if key == ord('0'):
                label_file.write(f'{img_path_} 0\n')
                label_file.flush()
                cv2.imwrite(os.path.join(img_path, 'horse', img_), img)
                print(f'{img_path_} 0: horse')
            elif key == ord('1'):
                label_file.write(f'{img_path_} 1\n')
                label_file.flush()
                cv2.imwrite(os.path.join(img_path, 'elephant', img_), img)
                print(f'{img_path_} 1: elephant')
        else:
            print('Drop')

    label_file.close()

Next, a simple ResNet-style model. There is no need for something as large as resnet18; this problem doesn't deserve it. So I DIY'd a very small model and shrank the images to (64 x 64), which cuts the parameter count considerably.

Training and testing, all in one go.

import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import shutil
import torch
import torch.nn as nn
import torchvision

import cv2
from PIL import Image
from src.services.hcaptcha_challenger.solutions.elephant_solution import ElephantSolution


class ResidualBlock(nn.Module):

    def __init__(self, in_channels, out_channels, stride=1):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels,
                               out_channels,
                               kernel_size=3,
                               stride=stride,
                               padding=1,
                               bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels,
                               out_channels,
                               kernel_size=3,
                               stride=1,
                               padding=1,
                               bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)

        self.downsample = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels))

    def forward(self, x):
        residual = x
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.downsample(residual)
        out = self.relu(out)
        return out


class Net(nn.Module):

    def __init__(self, in_channels=3, num_classes=10):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, 16, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(16)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.resblock1 = ResidualBlock(16, 32)
        self.resblock2 = ResidualBlock(32, 64, stride=2)
        self.avgpool = nn.AvgPool2d(kernel_size=7, stride=1)
        # for a 64x64 input: conv1 -> 32x32, maxpool -> 16x16, resblock2 -> 8x8, avgpool -> 2x2,
        # so the flattened feature size is 64 * 2 * 2 = 256
        self.fc = nn.Linear(256, num_classes)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x = self.resblock1(x)
        x = self.resblock2(x)
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        # print(x.size())
        x = self.fc(x)
        return x


img_path = os.path.join('..', 'database', 'elephants_drawn_with_leaves')

img_transform = torchvision.transforms.Compose([
    # torchvision.transforms.Grayscale(num_output_channels=1),
    # torchvision.transforms.GaussianBlur(kernel_size=3),
    torchvision.transforms.Resize((64, 64)),
    torchvision.transforms.ToTensor(),
])


def train():
    model = Net(3, 2)
    model.train()
    model.cuda()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.005)
    criterion = nn.CrossEntropyLoss()

    print('model:', model)

    data = torchvision.datasets.ImageFolder(img_path, transform=img_transform)
    data_loader = torch.utils.data.DataLoader(data, batch_size=1, shuffle=True)
    print(f'{len(data)} images')
    epochs = 20

    # training loop
    for epoch in range(epochs):
        total_loss = 0
        total_acc = 0
        for i, (img, label) in enumerate(data_loader):
            img = img.cuda()
            label = label.cuda()
            optimizer.zero_grad()
            out = model(img)
            loss = criterion(out, label)
            loss.backward()
            optimizer.step()
            if (i + 1) % 10 == 0:
                print(f'epoch: {epoch + 1}, iter: {i + 1}, loss: {loss.item():.4f}')
            total_loss += loss.item()
            total_acc += torch.sum(torch.argmax(out, dim=1) == label).item()
        print(
            f'epoch: {epoch + 1}, avg loss: {total_loss / len(data):.4f}, avg acc: {total_acc / len(data):.4f}'
        )

    torch.save(model.state_dict(), 'model.pth')


def test_single(model, img):

    img = img_transform(img)
    img = img.unsqueeze(0)
    img = img.cuda()
    out = model(img)
    pred = torch.argmax(out, dim=1)
    # 0: elephant, 1: horse (ImageFolder assigns class indices alphabetically)
    return int(pred.item())


def test():
    model = Net(3, 2)
    model.load_state_dict(torch.load('model.pth'))
    model.eval()
    torch.onnx.export(model,
                      torch.randn(1, 3, 64, 64),
                      'model.onnx',
                      verbose=True,
                      export_params=True)
    model.cuda()
    test_data_path = os.path.join('val-dataset')
    imgs = os.listdir(test_data_path)

    dir1 = os.path.join('val-dataset', 'elephant_drawn_with_leaves')
    dir2 = os.path.join('val-dataset', 'horse_drawn_with_leaves')
    dir3 = os.path.join('val-dataset', 'without_leaves')

    dirs = [dir1, dir2, dir3]

    for d in dirs:
        if os.path.exists(d):
            shutil.rmtree(d)
        os.mkdir(d)
    es = ElephantSolution()

    for img in imgs:
        if os.path.isdir(os.path.join(test_data_path, img)):
            continue
        img_ = cv2.imread(os.path.join(test_data_path, img))
        result = 2
        if es._style_classification(img_):
            result = test_single(model, Image.open(os.path.join(test_data_path, img)))

        print(f'{img} is {result} save to {os.path.join(dirs[result], img)}')
        cv2.imwrite(os.path.join(dirs[result], img), img_)


if __name__ == '__main__':
    train()
    test()

There is an interesting wrinkle here. After the first training run, the test error rate was around 10%, which is pretty much unacceptable given that each challenge has only 9 images. I assumed it was overfitting (the standard first guess, since training accuracy was already close to 100%), so I increased the learning rate and reduced the number of epochs, but whatever I did the error rate stayed around 5%. I was baffled: how could such a simple classification task do this badly? Then… for once I cranked the number of epochs way up instead, and the test accuracy also went to nearly 100%. In other words, the model had been underfitting, not overfitting. The final error rate over the whole test set was about 1.8%, at which point I stopped optimizing; I couldn't even be bothered to fold the test data back in for training.
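
In hindsight, holding out a validation split would have exposed this sooner: when a model underfits, train and validation accuracy keep rising together as training continues. A minimal sketch using random_split, reusing img_path and img_transform from the training script (the 80/20 split is arbitrary):

import torchvision
from torch.utils.data import DataLoader, random_split

data = torchvision.datasets.ImageFolder(img_path, transform=img_transform)
n_val = len(data) // 5  # hold out ~20% for validation
train_set, val_set = random_split(data, [len(data) - n_val, n_val])
train_loader = DataLoader(train_set, batch_size=1, shuffle=True)
val_loader = DataLoader(val_set, batch_size=1)
# evaluate on val_loader after each epoch: if validation accuracy climbs
# alongside training accuracy, the model is underfitting, not overfitting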

In the end, the saved model weights come to just 311 KB as a .pt file, or 290 KB when exported to ONNX.
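
That size checks out against a direct parameter count (Net as defined in the training script above):

model = Net(3, 2)  # Net from the training script
n_params = sum(p.numel() for p in model.parameters())
print(n_params)  # ~75k parameters, i.e. roughly 300 KB in float32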

Another trick learned here: once the model is exported to ONNX, inference can be done directly with OpenCV, which saves a great deal of resources in the deployment environment.
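
Before relying on the .onnx file, it is worth confirming that OpenCV's DNN module and PyTorch agree on the same input. A minimal sanity check, assuming Net, model.pth, and model.onnx from the training script above:

import cv2
import numpy as np
import torch

x = np.random.rand(1, 3, 64, 64).astype(np.float32)  # random NCHW input

net = cv2.dnn.readNetFromONNX('model.onnx')
net.setInput(x)  # setInput also accepts a ready-made NCHW blob
cv_out = net.forward()

model = Net(3, 2)
model.load_state_dict(torch.load('model.pth'))
model.eval()
with torch.no_grad():
    torch_out = model(torch.from_numpy(x)).numpy()

print(np.abs(cv_out - torch_out).max())  # should be on the order of 1e-6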

The complete Solution code is below.

import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import cv2
import numpy as np
from scipy.cluster.vq import kmeans2


class ElephantSolution:

    def __init__(self):
        self.debug = True

    def solution(self, img_stream, **kwargs) -> bool:  # noqa
        """Implementation process of solution"""
        img_arr = np.frombuffer(img_stream, np.uint8)
        img = cv2.imdecode(img_arr, flags=1)

        cv2.imshow("img", img)
        cv2.waitKey(0)

        if not self._style_classification(img):
            return False

        # run the ONNX model through OpenCV's DNN module
        model_path = os.path.join('..', '..', '..', 'model', 'elephant_model.onnx')
        model = cv2.dnn.readNetFromONNX(model_path)
        # blobFromImage resizes to 64x64, scales to [0, 1], and swaps BGR -> RGB
        # to match the PIL / ToTensor preprocessing used during training
        blob = cv2.dnn.blobFromImage(img, 1 / 255.0, (64, 64), (0, 0, 0), swapRB=True, crop=False)
        model.setInput(blob)
        out = model.forward()
        label = np.argmax(out, axis=1)[0]
        # class 0 is "elephant" (ImageFolder assigns class indices alphabetically)
        return bool(label == 0)

    def _style_classification(self, img):
        # OpenCV image (BGR ndarray)
        img = np.array(img)
        # flatten (H, W, 3) -> (H * W, 3) so each pixel is one sample
        img = img.reshape((img.shape[0] * img.shape[1], img.shape[2])).astype(np.float64)
        centroid, _ = kmeans2(img, k=3)

        # pure green; the value is the same in BGR and RGB, so channel order does not matter
        green_centroid = np.array([0.0, 255.0, 0.0])

        # distance from the closest cluster center to pure green
        min_dis = np.inf
        for i in range(len(centroid)):
            min_dis = min(min_dis, np.linalg.norm(centroid[i] - green_centroid))

        # "drawn with leaves" iff the dominant color is close enough to green
        return bool(min_dis < 200)
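
solution() takes the raw image bytes, so a call looks like this (the path is a placeholder):

with open('sample_tile.png', 'rb') as f:  # hypothetical challenge tile
    print(ElephantSolution().solution(f.read()))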