Break out data fetching operations into routines
[infoex-autowx.git] / infoex-autowx.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 """
5 InfoEx <-> NRCS/MesoWest Auto Wx implementation
6 Alexander Vasarab
7 Wylark Mountaineering LLC
8
9 This program fetches data from either an NRCS SNOTEL site or MesoWest
10 weather station and pushes it to InfoEx using the new automated weather
11 system implementation.
12
13 It is designed to be run hourly, and it asks for the last three hours
14 of data of each desired type, and selects the most recent one. This
15 lends some resiliency to the process and helps ensure that we have a
16 value to send, but it can lead to somewhat inconsistent/untruthful
17 data if e.g. the HS is from the last hour but the tempPres is from two
18 hours ago because the instrumentation had a hiccup. It's worth
19 considering if this is a bug or a feature.
20
21 For more information, see file: README
22 For licensing, see file: LICENSE
23 """
24
25 import configparser
26 import csv
27 import datetime
28 import logging
29 import os
30 import sys
31 import time
32
33 from ftplib import FTP
34 from optparse import OptionParser
35
36 import requests
37
38 import zeep
39 import zeep.cache
40 import zeep.transports
41
# Program version, reported by --version (see get_parser()).
__version__ = '2.0.0'

# Module-wide logger. It starts at NOTSET; setup_logging() attaches a
# handler and sets the level chosen via --log-level.
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.NOTSET)
46
def get_parser():
    """Build and return the command-line OptionParser for this program.

    Options: --config FILE, --log-level LEVEL and --dry-run. The parser's
    --version output reports ``__version__``.
    """
    # (positional flags, keyword settings) for each option, in display order
    option_specs = (
        (("--config",), dict(dest="config",
                             metavar="FILE",
                             help="location of config file")),
        (("--log-level",), dict(dest="log_level",
                                default=None,
                                help="set the log level (debug, info, warning)")),
        (("--dry-run",), dict(action="store_true",
                              dest="dry_run",
                              default=False,
                              help="fetch data but don't upload to InfoEx")),
    )

    opt_parser = OptionParser(version=__version__)
    for flags, settings in option_specs:
        opt_parser.add_option(*flags, **settings)
    return opt_parser
68
def setup_config(config, config_file=None):
    """Validate the parsed ini file and split it into InfoEx and station settings.

    :param config: a ``configparser.ConfigParser`` that has already read the
        user's configuration file.
    :param config_file: optional path of that file; only used to make the
        "not defined" error message more helpful.
    :returns: ``(infoex, data)``. ``infoex`` holds the InfoEx upload
        settings plus an empty ``wx_data`` placeholder. ``data`` holds the
        station provider, API source URL, station id, units and desired data
        elements. For NRCS the elements are a list; for MesoWest they are a
        comma-separated string that is also embedded in the API URL.

    Exits the process with status 1 on an unknown station type, a missing
    key, any other parsing error, or an empty value anywhere in the config.
    """
    try:
        infoex = {
            'host': config['infoex']['host'],
            'uuid': config['infoex']['uuid'],
            'api_key': config['infoex']['api_key'],
            'csv_filename': config['infoex']['csv_filename'],
            'location_uuid': config['infoex']['location_uuid'],
            'wx_data': {},  # placeholder key, values to come later
        }

        data = dict()
        data['provider'] = config['station']['type']

        if data['provider'] not in ['nrcs', 'mesowest']:
            print("Please specify either nrcs or mesowest as the station type.")
            sys.exit(1)

        if data['provider'] == 'nrcs':
            data['source'] = 'https://www.wcc.nrcs.usda.gov/awdbWebService/services?WSDL'
            data['station_id'] = config['station']['station_id']

            try:
                # NRCS needs one request per element, so keep a list; strip
                # whitespace so "TOBS, SNWD" yields "SNWD", not " SNWD"
                data['desired_data'] = [
                    element.strip()
                    for element in config['station']['desired_data'].split(',')
                    if element.strip()
                ]
            except KeyError:
                # desired_data missing, use defaults
                data['desired_data'] = [
                    'TOBS',  # AIR TEMPERATURE OBSERVED (degF)
                    'SNWD',  # SNOW DEPTH (in)
                    'PREC',  # PRECIPITATION ACCUMULATION (in)
                ]

            # XXX: For NRCS, we're manually overriding units for now! Once
            #      unit conversion is supported for NRCS, REMOVE THIS!
            data['units'] = 'imperial'

        if data['provider'] == 'mesowest':
            data['source'] = 'https://api.synopticdata.com/v2/stations/timeseries'
            data['station_id'] = config['station']['station_id']
            data['units'] = config['station']['units']

            try:
                # MesoWest takes every element in one request, so keep the
                # comma-separated form (it goes into the URL), minus any
                # whitespace around the commas
                data['desired_data'] = ','.join(
                    element.strip()
                    for element in config['station']['desired_data'].split(',')
                    if element.strip()
                )
            except KeyError:
                # desired_data missing, use defaults
                data['desired_data'] = 'air_temp,snow_depth'

            # construct full API URL (sans start/end time, added later)
            data['source'] = (data['source']
                              + '?token=' + config['station']['token']
                              + '&within=60'
                              + '&units=' + data['units']
                              + '&stid=' + data['station_id']
                              + '&vars=' + data['desired_data'])

    except KeyError as exc:
        LOG.critical("%s not defined in %s", exc, config_file or 'config file')
        sys.exit(1)
    except Exception as exc:
        LOG.critical("Exception occurred in config parsing: '%s'", exc)
        sys.exit(1)

    # all sections/values present in config file, final sanity check
    for section in config.sections():
        for option in config[section]:
            if not config[section][option]:
                LOG.critical("Config value '%s.%s' is empty", section, option)
                sys.exit(1)

    return (infoex, data)
139
def setup_logging(log_level):
    """Attach a log handler to LOG and set its level.

    A systemd journal handler is used when the ``systemd.journal`` bindings
    can be imported; otherwise log records are written to stdout.

    :param log_level: the --log-level value: None, 'debug', 'info' or
        'warning'.
    :returns: True on success, False if ``log_level`` is not recognised
        (the handler has still been attached in that case).
    """
    try:
        from systemd.journal import JournalHandler
        LOG.addHandler(JournalHandler())
    except ImportError:
        # systemd bindings unavailable: fall back to stdout
        LOG.addHandler(logging.StreamHandler(sys.stdout))

    levels = {
        None: logging.NOTSET,
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
    }
    if log_level not in levels:
        return False

    LOG.setLevel(levels[log_level])
    return True
167
def main():
    """Main routine: parse args, load config, fetch wx data, write and upload the CSV.

    :returns: process exit status: 0 on success, 1 if the local CSV file
        could not be written. Usage and config errors exit directly via
        ``sys.exit(1)``.
    """
    parser = get_parser()
    options, _args = parser.parse_args()

    config = configparser.ConfigParser(allow_no_value=False)

    if not options.config:
        parser.print_help()
        print("\nPlease specify a configuration file via --config.")
        sys.exit(1)

    # ConfigParser.read() silently skips files it cannot open and returns
    # the list of files it did parse; stop here rather than failing later
    # with a misleading "not defined" error.
    if not config.read(options.config):
        print("\nCould not read configuration file '%s'." % options.config)
        sys.exit(1)

    if not setup_logging(options.log_level):
        parser.print_help()
        print("\nPlease select an appropriate log level or remove the switch (--log-level).")
        sys.exit(1)

    (infoex, data) = setup_config(config)

    LOG.debug('Config parsed, starting up')

    # create mappings
    (fmap, final_data) = setup_infoex_fields_mapping(infoex['location_uuid'])
    iemap = setup_infoex_counterparts_mapping(data['provider'])

    # override units if user selected metric
    #
    # NOTE: to update this, use the fmap<->final_data mapping
    #
    # NOTE: this only 'works' with MesoWest for now, as the MesoWest API
    #       itself handles the unit conversion; in the future, we will also
    #       support NRCS unit conversion, but this must be done by this
    #       program.
    if data['units'] == 'metric':
        final_data[fmap['tempPresUnit']] = 'C'
        final_data[fmap['hsUnit']] = 'm'
        final_data[fmap['windSpeedUnit']] = 'm/s'
        final_data[fmap['windGustSpeedNumUnit']] = 'm/s'
        # NOTE(review): the remaining *Unit columns (e.g.
        # precipitationGaugeUnit) stay imperial here; confirm whether they
        # also need overriding when the data is metric.

    # floor time to the start of the current hour
    end_date = datetime.datetime.now().replace(minute=0, second=0,
                                               microsecond=0)
    begin_date = end_date - datetime.timedelta(hours=3)

    LOG.debug("Getting %s data from %s to %s",
              data['desired_data'], begin_date, end_date)

    time_all_elements = time.time()

    # get the data
    if data['provider'] == 'nrcs':
        infoex['wx_data'] = get_nrcs_data(begin_date, end_date, data)
    elif data['provider'] == 'mesowest':
        infoex['wx_data'] = get_mesowest_data(begin_date, end_date, data)

    LOG.info("Time to get all data : %.3f sec",
             time.time() - time_all_elements)

    LOG.debug("infoex[wx_data]: %s", infoex['wx_data'])

    # fill in the per-run columns; everything else keeps the defaults from
    # setup_infoex_fields_mapping()
    final_data[fmap['Location UUID']] = infoex['location_uuid']
    final_data[fmap['obDate']] = end_date.strftime('%m/%d/%Y')
    final_data[fmap['obTime']] = end_date.strftime('%H:%M')

    for element_cd, value in infoex['wx_data'].items():
        if element_cd not in iemap:
            LOG.warning("BAD KEY wx_data['%s']", element_cd)
            continue

        # CONSIDER: casting every value to float. Investigate whether any
        #           element we may want has a non-float type (e.g. via
        #           getStationElements / storedUnitCd for NRCS, though that
        #           costs extra network requests).
        final_data[fmap[iemap[element_cd]]] = value

    LOG.debug("final_data: %s", final_data)

    if not write_local_csv(infoex['csv_filename'], final_data):
        LOG.warning('Could not write local CSV file: %s',
                    infoex['csv_filename'])
        return 1

    if not options.dry_run:
        upload_csv(infoex['csv_filename'], infoex)

    LOG.debug('DONE')
    return 0
268
269 # data structure operations
def setup_infoex_fields_mapping(location_uuid):
    """
    Build the InfoEx field-name -> CSV-column mapping and the default row.

    The InfoEx Auto Wx upload is a single ordered CSV row, so each field's
    position matters, but callers still want to set values by field name.

    :param location_uuid: value placed in the 'Location UUID' column.
    :returns: ``(fmap, final_data)``. ``fmap`` maps each field name to its
        0-based column index (the InfoEx documentation numbers columns from
        1). ``final_data`` is a fresh 29-item list pre-filled with the
        location UUID, time zone and default unit strings, with None in
        every measurement column.
    """
    # (field name, default value), in CSV column order
    columns = [
        ('Location UUID', location_uuid),
        ('obDate', None),
        ('obTime', None),
        ('timeZone', 'Pacific'),
        ('tempMaxHour', None),
        ('tempMaxHourUnit', 'F'),
        ('tempMinHour', None),
        ('tempMinHourUnit', 'F'),
        ('tempPres', None),
        ('tempPresUnit', 'F'),
        ('precipitationGauge', None),
        ('precipitationGaugeUnit', 'in'),
        ('windSpeedNum', None),
        ('windSpeedUnit', 'mph'),
        ('windDirectionNum', None),
        ('hS', None),
        ('hsUnit', 'in'),
        ('baro', None),
        ('baroUnit', 'inHg'),
        ('rH', None),
        ('windGustSpeedNum', None),
        ('windGustSpeedNumUnit', 'mph'),
        ('windGustDirNum', None),
        ('dewPoint', None),
        ('dewPointUnit', 'F'),
        ('hn24Auto', None),
        ('hn24AutoUnit', 'in'),
        ('hstAuto', None),
        ('hstAutoUnit', 'in'),
    ]

    fmap = {name: index for index, (name, _default) in enumerate(columns)}
    final_data = [default for _name, default in columns]
    return (fmap, final_data)
317
def setup_infoex_counterparts_mapping(provider):
    """
    Map each NRCS/MesoWest element this program supports to the InfoEx
    field it fills.

    :param provider: 'nrcs' or 'mesowest'.
    :returns: a new dict of provider element code -> InfoEx field name;
        an empty dict for any other provider.
    """
    if provider == 'nrcs':
        # NRCS has no counterpart for windGustSpeedNum
        return {
            'PREC': 'precipitationGauge',
            'TOBS': 'tempPres',
            'SNWD': 'hS',
            'PRES': 'baro',
            'RHUM': 'rH',
            'WSPD': 'windSpeedNum',
            'WDIR': 'windDirectionNum',
        }

    if provider == 'mesowest':
        return {
            'precip_accum': 'precipitationGauge',
            'air_temp': 'tempPres',
            'snow_depth': 'hS',
            'pressure': 'baro',
            'relative_humidity': 'rH',
            'wind_speed': 'windSpeedNum',
            'wind_direction': 'windDirectionNum',
            'wind_gust': 'windGustSpeedNum',
        }

    return {}
346
347 # provider-specific operations
def get_nrcs_data(begin, end, data):
    """Fetch the most recent hourly value of each desired element from NRCS.

    One ``getHourlyData`` SOAP request is made per element code in
    ``data['desired_data']`` for station ``data['station_id']``, covering
    ``begin`` to ``end``.

    :param begin: start of the requested window.
    :param end: end of the requested window.
    :param data: station settings from setup_config(); ``data['source']``
        is the WSDL URL.
    :returns: dict of element code -> most recent value, or None when no
        values came back for that element.
    """
    transport = zeep.transports.Transport(cache=zeep.cache.SqliteCache())
    client = zeep.Client(wsdl=data['source'], transport=transport)
    remote_data = {}

    for element_cd in data['desired_data']:
        time_element = time.time()

        response = client.service.getHourlyData(
            stationTriplets=[data['station_id']],
            elementCd=element_cd,
            ordinal=1,
            beginDate=begin,
            endDate=end)

        LOG.info("Time to get elementCd '%s': %.3f sec",
                 element_cd, time.time() - time_element)

        # guard against an empty/None response before indexing [0]
        values = response[0]['values'] if response else None

        # isolate the most recent value
        #
        # NOTE: we do this because sometimes there are gaps in hourly data
        #       in NRCS; yes, we may end up with slightly inaccurate data,
        #       so perhaps this decision will be re-evaluated in the future
        if values:
            latest = max(values, key=lambda entry: entry['dateTime'])
            remote_data[element_cd] = latest['value']
        else:
            remote_data[element_cd] = None

    return remote_data
382
def get_mesowest_data(begin, end, data):
    """Fetch the most recent value of each desired element from MesoWest.

    Unlike NRCS, a single request to the MesoWest/Synoptic timeseries API
    returns every element named in ``data['desired_data']`` (a
    comma-separated string).

    :param begin: start of the requested window (datetime).
    :param end: end of the requested window (datetime).
    :param data: station settings from setup_config(); ``data['source']``
        is the API URL with everything except start/end already filled in.
    :returns: dict of element code -> most recent non-None value in the
        window, or None when the response holds no value for that element.

    Exits the process with status 1 if the API cannot be reached or its
    response does not have the expected JSON structure.
    """
    # seconds to wait on the network; without a timeout requests may block
    # indefinitely and stall the hourly run
    request_timeout = 60

    # massage begin/end date format
    begin_date_str = begin.strftime('%Y%m%d%H%M')
    end_date_str = end.strftime('%Y%m%d%H%M')

    # construct final, completed API URL
    api_req_url = data['source'] + '&start=' + begin_date_str + '&end=' + end_date_str

    try:
        req = requests.get(api_req_url, timeout=request_timeout)
    except requests.exceptions.RequestException as exc:
        LOG.error("Could not reach MesoWest: %s", exc)
        sys.exit(1)

    try:
        response = req.json()
    except ValueError:
        LOG.error("Bad JSON in MesoWest response")
        sys.exit(1)

    # indexing raises KeyError/IndexError/TypeError (not ValueError) when
    # the payload lacks the expected STATION/OBSERVATIONS structure
    try:
        observations = response['STATION'][0]['OBSERVATIONS']
    except (KeyError, IndexError, TypeError):
        LOG.error("Bad JSON in MesoWest response")
        sys.exit(1)

    remote_data = {}
    for element_cd in data['desired_data'].split(','):
        # Isolate the most recent value, mirroring the NRCS approach: scan
        # back from the newest sample to the first one that is not None.
        # Elements may be reported at different intervals, so the newest
        # row can lack some of them. Compare with `is not None` so that
        # genuine readings of 0 / 0.0 are kept.
        #
        # NOTE: assumes each series is in chronological order.
        series = observations.get(element_cd + '_set_1') or []
        remote_data[element_cd] = next(
            (value for value in reversed(series) if value is not None),
            None)

    return remote_data
434
435 # CSV operations
def write_local_csv(path_to_file, data):
    """Write ``data`` to ``path_to_file`` as a single CSV row.

    The requirement is that empty values are represented in the CSV file as
    "", which csv.QUOTE_NONNUMERIC achieves for None values.

    :param path_to_file: destination path (overwritten if it exists).
    :param data: sequence of field values, in column order.
    :returns: True if the file was written, False if it could not be
        opened or written (the error is logged).
    """
    LOG.debug("writing CSV file '%s'", path_to_file)
    try:
        # newline='' lets the csv module control line endings itself
        with open(path_to_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f, quoting=csv.QUOTE_NONNUMERIC)
            writer.writerow(data)
    except OSError as exc:
        LOG.error("Could not write CSV file '%s': %s", path_to_file, exc)
        return False
    return True
446
def upload_csv(path_to_file, infoex_data):
    """Upload the CSV file to the InfoEx FTP server, then delete the local copy.

    :param path_to_file: local path of the CSV file to upload.
    :param infoex_data: InfoEx settings; 'host' is the FTP server, 'uuid'
        is passed as the FTP user and 'api_key' as the FTP password.
    """
    # store under the bare file name; any local directories in the path
    # are not meaningful on the remote server
    remote_name = os.path.basename(path_to_file)
    LOG.debug("uploading '%s' to FTP host '%s'", path_to_file,
              infoex_data['host'])

    with open(path_to_file, 'rb') as file_object:
        # the FTP context manager closes the connection even if the
        # transfer fails
        with FTP(infoex_data['host'], infoex_data['uuid'],
                 infoex_data['api_key']) as ftp:
            ftp.storlines('STOR ' + remote_name, file_object)

    # remove only after the file handle is closed and the upload succeeded
    os.remove(path_to_file)
457
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())