Compare commits
9 commits
ca485978b9
...
ece60ad3b1
| Author | SHA1 | Date | |
|---|---|---|---|
| ece60ad3b1 | |||
| bab9d636ee | |||
| 089dbb663a | |||
| b3bb41b322 | |||
| 68dce4c5fb | |||
| 7fbd57d1b1 | |||
| 16d8ac0c6d | |||
| 9e1804a141 | |||
| 5b496ec5b2 |
10 changed files with 1092 additions and 291 deletions
31
src/servala/core/crd/__init__.py
Normal file
31
src/servala/core/crd/__init__.py
Normal file
|
|
@ -0,0 +1,31 @@
|
|||
from servala.core.crd.forms import (
|
||||
CrdModelFormMixin,
|
||||
CustomFormMixin,
|
||||
FormGeneratorMixin,
|
||||
generate_custom_form_class,
|
||||
generate_model_form_class,
|
||||
)
|
||||
from servala.core.crd.models import (
|
||||
CRDModel,
|
||||
build_object_fields,
|
||||
duplicate_field,
|
||||
generate_django_model,
|
||||
get_django_field,
|
||||
unnest_data,
|
||||
)
|
||||
from servala.core.crd.utils import deslugify
|
||||
|
||||
__all__ = [
|
||||
"CrdModelFormMixin",
|
||||
"CustomFormMixin",
|
||||
"FormGeneratorMixin",
|
||||
"generate_django_model",
|
||||
"generate_model_form_class",
|
||||
"generate_custom_form_class",
|
||||
"CRDModel",
|
||||
"build_object_fields",
|
||||
"duplicate_field",
|
||||
"get_django_field",
|
||||
"unnest_data",
|
||||
"deslugify",
|
||||
]
|
||||
|
|
@ -1,291 +1,18 @@
|
|||
import re
|
||||
|
||||
from django import forms
|
||||
from django.core.validators import MaxValueValidator, MinValueValidator, RegexValidator
|
||||
from django.db import models
|
||||
from django.core.validators import MaxValueValidator, MinValueValidator
|
||||
from django.forms.models import ModelForm, ModelFormMetaclass
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from servala.core.models import ControlPlaneCRD, ServiceInstance
|
||||
from servala.frontend.forms.widgets import DynamicArrayField, DynamicArrayWidget
|
||||
|
||||
|
||||
class CRDModel(models.Model):
|
||||
"""Base class for all virtual CRD models"""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
if spec := kwargs.pop("spec", None):
|
||||
kwargs.update(unnest_data({"spec": spec}))
|
||||
super().__init__(**kwargs)
|
||||
|
||||
class Meta:
|
||||
abstract = True
|
||||
|
||||
|
||||
def duplicate_field(field_name, model):
|
||||
field = model._meta.get_field(field_name)
|
||||
new_field = type(field).__new__(type(field))
|
||||
new_field.__dict__.update(field.__dict__)
|
||||
new_field.model = None
|
||||
new_field.auto_created = False
|
||||
return new_field
|
||||
|
||||
|
||||
def generate_django_model(schema, group, version, kind):
|
||||
"""
|
||||
Generates a virtual Django model from a Kubernetes CRD's OpenAPI v3 schema.
|
||||
"""
|
||||
# We always need these three fields to know our own name and our full namespace
|
||||
model_fields = {"__module__": "crd_models"}
|
||||
for field_name in ("name", "context"):
|
||||
model_fields[field_name] = duplicate_field(field_name, ServiceInstance)
|
||||
|
||||
# All other fields are generated from the schema, except for the
|
||||
# resourceRef object
|
||||
spec = schema["properties"].get("spec") or {}
|
||||
spec["properties"].pop("resourceRef", None)
|
||||
model_fields.update(build_object_fields(spec, "spec", parent_required=False))
|
||||
|
||||
# Store the original schema on the model class
|
||||
model_fields["SCHEMA"] = schema
|
||||
|
||||
meta_class = type("Meta", (), {"app_label": "crd_models"})
|
||||
model_fields["Meta"] = meta_class
|
||||
|
||||
# create the model class
|
||||
model_name = kind
|
||||
model_class = type(model_name, (CRDModel,), model_fields)
|
||||
return model_class
|
||||
|
||||
|
||||
def build_object_fields(schema, name, verbose_name_prefix=None, parent_required=False):
|
||||
required_fields = schema.get("required") or []
|
||||
properties = schema.get("properties") or {}
|
||||
fields = {}
|
||||
|
||||
for field_name, field_schema in properties.items():
|
||||
is_required = field_name in required_fields or parent_required
|
||||
full_name = f"{name}.{field_name}"
|
||||
result = get_django_field(
|
||||
field_schema,
|
||||
is_required,
|
||||
field_name,
|
||||
full_name,
|
||||
verbose_name_prefix=verbose_name_prefix,
|
||||
)
|
||||
if isinstance(result, dict):
|
||||
fields.update(result)
|
||||
else:
|
||||
fields[full_name] = result
|
||||
return fields
|
||||
|
||||
|
||||
def deslugify(title):
|
||||
"""
|
||||
Convert camelCase, PascalCase, or snake_case to human-readable title.
|
||||
Handles known acronyms (e.g., postgreSQLParameters -> PostgreSQL Parameters).
|
||||
"""
|
||||
ACRONYMS = {
|
||||
# Database systems
|
||||
"SQL": "SQL",
|
||||
"MYSQL": "MySQL",
|
||||
"POSTGRESQL": "PostgreSQL",
|
||||
"MARIADB": "MariaDB",
|
||||
"MSSQL": "MSSQL",
|
||||
"MONGODB": "MongoDB",
|
||||
"REDIS": "Redis",
|
||||
# Protocols
|
||||
"HTTP": "HTTP",
|
||||
"HTTPS": "HTTPS",
|
||||
"FTP": "FTP",
|
||||
"SFTP": "SFTP",
|
||||
"SSH": "SSH",
|
||||
"TLS": "TLS",
|
||||
"SSL": "SSL",
|
||||
# APIs
|
||||
"API": "API",
|
||||
"REST": "REST",
|
||||
"GRPC": "gRPC",
|
||||
"GRAPHQL": "GraphQL",
|
||||
# Networking
|
||||
"URL": "URL",
|
||||
"URI": "URI",
|
||||
"FQDN": "FQDN",
|
||||
"DNS": "DNS",
|
||||
"IP": "IP",
|
||||
"TCP": "TCP",
|
||||
"UDP": "UDP",
|
||||
# Data formats
|
||||
"JSON": "JSON",
|
||||
"XML": "XML",
|
||||
"YAML": "YAML",
|
||||
"CSV": "CSV",
|
||||
"HTML": "HTML",
|
||||
"CSS": "CSS",
|
||||
# Hardware
|
||||
"CPU": "CPU",
|
||||
"RAM": "RAM",
|
||||
"GPU": "GPU",
|
||||
"SSD": "SSD",
|
||||
"HDD": "HDD",
|
||||
# Identifiers
|
||||
"ID": "ID",
|
||||
"UUID": "UUID",
|
||||
"GUID": "GUID",
|
||||
"ARN": "ARN",
|
||||
# Cloud providers
|
||||
"AWS": "AWS",
|
||||
"GCP": "GCP",
|
||||
"AZURE": "Azure",
|
||||
"IBM": "IBM",
|
||||
# Kubernetes/Cloud
|
||||
"DB": "DB",
|
||||
"PVC": "PVC",
|
||||
"PV": "PV",
|
||||
"VPN": "VPN",
|
||||
# Auth
|
||||
"OS": "OS",
|
||||
"LDAP": "LDAP",
|
||||
"SAML": "SAML",
|
||||
"OAUTH": "OAuth",
|
||||
"JWT": "JWT",
|
||||
# AWS Services
|
||||
"S3": "S3",
|
||||
"EC2": "EC2",
|
||||
"RDS": "RDS",
|
||||
"EBS": "EBS",
|
||||
"IAM": "IAM",
|
||||
}
|
||||
|
||||
if "_" in title:
|
||||
# Handle snake_case
|
||||
title = title.replace("_", " ")
|
||||
words = title.split()
|
||||
else:
|
||||
# Handle camelCase/PascalCase with smart splitting
|
||||
# This regex splits on:
|
||||
# - Transition from lowercase to uppercase (camelCase)
|
||||
# - Transition from multiple uppercase to an uppercase followed by lowercase (SQLParameters -> SQL Parameters)
|
||||
words = re.findall(r"[A-Z]+(?=[A-Z][a-z]|\b)|[A-Z][a-z]+|[a-z]+|[0-9]+", title)
|
||||
|
||||
# Merge adjacent words if they form a known compound acronym (e.g., postgre + SQL = PostgreSQL)
|
||||
merged_words = []
|
||||
i = 0
|
||||
while i < len(words):
|
||||
if i < len(words) - 1:
|
||||
# Check if current word + next word form a known acronym
|
||||
combined = (words[i] + words[i + 1]).upper()
|
||||
if combined in ACRONYMS:
|
||||
merged_words.append(combined)
|
||||
i += 2
|
||||
continue
|
||||
merged_words.append(words[i])
|
||||
i += 1
|
||||
|
||||
# Capitalize each word, using proper casing for known acronyms
|
||||
result = []
|
||||
for word in merged_words:
|
||||
word_upper = word.upper()
|
||||
if word_upper in ACRONYMS:
|
||||
result.append(ACRONYMS[word_upper])
|
||||
else:
|
||||
result.append(word.capitalize())
|
||||
|
||||
return " ".join(result)
|
||||
|
||||
|
||||
def get_django_field(
|
||||
field_schema, is_required, field_name, full_name, verbose_name_prefix=None
|
||||
):
|
||||
field_type = field_schema.get("type") or "string"
|
||||
format = field_schema.get("format")
|
||||
verbose_name_prefix = verbose_name_prefix or ""
|
||||
verbose_name = f"{verbose_name_prefix} {deslugify(field_name)}".strip()
|
||||
|
||||
# Pass down the requirement status from parent to child fields
|
||||
kwargs = {
|
||||
"blank": not is_required, # All fields are optional by default
|
||||
"null": not is_required,
|
||||
"help_text": field_schema.get("description"),
|
||||
"validators": [],
|
||||
"verbose_name": verbose_name,
|
||||
"default": field_schema.get("default"),
|
||||
}
|
||||
|
||||
if minimum := field_schema.get("minimum"):
|
||||
kwargs["validators"].append(MinValueValidator(minimum))
|
||||
if maximum := field_schema.get("maximum"):
|
||||
kwargs["validators"].append(MaxValueValidator(maximum))
|
||||
|
||||
if field_type == "string":
|
||||
if format == "date-time":
|
||||
return models.DateTimeField(**kwargs)
|
||||
elif format == "date":
|
||||
return models.DateField(**kwargs)
|
||||
else:
|
||||
max_length = field_schema.get("max_length") or 255
|
||||
if pattern := field_schema.get("pattern"):
|
||||
kwargs["validators"].append(RegexValidator(regex=pattern))
|
||||
if choices := field_schema.get("enum"):
|
||||
kwargs["choices"] = ((choice, choice) for choice in choices)
|
||||
return models.CharField(max_length=max_length, **kwargs)
|
||||
elif field_type == "integer":
|
||||
return models.IntegerField(**kwargs)
|
||||
elif field_type == "number":
|
||||
return models.FloatField(**kwargs)
|
||||
elif field_type == "boolean":
|
||||
return models.BooleanField(**kwargs)
|
||||
elif field_type == "object":
|
||||
# Here we pass down the requirement status to nested objects
|
||||
return build_object_fields(
|
||||
field_schema,
|
||||
full_name,
|
||||
verbose_name_prefix=f"{verbose_name}:",
|
||||
parent_required=is_required,
|
||||
)
|
||||
elif field_type == "array":
|
||||
kwargs["help_text"] = field_schema.get("description") or _("List of values")
|
||||
field = models.JSONField(**kwargs)
|
||||
formfield_kwargs = {
|
||||
"label": field.verbose_name,
|
||||
"required": not field.blank,
|
||||
}
|
||||
|
||||
array_validation = {}
|
||||
if min_items := field_schema.get("min_items"):
|
||||
array_validation["min_items"] = min_items
|
||||
if max_items := field_schema.get("max_items"):
|
||||
array_validation["max_items"] = max_items
|
||||
if unique_items := field_schema.get("unique_items"):
|
||||
array_validation["unique_items"] = unique_items
|
||||
if items_schema := field_schema.get("items"):
|
||||
array_validation["items_schema"] = items_schema
|
||||
if array_validation:
|
||||
formfield_kwargs["array_validation"] = array_validation
|
||||
|
||||
field.formfield = lambda: DynamicArrayField(**formfield_kwargs)
|
||||
|
||||
return field
|
||||
return models.CharField(max_length=255, **kwargs)
|
||||
|
||||
|
||||
def unnest_data(data):
|
||||
result = {}
|
||||
|
||||
def _flatten_dict(d, parent_key=""):
|
||||
for key, value in d.items():
|
||||
new_key = f"{parent_key}.{key}" if parent_key else key
|
||||
if isinstance(value, dict):
|
||||
_flatten_dict(value, new_key)
|
||||
else:
|
||||
result[new_key] = value
|
||||
|
||||
_flatten_dict(data)
|
||||
return result
|
||||
from servala.core.crd.utils import deslugify
|
||||
from servala.core.models import ControlPlaneCRD
|
||||
from servala.frontend.forms.widgets import DynamicArrayWidget
|
||||
|
||||
|
||||
class FormGeneratorMixin:
|
||||
IS_CUSTOM_FORM = False
|
||||
"""Shared base class for ModelForm classes based on our generated CRD models.
|
||||
There are two relevant child classes:
|
||||
- CrdModelFormMixin: For fully auto-generated forms from the spec
|
||||
- CustomFormMixin: For forms built from form_config settings.
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
|
@ -570,6 +297,11 @@ class CustomFormMixin(FormGeneratorMixin):
|
|||
)
|
||||
elif field_type == "array":
|
||||
field.widget = DynamicArrayWidget()
|
||||
elif field_type == "choice":
|
||||
if hasattr(field, "choices") and field.choices:
|
||||
field._controlplane_choices = list(field.choices)
|
||||
if custom_choices := field_config.get("choices"):
|
||||
field.choices = [tuple(choice) for choice in custom_choices]
|
||||
|
||||
if field_type == "number":
|
||||
min_val = field_config.get("min_value")
|
||||
|
|
@ -578,12 +310,17 @@ class CustomFormMixin(FormGeneratorMixin):
|
|||
validators = []
|
||||
if min_val is not None:
|
||||
validators.append(MinValueValidator(min_val))
|
||||
field.widget.attrs["min"] = min_val
|
||||
if max_val is not None:
|
||||
validators.append(MaxValueValidator(max_val))
|
||||
field.widget.attrs["max"] = max_val
|
||||
|
||||
if validators:
|
||||
field.validators.extend(validators)
|
||||
|
||||
if "default_value" in field_config and field.initial is None:
|
||||
field.initial = field_config["default_value"]
|
||||
|
||||
field.controlplane_field_mapping = field_name
|
||||
|
||||
def get_fieldsets(self):
|
||||
|
|
@ -603,6 +340,25 @@ class CustomFormMixin(FormGeneratorMixin):
|
|||
|
||||
return fieldsets
|
||||
|
||||
def clean(self):
|
||||
cleaned_data = super().clean()
|
||||
|
||||
for field_name, field in self.fields.items():
|
||||
if hasattr(field, "_controlplane_choices"):
|
||||
value = cleaned_data.get(field_name)
|
||||
if value:
|
||||
valid_values = [choice[0] for choice in field._controlplane_choices]
|
||||
if value not in valid_values:
|
||||
self.add_error(
|
||||
field_name,
|
||||
forms.ValidationError(
|
||||
f"'{value}' is not a valid choice. "
|
||||
f"Must be one of: {valid_values.join(', ')}"
|
||||
),
|
||||
)
|
||||
|
||||
return cleaned_data
|
||||
|
||||
def get_nested_data(self):
|
||||
nested = {}
|
||||
for field_name in self.fields.keys():
|
||||
167
src/servala/core/crd/models.py
Normal file
167
src/servala/core/crd/models.py
Normal file
|
|
@ -0,0 +1,167 @@
|
|||
from django.core.validators import MaxValueValidator, MinValueValidator, RegexValidator
|
||||
from django.db import models
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from servala.core.crd.utils import deslugify
|
||||
from servala.core.models import ServiceInstance
|
||||
from servala.frontend.forms.widgets import DynamicArrayField
|
||||
|
||||
|
||||
class CRDModel(models.Model):
|
||||
"""Base class for all virtual CRD models"""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
if spec := kwargs.pop("spec", None):
|
||||
kwargs.update(unnest_data({"spec": spec}))
|
||||
super().__init__(**kwargs)
|
||||
|
||||
class Meta:
|
||||
abstract = True
|
||||
|
||||
|
||||
def generate_django_model(schema, group, version, kind):
|
||||
"""
|
||||
Generates a virtual Django model from a Kubernetes CRD's OpenAPI v3 schema.
|
||||
"""
|
||||
# We always need these three fields to know our own name and our full namespace
|
||||
model_fields = {"__module__": "crd_models"}
|
||||
for field_name in ("name", "context"):
|
||||
model_fields[field_name] = duplicate_field(field_name, ServiceInstance)
|
||||
|
||||
# All other fields are generated from the schema, except for the
|
||||
# resourceRef object
|
||||
spec = schema["properties"].get("spec") or {}
|
||||
spec["properties"].pop("resourceRef", None)
|
||||
model_fields.update(build_object_fields(spec, "spec", parent_required=False))
|
||||
|
||||
# Store the original schema on the model class
|
||||
model_fields["SCHEMA"] = schema
|
||||
|
||||
meta_class = type("Meta", (), {"app_label": "crd_models"})
|
||||
model_fields["Meta"] = meta_class
|
||||
|
||||
# create the model class
|
||||
model_name = kind
|
||||
model_class = type(model_name, (CRDModel,), model_fields)
|
||||
return model_class
|
||||
|
||||
|
||||
def duplicate_field(field_name, model):
|
||||
field = model._meta.get_field(field_name)
|
||||
new_field = type(field).__new__(type(field))
|
||||
new_field.__dict__.update(field.__dict__)
|
||||
new_field.model = None
|
||||
new_field.auto_created = False
|
||||
return new_field
|
||||
|
||||
|
||||
def build_object_fields(schema, name, verbose_name_prefix=None, parent_required=False):
|
||||
required_fields = schema.get("required") or []
|
||||
properties = schema.get("properties") or {}
|
||||
fields = {}
|
||||
|
||||
for field_name, field_schema in properties.items():
|
||||
is_required = field_name in required_fields or parent_required
|
||||
full_name = f"{name}.{field_name}"
|
||||
result = get_django_field(
|
||||
field_schema,
|
||||
is_required,
|
||||
field_name,
|
||||
full_name,
|
||||
verbose_name_prefix=verbose_name_prefix,
|
||||
)
|
||||
if isinstance(result, dict):
|
||||
fields.update(result)
|
||||
else:
|
||||
fields[full_name] = result
|
||||
return fields
|
||||
|
||||
|
||||
def get_django_field(
|
||||
field_schema, is_required, field_name, full_name, verbose_name_prefix=None
|
||||
):
|
||||
field_type = field_schema.get("type") or "string"
|
||||
format = field_schema.get("format")
|
||||
verbose_name_prefix = verbose_name_prefix or ""
|
||||
verbose_name = f"{verbose_name_prefix} {deslugify(field_name)}".strip()
|
||||
|
||||
# Pass down the requirement status from parent to child fields
|
||||
kwargs = {
|
||||
"blank": not is_required, # All fields are optional by default
|
||||
"null": not is_required,
|
||||
"help_text": field_schema.get("description"),
|
||||
"validators": [],
|
||||
"verbose_name": verbose_name,
|
||||
"default": field_schema.get("default"),
|
||||
}
|
||||
|
||||
if minimum := field_schema.get("minimum"):
|
||||
kwargs["validators"].append(MinValueValidator(minimum))
|
||||
if maximum := field_schema.get("maximum"):
|
||||
kwargs["validators"].append(MaxValueValidator(maximum))
|
||||
|
||||
if field_type == "string":
|
||||
if format == "date-time":
|
||||
return models.DateTimeField(**kwargs)
|
||||
elif format == "date":
|
||||
return models.DateField(**kwargs)
|
||||
else:
|
||||
max_length = field_schema.get("max_length") or 255
|
||||
if pattern := field_schema.get("pattern"):
|
||||
kwargs["validators"].append(RegexValidator(regex=pattern))
|
||||
if choices := field_schema.get("enum"):
|
||||
kwargs["choices"] = ((choice, choice) for choice in choices)
|
||||
return models.CharField(max_length=max_length, **kwargs)
|
||||
elif field_type == "integer":
|
||||
return models.IntegerField(**kwargs)
|
||||
elif field_type == "number":
|
||||
return models.FloatField(**kwargs)
|
||||
elif field_type == "boolean":
|
||||
return models.BooleanField(**kwargs)
|
||||
elif field_type == "object":
|
||||
# Here we pass down the requirement status to nested objects
|
||||
return build_object_fields(
|
||||
field_schema,
|
||||
full_name,
|
||||
verbose_name_prefix=f"{verbose_name}:",
|
||||
parent_required=is_required,
|
||||
)
|
||||
elif field_type == "array":
|
||||
kwargs["help_text"] = field_schema.get("description") or _("List of values")
|
||||
field = models.JSONField(**kwargs)
|
||||
formfield_kwargs = {
|
||||
"label": field.verbose_name,
|
||||
"required": not field.blank,
|
||||
}
|
||||
|
||||
array_validation = {}
|
||||
if min_items := field_schema.get("min_items"):
|
||||
array_validation["min_items"] = min_items
|
||||
if max_items := field_schema.get("max_items"):
|
||||
array_validation["max_items"] = max_items
|
||||
if unique_items := field_schema.get("unique_items"):
|
||||
array_validation["unique_items"] = unique_items
|
||||
if items_schema := field_schema.get("items"):
|
||||
array_validation["items_schema"] = items_schema
|
||||
if array_validation:
|
||||
formfield_kwargs["array_validation"] = array_validation
|
||||
|
||||
field.formfield = lambda: DynamicArrayField(**formfield_kwargs)
|
||||
|
||||
return field
|
||||
return models.CharField(max_length=255, **kwargs)
|
||||
|
||||
|
||||
def unnest_data(data):
|
||||
result = {}
|
||||
|
||||
def _flatten_dict(d, parent_key=""):
|
||||
for key, value in d.items():
|
||||
new_key = f"{parent_key}.{key}" if parent_key else key
|
||||
if isinstance(value, dict):
|
||||
_flatten_dict(value, new_key)
|
||||
else:
|
||||
result[new_key] = value
|
||||
|
||||
_flatten_dict(data)
|
||||
return result
|
||||
115
src/servala/core/crd/utils.py
Normal file
115
src/servala/core/crd/utils.py
Normal file
|
|
@ -0,0 +1,115 @@
|
|||
import re
|
||||
|
||||
|
||||
def deslugify(title):
|
||||
"""
|
||||
Convert camelCase, PascalCase, or snake_case to human-readable title.
|
||||
Handles known acronyms (e.g., postgreSQLParameters -> PostgreSQL Parameters).
|
||||
"""
|
||||
ACRONYMS = {
|
||||
# Database systems
|
||||
"SQL": "SQL",
|
||||
"MYSQL": "MySQL",
|
||||
"POSTGRESQL": "PostgreSQL",
|
||||
"MARIADB": "MariaDB",
|
||||
"MSSQL": "MSSQL",
|
||||
"MONGODB": "MongoDB",
|
||||
"REDIS": "Redis",
|
||||
# Protocols
|
||||
"HTTP": "HTTP",
|
||||
"HTTPS": "HTTPS",
|
||||
"FTP": "FTP",
|
||||
"SFTP": "SFTP",
|
||||
"SSH": "SSH",
|
||||
"TLS": "TLS",
|
||||
"SSL": "SSL",
|
||||
# APIs
|
||||
"API": "API",
|
||||
"REST": "REST",
|
||||
"GRPC": "gRPC",
|
||||
"GRAPHQL": "GraphQL",
|
||||
# Networking
|
||||
"URL": "URL",
|
||||
"URI": "URI",
|
||||
"FQDN": "FQDN",
|
||||
"DNS": "DNS",
|
||||
"IP": "IP",
|
||||
"TCP": "TCP",
|
||||
"UDP": "UDP",
|
||||
# Data formats
|
||||
"JSON": "JSON",
|
||||
"XML": "XML",
|
||||
"YAML": "YAML",
|
||||
"CSV": "CSV",
|
||||
"HTML": "HTML",
|
||||
"CSS": "CSS",
|
||||
# Hardware
|
||||
"CPU": "CPU",
|
||||
"RAM": "RAM",
|
||||
"GPU": "GPU",
|
||||
"SSD": "SSD",
|
||||
"HDD": "HDD",
|
||||
# Identifiers
|
||||
"ID": "ID",
|
||||
"UUID": "UUID",
|
||||
"GUID": "GUID",
|
||||
"ARN": "ARN",
|
||||
# Cloud providers
|
||||
"AWS": "AWS",
|
||||
"GCP": "GCP",
|
||||
"AZURE": "Azure",
|
||||
"IBM": "IBM",
|
||||
# Kubernetes/Cloud
|
||||
"DB": "DB",
|
||||
"PVC": "PVC",
|
||||
"PV": "PV",
|
||||
"VPN": "VPN",
|
||||
# Auth
|
||||
"OS": "OS",
|
||||
"LDAP": "LDAP",
|
||||
"SAML": "SAML",
|
||||
"OAUTH": "OAuth",
|
||||
"JWT": "JWT",
|
||||
# AWS Services
|
||||
"S3": "S3",
|
||||
"EC2": "EC2",
|
||||
"RDS": "RDS",
|
||||
"EBS": "EBS",
|
||||
"IAM": "IAM",
|
||||
}
|
||||
|
||||
if "_" in title:
|
||||
# Handle snake_case
|
||||
title = title.replace("_", " ")
|
||||
words = title.split()
|
||||
else:
|
||||
# Handle camelCase/PascalCase with smart splitting
|
||||
# This regex splits on:
|
||||
# - Transition from lowercase to uppercase (camelCase)
|
||||
# - Transition from multiple uppercase to an uppercase followed by lowercase (SQLParameters -> SQL Parameters)
|
||||
words = re.findall(r"[A-Z]+(?=[A-Z][a-z]|\b)|[A-Z][a-z]+|[a-z]+|[0-9]+", title)
|
||||
|
||||
# Merge adjacent words if they form a known compound acronym (e.g., postgre + SQL = PostgreSQL)
|
||||
merged_words = []
|
||||
i = 0
|
||||
while i < len(words):
|
||||
if i < len(words) - 1:
|
||||
# Check if current word + next word form a known acronym
|
||||
combined = (words[i] + words[i + 1]).upper()
|
||||
if combined in ACRONYMS:
|
||||
merged_words.append(combined)
|
||||
i += 2
|
||||
continue
|
||||
merged_words.append(words[i])
|
||||
i += 1
|
||||
|
||||
# Capitalize each word, using proper casing for known acronyms
|
||||
result = []
|
||||
for word in merged_words:
|
||||
word_upper = word.upper()
|
||||
if word_upper in ACRONYMS:
|
||||
result.append(ACRONYMS[word_upper])
|
||||
else:
|
||||
result.append(word.capitalize())
|
||||
|
||||
return " ".join(result)
|
||||
|
|
@ -218,6 +218,11 @@ class ServiceDefinitionAdminForm(forms.ModelForm):
|
|||
)
|
||||
)
|
||||
|
||||
if field.get("type") == "choice" and field.get("choices"):
|
||||
self._validate_choice_field(
|
||||
field, mapping, spec_schema, "spec", errors
|
||||
)
|
||||
|
||||
if "name" not in included_mappings:
|
||||
raise forms.ValidationError(
|
||||
{
|
||||
|
|
@ -230,6 +235,62 @@ class ServiceDefinitionAdminForm(forms.ModelForm):
|
|||
if errors:
|
||||
raise forms.ValidationError({"form_config": errors})
|
||||
|
||||
def _validate_choice_field(self, field, mapping, spec_schema, prefix, errors):
|
||||
if not mapping:
|
||||
return
|
||||
|
||||
field_schema = self._get_field_schema(spec_schema, mapping, prefix)
|
||||
if not field_schema:
|
||||
return
|
||||
|
||||
control_plane_choices = field_schema.get("enum", [])
|
||||
if not control_plane_choices:
|
||||
return
|
||||
|
||||
custom_choices = field.get("choices", [])
|
||||
custom_choice_values = [choice[0] for choice in custom_choices]
|
||||
|
||||
invalid_choices = [
|
||||
value
|
||||
for value in custom_choice_values
|
||||
if value not in control_plane_choices
|
||||
]
|
||||
|
||||
if invalid_choices:
|
||||
field_name = field.get("label", mapping)
|
||||
errors.append(
|
||||
_(
|
||||
"Field '{}' has invalid choice values: {}. "
|
||||
"Valid choices from control plane are: {}"
|
||||
).format(
|
||||
field_name,
|
||||
", ".join(f"'{c}'" for c in invalid_choices),
|
||||
", ".join(f"'{c}'" for c in control_plane_choices),
|
||||
)
|
||||
)
|
||||
|
||||
def _get_field_schema(self, schema, field_path, prefix):
|
||||
if not field_path or not schema:
|
||||
return None
|
||||
|
||||
if field_path.startswith(prefix + "."):
|
||||
field_path = field_path[len(prefix) + 1 :]
|
||||
|
||||
parts = field_path.split(".")
|
||||
current_schema = schema
|
||||
|
||||
for part in parts:
|
||||
if not isinstance(current_schema, dict):
|
||||
return None
|
||||
|
||||
properties = current_schema.get("properties", {})
|
||||
if part not in properties:
|
||||
return None
|
||||
|
||||
current_schema = properties[part]
|
||||
|
||||
return current_schema
|
||||
|
||||
def _extract_field_paths(self, schema, prefix=""):
|
||||
paths = set()
|
||||
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ from django.db import migrations, models
|
|||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
("core", "0011_alter_organizationorigin_billing_entity"),
|
||||
("core", "0012_convert_user_info_to_array"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
|
|
|
|||
|
|
@ -48,21 +48,21 @@
|
|||
"description": "Dot-notation path mapping to Kubernetes spec field (e.g., 'spec.parameters.service.fqdn')"
|
||||
},
|
||||
"max_length": {
|
||||
"type": "integer",
|
||||
"type": ["integer", "null"],
|
||||
"description": "Maximum length for text/textarea fields",
|
||||
"minimum": 1
|
||||
},
|
||||
"rows": {
|
||||
"type": "integer",
|
||||
"type": ["integer", "null"],
|
||||
"description": "Number of rows for textarea fields",
|
||||
"minimum": 1
|
||||
},
|
||||
"min_value": {
|
||||
"type": "number",
|
||||
"type": ["number", "null"],
|
||||
"description": "Minimum value for number fields"
|
||||
},
|
||||
"max_value": {
|
||||
"type": "number",
|
||||
"type": ["number", "null"],
|
||||
"description": "Maximum value for number fields"
|
||||
},
|
||||
"choices": {
|
||||
|
|
@ -76,12 +76,12 @@
|
|||
}
|
||||
},
|
||||
"min_values": {
|
||||
"type": "integer",
|
||||
"type": ["integer", "null"],
|
||||
"description": "Minimum number of values for array fields",
|
||||
"minimum": 0
|
||||
},
|
||||
"max_values": {
|
||||
"type": "integer",
|
||||
"type": ["integer", "null"],
|
||||
"description": "Maximum number of values for array fields",
|
||||
"minimum": 1
|
||||
},
|
||||
|
|
@ -92,6 +92,10 @@
|
|||
"type": "string",
|
||||
"enum": ["email", "fqdn", "url", "ipv4", "ipv6"]
|
||||
}
|
||||
},
|
||||
"default_value": {
|
||||
"type": "string",
|
||||
"description": "Default value for the field when creating new instances"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -82,10 +82,11 @@
|
|||
</div>
|
||||
{% endif %}
|
||||
</div>
|
||||
{% if form and expert_form %}
|
||||
{% if expert_form %}
|
||||
<div id="expert-form-container"
|
||||
class="expert-crd-form"
|
||||
style="display:none">
|
||||
style="{% if form %}display:none{% endif %}">
|
||||
{% if expert_form and expert_form.context %}{{ expert_form.context }}{% endif %}
|
||||
<ul class="nav nav-tabs" id="expertTab" role="tablist">
|
||||
{% for fieldset in expert_form.get_fieldsets %}
|
||||
{% if not fieldset.hidden %}
|
||||
|
|
@ -143,6 +144,7 @@
|
|||
value="{% if form_submit_label %}{{ form_submit_label }}{% else %}{% translate "Save" %}{% endif %}" />
|
||||
</div>
|
||||
</form>
|
||||
<script defer src="{% static 'js/bootstrap-tabs.js' %}"></script>
|
||||
{% if form %}
|
||||
<script defer src="{% static 'js/expert-mode.js' %}"></script>
|
||||
{% endif %}
|
||||
|
|
|
|||
30
src/servala/static/js/bootstrap-tabs.js
vendored
Normal file
30
src/servala/static/js/bootstrap-tabs.js
vendored
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
// Bootstrap 5 automatically initializes tabs with data-bs-toggle="tab"
|
||||
// but we need to ensure they work after HTMX swaps
|
||||
(function() {
|
||||
'use strict';
|
||||
|
||||
const initBootstrapTabs = () => {
|
||||
const customTabList = document.querySelectorAll('#myTab button[data-bs-toggle="tab"]');
|
||||
customTabList.forEach(function(tabButton) {
|
||||
new bootstrap.Tab(tabButton);
|
||||
});
|
||||
|
||||
const expertTabList = document.querySelectorAll('#expertTab button[data-bs-toggle="tab"]');
|
||||
expertTabList.forEach(function(tabButton) {
|
||||
new bootstrap.Tab(tabButton);
|
||||
});
|
||||
}
|
||||
|
||||
if (document.readyState === 'loading') {
|
||||
document.addEventListener('DOMContentLoaded', initBootstrapTabs);
|
||||
} else {
|
||||
initBootstrapTabs();
|
||||
}
|
||||
|
||||
document.addEventListener('htmx:afterSwap', function(event) {
|
||||
if (event.detail.target.id === 'service-form' ||
|
||||
event.detail.target.classList.contains('crd-form')) {
|
||||
initBootstrapTabs();
|
||||
}
|
||||
});
|
||||
})();
|
||||
635
src/tests/test_form_config.py
Normal file
635
src/tests/test_form_config.py
Normal file
|
|
@ -0,0 +1,635 @@
|
|||
from unittest.mock import Mock
|
||||
|
||||
import jsonschema
|
||||
from django.core.validators import MaxValueValidator, MinValueValidator
|
||||
from django.db import models
|
||||
|
||||
from servala.core.crd import generate_custom_form_class
|
||||
from servala.core.forms import ServiceDefinitionAdminForm
|
||||
from servala.core.models import ControlPlaneCRD
|
||||
|
||||
|
||||
def test_custom_model_form_class_is_none_when_no_form_config():
|
||||
crd = Mock(spec=ControlPlaneCRD)
|
||||
service_def = Mock()
|
||||
service_def.form_config = None
|
||||
crd.service_definition = service_def
|
||||
crd.django_model = Mock()
|
||||
|
||||
if not (
|
||||
crd.django_model
|
||||
and crd.service_definition
|
||||
and crd.service_definition.form_config
|
||||
and crd.service_definition.form_config.get("fieldsets")
|
||||
):
|
||||
result = None
|
||||
else:
|
||||
result = generate_custom_form_class(
|
||||
crd.service_definition.form_config, crd.django_model
|
||||
)
|
||||
|
||||
assert result is None
|
||||
|
||||
|
||||
def test_custom_model_form_class_returns_class_when_form_config_exists():
|
||||
|
||||
crd = Mock(spec=ControlPlaneCRD)
|
||||
service_def = Mock()
|
||||
service_def.form_config = {
|
||||
"fieldsets": [
|
||||
{
|
||||
"title": "General",
|
||||
"fields": [
|
||||
{
|
||||
"type": "text",
|
||||
"label": "Name",
|
||||
"controlplane_field_mapping": "name",
|
||||
"required": True,
|
||||
}
|
||||
],
|
||||
}
|
||||
]
|
||||
}
|
||||
crd.service_definition = service_def
|
||||
|
||||
class TestModel(models.Model):
|
||||
name = models.CharField(max_length=100)
|
||||
|
||||
class Meta:
|
||||
app_label = "test"
|
||||
|
||||
crd.django_model = TestModel
|
||||
|
||||
if not (
|
||||
crd.django_model
|
||||
and crd.service_definition
|
||||
and crd.service_definition.form_config
|
||||
and crd.service_definition.form_config.get("fieldsets")
|
||||
):
|
||||
result = None
|
||||
else:
|
||||
result = generate_custom_form_class(
|
||||
crd.service_definition.form_config, crd.django_model
|
||||
)
|
||||
|
||||
assert result is not None
|
||||
assert hasattr(result, "form_config")
|
||||
|
||||
|
||||
def test_form_config_schema_validates_minimal_config():
|
||||
form = ServiceDefinitionAdminForm()
|
||||
schema = form.form_config_schema
|
||||
|
||||
minimal_config = {
|
||||
"fieldsets": [
|
||||
{
|
||||
"fields": [
|
||||
{
|
||||
"type": "text",
|
||||
"label": "Service Name",
|
||||
"controlplane_field_mapping": "spec.serviceName",
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
jsonschema.validate(instance=minimal_config, schema=schema)
|
||||
|
||||
|
||||
def test_form_config_schema_validates_config_with_null_integers():
|
||||
form = ServiceDefinitionAdminForm()
|
||||
schema = form.form_config_schema
|
||||
|
||||
config_with_nulls = {
|
||||
"fieldsets": [
|
||||
{
|
||||
"fields": [
|
||||
{
|
||||
"type": "text",
|
||||
"label": "Service Name",
|
||||
"controlplane_field_mapping": "spec.serviceName",
|
||||
"max_length": None,
|
||||
"required": False,
|
||||
},
|
||||
{
|
||||
"type": "textarea",
|
||||
"label": "Description",
|
||||
"controlplane_field_mapping": "spec.description",
|
||||
"rows": None,
|
||||
"max_length": None,
|
||||
},
|
||||
{
|
||||
"type": "number",
|
||||
"label": "Port",
|
||||
"controlplane_field_mapping": "spec.port",
|
||||
"min_value": None,
|
||||
"max_value": None,
|
||||
},
|
||||
{
|
||||
"type": "array",
|
||||
"label": "Tags",
|
||||
"controlplane_field_mapping": "spec.tags",
|
||||
"min_values": None,
|
||||
"max_values": None,
|
||||
},
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
jsonschema.validate(instance=config_with_nulls, schema=schema)
|
||||
|
||||
|
||||
def test_form_config_schema_validates_full_config():
|
||||
form = ServiceDefinitionAdminForm()
|
||||
schema = form.form_config_schema
|
||||
|
||||
full_config = {
|
||||
"fieldsets": [
|
||||
{
|
||||
"title": "Service Configuration",
|
||||
"fields": [
|
||||
{
|
||||
"type": "text",
|
||||
"label": "Service Name",
|
||||
"controlplane_field_mapping": "spec.serviceName",
|
||||
"help_text": "Enter a unique service name",
|
||||
"required": True,
|
||||
"max_length": 100,
|
||||
},
|
||||
{
|
||||
"type": "email",
|
||||
"label": "Admin Email",
|
||||
"controlplane_field_mapping": "spec.adminEmail",
|
||||
"help_text": "Contact email for service administrator",
|
||||
"required": True,
|
||||
"max_length": 255,
|
||||
},
|
||||
{
|
||||
"type": "textarea",
|
||||
"label": "Description",
|
||||
"controlplane_field_mapping": "spec.description",
|
||||
"help_text": "Describe the service purpose",
|
||||
"required": False,
|
||||
"rows": 5,
|
||||
"max_length": 500,
|
||||
},
|
||||
{
|
||||
"type": "number",
|
||||
"label": "Port",
|
||||
"controlplane_field_mapping": "spec.port",
|
||||
"help_text": "Service port number",
|
||||
"required": True,
|
||||
"min_value": 1,
|
||||
"max_value": 65535,
|
||||
},
|
||||
{
|
||||
"type": "choice",
|
||||
"label": "Environment",
|
||||
"controlplane_field_mapping": "spec.environment",
|
||||
"help_text": "Deployment environment",
|
||||
"required": True,
|
||||
"choices": [
|
||||
["dev", "Development"],
|
||||
["staging", "Staging"],
|
||||
["prod", "Production"],
|
||||
],
|
||||
},
|
||||
{
|
||||
"type": "checkbox",
|
||||
"label": "Enable Monitoring",
|
||||
"controlplane_field_mapping": "spec.monitoring.enabled",
|
||||
"help_text": "Enable service monitoring",
|
||||
"required": False,
|
||||
},
|
||||
{
|
||||
"type": "array",
|
||||
"label": "Tags",
|
||||
"controlplane_field_mapping": "spec.tags",
|
||||
"help_text": "Service tags for organization",
|
||||
"required": False,
|
||||
"min_values": 0,
|
||||
"max_values": 10,
|
||||
},
|
||||
],
|
||||
}
|
||||
]
|
||||
}
|
||||
jsonschema.validate(instance=full_config, schema=schema)
|
||||
|
||||
|
||||
def test_choice_field_uses_custom_choices_from_form_config():
|
||||
"""Test that choice fields use custom choices when provided in form_config"""
|
||||
|
||||
class TestModel(models.Model):
|
||||
name = models.CharField(max_length=100)
|
||||
environment = models.CharField(
|
||||
max_length=20,
|
||||
choices=[
|
||||
("dev", "Development"),
|
||||
("staging", "Staging"),
|
||||
("prod", "Production"),
|
||||
("test", "Testing"),
|
||||
],
|
||||
)
|
||||
|
||||
class Meta:
|
||||
app_label = "test"
|
||||
|
||||
form_config = {
|
||||
"fieldsets": [
|
||||
{
|
||||
"title": "General",
|
||||
"fields": [
|
||||
{
|
||||
"type": "text",
|
||||
"label": "Name",
|
||||
"controlplane_field_mapping": "name",
|
||||
"required": True,
|
||||
},
|
||||
{
|
||||
"type": "choice",
|
||||
"label": "Environment",
|
||||
"controlplane_field_mapping": "environment",
|
||||
"required": True,
|
||||
"choices": [["dev", "Development"], ["prod", "Production"]],
|
||||
},
|
||||
],
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
form_class = generate_custom_form_class(form_config, TestModel)
|
||||
form = form_class()
|
||||
|
||||
environment_field = form.fields["environment"]
|
||||
assert list(environment_field.choices) == [
|
||||
("dev", "Development"),
|
||||
("prod", "Production"),
|
||||
]
|
||||
|
||||
assert hasattr(environment_field, "_controlplane_choices")
|
||||
assert len(environment_field._controlplane_choices) == 5 # 4 choices + empty choice
|
||||
|
||||
|
||||
def test_choice_field_uses_control_plane_choices_when_no_custom_choices():
|
||||
|
||||
class TestModel(models.Model):
|
||||
name = models.CharField(max_length=100)
|
||||
environment = models.CharField(
|
||||
max_length=20,
|
||||
choices=[
|
||||
("dev", "Development"),
|
||||
("staging", "Staging"),
|
||||
("prod", "Production"),
|
||||
],
|
||||
)
|
||||
|
||||
class Meta:
|
||||
app_label = "test"
|
||||
|
||||
form_config = {
|
||||
"fieldsets": [
|
||||
{
|
||||
"title": "General",
|
||||
"fields": [
|
||||
{
|
||||
"type": "text",
|
||||
"label": "Name",
|
||||
"controlplane_field_mapping": "name",
|
||||
"required": True,
|
||||
},
|
||||
{
|
||||
"type": "choice",
|
||||
"label": "Environment",
|
||||
"controlplane_field_mapping": "environment",
|
||||
"required": True,
|
||||
},
|
||||
],
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
form_class = generate_custom_form_class(form_config, TestModel)
|
||||
form = form_class()
|
||||
|
||||
environment_field = form.fields["environment"]
|
||||
choices_list = list(environment_field.choices)
|
||||
assert len(choices_list) == 4 # 3 choices + empty choice
|
||||
assert ("dev", "Development") in choices_list
|
||||
|
||||
|
||||
def test_choice_field_validates_against_control_plane_choices():
|
||||
class TestModel(models.Model):
|
||||
name = models.CharField(max_length=100)
|
||||
environment = models.CharField(
|
||||
max_length=20,
|
||||
choices=[
|
||||
("dev", "Development"),
|
||||
("staging", "Staging"),
|
||||
("prod", "Production"),
|
||||
],
|
||||
)
|
||||
|
||||
class Meta:
|
||||
app_label = "test"
|
||||
|
||||
form_config = {
|
||||
"fieldsets": [
|
||||
{
|
||||
"title": "General",
|
||||
"fields": [
|
||||
{
|
||||
"type": "text",
|
||||
"label": "Name",
|
||||
"controlplane_field_mapping": "name",
|
||||
"required": True,
|
||||
},
|
||||
{
|
||||
"type": "choice",
|
||||
"label": "Environment",
|
||||
"controlplane_field_mapping": "environment",
|
||||
"required": True,
|
||||
"choices": [["dev", "Development"], ["prod", "Production"]],
|
||||
},
|
||||
],
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
form_class = generate_custom_form_class(form_config, TestModel)
|
||||
|
||||
form = form_class(data={"name": "test-service", "environment": "dev"})
|
||||
form.fields["context"].required = False # Skip context validation
|
||||
assert form.is_valid(), f"Form should be valid but has errors: {form.errors}"
|
||||
|
||||
form = form_class(data={"name": "test-service", "environment": "prod"})
|
||||
form.fields["context"].required = False # Skip context validation
|
||||
assert form.is_valid(), f"Form should be valid but has errors: {form.errors}"
|
||||
|
||||
form = form_class(data={"name": "test-service", "environment": "invalid"})
|
||||
form.fields["context"].required = False # Skip context validation
|
||||
assert not form.is_valid()
|
||||
assert "environment" in form.errors
|
||||
|
||||
|
||||
def test_admin_form_validates_choice_values_against_schema():
|
||||
|
||||
form = ServiceDefinitionAdminForm()
|
||||
mock_crd = Mock()
|
||||
mock_crd.resource_schema = {
|
||||
"properties": {
|
||||
"spec": {
|
||||
"properties": {
|
||||
"environment": {
|
||||
"type": "string",
|
||||
"enum": ["dev", "staging", "prod"],
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
valid_form_config = {
|
||||
"fieldsets": [
|
||||
{
|
||||
"fields": [
|
||||
{
|
||||
"type": "text",
|
||||
"label": "Name",
|
||||
"controlplane_field_mapping": "name",
|
||||
},
|
||||
{
|
||||
"type": "choice",
|
||||
"label": "Environment",
|
||||
"controlplane_field_mapping": "spec.environment",
|
||||
"choices": [["dev", "Development"], ["prod", "Production"]],
|
||||
},
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
spec_schema = mock_crd.resource_schema["properties"]["spec"]
|
||||
errors = []
|
||||
|
||||
for field in valid_form_config["fieldsets"][0]["fields"]:
|
||||
if field.get("type") == "choice":
|
||||
form._validate_choice_field(
|
||||
field, field["controlplane_field_mapping"], spec_schema, "spec", errors
|
||||
)
|
||||
|
||||
assert len(errors) == 0, f"Expected no errors but got: {errors}"
|
||||
|
||||
invalid_form_config = {
|
||||
"fieldsets": [
|
||||
{
|
||||
"fields": [
|
||||
{
|
||||
"type": "text",
|
||||
"label": "Name",
|
||||
"controlplane_field_mapping": "name",
|
||||
},
|
||||
{
|
||||
"type": "choice",
|
||||
"label": "Environment",
|
||||
"controlplane_field_mapping": "spec.environment",
|
||||
"choices": [
|
||||
["dev", "Development"],
|
||||
["invalid", "Invalid Environment"],
|
||||
],
|
||||
},
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
errors = []
|
||||
|
||||
for field in invalid_form_config["fieldsets"][0]["fields"]:
|
||||
if field.get("type") == "choice":
|
||||
form._validate_choice_field(
|
||||
field, field["controlplane_field_mapping"], spec_schema, "spec", errors
|
||||
)
|
||||
|
||||
assert len(errors) > 0, "Expected validation errors but got none"
|
||||
error_message = str(errors[0])
|
||||
assert "invalid" in error_message.lower()
|
||||
assert "Environment" in error_message
|
||||
|
||||
|
||||
def test_number_field_min_max_sets_widget_attributes():
|
||||
class TestModel(models.Model):
|
||||
name = models.CharField(max_length=100)
|
||||
port = models.IntegerField()
|
||||
replica_count = models.IntegerField()
|
||||
|
||||
class Meta:
|
||||
app_label = "test"
|
||||
|
||||
form_config = {
|
||||
"fieldsets": [
|
||||
{
|
||||
"title": "General",
|
||||
"fields": [
|
||||
{
|
||||
"type": "text",
|
||||
"label": "Name",
|
||||
"controlplane_field_mapping": "name",
|
||||
"required": True,
|
||||
},
|
||||
{
|
||||
"type": "number",
|
||||
"label": "Port",
|
||||
"controlplane_field_mapping": "port",
|
||||
"required": True,
|
||||
"min_value": 1,
|
||||
"max_value": 65535,
|
||||
},
|
||||
{
|
||||
"type": "number",
|
||||
"label": "Replicas",
|
||||
"controlplane_field_mapping": "replica_count",
|
||||
"required": True,
|
||||
"min_value": 1,
|
||||
"max_value": 10,
|
||||
},
|
||||
],
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
form_class = generate_custom_form_class(form_config, TestModel)
|
||||
form = form_class()
|
||||
|
||||
port_field = form.fields["port"]
|
||||
assert port_field.widget.attrs.get("min") == 1
|
||||
assert port_field.widget.attrs.get("max") == 65535
|
||||
|
||||
replica_field = form.fields["replica_count"]
|
||||
assert replica_field.widget.attrs.get("min") == 1
|
||||
assert replica_field.widget.attrs.get("max") == 10
|
||||
|
||||
port_validators = port_field.validators
|
||||
assert any(
|
||||
isinstance(v, MinValueValidator) and v.limit_value == 1 for v in port_validators
|
||||
)
|
||||
assert any(
|
||||
isinstance(v, MaxValueValidator) and v.limit_value == 65535
|
||||
for v in port_validators
|
||||
)
|
||||
|
||||
|
||||
def test_default_value_for_all_field_types():
|
||||
|
||||
class TestModel(models.Model):
|
||||
name = models.CharField(max_length=100)
|
||||
description = models.TextField()
|
||||
port = models.IntegerField()
|
||||
environment = models.CharField(
|
||||
max_length=20,
|
||||
choices=[
|
||||
("dev", "Development"),
|
||||
("staging", "Staging"),
|
||||
("prod", "Production"),
|
||||
],
|
||||
)
|
||||
monitoring_enabled = models.BooleanField()
|
||||
tags = models.JSONField()
|
||||
|
||||
class Meta:
|
||||
app_label = "test"
|
||||
|
||||
form_config = {
|
||||
"fieldsets": [
|
||||
{
|
||||
"fields": [
|
||||
{
|
||||
"type": "text",
|
||||
"label": "Name",
|
||||
"controlplane_field_mapping": "name",
|
||||
"default_value": "default-name",
|
||||
},
|
||||
{
|
||||
"type": "textarea",
|
||||
"label": "Description",
|
||||
"controlplane_field_mapping": "description",
|
||||
"default_value": "Default description text",
|
||||
},
|
||||
{
|
||||
"type": "number",
|
||||
"label": "Port",
|
||||
"controlplane_field_mapping": "port",
|
||||
"default_value": "8080",
|
||||
},
|
||||
{
|
||||
"type": "choice",
|
||||
"label": "Environment",
|
||||
"controlplane_field_mapping": "environment",
|
||||
"default_value": "dev",
|
||||
},
|
||||
{
|
||||
"type": "checkbox",
|
||||
"label": "Enable Monitoring",
|
||||
"controlplane_field_mapping": "monitoring_enabled",
|
||||
"default_value": "true",
|
||||
},
|
||||
{
|
||||
"type": "array",
|
||||
"label": "Tags",
|
||||
"controlplane_field_mapping": "tags",
|
||||
"default_value": "tag1,tag2,tag3",
|
||||
},
|
||||
],
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
form_class = generate_custom_form_class(form_config, TestModel)
|
||||
form = form_class()
|
||||
|
||||
assert form.fields["name"].initial == "default-name"
|
||||
assert form.fields["description"].initial == "Default description text"
|
||||
assert form.fields["port"].initial == "8080"
|
||||
assert form.fields["environment"].initial == "dev"
|
||||
assert form.fields["monitoring_enabled"].initial == "true"
|
||||
assert form.fields["tags"].initial == "tag1,tag2,tag3"
|
||||
|
||||
|
||||
def test_default_value_not_override_existing_instance():
|
||||
|
||||
class TestModel(models.Model):
|
||||
name = models.CharField(max_length=100)
|
||||
port = models.IntegerField()
|
||||
|
||||
class Meta:
|
||||
app_label = "test"
|
||||
|
||||
form_config = {
|
||||
"fieldsets": [
|
||||
{
|
||||
"fields": [
|
||||
{
|
||||
"type": "text",
|
||||
"label": "Name",
|
||||
"controlplane_field_mapping": "name",
|
||||
"default_value": "default-name",
|
||||
},
|
||||
{
|
||||
"type": "number",
|
||||
"label": "Port",
|
||||
"controlplane_field_mapping": "port",
|
||||
"default_value": "8080",
|
||||
},
|
||||
],
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
instance = TestModel(name="existing-name", port=3000)
|
||||
form_class = generate_custom_form_class(form_config, TestModel)
|
||||
form = form_class(instance=instance)
|
||||
|
||||
assert form.initial["name"] == "existing-name"
|
||||
assert form.initial["port"] == 3000
|
||||
Loading…
Add table
Add a link
Reference in a new issue