You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

131 lines
4.9 KiB
Python

import requests
import requests.exceptions
import json
import time
from . import urlqueryparse
class ApiKeyAuth(requests.auth.AuthBase):
    """Requests auth hook that adds ApiKey credentials and JSON headers.

    An instance is passed as the ``auth=`` argument of a ``requests`` call;
    ``requests`` then invokes it with the outgoing request so the headers
    can be filled in before it is sent.
    """

    def __init__(self, api_username, api_key):
        """Remember the username / key pair used to build the header."""
        self.api_username = api_username
        self.api_key = api_key

    def __call__(self, r):
        """Set the Authorization and JSON content headers on *r* and return it."""
        r.headers['Authorization'] = "ApiKey %s:%s" % (self.api_username, self.api_key)
        r.headers['Content-Type'] = "application/json"
        # "application/json" is a media type, so it goes in Accept.
        # Accept-Encoding is reserved for content codings such as gzip and
        # is left at the library default.
        r.headers['Accept'] = "application/json"
        return r
class APIv2(object):
    """Small JSON-over-HTTP client that authenticates with an API key.

    Every request is sent through ``requests`` with an ``ApiKeyAuth`` hook
    and a 10 second timeout.  Relative paths are rewritten to live under
    ``api_path`` and always end with a slash.
    """

    def __init__(self, api_username, api_key, base="https://fulcrm.org", api_path="/api/v2"):
        """Store the credentials and build the auth hook used by every call.

        Args:
            api_username: user name placed in the Authorization header.
            api_key: API key placed in the Authorization header.
            base: base URL of the service.
            api_path: path prefix under which all API resources live.
        """
        self.api_username = api_username
        self.api_key = api_key
        self.api_path = api_path
        # NOTE(review): ``base`` is stored but normalize_url() hard-codes the
        # scheme and host instead of reading it -- confirm whether a custom
        # base is meant to be honoured.
        self.base = base
        self.auth = ApiKeyAuth(self.api_username, self.api_key)

    def normalize_path(self, path):
        """Return *path* prefixed with ``api_path`` and ending in ``/``.

        A path that already starts with ``api_path`` is not prefixed again.
        """
        if not path.startswith(self.api_path):
            if not path.startswith("/"):
                path = "/" + path
            path = self.api_path + path
        if not path.endswith("/"):
            path = path + "/"
        return path

    def normalize_url(self, url, no_d=False, query=None):
        """Return the request URL with scheme, host, path and query adjusted.

        Args:
            url: a URL or bare path.
            no_d: when false, ``'d'`` is added to the ``expand`` query field;
                when true, ``expand`` entries equal to ``'d'`` or ending in
                ``'.d'`` are removed instead.
            query: optional mapping merged over the query fields parsed from
                *url*; same-named fields are replaced.

        Returns:
            The URL produced by ``urlqueryparse.set_query_fields``.
        """
        # NOTE(review): presumably set_url_components applies a callable
        # ``path`` to the existing path -- confirm against urlqueryparse.
        url = urlqueryparse.set_url_components(
            url,
            scheme='https',
            netloc='fulcrm.org',
            path=self.normalize_path,
        )
        new_query = urlqueryparse.get_query_fields(url)
        if no_d:
            if 'expand' in new_query:
                new_query['expand'] = [
                    x for x in new_query['expand']
                    if not ((x == 'd') or (x.endswith('.d')))
                ]
        else:
            if 'expand' in new_query:
                new_query['expand'].append('d')
            else:
                new_query['expand'] = ['d']
        if query:
            new_query.update(query)
        if 'expand' in new_query:
            if new_query['expand']:
                # Collapse the entries into one comma-separated value.
                new_query['expand'] = [",".join(new_query['expand'])]
            else:
                # Nothing left to expand: drop the field entirely.
                del new_query['expand']
        new_url = urlqueryparse.set_query_fields(url, new_query)
        return new_url

    def get(self, url, query=None, _countdown=5):
        """Send a GET to *url* and return the decoded JSON body (or None)."""
        url = self.normalize_url(url, query=query)
        return self.make_request(url, data=None, method='GET', _countdown=_countdown)

    def post(self, url, data, query=None, _countdown=5):
        """Send *data* as JSON in a POST to *url*; return the decoded reply."""
        url = self.normalize_url(url, query=query)
        return self.make_request(url, data=data, method='POST', _countdown=_countdown)

    def patch(self, url, data, query=None, _countdown=5):
        """Send *data* as JSON in a PATCH to *url*; return the decoded reply."""
        url = self.normalize_url(url, query=query)
        return self.make_request(url, data=data, method='PATCH', _countdown=_countdown)

    def put(self, url, data, query=None, _countdown=5):
        """Send *data* as JSON in a PUT to *url*; return the decoded reply."""
        url = self.normalize_url(url, query=query)
        return self.make_request(url, data=data, method='PUT', _countdown=_countdown)

    def delete(self, url, query=None, _countdown=5):
        """Send a DELETE to *url* and return the decoded JSON body (or None)."""
        url = self.normalize_url(url, query=query)
        return self.make_request(url, data=None, method='DELETE', _countdown=_countdown)

    def make_request(self, url, data, method, _countdown=5):
        """Perform one HTTP request, retrying on connection errors.

        Args:
            url: fully normalized URL to call.
            data: JSON-serializable payload; a falsy value sends no body.
            method: HTTP verb name ('GET', 'POST', 'PATCH', 'PUT', 'DELETE').
                An unrecognized name falls back to GET.
            _countdown: number of retries left after a connection error.

        Returns:
            The decoded JSON body, or None when the body is not valid JSON.

        Raises:
            requests.exceptions.ConnectionError: once the retries run out.
            requests.exceptions.HTTPError: for a 4xx / 5xx response.
        """
        senders = {
            'POST': requests.post,
            'PATCH': requests.patch,
            'PUT': requests.put,
            'GET': requests.get,
            'DELETE': requests.delete,
        }
        # Keep ``method`` as the verb name: the retry below passes it back
        # in, and it must still be a valid key of the table above.
        send = senders.get(method, requests.get)
        kwargs = {'auth': self.auth, 'timeout': 10.0}
        if data:
            kwargs['data'] = json.dumps(data)
        try:
            response = send(url, **kwargs)
        except requests.exceptions.ConnectionError:
            if _countdown <= 0:
                raise
            # Wait longer before each successive retry.
            time.sleep(2 ** (5 - _countdown))
            return self.make_request(url, data, method, _countdown - 1)
        # Raises HTTPError for exactly the responses whose ``ok`` is false.
        response.raise_for_status()
        try:
            return response.json()
        except ValueError:
            # Successful response without a JSON body (e.g. empty reply).
            return None

    def get_one(self, url, no_d=False, query=None):
        """Fetch a single resource with GET and return its decoded JSON."""
        url = self.normalize_url(url, no_d=no_d, query=query)
        return self.make_request(url, None, 'GET')

    def get_many(self, url, count=10, no_d=False, query=None):
        """Yield every item of a paginated listing, following 'next' links.

        Args:
            url: URL or path of the listing.
            count: value sent as the ``count`` query field.
            no_d: forwarded to normalize_url().
            query: optional extra query fields; the mapping is not modified.

        Yields:
            Each element of the ``results`` list of every page.  Iteration
            stops when a page is not a JSON object or has no ``next`` URL.
        """
        # Work on a copy so the caller's mapping does not gain a 'count' key.
        page_query = dict(query) if query else {}
        page_query['count'] = [count]
        url = self.normalize_url(url, no_d=no_d, query=page_query)
        while True:
            page = self.make_request(url, None, 'GET')
            if not isinstance(page, dict):
                # make_request() returns None for a non-JSON body.
                return
            for item in page.get('results') or ():
                yield item
            # Read 'next' afresh on every page so a page without the key
            # ends the iteration instead of re-fetching the same URL.
            next_url = page.get('next')
            if not next_url:
                return
            url = next_url