Apache Cassandra - 分布式NoSQL数据库

项目简介

Apache Cassandra是一个开源的分布式NoSQL数据库,最初由Facebook开发,后来成为Apache软件基金会的顶级项目。Cassandra结合了Amazon DynamoDB的分布式设计和Google BigTable的数据模型,提供了高可用性、高性能和线性扩展能力。

Cassandra采用去中心化的架构,没有单点故障,能够跨多个数据中心部署,为大规模应用提供连续可用的数据服务。它特别适合需要快速写入和线性扩展的应用场景。

主要特性

  • 线性扩展:水平扩展能力,节点数量增加性能线性提升
  • 高可用性:无单点故障,自动故障检测和恢复
  • 最终一致性:可调整的一致性级别
  • 分布式架构:去中心化的对等节点架构
  • 多数据中心支持:跨地域数据复制和故障切换
  • 灵活数据模型:列族数据模型,支持复杂数据结构

项目原理

核心架构

Cassandra采用环形架构,所有节点地位平等:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Cassandra集群架构
├── Ring Architecture (环形架构)
│ ├── Node 1 (Token Range: 0-256)
│ ├── Node 2 (Token Range: 256-512)
│ ├── Node 3 (Token Range: 512-768)
│ └── Node 4 (Token Range: 768-1024)
├── Data Distribution (数据分布)
│ ├── Consistent Hashing (一致性哈希)
│ ├── Virtual Nodes (虚拟节点)
│ └── Replication Strategy (复制策略)
└── Gossip Protocol (Gossip协议)
├── Failure Detection (故障检测)
├── State Propagation (状态传播)
└── Ring Membership (环成员管理)

数据模型

Keyspace(键空间)

  • 类似于关系数据库的数据库
  • 定义复制策略和副本数量
  • 包含多个列族

Column Family(列族)

  • 类似于关系数据库的表
  • 包含行和列的集合
  • 支持动态列添加

Row(行)

  • 由唯一的行键标识
  • 包含多个列
  • 列可以动态添加

Column(列)

  • 由名称、值和时间戳组成
  • 支持TTL(生存时间)
  • 可以嵌套形成复合结构

一致性级别

Cassandra提供可调整的一致性级别:

写一致性级别

  • ONE:至少一个副本写入成功
  • QUORUM:大多数副本写入成功
  • ALL:所有副本写入成功

读一致性级别

  • ONE:从一个副本读取
  • QUORUM:从大多数副本读取并比较
  • ALL:从所有副本读取并比较

使用场景

1. 时序数据存储

存储大量的时间序列数据,如监控指标、日志数据、传感器数据等。

2. 用户个性化数据

存储用户配置、偏好设置、购物车等个性化数据。

3. 内容管理系统

管理大量的媒体文件、文档、评论等内容数据。

4. 物联网数据平台

处理大规模物联网设备产生的数据流。

5. 实时推荐系统

存储用户行为数据,支持实时推荐算法。

具体案例

案例1:基本数据操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
-- 创建键空间
CREATE KEYSPACE ecommerce
WITH REPLICATION = {
'class': 'NetworkTopologyStrategy',
'dc1': 3,
'dc2': 2
} AND DURABLE_WRITES = true;

-- 使用键空间
USE ecommerce;

-- 创建用户表
CREATE TABLE users (
user_id UUID PRIMARY KEY,
username TEXT,
email TEXT,
first_name TEXT,
last_name TEXT,
phone TEXT,
address MAP<TEXT, TEXT>,
preferences SET<TEXT>,
created_at TIMESTAMP,
updated_at TIMESTAMP
);

-- 创建产品表
CREATE TABLE products (
product_id UUID PRIMARY KEY,
name TEXT,
description TEXT,
price DECIMAL,
category TEXT,
tags SET<TEXT>,
attributes MAP<TEXT, TEXT>,
inventory_count INT,
created_at TIMESTAMP
);

-- 创建订单表
CREATE TABLE orders (
order_id UUID,
user_id UUID,
order_date TIMESTAMP,
status TEXT,
total_amount DECIMAL,
items LIST<FROZEN<order_item>>,
shipping_address MAP<TEXT, TEXT>,
PRIMARY KEY (user_id, order_date, order_id)
) WITH CLUSTERING ORDER BY (order_date DESC);

-- 定义订单项类型
CREATE TYPE order_item (
product_id UUID,
product_name TEXT,
quantity INT,
unit_price DECIMAL
);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
// Java客户端操作示例
import com.datastax.driver.core.*;
import com.datastax.driver.core.querybuilder.QueryBuilder;

public class CassandraExample {
private Cluster cluster;
private Session session;

public void connect(String[] contactPoints) {
cluster = Cluster.builder()
.addContactPoints(contactPoints)
.withCredentials("username", "password")
.withRetryPolicy(DefaultRetryPolicy.INSTANCE)
.withReconnectionPolicy(new ConstantReconnectionPolicy(1000L))
.build();

session = cluster.connect("ecommerce");
System.out.println("连接到Cassandra集群成功");
}

// 插入用户数据
public void insertUser(UUID userId, String username, String email,
String firstName, String lastName) {
String query = "INSERT INTO users (user_id, username, email, first_name, last_name, created_at) " +
"VALUES (?, ?, ?, ?, ?, ?)";

PreparedStatement prepared = session.prepare(query);
BoundStatement bound = prepared.bind(userId, username, email,
firstName, lastName, new Date());

session.execute(bound);
System.out.println("用户插入成功: " + username);
}

// 查询用户数据
public void getUserById(UUID userId) {
String query = "SELECT * FROM users WHERE user_id = ?";
PreparedStatement prepared = session.prepare(query);
BoundStatement bound = prepared.bind(userId);

ResultSet resultSet = session.execute(bound);
Row row = resultSet.one();

if (row != null) {
System.out.println("用户信息:");
System.out.println("ID: " + row.getUUID("user_id"));
System.out.println("用户名: " + row.getString("username"));
System.out.println("邮箱: " + row.getString("email"));
System.out.println("姓名: " + row.getString("first_name") + " " +
row.getString("last_name"));
}
}

// 批量操作
public void batchInsertProducts(List<Product> products) {
BatchStatement batch = new BatchStatement();

String query = "INSERT INTO products (product_id, name, description, price, category, created_at) " +
"VALUES (?, ?, ?, ?, ?, ?)";
PreparedStatement prepared = session.prepare(query);

for (Product product : products) {
BoundStatement bound = prepared.bind(
product.getId(),
product.getName(),
product.getDescription(),
product.getPrice(),
product.getCategory(),
new Date()
);
batch.add(bound);
}

session.execute(batch);
System.out.println("批量插入产品成功,数量: " + products.size());
}

// 异步操作
public void asyncInsertUser(UUID userId, String username, String email) {
String query = "INSERT INTO users (user_id, username, email, created_at) VALUES (?, ?, ?, ?)";
PreparedStatement prepared = session.prepare(query);
BoundStatement bound = prepared.bind(userId, username, email, new Date());

ResultSetFuture future = session.executeAsync(bound);

// 设置回调
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
System.out.println("异步插入成功: " + username);
}

@Override
public void onFailure(Throwable t) {
System.err.println("异步插入失败: " + t.getMessage());
}
});
}

public void close() {
if (session != null) session.close();
if (cluster != null) cluster.close();
}
}

案例2:时序数据处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
-- 创建传感器数据表
CREATE TABLE sensor_data (
sensor_id TEXT,
year INT,
month INT,
day INT,
hour INT,
timestamp TIMESTAMP,
temperature DOUBLE,
humidity DOUBLE,
pressure DOUBLE,
location MAP<TEXT, DOUBLE>,
PRIMARY KEY ((sensor_id, year, month, day), hour, timestamp)
) WITH CLUSTERING ORDER BY (hour ASC, timestamp ASC)
AND COMPACTION = {
'class': 'TimeWindowCompactionStrategy',
'compaction_window_unit': 'DAYS',
'compaction_window_size': 1
};

-- 创建传感器数据汇总表
CREATE TABLE sensor_data_hourly (
sensor_id TEXT,
date DATE,
hour INT,
avg_temperature DOUBLE,
max_temperature DOUBLE,
min_temperature DOUBLE,
avg_humidity DOUBLE,
sample_count INT,
PRIMARY KEY ((sensor_id, date), hour)
) WITH CLUSTERING ORDER BY (hour ASC);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
// 时序数据处理服务
public class SensorDataService {
private Session session;
private PreparedStatement insertStatement;
private PreparedStatement queryStatement;

public SensorDataService(Session session) {
this.session = session;
prepareStatements();
}

private void prepareStatements() {
// 插入传感器数据
String insertQuery = "INSERT INTO sensor_data " +
"(sensor_id, year, month, day, hour, timestamp, temperature, humidity, pressure, location) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) USING TTL ?";
insertStatement = session.prepare(insertQuery);

// 查询传感器数据
String queryRangeQuery = "SELECT * FROM sensor_data " +
"WHERE sensor_id = ? AND year = ? AND month = ? AND day = ? " +
"AND hour >= ? AND hour <= ? ORDER BY hour ASC, timestamp ASC";
queryStatement = session.prepare(queryRangeQuery);
}

// 插入传感器数据
public void insertSensorData(String sensorId, Date timestamp,
double temperature, double humidity, double pressure,
Map<String, Double> location) {
Calendar cal = Calendar.getInstance();
cal.setTime(timestamp);

int year = cal.get(Calendar.YEAR);
int month = cal.get(Calendar.MONTH) + 1;
int day = cal.get(Calendar.DAY_OF_MONTH);
int hour = cal.get(Calendar.HOUR_OF_DAY);

// 设置TTL为30天
int ttl = 30 * 24 * 60 * 60;

BoundStatement bound = insertStatement.bind(
sensorId, year, month, day, hour, timestamp,
temperature, humidity, pressure, location, ttl
);

session.executeAsync(bound);
}

// 批量插入传感器数据
public void batchInsertSensorData(List<SensorReading> readings) {
BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);

for (SensorReading reading : readings) {
Calendar cal = Calendar.getInstance();
cal.setTime(reading.getTimestamp());

BoundStatement bound = insertStatement.bind(
reading.getSensorId(),
cal.get(Calendar.YEAR),
cal.get(Calendar.MONTH) + 1,
cal.get(Calendar.DAY_OF_MONTH),
cal.get(Calendar.HOUR_OF_DAY),
reading.getTimestamp(),
reading.getTemperature(),
reading.getHumidity(),
reading.getPressure(),
reading.getLocation(),
30 * 24 * 60 * 60 // TTL
);
batch.add(bound);

// 限制批次大小
if (batch.size() >= 100) {
session.execute(batch);
batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
}
}

if (batch.size() > 0) {
session.execute(batch);
}
}

// 查询时间范围内的数据
public List<SensorReading> getSensorDataRange(String sensorId, Date startDate, Date endDate) {
List<SensorReading> results = new ArrayList<>();

Calendar start = Calendar.getInstance();
start.setTime(startDate);
Calendar end = Calendar.getInstance();
end.setTime(endDate);

// 按天查询数据
while (!start.after(end)) {
int year = start.get(Calendar.YEAR);
int month = start.get(Calendar.MONTH) + 1;
int day = start.get(Calendar.DAY_OF_MONTH);

BoundStatement bound = queryStatement.bind(sensorId, year, month, day, 0, 23);
ResultSet resultSet = session.execute(bound);

for (Row row : resultSet) {
SensorReading reading = new SensorReading();
reading.setSensorId(row.getString("sensor_id"));
reading.setTimestamp(row.getTimestamp("timestamp"));
reading.setTemperature(row.getDouble("temperature"));
reading.setHumidity(row.getDouble("humidity"));
reading.setPressure(row.getDouble("pressure"));
reading.setLocation(row.getMap("location", String.class, Double.class));
results.add(reading);
}

start.add(Calendar.DAY_OF_MONTH, 1);
}

return results;
}

// 数据聚合处理
public void aggregateHourlyData(String sensorId, Date date) {
// 查询一天的数据
Calendar cal = Calendar.getInstance();
cal.setTime(date);

String query = "SELECT hour, temperature, humidity FROM sensor_data " +
"WHERE sensor_id = ? AND year = ? AND month = ? AND day = ?";

PreparedStatement stmt = session.prepare(query);
BoundStatement bound = stmt.bind(
sensorId,
cal.get(Calendar.YEAR),
cal.get(Calendar.MONTH) + 1,
cal.get(Calendar.DAY_OF_MONTH)
);

ResultSet resultSet = session.execute(bound);

// 按小时聚合数据
Map<Integer, List<Double>> temperatureByHour = new HashMap<>();
Map<Integer, List<Double>> humidityByHour = new HashMap<>();

for (Row row : resultSet) {
int hour = row.getInt("hour");
double temp = row.getDouble("temperature");
double hum = row.getDouble("humidity");

temperatureByHour.computeIfAbsent(hour, k -> new ArrayList<>()).add(temp);
humidityByHour.computeIfAbsent(hour, k -> new ArrayList<>()).add(hum);
}

// 插入聚合数据
String insertAggQuery = "INSERT INTO sensor_data_hourly " +
"(sensor_id, date, hour, avg_temperature, max_temperature, min_temperature, avg_humidity, sample_count) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)";

PreparedStatement aggStmt = session.prepare(insertAggQuery);

for (int hour = 0; hour < 24; hour++) {
List<Double> temps = temperatureByHour.get(hour);
List<Double> hums = humidityByHour.get(hour);

if (temps != null && !temps.isEmpty()) {
double avgTemp = temps.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
double maxTemp = temps.stream().mapToDouble(Double::doubleValue).max().orElse(0.0);
double minTemp = temps.stream().mapToDouble(Double::doubleValue).min().orElse(0.0);
double avgHum = hums.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);

BoundStatement aggBound = aggStmt.bind(
sensorId, date, hour, avgTemp, maxTemp, minTemp, avgHum, temps.size()
);

session.execute(aggBound);
}
}
}
}

案例3:计数器和轻量级事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
-- 创建计数器表
CREATE TABLE page_views (
page_url TEXT,
date DATE,
view_count COUNTER,
PRIMARY KEY (page_url, date)
);

CREATE TABLE user_stats (
user_id UUID PRIMARY KEY,
login_count COUNTER,
post_count COUNTER,
like_count COUNTER
);

-- 创建轻量级事务表
CREATE TABLE user_locks (
user_id UUID PRIMARY KEY,
lock_holder TEXT,
lock_time TIMESTAMP
);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// 计数器操作
public class CounterService {
private Session session;

public void incrementPageView(String pageUrl, Date date) {
String query = "UPDATE page_views SET view_count = view_count + 1 WHERE page_url = ? AND date = ?";
PreparedStatement stmt = session.prepare(query);
BoundStatement bound = stmt.bind(pageUrl, date);
session.execute(bound);
}

public void batchIncrementCounters(UUID userId, int loginInc, int postInc, int likeInc) {
BatchStatement batch = new BatchStatement();

if (loginInc > 0) {
String loginQuery = "UPDATE user_stats SET login_count = login_count + ? WHERE user_id = ?";
PreparedStatement loginStmt = session.prepare(loginQuery);
batch.add(loginStmt.bind(loginInc, userId));
}

if (postInc > 0) {
String postQuery = "UPDATE user_stats SET post_count = post_count + ? WHERE user_id = ?";
PreparedStatement postStmt = session.prepare(postQuery);
batch.add(postStmt.bind(postInc, userId));
}

if (likeInc > 0) {
String likeQuery = "UPDATE user_stats SET like_count = like_count + ? WHERE user_id = ?";
PreparedStatement likeStmt = session.prepare(likeQuery);
batch.add(likeStmt.bind(likeInc, userId));
}

session.execute(batch);
}

// 轻量级事务示例
public boolean acquireLock(UUID userId, String lockHolder) {
String insertQuery = "INSERT INTO user_locks (user_id, lock_holder, lock_time) " +
"VALUES (?, ?, ?) IF NOT EXISTS";

PreparedStatement stmt = session.prepare(insertQuery);
BoundStatement bound = stmt.bind(userId, lockHolder, new Date());

ResultSet result = session.execute(bound);
return result.one().getBool("[applied]");
}

public boolean releaseLock(UUID userId, String lockHolder) {
String deleteQuery = "DELETE FROM user_locks WHERE user_id = ? IF lock_holder = ?";

PreparedStatement stmt = session.prepare(deleteQuery);
BoundStatement bound = stmt.bind(userId, lockHolder);

ResultSet result = session.execute(bound);
return result.one().getBool("[applied]");
}
}

案例4:集群配置和调优

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# cassandra.yaml 配置文件
cluster_name: 'Production Cluster'
num_tokens: 256
hinted_handoff_enabled: true
max_hint_window_in_ms: 10800000 # 3 hours

# 网络配置
listen_address: 192.168.1.100
rpc_address: 0.0.0.0
broadcast_address: 192.168.1.100

# 种子节点
seed_provider:
- class_name: org.apache.cassandra.locator.SimpleSeedProvider
parameters:
- seeds: "192.168.1.100,192.168.1.101,192.168.1.102"

# 数据存储
data_file_directories:
- /var/lib/cassandra/data

commitlog_directory: /var/lib/cassandra/commitlog
saved_caches_directory: /var/lib/cassandra/saved_caches

# 内存设置
memtable_allocation_type: heap_buffers
memtable_heap_space_in_mb: 2048
memtable_offheap_space_in_mb: 2048

# 压缩设置
column_index_size_in_kb: 64
compaction_throughput_mb_per_sec: 64
concurrent_compactors: 4

# 读写性能
concurrent_reads: 32
concurrent_writes: 32
concurrent_counter_writes: 32

# 缓存设置
key_cache_size_in_mb: 200
row_cache_size_in_mb: 200

# 网络超时
read_request_timeout_in_ms: 5000
write_request_timeout_in_ms: 2000
counter_write_request_timeout_in_ms: 5000

案例5:监控和运维

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#!/bin/bash
# Cassandra集群监控脚本

# 检查节点状态
echo "=== 节点状态 ==="
nodetool status

# 检查集群信息
echo -e "\n=== 集群信息 ==="
nodetool info

# 检查压缩状态
echo -e "\n=== 压缩状态 ==="
nodetool compactionstats

# 检查线程池状态
echo -e "\n=== 线程池状态 ==="
nodetool tpstats

# 检查内存使用
echo -e "\n=== 内存使用 ==="
nodetool gcstats

# 检查磁盘使用
echo -e "\n=== 磁盘使用 ==="
du -sh /var/lib/cassandra/data/*

# 检查日志错误
echo -e "\n=== 最近错误 ==="
tail -100 /var/log/cassandra/system.log | grep ERROR | tail -10
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// JMX监控
public class CassandraMonitor {
private MBeanServerConnection mBeanServer;

public void connectJMX(String host, int port) throws Exception {
JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
JMXConnector connector = JMXConnectorFactory.connect(url);
mBeanServer = connector.getMBeanServerConnection();
}

public void getClusterMetrics() throws Exception {
// 获取存储服务指标
ObjectName storageService = new ObjectName("org.apache.cassandra.db:type=StorageService");

String[] liveNodes = (String[]) mBeanServer.getAttribute(storageService, "LiveNodes");
String[] unreachableNodes = (String[]) mBeanServer.getAttribute(storageService, "UnreachableNodes");

System.out.println("活跃节点: " + Arrays.toString(liveNodes));
System.out.println("不可达节点: " + Arrays.toString(unreachableNodes));

// 获取压缩指标
ObjectName compactionManager = new ObjectName("org.apache.cassandra.db:type=CompactionManager");
Long pendingTasks = (Long) mBeanServer.getAttribute(compactionManager, "PendingTasks");
System.out.println("待压缩任务: " + pendingTasks);

// 获取读写延迟
ObjectName readLatency = new ObjectName("org.apache.cassandra.db:type=StorageProxy,scope=Read,name=Latency");
Double readMean = (Double) mBeanServer.getAttribute(readLatency, "Mean");
System.out.println("平均读延迟: " + readMean + " 微秒");

ObjectName writeLatency = new ObjectName("org.apache.cassandra.db:type=StorageProxy,scope=Write,name=Latency");
Double writeMean = (Double) mBeanServer.getAttribute(writeLatency, "Mean");
System.out.println("平均写延迟: " + writeMean + " 微秒");
}
}

性能优化建议

1. 数据建模优化

1
2
3
4
5
6
7
8
9
-- 避免热点分区
CREATE TABLE user_events (
user_id UUID,
bucket INT, -- 添加分桶字段
event_time TIMESTAMP,
event_type TEXT,
data MAP<TEXT, TEXT>,
PRIMARY KEY ((user_id, bucket), event_time)
) WITH CLUSTERING ORDER BY (event_time DESC);

2. JVM调优

1
2
3
4
5
6
7
8
# jvm.options
-Xms8G
-Xmx8G
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:G1HeapRegionSize=16m
-XX:+UnlockExperimentalVMOptions
-XX:+UseCGroupMemoryLimitForHeap

3. 硬件配置建议

CPU: 16-32核心,支持高并发操作
内存: 32-64GB,用于缓存和JVM堆
存储: SSD,多块磁盘并行I/O
网络: 万兆以太网,减少跨节点延迟

Apache Cassandra作为高性能的分布式NoSQL数据库,其无中心架构和线性扩展能力使其成为处理大规模数据的理想选择。通过合理的数据建模和集群配置,Cassandra可以为企业提供高可用、高性能的数据存储服务。

版权所有,如有侵权请联系我