First SDK version

This commit is contained in:
nemunaire 2023-04-13 03:10:33 +02:00
parent 952e9d06b0
commit 7aaf0e2a96
9 changed files with 532 additions and 2 deletions

View File

@ -30,5 +30,3 @@
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.
version = "0.1"

100
happydomain/admin.py Normal file
View File

@ -0,0 +1,100 @@
"""Handle administration tasks through happyDomain's admin API"""
from datetime import datetime, timezone
import json
import os
from urllib.parse import quote_plus
import requests
import requests_unixsocket
from .api import HappyError
class Admin:
def __init__(self, socket="./happydomain.sock"):
self.session = requests_unixsocket.Session()
self.socket_path = quote_plus(os.path.abspath(socket))
def authuser_create(self, email, password, allowcommercials=False, email_verified=False):
email_verification = None
if email_verified:
now = datetime.now()
now = now.replace(microsecond=0, tzinfo=timezone.utc)
email_verification = now.isoformat()
r = self.session.post(
"http+unix://" + self.socket_path + "/api/auth",
data=json.dumps({
"email": email,
"EmailVerification": email_verification,
"AllowCommercials": allowcommercials,
}),
)
if r.status_code != 200:
raise HappyError(r.status_code, **r.json())
from .authuser import AuthUser
u = AuthUser(self, **r.json())
u.Password = u.ResetPassword(password)
return
def authuser_delete(self, Id):
r = self.session.delete("http+unix://" + self.socket_path + "/api/auth/" + quote_plus(Id))
if r.status_code != 200:
raise HappyError(r.status_code, **r.json())
return r.json()
def authuser_list(self):
r = self.session.get("http+unix://" + self.socket_path + "/api/auth")
if r.status_code != 200:
raise HappyError(r.status_code, **r.json())
ret = []
val = r.json()
if val is not None:
from .authuser import AuthUser
for au in val:
ret.append(AuthUser(self, **au))
return ret
def authuser_reset_password(self, Id, NewPassword):
r = self.session.post(
"http+unix://" + self.socket_path + "/api/auth/" + quote_plus(Id) + "/reset_password",
data=json.dumps({
"password": NewPassword,
})
)
if r.status_code != 200:
raise HappyError(r.status_code, **r.json())
from .authuser import AuthUser
return r.json()
def authuser_udpate(self, Id, au):
r = self.session.put(
"http+unix://" + self.socket_path + "/api/auth/" + quote_plus(Id),
data=json.dumps({
"Id": au.Id,
"Email": au.Email,
"EmailVerification": au.EmailVerification,
"Password": au.Password,
"CreatedAt": au.CreatedAt,
"LastLoggedIn": au.LastLoggedIn,
"AllowCommercials": au.AllowCommercials,
})
)
if r.status_code != 200:
raise HappyError(r.status_code, **r.json())
from .authuser import AuthUser
return AuthUser(self, **r.json())

90
happydomain/api.py Normal file
View File

@ -0,0 +1,90 @@
"""Handle administration tasks through happyDomain's admin API"""
from datetime import datetime
import json
import os
from urllib.parse import quote_plus
import requests
from .error import HappyError
from .domain import Domain
from .provider import Provider
COOKIE_NAME = "happydomain_session"
class HappyDomain:
def __init__(self, scheme="http", host="127.0.0.1", port=8081, baseurl="", token=None):
self.session = requests.Session()
self.baseurl = scheme + "://" + host + ":" + str(port) + baseurl
self.token = token
def login(self, username, password):
r = self.session.post(
self.baseurl + "/api/auth",
data=json.dumps({
"email": username,
"password": password,
})
)
if r.status_code != 200:
raise HappyError(r.status_code, **json.loads(r.text))
self.token = r.cookies[COOKIE_NAME]
return json.loads(r.text)
# Domains
def domain_list(self):
r = self.session.get(
self.baseurl + "/api/domains",
)
if r.status_code != 200:
raise HappyError(r.status_code, **json.loads(r.text))
ret = []
val = json.loads(r.text)
if val is not None:
for au in val:
ret.append(Domain(self, **au))
return ret
# Providers
def provider_list(self):
r = self.session.get(
self.baseurl + "/api/providers",
)
if r.status_code != 200:
raise HappyError(r.status_code, **json.loads(r.text))
ret = []
val = json.loads(r.text)
if val is not None:
for au in val:
ret.append(Provider(self, **au))
return ret
def provider_add(self, type, name, data):
r = self.session.post(
self.baseurl + "/api/providers",
data=json.dumps({
"Provider": data,
"_comment": name,
"_srctype": type,
})
)
if r.status_code != 200:
raise HappyError(r.status_code, **json.loads(r.text))
return Provider(self, **json.loads(r.text))

21
happydomain/authuser.py Normal file
View File

@ -0,0 +1,21 @@
class AuthUser:
def __init__(self, _session, Id, Email, EmailVerification, Password, CreatedAt, LastLoggedIn, AllowCommercials):
self._session = _session
self.Id = Id
self.Email = Email
self.EmailVerification = EmailVerification
self.Password = Password
self.CreatedAt = CreatedAt
self.LastLoggedIn = LastLoggedIn
self.AllowCommercials = AllowCommercials
def Delete(self):
self._session.authuser_delete(self.Id)
def ResetPassword(self, *args, **kwargs):
self._session.authuser_reset_password(self.Id, *args, **kwargs)
def Update(self):
self._session.authuser_update(self.Id, self)

80
happydomain/domain.py Normal file
View File

@ -0,0 +1,80 @@
import json
from urllib.parse import quote
from .error import HappyError
from .zone import ZoneMeta, Zone
class Domain:
def __init__(self, _session, id, id_owner, id_provider, domain, zone_history, group=""):
self._session = _session
self.id = id
self.id_owner = id_owner
self.id_provider = id_provider
self.domain = domain
self.group = group
self.zone_history = zone_history if zone_history is not None else []
def _dumps(self):
return json.dumps({
"id": self.id,
"id_owner": self.id_owner,
"id_provider": self.id_provider,
"domain": self.domain,
"group": self.group,
"zone_history": self.zone_history,
})
@property
def current_zone(self):
if len(self.zone_history) == 0:
self.import_zone()
return self.zone_history[0]
def delete(self):
r = self._session.session.delete(
self.baseurl + "/api/domains/" + quote(self.id),
)
if r.status_code > 300:
raise HappyError(r.status_code, **r.json())
return r.json()
def update(self):
r = self._session.session.put(
self._session.baseurl + "/api/domains/" + quote(self.id),
date=self._dumps(),
)
if r.status_code > 300:
raise HappyError(r.status_code, **r.json())
return r.json()
def import_zone(self):
r = self._session.session.post(
self._session.baseurl + "/api/domains/" + quote(self.id) + "/import_zone",
)
if r.status_code > 300:
raise HappyError(r.status_code, **r.json())
zm = ZoneMeta(self._session, **r.json())
self.zone_history.append(zm)
return zm
def get_zone(self, zid):
r = self._session.session.get(
self._session.baseurl + "/api/domains/" + quote(self.id) + "/zone/" + quote(zid),
)
if r.status_code > 300:
raise HappyError(r.status_code, **r.json())
return Zone(self._session, self.id, **r.json())
def get_current_zone(self):
return self.get_zone(self.current_zone)

8
happydomain/error.py Normal file
View File

@ -0,0 +1,8 @@
class HappyError(BaseException):
def __init__(self, status_code, errmsg, href=""):
self.errmsg = errmsg
self.status_code = status_code
def __str__(self):
return str(self.status_code) + ": " + self.errmsg

60
happydomain/provider.py Normal file
View File

@ -0,0 +1,60 @@
import json
from .error import HappyError
from .domain import Domain
class Provider:
def __init__(self, _session, _srctype, _id, _ownerid, _comment, **kwargs):
self._session = _session
self._srctype = _srctype
self._id = _id
self._ownerid = _ownerid
self._comment = _comment
self.args = kwargs
def _dumps(self):
d = {
"_srctype": self._srctype,
"_id": self._id,
"_ownerid": self._ownerid,
"_comment": self._comment,
}
d.update(self.kwargs)
return json.dumps(d)
def domain_add(self, dn):
r = self._session.session.post(
self._session.baseurl + "/api/domains",
data=json.dumps({
"domain": dn,
"id_provider": self._id,
})
)
if r.status_code != 200:
raise HappyError(r.status_code, **r.json())
return Domain(self, **r.json())
def delete(self):
r = self._session.session.delete(
self.baseurl + "/api/providers/" + quote(self._id),
)
if r.status_code > 300:
raise HappyError(r.status_code, **r.json())
return r.json()
def update(self):
r = self._session.session.put(
self.baseurl + "/api/providers/" + quote(self._id),
date=self._dumps(),
)
if r.status_code > 300:
raise HappyError(r.status_code, **r.json())
return r.json()

66
happydomain/service.py Normal file
View File

@ -0,0 +1,66 @@
import json
from urllib.parse import quote
from .error import HappyError
class ServiceMeta:
def __init__(self, _session, _svctype, _domain, _ttl, _id=None, _ownerid=None, _comment="", _mycomment="", _aliases=[], _tmp_hint_nb=0):
self._svctype = _svctype
self._domain = _domain
self._ttl = _ttl
self._id = _id
self._ownerid = _ownerid
self._comment = _comment
self._mycomment = _mycomment
self._aliases = _aliases
self._tmp_hint_nb = _tmp_hint_nb
def _dumps(self):
return json.dumps({
"_svctype": self._svctype,
"_domain": self._domain,
"_ttl": self._ttl,
"_id": self._id,
"_ownerid": self._ownerid,
"_comment": self._comment,
"_mycomment": self._mycomment,
"_aliases": self._aliases,
"_tmp_hint_nb": self._tmp_hint_nb,
})
class HService(ServiceMeta):
def __init__(self, _session, _domainid, _zoneid, Service, **kwargs):
super(HService, self).__init__(_session, **kwargs)
self._domainid = _domainid
self._zoneid = _zoneid
self.service = Service
def _dumps(self):
return json.dumps(self._flat())
def _flat(self):
return {
"_svctype": self._svctype,
"_domain": self._domain,
"_ttl": self._ttl,
"_id": self._id,
"_ownerid": self._ownerid,
"_comment": self._comment,
"_mycomment": self._mycomment,
"_aliases": self._aliases,
"_tmp_hint_nb": self._tmp_hint_nb,
"Service": self.service,
}
def delete(self):
r = self._session.session.delete(
self.baseurl + "/api/domains/" + quote(self._domainid) + "/zone/" + quote(self._zoneid) + "/" + quote(self._domain) + "/services/" + quote(self._id),
)
if r.status_code > 300:
raise HappyError(r.status_code, **r.json())
return r.json()

107
happydomain/zone.py Normal file
View File

@ -0,0 +1,107 @@
import json
from urllib.parse import quote
from .error import HappyError
from .service import HService
class ZoneMeta:
def __init__(self, _session, **kwargs):
self._session = _session
self._load(**kwargs)
def _load(self, id, id_author, default_ttl, last_modified="", commit_message=None, commit_date=None, published=None):
self.id = id
self.id_author = id_author
self.default_ttl = default_ttl
self.last_modified = last_modified
self.commit_message = commit_message
self.commit_date = commit_date
self.published = published
def _dumps(self):
return json.dumps({
"id": self.id,
"id_author": self.id_author,
"default_ttl": self.default_ttl,
"last_modified": self.last_modified,
"commit_message": self.commit_message,
"commit_date": self.commit_date,
"published": self.published,
})
class Zone(ZoneMeta):
def __init__(self, _session, _domainid, services, **kwargs):
super(Zone, self).__init__(_session, **kwargs)
self._domainid = _domainid
self._load(services)
def _load(self, services, **kwargs):
if "id" in kwargs:
super(Zone, self).__init__(**kwargs)
self.services = {}
if services is not None:
for k in services:
self.services[k] = []
for s in services[k]:
self.services[k].append(HService(_session, self._domainid, self.id, **s))
def _svc_dumps(self):
services = {}
for k in self.services:
services[k] = []
for s in self.services[k]:
services[k].append(s._flat())
return services
def _dumps(self):
return json.dumps({
"id": self.id,
"id_author": self.id_author,
"default_ttl": self.default_ttl,
"last_modified": self.last_modified,
"commit_message": self.commit_message,
"commit_date": self.commit_date,
"published": self.published,
"services": self._svc_dumps(),
})
def add_zone_service(self, subdomain, svctype, svc):
r = self._session.session.post(
self._session.baseurl + "/api/domains/" + quote(self._domainid) + "/zone/" + quote(self.id) + "/" + quote(subdomain) + "/services",
data=HService(self._session, self._domainid, self.id, Service=svc, _svctype=svctype, _domain=subdomain, _ttl=self.default_ttl)._dumps(),
)
if r.status_code > 300:
raise HappyError(r.status_code, **r.json())
self._load(**r.json())
return self
def view_dump(self):
r = self._session.session.post(
self._session.baseurl + "/api/domains/" + quote(self._domainid) + "/zone/" + quote(self.id) + "/view",
)
if r.status_code > 300:
raise HappyError(r.status_code, **r.json())
return r.json()
def apply_changes(self):
r = self._session.session.post(
self._session.baseurl + "/api/domains/" + quote(self._domainid) + "/zone/" + quote(self.id) + "/apply_changes",
)
if r.status_code > 300:
raise HappyError(r.status_code, **r.json())
return r.json()