python 中的图像压缩
Image compression in python
对于我的图像压缩,我正在使用 pillow 库获取 rgb 中的每个像素(例如:(100, 0, 200))。使用 Huffman 编码我已经转换为二进制以减少位数。现在,我必须将位序列保存到文本或二进制文件中。压缩文件始终小于原始文件,但现在,我的 txt 文件比原始文件大。我该怎么办?
之后如何读取文件并解压缩。这是一个说明:
您的代码应读取图像文件,计算固定长度编码需要多少位
然后应用压缩算法来创建更小的编码——你需要实现
压缩,你不能使用压缩库。您应该输出以压缩格式存储图像所需的位数以及达到的压缩率。当谈到
要保存压缩图像,您将无法将其保存为标准图像格式,因为您将
已创建您自己的编码,但您可以将位序列保存到文本或二进制文件中。
您的代码还应该能够提示用户输入包含压缩文件的文本文件的文件名
位序列,然后将该文件解压缩为原始图像——您可以假设该文件
使用与您压缩的最后一个文件相同的压缩格式。因此,例如,如果您将 pacificat.bmp 压缩为存储在 pacificat.txt 中的一系列位,然后用户要求您解压缩 alt_encode.txt,您可以假设 alt_pacificat.txt 使用与 encode.txt 相同的压缩数据结构(例如,它可能是原始图像数据的子集)。
有许多库可以帮助您将格式化数据存储到来自 Python 的文件中。如果您研究这些选项并找到一种方法将您的压缩数据结构存储到一个文件中,这样用户就可以 select 位文件和数据结构文件并使用数据结构解压缩位文件
只需使用我当前的图像:flag2.bmp
这是我的代码
from PIL import Image
import sys, string
import copy
import time
codes = {}
def sortFreq (freqs) :
letters = freqs.keys()
tuples = []
for let in letters :
tuples.append((freqs[let],let))
tuples.sort()
return tuples
def buildTree(tuples) :
while len(tuples) > 1 :
leastTwo = tuple(tuples[0:2]) # get the 2 to combine
theRest = tuples[2:] # all the others
combFreq = leastTwo[0][0] + leastTwo[1][0] # the branch points freq
tuples = theRest + [(combFreq,leastTwo)] # add branch point to the end
tuples.sort() # sort it into place
return tuples[0] # Return the single tree inside the list
def trimTree (tree) :
# Trim the freq counters off, leaving just the letters
p = tree[1] # ignore freq count in [0]
if type(p) == type("") : return p # if just a leaf, return it
else : return (trimTree(p[0]), trimTree(p[1])) # trim left then right and recombine
def assignCodes(node, pat=''):
global codes
if type(node) == type("") :
codes[node] = pat # A leaf. set its code
else : #
assignCodes(node[0], pat+"0") # Branch point. Do the left branch
assignCodes(node[1], pat+"1") # then do the right branch.
start = time.time()
dictionary = {}
table = {}
image = Image.open('flag2.bmp')
#image.show()
width, height = image.size
px= image.load()
totalpixel = width*height
print("Total pixel: "+ str(totalpixel))
for x in range(width):
for y in range(height):
# print(px[x, y])
for i in range(3):
if dictionary.get(str(px[x, y][i])) is None:
dictionary[str(px[x, y][i])] = 1
else:
dictionary[str(px[x, y][i])] = dictionary[str(px[x, y][i])] +1
table = copy.deepcopy(dictionary)
def encode2 (str) :
global codes
output = ""
for ch in str : output += codes[ch]
return output
def decode (tree, str) :
output = ""
p = tree
for bit in str :
if bit == '0' : p = p[0] # Head up the left branch
else : p = p[1] # or up the right branch
if type(p) == type("") :
output += p # found a character. Add to output
p = tree # and restart for next character
return output
combination = len(dictionary)
for value in table:
table[value] = table[value] / (totalpixel * combination) * 100
print(table)
print(dictionary)
sortdic = sortFreq(dictionary)
tree = buildTree(sortdic)
print("tree")
print(tree)
trim = trimTree(tree)
print("trim")
print(trim)
print("assign 01")
assignCodes(trim)
print(codes)
empty_tuple = ()
f = open("answer.txt","w")
for x in range(width):
for y in range(height):
list = []
list.append(codes[str(px[x, y][0])])
list.append(codes[str(px[x, y][1])])
list.append(codes[str(px[x, y][2])])
print(str(px[x, y]) + ": " +str(list))
f.write(str(list))
print("decode test:", str(decode (trim, "1100")))
stop = time.time()
times = (stop - start) * 1000
print("Run time takes %d miliseconds" % times)
[flag2.bmp][1]
代码清理
让我们尝试稍微重构一下您的代码,利用 Python 标准库提供的算法,同时保持哈夫曼树计算和图像编码方法的精神。
计算符号计数
首先,我们可以将符号计数重构为函数,改写得更简洁:
- 使用
Image.getdata()
遍历图像中的所有像素
- 由于每个像素都由一个元组表示,因此使用
itertools.chain.from_iterable
获得强度的平面视图。
- 利用
collections.Counter
获得符号(强度计数)
另外,我们可以把它改成return一个(symbol, count)
的列表,按(count, symbol)
升序排列。为此,我们可以将它与您的 sortFreq(...)
函数的重写版本结合起来,利用:
- Python
sorted(...)
函数(允许我们定义排序依据的键),以及
- 元组 slicing 反转
(symbol, count)
元组进行排序
执行:
from collections import Counter
from itertools import chain
def count_symbols(image):
pixels = image.getdata()
values = chain.from_iterable(pixels)
counts = Counter(values).items()
return sorted(counts, key=lambda x:x[::-1])
构建树
这里只需要一个小改动 -- 因为我们已经对符号计数进行了排序,所以我们只需要反转元组即可让您现有的树构建算法发挥作用。我们可以使用 list comprehension 和元组切片一起使用来简洁地表达这一点。
执行:
def build_tree(counts) :
nodes = [entry[::-1] for entry in counts] # Reverse each (symbol,count) tuple
while len(nodes) > 1 :
leastTwo = tuple(nodes[0:2]) # get the 2 to combine
theRest = nodes[2:] # all the others
combFreq = leastTwo[0][0] + leastTwo[1][0] # the branch points freq
nodes = theRest + [(combFreq, leastTwo)] # add branch point to the end
nodes.sort() # sort it into place
return nodes[0] # Return the single tree inside the list
修剪树
同样,与您的原始实施相比只有两个小的变化:
- 更改测试以检查
tuple
(节点),使其独立于符号的表示方式。
- 摆脱不必要的
else
执行:
def trim_tree(tree) :
p = tree[1] # Ignore freq count in [0]
if type(p) is tuple: # Node, trim left then right and recombine
return (trim_tree(p[0]), trim_tree(p[1]))
return p # Leaf, just return it
分配代码
这里最重要的变化是消除了对全局 codes
变量的依赖。为了解决它,我们可以将实现分成两个函数,一个处理递归代码分配,一个包装器创建一个新的本地 codes
字典,在其上调度递归函数,和 returns输出。
让我们也将代码的表示形式从字符串转换为位列表(运行ge [0,1]
中的整数)——稍后将显示其用处。
再一次,我们将更改测试以检查 tuple
s(出于与修剪时相同的原因)。
执行:
def assign_codes_impl(codes, node, pat):
if type(node) == tuple:
assign_codes_impl(codes, node[0], pat + [0]) # Branch point. Do the left branch
assign_codes_impl(codes, node[1], pat + [1]) # then do the right branch.
else:
codes[node] = pat # A leaf. set its code
def assign_codes(tree):
codes = {}
assign_codes_impl(codes, tree, [])
return codes
编码
绕个小弯,说说数据的编码。
首先,让我们观察原始 RGB 像素由 3 个字节表示(每个颜色通道一个字节。即每个像素 24 位,并构成我们的基线。
现在,您当前的算法将第一个像素编码为以下 ASCII 字符串:
['000', '0010', '0011']
总共 23 个字节(或 184 位)。这比生的差很多。让我们来看看原因:
- 有两个空格,只是为了让人们更容易阅读。那些不携带任何信息。 (2字节)
- 三个代码中的每一个都由两个撇号分隔。由于代码仅由
0
和 1
组成,因此解析不需要撇号,因此也不携带任何信息。 (6字节)
- 每个代码都是一个prefix code,因此可以无歧义地解析,因此用于代码分隔的两个逗号也不需要。 (2字节)
- 我们知道每个像素有三个代码,所以我们也不需要大括号 (
[
,]
) 来分隔像素(原因同上)。 (2字节)
总的来说,每个像素有 12 个字节,根本不包含任何信息。剩余的 11 个字节(在这种特殊情况下)确实携带了一些信息……但是有多少?
请注意,输出字母表中仅有的两个可能符号是 0
和 1
。这意味着每个符号携带 1 位信息。由于您将每个符号存储为 ASCII 字符(一个字节),因此每 1 位信息使用 8 位。
放在一起,在这种特殊情况下,您使用 184 位来表示 11 位信息——比必要的多 16.7 倍,比仅以原始格式存储像素差 7.67 倍。
显然,使用编码数据的简单文本表示不会产生任何压缩。我们需要更好的方法。
比特流
从我们之前的分析中可以明显看出,为了有效地执行压缩(和解压缩),我们需要能够将输出(或输入)视为单个位流。标准 Python 库不提供执行此操作的直接解决方案——在最低 g运行 性下,我们一次只能读取或写入一个字节的文件。
由于我们要对可能包含多个位的值进行编码,因此有必要根据重要性对它们进行排序的方式进行解码。让我们将它们从最重要到最不重要排序。
位 I/O 实用程序
如前所述,我们将在 运行ge [0,1]
中将位序列表示为整数列表。让我们从编写一些简单的实用函数开始:
- 将整数转换为唯一表示它的最短位序列的函数(即至少 1 位,但除此之外没有前导零)。
- 将位序列转换为整数的函数。
- 零扩展(向最高有效位置添加零)位序列(以允许固定长度编码)的函数。
执行:
def to_binary_list(n):
"""Convert integer into a list of bits"""
return [n] if (n <= 1) else to_binary_list(n >> 1) + [n & 1]
def from_binary_list(bits):
"""Convert list of bits into an integer"""
result = 0
for bit in bits:
result = (result << 1) | bit
return result
def pad_bits(bits, n):
"""Prefix list of bits with enough zeros to reach n digits"""
assert(n >= len(bits))
return ([0] * (n - len(bits)) + bits)
用法示例:
>>> to_binary_list(14)
[1, 1, 1, 0]
>>> from_binary_list([1,1,1,0])
14
>>> pad_bits(to_binary_list(14),8)
[0, 0, 0, 0, 1, 1, 1, 0]
输出比特流
由于文件 I/O API 只允许我们保存整个字节,我们需要创建一个包装器 class 来缓冲写入内存中的流的位。
让我们提供写入单个位以及一系列位的方法。
每个写入命令(1 位或更多位)都会首先将这些位添加到缓冲区中。一旦缓冲区包含超过 8 位,则从前面移除 8 位组,将其转换为 运行ge [0-255] 中的整数并保存到输出文件中。这样做直到缓冲区包含少于 8 位。
最后,让我们提供一种方法来 "flush" 流 -- 当缓冲区非空,但不包含足够的位来构成一个完整的字节时,将零添加到最低有效位置,直到有8位,然后写字节。我们在关闭比特流时需要它(我们稍后会看到其他一些好处)。
执行:
class OutputBitStream(object):
def __init__(self, file_name):
self.file_name = file_name
self.file = open(self.file_name, 'wb')
self.bytes_written = 0
self.buffer = []
def write_bit(self, value):
self.write_bits([value])
def write_bits(self, values):
self.buffer += values
while len(self.buffer) >= 8:
self._save_byte()
def flush(self):
if len(self.buffer) > 0: # Add trailing zeros to complete a byte and write it
self.buffer += [0] * (8 - len(self.buffer))
self._save_byte()
assert(len(self.buffer) == 0)
def _save_byte(self):
bits = self.buffer[:8]
self.buffer[:] = self.buffer[8:]
byte_value = from_binary_list(bits)
self.file.write(bytes([byte_value]))
self.bytes_written += 1
def close(self):
self.flush()
self.file.close()
输入比特流
输入比特流遵循类似的主题。我们想一次读取一位或多位。为此,我们从文件中加载字节,将每个字节转换为位列表并将其添加到缓冲区,直到有足够的空间满足读取请求。
本例中的刷新命令清除缓冲区(确保它只包含零)。
执行:
class InputBitStream(object):
def __init__(self, file_name):
self.file_name = file_name
self.file = open(self.file_name, 'rb')
self.bytes_read = 0
self.buffer = []
def read_bit(self):
return self.read_bits(1)[0]
def read_bits(self, count):
while len(self.buffer) < count:
self._load_byte()
result = self.buffer[:count]
self.buffer[:] = self.buffer[count:]
return result
def flush(self):
assert(not any(self.buffer))
self.buffer[:] = []
def _load_byte(self):
value = ord(self.file.read(1))
self.buffer += pad_bits(to_binary_list(value), 8)
self.bytes_read += 1
def close(self):
self.file.close()
压缩格式
接下来我们需要定义压缩比特流的格式。解码图像需要三个基本信息块:
- 图像的形状(高度和宽度),假设它是 3 通道 RGB 图像。
- 在解码端重建霍夫曼码所需的信息
- 霍夫曼编码像素数据
让我们的压缩格式如下:
- 页眉
- 图像高度(16 位,无符号)
- 图像宽度(16 位,无符号)
- Huffman table(开始对齐到整个字节)
- 算法参见 this。
- 像素代码(开始对齐到整个字节)
width * height * 3
霍夫曼编码序列
压缩
执行:
from PIL import Image
def compressed_size(counts, codes):
header_size = 2 * 16 # height and width as 16 bit values
tree_size = len(counts) * (1 + 8) # Leafs: 1 bit flag, 8 bit symbol each
tree_size += len(counts) - 1 # Nodes: 1 bit flag each
if tree_size % 8 > 0: # Padding to next full byte
tree_size += 8 - (tree_size % 8)
# Sum for each symbol of count * code length
pixels_size = sum([count * len(codes[symbol]) for symbol, count in counts])
if pixels_size % 8 > 0: # Padding to next full byte
pixels_size += 8 - (pixels_size % 8)
return (header_size + tree_size + pixels_size) / 8
def encode_header(image, bitstream):
height_bits = pad_bits(to_binary_list(image.height), 16)
bitstream.write_bits(height_bits)
width_bits = pad_bits(to_binary_list(image.width), 16)
bitstream.write_bits(width_bits)
def encode_tree(tree, bitstream):
if type(tree) == tuple: # Note - write 0 and encode children
bitstream.write_bit(0)
encode_tree(tree[0], bitstream)
encode_tree(tree[1], bitstream)
else: # Leaf - write 1, followed by 8 bit symbol
bitstream.write_bit(1)
symbol_bits = pad_bits(to_binary_list(tree), 8)
bitstream.write_bits(symbol_bits)
def encode_pixels(image, codes, bitstream):
for pixel in image.getdata():
for value in pixel:
bitstream.write_bits(codes[value])
def compress_image(in_file_name, out_file_name):
print('Compressing "%s" -> "%s"' % (in_file_name, out_file_name))
image = Image.open(in_file_name)
print('Image shape: (height=%d, width=%d)' % (image.height, image.width))
size_raw = raw_size(image.height, image.width)
print('RAW image size: %d bytes' % size_raw)
counts = count_symbols(image)
print('Counts: %s' % counts)
tree = build_tree(counts)
print('Tree: %s' % str(tree))
trimmed_tree = trim_tree(tree)
print('Trimmed tree: %s' % str(trimmed_tree))
codes = assign_codes(trimmed_tree)
print('Codes: %s' % codes)
size_estimate = compressed_size(counts, codes)
print('Estimated size: %d bytes' % size_estimate)
print('Writing...')
stream = OutputBitStream(out_file_name)
print('* Header offset: %d' % stream.bytes_written)
encode_header(image, stream)
stream.flush() # Ensure next chunk is byte-aligned
print('* Tree offset: %d' % stream.bytes_written)
encode_tree(trimmed_tree, stream)
stream.flush() # Ensure next chunk is byte-aligned
print('* Pixel offset: %d' % stream.bytes_written)
encode_pixels(image, codes, stream)
stream.close()
size_real = stream.bytes_written
print('Wrote %d bytes.' % size_real)
print('Estimate is %scorrect.' % ('' if size_estimate == size_real else 'in'))
print('Compression ratio: %0.2f' % (float(size_raw) / size_real))
减压
执行:
from PIL import Image
def decode_header(bitstream):
height = from_binary_list(bitstream.read_bits(16))
width = from_binary_list(bitstream.read_bits(16))
return (height, width)
#
def decode_tree(bitstream):
flag = bitstream.read_bits(1)[0]
if flag == 1: # Leaf, read and return symbol
return from_binary_list(bitstream.read_bits(8))
left = decode_tree(bitstream)
right = decode_tree(bitstream)
return (left, right)
def decode_value(tree, bitstream):
bit = bitstream.read_bits(1)[0]
node = tree[bit]
if type(node) == tuple:
return decode_value(node, bitstream)
return node
def decode_pixels(height, width, tree, bitstream):
pixels = bytearray()
for i in range(height * width * 3):
pixels.append(decode_value(tree, bitstream))
return Image.frombytes('RGB', (width, height), bytes(pixels))
def decompress_image(in_file_name, out_file_name):
print('Decompressing "%s" -> "%s"' % (in_file_name, out_file_name))
print('Reading...')
stream = InputBitStream(in_file_name)
print('* Header offset: %d' % stream.bytes_read)
height, width = decode_header(stream)
stream.flush() # Ensure next chunk is byte-aligned
print('* Tree offset: %d' % stream.bytes_read)
trimmed_tree = decode_tree(stream)
stream.flush() # Ensure next chunk is byte-aligned
print('* Pixel offset: %d' % stream.bytes_read)
image = decode_pixels(height, width, trimmed_tree, stream)
stream.close()
print('Read %d bytes.' % stream.bytes_read)
print('Image size: (height=%d, width=%d)' % (height, width))
print('Trimmed tree: %s' % str(trimmed_tree))
image.save(out_file_name)
测试运行
from PIL import ImageChops
def raw_size(width, height):
header_size = 2 * 16 # height and width as 16 bit values
pixels_size = 3 * 8 * width * height # 3 channels, 8 bits per channel
return (header_size + pixels_size) / 8
def images_equal(file_name_a, file_name_b):
image_a = Image.open(file_name_a)
image_b = Image.open(file_name_b)
diff = ImageChops.difference(image_a, image_b)
return diff.getbbox() is None
if __name__ == '__main__':
start = time.time()
compress_image('flag.png', 'answer.txt')
print('-' * 40)
decompress_image('answer.txt', 'flag_out.png')
stop = time.time()
times = (stop - start) * 1000
print('-' * 40)
print('Run time takes %d miliseconds' % times)
print('Images equal = %s' % images_equal('flag.png', 'flag_out.png'))
我 运行 使用您提供的示例图像的脚本。
控制台输出:
Compressing "flag.png" -> "answer.txt"
Image shape: (height=18, width=23)
RAW image size: 1246 bytes
Counts: [(24, 90), (131, 90), (215, 90), (59, 324), (60, 324), (110, 324)]
Tree: (1242, ((594, ((270, ((90, 215), (180, ((90, 24), (90, 131))))), (324, 59))), (648, ((324, 60), (324, 110)))))
Trimmed tree: (((215, (24, 131)), 59), (60, 110))
Codes: {215: [0, 0, 0], 24: [0, 0, 1, 0], 131: [0, 0, 1, 1], 59: [0, 1], 60: [1, 0], 110: [1, 1]}
Estimated size: 379 bytes
Writing...
* Header offset: 0
* Tree offset: 4
* Pixel offset: 12
Wrote 379 bytes.
Estimate is correct.
Compression ratio: 3.29
----------------------------------------
Decompressing "answer.txt" -> "flag_out.png"
Reading...
* Header offset: 0
* Tree offset: 4
* Pixel offset: 12
Read 379 bytes.
Image size: (height=18, width=23)
Trimmed tree: (((215, (24, 131)), 59), (60, 110))
----------------------------------------
Run time takes 32 miliseconds
Images equal = True
潜在的改进
- Huffman table 每个颜色通道
- 调色板图像支持
- T运行信息过滤器(每通道增量编码,或更复杂的预测器)
- 处理重复的模型(RLE、LZ...)
- 规范霍夫曼 tables
对于我的图像压缩,我正在使用 pillow 库获取 rgb 中的每个像素(例如:(100, 0, 200))。使用 Huffman 编码我已经转换为二进制以减少位数。现在,我必须将位序列保存到文本或二进制文件中。压缩文件始终小于原始文件,但现在,我的 txt 文件比原始文件大。我该怎么办? 之后如何读取文件并解压缩。这是一个说明:
您的代码应读取图像文件,计算固定长度编码需要多少位 然后应用压缩算法来创建更小的编码——你需要实现 压缩,你不能使用压缩库。您应该输出以压缩格式存储图像所需的位数以及达到的压缩率。当谈到 要保存压缩图像,您将无法将其保存为标准图像格式,因为您将 已创建您自己的编码,但您可以将位序列保存到文本或二进制文件中。
您的代码还应该能够提示用户输入包含压缩文件的文本文件的文件名 位序列,然后将该文件解压缩为原始图像——您可以假设该文件 使用与您压缩的最后一个文件相同的压缩格式。因此,例如,如果您将 pacificat.bmp 压缩为存储在 pacificat.txt 中的一系列位,然后用户要求您解压缩 alt_encode.txt,您可以假设 alt_pacificat.txt 使用与 encode.txt 相同的压缩数据结构(例如,它可能是原始图像数据的子集)。
有许多库可以帮助您将格式化数据存储到来自 Python 的文件中。如果您研究这些选项并找到一种方法将您的压缩数据结构存储到一个文件中,这样用户就可以 select 位文件和数据结构文件并使用数据结构解压缩位文件
只需使用我当前的图像:flag2.bmp
这是我的代码
from PIL import Image
import sys, string
import copy
import time
codes = {}
def sortFreq (freqs) :
letters = freqs.keys()
tuples = []
for let in letters :
tuples.append((freqs[let],let))
tuples.sort()
return tuples
def buildTree(tuples) :
while len(tuples) > 1 :
leastTwo = tuple(tuples[0:2]) # get the 2 to combine
theRest = tuples[2:] # all the others
combFreq = leastTwo[0][0] + leastTwo[1][0] # the branch points freq
tuples = theRest + [(combFreq,leastTwo)] # add branch point to the end
tuples.sort() # sort it into place
return tuples[0] # Return the single tree inside the list
def trimTree (tree) :
# Trim the freq counters off, leaving just the letters
p = tree[1] # ignore freq count in [0]
if type(p) == type("") : return p # if just a leaf, return it
else : return (trimTree(p[0]), trimTree(p[1])) # trim left then right and recombine
def assignCodes(node, pat=''):
global codes
if type(node) == type("") :
codes[node] = pat # A leaf. set its code
else : #
assignCodes(node[0], pat+"0") # Branch point. Do the left branch
assignCodes(node[1], pat+"1") # then do the right branch.
start = time.time()
dictionary = {}
table = {}
image = Image.open('flag2.bmp')
#image.show()
width, height = image.size
px= image.load()
totalpixel = width*height
print("Total pixel: "+ str(totalpixel))
for x in range(width):
for y in range(height):
# print(px[x, y])
for i in range(3):
if dictionary.get(str(px[x, y][i])) is None:
dictionary[str(px[x, y][i])] = 1
else:
dictionary[str(px[x, y][i])] = dictionary[str(px[x, y][i])] +1
table = copy.deepcopy(dictionary)
def encode2 (str) :
global codes
output = ""
for ch in str : output += codes[ch]
return output
def decode (tree, str) :
output = ""
p = tree
for bit in str :
if bit == '0' : p = p[0] # Head up the left branch
else : p = p[1] # or up the right branch
if type(p) == type("") :
output += p # found a character. Add to output
p = tree # and restart for next character
return output
combination = len(dictionary)
for value in table:
table[value] = table[value] / (totalpixel * combination) * 100
print(table)
print(dictionary)
sortdic = sortFreq(dictionary)
tree = buildTree(sortdic)
print("tree")
print(tree)
trim = trimTree(tree)
print("trim")
print(trim)
print("assign 01")
assignCodes(trim)
print(codes)
empty_tuple = ()
f = open("answer.txt","w")
for x in range(width):
for y in range(height):
list = []
list.append(codes[str(px[x, y][0])])
list.append(codes[str(px[x, y][1])])
list.append(codes[str(px[x, y][2])])
print(str(px[x, y]) + ": " +str(list))
f.write(str(list))
print("decode test:", str(decode (trim, "1100")))
stop = time.time()
times = (stop - start) * 1000
print("Run time takes %d miliseconds" % times)
[flag2.bmp][1]
代码清理
让我们尝试稍微重构一下您的代码,利用 Python 标准库提供的算法,同时保持哈夫曼树计算和图像编码方法的精神。
计算符号计数
首先,我们可以将符号计数重构为函数,改写得更简洁:
- 使用
Image.getdata()
遍历图像中的所有像素 - 由于每个像素都由一个元组表示,因此使用
itertools.chain.from_iterable
获得强度的平面视图。 - 利用
collections.Counter
获得符号(强度计数)
另外,我们可以把它改成return一个(symbol, count)
的列表,按(count, symbol)
升序排列。为此,我们可以将它与您的 sortFreq(...)
函数的重写版本结合起来,利用:
- Python
sorted(...)
函数(允许我们定义排序依据的键),以及 - 元组 slicing 反转
(symbol, count)
元组进行排序
执行:
from collections import Counter
from itertools import chain
def count_symbols(image):
pixels = image.getdata()
values = chain.from_iterable(pixels)
counts = Counter(values).items()
return sorted(counts, key=lambda x:x[::-1])
构建树
这里只需要一个小改动 -- 因为我们已经对符号计数进行了排序,所以我们只需要反转元组即可让您现有的树构建算法发挥作用。我们可以使用 list comprehension 和元组切片一起使用来简洁地表达这一点。
执行:
def build_tree(counts) :
nodes = [entry[::-1] for entry in counts] # Reverse each (symbol,count) tuple
while len(nodes) > 1 :
leastTwo = tuple(nodes[0:2]) # get the 2 to combine
theRest = nodes[2:] # all the others
combFreq = leastTwo[0][0] + leastTwo[1][0] # the branch points freq
nodes = theRest + [(combFreq, leastTwo)] # add branch point to the end
nodes.sort() # sort it into place
return nodes[0] # Return the single tree inside the list
修剪树
同样,与您的原始实施相比只有两个小的变化:
- 更改测试以检查
tuple
(节点),使其独立于符号的表示方式。 - 摆脱不必要的
else
执行:
def trim_tree(tree) :
p = tree[1] # Ignore freq count in [0]
if type(p) is tuple: # Node, trim left then right and recombine
return (trim_tree(p[0]), trim_tree(p[1]))
return p # Leaf, just return it
分配代码
这里最重要的变化是消除了对全局 codes
变量的依赖。为了解决它,我们可以将实现分成两个函数,一个处理递归代码分配,一个包装器创建一个新的本地 codes
字典,在其上调度递归函数,和 returns输出。
让我们也将代码的表示形式从字符串转换为位列表(运行ge [0,1]
中的整数)——稍后将显示其用处。
再一次,我们将更改测试以检查 tuple
s(出于与修剪时相同的原因)。
执行:
def assign_codes_impl(codes, node, pat):
if type(node) == tuple:
assign_codes_impl(codes, node[0], pat + [0]) # Branch point. Do the left branch
assign_codes_impl(codes, node[1], pat + [1]) # then do the right branch.
else:
codes[node] = pat # A leaf. set its code
def assign_codes(tree):
codes = {}
assign_codes_impl(codes, tree, [])
return codes
编码
绕个小弯,说说数据的编码。
首先,让我们观察原始 RGB 像素由 3 个字节表示(每个颜色通道一个字节。即每个像素 24 位,并构成我们的基线。
现在,您当前的算法将第一个像素编码为以下 ASCII 字符串:
['000', '0010', '0011']
总共 23 个字节(或 184 位)。这比生的差很多。让我们来看看原因:
- 有两个空格,只是为了让人们更容易阅读。那些不携带任何信息。 (2字节)
- 三个代码中的每一个都由两个撇号分隔。由于代码仅由
0
和1
组成,因此解析不需要撇号,因此也不携带任何信息。 (6字节) - 每个代码都是一个prefix code,因此可以无歧义地解析,因此用于代码分隔的两个逗号也不需要。 (2字节)
- 我们知道每个像素有三个代码,所以我们也不需要大括号 (
[
,]
) 来分隔像素(原因同上)。 (2字节)
总的来说,每个像素有 12 个字节,根本不包含任何信息。剩余的 11 个字节(在这种特殊情况下)确实携带了一些信息……但是有多少?
请注意,输出字母表中仅有的两个可能符号是 0
和 1
。这意味着每个符号携带 1 位信息。由于您将每个符号存储为 ASCII 字符(一个字节),因此每 1 位信息使用 8 位。
放在一起,在这种特殊情况下,您使用 184 位来表示 11 位信息——比必要的多 16.7 倍,比仅以原始格式存储像素差 7.67 倍。
显然,使用编码数据的简单文本表示不会产生任何压缩。我们需要更好的方法。
比特流
从我们之前的分析中可以明显看出,为了有效地执行压缩(和解压缩),我们需要能够将输出(或输入)视为单个位流。标准 Python 库不提供执行此操作的直接解决方案——在最低 g运行 性下,我们一次只能读取或写入一个字节的文件。
由于我们要对可能包含多个位的值进行编码,因此有必要根据重要性对它们进行排序的方式进行解码。让我们将它们从最重要到最不重要排序。
位 I/O 实用程序
如前所述,我们将在 运行ge [0,1]
中将位序列表示为整数列表。让我们从编写一些简单的实用函数开始:
- 将整数转换为唯一表示它的最短位序列的函数(即至少 1 位,但除此之外没有前导零)。
- 将位序列转换为整数的函数。
- 零扩展(向最高有效位置添加零)位序列(以允许固定长度编码)的函数。
执行:
def to_binary_list(n):
"""Convert integer into a list of bits"""
return [n] if (n <= 1) else to_binary_list(n >> 1) + [n & 1]
def from_binary_list(bits):
"""Convert list of bits into an integer"""
result = 0
for bit in bits:
result = (result << 1) | bit
return result
def pad_bits(bits, n):
"""Prefix list of bits with enough zeros to reach n digits"""
assert(n >= len(bits))
return ([0] * (n - len(bits)) + bits)
用法示例:
>>> to_binary_list(14)
[1, 1, 1, 0]
>>> from_binary_list([1,1,1,0])
14
>>> pad_bits(to_binary_list(14),8)
[0, 0, 0, 0, 1, 1, 1, 0]
输出比特流
由于文件 I/O API 只允许我们保存整个字节,我们需要创建一个包装器 class 来缓冲写入内存中的流的位。
让我们提供写入单个位以及一系列位的方法。
每个写入命令(1 位或更多位)都会首先将这些位添加到缓冲区中。一旦缓冲区包含超过 8 位,则从前面移除 8 位组,将其转换为 运行ge [0-255] 中的整数并保存到输出文件中。这样做直到缓冲区包含少于 8 位。
最后,让我们提供一种方法来 "flush" 流 -- 当缓冲区非空,但不包含足够的位来构成一个完整的字节时,将零添加到最低有效位置,直到有8位,然后写字节。我们在关闭比特流时需要它(我们稍后会看到其他一些好处)。
执行:
class OutputBitStream(object):
def __init__(self, file_name):
self.file_name = file_name
self.file = open(self.file_name, 'wb')
self.bytes_written = 0
self.buffer = []
def write_bit(self, value):
self.write_bits([value])
def write_bits(self, values):
self.buffer += values
while len(self.buffer) >= 8:
self._save_byte()
def flush(self):
if len(self.buffer) > 0: # Add trailing zeros to complete a byte and write it
self.buffer += [0] * (8 - len(self.buffer))
self._save_byte()
assert(len(self.buffer) == 0)
def _save_byte(self):
bits = self.buffer[:8]
self.buffer[:] = self.buffer[8:]
byte_value = from_binary_list(bits)
self.file.write(bytes([byte_value]))
self.bytes_written += 1
def close(self):
self.flush()
self.file.close()
输入比特流
输入比特流遵循类似的主题。我们想一次读取一位或多位。为此,我们从文件中加载字节,将每个字节转换为位列表并将其添加到缓冲区,直到有足够的空间满足读取请求。
本例中的刷新命令清除缓冲区(确保它只包含零)。
执行:
class InputBitStream(object):
def __init__(self, file_name):
self.file_name = file_name
self.file = open(self.file_name, 'rb')
self.bytes_read = 0
self.buffer = []
def read_bit(self):
return self.read_bits(1)[0]
def read_bits(self, count):
while len(self.buffer) < count:
self._load_byte()
result = self.buffer[:count]
self.buffer[:] = self.buffer[count:]
return result
def flush(self):
assert(not any(self.buffer))
self.buffer[:] = []
def _load_byte(self):
value = ord(self.file.read(1))
self.buffer += pad_bits(to_binary_list(value), 8)
self.bytes_read += 1
def close(self):
self.file.close()
压缩格式
接下来我们需要定义压缩比特流的格式。解码图像需要三个基本信息块:
- 图像的形状(高度和宽度),假设它是 3 通道 RGB 图像。
- 在解码端重建霍夫曼码所需的信息
- 霍夫曼编码像素数据
让我们的压缩格式如下:
- 页眉
- 图像高度(16 位,无符号)
- 图像宽度(16 位,无符号)
- Huffman table(开始对齐到整个字节)
- 算法参见 this。
- 像素代码(开始对齐到整个字节)
width * height * 3
霍夫曼编码序列
压缩
执行:
from PIL import Image
def compressed_size(counts, codes):
header_size = 2 * 16 # height and width as 16 bit values
tree_size = len(counts) * (1 + 8) # Leafs: 1 bit flag, 8 bit symbol each
tree_size += len(counts) - 1 # Nodes: 1 bit flag each
if tree_size % 8 > 0: # Padding to next full byte
tree_size += 8 - (tree_size % 8)
# Sum for each symbol of count * code length
pixels_size = sum([count * len(codes[symbol]) for symbol, count in counts])
if pixels_size % 8 > 0: # Padding to next full byte
pixels_size += 8 - (pixels_size % 8)
return (header_size + tree_size + pixels_size) / 8
def encode_header(image, bitstream):
height_bits = pad_bits(to_binary_list(image.height), 16)
bitstream.write_bits(height_bits)
width_bits = pad_bits(to_binary_list(image.width), 16)
bitstream.write_bits(width_bits)
def encode_tree(tree, bitstream):
if type(tree) == tuple: # Note - write 0 and encode children
bitstream.write_bit(0)
encode_tree(tree[0], bitstream)
encode_tree(tree[1], bitstream)
else: # Leaf - write 1, followed by 8 bit symbol
bitstream.write_bit(1)
symbol_bits = pad_bits(to_binary_list(tree), 8)
bitstream.write_bits(symbol_bits)
def encode_pixels(image, codes, bitstream):
for pixel in image.getdata():
for value in pixel:
bitstream.write_bits(codes[value])
def compress_image(in_file_name, out_file_name):
print('Compressing "%s" -> "%s"' % (in_file_name, out_file_name))
image = Image.open(in_file_name)
print('Image shape: (height=%d, width=%d)' % (image.height, image.width))
size_raw = raw_size(image.height, image.width)
print('RAW image size: %d bytes' % size_raw)
counts = count_symbols(image)
print('Counts: %s' % counts)
tree = build_tree(counts)
print('Tree: %s' % str(tree))
trimmed_tree = trim_tree(tree)
print('Trimmed tree: %s' % str(trimmed_tree))
codes = assign_codes(trimmed_tree)
print('Codes: %s' % codes)
size_estimate = compressed_size(counts, codes)
print('Estimated size: %d bytes' % size_estimate)
print('Writing...')
stream = OutputBitStream(out_file_name)
print('* Header offset: %d' % stream.bytes_written)
encode_header(image, stream)
stream.flush() # Ensure next chunk is byte-aligned
print('* Tree offset: %d' % stream.bytes_written)
encode_tree(trimmed_tree, stream)
stream.flush() # Ensure next chunk is byte-aligned
print('* Pixel offset: %d' % stream.bytes_written)
encode_pixels(image, codes, stream)
stream.close()
size_real = stream.bytes_written
print('Wrote %d bytes.' % size_real)
print('Estimate is %scorrect.' % ('' if size_estimate == size_real else 'in'))
print('Compression ratio: %0.2f' % (float(size_raw) / size_real))
减压
执行:
from PIL import Image
def decode_header(bitstream):
height = from_binary_list(bitstream.read_bits(16))
width = from_binary_list(bitstream.read_bits(16))
return (height, width)
#
def decode_tree(bitstream):
flag = bitstream.read_bits(1)[0]
if flag == 1: # Leaf, read and return symbol
return from_binary_list(bitstream.read_bits(8))
left = decode_tree(bitstream)
right = decode_tree(bitstream)
return (left, right)
def decode_value(tree, bitstream):
bit = bitstream.read_bits(1)[0]
node = tree[bit]
if type(node) == tuple:
return decode_value(node, bitstream)
return node
def decode_pixels(height, width, tree, bitstream):
pixels = bytearray()
for i in range(height * width * 3):
pixels.append(decode_value(tree, bitstream))
return Image.frombytes('RGB', (width, height), bytes(pixels))
def decompress_image(in_file_name, out_file_name):
print('Decompressing "%s" -> "%s"' % (in_file_name, out_file_name))
print('Reading...')
stream = InputBitStream(in_file_name)
print('* Header offset: %d' % stream.bytes_read)
height, width = decode_header(stream)
stream.flush() # Ensure next chunk is byte-aligned
print('* Tree offset: %d' % stream.bytes_read)
trimmed_tree = decode_tree(stream)
stream.flush() # Ensure next chunk is byte-aligned
print('* Pixel offset: %d' % stream.bytes_read)
image = decode_pixels(height, width, trimmed_tree, stream)
stream.close()
print('Read %d bytes.' % stream.bytes_read)
print('Image size: (height=%d, width=%d)' % (height, width))
print('Trimmed tree: %s' % str(trimmed_tree))
image.save(out_file_name)
测试运行
from PIL import ImageChops
def raw_size(width, height):
header_size = 2 * 16 # height and width as 16 bit values
pixels_size = 3 * 8 * width * height # 3 channels, 8 bits per channel
return (header_size + pixels_size) / 8
def images_equal(file_name_a, file_name_b):
image_a = Image.open(file_name_a)
image_b = Image.open(file_name_b)
diff = ImageChops.difference(image_a, image_b)
return diff.getbbox() is None
if __name__ == '__main__':
start = time.time()
compress_image('flag.png', 'answer.txt')
print('-' * 40)
decompress_image('answer.txt', 'flag_out.png')
stop = time.time()
times = (stop - start) * 1000
print('-' * 40)
print('Run time takes %d miliseconds' % times)
print('Images equal = %s' % images_equal('flag.png', 'flag_out.png'))
我 运行 使用您提供的示例图像的脚本。
控制台输出:
Compressing "flag.png" -> "answer.txt"
Image shape: (height=18, width=23)
RAW image size: 1246 bytes
Counts: [(24, 90), (131, 90), (215, 90), (59, 324), (60, 324), (110, 324)]
Tree: (1242, ((594, ((270, ((90, 215), (180, ((90, 24), (90, 131))))), (324, 59))), (648, ((324, 60), (324, 110)))))
Trimmed tree: (((215, (24, 131)), 59), (60, 110))
Codes: {215: [0, 0, 0], 24: [0, 0, 1, 0], 131: [0, 0, 1, 1], 59: [0, 1], 60: [1, 0], 110: [1, 1]}
Estimated size: 379 bytes
Writing...
* Header offset: 0
* Tree offset: 4
* Pixel offset: 12
Wrote 379 bytes.
Estimate is correct.
Compression ratio: 3.29
----------------------------------------
Decompressing "answer.txt" -> "flag_out.png"
Reading...
* Header offset: 0
* Tree offset: 4
* Pixel offset: 12
Read 379 bytes.
Image size: (height=18, width=23)
Trimmed tree: (((215, (24, 131)), 59), (60, 110))
----------------------------------------
Run time takes 32 miliseconds
Images equal = True
潜在的改进
- Huffman table 每个颜色通道
- 调色板图像支持
- T运行信息过滤器(每通道增量编码,或更复杂的预测器)
- 处理重复的模型(RLE、LZ...)
- 规范霍夫曼 tables