diff --git a/src/servala/core/admin.py b/src/servala/core/admin.py index 87da376..fb64a2b 100644 --- a/src/servala/core/admin.py +++ b/src/servala/core/admin.py @@ -1,3 +1,6 @@ +import json +from pathlib import Path + from django.contrib import admin, messages from django.utils.translation import gettext_lazy as _ from django_jsonform.widgets import JSONFormWidget @@ -313,9 +316,9 @@ class ServiceDefinitionAdmin(admin.ModelAdmin): ( _("Form Configuration"), { - "fields": ("advanced_fields",), + "fields": ("form_config",), "description": _( - "Configure which fields should be hidden behind an 'Advanced' toggle in the form" + "Optional custom form configuration. When provided, this will be used instead of auto-generating the form from the OpenAPI spec." ), }, ), @@ -323,19 +326,13 @@ class ServiceDefinitionAdmin(admin.ModelAdmin): def get_form(self, request, obj=None, **kwargs): form = super().get_form(request, obj, **kwargs) - # JSON schema for advanced_fields field - advanced_fields_schema = { - "type": "array", - "title": "Advanced Fields", - "items": { - "type": "string", - "title": "Field Name", - "description": "Field name in dot notation (e.g., spec.parameters.monitoring.enabled)", - }, - } - if "advanced_fields" in form.base_fields: - form.base_fields["advanced_fields"].widget = JSONFormWidget( - schema=advanced_fields_schema + schema_path = Path(__file__).parent / "schemas" / "form_config_schema.json" + with open(schema_path) as f: + form_config_schema = json.load(f) + + if "form_config" in form.base_fields: + form.base_fields["form_config"].widget = JSONFormWidget( + schema=form_config_schema ) return form diff --git a/src/servala/core/crd.py b/src/servala/core/crd.py deleted file mode 100644 index fe8edbb..0000000 --- a/src/servala/core/crd.py +++ /dev/null @@ -1,557 +0,0 @@ -import re - -from django import forms -from django.core.validators import MaxValueValidator, MinValueValidator, RegexValidator -from django.db import models -from django.forms.models import ModelForm, ModelFormMetaclass -from django.utils.translation import gettext_lazy as _ - -from servala.core.models import ServiceInstance - - -class CRDModel(models.Model): - """Base class for all virtual CRD models""" - - def __init__(self, **kwargs): - if spec := kwargs.pop("spec", None): - kwargs.update(unnest_data({"spec": spec})) - super().__init__(**kwargs) - - class Meta: - abstract = True - - -def duplicate_field(field_name, model): - # Get the field from the model - field = model._meta.get_field(field_name) - - # Create a new field with the same attributes - new_field = type(field).__new__(type(field)) - new_field.__dict__.update(field.__dict__) - - # Ensure the field is not linked to the original model - new_field.model = None - new_field.auto_created = False - - return new_field - - -def generate_django_model(schema, group, version, kind): - """ - Generates a virtual Django model from a Kubernetes CRD's OpenAPI v3 schema. - """ - # We always need these three fields to know our own name and our full namespace - model_fields = {"__module__": "crd_models"} - for field_name in ("name", "organization", "context"): - model_fields[field_name] = duplicate_field(field_name, ServiceInstance) - - # All other fields are generated from the schema, except for the - # resourceRef object - spec = schema["properties"].get("spec") or {} - spec["properties"].pop("resourceRef", None) - model_fields.update(build_object_fields(spec, "spec", parent_required=False)) - - # Store the original schema on the model class - model_fields["SCHEMA"] = schema - - meta_class = type("Meta", (), {"app_label": "crd_models"}) - model_fields["Meta"] = meta_class - - # create the model class - model_name = kind - model_class = type(model_name, (CRDModel,), model_fields) - return model_class - - -def build_object_fields(schema, name, verbose_name_prefix=None, parent_required=False): - required_fields = schema.get("required") or [] - properties = schema.get("properties") or {} - fields = {} - - for field_name, field_schema in properties.items(): - is_required = field_name in required_fields or parent_required - full_name = f"{name}.{field_name}" - result = get_django_field( - field_schema, - is_required, - field_name, - full_name, - verbose_name_prefix=verbose_name_prefix, - ) - if isinstance(result, dict): - fields.update(result) - else: - fields[full_name] = result - return fields - - -def deslugify(title): - """ - Convert camelCase, PascalCase, or snake_case to human-readable title. - Handles known acronyms (e.g., postgreSQLParameters -> PostgreSQL Parameters). - """ - ACRONYMS = { - # Database systems - "SQL": "SQL", - "MYSQL": "MySQL", - "POSTGRESQL": "PostgreSQL", - "MARIADB": "MariaDB", - "MSSQL": "MSSQL", - "MONGODB": "MongoDB", - "REDIS": "Redis", - # Protocols - "HTTP": "HTTP", - "HTTPS": "HTTPS", - "FTP": "FTP", - "SFTP": "SFTP", - "SSH": "SSH", - "TLS": "TLS", - "SSL": "SSL", - # APIs - "API": "API", - "REST": "REST", - "GRPC": "gRPC", - "GRAPHQL": "GraphQL", - # Networking - "URL": "URL", - "URI": "URI", - "FQDN": "FQDN", - "DNS": "DNS", - "IP": "IP", - "TCP": "TCP", - "UDP": "UDP", - # Data formats - "JSON": "JSON", - "XML": "XML", - "YAML": "YAML", - "CSV": "CSV", - "HTML": "HTML", - "CSS": "CSS", - # Hardware - "CPU": "CPU", - "RAM": "RAM", - "GPU": "GPU", - "SSD": "SSD", - "HDD": "HDD", - # Identifiers - "ID": "ID", - "UUID": "UUID", - "GUID": "GUID", - "ARN": "ARN", - # Cloud providers - "AWS": "AWS", - "GCP": "GCP", - "AZURE": "Azure", - "IBM": "IBM", - # Kubernetes/Cloud - "DB": "DB", - "PVC": "PVC", - "PV": "PV", - "VPN": "VPN", - # Auth - "OS": "OS", - "LDAP": "LDAP", - "SAML": "SAML", - "OAUTH": "OAuth", - "JWT": "JWT", - # AWS Services - "S3": "S3", - "EC2": "EC2", - "RDS": "RDS", - "EBS": "EBS", - "IAM": "IAM", - } - - if "_" in title: - # Handle snake_case - title = title.replace("_", " ") - words = title.split() - else: - # Handle camelCase/PascalCase with smart splitting - # This regex splits on: - # - Transition from lowercase to uppercase (camelCase) - # - Transition from multiple uppercase to an uppercase followed by lowercase (SQLParameters -> SQL Parameters) - words = re.findall(r"[A-Z]+(?=[A-Z][a-z]|\b)|[A-Z][a-z]+|[a-z]+|[0-9]+", title) - - # Merge adjacent words if they form a known compound acronym (e.g., postgre + SQL = PostgreSQL) - merged_words = [] - i = 0 - while i < len(words): - if i < len(words) - 1: - # Check if current word + next word form a known acronym - combined = (words[i] + words[i + 1]).upper() - if combined in ACRONYMS: - merged_words.append(combined) - i += 2 - continue - merged_words.append(words[i]) - i += 1 - - # Capitalize each word, using proper casing for known acronyms - result = [] - for word in merged_words: - word_upper = word.upper() - if word_upper in ACRONYMS: - result.append(ACRONYMS[word_upper]) - else: - result.append(word.capitalize()) - - return " ".join(result) - - -def get_django_field( - field_schema, is_required, field_name, full_name, verbose_name_prefix=None -): - field_type = field_schema.get("type") or "string" - format = field_schema.get("format") - verbose_name_prefix = verbose_name_prefix or "" - verbose_name = f"{verbose_name_prefix} {deslugify(field_name)}".strip() - - # Pass down the requirement status from parent to child fields - kwargs = { - "blank": not is_required, # All fields are optional by default - "null": not is_required, - "help_text": field_schema.get("description"), - "validators": [], - "verbose_name": verbose_name, - "default": field_schema.get("default"), - } - - if minimum := field_schema.get("minimum"): - kwargs["validators"].append(MinValueValidator(minimum)) - if maximum := field_schema.get("maximum"): - kwargs["validators"].append(MaxValueValidator(maximum)) - - if field_type == "string": - if format == "date-time": - return models.DateTimeField(**kwargs) - elif format == "date": - return models.DateField(**kwargs) - else: - max_length = field_schema.get("max_length") or 255 - if pattern := field_schema.get("pattern"): - kwargs["validators"].append(RegexValidator(regex=pattern)) - if choices := field_schema.get("enum"): - kwargs["choices"] = ((choice, choice) for choice in choices) - return models.CharField(max_length=max_length, **kwargs) - elif field_type == "integer": - return models.IntegerField(**kwargs) - elif field_type == "number": - return models.FloatField(**kwargs) - elif field_type == "boolean": - return models.BooleanField(**kwargs) - elif field_type == "object": - # Here we pass down the requirement status to nested objects - return build_object_fields( - field_schema, - full_name, - verbose_name_prefix=f"{verbose_name}:", - parent_required=is_required, - ) - elif field_type == "array": - kwargs["help_text"] = field_schema.get("description") or _("List of values") - from servala.frontend.forms.widgets import DynamicArrayField - - field = models.JSONField(**kwargs) - formfield_kwargs = { - "label": field.verbose_name, - "required": not field.blank, - } - - array_validation = {} - if min_items := field_schema.get("min_items"): - array_validation["min_items"] = min_items - if max_items := field_schema.get("max_items"): - array_validation["max_items"] = max_items - if unique_items := field_schema.get("unique_items"): - array_validation["unique_items"] = unique_items - if items_schema := field_schema.get("items"): - array_validation["items_schema"] = items_schema - if array_validation: - formfield_kwargs["array_validation"] = array_validation - - field.formfield = lambda: DynamicArrayField(**formfield_kwargs) - - return field - return models.CharField(max_length=255, **kwargs) - - -def unnest_data(data): - result = {} - - def _flatten_dict(d, parent_key=""): - for key, value in d.items(): - new_key = f"{parent_key}.{key}" if parent_key else key - if isinstance(value, dict): - _flatten_dict(value, new_key) - else: - result[new_key] = value - - _flatten_dict(data) - return result - - -class CrdModelFormMixin: - HIDDEN_FIELDS = [ - "spec.compositeDeletePolicy", - "spec.compositionRef", - "spec.compositionRevisionRef", - "spec.compositionRevisionSelector", - "spec.compositionSelector", - "spec.compositionUpdatePolicy", - "spec.parameters.monitoring.alertmanagerConfigRef", - "spec.parameters.monitoring.alertmanagerConfigSecretRef", - "spec.parameters.network.serviceType", - "spec.parameters.scheduling", - "spec.parameters.security", - "spec.parameters.size.cpu", - "spec.parameters.size.memory", - "spec.parameters.size.requests.cpu", - "spec.parameters.size.requests.memory", - "spec.publishConnectionDetailsTo", - "spec.resourceRef", - "spec.writeConnectionSecretToRef", - ] - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.schema = self._meta.model.SCHEMA - - for field in ("organization", "context"): - self.fields[field].widget = forms.HiddenInput() - - for name, field in self.fields.items(): - if name in self.HIDDEN_FIELDS or any( - name.startswith(f) for f in self.HIDDEN_FIELDS - ): - field.widget = forms.HiddenInput() - field.required = False - - # Mark advanced fields with a CSS class and data attribute - for name, field in self.fields.items(): - if self.is_field_advanced(name): - field.widget.attrs.update( - { - "class": ( - field.widget.attrs.get("class", "") + " advanced-field" - ).strip(), - "data-advanced": "true", - } - ) - - if self.instance and self.instance.pk: - self.fields["name"].disabled = True - self.fields["name"].help_text = _("Name cannot be changed after creation.") - self.fields["name"].widget = forms.HiddenInput() - - def strip_title(self, field_name, label): - field = self.fields[field_name] - if field and field.label and (position := field.label.find(label)) != -1: - field.label = field.label[position + len(label) :] - - def has_mandatory_fields(self, field_list): - for field_name in field_list: - if field_name in self.fields and self.fields[field_name].required: - return True - return False - - def is_field_advanced(self, field_name): - advanced_fields = getattr(self, "ADVANCED_FIELDS", []) - return field_name in advanced_fields or any( - field_name.startswith(f"{af}.") for af in advanced_fields - ) - - def are_all_fields_advanced(self, field_list): - if not field_list: - return False - return all(self.is_field_advanced(field_name) for field_name in field_list) - - def get_fieldsets(self): - fieldsets = [] - - # General fieldset for non-spec fields - general_fields = [ - field_name - for field_name in self.fields.keys() - if not field_name.startswith("spec.") - ] - if general_fields: - fieldset = { - "title": "General", - "fields": general_fields, - "fieldsets": [], - "has_mandatory": self.has_mandatory_fields(general_fields), - "is_advanced": self.are_all_fields_advanced(general_fields), - } - if all( - [ - isinstance(self.fields[field].widget, forms.HiddenInput) - for field in general_fields - ] - ): - fieldset["hidden"] = True - fieldsets.append(fieldset) - - # Process spec fields - others = [] - top_level_fieldsets = {} - hidden_spec_fields = [] - - for field_name in self.fields: - if field_name.startswith("spec."): - if isinstance(self.fields[field_name].widget, forms.HiddenInput): - hidden_spec_fields.append(field_name) - continue - - parts = field_name.split(".") - if len(parts) == 2: - # Top-level spec field - others.append(field_name) - elif len(parts) == 3: - # Second-level field - promote to top-level fieldset - fieldset_key = f"{parts[1]}.{parts[2]}" - if not top_level_fieldsets.get(fieldset_key): - top_level_fieldsets[fieldset_key] = { - "fields": [], - "fieldsets": {}, - "title": f"{deslugify(parts[2])}", - } - top_level_fieldsets[fieldset_key]["fields"].append(field_name) - else: - # Third-level and deeper - create nested fieldsets - fieldset_key = f"{parts[1]}.{parts[2]}" - if not top_level_fieldsets.get(fieldset_key): - top_level_fieldsets[fieldset_key] = { - "fields": [], - "fieldsets": {}, - "title": f"{deslugify(parts[2])}", - } - - sub_key = parts[3] - if not top_level_fieldsets[fieldset_key]["fieldsets"].get(sub_key): - top_level_fieldsets[fieldset_key]["fieldsets"][sub_key] = { - "title": deslugify(sub_key), - "fields": [], - } - top_level_fieldsets[fieldset_key]["fieldsets"][sub_key][ - "fields" - ].append(field_name) - - for fieldset in top_level_fieldsets.values(): - nested_fieldsets_list = [] - for sub_fieldset in fieldset["fieldsets"].values(): - if len(sub_fieldset["fields"]) == 1: - # If nested fieldset has only one field, move it to parent - fieldset["fields"].append(sub_fieldset["fields"][0]) - else: - # Keep as nested fieldset with proper title stripping - title = f"{fieldset['title']}: {sub_fieldset['title']}: " - for field in sub_fieldset["fields"]: - self.strip_title(field, title) - sub_fieldset["is_advanced"] = self.are_all_fields_advanced( - sub_fieldset["fields"] - ) - nested_fieldsets_list.append(sub_fieldset) - - fieldset["fieldsets"] = nested_fieldsets_list - total_fields = len(fieldset["fields"]) + len(nested_fieldsets_list) - if total_fields == 1 and len(fieldset["fields"]) == 1: - others.append(fieldset["fields"][0]) - else: - title = f"{fieldset['title']}: " - for field in fieldset["fields"]: - self.strip_title(field, title) - - all_fields = fieldset["fields"][:] - for sub_fieldset in nested_fieldsets_list: - all_fields.extend(sub_fieldset["fields"]) - fieldset["has_mandatory"] = self.has_mandatory_fields(all_fields) - - fieldset["is_advanced"] = self.are_all_fields_advanced(all_fields) - - fieldsets.append(fieldset) - - # Add 'others' tab if there are any fields - if others: - fieldsets.append( - { - "title": "Others", - "fields": others, - "fieldsets": [], - "has_mandatory": self.has_mandatory_fields(others), - "is_advanced": self.are_all_fields_advanced(others), - } - ) - - if hidden_spec_fields: - fieldsets.append( - { - "title": "Advanced", - "fields": hidden_spec_fields, - "fieldsets": [], - "hidden": True, - "has_mandatory": self.has_mandatory_fields(hidden_spec_fields), - } - ) - - fieldsets.sort(key=lambda f: f.get("hidden", False)) - - return fieldsets - - def get_nested_data(self): - """ - Builds the original nested JSON structure from flat form data. - Form fields are named with dot notation (e.g., 'spec.replicas') - """ - result = {} - - for field_name, value in self.cleaned_data.items(): - if value is None or value == "": - continue - - parts = field_name.split(".") - current = result - - # Navigate through the nested structure - for i, part in enumerate(parts): - if i == len(parts) - 1: - # Last part, set the value - current[part] = value - else: - # Create nested dict if it doesn't exist - if part not in current: - current[part] = {} - current = current[part] - return result - - def clean(self): - cleaned_data = super().clean() - self.validate_nested_data( - self.get_nested_data().get("spec", {}), self.schema["properties"]["spec"] - ) - return cleaned_data - - def validate_nested_data(self, data, schema): - """Validate data against the provided OpenAPI v3 schema""" - # TODO: actually validate the nested data. - # TODO: get jsonschema to give us a path to the failing field rather than just an error message, - # then add the validation error to that field (self.add_error()) - # try: - # validate(instance=data, schema=schema) - # except Exception as e: - # raise forms.ValidationError(f"Validation error: {e.message}") - pass - - -def generate_model_form_class(model, advanced_fields=None): - meta_attrs = { - "model": model, - "fields": "__all__", - } - fields = { - "Meta": type("Meta", (object,), meta_attrs), - "__module__": "crd_models", - "ADVANCED_FIELDS": advanced_fields or [], - } - class_name = f"{model.__name__}ModelForm" - return ModelFormMetaclass(class_name, (CrdModelFormMixin, ModelForm), fields) diff --git a/src/servala/core/crd/__init__.py b/src/servala/core/crd/__init__.py new file mode 100644 index 0000000..3f10abb --- /dev/null +++ b/src/servala/core/crd/__init__.py @@ -0,0 +1,31 @@ +from servala.core.crd.forms import ( + CrdModelFormMixin, + CustomFormMixin, + FormGeneratorMixin, + generate_custom_form_class, + generate_model_form_class, +) +from servala.core.crd.models import ( + CRDModel, + build_object_fields, + duplicate_field, + generate_django_model, + get_django_field, + unnest_data, +) +from servala.core.crd.utils import deslugify + +__all__ = [ + "CrdModelFormMixin", + "CustomFormMixin", + "FormGeneratorMixin", + "generate_django_model", + "generate_model_form_class", + "generate_custom_form_class", + "CRDModel", + "build_object_fields", + "duplicate_field", + "get_django_field", + "unnest_data", + "deslugify", +] diff --git a/src/servala/core/crd/forms.py b/src/servala/core/crd/forms.py new file mode 100644 index 0000000..659684e --- /dev/null +++ b/src/servala/core/crd/forms.py @@ -0,0 +1,454 @@ +from django import forms +from django.core.validators import MaxValueValidator, MinValueValidator +from django.forms.models import ModelForm, ModelFormMetaclass + +from servala.core.crd.utils import deslugify +from servala.core.models import ControlPlaneCRD +from servala.frontend.forms.widgets import DynamicArrayWidget + +# Fields that must be present in every form +MANDATORY_FIELDS = ["name"] + +# Default field configurations - fields that can be included with just a mapping +# to avoid administrators having to duplicate common information +DEFAULT_FIELD_CONFIGS = { + "name": { + "type": "text", + "label": "Instance Name", + "help_text": "Unique name for the new instance", + "required": True, + "max_length": 63, + }, + "spec.parameters.service.fqdn": { + "type": "array", + "label": "FQDNs", + "help_text": "Domain names for accessing this service", + "required": False, + }, +} + + +class FormGeneratorMixin: + """Shared base class for ModelForm classes based on our generated CRD models. + There are two relevant child classes: + - CrdModelFormMixin: For fully auto-generated forms from the spec + - CustomFormMixin: For forms built from form_config settings. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + if "context" in self.fields: + self.fields["context"].widget = forms.HiddenInput() + if crd := self.initial.get("context"): + crd = getattr(crd, "pk", crd) # can be int or object + self.fields["context"].queryset = ControlPlaneCRD.objects.filter(pk=crd) + + if self.instance and hasattr(self.instance, "name") and self.instance.name: + if "name" in self.fields: + self.fields["name"].disabled = True + self.fields["name"].widget = forms.HiddenInput() + + def has_mandatory_fields(self, field_list): + for field_name in field_list: + if field_name in self.fields and self.fields[field_name].required: + return True + return False + + +class CrdModelFormMixin(FormGeneratorMixin): + HIDDEN_FIELDS = [ + "spec.compositeDeletePolicy", + "spec.compositionRef", + "spec.compositionRevisionRef", + "spec.compositionRevisionSelector", + "spec.compositionSelector", + "spec.compositionUpdatePolicy", + "spec.parameters.monitoring.alertmanagerConfigRef", + "spec.parameters.monitoring.alertmanagerConfigSecretRef", + "spec.parameters.network.serviceType", + "spec.parameters.scheduling", + "spec.parameters.security", + "spec.parameters.size.cpu", + "spec.parameters.size.memory", + "spec.parameters.size.requests.cpu", + "spec.parameters.size.requests.memory", + "spec.publishConnectionDetailsTo", + "spec.resourceRef", + "spec.writeConnectionSecretToRef", + ] + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.schema = self._meta.model.SCHEMA + + for name, field in self.fields.items(): + if name in self.HIDDEN_FIELDS or any( + name.startswith(f) for f in self.HIDDEN_FIELDS + ): + field.widget = forms.HiddenInput() + field.required = False + + def strip_title(self, field_name, label): + field = self.fields[field_name] + if field and field.label and (position := field.label.find(label)) != -1: + field.label = field.label[position + len(label) :] + + def get_fieldsets(self): + fieldsets = [] + + # General fieldset for non-spec fields + general_fields = [ + field_name + for field_name in self.fields.keys() + if not field_name.startswith("spec.") + ] + if general_fields: + fieldset = { + "title": "General", + "fields": general_fields, + "fieldsets": [], + "has_mandatory": self.has_mandatory_fields(general_fields), + } + if all( + [ + isinstance(self.fields[field].widget, forms.HiddenInput) + for field in general_fields + ] + ): + fieldset["hidden"] = True + fieldsets.append(fieldset) + + # Process spec fields + others = [] + top_level_fieldsets = {} + hidden_spec_fields = [] + + for field_name in self.fields: + if field_name.startswith("spec."): + if isinstance(self.fields[field_name].widget, forms.HiddenInput): + hidden_spec_fields.append(field_name) + continue + + parts = field_name.split(".") + if len(parts) == 2: + # Top-level spec field + others.append(field_name) + elif len(parts) == 3: + # Second-level field - promote to top-level fieldset + fieldset_key = f"{parts[1]}.{parts[2]}" + if not top_level_fieldsets.get(fieldset_key): + top_level_fieldsets[fieldset_key] = { + "fields": [], + "fieldsets": {}, + "title": f"{deslugify(parts[2])}", + } + top_level_fieldsets[fieldset_key]["fields"].append(field_name) + else: + # Third-level and deeper - create nested fieldsets + fieldset_key = f"{parts[1]}.{parts[2]}" + if not top_level_fieldsets.get(fieldset_key): + top_level_fieldsets[fieldset_key] = { + "fields": [], + "fieldsets": {}, + "title": f"{deslugify(parts[2])}", + } + + sub_key = parts[3] + if not top_level_fieldsets[fieldset_key]["fieldsets"].get(sub_key): + top_level_fieldsets[fieldset_key]["fieldsets"][sub_key] = { + "title": deslugify(sub_key), + "fields": [], + } + top_level_fieldsets[fieldset_key]["fieldsets"][sub_key][ + "fields" + ].append(field_name) + + for fieldset in top_level_fieldsets.values(): + nested_fieldsets_list = [] + for sub_fieldset in fieldset["fieldsets"].values(): + if len(sub_fieldset["fields"]) == 1: + # If nested fieldset has only one field, move it to parent + fieldset["fields"].append(sub_fieldset["fields"][0]) + else: + # Keep as nested fieldset with proper title stripping + title = f"{fieldset['title']}: {sub_fieldset['title']}: " + for field in sub_fieldset["fields"]: + self.strip_title(field, title) + nested_fieldsets_list.append(sub_fieldset) + + fieldset["fieldsets"] = nested_fieldsets_list + total_fields = len(fieldset["fields"]) + len(nested_fieldsets_list) + if total_fields == 1 and len(fieldset["fields"]) == 1: + others.append(fieldset["fields"][0]) + else: + title = f"{fieldset['title']}: " + for field in fieldset["fields"]: + self.strip_title(field, title) + + all_fields = fieldset["fields"][:] + for sub_fieldset in nested_fieldsets_list: + all_fields.extend(sub_fieldset["fields"]) + fieldset["has_mandatory"] = self.has_mandatory_fields(all_fields) + + fieldsets.append(fieldset) + + # Add 'others' tab if there are any fields + if others: + fieldsets.append( + { + "title": "Others", + "fields": others, + "fieldsets": [], + "has_mandatory": self.has_mandatory_fields(others), + } + ) + + if hidden_spec_fields: + fieldsets.append( + { + "title": "Advanced", + "fields": hidden_spec_fields, + "fieldsets": [], + "hidden": True, + "has_mandatory": self.has_mandatory_fields(hidden_spec_fields), + } + ) + + fieldsets.sort(key=lambda f: f.get("hidden", False)) + + return fieldsets + + def get_nested_data(self): + """ + Builds the original nested JSON structure from flat form data. + Form fields are named with dot notation (e.g., 'spec.replicas') + """ + result = {} + + for field_name, value in self.cleaned_data.items(): + if value is None or value == "": + continue + + parts = field_name.split(".") + current = result + + # Navigate through the nested structure + for i, part in enumerate(parts): + if i == len(parts) - 1: + # Last part, set the value + current[part] = value + else: + # Create nested dict if it doesn't exist + if part not in current: + current[part] = {} + current = current[part] + return result + + def clean(self): + cleaned_data = super().clean() + self.validate_nested_data( + self.get_nested_data().get("spec", {}), self.schema["properties"]["spec"] + ) + return cleaned_data + + def validate_nested_data(self, data, schema): + """Validate data against the provided OpenAPI v3 schema""" + # TODO: actually validate the nested data. + # TODO: get jsonschema to give us a path to the failing field rather than just an error message, + # then add the validation error to that field (self.add_error()) + # try: + # validate(instance=data, schema=schema) + # except Exception as e: + # raise forms.ValidationError(f"Validation error: {e.message}") + pass + + +def generate_model_form_class(model): + meta_attrs = { + "model": model, + "fields": "__all__", + } + fields = { + "Meta": type("Meta", (object,), meta_attrs), + "__module__": "crd_models", + } + class_name = f"{model.__name__}ModelForm" + return ModelFormMetaclass(class_name, (CrdModelFormMixin, ModelForm), fields) + + +class CustomFormMixin(FormGeneratorMixin): + """ + Base for custom (user-friendly) forms generated from ServiceDefinition.form_config. + """ + + IS_CUSTOM_FORM = True + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._apply_field_config() + if ( + self.instance + and hasattr(self.instance, "name") + and self.instance.name + and "name" in self.fields + ): + self.fields["name"].widget = forms.HiddenInput() + self.fields["name"].disabled = True + self.fields.pop("context", None) + + def _apply_field_config(self): + for fieldset in self.form_config.get("fieldsets", []): + for fc in fieldset.get("fields", []): + field_name = fc.get("controlplane_field_mapping") + + if field_name not in self.fields: + continue + + field_config = fc.copy() + # Merge with defaults if field has default config + if field_name in DEFAULT_FIELD_CONFIGS: + field_config = DEFAULT_FIELD_CONFIGS[field_name].copy() + for key, value in fc.items(): + if value or (value is False): + field_config[key] = value + + field = self.fields[field_name] + field_type = field_config.get("type") + + field.label = field_config.get("label", field_name) + field.help_text = field_config.get("help_text", "") + field.required = field_config.get("required", False) + + if field_type == "textarea": + field.widget = forms.Textarea( + attrs={"rows": field_config.get("rows", 4)} + ) + elif field_type == "array": + field.widget = DynamicArrayWidget() + elif field_type == "choice": + if hasattr(field, "choices") and field.choices: + field._controlplane_choices = list(field.choices) + if custom_choices := field_config.get("choices"): + field.choices = [tuple(choice) for choice in custom_choices] + + if field_type == "number": + min_val = field_config.get("min_value") + max_val = field_config.get("max_value") + + validators = [] + if min_val is not None: + validators.append(MinValueValidator(min_val)) + field.widget.attrs["min"] = min_val + if max_val is not None: + validators.append(MaxValueValidator(max_val)) + field.widget.attrs["max"] = max_val + + if validators: + field.validators.extend(validators) + + if "default_value" in field_config and field.initial is None: + field.initial = field_config["default_value"] + + if field_type in ("text", "textarea") and field_config.get( + "max_length" + ): + field.max_length = field_config.get("max_length") + if hasattr(field.widget, "attrs"): + field.widget.attrs["maxlength"] = field_config.get("max_length") + + field.controlplane_field_mapping = field_name + + def get_fieldsets(self): + fieldsets = [] + for fieldset_config in self.form_config.get("fieldsets", []): + field_names = [ + f["controlplane_field_mapping"] + for f in fieldset_config.get("fields", []) + ] + fieldset = { + "title": fieldset_config.get("title", "General"), + "fields": field_names, + "fieldsets": [], + "has_mandatory": self.has_mandatory_fields(field_names), + } + fieldsets.append(fieldset) + + return fieldsets + + def clean(self): + cleaned_data = super().clean() + + for field_name, field in self.fields.items(): + if hasattr(field, "_controlplane_choices"): + value = cleaned_data.get(field_name) + if value: + valid_values = [choice[0] for choice in field._controlplane_choices] + if value not in valid_values: + self.add_error( + field_name, + forms.ValidationError( + f"'{value}' is not a valid choice. " + f"Must be one of: {valid_values.join(', ')}" + ), + ) + + return cleaned_data + + def get_nested_data(self): + nested = {} + for field_name in self.fields.keys(): + if field_name == "context": + value = self.cleaned_data.get(field_name) + if value is not None: + nested[field_name] = value + continue + + mapping = field_name + value = self.cleaned_data.get(field_name) + parts = mapping.split(".") + current = nested + for part in parts[:-1]: + if part not in current: + current[part] = {} + current = current[part] + + current[parts[-1]] = value + + return nested + + +def generate_custom_form_class(form_config, model): + """ + Generate a custom (user-friendly) form class from form_config JSON. + """ + field_list = ["context", "name"] + + for fieldset in form_config.get("fieldsets", []): + for field_config in fieldset.get("fields", []): + field_name = field_config.get("controlplane_field_mapping") + if field_name: + field_list.append(field_name) + + fields = { + "context": forms.ModelChoiceField( + queryset=ControlPlaneCRD.objects.none(), + required=True, + widget=forms.HiddenInput(), + ), + } + + meta_attrs = { + "model": model, + "fields": field_list, + } + + form_fields = { + "Meta": type("Meta", (object,), meta_attrs), + "__module__": "crd_models", + "form_config": form_config, + **fields, + } + + class_name = f"{model.__name__}CustomForm" + return ModelFormMetaclass(class_name, (CustomFormMixin, ModelForm), form_fields) diff --git a/src/servala/core/crd/models.py b/src/servala/core/crd/models.py new file mode 100644 index 0000000..86df97f --- /dev/null +++ b/src/servala/core/crd/models.py @@ -0,0 +1,167 @@ +from django.core.validators import MaxValueValidator, MinValueValidator, RegexValidator +from django.db import models +from django.utils.translation import gettext_lazy as _ + +from servala.core.crd.utils import deslugify +from servala.core.models import ServiceInstance +from servala.frontend.forms.widgets import DynamicArrayField + + +class CRDModel(models.Model): + """Base class for all virtual CRD models""" + + def __init__(self, **kwargs): + if spec := kwargs.pop("spec", None): + kwargs.update(unnest_data({"spec": spec})) + super().__init__(**kwargs) + + class Meta: + abstract = True + + +def generate_django_model(schema, group, version, kind): + """ + Generates a virtual Django model from a Kubernetes CRD's OpenAPI v3 schema. + """ + # We always need these three fields to know our own name and our full namespace + model_fields = {"__module__": "crd_models"} + for field_name in ("name", "context"): + model_fields[field_name] = duplicate_field(field_name, ServiceInstance) + + # All other fields are generated from the schema, except for the + # resourceRef object + spec = schema["properties"].get("spec") or {} + spec["properties"].pop("resourceRef", None) + model_fields.update(build_object_fields(spec, "spec", parent_required=False)) + + # Store the original schema on the model class + model_fields["SCHEMA"] = schema + + meta_class = type("Meta", (), {"app_label": "crd_models"}) + model_fields["Meta"] = meta_class + + # create the model class + model_name = kind + model_class = type(model_name, (CRDModel,), model_fields) + return model_class + + +def duplicate_field(field_name, model): + field = model._meta.get_field(field_name) + new_field = type(field).__new__(type(field)) + new_field.__dict__.update(field.__dict__) + new_field.model = None + new_field.auto_created = False + return new_field + + +def build_object_fields(schema, name, verbose_name_prefix=None, parent_required=False): + required_fields = schema.get("required") or [] + properties = schema.get("properties") or {} + fields = {} + + for field_name, field_schema in properties.items(): + is_required = field_name in required_fields or parent_required + full_name = f"{name}.{field_name}" + result = get_django_field( + field_schema, + is_required, + field_name, + full_name, + verbose_name_prefix=verbose_name_prefix, + ) + if isinstance(result, dict): + fields.update(result) + else: + fields[full_name] = result + return fields + + +def get_django_field( + field_schema, is_required, field_name, full_name, verbose_name_prefix=None +): + field_type = field_schema.get("type") or "string" + format = field_schema.get("format") + verbose_name_prefix = verbose_name_prefix or "" + verbose_name = f"{verbose_name_prefix} {deslugify(field_name)}".strip() + + # Pass down the requirement status from parent to child fields + kwargs = { + "blank": not is_required, # All fields are optional by default + "null": not is_required, + "help_text": field_schema.get("description"), + "validators": [], + "verbose_name": verbose_name, + "default": field_schema.get("default"), + } + + if minimum := field_schema.get("minimum"): + kwargs["validators"].append(MinValueValidator(minimum)) + if maximum := field_schema.get("maximum"): + kwargs["validators"].append(MaxValueValidator(maximum)) + + if field_type == "string": + if format == "date-time": + return models.DateTimeField(**kwargs) + elif format == "date": + return models.DateField(**kwargs) + else: + max_length = field_schema.get("max_length") or 255 + if pattern := field_schema.get("pattern"): + kwargs["validators"].append(RegexValidator(regex=pattern)) + if choices := field_schema.get("enum"): + kwargs["choices"] = ((choice, choice) for choice in choices) + return models.CharField(max_length=max_length, **kwargs) + elif field_type == "integer": + return models.IntegerField(**kwargs) + elif field_type == "number": + return models.FloatField(**kwargs) + elif field_type == "boolean": + return models.BooleanField(**kwargs) + elif field_type == "object": + # Here we pass down the requirement status to nested objects + return build_object_fields( + field_schema, + full_name, + verbose_name_prefix=f"{verbose_name}:", + parent_required=is_required, + ) + elif field_type == "array": + kwargs["help_text"] = field_schema.get("description") or _("List of values") + field = models.JSONField(**kwargs) + formfield_kwargs = { + "label": field.verbose_name, + "required": not field.blank, + } + + array_validation = {} + if min_items := field_schema.get("min_items"): + array_validation["min_items"] = min_items + if max_items := field_schema.get("max_items"): + array_validation["max_items"] = max_items + if unique_items := field_schema.get("unique_items"): + array_validation["unique_items"] = unique_items + if items_schema := field_schema.get("items"): + array_validation["items_schema"] = items_schema + if array_validation: + formfield_kwargs["array_validation"] = array_validation + + field.formfield = lambda: DynamicArrayField(**formfield_kwargs) + + return field + return models.CharField(max_length=255, **kwargs) + + +def unnest_data(data): + result = {} + + def _flatten_dict(d, parent_key=""): + for key, value in d.items(): + new_key = f"{parent_key}.{key}" if parent_key else key + if isinstance(value, dict): + _flatten_dict(value, new_key) + else: + result[new_key] = value + + _flatten_dict(data) + return result diff --git a/src/servala/core/crd/utils.py b/src/servala/core/crd/utils.py new file mode 100644 index 0000000..a537fd9 --- /dev/null +++ b/src/servala/core/crd/utils.py @@ -0,0 +1,115 @@ +import re + + +def deslugify(title): + """ + Convert camelCase, PascalCase, or snake_case to human-readable title. + Handles known acronyms (e.g., postgreSQLParameters -> PostgreSQL Parameters). + """ + ACRONYMS = { + # Database systems + "SQL": "SQL", + "MYSQL": "MySQL", + "POSTGRESQL": "PostgreSQL", + "MARIADB": "MariaDB", + "MSSQL": "MSSQL", + "MONGODB": "MongoDB", + "REDIS": "Redis", + # Protocols + "HTTP": "HTTP", + "HTTPS": "HTTPS", + "FTP": "FTP", + "SFTP": "SFTP", + "SSH": "SSH", + "TLS": "TLS", + "SSL": "SSL", + # APIs + "API": "API", + "REST": "REST", + "GRPC": "gRPC", + "GRAPHQL": "GraphQL", + # Networking + "URL": "URL", + "URI": "URI", + "FQDN": "FQDN", + "DNS": "DNS", + "IP": "IP", + "TCP": "TCP", + "UDP": "UDP", + # Data formats + "JSON": "JSON", + "XML": "XML", + "YAML": "YAML", + "CSV": "CSV", + "HTML": "HTML", + "CSS": "CSS", + # Hardware + "CPU": "CPU", + "RAM": "RAM", + "GPU": "GPU", + "SSD": "SSD", + "HDD": "HDD", + # Identifiers + "ID": "ID", + "UUID": "UUID", + "GUID": "GUID", + "ARN": "ARN", + # Cloud providers + "AWS": "AWS", + "GCP": "GCP", + "AZURE": "Azure", + "IBM": "IBM", + # Kubernetes/Cloud + "DB": "DB", + "PVC": "PVC", + "PV": "PV", + "VPN": "VPN", + # Auth + "OS": "OS", + "LDAP": "LDAP", + "SAML": "SAML", + "OAUTH": "OAuth", + "JWT": "JWT", + # AWS Services + "S3": "S3", + "EC2": "EC2", + "RDS": "RDS", + "EBS": "EBS", + "IAM": "IAM", + } + + if "_" in title: + # Handle snake_case + title = title.replace("_", " ") + words = title.split() + else: + # Handle camelCase/PascalCase with smart splitting + # This regex splits on: + # - Transition from lowercase to uppercase (camelCase) + # - Transition from multiple uppercase to an uppercase followed by lowercase (SQLParameters -> SQL Parameters) + words = re.findall(r"[A-Z]+(?=[A-Z][a-z]|\b)|[A-Z][a-z]+|[a-z]+|[0-9]+", title) + + # Merge adjacent words if they form a known compound acronym (e.g., postgre + SQL = PostgreSQL) + merged_words = [] + i = 0 + while i < len(words): + if i < len(words) - 1: + # Check if current word + next word form a known acronym + combined = (words[i] + words[i + 1]).upper() + if combined in ACRONYMS: + merged_words.append(combined) + i += 2 + continue + merged_words.append(words[i]) + i += 1 + + # Capitalize each word, using proper casing for known acronyms + result = [] + for word in merged_words: + word_upper = word.upper() + if word_upper in ACRONYMS: + result.append(ACRONYMS[word_upper]) + else: + result.append(word.capitalize()) + + return " ".join(result) diff --git a/src/servala/core/forms.py b/src/servala/core/forms.py index 034d1c9..090abba 100644 --- a/src/servala/core/forms.py +++ b/src/servala/core/forms.py @@ -1,7 +1,12 @@ +import json +from pathlib import Path + +import jsonschema from django import forms from django.utils.translation import gettext_lazy as _ from django_jsonform.widgets import JSONFormWidget +from servala.core.crd.forms import DEFAULT_FIELD_CONFIGS, MANDATORY_FIELDS from servala.core.models import ControlPlane, ServiceDefinition CONTROL_PLANE_USER_INFO_SCHEMA = { @@ -96,6 +101,12 @@ class ControlPlaneAdminForm(forms.ModelForm): return super().save(*args, **kwargs) +def fields_empty(fields): + if not fields: + return True + return all(not field.get("controlplane_field_mapping") for field in fields) + + class ServiceDefinitionAdminForm(forms.ModelForm): api_group = forms.CharField( required=False, @@ -124,6 +135,10 @@ class ServiceDefinitionAdminForm(forms.ModelForm): self.fields["api_version"].initial = api_def.get("version", "") self.fields["api_kind"].initial = api_def.get("kind", "") + schema_path = Path(__file__).parent / "schemas" / "form_config_schema.json" + with open(schema_path) as f: + self.form_config_schema = json.load(f) + def clean(self): cleaned_data = super().clean() @@ -151,8 +166,250 @@ class ServiceDefinitionAdminForm(forms.ModelForm): api_def["kind"] = api_kind cleaned_data["api_definition"] = api_def + form_config = cleaned_data.get("form_config") + + # Convert empty form_config to None (no custom form) + if form_config: + if not form_config.get("fieldsets") or all( + fields_empty(fieldset.get("fields")) + for fieldset in form_config.get("fieldsets") + ): + form_config = None + cleaned_data["form_config"] = None + + if form_config: + form_config = self._normalize_form_config_types(form_config) + cleaned_data["form_config"] = form_config + + try: + jsonschema.validate( + instance=form_config, schema=self.form_config_schema + ) + except jsonschema.ValidationError as e: + raise forms.ValidationError( + { + "form_config": _("Invalid form configuration: {}").format( + e.message + ) + } + ) + except jsonschema.SchemaError as e: + raise forms.ValidationError( + {"form_config": _("Schema error: {}").format(e.message)} + ) + + self._validate_field_mappings(form_config, cleaned_data) + return cleaned_data + def _normalize_form_config_types(self, form_config): + """ + Normalize form_config by converting string representations of numbers + to actual integers/floats. The JSON form widget sends all values + as strings, but the schema expects proper types. + """ + if not isinstance(form_config, dict): + return form_config + + integer_fields = ["max_length", "rows", "min_values", "max_values"] + number_fields = ["min_value", "max_value"] + + for fieldset in form_config.get("fieldsets", []): + for field in fieldset.get("fields", []): + for field_name in integer_fields: + if field_name in field and field[field_name] is not None: + value = field[field_name] + if isinstance(value, str): + try: + field[field_name] = int(value) if value else None + except (ValueError, TypeError): + pass + + for field_name in number_fields: + if field_name in field and field[field_name] is not None: + value = field[field_name] + if isinstance(value, str): + try: + field[field_name] = ( + int(value) if "." not in value else float(value) + ) + except (ValueError, TypeError): + pass + + return form_config + + def _validate_field_mappings(self, form_config, cleaned_data): + if not self.instance.pk: + return + crd = self.instance.offering_control_planes.all().first() + if not crd: + return + + schema = None + try: + schema = crd.resource_schema + except Exception: + pass + + if not schema or not (spec_schema := schema.get("properties", {}).get("spec")): + return + + valid_paths = self._extract_field_paths(spec_schema, "spec") | {"name"} + included_mappings = set() + errors = [] + for fieldset in form_config.get("fieldsets", []): + for field in fieldset.get("fields", []): + mapping = field.get("controlplane_field_mapping") + included_mappings.add(mapping) + + # Validate that fields without defaults have required properties + if mapping not in DEFAULT_FIELD_CONFIGS: + if not field.get("label"): + errors.append( + _( + "Field with mapping '{}' must have a 'label' property " + "(or use a mapping with default config)" + ).format(mapping) + ) + if not field.get("type"): + errors.append( + _( + "Field with mapping '{}' must have a 'type' property " + "(or use a mapping with default config)" + ).format(mapping) + ) + + if mapping and mapping not in valid_paths: + field_name = field.get("label", field.get("name", mapping)) + errors.append( + _( + "Field '{}' has invalid mapping '{}'. Valid paths are: {}" + ).format( + field_name, + mapping, + ", ".join(sorted(valid_paths)[:10]) + + ("..." if len(valid_paths) > 10 else ""), + ) + ) + + if field.get("type") == "choice" and field.get("choices"): + self._validate_choice_field( + field, mapping, spec_schema, "spec", errors + ) + + for mandatory_field in MANDATORY_FIELDS: + if mandatory_field not in included_mappings: + errors.append( + _( + "Required field '{}' must be included in the form configuration" + ).format(mandatory_field) + ) + + if errors: + raise forms.ValidationError({"form_config": errors}) + + def _validate_choice_field(self, field, mapping, spec_schema, prefix, errors): + if not mapping: + return + + field_name = field.get("label", mapping) + custom_choices = field.get("choices", []) + + # Single-element choices [value] are transformed to [value, value] + for i, choice in enumerate(custom_choices): + if not isinstance(choice, (list, tuple)): + errors.append( + _( + "Field '{}': Choice at index {} must be a list or tuple, " + "but got: {}" + ).format(field_name, i, repr(choice)) + ) + return + + choice_len = len(choice) + if choice_len == 1: + custom_choices[i] = [choice[0], choice[0]] + elif choice_len == 0 or choice_len > 2: + errors.append( + _( + "Field '{}': Choice at index {} must have 1 or 2 elements " + "(got {}): {}" + ).format(field_name, i, choice_len, repr(choice)) + ) + return + + field_schema = self._get_field_schema(spec_schema, mapping, prefix) + if not field_schema: + return + + control_plane_choices = field_schema.get("enum", []) + if not control_plane_choices: + return + + custom_choice_values = [choice[0] for choice in custom_choices] + + invalid_choices = [ + value + for value in custom_choice_values + if value not in control_plane_choices + ] + + if invalid_choices: + errors.append( + _( + "Field '{}' has invalid choice values: {}. " + "Valid choices from control plane are: {}" + ).format( + field_name, + ", ".join(f"'{c}'" for c in invalid_choices), + ", ".join(f"'{c}'" for c in control_plane_choices), + ) + ) + + def _get_field_schema(self, schema, field_path, prefix): + if not field_path or not schema: + return None + + if field_path.startswith(prefix + "."): + field_path = field_path[len(prefix) + 1 :] + + parts = field_path.split(".") + current_schema = schema + + for part in parts: + if not isinstance(current_schema, dict): + return None + + properties = current_schema.get("properties", {}) + if part not in properties: + return None + + current_schema = properties[part] + + return current_schema + + def _extract_field_paths(self, schema, prefix=""): + paths = set() + + if not isinstance(schema, dict): + return paths + + if "type" in schema and schema["type"] != "object": + if prefix: + paths.add(prefix) + + if schema.get("properties"): + for prop_name, prop_schema in schema["properties"].items(): + new_prefix = f"{prefix}.{prop_name}" if prefix else prop_name + paths.add(new_prefix) + paths.update(self._extract_field_paths(prop_schema, new_prefix)) + + if schema.get("type") == "array" and "items" in schema: + if prefix: + paths.add(prefix) + + return paths + def save(self, *args, **kwargs): self.instance.api_definition = self.cleaned_data["api_definition"] return super().save(*args, **kwargs) diff --git a/src/servala/core/migrations/0012_remove_advanced_fields.py b/src/servala/core/migrations/0012_remove_advanced_fields.py new file mode 100644 index 0000000..7d0fecd --- /dev/null +++ b/src/servala/core/migrations/0012_remove_advanced_fields.py @@ -0,0 +1,32 @@ +# Generated by Django 5.2.7 on 2025-10-31 10:40 + +import django.core.validators +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("core", "0012_convert_user_info_to_array"), + ] + + operations = [ + migrations.RemoveField( + model_name="servicedefinition", + name="advanced_fields", + ), + migrations.AlterField( + model_name="organization", + name="name", + field=models.CharField( + max_length=32, + validators=[ + django.core.validators.RegexValidator( + message="Organization name can only contain letters, numbers, and spaces.", + regex="^[A-Za-z0-9\\s]+$", + ) + ], + verbose_name="Name", + ), + ), + ] diff --git a/src/servala/core/migrations/0013_add_form_config.py b/src/servala/core/migrations/0013_add_form_config.py new file mode 100644 index 0000000..2819a6c --- /dev/null +++ b/src/servala/core/migrations/0013_add_form_config.py @@ -0,0 +1,27 @@ +# Generated by Django 5.2.7 on 2025-10-31 10:47 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("core", "0012_remove_advanced_fields"), + ] + + operations = [ + migrations.AddField( + model_name="servicedefinition", + name="form_config", + field=models.JSONField( + blank=True, + help_text=( + "Optional custom form configuration. When provided, this configuration will " + "be used to render the service form instead of auto-generating it from the OpenAPI spec. " + 'Format: {"fieldsets": [{"title": "Section", "fields": [{...}]}]}' + ), + null=True, + verbose_name="Form Configuration", + ), + ), + ] diff --git a/src/servala/core/models/service.py b/src/servala/core/models/service.py index 3af8c89..1535703 100644 --- a/src/servala/core/models/service.py +++ b/src/servala/core/models/service.py @@ -360,15 +360,15 @@ class ServiceDefinition(ServalaModelMixin, models.Model): null=True, blank=True, ) - advanced_fields = models.JSONField( - verbose_name=_("Advanced fields"), + form_config = models.JSONField( + verbose_name=_("Form Configuration"), help_text=_( - "Array of field names that should be hidden behind an 'Advanced' toggle. " - "Use dot notation (e.g., ['spec.parameters.monitoring.enabled', 'spec.parameters.backup.schedule'])" + "Optional custom form configuration. When provided, this configuration will be used " + "to render the service form instead of auto-generating it from the OpenAPI spec. " + 'Format: {"fieldsets": [{"title": "Section", "fields": [{...}]}]}' ), null=True, blank=True, - default=list, ) service = models.ForeignKey( to="Service", @@ -510,9 +510,22 @@ class ControlPlaneCRD(ServalaModelMixin, models.Model): if not self.django_model: return - advanced_fields = self.service_definition.advanced_fields or [] - return generate_model_form_class( - self.django_model, advanced_fields=advanced_fields + return generate_model_form_class(self.django_model) + + @cached_property + def custom_model_form_class(self): + from servala.core.crd import generate_custom_form_class + + if not self.django_model: + return + if not ( + self.service_definition + and self.service_definition.form_config + and self.service_definition.form_config.get("fieldsets") + ): + return + return generate_custom_form_class( + self.service_definition.form_config, self.django_model ) @@ -878,7 +891,6 @@ class ServiceInstance(ServalaModelMixin, models.Model): return return self.context.django_model( name=self.name, - organization=self.organization, context=self.context, spec=self.spec, # We pass -1 as ID in order to make it clear that a) this object exists (remotely), diff --git a/src/servala/core/schemas/form_config_schema.json b/src/servala/core/schemas/form_config_schema.json new file mode 100644 index 0000000..3b01b3f --- /dev/null +++ b/src/servala/core/schemas/form_config_schema.json @@ -0,0 +1,107 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Service Definition Form Configuration Schema", + "description": "Schema for custom form configuration in ServiceDefinition", + "type": "object", + "required": ["fieldsets"], + "properties": { + "fieldsets": { + "type": "array", + "description": "Array of fieldset objects defining form sections", + "minItems": 1, + "items": { + "type": "object", + "required": ["fields"], + "properties": { + "title": { + "type": "string", + "description": "Optional title for the fieldset/tab" + }, + "fields": { + "type": "array", + "description": "Array of field definitions in this fieldset", + "minItems": 1, + "items": { + "type": "object", + "required": ["controlplane_field_mapping"], + "properties": { + "type": { + "type": "string", + "description": "Field type", + "enum": ["", "text", "email", "textarea", "number", "choice", "checkbox", "array"] + }, + "label": { + "type": "string", + "description": "Human-readable field label" + }, + "help_text": { + "type": "string", + "description": "Optional help text displayed below the field" + }, + "required": { + "type": "boolean", + "description": "Whether the field is required", + "default": false + }, + "controlplane_field_mapping": { + "type": "string", + "description": "Dot-notation path mapping to Kubernetes spec field (e.g., 'spec.parameters.service.fqdn')" + }, + "max_length": { + "type": ["integer", "null"], + "description": "Maximum length for text/textarea fields", + "minimum": 1 + }, + "rows": { + "type": ["integer", "null"], + "description": "Number of rows for textarea fields", + "minimum": 1 + }, + "min_value": { + "type": ["number", "null"], + "description": "Minimum value for number fields" + }, + "max_value": { + "type": ["number", "null"], + "description": "Maximum value for number fields" + }, + "choices": { + "type": "array", + "description": "Array of [value, label] pairs for choice fields", + "items": { + "type": "array", + "items": { + "type": "string" + } + } + }, + "min_values": { + "type": ["integer", "null"], + "description": "Minimum number of values for array fields", + "minimum": 0 + }, + "max_values": { + "type": ["integer", "null"], + "description": "Maximum number of values for array fields", + "minimum": 1 + }, + "validators": { + "type": "array", + "description": "Array of validator names (for future use)", + "items": { + "type": "string", + "enum": ["email", "fqdn", "url", "ipv4", "ipv6"] + } + }, + "default_value": { + "type": "string", + "description": "Default value for the field when creating new instances" + } + } + } + } + } + } + } + } +} diff --git a/src/servala/frontend/forms/organization.py b/src/servala/frontend/forms/organization.py index 86ba0ab..45e7b11 100644 --- a/src/servala/frontend/forms/organization.py +++ b/src/servala/frontend/forms/organization.py @@ -8,7 +8,6 @@ from servala.core.models import Organization, OrganizationInvitation, Organizati from servala.core.odoo import get_invoice_addresses, get_odoo_countries from servala.frontend.forms.mixins import HtmxMixin - ORG_NAME_PATTERN = r"[\w\s\-.,&'()+]+" diff --git a/src/servala/frontend/templates/account/login.html b/src/servala/frontend/templates/account/login.html index f4ca590..906a9fa 100644 --- a/src/servala/frontend/templates/account/login.html +++ b/src/servala/frontend/templates/account/login.html @@ -31,7 +31,9 @@ {% for provider in socialaccount_providers %} {% provider_login_url provider process=process scope=scope auth_params=auth_params as href %} -