音视频传输协议与流媒体技术深度解析

音视频传输协议是现代流媒体系统的核心技术,它们决定了媒体数据如何在网络中高效、可靠地传输。本文将深入分析主流的音视频传输协议,包括实时传输协议及其控制协议(RTP/RTCP)、实时消息协议(RTMP)、HTTP自适应流协议(HLS、DASH)等,并通过代码实现展示各协议的工作原理。

1. 实时传输协议(RTP/RTCP)

1.1 RTP协议基础

RTP(Real-time Transport Protocol)是为实时音视频传输设计的协议,通常运行在UDP之上,通过头部中的时间戳、序列号等字段支持实时播放、丢包检测和乱序重排。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
import struct
import socket
import time
import threading
from typing import Optional, Tuple, List

class RTPPacket:
    """An RTP packet (RFC 3550, section 5.1).

    Handles the fixed 12-byte header, an optional CSRC list, an optional
    header extension (X bit) and optional padding (P bit).
    """

    def __init__(self, version=2, padding=False, extension=False, cc=0,
                 marker=False, payload_type=96, sequence_number=0,
                 timestamp=0, ssrc=0, payload=b'',
                 extension_profile=0, extension_data=b''):
        self.version = version
        self.padding = padding
        self.extension = extension
        self.cc = cc  # CSRC count
        self.marker = marker
        self.payload_type = payload_type
        self.sequence_number = sequence_number
        self.timestamp = timestamp
        self.ssrc = ssrc  # synchronization source identifier
        self.csrc_list = []  # contributing source identifiers
        self.payload = payload
        # Header extension, written/parsed only when `extension` is set:
        # a 16-bit profile-defined value followed by the extension body.
        self.extension_profile = extension_profile
        self.extension_data = extension_data

    def pack(self) -> bytes:
        """Serialize the packet to bytes.

        Header layout (first 32-bit word):
            |V=2|P|X|  CC   |M|     PT      |       sequence number         |
        followed by the 32-bit timestamp and the 32-bit SSRC.

        The CC field always equals the number of CSRC entries actually
        written (at most ``self.cc`` entries taken from ``csrc_list``).
        When ``extension`` is set, an extension header (profile, length in
        32-bit words) and the body, zero-padded to a 32-bit boundary, follow
        the CSRC list. When ``padding`` is set, padding octets are appended
        so the packet ends on a 32-bit boundary; the last octet holds the
        padding length (including itself).
        """
        csrcs = self.csrc_list[:self.cc & 0x0F]
        cc = len(csrcs)

        byte1 = ((self.version & 0x03) << 6) | (int(self.padding) << 5) | \
                (int(self.extension) << 4) | cc
        byte2 = (int(self.marker) << 7) | (self.payload_type & 0x7F)

        parts = [struct.pack('!BBHII',
                             byte1, byte2,
                             self.sequence_number & 0xFFFF,
                             self.timestamp & 0xFFFFFFFF,
                             self.ssrc & 0xFFFFFFFF)]
        parts.extend(struct.pack('!I', csrc & 0xFFFFFFFF) for csrc in csrcs)

        if self.extension:
            body = bytes(self.extension_data)
            body += b'\x00' * (-len(body) % 4)
            parts.append(struct.pack('!HH', self.extension_profile & 0xFFFF,
                                     len(body) // 4))
            parts.append(body)

        parts.append(self.payload)
        packet = b''.join(parts)

        if self.padding:
            # At least one padding octet is required to carry the count.
            pad_len = -len(packet) % 4 or 4
            packet += b'\x00' * (pad_len - 1) + bytes([pad_len])
        return packet

    @classmethod
    def unpack(cls, data: bytes) -> 'RTPPacket':
        """Parse an RTP packet from bytes.

        The CSRC list, header extension and padding are removed from the
        payload; the extension is exposed via ``extension_profile`` and
        ``extension_data``.

        Raises:
            ValueError: if the packet is shorter than its header fields
                claim, or the padding length is invalid.
        """
        if len(data) < 12:
            raise ValueError("RTP packet too short")

        byte1, byte2, seq_num, timestamp, ssrc = struct.unpack('!BBHII', data[:12])

        version = (byte1 >> 6) & 0x03
        padding = bool(byte1 & 0x20)
        extension = bool(byte1 & 0x10)
        cc = byte1 & 0x0F

        marker = bool(byte2 & 0x80)
        payload_type = byte2 & 0x7F

        packet = cls(version=version, padding=padding, extension=extension,
                     cc=cc, marker=marker, payload_type=payload_type,
                     sequence_number=seq_num, timestamp=timestamp, ssrc=ssrc)

        offset = 12
        csrc_end = offset + 4 * cc
        if csrc_end > len(data):
            raise ValueError("RTP packet too short for CSRC list")
        packet.csrc_list = list(struct.unpack(f'!{cc}I', data[offset:csrc_end]))
        offset = csrc_end

        if extension:
            if offset + 4 > len(data):
                raise ValueError("RTP packet too short for extension header")
            profile, words = struct.unpack('!HH', data[offset:offset + 4])
            offset += 4
            ext_end = offset + 4 * words
            if ext_end > len(data):
                raise ValueError("RTP header extension exceeds packet length")
            packet.extension_profile = profile
            packet.extension_data = data[offset:ext_end]
            offset = ext_end

        end = len(data)
        if padding:
            if end <= offset:
                raise ValueError("RTP padding bit set but packet has no padding")
            pad_len = data[-1]
            if pad_len == 0 or end - pad_len < offset:
                raise ValueError("Invalid RTP padding length")
            end -= pad_len

        packet.payload = data[offset:end]
        return packet

    def __str__(self):
        return (f"RTP(seq={self.sequence_number}, ts={self.timestamp}, "
                f"pt={self.payload_type}, ssrc={self.ssrc:08x}, "
                f"payload_len={len(self.payload)})")

class RTPSender:
    """Send RTP packets over UDP to a single destination.

    Tracks the outgoing sequence number (wraps modulo 2**16) and RTP
    timestamp (wraps modulo 2**32). Usable as a context manager so the
    socket is closed on exit.
    """

    def __init__(self, dest_ip: str, dest_port: int, payload_type: int = 96,
                 ssrc: Optional[int] = None):
        """Create the sender and its UDP socket.

        Args:
            dest_ip: destination IPv4 address.
            dest_port: destination UDP port.
            payload_type: 7-bit RTP payload type.
            ssrc: synchronization source id; derived from the current time
                when omitted. An explicit 0 is honored.
        """
        self.dest_ip = dest_ip
        self.dest_port = dest_port
        self.payload_type = payload_type
        # Compare with None: `ssrc or ...` would discard a legitimate SSRC of 0.
        if ssrc is None:
            ssrc = int(time.time() * 1000)
        self.ssrc = ssrc & 0xFFFFFFFF

        # Per-packet state, advanced by send_packet().
        self.sequence_number = 0
        self.timestamp = 0

        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

        print(f"RTP Sender initialized: {dest_ip}:{dest_port}, SSRC: {self.ssrc:08x}")

    def send_packet(self, payload: bytes, timestamp_increment: int = 3600,
                    marker: bool = False) -> bool:
        """Send one RTP packet and advance sequence number and timestamp.

        Args:
            payload: packet payload bytes.
            timestamp_increment: RTP clock ticks added after this packet.
            marker: value of the M bit.

        Returns:
            True on success, False if building or sending failed (the error
            is printed).
        """
        try:
            packet = RTPPacket(
                payload_type=self.payload_type,
                sequence_number=self.sequence_number,
                timestamp=self.timestamp,
                ssrc=self.ssrc,
                marker=marker,
                payload=payload
            )

            data = packet.pack()
            self.socket.sendto(data, (self.dest_ip, self.dest_port))

            self.sequence_number = (self.sequence_number + 1) & 0xFFFF
            self.timestamp = (self.timestamp + timestamp_increment) & 0xFFFFFFFF

            print(f"Sent: {packet}")
            return True

        except Exception as e:
            print(f"Send error: {e}")
            return False

    def close(self):
        """Close the UDP socket."""
        self.socket.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

class RTPReceiver:
    """Receive RTP packets on a UDP port using a background thread.

    Statistics kept: packets and bytes received, and packets lost. Loss is
    inferred from forward gaps in the 16-bit sequence number.
    """

    # Sequence distances (mod 2**16) at or above this value are treated as
    # late or duplicate packets rather than as a forward gap.
    _REORDER_THRESHOLD = 0x8000

    def __init__(self, listen_port: int, poll_interval: float = 0.2):
        """Bind a UDP socket on all interfaces.

        Args:
            listen_port: UDP port to bind (0 picks an ephemeral port).
            poll_interval: receive timeout in seconds. It bounds how long
                stop_receiving() waits for the loop to notice the stop
                request. None makes the socket fully blocking.
        """
        self.listen_port = listen_port
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.socket.bind(('', listen_port))
        # Without a timeout, recvfrom() blocks forever once traffic stops,
        # and stop_receiving()'s join() never returns.
        self.socket.settimeout(poll_interval)

        self.packets_received = 0
        self.bytes_received = 0
        self.last_sequence = None  # highest sequence number seen so far
        self.packets_lost = 0

        self.running = False
        self.receive_thread = None

        print(f"RTP Receiver listening on port {listen_port}")

    def start_receiving(self, callback=None):
        """Start the receive thread.

        callback(packet, addr) is called for every parsed packet.
        """
        self.running = True
        self.receive_thread = threading.Thread(target=self._receive_loop, args=(callback,))
        self.receive_thread.start()

    def _receive_loop(self, callback):
        """Receive, parse and account for packets until running is cleared."""
        while self.running:
            try:
                # 65535 covers any UDP datagram; a smaller buffer would
                # truncate larger packets.
                data, addr = self.socket.recvfrom(65535)
                packet = RTPPacket.unpack(data)

                self.packets_received += 1
                self.bytes_received += len(data)
                self._track_sequence(packet.sequence_number)

                print(f"Received: {packet} from {addr}")

                if callback:
                    callback(packet, addr)

            except socket.timeout:
                continue  # idle: loop back and re-check self.running
            except Exception as e:
                if self.running:
                    print(f"Receive error: {e}")

    def _track_sequence(self, seq: int) -> None:
        """Update loss accounting for a packet carrying sequence number seq."""
        if self.last_sequence is None:
            self.last_sequence = seq
            return

        expected_seq = (self.last_sequence + 1) & 0xFFFF
        gap = (seq - expected_seq) & 0xFFFF
        if gap >= self._REORDER_THRESHOLD:
            # Late or duplicate packet: not a new gap. Its earlier absence may
            # already have been counted as lost; keep the highest sequence.
            return
        if gap:
            self.packets_lost += gap
            print(f"Packet loss detected: expected {expected_seq}, got {seq}")
        self.last_sequence = seq

    def stop_receiving(self):
        """Signal the receive thread to stop and wait for it to exit."""
        self.running = False
        if self.receive_thread:
            self.receive_thread.join()

    def get_statistics(self) -> dict:
        """Return received/lost counters and the loss rate."""
        return {
            'packets_received': self.packets_received,
            'bytes_received': self.bytes_received,
            'packets_lost': self.packets_lost,
            'loss_rate': self.packets_lost / max(1, self.packets_received + self.packets_lost)
        }

    def close(self):
        """Stop receiving and close the socket."""
        self.stop_receiving()
        self.socket.close()

# RTP transmission demo
def demo_rtp_transmission(port: int = 5004):
    """Send a few RTP packets over loopback and print receiver statistics.

    Args:
        port: UDP port the demo receiver binds and the sender targets.
    """
    receiver = RTPReceiver(port)
    sender = None
    try:
        def packet_handler(packet, addr):
            # Slicing already handles payloads shorter than 20 bytes.
            print(f" Payload preview: {packet.payload[:20]}")

        receiver.start_receiving(packet_handler)

        # Give the receive thread a moment to start.
        time.sleep(0.1)

        sender = RTPSender('127.0.0.1', port, payload_type=96)

        test_payloads = [
            b'Hello RTP World!',
            b'This is packet 2',
            b'Audio data simulation',
            b'Video frame data',
            b'End of stream'
        ]

        print("\nSending RTP packets...")
        last_index = len(test_payloads) - 1
        for i, payload in enumerate(test_payloads):
            # Set the marker bit on the final packet.
            sender.send_packet(payload, timestamp_increment=3600, marker=(i == last_index))
            time.sleep(0.1)

        # Let in-flight packets arrive before reading statistics.
        time.sleep(0.5)

        stats = receiver.get_statistics()
        print("\nReceiver Statistics:")
        print(f" Packets received: {stats['packets_received']}")
        print(f" Bytes received: {stats['bytes_received']}")
        print(f" Packets lost: {stats['packets_lost']}")
        print(f" Loss rate: {stats['loss_rate']:.2%}")
    finally:
        # Release both sockets even if sending or printing failed.
        if sender is not None:
            sender.close()
        receiver.close()

# Run the RTP demo at module load time (it binds UDP port 5004 and sends to it).
demo_rtp_transmission()

1.2 RTCP控制协议

RTCP(RTP Control Protocol)与RTP配合使用:参与方周期性地发送发送者报告(SR)和接收者报告(RR),反馈丢包率、抖动等传输质量统计信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
class RTCPPacket:
    """Common base for RTCP packets, owning the 4-byte common header.

    Header layout: V(2 bits) | P(1) | count(5) | PT(8) | length(16), where
    length is the packet size in 32-bit words minus one.
    """

    def __init__(self, version=2, padding=False, reception_report_count=0,
                 packet_type=0, length=0):
        self.version = version
        self.padding = padding
        self.reception_report_count = reception_report_count
        self.packet_type = packet_type
        self.length = length

    def pack_header(self) -> bytes:
        """Return the common header in network byte order."""
        first_octet = self.version << 6
        first_octet |= int(self.padding) << 5
        first_octet |= self.reception_report_count & 0x1F
        return struct.pack('!BBH', first_octet, self.packet_type, self.length)

class RTCPSenderReport(RTCPPacket):
    """RTCP Sender Report (SR, packet type 200), RFC 3550 section 6.4.1.

    Report blocks are kept in ``reception_reports`` as dicts with the keys
    source_ssrc, fraction_lost, cumulative_lost, highest_seq_received,
    interarrival_jitter, last_sr_timestamp and delay_since_last_sr.
    Objects that provide ``pack_reception_report()`` are packed through
    that method instead.
    """

    _FIXED_SIZE = 28   # common header (4) + SSRC (4) + sender info (20) bytes
    _BLOCK_SIZE = 24   # one reception report block
    _MAX_BLOCKS = 31   # the count field is 5 bits wide

    def __init__(self, ssrc=0, ntp_timestamp_msw=0, ntp_timestamp_lsw=0,
                 rtp_timestamp=0, sender_packet_count=0, sender_octet_count=0):
        super().__init__(packet_type=200)  # SR = 200
        self.ssrc = ssrc
        self.ntp_timestamp_msw = ntp_timestamp_msw  # high 32 bits of the NTP timestamp
        self.ntp_timestamp_lsw = ntp_timestamp_lsw  # low 32 bits of the NTP timestamp
        self.rtp_timestamp = rtp_timestamp
        self.sender_packet_count = sender_packet_count
        self.sender_octet_count = sender_octet_count
        self.reception_reports = []

    def add_reception_report(self, source_ssrc, fraction_lost, cumulative_lost,
                             highest_seq_received, interarrival_jitter,
                             last_sr_timestamp, delay_since_last_sr):
        """Append one reception report block (stored as a dict)."""
        self.reception_reports.append({
            'source_ssrc': source_ssrc,
            'fraction_lost': fraction_lost,
            'cumulative_lost': cumulative_lost,
            'highest_seq_received': highest_seq_received,
            'interarrival_jitter': interarrival_jitter,
            'last_sr_timestamp': last_sr_timestamp,
            'delay_since_last_sr': delay_since_last_sr
        })

    @staticmethod
    def _pack_report_block(report) -> bytes:
        """Serialize one 24-byte reception report block."""
        if hasattr(report, 'pack_reception_report'):
            return report.pack_reception_report()
        # cumulative_lost is a signed 24-bit field; masking yields two's complement.
        fraction_and_lost = ((report['fraction_lost'] & 0xFF) << 24) | \
                            (report['cumulative_lost'] & 0xFFFFFF)
        return struct.pack('!IIIIII',
                           report['source_ssrc'] & 0xFFFFFFFF,
                           fraction_and_lost,
                           report['highest_seq_received'] & 0xFFFFFFFF,
                           report['interarrival_jitter'] & 0xFFFFFFFF,
                           report['last_sr_timestamp'] & 0xFFFFFFFF,
                           report['delay_since_last_sr'] & 0xFFFFFFFF)

    @staticmethod
    def _unpack_report_block(block: bytes) -> dict:
        """Parse one 24-byte reception report block into a dict."""
        (source_ssrc, fraction_and_lost, highest_seq, jitter,
         lsr, dlsr) = struct.unpack('!IIIIII', block)
        cumulative_lost = fraction_and_lost & 0xFFFFFF
        if cumulative_lost & 0x800000:  # sign-extend the 24-bit field
            cumulative_lost -= 0x1000000
        return {
            'source_ssrc': source_ssrc,
            'fraction_lost': fraction_and_lost >> 24,
            'cumulative_lost': cumulative_lost,
            'highest_seq_received': highest_seq,
            'interarrival_jitter': jitter,
            'last_sr_timestamp': lsr,
            'delay_since_last_sr': dlsr
        }

    def pack(self) -> bytes:
        """Serialize the SR, including any reception report blocks.

        Raises:
            ValueError: if more than 31 report blocks are attached.
        """
        count = len(self.reception_reports)
        if count > self._MAX_BLOCKS:
            raise ValueError("an SR can carry at most 31 reception report blocks")
        self.reception_report_count = count
        # Length in 32-bit words minus one: header (1) + SSRC (1) +
        # sender info (5) = 7 words, plus 6 words per report block.
        self.length = 6 + 6 * count

        parts = [
            self.pack_header(),
            struct.pack('!IIIIII',
                        self.ssrc & 0xFFFFFFFF,
                        self.ntp_timestamp_msw & 0xFFFFFFFF,
                        self.ntp_timestamp_lsw & 0xFFFFFFFF,
                        self.rtp_timestamp & 0xFFFFFFFF,
                        self.sender_packet_count & 0xFFFFFFFF,
                        self.sender_octet_count & 0xFFFFFFFF),
        ]
        parts.extend(self._pack_report_block(rr) for rr in self.reception_reports)
        return b''.join(parts)

    @classmethod
    def unpack(cls, data: bytes) -> 'RTCPSenderReport':
        """Parse an SR, including its reception report blocks.

        Raises:
            ValueError: if the data is too short or is not an SR.
        """
        if len(data) < cls._FIXED_SIZE:
            raise ValueError("RTCP SR packet too short")

        byte1, packet_type, length = struct.unpack('!BBH', data[:4])

        if packet_type != 200:
            raise ValueError("Not a Sender Report packet")

        rc = byte1 & 0x1F

        ssrc, ntp_msw, ntp_lsw, rtp_ts, pkt_count, octet_count = \
            struct.unpack('!IIIIII', data[4:cls._FIXED_SIZE])

        sr = cls(ssrc=ssrc, ntp_timestamp_msw=ntp_msw,
                 ntp_timestamp_lsw=ntp_lsw, rtp_timestamp=rtp_ts,
                 sender_packet_count=pkt_count, sender_octet_count=octet_count)
        sr.version = (byte1 >> 6) & 0x03
        sr.padding = bool(byte1 & 0x20)
        sr.length = length

        end = cls._FIXED_SIZE + cls._BLOCK_SIZE * rc
        if len(data) < end:
            raise ValueError("RTCP SR packet too short for its report blocks")
        for off in range(cls._FIXED_SIZE, end, cls._BLOCK_SIZE):
            sr.reception_reports.append(
                cls._unpack_report_block(data[off:off + cls._BLOCK_SIZE]))
        sr.reception_report_count = rc

        return sr

class RTCPReceiverReport(RTCPPacket):
    """RTCP Receiver Report (RR, packet type 201), RFC 3550 section 6.4.2.

    Each reception report block is stored as a dict in ``reception_reports``.
    """

    _BLOCK_SIZE = 24   # one reception report block
    _MAX_BLOCKS = 31   # the count field is 5 bits wide

    def __init__(self, ssrc=0):
        super().__init__(packet_type=201)  # RR = 201
        self.ssrc = ssrc
        self.reception_reports = []

    def add_reception_report(self, source_ssrc, fraction_lost, cumulative_lost,
                             highest_seq_received, interarrival_jitter,
                             last_sr_timestamp, delay_since_last_sr):
        """Append one reception report block."""
        report = {
            'source_ssrc': source_ssrc,
            'fraction_lost': fraction_lost,
            'cumulative_lost': cumulative_lost,
            'highest_seq_received': highest_seq_received,
            'interarrival_jitter': interarrival_jitter,
            'last_sr_timestamp': last_sr_timestamp,
            'delay_since_last_sr': delay_since_last_sr
        }
        self.reception_reports.append(report)

    def pack(self) -> bytes:
        """Serialize the RR.

        Raises:
            ValueError: if more than 31 report blocks are attached.
        """
        count = len(self.reception_reports)
        if count > self._MAX_BLOCKS:
            raise ValueError("an RR can carry at most 31 reception report blocks")
        self.reception_report_count = count
        # Length in 32-bit words minus one: header (1) + SSRC (1) = 2 words,
        # plus 6 words per report block.
        self.length = 1 + 6 * count

        parts = [self.pack_header(), struct.pack('!I', self.ssrc & 0xFFFFFFFF)]
        for rr in self.reception_reports:
            # 8-bit fraction lost + signed 24-bit cumulative lost (two's complement).
            fraction_and_lost = ((rr['fraction_lost'] & 0xFF) << 24) | \
                                (rr['cumulative_lost'] & 0xFFFFFF)
            parts.append(struct.pack('!IIIIII',
                                     rr['source_ssrc'] & 0xFFFFFFFF,
                                     fraction_and_lost,
                                     rr['highest_seq_received'] & 0xFFFFFFFF,
                                     rr['interarrival_jitter'] & 0xFFFFFFFF,
                                     rr['last_sr_timestamp'] & 0xFFFFFFFF,
                                     rr['delay_since_last_sr'] & 0xFFFFFFFF))
        return b''.join(parts)

    @classmethod
    def unpack(cls, data: bytes) -> 'RTCPReceiverReport':
        """Parse an RR and its reception report blocks.

        Raises:
            ValueError: if the data is too short or is not an RR.
        """
        if len(data) < 8:
            raise ValueError("RTCP RR packet too short")

        byte1, packet_type, length = struct.unpack('!BBH', data[:4])
        if packet_type != 201:
            raise ValueError("Not a Receiver Report packet")

        rc = byte1 & 0x1F
        end = 8 + cls._BLOCK_SIZE * rc
        if len(data) < end:
            raise ValueError("RTCP RR packet too short for its report blocks")

        rr = cls(ssrc=struct.unpack('!I', data[4:8])[0])
        rr.version = (byte1 >> 6) & 0x03
        rr.padding = bool(byte1 & 0x20)
        rr.length = length

        for off in range(8, end, cls._BLOCK_SIZE):
            (source_ssrc, fraction_and_lost, highest_seq, jitter,
             lsr, dlsr) = struct.unpack('!IIIIII', data[off:off + cls._BLOCK_SIZE])
            cumulative_lost = fraction_and_lost & 0xFFFFFF
            if cumulative_lost & 0x800000:  # sign-extend the 24-bit field
                cumulative_lost -= 0x1000000
            rr.add_reception_report(source_ssrc, fraction_and_lost >> 24,
                                    cumulative_lost, highest_seq, jitter,
                                    lsr, dlsr)
        rr.reception_report_count = rc
        return rr

class RTCPManager:
    """Track RTP statistics and build/send RTCP Sender or Receiver Reports.

    Receive-side bookkeeping follows RFC 3550 Appendix A in simplified form:
    - extended highest sequence number;
    - cumulative and per-interval loss;
    - interarrival jitter.
    LSR/DLSR are not tracked and are reported as 0.
    """

    def __init__(self, local_ssrc: int, rtcp_port: int, clock_rate: int = 90000):
        """Bind the RTCP socket.

        Args:
            local_ssrc: SSRC placed in outgoing reports.
            rtcp_port: local UDP port to bind (0 picks an ephemeral port).
            clock_rate: RTP clock rate in Hz. It sets the jitter units and
                the SR RTP timestamp. 90000 was the previous hard-coded value.
        """
        self.local_ssrc = local_ssrc
        self.rtcp_port = rtcp_port
        self.clock_rate = clock_rate

        # Send-side counters reported in Sender Reports.
        self.packets_sent = 0
        self.octets_sent = 0
        self.start_time = time.time()

        # Receive-side statistics keyed by remote SSRC.
        self.remote_sources = {}

        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.socket.bind(('', rtcp_port))

        self.running = False
        self.rtcp_thread = None
        # Wakes the report thread immediately on stop instead of letting it
        # sleep out the rest of its interval.
        self._stop_event = threading.Event()

        print(f"RTCP Manager initialized on port {rtcp_port}, SSRC: {local_ssrc:08x}")

    def update_send_statistics(self, packet_size: int):
        """Count one sent RTP packet of packet_size octets."""
        self.packets_sent += 1
        self.octets_sent += packet_size

    def update_receive_statistics(self, packet: 'RTPPacket'):
        """Fold one received RTP packet into its source's statistics.

        packet needs ``ssrc``, ``sequence_number``, ``timestamp`` and
        ``payload`` attributes.
        """
        ssrc = packet.ssrc
        seq = packet.sequence_number & 0xFFFF

        source = self.remote_sources.get(ssrc)
        if source is None:
            source = {
                'packets_received': 0,
                'octets_received': 0,
                'last_seq': None,
                'base_seq': seq,        # first sequence number seen
                'max_seq': seq,         # highest 16-bit sequence number seen
                'highest_seq': seq,     # extended: seq_cycles * 65536 + max_seq
                'seq_cycles': 0,
                'packets_lost': 0,      # expected - received (may be negative)
                'jitter': 0.0,          # seconds
                'last_arrival_time': 0,
                'last_rtp_timestamp': 0,
                'expected_prior': 0,    # snapshot at the previous report
                'received_prior': 0,
            }
            self.remote_sources[ssrc] = source

        source['packets_received'] += 1
        source['octets_received'] += len(packet.payload)

        # Forward distance (mod 2**16) from the highest sequence seen.
        # Distances in the upper half belong to late or duplicate packets and
        # must not advance the maximum or bump the wrap count.
        delta = (seq - source['max_seq']) & 0xFFFF
        if 0 < delta < 0x8000:
            if seq < source['max_seq']:
                source['seq_cycles'] += 1  # sequence number wrapped
            source['max_seq'] = seq
            source['highest_seq'] = source['seq_cycles'] * 65536 + seq
        source['last_seq'] = seq

        expected = source['highest_seq'] - source['base_seq'] + 1
        source['packets_lost'] = expected - source['packets_received']

        # Interarrival jitter, RFC 3550 6.4.1: J += (|D| - J) / 16, in seconds.
        now = time.time()
        if source['last_arrival_time'] > 0:
            arrival_diff = now - source['last_arrival_time']
            # Signed 32-bit difference, so timestamp wraparound is handled.
            ts_diff = ((packet.timestamp - source['last_rtp_timestamp'] + 0x80000000)
                       & 0xFFFFFFFF) - 0x80000000
            d = abs(arrival_diff - ts_diff / self.clock_rate)
            source['jitter'] += (d - source['jitter']) / 16

        source['last_arrival_time'] = now
        source['last_rtp_timestamp'] = packet.timestamp

    def _loss_statistics(self, source: dict) -> tuple:
        """Return (fraction_lost, cumulative_lost) and start a new interval.

        fraction_lost is the 8-bit fixed-point fraction of packets lost
        since the previous call (RFC 3550 A.3). cumulative_lost is clamped
        to the signed 24-bit range of the report field.
        """
        expected = source['highest_seq'] - source['base_seq'] + 1
        received = source['packets_received']

        expected_interval = expected - source['expected_prior']
        received_interval = received - source['received_prior']
        source['expected_prior'] = expected
        source['received_prior'] = received

        lost_interval = expected_interval - received_interval
        if expected_interval <= 0 or lost_interval <= 0:
            fraction_lost = 0
        else:
            fraction_lost = min(255, (lost_interval << 8) // expected_interval)

        cumulative_lost = max(-0x800000, min(0x7FFFFF, expected - received))
        return fraction_lost, cumulative_lost

    def generate_sender_report(self) -> 'RTCPSenderReport':
        """Build a Sender Report from the send counters and the current time."""
        now = time.time()  # one clock read, so the NTP and RTP timestamps agree
        ntp_time = now + 2208988800  # seconds from the NTP epoch (1900) to the Unix epoch (1970)
        ntp_msw = int(ntp_time)
        ntp_lsw = int((ntp_time - ntp_msw) * (1 << 32)) & 0xFFFFFFFF

        # RTP timestamp derived from elapsed time (simplified: not tied to
        # the timestamps actually sent).
        rtp_timestamp = int((now - self.start_time) * self.clock_rate) & 0xFFFFFFFF

        return RTCPSenderReport(
            ssrc=self.local_ssrc,
            ntp_timestamp_msw=ntp_msw & 0xFFFFFFFF,
            ntp_timestamp_lsw=ntp_lsw,
            rtp_timestamp=rtp_timestamp,
            sender_packet_count=self.packets_sent,
            sender_octet_count=self.octets_sent
        )

    def generate_receiver_report(self) -> 'RTCPReceiverReport':
        """Build a Receiver Report with one block per remote source.

        Side effect: starts a new loss interval for every source, because
        fraction_lost is defined per reporting interval.
        """
        rr = RTCPReceiverReport(ssrc=self.local_ssrc)

        for ssrc, source in self.remote_sources.items():
            fraction_lost, cumulative_lost = self._loss_statistics(source)
            rr.add_reception_report(
                source_ssrc=ssrc,
                fraction_lost=fraction_lost,
                cumulative_lost=cumulative_lost,
                highest_seq_received=source['highest_seq'] & 0xFFFFFFFF,
                interarrival_jitter=int(source['jitter'] * self.clock_rate),  # RTP clock units
                last_sr_timestamp=0,    # LSR not tracked
                delay_since_last_sr=0   # DLSR not tracked
            )

        return rr

    def send_rtcp_report(self, dest_ip: str, dest_port: int):
        """Send an SR if any RTP packets were sent, otherwise an RR.

        Errors are printed, not raised. The SR carries no reception blocks.
        """
        try:
            if self.packets_sent > 0:
                data = self.generate_sender_report().pack()
            else:
                data = self.generate_receiver_report().pack()

            self.socket.sendto(data, (dest_ip, dest_port))
            print(f"Sent RTCP report to {dest_ip}:{dest_port} ({len(data)} bytes)")

        except Exception as e:
            print(f"RTCP send error: {e}")

    def start_rtcp_loop(self, dest_ip: str, dest_port: int, interval: float = 5.0):
        """Send a report every `interval` seconds on a background thread."""
        self.running = True
        self._stop_event.clear()

        def rtcp_loop():
            while self.running:
                # wait() returns True as soon as stop_rtcp_loop() sets the event.
                if self._stop_event.wait(interval):
                    break
                if self.running:
                    self.send_rtcp_report(dest_ip, dest_port)

        self.rtcp_thread = threading.Thread(target=rtcp_loop)
        self.rtcp_thread.start()

    def stop_rtcp_loop(self):
        """Stop the report thread promptly and wait for it to exit."""
        self.running = False
        self._stop_event.set()
        if self.rtcp_thread:
            self.rtcp_thread.join()

    def close(self):
        """Stop the report loop and close the socket."""
        self.stop_rtcp_loop()
        self.socket.close()

# RTCP demo
def demo_rtcp(rtcp_port: int = 5005):
    """Feed synthetic traffic into an RTCPManager and print the reports it builds.

    Args:
        rtcp_port: local UDP port the manager binds.
    """
    rtcp_manager = RTCPManager(local_ssrc=0x12345678, rtcp_port=rtcp_port)
    try:
        # Simulate 10 RTP packets received from one remote source.
        for i in range(10):
            fake_packet = RTPPacket(
                sequence_number=i,
                timestamp=i * 3600,
                ssrc=0x87654321,
                payload=f"Packet {i}".encode()
            )
            rtcp_manager.update_receive_statistics(fake_packet)

        # Simulate 5 sent packets of 100 bytes each.
        for _ in range(5):
            rtcp_manager.update_send_statistics(100)

        print("\nGenerating RTCP reports...")

        sr = rtcp_manager.generate_sender_report()
        print(f"Sender Report: SSRC={sr.ssrc:08x}, packets={sr.sender_packet_count}, octets={sr.sender_octet_count}")

        rr = rtcp_manager.generate_receiver_report()
        print(f"Receiver Report: SSRC={rr.ssrc:08x}, reports={len(rr.reception_reports)}")

        for i, report in enumerate(rr.reception_reports):
            print(f" Report {i}: SSRC={report['source_ssrc']:08x}, lost={report['cumulative_lost']}, jitter={report['interarrival_jitter']}")
    finally:
        # Release the bound RTCP socket even if report generation failed.
        rtcp_manager.close()

# Run the RTCP demo at module load time (its RTCPManager binds UDP port 5005).
demo_rtcp()

2. RTMP流媒体协议

2.1 RTMP协议实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
import struct
import hashlib
import random
from enum import IntEnum
from typing import Dict, Any, Optional

class RTMPMessageType(IntEnum):
    """RTMP message type IDs, as carried in the chunk message header."""

    # Control messages
    SET_CHUNK_SIZE = 0x01
    ABORT_MESSAGE = 0x02
    ACKNOWLEDGEMENT = 0x03
    USER_CONTROL_MESSAGE = 0x04
    WINDOW_ACKNOWLEDGEMENT_SIZE = 0x05
    SET_PEER_BANDWIDTH = 0x06

    # Media payloads
    AUDIO = 0x08
    VIDEO = 0x09

    # AMF3-encoded data, shared object and command messages
    DATA_AMF3 = 0x0F
    SHARED_OBJECT_AMF3 = 0x10
    COMMAND_AMF3 = 0x11

    # AMF0-encoded data, shared object and command messages
    DATA_AMF0 = 0x12
    SHARED_OBJECT_AMF0 = 0x13
    COMMAND_AMF0 = 0x14

    AGGREGATE = 0x16

class RTMPChunk:
    """One RTMP chunk: chunk stream id, message header fields and data.

    ``timestamp`` is written into the timestamp field of whichever header
    format is packed. For formats 1 and 2 that field is a timestamp delta,
    so callers must store the delta there.
    """

    # Timestamps >= this value are sent as this 24-bit sentinel followed by
    # a 32-bit extended timestamp field.
    EXTENDED_TIMESTAMP = 0xFFFFFF

    def __init__(self, chunk_stream_id=0, timestamp=0, message_length=0,
                 message_type=0, message_stream_id=0, data=b''):
        self.chunk_stream_id = chunk_stream_id
        self.timestamp = timestamp
        self.message_length = message_length
        self.message_type = message_type
        self.message_stream_id = message_stream_id
        self.data = data

    @staticmethod
    def _uint24(value: int) -> bytes:
        """Encode value as a 3-byte big-endian unsigned integer."""
        return (value & 0xFFFFFF).to_bytes(3, 'big')

    def _pack_basic_header(self, fmt: int) -> bytes:
        """Encode the 1-, 2- or 3-byte basic header.

        Raises:
            ValueError: for ids outside 2..65599. Ids 0 and 1 are reserved
                as markers for the longer header forms.
        """
        csid = self.chunk_stream_id
        if not 2 <= csid <= 65599:
            raise ValueError(f"Invalid RTMP chunk stream id: {csid}")
        fmt_bits = (fmt & 0x03) << 6
        if csid < 64:
            return bytes([fmt_bits | csid])
        if csid < 320:
            return bytes([fmt_bits, csid - 64])
        # 3-byte form: (id - 64) stored little-endian.
        rel = csid - 64
        return bytes([fmt_bits | 1, rel & 0xFF, rel >> 8])

    def pack_header(self, fmt: int) -> bytes:
        """Encode the basic header plus the message header for format fmt.

        Message header sizes are 11 (fmt 0), 7 (fmt 1), 3 (fmt 2) or 0
        (fmt 3) bytes. Multi-byte fields are big-endian, except the
        message stream id, which is little-endian. A 4-byte extended
        timestamp follows for every format when ``timestamp`` >= 0xFFFFFF,
        including Type 3 chunks.

        Raises:
            ValueError: for an invalid chunk stream id, or (fmt 0/1) a
                message length that does not fit in 24 bits.
        """
        header = self._pack_basic_header(fmt)
        extended = self.timestamp >= self.EXTENDED_TIMESTAMP
        ts_field = self._uint24(self.EXTENDED_TIMESTAMP if extended else self.timestamp)

        if fmt in (0, 1) and not 0 <= self.message_length <= 0xFFFFFF:
            raise ValueError(f"RTMP message length out of range: {self.message_length}")

        if fmt == 0:
            header += (ts_field
                       + self._uint24(self.message_length)
                       + bytes([self.message_type & 0xFF])
                       + struct.pack('<I', self.message_stream_id & 0xFFFFFFFF))
        elif fmt == 1:
            header += (ts_field
                       + self._uint24(self.message_length)
                       + bytes([self.message_type & 0xFF]))
        elif fmt == 2:
            header += ts_field
        # fmt 3: no message header.

        if extended:
            header += struct.pack('!I', self.timestamp & 0xFFFFFFFF)

        return header

class AMF0Encoder:
    """Encode Python values as AMF0 (Action Message Format version 0)."""

    @staticmethod
    def encode_number(value: float) -> bytes:
        """Encode a number: marker 0x00 followed by an IEEE-754 double."""
        return struct.pack('!Bd', 0x00, value)

    @staticmethod
    def encode_boolean(value: bool) -> bytes:
        """Encode a boolean: marker 0x01 followed by one byte (0 or 1)."""
        return struct.pack('!BB', 0x01, 1 if value else 0)

    @staticmethod
    def encode_string(value: str) -> bytes:
        """Encode a UTF-8 string.

        Uses marker 0x02 with a 16-bit length, or long-string marker 0x0C
        with a 32-bit length when the encoded text exceeds 65535 bytes.
        """
        utf8_bytes = value.encode('utf-8')
        if len(utf8_bytes) > 0xFFFF:
            return struct.pack('!BI', 0x0C, len(utf8_bytes)) + utf8_bytes
        return struct.pack('!BH', 0x02, len(utf8_bytes)) + utf8_bytes

    @staticmethod
    def encode_value(value: Any) -> bytes:
        """Encode a single value, choosing the AMF0 type from its Python type.

        bool, int/float, str, None and dict are mapped directly. Anything
        else is encoded as its str().
        """
        # bool must be tested before int: bool is a subclass of int.
        if isinstance(value, bool):
            return AMF0Encoder.encode_boolean(value)
        if isinstance(value, (int, float)):
            return AMF0Encoder.encode_number(float(value))
        if isinstance(value, str):
            return AMF0Encoder.encode_string(value)
        if value is None:
            return AMF0Encoder.encode_null()
        if isinstance(value, dict):
            return AMF0Encoder.encode_object(value)
        return AMF0Encoder.encode_string(str(value))

    @staticmethod
    def encode_object(obj: Dict[str, Any]) -> bytes:
        """Encode a dict as an AMF0 object (marker 0x03).

        Each property is a 16-bit-length UTF-8 name (no type marker) followed
        by its encoded value. The object ends with an empty name and the
        0x09 end marker. Nested dicts become nested objects.
        """
        parts = [b'\x03']
        for key, value in obj.items():
            key_bytes = key.encode('utf-8')
            parts.append(struct.pack('!H', len(key_bytes)) + key_bytes)
            parts.append(AMF0Encoder.encode_value(value))
        parts.append(b'\x00\x00\x09')  # object end marker
        return b''.join(parts)

    @staticmethod
    def encode_null() -> bytes:
        """Encode null (marker 0x05)."""
        return struct.pack('!B', 0x05)

class AMF0Decoder:
    """Decode AMF0-encoded values."""

    @staticmethod
    def _require(data: bytes, offset: int, size: int, what: str) -> None:
        """Raise ValueError unless `size` bytes are available at `offset`."""
        if offset + size > len(data):
            raise ValueError(f"Insufficient data for {what}")

    @staticmethod
    def _decode_properties(data: bytes, offset: int) -> tuple:
        """Decode name/value pairs up to the object end marker.

        Shared by anonymous objects (0x03) and ECMA arrays (0x08).
        Returns (dict, offset just past the end marker).

        Raises:
            ValueError: if the data ends before the end marker.
        """
        obj = {}
        while True:
            AMF0Decoder._require(data, offset, 2, "object property name")
            name_length = struct.unpack('!H', data[offset:offset + 2])[0]
            offset += 2

            # An empty name followed by 0x09 terminates the object.
            if name_length == 0 and offset < len(data) and data[offset] == 0x09:
                return obj, offset + 1

            AMF0Decoder._require(data, offset, name_length, "object property name")
            name = data[offset:offset + name_length].decode('utf-8')
            offset += name_length

            if offset >= len(data):
                raise ValueError("Insufficient data for object property value")
            value, offset = AMF0Decoder.decode(data, offset)
            obj[name] = value

    @staticmethod
    def decode(data: bytes, offset: int = 0) -> tuple:
        """Decode one AMF0 value starting at `offset`.

        Supported types: number, boolean, string, object, null, undefined,
        ECMA array, strict array and long string.

        Returns:
            (value, new_offset), or (None, offset) when offset is already
            at or past the end of data.

        Raises:
            ValueError: on truncated input or an unsupported type marker.
        """
        if offset >= len(data):
            return None, offset

        type_marker = data[offset]
        offset += 1

        if type_marker == 0x00:  # Number (IEEE-754 double)
            AMF0Decoder._require(data, offset, 8, "number")
            return struct.unpack('!d', data[offset:offset + 8])[0], offset + 8

        if type_marker == 0x01:  # Boolean
            AMF0Decoder._require(data, offset, 1, "boolean")
            return bool(data[offset]), offset + 1

        if type_marker == 0x02:  # String (16-bit length)
            AMF0Decoder._require(data, offset, 2, "string length")
            length = struct.unpack('!H', data[offset:offset + 2])[0]
            offset += 2
            AMF0Decoder._require(data, offset, length, "string")
            return data[offset:offset + length].decode('utf-8'), offset + length

        if type_marker == 0x03:  # Object
            return AMF0Decoder._decode_properties(data, offset)

        if type_marker in (0x05, 0x06):  # Null, Undefined
            return None, offset

        if type_marker == 0x08:  # ECMA array: 32-bit count hint, then properties
            AMF0Decoder._require(data, offset, 4, "ECMA array count")
            return AMF0Decoder._decode_properties(data, offset + 4)

        if type_marker == 0x0A:  # Strict array: 32-bit count, then values
            AMF0Decoder._require(data, offset, 4, "strict array count")
            count = struct.unpack('!I', data[offset:offset + 4])[0]
            offset += 4
            items = []
            for _ in range(count):
                if offset >= len(data):
                    raise ValueError("Insufficient data for strict array")
                item, offset = AMF0Decoder.decode(data, offset)
                items.append(item)
            return items, offset

        if type_marker == 0x0C:  # Long string (32-bit length)
            AMF0Decoder._require(data, offset, 4, "long string length")
            length = struct.unpack('!I', data[offset:offset + 4])[0]
            offset += 4
            AMF0Decoder._require(data, offset, length, "long string")
            return data[offset:offset + length].decode('utf-8'), offset + length

        raise ValueError(f"Unsupported AMF0 type: {type_marker}")

class RTMPConnection:
    """RTMP message builder and chunk parser (no network I/O).

    Builds connect/publish commands and audio/video messages as chunked
    byte strings, and parses incoming chunks one at a time while keeping
    per-chunk-stream header state.
    """

    # Header state for a chunk stream seen for the first time.
    _NEW_STREAM_STATE = {
        'timestamp': 0,
        'message_length': 0,
        'message_type': 0,
        'message_stream_id': 0,
        'timestamp_delta': 0,
        'bytes_remaining': 0,          # payload bytes still owed for the current message
        'extended_timestamp': False,   # last fmt 0/1/2 header used an extended timestamp
    }

    def __init__(self):
        self.chunk_size = 128
        self.window_ack_size = 2500000
        self.peer_bandwidth = 2500000

        # Per-chunk-stream state needed to expand fmt 1/2/3 headers.
        self.chunk_streams = {}

        self.connected = False
        self.transaction_id = 1

    def _next_transaction_id(self) -> int:
        """Return the current command transaction id and advance it."""
        transaction_id = self.transaction_id
        self.transaction_id += 1
        return transaction_id

    def _pack_message(self, chunk_stream_id: int, message_type: int,
                      message_stream_id: int, payload: bytes,
                      timestamp: int = 0) -> bytes:
        """Wrap payload in one message and split it into fmt-0/fmt-3 chunks."""
        chunk = RTMPChunk(
            chunk_stream_id=chunk_stream_id,
            timestamp=timestamp,
            message_length=len(payload),
            message_type=message_type,
            message_stream_id=message_stream_id,
            data=payload
        )
        return self._pack_chunk(chunk, fmt=0)

    def create_connect_command(self, app: str, tcUrl: str) -> bytes:
        """Build an AMF0 'connect' command on chunk stream 3, message stream 0."""
        connect_obj = {
            "app": app,
            "type": "nonprivate",
            "tcUrl": tcUrl,
            "fpad": False,
            "capabilities": 15.0,
            "audioCodecs": 3575.0,
            "videoCodecs": 252.0,
            "videoFunction": 1.0
        }

        amf_data = (AMF0Encoder.encode_string("connect")
                    + AMF0Encoder.encode_number(self._next_transaction_id())
                    + AMF0Encoder.encode_object(connect_obj))

        return self._pack_message(3, RTMPMessageType.COMMAND_AMF0, 0, amf_data)

    def create_publish_command(self, stream_name: str, publish_type: str = "live") -> bytes:
        """Build an AMF0 'publish' command on chunk stream 8, message stream 1."""
        amf_data = (AMF0Encoder.encode_string("publish")
                    + AMF0Encoder.encode_number(self._next_transaction_id())
                    + AMF0Encoder.encode_null()  # command object (null)
                    + AMF0Encoder.encode_string(stream_name)
                    + AMF0Encoder.encode_string(publish_type))

        return self._pack_message(8, RTMPMessageType.COMMAND_AMF0, 1, amf_data)

    def create_audio_packet(self, audio_data: bytes, timestamp: int) -> bytes:
        """Build an audio message on chunk stream 4, message stream 1."""
        return self._pack_message(4, RTMPMessageType.AUDIO, 1, audio_data, timestamp)

    def create_video_packet(self, video_data: bytes, timestamp: int) -> bytes:
        """Build a video message on chunk stream 6, message stream 1."""
        return self._pack_message(6, RTMPMessageType.VIDEO, 1, video_data, timestamp)

    def _pack_chunk(self, chunk: RTMPChunk, fmt: int) -> bytes:
        """Split chunk.data into chunk_size pieces.

        The first piece gets a `fmt` header; every following piece gets a
        Type 3 header. A header is always emitted, even for an empty
        message.
        """
        payload = chunk.data
        size = self.chunk_size
        parts = [chunk.pack_header(fmt), payload[:size]]
        for start in range(size, len(payload), size):
            parts.append(chunk.pack_header(3))
            parts.append(payload[start:start + size])
        return b''.join(parts)

    @staticmethod
    def _parse_basic_header(data: bytes, offset: int):
        """Return (fmt, chunk_stream_id, new_offset), or None if data is incomplete."""
        first = data[offset]
        offset += 1
        fmt = first >> 6
        chunk_stream_id = first & 0x3F
        if chunk_stream_id == 0:
            # 2-byte form: id = second byte + 64
            if offset + 1 > len(data):
                return None
            chunk_stream_id = data[offset] + 64
            offset += 1
        elif chunk_stream_id == 1:
            # 3-byte form: id = (third byte * 256) + second byte + 64 (little-endian)
            if offset + 2 > len(data):
                return None
            chunk_stream_id = data[offset] + (data[offset + 1] << 8) + 64
            offset += 2
        return fmt, chunk_stream_id, offset

    def parse_chunk(self, data: bytes, offset: int = 0) -> tuple:
        """Parse one chunk starting at `offset`.

        Returns (RTMPChunk, new_offset). When the data is incomplete it
        returns (None, offset), and in that case stream state is left
        untouched so the caller can retry with more data. Continuation
        chunks carry at most the bytes still owed for the current message.
        """
        if offset >= len(data):
            return None, offset
        start = offset

        basic = self._parse_basic_header(data, offset)
        if basic is None:
            return None, start
        fmt, chunk_stream_id, offset = basic

        # Work on a copy and commit only once the whole chunk is available.
        state = {**self._NEW_STREAM_STATE, **self.chunk_streams.get(chunk_stream_id, {})}

        header_size = (11, 7, 3, 0)[fmt]
        if offset + header_size > len(data):
            return None, start
        header = data[offset:offset + header_size]
        offset += header_size

        ts_field = 0
        if fmt <= 2:
            ts_field = int.from_bytes(header[0:3], 'big')
            if fmt <= 1:
                state['message_length'] = int.from_bytes(header[3:6], 'big')
                state['message_type'] = header[6]
            if fmt == 0:
                # The message stream id is the one little-endian field.
                state['message_stream_id'] = int.from_bytes(header[7:11], 'little')
            state['extended_timestamp'] = ts_field == 0xFFFFFF

        # Type 3 chunks repeat the extended timestamp when the last
        # fmt 0/1/2 header on this chunk stream used one.
        if state['extended_timestamp']:
            if offset + 4 > len(data):
                return None, start
            extended_value = int.from_bytes(data[offset:offset + 4], 'big')
            offset += 4
            if fmt <= 2:
                ts_field = extended_value

        if fmt == 0:
            state['timestamp'] = ts_field
            # A fmt-3 chunk starting a new message after fmt 0 uses this as its delta.
            state['timestamp_delta'] = ts_field
            state['bytes_remaining'] = state['message_length']
        elif fmt <= 2:
            state['timestamp_delta'] = ts_field
            state['timestamp'] = (state['timestamp'] + ts_field) & 0xFFFFFFFF
            state['bytes_remaining'] = state['message_length']
        elif state['bytes_remaining'] == 0:
            # fmt 3 starting a new message: repeat the previous delta.
            state['timestamp'] = (state['timestamp'] + state['timestamp_delta']) & 0xFFFFFFFF
            state['bytes_remaining'] = state['message_length']

        chunk_data_size = min(state['bytes_remaining'], self.chunk_size)
        if offset + chunk_data_size > len(data):
            return None, start

        chunk_data = data[offset:offset + chunk_data_size]
        offset += chunk_data_size
        state['bytes_remaining'] -= chunk_data_size
        self.chunk_streams[chunk_stream_id] = state

        chunk = RTMPChunk(
            chunk_stream_id=chunk_stream_id,
            timestamp=state['timestamp'],
            message_length=state['message_length'],
            message_type=state['message_type'],
            message_stream_id=state['message_stream_id'],
            data=chunk_data
        )

        return chunk, offset

# RTMP demo
def demo_rtmp():
    """Build sample RTMP command/media messages and round-trip an AMF0 object."""
    conn = RTMPConnection()

    print("RTMP Protocol Demo")
    print("==================")

    # Placeholder audio and video message bodies.
    fake_audio = b'\x17\x00' + bytes(100)
    fake_video = b'\x17\x01\x00\x00\x00' + bytes(200)

    # (label, builder) pairs, built and printed in order.
    messages = (
        ("Connect command", lambda: conn.create_connect_command("live", "rtmp://localhost/live")),
        ("Publish command", lambda: conn.create_publish_command("stream1", "live")),
        ("Audio packet", lambda: conn.create_audio_packet(fake_audio, 1000)),
        ("Video packet", lambda: conn.create_video_packet(fake_video, 1033)),
    )
    for label, build in messages:
        wire_bytes = build()
        print(f"\n{label}: {len(wire_bytes)} bytes")
        print(f"Data preview: {wire_bytes[:50].hex()}...")

    print("\nAMF0 Encoding/Decoding Demo:")

    sample = {
        "name": "test_stream",
        "duration": 120.5,
        "live": True,
        "viewers": 1000
    }

    blob = AMF0Encoder.encode_object(sample)
    print(f"Encoded object: {len(blob)} bytes")
    print(f"Hex: {blob.hex()}")

    roundtrip = AMF0Decoder.decode(blob)[0]
    print(f"Decoded object: {roundtrip}")

    # Plain == comparison: numbers decode as floats, so 1000 == 1000.0 holds.
    print(f"Encoding/Decoding consistent: {sample == roundtrip}")

# 运行演示
demo_rtmp()

3. HTTP自适应流协议

3.1 HLS(HTTP Live Streaming)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
import math
import os
import time
import urllib.parse
from dataclasses import dataclass
from typing import List, Dict, Optional

@dataclass
class HLSSegment:
    """One media segment entry of an HLS media playlist."""
    duration: float  # EXTINF duration in seconds
    uri: str  # segment URI as written in the playlist
    sequence: int  # media sequence number of this segment
    discontinuity: bool = False  # emit #EXT-X-DISCONTINUITY before this segment
    key_uri: Optional[str] = None  # AES-128 key URI (EXT-X-KEY URI attribute)
    key_iv: Optional[str] = None  # EXT-X-KEY IV attribute text, written verbatim
    byterange: Optional[str] = None  # EXT-X-BYTERANGE value, written verbatim
    program_date_time: Optional[str] = None  # EXT-X-PROGRAM-DATE-TIME value

@dataclass
class HLSVariant:
    """One variant stream (EXT-X-STREAM-INF entry) of an HLS master playlist."""
    bandwidth: int  # BANDWIDTH attribute (bits per second)
    uri: str  # URI of the variant's media playlist
    resolution: Optional[str] = None  # RESOLUTION, e.g. "1280x720"
    codecs: Optional[str] = None  # CODECS value without surrounding quotes
    frame_rate: Optional[float] = None  # FRAME-RATE
    audio_group: Optional[str] = None  # AUDIO group id
    video_group: Optional[str] = None  # VIDEO group id
    subtitles_group: Optional[str] = None  # SUBTITLES group id

class HLSPlaylist:
    """In-memory model of an HLS playlist (RFC 8216), master or media.

    A master playlist lists variant streams in ``variants``; a media playlist
    lists segments in ``segments``.  ``generate_m3u8`` serialises the model to
    M3U8 text and ``parse_m3u8`` builds a model from M3U8 text.
    """

    def __init__(self, is_master: bool = False):
        self.is_master = is_master
        self.version = 3
        self.target_duration = 10  # seconds; rounded UP to an integer on output
        self.media_sequence = 0  # sequence number of segments[0]
        self.segments: List[HLSSegment] = []
        self.variants: List[HLSVariant] = []
        self.ended = False  # True -> emit #EXT-X-ENDLIST (media playlists)
        self.playlist_type = None  # "VOD", "EVENT" or None

        # Master playlist attributes
        self.independent_segments = False

        # Media playlist attributes
        self.allow_cache = True

    def add_segment(self, duration: float, uri: str, sequence: Optional[int] = None,
                    discontinuity: bool = False, **kwargs) -> HLSSegment:
        """Append a segment and return it.

        If ``sequence`` is omitted it continues the numbering that starts at
        ``media_sequence``.  Extra keyword arguments (``key_uri``, ``key_iv``,
        ``byterange``, ``program_date_time``) are forwarded to HLSSegment.
        """
        if sequence is None:
            sequence = self.media_sequence + len(self.segments)

        segment = HLSSegment(
            duration=duration,
            uri=uri,
            sequence=sequence,
            discontinuity=discontinuity,
            **kwargs
        )
        self.segments.append(segment)
        return segment

    def add_variant(self, bandwidth: int, uri: str, **kwargs) -> HLSVariant:
        """Append a variant stream (master playlists) and return it."""
        variant = HLSVariant(bandwidth=bandwidth, uri=uri, **kwargs)
        self.variants.append(variant)
        return variant

    def remove_old_segments(self, keep_count: int = 3):
        """Keep only the newest ``keep_count`` segments (live sliding window).

        ``media_sequence`` advances by the number of dropped segments so the
        remaining segments keep their sequence numbers.
        """
        if len(self.segments) > keep_count:
            removed_count = len(self.segments) - keep_count
            self.segments = self.segments[removed_count:]
            self.media_sequence += removed_count

    def generate_m3u8(self) -> str:
        """Serialise the playlist to M3U8 text (always newline-terminated)."""
        lines = ['#EXTM3U', f'#EXT-X-VERSION:{self.version}']
        if self.is_master:
            lines.extend(self._master_lines())
        else:
            lines.extend(self._media_lines())
        return '\n'.join(lines) + '\n'

    def _master_lines(self) -> List[str]:
        """Build the tag/URI lines of a master playlist."""
        lines = []
        if self.independent_segments:
            lines.append('#EXT-X-INDEPENDENT-SEGMENTS')

        for variant in self.variants:
            attrs = [f'BANDWIDTH={variant.bandwidth}']
            if variant.resolution:
                attrs.append(f'RESOLUTION={variant.resolution}')
            if variant.codecs:
                attrs.append(f'CODECS="{variant.codecs}"')
            if variant.frame_rate:
                attrs.append(f'FRAME-RATE={variant.frame_rate}')
            if variant.audio_group:
                attrs.append(f'AUDIO="{variant.audio_group}"')
            if variant.video_group:
                attrs.append(f'VIDEO="{variant.video_group}"')
            if variant.subtitles_group:
                attrs.append(f'SUBTITLES="{variant.subtitles_group}"')

            # Join outside the f-string: reusing the enclosing quote character
            # inside a replacement field is a SyntaxError before Python 3.12.
            lines.append('#EXT-X-STREAM-INF:' + ','.join(attrs))
            lines.append(variant.uri)
        return lines

    def _media_lines(self) -> List[str]:
        """Build the tag/URI lines of a media playlist."""
        # RFC 8216: EXT-X-TARGETDURATION is an integer that no (rounded)
        # segment duration may exceed, so round up instead of truncating.
        lines = [
            f'#EXT-X-TARGETDURATION:{math.ceil(self.target_duration)}',
            f'#EXT-X-MEDIA-SEQUENCE:{self.media_sequence}',
        ]
        if self.playlist_type:
            lines.append(f'#EXT-X-PLAYLIST-TYPE:{self.playlist_type}')
        if not self.allow_cache:
            lines.append('#EXT-X-ALLOW-CACHE:NO')

        # An EXT-X-KEY stays in effect for every following segment, so an
        # unencrypted segment after an encrypted one needs METHOD=NONE.
        key_active = False
        for segment in self.segments:
            if segment.discontinuity:
                lines.append('#EXT-X-DISCONTINUITY')

            if segment.key_uri:
                key_attrs = ['METHOD=AES-128', f'URI="{segment.key_uri}"']
                if segment.key_iv:
                    key_attrs.append(f'IV={segment.key_iv}')
                lines.append('#EXT-X-KEY:' + ','.join(key_attrs))
                key_active = True
            elif key_active:
                lines.append('#EXT-X-KEY:METHOD=NONE')
                key_active = False

            if segment.program_date_time:
                lines.append(f'#EXT-X-PROGRAM-DATE-TIME:{segment.program_date_time}')
            if segment.byterange:
                lines.append(f'#EXT-X-BYTERANGE:{segment.byterange}')

            lines.append(f'#EXTINF:{segment.duration:.3f},')
            lines.append(segment.uri)

        if self.ended:
            lines.append('#EXT-X-ENDLIST')
        return lines

    @classmethod
    def parse_m3u8(cls, content: str, base_uri: str = '') -> 'HLSPlaylist':
        """Parse M3U8 text into an HLSPlaylist.

        Relative URIs are resolved against ``base_uri`` when it is non-empty.
        Tags are matched by exact name, so e.g. #EXT-X-DISCONTINUITY-SEQUENCE
        is not mistaken for #EXT-X-DISCONTINUITY.  Unknown tags are ignored.

        Raises:
            ValueError: if the first non-empty line is not ``#EXTM3U``.
        """
        # Tolerate a UTF-8 BOM, CRLF line endings and blank lines.
        lines = [line.strip() for line in content.lstrip('\ufeff').splitlines()]
        lines = [line for line in lines if line]

        if not lines or lines[0] != '#EXTM3U':
            raise ValueError("Invalid M3U8 format")

        is_master = any(line.startswith('#EXT-X-STREAM-INF:') for line in lines)
        playlist = cls(is_master=is_master)

        def resolve(uri: str) -> str:
            if base_uri and not uri.startswith(('http://', 'https://')):
                return urllib.parse.urljoin(base_uri, uri)
            return uri

        segment_info = {}  # tags that apply to the next segment only
        active_key = None  # (uri, iv) of the EXT-X-KEY currently in effect
        pending_variant = None  # EXT-X-STREAM-INF attributes awaiting their URI

        for line in lines[1:]:
            if not line.startswith('#'):
                uri = resolve(line)
                if pending_variant is not None:
                    playlist.variants.append(cls._variant_from_attributes(pending_variant, uri))
                    pending_variant = None
                else:
                    key_uri, key_iv = active_key if active_key else (None, None)
                    playlist.segments.append(HLSSegment(
                        duration=segment_info.get('duration', 0),
                        uri=uri,
                        sequence=playlist.media_sequence + len(playlist.segments),
                        discontinuity=segment_info.get('discontinuity', False),
                        key_uri=key_uri,
                        key_iv=key_iv,
                        byterange=segment_info.get('byterange'),
                        program_date_time=segment_info.get('program_date_time'),
                    ))
                    segment_info = {}
                continue

            tag, _, value = line.partition(':')
            if tag == '#EXT-X-VERSION':
                playlist.version = int(value)
            elif tag == '#EXT-X-TARGETDURATION':
                playlist.target_duration = int(value)
            elif tag == '#EXT-X-MEDIA-SEQUENCE':
                playlist.media_sequence = int(value)
            elif tag == '#EXT-X-PLAYLIST-TYPE':
                playlist.playlist_type = value
            elif tag == '#EXT-X-ENDLIST':
                playlist.ended = True
            elif tag == '#EXT-X-INDEPENDENT-SEGMENTS':
                playlist.independent_segments = True
            elif tag == '#EXT-X-ALLOW-CACHE':
                playlist.allow_cache = value.upper() != 'NO'
            elif tag == '#EXT-X-STREAM-INF':
                pending_variant = cls._parse_attributes(value)
            elif tag == '#EXTINF':
                segment_info['duration'] = float(value.split(',')[0])
            elif tag == '#EXT-X-DISCONTINUITY':
                segment_info['discontinuity'] = True
            elif tag == '#EXT-X-KEY':
                attrs = cls._parse_attributes(value)
                if attrs.get('METHOD', '').upper() == 'NONE':
                    active_key = None
                else:
                    active_key = (cls._unquote(attrs.get('URI')), attrs.get('IV'))
            elif tag == '#EXT-X-PROGRAM-DATE-TIME':
                segment_info['program_date_time'] = value
            elif tag == '#EXT-X-BYTERANGE':
                segment_info['byterange'] = value

        return playlist

    @classmethod
    def _variant_from_attributes(cls, attrs: Dict[str, str], uri: str) -> HLSVariant:
        """Build an HLSVariant from parsed EXT-X-STREAM-INF attributes."""
        frame_rate = attrs.get('FRAME-RATE')
        return HLSVariant(
            bandwidth=int(attrs.get('BANDWIDTH', 0)),
            uri=uri,
            resolution=attrs.get('RESOLUTION'),
            codecs=cls._unquote(attrs.get('CODECS')),
            frame_rate=float(frame_rate) if frame_rate else None,
            audio_group=cls._unquote(attrs.get('AUDIO')),
            video_group=cls._unquote(attrs.get('VIDEO')),
            subtitles_group=cls._unquote(attrs.get('SUBTITLES')),
        )

    @staticmethod
    def _unquote(value: Optional[str]) -> Optional[str]:
        """Strip surrounding double quotes; keep None for absent attributes."""
        return None if value is None else value.strip('"')

    @staticmethod
    def _parse_attributes(attrs_str: str) -> Dict[str, str]:
        """Split an attribute list ``K=V,K="a,b"`` into a dict.

        Commas inside double quotes do not split; quotes are kept in values.
        Nested/escaped quotes are not handled.
        """
        attrs = {}

        parts = []
        current = ''
        in_quotes = False

        for char in attrs_str:
            if char == '"':
                in_quotes = not in_quotes
                current += char
            elif char == ',' and not in_quotes:
                parts.append(current.strip())
                current = ''
            else:
                current += char

        if current.strip():
            parts.append(current.strip())

        for part in parts:
            if '=' in part:
                key, value = part.split('=', 1)
                attrs[key.strip()] = value.strip()

        return attrs

class HLSGenerator:
    """Write HLS VOD playlists to ``output_dir`` and create live managers."""

    def __init__(self, output_dir: str, segment_duration: float = 6.0):
        self.output_dir = output_dir
        self.segment_duration = segment_duration  # nominal segment length, seconds
        self.segment_counter = 0  # not read anywhere in this class

        # Make sure the output directory exists
        os.makedirs(output_dir, exist_ok=True)

    def generate_vod_playlist(self, video_duration: float, bitrates: List[int]) -> str:
        """Write a VOD master playlist plus one media playlist per bitrate.

        Args:
            video_duration: total duration in seconds.
            bitrates: variant bitrates in kbps.

        Returns:
            Path of the written master playlist (``playlist.m3u8``).
        """
        master_playlist = HLSPlaylist(is_master=True)
        master_playlist.independent_segments = True

        for bitrate in bitrates:
            variant_uri = f"playlist_{bitrate}k.m3u8"
            master_playlist.add_variant(
                bandwidth=bitrate * 1000,  # kbps -> bits per second
                uri=variant_uri,
                resolution=self._get_resolution_for_bitrate(bitrate),
                codecs="avc1.42e00a,mp4a.40.2",  # H.264 Baseline + AAC-LC
            )
            self._generate_media_playlist(variant_uri, video_duration, bitrate)

        master_path = os.path.join(self.output_dir, "playlist.m3u8")
        self._write_text(master_path, master_playlist.generate_m3u8())
        return master_path

    def _generate_media_playlist(self, filename: str, video_duration: float, bitrate: int):
        """Write the VOD media playlist ``filename`` for one bitrate."""
        media_playlist = HLSPlaylist(is_master=False)
        media_playlist.playlist_type = "VOD"
        # EXT-X-TARGETDURATION is an integer that must cover every segment,
        # so a fractional segment length is rounded up, never truncated.
        media_playlist.target_duration = math.ceil(self.segment_duration)

        for index, duration in enumerate(self._segment_durations(video_duration)):
            media_playlist.add_segment(duration, f"segment_{bitrate}k_{index:04d}.ts")

        media_playlist.ended = True

        media_path = os.path.join(self.output_dir, filename)
        self._write_text(media_path, media_playlist.generate_m3u8())

    def _segment_durations(self, total_duration: float) -> List[float]:
        """Split ``total_duration`` into consecutive segment durations.

        All segments are ``segment_duration`` long except possibly the last,
        which holds the remainder.  Remainders of 1 microsecond or less are
        floating-point noise and are dropped instead of becoming a segment.
        """
        if total_duration <= 0:
            return []
        durations = []
        for index in range(math.ceil(total_duration / self.segment_duration)):
            remaining = total_duration - index * self.segment_duration
            if remaining <= 1e-6:
                break
            durations.append(min(self.segment_duration, remaining))
        return durations

    @staticmethod
    def _write_text(path: str, text: str) -> None:
        """Write playlist text as UTF-8 (RFC 8216 requires UTF-8 playlists)."""
        with open(path, 'w', encoding='utf-8') as f:
            f.write(text)

    def _get_resolution_for_bitrate(self, bitrate: int) -> str:
        """Pick a nominal resolution for a bitrate given in kbps."""
        if bitrate >= 4000:
            return "1920x1080"
        elif bitrate >= 2500:
            return "1280x720"
        elif bitrate >= 1500:
            return "854x480"
        elif bitrate >= 800:
            return "640x360"
        else:
            return "426x240"

    def create_live_playlist(self, stream_name: str) -> 'LiveHLSManager':
        """Create a live playlist manager writing into the same directory."""
        return LiveHLSManager(self.output_dir, stream_name, self.segment_duration)

class LiveHLSManager:
    """Maintain a sliding-window live HLS media playlist on disk.

    Each ``add_segment`` call writes one ``.ts`` file, appends it to the
    playlist, keeps only the newest 6 entries and rewrites
    ``<stream_name>.m3u8``.  Segment files that leave the window are not
    deleted by this class.
    """

    def __init__(self, output_dir: str, stream_name: str, segment_duration: float = 6.0):
        self.output_dir = output_dir
        self.stream_name = stream_name
        self.segment_duration = segment_duration

        self.playlist = HLSPlaylist(is_master=False)
        # EXT-X-TARGETDURATION is an integer that must not be smaller than
        # any segment duration, so a fractional length is rounded up.
        self.playlist.target_duration = math.ceil(segment_duration)
        self.playlist.allow_cache = False

        self.segment_counter = 0  # number used in the next segment file name
        self.playlist_path = os.path.join(output_dir, f"{stream_name}.m3u8")

    def add_segment(self, segment_data: bytes, timestamp: float) -> str:
        """Write a segment file, add it to the playlist and republish it.

        ``timestamp`` is accepted for API compatibility but not used.

        Returns:
            Path of the written segment file.
        """
        segment_filename = f"{self.stream_name}_{self.segment_counter:06d}.ts"
        segment_path = os.path.join(self.output_dir, segment_filename)

        # The segment file is complete before the playlist references it.
        with open(segment_path, 'wb') as f:
            f.write(segment_data)

        self.playlist.add_segment(self.segment_duration, segment_filename)

        # Sliding window: keep only the most recent segments.
        self.playlist.remove_old_segments(keep_count=6)

        self._update_playlist_file()

        self.segment_counter += 1
        return segment_path

    def _update_playlist_file(self):
        """Republish the playlist file without exposing a partial write.

        The text goes to a temporary file next to the playlist, which is then
        moved over it with ``os.replace`` (an atomic rename on POSIX), so a
        player polling the playlist sees either the old or the new version.
        """
        tmp_path = self.playlist_path + '.tmp'
        with open(tmp_path, 'w', encoding='utf-8') as f:
            f.write(self.playlist.generate_m3u8())
        os.replace(tmp_path, self.playlist_path)

    def end_stream(self):
        """Mark the live stream finished (adds #EXT-X-ENDLIST) and republish."""
        self.playlist.ended = True
        self._update_playlist_file()

# HLS demo
def demo_hls():
    """Generate, print and parse sample VOD and live HLS playlists.

    All files live in a temporary directory that is removed at the end,
    even if one of the steps raises.
    """
    import tempfile

    print("HLS (HTTP Live Streaming) Demo")
    print("==============================")

    with tempfile.TemporaryDirectory() as temp_dir:
        print(f"Output directory: {temp_dir}")

        hls_gen = HLSGenerator(temp_dir, segment_duration=6.0)

        # VOD playlists
        print("\nGenerating VOD playlist...")
        bitrates = [500, 1000, 2000, 4000]  # kbps
        video_duration = 120.0  # 2 minutes

        master_path = hls_gen.generate_vod_playlist(video_duration, bitrates)
        print(f"Master playlist created: {master_path}")

        with open(master_path, 'r', encoding='utf-8') as f:
            master_content = f.read()

        print("\nMaster Playlist Content:")
        print("-" * 40)
        print(master_content)

        media_path = os.path.join(temp_dir, "playlist_1000k.m3u8")
        with open(media_path, 'r', encoding='utf-8') as f:
            media_content = f.read()

        print("\nMedia Playlist Content (1000k):")
        print("-" * 40)
        print(media_content)

        # Parsing
        print("\nParsing playlist...")
        parsed_master = HLSPlaylist.parse_m3u8(master_content)
        print(f"Parsed master playlist: {len(parsed_master.variants)} variants")

        for i, variant in enumerate(parsed_master.variants):
            print(f" Variant {i+1}: {variant.bandwidth}bps, {variant.resolution}, {variant.uri}")

        parsed_media = HLSPlaylist.parse_m3u8(media_content)
        print(f"\nParsed media playlist: {len(parsed_media.segments)} segments")
        print(f" Target duration: {parsed_media.target_duration}s")
        print(f" Media sequence: {parsed_media.media_sequence}")
        print(f" Playlist type: {parsed_media.playlist_type}")
        print(f" Ended: {parsed_media.ended}")

        # Live playlist
        print("\nLive streaming demo...")
        live_manager = hls_gen.create_live_playlist("live_stream")

        for i in range(5):
            fake_segment_data = f"Segment {i} data\n".encode() * 1000
            timestamp = time.time() + i * 6

            segment_path = live_manager.add_segment(fake_segment_data, timestamp)
            print(f" Added segment: {os.path.basename(segment_path)}")

            time.sleep(0.1)  # short pause between segments

        with open(live_manager.playlist_path, 'r', encoding='utf-8') as f:
            live_content = f.read()

        print("\nLive Playlist Content:")
        print("-" * 40)
        print(live_content)

        live_manager.end_stream()
        print("\nLive stream ended.")

    print(f"Cleaned up temporary directory: {temp_dir}")

# Run the demo
demo_hls()

4. DASH (Dynamic Adaptive Streaming over HTTP)

DASH是一种基于HTTP的自适应流媒体技术,类似于HLS但使用XML格式的MPD(Media Presentation Description)文件。

4.1 DASH基础概念

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
from typing import List, Optional, Dict, Any
import base64
import hashlib

class DASHSegment:
    """One media segment of a DASH Representation."""

    def __init__(self, url: str, duration: float, start_time: float = 0,
                 byte_range: Optional[str] = None):
        self.url = url  # SegmentURL@media
        self.duration = duration  # seconds
        self.start_time = start_time  # seconds
        self.byte_range = byte_range  # optional SegmentURL@mediaRange

class DASHRepresentation:
    """One encoded version (quality level) of a stream."""

    def __init__(self, representation_id: str, bandwidth: int,
                 width: Optional[int] = None, height: Optional[int] = None,
                 frame_rate: Optional[str] = None, codecs: Optional[str] = None):
        self.id = representation_id
        self.bandwidth = bandwidth  # bits per second
        self.width = width
        self.height = height
        self.frame_rate = frame_rate
        self.codecs = codecs
        self.segments: List[DASHSegment] = []
        self.initialization_url: Optional[str] = None  # init segment URL
        self.media_template: Optional[str] = None  # e.g. "seg_$Number$.m4s"

    def add_segment(self, url: str, duration: float, start_time: float = 0,
                    byte_range: Optional[str] = None):
        """Append a segment (optionally a byte range of ``url``) and return it."""
        segment = DASHSegment(url, duration, start_time, byte_range)
        self.segments.append(segment)
        return segment

class DASHAdaptationSet:
    """A group of interchangeable Representations of one content type."""

    def __init__(self, adaptation_set_id: str, content_type: str = "video",
                 mime_type: str = "video/mp4", lang: Optional[str] = None):
        self.id = adaptation_set_id
        self.content_type = content_type
        self.mime_type = mime_type
        self.lang = lang
        self.representations: List[DASHRepresentation] = []

    def add_representation(self, representation: DASHRepresentation):
        """Append a Representation and return it."""
        self.representations.append(representation)
        return representation

class DASHPeriod:
    """A time span of the presentation containing AdaptationSets."""

    def __init__(self, period_id: str, start: str = "PT0S", duration: Optional[str] = None):
        self.id = period_id
        self.start = start  # ISO 8601 duration string
        self.duration = duration  # ISO 8601 duration string or None
        self.adaptation_sets: List[DASHAdaptationSet] = []

    def add_adaptation_set(self, adaptation_set: DASHAdaptationSet):
        """Append an AdaptationSet and return it."""
        self.adaptation_sets.append(adaptation_set)
        return adaptation_set

class DASHManifest:
    """In-memory model of a DASH Media Presentation Description (MPD)."""

    MPD_NAMESPACE = 'urn:mpeg:dash:schema:mpd:2011'
    # Segment durations are written as integers in units of 1/TIMESCALE s.
    TIMESCALE = 1000

    def __init__(self, manifest_type: str = "static",
                 media_presentation_duration: Optional[str] = None,
                 min_buffer_time: str = "PT4S"):
        self.type = manifest_type  # "static" (VOD) or "dynamic" (live)
        self.media_presentation_duration = media_presentation_duration
        self.min_buffer_time = min_buffer_time
        self.availability_start_time: Optional[str] = None
        self.publish_time: Optional[str] = None
        self.minimum_update_period: Optional[str] = None
        self.periods: List[DASHPeriod] = []

        # Namespace declarations written on the MPD root ('' = default).
        self.namespaces = {
            '': self.MPD_NAMESPACE,
            'xsi': 'http://www.w3.org/2001/XMLSchema-instance'
        }

    def add_period(self, period: DASHPeriod):
        """Append a Period and return it."""
        self.periods.append(period)
        return period

    def generate_mpd(self) -> str:
        """Serialise the manifest to indented MPD XML text."""
        root = ET.Element('MPD')

        # Declared as plain attributes; children are unqualified and so
        # belong to the default (MPD) namespace when the text is parsed.
        for prefix, uri in self.namespaces.items():
            root.set(f'xmlns:{prefix}' if prefix else 'xmlns', uri)

        root.set('type', self.type)
        root.set('minBufferTime', self.min_buffer_time)
        for name, value in (
            ('mediaPresentationDuration', self.media_presentation_duration),
            ('availabilityStartTime', self.availability_start_time),
            ('publishTime', self.publish_time),
            ('minimumUpdatePeriod', self.minimum_update_period),
        ):
            if value:
                root.set(name, value)

        for period in self.periods:
            period_elem = ET.SubElement(root, 'Period')
            period_elem.set('id', period.id)
            period_elem.set('start', period.start)
            if period.duration:
                period_elem.set('duration', period.duration)

            for adaptation_set in period.adaptation_sets:
                as_elem = ET.SubElement(period_elem, 'AdaptationSet')
                as_elem.set('id', adaptation_set.id)
                as_elem.set('contentType', adaptation_set.content_type)
                as_elem.set('mimeType', adaptation_set.mime_type)
                if adaptation_set.lang:
                    as_elem.set('lang', adaptation_set.lang)

                for representation in adaptation_set.representations:
                    self._build_representation(as_elem, representation)

        self._indent(root)
        return ET.tostring(root, encoding='unicode', xml_declaration=True)

    def _to_timescale(self, seconds: float) -> int:
        """Convert seconds to an integer count of TIMESCALE ticks (rounded)."""
        return round(seconds * self.TIMESCALE)

    def _build_representation(self, parent, representation: DASHRepresentation) -> None:
        """Append a <Representation> element, with segment addressing, to parent."""
        repr_elem = ET.SubElement(parent, 'Representation')
        repr_elem.set('id', representation.id)
        repr_elem.set('bandwidth', str(representation.bandwidth))
        if representation.width and representation.height:
            repr_elem.set('width', str(representation.width))
            repr_elem.set('height', str(representation.height))
        if representation.frame_rate:
            repr_elem.set('frameRate', representation.frame_rate)
        if representation.codecs:
            repr_elem.set('codecs', representation.codecs)

        if representation.media_template:
            template_elem = ET.SubElement(repr_elem, 'SegmentTemplate')
            template_elem.set('media', representation.media_template)
            if representation.initialization_url:
                # With SegmentTemplate the init segment is an attribute.
                template_elem.set('initialization', representation.initialization_url)
            template_elem.set('startNumber', '1')

            if representation.segments:
                # S@d is in units of @timescale; without it the default
                # timescale of 1 would turn milliseconds into seconds.
                template_elem.set('timescale', str(self.TIMESCALE))
                timeline_elem = ET.SubElement(template_elem, 'SegmentTimeline')
                for segment in representation.segments:
                    s_elem = ET.SubElement(timeline_elem, 'S')
                    s_elem.set('d', str(self._to_timescale(segment.duration)))

        elif representation.segments:
            # Explicit segment URLs must sit inside a SegmentList, which also
            # carries the Initialization element and the nominal duration.
            list_elem = ET.SubElement(repr_elem, 'SegmentList')
            list_elem.set('timescale', str(self.TIMESCALE))
            list_elem.set('duration', str(self._to_timescale(representation.segments[0].duration)))
            if representation.initialization_url:
                init_elem = ET.SubElement(list_elem, 'Initialization')
                init_elem.set('sourceURL', representation.initialization_url)
            for segment in representation.segments:
                seg_elem = ET.SubElement(list_elem, 'SegmentURL')
                seg_elem.set('media', segment.url)
                if segment.byte_range:
                    seg_elem.set('mediaRange', segment.byte_range)

        elif representation.initialization_url:
            base_elem = ET.SubElement(repr_elem, 'SegmentBase')
            init_elem = ET.SubElement(base_elem, 'Initialization')
            init_elem.set('sourceURL', representation.initialization_url)

    def _indent(self, elem, level=0):
        """Add whitespace text/tails in place so the XML prints indented."""
        newline = "\n" + level * " "
        if len(elem):
            if not elem.text or not elem.text.strip():
                elem.text = newline + " "
            if not elem.tail or not elem.tail.strip():
                elem.tail = newline
            child = None
            for child in elem:
                self._indent(child, level + 1)
            # The last child's tail precedes the parent's closing tag, so it
            # is dedented back to the parent's level.
            if not child.tail or not child.tail.strip():
                child.tail = newline
        elif level and (not elem.tail or not elem.tail.strip()):
            elem.tail = newline

    @classmethod
    def parse_mpd(cls, mpd_content: str) -> 'DASHManifest':
        """Parse MPD XML text into a DASHManifest.

        Reads MPD, Period, AdaptationSet and Representation attributes, each
        Representation's initialization URL and media template, and its
        SegmentURL entries (their durations/start times are left at 0).
        """
        root = ET.fromstring(mpd_content)

        # Use the namespace the root element actually has, so MPDs written
        # without the default MPD namespace parse as well.
        namespace = root.tag[1:].partition('}')[0] if root.tag.startswith('{') else ''

        def tag(name: str) -> str:
            return f'{{{namespace}}}{name}' if namespace else name

        manifest = cls(root.get('type', 'static'),
                       root.get('mediaPresentationDuration'),
                       root.get('minBufferTime', 'PT4S'))
        manifest.availability_start_time = root.get('availabilityStartTime')
        manifest.publish_time = root.get('publishTime')
        manifest.minimum_update_period = root.get('minimumUpdatePeriod')

        for period_elem in root.findall('.//' + tag('Period')):
            period = DASHPeriod(period_elem.get('id', 'period_0'),
                                period_elem.get('start', 'PT0S'),
                                period_elem.get('duration'))

            for as_elem in period_elem.findall('.//' + tag('AdaptationSet')):
                adaptation_set = DASHAdaptationSet(as_elem.get('id', 'as_0'),
                                                   as_elem.get('contentType', 'video'),
                                                   as_elem.get('mimeType', 'video/mp4'),
                                                   as_elem.get('lang'))

                for repr_elem in as_elem.findall('.//' + tag('Representation')):
                    width = repr_elem.get('width')
                    height = repr_elem.get('height')
                    representation = DASHRepresentation(
                        repr_elem.get('id'),
                        int(repr_elem.get('bandwidth', 0)),
                        int(width) if width else None,
                        int(height) if height else None,
                        repr_elem.get('frameRate'),
                        repr_elem.get('codecs'),
                    )
                    cls._parse_segment_info(repr_elem, representation, tag)
                    adaptation_set.add_representation(representation)

                period.add_adaptation_set(adaptation_set)

            manifest.add_period(period)

        return manifest

    @staticmethod
    def _parse_segment_info(repr_elem, representation: DASHRepresentation, tag) -> None:
        """Copy init URL, media template and SegmentURLs from a Representation element."""
        template_elem = repr_elem.find(tag('SegmentTemplate'))
        if template_elem is not None:
            representation.media_template = template_elem.get('media')
            representation.initialization_url = template_elem.get('initialization')

        init_elem = repr_elem.find('.//' + tag('Initialization'))
        if init_elem is not None and not representation.initialization_url:
            representation.initialization_url = init_elem.get('sourceURL')

        for seg_elem in repr_elem.findall('.//' + tag('SegmentURL')):
            representation.segments.append(
                DASHSegment(seg_elem.get('media'), 0, 0, seg_elem.get('mediaRange')))

class DASHGenerator:
    """Write VOD and live DASH manifests (MPD files) into ``output_dir``."""

    def __init__(self, output_dir: str, segment_duration: float = 4.0):
        self.output_dir = output_dir
        self.segment_duration = segment_duration  # nominal segment length, seconds

        os.makedirs(output_dir, exist_ok=True)

    def generate_vod_manifest(self, video_duration: float,
                              video_representations: List[Dict[str, Any]],
                              audio_representations: Optional[List[Dict[str, Any]]] = None) -> str:
        """Write ``manifest.mpd`` for an on-demand presentation.

        Args:
            video_duration: presentation length in seconds.
            video_representations: dicts with a required ``bandwidth`` and
                optional ``width``, ``height``, ``frame_rate``, ``codecs``.
            audio_representations: dicts with a required ``bandwidth`` and
                optional ``codecs``; None/empty means no audio AdaptationSet.

        Returns:
            Path of the written MPD file.
        """
        manifest = DASHManifest(
            manifest_type="static",
            media_presentation_duration=self._seconds_to_iso8601(video_duration),
            min_buffer_time="PT4S"
        )

        period = DASHPeriod("period_0")

        if video_representations:
            video_as = DASHAdaptationSet("video_as", "video", "video/mp4")
            for i, spec in enumerate(video_representations):
                representation = DASHRepresentation(
                    f"video_{i}",
                    spec['bandwidth'],
                    spec.get('width'),
                    spec.get('height'),
                    spec.get('frame_rate'),
                    spec.get('codecs', 'avc1.42e00a')
                )
                self._add_segments(representation, f"video_{i}", video_duration)
                video_as.add_representation(representation)
            period.add_adaptation_set(video_as)

        if audio_representations:
            audio_as = DASHAdaptationSet("audio_as", "audio", "audio/mp4")
            for i, spec in enumerate(audio_representations):
                representation = DASHRepresentation(
                    f"audio_{i}",
                    spec['bandwidth'],
                    codecs=spec.get('codecs', 'mp4a.40.2')
                )
                self._add_segments(representation, f"audio_{i}", video_duration)
                audio_as.add_representation(representation)
            period.add_adaptation_set(audio_as)

        manifest.add_period(period)
        return self._write_manifest(manifest, "manifest.mpd")

    def generate_live_manifest(self, availability_start_time: datetime) -> str:
        """Write ``live_manifest.mpd`` for a dynamic (live) presentation.

        ``availability_start_time`` may be naive (taken to be UTC) or
        timezone-aware (converted to UTC).  Returns the written file path.
        """
        from datetime import timezone

        manifest = DASHManifest(
            manifest_type="dynamic",
            min_buffer_time="PT4S"
        )
        manifest.availability_start_time = self._format_utc(availability_start_time)
        # datetime.utcnow() is deprecated since Python 3.12; use an aware now.
        manifest.publish_time = self._format_utc(datetime.now(timezone.utc))
        manifest.minimum_update_period = "PT2S"

        period = DASHPeriod("live_period")
        video_as = DASHAdaptationSet("live_video", "video", "video/mp4")

        video_repr = DASHRepresentation(
            "live_video_1", 2000000, 1280, 720, "25", "avc1.42e00a"
        )
        video_repr.media_template = "live_video_$Number$.m4s"
        video_repr.initialization_url = "live_video_init.mp4"

        video_as.add_representation(video_repr)
        period.add_adaptation_set(video_as)
        manifest.add_period(period)

        return self._write_manifest(manifest, "live_manifest.mpd")

    def _add_segments(self, representation, prefix: str, total_duration: float) -> None:
        """Set ``<prefix>_init.mp4`` and the numbered ``.m4s`` media segments."""
        representation.initialization_url = f"{prefix}_init.mp4"
        for index, duration in enumerate(self._segment_durations(total_duration)):
            representation.add_segment(f"{prefix}_seg_{index:04d}.m4s", duration,
                                       index * self.segment_duration)

    def _segment_durations(self, total_duration: float) -> List[float]:
        """Split ``total_duration`` into consecutive segment durations.

        All segments are ``segment_duration`` long except possibly the last.
        Remainders of 1 microsecond or less are floating-point noise and are
        dropped instead of becoming a segment.
        """
        import math

        if total_duration <= 0:
            return []
        durations = []
        for index in range(math.ceil(total_duration / self.segment_duration)):
            remaining = total_duration - index * self.segment_duration
            if remaining <= 1e-6:
                break
            durations.append(min(self.segment_duration, remaining))
        return durations

    def _write_manifest(self, manifest, filename: str) -> str:
        """Write ``manifest`` as UTF-8 MPD text to ``filename``; return the path."""
        mpd_path = os.path.join(self.output_dir, filename)
        with open(mpd_path, 'w', encoding='utf-8') as f:
            f.write(manifest.generate_mpd())
        return mpd_path

    @staticmethod
    def _format_utc(moment: datetime) -> str:
        """Format ``moment`` as a UTC xs:dateTime ending in 'Z'.

        Naive values are assumed to be UTC already; aware values are
        converted, so the result never ends in an invalid '+00:00Z'.
        """
        from datetime import timezone

        if moment.tzinfo is not None:
            moment = moment.astimezone(timezone.utc).replace(tzinfo=None)
        return moment.isoformat() + 'Z'

    def _seconds_to_iso8601(self, seconds: float) -> str:
        """Convert seconds to an ISO 8601 duration such as ``PT1H2M3.500S``.

        The value is rounded to whole milliseconds first, so e.g. 59.9996 s
        becomes ``PT1M0.000S`` rather than ``PT60.000S``.
        """
        total_ms = round(seconds * 1000)
        hours, rest_ms = divmod(total_ms, 3_600_000)
        minutes, rest_ms = divmod(rest_ms, 60_000)
        secs = rest_ms / 1000

        if hours > 0:
            return f"PT{hours}H{minutes}M{secs:.3f}S"
        elif minutes > 0:
            return f"PT{minutes}M{secs:.3f}S"
        else:
            return f"PT{secs:.3f}S"

# DASH演示
def demo_dash():
print("DASH (Dynamic Adaptive Streaming over HTTP) Demo")
print("================================================")

# 创建临时目录
import tempfile
temp_dir = tempfile.mkdtemp()
print(f"Output directory: {temp_dir}")

# 创建DASH生成器
dash_gen = DASHGenerator(temp_dir, segment_duration=4.0)

# 定义视频表示
video_representations = [
{
'bandwidth': 500000,
'width': 640,
'height': 360,
'frame_rate': '25',
'codecs': 'avc1.42e00a'
},
{
'bandwidth': 1000000,
'width': 854,
'height': 480,
'frame_rate': '25',
'codecs': 'avc1.42e00a'
},
{
'bandwidth': 2000000,
'width': 1280,
'height': 720,
'frame_rate': '25',
'codecs': 'avc1.42e00a'
}
]

# 定义音频表示
audio_representations = [
{
'bandwidth': 128000,
'codecs': 'mp4a.40.2'
}
]

# 生成VOD清单
print("\nGenerating VOD manifest...")
video_duration = 120.0 # 2分钟

mpd_path = dash_gen.generate_vod_manifest(
video_duration, video_representations, audio_representations
)
print(f"MPD manifest created: {mpd_path}")

# 读取并显示MPD内容
with open(mpd_path, 'r', encoding='utf-8') as f:
mpd_content = f.read()

print("\nMPD Manifest Content:")
print("-" * 50)
print(mpd_content)

# 演示清单解析
print("\nParsing MPD manifest...")
parsed_manifest = DASHManifest.parse_mpd(mpd_content)

print(f"Manifest type: {parsed_manifest.type}")
print(f"Duration: {parsed_manifest.media_presentation_duration}")
print(f"Min buffer time: {parsed_manifest.min_buffer_time}")
print(f"Periods: {len(parsed_manifest.periods)}")

for period in parsed_manifest.periods:
print(f"\nPeriod '{period.id}':")
print(f" Adaptation sets: {len(period.adaptation_sets)}")

for adaptation_set in period.adaptation_sets:
print(f" - {adaptation_set.content_type} ({adaptation_set.mime_type})")
print(f" Representations: {len(adaptation_set.representations)}")

for representation in adaptation_set.representations:
print(f" - {representation.id}: {representation.bandwidth}bps")
if representation.width and representation.height:
print(f" Resolution: {representation.width}x{representation.height}")
print(f" Segments: {len(representation.segments)}")

# 生成直播清单
print("\nGenerating live manifest...")
start_time = datetime.utcnow() - timedelta(seconds=30)
live_mpd_path = dash_gen.generate_live_manifest(start_time)

with open(live_mpd_path, 'r', encoding='utf-8') as f:
live_mpd_content = f.read()

print("\nLive MPD Manifest Content:")
print("-" * 50)
print(live_mpd_content)

# 清理临时文件
import shutil
shutil.rmtree(temp_dir)
print(f"\nCleaned up temporary directory: {temp_dir}")

# Run the DASH demo defined above
demo_dash()

5. 协议比较与选择

5.1 协议特性对比

特性 RTP/RTCP RTMP HLS DASH
传输方式 UDP/TCP TCP HTTP HTTP
延迟 极低(100ms) 低(1-3s) 高(6-30s) 高(6-30s)
防火墙友好 差 一般 好 好
CDN支持 差 一般 优秀 优秀
自适应码率 需额外实现 支持 支持 支持
移动端支持 需额外实现 一般 原生支持 需播放器库(如ExoPlayer)
标准化程度 高(RFC) 私有 高(RFC) 高(ISO)

5.2 应用场景选择

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
class ProtocolSelector:
    """Rule-based helper that suggests a streaming protocol for a set of requirements."""

    # Candidate protocols in tie-break order: when scores are equal, the
    # protocol listed earlier here is the one recommended.
    _PROTOCOLS = ('RTP', 'RTMP', 'HLS', 'DASH')

    # Reference data served by get_implementation_guide(); callers receive
    # copies so these shared entries are never mutated from outside.
    _GUIDES = {
        'RTP': {
            'server': 'GStreamer, FFmpeg, Live555',
            'client': 'VLC, GStreamer, FFmpeg',
            'considerations': '需要处理NAT穿透,考虑使用TURN服务器'
        },
        'RTMP': {
            'server': 'Nginx-RTMP, SRS, Wowza',
            'client': 'FFmpeg, OBS, Flash Player',
            'considerations': '需要Flash支持,考虑向HLS/DASH迁移'
        },
        'HLS': {
            'server': 'Nginx, Apache, CDN',
            'client': 'Safari原生, hls.js, ExoPlayer',
            'considerations': '注意片段大小和缓存策略'
        },
        'DASH': {
            'server': 'Nginx, Apache, CDN',
            'client': 'dash.js, ExoPlayer, Shaka Player',
            'considerations': '更好的标准化,但需要JavaScript播放器'
        },
    }

    @staticmethod
    def recommend_protocol(requirements: Dict[str, Any]) -> str:
        """Score every candidate protocol against *requirements* and return the best.

        Recognised keys (all optional):
            latency      -- 'low' or 'normal' adjust scores (default 'normal')
            content_type -- 'live'; any other value is scored as VOD (default 'vod')
            platform     -- 'web', 'mobile' or 'desktop' (default 'web')
            network      -- 'poor' or 'excellent' adjust scores (default 'good')
            scale        -- only 'large' adjusts scores (default 'small')
        Values not listed above for latency/platform/network/scale add nothing.

        Returns:
            One of 'RTP', 'RTMP', 'HLS', 'DASH'; ties resolve in that order.

        NOTE(review): in every rule DASH receives at most the HLS delta and
        ties favour HLS, so 'DASH' is never actually returned — confirm intent.
        """
        latency = requirements.get('latency', 'normal')
        content_type = requirements.get('content_type', 'vod')
        platform = requirements.get('platform', 'web')
        network = requirements.get('network', 'good')
        scale = requirements.get('scale', 'small')

        totals = dict.fromkeys(ProtocolSelector._PROTOCOLS, 0)

        def bump(**deltas):
            # Add each keyword's delta to that protocol's running total.
            for name, delta in deltas.items():
                totals[name] += delta

        # Latency tolerance: 'low' rewards RTP/RTMP and penalises HLS/DASH.
        if latency == 'low':
            bump(RTP=3, RTMP=2, HLS=-2, DASH=-2)
        elif latency == 'normal':
            bump(RTMP=1, HLS=1, DASH=1)

        # Content type: anything that is not 'live' is treated as VOD.
        if content_type == 'live':
            bump(RTP=2, RTMP=2, HLS=1, DASH=1)
        else:
            bump(HLS=2, DASH=2)

        # Target platform.
        if platform == 'web':
            bump(HLS=2, DASH=2, RTMP=1)
        elif platform == 'mobile':
            bump(HLS=3, DASH=2)
        elif platform == 'desktop':
            bump(RTP=1, RTMP=2, HLS=1, DASH=1)

        # Network quality.
        if network == 'poor':
            bump(HLS=2, DASH=2, RTP=-1)
        elif network == 'excellent':
            bump(RTP=1)

        # Deployment scale.
        if scale == 'large':
            bump(HLS=2, DASH=2, RTP=-1, RTMP=-1)

        # max() returns the first maximal key, so _PROTOCOLS order breaks ties.
        return max(totals, key=totals.__getitem__)

    @staticmethod
    def get_implementation_guide(protocol: str) -> Dict[str, str]:
        """Return server/client/considerations hints for *protocol*.

        A new dict is built on every call, so callers may modify it freely.
        An unrecognised protocol name yields an empty dict.
        """
        entry = ProtocolSelector._GUIDES.get(protocol)
        return {} if entry is None else dict(entry)

# 协议选择演示
def demo_protocol_selection():
    """Print a recommended protocol plus implementation hints for sample scenarios."""
    print("Protocol Selection Demo")
    print("=======================")

    selector = ProtocolSelector()

    # (scenario label, requirements) pairs. The requirements dicts are printed
    # verbatim, so their key order is kept stable.
    cases = [
        ('低延迟直播', dict(latency='low', content_type='live', platform='web',
                            network='good', scale='medium')),
        ('移动端VOD', dict(latency='normal', content_type='vod', platform='mobile',
                           network='poor', scale='large')),
        ('桌面端实时通信', dict(latency='low', content_type='live', platform='desktop',
                              network='excellent', scale='small')),
        ('大规模直播', dict(latency='normal', content_type='live', platform='web',
                            network='good', scale='large')),
    ]

    # Caption printed for each guide field, in display order.
    captions = (("服务器", 'server'), ("客户端", 'client'), ("注意事项", 'considerations'))

    for label, needs in cases:
        print(f"\n场景: {label}")
        print(f"需求: {needs}")

        choice = selector.recommend_protocol(needs)
        print(f"推荐协议: {choice}")

        guide = selector.get_implementation_guide(choice)
        if not guide:
            # Unknown protocol: nothing more to show for this scenario.
            continue
        for caption, field in captions:
            print(f"{caption}: {guide[field]}")

# Run the protocol-selection demo defined above
demo_protocol_selection()

总结

本文深入探讨了音视频传输协议与流媒体技术的核心概念和实现方法:

关键技术要点

  1. RTP/RTCP协议:适用于实时传输,提供时序信息和质量反馈
  2. RTMP协议:最初由Macromedia开发(后被Adobe收购)的流媒体协议,适合直播推流
  3. HLS技术:基于HTTP的自适应流媒体,移动端支持好
  4. DASH标准:国际标准化的自适应流媒体技术

技术发展趋势

  • 低延迟优化:WebRTC、CMAF等技术降低流媒体延迟
  • AI增强:智能码率自适应、内容感知编码
  • 边缘计算:CDN边缘节点实时转码和优化
  • 5G应用:超低延迟、高带宽场景的新机遇

实际应用建议

  • 直播场景:根据延迟要求选择RTP、RTMP或HLS
  • 点播场景:优先考虑HLS或DASH
  • 移动端:HLS具有最好的原生支持
  • 大规模分发:利用CDN和HTTP协议的优势

通过理解这些协议的特点和适用场景,开发者可以为不同的音视频应用选择最合适的传输方案,实现高质量的流媒体服务。

版权所有,如有侵权请联系我