You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

111 lines
3.3 KiB
Python

import collections
from xonsh.history.base import History
import uuid
import time
import requests
import sys
import json
import threading
import queue
import random
from requests.auth import HTTPBasicAuth
_auth = HTTPBasicAuth('username', 'password')
_url = 'https://couchdbserver.example.com/xonsh-history'
# turn-on the worker thread
class CouchDBHistory(History):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.gc = None
self.sessionid = self._build_session_id()
self.inps = []
self.rtns = []
self.outs = []
self.tss = []
self.q = queue.Queue()
threading.Thread(target=self.couch_worker, daemon=True).start()
def couch_worker(self):
while True:
cmd = self.q.get()
sent = False
delay = 1.0
while not sent:
try:
self._save_to_db(cmd)
sent = True
except:
delay = min(delay*(1+random.random()), 30)
time.sleep(delay)
self.q.task_done()
def _build_session_id(self):
ts = int(time.time() * 1000)
return '{}-{}'.format(ts, str(uuid.uuid4())[:18])
def append(self, cmd):
self.inps.append(cmd['inp'])
self.rtns.append(cmd['rtn'])
self.outs.append(None)
self.tss.append(cmd.get('ts', (None, None)))
self.q.put(cmd)
def items(self):
yield from self._get_db_items(self.sessionid)
def all_items(self, **kwargs):
yield from self._get_db_items()
def info(self):
data = collections.OrderedDict()
data['backend'] = 'couchdb'
data['sessionid'] = str(self.sessionid)
return data
def _save_to_db(self, cmd):
data = cmd.copy()
data['inp'] = cmd['inp'].rstrip()
if 'out' in data:
data.pop('out')
data['_id'] = self._build_doc_id()
try:
self._request_db_data('', data=data)
except Exception as e:
msg = 'failed to save history: {}: {}'.format(e.__class__.__name__, e)
print(msg, file=sys.stderr)
def _get_db_items(self, sessionid=None):
path = '/_all_docs?include_docs=true'
if sessionid is not None:
path += '&start_key="{0}:"&end_key="{0}:z"'.format(sessionid)
try:
r = self._request_db_data(path)
except Exception as e:
msg = 'error when query db: {}: {}'.format(e.__class__.__name__, e)
print(msg, file=sys.stderr)
return
data = json.loads(r.text)
for item in data['rows']:
cmd = item['doc'].copy()
cmd['ts'] = cmd['ts'][0]
yield cmd
def _build_doc_id(self):
ts = int(time.time() * 1000)
return '{}:{}-{}'.format(self.sessionid, ts, str(uuid.uuid4())[:18])
def _request_db_data(self, path, data=None):
global _url, _auth
url = _url + path
headers = {'Content-Type': 'application/json'}
if data is not None:
resp = requests.post(url, json.dumps(data), headers=headers, auth=_auth, timeout=10)
else:
headers = {'Content-Type': 'text/plain'}
resp = requests.get(url, headers=headers, auth=_auth)
return resp