View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.HashMap;
25  import java.util.List;
26  import java.util.Map;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.hbase.Abortable;
33  import org.apache.hadoop.hbase.exceptions.DeserializationException;
34  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
35  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
36  import org.apache.hadoop.hbase.util.Bytes;
37  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
38  import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
39  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
40  import org.apache.zookeeper.KeeperException;
41  import org.apache.zookeeper.KeeperException.NodeExistsException;
42  
43  import com.google.protobuf.InvalidProtocolBufferException;
44  
45  @InterfaceAudience.Private
46  public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closeable {
47    private static final Log LOG = LogFactory.getLog(ReplicationPeerZKImpl.class);
48  
49    private final ReplicationPeerConfig peerConfig;
50    private final String id;
51    private volatile PeerState peerState;
52    private volatile Map<String, List<String>> tableCFs = new HashMap<String, List<String>>();
53    private final Configuration conf;
54  
55    private PeerStateTracker peerStateTracker;
56    private TableCFsTracker tableCFsTracker;
57  
58    /**
59     * Constructor that takes all the objects required to communicate with the
60     * specified peer, except for the region server addresses.
61     * @param conf configuration object to this peer
62     * @param id string representation of this peer's identifier
63     * @param peerConfig configuration for the replication peer
64     */
65    public ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig)
66        throws ReplicationException {
67      this.conf = conf;
68      this.peerConfig = peerConfig;
69      this.id = id;
70    }
71  
72    /**
73     * start a state tracker to check whether this peer is enabled or not
74     *
75     * @param zookeeper zk watcher for the local cluster
76     * @param peerStateNode path to zk node which stores peer state
77     * @throws KeeperException
78     */
79    public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)
80        throws KeeperException {
81      ensurePeerEnabled(zookeeper, peerStateNode);
82      this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
83      this.peerStateTracker.start();
84      try {
85        this.readPeerStateZnode();
86      } catch (DeserializationException e) {
87        throw ZKUtil.convert(e);
88      }
89    }
90  
91    private void readPeerStateZnode() throws DeserializationException {
92      this.peerState =
93          isStateEnabled(this.peerStateTracker.getData(false))
94            ? PeerState.ENABLED
95            : PeerState.DISABLED;
96    }
97  
98    /**
99     * start a table-cfs tracker to listen the (table, cf-list) map change
100    *
101    * @param zookeeper zk watcher for the local cluster
102    * @param tableCFsNode path to zk node which stores table-cfs
103    * @throws KeeperException
104    */
105   public void startTableCFsTracker(ZooKeeperWatcher zookeeper, String tableCFsNode)
106     throws KeeperException {
107     this.tableCFsTracker = new TableCFsTracker(tableCFsNode, zookeeper,
108         this);
109     this.tableCFsTracker.start();
110     this.readTableCFsZnode();
111   }
112 
113   static Map<String, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
114     if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
115       return null;
116     }
117 
118     Map<String, List<String>> tableCFsMap = null;
119     // TODO: This should be a PB object rather than a String to be parsed!! See HBASE-11393
120     // parse out (table, cf-list) pairs from tableCFsConfig
121     // format: "table1:cf1,cf2;table2:cfA,cfB"
122     String[] tables = tableCFsConfig.split(";");
123     for (String tab : tables) {
124       // 1 ignore empty table config
125       tab = tab.trim();
126       if (tab.length() == 0) {
127         continue;
128       }
129       // 2 split to "table" and "cf1,cf2"
130       //   for each table: "table:cf1,cf2" or "table"
131       String[] pair = tab.split(":");
132       String tabName = pair[0].trim();
133       if (pair.length > 2 || tabName.length() == 0) {
134         LOG.error("ignore invalid tableCFs setting: " + tab);
135         continue;
136       }
137 
138       // 3 parse "cf1,cf2" part to List<cf>
139       List<String> cfs = null;
140       if (pair.length == 2) {
141         String[] cfsList = pair[1].split(",");
142         for (String cf : cfsList) {
143           String cfName = cf.trim();
144           if (cfName.length() > 0) {
145             if (cfs == null) {
146               cfs = new ArrayList<String>();
147             }
148             cfs.add(cfName);
149           }
150         }
151       }
152 
153       // 4 put <table, List<cf>> to map
154       if (tableCFsMap == null) {
155         tableCFsMap = new HashMap<String, List<String>>();
156       }
157       tableCFsMap.put(tabName, cfs);
158     }
159 
160     return tableCFsMap;
161   }
162 
163   private void readTableCFsZnode() {
164     String currentTableCFs = Bytes.toString(tableCFsTracker.getData(false));
165     this.tableCFs = parseTableCFsFromConfig(currentTableCFs);
166   }
167 
168   @Override
169   public PeerState getPeerState() {
170     return peerState;
171   }
172 
173   /**
174    * Get the identifier of this peer
175    * @return string representation of the id (short)
176    */
177   @Override
178   public String getId() {
179     return id;
180   }
181 
182   /**
183    * Get the peer config object
184    * @return the ReplicationPeerConfig for this peer
185    */
186   @Override
187   public ReplicationPeerConfig getPeerConfig() {
188     return peerConfig;
189   }
190 
191   /**
192    * Get the configuration object required to communicate with this peer
193    * @return configuration object
194    */
195   @Override
196   public Configuration getConfiguration() {
197     return conf;
198   }
199 
200   /**
201    * Get replicable (table, cf-list) map of this peer
202    * @return the replicable (table, cf-list) map
203    */
204   @Override
205   public Map<String, List<String>> getTableCFs() {
206     return this.tableCFs;
207   }
208 
209   @Override
210   public void abort(String why, Throwable e) {
211     LOG.fatal("The ReplicationPeer coresponding to peer " + peerConfig
212         + " was aborted for the following reason(s):" + why, e);
213   }
214 
215   @Override
216   public boolean isAborted() {
217     // Currently the replication peer is never "Aborted", we just log when the
218     // abort method is called.
219     return false;
220   }
221 
222   @Override
223   public void close() throws IOException {
224     // TODO: stop zkw?
225   }
226 
227   /**
228    * Parse the raw data from ZK to get a peer's state
229    * @param bytes raw ZK data
230    * @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state.
231    * @throws DeserializationException
232    */
233   public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
234     ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
235     return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
236   }
237 
238   /**
239    * @param bytes Content of a state znode.
240    * @return State parsed from the passed bytes.
241    * @throws DeserializationException
242    */
243   private static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
244       throws DeserializationException {
245     ProtobufUtil.expectPBMagicPrefix(bytes);
246     int pblen = ProtobufUtil.lengthOfPBMagic();
247     ZooKeeperProtos.ReplicationState.Builder builder =
248         ZooKeeperProtos.ReplicationState.newBuilder();
249     ZooKeeperProtos.ReplicationState state;
250     try {
251       state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
252       return state.getState();
253     } catch (InvalidProtocolBufferException e) {
254       throw new DeserializationException(e);
255     }
256   }
257 
258   /**
259    * Utility method to ensure an ENABLED znode is in place; if not present, we create it.
260    * @param zookeeper
261    * @param path Path to znode to check
262    * @return True if we created the znode.
263    * @throws NodeExistsException
264    * @throws KeeperException
265    */
266   private static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
267       throws NodeExistsException, KeeperException {
268     if (ZKUtil.checkExists(zookeeper, path) == -1) {
269       // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
270       // peer-state znode. This happens while adding a peer.
271       // The peer state data is set as "ENABLED" by default.
272       ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path,
273         ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
274       return true;
275     }
276     return false;
277   }
278 
279   /**
280    * Tracker for state of this peer
281    */
282   public class PeerStateTracker extends ZooKeeperNodeTracker {
283 
284     public PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher,
285         Abortable abortable) {
286       super(watcher, peerStateZNode, abortable);
287     }
288 
289     @Override
290     public synchronized void nodeDataChanged(String path) {
291       if (path.equals(node)) {
292         super.nodeDataChanged(path);
293         try {
294           readPeerStateZnode();
295         } catch (DeserializationException e) {
296           LOG.warn("Failed deserializing the content of " + path, e);
297         }
298       }
299     }
300   }
301 
302   /**
303    * Tracker for (table, cf-list) map of this peer
304    */
305   public class TableCFsTracker extends ZooKeeperNodeTracker {
306 
307     public TableCFsTracker(String tableCFsZNode, ZooKeeperWatcher watcher,
308         Abortable abortable) {
309       super(watcher, tableCFsZNode, abortable);
310     }
311 
312     @Override
313     public synchronized void nodeCreated(String path) {
314       if (path.equals(node)) {
315         super.nodeCreated(path);
316         readTableCFsZnode();
317       }
318     }
319 
320     @Override
321     public synchronized void nodeDataChanged(String path) {
322       if (path.equals(node)) {
323         super.nodeDataChanged(path);
324       }
325     }
326   }
327 }