直播技术架构与实现:从推流到播放的完整解决方案

引言

直播技术作为现代互联网的重要组成部分,已经深入到教育、娱乐、电商、企业培训等各个领域。一个完整的直播系统涉及音视频采集、编码、推流、转码、分发、播放等多个技术环节,每个环节都有其独特的技术挑战和优化空间。

本文将深入探讨直播技术的完整架构,从技术原理到实际实现,为读者提供一个全面的直播系统开发指南。

1. 直播系统整体架构

1.1 系统架构概览

一个完整的直播系统通常包含以下几个核心组件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
import asyncio
import json
import time
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass, field
from enum import Enum
import threading
import queue
import hashlib
import uuid

class StreamState(Enum):
"""流状态枚举"""
IDLE = "idle"
CONNECTING = "connecting"
STREAMING = "streaming"
PAUSED = "paused"
ERROR = "error"
DISCONNECTED = "disconnected"

class StreamProtocol(Enum):
"""流协议枚举"""
RTMP = "rtmp"
RTSP = "rtsp"
HLS = "hls"
DASH = "dash"
WEBRTC = "webrtc"
SRT = "srt"

@dataclass
class StreamConfig:
"""流配置"""
stream_id: str
protocol: StreamProtocol
video_codec: str = "h264"
audio_codec: str = "aac"
video_bitrate: int = 2000 # kbps
audio_bitrate: int = 128 # kbps
resolution: tuple = (1920, 1080)
framerate: int = 30
keyframe_interval: int = 2 # seconds

@dataclass
class StreamMetrics:
"""流指标"""
bitrate: float = 0.0
fps: float = 0.0
dropped_frames: int = 0
latency: float = 0.0
packet_loss: float = 0.0
connection_time: float = 0.0
total_bytes: int = 0
error_count: int = 0
last_update: float = field(default_factory=time.time)

class LiveStreamingArchitecture:
"""直播系统架构管理器"""

def __init__(self):
self.streams: Dict[str, StreamConfig] = {}
self.metrics: Dict[str, StreamMetrics] = {}
self.state_callbacks: Dict[str, List[Callable]] = {}
self.running = False
self.monitor_thread = None

# 系统组件
self.ingest_servers = [] # 推流服务器
self.transcoding_nodes = [] # 转码节点
self.cdn_nodes = [] # CDN节点
self.origin_servers = [] # 源站服务器

def register_stream(self, config: StreamConfig) -> bool:
"""注册流"""
try:
self.streams[config.stream_id] = config
self.metrics[config.stream_id] = StreamMetrics()

print(f"Stream registered: {config.stream_id}")
print(f" Protocol: {config.protocol.value}")
print(f" Resolution: {config.resolution[0]}x{config.resolution[1]}")
print(f" Video bitrate: {config.video_bitrate}kbps")
print(f" Audio bitrate: {config.audio_bitrate}kbps")

return True
except Exception as e:
print(f"Failed to register stream {config.stream_id}: {e}")
return False

def start_stream(self, stream_id: str) -> bool:
"""启动流"""
if stream_id not in self.streams:
print(f"Stream {stream_id} not found")
return False

config = self.streams[stream_id]

try:
# 模拟启动流的过程
print(f"Starting stream {stream_id}...")

# 1. 验证推流地址
if not self._validate_stream_url(stream_id):
raise Exception("Invalid stream URL")

# 2. 分配转码资源
transcoding_node = self._allocate_transcoding_node(config)
if not transcoding_node:
raise Exception("No available transcoding nodes")

# 3. 配置CDN分发
cdn_config = self._configure_cdn_distribution(stream_id)
if not cdn_config:
raise Exception("Failed to configure CDN")

# 4. 启动监控
self._start_stream_monitoring(stream_id)

print(f"Stream {stream_id} started successfully")
print(f" Transcoding node: {transcoding_node}")
print(f" CDN configuration: {cdn_config}")

return True

except Exception as e:
print(f"Failed to start stream {stream_id}: {e}")
return False

def stop_stream(self, stream_id: str) -> bool:
"""停止流"""
if stream_id not in self.streams:
print(f"Stream {stream_id} not found")
return False

try:
print(f"Stopping stream {stream_id}...")

# 1. 停止推流接收
self._stop_ingest(stream_id)

# 2. 停止转码
self._stop_transcoding(stream_id)

# 3. 清理CDN缓存
self._cleanup_cdn_cache(stream_id)

# 4. 停止监控
self._stop_stream_monitoring(stream_id)

print(f"Stream {stream_id} stopped successfully")
return True

except Exception as e:
print(f"Failed to stop stream {stream_id}: {e}")
return False

def get_stream_metrics(self, stream_id: str) -> Optional[StreamMetrics]:
"""获取流指标"""
return self.metrics.get(stream_id)

def _validate_stream_url(self, stream_id: str) -> bool:
"""验证推流地址"""
# 模拟URL验证逻辑
config = self.streams[stream_id]

if config.protocol == StreamProtocol.RTMP:
# RTMP URL格式验证
return True
elif config.protocol == StreamProtocol.WEBRTC:
# WebRTC连接验证
return True
else:
return True

def _allocate_transcoding_node(self, config: StreamConfig) -> Optional[str]:
"""分配转码节点"""
# 模拟转码节点分配逻辑
available_nodes = ["transcode-node-1", "transcode-node-2", "transcode-node-3"]

# 根据负载选择节点
selected_node = available_nodes[0] # 简化选择逻辑

print(f"Allocated transcoding node: {selected_node}")
print(f" Input: {config.resolution[0]}x{config.resolution[1]}@{config.framerate}fps")
print(f" Output profiles: 1080p, 720p, 480p")

return selected_node

def _configure_cdn_distribution(self, stream_id: str) -> Optional[Dict]:
"""配置CDN分发"""
config = self.streams[stream_id]

cdn_config = {
'stream_id': stream_id,
'origin_server': 'origin.example.com',
'edge_servers': [
'edge1.example.com',
'edge2.example.com',
'edge3.example.com'
],
'protocols': {
'hls': f'https://cdn.example.com/live/{stream_id}/index.m3u8',
'dash': f'https://cdn.example.com/live/{stream_id}/manifest.mpd',
'rtmp': f'rtmp://cdn.example.com/live/{stream_id}'
},
'cache_ttl': 10, # seconds
'geo_distribution': True
}

return cdn_config

def _start_stream_monitoring(self, stream_id: str):
"""启动流监控"""
if not self.running:
self.running = True
self.monitor_thread = threading.Thread(
target=self._monitoring_loop,
daemon=True
)
self.monitor_thread.start()

def _stop_stream_monitoring(self, stream_id: str):
"""停止流监控"""
# 在实际实现中,这里会停止特定流的监控
pass

def _monitoring_loop(self):
"""监控循环"""
while self.running:
try:
for stream_id in self.streams.keys():
self._update_stream_metrics(stream_id)
time.sleep(5) # 每5秒更新一次指标
except Exception as e:
print(f"Monitoring error: {e}")

def _update_stream_metrics(self, stream_id: str):
"""更新流指标"""
if stream_id not in self.metrics:
return

metrics = self.metrics[stream_id]

# 模拟指标更新
import random
metrics.bitrate = random.uniform(1800, 2200) # kbps
metrics.fps = random.uniform(28, 30)
metrics.latency = random.uniform(2, 5) # seconds
metrics.packet_loss = random.uniform(0, 0.1) # percentage
metrics.total_bytes += int(metrics.bitrate * 1024 / 8 * 5) # 5秒的数据量
metrics.last_update = time.time()

# 检查异常情况
if metrics.fps < 25:
metrics.dropped_frames += 1

if metrics.packet_loss > 0.05:
metrics.error_count += 1

def _stop_ingest(self, stream_id: str):
"""停止推流接收"""
print(f"Stopping ingest for stream {stream_id}")

def _stop_transcoding(self, stream_id: str):
"""停止转码"""
print(f"Stopping transcoding for stream {stream_id}")

def _cleanup_cdn_cache(self, stream_id: str):
"""清理CDN缓存"""
print(f"Cleaning up CDN cache for stream {stream_id}")

def get_system_status(self) -> Dict:
"""获取系统状态"""
active_streams = len([s for s in self.streams.keys() if s in self.metrics])
total_bitrate = sum(m.bitrate for m in self.metrics.values())
avg_latency = sum(m.latency for m in self.metrics.values()) / len(self.metrics) if self.metrics else 0

return {
'active_streams': active_streams,
'total_streams': len(self.streams),
'total_bitrate': total_bitrate,
'average_latency': avg_latency,
'system_load': random.uniform(0.3, 0.8), # 模拟系统负载
'uptime': time.time() - (time.time() - 3600), # 模拟1小时运行时间
}

# 直播架构演示
def demo_live_streaming_architecture():
print("Live Streaming Architecture Demo")
print("===============================")

# 创建直播系统
live_system = LiveStreamingArchitecture()

# 配置不同类型的流
streams = [
StreamConfig(
stream_id="live_stream_1",
protocol=StreamProtocol.RTMP,
resolution=(1920, 1080),
video_bitrate=2000,
audio_bitrate=128
),
StreamConfig(
stream_id="live_stream_2",
protocol=StreamProtocol.WEBRTC,
resolution=(1280, 720),
video_bitrate=1500,
audio_bitrate=96
),
StreamConfig(
stream_id="live_stream_3",
protocol=StreamProtocol.SRT,
resolution=(1920, 1080),
video_bitrate=3000,
audio_bitrate=192
)
]

print("\nRegistering streams...")
for stream_config in streams:
success = live_system.register_stream(stream_config)
print(f"Stream {stream_config.stream_id}: {'✓' if success else '✗'}")

print("\nStarting streams...")
for stream_config in streams:
success = live_system.start_stream(stream_config.stream_id)
print(f"Stream {stream_config.stream_id}: {'✓' if success else '✗'}")

# 等待一段时间让监控数据积累
print("\nMonitoring streams for 10 seconds...")
time.sleep(10)

# 显示系统状态
print("\nSystem Status:")
status = live_system.get_system_status()
for key, value in status.items():
if isinstance(value, float):
print(f" {key}: {value:.2f}")
else:
print(f" {key}: {value}")

# 显示各流的指标
print("\nStream Metrics:")
for stream_id in streams:
metrics = live_system.get_stream_metrics(stream_id.stream_id)
if metrics:
print(f" {stream_id.stream_id}:")
print(f" Bitrate: {metrics.bitrate:.1f} kbps")
print(f" FPS: {metrics.fps:.1f}")
print(f" Latency: {metrics.latency:.2f}s")
print(f" Packet Loss: {metrics.packet_loss:.3f}%")
print(f" Dropped Frames: {metrics.dropped_frames}")
print(f" Total Bytes: {metrics.total_bytes / 1024 / 1024:.2f} MB")

# 停止流
print("\nStopping streams...")
for stream_config in streams:
success = live_system.stop_stream(stream_config.stream_id)
print(f"Stream {stream_config.stream_id}: {'✓' if success else '✗'}")

print("\nLive streaming architecture demo completed")

# 运行直播架构演示
if __name__ == "__main__":
demo_live_streaming_architecture()

1.2 核心组件详解

推流服务器(Ingest Server)

推流服务器负责接收来自主播端的音视频流,是整个直播链路的入口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
import socket
import struct
import threading
from typing import Dict, Optional, Callable

class RTMPIngestServer:
"""RTMP推流服务器"""

def __init__(self, host: str = "0.0.0.0", port: int = 1935):
self.host = host
self.port = port
self.running = False
self.server_socket = None
self.clients: Dict[str, socket.socket] = {}
self.stream_handlers: Dict[str, Callable] = {}

def start(self):
"""启动服务器"""
try:
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(10)

self.running = True
print(f"RTMP Ingest Server started on {self.host}:{self.port}")

# 启动接受连接的线程
accept_thread = threading.Thread(target=self._accept_connections, daemon=True)
accept_thread.start()

except Exception as e:
print(f"Failed to start RTMP server: {e}")

def stop(self):
"""停止服务器"""
self.running = False
if self.server_socket:
self.server_socket.close()

# 关闭所有客户端连接
for client_socket in self.clients.values():
client_socket.close()
self.clients.clear()

print("RTMP Ingest Server stopped")

def _accept_connections(self):
"""接受客户端连接"""
while self.running:
try:
client_socket, address = self.server_socket.accept()
client_id = f"{address[0]}:{address[1]}"

print(f"New RTMP connection from {client_id}")

self.clients[client_id] = client_socket

# 为每个客户端启动处理线程
client_thread = threading.Thread(
target=self._handle_client,
args=(client_socket, client_id),
daemon=True
)
client_thread.start()

except Exception as e:
if self.running:
print(f"Error accepting connection: {e}")

def _handle_client(self, client_socket: socket.socket, client_id: str):
"""处理客户端连接"""
try:
# RTMP握手过程
if not self._rtmp_handshake(client_socket):
print(f"RTMP handshake failed for {client_id}")
return

print(f"RTMP handshake completed for {client_id}")

# 处理RTMP消息
while self.running:
message = self._read_rtmp_message(client_socket)
if not message:
break

self._process_rtmp_message(message, client_id)

except Exception as e:
print(f"Error handling client {client_id}: {e}")
finally:
if client_id in self.clients:
del self.clients[client_id]
client_socket.close()
print(f"Client {client_id} disconnected")

def _rtmp_handshake(self, client_socket: socket.socket) -> bool:
"""RTMP握手"""
try:
# C0 + C1
c0_c1 = client_socket.recv(1537)
if len(c0_c1) != 1537:
return False

# 验证C0版本
version = c0_c1[0]
if version != 3:
return False

# 发送S0 + S1 + S2
s0 = b'\x03' # 版本3
s1 = b'\x00' * 1536 # 简化的S1
s2 = c0_c1[1:] # S2 = C1

client_socket.send(s0 + s1 + s2)

# 接收C2
c2 = client_socket.recv(1536)
if len(c2) != 1536:
return False

return True

except Exception as e:
print(f"RTMP handshake error: {e}")
return False

def _read_rtmp_message(self, client_socket: socket.socket) -> Optional[Dict]:
"""读取RTMP消息"""
try:
# 读取基本头部
basic_header = client_socket.recv(1)
if not basic_header:
return None

# 解析消息头部(简化实现)
message = {
'type': 'data',
'timestamp': int(time.time() * 1000),
'length': 0,
'stream_id': 0,
'data': b''
}

return message

except Exception as e:
print(f"Error reading RTMP message: {e}")
return None

def _process_rtmp_message(self, message: Dict, client_id: str):
"""处理RTMP消息"""
# 根据消息类型进行处理
if message['type'] == 'video':
self._handle_video_data(message['data'], client_id)
elif message['type'] == 'audio':
self._handle_audio_data(message['data'], client_id)
elif message['type'] == 'metadata':
self._handle_metadata(message['data'], client_id)

def _handle_video_data(self, data: bytes, client_id: str):
"""处理视频数据"""
# 转发到转码服务器或CDN
pass

def _handle_audio_data(self, data: bytes, client_id: str):
"""处理音频数据"""
# 转发到转码服务器或CDN
pass

def _handle_metadata(self, data: bytes, client_id: str):
"""处理元数据"""
# 解析流信息
pass

def register_stream_handler(self, stream_key: str, handler: Callable):
"""注册流处理器"""
self.stream_handlers[stream_key] = handler

def get_active_streams(self) -> List[str]:
"""获取活跃流列表"""
return list(self.clients.keys())

# RTMP推流服务器演示
def demo_rtmp_ingest_server():
print("RTMP Ingest Server Demo")
print("=======================")

# 创建RTMP服务器
rtmp_server = RTMPIngestServer(host="127.0.0.1", port=1935)

# 启动服务器
rtmp_server.start()

print("\nRTMP server is running...")
print("You can now push streams to rtmp://127.0.0.1:1935/live/stream_key")

# 模拟运行一段时间
try:
time.sleep(30) # 运行30秒
except KeyboardInterrupt:
print("\nShutting down server...")

# 停止服务器
rtmp_server.stop()

print("RTMP ingest server demo completed")

if __name__ == "__main__":
demo_rtmp_ingest_server()

2. 转码与自适应码率

2.1 多码率转码系统

转码是直播系统中的关键环节,负责将输入流转换为多种分辨率和码率的输出流,以适应不同网络条件和设备能力。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
import subprocess
import json
import os
from typing import List, Dict, Optional
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, Future
import tempfile

@dataclass
class TranscodeProfile:
"""转码配置文件"""
name: str
width: int
height: int
video_bitrate: int # kbps
audio_bitrate: int # kbps
framerate: int = 30
keyframe_interval: int = 2
video_codec: str = "libx264"
audio_codec: str = "aac"
preset: str = "veryfast"
profile: str = "baseline"
level: str = "3.1"

class LiveTranscoder:
"""直播转码器"""

def __init__(self, max_workers: int = 4):
self.max_workers = max_workers
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.active_jobs: Dict[str, Future] = {}
self.profiles = self._create_default_profiles()

def _create_default_profiles(self) -> List[TranscodeProfile]:
"""创建默认转码配置"""
return [
TranscodeProfile(
name="1080p",
width=1920, height=1080,
video_bitrate=4000, audio_bitrate=128,
preset="fast", profile="high", level="4.0"
),
TranscodeProfile(
name="720p",
width=1280, height=720,
video_bitrate=2500, audio_bitrate=128,
preset="fast", profile="main", level="3.1"
),
TranscodeProfile(
name="480p",
width=854, height=480,
video_bitrate=1200, audio_bitrate=96,
preset="veryfast", profile="baseline", level="3.0"
),
TranscodeProfile(
name="360p",
width=640, height=360,
video_bitrate=800, audio_bitrate=64,
preset="veryfast", profile="baseline", level="3.0"
)
]

def start_transcoding(self, input_url: str, stream_id: str,
output_profiles: Optional[List[str]] = None) -> bool:
"""开始转码"""
if stream_id in self.active_jobs:
print(f"Transcoding already active for stream {stream_id}")
return False

if output_profiles is None:
output_profiles = ["1080p", "720p", "480p", "360p"]

# 选择转码配置
selected_profiles = [p for p in self.profiles if p.name in output_profiles]

if not selected_profiles:
print(f"No valid profiles found for {output_profiles}")
return False

print(f"Starting transcoding for stream {stream_id}")
print(f"Input: {input_url}")
print(f"Profiles: {[p.name for p in selected_profiles]}")

# 提交转码任务
future = self.executor.submit(
self._transcode_stream,
input_url, stream_id, selected_profiles
)

self.active_jobs[stream_id] = future
return True

def stop_transcoding(self, stream_id: str) -> bool:
"""停止转码"""
if stream_id not in self.active_jobs:
print(f"No active transcoding for stream {stream_id}")
return False

future = self.active_jobs[stream_id]
future.cancel()
del self.active_jobs[stream_id]

print(f"Transcoding stopped for stream {stream_id}")
return True

def _transcode_stream(self, input_url: str, stream_id: str,
profiles: List[TranscodeProfile]):
"""执行转码"""
try:
# 创建输出目录
output_dir = f"/tmp/live_output/{stream_id}"
os.makedirs(output_dir, exist_ok=True)

# 为每个配置文件生成FFmpeg命令
processes = []

for profile in profiles:
output_path = f"{output_dir}/{profile.name}"
os.makedirs(output_path, exist_ok=True)

# 构建FFmpeg命令
cmd = self._build_ffmpeg_command(input_url, profile, output_path)

print(f"Starting transcoding for {profile.name}")
print(f"Command: {' '.join(cmd)}")

# 启动FFmpeg进程
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True
)

processes.append((profile.name, process))

# 监控转码进程
self._monitor_transcoding_processes(processes, stream_id)

except Exception as e:
print(f"Transcoding error for stream {stream_id}: {e}")

def _build_ffmpeg_command(self, input_url: str, profile: TranscodeProfile,
output_path: str) -> List[str]:
"""构建FFmpeg命令"""
cmd = [
"ffmpeg",
"-i", input_url,
"-c:v", profile.video_codec,
"-preset", profile.preset,
"-profile:v", profile.profile,
"-level", profile.level,
"-b:v", f"{profile.video_bitrate}k",
"-maxrate", f"{int(profile.video_bitrate * 1.2)}k",
"-bufsize", f"{profile.video_bitrate * 2}k",
"-vf", f"scale={profile.width}:{profile.height}",
"-r", str(profile.framerate),
"-g", str(profile.framerate * profile.keyframe_interval),
"-c:a", profile.audio_codec,
"-b:a", f"{profile.audio_bitrate}k",
"-ar", "44100",
"-ac", "2",
"-f", "hls",
"-hls_time", "4",
"-hls_list_size", "10",
"-hls_flags", "delete_segments",
"-hls_segment_filename", f"{output_path}/segment_%03d.ts",
f"{output_path}/playlist.m3u8"
]

return cmd

def _monitor_transcoding_processes(self, processes: List[tuple], stream_id: str):
"""监控转码进程"""
try:
while True:
active_processes = []

for profile_name, process in processes:
if process.poll() is None:
# 进程仍在运行
active_processes.append((profile_name, process))
else:
# 进程已结束
return_code = process.returncode
if return_code == 0:
print(f"Transcoding completed for {profile_name}")
else:
stderr = process.stderr.read()
print(f"Transcoding failed for {profile_name}: {stderr}")

if not active_processes:
break

processes = active_processes
time.sleep(5) # 每5秒检查一次

except Exception as e:
print(f"Error monitoring transcoding processes: {e}")
# 终止所有进程
for _, process in processes:
if process.poll() is None:
process.terminate()

def get_transcoding_status(self, stream_id: str) -> Optional[Dict]:
"""获取转码状态"""
if stream_id not in self.active_jobs:
return None

future = self.active_jobs[stream_id]

status = {
'stream_id': stream_id,
'running': not future.done(),
'profiles': [p.name for p in self.profiles]
}

if future.done():
try:
future.result()
status['status'] = 'completed'
except Exception as e:
status['status'] = 'failed'
status['error'] = str(e)
else:
status['status'] = 'running'

return status

def add_custom_profile(self, profile: TranscodeProfile):
"""添加自定义转码配置"""
self.profiles.append(profile)
print(f"Added custom profile: {profile.name}")

def shutdown(self):
"""关闭转码器"""
print("Shutting down transcoder...")

# 取消所有活跃任务
for stream_id, future in self.active_jobs.items():
future.cancel()
print(f"Cancelled transcoding for {stream_id}")

self.active_jobs.clear()
self.executor.shutdown(wait=True)

print("Transcoder shutdown completed")

# 转码系统演示
def demo_live_transcoder():
print("Live Transcoder Demo")
print("===================")

# 创建转码器
transcoder = LiveTranscoder(max_workers=2)

# 添加自定义配置
custom_profile = TranscodeProfile(
name="240p",
width=426, height=240,
video_bitrate=400, audio_bitrate=64,
preset="ultrafast", profile="baseline"
)
transcoder.add_custom_profile(custom_profile)

# 模拟输入流
input_streams = [
"rtmp://input.example.com/live/stream1",
"rtmp://input.example.com/live/stream2"
]

print("\nStarting transcoding jobs...")
for i, input_url in enumerate(input_streams):
stream_id = f"stream_{i+1}"
profiles = ["720p", "480p", "360p"] if i == 0 else ["1080p", "720p", "240p"]

success = transcoder.start_transcoding(input_url, stream_id, profiles)
print(f"Stream {stream_id}: {'✓' if success else '✗'}")

# 监控转码状态
print("\nMonitoring transcoding status...")
for _ in range(6): # 监控30秒
time.sleep(5)

print(f"\n--- Status Update ---")
for i in range(len(input_streams)):
stream_id = f"stream_{i+1}"
status = transcoder.get_transcoding_status(stream_id)

if status:
print(f"Stream {stream_id}: {status['status']}")
if status['running']:
print(f" Profiles: {', '.join(status['profiles'])}")
else:
print(f"Stream {stream_id}: Not found")

# 停止转码
print("\nStopping transcoding...")
for i in range(len(input_streams)):
stream_id = f"stream_{i+1}"
transcoder.stop_transcoding(stream_id)

# 关闭转码器
transcoder.shutdown()

print("\nLive transcoder demo completed")

if __name__ == "__main__":
demo_live_transcoder()

版权所有,如有侵权请联系我