auto decrypt (#44)

This commit is contained in:
Sarv
2025-04-16 01:02:29 +08:00
committed by GitHub
parent f2aa923e99
commit 25d0b394e2
20 changed files with 2903 additions and 712 deletions

628
pkg/filecopy/filecopy.go Normal file
View File

@@ -0,0 +1,628 @@
package filecopy
import (
"encoding/json"
"fmt"
"hash/fnv"
"io"
"os"
"path/filepath"
"strings"
"sync"
"time"
)
var (
	// fileOperationLocks serializes work per original-file path (see
	// getFileLock); guarded by locksMutex.
	fileOperationLocks = make(map[string]*sync.Mutex)
	locksMutex         = sync.RWMutex{}
	// pathToTempFile maps original file paths to their current temp-copy paths.
	pathToTempFile = make(map[string]string)
	// fileMetadata records size+mtime of each original at copy time; used to
	// detect changes. Guarded by mapMutex, as are the two maps above/below.
	fileMetadata = make(map[string]fileMetaInfo)
	// oldVersions tracks the previous temp copy per original file, kept
	// briefly so in-flight readers of the old copy are not broken.
	oldVersions = make(map[string]string)
	mapMutex    = sync.RWMutex{}
	// tempDir is the per-process scratch directory chosen in initTempDir.
	tempDir string
	// mappingFilePath is where mappings are persisted as JSON (in tempDir).
	mappingFilePath string
	// fileDeletionChan feeds delayed deletions to fileDeletionHandler workers.
	fileDeletionChan = make(chan FileDeletion, 1000)
	// DefaultDeletionDelay is how long an old temp copy lingers before removal.
	DefaultDeletionDelay = 30 * time.Second
)
// FileDeletion is a queue entry for the delayed-deletion workers: Path is
// removed once Time (a deadline, not a duration) has passed.
type FileDeletion struct {
	Path string
	Time time.Time
}
// fileMetaInfo is the change-detection fingerprint of an original file:
// a cached temp copy is reused only while both fields still match os.Stat.
type fileMetaInfo struct {
	ModTime time.Time `json:"mod_time"`
	Size    int64     `json:"size"`
}
// persistentMapping is one JSON record in the mapping file, tying an
// original path to its temp copy and the metadata valid at copy time.
type persistentMapping struct {
	OriginalPath string       `json:"original_path"`
	TempPath     string       `json:"temp_path"`
	Metadata     fileMetaInfo `json:"metadata"`
}
// initTempDir picks a scratch directory (per-process name first, then a
// shared "filecopy" directory, finally the bare OS temp dir), then restores
// persisted mappings and prunes stale temp files from earlier runs.
func initTempDir() {
	candidates := []string{
		filepath.Join(os.TempDir(), "filecopy_"+getProcessName()),
		filepath.Join(os.TempDir(), "filecopy"),
	}
	// Last-resort fallback if neither candidate can be created.
	tempDir = os.TempDir()
	for _, dir := range candidates {
		if err := os.MkdirAll(dir, 0755); err == nil {
			tempDir = dir
			break
		}
	}
	mappingFilePath = filepath.Join(tempDir, "file_mappings.json")
	// Restore any still-valid mappings from a previous run.
	loadMappings()
	// Then drop temp files no run is going to use again.
	cleanupExistingTempFiles()
}
// getProcessName derives a filesystem-safe identifier from the current
// executable: its base name without extension, with every character outside
// [A-Za-z0-9_-] replaced by '_'. Returns "unknown" when the executable path
// cannot be determined.
func getProcessName() string {
	executable, err := os.Executable()
	if err != nil {
		return "unknown"
	}
	base := filepath.Base(executable)
	base = strings.TrimSuffix(base, filepath.Ext(base))
	sanitize := func(r rune) rune {
		switch {
		case r >= 'a' && r <= 'z',
			r >= 'A' && r <= 'Z',
			r >= '0' && r <= '9',
			r == '-', r == '_':
			return r
		default:
			return '_'
		}
	}
	return strings.Map(sanitize, base)
}
// loadMappings restores original→temp path mappings persisted by a previous
// run. An entry is kept only when both files still exist and the original's
// size and mtime match the recorded metadata. A missing or corrupt mapping
// file is not an error — it simply means a cold start.
func loadMappings() {
	file, err := os.Open(mappingFilePath)
	if err != nil {
		// No mapping file yet; nothing to restore.
		return
	}
	defer file.Close()

	var mappings []persistentMapping
	if decodeErr := json.NewDecoder(file).Decode(&mappings); decodeErr != nil {
		// Corrupt file — start fresh.
		return
	}

	mapMutex.Lock()
	defer mapMutex.Unlock()
	for _, m := range mappings {
		origStat, origErr := os.Stat(m.OriginalPath)
		if origErr != nil {
			continue
		}
		if _, tempErr := os.Stat(m.TempPath); tempErr != nil {
			continue
		}
		// Only trust the cached copy while the original is unchanged.
		if origStat.ModTime() == m.Metadata.ModTime && origStat.Size() == m.Metadata.Size {
			pathToTempFile[m.OriginalPath] = m.TempPath
			fileMetadata[m.OriginalPath] = m.Metadata
		}
	}
}
// saveMappings persists the current mappings (with their metadata) to
// mappingFilePath as indented JSON. All failures are ignored: the mapping
// file is only a cache, and losing it just costs a re-copy next run.
func saveMappings() {
	mapMutex.RLock()
	defer mapMutex.RUnlock()

	var mappings []persistentMapping
	for origPath, tempPath := range pathToTempFile {
		meta, ok := fileMetadata[origPath]
		if !ok {
			// No metadata means the entry is incomplete; skip it.
			continue
		}
		mappings = append(mappings, persistentMapping{
			OriginalPath: origPath,
			TempPath:     tempPath,
			Metadata:     meta,
		})
	}

	out, err := os.Create(mappingFilePath)
	if err != nil {
		return
	}
	defer out.Close()

	enc := json.NewEncoder(out)
	enc.SetIndent("", " ")
	// Best effort — persistence is not critical.
	_ = enc.Encode(mappings)
}
// cleanupExistingTempFiles prunes leftovers from previous runs: files in
// tempDir that do not follow the "base_hash_timestamp.ext" naming scheme are
// deleted outright, and within each base_hash group only the newest
// timestamped file (plus anything referenced by the restored mapping) is kept.
func cleanupExistingTempFiles() {
	files, err := os.ReadDir(tempDir)
	if err != nil {
		return
	}
	// Never touch the persisted mapping file.
	mappingFileName := filepath.Base(mappingFilePath)
	// Collect temp paths already referenced by the restored mapping; these
	// are kept even if an apparently newer sibling exists.
	knownFiles := make(map[string]bool)
	mapMutex.RLock()
	for _, tempPath := range pathToTempFile {
		knownFiles[tempPath] = true
	}
	mapMutex.RUnlock()
	// Group files by "baseName_hashPrefix" so each original file's copies
	// end up in one bucket.
	fileGroups := make(map[string][]tempFileInfo)
	for _, file := range files {
		if file.IsDir() {
			continue
		}
		fileName := file.Name()
		if fileName == mappingFileName {
			continue
		}
		filePath := filepath.Join(tempDir, fileName)
		parts := strings.Split(fileName, "_")
		// Fewer than three "_"-separated parts cannot be our naming scheme.
		// NOTE(review): base names containing '_' shift these indices, so
		// such files may group under the wrong key — confirm acceptable.
		if len(parts) < 3 {
			removeFileImmediately(filePath)
			continue
		}
		// Group key: base name plus path-hash prefix.
		baseName := parts[0]
		hashPart := parts[1]
		groupKey := baseName + "_" + hashPart
		// Parse the nanosecond timestamp (text before the first '.').
		timeStr := strings.Split(parts[2], ".")[0]
		var timestamp int64
		if _, err := fmt.Sscanf(timeStr, "%d", &timestamp); err != nil {
			// Third part is not numeric — not ours; delete.
			removeFileImmediately(filePath)
			continue
		}
		fileGroups[groupKey] = append(fileGroups[groupKey], tempFileInfo{
			path:      filePath,
			timestamp: timestamp,
		})
	}
	// Within each group keep only the newest copy.
	for _, fileInfos := range fileGroups {
		if len(fileInfos) == 0 {
			continue
		}
		// Newest = largest parsed timestamp (zero value loses to any real one).
		var newestFile tempFileInfo
		for _, fileInfo := range fileInfos {
			if fileInfo.timestamp > newestFile.timestamp {
				newestFile = fileInfo
			}
		}
		for _, fileInfo := range fileInfos {
			if fileInfo.path != newestFile.path {
				// Mapping-referenced files survive even if not the newest.
				if knownFiles[fileInfo.path] {
					continue
				}
				removeFileImmediately(fileInfo.path)
			}
		}
	}
}
// tempFileInfo pairs a temp file path with the creation timestamp parsed
// from its name; used when deduplicating leftovers at startup.
type tempFileInfo struct {
	path      string
	timestamp int64
}
// getFileLock returns the per-path mutex, creating it on first use.
// Fast path takes only the read lock; the slow path re-checks under the
// write lock so two racing callers never get different mutexes for one path.
func getFileLock(path string) *sync.Mutex {
	locksMutex.RLock()
	if lock, ok := fileOperationLocks[path]; ok {
		locksMutex.RUnlock()
		return lock
	}
	locksMutex.RUnlock()

	locksMutex.Lock()
	defer locksMutex.Unlock()
	// Re-check: another goroutine may have won the race to create it.
	if lock, ok := fileOperationLocks[path]; ok {
		return lock
	}
	lock := &sync.Mutex{}
	fileOperationLocks[path] = lock
	return lock
}
// GetTempCopy returns a temporary copy path of the original file.
// If the original hasn't changed (ModTime not newer and Size equal) since
// the last copy and that copy is still readable, the cached temp path is
// returned; otherwise a fresh copy named "base_pathhash_timestamp.ext" is
// created. The previous copy is retired via oldVersions plus delayed
// deletion so concurrent readers of the old path are not broken immediately.
//
// Thread-safe: a per-path mutex serializes callers for the same original.
func GetTempCopy(originalPath string) (string, error) {
	// Serialize all work on this original path.
	fileLock := getFileLock(originalPath)
	fileLock.Lock()
	defer fileLock.Unlock()
	// The original must exist.
	stat, err := os.Stat(originalPath)
	if err != nil {
		return "", fmt.Errorf("original file does not exist: %w", err)
	}
	// Fingerprint used for change detection against the cached metadata.
	currentInfo := fileMetaInfo{
		ModTime: stat.ModTime(),
		Size:    stat.Size(),
	}
	// Look up any existing mapping.
	mapMutex.RLock()
	tempPath, pathExists := pathToTempFile[originalPath]
	cachedInfo, infoExists := fileMetadata[originalPath]
	mapMutex.RUnlock()
	// Reuse the cached copy when the original is unchanged.
	if pathExists && infoExists {
		fileChanged := currentInfo.ModTime.After(cachedInfo.ModTime) ||
			currentInfo.Size != cachedInfo.Size
		if !fileChanged {
			// The copy must still exist on disk...
			if _, err := os.Stat(tempPath); err == nil {
				// ...and be openable (guards against permission loss etc.).
				if file, err := os.Open(tempPath); err == nil {
					file.Close()
					return tempPath, nil
				}
			}
		}
	}
	// Build a new temp file name: basename_pathhash_timestamp.ext.
	fileName := filepath.Base(originalPath)
	fileExt := filepath.Ext(fileName)
	baseName := fileName[:len(fileName)-len(fileExt)]
	if baseName == "" {
		baseName = "file" // dotfiles like ".env" strip to empty
	}
	// Hash of the full path disambiguates same-named files from different dirs.
	pathHash := hashString(originalPath)
	hashPrefix := getHashPrefix(pathHash, 8)
	timestamp := time.Now().UnixNano()
	tempPath = filepath.Join(tempDir,
		fmt.Sprintf("%s_%s_%d%s",
			baseName,
			hashPrefix,
			timestamp,
			fileExt))
	// Copy with retries to ride out transient I/O errors.
	if err := copyFileWithRetry(originalPath, tempPath, 3); err != nil {
		return "", err
	}
	// Swap the mapping to the new copy under the write lock.
	mapMutex.Lock()
	oldPath := pathToTempFile[originalPath]
	// Retire the displaced copy: the one before it is deleted now, the
	// displaced one becomes "old version" and is deleted after a delay.
	if oldPath != "" && oldPath != tempPath {
		if oldVersionPath, hasOldVersion := oldVersions[originalPath]; hasOldVersion && oldVersionPath != oldPath {
			removeFileImmediately(oldVersionPath)
		}
		oldVersions[originalPath] = oldPath
		scheduleForDeletion(oldPath)
	}
	// Record the new copy and its fingerprint.
	pathToTempFile[originalPath] = tempPath
	fileMetadata[originalPath] = currentInfo
	mapMutex.Unlock()
	// Persist and prune in the background; neither is critical to the caller.
	go saveMappings()
	go cleanupRelatedTempFiles(originalPath, tempPath, oldPath)
	return tempPath, nil
}
// cleanupRelatedTempFiles immediately deletes any temp file in tempDir that
// belongs to originalPath (shares its "base_hashprefix" name pattern) but is
// neither the current copy nor the known old version. Paths are compared
// extension-stripped, mirroring how GetTempCopy generates names.
func cleanupRelatedTempFiles(originalPath, currentTempPath, knownOldPath string) {
	// Rebuild the name prefix exactly as GetTempCopy would.
	fileName := filepath.Base(originalPath)
	fileExt := filepath.Ext(fileName)
	baseName := fileName[:len(fileName)-len(fileExt)]
	if baseName == "" {
		baseName = "file"
	}
	pathHash := hashString(originalPath)
	hashPrefix := getHashPrefix(pathHash, 8)
	// All copies of this original start with "base_hashprefix".
	filePrefix := baseName + "_" + hashPrefix
	currentTempPathNoExt := strings.TrimSuffix(currentTempPath, filepath.Ext(currentTempPath))
	knownOldPathNoExt := strings.TrimSuffix(knownOldPath, filepath.Ext(knownOldPath))
	files, err := os.ReadDir(tempDir)
	if err != nil {
		return
	}
	for _, file := range files {
		if file.IsDir() {
			continue
		}
		fileName := file.Name()
		// Never touch the persisted mapping file.
		if fileName == filepath.Base(mappingFilePath) {
			continue
		}
		filePath := filepath.Join(tempDir, fileName)
		filePathNoExt := strings.TrimSuffix(filePath, filepath.Ext(filePath))
		// Keep the active copy and the tracked old version.
		if filePathNoExt == currentTempPathNoExt || filePathNoExt == knownOldPathNoExt {
			continue
		}
		// Anything else with our prefix is an orphan — delete now.
		if strings.HasPrefix(fileName, filePrefix) {
			removeFileImmediately(filePath)
		}
	}
}
// Immediately delete file without waiting for delay
func removeFileImmediately(path string) {
if path == "" {
return
}
// Try to delete file
if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
// Silently fail if we can't delete
}
}
// Schedule file for delayed deletion
func scheduleForDeletion(path string) {
if path == "" {
return
}
// Check if file exists
if _, err := os.Stat(path); os.IsNotExist(err) {
return
}
// Put file in deletion channel
select {
case fileDeletionChan <- FileDeletion{Path: path, Time: time.Now().Add(DefaultDeletionDelay)}:
// Successfully scheduled
default:
// If channel is full, delete file immediately
removeFileImmediately(path)
}
}
// fileDeletionHandler is a background worker: it drains fileDeletionChan,
// sleeps until each entry's scheduled time, and deletes the file unless it
// has since become an active temp copy again. Runs for the process lifetime.
func fileDeletionHandler() {
	for {
		entry := <-fileDeletionChan
		// Sleep out whatever remains of the delay.
		if wait := time.Until(entry.Time); wait > 0 {
			time.Sleep(wait)
		}
		// A path can be re-promoted to the active copy while queued;
		// never delete those.
		active := false
		mapMutex.RLock()
		for _, activePath := range pathToTempFile {
			if activePath == entry.Path {
				active = true
				break
			}
		}
		mapMutex.RUnlock()
		if active {
			continue
		}
		removeFileImmediately(entry.Path)
	}
}
// CleanupTempFiles scans tempDir and schedules deletion of every temp file
// that is neither the active copy nor the tracked old version of some
// original file.
//
// Comparison is done on extension-stripped paths (matching how active paths
// are keyed), but deletion is scheduled for the REAL on-disk path. The
// original implementation passed the extension-stripped path to
// scheduleForDeletion, whose os.Stat existence check then failed, so files
// with extensions were silently never cleaned up.
func CleanupTempFiles() {
	files, err := os.ReadDir(tempDir)
	if err != nil {
		return
	}
	// Never touch the persisted mapping file.
	mappingFileName := filepath.Base(mappingFilePath)

	// Build the set of extension-stripped paths that must be kept:
	// current copies plus tracked old versions.
	mapMutex.RLock()
	activeTempFiles := make(map[string]bool)
	for _, tempFilePath := range pathToTempFile {
		key := strings.TrimSuffix(tempFilePath, filepath.Ext(tempFilePath))
		activeTempFiles[key] = true
	}
	for _, oldVersionPath := range oldVersions {
		key := strings.TrimSuffix(oldVersionPath, filepath.Ext(oldVersionPath))
		activeTempFiles[key] = true
	}
	mapMutex.RUnlock()

	// Anything not in the keep-set goes to the delayed-deletion queue.
	for _, file := range files {
		if file.IsDir() {
			continue
		}
		fileName := file.Name()
		if fileName == mappingFileName {
			continue
		}
		fullPath := filepath.Join(tempDir, fileName)
		key := strings.TrimSuffix(fullPath, filepath.Ext(fullPath))
		if !activeTempFiles[key] {
			// Schedule the actual file path, not the stripped key.
			scheduleForDeletion(fullPath)
		}
	}
}
// copyFileWithRetry attempts copyFile up to maxRetries times with a linearly
// growing pause (100ms, 200ms, ...) after each failure. Returns nil on the
// first success, otherwise the last error wrapped with the attempt count.
func copyFileWithRetry(src, dst string, maxRetries int) error {
	var lastErr error
	for attempt := 1; attempt <= maxRetries; attempt++ {
		if lastErr = copyFile(src, dst); lastErr == nil {
			return nil
		}
		// Back off before (a possible) next attempt.
		time.Sleep(time.Duration(100*attempt) * time.Millisecond)
	}
	return fmt.Errorf("failed to copy file after %d attempts: %w", maxRetries, lastErr)
}
// Copy file
func copyFile(src, dst string) error {
in, err := os.Open(src)
if err != nil {
return fmt.Errorf("failed to open source file: %w", err)
}
defer in.Close()
out, err := os.Create(dst)
if err != nil {
return fmt.Errorf("failed to create destination file: %w", err)
}
defer func() {
cerr := out.Close()
if err == nil && cerr != nil {
err = fmt.Errorf("failed to close destination file: %w", cerr)
}
}()
// Use buffered copy for better performance
buf := make([]byte, 256*1024) // 256KB buffer
if _, err = io.CopyBuffer(out, in, buf); err != nil {
return fmt.Errorf("failed to copy file contents: %w", err)
}
return out.Sync()
}
// hashString returns the FNV-1a 32-bit hash of s as lowercase hex
// (no zero padding, so the result is 1-8 characters long).
func hashString(s string) string {
	hasher := fnv.New32a()
	_, _ = hasher.Write([]byte(s)) // hash.Hash.Write never returns an error
	return fmt.Sprintf("%x", hasher.Sum32())
}
// getHashPrefix returns at most length leading characters of hash,
// guarding against out-of-range slicing on short inputs.
func getHashPrefix(hash string, length int) string {
	if len(hash) > length {
		return hash[:length]
	}
	return hash
}
// init bootstraps the package at load time: choose the temp directory and
// restore/prune state, start two delayed-deletion workers, and run cleanup
// plus mapping persistence every 30 seconds.
// NOTE(review): these goroutines have no shutdown mechanism; they live for
// the process lifetime — confirm that is intended for all importers.
func init() {
	// Initialize temp directory and scan existing files
	initTempDir()
	// Two workers drain the deletion queue concurrently.
	for i := 0; i < 2; i++ {
		go fileDeletionHandler()
	}
	// Periodic cleanup + persistence loop.
	go func() {
		for {
			time.Sleep(30 * time.Second)
			CleanupTempFiles()
			// Also periodically save mappings
			saveMappings()
		}
	}()
}

View File

@@ -0,0 +1,182 @@
package filemonitor
import (
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"
"regexp"
"strings"
"sync"
"github.com/fsnotify/fsnotify"
"github.com/rs/zerolog/log"
)
// FileChangeCallback is invoked (in its own goroutine) for each matching
// fsnotify event; a non-nil error is logged by the dispatcher but does not
// affect other callbacks.
type FileChangeCallback func(event fsnotify.Event) error
// FileGroup represents a set of files under one root directory that share
// matching rules and change callbacks. Callbacks is guarded by mutex; the
// remaining fields are written at construction and read thereafter.
type FileGroup struct {
	ID         string               // Unique identifier
	RootDir    string               // Root directory (cleaned at construction)
	Pattern    *regexp.Regexp       // Compiled base-name matching pattern
	PatternStr string               // Original pattern string for rebuilding
	Blacklist  []string             // Substrings that exclude a relative path
	Callbacks  []FileChangeCallback // File change callbacks (guarded by mutex)
	mutex      sync.RWMutex         // Guards Callbacks
}
// NewFileGroup builds a FileGroup whose files live under rootDir and whose
// base names match the regular expression pattern, excluding any path whose
// relative path contains a blacklist substring.
// Returns an error when pattern is not a valid regular expression.
func NewFileGroup(id, rootDir, pattern string, blacklist []string) (*FileGroup, error) {
	re, err := regexp.Compile(pattern)
	if err != nil {
		return nil, fmt.Errorf("invalid pattern '%s': %w", pattern, err)
	}
	fg := &FileGroup{
		ID:         id,
		RootDir:    filepath.Clean(rootDir), // normalized once, up front
		Pattern:    re,
		PatternStr: pattern,
		Blacklist:  blacklist,
		Callbacks:  []FileChangeCallback{},
	}
	return fg, nil
}
// AddCallback registers callback to be invoked for events matching this group.
func (fg *FileGroup) AddCallback(callback FileChangeCallback) {
	fg.mutex.Lock()
	fg.Callbacks = append(fg.Callbacks, callback)
	fg.mutex.Unlock()
}
// RemoveCallback removes the first registered callback whose function value
// formats to the same %p address as callbackToRemove; returns true when a
// callback was removed.
// NOTE(review): comparing func values via fmt.Sprintf("%p") is fragile —
// method values and separately created closures from the same literal may
// not compare as callers expect; confirm callers pass the identical value
// they registered.
func (fg *FileGroup) RemoveCallback(callbackToRemove FileChangeCallback) bool {
	fg.mutex.Lock()
	defer fg.mutex.Unlock()
	for i, callback := range fg.Callbacks {
		// Compare function addresses
		if fmt.Sprintf("%p", callback) == fmt.Sprintf("%p", callbackToRemove) {
			// Splice the match out, preserving order of the rest.
			fg.Callbacks = append(fg.Callbacks[:i], fg.Callbacks[i+1:]...)
			return true
		}
	}
	return false
}
// Match reports whether path belongs to this group: it must lie under
// RootDir, its base name must match Pattern, and its relative path must not
// contain any blacklist substring.
func (fg *FileGroup) Match(path string) bool {
	path = filepath.Clean(path)
	// filepath.Rel handles the "is it under the root?" question portably;
	// a ".." prefix (or an error) means it is outside.
	relPath, err := filepath.Rel(filepath.Clean(fg.RootDir), path)
	if err != nil || strings.HasPrefix(relPath, "..") {
		return false
	}
	if !fg.Pattern.MatchString(filepath.Base(path)) {
		return false
	}
	for _, blocked := range fg.Blacklist {
		if strings.Contains(relPath, blocked) {
			return false
		}
	}
	return true
}
// List walks RootDir and returns absolute paths of all files currently
// matching this group (pattern + blacklist, via Match). A missing root
// yields an empty list rather than an error.
// NOTE(review): the walk callback turns ANY walk error into fs.SkipDir,
// which silently prunes unreadable subtrees (and ends the walk when the
// root itself errors) — confirm this best-effort behavior is intended.
func (fg *FileGroup) List() ([]string, error) {
	files := []string{}
	// Scan directory for matching files using fs.WalkDir
	err := fs.WalkDir(os.DirFS(fg.RootDir), ".", func(path string, d fs.DirEntry, err error) error {
		if err != nil {
			return fs.SkipDir
		}
		// Only files can match; directories are just traversed.
		if d.IsDir() {
			return nil
		}
		// WalkDir yields paths relative to the fs root; rebuild absolute ones.
		absPath := filepath.Join(fg.RootDir, path)
		// Use Match function to check if file belongs to this group
		if fg.Match(absPath) {
			files = append(files, absPath)
		}
		return nil
	})
	if err != nil && !errors.Is(err, fs.ErrNotExist) {
		return nil, fmt.Errorf("error listing files: %w", err)
	}
	return files, nil
}
// ListMatchingDirectories returns the set (path → true) of directories that
// currently contain at least one file matching this group.
func (fg *FileGroup) ListMatchingDirectories() (map[string]bool, error) {
	files, err := fg.List()
	if err != nil {
		return nil, err
	}
	dirs := make(map[string]bool, len(files))
	for _, f := range files {
		dirs[filepath.Dir(f)] = true
	}
	return dirs, nil
}
// HandleEvent dispatches event to this group's callbacks when the path
// matches. Each callback runs in its own goroutine; errors are logged, not
// propagated. The callback slice is snapshotted under the read lock so
// dispatch never blocks Add/RemoveCallback.
// NOTE(review): callback goroutines are fire-and-forget — nothing waits for
// them or bounds their number; confirm callers are fine with that.
func (fg *FileGroup) HandleEvent(event fsnotify.Event) {
	// Irrelevant events are dropped early.
	if !fg.Match(event.Name) {
		return
	}
	// Snapshot the callbacks so the lock is not held during dispatch.
	fg.mutex.RLock()
	callbacks := make([]FileChangeCallback, len(fg.Callbacks))
	copy(callbacks, fg.Callbacks)
	fg.mutex.RUnlock()
	// Fire each callback asynchronously.
	for _, callback := range callbacks {
		go func(cb FileChangeCallback) {
			if err := cb(event); err != nil {
				log.Error().
					Str("file", event.Name).
					Str("op", event.Op.String()).
					Err(err).
					Msg("Callback error")
			}
		}(callback)
	}
}

View File

@@ -0,0 +1,430 @@
package filemonitor
import (
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"github.com/fsnotify/fsnotify"
"github.com/rs/zerolog/log"
)
// FileMonitor multiplexes one fsnotify watcher across many FileGroups.
// Two locks with distinct scopes: mutex guards groups/watchDirs/blacklist;
// stateMutex guards the lifecycle fields (watcher, isRunning, stopCh writes).
type FileMonitor struct {
	groups     map[string]*FileGroup // Registered groups keyed by ID
	watcher    *fsnotify.Watcher     // File system watcher (nil when stopped)
	watchDirs  map[string]bool       // Directories currently watched
	blacklist  []string              // Global blacklist substrings
	mutex      sync.RWMutex          // Concurrency control for groups and watchDirs
	stopCh     chan struct{}         // Closed by Stop to end watchLoop
	wg         sync.WaitGroup        // Tracks the watchLoop goroutine
	isRunning  bool                  // Running state flag
	stateMutex sync.RWMutex          // Guards lifecycle state
}
// Watcher returns the underlying fsnotify watcher (nil while the monitor is
// stopped). The read is taken under stateMutex because Start/Stop write
// fm.watcher while holding that lock; the original unlocked read was a
// data race under the race detector.
func (fm *FileMonitor) Watcher() *fsnotify.Watcher {
	fm.stateMutex.RLock()
	defer fm.stateMutex.RUnlock()
	return fm.watcher
}
// NewFileMonitor returns a stopped FileMonitor with no groups registered;
// call AddGroup/CreateGroup and then Start to begin watching.
func NewFileMonitor() *FileMonitor {
	fm := &FileMonitor{
		groups:    make(map[string]*FileGroup),
		watchDirs: make(map[string]bool),
		blacklist: []string{},
		isRunning: false, // explicit for readability; zero value anyway
	}
	return fm
}
// SetBlacklist replaces the global directory blacklist. The slice is copied
// so later mutation by the caller cannot affect the monitor.
func (fm *FileMonitor) SetBlacklist(blacklist []string) {
	copied := make([]string, len(blacklist))
	copy(copied, blacklist)

	fm.mutex.Lock()
	fm.blacklist = copied
	fm.mutex.Unlock()
}
// AddGroup registers a file group under its ID. If the monitor is already
// running, watches are set up immediately; on setup failure the group is
// removed again so monitor state stays consistent.
// NOTE(review): the running state is sampled before the group is inserted,
// so a concurrent Start/Stop can race with the setup step — confirm callers
// serialize lifecycle changes.
func (fm *FileMonitor) AddGroup(group *FileGroup) error {
	if group == nil {
		return errors.New("group cannot be nil")
	}
	// Sample running state without holding fm.mutex (avoids lock nesting).
	isRunning := fm.IsRunning()
	// Insert the group, rejecting duplicate IDs.
	fm.mutex.Lock()
	if _, exists := fm.groups[group.ID]; exists {
		fm.mutex.Unlock()
		return fmt.Errorf("group with ID '%s' already exists", group.ID)
	}
	fm.groups[group.ID] = group
	fm.mutex.Unlock()
	// A running monitor needs watches for the new group right away.
	if isRunning {
		if err := fm.setupWatchForGroup(group); err != nil {
			// Roll back the registration on failure.
			fm.mutex.Lock()
			delete(fm.groups, group.ID)
			fm.mutex.Unlock()
			return err
		}
	}
	return nil
}
// CreateGroup is a convenience wrapper: it builds a FileGroup via
// NewFileGroup and registers it with AddGroup in one call.
func (fm *FileMonitor) CreateGroup(id, rootDir, pattern string, blacklist []string) (*FileGroup, error) {
	group, err := NewFileGroup(id, rootDir, pattern, blacklist)
	if err != nil {
		return nil, err
	}
	if addErr := fm.AddGroup(group); addErr != nil {
		return nil, addErr
	}
	return group, nil
}
// RemoveGroup unregisters the group with the given ID; directories watched
// on its behalf remain until the next RefreshWatches. Returns an error when
// no such group exists.
func (fm *FileMonitor) RemoveGroup(id string) error {
	fm.mutex.Lock()
	defer fm.mutex.Unlock()
	if _, ok := fm.groups[id]; !ok {
		return fmt.Errorf("group with ID '%s' does not exist", id)
	}
	delete(fm.groups, id)
	return nil
}
// GetGroups returns a snapshot slice of all registered file groups.
// (The slice is a copy; the *FileGroup pointers are shared with the monitor.)
func (fm *FileMonitor) GetGroups() []*FileGroup {
	fm.mutex.RLock()
	defer fm.mutex.RUnlock()
	groups := make([]*FileGroup, 0, len(fm.groups))
	for _, group := range fm.groups {
		groups = append(groups, group)
	}
	return groups
}
// GetGroup returns the file group registered under id and whether it exists.
func (fm *FileMonitor) GetGroup(id string) (*FileGroup, bool) {
	fm.mutex.RLock()
	defer fm.mutex.RUnlock()
	group, exists := fm.groups[id]
	return group, exists
}
// Start creates the fsnotify watcher, marks the monitor running, sets up
// watches for every registered group, and launches the event loop goroutine.
// Returns an error when already running, when the watcher cannot be created,
// or when any group's watch setup fails (state is rolled back in that case).
//
// Lock choreography: stateMutex is released before per-group setup because
// setupWatchForGroup calls IsRunning (stateMutex) and addWatchDir (mutex).
func (fm *FileMonitor) Start() error {
	// Reject double Start.
	fm.stateMutex.Lock()
	if fm.isRunning {
		fm.stateMutex.Unlock()
		return errors.New("file monitor is already running")
	}
	// Create new watcher
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		fm.stateMutex.Unlock()
		return fmt.Errorf("failed to create watcher: %w", err)
	}
	fm.watcher = watcher
	// Fresh stop channel for this run.
	fm.stopCh = make(chan struct{})
	// Snapshot the groups (without holding the state lock).
	fm.mutex.RLock()
	groups := make([]*FileGroup, 0, len(fm.groups))
	for _, group := range fm.groups {
		groups = append(groups, group)
	}
	fm.mutex.RUnlock()
	// Reset monitored directories
	fm.mutex.Lock()
	fm.watchDirs = make(map[string]bool)
	fm.mutex.Unlock()
	// Mark running BEFORE setup — setupWatchForGroup checks IsRunning.
	fm.isRunning = true
	fm.stateMutex.Unlock()
	// Set up monitoring for all groups (without holding any locks)
	for _, group := range groups {
		if err := fm.setupWatchForGroup(group); err != nil {
			// Roll everything back so a failed Start leaves a stopped monitor.
			_ = fm.watcher.Close()
			fm.stateMutex.Lock()
			fm.watcher = nil
			fm.isRunning = false
			fm.stateMutex.Unlock()
			return fmt.Errorf("failed to setup watch for group '%s': %w", group.ID, err)
		}
	}
	// Start watch loop
	fm.wg.Add(1)
	go fm.watchLoop()
	// log.Info().Msg("File monitor started")
	return nil
}
// Stop signals the event loop to exit, waits for it, then closes the
// watcher. Returns an error when the monitor is not running or the watcher
// fails to close.
func (fm *FileMonitor) Stop() error {
	// Reject Stop on a stopped monitor.
	fm.stateMutex.Lock()
	if !fm.isRunning {
		fm.stateMutex.Unlock()
		return errors.New("file monitor is not running")
	}
	// Keep a reference so the watcher can be closed after releasing the lock.
	watcher := fm.watcher
	// Closing stopCh is the loop's exit signal.
	close(fm.stopCh)
	fm.isRunning = false
	fm.stateMutex.Unlock()
	// Wait for watchLoop to drain and exit before closing its channels' source.
	fm.wg.Wait()
	// Close watcher
	if watcher != nil {
		if err := watcher.Close(); err != nil {
			return fmt.Errorf("failed to close watcher: %w", err)
		}
		fm.stateMutex.Lock()
		fm.watcher = nil
		fm.stateMutex.Unlock()
	}
	// log.Info().Msg("File monitor stopped")
	return nil
}
// IsRunning reports whether Start has been called without a matching Stop.
func (fm *FileMonitor) IsRunning() bool {
	fm.stateMutex.RLock()
	running := fm.isRunning
	fm.stateMutex.RUnlock()
	return running
}
// addWatchDir registers dirPath with the fsnotify watcher unless it matches
// the global blacklist or is already watched. Safe for concurrent use.
func (fm *FileMonitor) addWatchDir(dirPath string) error {
	// Blacklist check needs only the read lock.
	blocked := false
	fm.mutex.RLock()
	for _, pattern := range fm.blacklist {
		if strings.Contains(dirPath, pattern) {
			blocked = true
			break
		}
	}
	fm.mutex.RUnlock()
	if blocked {
		log.Debug().Str("dir", dirPath).Msg("Skipping blacklisted directory")
		return nil
	}

	fm.mutex.Lock()
	defer fm.mutex.Unlock()
	// Idempotent: adding an already-watched directory is a no-op.
	if fm.watchDirs[dirPath] {
		return nil
	}
	if err := fm.watcher.Add(dirPath); err != nil {
		return fmt.Errorf("failed to watch directory '%s': %w", dirPath, err)
	}
	fm.watchDirs[dirPath] = true
	return nil
}
// setupWatchForGroup watches the group's root directory plus every directory
// currently holding a matching file. Requires a running monitor (the watcher
// must exist).
func (fm *FileMonitor) setupWatchForGroup(group *FileGroup) error {
	if !fm.IsRunning() {
		return errors.New("file monitor is not running")
	}
	matchingDirs, err := group.ListMatchingDirectories()
	if err != nil {
		return fmt.Errorf("failed to list matching directories: %w", err)
	}
	// The root is always watched so new files are noticed even in
	// directories that held no matches at setup time.
	if err := fm.addWatchDir(filepath.Clean(group.RootDir)); err != nil {
		return err
	}
	for dir := range matchingDirs {
		if err := fm.addWatchDir(dir); err != nil {
			return err
		}
	}
	return nil
}
// RefreshWatches rebuilds the watched-directory set from the current groups
// and removes watches on directories that are no longer needed.
// NOTE(review): watchDirs is emptied before the new watches are registered,
// so events may be briefly dropped during a refresh — confirm acceptable.
func (fm *FileMonitor) RefreshWatches() error {
	// Only meaningful while the watcher exists.
	if !fm.IsRunning() {
		return errors.New("file monitor is not running")
	}
	// Snapshot the groups so no lock is held during setup.
	fm.mutex.RLock()
	groups := make([]*FileGroup, 0, len(fm.groups))
	for _, group := range fm.groups {
		groups = append(groups, group)
	}
	fm.mutex.RUnlock()
	// Swap in an empty set; the old set is used below to find stale watches.
	fm.mutex.Lock()
	oldWatchDirs := fm.watchDirs
	fm.watchDirs = make(map[string]bool)
	fm.mutex.Unlock()
	// Re-register watches for every group.
	for _, group := range groups {
		if err := fm.setupWatchForGroup(group); err != nil {
			return fmt.Errorf("failed to refresh watches for group '%s': %w", group.ID, err)
		}
	}
	// Drop watches that no group re-registered.
	for dir := range oldWatchDirs {
		fm.mutex.RLock()
		_, stillWatched := fm.watchDirs[dir]
		fm.mutex.RUnlock()
		if !stillWatched && fm.watcher != nil {
			_ = fm.watcher.Remove(dir)
			log.Debug().Str("dir", dir).Msg("Removed watch for directory")
		}
	}
	return nil
}
// watchLoop is the monitor's event pump. It exits on the stop signal or
// when either watcher channel closes; newly created directories are
// auto-watched; directories of newly matching files are ensured watched;
// every event is forwarded to all groups for dispatch.
func (fm *FileMonitor) watchLoop() {
	defer fm.wg.Done()
	for {
		select {
		case <-fm.stopCh:
			return
		case event, ok := <-fm.watcher.Events:
			if !ok {
				// Events channel closed — watcher is gone; exit.
				return
			}
			// New directories must be watched to see their contents.
			// NOTE(review): os.Stat fails for Remove/Rename-away events,
			// so those fall through to plain forwarding — confirm intended.
			info, err := os.Stat(event.Name)
			if err == nil && info.IsDir() && event.Op&(fsnotify.Create|fsnotify.Rename) != 0 {
				if err := fm.addWatchDir(event.Name); err != nil {
					log.Error().
						Str("dir", event.Name).
						Err(err).
						Msg("Error watching new directory")
				}
				continue
			}
			// A created/modified file that matches some group means its
			// directory must be watched from now on.
			if event.Op&(fsnotify.Create|fsnotify.Write) != 0 {
				shouldWatch := false
				fm.mutex.RLock()
				for _, group := range fm.groups {
					if group.Match(event.Name) {
						shouldWatch = true
						break
					}
				}
				fm.mutex.RUnlock()
				if shouldWatch {
					dir := filepath.Dir(event.Name)
					if err := fm.addWatchDir(dir); err != nil {
						log.Error().
							Str("dir", dir).
							Err(err).
							Msg("Error watching directory of matching file")
					}
				}
			}
			// Every event goes to every group; groups filter themselves.
			fm.forwardEventToGroups(event)
		case err, ok := <-fm.watcher.Errors:
			if !ok {
				// Errors channel closed — watcher is gone; exit.
				return
			}
			log.Error().Err(err).Msg("Watcher error")
		}
	}
}
// forwardEventToGroups hands event to every registered group; each group
// decides for itself (via HandleEvent/Match) whether it is relevant. The
// group list is snapshotted so no lock is held during dispatch.
func (fm *FileMonitor) forwardEventToGroups(event fsnotify.Event) {
	fm.mutex.RLock()
	snapshot := make([]*FileGroup, 0, len(fm.groups))
	for _, g := range fm.groups {
		snapshot = append(snapshot, g)
	}
	fm.mutex.RUnlock()

	for _, g := range snapshot {
		g.HandleEvent(event)
	}
}