Source code for mindmeld.components.dialogue

# -*- coding: utf-8 -*-
# Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

"""This module contains the dialogue manager component of MindMeld"""
import asyncio
import copy
import json
import logging
import random
import warnings
from functools import cmp_to_key, partial
from typing import List, Optional
from marshmallow.exceptions import ValidationError
import immutables

from .. import path
from .request import FrozenParams, Params, Request
from ..core import Entity, FormEntity
from ..models import entity_features, query_features
from ..models.helpers import DEFAULT_SYS_ENTITIES

mod_logger = logging.getLogger(__name__)

[docs]class DirectiveNames: """A constants object for directive names.""" LIST = "list" """A directive to display a list.""" LISTEN = "listen" """A directive to listen (start speech recognition).""" REPLY = "reply" """A directive to display a text view.""" RESET = "reset" """A directive to reset.""" SPEAK = "speak" """A directive to speak text out loud.""" SUGGESTIONS = "suggestions" """A view for a list of suggestions.""" SLEEP = "sleep" """A directive to put the client to sleep after a specified number of milliseconds."""
[docs]class DirectiveTypes: """A constants object for directive types.""" VIEW = "view" """An action directive.""" ACTION = "action" """A view directive."""
[docs]class DialogueStateException(Exception): def __init__(self, message=None, target_dialogue_state=None): super().__init__(message) self.target_dialogue_state = target_dialogue_state
[docs]class DialogueStateRule: """A rule that determines a dialogue state. Each rule represents a pattern that must match in order to invoke a particular dialogue state. Attributes: dialogue_state (str): The name of the dialogue state. domain (str): The name of the domain to match against. entity_types (set): The set of entity types to match against. intent (str): The name of the intent to match against. targeted_only (bool): Whether the state is targeted only. default (bool): Whether this is the default state. """ logger = mod_logger.getChild("DialogueStateRule") """Class logger.""" def __init__(self, dialogue_state, **kwargs): """Initializes a dialogue state rule. Args: dialogue_state (str): The name of the dialogue state. domain (str): The name of the domain to match against. has_entity (str): A single entity type to match. has_entities (list, set): A list/set of entity types to match against. intent (str): The name of the intent to match against. """ self.dialogue_state = dialogue_state key_kwargs = ( ("domain",), ("intent",), ("has_entity", "has_entities"), ("targeted_only",), ("default",), ) valid_kwargs = set() for keys in key_kwargs: valid_kwargs.update(keys) for kwarg in kwargs: if kwarg not in valid_kwargs: raise TypeError( ( "DialogueStateRule() got an unexpected keyword argument" " '{!s}'" ).format(kwarg) ) resolved = {} for keys in key_kwargs: if len(keys) == 2: single, plural = keys # pylint: disable=unbalanced-tuple-unpacking if single in kwargs and plural in kwargs: msg = "Only one of {!r} and {!r} can be specified for a dialogue state rule" raise ValueError(msg.format(single, plural)) elif single in kwargs and isinstance(kwargs[single], str): resolved[plural] = {kwargs[single]} elif plural in kwargs and isinstance( kwargs[plural], (list, set, tuple) ): resolved[plural] = set(kwargs[plural]) else: if single in kwargs: msg = "Invalid argument type {!r} for {!r}" raise ValueError(msg.format(kwargs[single], single)) elif plural in kwargs: msg = "Invalid argument type {!r} for {!r}" raise ValueError(msg.format(kwargs[plural], plural)) elif keys[0] in kwargs: resolved[keys[0]] = kwargs[keys[0]] self.domain = resolved.get("domain", None) self.intent = resolved.get("intent", None) self.targeted_only = resolved.get("targeted_only", False) self.default = resolved.get("default", False) entities = resolved.get("has_entities", None) self.entity_types = None if entities is not None: for entity in entities: if not isinstance(entity, str): msg = "Invalid entity specification for dialogue state rule: {!r}" raise ValueError(msg.format(entities)) self.entity_types = frozenset(entities) if self.targeted_only and any([self.domain, self.intent, self.entity_types]): raise ValueError( "For a dialogue state rule, if targeted_only is " "True, domain, intent, and has_entity must be omitted" ) if self.default and any( [self.domain, self.intent, self.entity_types, self.targeted_only] ): raise ValueError( "For a dialogue state rule, if default is True, " "domain, intent, has_entity, and targeted_only must be omitted" )
[docs] def apply(self, request): """Applies the dialogue state rule to the given context. Args: request (Request): A request object. Returns: (bool): Whether or not the context matches. """ # Note: this will probably change as the details of "context" are worked out # bail if this rule is only reachable via target_dialogue_state if self.targeted_only: return False # check domain is correct if self.domain is not None and self.domain != request.domain: return False # check intent is correct if self.intent is not None and self.intent != request.intent: return False # check expected entity types are present if self.entity_types is not None: # TODO cache entity types entity_types = set() for entity in request.entities: entity_types.add(entity["type"]) if len(self.entity_types & entity_types) < len(self.entity_types): return False return True
@property def complexity(self): """Returns an integer representing the complexity of this dialogue state rule. Components of a rule in order of increasing complexity are as follows: default rule, domains, intents, entity types, entity mappings. Returns: (int): A number representing the rule complexity. """ complexity = [0] * 4 if self.entity_types: complexity[0] = len(self.entity_types) if self.intent: complexity[1] = 1 if self.domain: complexity[2] = 1 if self.default: complexity[3] = 1 return tuple(complexity) def __eq__(self, other): if isinstance(other, self.__class__): return self.__dict__ == other.__dict__ raise NotImplementedError def __ne__(self, other): if isinstance(other, self.__class__): return not self.__eq__(other) raise NotImplementedError def __repr__(self): return "<{} {!r}>".format(self.__class__.__name__, self.dialogue_state)
[docs] @staticmethod def compare(this, that): """Compares the complexity of two dialogue state rules. Args: this (DialogueStateRule): A dialogue state rule. that (DialogueStateRule): A dialogue state rule. Returns: (int): The comparison result -1: that is more complex than this 0: this and that are equally complex 1: this is more complex than that """ if not ( isinstance(this, DialogueStateRule) and isinstance(that, DialogueStateRule) ): raise NotImplementedError # return (this.complexity > that.complexity) - (this.complexity < that.complexity)
[docs]class DialogueManager: logger = mod_logger.getChild("DialogueManager") def __init__(self, responder_class=None, async_mode=False): self.async_mode = async_mode self.handler_map = {} self.middlewares = [] self.rules = [] self.responder_class = responder_class or DialogueResponder self.default_rule = None
[docs] def handle(self, **kwargs): """A decorator that is used to register dialogue state rules.""" def _decorator(func): name = kwargs.pop("name", None) self.add_dialogue_rule(name, func, **kwargs) return func return _decorator
[docs] def middleware(self, *args): """A decorator that is used to register dialogue handler middleware.""" def _decorator(func): self.add_middleware(func) return func if args and callable(args[0]): # Support syntax: @middleware _decorator(args[0]) return args[0] # Support syntax: @middleware() return _decorator
[docs] def add_middleware(self, middleware): """Adds middleware for the dialogue manager. Middleware will be called for each message before the dialogue state handler. Middleware registered first will be called first. Args: middleware (callable): A dialogue manager middleware function. """ if self.async_mode and not asyncio.iscoroutinefunction(middleware): msg = ( "Cannot use middleware {!r} in async mode. " "Middleware must be coroutine function." ) raise TypeError(msg.format(middleware.__name__)) self.middlewares.append(middleware)
[docs] def add_dialogue_rule(self, name, handler, **kwargs): """Adds a dialogue state rule for the dialogue manager. Args: name (str): The name of the dialogue state. handler (function): The dialogue state handler function. kwargs (dict): A list of options to be passed to the DialogueStateRule initializer. """ if name is None: name = handler.__name__ if self.async_mode and not asyncio.iscoroutinefunction(handler): msg = ( "Cannot use dialogue state handler {!r} in async mode. " "Handler must be coroutine function in async mode." ) raise TypeError(msg.format(name)) rule = DialogueStateRule(name, **kwargs) self.rules.append(rule) self.rules.sort(key=cmp_to_key(, reverse=True) if handler is not None: old_handler = self.handler_map.get(name) if old_handler is not None and old_handler != handler: msg = ( "Handler mapping is overwriting an existing dialogue state: %s" % name ) raise AssertionError(msg) self.handler_map[name] = handler if rule.default: if self.default_rule: raise AssertionError("Only one default rule may be specified") self.default_rule = rule
[docs] def apply_handler(self, request, responder, target_dialogue_state=None): """Applies the dialogue state handler for the most complex matching rule. Args: request (Request): The request object. responder (DialogueResponder): The responder object. target_dialogue_state (str, optional): The target dialogue state. Returns: (DialogueResponder): A DialogueResponder containing the dialogue state and directives. """ if self.async_mode: return self._apply_handler_async( request, responder, target_dialogue_state=target_dialogue_state ) return self._apply_handler_sync( request, responder, target_dialogue_state=target_dialogue_state )
def _apply_handler_sync(self, request, responder, target_dialogue_state=None): """Applies the dialogue state handler for the most complex matching rule. Args: request (Request): The request object. responder (DialogueResponder): The responder object. target_dialogue_state (str, optional): The target dialogue state. Returns: (DialogueResponder): A DialogueResponder containing the dialogue state and directives. """ try: return self._attempt_handler_sync( request, responder, target_dialogue_state=target_dialogue_state ) except DialogueStateException as e: if e.target_dialogue_state != target_dialogue_state: target_dialogue_state = e.target_dialogue_state else: self.logger.warning( "Ignoring target dialogue state '{}'".format( e.target_dialogue_state ) ) target_dialogue_state = None if target_dialogue_state: self.logger.warning( "Ignoring target dialogue state '{}'".format(target_dialogue_state) ) return self._attempt_handler_sync(request, responder) def _attempt_handler_sync(self, request, responder, target_dialogue_state=None): """Tries to apply the dialogue state handler for the most complex matching rule Args: request (Request): The request object. responder (DialogueResponder): The responder object. target_dialogue_state (str, optional): The target dialogue state. Returns: (DialogueResponder): A DialogueResponder containing the dialogue state and directives. """ dialogue_state = self._get_dialogue_state(request, target_dialogue_state) handler = self._get_dialogue_handler(dialogue_state) responder.dialogue_state = dialogue_state res = handler(request, responder) # Add dialogue flow's sub-dialogue_state if provided if res and isinstance(res, dict) and "dialogue_state" in res: # TODO: check if this flow is executed, currently not covered in tests dialogue_state = ".".join([dialogue_state, res["dialogue_state"]]) responder.dialogue_state = dialogue_state return responder async def _apply_handler_async( self, request, responder, target_dialogue_state=None ): """Applies the dialogue state handler for the most complex matching rule. Args: request (Request): The request object from the DM responder (DialogueResponder): The responder from the DM target_dialogue_state (str, optional): The target dialogue state Returns: DialogueResponder: A DialogueResponder containing the dialogue state and directives """ try: return await self._attempt_handler_async( request, responder, target_dialogue_state=target_dialogue_state ) except DialogueStateException as e: if e.target_dialogue_state != target_dialogue_state: target_dialogue_state = e.target_dialogue_state else: self.logger.warning( "Ignoring target dialogue state '{}'".format( e.target_dialogue_state ) ) target_dialogue_state = None if target_dialogue_state: self.logger.warning( "Ignoring target dialogue state '{}'".format(target_dialogue_state) ) return await self._attempt_handler_async(request, responder) async def _attempt_handler_async( self, request, responder, target_dialogue_state=None ): """Tries to apply the dialogue state handler for the most complex matching rule Args: request (Request): The request object from the DM responder (DialogueResponder): The responder from the DM target_dialogue_state (str, optional): The target dialogue state Returns: DialogueResponder: A DialogueResponder containing the dialogue state and directives """ dialogue_state = self._get_dialogue_state(request, target_dialogue_state) handler = self._get_dialogue_handler(dialogue_state) responder.dialogue_state = dialogue_state result_handler = await handler(request, responder) # Add dialogue flow's sub-dialogue_state if provided if ( result_handler and isinstance(result_handler, dict) and "dialogue_state" in result_handler ): # TODO: check if this flow is executed, currently not covered in tests dialogue_state = "{}.{}".format( dialogue_state, result_handler["dialogue_state"] ) responder.dialogue_state = dialogue_state return responder
[docs] @staticmethod def reprocess(target_dialogue_state=None): """Forces the dialogue manager to back out of the flow based on the initial target dialogue state setting and reselect a handler, following a new target dialogue state Args: target_dialogue_state (str, optional): a dialogue_state name to push system into """ raise DialogueStateException( message="reprocess", target_dialogue_state=target_dialogue_state )
def _get_dialogue_state(self, request, target_dialogue_state=None): dialogue_state = None for rule in self.rules: if target_dialogue_state: if target_dialogue_state == rule.dialogue_state: dialogue_state = rule.dialogue_state break else: if rule.apply(request): dialogue_state = rule.dialogue_state break if dialogue_state is None: msg = "Failed to find dialogue state for {domain}.{intent}".format( domain=request.domain, intent=request.intent ), request) return dialogue_state def _get_dialogue_handler(self, dialogue_state): handler = ( self.handler_map[dialogue_state] if dialogue_state else self._default_handler ) for m in reversed(self.middlewares): handler = partial(m, handler=handler) return handler def _create_responder(self): return self.responder_class(slots={}) @staticmethod def _default_handler(context, responder): # TODO: implement default handler pass
[docs]class DialogueFlow(DialogueManager): """A special dialogue manager subclass used to implement dialogue flows. Dialogue flows allow developers to implement multiple turn interactions where only a subset of dialogue states should be accessible or where different dialogue rules should apply. Attributes: app (Application): The application that initializes this flow. exit_flow_states (list): The list of exit states. """ logger = mod_logger.getChild("DialogueFlow") """Class logger.""" all_flows = {} """The dictionary that references all dialogue flows.""" def __init__(self, name, entrance_handler, app, **kwargs): super().__init__(async_mode=app.async_mode) self._name = name self.all_flows[name] = self = app self.exit_flow_states = [] def _set_target_state(request, responder): responder.params.target_dialogue_state = self.flow_state return entrance_handler(request, responder) async def _async_set_target_state(request, responder): responder.params.target_dialogue_state = self.flow_state return await entrance_handler(request, responder) self._entrance_handler = ( _async_set_target_state if self.async_mode else _set_target_state ) app.add_dialogue_rule(, self._entrance_handler, **kwargs) handler = ( self._apply_flow_handler_async if self.async_mode else self._apply_flow_handler_sync ) app.add_dialogue_rule(self.flow_state, handler, targeted_only=True) @property def name(self): """The name of this flow.""" return self._name @property def flow_state(self): """The state of the flow (<name>_flow).""" return self._name + "_flow" @property def dialogue_manager(self): """The dialogue manager which contains this flow.""" if and return return None def __call__(self, ctx, responder): return self._entrance_handler(ctx, responder)
[docs] def use_middleware(self, *args): """Allows a middleware to be added to this flow.""" def _decorator(func): self.add_middleware(func) return func try: # Support syntax: @middleware func = args[0] if not callable(func): raise TypeError _decorator(func) return func except (IndexError, TypeError): # Support syntax: @middleware() return _decorator
[docs] def handle(self, **kwargs): """The dialogue flow handler.""" def _decorator(func): name = kwargs.pop("name", None) exit_flow = kwargs.pop("exit_flow", False) if exit_flow: func_name = name or func.__name__ self.exit_flow_states.append(func_name) self.add_dialogue_rule(name, func, **kwargs) return func return _decorator
def _apply_flow_handler_sync(self, request, responder): """Applies the dialogue state handler for the dialogue flow and set the target dialogue state to the flow state. Args: request (Request): The request object. responder (DialogueResponder): The responder object. Returns: (dict): A dict containing the dialogue state and directives. """ dialogue_state = self._get_dialogue_state(request) handler = self._get_dialogue_handler(dialogue_state) if dialogue_state not in self.exit_flow_states: responder.params.target_dialogue_state = self.flow_state handler(request, responder) return {"dialogue_state": dialogue_state, "directives": responder.directives} async def _apply_flow_handler_async(self, request, responder): """Applies the dialogue state handler for the dialogue flow and sets the target dialogue state to the flow state asynchronously. Args: request (Request): The request object. responder (DialogueResponder): The responder object. Returns: (dict): A dict containing the dialogue state and directives. """ dialogue_state = self._get_dialogue_state(request) handler = self._get_dialogue_handler(dialogue_state) if dialogue_state not in self.exit_flow_states: responder.params.target_dialogue_state = self.flow_state res = handler(request, responder) if asyncio.iscoroutine(res): await res return {"dialogue_state": dialogue_state, "directives": responder.directives} def _get_dialogue_handler(self, dialogue_state): handler = ( self.handler_map[dialogue_state] if dialogue_state else self._default_handler ) try: middlewares = self.middlewares except AttributeError: middlewares = getattr(self, "middleware", tuple()) for m in reversed(middlewares): handler = partial(m, handler=handler) return handler
[docs]class Form: """This class incapsulates Form data """ def __init__(self, entities: Optional[List[FormEntity]] = None, exit_keys: Optional[List[str]] = None, exit_msg: Optional[str] = None, max_retries: Optional[str] = None): self.entities = entities self.exit_keys = list(map(str.lower, exit_keys or ["cancel", "restart", "exit", "reset"])) self.exit_msg = exit_msg or "How may I help you?" self.max_retries = max_retries or 1 @property def entities(self): return self._entities @entities.setter def entities(self, values): if any(isinstance(item, (dict, immutables.Map)) for item in values): self._entities = [FormEntity(**item) for item in values] else: self._entities = values
[docs]class AutoEntityFilling: """A class to implement Automatic Entity (Slot) Filling (AEF) that allows developers to prompt users for completing the missing requirements for entity slots. """ _logger = mod_logger.getChild("AutoEntityFilling") """Class logger.""" def __init__(self, handler, form, app): """ Args: handler (func): The function to which control is returned after completion of flow. form (dict): Developer-defined slot-filling form. app (Application): The application that initializes this flow. """ self._app = app self._app.lazy_init() self._handler = handler self._form = Form(entities=form.get('entities'), exit_keys=form.get('exit_keys'), exit_msg=form.get('exit_msg'), max_retries=form.get('max_retries')) self._local_entity_form = None self._prompt_turn = None self._params_schema = ParamsSchema( context={ "nlp": self._app.app_manager.nlp, "dialogue_handler_map": self._app.app_manager.dialogue_manager.handler_map } ) def _set_next_turn(self, request, responder): """Set target dialogue state to the entrance handler's name""" responder.params.allowed_intents = tuple( ["{}.{}".format(request.domain, request.intent)] ) responder.params.target_dialogue_state = self._handler.__name__ def _exit_flow(self, responder): """Exits this flow and clears the related parameter for re-usability""" self._prompt_turn = None self._local_entity_form = None responder.params.allowed_intents = tuple() responder.exit_flow() def _extract_query_features( self, text, time_zone=None, timestamp=None, locale=None, language=None ): """ Extracts Query object from the user input and converts it into appropriate format for entity extraction. Args: text (Request.text): Text in the query time_zone (str, optional): An IANA time zone id to create the query relative to. timestamp (int, optional): A reference unix timestamp to create the query relative to, in seconds. locale (str, optional): The locale representing the ISO 639-1 language code and \ ISO3166 alpha 2 country code separated by an underscore character. language (str, optional): Language as specified using a 639-1/2 code Returns: Query: A newly constructed query """ query_factory = self._app.app_manager.nlp.resource_loader.query_factory query = query_factory.create_query(text, time_zone, timestamp, locale, language) return query def _validate(self, request, slot): """Validates the user input based on the entity type and validation type. Args: request: Request object slot: FormEntity object Returns: (bool): Boolean whether the user input fulfills the slot requirements """ text = request.text entity_type = slot.entity extracted_feature = {} _resolved_value = {} query = None if slot.default_eval: if entity_type in DEFAULT_SYS_ENTITIES: # system entity validation - checks for presence of required system entity try: entity_text = str(request.entities[0]["value"][0]["value"]) except (KeyError, IndexError): entity_text = text query = self._extract_query_features(entity_text) resources = {} extracted_feature = dict( query_features.extract_sys_candidates([entity_type])( query, resources ) ) else: # gazetteer validation try: query = self._extract_query_features( request.entities[0]["value"][0]["cname"] ) except (KeyError, IndexError): query = self._extract_query_features(text) gaz = self._app.app_manager.nlp.resource_loader.get_gazetteer( entity_type ) # payload format for entity feature extractors: # tuple(query (Query Object), list of entities, entity index) _payload = (query, [query], 0) if len(gaz) > 0: gazetteer = {"gazetteers": {entity_type: gaz}} extracted_feature = entity_features.extract_in_gaz_features()( _payload, gazetteer ) if not extracted_feature: return False, _resolved_value if request.entities: _resolved_value = request.entities[0]["value"] if slot.hints: # hints / user-list validation if text in slot.hints: extracted_feature.update({"hint_validated_entity": text}) else: return False, _resolved_value if slot.custom_eval: # Custom validation using function provided by developer. # Should return True/False/Custom Resolution value for validation status. # If false, overall validation fails. If either true or a custom resolved # value is returned, then the validation succeeds. custom_eval_func = self._app.registry.functions_registry[slot.custom_eval] _validity = custom_eval_func(request) if _validity is False: # For checking 'false' return cases return False, _resolved_value if _validity is not True: # For cases with custom resolution value return if entity_type in DEFAULT_SYS_ENTITIES: # for custom system entity resolution _resolved_value = [{"value": _validity}] else: # for custom gazetteer entity resolution _resolved_value = [{"cname": _validity}] extracted_feature.update({"custom_validated_entity": text}) # return True iff user input results in extracted features (i.e. successfully validated) return len(extracted_feature) > 0, _resolved_value def _initial_fill(self, request): """Performs the first pass and fills the entity form with entity values available in the initial query. Args: request (Request): The request object. """ for entity in request.entities: entity_type = entity["type"] role = entity["role"] for slot in self._local_entity_form: if entity_type == slot.entity: if (slot.role is None) or (role == slot.role): slot.value = dict(entity) break def _end_slot_fill(self, request, responder, async_mode): # Returns filled entity objects as request.entities # We pass in the previous turn's responder's params to the current request request = self._app.app_manager.request_class( text=request.text, domain=request.domain, intent=request.intent, entities=tuple([slot.value for slot in self._local_entity_form]), context=request.context or {}, history=request.history or [], frame=responder.frame or {}, params=request.params, form=immutables.Map(DEFAULT_FORM_SCHEMA.dump(self._form)), ) self._exit_flow(responder) if async_mode: return self._end_slot_fill_async(request, responder) return self._end_slot_fill_sync(request, responder) def _end_slot_fill_sync(self, request, responder): return self._handler(request, responder) async def _end_slot_fill_async(self, request, responder): return await self._handler(request, responder) def _prompt_slot(self, responder, nlr): """Prompts user for missing slot. Args: responder (DialogueResponder): responder object. nlr (str): natural language response to prompt for the missing slot. """ response_form = copy.deepcopy(self._form) response_form.entities = self._local_entity_form responder.form = DEFAULT_FORM_SCHEMA.dump(response_form) responder.reply(nlr) responder.speak(nlr) self._retry_attempts = 0 self._prompt_turn = False def _retry_logic(self, request, responder, nlr): if self._retry_attempts < self._form.max_retries: self._retry_attempts += 1 response_form = copy.deepcopy(self._form) response_form.entities = self._local_entity_form responder.form = DEFAULT_FORM_SCHEMA.dump(response_form) responder.reply(nlr) responder.speak(nlr) else: # max attempts exceeded, reset counter, exit auto_fill. self._retry_attempts = 0 self._exit_flow(responder) # reprocess query to obtain intended nlp config, ignoring previous config. processed_query = self._app.app_manager.nlp.process(query_text=request.text) # create new request object from the current responder object. response_form = copy.deepcopy(self._form) request = self._app.app_manager.request_class( context=request.context or {}, history=request.history or [], frame=responder.frame or {}, params=FrozenParams(**dict(responder.params)), form=DEFAULT_FORM_SCHEMA.dump(response_form), **processed_query, ) # call intended handler from reprocessed query. self._app.app_manager.dialogue_manager.apply_handler(request, responder) def __call__(self, request, responder): """ The iterative call to fill missing slots in the entity form till all slots have been filled up or the flow has been exited. Args: request (Request): The request object. responder (DialogueResponder): The responder object. """ # If form iteration in request object, continue using that. # If None, set to original form. if request.form and request.form["entities"]: self._local_entity_form = [FormEntity(**copy.deepcopy(elem)) for elem in request.form["entities"]] else: self._local_entity_form = None if request.text.lower() in self._form.exit_keys: responder.reply(self._form.exit_msg) responder.speak(self._form.exit_msg) self._exit_flow(responder) return self._set_next_turn(request, responder) if self._prompt_turn is None or not self._local_entity_form: # Entering the flow self._prompt_turn = True self._local_entity_form = copy.deepcopy(self._form.entities) self._retry_attempts = 0 # Fill the form with the entities in the first query self._initial_fill(request) # convert json response to FormEntity objects (deserialize) for i, slot in enumerate(self._local_entity_form): if isinstance(slot, dict): self._local_entity_form[i] = FormEntity(**slot) # Iterate through all slots, fill in empty ones for slot in self._local_entity_form: if not slot.value: # check if user has been prompted for this entity slot if self._prompt_turn: self._prompt_slot(responder, slot.responses) responder.listen() return # If already prompted, # validate the user response and retry if invalid response _is_valid, _resolved_value = self._validate(request, slot) if not _is_valid: # retry logic self._retry_logic(request, responder, slot.retry_response) responder.listen() return slot.value = Entity( text=request.text, entity_type=slot.entity, role=slot.role, value=_resolved_value, ).to_dict() # Reset prompt for next slot self._prompt_turn = True # Finish slot-filling and return to handler return self._end_slot_fill(request, responder, self._app.async_mode)
[docs] async def call_async(self, request, responder): """The slot-filling call for asynchronous apps Args: request (Request): The request object. responder (DialogueResponder): The responder object. """ self(request, responder)
[docs] def invoke(self, request, responder): """ Invoke slot-filling as a direct call without requiring a decorator. """ # ensures that the slot-filling function is targeted. kwargs = {"targeted_only": True} name = self._handler.__name__ try: # sets a dialogue rule for the handler passed in this invoke call to iteratively call # the slot-filling flow till completion or exit. This rule is added temporarily for this # flow and reset for the handler with every new invoke call. self._app.app_manager.dialogue_manager.add_dialogue_rule( name, self.__call__, **kwargs ) except AssertionError: self._app.app_manager.dialogue_manager.handler_map[name] = self.__call__ # re-run to continue flow self(request, responder)
[docs] async def invoke_async(self, request, responder): """ Async invoke slot-filling as a direct call without requiring a decorator. """ await self.invoke(request, responder)
[docs]class DialogueResponder: """The dialogue responder helps generate directives and fill slots in the system-generated natural language responses. """ _logger = mod_logger.getChild("DialogueResponder") DirectiveNames = DirectiveNames """The list of directive names.""" DirectiveTypes = DirectiveTypes """The list of directive types.""" def __init__( self, frame=None, params=None, history=None, slots=None, request=None, dialogue_state=None, directives=None, form=None, ): """ Initializes a dialogue responder. Args: frame (dict): The frame object. params (Params): The params object. history (list): The history of the responder. slots (dict): The slots of the responder. request (Request): The request object associated with the responder. dialogue_state (str): The dialogue state. directives (list): The directives of the responder. form (list): Autofill entities """ self.directives = directives or [] self.frame = frame or {} self.params = params self.dialogue_state = dialogue_state self.slots = slots or {} self.form = form self.request = request self.history = history @property def history(self): return self._history @history.setter def history(self, history): # If any of the elements in the history list is a map, then we validate the element # as a DialogueResponder object and serialize that to make sure the dictionary complies # with the attributes of a DialogueResponder object if isinstance(history, (list, tuple)) and \ any(isinstance(item, (dict, immutables.Map)) for item in history): try: self._history = [dict(DialogueResponder(**DEFAULT_RESPONSE_SCHEMA.load(item))) for item in history] except ValidationError as err: # TODO: Fix deserialization issues between workbench and mindmeld history payloads logging.warning("Could not deserialize history properly due to error: %s, " "this might be due to version incompatibility. " "We set the history to what " "is passed in to provide backwards compatibility.", err.messages) self._history = history else: self._history = history or [] @property def request(self): return self._request @request.setter def request(self, value): if isinstance(value, dict): if isinstance(value['params'], dict): value['params'] = Params(**value['params']) self._request = Request(**value) else: self._request = value or Request() @property def params(self): return self._params @params.setter def params(self, value): if isinstance(value, dict): self._params = Params(**value) else: self._params = value or Params() def __iter__(self): for key, value in DEFAULT_RESPONSE_SCHEMA.dump(self).items(): yield key, value
[docs] def reply(self, text): """Adds a 'reply' directive. Args: text (str): The text of the reply. """ text = self._process_template(text) self.display(DirectiveNames.REPLY, payload={"text": text})
[docs] def speak(self, text): """Adds a 'speak' directive. Args: text (str): The text to speak aloud. """ text = self._process_template(text) self.act(DirectiveNames.SPEAK, payload={"text": text})
[docs] def list(self, items): """Adds a 'list' view directive. Args: items (list): The list of dictionary objects. """ items = items or [] self.display(DirectiveNames.LIST, payload=items)
[docs] def suggest(self, suggestions): """Adds a 'suggestions' directive. Args: suggestions (list): A list of suggestions. """ suggestions = suggestions or [] self.display(DirectiveNames.SUGGESTIONS, payload=suggestions)
[docs] def listen(self): """Adds a 'listen' directive.""" self.act(DirectiveNames.LISTEN)
[docs] def reset(self): """Adds a 'reset' directive.""" self.act(DirectiveNames.RESET)
[docs] def display(self, name, payload=None): """Adds an arbitrary directive of type 'view'. Args: name (str): The name of the directive. payload (dict, optional): The payload for the view. """, DirectiveTypes.VIEW, payload=payload)
[docs] def act(self, name, payload=None): """Adds an arbitrary directive of type 'action'. Args: name (str): The name of the directive. payload (dict, optional): The payload for the action. """, DirectiveTypes.ACTION, payload=payload)
[docs] def direct(self, name, dtype, payload=None): """Adds an arbitrary directive. Args: name (str): The name of the directive. dtype (str): The type of the directive. payload (dict, optional): The payload for the view. """ directive = {"name": name, "type": dtype} if payload: directive["payload"] = payload self.directives.append(directive)
[docs] def respond(self, directive): """Adds an arbitrary directive. Args: directive (dict): A directive. """ msg = "respond() is deprecated. Instead use direct()." warnings.warn(msg) self.directives.append(directive)
[docs] def prompt(self, text): """Alias for `reply()`. Deprecated. Args: text (str): The text of the reply. """ msg = "prompt() is deprecated. Please use reply() and listen() instead" warnings.warn(msg) self.reply(text)
[docs] def sleep(self, delay=0): """Adds a 'sleep' directive. Args: delay (int): The amount of milliseconds to wait before putting the client to sleep. """ self.act(DirectiveNames.SLEEP, payload={"delay": delay})
@staticmethod def _choose(items): """Chooses a random item from items.""" result = items if isinstance(items, (tuple, list)): result = random.choice(items) elif isinstance(items, set): result = random.choice(tuple(items)) return result def _process_template(self, text): return self._choose(text).format(**self.slots)
[docs] def exit_flow(self): """Exit the current flow by clearing the target dialogue state.""" self.params.target_dialogue_state = None
[docs]class Conversation: """The conversation object is a very basic MindMeld client. It can be useful for testing out dialogue flows in python. Example: >>> convo = Conversation(app_path='path/to/my/app') >>> convo.say('Hello') ['Hello. I can help you find store hours. How can I help?'] >>> convo.say('Is the store on elm open?') ['The 23 Elm Street Kwik-E-Mart is open from 7:00 to 19:00.'] Attributes: history (list): The history of the conversation. Starts with the most recent message. context (dict): The context of the conversation, containing user context. default_params (Params): The default params to use with each turn. These \ defaults will be overridden by params passed for each turn. params (FrozenParams): The params returned by the most recent turn. force_sync (bool): Force synchronous return for `say()` and `process()` \ even when app is in async mode. verbose (bool, optional): If True, returns class probabilities along with class \ prediction. """ _logger = mod_logger.getChild("Conversation") def __init__( self, app=None, app_path=None, nlp=None, context=None, default_params=None, force_sync=False, verbose=False, ): """ Args: app (Application, optional): An initialized app object. Either app or app_path must be given. app_path (None, optional): The path to the app data. Used to create an app object. Either app or app_path must be given. nlp (NaturalLanguageProcessor, optional): A natural language processor for the app. If passed, changes to this processor will affect the response from `say()` context (dict, optional): The context to be used in the conversation. default_params (Params, optional): The default params to use with each turn. These defaults will be overridden by params passed for each turn. force_sync (bool, optional): Force synchronous return for `say()` and `process()` even when app is in async mode. verbose (bool, optional): If True, returns class probabilities along with class \ prediction. """ app = app or path.get_app(app_path) app.lazy_init(nlp) self._app_manager = app.app_manager if not self._app_manager.ready: self._app_manager.load() self.context = context or {} self.history = [] self.frame = {} self.form = {} self.default_params = default_params or Params() self.force_sync = force_sync self.params = FrozenParams() self.verbose = verbose self._params_schema = ParamsSchema( context={ "nlp": self._app_manager.nlp, "dialogue_handler_map": self._app_manager.dialogue_manager.handler_map } )
[docs] def say(self, text, params=None, force_sync=False): """Send a message in the conversation. The message will be processed by the app based on the current state of the conversation and returns the extracted messages from the directives. Args: text (str): The text of a message. params (dict): The params to use with this message, overriding any defaults which may have been set. force_sync (bool, optional): Force synchronous response even when app is in async mode. Returns: (list): A text representation of the dialogue responses. """ if self._app_manager.async_mode: res = self._say_async(text, params=params) if self.force_sync or force_sync: return asyncio.get_event_loop().run_until_complete(res) return res response = self.process(text, params=params) # handle directives response_texts = [self._follow_directive(a) for a in response.directives] return response_texts
async def _say_async(self, text, params=None): """Send a message in the conversation. The message will be processed by the app based on the current state of the conversation and returns the extracted messages from the directives. Args: text (str): The text of a message. params (dict): The params to use with this message, overriding any defaults which may have been set. Returns: (list): A text representation of the dialogue responses. """ response = await self.process(text, params=params) # handle directives response_texts = [self._follow_directive(a) for a in response.directives] return response_texts
[docs] def process(self, text, params=None, force_sync=False): """Send a message in the conversation. The message will be processed by the app based on the current state of the conversation and returns the response. Args: text (str): The text of a message. params (dict): The params to use with this message, overriding any defaults which may have been set. force_sync (bool, optional): Force synchronous response even when app is in async mode. Returns: (dict): The dictionary response. """ if self._app_manager.async_mode: res = self._process_async(text, params=params) if self.force_sync or force_sync: return asyncio.get_event_loop().run_until_complete(res) return res if not self._app_manager.ready: self._app_manager.load() internal_params = copy.deepcopy(self.params) if isinstance(params, dict): # Validate params params = self._params_schema.load(params) params = FrozenParams(**params) if isinstance(internal_params, dict): # Validate internal params internal_params = self._params_schema.load(internal_params) internal_params = FrozenParams(**internal_params) if params: # If the params arg is explicitly set, overight the internal params for k, v in vars(params).items(): vars(internal_params)[k] = v response = self._app_manager.parse( text, params=internal_params, context=self.context, frame=self.frame, form=self.form, history=self.history, verbose=self.verbose, ) # Validate params response.param = Params(**self._params_schema.load(dict(response.params))) self.history = response.history self.frame = response.frame self.form = response.form self.params = response.params return response
async def _process_async(self, text, params=None): """Send a message in the conversation. The message will be processed by the app based on the current state of the conversation and returns the response. Args: text (str): The text of a message. params (dict): The params to use with this message, overriding any defaults which may have been set. Returns: (DialogueResponder): The DialogueResponder Response. """ if not self._app_manager.ready: await self._app_manager.load() internal_params = copy.deepcopy(self.params) if isinstance(params, dict): params = self._params_schema.load(params) params = FrozenParams(**params) if isinstance(internal_params, dict): internal_params = FrozenParams(**internal_params) if params: # If the params arg is explicitly set, overight the internal params for k, v in vars(params).items(): vars(internal_params)[k] = v response = await self._app_manager.parse( text, params=internal_params, context=self.context, frame=self.frame, form=self.form, history=self.history, verbose=self.verbose, ) # Validate params response.param = Params(**self._params_schema.load(dict(response.params))) self.history = response.history self.frame = response.frame self.form = response.form self.params = response.params return response def _follow_directive(self, directive): msg = "" try: directive_name = directive["name"] if directive_name in [DirectiveNames.REPLY, DirectiveNames.SPEAK]: msg = directive["payload"]["text"] elif directive_name == DirectiveNames.SUGGESTIONS: suggestions = directive["payload"] if not suggestions: raise ValueError msg = "Suggestion{}:".format("" if len(suggestions) == 1 else "s") texts = [] for idx, suggestion in enumerate(suggestions): if idx > 0: msg += ", {!r}" else: msg += " {!r}" texts.append(self._generate_suggestion_text(suggestion)) msg = msg.format(*texts) elif directive_name == DirectiveNames.LIST: msg = "\n".join( [ json.dumps(item, indent=4, sort_keys=True) for item in directive["payload"] ] ) elif directive_name == DirectiveNames.LISTEN: msg = "Listening..." elif directive_name == DirectiveNames.RESET: msg = "Resetting..." except (KeyError, ValueError, AttributeError): msg = "Unsupported response: {!r}".format(directive) return msg @staticmethod def _generate_suggestion_text(suggestion): pieces = [] if "text" in suggestion: pieces.append(suggestion["text"]) if suggestion["type"] != "text": pieces.append("({})".format(suggestion["type"])) return " ".join(pieces)
[docs] def reset(self): """Reset the history, frame and params of the Conversation object.""" self.history = [] self.frame = {} self.form = {} self.params = FrozenParams()