#!/usr/bin/env python import constants import readdata import sys import json import traceback import subprocess from devicer import simple_devicer # import requests # As this takes SEVEN SECONDS, it's delayed until needed from sys import stdout, stderr from datetime import timedelta, datetime from time import sleep from itertools import islice, takewhile from database_records import GenericTimestampedRecord, EGVRecord, EventRecord # from pytz import UTC # Since the dependency is external, don't import unless needed from optparse import OptionParser G5_IS_DEFAULT = True def get_dracos_host(): res = subprocess.check_output([ 'sh', '-c', 'dracos_host=localhost:8001; [ -e /etc/dracos.conf ] && . /etc/dracos.conf; printf "%s" "$dracos_host" || true']) return res.decode('utf-8') parser = OptionParser() parser.add_option("--g4", action="store_false", dest="g5", default=G5_IS_DEFAULT, help="use Dexcom G4 instead of Dexcom G5") parser.add_option("--g5", action="store_true", dest="g5", default=G5_IS_DEFAULT, help="use Dexcom G5 instead of Dexcom G4") parser.add_option("-a", "--all", action="store_const", dest="command", const="dump_everything", help="dump all available records") parser.add_option("-p", "--poll", action="store_const", dest="command", const="poll", help="poll for latest CGM record") parser.add_option("--test", action="store_const", dest="command", const="test", help="test") parser.add_option("--hours", type="int", dest="hours", default=None, help="display N most recent hours of CGM records") parser.add_option("-n", type="int", dest="num_records", default=None, help="number of CGM records to display") parser.add_option("-v", "--verbose", action="store_true", dest="verbose", help="verbosity (currently for debugging)") parser.add_option("-H", "--human", action="store_true", dest="human", help="print human-readable times") parser.add_option("-j", "--json", action="store_true", dest="json", help="print JSON output") parser.add_option("--http", type="string", dest="host", help="submit via HTTP") parser.add_option("--client", action="store_true", dest="client", help="act as client to dracos") (options, args) = parser.parse_args() command = options.command or "dump_cgm" VERBOSE = options.verbose HUMAN = options.human JSON = options.json HOST = options.host if options.client and not HOST: dracos_host = get_dracos_host() if dracos_host: HOST = "http://" + dracos_host else: raise ValueError("Option '--client' specified but 'dracos_host' not specified in '/etc/dracos.conf'") if command is 'dump_cgm' and options.num_records is None and not options.hours: options.hours = 2 if options.num_records <= 0: options.num_records = None def get_dexcom_reader(): if options.g5: dd = readdata.DexcomG5.FindDevice() return readdata.DexcomG5(dd) else: dd = readdata.Dexcom.FindDevice() return readdata.Dexcom(dd) dr = None if command is not 'poll': dr = get_dexcom_reader() def parseable_record_types(): unparseable = ['FIRMWARE_PARAMETER_DATA', 'RECEIVER_LOG_DATA', 'USER_SETTING_DATA', 'MAX_VALUE', 'RECEIVER_ERROR_DATA'] parsed_to_xml = ['MANUFACTURING_DATA', 'PC_SOFTWARE_PARAMETER'] skip = unparseable + parsed_to_xml return filter(lambda v: not v in skip, constants.RECORD_TYPES) def choose_range(rs): if options.hours: now = dr.ReadSystemTime() when = now - timedelta(hours=options.hours) return takewhile(lambda r: r.system_time > when, rs) else: return islice(rs, options.num_records) def dump_everything(): for t in parseable_record_types(): print t + ":" for r in choose_range(dr.iter_records(t)): print toJSON(r) if JSON else r def dump_cgm(): for cr in reversed(list(choose_range(dr.iter_records('EGV_DATA')))): if not cr.display_only: print toJSON(cr) if JSON else cr def recent(t): now = dr.ReadSystemTime() return t.system_time > now - timedelta(minutes=5) def print_verbose(s, newline=True): global VERBOSE if VERBOSE: stderr.write('%s%s' % (str(s), '\n' if newline else '')) def read_recent_egv_data(): now = dr.ReadSystemTime() r = dr.ReadRecords('EGV_DATA', 1)[-1] if recent(r) and not r.is_special and not r.display_only: return (r, now) else: return (None, now) def time_fmt(t): global HUMAN return t.strftime('%c' if HUMAN else '%s') def format_times(now, stamp): diff = (stamp - now).total_seconds() operand = '-' if diff < 0 else '+' # should always be - return '%s %s %d' % (time_fmt(now), operand, abs(diff)) CONNECTED = None def connected(state): global CONNECTED if (state == CONNECTED): return else: CONNECTED = state print_verbose('Dexcom receiver %s connected.' % ('is' if state else 'is not')) VERBOSITY_INSANE = False def sleep_verbose(n): if VERBOSITY_INSANE: print_verbose('sleep(%d)' % n) sleep(n) imported_requests = None def import_requests(): global imported_requests if not imported_requests: print_verbose("Loading 'requests' HTTP client library... ", newline=False) import requests print_verbose("done.") imported_requests = requests return imported_requests def POST(path, json_str): msg = None try: requests = import_requests() resp = requests.post(HOST + path, data=json_str, headers={'Content-type': 'application/json'}) msg = resp.text resp.raise_for_status() return True except requests.exceptions.HTTPError as e: print_verbose(e) if msg: print_verbose(msg) return False def send_ping(now): if HOST: POST('/ping', toJSON(now)) def print_cgm_bg(now, r): if JSON: print toJSON(r) else: print '%s: %s %s' % (format_times(now, r.system_time), r.glucose, r.trend_arrow) stdout.flush() MAX_POLL_WAIT_SECONDS = 5 MIN_POLL_WAIT_SECONDS = 1 def poll_remote(): (n, r) = remote_update('EGV_DATA') if n is None: return MAX_POLL_WAIT_SECONDS else: now = dr.ReadSystemTime() if n == 0: send_ping(now) if r: for t in ['METER_DATA', 'INSERTION_TIME', 'USER_EVENT_DATA']: # TODO: track how long this takes & adjust sleep accordingly try: remote_update(t) except: traceback.print_exc() next_reading = (r.system_time - now + timedelta(minutes=5, seconds=2)).total_seconds() return max(MIN_POLL_WAIT_SECONDS, min(MAX_POLL_WAIT_SECONDS, next_reading)) else: return MAX_POLL_WAIT_SECONDS def poll(): print_verbose('Started dexcom_dumper.') if HOST: print_verbose("Acting as client to dracos server at " + HOST) poll_once = poll_remote if HOST else poll_stdout dexcom_g5_product = '22a3/47/100' def is_dexcom_g5(dev): try: return dev.parent.get('PRODUCT') == dexcom_g5_product except: return False devicer = simple_devicer('tty', is_dexcom_g5) while True: try: print_verbose("Waiting for device.") if not devicer.wait_add(600): # This does not really need to time out, but if there # is no timeout, then keyboard interrupt does not work. # Might as well use the opportunity to print more # garbage into the log, though. continue print_verbose("Device found.") global dr if dr: dr.Disconnect() dr = readdata.DexcomG5(devicer.device.device_node) print("Device opened.") while devicer.have(): print_verbose("Polling device.") try: sleeptime = poll_once() except: traceback.print_exc(0) sleeptime = 10 if devicer.wait_remove(sleeptime): print_verbose("Device unplugged.") dr.Disconnect() dr = None except KeyboardInterrupt: break def poll_stdout(): (r, now) = read_recent_egv_data() if r is None: return 10 else: print_cgm_bg(now, r) next_reading = (r.system_time - now + timedelta(minutes=5, seconds=2)).total_seconds() return next_reading if (next_reading > 0) else 10 def since(when, rectype): filt = lambda r: (r.system_time > when) if when else True return list(reversed(list(takewhile(filt, dr.iter_records(rectype))))) def since_and_first(when, rectype): class closure: fst = None def filt(r): closure.fst = closure.fst or r return (r.system_time > when) if when else True return (list(reversed(list(takewhile(filt, dr.iter_records(rectype))))), closure.fst) def parsetime(s): return datetime.strptime(s, "%Y-%m-%dT%H:%M:%SZ") # Update the remote site with any records of 'rectype' that are missing. # The server is queried to determine its last update. # # Returns (a,b) where: # # a = the number of records sent to the server (possibly zero), or None for a server error # # b = the latest record available on the Dexcom # # TODO: This has a race condition if you imagine multiple clients, # server restarts, etc.; it should send back the value it got in the # query to the server, so that the server can verify that the updates # are consecutive. REMOTE_HAS = {} def check_remote_has(rectype): global REMOTE_HAS if REMOTE_HAS.get(rectype, None) is not None: return REMOTE_HAS[rectype] requests = import_requests() resp = requests.get(HOST + '/' + rectype + '/1') resp.raise_for_status() when = None if len(resp.json()): when = parsetime(resp.json()[0]['system_time']) print_verbose("Latest %s record on server: %s" % (rectype, when.isoformat())) REMOTE_HAS[rectype] = when return when def remote_update(rectype): global REMOTE_HAS when = check_remote_has(rectype) (rs, r) = since_and_first(when, rectype) connected(True) if len(rs): REMOTE_HAS[rectype] = None print_verbose("Sending %d %s record%s... " % (len(rs), rectype, '' if len(rs) == 1 else 's'), newline=False) result = POST('/' + rectype, toJSON(rs)) print_verbose("done. (Result: %s.)" % 'success' if result else 'failure') return (len(rs) if result else None, r) else: return (0, r) def test(): remote_update('USER_EVENT_DATA') def test0(): for t in parseable_record_types(): remote_update(t) from datetime import tzinfo, timedelta, datetime ZERO = timedelta(0) class UTCtzinfo(tzinfo): def utcoffset(self, dt): return ZERO def tzname(self, dt): return "UTC" def dst(self, dt): return ZERO UTC = UTCtzinfo() class JSON_Time(json.JSONEncoder): def default(self, o): if isinstance(o, datetime): if o.tzinfo is None: return o.replace(tzinfo=UTC).isoformat() else: return o.isoformat() return json.JSONEncoder.default(self, o) class JSON_CGM(JSON_Time): def default(self, o): if isinstance(o, EGVRecord) and o.is_special: op={} record={} for k in o.BASE_FIELDS: op[k] = getattr(o, k) op['record'] = [getattr(o, 'glucose_special_meaning'), []] return op elif isinstance(o, EventRecord): op={} for k in o.BASE_FIELDS: op[k] = getattr(o, k) if o.event_sub_type: op['record'] = [o.event_type + '_' + str(o.event_sub_type), []] else: op['record'] = [o.event_type, o.event_value] return op elif isinstance(o, GenericTimestampedRecord): op={} record={} for k in o.BASE_FIELDS: op[k] = getattr(o, k) for k in o.FIELDS: record[k] = getattr(o, k) if isinstance(o, EGVRecord): record = ['EGVRecord', record] op['record'] = record return op else: return JSON_Time.default(self, o) def toJSON(o): return json.dumps(o, cls=JSON_CGM) {"dump_everything": dump_everything, "dump_cgm": dump_cgm, "poll": poll, "test": poll, }[command]()