Source code for siphon.catalog

# Copyright (c) 2013-2017 Siphon Contributors.
# Distributed under the terms of the BSD 3-Clause License.
# SPDX-License-Identifier: BSD-3-Clause
"""
Code to support reading and parsing catalog files from a THREDDS Data Server (TDS).

They help identifying the latest dataset and finding proper URLs to access the data.
"""

from collections import OrderedDict
from datetime import datetime
import logging
import re
import xml.etree.ElementTree as ET
try:
    from urlparse import urljoin, urlparse
except ImportError:
    # Python 3
    from urllib.parse import urljoin, urlparse

from .http_util import session_manager
from .metadata import TDSCatalogMetadata

logging.basicConfig(level=logging.ERROR)
log = logging.getLogger(__name__)


[docs]class IndexableMapping(OrderedDict): """Extend ``OrderedDict`` to allow index-based access to values.""" def __getitem__(self, item): """Return an item either by index or name.""" try: item + '' # Raises if item not a string return super(IndexableMapping, self).__getitem__(item) except TypeError: return list(self.values())[item]
[docs]class DatasetCollection(IndexableMapping): """Extend ``IndexableMapping`` to allow datetime-based filter queries.""" default_regex = re.compile(r'(?P<year>\d{4})(?P<month>[01]\d)(?P<day>[0123]\d)_' r'(?P<hour>[012]\d)(?P<minute>[0-5]\d)') def _get_datasets_with_times(self, regex): # Set the default regex if we don't have one if regex is None: regex = self.default_regex else: regex = re.compile(regex) # Loop over the collection looking for keys that match our regex found_date = False for ds in self: match = regex.search(ds) # If we find one, make a datetime and yield it along with the value if match: found_date = True date_parts = match.groupdict() dt = datetime(int(date_parts.get('year', 0)), int(date_parts.get('month', 0)), int(date_parts.get('day', 0)), int(date_parts.get('hour', 0)), int(date_parts.get('minute', 0)), int(date_parts.get('second', 0)), int(date_parts.get('microsecond', 0))) yield dt, self[ds] # If we never found any keys that match, we should let the user know that rather # than have it be the same as if nothing matched filters if not found_date: raise ValueError('No datasets with times found.')
[docs] def filter_time_nearest(self, time, regex=None): """Filter keys for an item closest to the desired time. Loops over all keys in the collection and uses `regex` to extract and build `datetime`s. The collection of `datetime`s is compared to `start` and the value that has a `datetime` closest to that requested is returned.If none of the keys in the collection match the regex, indicating that the keys are not date/time-based, a ``ValueError`` is raised. Parameters ---------- time : ``datetime.datetime`` The desired time regex : str, optional The regular expression to use to extract date/time information from the key. If given, this should contain named groups: 'year', 'month', 'day', 'hour', 'minute', 'second', and 'microsecond', as appropriate. When a match is found, any of those groups missing from the pattern will be assigned a value of 0. The default pattern looks for patterns like: 20171118_2356. Returns ------- The value with a time closest to that desired """ return min(self._get_datasets_with_times(regex), key=lambda i: abs((i[0] - time).total_seconds()))[-1]
[docs] def filter_time_range(self, start, end, regex=None): """Filter keys for all items within the desired time range. Loops over all keys in the collection and uses `regex` to extract and build `datetime`s. From the collection of `datetime`s, all values within `start` and `end` (inclusive) are returned. If none of the keys in the collection match the regex, indicating that the keys are not date/time-based, a ``ValueError`` is raised. Parameters ---------- start : ``datetime.datetime`` The start of the desired time range, inclusive end : ``datetime.datetime`` The end of the desired time range, inclusive regex : str, optional The regular expression to use to extract date/time information from the key. If given, this should contain named groups: 'year', 'month', 'day', 'hour', 'minute', 'second', and 'microsecond', as appropriate. When a match is found, any of those groups missing from the pattern will be assigned a value of 0. The default pattern looks for patterns like: 20171118_2356. Returns ------- All values corresponding to times within the specified range """ return [item[-1] for item in self._get_datasets_with_times(regex) if start <= item[0] <= end]
def __str__(self): """Return a string representation of the collection.""" return str(list(self)) __repr__ = __str__
def _try_lower(arg): try: arg = arg.lower() except (TypeError, AttributeError, ValueError): log.warning('Could not convert %s to lowercase.', arg) return arg
[docs]class CaseInsensitiveStr(str): """Extend ``str`` to use case-insensitive comparison and lookup."""
[docs] def __init__(self, *args): """Create str with a _lowered property.""" self._lowered = _try_lower(self)
def __hash__(self): """Hash str using _lowered property.""" return str.__hash__(self._lowered) def __eq__(self, other): """Return true if other is case-insensitive equal to self.""" return str.__eq__(self._lowered, _try_lower(other)) def __gt__(self, other): """Return true if other is case-insensitive greater than self.""" return str.__gt__(self._lowered, _try_lower(other)) def __ge__(self, other): """Return true if other is case-insensitive greater than or equal to self.""" return str.__ge__(self._lowered, _try_lower(other)) def __lt__(self, other): """Return true if other is case-insensitive less than self.""" return str.__lt__(self._lowered, _try_lower(other)) def __le__(self, other): """Return true if other is case-insensitive less than or equal to to self.""" return str.__le__(self._lowered, _try_lower(other)) def __ne__(self, other): """Return true if other is case-insensitive unequal to self.""" return str.__ne__(self._lowered, _try_lower(other))
[docs]class CaseInsensitiveDict(dict): """Extend ``dict`` to use a case-insensitive key set."""
[docs] def __init__(self, *args, **kwargs): """Create a dict with a set of lowercase keys.""" super(CaseInsensitiveDict, self).__init__(*args, **kwargs) self._keys_to_lower()
def __eq__(self, other): """Return true if other is case-insensitive equal to self.""" return super(CaseInsensitiveDict, self).__eq__(CaseInsensitiveDict(other)) def __getitem__(self, key): """Return value from case-insensitive lookup of ``key``.""" return super(CaseInsensitiveDict, self).__getitem__(CaseInsensitiveStr(key)) def __setitem__(self, key, value): """Set value with lowercase ``key``.""" super(CaseInsensitiveDict, self).__setitem__(CaseInsensitiveStr(key), value) def __delitem__(self, key): """Delete value associated with case-insensitive lookup of ``key``.""" return super(CaseInsensitiveDict, self).__delitem__(CaseInsensitiveStr(key)) def __contains__(self, key): """Return true if key set includes case-insensitive ``key``.""" return super(CaseInsensitiveDict, self).__contains__(CaseInsensitiveStr(key))
[docs] def pop(self, key, *args, **kwargs): """Remove and return the value associated with case-insensitive ``key``.""" return super(CaseInsensitiveDict, self).pop(CaseInsensitiveStr(key))
def _keys_to_lower(self): """Convert key set to lowercase.""" for k in list(self.keys()): val = super(CaseInsensitiveDict, self).__getitem__(k) super(CaseInsensitiveDict, self).__delitem__(k) self.__setitem__(CaseInsensitiveStr(k), val)
[docs]class TDSCatalog(object): """ Parse information from a THREDDS Client Catalog. Attributes ---------- catalog_url : str The url path of the catalog to parse. base_tds_url : str The top level server address datasets : DatasetCollection[str, Dataset] A dictionary of :class:`Dataset` objects, whose keys are the name of the dataset's name services : List A list of :class:`SimpleService` listed in the catalog catalog_refs : DatasetCollection[str, CatalogRef] A dictionary of :class:`CatalogRef` objects whose keys are the name of the catalog ref title. """
[docs] def __init__(self, catalog_url): """ Initialize the TDSCatalog object. Parameters ---------- catalog_url : str The URL of a THREDDS client catalog """ session = session_manager.create_session() # get catalog.xml file resp = session.get(catalog_url) resp.raise_for_status() # top level server url self.catalog_url = resp.url self.base_tds_url = _find_base_tds_url(self.catalog_url) # If we were given an HTML link, warn about it and try to fix to xml if 'html' in resp.headers['content-type']: import warnings new_url = self.catalog_url.replace('html', 'xml') warnings.warn('URL {} returned HTML. Changing to: {}'.format(self.catalog_url, new_url)) self.catalog_url = new_url resp = session.get(self.catalog_url) resp.raise_for_status() # begin parsing the xml doc root = ET.fromstring(resp.content) self.catalog_name = root.attrib.get('name', 'No name found') self.datasets = DatasetCollection() self.services = [] self.catalog_refs = DatasetCollection() self.metadata = {} self.ds_with_access_elements_to_process = [] service_skip_count = 0 service_skip = 0 current_dataset = None previous_dataset = None for child in root.iter(): tag_type = child.tag.split('}')[-1] if tag_type == 'dataset': current_dataset = child.attrib['name'] self._process_dataset(child) if previous_dataset: # see if the previously processed dataset has access elements as children # if so, these datasets need to be processed specially when making # access_urls if self.datasets[previous_dataset].access_element_info: self.ds_with_access_elements_to_process.append(previous_dataset) previous_dataset = current_dataset elif tag_type == 'access': self.datasets[current_dataset].add_access_element_info(child) elif tag_type == 'catalogRef': self._process_catalog_ref(child) elif (tag_type == 'metadata') or (tag_type == ''): self._process_metadata(child, tag_type) elif tag_type == 'service': if CaseInsensitiveStr(child.attrib['serviceType'])\ != CaseInsensitiveStr('Compound'): # we do not want to process single services if they # are already contained within a compound service, so # we need to skip over those cases. if service_skip_count >= service_skip: self.services.append(SimpleService(child)) service_skip = 0 service_skip_count = 0 else: service_skip_count += 1 else: self.services.append(CompoundService(child)) service_skip = self.services[-1].number_of_subservices service_skip_count = 0 self._process_datasets()
def __str__(self): """Return a string representation of the catalog name.""" return str(self.catalog_name) def _process_dataset(self, element): catalog_url = '' if 'urlPath' in element.attrib: if element.attrib['urlPath'] == 'latest.xml': catalog_url = self.catalog_url ds = Dataset(element, catalog_url=catalog_url) self.datasets[ds.name] = ds def _process_catalog_ref(self, element): catalog_ref = CatalogRef(self.catalog_url, element) self.catalog_refs[catalog_ref.title] = catalog_ref def _process_metadata(self, element, tag_type): if tag_type == '': log.warning('Trying empty tag type as metadata') self.metadata = TDSCatalogMetadata(element, self.metadata).metadata def _process_datasets(self): # Need to use list (of keys) because we modify the dict while iterating for dsName in list(self.datasets): # check to see if dataset needs to have access urls created, if not, # remove the dataset has_url_path = self.datasets[dsName].url_path is not None is_ds_with_access_elements_to_process = \ dsName in self.ds_with_access_elements_to_process if has_url_path or is_ds_with_access_elements_to_process: self.datasets[dsName].make_access_urls( self.base_tds_url, self.services, metadata=self.metadata) else: self.datasets.pop(dsName) @property def latest(self): """Get the latest dataset, if available.""" for service in self.services: if service.is_resolver(): latest_cat = self.catalog_url.replace('catalog.xml', 'latest.xml') return TDSCatalog(latest_cat).datasets[0] raise AttributeError('"latest" not available for this catalog') __repr__ = __str__
[docs]class CatalogRef(object): """ An object for holding catalog references obtained from a THREDDS Client Catalog. Attributes ---------- name : str The name of the :class:`CatalogRef` element href : str url to the :class:`CatalogRef`'s THREDDS Client Catalog title : str Title of the :class:`CatalogRef` element """
[docs] def __init__(self, base_url, element_node): """ Initialize the catalogRef object. Parameters ---------- base_url : str URL to the base catalog that owns this reference element_node : :class:`~xml.etree.ElementTree.Element` An :class:`~xml.etree.ElementTree.Element` representing a catalogRef node """ self.title = element_node.attrib['{http://www.w3.org/1999/xlink}title'] self.name = element_node.attrib.get('name', self.title) # Resolve relative URLs href = element_node.attrib['{http://www.w3.org/1999/xlink}href'] self.href = urljoin(base_url, href)
def __str__(self): """Return a string representation of the catalog reference.""" return str(self.title)
[docs] def follow(self): """Follow the catalog reference and return a new :class:`TDSCatalog`. Returns ------- TDSCatalog The referenced catalog """ return TDSCatalog(self.href)
__repr__ = __str__
[docs]class Dataset(object): """ An object for holding Datasets obtained from a THREDDS Client Catalog. Attributes ---------- name : str The name of the :class:`Dataset` element url_path : str url to the accessible dataset access_urls : CaseInsensitiveDict[str, str] A dictionary of access urls whose keywords are the access service types defined in the catalog (for example, "OPENDAP", "NetcdfSubset", "WMS", etc. """ ncssServiceNames = (CaseInsensitiveStr('NetcdfSubset'), CaseInsensitiveStr('NetcdfServer'))
[docs] def __init__(self, element_node, catalog_url=''): """Initialize the Dataset object. Parameters ---------- element_node : :class:`~xml.etree.ElementTree.Element` An :class:`~xml.etree.ElementTree.Element` representing a Dataset node catalog_url : str The top level server url """ self.name = element_node.attrib['name'] if 'urlPath' in element_node.attrib: self.url_path = element_node.attrib['urlPath'] else: self.url_path = None self.catalog_name = '' self.access_element_info = {} self._resolved = False self._resolverUrl = None # if latest.xml, resolve the latest url if self.url_path == 'latest.xml': if catalog_url != '': self._resolved = True self._resolverUrl = self.url_path self.url_path = self.resolve_url(catalog_url) else: log.warning('Must pass along the catalog URL to resolve ' 'the latest.xml dataset!')
def __str__(self): """Return a string representation of the dataset.""" return str(self.name)
[docs] def resolve_url(self, catalog_url): """Resolve the url of the dataset when reading latest.xml. Parameters ---------- catalog_url : str The catalog url to be resolved """ if catalog_url != '': resolver_base = catalog_url.split('catalog.xml')[0] resolver_url = resolver_base + self.url_path resolver_xml = session_manager.urlopen(resolver_url) tree = ET.parse(resolver_xml) root = tree.getroot() if 'name' in root.attrib: self.catalog_name = root.attrib['name'] else: self.catalog_name = 'No name found' resolved_url = '' found = False for child in root.iter(): if not found: tag_type = child.tag.split('}')[-1] if tag_type == 'dataset': if 'urlPath' in child.attrib: ds = Dataset(child) resolved_url = ds.url_path found = True if found: return resolved_url else: log.warning('no dataset url path found in latest.xml!')
[docs] def make_access_urls(self, catalog_url, all_services, metadata=None): """Make fully qualified urls for the access methods enabled on the dataset. Parameters ---------- catalog_url : str The top level server url all_services : List[SimpleService] list of :class:`SimpleService` objects associated with the dataset metadata : dict Metadata from the :class:`TDSCatalog` """ all_service_dict = CaseInsensitiveDict({}) for service in all_services: all_service_dict[service.name] = service if isinstance(service, CompoundService): for subservice in service.services: all_service_dict[subservice.name] = subservice service_name = metadata.get('serviceName', None) access_urls = CaseInsensitiveDict({}) server_url = _find_base_tds_url(catalog_url) # process access urls for datasets that reference top # level catalog services (individual or compound service # types). if service_name in all_service_dict: service = all_service_dict[service_name] if service.service_type != 'Resolver': # if service is a CompoundService, create access url # for each SimpleService if isinstance(service, CompoundService): for subservice in service.services: server_base = urljoin(server_url, subservice.base) access_urls[subservice.service_type] = urljoin(server_base, self.url_path) else: server_base = urljoin(server_url, service.base) access_urls[service.service_type] = urljoin(server_base, self.url_path) # process access children of dataset elements for service_type in self.access_element_info: url_path = self.access_element_info[service_type] if service_type in all_service_dict: server_base = urljoin(server_url, all_service_dict[service_type].base) access_urls[service_type] = urljoin(server_base, url_path) self.access_urls = access_urls
[docs] def add_access_element_info(self, access_element): """Create an access method from a catalog element.""" service_name = access_element.attrib['serviceName'] url_path = access_element.attrib['urlPath'] self.access_element_info[service_name] = url_path
[docs] def download(self, filename=None): """Download the dataset to a local file. Parameters ---------- filename : str, optional The full path to which the dataset will be saved """ if filename is None: filename = self.name with self.remote_open() as infile: with open(filename, 'wb') as outfile: outfile.write(infile.read())
[docs] def remote_open(self): """Open the remote dataset for random access. Get a file-like object for reading from the remote dataset, providing random access, similar to a local file. Returns ------- A random access, file-like object """ return self.access_with_service('HTTPServer')
[docs] def remote_access(self, service=None, use_xarray=None): """Access the remote dataset. Open the remote dataset and get a netCDF4-compatible `Dataset` object providing index-based subsetting capabilities. Parameters ---------- service : str, optional The name of the service to use for access to the dataset, either 'CdmRemote' or 'OPENDAP'. Defaults to 'CdmRemote'. Returns ------- Dataset Object for netCDF4-like access to the dataset """ if service is None: service = 'CdmRemote' if 'CdmRemote' in self.access_urls else 'OPENDAP' if service not in (CaseInsensitiveStr('CdmRemote'), CaseInsensitiveStr('OPENDAP')): raise ValueError(service + ' is not a valid service for remote_access') return self.access_with_service(service, use_xarray)
[docs] def subset(self, service=None): """Subset the dataset. Open the remote dataset and get a client for talking to ``service``. Parameters ---------- service : str, optional The name of the service for subsetting the dataset. Defaults to 'NetcdfSubset' or 'NetcdfServer', in that order, depending on the services listed in the catalog. Returns ------- a client for communicating using ``service`` """ if service is None: for serviceName in self.ncssServiceNames: if serviceName in self.access_urls: service = serviceName break else: raise RuntimeError('Subset access is not available for this dataset.') elif service not in self.ncssServiceNames: raise ValueError(service + ' is not a valid service for subset. Options are: ' + ', '.join(self.ncssServiceNames)) return self.access_with_service(service)
[docs] def access_with_service(self, service, use_xarray=None): """Access the dataset using a particular service. Return an Python object capable of communicating with the server using the particular service. For instance, for 'HTTPServer' this is a file-like object capable of HTTP communication; for OPENDAP this is a netCDF4 dataset. Parameters ---------- service : str The name of the service for accessing the dataset Returns ------- An instance appropriate for communicating using ``service``. """ service = CaseInsensitiveStr(service) if service == 'CdmRemote': if use_xarray: from .cdmr.xarray_support import CDMRemoteStore try: import xarray as xr provider = lambda url: xr.open_dataset(CDMRemoteStore(url)) # noqa: E731 except ImportError: raise ImportError('CdmRemote access needs xarray to be installed.') else: from .cdmr import Dataset as CDMRDataset provider = CDMRDataset elif service == 'OPENDAP': if use_xarray: try: import xarray as xr provider = xr.open_dataset except ImportError: raise ImportError('xarray to be installed if `use_xarray` is True.') else: try: from netCDF4 import Dataset as NC4Dataset provider = NC4Dataset except ImportError: raise ImportError('OPENDAP access needs netCDF4-python to be installed.') elif service in self.ncssServiceNames: from .ncss import NCSS provider = NCSS elif service == 'HTTPServer': provider = session_manager.urlopen else: raise ValueError(service + ' is not an access method supported by Siphon') try: return provider(self.access_urls[service]) except KeyError: raise ValueError(service + ' is not available for this dataset')
__repr__ = __str__
[docs]class SimpleService(object): """Hold information about an access service enabled on a dataset. Attributes ---------- name : str The name of the service service_type : str The service type (i.e. "OPENDAP", "NetcdfSubset", "WMS", etc.) access_urls : dict[str, str] A dictionary of access urls whose keywords are the access service types defined in the catalog (for example, "OPENDAP", "NetcdfSubset", "WMS", etc.) """
[docs] def __init__(self, service_node): """Initialize the Dataset object. Parameters ---------- service_node : :class:`~xml.etree.ElementTree.Element` An :class:`~xml.etree.ElementTree.Element` representing a service node """ self.name = service_node.attrib['name'] self.service_type = CaseInsensitiveStr(service_node.attrib['serviceType']) self.base = service_node.attrib['base'] self.access_urls = {}
[docs] def is_resolver(self): """Return whether the service is a resolver service.""" return self.service_type == 'Resolver'
[docs]class CompoundService(object): """Hold information about compound services. Attributes ---------- name : str The name of the compound service service_type : str The service type (for this object, service type will always be "COMPOUND") services : list[SimpleService] A list of :class:`SimpleService` objects """
[docs] def __init__(self, service_node): """Initialize a :class:`CompoundService` object. Parameters ---------- service_node : :class:`~xml.etree.ElementTree.Element` An :class:`~xml.etree.ElementTree.Element` representing a compound service node """ self.name = service_node.attrib['name'] self.service_type = CaseInsensitiveStr(service_node.attrib['serviceType']) self.base = service_node.attrib['base'] services = [] subservices = 0 for child in list(service_node): services.append(SimpleService(child)) subservices += 1 self.services = services self.number_of_subservices = subservices
[docs] def is_resolver(self): """Return whether the service is a resolver service. For a compound service, this is always False because it will never be a resolver. """ return False
def _find_base_tds_url(catalog_url): """Identify the base URL of the THREDDS server from the catalog URL. Will retain URL scheme, host, port and username/password when present. """ url_components = urlparse(catalog_url) if url_components.path: return catalog_url.split(url_components.path)[0] else: return catalog_url
[docs]def get_latest_access_url(catalog_url, access_method): """Get the data access url to the latest data using a specified access method. These are available for a data available from a top level dataset catalog (url). Currently only supports the existence of one "latest" dataset. Parameters ---------- catalog_url : str The URL of a top level data catalog access_method : str desired data access method (i.e. "OPENDAP", "NetcdfSubset", "WMS", etc) Returns ------- access_url : str Data access URL to be used to access the latest data available from a given catalog using the specified `access_method`. Typically a single string, but not always. """ return TDSCatalog(catalog_url).latest.access_urls[access_method]