sipcentric/src/sipcentric/__init__.py

396 lines
11 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# sipcentric/__init__.py
# Modern Python client library for the Sipcentric (Simwood Partner, formerly Nimvelo) API
# Copyright (c) 2015 Sipcentric Ltd. Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
# Copyright (c) 2022 Faelix Limited. Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
import sys
import math
import requests
import time
import logging
import simplejson as json
from .stream import Stream
# Module-level logger, named after this package.
logger = logging.getLogger(__name__)
# Default root URL for API requests; API() falls back to it when no ``base``
# is supplied.
API_BASE = "https://pbx.sipcentric.com/api/v1"
# Account type values. Customer.create() requires an "accountType" field;
# presumably these are the values the API accepts for it -- TODO confirm.
ACCOUNTTYPE_BUSINESS = "BUSINESS"
ACCOUNTTYPE_RESIDENTIAL = "RESIDENTIAL"
ACCOUNTTYPE_WLR = "WLR"
# Recording access levels. LinkedUser.create() requires a "recordingAccess"
# field; presumably these are its accepted values -- TODO confirm.
RECORDINGACCESS_ALL = "ALL"
RECORDINGACCESS_OWN = "OWN"
RECORDINGACCESS_NONE = "NONE"
class SipcentricException(Exception):
    """Root of the exception hierarchy defined by this package."""
class AuthenticationException(SipcentricException):
    """Raised when the API answers a request with HTTP 401."""
class API(object):
    """Small HTTP client for the Sipcentric REST API using HTTP basic auth.

    Every request is sent with certificate verification enabled and a fixed
    timeout (``TIMEOUT`` seconds).
    """

    # Seconds to wait for the server before a request is abandoned.
    TIMEOUT = 3.0

    def __init__(self, username, password, base=None):
        """Store the credentials and the API root URL.

        ``base`` defaults to the module-level ``API_BASE`` when omitted.
        """
        self.username = username
        self.password = password
        self.base = API_BASE if base is None else base

    def _request(self, uri, method="GET", data=None, params=None):
        """Send one HTTP request and decode the response.

        ``uri`` is used verbatim when it is already an ``https://`` URL
        (e.g. a ``nextPage`` link); otherwise it is appended to ``self.base``.
        For every method other than GET, ``data`` is serialised to JSON and
        sent as the request body.

        Returns the decoded JSON body on HTTP 200/201, or ``True`` when the
        body of a successful response is not valid JSON.

        Raises:
            AuthenticationException: on HTTP 401.
            SipcentricException: on any other status; ``args`` is
                ``("HTTP Error <code>", <decoded body or generic message>)``.
        """
        url = uri if uri.startswith("https://") else self.base + uri
        method = method.upper()

        options = {
            "auth": requests.auth.HTTPBasicAuth(self.username, self.password),
            "verify": True,
            "timeout": self.TIMEOUT,
        }
        if params:
            options["params"] = params
        if method != "GET":
            options["headers"] = {"content-type": "application/json"}
            options["data"] = json.dumps(data)

        # requests.request() handles every verb, so PATCH and PUT no longer
        # fall through without a response object.
        r = requests.request(method, url, **options)

        if r.status_code in (200, 201):
            try:
                return r.json()
            except ValueError:
                # Successful response without a JSON body.
                return True

        if r.status_code == 401:
            raise AuthenticationException(
                "Could not authenticate you with the API. Make sure you are using the correct credentials from the 'Web Users' section of the control panel."
            )

        # Error bodies are not guaranteed to be JSON; do not let a decode
        # failure hide the HTTP status.
        try:
            detail = r.json()
        except ValueError:
            detail = None
        if not detail:
            detail = "Something went wrong with the request."
        raise SipcentricException("HTTP Error " + str(r.status_code), detail)

    def get(self, uri, params=None):
        """Issue a GET request and return the decoded response."""
        return self._request(uri, method="GET", params=params)

    def getMany(self, uri, params=None, startPage=1, pageSize=20):
        """Yield every item of a paginated listing.

        Starts at ``startPage`` with ``pageSize`` items per page and follows
        each response's ``nextPage`` link until there is none. The caller's
        ``params`` mapping is copied, never modified.
        """
        query = dict(params) if params else {}
        query["pageSize"] = pageSize
        query["page"] = startPage

        resp = self._request(uri, "GET", params=query)
        # _request() returns True for a non-JSON success body; there is
        # nothing to iterate in that case.
        while isinstance(resp, dict):
            for item in resp.get("items", []):
                yield item
            nextPage = resp.get("nextPage")
            if not nextPage:
                break
            resp = self._request(nextPage, "GET")

    def post(self, uri, data=None, params=None):
        """Issue a POST request with ``data`` as the JSON body."""
        return self._request(uri, method="POST", data=data, params=params)

    def patch(self, uri, data=None, params=None):
        """Issue a PATCH request with ``data`` as the JSON body."""
        return self._request(uri, method="PATCH", data=data, params=params)

    def put(self, uri, data=None, params=None):
        """Issue a PUT request with ``data`` as the JSON body."""
        return self._request(uri, method="PUT", data=data, params=params)
class APIObject(object):
    """Base class for one API resource addressed by ``/<URLPART>/<id>/``.

    Subclasses supply ``URLPART`` (used by ``makeUrl``) and ``TYPE`` (written
    into the payload by ``_create``), or override ``makeUrl`` entirely.
    """

    def __init__(self, api, id=None, data=None):
        """Bind the record to ``api``.

        When ``data`` carries an ``"id"`` it overrides the ``id`` argument.
        Data without an id (a record that has not been created yet) is
        accepted and leaves ``id`` as passed.
        """
        self._api = api
        self.id = id
        self._data = data
        if data:
            raw_id = data.get("id")
            if raw_id is not None:
                self.id = int(raw_id)

    @property
    def data(self):
        """The record's data, fetched from the API on first access if unset."""
        if self._data is None:
            # url() includes the record's id, so this reads the record itself
            # rather than the whole collection.
            self._data = self._api.get(self.url())
        return self._data

    @data.setter
    def data(self, data):
        self._data = data

    # The setter used to be (mis)named ``set_data``, which created a second
    # property under that name; keep it as an alias for existing callers.
    set_data = data

    def create(self, parent=None, **kwargs):
        """Create the record under ``parent`` and return the API's response.

        ``kwargs`` are merged into the record's data before it is sent.

        Raises:
            ValueError: if the record already has an id, or has no data.
        """
        if self.id:
            raise ValueError(
                "%s ID %d already created" % (self.__class__.__name__, self.id)
            )
        # Work on local data only; going through the lazy ``data`` property
        # here would trigger a GET for a record that does not exist yet.
        if self._data is None:
            self._data = {}
        self._data.update(**kwargs)
        if not self._data:
            raise ValueError("No data associated with record.")
        return self._create(parent=parent, data=self._data)

    def _create(self, parent, data):
        """POST ``data`` (tagged with the class ``TYPE``) and record the new id."""
        data['type'] = self.__class__.TYPE
        self._data = self._api.post(self.__class__.makeUrl(parent=parent), data=data)
        self.id = int(self._data.get("id"))
        return self._data

    def update(self, parent=None):
        """PATCH the record's data to its own URL and return the response.

        ``parent`` is an optional URL prefix for records nested under another
        resource.

        Raises:
            ValueError: if the record has no id yet, or has no data.
        """
        if not self.id:
            raise ValueError("%s not yet created" % (self.__class__.__name__,))
        if not self._data:
            raise ValueError("No data associated with record.")
        self._data = self._api.patch(self.url(parent), data=self._data)
        self.id = int(self._data.get("id"))
        return self._data

    @classmethod
    def makeUrl(cls, id=None, parent=None):
        """Build the path of the collection, or of record ``id`` within it.

        ``parent`` is an optional prefix; the join produces exactly one
        slash between prefix and path.
        """
        if id:
            path = "/%s/%d/" % (cls.URLPART, id)
        else:
            path = "/%s/" % (cls.URLPART)
        if parent:
            # Strip only the trailing slash of the prefix so that a scheme
            # such as "https://" inside ``parent`` is left intact.
            return parent.rstrip("/") + path
        return path

    def url(self, parent=None):
        """Return this record's URL, optionally prefixed by ``parent``."""
        return self.__class__.makeUrl(self.id, parent)
class Partner(APIObject):
    """The single partner account; it lives at the API root."""

    @classmethod
    def makeUrl(cls, id=None, parent=None):
        """Return ``"/"``; a partner has neither a parent nor an id."""
        if parent:
            raise ValueError("Partner should not have a parent")
        if id:
            raise NotImplementedError("There can be only one (Partner account)")
        return "/"

    def create(self, **kwargs):
        """Always raises: partner accounts are not created through the API."""
        raise NotImplementedError(
            "You need to sign up as a new partner at https://www.simwood.com/partner/"
        )

    def customers(self):
        """Yield a Customer for every entry of the customers listing."""
        listing = self._api.getMany(Customer.makeUrl())
        for record in listing:
            yield Customer(self._api, data=record)
class Customer(APIObject):
    """A customer account, addressed by ``/customers/<id>/``."""

    TYPE = "customer"

    # Fields that must be present before a customer can be created.
    _MANDATORY_FIELDS = (
        "accountType",
        "company",
        "firstName",
        "lastName",
        "email",
        "address1",
        "city",
        "postcode",
        "telephone",
    )

    @classmethod
    def makeUrl(cls, id=None, parent=None):
        """Return the customers collection path, or one customer's path.

        ``id`` may be the literal string ``"me"``.

        Raises:
            ValueError: if ``parent`` is given.
        """
        if parent:
            raise ValueError("Customer should not have a parent")
        if id == "me":
            return "/customers/me/"
        elif id:
            return "/customers/%d/" % id
        else:
            return "/customers/"

    def _children(self, cls, **kwargs):
        """Yield ``cls`` objects for every item listed under this customer.

        Extra keyword arguments are forwarded to ``API.getMany``.
        """
        for item in self._api.getMany(cls.makeUrl(parent=self.url()), **kwargs):
            yield cls(self._api, data=item)

    def endpoints(self):
        """Yield this customer's Endpoint records."""
        return self._children(Endpoint)

    def phonenumbers(self):
        """Yield this customer's PhoneNumber records."""
        return self._children(PhoneNumber)

    def calls(self, params=None):
        """Yield this customer's Call records, fetched 100 per page.

        ``params`` is passed through as extra query parameters.
        """
        return self._children(Call, params=params, pageSize=100)

    def callbundles(self):
        """Yield this customer's CallBundle records."""
        return self._children(CallBundle)

    def recordings(self):
        """Yield this customer's Recording records."""
        return self._children(Recording)

    def phonebooks(self):
        """Yield this customer's PhoneBook records."""
        return self._children(PhoneBook)

    def timeintervals(self):
        """Yield this customer's TimeInterval records."""
        return self._children(TimeInterval)

    def smss(self):
        """Yield this customer's Sms records."""
        return self._children(Sms)

    def sounds(self):
        """Yield this customer's Sound records."""
        return self._children(Sound)

    def outgoingcallerids(self):
        """Yield this customer's OutgoingCallerId records."""
        return self._children(OutgoingCallerId)

    def creditstatuses(self):
        """Yield this customer's CreditStatus records."""
        return self._children(CreditStatus)

    def linkedusers(self):
        """Yield this customer's LinkedUser records."""
        return self._children(LinkedUser)

    def create(self, parent=None, **kwargs):
        """Create the customer and return the API's response.

        ``kwargs`` are merged into the record's data before validation.

        Raises:
            ValueError: if ``parent`` is given, the customer already has an
                id, or a mandatory field is missing.
        """
        if parent:
            raise ValueError("Customer cannot have a parent")
        if self.id:
            raise ValueError(
                "%s ID %s already created" % (self.__class__.__name__, self.id)
            )
        # Use the local data only; the lazy ``data`` property would GET the
        # customers listing when nothing has been set yet.
        if self._data is None:
            self._data = {}
        self._data.update(**kwargs)
        for k in self._MANDATORY_FIELDS:
            if k not in self._data:
                raise ValueError('missing mandatory field "%s"' % k)
        return self._create(parent=parent, data=self._data)
# {
# "type": "customer",
# "accountType": "BUSINESS",
# "company": "API TEST",
# "firstName": "Marek",
# "lastName": "Isalski",
# "email": "marek@isal.ski",
# "country": "GB",
# "properties": {
# },
# "enabled": true,
# "currency": "GBP",
# "partnerId": "56",
# "userEmailUpdatable": false,
# "postcode": "SA48 7LJ",
# "address1": "Llygad-yr-Haul",
# "city": "Llanwnnen",
# "telephone": "07779270405"
# }
class Endpoint(APIObject):
    """Resource addressed under the ``endpoints`` URL segment."""

    URLPART = "endpoints"
class PhoneEndpoint(Endpoint):
    """Endpoint whose creation payload is tagged with type ``"phone"``."""

    TYPE = "phone"
class VirtualEndpoint(Endpoint):
    """Endpoint whose creation payload is tagged with type ``"virtual"``."""

    TYPE = "virtual"
class GroupEndpoint(Endpoint):
    """Endpoint whose creation payload is tagged with type ``"group"``."""

    TYPE = "group"
class QueueEndpoint(Endpoint):
    """Endpoint whose creation payload is tagged with type ``"queue"``."""

    TYPE = "queue"
class MailboxEndpoint(Endpoint):
    """Endpoint whose creation payload is tagged with type ``"mailbox"``."""

    TYPE = "mailbox"
class PhoneNumber(APIObject):
    """Resource addressed under the ``phonenumbers`` URL segment."""

    URLPART = "phonenumbers"
class Call(APIObject):
    """Resource addressed under the ``calls`` URL segment."""

    URLPART = "calls"
class CallBundle(APIObject):
    """Resource addressed under the ``callbundles`` URL segment."""

    URLPART = "callbundles"
class Recording(APIObject):
    """Resource addressed under the ``recordings`` URL segment."""

    URLPART = "recordings"
class PhoneBook(APIObject):
    """Resource addressed under the ``phonebook`` URL segment."""

    URLPART = "phonebook"
class TimeInterval(APIObject):
    """Resource addressed under the ``timeintervals`` URL segment."""

    URLPART = "timeintervals"
class Sound(APIObject):
    """Resource addressed under the ``sounds`` URL segment."""

    URLPART = "sounds"
class OutgoingCallerId(APIObject):
    """Resource addressed under the ``outgoingcallerids`` URL segment."""

    URLPART = "outgoingcallerids"
class CreditStatus(APIObject):
    """Resource addressed under the ``creditstatus`` URL segment."""

    URLPART = "creditstatus"
class LinkedUser(APIObject):
    """Resource addressed under ``linkedusers``; created with type ``"linkeduser"``."""

    URLPART = "linkedusers"
    TYPE = "linkeduser"

    # Fields that must be present before a linked user can be created.
    _MANDATORY_FIELDS = ("activateUrl", "email", "recordingAccess", "owner", "enabled")

    def create(self, parent, **kwargs):
        """Create the linked user under ``parent`` and return the API's response.

        ``kwargs`` are merged into the record's data before validation.

        Raises:
            ValueError: if the record already has an id or a mandatory field
                is missing.
        """
        if self.id:
            raise ValueError(
                "%s ID %s already created" % (self.__class__.__name__, self.id)
            )
        # Use the local data only; the lazy ``data`` property would issue a
        # GET when nothing has been set yet.
        if self._data is None:
            self._data = {}
        self._data.update(**kwargs)
        for k in self._MANDATORY_FIELDS:
            if k not in self._data:
                raise ValueError('missing mandatory field "%s"' % k)
        return self._create(parent=parent, data=self._data)
class Sms(APIObject):
    """Resource addressed under the ``sms`` URL segment."""

    URLPART = "sms"
# def post(self, to=None, _from=None, body=None):
# data = {"type": "smsmessage", "to": to, "from": _from, "body": body}
# return self.parent._request(self.uri, method="POST", data=data)
if __name__ == "__main__":
    # This module is meant to be imported; running it directly is a usage
    # error, so report it and exit with a non-zero status.
    logging.error("Do not run directly, import module first!")
    sys.exit(1)