parent
3d62595663
commit
61f1065bc6
2 changed files with 445 additions and 4 deletions
|
|
@ -5,6 +5,12 @@ from django.core import mail
|
|||
from django_scopes import scopes_disabled
|
||||
|
||||
from servala.core.models import Organization, OrganizationOrigin, User
|
||||
from servala.core.models.service import (
|
||||
ControlPlane,
|
||||
ControlPlaneCRD,
|
||||
ServiceDefinition,
|
||||
ServiceInstance,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
|
@ -451,3 +457,273 @@ def test_organization_creation_with_context_only(
|
|||
assert response.status_code == 201
|
||||
org = Organization.objects.get(osb_guid="fallback-org-guid")
|
||||
assert org is not None
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_delete_offboarding_success(
|
||||
mock_odoo_success,
|
||||
osb_client,
|
||||
test_service,
|
||||
test_service_offering,
|
||||
instance_id,
|
||||
):
|
||||
response = osb_client.delete(
|
||||
f"/api/osb/v2/service_instances/{instance_id}"
|
||||
f"?service_id={test_service.osb_service_id}&plan_id={test_service_offering.osb_plan_id}"
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.content == b"{}"
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_delete_missing_service_id(osb_client, test_service_offering, instance_id):
|
||||
response = osb_client.delete(
|
||||
f"/api/osb/v2/service_instances/{instance_id}?plan_id={test_service_offering.osb_plan_id}"
|
||||
)
|
||||
|
||||
assert response.status_code == 400
|
||||
response_data = json.loads(response.content)
|
||||
assert "service_id is required but missing" in response_data["error"]
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_delete_missing_plan_id(osb_client, test_service, instance_id):
|
||||
response = osb_client.delete(
|
||||
f"/api/osb/v2/service_instances/{instance_id}?service_id={test_service.osb_service_id}"
|
||||
)
|
||||
|
||||
assert response.status_code == 400
|
||||
response_data = json.loads(response.content)
|
||||
assert "plan_id is required but missing" in response_data["error"]
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_delete_invalid_service_id(osb_client, instance_id):
|
||||
response = osb_client.delete(
|
||||
f"/api/osb/v2/service_instances/{instance_id}?service_id=invalid&plan_id=invalid"
|
||||
)
|
||||
|
||||
assert response.status_code == 400
|
||||
response_data = json.loads(response.content)
|
||||
assert "Unknown service_id: invalid" in response_data["error"]
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_delete_invalid_plan_id(osb_client, test_service, instance_id):
|
||||
response = osb_client.delete(
|
||||
f"/api/osb/v2/service_instances/{instance_id}"
|
||||
f"?service_id={test_service.osb_service_id}&plan_id=invalid"
|
||||
)
|
||||
|
||||
assert response.status_code == 400
|
||||
response_data = json.loads(response.content)
|
||||
assert (
|
||||
f"Unknown plan_id: invalid for service_id: {test_service.osb_service_id}"
|
||||
in response_data["error"]
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_patch_suspension_success(
|
||||
mock_odoo_success,
|
||||
osb_client,
|
||||
test_service,
|
||||
test_service_offering,
|
||||
instance_id,
|
||||
):
|
||||
payload = {
|
||||
"service_id": test_service.osb_service_id,
|
||||
"plan_id": test_service_offering.osb_plan_id,
|
||||
"parameters": {
|
||||
"users": [
|
||||
{
|
||||
"email": "user@example.com",
|
||||
"full_name": "Test User",
|
||||
"role": "owner",
|
||||
}
|
||||
]
|
||||
},
|
||||
}
|
||||
|
||||
response = osb_client.patch(
|
||||
f"/api/osb/v2/service_instances/{instance_id}",
|
||||
data=json.dumps(payload),
|
||||
content_type="application/json",
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.content == b"{}"
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_patch_missing_service_id(osb_client, test_service_offering, instance_id):
|
||||
payload = {
|
||||
"plan_id": test_service_offering.osb_plan_id,
|
||||
"parameters": {"users": []},
|
||||
}
|
||||
|
||||
response = osb_client.patch(
|
||||
f"/api/osb/v2/service_instances/{instance_id}",
|
||||
data=json.dumps(payload),
|
||||
content_type="application/json",
|
||||
)
|
||||
|
||||
assert response.status_code == 400
|
||||
response_data = json.loads(response.content)
|
||||
assert "service_id is required but missing" in response_data["error"]
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_patch_missing_plan_id(osb_client, test_service, instance_id):
|
||||
payload = {
|
||||
"service_id": test_service.osb_service_id,
|
||||
"parameters": {"users": []},
|
||||
}
|
||||
|
||||
response = osb_client.patch(
|
||||
f"/api/osb/v2/service_instances/{instance_id}",
|
||||
data=json.dumps(payload),
|
||||
content_type="application/json",
|
||||
)
|
||||
|
||||
assert response.status_code == 400
|
||||
response_data = json.loads(response.content)
|
||||
assert "plan_id is required but missing" in response_data["error"]
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_patch_invalid_json(osb_client, instance_id):
|
||||
response = osb_client.patch(
|
||||
f"/api/osb/v2/service_instances/{instance_id}",
|
||||
data="invalid json{",
|
||||
content_type="application/json",
|
||||
)
|
||||
|
||||
assert response.status_code == 400
|
||||
response_data = json.loads(response.content)
|
||||
assert "Invalid JSON in request body" in response_data["error"]
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_delete_creates_ticket_with_admin_links(
|
||||
mocker,
|
||||
mock_odoo_success,
|
||||
osb_client,
|
||||
test_service,
|
||||
test_service_offering,
|
||||
instance_id,
|
||||
):
|
||||
mock_api_client = mocker.patch("servala.api.views.CLIENT")
|
||||
|
||||
response = osb_client.delete(
|
||||
f"/api/osb/v2/service_instances/{instance_id}"
|
||||
f"?service_id={test_service.osb_service_id}&plan_id={test_service_offering.osb_plan_id}"
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
ticket_call = mock_api_client.execute.call_args_list[-1]
|
||||
ticket_data = ticket_call[0][2][0]
|
||||
|
||||
assert "admin/core/serviceoffering" in ticket_data["description"]
|
||||
assert f"/{test_service_offering.pk}/" in ticket_data["description"]
|
||||
assert (
|
||||
ticket_data["name"]
|
||||
== f"Exoscale OSB Offboard - {test_service.name} - {instance_id}"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_patch_creates_ticket_with_user_admin_links(
|
||||
mocker,
|
||||
mock_odoo_success,
|
||||
osb_client,
|
||||
test_service,
|
||||
test_service_offering,
|
||||
instance_id,
|
||||
org_owner,
|
||||
):
|
||||
mock_api_client = mocker.patch("servala.api.views.CLIENT")
|
||||
payload = {
|
||||
"service_id": test_service.osb_service_id,
|
||||
"plan_id": test_service_offering.osb_plan_id,
|
||||
"parameters": {
|
||||
"users": [
|
||||
{
|
||||
"email": org_owner.email,
|
||||
"full_name": "Test User",
|
||||
"role": "owner",
|
||||
}
|
||||
]
|
||||
},
|
||||
}
|
||||
|
||||
response = osb_client.patch(
|
||||
f"/api/osb/v2/service_instances/{instance_id}",
|
||||
data=json.dumps(payload),
|
||||
content_type="application/json",
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
ticket_call = mock_api_client.execute.call_args_list[-1]
|
||||
ticket_data = ticket_call[0][2][0]
|
||||
assert "admin/core/serviceoffering" in ticket_data["description"]
|
||||
assert "admin/core/user" in ticket_data["description"]
|
||||
assert f"/{org_owner.pk}/" in ticket_data["description"]
|
||||
assert (
|
||||
ticket_data["name"]
|
||||
== f"Exoscale OSB Suspend - {test_service.name} - {instance_id}"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_ticket_includes_organization_and_instance_when_found(
|
||||
mocker,
|
||||
mock_odoo_success,
|
||||
osb_client,
|
||||
test_service,
|
||||
test_service_offering,
|
||||
organization,
|
||||
):
|
||||
mock_api_client = mocker.patch("servala.api.views.CLIENT")
|
||||
service_definition = ServiceDefinition.objects.create(
|
||||
name="Test Definition",
|
||||
service=test_service,
|
||||
api_definition={"group": "test.example.com", "version": "v1", "kind": "Test"},
|
||||
)
|
||||
control_plane = ControlPlane.objects.create(
|
||||
name="Test Control Plane",
|
||||
cloud_provider=test_service_offering.provider,
|
||||
api_credentials={
|
||||
"certificate-authority-data": "test",
|
||||
"server": "https://test",
|
||||
"token": "test",
|
||||
},
|
||||
)
|
||||
crd = ControlPlaneCRD.objects.create(
|
||||
service_offering=test_service_offering,
|
||||
control_plane=control_plane,
|
||||
service_definition=service_definition,
|
||||
)
|
||||
instance_name = "test-instance-123"
|
||||
service_instance = ServiceInstance.objects.create(
|
||||
name=instance_name,
|
||||
organization=organization,
|
||||
context=crd,
|
||||
)
|
||||
|
||||
response = osb_client.delete(
|
||||
f"/api/osb/v2/service_instances/{instance_name}"
|
||||
f"?service_id={test_service.osb_service_id}&plan_id={test_service_offering.osb_plan_id}"
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
ticket_call = mock_api_client.execute.call_args_list[-1]
|
||||
ticket_data = ticket_call[0][2][0]
|
||||
assert f"Organization: {organization.name}" in ticket_data["description"]
|
||||
assert "admin/core/organization" in ticket_data["description"]
|
||||
assert f"/{organization.pk}/" in ticket_data["description"]
|
||||
assert f"Instance: {service_instance.name}" in ticket_data["description"]
|
||||
assert "admin/core/serviceinstance" in ticket_data["description"]
|
||||
assert f"/{service_instance.pk}/" in ticket_data["description"]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue