001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.action.hadoop;
019
020import java.io.IOException;
021import java.io.StringReader;
022import java.net.URISyntaxException;
023import java.util.ArrayList;
024import java.util.HashMap;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Map;
028
029import org.apache.hadoop.fs.FSDataOutputStream;
030import org.apache.hadoop.fs.FileStatus;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.fs.permission.FsPermission;
034import org.apache.hadoop.mapred.JobConf;
035import org.apache.oozie.action.ActionExecutor;
036import org.apache.oozie.action.ActionExecutorException;
037import org.apache.oozie.client.WorkflowAction;
038import org.apache.oozie.service.HadoopAccessorException;
039import org.apache.oozie.service.HadoopAccessorService;
040import org.apache.oozie.service.Services;
041import org.apache.oozie.util.XConfiguration;
042import org.apache.oozie.util.XmlUtils;
043import org.jdom.Element;
044
/**
 * File system action executor.
 * <p>
 * This executes the file system mkdir, delete, move, chmod, touchz and chgrp commands.
 */
048public class FsActionExecutor extends ActionExecutor {
049
    /**
     * Creates the executor, passing the string "fs" to the {@link ActionExecutor} constructor
     * (presumably the action type name this executor is registered under).
     */
    public FsActionExecutor() {
        super("fs");
    }
053
054    Path getPath(Element element, String attribute) {
055        String str = element.getAttributeValue(attribute).trim();
056        return new Path(str);
057    }
058
059    void validatePath(Path path, boolean withScheme) throws ActionExecutorException {
060        try {
061            String scheme = path.toUri().getScheme();
062            if (withScheme) {
063                if (scheme == null) {
064                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS001",
065                                                      "Missing scheme in path [{0}]", path);
066                }
067                else {
068                    Services.get().get(HadoopAccessorService.class).checkSupportedFilesystem(path.toUri());
069                }
070            }
071            else {
072                if (scheme != null) {
073                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS002",
074                                                      "Scheme [{0}] not allowed in path [{1}]", scheme, path);
075                }
076            }
077        }
078        catch (HadoopAccessorException hex) {
079            throw convertException(hex);
080        }
081    }
082
083    Path resolveToFullPath(Path nameNode, Path path, boolean withScheme) throws ActionExecutorException {
084        Path fullPath;
085
086        // If no nameNode is given, validate the path as-is and return it as-is
087        if (nameNode == null) {
088            validatePath(path, withScheme);
089            fullPath = path;
090        } else {
091            // If the path doesn't have a scheme or authority, use the nameNode which should have already been verified earlier
092            String pathScheme = path.toUri().getScheme();
093            String pathAuthority = path.toUri().getAuthority();
094            if (pathScheme == null || pathAuthority == null) {
095                if (path.isAbsolute()) {
096                    String nameNodeSchemeAuthority = nameNode.toUri().getScheme() + "://" + nameNode.toUri().getAuthority();
097                    fullPath = new Path(nameNodeSchemeAuthority + path.toString());
098                } else {
099                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS011",
100                            "Path [{0}] cannot be relative", path);
101                }
102            } else {
103                // If the path has a scheme and authority, but its not the nameNode then validate the path as-is and return it as-is
104                // If it is the nameNode, then it should have already been verified earlier so return it as-is
105                if (!nameNode.toUri().getScheme().equals(pathScheme) || !nameNode.toUri().getAuthority().equals(pathAuthority)) {
106                    validatePath(path, withScheme);
107                }
108                fullPath = path;
109            }
110        }
111        return fullPath;
112    }
113
114    void validateSameNN(Path source, Path dest) throws ActionExecutorException {
115        Path destPath = new Path(source, dest);
116        String t = destPath.toUri().getScheme() + destPath.toUri().getAuthority();
117        String s = source.toUri().getScheme() + source.toUri().getAuthority();
118
119        //checking whether NN prefix of source and target is same. can modify this to adjust for a set of multiple whitelisted NN
120        if(!t.equals(s)) {
121            throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS007",
122                    "move, target NN URI different from that of source", dest);
123        }
124    }
125
126    @SuppressWarnings("unchecked")
127    void doOperations(Context context, Element element) throws ActionExecutorException {
128        try {
129            FileSystem fs = context.getAppFileSystem();
130            boolean recovery = fs.exists(getRecoveryPath(context));
131            if (!recovery) {
132                fs.mkdirs(getRecoveryPath(context));
133            }
134
135            Path nameNodePath = null;
136            Element nameNodeElement = element.getChild("name-node", element.getNamespace());
137            if (nameNodeElement != null) {
138                String nameNode = nameNodeElement.getTextTrim();
139                if (nameNode != null) {
140                    nameNodePath = new Path(nameNode);
141                    // Verify the name node now
142                    validatePath(nameNodePath, true);
143                }
144            }
145
146            XConfiguration fsConf = new XConfiguration();
147            Path appPath = new Path(context.getWorkflow().getAppPath());
148            // app path could be a file
149            if (fs.isFile(appPath)) {
150                appPath = appPath.getParent();
151            }
152            JavaActionExecutor.parseJobXmlAndConfiguration(context, element, appPath, fsConf);
153
154            for (Element commandElement : (List<Element>) element.getChildren()) {
155                String command = commandElement.getName();
156                if (command.equals("mkdir")) {
157                    Path path = getPath(commandElement, "path");
158                    mkdir(context, fsConf, nameNodePath, path);
159                }
160                else {
161                    if (command.equals("delete")) {
162                        Path path = getPath(commandElement, "path");
163                        delete(context, fsConf, nameNodePath, path);
164                    }
165                    else {
166                        if (command.equals("move")) {
167                            Path source = getPath(commandElement, "source");
168                            Path target = getPath(commandElement, "target");
169                            move(context, fsConf, nameNodePath, source, target, recovery);
170                        }
171                        else {
172                            if (command.equals("chmod")) {
173                                Path path = getPath(commandElement, "path");
174                                boolean recursive = commandElement.getChild("recursive", commandElement.getNamespace()) != null;
175                                String str = commandElement.getAttributeValue("dir-files");
176                                boolean dirFiles = (str == null) || Boolean.parseBoolean(str);
177                                String permissionsMask = commandElement.getAttributeValue("permissions").trim();
178                                chmod(context, fsConf, nameNodePath, path, permissionsMask, dirFiles, recursive);
179                            }
180                            else {
181                                if (command.equals("touchz")) {
182                                    Path path = getPath(commandElement, "path");
183                                    touchz(context, fsConf, nameNodePath, path);
184                                }
185                                else {
186                                    if (command.equals("chgrp")) {
187                                        Path path = getPath(commandElement, "path");
188                                        boolean recursive = commandElement.getChild("recursive",
189                                                commandElement.getNamespace()) != null;
190                                        String group = commandElement.getAttributeValue("group");
191                                        String str = commandElement.getAttributeValue("dir-files");
192                                        boolean dirFiles = (str == null) || Boolean.parseBoolean(str);
193                                        chgrp(context, fsConf, nameNodePath, path, context.getWorkflow().getUser(),
194                                                group, dirFiles, recursive);
195                                    }
196                                }
197                            }
198                        }
199                    }
200                }
201            }
202        }
203        catch (Exception ex) {
204            throw convertException(ex);
205        }
206    }
207
208    void chgrp(Context context, XConfiguration fsConf, Path nameNodePath, Path path, String user, String group,
209            boolean dirFiles, boolean recursive) throws ActionExecutorException {
210
211        HashMap<String, String> argsMap = new HashMap<String, String>();
212        argsMap.put("user", user);
213        argsMap.put("group", group);
214        try {
215            FileSystem fs = getFileSystemFor(path, context, fsConf);
216            recursiveFsOperation("chgrp", fs, nameNodePath, path, argsMap, dirFiles, recursive, true);
217        }
218        catch (Exception ex) {
219            throw convertException(ex);
220        }
221    }
222
223    private void recursiveFsOperation(String op, FileSystem fs, Path nameNodePath, Path path,
224            Map<String, String> argsMap, boolean dirFiles, boolean recursive, boolean isRoot)
225            throws ActionExecutorException {
226
227        try {
228            path = resolveToFullPath(nameNodePath, path, true);
229            if (!fs.exists(path)) {
230                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS009", op
231                        + ", path [{0}] does not exist", path);
232            }
233            FileStatus pathStatus = fs.getFileStatus(path);
234            List<Path> paths = new ArrayList<Path>();
235
236            if (dirFiles && pathStatus.isDir()) {
237                if (isRoot) {
238                    paths.add(path);
239                }
240                FileStatus[] filesStatus = fs.listStatus(path);
241                for (int i = 0; i < filesStatus.length; i++) {
242                    Path p = filesStatus[i].getPath();
243                    paths.add(p);
244                    if (recursive && filesStatus[i].isDir()) {
245                        recursiveFsOperation(op, fs, null, p, argsMap, dirFiles, recursive, false);
246                    }
247                }
248            }
249            else {
250                paths.add(path);
251            }
252            for (Path p : paths) {
253                doFsOperation(op, fs, p, argsMap);
254            }
255        }
256        catch (Exception ex) {
257            throw convertException(ex);
258        }
259    }
260
261    private void doFsOperation(String op, FileSystem fs, Path p, Map<String, String> argsMap)
262            throws ActionExecutorException, IOException {
263        if (op.equals("chmod")) {
264            String permissions = argsMap.get("permissions");
265            FsPermission newFsPermission = createShortPermission(permissions, p);
266            fs.setPermission(p, newFsPermission);
267        }
268        else if (op.equals("chgrp")) {
269            String user = argsMap.get("user");
270            String group = argsMap.get("group");
271            fs.setOwner(p, user, group);
272        }
273    }
274
275    /**
276     * @param path
277     * @param context
278     * @param fsConf
279     * @return FileSystem
280     * @throws HadoopAccessorException
281     */
282    private FileSystem getFileSystemFor(Path path, Context context, XConfiguration fsConf) throws HadoopAccessorException {
283        String user = context.getWorkflow().getUser();
284        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
285        JobConf conf = has.createJobConf(path.toUri().getAuthority());
286        XConfiguration.copy(context.getProtoActionConf(), conf);
287        if (fsConf != null) {
288            XConfiguration.copy(fsConf, conf);
289        }
290        return has.createFileSystem(user, path.toUri(), conf);
291    }
292
293    /**
294     * @param path
295     * @param user
296     * @param group
297     * @return FileSystem
298     * @throws HadoopAccessorException
299     */
300    private FileSystem getFileSystemFor(Path path, String user) throws HadoopAccessorException {
301        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
302        JobConf jobConf = has.createJobConf(path.toUri().getAuthority());
303        return has.createFileSystem(user, path.toUri(), jobConf);
304    }
305
    /**
     * Creates a directory, delegating to the four-argument overload with no extra configuration
     * and no name node.
     *
     * @param context executor context
     * @param path directory to create
     * @throws ActionExecutorException if the directory cannot be created
     */
    void mkdir(Context context, Path path) throws ActionExecutorException {
        mkdir(context, null, null, path);
    }
309
310    void mkdir(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException {
311        try {
312            path = resolveToFullPath(nameNodePath, path, true);
313            FileSystem fs = getFileSystemFor(path, context, fsConf);
314
315            if (!fs.exists(path)) {
316                if (!fs.mkdirs(path)) {
317                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS004",
318                                                      "mkdir, path [{0}] could not create directory", path);
319                }
320            }
321        }
322        catch (Exception ex) {
323            throw convertException(ex);
324        }
325    }
326
    /**
     * Deletes a path, delegating to the four-argument overload with no extra configuration
     * and no name node.
     *
     * @param context executor context
     * @param path path to delete
     * @throws ActionExecutorException if the path cannot be deleted
     */
    public void delete(Context context, Path path) throws ActionExecutorException {
        delete(context, null, null, path);
    }
337
338    /**
339     * Delete path
340     *
341     * @param context
342     * @param fsConf
343     * @param nameNodePath
344     * @param path
345     * @throws ActionExecutorException
346     */
347    public void delete(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException {
348        try {
349            path = resolveToFullPath(nameNodePath, path, true);
350            FileSystem fs = getFileSystemFor(path, context, fsConf);
351
352            if (fs.exists(path)) {
353                if (!fs.delete(path, true)) {
354                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
355                                                      "delete, path [{0}] could not delete path", path);
356                }
357            }
358        }
359        catch (Exception ex) {
360            throw convertException(ex);
361        }
362    }
363
364    /**
365     * Delete path
366     *
367     * @param user
368     * @param group
369     * @param path
370     * @throws ActionExecutorException
371     */
372    public void delete(String user, String group, Path path) throws ActionExecutorException {
373        try {
374            validatePath(path, true);
375            FileSystem fs = getFileSystemFor(path, user);
376
377            if (fs.exists(path)) {
378                if (!fs.delete(path, true)) {
379                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
380                            "delete, path [{0}] could not delete path", path);
381                }
382            }
383        }
384        catch (Exception ex) {
385            throw convertException(ex);
386        }
387    }
388
    /**
     * Moves source to target, delegating to the six-argument overload with no extra configuration
     * and no name node.
     *
     * @param context executor context
     * @param source path to move
     * @param target destination of the move
     * @param recovery passed through to the six-argument overload, where a true value suppresses the
     *        "source does not exist" and "could not move" errors
     * @throws ActionExecutorException if the move fails
     */
    public void move(Context context, Path source, Path target, boolean recovery) throws ActionExecutorException {
        move(context, null, null, source, target, recovery);
    }
401
402    /**
403     * Move source to target
404     *
405     * @param context
406     * @param fsConf
407     * @param nameNodePath
408     * @param source
409     * @param target
410     * @param recovery
411     * @throws ActionExecutorException
412     */
413    public void move(Context context, XConfiguration fsConf, Path nameNodePath, Path source, Path target, boolean recovery)
414            throws ActionExecutorException {
415        try {
416            source = resolveToFullPath(nameNodePath, source, true);
417            validateSameNN(source, target);
418            FileSystem fs = getFileSystemFor(source, context, fsConf);
419
420            if (!fs.exists(source) && !recovery) {
421                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS006",
422                                                  "move, source path [{0}] does not exist", source);
423            }
424
425            if (!fs.rename(source, target) && !recovery) {
426                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS008",
427                                                  "move, could not move [{0}] to [{1}]", source, target);
428            }
429        }
430        catch (Exception ex) {
431            throw convertException(ex);
432        }
433    }
434
    /**
     * Changes the permissions of a path, delegating to the seven-argument overload with no extra
     * configuration and no name node.
     *
     * @param context executor context
     * @param path path to change
     * @param permissions permissions mask
     * @param dirFiles passed through to the seven-argument overload
     * @param recursive passed through to the seven-argument overload
     * @throws ActionExecutorException if the permissions cannot be changed
     */
    void chmod(Context context, Path path, String permissions, boolean dirFiles, boolean recursive) throws ActionExecutorException {
        chmod(context, null, null, path, permissions, dirFiles, recursive);
    }
438
439    void chmod(Context context, XConfiguration fsConf, Path nameNodePath, Path path, String permissions,
440            boolean dirFiles, boolean recursive) throws ActionExecutorException {
441
442        HashMap<String, String> argsMap = new HashMap<String, String>();
443        argsMap.put("permissions", permissions);
444        try {
445            FileSystem fs = getFileSystemFor(path, context, fsConf);
446            recursiveFsOperation("chmod", fs, nameNodePath, path, argsMap, dirFiles, recursive, true);
447        }
448        catch (Exception ex) {
449            throw convertException(ex);
450        }
451    }
452
    /**
     * Creates a zero-length file, delegating to the four-argument overload with no extra configuration
     * and no name node.
     *
     * @param context executor context
     * @param path file to create
     * @throws ActionExecutorException if the file cannot be created
     */
    void touchz(Context context, Path path) throws ActionExecutorException {
        touchz(context, null, null, path);
    }
456
457    void touchz(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException {
458        try {
459            path = resolveToFullPath(nameNodePath, path, true);
460            FileSystem fs = getFileSystemFor(path, context, fsConf);
461
462            FileStatus st;
463            if (fs.exists(path)) {
464                st = fs.getFileStatus(path);
465                if (st.isDir()) {
466                    throw new Exception(path.toString() + " is a directory");
467                } else if (st.getLen() != 0)
468                    throw new Exception(path.toString() + " must be a zero-length file");
469            }
470            FSDataOutputStream out = fs.create(path);
471            out.close();
472        }
473        catch (Exception ex) {
474            throw convertException(ex);
475        }
476    }
477
478    FsPermission createShortPermission(String permissions, Path path) throws ActionExecutorException {
479        if (permissions.length() == 3) {
480            char user = permissions.charAt(0);
481            char group = permissions.charAt(1);
482            char other = permissions.charAt(2);
483            int useri = user - '0';
484            int groupi = group - '0';
485            int otheri = other - '0';
486            int mask = useri * 100 + groupi * 10 + otheri;
487            short omask = Short.parseShort(Integer.toString(mask), 8);
488            return new FsPermission(omask);
489        }
490        else {
491            if (permissions.length() == 10) {
492                return FsPermission.valueOf(permissions);
493            }
494            else {
495                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS010",
496                                                  "chmod, path [{0}] invalid permissions mask [{1}]", path, permissions);
497            }
498        }
499    }
500
    /**
     * Does nothing in this executor.
     *
     * @param context executor context, unused
     * @param action workflow action, unused
     * @throws ActionExecutorException never thrown by this implementation
     */
    @Override
    public void check(Context context, WorkflowAction action) throws ActionExecutorException {
        // Intentionally empty: start() runs all operations before it returns
    }
504
    /**
     * Does nothing in this executor.
     *
     * @param context executor context, unused
     * @param action workflow action, unused
     * @throws ActionExecutorException never thrown by this implementation
     */
    @Override
    public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
        // Intentionally empty: start() runs all operations before it returns
    }
508
509    @Override
510    public void start(Context context, WorkflowAction action) throws ActionExecutorException {
511        try {
512            context.setStartData("-", "-", "-");
513            Element actionXml = XmlUtils.parseXml(action.getConf());
514            doOperations(context, actionXml);
515            context.setExecutionData("OK", null);
516        }
517        catch (Exception ex) {
518            throw convertException(ex);
519        }
520    }
521
522    @Override
523    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
524        String externalStatus = action.getExternalStatus();
525        WorkflowAction.Status status = externalStatus.equals("OK") ? WorkflowAction.Status.OK :
526                                       WorkflowAction.Status.ERROR;
527        context.setEndData(status, getActionSignal(status));
528        if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)) {
529            try {
530                FileSystem fs = context.getAppFileSystem();
531                fs.delete(context.getActionDir(), true);
532            }
533            catch (Exception ex) {
534                throw convertException(ex);
535            }
536        }
537    }
538
    /**
     * Always reports the action as completed, whatever the external status is.
     *
     * @param externalStatus external status of the action, ignored
     * @return always true
     */
    @Override
    public boolean isCompleted(String externalStatus) {
        return true;
    }
543
544    /**
545     * @param context
546     * @return
547     * @throws HadoopAccessorException
548     * @throws IOException
549     * @throws URISyntaxException
550     */
551    public Path getRecoveryPath(Context context) throws HadoopAccessorException, IOException, URISyntaxException {
552        return new Path(context.getActionDir(), "fs-" + context.getRecoveryId());
553    }
554
555}