diff --git a/src/servala/api/authentication.py b/src/servala/api/authentication.py deleted file mode 100644 index f7d7479..0000000 --- a/src/servala/api/authentication.py +++ /dev/null @@ -1,56 +0,0 @@ -import base64 - -from django.conf import settings -from django.http import HttpResponse -from django.utils.translation import gettext as _ - - -class OSBBasicAuthentication: - """ - HTTP Basic Authentication for OSB API endpoints. - Uses environment variables for username/password configuration. - """ - - def __init__(self, get_response): - self.get_response = get_response - self.osb_username = getattr(settings, "OSB_USERNAME", None) - self.osb_password = getattr(settings, "OSB_PASSWORD", None) - - def __call__(self, request): - # Only apply authentication to OSB API endpoints - if request.path.startswith("/v2/"): - if not self._authenticate_request(request): - return self._authentication_required() - - return self.get_response(request) - - def _authenticate_request(self, request): - """ - Authenticate the request using HTTP Basic Authentication. - """ - if not self.osb_username or not self.osb_password: - return False - - auth_header = request.META.get("HTTP_AUTHORIZATION", "") - - if not auth_header.startswith("Basic "): - return False - - try: - encoded_credentials = auth_header[6:] # Remove 'Basic ' prefix - decoded_credentials = base64.b64decode(encoded_credentials).decode("utf-8") - username, password = decoded_credentials.split(":", 1) - - return username == self.osb_username and password == self.osb_password - except ValueError: - return False - - def _authentication_required(self): - """ - Return an HTTP 401 Unauthorized response. - """ - response = HttpResponse( - _("Authentication required"), status=401, content_type="text/plain" - ) - response["WWW-Authenticate"] = 'Basic realm="OSB API"' - return response diff --git a/src/servala/api/permissions.py b/src/servala/api/permissions.py new file mode 100644 index 0000000..25a015d --- /dev/null +++ b/src/servala/api/permissions.py @@ -0,0 +1,50 @@ +import base64 +from contextlib import suppress + +from django.conf import settings +from django.http import HttpResponseForbidden + + +def get_username_and_password(request): # pragma: no cover + # This method is vendored from assorted DRF bits, so we + # skip it in our test coverage report + auth = request.META.get("HTTP_AUTHORIZATION", b"") + if isinstance(auth, str): + # Work around django test client oddness + auth = auth.encode("iso-8859-1") + auth = auth.split() + if not auth or auth[0].lower() != b"basic": + return False, False + if len(auth) != 2: + return False, False + + with suppress(TypeError, ValueError): + try: + auth_decoded = base64.b64decode(auth[1]).decode("utf-8") + except UnicodeDecodeError: + auth_decoded = base64.b64decode(auth[1]).decode("latin-1") + + return auth_decoded.split(":", 1) + + return False, False + + +class OSBBasicAuthPermission: + """ + Basic auth for OSB is implemented as a permission class rather than as + an authentication class, because authentication is expected to associate + the request with a Django user. However, the OSB/Exoscale requests do not + relate to a user account, so we treat the auth result as a permission instead. + """ + + def dispatch(self, request, *args, **kwargs): + osb_username = getattr(settings, "OSB_USERNAME", None) + osb_password = getattr(settings, "OSB_PASSWORD", None) + + if not osb_username or not osb_password: + return False # pragma: no cover + + username, password = get_username_and_password(request) + if username == osb_username and password == osb_password: + return super().dispatch(request, *args, **kwargs) + return HttpResponseForbidden()