Compare commits

...

2 commits

Author SHA1 Message Date
a5d46b696f Code style
All checks were successful
Tests / test (push) Successful in 26s
2025-11-05 10:37:11 +01:00
5cc582b638 Validate fields used in custom form config 2025-11-05 10:37:01 +01:00
4 changed files with 94 additions and 11 deletions

View file

@ -6,7 +6,7 @@ from django.db import models
from django.forms.models import ModelForm, ModelFormMetaclass
from django.utils.translation import gettext_lazy as _
from servala.core.models import ServiceInstance, ControlPlaneCRD
from servala.core.models import ControlPlaneCRD, ServiceInstance
from servala.frontend.forms.widgets import DynamicArrayField, DynamicArrayWidget

View file

@ -178,8 +178,80 @@ class ServiceDefinitionAdminForm(forms.ModelForm):
{"form_config": _("Schema error: {}").format(e.message)}
)
self._validate_field_mappings(form_config, cleaned_data)
return cleaned_data
def _validate_field_mappings(self, form_config, cleaned_data):
if not self.instance.pk:
return
crd = self.instance.offering_control_planes.all().first()
if not crd:
return
schema = None
try:
schema = crd.resource_schema
except Exception as e:
pass
if not schema or not (spec_schema := schema.get("properties", {}).get("spec")):
return
valid_paths = self._extract_field_paths(spec_schema, "spec") | {"name"}
included_mappings = set()
errors = []
for fieldset in form_config.get("fieldsets", []):
for field in fieldset.get("fields", []):
mapping = field.get("controlplane_field_mapping")
included_mappings.add(mapping)
if mapping and mapping not in valid_paths:
field_name = field.get("name", mapping)
errors.append(
_(
"Field '{}' has invalid mapping '{}'. Valid paths are: {}"
).format(
field_name,
mapping,
", ".join(sorted(valid_paths)[:10])
+ ("..." if len(valid_paths) > 10 else ""),
)
)
if "name" not in included_mappings:
raise forms.ValidationError(
{
"form_config": _(
"You must include a `name` field in the custom form config."
)
}
)
if errors:
raise forms.ValidationError({"form_config": errors})
def _extract_field_paths(self, schema, prefix=""):
paths = set()
if not isinstance(schema, dict):
return paths
if "type" in schema and schema["type"] != "object":
if prefix:
paths.add(prefix)
if schema.get("properties"):
for prop_name, prop_schema in schema["properties"].items():
new_prefix = f"{prefix}.{prop_name}" if prefix else prop_name
paths.add(new_prefix)
paths.update(self._extract_field_paths(prop_schema, new_prefix))
if schema.get("type") == "array" and "items" in schema:
if prefix:
paths.add(prefix)
return paths
def save(self, *args, **kwargs):
self.instance.api_definition = self.cleaned_data["api_definition"]
return super().save(*args, **kwargs)

View file

@ -8,7 +8,6 @@ from servala.core.models import Organization, OrganizationInvitation, Organizati
from servala.core.odoo import get_invoice_addresses, get_odoo_countries
from servala.frontend.forms.mixins import HtmxMixin
ORG_NAME_PATTERN = r"[\w\s\-.,&'()+]+"

View file

@ -123,7 +123,9 @@ class ServiceOfferingDetailView(OrganizationViewMixin, HtmxViewMixin, DetailView
def context_object(self):
if self.request.method == "POST":
return ControlPlaneCRD.objects.filter(
pk=self.request.POST.get("expert-context", self.request.POST.get("custom-context")),
pk=self.request.POST.get(
"expert-context", self.request.POST.get("custom-context")
),
# Make sure we dont use a malicious ID
control_plane__in=self.planes,
).first()
@ -131,19 +133,27 @@ class ServiceOfferingDetailView(OrganizationViewMixin, HtmxViewMixin, DetailView
control_plane=self.selected_plane, service_offering=self.object
).first()
def get_instance_form_kwargs(self, ignore_data=False):
return {"initial": {
"organization": self.request.organization,
"context": self.context_object,
}, "prefix": "expert", "data": self.request.POST if (self.request.method == "POST" and not ignore_data) else None
}
return {
"initial": {
"organization": self.request.organization,
"context": self.context_object,
},
"prefix": "expert",
"data": (
self.request.POST
if (self.request.method == "POST" and not ignore_data)
else None
),
}
def get_instance_form(self, ignore_data=False):
if not self.context_object or not self.context_object.model_form_class:
return
return self.context_object.model_form_class(**self.get_instance_form_kwargs(ignore_data=ignore_data))
return self.context_object.model_form_class(
**self.get_instance_form_kwargs(ignore_data=ignore_data)
)
def get_custom_instance_form(self, ignore_data=False):
if not self.context_object or not self.context_object.custom_model_form_class:
@ -169,7 +179,9 @@ class ServiceOfferingDetailView(OrganizationViewMixin, HtmxViewMixin, DetailView
context["custom_service_form"] = self.get_custom_instance_form()
else:
context["service_form"] = self.get_instance_form()
context["custom_service_form"] = self.get_custom_instance_form(ignore_data=True)
context["custom_service_form"] = self.get_custom_instance_form(
ignore_data=True
)
else:
context["service_form"] = self.get_instance_form()
context["custom_service_form"] = self.get_custom_instance_form()