# Copyright (c) 2009,2015,2016,2017 MetPy Developers.
# Distributed under the terms of the BSD 3-Clause License.
# SPDX-License-Identifier: BSD-3-Clause
"""Support reading information from various NEXRAD formats."""
import bz2
from collections import defaultdict, namedtuple, OrderedDict
import contextlib
import datetime
import logging
import pathlib
import re
import struct
from xdrlib import Unpacker
import numpy as np
from scipy.constants import day, milli
from ._tools import (Array, BitField, Bits, DictStruct, Enum, IOBuffer, NamedStruct,
open_as_needed, zlib_decompress_all_frames)
from ..package_tools import Exporter
exporter = Exporter(globals())
log = logging.getLogger(__name__)
def version(val):
"""Calculate a string version from an integer value."""
ver = val / 100. if val > 2. * 100. else val / 10.
return f'{ver:.1f}'
def scaler(scale):
"""Create a function that scales by a specific value."""
def inner(val):
return val * scale
return inner
def angle(val):
"""Convert an integer value to a floating point angle."""
return val * 360. / 2**16
def az_rate(val):
"""Convert an integer value to a floating point angular rate."""
return val * 90. / 2**16
def bzip_blocks_decompress_all(data):
"""Decompress all of the bzip2-ed blocks.
Returns the decompressed data as a `bytearray`.
"""
frames = bytearray()
offset = 0
while offset < len(data):
block_cmp_bytes = abs(int.from_bytes(data[offset:offset + 4], 'big', signed=True))
offset += 4
try:
frames += bz2.decompress(data[offset:offset + block_cmp_bytes])
offset += block_cmp_bytes
except OSError:
# If we've decompressed any frames, this is an error mid-stream, so warn, stop
# trying to decompress and let processing proceed
if frames:
logging.warning('Error decompressing bz2 block stream at offset: %d',
offset - 4)
break
else: # Otherwise, this isn't a bzip2 stream, so bail
raise ValueError('Not a bz2 stream.')
return frames
def nexrad_to_datetime(julian_date, ms_midnight):
"""Convert NEXRAD date time format to python `datetime.datetime`."""
# Subtracting one from julian_date is because epoch date is 1
return datetime.datetime.utcfromtimestamp((julian_date - 1) * day + ms_midnight * milli)
def remap_status(val):
"""Convert status integer value to appropriate bitmask."""
status = 0
bad = BAD_DATA if val & 0xF0 else 0
val &= 0x0F
if val == 0:
status = START_ELEVATION
elif val == 1:
status = 0
elif val == 2:
status = END_ELEVATION
elif val == 3:
status = START_ELEVATION | START_VOLUME
elif val == 4:
status = END_ELEVATION | END_VOLUME
elif val == 5:
status = START_ELEVATION | LAST_ELEVATION
return status | bad
START_ELEVATION = 0x1
END_ELEVATION = 0x2
START_VOLUME = 0x4
END_VOLUME = 0x8
LAST_ELEVATION = 0x10
BAD_DATA = 0x20
@exporter.export
class Level2File:
r"""Handle reading the NEXRAD Level 2 data and its various messages.
This class attempts to decode every byte that is in a given data file.
It supports both external compression, as well as the internal BZ2
compression that is used.
Attributes
----------
stid : str
The ID of the radar station
dt : `~datetime.datetime`
The date and time of the data
vol_hdr : `collections.namedtuple`
The unpacked volume header
sweeps : list[tuple]
Data for each of the sweeps found in the file
rda_status : `collections.namedtuple`, optional
Unpacked RDA status information, if found
maintenance_data : `collections.namedtuple`, optional
Unpacked maintenance data information, if found
maintenance_data_desc : dict, optional
Descriptions of maintenance data fields, if maintenance data present
vcp_info : `collections.namedtuple`, optional
Unpacked VCP information, if found
clutter_filter_bypass_map : dict, optional
Unpacked clutter filter bypass map, if present
rda : dict, optional
Unpacked RDA adaptation data, if present
rda_adaptation_desc : dict, optional
Descriptions of RDA adaptation data, if adaptation data present
Notes
-----
The internal data structure that things are decoded into is still to be
determined.
"""
# Number of bytes
AR2_BLOCKSIZE = 2432 # 12 (CTM) + 2416 (Msg hdr + data) + 4 (FCS)
CTM_HEADER_SIZE = 12
MISSING = float('nan')
RANGE_FOLD = float('nan') # TODO: Need to separate from missing
[docs] def __init__(self, filename, *, has_volume_header=True):
r"""Create instance of `Level2File`.
Parameters
----------
filename : str or file-like object
If str, the name of the file to be opened. Gzip-ed files are
recognized with the extension '.gz', as are bzip2-ed files with
the extension `.bz2` If `filename` is a file-like object,
this will be read from directly.
"""
fobj = open_as_needed(filename)
with contextlib.closing(fobj):
self._buffer = IOBuffer.fromfile(fobj)
# Try to read the volume header. If this fails, or we're told we don't have one
# then we fall back and try to just read messages, assuming we have e.g. one of
# the real-time chunks.
try:
if has_volume_header:
self._read_volume_header()
except (OSError, ValueError):
log.warning('Unable to read volume header. Attempting to read messages.')
self._buffer.reset()
# See if we need to apply bz2 decompression
start = self._buffer.set_mark()
try:
self._buffer = IOBuffer(self._buffer.read_func(bzip_blocks_decompress_all))
except ValueError:
self._buffer.jump_to(start)
# Now we're all initialized, we can proceed with reading in data
self._read_data()
vol_hdr_fmt = NamedStruct([('version', '9s'), ('vol_num', '3s'),
('date', 'L'), ('time_ms', 'L'), ('stid', '4s')], '>', 'VolHdr')
def _read_volume_header(self):
self.vol_hdr = self._buffer.read_struct(self.vol_hdr_fmt)
self.dt = nexrad_to_datetime(self.vol_hdr.date, self.vol_hdr.time_ms)
self.stid = self.vol_hdr.stid
msg_hdr_fmt = NamedStruct([('size_hw', 'H'),
('rda_channel', 'B', BitField('Redundant Channel 1',
'Redundant Channel 2',
None, 'ORDA')),
('msg_type', 'B'), ('seq_num', 'H'), ('date', 'H'),
('time_ms', 'I'), ('num_segments', 'H'), ('segment_num', 'H')],
'>', 'MsgHdr')
def _read_data(self):
self._msg_buf = {}
self.sweeps = []
self.rda_status = []
while not self._buffer.at_end():
# Clear old file book marks and set the start of message for
# easy jumping to the end
self._buffer.clear_marks()
msg_start = self._buffer.set_mark()
# Skip CTM
self._buffer.skip(self.CTM_HEADER_SIZE)
# Read the message header
msg_hdr = self._buffer.read_struct(self.msg_hdr_fmt)
log.debug('Got message: %s (at offset %d)', str(msg_hdr), self._buffer._offset)
# The AR2_BLOCKSIZE accounts for the CTM header before the
# data, as well as the Frame Check Sequence (4 bytes) after
# the end of the data.
msg_bytes = self.AR2_BLOCKSIZE
# If the size is 0, this is just padding, which is for certain
# done in the metadata messages. Let the default block size handle rather
# than any specific heuristic to skip.
if msg_hdr.size_hw:
# For new packets, the message size isn't on the fixed size boundaries,
# so we use header to figure out. For these, we need to include the
# CTM header but not FCS, in addition to the size.
# As of 2620002P, this is a special value used to indicate that the segment
# number/count bytes are used to indicate total size in bytes.
if msg_hdr.size_hw == 65535:
msg_bytes = (msg_hdr.num_segments << 16 | msg_hdr.segment_num
+ self.CTM_HEADER_SIZE)
elif msg_hdr.msg_type in (29, 31):
msg_bytes = self.CTM_HEADER_SIZE + 2 * msg_hdr.size_hw
log.debug('Total message size: %d', msg_bytes)
# Try to handle the message. If we don't handle it, skipping
# past it is handled at the end anyway.
decoder = f'_decode_msg{msg_hdr.msg_type:d}'
if hasattr(self, decoder):
getattr(self, decoder)(msg_hdr)
else:
log.warning('Unknown message: %d', msg_hdr.msg_type)
# Jump to the start of the next message. This depends on whether
# the message was legacy with fixed block size or not.
self._buffer.jump_to(msg_start, msg_bytes)
# Check if we have any message segments still in the buffer
if self._msg_buf:
log.warning('Remaining buffered message segments for message type(s): %s',
' '.join(f'{typ} ({len(rem)})' for typ, rem in self._msg_buf.items()))
del self._msg_buf
msg1_fmt = NamedStruct([('time_ms', 'L'), ('date', 'H'),
('unamb_range', 'H', scaler(0.1)), ('az_angle', 'H', angle),
('az_num', 'H'), ('rad_status', 'H', remap_status),
('el_angle', 'H', angle), ('el_num', 'H'),
('surv_first_gate', 'h', scaler(0.001)),
('doppler_first_gate', 'h', scaler(0.001)),
('surv_gate_width', 'H', scaler(0.001)),
('doppler_gate_width', 'H', scaler(0.001)),
('surv_num_gates', 'H'), ('doppler_num_gates', 'H'),
('cut_sector_num', 'H'), ('calib_dbz0', 'f'),
('ref_offset', 'H'), ('vel_offset', 'H'), ('sw_offset', 'H'),
('dop_res', 'H', BitField(None, 0.5, 1.0)), ('vcp', 'H'),
(None, '14x'), ('nyq_vel', 'H', scaler(0.01)),
('atmos_atten', 'H', scaler(0.001)), ('tover', 'H', scaler(0.1)),
('spot_blanking', 'B', BitField('Radial', 'Elevation', 'Volume')),
(None, '32x')], '>', 'Msg1Fmt')
msg1_data_hdr = namedtuple('Msg1DataHdr',
'name first_gate gate_width num_gates scale offset')
def _decode_msg1(self, msg_hdr):
msg_start = self._buffer.set_mark()
hdr = self._buffer.read_struct(self.msg1_fmt)
data_dict = {}
# Process all data pointers:
read_info = []
if hdr.surv_num_gates and hdr.ref_offset:
read_info.append((hdr.ref_offset,
self.msg1_data_hdr('REF', hdr.surv_first_gate,
hdr.surv_gate_width,
hdr.surv_num_gates, 2.0, 66.0)))
if hdr.doppler_num_gates and hdr.vel_offset:
read_info.append((hdr.vel_offset,
self.msg1_data_hdr('VEL', hdr.doppler_first_gate,
hdr.doppler_gate_width,
hdr.doppler_num_gates,
1. / hdr.dop_res, 129.0)))
if hdr.doppler_num_gates and hdr.sw_offset:
read_info.append((hdr.sw_offset,
self.msg1_data_hdr('SW', hdr.doppler_first_gate,
hdr.doppler_gate_width,
hdr.doppler_num_gates, 2.0, 129.0)))
for ptr, data_hdr in read_info:
# Jump and read
self._buffer.jump_to(msg_start, ptr)
vals = self._buffer.read_array(data_hdr.num_gates, 'B')
# Scale and flag data
scaled_vals = (vals - data_hdr.offset) / data_hdr.scale
scaled_vals[vals == 0] = self.MISSING
scaled_vals[vals == 1] = self.RANGE_FOLD
# Store
data_dict[data_hdr.name] = (data_hdr, scaled_vals)
self._add_sweep(hdr)
self.sweeps[-1].append((hdr, data_dict))
msg2_fmt = NamedStruct([
('rda_status', 'H', BitField('None', 'Start-Up', 'Standby', 'Restart',
'Operate', 'Spare', 'Off-line Operate')),
('op_status', 'H', BitField('Disabled', 'On-Line',
'Maintenance Action Required',
'Maintenance Action Mandatory',
'Commanded Shut Down', 'Inoperable',
'Automatic Calibration')),
('control_status', 'H', BitField('None', 'Local Only',
'RPG (Remote) Only', 'Either')),
('aux_power_gen_state', 'H', BitField('Switch to Aux Power',
'Utility PWR Available',
'Generator On',
'Transfer Switch Manual',
'Commanded Switchover')),
('avg_tx_pwr', 'H'), ('ref_calib_cor', 'h', scaler(0.01)),
('data_transmission_enabled', 'H', BitField('None', 'None',
'Reflectivity', 'Velocity', 'Width')),
('vcp_num', 'h'), ('rda_control_auth', 'H', BitField('No Action',
'Local Control Requested',
'Remote Control Enabled')),
('rda_build', 'H', version), ('op_mode', 'H', BitField('None', 'Test',
'Operational', 'Maintenance')),
('super_res_status', 'H', BitField('None', 'Enabled', 'Disabled')),
('cmd_status', 'H', Bits(6)),
('avset_status', 'H', BitField('None', 'Enabled', 'Disabled')),
('rda_alarm_status', 'H', BitField('No Alarms', 'Tower/Utilities',
'Pedestal', 'Transmitter', 'Receiver',
'RDA Control', 'Communication',
'Signal Processor')),
('command_acknowledge', 'H', BitField('Remote VCP Received',
'Clutter Bypass map received',
'Redundant Chan Ctrl Cmd received')),
('channel_control_status', 'H'),
('spot_blanking', 'H', BitField('Enabled', 'Disabled')),
('bypass_map_gen_date', 'H'), ('bypass_map_gen_time', 'H'),
('clutter_filter_map_gen_date', 'H'), ('clutter_filter_map_gen_time', 'H'),
('refv_calib_cor', 'h', scaler(0.01)),
('transition_pwr_src_state', 'H', BitField('Off', 'OK')),
('RMS_control_status', 'H', BitField('RMS in control', 'RDA in control')),
# See Table IV-A for definition of alarms
(None, '2x'), ('alarms', '28s', Array('>14H'))], '>', 'Msg2Fmt')
msg2_additional_fmt = NamedStruct([
('sig_proc_options', 'H', BitField('CMD RhoHV Test')),
(None, '36x'), ('status_version', 'H')], '>', 'Msg2AdditionalFmt')
def _decode_msg2(self, msg_hdr):
msg_start = self._buffer.set_mark()
self.rda_status.append(self._buffer.read_struct(self.msg2_fmt))
remaining = (msg_hdr.size_hw * 2 - self.msg_hdr_fmt.size
- self._buffer.offset_from(msg_start))
# RDA Build 18.0 expanded the size
if remaining >= self.msg2_additional_fmt.size:
self.rda_status.append(self._buffer.read_struct(self.msg2_additional_fmt))
remaining -= self.msg2_additional_fmt.size
if remaining:
log.info('Padding detected in message 2. Length encoded as %d but offset when '
'done is %d', 2 * msg_hdr.size_hw, self._buffer.offset_from(msg_start))
def _decode_msg3(self, msg_hdr):
from ._nexrad_msgs.msg3 import descriptions, fields
self.maintenance_data_desc = descriptions
msg_fmt = DictStruct(fields, '>')
self.maintenance_data = self._buffer.read_struct(msg_fmt)
self._check_size(msg_hdr, msg_fmt.size)
vcp_fmt = NamedStruct([('size_hw', 'H'), ('pattern_type', 'H'),
('num', 'H'), ('num_el_cuts', 'H'),
('vcp_version', 'B'), ('clutter_map_group', 'B'),
('dop_res', 'B', BitField(None, 0.5, 1.0)),
('pulse_width', 'B', BitField('None', 'Short', 'Long')),
(None, '4x'), ('vcp_sequencing', 'H'),
('vcp_supplemental_info', 'H'), (None, '2x'),
('els', None)], '>', 'VCPFmt')
vcp_el_fmt = NamedStruct([('el_angle', 'H', angle),
('channel_config', 'B', Enum('Constant Phase', 'Random Phase',
'SZ2 Phase')),
('waveform', 'B', Enum('None', 'Contiguous Surveillance',
'Contig. Doppler with Ambiguity Res.',
'Contig. Doppler without Ambiguity Res.',
'Batch', 'Staggered Pulse Pair')),
('super_res', 'B', BitField('0.5 azimuth and 0.25km range res.',
'Doppler to 300km',
'Dual Polarization Control',
'Dual Polarization to 300km')),
('surv_prf_num', 'B'), ('surv_pulse_count', 'H'),
('az_rate', 'h', az_rate),
('ref_thresh', 'h', scaler(0.125)),
('vel_thresh', 'h', scaler(0.125)),
('sw_thresh', 'h', scaler(0.125)),
('zdr_thresh', 'h', scaler(0.125)),
('phidp_thresh', 'h', scaler(0.125)),
('rhohv_thresh', 'h', scaler(0.125)),
('sector1_edge', 'H', angle),
('sector1_doppler_prf_num', 'H'),
('sector1_pulse_count', 'H'), ('supplemental_data', 'H'),
('sector2_edge', 'H', angle),
('sector2_doppler_prf_num', 'H'),
('sector2_pulse_count', 'H'), ('ebc_angle', 'H', angle),
('sector3_edge', 'H', angle),
('sector3_doppler_prf_num', 'H'),
('sector3_pulse_count', 'H'), (None, '2x')], '>', 'VCPEl')
def _decode_msg5(self, msg_hdr):
vcp_info = self._buffer.read_struct(self.vcp_fmt)
els = [self._buffer.read_struct(self.vcp_el_fmt) for _ in range(vcp_info.num_el_cuts)]
self.vcp_info = vcp_info._replace(els=els)
self._check_size(msg_hdr,
self.vcp_fmt.size + vcp_info.num_el_cuts * self.vcp_el_fmt.size)
def _decode_msg13(self, msg_hdr):
data = self._buffer_segment(msg_hdr)
if data:
data = struct.Struct(f'>{len(data) // 2:d}h').unpack(data)
# Legacy format doesn't have date/time and has fewer azimuths
if data[0] <= 5:
num_el = data[0]
dt = None
num_az = 256
offset = 1
else:
date, time, num_el = data[:3]
# time is in "minutes since midnight", need to pass as ms since midnight
dt = nexrad_to_datetime(date, 60 * 1000 * time)
num_az = 360
offset = 3
self.clutter_filter_bypass_map = {'datetime': dt, 'data': []}
chunk_size = 32
bit_conv = Bits(16)
for e in range(num_el):
seg_num = data[offset]
if seg_num != (e + 1):
log.warning('Message 13 segments out of sync -- read %d but on %d',
seg_num, e + 1)
az_data = []
for _ in range(num_az):
gates = []
for i in range(1, chunk_size + 1):
gates.extend(bit_conv(data[offset + i]))
az_data.append(gates)
self.clutter_filter_bypass_map['data'].append(az_data)
offset += num_az * chunk_size + 1
if offset != len(data):
log.warning('Message 13 left data -- Used: %d Avail: %d', offset, len(data))
msg15_code_map = {0: 'Bypass Filter', 1: 'Bypass map in Control',
2: 'Force Filter'}
def _decode_msg15(self, msg_hdr):
# buffer the segments until we have the whole thing. The data
# will be returned concatenated when this is the case
data = self._buffer_segment(msg_hdr)
if data:
date, time, num_el, *data = struct.Struct(f'>{len(data) // 2:d}h').unpack(data)
if num_el == 0:
log.info('Message 15 num_el is 0--likely legacy clutter filter notch width. '
'Skipping...')
return
# time is in "minutes since midnight", need to pass as ms since midnight
self.clutter_filter_map = {'datetime': nexrad_to_datetime(date, 60 * 1000 * time),
'data': []}
offset = 0
for _ in range(num_el):
az_data = []
for _ in range(360):
num_rng = data[offset]
codes = data[offset + 1:offset + 1 + 2 * num_rng:2]
ends = data[offset + 2:offset + 2 + 2 * num_rng:2]
az_data.append(list(zip(ends, codes)))
offset += 2 * num_rng + 1
self.clutter_filter_map['data'].append(az_data)
if offset != len(data):
log.warning('Message 15 left data -- Used: %d Avail: %d', offset, len(data))
def _decode_msg18(self, msg_hdr):
# buffer the segments until we have the whole thing. The data
# will be returned concatenated when this is the case
data = self._buffer_segment(msg_hdr)
# Legacy versions don't even document this:
if data and self.vol_hdr.version[:8] not in (b'ARCHIVE2', b'AR2V0001'):
from ._nexrad_msgs.msg18 import descriptions, fields
self.rda_adaptation_desc = descriptions
# Can't use NamedStruct because we have more than 255 items--this
# is a CPython limit for arguments.
msg_fmt = DictStruct(fields, '>')
# Be extra paranoid about passing too much data in case of legacy files
self.rda = msg_fmt.unpack(data[:msg_fmt.size])
for num in (11, 21, 31, 32, 300, 301):
attr = f'VCPAT{num}'
dat = self.rda[attr]
vcp_hdr = self.vcp_fmt.unpack_from(dat, 0)
off = self.vcp_fmt.size
els = []
for _ in range(vcp_hdr.num_el_cuts):
els.append(self.vcp_el_fmt.unpack_from(dat, off))
off += self.vcp_el_fmt.size
self.rda[attr] = vcp_hdr._replace(els=els)
msg31_data_hdr_fmt = NamedStruct([('stid', '4s'), ('time_ms', 'L'),
('date', 'H'), ('az_num', 'H'),
('az_angle', 'f'), ('compression', 'B'),
(None, 'x'), ('rad_length', 'H'),
('az_spacing', 'B', Enum(0, 0.5, 1.0)),
('rad_status', 'B', remap_status),
('el_num', 'B'), ('sector_num', 'B'),
('el_angle', 'f'),
('spot_blanking', 'B', BitField('Radial', 'Elevation',
'Volume')),
('az_index_mode', 'B', scaler(0.01)),
('num_data_blks', 'H')], '>', 'Msg31DataHdr')
msg31_vol_const_fmt = NamedStruct([('type', 's'), ('name', '3s'),
('size', 'H'), ('major', 'B'),
('minor', 'B'), ('lat', 'f'), ('lon', 'f'),
('site_amsl', 'h'), ('feedhorn_agl', 'H'),
('calib_dbz', 'f'), ('txpower_h', 'f'),
('txpower_v', 'f'), ('sys_zdr', 'f'),
('phidp0', 'f'), ('vcp', 'H'),
('processing_status', 'H', BitField('RxR Noise',
'CBT'))],
'>', 'VolConsts')
msg31_el_const_fmt = NamedStruct([('type', 's'), ('name', '3s'),
('size', 'H'), ('atmos_atten', 'h', scaler(0.001)),
('calib_dbz0', 'f')], '>', 'ElConsts')
rad_const_fmt_v1 = NamedStruct([('type', 's'), ('name', '3s'), ('size', 'H'),
('unamb_range', 'H', scaler(0.1)),
('noise_h', 'f'), ('noise_v', 'f'),
('nyq_vel', 'H', scaler(0.01)),
(None, '2x')], '>', 'RadConstsV1')
rad_const_fmt_v2 = NamedStruct([('type', 's'), ('name', '3s'), ('size', 'H'),
('unamb_range', 'H', scaler(0.1)),
('noise_h', 'f'), ('noise_v', 'f'),
('nyq_vel', 'H', scaler(0.01)),
(None, '2x'), ('calib_dbz0_h', 'f'),
('calib_dbz0_v', 'f')], '>', 'RadConstsV2')
data_block_fmt = NamedStruct([('type', 's'), ('name', '3s'),
('reserved', 'L'), ('num_gates', 'H'),
('first_gate', 'H', scaler(0.001)),
('gate_width', 'H', scaler(0.001)),
('tover', 'H', scaler(0.1)),
('snr_thresh', 'h', scaler(0.1)),
('recombined', 'B', BitField('Azimuths', 'Gates')),
('data_size', 'B'),
('scale', 'f'), ('offset', 'f')], '>', 'DataBlockHdr')
Radial = namedtuple('Radial', 'header vol_consts elev_consts radial_consts moments')
def _decode_msg31(self, msg_hdr):
msg_start = self._buffer.set_mark()
data_hdr = self._buffer.read_struct(self.msg31_data_hdr_fmt)
if data_hdr.compression:
log.warning('Compressed message 31 not supported!')
# Read all the block pointers. While the ICD specifies that at least the vol, el, rad
# constant blocks as well as REF moment block are present, it says "the pointers are
# not order or location dependent."
radial = self.Radial(data_hdr, None, None, None, {})
block_count = 0
for ptr in self._buffer.read_binary(data_hdr.num_data_blks, '>L'):
if ptr:
block_count += 1
self._buffer.jump_to(msg_start, ptr)
info = self._buffer.get_next(6)
if info.startswith(b'RVOL'):
radial = radial._replace(
vol_consts=self._buffer.read_struct(self.msg31_vol_const_fmt))
elif info.startswith(b'RELV'):
radial = radial._replace(
elev_consts=self._buffer.read_struct(self.msg31_el_const_fmt))
elif info.startswith(b'RRAD'):
# Relies on the fact that the struct is small enough for its size
# to fit in a single byte
if int(info[-1]) == self.rad_const_fmt_v2.size:
rad_consts = self._buffer.read_struct(self.rad_const_fmt_v2)
else:
rad_consts = self._buffer.read_struct(self.rad_const_fmt_v1)
radial = radial._replace(radial_consts=rad_consts)
elif info.startswith(b'D'):
hdr = self._buffer.read_struct(self.data_block_fmt)
# TODO: The correctness of this code is not tested
vals = self._buffer.read_array(count=hdr.num_gates,
dtype=f'>u{hdr.data_size // 8}')
scaled_vals = (vals - hdr.offset) / hdr.scale
scaled_vals[vals == 0] = self.MISSING
scaled_vals[vals == 1] = self.RANGE_FOLD
radial.moments[hdr.name.strip()] = (hdr, scaled_vals)
else:
log.warning('Unknown Message 31 block type: %s', str(info[:4]))
self._add_sweep(data_hdr)
self.sweeps[-1].append(radial)
if data_hdr.num_data_blks != block_count:
log.warning('Incorrect number of blocks detected -- Got %d'
' instead of %d', block_count, data_hdr.num_data_blks)
if data_hdr.rad_length != self._buffer.offset_from(msg_start):
log.info('Padding detected in message. Length encoded as %d but offset when '
'done is %d', data_hdr.rad_length, self._buffer.offset_from(msg_start))
def _buffer_segment(self, msg_hdr):
# Add to the buffer
bufs = self._msg_buf.setdefault(msg_hdr.msg_type, {})
bufs[msg_hdr.segment_num] = self._buffer.read(2 * msg_hdr.size_hw
- self.msg_hdr_fmt.size)
# Warn for badly formatted data
if len(bufs) != msg_hdr.segment_num:
log.warning('Segment out of order (Got: %d Count: %d) for message type %d.',
msg_hdr.segment_num, len(bufs), msg_hdr.msg_type)
# If we're complete, return the full collection of data
if msg_hdr.num_segments == len(bufs):
self._msg_buf.pop(msg_hdr.msg_type)
return b''.join(bytes(item[1]) for item in sorted(bufs.items()))
else:
return None
def _add_sweep(self, hdr):
if not self.sweeps and not hdr.rad_status & START_VOLUME:
log.warning('Missed start of volume!')
if hdr.rad_status & START_ELEVATION:
self.sweeps.append([])
if len(self.sweeps) != hdr.el_num:
log.warning('Missed elevation -- Have %d but data on %d.'
' Compensating...', len(self.sweeps), hdr.el_num)
while len(self.sweeps) < hdr.el_num:
self.sweeps.append([])
def _check_size(self, msg_hdr, size):
hdr_size = msg_hdr.size_hw * 2 - self.msg_hdr_fmt.size
if size != hdr_size:
log.warning('Message type %d should be %d bytes but got %d',
msg_hdr.msg_type, size, hdr_size)
def reduce_lists(d):
"""Replace single item lists in a dictionary with the single item."""
for field in d:
old_data = d[field]
if len(old_data) == 1:
d[field] = old_data[0]
def two_comp16(val):
"""Return the two's-complement signed representation of a 16-bit unsigned integer."""
if val >> 15:
val = -(~val & 0x7fff) - 1
return val
def float16(val):
"""Convert a 16-bit floating point value to a standard Python float."""
# Fraction is 10 LSB, Exponent middle 5, and Sign the MSB
frac = val & 0x03ff
exp = (val >> 10) & 0x1F
sign = val >> 15
if exp:
value = 2 ** (exp - 16) * (1 + float(frac) / 2**10)
else:
value = float(frac) / 2**9
if sign:
value *= -1
return value
def float32(short1, short2):
"""Unpack a pair of 16-bit integers as a Python float."""
# Masking below in python will properly convert signed values to unsigned
return struct.unpack('>f', struct.pack('>HH', short1 & 0xFFFF, short2 & 0xFFFF))[0]
def date_elem(ind_days, ind_minutes):
"""Create a function to parse a datetime from the product-specific blocks."""
def inner(seq):
return nexrad_to_datetime(seq[ind_days], seq[ind_minutes] * 60 * 1000)
return inner
def scaled_elem(index, scale):
"""Create a function to scale a certain product-specific block."""
def inner(seq):
return seq[index] * scale
return inner
def combine_elem(ind1, ind2):
"""Create a function to combine two specified product-specific blocks into a single int."""
def inner(seq):
shift = 2**16
if seq[ind1] < 0:
seq[ind1] += shift
if seq[ind2] < 0:
seq[ind2] += shift
return (seq[ind1] << 16) | seq[ind2]
return inner
def float_elem(ind1, ind2):
"""Create a function to combine two specified product-specific blocks into a float."""
return lambda seq: float32(seq[ind1], seq[ind2])
def high_byte(ind):
"""Create a function to return the high-byte of a product-specific block."""
def inner(seq):
return seq[ind] >> 8
return inner
def low_byte(ind):
"""Create a function to return the low-byte of a product-specific block."""
def inner(seq):
return seq[ind] & 0x00FF
return inner
def delta_time(ind):
"""Create a function to return the delta time from a product-specific block."""
def inner(seq):
return seq[ind] >> 5
return inner
def supplemental_scan(ind):
"""Create a function to return the supplement scan type from a product-specific block."""
def inner(seq):
# ICD says 1->SAILS, 2->MRLE, but testing on 2020-08-17 makes this seem inverted
# given what's being reported by sites in the GSM.
return {0: 'Non-supplemental scan',
2: 'SAILS scan', 1: 'MRLE scan'}.get(seq[ind] & 0x001F, 'Unknown')
return inner
# Data mappers used to take packed data and turn into physical units
# Default is to use numpy array indexing to use LUT to change data bytes
# into physical values. Can also have a 'labels' attribute to give
# categorical labels
class DataMapper:
"""Convert packed integer data into physical units."""
# Need to find way to handle range folded
# RANGE_FOLD = -9999
RANGE_FOLD = float('nan')
MISSING = float('nan')
def __init__(self, num=256):
self.lut = np.full(num, self.MISSING, dtype=float)
def __call__(self, data):
"""Convert the values."""
return self.lut[data]
class DigitalMapper(DataMapper):
"""Maps packed integers to floats using a scale and offset from the product."""
_min_scale = 0.1
_inc_scale = 0.1
_min_data = 2
_max_data = 255
range_fold = False
def __init__(self, prod):
"""Initialize the mapper and the lookup table."""
super().__init__()
min_val = two_comp16(prod.thresholds[0]) * self._min_scale
inc = prod.thresholds[1] * self._inc_scale
num_levels = prod.thresholds[2]
# Generate lookup table -- sanity check on num_levels handles
# the fact that DHR advertises 256 levels, which *includes*
# missing, differing from other products
num_levels = min(num_levels, self._max_data - self._min_data + 1)
for i in range(num_levels):
self.lut[i + self._min_data] = min_val + i * inc
class DigitalRefMapper(DigitalMapper):
"""Mapper for digital reflectivity products."""
units = 'dBZ'
class DigitalVelMapper(DigitalMapper):
"""Mapper for digital velocity products."""
units = 'm/s'
range_fold = True
class DigitalSPWMapper(DigitalVelMapper):
"""Mapper for digital spectrum width products."""
_min_data = 129
# ICD says up to 152, but also says max value is 19, which implies 129 + 19/0.5 -> 167
_max_data = 167
class PrecipArrayMapper(DigitalMapper):
"""Mapper for precipitation array products."""
_inc_scale = 0.001
_min_data = 1
_max_data = 254
units = 'dBA'
class DigitalStormPrecipMapper(DigitalMapper):
"""Mapper for digital storm precipitation products."""
units = 'inches'
_inc_scale = 0.01
class DigitalVILMapper(DataMapper):
"""Mapper for digital VIL products."""
def __init__(self, prod):
"""Initialize the VIL mapper."""
super().__init__()
lin_scale = float16(prod.thresholds[0])
lin_offset = float16(prod.thresholds[1])
log_start = prod.thresholds[2]
log_scale = float16(prod.thresholds[3])
log_offset = float16(prod.thresholds[4])
# VIL is allowed to use 2 through 254 inclusive. 0 is thresholded,
# 1 is flagged, and 255 is reserved
ind = np.arange(255)
self.lut[2:log_start] = (ind[2:log_start] - lin_offset) / lin_scale
self.lut[log_start:-1] = np.exp((ind[log_start:] - log_offset) / log_scale)
class DigitalEETMapper(DataMapper):
"""Mapper for digital echo tops products."""
def __init__(self, prod):
"""Initialize the mapper."""
super().__init__()
data_mask = prod.thresholds[0]
scale = prod.thresholds[1]
offset = prod.thresholds[2]
topped_mask = prod.thresholds[3]
self.topped_lut = [False] * 256
for i in range(2, 256):
self.lut[i] = ((i & data_mask) - offset) / scale
self.topped_lut[i] = bool(i & topped_mask)
self.topped_lut = np.array(self.topped_lut)
def __call__(self, data_vals):
"""Convert the data values."""
return self.lut[data_vals], self.topped_lut[data_vals]
class GenericDigitalMapper(DataMapper):
"""Maps packed integers to floats using a scale and offset from the product.
Also handles special data flags.
"""
def __init__(self, prod):
"""Initialize the mapper by pulling out all the information from the product."""
# Need to treat this value as unsigned, so we can use the full 16-bit range. This
# is necessary at least for the DPR product, otherwise it has a value of -1.
max_data_val = prod.thresholds[5] & 0xFFFF
# Values will be [0, max] inclusive, so need to add 1 to max value to get proper size.
super().__init__(max_data_val + 1)
scale = float32(prod.thresholds[0], prod.thresholds[1])
offset = float32(prod.thresholds[2], prod.thresholds[3])
leading_flags = prod.thresholds[6]
trailing_flags = prod.thresholds[7]
if leading_flags > 1:
self.lut[1] = self.RANGE_FOLD
# Need to add 1 to the end of the range so that it's inclusive
for i in range(leading_flags, max_data_val - trailing_flags + 1):
self.lut[i] = (i - offset) / scale
class DigitalHMCMapper(DataMapper):
"""Mapper for hydrometeor classification products.
Handles assigning string labels based on values.
"""
labels = ['ND', 'BI', 'GC', 'IC', 'DS', 'WS', 'RA', 'HR',
'BD', 'GR', 'HA', 'LH', 'GH', 'UK', 'RF']
def __init__(self, prod):
"""Initialize the mapper."""
super().__init__()
for i in range(10, 256):
self.lut[i] = i // 10
self.lut[150] = self.RANGE_FOLD
# 156, 157
class EDRMapper(DataMapper):
"""Mapper for eddy dissipation rate products."""
def __init__(self, prod):
"""Initialize the mapper based on the product."""
data_levels = prod.thresholds[2]
super().__init__(data_levels)
scale = prod.thresholds[0] / 1000.
offset = prod.thresholds[1] / 1000.
leading_flags = prod.thresholds[3]
for i in range(leading_flags, data_levels):
self.lut[i] = scale * i + offset
class LegacyMapper(DataMapper):
"""Mapper for legacy products."""
lut_names = ['Blank', 'TH', 'ND', 'RF', 'BI', 'GC', 'IC', 'GR', 'WS',
'DS', 'RA', 'HR', 'BD', 'HA', 'UK']
def __init__(self, prod):
"""Initialize the values and labels from the product."""
# Don't worry about super() since we're using our own lut assembled sequentially
self.labels = []
self.lut = []
for t in prod.thresholds:
codes, val = t >> 8, t & 0xFF
label = ''
if codes >> 7:
label = self.lut_names[val]
if label in ('Blank', 'TH', 'ND'):
val = self.MISSING
elif label == 'RF':
val = self.RANGE_FOLD
elif codes >> 6:
val *= 0.01
label = f'{val:.2f}'
elif codes >> 5:
val *= 0.05
label = f'{val:.2f}'
elif codes >> 4:
val *= 0.1
label = f'{val:.1f}'
if codes & 0x1:
val *= -1
label = '-' + label
elif (codes >> 1) & 0x1:
label = '+' + label
if (codes >> 2) & 0x1:
label = '<' + label
elif (codes >> 3) & 0x1:
label = '>' + label
if not label:
label = str(val)
self.lut.append(val)
self.labels.append(label)
self.lut = np.array(self.lut)
@exporter.export
class Level3File:
r"""Handle reading the wide array of NEXRAD Level 3 (NIDS) product files.
This class attempts to decode every byte that is in a given product file.
It supports all the various compression formats that exist for these
products in the wild.
Attributes
----------
metadata : dict
Various general metadata available from the product
header : `collections.namedtuple`
Decoded product header
prod_desc : `collections.namedtuple`
Decoded product description block
siteID : str
ID of the site found in the header, empty string if none found
lat : float
Radar site latitude
lon : float
Radar site longitude
height : float
Radar site height AMSL
product_name : str
Name of the product contained in file
max_range : float
Maximum kilometer range of the product, taken from the NIDS ICD
map_data : `DataMapper`
Class instance mapping data int values to proper floating point values
sym_block : list, optional
Any symbology block packets that were found
tab_pages : list, optional
Any tabular pages that were found
graph_pages : list, optional
Any graphical pages that were found
Notes
-----
The internal data structure that things are decoded into is still to be
determined.
"""
ij_to_km = 0.25
wmo_finder = re.compile('((?:NX|SD|NO)US)\\d{2}[\\s\\w\\d]+\\w*(\\w{3})\r\r\n')
header_fmt = NamedStruct([('code', 'H'), ('date', 'H'), ('time', 'l'),
('msg_len', 'L'), ('src_id', 'h'), ('dest_id', 'h'),
('num_blks', 'H')], '>', 'MsgHdr')
# See figure 3-17 in 2620001 document for definition of status bit fields
gsm_fmt = NamedStruct([('divider', 'h'), ('block_len', 'H'),
('op_mode', 'h', BitField('Clear Air', 'Precip')),
('rda_op_status', 'h', BitField('Spare', 'Online',
'Maintenance Required',
'Maintenance Mandatory',
'Commanded Shutdown', 'Inoperable',
'Spare', 'Wideband Disconnect')),
('vcp', 'h'), ('num_el', 'h'),
('el1', 'h', scaler(0.1)), ('el2', 'h', scaler(0.1)),
('el3', 'h', scaler(0.1)), ('el4', 'h', scaler(0.1)),
('el5', 'h', scaler(0.1)), ('el6', 'h', scaler(0.1)),
('el7', 'h', scaler(0.1)), ('el8', 'h', scaler(0.1)),
('el9', 'h', scaler(0.1)), ('el10', 'h', scaler(0.1)),
('el11', 'h', scaler(0.1)), ('el12', 'h', scaler(0.1)),
('el13', 'h', scaler(0.1)), ('el14', 'h', scaler(0.1)),
('el15', 'h', scaler(0.1)), ('el16', 'h', scaler(0.1)),
('el17', 'h', scaler(0.1)), ('el18', 'h', scaler(0.1)),
('el19', 'h', scaler(0.1)), ('el20', 'h', scaler(0.1)),
('rda_status', 'h', BitField('Spare', 'Startup', 'Standby',
'Restart', 'Operate',
'Off-line Operate')),
('rda_alarms', 'h', BitField('Indeterminate', 'Tower/Utilities',
'Pedestal', 'Transmitter', 'Receiver',
'RDA Control', 'RDA Communications',
'Signal Processor')),
('tranmission_enable', 'h', BitField('Spare', 'None',
'Reflectivity',
'Velocity', 'Spectrum Width',
'Dual Pol')),
('rpg_op_status', 'h', BitField('Loadshed', 'Online',
'Maintenance Required',
'Maintenance Mandatory',
'Commanded shutdown')),
('rpg_alarms', 'h', BitField('None', 'Node Connectivity',
'Wideband Failure',
'RPG Control Task Failure',
'Data Base Failure', 'Spare',
'RPG Input Buffer Loadshed',
'Spare', 'Product Storage Loadshed'
'Spare', 'Spare', 'Spare',
'RPG/RPG Intercomputer Link Failure',
'Redundant Channel Error',
'Task Failure', 'Media Failure')),
('rpg_status', 'h', BitField('Restart', 'Operate', 'Standby')),
('rpg_narrowband_status', 'h', BitField('Commanded Disconnect',
'Narrowband Loadshed')),
('h_ref_calib', 'h', scaler(0.25)),
('prod_avail', 'h', BitField('Product Availability',
'Degraded Availability',
'Not Available')),
('super_res_cuts', 'h', Bits(16)),
('cmd_status', 'h', Bits(6)),
('v_ref_calib', 'h', scaler(0.25)),
('rda_build', 'h', version), ('rda_channel', 'h'),
('reserved', 'h'), ('reserved2', 'h'),
('build_version', 'h', version)], '>', 'GSM')
# Build 14.0 added more bytes to the GSM
additional_gsm_fmt = NamedStruct([('el21', 'h', scaler(0.1)),
('el22', 'h', scaler(0.1)),
('el23', 'h', scaler(0.1)),
('el24', 'h', scaler(0.1)),
('el25', 'h', scaler(0.1)),
('vcp_supplemental', 'H',
BitField('AVSET', 'SAILS', 'Site VCP', 'RxR Noise',
'CBT', 'VCP Sequence', 'SPRT', 'MRLE',
'Base Tilt', 'MPDA')),
('supplemental_cut_map', 'H', Bits(16)),
('supplemental_cut_count', 'B'),
('supplemental_cut_map2', 'B', Bits(8)),
('spare', '80s')], '>', 'GSM')
prod_desc_fmt = NamedStruct([('divider', 'h'), ('lat', 'l'), ('lon', 'l'),
('height', 'h'), ('prod_code', 'h'),
('op_mode', 'h'), ('vcp', 'h'), ('seq_num', 'h'),
('vol_num', 'h'), ('vol_date', 'h'),
('vol_start_time', 'l'), ('prod_gen_date', 'h'),
('prod_gen_time', 'l'), ('dep1', 'h'),
('dep2', 'h'), ('el_num', 'h'), ('dep3', 'h'),
('thr1', 'h'), ('thr2', 'h'), ('thr3', 'h'),
('thr4', 'h'), ('thr5', 'h'), ('thr6', 'h'),
('thr7', 'h'), ('thr8', 'h'), ('thr9', 'h'),
('thr10', 'h'), ('thr11', 'h'), ('thr12', 'h'),
('thr13', 'h'), ('thr14', 'h'), ('thr15', 'h'),
('thr16', 'h'), ('dep4', 'h'), ('dep5', 'h'),
('dep6', 'h'), ('dep7', 'h'), ('dep8', 'h'),
('dep9', 'h'), ('dep10', 'h'), ('version', 'b'),
('spot_blank', 'b'), ('sym_off', 'L'), ('graph_off', 'L'),
('tab_off', 'L')], '>', 'ProdDesc')
sym_block_fmt = NamedStruct([('divider', 'h'), ('block_id', 'h'),
('block_len', 'L'), ('nlayer', 'H')], '>', 'SymBlock')
tab_header_fmt = NamedStruct([('divider', 'h'), ('block_id', 'h'),
('block_len', 'L')], '>', 'TabHeader')
tab_block_fmt = NamedStruct([('divider', 'h'), ('num_pages', 'h')], '>', 'TabBlock')
sym_layer_fmt = NamedStruct([('divider', 'h'), ('length', 'L')], '>',
'SymLayer')
graph_block_fmt = NamedStruct([('divider', 'h'), ('block_id', 'h'),
('block_len', 'L'), ('num_pages', 'H')], '>', 'GraphBlock')
standalone_tabular = [62, 73, 75, 82]
prod_spec_map = {16: ('Base Reflectivity', 230., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3),
('calib_const', float_elem(7, 8)))),
17: ('Base Reflectivity', 460., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3),
('calib_const', float_elem(7, 8)))),
18: ('Base Reflectivity', 460., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3),
('calib_const', float_elem(7, 8)))),
19: ('Base Reflectivity', 230., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3),
('delta_time', delta_time(6)),
('supplemental_scan', supplemental_scan(6)),
('calib_const', float_elem(7, 8)))),
20: ('Base Reflectivity', 460., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3),
('delta_time', delta_time(6)),
('supplemental_scan', supplemental_scan(6)),
('calib_const', float_elem(7, 8)))),
21: ('Base Reflectivity', 460., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3),
('calib_const', float_elem(7, 8)))),
22: ('Base Velocity', 60., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('min', 3), ('max', 4))),
23: ('Base Velocity', 115., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('min', 3), ('max', 4))),
24: ('Base Velocity', 230., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('min', 3), ('max', 4))),
25: ('Base Velocity', 60., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('min', 3), ('max', 4))),
26: ('Base Velocity', 115., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('min', 3), ('max', 4))),
27: ('Base Velocity', 230., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('min', 3), ('max', 4),
('delta_time', delta_time(6)),
('supplemental_scan', supplemental_scan(6)))),
28: ('Base Spectrum Width', 60., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3))),
29: ('Base Spectrum Width', 115., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3))),
30: ('Base Spectrum Width', 230., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3),
('delta_time', delta_time(6)),
('supplemental_scan', supplemental_scan(6)))),
31: ('User Selectable Storm Total Precipitation', 230., LegacyMapper,
(('end_hour', 0),
('hour_span', 1),
('null_product', 2),
('max_rainfall', scaled_elem(3, 0.1)),
('rainfall_begin', date_elem(4, 5)),
('rainfall_end', date_elem(6, 7)),
('bias', scaled_elem(8, 0.01)),
('gr_pairs', scaled_elem(5, 0.01)))),
32: ('Digital Hybrid Scan Reflectivity', 230., DigitalRefMapper,
(('max', 3),
('avg_time', date_elem(4, 5)),
('compression', 7),
('uncompressed_size', combine_elem(8, 9)))),
33: ('Hybrid Scan Reflectivity', 230., LegacyMapper,
(('max', 3), ('avg_time', date_elem(4, 5)))),
34: ('Clutter Filter Control', 230., LegacyMapper,
(('clutter_bitmap', 0),
('cmd_map', 1),
('bypass_map_date', date_elem(4, 5)),
('notchwidth_map_date', date_elem(6, 7)))),
35: ('Composite Reflectivity', 230., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3),
('calib_const', float_elem(7, 8)))),
36: ('Composite Reflectivity', 460., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3),
('calib_const', float_elem(7, 8)))),
37: ('Composite Reflectivity', 230., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3),
('calib_const', float_elem(7, 8)))),
38: ('Composite Reflectivity', 460., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3),
('calib_const', float_elem(7, 8)))),
41: ('Echo Tops', 230., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', scaled_elem(3, 1000)))), # Max in ft
48: ('VAD Wind Profile', None, LegacyMapper,
(('max', 3),
('dir_max', 4),
('alt_max', scaled_elem(5, 10)))), # Max in ft
50: ('Cross Section Reflectivity', 230., LegacyMapper,
(('azimuth1', scaled_elem(0, 0.1)),
('range1', scaled_elem(1, 0.1)),
('azimuth2', scaled_elem(2, 0.1)),
('range2', scaled_elem(3, 0.1)))),
51: ('Cross Section Velocity', 230., LegacyMapper,
(('azimuth1', scaled_elem(0, 0.1)),
('range1', scaled_elem(1, 0.1)),
('azimuth2', scaled_elem(2, 0.1)),
('range2', scaled_elem(3, 0.1)))),
55: ('Storm Relative Mean Radial Velocity', 50., LegacyMapper,
(('window_az', scaled_elem(0, 0.1)),
('window_range', scaled_elem(1, 0.1)),
('el_angle', scaled_elem(2, 0.1)),
('min', 3),
('max', 4),
('source', 5),
('height', 6),
('avg_speed', scaled_elem(7, 0.1)),
('avg_dir', scaled_elem(8, 0.1)),
('alert_category', 9))),
56: ('Storm Relative Mean Radial Velocity', 230., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('min', 3),
('max', 4),
('source', 5),
('avg_speed', scaled_elem(7, 0.1)),
('avg_dir', scaled_elem(8, 0.1)))),
57: ('Vertically Integrated Liquid', 230., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3))), # Max in kg / m^2
58: ('Storm Tracking Information', 460., LegacyMapper,
(('num_storms', 3),)),
59: ('Hail Index', 230., LegacyMapper, ()),
61: ('Tornado Vortex Signature', 230., LegacyMapper,
(('num_tvs', 3), ('num_etvs', 4))),
62: ('Storm Structure', 460., LegacyMapper, ()),
63: ('Layer Composite Reflectivity (Layer 1 Average)', 230., LegacyMapper,
(('max', 3),
('layer_bottom', scaled_elem(4, 1000.)),
('layer_top', scaled_elem(5, 1000.)),
('calib_const', float_elem(7, 8)))),
64: ('Layer Composite Reflectivity (Layer 2 Average)', 230., LegacyMapper,
(('max', 3),
('layer_bottom', scaled_elem(4, 1000.)),
('layer_top', scaled_elem(5, 1000.)),
('calib_const', float_elem(7, 8)))),
65: ('Layer Composite Reflectivity (Layer 1 Max)', 230., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3),
('layer_bottom', scaled_elem(4, 1000.)),
('layer_top', scaled_elem(5, 1000.)),
('calib_const', float_elem(7, 8)))),
66: ('Layer Composite Reflectivity (Layer 2 Max)', 230., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3),
('layer_bottom', scaled_elem(4, 1000.)),
('layer_top', scaled_elem(5, 1000.)),
('calib_const', float_elem(7, 8)))),
67: ('Layer Composite Reflectivity - AP Removed', 230., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3),
('layer_bottom', scaled_elem(4, 1000.)),
('layer_top', scaled_elem(5, 1000.)),
('calib_const', float_elem(7, 8)))),
74: ('Radar Coded Message', 460., LegacyMapper, ()),
78: ('Surface Rainfall Accumulation (1 hour)', 230., LegacyMapper,
(('max_rainfall', scaled_elem(3, 0.1)),
('bias', scaled_elem(4, 0.01)),
('gr_pairs', scaled_elem(5, 0.01)),
('rainfall_end', date_elem(6, 7)))),
79: ('Surface Rainfall Accumulation (3 hour)', 230., LegacyMapper,
(('max_rainfall', scaled_elem(3, 0.1)),
('bias', scaled_elem(4, 0.01)),
('gr_pairs', scaled_elem(5, 0.01)),
('rainfall_end', date_elem(6, 7)))),
80: ('Storm Total Rainfall Accumulation', 230., LegacyMapper,
(('max_rainfall', scaled_elem(3, 0.1)),
('rainfall_begin', date_elem(4, 5)),
('rainfall_end', date_elem(6, 7)),
('bias', scaled_elem(8, 0.01)),
('gr_pairs', scaled_elem(9, 0.01)))),
81: ('Hourly Digital Precipitation Array', 230., PrecipArrayMapper,
(('max_rainfall', scaled_elem(3, 0.001)),
('bias', scaled_elem(4, 0.01)),
('gr_pairs', scaled_elem(5, 0.01)),
('rainfall_end', date_elem(6, 7)))),
82: ('Supplemental Precipitation Data', None, LegacyMapper, ()),
89: ('Layer Composite Reflectivity (Layer 3 Average)', 230., LegacyMapper,
(('max', 3),
('layer_bottom', scaled_elem(4, 1000.)),
('layer_top', scaled_elem(5, 1000.)),
('calib_const', float_elem(7, 8)))),
90: ('Layer Composite Reflectivity (Layer 3 Max)', 230., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3),
('layer_bottom', scaled_elem(4, 1000.)),
('layer_top', scaled_elem(5, 1000.)),
('calib_const', float_elem(7, 8)))),
93: ('ITWS Digital Base Velocity', 115., DigitalVelMapper,
(('el_angle', scaled_elem(2, 0.1)),
('min', 3),
('max', 4), ('precision', 6))),
94: ('Base Reflectivity Data Array', 460., DigitalRefMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3),
('delta_time', delta_time(6)),
('supplemental_scan', supplemental_scan(6)),
('compression', 7),
('uncompressed_size', combine_elem(8, 9)))),
95: ('Composite Reflectivity Edited for AP', 230., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3),
('calib_const', float_elem(7, 8)))),
96: ('Composite Reflectivity Edited for AP', 460., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3),
('calib_const', float_elem(7, 8)))),
97: ('Composite Reflectivity Edited for AP', 230., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3),
('calib_const', float_elem(7, 8)))),
98: ('Composite Reflectivity Edited for AP', 460., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3),
('calib_const', float_elem(7, 8)))),
99: ('Base Velocity Data Array', 300., DigitalVelMapper,
(('el_angle', scaled_elem(2, 0.1)),
('min', 3),
('max', 4),
('delta_time', delta_time(6)),
('supplemental_scan', supplemental_scan(6)),
('compression', 7),
('uncompressed_size', combine_elem(8, 9)))),
113: ('Power Removed Control', 300., LegacyMapper,
(('rpg_cut_num', 0), ('cmd_generated', 1),
('el_angle', scaled_elem(2, 0.1)),
('clutter_filter_map_dt', date_elem(4, 3)),
# While the 2620001Y ICD doesn't talk about using these
# product-specific blocks for this product, they have data in them
# and the compression info is necessary for proper decoding.
('compression', 7), ('uncompressed_size', combine_elem(8, 9)))),
132: ('Clutter Likelihood Reflectivity', 230., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)), ('delta_time', delta_time(6)),
('supplemental_scan', supplemental_scan(6)),)),
133: ('Clutter Likelihood Doppler', 230., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)), ('delta_time', delta_time(6)),
('supplemental_scan', supplemental_scan(6)),)),
134: ('High Resolution VIL', 460., DigitalVILMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3),
('num_edited', 4),
('compression', 7),
('uncompressed_size', combine_elem(8, 9)))),
135: ('Enhanced Echo Tops', 345., DigitalEETMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', scaled_elem(3, 1000.)), # Max in ft
('num_edited', 4),
('ref_thresh', 5),
('points_removed', 6),
('compression', 7),
('uncompressed_size', combine_elem(8, 9)))),
138: ('Digital Storm Total Precipitation', 230., DigitalStormPrecipMapper,
(('rainfall_begin', date_elem(0, 1)),
('bias', scaled_elem(2, 0.01)),
('max', scaled_elem(3, 0.01)),
('rainfall_end', date_elem(4, 5)),
('gr_pairs', scaled_elem(6, 0.01)),
('compression', 7),
('uncompressed_size', combine_elem(8, 9)))),
141: ('Mesocyclone Detection', 230., LegacyMapper,
(('min_ref_thresh', 0),
('overlap_display_filter', 1),
('min_strength_rank', 2))),
152: ('Archive III Status Product', None, LegacyMapper,
(('compression', 7),
('uncompressed_size', combine_elem(8, 9)))),
153: ('Super Resolution Reflectivity Data Array', 460., DigitalRefMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3), ('delta_time', delta_time(6)),
('supplemental_scan', supplemental_scan(6)), ('compression', 7),
('uncompressed_size', combine_elem(8, 9)))),
154: ('Super Resolution Velocity Data Array', 300., DigitalVelMapper,
(('el_angle', scaled_elem(2, 0.1)),
('min', 3), ('max', 4), ('delta_time', delta_time(6)),
('supplemental_scan', supplemental_scan(6)), ('compression', 7),
('uncompressed_size', combine_elem(8, 9)))),
155: ('Super Resolution Spectrum Width Data Array', 300.,
DigitalSPWMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3), ('delta_time', delta_time(6)),
('supplemental_scan', supplemental_scan(6)), ('compression', 7),
('uncompressed_size', combine_elem(8, 9)))),
156: ('Turbulence Detection (Eddy Dissipation Rate)', 230., EDRMapper,
(('el_start_time', 0),
('el_end_time', 1),
('el_angle', scaled_elem(2, 0.1)),
('min_el', scaled_elem(3, 0.01)),
('mean_el', scaled_elem(4, 0.01)),
('max_el', scaled_elem(5, 0.01)))),
157: ('Turbulence Detection (Eddy Dissipation Rate Confidence)', 230.,
EDRMapper,
(('el_start_time', 0),
('el_end_time', 1),
('el_angle', scaled_elem(2, 0.1)),
('min_el', scaled_elem(3, 0.01)),
('mean_el', scaled_elem(4, 0.01)),
('max_el', scaled_elem(5, 0.01)))),
158: ('Differential Reflectivity', 230., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('min', scaled_elem(3, 0.1)),
('max', scaled_elem(4, 0.1)))),
159: ('Digital Differential Reflectivity', 300., GenericDigitalMapper,
(('el_angle', scaled_elem(2, 0.1)),
('min', scaled_elem(3, 0.1)),
('max', scaled_elem(4, 0.1)), ('delta_time', delta_time(6)),
('supplemental_scan', supplemental_scan(6)), ('compression', 7),
('uncompressed_size', combine_elem(8, 9)))),
160: ('Correlation Coefficient', 230., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('min', scaled_elem(3, 0.00333)),
('max', scaled_elem(4, 0.00333)))),
161: ('Digital Correlation Coefficient', 300., GenericDigitalMapper,
(('el_angle', scaled_elem(2, 0.1)),
('min', scaled_elem(3, 0.00333)),
('max', scaled_elem(4, 0.00333)), ('delta_time', delta_time(6)),
('supplemental_scan', supplemental_scan(6)), ('compression', 7),
('uncompressed_size', combine_elem(8, 9)))),
162: ('Specific Differential Phase', 230., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('min', scaled_elem(3, 0.05)),
('max', scaled_elem(4, 0.05)))),
163: ('Digital Specific Differential Phase', 300., GenericDigitalMapper,
(('el_angle', scaled_elem(2, 0.1)),
('min', scaled_elem(3, 0.05)),
('max', scaled_elem(4, 0.05)), ('delta_time', delta_time(6)),
('supplemental_scan', supplemental_scan(6)), ('compression', 7),
('uncompressed_size', combine_elem(8, 9)))),
164: ('Hydrometeor Classification', 230., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),)),
165: ('Digital Hydrometeor Classification', 300., DigitalHMCMapper,
(('el_angle', scaled_elem(2, 0.1)), ('delta_time', delta_time(6)),
('supplemental_scan', supplemental_scan(6)), ('compression', 7),
('uncompressed_size', combine_elem(8, 9)))),
166: ('Melting Layer', 230., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)), ('delta_time', delta_time(6)),
('supplemental_scan', supplemental_scan(6)),)),
167: ('Super Res Digital Correlation Coefficient', 300.,
GenericDigitalMapper,
(('el_angle', scaled_elem(2, 0.1)),
('min', scaled_elem(3, 0.00333)),
('max', scaled_elem(4, 0.00333)), ('delta_time', delta_time(6)),
('supplemental_scan', supplemental_scan(6)), ('compression', 7),
('uncompressed_size', combine_elem(8, 9)))),
168: ('Super Res Digital Phi', 300., GenericDigitalMapper,
(('el_angle', scaled_elem(2, 0.1)),
('min', 3), ('max', 4), ('delta_time', delta_time(6)),
('supplemental_scan', supplemental_scan(6)), ('compression', 7),
('uncompressed_size', combine_elem(8, 9)))),
169: ('One Hour Accumulation', 230., LegacyMapper,
(('null_product', low_byte(2)),
('max', scaled_elem(3, 0.1)),
('rainfall_end', date_elem(4, 5)),
('bias', scaled_elem(6, 0.01)),
('gr_pairs', scaled_elem(7, 0.01)))),
170: ('Digital Accumulation Array', 230., GenericDigitalMapper,
(('null_product', low_byte(2)),
('max', scaled_elem(3, 0.1)),
('rainfall_end', date_elem(4, 5)),
('bias', scaled_elem(6, 0.01)),
('compression', 7),
('uncompressed_size', combine_elem(8, 9)))),
171: ('Storm Total Accumulation', 230., LegacyMapper,
(('rainfall_begin', date_elem(0, 1)),
('null_product', low_byte(2)),
('max', scaled_elem(3, 0.1)),
('rainfall_end', date_elem(4, 5)),
('bias', scaled_elem(6, 0.01)),
('gr_pairs', scaled_elem(7, 0.01)))),
172: ('Digital Storm Total Accumulation', 230., GenericDigitalMapper,
(('rainfall_begin', date_elem(0, 1)),
('null_product', low_byte(2)),
('max', scaled_elem(3, 0.1)),
('rainfall_end', date_elem(4, 5)),
('bias', scaled_elem(6, 0.01)),
('compression', 7),
('uncompressed_size', combine_elem(8, 9)))),
173: ('Digital User-Selectable Accumulation', 230., GenericDigitalMapper,
(('period', 1),
('missing_period', high_byte(2)),
('null_product', low_byte(2)),
('max', scaled_elem(3, 0.1)),
('rainfall_end', date_elem(4, 0)),
('start_time', 5),
('bias', scaled_elem(6, 0.01)),
('compression', 7),
('uncompressed_size', combine_elem(8, 9)))),
174: ('Digital One-Hour Difference Accumulation', 230.,
GenericDigitalMapper,
(('max', scaled_elem(3, 0.1)),
('rainfall_end', date_elem(4, 5)),
('min', scaled_elem(6, 0.1)),
('compression', 7),
('uncompressed_size', combine_elem(8, 9)))),
175: ('Digital Storm Total Difference Accumulation', 230.,
GenericDigitalMapper,
(('rainfall_begin', date_elem(0, 1)),
('null_product', low_byte(2)),
('max', scaled_elem(3, 0.1)),
('rainfall_end', date_elem(4, 5)),
('min', scaled_elem(6, 0.1)),
('compression', 7),
('uncompressed_size', combine_elem(8, 9)))),
176: ('Digital Instantaneous Precipitation Rate', 230.,
GenericDigitalMapper,
(('rainfall_begin', date_elem(0, 1)),
('precip_detected', high_byte(2)),
('need_bias', low_byte(2)),
('max', 3),
('percent_filled', scaled_elem(4, 0.01)),
('max_elev', scaled_elem(5, 0.1)),
('bias', scaled_elem(6, 0.01)),
('compression', 7),
('uncompressed_size', combine_elem(8, 9)))),
177: ('Hybrid Hydrometeor Classification', 230., DigitalHMCMapper,
(('mode_filter_size', 3),
('hybrid_percent_filled', 4),
('max_elev', scaled_elem(5, 0.1)),
('compression', 7),
('uncompressed_size', combine_elem(8, 9)))),
180: ('TDWR Base Reflectivity', 90., DigitalRefMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3),
('compression', 7),
('uncompressed_size', combine_elem(8, 9)))),
181: ('TDWR Base Reflectivity', 90., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3))),
182: ('TDWR Base Velocity', 90., DigitalVelMapper,
(('el_angle', scaled_elem(2, 0.1)),
('min', 3),
('max', 4),
('compression', 7),
('uncompressed_size', combine_elem(8, 9)))),
183: ('TDWR Base Velocity', 90., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('min', 3),
('max', 4))),
185: ('TDWR Base Spectrum Width', 90., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3))),
186: ('TDWR Long Range Base Reflectivity', 416., DigitalRefMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3),
('compression', 7),
('uncompressed_size', combine_elem(8, 9)))),
187: ('TDWR Long Range Base Reflectivity', 416., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)),
('max', 3)))}
[docs] def __init__(self, filename):
r"""Create instance of `Level3File`.
Parameters
----------
filename : str or file-like object
If str, the name of the file to be opened. If file-like object,
this will be read from directly.
"""
fobj = open_as_needed(filename)
if isinstance(filename, str):
self.filename = filename
elif isinstance(filename, pathlib.Path):
self.filename = str(filename)
else:
self.filename = 'No File'
# Just read in the entire set of data at once
with contextlib.closing(fobj):
self._buffer = IOBuffer.fromfile(fobj)
# Pop off the WMO header if we find it
self._process_wmo_header()
# Pop off last 4 bytes if necessary
self._process_end_bytes()
# Set up places to store data and metadata
self.metadata = {}
# Handle free text message products that are pure text
if self.wmo_code == 'NOUS':
self.header = None
self.prod_desc = None
self.thresholds = None
self.depVals = None
self.product_name = 'Free Text Message'
self.text = ''.join(self._buffer.read_ascii())
return
# Decompress the data if necessary, and if so, pop off new header
self._buffer = IOBuffer(self._buffer.read_func(zlib_decompress_all_frames))
self._process_wmo_header()
# Check for empty product
if len(self._buffer) == 0:
log.warning('%s: Empty product!', self.filename)
return
# Unpack the message header and the product description block
msg_start = self._buffer.set_mark()
self.header = self._buffer.read_struct(self.header_fmt)
log.debug('Buffer size: %d (%d expected) Header: %s', len(self._buffer),
self.header.msg_len, self.header)
if not self._buffer.check_remains(self.header.msg_len - self.header_fmt.size):
log.warning('Product contains an unexpected amount of data remaining--have: %d '
'expected: %d. This product may not parse correctly.',
len(self._buffer) - self._buffer._offset,
self.header.msg_len - self.header_fmt.size)
# Handle GSM and jump out
if self.header.code == 2:
self.gsm = self._buffer.read_struct(self.gsm_fmt)
self.product_name = 'General Status Message'
assert self.gsm.divider == -1
if self.gsm.block_len > 82:
# Due to the way the structures read it in, one bit from the count needs
# to be popped off and added as the supplemental cut status for the 25th
# elevation cut.
more = self._buffer.read_struct(self.additional_gsm_fmt)
cut_count = more.supplemental_cut_count
more.supplemental_cut_map2.append(bool(cut_count & 0x1))
self.gsm_additional = more._replace(supplemental_cut_count=cut_count >> 1)
assert self.gsm.block_len == 178
else:
assert self.gsm.block_len == 82
return
self.prod_desc = self._buffer.read_struct(self.prod_desc_fmt)
log.debug('Product description block: %s', self.prod_desc)
# Convert thresholds and dependent values to lists of values
self.thresholds = [getattr(self.prod_desc, f'thr{i}') for i in range(1, 17)]
self.depVals = [getattr(self.prod_desc, f'dep{i}') for i in range(1, 11)]
# Set up some time/location metadata
self.metadata['msg_time'] = nexrad_to_datetime(self.header.date,
self.header.time * 1000)
self.metadata['vol_time'] = nexrad_to_datetime(self.prod_desc.vol_date,
self.prod_desc.vol_start_time * 1000)
self.metadata['prod_time'] = nexrad_to_datetime(self.prod_desc.prod_gen_date,
self.prod_desc.prod_gen_time * 1000)
self.lat = self.prod_desc.lat * 0.001
self.lon = self.prod_desc.lon * 0.001
self.height = self.prod_desc.height
# Handle product-specific blocks. Default to compression and elevation angle
# Also get other product specific information, like name,
# maximum range, and how to map data bytes to values
default = ('Unknown Product', 230., LegacyMapper,
(('el_angle', scaled_elem(2, 0.1)), ('compression', 7),
('uncompressed_size', combine_elem(8, 9)), ('defaultVals', 0)))
self.product_name, self.max_range, mapper, meta = self.prod_spec_map.get(
self.header.code, default)
log.debug('Product info--name: %s max_range: %f mapper: %s metadata: %s',
self.product_name, self.max_range, mapper, meta)
for name, block in meta:
if callable(block):
self.metadata[name] = block(self.depVals)
else:
self.metadata[name] = self.depVals[block]
# Now that we have the header, we have everything needed to make tables
# Store as class that can be called
self.map_data = mapper(self)
# Process compression if indicated. We need to fail
# gracefully here since we default to it being on
if self.metadata.get('compression', False):
try:
comp_start = self._buffer.set_mark()
decomp_data = self._buffer.read_func(bz2.decompress)
self._buffer.splice(comp_start, decomp_data)
assert self._buffer.check_remains(self.metadata['uncompressed_size'])
except OSError:
# Compression didn't work, so we just assume it wasn't actually compressed.
pass
# Unpack the various blocks, if present. The factor of 2 converts from
# 'half-words' to bytes
# Check to see if this is one of the "special" products that uses
# header-free blocks and re-assigns the offsets
if self.header.code in self.standalone_tabular:
if self.prod_desc.sym_off:
# For standalone tabular alphanumeric, symbology offset is
# actually tabular
self._unpack_tabblock(msg_start, 2 * self.prod_desc.sym_off, False)
if self.prod_desc.graph_off:
# Offset seems to be off by 1 from where we're counting, but
# it's not clear why.
self._unpack_standalone_graphblock(msg_start,
2 * (self.prod_desc.graph_off - 1))
# Need special handling for (old) radar coded message format
elif self.header.code == 74:
self._unpack_rcm(msg_start, 2 * self.prod_desc.sym_off)
else:
if self.prod_desc.sym_off:
self._unpack_symblock(msg_start, 2 * self.prod_desc.sym_off)
if self.prod_desc.graph_off:
self._unpack_graphblock(msg_start, 2 * self.prod_desc.graph_off)
if self.prod_desc.tab_off:
self._unpack_tabblock(msg_start, 2 * self.prod_desc.tab_off)
if 'defaultVals' in self.metadata:
log.warning('%s: Using default metadata for product %d',
self.filename, self.header.code)
def _process_wmo_header(self):
# Read off the WMO header if necessary
data = self._buffer.get_next(64).decode('ascii', 'ignore')
match = self.wmo_finder.search(data)
log.debug('WMO Header: %s', match)
if match:
self.wmo_code = match.groups()[0]
self.siteID = match.groups()[-1]
self._buffer.skip(match.end())
else:
self.wmo_code = ''
def _process_end_bytes(self):
check_bytes = self._buffer[-4:-1]
log.debug('End Bytes: %s', check_bytes)
if check_bytes in (b'\r\r\n', b'\xff\xff\n'):
self._buffer.truncate(4)
@staticmethod
def _unpack_rle_data(data):
# Unpack Run-length encoded data
unpacked = []
for run in data:
num, val = run >> 4, run & 0x0F
unpacked.extend([val] * num)
return unpacked
[docs] @staticmethod
def pos_scale(is_sym_block):
"""Scale of the position information in km."""
return 0.25 if is_sym_block else 1
def _unpack_rcm(self, start, offset):
self._buffer.jump_to(start, offset)
header = self._buffer.read_ascii(10)
assert header == '1234 ROBUU'
text_data = self._buffer.read_ascii()
end = 0
# Appendix B of ICD tells how to interpret this stuff, but that just
# doesn't seem worth it.
for marker, name in [('AA', 'ref'), ('BB', 'vad'), ('CC', 'remarks')]:
start = text_data.find('/NEXR' + marker, end)
# For part C the search for end fails, but returns -1, which works
end = text_data.find('/END' + marker, start)
setattr(self, 'rcm_' + name, text_data[start:end])
def _unpack_symblock(self, start, offset):
self._buffer.jump_to(start, offset)
blk = self._buffer.read_struct(self.sym_block_fmt)
log.debug('Symbology block info: %s', blk)
self.sym_block = []
assert blk.divider == -1, ('Bad divider for symbology block: {:d} should be -1'
.format(blk.divider))
assert blk.block_id == 1, ('Bad block ID for symbology block: {:d} should be 1'
.format(blk.block_id))
for _ in range(blk.nlayer):
layer_hdr = self._buffer.read_struct(self.sym_layer_fmt)
assert layer_hdr.divider == -1
layer = []
self.sym_block.append(layer)
layer_start = self._buffer.set_mark()
while self._buffer.offset_from(layer_start) < layer_hdr.length:
packet_code = self._buffer.read_int(2, 'big', signed=False)
log.debug('Symbology packet: %d', packet_code)
if packet_code in self.packet_map:
layer.append(self.packet_map[packet_code](self, packet_code, True))
else:
log.warning('%s: Unknown symbology packet type %d/%x.',
self.filename, packet_code, packet_code)
self._buffer.jump_to(layer_start, layer_hdr.length)
assert self._buffer.offset_from(layer_start) == layer_hdr.length
def _unpack_graphblock(self, start, offset):
self._buffer.jump_to(start, offset)
hdr = self._buffer.read_struct(self.graph_block_fmt)
assert hdr.divider == -1, ('Bad divider for graphical block: {:d} should be -1'
.format(hdr.divider))
assert hdr.block_id == 2, ('Bad block ID for graphical block: {:d} should be 1'
.format(hdr.block_id))
self.graph_pages = []
for page in range(hdr.num_pages):
page_num = self._buffer.read_int(2, 'big', signed=False)
assert page + 1 == page_num
page_size = self._buffer.read_int(2, 'big', signed=False)
page_start = self._buffer.set_mark()
packets = []
while self._buffer.offset_from(page_start) < page_size:
packet_code = self._buffer.read_int(2, 'big', signed=False)
if packet_code in self.packet_map:
packets.append(self.packet_map[packet_code](self, packet_code, False))
else:
log.warning('%s: Unknown graphical packet type %d/%x.',
self.filename, packet_code, packet_code)
self._buffer.skip(page_size)
self.graph_pages.append(packets)
def _unpack_standalone_graphblock(self, start, offset):
self._buffer.jump_to(start, offset)
packets = []
while not self._buffer.at_end():
packet_code = self._buffer.read_int(2, 'big', signed=False)
if packet_code in self.packet_map:
packets.append(self.packet_map[packet_code](self, packet_code, False))
else:
log.warning('%s: Unknown standalone graphical packet type %d/%x.',
self.filename, packet_code, packet_code)
# Assume next 2 bytes is packet length and try skipping
num_bytes = self._buffer.read_int(2, 'big', signed=False)
self._buffer.skip(num_bytes)
self.graph_pages = [packets]
def _unpack_tabblock(self, start, offset, have_header=True):
self._buffer.jump_to(start, offset)
block_start = self._buffer.set_mark()
# Read the header and validate if needed
if have_header:
header = self._buffer.read_struct(self.tab_header_fmt)
assert header.divider == -1
assert header.block_id == 3
# Read off secondary message and product description blocks,
# but as far as I can tell, all we really need is the text that follows
self._buffer.read_struct(self.header_fmt)
self._buffer.read_struct(self.prod_desc_fmt)
# Get the start of the block with number of pages and divider
blk = self._buffer.read_struct(self.tab_block_fmt)
assert blk.divider == -1
# Read the pages line by line, break pages on a -1 character count
self.tab_pages = []
for _ in range(blk.num_pages):
lines = []
num_chars = self._buffer.read_int(2, 'big', signed=True)
while num_chars != -1:
lines.append(''.join(self._buffer.read_ascii(num_chars)))
num_chars = self._buffer.read_int(2, 'big', signed=True)
self.tab_pages.append('\n'.join(lines))
if have_header:
assert self._buffer.offset_from(block_start) == header.block_len
def __repr__(self):
"""Return the string representation of the product."""
attrs = ('product_name', 'header', 'prod_desc', 'thresholds', 'depVals', 'metadata',
'gsm', 'gsm_additional', 'siteID')
blocks = [str(getattr(self, name)) for name in attrs if hasattr(self, name)]
return self.filename + ': ' + '\n'.join(blocks)
def _unpack_packet_radial_data(self, code, in_sym_block):
hdr_fmt = NamedStruct([('ind_first_bin', 'H'), ('nbins', 'H'),
('i_center', 'h'), ('j_center', 'h'),
('scale_factor', 'h'), ('num_rad', 'H')],
'>', 'RadialHeader')
rad_fmt = NamedStruct([('num_hwords', 'H'), ('start_angle', 'h'),
('angle_delta', 'h')], '>', 'RadialData')
hdr = self._buffer.read_struct(hdr_fmt)
rads = []
for _ in range(hdr.num_rad):
rad = self._buffer.read_struct(rad_fmt)
start_az = rad.start_angle * 0.1
end_az = start_az + rad.angle_delta * 0.1
rads.append((start_az, end_az,
self._unpack_rle_data(
self._buffer.read_binary(2 * rad.num_hwords))))
start, end, vals = zip(*rads)
return {'start_az': list(start), 'end_az': list(end), 'data': list(vals),
'center': (hdr.i_center * self.pos_scale(in_sym_block),
hdr.j_center * self.pos_scale(in_sym_block)),
'gate_scale': hdr.scale_factor * 0.001, 'first': hdr.ind_first_bin}
digital_radial_hdr_fmt = NamedStruct([('ind_first_bin', 'H'), ('nbins', 'H'),
('i_center', 'h'), ('j_center', 'h'),
('scale_factor', 'h'), ('num_rad', 'H')],
'>', 'DigitalRadialHeader')
digital_radial_fmt = NamedStruct([('num_bytes', 'H'), ('start_angle', 'h'),
('angle_delta', 'h')], '>', 'DigitalRadialData')
def _unpack_packet_digital_radial(self, code, in_sym_block):
hdr = self._buffer.read_struct(self.digital_radial_hdr_fmt)
rads = []
for _ in range(hdr.num_rad):
rad = self._buffer.read_struct(self.digital_radial_fmt)
start_az = rad.start_angle * 0.1
end_az = start_az + rad.angle_delta * 0.1
rads.append((start_az, end_az, self._buffer.read_binary(rad.num_bytes)))
start, end, vals = zip(*rads)
return {'start_az': list(start), 'end_az': list(end), 'data': list(vals),
'center': (hdr.i_center * self.pos_scale(in_sym_block),
hdr.j_center * self.pos_scale(in_sym_block)),
'gate_scale': hdr.scale_factor * 0.001, 'first': hdr.ind_first_bin}
def _unpack_packet_raster_data(self, code, in_sym_block):
hdr_fmt = NamedStruct([('code', 'L'),
('i_start', 'h'), ('j_start', 'h'), # start in km/4
('xscale_int', 'h'), ('xscale_frac', 'h'),
('yscale_int', 'h'), ('yscale_frac', 'h'),
('num_rows', 'h'), ('packing', 'h')], '>', 'RasterData')
hdr = self._buffer.read_struct(hdr_fmt)
assert hdr.code == 0x800000C0
assert hdr.packing == 2
rows = []
for _ in range(hdr.num_rows):
num_bytes = self._buffer.read_int(2, 'big', signed=False)
rows.append(self._unpack_rle_data(self._buffer.read_binary(num_bytes)))
return {'start_x': hdr.i_start * hdr.xscale_int,
'start_y': hdr.j_start * hdr.yscale_int, 'data': rows}
def _unpack_packet_uniform_text(self, code, in_sym_block):
# By not using a struct, we can handle multiple codes
num_bytes = self._buffer.read_int(2, 'big', signed=False)
if code == 8:
value = self._buffer.read_int(2, 'big', signed=False)
read_bytes = 6
else:
value = None
read_bytes = 4
i_start = self._buffer.read_int(2, 'big', signed=True)
j_start = self._buffer.read_int(2, 'big', signed=True)
# Text is what remains beyond what's been read, not including byte count
text = ''.join(self._buffer.read_ascii(num_bytes - read_bytes))
return {'x': i_start * self.pos_scale(in_sym_block),
'y': j_start * self.pos_scale(in_sym_block), 'color': value, 'text': text}
def _unpack_packet_special_text_symbol(self, code, in_sym_block):
d = self._unpack_packet_uniform_text(code, in_sym_block)
# Translate special characters to their meaning
ret = {}
symbol_map = {'!': 'past storm position', '"': 'current storm position',
'#': 'forecast storm position', '$': 'past MDA position',
'%': 'forecast MDA position', ' ': None}
# Use this meaning as the key in the returned packet
for c in d['text']:
if c not in symbol_map:
log.warning('%s: Unknown special symbol %d/%x.', self.filename, c, ord(c))
else:
key = symbol_map[c]
if key:
ret[key] = d['x'], d['y']
del d['text']
return ret
def _unpack_packet_special_graphic_symbol(self, code, in_sym_block):
type_map = {3: 'Mesocyclone', 11: '3D Correlated Shear', 12: 'TVS',
26: 'ETVS', 13: 'Positive Hail', 14: 'Probable Hail',
15: 'Storm ID', 19: 'HDA', 25: 'STI Circle'}
point_feature_map = {1: 'Mesocyclone (ext.)', 3: 'Mesocyclone',
5: 'TVS (Ext.)', 6: 'ETVS (Ext.)', 7: 'TVS',
8: 'ETVS', 9: 'MDA', 10: 'MDA (Elev.)', 11: 'MDA (Weak)'}
# Read the number of bytes and set a mark for sanity checking
num_bytes = self._buffer.read_int(2, 'big', signed=False)
packet_data_start = self._buffer.set_mark()
scale = self.pos_scale(in_sym_block)
# Loop over the bytes we have
ret = defaultdict(list)
while self._buffer.offset_from(packet_data_start) < num_bytes:
# Read position
ret['x'].append(self._buffer.read_int(2, 'big', signed=True) * scale)
ret['y'].append(self._buffer.read_int(2, 'big', signed=True) * scale)
# Handle any types that have additional info
if code in (3, 11, 25):
ret['radius'].append(self._buffer.read_int(2, 'big', signed=True) * scale)
elif code == 15:
ret['id'].append(''.join(self._buffer.read_ascii(2)))
elif code == 19:
ret['POH'].append(self._buffer.read_int(2, 'big', signed=True))
ret['POSH'].append(self._buffer.read_int(2, 'big', signed=True))
ret['Max Size'].append(self._buffer.read_int(2, 'big', signed=False))
elif code == 20:
kind = self._buffer.read_int(2, 'big', signed=False)
attr = self._buffer.read_int(2, 'big', signed=False)
if kind < 5 or kind > 8:
ret['radius'].append(attr * scale)
if kind not in point_feature_map:
log.warning('%s: Unknown graphic symbol point kind %d/%x.',
self.filename, kind, kind)
ret['type'].append(f'Unknown ({kind:d})')
else:
ret['type'].append(point_feature_map[kind])
# Map the code to a name for this type of symbol
if code != 20:
if code not in type_map:
log.warning('%s: Unknown graphic symbol type %d/%x.',
self.filename, code, code)
ret['type'] = 'Unknown'
else:
ret['type'] = type_map[code]
# Check and return
assert self._buffer.offset_from(packet_data_start) == num_bytes
# Reduce dimensions of lists if possible
reduce_lists(ret)
return ret
def _unpack_packet_scit(self, code, in_sym_block):
num_bytes = self._buffer.read_int(2, 'big', signed=False)
packet_data_start = self._buffer.set_mark()
ret = defaultdict(list)
while self._buffer.offset_from(packet_data_start) < num_bytes:
next_code = self._buffer.read_int(2, 'big', signed=False)
if next_code not in self.packet_map:
log.warning('%s: Unknown packet in SCIT %d/%x.',
self.filename, next_code, next_code)
self._buffer.jump_to(packet_data_start, num_bytes)
return ret
else:
next_packet = self.packet_map[next_code](self, next_code, in_sym_block)
if next_code == 6:
ret['track'].append(next_packet['vectors'])
elif next_code == 25:
ret['STI Circle'].append(next_packet)
elif next_code == 2:
ret['markers'].append(next_packet)
else:
log.warning('%s: Unsupported packet in SCIT %d/%x.',
self.filename, next_code, next_code)
ret['data'].append(next_packet)
reduce_lists(ret)
return ret
def _unpack_packet_digital_precipitation(self, code, in_sym_block):
# Read off a couple of unused spares
self._buffer.read_int(2, 'big', signed=False)
self._buffer.read_int(2, 'big', signed=False)
# Get the size of the grid
lfm_boxes = self._buffer.read_int(2, 'big', signed=False)
num_rows = self._buffer.read_int(2, 'big', signed=False)
rows = []
# Read off each row and decode the RLE data
for _ in range(num_rows):
row_num_bytes = self._buffer.read_int(2, 'big', signed=False)
row_bytes = self._buffer.read_binary(row_num_bytes)
if code == 18:
row = self._unpack_rle_data(row_bytes)
else:
row = []
for run, level in zip(row_bytes[::2], row_bytes[1::2]):
row.extend([level] * run)
assert len(row) == lfm_boxes
rows.append(row)
return {'data': rows}
def _unpack_packet_linked_vector(self, code, in_sym_block):
num_bytes = self._buffer.read_int(2, 'big', signed=True)
if code == 9:
value = self._buffer.read_int(2, 'big', signed=True)
num_bytes -= 2
else:
value = None
scale = self.pos_scale(in_sym_block)
pos = [b * scale for b in self._buffer.read_binary(num_bytes / 2, '>h')]
vectors = list(zip(pos[::2], pos[1::2]))
return {'vectors': vectors, 'color': value}
def _unpack_packet_vector(self, code, in_sym_block):
num_bytes = self._buffer.read_int(2, 'big', signed=True)
if code == 10:
value = self._buffer.read_int(2, 'big', signed=True)
num_bytes -= 2
else:
value = None
scale = self.pos_scale(in_sym_block)
pos = [p * scale for p in self._buffer.read_binary(num_bytes / 2, '>h')]
vectors = list(zip(pos[::4], pos[1::4], pos[2::4], pos[3::4]))
return {'vectors': vectors, 'color': value}
def _unpack_packet_contour_color(self, code, in_sym_block):
# Check for color value indicator
assert self._buffer.read_int(2, 'big', signed=False) == 0x0002
# Read and return value (level) of contour
return {'color': self._buffer.read_int(2, 'big', signed=False)}
def _unpack_packet_linked_contour(self, code, in_sym_block):
# Check for initial point indicator
assert self._buffer.read_int(2, 'big', signed=False) == 0x8000
scale = self.pos_scale(in_sym_block)
startx = self._buffer.read_int(2, 'big', signed=True) * scale
starty = self._buffer.read_int(2, 'big', signed=True) * scale
vectors = [(startx, starty)]
num_bytes = self._buffer.read_int(2, 'big', signed=False)
pos = [b * scale for b in self._buffer.read_binary(num_bytes / 2, '>h')]
vectors.extend(zip(pos[::2], pos[1::2]))
return {'vectors': vectors}
def _unpack_packet_wind_barbs(self, code, in_sym_block):
# Figure out how much to read
num_bytes = self._buffer.read_int(2, 'big', signed=True)
packet_data_start = self._buffer.set_mark()
ret = defaultdict(list)
# Read while we have data, then return
while self._buffer.offset_from(packet_data_start) < num_bytes:
ret['color'].append(self._buffer.read_int(2, 'big', signed=True))
ret['x'].append(self._buffer.read_int(2, 'big', signed=True)
* self.pos_scale(in_sym_block))
ret['y'].append(self._buffer.read_int(2, 'big', signed=True)
* self.pos_scale(in_sym_block))
ret['direc'].append(self._buffer.read_int(2, 'big', signed=True))
ret['speed'].append(self._buffer.read_int(2, 'big', signed=True))
return ret
def _unpack_packet_generic(self, code, in_sym_block):
# Reserved HW
assert self._buffer.read_int(2, 'big', signed=True) == 0
# Read number of bytes (2 HW) and return
num_bytes = self._buffer.read_int(4, 'big', signed=True)
hunk = self._buffer.read(num_bytes)
xdrparser = Level3XDRParser(hunk)
return xdrparser(code)
def _unpack_packet_trend_times(self, code, in_sym_block):
self._buffer.read_int(2, 'big', signed=True) # number of bytes, not needed to process
return {'times': self._read_trends()}
def _unpack_packet_cell_trend(self, code, in_sym_block):
code_map = ['Cell Top', 'Cell Base', 'Max Reflectivity Height',
'Probability of Hail', 'Probability of Severe Hail',
'Cell-based VIL', 'Maximum Reflectivity',
'Centroid Height']
code_scales = [100, 100, 100, 1, 1, 1, 1, 100]
num_bytes = self._buffer.read_int(2, 'big', signed=True)
packet_data_start = self._buffer.set_mark()
cell_id = ''.join(self._buffer.read_ascii(2))
x = self._buffer.read_int(2, 'big', signed=True) * self.pos_scale(in_sym_block)
y = self._buffer.read_int(2, 'big', signed=True) * self.pos_scale(in_sym_block)
ret = {'id': cell_id, 'x': x, 'y': y}
while self._buffer.offset_from(packet_data_start) < num_bytes:
code = self._buffer.read_int(2, 'big', signed=True)
try:
ind = code - 1
key = code_map[ind]
scale = code_scales[ind]
except IndexError:
log.warning('%s: Unsupported trend code %d/%x.', self.filename, code, code)
key = 'Unknown'
scale = 1
vals = self._read_trends()
if code in (1, 2):
ret[f'{key} Limited'] = [v > 700 for v in vals]
vals = [v - 1000 if v > 700 else v for v in vals]
ret[key] = [v * scale for v in vals]
return ret
def _read_trends(self):
num_vols, latest = self._buffer.read(2)
vals = [self._buffer.read_int(2, 'big', signed=True) for _ in range(num_vols)]
# Wrap the circular buffer so that latest is last
return vals[latest:] + vals[:latest]
packet_map = {1: _unpack_packet_uniform_text,
2: _unpack_packet_special_text_symbol,
3: _unpack_packet_special_graphic_symbol,
4: _unpack_packet_wind_barbs,
6: _unpack_packet_linked_vector,
8: _unpack_packet_uniform_text,
# 9: _unpack_packet_linked_vector,
10: _unpack_packet_vector,
11: _unpack_packet_special_graphic_symbol,
12: _unpack_packet_special_graphic_symbol,
13: _unpack_packet_special_graphic_symbol,
14: _unpack_packet_special_graphic_symbol,
15: _unpack_packet_special_graphic_symbol,
16: _unpack_packet_digital_radial,
17: _unpack_packet_digital_precipitation,
18: _unpack_packet_digital_precipitation,
19: _unpack_packet_special_graphic_symbol,
20: _unpack_packet_special_graphic_symbol,
21: _unpack_packet_cell_trend,
22: _unpack_packet_trend_times,
23: _unpack_packet_scit,
24: _unpack_packet_scit,
25: _unpack_packet_special_graphic_symbol,
26: _unpack_packet_special_graphic_symbol,
28: _unpack_packet_generic,
29: _unpack_packet_generic,
0x0802: _unpack_packet_contour_color,
0x0E03: _unpack_packet_linked_contour,
0xaf1f: _unpack_packet_radial_data,
0xba07: _unpack_packet_raster_data}
class Level3XDRParser(Unpacker):
"""Handle XDR-formatted Level 3 NEXRAD products."""
def __call__(self, code):
"""Perform the actual unpacking."""
xdr = OrderedDict()
if code == 28:
xdr.update(self._unpack_prod_desc())
else:
log.warning('XDR: code %d not implemented', code)
# Check that we got it all
self.done()
return xdr
def unpack_string(self):
"""Unpack the internal data as a string."""
return Unpacker.unpack_string(self).decode('ascii')
def _unpack_prod_desc(self):
xdr = OrderedDict()
# NOTE: The ICD (262001U) incorrectly lists op-mode, vcp, el_num, and
# spare as int*2. Changing to int*4 makes things parse correctly.
xdr['name'] = self.unpack_string()
xdr['description'] = self.unpack_string()
xdr['code'] = self.unpack_int()
xdr['type'] = self.unpack_int()
xdr['prod_time'] = self.unpack_uint()
xdr['radar_name'] = self.unpack_string()
xdr['latitude'] = self.unpack_float()
xdr['longitude'] = self.unpack_float()
xdr['height'] = self.unpack_float()
xdr['vol_time'] = self.unpack_uint()
xdr['el_time'] = self.unpack_uint()
xdr['el_angle'] = self.unpack_float()
xdr['vol_num'] = self.unpack_int()
xdr['op_mode'] = self.unpack_int()
xdr['vcp_num'] = self.unpack_int()
xdr['el_num'] = self.unpack_int()
xdr['compression'] = self.unpack_int()
xdr['uncompressed_size'] = self.unpack_int()
xdr['parameters'] = self._unpack_parameters()
xdr['components'] = self._unpack_components()
return xdr
def _unpack_parameters(self):
num = self.unpack_int()
# ICD documents a "pointer" here, that seems to be garbage. Just read
# and use the number, starting the list immediately.
self.unpack_int()
if num == 0:
return None
ret = []
for i in range(num):
ret.append((self.unpack_string(), self.unpack_string()))
if i < num - 1:
self.unpack_int() # Another pointer for the 'list' ?
if num == 1:
ret = ret[0]
return ret
def _unpack_components(self):
num = self.unpack_int()
# ICD documents a "pointer" here, that seems to be garbage. Just read
# and use the number, starting the list immediately.
self.unpack_int()
ret = []
for i in range(num):
try:
code = self.unpack_int()
ret.append(self._component_lookup[code](self))
if i < num - 1:
self.unpack_int() # Another pointer for the 'list' ?
except KeyError:
log.warning('Unknown XDR Component: %d', code)
break
if num == 1:
ret = ret[0]
return ret
radial_fmt = namedtuple('RadialComponent', ['description', 'gate_width',
'first_gate', 'parameters',
'radials'])
radial_data_fmt = namedtuple('RadialData', ['azimuth', 'elevation', 'width',
'num_bins', 'attributes',
'data'])
def _unpack_radial(self):
ret = self.radial_fmt(description=self.unpack_string(),
gate_width=self.unpack_float(),
first_gate=self.unpack_float(),
parameters=self._unpack_parameters(),
radials=None)
num_rads = self.unpack_int()
rads = []
for _ in range(num_rads):
# ICD is wrong, says num_bins is float, should be int
rads.append(self.radial_data_fmt(azimuth=self.unpack_float(),
elevation=self.unpack_float(),
width=self.unpack_float(),
num_bins=self.unpack_int(),
attributes=self.unpack_string(),
data=self.unpack_array(self.unpack_int)))
return ret._replace(radials=rads)
text_fmt = namedtuple('TextComponent', ['parameters', 'text'])
def _unpack_text(self):
return self.text_fmt(parameters=self._unpack_parameters(),
text=self.unpack_string())
_component_lookup = {1: _unpack_radial, 4: _unpack_text}
@exporter.export
def is_precip_mode(vcp_num):
r"""Determine if the NEXRAD radar is operating in precipitation mode.
Parameters
----------
vcp_num : int
The NEXRAD volume coverage pattern (VCP) number
Returns
-------
bool
True if the VCP corresponds to precipitation mode, False otherwise
"""
return vcp_num // 10 != 3