Source code for mindmeld.system_entity_recognizer

# -*- coding: utf-8 -*-
#
# Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#     http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import os
from abc import ABC, abstractmethod
from enum import Enum

import pycountry
import requests

from .components._config import (
    DEFAULT_DUCKLING_URL,
    get_system_entity_url_config,
    is_duckling_configured,
)
from .components.schemas import (
    validate_language_code,
    validate_locale_code,
    validate_timestamp,
)
from .core import Entity, QueryEntity, Span, _sort_by_lowest_time_grain
from .exceptions import MindMeldError, SystemEntityResolutionError
from .constants import SYSTEM_ENTITY_PREFIX

# HTTP status code treated as a successful recognizer response.
SUCCESSFUL_HTTP_CODE = 200

# Timeout (in seconds) for requests to the system entity recognizer service.
# It can be overridden through the MM_SYS_ENTITY_REQUEST_TIMEOUT environment
# variable. The value is converted to ``float`` exactly once here so that the
# constant has the same type whether it came from the environment (a string)
# or from the built-in default.
try:
    SYS_ENTITY_REQUEST_TIMEOUT = float(
        os.environ.get("MM_SYS_ENTITY_REQUEST_TIMEOUT", 3.0)
    )
except ValueError as e:
    raise MindMeldError(
        "MM_SYS_ENTITY_REQUEST_TIMEOUT env var has to be a float value."
    ) from e

# Written as ``not (x > 0.0)`` rather than ``x <= 0.0`` so that NaN, which
# compares False against everything, is rejected as well.
if not SYS_ENTITY_REQUEST_TIMEOUT > 0.0:
    raise MindMeldError(
        "MM_SYS_ENTITY_REQUEST_TIMEOUT env var has to be > 0.0 seconds."
    )

logger = logging.getLogger(__name__)


class SystemEntityError(Exception):
    """Error raised by the system entity recognizer classes in this module.

    It is used for singleton misuse, invalid configuration arguments and
    non-200 responses from the recognizer service.
    """
class DucklingDimension(Enum):
    """Names of the Duckling dimensions referenced by this module.

    Each member's value is the dimension string as it appears in the ``dim``
    field of a Duckling response item.
    """

    # Numeric / measurement dimensions
    AMOUNT_OF_MONEY = "amount-of-money"
    CREDIT_CARD_NUMBER = "credit-card-number"
    DISTANCE = "distance"
    DURATION = "duration"
    NUMERAL = "numeral"
    ORDINAL = "ordinal"
    QUANTITY = "quantity"
    TEMPERATURE = "temperature"
    VOLUME = "volume"

    # Contact / identifier dimensions
    EMAIL = "email"
    PHONE_NUMBER = "phone-number"
    URL = "url"

    # Temporal dimension
    TIME = "time"
class SystemEntityRecognizer(ABC):
    """Abstract interface to the external service that extracts system entities.

    One process-wide recognizer is stored in ``_instance``. It is intended to
    be used as a singleton and initialized only once, during NLP object
    construction.
    """

    # The globally registered recognizer, or None when nothing is registered.
    _instance = None

    @staticmethod
    def get_instance():
        """Return the globally registered recognizer.

        When no recognizer has been registered yet, the
        NoOpSystemEntityRecognizer singleton is registered and returned.

        Returns:
            (SystemEntityRecognizer): A SystemEntityRecognizer instance
        """
        registered = SystemEntityRecognizer._instance
        if registered:
            return registered

        SystemEntityRecognizer._instance = NoOpSystemEntityRecognizer.get_instance()
        return SystemEntityRecognizer._instance

    @staticmethod
    def set_system_entity_recognizer(system_entity_recognizer=None, app_path=None):
        """Register the global system entity recognizer.

        An explicitly supplied recognizer takes precedence; otherwise the
        recognizer is loaded from the application's configuration.

        Args:
            system_entity_recognizer: A system entity recognizer
            app_path (str): The application path

        Raises:
            SystemEntityError: If neither a valid recognizer nor an
                application path was provided
        """
        recognizer = system_entity_recognizer
        if recognizer and isinstance(recognizer, SystemEntityRecognizer):
            SystemEntityRecognizer._instance = recognizer
            return

        if not app_path:
            raise SystemEntityError(
                "Either `system_entity_recognizer` or `app_path` must be valid."
            )

        SystemEntityRecognizer._instance = SystemEntityRecognizer.load_from_app_path(
            app_path
        )

    @staticmethod
    def load_from_app_path(app_path):
        """Build the recognizer described by an application's configuration.

        When ``is_duckling_configured(app_path)`` is false the no-op
        recognizer is returned. Otherwise the Duckling recognizer is returned,
        using the URL obtained from ``get_system_entity_url_config``.

        Args:
            app_path (str): Application path

        Returns:
            (SystemEntityRecognizer)

        Raises:
            SystemEntityError: If ``app_path`` is empty
        """
        if not app_path:
            raise SystemEntityError(
                "App path must be valid to load entity recognizer config."
            )

        if not is_duckling_configured(app_path):
            return NoOpSystemEntityRecognizer.get_instance()

        duckling_url = get_system_entity_url_config(app_path=app_path)
        return DucklingRecognizer.get_instance(duckling_url)

    @abstractmethod
    def parse(self, sentence, **kwargs):
        """Calls the System Entity Recognizer service API to extract numerical
        entities from a sentence.

        Args:
            sentence (str): A raw sentence.

        Returns:
            (tuple): A tuple containing:
                - response (list, dict): Response from the System Entity Recognizer
                  service that consists of a list of dicts, each corresponding to a
                  single prediction or just a dict, corresponding to a single
                  prediction.
                - response_code (int): http status code.
        """

    @abstractmethod
    def resolve_system_entity(self, query, entity_type, span):
        """Resolves a system entity in the provided query at the specified span.

        Args:
            query (Query): The query containing the entity
            entity_type (str): The type of the entity
            span (Span): The character span of the entity in the query

        Returns:
            Entity: The resolved entity

        Raises:
            SystemEntityResolutionError
        """

    @abstractmethod
    def get_candidates(self, query, entity_types=None, **kwargs):
        """Identifies candidate system entities in the given query.

        Args:
            query (Query): The query to examine
            entity_types (list of str): The entity types to consider

        Returns:
            list of QueryEntity: The system entities found in the query
        """

    @abstractmethod
    def get_candidates_for_text(self, text, entity_types=None, **kwargs):
        """Identifies candidate system entities in the given text.

        Args:
            text (str): The text to examine
            entity_types (list of str): The entity types to consider

        Returns:
            list of dict: The system entities found in the text
        """
class NoOpSystemEntityRecognizer(SystemEntityRecognizer):
    """Recognizer that performs no recognition.

    ``parse`` returns an empty list together with the 200 status code, and the
    candidate lookups return empty lists. The class is a singleton; obtain it
    through ``get_instance``.
    """

    # The single instance of this class, or None before first use.
    _instance = None

    def __init__(self):
        """Register this object as the singleton.

        Raises:
            SystemEntityError: If an instance has already been created
        """
        if self._instance:
            raise SystemEntityError("NoOpSystemEntityRecognizer is a singleton.")
        NoOpSystemEntityRecognizer._instance = self

    @staticmethod
    def get_instance():
        """Return the singleton, creating it on first use.

        Returns:
            (NoOpSystemEntityRecognizer): The singleton instance
        """
        # The constructor stores the new object in ``_instance``, so the
        # object it returns is the registered singleton.
        return NoOpSystemEntityRecognizer._instance or NoOpSystemEntityRecognizer()

    def parse(self, sentence, **kwargs):
        """Return an empty response and the successful HTTP status code."""
        return [], SUCCESSFUL_HTTP_CODE

    def resolve_system_entity(self, query, entity_type, span):
        """Resolve nothing; always returns None."""
        return None

    def get_candidates(self, query, entity_types=None, **kwargs):
        """Return an empty list of candidates."""
        return []

    def get_candidates_for_text(self, text, entity_types=None, **kwargs):
        """Return an empty list of candidates."""
        return []
class DucklingRecognizer(SystemEntityRecognizer):
    """System entity recognizer backed by a Duckling HTTP service.

    The class is a singleton: use ``get_instance`` rather than calling the
    constructor directly.
    """

    # The single instance of this class, or None before first use.
    _instance = None

    def __init__(self, url=DEFAULT_DUCKLING_URL):
        """Private constructor for SystemEntityRecognizer. Do not directly
        construct the DucklingRecognizer object. Instead, use the
        static get_instance method.

        Args:
            url (str): Duckling URL

        Raises:
            SystemEntityError: If an instance has already been created
        """
        if DucklingRecognizer._instance:
            raise SystemEntityError("DucklingRecognizer is a singleton")

        self.url = url
        DucklingRecognizer._instance = self

    @staticmethod
    def get_instance(url=None):
        """Static access method.

        We get an instance for the Duckling URL. If there is no URL being passed,
        default to DEFAULT_DUCKLING_URL.

        Args:
            url: Duckling URL.

        Returns:
            (DucklingRecognizer): A DucklingRecognizer instance
        """
        url = url or DEFAULT_DUCKLING_URL

        # NOTE(review): when an instance already exists, ``url`` is not applied
        # to it; the existing instance keeps the URL it was created with.
        if not DucklingRecognizer._instance:
            DucklingRecognizer(url=url)

        return DucklingRecognizer._instance

    def get_response(self, data):
        """Send a POST request to Duckling and return the decoded response.

        Args:
            data (dict): Request payload; contains the field ``text``

        Returns:
            (dict, int): The JSON response and the response status code

        Raises:
            requests.ConnectionError: If the service cannot be reached
            SystemEntityError: If the service answers with a non-200 status
        """
        try:
            response = requests.request(
                "POST", self.url, data=data, timeout=float(SYS_ENTITY_REQUEST_TIMEOUT)
            )

            if response.status_code == requests.codes["ok"]:
                response_json = response.json()
                return response_json, response.status_code
            else:
                raise SystemEntityError("System entity status code is not 200.")
        except requests.ConnectionError:
            msg = (
                "Unable to connect to the system entity recognizer at %s. Make sure it's "
                "running by typing 'mindmeld num-parse' at the command line."
            )
            logger.exception(msg, self.url)
            raise
        except Exception:  # pylint: disable=broad-except
            # Log with the URL for context, then let the caller see the error.
            logger.exception("unhandled System Entity Recognizer Error, URL: %s", self.url)
            raise

    def parse(
        self,
        sentence,
        dimensions=None,
        language=None,
        locale=None,
        time_zone=None,
        timestamp=None,
    ):
        """Calls System Entity Recognizer service API to extract numerical entities
        from a sentence.

        Args:
            sentence (str): A raw sentence.
            dimensions (None or list of str): The list of types (e.g. volume,
                temperature) to restrict the output to. If None, include all types.
            language (str, optional): Language of the sentence specified using a 639-1/2 code.
                If both locale and language are provided, the locale is used. If neither are
                provided, the EN language code is used.
            locale (str, optional): The locale representing the ISO 639-1 language code and
                ISO3166 alpha 2 country code separated by an underscore character.
            time_zone (str, optional): An IANA time zone id such as 'America/Los_Angeles'.
                If not specified, the system time zone is used.
            timestamp (long, optional): A unix millisecond timestamp used as the reference time.
                If not specified, the current system time is used. If `time_zone`
                is not also specified, this parameter is ignored. (NOTE(review): this
                method forwards the timestamp whenever it is set, so ignoring it
                would have to happen in the service -- confirm.)

        Returns:
            (tuple): A tuple containing:
                - response (list, dict): Response from the System Entity Recognizer service that
                  consists of a list of dicts, each corresponding to a single prediction or just a
                  dict, corresponding to a single prediction.
                - response_code (int): http status code.
        """
        if sentence == "":
            logger.error("Empty query passed to the system entity resolver")
            return [], SUCCESSFUL_HTTP_CODE

        data = {
            "text": sentence,
            "latent": True,
        }

        language = validate_language_code(language)
        locale = validate_locale_code(locale)

        # If a ISO 639-2 code is provided, we attempt to convert it to
        # ISO 639-1 since the dependent system entity resolver requires this
        if language and len(language) == 3:
            iso639_2_code = pycountry.languages.get(alpha_3=language.lower())
            try:
                language = getattr(iso639_2_code, "alpha_2").upper()
            except AttributeError:
                # Unknown code, or a language without a two-letter equivalent.
                language = None

        if locale and language:
            language_code_of_locale = locale.split("_")[0]
            if language_code_of_locale.lower() != language.lower():
                logger.error(
                    "Language code %s and Locale code do not match %s, "
                    "using only the locale code for processing",
                    language,
                    locale,
                )
                # The system entity recognizer prefers the locale code over the language code,
                # so we bias towards sending just the locale code when the codes dont match.
                language = None

        # If neither a language nor a locale is available, we use the defaults
        if not language and not locale:
            language = "EN"
            locale = "en_US"

        if locale:
            data["locale"] = locale

        if language:
            data["lang"] = language.upper()

        if dimensions is not None:
            data["dims"] = json.dumps(dimensions)

        if time_zone:
            data["tz"] = time_zone

        if timestamp:
            data["reftime"] = validate_timestamp(str(timestamp))

        # Currently we rely on Duckling for parsing numerical data but in the future we can use
        # other system entity recognizer too
        return self.get_response(data)

    def resolve_system_entity(self, query, entity_type, span):
        """Resolves a system entity in the provided query at the specified span.

        Candidates already attached to the query are tried first. If none
        matches, the text under ``span`` is parsed on its own.

        Args:
            query (Query): The query containing the entity
            entity_type (str): The type of the entity
            span (Span): The character span of the entity in the query

        Returns:
            Entity: The resolved entity

        Raises:
            SystemEntityResolutionError
        """
        span_filtered_candidates = [
            candidate
            for candidate in query.system_entity_candidates
            if candidate.span == span
        ]
        entity_type_filtered_candidates = [
            candidate
            for candidate in span_filtered_candidates
            if candidate.entity.type == entity_type
        ]

        if entity_type == "sys_time":
            entity_type_filtered_candidates = _sort_by_lowest_time_grain(
                entity_type_filtered_candidates
            )

        if len(entity_type_filtered_candidates) > 0:
            # Duckling ranks sys_interval candidates with incomplete
            # "to" duration time interval higher than candidates with complete
            # "to" duration time interval. Therefore, we recommend the complete
            # candidate over the incomplete one when all the candidates have the
            # same "from" duration time.
            if entity_type == "sys_interval":
                from_vals = set()
                candidates_with_from_and_to_vals = []
                for candidate in entity_type_filtered_candidates:
                    from_val, to_val = candidate.entity.value["value"]
                    from_vals.add(from_val)
                    if from_val and to_val:
                        candidates_with_from_and_to_vals.append(candidate)

                if len(candidates_with_from_and_to_vals) > 0 and len(from_vals) == 1:
                    # All of the candidates have the same "from" time
                    return candidates_with_from_and_to_vals[0]

            # Duckling sorts most probable entity candidates higher than
            # the lower probable candidates. So we return the best possible
            # candidate in this case when multiple duckling candidates are
            # returned.
            return entity_type_filtered_candidates[0]

        duckling_candidates, _ = self.parse(
            span.slice(query.text),
            language=query.language,
            time_zone=query.time_zone,
            timestamp=query.timestamp,
        )
        duckling_text_val_to_candidate = {}

        # If no matching candidate was found, try parsing only this entity
        #
        # For secondary candidate picking, we prioritize candidates as follows:
        # a) candidate matches both span range and entity type
        # b) candidate with the most number of matching characters to the user
        # annotation
        # c) candidate whose span matches either the start or end user annotation
        # span
        for raw_candidate in duckling_candidates:
            candidate = duckling_item_to_query_entity(
                query, raw_candidate, offset=span.start
            )

            # The converter returns None for an empty item; there is nothing
            # to match against in that case.
            if candidate is None:
                continue

            if candidate.entity.type == entity_type:
                # If the candidate matches the entire entity, return it
                if candidate.span == span:
                    return candidate
                duckling_text_val_to_candidate.setdefault(candidate.text, []).append(
                    candidate
                )

        # Sort duckling matching candidates by the length of the value
        best_duckling_candidate_names = list(duckling_text_val_to_candidate.keys())
        best_duckling_candidate_names.sort(key=len, reverse=True)

        if best_duckling_candidate_names:
            default_duckling_candidate = None
            longest_matched_duckling_candidate = best_duckling_candidate_names[0]

            for candidate in duckling_text_val_to_candidate[
                longest_matched_duckling_candidate
            ]:
                if candidate.span.start == span.start or candidate.span.end == span.end:
                    return candidate
                # No boundary match: remember it as the fallback result.
                default_duckling_candidate = candidate

            return default_duckling_candidate

        msg = "Unable to resolve system entity of type {!r} for {!r}."
        msg = msg.format(entity_type, span.slice(query.text))
        if span_filtered_candidates:
            msg += " Entities found for the following types {!r}".format(
                [a.entity.type for a in span_filtered_candidates]
            )

        raise SystemEntityResolutionError(msg)

    def get_candidates(
        self,
        query,
        entity_types=None,
        locale=None,
        language=None,
        time_zone=None,
        timestamp=None,
    ):
        """Identifies candidate system entities in the given query.

        Args:
            query (Query): The query to examine
            entity_types (list of str): The entity types to consider
            locale (str, optional): The locale representing the ISO 639-1 language code and
                ISO3166 alpha 2 country code separated by an underscore character.
            language (str, optional): Language as specified using a 639-1/2 code.
            time_zone (str, optional): An IANA time zone id such as 'America/Los_Angeles'.
                If not specified, the system time zone is used.
            timestamp (long, optional): A unix timestamp used as the reference time.
                If not specified, the current system time is used. If `time_zone`
                is not also specified, this parameter is ignored.

        Returns:
            list of QueryEntity: The system entities found in the query
        """
        dims = dimensions_from_entity_types(entity_types)
        # Explicit arguments win; otherwise fall back to the query's own values.
        language = language or query.language
        time_zone = time_zone or query.time_zone
        timestamp = timestamp or query.timestamp

        response, response_code = self.parse(
            query.text,
            dimensions=dims,
            locale=locale,
            language=language,
            time_zone=time_zone,
            timestamp=timestamp,
        )

        if response_code == SUCCESSFUL_HTTP_CODE:
            converted = (
                duckling_item_to_query_entity(query, item) for item in response
            )
            # The converter returns None for an empty item; drop those instead
            # of failing on the attribute access below.
            return [
                e
                for e in converted
                if e is not None
                and (entity_types is None or e.entity.type in entity_types)
            ]

        logger.debug(
            "System Entity Recognizer service did not process query: %s with dims: %s "
            "correctly and returned response: %s",
            query.text,
            str(dims),
            str(response),
        )
        return []

    def get_candidates_for_text(
        self,
        text,
        entity_types=None,
        locale=None,
        language=None,
        time_zone=None,
        timestamp=None,
    ):
        """Identifies candidate system entities in the given text.

        Each returned item is the service's response dict with an added
        ``entity_type`` key.

        Args:
            text (str): The text to examine
            entity_types (list of str): The entity types to consider
            language (str): Language code
            locale (str): Locale code
            time_zone (str, optional): An IANA time zone id such as 'America/Los_Angeles'.
                If not specified, the system time zone is used.
            timestamp (long, optional): A unix timestamp used as the reference time.
                If not specified, the current system time is used. If `time_zone`
                is not also specified, this parameter is ignored.

        Returns:
            list of dict: The system entities found in the text
        """
        dims = dimensions_from_entity_types(entity_types)
        response, response_code = self.parse(
            text,
            dimensions=dims,
            language=language,
            locale=locale,
            time_zone=time_zone,
            timestamp=timestamp,
        )

        if response_code == SUCCESSFUL_HTTP_CODE:
            items = []
            for item in response:
                # An empty item has no 'dim'/'value' to convert; skip it.
                if not item:
                    continue
                entity = duckling_item_to_entity(item)
                if entity_types is None or entity.type in entity_types:
                    item["entity_type"] = entity.type
                    items.append(item)
            return items

        logger.debug(
            "System Entity Recognizer service did not process query: %s with dims: %s "
            "correctly and returned response: %s",
            text,
            str(dims),
            str(response),
        )
        return []
def _construct_interval_helper(interval_item): from_ = interval_item.get("from", {}).get("value", None) to_ = interval_item.get("to", {}).get("value", None) return from_, to_
def duckling_item_to_entity(item):
    """Converts an item from the output of duckling into an Entity

    The resulting entity type is the system entity prefix followed by the
    item's dimension, except for time intervals, which use ``interval``.

    Args:
        item (dict): The duckling item

    Returns:
        Entity: The entity described by the duckling item
    """
    dimension = item["dim"]
    payload = item["value"]

    # These dimensions have no 'type' key in the 'value' dict
    untyped_dimensions = tuple(
        member.value
        for member in (
            DucklingDimension.CREDIT_CARD_NUMBER,
            DucklingDimension.EMAIL,
            DucklingDimension.PHONE_NUMBER,
            DucklingDimension.URL,
        )
    )

    value = {}
    num_type = dimension

    if dimension in untyped_dimensions:
        value["value"] = payload["value"]
        if "values" in payload:
            value["alternate_values"] = payload["values"]
    else:
        type_ = payload["type"]
        is_interval = type_ == "interval"

        if type_ == "value":
            value["value"] = payload["value"]
            if "values" in payload:
                value["alternate_values"] = payload["values"]
        elif is_interval:
            # Some intervals will only contain one value. The other value will
            # be None in that case
            value["value"] = _construct_interval_helper(payload)
            if "values" in payload:
                value["alternate_values"] = [
                    _construct_interval_helper(alternate)
                    for alternate in payload["values"]
                ]

        # Get the unit if it exists
        if "unit" in payload:
            value["unit"] = payload["unit"]

        # Special handling of time dimension grain
        if dimension == DucklingDimension.TIME.value:
            if type_ == "value":
                value["grain"] = payload.get("grain")
            elif is_interval:
                # Want to predict time intervals as sys_interval
                num_type = "interval"
                # The grain comes from the "from" bound when present,
                # otherwise from the "to" bound.
                for bound in ("from", "to"):
                    if bound in payload:
                        value["grain"] = payload[bound].get("grain")
                        break

    entity_type = f"{SYSTEM_ENTITY_PREFIX}{num_type}"
    return Entity(item["body"], entity_type, value=value)
def duckling_item_to_query_entity(query, item, offset=0):
    """Converts an item from the output of duckling into a QueryEntity

    Args:
        query (Query): The query to construct the QueryEntity from
        item (dict): The duckling item
        offset (int, optional): The offset into the query that the item's
            indexing begins

    Returns:
        QueryEntity: The query entity described by the duckling item or
            None if no item is present
    """
    if not item:
        return None

    # The item's "end" is reduced by one before being used as the Span end.
    span_start = int(item["start"]) + offset
    span_end = int(item["end"]) - 1 + offset
    entity = duckling_item_to_entity(item)
    return QueryEntity.from_query(query, Span(span_start, span_end), entity=entity)
def dimensions_from_entity_types(entity_types):
    """Map system entity type names to the Duckling dimensions to request.

    ``sys_interval`` maps to the ``time`` dimension, because time intervals
    are produced from ``time`` items. Any other type that starts with the
    system entity prefix maps to its name with that prefix removed. Types
    without the prefix are ignored.

    Args:
        entity_types (list): Entity type names; may be None or empty

    Returns:
        (list): The sorted, de-duplicated dimension names, or None when no
            dimension applies (callers treat None as "no restriction")
    """
    dims = set()
    for entity_type in entity_types or []:
        if entity_type == "sys_interval":
            # "interval" is not a dimension of its own, so only "time" is
            # requested for it.
            dims.add("time")
        elif entity_type.startswith(SYSTEM_ENTITY_PREFIX):
            dims.add(entity_type[len(SYSTEM_ENTITY_PREFIX):])

    # Sorted so the result does not depend on set iteration order.
    return sorted(dims) or None