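WebRTC Real-Time Communication: A Technical Deep Dive

WebRTC (Web Real-Time Communication) is an open-source project that gives browsers and mobile applications real-time communication capabilities. This article examines WebRTC's core technologies, architecture, and practical applications.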

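1. WebRTC Overview and Architecture

1.1 Core WebRTC Components

WebRTC is built around three core APIs:

  • MediaStream API: captures audio and video streams
  • RTCPeerConnection API: establishes P2P connections
  • RTCDataChannel API: transfers arbitrary data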
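# Imports for the WebRTC signaling simulation
# asyncio: asynchronous I/O framework for concurrent connections and messages
import asyncio
# json: serialization and deserialization of signaling messages
import json
# websockets: third-party WebSocket library used for the signaling server and client
import websockets
# typing: type annotations for readability and IDE support
from typing import Dict, List, Optional, Callable
# dataclasses: concise data classes with a generated __init__ and related methods
from dataclasses import dataclass, asdict
# enum: enumerations that keep signaling message types consistent
from enum import Enum
# time: timestamps for message tagging and performance monitoring
import time
# random: random numbers for simulated network delay and ID generation
import random
# hashlib: hash functions for generating unique identifiers
import hashlib
# base64: encoding of data for transport
import base64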

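class SignalingMessageType(Enum):
    """Signaling message types used during WebRTC session setup.

    - OFFER / ANSWER: the core messages of SDP negotiation
    - ICE_CANDIDATE: network connectivity candidates
    - JOIN_ROOM / LEAVE_ROOM: room management requests
    - USER_JOINED / USER_LEFT: presence notifications

    An enum keeps message types consistent and type-safe.
    """
    OFFER = "offer"                  # SDP offer, sent by the initiating peer
    ANSWER = "answer"                # SDP answer, sent by the receiving peer
    ICE_CANDIDATE = "ice-candidate"  # ICE candidate carrying connectivity information
    JOIN_ROOM = "join-room"          # request to join a room
    LEAVE_ROOM = "leave-room"        # request to leave a room
    USER_JOINED = "user-joined"      # notification that a user joined the room
    USER_LEFT = "user-left"          # notification that a user left the room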

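@dataclass
class ICECandidate:
    """An ICE candidate.

    ICE (Interactive Connectivity Establishment) candidates describe possible
    network paths that WebRTC peers can use to reach each other.

    Candidate types:
    - Host: an address on a local network interface
    - Server reflexive: the public address learned from a STUN server
    - Relay: an address on a TURN server that relays traffic
    """
    candidate: str                            # candidate string with the connection details
    sdp_mid: Optional[str] = None             # SDP media ID the candidate belongs to
    sdp_mline_index: Optional[int] = None     # index of the SDP media line the candidate belongs to
    username_fragment: Optional[str] = None   # ICE username fragment, used for authentication

@dataclass
class SessionDescription:
    """An SDP session description.

    SDP (Session Description Protocol) describes the parameters of a
    multimedia session: media types, codecs, network information, and so on.

    WebRTC negotiates media with SDP:
    1. The initiator creates an offer
    2. The receiver creates an answer
    3. Both sides exchange the descriptions to complete negotiation
    """
    type: str  # SDP type: "offer" or "answer"
    sdp: str   # SDP body containing the session description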

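@dataclass
class SignalingMessage:
    """Envelope shared by every signaling message.

    Signaling messages coordinate WebRTC connection setup:
    - SDP exchange (offer/answer)
    - ICE candidate exchange
    - room management
    - presence notifications

    Routing fields:
    - from_user / to_user: point-to-point routing
    - room_id: room-level broadcast
    """
    type: SignalingMessageType       # message type (an enum, for type safety)
    data: Dict                       # message payload
    from_user: Optional[str] = None  # sender user ID, used for routing
    to_user: Optional[str] = None    # recipient user ID for point-to-point messages
    room_id: Optional[str] = None    # room ID for room-level messages
    timestamp: float = 0             # creation time, for ordering and debugging

    def __post_init__(self):
        """Stamp the message with the current time if no timestamp was given.

        Timestamps help with message ordering, latency analysis, and debugging.
        """
        if self.timestamp == 0:
            self.timestamp = time.time()  # current Unix timestamp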

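class WebRTCSignalingServer:
    """WebRTC signaling server.

    The signaling server coordinates peers before they can talk directly. It:
    1. Tracks client connections and room state
    2. Relays SDP offer/answer messages
    3. Relays ICE candidates
    4. Handles joining and leaving rooms
    5. Maintains user presence

    Characteristics:
    - Real-time, bidirectional communication over WebSocket
    - Rooms support multi-user sessions
    - Asynchronous handling lets one process serve many connections
    - Disconnected clients are cleaned up automatically
    """

    def __init__(self, host: str = "localhost", port: int = 8765):
        """Initialize the signaling server.

        Args:
            host: address to bind to (default: localhost)
            port: port to listen on (default: 8765)

        Data structures:
        - rooms: room ID -> {user ID -> WebSocket connection}
        - user_rooms: WebSocket connection -> room ID (reverse mapping)
        - running: flag indicating whether the server is running
        """
        self.host = host
        self.port = port
        # Room registry: room ID -> dictionary of user connections
        self.rooms: Dict[str, Dict[str, websockets.WebSocketServerProtocol]] = {}
        # Reverse mapping for fast lookup: WebSocket connection -> room ID
        self.user_rooms: Dict[websockets.WebSocketServerProtocol, str] = {}
        # Server running state
        self.running = False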

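    async def start_server(self):
        """Start the signaling server.

        websockets.serve creates the WebSocket server:
        1. It binds to the configured host and port
        2. Each client connection is handled in its own coroutine
        3. Awaiting asyncio.Future() keeps the server running

        The async context manager shuts the server down when the block exits.
        """
        print(f"Starting WebRTC signaling server on {self.host}:{self.port}")
        self.running = True

        # Start the WebSocket server inside an async context manager
        async with websockets.serve(self.handle_client, self.host, self.port):
            print("Signaling server started")
            # A Future that never completes keeps the server alive until cancelled
            await asyncio.Future()

    async def handle_client(self, websocket, path=None):
        """Handle one client connection.

        Each WebSocket connection runs in its own coroutine, which:
        1. Logs the new connection
        2. Listens for client messages until the connection closes
        3. Handles disconnects
        4. Always cleans up the client's state

        Exception handling:
        - ConnectionClosed: the connection was closed
        - Anything else propagates to the websockets library after cleanup

        Args:
            websocket: the WebSocket connection
            path: request path (unused). It defaults to None because older
                websockets releases pass it as a second argument while newer
                ones call the handler with the connection only.
        """
        print(f"New client connected: {websocket.remote_address}")

        try:
            # Consume the message stream with an async iterator
            async for message in websocket:
                await self.process_message(websocket, message)
        except websockets.exceptions.ConnectionClosed:
            # The client disconnected
            print(f"Client disconnected: {websocket.remote_address}")
        finally:
            # Clean up the client's state no matter how the loop ended
            await self.cleanup_client(websocket)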

    async def process_message(self, websocket, raw_message: str):
        """Parse and dispatch one client message.

        Steps:
        1. JSON deserialization: turn the string into Python objects
        2. Validation: check required fields and the message type
        3. Construction: build a SignalingMessage instance
        4. Routing: dispatch by message type

        Errors handled:
        - JSONDecodeError: malformed JSON
        - KeyError: a required field is missing
        - ValueError: unknown message type

        Args:
            websocket: connection the message arrived on
            raw_message: raw JSON string
        """
        try:
            # Step 1: JSON deserialization
            data = json.loads(raw_message)

            # Step 2: build the signaling message
            message = SignalingMessage(
                type=SignalingMessageType(data['type']),  # validates the type against the enum
                data=data.get('data', {}),                # payload, defaults to an empty dict
                from_user=data.get('from_user'),          # sender user ID
                to_user=data.get('to_user'),              # recipient user ID
                room_id=data.get('room_id')               # room ID
            )

            # Step 3: route the message to its handler
            await self.route_message(websocket, message)

        except (json.JSONDecodeError, KeyError, ValueError) as e:
            # Malformed message: report the error back to the client
            print(f"Invalid message from {websocket.remote_address}: {e}")
            await self.send_error(websocket, f"Invalid message format: {e}")

    async def route_message(self, websocket, message: SignalingMessage):
        """Dispatch a message to the handler for its type.

        Room management messages:
        - JOIN_ROOM: a user asks to join a room
        - LEAVE_ROOM: a user asks to leave a room

        WebRTC signaling messages:
        - OFFER: SDP offer, relayed to the target user
        - ANSWER: SDP answer, relayed to the target user
        - ICE_CANDIDATE: ICE candidate, relayed to the target user

        Design notes:
        - Dispatch is a simple if/elif chain keyed on the message type
        - Keeping routing in one place makes it easy to maintain and extend

        Args:
            websocket: connection the message arrived on
            message: the parsed signaling message
        """
        if message.type == SignalingMessageType.JOIN_ROOM:
            # Join request
            await self.handle_join_room(websocket, message)
        elif message.type == SignalingMessageType.LEAVE_ROOM:
            # Leave request
            await self.handle_leave_room(websocket, message)
        elif message.type in (SignalingMessageType.OFFER,
                              SignalingMessageType.ANSWER,
                              SignalingMessageType.ICE_CANDIDATE):
            # Relay WebRTC signaling to the target user
            await self.relay_message(websocket, message)
        else:
            # Type is valid but not accepted from clients (e.g. USER_JOINED)
            await self.send_error(websocket, f"Unknown message type: {message.type}")

    async def handle_join_room(self, websocket, message: SignalingMessage):
        """Handle a request to join a room.

        Steps:
        1. Validation: check that room_id and user_id are present
        2. Room management: create the room if it does not exist
        3. Registration: add the user to the room and to the reverse mapping
        4. Notification: tell the other users in the room about the newcomer
        5. User list: send the newcomer the list of users already in the room

        State updates:
        - rooms[room_id][user_id] = websocket   # room -> user mapping
        - user_rooms[websocket] = room_id       # reverse mapping

        Args:
            websocket: connection asking to join
            message: signaling message carrying the room ID and user ID
        """
        room_id = message.room_id
        user_id = message.data.get('user_id')

        # Step 1: validation
        if not room_id or not user_id:
            await self.send_error(websocket, "Missing room_id or user_id")
            return

        # Step 2: create the room lazily
        if room_id not in self.rooms:
            self.rooms[room_id] = {}
            print(f"Created new room: {room_id}")

        # Step 3: register the user in both mappings for fast lookup
        self.rooms[room_id][user_id] = websocket  # room -> user -> connection
        self.user_rooms[websocket] = room_id      # connection -> room (reverse mapping)

        print(f"User {user_id} joined room {room_id} (total users: {len(self.rooms[room_id])})")

        # Step 4: broadcast the join event to everyone except the newcomer
        await self.broadcast_to_room(room_id, SignalingMessage(
            type=SignalingMessageType.USER_JOINED,
            data={'user_id': user_id},
            room_id=room_id
        ), exclude_user=user_id)

        # Step 5: send the newcomer the users already in the room
        existing_users = [uid for uid in self.rooms[room_id].keys() if uid != user_id]
        await self.send_message(websocket, {
            'type': 'room-joined',
            'data': {
                'room_id': room_id,
                'users': existing_users  # the newcomer opens P2P connections to these users
            }
        })

    async def handle_leave_room(self, websocket, message: SignalingMessage):
        """Handle a request to leave a room.

        This simply delegates to cleanup_client, so an explicit leave and a
        dropped connection go through the same code path.

        Args:
            websocket: connection asking to leave
            message: the leave-room signaling message
        """
        await self.cleanup_client(websocket)

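    async def relay_message(self, websocket, message: SignalingMessage):
        """Relay a WebRTC signaling message to its target user.

        Relaying is the core job of the signaling server: it carries SDP and
        ICE information between peers. Steps:

        1. Validation: check the room ID and target user ID
        2. Lookup: verify that the target user is in the given room
        3. Relay: forward the message to the target user unchanged

        Supported message types:
        - OFFER: SDP offer
        - ANSWER: SDP answer
        - ICE_CANDIDATE: ICE candidate

        Args:
            websocket: connection the message arrived on
            message: the signaling message to relay
        """
        room_id = message.room_id
        to_user = message.to_user

        # Step 1: validation
        if not room_id or not to_user:
            await self.send_error(websocket, "Missing room_id or to_user")
            return

        # Step 2: look up and verify the target user
        if room_id not in self.rooms or to_user not in self.rooms[room_id]:
            await self.send_error(websocket, "Target user not found")
            return

        # Step 3: relay the message
        target_websocket = self.rooms[room_id][to_user]
        # asdict() leaves the enum member in place, which json.dumps cannot
        # serialize, so replace it with its string value before sending
        payload = asdict(message)
        payload['type'] = message.type.value
        await self.send_message(target_websocket, payload)

    async def broadcast_to_room(self, room_id: str, message: SignalingMessage,
                                exclude_user: Optional[str] = None):
        """Broadcast a message to every user in a room.

        Broadcasts announce room-level state changes such as:
        - a user joining the room
        - a user leaving the room
        - other room-level notifications

        Steps:
        1. Check that the room exists
        2. Serialize the message to a dictionary
        3. Iterate over the users in the room
        4. Optionally skip one user (for example, the sender)
        5. Handle errors so one failed connection does not affect the others

        Args:
            room_id: target room ID
            message: the signaling message to broadcast
            exclude_user: user ID to skip (optional)
        """
        # Step 1: check that the room exists
        if room_id not in self.rooms:
            return

        # Step 2: serialize, converting the enum to its JSON-friendly string value
        message_data = asdict(message)
        message_data['type'] = message.type.value

        # Step 3: iterate over a snapshot, because the room dictionary may
        # change while this coroutine is suspended in send_message
        for user_id, websocket in list(self.rooms[room_id].items()):
            # Step 4: skip the excluded user
            if exclude_user and user_id == exclude_user:
                continue

            # Step 5: send and tolerate closed connections
            try:
                await self.send_message(websocket, message_data)
            except websockets.exceptions.ConnectionClosed:
                # The connection is already closed; cleanup_client removes it
                # later when that client's handler exits
                pass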

    async def send_message(self, websocket, message: Dict):
        """Send a message to one client.

        This low-level send method:
        1. Serializes the Python dictionary to a JSON string
        2. Sends it over the WebSocket connection
        3. Handles a closed connection

        All outgoing messages go through here, which keeps the wire format
        consistent.

        Args:
            websocket: target WebSocket connection
            message: message dictionary to send
        """
        try:
            # Serialize to JSON and send
            await websocket.send(json.dumps(message))
        except websockets.exceptions.ConnectionClosed:
            # The connection is gone: log it, but do not raise
            print("Failed to send message: connection closed")

    async def send_error(self, websocket, error_message: str):
        """Send an error message to a client.

        A standard error format reports problems such as:
        - failed validation
        - unknown user
        - unknown room
        - malformed message

        Error message format:
        {
            "type": "error",
            "data": {"message": "error description"}
        }

        Args:
            websocket: target WebSocket connection
            error_message: error description
        """
        await self.send_message(websocket, {
            'type': 'error',
            'data': {'message': error_message}
        })

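    async def cleanup_client(self, websocket):
        """Remove a client connection and everything associated with it.

        Cleanup keeps the server healthy:
        1. Consistency: closed connections are removed promptly
        2. Resource release: no stale entries accumulate in memory
        3. State sync: other users learn about the departure
        4. Room management: empty rooms are deleted

        Steps:
        1. Check whether the connection is in the reverse mapping
        2. Reverse-lookup the user ID
        3. Remove the user from the room
        4. Broadcast the user-left event
        5. Delete the room if it is now empty

        Args:
            websocket: the WebSocket connection to clean up
        """
        # Step 1: connections that never joined a room have nothing to clean up
        if websocket in self.user_rooms:
            room_id = self.user_rooms[websocket]

            # Step 2: reverse-lookup the user ID for this connection
            user_id = None
            for uid, ws in self.rooms.get(room_id, {}).items():
                if ws == websocket:
                    user_id = uid
                    break

            if user_id:
                # Step 3: remove the user from the room
                del self.rooms[room_id][user_id]

                # Step 4: broadcast the user-left event
                await self.broadcast_to_room(room_id, SignalingMessage(
                    type=SignalingMessageType.USER_LEFT,
                    data={'user_id': user_id},
                    room_id=room_id
                ))

                # Step 5: delete the room if it is now empty
                if room_id in self.rooms and not self.rooms[room_id]:
                    del self.rooms[room_id]
                    print(f"Room {room_id} deleted (empty)")

                print(f"User {user_id} left room {room_id} (remaining users: {len(self.rooms.get(room_id, {}))})")

            # Drop the reverse mapping (pop tolerates an entry that is already gone)
            self.user_rooms.pop(websocket, None)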

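class WebRTCPeer:
    """Client side of the WebRTC simulation.

    WebRTCPeer is responsible for:
    1. Signaling: opening and maintaining the WebSocket connection to the signaling server
    2. Rooms: joining and leaving rooms
    3. Peer connections: managing P2P connections to other users
    4. Messages: handling signaling messages (offer, answer, ICE candidates, and so on)

    Design:
    - Single responsibility: client-side WebRTC logic only
    - Event-driven: a message loop handles asynchronous events
    - State tracking: connection state and room state
    - Error handling: failures are caught and logged

    Data structures:
    - peers: P2P connection objects, keyed by remote user ID
    - message_handlers: mapping from message type to handler function
    - connected: whether the signaling connection is up
    """

    def __init__(self, user_id: str, signaling_url: str = "ws://localhost:8765"):
        """Initialize the WebRTC client.

        Initialization covers:
        1. Identity: the unique user ID
        2. Connection settings: the signaling server URL (local development by default)
        3. State: connection and room state
        4. Storage: containers for peer connections and message handlers

        Args:
            user_id: unique identifier of this user within a room
            signaling_url: WebSocket URL of the signaling server
        """
        # Identity and connection settings
        self.user_id = user_id
        self.signaling_url = signaling_url

        # Connection state (this is a client connection, not a server one)
        self.websocket: Optional[websockets.WebSocketClientProtocol] = None
        self.connected = False  # whether the signaling connection is up

        # Room and peer connections
        self.room_id: Optional[str] = None            # current room ID
        self.peers: Dict[str, 'PeerConnection'] = {}  # {user_id: PeerConnection}

        # Message handling
        self.message_handlers: Dict[str, Callable] = {}  # message type -> handler function

        print(f"WebRTC Peer initialized: {user_id} -> {signaling_url}")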

    async def connect_signaling(self):
        """Connect to the signaling server.

        Opening the WebSocket connection is the first step of a session.
        The signaling server is used for:
        1. Room management: joining and leaving rooms
        2. Discovery: learning which users are in the room
        3. SDP exchange: passing offers and answers
        4. ICE negotiation: exchanging connectivity candidates

        Steps:
        1. Open the WebSocket connection
        2. Update the connection state
        3. Start the message loop
        4. Report failures and re-raise them

        Raises:
            Exception: if the connection cannot be established
        """
        try:
            # Open the WebSocket connection
            self.websocket = await websockets.connect(self.signaling_url)
            self.connected = True
            print(f"✓ User {self.user_id} connected to signaling server")

            # Run the message loop as a background task. Keeping a reference
            # prevents the task from being garbage-collected while it runs.
            self._message_task = asyncio.create_task(self.message_loop())

        except Exception as e:
            print(f"✗ Failed to connect to signaling server: {e}")
            self.connected = False
            raise  # re-raise so the caller can handle it

    async def join_room(self, room_id: str):
        """Join a room.

        Rooms are the unit of communication: users must be in the same room
        before they can set up a P2P connection. Steps:
        1. Make sure the signaling connection is up
        2. Record the current room ID
        3. Send a JOIN_ROOM message to the server
        4. The server replies with the list of users in the room

        Message format:
        {
            'type': 'join-room',
            'room_id': '<room ID>',
            'data': {'user_id': '<user ID>'}
        }

        Args:
            room_id: ID of the room to join
        """
        # Make sure the signaling connection is available
        if not self.connected:
            await self.connect_signaling()

        # Record the room
        self.room_id = room_id

        # Build the join-room message
        message = {
            'type': SignalingMessageType.JOIN_ROOM.value,
            'room_id': room_id,
            'data': {'user_id': self.user_id}
        }

        # Send the join request
        await self.websocket.send(json.dumps(message))
        print(f"Sent join room request: {self.user_id} -> {room_id}")

    async def message_loop(self):
        """Signaling message loop.

        This is the client's core message pump. It:
        1. Listens for messages from the signaling server
        2. Handles each signaling event asynchronously
        3. Tracks connection state
        4. Exits cleanly when the connection closes

        Design:
        - Async iteration: async for reads messages from the WebSocket
        - Event-driven: each message triggers the matching handler
        - Isolation: handle_signaling_message catches its own errors, so one
          bad message does not stop the loop
        - State sync: the client is marked disconnected when the loop ends

        Exception handling:
        - ConnectionClosed: the WebSocket closed, normally or abnormally
        - Anything else: logged, and the client is marked disconnected
        """
        try:
            # Iterate over the WebSocket message stream
            async for message in self.websocket:
                # Handle each incoming signaling message
                await self.handle_signaling_message(message)
        except websockets.exceptions.ConnectionClosed:
            # The connection closed
            print(f"✗ Signaling connection closed for user {self.user_id}")
            self.connected = False
        except Exception as e:
            # Any other failure
            print(f"✗ Message loop error for user {self.user_id}: {e}")
            self.connected = False

    async def handle_signaling_message(self, raw_message: str):
        """Handle one signaling message.

        Responsibilities:
        1. Parsing: JSON deserialization and format checks
        2. Routing: dispatch to the handler for the message type
        3. Error handling: malformed and unknown messages
        4. Logging: record what was received

        Supported message types:
        - room-joined: confirmation that the room was joined
        - user-joined: another user joined the room
        - user-left: a user left the room
        - offer: SDP offer (connection initiation)
        - answer: SDP answer (connection response)
        - ice-candidate: ICE candidate (network path)
        - error: error message

        Args:
            raw_message: raw JSON string
        """
        try:
            # Step 1: parse the JSON
            data = json.loads(raw_message)
            message_type = data.get('type')

            print(f"📨 Received message: {message_type} for user {self.user_id}")

            # Step 2: dispatch by message type
            if message_type == 'room-joined':
                # The room was joined successfully
                await self.handle_room_joined(data)
            elif message_type == SignalingMessageType.USER_JOINED.value:
                # Another user joined
                await self.handle_user_joined(data)
            elif message_type == SignalingMessageType.USER_LEFT.value:
                # A user left
                await self.handle_user_left(data)
            elif message_type == SignalingMessageType.OFFER.value:
                # SDP offer
                await self.handle_offer(data)
            elif message_type == SignalingMessageType.ANSWER.value:
                # SDP answer
                await self.handle_answer(data)
            elif message_type == SignalingMessageType.ICE_CANDIDATE.value:
                # ICE candidate
                await self.handle_ice_candidate(data)
            elif message_type == 'error':
                # Error reported by the server
                error_msg = data.get('data', {}).get('message', 'Unknown error')
                print(f"✗ Signaling error: {error_msg}")
            else:
                # Unknown message type
                print(f"⚠️ Unknown message type: {message_type}")

        except json.JSONDecodeError as e:
            # Malformed JSON
            print(f"✗ Invalid JSON message: {e}")
        except Exception as e:
            # Any other handling error
            print(f"✗ Message handling error: {e}")

    async def handle_room_joined(self, data: Dict):
        """Handle confirmation that the room was joined."""
        room_data = data.get('data', {})
        existing_users = room_data.get('users', [])

        print(f"User {self.user_id} successfully joined room {self.room_id}")
        print(f"Existing users: {existing_users}")

        # The newcomer initiates a connection to every user already in the room
        for user_id in existing_users:
            await self.create_peer_connection(user_id, is_initiator=True)

    async def handle_user_joined(self, data: Dict):
        """Handle a notification that another user joined."""
        user_data = data.get('data', {})
        new_user_id = user_data.get('user_id')

        if new_user_id != self.user_id:
            print(f"New user {new_user_id} joined room")
            # Wait for the newcomer to send an offer; this side stays passive

    async def handle_user_left(self, data: Dict):
        """Handle a notification that a user left."""
        user_data = data.get('data', {})
        left_user_id = user_data.get('user_id')

        if left_user_id in self.peers:
            print(f"User {left_user_id} left room")
            await self.peers[left_user_id].close()
            del self.peers[left_user_id]

    async def create_peer_connection(self, remote_user_id: str, is_initiator: bool = False):
        """Create a peer connection, or return the existing one for this user."""
        if remote_user_id in self.peers:
            return self.peers[remote_user_id]

        peer_connection = PeerConnection(
            local_user_id=self.user_id,
            remote_user_id=remote_user_id,
            signaling_websocket=self.websocket,
            room_id=self.room_id,
            is_initiator=is_initiator
        )

        self.peers[remote_user_id] = peer_connection
        await peer_connection.initialize()

        if is_initiator:
            await peer_connection.create_offer()

        return peer_connection

    async def handle_offer(self, data: Dict):
        """Handle an incoming offer."""
        from_user = data.get('from_user')
        offer_data = data.get('data', {})

        if not from_user:
            # Without a sender there is nobody to answer
            return

        if from_user not in self.peers:
            await self.create_peer_connection(from_user, is_initiator=False)

        await self.peers[from_user].handle_remote_offer(offer_data)

    async def handle_answer(self, data: Dict):
        """Handle an incoming answer."""
        from_user = data.get('from_user')
        answer_data = data.get('data', {})

        if from_user in self.peers:
            await self.peers[from_user].handle_remote_answer(answer_data)

    async def handle_ice_candidate(self, data: Dict):
        """Handle an incoming ICE candidate.

        Candidates from users without a peer connection are dropped.
        """
        from_user = data.get('from_user')
        candidate_data = data.get('data', {})

        if from_user in self.peers:
            await self.peers[from_user].handle_remote_ice_candidate(candidate_data)

    async def disconnect(self):
        """Close all peer connections and leave the signaling server."""
        if self.websocket and self.connected:
            # Close every peer connection
            for peer in self.peers.values():
                await peer.close()
            self.peers.clear()

            # Leave the room
            if self.room_id:
                message = {
                    'type': SignalingMessageType.LEAVE_ROOM.value,
                    'room_id': self.room_id,
                    'data': {'user_id': self.user_id}
                }
                await self.websocket.send(json.dumps(message))

            await self.websocket.close()
            self.connected = False
            print(f"User {self.user_id} disconnected")

class PeerConnection:
    """Simulated WebRTC peer connection (no real media or network checks)."""

    def __init__(self, local_user_id: str, remote_user_id: str,
                 signaling_websocket, room_id: str, is_initiator: bool = False):
        self.local_user_id = local_user_id
        self.remote_user_id = remote_user_id
        self.signaling_websocket = signaling_websocket
        self.room_id = room_id
        self.is_initiator = is_initiator

        # Connection state
        self.connection_state = "new"
        self.ice_connection_state = "new"
        self.ice_gathering_state = "new"

        # SDP
        self.local_description: Optional[SessionDescription] = None
        self.remote_description: Optional[SessionDescription] = None

        # ICE candidates
        self.local_ice_candidates: List[ICECandidate] = []
        self.remote_ice_candidates: List[ICECandidate] = []

        # Data channels
        self.data_channels: Dict[str, 'DataChannel'] = {}

        print(f"Created PeerConnection: {local_user_id} -> {remote_user_id} (initiator: {is_initiator})")

    async def initialize(self):
        """Initialize the connection."""
        # Simulate ICE candidate gathering
        await self.gather_ice_candidates()

        # The initiator creates the default data channel
        if self.is_initiator:
            await self.create_data_channel("default")

    async def gather_ice_candidates(self):
        """Gather (simulated) ICE candidates and send them to the remote peer."""
        self.ice_gathering_state = "gathering"

        # Hard-coded example candidates, one of each type
        candidates = [
            # Host candidate
            ICECandidate(
                candidate="candidate:1 1 UDP 2130706431 192.168.1.100 54400 typ host",
                sdp_mid="0",
                sdp_mline_index=0
            ),
            # Server reflexive candidate
            ICECandidate(
                candidate="candidate:2 1 UDP 1694498815 203.0.113.100 54401 typ srflx raddr 192.168.1.100 rport 54400",
                sdp_mid="0",
                sdp_mline_index=0
            ),
            # Relay candidate
            ICECandidate(
                candidate="candidate:3 1 UDP 16777215 203.0.113.200 54402 typ relay raddr 203.0.113.100 rport 54401",
                sdp_mid="0",
                sdp_mline_index=0
            )
        ]

        for candidate in candidates:
            self.local_ice_candidates.append(candidate)
            # Send the candidate to the remote peer
            await self.send_ice_candidate(candidate)
            await asyncio.sleep(0.1)  # simulated gathering delay

        self.ice_gathering_state = "complete"
        print(f"ICE gathering complete for {self.local_user_id} -> {self.remote_user_id}")

    async def create_offer(self):
        """Create an offer and send it to the remote peer."""
        if not self.is_initiator:
            raise ValueError("Only initiator can create offer")

        # Generate the SDP offer
        sdp_offer = self.generate_sdp_offer()
        self.local_description = SessionDescription("offer", sdp_offer)

        # Send the offer
        await self.send_signaling_message({
            'type': SignalingMessageType.OFFER.value,
            'data': {
                'type': 'offer',
                'sdp': sdp_offer
            },
            'from_user': self.local_user_id,
            'to_user': self.remote_user_id,
            'room_id': self.room_id
        })

        print(f"Sent offer from {self.local_user_id} to {self.remote_user_id}")

    async def handle_remote_offer(self, offer_data: Dict):
        """Handle the remote offer and reply with an answer."""
        sdp = offer_data.get('sdp')
        self.remote_description = SessionDescription("offer", sdp)

        print(f"Received offer from {self.remote_user_id}")

        # Create the answer
        await self.create_answer()

    async def create_answer(self):
        """Create an answer and send it to the remote peer."""
        if self.is_initiator:
            raise ValueError("Initiator should not create answer")

        # Generate the SDP answer
        sdp_answer = self.generate_sdp_answer()
        self.local_description = SessionDescription("answer", sdp_answer)

        # Send the answer
        await self.send_signaling_message({
            'type': SignalingMessageType.ANSWER.value,
            'data': {
                'type': 'answer',
                'sdp': sdp_answer
            },
            'from_user': self.local_user_id,
            'to_user': self.remote_user_id,
            'room_id': self.room_id
        })

        print(f"Sent answer from {self.local_user_id} to {self.remote_user_id}")

        # Simulate connection establishment
        await self.simulate_connection_establishment()

    async def handle_remote_answer(self, answer_data: Dict):
        """Handle the remote answer."""
        sdp = answer_data.get('sdp')
        self.remote_description = SessionDescription("answer", sdp)

        print(f"Received answer from {self.remote_user_id}")

        # Simulate connection establishment
        await self.simulate_connection_establishment()

    async def send_ice_candidate(self, candidate: ICECandidate):
        """Send an ICE candidate to the remote peer."""
        await self.send_signaling_message({
            'type': SignalingMessageType.ICE_CANDIDATE.value,
            'data': {
                'candidate': candidate.candidate,
                'sdpMid': candidate.sdp_mid,
                'sdpMLineIndex': candidate.sdp_mline_index
            },
            'from_user': self.local_user_id,
            'to_user': self.remote_user_id,
            'room_id': self.room_id
        })

    async def handle_remote_ice_candidate(self, candidate_data: Dict):
        """Store an ICE candidate received from the remote peer."""
        candidate = ICECandidate(
            candidate=candidate_data.get('candidate'),
            sdp_mid=candidate_data.get('sdpMid'),
            sdp_mline_index=candidate_data.get('sdpMLineIndex')
        )

        self.remote_ice_candidates.append(candidate)
        print(f"Received ICE candidate from {self.remote_user_id}: {candidate.candidate[:50]}...")

    async def simulate_connection_establishment(self):
        """Simulate connection establishment once both descriptions are set."""
        if self.local_description and self.remote_description:
            # Simulate ICE connectivity checks
            self.ice_connection_state = "checking"
            await asyncio.sleep(1)  # simulated check duration

            self.ice_connection_state = "connected"
            self.connection_state = "connected"

            print(f"✅ PeerConnection established: {self.local_user_id} <-> {self.remote_user_id}")

            # Simulate data transfer
            await self.simulate_data_transmission()

    async def create_data_channel(self, label: str) -> 'DataChannel':
        """Create and open a data channel."""
        data_channel = DataChannel(label, self)
        self.data_channels[label] = data_channel
        await data_channel.open()
        return data_channel

    async def simulate_data_transmission(self):
        """Simulate sending data over the default channel, if this side has one."""
        if "default" in self.data_channels:
            data_channel = self.data_channels["default"]

            # Test messages
            test_messages = [
                f"Hello from {self.local_user_id}!",
                f"Connection established at {time.time()}",
                {"type": "ping", "timestamp": time.time()}
            ]

            for message in test_messages:
                await data_channel.send(message)
                await asyncio.sleep(0.5)

    def _build_sdp(self, ice_ufrag: str, setup: str) -> str:
        """Build the static, data-channel-only SDP body shared by offer and answer."""
        session_id = int(time.time())
        lines = [
            "v=0",
            f"o=- {session_id} 2 IN IP4 127.0.0.1",
            "s=-",
            "t=0 0",
            "a=group:BUNDLE 0",
            "a=msid-semantic: WMS",
            "m=application 9 UDP/DTLS/SCTP webrtc-datachannel",
            "c=IN IP4 0.0.0.0",
            f"a=ice-ufrag:{ice_ufrag}",
            "a=ice-pwd:2/1muCWoOi3uLifh/gFHHHHH",
            "a=ice-options:trickle",
            "a=fingerprint:sha-256 19:E2:1C:3B:4B:9F:81:E6:B8:5C:F4:A5:A8:D8:73:04:BB:05:2F:70:9F:04:A9:0E:05:E9:26:33:E8:70:88:A2",
            f"a=setup:{setup}",
            "a=mid:0",
            "a=sctp-port:5000",
            "a=max-message-size:262144",
        ]
        return "\n".join(lines)

    def generate_sdp_offer(self) -> str:
        """Generate the SDP offer (the offerer advertises setup:actpass)."""
        return self._build_sdp(ice_ufrag="4ZcD", setup="actpass")

    def generate_sdp_answer(self) -> str:
        """Generate the SDP answer (the answerer picks setup:active)."""
        return self._build_sdp(ice_ufrag="8hhY", setup="active")

    async def send_signaling_message(self, message: Dict):
        """Send a signaling message over the shared WebSocket."""
        try:
            await self.signaling_websocket.send(json.dumps(message))
        except Exception as e:
            print(f"Failed to send signaling message: {e}")

    async def close(self):
        """Close the connection."""
        self.connection_state = "closed"
        self.ice_connection_state = "closed"

        # Close every data channel
        for data_channel in self.data_channels.values():
            await data_channel.close()

        print(f"PeerConnection closed: {self.local_user_id} -> {self.remote_user_id}")

class DataChannel:
    """Simulated WebRTC data channel."""

    def __init__(self, label: str, peer_connection: PeerConnection):
        self.label = label
        self.peer_connection = peer_connection
        self.ready_state = "connecting"
        self.buffered_amount = 0
        self.max_packet_life_time = None
        self.max_retransmits = None
        self.protocol = ""
        self.negotiated = False
        self.id = None

    async def open(self):
        """Open the data channel."""
        # Simulated opening delay
        await asyncio.sleep(0.1)
        self.ready_state = "open"
        print(f"DataChannel '{self.label}' opened")

    async def send(self, data):
        """Send data over the channel."""
        if self.ready_state != "open":
            raise RuntimeError("DataChannel is not open")

        # Serialize the data
        if isinstance(data, (dict, list)):
            serialized_data = json.dumps(data)
        else:
            serialized_data = str(data)

        # Simulated send
        print(f"📤 DataChannel '{self.label}' sending: {serialized_data[:100]}...")

        # Simulated network latency
        await asyncio.sleep(0.01)

        # Simulated receiver: the data loops back into this same channel object
        await self.simulate_receive(serialized_data)

    async def simulate_receive(self, data: str):
        """Simulate receiving data."""
        print(f"📥 DataChannel '{self.label}' received: {data[:100]}...")

        # Application-level handling goes here; this demo only answers pings
        try:
            parsed_data = json.loads(data)
            if isinstance(parsed_data, dict) and parsed_data.get('type') == 'ping':
                # Reply to the ping
                pong_response = {
                    'type': 'pong',
                    'timestamp': time.time(),
                    'original_timestamp': parsed_data.get('timestamp')
                }
                await asyncio.sleep(0.1)
                await self.send(pong_response)
        except json.JSONDecodeError:
            pass  # not JSON, ignore

    async def close(self):
        """Close the data channel."""
        self.ready_state = "closed"
        print(f"DataChannel '{self.label}' closed")

# WebRTC demo
async def demo_webrtc_signaling():
    print("WebRTC Signaling Demo")
    print("=====================")

    # Start the signaling server
    server = WebRTCSignalingServer()
    server_task = asyncio.create_task(server.start_server())

    # Give the server time to start
    await asyncio.sleep(1)

    try:
        # Create two WebRTC peers
        peer1 = WebRTCPeer("user1")
        peer2 = WebRTCPeer("user2")

        # Connect both peers to the signaling server
        await peer1.connect_signaling()
        await peer2.connect_signaling()

        # Join the same room
        room_id = "test-room"
        await peer1.join_room(room_id)
        await asyncio.sleep(0.5)
        await peer2.join_room(room_id)

        # Wait for the peer connection to be established
        print("\nWaiting for peer connection establishment...")
        await asyncio.sleep(5)

        # Check connection state
        print("\nConnection Status:")
        for remote_user, peer_conn in peer1.peers.items():
            print(f"Peer1 -> {remote_user}: {peer_conn.connection_state}")

        for remote_user, peer_conn in peer2.peers.items():
            print(f"Peer2 -> {remote_user}: {peer_conn.connection_state}")

        # Wait for data transfer to finish
        await asyncio.sleep(3)

        # Disconnect
        print("\nDisconnecting peers...")
        await peer1.disconnect()
        await peer2.disconnect()

    finally:
        # Stop the server
        server_task.cancel()
        try:
            await server_task
        except asyncio.CancelledError:
            pass


# Run the demo
if __name__ == "__main__":
    asyncio.run(demo_webrtc_signaling())
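
The ping/pong handling in `simulate_receive` echoes the sender's timestamp back as `original_timestamp`, which is enough to estimate round-trip time on the sending side. The following is a minimal sketch of that idea; `make_ping` and `rtt_from_pong` are hypothetical helper names introduced here for illustration and are not part of the classes above.

```python
import json
import time
from typing import Optional


def make_ping(now: Optional[float] = None) -> str:
    """Build a ping message in the same JSON shape the data channel expects."""
    return json.dumps({"type": "ping", "timestamp": time.time() if now is None else now})


def rtt_from_pong(raw: str, now: Optional[float] = None) -> Optional[float]:
    """Return the round-trip time in seconds, or None if `raw` is not a usable pong."""
    try:
        msg = json.loads(raw)
    except json.JSONDecodeError:
        return None
    if not isinstance(msg, dict) or msg.get("type") != "pong":
        return None
    sent = msg.get("original_timestamp")
    if not isinstance(sent, (int, float)):
        return None
    # RTT = time the pong arrived minus the time the ping was sent
    now = time.time() if now is None else now
    return now - sent


if __name__ == "__main__":
    pong = json.dumps({"type": "pong", "timestamp": 10.2, "original_timestamp": 10.0})
    print(rtt_from_pong(pong, now=10.25))
```

Because both timestamps come from the sender's own clock, the estimate does not depend on the two peers having synchronized clocks.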

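2. NAT Traversal and the ICE Protocol

2.1 NAT Types and Challenges

NAT (Network Address Translation) is one of the main challenges WebRTC faces, and each NAT type affects P2P connectivity differently. The code in this section implements a simplified STUN client, NAT type detection, and an ICE agent.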
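
As a rough illustration of how the classic NAT types differ, the sketch below models which inbound packets a NAT would forward for one existing mapping. The `NatMapping` class and its fields are hypothetical names introduced for this example. It covers only the filtering rule: a symmetric NAT additionally allocates a new external port per destination, and real NATs have timeouts, neither of which is modeled here.

```python
from dataclasses import dataclass, field
from typing import Set, Tuple

Addr = Tuple[str, int]  # (ip, port)


@dataclass
class NatMapping:
    """Hypothetical model of one NAT mapping and its inbound filtering rule."""
    nat_type: str  # "full_cone", "restricted_cone", "port_restricted_cone" or "symmetric"
    contacted: Set[Addr] = field(default_factory=set)  # remote endpoints the host has sent to

    def record_outbound(self, remote: Addr) -> None:
        """Remember that the internal host sent a packet to `remote`."""
        self.contacted.add(remote)

    def accepts_inbound(self, remote: Addr) -> bool:
        """Would a packet from `remote` be forwarded to the internal host?"""
        if self.nat_type == "full_cone":
            # Any external host may use the mapping once it exists
            return True
        if self.nat_type == "restricted_cone":
            # Only IPs the host has already contacted, from any port
            return any(ip == remote[0] for ip, _ in self.contacted)
        # Port-restricted cone and symmetric: exact IP and port must match
        return remote in self.contacted


if __name__ == "__main__":
    stun_server = ("198.51.100.1", 3478)
    peer = ("203.0.113.7", 40000)
    for nat_type in ("full_cone", "restricted_cone", "port_restricted_cone", "symmetric"):
        mapping = NatMapping(nat_type)
        mapping.record_outbound(stun_server)
        print(nat_type, "accepts unsolicited peer packet:", mapping.accepts_inbound(peer))
```

Only the full cone case lets a peer reach the host without the host having sent to that peer first, which is why ICE has both sides send connectivity checks to each other.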

import asyncio
import random
import secrets
import socket
import struct
import time
from enum import Enum
from typing import Dict, List, Optional, Tuple


class NATType(Enum):
    """NAT types (classic RFC 3489 classification)."""
    FULL_CONE = "full_cone"
    RESTRICTED_CONE = "restricted_cone"
    PORT_RESTRICTED_CONE = "port_restricted_cone"
    SYMMETRIC = "symmetric"
    OPEN_INTERNET = "open_internet"
    BLOCKED = "blocked"


class STUNMessageType(Enum):
    """STUN message types."""
    BINDING_REQUEST = 0x0001
    BINDING_RESPONSE = 0x0101
    BINDING_ERROR_RESPONSE = 0x0111


class STUNAttribute:
    """STUN attribute type codes."""
    MAPPED_ADDRESS = 0x0001
    RESPONSE_ADDRESS = 0x0002
    CHANGE_REQUEST = 0x0003
    SOURCE_ADDRESS = 0x0004
    CHANGED_ADDRESS = 0x0005
    USERNAME = 0x0006
    PASSWORD = 0x0007
    MESSAGE_INTEGRITY = 0x0008
    ERROR_CODE = 0x0009
    UNKNOWN_ATTRIBUTES = 0x000A
    REFLECTED_FROM = 0x000B
    XOR_MAPPED_ADDRESS = 0x0020


class STUNMessage:
    """STUN message."""

    def __init__(self, message_type: STUNMessageType, transaction_id: Optional[bytes] = None):
        self.message_type = message_type
        self.transaction_id = transaction_id or self.generate_transaction_id()
        self.attributes: Dict[int, bytes] = {}

    @staticmethod
    def generate_transaction_id() -> bytes:
        """Generate a 96-bit (12-byte) transaction ID."""
        # RFC 5389 asks for a cryptographically random transaction ID,
        # so use `secrets` rather than the `random` module.
        return secrets.token_bytes(12)

    def add_attribute(self, attr_type: int, value: bytes):
        """Add an attribute."""
        self.attributes[attr_type] = value

    def encode(self) -> bytes:
        """Encode the STUN message."""
        # Serialize all attributes
        attrs_data = b''
        for attr_type, attr_value in self.attributes.items():
            attr_length = len(attr_value)
            # Attribute header: type (2 bytes) + length (2 bytes)
            attr_header = struct.pack('>HH', attr_type, attr_length)
            attrs_data += attr_header + attr_value

            # Pad each attribute to a 4-byte boundary
            padding = (4 - (attr_length % 4)) % 4
            attrs_data += b'\x00' * padding

        # STUN header: type (2 bytes) + length (2 bytes)
        # + magic cookie (4 bytes) + transaction ID (12 bytes)
        message_length = len(attrs_data)
        magic_cookie = 0x2112A442

        header = struct.pack('>HHI', self.message_type.value, message_length, magic_cookie)
        header += self.transaction_id

        return header + attrs_data

    @classmethod
    def decode(cls, data: bytes) -> 'STUNMessage':
        """Decode a STUN message."""
        if len(data) < 20:
            raise ValueError("STUN message too short")

        # Parse the header
        message_type, message_length, magic_cookie = struct.unpack('>HHI', data[:8])
        transaction_id = data[8:20]

        if magic_cookie != 0x2112A442:
            raise ValueError("Invalid STUN magic cookie")

        message = cls(STUNMessageType(message_type), transaction_id)

        # Parse the attributes
        offset = 20
        while offset < len(data):
            if offset + 4 > len(data):
                break

            attr_type, attr_length = struct.unpack('>HH', data[offset:offset+4])
            offset += 4

            if offset + attr_length > len(data):
                break

            attr_value = data[offset:offset+attr_length]
            message.add_attribute(attr_type, attr_value)

            # Skip the value and its padding
            offset += attr_length
            padding = (4 - (attr_length % 4)) % 4
            offset += padding

        return message

class STUNClient:
"""STUN客户端"""

def __init__(self, stun_server: str = "stun.l.google.com", stun_port: int = 19302):
self.stun_server = stun_server
self.stun_port = stun_port
self.socket = None

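    async def detect_nat_type(self) -> Tuple[NATType, Optional[Tuple[str, int]]]:
        """Detect the NAT type with a simplified RFC 3489-style test sequence.

        Tests 2 and 3 rely on the CHANGE-REQUEST attribute, which many public
        STUN servers do not implement, so treat the result as a hint only.
        """
        try:
            # Test 1: basic binding request
            mapped_addr1 = await self.binding_request()
            if not mapped_addr1:
                return NATType.BLOCKED, None

            # Test 2: ask the server to reply from a different IP and port
            mapped_addr2 = await self.binding_request(change_ip=True, change_port=True)

            if mapped_addr2:
                # A reply means open internet or a full cone NAT.
                # binding_request() has already closed its socket, so look up
                # the local IP with a separate UDP socket (connect() on a UDP
                # socket sends no packets). Only the IPs are compared because
                # every request uses a fresh local port.
                probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
                try:
                    probe.connect((self.stun_server, self.stun_port))
                    local_ip = probe.getsockname()[0]
                finally:
                    probe.close()

                if mapped_addr1[0] == local_ip:
                    return NATType.OPEN_INTERNET, mapped_addr1
                return NATType.FULL_CONE, mapped_addr1

            # Test 3: ask the server to reply from the same IP but a different
            # port. An address-restricted cone NAT lets this through; a
            # port-restricted or symmetric NAT does not.
            mapped_addr3 = await self.binding_request(change_ip=False, change_port=True)

            if mapped_addr3:
                return NATType.RESTRICTED_CONE, mapped_addr1

            # Test 4: send a request from a different local port.
            # Caveat: a new local port usually gets a new mapped port on any
            # NAT, so this comparison is only a rough heuristic. A faithful
            # test reuses one socket and compares the mappings seen by two
            # different server addresses.
            mapped_addr4 = await self.binding_request_from_different_port()

            if mapped_addr4 and mapped_addr4 != mapped_addr1:
                return NATType.SYMMETRIC, mapped_addr1
            return NATType.PORT_RESTRICTED_CONE, mapped_addr1

        except Exception as e:
            print(f"NAT detection failed: {e}")
            return NATType.BLOCKED, None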

async def binding_request(self, change_ip: bool = False, change_port: bool = False) -> Optional[Tuple[str, int]]:
"""发送绑定请求"""
try:
# 创建UDP套接字
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.settimeout(5.0)

# 创建STUN绑定请求
request = STUNMessage(STUNMessageType.BINDING_REQUEST)

# 添加CHANGE-REQUEST属性
if change_ip or change_port:
change_flags = 0
if change_ip:
change_flags |= 0x04
if change_port:
change_flags |= 0x02
request.add_attribute(STUNAttribute.CHANGE_REQUEST, struct.pack('>I', change_flags))

# 发送请求
request_data = request.encode()
self.socket.sendto(request_data, (self.stun_server, self.stun_port))

# 接收响应
response_data, addr = self.socket.recvfrom(1024)
response = STUNMessage.decode(response_data)

# 解析MAPPED-ADDRESS或XOR-MAPPED-ADDRESS
mapped_addr = self.extract_mapped_address(response)

return mapped_addr

except socket.timeout:
return None
except Exception as e:
print(f"Binding request failed: {e}")
return None
finally:
if self.socket:
self.socket.close()
self.socket = None

async def binding_request_from_different_port(self) -> Optional[Tuple[str, int]]:
"""从不同端口发送绑定请求"""
try:
# 创建绑定到不同端口的套接字
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.bind(('', 0)) # 绑定到随机端口
self.socket.settimeout(5.0)

# 创建STUN绑定请求
request = STUNMessage(STUNMessageType.BINDING_REQUEST)

# 发送请求
request_data = request.encode()
self.socket.sendto(request_data, (self.stun_server, self.stun_port))

# 接收响应
response_data, addr = self.socket.recvfrom(1024)
response = STUNMessage.decode(response_data)

# 解析MAPPED-ADDRESS
mapped_addr = self.extract_mapped_address(response)

return mapped_addr

except socket.timeout:
return None
except Exception as e:
print(f"Different port binding request failed: {e}")
return None
finally:
if self.socket:
self.socket.close()
self.socket = None

def extract_mapped_address(self, response: STUNMessage) -> Optional[Tuple[str, int]]:
"""提取映射地址"""
# 优先使用XOR-MAPPED-ADDRESS
if STUNAttribute.XOR_MAPPED_ADDRESS in response.attributes:
attr_data = response.attributes[STUNAttribute.XOR_MAPPED_ADDRESS]
return self.parse_xor_mapped_address(attr_data, response.transaction_id)

# 使用MAPPED-ADDRESS
elif STUNAttribute.MAPPED_ADDRESS in response.attributes:
attr_data = response.attributes[STUNAttribute.MAPPED_ADDRESS]
return self.parse_mapped_address(attr_data)

return None

def parse_mapped_address(self, attr_data: bytes) -> Tuple[str, int]:
"""解析MAPPED-ADDRESS属性"""
if len(attr_data) < 8:
raise ValueError("Invalid MAPPED-ADDRESS attribute")

family, port, ip_bytes = struct.unpack('>HH4s', attr_data[:8])

if family == 0x01: # IPv4
ip = socket.inet_ntoa(ip_bytes)
return (ip, port)
else:
raise ValueError("Unsupported address family")

def parse_xor_mapped_address(self, attr_data: bytes, transaction_id: bytes) -> Tuple[str, int]:
"""解析XOR-MAPPED-ADDRESS属性"""
if len(attr_data) < 8:
raise ValueError("Invalid XOR-MAPPED-ADDRESS attribute")

family, xor_port, xor_ip_bytes = struct.unpack('>HH4s', attr_data[:8])

if family == 0x01: # IPv4
# XOR解码
magic_cookie = 0x2112A442
port = xor_port ^ (magic_cookie >> 16)

ip_int = struct.unpack('>I', xor_ip_bytes)[0]
ip_int ^= magic_cookie
ip = socket.inet_ntoa(struct.pack('>I', ip_int))

return (ip, port)
else:
raise ValueError("Unsupported address family")

def get_local_address(self) -> Tuple[str, int]:
"""获取本地地址"""
if self.socket:
return self.socket.getsockname()
return ('0.0.0.0', 0)

class ICEAgent:
"""ICE代理"""

def __init__(self, stun_servers: List[str] = None, turn_servers: List[Dict] = None):
self.stun_servers = stun_servers or ["stun.l.google.com:19302"]
self.turn_servers = turn_servers or []
self.local_candidates: List[ICECandidate] = []
self.remote_candidates: List[ICECandidate] = []
self.candidate_pairs: List['ICECandidatePair'] = []
self.selected_pair: Optional['ICECandidatePair'] = None
self.gathering_state = "new"
self.connection_state = "new"

async def gather_candidates(self) -> List[ICECandidate]:
"""收集ICE候选者"""
self.gathering_state = "gathering"
self.local_candidates = []

# 收集Host候选者
await self.gather_host_candidates()

# 收集Server Reflexive候选者
await self.gather_srflx_candidates()

# 收集Relay候选者(如果有TURN服务器)
await self.gather_relay_candidates()

self.gathering_state = "complete"
return self.local_candidates

async def gather_host_candidates(self):
"""收集Host候选者"""
# 获取本地网络接口
local_ips = self.get_local_ips()

for ip in local_ips:
# 为每个IP创建候选者
candidate = ICECandidate(
candidate=f"candidate:1 1 UDP 2130706431 {ip} 54400 typ host",
sdp_mid="0",
sdp_mline_index=0
)
self.local_candidates.append(candidate)
print(f"Gathered host candidate: {ip}:54400")

async def gather_srflx_candidates(self):
"""收集Server Reflexive候选者"""
for stun_server in self.stun_servers:
try:
host, port = stun_server.split(':')
port = int(port)

stun_client = STUNClient(host, port)
mapped_addr = await stun_client.binding_request()

if mapped_addr:
ip, port = mapped_addr
candidate = ICECandidate(
candidate=f"candidate:2 1 UDP 1694498815 {ip} {port} typ srflx raddr 192.168.1.100 rport 54400",
sdp_mid="0",
sdp_mline_index=0
)
self.local_candidates.append(candidate)
print(f"Gathered srflx candidate: {ip}:{port}")

except Exception as e:
print(f"Failed to gather srflx candidate from {stun_server}: {e}")

async def gather_relay_candidates(self):
"""收集Relay候选者"""
for turn_server in self.turn_servers:
try:
# 这里应该实现TURN协议
# 为了演示,我们创建一个模拟的relay候选者
relay_ip = turn_server.get('host', '203.0.113.200')
relay_port = turn_server.get('port', 54402)

candidate = ICECandidate(
candidate=f"candidate:3 1 UDP 16777215 {relay_ip} {relay_port} typ relay raddr 203.0.113.100 rport 54401",
sdp_mid="0",
sdp_mline_index=0
)
self.local_candidates.append(candidate)
print(f"Gathered relay candidate: {relay_ip}:{relay_port}")

except Exception as e:
print(f"Failed to gather relay candidate from {turn_server}: {e}")

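    def get_local_ips(self) -> List[str]:
        """Return local IP addresses."""
        ips = []

        # Find the IP of the primary network interface
        try:
            # "Connecting" a UDP socket sends no packets; it only makes the OS
            # pick the outgoing interface, whose address we then read back.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
                s.connect(("8.8.8.8", 80))
                ips.append(s.getsockname()[0])
        except OSError:
            pass  # No route available; fall back to loopback only

        # Add the loopback address
        ips.append("127.0.0.1")

        return ips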

def add_remote_candidate(self, candidate: ICECandidate):
"""添加远端候选者"""
self.remote_candidates.append(candidate)

# 与本地候选者配对
for local_candidate in self.local_candidates:
pair = ICECandidatePair(local_candidate, candidate)
self.candidate_pairs.append(pair)

async def start_connectivity_checks(self):
"""开始连通性检查"""
self.connection_state = "checking"

# 对候选者对进行优先级排序
self.candidate_pairs.sort(key=lambda pair: pair.priority, reverse=True)

# 执行连通性检查
for pair in self.candidate_pairs:
success = await pair.perform_connectivity_check()
if success:
self.selected_pair = pair
self.connection_state = "connected"
print(f"✅ ICE connection established using pair: {pair}")
return True

self.connection_state = "failed"
print("❌ ICE connection failed")
return False

class ICECandidatePair:
"""ICE候选者对"""

def __init__(self, local_candidate: ICECandidate, remote_candidate: ICECandidate):
self.local_candidate = local_candidate
self.remote_candidate = remote_candidate
self.priority = self.calculate_priority()
self.state = "waiting"

    def calculate_priority(self) -> int:
        """Compute the candidate pair priority (simplified)."""
        # Simplification: the per-candidate values used here are type
        # preferences only, not the full priority from the candidate string.
        local_priority = self.get_candidate_priority(self.local_candidate)
        remote_priority = self.get_candidate_priority(self.remote_candidate)

        # RFC 5245 pair priority formula: 2^32 * MIN + 2 * MAX + tie-breaker
        return (1 << 32) * min(local_priority, remote_priority) + \
               2 * max(local_priority, remote_priority) + \
               (1 if local_priority > remote_priority else 0)

    def get_candidate_priority(self, candidate: ICECandidate) -> int:
        """Return the type preference of a candidate."""
        if "typ host" in candidate.candidate:
            return 126  # Host candidates are preferred most
        elif "typ srflx" in candidate.candidate:
            return 100  # Server reflexive candidates
        elif "typ relay" in candidate.candidate:
            return 0    # Relay candidates are preferred least
        else:
            return 50   # Unknown type

async def perform_connectivity_check(self) -> bool:
"""执行连通性检查"""
self.state = "in-progress"

try:
# 模拟STUN绑定请求/响应
print(f"Performing connectivity check: {self.local_candidate.candidate[:50]}... -> {self.remote_candidate.candidate[:50]}...")

# 模拟网络延迟和成功率
await asyncio.sleep(0.1)

# 根据候选者类型模拟成功率
success_rate = 0.9 # 默认90%成功率

if "typ host" in self.local_candidate.candidate and "typ host" in self.remote_candidate.candidate:
success_rate = 0.95 # Host到Host成功率更高
elif "typ relay" in self.local_candidate.candidate or "typ relay" in self.remote_candidate.candidate:
success_rate = 0.99 # 通过Relay几乎总是成功

success = random.random() < success_rate

if success:
self.state = "succeeded"
print(f"✅ Connectivity check succeeded")
else:
self.state = "failed"
print(f"❌ Connectivity check failed")

return success

except Exception as e:
self.state = "failed"
print(f"❌ Connectivity check error: {e}")
return False

def __str__(self) -> str:
local_info = self.extract_candidate_info(self.local_candidate)
remote_info = self.extract_candidate_info(self.remote_candidate)
return f"{local_info} <-> {remote_info} (priority: {self.priority})"

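    def extract_candidate_info(self, candidate: ICECandidate) -> str:
        """Extract 'ip:port(type)' from a candidate string."""
        # Fields: foundation, component, transport, priority, ip, port, "typ", type
        parts = candidate.candidate.split()
        if len(parts) >= 6:  # parts[5] (the port) must exist
            ip = parts[4]
            port = parts[5]
            typ = parts[7] if len(parts) > 7 else "unknown"
            return f"{ip}:{port}({typ})"
        return "unknown"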

# NAT穿透演示
async def demo_nat_detection():
print("NAT Detection Demo")
print("==================")

stun_client = STUNClient()

print("Detecting NAT type...")
nat_type, mapped_addr = await stun_client.detect_nat_type()

print(f"NAT Type: {nat_type.value}")
if mapped_addr:
print(f"Mapped Address: {mapped_addr[0]}:{mapped_addr[1]}")
else:
print("No mapped address detected")

# ICE候选者收集演示
print("\nICE Candidate Gathering Demo")
print("=============================")

ice_agent = ICEAgent()
candidates = await ice_agent.gather_candidates()

print(f"\nGathered {len(candidates)} candidates:")
for i, candidate in enumerate(candidates, 1):
print(f"{i}. {candidate.candidate}")

# 模拟远端候选者
print("\nSimulating remote candidates...")
remote_candidates = [
ICECandidate("candidate:1 1 UDP 2130706431 192.168.1.200 54500 typ host"),
ICECandidate("candidate:2 1 UDP 1694498815 203.0.113.150 54501 typ srflx raddr 192.168.1.200 rport 54500")
]

for candidate in remote_candidates:
ice_agent.add_remote_candidate(candidate)

print(f"Added {len(remote_candidates)} remote candidates")
print(f"Created {len(ice_agent.candidate_pairs)} candidate pairs")

# 执行连通性检查
print("\nStarting connectivity checks...")
success = await ice_agent.start_connectivity_checks()

if success and ice_agent.selected_pair:
print(f"\n✅ ICE connection established!")
print(f"Selected pair: {ice_agent.selected_pair}")
else:
print("\n❌ ICE connection failed")

# 运行NAT检测演示
if __name__ == "__main__":
asyncio.run(demo_nat_detection())
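
The priority numbers hard-coded in the candidate strings above (2130706431, 1694498815, 16777215) are not arbitrary: they follow from the candidate priority formula in RFC 5245/8445. The sketch below reproduces them, assuming the recommended type preferences, a local preference of 65535, and component ID 1. The function names are illustrative and not part of the `ICEAgent` code.

```python
# Recommended type preferences from the ICE specification
TYPE_PREFERENCE = {"host": 126, "prflx": 110, "srflx": 100, "relay": 0}


def candidate_priority(cand_type: str, local_preference: int = 65535, component_id: int = 1) -> int:
    """priority = 2^24 * type_pref + 2^8 * local_pref + (256 - component_id)"""
    return (TYPE_PREFERENCE[cand_type] << 24) + (local_preference << 8) + (256 - component_id)


def pair_priority(controlling: int, controlled: int) -> int:
    """pair priority = 2^32 * MIN(G, D) + 2 * MAX(G, D) + (1 if G > D else 0)

    G is the priority of the controlling agent's candidate,
    D the priority of the controlled agent's candidate.
    """
    g, d = controlling, controlled
    return (min(g, d) << 32) + 2 * max(g, d) + (1 if g > d else 0)


if __name__ == "__main__":
    for cand_type in ("host", "srflx", "relay"):
        print(cand_type, candidate_priority(cand_type))
    host = candidate_priority("host")
    srflx = candidate_priority("srflx")
    print("host/srflx pair:", pair_priority(host, srflx))
```

Because the smaller of the two candidate priorities dominates the pair priority, a pair is only ranked highly when both of its candidates are.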

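3. Media Processing and Codecs

3.1 Media Stream Processing

WebRTC media processing covers capturing, encoding, transmitting, and decoding audio and video. The code in this section simulates that pipeline with simplified audio and video processors and an RTP transport.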
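
During transmission, every RTP packet carries a 16-bit sequence number and a 32-bit timestamp. The timestamp advances in units of the payload's RTP clock (48 kHz for Opus, 90 kHz for video), and both fields wrap around. The following sketch shows that arithmetic; the helper names are hypothetical and introduced only for this example.

```python
from typing import Tuple


def rtp_timestamp_increment(clock_rate: int, frame_duration_s: float) -> int:
    """Number of RTP clock ticks covered by one media frame."""
    return round(clock_rate * frame_duration_s)


def next_rtp_fields(seq: int, ts: int, increment: int) -> Tuple[int, int]:
    """Advance sequence number (16-bit) and timestamp (32-bit) with wraparound."""
    return (seq + 1) & 0xFFFF, (ts + increment) & 0xFFFFFFFF


if __name__ == "__main__":
    audio_step = rtp_timestamp_increment(48000, 0.02)    # one 20 ms Opus frame
    video_step = rtp_timestamp_increment(90000, 1 / 30)  # one frame at 30 fps
    print("audio ticks per frame:", audio_step)
    print("video ticks per frame:", video_step)
    print(next_rtp_fields(0xFFFF, 0xFFFFFFFF - 100, audio_step))
```

Starting both fields from random values and letting them wrap is the usual practice, so receivers should compare sequence numbers and timestamps modulo their field width rather than as plain integers.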

import math
import queue
import random  # used by RTPTransport for the SSRC and initial sequence number
import threading
import time
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, List, Optional

import numpy as np

class MediaType(Enum):
"""媒体类型"""
AUDIO = "audio"
VIDEO = "video"

class CodecType(Enum):
"""编解码器类型"""
# 音频编解码器
OPUS = "opus"
G722 = "G722"
PCMU = "PCMU"
PCMA = "PCMA"

# 视频编解码器
VP8 = "VP8"
VP9 = "VP9"
H264 = "H264"
AV1 = "AV1"

@dataclass
class MediaFrame:
"""媒体帧"""
media_type: MediaType
data: np.ndarray
timestamp: float
sequence_number: int
codec: CodecType
sample_rate: Optional[int] = None # 音频采样率
channels: Optional[int] = None # 音频声道数
width: Optional[int] = None # 视频宽度
height: Optional[int] = None # 视频高度
frame_rate: Optional[float] = None # 视频帧率

@dataclass
class RTPPacket:
"""RTP数据包"""
version: int = 2
padding: bool = False
extension: bool = False
csrc_count: int = 0
marker: bool = False
payload_type: int = 0
sequence_number: int = 0
timestamp: int = 0
ssrc: int = 0
payload: bytes = b''

def encode(self) -> bytes:
"""编码RTP包"""
# RTP头部(12字节)
header = bytearray(12)

# 第一个字节:版本(2位) + 填充(1位) + 扩展(1位) + CSRC计数(4位)
header[0] = (self.version << 6) | (int(self.padding) << 5) | \
(int(self.extension) << 4) | self.csrc_count

# 第二个字节:标记(1位) + 负载类型(7位)
header[1] = (int(self.marker) << 7) | self.payload_type

# 序列号(2字节)
header[2:4] = self.sequence_number.to_bytes(2, 'big')

# 时间戳(4字节)
header[4:8] = self.timestamp.to_bytes(4, 'big')

# SSRC(4字节)
header[8:12] = self.ssrc.to_bytes(4, 'big')

return bytes(header) + self.payload

@classmethod
def decode(cls, data: bytes) -> 'RTPPacket':
"""解码RTP包"""
if len(data) < 12:
raise ValueError("RTP packet too short")

packet = cls()

# 解析头部
packet.version = (data[0] >> 6) & 0x03
packet.padding = bool((data[0] >> 5) & 0x01)
packet.extension = bool((data[0] >> 4) & 0x01)
packet.csrc_count = data[0] & 0x0F

packet.marker = bool((data[1] >> 7) & 0x01)
packet.payload_type = data[1] & 0x7F

packet.sequence_number = int.from_bytes(data[2:4], 'big')
packet.timestamp = int.from_bytes(data[4:8], 'big')
packet.ssrc = int.from_bytes(data[8:12], 'big')

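        # The payload starts after the fixed 12-byte header and the CSRC list
        header_length = 12 + packet.csrc_count * 4

        # Skip the header extension if present (RFC 3550, section 5.3.1):
        # 16-bit profile ID, 16-bit length in 32-bit words, then the data
        if packet.extension:
            if len(data) < header_length + 4:
                raise ValueError("RTP header extension truncated")
            ext_words = int.from_bytes(data[header_length + 2:header_length + 4], 'big')
            header_length += 4 + ext_words * 4

        if len(data) < header_length:
            raise ValueError("RTP packet shorter than its header")

        packet.payload = data[header_length:]

        return packet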

class AudioProcessor:
"""音频处理器"""

def __init__(self, sample_rate: int = 48000, channels: int = 2, frame_size: int = 960):
self.sample_rate = sample_rate
self.channels = channels
self.frame_size = frame_size # 每帧样本数
self.sequence_number = 0

# 音频处理参数
self.gain = 1.0
self.noise_gate_threshold = 0.01
self.enable_agc = True # 自动增益控制
self.enable_aec = True # 回声消除
self.enable_ns = True # 噪声抑制

def generate_audio_frame(self, frequency: float = 440.0, duration: float = 0.02) -> MediaFrame:
"""生成音频帧(用于测试)"""
samples = int(self.sample_rate * duration)
t = np.linspace(0, duration, samples, False)

# 生成正弦波
audio_data = np.sin(2 * np.pi * frequency * t) * 0.3

# 如果是立体声,复制到两个声道
if self.channels == 2:
audio_data = np.column_stack([audio_data, audio_data])

frame = MediaFrame(
media_type=MediaType.AUDIO,
data=audio_data.astype(np.float32),
timestamp=time.time(),
sequence_number=self.sequence_number,
codec=CodecType.OPUS,
sample_rate=self.sample_rate,
channels=self.channels
)

self.sequence_number += 1
return frame

def apply_audio_processing(self, frame: MediaFrame) -> MediaFrame:
"""应用音频处理"""
processed_data = frame.data.copy()

# 噪声门限
if np.max(np.abs(processed_data)) < self.noise_gate_threshold:
processed_data *= 0.1 # 大幅降低音量

# 自动增益控制(AGC)
if self.enable_agc:
processed_data = self.apply_agc(processed_data)

# 噪声抑制(简化版)
if self.enable_ns:
processed_data = self.apply_noise_suppression(processed_data)

# 回声消除(简化版)
if self.enable_aec:
processed_data = self.apply_echo_cancellation(processed_data)

# 应用增益
processed_data *= self.gain

# 限幅
processed_data = np.clip(processed_data, -1.0, 1.0)

frame.data = processed_data
return frame

def apply_agc(self, audio_data: np.ndarray) -> np.ndarray:
"""自动增益控制"""
target_level = 0.3
current_level = np.sqrt(np.mean(audio_data ** 2))

if current_level > 0:
gain_adjustment = target_level / current_level
# 限制增益调整范围
gain_adjustment = np.clip(gain_adjustment, 0.1, 3.0)
return audio_data * gain_adjustment

return audio_data

def apply_noise_suppression(self, audio_data: np.ndarray) -> np.ndarray:
"""噪声抑制(简化版谱减法)"""
# 简化的噪声抑制:高通滤波
if len(audio_data.shape) == 1:
# 单声道
return self.high_pass_filter(audio_data)
else:
# 多声道
processed = np.zeros_like(audio_data)
for ch in range(audio_data.shape[1]):
processed[:, ch] = self.high_pass_filter(audio_data[:, ch])
return processed

def high_pass_filter(self, signal: np.ndarray, cutoff: float = 80.0) -> np.ndarray:
"""简单的高通滤波器"""
# 简化的一阶高通滤波器
alpha = 1.0 / (1.0 + 2.0 * np.pi * cutoff / self.sample_rate)

filtered = np.zeros_like(signal)
filtered[0] = signal[0]

for i in range(1, len(signal)):
filtered[i] = alpha * (filtered[i-1] + signal[i] - signal[i-1])

return filtered

def apply_echo_cancellation(self, audio_data: np.ndarray) -> np.ndarray:
"""回声消除(简化版)"""
# 简化的回声消除:延迟和衰减
echo_delay = int(0.1 * self.sample_rate) # 100ms延迟
echo_attenuation = 0.3

if len(audio_data) > echo_delay:
# 减去延迟的回声
audio_data[echo_delay:] -= echo_attenuation * audio_data[:-echo_delay]

return audio_data

def encode_opus(self, frame: MediaFrame) -> bytes:
"""OPUS编码(模拟)"""
# 实际应用中应该使用libopus
# 这里只是模拟编码过程

# 量化音频数据
quantized = (frame.data * 32767).astype(np.int16)

# 简单的压缩模拟(实际OPUS编码要复杂得多)
compressed_size = len(quantized.tobytes()) // 4 # 模拟4:1压缩比
compressed_data = quantized.tobytes()[:compressed_size]

return compressed_data

def decode_opus(self, encoded_data: bytes, frame_info: Dict) -> MediaFrame:
"""OPUS解码(模拟)"""
# 实际应用中应该使用libopus
# 这里只是模拟解码过程

# 解压缩数据
decompressed_size = len(encoded_data) * 4 # 模拟解压缩
padded_data = encoded_data + b'\x00' * (decompressed_size - len(encoded_data))

# 转换为音频数据
audio_samples = np.frombuffer(padded_data[:decompressed_size], dtype=np.int16)
audio_data = audio_samples.astype(np.float32) / 32767.0

# 重塑为多声道格式
if self.channels > 1 and len(audio_data) % self.channels == 0:
audio_data = audio_data.reshape(-1, self.channels)

return MediaFrame(
media_type=MediaType.AUDIO,
data=audio_data,
timestamp=frame_info.get('timestamp', time.time()),
sequence_number=frame_info.get('sequence_number', 0),
codec=CodecType.OPUS,
sample_rate=self.sample_rate,
channels=self.channels
)

class VideoProcessor:
"""视频处理器"""

def __init__(self, width: int = 640, height: int = 480, frame_rate: float = 30.0):
self.width = width
self.height = height
self.frame_rate = frame_rate
self.sequence_number = 0

# 视频处理参数
self.enable_denoising = True
self.enable_enhancement = True
self.target_bitrate = 1000000 # 1 Mbps

def generate_video_frame(self, pattern: str = "gradient") -> MediaFrame:
"""生成视频帧(用于测试)"""
if pattern == "gradient":
# 生成渐变图像
frame_data = np.zeros((self.height, self.width, 3), dtype=np.uint8)
for y in range(self.height):
for x in range(self.width):
frame_data[y, x] = [
int(255 * x / self.width),
int(255 * y / self.height),
128
]

elif pattern == "checkerboard":
# 生成棋盘图像
frame_data = np.zeros((self.height, self.width, 3), dtype=np.uint8)
square_size = 32
for y in range(self.height):
for x in range(self.width):
if ((x // square_size) + (y // square_size)) % 2 == 0:
frame_data[y, x] = [255, 255, 255]
else:
frame_data[y, x] = [0, 0, 0]

else:
# 生成随机噪声
frame_data = np.random.randint(0, 256, (self.height, self.width, 3), dtype=np.uint8)

frame = MediaFrame(
media_type=MediaType.VIDEO,
data=frame_data,
timestamp=time.time(),
sequence_number=self.sequence_number,
codec=CodecType.VP8,
width=self.width,
height=self.height,
frame_rate=self.frame_rate
)

self.sequence_number += 1
return frame

def apply_video_processing(self, frame: MediaFrame) -> MediaFrame:
"""应用视频处理"""
processed_data = frame.data.copy()

# 降噪
if self.enable_denoising:
processed_data = self.apply_denoising(processed_data)

# 图像增强
if self.enable_enhancement:
processed_data = self.apply_enhancement(processed_data)

frame.data = processed_data
return frame

def apply_denoising(self, image: np.ndarray) -> np.ndarray:
"""视频降噪(简化版高斯滤波)"""
# 简化的高斯滤波
kernel_size = 3
sigma = 0.8

# 生成高斯核
kernel = self.gaussian_kernel(kernel_size, sigma)

# 应用卷积
filtered_image = np.zeros_like(image)
pad = kernel_size // 2

for c in range(image.shape[2]): # 对每个颜色通道
channel = image[:, :, c]
padded = np.pad(channel, pad, mode='edge')

for y in range(image.shape[0]):
for x in range(image.shape[1]):
region = padded[y:y+kernel_size, x:x+kernel_size]
filtered_image[y, x, c] = np.sum(region * kernel)

return filtered_image.astype(np.uint8)

def gaussian_kernel(self, size: int, sigma: float) -> np.ndarray:
"""生成高斯核"""
kernel = np.zeros((size, size))
center = size // 2

for y in range(size):
for x in range(size):
dx = x - center
dy = y - center
kernel[y, x] = np.exp(-(dx*dx + dy*dy) / (2 * sigma * sigma))

return kernel / np.sum(kernel)

def apply_enhancement(self, image: np.ndarray) -> np.ndarray:
"""图像增强"""
enhanced = image.astype(np.float32)

# 对比度增强
enhanced = self.adjust_contrast(enhanced, 1.2)

# 亮度调整
enhanced = self.adjust_brightness(enhanced, 10)

return np.clip(enhanced, 0, 255).astype(np.uint8)

def adjust_contrast(self, image: np.ndarray, factor: float) -> np.ndarray:
"""调整对比度"""
mean = np.mean(image)
return (image - mean) * factor + mean

def adjust_brightness(self, image: np.ndarray, delta: float) -> np.ndarray:
"""调整亮度"""
return image + delta

def encode_vp8(self, frame: MediaFrame) -> bytes:
"""VP8编码(模拟)"""
# 实际应用中应该使用libvpx
# 这里只是模拟编码过程

# 简化的DCT变换和量化
compressed_data = self.simple_video_compression(frame.data)

return compressed_data

def simple_video_compression(self, image: np.ndarray) -> bytes:
"""简单的视频压缩"""
# 转换为YUV格式
yuv = self.rgb_to_yuv(image)

# 下采样色度分量
y = yuv[:, :, 0]
u = yuv[::2, ::2, 1] # 2x2下采样
v = yuv[::2, ::2, 2] # 2x2下采样

# 量化
y_quantized = (y / 4).astype(np.uint8) * 4
u_quantized = (u / 8).astype(np.uint8) * 8
v_quantized = (v / 8).astype(np.uint8) * 8

# 序列化
compressed = np.concatenate([
y_quantized.flatten(),
u_quantized.flatten(),
v_quantized.flatten()
])

return compressed.tobytes()

def rgb_to_yuv(self, rgb: np.ndarray) -> np.ndarray:
"""RGB转YUV"""
yuv = np.zeros_like(rgb, dtype=np.float32)

# YUV转换矩阵
yuv[:, :, 0] = 0.299 * rgb[:, :, 0] + 0.587 * rgb[:, :, 1] + 0.114 * rgb[:, :, 2] # Y
yuv[:, :, 1] = -0.169 * rgb[:, :, 0] - 0.331 * rgb[:, :, 1] + 0.5 * rgb[:, :, 2] + 128 # U
yuv[:, :, 2] = 0.5 * rgb[:, :, 0] - 0.419 * rgb[:, :, 1] - 0.081 * rgb[:, :, 2] + 128 # V

return np.clip(yuv, 0, 255).astype(np.uint8)

def decode_vp8(self, encoded_data: bytes, frame_info: Dict) -> MediaFrame:
"""VP8解码(模拟)"""
# 实际应用中应该使用libvpx
# 这里只是模拟解码过程

# 反序列化
data_array = np.frombuffer(encoded_data, dtype=np.uint8)

# 计算各分量大小
y_size = self.width * self.height
u_size = (self.width // 2) * (self.height // 2)
v_size = u_size

if len(data_array) >= y_size + u_size + v_size:
# 分离YUV分量
y = data_array[:y_size].reshape(self.height, self.width)
u = data_array[y_size:y_size+u_size].reshape(self.height//2, self.width//2)
v = data_array[y_size+u_size:y_size+u_size+v_size].reshape(self.height//2, self.width//2)

# 上采样色度分量
u_upsampled = np.repeat(np.repeat(u, 2, axis=0), 2, axis=1)
v_upsampled = np.repeat(np.repeat(v, 2, axis=0), 2, axis=1)

# 重建YUV图像
yuv = np.stack([y, u_upsampled, v_upsampled], axis=2)

# 转换回RGB
rgb = self.yuv_to_rgb(yuv)
else:
# 数据不足,生成黑色图像
rgb = np.zeros((self.height, self.width, 3), dtype=np.uint8)

return MediaFrame(
media_type=MediaType.VIDEO,
data=rgb,
timestamp=frame_info.get('timestamp', time.time()),
sequence_number=frame_info.get('sequence_number', 0),
codec=CodecType.VP8,
width=self.width,
height=self.height,
frame_rate=self.frame_rate
)

def yuv_to_rgb(self, yuv: np.ndarray) -> np.ndarray:
"""YUV转RGB"""
rgb = np.zeros_like(yuv, dtype=np.float32)

y = yuv[:, :, 0].astype(np.float32)
u = yuv[:, :, 1].astype(np.float32) - 128
v = yuv[:, :, 2].astype(np.float32) - 128

# RGB转换
rgb[:, :, 0] = y + 1.402 * v # R
rgb[:, :, 1] = y - 0.344 * u - 0.714 * v # G
rgb[:, :, 2] = y + 1.772 * u # B

return np.clip(rgb, 0, 255).astype(np.uint8)

class RTPTransport:
"""RTP传输层"""

def __init__(self, local_port: int = 5004, remote_host: str = "localhost", remote_port: int = 5006):
self.local_port = local_port
self.remote_host = remote_host
self.remote_port = remote_port
self.ssrc = random.randint(0, 0xFFFFFFFF)
self.sequence_number = random.randint(0, 0xFFFF)
self.timestamp_offset = random.randint(0, 0xFFFFFFFF)

# 传输统计
self.packets_sent = 0
self.packets_received = 0
self.bytes_sent = 0
self.bytes_received = 0
self.packet_loss_rate = 0.0

# 缓冲区
self.send_queue = queue.Queue()
self.receive_queue = queue.Queue()

# 线程控制
self.running = False
self.send_thread = None
self.receive_thread = None

def start(self):
"""启动RTP传输"""
self.running = True

# 启动发送和接收线程
self.send_thread = threading.Thread(target=self._send_loop)
self.receive_thread = threading.Thread(target=self._receive_loop)

self.send_thread.start()
self.receive_thread.start()

print(f"RTP transport started on port {self.local_port}")

def stop(self):
"""停止RTP传输"""
self.running = False

if self.send_thread:
self.send_thread.join()
if self.receive_thread:
self.receive_thread.join()

print("RTP transport stopped")

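    def send_media_frame(self, frame: MediaFrame, payload_type: int = 96):
        """Encode a media frame and queue it as an RTP packet."""
        # Encode the media frame
        if frame.media_type == MediaType.AUDIO:
            payload = self._encode_audio_frame(frame)
        else:
            payload = self._encode_video_frame(frame)

        # RTP timestamps are 32 bits wide, so wrap the value: a wall-clock
        # timestamp scaled to 90 kHz can exceed 32 bits.
        # 90 kHz is the usual video clock; audio normally uses the codec's own
        # rate (48 kHz for Opus), which this simplified demo does not model.
        rtp_timestamp = (int(frame.timestamp * 90000) + self.timestamp_offset) & 0xFFFFFFFF

        # Build the RTP packet
        rtp_packet = RTPPacket(
            payload_type=payload_type,
            sequence_number=self.sequence_number,
            timestamp=rtp_timestamp,
            ssrc=self.ssrc,
            payload=payload
        )

        # Queue for sending and advance the 16-bit sequence number
        self.send_queue.put(rtp_packet)
        self.sequence_number = (self.sequence_number + 1) % 0x10000

    def _encode_audio_frame(self, frame: MediaFrame) -> bytes:
        """Encode an audio frame as 16-bit PCM (simplified)."""
        # Clip to [-1.0, 1.0] first so out-of-range samples saturate instead of wrapping
        audio_data = (np.clip(frame.data, -1.0, 1.0) * 32767).astype(np.int16)
        return audio_data.tobytes()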

def _encode_video_frame(self, frame: MediaFrame) -> bytes:
"""编码视频帧"""
# 简化的视频编码
return frame.data.tobytes()

def _send_loop(self):
"""发送循环"""
import socket

try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

while self.running:
try:
# 从队列获取RTP包
rtp_packet = self.send_queue.get(timeout=0.1)

# 编码并发送
packet_data = rtp_packet.encode()
sock.sendto(packet_data, (self.remote_host, self.remote_port))

# 更新统计
self.packets_sent += 1
self.bytes_sent += len(packet_data)

except queue.Empty:
continue
except Exception as e:
print(f"Send error: {e}")

except Exception as e:
print(f"Send loop error: {e}")
finally:
if 'sock' in locals():
sock.close()

def _receive_loop(self):
"""接收循环"""
import socket

try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('', self.local_port))
sock.settimeout(0.1)

while self.running:
try:
# 接收数据
data, addr = sock.recvfrom(1500)

# 解码RTP包
rtp_packet = RTPPacket.decode(data)

# 添加到接收队列
self.receive_queue.put(rtp_packet)

# 更新统计
self.packets_received += 1
self.bytes_received += len(data)

except socket.timeout:
continue
except Exception as e:
print(f"Receive error: {e}")

except Exception as e:
print(f"Receive loop error: {e}")
finally:
if 'sock' in locals():
sock.close()

def get_received_frame(self) -> Optional[MediaFrame]:
"""获取接收到的媒体帧"""
try:
rtp_packet = self.receive_queue.get_nowait()

# 解码媒体帧(简化版)
frame = MediaFrame(
media_type=MediaType.AUDIO, # 简化假设
data=np.frombuffer(rtp_packet.payload, dtype=np.int16).astype(np.float32) / 32767.0,
timestamp=rtp_packet.timestamp / 90000.0,
sequence_number=rtp_packet.sequence_number,
codec=CodecType.OPUS
)

return frame

except queue.Empty:
return None

def get_statistics(self) -> Dict[str, Any]:
"""获取传输统计"""
return {
'packets_sent': self.packets_sent,
'packets_received': self.packets_received,
'bytes_sent': self.bytes_sent,
'bytes_received': self.bytes_received,
'packet_loss_rate': self.packet_loss_rate
}

# 媒体处理演示
def demo_media_processing():
print("WebRTC Media Processing Demo")
print("=============================")

# 音频处理演示
print("\n1. Audio Processing Demo")
audio_processor = AudioProcessor()

# 生成音频帧
audio_frame = audio_processor.generate_audio_frame(frequency=440.0)
print(f"Generated audio frame: {audio_frame.data.shape}, {audio_frame.sample_rate}Hz")

# 应用音频处理
processed_audio = audio_processor.apply_audio_processing(audio_frame)
print(f"Processed audio frame: RMS = {np.sqrt(np.mean(processed_audio.data**2)):.4f}")

# 编码/解码
encoded_audio = audio_processor.encode_opus(processed_audio)
decoded_audio = audio_processor.decode_opus(encoded_audio, {
'timestamp': processed_audio.timestamp,
'sequence_number': processed_audio.sequence_number
})
print(f"Audio codec: {len(encoded_audio)} bytes -> {decoded_audio.data.shape}")

# 视频处理演示
print("\n2. Video Processing Demo")
video_processor = VideoProcessor()

# 生成视频帧
video_frame = video_processor.generate_video_frame("gradient")
print(f"Generated video frame: {video_frame.data.shape}")

# 应用视频处理
processed_video = video_processor.apply_video_processing(video_frame)
print(f"Processed video frame: mean brightness = {np.mean(processed_video.data):.2f}")

# 编码/解码
encoded_video = video_processor.encode_vp8(processed_video)
decoded_video = video_processor.decode_vp8(encoded_video, {
'timestamp': processed_video.timestamp,
'sequence_number': processed_video.sequence_number
})
print(f"Video codec: {len(encoded_video)} bytes -> {decoded_video.data.shape}")

# RTP传输演示
print("\n3. RTP Transport Demo")
rtp_transport = RTPTransport()
rtp_transport.start()

try:
# 发送几个音频帧
for i in range(5):
frame = audio_processor.generate_audio_frame(frequency=440.0 + i * 100)
rtp_transport.send_media_frame(frame)
time.sleep(0.02) # 20ms间隔

# 等待传输完成
time.sleep(0.5)

# 显示统计信息
stats = rtp_transport.get_statistics()
print(f"RTP Statistics: {stats}")

finally:
rtp_transport.stop()

# 运行媒体处理演示
if __name__ == "__main__":
demo_media_processing()
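
The transport above relies on an `RTPPacket` class for the wire format. As a minimal sketch of what that encoding involves, the snippet below packs and parses the 12-byte fixed RTP header with `struct`, masking the sequence number and timestamp to their field widths. The names `pack_rtp_header`, `unpack_rtp_header`, and `media_timestamp` are hypothetical helpers for illustration, not part of the article's classes, and header extensions and CSRC lists are left out.

```python
import struct

RTP_VERSION = 2
# Fixed RTP header: V/P/X/CC byte, M/PT byte, 16-bit sequence number,
# 32-bit timestamp, 32-bit SSRC, all in network byte order.
RTP_HEADER = struct.Struct("!BBHII")


def pack_rtp_header(payload_type, sequence_number, timestamp, ssrc, marker=False):
    """Pack a fixed RTP header with no padding, no extension and no CSRCs."""
    byte0 = RTP_VERSION << 6
    byte1 = (int(marker) << 7) | (payload_type & 0x7F)
    return RTP_HEADER.pack(
        byte0,
        byte1,
        sequence_number & 0xFFFF,   # 16-bit field, wraps at 65536
        timestamp & 0xFFFFFFFF,     # 32-bit field, wraps at 2**32
        ssrc & 0xFFFFFFFF,
    )


def unpack_rtp_header(data):
    """Parse the fixed RTP header from the start of a packet."""
    byte0, byte1, seq, ts, ssrc = RTP_HEADER.unpack_from(data)
    return {
        "version": byte0 >> 6,
        "marker": bool(byte1 >> 7),
        "payload_type": byte1 & 0x7F,
        "sequence_number": seq,
        "timestamp": ts,
        "ssrc": ssrc,
    }


def media_timestamp(seconds, clock_rate, offset):
    """Convert media time to RTP clock units, wrapped to 32 bits."""
    return (int(seconds * clock_rate) + offset) & 0xFFFFFFFF
```

The masking matters because a random timestamp offset plus a scaled media time easily passes 2**32; without the wrap, packing the value into a 32-bit field fails.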

4. Security Mechanisms and Encryption

4.1 DTLS and SRTP

WebRTC uses DTLS (Datagram Transport Layer Security) for key exchange and SRTP (Secure Real-time Transport Protocol) to encrypt media:
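
The link between the two protocols is the keying material that DTLS exports for SRTP. As a minimal sketch, assuming the AES-128 counter-mode profile (16-byte master keys, 14-byte master salts) and the layout described in RFC 5764 (client key, server key, client salt, server salt), the exported bytes can be split as follows. `SrtpMasterKeys` and `split_keying_material` are hypothetical names used only for illustration.

```python
from typing import NamedTuple

KEY_LEN = 16   # master key length assumed for SRTP_AES128_CM_HMAC_SHA1_80
SALT_LEN = 14  # master salt length assumed for the same profile


class SrtpMasterKeys(NamedTuple):
    client_key: bytes
    server_key: bytes
    client_salt: bytes
    server_salt: bytes


def split_keying_material(material: bytes) -> SrtpMasterKeys:
    """Split exported DTLS-SRTP keying material into per-direction keys and salts."""
    expected = 2 * (KEY_LEN + SALT_LEN)
    if len(material) != expected:
        raise ValueError(f"expected {expected} bytes, got {len(material)}")

    pos = 0
    parts = []
    # Order assumed here: both keys first, then both salts.
    for length in (KEY_LEN, KEY_LEN, SALT_LEN, SALT_LEN):
        parts.append(material[pos:pos + length])
        pos += length
    return SrtpMasterKeys(*parts)
```

Each direction gets its own key and salt: the client encrypts outgoing media with the client key and decrypts incoming media with the server key, and the server does the reverse.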

import hashlib
import hmac
import secrets
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
import struct
import time

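class CipherSuite(Enum):
    """SRTP protection profiles offered in the DTLS use_srtp extension."""
    # Note: the first two member names say GCM, but their values name AES counter
    # mode (CM) with HMAC-SHA1-80 authentication, which is what the simulation
    # below follows (10-byte auth tag). The names are kept so existing references work.
    AES_128_GCM = "SRTP_AES128_CM_HMAC_SHA1_80"
    AES_256_GCM = "SRTP_AES256_CM_HMAC_SHA1_80"
    CHACHA20_POLY1305 = "SRTP_AEAD_CHACHA20_POLY1305"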

@dataclass
class SecurityKeys:
"""安全密钥"""
master_key: bytes
master_salt: bytes
encryption_key: bytes
authentication_key: bytes
salt_key: bytes

class DTLSHandshake:
"""DTLS握手模拟"""

def __init__(self):
self.state = "initial"
self.client_random = None
self.server_random = None
self.pre_master_secret = None
self.master_secret = None
self.cipher_suite = CipherSuite.AES_128_GCM

# 证书指纹(用于验证)
self.local_fingerprint = None
self.remote_fingerprint = None

def generate_client_hello(self) -> Dict:
"""生成客户端Hello消息"""
self.client_random = secrets.token_bytes(32)

client_hello = {
'type': 'client_hello',
'version': 'DTLS_1_2',
'random': self.client_random,
'cipher_suites': [suite.value for suite in CipherSuite],
'extensions': {
'use_srtp': True,
'srtp_profiles': [self.cipher_suite.value]
}
}

self.state = "client_hello_sent"
return client_hello

def process_server_hello(self, server_hello: Dict) -> bool:
"""处理服务器Hello消息"""
if server_hello['type'] != 'server_hello':
return False

self.server_random = server_hello['random']
selected_cipher = server_hello.get('cipher_suite')

# 验证密码套件
for suite in CipherSuite:
if suite.value == selected_cipher:
self.cipher_suite = suite
break
else:
return False

self.state = "server_hello_received"
return True

def generate_key_exchange(self) -> Dict:
"""生成密钥交换消息"""
# 生成预主密钥
self.pre_master_secret = secrets.token_bytes(48)

# 在实际实现中,这里会使用RSA或ECDH进行加密
# 这里简化为直接传输(实际中是加密的)
key_exchange = {
'type': 'client_key_exchange',
'encrypted_pre_master_secret': self.pre_master_secret # 实际中应该加密
}

# 计算主密钥
self.master_secret = self.compute_master_secret()

self.state = "key_exchange_sent"
return key_exchange

def compute_master_secret(self) -> bytes:
"""计算主密钥"""
if not all([self.pre_master_secret, self.client_random, self.server_random]):
raise ValueError("Missing required values for master secret computation")

# PRF (Pseudo-Random Function) - 简化版
seed = b"master secret" + self.client_random + self.server_random
return self.prf(self.pre_master_secret, seed, 48)

def prf(self, secret: bytes, seed: bytes, length: int) -> bytes:
"""伪随机函数(简化版)"""
result = b""
a = seed

while len(result) < length:
a = hmac.new(secret, a, hashlib.sha256).digest()
result += hmac.new(secret, a + seed, hashlib.sha256).digest()

return result[:length]

def derive_srtp_keys(self) -> SecurityKeys:
"""派生SRTP密钥"""
if not self.master_secret:
raise ValueError("Master secret not available")

# SRTP密钥派生
key_material_length = 60 # 根据密码套件确定
key_material = self.prf(
self.master_secret,
b"EXTRACTOR-dtls_srtp" + self.client_random + self.server_random,
key_material_length
)

# 分割密钥材料
master_key = key_material[:16] # AES-128密钥
master_salt = key_material[16:30] # 14字节盐值

# 派生加密和认证密钥
encryption_key = self.derive_key(master_key, master_salt, "encryption", 16)
authentication_key = self.derive_key(master_key, master_salt, "authentication", 20)
salt_key = self.derive_key(master_key, master_salt, "salt", 14)

return SecurityKeys(
master_key=master_key,
master_salt=master_salt,
encryption_key=encryption_key,
authentication_key=authentication_key,
salt_key=salt_key
)

def derive_key(self, master_key: bytes, master_salt: bytes, label: str, length: int) -> bytes:
"""派生特定用途的密钥"""
seed = label.encode() + master_salt
return self.prf(master_key, seed, length)

def verify_fingerprint(self, certificate: bytes, expected_fingerprint: str) -> bool:
"""验证证书指纹"""
# 计算证书的SHA-256指纹
fingerprint = hashlib.sha256(certificate).hexdigest()
formatted_fingerprint = ":".join(fingerprint[i:i+2] for i in range(0, len(fingerprint), 2))

return formatted_fingerprint.upper() == expected_fingerprint.upper()

class SRTPCrypto:
"""SRTP加密/解密"""

def __init__(self, keys: SecurityKeys):
self.keys = keys
self.replay_window = set() # 重放攻击防护
self.roc = 0 # Rollover Counter

def encrypt_rtp(self, rtp_packet: RTPPacket) -> bytes:
"""加密RTP包"""
# 构建IV(初始化向量)
iv = self.construct_iv(rtp_packet.ssrc, rtp_packet.sequence_number)

# 加密负载
encrypted_payload = self.aes_encrypt(rtp_packet.payload, iv)

# 构建SRTP包
srtp_packet = RTPPacket(
version=rtp_packet.version,
padding=rtp_packet.padding,
extension=rtp_packet.extension,
csrc_count=rtp_packet.csrc_count,
marker=rtp_packet.marker,
payload_type=rtp_packet.payload_type,
sequence_number=rtp_packet.sequence_number,
timestamp=rtp_packet.timestamp,
ssrc=rtp_packet.ssrc,
payload=encrypted_payload
)

# 编码包
packet_data = srtp_packet.encode()

# 计算认证标签
auth_tag = self.compute_auth_tag(packet_data)

return packet_data + auth_tag

def decrypt_srtp(self, srtp_data: bytes) -> Optional[RTPPacket]:
"""解密SRTP包"""
if len(srtp_data) < 22: # 最小SRTP包大小
return None

# 分离认证标签
auth_tag_length = 10 # HMAC-SHA1-80
packet_data = srtp_data[:-auth_tag_length]
auth_tag = srtp_data[-auth_tag_length:]

# 验证认证标签
if not self.verify_auth_tag(packet_data, auth_tag):
print("Authentication failed")
return None

# 解码RTP包
try:
rtp_packet = RTPPacket.decode(packet_data)
except Exception:
return None

# 重放攻击检测
if not self.check_replay(rtp_packet.sequence_number):
print("Replay attack detected")
return None

# 构建IV
iv = self.construct_iv(rtp_packet.ssrc, rtp_packet.sequence_number)

# 解密负载
decrypted_payload = self.aes_decrypt(rtp_packet.payload, iv)
rtp_packet.payload = decrypted_payload

return rtp_packet

def construct_iv(self, ssrc: int, sequence_number: int) -> bytes:
"""构建初始化向量"""
# SRTP IV构建:salt XOR (SSRC || ROC || SEQ)
ssrc_bytes = struct.pack('>I', ssrc)
roc_bytes = struct.pack('>I', self.roc)
seq_bytes = struct.pack('>H', sequence_number)

# 构建14字节的包标识符
packet_index = ssrc_bytes + roc_bytes + seq_bytes + b'\x00' * 4

# 与盐值异或
iv = bytes(a ^ b for a, b in zip(self.keys.salt_key, packet_index[:14]))

return iv + b'\x00\x00' # 填充到16字节

def aes_encrypt(self, plaintext: bytes, iv: bytes) -> bytes:
"""AES加密(简化版)"""
# 实际实现应该使用标准的AES库
# 这里使用简单的XOR模拟
key_stream = self.generate_key_stream(iv, len(plaintext))
return bytes(a ^ b for a, b in zip(plaintext, key_stream))

def aes_decrypt(self, ciphertext: bytes, iv: bytes) -> bytes:
"""AES解密(简化版)"""
# AES CTR模式下,加密和解密是相同的操作
return self.aes_encrypt(ciphertext, iv)

def generate_key_stream(self, iv: bytes, length: int) -> bytes:
"""生成密钥流(简化版)"""
# 实际实现应该使用AES CTR模式
# 这里使用HMAC模拟密钥流生成
key_stream = b""
counter = 0

while len(key_stream) < length:
block = hmac.new(
self.keys.encryption_key,
iv + struct.pack('>I', counter),
hashlib.sha256
).digest()
key_stream += block
counter += 1

return key_stream[:length]

def compute_auth_tag(self, packet_data: bytes) -> bytes:
"""计算认证标签"""
# HMAC-SHA1-80(取前10字节)
hmac_full = hmac.new(
self.keys.authentication_key,
packet_data,
hashlib.sha1
).digest()

return hmac_full[:10]

def verify_auth_tag(self, packet_data: bytes, auth_tag: bytes) -> bool:
"""验证认证标签"""
expected_tag = self.compute_auth_tag(packet_data)
return hmac.compare_digest(expected_tag, auth_tag)

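    def check_replay(self, sequence_number: int) -> bool:
        """Reject packets that were already seen (simplified replay check)."""
        # Simplification: this tracks only the 16-bit sequence number and ignores
        # wraparound; real SRTP works on the packet index (ROC combined with SEQ).
        if sequence_number in self.replay_window:
            return False

        # Once the window is full, anything older than its oldest entry has been
        # evicted and can no longer be checked, so reject it.
        if len(self.replay_window) >= 64 and sequence_number < min(self.replay_window):
            return False

        # Remember this sequence number
        self.replay_window.add(sequence_number)

        # Keep at most 64 entries by evicting the oldest
        if len(self.replay_window) > 64:
            self.replay_window.remove(min(self.replay_window))

        return True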

class WebRTCSecurity:
"""WebRTC安全管理器"""

def __init__(self):
self.dtls_handshake = DTLSHandshake()
self.srtp_crypto = None
self.security_keys = None

# 证书和指纹
self.local_certificate = self.generate_certificate()
self.local_fingerprint = self.compute_fingerprint(self.local_certificate)

# 安全策略
self.require_encryption = True
self.allow_insecure_ciphers = False
self.max_handshake_time = 30.0 # 秒

def generate_certificate(self) -> bytes:
"""生成自签名证书(模拟)"""
# 实际实现应该生成真正的X.509证书
# 这里返回模拟的证书数据
cert_data = {
'subject': 'CN=WebRTC-Client',
'issuer': 'CN=WebRTC-Client',
'not_before': time.time(),
'not_after': time.time() + 365 * 24 * 3600, # 1年有效期
'public_key': secrets.token_bytes(256),
'signature': secrets.token_bytes(256)
}

# 序列化证书(简化)
cert_string = str(cert_data).encode()
return hashlib.sha256(cert_string).digest()

def compute_fingerprint(self, certificate: bytes) -> str:
"""计算证书指纹"""
fingerprint = hashlib.sha256(certificate).hexdigest()
return ":".join(fingerprint[i:i+2] for i in range(0, len(fingerprint), 2)).upper()

def perform_dtls_handshake(self, is_client: bool = True) -> bool:
"""执行DTLS握手"""
try:
if is_client:
# 客户端握手
client_hello = self.dtls_handshake.generate_client_hello()
print(f"Sent Client Hello: {client_hello['type']}")

# 模拟服务器响应
server_hello = {
'type': 'server_hello',
'version': 'DTLS_1_2',
'random': secrets.token_bytes(32),
'cipher_suite': CipherSuite.AES_128_GCM.value
}

if not self.dtls_handshake.process_server_hello(server_hello):
return False

print(f"Received Server Hello: {server_hello['type']}")

# 密钥交换
key_exchange = self.dtls_handshake.generate_key_exchange()
print(f"Sent Key Exchange: {key_exchange['type']}")

# 派生SRTP密钥
self.security_keys = self.dtls_handshake.derive_srtp_keys()
self.srtp_crypto = SRTPCrypto(self.security_keys)

print("DTLS handshake completed successfully")
return True

else:
# 服务器握手(简化)
print("Server handshake not implemented in this demo")
return False

except Exception as e:
print(f"DTLS handshake failed: {e}")
return False

def encrypt_media(self, rtp_packet: RTPPacket) -> Optional[bytes]:
"""加密媒体数据"""
if not self.srtp_crypto:
if self.require_encryption:
print("Encryption required but SRTP not initialized")
return None
else:
# 返回未加密的数据
return rtp_packet.encode()

try:
return self.srtp_crypto.encrypt_rtp(rtp_packet)
except Exception as e:
print(f"Encryption failed: {e}")
return None

def decrypt_media(self, srtp_data: bytes) -> Optional[RTPPacket]:
"""解密媒体数据"""
if not self.srtp_crypto:
if self.require_encryption:
print("Encryption required but SRTP not initialized")
return None
else:
# 尝试解码未加密的数据
try:
return RTPPacket.decode(srtp_data)
except Exception:
return None

try:
return self.srtp_crypto.decrypt_srtp(srtp_data)
except Exception as e:
print(f"Decryption failed: {e}")
return None

def get_local_fingerprint(self) -> str:
"""获取本地证书指纹"""
return self.local_fingerprint

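    def verify_remote_fingerprint(self, remote_fingerprint: str) -> bool:
        """Record the remote certificate fingerprint (no real verification in this demo)."""
        # In a real application the fingerprint arrives over the signaling channel
        # and must be compared with the fingerprint of the certificate the peer
        # presents during the DTLS handshake. This demo only stores the value and
        # always reports success.
        self.dtls_handshake.remote_fingerprint = remote_fingerprint
        print(f"Remote fingerprint recorded (not verified in this demo): {remote_fingerprint}")
        return True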

def get_security_info(self) -> Dict:
"""获取安全信息"""
return {
'dtls_state': self.dtls_handshake.state,
'cipher_suite': self.dtls_handshake.cipher_suite.value if self.dtls_handshake.cipher_suite else None,
'local_fingerprint': self.local_fingerprint,
'remote_fingerprint': self.dtls_handshake.remote_fingerprint,
'srtp_enabled': self.srtp_crypto is not None,
'encryption_required': self.require_encryption
}

# 安全机制演示
def demo_webrtc_security():
print("WebRTC Security Demo")
print("====================")

# 创建安全管理器
security_manager = WebRTCSecurity()

print(f"\n1. Certificate and Fingerprint")
print(f"Local fingerprint: {security_manager.get_local_fingerprint()}")

# 执行DTLS握手
print(f"\n2. DTLS Handshake")
handshake_success = security_manager.perform_dtls_handshake(is_client=True)

if handshake_success:
print("✅ DTLS handshake successful")

# 显示安全信息
security_info = security_manager.get_security_info()
print(f"Security info: {security_info}")

# 测试SRTP加密/解密
print(f"\n3. SRTP Encryption/Decryption Test")

# 创建测试RTP包
test_packet = RTPPacket(
payload_type=96,
sequence_number=12345,
timestamp=1234567890,
ssrc=0x12345678,
payload=b"Hello, WebRTC Security!"
)

print(f"Original payload: {test_packet.payload}")

# 加密
encrypted_data = security_manager.encrypt_media(test_packet)
if encrypted_data:
print(f"Encrypted data length: {len(encrypted_data)} bytes")

# 解密
decrypted_packet = security_manager.decrypt_media(encrypted_data)
if decrypted_packet:
print(f"Decrypted payload: {decrypted_packet.payload}")

# 验证数据完整性
if decrypted_packet.payload == test_packet.payload:
print("✅ Encryption/Decryption successful")
else:
print("❌ Data integrity check failed")
else:
print("❌ Decryption failed")
else:
print("❌ Encryption failed")

else:
print("❌ DTLS handshake failed")

# 测试重放攻击防护
if security_manager.srtp_crypto:
print(f"\n4. Replay Attack Protection Test")

# 尝试重放相同的包
if encrypted_data:
print("Attempting to replay the same packet...")
replayed_packet = security_manager.decrypt_media(encrypted_data)

if replayed_packet is None:
print("✅ Replay attack successfully blocked")
else:
print("❌ Replay attack not detected")

# 运行安全演示
if __name__ == "__main__":
demo_webrtc_security()
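
The replay check in the demo stores seen sequence numbers in a set. A common alternative is a sliding window kept as a bitmap anchored at the highest index accepted so far. The sketch below illustrates that idea under simplifying assumptions: `ReplayWindow` is a hypothetical class, it takes an already-computed packet index as a plain integer (no rollover-counter estimation), and it should only be updated after a packet's authentication tag has been verified.

```python
class ReplayWindow:
    """Sliding-window replay check over a monotonically growing packet index."""

    def __init__(self, size: int = 64):
        self.size = size
        self.highest = -1   # highest index accepted so far
        self.bitmap = 0     # bit i set => index (highest - i) was already seen

    def check_and_update(self, index: int) -> bool:
        """Return True and record the index if it is new, False if replayed or too old."""
        if index > self.highest:
            # Newer than anything seen: slide the window forward.
            shift = index - self.highest
            mask = (1 << self.size) - 1
            self.bitmap = ((self.bitmap << shift) | 1) & mask
            self.highest = index
            return True

        offset = self.highest - index
        if offset >= self.size:
            return False    # fell out of the window, cannot be checked
        if self.bitmap & (1 << offset):
            return False    # already seen
        self.bitmap |= 1 << offset
        return True
```

Compared with a set, the bitmap uses constant memory and rejects anything older than the window outright, so an attacker cannot replay a packet merely by waiting until its entry has been evicted.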

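5. Performance Optimization and Quality Control

5.1 Adaptive Bitrate Control

WebRTC has to adjust its encoding parameters dynamically as network conditions change: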

import statistics
from collections import deque
from typing import Dict, List, Optional, Tuple
import time
import math

import numpy as np  # used by QualityController._resize_video_frame below

class NetworkStats:
"""网络统计信息"""

def __init__(self, window_size: int = 100):
self.window_size = window_size

# 统计数据窗口
self.rtt_samples = deque(maxlen=window_size)
self.loss_samples = deque(maxlen=window_size)
self.bandwidth_samples = deque(maxlen=window_size)
self.jitter_samples = deque(maxlen=window_size)

# 当前统计值
self.current_rtt = 0.0
self.current_loss_rate = 0.0
self.current_bandwidth = 0.0
self.current_jitter = 0.0

# 趋势分析
self.rtt_trend = "stable"
self.loss_trend = "stable"
self.bandwidth_trend = "stable"

def update_rtt(self, rtt: float):
"""更新RTT统计"""
self.rtt_samples.append(rtt)
self.current_rtt = statistics.mean(self.rtt_samples)
self.rtt_trend = self._analyze_trend(self.rtt_samples)

def update_loss_rate(self, loss_rate: float):
"""更新丢包率统计"""
self.loss_samples.append(loss_rate)
self.current_loss_rate = statistics.mean(self.loss_samples)
self.loss_trend = self._analyze_trend(self.loss_samples)

def update_bandwidth(self, bandwidth: float):
"""更新带宽统计"""
self.bandwidth_samples.append(bandwidth)
self.current_bandwidth = statistics.mean(self.bandwidth_samples)
self.bandwidth_trend = self._analyze_trend(self.bandwidth_samples)

def update_jitter(self, jitter: float):
"""更新抖动统计"""
self.jitter_samples.append(jitter)
self.current_jitter = statistics.mean(self.jitter_samples)

def _analyze_trend(self, samples: deque) -> str:
"""分析趋势"""
if len(samples) < 10:
return "stable"

recent = list(samples)[-10:]
older = list(samples)[-20:-10] if len(samples) >= 20 else list(samples)[:-10]

if not older:
return "stable"

recent_avg = statistics.mean(recent)
older_avg = statistics.mean(older)

change_ratio = (recent_avg - older_avg) / older_avg if older_avg > 0 else 0

if change_ratio > 0.1:
return "increasing"
elif change_ratio < -0.1:
return "decreasing"
else:
return "stable"

def get_network_quality(self) -> str:
"""评估网络质量"""
# 综合评估网络质量
quality_score = 100

# RTT影响
if self.current_rtt > 200:
quality_score -= 30
elif self.current_rtt > 100:
quality_score -= 15

# 丢包率影响
if self.current_loss_rate > 0.05:
quality_score -= 40
elif self.current_loss_rate > 0.02:
quality_score -= 20

# 抖动影响
if self.current_jitter > 50:
quality_score -= 20
elif self.current_jitter > 20:
quality_score -= 10

if quality_score >= 80:
return "excellent"
elif quality_score >= 60:
return "good"
elif quality_score >= 40:
return "fair"
else:
return "poor"

class BitrateController:
"""码率控制器"""

def __init__(self, initial_bitrate: int = 1000000):
self.current_bitrate = initial_bitrate
self.min_bitrate = 100000 # 100 kbps
self.max_bitrate = 5000000 # 5 Mbps

# 控制参数
self.increase_factor = 1.05
self.decrease_factor = 0.85
self.aggressive_decrease_factor = 0.5

# 状态跟踪
self.last_adjustment_time = time.time()
self.adjustment_interval = 1.0 # 1秒调整间隔
self.consecutive_increases = 0
self.consecutive_decreases = 0

def adjust_bitrate(self, network_stats: NetworkStats) -> int:
"""根据网络状况调整码率"""
current_time = time.time()

# 检查调整间隔
if current_time - self.last_adjustment_time < self.adjustment_interval:
return self.current_bitrate

old_bitrate = self.current_bitrate

# 根据网络质量调整
network_quality = network_stats.get_network_quality()

if network_quality == "poor":
# 网络质量差,激进降低码率
self.current_bitrate = int(self.current_bitrate * self.aggressive_decrease_factor)
self.consecutive_decreases += 1
self.consecutive_increases = 0

elif network_quality == "fair":
# 网络质量一般,适度降低码率
self.current_bitrate = int(self.current_bitrate * self.decrease_factor)
self.consecutive_decreases += 1
self.consecutive_increases = 0

elif network_quality in ["good", "excellent"]:
# 网络质量好,可以尝试增加码率
if self.consecutive_decreases > 0:
# 刚从降低中恢复,保持当前码率
self.consecutive_decreases = 0
else:
# 持续良好,增加码率
self.current_bitrate = int(self.current_bitrate * self.increase_factor)
self.consecutive_increases += 1

# 限制码率范围
self.current_bitrate = max(self.min_bitrate, min(self.max_bitrate, self.current_bitrate))

self.last_adjustment_time = current_time

# 记录调整
if self.current_bitrate != old_bitrate:
print(f"Bitrate adjusted: {old_bitrate} -> {self.current_bitrate} bps (quality: {network_quality})")

return self.current_bitrate

def get_encoding_params(self) -> Dict:
"""获取编码参数"""
# 根据码率计算编码参数
if self.current_bitrate >= 2000000:
# 高码率
return {
'resolution': (1280, 720),
'framerate': 30,
'keyframe_interval': 60,
'quality': 'high'
}
elif self.current_bitrate >= 1000000:
# 中等码率
return {
'resolution': (854, 480),
'framerate': 25,
'keyframe_interval': 50,
'quality': 'medium'
}
elif self.current_bitrate >= 500000:
# 低码率
return {
'resolution': (640, 360),
'framerate': 20,
'keyframe_interval': 40,
'quality': 'low'
}
else:
# 极低码率
return {
'resolution': (320, 240),
'framerate': 15,
'keyframe_interval': 30,
'quality': 'very_low'
}

class QualityController:
"""质量控制器"""

def __init__(self):
self.network_stats = NetworkStats()
self.bitrate_controller = BitrateController()

# 质量指标
self.target_fps = 30
self.current_fps = 30
self.frame_drop_count = 0
self.total_frames = 0

# 自适应参数
self.enable_frame_dropping = True
self.enable_resolution_scaling = True
self.enable_framerate_adaptation = True

def process_rtcp_feedback(self, rtcp_data: Dict):
"""处理RTCP反馈"""
# 更新网络统计
if 'rtt' in rtcp_data:
self.network_stats.update_rtt(rtcp_data['rtt'])

if 'loss_rate' in rtcp_data:
self.network_stats.update_loss_rate(rtcp_data['loss_rate'])

if 'bandwidth' in rtcp_data:
self.network_stats.update_bandwidth(rtcp_data['bandwidth'])

if 'jitter' in rtcp_data:
self.network_stats.update_jitter(rtcp_data['jitter'])

# 调整码率
new_bitrate = self.bitrate_controller.adjust_bitrate(self.network_stats)

# 更新编码参数
encoding_params = self.bitrate_controller.get_encoding_params()
self._apply_encoding_params(encoding_params)

def _apply_encoding_params(self, params: Dict):
"""应用编码参数"""
if self.enable_resolution_scaling:
print(f"Resolution: {params['resolution']}")

if self.enable_framerate_adaptation:
self.target_fps = params['framerate']
print(f"Target FPS: {self.target_fps}")

print(f"Quality level: {params['quality']}")

def should_drop_frame(self) -> bool:
"""判断是否应该丢帧"""
if not self.enable_frame_dropping:
return False

# 基于网络质量决定丢帧
network_quality = self.network_stats.get_network_quality()

if network_quality == "poor":
# 网络质量差,丢弃50%的帧
return self.total_frames % 2 == 0
elif network_quality == "fair":
# 网络质量一般,丢弃25%的帧
return self.total_frames % 4 == 0

return False

def process_frame(self, frame: MediaFrame) -> Optional[MediaFrame]:
"""处理媒体帧"""
self.total_frames += 1

# 检查是否需要丢帧
if self.should_drop_frame():
self.frame_drop_count += 1
print(f"Frame dropped (total drops: {self.frame_drop_count})")
return None

# 应用质量调整
processed_frame = self._apply_quality_adjustments(frame)

return processed_frame

def _apply_quality_adjustments(self, frame: MediaFrame) -> MediaFrame:
"""应用质量调整"""
# 根据当前编码参数调整帧质量
encoding_params = self.bitrate_controller.get_encoding_params()

if frame.media_type == MediaType.VIDEO:
# 视频帧处理
target_resolution = encoding_params['resolution']

if (frame.width, frame.height) != target_resolution:
# 需要缩放分辨率
frame = self._resize_video_frame(frame, target_resolution)

return frame

    def _resize_video_frame(self, frame: MediaFrame, target_resolution: Tuple[int, int]) -> MediaFrame:
        """Resize a video frame with nearest-neighbor sampling (modifies the frame in place)."""
        target_width, target_height = target_resolution

        if frame.data.shape[:2] != (target_height, target_width):
            # A real implementation would use a proper scaler (for example bilinear
            # or area interpolation); nearest-neighbor keeps this demo short.
            scale_y = target_height / frame.height
            scale_x = target_width / frame.width

            # Map every target row and column to its source row and column
            src_rows = np.minimum((np.arange(target_height) / scale_y).astype(int), frame.height - 1)
            src_cols = np.minimum((np.arange(target_width) / scale_x).astype(int), frame.width - 1)

            # Index with a column vector of rows and a row vector of columns to
            # gather the whole target image at once instead of looping per pixel
            frame.data = frame.data[src_rows[:, None], src_cols]
            frame.width = target_width
            frame.height = target_height

        return frame

def get_quality_metrics(self) -> Dict:
"""获取质量指标"""
drop_rate = self.frame_drop_count / self.total_frames if self.total_frames > 0 else 0

return {
'network_quality': self.network_stats.get_network_quality(),
'current_bitrate': self.bitrate_controller.current_bitrate,
'current_rtt': self.network_stats.current_rtt,
'loss_rate': self.network_stats.current_loss_rate,
'jitter': self.network_stats.current_jitter,
'frame_drop_rate': drop_rate,
'total_frames': self.total_frames,
'target_fps': self.target_fps
}

# Performance optimization demo
def demo_quality_control():
    print("WebRTC Quality Control Demo")
    print("============================")

    # Create the quality controller
    quality_controller = QualityController()

    # Simulate changing network conditions
    network_scenarios = [
        {'name': 'Good Network', 'rtt': 50, 'loss_rate': 0.001, 'bandwidth': 2000000, 'jitter': 5},
        {'name': 'Degrading Network', 'rtt': 100, 'loss_rate': 0.02, 'bandwidth': 1000000, 'jitter': 15},
        {'name': 'Poor Network', 'rtt': 300, 'loss_rate': 0.08, 'bandwidth': 300000, 'jitter': 50},
        {'name': 'Recovering Network', 'rtt': 80, 'loss_rate': 0.01, 'bandwidth': 1500000, 'jitter': 10}
    ]

    # Create the video processor
    video_processor = VideoProcessor(width=1280, height=720, frame_rate=30)

    for i, scenario in enumerate(network_scenarios):
        print(f"\n{i+1}. Testing {scenario['name']}")
        print(f" RTT: {scenario['rtt']}ms, Loss: {scenario['loss_rate']*100:.1f}%, "
              f"Bandwidth: {scenario['bandwidth']/1000:.0f}kbps, Jitter: {scenario['jitter']}ms")

        # Simulated RTCP feedback
        rtcp_feedback = {
            'rtt': scenario['rtt'],
            'loss_rate': scenario['loss_rate'],
            'bandwidth': scenario['bandwidth'],
            'jitter': scenario['jitter']
        }

        # Process the feedback and adjust parameters
        quality_controller.process_rtcp_feedback(rtcp_feedback)

        # Simulate processing a few video frames
        frames_processed = 0
        frames_dropped = 0

        for _ in range(10):
            # Generate a test frame
            test_frame = video_processor.generate_video_frame("gradient")

            # Process the frame; a None result is counted as a dropped frame
            processed_frame = quality_controller.process_frame(test_frame)

            if processed_frame is not None:
                frames_processed += 1
            else:
                frames_dropped += 1

        print(f" Frames processed: {frames_processed}, dropped: {frames_dropped}")

        # Show the quality metrics
        metrics = quality_controller.get_quality_metrics()
        print(f" Quality metrics: {metrics}")

        # Pause briefly to simulate real-time processing
        time.sleep(0.1)

    print("\nFinal Quality Metrics:")
    final_metrics = quality_controller.get_quality_metrics()
    for key, value in final_metrics.items():
        print(f" {key}: {value}")

# Run the quality control demo
if __name__ == "__main__":
    demo_quality_control()
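
The per-pixel loops in `_resize_video_frame` copy each output pixel from its nearest source pixel, which is slow in pure Python for 720p frames. As a rough sketch only, the same nearest-pixel mapping can be expressed with NumPy index arrays. The function name `resize_nearest` is hypothetical and not part of the code above, and the mapping uses integer arithmetic, so it may differ from the floating-point loop version by a pixel in rare rounding cases.

```python
import numpy as np


def resize_nearest(data: np.ndarray, target_width: int, target_height: int) -> np.ndarray:
    """Nearest-pixel resize of an (H, W) or (H, W, C) array (illustrative sketch)."""
    src_height, src_width = data.shape[:2]

    # For every output row/column, compute the source row/column to copy from.
    src_y = np.minimum(np.arange(target_height) * src_height // target_height, src_height - 1)
    src_x = np.minimum(np.arange(target_width) * src_width // target_width, src_width - 1)

    # Broadcasting a column of row indices against a row of column indices
    # selects the full (target_height, target_width) grid in one step.
    return data[src_y[:, None], src_x]
```

For real applications a library scaler with proper interpolation is usually the better choice; this version only removes the Python-level loops.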

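6. Summary and Practical Applications

6.1 Key Technical Points

WebRTC real-time communication spans several key areas:

  1. Signaling and connection establishment

    • SDP negotiation: exchanging media capabilities and session descriptions
    • ICE: NAT traversal and selection of the best connection path
    • Signaling server: coordinates communication between peers
  2. Media processing

    • Audio processing: automatic gain control (AGC), noise suppression, echo cancellation
    • Video processing: denoising, image enhancement, encoding and decoding
    • Real-time transport: the RTP/RTCP protocol stack
  3. Security mechanisms

    • DTLS handshake: key exchange and authentication
    • SRTP encryption: protects media data
    • Certificate verification: guards against man-in-the-middle attacks
  4. Quality control

    • Adaptive bitrate: adjusts dynamically to network conditions
    • Frame-rate control: balances quality against smoothness
    • Error recovery: retransmission of lost packets and forward error correction (FEC)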
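
To make the adaptive-bitrate point concrete, here is a minimal sketch of a loss-driven controller: it backs off when reported loss is high, probes upward when loss is low, and holds in between. The class name `LossBasedBitrateController`, the thresholds (2% and 10%), and the multipliers are illustrative assumptions, not values taken from the `QualityController` shown earlier.

```python
from dataclasses import dataclass


@dataclass
class LossBasedBitrateController:
    """Hypothetical helper that adapts a target bitrate to the reported loss rate."""
    bitrate_bps: float = 1_000_000
    min_bps: float = 100_000
    max_bps: float = 2_500_000
    low_loss: float = 0.02   # below this, probe for more bandwidth (assumed value)
    high_loss: float = 0.10  # above this, back off (assumed value)

    def update(self, loss_rate: float) -> float:
        if not 0.0 <= loss_rate <= 1.0:
            raise ValueError("loss_rate must be within [0, 1]")

        if loss_rate > self.high_loss:
            # Reduce the bitrate in proportion to the observed loss.
            self.bitrate_bps *= 1.0 - 0.5 * loss_rate
        elif loss_rate < self.low_loss:
            # Gently probe upward by 5%.
            self.bitrate_bps *= 1.05
        # Between the two thresholds the bitrate is held steady.

        # Clamp to the configured range.
        self.bitrate_bps = min(self.max_bps, max(self.min_bps, self.bitrate_bps))
        return self.bitrate_bps
```

Calling `update()` once per RTCP report would be one way to drive it. A real controller would typically also take delay and jitter into account.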

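6.2 Technology Trends

AI-enhanced WebRTC

  • Intelligent noise suppression: deep-learning-based audio processing
  • Video enhancement: AI-driven improvement of image quality
  • Adaptive encoding: bitrate control tuned by machine learning

Edge computing integration

  • Processing close to the user: reduces latency and bandwidth consumption
  • Distributed architecture: improves system scalability
  • Intelligent routing: optimizes data transmission paths

Support for emerging standards

  • AV1: a next-generation video compression standard
  • QUIC: reliable transport built on UDP
  • WebCodecs API: more flexible control over encoding and decoding

6.3 Practical Recommendations

Architecture design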

class WebRTCApplication:
    """Example WebRTC application architecture."""

    def __init__(self, config: Dict):
        # Core components
        self.signaling_server = SignalingServer(config['signaling'])
        self.media_engine = MediaEngine(config['media'])
        self.security_manager = WebRTCSecurity()
        self.quality_controller = QualityController()

        # Application settings
        self.app_type = config.get('app_type', 'video_call')
        self.max_participants = config.get('max_participants', 2)
        self.enable_recording = config.get('enable_recording', False)

        # Performance monitoring
        self.metrics_collector = MetricsCollector()
        self.alert_manager = AlertManager()

    def initialize(self):
        """Initialize the application."""
        # Start the signaling service
        self.signaling_server.start()

        # Initialize the media engine
        self.media_engine.initialize()

        # Apply the security policies
        self.security_manager.configure_policies()

        print("WebRTC application initialized")

    def create_session(self, session_config: Dict) -> str:
        """Create a communication session and return its ID."""
        session_id = self.signaling_server.create_session(session_config)

        # Configure media processing
        media_config = self._get_media_config(session_config)
        self.media_engine.configure_session(session_id, media_config)

        # Enable quality control
        self.quality_controller.enable_for_session(session_id)

        return session_id

    def _get_media_config(self, session_config: Dict) -> Dict:
        """Return the media configuration for the current application type."""
        if self.app_type == 'video_call':
            return {
                'video': {
                    'codec': 'VP8',
                    'resolution': '720p',
                    'framerate': 30,
                    'bitrate': 1000000
                },
                'audio': {
                    'codec': 'OPUS',
                    'sample_rate': 48000,
                    'channels': 2,
                    'bitrate': 64000
                }
            }
        elif self.app_type == 'screen_share':
            return {
                'video': {
                    'codec': 'VP9',
                    'resolution': '1080p',
                    'framerate': 15,
                    'bitrate': 2000000
                },
                'audio': {
                    'codec': 'OPUS',
                    'sample_rate': 48000,
                    'channels': 1,
                    'bitrate': 32000
                }
            }
        else:
            # Any other application type uses the media settings supplied by the caller
            return session_config.get('media', {})

# Example application configuration
app_config = {
    'app_type': 'video_call',
    'max_participants': 4,
    'enable_recording': True,
    'signaling': {
        'server_url': 'wss://signaling.example.com',
        'auth_token': 'your_auth_token'
    },
    'media': {
        'enable_simulcast': True,
        'enable_svc': False,
        'preferred_codecs': ['VP9', 'VP8', 'H264']
    }
}

# Create the application instance
app = WebRTCApplication(app_config)
app.initialize()

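Performance optimization strategies

  1. Network adaptability

    • Apply quality control at multiple levels
    • Use simulcast to offer several bitrates
    • Enable SVC (scalable video coding)
  2. Resource management

    • Monitor and cap CPU usage
    • Manage memory with pools to avoid frequent allocation
    • Use GPU-accelerated encoding and decoding where available
  3. User experience optimization

    • Fast connection setup (< 3 seconds)
    • Low-latency transmission (< 150 ms)
    • Smooth quality switching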
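
As an illustration of the simulcast point, the sketch below picks which simulcast layer to forward to a receiver based on its estimated bandwidth. Everything here is an assumption made for the example: the `SimulcastLayer` type, the layer identifiers and bitrates, and the 15% headroom are hypothetical and do not come from the code in this article.

```python
from typing import List, NamedTuple


class SimulcastLayer(NamedTuple):
    rid: str          # layer identifier (hypothetical names such as "q", "h", "f")
    height: int       # vertical resolution in pixels
    bitrate_bps: int  # nominal bitrate of the layer


def select_layer(layers: List[SimulcastLayer], available_bps: float,
                 headroom: float = 0.85) -> SimulcastLayer:
    """Return the highest-bitrate layer that fits the bandwidth budget.

    Falls back to the lowest-bitrate layer when nothing fits.
    """
    if not layers:
        raise ValueError("at least one simulcast layer is required")

    # Keep some headroom so audio and retransmissions are not starved.
    budget = available_bps * headroom
    ordered = sorted(layers, key=lambda layer: layer.bitrate_bps)

    chosen = ordered[0]
    for layer in ordered:
        if layer.bitrate_bps <= budget:
            chosen = layer
    return chosen


layers = [
    SimulcastLayer("q", 180, 150_000),
    SimulcastLayer("h", 360, 500_000),
    SimulcastLayer("f", 720, 1_500_000),
]
print(select_layer(layers, available_bps=1_000_000).rid)
```

To keep quality switches smooth, a real selector would probably add hysteresis (for example, requiring the estimate to stay above a threshold for some time before switching up). That is left out here for brevity.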

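Deployment considerations

  1. Infrastructure

    • STUN/TURN server deployment
    • A signaling server cluster
    • CDN integration for static assets
  2. Monitoring and operations

    • Real-time quality monitoring
    • Error log collection
    • Performance metric analysis
  3. Security protection

    • DDoS protection
    • Access control and authentication
    • Data encryption and privacy protection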
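
One common way to combine the TURN deployment and access-control points is to hand out short-lived TURN credentials derived from a secret shared between the application server and the TURN server (a scheme that servers such as coturn support in shared-secret mode). The sketch below assumes that scheme: the username carries an expiry timestamp, and the credential is a Base64-encoded HMAC-SHA1 of the username. The function names, the secret, and the user ID are hypothetical.

```python
import base64
import hashlib
import hmac
import time
from typing import Dict, Optional


def make_turn_credentials(shared_secret: str, user_id: str,
                          ttl_seconds: int = 3600,
                          now: Optional[float] = None) -> Dict[str, str]:
    """Build time-limited TURN credentials from a shared secret (sketch)."""
    current = time.time() if now is None else now
    expiry = int(current + ttl_seconds)
    # The username encodes the expiry time followed by an application user ID.
    username = f"{expiry}:{user_id}"
    digest = hmac.new(shared_secret.encode("utf-8"),
                      username.encode("utf-8"), hashlib.sha1).digest()
    return {"username": username,
            "credential": base64.b64encode(digest).decode("ascii")}


def verify_turn_credentials(shared_secret: str, username: str, credential: str,
                            now: Optional[float] = None) -> bool:
    """Check the expiry and the HMAC, mirroring what the server side would do."""
    current = time.time() if now is None else now
    try:
        expiry = int(username.split(":", 1)[0])
    except ValueError:
        return False
    if expiry < current:
        return False
    digest = hmac.new(shared_secret.encode("utf-8"),
                      username.encode("utf-8"), hashlib.sha1).digest()
    expected = base64.b64encode(digest).decode("ascii")
    # Constant-time comparison avoids leaking information through timing.
    return hmac.compare_digest(expected, credential)
```

The application server would return the resulting `username` and `credential` to the client along with the TURN URL, so the long-term secret never reaches the browser.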

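WebRTC provides a solid foundation for real-time communication applications, but a successful implementation has to weigh technical architecture, performance optimization, security, and user experience together. As the technology matures, WebRTC will play an important role in more scenarios and drive further innovation in real-time communication.


This article has covered the core components, working principles, and practical applications of WebRTC real-time communication. With a solid understanding of signaling, media processing, encryption, and quality control, developers can build high-quality real-time communication applications.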
