# Command-line tool that reads records from a Dexcom receiver (G4 or G5) and
# either dumps them, polls for the newest CGM reading, or submits readings to
# an HTTP host. The action is selected by command-line options and dispatched
# at the bottom of the file.

import constants
import readdata
import sys
import json
# import requests # As this takes SEVEN SECONDS, it's delayed until needed
from sys import stdout, stderr
from datetime import timedelta, datetime
from time import sleep
from itertools import islice, takewhile
from database_records import GenericTimestampedRecord, EGVRecord
# from pytz import UTC # Since the dependency is external, don't import unless needed
from optparse import OptionParser

G5_IS_DEFAULT = True

parser = OptionParser()
parser.add_option("--g4", action="store_false", dest="g5",
                  default=G5_IS_DEFAULT,
                  help="use Dexcom G4 instead of Dexcom G5")
parser.add_option("--g5", action="store_true", dest="g5",
                  default=G5_IS_DEFAULT,
                  help="use Dexcom G5 instead of Dexcom G4")
parser.add_option("-a", "--all", action="store_const", dest="command",
                  const="dump_everything",
                  help="dump all available records")
parser.add_option("-p", "--poll", action="store_const", dest="command",
                  const="poll", help="poll for latest CGM record")
parser.add_option("--test", action="store_const", dest="command",
                  const="test", help="test")
parser.add_option("--hours", type="int", dest="hours", default=None,
                  help="display N most recent hours of CGM records")
parser.add_option("-n", type="int", dest="num_records", default=None,
                  help="number of CGM records to display")
parser.add_option("-v", "--verbose", action="store_true", dest="verbose",
                  help="verbosity (currently for debugging)")
parser.add_option("-H", "--human", action="store_true", dest="human",
                  help="print human-readable times")
parser.add_option("-j", "--json", action="store_true", dest="json",
                  help="print JSON output")
parser.add_option("--http", type="string", dest="host",
                  help="submit via HTTP")

(options, args) = parser.parse_args()

# With no explicit command the script dumps CGM records.
command = options.command or "dump_cgm"
VERBOSE = options.verbose
HUMAN = options.human
JSON = options.json
HOST = options.host

# Default window for a plain CGM dump: the last two hours.
# Compare strings with ==, not identity.
if command == 'dump_cgm' and options.num_records is None and not options.hours:
    options.hours = 2

# A non-positive record count means "no limit". The None check avoids
# ordering None against an int.
if options.num_records is not None and options.num_records <= 0:
    options.num_records = None


def get_dexcom_reader():
    """Locate the receiver and return a reader for the selected model.

    Returns a readdata.DexcomG5 when options.g5 is set, otherwise a
    readdata.Dexcom.
    """
    reader_cls = readdata.DexcomG5 if options.g5 else readdata.Dexcom
    return reader_cls(reader_cls.FindDevice())


dr = get_dexcom_reader()


def parseable_record_types():
    """Return the entries of constants.RECORD_TYPES that this tool dumps.

    Types listed as unparseable or as parsed-to-XML are excluded.
    """
    unparseable = ['FIRMWARE_PARAMETER_DATA', 'RECEIVER_LOG_DATA',
                   'USER_SETTING_DATA', 'MAX_VALUE']
    parsed_to_xml = ['MANUFACTURING_DATA', 'PC_SOFTWARE_PARAMETER']
    skip = unparseable + parsed_to_xml
    return [v for v in constants.RECORD_TYPES if v not in skip]


def choose_range(rs):
    """Limit the record iterable rs according to the command-line options.

    When --hours is given, records are taken while their system_time is
    newer than (receiver time - hours); otherwise at most
    options.num_records records are taken (all of them when it is None).
    """
    if options.hours:
        now = dr.ReadSystemTime()
        when = now - timedelta(hours=options.hours)
        return takewhile(lambda r: r.system_time > when, rs)
    return islice(rs, options.num_records)


def dump_everything():
    """Print every parseable record type, limited by choose_range."""
    for t in parseable_record_types():
        print(t + ":")
        for r in choose_range(dr.iter_records(t)):
            print(toJSON(r) if JSON else r)


def dump_cgm():
    """Print the selected EGV_DATA records in reverse of iteration order.

    Records flagged display_only are skipped.
    """
    for cr in reversed(list(choose_range(dr.iter_records('EGV_DATA')))):
        if not cr.display_only:
            print(toJSON(cr) if JSON else cr)


def recent(t):
    """Return True if record t is less than five minutes old (receiver time)."""
    now = dr.ReadSystemTime()
    return t.system_time > now - timedelta(minutes=5)


def print_verbose(s):
    """Write s to stderr when verbose output is enabled."""
    if VERBOSE:
        stderr.write('%s\n' % str(s))


def read_recent_egv_data():
    """Read the newest EGV record from the receiver.

    Returns:
        The record when it is recent, not special and not display-only;
        False when the read fails because the port is not open;
        None in every other case (stale/filtered record or any other error).
    """
    try:
        r = dr.ReadRecords('EGV_DATA', 1)[-1]
        if recent(r) and not r.is_special and not r.display_only:
            return r
        return None
    except ValueError as v:
        # Guard against an argument-less ValueError before indexing args.
        if v.args and v.args[0] == 'Attempting to use a port that is not open':
            return False
        print_verbose(v)
        return None
    except Exception:
        # Best effort: log and report "no reading", but let
        # KeyboardInterrupt/SystemExit propagate so polling can be stopped.
        print_verbose(sys.exc_info())
        return None


def time_fmt(t):
    """Format datetime t with '%c' in human mode, otherwise with '%s'.

    NOTE(review): '%s' is not a standard strftime directive; confirm the
    target platform supports it.
    """
    return t.strftime('%c' if HUMAN else '%s')


def format_times(now, stamp):
    """Return 'now', a sign and the absolute offset of stamp in seconds."""
    diff = (stamp - now).total_seconds()
    operand = '-' if diff < 0 else '+'  # should always be -
    return '%s %s %d' % (time_fmt(now), operand, abs(diff))


def print_localtime(str):
    """Print a message prefixed with the local system time."""
    print('%s (system): %s' % (time_fmt(datetime.now()), str))


# Last connection state reported by connected(); None until the first report.
CONNECTED = None


def connected(state):
    """Record the receiver connection state and announce it when it changes."""
    global CONNECTED
    if state == CONNECTED:
        return
    CONNECTED = state
    print_localtime('dexcom receiver %s connected'
                    % ('is' if state else 'is not'))


def sleep_verbose(n):
    """Sleep for n seconds, logging the call in verbose mode."""
    print_verbose('sleep(%d)' % n)
    sleep(n)


def POST(path, json_str):
    """POST json_str to HOST + path as application/json.

    Returns True on success, False on any failure (the failure is logged in
    verbose mode only).
    """
    resp = None
    try:
        import requests  # deliberately delayed; see the note at the imports
        resp = requests.post(HOST + path, data=json_str,
                             headers={'Content-type': 'application/json'})
        resp.raise_for_status()
        return True
    except Exception:
        print_verbose(sys.exc_info())
        # A requests Response is falsy for error statuses, so test against
        # None explicitly or the error body would never be logged.
        if resp is not None:
            print_verbose(resp.text)
        return False


def send_ping(now):
    """POST the time `now` to /ping when an HTTP host is configured."""
    if HOST:
        POST('/ping', toJSON(now))


def print_cgm_bg(now, r):
    """Emit one CGM reading: POST it, print it as JSON, or print it as text."""
    if HOST:
        # NOTE(review): the original evaluated
        # r.trend_arrow.replace('45_', 'DIAGONAL_', 1) here and discarded the
        # result, so the record has always been posted unmodified. Confirm
        # whether the server expects the renamed value before changing this.
        POST('/bgevent', toJSON(r))
        send_ping(now)
        return
    if JSON:
        print(toJSON(r))
    else:
        print('%s: %s %s' % (format_times(now, r.system_time),
                             r.glucose, r.trend_arrow))
    stdout.flush()


def poll():
    """Loop forever, reporting each new CGM reading as it appears.

    With no usable reading, waits 10 seconds (pinging the host, if any, when
    the receiver is reachable). After a reading, sleeps until 5 minutes and
    2 seconds past that reading's system_time.
    """
    print_localtime('dexcom_dumper started')
    while True:
        r = read_recent_egv_data()
        if r is None:
            connected(True)
            if HOST:
                now = dr.ReadSystemTime()
                send_ping(now)
            sleep_verbose(10)
        elif r is False:
            connected(False)
            sleep_verbose(10)
        else:
            now = dr.ReadSystemTime()
            print_cgm_bg(now, r)
            next_reading = (r.system_time - now
                            + timedelta(minutes=5, seconds=2)).total_seconds()
            if next_reading > 0:
                sleep_verbose(next_reading)


def since(when, rectype='EGV_DATA'):
    """Return records of rectype newer than `when`, in reversed iteration order."""
    return reversed(list(takewhile(lambda r: r.system_time > when,
                                   dr.iter_records(rectype))))


def test():
    """Ping the host, then print the last record of each parseable type as JSON."""
    now = dr.ReadSystemTime()
    POST('/ping', toJSON(now))
    for t in parseable_record_types():
        rs = dr.ReadRecords(t, 1)
        if not rs:
            continue
        r = rs[-1]
        print(toJSON([t, r]))


class JSON_Time(json.JSONEncoder):
    """JSON encoder that serialises datetime objects as ISO 8601 strings."""

    def default(self, o):
        if isinstance(o, datetime):
            if o.tzinfo is None:
                # Naive datetimes are labelled as UTC before formatting.
                from pytz import UTC
                return o.replace(tzinfo=UTC).isoformat()
            return o.isoformat()
        return json.JSONEncoder.default(self, o)


class JSON_CGM(JSON_Time):
    """JSON encoder for receiver records, built on JSON_Time."""

    def default(self, o):
        if isinstance(o, EGVRecord) and o.is_special:
            # Special EGV records expose only the base fields plus the
            # special meaning.
            keys = o.BASE_FIELDS + ['glucose_special_meaning']
        elif isinstance(o, GenericTimestampedRecord):
            keys = o.BASE_FIELDS + o.FIELDS
        else:
            return JSON_Time.default(self, o)
        op = {}
        for k in keys:
            op[k] = getattr(o, k)
        return op


def toJSON(o):
    """Serialise o to a JSON string using the JSON_CGM encoder."""
    return json.dumps(o, cls=JSON_CGM)


# Dispatch to the handler for the selected command.
{
    "dump_everything": dump_everything,
    "dump_cgm": dump_cgm,
    "poll": poll,
    "test": test,
}[command]()