1151 lines
45 KiB
Python
1151 lines
45 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import os
|
|
import json
|
|
import shutil
|
|
import curses
|
|
import re
|
|
import csv
|
|
import filecmp
|
|
import yaml
|
|
from typing import List, Dict, Set, Tuple, Optional
|
|
|
|
# Use current directory instead of hardcoded path
|
|
BASE_DIR = os.getcwd()
|
|
|
|
class ConfigSelector:
|
|
def __init__(self, json_path: str, original_config_path: str, config_path: str,
|
|
old_recipe_path: str = None, changed_files_path: str = None, deleted_files_csv: str = None,
|
|
ignored_files_yml: str = None):
|
|
self.json_path = json_path
|
|
self.original_config_path = original_config_path
|
|
self.config_path = config_path
|
|
self.old_recipe_path = old_recipe_path or os.path.join(BASE_DIR, "old_recipe_config")
|
|
self.changed_files_path = changed_files_path or os.path.join(BASE_DIR, "changed_files")
|
|
self.deleted_files_csv = deleted_files_csv or os.path.join(BASE_DIR, "deleted_files.csv")
|
|
self.ignored_files_yml = ignored_files_yml or os.path.join(BASE_DIR, "ignored_files.yml")
|
|
self.prefixes: List[str] = []
|
|
self.matched_files: Dict[str, List[str]] = {}
|
|
self.unmatched_files: Dict[str, List[str]] = {}
|
|
self.ignored_files: Set[str] = set()
|
|
self.load_prefixes()
|
|
self.load_ignored_files()
|
|
self.scan_configs()
|
|
|
|
def load_prefixes(self) -> None:
|
|
"""Load file prefixes from JSON file."""
|
|
try:
|
|
with open(self.json_path, 'r') as f:
|
|
data = json.load(f)
|
|
if isinstance(data, list):
|
|
self.prefixes = data
|
|
elif isinstance(data, dict) and 'prefixes' in data:
|
|
self.prefixes = data['prefixes']
|
|
else:
|
|
print("Error: JSON file should contain a list of prefixes or a dict with 'prefixes' key")
|
|
exit(1)
|
|
except (json.JSONDecodeError, FileNotFoundError) as e:
|
|
print(f"Error loading JSON file: {e}")
|
|
exit(1)
|
|
|
|
def load_ignored_files(self) -> None:
|
|
"""Load ignored file names from YAML file."""
|
|
self.ignored_files = set()
|
|
if not os.path.exists(self.ignored_files_yml):
|
|
return
|
|
|
|
try:
|
|
with open(self.ignored_files_yml, 'r') as f:
|
|
data = yaml.safe_load(f)
|
|
if data:
|
|
# Iterate through all categories in the YAML.
|
|
for category, info in data.items():
|
|
if isinstance(info, dict) and 'name' in info:
|
|
self.ignored_files.add(info['name'])
|
|
except (yaml.YAMLError, FileNotFoundError) as e:
|
|
print(f"Warning: Error loading ignored files YAML: {e}")
|
|
|
|
def save_prefixes(self) -> None:
|
|
"""Save current prefixes back to JSON file."""
|
|
try:
|
|
with open(self.json_path, 'w') as f:
|
|
json.dump({'prefixes': self.prefixes}, f, indent=2)
|
|
print(f"Prefixes saved to {self.json_path}")
|
|
except Exception as e:
|
|
print(f"Error saving JSON file: {e}")
|
|
|
|
def scan_configs(self) -> None:
|
|
"""Scan config directories and categorize files based on prefixes."""
|
|
self.matched_files = {prefix: [] for prefix in self.prefixes}
|
|
self.unmatched_files = {}
|
|
|
|
try:
|
|
# Process all files in the directory
|
|
for filename in os.listdir(self.original_config_path):
|
|
if not filename.endswith('.yml'):
|
|
continue
|
|
|
|
# Standard prefix matching
|
|
matched = False
|
|
for prefix in self.prefixes:
|
|
if filename.startswith(prefix):
|
|
self.matched_files[prefix].append(filename)
|
|
matched = True
|
|
break
|
|
|
|
# Add to unmatched if not matched by any prefix
|
|
if not matched:
|
|
# Get the first segment for initial grouping
|
|
parts = filename.split('.')
|
|
potential_prefix = parts[0]
|
|
|
|
if potential_prefix not in self.unmatched_files:
|
|
self.unmatched_files[potential_prefix] = []
|
|
self.unmatched_files[potential_prefix].append(filename)
|
|
|
|
except FileNotFoundError:
|
|
print(f"Error: Directory {self.original_config_path} not found")
|
|
exit(1)
|
|
|
|
def copy_matched_files(self) -> None:
|
|
"""Copy all matched files from original_config to config, removing UUID and _core structures."""
|
|
if not os.path.exists(self.config_path):
|
|
os.makedirs(self.config_path)
|
|
|
|
copied_count = 0
|
|
processed_count = 0
|
|
for prefix in self.prefixes:
|
|
for filename in self.matched_files[prefix]:
|
|
src = os.path.join(self.original_config_path, filename)
|
|
dst = os.path.join(self.config_path, filename)
|
|
|
|
# Read the YAML file
|
|
try:
|
|
with open(src, 'r') as file:
|
|
content = file.read()
|
|
|
|
# Process the content to remove UUID and _core structures
|
|
# 1. Remove uuid key-value pair (ensuring we start at beginning of line)
|
|
content = re.sub(r'^uuid: [a-f0-9\-]+\n', '', content, flags=re.MULTILINE)
|
|
|
|
# 2. Remove _core structure (careful not to remove wisski_core or similar)
|
|
# Match exactly '_core:' at the start of a line
|
|
content = re.sub(r'^_core:\n([ \t]+[^\n]+\n)+', '', content, flags=re.MULTILINE)
|
|
|
|
# Write the modified content
|
|
with open(dst, 'w') as file:
|
|
file.write(content)
|
|
|
|
processed_count += 1
|
|
except Exception as e:
|
|
print(f"Error processing {filename}: {e}")
|
|
# Fall back to direct copy if processing fails
|
|
shutil.copy2(src, dst)
|
|
|
|
copied_count += 1
|
|
|
|
print(f"Copied {copied_count} files to {self.config_path} ({processed_count} files were processed to remove UUIDs and _core structures)")
|
|
|
|
def compare_and_track_changes(self) -> Tuple[int, int, int]:
|
|
"""
|
|
Compare files between old_recipe_config and new_recipe_config.
|
|
Copy changed files to changed_files folder.
|
|
Track deleted files in deleted_files.csv.
|
|
|
|
Returns:
|
|
Tuple of (changed_count, deleted_count, unchanged_count)
|
|
"""
|
|
# Ensure directories exist
|
|
if not os.path.exists(self.changed_files_path):
|
|
os.makedirs(self.changed_files_path)
|
|
|
|
# Track statistics
|
|
changed_count = 0
|
|
deleted_count = 0
|
|
unchanged_count = 0
|
|
ignored_count = 0
|
|
deleted_files_list = []
|
|
|
|
# Get all files in old_recipe_config
|
|
old_files = set()
|
|
if os.path.exists(self.old_recipe_path):
|
|
old_files = {f for f in os.listdir(self.old_recipe_path) if f.endswith('.yml')}
|
|
|
|
# Get all files in new_recipe_config
|
|
new_files = set()
|
|
if os.path.exists(self.config_path):
|
|
new_files = {f for f in os.listdir(self.config_path) if f.endswith('.yml')}
|
|
|
|
# Find deleted files (in old but not in new)
|
|
deleted_files = old_files - new_files
|
|
for filename in sorted(deleted_files):
|
|
deleted_files_list.append(filename)
|
|
deleted_count += 1
|
|
|
|
# Find common files and check for changes
|
|
common_files = old_files & new_files
|
|
for filename in sorted(common_files):
|
|
old_file_path = os.path.join(self.old_recipe_path, filename)
|
|
new_file_path = os.path.join(self.config_path, filename)
|
|
|
|
# Compare file contents
|
|
if filecmp.cmp(old_file_path, new_file_path, shallow=False):
|
|
# Files are identical
|
|
unchanged_count += 1
|
|
else:
|
|
# Files have changed - check if ignored
|
|
if filename in self.ignored_files:
|
|
# Skip copying ignored files
|
|
ignored_count += 1
|
|
else:
|
|
# Copy to changed_files
|
|
changed_dest = os.path.join(self.changed_files_path, filename)
|
|
shutil.copy2(new_file_path, changed_dest)
|
|
changed_count += 1
|
|
|
|
# Write deleted files to CSV
|
|
with open(self.deleted_files_csv, 'w', newline='') as csvfile:
|
|
writer = csv.writer(csvfile)
|
|
writer.writerow(['filename']) # Header
|
|
for filename in deleted_files_list:
|
|
writer.writerow([filename])
|
|
|
|
if ignored_count > 0:
|
|
print(f"Ignored {ignored_count} changed file(s) as per ignored_files.yml")
|
|
|
|
return changed_count, deleted_count, unchanged_count
|
|
|
|
def add_prefix(self, prefix: str) -> None:
|
|
"""Add a new prefix to the list if it doesn't exist."""
|
|
if prefix and prefix not in self.prefixes:
|
|
self.prefixes.append(prefix)
|
|
self.scan_configs()
|
|
|
|
def remove_prefix(self, prefix: str) -> None:
|
|
"""Remove a prefix from the list if it exists."""
|
|
if prefix in self.prefixes:
|
|
self.prefixes.remove(prefix)
|
|
self.scan_configs()
|
|
|
|
def get_summary(self) -> Tuple[int, int]:
|
|
"""Return the count of matched and unmatched files."""
|
|
matched_count = sum(len(files) for files in self.matched_files.values())
|
|
unmatched_count = sum(len(files) for files in self.unmatched_files.values())
|
|
return matched_count, unmatched_count
|
|
|
|
def get_prefix_segments_at_level(all_prefixes: List[str], current_prefix: str, level: int, selector=None, view_mode=None) -> List[str]:
|
|
"""
|
|
Get unique prefix segments at the specified level.
|
|
|
|
Args:
|
|
all_prefixes: List of all prefixes
|
|
current_prefix: Current prefix we're viewing (e.g., "field.storage")
|
|
level: The level we want to see (0 = root level)
|
|
selector: The ConfigSelector instance (needed for unmatched files)
|
|
view_mode: Current view mode ("matched" or "unmatched")
|
|
|
|
Returns:
|
|
List of unique prefix segments at this level
|
|
"""
|
|
# Current prefix parts (e.g., ["field", "storage"])
|
|
current_parts = current_prefix.split('.') if current_prefix else []
|
|
unique_prefixes = set()
|
|
|
|
# Special case for unmatched view with level > 0
|
|
if view_mode == "unmatched" and selector:
|
|
# Need to scan actual filenames for deeper levels
|
|
for prefix, filenames in selector.unmatched_files.items():
|
|
for filename in filenames:
|
|
# Skip .yml extension for determining parts
|
|
if filename.endswith('.yml'):
|
|
filename = filename[:-4]
|
|
|
|
parts = filename.split('.')
|
|
|
|
# Skip if not enough parts for this level
|
|
if len(parts) <= level:
|
|
continue
|
|
|
|
# Skip if prefix doesn't match current path
|
|
if current_prefix:
|
|
filename_prefix = '.'.join(parts[:len(current_parts)])
|
|
if filename_prefix != current_prefix:
|
|
continue
|
|
|
|
# Add the segment at this level
|
|
prefix_at_level = '.'.join(parts[:level+1])
|
|
unique_prefixes.add(prefix_at_level)
|
|
|
|
return sorted(list(unique_prefixes))
|
|
|
|
# Standard processing for matched view
|
|
elif view_mode == "matched":
|
|
for prefix in all_prefixes:
|
|
# Split each prefix into parts
|
|
parts = prefix.split('.')
|
|
|
|
# Skip if not enough parts for the level
|
|
if len(parts) <= level:
|
|
continue
|
|
|
|
# If we're looking at a nested level, only include prefixes that match the current path
|
|
if current_prefix:
|
|
# Skip if this prefix doesn't start with the current prefix
|
|
if not prefix.startswith(current_prefix + '.') and prefix != current_prefix:
|
|
continue
|
|
|
|
# If current level is within current_prefix, it should match exactly
|
|
if level < len(current_parts) and parts[level] != current_parts[level]:
|
|
continue
|
|
|
|
# Build the prefix up to this level
|
|
prefix_at_level = '.'.join(parts[:level+1])
|
|
unique_prefixes.add(prefix_at_level)
|
|
|
|
# Basic processing for all other cases
|
|
else:
|
|
for prefix in all_prefixes:
|
|
# Split each prefix into parts
|
|
parts = prefix.split('.')
|
|
|
|
# Skip if not enough parts for the level
|
|
if len(parts) <= level:
|
|
continue
|
|
|
|
# If we're looking at a nested level, only include prefixes that match the current path
|
|
if current_prefix:
|
|
# Skip if this prefix doesn't start with the current prefix
|
|
if not prefix.startswith(current_prefix + '.') and prefix != current_prefix:
|
|
continue
|
|
|
|
# If current level is within current_prefix, it should match exactly
|
|
if level < len(current_parts) and parts[level] != current_parts[level]:
|
|
continue
|
|
|
|
# Build the prefix up to this level
|
|
prefix_at_level = '.'.join(parts[:level+1])
|
|
unique_prefixes.add(prefix_at_level)
|
|
|
|
return sorted(list(unique_prefixes))
|
|
|
|
def get_files_for_segment(selector: ConfigSelector, segment: str, view_mode: str) -> List[str]:
|
|
"""Get files that match the given segment in the current view."""
|
|
files = []
|
|
|
|
if view_mode == "matched":
|
|
# For matched files, we need to check if this segment is an exact prefix or a partial one
|
|
if segment in selector.prefixes:
|
|
# Exact match: include all files for this prefix
|
|
files.extend(selector.matched_files[segment])
|
|
else:
|
|
# Partial match: include files from all prefixes that start with this segment
|
|
for prefix in selector.prefixes:
|
|
if prefix == segment or prefix.startswith(segment + '.'):
|
|
files.extend(selector.matched_files[prefix])
|
|
else:
|
|
# For unmatched files, collect files that match this segment
|
|
segment_parts = segment.split('.')
|
|
segment_len = len(segment_parts)
|
|
|
|
for prefix, file_list in selector.unmatched_files.items():
|
|
# For root level prefixes (without dots)
|
|
if '.' not in prefix and segment_len == 1 and prefix == segment:
|
|
files.extend(file_list)
|
|
continue
|
|
|
|
# For multi-level prefixes and filenames
|
|
for filename in file_list:
|
|
# Get filename without .yml extension for comparison
|
|
filename_no_ext = filename
|
|
if filename.endswith('.yml'):
|
|
filename_no_ext = filename[:-4]
|
|
|
|
filename_parts = filename_no_ext.split('.')
|
|
|
|
# Check if this filename matches the segment up to the segment's length
|
|
if len(filename_parts) >= segment_len:
|
|
if '.'.join(filename_parts[:segment_len]) == segment:
|
|
files.append(filename)
|
|
|
|
return sorted(list(set(files)))
|
|
|
|
def draw_menu(stdscr, selector: ConfigSelector, selected_idx: int, view_mode: str, current_prefix: str = "", level: int = 0):
|
|
"""Draw the TUI menu."""
|
|
curses.curs_set(0)
|
|
stdscr.clear()
|
|
h, w = stdscr.getmaxyx()
|
|
|
|
# Colors
|
|
curses.start_color()
|
|
curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_BLUE) # Selected item
|
|
curses.init_pair(2, curses.COLOR_GREEN, curses.COLOR_BLACK) # Matched
|
|
curses.init_pair(3, curses.COLOR_RED, curses.COLOR_BLACK) # Unmatched
|
|
curses.init_pair(4, curses.COLOR_YELLOW, curses.COLOR_BLACK) # Titles
|
|
|
|
# Title
|
|
title = "Drupal Configuration Selector"
|
|
stdscr.addstr(0, (w - len(title)) // 2, title, curses.color_pair(4) | curses.A_BOLD)
|
|
|
|
# Summary
|
|
matched_count, unmatched_count = selector.get_summary()
|
|
summary = f"Matched: {matched_count} files | Unmatched: {unmatched_count} files"
|
|
stdscr.addstr(1, (w - len(summary)) // 2, summary)
|
|
|
|
# Current path display
|
|
if current_prefix:
|
|
path_display = f"Current path: {current_prefix}"
|
|
stdscr.addstr(2, 2, path_display)
|
|
|
|
# Instructions
|
|
stdscr.addstr(h-4, 2, "LEFT/RIGHT: Navigate tree levels | SPACE: Toggle view | ENTER: Select files")
|
|
stdscr.addstr(h-3, 2, "A: Add prefix | O: Choose | D: Add/Del | C: Copy | M: Compare | S: Save | Q: Quit")
|
|
|
|
# Content area
|
|
start_y = 3
|
|
max_visible = h - 8 # Account for two instruction lines
|
|
|
|
# Get segments at the current level
|
|
if view_mode == "matched":
|
|
all_prefixes = selector.prefixes
|
|
else:
|
|
all_prefixes = list(selector.unmatched_files.keys())
|
|
|
|
items = get_prefix_segments_at_level(all_prefixes, current_prefix, level, selector, view_mode)
|
|
|
|
# Title based on current path
|
|
if current_prefix:
|
|
title = f"{view_mode.capitalize()} Prefixes under '{current_prefix}'"
|
|
else:
|
|
title = f"{view_mode.capitalize()} Prefixes (root level)"
|
|
|
|
stdscr.addstr(start_y, 2, title, curses.color_pair(4) | curses.A_BOLD)
|
|
start_y += 1
|
|
|
|
# Pagination
|
|
if len(items) > max_visible:
|
|
page_size = max_visible
|
|
current_page = selected_idx // page_size
|
|
start_idx = current_page * page_size
|
|
end_idx = min(start_idx + page_size, len(items))
|
|
display_items = items[start_idx:end_idx]
|
|
|
|
# Adjust selected_idx relative to the page
|
|
relative_idx = selected_idx - start_idx
|
|
else:
|
|
display_items = items
|
|
relative_idx = selected_idx if selected_idx < len(items) else 0
|
|
|
|
# Display items
|
|
for i, item in enumerate(display_items):
|
|
y = start_y + i
|
|
|
|
# Get base name (last segment)
|
|
item_base = item.split('.')[-1]
|
|
|
|
# Check if there are child segments
|
|
has_children = has_child_segments(item, view_mode, selector)
|
|
|
|
# Get files for this item
|
|
files = get_files_for_segment(selector, item, view_mode)
|
|
file_count = len(files)
|
|
|
|
# Format display text
|
|
if has_children:
|
|
item_text = f"{item_base} ({file_count} files) ▶"
|
|
else:
|
|
item_text = f"{item_base} ({file_count} files)"
|
|
|
|
if i == relative_idx:
|
|
stdscr.addstr(y, 2, item_text, curses.color_pair(1) | curses.A_BOLD)
|
|
else:
|
|
if view_mode == "matched":
|
|
stdscr.addstr(y, 2, item_text, curses.color_pair(2))
|
|
else:
|
|
stdscr.addstr(y, 2, item_text, curses.color_pair(3))
|
|
|
|
# Selected item details
|
|
if items and len(items) > 0 and selected_idx < len(items):
|
|
try:
|
|
selected_item = items[selected_idx]
|
|
detail_y = start_y + min(len(display_items), max_visible) + 1
|
|
|
|
if detail_y < h - 4: # Make sure we have room to display details
|
|
stdscr.addstr(detail_y, 2, f"Files for '{selected_item}':", curses.A_BOLD)
|
|
detail_y += 1
|
|
|
|
# Get files for the selected item
|
|
files = get_files_for_segment(selector, selected_item, view_mode)
|
|
|
|
max_files_to_show = h - detail_y - 4
|
|
if max_files_to_show > 0:
|
|
sorted_files = sorted(files)[:max_files_to_show]
|
|
for i, file in enumerate(sorted_files):
|
|
if detail_y + i < h - 4: # Prevent writing outside window
|
|
# Truncate filename if too long
|
|
max_width = w - 6 # Allow for padding
|
|
if len(file) > max_width:
|
|
file_display = file[:max_width-3] + "..."
|
|
else:
|
|
file_display = file
|
|
stdscr.addstr(detail_y + i, 4, file_display)
|
|
|
|
if len(files) > max_files_to_show and detail_y + max_files_to_show < h - 4:
|
|
stdscr.addstr(detail_y + max_files_to_show, 4, f"...and {len(files) - max_files_to_show} more")
|
|
except Exception as e:
|
|
# In case of error, just don't display details
|
|
error_msg = f"Error displaying file details: {str(e)}"
|
|
if h > 5:
|
|
stdscr.addstr(h-4, 2, error_msg[:w-4])
|
|
|
|
stdscr.refresh()
|
|
return len(items)
|
|
|
|
def add_prefix_dialog(stdscr):
|
|
"""Show dialog to add a new prefix."""
|
|
h, w = stdscr.getmaxyx()
|
|
dialog_h, dialog_w = 5, 50
|
|
dialog_y = (h - dialog_h) // 2
|
|
dialog_x = (w - dialog_w) // 2
|
|
|
|
# Draw dialog box
|
|
dialog_win = curses.newwin(dialog_h, dialog_w, dialog_y, dialog_x)
|
|
dialog_win.box()
|
|
dialog_win.addstr(1, 2, "Enter new prefix (ESC or empty input to cancel):")
|
|
dialog_win.refresh()
|
|
|
|
# Input field
|
|
input_win = curses.newwin(1, dialog_w - 6, dialog_y + 2, dialog_x + 3)
|
|
input_win.clear()
|
|
|
|
# Setup input mode
|
|
curses.echo()
|
|
curses.curs_set(1)
|
|
input_win.keypad(True) # Enable special keys
|
|
|
|
# Collect input character by character to handle escape key
|
|
prefix = ""
|
|
input_win.refresh()
|
|
|
|
while True:
|
|
try:
|
|
key = input_win.getch()
|
|
|
|
# Handle escape key
|
|
if key == 27: # ASCII code for Escape
|
|
prefix = ""
|
|
break
|
|
# Handle enter key
|
|
elif key in (10, 13): # ASCII codes for Enter/Return
|
|
break
|
|
# Handle backspace/delete
|
|
elif key in (8, 127, curses.KEY_BACKSPACE):
|
|
if prefix:
|
|
prefix = prefix[:-1]
|
|
# Clear line and rewrite
|
|
input_win.clear()
|
|
input_win.addstr(0, 0, prefix)
|
|
input_win.refresh()
|
|
# Regular character
|
|
elif 32 <= key <= 126: # Printable ASCII
|
|
prefix += chr(key)
|
|
except Exception:
|
|
# In case of any error, just return empty string
|
|
prefix = ""
|
|
break
|
|
|
|
# Restore normal cursor visibility and echo settings
|
|
curses.noecho()
|
|
curses.curs_set(0)
|
|
|
|
return prefix.strip()
|
|
|
|
def confirm_dialog(stdscr, message):
|
|
"""Show a confirmation dialog."""
|
|
h, w = stdscr.getmaxyx()
|
|
dialog_h, dialog_w = 5, 50
|
|
dialog_y = (h - dialog_h) // 2
|
|
dialog_x = (w - dialog_w) // 2
|
|
|
|
# Draw dialog box
|
|
dialog_win = curses.newwin(dialog_h, dialog_w, dialog_y, dialog_x)
|
|
dialog_win.box()
|
|
dialog_win.addstr(1, 2, message)
|
|
dialog_win.addstr(3, 2, "Press Y to confirm, any other key to cancel")
|
|
dialog_win.refresh()
|
|
|
|
# Get input
|
|
key = stdscr.getch()
|
|
return key == ord('y') or key == ord('Y')
|
|
|
|
def choose_prefix_dialog(stdscr, selector: ConfigSelector):
|
|
"""Show dialog to choose a prefix from unmatched prefixes."""
|
|
h, w = stdscr.getmaxyx()
|
|
|
|
# Get sorted list of unmatched prefixes
|
|
unmatched_prefixes = sorted(selector.unmatched_files.keys())
|
|
|
|
if not unmatched_prefixes:
|
|
# Show message if no unmatched prefixes
|
|
message_win = curses.newwin(3, 40, (h - 3) // 2, (w - 40) // 2)
|
|
message_win.box()
|
|
message_win.addstr(1, 2, "No unmatched prefixes available.")
|
|
message_win.refresh()
|
|
message_win.getch()
|
|
return None
|
|
|
|
# Calculate dialog dimensions
|
|
dialog_h = min(15, len(unmatched_prefixes) + 4) # +4 for title, instructions, and border
|
|
dialog_w = 60
|
|
dialog_y = (h - dialog_h) // 2
|
|
dialog_x = (w - dialog_w) // 2
|
|
|
|
# Draw dialog box
|
|
dialog_win = curses.newwin(dialog_h, dialog_w, dialog_y, dialog_x)
|
|
dialog_win.keypad(True) # Enable arrow keys
|
|
|
|
# Initialize selection variables
|
|
selected_idx = 0
|
|
current_page = 0
|
|
items_per_page = dialog_h - 4 # -4 for title, instructions, and border
|
|
|
|
# Draw function
|
|
def draw_prefix_list():
|
|
dialog_win.clear()
|
|
dialog_win.box()
|
|
dialog_win.addstr(1, 2, "Choose a prefix to add (ESC to cancel):")
|
|
|
|
# Calculate page items
|
|
start_idx = current_page * items_per_page
|
|
end_idx = min(start_idx + items_per_page, len(unmatched_prefixes))
|
|
|
|
# Display prefixes with selection highlight
|
|
for i, prefix in enumerate(unmatched_prefixes[start_idx:end_idx]):
|
|
file_count = len(selector.unmatched_files[prefix])
|
|
item_text = f"{prefix} ({file_count} files)"
|
|
|
|
if i + start_idx == selected_idx:
|
|
dialog_win.addstr(i + 2, 4, item_text, curses.A_REVERSE)
|
|
else:
|
|
dialog_win.addstr(i + 2, 4, item_text)
|
|
|
|
# Show pagination info if needed
|
|
if len(unmatched_prefixes) > items_per_page:
|
|
page_info = f"Page {current_page + 1}/{(len(unmatched_prefixes) - 1) // items_per_page + 1}"
|
|
dialog_win.addstr(dialog_h - 1, dialog_w - len(page_info) - 2, page_info)
|
|
|
|
dialog_win.refresh()
|
|
|
|
# Main interaction loop
|
|
while True:
|
|
draw_prefix_list()
|
|
|
|
key = dialog_win.getch()
|
|
|
|
if key == 27: # ESC
|
|
return None
|
|
elif key == curses.KEY_UP and selected_idx > 0:
|
|
selected_idx -= 1
|
|
if selected_idx < current_page * items_per_page:
|
|
current_page -= 1
|
|
elif key == curses.KEY_DOWN and selected_idx < len(unmatched_prefixes) - 1:
|
|
selected_idx += 1
|
|
if selected_idx >= (current_page + 1) * items_per_page:
|
|
current_page += 1
|
|
elif key in (10, 13): # Enter
|
|
return unmatched_prefixes[selected_idx]
|
|
elif key == curses.KEY_NPAGE and current_page < (len(unmatched_prefixes) - 1) // items_per_page:
|
|
# Page Down
|
|
current_page += 1
|
|
selected_idx = min(selected_idx + items_per_page, len(unmatched_prefixes) - 1)
|
|
elif key == curses.KEY_PPAGE and current_page > 0:
|
|
# Page Up
|
|
current_page -= 1
|
|
selected_idx = max(selected_idx - items_per_page, 0)
|
|
|
|
def select_individual_files_dialog(stdscr, selector: ConfigSelector, prefix: str, view_mode: str):
|
|
"""Show dialog to select individual files from a prefix."""
|
|
h, w = stdscr.getmaxyx()
|
|
|
|
# Get the list of files for this prefix
|
|
if view_mode == "matched":
|
|
files = selector.matched_files.get(prefix, [])
|
|
title = f"Select files to remove from '{prefix}'"
|
|
action_text = "SPACE: Toggle selection | ENTER: Confirm | ESC: Cancel"
|
|
else:
|
|
files = selector.unmatched_files.get(prefix, [])
|
|
title = f"Select files to add from '{prefix}'"
|
|
action_text = "SPACE: Toggle selection | ENTER: Add selected | ESC: Cancel"
|
|
|
|
if not files:
|
|
# Show message if no files
|
|
message_win = curses.newwin(3, 40, (h - 3) // 2, (w - 40) // 2)
|
|
message_win.box()
|
|
message_win.addstr(1, 2, "No files available.")
|
|
message_win.refresh()
|
|
message_win.getch()
|
|
return []
|
|
|
|
# Sort files for consistent display
|
|
files = sorted(files)
|
|
|
|
# Track selected files
|
|
selected_files = set()
|
|
|
|
# Calculate dialog dimensions
|
|
dialog_h = min(20, len(files) + 5) # +5 for title, instructions, pagination, and border
|
|
dialog_w = min(w - 4, 80)
|
|
dialog_y = (h - dialog_h) // 2
|
|
dialog_x = (w - dialog_w) // 2
|
|
|
|
# Draw dialog box
|
|
dialog_win = curses.newwin(dialog_h, dialog_w, dialog_y, dialog_x)
|
|
dialog_win.keypad(True) # Enable arrow keys
|
|
|
|
# Initialize selection variables
|
|
selected_idx = 0
|
|
current_page = 0
|
|
items_per_page = dialog_h - 5 # -5 for title, instructions, actions, and border
|
|
|
|
# Draw function
|
|
def draw_file_list():
|
|
dialog_win.clear()
|
|
dialog_win.box()
|
|
dialog_win.addstr(1, 2, title, curses.A_BOLD)
|
|
dialog_win.addstr(dialog_h - 2, 2, action_text)
|
|
|
|
# Calculate page items
|
|
start_idx = current_page * items_per_page
|
|
end_idx = min(start_idx + items_per_page, len(files))
|
|
|
|
# Display files with selection highlight and checkbox
|
|
for i, file in enumerate(files[start_idx:end_idx]):
|
|
y = i + 3 # +3 for title, blank line, and border
|
|
|
|
# Create checkbox display
|
|
checkbox = "[X]" if file in selected_files else "[ ]"
|
|
|
|
# Truncate filename if too long
|
|
max_width = dialog_w - 8 # Allow for padding and checkbox
|
|
if len(file) > max_width:
|
|
file_display = file[:max_width-3] + "..."
|
|
else:
|
|
file_display = file
|
|
|
|
item_text = f"{checkbox} {file_display}"
|
|
|
|
if i + start_idx == selected_idx:
|
|
dialog_win.addstr(y, 2, item_text, curses.A_REVERSE)
|
|
else:
|
|
dialog_win.addstr(y, 2, item_text)
|
|
|
|
# Show pagination info if needed
|
|
if len(files) > items_per_page:
|
|
page_info = f"Page {current_page + 1}/{(len(files) - 1) // items_per_page + 1}"
|
|
dialog_win.addstr(dialog_h - 3, dialog_w - len(page_info) - 2, page_info)
|
|
|
|
dialog_win.refresh()
|
|
|
|
# Main interaction loop
|
|
while True:
|
|
draw_file_list()
|
|
|
|
key = dialog_win.getch()
|
|
|
|
if key == 27: # ESC
|
|
return []
|
|
elif key == curses.KEY_UP and selected_idx > 0:
|
|
selected_idx -= 1
|
|
if selected_idx < current_page * items_per_page:
|
|
current_page -= 1
|
|
elif key == curses.KEY_DOWN and selected_idx < len(files) - 1:
|
|
selected_idx += 1
|
|
if selected_idx >= (current_page + 1) * items_per_page:
|
|
current_page += 1
|
|
elif key == ord(' '): # SPACE to toggle selection
|
|
current_file = files[selected_idx]
|
|
if current_file in selected_files:
|
|
selected_files.remove(current_file)
|
|
else:
|
|
selected_files.add(current_file)
|
|
elif key in (10, 13): # Enter to confirm
|
|
return list(selected_files)
|
|
elif key == curses.KEY_NPAGE and current_page < (len(files) - 1) // items_per_page:
|
|
# Page Down
|
|
current_page += 1
|
|
selected_idx = min(selected_idx + items_per_page, len(files) - 1)
|
|
elif key == curses.KEY_PPAGE and current_page > 0:
|
|
# Page Up
|
|
current_page -= 1
|
|
selected_idx = max(selected_idx - items_per_page, 0)
|
|
|
|
def has_child_segments(item, mode, selector):
|
|
"""
|
|
Check if the given item has child segments.
|
|
|
|
Args:
|
|
item: Current prefix path (e.g., "field.storage")
|
|
mode: View mode ("matched" or "unmatched")
|
|
selector: The ConfigSelector instance
|
|
|
|
Returns:
|
|
True if child segments exist, False otherwise
|
|
"""
|
|
if mode == "matched":
|
|
# For matched prefixes
|
|
for prefix in selector.prefixes:
|
|
if prefix.startswith(item + '.'):
|
|
return True
|
|
return False
|
|
else:
|
|
# For unmatched files, check if any filenames have one more segment
|
|
selected_segments = item.split('.')
|
|
segment_count = len(selected_segments)
|
|
|
|
# Look through all files in all unmatched prefixes
|
|
for prefix, file_list in selector.unmatched_files.items():
|
|
for filename in file_list:
|
|
# Remove .yml extension for better segment counting
|
|
if filename.endswith('.yml'):
|
|
filename = filename[:-4]
|
|
|
|
filename_parts = filename.split('.')
|
|
# Check if this file has this prefix plus at least one more segment
|
|
if (len(filename_parts) > segment_count and
|
|
'.'.join(filename_parts[:segment_count]) == item):
|
|
return True
|
|
return False
|
|
|
|
def preview_yaml_cleaning(stdscr, content):
|
|
"""Show a simplified schematic preview of the YAML cleaning process."""
|
|
h, w = stdscr.getmaxyx()
|
|
|
|
# Calculate preview window size
|
|
preview_h = min(12, h - 8) # Smaller height
|
|
preview_w = min(70, w - 4)
|
|
preview_y = (h - preview_h) // 2
|
|
preview_x = (w - preview_w) // 2
|
|
|
|
# Create window
|
|
preview_win = curses.newwin(preview_h, preview_w, preview_y, preview_x)
|
|
preview_win.box()
|
|
preview_win.addstr(1, 2, "YAML Cleaning Preview:", curses.A_BOLD)
|
|
|
|
# Create simple schematic example
|
|
original = """langcode: en
|
|
status: true
|
|
uuid: cc09dc7f-ec98-4e4e-ae38-4fe7e8676aae
|
|
dependencies:
|
|
module:
|
|
- node
|
|
_core:
|
|
default_config_hash: fUksROt4FfkAU9BV4hV2XvhTBSS2nTNrZS4U7S-tKrs
|
|
id: example
|
|
name: Example"""
|
|
|
|
cleaned = """langcode: en
|
|
status: true
|
|
dependencies:
|
|
module:
|
|
- node
|
|
id: example
|
|
name: Example"""
|
|
|
|
# Display side by side
|
|
# Show original
|
|
preview_win.addstr(3, 2, "Original:", curses.A_UNDERLINE)
|
|
preview_win.addstr(4, 2, "uuid: cc09dc7f-ec98-4e4e-ae38-4fe7e8676aae", curses.A_BOLD)
|
|
preview_win.addstr(5, 2, "_core:", curses.A_BOLD)
|
|
preview_win.addstr(6, 2, " default_config_hash: fUksROt4F...", curses.A_BOLD)
|
|
preview_win.addstr(7, 2, "other YAML content...")
|
|
|
|
# Show arrow
|
|
arrow_x = preview_w // 2 - 2
|
|
for i in range(3, preview_h - 3):
|
|
preview_win.addstr(i, arrow_x, "→")
|
|
|
|
# Show cleaned
|
|
preview_win.addstr(3, arrow_x + 4, "Cleaned:", curses.A_UNDERLINE)
|
|
preview_win.addstr(5, arrow_x + 4, "UUID and _core removed")
|
|
preview_win.addstr(7, arrow_x + 4, "other YAML content preserved")
|
|
|
|
# Instructions
|
|
preview_win.addstr(preview_h - 2, 2, "Press any key to continue...", curses.A_DIM)
|
|
preview_win.refresh()
|
|
|
|
# Wait for key
|
|
stdscr.getch()
|
|
|
|
def confirm_yaml_cleaning(stdscr, selector):
|
|
"""Show a confirmation dialog for YAML cleaning during copy."""
|
|
h, w = stdscr.getmaxyx()
|
|
|
|
# Get a sample file to preview
|
|
sample_file = None
|
|
for prefix in selector.prefixes:
|
|
if selector.matched_files[prefix]:
|
|
sample_file = os.path.join(selector.original_config_path, selector.matched_files[prefix][0])
|
|
break
|
|
|
|
if sample_file and os.path.exists(sample_file):
|
|
try:
|
|
with open(sample_file, 'r') as file:
|
|
content = file.read()
|
|
preview_yaml_cleaning(stdscr, content)
|
|
except Exception as e:
|
|
# In case of error, just show a simple message
|
|
error_msg = f"Error reading sample file: {str(e)}"
|
|
message_win = curses.newwin(3, min(len(error_msg) + 4, w - 4), (h - 3) // 2, (w - min(len(error_msg) + 4, w - 4)) // 2)
|
|
message_win.box()
|
|
message_win.addstr(1, 2, error_msg[:w-8])
|
|
message_win.refresh()
|
|
message_win.getch()
|
|
|
|
return confirm_dialog(stdscr, "Copy all matched files to config directory and remove UUID and _core?")
|
|
|
|
def show_comparison_results(stdscr, changed_count, deleted_count, unchanged_count):
|
|
"""Show the results of file comparison."""
|
|
h, w = stdscr.getmaxyx()
|
|
dialog_h, dialog_w = 10, 60
|
|
dialog_y = (h - dialog_h) // 2
|
|
dialog_x = (w - dialog_w) // 2
|
|
|
|
# Draw dialog box
|
|
dialog_win = curses.newwin(dialog_h, dialog_w, dialog_y, dialog_x)
|
|
dialog_win.box()
|
|
dialog_win.addstr(1, 2, "File Comparison Results", curses.A_BOLD)
|
|
dialog_win.addstr(3, 4, f"Changed files: {changed_count}")
|
|
dialog_win.addstr(4, 4, f"Deleted files: {deleted_count}")
|
|
dialog_win.addstr(5, 4, f"Unchanged files: {unchanged_count}")
|
|
dialog_win.addstr(7, 4, "Changed files copied to: changed_files/")
|
|
dialog_win.addstr(8, 4, "Deleted files tracked in: deleted_files.csv")
|
|
dialog_win.addstr(dialog_h - 2, 2, "Press any key to continue...")
|
|
dialog_win.refresh()
|
|
|
|
# Wait for key
|
|
stdscr.getch()
|
|
|
|
def main(stdscr):
|
|
# Set paths relative to current directory
|
|
json_path = os.path.join(BASE_DIR, "config_prefixes.json")
|
|
original_config_path = os.path.join(BASE_DIR, "original_config")
|
|
config_path = os.path.join(BASE_DIR, "new_recipe_config")
|
|
|
|
# Create directories if they don't exist
|
|
if not os.path.exists(original_config_path):
|
|
os.makedirs(original_config_path)
|
|
print(f"Created directory: {original_config_path}")
|
|
print("Please place your source configuration files in this directory.")
|
|
|
|
if not os.path.exists(config_path):
|
|
os.makedirs(config_path)
|
|
print(f"Created directory: {config_path}")
|
|
|
|
# Check if we need to create default JSON file
|
|
if not os.path.exists(json_path):
|
|
with open(json_path, 'w') as f:
|
|
json.dump({
|
|
"prefixes": [
|
|
"put.your.prefixes.here",
|
|
]
|
|
}, f, indent=2)
|
|
print(f"Created default {json_path}")
|
|
|
|
# Initialize selector
|
|
selector = ConfigSelector(
|
|
json_path=json_path,
|
|
original_config_path=original_config_path,
|
|
config_path=config_path
|
|
)
|
|
|
|
# Menu state
|
|
selected_idx = 0
|
|
view_mode = "matched" # "matched" or "unmatched"
|
|
current_prefix = "" # Current prefix path, e.g. "field.storage"
|
|
level = 0 # Current hierarchical level
|
|
|
|
# Navigation history for going back
|
|
path_history = []
|
|
|
|
# Main loop
|
|
while True:
|
|
# Get segments at the current level for the current view
|
|
all_prefixes = selector.prefixes if view_mode == "matched" else list(selector.unmatched_files.keys())
|
|
items = get_prefix_segments_at_level(all_prefixes, current_prefix, level, selector, view_mode)
|
|
|
|
# Draw the menu
|
|
item_count = draw_menu(stdscr, selector, selected_idx, view_mode, current_prefix, level)
|
|
|
|
if item_count == 0:
|
|
key = stdscr.getch()
|
|
if key == ord('q') or key == ord('Q'):
|
|
break
|
|
elif key == ord(' '):
|
|
view_mode = "unmatched" if view_mode == "matched" else "matched"
|
|
selected_idx = 0
|
|
# Reset navigation when toggling view
|
|
current_prefix = ""
|
|
level = 0
|
|
path_history = []
|
|
elif key == ord('a') or key == ord('A'):
|
|
new_prefix = add_prefix_dialog(stdscr)
|
|
if new_prefix:
|
|
selector.add_prefix(new_prefix)
|
|
elif key == ord('o') or key == ord('O'):
|
|
new_prefix = choose_prefix_dialog(stdscr, selector)
|
|
if new_prefix:
|
|
selector.add_prefix(new_prefix)
|
|
elif key == curses.KEY_LEFT and (current_prefix or level > 0):
|
|
# Go back one level
|
|
if level > 0:
|
|
level -= 1
|
|
if current_prefix:
|
|
# Remove last segment
|
|
parts = current_prefix.split('.')
|
|
current_prefix = '.'.join(parts[:-1])
|
|
selected_idx = 0
|
|
elif path_history:
|
|
# Pop from history
|
|
prev_prefix, prev_level, prev_idx = path_history.pop()
|
|
current_prefix = prev_prefix
|
|
level = prev_level
|
|
selected_idx = prev_idx
|
|
continue
|
|
|
|
key = stdscr.getch()
|
|
|
|
if key == ord('q') or key == ord('Q'):
|
|
break
|
|
elif key == curses.KEY_UP and selected_idx > 0:
|
|
selected_idx -= 1
|
|
elif key == curses.KEY_DOWN and selected_idx < item_count - 1:
|
|
selected_idx += 1
|
|
elif key == ord(' '):
|
|
view_mode = "unmatched" if view_mode == "matched" else "matched"
|
|
selected_idx = 0
|
|
# Reset navigation when toggling view
|
|
current_prefix = ""
|
|
level = 0
|
|
path_history = []
|
|
elif key == curses.KEY_RIGHT:
|
|
# Drill down into the selected item
|
|
if selected_idx < len(items):
|
|
selected_item = items[selected_idx]
|
|
|
|
# Check if there are child segments
|
|
if has_child_segments(selected_item, view_mode, selector):
|
|
# Save current state to history
|
|
path_history.append((current_prefix, level, selected_idx))
|
|
|
|
# Update current prefix and level
|
|
current_prefix = selected_item
|
|
level += 1
|
|
selected_idx = 0
|
|
elif key == curses.KEY_LEFT and (current_prefix or level > 0):
|
|
# Go back one level
|
|
if level > 0:
|
|
level -= 1
|
|
if current_prefix:
|
|
# Remove last segment
|
|
parts = current_prefix.split('.')
|
|
current_prefix = '.'.join(parts[:-1])
|
|
selected_idx = 0
|
|
elif path_history:
|
|
# Pop from history
|
|
prev_prefix, prev_level, prev_idx = path_history.pop()
|
|
current_prefix = prev_prefix
|
|
level = prev_level
|
|
selected_idx = prev_idx
|
|
elif key == ord('a') or key == ord('A'):
|
|
new_prefix = add_prefix_dialog(stdscr)
|
|
if new_prefix:
|
|
selector.add_prefix(new_prefix)
|
|
elif key == ord('o') or key == ord('O'):
|
|
new_prefix = choose_prefix_dialog(stdscr, selector)
|
|
if new_prefix:
|
|
selector.add_prefix(new_prefix)
|
|
elif key == ord('d') or key == ord('D'):
|
|
if view_mode == "matched" and len(items) > 0 and selected_idx < len(items):
|
|
selected_item = items[selected_idx]
|
|
# Only allow deleting complete prefixes
|
|
if selected_item in selector.prefixes:
|
|
if confirm_dialog(stdscr, f"Remove prefix '{selected_item}'?"):
|
|
selector.remove_prefix(selected_item)
|
|
if selected_idx >= len(items) - 1:
|
|
selected_idx = max(0, len(items) - 1)
|
|
else:
|
|
# Allow adding the current selection (partial prefix) directly
|
|
if confirm_dialog(stdscr, f"Add current path '{selected_item}' as a matched prefix?"):
|
|
selector.add_prefix(selected_item)
|
|
view_mode = "matched"
|
|
current_prefix = ""
|
|
level = 0
|
|
path_history = []
|
|
selected_idx = 0
|
|
elif view_mode == "unmatched" and len(items) > 0 and selected_idx < len(items):
|
|
# Add the item to matched prefixes
|
|
selected_item = items[selected_idx]
|
|
if confirm_dialog(stdscr, f"Add '{selected_item}' to matched prefixes?"):
|
|
selector.add_prefix(selected_item)
|
|
view_mode = "matched"
|
|
current_prefix = ""
|
|
level = 0
|
|
path_history = []
|
|
selected_idx = 0
|
|
elif key == ord('c') or key == ord('C'):
|
|
if confirm_yaml_cleaning(stdscr, selector):
|
|
selector.copy_matched_files()
|
|
elif key == ord('m') or key == ord('M'):
|
|
if confirm_dialog(stdscr, "Compare old and new recipe configs and track changes?"):
|
|
changed_count, deleted_count, unchanged_count = selector.compare_and_track_changes()
|
|
show_comparison_results(stdscr, changed_count, deleted_count, unchanged_count)
|
|
elif key == ord('s') or key == ord('S'):
|
|
selector.save_prefixes()
|
|
elif key in (10, 13): # ENTER key to select individual files
|
|
if len(items) > 0 and selected_idx < len(items):
|
|
selected_item = items[selected_idx]
|
|
files = get_files_for_segment(selector, selected_item, view_mode)
|
|
|
|
if files:
|
|
if view_mode == "matched":
|
|
# Check if this is a complete prefix in matched
|
|
if selected_item in selector.prefixes:
|
|
selected_files = select_individual_files_dialog(stdscr, selector, selected_item, view_mode)
|
|
if selected_files and confirm_dialog(stdscr, f"Remove {len(selected_files)} files from matched?"):
|
|
# Remove individual files from matched_files
|
|
for file in selected_files:
|
|
if file in selector.matched_files[selected_item]:
|
|
selector.matched_files[selected_item].remove(file)
|
|
else:
|
|
# Show message that you can only select files from complete prefixes
|
|
h, w = stdscr.getmaxyx()
|
|
message_win = curses.newwin(3, 60, (h - 3) // 2, (w - 60) // 2)
|
|
message_win.box()
|
|
message_win.addstr(1, 2, "Can only select files from complete prefixes.")
|
|
message_win.refresh()
|
|
message_win.getch()
|
|
else: # Unmatched view
|
|
# Find a matching prefix to add files to
|
|
target_prefix = None
|
|
for prefix in selector.prefixes:
|
|
if selected_item.startswith(prefix) or prefix.startswith(selected_item):
|
|
target_prefix = prefix
|
|
break
|
|
|
|
if not target_prefix:
|
|
if confirm_dialog(stdscr, f"Add prefix '{selected_item}' to matched prefixes?"):
|
|
selector.add_prefix(selected_item)
|
|
target_prefix = selected_item
|
|
|
|
if target_prefix:
|
|
file_selection = select_individual_files_dialog(stdscr, selector, selected_item, view_mode)
|
|
if file_selection:
|
|
# Add individual files to matched_files
|
|
for file in file_selection:
|
|
if file not in selector.matched_files[target_prefix]:
|
|
selector.matched_files[target_prefix].append(file)
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
curses.wrapper(main)
|
|
except KeyboardInterrupt:
|
|
print("Program terminated by user")
|