goodByeHida/buildSchemas.py
2024-02-07 15:33:41 +01:00

170 lines
7.4 KiB
Python

import os
import json
from utils import cleanEntityName
import xml.etree.ElementTree as ET
import shutil
from tqdm import tqdm
def processNode(node, schemaDir: str, parentName: str = None) -> set:
""" Process a node.
Args:
node (Element): The node to process.
schemaDir (str): The path to the directory to store the schemas.
parentName (str, optional): The name of the parent node. Defaults to None.
Returns:
set: The set of columns.
"""
if node.tag == "block":
# If the node is a block, it is the root node.
key_lbl: str = cleanEntityName(f"{node.get('txt')}") # The name of the column.
columns: set = set([]) # The set of columns.
else:
# If the node is not a block, it is a child node.
key_lbl: str = cleanEntityName(f"{node.get('key')}_{node.get('lbl')}") # The name of the column.
columns: set = {f"f__{key_lbl}"} # The set of columns with its own name, cause it has children.
for child in node:
if len(child) > 0:
# If the child node has children, process the child node.
processNode(child, schemaDir, key_lbl) # The columns of the child node.
elif 'txt' in child.attrib:
# If the child node has a text attribute, we need no column.
createRelTable(schemaDir, key_lbl, cleanEntityName(f"{child.get('key')}_{child.get('lbl')}"))
childName: str = cleanEntityName(f"{child.get('key')}_{child.get('lbl')}") # The name of the child column.
childColumns = set([f"f__uuid", f"f__{childName}"])
filePathEntity: str = os.path.join(schemaDir, f"c__{childName}.json")
if os.path.exists(filePathEntity):
# If the entity file exists, load the existing columns.
with open(filePathEntity, 'r', encoding='utf-8') as f:
# Load the existing columns from the entity file.
existingChildColumns: list = json.load(f).get("columns", [])
childColumns.update(existingChildColumns)
with open(filePathEntity, 'w', encoding='utf-8') as f:
# Open the entity file to write.
# Write the entity file with the columns.
json.dump({"name": f"c__{childName}", "columns": list(childColumns)}, f, ensure_ascii=False)
else:
# Iterate through the children of the node.
childName: str = cleanEntityName(f"{child.get('key')}_{child.get('lbl')}") # The name of the child column.
# Add the child column to the set of columns.
columns.add(f"f__{childName}")
if columns and len(node) > 0:
# Check if the node has children
columnsList: list = sorted(list(columns)) # Sorted list of the columns.
# Add the uuid column to the list of columns
columnsList.append("f__uuid")
filePathEntity: str = os.path.join(schemaDir, f"c__{key_lbl}.json") # The path to the entity file.
if os.path.exists(filePathEntity):
# If the entity file exists, load the existing columns.
with open(filePathEntity, 'r', encoding='utf-8') as f:
# Load the existing columns from the entity file.
existingColumns: list = json.load(f).get("columns", []) # The existing columns.
# Add the existing columns to the list of columns.
columnsList.extend(existingColumns)
# Remove duplicates
columnsList = sorted(list(set(columnsList)))
with open(filePathEntity, 'w', encoding='utf-8') as f:
# Open the entity file to write.
# Write the entity file with the columns.
json.dump({"name": f"c__{key_lbl}", "columns": columnsList}, f, ensure_ascii=False)
if parentName:
# If the node has a parent, create a relationship table.
createRelTable(schemaDir, parentName, key_lbl)
return columns
def processXmlFile(filePath, schemaDir):
"""Process an XML file.
Args:
filePath (str): The path to the XML file.
schemaDir (str): The path to the directory to store the schemas.
"""
tree = ET.parse(filePath) # The XML tree.
root = tree.getroot() # The root of the XML tree.
os.makedirs(schemaDir, exist_ok=True)
for block in root.iter('block'):
# Iterate through the blocks in the XML file and process each block.
if 'txt' in block.attrib:
# If the block has a text attribute, process the block.
columns: set = processNode(block, schemaDir) # The columns of the block.
columnsList: list = sorted(list(columns)) # Sorted list of the columns.
filePath: str = os.path.join(schemaDir, f"c__{block.get('txt')}.json") # The path to the file.
if os.path.exists(filePath):
# If the file exists, load the existing columns.
with open(filePath, 'r', encoding='utf-8') as f:
# Load the existing columns from the file.
existingColumns: list = json.load(f).get("columns", []) # The existing columns.
# Add the existing columns to the list of columns.
columnsList.extend(existingColumns)
# Remove duplicates from the list of columns.
columnsList = sorted(list(set(columnsList)))
with open(filePath, 'w', encoding='utf-8') as f:
# Open the file to write.
# Write the file with the columns.
json.dump({"name": f"c__{block.get('txt')}", "columns": columnsList}, f, ensure_ascii=False)
def buildSchemas(dirPath, schemaDir):
"""Parse schemas from XML files and saves them as json.
Args:
dirPath (str): The path to the directory containing the XML files.
schemaDir (str): The path to the directory to store the schemas.
"""
if os.path.exists(schemaDir):
# Remove the existing schema directory
shutil.rmtree(schemaDir)
# Get the total number of XML files
totalFiles = sum([len([f for f in files if f.endswith('.xml')]) for r, d, files in os.walk(dirPath)])
with tqdm(total=totalFiles, desc="Processing XML files", ncols=75) as pbar:
for dirpath, dirnames, filenames in os.walk(dirPath):
# Walk through the directory and process each XML file
for fileName in filenames:
if fileName.endswith('.xml'):
processXmlFile(os.path.join(dirpath, fileName), schemaDir)
# Update the progress bar
pbar.update(1)
print('Schemas built.')
def createRelTable(schemaDir: str, parentName: str, key_lbl: str):
"""Create a relationship table.
Args:
schemaDir (str): The path to the directory to store the schemas.
parentName (str): The name of the parent node.
key_lbl (str): The name of the column.
"""
tableName = f"r__{parentName}__{key_lbl}"
filePathRelTable: str = os.path.join(schemaDir,
f"{tableName}.json"
) # The path to the relationship table file.
with open(filePathRelTable, 'w', encoding='utf-8') as f:
# Open the relationship table file to write.
# Write the relationship table file with the columns.
json.dump(
{"name": tableName, "columns": [f"f__{parentName}__uuid", f"f__{key_lbl}__uuid"]},
f, ensure_ascii=False)