1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.List;
25 import java.util.SortedSet;
26 import java.util.concurrent.atomic.AtomicLong;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.fs.Path;
33 import org.apache.hadoop.hbase.Cell;
34 import org.apache.hadoop.hbase.HConstants;
35 import org.apache.hadoop.hbase.KeyValue;
36 import org.apache.hadoop.hbase.KeyValueUtil;
37 import org.apache.hadoop.hbase.client.Scan;
38 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
39 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
40 import org.apache.hadoop.hbase.util.CollectionBackedScanner;
41
42
43
44
45
46 @InterfaceAudience.Private
47 abstract class StoreFlusher {
48 protected Configuration conf;
49 protected Store store;
50
51 public StoreFlusher(Configuration conf, Store store) {
52 this.conf = conf;
53 this.store = store;
54 }
55
56
57
58
59
60
61
62
63
64
65
66 public abstract List<Path> flushSnapshot(SortedSet<KeyValue> snapshot, long cacheFlushSeqNum,
67 TimeRangeTracker snapshotTimeRangeTracker, AtomicLong flushedSize, MonitoredTask status)
68 throws IOException;
69
70 protected void finalizeWriter(StoreFile.Writer writer, long cacheFlushSeqNum,
71 MonitoredTask status) throws IOException {
72
73
74
75 status.setStatus("Flushing " + store + ": appending metadata");
76 writer.appendMetadata(cacheFlushSeqNum, false);
77 status.setStatus("Flushing " + store + ": closing flushed file");
78 writer.close();
79 }
80
81
82
83
84
85
86 protected InternalScanner createScanner(SortedSet<KeyValue> snapshot,
87 long smallestReadPoint) throws IOException {
88 KeyValueScanner memstoreScanner =
89 new CollectionBackedScanner(snapshot, store.getComparator());
90 InternalScanner scanner = null;
91 if (store.getCoprocessorHost() != null) {
92 scanner = store.getCoprocessorHost().preFlushScannerOpen(store, memstoreScanner);
93 }
94 if (scanner == null) {
95 Scan scan = new Scan();
96 scan.setMaxVersions(store.getScanInfo().getMaxVersions());
97 scanner = new StoreScanner(store, store.getScanInfo(), scan,
98 Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
99 smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
100 }
101 assert scanner != null;
102 if (store.getCoprocessorHost() != null) {
103 try {
104 return store.getCoprocessorHost().preFlush(store, scanner);
105 } catch (IOException ioe) {
106 scanner.close();
107 throw ioe;
108 }
109 }
110 return scanner;
111 }
112
113
114
115
116
117
118
119
120 protected long performFlush(InternalScanner scanner,
121 Compactor.CellSink sink, long smallestReadPoint) throws IOException {
122 int compactionKVMax =
123 conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
124 List<Cell> kvs = new ArrayList<Cell>();
125 boolean hasMore;
126 long flushed = 0;
127 do {
128 hasMore = scanner.next(kvs, compactionKVMax);
129 if (!kvs.isEmpty()) {
130 for (Cell c : kvs) {
131
132
133
134 KeyValue kv = KeyValueUtil.ensureKeyValue(c);
135 if (kv.getMvccVersion() <= smallestReadPoint) {
136
137
138 kv = kv.shallowCopy();
139 kv.setMvccVersion(0);
140 }
141 sink.append(kv);
142 flushed += MemStore.heapSizeChange(kv, true);
143 }
144 kvs.clear();
145 }
146 } while (hasMore);
147 return flushed;
148 }
149 }