"""xonsh history backend that stores every command as a CouchDB document."""
import collections
import json
import queue
import random
import sys
import threading
import time
import uuid

import requests
from requests.auth import HTTPBasicAuth
from xonsh.history.base import History

# NOTE(review): placeholder credentials and URL -- replace them (ideally load
# them from the environment or a config file) before using this backend.
_auth = HTTPBasicAuth('username', 'password')
_url = 'https://couchdbserver.example.com/xonsh-history'

# Seconds to wait for any single HTTP request before giving up on it.
_timeout = 10
# Upper bound, in seconds, for the back-off between upload retries.
_max_retry_delay = 30


class CouchDBHistory(History):
    """History backend that uploads each command to a CouchDB database.

    ``append`` only records the command in memory and puts it on a queue; a
    daemon thread (``couch_worker``) performs the HTTP uploads so the caller
    never waits on the network. Reads (``items`` / ``all_items``) query the
    database synchronously.
    """

    def __init__(self, **kwargs):
        """Create a new session and start the background upload thread."""
        super().__init__(**kwargs)
        self.gc = None
        self.sessionid = self._build_session_id()
        self.inps = []
        self.rtns = []
        self.outs = []
        self.tss = []
        # Commands waiting to be uploaded by the worker thread.
        self.q = queue.Queue()
        # Turn on the worker thread. It is a daemon thread, so it does not
        # keep the interpreter alive; commands still queued at exit are lost.
        threading.Thread(target=self.couch_worker, daemon=True).start()

    def couch_worker(self):
        """Upload queued commands forever, one at a time, in queue order."""
        while True:
            cmd = self.q.get()
            try:
                self._upload_with_retry(cmd)
            finally:
                # Always balance q.get() so a q.join() cannot dead-lock.
                self.q.task_done()

    def _upload_with_retry(self, cmd):
        """Upload *cmd*, retrying with jittered back-off on transient errors.

        Transient errors (connection problems, timeouts, HTTP 5xx/429) are
        retried until the upload succeeds. Any other error can never succeed
        on retry, so it is reported on stderr and the command is dropped
        instead of blocking the commands queued behind it.
        """
        try:
            # Build the document once so that every retry reuses the same
            # ``_id``; a retry after a lost response then cannot create a
            # duplicate document.
            doc = self._build_doc(cmd)
        except Exception as e:
            self._report_save_error(e)
            return

        delay = 1.0
        retried = False
        while True:
            try:
                self._request_db_data('', data=doc).raise_for_status()
                return
            except Exception as e:
                if retried and self._http_status(e) == 409:
                    # CouchDB answers 409 Conflict when the ``_id`` already
                    # exists: an earlier attempt was stored but its response
                    # never reached us.
                    return
                if not self._is_transient(e):
                    self._report_save_error(e)
                    return
                if not retried:
                    # Report only the first failure to avoid flooding the
                    # terminal while the server stays unreachable.
                    self._report_save_error(e)
                retried = True
            delay = min(delay * (1 + random.random()), _max_retry_delay)
            time.sleep(delay)

    @staticmethod
    def _http_status(exc):
        """Return the HTTP status code attached to *exc*, or None."""
        return getattr(getattr(exc, 'response', None), 'status_code', None)

    @classmethod
    def _is_transient(cls, exc):
        """Return True if retrying after *exc* has a chance of succeeding."""
        if isinstance(exc, requests.HTTPError):
            status = cls._http_status(exc)
            return status is None or status >= 500 or status == 429
        return isinstance(exc, (requests.ConnectionError, requests.Timeout))

    @staticmethod
    def _report_save_error(exc):
        """Print a save failure on stderr."""
        msg = 'failed to save history: {}: {}'.format(
            exc.__class__.__name__, exc)
        print(msg, file=sys.stderr)

    def _build_session_id(self):
        """Return a new session id: millisecond timestamp plus a UUID prefix."""
        ts = int(time.time() * 1000)
        return '{}-{}'.format(ts, str(uuid.uuid4())[:18])

    def append(self, cmd):
        """Record *cmd* in memory and queue it for upload.

        *cmd* is a dict that must contain ``'inp'`` and ``'rtn'``; ``'ts'``
        is optional and defaults to ``(None, None)`` in the in-memory list.
        """
        self.inps.append(cmd['inp'])
        self.rtns.append(cmd['rtn'])
        self.outs.append(None)
        self.tss.append(cmd.get('ts', (None, None)))
        self.q.put(cmd)

    def items(self):
        """Yield the stored commands of the current session."""
        yield from self._get_db_items(self.sessionid)

    def all_items(self, **kwargs):
        """Yield the stored commands of every session; *kwargs* are ignored."""
        yield from self._get_db_items()

    def info(self):
        """Return an ordered mapping describing this backend and session."""
        data = collections.OrderedDict()
        data['backend'] = 'couchdb'
        data['sessionid'] = str(self.sessionid)
        return data

    def _build_doc(self, cmd):
        """Return the CouchDB document for *cmd*.

        The document is a copy of *cmd* with trailing whitespace stripped
        from ``'inp'``, without ``'out'``, and with a fresh ``'_id'``.
        """
        data = cmd.copy()
        data['inp'] = cmd['inp'].rstrip()
        data.pop('out', None)
        data['_id'] = self._build_doc_id()
        return data

    def _save_to_db(self, cmd):
        """Try once to store *cmd*; never raises.

        Failures (including HTTP error statuses) are reported on stderr.

        Returns:
            True if the document was stored, False otherwise.
        """
        try:
            doc = self._build_doc(cmd)
            self._request_db_data('', data=doc).raise_for_status()
        except Exception as e:
            self._report_save_error(e)
            return False
        return True

    def _get_db_items(self, sessionid=None):
        """Yield stored commands, optionally restricted to one session.

        Document ids start with ``'<sessionid>:'`` (see ``_build_doc_id``),
        so one session is selected with a key range on ``_all_docs``. Each
        yielded dict is a copy of the stored document whose ``'ts'`` is
        replaced by its first element (``None`` when the document has no
        ``'ts'``). On any request or decoding error a message is printed on
        stderr and nothing is yielded.
        """
        path = '/_all_docs?include_docs=true'
        if sessionid is not None:
            path += '&start_key="{0}:"&end_key="{0}:z"'.format(sessionid)
        try:
            r = self._request_db_data(path)
            # An error status carries no 'rows'; surface it as an error here.
            r.raise_for_status()
            rows = json.loads(r.text)['rows']
        except Exception as e:
            msg = 'error when query db: {}: {}'.format(e.__class__.__name__, e)
            print(msg, file=sys.stderr)
            return
        for item in rows:
            doc = item.get('doc')
            if not doc or 'inp' not in doc:
                # Not a history entry: every document written by this class
                # has 'inp'. ``_all_docs`` also lists design documents.
                continue
            cmd = doc.copy()
            ts = cmd.get('ts')
            cmd['ts'] = ts[0] if ts else None
            yield cmd

    def _build_doc_id(self):
        """Return a new document id prefixed with the current session id."""
        ts = int(time.time() * 1000)
        return '{}:{}-{}'.format(self.sessionid, ts, str(uuid.uuid4())[:18])

    def _request_db_data(self, path, data=None):
        """Send one HTTP request to the database and return the response.

        Args:
            path: Suffix appended to the database URL.
            data: When not None, it is JSON-encoded and sent with POST;
                otherwise a GET is issued.

        Returns:
            The ``requests`` response. The status code is NOT checked here;
            callers decide how to treat error statuses.

        Raises:
            requests.RequestException: On connection errors or when the
                server does not answer within ``_timeout`` seconds.
        """
        url = _url + path
        if data is not None:
            headers = {'Content-Type': 'application/json'}
            resp = requests.post(url, json.dumps(data), headers=headers,
                                 auth=_auth, timeout=_timeout)
        else:
            headers = {'Content-Type': 'text/plain'}
            # Without a timeout a stalled server would hang the caller.
            resp = requests.get(url, headers=headers, auth=_auth,
                                timeout=_timeout)
        return resp