You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

464 lines
13 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# sipcentric/__init__.py
# Modern Python client library for the Sipcentric (Simwood Partner, formerly Nimvelo) API
# Copyright (c) 2015 Sipcentric Ltd. Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
# Copyright (c) 2022 Faelix Limited. Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
import sys
import math
import requests
import time
import logging
import urllib.parse
import simplejson as json
from .stream import Stream
# Module-level logger, named after this package.
logger = logging.getLogger(__name__)
# Default base URL; API() falls back to it when no ``base`` argument is given.
API_BASE = "https://pbx.sipcentric.com/api/v1"
# NOTE(review): presumably the accepted values of a customer's "accountType"
# field — confirm against the API documentation.
ACCOUNTTYPE_BUSINESS = "BUSINESS"
ACCOUNTTYPE_RESIDENTIAL = "RESIDENTIAL"
ACCOUNTTYPE_WLR = "WLR"
# NOTE(review): presumably the accepted values of a linked user's
# "recordingAccess" field — confirm against the API documentation.
RECORDINGACCESS_ALL = "ALL"
RECORDINGACCESS_OWN = "OWN"
RECORDINGACCESS_NONE = "NONE"
class SipcentricException(Exception):
    """Root of this package's exception hierarchy."""
class AuthenticationException(SipcentricException):
    """Raised when the API rejects the supplied credentials (HTTP 401)."""
class API(object):
    """Minimal HTTP client for the REST API, authenticating with HTTP basic auth.

    Every request verifies TLS certificates and uses a 3 second timeout.
    """

    # HTTP verbs that _request knows how to send.
    _METHODS = ("GET", "POST", "PUT")

    def __init__(self, username, password, base=None):
        """Store the credentials and the base URL (``API_BASE`` when omitted)."""
        self.username = username
        self.password = password
        self.base = API_BASE if base is None else base

    def _request(self, uri, method="GET", data=None, params=None):
        """Send one request and return the decoded response.

        Args:
            uri: Either an absolute ``https://`` URL (used as-is) or a path
                that is appended to ``self.base``.
            method: ``"GET"``, ``"POST"`` or ``"PUT"``.
            data: Object serialised to JSON as the body of POST/PUT requests.
            params: Optional query-string parameters.

        Returns:
            The parsed JSON body for a 200/201 response, or ``True`` when
            that response carries no decodable JSON.

        Raises:
            ValueError: ``method`` is not one of the supported verbs.
            AuthenticationException: the server answered 401.
            SipcentricException: any other status; ``args`` holds the
                "HTTP Error <code>" text followed by the decoded error body
                (or a generic message when there is none).
        """
        # Reject unknown verbs up front instead of failing later on an
        # unbound local.
        if method not in self._METHODS:
            raise ValueError("Unsupported HTTP method: %r" % (method,))

        url = uri if uri.startswith("https://") else self.base + uri
        kwargs = dict(
            auth=requests.auth.HTTPBasicAuth(self.username, self.password),
            verify=True,
            timeout=3.0,
        )
        if params:
            kwargs.update(params=params)
        if method != "GET":
            # POST and PUT both carry a JSON-encoded body.
            kwargs.update(
                headers={"content-type": "application/json"},
                data=json.dumps(data),
            )

        r = requests.request(method, url, **kwargs)

        if r.status_code in (200, 201):
            try:
                return r.json()
            except ValueError:
                # Successful response without a JSON body.
                return True

        if r.status_code == 401:
            raise AuthenticationException(
                "Could not authenticate you with the API. Make sure you are using the correct credentials from the 'Web Users' section of the control panel."
            )

        message = "HTTP Error " + str(r.status_code)
        try:
            detail = r.json()
        except ValueError:
            # A non-JSON error body must not hide the HTTP status.
            detail = None
        if detail:
            raise SipcentricException(message, detail)
        raise SipcentricException(message, "Something went wrong with the request.")

    def get(self, uri, params=None):
        """Issue a GET request; see ``_request`` for the return value."""
        return self._request(uri, method="GET", params=params)

    def getMany(self, uri, params=None, startPage=1, pageSize=20):
        """Yield every item of a paginated listing, following ``nextPage`` links.

        Args:
            uri: Listing path or absolute URL for the first page.
            params: Extra query parameters for the first page. The mapping
                is copied, never modified.
            startPage: Page number requested first.
            pageSize: Number of items requested per page.
        """
        query = dict(params) if params else {}
        query["pageSize"] = pageSize
        query["page"] = startPage

        resp = self._request(uri, "GET", params=query)
        while True:
            for item in resp.get("items", []):
                yield item
            nextPage = resp.get("nextPage", None)
            if not nextPage:
                return
            # The follow-up link is requested as given, without extra params.
            resp = self._request(nextPage, "GET")

    def post(self, uri, data=None, params=None):
        """Issue a POST request with ``data`` as the JSON body."""
        return self._request(uri, method="POST", data=data, params=params)

    def put(self, uri, data=None, params=None):
        """Issue a PUT request with ``data`` as the JSON body."""
        return self._request(uri, method="PUT", data=data, params=params)
class APIObject(object):
    """Base class for one record exposed by the API.

    Subclasses provide ``URLPART`` (the URL segment the records live under)
    and ``TYPE`` (written to the payload's ``type`` field on creation).
    """

    def __init__(self, api, id=None, data=None):
        """Bind the record to ``api``.

        Args:
            api: Client used for all requests.
            id: Identifier of an existing record, if known.
            data: Already-fetched payload. When it contains an ``id`` that
                value (converted to ``int``) replaces ``id``.
        """
        self._api = api
        self.id = id
        # Always define _data so the lazy ``data`` property can test it.
        self._data = data
        if data:
            record_id = data.get("id")
            # Payloads without an id keep whatever id was passed in.
            if record_id is not None:
                self.id = int(record_id)

    @property
    def data(self):
        """The record's payload, fetched from the API on first access."""
        if self._data is None:
            if self.id:
                self._data = self._api.get(self.makeUrl(id=self.id))
            else:
                self._data = self._api.get(self.makeUrl())
        return self._data

    @data.setter
    def data(self, data):
        # The setter must share the property's name to make ``data`` writable.
        self._data = data

    # Backward-compatible alias for the name the setter used to be bound to.
    set_data = data

    def create(self, parent=None, **kwargs):
        """Create the record on the server and return the response payload.

        ``kwargs`` are merged into any payload already held.

        Raises:
            ValueError: the record already has an id, or there is no payload.
        """
        if self.id:
            raise ValueError(
                "%s ID %d already created" % (self.__class__.__name__, self.id)
            )
        if self._data:
            self._data.update(**kwargs)
        else:
            self._data = kwargs
        if not self._data:
            raise ValueError("No data associated with record.")
        return self._create(parent=parent, data=self._data)

    def _create(self, parent, data):
        """POST ``data`` (tagged with the class ``TYPE``) and record the new id."""
        data['type'] = self.__class__.TYPE
        self._data = self._api.post(self.__class__.makeUrl(parent=parent), data=data)
        self.id = int(self._data.get("id"))
        return self._data

    def update(self):
        """PUT the held payload to the record's URL and return the response.

        Raises:
            ValueError: the record has no id yet, or there is no payload.
        """
        if not self.id:
            raise ValueError("%s not yet created" % (self.__class__.__name__,))
        if not self._data:
            raise ValueError("No data associated with record.")
        self._data = self._api.put(self.url(), data=self._data)
        self.id = int(self._data.get("id"))
        return self._data

    @classmethod
    def makeUrl(cls, id=None, parent=None):
        """Build the API-relative path of the collection or of record ``id``.

        When ``parent`` is given the path is nested under ``parent.url()``.
        """
        if id:
            path = "/%s/%d/" % (cls.URLPART, id)
        else:
            path = "/%s/" % (cls.URLPART)
        if parent:
            # Joining two slash-delimited paths doubles the separator.
            return (parent.url() + path).replace("//", "/")
        return path

    def url(self, parent=None):
        """Return this record's API-relative path.

        The ``uri`` stored in the payload is preferred (with the leading
        ``/api/v1`` removed); otherwise the path is built with ``makeUrl``.
        """
        prefix = "/api/v1"
        if self._data and 'uri' in self._data:
            path = urllib.parse.urlparse(self._data['uri']).path
            if path.startswith(prefix):
                return path[len(prefix):]
        return self.__class__.makeUrl(self.id, parent)
class Partner(APIObject):
    """The partner account, which lives at the API root path ``/``."""

    @classmethod
    def makeUrl(cls, id=None, parent=None):
        """Return ``/``; a partner accepts neither a parent nor an id."""
        if parent:
            raise ValueError("Partner should not have a parent")
        if id:
            raise NotImplementedError("There can be only one (Partner account)")
        return "/"

    def create(self, **kwargs):
        """Always raises: partner accounts cannot be created through this class."""
        raise NotImplementedError(
            "You need to sign up as a new partner at https://www.simwood.com/partner/"
        )

    def customers(self):
        """Yield a ``Customer`` for every entry of the customer listing."""
        yield from (
            Customer(self._api, data=record)
            for record in self._api.getMany(Customer.makeUrl())
        )
class Customer(APIObject):
    """A customer account, addressed under ``/customers/``.

    Besides create/update, it exposes the resources nested beneath the
    customer: paginated collections as generators, single settings
    documents as objects.
    """

    TYPE = "customer"

    # Fields that must be present before a customer record is posted.
    MANDATORY_FIELDS = (
        "accountType",
        "company",
        "firstName",
        "lastName",
        "email",
        "address1",
        "city",
        "postcode",
        "telephone",
    )

    @classmethod
    def makeUrl(cls, id=None, parent=None):
        """Return the customers collection path or the path of customer ``id``.

        ``id`` may be the literal string ``"me"``.

        Raises:
            ValueError: a ``parent`` was supplied.
        """
        if parent:
            raise ValueError("Customer should not have a parent")
        if id == "me":
            return "/customers/me/"
        if id:
            return "/customers/%d/" % id
        return "/customers/"

    def _list(self, resource, **getmany_kwargs):
        """Yield a ``resource`` instance per item listed beneath this customer."""
        listing = self._api.getMany(resource.makeUrl(parent=self), **getmany_kwargs)
        for item in listing:
            yield resource(self._api, data=item)

    def _single(self, resource):
        """Fetch the one ``resource`` document stored beneath this customer."""
        payload = self._api.get(resource.makeUrl(parent=self))
        return resource(self._api, data=payload)

    def endpoints(self):
        """Yield this customer's ``Endpoint`` records."""
        return self._list(Endpoint)

    def phonenumbers(self):
        """Yield this customer's ``PhoneNumber`` records."""
        return self._list(PhoneNumber)

    def calls(self, params=None):
        """Yield this customer's ``Call`` records, fetched 500 per page.

        Args:
            params: Optional extra query parameters; the mapping is copied,
                not modified.
        """
        # The page size has to go through getMany's own argument, because
        # getMany overwrites any "pageSize" key found in ``params``.
        query = dict(params) if params else {}
        return self._list(Call, params=query, pageSize=500)

    def callbundles(self):
        """Yield this customer's ``CallBundle`` records."""
        return self._list(CallBundle)

    def recordings(self):
        """Yield this customer's ``Recording`` records."""
        return self._list(Recording)

    def phonebooks(self):
        """Yield this customer's ``PhoneBook`` records."""
        return self._list(PhoneBook)

    def timeintervals(self):
        """Yield this customer's ``TimeInterval`` records."""
        return self._list(TimeInterval)

    def smss(self):
        """Yield this customer's ``Sms`` records."""
        return self._list(Sms)

    def sounds(self):
        """Yield this customer's ``Sound`` records."""
        return self._list(Sound)

    def outgoingcallerids(self):
        """Yield this customer's ``OutgoingCallerId`` records."""
        return self._list(OutgoingCallerId)

    def creditstatuses(self):
        """Yield this customer's ``CreditStatus`` records."""
        return self._list(CreditStatus)

    def linkedusers(self):
        """Yield this customer's ``LinkedUser`` records."""
        return self._list(LinkedUser)

    def limits(self):
        """Fetch and return this customer's ``Limits``."""
        return self._single(Limits)

    def callcharging(self):
        """Fetch and return this customer's ``CallCharging``."""
        return self._single(CallCharging)

    def permissions(self):
        """Fetch and return this customer's ``Permissions``."""
        return self._single(Permissions)

    def create(self, parent=None, **kwargs):
        """Create the customer on the server and return the response payload.

        ``kwargs`` are merged into any payload already held.

        Raises:
            ValueError: a ``parent`` was supplied, or one of
                ``MANDATORY_FIELDS`` is missing from the payload.
        """
        if parent:
            raise ValueError("Customer cannot have a parent")
        if self._data:
            self._data.update(**kwargs)
        else:
            self._data = kwargs
        for k in self.MANDATORY_FIELDS:
            if k not in self._data:
                raise ValueError('missing mandatory field "%s"' % k)
        return self._create(parent=parent, data=self._data)
# {
# "type": "customer",
# "accountType": "BUSINESS",
# "company": "API TEST",
# "firstName": "Marek",
# "lastName": "Isalski",
# "email": "marek@isal.ski",
# "country": "GB",
# "properties": {
# },
# "enabled": true,
# "currency": "GBP",
# "partnerId": "56",
# "userEmailUpdatable": false,
# "postcode": "SA48 7LJ",
# "address1": "Llygad-yr-Haul",
# "city": "Llanwnnen",
# "telephone": "07779270405"
# }
class Endpoint(APIObject):
    """Resource listed and addressed under the ``endpoints`` URL segment.

    No ``TYPE`` is defined at this level.
    """

    URLPART = "endpoints"
class PhoneEndpoint(Endpoint):
    """Endpoint whose creation payload carries ``type`` = ``phone``."""

    TYPE = "phone"
class VirtualEndpoint(Endpoint):
    """Endpoint whose creation payload carries ``type`` = ``virtual``."""

    TYPE = "virtual"
class GroupEndpoint(Endpoint):
    """Endpoint whose creation payload carries ``type`` = ``group``."""

    TYPE = "group"
class QueueEndpoint(Endpoint):
    """Endpoint whose creation payload carries ``type`` = ``queue``."""

    TYPE = "queue"
class MailboxEndpoint(Endpoint):
    """Endpoint whose creation payload carries ``type`` = ``mailbox``."""

    TYPE = "mailbox"
class PhoneNumber(APIObject):
    """Resource under the ``phonenumbers`` URL segment; created as type ``did``."""

    URLPART = "phonenumbers"
    TYPE = "did"
class Call(APIObject):
    """Resource under the ``calls`` URL segment; created as type ``call``."""

    URLPART = "calls"
    TYPE = "call"
class CallBundle(APIObject):
    """Resource under the ``callbundles`` URL segment; created as type ``customerbundle``."""

    URLPART = "callbundles"
    TYPE = "customerbundle"
class Recording(APIObject):
    """Resource under the ``recordings`` URL segment; created as type ``recording``."""

    URLPART = "recordings"
    TYPE = "recording"
class PhoneBook(APIObject):
    """Resource under the ``phonebook`` URL segment; created as type ``phonebookentry``."""

    URLPART = "phonebook"
    TYPE = "phonebookentry"
class TimeInterval(APIObject):
    """Resource under the ``timeintervals`` URL segment."""

    URLPART = "timeintervals"
    # NOTE(review): "XXX" looks like a placeholder — confirm the real type
    # name before creating records of this class.
    TYPE = "XXX"
class Sound(APIObject):
    """Resource under the ``sounds`` URL segment; created as type ``music``."""

    URLPART = "sounds"
    TYPE = "music"
class OutgoingCallerId(APIObject):
    """Resource under the ``outgoingcallerids`` URL segment; created as type ``outgoingcallerid``."""

    URLPART = "outgoingcallerids"
    TYPE = "outgoingcallerid"
class CreditStatus(APIObject):
    """Resource under the ``creditstatus`` URL segment; created as type ``creditstatus``."""

    URLPART = "creditstatus"
    TYPE = "creditstatus"
class Pricing(APIObject):
    """Resource under the ``pricing`` URL segment; created as type ``customerpricing``."""

    URLPART = "pricing"
    TYPE = "customerpricing"
class Limits(APIObject):
    """Resource under the ``limits`` URL segment; created as type ``customerlimits``."""

    URLPART = "limits"
    TYPE = "customerlimits"
class CallCharging(APIObject):
    """Resource under the ``callcharging`` URL segment; created as type ``customercallcharging``."""

    URLPART = "callcharging"
    TYPE = "customercallcharging"
class Permissions(APIObject):
    """Resource under the ``permissions`` URL segment; created as type ``customerpermissions``."""

    URLPART = "permissions"
    TYPE = "customerpermissions"
class CustomerAPIObject(APIObject):
    """Base for resources that are created beneath a parent record.

    Subclasses list in ``MANDATORY_FIELDS`` the payload keys that must be
    present before the record is posted.
    """

    MANDATORY_FIELDS = ()

    def create(self, parent, **kwargs):
        """Merge ``kwargs`` into the payload, validate it and create the record.

        Raises:
            ValueError: naming the first mandatory field that is missing.
        """
        if not self._data:
            self._data = kwargs
        else:
            self._data.update(**kwargs)

        absent = [
            field
            for field in self.__class__.MANDATORY_FIELDS
            if field not in self._data
        ]
        if absent:
            # Report only the first missing field, in declaration order.
            raise ValueError('missing mandatory field "%s"' % absent[0])

        return self._create(parent=parent, data=self._data)
class LinkedUser(CustomerAPIObject):
    """Resource under the ``linkedusers`` URL segment; created as type ``linkeduser``."""

    URLPART = "linkedusers"
    TYPE = "linkeduser"
    # Payload keys that must be present before the record is created.
    MANDATORY_FIELDS = (
        "activateUrl",
        "email",
        "recordingAccess",
        "owner",
        "enabled",
    )
class Sms(APIObject):
    """Resource under the ``sms`` URL segment; created as type ``smsmessage``."""

    URLPART = "sms"
    TYPE = "smsmessage"
# def post(self, to=None, _from=None, body=None):
# data = {"type": "smsmessage", "to": to, "from": _from, "body": body}
# return self.parent._request(self.uri, method="POST", data=data)
def get_available_numbers(areaCode, location="GB", country="44", pageSize=10):
    """Yield the numbers the API lists as available for an area code.

    The request is sent without credentials and follows ``nextPage`` links
    until the listing is exhausted.

    Args:
        areaCode: Area code; leading zeros are dropped before the lookup.
        location: Value of the ``location`` query parameter.
        country: Value of the ``country`` query parameter.
        pageSize: Number of items requested per page.

    Raises:
        SipcentricException: a page came back with a status other than
            200/201. Without this the loop would re-request the same URL
            forever.
    """
    query = urllib.parse.urlencode(
        {
            "location": location,
            "country": country,
            "areaCode": str(areaCode).lstrip("0"),
            "pageSize": pageSize,
        }
    )
    url = "%s/availablenumbers?%s" % (API_BASE, query)
    while url:
        r = requests.get(url, verify=True, timeout=3.0)
        if r.status_code not in (200, 201):
            raise SipcentricException(
                "HTTP Error " + str(r.status_code),
                "Something went wrong with the request.",
            )
        response = r.json()
        for item in response.get('items', []):
            yield item
        url = response.get('nextPage', None)
if __name__ == "__main__":
    # This file is a package module, not a script: report the misuse and exit
    # with a non-zero status so the calling shell sees the failure.
    logging.error("Do not run directly, import module first!")
    sys.exit(1)