Custom form configuration #268

Merged
tobru merged 34 commits from 165-form-configuration into main 2025-11-10 14:49:33 +00:00
4 changed files with 322 additions and 282 deletions
Showing only changes of commit 9e1804a141 - Show all commits

View file

@ -0,0 +1,31 @@
from servala.core.crd.forms import (
CrdModelFormMixin,
CustomFormMixin,
FormGeneratorMixin,
generate_custom_form_class,
generate_model_form_class,
)
from servala.core.crd.models import (
CRDModel,
build_object_fields,
duplicate_field,
generate_django_model,
get_django_field,
unnest_data,
)
from servala.core.crd.utils import deslugify
__all__ = [
"CrdModelFormMixin",
"CustomFormMixin",
"FormGeneratorMixin",
"generate_django_model",
"generate_model_form_class",
"generate_custom_form_class",
"CRDModel",
"build_object_fields",
"duplicate_field",
"get_django_field",
"unnest_data",
"deslugify",
]

View file

@ -1,291 +1,18 @@
import re
from django import forms
from django.core.validators import MaxValueValidator, MinValueValidator, RegexValidator
from django.db import models
from django.core.validators import MaxValueValidator, MinValueValidator
from django.forms.models import ModelForm, ModelFormMetaclass
from django.utils.translation import gettext_lazy as _
from servala.core.models import ControlPlaneCRD, ServiceInstance
from servala.frontend.forms.widgets import DynamicArrayField, DynamicArrayWidget
class CRDModel(models.Model):
"""Base class for all virtual CRD models"""
def __init__(self, **kwargs):
if spec := kwargs.pop("spec", None):
kwargs.update(unnest_data({"spec": spec}))
super().__init__(**kwargs)
class Meta:
abstract = True
def duplicate_field(field_name, model):
field = model._meta.get_field(field_name)
new_field = type(field).__new__(type(field))
new_field.__dict__.update(field.__dict__)
new_field.model = None
new_field.auto_created = False
return new_field
def generate_django_model(schema, group, version, kind):
"""
Generates a virtual Django model from a Kubernetes CRD's OpenAPI v3 schema.
"""
# We always need these three fields to know our own name and our full namespace
model_fields = {"__module__": "crd_models"}
for field_name in ("name", "context"):
model_fields[field_name] = duplicate_field(field_name, ServiceInstance)
# All other fields are generated from the schema, except for the
# resourceRef object
spec = schema["properties"].get("spec") or {}
spec["properties"].pop("resourceRef", None)
model_fields.update(build_object_fields(spec, "spec", parent_required=False))
# Store the original schema on the model class
model_fields["SCHEMA"] = schema
meta_class = type("Meta", (), {"app_label": "crd_models"})
model_fields["Meta"] = meta_class
# create the model class
model_name = kind
model_class = type(model_name, (CRDModel,), model_fields)
return model_class
def build_object_fields(schema, name, verbose_name_prefix=None, parent_required=False):
required_fields = schema.get("required") or []
properties = schema.get("properties") or {}
fields = {}
for field_name, field_schema in properties.items():
is_required = field_name in required_fields or parent_required
full_name = f"{name}.{field_name}"
result = get_django_field(
field_schema,
is_required,
field_name,
full_name,
verbose_name_prefix=verbose_name_prefix,
)
if isinstance(result, dict):
fields.update(result)
else:
fields[full_name] = result
return fields
def deslugify(title):
"""
Convert camelCase, PascalCase, or snake_case to human-readable title.
Handles known acronyms (e.g., postgreSQLParameters -> PostgreSQL Parameters).
"""
ACRONYMS = {
# Database systems
"SQL": "SQL",
"MYSQL": "MySQL",
"POSTGRESQL": "PostgreSQL",
"MARIADB": "MariaDB",
"MSSQL": "MSSQL",
"MONGODB": "MongoDB",
"REDIS": "Redis",
# Protocols
"HTTP": "HTTP",
"HTTPS": "HTTPS",
"FTP": "FTP",
"SFTP": "SFTP",
"SSH": "SSH",
"TLS": "TLS",
"SSL": "SSL",
# APIs
"API": "API",
"REST": "REST",
"GRPC": "gRPC",
"GRAPHQL": "GraphQL",
# Networking
"URL": "URL",
"URI": "URI",
"FQDN": "FQDN",
"DNS": "DNS",
"IP": "IP",
"TCP": "TCP",
"UDP": "UDP",
# Data formats
"JSON": "JSON",
"XML": "XML",
"YAML": "YAML",
"CSV": "CSV",
"HTML": "HTML",
"CSS": "CSS",
# Hardware
"CPU": "CPU",
"RAM": "RAM",
"GPU": "GPU",
"SSD": "SSD",
"HDD": "HDD",
# Identifiers
"ID": "ID",
"UUID": "UUID",
"GUID": "GUID",
"ARN": "ARN",
# Cloud providers
"AWS": "AWS",
"GCP": "GCP",
"AZURE": "Azure",
"IBM": "IBM",
# Kubernetes/Cloud
"DB": "DB",
"PVC": "PVC",
"PV": "PV",
"VPN": "VPN",
# Auth
"OS": "OS",
"LDAP": "LDAP",
"SAML": "SAML",
"OAUTH": "OAuth",
"JWT": "JWT",
# AWS Services
"S3": "S3",
"EC2": "EC2",
"RDS": "RDS",
"EBS": "EBS",
"IAM": "IAM",
}
if "_" in title:
# Handle snake_case
title = title.replace("_", " ")
words = title.split()
else:
# Handle camelCase/PascalCase with smart splitting
# This regex splits on:
# - Transition from lowercase to uppercase (camelCase)
# - Transition from multiple uppercase to an uppercase followed by lowercase (SQLParameters -> SQL Parameters)
words = re.findall(r"[A-Z]+(?=[A-Z][a-z]|\b)|[A-Z][a-z]+|[a-z]+|[0-9]+", title)
# Merge adjacent words if they form a known compound acronym (e.g., postgre + SQL = PostgreSQL)
merged_words = []
i = 0
while i < len(words):
if i < len(words) - 1:
# Check if current word + next word form a known acronym
combined = (words[i] + words[i + 1]).upper()
if combined in ACRONYMS:
merged_words.append(combined)
i += 2
continue
merged_words.append(words[i])
i += 1
# Capitalize each word, using proper casing for known acronyms
result = []
for word in merged_words:
word_upper = word.upper()
if word_upper in ACRONYMS:
result.append(ACRONYMS[word_upper])
else:
result.append(word.capitalize())
return " ".join(result)
def get_django_field(
field_schema, is_required, field_name, full_name, verbose_name_prefix=None
):
field_type = field_schema.get("type") or "string"
format = field_schema.get("format")
verbose_name_prefix = verbose_name_prefix or ""
verbose_name = f"{verbose_name_prefix} {deslugify(field_name)}".strip()
# Pass down the requirement status from parent to child fields
kwargs = {
"blank": not is_required, # All fields are optional by default
"null": not is_required,
"help_text": field_schema.get("description"),
"validators": [],
"verbose_name": verbose_name,
"default": field_schema.get("default"),
}
if minimum := field_schema.get("minimum"):
kwargs["validators"].append(MinValueValidator(minimum))
if maximum := field_schema.get("maximum"):
kwargs["validators"].append(MaxValueValidator(maximum))
if field_type == "string":
if format == "date-time":
return models.DateTimeField(**kwargs)
elif format == "date":
return models.DateField(**kwargs)
else:
max_length = field_schema.get("max_length") or 255
if pattern := field_schema.get("pattern"):
kwargs["validators"].append(RegexValidator(regex=pattern))
if choices := field_schema.get("enum"):
kwargs["choices"] = ((choice, choice) for choice in choices)
return models.CharField(max_length=max_length, **kwargs)
elif field_type == "integer":
return models.IntegerField(**kwargs)
elif field_type == "number":
return models.FloatField(**kwargs)
elif field_type == "boolean":
return models.BooleanField(**kwargs)
elif field_type == "object":
# Here we pass down the requirement status to nested objects
return build_object_fields(
field_schema,
full_name,
verbose_name_prefix=f"{verbose_name}:",
parent_required=is_required,
)
elif field_type == "array":
kwargs["help_text"] = field_schema.get("description") or _("List of values")
field = models.JSONField(**kwargs)
formfield_kwargs = {
"label": field.verbose_name,
"required": not field.blank,
}
array_validation = {}
if min_items := field_schema.get("min_items"):
array_validation["min_items"] = min_items
if max_items := field_schema.get("max_items"):
array_validation["max_items"] = max_items
if unique_items := field_schema.get("unique_items"):
array_validation["unique_items"] = unique_items
if items_schema := field_schema.get("items"):
array_validation["items_schema"] = items_schema
if array_validation:
formfield_kwargs["array_validation"] = array_validation
field.formfield = lambda: DynamicArrayField(**formfield_kwargs)
return field
return models.CharField(max_length=255, **kwargs)
def unnest_data(data):
result = {}
def _flatten_dict(d, parent_key=""):
for key, value in d.items():
new_key = f"{parent_key}.{key}" if parent_key else key
if isinstance(value, dict):
_flatten_dict(value, new_key)
else:
result[new_key] = value
_flatten_dict(data)
return result
from servala.core.crd.utils import deslugify
from servala.core.models import ControlPlaneCRD
from servala.frontend.forms.widgets import DynamicArrayWidget
class FormGeneratorMixin:
IS_CUSTOM_FORM = False
"""Shared base class for ModelForm classes based on our generated CRD models.
There are two relevant child classes:
- CrdModelFormMixin: For fully auto-generated forms from the spec
- CustomFormMixin: For forms built from form_config settings.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

View file

@ -0,0 +1,167 @@
from django.core.validators import MaxValueValidator, MinValueValidator, RegexValidator
from django.db import models
from django.utils.translation import gettext_lazy as _
from servala.core.crd.utils import deslugify
from servala.core.models import ServiceInstance
from servala.frontend.forms.widgets import DynamicArrayField
class CRDModel(models.Model):
"""Base class for all virtual CRD models"""
def __init__(self, **kwargs):
if spec := kwargs.pop("spec", None):
kwargs.update(unnest_data({"spec": spec}))
super().__init__(**kwargs)
class Meta:
abstract = True
def generate_django_model(schema, group, version, kind):
"""
Generates a virtual Django model from a Kubernetes CRD's OpenAPI v3 schema.
"""
# We always need these three fields to know our own name and our full namespace
model_fields = {"__module__": "crd_models"}
for field_name in ("name", "context"):
model_fields[field_name] = duplicate_field(field_name, ServiceInstance)
# All other fields are generated from the schema, except for the
# resourceRef object
spec = schema["properties"].get("spec") or {}
spec["properties"].pop("resourceRef", None)
model_fields.update(build_object_fields(spec, "spec", parent_required=False))
# Store the original schema on the model class
model_fields["SCHEMA"] = schema
meta_class = type("Meta", (), {"app_label": "crd_models"})
model_fields["Meta"] = meta_class
# create the model class
model_name = kind
model_class = type(model_name, (CRDModel,), model_fields)
return model_class
def duplicate_field(field_name, model):
field = model._meta.get_field(field_name)
new_field = type(field).__new__(type(field))
new_field.__dict__.update(field.__dict__)
new_field.model = None
new_field.auto_created = False
return new_field
def build_object_fields(schema, name, verbose_name_prefix=None, parent_required=False):
required_fields = schema.get("required") or []
properties = schema.get("properties") or {}
fields = {}
for field_name, field_schema in properties.items():
is_required = field_name in required_fields or parent_required
full_name = f"{name}.{field_name}"
result = get_django_field(
field_schema,
is_required,
field_name,
full_name,
verbose_name_prefix=verbose_name_prefix,
)
if isinstance(result, dict):
fields.update(result)
else:
fields[full_name] = result
return fields
def get_django_field(
field_schema, is_required, field_name, full_name, verbose_name_prefix=None
):
field_type = field_schema.get("type") or "string"
format = field_schema.get("format")
verbose_name_prefix = verbose_name_prefix or ""
verbose_name = f"{verbose_name_prefix} {deslugify(field_name)}".strip()
# Pass down the requirement status from parent to child fields
kwargs = {
"blank": not is_required, # All fields are optional by default
"null": not is_required,
"help_text": field_schema.get("description"),
"validators": [],
"verbose_name": verbose_name,
"default": field_schema.get("default"),
}
if minimum := field_schema.get("minimum"):
kwargs["validators"].append(MinValueValidator(minimum))
if maximum := field_schema.get("maximum"):
kwargs["validators"].append(MaxValueValidator(maximum))
if field_type == "string":
if format == "date-time":
return models.DateTimeField(**kwargs)
elif format == "date":
return models.DateField(**kwargs)
else:
max_length = field_schema.get("max_length") or 255
if pattern := field_schema.get("pattern"):
kwargs["validators"].append(RegexValidator(regex=pattern))
if choices := field_schema.get("enum"):
kwargs["choices"] = ((choice, choice) for choice in choices)
return models.CharField(max_length=max_length, **kwargs)
elif field_type == "integer":
return models.IntegerField(**kwargs)
elif field_type == "number":
return models.FloatField(**kwargs)
elif field_type == "boolean":
return models.BooleanField(**kwargs)
elif field_type == "object":
# Here we pass down the requirement status to nested objects
return build_object_fields(
field_schema,
full_name,
verbose_name_prefix=f"{verbose_name}:",
parent_required=is_required,
)
elif field_type == "array":
kwargs["help_text"] = field_schema.get("description") or _("List of values")
field = models.JSONField(**kwargs)
formfield_kwargs = {
"label": field.verbose_name,
"required": not field.blank,
}
array_validation = {}
if min_items := field_schema.get("min_items"):
array_validation["min_items"] = min_items
if max_items := field_schema.get("max_items"):
array_validation["max_items"] = max_items
if unique_items := field_schema.get("unique_items"):
array_validation["unique_items"] = unique_items
if items_schema := field_schema.get("items"):
array_validation["items_schema"] = items_schema
if array_validation:
formfield_kwargs["array_validation"] = array_validation
field.formfield = lambda: DynamicArrayField(**formfield_kwargs)
return field
return models.CharField(max_length=255, **kwargs)
def unnest_data(data):
result = {}
def _flatten_dict(d, parent_key=""):
for key, value in d.items():
new_key = f"{parent_key}.{key}" if parent_key else key
if isinstance(value, dict):
_flatten_dict(value, new_key)
else:
result[new_key] = value
_flatten_dict(data)
return result

View file

@ -0,0 +1,115 @@
import re
def deslugify(title):
"""
Convert camelCase, PascalCase, or snake_case to human-readable title.
Handles known acronyms (e.g., postgreSQLParameters -> PostgreSQL Parameters).
"""
ACRONYMS = {
# Database systems
"SQL": "SQL",
"MYSQL": "MySQL",
"POSTGRESQL": "PostgreSQL",
"MARIADB": "MariaDB",
"MSSQL": "MSSQL",
"MONGODB": "MongoDB",
"REDIS": "Redis",
# Protocols
"HTTP": "HTTP",
"HTTPS": "HTTPS",
"FTP": "FTP",
"SFTP": "SFTP",
"SSH": "SSH",
"TLS": "TLS",
"SSL": "SSL",
# APIs
"API": "API",
"REST": "REST",
"GRPC": "gRPC",
"GRAPHQL": "GraphQL",
# Networking
"URL": "URL",
"URI": "URI",
"FQDN": "FQDN",
"DNS": "DNS",
"IP": "IP",
"TCP": "TCP",
"UDP": "UDP",
# Data formats
"JSON": "JSON",
"XML": "XML",
"YAML": "YAML",
"CSV": "CSV",
"HTML": "HTML",
"CSS": "CSS",
# Hardware
"CPU": "CPU",
"RAM": "RAM",
"GPU": "GPU",
"SSD": "SSD",
"HDD": "HDD",
# Identifiers
"ID": "ID",
"UUID": "UUID",
"GUID": "GUID",
"ARN": "ARN",
# Cloud providers
"AWS": "AWS",
"GCP": "GCP",
"AZURE": "Azure",
"IBM": "IBM",
# Kubernetes/Cloud
"DB": "DB",
"PVC": "PVC",
"PV": "PV",
"VPN": "VPN",
# Auth
"OS": "OS",
"LDAP": "LDAP",
"SAML": "SAML",
"OAUTH": "OAuth",
"JWT": "JWT",
# AWS Services
"S3": "S3",
"EC2": "EC2",
"RDS": "RDS",
"EBS": "EBS",
"IAM": "IAM",
}
if "_" in title:
# Handle snake_case
title = title.replace("_", " ")
words = title.split()
else:
# Handle camelCase/PascalCase with smart splitting
# This regex splits on:
# - Transition from lowercase to uppercase (camelCase)
# - Transition from multiple uppercase to an uppercase followed by lowercase (SQLParameters -> SQL Parameters)
words = re.findall(r"[A-Z]+(?=[A-Z][a-z]|\b)|[A-Z][a-z]+|[a-z]+|[0-9]+", title)
# Merge adjacent words if they form a known compound acronym (e.g., postgre + SQL = PostgreSQL)
merged_words = []
i = 0
while i < len(words):
if i < len(words) - 1:
# Check if current word + next word form a known acronym
combined = (words[i] + words[i + 1]).upper()
if combined in ACRONYMS:
merged_words.append(combined)
i += 2
continue
merged_words.append(words[i])
i += 1
# Capitalize each word, using proper casing for known acronyms
result = []
for word in merged_words:
word_upper = word.upper()
if word_upper in ACRONYMS:
result.append(ACRONYMS[word_upper])
else:
result.append(word.capitalize())
return " ".join(result)