1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.util.Bytes;
public class HBaseExample { private static Configuration config = HBaseConfiguration.create(); private static Connection connection; static { try { config.set("hbase.zookeeper.quorum", "localhost:2181"); config.set("hbase.zookeeper.property.clientPort", "2181"); connection = ConnectionFactory.createConnection(config); } catch (Exception e) { e.printStackTrace(); } } public static void createTable(String tableName, String[] columnFamilies) throws Exception { Admin admin = connection.getAdmin(); TableName table = TableName.valueOf(tableName); if (admin.tableExists(table)) { System.out.println("表 " + tableName + " 已存在"); admin.close(); return; } TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(table); for (String cf : columnFamilies) { ColumnFamilyDescriptor cfDesc = ColumnFamilyDescriptorBuilder .newBuilder(Bytes.toBytes(cf)) .setMaxVersions(3) .setTimeToLive(86400) .setCompressionType(Compression.Algorithm.SNAPPY) .build(); tableBuilder.setColumnFamily(cfDesc); } admin.createTable(tableBuilder.build()); System.out.println("表 " + tableName + " 创建成功"); admin.close(); } public static void putData(String tableName, String rowKey, String columnFamily, String column, String value) throws Exception { Table table = connection.getTable(TableName.valueOf(tableName)); Put put = new Put(Bytes.toBytes(rowKey)); put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value)); table.put(put); System.out.println("数据插入成功: " + rowKey + " -> " + columnFamily + ":" + column + " = " + value); table.close(); } public static void batchPutData(String tableName, List<UserData> userData) throws Exception { Table table = connection.getTable(TableName.valueOf(tableName)); List<Put> puts = new ArrayList<>(); for (UserData user : userData) { Put put = new Put(Bytes.toBytes(user.getUserId())); put.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("name"), Bytes.toBytes(user.getName())); put.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("email"), 
Bytes.toBytes(user.getEmail())); put.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("age"), Bytes.toBytes(user.getAge())); if (user.getPreferences() != null) { for (Map.Entry<String, String> pref : user.getPreferences().entrySet()) { put.addColumn(Bytes.toBytes("preferences"), Bytes.toBytes(pref.getKey()), Bytes.toBytes(pref.getValue())); } } puts.add(put); } table.put(puts); System.out.println("批量插入 " + puts.size() + " 条用户数据"); table.close(); } public static void getData(String tableName, String rowKey) throws Exception { Table table = connection.getTable(TableName.valueOf(tableName)); Get get = new Get(Bytes.toBytes(rowKey)); Result result = table.get(get); if (!result.isEmpty()) { System.out.println("行键: " + rowKey); for (Cell cell : result.listCells()) { String cf = Bytes.toString(CellUtil.cloneFamily(cell)); String column = Bytes.toString(CellUtil.cloneQualifier(cell)); String value = Bytes.toString(CellUtil.cloneValue(cell)); long timestamp = cell.getTimestamp(); System.out.printf(" %s:%s = %s (时间戳: %d)%n", cf, column, value, timestamp); } } else { System.out.println("没有找到行键为 " + rowKey + " 的数据"); } table.close(); } public static void scanData(String tableName, String startRow, String endRow) throws Exception { Table table = connection.getTable(TableName.valueOf(tableName)); Scan scan = new Scan(); if (startRow != null) scan.withStartRow(Bytes.toBytes(startRow)); if (endRow != null) scan.withStopRow(Bytes.toBytes(endRow)); scan.setCaching(1000); scan.setBatch(100); ResultScanner scanner = table.getScanner(scan); int count = 0; for (Result result : scanner) { String rowKey = Bytes.toString(result.getRow()); System.out.println("行键: " + rowKey); for (Cell cell : result.listCells()) { String cf = Bytes.toString(CellUtil.cloneFamily(cell)); String column = Bytes.toString(CellUtil.cloneQualifier(cell)); String value = Bytes.toString(CellUtil.cloneValue(cell)); System.out.printf(" %s:%s = %s%n", cf, column, value); } count++; if (count >= 10) break; } 
scanner.close(); table.close(); System.out.println("扫描完成,共 " + count + " 行数据"); } public static void deleteData(String tableName, String rowKey) throws Exception { Table table = connection.getTable(TableName.valueOf(tableName)); Delete delete = new Delete(Bytes.toBytes(rowKey)); table.delete(delete); System.out.println("删除行键 " + rowKey + " 的数据"); table.close(); } }
|