# 奇技淫巧-二维码跨网络传输数据

# 概述

将文件压缩成7zip格式并做base64编码,再按每块约2950个字符分割,给每一块打上序号后分别生成二维码,大致是2K数据对应一张二维码。

使用方式:

  1. 使用qr_gen中的代码生成二维码

    大约30000个中文汉字得到的压缩包为62K,最后生成了29张二维码。

    每张二维码携带的数据大致如下:

    {"`<count>`/`<total>`":"`<data>`"}
    
    python qr_gen.py test_file.txt
    
  2. 使用相机或其他手段读取这些二维码中存储的数据,对每条数据执行json.loads,并把结果合并到一个大dict中——这部分逻辑需要自行补充到qr_decode.py的get_big_dict函数中

    1. 安卓系统可以去谷歌商店下载【二维码扫描仪】,去设置中打开批量扫描模式,然后扫完之后将扫描的历史记录导出成csv处理。
  3. 使用qr_decode中的代码恢复文件

    python qr_decode.py temp.7z
    

# 代码

# qr_gen

import qrcode
import os
from py7zr import SevenZipFile
import base64
import json
import numpy as np
# import cv2
from PIL import Image
import sys
import shutil
import time

# def show(img, name='img'):
#     maxHeight = 540
#     maxWidth = 960
#     scaleX = maxWidth / img.shape[1]
#     scaleY = maxHeight / img.shape[0]
#     scale = min(scaleX, scaleY)
#     if scale < 1:
#         img = cv2.resize(img, (0, 0), fx=scale, fy=scale)
#     cv2.imshow(name, img)
#     cv2.waitKey(0)
#     cv2.destroyWindow(name)

def split_data(data, base=2954):
    """Split *data* into JSON-encoded chunks, one per QR code.

    Every chunk is serialised as ``{"<index>/<total>": "<payload>"}`` where
    ``index`` counts from 0 and ``total`` is the index of the last chunk.

    Args:
        data: The string to split.
        base: Character budget for one serialised chunk. Values that can be
            converted with ``int()`` (e.g. a command-line string) are
            accepted.

    Returns:
        A list of JSON strings in index order; empty when *data* is empty.

    Raises:
        ValueError: If *base* is too small to leave room for any payload.
    """
    base = int(base)
    chunks = []
    offset = 0
    while offset < len(data):
        index = len(chunks)
        # Payload room = budget minus the JSON wrapper for this index, minus
        # 4 characters reserved for the "/<total>" suffix of the key, which
        # is only known once splitting has finished.
        # NOTE(review): 4 characters cover "/" plus a total of up to three
        # digits; larger totals exceed the budget by a character or two.
        size = base - len('{"' + str(index) + '": ""}') - 4
        if size <= 0:
            # A non-positive size would never consume any input.
            raise ValueError(
                f'base={base} leaves no room for payload in chunk {index}'
            )
        chunks.append(data[offset:offset + size])
        offset += size

    total = len(chunks) - 1
    return [
        json.dumps({f'{index}/{total}': chunk})
        for index, chunk in enumerate(chunks)
    ]

def color_gen(size, color):
    """Return a uint8 image of shape ``(size[0], size[1], 3)`` filled with *color*.

    Args:
        size: Sequence whose first two items are the image height and width.
        color: Per-channel values, written to channels 0, 1, 2 in order.
            Channels without a value stay 0.

    Returns:
        The filled ``numpy`` array.
    """
    rows, cols = size[0], size[1]
    canvas = np.zeros((rows, cols, 3), np.uint8)
    plane = np.ones((rows, cols), np.uint8)
    for channel, level in enumerate(color):
        canvas[:, :, channel] = canvas[:, :, channel] + plane * level
    return canvas

def add_mark(img, f):
    """Paint a 30x30 coloured square onto each corner of *img*, in place.

    The squares are placed top-left, top-right, bottom-left and bottom-right
    in that order. Colour triples are given in the array's own channel order.

    Args:
        img: Three-channel image array, at least 30 pixels in each dimension.
        f: Unused; kept so existing callers keep working.

    Returns:
        The same array object, now carrying the corner marks.
    """
    side = 30
    head = slice(0, side)
    tail = slice(-side, None)
    corner_marks = (
        ((head, head), [255, 0, 255]),
        ((head, tail), [0, 255, 0]),
        ((tail, head), [255, 0, 0]),
        ((tail, tail), [255, 255, 0]),
    )
    for region, colour in corner_marks:
        img[region] = color_gen([side, side], colour)
    return img

def add_mark_se(img_size):
    """Create a white frame carrying the four corner marks plus a bottom-centre mark.

    Args:
        img_size: Sequence whose first two items are the frame height and
            width.

    Returns:
        A uint8 array of shape ``(img_size[0], img_size[1], 3)``: white
        background, a 30x30 coloured square in every corner and a fifth
        30x30 square horizontally centred on the bottom edge.
    """
    side = 30
    canvas = np.full((img_size[0], img_size[1], 3), 255, np.uint8)

    head = slice(0, side)
    tail = slice(-side, None)
    canvas[head, head] = color_gen([side, side], [255, 0, 255])
    canvas[head, tail] = color_gen([side, side], [0, 255, 0])
    canvas[tail, head] = color_gen([side, side], [255, 0, 0])
    canvas[tail, tail] = color_gen([side, side], [255, 255, 0])

    # Fifth mark: bottom edge, horizontally centred.
    left = (canvas.shape[1] - side) // 2
    canvas[tail, left:left + side] = color_gen([side, side], [0, 255, 255])
    return canvas

# def vedio_gen(data_path, x, size=(1850, 1850)):
#     # data_path = "2017-06-13-29/images/"
#     fps = 8  # 视频帧率
#     # size = (640, 480)  # 需要转为视频的图片的尺寸
#     res_pth = os.path.normpath(os.path.join(data_path, '../output.mp4'))
#     video = cv2.VideoWriter(res_pth, cv2.VideoWriter_fourcc('a', 'v', 'c', '1'), fps, size)
#
#     img_s = add_mark_se(size)
#     video.write(img_s)
#     for i in range(x):
#         image_path = data_path + rf'\a{i}.png'
#         # print(image_path)
#         img = cv2.imread(image_path)
#         video.write(img)
#
#     video.write(img_s)
#     video.write(img_s)
#
#     video.release()
#     cv2.destroyAllWindows()

def main(pth, base):
    """Compress *pth*, base64-encode the archive and render it as QR code PNGs.

    The archive is written to ``a.7z`` and the images to ``a/a<i>.png``, both
    next to the input file. An existing ``a`` directory is deleted first.

    Args:
        pth: Path of the file to transfer.
        base: Character budget per QR payload, forwarded to ``split_data``.
            Strings holding an integer are accepted.
    """
    base = int(base)

    # Normalise once: on POSIX "<file>/../a" cannot be resolved through a
    # regular file, so testing the raw joined path never finds the directory.
    out_dir = os.path.normpath(os.path.join(pth, '../a'))
    if os.path.exists(out_dir):
        shutil.rmtree(out_dir)
        # NOTE(review): presumably gives the OS time to finish the removal
        # before the directory is recreated - confirm whether still needed.
        time.sleep(1)
    os.makedirs(out_dir)

    qr = qrcode.QRCode(
        version=40,
        error_correction=qrcode.constants.ERROR_CORRECT_L,
        box_size=10,
        border=4,
    )

    z_pth = os.path.normpath(os.path.join(pth, '../a.7z'))
    with SevenZipFile(z_pth, mode='w') as tar:
        tar.write(pth, os.path.split(pth)[1])

    with open(z_pth, 'rb') as f:
        con = f.read()

    # encodebytes inserts line breaks; they are replaced by "_" so the
    # payload stays on a single line.
    con = base64.encodebytes(con).decode('utf-8')
    con = con.replace('\n', '_')

    flag = True
    for i, v in enumerate(split_data(con, base)):
        qr.add_data(v)
        qr.make(fit=False)
        img = qr.make_image(fill_color="black", back_color="white")
        qr.clear()
        # Expand the single-channel QR image to three identical channels
        # scaled to 0/255 so the coloured corner marks can be drawn on it.
        img = np.expand_dims(np.asarray(img, dtype='uint8'), axis=2)[:, :, [0, 0, 0]] * 255
        img = add_mark(img, flag)
        # Reverse the channel order before handing the array to PIL.
        img = Image.fromarray(np.asarray(img)[:, :, [2, 1, 0]])
        img.save(os.path.join(out_dir, f'a{i}.png'))
        flag = not flag

    print('Done!')

if __name__ == '__main__':
    # Usage: python qr_gen.py [<file> [<chunk_size>]]
    if len(sys.argv) >= 2:
        pth = sys.argv[1]
    else:
        # Default sample: ../test_file/test.py relative to this script's
        # directory, built without hard-coded path separators.
        pth = os.path.normpath(
            os.path.join(os.path.dirname(__file__), os.pardir, 'test_file', 'test.py')
        )
    # argv entries are strings; the chunk size is used in arithmetic later.
    base = int(sys.argv[2]) if len(sys.argv) > 2 else 2950
    main(pth, base)

# qr_decode

import sys
import base64


def get_big_dict(csv_path=None, text_column="text"):
    """Collect the scanned QR payloads into one ``{"<index>/<total>": data}`` dict.

    How the payloads are obtained depends on the scanner in use, so without
    arguments this function is a placeholder to be filled in. When
    *csv_path* is given, it reads a CSV export in which every row holds one
    scanned JSON payload.

    Args:
        csv_path: Path of the CSV export. ``None`` (the default) keeps the
            placeholder behaviour.
        text_column: Name of the CSV column holding the decoded QR text.

    Returns:
        The merged dict of all payloads.

    Raises:
        NotImplementedError: If *csv_path* is not given.
    """
    if csv_path is None:
        raise NotImplementedError(
            "pass csv_path or implement your own payload collection here"
        )

    import csv
    import json

    res = {}
    # utf-8-sig also accepts files that start with a byte-order mark.
    with open(csv_path, newline="", encoding="utf-8-sig") as f:
        for row in csv.DictReader(f):
            text = row.get(text_column)
            if text:
                res.update(json.loads(text))
    return res


def check_data(res):
    """Verify that every chunk is present and join the payloads in order.

    Args:
        res: Dict mapping ``"<index>/<total>"`` to the chunk payload, where
            ``total`` is the index of the last chunk.

    Returns:
        The concatenated payload string when chunks ``0..total`` are all
        present, otherwise ``False``.
    """
    if not res:
        print("没有读取到任何数据")
        return False

    # Every key carries the same total, so any key can supply it.
    total = int(next(iter(res)).split("/")[1])
    keys = [f"{i}/{total}" for i in range(total + 1)]
    # Test the exact keys so a chunk labelled with another total counts as
    # missing instead of failing later with KeyError.
    error = [i for i, key in enumerate(keys) if key not in res]
    if error:
        print(f"缺失数据:{error}")
        return False

    print("数据完整")
    return "".join(res[key] for key in keys)


def main(big_dict=None, zip_file_name="temp.7z"):
    """Rebuild the 7z archive from the collected QR payloads.

    Args:
        big_dict: Dict mapping ``"<index>/<total>"`` to payload chunks. When
            omitted, ``get_big_dict()`` supplies it. A string is treated as
            the output file name, because the script entry point has
            historically passed the file name as the only positional
            argument.
        zip_file_name: Where the restored archive is written.
    """
    if isinstance(big_dict, str):
        zip_file_name, big_dict = big_dict, None
    if big_dict is None:
        big_dict = get_big_dict()

    res = check_data(big_dict)
    if res:
        # decodebytes skips characters outside the base64 alphabet, such as
        # the "_" separators that qr_gen puts in place of line breaks.
        res = base64.decodebytes(res.encode("utf-8"))
        with open(zip_file_name, "wb") as f:
            f.write(res)


if __name__ == "__main__":
    # Usage: python qr_decode.py [<zip_file_name>]
    if len(sys.argv) > 2:
        print("Usage: python qr_decode.py [<zip_file_name>]")
        # Stop here: there is no file name to continue with.
        sys.exit(1)
    zip_file_name = sys.argv[1] if len(sys.argv) == 2 else "temp.7z"
    main(None, zip_file_name)

# decode_qr - 备用

# !-*- coding:utf-8 -*-
import os
import requests
from io import BytesIO
from pyzbar import pyzbar
from PIL import Image, ImageEnhance
# from py7zr import SevenZipFile
import re
import base64
import json
from manual_detect import manual_detect

def get_ewm(img_adds):
    """Read the content of the QR code in an image.

    Args:
        img_adds: Location of the image; an existing local file path is
            opened directly, anything else is fetched as a URL.

    Returns:
        The raw bytes of the first decoded symbol, or ``b''`` when nothing
        could be decoded.
    """
    if os.path.isfile(img_adds):
        # Load the image from the local file system.
        img = Image.open(img_adds)
    else:
        # Download the image; fail early on HTTP errors and never hang.
        response = requests.get(img_adds, timeout=30)
        response.raise_for_status()
        img = Image.open(BytesIO(response.content))

    # Close the image (and its file handle) once decoding is done.
    with img:
        txt_list = pyzbar.decode(img)

    if not txt_list:
        # Callers test the result for truthiness to skip unreadable images.
        return b''
    return txt_list[0].data

def check_data(res):
    """Verify that every chunk is present and join the payloads in order.

    Args:
        res: Dict mapping ``'<index>/<total>'`` to the chunk payload, where
            ``total`` is the index of the last chunk.

    Returns:
        The concatenated payload string when chunks ``0..total`` are all
        present, otherwise ``False``.
    """
    if not res:
        print('没有读取到任何数据')
        return False

    # Every key carries the same total, so any key can supply it.
    total = int(next(iter(res)).split('/')[1])
    keys = [f'{i}/{total}' for i in range(total + 1)]
    # Test the exact keys so a chunk labelled with another total counts as
    # missing instead of failing later with KeyError.
    error = [i for i, key in enumerate(keys) if key not in res]
    if error:
        print(f'缺失数据:{error}')
        return False

    print('数据完整')
    return ''.join(res[key] for key in keys)

def main(pth):
    """Decode every QR image under *pth* and rebuild the 7z archive.

    Images are found recursively by file extension. Their payloads are
    merged, checked for completeness and, if complete, base64-decoded into
    ``tmp.7z`` next to *pth*.

    Args:
        pth: Directory containing the QR code images.
    """
    # Match on the real file extension, case-insensitively, so names such as
    # "x.png.bak" are skipped and "x.Jpg" is accepted.
    image_suffixes = ('.png', '.jpg', '.jpeg')
    tar_list = []
    for root, _dirs, files in os.walk(pth):
        for file in files:
            if file.lower().endswith(image_suffixes):
                tar_list.append(os.path.join(root, file))

    res = {}
    for image_path in tar_list:
        payload = get_ewm(image_path)
        if payload:
            res.update(json.loads(payload.decode('utf-8')))

    res = check_data(res)
    if res:
        archive = base64.decodebytes(res.encode('utf-8'))
        with open(os.path.normpath(os.path.join(pth, '../tmp.7z')), 'wb') as f:
            f.write(archive)


if __name__ == '__main__':
    # Decode local QR code photos: rectify them first, then read the result.
    # The sample directory is ../test_file/right relative to this script's
    # directory, built without hard-coded path separators.
    pth = os.path.normpath(
        os.path.join(os.path.dirname(__file__), os.pardir, 'test_file', 'right')
    )
    m = manual_detect()
    pth1 = m.main(pth, force_manal_flag=False)
    main(pth1)

# manual_detect - 备用

import cv2
import os
import numpy as np
import shutil
import time
import numpy as np

def sigmoid(x):
    """Map *x* through a logistic curve centred at 127 with an output range of 0..255.

    Works on scalars and ``numpy`` arrays alike.
    """
    denominator = 1 + np.exp(127 - x)
    return 255 / denominator

class manual_detect:
    """Rectify photographed QR codes using their four coloured corner marks.

    Each photo is warped so that the four marks land on the corners of a
    700x700 image, contrast-stretched and written to a ``res`` directory.
    The marks are located automatically by colour; when that does not yield
    exactly four points, the user marks them by double-clicking.
    """

    def __init__(self):
        self.draw_img = None  # image shown (and drawn on) during manual marking
        self.dots = []  # located mark positions as [x, y]
        # HSV (lower, upper) bounds passed to cv2.inRange, one pair per mark.
        # They are searched in the order m, g, b, s, which is mapped to the
        # top-left, top-right, bottom-left and bottom-right target corners.
        self.m = ((135, 150, 150), (180, 255, 255))
        self.g = ((38, 144, 147), (72, 255, 255))
        self.b = ((96, 150, 150), (135, 255, 255))
        self.s = ((74, 150, 150), (101, 255, 255))
        self.manual_flag = False  # True when the current image needs manual marking

    @staticmethod
    def show(img, name='img'):
        """Display *img* (downscaled to fit 960x960) until a key is pressed."""
        maxHeight = 960
        maxWidth = 960
        scaleX = maxWidth / img.shape[1]
        scaleY = maxHeight / img.shape[0]
        scale = min(scaleX, scaleY)
        if scale < 1:
            img = cv2.resize(img, (0, 0), fx=scale, fy=scale)
        cv2.imshow(name, img)
        cv2.waitKey(0)
        cv2.destroyWindow(name)

    @staticmethod
    def shrink_size(img):
        """Return *img* downscaled to fit 1080x1080; smaller images are returned as is."""
        maxHeight = 1080
        maxWidth = 1080
        scaleX = maxWidth / img.shape[1]
        scaleY = maxHeight / img.shape[0]
        scale = min(scaleX, scaleY)
        if scale < 1:
            img = cv2.resize(img, (0, 0), fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
        return img

    @staticmethod
    def get_trans(img, pts1, pts2):
        """Warp *img* so that points *pts1* map onto *pts2*; the output is 700x700."""
        M = cv2.getPerspectiveTransform(pts1, pts2)
        dst = cv2.warpPerspective(img, M, (700, 700))
        return dst

    @staticmethod
    def draw_dot(img, dots):
        """Return a copy of *img* with a filled green circle drawn at every point in *dots*."""
        img_tmp = img.copy()
        for dot in dots:
            cv2.circle(img_tmp, tuple(dot), 5, (0, 255, 0), -1)
        return img_tmp

    def show_confirm(self, img, name='img'):
        """Let the user confirm the detected marks (currently disabled).

        NOTE(review): the unconditional ``return`` below switches the
        interactive confirmation off; the remaining code is kept so it can
        be re-enabled by removing that line. When enabled, 'q'/Esc accepts
        the detection and 'a' discards it and requests manual marking.
        """
        return
        maxHeight = 960
        maxWidth = 960
        scaleX = maxWidth / img.shape[1]
        scaleY = maxHeight / img.shape[0]
        scale = min(scaleX, scaleY)
        if scale < 1:
            img = cv2.resize(img, (0, 0), fx=scale, fy=scale)
        while True:
            cv2.imshow(name, img)
            k = cv2.waitKey(5)
            if k == ord('q') or k == 27:
                break
            elif k == ord('a'):
                self.dots = []
                self.manual_flag = True
                break
        cv2.destroyWindow(name)

    def draw_circle(self, event, x, y, flags, param):
        """Mouse callback: a left double-click records a point and draws it."""
        if event == cv2.EVENT_LBUTTONDBLCLK:
            cv2.circle(self.draw_img, (x, y), 5, (255, 0, 0), -1)
            self.dots.append([x, y])

    def get_dots(self, img):
        """Collect four points by double-clicking on *img*.

        Esc aborts; 'a' discards the points collected so far and starts over.
        The points end up in ``self.dots`` in click order.
        """
        self.draw_img = img.copy()
        self.dots = []
        cv2.namedWindow('image')
        cv2.setMouseCallback('image', self.draw_circle)
        while True:
            cv2.imshow('image', self.draw_img)
            if len(self.dots) >= 4:
                break
            k = cv2.waitKey(5)
            if k == 27:
                break
            elif k == ord('a'):
                self.dots = []
                self.draw_img = img.copy()
        cv2.destroyAllWindows()

    def _rectify_and_save(self, img, pth):
        """Warp *img* by ``self.dots``, stretch its contrast and write the result.

        The output goes to ``<dir of pth>/../res/<file name of pth>``.
        """
        pts1 = np.float32(self.dots)
        pts2 = np.float32([[0, 0], [700, 0], [0, 700], [700, 700]])

        img_trs = self.get_trans(img, pts1, pts2)
        # The stretch is applied three times in a row.
        for _ in range(3):
            img_trs = self.strach_color(img_trs)

        # Built from os.pardir so the path also resolves on non-Windows
        # systems.
        out_path = os.path.normpath(
            os.path.join(pth, os.pardir, os.pardir, 'res', os.path.basename(pth))
        )
        cv2.imwrite(out_path, img_trs)

    def main_sub(self, pth):
        """Rectify the image at *pth* using four manually marked points."""
        img = cv2.imread(pth)
        if img is None:
            # cv2.imread signals an unreadable or non-image file with None.
            print(f'无法读取图片:{pth}')
            return
        img = self.shrink_size(img)

        self.get_dots(img)
        if len(self.dots) != 4:
            print('标定点错误')
            return

        img_dot = self.draw_dot(img, self.dots)
        self.show(img_dot)

        self._rectify_and_save(img, pth)

    def get_p(self, thr):
        """Append the bounding-box centre of the largest blob in mask *thr* to ``self.dots``.

        Nothing is appended when the mask contains no contour.
        """
        # [-2] selects the contour list whether findContours returns two
        # values or three (older OpenCV releases).
        contours = cv2.findContours(thr, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)[-2]
        if not contours:
            return
        # Use the largest blob so that small leftover specks of the same
        # colour are not mistaken for the mark.
        cnt = max(contours, key=cv2.contourArea)
        x, y, w, h = cv2.boundingRect(cnt)
        self.dots.append([x + w // 2, y + h // 2])

    def find_mark(self, img, color):
        """Locate the mark whose HSV range is *color* in the BGR image *img*."""
        frame_HSV = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
        frame_threshold = cv2.inRange(frame_HSV, color[0], color[1])
        # Morphological opening removes small specks from the mask.
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
        opening = cv2.morphologyEx(frame_threshold, cv2.MORPH_OPEN, kernel)
        self.get_p(opening)

    def get_dots_auto(self, img):
        """Fill ``self.dots`` with the automatically detected marks, in m, g, b, s order."""
        self.dots = []
        for color in (self.m, self.g, self.b, self.s):
            self.find_mark(img, color)

    @staticmethod
    def _stretch_value(img):
        """Split *img* into HSV and return ``(h, s, v_stretched)``.

        ``v_stretched`` is a float array clipped to 0..255. ``None`` is
        returned when the value channel is constant, because the stretch
        would divide by zero.
        """
        img_t = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
        h, s, v = cv2.split(img_t)

        # Plain floats keep the arithmetic out of uint8, where "0 - v_min"
        # would wrap around under NumPy's current promotion rules.
        v_max = float(np.max(v))
        v_min = float(np.min(v))
        span = v_max - v_min
        if span == 0:
            return None

        gain = 255 / span * 1.2
        offset = (0 - v_min) / span
        return h, s, np.clip(cv2.add(gain * v, offset), 0, 255)

    def strach_color(self, img):
        """Return *img* with its HSV value channel contrast-stretched.

        An image with a constant value channel is returned as an unchanged
        copy.
        """
        parts = self._stretch_value(img)
        if parts is None:
            return img.copy()
        h, s, v1 = parts
        img1 = np.uint8(cv2.merge((h, s, np.uint8(v1))))
        return cv2.cvtColor(img1, cv2.COLOR_HSV2BGR)

    def strach_color2(self, img):
        """Like ``strach_color``, but also passes the stretched value channel through ``sigmoid``."""
        parts = self._stretch_value(img)
        if parts is None:
            return img.copy()
        h, s, v1 = parts
        img1 = np.uint8(cv2.merge((h, s, np.uint8(sigmoid(v1)))))
        return cv2.cvtColor(img1, cv2.COLOR_HSV2BGR)

    def main_sub_fur(self, pth):
        """Rectify the image at *pth* using automatically detected marks.

        Sets ``self.manual_flag`` when detection does not yield exactly four
        points, so the caller can fall back to manual marking.
        """
        img = cv2.imread(pth)
        if img is None:
            # cv2.imread signals an unreadable or non-image file with None.
            print(f'无法读取图片:{pth}')
            return
        img = self.shrink_size(img)

        # Detection runs on a contrast-enhanced copy; the warp uses the
        # original pixels.
        img2 = self.strach_color2(img)
        self.get_dots_auto(img2)

        img_dot = self.draw_dot(img, self.dots)
        self.show_confirm(img_dot)
        if len(self.dots) != 4:
            print('标定点错误,需要人工标定')
            self.manual_flag = True

        if self.manual_flag:
            return

        self._rectify_and_save(img, pth)

    def main(self, pth, force_manal_flag=False):
        """Rectify every file under *pth* into a fresh sibling ``res`` directory.

        Args:
            pth: Directory holding the photographed QR codes.
            force_manal_flag: When true, skip automatic detection and always
                mark the corners manually.

        Returns:
            The path of the ``res`` directory.
        """
        # Built from os.pardir so the path also resolves on non-Windows
        # systems.
        res_pth = os.path.normpath(os.path.join(pth, os.pardir, 'res'))
        if os.path.exists(res_pth):
            shutil.rmtree(res_pth)
            # NOTE(review): presumably gives the OS time to finish the
            # removal before the directory is recreated - confirm.
            time.sleep(2)
        os.mkdir(res_pth)

        for root, _dirs, files in os.walk(pth):
            for file in files:
                file_path = os.path.join(root, file)
                self.manual_flag = bool(force_manal_flag)
                if not self.manual_flag:
                    self.main_sub_fur(file_path)
                # Set either by force_manal_flag or by failed auto detection.
                if self.manual_flag:
                    self.main_sub(file_path)

        return res_pth


if __name__ == '__main__':
    # Sample run on ../test_file/a1 relative to this script's directory; the
    # path is built without hard-coded separators so it works on any OS.
    pth = os.path.normpath(
        os.path.join(os.path.dirname(__file__), os.pardir, 'test_file', 'a1')
    )
    m = manual_detect()
    m.main(pth)