Source code for mindmeld.components.custom_action

import logging

import ssl
import aiohttp
import requests

from .request import Params


logger = logging.getLogger(__name__)


RESPONSE_FIELDS = ["frame", "directives", "params", "slots"]


class CustomActionException(Exception):
    """Error raised when a custom action cannot be invoked.

    Within this module it is raised by ``CustomAction.invoke`` when no URL has
    been configured for the action.
    """
class CustomAction:
    """Client for a custom action served by a remote HTTP endpoint.

    The serialized Request and Responder are POSTed to the configured URL and
    the ``frame``, ``directives``, ``slots`` and ``params`` found in the reply
    are applied to the Responder.

    Args:
        name (str): Name of the custom action; sent as ``action`` in the payload.
        config (dict): Custom action settings. The keys read here are ``url``,
            ``cert``, ``public_key`` and ``private_key``. ``None`` is treated
            as an empty configuration.
        merge (bool): When True the reply is merged into the Responder,
            otherwise the Responder's fields are replaced by the reply.
    """

    def __init__(self, name: str, config: dict, merge: bool = True):
        self._name = name
        self._config = config or {}
        self.url = self._config.get("url")
        self._cert = self._config.get("cert")
        self._public_key = self._config.get("public_key")
        self._private_key = self._config.get("private_key")
        self.merge = merge

    def get_json_payload(self, request, responder):
        """Build the JSON body sent to the custom action server.

        Args:
            request (Request): Converted with ``dict(request)``.
            responder (DialogueResponder): Converted with ``dict(responder)``;
                only the keys listed in ``RESPONSE_FIELDS`` are forwarded.

        Returns:
            (dict): Mapping with the keys ``request``, ``responder`` and ``action``.

        Raises:
            KeyError: If the responder mapping lacks one of ``RESPONSE_FIELDS``.
        """
        request_json = dict(request)
        responder_json = dict(responder)
        return {
            "request": request_json,
            "responder": {field: responder_json[field] for field in RESPONSE_FIELDS},
            "action": self._name,
        }

    def invoke(self, request, responder, async_mode=False):
        """Invoke the custom action with Request and Responder.

        Upon successful execution the Responder object is updated from the
        server's reply.

        Args:
            request (Request)
            responder (DialogueResponder)
            async_mode (bool): When True, a coroutine is returned that must be
                awaited to obtain the boolean result.

        Returns:
            (bool): True if the action executed successfully, False otherwise
                (a coroutine resolving to that value when ``async_mode`` is True).

        Raises:
            CustomActionException: If no URL is configured for this action.
        """
        if not self.url:
            raise CustomActionException(
                "No URL is given for custom action {}.".format(self._name)
            )

        json_data = self.get_json_payload(request, responder)

        if async_mode:
            # Nothing executes until the coroutine is awaited, so a try/except
            # here could never observe a failure; connection errors are
            # handled inside _process_async instead.
            return self._process_async(json_data, responder)

        try:
            return self._process(json_data, responder)
        except (ConnectionError, requests.exceptions.ConnectionError):
            # requests raises its own ConnectionError, which is not a subclass
            # of the builtin one, so both have to be listed.
            logger.error(
                "Connection error trying to reach custom action server %s.", self.url
            )
            return False

    async def invoke_async(self, request, responder):
        """Asynchronously invoke the custom action with Request and Responder.

        Upon successful execution the Responder object is updated from the
        server's reply.

        Args:
            request (Request)
            responder (DialogueResponder)

        Returns:
            (bool): True if the action executed successfully, False otherwise.

        Raises:
            CustomActionException: If no URL is configured for this action.
        """
        return await self.invoke(request, responder, async_mode=True)

    def _process(self, json_data, responder):
        """POST the payload synchronously and apply the reply to the responder."""
        status_code, result_json = self.post(json_data)
        return self._process_post_response(status_code, result_json, responder)

    async def _process_async(self, json_data, responder):
        """POST the payload with aiohttp and apply the reply to the responder.

        Returns False (after logging) when the server cannot be reached.
        """
        try:
            status_code, result_json = await self.post_async(json_data)
        except (ConnectionError, aiohttp.ClientConnectionError):
            logger.error(
                "Connection error trying to reach custom action server %s.", self.url
            )
            return False
        return self._process_post_response(status_code, result_json, responder)

    def _process_post_response(self, status_code, result_json, responder):
        """Apply a server reply to the responder.

        Args:
            status_code (int): HTTP status returned by the server.
            result_json (dict): Decoded JSON body of the reply.
            responder (DialogueResponder): Object updated in place.

        Returns:
            (bool): True when ``status_code`` is 200, False otherwise.
        """
        if status_code != 200:
            logger.error(
                "Error %s trying to reach custom action server %s.",
                status_code,
                self.url,
            )
            return False

        # Missing fields are tolerated (defaults are used below) but reported.
        for field in RESPONSE_FIELDS:
            if field not in result_json:
                logger.info(
                    "`%s` not in the response of custom action %s.",
                    field,
                    self._name,
                )

        if self.merge:
            responder.frame.update(result_json.get("frame", {}))
            responder.directives.extend(result_json.get("directives", []))
            responder.slots.update(result_json.get("slots", {}))
            params = Params(**result_json.get("params", {}))
            responder.params.allowed_intents += tuple(params.allowed_intents)
            responder.params.dynamic_resource.update(params.dynamic_resource)
            responder.params.time_zone = params.time_zone
            responder.params.language = params.language
            responder.params.locale = params.locale
            responder.params.target_dialogue_state = params.target_dialogue_state
            responder.params.timestamp = params.timestamp
        else:
            responder.frame = result_json.get("frame", {})
            responder.directives = result_json.get("directives", [])
            responder.slots = result_json.get("slots", {})
            responder.params = Params(**result_json.get("params", {}))
        return True

    def _build_ssl_context(self):
        """Return the SSL context for ``post_async``, or None if no TLS option is set.

        A context is created when a CA file (``cert``) or a complete client
        key pair (``public_key`` and ``private_key``) is configured. With a key
        pair but no CA file, the context falls back to the default CA store
        instead of failing on a missing context.
        """
        has_client_pair = bool(self._public_key and self._private_key)
        if not self._cert and not has_client_pair:
            return None
        ssl_context = ssl.create_default_context(cafile=self._cert or None)
        if has_client_pair:
            ssl_context.load_cert_chain(self._public_key, self._private_key)
        return ssl_context

    def post(self, json_data):
        """Synchronously POST ``json_data`` to the custom action URL.

        Args:
            json_data (dict): Payload sent as the JSON body.

        Returns:
            (tuple): ``(200, decoded_json)`` on success, ``(status_code, {})``
                for any other HTTP status.
        """
        post_kwargs = {"url": self.url, "json": json_data}
        if self._public_key and self._private_key:
            post_kwargs["cert"] = (self._public_key, self._private_key)
        elif self._public_key:
            post_kwargs["cert"] = self._public_key
        if self._cert:
            # Use the configured CA file for server verification, matching the
            # way post_async treats `cert`.
            post_kwargs["verify"] = self._cert

        result = requests.post(**post_kwargs)
        if result.status_code == 200:
            return 200, result.json()
        return result.status_code, {}

    async def post_async(self, json_data):
        """Asynchronously POST ``json_data`` to the custom action URL.

        Args:
            json_data (dict): Payload sent as the JSON body.

        Returns:
            (tuple): ``(200, decoded_json)`` on success, ``(status, {})`` for
                any other HTTP status.
        """
        post_kwargs = {"json": json_data}
        ssl_context = self._build_ssl_context()
        if ssl_context is not None:
            # Pass `ssl` only when a custom context exists so that aiohttp
            # otherwise applies its own default certificate verification.
            post_kwargs["ssl"] = ssl_context

        async with aiohttp.ClientSession() as session:
            async with session.post(self.url, **post_kwargs) as response:
                if response.status == 200:
                    return 200, await response.json()
                return response.status, {}

    def __repr__(self):
        return self._name

    def __str__(self):
        return self._name
class CustomActionSequence:
    """An ordered list of custom actions that are invoked one after another.

    Args:
        actions (list): Action names; each one is wrapped in a ``CustomAction``.
        config (dict): Configuration shared by every action in the sequence.
        merge (bool): Forwarded to each ``CustomAction``.
    """

    def __init__(self, actions, config, merge=True):
        self.actions = [
            CustomAction(action_name, config, merge=merge) for action_name in actions
        ]

    def invoke(self, request, responder):
        """Run the actions in order, stopping at the first one that fails.

        Returns:
            (bool): True if every action succeeded, False as soon as one does not.
        """
        for current in self.actions:
            if current.invoke(request, responder):
                continue
            logger.warning("Failed to invoke action %s.", current)
            return False
        return True

    async def invoke_async(self, request, responder):
        """Asynchronous variant of ``invoke``; actions are awaited sequentially.

        Returns:
            (bool): True if every action succeeded, False as soon as one does not.
        """
        for current in self.actions:
            succeeded = await current.invoke_async(request, responder)
            if succeeded:
                continue
            logger.warning("Failed to invoke action %s.", current)
            return False
        return True

    def __repr__(self):
        return str(self.actions)

    def __str__(self):
        return "action_seq=" + str(self.actions)
def invoke_custom_action(name, config, request, responder, merge=True):
    """Create a one-off ``CustomAction`` and invoke it synchronously.

    Args:
        name (str): Name of the custom action.
        config (dict): Custom action configuration passed to ``CustomAction``.
        request (Request)
        responder (DialogueResponder)
        merge (bool): Forwarded to ``CustomAction``.

    Returns:
        Whatever ``CustomAction.invoke`` returns for this request.
    """
    action = CustomAction(name, config, merge=merge)
    return action.invoke(request, responder)
async def invoke_custom_action_async(name, config, request, responder, merge=True):
    """Create a one-off ``CustomAction`` and invoke it asynchronously.

    Args:
        name (str): Name of the custom action.
        config (dict): Custom action configuration passed to ``CustomAction``.
        request (Request)
        responder (DialogueResponder)
        merge (bool): Forwarded to ``CustomAction``.

    Returns:
        Whatever awaiting ``CustomAction.invoke_async`` yields for this request.
    """
    action = CustomAction(name, config, merge=merge)
    return await action.invoke_async(request, responder)