drupal_config_extract/config_selector.py

1848 lines
74 KiB
Python

#!/usr/bin/env python3
import os
import sys
import json
import shutil
import curses
import re
import csv
import filecmp
import glob
import tarfile
import yaml
from typing import List, Dict, Set, Tuple, Optional
# Use current directory instead of hardcoded path.
BASE_DIR = os.getcwd()
INGEST_DIR = os.path.join(BASE_DIR, "ingest")
ORIGINAL_CONFIG_DIR = os.path.join(BASE_DIR, "original_config")
RECIPE_CONFIG_DIR = os.path.join(BASE_DIR, "recipes", "wisski_default_data_model", "config")
RECIPE_SOURCE_DIR = os.path.join(BASE_DIR, "recipes", "wisski_default_data_model")
RECIPE_UPDATE_BASE = os.path.join(BASE_DIR, "update")
def recipe_update_dir(recipe_source: str = RECIPE_SOURCE_DIR) -> str:
"""Path to updated recipe copy: update/<recipe_name>/."""
return os.path.join(RECIPE_UPDATE_BASE, os.path.basename(recipe_source))
def _clear_dir(path: str) -> int:
"""Remove all files and subdirectories inside path. Returns count of removed items."""
if not os.path.isdir(path):
return 0
removedCount = 0
for entry in os.listdir(path):
entryPath = os.path.join(path, entry)
if os.path.isdir(entryPath):
shutil.rmtree(entryPath)
else:
os.remove(entryPath)
removedCount += 1
return removedCount
def _copy_recipe_tree(src: str, dst: str) -> None:
"""Copy a full recipe directory tree, including .git."""
if os.path.exists(dst):
shutil.rmtree(dst)
shutil.copytree(src, dst)
def ingest_config(force: bool = False) -> None:
"""Extract a config*.tar.gz archive from ingest/ into original_config/.
Skips extraction when YAML files are already present in original_config/,
unless force is True. With --force, original_config/, new_recipe_config/,
and changed_files/ are all cleared before re-extraction.
"""
os.makedirs(ORIGINAL_CONFIG_DIR, exist_ok=True)
existingFiles = [f for f in os.listdir(ORIGINAL_CONFIG_DIR) if f.endswith(".yml")]
if existingFiles and not force:
print(
f"original_config/ already contains {len(existingFiles)} YAML files. "
"Use --force to re-extract."
)
return
archives = sorted(glob.glob(os.path.join(INGEST_DIR, "config*.tar.gz")))
if not archives:
if not existingFiles:
print("No config*.tar.gz found in ingest/ and original_config/ is empty.")
return
# Use the most recently named archive.
archive = archives[-1]
print(f"Extracting {os.path.basename(archive)} to original_config/...")
if force:
_clear_dir(ORIGINAL_CONFIG_DIR)
newRecipeDir = os.path.join(BASE_DIR, "new_recipe_config")
changedFilesDir = os.path.join(BASE_DIR, "changed_files")
cleared = _clear_dir(newRecipeDir)
if cleared:
print(f"Cleared {cleared} items from new_recipe_config/.")
cleared = _clear_dir(changedFilesDir)
if cleared:
print(f"Cleared {cleared} items from changed_files/.")
extractedCount = 0
with tarfile.open(archive, "r:gz") as tar:
for member in tar.getmembers():
if not (member.isfile() and member.name.endswith(".yml")):
continue
# Preserve subdirectory structure (e.g. language/de/foo.yml) but strip
# any leading archive root component that is not part of the config layout.
relPath = member.name.lstrip("./")
destPath = os.path.join(ORIGINAL_CONFIG_DIR, relPath)
os.makedirs(os.path.dirname(destPath), exist_ok=True)
srcFile = tar.extractfile(member)
if srcFile is not None:
with open(destPath, "wb") as dst:
dst.write(srcFile.read())
extractedCount += 1
print(f"Extracted {extractedCount} YAML files to original_config/.")
class ConfigSelector:
def __init__(self, json_path: str, original_config_path: str, config_path: str,
old_recipe_path: str = None, changed_files_path: str = None, deleted_files_csv: str = None,
ignored_files_yml: str = None):
self.json_path = json_path
self.original_config_path = original_config_path
self.config_path = config_path
self.old_recipe_path = old_recipe_path or os.path.join(BASE_DIR, "old_recipe_config")
self.changed_files_path = changed_files_path or os.path.join(BASE_DIR, "changed_files")
self.deleted_files_csv = deleted_files_csv or os.path.join(BASE_DIR, "deleted_files.csv")
self.new_recipe_not_in_original_csv = os.path.join(BASE_DIR, "new_recipe_not_in_original.csv")
self.language_inconsistency_csv = os.path.join(BASE_DIR, "language_inconsistency.csv")
self.changed_files_manifest_csv = os.path.join(BASE_DIR, "changed_files_manifest.csv")
self.ignored_files_yml = ignored_files_yml or os.path.join(BASE_DIR, "ignored_files.yml")
self.prefixes: List[str] = []
self.matched_files: Dict[str, List[str]] = {}
self.unmatched_files: Dict[str, List[str]] = {}
self.ignored_files: Set[str] = set()
self.load_prefixes()
self.load_ignored_files()
self.scan_configs()
def load_prefixes(self) -> None:
"""Load file prefixes from JSON file."""
try:
with open(self.json_path, 'r') as f:
data = json.load(f)
if isinstance(data, list):
self.prefixes = data
elif isinstance(data, dict) and 'prefixes' in data:
self.prefixes = data['prefixes']
else:
print("Error: JSON file should contain a list of prefixes or a dict with 'prefixes' key")
exit(1)
except (json.JSONDecodeError, FileNotFoundError) as e:
print(f"Error loading JSON file: {e}")
exit(1)
def load_ignored_files(self) -> None:
"""Load ignored file names from YAML file."""
self.ignored_files = set()
if not os.path.exists(self.ignored_files_yml):
return
try:
with open(self.ignored_files_yml, 'r') as f:
data = yaml.safe_load(f)
if data:
# Iterate through all categories in the YAML.
for category, info in data.items():
if isinstance(info, dict) and 'name' in info:
name = info['name']
if not isinstance(name, str):
continue
self.ignored_files.add(name)
# Basenames on disk always include .yml; allow omitting it in YAML.
if not name.endswith('.yml'):
self.ignored_files.add(f'{name}.yml')
except (yaml.YAMLError, FileNotFoundError) as e:
print(f"Warning: Error loading ignored files YAML: {e}")
def save_prefixes(self) -> None:
"""Save current prefixes back to JSON file."""
try:
with open(self.json_path, 'w') as f:
json.dump({'prefixes': self.prefixes}, f, indent=2)
print(f"Prefixes saved to {self.json_path}")
except Exception as e:
print(f"Error saving JSON file: {e}")
def scan_configs(self) -> None:
"""Scan config directories and categorize files based on prefixes."""
self.matched_files = {prefix: [] for prefix in self.prefixes}
self.unmatched_files = {}
try:
# Process all files in the directory
for filename in os.listdir(self.original_config_path):
if not filename.endswith('.yml'):
continue
# Standard prefix matching
matched = False
for prefix in self.prefixes:
if filename.startswith(prefix):
self.matched_files[prefix].append(filename)
matched = True
break
# Add to unmatched if not matched by any prefix
if not matched:
# Get the first segment for initial grouping
parts = filename.split('.')
potential_prefix = parts[0]
if potential_prefix not in self.unmatched_files:
self.unmatched_files[potential_prefix] = []
self.unmatched_files[potential_prefix].append(filename)
except FileNotFoundError:
print(f"Error: Directory {self.original_config_path} not found")
exit(1)
@staticmethod
def _strip_metadata(content: str) -> str:
"""Remove uuid and _core keys from Drupal YAML config content."""
content = re.sub(r'^uuid: [a-f0-9\-]+\n', '', content, flags=re.MULTILINE)
# Match exactly '_core:' at the start of a line (avoids touching wisski_core).
content = re.sub(r'^_core:\n([ \t]+[^\n]+\n)+', '', content, flags=re.MULTILINE)
return content
def _collect_bundle_hashes(self) -> Set[str]:
"""Return the set of bundle hashes found in matched wisski_core.wisski_bundle files."""
bundlePrefix = "wisski_core.wisski_bundle."
bundleHashes: Set[str] = set()
for prefix in self.prefixes:
for filename in self.matched_files.get(prefix, []):
if filename.startswith(bundlePrefix) and filename.endswith(".yml"):
bundleHash = filename[len(bundlePrefix):-4]
bundleHashes.add(bundleHash)
return bundleHashes
def _copy_file(self, src: str, dst: str) -> None:
"""Read src, strip metadata, write to dst. Falls back to raw copy on error."""
try:
with open(src, 'r') as f:
content = f.read()
content = self._strip_metadata(content)
with open(dst, 'w') as f:
f.write(content)
except Exception as e:
print(f"Error processing {os.path.basename(src)}: {e}")
shutil.copy2(src, dst)
def _copy_with_language_overrides(self, filename: str) -> int:
"""Copy original_config/language/{lang}/{filename} to new_recipe_config/language/{lang}/
for every language that has an override for this file. Returns count of overrides copied."""
langBase = os.path.join(self.original_config_path, "language")
if not os.path.isdir(langBase):
return 0
overridesCopied = 0
for langCode in os.listdir(langBase):
srcLangDir = os.path.join(langBase, langCode)
if not os.path.isdir(srcLangDir):
continue
srcOverride = os.path.join(srcLangDir, filename)
if not os.path.isfile(srcOverride):
continue
dstLangDir = os.path.join(self.config_path, "language", langCode)
os.makedirs(dstLangDir, exist_ok=True)
self._copy_file(srcOverride, os.path.join(dstLangDir, filename))
overridesCopied += 1
return overridesCopied
def copy_matched_files(self) -> None:
"""Copy all matched files from original_config to config, removing UUID and _core structures."""
if not os.path.exists(self.config_path):
os.makedirs(self.config_path)
copiedCount = 0
processedCount = 0
overrideCopied = 0
for prefix in self.prefixes:
for filename in self.matched_files[prefix]:
src = os.path.join(self.original_config_path, filename)
dst = os.path.join(self.config_path, filename)
try:
with open(src, 'r') as f:
content = f.read()
content = self._strip_metadata(content)
with open(dst, 'w') as f:
f.write(content)
processedCount += 1
except Exception as e:
print(f"Error processing {filename}: {e}")
shutil.copy2(src, dst)
copiedCount += 1
overrideCopied += self._copy_with_language_overrides(filename)
# For every matched bundle, also copy its language.content_settings counterpart.
langCopied = 0
for bundleHash in self._collect_bundle_hashes():
langFilename = f"language.content_settings.wisski_individual.{bundleHash}.yml"
langSrc = os.path.join(self.original_config_path, langFilename)
langDst = os.path.join(self.config_path, langFilename)
if os.path.exists(langSrc):
self._copy_file(langSrc, langDst)
langCopied += 1
overrideCopied += self._copy_with_language_overrides(langFilename)
print(
f"Copied {copiedCount} files to {self.config_path} "
f"({processedCount} processed to remove UUIDs and _core structures)"
)
if langCopied:
print(f"Also copied {langCopied} corresponding language.content_settings files.")
if overrideCopied:
print(f"Also copied {overrideCopied} language override files from original_config/language/.")
pruned_export = self._prune_new_recipe_not_in_original()
if pruned_export:
print(
f"Removed {pruned_export} file(s) from new_recipe_config that are not in original_config "
f"(stale vs current export)."
)
pruned = self._prune_stale_language_overrides()
if pruned:
print(f"Removed {pruned} stale language override file(s) (no longer in export or missing base config).")
@staticmethod
def _remove_empty_language_dirs(config_root: str) -> None:
"""Remove empty language/<lang>/ and language/ under config_root if possible."""
lang_root = os.path.join(config_root, "language")
if not os.path.isdir(lang_root):
return
for lang_code in list(os.listdir(lang_root)):
lang_dir = os.path.join(lang_root, lang_code)
if not os.path.isdir(lang_dir):
continue
try:
if not os.listdir(lang_dir):
os.rmdir(lang_dir)
except OSError:
pass
try:
if not os.listdir(lang_root):
os.rmdir(lang_root)
except OSError:
pass
def _prune_new_recipe_not_in_original(self) -> int:
"""Delete YAML under config_path whose relative path is absent from original_config.
Skips when original_config has no YAML (avoid wiping new_recipe before an export).
"""
orig_paths = self._collect_yaml_relpaths(self.original_config_path)
if not orig_paths:
return 0
new_paths = self._collect_yaml_relpaths(self.config_path)
orphans = new_paths - orig_paths
removed = 0
for rel in sorted(orphans):
path = self._abs_config_path(self.config_path, rel)
try:
if os.path.isfile(path):
os.remove(path)
removed += 1
except OSError:
pass
self._remove_empty_language_dirs(self.config_path)
return removed
def _prune_stale_language_overrides(self) -> int:
"""Remove language/*/*.yml in config_path that should not be kept.
Drops overrides that are missing from the current export (translation removed
on the site) or whose base config is not present under config_path (orphans
from earlier copies). Without this, compare would not list those as deleted.
"""
removed = 0
lang_dst_root = os.path.join(self.config_path, "language")
if not os.path.isdir(lang_dst_root):
return 0
lang_src_root = os.path.join(self.original_config_path, "language")
for lang_code in list(os.listdir(lang_dst_root)):
dst_lang_dir = os.path.join(lang_dst_root, lang_code)
if not os.path.isdir(dst_lang_dir):
continue
src_lang_dir = os.path.join(lang_src_root, lang_code)
for name in list(os.listdir(dst_lang_dir)):
if not name.endswith(".yml"):
continue
base_fp = os.path.join(self.config_path, name)
src_fp = os.path.join(src_lang_dir, name)
if os.path.isfile(base_fp) and os.path.isfile(src_fp):
continue
try:
os.remove(os.path.join(dst_lang_dir, name))
removed += 1
except OSError:
pass
self._remove_empty_language_dirs(self.config_path)
return removed
@staticmethod
def _collect_yaml_relpaths(config_root: str) -> Set[str]:
"""Relative paths (posix-style) for YAML under a Drupal config tree.
Includes top-level *.yml and language/<lang>/*.yml (same layout as exports).
"""
rel_paths: Set[str] = set()
if not os.path.isdir(config_root):
return rel_paths
try:
for name in os.listdir(config_root):
if name.endswith(".yml"):
rel_paths.add(name)
lang_root = os.path.join(config_root, "language")
if os.path.isdir(lang_root):
for lang_code in os.listdir(lang_root):
lang_dir = os.path.join(lang_root, lang_code)
if not os.path.isdir(lang_dir):
continue
for name in os.listdir(lang_dir):
if name.endswith(".yml"):
rel_paths.add(f"language/{lang_code}/{name}")
except OSError:
pass
return rel_paths
def _abs_config_path(self, config_root: str, rel_path: str) -> str:
"""Join config root with a posix rel_path from _collect_yaml_relpaths."""
return os.path.normpath(os.path.join(config_root, *rel_path.split("/")))
def _collect_language_inconsistencies(self) -> List[str]:
"""Language overrides in config_path with no matching base *.yml at config root."""
inconsistent: List[str] = []
lang_root = os.path.join(self.config_path, "language")
if not os.path.isdir(lang_root):
return inconsistent
try:
for lang_code in os.listdir(lang_root):
lang_dir = os.path.join(lang_root, lang_code)
if not os.path.isdir(lang_dir):
continue
for name in os.listdir(lang_dir):
if not name.endswith(".yml"):
continue
base_path = os.path.join(self.config_path, name)
if not os.path.isfile(base_path):
inconsistent.append(f"language/{lang_code}/{name}")
except OSError:
pass
return sorted(inconsistent)
def _write_language_inconsistency_csv(self, inconsistent: List[str]) -> None:
with open(self.language_inconsistency_csv, "w", newline="") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(["filename"])
for rel_path in inconsistent:
writer.writerow([rel_path])
def delete_language_inconsistencies(self) -> int:
"""Remove language/*/*.yml in config_path that have no base config pendant."""
inconsistent = self._collect_language_inconsistencies()
removed = 0
for rel_path in inconsistent:
path = self._abs_config_path(self.config_path, rel_path)
try:
if os.path.isfile(path):
os.remove(path)
removed += 1
except OSError:
pass
self._remove_empty_language_dirs(self.config_path)
remaining = self._collect_language_inconsistencies()
self._write_language_inconsistency_csv(remaining)
return removed
@staticmethod
def _read_deleted_files_csv(csv_path: str) -> List[str]:
paths: List[str] = []
if not os.path.isfile(csv_path):
return paths
with open(csv_path, newline="") as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
name = (row.get("filename") or "").strip()
if name:
paths.append(name)
return paths
def update_recipe(self) -> Tuple[int, int, int, str]:
"""Copy committed recipe to update/<recipe_name>/, apply deletions and changed_files/.
Returns:
Tuple of (files_deleted, files_applied, deletes_missing, update_dir).
"""
if not os.path.isdir(RECIPE_SOURCE_DIR):
raise FileNotFoundError(f"Recipe source not found: {RECIPE_SOURCE_DIR}")
update_dir = recipe_update_dir()
_copy_recipe_tree(RECIPE_SOURCE_DIR, update_dir)
update_config = os.path.join(update_dir, "config")
files_deleted = 0
deletes_missing = 0
for rel_path in self._read_deleted_files_csv(self.deleted_files_csv):
target = self._abs_config_path(update_config, rel_path)
try:
if os.path.isfile(target):
os.remove(target)
files_deleted += 1
else:
deletes_missing += 1
except OSError:
deletes_missing += 1
self._remove_empty_language_dirs(update_config)
files_applied = 0
for rel_path in sorted(self._collect_yaml_relpaths(self.changed_files_path)):
src = self._abs_config_path(self.changed_files_path, rel_path)
dst = self._abs_config_path(update_config, rel_path)
os.makedirs(os.path.dirname(dst), exist_ok=True)
shutil.copy2(src, dst)
files_applied += 1
print(
f"Updated recipe written to {update_dir}/ "
f"({files_deleted} deleted from config, {files_applied} copied from changed_files/, "
f"{deletes_missing} delete entries not present in copy)."
)
return files_deleted, files_applied, deletes_missing, update_dir
def compare_and_track_changes(self) -> Tuple[int, int, int, int, int, int, int]:
"""
Compare committed recipe config (old_recipe_path) to new_recipe_config (config_path).
- Copies **new** YAML (in new_recipe but not in committed recipe) to changed_files/.
- Copies **modified** YAML (same path, different content) to changed_files/.
- Respects ignored_files.yml (basename) for both new and modified copies.
- Writes deleted_files.csv: paths in committed recipe missing from new_recipe_config.
- Writes changed_files_manifest.csv: relative_path + kind (new|modified) for each copy.
- Writes new_recipe_not_in_original.csv for paths in new_recipe not in original export.
- Writes language_inconsistency.csv for language/*/*.yml without a base config pendant.
Returns:
Tuple of (added_count, modified_count, deleted_count, unchanged_count,
language_deleted_count, not_in_original_count,
language_inconsistency_count)
"""
_clear_dir(self.changed_files_path)
os.makedirs(self.changed_files_path, exist_ok=True)
# Track statistics
added_count = 0
modified_count = 0
deleted_count = 0
unchanged_count = 0
ignored_count = 0
deleted_files_list = []
manifest_rows: List[Tuple[str, str]] = []
# Top-level and language/<lang>/*.yml (matches Drupal export layout)
old_files = self._collect_yaml_relpaths(self.old_recipe_path)
new_files = self._collect_yaml_relpaths(self.config_path)
# Deleted: in committed recipe but not in new_recipe_config
deleted_files = old_files - new_files
for rel_path in sorted(deleted_files):
deleted_files_list.append(rel_path)
deleted_count += 1
# Modified: same path in both, content differs
common_files = old_files & new_files
for rel_path in sorted(common_files):
old_file_path = self._abs_config_path(self.old_recipe_path, rel_path)
new_file_path = self._abs_config_path(self.config_path, rel_path)
base_name = os.path.basename(rel_path)
if filecmp.cmp(old_file_path, new_file_path, shallow=False):
unchanged_count += 1
else:
if base_name in self.ignored_files:
ignored_count += 1
else:
changed_dest = os.path.join(self.changed_files_path, *rel_path.split("/"))
os.makedirs(os.path.dirname(changed_dest), exist_ok=True)
shutil.copy2(new_file_path, changed_dest)
modified_count += 1
manifest_rows.append((rel_path, "modified"))
# New: in new_recipe (from original_config via prefixes) but not in committed recipe
added_files = new_files - old_files
for rel_path in sorted(added_files):
new_file_path = self._abs_config_path(self.config_path, rel_path)
base_name = os.path.basename(rel_path)
if base_name in self.ignored_files:
ignored_count += 1
else:
changed_dest = os.path.join(self.changed_files_path, *rel_path.split("/"))
os.makedirs(os.path.dirname(changed_dest), exist_ok=True)
shutil.copy2(new_file_path, changed_dest)
added_count += 1
manifest_rows.append((rel_path, "new"))
with open(self.changed_files_manifest_csv, "w", newline="") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(["relative_path", "kind"])
for rel_path, kind in sorted(manifest_rows, key=lambda x: (x[1], x[0])):
writer.writerow([rel_path, kind])
# Write deleted files to CSV
with open(self.deleted_files_csv, 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(['filename']) # Header
for filename in deleted_files_list:
writer.writerow([filename])
if ignored_count > 0:
print(
f"Ignored {ignored_count} new/modified file(s) as per ignored_files.yml "
"(not copied to changed_files/)."
)
total_copied = added_count + modified_count
if total_copied:
print(
f"Copied {total_copied} file(s) to changed_files/ "
f"({added_count} new vs committed recipe, {modified_count} modified)."
)
language_deleted_count = sum(1 for p in deleted_files_list if p.startswith("language/"))
if language_deleted_count:
print(
f"Deleted list includes {language_deleted_count} language override path(s) "
f"(see deleted_files.csv under language/<lang>/)."
)
orig_paths = self._collect_yaml_relpaths(self.original_config_path)
new_paths = self._collect_yaml_relpaths(self.config_path)
not_in_original = sorted(new_paths - orig_paths)
not_in_original_count = len(not_in_original)
with open(self.new_recipe_not_in_original_csv, "w", newline="") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(["filename"])
for rel_path in not_in_original:
writer.writerow([rel_path])
if not_in_original_count:
print(
f"{not_in_original_count} path(s) in new_recipe_config are not in original_config "
f"(see {os.path.basename(self.new_recipe_not_in_original_csv)})."
)
elif not orig_paths:
print("Warning: original_config has no YAML; orphan CSV is empty.")
language_inconsistent = self._collect_language_inconsistencies()
language_inconsistency_count = len(language_inconsistent)
self._write_language_inconsistency_csv(language_inconsistent)
if language_inconsistency_count:
print(
f"{language_inconsistency_count} language override(s) in new_recipe_config "
f"have no base config pendant "
f"(see {os.path.basename(self.language_inconsistency_csv)})."
)
return (
added_count,
modified_count,
deleted_count,
unchanged_count,
language_deleted_count,
not_in_original_count,
language_inconsistency_count,
)
def add_prefix(self, prefix: str) -> None:
"""Add a new prefix to the list if it doesn't exist."""
if prefix and prefix not in self.prefixes:
self.prefixes.append(prefix)
self.scan_configs()
def remove_prefix(self, prefix: str) -> None:
"""Remove a prefix from the list if it exists."""
if prefix in self.prefixes:
self.prefixes.remove(prefix)
self.scan_configs()
def get_summary(self) -> Tuple[int, int]:
"""Return the count of matched and unmatched files."""
matched_count = sum(len(files) for files in self.matched_files.values())
unmatched_count = sum(len(files) for files in self.unmatched_files.values())
return matched_count, unmatched_count
def get_prefix_segments_at_level(all_prefixes: List[str], current_prefix: str, level: int, selector=None, view_mode=None) -> List[str]:
"""
Get unique prefix segments at the specified level.
Args:
all_prefixes: List of all prefixes
current_prefix: Current prefix we're viewing (e.g., "field.storage")
level: The level we want to see (0 = root level)
selector: The ConfigSelector instance (needed for unmatched files)
view_mode: Current view mode ("matched" or "unmatched")
Returns:
List of unique prefix segments at this level
"""
# Current prefix parts (e.g., ["field", "storage"])
current_parts = current_prefix.split('.') if current_prefix else []
unique_prefixes = set()
# Special case for unmatched view with level > 0
if view_mode == "unmatched" and selector:
# Need to scan actual filenames for deeper levels
for prefix, filenames in selector.unmatched_files.items():
for filename in filenames:
# Skip .yml extension for determining parts
if filename.endswith('.yml'):
filename = filename[:-4]
parts = filename.split('.')
# Skip if not enough parts for this level
if len(parts) <= level:
continue
# Skip if prefix doesn't match current path
if current_prefix:
filename_prefix = '.'.join(parts[:len(current_parts)])
if filename_prefix != current_prefix:
continue
# Add the segment at this level
prefix_at_level = '.'.join(parts[:level+1])
unique_prefixes.add(prefix_at_level)
return sorted(list(unique_prefixes))
# Standard processing for matched view
elif view_mode == "matched":
for prefix in all_prefixes:
# Split each prefix into parts
parts = prefix.split('.')
# Skip if not enough parts for the level
if len(parts) <= level:
continue
# If we're looking at a nested level, only include prefixes that match the current path
if current_prefix:
# Skip if this prefix doesn't start with the current prefix
if not prefix.startswith(current_prefix + '.') and prefix != current_prefix:
continue
# If current level is within current_prefix, it should match exactly
if level < len(current_parts) and parts[level] != current_parts[level]:
continue
# Build the prefix up to this level
prefix_at_level = '.'.join(parts[:level+1])
unique_prefixes.add(prefix_at_level)
# Basic processing for all other cases
else:
for prefix in all_prefixes:
# Split each prefix into parts
parts = prefix.split('.')
# Skip if not enough parts for the level
if len(parts) <= level:
continue
# If we're looking at a nested level, only include prefixes that match the current path
if current_prefix:
# Skip if this prefix doesn't start with the current prefix
if not prefix.startswith(current_prefix + '.') and prefix != current_prefix:
continue
# If current level is within current_prefix, it should match exactly
if level < len(current_parts) and parts[level] != current_parts[level]:
continue
# Build the prefix up to this level
prefix_at_level = '.'.join(parts[:level+1])
unique_prefixes.add(prefix_at_level)
return sorted(list(unique_prefixes))
def get_files_for_segment(selector: ConfigSelector, segment: str, view_mode: str) -> List[str]:
"""Get files that match the given segment in the current view."""
files = []
if view_mode == "matched":
# For matched files, we need to check if this segment is an exact prefix or a partial one
if segment in selector.prefixes:
# Exact match: include all files for this prefix
files.extend(selector.matched_files[segment])
else:
# Partial match: include files from all prefixes that start with this segment
for prefix in selector.prefixes:
if prefix == segment or prefix.startswith(segment + '.'):
files.extend(selector.matched_files[prefix])
else:
# For unmatched files, collect files that match this segment
segment_parts = segment.split('.')
segment_len = len(segment_parts)
for prefix, file_list in selector.unmatched_files.items():
# For root level prefixes (without dots)
if '.' not in prefix and segment_len == 1 and prefix == segment:
files.extend(file_list)
continue
# For multi-level prefixes and filenames
for filename in file_list:
# Get filename without .yml extension for comparison
filename_no_ext = filename
if filename.endswith('.yml'):
filename_no_ext = filename[:-4]
filename_parts = filename_no_ext.split('.')
# Check if this filename matches the segment up to the segment's length
if len(filename_parts) >= segment_len:
if '.'.join(filename_parts[:segment_len]) == segment:
files.append(filename)
return sorted(list(set(files)))
def draw_menu(stdscr, selector: ConfigSelector, selected_idx: int, view_mode: str, current_prefix: str = "", level: int = 0):
"""Draw the TUI menu."""
curses.curs_set(0)
stdscr.clear()
h, w = stdscr.getmaxyx()
# Colors
curses.start_color()
curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_BLUE) # Selected item
curses.init_pair(2, curses.COLOR_GREEN, curses.COLOR_BLACK) # Matched
curses.init_pair(3, curses.COLOR_RED, curses.COLOR_BLACK) # Unmatched
curses.init_pair(4, curses.COLOR_YELLOW, curses.COLOR_BLACK) # Titles
# Title
title = "Drupal Configuration Selector"
stdscr.addstr(0, (w - len(title)) // 2, title, curses.color_pair(4) | curses.A_BOLD)
# Summary
matched_count, unmatched_count = selector.get_summary()
summary = f"Matched: {matched_count} files | Unmatched: {unmatched_count} files"
stdscr.addstr(1, (w - len(summary)) // 2, summary)
# Current path display
if current_prefix:
path_display = f"Current path: {current_prefix}"
stdscr.addstr(2, 2, path_display)
# Instructions
stdscr.addstr(h-4, 2, "LEFT/RIGHT: Navigate tree levels | SPACE: Toggle view | ENTER: Select files")
stdscr.addstr(
h - 3,
2,
"A: Add | O: Choose | D: Add/Del | C: Copy | M: Compare | I: Lang orphans | U: Update | S: Save | Q: Quit",
)
# Content area
start_y = 3
max_visible = h - 8 # Account for two instruction lines
# Get segments at the current level
if view_mode == "matched":
all_prefixes = selector.prefixes
else:
all_prefixes = list(selector.unmatched_files.keys())
items = get_prefix_segments_at_level(all_prefixes, current_prefix, level, selector, view_mode)
# Title based on current path
if current_prefix:
title = f"{view_mode.capitalize()} Prefixes under '{current_prefix}'"
else:
title = f"{view_mode.capitalize()} Prefixes (root level)"
stdscr.addstr(start_y, 2, title, curses.color_pair(4) | curses.A_BOLD)
start_y += 1
# Pagination
if len(items) > max_visible:
page_size = max_visible
current_page = selected_idx // page_size
start_idx = current_page * page_size
end_idx = min(start_idx + page_size, len(items))
display_items = items[start_idx:end_idx]
# Adjust selected_idx relative to the page
relative_idx = selected_idx - start_idx
else:
display_items = items
relative_idx = selected_idx if selected_idx < len(items) else 0
# Display items
for i, item in enumerate(display_items):
y = start_y + i
# Get base name (last segment)
item_base = item.split('.')[-1]
# Check if there are child segments
has_children = has_child_segments(item, view_mode, selector)
# Get files for this item
files = get_files_for_segment(selector, item, view_mode)
file_count = len(files)
# Format display text
if has_children:
item_text = f"{item_base} ({file_count} files) ▶"
else:
item_text = f"{item_base} ({file_count} files)"
if i == relative_idx:
stdscr.addstr(y, 2, item_text, curses.color_pair(1) | curses.A_BOLD)
else:
if view_mode == "matched":
stdscr.addstr(y, 2, item_text, curses.color_pair(2))
else:
stdscr.addstr(y, 2, item_text, curses.color_pair(3))
# Selected item details
if items and len(items) > 0 and selected_idx < len(items):
try:
selected_item = items[selected_idx]
detail_y = start_y + min(len(display_items), max_visible) + 1
if detail_y < h - 4: # Make sure we have room to display details
stdscr.addstr(detail_y, 2, f"Files for '{selected_item}':", curses.A_BOLD)
detail_y += 1
# Get files for the selected item
files = get_files_for_segment(selector, selected_item, view_mode)
max_files_to_show = h - detail_y - 4
if max_files_to_show > 0:
sorted_files = sorted(files)[:max_files_to_show]
for i, file in enumerate(sorted_files):
if detail_y + i < h - 4: # Prevent writing outside window
# Truncate filename if too long
max_width = w - 6 # Allow for padding
if len(file) > max_width:
file_display = file[:max_width-3] + "..."
else:
file_display = file
stdscr.addstr(detail_y + i, 4, file_display)
if len(files) > max_files_to_show and detail_y + max_files_to_show < h - 4:
stdscr.addstr(detail_y + max_files_to_show, 4, f"...and {len(files) - max_files_to_show} more")
except Exception as e:
# In case of error, just don't display details
error_msg = f"Error displaying file details: {str(e)}"
if h > 5:
stdscr.addstr(h-4, 2, error_msg[:w-4])
stdscr.refresh()
return len(items)
def add_prefix_dialog(stdscr):
"""Show dialog to add a new prefix."""
h, w = stdscr.getmaxyx()
# Safety checks for minimum terminal size
min_h, min_w = 7, 20
if h < min_h or w < min_w:
return "" # Terminal too small
dialog_h, dialog_w = min(5, h - 2), min(50, w - 4)
dialog_y = max(0, (h - dialog_h) // 2)
dialog_x = max(0, (w - dialog_w) // 2)
# Final safety check
if dialog_h <= 0 or dialog_w <= 0 or dialog_y < 0 or dialog_x < 0:
return ""
# Draw dialog box
dialog_win = curses.newwin(dialog_h, dialog_w, dialog_y, dialog_x)
dialog_win.box()
dialog_win.addstr(1, 2, "Enter new prefix (ESC or empty input to cancel):")
dialog_win.refresh()
# Input field - ensure it fits within the dialog
input_w = max(1, dialog_w - 6)
input_win = curses.newwin(1, input_w, dialog_y + 2, dialog_x + 3)
input_win.clear()
# Setup input mode
curses.echo()
curses.curs_set(1)
input_win.keypad(True) # Enable special keys
# Collect input character by character to handle escape key
prefix = ""
input_win.refresh()
while True:
try:
key = input_win.getch()
# Handle escape key
if key == 27: # ASCII code for Escape
prefix = ""
break
# Handle enter key
elif key in (10, 13): # ASCII codes for Enter/Return
break
# Handle backspace/delete
elif key in (8, 127, curses.KEY_BACKSPACE):
if prefix:
prefix = prefix[:-1]
# Clear line and rewrite
input_win.clear()
input_win.addstr(0, 0, prefix)
input_win.refresh()
# Regular character
elif 32 <= key <= 126: # Printable ASCII
prefix += chr(key)
except Exception:
# In case of any error, just return empty string
prefix = ""
break
# Restore normal cursor visibility and echo settings
curses.noecho()
curses.curs_set(0)
return prefix.strip()
def confirm_dialog(stdscr, message):
"""Show a confirmation dialog."""
h, w = stdscr.getmaxyx()
# Safety checks for minimum terminal size
min_h, min_w = 7, 20
if h < min_h or w < min_w:
return False # Terminal too small, default to cancel
dialog_h, dialog_w = min(5, h - 2), min(max(50, len(message) + 4), w - 4)
dialog_y = max(0, (h - dialog_h) // 2)
dialog_x = max(0, (w - dialog_w) // 2)
# Final safety check
if dialog_h <= 0 or dialog_w <= 0 or dialog_y < 0 or dialog_x < 0:
return False
# Draw dialog box
dialog_win = curses.newwin(dialog_h, dialog_w, dialog_y, dialog_x)
dialog_win.box()
# Truncate message if it's too long for the dialog
max_msg_len = dialog_w - 4
display_message = message[:max_msg_len] if len(message) > max_msg_len else message
dialog_win.addstr(1, 2, display_message)
dialog_win.addstr(3, 2, "Press Y to confirm, any other key to cancel")
dialog_win.refresh()
key = stdscr.getch()
return key == ord('y') or key == ord('Y')
def choose_prefix_dialog(stdscr, selector: ConfigSelector):
"""Show dialog to choose a prefix from unmatched prefixes."""
h, w = stdscr.getmaxyx()
# Get sorted list of unmatched prefixes
unmatched_prefixes = sorted(selector.unmatched_files.keys())
if not unmatched_prefixes:
# Show message if no unmatched prefixes - ensure minimum window size
msg_h = min(5, h - 2)
msg_w = min(40, w - 4)
if msg_h < 3 or msg_w < 10:
return None # Terminal too small
message_win = curses.newwin(msg_h, msg_w, max(0, (h - msg_h) // 2), max(0, (w - msg_w) // 2))
message_win.box()
message_win.addstr(1, 2, "No unmatched prefixes available.")
message_win.refresh()
message_win.getch()
return None
# Calculate dialog dimensions with safety checks
min_dialog_h = 6 # Minimum height for usable dialog
min_dialog_w = 30 # Minimum width for usable dialog
# Ensure we have enough terminal space
if h < min_dialog_h + 2 or w < min_dialog_w + 4:
return None # Terminal too small
dialog_h = min(max(min_dialog_h, min(15, len(unmatched_prefixes) + 4)), h - 2) # +4 for title, instructions, and border
dialog_w = min(max(min_dialog_w, 60), w - 2)
dialog_y = max(0, (h - dialog_h) // 2)
dialog_x = max(0, (w - dialog_w) // 2)
# Final safety check
if dialog_h <= 0 or dialog_w <= 0 or dialog_y < 0 or dialog_x < 0:
return None
# Draw dialog box
dialog_win = curses.newwin(dialog_h, dialog_w, dialog_y, dialog_x)
dialog_win.keypad(True) # Enable arrow keys
# Initialize selection variables
selected_idx = 0
current_page = 0
items_per_page = dialog_h - 4 # -4 for title, instructions, and border
# Draw function
def draw_prefix_list():
dialog_win.clear()
dialog_win.box()
dialog_win.addstr(1, 2, "Choose a prefix to add (ESC to cancel):")
# Calculate page items
start_idx = current_page * items_per_page
end_idx = min(start_idx + items_per_page, len(unmatched_prefixes))
# Display prefixes with selection highlight
for i, prefix in enumerate(unmatched_prefixes[start_idx:end_idx]):
file_count = len(selector.unmatched_files[prefix])
item_text = f"{prefix} ({file_count} files)"
if i + start_idx == selected_idx:
dialog_win.addstr(i + 2, 4, item_text, curses.A_REVERSE)
else:
dialog_win.addstr(i + 2, 4, item_text)
# Show pagination info if needed
if len(unmatched_prefixes) > items_per_page:
page_info = f"Page {current_page + 1}/{(len(unmatched_prefixes) - 1) // items_per_page + 1}"
dialog_win.addstr(dialog_h - 1, dialog_w - len(page_info) - 2, page_info)
dialog_win.refresh()
# Main interaction loop
while True:
draw_prefix_list()
key = dialog_win.getch()
if key == 27: # ESC
return None
elif key == curses.KEY_UP and selected_idx > 0:
selected_idx -= 1
if selected_idx < current_page * items_per_page:
current_page -= 1
elif key == curses.KEY_DOWN and selected_idx < len(unmatched_prefixes) - 1:
selected_idx += 1
if selected_idx >= (current_page + 1) * items_per_page:
current_page += 1
elif key in (10, 13): # Enter
return unmatched_prefixes[selected_idx]
elif key == curses.KEY_NPAGE and current_page < (len(unmatched_prefixes) - 1) // items_per_page:
# Page Down
current_page += 1
selected_idx = min(selected_idx + items_per_page, len(unmatched_prefixes) - 1)
elif key == curses.KEY_PPAGE and current_page > 0:
# Page Up
current_page -= 1
selected_idx = max(selected_idx - items_per_page, 0)
def select_individual_files_dialog(stdscr, selector: ConfigSelector, prefix: str, view_mode: str):
"""Show dialog to select individual files from a prefix."""
h, w = stdscr.getmaxyx()
# Get the list of files for this prefix
if view_mode == "matched":
files = selector.matched_files.get(prefix, [])
title = f"Select files to remove from '{prefix}'"
action_text = "SPACE: Toggle selection | ENTER: Confirm | ESC: Cancel"
else:
files = selector.unmatched_files.get(prefix, [])
title = f"Select files to add from '{prefix}'"
action_text = "SPACE: Toggle selection | ENTER: Add selected | ESC: Cancel"
if not files:
# Show message if no files - ensure minimum window size
msg_h = min(5, h - 2)
msg_w = min(40, w - 4)
if msg_h < 3 or msg_w < 10:
return [] # Terminal too small
message_win = curses.newwin(msg_h, msg_w, max(0, (h - msg_h) // 2), max(0, (w - msg_w) // 2))
message_win.box()
message_win.addstr(1, 2, "No files available.")
message_win.refresh()
message_win.getch()
return []
# Sort files for consistent display
files = sorted(files)
# Track selected files
selected_files = set()
# Calculate dialog dimensions with safety checks
min_dialog_h = 8 # Minimum height for usable dialog
min_dialog_w = 30 # Minimum width for usable dialog
# Ensure we have enough terminal space
if h < min_dialog_h + 2 or w < min_dialog_w + 4:
return [] # Terminal too small
dialog_h = min(max(min_dialog_h, min(20, len(files) + 5)), h - 2) # +5 for title, instructions, pagination, and border
dialog_w = min(max(min_dialog_w, min(w - 4, 80)), w - 2)
dialog_y = max(0, (h - dialog_h) // 2)
dialog_x = max(0, (w - dialog_w) // 2)
# Final safety check
if dialog_h <= 0 or dialog_w <= 0 or dialog_y < 0 or dialog_x < 0:
return []
# Draw dialog box
dialog_win = curses.newwin(dialog_h, dialog_w, dialog_y, dialog_x)
dialog_win.keypad(True) # Enable arrow keys
# Initialize selection variables
selected_idx = 0
current_page = 0
items_per_page = dialog_h - 5 # -5 for title, instructions, actions, and border
# Draw function
def draw_file_list():
dialog_win.clear()
dialog_win.box()
dialog_win.addstr(1, 2, title, curses.A_BOLD)
dialog_win.addstr(dialog_h - 2, 2, action_text)
# Calculate page items
start_idx = current_page * items_per_page
end_idx = min(start_idx + items_per_page, len(files))
# Display files with selection highlight and checkbox
for i, file in enumerate(files[start_idx:end_idx]):
y = i + 3 # +3 for title, blank line, and border
# Create checkbox display
checkbox = "[X]" if file in selected_files else "[ ]"
# Truncate filename if too long
max_width = dialog_w - 8 # Allow for padding and checkbox
if len(file) > max_width:
file_display = file[:max_width-3] + "..."
else:
file_display = file
item_text = f"{checkbox} {file_display}"
if i + start_idx == selected_idx:
dialog_win.addstr(y, 2, item_text, curses.A_REVERSE)
else:
dialog_win.addstr(y, 2, item_text)
# Show pagination info if needed
if len(files) > items_per_page:
page_info = f"Page {current_page + 1}/{(len(files) - 1) // items_per_page + 1}"
dialog_win.addstr(dialog_h - 3, dialog_w - len(page_info) - 2, page_info)
dialog_win.refresh()
# Main interaction loop
while True:
draw_file_list()
key = dialog_win.getch()
if key == 27: # ESC
return []
elif key == curses.KEY_UP and selected_idx > 0:
selected_idx -= 1
if selected_idx < current_page * items_per_page:
current_page -= 1
elif key == curses.KEY_DOWN and selected_idx < len(files) - 1:
selected_idx += 1
if selected_idx >= (current_page + 1) * items_per_page:
current_page += 1
elif key == ord(' '): # SPACE to toggle selection
current_file = files[selected_idx]
if current_file in selected_files:
selected_files.remove(current_file)
else:
selected_files.add(current_file)
elif key in (10, 13): # Enter to confirm
return list(selected_files)
elif key == curses.KEY_NPAGE and current_page < (len(files) - 1) // items_per_page:
# Page Down
current_page += 1
selected_idx = min(selected_idx + items_per_page, len(files) - 1)
elif key == curses.KEY_PPAGE and current_page > 0:
# Page Up
current_page -= 1
selected_idx = max(selected_idx - items_per_page, 0)
def has_child_segments(item, mode, selector):
"""
Check if the given item has child segments.
Args:
item: Current prefix path (e.g., "field.storage")
mode: View mode ("matched" or "unmatched")
selector: The ConfigSelector instance
Returns:
True if child segments exist, False otherwise
"""
if mode == "matched":
# For matched prefixes
for prefix in selector.prefixes:
if prefix.startswith(item + '.'):
return True
return False
else:
# For unmatched files, check if any filenames have one more segment
selected_segments = item.split('.')
segment_count = len(selected_segments)
# Look through all files in all unmatched prefixes
for prefix, file_list in selector.unmatched_files.items():
for filename in file_list:
# Remove .yml extension for better segment counting
if filename.endswith('.yml'):
filename = filename[:-4]
filename_parts = filename.split('.')
# Check if this file has this prefix plus at least one more segment
if (len(filename_parts) > segment_count and
'.'.join(filename_parts[:segment_count]) == item):
return True
return False
def preview_yaml_cleaning(stdscr, content):
"""Show a simplified schematic preview of the YAML cleaning process."""
h, w = stdscr.getmaxyx()
# Safety checks for minimum terminal size
min_h, min_w = 10, 30
if h < min_h or w < min_w:
return # Terminal too small
# Calculate preview window size
preview_h = min(max(8, min(12, h - 8)), h - 2) # Smaller height
preview_w = min(max(30, min(70, w - 4)), w - 2)
preview_y = max(0, (h - preview_h) // 2)
preview_x = max(0, (w - preview_w) // 2)
# Final safety check
if preview_h <= 0 or preview_w <= 0 or preview_y < 0 or preview_x < 0:
return
# Create window
preview_win = curses.newwin(preview_h, preview_w, preview_y, preview_x)
preview_win.box()
preview_win.addstr(1, 2, "YAML Cleaning Preview:", curses.A_BOLD)
# Create simple schematic example
original = """langcode: en
status: true
uuid: cc09dc7f-ec98-4e4e-ae38-4fe7e8676aae
dependencies:
module:
- node
_core:
default_config_hash: fUksROt4FfkAU9BV4hV2XvhTBSS2nTNrZS4U7S-tKrs
id: example
name: Example"""
cleaned = """langcode: en
status: true
dependencies:
module:
- node
id: example
name: Example"""
# Display side by side
# Show original
preview_win.addstr(3, 2, "Original:", curses.A_UNDERLINE)
preview_win.addstr(4, 2, "uuid: cc09dc7f-ec98-4e4e-ae38-4fe7e8676aae", curses.A_BOLD)
preview_win.addstr(5, 2, "_core:", curses.A_BOLD)
preview_win.addstr(6, 2, " default_config_hash: fUksROt4F...", curses.A_BOLD)
preview_win.addstr(7, 2, "other YAML content...")
# Show arrow
arrow_x = preview_w // 2 - 2
for i in range(3, preview_h - 3):
preview_win.addstr(i, arrow_x, "")
# Show cleaned
preview_win.addstr(3, arrow_x + 4, "Cleaned:", curses.A_UNDERLINE)
preview_win.addstr(5, arrow_x + 4, "UUID and _core removed")
preview_win.addstr(7, arrow_x + 4, "other YAML content preserved")
# Instructions
preview_win.addstr(preview_h - 2, 2, "Press any key to continue...", curses.A_DIM)
preview_win.refresh()
# Wait for key
stdscr.getch()
def confirm_yaml_cleaning(stdscr, selector):
"""Show a confirmation dialog for YAML cleaning during copy."""
h, w = stdscr.getmaxyx()
# Get a sample file to preview
sample_file = None
for prefix in selector.prefixes:
if selector.matched_files[prefix]:
sample_file = os.path.join(selector.original_config_path, selector.matched_files[prefix][0])
break
if sample_file and os.path.exists(sample_file):
try:
with open(sample_file, 'r') as file:
content = file.read()
preview_yaml_cleaning(stdscr, content)
except Exception as e:
# In case of error, just show a simple message
error_msg = f"Error reading sample file: {str(e)}"
# Safety checks for window creation
if h >= 5 and w >= 20:
msg_h = min(3, h - 2)
msg_w = min(len(error_msg) + 4, w - 4)
if msg_w < 10:
msg_w = min(20, w - 4)
msg_y = max(0, (h - msg_h) // 2)
msg_x = max(0, (w - msg_w) // 2)
if msg_h > 0 and msg_w > 0 and msg_y >= 0 and msg_x >= 0:
message_win = curses.newwin(msg_h, msg_w, msg_y, msg_x)
message_win.box()
display_msg = error_msg[:msg_w-4] if len(error_msg) > msg_w-4 else error_msg
message_win.addstr(1, 2, display_msg)
message_win.refresh()
message_win.getch()
return confirm_dialog(stdscr, "Copy all matched files to config directory and remove UUID and _core?")
def _curses_addstr_clipped(win, y: int, x: int, text: str, max_width: int, attrs: int = 0) -> None:
"""Write text inside a bordered curses window, clipping to fit."""
win_h, _ = win.getmaxyx()
if y <= 0 or y >= win_h - 1:
return
clipped = text[: max(0, max_width - x - 1)]
if not clipped:
return
try:
if attrs:
win.addstr(y, x, clipped, attrs)
else:
win.addstr(y, x, clipped)
except curses.error:
pass
def show_info_dialog(stdscr, message: str) -> None:
"""Show a simple message dialog."""
h, w = stdscr.getmaxyx()
if h < 5 or w < 20:
return
dialog_h = min(5, h - 2)
dialog_w = min(max(40, len(message) + 4), w - 4)
dialog_y = max(0, (h - dialog_h) // 2)
dialog_x = max(0, (w - dialog_w) // 2)
if dialog_h <= 0 or dialog_w <= 0:
return
dialog_win = curses.newwin(dialog_h, dialog_w, dialog_y, dialog_x)
dialog_win.box()
_curses_addstr_clipped(dialog_win, 1, 2, message, dialog_w)
_curses_addstr_clipped(dialog_win, dialog_h - 2, 2, "Press any key to continue...", dialog_w)
dialog_win.refresh()
stdscr.getch()
def show_update_recipe_results(
stdscr,
files_deleted: int,
files_applied: int,
deletes_missing: int,
update_dir: str,
) -> None:
"""Show the results of building update/<recipe_name>/ from the committed recipe."""
h, w = stdscr.getmaxyx()
dialog_w = min(60, w - 4)
content_lines = [
f"Recipe copy: {update_dir}/",
f"Deleted from config: {files_deleted}",
f"Applied from changed_files/: {files_applied}",
]
if deletes_missing:
content_lines.append(f"Delete CSV entries not in copy: {deletes_missing}")
content_start = 3
dialog_h = min(h - 2, max(12, content_start + len(content_lines) + 3))
dialog_y = max(0, (h - dialog_h) // 2)
dialog_x = max(0, (w - dialog_w) // 2)
dialog_win = curses.newwin(dialog_h, dialog_w, dialog_y, dialog_x)
dialog_win.box()
_curses_addstr_clipped(
dialog_win, 1, 2, "Recipe Update Results", dialog_w, curses.A_BOLD
)
max_footer_row = dialog_h - 3
for i, line in enumerate(content_lines):
row = content_start + i
if row > max_footer_row:
break
_curses_addstr_clipped(dialog_win, row, 4, line, dialog_w)
_curses_addstr_clipped(
dialog_win, dialog_h - 2, 2, "Press any key to continue...", dialog_w
)
dialog_win.refresh()
stdscr.getch()
def show_comparison_results(
stdscr,
added_count: int,
modified_count: int,
deleted_count: int,
unchanged_count: int,
language_deleted_count: int = 0,
not_in_original_count: int = 0,
language_inconsistency_count: int = 0,
):
"""Show the results of file comparison."""
h, w = stdscr.getmaxyx()
dialog_w = min(60, w - 4)
total_out = added_count + modified_count
deleted_line = f"Deleted (recipe, missing in new): {deleted_count}"
if language_deleted_count:
deleted_line += f" ({language_deleted_count} language/)"
content_lines = [
f"New (not in committed recipe): {added_count}",
f"Modified vs committed: {modified_count}",
f"Total -> changed_files/: {total_out}",
deleted_line,
f"Unchanged: {unchanged_count}",
]
if not_in_original_count:
content_lines.append(
f"Not in original export: {not_in_original_count} "
"(new_recipe_not_in_original.csv)"
)
if language_deleted_count:
content_lines.append(
"Language deletions: paths under language/ in deleted CSV"
)
if language_inconsistency_count:
content_lines.append(
f"Language inconsistency: {language_inconsistency_count} "
"(language_inconsistency.csv)"
)
content_lines.extend(
[
"",
"Manifest -> changed_files_manifest.csv",
"Deletions -> deleted_files.csv",
]
)
content_start = 3
# Leave one blank row between content and footer (footer at dialog_h - 2).
dialog_h = min(h - 2, max(14, content_start + len(content_lines) + 3))
dialog_y = max(0, (h - dialog_h) // 2)
dialog_x = max(0, (w - dialog_w) // 2)
dialog_win = curses.newwin(dialog_h, dialog_w, dialog_y, dialog_x)
dialog_win.box()
_curses_addstr_clipped(
dialog_win, 1, 2, "File Comparison Results", dialog_w, curses.A_BOLD
)
max_footer_row = dialog_h - 3
for i, line in enumerate(content_lines):
row = content_start + i
if row > max_footer_row:
break
_curses_addstr_clipped(dialog_win, row, 4, line, dialog_w)
_curses_addstr_clipped(
dialog_win, dialog_h - 2, 2, "Press any key to continue...", dialog_w
)
dialog_win.refresh()
# Wait for key
stdscr.getch()
def main(stdscr):
# Set paths relative to current directory.
jsonPath = os.path.join(BASE_DIR, "config_prefixes.json")
configPath = os.path.join(BASE_DIR, "new_recipe_config")
# Create directories if they don't exist.
os.makedirs(ORIGINAL_CONFIG_DIR, exist_ok=True)
os.makedirs(configPath, exist_ok=True)
# Check if we need to create default JSON file.
if not os.path.exists(jsonPath):
with open(jsonPath, "w") as f:
json.dump({"prefixes": ["put.your.prefixes.here"]}, f, indent=2)
print(f"Created default {jsonPath}")
# Initialize selector.
selector = ConfigSelector(
json_path=jsonPath,
original_config_path=ORIGINAL_CONFIG_DIR,
config_path=configPath,
old_recipe_path=RECIPE_CONFIG_DIR,
)
# Menu state
selected_idx = 0
view_mode = "matched" # "matched" or "unmatched"
current_prefix = "" # Current prefix path, e.g. "field.storage"
level = 0 # Current hierarchical level
# Navigation history for going back
path_history = []
# Main loop
while True:
# Get segments at the current level for the current view
all_prefixes = selector.prefixes if view_mode == "matched" else list(selector.unmatched_files.keys())
items = get_prefix_segments_at_level(all_prefixes, current_prefix, level, selector, view_mode)
# Draw the menu
item_count = draw_menu(stdscr, selector, selected_idx, view_mode, current_prefix, level)
if item_count == 0:
key = stdscr.getch()
if key == ord('q') or key == ord('Q'):
break
elif key == ord(' '):
view_mode = "unmatched" if view_mode == "matched" else "matched"
selected_idx = 0
# Reset navigation when toggling view
current_prefix = ""
level = 0
path_history = []
elif key == ord('a') or key == ord('A'):
new_prefix = add_prefix_dialog(stdscr)
if new_prefix:
selector.add_prefix(new_prefix)
elif key == ord('o') or key == ord('O'):
new_prefix = choose_prefix_dialog(stdscr, selector)
if new_prefix:
selector.add_prefix(new_prefix)
elif key == curses.KEY_LEFT and (current_prefix or level > 0):
# Go back one level
if level > 0:
level -= 1
if current_prefix:
# Remove last segment
parts = current_prefix.split('.')
current_prefix = '.'.join(parts[:-1])
selected_idx = 0
elif path_history:
# Pop from history
prev_prefix, prev_level, prev_idx = path_history.pop()
current_prefix = prev_prefix
level = prev_level
selected_idx = prev_idx
continue
key = stdscr.getch()
if key == ord('q') or key == ord('Q'):
break
elif key == curses.KEY_UP and selected_idx > 0:
selected_idx -= 1
elif key == curses.KEY_DOWN and selected_idx < item_count - 1:
selected_idx += 1
elif key == ord(' '):
view_mode = "unmatched" if view_mode == "matched" else "matched"
selected_idx = 0
# Reset navigation when toggling view
current_prefix = ""
level = 0
path_history = []
elif key == curses.KEY_RIGHT:
# Drill down into the selected item
if selected_idx < len(items):
selected_item = items[selected_idx]
# Check if there are child segments
if has_child_segments(selected_item, view_mode, selector):
# Save current state to history
path_history.append((current_prefix, level, selected_idx))
# Update current prefix and level
current_prefix = selected_item
level += 1
selected_idx = 0
elif key == curses.KEY_LEFT and (current_prefix or level > 0):
# Go back one level
if level > 0:
level -= 1
if current_prefix:
# Remove last segment
parts = current_prefix.split('.')
current_prefix = '.'.join(parts[:-1])
selected_idx = 0
elif path_history:
# Pop from history
prev_prefix, prev_level, prev_idx = path_history.pop()
current_prefix = prev_prefix
level = prev_level
selected_idx = prev_idx
elif key == ord('a') or key == ord('A'):
new_prefix = add_prefix_dialog(stdscr)
if new_prefix:
selector.add_prefix(new_prefix)
elif key == ord('o') or key == ord('O'):
new_prefix = choose_prefix_dialog(stdscr, selector)
if new_prefix:
selector.add_prefix(new_prefix)
elif key == ord('d') or key == ord('D'):
if view_mode == "matched" and len(items) > 0 and selected_idx < len(items):
selected_item = items[selected_idx]
# Only allow deleting complete prefixes
if selected_item in selector.prefixes:
if confirm_dialog(stdscr, f"Remove prefix '{selected_item}'?"):
selector.remove_prefix(selected_item)
if selected_idx >= len(items) - 1:
selected_idx = max(0, len(items) - 1)
else:
# Allow adding the current selection (partial prefix) directly
if confirm_dialog(stdscr, f"Add current path '{selected_item}' as a matched prefix?"):
selector.add_prefix(selected_item)
view_mode = "matched"
current_prefix = ""
level = 0
path_history = []
selected_idx = 0
elif view_mode == "unmatched" and len(items) > 0 and selected_idx < len(items):
# Add the item to matched prefixes
selected_item = items[selected_idx]
if confirm_dialog(stdscr, f"Add '{selected_item}' to matched prefixes?"):
selector.add_prefix(selected_item)
view_mode = "matched"
current_prefix = ""
level = 0
path_history = []
selected_idx = 0
elif key == ord('c') or key == ord('C'):
if confirm_yaml_cleaning(stdscr, selector):
selector.copy_matched_files()
elif key == ord('m') or key == ord('M'):
if confirm_dialog(stdscr, "Compare old and new recipe configs and track changes?"):
(
added_count,
modified_count,
deleted_count,
unchanged_count,
language_deleted_count,
not_in_original_count,
language_inconsistency_count,
) = selector.compare_and_track_changes()
show_comparison_results(
stdscr,
added_count,
modified_count,
deleted_count,
unchanged_count,
language_deleted_count,
not_in_original_count,
language_inconsistency_count,
)
elif key == ord('i') or key == ord('I'):
inconsistent = selector._collect_language_inconsistencies()
if not inconsistent:
show_info_dialog(stdscr, "No language inconsistencies in new_recipe_config.")
elif confirm_dialog(
stdscr,
f"Delete {len(inconsistent)} language file(s) without base config pendant?",
):
removed = selector.delete_language_inconsistencies()
show_info_dialog(
stdscr,
f"Removed {removed} language inconsistency file(s) from new_recipe_config.",
)
elif key == ord('u') or key == ord('U'):
if confirm_dialog(
stdscr,
f"Copy recipe to {recipe_update_dir()}/, remove deleted_files.csv "
"entries, apply changed_files/ to config?",
):
try:
files_deleted, files_applied, deletes_missing, update_dir = (
selector.update_recipe()
)
show_update_recipe_results(
stdscr,
files_deleted,
files_applied,
deletes_missing,
update_dir,
)
except FileNotFoundError as exc:
show_info_dialog(stdscr, str(exc))
elif key == ord('s') or key == ord('S'):
selector.save_prefixes()
elif key in (10, 13): # ENTER key to select individual files
if len(items) > 0 and selected_idx < len(items):
selected_item = items[selected_idx]
files = get_files_for_segment(selector, selected_item, view_mode)
if files:
if view_mode == "matched":
# Check if this is a complete prefix in matched
if selected_item in selector.prefixes:
selected_files = select_individual_files_dialog(stdscr, selector, selected_item, view_mode)
if selected_files and confirm_dialog(stdscr, f"Remove {len(selected_files)} files from matched?"):
# Remove individual files from matched_files
for file in selected_files:
if file in selector.matched_files[selected_item]:
selector.matched_files[selected_item].remove(file)
else:
# Show message that you can only select files from complete prefixes
h, w = stdscr.getmaxyx()
# Safety checks for window creation
if h >= 5 and w >= 30:
msg_h = min(3, h - 2)
msg_w = min(60, w - 4)
if msg_w < 20:
msg_w = min(30, w - 4)
msg_y = max(0, (h - msg_h) // 2)
msg_x = max(0, (w - msg_w) // 2)
if msg_h > 0 and msg_w > 0 and msg_y >= 0 and msg_x >= 0:
message_win = curses.newwin(msg_h, msg_w, msg_y, msg_x)
message_win.box()
message_win.addstr(1, 2, "Can only select files from complete prefixes.")
message_win.refresh()
message_win.getch()
else: # Unmatched view
# Find a matching prefix to add files to
target_prefix = None
for prefix in selector.prefixes:
if selected_item.startswith(prefix) or prefix.startswith(selected_item):
target_prefix = prefix
break
if not target_prefix:
if confirm_dialog(stdscr, f"Add prefix '{selected_item}' to matched prefixes?"):
selector.add_prefix(selected_item)
target_prefix = selected_item
if target_prefix:
file_selection = select_individual_files_dialog(stdscr, selector, selected_item, view_mode)
if file_selection:
# Add individual files to matched_files
for file in file_selection:
if file not in selector.matched_files[target_prefix]:
selector.matched_files[target_prefix].append(file)
if __name__ == "__main__":
forceIngest = "--force" in sys.argv
ingest_config(force=forceIngest)
try:
curses.wrapper(main)
except KeyboardInterrupt:
print("Program terminated by user")