diff --git a/.forgejo/workflows/renovate.yaml b/.forgejo/workflows/renovate.yaml index 1a0bbe9..19e5ce8 100644 --- a/.forgejo/workflows/renovate.yaml +++ b/.forgejo/workflows/renovate.yaml @@ -19,7 +19,7 @@ jobs: node-version: "24" - name: Renovate - uses: https://github.com/renovatebot/github-action@v44.0.1 + uses: https://github.com/renovatebot/github-action@v43.0.19 with: token: ${{ secrets.RENOVATE_TOKEN }} env: diff --git a/pyproject.toml b/pyproject.toml index b7ad2a7..d77e198 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,8 +7,8 @@ requires-python = ">=3.14.0" dependencies = [ "argon2-cffi>=25.1.0", "cryptography>=46.0.3", - "django==5.2.8", - "django-allauth>=65.13.0", + "django==5.2.7", + "django-allauth>=65.12.1", "django-auditlog>=3.3.0", "django-fernet-encrypted-fields>=0.3.0", "django-jsonform>=2.23.2", @@ -28,15 +28,15 @@ dependencies = [ [dependency-groups] dev = [ - "black>=25.11.0", + "black>=25.9.0", "bumpver>=2025.1131", - "coverage>=7.11.3", + "coverage>=7.11.0", "djlint>=1.36.4", "flake8>=7.3.0", "flake8-bugbear>=25.10.21", "flake8-pyproject>=1.2.3", "isort>=7.0.0", - "pytest>=9.0.0", + "pytest>=8.4.2", "pytest-cov>=7.0.0", "pytest-django>=4.11.1", "pytest-mock>=3.15.1", diff --git a/src/servala/core/admin.py b/src/servala/core/admin.py index fb64a2b..87da376 100644 --- a/src/servala/core/admin.py +++ b/src/servala/core/admin.py @@ -1,6 +1,3 @@ -import json -from pathlib import Path - from django.contrib import admin, messages from django.utils.translation import gettext_lazy as _ from django_jsonform.widgets import JSONFormWidget @@ -316,9 +313,9 @@ class ServiceDefinitionAdmin(admin.ModelAdmin): ( _("Form Configuration"), { - "fields": ("form_config",), + "fields": ("advanced_fields",), "description": _( - "Optional custom form configuration. When provided, this will be used instead of auto-generating the form from the OpenAPI spec." + "Configure which fields should be hidden behind an 'Advanced' toggle in the form" ), }, ), @@ -326,13 +323,19 @@ class ServiceDefinitionAdmin(admin.ModelAdmin): def get_form(self, request, obj=None, **kwargs): form = super().get_form(request, obj, **kwargs) - schema_path = Path(__file__).parent / "schemas" / "form_config_schema.json" - with open(schema_path) as f: - form_config_schema = json.load(f) - - if "form_config" in form.base_fields: - form.base_fields["form_config"].widget = JSONFormWidget( - schema=form_config_schema + # JSON schema for advanced_fields field + advanced_fields_schema = { + "type": "array", + "title": "Advanced Fields", + "items": { + "type": "string", + "title": "Field Name", + "description": "Field name in dot notation (e.g., spec.parameters.monitoring.enabled)", + }, + } + if "advanced_fields" in form.base_fields: + form.base_fields["advanced_fields"].widget = JSONFormWidget( + schema=advanced_fields_schema ) return form diff --git a/src/servala/core/crd.py b/src/servala/core/crd.py new file mode 100644 index 0000000..fe8edbb --- /dev/null +++ b/src/servala/core/crd.py @@ -0,0 +1,557 @@ +import re + +from django import forms +from django.core.validators import MaxValueValidator, MinValueValidator, RegexValidator +from django.db import models +from django.forms.models import ModelForm, ModelFormMetaclass +from django.utils.translation import gettext_lazy as _ + +from servala.core.models import ServiceInstance + + +class CRDModel(models.Model): + """Base class for all virtual CRD models""" + + def __init__(self, **kwargs): + if spec := kwargs.pop("spec", None): + kwargs.update(unnest_data({"spec": spec})) + super().__init__(**kwargs) + + class Meta: + abstract = True + + +def duplicate_field(field_name, model): + # Get the field from the model + field = model._meta.get_field(field_name) + + # Create a new field with the same attributes + new_field = type(field).__new__(type(field)) + new_field.__dict__.update(field.__dict__) + + # Ensure the field is not linked to the original model + new_field.model = None + new_field.auto_created = False + + return new_field + + +def generate_django_model(schema, group, version, kind): + """ + Generates a virtual Django model from a Kubernetes CRD's OpenAPI v3 schema. + """ + # We always need these three fields to know our own name and our full namespace + model_fields = {"__module__": "crd_models"} + for field_name in ("name", "organization", "context"): + model_fields[field_name] = duplicate_field(field_name, ServiceInstance) + + # All other fields are generated from the schema, except for the + # resourceRef object + spec = schema["properties"].get("spec") or {} + spec["properties"].pop("resourceRef", None) + model_fields.update(build_object_fields(spec, "spec", parent_required=False)) + + # Store the original schema on the model class + model_fields["SCHEMA"] = schema + + meta_class = type("Meta", (), {"app_label": "crd_models"}) + model_fields["Meta"] = meta_class + + # create the model class + model_name = kind + model_class = type(model_name, (CRDModel,), model_fields) + return model_class + + +def build_object_fields(schema, name, verbose_name_prefix=None, parent_required=False): + required_fields = schema.get("required") or [] + properties = schema.get("properties") or {} + fields = {} + + for field_name, field_schema in properties.items(): + is_required = field_name in required_fields or parent_required + full_name = f"{name}.{field_name}" + result = get_django_field( + field_schema, + is_required, + field_name, + full_name, + verbose_name_prefix=verbose_name_prefix, + ) + if isinstance(result, dict): + fields.update(result) + else: + fields[full_name] = result + return fields + + +def deslugify(title): + """ + Convert camelCase, PascalCase, or snake_case to human-readable title. + Handles known acronyms (e.g., postgreSQLParameters -> PostgreSQL Parameters). + """ + ACRONYMS = { + # Database systems + "SQL": "SQL", + "MYSQL": "MySQL", + "POSTGRESQL": "PostgreSQL", + "MARIADB": "MariaDB", + "MSSQL": "MSSQL", + "MONGODB": "MongoDB", + "REDIS": "Redis", + # Protocols + "HTTP": "HTTP", + "HTTPS": "HTTPS", + "FTP": "FTP", + "SFTP": "SFTP", + "SSH": "SSH", + "TLS": "TLS", + "SSL": "SSL", + # APIs + "API": "API", + "REST": "REST", + "GRPC": "gRPC", + "GRAPHQL": "GraphQL", + # Networking + "URL": "URL", + "URI": "URI", + "FQDN": "FQDN", + "DNS": "DNS", + "IP": "IP", + "TCP": "TCP", + "UDP": "UDP", + # Data formats + "JSON": "JSON", + "XML": "XML", + "YAML": "YAML", + "CSV": "CSV", + "HTML": "HTML", + "CSS": "CSS", + # Hardware + "CPU": "CPU", + "RAM": "RAM", + "GPU": "GPU", + "SSD": "SSD", + "HDD": "HDD", + # Identifiers + "ID": "ID", + "UUID": "UUID", + "GUID": "GUID", + "ARN": "ARN", + # Cloud providers + "AWS": "AWS", + "GCP": "GCP", + "AZURE": "Azure", + "IBM": "IBM", + # Kubernetes/Cloud + "DB": "DB", + "PVC": "PVC", + "PV": "PV", + "VPN": "VPN", + # Auth + "OS": "OS", + "LDAP": "LDAP", + "SAML": "SAML", + "OAUTH": "OAuth", + "JWT": "JWT", + # AWS Services + "S3": "S3", + "EC2": "EC2", + "RDS": "RDS", + "EBS": "EBS", + "IAM": "IAM", + } + + if "_" in title: + # Handle snake_case + title = title.replace("_", " ") + words = title.split() + else: + # Handle camelCase/PascalCase with smart splitting + # This regex splits on: + # - Transition from lowercase to uppercase (camelCase) + # - Transition from multiple uppercase to an uppercase followed by lowercase (SQLParameters -> SQL Parameters) + words = re.findall(r"[A-Z]+(?=[A-Z][a-z]|\b)|[A-Z][a-z]+|[a-z]+|[0-9]+", title) + + # Merge adjacent words if they form a known compound acronym (e.g., postgre + SQL = PostgreSQL) + merged_words = [] + i = 0 + while i < len(words): + if i < len(words) - 1: + # Check if current word + next word form a known acronym + combined = (words[i] + words[i + 1]).upper() + if combined in ACRONYMS: + merged_words.append(combined) + i += 2 + continue + merged_words.append(words[i]) + i += 1 + + # Capitalize each word, using proper casing for known acronyms + result = [] + for word in merged_words: + word_upper = word.upper() + if word_upper in ACRONYMS: + result.append(ACRONYMS[word_upper]) + else: + result.append(word.capitalize()) + + return " ".join(result) + + +def get_django_field( + field_schema, is_required, field_name, full_name, verbose_name_prefix=None +): + field_type = field_schema.get("type") or "string" + format = field_schema.get("format") + verbose_name_prefix = verbose_name_prefix or "" + verbose_name = f"{verbose_name_prefix} {deslugify(field_name)}".strip() + + # Pass down the requirement status from parent to child fields + kwargs = { + "blank": not is_required, # All fields are optional by default + "null": not is_required, + "help_text": field_schema.get("description"), + "validators": [], + "verbose_name": verbose_name, + "default": field_schema.get("default"), + } + + if minimum := field_schema.get("minimum"): + kwargs["validators"].append(MinValueValidator(minimum)) + if maximum := field_schema.get("maximum"): + kwargs["validators"].append(MaxValueValidator(maximum)) + + if field_type == "string": + if format == "date-time": + return models.DateTimeField(**kwargs) + elif format == "date": + return models.DateField(**kwargs) + else: + max_length = field_schema.get("max_length") or 255 + if pattern := field_schema.get("pattern"): + kwargs["validators"].append(RegexValidator(regex=pattern)) + if choices := field_schema.get("enum"): + kwargs["choices"] = ((choice, choice) for choice in choices) + return models.CharField(max_length=max_length, **kwargs) + elif field_type == "integer": + return models.IntegerField(**kwargs) + elif field_type == "number": + return models.FloatField(**kwargs) + elif field_type == "boolean": + return models.BooleanField(**kwargs) + elif field_type == "object": + # Here we pass down the requirement status to nested objects + return build_object_fields( + field_schema, + full_name, + verbose_name_prefix=f"{verbose_name}:", + parent_required=is_required, + ) + elif field_type == "array": + kwargs["help_text"] = field_schema.get("description") or _("List of values") + from servala.frontend.forms.widgets import DynamicArrayField + + field = models.JSONField(**kwargs) + formfield_kwargs = { + "label": field.verbose_name, + "required": not field.blank, + } + + array_validation = {} + if min_items := field_schema.get("min_items"): + array_validation["min_items"] = min_items + if max_items := field_schema.get("max_items"): + array_validation["max_items"] = max_items + if unique_items := field_schema.get("unique_items"): + array_validation["unique_items"] = unique_items + if items_schema := field_schema.get("items"): + array_validation["items_schema"] = items_schema + if array_validation: + formfield_kwargs["array_validation"] = array_validation + + field.formfield = lambda: DynamicArrayField(**formfield_kwargs) + + return field + return models.CharField(max_length=255, **kwargs) + + +def unnest_data(data): + result = {} + + def _flatten_dict(d, parent_key=""): + for key, value in d.items(): + new_key = f"{parent_key}.{key}" if parent_key else key + if isinstance(value, dict): + _flatten_dict(value, new_key) + else: + result[new_key] = value + + _flatten_dict(data) + return result + + +class CrdModelFormMixin: + HIDDEN_FIELDS = [ + "spec.compositeDeletePolicy", + "spec.compositionRef", + "spec.compositionRevisionRef", + "spec.compositionRevisionSelector", + "spec.compositionSelector", + "spec.compositionUpdatePolicy", + "spec.parameters.monitoring.alertmanagerConfigRef", + "spec.parameters.monitoring.alertmanagerConfigSecretRef", + "spec.parameters.network.serviceType", + "spec.parameters.scheduling", + "spec.parameters.security", + "spec.parameters.size.cpu", + "spec.parameters.size.memory", + "spec.parameters.size.requests.cpu", + "spec.parameters.size.requests.memory", + "spec.publishConnectionDetailsTo", + "spec.resourceRef", + "spec.writeConnectionSecretToRef", + ] + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.schema = self._meta.model.SCHEMA + + for field in ("organization", "context"): + self.fields[field].widget = forms.HiddenInput() + + for name, field in self.fields.items(): + if name in self.HIDDEN_FIELDS or any( + name.startswith(f) for f in self.HIDDEN_FIELDS + ): + field.widget = forms.HiddenInput() + field.required = False + + # Mark advanced fields with a CSS class and data attribute + for name, field in self.fields.items(): + if self.is_field_advanced(name): + field.widget.attrs.update( + { + "class": ( + field.widget.attrs.get("class", "") + " advanced-field" + ).strip(), + "data-advanced": "true", + } + ) + + if self.instance and self.instance.pk: + self.fields["name"].disabled = True + self.fields["name"].help_text = _("Name cannot be changed after creation.") + self.fields["name"].widget = forms.HiddenInput() + + def strip_title(self, field_name, label): + field = self.fields[field_name] + if field and field.label and (position := field.label.find(label)) != -1: + field.label = field.label[position + len(label) :] + + def has_mandatory_fields(self, field_list): + for field_name in field_list: + if field_name in self.fields and self.fields[field_name].required: + return True + return False + + def is_field_advanced(self, field_name): + advanced_fields = getattr(self, "ADVANCED_FIELDS", []) + return field_name in advanced_fields or any( + field_name.startswith(f"{af}.") for af in advanced_fields + ) + + def are_all_fields_advanced(self, field_list): + if not field_list: + return False + return all(self.is_field_advanced(field_name) for field_name in field_list) + + def get_fieldsets(self): + fieldsets = [] + + # General fieldset for non-spec fields + general_fields = [ + field_name + for field_name in self.fields.keys() + if not field_name.startswith("spec.") + ] + if general_fields: + fieldset = { + "title": "General", + "fields": general_fields, + "fieldsets": [], + "has_mandatory": self.has_mandatory_fields(general_fields), + "is_advanced": self.are_all_fields_advanced(general_fields), + } + if all( + [ + isinstance(self.fields[field].widget, forms.HiddenInput) + for field in general_fields + ] + ): + fieldset["hidden"] = True + fieldsets.append(fieldset) + + # Process spec fields + others = [] + top_level_fieldsets = {} + hidden_spec_fields = [] + + for field_name in self.fields: + if field_name.startswith("spec."): + if isinstance(self.fields[field_name].widget, forms.HiddenInput): + hidden_spec_fields.append(field_name) + continue + + parts = field_name.split(".") + if len(parts) == 2: + # Top-level spec field + others.append(field_name) + elif len(parts) == 3: + # Second-level field - promote to top-level fieldset + fieldset_key = f"{parts[1]}.{parts[2]}" + if not top_level_fieldsets.get(fieldset_key): + top_level_fieldsets[fieldset_key] = { + "fields": [], + "fieldsets": {}, + "title": f"{deslugify(parts[2])}", + } + top_level_fieldsets[fieldset_key]["fields"].append(field_name) + else: + # Third-level and deeper - create nested fieldsets + fieldset_key = f"{parts[1]}.{parts[2]}" + if not top_level_fieldsets.get(fieldset_key): + top_level_fieldsets[fieldset_key] = { + "fields": [], + "fieldsets": {}, + "title": f"{deslugify(parts[2])}", + } + + sub_key = parts[3] + if not top_level_fieldsets[fieldset_key]["fieldsets"].get(sub_key): + top_level_fieldsets[fieldset_key]["fieldsets"][sub_key] = { + "title": deslugify(sub_key), + "fields": [], + } + top_level_fieldsets[fieldset_key]["fieldsets"][sub_key][ + "fields" + ].append(field_name) + + for fieldset in top_level_fieldsets.values(): + nested_fieldsets_list = [] + for sub_fieldset in fieldset["fieldsets"].values(): + if len(sub_fieldset["fields"]) == 1: + # If nested fieldset has only one field, move it to parent + fieldset["fields"].append(sub_fieldset["fields"][0]) + else: + # Keep as nested fieldset with proper title stripping + title = f"{fieldset['title']}: {sub_fieldset['title']}: " + for field in sub_fieldset["fields"]: + self.strip_title(field, title) + sub_fieldset["is_advanced"] = self.are_all_fields_advanced( + sub_fieldset["fields"] + ) + nested_fieldsets_list.append(sub_fieldset) + + fieldset["fieldsets"] = nested_fieldsets_list + total_fields = len(fieldset["fields"]) + len(nested_fieldsets_list) + if total_fields == 1 and len(fieldset["fields"]) == 1: + others.append(fieldset["fields"][0]) + else: + title = f"{fieldset['title']}: " + for field in fieldset["fields"]: + self.strip_title(field, title) + + all_fields = fieldset["fields"][:] + for sub_fieldset in nested_fieldsets_list: + all_fields.extend(sub_fieldset["fields"]) + fieldset["has_mandatory"] = self.has_mandatory_fields(all_fields) + + fieldset["is_advanced"] = self.are_all_fields_advanced(all_fields) + + fieldsets.append(fieldset) + + # Add 'others' tab if there are any fields + if others: + fieldsets.append( + { + "title": "Others", + "fields": others, + "fieldsets": [], + "has_mandatory": self.has_mandatory_fields(others), + "is_advanced": self.are_all_fields_advanced(others), + } + ) + + if hidden_spec_fields: + fieldsets.append( + { + "title": "Advanced", + "fields": hidden_spec_fields, + "fieldsets": [], + "hidden": True, + "has_mandatory": self.has_mandatory_fields(hidden_spec_fields), + } + ) + + fieldsets.sort(key=lambda f: f.get("hidden", False)) + + return fieldsets + + def get_nested_data(self): + """ + Builds the original nested JSON structure from flat form data. + Form fields are named with dot notation (e.g., 'spec.replicas') + """ + result = {} + + for field_name, value in self.cleaned_data.items(): + if value is None or value == "": + continue + + parts = field_name.split(".") + current = result + + # Navigate through the nested structure + for i, part in enumerate(parts): + if i == len(parts) - 1: + # Last part, set the value + current[part] = value + else: + # Create nested dict if it doesn't exist + if part not in current: + current[part] = {} + current = current[part] + return result + + def clean(self): + cleaned_data = super().clean() + self.validate_nested_data( + self.get_nested_data().get("spec", {}), self.schema["properties"]["spec"] + ) + return cleaned_data + + def validate_nested_data(self, data, schema): + """Validate data against the provided OpenAPI v3 schema""" + # TODO: actually validate the nested data. + # TODO: get jsonschema to give us a path to the failing field rather than just an error message, + # then add the validation error to that field (self.add_error()) + # try: + # validate(instance=data, schema=schema) + # except Exception as e: + # raise forms.ValidationError(f"Validation error: {e.message}") + pass + + +def generate_model_form_class(model, advanced_fields=None): + meta_attrs = { + "model": model, + "fields": "__all__", + } + fields = { + "Meta": type("Meta", (object,), meta_attrs), + "__module__": "crd_models", + "ADVANCED_FIELDS": advanced_fields or [], + } + class_name = f"{model.__name__}ModelForm" + return ModelFormMetaclass(class_name, (CrdModelFormMixin, ModelForm), fields) diff --git a/src/servala/core/crd/__init__.py b/src/servala/core/crd/__init__.py deleted file mode 100644 index 3f10abb..0000000 --- a/src/servala/core/crd/__init__.py +++ /dev/null @@ -1,31 +0,0 @@ -from servala.core.crd.forms import ( - CrdModelFormMixin, - CustomFormMixin, - FormGeneratorMixin, - generate_custom_form_class, - generate_model_form_class, -) -from servala.core.crd.models import ( - CRDModel, - build_object_fields, - duplicate_field, - generate_django_model, - get_django_field, - unnest_data, -) -from servala.core.crd.utils import deslugify - -__all__ = [ - "CrdModelFormMixin", - "CustomFormMixin", - "FormGeneratorMixin", - "generate_django_model", - "generate_model_form_class", - "generate_custom_form_class", - "CRDModel", - "build_object_fields", - "duplicate_field", - "get_django_field", - "unnest_data", - "deslugify", -] diff --git a/src/servala/core/crd/forms.py b/src/servala/core/crd/forms.py deleted file mode 100644 index 659684e..0000000 --- a/src/servala/core/crd/forms.py +++ /dev/null @@ -1,454 +0,0 @@ -from django import forms -from django.core.validators import MaxValueValidator, MinValueValidator -from django.forms.models import ModelForm, ModelFormMetaclass - -from servala.core.crd.utils import deslugify -from servala.core.models import ControlPlaneCRD -from servala.frontend.forms.widgets import DynamicArrayWidget - -# Fields that must be present in every form -MANDATORY_FIELDS = ["name"] - -# Default field configurations - fields that can be included with just a mapping -# to avoid administrators having to duplicate common information -DEFAULT_FIELD_CONFIGS = { - "name": { - "type": "text", - "label": "Instance Name", - "help_text": "Unique name for the new instance", - "required": True, - "max_length": 63, - }, - "spec.parameters.service.fqdn": { - "type": "array", - "label": "FQDNs", - "help_text": "Domain names for accessing this service", - "required": False, - }, -} - - -class FormGeneratorMixin: - """Shared base class for ModelForm classes based on our generated CRD models. - There are two relevant child classes: - - CrdModelFormMixin: For fully auto-generated forms from the spec - - CustomFormMixin: For forms built from form_config settings. - """ - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - if "context" in self.fields: - self.fields["context"].widget = forms.HiddenInput() - if crd := self.initial.get("context"): - crd = getattr(crd, "pk", crd) # can be int or object - self.fields["context"].queryset = ControlPlaneCRD.objects.filter(pk=crd) - - if self.instance and hasattr(self.instance, "name") and self.instance.name: - if "name" in self.fields: - self.fields["name"].disabled = True - self.fields["name"].widget = forms.HiddenInput() - - def has_mandatory_fields(self, field_list): - for field_name in field_list: - if field_name in self.fields and self.fields[field_name].required: - return True - return False - - -class CrdModelFormMixin(FormGeneratorMixin): - HIDDEN_FIELDS = [ - "spec.compositeDeletePolicy", - "spec.compositionRef", - "spec.compositionRevisionRef", - "spec.compositionRevisionSelector", - "spec.compositionSelector", - "spec.compositionUpdatePolicy", - "spec.parameters.monitoring.alertmanagerConfigRef", - "spec.parameters.monitoring.alertmanagerConfigSecretRef", - "spec.parameters.network.serviceType", - "spec.parameters.scheduling", - "spec.parameters.security", - "spec.parameters.size.cpu", - "spec.parameters.size.memory", - "spec.parameters.size.requests.cpu", - "spec.parameters.size.requests.memory", - "spec.publishConnectionDetailsTo", - "spec.resourceRef", - "spec.writeConnectionSecretToRef", - ] - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.schema = self._meta.model.SCHEMA - - for name, field in self.fields.items(): - if name in self.HIDDEN_FIELDS or any( - name.startswith(f) for f in self.HIDDEN_FIELDS - ): - field.widget = forms.HiddenInput() - field.required = False - - def strip_title(self, field_name, label): - field = self.fields[field_name] - if field and field.label and (position := field.label.find(label)) != -1: - field.label = field.label[position + len(label) :] - - def get_fieldsets(self): - fieldsets = [] - - # General fieldset for non-spec fields - general_fields = [ - field_name - for field_name in self.fields.keys() - if not field_name.startswith("spec.") - ] - if general_fields: - fieldset = { - "title": "General", - "fields": general_fields, - "fieldsets": [], - "has_mandatory": self.has_mandatory_fields(general_fields), - } - if all( - [ - isinstance(self.fields[field].widget, forms.HiddenInput) - for field in general_fields - ] - ): - fieldset["hidden"] = True - fieldsets.append(fieldset) - - # Process spec fields - others = [] - top_level_fieldsets = {} - hidden_spec_fields = [] - - for field_name in self.fields: - if field_name.startswith("spec."): - if isinstance(self.fields[field_name].widget, forms.HiddenInput): - hidden_spec_fields.append(field_name) - continue - - parts = field_name.split(".") - if len(parts) == 2: - # Top-level spec field - others.append(field_name) - elif len(parts) == 3: - # Second-level field - promote to top-level fieldset - fieldset_key = f"{parts[1]}.{parts[2]}" - if not top_level_fieldsets.get(fieldset_key): - top_level_fieldsets[fieldset_key] = { - "fields": [], - "fieldsets": {}, - "title": f"{deslugify(parts[2])}", - } - top_level_fieldsets[fieldset_key]["fields"].append(field_name) - else: - # Third-level and deeper - create nested fieldsets - fieldset_key = f"{parts[1]}.{parts[2]}" - if not top_level_fieldsets.get(fieldset_key): - top_level_fieldsets[fieldset_key] = { - "fields": [], - "fieldsets": {}, - "title": f"{deslugify(parts[2])}", - } - - sub_key = parts[3] - if not top_level_fieldsets[fieldset_key]["fieldsets"].get(sub_key): - top_level_fieldsets[fieldset_key]["fieldsets"][sub_key] = { - "title": deslugify(sub_key), - "fields": [], - } - top_level_fieldsets[fieldset_key]["fieldsets"][sub_key][ - "fields" - ].append(field_name) - - for fieldset in top_level_fieldsets.values(): - nested_fieldsets_list = [] - for sub_fieldset in fieldset["fieldsets"].values(): - if len(sub_fieldset["fields"]) == 1: - # If nested fieldset has only one field, move it to parent - fieldset["fields"].append(sub_fieldset["fields"][0]) - else: - # Keep as nested fieldset with proper title stripping - title = f"{fieldset['title']}: {sub_fieldset['title']}: " - for field in sub_fieldset["fields"]: - self.strip_title(field, title) - nested_fieldsets_list.append(sub_fieldset) - - fieldset["fieldsets"] = nested_fieldsets_list - total_fields = len(fieldset["fields"]) + len(nested_fieldsets_list) - if total_fields == 1 and len(fieldset["fields"]) == 1: - others.append(fieldset["fields"][0]) - else: - title = f"{fieldset['title']}: " - for field in fieldset["fields"]: - self.strip_title(field, title) - - all_fields = fieldset["fields"][:] - for sub_fieldset in nested_fieldsets_list: - all_fields.extend(sub_fieldset["fields"]) - fieldset["has_mandatory"] = self.has_mandatory_fields(all_fields) - - fieldsets.append(fieldset) - - # Add 'others' tab if there are any fields - if others: - fieldsets.append( - { - "title": "Others", - "fields": others, - "fieldsets": [], - "has_mandatory": self.has_mandatory_fields(others), - } - ) - - if hidden_spec_fields: - fieldsets.append( - { - "title": "Advanced", - "fields": hidden_spec_fields, - "fieldsets": [], - "hidden": True, - "has_mandatory": self.has_mandatory_fields(hidden_spec_fields), - } - ) - - fieldsets.sort(key=lambda f: f.get("hidden", False)) - - return fieldsets - - def get_nested_data(self): - """ - Builds the original nested JSON structure from flat form data. - Form fields are named with dot notation (e.g., 'spec.replicas') - """ - result = {} - - for field_name, value in self.cleaned_data.items(): - if value is None or value == "": - continue - - parts = field_name.split(".") - current = result - - # Navigate through the nested structure - for i, part in enumerate(parts): - if i == len(parts) - 1: - # Last part, set the value - current[part] = value - else: - # Create nested dict if it doesn't exist - if part not in current: - current[part] = {} - current = current[part] - return result - - def clean(self): - cleaned_data = super().clean() - self.validate_nested_data( - self.get_nested_data().get("spec", {}), self.schema["properties"]["spec"] - ) - return cleaned_data - - def validate_nested_data(self, data, schema): - """Validate data against the provided OpenAPI v3 schema""" - # TODO: actually validate the nested data. - # TODO: get jsonschema to give us a path to the failing field rather than just an error message, - # then add the validation error to that field (self.add_error()) - # try: - # validate(instance=data, schema=schema) - # except Exception as e: - # raise forms.ValidationError(f"Validation error: {e.message}") - pass - - -def generate_model_form_class(model): - meta_attrs = { - "model": model, - "fields": "__all__", - } - fields = { - "Meta": type("Meta", (object,), meta_attrs), - "__module__": "crd_models", - } - class_name = f"{model.__name__}ModelForm" - return ModelFormMetaclass(class_name, (CrdModelFormMixin, ModelForm), fields) - - -class CustomFormMixin(FormGeneratorMixin): - """ - Base for custom (user-friendly) forms generated from ServiceDefinition.form_config. - """ - - IS_CUSTOM_FORM = True - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._apply_field_config() - if ( - self.instance - and hasattr(self.instance, "name") - and self.instance.name - and "name" in self.fields - ): - self.fields["name"].widget = forms.HiddenInput() - self.fields["name"].disabled = True - self.fields.pop("context", None) - - def _apply_field_config(self): - for fieldset in self.form_config.get("fieldsets", []): - for fc in fieldset.get("fields", []): - field_name = fc.get("controlplane_field_mapping") - - if field_name not in self.fields: - continue - - field_config = fc.copy() - # Merge with defaults if field has default config - if field_name in DEFAULT_FIELD_CONFIGS: - field_config = DEFAULT_FIELD_CONFIGS[field_name].copy() - for key, value in fc.items(): - if value or (value is False): - field_config[key] = value - - field = self.fields[field_name] - field_type = field_config.get("type") - - field.label = field_config.get("label", field_name) - field.help_text = field_config.get("help_text", "") - field.required = field_config.get("required", False) - - if field_type == "textarea": - field.widget = forms.Textarea( - attrs={"rows": field_config.get("rows", 4)} - ) - elif field_type == "array": - field.widget = DynamicArrayWidget() - elif field_type == "choice": - if hasattr(field, "choices") and field.choices: - field._controlplane_choices = list(field.choices) - if custom_choices := field_config.get("choices"): - field.choices = [tuple(choice) for choice in custom_choices] - - if field_type == "number": - min_val = field_config.get("min_value") - max_val = field_config.get("max_value") - - validators = [] - if min_val is not None: - validators.append(MinValueValidator(min_val)) - field.widget.attrs["min"] = min_val - if max_val is not None: - validators.append(MaxValueValidator(max_val)) - field.widget.attrs["max"] = max_val - - if validators: - field.validators.extend(validators) - - if "default_value" in field_config and field.initial is None: - field.initial = field_config["default_value"] - - if field_type in ("text", "textarea") and field_config.get( - "max_length" - ): - field.max_length = field_config.get("max_length") - if hasattr(field.widget, "attrs"): - field.widget.attrs["maxlength"] = field_config.get("max_length") - - field.controlplane_field_mapping = field_name - - def get_fieldsets(self): - fieldsets = [] - for fieldset_config in self.form_config.get("fieldsets", []): - field_names = [ - f["controlplane_field_mapping"] - for f in fieldset_config.get("fields", []) - ] - fieldset = { - "title": fieldset_config.get("title", "General"), - "fields": field_names, - "fieldsets": [], - "has_mandatory": self.has_mandatory_fields(field_names), - } - fieldsets.append(fieldset) - - return fieldsets - - def clean(self): - cleaned_data = super().clean() - - for field_name, field in self.fields.items(): - if hasattr(field, "_controlplane_choices"): - value = cleaned_data.get(field_name) - if value: - valid_values = [choice[0] for choice in field._controlplane_choices] - if value not in valid_values: - self.add_error( - field_name, - forms.ValidationError( - f"'{value}' is not a valid choice. " - f"Must be one of: {valid_values.join(', ')}" - ), - ) - - return cleaned_data - - def get_nested_data(self): - nested = {} - for field_name in self.fields.keys(): - if field_name == "context": - value = self.cleaned_data.get(field_name) - if value is not None: - nested[field_name] = value - continue - - mapping = field_name - value = self.cleaned_data.get(field_name) - parts = mapping.split(".") - current = nested - for part in parts[:-1]: - if part not in current: - current[part] = {} - current = current[part] - - current[parts[-1]] = value - - return nested - - -def generate_custom_form_class(form_config, model): - """ - Generate a custom (user-friendly) form class from form_config JSON. - """ - field_list = ["context", "name"] - - for fieldset in form_config.get("fieldsets", []): - for field_config in fieldset.get("fields", []): - field_name = field_config.get("controlplane_field_mapping") - if field_name: - field_list.append(field_name) - - fields = { - "context": forms.ModelChoiceField( - queryset=ControlPlaneCRD.objects.none(), - required=True, - widget=forms.HiddenInput(), - ), - } - - meta_attrs = { - "model": model, - "fields": field_list, - } - - form_fields = { - "Meta": type("Meta", (object,), meta_attrs), - "__module__": "crd_models", - "form_config": form_config, - **fields, - } - - class_name = f"{model.__name__}CustomForm" - return ModelFormMetaclass(class_name, (CustomFormMixin, ModelForm), form_fields) diff --git a/src/servala/core/crd/models.py b/src/servala/core/crd/models.py deleted file mode 100644 index 86df97f..0000000 --- a/src/servala/core/crd/models.py +++ /dev/null @@ -1,167 +0,0 @@ -from django.core.validators import MaxValueValidator, MinValueValidator, RegexValidator -from django.db import models -from django.utils.translation import gettext_lazy as _ - -from servala.core.crd.utils import deslugify -from servala.core.models import ServiceInstance -from servala.frontend.forms.widgets import DynamicArrayField - - -class CRDModel(models.Model): - """Base class for all virtual CRD models""" - - def __init__(self, **kwargs): - if spec := kwargs.pop("spec", None): - kwargs.update(unnest_data({"spec": spec})) - super().__init__(**kwargs) - - class Meta: - abstract = True - - -def generate_django_model(schema, group, version, kind): - """ - Generates a virtual Django model from a Kubernetes CRD's OpenAPI v3 schema. - """ - # We always need these three fields to know our own name and our full namespace - model_fields = {"__module__": "crd_models"} - for field_name in ("name", "context"): - model_fields[field_name] = duplicate_field(field_name, ServiceInstance) - - # All other fields are generated from the schema, except for the - # resourceRef object - spec = schema["properties"].get("spec") or {} - spec["properties"].pop("resourceRef", None) - model_fields.update(build_object_fields(spec, "spec", parent_required=False)) - - # Store the original schema on the model class - model_fields["SCHEMA"] = schema - - meta_class = type("Meta", (), {"app_label": "crd_models"}) - model_fields["Meta"] = meta_class - - # create the model class - model_name = kind - model_class = type(model_name, (CRDModel,), model_fields) - return model_class - - -def duplicate_field(field_name, model): - field = model._meta.get_field(field_name) - new_field = type(field).__new__(type(field)) - new_field.__dict__.update(field.__dict__) - new_field.model = None - new_field.auto_created = False - return new_field - - -def build_object_fields(schema, name, verbose_name_prefix=None, parent_required=False): - required_fields = schema.get("required") or [] - properties = schema.get("properties") or {} - fields = {} - - for field_name, field_schema in properties.items(): - is_required = field_name in required_fields or parent_required - full_name = f"{name}.{field_name}" - result = get_django_field( - field_schema, - is_required, - field_name, - full_name, - verbose_name_prefix=verbose_name_prefix, - ) - if isinstance(result, dict): - fields.update(result) - else: - fields[full_name] = result - return fields - - -def get_django_field( - field_schema, is_required, field_name, full_name, verbose_name_prefix=None -): - field_type = field_schema.get("type") or "string" - format = field_schema.get("format") - verbose_name_prefix = verbose_name_prefix or "" - verbose_name = f"{verbose_name_prefix} {deslugify(field_name)}".strip() - - # Pass down the requirement status from parent to child fields - kwargs = { - "blank": not is_required, # All fields are optional by default - "null": not is_required, - "help_text": field_schema.get("description"), - "validators": [], - "verbose_name": verbose_name, - "default": field_schema.get("default"), - } - - if minimum := field_schema.get("minimum"): - kwargs["validators"].append(MinValueValidator(minimum)) - if maximum := field_schema.get("maximum"): - kwargs["validators"].append(MaxValueValidator(maximum)) - - if field_type == "string": - if format == "date-time": - return models.DateTimeField(**kwargs) - elif format == "date": - return models.DateField(**kwargs) - else: - max_length = field_schema.get("max_length") or 255 - if pattern := field_schema.get("pattern"): - kwargs["validators"].append(RegexValidator(regex=pattern)) - if choices := field_schema.get("enum"): - kwargs["choices"] = ((choice, choice) for choice in choices) - return models.CharField(max_length=max_length, **kwargs) - elif field_type == "integer": - return models.IntegerField(**kwargs) - elif field_type == "number": - return models.FloatField(**kwargs) - elif field_type == "boolean": - return models.BooleanField(**kwargs) - elif field_type == "object": - # Here we pass down the requirement status to nested objects - return build_object_fields( - field_schema, - full_name, - verbose_name_prefix=f"{verbose_name}:", - parent_required=is_required, - ) - elif field_type == "array": - kwargs["help_text"] = field_schema.get("description") or _("List of values") - field = models.JSONField(**kwargs) - formfield_kwargs = { - "label": field.verbose_name, - "required": not field.blank, - } - - array_validation = {} - if min_items := field_schema.get("min_items"): - array_validation["min_items"] = min_items - if max_items := field_schema.get("max_items"): - array_validation["max_items"] = max_items - if unique_items := field_schema.get("unique_items"): - array_validation["unique_items"] = unique_items - if items_schema := field_schema.get("items"): - array_validation["items_schema"] = items_schema - if array_validation: - formfield_kwargs["array_validation"] = array_validation - - field.formfield = lambda: DynamicArrayField(**formfield_kwargs) - - return field - return models.CharField(max_length=255, **kwargs) - - -def unnest_data(data): - result = {} - - def _flatten_dict(d, parent_key=""): - for key, value in d.items(): - new_key = f"{parent_key}.{key}" if parent_key else key - if isinstance(value, dict): - _flatten_dict(value, new_key) - else: - result[new_key] = value - - _flatten_dict(data) - return result diff --git a/src/servala/core/crd/utils.py b/src/servala/core/crd/utils.py deleted file mode 100644 index a537fd9..0000000 --- a/src/servala/core/crd/utils.py +++ /dev/null @@ -1,115 +0,0 @@ -import re - - -def deslugify(title): - """ - Convert camelCase, PascalCase, or snake_case to human-readable title. - Handles known acronyms (e.g., postgreSQLParameters -> PostgreSQL Parameters). - """ - ACRONYMS = { - # Database systems - "SQL": "SQL", - "MYSQL": "MySQL", - "POSTGRESQL": "PostgreSQL", - "MARIADB": "MariaDB", - "MSSQL": "MSSQL", - "MONGODB": "MongoDB", - "REDIS": "Redis", - # Protocols - "HTTP": "HTTP", - "HTTPS": "HTTPS", - "FTP": "FTP", - "SFTP": "SFTP", - "SSH": "SSH", - "TLS": "TLS", - "SSL": "SSL", - # APIs - "API": "API", - "REST": "REST", - "GRPC": "gRPC", - "GRAPHQL": "GraphQL", - # Networking - "URL": "URL", - "URI": "URI", - "FQDN": "FQDN", - "DNS": "DNS", - "IP": "IP", - "TCP": "TCP", - "UDP": "UDP", - # Data formats - "JSON": "JSON", - "XML": "XML", - "YAML": "YAML", - "CSV": "CSV", - "HTML": "HTML", - "CSS": "CSS", - # Hardware - "CPU": "CPU", - "RAM": "RAM", - "GPU": "GPU", - "SSD": "SSD", - "HDD": "HDD", - # Identifiers - "ID": "ID", - "UUID": "UUID", - "GUID": "GUID", - "ARN": "ARN", - # Cloud providers - "AWS": "AWS", - "GCP": "GCP", - "AZURE": "Azure", - "IBM": "IBM", - # Kubernetes/Cloud - "DB": "DB", - "PVC": "PVC", - "PV": "PV", - "VPN": "VPN", - # Auth - "OS": "OS", - "LDAP": "LDAP", - "SAML": "SAML", - "OAUTH": "OAuth", - "JWT": "JWT", - # AWS Services - "S3": "S3", - "EC2": "EC2", - "RDS": "RDS", - "EBS": "EBS", - "IAM": "IAM", - } - - if "_" in title: - # Handle snake_case - title = title.replace("_", " ") - words = title.split() - else: - # Handle camelCase/PascalCase with smart splitting - # This regex splits on: - # - Transition from lowercase to uppercase (camelCase) - # - Transition from multiple uppercase to an uppercase followed by lowercase (SQLParameters -> SQL Parameters) - words = re.findall(r"[A-Z]+(?=[A-Z][a-z]|\b)|[A-Z][a-z]+|[a-z]+|[0-9]+", title) - - # Merge adjacent words if they form a known compound acronym (e.g., postgre + SQL = PostgreSQL) - merged_words = [] - i = 0 - while i < len(words): - if i < len(words) - 1: - # Check if current word + next word form a known acronym - combined = (words[i] + words[i + 1]).upper() - if combined in ACRONYMS: - merged_words.append(combined) - i += 2 - continue - merged_words.append(words[i]) - i += 1 - - # Capitalize each word, using proper casing for known acronyms - result = [] - for word in merged_words: - word_upper = word.upper() - if word_upper in ACRONYMS: - result.append(ACRONYMS[word_upper]) - else: - result.append(word.capitalize()) - - return " ".join(result) diff --git a/src/servala/core/forms.py b/src/servala/core/forms.py index 090abba..034d1c9 100644 --- a/src/servala/core/forms.py +++ b/src/servala/core/forms.py @@ -1,12 +1,7 @@ -import json -from pathlib import Path - -import jsonschema from django import forms from django.utils.translation import gettext_lazy as _ from django_jsonform.widgets import JSONFormWidget -from servala.core.crd.forms import DEFAULT_FIELD_CONFIGS, MANDATORY_FIELDS from servala.core.models import ControlPlane, ServiceDefinition CONTROL_PLANE_USER_INFO_SCHEMA = { @@ -101,12 +96,6 @@ class ControlPlaneAdminForm(forms.ModelForm): return super().save(*args, **kwargs) -def fields_empty(fields): - if not fields: - return True - return all(not field.get("controlplane_field_mapping") for field in fields) - - class ServiceDefinitionAdminForm(forms.ModelForm): api_group = forms.CharField( required=False, @@ -135,10 +124,6 @@ class ServiceDefinitionAdminForm(forms.ModelForm): self.fields["api_version"].initial = api_def.get("version", "") self.fields["api_kind"].initial = api_def.get("kind", "") - schema_path = Path(__file__).parent / "schemas" / "form_config_schema.json" - with open(schema_path) as f: - self.form_config_schema = json.load(f) - def clean(self): cleaned_data = super().clean() @@ -166,250 +151,8 @@ class ServiceDefinitionAdminForm(forms.ModelForm): api_def["kind"] = api_kind cleaned_data["api_definition"] = api_def - form_config = cleaned_data.get("form_config") - - # Convert empty form_config to None (no custom form) - if form_config: - if not form_config.get("fieldsets") or all( - fields_empty(fieldset.get("fields")) - for fieldset in form_config.get("fieldsets") - ): - form_config = None - cleaned_data["form_config"] = None - - if form_config: - form_config = self._normalize_form_config_types(form_config) - cleaned_data["form_config"] = form_config - - try: - jsonschema.validate( - instance=form_config, schema=self.form_config_schema - ) - except jsonschema.ValidationError as e: - raise forms.ValidationError( - { - "form_config": _("Invalid form configuration: {}").format( - e.message - ) - } - ) - except jsonschema.SchemaError as e: - raise forms.ValidationError( - {"form_config": _("Schema error: {}").format(e.message)} - ) - - self._validate_field_mappings(form_config, cleaned_data) - return cleaned_data - def _normalize_form_config_types(self, form_config): - """ - Normalize form_config by converting string representations of numbers - to actual integers/floats. The JSON form widget sends all values - as strings, but the schema expects proper types. - """ - if not isinstance(form_config, dict): - return form_config - - integer_fields = ["max_length", "rows", "min_values", "max_values"] - number_fields = ["min_value", "max_value"] - - for fieldset in form_config.get("fieldsets", []): - for field in fieldset.get("fields", []): - for field_name in integer_fields: - if field_name in field and field[field_name] is not None: - value = field[field_name] - if isinstance(value, str): - try: - field[field_name] = int(value) if value else None - except (ValueError, TypeError): - pass - - for field_name in number_fields: - if field_name in field and field[field_name] is not None: - value = field[field_name] - if isinstance(value, str): - try: - field[field_name] = ( - int(value) if "." not in value else float(value) - ) - except (ValueError, TypeError): - pass - - return form_config - - def _validate_field_mappings(self, form_config, cleaned_data): - if not self.instance.pk: - return - crd = self.instance.offering_control_planes.all().first() - if not crd: - return - - schema = None - try: - schema = crd.resource_schema - except Exception: - pass - - if not schema or not (spec_schema := schema.get("properties", {}).get("spec")): - return - - valid_paths = self._extract_field_paths(spec_schema, "spec") | {"name"} - included_mappings = set() - errors = [] - for fieldset in form_config.get("fieldsets", []): - for field in fieldset.get("fields", []): - mapping = field.get("controlplane_field_mapping") - included_mappings.add(mapping) - - # Validate that fields without defaults have required properties - if mapping not in DEFAULT_FIELD_CONFIGS: - if not field.get("label"): - errors.append( - _( - "Field with mapping '{}' must have a 'label' property " - "(or use a mapping with default config)" - ).format(mapping) - ) - if not field.get("type"): - errors.append( - _( - "Field with mapping '{}' must have a 'type' property " - "(or use a mapping with default config)" - ).format(mapping) - ) - - if mapping and mapping not in valid_paths: - field_name = field.get("label", field.get("name", mapping)) - errors.append( - _( - "Field '{}' has invalid mapping '{}'. Valid paths are: {}" - ).format( - field_name, - mapping, - ", ".join(sorted(valid_paths)[:10]) - + ("..." if len(valid_paths) > 10 else ""), - ) - ) - - if field.get("type") == "choice" and field.get("choices"): - self._validate_choice_field( - field, mapping, spec_schema, "spec", errors - ) - - for mandatory_field in MANDATORY_FIELDS: - if mandatory_field not in included_mappings: - errors.append( - _( - "Required field '{}' must be included in the form configuration" - ).format(mandatory_field) - ) - - if errors: - raise forms.ValidationError({"form_config": errors}) - - def _validate_choice_field(self, field, mapping, spec_schema, prefix, errors): - if not mapping: - return - - field_name = field.get("label", mapping) - custom_choices = field.get("choices", []) - - # Single-element choices [value] are transformed to [value, value] - for i, choice in enumerate(custom_choices): - if not isinstance(choice, (list, tuple)): - errors.append( - _( - "Field '{}': Choice at index {} must be a list or tuple, " - "but got: {}" - ).format(field_name, i, repr(choice)) - ) - return - - choice_len = len(choice) - if choice_len == 1: - custom_choices[i] = [choice[0], choice[0]] - elif choice_len == 0 or choice_len > 2: - errors.append( - _( - "Field '{}': Choice at index {} must have 1 or 2 elements " - "(got {}): {}" - ).format(field_name, i, choice_len, repr(choice)) - ) - return - - field_schema = self._get_field_schema(spec_schema, mapping, prefix) - if not field_schema: - return - - control_plane_choices = field_schema.get("enum", []) - if not control_plane_choices: - return - - custom_choice_values = [choice[0] for choice in custom_choices] - - invalid_choices = [ - value - for value in custom_choice_values - if value not in control_plane_choices - ] - - if invalid_choices: - errors.append( - _( - "Field '{}' has invalid choice values: {}. " - "Valid choices from control plane are: {}" - ).format( - field_name, - ", ".join(f"'{c}'" for c in invalid_choices), - ", ".join(f"'{c}'" for c in control_plane_choices), - ) - ) - - def _get_field_schema(self, schema, field_path, prefix): - if not field_path or not schema: - return None - - if field_path.startswith(prefix + "."): - field_path = field_path[len(prefix) + 1 :] - - parts = field_path.split(".") - current_schema = schema - - for part in parts: - if not isinstance(current_schema, dict): - return None - - properties = current_schema.get("properties", {}) - if part not in properties: - return None - - current_schema = properties[part] - - return current_schema - - def _extract_field_paths(self, schema, prefix=""): - paths = set() - - if not isinstance(schema, dict): - return paths - - if "type" in schema and schema["type"] != "object": - if prefix: - paths.add(prefix) - - if schema.get("properties"): - for prop_name, prop_schema in schema["properties"].items(): - new_prefix = f"{prefix}.{prop_name}" if prefix else prop_name - paths.add(new_prefix) - paths.update(self._extract_field_paths(prop_schema, new_prefix)) - - if schema.get("type") == "array" and "items" in schema: - if prefix: - paths.add(prefix) - - return paths - def save(self, *args, **kwargs): self.instance.api_definition = self.cleaned_data["api_definition"] return super().save(*args, **kwargs) diff --git a/src/servala/core/migrations/0012_remove_advanced_fields.py b/src/servala/core/migrations/0012_remove_advanced_fields.py deleted file mode 100644 index 7d0fecd..0000000 --- a/src/servala/core/migrations/0012_remove_advanced_fields.py +++ /dev/null @@ -1,32 +0,0 @@ -# Generated by Django 5.2.7 on 2025-10-31 10:40 - -import django.core.validators -from django.db import migrations, models - - -class Migration(migrations.Migration): - - dependencies = [ - ("core", "0012_convert_user_info_to_array"), - ] - - operations = [ - migrations.RemoveField( - model_name="servicedefinition", - name="advanced_fields", - ), - migrations.AlterField( - model_name="organization", - name="name", - field=models.CharField( - max_length=32, - validators=[ - django.core.validators.RegexValidator( - message="Organization name can only contain letters, numbers, and spaces.", - regex="^[A-Za-z0-9\\s]+$", - ) - ], - verbose_name="Name", - ), - ), - ] diff --git a/src/servala/core/migrations/0013_add_form_config.py b/src/servala/core/migrations/0013_add_form_config.py deleted file mode 100644 index 2819a6c..0000000 --- a/src/servala/core/migrations/0013_add_form_config.py +++ /dev/null @@ -1,27 +0,0 @@ -# Generated by Django 5.2.7 on 2025-10-31 10:47 - -from django.db import migrations, models - - -class Migration(migrations.Migration): - - dependencies = [ - ("core", "0012_remove_advanced_fields"), - ] - - operations = [ - migrations.AddField( - model_name="servicedefinition", - name="form_config", - field=models.JSONField( - blank=True, - help_text=( - "Optional custom form configuration. When provided, this configuration will " - "be used to render the service form instead of auto-generating it from the OpenAPI spec. " - 'Format: {"fieldsets": [{"title": "Section", "fields": [{...}]}]}' - ), - null=True, - verbose_name="Form Configuration", - ), - ), - ] diff --git a/src/servala/core/models/service.py b/src/servala/core/models/service.py index 1535703..3af8c89 100644 --- a/src/servala/core/models/service.py +++ b/src/servala/core/models/service.py @@ -360,15 +360,15 @@ class ServiceDefinition(ServalaModelMixin, models.Model): null=True, blank=True, ) - form_config = models.JSONField( - verbose_name=_("Form Configuration"), + advanced_fields = models.JSONField( + verbose_name=_("Advanced fields"), help_text=_( - "Optional custom form configuration. When provided, this configuration will be used " - "to render the service form instead of auto-generating it from the OpenAPI spec. " - 'Format: {"fieldsets": [{"title": "Section", "fields": [{...}]}]}' + "Array of field names that should be hidden behind an 'Advanced' toggle. " + "Use dot notation (e.g., ['spec.parameters.monitoring.enabled', 'spec.parameters.backup.schedule'])" ), null=True, blank=True, + default=list, ) service = models.ForeignKey( to="Service", @@ -510,22 +510,9 @@ class ControlPlaneCRD(ServalaModelMixin, models.Model): if not self.django_model: return - return generate_model_form_class(self.django_model) - - @cached_property - def custom_model_form_class(self): - from servala.core.crd import generate_custom_form_class - - if not self.django_model: - return - if not ( - self.service_definition - and self.service_definition.form_config - and self.service_definition.form_config.get("fieldsets") - ): - return - return generate_custom_form_class( - self.service_definition.form_config, self.django_model + advanced_fields = self.service_definition.advanced_fields or [] + return generate_model_form_class( + self.django_model, advanced_fields=advanced_fields ) @@ -891,6 +878,7 @@ class ServiceInstance(ServalaModelMixin, models.Model): return return self.context.django_model( name=self.name, + organization=self.organization, context=self.context, spec=self.spec, # We pass -1 as ID in order to make it clear that a) this object exists (remotely), diff --git a/src/servala/core/schemas/form_config_schema.json b/src/servala/core/schemas/form_config_schema.json deleted file mode 100644 index 3b01b3f..0000000 --- a/src/servala/core/schemas/form_config_schema.json +++ /dev/null @@ -1,107 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-07/schema#", - "title": "Service Definition Form Configuration Schema", - "description": "Schema for custom form configuration in ServiceDefinition", - "type": "object", - "required": ["fieldsets"], - "properties": { - "fieldsets": { - "type": "array", - "description": "Array of fieldset objects defining form sections", - "minItems": 1, - "items": { - "type": "object", - "required": ["fields"], - "properties": { - "title": { - "type": "string", - "description": "Optional title for the fieldset/tab" - }, - "fields": { - "type": "array", - "description": "Array of field definitions in this fieldset", - "minItems": 1, - "items": { - "type": "object", - "required": ["controlplane_field_mapping"], - "properties": { - "type": { - "type": "string", - "description": "Field type", - "enum": ["", "text", "email", "textarea", "number", "choice", "checkbox", "array"] - }, - "label": { - "type": "string", - "description": "Human-readable field label" - }, - "help_text": { - "type": "string", - "description": "Optional help text displayed below the field" - }, - "required": { - "type": "boolean", - "description": "Whether the field is required", - "default": false - }, - "controlplane_field_mapping": { - "type": "string", - "description": "Dot-notation path mapping to Kubernetes spec field (e.g., 'spec.parameters.service.fqdn')" - }, - "max_length": { - "type": ["integer", "null"], - "description": "Maximum length for text/textarea fields", - "minimum": 1 - }, - "rows": { - "type": ["integer", "null"], - "description": "Number of rows for textarea fields", - "minimum": 1 - }, - "min_value": { - "type": ["number", "null"], - "description": "Minimum value for number fields" - }, - "max_value": { - "type": ["number", "null"], - "description": "Maximum value for number fields" - }, - "choices": { - "type": "array", - "description": "Array of [value, label] pairs for choice fields", - "items": { - "type": "array", - "items": { - "type": "string" - } - } - }, - "min_values": { - "type": ["integer", "null"], - "description": "Minimum number of values for array fields", - "minimum": 0 - }, - "max_values": { - "type": ["integer", "null"], - "description": "Maximum number of values for array fields", - "minimum": 1 - }, - "validators": { - "type": "array", - "description": "Array of validator names (for future use)", - "items": { - "type": "string", - "enum": ["email", "fqdn", "url", "ipv4", "ipv6"] - } - }, - "default_value": { - "type": "string", - "description": "Default value for the field when creating new instances" - } - } - } - } - } - } - } - } -} diff --git a/src/servala/frontend/forms/organization.py b/src/servala/frontend/forms/organization.py index 45e7b11..86ba0ab 100644 --- a/src/servala/frontend/forms/organization.py +++ b/src/servala/frontend/forms/organization.py @@ -8,6 +8,7 @@ from servala.core.models import Organization, OrganizationInvitation, Organizati from servala.core.odoo import get_invoice_addresses, get_odoo_countries from servala.frontend.forms.mixins import HtmxMixin + ORG_NAME_PATTERN = r"[\w\s\-.,&'()+]+" diff --git a/src/servala/frontend/templates/account/login.html b/src/servala/frontend/templates/account/login.html index e7576b0..f4ca590 100644 --- a/src/servala/frontend/templates/account/login.html +++ b/src/servala/frontend/templates/account/login.html @@ -26,14 +26,12 @@
- {% translate "Sign in to your account or create a new one to access your managed service instances and the Servala service catalog" %} + {% translate "Sign in to access your managed service instances and the Servala service catalog" %}
- {% translate "We currently cannot offer this service. Please check back later or contact support for more information." %} -
+{% translate "We currently cannot offer this service. Please check back later or contact support for more information." %}