# Source code for siphon.metadata

# Copyright (c) 2013-2015 Siphon Contributors.
# Distributed under the terms of the BSD 3-Clause License.
# SPDX-License-Identifier: BSD-3-Clause
"""Helps support reading and parsing metadata elements from a TDS client catalog."""

from __future__ import print_function

import logging

# NOTE(review): basicConfig() runs at import time and configures the root
# logger when no handlers are installed yet -- confirm this is intended for a
# library module rather than left to the application.
logging.basicConfig(level=logging.ERROR)
# Module-level logger; the handlers below report schema problems through it.
log = logging.getLogger(__name__)

# Attribute keys for the XLink ``href`` and ``title`` attributes, written as
# "{namespace-uri}localname" so they can be looked up in ``element.attrib``.
xlink_href_attr = '{http://www.w3.org/1999/xlink}href'
xlink_title_attr = '{http://www.w3.org/1999/xlink}title'


class _SimpleTypes(object):
    """Check values of the simple types used in THREDDS catalog metadata.

    Each ``handle_<type>`` method compares a value against the accepted values
    for that type, logs a warning (rather than raising) when it is not one of
    them, and returns the value as found wrapped in a single-entry dict.
    """

    def __init__(self):
        """Build the table of accepted values, keyed by simple type name."""
        self._valid = {'dataFormat': self._load_valid_data_format_types(),
                       'upOrDown': self._load_valid_up_or_down(),
                       'dataType': self._load_valid_data_types()}

    @staticmethod
    def _load_valid_data_types():
        """Return the accepted ``dataType`` values, in lower case."""
        return ['grid',
                'image',
                'point',
                'radial',
                'station',
                'swath',
                'trajectory']

    @staticmethod
    def _load_valid_data_format_types():
        """Return the accepted ``dataFormat`` values.

        The result is the schema enumeration followed by every MIME type
        known to the standard-library :mod:`mimetypes` module.
        """
        import mimetypes
        valid = ['BUFR',
                 'ESML',
                 'GEMPAK',
                 'GINI',
                 'GRIB-1',
                 'GRIB-2',
                 'HDF4',
                 'HDF5',
                 'McIDAS-AREA',
                 'NcML',
                 'NetCDF',
                 'NetCDF-4',
                 'NEXRAD2',
                 'NIDS',
                 'image/gif',
                 'image/jpeg',
                 'image/tiff',
                 'text/csv',
                 'text/html',
                 'text/plain',
                 'text/tab-separated-values',
                 'text/xml',
                 'video/mpeg',
                 'video/quicktime',
                 'video/realtime']

        # The schema's "mimeType" member accepts any valid MIME type; the
        # mimetypes table is used as the list of known ones.
        valid.extend(mimetypes.types_map.values())
        return valid

    @staticmethod
    def _load_valid_up_or_down():
        """Return the accepted ``upOrDown`` values."""
        return ['up', 'down']

    def handle_upOrDown(self, element):  # noqa
        """Check the attributes of ``element`` against the ``upOrDown`` type.

        Schema::

            name="upOrDown"
              <xsd:restriction base="xsd:token">
                <xsd:enumeration value="up"/>
                <xsd:enumeration value="down"/>
              </xsd:restriction>

        Every attribute on ``element`` is checked and a warning is logged for
        each value that is neither ``up`` nor ``down``.

        Returns
        -------
        dict
            ``{attribute_name: value}`` for the last attribute visited, or an
            empty dict when the element carries no attributes.
        """
        type_name = 'upOrDown'
        valid = self._valid[type_name]
        # Start from an empty result so an attribute-less element returns {}
        # instead of failing on names that were never bound by the loop.
        parsed = {}
        for attr, val in element.attrib.items():
            if val not in valid:
                log.warning('Value %s not valid for type %s: must be %s',
                            val, type_name, valid)
            parsed = {attr: val}
        return parsed

    def handle_dataFormat(self, element):  # noqa
        """Check the text of ``element`` against the ``dataFormat`` type.

        Schema (abridged)::

            name="dataFormatTypes"
              <xsd:union memberTypes="xsd:token mimeType">
                <xsd:simpleType>
                  <xsd:restriction base="xsd:token">
                    BUFR, ESML, GEMPAK, GINI, GRIB-1, GRIB-2, HDF4, HDF5,
                    McIDAS-AREA, NcML, NetCDF, NetCDF-4, NEXRAD2, NIDS,
                    image/gif, image/jpeg, image/tiff, text/csv, text/html,
                    text/plain, text/tab-separated-values, text/xml,
                    video/mpeg, video/quicktime, video/realtime
                  </xsd:restriction>
                </xsd:simpleType>
              </xsd:union>

            name="mimeType"
              <xsd:restriction base="xsd:token">
                any valid mime type
                (see http://www.iana.org/assignments/media-types/)
              </xsd:restriction>

        To decide whether a MIME type is valid, the value is checked against
        ``mimetypes.types_map.values()``.

        Returns
        -------
        dict
            ``{'dataFormat': element.text}``; the text is returned even when
            it is not an accepted value.
        """
        type_name = 'dataFormat'
        valid = self._valid[type_name]
        val = element.text
        if val not in valid:
            log.warning('Value %s not valid for type %s: must be %s',
                        val, type_name, valid)
        return {type_name: val}

    def handle_dataType(self, element):  # noqa
        """Check the text of ``element`` against the ``dataType`` type.

        Schema::

            name="dataTypes"
              <xsd:union memberTypes="xsd:token">
                <xsd:simpleType>
                  <xsd:restriction base="xsd:token">
                    <xsd:enumeration value="Grid"/>
                    <xsd:enumeration value="Image"/>
                    <xsd:enumeration value="Point"/>
                    <xsd:enumeration value="Radial"/>
                    <xsd:enumeration value="Station"/>
                    <xsd:enumeration value="Swath"/>
                    <xsd:enumeration value="Trajectory"/>
                  </xsd:restriction>
                </xsd:simpleType>
              </xsd:union>

        The comparison is case insensitive.

        Returns
        -------
        dict
            ``{'dataType': element.text}`` with the original casing; the text
            is returned even when it is not an accepted value.
        """
        type_name = 'dataType'
        valid = self._valid[type_name]

        val = element.text
        # An empty element has text None; treat it as an invalid value
        # instead of failing on None.lower().
        if val is None or val.lower() not in valid:
            log.warning('Value %s not valid for type %s: must be %s',
                        val, type_name, valid)
        return {type_name: val}


class _ComplexTypes(object):
    """Parse elements of the THREDDS catalog complex types into dicts.

    Child elements are matched on their tag with any ``{namespace}`` prefix
    removed, so namespaced and un-namespaced documents parse the same way.
    Schema violations are reported with ``log.warning``.
    """

    @staticmethod
    def _get_tag_name(element):
        """Return the tag of ``element`` without its ``{namespace}`` prefix."""
        return element.tag.split('}')[-1]

    @staticmethod
    def _spatial_range_req_children():
        """Return the child element names required by ``spatialRange``."""
        return ['start', 'size']

    @staticmethod
    def _spatial_range_opt_children():
        """Return the optional child element names of ``spatialRange``."""
        return ['resolution', 'units']

    @staticmethod
    def _date_type_formatted_valid_attrs():
        """Return the attribute names accepted on ``dateTypeFormatted``."""
        return ['format', 'type']

    @staticmethod
    def _controlled_vocatulary_opt_attrs():
        """Return the optional attribute names of ``controlledVocabulary``."""
        return ['vocabulary']

    @staticmethod
    def _variable_opt_attrs():
        """Return the optional attribute names of ``variable``."""
        return ['vocabulary_name', 'units']

    @staticmethod
    def _variable_req_attrs():
        """Return the attribute names required on ``variable``."""
        return ['name']

    @staticmethod
    def _variables_opt_attrs():
        """Return the optional attribute names of ``variables``."""
        return ['vocabulary']

    @staticmethod
    def _data_size_req_attrs():
        """Return the attribute names required on ``dataSize``."""
        return ['units']

    #
    # complex types:
    # ==============
    def handle_spatialRange(self, element):  # noqa
        """Parse a ``spatialRange`` element.

        Schema::

            name="spatialRange">
              <xsd:sequence>
               <xsd:element name="start" type="xsd:double"  />
               <xsd:element name="size" type="xsd:double" />
               <xsd:element name="resolution" type="xsd:double"
                            minOccurs="0" />
               <xsd:element name="units" type="xsd:string" minOccurs="0" />
              </xsd:sequence>

        Returns
        -------
        dict
            Child name mapped to its value: ``units`` as text, every other
            recognised child as ``float``. Unrecognised children and values
            that cannot be converted are skipped with a warning.
        """
        type_name = 'spatialRange'
        valid = (self._spatial_range_req_children()
                 + self._spatial_range_opt_children())

        spatial_range = {}
        for child in element:
            child_name = self._get_tag_name(child)
            if child_name not in valid:
                log.warning('%s is not valid for type %s',
                            child_name, type_name)
                continue
            if child_name == 'units':
                spatial_range[child_name] = child.text
                continue
            try:
                spatial_range[child_name] = float(child.text)
            except (TypeError, ValueError):
                # Empty (None) or non-numeric text: report and skip the
                # child rather than abort parsing of the whole element.
                log.warning('Value %s of %s is not a valid number for '
                            'type %s', child.text, child_name, type_name)
        return spatial_range

    def handle_controlledVocabulary(self, element):  # noqa
        """Parse a ``controlledVocabulary`` element.

        Schema::

            type="controlledVocabulary"
              <xsd:simpleContent>
               <xsd:extension base="xsd:string">
                <xsd:attribute name="vocabulary" type="xsd:string" />
               </xsd:extension>
              </xsd:simpleContent>

        Returns
        -------
        dict
            ``{'name': element.text}`` plus a ``vocabulary`` entry when that
            attribute is present. Other attributes are dropped with a warning.
        """
        type_name = 'controlledVocabulary'
        opt_attrs = self._controlled_vocatulary_opt_attrs()

        parsed = {'name': element.text}
        for attr, value in element.attrib.items():
            if attr in opt_attrs:
                parsed[attr] = value
            else:
                # Attribute first, then type, to match the message template.
                log.warning('%s not a valid attribute for %s', attr,
                            type_name)
        return parsed

    def handle_dateTypeFormatted(self, element):  # noqa
        """Parse a ``dateTypeFormatted`` element.

        Schema::

            name="dateTypeFormatted"
              <xsd:simpleContent>
                <xsd:extension base="dateType">
                  <xsd:attribute name="format" type="xsd:string" /> // from
                                                java.text.SimpleDateFormat
                  <xsd:attribute name="type" type="dateEnumTypes" />
                </xsd:extension>

        Returns
        -------
        dict
            The ``format`` and ``type`` attributes that are present, plus
            ``'value'`` holding the element text. Other attributes are
            dropped with a warning.
        """
        type_name = 'dateTypeFormatted'
        valid_attrs = self._date_type_formatted_valid_attrs()

        parsed = {}
        for attr, value in element.attrib.items():
            if attr in valid_attrs:
                parsed[attr] = value
            else:
                log.warning('%s is not a valid attribute for %s', attr,
                            type_name)
        parsed['value'] = element.text
        return parsed

    def handle_sourceType(self, element):  # noqa
        """Parse a ``sourceType`` element (used for creator and publisher).

        Schema::

            name="sourceType"
              <xsd:sequence>
                <xsd:element name="name" type="controlledVocabulary"/>
                <xsd:element name="contact">
                  <xsd:complexType>
                    <xsd:attribute name="email" type="xsd:string"
                                                use="required"/>
                    <xsd:attribute name="url" type="xsd:anyURI"/>
                  </xsd:complexType>
                </xsd:element>
              </xsd:sequence>

        Returns
        -------
        dict
            The parsed ``name`` child merged with the ``url`` and ``email``
            attributes of the ``contact`` child. A contact without ``email``
            is reported and recorded as ``'missing'``.
        """
        parsed = {}
        for child in element:
            child_name = self._get_tag_name(child)
            if child_name == 'name':
                parsed.update(self.handle_controlledVocabulary(child))
            elif child_name == 'contact':
                if 'url' in child.attrib:
                    parsed['url'] = child.attrib['url']
                if 'email' in child.attrib:
                    parsed['email'] = child.attrib['email']
                else:
                    log.warning("'contact' must have an attribute: 'email'")
                    parsed['email'] = 'missing'
        return parsed

    def handle_timeCoverageType(self, element):  # noqa
        """Parse a ``timeCoverageType`` element.

        Schema::

            name="timeCoverageType">
              <xsd:sequence>
                <xsd:choice minOccurs="2" maxOccurs="3" >
                  <xsd:element name="start" type="dateTypeFormatted"/>
                  <xsd:element name="end" type="dateTypeFormatted"/>
                  <xsd:element name="duration" type="duration"/>
                </xsd:choice>
                <xsd:element name="resolution" type="duration"
                             minOccurs="0"/>
              </xsd:sequence>

        Returns
        -------
        dict
            Text of each ``start``, ``end``, ``duration`` and ``resolution``
            child, keyed by child name. Empty when the element does not hold
            two or three of start/end/duration.
        """
        choice_names = ('start', 'end', 'duration')
        children = [(self._get_tag_name(child), child) for child in element]

        # Only start/end/duration belong to the 2..3 choice; the optional
        # resolution element is not counted against that limit.
        num_choice = sum(1 for name, _ in children if name in choice_names)

        parsed = {}
        if not 2 <= num_choice <= 3:
            log.warning('timeCoverage must have 2 or 3 of start, end and '
                        'duration; found %d', num_choice)
            return parsed

        for name, child in children:
            if name in ('start', 'end'):
                parsed[name] = self.handle_dateTypeFormatted(child)['value']
            elif name in ('duration', 'resolution'):
                parsed[name] = child.text
        return parsed

    def handle_variable(self, element):
        """Parse a ``variable`` element.

        Schema::

            element_name="variable"
              <xsd:complexType mixed="true">
                <xsd:attribute name="name" type="xsd:string"
                               use="required"/>
                <xsd:attribute name="vocabulary_name" type="xsd:string"
                               use="optional"/>
                <xsd:attribute name="units" type="xsd:string"/>
              </xsd:complexType>

        Returns
        -------
        dict
            The recognised attributes, plus ``'description'`` when the
            element has text. Empty when a required attribute is missing.
        """
        type_name = 'variable'
        req_attrs = self._variable_req_attrs()
        valid_attrs = self._variable_opt_attrs() + req_attrs

        missing = [attr for attr in req_attrs if attr not in element.attrib]
        for attr in missing:
            log.warning('%s must have an attribute %s', type_name, attr)
        if missing:
            return {}

        variable = {}
        if element.text:
            variable['description'] = element.text
        for attr, value in element.attrib.items():
            if attr in valid_attrs:
                variable[attr] = value
        return variable

    @staticmethod
    def handle_variableMap(element):  # noqa
        """Return a copy of all attributes of a ``variableMap`` element.

        Schema::

            element_name="variableMap"
              <xsd:complexType>
                <xsd:attributeGroup ref="XLink"/>
              </xsd:complexType>
        """
        return dict(element.attrib)

    def handle_variables(self, element):
        """Parse a ``variables`` element and its children.

        Schema::

            element_name="variables"
              <xsd:complexType>
                <xsd:choice>
                  <xsd:element ref="variable" minOccurs="0"
                               maxOccurs="unbounded"/>
                  <xsd:element ref="variableMap" minOccurs="0"/>
                </xsd:choice>
                <xsd:attribute name="vocabulary"
                               type="variableNameVocabulary" use="optional"/>
                <xsd:attributeGroup ref="XLink"/>
              </xsd:complexType>

        Returns
        -------
        dict
            ``'vocabulary'`` when that attribute is set, ``'variables'`` (list
            of parsed ``variable`` children) and ``'variableMaps'`` (list of
            parsed ``variableMap`` children); each key is present only when
            there is something to report.
        """
        variable_list = []
        variable_map_list = []
        for child in element:
            child_type = self._get_tag_name(child)
            if child_type == 'variable':
                variable_list.append(self.handle_variable(child))
            elif child_type == 'variableMap':
                # Parse the variableMap child itself, not the enclosing
                # <variables> element.
                variable_map_list.append(self.handle_variableMap(child))

        variables = {}
        opt_attrs = self._variables_opt_attrs()
        for attr, value in element.attrib.items():
            if attr in opt_attrs:
                variables[attr] = value

        if variable_list:
            variables['variables'] = variable_list
        if variable_map_list:
            variables['variableMaps'] = variable_map_list
        return variables

    def handle_dataSize(self, element):  # noqa
        """Parse a ``dataSize`` element.

        Schema::

            <xsd:complexType>
              <xsd:simpleContent>
              <xsd:extension base="xsd:string">
                <xsd:attribute name="units" type="xsd:string"
                               use="required"/>
              </xsd:extension>
              </xsd:simpleContent>
            </xsd:complexType>

        Returns
        -------
        dict
            ``'size'`` as a float, plus ``'units'`` when that attribute is
            present.

        Raises
        ------
        TypeError, ValueError
            When the element text is missing or is not a number.
        """
        req_attrs = self._data_size_req_attrs()
        data_size = {'size': float(element.text)}

        for attr, value in element.attrib.items():
            if attr in req_attrs:
                data_size[attr] = value
        return data_size


class TDSCatalogMetadata(object):
    """Hold information contained in the catalog Metadata tag.

    Attributes
    ----------
    metadata : dict[str, object]
        The dictionary containing the metadata entries

    """

    def __init__(self, element, metadata_in=None):
        """Initialize a :class:`TDSCatalogMetadata` object.

        Parameters
        ----------
        element : :class:`~xml.etree.ElementTree.Element`
            An :class:`~xml.etree.ElementTree.Element` representing a metadata
            node
        metadata_in : dict[str, object], optional
            Parent metadata to inherit, if appropriate. Defaults to None.

        """
        self._ct = _ComplexTypes()
        self._st = _SimpleTypes()
        # Class dicts are used to look up which class defines a handler.
        self._sts = _SimpleTypes.__dict__
        self._cts = _ComplexTypes.__dict__

        inherited = element.attrib.get('inherited') == 'true'

        if metadata_in and (inherited
                            or self._is_external_metadata_doc(element)):
            # only inherit metadata passed in if the new metadata
            # element has inherit set to True or if the new
            # metadata element is pointing to an external metadata
            # document using an xlink
            #
            # NOTE: this is the same dict object as ``metadata_in``, so the
            # entries parsed below are added to the caller's dict as well.
            self.metadata = metadata_in
        else:
            self.metadata = {'inherited': inherited}

        if self._get_tag_name(element) == 'metadata':
            for child in element:
                self._parse_element(child)
        else:
            self._parse_element(element)

    @staticmethod
    def _get_tag_name(element):
        """Return the tag of ``element`` without its ``{namespace}`` prefix."""
        return element.tag.split('}')[-1]

    @staticmethod
    def _is_external_metadata_doc(element):
        """Return True if ``element`` has both an xlink title and href."""
        attributes = element.attrib
        return (xlink_title_attr in attributes
                and xlink_href_attr in attributes)

    def _get_handler(self, handler_name):
        """Return the bound ``handle_<handler_name>`` method.

        Complex types are searched before simple types. When neither class
        defines the handler a warning is logged and None is returned.
        """
        handler_name = 'handle_' + handler_name
        if handler_name in self._cts:
            return getattr(self._ct, handler_name)
        if handler_name in self._sts:
            return getattr(self._st, handler_name)
        log.warning('cannot find handler for element %s', handler_name)
        return None

    def _parse_element(self, element):
        """Dispatch ``element`` to the parser registered for its tag name.

        Elements without a registered parser are reported and skipped. A
        ``KeyError`` raised while parsing (for example a missing attribute)
        is reported as a parse failure and the element is skipped.
        """
        element_name = self._get_tag_name(element)

        parser = {'documentation': self._parse_documentation,
                  'property': self._parse_property,
                  'contributor': self._parse_contributor,
                  'geospatialCoverage': self._parse_geospatial_coverage,
                  'serviceName': self._parse_service_name,
                  'authority': self._parse_authority,
                  'publisher': self._parse_publisher,
                  'creator': self._parse_creator,
                  'keyword': self._parse_keyword,
                  'project': self._parse_project,
                  'dataFormat': self._parse_data_format,
                  'dataType': self._parse_data_type,
                  'date': self._parse_date,
                  'timeCoverage': self._parse_timeCoverage,
                  'variableMap': self._parse_variableMap,
                  'variables': self._parse_variables,
                  'metadata': self._parse_embedded_metadata}

        handler = parser.get(element_name)
        if handler is None:
            log.warning('No parser found for element %s', element_name)
            return

        # Keep the lookup outside the try so a KeyError raised inside a
        # handler is not mistaken for "no parser registered".
        try:
            handler(element)
        except KeyError as err:
            log.warning('Could not parse element %s: missing %s',
                        element_name, err)

    def _parse_documentation(self, element):
        """Store a ``documentation`` element under ``'documentation'``.

        Text is appended to a list keyed by the ``type`` attribute, or by
        ``'generic'`` when the element has no attributes at all. An element
        with an xlink href is stored under ``'xlink'`` as a title/href dict.
        """
        # <xsd:simpleType name="documentationEnumTypes">
        #   <xsd:union memberTypes="xsd:token">
        #     <xsd:simpleType>
        #       <xsd:restriction base="xsd:token">
        #         <xsd:enumeration value="funding"/>
        #         <xsd:enumeration value="history"/>
        #         <xsd:enumeration value="processing_level"/>
        #         <xsd:enumeration value="rights"/>
        #         <xsd:enumeration value="summary"/>
        #       </xsd:restriction>
        #     </xsd:simpleType>
        #   </xsd:union>
        # </xsd:simpleType>
        #
        # <xsd:complexType name="documentationType" mixed="true">
        #   <xsd:sequence>
        #     <xsd:any namespace="http://www.w3.org/1999/xhtml" minOccurs="0"
        #              maxOccurs="unbounded" processContents="strict"/>
        #   </xsd:sequence>
        #   <xsd:attribute name="type" type="documentationEnumTypes"/>
        #   <xsd:attributeGroup ref="XLink" />
        # </xsd:complexType>
        attrib = element.attrib
        docs = self.metadata.setdefault('documentation', {})

        if 'type' in attrib:
            docs.setdefault(attrib['type'], []).append(element.text)
        elif not attrib:
            # document element has no attributes
            docs.setdefault('generic', []).append(element.text)
        elif xlink_href_attr in attrib:
            # The title is optional here; a link without one keeps its href.
            xlink = {'title': attrib.get(xlink_title_attr),
                     'href': attrib[xlink_href_attr]}
            docs.setdefault('xlink', []).append(xlink)

    def _parse_property(self, element):
        """Store a ``property`` element as ``metadata['property'][name]``."""
        # <xsd:element name="property">
        #   <xsd:complexType>
        #     <xsd:attribute name="name" type="xsd:string"/>
        #     <xsd:attribute name="value" type="xsd:string"/>
        #   </xsd:complexType>
        # </xsd:element>
        name = element.attrib['name']
        value = element.attrib['value']
        self.metadata.setdefault('property', {})[name] = value

    def _parse_contributor(self, element):
        """Append the contributor's name to the list kept for its role."""
        # <xsd:element name="contributor">
        #   <xsd:complexType>
        #     <xsd:simpleContent>
        #       <xsd:extension base="xsd:string">
        #         <xsd:attribute name="role" type="xsd:string"
        #                        use="required"/>
        #       </xsd:extension>
        #     </xsd:simpleContent>
        #   </xsd:complexType>
        # </xsd:element>
        element_type = 'contributor'
        role = element.attrib['role']
        contributors = self.metadata.setdefault(element_type, {})
        contributors.setdefault(role, []).append(element.text)

    def _parse_geospatial_coverage(self, element):
        """Append a parsed ``geospatialCoverage`` element to the metadata."""
        # <xsd:element name="geospatialCoverage">
        #   <xsd:complexType>
        #     <xsd:sequence>
        #       <xsd:element name="northsouth" type="spatialRange"
        #                    minOccurs="0" />
        #       <xsd:element name="eastwest" type="spatialRange"
        #                    minOccurs="0" />
        #       <xsd:element name="updown" type="spatialRange"
        #                    minOccurs="0" />
        #       <xsd:element name="name" type="controlledVocabulary"
        #                    minOccurs="0" maxOccurs="unbounded"/>
        #     </xsd:sequence>
        #
        #     <xsd:attribute name="zpositive" type="upOrDown" default="up"/>
        #   </xsd:complexType>
        # </xsd:element>
        element_type = 'geospatialCoverage'
        elements = {'northsouth': 'spatialRange',
                    'eastwest': 'spatialRange',
                    'updown': 'spatialRange',
                    'name': 'controlledVocabulary'}
        attrs = {'zpositive': 'upOrDown'}

        md = {}
        for attr in element.attrib:
            if attr in attrs:
                handler = self._get_handler(attrs[attr])
                md[attr] = handler(element)
            else:
                # Element type first, then attribute, to match the template.
                log.warning('Attr on %s : %s not captured', element_type,
                            attr)

        for child in element:
            # Match on the local name so namespaced children are recognised.
            child_name = self._get_tag_name(child)
            if child_name in elements:
                handler = self._get_handler(elements[child_name])
                # NOTE(review): each result is merged flat into ``md``, so
                # the start/size of northsouth, eastwest and updown overwrite
                # one another -- confirm whether they should be nested under
                # the child name instead.
                md.update(handler(child))

        self.metadata.setdefault(element_type, []).append(md)

    def _parse_service_name(self, element):
        """Store the ``serviceName`` text; a later one replaces it."""
        # can only have one serviceName
        element_type = 'serviceName'
        self.metadata[element_type] = element.text

    def _parse_authority(self, element):
        """Append the ``authority`` text to the metadata."""
        element_type = 'authority'
        self.metadata.setdefault(element_type, []).append(element.text)

    def _parse_publisher(self, element):
        """Append a ``publisher`` parsed as a sourceType."""
        element_type = 'publisher'
        parsed = self._ct.handle_sourceType(element)
        self.metadata.setdefault(element_type, []).append(parsed)

    def _parse_creator(self, element):
        """Append a ``creator`` parsed as a sourceType."""
        element_type = 'creator'
        parsed = self._ct.handle_sourceType(element)
        self.metadata.setdefault(element_type, []).append(parsed)

    def _parse_keyword(self, element):
        """Append a ``keyword`` parsed as a controlledVocabulary."""
        element_type = 'keyword'
        parsed = self._ct.handle_controlledVocabulary(element)
        self.metadata.setdefault(element_type, []).append(parsed)

    def _parse_project(self, element):
        """Append a ``project`` parsed as a controlledVocabulary."""
        element_type = 'project'
        parsed = self._ct.handle_controlledVocabulary(element)
        self.metadata.setdefault(element_type, []).append(parsed)

    def _parse_data_format(self, element):
        """Store the ``dataFormat`` value directly in the metadata."""
        parsed = self._st.handle_dataFormat(element)
        self.metadata.update(parsed)

    def _parse_data_type(self, element):
        """Store the ``dataType`` value directly in the metadata."""
        parsed = self._st.handle_dataType(element)
        self.metadata.update(parsed)

    def _parse_date(self, element):
        """Append a ``date`` parsed as a dateTypeFormatted."""
        element_type = 'date'
        parsed = self._ct.handle_dateTypeFormatted(element)
        self.metadata.setdefault(element_type, []).append(parsed)

    def _parse_timeCoverage(self, element):  # noqa
        """Append a ``timeCoverage`` parsed as a timeCoverageType."""
        element_type = 'timeCoverage'
        parsed = self._ct.handle_timeCoverageType(element)
        self.metadata.setdefault(element_type, []).append(parsed)

    def _parse_variableMap(self, element):  # noqa
        """Append the attributes of a ``variableMap`` element."""
        element_type = 'variableMap'
        parsed = self._ct.handle_variableMap(element)
        self.metadata.setdefault(element_type, []).append(parsed)

    def _parse_variables(self, element):
        """Store each named variable under ``metadata['variables'][name]``.

        The stored dict holds the variable's remaining entries, without its
        name. Variables that were parsed without a name are skipped.
        """
        element_type = 'variables'
        parsed = self._ct.handle_variables(element)
        # 'variables' is absent when the element has no <variable> children.
        for variable in parsed.get('variables', []):
            var_name = variable.pop('name', None)
            if var_name is None:
                # handle_variable returns an empty dict for a variable that
                # lacks its required name; there is no key to store it under.
                continue
            self.metadata.setdefault(element_type, {})[var_name] = variable

    def _parse_embedded_metadata(self, element):
        """Record a link to an external metadata document by its title."""
        element_type = 'external_metadata'
        # Both the title (used as the key) and the href are needed.
        if self._is_external_metadata_doc(element):
            title = element.attrib[xlink_title_attr]
            href = element.attrib[xlink_href_attr]
            self.metadata.setdefault(element_type, {})[title] = href
        else:
            log.warning('Cannot parse embedded metadata element %s: %s',
                        element.tag, element.attrib)