HBase客户端的写缓存BufferedMutator

客户端的写缓存

HBase的每一个put操作实际上是一个RPC操作,将客户端的数据传输到服务器再返回结果,这只适用于小数据量的操作,如果数据量多的话,每次put都需要建立一次RPC的连接(TCP连接),而建立连接传输数据是需要时间的,因此减少RPC的调用可以提高数据传输的效率,减少建立连接的时间和IO消耗。

HBase的客户端API提供了写缓存区,put的数据一开始放在缓存区内,当数量到达指定的容量或者用户强制提交是才将数据一次性提交到HBase的服务器。这个缓冲区可以通过调用 HTable.setAutoFlush(false) 来开启。而新版HBbase的API中使用了BufferedMutator替换了老版的缓冲区,通过BufferedMutator对象提交的数据自动存放在缓冲区中。

BufferedMutator

BufferedMutator通过mutate方法提交数据,flush方法可以强制刷新缓冲区提交数据,在执行close方法之前也会刷新缓冲区。

BufferedMutator是通过设定BufferedMutator.ExceptionListener监听器来异步处理异常,重写onException来实现异常处理,该监听器用来监听接受服务器端发送回来的错误消息。

用户可以通过设定BufferedMutatorParams的来定制符合要求的BufferedMutator。比如缓冲区的大小通过BufferedMutatorParams中的writeBufferSize方法设置(缓冲区的大小也可以通过配置文件的 hbase.client.write.buffer设置,值为long类型,单位为byte),异常监听器也是在BufferedMutatorParams中设置。

官方例子分析

官方给出了BufferedMutator的使用例子,通过分析代码可以了解到BufferedMutator的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class BufferedMutatorExample extends Configured implements Tool {

private static final Logger LOG = LoggerFactory.getLogger(BufferedMutatorExample.class);

//线程池的大小
private static final int POOL_SIZE = 10;
//线程的数量
private static final int TASK_COUNT = 100;
private static final TableName TABLE = TableName.valueOf("tanle_name");
private static final byte[] FAMILY = Bytes.toBytes("f");

@Override
public int run(String[] args) throws InterruptedException, ExecutionException, TimeoutException {

//异常监听器的建立,当写入异常是触发该监听器
final BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
@Override
public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) {
for (int i = 0; i < e.getNumExceptions(); i++) {
LOG.info("Failed to sent put " + e.getRow(i) + ".");
}
}
};
//创建BufferedMutatorParams对象,设置监听器
BufferedMutatorParams params = new BufferedMutatorParams(TABLE)
.listener(listener);
//可以改动缓冲区的大小,如下面设置缓冲区大小为4M
params.writeBufferSize(4*1023*1024);
/** 创建连接和根据BufferedMutatorParams创建BufferedMutator
* 这里用到java 7的新特性 try-with-resources
*/
try (final Connection conn = ConnectionFactory.createConnection(getConf());
final BufferedMutator mutator = conn.getBufferedMutator(params)) {

/** 线程池的建立,运行线程不断put数据 */
final AtomicInteger count = new AtomicInteger(1);
final ExecutorService workerPool = Executors.newFixedThreadPool(POOL_SIZE);
List<Future<Void>> futures = new ArrayList<>(TASK_COUNT);
for (int i = 0; i < TASK_COUNT; i++) {
futures.add(workerPool.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
Integer value = count.getAndIncrement();
Put p = new Put(Bytes.toBytes("task " + value));
p.addColumn(FAMILY, Bytes.toBytes("someQualifier"), Bytes.toBytes("task " + value + " info"));
mutator.mutate(p);
return null;
}
}));
}

// 结束线程和线程池
for (Future<Void> f : futures) {
f.get(5, TimeUnit.MINUTES);
}
workerPool.shutdown();
} catch (IOException e) {
LOG.info("exception while creating/destroying Connection or BufferedMutator", e);
}
return 0;
}

public static void main(String[] args) throws Exception {
ToolRunner.run(new BufferedMutatorExample(), args);
}
}

上面的官方例子就是启动线程不断提交数据,BufferedMutator中缓冲区可以避免频繁的调用RPC,在批处理数据时及其重要,并且BufferedMutator的mutate操作是异步的,所以不会产生阻塞,这在Map-Reduce作业有很好的使用,BufferedMutator接收来自MR作业的puts,异步的批量提交数据,不影响MR作业的运行。

错误处理

当通过BufferedMutator批量提交发生错误时触发绑定的BufferedMutator.ExceptionListener监听器实例的onException方法,其中RetriesExhaustedWithDetailsException记录了发生错误的内容及其提交的错误内容等信息,而其余正确的提交的内容则会正确放入HBase表中。

对于提交内容的检查分为客户端的检查和服务器端的检查。

  • 当客户端检查到提交的内容出错(比如Put未添加内容或者未指定列),会抛出客户端的错误,这样错误不会RetriesExhaustedWithDetailsException监听器接受,被其运行的线程会因错误而终止,则在该Put之后的内容都不会提交。
  • 当服务端检查到提交的内容出错(比如指定的列簇不存在),会向客户端传输错误,而这错误会被RetriesExhaustedWithDetailsException监听器接受,不会对后续提交的数据产生影响。

Table的put,get,delete方法提交一个(put,get,delete)列表操作,其中有错误的内容时,也分客户端的检查和服务器端的检查(不同操作检查内容不同),在客户端检查出错会抛出异常终止程序,服务端异常时会传输会错误信息,但是其余正确的操作已将提交到服务端并被正确执行。

try-with-resources的特性不了解可以参考 https://www.jianshu.com/p/3ab87269140c

0%