音视频编解码技术原理:从理论到实践的深度解析

音视频编解码技术是数字多媒体领域的核心技术,它通过各种算法和技术手段实现音视频数据的高效压缩和还原。本文将深入探讨编解码技术的基本原理、关键算法和实际应用。

1. 编解码基础理论

1.1 信息论基础

编解码技术的理论基础来自于信息论,特别是香农的信息熵理论。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import numpy as np
import matplotlib.pyplot as plt
from collections import Counter
import math

def calculate_entropy(data):
"""计算信息熵"""
if len(data) == 0:
return 0

# 统计符号频率
counter = Counter(data)
total = len(data)

# 计算熵
entropy = 0
for count in counter.values():
probability = count / total
if probability > 0:
entropy -= probability * math.log2(probability)

return entropy

def huffman_encoding(data):
"""霍夫曼编码实现"""
if len(data) == 0:
return {}, ""

# 统计频率
frequency = Counter(data)

# 构建霍夫曼树
import heapq

class Node:
def __init__(self, char, freq):
self.char = char
self.freq = freq
self.left = None
self.right = None

def __lt__(self, other):
return self.freq < other.freq

# 创建叶子节点
heap = [Node(char, freq) for char, freq in frequency.items()]
heapq.heapify(heap)

# 构建霍夫曼树
while len(heap) > 1:
left = heapq.heappop(heap)
right = heapq.heappop(heap)

merged = Node(None, left.freq + right.freq)
merged.left = left
merged.right = right

heapq.heappush(heap, merged)

# 生成编码表
def generate_codes(node, code="", codes={}):
if node:
if node.char is not None: # 叶子节点
codes[node.char] = code if code else "0"
else:
generate_codes(node.left, code + "0", codes)
generate_codes(node.right, code + "1", codes)
return codes

root = heap[0] if heap else None
codes = generate_codes(root)

# 编码数据
encoded = "".join(codes[char] for char in data)

return codes, encoded

# 示例:计算不同数据的熵
data_uniform = [0, 1, 2, 3] * 25 # 均匀分布
data_skewed = [0] * 80 + [1] * 15 + [2] * 4 + [3] * 1 # 偏斜分布

print(f"均匀分布数据熵: {calculate_entropy(data_uniform):.3f} bits")
print(f"偏斜分布数据熵: {calculate_entropy(data_skewed):.3f} bits")

# 霍夫曼编码示例
text = "ABRACADABRA"
codes, encoded = huffman_encoding(text)
print(f"\n原始文本: {text}")
print(f"编码表: {codes}")
print(f"编码结果: {encoded}")
print(f"压缩率: {len(encoded) / (len(text) * 8):.3f}")

1.2 冗余类型分析

音视频数据中存在多种类型的冗余:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def analyze_redundancy():
"""分析音视频数据中的冗余类型"""

# 1. 空间冗余(相邻像素相似性)
def spatial_redundancy_demo():
# 生成具有空间相关性的图像
import numpy as np

# 创建渐变图像
image = np.zeros((100, 100))
for i in range(100):
for j in range(100):
image[i, j] = (i + j) / 2

# 计算相邻像素差值
diff_horizontal = np.diff(image, axis=1)
diff_vertical = np.diff(image, axis=0)

print(f"水平方向平均差值: {np.mean(np.abs(diff_horizontal)):.3f}")
print(f"垂直方向平均差值: {np.mean(np.abs(diff_vertical)):.3f}")

return image, diff_horizontal, diff_vertical

# 2. 时间冗余(帧间相似性)
def temporal_redundancy_demo():
# 模拟视频帧序列
frames = []
base_frame = np.random.rand(50, 50) * 100

for i in range(10):
# 添加小幅变化
noise = np.random.rand(50, 50) * 5
frame = base_frame + noise
frames.append(frame)

# 计算帧间差异
frame_diffs = []
for i in range(1, len(frames)):
diff = np.mean(np.abs(frames[i] - frames[i-1]))
frame_diffs.append(diff)

print(f"平均帧间差异: {np.mean(frame_diffs):.3f}")
return frames, frame_diffs

# 3. 感知冗余(人眼/耳感知限制)
def perceptual_redundancy_demo():
# 人眼对不同频率的敏感度
frequencies = np.linspace(0, 50, 100) # 空间频率

# 简化的对比敏感度函数
def csf(f):
return 2.6 * (0.0192 + 0.114 * f) * np.exp(-(0.114 * f)**1.1)

sensitivity = csf(frequencies)

plt.figure(figsize=(10, 6))
plt.plot(frequencies, sensitivity)
plt.xlabel('空间频率 (cycles/degree)')
plt.ylabel('对比敏感度')
plt.title('人眼对比敏感度函数')
plt.grid(True)
plt.show()

return frequencies, sensitivity

print("=== 冗余类型分析 ===")
print("\n1. 空间冗余:")
spatial_redundancy_demo()

print("\n2. 时间冗余:")
temporal_redundancy_demo()

print("\n3. 感知冗余:")
perceptual_redundancy_demo()

# 运行冗余分析
analyze_redundancy()

2. 视频编码核心技术

2.1 帧内预测(Intra Prediction)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
import numpy as np
import cv2

class IntraPrediction:
"""帧内预测实现"""

def __init__(self, block_size=4):
self.block_size = block_size

# H.264帧内预测模式
self.prediction_modes = {
0: 'Vertical',
1: 'Horizontal',
2: 'DC',
3: 'Diagonal_Down_Left',
4: 'Diagonal_Down_Right',
5: 'Vertical_Right',
6: 'Horizontal_Down',
7: 'Vertical_Left',
8: 'Horizontal_Up'
}

def get_reference_pixels(self, image, x, y):
"""获取参考像素"""
h, w = image.shape

# 上方参考像素
top_ref = np.zeros(self.block_size * 2 + 1)
if y > 0:
start_x = max(0, x - 1)
end_x = min(w, x + self.block_size)
top_ref[1:end_x-start_x+1] = image[y-1, start_x:end_x]

# 左侧参考像素
left_ref = np.zeros(self.block_size * 2 + 1)
if x > 0:
start_y = max(0, y - 1)
end_y = min(h, y + self.block_size)
left_ref[1:end_y-start_y+1] = image[start_y:end_y, x-1]

return top_ref, left_ref

def vertical_prediction(self, top_ref):
"""垂直预测"""
pred_block = np.zeros((self.block_size, self.block_size))
for i in range(self.block_size):
pred_block[:, i] = top_ref[i + 1]
return pred_block

def horizontal_prediction(self, left_ref):
"""水平预测"""
pred_block = np.zeros((self.block_size, self.block_size))
for i in range(self.block_size):
pred_block[i, :] = left_ref[i + 1]
return pred_block

def dc_prediction(self, top_ref, left_ref):
"""DC预测(平均值)"""
# 计算可用参考像素的平均值
available_pixels = []

# 添加上方像素
for i in range(1, self.block_size + 1):
if top_ref[i] != 0: # 假设0表示不可用
available_pixels.append(top_ref[i])

# 添加左侧像素
for i in range(1, self.block_size + 1):
if left_ref[i] != 0:
available_pixels.append(left_ref[i])

if available_pixels:
dc_value = np.mean(available_pixels)
else:
dc_value = 128 # 默认值

pred_block = np.full((self.block_size, self.block_size), dc_value)
return pred_block

def diagonal_down_left_prediction(self, top_ref):
"""对角线左下预测"""
pred_block = np.zeros((self.block_size, self.block_size))

for i in range(self.block_size):
for j in range(self.block_size):
if i + j < self.block_size - 1:
pred_block[i, j] = (top_ref[i + j + 1] +
2 * top_ref[i + j + 2] +
top_ref[i + j + 3] + 2) // 4
else:
pred_block[i, j] = top_ref[self.block_size * 2]

return pred_block

def predict_block(self, image, x, y, mode):
"""预测块"""
top_ref, left_ref = self.get_reference_pixels(image, x, y)

if mode == 0: # Vertical
return self.vertical_prediction(top_ref)
elif mode == 1: # Horizontal
return self.horizontal_prediction(left_ref)
elif mode == 2: # DC
return self.dc_prediction(top_ref, left_ref)
elif mode == 3: # Diagonal Down Left
return self.diagonal_down_left_prediction(top_ref)
else:
# 其他模式的简化实现
return self.dc_prediction(top_ref, left_ref)

def find_best_mode(self, image, x, y):
"""寻找最佳预测模式"""
h, w = image.shape

# 确保块在图像范围内
if x + self.block_size > w or y + self.block_size > h:
return 2, None, float('inf') # 返回DC模式

original_block = image[y:y+self.block_size, x:x+self.block_size]
best_mode = 2
best_prediction = None
best_cost = float('inf')

# 测试不同预测模式
for mode in [0, 1, 2, 3]: # 简化,只测试4种模式
try:
prediction = self.predict_block(image, x, y, mode)

# 计算预测误差(SAD - Sum of Absolute Differences)
cost = np.sum(np.abs(original_block - prediction))

if cost < best_cost:
best_cost = cost
best_mode = mode
best_prediction = prediction
except:
continue

return best_mode, best_prediction, best_cost

def encode_frame_intra(self, image):
"""帧内编码整帧"""
h, w = image.shape
encoded_frame = np.zeros_like(image, dtype=np.float32)
mode_map = np.zeros((h // self.block_size, w // self.block_size), dtype=np.int32)

total_cost = 0

for y in range(0, h, self.block_size):
for x in range(0, w, self.block_size):
mode, prediction, cost = self.find_best_mode(image, x, y)

if prediction is not None:
# 存储预测模式
mode_map[y // self.block_size, x // self.block_size] = mode

# 计算残差
original_block = image[y:y+self.block_size, x:x+self.block_size]
residual = original_block.astype(np.float32) - prediction
encoded_frame[y:y+self.block_size, x:x+self.block_size] = residual

total_cost += cost

return encoded_frame, mode_map, total_cost

# 使用示例
def demo_intra_prediction():
# 创建测试图像
test_image = np.zeros((64, 64), dtype=np.uint8)

# 添加一些结构
test_image[10:30, 10:50] = 100 # 矩形区域
test_image[35:55, 20:40] = 200 # 另一个矩形

# 添加渐变
for i in range(64):
test_image[i, :] += i * 2

# 创建预测器
predictor = IntraPrediction(block_size=8)

# 执行帧内预测
residual, mode_map, total_cost = predictor.encode_frame_intra(test_image)

print(f"总预测成本: {total_cost}")
print(f"预测模式分布:")
for mode in range(4):
count = np.sum(mode_map == mode)
mode_name = predictor.prediction_modes.get(mode, f'Mode_{mode}')
print(f" {mode_name}: {count} 块")

# 可视化结果
import matplotlib.pyplot as plt

fig, axes = plt.subplots(2, 2, figsize=(12, 10))

axes[0, 0].imshow(test_image, cmap='gray')
axes[0, 0].set_title('原始图像')

axes[0, 1].imshow(mode_map, cmap='viridis')
axes[0, 1].set_title('预测模式分布')

axes[1, 0].imshow(residual, cmap='gray')
axes[1, 0].set_title('预测残差')

# 重构图像
reconstructed = np.zeros_like(test_image, dtype=np.float32)
h, w = test_image.shape

for y in range(0, h, predictor.block_size):
for x in range(0, w, predictor.block_size):
mode = mode_map[y // predictor.block_size, x // predictor.block_size]
prediction = predictor.predict_block(test_image, x, y, mode)

if prediction is not None:
residual_block = residual[y:y+predictor.block_size, x:x+predictor.block_size]
reconstructed[y:y+predictor.block_size, x:x+predictor.block_size] = prediction + residual_block

axes[1, 1].imshow(reconstructed, cmap='gray')
axes[1, 1].set_title('重构图像')

plt.tight_layout()
plt.show()

# 计算PSNR
mse = np.mean((test_image.astype(np.float32) - reconstructed) ** 2)
psnr = 20 * np.log10(255.0 / np.sqrt(mse)) if mse > 0 else float('inf')
print(f"重构PSNR: {psnr:.2f} dB")

# 运行演示
demo_intra_prediction()

2.2 帧间预测与运动估计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
class MotionEstimation:
"""运动估计实现"""

def __init__(self, block_size=16, search_range=16):
self.block_size = block_size
self.search_range = search_range

def full_search(self, current_frame, reference_frame, x, y):
"""全搜索运动估计"""
h, w = current_frame.shape

# 确保当前块在帧内
if x + self.block_size > w or y + self.block_size > h:
return 0, 0, float('inf')

current_block = current_frame[y:y+self.block_size, x:x+self.block_size]

best_mv_x, best_mv_y = 0, 0
best_cost = float('inf')

# 在搜索范围内寻找最佳匹配
for mv_y in range(-self.search_range, self.search_range + 1):
for mv_x in range(-self.search_range, self.search_range + 1):
# 计算参考块位置
ref_x = x + mv_x
ref_y = y + mv_y

# 检查边界
if (ref_x < 0 or ref_y < 0 or
ref_x + self.block_size > w or
ref_y + self.block_size > h):
continue

# 获取参考块
ref_block = reference_frame[ref_y:ref_y+self.block_size,
ref_x:ref_x+self.block_size]

# 计算SAD(绝对差值和)
cost = np.sum(np.abs(current_block.astype(np.int32) -
ref_block.astype(np.int32)))

if cost < best_cost:
best_cost = cost
best_mv_x, best_mv_y = mv_x, mv_y

return best_mv_x, best_mv_y, best_cost

def three_step_search(self, current_frame, reference_frame, x, y):
"""三步搜索算法"""
h, w = current_frame.shape

if x + self.block_size > w or y + self.block_size > h:
return 0, 0, float('inf')

current_block = current_frame[y:y+self.block_size, x:x+self.block_size]

# 初始搜索中心
center_x, center_y = 0, 0
step_size = self.search_range // 2

while step_size >= 1:
best_cost = float('inf')
best_x, best_y = center_x, center_y

# 搜索9个点
for dy in [-step_size, 0, step_size]:
for dx in [-step_size, 0, step_size]:
mv_x = center_x + dx
mv_y = center_y + dy

# 检查搜索范围
if (abs(mv_x) > self.search_range or
abs(mv_y) > self.search_range):
continue

# 计算参考块位置
ref_x = x + mv_x
ref_y = y + mv_y

# 检查边界
if (ref_x < 0 or ref_y < 0 or
ref_x + self.block_size > w or
ref_y + self.block_size > h):
continue

# 获取参考块并计算代价
ref_block = reference_frame[ref_y:ref_y+self.block_size,
ref_x:ref_x+self.block_size]
cost = np.sum(np.abs(current_block.astype(np.int32) -
ref_block.astype(np.int32)))

if cost < best_cost:
best_cost = cost
best_x, best_y = mv_x, mv_y

# 更新搜索中心
center_x, center_y = best_x, best_y
step_size //= 2

return center_x, center_y, best_cost

def diamond_search(self, current_frame, reference_frame, x, y):
"""菱形搜索算法"""
h, w = current_frame.shape

if x + self.block_size > w or y + self.block_size > h:
return 0, 0, float('inf')

current_block = current_frame[y:y+self.block_size, x:x+self.block_size]

# 大菱形搜索模式
large_diamond = [(-2, 0), (0, -2), (2, 0), (0, 2)]
# 小菱形搜索模式
small_diamond = [(-1, 0), (0, -1), (1, 0), (0, 1)]

center_x, center_y = 0, 0

def calculate_cost(mv_x, mv_y):
ref_x = x + mv_x
ref_y = y + mv_y

if (ref_x < 0 or ref_y < 0 or
ref_x + self.block_size > w or
ref_y + self.block_size > h):
return float('inf')

ref_block = reference_frame[ref_y:ref_y+self.block_size,
ref_x:ref_x+self.block_size]
return np.sum(np.abs(current_block.astype(np.int32) -
ref_block.astype(np.int32)))

# 大菱形搜索阶段
while True:
best_cost = calculate_cost(center_x, center_y)
best_x, best_y = center_x, center_y

for dx, dy in large_diamond:
mv_x = center_x + dx
mv_y = center_y + dy

if (abs(mv_x) <= self.search_range and
abs(mv_y) <= self.search_range):
cost = calculate_cost(mv_x, mv_y)

if cost < best_cost:
best_cost = cost
best_x, best_y = mv_x, mv_y

# 如果中心点是最佳点,进入小菱形搜索
if best_x == center_x and best_y == center_y:
break

center_x, center_y = best_x, best_y

# 小菱形搜索阶段
for dx, dy in small_diamond:
mv_x = center_x + dx
mv_y = center_y + dy

if (abs(mv_x) <= self.search_range and
abs(mv_y) <= self.search_range):
cost = calculate_cost(mv_x, mv_y)

if cost < best_cost:
best_cost = cost
best_x, best_y = mv_x, mv_y

return best_x, best_y, best_cost

def estimate_motion(self, current_frame, reference_frame, algorithm='diamond'):
"""对整帧进行运动估计"""
h, w = current_frame.shape

# 运动向量场
mv_field_x = np.zeros((h // self.block_size, w // self.block_size))
mv_field_y = np.zeros((h // self.block_size, w // self.block_size))
cost_map = np.zeros((h // self.block_size, w // self.block_size))

for y in range(0, h, self.block_size):
for x in range(0, w, self.block_size):
if algorithm == 'full':
mv_x, mv_y, cost = self.full_search(current_frame, reference_frame, x, y)
elif algorithm == 'three_step':
mv_x, mv_y, cost = self.three_step_search(current_frame, reference_frame, x, y)
else: # diamond
mv_x, mv_y, cost = self.diamond_search(current_frame, reference_frame, x, y)

block_y = y // self.block_size
block_x = x // self.block_size

mv_field_x[block_y, block_x] = mv_x
mv_field_y[block_y, block_x] = mv_y
cost_map[block_y, block_x] = cost

return mv_field_x, mv_field_y, cost_map

def motion_compensation(self, reference_frame, mv_field_x, mv_field_y):
"""运动补偿"""
h, w = reference_frame.shape
compensated_frame = np.zeros_like(reference_frame)

for y in range(0, h, self.block_size):
for x in range(0, w, self.block_size):
block_y = y // self.block_size
block_x = x // self.block_size

if (block_y < mv_field_x.shape[0] and
block_x < mv_field_x.shape[1]):

mv_x = int(mv_field_x[block_y, block_x])
mv_y = int(mv_field_y[block_y, block_x])

# 计算参考块位置
ref_x = x + mv_x
ref_y = y + mv_y

# 检查边界
if (ref_x >= 0 and ref_y >= 0 and
ref_x + self.block_size <= w and
ref_y + self.block_size <= h):

# 复制参考块
ref_block = reference_frame[ref_y:ref_y+self.block_size,
ref_x:ref_x+self.block_size]
compensated_frame[y:y+self.block_size,
x:x+self.block_size] = ref_block

return compensated_frame

# 运动估计演示
def demo_motion_estimation():
# 创建测试序列
frame1 = np.zeros((128, 128), dtype=np.uint8)
frame1[40:80, 40:80] = 255 # 白色方块

# 第二帧:方块向右移动
frame2 = np.zeros((128, 128), dtype=np.uint8)
frame2[40:80, 50:90] = 255 # 方块移动了10个像素

# 添加噪声
noise1 = np.random.normal(0, 5, frame1.shape)
noise2 = np.random.normal(0, 5, frame2.shape)
frame1 = np.clip(frame1.astype(np.float32) + noise1, 0, 255).astype(np.uint8)
frame2 = np.clip(frame2.astype(np.float32) + noise2, 0, 255).astype(np.uint8)

# 创建运动估计器
me = MotionEstimation(block_size=16, search_range=16)

# 测试不同算法
algorithms = ['full', 'three_step', 'diamond']

fig, axes = plt.subplots(2, 4, figsize=(16, 8))

# 显示原始帧
axes[0, 0].imshow(frame1, cmap='gray')
axes[0, 0].set_title('参考帧')
axes[1, 0].imshow(frame2, cmap='gray')
axes[1, 0].set_title('当前帧')

for i, algorithm in enumerate(algorithms):
print(f"\n运行 {algorithm} 搜索...")

import time
start_time = time.time()

# 运动估计
mv_x, mv_y, cost_map = me.estimate_motion(frame2, frame1, algorithm)

# 运动补偿
compensated = me.motion_compensation(frame1, mv_x, mv_y)

end_time = time.time()

print(f"算法: {algorithm}")
print(f"执行时间: {end_time - start_time:.3f} 秒")
print(f"平均运动向量: ({np.mean(mv_x):.2f}, {np.mean(mv_y):.2f})")
print(f"平均匹配代价: {np.mean(cost_map):.2f}")

# 可视化运动向量场
axes[0, i+1].quiver(mv_x, mv_y, scale=50)
axes[0, i+1].set_title(f'{algorithm} - 运动向量')
axes[0, i+1].set_aspect('equal')

# 显示补偿后的帧
axes[1, i+1].imshow(compensated, cmap='gray')
axes[1, i+1].set_title(f'{algorithm} - 补偿帧')

plt.tight_layout()
plt.show()

# 计算预测误差
for algorithm in algorithms:
mv_x, mv_y, _ = me.estimate_motion(frame2, frame1, algorithm)
compensated = me.motion_compensation(frame1, mv_x, mv_y)

# 计算PSNR
mse = np.mean((frame2.astype(np.float32) - compensated.astype(np.float32)) ** 2)
psnr = 20 * np.log10(255.0 / np.sqrt(mse)) if mse > 0 else float('inf')

print(f"{algorithm} 算法 PSNR: {psnr:.2f} dB")

# 运行演示
demo_motion_estimation()

2.3 变换编码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
import numpy as np
from scipy.fftpack import dct, idct

class TransformCoding:
"""变换编码实现"""

def __init__(self, block_size=8):
self.block_size = block_size

# JPEG量化表
self.luminance_quant_table = np.array([
[16, 11, 10, 16, 24, 40, 51, 61],
[12, 12, 14, 19, 26, 58, 60, 55],
[14, 13, 16, 24, 40, 57, 69, 56],
[14, 17, 22, 29, 51, 87, 80, 62],
[18, 22, 37, 56, 68, 109, 103, 77],
[24, 35, 55, 64, 81, 104, 113, 92],
[49, 64, 78, 87, 103, 121, 120, 101],
[72, 92, 95, 98, 112, 100, 103, 99]
])

def dct2d(self, block):
"""二维DCT变换"""
return dct(dct(block.T, norm='ortho').T, norm='ortho')

def idct2d(self, block):
"""二维IDCT变换"""
return idct(idct(block.T, norm='ortho').T, norm='ortho')

def quantize(self, dct_block, quality=50):
"""量化"""
# 根据质量因子调整量化表
if quality < 50:
scale = 5000 / quality
else:
scale = 200 - 2 * quality

quant_table = np.clip((self.luminance_quant_table * scale + 50) / 100, 1, 255)

# 量化
quantized = np.round(dct_block / quant_table)

return quantized.astype(np.int32), quant_table

def dequantize(self, quantized_block, quant_table):
"""反量化"""
return quantized_block * quant_table

def zigzag_scan(self, block):
"""之字形扫描"""
# 之字形扫描顺序
zigzag_order = [
(0,0), (0,1), (1,0), (2,0), (1,1), (0,2), (0,3), (1,2),
(2,1), (3,0), (4,0), (3,1), (2,2), (1,3), (0,4), (0,5),
(1,4), (2,3), (3,2), (4,1), (5,0), (6,0), (5,1), (4,2),
(3,3), (2,4), (1,5), (0,6), (0,7), (1,6), (2,5), (3,4),
(4,3), (5,2), (6,1), (7,0), (7,1), (6,2), (5,3), (4,4),
(3,5), (2,6), (1,7), (2,7), (3,6), (4,5), (5,4), (6,3),
(7,2), (7,3), (6,4), (5,5), (4,6), (3,7), (4,7), (5,6),
(6,5), (7,4), (7,5), (6,6), (5,7), (6,7), (7,6), (7,7)
]

return [block[i, j] for i, j in zigzag_order]

def inverse_zigzag_scan(self, zigzag_data):
"""逆之字形扫描"""
block = np.zeros((8, 8))
zigzag_order = [
(0,0), (0,1), (1,0), (2,0), (1,1), (0,2), (0,3), (1,2),
(2,1), (3,0), (4,0), (3,1), (2,2), (1,3), (0,4), (0,5),
(1,4), (2,3), (3,2), (4,1), (5,0), (6,0), (5,1), (4,2),
(3,3), (2,4), (1,5), (0,6), (0,7), (1,6), (2,5), (3,4),
(4,3), (5,2), (6,1), (7,0), (7,1), (6,2), (5,3), (4,4),
(3,5), (2,6), (1,7), (2,7), (3,6), (4,5), (5,4), (6,3),
(7,2), (7,3), (6,4), (5,5), (4,6), (3,7), (4,7), (5,6),
(6,5), (7,4), (7,5), (6,6), (5,7), (6,7), (7,6), (7,7)
]

for idx, (i, j) in enumerate(zigzag_order):
if idx < len(zigzag_data):
block[i, j] = zigzag_data[idx]

return block

def run_length_encode(self, zigzag_data):
"""游程编码"""
encoded = []
i = 0

while i < len(zigzag_data):
if zigzag_data[i] == 0:
# 计算连续零的个数
zero_count = 0
while i < len(zigzag_data) and zigzag_data[i] == 0:
zero_count += 1
i += 1

if i < len(zigzag_data):
# (零的个数, 非零值)
encoded.append((zero_count, zigzag_data[i]))
i += 1
else:
# 结束符号
encoded.append((0, 0))
else:
# 非零值
encoded.append((0, zigzag_data[i]))
i += 1

return encoded

def run_length_decode(self, encoded_data):
"""游程解码"""
decoded = []

for zero_count, value in encoded_data:
if zero_count == 0 and value == 0:
# 结束符号
break

# 添加零
decoded.extend([0] * zero_count)

# 添加非零值
if not (zero_count == 0 and value == 0):
decoded.append(value)

# 填充到64个元素
while len(decoded) < 64:
decoded.append(0)

return decoded[:64]

def encode_block(self, block, quality=50):
"""编码单个块"""
# 1. DCT变换
dct_block = self.dct2d(block - 128) # 减去128进行中心化

# 2. 量化
quantized, quant_table = self.quantize(dct_block, quality)

# 3. 之字形扫描
zigzag = self.zigzag_scan(quantized)

# 4. 游程编码
encoded = self.run_length_encode(zigzag)

return encoded, quant_table

def decode_block(self, encoded_data, quant_table):
"""解码单个块"""
# 1. 游程解码
zigzag = self.run_length_decode(encoded_data)

# 2. 逆之字形扫描
quantized = self.inverse_zigzag_scan(zigzag)

# 3. 反量化
dct_block = self.dequantize(quantized, quant_table)

# 4. IDCT变换
reconstructed = self.idct2d(dct_block) + 128 # 加回128

return np.clip(reconstructed, 0, 255)

def encode_image(self, image, quality=50):
"""编码整幅图像"""
h, w = image.shape
encoded_blocks = []
quant_tables = []

for y in range(0, h, self.block_size):
for x in range(0, w, self.block_size):
# 提取块
block = image[y:y+self.block_size, x:x+self.block_size]

# 如果块不足8x8,进行填充
if block.shape != (self.block_size, self.block_size):
padded_block = np.zeros((self.block_size, self.block_size))
padded_block[:block.shape[0], :block.shape[1]] = block
block = padded_block

# 编码块
encoded, quant_table = self.encode_block(block, quality)
encoded_blocks.append(encoded)
quant_tables.append(quant_table)

return encoded_blocks, quant_tables, (h, w)

def decode_image(self, encoded_blocks, quant_tables, image_shape):
"""解码整幅图像"""
h, w = image_shape
reconstructed = np.zeros((h, w))

block_idx = 0
for y in range(0, h, self.block_size):
for x in range(0, w, self.block_size):
if block_idx < len(encoded_blocks):
# 解码块
decoded_block = self.decode_block(encoded_blocks[block_idx],
quant_tables[block_idx])

# 放置到图像中
end_y = min(y + self.block_size, h)
end_x = min(x + self.block_size, w)

reconstructed[y:end_y, x:end_x] = decoded_block[:end_y-y, :end_x-x]

block_idx += 1

return reconstructed.astype(np.uint8)

def calculate_compression_ratio(self, original_size, encoded_blocks):
"""计算压缩比"""
# 估算编码后的大小(简化计算)
total_symbols = sum(len(block) for block in encoded_blocks)

# 假设每个符号平均需要4位(实际会根据熵编码进一步压缩)
compressed_size = total_symbols * 4 / 8 # 转换为字节

compression_ratio = original_size / compressed_size
return compression_ratio, compressed_size

# 变换编码演示
def demo_transform_coding():
# 创建测试图像
test_image = np.zeros((64, 64), dtype=np.uint8)

# 添加不同频率的内容
for i in range(64):
for j in range(64):
# 低频成分
test_image[i, j] += 128 + 50 * np.sin(2 * np.pi * i / 32)
# 高频成分
test_image[i, j] += 20 * np.sin(2 * np.pi * i / 4) * np.sin(2 * np.pi * j / 4)

test_image = np.clip(test_image, 0, 255).astype(np.uint8)

# 创建变换编码器
tc = TransformCoding()

# 测试不同质量级别
qualities = [10, 30, 50, 70, 90]

fig, axes = plt.subplots(2, len(qualities) + 1, figsize=(15, 6))

# 显示原始图像
axes[0, 0].imshow(test_image, cmap='gray')
axes[0, 0].set_title('原始图像')
axes[1, 0].axis('off')

original_size = test_image.size

for i, quality in enumerate(qualities):
print(f"\n质量级别: {quality}")

# 编码
encoded_blocks, quant_tables, shape = tc.encode_image(test_image, quality)

# 解码
reconstructed = tc.decode_image(encoded_blocks, quant_tables, shape)

# 计算压缩比
compression_ratio, compressed_size = tc.calculate_compression_ratio(original_size, encoded_blocks)

# 计算PSNR
mse = np.mean((test_image.astype(np.float32) - reconstructed.astype(np.float32)) ** 2)
psnr = 20 * np.log10(255.0 / np.sqrt(mse)) if mse > 0 else float('inf')

print(f"压缩比: {compression_ratio:.2f}:1")
print(f"PSNR: {psnr:.2f} dB")
print(f"压缩后大小: {compressed_size:.0f} 字节")

# 显示重构图像
axes[0, i+1].imshow(reconstructed, cmap='gray')
axes[0, i+1].set_title(f'Q={quality}\nPSNR={psnr:.1f}dB')

# 显示误差图像
error_image = np.abs(test_image.astype(np.float32) - reconstructed.astype(np.float32))
axes[1, i+1].imshow(error_image, cmap='hot')
axes[1, i+1].set_title(f'误差图像\n压缩比={compression_ratio:.1f}:1')

plt.tight_layout()
plt.show()

# DCT系数分析
sample_block = test_image[:8, :8]
dct_coeffs = tc.dct2d(sample_block - 128)

fig, axes = plt.subplots(1, 3, figsize=(12, 4))

axes[0].imshow(sample_block, cmap='gray')
axes[0].set_title('原始块')

im1 = axes[1].imshow(dct_coeffs, cmap='RdBu_r')
axes[1].set_title('DCT系数')
plt.colorbar(im1, ax=axes[1])

# 显示量化后的系数
quantized, _ = tc.quantize(dct_coeffs, quality=50)
im2 = axes[2].imshow(quantized, cmap='RdBu_r')
axes[2].set_title('量化后系数')
plt.colorbar(im2, ax=axes[2])

plt.tight_layout()
plt.show()

# 运行演示
demo_transform_coding()

3. 音频编码核心技术

3.1 感知音频编码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import stft, istft
from scipy.fftpack import fft, ifft

class PsychoacousticModel:
"""心理声学模型"""

def __init__(self, sample_rate=44100, frame_size=1024):
self.sample_rate = sample_rate
self.frame_size = frame_size
self.freq_bins = frame_size // 2 + 1

# Bark频率刻度
self.bark_boundaries = self._calculate_bark_boundaries()

# 绝对听觉阈值
self.absolute_threshold = self._calculate_absolute_threshold()

def _calculate_bark_boundaries(self):
"""计算Bark频率边界"""
# Bark刻度的24个临界频带
bark_freqs = [0, 100, 200, 300, 400, 510, 630, 770, 920, 1080,
1270, 1480, 1720, 2000, 2320, 2700, 3150, 3700,
4400, 5300, 6400, 7700, 9500, 12000, 15500]

# 转换为FFT bin索引
boundaries = []
for freq in bark_freqs:
bin_idx = int(freq * self.frame_size / self.sample_rate)
boundaries.append(min(bin_idx, self.freq_bins - 1))

return boundaries

def _calculate_absolute_threshold(self):
"""计算绝对听觉阈值"""
freqs = np.linspace(0, self.sample_rate/2, self.freq_bins)

# 绝对听觉阈值公式(简化版)
threshold_db = 3.64 * (freqs/1000)**(-0.8) - 6.5 * np.exp(-0.6 * (freqs/1000 - 3.3)**2) + 1e-3 * (freqs/1000)**4

# 转换为线性刻度
threshold_linear = 10**(threshold_db / 20)

return threshold_linear

def calculate_masking_threshold(self, spectrum):
"""计算掩蔽阈值"""
# 将频谱分组到Bark频带
bark_spectrum = self._group_to_bark_bands(spectrum)

# 计算每个频带的掩蔽阈值
masking_threshold = np.zeros(self.freq_bins)

for i in range(len(self.bark_boundaries) - 1):
start_bin = self.bark_boundaries[i]
end_bin = self.bark_boundaries[i + 1]

# 频带能量
band_energy = bark_spectrum[i]

# 计算掩蔽函数
masking_curve = self._calculate_masking_curve(i, band_energy)

# 应用到对应的频率bin
for j in range(start_bin, end_bin):
if j < len(masking_threshold):
masking_threshold[j] = max(masking_threshold[j], masking_curve[j])

# 与绝对听觉阈值取最大值
final_threshold = np.maximum(masking_threshold, self.absolute_threshold)

return final_threshold

def _group_to_bark_bands(self, spectrum):
"""将频谱分组到Bark频带"""
bark_spectrum = []

for i in range(len(self.bark_boundaries) - 1):
start_bin = self.bark_boundaries[i]
end_bin = self.bark_boundaries[i + 1]

# 计算频带内的总能量
band_energy = np.sum(spectrum[start_bin:end_bin]**2)
bark_spectrum.append(band_energy)

return bark_spectrum

def _calculate_masking_curve(self, band_idx, band_energy):
"""计算掩蔽曲线"""
masking_curve = np.zeros(self.freq_bins)

if band_energy <= 0:
return masking_curve

# 掩蔽器的中心频率
center_bin = (self.bark_boundaries[band_idx] + self.bark_boundaries[band_idx + 1]) // 2

# 掩蔽扩散函数(简化)
for i in range(self.freq_bins):
# 计算频率距离(以Bark为单位)
freq_distance = abs(i - center_bin) / (self.freq_bins / 24) # 近似转换

# 掩蔽函数
if freq_distance <= 1:
# 同一临界频带内
masking_db = -6.025 - 0.275 * freq_distance
elif freq_distance <= 8:
# 向上掩蔽
masking_db = -17 - 0.15 * freq_distance
else:
# 远距离掩蔽
masking_db = -40

# 转换为线性刻度并应用掩蔽器能量
masking_curve[i] = band_energy * 10**(masking_db / 10)

return masking_curve

def calculate_smr(self, signal_spectrum, noise_spectrum):
"""计算信噪掩蔽比(SMR)"""
masking_threshold = self.calculate_masking_threshold(signal_spectrum)

# 计算每个频率bin的SMR
smr = np.zeros(self.freq_bins)

for i in range(self.freq_bins):
signal_power = signal_spectrum[i]**2
noise_power = noise_spectrum[i]**2
threshold_power = masking_threshold[i]**2

# SMR = 信号功率 / max(噪声功率, 掩蔽阈值功率)
effective_noise = max(noise_power, threshold_power)

if effective_noise > 0:
smr[i] = 10 * np.log10(signal_power / effective_noise)
else:
smr[i] = 100 # 很高的SMR

return smr

class AudioEncoder:
"""音频编码器"""

def __init__(self, sample_rate=44100, frame_size=1024, bit_rate=128000):
self.sample_rate = sample_rate
self.frame_size = frame_size
self.bit_rate = bit_rate
self.psychoacoustic_model = PsychoacousticModel(sample_rate, frame_size)

# 子带滤波器组
self.num_subbands = 32
self.subband_filters = self._create_subband_filters()

def _create_subband_filters(self):
"""创建子带滤波器组"""
filters = []

for k in range(self.num_subbands):
# 创建第k个子带的滤波器系数
filter_coeffs = []

for n in range(64): # 滤波器长度
if n == 0:
h_n = 1.0 / self.num_subbands
else:
h_n = (2.0 / self.num_subbands) * np.cos(
np.pi * (2*k + 1) * n / (2 * self.num_subbands)
)

# 应用窗函数
window = 0.5 - 0.5 * np.cos(2 * np.pi * n / 63)
filter_coeffs.append(h_n * window)

filters.append(filter_coeffs)

return filters

def subband_analysis(self, audio_frame):
"""子带分析"""
subband_samples = np.zeros(self.num_subbands)

# 应用子带滤波器
for k in range(self.num_subbands):
# 卷积运算
filtered = np.convolve(audio_frame, self.subband_filters[k], mode='valid')

# 下采样
if len(filtered) > 0:
subband_samples[k] = filtered[len(filtered)//2]

return subband_samples

def bit_allocation(self, subband_samples, masking_threshold):
"""比特分配"""
# 计算每个子带的信噪比
snr = np.zeros(self.num_subbands)

for k in range(self.num_subbands):
signal_power = subband_samples[k]**2

# 将掩蔽阈值映射到子带
freq_start = k * self.sample_rate // (2 * self.num_subbands)
freq_end = (k + 1) * self.sample_rate // (2 * self.num_subbands)

bin_start = int(freq_start * self.frame_size / self.sample_rate)
bin_end = int(freq_end * self.frame_size / self.sample_rate)

if bin_end > bin_start and bin_end < len(masking_threshold):
threshold_power = np.mean(masking_threshold[bin_start:bin_end])**2
else:
threshold_power = 1e-10

if threshold_power > 0:
snr[k] = 10 * np.log10(signal_power / threshold_power)
else:
snr[k] = 100

# 根据SNR分配比特
total_bits = self.bit_rate * self.frame_size // self.sample_rate
bit_allocation = np.zeros(self.num_subbands, dtype=int)

# 简化的比特分配算法
snr_normalized = np.maximum(snr, 0)
snr_sum = np.sum(snr_normalized)

if snr_sum > 0:
for k in range(self.num_subbands):
bit_allocation[k] = int(total_bits * snr_normalized[k] / snr_sum)

return bit_allocation

def quantize_subband(self, sample, num_bits):
"""子带量化"""
if num_bits <= 0:
return 0, 0

# 计算量化级数
num_levels = 2**num_bits

# 自适应量化步长
max_val = max(abs(sample), 1e-10)
step_size = 2 * max_val / num_levels

# 量化
quantized_index = int(np.clip((sample + max_val) / step_size, 0, num_levels - 1))

# 重构值
reconstructed = quantized_index * step_size - max_val

return quantized_index, reconstructed

def encode_frame(self, audio_frame):
"""编码音频帧"""
# 1. 子带分析
subband_samples = self.subband_analysis(audio_frame)

# 2. 计算频谱用于心理声学模型
spectrum = np.abs(fft(audio_frame)[:self.frame_size//2 + 1])
masking_threshold = self.psychoacoustic_model.calculate_masking_threshold(spectrum)

# 3. 比特分配
bit_allocation = self.bit_allocation(subband_samples, masking_threshold)

# 4. 量化
quantized_indices = []
reconstructed_samples = []

for k in range(self.num_subbands):
index, reconstructed = self.quantize_subband(subband_samples[k], bit_allocation[k])
quantized_indices.append(index)
reconstructed_samples.append(reconstructed)

return {
'quantized_indices': quantized_indices,
'bit_allocation': bit_allocation,
'reconstructed_samples': reconstructed_samples
}

# 音频编码演示
def demo_audio_encoding():
# 生成测试音频信号
duration = 0.1 # 100ms
sample_rate = 44100
t = np.linspace(0, duration, int(sample_rate * duration))

# 复合信号:基频 + 谐波 + 噪声
fundamental = 440 # A4
signal = (0.5 * np.sin(2 * np.pi * fundamental * t) +
0.3 * np.sin(2 * np.pi * 2 * fundamental * t) +
0.2 * np.sin(2 * np.pi * 3 * fundamental * t) +
0.1 * np.random.normal(0, 1, len(t)))

# 归一化
signal = signal / np.max(np.abs(signal)) * 0.8

# 创建编码器
encoder = AudioEncoder(sample_rate=sample_rate, frame_size=1024)

# 分帧编码
frame_size = encoder.frame_size
encoded_frames = []

for i in range(0, len(signal) - frame_size, frame_size // 2): # 50% 重叠
frame = signal[i:i + frame_size]

if len(frame) == frame_size:
encoded_frame = encoder.encode_frame(frame)
encoded_frames.append(encoded_frame)

print(f"编码了 {len(encoded_frames)} 帧")

# 分析第一帧的编码结果
if encoded_frames:
first_frame = encoded_frames[0]

print("\n第一帧编码分析:")
print(f"比特分配: {first_frame['bit_allocation']}")
print(f"总分配比特数: {np.sum(first_frame['bit_allocation'])}")

# 可视化
fig, axes = plt.subplots(2, 2, figsize=(12, 8))

# 原始信号
axes[0, 0].plot(signal[:frame_size])
axes[0, 0].set_title('原始音频帧')
axes[0, 0].set_xlabel('样本')
axes[0, 0].set_ylabel('幅度')

# 频谱
spectrum = np.abs(fft(signal[:frame_size]))
freqs = np.linspace(0, sample_rate/2, len(spectrum)//2)
axes[0, 1].plot(freqs, 20*np.log10(spectrum[:len(spectrum)//2] + 1e-10))
axes[0, 1].set_title('频谱')
axes[0, 1].set_xlabel('频率 (Hz)')
axes[0, 1].set_ylabel('幅度 (dB)')

# 掩蔽阈值
masking_threshold = encoder.psychoacoustic_model.calculate_masking_threshold(
spectrum[:len(spectrum)//2])
axes[1, 0].plot(freqs, 20*np.log10(spectrum[:len(spectrum)//2] + 1e-10), label='信号')
axes[1, 0].plot(freqs, 20*np.log10(masking_threshold + 1e-10), label='掩蔽阈值')
axes[1, 0].set_title('信号与掩蔽阈值')
axes[1, 0].set_xlabel('频率 (Hz)')
axes[1, 0].set_ylabel('幅度 (dB)')
axes[1, 0].legend()

# 比特分配
axes[1, 1].bar(range(encoder.num_subbands), first_frame['bit_allocation'])
axes[1, 1].set_title('子带比特分配')
axes[1, 1].set_xlabel('子带索引')
axes[1, 1].set_ylabel('分配比特数')

plt.tight_layout()
plt.show()

# 运行演示
demo_audio_encoding()

4. 熵编码技术

4.1 算术编码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
class ArithmeticCoder:
"""算术编码实现"""

def __init__(self, precision=32):
self.precision = precision
self.max_value = (1 << precision) - 1
self.quarter = 1 << (precision - 2)
self.half = 2 * self.quarter
self.three_quarters = 3 * self.quarter

def build_frequency_table(self, data):
"""构建频率表"""
from collections import Counter

# 统计符号频率
counter = Counter(data)

# 构建累积频率表
symbols = sorted(counter.keys())
frequencies = [counter[symbol] for symbol in symbols]

# 添加EOF符号
symbols.append('EOF')
frequencies.append(1)

# 计算累积频率
total_freq = sum(frequencies)
cumulative_freq = [0]

for freq in frequencies:
cumulative_freq.append(cumulative_freq[-1] + freq)

return symbols, frequencies, cumulative_freq, total_freq

def encode(self, data):
"""算术编码"""
if not data:
return [], []

symbols, frequencies, cumulative_freq, total_freq = self.build_frequency_table(data)

# 创建符号到索引的映射
symbol_to_index = {symbol: i for i, symbol in enumerate(symbols)}

# 初始化编码范围
low = 0
high = self.max_value
pending_bits = 0
encoded_bits = []

def output_bit(bit):
encoded_bits.append(bit)

# 输出待定位
for _ in range(pending_bits):
encoded_bits.append(1 - bit)

nonlocal pending_bits
pending_bits = 0

# 编码每个符号
for symbol in data + ['EOF']:
if symbol not in symbol_to_index:
continue

index = symbol_to_index[symbol]

# 计算新的范围
range_size = high - low + 1

high = low + (range_size * cumulative_freq[index + 1]) // total_freq - 1
low = low + (range_size * cumulative_freq[index]) // total_freq

# 输出位并重新缩放
while True:
if high < self.half:
# 输出0
output_bit(0)
elif low >= self.half:
# 输出1
output_bit(1)
low -= self.half
high -= self.half
elif low >= self.quarter and high < self.three_quarters:
# E3缩放
pending_bits += 1
low -= self.quarter
high -= self.quarter
else:
break

# 左移一位
low = 2 * low
high = 2 * high + 1

# 输出最终位
pending_bits += 1
if low < self.quarter:
output_bit(0)
else:
output_bit(1)

return encoded_bits, (symbols, frequencies, cumulative_freq, total_freq)

def decode(self, encoded_bits, frequency_table, length):
"""算术解码"""
symbols, frequencies, cumulative_freq, total_freq = frequency_table

if not encoded_bits:
return []

# 初始化解码范围
low = 0
high = self.max_value

# 读取初始值
value = 0
for i in range(min(self.precision, len(encoded_bits))):
value = (value << 1) + encoded_bits[i]

decoded_data = []
bit_index = self.precision

for _ in range(length):
# 计算当前符号
range_size = high - low + 1
scaled_value = ((value - low + 1) * total_freq - 1) // range_size

# 查找符号
symbol_index = 0
for i in range(len(cumulative_freq) - 1):
if cumulative_freq[i] <= scaled_value < cumulative_freq[i + 1]:
symbol_index = i
break

symbol = symbols[symbol_index]

if symbol == 'EOF':
break

decoded_data.append(symbol)

# 更新范围
high = low + (range_size * cumulative_freq[symbol_index + 1]) // total_freq - 1
low = low + (range_size * cumulative_freq[symbol_index]) // total_freq

# 重新缩放
while True:
if high < self.half:
# 不需要操作
pass
elif low >= self.half:
value -= self.half
low -= self.half
high -= self.half
elif low >= self.quarter and high < self.three_quarters:
value -= self.quarter
low -= self.quarter
high -= self.quarter
else:
break

# 左移并读取新位
low = 2 * low
high = 2 * high + 1
value = 2 * value

if bit_index < len(encoded_bits):
value += encoded_bits[bit_index]
bit_index += 1

return decoded_data

# 算术编码演示
def demo_arithmetic_coding():
# 测试数据
test_data = "ABRACADABRA"
data_list = list(test_data)

print(f"原始数据: {test_data}")
print(f"原始长度: {len(test_data)} 字符 = {len(test_data) * 8} 位")

# 创建算术编码器
coder = ArithmeticCoder(precision=16) # 使用较小精度便于演示

# 编码
encoded_bits, frequency_table = coder.encode(data_list)

print(f"\n编码结果:")
print(f"编码位数: {len(encoded_bits)}")
print(f"压缩率: {len(encoded_bits) / (len(test_data) * 8):.3f}")
print(f"编码位序列: {''.join(map(str, encoded_bits))}")

# 显示频率表
symbols, frequencies, cumulative_freq, total_freq = frequency_table
print(f"\n频率表:")
for i, symbol in enumerate(symbols):
if symbol != 'EOF':
print(f" '{symbol}': 频率={frequencies[i]}, 累积={cumulative_freq[i]}-{cumulative_freq[i+1]}")

# 解码
decoded_data = coder.decode(encoded_bits, frequency_table, len(data_list))
decoded_string = ''.join(decoded_data)

print(f"\n解码结果: {decoded_string}")
print(f"解码正确: {decoded_string == test_data}")

# 比较不同数据的压缩效果
test_cases = [
"AAAAAAAAAA", # 高度重复
"ABCDEFGHIJ", # 均匀分布
"AAAAABBBCC", # 中等重复
"The quick brown fox jumps over the lazy dog" # 自然文本
]

print("\n不同数据的压缩效果:")
for test_case in test_cases:
data_list = list(test_case)
encoded_bits, _ = coder.encode(data_list)

original_bits = len(test_case) * 8
compressed_bits = len(encoded_bits)
compression_ratio = compressed_bits / original_bits

print(f" '{test_case[:20]}{'...' if len(test_case) > 20 else ''}':")
print(f" 原始: {original_bits} 位, 压缩: {compressed_bits} 位, 比率: {compression_ratio:.3f}")

# 运行演示
demo_arithmetic_coding()

4.2 CABAC编码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
class CABACEncoder:
"""CABAC(上下文自适应二进制算术编码)实现"""

def __init__(self):
# 上下文模型
self.contexts = {}

# 概率状态表(简化版)
self.prob_states = self._init_probability_states()

# 算术编码器
self.arithmetic_coder = ArithmeticCoder(precision=16)

def _init_probability_states(self):
"""初始化概率状态表"""
# 简化的概率状态,实际H.264使用64个状态
states = []
for i in range(32):
# 从高概率到低概率
prob_0 = 0.5 + 0.4 * (31 - i) / 31
prob_1 = 1 - prob_0
states.append((prob_0, prob_1))

return states

def get_context(self, context_type, neighbors=None):
"""获取上下文模型"""
# 根据上下文类型和邻居信息确定上下文索引
if context_type == 'mb_type':
# 宏块类型的上下文
if neighbors is None:
return 0

left_mb_type = neighbors.get('left', 0)
top_mb_type = neighbors.get('top', 0)

# 简化的上下文计算
context_idx = (left_mb_type + top_mb_type) % 4

elif context_type == 'mvd':
# 运动向量差值的上下文
if neighbors is None:
return 0

left_mvd = neighbors.get('left_mvd', 0)
top_mvd = neighbors.get('top_mvd', 0)

# 根据邻居MVD的大小确定上下文
if abs(left_mvd) + abs(top_mvd) < 3:
context_idx = 0
elif abs(left_mvd) + abs(top_mvd) < 33:
context_idx = 1
else:
context_idx = 2

elif context_type == 'coeff':
# 系数的上下文
if neighbors is None:
return 0

num_nonzero = neighbors.get('num_nonzero', 0)

# 根据非零系数数量确定上下文
if num_nonzero == 0:
context_idx = 0
elif num_nonzero <= 2:
context_idx = 1
else:
context_idx = 2

else:
context_idx = 0

# 确保上下文在有效范围内
context_idx = min(context_idx, len(self.prob_states) - 1)

return context_idx

def binarize(self, value, binarization_type='unary'):
"""二值化"""
if binarization_type == 'unary':
# 一元码
if value == 0:
return [0]
else:
return [1] * value + [0]

elif binarization_type == 'truncated_unary':
# 截断一元码
max_value = 8 # 截断值
if value < max_value:
if value == 0:
return [0]
else:
return [1] * value + [0]
else:
return [1] * max_value

elif binarization_type == 'fixed_length':
# 定长二进制码
num_bits = 4 # 假设4位
binary = []
for i in range(num_bits):
binary.append((value >> (num_bits - 1 - i)) & 1)
return binary

elif binarization_type == 'exp_golomb':
# 指数哥伦布码
if value == 0:
return [1]

# 计算前缀长度
prefix_len = 0
temp = value + 1
while temp > 1:
temp //= 2
prefix_len += 1

# 构造码字
code = [0] * prefix_len + [1] # 前缀

# 添加后缀
suffix = value + 1 - (1 << prefix_len)
for i in range(prefix_len):
code.append((suffix >> (prefix_len - 1 - i)) & 1)

return code

else:
# 默认使用定长编码
return self.binarize(value, 'fixed_length')

def encode_bin(self, bin_value, context_idx):
"""编码单个二进制位"""
# 获取当前上下文的概率状态
if context_idx not in self.contexts:
self.contexts[context_idx] = 0 # 初始状态

state = self.contexts[context_idx]
prob_0, prob_1 = self.prob_states[state]

# 使用算术编码
if bin_value == 0:
# 编码0
encoded_bits, _ = self.arithmetic_coder.encode([0])
else:
# 编码1
encoded_bits, _ = self.arithmetic_coder.encode([1])

# 更新上下文状态
self._update_context_state(context_idx, bin_value)

return encoded_bits

def _update_context_state(self, context_idx, bin_value):
"""更新上下文状态"""
current_state = self.contexts[context_idx]

# 简化的状态转移
if bin_value == 0:
# 观察到0,增加0的概率
if current_state < len(self.prob_states) - 1:
self.contexts[context_idx] = min(current_state + 1, len(self.prob_states) - 1)
else:
# 观察到1,减少0的概率
if current_state > 0:
self.contexts[context_idx] = max(current_state - 1, 0)

def encode_syntax_element(self, value, element_type, neighbors=None):
"""编码语法元素"""
# 1. 二值化
if element_type == 'mb_type':
binary_string = self.binarize(value, 'truncated_unary')
elif element_type == 'mvd':
binary_string = self.binarize(abs(value), 'exp_golomb')
if value != 0:
# 添加符号位
binary_string.append(1 if value > 0 else 0)
elif element_type == 'coeff':
binary_string = self.binarize(abs(value), 'exp_golomb')
if value != 0:
binary_string.append(1 if value > 0 else 0)
else:
binary_string = self.binarize(value, 'fixed_length')

# 2. 上下文建模和算术编码
encoded_bits = []

for i, bin_val in enumerate(binary_string):
# 获取上下文
context_idx = self.get_context(element_type, neighbors)

# 编码二进制位
bits = self.encode_bin(bin_val, context_idx)
encoded_bits.extend(bits)

return encoded_bits

# CABAC编码演示
def demo_cabac_encoding():
# 创建CABAC编码器
cabac = CABACEncoder()

# 测试数据:模拟H.264语法元素
test_data = [
{'type': 'mb_type', 'value': 1, 'neighbors': {'left': 0, 'top': 1}},
{'type': 'mvd', 'value': 3, 'neighbors': {'left_mvd': 1, 'top_mvd': 2}},
{'type': 'mvd', 'value': -2, 'neighbors': {'left_mvd': 3, 'top_mvd': 0}},
{'type': 'coeff', 'value': 5, 'neighbors': {'num_nonzero': 2}},
{'type': 'coeff', 'value': 0, 'neighbors': {'num_nonzero': 1}},
{'type': 'coeff', 'value': -1, 'neighbors': {'num_nonzero': 0}},
]

print("CABAC编码演示:")

total_bits = 0

for i, element in enumerate(test_data):
encoded_bits = cabac.encode_syntax_element(
element['value'],
element['type'],
element['neighbors']
)

total_bits += len(encoded_bits)

print(f"\n元素 {i+1}:")
print(f" 类型: {element['type']}")
print(f" 值: {element['value']}")
print(f" 邻居: {element['neighbors']}")
print(f" 编码位数: {len(encoded_bits)}")
print(f" 编码结果: {''.join(map(str, encoded_bits[:20]))}{'...' if len(encoded_bits) > 20 else ''}")

print(f"\n总编码位数: {total_bits}")

# 显示上下文状态
print("\n上下文状态:")
for context_idx, state in cabac.contexts.items():
prob_0, prob_1 = cabac.prob_states[state]
print(f" 上下文 {context_idx}: 状态={state}, P(0)={prob_0:.3f}, P(1)={prob_1:.3f}")

# 运行演示
demo_cabac_encoding()

5. 编码性能评估

5.1 率失真优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
class RateDistortionOptimizer:
"""率失真优化器"""

def __init__(self, lambda_factor=1.0):
self.lambda_factor = lambda_factor

def calculate_distortion(self, original, reconstructed, metric='mse'):
"""计算失真"""
if metric == 'mse':
return np.mean((original - reconstructed) ** 2)
elif metric == 'mae':
return np.mean(np.abs(original - reconstructed))
elif metric == 'ssim':
return self._calculate_ssim(original, reconstructed)
else:
return np.mean((original - reconstructed) ** 2)

def _calculate_ssim(self, img1, img2):
"""计算SSIM(简化版)"""
# 常数
C1 = (0.01 * 255) ** 2
C2 = (0.03 * 255) ** 2

# 均值
mu1 = np.mean(img1)
mu2 = np.mean(img2)

# 方差和协方差
sigma1_sq = np.var(img1)
sigma2_sq = np.var(img2)
sigma12 = np.mean((img1 - mu1) * (img2 - mu2))

# SSIM计算
numerator = (2 * mu1 * mu2 + C1) * (2 * sigma12 + C2)
denominator = (mu1**2 + mu2**2 + C1) * (sigma1_sq + sigma2_sq + C2)

ssim = numerator / denominator
return 1 - ssim # 转换为失真度量

def estimate_rate(self, quantized_coeffs, method='entropy'):
"""估算码率"""
if method == 'entropy':
# 使用熵估算
from collections import Counter

# 展平系数
flat_coeffs = quantized_coeffs.flatten()

# 计算熵
counter = Counter(flat_coeffs)
total = len(flat_coeffs)

entropy = 0
for count in counter.values():
prob = count / total
if prob > 0:
entropy -= prob * np.log2(prob)

return entropy * total

elif method == 'huffman':
# 使用霍夫曼编码估算
flat_coeffs = quantized_coeffs.flatten()
_, encoded = huffman_encoding(flat_coeffs)
return len(encoded)

else:
# 简单估算:非零系数数量
return np.count_nonzero(quantized_coeffs) * 8

def rd_cost(self, original, reconstructed, quantized_coeffs, rate_method='entropy'):
"""计算率失真代价"""
distortion = self.calculate_distortion(original, reconstructed)
rate = self.estimate_rate(quantized_coeffs, rate_method)

# 率失真代价:D + λR
cost = distortion + self.lambda_factor * rate

return cost, distortion, rate

def optimize_quantization(self, dct_block, quant_table_base, qp_range=(1, 51)):
"""优化量化参数"""
best_qp = qp_range[0]
best_cost = float('inf')
best_reconstructed = None
best_quantized = None

results = []

for qp in range(qp_range[0], qp_range[1] + 1):
# 计算量化表
scale = max(1, qp // 6)
quant_table = quant_table_base * scale

# 量化
quantized = np.round(dct_block / quant_table)

# 反量化
reconstructed_dct = quantized * quant_table

# 计算率失真代价
cost, distortion, rate = self.rd_cost(dct_block, reconstructed_dct, quantized)

results.append({
'qp': qp,
'cost': cost,
'distortion': distortion,
'rate': rate,
'quantized': quantized,
'reconstructed': reconstructed_dct
})

if cost < best_cost:
best_cost = cost
best_qp = qp
best_reconstructed = reconstructed_dct
best_quantized = quantized

return best_qp, best_cost, best_reconstructed, best_quantized, results

# 率失真优化演示
def demo_rate_distortion_optimization():
# 创建测试DCT块
test_block = np.array([
[100, 80, 60, 40, 20, 10, 5, 2],
[80, 60, 40, 20, 10, 5, 2, 1],
[60, 40, 20, 10, 5, 2, 1, 0],
[40, 20, 10, 5, 2, 1, 0, 0],
[20, 10, 5, 2, 1, 0, 0, 0],
[10, 5, 2, 1, 0, 0, 0, 0],
[5, 2, 1, 0, 0, 0, 0, 0],
[2, 1, 0, 0, 0, 0, 0, 0]
], dtype=np.float32)

# JPEG量化表
quant_table_base = np.array([
[16, 11, 10, 16, 24, 40, 51, 61],
[12, 12, 14, 19, 26, 58, 60, 55],
[14, 13, 16, 24, 40, 57, 69, 56],
[14, 17, 22, 29, 51, 87, 80, 62],
[18, 22, 37, 56, 68, 109, 103, 77],
[24, 35, 55, 64, 81, 104, 113, 92],
[49, 64, 78, 87, 103, 121, 120, 101],
[72, 92, 95, 98, 112, 100, 103, 99]
], dtype=np.float32)

# 测试不同的λ值
lambda_values = [0.1, 1.0, 10.0, 100.0]

fig, axes = plt.subplots(2, 2, figsize=(12, 10))
axes = axes.flatten()

for i, lambda_val in enumerate(lambda_values):
print(f"\nλ = {lambda_val}:")

# 创建优化器
optimizer = RateDistortionOptimizer(lambda_factor=lambda_val)

# 优化量化参数
best_qp, best_cost, best_reconstructed, best_quantized, results = optimizer.optimize_quantization(
test_block, quant_table_base, qp_range=(1, 20)
)

print(f" 最优QP: {best_qp}")
print(f" 最优代价: {best_cost:.2f}")

# 提取结果数据
qps = [r['qp'] for r in results]
costs = [r['cost'] for r in results]
distortions = [r['distortion'] for r in results]
rates = [r['rate'] for r in results]

# 绘制率失真曲线
axes[i].plot(rates, distortions, 'b-o', markersize=4)

# 标记最优点
best_result = next(r for r in results if r['qp'] == best_qp)
axes[i].plot(best_result['rate'], best_result['distortion'], 'ro', markersize=8)

axes[i].set_xlabel('码率 (bits)')
axes[i].set_ylabel('失真 (MSE)')
axes[i].set_title(f'λ = {lambda_val}, 最优QP = {best_qp}')
axes[i].grid(True)

# 添加QP标签
for j, (rate, distortion, qp) in enumerate(zip(rates[::2], distortions[::2], qps[::2])):
axes[i].annotate(f'{qp}', (rate, distortion), xytext=(5, 5),
textcoords='offset points', fontsize=8)

plt.tight_layout()
plt.show()

# 显示不同QP下的量化结果
print("\n不同QP下的量化效果:")
optimizer = RateDistortionOptimizer(lambda_factor=1.0)

for qp in [1, 5, 10, 20]:
scale = max(1, qp // 6)
quant_table = quant_table_base * scale

quantized = np.round(test_block / quant_table)
reconstructed = quantized * quant_table

cost, distortion, rate = optimizer.rd_cost(test_block, reconstructed, quantized)

print(f" QP={qp}: 失真={distortion:.2f}, 码率={rate:.1f}, 代价={cost:.2f}")
print(f" 非零系数: {np.count_nonzero(quantized)}/64")

# 运行演示
demo_rate_distortion_optimization()

6. 总结

音视频编解码技术是一个复杂而精密的技术领域,涉及信号处理、信息论、心理声学/视觉学等多个学科。本文深入探讨了编解码技术的核心原理:

6.1 关键技术要点

  1. 信息论基础:熵、冗余分析和霍夫曼编码为压缩提供了理论基础
  2. 预测技术:帧内预测和帧间预测有效减少了空间和时间冗余
  3. 变换编码:DCT等变换将信号转换到频域,便于压缩
  4. 感知编码:利用人类感知特性,在保持主观质量的同时实现高压缩比
  5. 熵编码:算术编码和CABAC等技术进一步提高压缩效率
  6. 率失真优化:在码率和质量之间找到最佳平衡点

6.2 技术发展趋势

  • AI驱动的编解码:深度学习技术正在革命性地改变编解码领域
  • 端到端优化:从信号采集到显示的全链路优化
  • 自适应编码:根据内容特性动态调整编码策略
  • 低延迟编码:满足实时通信和交互应用的需求
  • 多模态编码:音视频联合编码,提高整体效率

6.3 实际应用考虑

在实际应用中,编解码技术的选择需要考虑:

  • 应用场景:直播、点播、视频会议等不同场景的需求
  • 硬件约束:编解码复杂度与硬件能力的匹配
  • 网络条件:带宽限制和传输特性
  • 质量要求:主观质量与客观指标的平衡
  • 兼容性:标准化和互操作性要求

编解码技术的发展永远在追求更高的压缩效率、更好的质量和更低的复杂度之间的最佳平衡。随着新技术的不断涌现,这个领域将继续快速发展,为数字媒体应用提供更强大的技术支撑。


本文通过理论分析和代码实现,深入探讨了音视频编解码技术的核心原理。这些技术不仅是现代多媒体系统的基础,也是未来智能媒体处理的重要组成部分。

版权所有,如有侵权请联系我