Compare commits

..

No commits in common. "ece60ad3b198f31e43437507aae4298b8937b9b8" and "ca485978b93b8997bdbe774127503dcdc1765e9f" have entirely different histories.

10 changed files with 293 additions and 1094 deletions

View file

@ -1,18 +1,291 @@
from django import forms
from django.core.validators import MaxValueValidator, MinValueValidator
from django.forms.models import ModelForm, ModelFormMetaclass
import re
from servala.core.crd.utils import deslugify
from servala.core.models import ControlPlaneCRD
from servala.frontend.forms.widgets import DynamicArrayWidget
from django import forms
from django.core.validators import MaxValueValidator, MinValueValidator, RegexValidator
from django.db import models
from django.forms.models import ModelForm, ModelFormMetaclass
from django.utils.translation import gettext_lazy as _
from servala.core.models import ControlPlaneCRD, ServiceInstance
from servala.frontend.forms.widgets import DynamicArrayField, DynamicArrayWidget
class CRDModel(models.Model):
"""Base class for all virtual CRD models"""
def __init__(self, **kwargs):
if spec := kwargs.pop("spec", None):
kwargs.update(unnest_data({"spec": spec}))
super().__init__(**kwargs)
class Meta:
abstract = True
def duplicate_field(field_name, model):
field = model._meta.get_field(field_name)
new_field = type(field).__new__(type(field))
new_field.__dict__.update(field.__dict__)
new_field.model = None
new_field.auto_created = False
return new_field
def generate_django_model(schema, group, version, kind):
"""
Generates a virtual Django model from a Kubernetes CRD's OpenAPI v3 schema.
"""
# We always need these three fields to know our own name and our full namespace
model_fields = {"__module__": "crd_models"}
for field_name in ("name", "context"):
model_fields[field_name] = duplicate_field(field_name, ServiceInstance)
# All other fields are generated from the schema, except for the
# resourceRef object
spec = schema["properties"].get("spec") or {}
spec["properties"].pop("resourceRef", None)
model_fields.update(build_object_fields(spec, "spec", parent_required=False))
# Store the original schema on the model class
model_fields["SCHEMA"] = schema
meta_class = type("Meta", (), {"app_label": "crd_models"})
model_fields["Meta"] = meta_class
# create the model class
model_name = kind
model_class = type(model_name, (CRDModel,), model_fields)
return model_class
def build_object_fields(schema, name, verbose_name_prefix=None, parent_required=False):
required_fields = schema.get("required") or []
properties = schema.get("properties") or {}
fields = {}
for field_name, field_schema in properties.items():
is_required = field_name in required_fields or parent_required
full_name = f"{name}.{field_name}"
result = get_django_field(
field_schema,
is_required,
field_name,
full_name,
verbose_name_prefix=verbose_name_prefix,
)
if isinstance(result, dict):
fields.update(result)
else:
fields[full_name] = result
return fields
def deslugify(title):
"""
Convert camelCase, PascalCase, or snake_case to human-readable title.
Handles known acronyms (e.g., postgreSQLParameters -> PostgreSQL Parameters).
"""
ACRONYMS = {
# Database systems
"SQL": "SQL",
"MYSQL": "MySQL",
"POSTGRESQL": "PostgreSQL",
"MARIADB": "MariaDB",
"MSSQL": "MSSQL",
"MONGODB": "MongoDB",
"REDIS": "Redis",
# Protocols
"HTTP": "HTTP",
"HTTPS": "HTTPS",
"FTP": "FTP",
"SFTP": "SFTP",
"SSH": "SSH",
"TLS": "TLS",
"SSL": "SSL",
# APIs
"API": "API",
"REST": "REST",
"GRPC": "gRPC",
"GRAPHQL": "GraphQL",
# Networking
"URL": "URL",
"URI": "URI",
"FQDN": "FQDN",
"DNS": "DNS",
"IP": "IP",
"TCP": "TCP",
"UDP": "UDP",
# Data formats
"JSON": "JSON",
"XML": "XML",
"YAML": "YAML",
"CSV": "CSV",
"HTML": "HTML",
"CSS": "CSS",
# Hardware
"CPU": "CPU",
"RAM": "RAM",
"GPU": "GPU",
"SSD": "SSD",
"HDD": "HDD",
# Identifiers
"ID": "ID",
"UUID": "UUID",
"GUID": "GUID",
"ARN": "ARN",
# Cloud providers
"AWS": "AWS",
"GCP": "GCP",
"AZURE": "Azure",
"IBM": "IBM",
# Kubernetes/Cloud
"DB": "DB",
"PVC": "PVC",
"PV": "PV",
"VPN": "VPN",
# Auth
"OS": "OS",
"LDAP": "LDAP",
"SAML": "SAML",
"OAUTH": "OAuth",
"JWT": "JWT",
# AWS Services
"S3": "S3",
"EC2": "EC2",
"RDS": "RDS",
"EBS": "EBS",
"IAM": "IAM",
}
if "_" in title:
# Handle snake_case
title = title.replace("_", " ")
words = title.split()
else:
# Handle camelCase/PascalCase with smart splitting
# This regex splits on:
# - Transition from lowercase to uppercase (camelCase)
# - Transition from multiple uppercase to an uppercase followed by lowercase (SQLParameters -> SQL Parameters)
words = re.findall(r"[A-Z]+(?=[A-Z][a-z]|\b)|[A-Z][a-z]+|[a-z]+|[0-9]+", title)
# Merge adjacent words if they form a known compound acronym (e.g., postgre + SQL = PostgreSQL)
merged_words = []
i = 0
while i < len(words):
if i < len(words) - 1:
# Check if current word + next word form a known acronym
combined = (words[i] + words[i + 1]).upper()
if combined in ACRONYMS:
merged_words.append(combined)
i += 2
continue
merged_words.append(words[i])
i += 1
# Capitalize each word, using proper casing for known acronyms
result = []
for word in merged_words:
word_upper = word.upper()
if word_upper in ACRONYMS:
result.append(ACRONYMS[word_upper])
else:
result.append(word.capitalize())
return " ".join(result)
def get_django_field(
field_schema, is_required, field_name, full_name, verbose_name_prefix=None
):
field_type = field_schema.get("type") or "string"
format = field_schema.get("format")
verbose_name_prefix = verbose_name_prefix or ""
verbose_name = f"{verbose_name_prefix} {deslugify(field_name)}".strip()
# Pass down the requirement status from parent to child fields
kwargs = {
"blank": not is_required, # All fields are optional by default
"null": not is_required,
"help_text": field_schema.get("description"),
"validators": [],
"verbose_name": verbose_name,
"default": field_schema.get("default"),
}
if minimum := field_schema.get("minimum"):
kwargs["validators"].append(MinValueValidator(minimum))
if maximum := field_schema.get("maximum"):
kwargs["validators"].append(MaxValueValidator(maximum))
if field_type == "string":
if format == "date-time":
return models.DateTimeField(**kwargs)
elif format == "date":
return models.DateField(**kwargs)
else:
max_length = field_schema.get("max_length") or 255
if pattern := field_schema.get("pattern"):
kwargs["validators"].append(RegexValidator(regex=pattern))
if choices := field_schema.get("enum"):
kwargs["choices"] = ((choice, choice) for choice in choices)
return models.CharField(max_length=max_length, **kwargs)
elif field_type == "integer":
return models.IntegerField(**kwargs)
elif field_type == "number":
return models.FloatField(**kwargs)
elif field_type == "boolean":
return models.BooleanField(**kwargs)
elif field_type == "object":
# Here we pass down the requirement status to nested objects
return build_object_fields(
field_schema,
full_name,
verbose_name_prefix=f"{verbose_name}:",
parent_required=is_required,
)
elif field_type == "array":
kwargs["help_text"] = field_schema.get("description") or _("List of values")
field = models.JSONField(**kwargs)
formfield_kwargs = {
"label": field.verbose_name,
"required": not field.blank,
}
array_validation = {}
if min_items := field_schema.get("min_items"):
array_validation["min_items"] = min_items
if max_items := field_schema.get("max_items"):
array_validation["max_items"] = max_items
if unique_items := field_schema.get("unique_items"):
array_validation["unique_items"] = unique_items
if items_schema := field_schema.get("items"):
array_validation["items_schema"] = items_schema
if array_validation:
formfield_kwargs["array_validation"] = array_validation
field.formfield = lambda: DynamicArrayField(**formfield_kwargs)
return field
return models.CharField(max_length=255, **kwargs)
def unnest_data(data):
result = {}
def _flatten_dict(d, parent_key=""):
for key, value in d.items():
new_key = f"{parent_key}.{key}" if parent_key else key
if isinstance(value, dict):
_flatten_dict(value, new_key)
else:
result[new_key] = value
_flatten_dict(data)
return result
class FormGeneratorMixin:
"""Shared base class for ModelForm classes based on our generated CRD models.
There are two relevant child classes:
- CrdModelFormMixin: For fully auto-generated forms from the spec
- CustomFormMixin: For forms built from form_config settings.
"""
IS_CUSTOM_FORM = False
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@ -297,11 +570,6 @@ class CustomFormMixin(FormGeneratorMixin):
)
elif field_type == "array":
field.widget = DynamicArrayWidget()
elif field_type == "choice":
if hasattr(field, "choices") and field.choices:
field._controlplane_choices = list(field.choices)
if custom_choices := field_config.get("choices"):
field.choices = [tuple(choice) for choice in custom_choices]
if field_type == "number":
min_val = field_config.get("min_value")
@ -310,17 +578,12 @@ class CustomFormMixin(FormGeneratorMixin):
validators = []
if min_val is not None:
validators.append(MinValueValidator(min_val))
field.widget.attrs["min"] = min_val
if max_val is not None:
validators.append(MaxValueValidator(max_val))
field.widget.attrs["max"] = max_val
if validators:
field.validators.extend(validators)
if "default_value" in field_config and field.initial is None:
field.initial = field_config["default_value"]
field.controlplane_field_mapping = field_name
def get_fieldsets(self):
@ -340,25 +603,6 @@ class CustomFormMixin(FormGeneratorMixin):
return fieldsets
def clean(self):
cleaned_data = super().clean()
for field_name, field in self.fields.items():
if hasattr(field, "_controlplane_choices"):
value = cleaned_data.get(field_name)
if value:
valid_values = [choice[0] for choice in field._controlplane_choices]
if value not in valid_values:
self.add_error(
field_name,
forms.ValidationError(
f"'{value}' is not a valid choice. "
f"Must be one of: {valid_values.join(', ')}"
),
)
return cleaned_data
def get_nested_data(self):
nested = {}
for field_name in self.fields.keys():

View file

@ -1,31 +0,0 @@
from servala.core.crd.forms import (
CrdModelFormMixin,
CustomFormMixin,
FormGeneratorMixin,
generate_custom_form_class,
generate_model_form_class,
)
from servala.core.crd.models import (
CRDModel,
build_object_fields,
duplicate_field,
generate_django_model,
get_django_field,
unnest_data,
)
from servala.core.crd.utils import deslugify
__all__ = [
"CrdModelFormMixin",
"CustomFormMixin",
"FormGeneratorMixin",
"generate_django_model",
"generate_model_form_class",
"generate_custom_form_class",
"CRDModel",
"build_object_fields",
"duplicate_field",
"get_django_field",
"unnest_data",
"deslugify",
]

View file

@ -1,167 +0,0 @@
from django.core.validators import MaxValueValidator, MinValueValidator, RegexValidator
from django.db import models
from django.utils.translation import gettext_lazy as _
from servala.core.crd.utils import deslugify
from servala.core.models import ServiceInstance
from servala.frontend.forms.widgets import DynamicArrayField
class CRDModel(models.Model):
"""Base class for all virtual CRD models"""
def __init__(self, **kwargs):
if spec := kwargs.pop("spec", None):
kwargs.update(unnest_data({"spec": spec}))
super().__init__(**kwargs)
class Meta:
abstract = True
def generate_django_model(schema, group, version, kind):
"""
Generates a virtual Django model from a Kubernetes CRD's OpenAPI v3 schema.
"""
# We always need these three fields to know our own name and our full namespace
model_fields = {"__module__": "crd_models"}
for field_name in ("name", "context"):
model_fields[field_name] = duplicate_field(field_name, ServiceInstance)
# All other fields are generated from the schema, except for the
# resourceRef object
spec = schema["properties"].get("spec") or {}
spec["properties"].pop("resourceRef", None)
model_fields.update(build_object_fields(spec, "spec", parent_required=False))
# Store the original schema on the model class
model_fields["SCHEMA"] = schema
meta_class = type("Meta", (), {"app_label": "crd_models"})
model_fields["Meta"] = meta_class
# create the model class
model_name = kind
model_class = type(model_name, (CRDModel,), model_fields)
return model_class
def duplicate_field(field_name, model):
field = model._meta.get_field(field_name)
new_field = type(field).__new__(type(field))
new_field.__dict__.update(field.__dict__)
new_field.model = None
new_field.auto_created = False
return new_field
def build_object_fields(schema, name, verbose_name_prefix=None, parent_required=False):
required_fields = schema.get("required") or []
properties = schema.get("properties") or {}
fields = {}
for field_name, field_schema in properties.items():
is_required = field_name in required_fields or parent_required
full_name = f"{name}.{field_name}"
result = get_django_field(
field_schema,
is_required,
field_name,
full_name,
verbose_name_prefix=verbose_name_prefix,
)
if isinstance(result, dict):
fields.update(result)
else:
fields[full_name] = result
return fields
def get_django_field(
field_schema, is_required, field_name, full_name, verbose_name_prefix=None
):
field_type = field_schema.get("type") or "string"
format = field_schema.get("format")
verbose_name_prefix = verbose_name_prefix or ""
verbose_name = f"{verbose_name_prefix} {deslugify(field_name)}".strip()
# Pass down the requirement status from parent to child fields
kwargs = {
"blank": not is_required, # All fields are optional by default
"null": not is_required,
"help_text": field_schema.get("description"),
"validators": [],
"verbose_name": verbose_name,
"default": field_schema.get("default"),
}
if minimum := field_schema.get("minimum"):
kwargs["validators"].append(MinValueValidator(minimum))
if maximum := field_schema.get("maximum"):
kwargs["validators"].append(MaxValueValidator(maximum))
if field_type == "string":
if format == "date-time":
return models.DateTimeField(**kwargs)
elif format == "date":
return models.DateField(**kwargs)
else:
max_length = field_schema.get("max_length") or 255
if pattern := field_schema.get("pattern"):
kwargs["validators"].append(RegexValidator(regex=pattern))
if choices := field_schema.get("enum"):
kwargs["choices"] = ((choice, choice) for choice in choices)
return models.CharField(max_length=max_length, **kwargs)
elif field_type == "integer":
return models.IntegerField(**kwargs)
elif field_type == "number":
return models.FloatField(**kwargs)
elif field_type == "boolean":
return models.BooleanField(**kwargs)
elif field_type == "object":
# Here we pass down the requirement status to nested objects
return build_object_fields(
field_schema,
full_name,
verbose_name_prefix=f"{verbose_name}:",
parent_required=is_required,
)
elif field_type == "array":
kwargs["help_text"] = field_schema.get("description") or _("List of values")
field = models.JSONField(**kwargs)
formfield_kwargs = {
"label": field.verbose_name,
"required": not field.blank,
}
array_validation = {}
if min_items := field_schema.get("min_items"):
array_validation["min_items"] = min_items
if max_items := field_schema.get("max_items"):
array_validation["max_items"] = max_items
if unique_items := field_schema.get("unique_items"):
array_validation["unique_items"] = unique_items
if items_schema := field_schema.get("items"):
array_validation["items_schema"] = items_schema
if array_validation:
formfield_kwargs["array_validation"] = array_validation
field.formfield = lambda: DynamicArrayField(**formfield_kwargs)
return field
return models.CharField(max_length=255, **kwargs)
def unnest_data(data):
result = {}
def _flatten_dict(d, parent_key=""):
for key, value in d.items():
new_key = f"{parent_key}.{key}" if parent_key else key
if isinstance(value, dict):
_flatten_dict(value, new_key)
else:
result[new_key] = value
_flatten_dict(data)
return result

View file

@ -1,115 +0,0 @@
import re
def deslugify(title):
"""
Convert camelCase, PascalCase, or snake_case to human-readable title.
Handles known acronyms (e.g., postgreSQLParameters -> PostgreSQL Parameters).
"""
ACRONYMS = {
# Database systems
"SQL": "SQL",
"MYSQL": "MySQL",
"POSTGRESQL": "PostgreSQL",
"MARIADB": "MariaDB",
"MSSQL": "MSSQL",
"MONGODB": "MongoDB",
"REDIS": "Redis",
# Protocols
"HTTP": "HTTP",
"HTTPS": "HTTPS",
"FTP": "FTP",
"SFTP": "SFTP",
"SSH": "SSH",
"TLS": "TLS",
"SSL": "SSL",
# APIs
"API": "API",
"REST": "REST",
"GRPC": "gRPC",
"GRAPHQL": "GraphQL",
# Networking
"URL": "URL",
"URI": "URI",
"FQDN": "FQDN",
"DNS": "DNS",
"IP": "IP",
"TCP": "TCP",
"UDP": "UDP",
# Data formats
"JSON": "JSON",
"XML": "XML",
"YAML": "YAML",
"CSV": "CSV",
"HTML": "HTML",
"CSS": "CSS",
# Hardware
"CPU": "CPU",
"RAM": "RAM",
"GPU": "GPU",
"SSD": "SSD",
"HDD": "HDD",
# Identifiers
"ID": "ID",
"UUID": "UUID",
"GUID": "GUID",
"ARN": "ARN",
# Cloud providers
"AWS": "AWS",
"GCP": "GCP",
"AZURE": "Azure",
"IBM": "IBM",
# Kubernetes/Cloud
"DB": "DB",
"PVC": "PVC",
"PV": "PV",
"VPN": "VPN",
# Auth
"OS": "OS",
"LDAP": "LDAP",
"SAML": "SAML",
"OAUTH": "OAuth",
"JWT": "JWT",
# AWS Services
"S3": "S3",
"EC2": "EC2",
"RDS": "RDS",
"EBS": "EBS",
"IAM": "IAM",
}
if "_" in title:
# Handle snake_case
title = title.replace("_", " ")
words = title.split()
else:
# Handle camelCase/PascalCase with smart splitting
# This regex splits on:
# - Transition from lowercase to uppercase (camelCase)
# - Transition from multiple uppercase to an uppercase followed by lowercase (SQLParameters -> SQL Parameters)
words = re.findall(r"[A-Z]+(?=[A-Z][a-z]|\b)|[A-Z][a-z]+|[a-z]+|[0-9]+", title)
# Merge adjacent words if they form a known compound acronym (e.g., postgre + SQL = PostgreSQL)
merged_words = []
i = 0
while i < len(words):
if i < len(words) - 1:
# Check if current word + next word form a known acronym
combined = (words[i] + words[i + 1]).upper()
if combined in ACRONYMS:
merged_words.append(combined)
i += 2
continue
merged_words.append(words[i])
i += 1
# Capitalize each word, using proper casing for known acronyms
result = []
for word in merged_words:
word_upper = word.upper()
if word_upper in ACRONYMS:
result.append(ACRONYMS[word_upper])
else:
result.append(word.capitalize())
return " ".join(result)

View file

@ -218,11 +218,6 @@ class ServiceDefinitionAdminForm(forms.ModelForm):
)
)
if field.get("type") == "choice" and field.get("choices"):
self._validate_choice_field(
field, mapping, spec_schema, "spec", errors
)
if "name" not in included_mappings:
raise forms.ValidationError(
{
@ -235,62 +230,6 @@ class ServiceDefinitionAdminForm(forms.ModelForm):
if errors:
raise forms.ValidationError({"form_config": errors})
def _validate_choice_field(self, field, mapping, spec_schema, prefix, errors):
if not mapping:
return
field_schema = self._get_field_schema(spec_schema, mapping, prefix)
if not field_schema:
return
control_plane_choices = field_schema.get("enum", [])
if not control_plane_choices:
return
custom_choices = field.get("choices", [])
custom_choice_values = [choice[0] for choice in custom_choices]
invalid_choices = [
value
for value in custom_choice_values
if value not in control_plane_choices
]
if invalid_choices:
field_name = field.get("label", mapping)
errors.append(
_(
"Field '{}' has invalid choice values: {}. "
"Valid choices from control plane are: {}"
).format(
field_name,
", ".join(f"'{c}'" for c in invalid_choices),
", ".join(f"'{c}'" for c in control_plane_choices),
)
)
def _get_field_schema(self, schema, field_path, prefix):
if not field_path or not schema:
return None
if field_path.startswith(prefix + "."):
field_path = field_path[len(prefix) + 1 :]
parts = field_path.split(".")
current_schema = schema
for part in parts:
if not isinstance(current_schema, dict):
return None
properties = current_schema.get("properties", {})
if part not in properties:
return None
current_schema = properties[part]
return current_schema
def _extract_field_paths(self, schema, prefix=""):
paths = set()

View file

@ -7,7 +7,7 @@ from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("core", "0012_convert_user_info_to_array"),
("core", "0011_alter_organizationorigin_billing_entity"),
]
operations = [

View file

@ -48,21 +48,21 @@
"description": "Dot-notation path mapping to Kubernetes spec field (e.g., 'spec.parameters.service.fqdn')"
},
"max_length": {
"type": ["integer", "null"],
"type": "integer",
"description": "Maximum length for text/textarea fields",
"minimum": 1
},
"rows": {
"type": ["integer", "null"],
"type": "integer",
"description": "Number of rows for textarea fields",
"minimum": 1
},
"min_value": {
"type": ["number", "null"],
"type": "number",
"description": "Minimum value for number fields"
},
"max_value": {
"type": ["number", "null"],
"type": "number",
"description": "Maximum value for number fields"
},
"choices": {
@ -76,12 +76,12 @@
}
},
"min_values": {
"type": ["integer", "null"],
"type": "integer",
"description": "Minimum number of values for array fields",
"minimum": 0
},
"max_values": {
"type": ["integer", "null"],
"type": "integer",
"description": "Maximum number of values for array fields",
"minimum": 1
},
@ -92,10 +92,6 @@
"type": "string",
"enum": ["email", "fqdn", "url", "ipv4", "ipv6"]
}
},
"default_value": {
"type": "string",
"description": "Default value for the field when creating new instances"
}
}
}

View file

@ -82,11 +82,10 @@
</div>
{% endif %}
</div>
{% if expert_form %}
{% if form and expert_form %}
<div id="expert-form-container"
class="expert-crd-form"
style="{% if form %}display:none{% endif %}">
{% if expert_form and expert_form.context %}{{ expert_form.context }}{% endif %}
style="display:none">
<ul class="nav nav-tabs" id="expertTab" role="tablist">
{% for fieldset in expert_form.get_fieldsets %}
{% if not fieldset.hidden %}
@ -144,7 +143,6 @@
value="{% if form_submit_label %}{{ form_submit_label }}{% else %}{% translate "Save" %}{% endif %}" />
</div>
</form>
<script defer src="{% static 'js/bootstrap-tabs.js' %}"></script>
{% if form %}
<script defer src="{% static 'js/expert-mode.js' %}"></script>
{% endif %}

View file

@ -1,30 +0,0 @@
// Bootstrap 5 automatically initializes tabs with data-bs-toggle="tab"
// but we need to ensure they work after HTMX swaps
(function() {
'use strict';
const initBootstrapTabs = () => {
const customTabList = document.querySelectorAll('#myTab button[data-bs-toggle="tab"]');
customTabList.forEach(function(tabButton) {
new bootstrap.Tab(tabButton);
});
const expertTabList = document.querySelectorAll('#expertTab button[data-bs-toggle="tab"]');
expertTabList.forEach(function(tabButton) {
new bootstrap.Tab(tabButton);
});
}
if (document.readyState === 'loading') {
document.addEventListener('DOMContentLoaded', initBootstrapTabs);
} else {
initBootstrapTabs();
}
document.addEventListener('htmx:afterSwap', function(event) {
if (event.detail.target.id === 'service-form' ||
event.detail.target.classList.contains('crd-form')) {
initBootstrapTabs();
}
});
})();

View file

@ -1,635 +0,0 @@
from unittest.mock import Mock
import jsonschema
from django.core.validators import MaxValueValidator, MinValueValidator
from django.db import models
from servala.core.crd import generate_custom_form_class
from servala.core.forms import ServiceDefinitionAdminForm
from servala.core.models import ControlPlaneCRD
def test_custom_model_form_class_is_none_when_no_form_config():
crd = Mock(spec=ControlPlaneCRD)
service_def = Mock()
service_def.form_config = None
crd.service_definition = service_def
crd.django_model = Mock()
if not (
crd.django_model
and crd.service_definition
and crd.service_definition.form_config
and crd.service_definition.form_config.get("fieldsets")
):
result = None
else:
result = generate_custom_form_class(
crd.service_definition.form_config, crd.django_model
)
assert result is None
def test_custom_model_form_class_returns_class_when_form_config_exists():
crd = Mock(spec=ControlPlaneCRD)
service_def = Mock()
service_def.form_config = {
"fieldsets": [
{
"title": "General",
"fields": [
{
"type": "text",
"label": "Name",
"controlplane_field_mapping": "name",
"required": True,
}
],
}
]
}
crd.service_definition = service_def
class TestModel(models.Model):
name = models.CharField(max_length=100)
class Meta:
app_label = "test"
crd.django_model = TestModel
if not (
crd.django_model
and crd.service_definition
and crd.service_definition.form_config
and crd.service_definition.form_config.get("fieldsets")
):
result = None
else:
result = generate_custom_form_class(
crd.service_definition.form_config, crd.django_model
)
assert result is not None
assert hasattr(result, "form_config")
def test_form_config_schema_validates_minimal_config():
form = ServiceDefinitionAdminForm()
schema = form.form_config_schema
minimal_config = {
"fieldsets": [
{
"fields": [
{
"type": "text",
"label": "Service Name",
"controlplane_field_mapping": "spec.serviceName",
}
]
}
]
}
jsonschema.validate(instance=minimal_config, schema=schema)
def test_form_config_schema_validates_config_with_null_integers():
form = ServiceDefinitionAdminForm()
schema = form.form_config_schema
config_with_nulls = {
"fieldsets": [
{
"fields": [
{
"type": "text",
"label": "Service Name",
"controlplane_field_mapping": "spec.serviceName",
"max_length": None,
"required": False,
},
{
"type": "textarea",
"label": "Description",
"controlplane_field_mapping": "spec.description",
"rows": None,
"max_length": None,
},
{
"type": "number",
"label": "Port",
"controlplane_field_mapping": "spec.port",
"min_value": None,
"max_value": None,
},
{
"type": "array",
"label": "Tags",
"controlplane_field_mapping": "spec.tags",
"min_values": None,
"max_values": None,
},
]
}
]
}
jsonschema.validate(instance=config_with_nulls, schema=schema)
def test_form_config_schema_validates_full_config():
form = ServiceDefinitionAdminForm()
schema = form.form_config_schema
full_config = {
"fieldsets": [
{
"title": "Service Configuration",
"fields": [
{
"type": "text",
"label": "Service Name",
"controlplane_field_mapping": "spec.serviceName",
"help_text": "Enter a unique service name",
"required": True,
"max_length": 100,
},
{
"type": "email",
"label": "Admin Email",
"controlplane_field_mapping": "spec.adminEmail",
"help_text": "Contact email for service administrator",
"required": True,
"max_length": 255,
},
{
"type": "textarea",
"label": "Description",
"controlplane_field_mapping": "spec.description",
"help_text": "Describe the service purpose",
"required": False,
"rows": 5,
"max_length": 500,
},
{
"type": "number",
"label": "Port",
"controlplane_field_mapping": "spec.port",
"help_text": "Service port number",
"required": True,
"min_value": 1,
"max_value": 65535,
},
{
"type": "choice",
"label": "Environment",
"controlplane_field_mapping": "spec.environment",
"help_text": "Deployment environment",
"required": True,
"choices": [
["dev", "Development"],
["staging", "Staging"],
["prod", "Production"],
],
},
{
"type": "checkbox",
"label": "Enable Monitoring",
"controlplane_field_mapping": "spec.monitoring.enabled",
"help_text": "Enable service monitoring",
"required": False,
},
{
"type": "array",
"label": "Tags",
"controlplane_field_mapping": "spec.tags",
"help_text": "Service tags for organization",
"required": False,
"min_values": 0,
"max_values": 10,
},
],
}
]
}
jsonschema.validate(instance=full_config, schema=schema)
def test_choice_field_uses_custom_choices_from_form_config():
"""Test that choice fields use custom choices when provided in form_config"""
class TestModel(models.Model):
name = models.CharField(max_length=100)
environment = models.CharField(
max_length=20,
choices=[
("dev", "Development"),
("staging", "Staging"),
("prod", "Production"),
("test", "Testing"),
],
)
class Meta:
app_label = "test"
form_config = {
"fieldsets": [
{
"title": "General",
"fields": [
{
"type": "text",
"label": "Name",
"controlplane_field_mapping": "name",
"required": True,
},
{
"type": "choice",
"label": "Environment",
"controlplane_field_mapping": "environment",
"required": True,
"choices": [["dev", "Development"], ["prod", "Production"]],
},
],
}
]
}
form_class = generate_custom_form_class(form_config, TestModel)
form = form_class()
environment_field = form.fields["environment"]
assert list(environment_field.choices) == [
("dev", "Development"),
("prod", "Production"),
]
assert hasattr(environment_field, "_controlplane_choices")
assert len(environment_field._controlplane_choices) == 5 # 4 choices + empty choice
def test_choice_field_uses_control_plane_choices_when_no_custom_choices():
class TestModel(models.Model):
name = models.CharField(max_length=100)
environment = models.CharField(
max_length=20,
choices=[
("dev", "Development"),
("staging", "Staging"),
("prod", "Production"),
],
)
class Meta:
app_label = "test"
form_config = {
"fieldsets": [
{
"title": "General",
"fields": [
{
"type": "text",
"label": "Name",
"controlplane_field_mapping": "name",
"required": True,
},
{
"type": "choice",
"label": "Environment",
"controlplane_field_mapping": "environment",
"required": True,
},
],
}
]
}
form_class = generate_custom_form_class(form_config, TestModel)
form = form_class()
environment_field = form.fields["environment"]
choices_list = list(environment_field.choices)
assert len(choices_list) == 4 # 3 choices + empty choice
assert ("dev", "Development") in choices_list
def test_choice_field_validates_against_control_plane_choices():
class TestModel(models.Model):
name = models.CharField(max_length=100)
environment = models.CharField(
max_length=20,
choices=[
("dev", "Development"),
("staging", "Staging"),
("prod", "Production"),
],
)
class Meta:
app_label = "test"
form_config = {
"fieldsets": [
{
"title": "General",
"fields": [
{
"type": "text",
"label": "Name",
"controlplane_field_mapping": "name",
"required": True,
},
{
"type": "choice",
"label": "Environment",
"controlplane_field_mapping": "environment",
"required": True,
"choices": [["dev", "Development"], ["prod", "Production"]],
},
],
}
]
}
form_class = generate_custom_form_class(form_config, TestModel)
form = form_class(data={"name": "test-service", "environment": "dev"})
form.fields["context"].required = False # Skip context validation
assert form.is_valid(), f"Form should be valid but has errors: {form.errors}"
form = form_class(data={"name": "test-service", "environment": "prod"})
form.fields["context"].required = False # Skip context validation
assert form.is_valid(), f"Form should be valid but has errors: {form.errors}"
form = form_class(data={"name": "test-service", "environment": "invalid"})
form.fields["context"].required = False # Skip context validation
assert not form.is_valid()
assert "environment" in form.errors
def test_admin_form_validates_choice_values_against_schema():
form = ServiceDefinitionAdminForm()
mock_crd = Mock()
mock_crd.resource_schema = {
"properties": {
"spec": {
"properties": {
"environment": {
"type": "string",
"enum": ["dev", "staging", "prod"],
}
}
}
}
}
valid_form_config = {
"fieldsets": [
{
"fields": [
{
"type": "text",
"label": "Name",
"controlplane_field_mapping": "name",
},
{
"type": "choice",
"label": "Environment",
"controlplane_field_mapping": "spec.environment",
"choices": [["dev", "Development"], ["prod", "Production"]],
},
]
}
]
}
spec_schema = mock_crd.resource_schema["properties"]["spec"]
errors = []
for field in valid_form_config["fieldsets"][0]["fields"]:
if field.get("type") == "choice":
form._validate_choice_field(
field, field["controlplane_field_mapping"], spec_schema, "spec", errors
)
assert len(errors) == 0, f"Expected no errors but got: {errors}"
invalid_form_config = {
"fieldsets": [
{
"fields": [
{
"type": "text",
"label": "Name",
"controlplane_field_mapping": "name",
},
{
"type": "choice",
"label": "Environment",
"controlplane_field_mapping": "spec.environment",
"choices": [
["dev", "Development"],
["invalid", "Invalid Environment"],
],
},
]
}
]
}
errors = []
for field in invalid_form_config["fieldsets"][0]["fields"]:
if field.get("type") == "choice":
form._validate_choice_field(
field, field["controlplane_field_mapping"], spec_schema, "spec", errors
)
assert len(errors) > 0, "Expected validation errors but got none"
error_message = str(errors[0])
assert "invalid" in error_message.lower()
assert "Environment" in error_message
def test_number_field_min_max_sets_widget_attributes():
class TestModel(models.Model):
name = models.CharField(max_length=100)
port = models.IntegerField()
replica_count = models.IntegerField()
class Meta:
app_label = "test"
form_config = {
"fieldsets": [
{
"title": "General",
"fields": [
{
"type": "text",
"label": "Name",
"controlplane_field_mapping": "name",
"required": True,
},
{
"type": "number",
"label": "Port",
"controlplane_field_mapping": "port",
"required": True,
"min_value": 1,
"max_value": 65535,
},
{
"type": "number",
"label": "Replicas",
"controlplane_field_mapping": "replica_count",
"required": True,
"min_value": 1,
"max_value": 10,
},
],
}
]
}
form_class = generate_custom_form_class(form_config, TestModel)
form = form_class()
port_field = form.fields["port"]
assert port_field.widget.attrs.get("min") == 1
assert port_field.widget.attrs.get("max") == 65535
replica_field = form.fields["replica_count"]
assert replica_field.widget.attrs.get("min") == 1
assert replica_field.widget.attrs.get("max") == 10
port_validators = port_field.validators
assert any(
isinstance(v, MinValueValidator) and v.limit_value == 1 for v in port_validators
)
assert any(
isinstance(v, MaxValueValidator) and v.limit_value == 65535
for v in port_validators
)
def test_default_value_for_all_field_types():
class TestModel(models.Model):
name = models.CharField(max_length=100)
description = models.TextField()
port = models.IntegerField()
environment = models.CharField(
max_length=20,
choices=[
("dev", "Development"),
("staging", "Staging"),
("prod", "Production"),
],
)
monitoring_enabled = models.BooleanField()
tags = models.JSONField()
class Meta:
app_label = "test"
form_config = {
"fieldsets": [
{
"fields": [
{
"type": "text",
"label": "Name",
"controlplane_field_mapping": "name",
"default_value": "default-name",
},
{
"type": "textarea",
"label": "Description",
"controlplane_field_mapping": "description",
"default_value": "Default description text",
},
{
"type": "number",
"label": "Port",
"controlplane_field_mapping": "port",
"default_value": "8080",
},
{
"type": "choice",
"label": "Environment",
"controlplane_field_mapping": "environment",
"default_value": "dev",
},
{
"type": "checkbox",
"label": "Enable Monitoring",
"controlplane_field_mapping": "monitoring_enabled",
"default_value": "true",
},
{
"type": "array",
"label": "Tags",
"controlplane_field_mapping": "tags",
"default_value": "tag1,tag2,tag3",
},
],
}
]
}
form_class = generate_custom_form_class(form_config, TestModel)
form = form_class()
assert form.fields["name"].initial == "default-name"
assert form.fields["description"].initial == "Default description text"
assert form.fields["port"].initial == "8080"
assert form.fields["environment"].initial == "dev"
assert form.fields["monitoring_enabled"].initial == "true"
assert form.fields["tags"].initial == "tag1,tag2,tag3"
def test_default_value_not_override_existing_instance():
class TestModel(models.Model):
name = models.CharField(max_length=100)
port = models.IntegerField()
class Meta:
app_label = "test"
form_config = {
"fieldsets": [
{
"fields": [
{
"type": "text",
"label": "Name",
"controlplane_field_mapping": "name",
"default_value": "default-name",
},
{
"type": "number",
"label": "Port",
"controlplane_field_mapping": "port",
"default_value": "8080",
},
],
}
]
}
instance = TestModel(name="existing-name", port=3000)
form_class = generate_custom_form_class(form_config, TestModel)
form = form_class(instance=instance)
assert form.initial["name"] == "existing-name"
assert form.initial["port"] == 3000