#!/usr/bin/env python3 import os import sys import json import shutil import curses import re import csv import filecmp import glob import tarfile import yaml from typing import List, Dict, Set, Tuple, Optional # Use current directory instead of hardcoded path. BASE_DIR = os.getcwd() INGEST_DIR = os.path.join(BASE_DIR, "ingest") ORIGINAL_CONFIG_DIR = os.path.join(BASE_DIR, "original_config") RECIPE_CONFIG_DIR = os.path.join(BASE_DIR, "recipes", "wisski_default_data_model", "config") def _clear_dir(path: str) -> int: """Remove all files and subdirectories inside path. Returns count of removed items.""" if not os.path.isdir(path): return 0 removedCount = 0 for entry in os.listdir(path): entryPath = os.path.join(path, entry) if os.path.isdir(entryPath): shutil.rmtree(entryPath) else: os.remove(entryPath) removedCount += 1 return removedCount def ingest_config(force: bool = False) -> None: """Extract a config*.tar.gz archive from ingest/ into original_config/. Skips extraction when YAML files are already present in original_config/, unless force is True. With --force, original_config/, new_recipe_config/, and changed_files/ are all cleared before re-extraction. """ os.makedirs(ORIGINAL_CONFIG_DIR, exist_ok=True) existingFiles = [f for f in os.listdir(ORIGINAL_CONFIG_DIR) if f.endswith(".yml")] if existingFiles and not force: print( f"original_config/ already contains {len(existingFiles)} YAML files. " "Use --force to re-extract." ) return archives = sorted(glob.glob(os.path.join(INGEST_DIR, "config*.tar.gz"))) if not archives: if not existingFiles: print("No config*.tar.gz found in ingest/ and original_config/ is empty.") return # Use the most recently named archive. archive = archives[-1] print(f"Extracting {os.path.basename(archive)} to original_config/...") if force: _clear_dir(ORIGINAL_CONFIG_DIR) newRecipeDir = os.path.join(BASE_DIR, "new_recipe_config") changedFilesDir = os.path.join(BASE_DIR, "changed_files") cleared = _clear_dir(newRecipeDir) if cleared: print(f"Cleared {cleared} items from new_recipe_config/.") cleared = _clear_dir(changedFilesDir) if cleared: print(f"Cleared {cleared} items from changed_files/.") extractedCount = 0 with tarfile.open(archive, "r:gz") as tar: for member in tar.getmembers(): if not (member.isfile() and member.name.endswith(".yml")): continue # Preserve subdirectory structure (e.g. language/de/foo.yml) but strip # any leading archive root component that is not part of the config layout. relPath = member.name.lstrip("./") destPath = os.path.join(ORIGINAL_CONFIG_DIR, relPath) os.makedirs(os.path.dirname(destPath), exist_ok=True) srcFile = tar.extractfile(member) if srcFile is not None: with open(destPath, "wb") as dst: dst.write(srcFile.read()) extractedCount += 1 print(f"Extracted {extractedCount} YAML files to original_config/.") class ConfigSelector: def __init__(self, json_path: str, original_config_path: str, config_path: str, old_recipe_path: str = None, changed_files_path: str = None, deleted_files_csv: str = None, ignored_files_yml: str = None): self.json_path = json_path self.original_config_path = original_config_path self.config_path = config_path self.old_recipe_path = old_recipe_path or os.path.join(BASE_DIR, "old_recipe_config") self.changed_files_path = changed_files_path or os.path.join(BASE_DIR, "changed_files") self.deleted_files_csv = deleted_files_csv or os.path.join(BASE_DIR, "deleted_files.csv") self.new_recipe_not_in_original_csv = os.path.join(BASE_DIR, "new_recipe_not_in_original.csv") self.changed_files_manifest_csv = os.path.join(BASE_DIR, "changed_files_manifest.csv") self.ignored_files_yml = ignored_files_yml or os.path.join(BASE_DIR, "ignored_files.yml") self.prefixes: List[str] = [] self.matched_files: Dict[str, List[str]] = {} self.unmatched_files: Dict[str, List[str]] = {} self.ignored_files: Set[str] = set() self.load_prefixes() self.load_ignored_files() self.scan_configs() def load_prefixes(self) -> None: """Load file prefixes from JSON file.""" try: with open(self.json_path, 'r') as f: data = json.load(f) if isinstance(data, list): self.prefixes = data elif isinstance(data, dict) and 'prefixes' in data: self.prefixes = data['prefixes'] else: print("Error: JSON file should contain a list of prefixes or a dict with 'prefixes' key") exit(1) except (json.JSONDecodeError, FileNotFoundError) as e: print(f"Error loading JSON file: {e}") exit(1) def load_ignored_files(self) -> None: """Load ignored file names from YAML file.""" self.ignored_files = set() if not os.path.exists(self.ignored_files_yml): return try: with open(self.ignored_files_yml, 'r') as f: data = yaml.safe_load(f) if data: # Iterate through all categories in the YAML. for category, info in data.items(): if isinstance(info, dict) and 'name' in info: name = info['name'] if not isinstance(name, str): continue self.ignored_files.add(name) # Basenames on disk always include .yml; allow omitting it in YAML. if not name.endswith('.yml'): self.ignored_files.add(f'{name}.yml') except (yaml.YAMLError, FileNotFoundError) as e: print(f"Warning: Error loading ignored files YAML: {e}") def save_prefixes(self) -> None: """Save current prefixes back to JSON file.""" try: with open(self.json_path, 'w') as f: json.dump({'prefixes': self.prefixes}, f, indent=2) print(f"Prefixes saved to {self.json_path}") except Exception as e: print(f"Error saving JSON file: {e}") def scan_configs(self) -> None: """Scan config directories and categorize files based on prefixes.""" self.matched_files = {prefix: [] for prefix in self.prefixes} self.unmatched_files = {} try: # Process all files in the directory for filename in os.listdir(self.original_config_path): if not filename.endswith('.yml'): continue # Standard prefix matching matched = False for prefix in self.prefixes: if filename.startswith(prefix): self.matched_files[prefix].append(filename) matched = True break # Add to unmatched if not matched by any prefix if not matched: # Get the first segment for initial grouping parts = filename.split('.') potential_prefix = parts[0] if potential_prefix not in self.unmatched_files: self.unmatched_files[potential_prefix] = [] self.unmatched_files[potential_prefix].append(filename) except FileNotFoundError: print(f"Error: Directory {self.original_config_path} not found") exit(1) @staticmethod def _strip_metadata(content: str) -> str: """Remove uuid and _core keys from Drupal YAML config content.""" content = re.sub(r'^uuid: [a-f0-9\-]+\n', '', content, flags=re.MULTILINE) # Match exactly '_core:' at the start of a line (avoids touching wisski_core). content = re.sub(r'^_core:\n([ \t]+[^\n]+\n)+', '', content, flags=re.MULTILINE) return content def _collect_bundle_hashes(self) -> Set[str]: """Return the set of bundle hashes found in matched wisski_core.wisski_bundle files.""" bundlePrefix = "wisski_core.wisski_bundle." bundleHashes: Set[str] = set() for prefix in self.prefixes: for filename in self.matched_files.get(prefix, []): if filename.startswith(bundlePrefix) and filename.endswith(".yml"): bundleHash = filename[len(bundlePrefix):-4] bundleHashes.add(bundleHash) return bundleHashes def _copy_file(self, src: str, dst: str) -> None: """Read src, strip metadata, write to dst. Falls back to raw copy on error.""" try: with open(src, 'r') as f: content = f.read() content = self._strip_metadata(content) with open(dst, 'w') as f: f.write(content) except Exception as e: print(f"Error processing {os.path.basename(src)}: {e}") shutil.copy2(src, dst) def _copy_with_language_overrides(self, filename: str) -> int: """Copy original_config/language/{lang}/{filename} to new_recipe_config/language/{lang}/ for every language that has an override for this file. Returns count of overrides copied.""" langBase = os.path.join(self.original_config_path, "language") if not os.path.isdir(langBase): return 0 overridesCopied = 0 for langCode in os.listdir(langBase): srcLangDir = os.path.join(langBase, langCode) if not os.path.isdir(srcLangDir): continue srcOverride = os.path.join(srcLangDir, filename) if not os.path.isfile(srcOverride): continue dstLangDir = os.path.join(self.config_path, "language", langCode) os.makedirs(dstLangDir, exist_ok=True) self._copy_file(srcOverride, os.path.join(dstLangDir, filename)) overridesCopied += 1 return overridesCopied def copy_matched_files(self) -> None: """Copy all matched files from original_config to config, removing UUID and _core structures.""" if not os.path.exists(self.config_path): os.makedirs(self.config_path) copiedCount = 0 processedCount = 0 overrideCopied = 0 for prefix in self.prefixes: for filename in self.matched_files[prefix]: src = os.path.join(self.original_config_path, filename) dst = os.path.join(self.config_path, filename) try: with open(src, 'r') as f: content = f.read() content = self._strip_metadata(content) with open(dst, 'w') as f: f.write(content) processedCount += 1 except Exception as e: print(f"Error processing {filename}: {e}") shutil.copy2(src, dst) copiedCount += 1 overrideCopied += self._copy_with_language_overrides(filename) # For every matched bundle, also copy its language.content_settings counterpart. langCopied = 0 for bundleHash in self._collect_bundle_hashes(): langFilename = f"language.content_settings.wisski_individual.{bundleHash}.yml" langSrc = os.path.join(self.original_config_path, langFilename) langDst = os.path.join(self.config_path, langFilename) if os.path.exists(langSrc): self._copy_file(langSrc, langDst) langCopied += 1 overrideCopied += self._copy_with_language_overrides(langFilename) print( f"Copied {copiedCount} files to {self.config_path} " f"({processedCount} processed to remove UUIDs and _core structures)" ) if langCopied: print(f"Also copied {langCopied} corresponding language.content_settings files.") if overrideCopied: print(f"Also copied {overrideCopied} language override files from original_config/language/.") pruned_export = self._prune_new_recipe_not_in_original() if pruned_export: print( f"Removed {pruned_export} file(s) from new_recipe_config that are not in original_config " f"(stale vs current export)." ) pruned = self._prune_stale_language_overrides() if pruned: print(f"Removed {pruned} stale language override file(s) (no longer in export or missing base config).") @staticmethod def _remove_empty_language_dirs(config_root: str) -> None: """Remove empty language// and language/ under config_root if possible.""" lang_root = os.path.join(config_root, "language") if not os.path.isdir(lang_root): return for lang_code in list(os.listdir(lang_root)): lang_dir = os.path.join(lang_root, lang_code) if not os.path.isdir(lang_dir): continue try: if not os.listdir(lang_dir): os.rmdir(lang_dir) except OSError: pass try: if not os.listdir(lang_root): os.rmdir(lang_root) except OSError: pass def _prune_new_recipe_not_in_original(self) -> int: """Delete YAML under config_path whose relative path is absent from original_config. Skips when original_config has no YAML (avoid wiping new_recipe before an export). """ orig_paths = self._collect_yaml_relpaths(self.original_config_path) if not orig_paths: return 0 new_paths = self._collect_yaml_relpaths(self.config_path) orphans = new_paths - orig_paths removed = 0 for rel in sorted(orphans): path = self._abs_config_path(self.config_path, rel) try: if os.path.isfile(path): os.remove(path) removed += 1 except OSError: pass self._remove_empty_language_dirs(self.config_path) return removed def _prune_stale_language_overrides(self) -> int: """Remove language/*/*.yml in config_path that should not be kept. Drops overrides that are missing from the current export (translation removed on the site) or whose base config is not present under config_path (orphans from earlier copies). Without this, compare would not list those as deleted. """ removed = 0 lang_dst_root = os.path.join(self.config_path, "language") if not os.path.isdir(lang_dst_root): return 0 lang_src_root = os.path.join(self.original_config_path, "language") for lang_code in list(os.listdir(lang_dst_root)): dst_lang_dir = os.path.join(lang_dst_root, lang_code) if not os.path.isdir(dst_lang_dir): continue src_lang_dir = os.path.join(lang_src_root, lang_code) for name in list(os.listdir(dst_lang_dir)): if not name.endswith(".yml"): continue base_fp = os.path.join(self.config_path, name) src_fp = os.path.join(src_lang_dir, name) if os.path.isfile(base_fp) and os.path.isfile(src_fp): continue try: os.remove(os.path.join(dst_lang_dir, name)) removed += 1 except OSError: pass self._remove_empty_language_dirs(self.config_path) return removed @staticmethod def _collect_yaml_relpaths(config_root: str) -> Set[str]: """Relative paths (posix-style) for YAML under a Drupal config tree. Includes top-level *.yml and language//*.yml (same layout as exports). """ rel_paths: Set[str] = set() if not os.path.isdir(config_root): return rel_paths try: for name in os.listdir(config_root): if name.endswith(".yml"): rel_paths.add(name) lang_root = os.path.join(config_root, "language") if os.path.isdir(lang_root): for lang_code in os.listdir(lang_root): lang_dir = os.path.join(lang_root, lang_code) if not os.path.isdir(lang_dir): continue for name in os.listdir(lang_dir): if name.endswith(".yml"): rel_paths.add(f"language/{lang_code}/{name}") except OSError: pass return rel_paths def _abs_config_path(self, config_root: str, rel_path: str) -> str: """Join config root with a posix rel_path from _collect_yaml_relpaths.""" return os.path.normpath(os.path.join(config_root, *rel_path.split("/"))) def compare_and_track_changes(self) -> Tuple[int, int, int, int, int, int]: """ Compare committed recipe config (old_recipe_path) to new_recipe_config (config_path). - Copies **new** YAML (in new_recipe but not in committed recipe) to changed_files/. - Copies **modified** YAML (same path, different content) to changed_files/. - Respects ignored_files.yml (basename) for both new and modified copies. - Writes deleted_files.csv: paths in committed recipe missing from new_recipe_config. - Writes changed_files_manifest.csv: relative_path + kind (new|modified) for each copy. - Writes new_recipe_not_in_original.csv for paths in new_recipe not in original export. Returns: Tuple of (added_count, modified_count, deleted_count, unchanged_count, language_deleted_count, not_in_original_count) """ _clear_dir(self.changed_files_path) os.makedirs(self.changed_files_path, exist_ok=True) # Track statistics added_count = 0 modified_count = 0 deleted_count = 0 unchanged_count = 0 ignored_count = 0 deleted_files_list = [] manifest_rows: List[Tuple[str, str]] = [] # Top-level and language//*.yml (matches Drupal export layout) old_files = self._collect_yaml_relpaths(self.old_recipe_path) new_files = self._collect_yaml_relpaths(self.config_path) # Deleted: in committed recipe but not in new_recipe_config deleted_files = old_files - new_files for rel_path in sorted(deleted_files): deleted_files_list.append(rel_path) deleted_count += 1 # Modified: same path in both, content differs common_files = old_files & new_files for rel_path in sorted(common_files): old_file_path = self._abs_config_path(self.old_recipe_path, rel_path) new_file_path = self._abs_config_path(self.config_path, rel_path) base_name = os.path.basename(rel_path) if filecmp.cmp(old_file_path, new_file_path, shallow=False): unchanged_count += 1 else: if base_name in self.ignored_files: ignored_count += 1 else: changed_dest = os.path.join(self.changed_files_path, *rel_path.split("/")) os.makedirs(os.path.dirname(changed_dest), exist_ok=True) shutil.copy2(new_file_path, changed_dest) modified_count += 1 manifest_rows.append((rel_path, "modified")) # New: in new_recipe (from original_config via prefixes) but not in committed recipe added_files = new_files - old_files for rel_path in sorted(added_files): new_file_path = self._abs_config_path(self.config_path, rel_path) base_name = os.path.basename(rel_path) if base_name in self.ignored_files: ignored_count += 1 else: changed_dest = os.path.join(self.changed_files_path, *rel_path.split("/")) os.makedirs(os.path.dirname(changed_dest), exist_ok=True) shutil.copy2(new_file_path, changed_dest) added_count += 1 manifest_rows.append((rel_path, "new")) with open(self.changed_files_manifest_csv, "w", newline="") as csvfile: writer = csv.writer(csvfile) writer.writerow(["relative_path", "kind"]) for rel_path, kind in sorted(manifest_rows, key=lambda x: (x[1], x[0])): writer.writerow([rel_path, kind]) # Write deleted files to CSV with open(self.deleted_files_csv, 'w', newline='') as csvfile: writer = csv.writer(csvfile) writer.writerow(['filename']) # Header for filename in deleted_files_list: writer.writerow([filename]) if ignored_count > 0: print( f"Ignored {ignored_count} new/modified file(s) as per ignored_files.yml " "(not copied to changed_files/)." ) total_copied = added_count + modified_count if total_copied: print( f"Copied {total_copied} file(s) to changed_files/ " f"({added_count} new vs committed recipe, {modified_count} modified)." ) language_deleted_count = sum(1 for p in deleted_files_list if p.startswith("language/")) if language_deleted_count: print( f"Deleted list includes {language_deleted_count} language override path(s) " f"(see deleted_files.csv under language//)." ) orig_paths = self._collect_yaml_relpaths(self.original_config_path) new_paths = self._collect_yaml_relpaths(self.config_path) not_in_original = sorted(new_paths - orig_paths) not_in_original_count = len(not_in_original) with open(self.new_recipe_not_in_original_csv, "w", newline="") as csvfile: writer = csv.writer(csvfile) writer.writerow(["filename"]) for rel_path in not_in_original: writer.writerow([rel_path]) if not_in_original_count: print( f"{not_in_original_count} path(s) in new_recipe_config are not in original_config " f"(see {os.path.basename(self.new_recipe_not_in_original_csv)})." ) elif not orig_paths: print("Warning: original_config has no YAML; orphan CSV is empty.") return ( added_count, modified_count, deleted_count, unchanged_count, language_deleted_count, not_in_original_count, ) def add_prefix(self, prefix: str) -> None: """Add a new prefix to the list if it doesn't exist.""" if prefix and prefix not in self.prefixes: self.prefixes.append(prefix) self.scan_configs() def remove_prefix(self, prefix: str) -> None: """Remove a prefix from the list if it exists.""" if prefix in self.prefixes: self.prefixes.remove(prefix) self.scan_configs() def get_summary(self) -> Tuple[int, int]: """Return the count of matched and unmatched files.""" matched_count = sum(len(files) for files in self.matched_files.values()) unmatched_count = sum(len(files) for files in self.unmatched_files.values()) return matched_count, unmatched_count def get_prefix_segments_at_level(all_prefixes: List[str], current_prefix: str, level: int, selector=None, view_mode=None) -> List[str]: """ Get unique prefix segments at the specified level. Args: all_prefixes: List of all prefixes current_prefix: Current prefix we're viewing (e.g., "field.storage") level: The level we want to see (0 = root level) selector: The ConfigSelector instance (needed for unmatched files) view_mode: Current view mode ("matched" or "unmatched") Returns: List of unique prefix segments at this level """ # Current prefix parts (e.g., ["field", "storage"]) current_parts = current_prefix.split('.') if current_prefix else [] unique_prefixes = set() # Special case for unmatched view with level > 0 if view_mode == "unmatched" and selector: # Need to scan actual filenames for deeper levels for prefix, filenames in selector.unmatched_files.items(): for filename in filenames: # Skip .yml extension for determining parts if filename.endswith('.yml'): filename = filename[:-4] parts = filename.split('.') # Skip if not enough parts for this level if len(parts) <= level: continue # Skip if prefix doesn't match current path if current_prefix: filename_prefix = '.'.join(parts[:len(current_parts)]) if filename_prefix != current_prefix: continue # Add the segment at this level prefix_at_level = '.'.join(parts[:level+1]) unique_prefixes.add(prefix_at_level) return sorted(list(unique_prefixes)) # Standard processing for matched view elif view_mode == "matched": for prefix in all_prefixes: # Split each prefix into parts parts = prefix.split('.') # Skip if not enough parts for the level if len(parts) <= level: continue # If we're looking at a nested level, only include prefixes that match the current path if current_prefix: # Skip if this prefix doesn't start with the current prefix if not prefix.startswith(current_prefix + '.') and prefix != current_prefix: continue # If current level is within current_prefix, it should match exactly if level < len(current_parts) and parts[level] != current_parts[level]: continue # Build the prefix up to this level prefix_at_level = '.'.join(parts[:level+1]) unique_prefixes.add(prefix_at_level) # Basic processing for all other cases else: for prefix in all_prefixes: # Split each prefix into parts parts = prefix.split('.') # Skip if not enough parts for the level if len(parts) <= level: continue # If we're looking at a nested level, only include prefixes that match the current path if current_prefix: # Skip if this prefix doesn't start with the current prefix if not prefix.startswith(current_prefix + '.') and prefix != current_prefix: continue # If current level is within current_prefix, it should match exactly if level < len(current_parts) and parts[level] != current_parts[level]: continue # Build the prefix up to this level prefix_at_level = '.'.join(parts[:level+1]) unique_prefixes.add(prefix_at_level) return sorted(list(unique_prefixes)) def get_files_for_segment(selector: ConfigSelector, segment: str, view_mode: str) -> List[str]: """Get files that match the given segment in the current view.""" files = [] if view_mode == "matched": # For matched files, we need to check if this segment is an exact prefix or a partial one if segment in selector.prefixes: # Exact match: include all files for this prefix files.extend(selector.matched_files[segment]) else: # Partial match: include files from all prefixes that start with this segment for prefix in selector.prefixes: if prefix == segment or prefix.startswith(segment + '.'): files.extend(selector.matched_files[prefix]) else: # For unmatched files, collect files that match this segment segment_parts = segment.split('.') segment_len = len(segment_parts) for prefix, file_list in selector.unmatched_files.items(): # For root level prefixes (without dots) if '.' not in prefix and segment_len == 1 and prefix == segment: files.extend(file_list) continue # For multi-level prefixes and filenames for filename in file_list: # Get filename without .yml extension for comparison filename_no_ext = filename if filename.endswith('.yml'): filename_no_ext = filename[:-4] filename_parts = filename_no_ext.split('.') # Check if this filename matches the segment up to the segment's length if len(filename_parts) >= segment_len: if '.'.join(filename_parts[:segment_len]) == segment: files.append(filename) return sorted(list(set(files))) def draw_menu(stdscr, selector: ConfigSelector, selected_idx: int, view_mode: str, current_prefix: str = "", level: int = 0): """Draw the TUI menu.""" curses.curs_set(0) stdscr.clear() h, w = stdscr.getmaxyx() # Colors curses.start_color() curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_BLUE) # Selected item curses.init_pair(2, curses.COLOR_GREEN, curses.COLOR_BLACK) # Matched curses.init_pair(3, curses.COLOR_RED, curses.COLOR_BLACK) # Unmatched curses.init_pair(4, curses.COLOR_YELLOW, curses.COLOR_BLACK) # Titles # Title title = "Drupal Configuration Selector" stdscr.addstr(0, (w - len(title)) // 2, title, curses.color_pair(4) | curses.A_BOLD) # Summary matched_count, unmatched_count = selector.get_summary() summary = f"Matched: {matched_count} files | Unmatched: {unmatched_count} files" stdscr.addstr(1, (w - len(summary)) // 2, summary) # Current path display if current_prefix: path_display = f"Current path: {current_prefix}" stdscr.addstr(2, 2, path_display) # Instructions stdscr.addstr(h-4, 2, "LEFT/RIGHT: Navigate tree levels | SPACE: Toggle view | ENTER: Select files") stdscr.addstr(h-3, 2, "A: Add prefix | O: Choose | D: Add/Del | C: Copy | M: Compare | S: Save | Q: Quit") # Content area start_y = 3 max_visible = h - 8 # Account for two instruction lines # Get segments at the current level if view_mode == "matched": all_prefixes = selector.prefixes else: all_prefixes = list(selector.unmatched_files.keys()) items = get_prefix_segments_at_level(all_prefixes, current_prefix, level, selector, view_mode) # Title based on current path if current_prefix: title = f"{view_mode.capitalize()} Prefixes under '{current_prefix}'" else: title = f"{view_mode.capitalize()} Prefixes (root level)" stdscr.addstr(start_y, 2, title, curses.color_pair(4) | curses.A_BOLD) start_y += 1 # Pagination if len(items) > max_visible: page_size = max_visible current_page = selected_idx // page_size start_idx = current_page * page_size end_idx = min(start_idx + page_size, len(items)) display_items = items[start_idx:end_idx] # Adjust selected_idx relative to the page relative_idx = selected_idx - start_idx else: display_items = items relative_idx = selected_idx if selected_idx < len(items) else 0 # Display items for i, item in enumerate(display_items): y = start_y + i # Get base name (last segment) item_base = item.split('.')[-1] # Check if there are child segments has_children = has_child_segments(item, view_mode, selector) # Get files for this item files = get_files_for_segment(selector, item, view_mode) file_count = len(files) # Format display text if has_children: item_text = f"{item_base} ({file_count} files) ▶" else: item_text = f"{item_base} ({file_count} files)" if i == relative_idx: stdscr.addstr(y, 2, item_text, curses.color_pair(1) | curses.A_BOLD) else: if view_mode == "matched": stdscr.addstr(y, 2, item_text, curses.color_pair(2)) else: stdscr.addstr(y, 2, item_text, curses.color_pair(3)) # Selected item details if items and len(items) > 0 and selected_idx < len(items): try: selected_item = items[selected_idx] detail_y = start_y + min(len(display_items), max_visible) + 1 if detail_y < h - 4: # Make sure we have room to display details stdscr.addstr(detail_y, 2, f"Files for '{selected_item}':", curses.A_BOLD) detail_y += 1 # Get files for the selected item files = get_files_for_segment(selector, selected_item, view_mode) max_files_to_show = h - detail_y - 4 if max_files_to_show > 0: sorted_files = sorted(files)[:max_files_to_show] for i, file in enumerate(sorted_files): if detail_y + i < h - 4: # Prevent writing outside window # Truncate filename if too long max_width = w - 6 # Allow for padding if len(file) > max_width: file_display = file[:max_width-3] + "..." else: file_display = file stdscr.addstr(detail_y + i, 4, file_display) if len(files) > max_files_to_show and detail_y + max_files_to_show < h - 4: stdscr.addstr(detail_y + max_files_to_show, 4, f"...and {len(files) - max_files_to_show} more") except Exception as e: # In case of error, just don't display details error_msg = f"Error displaying file details: {str(e)}" if h > 5: stdscr.addstr(h-4, 2, error_msg[:w-4]) stdscr.refresh() return len(items) def add_prefix_dialog(stdscr): """Show dialog to add a new prefix.""" h, w = stdscr.getmaxyx() # Safety checks for minimum terminal size min_h, min_w = 7, 20 if h < min_h or w < min_w: return "" # Terminal too small dialog_h, dialog_w = min(5, h - 2), min(50, w - 4) dialog_y = max(0, (h - dialog_h) // 2) dialog_x = max(0, (w - dialog_w) // 2) # Final safety check if dialog_h <= 0 or dialog_w <= 0 or dialog_y < 0 or dialog_x < 0: return "" # Draw dialog box dialog_win = curses.newwin(dialog_h, dialog_w, dialog_y, dialog_x) dialog_win.box() dialog_win.addstr(1, 2, "Enter new prefix (ESC or empty input to cancel):") dialog_win.refresh() # Input field - ensure it fits within the dialog input_w = max(1, dialog_w - 6) input_win = curses.newwin(1, input_w, dialog_y + 2, dialog_x + 3) input_win.clear() # Setup input mode curses.echo() curses.curs_set(1) input_win.keypad(True) # Enable special keys # Collect input character by character to handle escape key prefix = "" input_win.refresh() while True: try: key = input_win.getch() # Handle escape key if key == 27: # ASCII code for Escape prefix = "" break # Handle enter key elif key in (10, 13): # ASCII codes for Enter/Return break # Handle backspace/delete elif key in (8, 127, curses.KEY_BACKSPACE): if prefix: prefix = prefix[:-1] # Clear line and rewrite input_win.clear() input_win.addstr(0, 0, prefix) input_win.refresh() # Regular character elif 32 <= key <= 126: # Printable ASCII prefix += chr(key) except Exception: # In case of any error, just return empty string prefix = "" break # Restore normal cursor visibility and echo settings curses.noecho() curses.curs_set(0) return prefix.strip() def confirm_dialog(stdscr, message): """Show a confirmation dialog.""" h, w = stdscr.getmaxyx() # Safety checks for minimum terminal size min_h, min_w = 7, 20 if h < min_h or w < min_w: return False # Terminal too small, default to cancel dialog_h, dialog_w = min(5, h - 2), min(max(50, len(message) + 4), w - 4) dialog_y = max(0, (h - dialog_h) // 2) dialog_x = max(0, (w - dialog_w) // 2) # Final safety check if dialog_h <= 0 or dialog_w <= 0 or dialog_y < 0 or dialog_x < 0: return False # Draw dialog box dialog_win = curses.newwin(dialog_h, dialog_w, dialog_y, dialog_x) dialog_win.box() # Truncate message if it's too long for the dialog max_msg_len = dialog_w - 4 display_message = message[:max_msg_len] if len(message) > max_msg_len else message dialog_win.addstr(1, 2, display_message) dialog_win.addstr(3, 2, "Press Y to confirm, any other key to cancel") dialog_win.refresh() # Get input key = stdscr.getch() return key == ord('y') or key == ord('Y') def choose_prefix_dialog(stdscr, selector: ConfigSelector): """Show dialog to choose a prefix from unmatched prefixes.""" h, w = stdscr.getmaxyx() # Get sorted list of unmatched prefixes unmatched_prefixes = sorted(selector.unmatched_files.keys()) if not unmatched_prefixes: # Show message if no unmatched prefixes - ensure minimum window size msg_h = min(5, h - 2) msg_w = min(40, w - 4) if msg_h < 3 or msg_w < 10: return None # Terminal too small message_win = curses.newwin(msg_h, msg_w, max(0, (h - msg_h) // 2), max(0, (w - msg_w) // 2)) message_win.box() message_win.addstr(1, 2, "No unmatched prefixes available.") message_win.refresh() message_win.getch() return None # Calculate dialog dimensions with safety checks min_dialog_h = 6 # Minimum height for usable dialog min_dialog_w = 30 # Minimum width for usable dialog # Ensure we have enough terminal space if h < min_dialog_h + 2 or w < min_dialog_w + 4: return None # Terminal too small dialog_h = min(max(min_dialog_h, min(15, len(unmatched_prefixes) + 4)), h - 2) # +4 for title, instructions, and border dialog_w = min(max(min_dialog_w, 60), w - 2) dialog_y = max(0, (h - dialog_h) // 2) dialog_x = max(0, (w - dialog_w) // 2) # Final safety check if dialog_h <= 0 or dialog_w <= 0 or dialog_y < 0 or dialog_x < 0: return None # Draw dialog box dialog_win = curses.newwin(dialog_h, dialog_w, dialog_y, dialog_x) dialog_win.keypad(True) # Enable arrow keys # Initialize selection variables selected_idx = 0 current_page = 0 items_per_page = dialog_h - 4 # -4 for title, instructions, and border # Draw function def draw_prefix_list(): dialog_win.clear() dialog_win.box() dialog_win.addstr(1, 2, "Choose a prefix to add (ESC to cancel):") # Calculate page items start_idx = current_page * items_per_page end_idx = min(start_idx + items_per_page, len(unmatched_prefixes)) # Display prefixes with selection highlight for i, prefix in enumerate(unmatched_prefixes[start_idx:end_idx]): file_count = len(selector.unmatched_files[prefix]) item_text = f"{prefix} ({file_count} files)" if i + start_idx == selected_idx: dialog_win.addstr(i + 2, 4, item_text, curses.A_REVERSE) else: dialog_win.addstr(i + 2, 4, item_text) # Show pagination info if needed if len(unmatched_prefixes) > items_per_page: page_info = f"Page {current_page + 1}/{(len(unmatched_prefixes) - 1) // items_per_page + 1}" dialog_win.addstr(dialog_h - 1, dialog_w - len(page_info) - 2, page_info) dialog_win.refresh() # Main interaction loop while True: draw_prefix_list() key = dialog_win.getch() if key == 27: # ESC return None elif key == curses.KEY_UP and selected_idx > 0: selected_idx -= 1 if selected_idx < current_page * items_per_page: current_page -= 1 elif key == curses.KEY_DOWN and selected_idx < len(unmatched_prefixes) - 1: selected_idx += 1 if selected_idx >= (current_page + 1) * items_per_page: current_page += 1 elif key in (10, 13): # Enter return unmatched_prefixes[selected_idx] elif key == curses.KEY_NPAGE and current_page < (len(unmatched_prefixes) - 1) // items_per_page: # Page Down current_page += 1 selected_idx = min(selected_idx + items_per_page, len(unmatched_prefixes) - 1) elif key == curses.KEY_PPAGE and current_page > 0: # Page Up current_page -= 1 selected_idx = max(selected_idx - items_per_page, 0) def select_individual_files_dialog(stdscr, selector: ConfigSelector, prefix: str, view_mode: str): """Show dialog to select individual files from a prefix.""" h, w = stdscr.getmaxyx() # Get the list of files for this prefix if view_mode == "matched": files = selector.matched_files.get(prefix, []) title = f"Select files to remove from '{prefix}'" action_text = "SPACE: Toggle selection | ENTER: Confirm | ESC: Cancel" else: files = selector.unmatched_files.get(prefix, []) title = f"Select files to add from '{prefix}'" action_text = "SPACE: Toggle selection | ENTER: Add selected | ESC: Cancel" if not files: # Show message if no files - ensure minimum window size msg_h = min(5, h - 2) msg_w = min(40, w - 4) if msg_h < 3 or msg_w < 10: return [] # Terminal too small message_win = curses.newwin(msg_h, msg_w, max(0, (h - msg_h) // 2), max(0, (w - msg_w) // 2)) message_win.box() message_win.addstr(1, 2, "No files available.") message_win.refresh() message_win.getch() return [] # Sort files for consistent display files = sorted(files) # Track selected files selected_files = set() # Calculate dialog dimensions with safety checks min_dialog_h = 8 # Minimum height for usable dialog min_dialog_w = 30 # Minimum width for usable dialog # Ensure we have enough terminal space if h < min_dialog_h + 2 or w < min_dialog_w + 4: return [] # Terminal too small dialog_h = min(max(min_dialog_h, min(20, len(files) + 5)), h - 2) # +5 for title, instructions, pagination, and border dialog_w = min(max(min_dialog_w, min(w - 4, 80)), w - 2) dialog_y = max(0, (h - dialog_h) // 2) dialog_x = max(0, (w - dialog_w) // 2) # Final safety check if dialog_h <= 0 or dialog_w <= 0 or dialog_y < 0 or dialog_x < 0: return [] # Draw dialog box dialog_win = curses.newwin(dialog_h, dialog_w, dialog_y, dialog_x) dialog_win.keypad(True) # Enable arrow keys # Initialize selection variables selected_idx = 0 current_page = 0 items_per_page = dialog_h - 5 # -5 for title, instructions, actions, and border # Draw function def draw_file_list(): dialog_win.clear() dialog_win.box() dialog_win.addstr(1, 2, title, curses.A_BOLD) dialog_win.addstr(dialog_h - 2, 2, action_text) # Calculate page items start_idx = current_page * items_per_page end_idx = min(start_idx + items_per_page, len(files)) # Display files with selection highlight and checkbox for i, file in enumerate(files[start_idx:end_idx]): y = i + 3 # +3 for title, blank line, and border # Create checkbox display checkbox = "[X]" if file in selected_files else "[ ]" # Truncate filename if too long max_width = dialog_w - 8 # Allow for padding and checkbox if len(file) > max_width: file_display = file[:max_width-3] + "..." else: file_display = file item_text = f"{checkbox} {file_display}" if i + start_idx == selected_idx: dialog_win.addstr(y, 2, item_text, curses.A_REVERSE) else: dialog_win.addstr(y, 2, item_text) # Show pagination info if needed if len(files) > items_per_page: page_info = f"Page {current_page + 1}/{(len(files) - 1) // items_per_page + 1}" dialog_win.addstr(dialog_h - 3, dialog_w - len(page_info) - 2, page_info) dialog_win.refresh() # Main interaction loop while True: draw_file_list() key = dialog_win.getch() if key == 27: # ESC return [] elif key == curses.KEY_UP and selected_idx > 0: selected_idx -= 1 if selected_idx < current_page * items_per_page: current_page -= 1 elif key == curses.KEY_DOWN and selected_idx < len(files) - 1: selected_idx += 1 if selected_idx >= (current_page + 1) * items_per_page: current_page += 1 elif key == ord(' '): # SPACE to toggle selection current_file = files[selected_idx] if current_file in selected_files: selected_files.remove(current_file) else: selected_files.add(current_file) elif key in (10, 13): # Enter to confirm return list(selected_files) elif key == curses.KEY_NPAGE and current_page < (len(files) - 1) // items_per_page: # Page Down current_page += 1 selected_idx = min(selected_idx + items_per_page, len(files) - 1) elif key == curses.KEY_PPAGE and current_page > 0: # Page Up current_page -= 1 selected_idx = max(selected_idx - items_per_page, 0) def has_child_segments(item, mode, selector): """ Check if the given item has child segments. Args: item: Current prefix path (e.g., "field.storage") mode: View mode ("matched" or "unmatched") selector: The ConfigSelector instance Returns: True if child segments exist, False otherwise """ if mode == "matched": # For matched prefixes for prefix in selector.prefixes: if prefix.startswith(item + '.'): return True return False else: # For unmatched files, check if any filenames have one more segment selected_segments = item.split('.') segment_count = len(selected_segments) # Look through all files in all unmatched prefixes for prefix, file_list in selector.unmatched_files.items(): for filename in file_list: # Remove .yml extension for better segment counting if filename.endswith('.yml'): filename = filename[:-4] filename_parts = filename.split('.') # Check if this file has this prefix plus at least one more segment if (len(filename_parts) > segment_count and '.'.join(filename_parts[:segment_count]) == item): return True return False def preview_yaml_cleaning(stdscr, content): """Show a simplified schematic preview of the YAML cleaning process.""" h, w = stdscr.getmaxyx() # Safety checks for minimum terminal size min_h, min_w = 10, 30 if h < min_h or w < min_w: return # Terminal too small # Calculate preview window size preview_h = min(max(8, min(12, h - 8)), h - 2) # Smaller height preview_w = min(max(30, min(70, w - 4)), w - 2) preview_y = max(0, (h - preview_h) // 2) preview_x = max(0, (w - preview_w) // 2) # Final safety check if preview_h <= 0 or preview_w <= 0 or preview_y < 0 or preview_x < 0: return # Create window preview_win = curses.newwin(preview_h, preview_w, preview_y, preview_x) preview_win.box() preview_win.addstr(1, 2, "YAML Cleaning Preview:", curses.A_BOLD) # Create simple schematic example original = """langcode: en status: true uuid: cc09dc7f-ec98-4e4e-ae38-4fe7e8676aae dependencies: module: - node _core: default_config_hash: fUksROt4FfkAU9BV4hV2XvhTBSS2nTNrZS4U7S-tKrs id: example name: Example""" cleaned = """langcode: en status: true dependencies: module: - node id: example name: Example""" # Display side by side # Show original preview_win.addstr(3, 2, "Original:", curses.A_UNDERLINE) preview_win.addstr(4, 2, "uuid: cc09dc7f-ec98-4e4e-ae38-4fe7e8676aae", curses.A_BOLD) preview_win.addstr(5, 2, "_core:", curses.A_BOLD) preview_win.addstr(6, 2, " default_config_hash: fUksROt4F...", curses.A_BOLD) preview_win.addstr(7, 2, "other YAML content...") # Show arrow arrow_x = preview_w // 2 - 2 for i in range(3, preview_h - 3): preview_win.addstr(i, arrow_x, "→") # Show cleaned preview_win.addstr(3, arrow_x + 4, "Cleaned:", curses.A_UNDERLINE) preview_win.addstr(5, arrow_x + 4, "UUID and _core removed") preview_win.addstr(7, arrow_x + 4, "other YAML content preserved") # Instructions preview_win.addstr(preview_h - 2, 2, "Press any key to continue...", curses.A_DIM) preview_win.refresh() # Wait for key stdscr.getch() def confirm_yaml_cleaning(stdscr, selector): """Show a confirmation dialog for YAML cleaning during copy.""" h, w = stdscr.getmaxyx() # Get a sample file to preview sample_file = None for prefix in selector.prefixes: if selector.matched_files[prefix]: sample_file = os.path.join(selector.original_config_path, selector.matched_files[prefix][0]) break if sample_file and os.path.exists(sample_file): try: with open(sample_file, 'r') as file: content = file.read() preview_yaml_cleaning(stdscr, content) except Exception as e: # In case of error, just show a simple message error_msg = f"Error reading sample file: {str(e)}" # Safety checks for window creation if h >= 5 and w >= 20: msg_h = min(3, h - 2) msg_w = min(len(error_msg) + 4, w - 4) if msg_w < 10: msg_w = min(20, w - 4) msg_y = max(0, (h - msg_h) // 2) msg_x = max(0, (w - msg_w) // 2) if msg_h > 0 and msg_w > 0 and msg_y >= 0 and msg_x >= 0: message_win = curses.newwin(msg_h, msg_w, msg_y, msg_x) message_win.box() display_msg = error_msg[:msg_w-4] if len(error_msg) > msg_w-4 else error_msg message_win.addstr(1, 2, display_msg) message_win.refresh() message_win.getch() return confirm_dialog(stdscr, "Copy all matched files to config directory and remove UUID and _core?") def show_comparison_results( stdscr, added_count: int, modified_count: int, deleted_count: int, unchanged_count: int, language_deleted_count: int = 0, not_in_original_count: int = 0, ): """Show the results of file comparison.""" h, w = stdscr.getmaxyx() dialog_w = 60 total_out = added_count + modified_count # Rows from 3: 5 stats + optional notes + blank + 2 CSV hints; footer at dialog_h - 2 lines = ( 5 + (1 if not_in_original_count else 0) + (1 if language_deleted_count else 0) + 1 + 2 ) last_content_row = 2 + lines dialog_h = min(h - 2, max(14, last_content_row + 3)) dialog_y = (h - dialog_h) // 2 dialog_x = (w - dialog_w) // 2 # Draw dialog box dialog_win = curses.newwin(dialog_h, dialog_w, dialog_y, dialog_x) dialog_win.box() dialog_win.addstr(1, 2, "File Comparison Results", curses.A_BOLD) row = 3 dialog_win.addstr(row, 4, f"New (not in committed recipe): {added_count}"[: dialog_w - 6]) row += 1 dialog_win.addstr(row, 4, f"Modified vs committed: {modified_count}"[: dialog_w - 6]) row += 1 dialog_win.addstr(row, 4, f"Total → changed_files/: {total_out}"[: dialog_w - 6]) row += 1 deleted_line = f"Deleted (recipe, missing in new): {deleted_count}" if language_deleted_count: deleted_line += f" ({language_deleted_count} language/)" dialog_win.addstr(row, 4, deleted_line[: dialog_w - 6]) row += 1 dialog_win.addstr(row, 4, f"Unchanged: {unchanged_count}"[: dialog_w - 6]) row += 1 if not_in_original_count: dialog_win.addstr( row, 4, f"Not in original export: {not_in_original_count} (new_recipe_not_in_original.csv)"[ : dialog_w - 6 ], ) row += 1 if language_deleted_count: dialog_win.addstr(row, 4, "Language deletions: paths under language/… in deleted CSV") row += 1 row += 1 dialog_win.addstr(row, 4, "Manifest → changed_files_manifest.csv") row += 1 dialog_win.addstr(row, 4, "Deletions → deleted_files.csv") dialog_win.addstr(dialog_h - 2, 2, "Press any key to continue...") dialog_win.refresh() # Wait for key stdscr.getch() def main(stdscr): # Set paths relative to current directory. jsonPath = os.path.join(BASE_DIR, "config_prefixes.json") configPath = os.path.join(BASE_DIR, "new_recipe_config") # Create directories if they don't exist. os.makedirs(ORIGINAL_CONFIG_DIR, exist_ok=True) os.makedirs(configPath, exist_ok=True) # Check if we need to create default JSON file. if not os.path.exists(jsonPath): with open(jsonPath, "w") as f: json.dump({"prefixes": ["put.your.prefixes.here"]}, f, indent=2) print(f"Created default {jsonPath}") # Initialize selector. selector = ConfigSelector( json_path=jsonPath, original_config_path=ORIGINAL_CONFIG_DIR, config_path=configPath, old_recipe_path=RECIPE_CONFIG_DIR, ) # Menu state selected_idx = 0 view_mode = "matched" # "matched" or "unmatched" current_prefix = "" # Current prefix path, e.g. "field.storage" level = 0 # Current hierarchical level # Navigation history for going back path_history = [] # Main loop while True: # Get segments at the current level for the current view all_prefixes = selector.prefixes if view_mode == "matched" else list(selector.unmatched_files.keys()) items = get_prefix_segments_at_level(all_prefixes, current_prefix, level, selector, view_mode) # Draw the menu item_count = draw_menu(stdscr, selector, selected_idx, view_mode, current_prefix, level) if item_count == 0: key = stdscr.getch() if key == ord('q') or key == ord('Q'): break elif key == ord(' '): view_mode = "unmatched" if view_mode == "matched" else "matched" selected_idx = 0 # Reset navigation when toggling view current_prefix = "" level = 0 path_history = [] elif key == ord('a') or key == ord('A'): new_prefix = add_prefix_dialog(stdscr) if new_prefix: selector.add_prefix(new_prefix) elif key == ord('o') or key == ord('O'): new_prefix = choose_prefix_dialog(stdscr, selector) if new_prefix: selector.add_prefix(new_prefix) elif key == curses.KEY_LEFT and (current_prefix or level > 0): # Go back one level if level > 0: level -= 1 if current_prefix: # Remove last segment parts = current_prefix.split('.') current_prefix = '.'.join(parts[:-1]) selected_idx = 0 elif path_history: # Pop from history prev_prefix, prev_level, prev_idx = path_history.pop() current_prefix = prev_prefix level = prev_level selected_idx = prev_idx continue key = stdscr.getch() if key == ord('q') or key == ord('Q'): break elif key == curses.KEY_UP and selected_idx > 0: selected_idx -= 1 elif key == curses.KEY_DOWN and selected_idx < item_count - 1: selected_idx += 1 elif key == ord(' '): view_mode = "unmatched" if view_mode == "matched" else "matched" selected_idx = 0 # Reset navigation when toggling view current_prefix = "" level = 0 path_history = [] elif key == curses.KEY_RIGHT: # Drill down into the selected item if selected_idx < len(items): selected_item = items[selected_idx] # Check if there are child segments if has_child_segments(selected_item, view_mode, selector): # Save current state to history path_history.append((current_prefix, level, selected_idx)) # Update current prefix and level current_prefix = selected_item level += 1 selected_idx = 0 elif key == curses.KEY_LEFT and (current_prefix or level > 0): # Go back one level if level > 0: level -= 1 if current_prefix: # Remove last segment parts = current_prefix.split('.') current_prefix = '.'.join(parts[:-1]) selected_idx = 0 elif path_history: # Pop from history prev_prefix, prev_level, prev_idx = path_history.pop() current_prefix = prev_prefix level = prev_level selected_idx = prev_idx elif key == ord('a') or key == ord('A'): new_prefix = add_prefix_dialog(stdscr) if new_prefix: selector.add_prefix(new_prefix) elif key == ord('o') or key == ord('O'): new_prefix = choose_prefix_dialog(stdscr, selector) if new_prefix: selector.add_prefix(new_prefix) elif key == ord('d') or key == ord('D'): if view_mode == "matched" and len(items) > 0 and selected_idx < len(items): selected_item = items[selected_idx] # Only allow deleting complete prefixes if selected_item in selector.prefixes: if confirm_dialog(stdscr, f"Remove prefix '{selected_item}'?"): selector.remove_prefix(selected_item) if selected_idx >= len(items) - 1: selected_idx = max(0, len(items) - 1) else: # Allow adding the current selection (partial prefix) directly if confirm_dialog(stdscr, f"Add current path '{selected_item}' as a matched prefix?"): selector.add_prefix(selected_item) view_mode = "matched" current_prefix = "" level = 0 path_history = [] selected_idx = 0 elif view_mode == "unmatched" and len(items) > 0 and selected_idx < len(items): # Add the item to matched prefixes selected_item = items[selected_idx] if confirm_dialog(stdscr, f"Add '{selected_item}' to matched prefixes?"): selector.add_prefix(selected_item) view_mode = "matched" current_prefix = "" level = 0 path_history = [] selected_idx = 0 elif key == ord('c') or key == ord('C'): if confirm_yaml_cleaning(stdscr, selector): selector.copy_matched_files() elif key == ord('m') or key == ord('M'): if confirm_dialog(stdscr, "Compare old and new recipe configs and track changes?"): ( added_count, modified_count, deleted_count, unchanged_count, language_deleted_count, not_in_original_count, ) = selector.compare_and_track_changes() show_comparison_results( stdscr, added_count, modified_count, deleted_count, unchanged_count, language_deleted_count, not_in_original_count, ) elif key == ord('s') or key == ord('S'): selector.save_prefixes() elif key in (10, 13): # ENTER key to select individual files if len(items) > 0 and selected_idx < len(items): selected_item = items[selected_idx] files = get_files_for_segment(selector, selected_item, view_mode) if files: if view_mode == "matched": # Check if this is a complete prefix in matched if selected_item in selector.prefixes: selected_files = select_individual_files_dialog(stdscr, selector, selected_item, view_mode) if selected_files and confirm_dialog(stdscr, f"Remove {len(selected_files)} files from matched?"): # Remove individual files from matched_files for file in selected_files: if file in selector.matched_files[selected_item]: selector.matched_files[selected_item].remove(file) else: # Show message that you can only select files from complete prefixes h, w = stdscr.getmaxyx() # Safety checks for window creation if h >= 5 and w >= 30: msg_h = min(3, h - 2) msg_w = min(60, w - 4) if msg_w < 20: msg_w = min(30, w - 4) msg_y = max(0, (h - msg_h) // 2) msg_x = max(0, (w - msg_w) // 2) if msg_h > 0 and msg_w > 0 and msg_y >= 0 and msg_x >= 0: message_win = curses.newwin(msg_h, msg_w, msg_y, msg_x) message_win.box() message_win.addstr(1, 2, "Can only select files from complete prefixes.") message_win.refresh() message_win.getch() else: # Unmatched view # Find a matching prefix to add files to target_prefix = None for prefix in selector.prefixes: if selected_item.startswith(prefix) or prefix.startswith(selected_item): target_prefix = prefix break if not target_prefix: if confirm_dialog(stdscr, f"Add prefix '{selected_item}' to matched prefixes?"): selector.add_prefix(selected_item) target_prefix = selected_item if target_prefix: file_selection = select_individual_files_dialog(stdscr, selector, selected_item, view_mode) if file_selection: # Add individual files to matched_files for file in file_selection: if file not in selector.matched_files[target_prefix]: selector.matched_files[target_prefix].append(file) if __name__ == "__main__": forceIngest = "--force" in sys.argv ingest_config(force=forceIngest) try: curses.wrapper(main) except KeyboardInterrupt: print("Program terminated by user")