import xmlrpc.client from django.conf import settings from django_scopes import scopes_disabled ADDRESS_FIELDS = [ "id", "name", "street", "street2", "city", "zip", "state_id", "country_id", "email", "phone", "vat", "is_company", "type", "parent_id", ] class OdooClient: def __init__(self): self.url = settings.ODOO["URL"] self.db = settings.ODOO["DB"] self.username = settings.ODOO["USERNAME"] self.password = settings.ODOO["PASSWORD"] self.common_proxy = None self.models_proxy = None self.uid = None def _connect(self): """This method is called on the first client request, not on instantiation, so that we can instantiate the client on startup and reuse it across the entire application.""" try: self.common_proxy = xmlrpc.client.ServerProxy(f"{self.url}/xmlrpc/2/common") self.uid = self.common_proxy.authenticate( self.db, self.username, self.password, {} ) if not self.uid: raise Exception("Authentication failed with Odoo: No UID returned.") self.models_proxy = xmlrpc.client.ServerProxy(f"{self.url}/xmlrpc/2/object") except xmlrpc.client.Fault as e: raise Exception( f"Odoo XML-RPC Fault during connection: {e.faultString}" ) from e except ConnectionRefusedError as e: raise Exception( f"Could not connect to Odoo at {self.url}. Connection refused." ) from e except Exception as e: raise Exception( f"An error occurred while connecting to Odoo: {str(e)}" ) from e def execute(self, model, method, args_list, **kwargs): if not self.uid or not self.models_proxy: self._connect() try: result = self.models_proxy.execute_kw( self.db, self.uid, self.password, model, method, args_list, kwargs ) return result except xmlrpc.client.Fault as e: print(f"Fault! {e}") raise Exception(f"Odoo XML-RPC Fault: {e.faultString}") from e except ConnectionRefusedError as e: raise Exception( f"Connection to Odoo at {self.url} lost or refused during operation." ) from e except Exception as e: print(e) raise Exception( f"An error occurred while communicating with Odoo: {str(e)}" ) from e def search_read(self, model, domain, fields, **kwargs): return self.execute(model, "search_read", args_list=[domain, fields], **kwargs) CLIENT = OdooClient() # Odoo countries do not change, so they are fetched once per process COUNTRIES = [] def get_odoo_countries(): global COUNTRIES if COUNTRIES: return COUNTRIES try: odoo_countries_data = CLIENT.search_read( model="res.country", domain=[], fields=["id", "name"] ) # Format as Django choices: [(value, label), ...] COUNTRIES = [ (country["id"], country["name"]) for country in odoo_countries_data ] # Sort by country name for better UX in dropdowns COUNTRIES.sort(key=lambda x: x[1]) except Exception as e: # Log the error or handle it as appropriate for your application # For now, return an empty list or a default if Odoo is unavailable print(f"Error fetching Odoo countries: {e}") return [("", "Error fetching countries")] # Or just [] return COUNTRIES def get_odoo_access_conditions(user): # We're building our conditions in order: # - in exceptions, users may be using a billing account's email # - if the user is an admin or owner of a Servala organization # - if the user is associated with an odoo user, return all billing # addresses / organizations created by the user # - if the user is associated with an odoo contact, return all billing # addresses with the same parent_id from servala.core.models.organization import ( OrganizationMembership, OrganizationRole, ) email = user.email or_conditions = [("email", "ilike", email)] odoo_users = CLIENT.search_read( model="res.users", domain=[("login", "=", email), ("active", "=", True)], fields=["id", "share"], limit=1, ) if odoo_users: odoo_user = odoo_users[0] if odoo_user.get("share") is False: # An Odoo internal user (share=False) should see all invoice addresses, # so we short-circuit the entire search logic here return [] elif uid := odoo_user.get("id"): # For portal users or users not in Odoo, apply standard filters. or_conditions.append(("create_uid", "=", uid)) odoo_contacts = CLIENT.search_read( model="res.partner", domain=[ ("is_company", "=", False), ("type", "=", "contact"), ("email", "ilike", email), ], fields=["id", "parent_id"], ) if odoo_contacts: for contact in odoo_contacts: or_conditions.append(("parent_id", "=", contact["parent_id"][0])) with scopes_disabled(): servala_invoice_ids = list( OrganizationMembership.objects.filter( user=user, role__in=[OrganizationRole.ADMIN, OrganizationRole.OWNER] ) .values_list("organization__billing_entity__odoo_invoice_id", flat=True) .distinct() ) servala_invoice_ids = [pk for pk in servala_invoice_ids if pk] if servala_invoice_ids: or_conditions.append(("id", "in", servala_invoice_ids)) if len(or_conditions) > 1: or_conditions = ["|"] * (len(or_conditions) - 1) + or_conditions return or_conditions def get_invoice_addresses(user): """Used during organization creation: retrieves all invoice addresses the user owns or is connected to from the Odoo API.""" or_conditions = get_odoo_access_conditions(user) domain = [ ("is_company", "=", False), ("type", "=", "invoice"), ] + or_conditions try: invoice_addresses = CLIENT.search_read( model="res.partner", domain=domain, fields=ADDRESS_FIELDS, ) if invoice_addresses: invoice_addresses.sort( key=lambda addr: ( addr["parent_id"][1] if addr.get("parent_id") else "", addr["name"], addr["id"], ) ) return invoice_addresses or [] except Exception: return [] def create_helpdesk_ticket(title, description, partner_id=None, sale_order_id=None): ticket_data = { "name": title, "team_id": settings.ODOO["HELPDESK_TEAM_ID"], "description": description, } if partner_id: ticket_data["partner_id"] = partner_id if sale_order_id: ticket_data["sale_order_id"] = sale_order_id return CLIENT.execute("helpdesk.ticket", "create", [ticket_data])