1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
import constants
import readdata
import sys
from sys import stdout, stderr
from datetime import timedelta, datetime
from time import sleep
from optparse import OptionParser
# Value of options.g5 when neither --g4 nor --g5 is passed.
G5_IS_DEFAULT = True
# Default for the -n option.
DEFAULT_PAGE_COUNT = 2

# Command-line interface. --g4/--g5 share the "g5" destination; -a/-p share
# the "command" destination, so the last one given on the command line wins.
parser = OptionParser()
parser.add_option(
    "--g4",
    action="store_false",
    dest="g5",
    default=G5_IS_DEFAULT,
    help="use Dexcom G4 instead of Dexcom G5",
)
parser.add_option(
    "--g5",
    action="store_true",
    dest="g5",
    default=G5_IS_DEFAULT,
    help="use Dexcom G5 instead of Dexcom G4",
)
parser.add_option(
    "-a", "--all",
    action="store_const",
    dest="command",
    const="dump_everything",
    help="dump all available records",
)
parser.add_option(
    "-p", "--poll",
    action="store_const",
    dest="command",
    const="poll",
    help="poll for latest CGM record",
)
parser.add_option(
    "-n",
    type="int",
    dest="num_records",
    default=DEFAULT_PAGE_COUNT,
    help="number of pages of CGM records to display",
)
parser.add_option(
    "-v", "--verbose",
    action="store_true",
    dest="verbose",
    help="verbosity (currently for debugging)",
)
options, args = parser.parse_args()

# With neither -a nor -p, fall back to dumping CGM records.
command = options.command
if not command:
    command = "dump_cgm"

# None (falsy) unless -v/--verbose was given; read by print_verbose().
VERBOSE = options.verbose
def get_dexcom_reader():
    """Locate the receiver and return a reader object for it.

    Uses ``readdata.DexcomG5`` when ``options.g5`` is truthy and
    ``readdata.Dexcom`` otherwise. The chosen class's ``FindDevice()``
    result is passed straight to its constructor.
    """
    reader_cls = readdata.DexcomG5 if options.g5 else readdata.Dexcom
    return reader_cls(reader_cls.FindDevice())
# Module-level reader handle shared by the dump/poll functions below.
# Created as soon as this statement runs, i.e. at script start-up.
dr = get_dexcom_reader()
def dump_everything():
    """Print every record of every non-skipped record type.

    Walks ``constants.RECORD_TYPES`` in order, leaves out the types listed
    below, and for each remaining type prints a ``<type>:`` header followed
    by each record returned by ``dr.ReadRecords(<type>)``.
    """
    # Earlier explicit list, kept for reference:
    # ['METER_DATA', 'INSERTION_TIME', 'USER_EVENT_DATA', 'CAL_SET', 'SENSOR_DATA']
    unparseable = (
        'FIRMWARE_PARAMETER_DATA',
        'RECEIVER_LOG_DATA',
        'USER_SETTING_DATA',
        'MAX_VALUE',
    )
    parsed_to_xml = ('MANUFACTURING_DATA', 'PC_SOFTWARE_PARAMETER')
    excluded = unparseable + parsed_to_xml

    wanted = [name for name in constants.RECORD_TYPES if name not in excluded]
    for record_type in wanted:
        print(record_type + ":")
        for record in dr.ReadRecords(record_type):
            print(record)
def dump_cgm():
    """Print the EGV records from the last ``options.num_records`` pages.

    Records whose ``display_only`` attribute is truthy are skipped.
    """
    for record in dr.ReadRecords('EGV_DATA', options.num_records):
        if record.display_only:
            continue
        print(record)
def recent(t):
    """Return True when ``t.system_time`` is less than five minutes old.

    Age is measured against the receiver's own clock
    (``dr.ReadSystemTime()``), not the host clock.
    """
    cutoff = dr.ReadSystemTime() - timedelta(minutes=5)
    return t.system_time > cutoff
def print_verbose(s):
    """Write ``str(s)`` plus a newline to stderr, but only in verbose mode.

    Does nothing when the module-level ``VERBOSE`` flag is falsy.
    """
    if not VERBOSE:
        return
    stderr.write('%s\n' % str(s))
def read_recent_egv_data():
    """Fetch the newest EGV record if it is fresh and usable.

    Returns:
        * the last record of ``dr.ReadRecords('EGV_DATA', options.num_records)``
          when ``recent()`` accepts it and it is neither ``is_special`` nor
          ``display_only``;
        * ``False`` when the read raised ``ValueError`` with the message
          ``'Attempting to use a port that is not open'`` (callers treat
          this as "receiver not connected");
        * ``None`` in every other case: a stale/special/display-only record,
          a different ``ValueError``, or any other ``Exception`` (for example
          ``IndexError`` when no records came back).

    ``KeyboardInterrupt`` and ``SystemExit`` are deliberately NOT swallowed,
    so a polling loop built on this function can still be interrupted.
    """
    try:
        latest = dr.ReadRecords('EGV_DATA', options.num_records)[-1]
        if recent(latest) and not latest.is_special and not latest.display_only:
            return latest
        return None
    except ValueError as err:
        # Guard against a ValueError raised without a message: indexing
        # args[0] unconditionally would raise IndexError out of the handler.
        if err.args and err.args[0] == 'Attempting to use a port that is not open':
            return False
        print_verbose(err)
        return None
    except Exception:
        # Call exc_info() so the (type, value, traceback) triple is logged
        # rather than the repr of the function object itself.
        print_verbose(sys.exc_info())
        return None
def format_times(now, stamp):
    """Format ``now`` and the offset of ``stamp`` from it as one string.

    The result is ``'<now> <sign> <seconds>'`` where ``<now>`` is
    ``now.strftime('%s')``, ``<sign>`` is ``-`` when ``stamp`` is earlier
    than ``now`` and ``+`` otherwise, and ``<seconds>`` is the absolute
    difference in seconds formatted with ``%d`` (fraction dropped).

    NOTE(review): ``'%s'`` is a platform-specific strftime directive
    (epoch seconds on glibc) -- confirm the target platforms support it.
    """
    delta_seconds = (stamp - now).total_seconds()
    # A reading's timestamp is expected to lie in the past, i.e. '-'.
    if delta_seconds < 0:
        sign = '-'
    else:
        sign = '+'
    magnitude = abs(delta_seconds)
    return '%s %s %d' % (now.strftime('%s'), sign, magnitude)
def print_localtime(str):
    """Print ``str`` prefixed with the host clock's current timestamp.

    Output format: ``'<timestamp> (system): <str>'`` where ``<timestamp>``
    is ``datetime.now().strftime('%s')``. The parameter name shadows the
    ``str`` builtin; it is kept unchanged for caller compatibility.
    """
    stamp = datetime.now().strftime('%s')
    print('%s (system): %s' % (stamp, str))
# Connection state most recently reported by connected(); None until the
# first report so that the first call always prints.
CONNECTED = None


def connected(state):
    """Record the receiver's connection state, announcing only changes.

    When ``state`` equals the stored ``CONNECTED`` value nothing happens.
    Otherwise ``CONNECTED`` is updated and a line saying the receiver
    "is" / "is not" connected is printed via ``print_localtime``.
    """
    global CONNECTED
    if state != CONNECTED:
        CONNECTED = state
        verb = 'is' if state else 'is not'
        print_localtime('dexcom receiver %s connected' % verb)
def sleep_verbose(n):
    """Sleep for ``n`` seconds, first logging the call via ``print_verbose``.

    The log line is ``'sleep(<n>)'`` with ``n`` formatted as an integer.
    """
    message = 'sleep(%d)' % n
    print_verbose(message)
    sleep(n)
def poll():
    """Loop forever, printing each new CGM reading as it becomes available.

    Each iteration asks ``read_recent_egv_data()`` for a reading:

    * ``None``  -> report the receiver as connected, wait 10 s, retry;
    * ``False`` -> report the receiver as not connected, wait 10 s, retry;
    * a record  -> print its time offset, glucose value and trend arrow,
      flush stdout, then sleep until 5 min 2 s after the record's
      ``system_time`` (measured on the receiver clock) if that moment is
      still in the future.

    This function never returns.
    """
    print_localtime('dexcom_dumper started')
    while True:
        reading = read_recent_egv_data()

        if reading is None or reading is False:
            # None means "no fresh reading", False means "port not open".
            connected(reading is None)
            sleep_verbose(10)
            continue

        now = dr.ReadSystemTime()
        line = '%s: %s %s' % (
            format_times(now, reading.system_time),
            reading.glucose,
            reading.trend_arrow,
        )
        print(line)
        stdout.flush()

        # Wake up shortly after the next reading is due.
        due_in = reading.system_time - now + timedelta(minutes=5, seconds=2)
        wait_seconds = due_in.total_seconds()
        if wait_seconds > 0:
            sleep_verbose(wait_seconds)
# Run the handler selected on the command line; an unknown command name
# raises KeyError.
dict(
    dump_everything=dump_everything,
    dump_cgm=dump_cgm,
    poll=poll,
)[command]()
|