#!/usr/bin/env python3
import os
import json
import shutil
import curses
import re
import csv
import filecmp
from typing import List, Dict, Set, Tuple, Optional

try:
    import yaml
except ImportError:
    # PyYAML is only needed to read the optional ignored_files.yml; the
    # selector itself works without it (see load_ignored_files).
    yaml = None

# Use current directory instead of hardcoded path
BASE_DIR = os.getcwd()


class ConfigSelector:
    """Sort ``.yml`` config files into prefix buckets and copy/compare them.

    The selector reads a list of filename prefixes from a JSON file, scans
    ``original_config_path`` and files every ``.yml`` name either under the
    first prefix it starts with (``matched_files``) or under its first
    dot-separated segment (``unmatched_files``).

    Args:
        json_path: JSON file holding a list of prefixes or ``{"prefixes": [...]}``.
        original_config_path: Directory that is scanned for ``.yml`` files.
        config_path: Destination directory for ``copy_matched_files``.
        old_recipe_path: Directory compared against ``config_path``;
            defaults to ``<BASE_DIR>/old_recipe_config``.
        changed_files_path: Directory receiving changed files;
            defaults to ``<BASE_DIR>/changed_files``.
        deleted_files_csv: CSV listing deleted files;
            defaults to ``<BASE_DIR>/deleted_files.csv``.
        ignored_files_yml: Optional YAML file naming files to ignore;
            defaults to ``<BASE_DIR>/ignored_files.yml``.

    Raises:
        SystemExit: If the JSON file is missing/invalid or the source
            directory does not exist.
    """

    # Top-level ``uuid: <hex-and-dashes>`` line.  ``$\n?`` (instead of a
    # mandatory ``\n``) also matches when it is the last line of the file.
    _UUID_LINE = re.compile(r'^uuid: [a-f0-9\-]+$\n?', re.MULTILINE)
    # Exactly ``_core:`` at column 0 (so ``wisski_core:`` is untouched) plus
    # all following indented lines; the final line may lack a newline.
    _CORE_BLOCK = re.compile(r'^_core:\n(?:[ \t]+[^\n]+(?:\n|\Z))+', re.MULTILINE)

    def __init__(self, json_path: str, original_config_path: str, config_path: str,
                 old_recipe_path: Optional[str] = None,
                 changed_files_path: Optional[str] = None,
                 deleted_files_csv: Optional[str] = None,
                 ignored_files_yml: Optional[str] = None):
        self.json_path = json_path
        self.original_config_path = original_config_path
        self.config_path = config_path
        self.old_recipe_path = old_recipe_path or os.path.join(BASE_DIR, "old_recipe_config")
        self.changed_files_path = changed_files_path or os.path.join(BASE_DIR, "changed_files")
        self.deleted_files_csv = deleted_files_csv or os.path.join(BASE_DIR, "deleted_files.csv")
        self.ignored_files_yml = ignored_files_yml or os.path.join(BASE_DIR, "ignored_files.yml")
        self.prefixes: List[str] = []
        self.matched_files: Dict[str, List[str]] = {}
        self.unmatched_files: Dict[str, List[str]] = {}
        self.ignored_files: Set[str] = set()
        self.load_prefixes()
        self.load_ignored_files()
        self.scan_configs()

    def load_prefixes(self) -> None:
        """Load file prefixes from the JSON file; exit with status 1 on failure."""
        try:
            with open(self.json_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (json.JSONDecodeError, FileNotFoundError) as e:
            print(f"Error loading JSON file: {e}")
            raise SystemExit(1)

        if isinstance(data, list):
            self.prefixes = data
        elif isinstance(data, dict) and 'prefixes' in data:
            self.prefixes = data['prefixes']
        else:
            print("Error: JSON file should contain a list of prefixes or a dict with 'prefixes' key")
            raise SystemExit(1)

    def load_ignored_files(self) -> None:
        """Load ignored file names from the YAML file, if it exists.

        The document is expected to be a mapping whose values are mappings
        with a ``name`` key.  Anything else is skipped with a warning
        instead of aborting start-up.
        """
        self.ignored_files = set()
        if not os.path.exists(self.ignored_files_yml):
            return
        if yaml is None:
            print(f"Warning: PyYAML is not installed; ignoring {self.ignored_files_yml}")
            return

        try:
            with open(self.ignored_files_yml, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
        except (yaml.YAMLError, OSError) as e:
            print(f"Warning: Error loading ignored files YAML: {e}")
            return

        if not isinstance(data, dict):
            # Empty file (None) is fine; a list/scalar has no categories.
            if data:
                print("Warning: ignored files YAML should contain a mapping of categories")
            return

        # Iterate through all categories in the YAML.
        for info in data.values():
            if isinstance(info, dict) and isinstance(info.get('name'), str):
                self.ignored_files.add(info['name'])

    def save_prefixes(self) -> None:
        """Save current prefixes back to the JSON file."""
        try:
            # Serialise first so a failure cannot leave a truncated file.
            payload = json.dumps({'prefixes': self.prefixes}, indent=2)
            with open(self.json_path, 'w', encoding='utf-8') as f:
                f.write(payload)
            print(f"Prefixes saved to {self.json_path}")
        except (OSError, TypeError, ValueError) as e:
            print(f"Error saving JSON file: {e}")

    def scan_configs(self) -> None:
        """Scan the source directory and categorize ``.yml`` files by prefix.

        A file belongs to the first prefix (in list order) that its name
        starts with; otherwise it is grouped under its first dot-separated
        segment in ``unmatched_files``.
        """
        self.matched_files = {prefix: [] for prefix in self.prefixes}
        self.unmatched_files = {}
        try:
            filenames = sorted(os.listdir(self.original_config_path))
        except FileNotFoundError:
            print(f"Error: Directory {self.original_config_path} not found")
            raise SystemExit(1)

        for filename in filenames:
            if not filename.endswith('.yml'):
                continue
            owner = next((p for p in self.prefixes if filename.startswith(p)), None)
            if owner is not None:
                self.matched_files[owner].append(filename)
            else:
                # First segment is the initial grouping key.
                self.unmatched_files.setdefault(filename.split('.')[0], []).append(filename)

    def copy_matched_files(self) -> None:
        """Copy all matched files to ``config_path``, stripping UUID and _core.

        Files that cannot be read/decoded/written as text are copied
        verbatim instead; a file that cannot be copied at all is reported
        and skipped.
        """
        os.makedirs(self.config_path, exist_ok=True)
        copied_count = 0
        processed_count = 0
        # Iterate the buckets (not self.prefixes) so a prefix listed twice
        # in the JSON file is not copied and counted twice.
        for filenames in self.matched_files.values():
            for filename in filenames:
                src = os.path.join(self.original_config_path, filename)
                dst = os.path.join(self.config_path, filename)
                try:
                    with open(src, 'r', encoding='utf-8') as file:
                        content = file.read()
                    content = self._UUID_LINE.sub('', content)
                    content = self._CORE_BLOCK.sub('', content)
                    with open(dst, 'w', encoding='utf-8') as file:
                        file.write(content)
                    processed_count += 1
                except (OSError, UnicodeError) as e:
                    print(f"Error processing {filename}: {e}")
                    # Fall back to direct copy if processing fails
                    try:
                        shutil.copy2(src, dst)
                    except OSError as copy_error:
                        print(f"Error copying {filename}: {copy_error}")
                        continue
                copied_count += 1
        print(f"Copied {copied_count} files to {self.config_path} ({processed_count} files were processed to remove UUIDs and _core structures)")

    @staticmethod
    def _list_yml(directory: str) -> Set[str]:
        """Return the ``.yml`` names in *directory* (empty set if it is missing)."""
        if not os.path.exists(directory):
            return set()
        return {name for name in os.listdir(directory) if name.endswith('.yml')}

    def compare_and_track_changes(self) -> Tuple[int, int, int]:
        """Compare ``old_recipe_path`` against ``config_path``.

        Changed files (unless listed in ``ignored_files``) are copied to
        ``changed_files_path``; names present only in the old directory are
        written to ``deleted_files_csv``.

        Returns:
            Tuple of (changed_count, deleted_count, unchanged_count).
            Ignored changed files are counted in none of the three.
        """
        os.makedirs(self.changed_files_path, exist_ok=True)

        old_files = self._list_yml(self.old_recipe_path)
        new_files = self._list_yml(self.config_path)
        deleted_files = sorted(old_files - new_files)

        changed_count = 0
        unchanged_count = 0
        ignored_count = 0
        for filename in sorted(old_files & new_files):
            old_file_path = os.path.join(self.old_recipe_path, filename)
            new_file_path = os.path.join(self.config_path, filename)
            if filecmp.cmp(old_file_path, new_file_path, shallow=False):
                unchanged_count += 1
            elif filename in self.ignored_files:
                ignored_count += 1
            else:
                shutil.copy2(new_file_path, os.path.join(self.changed_files_path, filename))
                changed_count += 1

        with open(self.deleted_files_csv, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(['filename'])  # Header
            writer.writerows([name] for name in deleted_files)

        if ignored_count > 0:
            print(f"Ignored {ignored_count} changed file(s) as per ignored_files.yml")
        return changed_count, len(deleted_files), unchanged_count

    def add_prefix(self, prefix: str) -> None:
        """Add a new non-empty prefix if it doesn't exist, then rescan."""
        if prefix and prefix not in self.prefixes:
            self.prefixes.append(prefix)
            self.scan_configs()

    def remove_prefix(self, prefix: str) -> None:
        """Remove a prefix from the list if it exists, then rescan."""
        if prefix in self.prefixes:
            self.prefixes.remove(prefix)
            self.scan_configs()

    def get_summary(self) -> Tuple[int, int]:
        """Return the count of matched and unmatched files."""
        matched_count = sum(len(files) for files in self.matched_files.values())
        unmatched_count = sum(len(files) for files in self.unmatched_files.values())
        return matched_count, unmatched_count
def get_prefix_segments_at_level(all_prefixes: List[str], current_prefix: str, level: int,
                                 selector=None, view_mode=None) -> List[str]:
    """Get the unique dotted prefix paths that exist at the given depth.

    Args:
        all_prefixes: Prefixes to inspect (used for every view except
            "unmatched" with a selector).
        current_prefix: Path currently being viewed (e.g. "field.storage");
            empty string for the root.
        level: Zero-based depth whose segments are wanted (0 = root level).
        selector: Object exposing ``unmatched_files`` (dict of file lists);
            only read in the unmatched view.
        view_mode: "unmatched" derives segments from the unmatched file
            names at every level; any other value uses ``all_prefixes``.

    Returns:
        Sorted list of paths truncated to ``level + 1`` segments that lie
        under ``current_prefix``.
    """
    current_parts = current_prefix.split('.') if current_prefix else []
    segments = set()

    if view_mode == "unmatched" and selector:
        # Unmatched buckets are keyed by first segment only, so deeper
        # levels have to be derived from the file names themselves.
        for filenames in selector.unmatched_files.values():
            for filename in filenames:
                stem = filename[:-4] if filename.endswith('.yml') else filename
                parts = stem.split('.')
                if len(parts) <= level:
                    continue
                if current_prefix and '.'.join(parts[:len(current_parts)]) != current_prefix:
                    continue
                segments.add('.'.join(parts[:level + 1]))
        return sorted(segments)

    for prefix in all_prefixes:
        parts = prefix.split('.')
        if len(parts) <= level:
            continue
        if current_prefix:
            # Only prefixes equal to, or nested below, the current path.
            if prefix != current_prefix and not prefix.startswith(current_prefix + '.'):
                continue
            # Inside the current path the segment must match exactly.
            if level < len(current_parts) and parts[level] != current_parts[level]:
                continue
        segments.add('.'.join(parts[:level + 1]))
    return sorted(segments)
def get_files_for_segment(selector: "ConfigSelector", segment: str, view_mode: str) -> List[str]:
    """Get the sorted, de-duplicated files shown for ``segment`` in a view.

    Args:
        selector: Object exposing ``prefixes``, ``matched_files`` and
            ``unmatched_files``.
        segment: Dotted path such as "field" or "field.storage".
        view_mode: "matched" reads the prefix buckets; any other value
            reads the unmatched buckets.

    Returns:
        Matched view: the files of ``segment`` itself when it is a
        configured prefix, otherwise the files of every configured prefix
        nested below it.  Unmatched view: every unmatched file whose
        leading dot-separated segments (``.yml`` removed) equal ``segment``.
    """
    if view_mode == "matched":
        if segment in selector.prefixes:
            # Exact prefix: only the files filed under that prefix.
            return sorted(set(selector.matched_files[segment]))
        child_marker = segment + '.'
        return sorted({
            filename
            for prefix in selector.prefixes
            if prefix.startswith(child_marker)
            for filename in selector.matched_files[prefix]
        })

    wanted = segment.split('.')
    depth = len(wanted)
    found = set()
    for file_list in selector.unmatched_files.values():
        for filename in file_list:
            stem = filename[:-4] if filename.endswith('.yml') else filename
            # A shorter name yields a shorter slice and cannot compare equal.
            if stem.split('.')[:depth] == wanted:
                found.add(filename)
    return sorted(found)
def draw_menu(stdscr, selector: "ConfigSelector", selected_idx: int, view_mode: str,
              current_prefix: str = "", level: int = 0):
    """Draw the TUI menu for the current view and navigation position.

    Args:
        stdscr: The curses screen to draw on.
        selector: Source of prefixes, file buckets and summary counts.
        selected_idx: Index (into the full item list) of the highlighted item.
        view_mode: "matched" or "unmatched".
        current_prefix: Dotted path currently being browsed ("" for root).
        level: Depth of the segments to list (0 = root level).

    Returns:
        The total number of items at this level (not just the visible page).
    """
    h, w = stdscr.getmaxyx()

    def put(y, x, text, attr=curses.A_NORMAL, clip=True):
        """Write text without ever raising on a too-small terminal."""
        if not 0 <= y < h or x >= w:
            return
        x = max(0, x)
        if clip:
            text = text[:max(0, w - x - 1)]
        try:
            stdscr.addstr(y, x, text, attr)
        except curses.error:
            # Ran off the bottom-right corner; whatever fitted was drawn.
            pass

    try:
        curses.curs_set(0)
    except curses.error:
        pass  # terminal cannot hide the cursor
    stdscr.clear()

    # Colors
    curses.start_color()
    curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_BLUE)    # Selected item
    curses.init_pair(2, curses.COLOR_GREEN, curses.COLOR_BLACK)   # Matched
    curses.init_pair(3, curses.COLOR_RED, curses.COLOR_BLACK)     # Unmatched
    curses.init_pair(4, curses.COLOR_YELLOW, curses.COLOR_BLACK)  # Titles

    # Title
    title = "Drupal Configuration Selector"
    put(0, (w - len(title)) // 2, title, curses.color_pair(4) | curses.A_BOLD)

    # Summary
    matched_count, unmatched_count = selector.get_summary()
    summary = f"Matched: {matched_count} files | Unmatched: {unmatched_count} files"
    put(1, (w - len(summary)) // 2, summary)

    # Current path display
    if current_prefix:
        put(2, 2, f"Current path: {current_prefix}")

    # Instructions.  Not clipped: on narrow terminals they keep wrapping
    # onto the following line, as before, so no key hint is lost.
    put(h - 4, 2, "LEFT/RIGHT: Navigate tree levels | SPACE: Toggle view | ENTER: Select files", clip=False)
    put(h - 3, 2, "A: Add prefix | O: Choose | D: Add/Del | C: Copy | M: Compare | S: Save | Q: Quit", clip=False)

    # Content area; at least one row so paging never divides by zero.
    start_y = 3
    max_visible = max(1, h - 8)  # Account for two instruction lines

    # Get segments at the current level
    if view_mode == "matched":
        all_prefixes = selector.prefixes
    else:
        all_prefixes = list(selector.unmatched_files.keys())
    items = get_prefix_segments_at_level(all_prefixes, current_prefix, level, selector, view_mode)

    # Title based on current path
    if current_prefix:
        title = f"{view_mode.capitalize()} Prefixes under '{current_prefix}'"
    else:
        title = f"{view_mode.capitalize()} Prefixes (root level)"
    put(start_y, 2, title, curses.color_pair(4) | curses.A_BOLD)
    start_y += 1

    # Pagination: show the page that contains the selected item.
    if len(items) > max_visible:
        start_idx = (selected_idx // max_visible) * max_visible
        display_items = items[start_idx:start_idx + max_visible]
        relative_idx = selected_idx - start_idx
    else:
        display_items = items
        relative_idx = selected_idx if selected_idx < len(items) else 0

    normal_attr = curses.color_pair(2) if view_mode == "matched" else curses.color_pair(3)
    for row, item in enumerate(display_items):
        file_count = len(get_files_for_segment(selector, item, view_mode))
        # Last segment only; the arrow marks items that can be drilled into.
        marker = " ▶" if has_child_segments(item, view_mode, selector) else ""
        item_text = f"{item.split('.')[-1]} ({file_count} files){marker}"
        attr = curses.color_pair(1) | curses.A_BOLD if row == relative_idx else normal_attr
        put(start_y + row, 2, item_text, attr)

    # Selected item details
    if selected_idx < len(items):
        selected_item = items[selected_idx]
        detail_y = start_y + min(len(display_items), max_visible) + 1
        if detail_y < h - 4:  # Make sure we have room to display details
            put(detail_y, 2, f"Files for '{selected_item}':", curses.A_BOLD)
            detail_y += 1
            files = sorted(get_files_for_segment(selector, selected_item, view_mode))
            rows_available = h - 4 - detail_y
            if rows_available > 0:
                if len(files) > rows_available:
                    # Keep the last free row for the "...and N more" notice.
                    shown = files[:rows_available - 1]
                else:
                    shown = files
                max_width = w - 6  # Allow for padding
                for offset, name in enumerate(shown):
                    if len(name) > max_width:
                        name = name[:max_width - 3] + "..."
                    put(detail_y + offset, 4, name)
                hidden = len(files) - len(shown)
                if hidden:
                    put(detail_y + len(shown), 4, f"...and {hidden} more")

    stdscr.refresh()
    return len(items)
def add_prefix_dialog(stdscr):
    """Show a dialog asking for a new prefix.

    Args:
        stdscr: The curses screen; only used to read the terminal size.

    Returns:
        The entered text with surrounding whitespace stripped, or "" when
        the user pressed ESC, entered nothing, or the terminal is smaller
        than 7 rows by 20 columns.
    """
    h, w = stdscr.getmaxyx()
    if h < 7 or w < 20:
        return ""  # Terminal too small

    prompt = "Enter new prefix (ESC or empty input to cancel):"
    # Wide enough for the prompt plus border/padding where the terminal allows.
    dialog_h, dialog_w = 5, min(len(prompt) + 4, w - 4)
    dialog_y = (h - dialog_h) // 2
    dialog_x = (w - dialog_w) // 2

    dialog_win = curses.newwin(dialog_h, dialog_w, dialog_y, dialog_x)
    dialog_win.box()
    dialog_win.addstr(1, 2, prompt[:dialog_w - 4])
    dialog_win.refresh()

    # Input field - ensure it fits within the dialog
    input_w = max(1, dialog_w - 6)
    input_win = curses.newwin(1, input_w, dialog_y + 2, dialog_x + 3)
    input_win.keypad(True)  # Enable special keys
    # Writing into the last cell of a window raises curses.error, so only
    # the trailing `visible` characters of a long prefix are displayed.
    visible = max(1, input_w - 1)

    prefix = ""

    def redraw():
        input_win.erase()
        input_win.addstr(0, 0, prefix[-visible:])
        input_win.refresh()

    # The field is rendered by redraw(), so terminal echo stays off.
    curses.noecho()
    try:
        curses.curs_set(1)
    except curses.error:
        pass  # terminal cannot change cursor visibility

    try:
        redraw()
        while True:
            key = input_win.getch()
            if key == 27:  # ASCII code for Escape
                prefix = ""
                break
            if key in (10, 13, curses.KEY_ENTER):  # Enter/Return
                break
            if key in (8, 127, curses.KEY_BACKSPACE):
                prefix = prefix[:-1]
            elif 32 <= key <= 126:  # Printable ASCII
                prefix += chr(key)
            else:
                continue
            redraw()
    except curses.error:
        # Drawing/reading failed; treat it as a cancel.
        prefix = ""
    finally:
        # Restore normal cursor visibility and echo settings
        curses.noecho()
        try:
            curses.curs_set(0)
        except curses.error:
            pass
    return prefix.strip()
def confirm_dialog(stdscr, message):
    """Show a yes/no confirmation dialog.

    Args:
        stdscr: The curses screen; used for the terminal size and to read
            the answer key.
        message: Question to display (clipped to the dialog width).

    Returns:
        True only if the user pressed ``y`` or ``Y``.  False for any other
        key, or without asking when the terminal is smaller than 7 rows by
        20 columns.
    """
    h, w = stdscr.getmaxyx()
    if h < 7 or w < 20:
        return False  # Terminal too small, default to cancel

    dialog_h = 5
    dialog_w = min(max(50, len(message) + 4), w - 4)
    dialog_y = (h - dialog_h) // 2
    dialog_x = (w - dialog_w) // 2

    dialog_win = curses.newwin(dialog_h, dialog_w, dialog_y, dialog_x)
    dialog_win.box()

    # Both lines are clipped: unclipped text would wrap past the last row
    # of this 5-row window and raise curses.error on narrow terminals.
    text_w = dialog_w - 4
    dialog_win.addstr(1, 2, message[:text_w])
    dialog_win.addstr(3, 2, "Press Y to confirm, any other key to cancel"[:text_w])
    dialog_win.refresh()

    key = stdscr.getch()
    return key in (ord('y'), ord('Y'))
def choose_prefix_dialog(stdscr, selector: "ConfigSelector"):
    """Show a paged list of unmatched prefixes and let the user pick one.

    Args:
        stdscr: The curses screen; only used to read the terminal size.
        selector: Object whose ``unmatched_files`` keys are offered.

    Returns:
        The chosen prefix, or None if the user pressed ESC, there is
        nothing to choose from, or the terminal is too small.
    """
    h, w = stdscr.getmaxyx()
    unmatched_prefixes = sorted(selector.unmatched_files.keys())

    if not unmatched_prefixes:
        msg_h = min(5, h - 2)
        msg_w = min(40, w - 4)
        if msg_h < 3 or msg_w < 10:
            return None  # Terminal too small
        message_win = curses.newwin(msg_h, msg_w, max(0, (h - msg_h) // 2), max(0, (w - msg_w) // 2))
        message_win.box()
        message_win.addstr(1, 2, "No unmatched prefixes available."[:msg_w - 4])
        message_win.refresh()
        message_win.getch()
        return None

    min_dialog_h = 6   # Minimum height for usable dialog
    min_dialog_w = 30  # Minimum width for usable dialog
    if h < min_dialog_h + 2 or w < min_dialog_w + 4:
        return None  # Terminal too small

    # +4 for title, instructions, and border
    dialog_h = min(max(min_dialog_h, min(15, len(unmatched_prefixes) + 4)), h - 2)
    dialog_w = min(max(min_dialog_w, 60), w - 2)
    dialog_y = (h - dialog_h) // 2
    dialog_x = (w - dialog_w) // 2

    dialog_win = curses.newwin(dialog_h, dialog_w, dialog_y, dialog_x)
    dialog_win.keypad(True)  # Enable arrow keys

    items_per_page = dialog_h - 4  # -4 for title, instructions, and border
    last_idx = len(unmatched_prefixes) - 1
    last_page = last_idx // items_per_page
    selected_idx = 0

    def draw_prefix_list():
        # The visible page always follows the selection.
        current_page = selected_idx // items_per_page
        start_idx = current_page * items_per_page
        dialog_win.clear()
        dialog_win.box()
        dialog_win.addstr(1, 2, "Choose a prefix to add (ESC to cancel):"[:dialog_w - 4])
        page_items = unmatched_prefixes[start_idx:start_idx + items_per_page]
        for row, prefix in enumerate(page_items):
            file_count = len(selector.unmatched_files[prefix])
            # Clip so a long prefix cannot wrap onto the next row.
            item_text = f"{prefix} ({file_count} files)"[:dialog_w - 6]
            attr = curses.A_REVERSE if row + start_idx == selected_idx else curses.A_NORMAL
            dialog_win.addstr(row + 2, 4, item_text, attr)
        if len(unmatched_prefixes) > items_per_page:
            page_info = f"Page {current_page + 1}/{last_page + 1}"
            dialog_win.addstr(dialog_h - 1, dialog_w - len(page_info) - 2, page_info)
        dialog_win.refresh()

    while True:
        draw_prefix_list()
        key = dialog_win.getch()
        current_page = selected_idx // items_per_page
        if key == 27:  # ESC
            return None
        if key in (10, 13):  # Enter
            return unmatched_prefixes[selected_idx]
        if key == curses.KEY_UP and selected_idx > 0:
            selected_idx -= 1
        elif key == curses.KEY_DOWN and selected_idx < last_idx:
            selected_idx += 1
        elif key == curses.KEY_NPAGE and current_page < last_page:  # Page Down
            selected_idx = min(selected_idx + items_per_page, last_idx)
        elif key == curses.KEY_PPAGE and current_page > 0:  # Page Up
            selected_idx = max(selected_idx - items_per_page, 0)
def select_individual_files_dialog(stdscr, selector: "ConfigSelector", prefix: str, view_mode: str):
    """Show a paged checkbox list of the files filed under ``prefix``.

    Args:
        stdscr: The curses screen; only used to read the terminal size.
        selector: Object exposing ``matched_files`` and ``unmatched_files``.
        prefix: Exact bucket key whose files are listed.
        view_mode: "matched" lists ``matched_files[prefix]``; any other
            value lists ``unmatched_files[prefix]``.

    Returns:
        Sorted list of the ticked file names; an empty list if the user
        pressed ESC, the bucket is empty or unknown, or the terminal is
        too small.
    """
    h, w = stdscr.getmaxyx()

    if view_mode == "matched":
        files = selector.matched_files.get(prefix, [])
        title = f"Select files to remove from '{prefix}'"
        action_text = "SPACE: Toggle selection | ENTER: Confirm | ESC: Cancel"
    else:
        files = selector.unmatched_files.get(prefix, [])
        title = f"Select files to add from '{prefix}'"
        action_text = "SPACE: Toggle selection | ENTER: Add selected | ESC: Cancel"

    if not files:
        msg_h = min(5, h - 2)
        msg_w = min(40, w - 4)
        if msg_h < 3 or msg_w < 10:
            return []  # Terminal too small
        message_win = curses.newwin(msg_h, msg_w, max(0, (h - msg_h) // 2), max(0, (w - msg_w) // 2))
        message_win.box()
        message_win.addstr(1, 2, "No files available."[:msg_w - 4])
        message_win.refresh()
        message_win.getch()
        return []

    files = sorted(files)  # consistent display order
    selected_files = set()

    min_dialog_h = 8   # Minimum height for usable dialog
    min_dialog_w = 30  # Minimum width for usable dialog
    if h < min_dialog_h + 2 or w < min_dialog_w + 4:
        return []  # Terminal too small

    # +5 for title, blank line, action line and the two border rows
    dialog_h = min(max(min_dialog_h, min(20, len(files) + 5)), h - 2)
    dialog_w = min(max(min_dialog_w, min(w - 4, 80)), w - 2)
    dialog_y = (h - dialog_h) // 2
    dialog_x = (w - dialog_w) // 2

    dialog_win = curses.newwin(dialog_h, dialog_w, dialog_y, dialog_x)
    dialog_win.keypad(True)  # Enable arrow keys

    items_per_page = dialog_h - 5
    last_idx = len(files) - 1
    last_page = last_idx // items_per_page
    text_w = dialog_w - 4
    selected_idx = 0

    def draw_file_list():
        # The visible page always follows the cursor.
        current_page = selected_idx // items_per_page
        start_idx = current_page * items_per_page
        dialog_win.clear()
        dialog_win.box()
        # Clipped: unclipped text would wrap over the border and can raise
        # curses.error once it runs past the window's last row.
        dialog_win.addstr(1, 2, title[:text_w], curses.A_BOLD)
        dialog_win.addstr(dialog_h - 2, 2, action_text[:text_w])

        max_width = dialog_w - 8  # Allow for padding and checkbox
        for row, name in enumerate(files[start_idx:start_idx + items_per_page]):
            checkbox = "[X]" if name in selected_files else "[ ]"
            if len(name) > max_width:
                name = name[:max_width - 3] + "..."
            attr = curses.A_REVERSE if row + start_idx == selected_idx else curses.A_NORMAL
            dialog_win.addstr(row + 3, 2, f"{checkbox} {name}", attr)

        if len(files) > items_per_page:
            # Drawn on the bottom border (as choose_prefix_dialog does) so
            # it cannot overwrite the last file row.
            page_info = f"Page {current_page + 1}/{last_page + 1}"
            dialog_win.addstr(dialog_h - 1, dialog_w - len(page_info) - 2, page_info)
        dialog_win.refresh()

    while True:
        draw_file_list()
        key = dialog_win.getch()
        current_page = selected_idx // items_per_page
        if key == 27:  # ESC
            return []
        if key in (10, 13):  # Enter to confirm
            return sorted(selected_files)
        if key == curses.KEY_UP and selected_idx > 0:
            selected_idx -= 1
        elif key == curses.KEY_DOWN and selected_idx < last_idx:
            selected_idx += 1
        elif key == ord(' '):  # SPACE toggles the file under the cursor
            selected_files.symmetric_difference_update({files[selected_idx]})
        elif key == curses.KEY_NPAGE and current_page < last_page:  # Page Down
            selected_idx = min(selected_idx + items_per_page, last_idx)
        elif key == curses.KEY_PPAGE and current_page > 0:  # Page Up
            selected_idx = max(selected_idx - items_per_page, 0)
def has_child_segments(item, mode, selector):
    """Check whether anything is nested one or more levels below ``item``.

    Args:
        item: Dotted prefix path (e.g., "field.storage").
        mode: "matched" inspects ``selector.prefixes``; any other value
            inspects the file names in ``selector.unmatched_files``.
        selector: Object exposing ``prefixes`` and ``unmatched_files``.

    Returns:
        True if a configured prefix starts with ``item + '.'`` (matched),
        or an unmatched file name (``.yml`` removed) has ``item`` as its
        leading segments plus at least one more; False otherwise.
    """
    if mode == "matched":
        child_marker = item + '.'
        return any(prefix.startswith(child_marker) for prefix in selector.prefixes)

    wanted = item.split('.')
    depth = len(wanted)
    for file_list in selector.unmatched_files.values():
        for filename in file_list:
            stem = filename[:-4] if filename.endswith('.yml') else filename
            parts = stem.split('.')
            if len(parts) > depth and parts[:depth] == wanted:
                return True
    return False
def preview_yaml_cleaning(stdscr, content):
    """Show a fixed schematic of what the YAML cleaning step removes.

    The preview is illustrative only: it always shows the same example
    lines and does not render ``content``.

    Args:
        stdscr: The curses screen; used for the terminal size and to wait
            for a key press.
        content: Text of a sample file.  Accepted for the caller's
            convenience but not displayed.

    Returns:
        None.  Returns immediately when the terminal is smaller than
        10 rows by 30 columns.
    """
    h, w = stdscr.getmaxyx()
    if h < 10 or w < 30:
        return  # Terminal too small

    preview_h = min(max(8, min(12, h - 8)), h - 2)
    preview_w = min(max(30, min(70, w - 4)), w - 2)
    preview_y = (h - preview_h) // 2
    preview_x = (w - preview_w) // 2

    preview_win = curses.newwin(preview_h, preview_w, preview_y, preview_x)
    preview_win.box()

    footer_row = preview_h - 2
    arrow_x = preview_w // 2 - 2
    left_x, left_w = 2, arrow_x - 3           # stops one cell before the arrow
    right_x = arrow_x + 4
    right_w = preview_w - right_x - 2         # stops before the right border

    def clip(text, width):
        """Shorten text to width characters, marking the cut with '...'."""
        if len(text) <= width:
            return text
        return text[:width - 3] + "..." if width > 3 else text[:max(0, width)]

    def put(row, col, text, width, attr=curses.A_NORMAL):
        # Content rows must stay above the footer line; on short windows
        # the lower example rows are simply left out.
        if 0 < row < footer_row and width > 0:
            preview_win.addstr(row, col, clip(text, width), attr)

    put(1, 2, "YAML Cleaning Preview:", preview_w - 4, curses.A_BOLD)

    # Show original
    put(3, left_x, "Original:", left_w, curses.A_UNDERLINE)
    put(4, left_x, "uuid: cc09dc7f-ec98-4e4e-ae38-4fe7e8676aae", left_w, curses.A_BOLD)
    put(5, left_x, "_core:", left_w, curses.A_BOLD)
    put(6, left_x, " default_config_hash: fUksROt4F...", left_w, curses.A_BOLD)
    put(7, left_x, "other YAML content...", left_w)

    # Show arrow
    for row in range(3, preview_h - 3):
        preview_win.addstr(row, arrow_x, "→")

    # Show cleaned
    put(3, right_x, "Cleaned:", right_w, curses.A_UNDERLINE)
    put(5, right_x, "UUID and _core removed", right_w)
    put(7, right_x, "other YAML content preserved", right_w)

    # Instructions
    preview_win.addstr(footer_row, 2, clip("Press any key to continue...", preview_w - 4), curses.A_DIM)
    preview_win.refresh()

    # Wait for key
    stdscr.getch()
def confirm_yaml_cleaning(stdscr, selector):
    """Preview the YAML cleaning on a sample file, then ask for confirmation.

    Args:
        stdscr: The curses screen.
        selector: Object exposing ``prefixes``, ``matched_files`` and
            ``original_config_path``.

    Returns:
        The result of the final confirmation dialog (True when the user
        confirmed the copy).
    """
    h, w = stdscr.getmaxyx()

    # First matched file of the first non-empty prefix serves as sample.
    sample_file = None
    for prefix in selector.prefixes:
        if selector.matched_files[prefix]:
            sample_file = os.path.join(selector.original_config_path, selector.matched_files[prefix][0])
            break

    if sample_file and os.path.exists(sample_file):
        error_msg = None
        try:
            with open(sample_file, 'r', encoding='utf-8') as file:
                content = file.read()
        except (OSError, UnicodeError) as e:
            error_msg = f"Error reading sample file: {str(e)}"
        else:
            try:
                preview_yaml_cleaning(stdscr, content)
            except curses.error:
                # The preview is optional; a drawing failure (e.g. a tiny
                # terminal) must not block the confirmation below and is
                # not a file-reading error.
                pass

        # Report a read failure in a small message box, if one fits.
        if error_msg is not None and h >= 5 and w >= 20:
            msg_h = 3
            msg_w = min(len(error_msg) + 4, w - 4)
            msg_y = (h - msg_h) // 2
            msg_x = (w - msg_w) // 2
            message_win = curses.newwin(msg_h, msg_w, msg_y, msg_x)
            message_win.box()
            message_win.addstr(1, 2, error_msg[:msg_w - 4])
            message_win.refresh()
            message_win.getch()

    return confirm_dialog(stdscr, "Copy all matched files to config directory and remove UUID and _core?")
def show_comparison_results(stdscr, changed_count, deleted_count, unchanged_count):
    """Show the results of the file comparison in a dialog.

    Args:
        stdscr: The curses screen; used for the terminal size and to wait
            for a key press.
        changed_count: Number of changed files to report.
        deleted_count: Number of deleted files to report.
        unchanged_count: Number of unchanged files to report.

    Returns:
        None.  Nothing is drawn when the terminal has fewer than 10 rows
        or 20 columns.
    """
    h, w = stdscr.getmaxyx()
    dialog_h = 10
    if h < dialog_h or w < 20:
        return  # Terminal too small for the dialog

    # 60 columns as before where available, otherwise shrink to fit.
    dialog_w = min(60, w)
    dialog_y = (h - dialog_h) // 2
    dialog_x = (w - dialog_w) // 2

    dialog_win = curses.newwin(dialog_h, dialog_w, dialog_y, dialog_x)
    dialog_win.box()

    lines = [
        (1, 2, "File Comparison Results", curses.A_BOLD),
        (3, 4, f"Changed files: {changed_count}", curses.A_NORMAL),
        (4, 4, f"Deleted files: {deleted_count}", curses.A_NORMAL),
        (5, 4, f"Unchanged files: {unchanged_count}", curses.A_NORMAL),
        (7, 4, "Changed files copied to: changed_files/", curses.A_NORMAL),
        (8, 4, "Deleted files tracked in: deleted_files.csv", curses.A_NORMAL),
    ]
    for row, col, text, attr in lines:
        # Clip to the interior so text never wraps over the border.
        dialog_win.addstr(row, col, text[:dialog_w - col - 2], attr)
    # NOTE: this prompt shares row 8 with the last line above and overwrites
    # its first characters from column 2; layout kept as in the original.
    dialog_win.addstr(dialog_h - 2, 2, "Press any key to continue..."[:dialog_w - 4])
    dialog_win.refresh()

    # Wait for key
    stdscr.getch()
"matched" or "unmatched" current_prefix = "" # Current prefix path, e.g. "field.storage" level = 0 # Current hierarchical level # Navigation history for going back path_history = [] # Main loop while True: # Get segments at the current level for the current view all_prefixes = selector.prefixes if view_mode == "matched" else list(selector.unmatched_files.keys()) items = get_prefix_segments_at_level(all_prefixes, current_prefix, level, selector, view_mode) # Draw the menu item_count = draw_menu(stdscr, selector, selected_idx, view_mode, current_prefix, level) if item_count == 0: key = stdscr.getch() if key == ord('q') or key == ord('Q'): break elif key == ord(' '): view_mode = "unmatched" if view_mode == "matched" else "matched" selected_idx = 0 # Reset navigation when toggling view current_prefix = "" level = 0 path_history = [] elif key == ord('a') or key == ord('A'): new_prefix = add_prefix_dialog(stdscr) if new_prefix: selector.add_prefix(new_prefix) elif key == ord('o') or key == ord('O'): new_prefix = choose_prefix_dialog(stdscr, selector) if new_prefix: selector.add_prefix(new_prefix) elif key == curses.KEY_LEFT and (current_prefix or level > 0): # Go back one level if level > 0: level -= 1 if current_prefix: # Remove last segment parts = current_prefix.split('.') current_prefix = '.'.join(parts[:-1]) selected_idx = 0 elif path_history: # Pop from history prev_prefix, prev_level, prev_idx = path_history.pop() current_prefix = prev_prefix level = prev_level selected_idx = prev_idx continue key = stdscr.getch() if key == ord('q') or key == ord('Q'): break elif key == curses.KEY_UP and selected_idx > 0: selected_idx -= 1 elif key == curses.KEY_DOWN and selected_idx < item_count - 1: selected_idx += 1 elif key == ord(' '): view_mode = "unmatched" if view_mode == "matched" else "matched" selected_idx = 0 # Reset navigation when toggling view current_prefix = "" level = 0 path_history = [] elif key == curses.KEY_RIGHT: # Drill down into the selected item if 
selected_idx < len(items): selected_item = items[selected_idx] # Check if there are child segments if has_child_segments(selected_item, view_mode, selector): # Save current state to history path_history.append((current_prefix, level, selected_idx)) # Update current prefix and level current_prefix = selected_item level += 1 selected_idx = 0 elif key == curses.KEY_LEFT and (current_prefix or level > 0): # Go back one level if level > 0: level -= 1 if current_prefix: # Remove last segment parts = current_prefix.split('.') current_prefix = '.'.join(parts[:-1]) selected_idx = 0 elif path_history: # Pop from history prev_prefix, prev_level, prev_idx = path_history.pop() current_prefix = prev_prefix level = prev_level selected_idx = prev_idx elif key == ord('a') or key == ord('A'): new_prefix = add_prefix_dialog(stdscr) if new_prefix: selector.add_prefix(new_prefix) elif key == ord('o') or key == ord('O'): new_prefix = choose_prefix_dialog(stdscr, selector) if new_prefix: selector.add_prefix(new_prefix) elif key == ord('d') or key == ord('D'): if view_mode == "matched" and len(items) > 0 and selected_idx < len(items): selected_item = items[selected_idx] # Only allow deleting complete prefixes if selected_item in selector.prefixes: if confirm_dialog(stdscr, f"Remove prefix '{selected_item}'?"): selector.remove_prefix(selected_item) if selected_idx >= len(items) - 1: selected_idx = max(0, len(items) - 1) else: # Allow adding the current selection (partial prefix) directly if confirm_dialog(stdscr, f"Add current path '{selected_item}' as a matched prefix?"): selector.add_prefix(selected_item) view_mode = "matched" current_prefix = "" level = 0 path_history = [] selected_idx = 0 elif view_mode == "unmatched" and len(items) > 0 and selected_idx < len(items): # Add the item to matched prefixes selected_item = items[selected_idx] if confirm_dialog(stdscr, f"Add '{selected_item}' to matched prefixes?"): selector.add_prefix(selected_item) view_mode = "matched" current_prefix = "" 
level = 0 path_history = [] selected_idx = 0 elif key == ord('c') or key == ord('C'): if confirm_yaml_cleaning(stdscr, selector): selector.copy_matched_files() elif key == ord('m') or key == ord('M'): if confirm_dialog(stdscr, "Compare old and new recipe configs and track changes?"): changed_count, deleted_count, unchanged_count = selector.compare_and_track_changes() show_comparison_results(stdscr, changed_count, deleted_count, unchanged_count) elif key == ord('s') or key == ord('S'): selector.save_prefixes() elif key in (10, 13): # ENTER key to select individual files if len(items) > 0 and selected_idx < len(items): selected_item = items[selected_idx] files = get_files_for_segment(selector, selected_item, view_mode) if files: if view_mode == "matched": # Check if this is a complete prefix in matched if selected_item in selector.prefixes: selected_files = select_individual_files_dialog(stdscr, selector, selected_item, view_mode) if selected_files and confirm_dialog(stdscr, f"Remove {len(selected_files)} files from matched?"): # Remove individual files from matched_files for file in selected_files: if file in selector.matched_files[selected_item]: selector.matched_files[selected_item].remove(file) else: # Show message that you can only select files from complete prefixes h, w = stdscr.getmaxyx() # Safety checks for window creation if h >= 5 and w >= 30: msg_h = min(3, h - 2) msg_w = min(60, w - 4) if msg_w < 20: msg_w = min(30, w - 4) msg_y = max(0, (h - msg_h) // 2) msg_x = max(0, (w - msg_w) // 2) if msg_h > 0 and msg_w > 0 and msg_y >= 0 and msg_x >= 0: message_win = curses.newwin(msg_h, msg_w, msg_y, msg_x) message_win.box() message_win.addstr(1, 2, "Can only select files from complete prefixes.") message_win.refresh() message_win.getch() else: # Unmatched view # Find a matching prefix to add files to target_prefix = None for prefix in selector.prefixes: if selected_item.startswith(prefix) or prefix.startswith(selected_item): target_prefix = prefix break if not 
target_prefix: if confirm_dialog(stdscr, f"Add prefix '{selected_item}' to matched prefixes?"): selector.add_prefix(selected_item) target_prefix = selected_item if target_prefix: file_selection = select_individual_files_dialog(stdscr, selector, selected_item, view_mode) if file_selection: # Add individual files to matched_files for file in file_selection: if file not in selector.matched_files[target_prefix]: selector.matched_files[target_prefix].append(file) if __name__ == "__main__": try: curses.wrapper(main) except KeyboardInterrupt: print("Program terminated by user")