diff --git a/.forgejo/workflows/renovate.yaml b/.forgejo/workflows/renovate.yaml
index 19e5ce8..1a0bbe9 100644
--- a/.forgejo/workflows/renovate.yaml
+++ b/.forgejo/workflows/renovate.yaml
@@ -19,7 +19,7 @@ jobs:
node-version: "24"
- name: Renovate
- uses: https://github.com/renovatebot/github-action@v43.0.19
+ uses: https://github.com/renovatebot/github-action@v44.0.1
with:
token: ${{ secrets.RENOVATE_TOKEN }}
env:
diff --git a/.python-version b/.python-version
index 24ee5b1..6324d40 100644
--- a/.python-version
+++ b/.python-version
@@ -1 +1 @@
-3.13
+3.14
diff --git a/Dockerfile b/Dockerfile
index 5727f03..b1af0a7 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,4 @@
-FROM python:3.13-slim
+FROM python:3.14-slim
EXPOSE 8000
WORKDIR /app
diff --git a/pyproject.toml b/pyproject.toml
index dc94dd4..b7ad2a7 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -3,12 +3,12 @@ name = "servala"
version = "0.0.0"
description = "Servala portal server and frontend"
readme = "README.md"
-requires-python = ">=3.13"
+requires-python = ">=3.14.0"
dependencies = [
"argon2-cffi>=25.1.0",
"cryptography>=46.0.3",
- "django==5.2.7",
- "django-allauth>=65.12.1",
+ "django==5.2.8",
+ "django-allauth>=65.13.0",
"django-auditlog>=3.3.0",
"django-fernet-encrypted-fields>=0.3.0",
"django-jsonform>=2.23.2",
@@ -28,15 +28,15 @@ dependencies = [
[dependency-groups]
dev = [
- "black>=25.9.0",
+ "black>=25.11.0",
"bumpver>=2025.1131",
- "coverage>=7.11.0",
+ "coverage>=7.11.3",
"djlint>=1.36.4",
"flake8>=7.3.0",
"flake8-bugbear>=25.10.21",
"flake8-pyproject>=1.2.3",
"isort>=7.0.0",
- "pytest>=8.4.2",
+ "pytest>=9.0.0",
"pytest-cov>=7.0.0",
"pytest-django>=4.11.1",
"pytest-mock>=3.15.1",
diff --git a/src/servala/core/admin.py b/src/servala/core/admin.py
index 87da376..fb64a2b 100644
--- a/src/servala/core/admin.py
+++ b/src/servala/core/admin.py
@@ -1,3 +1,6 @@
+import json
+from pathlib import Path
+
from django.contrib import admin, messages
from django.utils.translation import gettext_lazy as _
from django_jsonform.widgets import JSONFormWidget
@@ -313,9 +316,9 @@ class ServiceDefinitionAdmin(admin.ModelAdmin):
(
_("Form Configuration"),
{
- "fields": ("advanced_fields",),
+ "fields": ("form_config",),
"description": _(
- "Configure which fields should be hidden behind an 'Advanced' toggle in the form"
+ "Optional custom form configuration. When provided, this will be used instead of auto-generating the form from the OpenAPI spec."
),
},
),
@@ -323,19 +326,13 @@ class ServiceDefinitionAdmin(admin.ModelAdmin):
def get_form(self, request, obj=None, **kwargs):
form = super().get_form(request, obj, **kwargs)
- # JSON schema for advanced_fields field
- advanced_fields_schema = {
- "type": "array",
- "title": "Advanced Fields",
- "items": {
- "type": "string",
- "title": "Field Name",
- "description": "Field name in dot notation (e.g., spec.parameters.monitoring.enabled)",
- },
- }
- if "advanced_fields" in form.base_fields:
- form.base_fields["advanced_fields"].widget = JSONFormWidget(
- schema=advanced_fields_schema
+ schema_path = Path(__file__).parent / "schemas" / "form_config_schema.json"
+ with open(schema_path) as f:
+ form_config_schema = json.load(f)
+
+ if "form_config" in form.base_fields:
+ form.base_fields["form_config"].widget = JSONFormWidget(
+ schema=form_config_schema
)
return form
diff --git a/src/servala/core/crd.py b/src/servala/core/crd.py
deleted file mode 100644
index fe8edbb..0000000
--- a/src/servala/core/crd.py
+++ /dev/null
@@ -1,557 +0,0 @@
-import re
-
-from django import forms
-from django.core.validators import MaxValueValidator, MinValueValidator, RegexValidator
-from django.db import models
-from django.forms.models import ModelForm, ModelFormMetaclass
-from django.utils.translation import gettext_lazy as _
-
-from servala.core.models import ServiceInstance
-
-
-class CRDModel(models.Model):
- """Base class for all virtual CRD models"""
-
- def __init__(self, **kwargs):
- if spec := kwargs.pop("spec", None):
- kwargs.update(unnest_data({"spec": spec}))
- super().__init__(**kwargs)
-
- class Meta:
- abstract = True
-
-
-def duplicate_field(field_name, model):
- # Get the field from the model
- field = model._meta.get_field(field_name)
-
- # Create a new field with the same attributes
- new_field = type(field).__new__(type(field))
- new_field.__dict__.update(field.__dict__)
-
- # Ensure the field is not linked to the original model
- new_field.model = None
- new_field.auto_created = False
-
- return new_field
-
-
-def generate_django_model(schema, group, version, kind):
- """
- Generates a virtual Django model from a Kubernetes CRD's OpenAPI v3 schema.
- """
- # We always need these three fields to know our own name and our full namespace
- model_fields = {"__module__": "crd_models"}
- for field_name in ("name", "organization", "context"):
- model_fields[field_name] = duplicate_field(field_name, ServiceInstance)
-
- # All other fields are generated from the schema, except for the
- # resourceRef object
- spec = schema["properties"].get("spec") or {}
- spec["properties"].pop("resourceRef", None)
- model_fields.update(build_object_fields(spec, "spec", parent_required=False))
-
- # Store the original schema on the model class
- model_fields["SCHEMA"] = schema
-
- meta_class = type("Meta", (), {"app_label": "crd_models"})
- model_fields["Meta"] = meta_class
-
- # create the model class
- model_name = kind
- model_class = type(model_name, (CRDModel,), model_fields)
- return model_class
-
-
-def build_object_fields(schema, name, verbose_name_prefix=None, parent_required=False):
- required_fields = schema.get("required") or []
- properties = schema.get("properties") or {}
- fields = {}
-
- for field_name, field_schema in properties.items():
- is_required = field_name in required_fields or parent_required
- full_name = f"{name}.{field_name}"
- result = get_django_field(
- field_schema,
- is_required,
- field_name,
- full_name,
- verbose_name_prefix=verbose_name_prefix,
- )
- if isinstance(result, dict):
- fields.update(result)
- else:
- fields[full_name] = result
- return fields
-
-
-def deslugify(title):
- """
- Convert camelCase, PascalCase, or snake_case to human-readable title.
- Handles known acronyms (e.g., postgreSQLParameters -> PostgreSQL Parameters).
- """
- ACRONYMS = {
- # Database systems
- "SQL": "SQL",
- "MYSQL": "MySQL",
- "POSTGRESQL": "PostgreSQL",
- "MARIADB": "MariaDB",
- "MSSQL": "MSSQL",
- "MONGODB": "MongoDB",
- "REDIS": "Redis",
- # Protocols
- "HTTP": "HTTP",
- "HTTPS": "HTTPS",
- "FTP": "FTP",
- "SFTP": "SFTP",
- "SSH": "SSH",
- "TLS": "TLS",
- "SSL": "SSL",
- # APIs
- "API": "API",
- "REST": "REST",
- "GRPC": "gRPC",
- "GRAPHQL": "GraphQL",
- # Networking
- "URL": "URL",
- "URI": "URI",
- "FQDN": "FQDN",
- "DNS": "DNS",
- "IP": "IP",
- "TCP": "TCP",
- "UDP": "UDP",
- # Data formats
- "JSON": "JSON",
- "XML": "XML",
- "YAML": "YAML",
- "CSV": "CSV",
- "HTML": "HTML",
- "CSS": "CSS",
- # Hardware
- "CPU": "CPU",
- "RAM": "RAM",
- "GPU": "GPU",
- "SSD": "SSD",
- "HDD": "HDD",
- # Identifiers
- "ID": "ID",
- "UUID": "UUID",
- "GUID": "GUID",
- "ARN": "ARN",
- # Cloud providers
- "AWS": "AWS",
- "GCP": "GCP",
- "AZURE": "Azure",
- "IBM": "IBM",
- # Kubernetes/Cloud
- "DB": "DB",
- "PVC": "PVC",
- "PV": "PV",
- "VPN": "VPN",
- # Auth
- "OS": "OS",
- "LDAP": "LDAP",
- "SAML": "SAML",
- "OAUTH": "OAuth",
- "JWT": "JWT",
- # AWS Services
- "S3": "S3",
- "EC2": "EC2",
- "RDS": "RDS",
- "EBS": "EBS",
- "IAM": "IAM",
- }
-
- if "_" in title:
- # Handle snake_case
- title = title.replace("_", " ")
- words = title.split()
- else:
- # Handle camelCase/PascalCase with smart splitting
- # This regex splits on:
- # - Transition from lowercase to uppercase (camelCase)
- # - Transition from multiple uppercase to an uppercase followed by lowercase (SQLParameters -> SQL Parameters)
- words = re.findall(r"[A-Z]+(?=[A-Z][a-z]|\b)|[A-Z][a-z]+|[a-z]+|[0-9]+", title)
-
- # Merge adjacent words if they form a known compound acronym (e.g., postgre + SQL = PostgreSQL)
- merged_words = []
- i = 0
- while i < len(words):
- if i < len(words) - 1:
- # Check if current word + next word form a known acronym
- combined = (words[i] + words[i + 1]).upper()
- if combined in ACRONYMS:
- merged_words.append(combined)
- i += 2
- continue
- merged_words.append(words[i])
- i += 1
-
- # Capitalize each word, using proper casing for known acronyms
- result = []
- for word in merged_words:
- word_upper = word.upper()
- if word_upper in ACRONYMS:
- result.append(ACRONYMS[word_upper])
- else:
- result.append(word.capitalize())
-
- return " ".join(result)
-
-
-def get_django_field(
- field_schema, is_required, field_name, full_name, verbose_name_prefix=None
-):
- field_type = field_schema.get("type") or "string"
- format = field_schema.get("format")
- verbose_name_prefix = verbose_name_prefix or ""
- verbose_name = f"{verbose_name_prefix} {deslugify(field_name)}".strip()
-
- # Pass down the requirement status from parent to child fields
- kwargs = {
- "blank": not is_required, # All fields are optional by default
- "null": not is_required,
- "help_text": field_schema.get("description"),
- "validators": [],
- "verbose_name": verbose_name,
- "default": field_schema.get("default"),
- }
-
- if minimum := field_schema.get("minimum"):
- kwargs["validators"].append(MinValueValidator(minimum))
- if maximum := field_schema.get("maximum"):
- kwargs["validators"].append(MaxValueValidator(maximum))
-
- if field_type == "string":
- if format == "date-time":
- return models.DateTimeField(**kwargs)
- elif format == "date":
- return models.DateField(**kwargs)
- else:
- max_length = field_schema.get("max_length") or 255
- if pattern := field_schema.get("pattern"):
- kwargs["validators"].append(RegexValidator(regex=pattern))
- if choices := field_schema.get("enum"):
- kwargs["choices"] = ((choice, choice) for choice in choices)
- return models.CharField(max_length=max_length, **kwargs)
- elif field_type == "integer":
- return models.IntegerField(**kwargs)
- elif field_type == "number":
- return models.FloatField(**kwargs)
- elif field_type == "boolean":
- return models.BooleanField(**kwargs)
- elif field_type == "object":
- # Here we pass down the requirement status to nested objects
- return build_object_fields(
- field_schema,
- full_name,
- verbose_name_prefix=f"{verbose_name}:",
- parent_required=is_required,
- )
- elif field_type == "array":
- kwargs["help_text"] = field_schema.get("description") or _("List of values")
- from servala.frontend.forms.widgets import DynamicArrayField
-
- field = models.JSONField(**kwargs)
- formfield_kwargs = {
- "label": field.verbose_name,
- "required": not field.blank,
- }
-
- array_validation = {}
- if min_items := field_schema.get("min_items"):
- array_validation["min_items"] = min_items
- if max_items := field_schema.get("max_items"):
- array_validation["max_items"] = max_items
- if unique_items := field_schema.get("unique_items"):
- array_validation["unique_items"] = unique_items
- if items_schema := field_schema.get("items"):
- array_validation["items_schema"] = items_schema
- if array_validation:
- formfield_kwargs["array_validation"] = array_validation
-
- field.formfield = lambda: DynamicArrayField(**formfield_kwargs)
-
- return field
- return models.CharField(max_length=255, **kwargs)
-
-
-def unnest_data(data):
- result = {}
-
- def _flatten_dict(d, parent_key=""):
- for key, value in d.items():
- new_key = f"{parent_key}.{key}" if parent_key else key
- if isinstance(value, dict):
- _flatten_dict(value, new_key)
- else:
- result[new_key] = value
-
- _flatten_dict(data)
- return result
-
-
-class CrdModelFormMixin:
- HIDDEN_FIELDS = [
- "spec.compositeDeletePolicy",
- "spec.compositionRef",
- "spec.compositionRevisionRef",
- "spec.compositionRevisionSelector",
- "spec.compositionSelector",
- "spec.compositionUpdatePolicy",
- "spec.parameters.monitoring.alertmanagerConfigRef",
- "spec.parameters.monitoring.alertmanagerConfigSecretRef",
- "spec.parameters.network.serviceType",
- "spec.parameters.scheduling",
- "spec.parameters.security",
- "spec.parameters.size.cpu",
- "spec.parameters.size.memory",
- "spec.parameters.size.requests.cpu",
- "spec.parameters.size.requests.memory",
- "spec.publishConnectionDetailsTo",
- "spec.resourceRef",
- "spec.writeConnectionSecretToRef",
- ]
-
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.schema = self._meta.model.SCHEMA
-
- for field in ("organization", "context"):
- self.fields[field].widget = forms.HiddenInput()
-
- for name, field in self.fields.items():
- if name in self.HIDDEN_FIELDS or any(
- name.startswith(f) for f in self.HIDDEN_FIELDS
- ):
- field.widget = forms.HiddenInput()
- field.required = False
-
- # Mark advanced fields with a CSS class and data attribute
- for name, field in self.fields.items():
- if self.is_field_advanced(name):
- field.widget.attrs.update(
- {
- "class": (
- field.widget.attrs.get("class", "") + " advanced-field"
- ).strip(),
- "data-advanced": "true",
- }
- )
-
- if self.instance and self.instance.pk:
- self.fields["name"].disabled = True
- self.fields["name"].help_text = _("Name cannot be changed after creation.")
- self.fields["name"].widget = forms.HiddenInput()
-
- def strip_title(self, field_name, label):
- field = self.fields[field_name]
- if field and field.label and (position := field.label.find(label)) != -1:
- field.label = field.label[position + len(label) :]
-
- def has_mandatory_fields(self, field_list):
- for field_name in field_list:
- if field_name in self.fields and self.fields[field_name].required:
- return True
- return False
-
- def is_field_advanced(self, field_name):
- advanced_fields = getattr(self, "ADVANCED_FIELDS", [])
- return field_name in advanced_fields or any(
- field_name.startswith(f"{af}.") for af in advanced_fields
- )
-
- def are_all_fields_advanced(self, field_list):
- if not field_list:
- return False
- return all(self.is_field_advanced(field_name) for field_name in field_list)
-
- def get_fieldsets(self):
- fieldsets = []
-
- # General fieldset for non-spec fields
- general_fields = [
- field_name
- for field_name in self.fields.keys()
- if not field_name.startswith("spec.")
- ]
- if general_fields:
- fieldset = {
- "title": "General",
- "fields": general_fields,
- "fieldsets": [],
- "has_mandatory": self.has_mandatory_fields(general_fields),
- "is_advanced": self.are_all_fields_advanced(general_fields),
- }
- if all(
- [
- isinstance(self.fields[field].widget, forms.HiddenInput)
- for field in general_fields
- ]
- ):
- fieldset["hidden"] = True
- fieldsets.append(fieldset)
-
- # Process spec fields
- others = []
- top_level_fieldsets = {}
- hidden_spec_fields = []
-
- for field_name in self.fields:
- if field_name.startswith("spec."):
- if isinstance(self.fields[field_name].widget, forms.HiddenInput):
- hidden_spec_fields.append(field_name)
- continue
-
- parts = field_name.split(".")
- if len(parts) == 2:
- # Top-level spec field
- others.append(field_name)
- elif len(parts) == 3:
- # Second-level field - promote to top-level fieldset
- fieldset_key = f"{parts[1]}.{parts[2]}"
- if not top_level_fieldsets.get(fieldset_key):
- top_level_fieldsets[fieldset_key] = {
- "fields": [],
- "fieldsets": {},
- "title": f"{deslugify(parts[2])}",
- }
- top_level_fieldsets[fieldset_key]["fields"].append(field_name)
- else:
- # Third-level and deeper - create nested fieldsets
- fieldset_key = f"{parts[1]}.{parts[2]}"
- if not top_level_fieldsets.get(fieldset_key):
- top_level_fieldsets[fieldset_key] = {
- "fields": [],
- "fieldsets": {},
- "title": f"{deslugify(parts[2])}",
- }
-
- sub_key = parts[3]
- if not top_level_fieldsets[fieldset_key]["fieldsets"].get(sub_key):
- top_level_fieldsets[fieldset_key]["fieldsets"][sub_key] = {
- "title": deslugify(sub_key),
- "fields": [],
- }
- top_level_fieldsets[fieldset_key]["fieldsets"][sub_key][
- "fields"
- ].append(field_name)
-
- for fieldset in top_level_fieldsets.values():
- nested_fieldsets_list = []
- for sub_fieldset in fieldset["fieldsets"].values():
- if len(sub_fieldset["fields"]) == 1:
- # If nested fieldset has only one field, move it to parent
- fieldset["fields"].append(sub_fieldset["fields"][0])
- else:
- # Keep as nested fieldset with proper title stripping
- title = f"{fieldset['title']}: {sub_fieldset['title']}: "
- for field in sub_fieldset["fields"]:
- self.strip_title(field, title)
- sub_fieldset["is_advanced"] = self.are_all_fields_advanced(
- sub_fieldset["fields"]
- )
- nested_fieldsets_list.append(sub_fieldset)
-
- fieldset["fieldsets"] = nested_fieldsets_list
- total_fields = len(fieldset["fields"]) + len(nested_fieldsets_list)
- if total_fields == 1 and len(fieldset["fields"]) == 1:
- others.append(fieldset["fields"][0])
- else:
- title = f"{fieldset['title']}: "
- for field in fieldset["fields"]:
- self.strip_title(field, title)
-
- all_fields = fieldset["fields"][:]
- for sub_fieldset in nested_fieldsets_list:
- all_fields.extend(sub_fieldset["fields"])
- fieldset["has_mandatory"] = self.has_mandatory_fields(all_fields)
-
- fieldset["is_advanced"] = self.are_all_fields_advanced(all_fields)
-
- fieldsets.append(fieldset)
-
- # Add 'others' tab if there are any fields
- if others:
- fieldsets.append(
- {
- "title": "Others",
- "fields": others,
- "fieldsets": [],
- "has_mandatory": self.has_mandatory_fields(others),
- "is_advanced": self.are_all_fields_advanced(others),
- }
- )
-
- if hidden_spec_fields:
- fieldsets.append(
- {
- "title": "Advanced",
- "fields": hidden_spec_fields,
- "fieldsets": [],
- "hidden": True,
- "has_mandatory": self.has_mandatory_fields(hidden_spec_fields),
- }
- )
-
- fieldsets.sort(key=lambda f: f.get("hidden", False))
-
- return fieldsets
-
- def get_nested_data(self):
- """
- Builds the original nested JSON structure from flat form data.
- Form fields are named with dot notation (e.g., 'spec.replicas')
- """
- result = {}
-
- for field_name, value in self.cleaned_data.items():
- if value is None or value == "":
- continue
-
- parts = field_name.split(".")
- current = result
-
- # Navigate through the nested structure
- for i, part in enumerate(parts):
- if i == len(parts) - 1:
- # Last part, set the value
- current[part] = value
- else:
- # Create nested dict if it doesn't exist
- if part not in current:
- current[part] = {}
- current = current[part]
- return result
-
- def clean(self):
- cleaned_data = super().clean()
- self.validate_nested_data(
- self.get_nested_data().get("spec", {}), self.schema["properties"]["spec"]
- )
- return cleaned_data
-
- def validate_nested_data(self, data, schema):
- """Validate data against the provided OpenAPI v3 schema"""
- # TODO: actually validate the nested data.
- # TODO: get jsonschema to give us a path to the failing field rather than just an error message,
- # then add the validation error to that field (self.add_error())
- # try:
- # validate(instance=data, schema=schema)
- # except Exception as e:
- # raise forms.ValidationError(f"Validation error: {e.message}")
- pass
-
-
-def generate_model_form_class(model, advanced_fields=None):
- meta_attrs = {
- "model": model,
- "fields": "__all__",
- }
- fields = {
- "Meta": type("Meta", (object,), meta_attrs),
- "__module__": "crd_models",
- "ADVANCED_FIELDS": advanced_fields or [],
- }
- class_name = f"{model.__name__}ModelForm"
- return ModelFormMetaclass(class_name, (CrdModelFormMixin, ModelForm), fields)
diff --git a/src/servala/core/crd/__init__.py b/src/servala/core/crd/__init__.py
new file mode 100644
index 0000000..3f10abb
--- /dev/null
+++ b/src/servala/core/crd/__init__.py
@@ -0,0 +1,31 @@
+from servala.core.crd.forms import (
+ CrdModelFormMixin,
+ CustomFormMixin,
+ FormGeneratorMixin,
+ generate_custom_form_class,
+ generate_model_form_class,
+)
+from servala.core.crd.models import (
+ CRDModel,
+ build_object_fields,
+ duplicate_field,
+ generate_django_model,
+ get_django_field,
+ unnest_data,
+)
+from servala.core.crd.utils import deslugify
+
+__all__ = [
+ "CrdModelFormMixin",
+ "CustomFormMixin",
+ "FormGeneratorMixin",
+ "generate_django_model",
+ "generate_model_form_class",
+ "generate_custom_form_class",
+ "CRDModel",
+ "build_object_fields",
+ "duplicate_field",
+ "get_django_field",
+ "unnest_data",
+ "deslugify",
+]
diff --git a/src/servala/core/crd/forms.py b/src/servala/core/crd/forms.py
new file mode 100644
index 0000000..659684e
--- /dev/null
+++ b/src/servala/core/crd/forms.py
@@ -0,0 +1,454 @@
+from django import forms
+from django.core.validators import MaxValueValidator, MinValueValidator
+from django.forms.models import ModelForm, ModelFormMetaclass
+
+from servala.core.crd.utils import deslugify
+from servala.core.models import ControlPlaneCRD
+from servala.frontend.forms.widgets import DynamicArrayWidget
+
+# Fields that must be present in every form
+MANDATORY_FIELDS = ["name"]
+
+# Default field configurations - fields that can be included with just a mapping
+# to avoid administrators having to duplicate common information
+DEFAULT_FIELD_CONFIGS = {
+ "name": {
+ "type": "text",
+ "label": "Instance Name",
+ "help_text": "Unique name for the new instance",
+ "required": True,
+ "max_length": 63,
+ },
+ "spec.parameters.service.fqdn": {
+ "type": "array",
+ "label": "FQDNs",
+ "help_text": "Domain names for accessing this service",
+ "required": False,
+ },
+}
+
+
+class FormGeneratorMixin:
+ """Shared base class for ModelForm classes based on our generated CRD models.
+ There are two relevant child classes:
+ - CrdModelFormMixin: For fully auto-generated forms from the spec
+ - CustomFormMixin: For forms built from form_config settings.
+ """
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+
+ if "context" in self.fields:
+ self.fields["context"].widget = forms.HiddenInput()
+ if crd := self.initial.get("context"):
+ crd = getattr(crd, "pk", crd) # can be int or object
+ self.fields["context"].queryset = ControlPlaneCRD.objects.filter(pk=crd)
+
+ if self.instance and hasattr(self.instance, "name") and self.instance.name:
+ if "name" in self.fields:
+ self.fields["name"].disabled = True
+ self.fields["name"].widget = forms.HiddenInput()
+
+ def has_mandatory_fields(self, field_list):
+ for field_name in field_list:
+ if field_name in self.fields and self.fields[field_name].required:
+ return True
+ return False
+
+
+class CrdModelFormMixin(FormGeneratorMixin):
+ HIDDEN_FIELDS = [
+ "spec.compositeDeletePolicy",
+ "spec.compositionRef",
+ "spec.compositionRevisionRef",
+ "spec.compositionRevisionSelector",
+ "spec.compositionSelector",
+ "spec.compositionUpdatePolicy",
+ "spec.parameters.monitoring.alertmanagerConfigRef",
+ "spec.parameters.monitoring.alertmanagerConfigSecretRef",
+ "spec.parameters.network.serviceType",
+ "spec.parameters.scheduling",
+ "spec.parameters.security",
+ "spec.parameters.size.cpu",
+ "spec.parameters.size.memory",
+ "spec.parameters.size.requests.cpu",
+ "spec.parameters.size.requests.memory",
+ "spec.publishConnectionDetailsTo",
+ "spec.resourceRef",
+ "spec.writeConnectionSecretToRef",
+ ]
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.schema = self._meta.model.SCHEMA
+
+ for name, field in self.fields.items():
+ if name in self.HIDDEN_FIELDS or any(
+ name.startswith(f) for f in self.HIDDEN_FIELDS
+ ):
+ field.widget = forms.HiddenInput()
+ field.required = False
+
+ def strip_title(self, field_name, label):
+ field = self.fields[field_name]
+ if field and field.label and (position := field.label.find(label)) != -1:
+ field.label = field.label[position + len(label) :]
+
+ def get_fieldsets(self):
+ fieldsets = []
+
+ # General fieldset for non-spec fields
+ general_fields = [
+ field_name
+ for field_name in self.fields.keys()
+ if not field_name.startswith("spec.")
+ ]
+ if general_fields:
+ fieldset = {
+ "title": "General",
+ "fields": general_fields,
+ "fieldsets": [],
+ "has_mandatory": self.has_mandatory_fields(general_fields),
+ }
+ if all(
+ [
+ isinstance(self.fields[field].widget, forms.HiddenInput)
+ for field in general_fields
+ ]
+ ):
+ fieldset["hidden"] = True
+ fieldsets.append(fieldset)
+
+ # Process spec fields
+ others = []
+ top_level_fieldsets = {}
+ hidden_spec_fields = []
+
+ for field_name in self.fields:
+ if field_name.startswith("spec."):
+ if isinstance(self.fields[field_name].widget, forms.HiddenInput):
+ hidden_spec_fields.append(field_name)
+ continue
+
+ parts = field_name.split(".")
+ if len(parts) == 2:
+ # Top-level spec field
+ others.append(field_name)
+ elif len(parts) == 3:
+ # Second-level field - promote to top-level fieldset
+ fieldset_key = f"{parts[1]}.{parts[2]}"
+ if not top_level_fieldsets.get(fieldset_key):
+ top_level_fieldsets[fieldset_key] = {
+ "fields": [],
+ "fieldsets": {},
+ "title": f"{deslugify(parts[2])}",
+ }
+ top_level_fieldsets[fieldset_key]["fields"].append(field_name)
+ else:
+ # Third-level and deeper - create nested fieldsets
+ fieldset_key = f"{parts[1]}.{parts[2]}"
+ if not top_level_fieldsets.get(fieldset_key):
+ top_level_fieldsets[fieldset_key] = {
+ "fields": [],
+ "fieldsets": {},
+ "title": f"{deslugify(parts[2])}",
+ }
+
+ sub_key = parts[3]
+ if not top_level_fieldsets[fieldset_key]["fieldsets"].get(sub_key):
+ top_level_fieldsets[fieldset_key]["fieldsets"][sub_key] = {
+ "title": deslugify(sub_key),
+ "fields": [],
+ }
+ top_level_fieldsets[fieldset_key]["fieldsets"][sub_key][
+ "fields"
+ ].append(field_name)
+
+ for fieldset in top_level_fieldsets.values():
+ nested_fieldsets_list = []
+ for sub_fieldset in fieldset["fieldsets"].values():
+ if len(sub_fieldset["fields"]) == 1:
+ # If nested fieldset has only one field, move it to parent
+ fieldset["fields"].append(sub_fieldset["fields"][0])
+ else:
+ # Keep as nested fieldset with proper title stripping
+ title = f"{fieldset['title']}: {sub_fieldset['title']}: "
+ for field in sub_fieldset["fields"]:
+ self.strip_title(field, title)
+ nested_fieldsets_list.append(sub_fieldset)
+
+ fieldset["fieldsets"] = nested_fieldsets_list
+ total_fields = len(fieldset["fields"]) + len(nested_fieldsets_list)
+ if total_fields == 1 and len(fieldset["fields"]) == 1:
+ others.append(fieldset["fields"][0])
+ else:
+ title = f"{fieldset['title']}: "
+ for field in fieldset["fields"]:
+ self.strip_title(field, title)
+
+ all_fields = fieldset["fields"][:]
+ for sub_fieldset in nested_fieldsets_list:
+ all_fields.extend(sub_fieldset["fields"])
+ fieldset["has_mandatory"] = self.has_mandatory_fields(all_fields)
+
+ fieldsets.append(fieldset)
+
+ # Add 'others' tab if there are any fields
+ if others:
+ fieldsets.append(
+ {
+ "title": "Others",
+ "fields": others,
+ "fieldsets": [],
+ "has_mandatory": self.has_mandatory_fields(others),
+ }
+ )
+
+ if hidden_spec_fields:
+ fieldsets.append(
+ {
+ "title": "Advanced",
+ "fields": hidden_spec_fields,
+ "fieldsets": [],
+ "hidden": True,
+ "has_mandatory": self.has_mandatory_fields(hidden_spec_fields),
+ }
+ )
+
+ fieldsets.sort(key=lambda f: f.get("hidden", False))
+
+ return fieldsets
+
+ def get_nested_data(self):
+ """
+ Builds the original nested JSON structure from flat form data.
+ Form fields are named with dot notation (e.g., 'spec.replicas')
+ """
+ result = {}
+
+ for field_name, value in self.cleaned_data.items():
+ if value is None or value == "":
+ continue
+
+ parts = field_name.split(".")
+ current = result
+
+ # Navigate through the nested structure
+ for i, part in enumerate(parts):
+ if i == len(parts) - 1:
+ # Last part, set the value
+ current[part] = value
+ else:
+ # Create nested dict if it doesn't exist
+ if part not in current:
+ current[part] = {}
+ current = current[part]
+ return result
+
+ def clean(self):
+ cleaned_data = super().clean()
+ self.validate_nested_data(
+ self.get_nested_data().get("spec", {}), self.schema["properties"]["spec"]
+ )
+ return cleaned_data
+
+ def validate_nested_data(self, data, schema):
+ """Validate data against the provided OpenAPI v3 schema"""
+ # TODO: actually validate the nested data.
+ # TODO: get jsonschema to give us a path to the failing field rather than just an error message,
+ # then add the validation error to that field (self.add_error())
+ # try:
+ # validate(instance=data, schema=schema)
+ # except Exception as e:
+ # raise forms.ValidationError(f"Validation error: {e.message}")
+ pass
+
+
+def generate_model_form_class(model):
+ meta_attrs = {
+ "model": model,
+ "fields": "__all__",
+ }
+ fields = {
+ "Meta": type("Meta", (object,), meta_attrs),
+ "__module__": "crd_models",
+ }
+ class_name = f"{model.__name__}ModelForm"
+ return ModelFormMetaclass(class_name, (CrdModelFormMixin, ModelForm), fields)
+
+
+class CustomFormMixin(FormGeneratorMixin):
+ """
+ Base for custom (user-friendly) forms generated from ServiceDefinition.form_config.
+ """
+
+ IS_CUSTOM_FORM = True
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self._apply_field_config()
+ if (
+ self.instance
+ and hasattr(self.instance, "name")
+ and self.instance.name
+ and "name" in self.fields
+ ):
+ self.fields["name"].widget = forms.HiddenInput()
+ self.fields["name"].disabled = True
+ self.fields.pop("context", None)
+
+ def _apply_field_config(self):
+ for fieldset in self.form_config.get("fieldsets", []):
+ for fc in fieldset.get("fields", []):
+ field_name = fc.get("controlplane_field_mapping")
+
+ if field_name not in self.fields:
+ continue
+
+ field_config = fc.copy()
+ # Merge with defaults if field has default config
+ if field_name in DEFAULT_FIELD_CONFIGS:
+ field_config = DEFAULT_FIELD_CONFIGS[field_name].copy()
+ for key, value in fc.items():
+ if value or (value is False):
+ field_config[key] = value
+
+ field = self.fields[field_name]
+ field_type = field_config.get("type")
+
+ field.label = field_config.get("label", field_name)
+ field.help_text = field_config.get("help_text", "")
+ field.required = field_config.get("required", False)
+
+ if field_type == "textarea":
+ field.widget = forms.Textarea(
+ attrs={"rows": field_config.get("rows", 4)}
+ )
+ elif field_type == "array":
+ field.widget = DynamicArrayWidget()
+ elif field_type == "choice":
+ if hasattr(field, "choices") and field.choices:
+ field._controlplane_choices = list(field.choices)
+ if custom_choices := field_config.get("choices"):
+ field.choices = [tuple(choice) for choice in custom_choices]
+
+ if field_type == "number":
+ min_val = field_config.get("min_value")
+ max_val = field_config.get("max_value")
+
+ validators = []
+ if min_val is not None:
+ validators.append(MinValueValidator(min_val))
+ field.widget.attrs["min"] = min_val
+ if max_val is not None:
+ validators.append(MaxValueValidator(max_val))
+ field.widget.attrs["max"] = max_val
+
+ if validators:
+ field.validators.extend(validators)
+
+ if "default_value" in field_config and field.initial is None:
+ field.initial = field_config["default_value"]
+
+ if field_type in ("text", "textarea") and field_config.get(
+ "max_length"
+ ):
+ field.max_length = field_config.get("max_length")
+ if hasattr(field.widget, "attrs"):
+ field.widget.attrs["maxlength"] = field_config.get("max_length")
+
+ field.controlplane_field_mapping = field_name
+
+ def get_fieldsets(self):
+ fieldsets = []
+ for fieldset_config in self.form_config.get("fieldsets", []):
+ field_names = [
+ f["controlplane_field_mapping"]
+ for f in fieldset_config.get("fields", [])
+ ]
+ fieldset = {
+ "title": fieldset_config.get("title", "General"),
+ "fields": field_names,
+ "fieldsets": [],
+ "has_mandatory": self.has_mandatory_fields(field_names),
+ }
+ fieldsets.append(fieldset)
+
+ return fieldsets
+
+ def clean(self):
+ cleaned_data = super().clean()
+
+ for field_name, field in self.fields.items():
+ if hasattr(field, "_controlplane_choices"):
+ value = cleaned_data.get(field_name)
+ if value:
+ valid_values = [choice[0] for choice in field._controlplane_choices]
+ if value not in valid_values:
+ self.add_error(
+ field_name,
+ forms.ValidationError(
+ f"'{value}' is not a valid choice. "
+ f"Must be one of: {valid_values.join(', ')}"
+ ),
+ )
+
+ return cleaned_data
+
+ def get_nested_data(self):
+ nested = {}
+ for field_name in self.fields.keys():
+ if field_name == "context":
+ value = self.cleaned_data.get(field_name)
+ if value is not None:
+ nested[field_name] = value
+ continue
+
+ mapping = field_name
+ value = self.cleaned_data.get(field_name)
+ parts = mapping.split(".")
+ current = nested
+ for part in parts[:-1]:
+ if part not in current:
+ current[part] = {}
+ current = current[part]
+
+ current[parts[-1]] = value
+
+ return nested
+
+
+def generate_custom_form_class(form_config, model):
+ """
+ Generate a custom (user-friendly) form class from form_config JSON.
+ """
+ field_list = ["context", "name"]
+
+ for fieldset in form_config.get("fieldsets", []):
+ for field_config in fieldset.get("fields", []):
+ field_name = field_config.get("controlplane_field_mapping")
+ if field_name:
+ field_list.append(field_name)
+
+ fields = {
+ "context": forms.ModelChoiceField(
+ queryset=ControlPlaneCRD.objects.none(),
+ required=True,
+ widget=forms.HiddenInput(),
+ ),
+ }
+
+ meta_attrs = {
+ "model": model,
+ "fields": field_list,
+ }
+
+ form_fields = {
+ "Meta": type("Meta", (object,), meta_attrs),
+ "__module__": "crd_models",
+ "form_config": form_config,
+ **fields,
+ }
+
+ class_name = f"{model.__name__}CustomForm"
+ return ModelFormMetaclass(class_name, (CustomFormMixin, ModelForm), form_fields)
diff --git a/src/servala/core/crd/models.py b/src/servala/core/crd/models.py
new file mode 100644
index 0000000..86df97f
--- /dev/null
+++ b/src/servala/core/crd/models.py
@@ -0,0 +1,167 @@
+from django.core.validators import MaxValueValidator, MinValueValidator, RegexValidator
+from django.db import models
+from django.utils.translation import gettext_lazy as _
+
+from servala.core.crd.utils import deslugify
+from servala.core.models import ServiceInstance
+from servala.frontend.forms.widgets import DynamicArrayField
+
+
+class CRDModel(models.Model):
+ """Base class for all virtual CRD models"""
+
+ def __init__(self, **kwargs):
+ if spec := kwargs.pop("spec", None):
+ kwargs.update(unnest_data({"spec": spec}))
+ super().__init__(**kwargs)
+
+ class Meta:
+ abstract = True
+
+
+def generate_django_model(schema, group, version, kind):
+ """
+ Generates a virtual Django model from a Kubernetes CRD's OpenAPI v3 schema.
+ """
+ # We always need these three fields to know our own name and our full namespace
+ model_fields = {"__module__": "crd_models"}
+ for field_name in ("name", "context"):
+ model_fields[field_name] = duplicate_field(field_name, ServiceInstance)
+
+ # All other fields are generated from the schema, except for the
+ # resourceRef object
+ spec = schema["properties"].get("spec") or {}
+ spec["properties"].pop("resourceRef", None)
+ model_fields.update(build_object_fields(spec, "spec", parent_required=False))
+
+ # Store the original schema on the model class
+ model_fields["SCHEMA"] = schema
+
+ meta_class = type("Meta", (), {"app_label": "crd_models"})
+ model_fields["Meta"] = meta_class
+
+ # create the model class
+ model_name = kind
+ model_class = type(model_name, (CRDModel,), model_fields)
+ return model_class
+
+
+def duplicate_field(field_name, model):
+ field = model._meta.get_field(field_name)
+ new_field = type(field).__new__(type(field))
+ new_field.__dict__.update(field.__dict__)
+ new_field.model = None
+ new_field.auto_created = False
+ return new_field
+
+
+def build_object_fields(schema, name, verbose_name_prefix=None, parent_required=False):
+ required_fields = schema.get("required") or []
+ properties = schema.get("properties") or {}
+ fields = {}
+
+ for field_name, field_schema in properties.items():
+ is_required = field_name in required_fields or parent_required
+ full_name = f"{name}.{field_name}"
+ result = get_django_field(
+ field_schema,
+ is_required,
+ field_name,
+ full_name,
+ verbose_name_prefix=verbose_name_prefix,
+ )
+ if isinstance(result, dict):
+ fields.update(result)
+ else:
+ fields[full_name] = result
+ return fields
+
+
+def get_django_field(
+ field_schema, is_required, field_name, full_name, verbose_name_prefix=None
+):
+ field_type = field_schema.get("type") or "string"
+ format = field_schema.get("format")
+ verbose_name_prefix = verbose_name_prefix or ""
+ verbose_name = f"{verbose_name_prefix} {deslugify(field_name)}".strip()
+
+ # Pass down the requirement status from parent to child fields
+ kwargs = {
+ "blank": not is_required, # All fields are optional by default
+ "null": not is_required,
+ "help_text": field_schema.get("description"),
+ "validators": [],
+ "verbose_name": verbose_name,
+ "default": field_schema.get("default"),
+ }
+
+ if minimum := field_schema.get("minimum"):
+ kwargs["validators"].append(MinValueValidator(minimum))
+ if maximum := field_schema.get("maximum"):
+ kwargs["validators"].append(MaxValueValidator(maximum))
+
+ if field_type == "string":
+ if format == "date-time":
+ return models.DateTimeField(**kwargs)
+ elif format == "date":
+ return models.DateField(**kwargs)
+ else:
+ max_length = field_schema.get("max_length") or 255
+ if pattern := field_schema.get("pattern"):
+ kwargs["validators"].append(RegexValidator(regex=pattern))
+ if choices := field_schema.get("enum"):
+ kwargs["choices"] = ((choice, choice) for choice in choices)
+ return models.CharField(max_length=max_length, **kwargs)
+ elif field_type == "integer":
+ return models.IntegerField(**kwargs)
+ elif field_type == "number":
+ return models.FloatField(**kwargs)
+ elif field_type == "boolean":
+ return models.BooleanField(**kwargs)
+ elif field_type == "object":
+ # Here we pass down the requirement status to nested objects
+ return build_object_fields(
+ field_schema,
+ full_name,
+ verbose_name_prefix=f"{verbose_name}:",
+ parent_required=is_required,
+ )
+ elif field_type == "array":
+ kwargs["help_text"] = field_schema.get("description") or _("List of values")
+ field = models.JSONField(**kwargs)
+ formfield_kwargs = {
+ "label": field.verbose_name,
+ "required": not field.blank,
+ }
+
+ array_validation = {}
+ if min_items := field_schema.get("min_items"):
+ array_validation["min_items"] = min_items
+ if max_items := field_schema.get("max_items"):
+ array_validation["max_items"] = max_items
+ if unique_items := field_schema.get("unique_items"):
+ array_validation["unique_items"] = unique_items
+ if items_schema := field_schema.get("items"):
+ array_validation["items_schema"] = items_schema
+ if array_validation:
+ formfield_kwargs["array_validation"] = array_validation
+
+ field.formfield = lambda: DynamicArrayField(**formfield_kwargs)
+
+ return field
+ return models.CharField(max_length=255, **kwargs)
+
+
+def unnest_data(data):
+ result = {}
+
+ def _flatten_dict(d, parent_key=""):
+ for key, value in d.items():
+ new_key = f"{parent_key}.{key}" if parent_key else key
+ if isinstance(value, dict):
+ _flatten_dict(value, new_key)
+ else:
+ result[new_key] = value
+
+ _flatten_dict(data)
+ return result
diff --git a/src/servala/core/crd/utils.py b/src/servala/core/crd/utils.py
new file mode 100644
index 0000000..a537fd9
--- /dev/null
+++ b/src/servala/core/crd/utils.py
@@ -0,0 +1,115 @@
+import re
+
+
+def deslugify(title):
+ """
+ Convert camelCase, PascalCase, or snake_case to human-readable title.
+ Handles known acronyms (e.g., postgreSQLParameters -> PostgreSQL Parameters).
+ """
+ ACRONYMS = {
+ # Database systems
+ "SQL": "SQL",
+ "MYSQL": "MySQL",
+ "POSTGRESQL": "PostgreSQL",
+ "MARIADB": "MariaDB",
+ "MSSQL": "MSSQL",
+ "MONGODB": "MongoDB",
+ "REDIS": "Redis",
+ # Protocols
+ "HTTP": "HTTP",
+ "HTTPS": "HTTPS",
+ "FTP": "FTP",
+ "SFTP": "SFTP",
+ "SSH": "SSH",
+ "TLS": "TLS",
+ "SSL": "SSL",
+ # APIs
+ "API": "API",
+ "REST": "REST",
+ "GRPC": "gRPC",
+ "GRAPHQL": "GraphQL",
+ # Networking
+ "URL": "URL",
+ "URI": "URI",
+ "FQDN": "FQDN",
+ "DNS": "DNS",
+ "IP": "IP",
+ "TCP": "TCP",
+ "UDP": "UDP",
+ # Data formats
+ "JSON": "JSON",
+ "XML": "XML",
+ "YAML": "YAML",
+ "CSV": "CSV",
+ "HTML": "HTML",
+ "CSS": "CSS",
+ # Hardware
+ "CPU": "CPU",
+ "RAM": "RAM",
+ "GPU": "GPU",
+ "SSD": "SSD",
+ "HDD": "HDD",
+ # Identifiers
+ "ID": "ID",
+ "UUID": "UUID",
+ "GUID": "GUID",
+ "ARN": "ARN",
+ # Cloud providers
+ "AWS": "AWS",
+ "GCP": "GCP",
+ "AZURE": "Azure",
+ "IBM": "IBM",
+ # Kubernetes/Cloud
+ "DB": "DB",
+ "PVC": "PVC",
+ "PV": "PV",
+ "VPN": "VPN",
+ # Auth
+ "OS": "OS",
+ "LDAP": "LDAP",
+ "SAML": "SAML",
+ "OAUTH": "OAuth",
+ "JWT": "JWT",
+ # AWS Services
+ "S3": "S3",
+ "EC2": "EC2",
+ "RDS": "RDS",
+ "EBS": "EBS",
+ "IAM": "IAM",
+ }
+
+ if "_" in title:
+ # Handle snake_case
+ title = title.replace("_", " ")
+ words = title.split()
+ else:
+ # Handle camelCase/PascalCase with smart splitting
+ # This regex splits on:
+ # - Transition from lowercase to uppercase (camelCase)
+ # - Transition from multiple uppercase to an uppercase followed by lowercase (SQLParameters -> SQL Parameters)
+ words = re.findall(r"[A-Z]+(?=[A-Z][a-z]|\b)|[A-Z][a-z]+|[a-z]+|[0-9]+", title)
+
+ # Merge adjacent words if they form a known compound acronym (e.g., postgre + SQL = PostgreSQL)
+ merged_words = []
+ i = 0
+ while i < len(words):
+ if i < len(words) - 1:
+ # Check if current word + next word form a known acronym
+ combined = (words[i] + words[i + 1]).upper()
+ if combined in ACRONYMS:
+ merged_words.append(combined)
+ i += 2
+ continue
+ merged_words.append(words[i])
+ i += 1
+
+ # Capitalize each word, using proper casing for known acronyms
+ result = []
+ for word in merged_words:
+ word_upper = word.upper()
+ if word_upper in ACRONYMS:
+ result.append(ACRONYMS[word_upper])
+ else:
+ result.append(word.capitalize())
+
+ return " ".join(result)
diff --git a/src/servala/core/forms.py b/src/servala/core/forms.py
index 034d1c9..090abba 100644
--- a/src/servala/core/forms.py
+++ b/src/servala/core/forms.py
@@ -1,7 +1,12 @@
+import json
+from pathlib import Path
+
+import jsonschema
from django import forms
from django.utils.translation import gettext_lazy as _
from django_jsonform.widgets import JSONFormWidget
+from servala.core.crd.forms import DEFAULT_FIELD_CONFIGS, MANDATORY_FIELDS
from servala.core.models import ControlPlane, ServiceDefinition
CONTROL_PLANE_USER_INFO_SCHEMA = {
@@ -96,6 +101,12 @@ class ControlPlaneAdminForm(forms.ModelForm):
return super().save(*args, **kwargs)
+def fields_empty(fields):
+ if not fields:
+ return True
+ return all(not field.get("controlplane_field_mapping") for field in fields)
+
+
class ServiceDefinitionAdminForm(forms.ModelForm):
api_group = forms.CharField(
required=False,
@@ -124,6 +135,10 @@ class ServiceDefinitionAdminForm(forms.ModelForm):
self.fields["api_version"].initial = api_def.get("version", "")
self.fields["api_kind"].initial = api_def.get("kind", "")
+ schema_path = Path(__file__).parent / "schemas" / "form_config_schema.json"
+ with open(schema_path) as f:
+ self.form_config_schema = json.load(f)
+
def clean(self):
cleaned_data = super().clean()
@@ -151,8 +166,250 @@ class ServiceDefinitionAdminForm(forms.ModelForm):
api_def["kind"] = api_kind
cleaned_data["api_definition"] = api_def
+ form_config = cleaned_data.get("form_config")
+
+ # Convert empty form_config to None (no custom form)
+ if form_config:
+ if not form_config.get("fieldsets") or all(
+ fields_empty(fieldset.get("fields"))
+ for fieldset in form_config.get("fieldsets")
+ ):
+ form_config = None
+ cleaned_data["form_config"] = None
+
+ if form_config:
+ form_config = self._normalize_form_config_types(form_config)
+ cleaned_data["form_config"] = form_config
+
+ try:
+ jsonschema.validate(
+ instance=form_config, schema=self.form_config_schema
+ )
+ except jsonschema.ValidationError as e:
+ raise forms.ValidationError(
+ {
+ "form_config": _("Invalid form configuration: {}").format(
+ e.message
+ )
+ }
+ )
+ except jsonschema.SchemaError as e:
+ raise forms.ValidationError(
+ {"form_config": _("Schema error: {}").format(e.message)}
+ )
+
+ self._validate_field_mappings(form_config, cleaned_data)
+
return cleaned_data
+ def _normalize_form_config_types(self, form_config):
+ """
+ Normalize form_config by converting string representations of numbers
+ to actual integers/floats. The JSON form widget sends all values
+ as strings, but the schema expects proper types.
+ """
+ if not isinstance(form_config, dict):
+ return form_config
+
+ integer_fields = ["max_length", "rows", "min_values", "max_values"]
+ number_fields = ["min_value", "max_value"]
+
+ for fieldset in form_config.get("fieldsets", []):
+ for field in fieldset.get("fields", []):
+ for field_name in integer_fields:
+ if field_name in field and field[field_name] is not None:
+ value = field[field_name]
+ if isinstance(value, str):
+ try:
+ field[field_name] = int(value) if value else None
+ except (ValueError, TypeError):
+ pass
+
+ for field_name in number_fields:
+ if field_name in field and field[field_name] is not None:
+ value = field[field_name]
+ if isinstance(value, str):
+ try:
+ field[field_name] = (
+ int(value) if "." not in value else float(value)
+ )
+ except (ValueError, TypeError):
+ pass
+
+ return form_config
+
+ def _validate_field_mappings(self, form_config, cleaned_data):
+ if not self.instance.pk:
+ return
+ crd = self.instance.offering_control_planes.all().first()
+ if not crd:
+ return
+
+ schema = None
+ try:
+ schema = crd.resource_schema
+ except Exception:
+ pass
+
+ if not schema or not (spec_schema := schema.get("properties", {}).get("spec")):
+ return
+
+ valid_paths = self._extract_field_paths(spec_schema, "spec") | {"name"}
+ included_mappings = set()
+ errors = []
+ for fieldset in form_config.get("fieldsets", []):
+ for field in fieldset.get("fields", []):
+ mapping = field.get("controlplane_field_mapping")
+ included_mappings.add(mapping)
+
+ # Validate that fields without defaults have required properties
+ if mapping not in DEFAULT_FIELD_CONFIGS:
+ if not field.get("label"):
+ errors.append(
+ _(
+ "Field with mapping '{}' must have a 'label' property "
+ "(or use a mapping with default config)"
+ ).format(mapping)
+ )
+ if not field.get("type"):
+ errors.append(
+ _(
+ "Field with mapping '{}' must have a 'type' property "
+ "(or use a mapping with default config)"
+ ).format(mapping)
+ )
+
+ if mapping and mapping not in valid_paths:
+ field_name = field.get("label", field.get("name", mapping))
+ errors.append(
+ _(
+ "Field '{}' has invalid mapping '{}'. Valid paths are: {}"
+ ).format(
+ field_name,
+ mapping,
+ ", ".join(sorted(valid_paths)[:10])
+ + ("..." if len(valid_paths) > 10 else ""),
+ )
+ )
+
+ if field.get("type") == "choice" and field.get("choices"):
+ self._validate_choice_field(
+ field, mapping, spec_schema, "spec", errors
+ )
+
+ for mandatory_field in MANDATORY_FIELDS:
+ if mandatory_field not in included_mappings:
+ errors.append(
+ _(
+ "Required field '{}' must be included in the form configuration"
+ ).format(mandatory_field)
+ )
+
+ if errors:
+ raise forms.ValidationError({"form_config": errors})
+
+ def _validate_choice_field(self, field, mapping, spec_schema, prefix, errors):
+ if not mapping:
+ return
+
+ field_name = field.get("label", mapping)
+ custom_choices = field.get("choices", [])
+
+ # Single-element choices [value] are transformed to [value, value]
+ for i, choice in enumerate(custom_choices):
+ if not isinstance(choice, (list, tuple)):
+ errors.append(
+ _(
+ "Field '{}': Choice at index {} must be a list or tuple, "
+ "but got: {}"
+ ).format(field_name, i, repr(choice))
+ )
+ return
+
+ choice_len = len(choice)
+ if choice_len == 1:
+ custom_choices[i] = [choice[0], choice[0]]
+ elif choice_len == 0 or choice_len > 2:
+ errors.append(
+ _(
+ "Field '{}': Choice at index {} must have 1 or 2 elements "
+ "(got {}): {}"
+ ).format(field_name, i, choice_len, repr(choice))
+ )
+ return
+
+ field_schema = self._get_field_schema(spec_schema, mapping, prefix)
+ if not field_schema:
+ return
+
+ control_plane_choices = field_schema.get("enum", [])
+ if not control_plane_choices:
+ return
+
+ custom_choice_values = [choice[0] for choice in custom_choices]
+
+ invalid_choices = [
+ value
+ for value in custom_choice_values
+ if value not in control_plane_choices
+ ]
+
+ if invalid_choices:
+ errors.append(
+ _(
+ "Field '{}' has invalid choice values: {}. "
+ "Valid choices from control plane are: {}"
+ ).format(
+ field_name,
+ ", ".join(f"'{c}'" for c in invalid_choices),
+ ", ".join(f"'{c}'" for c in control_plane_choices),
+ )
+ )
+
+ def _get_field_schema(self, schema, field_path, prefix):
+ if not field_path or not schema:
+ return None
+
+ if field_path.startswith(prefix + "."):
+ field_path = field_path[len(prefix) + 1 :]
+
+ parts = field_path.split(".")
+ current_schema = schema
+
+ for part in parts:
+ if not isinstance(current_schema, dict):
+ return None
+
+ properties = current_schema.get("properties", {})
+ if part not in properties:
+ return None
+
+ current_schema = properties[part]
+
+ return current_schema
+
+ def _extract_field_paths(self, schema, prefix=""):
+ paths = set()
+
+ if not isinstance(schema, dict):
+ return paths
+
+ if "type" in schema and schema["type"] != "object":
+ if prefix:
+ paths.add(prefix)
+
+ if schema.get("properties"):
+ for prop_name, prop_schema in schema["properties"].items():
+ new_prefix = f"{prefix}.{prop_name}" if prefix else prop_name
+ paths.add(new_prefix)
+ paths.update(self._extract_field_paths(prop_schema, new_prefix))
+
+ if schema.get("type") == "array" and "items" in schema:
+ if prefix:
+ paths.add(prefix)
+
+ return paths
+
def save(self, *args, **kwargs):
self.instance.api_definition = self.cleaned_data["api_definition"]
return super().save(*args, **kwargs)
diff --git a/src/servala/core/migrations/0012_remove_advanced_fields.py b/src/servala/core/migrations/0012_remove_advanced_fields.py
new file mode 100644
index 0000000..7d0fecd
--- /dev/null
+++ b/src/servala/core/migrations/0012_remove_advanced_fields.py
@@ -0,0 +1,32 @@
+# Generated by Django 5.2.7 on 2025-10-31 10:40
+
+import django.core.validators
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ("core", "0012_convert_user_info_to_array"),
+ ]
+
+ operations = [
+ migrations.RemoveField(
+ model_name="servicedefinition",
+ name="advanced_fields",
+ ),
+ migrations.AlterField(
+ model_name="organization",
+ name="name",
+ field=models.CharField(
+ max_length=32,
+ validators=[
+ django.core.validators.RegexValidator(
+ message="Organization name can only contain letters, numbers, and spaces.",
+ regex="^[A-Za-z0-9\\s]+$",
+ )
+ ],
+ verbose_name="Name",
+ ),
+ ),
+ ]
diff --git a/src/servala/core/migrations/0013_add_form_config.py b/src/servala/core/migrations/0013_add_form_config.py
new file mode 100644
index 0000000..2819a6c
--- /dev/null
+++ b/src/servala/core/migrations/0013_add_form_config.py
@@ -0,0 +1,27 @@
+# Generated by Django 5.2.7 on 2025-10-31 10:47
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ("core", "0012_remove_advanced_fields"),
+ ]
+
+ operations = [
+ migrations.AddField(
+ model_name="servicedefinition",
+ name="form_config",
+ field=models.JSONField(
+ blank=True,
+ help_text=(
+ "Optional custom form configuration. When provided, this configuration will "
+ "be used to render the service form instead of auto-generating it from the OpenAPI spec. "
+ 'Format: {"fieldsets": [{"title": "Section", "fields": [{...}]}]}'
+ ),
+ null=True,
+ verbose_name="Form Configuration",
+ ),
+ ),
+ ]
diff --git a/src/servala/core/models/service.py b/src/servala/core/models/service.py
index 3af8c89..1535703 100644
--- a/src/servala/core/models/service.py
+++ b/src/servala/core/models/service.py
@@ -360,15 +360,15 @@ class ServiceDefinition(ServalaModelMixin, models.Model):
null=True,
blank=True,
)
- advanced_fields = models.JSONField(
- verbose_name=_("Advanced fields"),
+ form_config = models.JSONField(
+ verbose_name=_("Form Configuration"),
help_text=_(
- "Array of field names that should be hidden behind an 'Advanced' toggle. "
- "Use dot notation (e.g., ['spec.parameters.monitoring.enabled', 'spec.parameters.backup.schedule'])"
+ "Optional custom form configuration. When provided, this configuration will be used "
+ "to render the service form instead of auto-generating it from the OpenAPI spec. "
+ 'Format: {"fieldsets": [{"title": "Section", "fields": [{...}]}]}'
),
null=True,
blank=True,
- default=list,
)
service = models.ForeignKey(
to="Service",
@@ -510,9 +510,22 @@ class ControlPlaneCRD(ServalaModelMixin, models.Model):
if not self.django_model:
return
- advanced_fields = self.service_definition.advanced_fields or []
- return generate_model_form_class(
- self.django_model, advanced_fields=advanced_fields
+ return generate_model_form_class(self.django_model)
+
+ @cached_property
+ def custom_model_form_class(self):
+ from servala.core.crd import generate_custom_form_class
+
+ if not self.django_model:
+ return
+ if not (
+ self.service_definition
+ and self.service_definition.form_config
+ and self.service_definition.form_config.get("fieldsets")
+ ):
+ return
+ return generate_custom_form_class(
+ self.service_definition.form_config, self.django_model
)
@@ -878,7 +891,6 @@ class ServiceInstance(ServalaModelMixin, models.Model):
return
return self.context.django_model(
name=self.name,
- organization=self.organization,
context=self.context,
spec=self.spec,
# We pass -1 as ID in order to make it clear that a) this object exists (remotely),
diff --git a/src/servala/core/schemas/form_config_schema.json b/src/servala/core/schemas/form_config_schema.json
new file mode 100644
index 0000000..3b01b3f
--- /dev/null
+++ b/src/servala/core/schemas/form_config_schema.json
@@ -0,0 +1,107 @@
+{
+ "$schema": "http://json-schema.org/draft-07/schema#",
+ "title": "Service Definition Form Configuration Schema",
+ "description": "Schema for custom form configuration in ServiceDefinition",
+ "type": "object",
+ "required": ["fieldsets"],
+ "properties": {
+ "fieldsets": {
+ "type": "array",
+ "description": "Array of fieldset objects defining form sections",
+ "minItems": 1,
+ "items": {
+ "type": "object",
+ "required": ["fields"],
+ "properties": {
+ "title": {
+ "type": "string",
+ "description": "Optional title for the fieldset/tab"
+ },
+ "fields": {
+ "type": "array",
+ "description": "Array of field definitions in this fieldset",
+ "minItems": 1,
+ "items": {
+ "type": "object",
+ "required": ["controlplane_field_mapping"],
+ "properties": {
+ "type": {
+ "type": "string",
+ "description": "Field type",
+ "enum": ["", "text", "email", "textarea", "number", "choice", "checkbox", "array"]
+ },
+ "label": {
+ "type": "string",
+ "description": "Human-readable field label"
+ },
+ "help_text": {
+ "type": "string",
+ "description": "Optional help text displayed below the field"
+ },
+ "required": {
+ "type": "boolean",
+ "description": "Whether the field is required",
+ "default": false
+ },
+ "controlplane_field_mapping": {
+ "type": "string",
+ "description": "Dot-notation path mapping to Kubernetes spec field (e.g., 'spec.parameters.service.fqdn')"
+ },
+ "max_length": {
+ "type": ["integer", "null"],
+ "description": "Maximum length for text/textarea fields",
+ "minimum": 1
+ },
+ "rows": {
+ "type": ["integer", "null"],
+ "description": "Number of rows for textarea fields",
+ "minimum": 1
+ },
+ "min_value": {
+ "type": ["number", "null"],
+ "description": "Minimum value for number fields"
+ },
+ "max_value": {
+ "type": ["number", "null"],
+ "description": "Maximum value for number fields"
+ },
+ "choices": {
+ "type": "array",
+ "description": "Array of [value, label] pairs for choice fields",
+ "items": {
+ "type": "array",
+ "items": {
+ "type": "string"
+ }
+ }
+ },
+ "min_values": {
+ "type": ["integer", "null"],
+ "description": "Minimum number of values for array fields",
+ "minimum": 0
+ },
+ "max_values": {
+ "type": ["integer", "null"],
+ "description": "Maximum number of values for array fields",
+ "minimum": 1
+ },
+ "validators": {
+ "type": "array",
+ "description": "Array of validator names (for future use)",
+ "items": {
+ "type": "string",
+ "enum": ["email", "fqdn", "url", "ipv4", "ipv6"]
+ }
+ },
+ "default_value": {
+ "type": "string",
+ "description": "Default value for the field when creating new instances"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/src/servala/frontend/forms/organization.py b/src/servala/frontend/forms/organization.py
index 86ba0ab..45e7b11 100644
--- a/src/servala/frontend/forms/organization.py
+++ b/src/servala/frontend/forms/organization.py
@@ -8,7 +8,6 @@ from servala.core.models import Organization, OrganizationInvitation, Organizati
from servala.core.odoo import get_invoice_addresses, get_odoo_countries
from servala.frontend.forms.mixins import HtmxMixin
-
ORG_NAME_PATTERN = r"[\w\s\-.,&'()+]+"
diff --git a/src/servala/frontend/templates/account/login.html b/src/servala/frontend/templates/account/login.html
index f4ca590..e7576b0 100644
--- a/src/servala/frontend/templates/account/login.html
+++ b/src/servala/frontend/templates/account/login.html
@@ -26,12 +26,14 @@
{% translate "Ready to get started?" %}
- {% translate "Sign in to access your managed service instances and the Servala service catalog" %}
+ {% translate "Sign in to your account or create a new one to access your managed service instances and the Servala service catalog" %}
{% for provider in socialaccount_providers %}
{% provider_login_url provider process=process scope=scope auth_params=auth_params as href %}
-
{% endfor %}
diff --git a/src/servala/frontend/templates/frontend/forms/dynamic_array.html b/src/servala/frontend/templates/frontend/forms/dynamic_array.html
index 9d61825..00c23b0 100644
--- a/src/servala/frontend/templates/frontend/forms/dynamic_array.html
+++ b/src/servala/frontend/templates/frontend/forms/dynamic_array.html
@@ -1,5 +1,5 @@
{% endif %}
{% for text in field.errors %}
{{ text }}
{% endfor %}
- {% if field.help_text %}
+ {% if field.help_text and not field.is_hidden and not field.field.widget.input_type == "hidden" %}
{{ field.help_text|safe }}
{% endif %}
diff --git a/src/servala/frontend/templates/frontend/organizations/service_instance_update.html b/src/servala/frontend/templates/frontend/organizations/service_instance_update.html
index 51a9213..74259e6 100644
--- a/src/servala/frontend/templates/frontend/organizations/service_instance_update.html
+++ b/src/servala/frontend/templates/frontend/organizations/service_instance_update.html
@@ -22,7 +22,7 @@
{% translate "Oops! Something went wrong with the service form generation. Please try again later." %}
{% else %}
- {% include "includes/tabbed_fieldset_form.html" with form=form %}
+ {% include "includes/tabbed_fieldset_form.html" with form=custom_form expert_form=form %}
{% endif %}
diff --git a/src/servala/frontend/templates/frontend/organizations/service_offering_detail.html b/src/servala/frontend/templates/frontend/organizations/service_offering_detail.html
index 3305328..927c6e3 100644
--- a/src/servala/frontend/templates/frontend/organizations/service_offering_detail.html
+++ b/src/servala/frontend/templates/frontend/organizations/service_offering_detail.html
@@ -17,7 +17,7 @@
{% endif %}
{% endpartialdef %}
{% partialdef service-form %}
-{% if service_form %}
+{% if service_form or custom_service_form %}
@@ -26,7 +26,7 @@
{% translate "Oops! Something went wrong with the service form generation. Please try again later." %}
{% else %}
- {% include "includes/tabbed_fieldset_form.html" with form=service_form %}
+ {% include "includes/tabbed_fieldset_form.html" with form=custom_service_form expert_form=service_form %}
{% endif %}
@@ -42,7 +42,9 @@
{% translate "Service Unavailable" %}
-
{% translate "We currently cannot offer this service. Please check back later or contact support for more information." %}
+
+ {% translate "We currently cannot offer this service. Please check back later or contact support for more information." %}
+
diff --git a/src/servala/frontend/templates/includes/control_plane_user_info.html b/src/servala/frontend/templates/includes/control_plane_user_info.html
index a3a27f5..fdcc995 100644
--- a/src/servala/frontend/templates/includes/control_plane_user_info.html
+++ b/src/servala/frontend/templates/includes/control_plane_user_info.html
@@ -4,9 +4,7 @@
{% for info in control_plane.user_info %}
- {% for fieldset in form.get_fieldsets %}
- {% if not fieldset.hidden %}
-
-
-
- {% endif %}
- {% endfor %}
-
-
- {% for fieldset in form.get_fieldsets %}
-
- {% for field in fieldset.fields %}
- {% with field=form|get_field:field %}{{ field.as_field_group }}{% endwith %}
- {% endfor %}
- {% for subfieldset in fieldset.fieldsets %}
- {% if subfieldset.fields %}
-
-
{{ subfieldset.title }}
- {% for field in subfieldset.fields %}
- {% with field=form|get_field:field %}{{ field.as_field_group }}{% endwith %}
- {% endfor %}
-
+
+ {% if form and form.context %}{{ form.context }}{% endif %}
+ {% if form and form.get_fieldsets|length == 1 %}
+ {# Single fieldset - render without tabs #}
+ {% for fieldset in form.get_fieldsets %}
+
+ {% for field in fieldset.fields %}
+ {% with field=form|get_field:field %}{{ field.as_field_group }}{% endwith %}
+ {% endfor %}
+ {% for subfieldset in fieldset.fieldsets %}
+ {% if subfieldset.fields %}
+
+
{{ subfieldset.title }}
+ {% for field in subfieldset.fields %}
+ {% with field=form|get_field:field %}{{ field.as_field_group }}{% endwith %}
+ {% endfor %}
+
+ {% endif %}
+ {% endfor %}
+
+ {% endfor %}
+ {% elif form %}
+ {# Multiple fieldsets or auto-generated form - render with tabs #}
+
+ {% for fieldset in form.get_fieldsets %}
+ {% if not fieldset.hidden %}
+
+
+
{% endif %}
{% endfor %}
+
+
+ {% for fieldset in form.get_fieldsets %}
+
+ {% for field in fieldset.fields %}
+ {% with field=form|get_field:field %}{{ field.as_field_group }}{% endwith %}
+ {% endfor %}
+ {% for subfieldset in fieldset.fieldsets %}
+ {% if subfieldset.fields %}
+
+
{{ subfieldset.title }}
+ {% for field in subfieldset.fields %}
+ {% with field=form|get_field:field %}{{ field.as_field_group }}{% endwith %}
+ {% endfor %}
+
+ {% endif %}
+ {% endfor %}
+
+ {% endfor %}
- {% endfor %}
+ {% endif %}
+ {% if expert_form %}
+
+ {% if expert_form and expert_form.context %}{{ expert_form.context }}{% endif %}
+
+ {% for fieldset in expert_form.get_fieldsets %}
+ {% if not fieldset.hidden %}
+
+
+
+ {% endif %}
+ {% endfor %}
+
+
+ {% for fieldset in expert_form.get_fieldsets %}
+
+ {% for field in fieldset.fields %}
+ {% with field=expert_form|get_field:field %}{{ field.as_field_group }}{% endwith %}
+ {% endfor %}
+ {% for subfieldset in fieldset.fieldsets %}
+ {% if subfieldset.fields %}
+
+
{{ subfieldset.title }}
+ {% for field in subfieldset.fields %}
+ {% with field=expert_form|get_field:field %}{{ field.as_field_group }}{% endwith %}
+ {% endfor %}
+
+ {% endif %}
+ {% endfor %}
+
+ {% endfor %}
+
+
+ {% endif %}
+ {% if form %}
+
+ {% endif %}
-
+ {# browser form validation fails when there are fields missing/invalid that are hidden #}
+