1949 lines
79 KiB
Python
1949 lines
79 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import shutil
|
|
import curses
|
|
import re
|
|
import csv
|
|
import filecmp
|
|
import glob
|
|
import tarfile
|
|
import yaml
|
|
from typing import List, Dict, Set, Tuple, Optional
|
|
|
|
# All working directories are resolved against the directory the tool is
# launched from, so nothing here is tied to a hardcoded install location.
BASE_DIR = os.getcwd()

# Drop zone for config*.tar.gz exports and the folder they are unpacked into.
INGEST_DIR = os.path.join(BASE_DIR, "ingest")
ORIGINAL_CONFIG_DIR = os.path.join(BASE_DIR, "original_config")

# Committed recipe checkout and its config/ subfolder.
RECIPE_SOURCE_DIR = os.path.join(BASE_DIR, "recipes", "wisski_default_data_model")
RECIPE_CONFIG_DIR = os.path.join(RECIPE_SOURCE_DIR, "config")

# Parent folder for updated recipe copies (update/<recipe_name>/).
RECIPE_UPDATE_BASE = os.path.join(BASE_DIR, "update")
|
|
|
|
|
|
def recipe_update_dir(recipe_source: str = RECIPE_SOURCE_DIR) -> str:
    """Return the path of the updated recipe copy: update/<recipe_name>/.

    Args:
        recipe_source: Directory of the committed recipe. Only its final path
            component (the recipe name) is used.

    Returns:
        RECIPE_UPDATE_BASE joined with the recipe's directory name.
    """
    # Normalise first: os.path.basename("recipes/foo/") is "", which would make
    # the result the update base itself instead of update/foo.
    recipeName = os.path.basename(os.path.normpath(recipe_source))
    return os.path.join(RECIPE_UPDATE_BASE, recipeName)
|
|
|
|
|
|
def _clear_dir(path: str) -> int:
|
|
"""Remove all files and subdirectories inside path. Returns count of removed items."""
|
|
if not os.path.isdir(path):
|
|
return 0
|
|
removedCount = 0
|
|
for entry in os.listdir(path):
|
|
entryPath = os.path.join(path, entry)
|
|
if os.path.isdir(entryPath):
|
|
shutil.rmtree(entryPath)
|
|
else:
|
|
os.remove(entryPath)
|
|
removedCount += 1
|
|
return removedCount
|
|
|
|
|
|
def _copy_recipe_tree(src: str, dst: str) -> None:
|
|
"""Copy a full recipe directory tree, including .git."""
|
|
if os.path.exists(dst):
|
|
shutil.rmtree(dst)
|
|
shutil.copytree(src, dst)
|
|
|
|
|
|
def ingest_config(force: bool = False) -> None:
    """Extract a config*.tar.gz archive from ingest/ into original_config/.

    Skips extraction when YAML files are already present in original_config/,
    unless force is True. With --force, original_config/, new_recipe_config/,
    and changed_files/ are all cleared before re-extraction.

    Only regular ``*.yml`` members are extracted. Their subdirectory structure
    is preserved; members whose path contains ``..`` are skipped so an archive
    cannot write outside original_config/.

    Args:
        force: Re-extract even if original_config/ already holds YAML files.
    """
    os.makedirs(ORIGINAL_CONFIG_DIR, exist_ok=True)

    existingFiles = [f for f in os.listdir(ORIGINAL_CONFIG_DIR) if f.endswith(".yml")]
    if existingFiles and not force:
        print(
            f"original_config/ already contains {len(existingFiles)} YAML files. "
            "Use --force to re-extract."
        )
        return

    archives = sorted(glob.glob(os.path.join(INGEST_DIR, "config*.tar.gz")))
    if not archives:
        if not existingFiles:
            print("No config*.tar.gz found in ingest/ and original_config/ is empty.")
        return

    # Use the archive whose name sorts last.
    archive = archives[-1]
    print(f"Extracting {os.path.basename(archive)} to original_config/...")

    if force:
        _clear_dir(ORIGINAL_CONFIG_DIR)
        newRecipeDir = os.path.join(BASE_DIR, "new_recipe_config")
        changedFilesDir = os.path.join(BASE_DIR, "changed_files")
        cleared = _clear_dir(newRecipeDir)
        if cleared:
            print(f"Cleared {cleared} items from new_recipe_config/.")
        cleared = _clear_dir(changedFilesDir)
        if cleared:
            print(f"Cleared {cleared} items from changed_files/.")

    extractedCount = 0
    with tarfile.open(archive, "r:gz") as tar:
        for member in tar.getmembers():
            if not (member.isfile() and member.name.endswith(".yml")):
                continue
            # Preserve subdirectory structure (e.g. language/de/foo.yml) while
            # dropping leading "./" and "/" components. Splitting on "/" avoids
            # str.lstrip("./"), which strips a *character set* and would also
            # eat the leading dot of a name such as ".foo.yml".
            parts = [part for part in member.name.split("/") if part not in ("", ".")]
            if not parts or ".." in parts:
                # A ".." component could escape original_config/ (tar slip).
                print(f"Skipping unsafe archive member: {member.name}")
                continue
            destPath = os.path.join(ORIGINAL_CONFIG_DIR, *parts)
            os.makedirs(os.path.dirname(destPath), exist_ok=True)
            srcFile = tar.extractfile(member)
            if srcFile is None:
                continue
            # Close the extracted stream deterministically and copy in chunks.
            with srcFile, open(destPath, "wb") as dst:
                shutil.copyfileobj(srcFile, dst)
            extractedCount += 1

    print(f"Extracted {extractedCount} YAML files to original_config/.")
|
|
|
|
class ConfigSelector:
    """Select exported Drupal config YAML by filename prefix and diff it against a recipe.

    On construction the selector loads the prefix list from ``json_path``, the
    optional translatable-prefix list and ignore list, and then scans
    ``original_config_path`` so ``matched_files`` / ``unmatched_files`` are
    populated.
    """

    def __init__(self, json_path: str, original_config_path: str, config_path: str,
                 old_recipe_path: Optional[str] = None, changed_files_path: Optional[str] = None,
                 deleted_files_csv: Optional[str] = None,
                 ignored_files_yml: Optional[str] = None,
                 translatable_prefixes_json: Optional[str] = None):
        """Store paths, load configuration files and run the initial scan.

        Args:
            json_path: JSON file holding the prefix list.
            original_config_path: Directory with the extracted config export.
            config_path: Directory that receives the selected config (new_recipe_config).
            old_recipe_path: Committed recipe config; defaults to BASE_DIR/old_recipe_config.
            changed_files_path: Output directory for new/modified files;
                defaults to BASE_DIR/changed_files.
            deleted_files_csv: CSV listing deleted paths; defaults to BASE_DIR/deleted_files.csv.
            ignored_files_yml: YAML with file names to ignore; defaults to BASE_DIR/ignored_files.yml.
            translatable_prefixes_json: JSON with prefixes that need a German translation;
                defaults to BASE_DIR/translatable_config_prefixes.json.
        """
        self.json_path = json_path
        self.original_config_path = original_config_path
        self.config_path = config_path
        self.old_recipe_path = old_recipe_path or os.path.join(BASE_DIR, "old_recipe_config")
        self.changed_files_path = changed_files_path or os.path.join(BASE_DIR, "changed_files")
        self.deleted_files_csv = deleted_files_csv or os.path.join(BASE_DIR, "deleted_files.csv")
        self.new_recipe_not_in_original_csv = os.path.join(BASE_DIR, "new_recipe_not_in_original.csv")
        self.language_inconsistency_csv = os.path.join(BASE_DIR, "language_inconsistency.csv")
        self.need_translations_csv = os.path.join(BASE_DIR, "need_translations.csv")
        self.changed_files_manifest_csv = os.path.join(BASE_DIR, "changed_files_manifest.csv")
        self.ignored_files_yml = ignored_files_yml or os.path.join(BASE_DIR, "ignored_files.yml")
        self.translatable_prefixes_json = (
            translatable_prefixes_json or os.path.join(BASE_DIR, "translatable_config_prefixes.json")
        )
        self.prefixes: List[str] = []
        self.translatable_prefixes: List[str] = []
        self.matched_files: Dict[str, List[str]] = {}
        self.unmatched_files: Dict[str, List[str]] = {}
        self.ignored_files: Set[str] = set()
        self.load_prefixes()
        self.load_translatable_prefixes()
        self.load_ignored_files()
        self.scan_configs()

    def load_prefixes(self) -> None:
        """Load file prefixes from JSON file.

        Accepts either a bare list or a dict with a 'prefixes' key. Exits the
        process with status 1 when the file cannot be read, is not valid JSON,
        or has neither shape.
        """
        try:
            with open(self.json_path, 'r') as f:
                data = json.load(f)
        except (json.JSONDecodeError, OSError) as e:
            # OSError also covers permission problems and directories, not
            # just a missing file.
            print(f"Error loading JSON file: {e}")
            sys.exit(1)
        if isinstance(data, list):
            self.prefixes = data
        elif isinstance(data, dict) and 'prefixes' in data:
            self.prefixes = data['prefixes']
        else:
            print("Error: JSON file should contain a list of prefixes or a dict with 'prefixes' key")
            sys.exit(1)

    def load_translatable_prefixes(self) -> None:
        """Load config prefixes that require German (de) translations from JSON.

        A missing file leaves the list empty; malformed content only prints a warning.
        """
        self.translatable_prefixes = []
        if not os.path.exists(self.translatable_prefixes_json):
            return
        try:
            with open(self.translatable_prefixes_json, "r") as f:
                data = json.load(f)
            if isinstance(data, list):
                self.translatable_prefixes = data
            elif isinstance(data, dict) and "prefixes" in data:
                self.translatable_prefixes = data["prefixes"]
            else:
                print(
                    "Warning: translatable_config_prefixes.json should contain a list "
                    "or a dict with 'prefixes' key"
                )
        except (json.JSONDecodeError, OSError) as e:
            print(f"Warning: error loading translatable prefixes JSON: {e}")

    def load_ignored_files(self) -> None:
        """Load ignored file names from YAML file.

        Every top-level entry that is a mapping with a string 'name' contributes
        that name (and the name with '.yml' appended when it lacks the suffix).
        A missing file leaves the set empty; read/parse errors print a warning.
        """
        self.ignored_files = set()
        if not os.path.exists(self.ignored_files_yml):
            return

        try:
            with open(self.ignored_files_yml, 'r') as f:
                data = yaml.safe_load(f)
        except (yaml.YAMLError, OSError) as e:
            print(f"Warning: Error loading ignored files YAML: {e}")
            return

        # A YAML document whose root is a list or scalar has no categories to
        # iterate; treat it like an empty file instead of crashing on .items().
        if not isinstance(data, dict):
            return
        for info in data.values():
            if not (isinstance(info, dict) and 'name' in info):
                continue
            name = info['name']
            if not isinstance(name, str):
                continue
            self.ignored_files.add(name)
            # Basenames on disk always include .yml; allow omitting it in YAML.
            if not name.endswith('.yml'):
                self.ignored_files.add(f'{name}.yml')

    def save_prefixes(self) -> None:
        """Save current prefixes back to JSON file as {'prefixes': [...]}."""
        try:
            # Serialise before opening: opening with 'w' truncates the file, so
            # a serialisation failure afterwards would destroy the prefix list.
            payload = json.dumps({'prefixes': self.prefixes}, indent=2)
            with open(self.json_path, 'w') as f:
                f.write(payload)
            print(f"Prefixes saved to {self.json_path}")
        except (OSError, TypeError, ValueError) as e:
            print(f"Error saving JSON file: {e}")

    def scan_configs(self) -> None:
        """Scan original_config_path and categorize top-level *.yml files by prefix.

        Each file goes to the first prefix it starts with. Files matching no
        prefix are grouped in unmatched_files under their first dot-separated
        segment. Exits with status 1 when the directory does not exist.
        """
        self.matched_files = {prefix: [] for prefix in self.prefixes}
        self.unmatched_files = {}

        try:
            filenames = os.listdir(self.original_config_path)
        except FileNotFoundError:
            print(f"Error: Directory {self.original_config_path} not found")
            sys.exit(1)

        for filename in filenames:
            if not filename.endswith('.yml'):
                continue
            for prefix in self.prefixes:
                if filename.startswith(prefix):
                    self.matched_files[prefix].append(filename)
                    break
            else:
                # No prefix matched: group by the first segment of the name.
                potential_prefix = filename.split('.')[0]
                self.unmatched_files.setdefault(potential_prefix, []).append(filename)

    @staticmethod
    def _strip_metadata(content: str) -> str:
        """Remove top-level uuid and _core keys from Drupal YAML config content."""
        content = re.sub(r'^uuid: [a-f0-9\-]+\n', '', content, flags=re.MULTILINE)
        # Match exactly '_core:' at the start of a line (avoids touching wisski_core).
        content = re.sub(r'^_core:\n([ \t]+[^\n]+\n)+', '', content, flags=re.MULTILINE)
        return content

    def _collect_bundle_hashes(self) -> Set[str]:
        """Return the set of bundle hashes found in matched wisski_core.wisski_bundle files."""
        bundlePrefix = "wisski_core.wisski_bundle."
        bundleHashes: Set[str] = set()
        for prefix in self.prefixes:
            for filename in self.matched_files.get(prefix, []):
                if filename.startswith(bundlePrefix) and filename.endswith(".yml"):
                    # Strip the fixed prefix and the ".yml" suffix.
                    bundleHashes.add(filename[len(bundlePrefix):-4])
        return bundleHashes

    def _copy_file(self, src: str, dst: str) -> bool:
        """Read src, strip metadata, write to dst. Falls back to raw copy on error.

        Returns:
            True when the stripped content was written, False when the raw-copy
            fallback was used.
        """
        try:
            # Read and write explicitly as UTF-8 so the result does not depend
            # on the locale's default encoding.
            with open(src, 'r', encoding='utf-8') as f:
                content = f.read()
            content = self._strip_metadata(content)
            with open(dst, 'w', encoding='utf-8') as f:
                f.write(content)
            return True
        except (OSError, UnicodeError) as e:
            print(f"Error processing {os.path.basename(src)}: {e}")
            shutil.copy2(src, dst)
            return False

    def _copy_with_language_overrides(self, filename: str) -> int:
        """Copy original_config/language/{lang}/{filename} to new_recipe_config/language/{lang}/
        for every language that has an override for this file. Returns count of overrides copied."""
        langBase = os.path.join(self.original_config_path, "language")
        if not os.path.isdir(langBase):
            return 0
        overridesCopied = 0
        for langCode in os.listdir(langBase):
            srcLangDir = os.path.join(langBase, langCode)
            if not os.path.isdir(srcLangDir):
                continue
            srcOverride = os.path.join(srcLangDir, filename)
            if not os.path.isfile(srcOverride):
                continue
            dstLangDir = os.path.join(self.config_path, "language", langCode)
            os.makedirs(dstLangDir, exist_ok=True)
            self._copy_file(srcOverride, os.path.join(dstLangDir, filename))
            overridesCopied += 1
        return overridesCopied

    def copy_matched_files(self, quiet: bool = False) -> None:
        """Copy all matched files from original_config to config, removing UUID and _core structures.

        Also copies language overrides for each copied file and the
        language.content_settings counterpart of every matched bundle, then
        prunes files in config_path that are no longer backed by the export.

        Args:
            quiet: Suppress the summary output.
        """
        os.makedirs(self.config_path, exist_ok=True)

        copiedCount = 0
        processedCount = 0
        overrideCopied = 0
        for prefix in self.prefixes:
            for filename in self.matched_files[prefix]:
                src = os.path.join(self.original_config_path, filename)
                dst = os.path.join(self.config_path, filename)
                # _copy_file reports whether metadata stripping succeeded or
                # the raw-copy fallback had to be used.
                if self._copy_file(src, dst):
                    processedCount += 1
                copiedCount += 1
                overrideCopied += self._copy_with_language_overrides(filename)

        # For every matched bundle, also copy its language.content_settings counterpart.
        langCopied = 0
        for bundleHash in self._collect_bundle_hashes():
            langFilename = f"language.content_settings.wisski_individual.{bundleHash}.yml"
            langSrc = os.path.join(self.original_config_path, langFilename)
            langDst = os.path.join(self.config_path, langFilename)
            if os.path.exists(langSrc):
                self._copy_file(langSrc, langDst)
                langCopied += 1
                overrideCopied += self._copy_with_language_overrides(langFilename)

        if not quiet:
            print(
                f"Copied {copiedCount} files to {self.config_path} "
                f"({processedCount} processed to remove UUIDs and _core structures)"
            )
            if langCopied:
                print(f"Also copied {langCopied} corresponding language.content_settings files.")
            if overrideCopied:
                print(f"Also copied {overrideCopied} language override files from original_config/language/.")
        pruned_export = self._prune_new_recipe_not_in_original()
        if pruned_export and not quiet:
            print(
                f"Removed {pruned_export} file(s) from new_recipe_config that are not in original_config "
                f"(stale vs current export)."
            )
        pruned = self._prune_stale_language_overrides()
        if pruned and not quiet:
            print(f"Removed {pruned} stale language override file(s) (no longer in export or missing base config).")

    @staticmethod
    def _remove_empty_language_dirs(config_root: str) -> None:
        """Remove empty language/<lang>/ and language/ under config_root if possible."""
        lang_root = os.path.join(config_root, "language")
        if not os.path.isdir(lang_root):
            return
        for lang_code in list(os.listdir(lang_root)):
            lang_dir = os.path.join(lang_root, lang_code)
            if not os.path.isdir(lang_dir):
                continue
            try:
                if not os.listdir(lang_dir):
                    os.rmdir(lang_dir)
            except OSError:
                # Best effort: a directory that cannot be removed is left in place.
                pass
        try:
            if not os.listdir(lang_root):
                os.rmdir(lang_root)
        except OSError:
            pass

    def _prune_new_recipe_not_in_original(self) -> int:
        """Delete YAML under config_path whose relative path is absent from original_config.

        Skips when original_config has no YAML (avoid wiping new_recipe before an export).

        Returns:
            Number of files removed.
        """
        orig_paths = self._collect_yaml_relpaths(self.original_config_path)
        if not orig_paths:
            return 0
        new_paths = self._collect_yaml_relpaths(self.config_path)
        orphans = new_paths - orig_paths
        removed = 0
        for rel in sorted(orphans):
            path = self._abs_config_path(self.config_path, rel)
            try:
                if os.path.isfile(path):
                    os.remove(path)
                    removed += 1
            except OSError:
                pass
        self._remove_empty_language_dirs(self.config_path)
        return removed

    def _prune_stale_language_overrides(self) -> int:
        """Remove language/*/*.yml in config_path that should not be kept.

        Drops overrides that are missing from the current export (translation removed
        on the site) or whose base config is not present under config_path (orphans
        from earlier copies). Without this, compare would not list those as deleted.

        Returns:
            Number of override files removed.
        """
        removed = 0
        lang_dst_root = os.path.join(self.config_path, "language")
        if not os.path.isdir(lang_dst_root):
            return 0
        lang_src_root = os.path.join(self.original_config_path, "language")
        for lang_code in list(os.listdir(lang_dst_root)):
            dst_lang_dir = os.path.join(lang_dst_root, lang_code)
            if not os.path.isdir(dst_lang_dir):
                continue
            src_lang_dir = os.path.join(lang_src_root, lang_code)
            for name in list(os.listdir(dst_lang_dir)):
                if not name.endswith(".yml"):
                    continue
                base_fp = os.path.join(self.config_path, name)
                src_fp = os.path.join(src_lang_dir, name)
                # Keep only overrides that have both a base config and an
                # override in the current export.
                if os.path.isfile(base_fp) and os.path.isfile(src_fp):
                    continue
                try:
                    os.remove(os.path.join(dst_lang_dir, name))
                    removed += 1
                except OSError:
                    pass
        self._remove_empty_language_dirs(self.config_path)
        return removed

    @staticmethod
    def _collect_yaml_relpaths(config_root: str) -> Set[str]:
        """Relative paths (posix-style) for YAML under a Drupal config tree.

        Includes top-level *.yml and language/<lang>/*.yml (same layout as exports).
        Returns an empty set when config_root is not a directory.
        """
        rel_paths: Set[str] = set()
        if not os.path.isdir(config_root):
            return rel_paths
        try:
            for name in os.listdir(config_root):
                if name.endswith(".yml"):
                    rel_paths.add(name)
            lang_root = os.path.join(config_root, "language")
            if os.path.isdir(lang_root):
                for lang_code in os.listdir(lang_root):
                    lang_dir = os.path.join(lang_root, lang_code)
                    if not os.path.isdir(lang_dir):
                        continue
                    for name in os.listdir(lang_dir):
                        if name.endswith(".yml"):
                            rel_paths.add(f"language/{lang_code}/{name}")
        except OSError:
            # Best effort: return whatever was collected before the error.
            pass
        return rel_paths

    def _abs_config_path(self, config_root: str, rel_path: str) -> str:
        """Join config root with a posix rel_path from _collect_yaml_relpaths."""
        return os.path.normpath(os.path.join(config_root, *rel_path.split("/")))

    def _collect_language_inconsistencies(self) -> List[str]:
        """Language overrides in config_path with no matching base *.yml at config root."""
        inconsistent: List[str] = []
        lang_root = os.path.join(self.config_path, "language")
        if not os.path.isdir(lang_root):
            return inconsistent
        try:
            for lang_code in os.listdir(lang_root):
                lang_dir = os.path.join(lang_root, lang_code)
                if not os.path.isdir(lang_dir):
                    continue
                for name in os.listdir(lang_dir):
                    if not name.endswith(".yml"):
                        continue
                    base_path = os.path.join(self.config_path, name)
                    if not os.path.isfile(base_path):
                        inconsistent.append(f"language/{lang_code}/{name}")
        except OSError:
            pass
        return sorted(inconsistent)

    def _write_language_inconsistency_csv(self, inconsistent: List[str]) -> None:
        """Write the given relative paths to language_inconsistency_csv (header: filename)."""
        with open(self.language_inconsistency_csv, "w", newline="") as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(["filename"])
            for rel_path in inconsistent:
                writer.writerow([rel_path])

    @staticmethod
    def _matches_any_prefix(filename: str, prefixes: List[str]) -> bool:
        """Return True when filename starts with at least one of prefixes."""
        # str.startswith accepts a tuple of candidates; an empty tuple is False.
        return filename.startswith(tuple(prefixes))

    def _collect_missing_de_translations(self) -> List[str]:
        """Base configs in config_path matching translatable prefixes without language/de/.

        A translation is considered present when language/de/{filename} exists in
        new_recipe_config or in the committed recipe (old_recipe_path).
        """
        missing: List[str] = []
        if not self.translatable_prefixes:
            return missing

        def de_filenames(config_root: str) -> Set[str]:
            # Names of *.yml files under <config_root>/language/de, or an empty
            # set when that directory is missing or unreadable.
            de_dir = os.path.join(config_root, "language", "de")
            if not os.path.isdir(de_dir):
                return set()
            try:
                return {
                    name
                    for name in os.listdir(de_dir)
                    if name.endswith(".yml")
                }
            except OSError:
                return set()

        try:
            de_files = de_filenames(self.config_path) | de_filenames(self.old_recipe_path)
            for name in os.listdir(self.config_path):
                if not name.endswith(".yml"):
                    continue
                if not self._matches_any_prefix(name, self.translatable_prefixes):
                    continue
                if name not in de_files:
                    missing.append(name)
        except OSError:
            pass
        return sorted(missing)

    def _write_need_translations_csv(self, missing: List[str]) -> None:
        """Write the given file names to need_translations_csv (header: filename)."""
        with open(self.need_translations_csv, "w", newline="") as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(["filename"])
            for filename in missing:
                writer.writerow([filename])

    def delete_language_inconsistencies(self) -> int:
        """Remove language/*/*.yml in config_path that have no base config pendant.

        Rewrites language_inconsistency_csv with whatever could not be removed.

        Returns:
            Number of files removed.
        """
        inconsistent = self._collect_language_inconsistencies()
        removed = 0
        for rel_path in inconsistent:
            path = self._abs_config_path(self.config_path, rel_path)
            try:
                if os.path.isfile(path):
                    os.remove(path)
                    removed += 1
            except OSError:
                pass
        self._remove_empty_language_dirs(self.config_path)
        remaining = self._collect_language_inconsistencies()
        self._write_language_inconsistency_csv(remaining)
        return removed

    @staticmethod
    def _read_deleted_files_csv(csv_path: str) -> List[str]:
        """Return the non-empty 'filename' column values of csv_path ([] if the file is missing)."""
        paths: List[str] = []
        if not os.path.isfile(csv_path):
            return paths
        with open(csv_path, newline="") as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                name = (row.get("filename") or "").strip()
                if name:
                    paths.append(name)
        return paths

    def update_recipe(self, quiet: bool = False) -> Tuple[int, int, int, str]:
        """Copy committed recipe to update/<recipe_name>/, apply deletions and changed_files/.

        Args:
            quiet: Suppress the summary output.

        Returns:
            Tuple of (files_deleted, files_applied, deletes_missing, update_dir).

        Raises:
            FileNotFoundError: If RECIPE_SOURCE_DIR is not a directory.
        """
        if not os.path.isdir(RECIPE_SOURCE_DIR):
            raise FileNotFoundError(f"Recipe source not found: {RECIPE_SOURCE_DIR}")

        update_dir = recipe_update_dir()
        _copy_recipe_tree(RECIPE_SOURCE_DIR, update_dir)
        update_config = os.path.join(update_dir, "config")

        files_deleted = 0
        deletes_missing = 0
        for rel_path in self._read_deleted_files_csv(self.deleted_files_csv):
            target = self._abs_config_path(update_config, rel_path)
            try:
                if os.path.isfile(target):
                    os.remove(target)
                    files_deleted += 1
                else:
                    deletes_missing += 1
            except OSError:
                # A delete that fails is reported the same way as a missing target.
                deletes_missing += 1
        self._remove_empty_language_dirs(update_config)

        files_applied = 0
        for rel_path in sorted(self._collect_yaml_relpaths(self.changed_files_path)):
            src = self._abs_config_path(self.changed_files_path, rel_path)
            dst = self._abs_config_path(update_config, rel_path)
            os.makedirs(os.path.dirname(dst), exist_ok=True)
            shutil.copy2(src, dst)
            files_applied += 1

        if not quiet:
            print(
                f"Updated recipe written to {update_dir}/ "
                f"({files_deleted} deleted from config, {files_applied} copied from changed_files/, "
                f"{deletes_missing} delete entries not present in copy)."
            )
        return files_deleted, files_applied, deletes_missing, update_dir

    def _stage_changed_file(self, rel_path: str) -> None:
        """Copy config_path/<rel_path> to changed_files_path/<rel_path>, creating parent dirs."""
        src = self._abs_config_path(self.config_path, rel_path)
        dest = os.path.join(self.changed_files_path, *rel_path.split("/"))
        os.makedirs(os.path.dirname(dest), exist_ok=True)
        shutil.copy2(src, dest)

    def compare_and_track_changes(self, quiet: bool = False) -> Tuple[int, int, int, int, int, int, int, int]:
        """
        Compare committed recipe config (old_recipe_path) to new_recipe_config (config_path).

        - Copies **new** YAML (in new_recipe but not in committed recipe) to changed_files/.
        - Copies **modified** YAML (same path, different content) to changed_files/.
        - Respects ignored_files.yml (basename) for both new and modified copies.
        - Writes deleted_files.csv: paths in committed recipe missing from new_recipe_config.
        - Writes changed_files_manifest.csv: relative_path + kind (new|modified) for each copy.
        - Writes new_recipe_not_in_original.csv for paths in new_recipe not in original export.
        - Writes language_inconsistency.csv for language/*/*.yml without a base config pendant.
        - Writes need_translations.csv for translatable-prefix configs missing language/de/.

        Returns:
            Tuple of (added_count, modified_count, deleted_count, unchanged_count,
            language_deleted_count, not_in_original_count,
            language_inconsistency_count, need_translations_count)
        """
        # Start from an empty changed_files/ so stale copies never survive a run.
        _clear_dir(self.changed_files_path)
        os.makedirs(self.changed_files_path, exist_ok=True)

        # Track statistics
        added_count = 0
        modified_count = 0
        unchanged_count = 0
        ignored_count = 0
        manifest_rows: List[Tuple[str, str]] = []

        # Top-level and language/<lang>/*.yml (matches Drupal export layout)
        old_files = self._collect_yaml_relpaths(self.old_recipe_path)
        new_files = self._collect_yaml_relpaths(self.config_path)

        # Deleted: in committed recipe but not in new_recipe_config
        deleted_files_list: List[str] = sorted(old_files - new_files)
        deleted_count = len(deleted_files_list)

        # Modified: same path in both, content differs
        for rel_path in sorted(old_files & new_files):
            old_file_path = self._abs_config_path(self.old_recipe_path, rel_path)
            new_file_path = self._abs_config_path(self.config_path, rel_path)

            if filecmp.cmp(old_file_path, new_file_path, shallow=False):
                unchanged_count += 1
            elif os.path.basename(rel_path) in self.ignored_files:
                ignored_count += 1
            else:
                self._stage_changed_file(rel_path)
                modified_count += 1
                manifest_rows.append((rel_path, "modified"))

        # New: in new_recipe (from original_config via prefixes) but not in committed recipe
        for rel_path in sorted(new_files - old_files):
            if os.path.basename(rel_path) in self.ignored_files:
                ignored_count += 1
            else:
                self._stage_changed_file(rel_path)
                added_count += 1
                manifest_rows.append((rel_path, "new"))

        with open(self.changed_files_manifest_csv, "w", newline="") as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(["relative_path", "kind"])
            # Group rows by kind, then by path.
            for rel_path, kind in sorted(manifest_rows, key=lambda x: (x[1], x[0])):
                writer.writerow([rel_path, kind])

        # Write deleted files to CSV
        with open(self.deleted_files_csv, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(['filename'])  # Header
            for filename in deleted_files_list:
                writer.writerow([filename])

        if ignored_count > 0 and not quiet:
            print(
                f"Ignored {ignored_count} new/modified file(s) as per ignored_files.yml "
                "(not copied to changed_files/)."
            )
        total_copied = added_count + modified_count
        if total_copied and not quiet:
            print(
                f"Copied {total_copied} file(s) to changed_files/ "
                f"({added_count} new vs committed recipe, {modified_count} modified)."
            )

        language_deleted_count = sum(1 for p in deleted_files_list if p.startswith("language/"))
        if language_deleted_count and not quiet:
            print(
                f"Deleted list includes {language_deleted_count} language override path(s) "
                f"(see deleted_files.csv under language/<lang>/)."
            )

        orig_paths = self._collect_yaml_relpaths(self.original_config_path)
        new_paths = self._collect_yaml_relpaths(self.config_path)
        not_in_original = sorted(new_paths - orig_paths)
        not_in_original_count = len(not_in_original)
        with open(self.new_recipe_not_in_original_csv, "w", newline="") as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(["filename"])
            for rel_path in not_in_original:
                writer.writerow([rel_path])
        if not_in_original_count and not quiet:
            print(
                f"{not_in_original_count} path(s) in new_recipe_config are not in original_config "
                f"(see {os.path.basename(self.new_recipe_not_in_original_csv)})."
            )
        elif not orig_paths and not quiet:
            print("Warning: original_config has no YAML; orphan CSV is empty.")

        language_inconsistent = self._collect_language_inconsistencies()
        language_inconsistency_count = len(language_inconsistent)
        self._write_language_inconsistency_csv(language_inconsistent)
        if language_inconsistency_count and not quiet:
            print(
                f"{language_inconsistency_count} language override(s) in new_recipe_config "
                f"have no base config pendant "
                f"(see {os.path.basename(self.language_inconsistency_csv)})."
            )

        missing_de_translations = self._collect_missing_de_translations()
        need_translations_count = len(missing_de_translations)
        self._write_need_translations_csv(missing_de_translations)
        if need_translations_count and not quiet:
            print(
                f"{need_translations_count} translatable config(s) in new_recipe_config "
                f"have no German (de) translation "
                f"(see {os.path.basename(self.need_translations_csv)})."
            )

        return (
            added_count,
            modified_count,
            deleted_count,
            unchanged_count,
            language_deleted_count,
            not_in_original_count,
            language_inconsistency_count,
            need_translations_count,
        )

    def add_prefix(self, prefix: str) -> None:
        """Add a new prefix to the list if it doesn't exist, then rescan."""
        if prefix and prefix not in self.prefixes:
            self.prefixes.append(prefix)
            self.scan_configs()

    def remove_prefix(self, prefix: str) -> None:
        """Remove a prefix from the list if it exists, then rescan."""
        if prefix in self.prefixes:
            self.prefixes.remove(prefix)
            self.scan_configs()

    def get_summary(self) -> Tuple[int, int]:
        """Return the count of matched and unmatched files."""
        matched_count = sum(len(files) for files in self.matched_files.values())
        unmatched_count = sum(len(files) for files in self.unmatched_files.values())
        return matched_count, unmatched_count
|
|
|
|
def get_prefix_segments_at_level(all_prefixes: List[str], current_prefix: str, level: int, selector=None, view_mode=None) -> List[str]:
    """
    Collect the distinct dotted prefixes truncated to ``level + 1`` segments.

    Args:
        all_prefixes: Every known prefix; used for all views except "unmatched"
            with a selector.
        current_prefix: Dotted path currently being browsed (e.g. "field.storage"),
            or "" at the root.
        level: Zero-based segment depth to list (0 = root level).
        selector: Object exposing ``unmatched_files``; when given together with
            view_mode "unmatched", segments come from the unmatched filenames.
        view_mode: Current view mode ("matched" or "unmatched").

    Returns:
        Sorted list of unique prefixes cut at the requested depth.
    """
    path_parts = current_prefix.split('.') if current_prefix else []
    depth = level + 1
    segments = set()

    # Unmatched view: derive segments from the actual filenames.
    if view_mode == "unmatched" and selector:
        for names in selector.unmatched_files.values():
            for name in names:
                # The .yml extension is not a path segment.
                stem = name[:-4] if name.endswith('.yml') else name
                pieces = stem.split('.')
                if len(pieces) < depth:
                    continue
                # Only keep names located under the path being browsed.
                if current_prefix and '.'.join(pieces[:len(path_parts)]) != current_prefix:
                    continue
                segments.add('.'.join(pieces[:depth]))
        return sorted(segments)

    # Every other view (including "matched") walks the configured prefixes.
    for prefix in all_prefixes:
        pieces = prefix.split('.')
        if len(pieces) < depth:
            continue
        if current_prefix:
            under_current = prefix == current_prefix or prefix.startswith(current_prefix + '.')
            if not under_current:
                continue
            # Inside the browsed path the segment has to match it exactly.
            if level < len(path_parts) and pieces[level] != path_parts[level]:
                continue
        segments.add('.'.join(pieces[:depth]))

    return sorted(segments)
|
|
|
|
def get_files_for_segment(selector: ConfigSelector, segment: str, view_mode: str) -> List[str]:
    """Return the sorted, de-duplicated files that belong to ``segment``.

    Args:
        selector: Object exposing ``prefixes`` (iterable of prefix strings)
            plus ``matched_files`` and ``unmatched_files`` (mappings from a
            prefix to a list of file names).
        segment: Dotted path being inspected, e.g. ``"field.storage"``.
        view_mode: ``"matched"`` reads ``matched_files``; any other value
            reads ``unmatched_files``.

    Returns:
        Sorted list of unique file names for the segment.
    """
    files: List[str] = []

    if view_mode == "matched":
        if segment in selector.prefixes:
            # Exact prefix: only that prefix's own files.  ``.get`` keeps a
            # configured prefix without a matched-files entry from raising.
            files.extend(selector.matched_files.get(segment, []))
        else:
            # Partial path: gather the files of every prefix below it.
            child_marker = segment + '.'
            for prefix in selector.prefixes:
                if prefix == segment or prefix.startswith(child_marker):
                    files.extend(selector.matched_files.get(prefix, []))
    else:
        segment_len = len(segment.split('.'))

        for prefix, file_list in selector.unmatched_files.items():
            # A root-level prefix equal to the segment contributes all of
            # its files, whatever their names look like.
            if segment_len == 1 and prefix == segment:
                files.extend(file_list)
                continue

            for filename in file_list:
                # Compare on the name without the .yml extension.
                stem = filename[:-4] if filename.endswith('.yml') else filename
                stem_parts = stem.split('.')
                if (len(stem_parts) >= segment_len
                        and '.'.join(stem_parts[:segment_len]) == segment):
                    files.append(filename)

    return sorted(set(files))


def draw_menu(stdscr, selector: ConfigSelector, selected_idx: int, view_mode: str, current_prefix: str = "", level: int = 0):
    """Draw the main selector screen and return the number of listed items.

    Args:
        stdscr: The curses screen to draw on.
        selector: Provides ``prefixes``, ``unmatched_files`` and
            ``get_summary()``.
        selected_idx: Index (into the full item list) of the highlighted row.
        view_mode: ``"matched"`` or ``"unmatched"``.
        current_prefix: Dotted path currently browsed ("" at the root).
        level: Depth of the tree level being shown.

    Returns:
        Number of items available at the current level.
    """

    def put(y: int, x: int, text: str, attr: int = 0) -> None:
        """Write text, tolerating coordinates that do not fit the screen."""
        try:
            if attr:
                stdscr.addstr(y, max(0, x), text, attr)
            else:
                stdscr.addstr(y, max(0, x), text)
        except curses.error:
            # Off-screen output on a tiny terminal is not worth crashing for.
            pass

    try:
        curses.curs_set(0)
    except curses.error:
        # Some terminals cannot hide the cursor; that is purely cosmetic.
        pass
    stdscr.clear()
    h, w = stdscr.getmaxyx()

    # Colors
    curses.start_color()
    curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_BLUE)  # Selected item
    curses.init_pair(2, curses.COLOR_GREEN, curses.COLOR_BLACK)  # Matched
    curses.init_pair(3, curses.COLOR_RED, curses.COLOR_BLACK)  # Unmatched
    curses.init_pair(4, curses.COLOR_YELLOW, curses.COLOR_BLACK)  # Titles

    # Title (centred; ``put`` clamps x when the text is wider than the screen)
    title = "Drupal Configuration Selector"
    put(0, (w - len(title)) // 2, title, curses.color_pair(4) | curses.A_BOLD)

    # Summary
    matched_count, unmatched_count = selector.get_summary()
    summary = f"Matched: {matched_count} files | Unmatched: {unmatched_count} files"
    put(1, (w - len(summary)) // 2, summary)

    # Current path display
    if current_prefix:
        put(2, 2, f"Current path: {current_prefix}")

    # Instructions
    put(h - 4, 2, "LEFT/RIGHT: Navigate tree levels | SPACE: Toggle view | ENTER: Select files")
    put(
        h - 3,
        2,
        "A: Add | O: Choose | D: Add/Del | C: Copy | M: Compare | I: Lang orphans | U: Update | S: Save | Q: Quit",
    )

    # Content area.  At least one row is always paged so that the page
    # arithmetic below can never divide by zero on very short terminals.
    start_y = 3
    max_visible = max(1, h - 8)  # Account for two instruction lines

    # Get segments at the current level
    if view_mode == "matched":
        all_prefixes = selector.prefixes
    else:
        all_prefixes = list(selector.unmatched_files.keys())

    items = get_prefix_segments_at_level(all_prefixes, current_prefix, level, selector, view_mode)

    # Title based on current path
    if current_prefix:
        title = f"{view_mode.capitalize()} Prefixes under '{current_prefix}'"
    else:
        title = f"{view_mode.capitalize()} Prefixes (root level)"

    put(start_y, 2, title, curses.color_pair(4) | curses.A_BOLD)
    start_y += 1

    # Pagination: show the page that contains the selected item.
    if len(items) > max_visible:
        start_idx = (selected_idx // max_visible) * max_visible
        display_items = items[start_idx:start_idx + max_visible]
        relative_idx = selected_idx - start_idx
    else:
        display_items = items
        relative_idx = selected_idx if selected_idx < len(items) else 0

    # Display items
    selected_attr = curses.color_pair(1) | curses.A_BOLD
    normal_attr = curses.color_pair(2) if view_mode == "matched" else curses.color_pair(3)
    for i, item in enumerate(display_items):
        item_base = item.split('.')[-1]  # Last segment only
        file_count = len(get_files_for_segment(selector, item, view_mode))
        item_text = f"{item_base} ({file_count} files)"
        if has_child_segments(item, view_mode, selector):
            item_text += " ▶"
        put(start_y + i, 2, item_text, selected_attr if i == relative_idx else normal_attr)

    # Selected item details
    if items and selected_idx < len(items):
        try:
            selected_item = items[selected_idx]
            detail_y = start_y + min(len(display_items), max_visible) + 1

            if detail_y < h - 4:  # Make sure we have room to display details
                put(detail_y, 2, f"Files for '{selected_item}':", curses.A_BOLD)
                detail_y += 1

                files = sorted(get_files_for_segment(selector, selected_item, view_mode))

                # Rows between the heading and the instruction lines.
                rows_free = h - 4 - detail_y
                if rows_free > 0:
                    # When the list does not fit, give up the last row to a
                    # "...and N more" line instead of silently truncating.
                    if len(files) > rows_free:
                        shown = files[:rows_free - 1]
                    else:
                        shown = files

                    max_width = w - 6  # Allow for padding
                    for offset, name in enumerate(shown):
                        if len(name) > max_width:
                            name = name[:max(0, max_width - 3)] + "..."
                        put(detail_y + offset, 4, name)

                    if len(shown) < len(files):
                        put(detail_y + len(shown), 4, f"...and {len(files) - len(shown)} more")
        except Exception as e:
            # The detail pane is optional: report the problem, keep the menu.
            error_msg = f"Error displaying file details: {str(e)}"
            if h > 5:
                put(h - 4, 2, error_msg[:max(0, w - 4)])

    stdscr.refresh()
    return len(items)


def add_prefix_dialog(stdscr):
    """Prompt for a new prefix.

    Args:
        stdscr: The curses screen; only its size is read here.

    Returns:
        The typed prefix with surrounding whitespace removed, or "" when the
        user cancels (ESC or empty input) or the terminal is too small.
    """
    h, w = stdscr.getmaxyx()

    # Safety checks for minimum terminal size
    min_h, min_w = 7, 20
    if h < min_h or w < min_w:
        return ""  # Terminal too small

    prompt = "Enter new prefix (ESC or empty input to cancel):"
    dialog_h = min(5, h - 2)
    # Wide enough for the prompt plus border and padding when the screen allows.
    dialog_w = min(max(50, len(prompt) + 4), w - 4)
    dialog_y = max(0, (h - dialog_h) // 2)
    dialog_x = max(0, (w - dialog_w) // 2)
    if dialog_h <= 0 or dialog_w <= 0:
        return ""

    # Draw dialog box
    dialog_win = curses.newwin(dialog_h, dialog_w, dialog_y, dialog_x)
    dialog_win.box()
    dialog_win.addstr(1, 2, prompt[:max(0, dialog_w - 4)])
    dialog_win.refresh()

    # Input field - ensure it fits within the dialog
    input_w = max(1, dialog_w - 6)
    input_win = curses.newwin(1, input_w, dialog_y + 2, dialog_x + 3)
    input_win.keypad(True)  # Enable special keys

    def render(text: str) -> None:
        """Show the tail of the input so the cursor always fits the field."""
        visible = text[-(input_w - 1):] if input_w > 1 else ""
        input_win.erase()
        try:
            input_win.addstr(0, 0, visible)
        except curses.error:
            pass
        input_win.refresh()

    prefix = ""
    try:
        try:
            curses.curs_set(1)
        except curses.error:
            pass  # Terminal cannot change cursor visibility.

        # The field is rendered manually (no terminal echo) so that input
        # longer than the field scrolls instead of breaking the redraw.
        render(prefix)
        while True:
            key = input_win.getch()

            if key == 27:  # ASCII code for Escape
                prefix = ""
                break
            if key in (10, 13):  # ASCII codes for Enter/Return
                break
            if key in (8, 127, curses.KEY_BACKSPACE):
                prefix = prefix[:-1]
            elif 32 <= key <= 126:  # Printable ASCII
                prefix += chr(key)
            else:
                continue
            render(prefix)
    except curses.error:
        # Treat a terminal failure like a cancelled dialog.
        prefix = ""
    finally:
        # Always restore normal cursor visibility and echo settings.
        curses.noecho()
        try:
            curses.curs_set(0)
        except curses.error:
            pass

    return prefix.strip()


def confirm_dialog(stdscr, message):
    """Show a yes/no confirmation box.

    Args:
        stdscr: The curses screen; used for its size and to read the key.
        message: Question to display (clipped to the dialog width).

    Returns:
        True only when the user presses ``y`` or ``Y``; False for any other
        key or when the terminal is too small to show the dialog.
    """
    h, w = stdscr.getmaxyx()

    # Safety checks for minimum terminal size
    min_h, min_w = 7, 20
    if h < min_h or w < min_w:
        return False  # Terminal too small, default to cancel

    dialog_h = min(5, h - 2)
    dialog_w = min(max(50, len(message) + 4), w - 4)
    dialog_y = max(0, (h - dialog_h) // 2)
    dialog_x = max(0, (w - dialog_w) // 2)
    if dialog_h <= 0 or dialog_w <= 0:
        return False

    # Draw dialog box
    dialog_win = curses.newwin(dialog_h, dialog_w, dialog_y, dialog_x)
    dialog_win.box()

    # Both lines are clipped to the inner width so that narrow terminals
    # neither overwrite the border nor make addstr fail.
    text_w = max(0, dialog_w - 4)
    try:
        dialog_win.addstr(1, 2, message[:text_w])
        dialog_win.addstr(3, 2, "Press Y to confirm, any other key to cancel"[:text_w])
    except curses.error:
        pass
    dialog_win.refresh()

    key = stdscr.getch()
    return key in (ord('y'), ord('Y'))


def choose_prefix_dialog(stdscr, selector: ConfigSelector):
    """Let the user pick one of the unmatched prefixes.

    Args:
        stdscr: The curses screen; only its size is read here.
        selector: Provides ``unmatched_files`` (prefix -> list of files).

    Returns:
        The chosen prefix, or None when the user presses ESC, there is
        nothing to choose from, or the terminal is too small.
    """
    h, w = stdscr.getmaxyx()

    # Get sorted list of unmatched prefixes
    unmatched_prefixes = sorted(selector.unmatched_files)

    if not unmatched_prefixes:
        # Show message if no unmatched prefixes - ensure minimum window size
        msg_h = min(5, h - 2)
        msg_w = min(40, w - 4)
        if msg_h < 3 or msg_w < 10:
            return None  # Terminal too small
        message_win = curses.newwin(msg_h, msg_w, max(0, (h - msg_h) // 2), max(0, (w - msg_w) // 2))
        message_win.box()
        message_win.addstr(1, 2, "No unmatched prefixes available."[:msg_w - 4])
        message_win.refresh()
        message_win.getch()
        return None

    # Calculate dialog dimensions with safety checks
    min_dialog_h = 6  # Minimum height for usable dialog
    min_dialog_w = 30  # Minimum width for usable dialog

    # Ensure we have enough terminal space
    if h < min_dialog_h + 2 or w < min_dialog_w + 4:
        return None  # Terminal too small

    # +4 for title, instructions, and border
    dialog_h = min(max(min_dialog_h, min(15, len(unmatched_prefixes) + 4)), h - 2)
    dialog_w = min(max(min_dialog_w, 60), w - 2)
    dialog_y = max(0, (h - dialog_h) // 2)
    dialog_x = max(0, (w - dialog_w) // 2)
    if dialog_h <= 0 or dialog_w <= 0:
        return None

    # Draw dialog box
    dialog_win = curses.newwin(dialog_h, dialog_w, dialog_y, dialog_x)
    dialog_win.keypad(True)  # Enable arrow keys

    total = len(unmatched_prefixes)
    items_per_page = dialog_h - 4  # -4 for title, instructions, and border
    last_page = (total - 1) // items_per_page
    selected_idx = 0
    current_page = 0

    def draw_prefix_list():
        """Redraw the frame and the entries of the current page."""
        dialog_win.clear()
        dialog_win.box()
        # Text is clipped to the inner width so long entries cannot wrap
        # onto the next row or over the border.
        dialog_win.addstr(1, 2, "Choose a prefix to add (ESC to cancel):"[:max(0, dialog_w - 3)])

        start_idx = current_page * items_per_page
        end_idx = min(start_idx + items_per_page, total)

        # Display prefixes with selection highlight
        for row, prefix in enumerate(unmatched_prefixes[start_idx:end_idx]):
            file_count = len(selector.unmatched_files[prefix])
            item_text = f"{prefix} ({file_count} files)"[:max(0, dialog_w - 5)]
            if row + start_idx == selected_idx:
                dialog_win.addstr(row + 2, 4, item_text, curses.A_REVERSE)
            else:
                dialog_win.addstr(row + 2, 4, item_text)

        # Show pagination info if needed
        if total > items_per_page:
            page_info = f"Page {current_page + 1}/{last_page + 1}"
            dialog_win.addstr(dialog_h - 1, dialog_w - len(page_info) - 2, page_info)

        dialog_win.refresh()

    # Main interaction loop
    while True:
        draw_prefix_list()

        key = dialog_win.getch()

        if key == 27:  # ESC
            return None
        elif key == curses.KEY_UP and selected_idx > 0:
            selected_idx -= 1
            if selected_idx < current_page * items_per_page:
                current_page -= 1
        elif key == curses.KEY_DOWN and selected_idx < total - 1:
            selected_idx += 1
            if selected_idx >= (current_page + 1) * items_per_page:
                current_page += 1
        elif key in (10, 13):  # Enter
            return unmatched_prefixes[selected_idx]
        elif key == curses.KEY_NPAGE and current_page < last_page:
            # Page Down
            current_page += 1
            selected_idx = min(selected_idx + items_per_page, total - 1)
        elif key == curses.KEY_PPAGE and current_page > 0:
            # Page Up
            current_page -= 1
            selected_idx = max(selected_idx - items_per_page, 0)


def select_individual_files_dialog(stdscr, selector: ConfigSelector, prefix: str, view_mode: str):
    """Let the user tick individual files belonging to ``prefix``.

    Args:
        stdscr: The curses screen; only its size is read here.
        selector: Provides ``matched_files`` and ``unmatched_files``
            (prefix -> list of files).
        prefix: Prefix whose files are listed.
        view_mode: ``"matched"`` lists matched files (for removal); any other
            value lists unmatched files (for adding).

    Returns:
        Sorted list of the ticked file names; an empty list when the user
        cancels, nothing is available, or the terminal is too small.
    """
    h, w = stdscr.getmaxyx()

    # Get the list of files for this prefix
    if view_mode == "matched":
        files = selector.matched_files.get(prefix, [])
        title = f"Select files to remove from '{prefix}'"
        action_text = "SPACE: Toggle selection | ENTER: Confirm | ESC: Cancel"
    else:
        files = selector.unmatched_files.get(prefix, [])
        title = f"Select files to add from '{prefix}'"
        action_text = "SPACE: Toggle selection | ENTER: Add selected | ESC: Cancel"

    if not files:
        # Show message if no files - ensure minimum window size
        msg_h = min(5, h - 2)
        msg_w = min(40, w - 4)
        if msg_h < 3 or msg_w < 10:
            return []  # Terminal too small
        message_win = curses.newwin(msg_h, msg_w, max(0, (h - msg_h) // 2), max(0, (w - msg_w) // 2))
        message_win.box()
        message_win.addstr(1, 2, "No files available."[:msg_w - 4])
        message_win.refresh()
        message_win.getch()
        return []

    # Sort files for consistent display
    files = sorted(files)
    total = len(files)

    # Track selected files
    selected_files = set()

    # Calculate dialog dimensions with safety checks
    min_dialog_h = 8  # Minimum height for usable dialog
    min_dialog_w = 30  # Minimum width for usable dialog

    # Ensure we have enough terminal space
    if h < min_dialog_h + 2 or w < min_dialog_w + 4:
        return []  # Terminal too small

    # +5 for title, blank line, actions, and border
    dialog_h = min(max(min_dialog_h, min(20, total + 5)), h - 2)
    dialog_w = min(max(min_dialog_w, min(w - 4, 80)), w - 2)
    dialog_y = max(0, (h - dialog_h) // 2)
    dialog_x = max(0, (w - dialog_w) // 2)
    if dialog_h <= 0 or dialog_w <= 0:
        return []

    # Draw dialog box
    dialog_win = curses.newwin(dialog_h, dialog_w, dialog_y, dialog_x)
    dialog_win.keypad(True)  # Enable arrow keys

    # Rows 3 .. dialog_h - 3 hold the entries.
    items_per_page = dialog_h - 5
    last_page = (total - 1) // items_per_page
    text_w = max(0, dialog_w - 4)  # Inner width for text starting at x == 2
    selected_idx = 0
    current_page = 0

    def draw_file_list():
        """Redraw the frame and the entries of the current page."""
        dialog_win.clear()
        dialog_win.box()
        # Clip both captions so they cannot wrap over the border.
        dialog_win.addstr(1, 2, title[:text_w], curses.A_BOLD)
        dialog_win.addstr(dialog_h - 2, 2, action_text[:text_w])

        start_idx = current_page * items_per_page
        end_idx = min(start_idx + items_per_page, total)

        # Display files with selection highlight and checkbox
        max_width = dialog_w - 8  # Allow for padding and checkbox
        for row, name in enumerate(files[start_idx:end_idx]):
            checkbox = "[X]" if name in selected_files else "[ ]"
            if len(name) > max_width:
                shown_name = name[:max_width - 3] + "..."
            else:
                shown_name = name
            item_text = f"{checkbox} {shown_name}"

            if row + start_idx == selected_idx:
                dialog_win.addstr(row + 3, 2, item_text, curses.A_REVERSE)
            else:
                dialog_win.addstr(row + 3, 2, item_text)

        # The page indicator sits on the bottom border; every inner row is
        # already in use, so drawing it there keeps it off the last entry.
        if total > items_per_page:
            page_info = f"Page {current_page + 1}/{last_page + 1}"
            dialog_win.addstr(dialog_h - 1, dialog_w - len(page_info) - 2, page_info)

        dialog_win.refresh()

    # Main interaction loop
    while True:
        draw_file_list()

        key = dialog_win.getch()

        if key == 27:  # ESC
            return []
        elif key == curses.KEY_UP and selected_idx > 0:
            selected_idx -= 1
            if selected_idx < current_page * items_per_page:
                current_page -= 1
        elif key == curses.KEY_DOWN and selected_idx < total - 1:
            selected_idx += 1
            if selected_idx >= (current_page + 1) * items_per_page:
                current_page += 1
        elif key == ord(' '):  # SPACE to toggle selection
            # Symmetric difference flips membership of the current file.
            selected_files ^= {files[selected_idx]}
        elif key in (10, 13):  # Enter to confirm
            return sorted(selected_files)
        elif key == curses.KEY_NPAGE and current_page < last_page:
            # Page Down
            current_page += 1
            selected_idx = min(selected_idx + items_per_page, total - 1)
        elif key == curses.KEY_PPAGE and current_page > 0:
            # Page Up
            current_page -= 1
            selected_idx = max(selected_idx - items_per_page, 0)


def has_child_segments(item, mode, selector):
    """Check whether ``item`` has at least one deeper segment below it.

    Args:
        item: Current prefix path (e.g., "field.storage").
        mode: View mode ("matched" or "unmatched").
        selector: Provides ``prefixes`` (matched mode) or ``unmatched_files``
            (prefix -> list of file names, any other mode).

    Returns:
        True if child segments exist, False otherwise.
    """
    if mode == "matched":
        # A child is any configured prefix that continues this path.
        child_marker = item + '.'
        return any(prefix.startswith(child_marker) for prefix in selector.prefixes)

    # Unmatched view: look for a file name with this path plus one more
    # segment, ignoring the .yml extension when counting segments.
    depth = len(item.split('.'))
    for file_list in selector.unmatched_files.values():
        for filename in file_list:
            stem = filename[:-4] if filename.endswith('.yml') else filename
            stem_parts = stem.split('.')
            if len(stem_parts) > depth and '.'.join(stem_parts[:depth]) == item:
                return True
    return False


def preview_yaml_cleaning(stdscr, content):
    """Show a simplified schematic preview of the YAML cleaning process.

    The preview is a fixed illustration (uuid and _core being dropped); the
    ``content`` argument is accepted for callers but is not rendered.

    Args:
        stdscr: The curses screen; used for its size and to wait for a key.
        content: Text of a sample file (currently unused).

    Returns:
        None. Returns immediately when the terminal is too small.
    """
    h, w = stdscr.getmaxyx()

    # Safety checks for minimum terminal size
    min_h, min_w = 10, 30
    if h < min_h or w < min_w:
        return  # Terminal too small

    # Calculate preview window size
    preview_h = min(max(8, min(12, h - 8)), h - 2)  # Smaller height
    preview_w = min(max(30, min(70, w - 4)), w - 2)
    preview_y = max(0, (h - preview_h) // 2)
    preview_x = max(0, (w - preview_w) // 2)
    if preview_h <= 0 or preview_w <= 0:
        return

    # Create window
    preview_win = curses.newwin(preview_h, preview_w, preview_y, preview_x)
    preview_win.box()

    def put(y: int, x: int, text: str, attr: int = 0) -> None:
        """Write inside the border, clipping text that would not fit."""
        room = preview_w - x - 1
        if y <= 0 or y >= preview_h - 1 or room <= 0:
            return
        try:
            if attr:
                preview_win.addstr(y, x, text[:room], attr)
            else:
                preview_win.addstr(y, x, text[:room])
        except curses.error:
            pass

    put(1, 2, "YAML Cleaning Preview:", curses.A_BOLD)

    arrow_x = preview_w // 2 - 2
    # Keep the left column clear of the arrow column.
    left_w = max(0, arrow_x - 3)

    # Show original
    put(3, 2, "Original:"[:left_w], curses.A_UNDERLINE)
    put(4, 2, "uuid: cc09dc7f-ec98-4e4e-ae38-4fe7e8676aae"[:left_w], curses.A_BOLD)
    put(5, 2, "_core:"[:left_w], curses.A_BOLD)
    put(6, 2, "  default_config_hash: fUksROt4F..."[:left_w], curses.A_BOLD)
    put(7, 2, "other YAML content..."[:left_w])

    # Show arrow
    for row in range(3, preview_h - 3):
        put(row, arrow_x, "→")

    # Show cleaned
    put(3, arrow_x + 4, "Cleaned:", curses.A_UNDERLINE)
    put(5, arrow_x + 4, "UUID and _core removed")
    put(7, arrow_x + 4, "other YAML content preserved")

    # Instructions
    put(preview_h - 2, 2, "Press any key to continue...", curses.A_DIM)
    preview_win.refresh()

    # Wait for key
    stdscr.getch()


def confirm_yaml_cleaning(stdscr, selector):
    """Preview the YAML cleaning, then ask for confirmation of the copy.

    The first matched file found (if it exists on disk) is read and handed to
    ``preview_yaml_cleaning``. A failure while reading or previewing is shown
    in a message box and does not prevent the confirmation prompt.

    Args:
        stdscr: The curses screen.
        selector: Provides ``prefixes``, ``matched_files`` and
            ``original_config_path``.

    Returns:
        The result of the confirmation dialog (True when confirmed).
    """
    # Get a sample file to preview
    sample_file = None
    for prefix in selector.prefixes:
        matched = selector.matched_files.get(prefix)
        if matched:
            sample_file = os.path.join(selector.original_config_path, matched[0])
            break

    if sample_file and os.path.exists(sample_file):
        try:
            with open(sample_file, 'r', encoding='utf-8') as file:
                content = file.read()
            preview_yaml_cleaning(stdscr, content)
        except Exception as e:
            # Best effort: the preview is optional, so report and carry on.
            show_info_dialog(stdscr, f"Error reading sample file: {str(e)}")

    return confirm_dialog(stdscr, "Copy all matched files to config directory and remove UUID and _core?")


def _curses_addstr_clipped(win, y: int, x: int, text: str, max_width: int, attrs: int = 0) -> None:
    """Write text inside a bordered curses window, clipping to fit.

    Args:
        win: Target window (needs ``getmaxyx`` and ``addstr``).
        y: Row to write on; the first and last rows (the border) are skipped.
        x: Column at which the text starts.
        text: Text to write.
        max_width: Width of the window; the text is cut so that it ends
            before the last column.
        attrs: Optional curses attributes; 0 keeps the window's current ones.
    """
    win_h, _ = win.getmaxyx()
    if not 0 < y < win_h - 1:
        return  # Border row or outside the window.

    clipped = text[: max(0, max_width - x - 1)]
    if not clipped:
        return

    # Only pass attrs when set, so the window's own attributes stay in effect.
    args = (y, x, clipped, attrs) if attrs else (y, x, clipped)
    try:
        win.addstr(*args)
    except curses.error:
        # Drawing is best effort; a write that does not fit is dropped.
        pass


def _show_text_dialog(stdscr, title: str, content_lines: List[str]) -> None:
    """Show a bordered dialog; clears the screen first to avoid TUI bleed-through.

    Args:
        stdscr: The curses screen; cleared, measured and used to wait for a key.
        title: Bold heading on the first inner row.
        content_lines: Lines to list below the title. Lines that do not fit
            are replaced by a final "... +N more" line.

    Returns:
        None. Nothing is drawn when the terminal is smaller than 8x24.
    """
    stdscr.clear()
    stdscr.refresh()
    h, w = stdscr.getmaxyx()
    if h < 8 or w < 24:
        return

    dialog_w = min(72, w - 4)
    inner_h = h - 6
    lines = list(content_lines)
    if len(lines) > inner_h:
        # One row is given to the summary line, hence the extra hidden line.
        hidden = len(lines) - inner_h + 1
        lines = lines[: inner_h - 1]
        lines.append(f"... +{hidden} more (see CSV files)")

    dialog_h = min(h - 2, max(8, len(lines) + 4))
    dialog_y = max(0, (h - dialog_h) // 2)
    dialog_x = max(0, (w - dialog_w) // 2)
    if dialog_h <= 0 or dialog_w <= 0:
        return

    dialog_win = curses.newwin(dialog_h, dialog_w, dialog_y, dialog_x)
    dialog_win.box()
    _curses_addstr_clipped(dialog_win, 1, 2, title, dialog_w, curses.A_BOLD)

    # Content occupies rows 2 .. dialog_h - 3; the row above the bottom
    # border is reserved for the key hint.
    for row, line in enumerate(lines, start=2):
        if row >= dialog_h - 2:
            break
        _curses_addstr_clipped(dialog_win, row, 2, line, dialog_w)

    _curses_addstr_clipped(
        dialog_win, dialog_h - 2, 2, "Press any key to continue...", dialog_w
    )
    dialog_win.refresh()
    stdscr.getch()


def show_info_dialog(stdscr, message: str) -> None:
    """Show a simple message dialog and wait for a key press.

    Args:
        stdscr: The curses screen; used for its size and to wait for a key.
        message: Text to display (clipped to the dialog width).

    Returns:
        None. Nothing is drawn when the terminal is smaller than 5x20.
    """
    h, w = stdscr.getmaxyx()
    if h < 5 or w < 20:
        return

    dialog_h = min(5, h - 2)
    dialog_w = min(max(40, len(message) + 4), w - 4)
    dialog_y = max(0, (h - dialog_h) // 2)
    dialog_x = max(0, (w - dialog_w) // 2)
    if dialog_h <= 0 or dialog_w <= 0:
        return

    dialog_win = curses.newwin(dialog_h, dialog_w, dialog_y, dialog_x)
    dialog_win.box()
    _curses_addstr_clipped(dialog_win, 1, 2, message, dialog_w)
    # A three-row dialog has a single inner row; keep it for the message
    # rather than letting the key hint overwrite it.
    if dialog_h >= 4:
        _curses_addstr_clipped(dialog_win, dialog_h - 2, 2, "Press any key to continue...", dialog_w)
    dialog_win.refresh()
    stdscr.getch()


def show_update_recipe_results(
    stdscr,
    files_deleted: int,
    files_applied: int,
    deletes_missing: int,
    update_dir: str,
) -> None:
    """Show the results of building update/<recipe_name>/ from the committed recipe.

    Args:
        stdscr: The curses screen.
        files_deleted: Number reported as deleted from config.
        files_applied: Number reported as applied from changed_files/.
        deletes_missing: Number of delete-CSV entries not present in the
            copy; the line is omitted when this is 0.
        update_dir: Path of the recipe copy, shown with a trailing slash.
    """
    content_lines = [
        f"Recipe copy: {update_dir}/",
        f"Deleted from config: {files_deleted}",
        f"Applied from changed_files/: {files_applied}",
    ]
    if deletes_missing:
        content_lines.append(f"Delete CSV entries not in copy: {deletes_missing}")
    _show_text_dialog(stdscr, "Recipe Update Results", content_lines)


def show_comparison_results(
    stdscr,
    added_count: int,
    modified_count: int,
    deleted_count: int,
    unchanged_count: int,
    language_deleted_count: int = 0,
    not_in_original_count: int = 0,
    language_inconsistency_count: int = 0,
    need_translations_count: int = 0,
) -> None:
    """Show the results of file comparison.

    The four mandatory counts are always listed; each optional count adds a
    line only when it is non-zero.

    Args:
        stdscr: The curses screen.
        added_count: Files new relative to the committed recipe.
        modified_count: Files modified relative to the committed recipe.
        deleted_count: Files in the recipe that are missing in the new config.
        unchanged_count: Files reported as unchanged.
        language_deleted_count: Portion of the deletions under language/.
        not_in_original_count: Files not present in the original export.
        language_inconsistency_count: Language inconsistencies found.
        need_translations_count: Files missing a de translation.
    """
    total_out = added_count + modified_count

    deleted_line = f"Deleted (recipe, missing in new): {deleted_count}"
    if language_deleted_count:
        deleted_line += f" ({language_deleted_count} language/)"

    content_lines = [
        f"New (not in committed recipe): {added_count}",
        f"Modified vs committed: {modified_count}",
        f"Total -> changed_files/: {total_out}",
        deleted_line,
        f"Unchanged: {unchanged_count}",
    ]

    if not_in_original_count:
        content_lines.append(
            f"Not in original export: {not_in_original_count} "
            "(new_recipe_not_in_original.csv)"
        )
    if language_deleted_count:
        content_lines.append("Language deletions: see deleted_files.csv")
    if language_inconsistency_count:
        content_lines.append(
            f"Language inconsistency: {language_inconsistency_count} "
            "(language_inconsistency.csv)"
        )
    if need_translations_count:
        content_lines.append(
            f"Missing de translation: {need_translations_count} "
            "(need_translations.csv)"
        )

    # Always close with pointers to the generated CSV files.
    content_lines.extend(
        [
            "",
            "Manifest -> changed_files_manifest.csv",
            "Deletions -> deleted_files.csv",
        ]
    )
    _show_text_dialog(stdscr, "File Comparison Results", content_lines)


def _ensure_default_prefix_file(path: str, prefixes: List[str]) -> None:
    """Write ``{"prefixes": prefixes}`` as JSON to path unless it already exists."""
    if os.path.exists(path):
        return
    with open(path, "w", encoding="utf-8") as f:
        json.dump({"prefixes": prefixes}, f, indent=2)
    print(f"Created default {path}")


def _navigate_back(
    current_prefix: str, level: int, path_history: list
) -> Tuple[str, int, int]:
    """Return ``(prefix, level, selected_idx)`` for the parent of the current level.

    Drill-down pushes the state it leaves onto path_history, so popping it is
    the exact inverse and also restores the cursor row. When the history is
    empty the parent is derived by dropping the last dotted segment.
    """
    if path_history:
        return path_history.pop()
    parent_prefix = ".".join(current_prefix.split(".")[:-1]) if current_prefix else ""
    return parent_prefix, max(0, level - 1), 0


def _notify_complete_prefixes_only(stdscr) -> None:
    """Show a small boxed notice and wait for a key press.

    Nothing is shown when the terminal is smaller than 5 rows or 30 columns.
    """
    notice = "Can only select files from complete prefixes."
    h, w = stdscr.getmaxyx()
    if h < 5 or w < 30:
        return
    msg_h = 3
    msg_w = min(60, w - 4)
    message_win = curses.newwin(msg_h, msg_w, (h - msg_h) // 2, (w - msg_w) // 2)
    message_win.box()
    # Clip to the interior of the box: writing past the window edge raises
    # curses.error on terminals narrower than the notice text.
    message_win.addnstr(1, 2, notice, msg_w - 4)
    message_win.refresh()
    message_win.getch()


def main(stdscr):
    """Run the interactive prefix selector inside a curses screen.

    Creates the working directories and default prefix JSON files when they
    are missing, builds a ConfigSelector and then processes key presses until
    the user quits.

    Key bindings:
        q            quit
        Up / Down    move the cursor
        Space        toggle between the "matched" and "unmatched" view
        Right / Left drill into the selected segment / go back one level
        a, o         add a prefix (typed / chosen from a dialog)
        d            matched view: remove a complete prefix, or add the
                     selected partial path as a prefix;
                     unmatched view: add the selected item as a prefix
        c            copy matched files once confirm_yaml_cleaning() approves
        m            compare configs and show the result counts
        i            list / delete language inconsistencies
        u            build the updated recipe copy
        s            save prefixes
        Enter        pick individual files of the selected item

    While the current view is empty only q, Space, a, o and Left are handled.

    Args:
        stdscr: curses screen, as supplied by ``curses.wrapper``.
    """
    # Set paths relative to current directory.
    jsonPath = os.path.join(BASE_DIR, "config_prefixes.json")
    configPath = os.path.join(BASE_DIR, "new_recipe_config")

    # Create directories if they don't exist.
    os.makedirs(ORIGINAL_CONFIG_DIR, exist_ok=True)
    os.makedirs(configPath, exist_ok=True)

    # Create the default JSON files on first run.
    _ensure_default_prefix_file(jsonPath, ["put.your.prefixes.here"])
    translatablePrefixesPath = os.path.join(BASE_DIR, "translatable_config_prefixes.json")
    _ensure_default_prefix_file(
        translatablePrefixesPath,
        [
            "core.entity_form_display",
            "core.entity_form_mode",
            "field.field.wisski_individual",
            "field.storage.wisski_individual",
            "views.view",
            "wisski_core.wisski_bundle",
        ],
    )

    # Initialize selector.
    selector = ConfigSelector(
        json_path=jsonPath,
        original_config_path=ORIGINAL_CONFIG_DIR,
        config_path=configPath,
        old_recipe_path=RECIPE_CONFIG_DIR,
    )

    # Menu state
    selected_idx = 0
    view_mode = "matched"  # "matched" or "unmatched"
    current_prefix = ""  # Current prefix path, e.g. "field.storage"
    level = 0  # Current hierarchical level

    # Navigation history for going back: (prefix, level, selected_idx) tuples.
    path_history = []

    # Keys that stay active while the current view has no items.
    empty_view_keys = {ord(ch) for ch in "qQ aAoO"} | {curses.KEY_LEFT}

    # Main loop
    while True:
        # Get segments at the current level for the current view
        all_prefixes = selector.prefixes if view_mode == "matched" else list(selector.unmatched_files.keys())
        items = get_prefix_segments_at_level(all_prefixes, current_prefix, level, selector, view_mode)

        # Draw the menu
        item_count = draw_menu(stdscr, selector, selected_idx, view_mode, current_prefix, level)

        key = stdscr.getch()
        if item_count == 0 and key not in empty_view_keys:
            continue

        if key == ord('q') or key == ord('Q'):
            break
        elif key == curses.KEY_UP and selected_idx > 0:
            selected_idx -= 1
        elif key == curses.KEY_DOWN and selected_idx < item_count - 1:
            selected_idx += 1
        elif key == ord(' '):
            view_mode = "unmatched" if view_mode == "matched" else "matched"
            # Reset navigation when toggling view
            selected_idx = 0
            current_prefix = ""
            level = 0
            path_history = []
        elif key == curses.KEY_RIGHT:
            # Drill down into the selected item
            if selected_idx < len(items):
                selected_item = items[selected_idx]
                # Only descend when there are child segments
                if has_child_segments(selected_item, view_mode, selector):
                    # Save current state to history
                    path_history.append((current_prefix, level, selected_idx))
                    current_prefix = selected_item
                    level += 1
                    selected_idx = 0
        elif key == curses.KEY_LEFT and (current_prefix or level > 0):
            # Go back one level
            current_prefix, level, selected_idx = _navigate_back(
                current_prefix, level, path_history
            )
        elif key == ord('a') or key == ord('A'):
            new_prefix = add_prefix_dialog(stdscr)
            if new_prefix:
                selector.add_prefix(new_prefix)
        elif key == ord('o') or key == ord('O'):
            new_prefix = choose_prefix_dialog(stdscr, selector)
            if new_prefix:
                selector.add_prefix(new_prefix)
        elif key == ord('d') or key == ord('D'):
            if selected_idx < len(items):
                selected_item = items[selected_idx]
                added = False
                if view_mode == "matched":
                    # Only allow deleting complete prefixes
                    if selected_item in selector.prefixes:
                        if confirm_dialog(stdscr, f"Remove prefix '{selected_item}'?"):
                            selector.remove_prefix(selected_item)
                            # `items` is stale after the removal; clamp the
                            # cursor against the refreshed list instead.
                            remaining = get_prefix_segments_at_level(
                                selector.prefixes, current_prefix, level, selector, view_mode
                            )
                            selected_idx = max(0, min(selected_idx, len(remaining) - 1))
                    else:
                        # Allow adding the current selection (partial prefix) directly
                        if confirm_dialog(stdscr, f"Add current path '{selected_item}' as a matched prefix?"):
                            selector.add_prefix(selected_item)
                            added = True
                else:
                    # Unmatched view: add the item to matched prefixes
                    if confirm_dialog(stdscr, f"Add '{selected_item}' to matched prefixes?"):
                        selector.add_prefix(selected_item)
                        added = True
                if added:
                    # Jump to the top of the matched view after adding.
                    view_mode = "matched"
                    current_prefix = ""
                    level = 0
                    path_history = []
                    selected_idx = 0
        elif key == ord('c') or key == ord('C'):
            if confirm_yaml_cleaning(stdscr, selector):
                selector.copy_matched_files(quiet=True)
        elif key == ord('m') or key == ord('M'):
            if confirm_dialog(stdscr, "Compare old and new recipe configs and track changes?"):
                (
                    added_count,
                    modified_count,
                    deleted_count,
                    unchanged_count,
                    language_deleted_count,
                    not_in_original_count,
                    language_inconsistency_count,
                    need_translations_count,
                ) = selector.compare_and_track_changes(quiet=True)
                show_comparison_results(
                    stdscr,
                    added_count,
                    modified_count,
                    deleted_count,
                    unchanged_count,
                    language_deleted_count,
                    not_in_original_count,
                    language_inconsistency_count,
                    need_translations_count,
                )
        elif key == ord('i') or key == ord('I'):
            inconsistent = selector._collect_language_inconsistencies()
            if not inconsistent:
                show_info_dialog(stdscr, "No language inconsistencies in new_recipe_config.")
            elif confirm_dialog(
                stdscr,
                f"Delete {len(inconsistent)} language file(s) without base config pendant?",
            ):
                removed = selector.delete_language_inconsistencies()
                show_info_dialog(
                    stdscr,
                    f"Removed {removed} language inconsistency file(s) from new_recipe_config.",
                )
        elif key == ord('u') or key == ord('U'):
            if confirm_dialog(
                stdscr,
                f"Copy recipe to {recipe_update_dir()}/, remove deleted_files.csv "
                "entries, apply changed_files/ to config?",
            ):
                try:
                    files_deleted, files_applied, deletes_missing, update_dir = (
                        selector.update_recipe(quiet=True)
                    )
                except FileNotFoundError as exc:
                    show_info_dialog(stdscr, str(exc))
                else:
                    show_update_recipe_results(
                        stdscr,
                        files_deleted,
                        files_applied,
                        deletes_missing,
                        update_dir,
                    )
        elif key == ord('s') or key == ord('S'):
            selector.save_prefixes()
        elif key in (10, 13):  # ENTER key to select individual files
            if selected_idx < len(items):
                selected_item = items[selected_idx]
                files = get_files_for_segment(selector, selected_item, view_mode)
                if not files:
                    continue

                if view_mode == "matched":
                    if selected_item not in selector.prefixes:
                        # Files can only be picked from complete prefixes
                        _notify_complete_prefixes_only(stdscr)
                        continue
                    selected_files = select_individual_files_dialog(stdscr, selector, selected_item, view_mode)
                    if selected_files and confirm_dialog(stdscr, f"Remove {len(selected_files)} files from matched?"):
                        # Remove individual files from matched_files
                        for file in selected_files:
                            if file in selector.matched_files[selected_item]:
                                selector.matched_files[selected_item].remove(file)
                else:  # Unmatched view
                    # Find a matching prefix to add files to
                    target_prefix = None
                    for prefix in selector.prefixes:
                        if selected_item.startswith(prefix) or prefix.startswith(selected_item):
                            target_prefix = prefix
                            break

                    if not target_prefix:
                        if confirm_dialog(stdscr, f"Add prefix '{selected_item}' to matched prefixes?"):
                            selector.add_prefix(selected_item)
                            target_prefix = selected_item

                    if target_prefix:
                        file_selection = select_individual_files_dialog(stdscr, selector, selected_item, view_mode)
                        if file_selection:
                            # Add individual files to matched_files
                            for file in file_selection:
                                if file not in selector.matched_files[target_prefix]:
                                    selector.matched_files[target_prefix].append(file)
|
|
|
|
if __name__ == "__main__":
    # Ingest runs before curses takes over the terminal, so its output is
    # printed to the normal screen. "--force" is forwarded to ingest_config().
    forceIngest = "--force" in sys.argv
    ingest_config(force=forceIngest)
    try:
        # curses.wrapper restores the terminal state even when main() raises.
        curses.wrapper(main)
    except KeyboardInterrupt:
        print("Program terminated by user")
|