1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
| import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import com.fasterxml.jackson.databind.ObjectMapper;
public class ServiceDiscovery implements Watcher { private ZooKeeper zk; private String servicePath = "/services"; private ObjectMapper objectMapper = new ObjectMapper(); private Map<String, List<ServiceInstance>> serviceCache = new ConcurrentHashMap<>(); public static class ServiceInstance { private String host; private int port; private Map<String, String> metadata; public ServiceInstance() {} public ServiceInstance(String host, int port) { this.host = host; this.port = port; this.metadata = new HashMap<>(); } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } public Map<String, String> getMetadata() { return metadata; } public void setMetadata(Map<String, String> metadata) { this.metadata = metadata; } @Override public String toString() { return host + ":" + port; } } public ServiceDiscovery(String connectString) throws Exception { zk = new ZooKeeper(connectString, 5000, this); if (zk.exists(servicePath, false) == null) { zk.create(servicePath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } public void registerService(String serviceName, ServiceInstance instance) throws Exception { String serviceTypePath = servicePath + "/" + serviceName; if (zk.exists(serviceTypePath, false) == null) { zk.create(serviceTypePath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } String instancePath = serviceTypePath + "/" + instance.getHost() + ":" + instance.getPort() + "-"; byte[] data = objectMapper.writeValueAsBytes(instance); String actualPath = zk.create(instancePath, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("服务注册成功: " + serviceName + " -> " + actualPath); } public List<ServiceInstance> discoverServices(String serviceName) throws Exception { String serviceTypePath = servicePath + "/" + serviceName; if (zk.exists(serviceTypePath, false) == null) { return Collections.emptyList(); } List<String> children = zk.getChildren(serviceTypePath, true); List<ServiceInstance> instances = new ArrayList<>(); for (String child : children) { String childPath = serviceTypePath + "/" + child; byte[] data = zk.getData(childPath, false, null); if (data != null) { ServiceInstance instance = objectMapper.readValue(data, ServiceInstance.class); instances.add(instance); } } serviceCache.put(serviceName, instances); return instances; } public ServiceInstance selectInstance(String serviceName, LoadBalancer loadBalancer) throws Exception { List<ServiceInstance> instances = serviceCache.get(serviceName); if (instances == null || instances.isEmpty()) { instances = discoverServices(serviceName); } if (instances.isEmpty()) { throw new RuntimeException("没有可用的服务实例: " + serviceName); } return loadBalancer.select(instances); } @Override public void process(WatchedEvent event) { if (event.getType() == Event.EventType.NodeChildrenChanged) { String path = event.getPath(); String serviceName = path.substring(servicePath.length() + 1); try { discoverServices(serviceName); System.out.println("检测到服务变更: " + serviceName); } catch (Exception e) { e.printStackTrace(); } } } public interface LoadBalancer { ServiceInstance select(List<ServiceInstance> instances); } public static class RoundRobinLoadBalancer implements LoadBalancer { private AtomicInteger counter = new AtomicInteger(0); @Override public ServiceInstance select(List<ServiceInstance> instances) { int index = counter.getAndIncrement() % instances.size(); return instances.get(index); } } public static void main(String[] args) throws Exception { ServiceDiscovery discovery = new ServiceDiscovery("localhost:2181"); ServiceInstance instance1 = new ServiceInstance("192.168.1.10", 8080); instance1.getMetadata().put("version", "1.0"); discovery.registerService("user-service", instance1); ServiceInstance instance2 = new ServiceInstance("192.168.1.11", 8080); instance2.getMetadata().put("version", "1.0"); discovery.registerService("user-service", instance2); Thread.sleep(1000); List<ServiceInstance> instances = discovery.discoverServices("user-service"); System.out.println("发现服务实例: " + instances); LoadBalancer loadBalancer = new RoundRobinLoadBalancer(); for (int i = 0; i < 5; i++) { ServiceInstance selected = discovery.selectInstance("user-service", loadBalancer); System.out.println("选择的服务实例: " + selected); } } }
|