# Copyright (c) 2013-2017 Siphon Contributors.
# Distributed under the terms of the BSD 3-Clause License.
# SPDX-License-Identifier: BSD-3-Clause
"""
Support reading and parsing catalog files from a THREDDS Data Server (TDS).
These tools help identify the latest dataset and find the proper URLs to access the data.
"""
from collections import OrderedDict
from datetime import datetime
import logging
import re
import xml.etree.ElementTree as ET
try:
from urlparse import urljoin, urlparse
except ImportError:
# Python 3
from urllib.parse import urljoin, urlparse
from .http_util import session_manager
from .metadata import TDSCatalogMetadata
# Configure the root logger at import time so only messages at ERROR level or above
# are emitted unless the application has already configured logging itself.
logging.basicConfig(level=logging.ERROR)
# Module-level logger used by the helpers and classes in this file.
log = logging.getLogger(__name__)
class IndexableMapping(OrderedDict):
    """Extend ``OrderedDict`` to allow index-based access to values."""

    def __getitem__(self, item):
        """Return a value looked up either by key name or by position.

        String-like keys are looked up by name. Anything that cannot be concatenated
        with a string (for example an integer or a slice) is used to index into the
        list of values, in insertion order.
        """
        try:
            # Concatenating with '' is a probe for "string-like" that works the same
            # way on Python 2 and 3; it raises TypeError for integers and slices.
            item + ''
        except TypeError:
            ordered_values = list(self.values())
            return ordered_values[item]
        return super(IndexableMapping, self).__getitem__(item)
class DatasetCollection(IndexableMapping):
    """Extend ``IndexableMapping`` to allow datetime-based filter queries."""

    default_regex = re.compile(r'(?P<year>\d{4})(?P<month>[01]\d)(?P<day>[0123]\d)_'
                               r'(?P<hour>[012]\d)(?P<minute>[0-5]\d)')

    # (group name, fallback value) pairs in ``datetime`` constructor order. The date
    # fields fall back to 1 rather than 0 because ``datetime`` rejects a year, month or
    # day of 0; the time fields fall back to 0.
    _datetime_fields = (('year', 1), ('month', 1), ('day', 1), ('hour', 0),
                        ('minute', 0), ('second', 0), ('microsecond', 0))

    def _get_datasets_with_times(self, regex):
        """Yield ``(datetime, value)`` pairs for every key that matches `regex`.

        Parameters
        ----------
        regex : str or None
            Pattern with named date/time groups; ``None`` selects ``default_regex``.

        Raises
        ------
        ValueError
            Once iteration is exhausted, if no key in the collection matched `regex`.

        """
        # Set the default regex if we don't have one
        if regex is None:
            regex = self.default_regex
        else:
            regex = re.compile(regex)

        # Loop over the collection looking for keys that match our regex
        found_date = False
        for ds in self:
            match = regex.search(ds)

            # If we find one, make a datetime and yield it along with the value
            if match:
                found_date = True
                date_parts = match.groupdict()
                # ``or`` also covers optional groups that did not take part in the
                # match, which ``groupdict`` reports as None.
                dt = datetime(*(int(date_parts.get(name) or fallback)
                                for name, fallback in self._datetime_fields))
                yield dt, self[ds]

        # If we never found any keys that match, we should let the user know that rather
        # than have it be the same as if nothing matched filters
        if not found_date:
            raise ValueError('No datasets with times found.')

    def filter_time_nearest(self, time, regex=None):
        """Filter keys for an item closest to the desired time.

        Loops over all keys in the collection and uses `regex` to extract and build
        `datetime`s. The collection of `datetime`s is compared to `time` and the value that
        has a `datetime` closest to that requested is returned. If none of the keys in the
        collection match the regex, indicating that the keys are not date/time-based,
        a ``ValueError`` is raised.

        Parameters
        ----------
        time : ``datetime.datetime``
            The desired time
        regex : str, optional
            The regular expression to use to extract date/time information from the key. If
            given, this should contain named groups: 'year', 'month', 'day', 'hour', 'minute',
            'second', and 'microsecond', as appropriate. When a match is found, any of those
            groups missing from the pattern will be assigned the smallest valid value
            (1 for 'year', 'month' and 'day'; 0 for the rest). The default pattern
            looks for patterns like: 20171118_2356.

        Returns
        -------
            The value with a time closest to that desired

        """
        return min(self._get_datasets_with_times(regex),
                   key=lambda i: abs((i[0] - time).total_seconds()))[-1]

    def filter_time_range(self, start, end, regex=None):
        """Filter keys for all items within the desired time range.

        Loops over all keys in the collection and uses `regex` to extract and build
        `datetime`s. From the collection of `datetime`s, all values within `start` and `end`
        (inclusive) are returned. If none of the keys in the collection match the regex,
        indicating that the keys are not date/time-based, a ``ValueError`` is raised.

        Parameters
        ----------
        start : ``datetime.datetime``
            The start of the desired time range, inclusive
        end : ``datetime.datetime``
            The end of the desired time range, inclusive
        regex : str, optional
            The regular expression to use to extract date/time information from the key. If
            given, this should contain named groups: 'year', 'month', 'day', 'hour', 'minute',
            'second', and 'microsecond', as appropriate. When a match is found, any of those
            groups missing from the pattern will be assigned the smallest valid value
            (1 for 'year', 'month' and 'day'; 0 for the rest). The default pattern
            looks for patterns like: 20171118_2356.

        Returns
        -------
            All values corresponding to times within the specified range

        """
        return [item[-1] for item in self._get_datasets_with_times(regex)
                if start <= item[0] <= end]

    def __str__(self):
        """Return a string representation of the collection (the list of its keys)."""
        return str(list(self))

    __repr__ = __str__
def _try_lower(arg):
try:
arg = arg.lower()
except (TypeError, AttributeError, ValueError):
log.warning('Could not convert %s to lowercase.', arg)
return arg
class CaseInsensitiveStr(str):
    """Extend ``str`` to use case-insensitive comparison and lookup."""

    def __init__(self, *args):
        """Cache the lowercase form of the string in the ``_lowered`` attribute."""
        self._lowered = _try_lower(self)

    def _compare(self, other, str_op):
        """Apply the ``str`` comparison `str_op` to the lowered forms of self and other."""
        return str_op(self._lowered, _try_lower(other))

    def __hash__(self):
        """Hash the lowercase form so strings differing only in case hash alike."""
        return str.__hash__(self._lowered)

    def __eq__(self, other):
        """Return true if self equals other, ignoring case."""
        return self._compare(other, str.__eq__)

    def __gt__(self, other):
        """Return true if self is greater than other, ignoring case."""
        return self._compare(other, str.__gt__)

    def __ge__(self, other):
        """Return true if self is greater than or equal to other, ignoring case."""
        return self._compare(other, str.__ge__)

    def __lt__(self, other):
        """Return true if self is less than other, ignoring case."""
        return self._compare(other, str.__lt__)

    def __le__(self, other):
        """Return true if self is less than or equal to other, ignoring case."""
        return self._compare(other, str.__le__)

    def __ne__(self, other):
        """Return true if self is not equal to other, ignoring case."""
        return self._compare(other, str.__ne__)
class CaseInsensitiveDict(dict):
    """Extend ``dict`` to use a case-insensitive key set."""

    def __init__(self, *args, **kwargs):
        """Create a dict whose keys are all converted to ``CaseInsensitiveStr``."""
        super(CaseInsensitiveDict, self).__init__(*args, **kwargs)
        self._keys_to_lower()

    def __eq__(self, other):
        """Return true if other is case-insensitive equal to self."""
        return super(CaseInsensitiveDict, self).__eq__(CaseInsensitiveDict(other))

    def __getitem__(self, key):
        """Return value from case-insensitive lookup of ``key``."""
        return super(CaseInsensitiveDict, self).__getitem__(CaseInsensitiveStr(key))

    def __setitem__(self, key, value):
        """Set value under the case-insensitive form of ``key``."""
        super(CaseInsensitiveDict, self).__setitem__(CaseInsensitiveStr(key), value)

    def __delitem__(self, key):
        """Delete value associated with case-insensitive lookup of ``key``."""
        return super(CaseInsensitiveDict, self).__delitem__(CaseInsensitiveStr(key))

    def __contains__(self, key):
        """Return true if key set includes case-insensitive ``key``."""
        return super(CaseInsensitiveDict, self).__contains__(CaseInsensitiveStr(key))

    def pop(self, key, *args, **kwargs):
        """Remove and return the value associated with case-insensitive ``key``.

        Parameters
        ----------
        key : str
            The key to remove, matched without regard to case
        default : optional
            Value returned when ``key`` is absent. May be given positionally (as with
            ``dict.pop``) or as the ``default`` keyword.

        Raises
        ------
        KeyError
            If ``key`` is absent and no default was supplied.

        """
        lookup = CaseInsensitiveStr(key)
        parent = super(CaseInsensitiveDict, self)
        # The default used to be silently dropped, turning ``pop(key, default)`` into a
        # KeyError for missing keys; forward it to ``dict.pop`` instead.
        if args:
            return parent.pop(lookup, *args)
        # ``dict.pop`` takes no keyword arguments, so translate ``default=`` by hand.
        if 'default' in kwargs:
            return parent.pop(lookup, kwargs['default'])
        return parent.pop(lookup)

    def _keys_to_lower(self):
        """Re-insert every existing key as a ``CaseInsensitiveStr``."""
        # Snapshot the keys first because the dict is modified while looping
        for k in list(self.keys()):
            val = super(CaseInsensitiveDict, self).__getitem__(k)
            super(CaseInsensitiveDict, self).__delitem__(k)
            self.__setitem__(CaseInsensitiveStr(k), val)
class TDSCatalog(object):
    """
    Parse information from a THREDDS Client Catalog.

    Attributes
    ----------
    catalog_url : str
        The url path of the catalog to parse.
    base_tds_url : str
        The top level server address
    catalog_name : str
        The ``name`` attribute of the catalog root element, or 'No name found'
    datasets : DatasetCollection[str, Dataset]
        A dictionary of :class:`Dataset` objects, whose keys are the name of the
        dataset's name
    services : List
        A list of :class:`SimpleService` listed in the catalog
    catalog_refs : DatasetCollection[str, CatalogRef]
        A dictionary of :class:`CatalogRef` objects whose keys are the name of the
        catalog ref title.
    metadata : dict
        Metadata accumulated from the catalog's ``metadata`` elements

    """

    def __init__(self, catalog_url):
        """
        Initialize the TDSCatalog object.

        Parameters
        ----------
        catalog_url : str
            The URL of a THREDDS client catalog

        """
        session = session_manager.create_session()

        # get catalog.xml file
        resp = session.get(catalog_url)
        resp.raise_for_status()

        # top level server url
        self.catalog_url = resp.url
        self.base_tds_url = _find_base_tds_url(self.catalog_url)

        # If we were given an HTML link, warn about it and try to fix to xml.
        # A response without a content-type header is treated as not being HTML.
        if 'html' in resp.headers.get('content-type', ''):
            import warnings
            new_url = self.catalog_url.replace('html', 'xml')
            warnings.warn('URL {} returned HTML. Changing to: {}'.format(self.catalog_url,
                                                                         new_url))
            self.catalog_url = new_url
            resp = session.get(self.catalog_url)
            resp.raise_for_status()

        # begin parsing the xml doc
        root = ET.fromstring(resp.content)
        self.catalog_name = root.attrib.get('name', 'No name found')

        self.datasets = DatasetCollection()
        self.services = []
        self.catalog_refs = DatasetCollection()
        self.metadata = {}
        self.ds_with_access_elements_to_process = []
        service_skip_count = 0
        service_skip = 0
        current_dataset = None
        previous_dataset = None
        for child in root.iter():
            tag_type = child.tag.split('}')[-1]
            if tag_type == 'dataset':
                current_dataset = child.attrib['name']
                self._process_dataset(child)

                if previous_dataset:
                    # see if the previously processed dataset has access elements as children
                    # if so, these datasets need to be processed specially when making
                    # access_urls
                    if self.datasets[previous_dataset].access_element_info:
                        self.ds_with_access_elements_to_process.append(previous_dataset)

                previous_dataset = current_dataset

            elif tag_type == 'access':
                self.datasets[current_dataset].add_access_element_info(child)
            elif tag_type == 'catalogRef':
                self._process_catalog_ref(child)
            elif (tag_type == 'metadata') or (tag_type == ''):
                self._process_metadata(child, tag_type)
            elif tag_type == 'service':
                if CaseInsensitiveStr(child.attrib['serviceType'])\
                        != CaseInsensitiveStr('Compound'):
                    # we do not want to process single services if they
                    # are already contained within a compound service, so
                    # we need to skip over those cases.
                    if service_skip_count >= service_skip:
                        self.services.append(SimpleService(child))
                        service_skip = 0
                        service_skip_count = 0
                    else:
                        service_skip_count += 1
                else:
                    self.services.append(CompoundService(child))
                    service_skip = self.services[-1].number_of_subservices
                    service_skip_count = 0

        # The loop only examines a dataset's access elements once the *next* dataset is
        # reached, so the final dataset in the document has to be checked here; otherwise
        # a trailing dataset that only has access elements would be discarded below.
        if previous_dataset and self.datasets[previous_dataset].access_element_info:
            self.ds_with_access_elements_to_process.append(previous_dataset)

        self._process_datasets()

    def __str__(self):
        """Return a string representation of the catalog name."""
        return str(self.catalog_name)

    def _process_dataset(self, element):
        """Create a :class:`Dataset` from `element` and store it under its name."""
        catalog_url = ''
        # Only a 'latest.xml' dataset needs the catalog URL, so it can be resolved
        if 'urlPath' in element.attrib:
            if element.attrib['urlPath'] == 'latest.xml':
                catalog_url = self.catalog_url

        ds = Dataset(element, catalog_url=catalog_url)
        self.datasets[ds.name] = ds

    def _process_catalog_ref(self, element):
        """Create a :class:`CatalogRef` from `element` and store it under its title."""
        catalog_ref = CatalogRef(self.catalog_url, element)
        self.catalog_refs[catalog_ref.title] = catalog_ref

    def _process_metadata(self, element, tag_type):
        """Merge the metadata held by `element` into ``self.metadata``."""
        if tag_type == '':
            log.warning('Trying empty tag type as metadata')
        self.metadata = TDSCatalogMetadata(element, self.metadata).metadata

    def _process_datasets(self):
        """Build access urls for usable datasets and drop the rest."""
        # Need to use list (of keys) because we modify the dict while iterating
        for dsName in list(self.datasets):
            # check to see if dataset needs to have access urls created, if not,
            # remove the dataset
            has_url_path = self.datasets[dsName].url_path is not None
            is_ds_with_access_elements_to_process = \
                dsName in self.ds_with_access_elements_to_process
            if has_url_path or is_ds_with_access_elements_to_process:
                self.datasets[dsName].make_access_urls(
                    self.base_tds_url, self.services, metadata=self.metadata)
            else:
                self.datasets.pop(dsName)

    @property
    def latest(self):
        """Get the latest dataset, if available.

        Raises
        ------
        AttributeError
            If none of the catalog's services is a resolver service.

        """
        for service in self.services:
            if service.is_resolver():
                latest_cat = self.catalog_url.replace('catalog.xml', 'latest.xml')
                return TDSCatalog(latest_cat).datasets[0]
        raise AttributeError('"latest" not available for this catalog')

    __repr__ = __str__
class CatalogRef(object):
    """
    An object for holding catalog references obtained from a THREDDS Client Catalog.

    Attributes
    ----------
    name : str
        The name of the :class:`CatalogRef` element
    href : str
        url to the :class:`CatalogRef`'s THREDDS Client Catalog
    title : str
        Title of the :class:`CatalogRef` element

    """

    def __init__(self, base_url, element_node):
        """
        Initialize the catalogRef object.

        Parameters
        ----------
        base_url : str
            URL to the base catalog that owns this reference
        element_node : :class:`~xml.etree.ElementTree.Element`
            An :class:`~xml.etree.ElementTree.Element` representing a catalogRef node

        """
        attrs = element_node.attrib
        self.title = attrs['{http://www.w3.org/1999/xlink}title']
        # The name is optional on the element; the title stands in when it is absent
        self.name = attrs.get('name', self.title)

        # The link may be relative, so resolve it against the owning catalog's URL
        self.href = urljoin(base_url, attrs['{http://www.w3.org/1999/xlink}href'])

    def __str__(self):
        """Return the title of the catalog reference as its string form."""
        return str(self.title)

    def follow(self):
        """Follow the catalog reference and return a new :class:`TDSCatalog`.

        Returns
        -------
        TDSCatalog
            The referenced catalog

        """
        return TDSCatalog(self.href)

    __repr__ = __str__
class Dataset(object):
    """
    An object for holding Datasets obtained from a THREDDS Client Catalog.

    Attributes
    ----------
    name : str
        The name of the :class:`Dataset` element
    url_path : str
        url to the accessible dataset
    access_urls : CaseInsensitiveDict[str, str]
        A dictionary of access urls whose keywords are the access service
        types defined in the catalog (for example, "OPENDAP", "NetcdfSubset",
        "WMS", etc.). Empty until :meth:`make_access_urls` has been called.

    """

    ncssServiceNames = (CaseInsensitiveStr('NetcdfSubset'), CaseInsensitiveStr('NetcdfServer'))

    def __init__(self, element_node, catalog_url=''):
        """Initialize the Dataset object.

        Parameters
        ----------
        element_node : :class:`~xml.etree.ElementTree.Element`
            An :class:`~xml.etree.ElementTree.Element` representing a Dataset node
        catalog_url : str
            The top level server url

        """
        self.name = element_node.attrib['name']
        if 'urlPath' in element_node.attrib:
            self.url_path = element_node.attrib['urlPath']
        else:
            self.url_path = None

        self.catalog_name = ''
        self.access_element_info = {}
        # Start with no access urls so the access methods report an unavailable service
        # with a ValueError rather than failing on a missing attribute.
        self.access_urls = CaseInsensitiveDict({})
        self._resolved = False
        self._resolverUrl = None

        # if latest.xml, resolve the latest url
        if self.url_path == 'latest.xml':
            if catalog_url != '':
                self._resolved = True
                self._resolverUrl = self.url_path
                self.url_path = self.resolve_url(catalog_url)
            else:
                log.warning('Must pass along the catalog URL to resolve '
                            'the latest.xml dataset!')

    def __str__(self):
        """Return a string representation of the dataset."""
        return str(self.name)

    def resolve_url(self, catalog_url):
        """Resolve the url of the dataset when reading latest.xml.

        Parameters
        ----------
        catalog_url : str
            The catalog url to be resolved

        Returns
        -------
        str or None
            The ``urlPath`` of the first dataset element in the resolver document that
            has one. ``None`` if `catalog_url` is empty or no such element exists.

        """
        if catalog_url == '':
            return None

        resolver_base = catalog_url.split('catalog.xml')[0]
        resolver_url = resolver_base + self.url_path
        resolver_xml = session_manager.urlopen(resolver_url)
        tree = ET.parse(resolver_xml)
        root = tree.getroot()
        self.catalog_name = root.attrib.get('name', 'No name found')

        # Stop at the first dataset element that carries a urlPath
        for child in root.iter():
            tag_type = child.tag.split('}')[-1]
            if tag_type == 'dataset' and 'urlPath' in child.attrib:
                return Dataset(child).url_path

        log.warning('no dataset url path found in latest.xml!')
        return None

    def make_access_urls(self, catalog_url, all_services, metadata=None):
        """Make fully qualified urls for the access methods enabled on the dataset.

        Parameters
        ----------
        catalog_url : str
            The top level server url
        all_services : List[SimpleService]
            list of :class:`SimpleService` objects associated with the dataset
        metadata : dict, optional
            Metadata from the :class:`TDSCatalog`. When omitted, no default service
            name is available and only the dataset's access elements are used.

        """
        # The declared default of None would otherwise fail on the ``.get`` below
        if metadata is None:
            metadata = {}

        all_service_dict = CaseInsensitiveDict({})
        for service in all_services:
            all_service_dict[service.name] = service
            if isinstance(service, CompoundService):
                for subservice in service.services:
                    all_service_dict[subservice.name] = subservice

        service_name = metadata.get('serviceName', None)

        access_urls = CaseInsensitiveDict({})
        server_url = _find_base_tds_url(catalog_url)

        # process access urls for datasets that reference top
        # level catalog services (individual or compound service
        # types).
        if service_name in all_service_dict:
            service = all_service_dict[service_name]
            if service.service_type != 'Resolver':
                # if service is a CompoundService, create access url
                # for each SimpleService
                if isinstance(service, CompoundService):
                    for subservice in service.services:
                        server_base = urljoin(server_url, subservice.base)
                        access_urls[subservice.service_type] = urljoin(server_base,
                                                                       self.url_path)
                else:
                    server_base = urljoin(server_url, service.base)
                    access_urls[service.service_type] = urljoin(server_base, self.url_path)

        # process access children of dataset elements
        for service_type in self.access_element_info:
            url_path = self.access_element_info[service_type]
            if service_type in all_service_dict:
                server_base = urljoin(server_url, all_service_dict[service_type].base)
                access_urls[service_type] = urljoin(server_base, url_path)

        self.access_urls = access_urls

    def add_access_element_info(self, access_element):
        """Record the service name and url path of a catalog ``access`` element."""
        service_name = access_element.attrib['serviceName']
        url_path = access_element.attrib['urlPath']
        self.access_element_info[service_name] = url_path

    def download(self, filename=None):
        """Download the dataset to a local file.

        Parameters
        ----------
        filename : str, optional
            The full path to which the dataset will be saved. Defaults to the name of
            the dataset.

        """
        if filename is None:
            filename = self.name
        with self.remote_open() as infile:
            with open(filename, 'wb') as outfile:
                outfile.write(infile.read())

    def remote_open(self):
        """Open the remote dataset for random access.

        Get a file-like object for reading from the remote dataset, providing random access,
        similar to a local file.

        Returns
        -------
        A random access, file-like object

        """
        return self.access_with_service('HTTPServer')

    def remote_access(self, service=None, use_xarray=None):
        """Access the remote dataset.

        Open the remote dataset and get a netCDF4-compatible `Dataset` object providing
        index-based subsetting capabilities.

        Parameters
        ----------
        service : str, optional
            The name of the service to use for access to the dataset, either
            'CdmRemote' or 'OPENDAP'. Defaults to 'CdmRemote' when the dataset offers
            it, otherwise 'OPENDAP'.
        use_xarray : bool, optional
            If true, open the dataset through xarray instead of the netCDF4-like classes.

        Returns
        -------
        Dataset
            Object for netCDF4-like access to the dataset

        Raises
        ------
        ValueError
            If `service` is neither 'CdmRemote' nor 'OPENDAP'.

        """
        if service is None:
            service = 'CdmRemote' if 'CdmRemote' in self.access_urls else 'OPENDAP'

        if service not in (CaseInsensitiveStr('CdmRemote'), CaseInsensitiveStr('OPENDAP')):
            raise ValueError(service + ' is not a valid service for remote_access')

        return self.access_with_service(service, use_xarray)

    def subset(self, service=None):
        """Subset the dataset.

        Open the remote dataset and get a client for talking to ``service``.

        Parameters
        ----------
        service : str, optional
            The name of the service for subsetting the dataset. Defaults to 'NetcdfSubset'
            or 'NetcdfServer', in that order, depending on the services listed in the
            catalog.

        Returns
        -------
        a client for communicating using ``service``

        Raises
        ------
        RuntimeError
            If no service was given and the dataset offers no subset service.
        ValueError
            If `service` is given but is not one of the subset services.

        """
        if service is None:
            for serviceName in self.ncssServiceNames:
                if serviceName in self.access_urls:
                    service = serviceName
                    break
            else:
                raise RuntimeError('Subset access is not available for this dataset.')
        elif service not in self.ncssServiceNames:
            raise ValueError(service + ' is not a valid service for subset. Options are: '
                             + ', '.join(self.ncssServiceNames))

        return self.access_with_service(service)

    def access_with_service(self, service, use_xarray=None):
        """Access the dataset using a particular service.

        Return an Python object capable of communicating with the server using the particular
        service. For instance, for 'HTTPServer' this is a file-like object capable of
        HTTP communication; for OPENDAP this is a netCDF4 dataset.

        Parameters
        ----------
        service : str
            The name of the service for accessing the dataset
        use_xarray : bool, optional
            If true, use xarray to open 'CdmRemote' and 'OPENDAP' datasets. Ignored for
            the other services.

        Returns
        -------
            An instance appropriate for communicating using ``service``.

        Raises
        ------
        ValueError
            If `service` is not supported, or is not available for this dataset.
        ImportError
            If the optional package needed for `service` is not installed.

        """
        service = CaseInsensitiveStr(service)
        if service == 'CdmRemote':
            if use_xarray:
                from .cdmr.xarray_support import CDMRemoteStore
                try:
                    import xarray as xr
                    provider = lambda url: xr.open_dataset(CDMRemoteStore(url))  # noqa: E731
                except ImportError:
                    raise ImportError('CdmRemote access needs xarray to be installed.')
            else:
                from .cdmr import Dataset as CDMRDataset
                provider = CDMRDataset
        elif service == 'OPENDAP':
            if use_xarray:
                try:
                    import xarray as xr
                    provider = xr.open_dataset
                except ImportError:
                    raise ImportError('xarray needs to be installed if `use_xarray` is True.')
            else:
                try:
                    from netCDF4 import Dataset as NC4Dataset
                    provider = NC4Dataset
                except ImportError:
                    raise ImportError('OPENDAP access needs netCDF4-python to be installed.')
        elif service in self.ncssServiceNames:
            from .ncss import NCSS
            provider = NCSS
        elif service == 'HTTPServer':
            provider = session_manager.urlopen
        else:
            raise ValueError(service + ' is not an access method supported by Siphon')

        try:
            return provider(self.access_urls[service])
        except KeyError:
            raise ValueError(service + ' is not available for this dataset')

    __repr__ = __str__
class SimpleService(object):
    """Hold information about an access service enabled on a dataset.

    Attributes
    ----------
    name : str
        The name of the service
    service_type : str
        The service type (i.e. "OPENDAP", "NetcdfSubset", "WMS", etc.)
    base : str
        The ``base`` attribute of the service element
    access_urls : dict[str, str]
        A dictionary of access urls whose keywords are the access service
        types defined in the catalog (for example, "OPENDAP", "NetcdfSubset",
        "WMS", etc.)

    """

    def __init__(self, service_node):
        """Initialize the SimpleService object.

        Parameters
        ----------
        service_node : :class:`~xml.etree.ElementTree.Element`
            An :class:`~xml.etree.ElementTree.Element` representing a service node

        """
        attrs = service_node.attrib
        self.name = attrs['name']
        # Stored case-insensitively so comparisons against type names ignore case
        self.service_type = CaseInsensitiveStr(attrs['serviceType'])
        self.base = attrs['base']
        self.access_urls = {}

    def is_resolver(self):
        """Return whether the service is a resolver service."""
        resolver = self.service_type == 'Resolver'
        return resolver
class CompoundService(object):
    """Hold information about compound services.

    Attributes
    ----------
    name : str
        The name of the compound service
    service_type : str
        The service type (for this object, service type will always be
        "COMPOUND")
    base : str
        The ``base`` attribute of the compound service element
    services : list[SimpleService]
        A list of :class:`SimpleService` objects
    number_of_subservices : int
        The number of entries in ``services``

    """

    def __init__(self, service_node):
        """Initialize a :class:`CompoundService` object.

        Parameters
        ----------
        service_node : :class:`~xml.etree.ElementTree.Element`
            An :class:`~xml.etree.ElementTree.Element` representing a compound service node

        """
        attrs = service_node.attrib
        self.name = attrs['name']
        self.service_type = CaseInsensitiveStr(attrs['serviceType'])
        self.base = attrs['base']

        # Every direct child of the compound node is wrapped as a simple service
        self.services = [SimpleService(child) for child in service_node]
        self.number_of_subservices = len(self.services)

    def is_resolver(self):
        """Return whether the service is a resolver service.

        For a compound service, this is always False because it will never be
        a resolver.
        """
        return False
def _find_base_tds_url(catalog_url):
"""Identify the base URL of the THREDDS server from the catalog URL.
Will retain URL scheme, host, port and username/password when present.
"""
url_components = urlparse(catalog_url)
if url_components.path:
return catalog_url.split(url_components.path)[0]
else:
return catalog_url
def get_latest_access_url(catalog_url, access_method):
    """Get the data access url to the latest data using a specified access method.

    These are available for a data available from a top level dataset catalog (url).
    Currently only supports the existence of one "latest" dataset.

    Parameters
    ----------
    catalog_url : str
        The URL of a top level data catalog
    access_method : str
        desired data access method (i.e. "OPENDAP", "NetcdfSubset", "WMS", etc)

    Returns
    -------
    access_url : str
        Data access URL to be used to access the latest data available from a
        given catalog using the specified `access_method`. Typically a single string,
        but not always.

    """
    # Parse the catalog, follow its resolver to the latest dataset, then look up the
    # url registered for the requested access method.
    latest_dataset = TDSCatalog(catalog_url).latest
    return latest_dataset.access_urls[access_method]