"""Command-line tool that dumps or polls records from a Dexcom receiver.

The command is chosen with command-line flags (see the option definitions
below); with no command flag the most recent CGM (EGV_DATA) records are
dumped.  Output goes to stdout as plain records or JSON, or is submitted
over HTTP when ``--http`` is given.

NOTE: every ``print`` below is written as ``print(<single expression>)`` so
that it produces the same output under Python 2 (which this script targets)
while also being parseable by Python 3.
"""
import json
import sys
from datetime import datetime, timedelta
from itertools import islice, takewhile
from optparse import OptionParser
from sys import stderr, stdout
from time import sleep

import requests
from pytz import UTC

import constants
import readdata
from database_records import GenericTimestampedRecord

G5_IS_DEFAULT = True

parser = OptionParser()
parser.add_option("--g4", action="store_false", dest="g5", default=G5_IS_DEFAULT,
                  help="use Dexcom G4 instead of Dexcom G5")
parser.add_option("--g5", action="store_true", dest="g5", default=G5_IS_DEFAULT,
                  help="use Dexcom G5 instead of Dexcom G4")
parser.add_option("-a", "--all", action="store_const", dest="command",
                  const="dump_everything", help="dump all available records")
parser.add_option("-p", "--poll", action="store_const", dest="command",
                  const="poll", help="poll for latest CGM record")
parser.add_option("--test", action="store_const", dest="command",
                  const="test", help="test")
parser.add_option("--hours", type="int", dest="hours", default=None,
                  help="display N most recent hours of CGM records")
parser.add_option("-n", type="int", dest="num_records", default=None,
                  help="number of CGM records to display")
parser.add_option("-v", "--verbose", action="store_true", dest="verbose",
                  help="verbosity (currently for debugging)")
parser.add_option("-H", "--human", action="store_true", dest="human",
                  help="print human-readable times")
parser.add_option("-j", "--json", action="store_true", dest="json",
                  help="print JSON output")
parser.add_option("--http", type="string", dest="host",
                  help="submit via HTTP")

(options, args) = parser.parse_args()

command = options.command or "dump_cgm"
VERBOSE = options.verbose
HUMAN = options.human
JSON = options.json
HOST = options.host

# For the default command, fall back to a two-hour window when the user gave
# neither a record count nor an hour range.  Strings are compared with `==`:
# identity (`is`) only matches by accident of interning.
if command == 'dump_cgm' and options.num_records is None and not options.hours:
    options.hours = 2


def get_dexcom_reader():
    """Locate the receiver and return a reader object for it.

    Uses ``readdata.DexcomG5`` when ``options.g5`` is true, otherwise
    ``readdata.Dexcom``.
    """
    if options.g5:
        dd = readdata.DexcomG5.FindDevice()
        return readdata.DexcomG5(dd)
    dd = readdata.Dexcom.FindDevice()
    return readdata.Dexcom(dd)


# Module-level reader shared by every command below.
dr = get_dexcom_reader()


def parseable_record_types():
    """Return the entries of ``constants.RECORD_TYPES`` that are not skipped.

    The skipped names are the ones listed locally as unparseable or as
    parsed to XML.
    """
    unparseable = ['FIRMWARE_PARAMETER_DATA', 'RECEIVER_LOG_DATA',
                   'USER_SETTING_DATA', 'MAX_VALUE']
    parsed_to_xml = ['MANUFACTURING_DATA', 'PC_SOFTWARE_PARAMETER']
    skip = unparseable + parsed_to_xml
    return [v for v in constants.RECORD_TYPES if v not in skip]


def choose_range(rs):
    """Limit the record iterable ``rs`` according to the command-line options.

    With ``--hours`` set, records are taken while their ``system_time`` is
    newer than (receiver time - hours); otherwise the first
    ``options.num_records`` records are taken (all of them when it is None).
    """
    if options.hours:
        now = dr.ReadSystemTime()
        when = now - timedelta(hours=options.hours)
        return takewhile(lambda r: r.system_time > when, rs)
    return islice(rs, options.num_records)


def dump_everything():
    """Print the selected range of every parseable record type."""
    for t in parseable_record_types():
        print(t + ":")
        for r in choose_range(dr.iter_records(t)):
            print(toJSON(r) if JSON else r)


def dump_cgm():
    """Print the selected EGV_DATA records, reversed from iteration order.

    Records flagged ``display_only`` are omitted.
    """
    for cr in reversed(list(choose_range(dr.iter_records('EGV_DATA')))):
        if not cr.display_only:
            print(toJSON(cr) if JSON else cr)


def recent(t):
    """Return True if ``t.system_time`` is within the last five minutes.

    "Now" is the receiver's own system time, not the host clock.
    """
    now = dr.ReadSystemTime()
    return t.system_time > now - timedelta(minutes=5)


def print_verbose(s):
    """Write ``s`` to stderr when verbose mode is enabled."""
    if VERBOSE:
        stderr.write('%s\n' % str(s))


def read_recent_egv_data():
    """Read the newest EGV record and classify the outcome.

    Returns:
        the record, when it is recent, not special and not display-only;
        ``None`` when there is no such record or an unexpected error
        occurred (the error is reported via ``print_verbose``);
        ``False`` when the read failed because the port is not open.
    """
    try:
        r = dr.ReadRecords('EGV_DATA', 1)[-1]
        if recent(r) and not r.is_special and not r.display_only:
            return r
        return None
    except ValueError as v:
        # Guard against an argument-less ValueError so the handler itself
        # cannot raise IndexError.
        if v.args and v.args[0] == 'Attempting to use a port that is not open':
            return False
        print_verbose(v)
        return None
    except Exception:
        # `except Exception` (not a bare except) so that KeyboardInterrupt
        # and SystemExit still stop the program.  exc_info must be *called*
        # to obtain the exception details.
        print_verbose(sys.exc_info())
        return None


def time_fmt(t):
    """Format datetime ``t`` for display.

    Uses '%c' in human mode and '%s' otherwise.
    NOTE(review): '%s' is not a standard strftime directive; presumably it
    yields epoch seconds on the target platform -- confirm.
    """
    return t.strftime('%c' if HUMAN else '%s')


def format_times(now, stamp):
    """Return ``now`` formatted, followed by the signed offset to ``stamp``.

    The offset is ``stamp - now`` in seconds, shown as a sign and an
    absolute integer value.
    """
    diff = (stamp - now).total_seconds()
    operand = '-' if diff < 0 else '+'  # should always be -
    return '%s %s %d' % (time_fmt(now), operand, abs(diff))


def print_localtime(str):
    """Print message ``str`` prefixed with the host's current local time.

    The parameter name shadows the builtin ``str``; it is kept unchanged for
    interface compatibility and the builtin is not needed in this function.
    """
    print('%s (system): %s' % (time_fmt(datetime.now()), str))


# Last connection state reported by connected(); None until the first report.
CONNECTED = None


def connected(state):
    """Record the connection state and announce it when it changes."""
    global CONNECTED
    if state == CONNECTED:
        return
    CONNECTED = state
    print_localtime('dexcom receiver %s connected' % ('is' if state else 'is not'))


def sleep_verbose(n):
    """Sleep for ``n`` seconds, logging the call in verbose mode."""
    print_verbose('sleep(%d)' % n)
    sleep(n)


def POST(path, json_str):
    """POST ``json_str`` as JSON to ``HOST + path``.

    Returns True on success and False on any failure (including a non-2xx
    response, or HOST being unset); failures are reported via
    ``print_verbose`` rather than raised.
    """
    try:
        resp = requests.post(HOST + path, data=json_str,
                             headers={'Content-type': 'application/json'})
        print_verbose(resp.text)
        resp.raise_for_status()
        return True
    except Exception:
        # Best-effort submission: swallow errors, but let KeyboardInterrupt
        # and SystemExit through.
        print_verbose(sys.exc_info())
        return False


def send_ping(now):
    """POST ``now`` to the '/ping' endpoint when an HTTP host is configured."""
    if HOST:
        POST('/ping', toJSON(now))


def print_cgm_bg(now, r):
    """Report one CGM reading ``r``, taken relative to receiver time ``now``.

    With an HTTP host the record is POSTed to '/bgevent' followed by a ping;
    otherwise it is printed as JSON or as a formatted text line, and stdout
    is flushed.
    """
    if HOST:
        # NOTE(review): str.replace returns a new string and the result is
        # discarded here, so this statement has no effect on what is sent.
        # Left as-is because it is not visible from this file whether
        # trend_arrow is assignable or whether the server expects the
        # unmodified value -- confirm intent before changing.
        r.trend_arrow.replace('45_', 'DIAGONAL_', 1)
        POST('/bgevent', toJSON(r))
        send_ping(now)
        return
    elif JSON:
        print(toJSON(r))
    else:
        print('%s: %s %s' % (format_times(now, r.system_time), r.glucose, r.trend_arrow))
    stdout.flush()


def poll():
    """Loop forever, reporting each new CGM reading as it becomes available."""
    print_localtime('dexcom_dumper started')
    while True:
        r = read_recent_egv_data()
        if r is None:
            # The read did not report a closed port, but produced no fresh
            # reading: keep the server informed and retry shortly.
            connected(True)
            if HOST:
                now = dr.ReadSystemTime()
                send_ping(now)
            sleep_verbose(10)
        elif r is False:
            connected(False)
            sleep_verbose(10)
        else:
            now = dr.ReadSystemTime()
            print_cgm_bg(now, r)
            # Wake up 5 minutes and 2 seconds after this reading's timestamp.
            next_reading = (r.system_time - now
                            + timedelta(minutes=5, seconds=2)).total_seconds()
            if next_reading > 0:
                sleep_verbose(next_reading)


def since(when, rectype='EGV_DATA'):
    """Return records of ``rectype`` newer than ``when``, in reversed order.

    Records are taken from ``dr.iter_records`` while their ``system_time``
    is greater than ``when``.
    """
    return reversed(list(takewhile(lambda r: r.system_time > when,
                                   dr.iter_records(rectype))))


def test():
    """Ping the server, then print the last record of each parseable type."""
    now = dr.ReadSystemTime()
    POST('/ping', toJSON(now))
    for t in parseable_record_types():
        rs = dr.ReadRecords(t, 1)
        if not rs:
            continue
        r = rs[-1]
        print(toJSON([t, r]))


class JSON_Time(json.JSONEncoder):
    """JSON encoder that serializes datetimes as ISO 8601 strings."""

    def default(self, o):
        if isinstance(o, datetime):
            # Naive datetimes are labeled as UTC before formatting.
            if o.tzinfo is None:
                return o.replace(tzinfo=UTC).isoformat()
            return o.isoformat()
        return json.JSONEncoder.default(self, o)


class JSON_CGM(JSON_Time):
    """JSON encoder that also serializes GenericTimestampedRecord objects.

    A record becomes a dict of its ``BASE_FIELDS + FIELDS`` attributes.
    """

    def default(self, o):
        if isinstance(o, GenericTimestampedRecord):
            op = {}
            for k in o.BASE_FIELDS + o.FIELDS:
                op[k] = getattr(o, k)
            return op
        return JSON_Time.default(self, o)


def toJSON(o):
    """Serialize ``o`` to a JSON string using the JSON_CGM encoder."""
    return json.dumps(o, cls=JSON_CGM)


# Dispatch to the function implementing the selected command.
{
    "dump_everything": dump_everything,
    "dump_cgm": dump_cgm,
    "poll": poll,
    "test": test,
}[command]()