001package net.filebot.cli;
002
003import static java.nio.file.StandardWatchEventKinds.*;
004import static java.util.stream.Collectors.*;
005import static net.filebot.Logging.*;
006import static net.filebot.util.FileUtilities.*;
007
008import java.io.Closeable;
009import java.io.File;
010import java.io.FileFilter;
011import java.nio.file.Path;
012import java.nio.file.WatchKey;
013import java.nio.file.WatchService;
014import java.util.ArrayList;
015import java.util.List;
016import java.util.concurrent.ExecutorService;
017import java.util.concurrent.Executors;
018import java.util.concurrent.TimeUnit;
019import java.util.function.Consumer;
020
021import net.filebot.util.DefaultThreadFactory;
022import net.filebot.util.Timer;
023
024public class FolderWatchService implements Closeable {
025
026        private final ExecutorService processor = Executors.newSingleThreadExecutor();
027        private final ExecutorService watchers = Executors.newCachedThreadPool(new DefaultThreadFactory("FolderWatcher"));
028
029        private final List<File> queue = new ArrayList<File>();
030        private final Timer timer = new Timer(this::commit);
031
032        private final boolean recursive;
033        private final long delay;
034        private final FileFilter filter;
035
036        private final Consumer<List<File>> handler;
037
038        public FolderWatchService(boolean recursive, long delay, FileFilter filter, Consumer<List<File>> handler) {
039                this.recursive = recursive;
040                this.delay = delay;
041                this.filter = filter;
042                this.handler = handler;
043        }
044
045        public synchronized void resetCommitTimer() {
046                timer.set(delay, TimeUnit.MILLISECONDS, false);
047        }
048
049        private void enqueue(List<File> changes) {
050                // reset count down
051                resetCommitTimer();
052
053                // add changes to the queue
054                synchronized (queue) {
055                        queue.addAll(changes);
056                }
057        }
058
059        private void process(List<File> changes) {
060                try {
061                        handler.accept(changes);
062                } catch (Throwable e) {
063                        debug.severe(cause("Failed to process changes", changes, e));
064                }
065        }
066
067        public synchronized void commit() {
068                List<File> changes = new ArrayList<File>();
069
070                synchronized (queue) {
071                        queue.stream().filter(filter::accept).sorted().distinct().forEach(changes::add);
072                        queue.clear();
073                }
074
075                if (changes.isEmpty()) {
076                        return;
077                }
078
079                // start watching newly created folders
080                if (recursive) {
081                        changes.stream().filter(File::isDirectory).forEach(this::watchFolder);
082                }
083
084                processor.submit(() -> process(changes));
085        }
086
087        public synchronized void watchFolder(File folder) {
088                if (recursive) {
089                        for (File f : getChildren(folder, filter)) {
090                                if (f.isDirectory()) {
091                                        watchFolder(f);
092                                }
093                        }
094                }
095
096                watchers.submit(new FolderWatcher(folder, filter, this::enqueue));
097        }
098
099        @Override
100        public synchronized void close() {
101                timer.cancel();
102                processor.shutdownNow();
103                watchers.shutdownNow();
104        }
105
106        private static class FolderWatcher implements Runnable {
107
108                private final Path node;
109                private final FileFilter filter;
110                private final Consumer<List<File>> handler;
111
112                public FolderWatcher(File node, FileFilter filter, Consumer<List<File>> handler) {
113                        this.node = node.toPath();
114                        this.filter = filter;
115                        this.handler = handler;
116                }
117
118                @Override
119                public void run() {
120                        debug.config(message("Start watching folder", node));
121
122                        try (WatchService watcher = node.getFileSystem().newWatchService()) {
123                                node.register(watcher, ENTRY_CREATE, ENTRY_MODIFY, ENTRY_DELETE);
124
125                                for (;;) {
126                                        WatchKey key = watcher.take();
127
128                                        // collect file paths
129                                        List<File> changes = key.pollEvents().stream().filter(e -> {
130                                                // an OVERFLOW event can occur regardless if events are lost or discarded
131                                                return e.kind() != OVERFLOW;
132                                        }).map(e -> {
133                                                return node.resolve((Path) e.context()).toFile();
134                                        }).filter(f -> {
135                                                return filter.accept(f);
136                                        }).collect(toList());
137
138                                        if (!changes.isEmpty()) {
139                                                handler.accept(changes);
140                                        }
141
142                                        // if the key is no longer valid, then the directory no longer exists
143                                        boolean valid = key.reset();
144                                        if (!valid) {
145                                                debug.severe(message("Stop watching folder", node));
146                                                return;
147                                        }
148                                }
149                        } catch (Throwable e) {
150                                debug.severe(cause("Failed to watch folder", node, e));
151                        }
152                }
153        }
154
155}