1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.snapshot;
19
20 import java.util.List;
21 import java.util.concurrent.Callable;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26 import org.apache.hadoop.hbase.classification.InterfaceStability;
27 import org.apache.hadoop.hbase.errorhandling.ForeignException;
28 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
29 import org.apache.hadoop.hbase.procedure.ProcedureMember;
30 import org.apache.hadoop.hbase.procedure.Subprocedure;
31 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
32 import org.apache.hadoop.hbase.regionserver.HRegion;
33 import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager.SnapshotSubprocedurePool;
34 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
35
36
37
38
39
40
41
42
43 @InterfaceAudience.Private
44 @InterfaceStability.Unstable
45 public class FlushSnapshotSubprocedure extends Subprocedure {
46 private static final Log LOG = LogFactory.getLog(FlushSnapshotSubprocedure.class);
47
48 private final List<HRegion> regions;
49 private final SnapshotDescription snapshot;
50 private final SnapshotSubprocedurePool taskManager;
51 private boolean snapshotSkipFlush = false;
52
53 public FlushSnapshotSubprocedure(ProcedureMember member,
54 ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
55 List<HRegion> regions, SnapshotDescription snapshot,
56 SnapshotSubprocedurePool taskManager) {
57 super(member, snapshot.getName(), errorListener, wakeFrequency, timeout);
58 this.snapshot = snapshot;
59
60 if (this.snapshot.getType() == SnapshotDescription.Type.SKIPFLUSH) {
61 snapshotSkipFlush = true;
62 }
63 this.regions = regions;
64 this.taskManager = taskManager;
65 }
66
67
68
69
70 private class RegionSnapshotTask implements Callable<Void> {
71 HRegion region;
72 RegionSnapshotTask(HRegion region) {
73 this.region = region;
74 }
75
76 @Override
77 public Void call() throws Exception {
78
79
80
81
82
83 LOG.debug("Starting region operation on " + region);
84 region.startRegionOperation();
85 try {
86 if (snapshotSkipFlush) {
87
88
89
90
91
92
93
94 LOG.debug("take snapshot without flush memstore first");
95 } else {
96 LOG.debug("Flush Snapshotting region " + region.toString() + " started...");
97 region.flushcache();
98 }
99 region.addRegionToSnapshot(snapshot, monitor);
100 if (snapshotSkipFlush) {
101 LOG.debug("... SkipFlush Snapshotting region " + region.toString() + " completed.");
102 } else {
103 LOG.debug("... Flush Snapshotting region " + region.toString() + " completed.");
104 }
105 } finally {
106 LOG.debug("Closing region operation on " + region);
107 region.closeRegionOperation();
108 }
109 return null;
110 }
111 }
112
113 private void flushSnapshot() throws ForeignException {
114 if (regions.isEmpty()) {
115
116 return;
117 }
118
119 monitor.rethrowException();
120
121
122 if (taskManager.hasTasks()) {
123 throw new IllegalStateException("Attempting to take snapshot "
124 + ClientSnapshotDescriptionUtils.toString(snapshot)
125 + " but we currently have outstanding tasks");
126 }
127
128
129 for (HRegion region : regions) {
130
131 taskManager.submitTask(new RegionSnapshotTask(region));
132 monitor.rethrowException();
133 }
134
135
136 LOG.debug("Flush Snapshot Tasks submitted for " + regions.size() + " regions");
137 try {
138 taskManager.waitForOutstandingTasks();
139 } catch (InterruptedException e) {
140 throw new ForeignException(getMemberName(), e);
141 }
142 }
143
144
145
146
147 @Override
148 public void acquireBarrier() throws ForeignException {
149
150 }
151
152
153
154
155 @Override
156 public void insideBarrier() throws ForeignException {
157 flushSnapshot();
158 }
159
160
161
162
163 @Override
164 public void cleanup(Exception e) {
165 LOG.info("Aborting all online FLUSH snapshot subprocedure task threads for '"
166 + snapshot.getName() + "' due to error", e);
167 try {
168 taskManager.cancelTasks();
169 } catch (InterruptedException e1) {
170 Thread.currentThread().interrupt();
171 }
172 }
173
174
175
176
177 public void releaseBarrier() {
178
179 }
180
181 }