package net.filebot.cli;

import static java.nio.file.StandardWatchEventKinds.*;
import static java.util.stream.Collectors.*;
import static net.filebot.Logging.*;
import static net.filebot.util.FileUtilities.*;

import java.io.Closeable;
import java.io.File;
import java.io.FileFilter;
import java.nio.file.Path;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import net.filebot.util.DefaultThreadFactory;
import net.filebot.util.Timer;

/**
 * Watches one or more folders for file system changes, collects the changed files in a queue and hands
 * them to a handler in batches.
 *
 * Each watched folder gets its own {@link FolderWatcher} task on the {@code watchers} pool. Changes
 * reported by the watchers are queued and the commit timer is restarted; {@link #commit()} then filters,
 * sorts and de-duplicates the queue and submits the batch to the single-threaded {@code processor}.
 */
public class FolderWatchService implements Closeable {

	private final ExecutorService processor = Executors.newSingleThreadExecutor();
	private final ExecutorService watchers = Executors.newCachedThreadPool(new DefaultThreadFactory("FolderWatcher"));

	private final List<File> queue = new ArrayList<File>();
	private final Timer timer = new Timer(this::commit);

	// folders that currently have an active watcher (a watcher removes its folder when it stops)
	private final Set<File> watched = ConcurrentHashMap.newKeySet();

	private final boolean recursive;
	private final long delay;
	private final FileFilter filter;

	private final Consumer<List<File>> handler;

	/**
	 * @param recursive
	 *            whether sub-folders (existing ones and ones that show up in a change set) are watched as well
	 * @param delay
	 *            commit timer delay in milliseconds
	 * @param filter
	 *            only files accepted by this filter are reported and only accepted children are traversed
	 * @param handler
	 *            receives each committed batch of changed files
	 */
	public FolderWatchService(boolean recursive, long delay, FileFilter filter, Consumer<List<File>> handler) {
		this.recursive = recursive;
		this.delay = delay;
		this.filter = filter;
		this.handler = handler;
	}

	/**
	 * Restart the commit count down using the configured delay (in milliseconds).
	 */
	public synchronized void resetCommitTimer() {
		// NOTE(review): presumably the timer calls commit() once the delay has elapsed — confirm against net.filebot.util.Timer
		timer.set(delay, TimeUnit.MILLISECONDS, false);
	}

	/**
	 * Called by the folder watchers: queue the given changes and restart the commit count down.
	 */
	private void enqueue(List<File> changes) {
		// add changes to the queue first, so that a commit triggered by the timer can never miss them
		synchronized (queue) {
			queue.addAll(changes);
		}

		// reset count down
		resetCommitTimer();
	}

	/**
	 * Pass a batch of changes to the handler and log (but never propagate) any failure.
	 */
	private void process(List<File> changes) {
		try {
			handler.accept(changes);
		} catch (Throwable e) {
			debug.severe(cause("Failed to process changes", changes, e));
		}
	}

	/**
	 * Remove everything from the queue and return the accepted files, sorted and without duplicates.
	 */
	private List<File> drainQueue() {
		synchronized (queue) {
			List<File> changes = queue.stream().filter(filter::accept).sorted().distinct().collect(toCollection(ArrayList::new));
			queue.clear();
			return changes;
		}
	}

	/**
	 * Flush the queue: if there are any accepted changes, start watching directories among them (recursive
	 * mode only) and submit the batch to the processor.
	 */
	public synchronized void commit() {
		List<File> changes = drainQueue();

		if (changes.isEmpty()) {
			return;
		}

		// start watching newly created folders (folders that are already being watched are skipped by watchFolder)
		if (recursive) {
			changes.stream().filter(File::isDirectory).forEach(this::watchFolder);
		}

		processor.submit(() -> process(changes));
	}

	/**
	 * Start watching the given folder and, in recursive mode, all sub-folders accepted by the filter.
	 * A folder that already has an active watcher is not registered a second time.
	 */
	public synchronized void watchFolder(File folder) {
		if (recursive) {
			for (File f : getChildren(folder, filter)) {
				if (f.isDirectory()) {
					watchFolder(f);
				}
			}
		}

		// a directory may show up in a change set while it is already being watched;
		// starting another watcher would leak a thread and deliver every change twice
		if (!watched.add(folder)) {
			return;
		}

		try {
			watchers.submit(new FolderWatcher(folder, filter, this::enqueue, () -> watched.remove(folder)));
		} catch (RuntimeException e) {
			// the watcher was never started (e.g. the service has been closed)
			watched.remove(folder);
			throw e;
		}
	}

	/**
	 * Cancel the commit timer and shut down both executors.
	 */
	@Override
	public synchronized void close() {
		timer.cancel();
		processor.shutdownNow();
		watchers.shutdownNow();
	}

	/**
	 * Blocking task that watches a single folder and reports accepted changes to a handler until the
	 * folder can no longer be watched, the thread is interrupted or an error occurs.
	 */
	private static class FolderWatcher implements Runnable {

		private final Path node;
		private final FileFilter filter;
		private final Consumer<List<File>> handler;
		private final Runnable onStop;

		/**
		 * @param node
		 *            folder to watch
		 * @param filter
		 *            only changed files accepted by this filter are reported
		 * @param handler
		 *            receives each non-empty list of changed files
		 * @param onStop
		 *            run exactly once when this watcher terminates, for whatever reason
		 */
		public FolderWatcher(File node, FileFilter filter, Consumer<List<File>> handler, Runnable onStop) {
			this.node = node.toPath();
			this.filter = filter;
			this.handler = handler;
			this.onStop = onStop;
		}

		@Override
		public void run() {
			debug.config(message("Start watching folder", node));

			try (WatchService watcher = node.getFileSystem().newWatchService()) {
				node.register(watcher, ENTRY_CREATE, ENTRY_MODIFY, ENTRY_DELETE);

				for (;;) {
					WatchKey key = watcher.take();

					// collect file paths (OVERFLOW events carry no path and are skipped)
					List<File> changes = key.pollEvents().stream()
							.filter(e -> e.kind() != OVERFLOW)
							.map(e -> node.resolve((Path) e.context()).toFile())
							.filter(filter::accept)
							.collect(toList());

					if (!changes.isEmpty()) {
						handler.accept(changes);
					}

					// if the key is no longer valid, then the directory no longer exists
					if (!key.reset()) {
						debug.severe(message("Stop watching folder", node));
						return;
					}
				}
			} catch (InterruptedException e) {
				// interrupted while waiting for events (e.g. executor shutdown): a regular stop, not a failure
				Thread.currentThread().interrupt();
				debug.config(message("Stop watching folder", node));
			} catch (Throwable e) {
				debug.severe(cause("Failed to watch folder", node, e));
			} finally {
				onStop.run();
			}
		}
	}

}