1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.procedure;
19
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.concurrent.ThreadPoolExecutor;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.hbase.ServerName;
28 import org.apache.hadoop.hbase.errorhandling.ForeignException;
29 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
30 import org.apache.hadoop.hbase.executor.ExecutorService;
31 import org.apache.hadoop.hbase.master.MasterServices;
32 import org.apache.hadoop.hbase.master.MetricsMaster;
33 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
34 import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
35 import org.apache.zookeeper.KeeperException;
36
37 public class SimpleMasterProcedureManager extends MasterProcedureManager {
38
39 public static final String SIMPLE_SIGNATURE = "simle_test";
40
41 private static final Log LOG = LogFactory.getLog(SimpleMasterProcedureManager.class);
42
43 private MasterServices master;
44 private ProcedureCoordinator coordinator;
45 private ExecutorService executorService;
46
47 private boolean done;
48
49 @Override
50 public void stop(String why) {
51 LOG.info("stop: " + why);
52 }
53
54 @Override
55 public boolean isStopped() {
56 return false;
57 }
58
59 @Override
60 public void initialize(MasterServices master, MetricsMaster metricsMaster)
61 throws KeeperException, IOException, UnsupportedOperationException {
62 this.master = master;
63 this.done = false;
64
65
66 String name = master.getServerName().toString();
67 ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, 1);
68 ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
69 master.getZooKeeper(), getProcedureSignature(), name);
70
71 this.coordinator = new ProcedureCoordinator(comms, tpool);
72 this.executorService = master.getExecutorService();
73 }
74
75 @Override
76 public String getProcedureSignature() {
77 return SIMPLE_SIGNATURE;
78 }
79
80 @Override
81 public void execProcedure(ProcedureDescription desc) throws IOException {
82 this.done = false;
83
84 ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance());
85
86 List<ServerName> serverNames = master.getServerManager().getOnlineServersList();
87 List<String> servers = new ArrayList<String>();
88 for (ServerName sn : serverNames) {
89 servers.add(sn.toString());
90 }
91 Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(), new byte[0], servers);
92 if (proc == null) {
93 String msg = "Failed to submit distributed procedure for '"
94 + getProcedureSignature() + "'";
95 LOG.error(msg);
96 throw new HBaseSnapshotException(msg);
97 }
98
99 try {
100
101
102 proc.waitForCompleted();
103 LOG.info("Done waiting - exec procedure for " + desc.getInstance());
104 this.done = true;
105 } catch (InterruptedException e) {
106 ForeignException ee =
107 new ForeignException("Interrupted while waiting for procdure to finish", e);
108 monitor.receive(ee);
109 Thread.currentThread().interrupt();
110 } catch (ForeignException e) {
111 monitor.receive(e);
112 }
113 }
114
115 @Override
116 public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
117 return done;
118 }
119
120 }