Refactor ServiceInstanceDetailView

This commit is contained in:
Tobias Kunze 2025-12-17 16:38:35 +01:00
parent b010b6ac62
commit c83683aaf1
5 changed files with 257 additions and 264 deletions

View file

@ -0,0 +1,151 @@
"""
Data handling for CRD data.
Follows the custom/generated split from servala.core.crd.forms.
"""
from servala.core.crd.utils import deslugify
def get_value_from_path(data, path):
"""
Get a value from nested dict using dot notation path.
e.g., 'spec.parameters.size.disk' from {'spec': {'parameters': {'size': {'disk': '10Gi'}}}}
"""
parts = path.split(".")
current = data
for part in parts:
if isinstance(current, dict) and part in current:
current = current[part]
else:
return None
return current
def get_form_config_fieldsets(spec, form_config, name):
full_data = {"spec": spec, "name": name}
fieldsets = []
for fieldset_config in form_config.get("fieldsets", []):
fields = []
for field_config in fieldset_config.get("fields", []):
mapping = field_config.get("controlplane_field_mapping", "")
# Skip the name field as it's shown in the header
if mapping == "name":
continue
value = get_value_from_path(full_data, mapping)
if value is None:
continue
label = field_config.get("label") or deslugify(mapping.split(".")[-1])
fields.append(
{
"key": mapping,
"label": label,
"value": value,
"help_text": field_config.get("help_text", ""),
}
)
if fields:
fieldsets.append(
{
"title": fieldset_config.get("title", "Configuration"),
"fields": fields,
"fieldsets": {},
}
)
return fieldsets
def get_auto_generated_fieldsets(spec):
"""
Auto-generate fieldsets from spec structure (fallback when no form_config).
Excludes "General" tab - only returns nested structures.
"""
nested_fieldsets = {}
# First pass: organize fields into nested structures
for key, value in spec.items():
if isinstance(value, dict):
# This is a nested structure
if key not in nested_fieldsets:
nested_fieldsets[key] = {
"title": deslugify(key),
"fields": [],
"fieldsets": {},
}
# Process fields in the nested structure
for sub_key, sub_value in value.items():
if isinstance(sub_value, dict):
# Even deeper nesting
if sub_key not in nested_fieldsets[key]["fieldsets"]:
nested_fieldsets[key]["fieldsets"][sub_key] = {
"title": deslugify(sub_key),
"fields": [],
}
# Add fields from the deeper level
for leaf_key, leaf_value in sub_value.items():
nested_fieldsets[key]["fieldsets"][sub_key]["fields"].append(
{
"key": leaf_key,
"label": deslugify(leaf_key),
"value": leaf_value,
}
)
else:
# Add field to parent level
nested_fieldsets[key]["fields"].append(
{
"key": sub_key,
"label": deslugify(sub_key),
"value": sub_value,
}
)
# Second pass: Promote fields based on count
for group_key, group in list(nested_fieldsets.items()):
# Promote single sub-fieldsets to parent
for sub_key, sub_fieldset in list(group["fieldsets"].items()):
if len(sub_fieldset["fields"]) == 1:
field = sub_fieldset["fields"][0]
field["label"] = f"{sub_fieldset['title']}: {field['label']}"
group["fields"].append(field)
del group["fieldsets"][sub_key]
# Remove empty groups
total_fields = len(group["fields"])
for sub_fieldset in group["fieldsets"].values():
total_fields += len(sub_fieldset["fields"])
if total_fields == 0:
del nested_fieldsets[group_key]
# Create fieldsets from the organized data (no "General" tab)
fieldsets = []
for group in nested_fieldsets.values():
fieldsets.append(group)
return fieldsets
def get_nested_data(instance):
"""
Organize spec data into fieldsets.
Uses form_config when available, otherwise auto-generates from spec structure.
"""
if not instance.spec:
return []
if (
instance.context
and instance.context.service_definition
and instance.context.service_definition.form_config
):
return get_form_config_fieldsets(
instance.spec, instance.context.service_defintion.form_config, instance.name
)
return get_auto_generated_fieldsets(instance.spec)

View file

@ -113,3 +113,26 @@ def deslugify(title):
result.append(word.capitalize())
return " ".join(result)
def parse_disk_size_gib(value):
"""Parse disk size string (e.g., '10Gi', '100G') to GiB as integer."""
if not value:
return None
value = str(value)
# Handle Gi suffix (GiB)
if value.endswith("Gi"):
try:
return int(value[:-2])
except ValueError:
return None
# Handle G suffix (assume GB, convert to GiB approximately)
if value.endswith("G"):
try:
return int(float(value[:-1]) * 0.931) # GB to GiB
except ValueError:
return None
try:
return int(value)
except ValueError:
return None

View file

@ -3,6 +3,7 @@ import hashlib
import html
import json
import re
from decimal import Decimal
import kubernetes
import rules
@ -21,6 +22,7 @@ from kubernetes.client.rest import ApiException
from servala.core import rules as perms
from servala.core.models.mixins import ServalaModelMixin
from servala.core.utils import to_money
from servala.core.validators import kubernetes_name_validator
@ -1247,5 +1249,72 @@ class ServiceInstance(ServalaModelMixin, models.Model):
except Exception:
return []
def calculate_pricing(self):
"""Calculate hourly and monthly pricing."""
from servala.core.crd.utils import get_value_from_path, parse_disk_size_gib
pricing = {
"compute_hourly": None,
"compute_monthly": None,
"storage_hourly": None,
"storage_monthly": None,
"total_hourly": None,
"total_monthly": None,
"disk_size_gib": None,
}
hours_per_month = 720
# Compute plan pricing
if self.compute_plan_assignment:
price = self.compute_plan_assignment.price
unit = self.compute_plan_assignment.unit
hourly = price
if unit == "day":
hourly = price / Decimal("24")
elif unit == "month":
hourly = price / Decimal(hours_per_month)
elif unit == "year":
hourly = price / Decimal(hours_per_month * 12)
pricing["compute_hourly"] = to_money(hourly)
pricing["compute_monthly"] = to_money(hourly * hours_per_month)
# Storage pricing
storage_price_per_gib = None
if self.context and self.context.control_plane.storage_plan_price_per_gib:
storage_price_per_gib = (
self.context.control_plane.storage_plan_price_per_gib
)
# Get disk size from spec
disk_size_gib = None
if self.spec:
disk_value = get_value_from_path(
{"spec": self.spec}, "spec.parameters.size.disk"
)
disk_size_gib = parse_disk_size_gib(disk_value)
pricing["disk_size_gib"] = disk_size_gib
if storage_price_per_gib and disk_size_gib:
storage_hourly = storage_price_per_gib * disk_size_gib
pricing["storage_hourly"] = to_money(storage_hourly)
pricing["storage_monthly"] = to_money(storage_hourly * hours_per_month)
# Total pricing
if (
pricing["compute_hourly"] is not None
or pricing["storage_hourly"] is not None
):
compute_h = pricing["compute_hourly"] or Decimal("0")
storage_h = pricing["storage_hourly"] or Decimal("0")
pricing["total_hourly"] = to_money(compute_h + storage_h)
pricing["total_monthly"] = to_money(
pricing["total_hourly"] * hours_per_month
)
return pricing
auditlog.register(ServiceInstance, exclude_fields=["updated_at"], serialize_data=True)

10
src/servala/core/utils.py Normal file
View file

@ -0,0 +1,10 @@
from decimal import ROUND_HALF_UP, Decimal
def to_money(value):
"""Consistent monetary values by handling quantizing and rounding"""
if not value:
value = Decimal("0")
if not isinstance(value, Decimal):
raise ValueError("Expected a Decimal")
return value.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)

View file

@ -6,7 +6,6 @@ from django.utils.functional import cached_property
from django.utils.translation import gettext_lazy as _
from django.views.generic import DetailView, ListView, UpdateView
from servala.core.crd import deslugify
from servala.core.models import (
ControlPlaneCRD,
Service,
@ -356,9 +355,11 @@ class ServiceInstanceDetailView(
permission_type = "view"
def get_context_data(self, **kwargs):
from servala.core.crd.data import get_nested_data
context = super().get_context_data(**kwargs)
if self.object.kubernetes_object and self.object.spec:
context["spec_fieldsets"] = self.get_nested_spec()
context["spec_fieldsets"] = get_nested_data(self.object.spec)
context["has_change_permission"] = self.request.user.has_perm(
ServiceInstance.get_perm("change"), self.object
)
@ -377,270 +378,9 @@ class ServiceInstanceDetailView(
"price_per_gib": self.object.context.control_plane.storage_plan_price_per_gib,
}
# Calculate pricing summary
context["pricing"] = self._calculate_pricing()
context["pricing"] = self.object.calculate_pricing()
return context
def _parse_disk_size_gib(self, disk_value):
"""Parse disk size string (e.g., '10Gi', '100G') to GiB as integer."""
if not disk_value:
return None
disk_str = str(disk_value)
# Handle Gi suffix (GiB)
if disk_str.endswith("Gi"):
try:
return int(disk_str[:-2])
except ValueError:
return None
# Handle G suffix (assume GB, convert to GiB approximately)
if disk_str.endswith("G"):
try:
return int(float(disk_str[:-1]) * 0.931) # GB to GiB
except ValueError:
return None
# Handle plain number (assume GiB)
try:
return int(disk_str)
except ValueError:
return None
def _calculate_pricing(self):
"""Calculate hourly and monthly pricing for the instance."""
from decimal import Decimal, ROUND_HALF_UP
pricing = {
"compute_hourly": None,
"compute_monthly": None,
"storage_hourly": None,
"storage_monthly": None,
"total_hourly": None,
"total_monthly": None,
"disk_size_gib": None,
}
hours_per_month = 720
# Compute plan pricing
compute_assignment = self.object.compute_plan_assignment
if compute_assignment:
price = compute_assignment.price
unit = compute_assignment.unit
# Convert to hourly
if unit == "hour":
hourly = price
elif unit == "day":
hourly = price / Decimal("24")
elif unit == "month":
hourly = price / Decimal(hours_per_month)
elif unit == "year":
hourly = price / Decimal(hours_per_month * 12)
else:
hourly = price # fallback
pricing["compute_hourly"] = hourly.quantize(
Decimal("0.01"), rounding=ROUND_HALF_UP
)
pricing["compute_monthly"] = (hourly * hours_per_month).quantize(
Decimal("0.01"), rounding=ROUND_HALF_UP
)
# Storage pricing
storage_price_per_gib = None
if (
self.object.context
and self.object.context.control_plane.storage_plan_price_per_gib
):
storage_price_per_gib = (
self.object.context.control_plane.storage_plan_price_per_gib
)
# Get disk size from spec
disk_size_gib = None
if self.object.spec:
disk_value = self._get_value_from_path(
{"spec": self.object.spec}, "spec.parameters.size.disk"
)
disk_size_gib = self._parse_disk_size_gib(disk_value)
pricing["disk_size_gib"] = disk_size_gib
if storage_price_per_gib and disk_size_gib:
storage_hourly = storage_price_per_gib * disk_size_gib
pricing["storage_hourly"] = storage_hourly.quantize(
Decimal("0.01"), rounding=ROUND_HALF_UP
)
pricing["storage_monthly"] = (storage_hourly * hours_per_month).quantize(
Decimal("0.01"), rounding=ROUND_HALF_UP
)
# Total pricing
if pricing["compute_hourly"] is not None or pricing["storage_hourly"] is not None:
compute_h = pricing["compute_hourly"] or Decimal("0")
storage_h = pricing["storage_hourly"] or Decimal("0")
pricing["total_hourly"] = (compute_h + storage_h).quantize(
Decimal("0.01"), rounding=ROUND_HALF_UP
)
pricing["total_monthly"] = (pricing["total_hourly"] * hours_per_month).quantize(
Decimal("0.01"), rounding=ROUND_HALF_UP
)
return pricing
def _get_value_from_path(self, data, path):
"""
Get a value from nested dict using dot notation path.
e.g., 'spec.parameters.size.disk' from {'spec': {'parameters': {'size': {'disk': '10Gi'}}}}
"""
parts = path.split(".")
current = data
for part in parts:
if isinstance(current, dict) and part in current:
current = current[part]
else:
return None
return current
def _get_form_config_fieldsets(self):
"""
Generate fieldsets from form_config, extracting values from spec.
"""
form_config = self.object.context.service_definition.form_config
spec = self.object.spec or {}
full_data = {"spec": spec, "name": self.object.name}
fieldsets = []
for fieldset_config in form_config.get("fieldsets", []):
fields = []
for field_config in fieldset_config.get("fields", []):
mapping = field_config.get("controlplane_field_mapping", "")
# Skip the name field as it's shown in the header
if mapping == "name":
continue
value = self._get_value_from_path(full_data, mapping)
if value is None:
continue
label = field_config.get("label") or deslugify(mapping.split(".")[-1])
fields.append(
{
"key": mapping,
"label": label,
"value": value,
"help_text": field_config.get("help_text", ""),
}
)
if fields:
fieldsets.append(
{
"title": fieldset_config.get("title", "Configuration"),
"fields": fields,
"fieldsets": {},
}
)
return fieldsets
def _get_auto_generated_fieldsets(self):
"""
Auto-generate fieldsets from spec structure (fallback when no form_config).
Excludes "General" tab - only returns nested structures.
"""
spec = self.object.spec or {}
if not spec:
return []
nested_fieldsets = {}
# First pass: organize fields into nested structures
for key, value in spec.items():
if isinstance(value, dict):
# This is a nested structure
if key not in nested_fieldsets:
nested_fieldsets[key] = {
"title": deslugify(key),
"fields": [],
"fieldsets": {},
}
# Process fields in the nested structure
for sub_key, sub_value in value.items():
if isinstance(sub_value, dict):
# Even deeper nesting
if sub_key not in nested_fieldsets[key]["fieldsets"]:
nested_fieldsets[key]["fieldsets"][sub_key] = {
"title": deslugify(sub_key),
"fields": [],
}
# Add fields from the deeper level
for leaf_key, leaf_value in sub_value.items():
nested_fieldsets[key]["fieldsets"][sub_key][
"fields"
].append(
{
"key": leaf_key,
"label": deslugify(leaf_key),
"value": leaf_value,
}
)
else:
# Add field to parent level
nested_fieldsets[key]["fields"].append(
{
"key": sub_key,
"label": deslugify(sub_key),
"value": sub_value,
}
)
# Second pass: Promote fields based on count
for group_key, group in list(nested_fieldsets.items()):
# Promote single sub-fieldsets to parent
for sub_key, sub_fieldset in list(group["fieldsets"].items()):
if len(sub_fieldset["fields"]) == 1:
field = sub_fieldset["fields"][0]
field["label"] = f"{sub_fieldset['title']}: {field['label']}"
group["fields"].append(field)
del group["fieldsets"][sub_key]
# Remove empty groups
total_fields = len(group["fields"])
for sub_fieldset in group["fieldsets"].values():
total_fields += len(sub_fieldset["fields"])
if total_fields == 0:
del nested_fieldsets[group_key]
# Create fieldsets from the organized data (no "General" tab)
fieldsets = []
for group in nested_fieldsets.values():
fieldsets.append(group)
return fieldsets
def get_nested_spec(self):
"""
Organize spec data into fieldsets.
Uses form_config when available, otherwise auto-generates from spec structure.
"""
spec = self.object.spec or {}
if not spec:
return []
# Check if form_config exists
if (
self.object.context
and self.object.context.service_definition
and self.object.context.service_definition.form_config
):
return self._get_form_config_fieldsets()
return self._get_auto_generated_fieldsets()
class ServiceInstanceUpdateView(
ServiceInstanceMixin, OrganizationViewMixin, HtmxUpdateView