目录

  1. 1. 前言
  2. 2. 古典密码
    1. 2.1. 移位密码
    2. 2.2. 单表置换密码
    3. 2.3. 穷举法攻击移位密码
    4. 2.4. 词频分析攻击置换密码
  3. 3. 密码学数学基础
    1. 3.1. 欧几里得算法
    2. 3.2. 扩展欧几里得算法
    3. 3.3. 有限域
    4. 3.4. 中国剩余定理
    5. 3.5. 素性检验
    6. 3.6. 爱拉托斯散筛法
  4. 4. 分组密码
    1. 4.1. DES
      1. 4.1.1. 雪崩效应检验
      2. 4.1.2. 文件加解密
      3. 4.1.3. 性能分析
    2. 4.2. AES
      1. 4.2.1. 文件加解密
      2. 4.2.2. 性能分析

LOADING

第一次加载文章图片可能会花费较长时间

要不挂个梯子试试?(x

加载过慢请开启缓存 浏览器默认开启

密码算法实训

2025/3/8 Crypto
  |     |   总文章阅读量:

前言

学学密码算法


古典密码

移位密码

将英文字母向前或向后移动一个固定位置。例如向后移动3个位置,即对字母表作置换(不分大小写)。

此时的数学函数:a = (a+3) mod 26

初步思路:

遍历明文的每个字符,逐个字符取 ascii 码与起始字符的 ascii 码相减即为起始序号,加密后的字符 ascii 即 (起始序号+偏移量) mod 26 + 起始字符的 ascii

cipher = ""
for c in plaintext:
    ser_num = ord(c) - ord('A')
    ser_num = (ser_num + shift) % 26
    cipher_char = chr(ser_num + ord('A'))
    cipher += cipher_char

接下来考虑明文存在大小写字母的情况,采用三目运算符进行判断

那么加密部分:

def enc(plaintext, shift):
    cipher = ""
    for c in plaintext:
        ser_num = ord(c) - ord('A') if ord(c) < ord('a') else ord(c) - ord('a')
        ser_num = (ser_num + shift) % 26
        cipher_char = chr(ser_num + ord('A')) if ord(c) < ord('a') else chr(ser_num + ord('a'))
        cipher += cipher_char
    return cipher

解密部分只需要用同样的方式减去偏移量:

def dec(ciphertext,shift):
    plain = ""
    for c in ciphertext:
        ser_num = ord(c) - ord('A') if ord(c) < ord('a') else ord(c) - ord('a')
        ser_num = (ser_num - shift) % 26
        plain_char = chr(ser_num + ord('A')) if ord(c) < ord('a') else chr(ser_num + ord('a'))
        plain += plain_char
    return plain

最终的代码:

Text = "FlagisHere"


def enc(plaintext, shift):
    cipher = ""
    for c in plaintext:
        ser_num = ord(c) - ord('A') if ord(c) < ord('a') else ord(c) - ord('a')
        ser_num = (ser_num + shift) % 26
        cipher_char = chr(ser_num + ord('A')) if ord(c) < ord('a') else chr(ser_num + ord('a'))
        cipher += cipher_char
    return cipher

def dec(ciphertext,shift):
    plain = ""
    for c in ciphertext:
        ser_num = ord(c) - ord('A') if ord(c) < ord('a') else ord(c) - ord('a')
        ser_num = (ser_num - shift) % 26
        plain_char = chr(ser_num + ord('A')) if ord(c) < ord('a') else chr(ser_num + ord('a'))
        plain += plain_char
    return plain

print(enc(Text, 7))
print(dec(enc(Text, 3),3))

image-20250309183614959


单表置换密码

单表置换密码就是根据字母表的置换对明文进行变换的方法

给定置换

A  B  C  D  E  F  G  H  I  J  K  L  M  N  O  P  Q  R  S  T  U  V  W  X  Y  Z
H  K  W  T  X  Y  S  G  B  P  Q  E  J  A  Z  M  L  N  O  F  C  I  D  V  U  R

使用字典存储置换表,这里先用 zip 转成元组再遍历到字典中:

original = "A B C D E F G H I J K L M N O P Q R S T U V W X Y Z"
substitute = "H K W T X Y S G B P Q E J A Z M L N O F C I D V U R"

substitution_table = {o: s for o, s in zip(original.split(), substitute.split())}

然后设计加密过程:

def enc(plaintext):
    cipher=""
    for c in plaintext:
        if c in substitute:
            cipher += substitution_table[c]
        else:
            cipher += c
    return cipher

解密需要创建一个反向置换表:

rev_substitution_table = {o: s for o, s in zip(original.split(), substitute.split())}

同样的解密过程:

def dec(ciphertext):
    plain=""
    for c in ciphertext:
        if c in rev_substitution_table:
            plain += rev_substitution_table[c]
        else:
            plain += c
    return plain

最终的代码:

Text = "IOJEDWNFGVEDIOUJWFN"

original = "A B C D E F G H I J K L M N O P Q R S T U V W X Y Z"
substitute = "H K W T X Y S G B P Q E J A Z M L N O F C I D V U R"

substitution_table = {o: s for o, s in zip(original.split(), substitute.split())}

def enc(plaintext):
    cipher=""
    for c in plaintext:
        if c in substitute:
            cipher += substitution_table[c]
        else:
            cipher += c
    return cipher

rev_substitution_table = {v : k for k,v in substitution_table.items()}

def dec(ciphertext):
    plain=""
    for c in ciphertext:
        if c in rev_substitution_table:
            plain += rev_substitution_table[c]
        else:
            plain += c
    return plain

print(enc(Text))
print(dec(enc(Text)))

image-20250309183552872


穷举法攻击移位密码

有效密钥空间只有25,穷举密钥即可实现

实际上就是爆破偏移量,解密方法不变

from Shift_Cipher import *

cipher = "MshnpzOlyl"


def brute_force(ciphertext):
    text = ""
    for shift in range(1, 25):
        text = dec(ciphertext, shift)
        print(text, shift)


brute_force(cipher)

image-20250309174730193

如图,明文为 FlagisHere,密钥(偏移量)为 7


词频分析攻击置换密码

统计密文的词频特征对照自然语言进行试译

使用 collections 库进行词频分析:

from collections import *
import re

text = "SIC GCBSPNA XPMHACQ JB GPYXSMEPNXIY JR SINS MF SPNBRQJSSJBE JBFMPQNSJMB FPMQ N XMJBS N SM N XMJBS H HY QCNBR MF N XMRRJHAY JBRCGZPC GINBBCA JB RZGI N VNY SINS SIC MPJEJBNA QCRRNEC GNB MBAY HC PCGMTCPCD HY SIC PJEISFZA PCGJXJCBSR SIC XNPSJGJXNBSR JB SIC SPNBRNGSJMB NPC NAJGC SIC MPJEJBNSMP MF SIC QCRRNEC HMH SIC PCGCJTCP NBD MRGNP N XMRRJHAC MXXMBCBS VIM VJRICR SM ENJB ZBNZSIMPJOCD GMBSPMA MF SIC QCRRNEC"


def word_frequency(text):
    words = re.findall(r'\b\w+\b', text)
    frequency = Counter(words)
    return frequency


frequency = word_frequency(text)
for word, count in frequency.items():
    print(f"{word}: {count}")

image-20250309180244062

SIC: 9
GCBSPNA: 1
XPMHACQ: 1
JB: 3
GPYXSMEPNXIY: 1
JR: 1
SINS: 2
MF: 4
SPNBRQJSSJBE: 1
JBFMPQNSJMB: 1
FPMQ: 1
N: 6
XMJBS: 2
SM: 2
H: 1
HY: 2
QCNBR: 1
XMRRJHAY: 1
JBRCGZPC: 1
GINBBCA: 1
RZGI: 1
VNY: 1
MPJEJBNA: 1
QCRRNEC: 3
GNB: 1
MBAY: 1
HC: 1
PCGMTCPCD: 1
PJEISFZA: 1
PCGJXJCBSR: 1
XNPSJGJXNBSR: 1
SPNBRNGSJMB: 1
NPC: 1
NAJGC: 1
MPJEJBNSMP: 1
HMH: 1
PCGCJTCP: 1
NBD: 1
MRGNP: 1
XMRRJHAC: 1
MXXMBCBS: 1
VIM: 1
VJRICR: 1
ENJB: 1
ZBNZSIMPJOCD: 1
GMBSPMA: 1

猜测 SIC 对应 THE,N 对应 A

然后进行替换,这里把替换的字符换成小写

from EN_Words_Frequency import *

text = "SIC GCBSPNA XPMHACQ JB GPYXSMEPNXIY JR SINS MF SPNBRQJSSJBE JBFMPQNSJMB FPMQ N XMJBS N SM N XMJBS H HY QCNBR MF N XMRRJHAY JBRCGZPC GINBBCA JB RZGI N VNY SINS SIC MPJEJBNA QCRRNEC GNB MBAY HC PCGMTCPCD HY SIC PJEISFZA PCGJXJCBSR SIC XNPSJGJXNBSR JB SIC SPNBRNGSJMB NPC NAJGC SIC MPJEJBNSMP MF SIC QCRRNEC HMH SIC PCGCJTCP NBD MRGNP N XMRRJHAC MXXMBCBS VIM VJRICR SM ENJB ZBNZSIMPJOCD GMBSPMA MF SIC QCRRNEC"

text = text.replace("S","t").replace("I","h").replace("C","e").replace("N","a")

print(word_frequency(text))
Counter({'the': 9, 'a': 6, 'MF': 4, 'JB': 3, 'QeRRaEe': 3, 'that': 2, 'XMJBt': 2, 'tM': 2, 'HY': 2, 'GeBtPaA': 1, 'XPMHAeQ': 1, 'GPYXtMEPaXhY': 1, 'JR': 1, 'tPaBRQJttJBE': 1, 'JBFMPQatJMB': 1, 'FPMQ': 1, 'H': 1, 'QeaBR': 1, 'XMRRJHAY': 1, 'JBReGZPe': 1, 'GhaBBeA': 1, 'RZGh': 1, 'VaY': 1, 'MPJEJBaA': 1, 'GaB': 1, 'MBAY': 1, 'He': 1, 'PeGMTePeD': 1, 'PJEhtFZA': 1, 'PeGJXJeBtR': 1, 'XaPtJGJXaBtR': 1, 'tPaBRaGtJMB': 1, 'aPe': 1, 'aAJGe': 1, 'MPJEJBatMP': 1, 'HMH': 1, 'PeGeJTeP': 1, 'aBD': 1, 'MRGaP': 1, 'XMRRJHAe': 1, 'MXXMBeBt': 1, 'VhM': 1, 'VJRheR': 1, 'EaJB': 1, 'ZBaZthMPJOeD': 1, 'GMBtPMA': 1})

观察发现 tM 对应 to,aPe 对应 are

按这种方式反复进行替换,直到语义正确为止

最终的替换表

from EN_Words_Frequency import *

text = "SIC GCBSPNA XPMHACQ JB GPYXSMEPNXIY JR SINS MF SPNBRQJSSJBE JBFMPQNSJMB FPMQ N XMJBS N SM N XMJBS H HY QCNBR MF N XMRRJHAY JBRCGZPC GINBBCA JB RZGI N VNY SINS SIC MPJEJBNA QCRRNEC GNB MBAY HC PCGMTCPCD HY SIC PJEISFZA PCGJXJCBSR SIC XNPSJGJXNBSR JB SIC SPNBRNGSJMB NPC NAJGC SIC MPJEJBNSMP MF SIC QCRRNEC HMH SIC PCGCJTCP NBD MRGNP N XMRRJHAC MXXMBCBS VIM VJRICR SM ENJB ZBNZSIMPJOCD GMBSPMA MF SIC QCRRNEC"

text = text.replace("S","t").replace("I","h").replace("C","e").replace("N","a")
text = text.replace("M","o").replace("P","r")
text = text.replace("F","f").replace("V","w")
text = text.replace("J","i").replace("R","s")
text = text.replace("Q","m").replace("E","g")
text = text.replace("X","p").replace("H","b").replace("A","l")
text = text.replace("Y","y").replace("B","n").replace("D","d")
text = text.replace("G","c")
text = text.replace("T","v")
text = text.replace("Z","u").replace("O","z")
print(word_frequency(text))
print(text)

明文:

the central problem in cryptography is that of transmitting information from a point a to a point b by means of a possibly insecure channel in such a way that the original message can only be recovered by the rightful recipients the participants in the transaction are alice the originator of the message bob the receiver and oscar a possible opponent who wishes to gain unauthorized control of the message

置换表

{
    'A': 'l',   'B': 'n',   'C': 'e',   'D': 'd',
    'E': 'g',   'F': 'f',   'G': 'c',   'H': 'b',
    'I': 'h',   'J': 'i',   
    'M': 'o',   'N': 'a',   'O': 'z',   'P': 'r',
    'Q': 'm',   'R': 's',   'S': 't',   'T': 'v',
    'V': 'w',   'X': 'p',
    'Y': 'y',   'Z': 'u'
}

密码学数学基础

欧几里得算法

辗转相除法,用于计算两个非负整数a,b的最大公约数

公式为 gcd(a,b) = gcd(b,a mod b),a >= b

该算法不断地将 b,a mod b 作为新的a,b进行迭代计算,直到当b为0时,此时的a就是最大公约数。

那么就有迭代算法实现:

def gcd(a, b):
    while b != 0:
        a, b = b, a % b
    return a

也可以用递归实现:

def gcd(a, b):
    if b == 0:
        return a
    return gcd(b, a % b)

于是完整代码:

def gcd(a, b):
    while b != 0:
        a, b = b, a % b
    return a

print(gcd(55,15))
print(gcd(349,269))

image-20250313143250470


扩展欧几里得算法

对于不完全为 0 的非负整数 a,b,gcd(a,b) 表示 a,b 的最大公约数,必然存在整数对 x,y ,使得 gcd(a,b) = ax+by

换句话说,如果 ax+by=m 有解,那么 m 一定是 gcd(a,b) 的若干倍

该算法核心思想是使用递归地进行辗转相除来求解 gcd (a, b) ,同时在递归的过程中,维护两个变量 x , y,满足 ax + by = gcd(a, b)

在上面 gcd 递归算法的基础上,尝试求出方程的一组 x 和 y:

设 a*x1 + b*y1 = gcd(a, b),b*x2 + (a mod b)*y2 = gcd(b, a mod b)

由欧几里得定理知:gcd(a, b) = gcd(b, a mod b)

a*x1 + b*y1 = b*x2 + (a mod b)*y2

又因为 a mod b = a - (a//b)*b

所以 a*x1 + b*y1 = b*x2 + (a - (a//b)*b)*y2

合并一下得 a*x1 + b*y1 = a*y2 + b*(x2-(a//b)*y2)

那么 x1 = y2y1 = x2-(a//b)*y2

到这里相邻两层递归的状态关系就显示出来了,由上一层的解 x2,y2,可以推出下一层 x1,y1 的状态

通过把这样的 x2, y2 不断代入,递归求解直至到达边界 gcd 为 0 即余数为 0 时,除数 a 就是最大公约数,此时得到方程的一组解:x=1,y=0,则 a*1+b*0=gcd(a,b)


由上述模板可以求出方程 ax+by=gcd(a,b) 的任意一组解 x0,y0

代码:

def ex_gcd(a, b):
    if b == 0:
        return 1, 0, a
    x, y, d = ex_gcd(b, a % b)
    return y, x - (a // b) * y, d


print(ex_gcd(1769, 550))
print(ex_gcd(269, 349))

image-20250314083748560


有限域

含有 2n 个元素的有限域,被称为 GF(2n)

image-20250314084115289

加减法:各项异或

image-20250315104427548

def gf_add(int a,int b):
    return (a ^ b)

乘法:左移?位,超过有限域需要取模回到有限域

image-20250315104522179

image-20250315111656599

image-20250315110744669

对于例4,加法运算拆成乘法运算,乘法式子内部左移运算

算法中利用位运算,这里视为 11 0111 与 00 1010

a:11 0111
b:00 1010 ^ 1 = 0
result: 0
a:110 1110
b:00 0101 ^ 1 = 1
result:110 1110
a:1101 1100
b:00 0010
result:110 1110
a:1 1011 1000 ^ 1 0001 1011 = 1010 0011
b:00 0001
result:1100 1101

对于 a * b,在GF(28)上遍历8次

a 每次左移,b 每次右移,如果 b 的当前位为 1 即存在某一项,就让结果加上 a 当前的值(此时 a 左移的位数就是两式相乘的结果),最高位溢出时模指定的不可约多项式,这里指定为0x11B(1 0001 1011)

def gf_multiply(a: int, b: int, modulus: int = 0x11B) -> int:
    result = 0
    for _ in range(8):  # GF(2^8) 需要处理 8 位
        if b & 1:       # 如果当前位是 1
            result ^= a  # 异或操作相当于加法
        a <<= 1         # 左移相当于乘以 x
        if a & 0x100:   # 即 1 0000 0000,检测第9位是否为1,即溢出
            a ^= modulus  # 模不可约多项式
        b >>= 1         # 处理下一位
    return result

if __name__ == "__main__":
    print(hex(gf_multiply(0x5e, 0x54)))
    print(hex(gf_multiply(0x21, 0x1b)))

image-20250315115347111


中国剩余定理

image-20250317000304899

过程:

  1. 计算所有模数的积 n

    M = 1
    for m in moduli:
        M *= m
  2. 对于第 i 个方程:

    • 计算 mi = n / ni

      Mi = M // m
    • 计算 mi 在模 ni 意义下的逆元 mi-1

      def mod_inverse(a, m):
          """计算模逆元 (a^{-1} mod m)"""
          x, y, d = ex_gcd(a, m)
          if d != 1:
              raise ValueError("模数不互质,无唯一解")
          return x % m
      
      Mi_inv = mod_inverse(Mi, m)
    • 计算 ci = mimi-1

      solution = 0
      for a, m in zip(remainders, moduli):
          Mi = M // m
          Mi_inv = mod_inverse(Mi, m)
          solution += a * Mi * Mi_inv
  3. 方程组在模 n 意义下的唯一解为

    image-20250317005602134

    return solution % M

最终代码:

from EX_Euclid import *


def mod_inverse(a, m):
    """计算模逆元 (a^{-1} mod m)"""
    x, y, d = ex_gcd(a, m)
    if d != 1:
        raise ValueError("模数不互质,无唯一解")
    return x % m


def crt(remainders, moduli):
    """中国剩余定理求解"""
    M = 1  # 所有模数的乘积
    for m in moduli:
        M *= m

    solution = 0
    for a, m in zip(remainders, moduli):
        Mi = M // m
        Mi_inv = mod_inverse(Mi, m)
        solution += a * Mi * Mi_inv

    return solution % M


if __name__ == "__main__":
    rem = (2, 3, 2)
    mod = (3, 5, 7)
    print(crt(rem, mod))
    rem = (1, 5, 4, 10)
    mod = (5, 6, 7, 11)
    print(crt(rem, mod))

image-20250317112946510


素性检验

import math

def is_prime(n: int) -> bool:
    """判断一个整数是否为素数"""
    if n <= 1:
        return "No"
    if n == 2:
        return "Yes"
    if n % 2 == 0:
        return "No"
    
    # 计算平方根并取整,减少循环次数
    sqrt_n = int(math.sqrt(n)) + 1
    
    # 检查3到平方根范围内的所有奇数
    for i in range(3, sqrt_n, 2):
        if n % i == 0:
            return "No"
    return "Yes"

if __name__ == "__main__":
    print(is_prime(58))
    print(is_prime(13))
    print(is_prime(8000001239))

image-20250317113650710


爱拉托斯散筛法

import math

def eratosthenes_sieve(n: int) -> list:
    """使用埃拉托斯特尼筛法求N以内的素数"""
    if n < 2:
        return []
    
    # 初始化筛子数组(0和1非素数)
    sieve = [True] * (n + 1)
    sieve[0] = sieve[1] = False
    
    # 仅需筛选到平方根即可
    sqrt_n = int(math.sqrt(n)) + 1
    
    for i in range(2, sqrt_n):
        if sieve[i]:
            # 标记i的倍数(从i²开始优化速度)
            sieve[i*i : n+1 : i] = [False] * len(sieve[i*i : n+1 : i])
    
    # 收集所有素数
    primes = [i for i, is_prime in enumerate(sieve) if is_prime]
    return primes


if __name__ == "__main__":
    test1 = eratosthenes_sieve(120)
    print(test1)
    test2 = eratosthenes_sieve(512)
    print(test2)

image-20250317114117728


分组密码

DES

import binascii
 
class DES():
    def __init__(self):
        # 初始化DES加密的参数
        self.ip = [
            58, 50, 42, 34, 26, 18, 10, 2, 60, 52, 44, 36, 28, 20, 12, 4,
            62, 54, 46, 38, 30, 22, 14, 6, 64, 56, 48, 40, 32, 24, 16, 8,
            57, 49, 41, 33, 25, 17, 9, 1, 59, 51, 43, 35, 27, 19, 11, 3,
            61, 53, 45, 37, 29, 21, 13, 5, 63, 55, 47, 39, 31, 23, 15, 7,
        ]  # ip置换
 
        self.ip1 = [
            40, 8, 48, 16, 56, 24, 64, 32, 39, 7, 47, 15, 55, 23, 63, 31,
            38, 6, 46, 14, 54, 22, 62, 30, 37, 5, 45, 13, 53, 21, 61, 29,
            36, 4, 44, 12, 52, 20, 60, 28, 35, 3, 43, 11, 51, 19, 59, 27,
            34, 2, 42, 10, 50, 18, 58, 26, 33, 1, 41, 9, 49, 17, 57, 25,
        ]  # 逆ip置换
        self.E = [
            32, 1, 2, 3, 4, 5,
            4, 5, 6, 7, 8, 9,
            8, 9, 10, 11, 12, 13,
            12, 13, 14, 15, 16, 17,
            16, 17, 18, 19, 20, 21,
            20, 21, 22, 23, 24, 25,
            24, 25, 26, 27, 28, 29,
            28, 29, 30, 31, 32, 1,
        ]  # E置换,将32位明文置换位48位
        self.P = [
            16, 7, 20, 21, 29, 12, 28, 17,
            1, 15, 23, 26, 5, 18, 31, 10,
            2, 8, 24, 14, 32, 27, 3, 9,
            19, 13, 30, 6, 22, 11, 4, 25,
        ]  # P置换,对经过S盒之后的数据再次进行置换
        self.K = ''
        self.k1 = [
            57, 49, 41, 33, 25, 17, 9,
            1, 58, 50, 42, 34, 26, 18,
            10, 2, 59, 51, 43, 35, 27,
            19, 11, 3, 60, 52, 44, 36,
            63, 55, 47, 39, 31, 23, 15,
            7, 62, 54, 46, 38, 30, 22,
            14, 6, 61, 53, 45, 37, 29,
            21, 13, 5, 28, 20, 12, 4,
        ]  # 密钥的K1初始置换
        self.k2 = [
            14, 17, 11, 24, 1, 5, 3, 28,
            15, 6, 21, 10, 23, 19, 12, 4,
            26, 8, 16, 7, 27, 20, 13, 2,
            41, 52, 31, 37, 47, 55, 30, 40,
            51, 45, 33, 48, 44, 49, 39, 56,
            34, 53, 46, 42, 50, 36, 29, 32,
        ]
 
        self.k0 = [1, 1, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 1, ]  # 秘钥循环移位的位数
 
        self.S = [
            [
                14, 4, 13, 1, 2, 15, 11, 8, 3, 10, 6, 12, 5, 9, 0, 7,
                0, 15, 7, 4, 14, 2, 13, 1, 10, 6, 12, 11, 9, 5, 3, 8,
                4, 1, 14, 8, 13, 6, 2, 11, 15, 12, 9, 7, 3, 10, 5, 0,
                15, 12, 8, 2, 4, 9, 1, 7, 5, 11, 3, 14, 10, 0, 6, 13
            ],
            [
                15, 1, 8, 14, 6, 11, 3, 4, 9, 7, 2, 13, 12, 0, 5, 10,
                3, 13, 4, 7, 15, 2, 8, 14, 12, 0, 1, 10, 6, 9, 11, 5,
                0, 14, 7, 11, 10, 4, 13, 1, 5, 8, 12, 6, 9, 3, 2, 15,
                13, 8, 10, 1, 3, 15, 4, 2, 11, 6, 7, 12, 0, 5, 14, 9
            ],
            [
                10, 0, 9, 14, 6, 3, 15, 5, 1, 13, 12, 7, 11, 4, 2, 8,
                13, 7, 0, 9, 3, 4, 6, 10, 2, 8, 5, 14, 12, 11, 15, 1,
                13, 6, 4, 9, 8, 15, 3, 0, 11, 1, 2, 12, 5, 10, 14, 7,
                1, 10, 13, 0, 6, 9, 8, 7, 4, 15, 14, 3, 11, 5, 2, 12
            ],
            [
                7, 13, 14, 3, 0, 6, 9, 10, 1, 2, 8, 5, 11, 12, 4, 15,
                13, 8, 11, 5, 6, 15, 0, 3, 4, 7, 2, 12, 1, 10, 14, 9,
                10, 6, 9, 0, 12, 11, 7, 13, 15, 1, 3, 14, 5, 2, 8, 4,
                3, 15, 0, 6, 10, 1, 13, 8, 9, 4, 5, 11, 12, 7, 2, 14
            ],
            [
                2, 12, 4, 1, 7, 10, 11, 6, 8, 5, 3, 15, 13, 0, 14, 9,
                14, 11, 2, 12, 4, 7, 13, 1, 5, 0, 15, 10, 3, 9, 8, 6,
                4, 2, 1, 11, 10, 13, 7, 8, 15, 9, 12, 5, 6, 3, 0, 14,
                11, 8, 12, 7, 1, 14, 2, 13, 6, 15, 0, 9, 10, 4, 5, 3
            ],
            [
                12, 1, 10, 15, 9, 2, 6, 8, 0, 13, 3, 4, 14, 7, 5, 11,
                10, 15, 4, 2, 7, 12, 9, 5, 6, 1, 13, 14, 0, 11, 3, 8,
                9, 14, 15, 5, 2, 8, 12, 3, 7, 0, 4, 10, 1, 13, 11, 6,
                4, 3, 2, 12, 9, 5, 15, 10, 11, 14, 1, 7, 6, 0, 8, 13
            ],
            [
                4, 11, 2, 14, 15, 0, 8, 13, 3, 12, 9, 7, 5, 10, 6, 1,
                13, 0, 11, 7, 4, 9, 1, 10, 14, 3, 5, 12, 2, 15, 8, 6,
                1, 4, 11, 13, 12, 3, 7, 14, 10, 15, 6, 8, 0, 5, 9, 2,
                6, 11, 13, 8, 1, 4, 10, 7, 9, 5, 0, 15, 14, 2, 3, 12
            ],
            [
                13, 2, 8, 4, 6, 15, 11, 1, 10, 9, 3, 14, 5, 0, 12, 7,
                1, 15, 13, 8, 10, 3, 7, 4, 12, 5, 6, 11, 0, 14, 9, 2,
                7, 11, 4, 1, 9, 12, 14, 2, 0, 6, 10, 13, 15, 3, 5, 8,
                2, 1, 14, 7, 4, 10, 8, 13, 15, 12, 9, 0, 3, 5, 6, 11
            ],
        ]  # 16进制表示S盒的数据,S盒是为了将48位转换为32位,有8个盒子
 
    def __substitution(self, table: str, self_table: list) -> str:
        """
        :param table: 需要进行置换的列表,是一个01字符串
        :param self_table: 置换表,在__init__中初始化了
        :return: 返回置换后的01字符串
        """
        sub_result = ""
        for i in self_table:
            sub_result += table[i - 1]
        return sub_result
 
    def str2bin(self, string: str) -> str:
        """
        将明文转为二进制字符串:
        :param string: 任意字符串
        :return:二进制字符串
        """
        plaintext_list = list(bytes(string, 'utf8'))  # 将字符串转成bytes类型,再转成list
        result = []  # 定义返回结果
        for num in plaintext_list:
            result.append(bin(num)[2:].zfill(8))  # 将列表的每个元素转成二进制字符串,8位宽度
        return "".join(result)
 
    def bin2str(self, binary: str) -> str:
        """
        二进制字符串转成字符串
        :param binary:
        :return:
        """
        list_bin = [binary[i:i + 8] for i in range(0, len(binary), 8)]  # 对二进制字符串进行切分,每8位为一组
        list_int = []
        for b in list_bin:
            list_int.append(int(b, 2))  # 对二进制转成int
        result = bytes(list_int).decode()  # 将列表转成bytes,在进行解码,得到字符串
        return result
 
    def __bin2int(self, binary: str) -> list:
        """
        由于加密之后的二进制无法直接转成字符,有不可见字符在,utf8可能无法解码,所以需要将二进制字符串每8位转成int型号列表,用于转成bytes再转hex
        :param binary: 二进制字符串
        :return: int型列表
        """
        list_bin = [binary[i:i + 8] for i in range(0, len(binary), 8)]  # 对二进制字符串进行切分,每8位为一组
        list_int = []
        for b in list_bin:
            list_int.append(int(b, 2))
        return list_int
 
    def __int2bin(self, list_int: list) -> str:
        result = []
        for num in list_int:
            result.append(bin(num)[2:].zfill(8))
        return ''.join(result)
 
    def __get_block_list(self, binary: str) -> list:
        """
        对明文二进制串进行切分,每64位为一块,DES加密以64位为一组进行加密的
        :type binary: 二进制串
        """
        len_binary = len(binary)
        if len_binary % 64 != 0:
            binary_block = binary + ("0" * (64 - (len_binary % 64)))
            return [binary_block[i:i + 64] for i in range(0, len(binary_block), 64)]
        else:
            return [binary[j:j + 64] for j in range(0, len(binary), 64)]
 
    def modify_secretkey(self):
        """
        修改默认密钥函数
        :return: None
        """
        newkey = input("输入密钥(长度为8):")
        if len(newkey) != 8:
            print("密钥长度不符合,请重新输入:")
            self.modify_secretkey()
        else:
            bin_key = self.str2bin(newkey)
            self.K = bin_key
            print("当前二进制形式密钥为:{}".format(self.K))
 
    def __f_funtion(self, right: str, key: str):
        """
        :param right: 明文二进制的字符串加密过程的右半段
        :param key: 当前轮数的密钥
        :return: 进行E扩展,与key异或操作,S盒操作后返回32位01字符串
        """
        # 对right进行E扩展
        e_result = self.__substitution(right, self.E)
        # 与key 进行异或操作
        xor_result = self.__xor_function(e_result, key)
        # 进入S盒子
        s_result = self.__s_box(xor_result)
        # 进行P置换
        p_result = self.__substitution(s_result, self.P)
        return p_result
 
    def __get_key_list(self):
        """
        :return: 返回加密过程中16轮的子密钥
        """
        key = self.__substitution(self.K, self.k1)
        left_key = key[0:28]
        right_key = key[28:56]
        keys = []
        for i in range(1, 17):
            move = self.k0[i - 1]
            move_left = left_key[move:28] + left_key[0:move]
            move_right = right_key[move:28] + right_key[0:move]
            left_key = move_left
            right_key = move_right
            move_key = left_key + right_key
            ki = self.__substitution(move_key, self.k2)
            keys.append(ki)
        return keys
 
    def __xor_function(self, str1: str, str2: str):
        """
        :param str1: 01字符串
        :param str2: 01字符串
        :return: 异或操作返回的结果
        """
        result = ""
        for i in range(0, len(str1)):
            if str1[i] == str2[i]:
                result += "0"
            else:
                result += "1"
        return result
 
    def __s_box(self, xor_result: str):
        """
        :param xor_result: 48位01字符串
        :return: 返回32位01字符串
        """
        result = ""
        for i in range(0, 8):
            # 将48位数据分为6组,循环进行
            block = xor_result[i * 6:(i + 1) * 6]
            line = int(block[0] + block[5], 2)
            colmn = int(block[1:5], 2)
            res = bin(self.S[i][line*16 + colmn])[2:]
            if len(res) < 4:
                res = '0' * (4 - len(res)) + res
            result += res
        return result
 
    def __iteration(self, bin_plaintext: str, key_list: list):
        """
        :param bin_plaintext: 01字符串,64位
        :param key_list: 密钥列表,共16个
        :return: 进行F函数以及和left异或操作之后的字符串
        """
        left = bin_plaintext[0:32]
        right = bin_plaintext[32:64]
        for i in range(0, 16):
            next_lift = right
            f_result = self.__f_funtion(right, key_list[i])
            next_right = self.__xor_function(left, f_result)
            left = next_lift
            right = next_right
        bin_plaintext_result = left + right
        return bin_plaintext_result[32:] + bin_plaintext_result[:32]
 
    def encode(self, plaintext):
        """
        :param plaintext: 明文字符串
        :return: 密文字符串
        """
        bin_plaintext = self.str2bin(plaintext)
        bin_plaintext_block = self.__get_block_list(bin_plaintext)
        ciphertext_bin_list = []
        key_list = self.__get_key_list()
        for block in bin_plaintext_block:
            # 初代ip置换
            sub_ip = self.__substitution(block, self.ip)
            ite_result = self.__iteration(sub_ip, key_list)
            # 逆ip置换
            sub_ip1 = self.__substitution(ite_result, self.ip1)
            ciphertext_bin_list.append(sub_ip1)
        ciphertext_bin = ''.join(ciphertext_bin_list)
        result = self.__bin2int(ciphertext_bin)
        return bytes(result).hex().upper()
 
    def decode(self, ciphertext):
        '''
        :param ciphertext: 密文字符串
        :return: 明文字符串
        '''
        b_ciphertext = binascii.a2b_hex(ciphertext)
        bin_ciphertext = self.__int2bin(list(b_ciphertext))
        bin_plaintext_list = []
        key_list = self.__get_key_list()
        key_list = key_list[::-1]
        bin_ciphertext_block = [bin_ciphertext[i:i + 64] for i in range(0, len(bin_ciphertext), 64)]
        for block in bin_ciphertext_block:
            sub_ip = self.__substitution(block, self.ip)
            ite = self.__iteration(sub_ip, key_list)
            sub_ip1 = self.__substitution(ite, self.ip1)
            bin_plaintext_list.append(sub_ip1)
        bin_plaintext = ''.join(bin_plaintext_list).replace('00000000', '')
        return self.bin2str(bin_plaintext)
 
    def main(self):
        select = input("Please selecting:\n1、Encryption\t 2、Decrpytion\t 3、Exit\nYour selecting:")
        if select == '1':
            plaintext = input("Input plaintext:")
            # print("Your plaintext is:{}".format(plaintext))
            ciphertext = self.encode(plaintext)
            print("The ciphertext is:{}".format(ciphertext))
        elif select == '2':
            plaintext = input("Input ciphertext:")
            # print("Your ciphertext is:{}".format(plaintext))
            plaintext = self.decode(plaintext)
            print("The plaintext is:{}".format(plaintext))
            # print(len(plaintext))
        elif select == '3':
            exit()
        else:
            input("Please selecting again!")
            self.main()
 
 
if __name__ == '__main__':
    mydes = DES()
    mydes.modify_secretkey()
    while True:
        mydes.main()
        print("")

image-20250323225835213


雪崩效应检验

from Crypto.Cipher import DES
import binascii

def count_bit_differences(str1, str2):
    """计算两个等长字符串之间的比特差异数"""
    xor_result = int(str1, 16) ^ int(str2, 16)
    return bin(xor_result).count('1')

def pad_text(text):
    """填充文本至8字节的倍数"""
    while len(text) % 8 != 0:
        text += b' '
    return text

def test_avalanche_plaintext(key, plaintext):
    """测试明文改变一位对密文的影响"""
    print("\n测试明文雪崩效应:")
    
    # 初始加密
    cipher = DES.new(key, DES.MODE_ECB)
    padded_text = pad_text(plaintext)
    original_cipher = cipher.encrypt(padded_text)
    original_hex = binascii.hexlify(original_cipher).decode()
    
    total_changes = 0
    # 逐位改变明文
    for i in range(len(plaintext) * 8):
        byte_pos = i // 8
        bit_pos = i % 8
        
        # 改变一位
        modified_text = bytearray(plaintext)
        modified_text[byte_pos] ^= (1 << bit_pos)
        modified_text = pad_text(bytes(modified_text))
        
        # 加密修改后的明文
        new_cipher = cipher.encrypt(modified_text)
        new_hex = binascii.hexlify(new_cipher).decode()
        
        # 计算差异位数
        diff_bits = count_bit_differences(original_hex, new_hex)
        total_changes += diff_bits
        print(f"改变第 {i+1} 位后,密文改变了 {diff_bits} 位")
    
    avg_changes = total_changes / (len(plaintext) * 8)
    print(f"平均每位改变影响了 {avg_changes:.2f} 位密文")

def test_avalanche_key(key, plaintext):
    """测试密钥改变一位对密文的影响"""
    print("\n测试密钥雪崩效应:")
    
    # 初始加密
    cipher = DES.new(key, DES.MODE_ECB)
    padded_text = pad_text(plaintext)
    original_cipher = cipher.encrypt(padded_text)
    original_hex = binascii.hexlify(original_cipher).decode()
    
    total_changes = 0
    # 逐位改变密钥
    for i in range(len(key) * 8):
        byte_pos = i // 8
        bit_pos = i % 8
        
        # 改变一位
        modified_key = bytearray(key)
        modified_key[byte_pos] ^= (1 << bit_pos)
        new_cipher = DES.new(bytes(modified_key), DES.MODE_ECB)
        
        # 加密
        new_ciphertext = new_cipher.encrypt(padded_text)
        new_hex = binascii.hexlify(new_ciphertext).decode()
        
        # 计算差异位数
        diff_bits = count_bit_differences(original_hex, new_hex)
        total_changes += diff_bits
        print(f"改变密钥第 {i+1} 位后,密文改变了 {diff_bits} 位")
    
    avg_changes = total_changes / (len(key) * 8)
    print(f"平均每位改变影响了 {avg_changes:.2f} 位密文")

def main():
    # 测试数据
    key = b'12345678'  # 8字节密钥
    plaintext = b'Hello123'  # 测试明文
    
    # 进行测试
    test_avalanche_plaintext(key, plaintext)
    test_avalanche_key(key, plaintext)

if __name__ == '__main__':
    main()

image-20250323230131639


文件加解密

from Crypto.Cipher import DES
from Crypto.Random import get_random_bytes
import os
import argparse

class DESFileCipher:
    def __init__(self, key=None):
        """初始化DES加密器,可以指定密钥或使用随机密钥"""
        if key is None:
            self.key = get_random_bytes(8)  # DES需要8字节密钥
        else:
            # 确保密钥长度为8字节
            self.key = key.ljust(8, b'\0')[:8]
    
    def pad_data(self, data):
        """对数据进行PKCS7填充"""
        pad_len = 8 - (len(data) % 8)
        padding = bytes([pad_len] * pad_len)
        return data + padding
    
    def unpad_data(self, data):
        """去除PKCS7填充"""
        pad_len = data[-1]
        return data[:-pad_len]
    
    def encrypt_file(self, input_file, output_file):
        """加密文件"""
        try:
            # 创建DES加密器
            cipher = DES.new(self.key, DES.MODE_ECB)
            
            # 读取并加密文件
            with open(input_file, 'rb') as infile, open(output_file, 'wb') as outfile:
                # 首先写入密钥(在实际应用中应该通过安全渠道传输)
                outfile.write(self.key)
                
                while True:
                    chunk = infile.read(1024)  # 每次读取1KB
                    if not chunk:
                        break
                    
                    # 对最后一块数据进行填充
                    if len(chunk) % 8 != 0:
                        chunk = self.pad_data(chunk)
                    
                    # 加密并写入
                    encrypted_chunk = cipher.encrypt(chunk)
                    outfile.write(encrypted_chunk)
            
            print(f"文件加密成功!加密后的文件保存为: {output_file}")
            
        except Exception as e:
            print(f"加密过程中出错: {str(e)}")
    
    def decrypt_file(self, input_file, output_file):
        """解密文件"""
        try:
            with open(input_file, 'rb') as infile, open(output_file, 'wb') as outfile:
                # 读取密钥(前8字节)
                key = infile.read(8)
                
                # 创建解密器
                cipher = DES.new(key, DES.MODE_ECB)
                
                # 读取并解密数据
                is_last_chunk = False
                while True:
                    chunk = infile.read(1024)  # 每次读取1KB
                    if not chunk:
                        break
                    
                    # 检查是否为最后一块数据
                    if len(chunk) < 1024:
                        is_last_chunk = True
                    
                    # 解密数据
                    decrypted_chunk = cipher.decrypt(chunk)
                    
                    # 如果是最后一块,去除填充
                    if is_last_chunk:
                        decrypted_chunk = self.unpad_data(decrypted_chunk)
                    
                    outfile.write(decrypted_chunk)
            
            print(f"文件解密成功!解密后的文件保存为: {output_file}")
            
        except Exception as e:
            print(f"解密过程中出错: {str(e)}")

def main():
    parser = argparse.ArgumentParser(description='DES文件加密解密工具')
    parser.add_argument('mode', choices=['encrypt', 'decrypt'], help='操作模式:encrypt(加密)或decrypt(解密)')
    parser.add_argument('input_file', help='输入文件路径')
    parser.add_argument('output_file', help='输出文件路径')
    parser.add_argument('--key', help='可选:指定密钥(8字节)', default=None)
    
    args = parser.parse_args()
    
    # 创建DES加密器实例
    if args.key:
        des_cipher = DESFileCipher(args.key.encode())
    else:
        des_cipher = DESFileCipher()
    
    # 执行加密或解密操作
    if args.mode == 'encrypt':
        des_cipher.encrypt_file(args.input_file, args.output_file)
    else:
        des_cipher.decrypt_file(args.input_file, args.output_file)

if __name__ == '__main__':
    main()

image-20250323230546877

image-20250323230657667

性能分析

使用伪随机数生成程序,生成不同大小的文件,然后采用DES加密解密,记录时间

from Crypto.Cipher import DES
from Crypto.Random import get_random_bytes
import os
import time
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm

class FileGenerator:
    @staticmethod
    def generate_random_file(filename, size_mb):
        """生成指定大小的随机文件"""
        size_bytes = size_mb * 1024 * 1024  # 转换为字节
        with open(filename, 'wb') as f:
            remaining_bytes = size_bytes
            chunk_size = 1024 * 1024  # 每次写入1MB
            
            with tqdm(total=size_bytes, unit='B', unit_scale=True, 
                     desc=f'生成 {size_mb}MB 文件') as pbar:
                while remaining_bytes > 0:
                    current_chunk = min(chunk_size, remaining_bytes)
                    f.write(get_random_bytes(current_chunk))
                    remaining_bytes -= current_chunk
                    pbar.update(current_chunk)

class DESPerformanceTester:
    def __init__(self):
        self.key = get_random_bytes(8)  # 生成8字节密钥
        
    def pad_data(self, data):
        """PKCS7填充"""
        pad_len = 8 - (len(data) % 8)
        padding = bytes([pad_len] * pad_len)
        return data + padding
    
    def unpad_data(self, data):
        """去除PKCS7填充"""
        pad_len = data[-1]
        return data[:-pad_len]
    
    def encrypt_file(self, input_file, output_file):
        """加密文件并返回耗时"""
        start_time = time.time()
        
        cipher = DES.new(self.key, DES.MODE_ECB)
        file_size = os.path.getsize(input_file)
        
        with open(input_file, 'rb') as infile, open(output_file, 'wb') as outfile:
            with tqdm(total=file_size, unit='B', unit_scale=True, 
                     desc='加密进度') as pbar:
                while True:
                    chunk = infile.read(1024 * 1024)  # 读取1MB
                    if not chunk:
                        break
                    
                    if len(chunk) % 8 != 0:
                        chunk = self.pad_data(chunk)
                    
                    encrypted_chunk = cipher.encrypt(chunk)
                    outfile.write(encrypted_chunk)
                    pbar.update(len(chunk))
        
        return time.time() - start_time
    
    def decrypt_file(self, input_file, output_file):
        """解密文件并返回耗时"""
        start_time = time.time()
        
        cipher = DES.new(self.key, DES.MODE_ECB)
        file_size = os.path.getsize(input_file)
        
        with open(input_file, 'rb') as infile, open(output_file, 'wb') as outfile:
            is_last_chunk = False
            with tqdm(total=file_size, unit='B', unit_scale=True, 
                     desc='解密进度') as pbar:
                while True:
                    chunk = infile.read(1024 * 1024)  # 读取1MB
                    if not chunk:
                        break
                    
                    if len(chunk) < 1024 * 1024:
                        is_last_chunk = True
                    
                    decrypted_chunk = cipher.decrypt(chunk)
                    
                    if is_last_chunk:
                        decrypted_chunk = self.unpad_data(decrypted_chunk)
                    
                    outfile.write(decrypted_chunk)
                    pbar.update(len(chunk))
        
        return time.time() - start_time

def run_performance_test():
    # 测试文件大小(MB)
    file_sizes = [1, 5, 10, 50, 100]
    results = []
    
    generator = FileGenerator()
    tester = DESPerformanceTester()
    
    for size in file_sizes:
        print(f"\n测试文件大小: {size}MB")
        
        # 生成随机文件
        input_file = f"test_{size}mb.dat"
        encrypted_file = f"encrypted_{size}mb.dat"
        decrypted_file = f"decrypted_{size}mb.dat"
        
        generator.generate_random_file(input_file, size)
        
        # 测试加密
        encrypt_time = tester.encrypt_file(input_file, encrypted_file)
        print(f"加密耗时: {encrypt_time:.2f}秒")
        
        # 测试解密
        decrypt_time = tester.decrypt_file(encrypted_file, decrypted_file)
        print(f"解密耗时: {decrypt_time:.2f}秒")
        
        # 记录结果
        results.append({
            'size_mb': size,
            'encrypt_time': encrypt_time,
            'decrypt_time': decrypt_time
        })
        
        # 清理临时文件
        for f in [input_file, encrypted_file, decrypted_file]:
            if os.path.exists(f):
                os.remove(f)
    
    return results

def plot_results(results):
    """绘制性能测试结果图表"""
    df = pd.DataFrame(results)
    
    plt.figure(figsize=(10, 6))
    plt.plot(df['size_mb'], df['encrypt_time'], 'b-o', label='加密时间')
    plt.plot(df['size_mb'], df['decrypt_time'], 'r-o', label='解密时间')
    plt.xlabel('文件大小 (MB)')
    plt.ylabel('处理时间 (秒)')
    plt.title('DES加密解密性能测试')
    plt.legend()
    plt.grid(True)
    plt.savefig('des_performance.png')
    plt.close()
    
    # 打印详细结果表格
    print("\n性能测试结果:")
    print(df.to_string(index=False))

def main():
    print("开始DES加密性能测试...")
    results = run_performance_test()
    plot_results(results)
    print("\n测试完成!结果图表已保存为 'des_performance.png'")

if __name__ == '__main__':
    main()

image-20250323231110209


AES

文件加解密

from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes
from Crypto.Util.Padding import pad, unpad
import base64
import os

class AESCipher:
    def __init__(self, key=None, mode=AES.MODE_CBC):
        """
        初始化AES加密器
        :param key: 密钥(16、24或32字节,对应AES-128、AES-192、AES-256)
        :param mode: 加密模式(CBC、ECB、CFB等)
        """
        self.block_size = AES.block_size
        if key is None:
            # 如果没有提供密钥,默认生成256位(32字节)的密钥
            self.key = get_random_bytes(32)
        else:
            # 确保密钥长度正确(16、24或32字节)
            if len(key) not in [16, 24, 32]:
                raise ValueError("密钥长度必须为16、24或32字节")
            self.key = key
        self.mode = mode

    def _create_cipher(self, iv=None):
        """
        创建加密器实例
        :param iv: 初始化向量(CBC、CFB模式需要)
        :return: Cipher对象
        """
        if self.mode in [AES.MODE_CBC, AES.MODE_CFB]:
            if iv is None:
                iv = get_random_bytes(self.block_size)
            return AES.new(self.key, self.mode, iv), iv
        else:
            return AES.new(self.key, self.mode), None

    def encrypt_string(self, plaintext):
        """
        加密字符串
        :param plaintext: 明文字符串
        :return: base64编码的密文
        """
        try:
            # 创建加密器
            cipher, iv = self._create_cipher()
            
            # 将字符串转换为字节并填充
            plaintext_bytes = plaintext.encode('utf-8')
            padded_data = pad(plaintext_bytes, self.block_size)
            
            # 加密
            ciphertext = cipher.encrypt(padded_data)
            
            # 组合IV和密文(如果使用了IV)
            if iv:
                final_data = iv + ciphertext
            else:
                final_data = ciphertext
            
            # 转换为base64编码
            return base64.b64encode(final_data).decode('utf-8')
            
        except Exception as e:
            raise Exception(f"加密失败: {str(e)}")

    def decrypt_string(self, ciphertext_b64):
        """
        解密字符串
        :param ciphertext_b64: base64编码的密文
        :return: 解密后的明文字符串
        """
        try:
            # 解码base64
            ciphertext = base64.b64decode(ciphertext_b64)
            
            # 提取IV(如果有)
            if self.mode in [AES.MODE_CBC, AES.MODE_CFB]:
                iv = ciphertext[:self.block_size]
                ciphertext = ciphertext[self.block_size:]
                cipher = AES.new(self.key, self.mode, iv)
            else:
                cipher = AES.new(self.key, self.mode)
            
            # 解密并去除填充
            decrypted_data = unpad(cipher.decrypt(ciphertext), self.block_size)
            
            # 转换为字符串
            return decrypted_data.decode('utf-8')
            
        except Exception as e:
            raise Exception(f"解密失败: {str(e)}")

    def encrypt_file(self, input_file, output_file):
        """
        加密文件
        :param input_file: 输入文件路径
        :param output_file: 输出文件路径
        """
        try:
            # 创建加密器
            cipher, iv = self._create_cipher()
            
            with open(input_file, 'rb') as infile, open(output_file, 'wb') as outfile:
                # 写入IV(如果有)
                if iv:
                    outfile.write(iv)
                
                # 分块读取并加密
                while True:
                    chunk = infile.read(1024 * 1024)  # 每次读取1MB
                    if not chunk:
                        break
                    
                    # 对最后一块进行填充
                    if len(chunk) % self.block_size != 0:
                        chunk = pad(chunk, self.block_size)
                    
                    # 加密并写入
                    encrypted_chunk = cipher.encrypt(chunk)
                    outfile.write(encrypted_chunk)
                    
            print(f"文件加密成功: {output_file}")
            
        except Exception as e:
            raise Exception(f"文件加密失败: {str(e)}")

    def decrypt_file(self, input_file, output_file):
        """
        解密文件
        :param input_file: 输入文件路径
        :param output_file: 输出文件路径
        """
        try:
            with open(input_file, 'rb') as infile:
                # 读取IV(如果需要)
                if self.mode in [AES.MODE_CBC, AES.MODE_CFB]:
                    iv = infile.read(self.block_size)
                    cipher = AES.new(self.key, self.mode, iv)
                else:
                    cipher = AES.new(self.key, self.mode)
                
                with open(output_file, 'wb') as outfile:
                    # 分块读取并解密
                    while True:
                        chunk = infile.read(1024 * 1024)  # 每次读取1MB
                        if not chunk:
                            break
                        
                        decrypted_chunk = cipher.decrypt(chunk)
                        
                        # 对最后一块去除填充
                        if not infile.peek(1):  # 如果是最后一块
                            decrypted_chunk = unpad(decrypted_chunk, self.block_size)
                        
                        outfile.write(decrypted_chunk)
                    
            print(f"文件解密成功: {output_file}")
            
        except Exception as e:
            raise Exception(f"文件解密失败: {str(e)}")

def main():
    # 示例用法
    try:
        # 创建AES加密器(使用CBC模式)
        key = get_random_bytes(32)  # 使用256位密钥
        aes = AESCipher(key, AES.MODE_CBC)
        
        # 字符串加解密示例
        print("\n=== 字符串加解密测试 ===")
        original_text = "这是一个测试文本!Hello, AES!"
        print(f"原始文本: {original_text}")
        
        # 加密
        encrypted_text = aes.encrypt_string(original_text)
        print(f"加密后: {encrypted_text}")
        
        # 解密
        decrypted_text = aes.decrypt_string(encrypted_text)
        print(f"解密后: {decrypted_text}")
        
        # 文件加解密示例
        print("\n=== 文件加解密测试 ===")
        # 创建测试文件
        with open("test.txt", "w", encoding="utf-8") as f:
            f.write("这是测试文件的内容\nTest file content!")
        
        # 加密文件
        aes.encrypt_file("test.txt", "encrypted.bin")
        
        # 解密文件
        aes.decrypt_file("encrypted.bin", "decrypted.txt")
        
        # 验证解密结果
        with open("test.txt", "r", encoding="utf-8") as f1, \
             open("decrypted.txt", "r", encoding="utf-8") as f2:
            original = f1.read()
            decrypted = f2.read()
            print("文件解密验证:", "成功" if original == decrypted else "失败")
        
        # 清理测试文件
        for file in ["test.txt", "encrypted.bin", "decrypted.txt"]:
            if os.path.exists(file):
                os.remove(file)
                
    except Exception as e:
        print(f"错误: {str(e)}")

if __name__ == "__main__":
    main()

image-20250331014744630


性能分析

from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes
from Crypto.Util.Padding import pad, unpad
import os
import time
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm
import seaborn as sns

class FileGenerator:
    @staticmethod
    def generate_random_file(filename, size_kb):
        """生成指定大小的随机文件"""
        size_bytes = size_kb * 1024
        with open(filename, 'wb') as f:
            f.write(get_random_bytes(size_bytes))
        return size_bytes

class AESCipher:
    def __init__(self, key=None):
        """初始化AES加密器,默认使用256位密钥"""
        self.key = key if key else get_random_bytes(32)  # 256位密钥
        self.block_size = AES.block_size

    def encrypt_file(self, input_file, output_file):
        """加密文件并返回处理时间"""
        try:
            # 记录开始时间
            start_time = time.time()
            
            # 创建加密器(CBC模式)
            iv = get_random_bytes(self.block_size)
            cipher = AES.new(self.key, AES.MODE_CBC, iv)
            
            # 读取文件并加密
            with open(input_file, 'rb') as in_file, open(output_file, 'wb') as out_file:
                # 写入IV
                out_file.write(iv)
                
                # 读取并加密数据
                data = in_file.read()
                padded_data = pad(data, self.block_size)
                encrypted_data = cipher.encrypt(padded_data)
                out_file.write(encrypted_data)
            
            # 计算处理时间
            end_time = time.time()
            return end_time - start_time
            
        except Exception as e:
            print(f"加密错误: {str(e)}")
            return None

    def decrypt_file(self, input_file, output_file):
        """解密文件并返回处理时间"""
        try:
            # 记录开始时间
            start_time = time.time()
            
            # 读取并解密数据
            with open(input_file, 'rb') as in_file, open(output_file, 'wb') as out_file:
                # 读取IV
                iv = in_file.read(self.block_size)
                cipher = AES.new(self.key, AES.MODE_CBC, iv)
                
                # 读取并解密数据
                encrypted_data = in_file.read()
                decrypted_data = cipher.decrypt(encrypted_data)
                unpadded_data = unpad(decrypted_data, self.block_size)
                out_file.write(unpadded_data)
            
            # 计算处理时间
            end_time = time.time()
            return end_time - start_time
            
        except Exception as e:
            print(f"解密错误: {str(e)}")
            return None

def test_aes_performance():
    # 测试文件大小(KB)
    file_sizes = [1, 50, 100, 1000, 2048, 5120]  # 1KB到5MB
    results = []
    
    # 创建加密器实例
    aes = AESCipher()
    file_gen = FileGenerator()
    
    print("\nAES加密性能测试")
    print("=" * 50)
    print(f"{'文件大小':>10} {'加密时间(s)':>12} {'解密时间(s)':>12} {'加密速度(MB/s)':>15} {'解密速度(MB/s)':>15}")
    print("-" * 50)
    
    for size in file_sizes:
        # 生成测试文件
        input_file = f"test_{size}kb.dat"
        encrypted_file = f"encrypted_{size}kb.bin"
        decrypted_file = f"decrypted_{size}kb.dat"
        
        try:
            # 生成随机文件
            file_size = file_gen.generate_random_file(input_file, size)
            
            # 测试加密
            encrypt_time = aes.encrypt_file(input_file, encrypted_file)
            
            # 测试解密
            decrypt_time = aes.decrypt_file(encrypted_file, decrypted_file)
            
            # 计算速度(MB/s)
            size_mb = size / 1024  # 转换为MB
            encrypt_speed = size_mb / encrypt_time if encrypt_time else 0
            decrypt_speed = size_mb / decrypt_time if decrypt_time else 0
            
            # 记录结果
            results.append({
                'size_kb': size,
                'encrypt_time': encrypt_time,
                'decrypt_time': decrypt_time,
                'encrypt_speed': encrypt_speed,
                'decrypt_speed': decrypt_speed
            })
            
            # 打印结果
            print(f"{size:>8}KB {encrypt_time:>12.4f} {decrypt_time:>12.4f} "
                  f"{encrypt_speed:>14.2f} {decrypt_speed:>14.2f}")
            
        finally:
            # 清理文件
            for f in [input_file, encrypted_file, decrypted_file]:
                if os.path.exists(f):
                    os.remove(f)
    
    return results

def plot_results(results):
    """绘制性能测试结果图表"""
    df = pd.DataFrame(results)
    
    # 设置图表风格
    sns.set_style("whitegrid")
    
    # 创建图表
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))
    
    # 处理时间图表
    sns.lineplot(data=df, x='size_kb', y='encrypt_time', marker='o', label='加密时间', ax=ax1)
    sns.lineplot(data=df, x='size_kb', y='decrypt_time', marker='o', label='解密时间', ax=ax1)
    ax1.set_xlabel('文件大小 (KB)')
    ax1.set_ylabel('处理时间 (秒)')
    ax1.set_title('AES加密解密时间')
    
    # 处理速度图表
    sns.lineplot(data=df, x='size_kb', y='encrypt_speed', marker='o', label='加密速度', ax=ax2)
    sns.lineplot(data=df, x='size_kb', y='decrypt_speed', marker='o', label='解密速度', ax=ax2)
    ax2.set_xlabel('文件大小 (KB)')
    ax2.set_ylabel('处理速度 (MB/s)')
    ax2.set_title('AES加密解密速度')
    
    plt.tight_layout()
    plt.savefig('aes_performance.png')
    plt.close()
    
    # 打印详细结果表格
    print("\n性能测试结果:")
    print(df.to_string(index=False))
    
    # 保存结果到CSV文件
    df.to_csv('aes_performance_results.csv', index=False)

def verify_encryption(original_file, decrypted_file):
    """验证加密解密结果"""
    with open(original_file, 'rb') as f1, open(decrypted_file, 'rb') as f2:
        return f1.read() == f2.read()

def main():
    print("开始AES加密性能测试...")
    
    # 运行测试
    results = test_aes_performance()
    
    # 绘制结果
    plot_results(results)
    
    print("\n测试完成!")
    print("结果已保存到 'aes_performance.png' 和 'aes_performance_results.csv'")

if __name__ == '__main__':
    main()

image-20250331093348027