servala-portal/src/servala/core/models/service.py

1251 lines
43 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import copy
import hashlib
import html
import json
import re
import kubernetes
import rules
import urlman
from auditlog.registry import auditlog
from django.conf import settings
from django.core.cache import cache
from django.core.exceptions import ValidationError
from django.db import IntegrityError, models, transaction
from django.utils.functional import cached_property
from django.utils.safestring import mark_safe
from django.utils.translation import gettext_lazy as _
from encrypted_fields.fields import EncryptedJSONField
from kubernetes import client, config
from kubernetes.client.rest import ApiException
from servala.core import rules as perms
from servala.core.models.mixins import ServalaModelMixin
from servala.core.validators import kubernetes_name_validator
class ServiceCategory(ServalaModelMixin, models.Model):
"""
Categories for services, e.g. "Databases", "Storage", "Compute".
"""
name = models.CharField(max_length=100, verbose_name=_("Name"))
description = models.TextField(blank=True, verbose_name=_("Description"))
logo = models.ImageField(
upload_to="public/service_categories",
blank=True,
null=True,
verbose_name=_("Logo"),
)
parent = models.ForeignKey(
to="self",
on_delete=models.CASCADE,
related_name="children",
blank=True,
null=True,
verbose_name=_("Parent"),
)
class Meta:
verbose_name = _("Service category")
verbose_name_plural = _("Service categories")
def __str__(self):
return self.name
class Service(ServalaModelMixin, models.Model):
"""
A service that can be offered, e.g. "PostgreSQL", "MinIO", "Kubernetes".
"""
name = models.CharField(max_length=100, verbose_name=_("Name"))
slug = models.SlugField(max_length=100, verbose_name=_("URL slug"), unique=True)
category = models.ForeignKey(
to="ServiceCategory",
on_delete=models.PROTECT,
related_name="services",
verbose_name=_("Category"),
)
description = models.TextField(blank=True, verbose_name=_("Description"))
logo = models.ImageField(
upload_to="public/services", blank=True, null=True, verbose_name=_("Logo")
)
external_links = models.JSONField(
null=True,
blank=True,
verbose_name=_("External links"),
help_text=(
'JSON array of link objects: {"url": "", "title": "", "featured": false}. '
"Featured links will be shown on the service list page, all other links will only show on the service and offering detail pages."
),
)
osb_service_id = models.CharField(
max_length=100,
null=True,
blank=True,
verbose_name=_("OSB Service ID"),
help_text=_("Open Service Broker service ID for API matching"),
)
class Meta:
verbose_name = _("Service")
verbose_name_plural = _("Services")
def __str__(self):
return self.name
@property
def featured_links(self):
"""Return external links marked as featured."""
if not self.external_links:
return []
return [link for link in self.external_links if link.get("featured")]
def validate_dict(data, required_fields=None, allow_empty=True):
if not data:
if allow_empty:
return
raise ValidationError(_("Data may not be empty!"))
missing_fields = required_fields - set(data)
if missing_fields:
raise ValidationError(
_("Missing required fields in API credentials: %(fields)s"),
params={"fields": ", ".join(missing_fields)},
)
def validate_api_credentials(value):
required_fields = ("certificate-authority-data", "server", "token")
return validate_dict(value, required_fields)
class ControlPlane(ServalaModelMixin, models.Model):
"""
Note: ControlPlanes are called "Service Provider Zone" in the user-facing frontend.
"""
name = models.CharField(max_length=100, verbose_name=_("Name"))
description = models.TextField(blank=True, verbose_name=_("Description"))
# Either contains the fields "certificate_authority_data", "server" and "token", or is empty
api_credentials = EncryptedJSONField(
verbose_name=_("API credentials"),
help_text="Required fields: certificate-authority-data, server (URL), token",
validators=[validate_api_credentials],
)
required_label = models.CharField(
max_length=100,
blank=True,
null=True,
verbose_name=_("Required Label"),
help_text=_(
"Label value for the 'appcat.vshn.io/provider-config' added to every instance on this plane."
),
)
cloud_provider = models.ForeignKey(
to="CloudProvider",
on_delete=models.PROTECT,
related_name="control_planes",
verbose_name=_("Cloud provider"),
)
user_info = models.JSONField(
null=True,
blank=True,
verbose_name=_("User Information"),
help_text=_(
'Array of info objects: [{"title": "", "content": "", "help_text": ""}]. '
"The help_text field is optional and will be shown as a hover popover on an info icon."
),
)
wildcard_dns = models.CharField(
max_length=255,
blank=True,
null=True,
verbose_name=_("Wildcard DNS"),
help_text=_(
"Wildcard DNS domain for auto-generating FQDNs (e.g., apps.exoscale-ch-gva-2-prod2.services.servala.com)"
),
)
storage_plan_odoo_product_id = models.CharField(
max_length=255,
null=True,
blank=True,
verbose_name=_("Storage plan Odoo product ID"),
help_text=_("Storage product ID in Odoo"),
)
storage_plan_odoo_unit_id = models.CharField(
max_length=255,
null=True,
blank=True,
verbose_name=_("Storage plan Odoo unit ID"),
help_text=_("Unit of measure ID in Odoo"),
)
storage_plan_price_per_gib = models.DecimalField(
max_digits=10,
decimal_places=2,
null=True,
blank=True,
verbose_name=_("Storage plan price per GiB"),
help_text=_("Price per GiB of storage"),
)
class Meta:
verbose_name = _("Control plane")
verbose_name_plural = _("Control planes")
def __str__(self):
return self.name
@property
def service_definitions(self):
return ServiceDefinition.objects.filter(
offering_control_planes__control_plane=self
).distinct()
@property
def kubernetes_config(self):
conf = kubernetes.client.Configuration()
user_name = "servala-user"
config_dict = {
"apiVersion": "v1",
"clusters": [
{
"cluster": {
"certificate-authority-data": self.api_credentials[
"certificate-authority-data"
],
"server": self.api_credentials["server"],
},
"name": self.name,
},
],
"contexts": [
{
"context": {
"cluster": self.name,
"namespace": "default",
"user": user_name,
},
"name": self.name,
}
],
"current-context": self.name,
"kind": "Config",
"preferences": {},
"users": [
{
"name": user_name,
"user": {"token": self.api_credentials["token"]},
}
],
}
config.load_kube_config_from_dict(
config_dict=config_dict,
client_configuration=conf,
)
return conf
def get_kubernetes_client(self):
return kubernetes.client.ApiClient(self.kubernetes_config)
@cached_property
def custom_objects_api(self):
return client.CustomObjectsApi(self.get_kubernetes_client())
def test_connection(self):
if not self.api_credentials:
return False, _("No API credentials provided")
try:
v1 = kubernetes.client.CoreV1Api(self.get_kubernetes_client())
namespace_count = len(v1.list_namespace().items)
return True, _(
"Successfully connected to Kubernetes API. Found {} namespaces."
).format(namespace_count)
except ApiException as e:
return False, _("API error: {}").format(str(e))
except Exception as e:
return False, _("Connection error: {}").format(str(e))
def get_or_create_namespace(self, organization):
api_instance = kubernetes.client.CoreV1Api(self.get_kubernetes_client())
name = organization.namespace
try:
api_instance.read_namespace(name=name)
except kubernetes.client.ApiException as e:
if e.status == 404:
labels = {
"servala.com/organization_id": str(organization.id),
}
annotations = {
"servala.com/organization": organization.name,
"servala.com/origin": organization.origin.name,
"servala.com/billing": organization.billing_entity.name,
}
for field in ("company_id", "invoice_id"):
if value := getattr(organization.billing_entity, f"odoo_{field}"):
labels[f"servala.com/erp_{field}"] = str(value)
if organization.odoo_sale_order_id:
labels["servala.com/erp_sale_order_id"] = str(
organization.odoo_sale_order_id
)
body = kubernetes.client.V1Namespace(
metadata=kubernetes.client.V1ObjectMeta(
name=name, labels=labels, annotations=annotations
)
)
api_instance.create_namespace(body=body)
else:
# If there's another error, raise it
raise
class CloudProvider(ServalaModelMixin, models.Model):
"""
A cloud provider, e.g. "Exoscale".
"""
name = models.CharField(max_length=100, verbose_name=_("Name"))
description = models.TextField(blank=True, verbose_name=_("Description"))
logo = models.ImageField(
upload_to="public/service_providers",
blank=True,
null=True,
verbose_name=_("Logo"),
)
external_links = models.JSONField(
null=True,
blank=True,
verbose_name=_("External links"),
help_text=('JSON array of link objects: {"url": "", "title": ""}. '),
)
class Meta:
verbose_name = _("Cloud provider")
verbose_name_plural = _("Cloud providers")
def __str__(self):
return self.name
def validate_api_definition(value):
required_fields = ("group", "version", "kind")
return validate_dict(value, required_fields)
def prune_empty_data(data):
"""
Recursively remove empty values from dictionaries and lists.
Empty values include: None, empty strings, empty lists, empty dicts.
"""
def is_empty(value):
return value is None or value == "" or value == [] or value == {}
if isinstance(data, dict):
return {
key: pruned_value
for key, value in data.items()
if not is_empty(pruned_value := prune_empty_data(value))
}
elif isinstance(data, list):
return [
pruned_item
for item in data
if not is_empty(pruned_item := prune_empty_data(item))
]
else:
return data
class ServiceDefinition(ServalaModelMixin, models.Model):
"""
Configuration/service implementation: contains information on which
CompositeResourceDefinition (aka XRD) implements a service on a ControlPlane.
Is required in order to query the OpenAPI spec for dynamic form generation.
"""
name = models.CharField(max_length=100, verbose_name=_("Name"))
description = models.TextField(blank=True, verbose_name=_("Description"))
api_definition = models.JSONField(
verbose_name=_("API Definition"),
help_text=_("Contains group, version, and kind information"),
null=True,
blank=True,
)
form_config = models.JSONField(
verbose_name=_("Form Configuration"),
help_text=_(
"Optional custom form configuration. When provided, this configuration will be used "
"to render the service form instead of auto-generating it from the OpenAPI spec. "
'Format: {"fieldsets": [{"title": "Section", "fields": [{...}]}]}'
),
null=True,
blank=True,
)
hide_expert_mode = models.BooleanField(
default=False,
verbose_name=_("Disable Expert Mode"),
help_text=_(
"When enabled, the 'Show Expert Mode' toggle will be hidden and only the custom form "
"configuration will be available. Only applies when a custom form configuration is provided."
),
)
service = models.ForeignKey(
to="Service",
on_delete=models.CASCADE,
related_name="service_definitions",
verbose_name=_("Service"),
)
@property
def control_planes(self):
return ControlPlane.objects.filter(
offering_connections__service_definition=self
).distinct()
@property
def service_offerings(self):
return ServiceOffering.objects.filter(
control_plane_connections__service_definition=self
).distinct()
class Meta:
verbose_name = _("Service definition")
verbose_name_plural = _("Service definitions")
def __str__(self):
return self.name
class ControlPlaneCRD(ServalaModelMixin, models.Model):
"""
Each combination of ServiceOffering and ControlPlane can have a different
ServiceDefinition, which is here modeled as basically a "through" model.
"""
service_offering = models.ForeignKey(
to="ServiceOffering",
on_delete=models.CASCADE,
related_name="control_plane_connections",
verbose_name=_("Service offering"),
)
control_plane = models.ForeignKey(
to="ControlPlane",
on_delete=models.CASCADE,
related_name="offering_connections",
verbose_name=_("Control plane"),
)
service_definition = models.ForeignKey(
to="ServiceDefinition",
on_delete=models.PROTECT,
related_name="offering_control_planes",
verbose_name=_("Service definition"),
)
class Meta:
verbose_name = _("ControlPlane CRD")
verbose_name_plural = _("ControlPlane CRDs")
unique_together = [("service_offering", "control_plane")]
def __str__(self):
return f"{self.service_offering} on {self.control_plane} with {self.service_definition}"
@cached_property
def group(self):
return self.service_definition.api_definition["group"]
@cached_property
def version(self):
return self.service_definition.api_definition["version"]
@cached_property
def kind(self):
return self.service_definition.api_definition["kind"]
@cached_property
def kind_plural(self):
if (
hasattr(self.resource_definition, "status")
and hasattr(self.resource_definition.status, "accepted_names")
and self.resource_definition.status.accepted_names
):
return self.resource_definition.status.accepted_names.plural
if self.kind.endswith("s"):
return self.kind.lower()
return f"{self.kind.lower()}s"
@cached_property
def resource_definition(self):
client = self.control_plane.get_kubernetes_client()
extensions_api = kubernetes.client.ApiextensionsV1Api(client)
crds = extensions_api.list_custom_resource_definition()
matching_crd = None
for crd in crds.items:
if matching_crd:
break
if crd.spec.group == self.group:
for served_version in crd.spec.versions:
if served_version.name == self.version and served_version.served:
if crd.spec.names.kind == self.kind:
matching_crd = crd
break
return matching_crd
@cached_property
def resource_schema(self):
cache_key = f"servala:crd:schema:{self.pk}"
if result := cache.get(cache_key):
return result
if not self.resource_definition:
return
for v in self.resource_definition.spec.versions:
if v.name == self.version:
if not v.schema or not v.schema.open_apiv3_schema:
return
result = v.schema.open_apiv3_schema.to_dict()
timeout_seconds = 60 * 60 * 24
cache.set(cache_key, result, timeout=timeout_seconds)
return result
@cached_property
def django_model(self):
from servala.core.crd import generate_django_model
if not self.resource_schema:
return
kwargs = {
"group": self.group,
"version": self.version,
"kind": self.kind,
}
return generate_django_model(self.resource_schema, **kwargs)
@cached_property
def model_form_class(self):
from servala.core.crd import generate_model_form_class
if not self.django_model:
return
return generate_model_form_class(self.django_model)
@cached_property
def custom_model_form_class(self):
from servala.core.crd import generate_custom_form_class
if not self.django_model:
return
if not (
self.service_definition
and self.service_definition.form_config
and self.service_definition.form_config.get("fieldsets")
):
return
return generate_custom_form_class(
self.service_definition.form_config, self.django_model
)
class ServiceOffering(ServalaModelMixin, models.Model):
"""
A service offering, e.g. "PostgreSQL on AWS", "MinIO on GCP".
"""
service = models.ForeignKey(
to="Service",
on_delete=models.PROTECT,
related_name="offerings",
verbose_name=_("Service"),
)
provider = models.ForeignKey(
to="CloudProvider",
on_delete=models.PROTECT,
related_name="offerings",
verbose_name=_("Provider"),
)
description = models.TextField(blank=True, verbose_name=_("Description"))
external_links = models.JSONField(
null=True,
blank=True,
verbose_name=_("External links"),
help_text=('JSON array of link objects: {"url": "", "title": ""}. '),
)
osb_plan_id = models.CharField(
max_length=100,
null=True,
blank=True,
verbose_name=_("OSB Plan ID"),
help_text=_("Open Service Broker plan ID for API matching"),
)
class Meta:
verbose_name = _("Service offering")
verbose_name_plural = _("Service offerings")
def __str__(self):
return _("{service_name} at {provider_name}").format(
service_name=self.service.name, provider_name=self.provider.name
)
@property
def control_planes(self):
return ControlPlane.objects.filter(
offering_connections__service_offering=self
).distinct()
class ServiceInstance(ServalaModelMixin, models.Model):
"""
The source of truth for service instances is Kubernetes.
The Django model only contains metadata, all other information is queried
on the fly.
"""
# The Kubernetes resource name (metadata.name). This field is immutable after
# creation and is auto-generated for new instances. Do not modify directly!
name = models.CharField(
max_length=63,
verbose_name=_("Instance ID"),
validators=[kubernetes_name_validator],
)
display_name = models.CharField(
max_length=100,
verbose_name=_("Name"),
)
organization = models.ForeignKey(
to="core.Organization",
on_delete=models.PROTECT,
verbose_name=_("Organization"),
related_name="service_instances",
)
created_by = models.ForeignKey(
to="core.User",
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name="+",
)
context = models.ForeignKey(
to="core.ControlPlaneCRD",
related_name="service_instances",
on_delete=models.PROTECT,
)
compute_plan_assignment = models.ForeignKey(
to="core.ComputePlanAssignment",
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name="instances",
verbose_name=_("Compute plan assignment"),
help_text=_("Compute plan with SLA for this instance"),
)
class Meta:
verbose_name = _("Service instance")
verbose_name_plural = _("Service instances")
# Names are unique per de-facto namespace, which is defined by the
# Organization + ServiceDefinition (group, version) + the ControlPlane.
unique_together = [("name", "organization", "context")]
rules_permissions = {
"view": rules.is_staff | perms.is_organization_member,
"change": rules.is_staff | perms.is_organization_admin,
"delete": rules.is_staff | perms.is_organization_admin,
"add": rules.is_authenticated,
}
class urls(urlman.Urls):
base = "{self.organization.urls.instances}{self.name}-{self.pk}/"
update = "{base}update/"
delete = "{base}delete/"
def _clear_kubernetes_caches(self):
"""Clears cached properties that depend on Kubernetes state."""
attrs = self.__dict__.keys()
for key in (
"kubernetes_object",
"spec",
"status_conditions",
"connection_credentials",
):
if key in attrs:
delattr(self, key)
@classmethod
def _prepare_spec_data(cls, spec_data):
"""
Prepare spec data by pruning empty values and ensuring required fields.
This method centralizes spec data processing to ensure consistency.
"""
spec_data = spec_data or {}
spec_data = prune_empty_data(spec_data)
return spec_data
@staticmethod
def generate_resource_name(organization, display_name, service, attempt=0):
"""
Generate a unique Kubernetes-compatible resource name.
Format: {prefix}-{sha256[:8]}
The hash input is: org_slug:display_name:service_slug[:attempt if > 0]
On collision, we retry with an incremented attempt number included in hash.
"""
hash_input = (
f"{organization.slug}:{display_name.lower().strip()}:{service.slug}"
)
if attempt > 0:
hash_input += f":{attempt}"
hash_value = hashlib.sha256(hash_input.encode("utf-8")).hexdigest()[:8]
return f"{settings.SERVALA_INSTANCE_NAME_PREFIX}-{hash_value}"
@staticmethod
def _apply_compute_plan_to_spec(spec_data, compute_plan_assignment):
"""
Apply compute plan resource allocations and SLA to spec.
"""
if not compute_plan_assignment:
return spec_data
compute_plan = compute_plan_assignment.compute_plan
if "parameters" not in spec_data:
spec_data["parameters"] = {}
if "size" not in spec_data["parameters"]:
spec_data["parameters"]["size"] = {}
if "requests" not in spec_data["parameters"]["size"]:
spec_data["parameters"]["size"]["requests"] = {}
if "service" not in spec_data["parameters"]:
spec_data["parameters"]["service"] = {}
spec_data["parameters"]["size"]["memory"] = compute_plan.memory_limits
spec_data["parameters"]["size"]["cpu"] = compute_plan.cpu_limits
spec_data["parameters"]["size"]["requests"][
"memory"
] = compute_plan.memory_requests
spec_data["parameters"]["size"]["requests"]["cpu"] = compute_plan.cpu_requests
spec_data["parameters"]["service"]["serviceLevel"] = compute_plan_assignment.sla
return spec_data
@staticmethod
def _build_billing_annotations(
compute_plan_assignment,
control_plane,
instance_name=None,
display_name=None,
organization=None,
service=None,
):
"""
Build Kubernetes annotations for billing integration and display name.
"""
from servala.core.models.organization import InvoiceGroupingChoice
annotations = {}
if display_name:
annotations["servala.com/displayName"] = display_name
if compute_plan_assignment:
annotations["servala.com/erp_product_id_resource"] = str(
compute_plan_assignment.odoo_product_id
)
annotations["servala.com/erp_unit_id_resource"] = str(
compute_plan_assignment.odoo_unit_id
)
if control_plane.storage_plan_odoo_product_id:
annotations["servala.com/erp_product_id_storage"] = str(
control_plane.storage_plan_odoo_product_id
)
if control_plane.storage_plan_odoo_unit_id:
annotations["servala.com/erp_unit_id_storage"] = str(
control_plane.storage_plan_odoo_unit_id
)
if organization and instance_name:
invoice_grouping = organization.origin.invoice_grouping
cloud_provider_name = control_plane.cloud_provider.name
control_plane_name = control_plane.name
if invoice_grouping == InvoiceGroupingChoice.BY_SERVICE:
if service:
annotations["servala.com/erp_item_group_description"] = (
f"Servala Service: {service.name}"
)
annotations["servala.com/erp_item_description"] = (
f"{instance_name} on {cloud_provider_name} {control_plane_name}"
)
else:
group_description = f"Organization: {organization.name}"
item_description = f"{instance_name} on {control_plane_name}"
if organization.osb_guid:
group_description += f" ({organization.osb_guid})"
item_description += f" [Org: {organization.osb_guid}]"
annotations["servala.com/erp_item_group_description"] = (
group_description
)
annotations["servala.com/erp_item_description"] = item_description
return annotations
@classmethod
def _format_kubernetes_error(cls, error_message):
if not error_message:
return {"message": "", "errors": None, "has_list": False}
error_message = str(error_message).strip()
# Pattern to match validation errors in brackets like [error1, error2, error3]
pattern = r"\[([^\]]+)\]"
match = re.search(pattern, error_message)
if not match:
return {"message": error_message, "errors": None, "has_list": False}
errors_text = match.group(1).strip()
if "," not in errors_text:
return {"message": error_message, "errors": None, "has_list": False}
errors = [error.strip().strip("\"'") for error in errors_text.split(",")]
errors = [error for error in errors if error]
if len(errors) <= 1:
return {"message": error_message, "errors": None, "has_list": False}
base_message = re.sub(pattern, "", error_message).strip()
base_message = base_message.rstrip(":").strip()
return {"message": base_message, "errors": errors, "has_list": True}
@classmethod
def _safe_format_error(cls, error_data):
if not isinstance(error_data, dict):
return html.escape(str(error_data))
if not error_data.get("has_list", False):
return html.escape(error_data.get("message", ""))
message = html.escape(error_data.get("message", ""))
errors = error_data.get("errors", [])
if not errors:
return message
escaped_errors = [html.escape(str(error)) for error in errors]
error_items = "".join(f"<li>{error}</li>" for error in escaped_errors)
if message:
return mark_safe(f"{message}<ul>{error_items}</ul>")
else:
return mark_safe(f"<ul>{error_items}</ul>")
@classmethod
@transaction.atomic
def create_instance(
cls,
display_name,
organization,
context,
created_by,
spec_data,
compute_plan_assignment=None,
):
service = context.service_offering.service
name = None
for attempt in range(10):
name = cls.generate_resource_name(
organization, display_name, service, attempt
)
if not cls.objects.filter(
name=name, organization=organization, context=context
).exists():
break
else:
message = _(
"Could not generate a unique resource name. Please try a different display name."
)
raise ValidationError(organization.add_support_message(message))
# Ensure the namespace exists
context.control_plane.get_or_create_namespace(organization)
try:
instance = cls.objects.create(
name=name,
display_name=display_name,
organization=organization,
created_by=created_by,
context=context,
compute_plan_assignment=compute_plan_assignment,
)
except IntegrityError:
message = _(
"An instance with this name already exists in this organization. Please choose a different name."
)
raise ValidationError(organization.add_support_message(message))
try:
spec_data = cls._prepare_spec_data(spec_data)
if compute_plan_assignment:
spec_data = cls._apply_compute_plan_to_spec(
spec_data, compute_plan_assignment
)
if "writeConnectionSecretToRef" not in spec_data:
spec_data["writeConnectionSecretToRef"] = {}
if not spec_data["writeConnectionSecretToRef"].get("name"):
service_slug = context.service_offering.service.slug
secret_name = f"{organization.slug}-{instance.pk}-{service_slug}"
spec_data["writeConnectionSecretToRef"]["name"] = secret_name
create_data = {
"apiVersion": f"{context.group}/{context.version}",
"kind": context.kind,
"metadata": {
"name": name,
"namespace": organization.namespace,
},
"spec": spec_data,
}
annotations = cls._build_billing_annotations(
compute_plan_assignment=compute_plan_assignment,
control_plane=context.control_plane,
instance_name=name,
display_name=display_name,
organization=organization,
service=service,
)
if annotations:
create_data["metadata"]["annotations"] = annotations
if label := context.control_plane.required_label:
create_data["metadata"]["labels"] = {settings.DEFAULT_LABEL_KEY: label}
api_instance = context.control_plane.custom_objects_api
api_instance.create_namespaced_custom_object(
group=context.group,
version=context.version,
namespace=organization.namespace,
plural=context.kind_plural,
body=create_data,
)
except Exception as e:
# Transaction will automatically roll back the instance creation
if isinstance(e, ApiException):
try:
error_body = json.loads(e.body)
reason = error_body.get("message", str(e))
error_data = cls._format_kubernetes_error(reason)
formatted_reason = cls._safe_format_error(error_data)
message = _("Error reported by control plane: {reason}").format(
reason=formatted_reason
)
raise ValidationError(organization.add_support_message(message))
except (ValueError, TypeError):
error_data = cls._format_kubernetes_error(str(e))
formatted_error = cls._safe_format_error(error_data)
message = _("Error reported by control plane: {error}").format(
error=formatted_error
)
raise ValidationError(organization.add_support_message(message))
error_data = cls._format_kubernetes_error(str(e))
formatted_error = cls._safe_format_error(error_data)
message = _("Error creating instance: {error}").format(
error=formatted_error
)
raise ValidationError(organization.add_support_message(message))
return instance
def update_spec(self, spec_data, updated_by, compute_plan_assignment=None):
try:
spec_data = self._prepare_spec_data(spec_data)
plan_to_use = compute_plan_assignment or self.compute_plan_assignment
if plan_to_use:
spec_data = self._apply_compute_plan_to_spec(spec_data, plan_to_use)
api_instance = self.context.control_plane.custom_objects_api
patch_body = {"spec": spec_data}
annotations = self._build_billing_annotations(
compute_plan_assignment=plan_to_use,
control_plane=self.context.control_plane,
instance_name=self.name,
display_name=self.display_name,
organization=self.organization,
service=self.context.service_offering.service,
)
if annotations:
patch_body["metadata"] = {"annotations": annotations}
api_instance.patch_namespaced_custom_object(
group=self.context.group,
version=self.context.version,
namespace=self.organization.namespace,
plural=self.context.kind_plural,
name=self.name,
body=patch_body,
)
self._clear_kubernetes_caches()
if (
compute_plan_assignment
and compute_plan_assignment != self.compute_plan_assignment
):
self.compute_plan_assignment = compute_plan_assignment
# Saving to update updated_at timestamp even if nothing was visibly changed
self.save()
except ApiException as e:
if e.status == 404:
message = _(
"Service instance not found in control plane. It may have been deleted externally."
)
raise ValidationError(self.organization.add_support_message(message))
try:
error_body = json.loads(e.body)
reason = error_body.get("message", str(e))
error_data = self._format_kubernetes_error(reason)
formatted_reason = self._safe_format_error(error_data)
message = _(
"Error reported by control plane while updating instance: {reason}"
).format(reason=formatted_reason)
raise ValidationError(self.organization.add_support_message(message))
except (ValueError, TypeError):
error_data = self._format_kubernetes_error(str(e))
formatted_error = self._safe_format_error(error_data)
message = _(
"Error reported by control plane while updating instance: {error}"
).format(error=formatted_error)
raise ValidationError(self.organization.add_support_message(message))
except Exception as e:
error_data = self._format_kubernetes_error(str(e))
formatted_error = self._safe_format_error(error_data)
message = _("Error updating instance: {error}").format(
error=formatted_error
)
raise ValidationError(self.organization.add_support_message(message))
@transaction.atomic
def delete(self, using=None, keep_parents=False, user=None):
"""
Deletes the Django instance and the corresponding Kubernetes custom resource.
"""
if (
self.spec.get("parameters", {})
.get("security", {})
.get("deletionProtection")
):
spec = copy.copy(self.spec)
spec["parameters"]["security"]["deletionProtection"] = False
self.update_spec(spec, user)
try:
api_instance = self.context.control_plane.custom_objects_api
api_instance.delete_namespaced_custom_object(
group=self.context.group,
version=self.context.version,
namespace=self.organization.namespace,
plural=self.context.kind_plural,
name=self.name,
body=client.V1DeleteOptions(),
)
except ApiException as e:
if e.status != 404:
# 404 is fine, the object was deleted already.
raise
self._clear_kubernetes_caches()
return super().delete(using=using, keep_parents=keep_parents)
@cached_property
def kubernetes_object(self):
"""Fetch the Kubernetes custom resource object"""
try:
api_instance = client.CustomObjectsApi(
self.context.control_plane.get_kubernetes_client()
)
return api_instance.get_namespaced_custom_object(
group=self.context.group,
version=self.context.version,
namespace=self.organization.namespace,
plural=self.context.kind_plural,
name=self.name,
)
except ApiException as e:
if e.status == 404:
return None
raise
@cached_property
def spec(self):
if not self.kubernetes_object:
return {}
if not (spec := self.kubernetes_object.get("spec")):
return {}
# Remove fields that shouldn't be displayed
spec = spec.copy()
spec.pop("resourceRef", None)
spec.pop("writeConnectionSecretToRef", None)
return spec
@cached_property
def spec_object(self):
"""Dynamically generated CRD object."""
if not self.context.django_model:
return
return self.context.django_model(
display_name=self.display_name,
context=self.context,
spec=self.spec,
# We pass -1 as ID in order to make it clear that a) this object exists (remotely),
# and b) its not a normal database object. This allows us to treat e.g. update
# forms differently from create forms.
pk=-1,
)
@cached_property
def status_conditions(self):
if not self.kubernetes_object:
return []
if not (status := self.kubernetes_object.get("status")):
return []
return status.get("conditions") or []
@cached_property
def connection_credentials(self):
"""
Get connection credentials directly from the resource's writeConnectionSecretToRef
after checking that secret conditions are available.
"""
if not self.kubernetes_object:
return {}
# Check if secrets are available based on conditions
secrets_available = any(
[
condition.get("type") == "Ready" and condition.get("status") == "True"
for condition in self.status_conditions
]
)
if not secrets_available:
return {}
spec = self.kubernetes_object.get("spec")
if not (secret_ref := spec.get("writeConnectionSecretToRef")):
return {}
if not (secret_name := secret_ref.get("name")):
return {}
try:
# Get the secret data
v1 = kubernetes.client.CoreV1Api(
self.context.control_plane.get_kubernetes_client()
)
secret = v1.read_namespaced_secret(
name=secret_name, namespace=self.organization.namespace
)
# Secret data is base64 encoded
credentials = {}
if hasattr(secret, "data") and secret.data:
import base64
for key, value in secret.data.items():
# Skip keys ending with _HOST as they're only useful for dedicated OpenShift clusters
if key.endswith("_HOST"):
continue
try:
credentials[key] = base64.b64decode(value).decode("utf-8")
except Exception:
credentials[key] = f"<binary data: {len(value)} bytes>"
return credentials
except ApiException as e:
return {"error": str(e)}
except Exception as e:
return {"error": str(e)}
@property
def fqdn_url(self):
try:
fqdn = self.spec.get("parameters", {}).get("service", {}).get("fqdn")
if not fqdn:
return None
if isinstance(fqdn, list):
return fqdn[0]
elif isinstance(fqdn, str):
return fqdn
else:
return None
except (AttributeError, KeyError, IndexError):
return None
@cached_property
def kubernetes_events(self) -> dict:
"""
Returns a list of event dictionaries sorted by last timestamp (newest first).
"""
if not self.kubernetes_object:
return []
try:
v1 = kubernetes.client.CoreV1Api(
self.context.control_plane.get_kubernetes_client()
)
events = v1.list_namespaced_event(
namespace=self.organization.namespace,
field_selector=f"involvedObject.name={self.name},involvedObject.kind={self.context.kind}",
)
event_list = []
for event in events.items:
event_dict = {
"type": event.type, # Normal or Warning
"reason": event.reason,
"message": event.message,
"count": event.count or 1,
"first_timestamp": (
event.first_timestamp.isoformat()
if event.first_timestamp
else None
),
"last_timestamp": (
event.last_timestamp.isoformat()
if event.last_timestamp
else None
),
"source": event.source.component if event.source else None,
}
event_list.append(event_dict)
event_list.sort(key=lambda x: x.get("last_timestamp") or "", reverse=True)
return event_list
except ApiException:
return []
except Exception:
return []
auditlog.register(ServiceInstance, exclude_fields=["updated_at"], serialize_data=True)