111 lines
3.3 KiB
Python
111 lines
3.3 KiB
Python
import collections
|
|
from xonsh.history.base import History
|
|
import uuid
|
|
import time
|
|
import requests
|
|
import sys
|
|
import json
|
|
import threading
|
|
import queue
|
|
import random
|
|
|
|
from requests.auth import HTTPBasicAuth
|
|
|
|
_auth = HTTPBasicAuth('username', 'password')
|
|
_url = 'https://couchdbserver.example.com/xonsh-history'
|
|
|
|
# turn-on the worker thread
|
|
|
|
class CouchDBHistory(History):
|
|
def __init__(self, **kwargs):
|
|
super().__init__(**kwargs)
|
|
self.gc = None
|
|
self.sessionid = self._build_session_id()
|
|
self.inps = []
|
|
self.rtns = []
|
|
self.outs = []
|
|
self.tss = []
|
|
self.q = queue.Queue()
|
|
threading.Thread(target=self.couch_worker, daemon=True).start()
|
|
|
|
def couch_worker(self):
|
|
while True:
|
|
cmd = self.q.get()
|
|
sent = False
|
|
delay = 1.0
|
|
while not sent:
|
|
try:
|
|
self._save_to_db(cmd)
|
|
sent = True
|
|
except:
|
|
delay = min(delay*(1+random.random()), 30)
|
|
time.sleep(delay)
|
|
self.q.task_done()
|
|
|
|
def _build_session_id(self):
|
|
ts = int(time.time() * 1000)
|
|
return '{}-{}'.format(ts, str(uuid.uuid4())[:18])
|
|
|
|
def append(self, cmd):
|
|
self.inps.append(cmd['inp'])
|
|
self.rtns.append(cmd['rtn'])
|
|
self.outs.append(None)
|
|
self.tss.append(cmd.get('ts', (None, None)))
|
|
self.q.put(cmd)
|
|
|
|
def items(self):
|
|
yield from self._get_db_items(self.sessionid)
|
|
|
|
def all_items(self, **kwargs):
|
|
yield from self._get_db_items()
|
|
|
|
def info(self):
|
|
data = collections.OrderedDict()
|
|
data['backend'] = 'couchdb'
|
|
data['sessionid'] = str(self.sessionid)
|
|
return data
|
|
|
|
def _save_to_db(self, cmd):
|
|
data = cmd.copy()
|
|
data['inp'] = cmd['inp'].rstrip()
|
|
if 'out' in data:
|
|
data.pop('out')
|
|
data['_id'] = self._build_doc_id()
|
|
try:
|
|
self._request_db_data('', data=data)
|
|
except Exception as e:
|
|
msg = 'failed to save history: {}: {}'.format(e.__class__.__name__, e)
|
|
print(msg, file=sys.stderr)
|
|
|
|
def _get_db_items(self, sessionid=None):
|
|
path = '/_all_docs?include_docs=true'
|
|
if sessionid is not None:
|
|
path += '&start_key="{0}:"&end_key="{0}:z"'.format(sessionid)
|
|
try:
|
|
r = self._request_db_data(path)
|
|
except Exception as e:
|
|
msg = 'error when query db: {}: {}'.format(e.__class__.__name__, e)
|
|
print(msg, file=sys.stderr)
|
|
return
|
|
data = json.loads(r.text)
|
|
for item in data['rows']:
|
|
cmd = item['doc'].copy()
|
|
cmd['ts'] = cmd['ts'][0]
|
|
yield cmd
|
|
|
|
def _build_doc_id(self):
|
|
ts = int(time.time() * 1000)
|
|
return '{}:{}-{}'.format(self.sessionid, ts, str(uuid.uuid4())[:18])
|
|
|
|
def _request_db_data(self, path, data=None):
|
|
global _url, _auth
|
|
url = _url + path
|
|
headers = {'Content-Type': 'application/json'}
|
|
if data is not None:
|
|
resp = requests.post(url, json.dumps(data), headers=headers, auth=_auth, timeout=10)
|
|
else:
|
|
headers = {'Content-Type': 'text/plain'}
|
|
resp = requests.get(url, headers=headers, auth=_auth)
|
|
return resp
|
|
|