""" Admin classes for pricing models including compute plans, storage plans, and VSHN AppCat pricing """ import re from django.contrib import admin, messages from django.contrib.admin import helpers from django.utils.html import format_html from django import forms from django.shortcuts import render from django.http import HttpResponseRedirect from adminsortable2.admin import SortableAdminMixin from import_export.admin import ImportExportModelAdmin from import_export import resources from import_export.fields import Field from import_export.widgets import ForeignKeyWidget from ..models import ( ComputePlan, ComputePlanGroup, ComputePlanPrice, CloudProvider, StoragePlan, StoragePlanPrice, VSHNAppCatBaseFee, VSHNAppCatPrice, VSHNAppCatUnitRate, ProgressiveDiscountModel, DiscountTier, ExternalPricePlans, Service, ) from ..models.base import Term def natural_sort_key(obj): """Extract numeric parts for natural sorting""" parts = re.split(r"(\d+)", obj.name) return [int(part) if part.isdigit() else part for part in parts] class ComputePlanPriceInline(admin.TabularInline): """Inline admin for ComputePlanPrice model""" model = ComputePlanPrice extra = 1 fields = ("currency", "amount") class ComputePlanItemInline(admin.TabularInline): """Inline admin for ComputePlan model""" model = ComputePlan extra = 1 fields = ("name", "vcpus", "ram", "active", "valid_from", "valid_to") @admin.register(ComputePlanGroup) class ComputePlanGroupAdmin(SortableAdminMixin, admin.ModelAdmin): """Admin configuration for ComputePlanGroup model""" list_display = ("name", "node_label", "compute_plans_count") search_fields = ("name", "description", "node_label") ordering = ("order",) def compute_plans_count(self, obj): """Display count of compute plans in this group""" return obj.compute_plans.count() compute_plans_count.short_description = "Compute Plans" class ComputePlanResource(resources.ModelResource): """Import/Export resource for ComputePlan model""" cloud_provider = Field( column_name="cloud_provider", attribute="cloud_provider", widget=ForeignKeyWidget(CloudProvider, "name"), ) group = Field( column_name="group", attribute="group", widget=ForeignKeyWidget(ComputePlanGroup, "name"), ) prices = Field(column_name="prices", attribute=None) class Meta: model = ComputePlan skip_unchanged = True report_skipped = False import_id_fields = ["name"] fields = ( "name", "vcpus", "ram", "cpu_mem_ratio", "cloud_provider", "group", "active", "term", "valid_from", "valid_to", "prices", ) def dehydrate_prices(self, compute_plan): """Export prices in a custom format""" prices = compute_plan.prices.all() if not prices: return "" return "|".join([f"{p.currency} {p.amount}" for p in prices]) def save_m2m(self, instance, row, *args, **kwargs): """Handle many-to-many relationships during import""" super().save_m2m(instance, row, *args, **kwargs) if "prices" in row and row["prices"]: # Clear existing prices first instance.prices.all().delete() price_entries = row["prices"].split("|") for entry in price_entries: if " " in entry: currency, amount = entry.split(" ") ComputePlanPrice.objects.create( compute_plan=instance, currency=currency, amount=amount ) class MassUpdateComputePlanForm(forms.Form): """Form for mass updating ComputePlan fields""" active = forms.ChoiceField( choices=[("", "-- No change --"), ("True", "Active"), ("False", "Inactive")], required=False, help_text="Set active status for selected compute plans", ) term = forms.ChoiceField( choices=[("", "-- No change --")] + Term.choices, required=False, help_text="Set billing term for selected compute plans", ) group = forms.ModelChoiceField( queryset=ComputePlanGroup.objects.all(), required=False, empty_label="-- No change --", help_text="Set group for selected compute plans", ) valid_from = forms.DateField( widget=forms.DateInput(attrs={"type": "date"}), required=False, help_text="Set valid from date for selected compute plans", ) valid_to = forms.DateField( widget=forms.DateInput(attrs={"type": "date"}), required=False, help_text="Set valid to date for selected compute plans", ) @admin.register(ComputePlan) class ComputePlanAdmin(ImportExportModelAdmin): """Admin configuration for ComputePlan model with import/export functionality""" resource_class = ComputePlanResource list_display = ( "name", "cloud_provider", "group", "vcpus", "ram", "term", "display_prices", "active", ) search_fields = ("name", "cloud_provider__name", "group__name") list_filter = ("active", "cloud_provider", "group") inlines = [ComputePlanPriceInline] actions = ["mass_update_compute_plans"] def changelist_view(self, request, extra_context=None): """Override changelist view to apply natural sorting""" # Get the response from parent response = super().changelist_view(request, extra_context) # If it's a TemplateResponse, we can modify the context if hasattr(response, "context_data") and "cl" in response.context_data: cl = response.context_data["cl"] if hasattr(cl, "result_list"): cl.result_list = sorted(cl.result_list, key=natural_sort_key) return response def display_prices(self, obj): """Display formatted prices for the list view""" prices = obj.prices.all() if not prices: return "No prices set" return format_html("
".join([f"{p.amount} {p.currency}" for p in prices])) display_prices.short_description = "Prices (Amount Currency)" def mass_update_compute_plans(self, request, queryset): """Admin action to mass update compute plan fields""" if request.POST.get("post"): # Process the form submission form = MassUpdateComputePlanForm(request.POST) if form.is_valid(): updated_count = 0 updated_fields = [] # Prepare update data update_data = {} # Handle active field if form.cleaned_data["active"]: update_data["active"] = form.cleaned_data["active"] == "True" updated_fields.append("active") # Handle term field if form.cleaned_data["term"]: update_data["term"] = form.cleaned_data["term"] updated_fields.append("term") # Handle group field if form.cleaned_data["group"]: update_data["group"] = form.cleaned_data["group"] updated_fields.append("group") # Handle valid_from field if form.cleaned_data["valid_from"]: update_data["valid_from"] = form.cleaned_data["valid_from"] updated_fields.append("valid_from") # Handle valid_to field if form.cleaned_data["valid_to"]: update_data["valid_to"] = form.cleaned_data["valid_to"] updated_fields.append("valid_to") # Perform the bulk update if update_data: updated_count = queryset.update(**update_data) # Create success message field_list = ", ".join(updated_fields) self.message_user( request, f"Successfully updated {updated_count} compute plan(s). " f"Updated fields: {field_list}", messages.SUCCESS, ) else: self.message_user( request, "No fields were selected for update.", messages.WARNING ) return HttpResponseRedirect(request.get_full_path()) else: # Show the form form = MassUpdateComputePlanForm() # Render the mass update template return render( request, "admin/mass_update_compute_plans.html", { "form": form, "queryset": queryset, "action_checkbox_name": helpers.ACTION_CHECKBOX_NAME, "opts": self.model._meta, "title": f"Mass Update {queryset.count()} Compute Plans", }, ) mass_update_compute_plans.short_description = "Mass update selected compute plans" class VSHNAppCatBaseFeeInline(admin.TabularInline): """Inline admin for VSHNAppCatBaseFee model""" model = VSHNAppCatBaseFee extra = 1 fields = ("currency", "amount") class VSHNAppCatUnitRateInline(admin.TabularInline): """Inline admin for VSHNAppCatUnitRate model""" model = VSHNAppCatUnitRate extra = 1 fields = ("currency", "service_level", "amount") class DiscountTierInline(admin.TabularInline): """Inline admin for DiscountTier model""" model = DiscountTier extra = 1 fields = ("min_units", "max_units", "discount_percent") ordering = ("min_units",) @admin.register(ProgressiveDiscountModel) class ProgressiveDiscountModelAdmin(admin.ModelAdmin): """Admin configuration for ProgressiveDiscountModel""" list_display = ("name", "description", "active") search_fields = ("name", "description") inlines = [DiscountTierInline] @admin.register(VSHNAppCatPrice) class VSHNAppCatPriceAdmin(admin.ModelAdmin): """Admin configuration for VSHNAppCatPrice model""" list_display = ( "service", "variable_unit", "term", "discount_model", "admin_display_base_fees", "admin_display_unit_rates", "public_display_enabled", ) list_filter = ("variable_unit", "service", "discount_model") search_fields = ("service__name",) inlines = [VSHNAppCatBaseFeeInline, VSHNAppCatUnitRateInline] def admin_display_base_fees(self, obj): """Display base fees in admin list view""" fees = obj.base_fees.all() if not fees: return "No base fees" return format_html( "
".join([f"{fee.amount} {fee.currency}" for fee in fees]) ) admin_display_base_fees.short_description = "Base Fees" def admin_display_unit_rates(self, obj): """Display unit rates in admin list view""" rates = obj.unit_rates.all() if not rates: return "No unit rates" return format_html( "
".join( [ f"{rate.amount} {rate.currency} ({rate.get_service_level_display()})" for rate in rates ] ) ) admin_display_unit_rates.short_description = "Unit Rates" class StoragePlanPriceInline(admin.TabularInline): """Inline admin for StoragePlanPrice model""" model = StoragePlanPrice extra = 1 fields = ("currency", "amount") class StoragePlanResource(resources.ModelResource): """Import/Export resource for StoragePlan model""" cloud_provider = Field( column_name="cloud_provider", attribute="cloud_provider", widget=ForeignKeyWidget(CloudProvider, "name"), ) prices = Field(column_name="prices", attribute=None) class Meta: model = StoragePlan skip_unchanged = True report_skipped = False import_id_fields = ["name"] fields = ( "name", "cloud_provider", "term", "unit", "valid_from", "valid_to", "prices", ) def dehydrate_prices(self, storage_plan): """Export prices in a custom format""" prices = storage_plan.prices.all() if not prices: return "" return "|".join([f"{p.currency} {p.amount}" for p in prices]) def save_m2m(self, instance, row, *args, **kwargs): """Handle many-to-many relationships during import""" super().save_m2m(instance, row, *args, **kwargs) # Handle prices if "prices" in row and row["prices"]: # Clear existing prices first instance.prices.all().delete() # Create new prices price_entries = row["prices"].split("|") for entry in price_entries: if " " in entry: currency, amount = entry.split(" ") StoragePlanPrice.objects.create( storage_plan=instance, currency=currency, amount=amount ) @admin.register(StoragePlan) class StoragePlanAdmin(ImportExportModelAdmin): """Admin configuration for StoragePlan model with import/export functionality""" resource_class = StoragePlanResource list_display = ( "name", "cloud_provider", "term", "unit", "display_prices", ) search_fields = ("name", "cloud_provider__name") list_filter = ("cloud_provider",) ordering = ("name",) inlines = [StoragePlanPriceInline] def display_prices(self, obj): """Display formatted prices for the list view""" prices = obj.prices.all() if not prices: return "No prices set" return format_html("
".join([f"{p.amount} {p.currency}" for p in prices])) display_prices.short_description = "Prices (Amount Currency)" class ExternalPricePlansResource(resources.ModelResource): """Import/Export resource for ExternalPricePlans model""" cloud_provider = Field( column_name="cloud_provider", attribute="cloud_provider", widget=ForeignKeyWidget(CloudProvider, "name"), ) service = Field( column_name="service", attribute="service", widget=ForeignKeyWidget(Service, "name"), ) compare_to = Field(column_name="compare_to", attribute=None) class Meta: model = ExternalPricePlans skip_unchanged = True report_skipped = False import_id_fields = ["plan_name", "cloud_provider", "service"] fields = ( "plan_name", "description", "source", "date_retrieved", "cloud_provider", "service", "currency", "term", "amount", "vcpus", "ram", "storage", "competitor_sla", "replicas", "service_level", "compare_to", ) def dehydrate_compare_to(self, external_price): """Export compute plans this external price compares to""" compute_plans = external_price.compare_to.all() if not compute_plans: return "" return "|".join([plan.name for plan in compute_plans]) def save_m2m(self, instance, row, *args, **kwargs): """Handle many-to-many relationships during import""" super().save_m2m(instance, row, *args, **kwargs) # Handle compare_to relationships if "compare_to" in row and row["compare_to"]: # Clear existing relationships first instance.compare_to.clear() # Create new relationships plan_names = row["compare_to"].split("|") for plan_name in plan_names: plan_name = plan_name.strip() if plan_name: try: compute_plan = ComputePlan.objects.get(name=plan_name) instance.compare_to.add(compute_plan) except ComputePlan.DoesNotExist: # Log or handle missing compute plan pass @admin.register(ExternalPricePlans) class ExternalPricePlansAdmin(ImportExportModelAdmin): """Admin configuration for ExternalPricePlans model with import/export functionality""" resource_class = ExternalPricePlansResource list_display = ( "plan_name", "cloud_provider", "service", "amount", "display_compare_to_count", "replicas", ) list_filter = ("cloud_provider", "service", "currency", "term") search_fields = ("plan_name", "cloud_provider__name", "service__name") ordering = ("cloud_provider", "service", "plan_name") # Configure many-to-many field display filter_horizontal = ("compare_to",) def display_compare_to_count(self, obj): """Display count of compute plans this external price compares to""" count = obj.compare_to.count() if count == 0: return "No comparisons" return f"{count} plan{'s' if count != 1 else ''}" display_compare_to_count.short_description = "Compare To"