Source code for mindmeld.active_learning.data_loading

# -*- coding: utf-8 -*-
#
# Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#     http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module contains classes used to load queries for the Active Learning Pipeline.
"""

from typing import Dict, List
import logging

from .heuristics import Heuristic, stratified_random_sample, EntropySampling

from ..auto_annotator import BootstrapAnnotator
from ..components._config import DEFAULT_AUTO_ANNOTATOR_CONFIG
from ..constants import TuneLevel, TuningType, AL_MAX_LOG_USAGE_PCT
from ..core import ProcessedQuery
from ..markup import read_query_file
from ..resource_loader import ResourceLoader, ProcessedQueryList

logger = logging.getLogger(__name__)


class LabelMap:
    """Class that handles label encoding and mapping."""

    def __init__(self, query_tree: Dict):
        """
        Args:
            query_tree (dict): Nested dictionary containing queries. Has the format:
                {"domain": {"intent": [Query List]}}.
        """
        self.domain_to_intents = LabelMap.get_domain_to_intents(query_tree)
        self.domain2id = LabelMap._get_domain_mappings(self.domain_to_intents)
        self.id2domain = LabelMap._reverse_dict(self.domain2id)
        self.domain_to_intent2id = LabelMap._get_intent_mappings(self.domain_to_intents)
        self.id2intent = LabelMap._reverse_nested_dict(self.domain_to_intent2id)
    @staticmethod
    def get_domain_to_intents(query_tree: Dict) -> Dict:
        """
        Args:
            query_tree (dict): Nested dictionary containing queries. Has the format:
                {"domain": {"intent": [Query List]}}.

        Returns:
            domain_to_intents (dict): Dict mapping domains to a list of intents.
        """
        domain_to_intents = {}
        for domain in query_tree:
            domain_to_intents[domain] = list(query_tree[domain])
        return domain_to_intents
    @staticmethod
    def _get_domain_mappings(domain_to_intents: Dict) -> Dict:
        """Creates a dictionary that maps domains to encoded ids.

        Args:
            domain_to_intents (dict): Dict mapping domains to a list of intents.

        Returns:
            domain2id (dict): Dict with domain to id mappings.
        """
        domain2id = {}
        domains = list(domain_to_intents)
        for index, domain in enumerate(domains):
            domain2id[domain] = index
        return domain2id

    @staticmethod
    def _get_intent_mappings(domain_to_intents: Dict) -> Dict:
        """Creates a dictionary that maps intents to encoded ids.

        Args:
            domain_to_intents (dict): Dict mapping domains to a list of intents.

        Returns:
            domain_to_intent2id (dict): Dict with intent to id mappings.
        """
        domain_to_intent2id = {}
        for domain in domain_to_intents:
            intent_labels = {}
            for index, intent in enumerate(domain_to_intents[domain]):
                intent_labels[intent] = index
            domain_to_intent2id[domain] = intent_labels
        return domain_to_intent2id

    @staticmethod
    def _reverse_dict(dictionary: Dict[str, int]):
        """
        Returns:
            reversed_dict (dict): Reversed dictionary.
        """
        reversed_dict = {v: k for k, v in dictionary.items()}
        return reversed_dict

    @staticmethod
    def _reverse_nested_dict(dictionary: Dict[str, Dict[str, int]]):
        """
        Returns:
            reversed_dict (dict): Reversed dictionary.
        """
        reversed_dict = {}
        for parent_key, parent_value in dictionary.items():
            reversed_dict[parent_key] = LabelMap._reverse_dict(parent_value)
        return reversed_dict

    @staticmethod
    def _get_entity_mappings(query_list: ProcessedQueryList) -> Dict:
        """Generates index mappings for entity labels in an application.
        Supports both BIO and BIOES tag schemes.

        Args:
            query_list (ProcessedQueryList): Data structure containing a list of
                processed queries.

        Returns:
            Dictionary mapping entity tags to index in entity vector.
        """
        entity_labels = set()
        logger.info("Generating Entity Labels...")
        for d, i, entities in zip(
            query_list.domains(), query_list.intents(), query_list.entities()
        ):
            if len(entities):
                for entity in entities:
                    e = str(entity.entity.type)
                    entity_labels.add(f"{d}.{i}.B|{e}")
                    entity_labels.add(f"{d}.{i}.I|{e}")
                    entity_labels.add(f"{d}.{i}.S|{e}")
                    entity_labels.add(f"{d}.{i}.E|{e}")
            # Every domain/intent pair gets an "O" (outside) tag.
            e = "O|"
            entity_labels.add(f"{d}.{i}.{e}")
        entity_labels = sorted(entity_labels)
        return dict(zip(entity_labels, range(len(entity_labels))))
    @staticmethod
    def get_class_labels(
        tuning_level: list, query_list: ProcessedQueryList
    ) -> List[str]:
        """Creates a class label for a set of queries. These labels are used to split
        queries by type. Labels follow the format of "domain" or "domain.intent".
        For example, "date.get_date".

        Args:
            tuning_level (list): The hierarchy levels to tune ("domain", "intent" or "entity")
            query_list (ProcessedQueryList): Data structure containing a list of
                processed queries.

        Returns:
            class_labels (List[str]): List of labels for the classification task.
        """
        if TuneLevel.INTENT.value in tuning_level:
            return [
                f"{d}.{i}" for d, i in zip(query_list.domains(), query_list.intents())
            ]
        else:
            return [f"{d}" for d in query_list.domains()]
    @staticmethod
    def create_label_map(app_path, file_pattern):
        """Creates a label map.

        Args:
            app_path (str): Path to MindMeld application
            file_pattern (str): Regex pattern to match text files. (".*train.*.txt")

        Returns:
            label_map (LabelMap): A label map.
        """
        resource_loader = ResourceLoader.create_resource_loader(app_path)
        query_tree = resource_loader.get_labeled_queries(label_set=file_pattern)
        return LabelMap(query_tree)
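
# A minimal usage sketch (hypothetical): building a LabelMap from an app's
# training files and reading back the encoded ids. The app path and the
# "smart_home"/"set_thermostat" labels are illustrative, not part of this module.
#
#   label_map = LabelMap.create_label_map("./home_assistant", ".*train.*.txt")
#   domain_id = label_map.domain2id["smart_home"]
#   intent_id = label_map.domain_to_intent2id["smart_home"]["set_thermostat"]
#   assert label_map.id2domain[domain_id] == "smart_home"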

class LogQueriesLoader:
    def __init__(self, app_path: str, tuning_level: list, log_file_path: str):
        """This class loads data as processed queries from a specified log file.

        Args:
            app_path (str): Path to the MindMeld application.
            tuning_level (list): The hierarchy levels to tune ("domain", "intent" or "entity")
            log_file_path (str): Path to the log file with log queries.
        """
        self.app_path = app_path
        self.tuning_level = tuning_level
        self.log_file_path = log_file_path
    @staticmethod
    def deduplicate_raw_text_queries(log_queries_iter) -> List[str]:
        """Removes duplicates in the text queries.

        Args:
            log_queries_iter (generator): Log queries generator.

        Returns:
            filtered_text_queries (List[str]): A list of filtered text queries.
        """
        return list(set(log_queries_iter))
    def convert_text_queries_to_processed(
        self, text_queries: List[str]
    ) -> List[ProcessedQuery]:
        """Converts text queries to processed queries using an annotator.

        Args:
            text_queries (List[str]): A list of text queries.

        Returns:
            queries (List[ProcessedQuery]): List of processed queries.
        """
        logger.info("Loading a Bootstrap Annotator to process log queries.")
        # Copy the default config so the shared module-level dict is not mutated.
        annotator_params = dict(DEFAULT_AUTO_ANNOTATOR_CONFIG)
        annotator_params["app_path"] = self.app_path
        bootstrap_annotator = BootstrapAnnotator(**annotator_params)
        return bootstrap_annotator.text_queries_to_processed_queries(
            text_queries=text_queries
        )
    @property
    def queries(self):
        log_queries_iter = read_query_file(self.log_file_path)
        filtered_text_queries = LogQueriesLoader.deduplicate_raw_text_queries(
            log_queries_iter
        )
        return self.convert_text_queries_to_processed(filtered_text_queries)
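
# A minimal usage sketch (hypothetical): loading deduplicated log queries as
# processed queries. The paths below are illustrative.
#
#   loader = LogQueriesLoader(
#       app_path="./home_assistant",
#       tuning_level=["domain", "intent"],
#       log_file_path="./logs/queries.txt",
#   )
#   processed = loader.queries  # runs the BootstrapAnnotator over unique queries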

class DataBucket:
    """Class to hold data throughout the Active Learning training pipeline.
    Responsible for data conversion, filtration, and storage.
    """

    def __init__(
        self,
        label_map,
        resource_loader,
        test_queries: ProcessedQueryList,
        unsampled_queries: ProcessedQueryList,
        sampled_queries: ProcessedQueryList,
    ):
        """
        Args:
            label_map (LabelMap): Class that handles label encoding and mapping.
            resource_loader (ResourceLoader): Resource loader with access to the QueryCache.
            test_queries (ProcessedQueryList): Queries to use for evaluation.
            unsampled_queries (ProcessedQueryList): Queries to sample from iteratively.
            sampled_queries (ProcessedQueryList): Queries currently included in the sample set.
        """
        self.label_map = label_map
        self.resource_loader = resource_loader
        self.test_queries = test_queries
        self.unsampled_queries = unsampled_queries
        self.sampled_queries = sampled_queries
    def get_queries(self, query_ids):
        """Method to get multiple queries from the QueryCache given a list of query ids.

        Args:
            query_ids (List[int]): List of ids corresponding to queries in the QueryCache.

        Returns:
            queries (List[ProcessedQuery]): List of processed queries from the cache.
        """
        return [
            self.resource_loader.query_cache.get(query_id) for query_id in query_ids
        ]
    def update_sampled_queries(self, newly_sampled_queries_ids):
        """Update the current set of sampled queries by adding the set of newly
        sampled queries. A new ProcessedQueryList object is created with the updated
        set of query ids.

        Args:
            newly_sampled_queries_ids (List[int]): List of ids corresponding to the
                newly sampled queries in the QueryCache.
        """
        sampled_queries_ids = self.sampled_queries.elements + newly_sampled_queries_ids
        self.sampled_queries = ProcessedQueryList(
            cache=self.resource_loader.query_cache, elements=sampled_queries_ids
        )
    def update_unsampled_queries(self, remaining_indices):
        """Update the current set of unsampled queries by removing the set of newly
        sampled queries. A new ProcessedQueryList object is created with the updated
        set of query ids.

        Args:
            remaining_indices (List[int]): List of ids corresponding to the remaining
                queries in self.unsampled_queries.
        """
        remaining_queries_ids = [
            self.unsampled_queries.elements[i] for i in remaining_indices
        ]
        self.unsampled_queries = ProcessedQueryList(
            cache=self.resource_loader.query_cache, elements=remaining_queries_ids
        )
    def sample_and_update(
        self,
        sampling_size: int,
        confidences_2d: List[List[float]],
        confidences_3d: List[List[List[float]]],
        heuristic: Heuristic,
        confidence_segments: Dict = None,
        tuning_type: TuningType = TuningType.CLASSIFIER,
    ):
        """Method to sample a DataBucket's unsampled_queries and update its
        sampled_queries and newly_sampled_queries.

        Args:
            sampling_size (int): Number of elements to sample in the next iteration.
            confidences_2d (List[List[float]]): Confidence probabilities per element.
            confidences_3d (List[List[List[float]]]): Confidence probabilities per
                element (3d is used for tagger tuning).
            heuristic (Heuristic): Selection strategy.
            confidence_segments (Dict[str, Tuple[int, int]]): A dictionary mapping
                segments to run KL Divergence.
            tuning_type (TuningType): Component to be tuned ("classifier" or "tagger").

        Returns:
            newly_sampled_queries_ids (List[int]): List of ids corresponding to the
                newly sampled queries in the QueryCache.
        """
        if tuning_type == TuningType.CLASSIFIER:
            params_rank_3d = {"confidences_3d": confidences_3d}
            if confidence_segments:
                params_rank_3d["confidence_segments"] = confidence_segments
            ranked_indices_2d = (
                heuristic.rank_3d(**params_rank_3d)
                if confidences_3d
                else heuristic.rank_2d(confidences_2d)
            )
            newly_sampled_indices = ranked_indices_2d[:sampling_size]
            remaining_indices = ranked_indices_2d[sampling_size:]
        else:
            try:
                ranked_entity_indices = heuristic.rank_entities(confidences_2d)
            except (TypeError, ValueError):
                # If the heuristic does not support entity-level active learning,
                # default to entropy sampling.
                heuristic = EntropySampling
                ranked_entity_indices = heuristic.rank_entities(confidences_2d)
            newly_sampled_indices = ranked_entity_indices[:sampling_size]
            remaining_indices = ranked_entity_indices[sampling_size:]

        newly_sampled_queries_ids = [
            self.unsampled_queries.elements[i] for i in newly_sampled_indices
        ]
        self.update_sampled_queries(newly_sampled_queries_ids)
        self.update_unsampled_queries(remaining_indices)
        return newly_sampled_queries_ids
    @staticmethod
    def filter_queries_by_nlp_component(
        query_list: ProcessedQueryList, component_type: str, component_name: str
    ):
        """Filter queries for training preparation.

        Args:
            query_list (list): List of queries to filter
            component_type (str): Component type of desired queries (e.g. "domain")
            component_name (str): Component name of desired queries (e.g. "smart_home")

        Returns:
            filtered_queries_indices (list): List of indices of filtered queries.
            filtered_queries (list): List of filtered queries.
        """
        filtered_queries = []
        filtered_queries_indices = []
        for index, query in enumerate(query_list.processed_queries()):
            if getattr(query, component_type) == component_name:
                filtered_queries_indices.append(index)
                filtered_queries.append(query)
        return filtered_queries_indices, filtered_queries
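
# A minimal usage sketch (hypothetical): one classifier-tuning iteration over a
# DataBucket (e.g. one built by DataBucketFactory below). The bucket, the
# `classifier_probs` array, and the sampling size are illustrative; in the
# pipeline the probabilities come from the trained classifier.
#
#   newly_sampled_ids = data_bucket.sample_and_update(
#       sampling_size=100,
#       confidences_2d=classifier_probs,  # shape: (num_queries, num_classes)
#       confidences_3d=None,              # falls through to heuristic.rank_2d
#       heuristic=EntropySampling(),
#   )
#   _, smart_home_queries = DataBucket.filter_queries_by_nlp_component(
#       data_bucket.sampled_queries, "domain", "smart_home"
#   )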

class DataBucketFactory:
    """Class to generate the initial data for experimentation (Seed Queries,
    Remaining Queries, and Test Queries). Handles initial sampling and data
    splitting based on configuration details.
    """
    @staticmethod
    def get_data_bucket_for_strategy_tuning(
        app_path: str,
        tuning_level: list,
        train_pattern: str,
        test_pattern: str,
        train_seed_pct: float,
    ):
        """Creates a DataBucket to be used for strategy tuning.

        Args:
            app_path (str): Path to MindMeld application
            tuning_level (list): The hierarchy levels to tune ("domain", "intent" or "entity")
            train_pattern (str): Regex pattern to match train files. (".*train.*.txt")
            test_pattern (str): Regex pattern to match test files. (".*test.*.txt")
            train_seed_pct (float): Percentage of training data to use as the initial seed

        Returns:
            strategy_tuning_data_bucket (DataBucket): DataBucket for tuning
        """
        label_map = LabelMap.create_label_map(app_path, train_pattern)
        resource_loader = ResourceLoader.create_resource_loader(app_path)
        train_query_list = resource_loader.get_flattened_label_set(
            label_set=train_pattern
        )
        if TuneLevel.ENTITY.value in tuning_level:
            label_map.entity2id = LabelMap._get_entity_mappings(train_query_list)
            label_map.id2entity = LabelMap._reverse_dict(label_map.entity2id)

        train_class_labels = LabelMap.get_class_labels(tuning_level, train_query_list)
        ranked_indices = stratified_random_sample(train_class_labels)
        sampling_size = int(train_seed_pct * len(train_query_list))
        sampled_query_ids = [
            train_query_list.elements[i] for i in ranked_indices[:sampling_size]
        ]
        unsampled_query_ids = [
            train_query_list.elements[i] for i in ranked_indices[sampling_size:]
        ]
        sampled_queries = ProcessedQueryList(
            resource_loader.query_cache, sampled_query_ids
        )
        unsampled_queries = ProcessedQueryList(
            resource_loader.query_cache, unsampled_query_ids
        )
        test_queries = resource_loader.get_flattened_label_set(label_set=test_pattern)

        return DataBucket(
            label_map, resource_loader, test_queries, unsampled_queries, sampled_queries
        )
    @staticmethod
    def get_data_bucket_for_query_selection(
        app_path: str,
        tuning_level: list,
        train_pattern: str,
        test_pattern: str,
        unlabeled_logs_path: str,
        labeled_logs_pattern: str = None,
        log_usage_pct: float = AL_MAX_LOG_USAGE_PCT,
    ):
        """Creates a DataBucket to be used for log query selection.

        Args:
            app_path (str): Path to MindMeld application
            tuning_level (list): The hierarchy levels to tune ("domain", "intent" or "entity")
            train_pattern (str): Regex pattern to match train files. For example, ".*train.*.txt"
            test_pattern (str): Regex pattern to match test files. For example, ".*test.*.txt"
            unlabeled_logs_path (str): Path to a logs text file with unlabeled queries
            labeled_logs_pattern (str): Pattern to obtain logs already labeled within a MindMeld app
            log_usage_pct (float): Percentage of the log data to use for selection

        Returns:
            query_selection_data_bucket (DataBucket): DataBucket for log query selection
        """
        label_map = LabelMap.create_label_map(app_path, train_pattern)
        resource_loader = ResourceLoader.create_resource_loader(app_path)
        train_query_list = resource_loader.get_flattened_label_set(
            label_set=train_pattern
        )
        if TuneLevel.ENTITY.value in tuning_level:
            label_map.entity2id = LabelMap._get_entity_mappings(train_query_list)
            label_map.id2entity = LabelMap._reverse_dict(label_map.entity2id)

        if labeled_logs_pattern:
            log_query_list = resource_loader.get_flattened_label_set(
                label_set=labeled_logs_pattern
            )
        else:
            log_queries = LogQueriesLoader(
                app_path, tuning_level, unlabeled_logs_path
            ).queries
            log_queries_keys = [
                resource_loader.query_cache.get_key(q.domain, q.intent, q.query.text)
                for q in log_queries
            ]
            log_query_row_ids = [
                resource_loader.query_cache.put(key, query)
                for key, query in zip(log_queries_keys, log_queries)
            ]
            log_query_list = ProcessedQueryList(
                cache=resource_loader.query_cache, elements=log_query_row_ids
            )

        if log_usage_pct < AL_MAX_LOG_USAGE_PCT:
            sampling_size = int(log_usage_pct * len(log_query_list))
            # get_class_labels returns a single list of class labels.
            log_class_labels = LabelMap.get_class_labels(tuning_level, log_query_list)
            ranked_indices = stratified_random_sample(log_class_labels)
            log_query_ids = [
                log_query_list.elements[i] for i in ranked_indices[:sampling_size]
            ]
            # Keep only the sampled subset of log queries.
            log_query_list = ProcessedQueryList(log_query_list.cache, log_query_ids)

        sampled_queries = resource_loader.get_flattened_label_set(
            label_set=train_pattern
        )
        test_queries = resource_loader.get_flattened_label_set(label_set=test_pattern)

        return DataBucket(
            label_map, resource_loader, test_queries, log_query_list, sampled_queries
        )
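
# A minimal usage sketch (hypothetical): creating the initial DataBucket for
# strategy tuning, seeding with 20% of the training data. The app path and
# seed percentage are illustrative.
#
#   data_bucket = DataBucketFactory.get_data_bucket_for_strategy_tuning(
#       app_path="./home_assistant",
#       tuning_level=["domain", "intent"],
#       train_pattern=".*train.*.txt",
#       test_pattern=".*test.*.txt",
#       train_seed_pct=0.20,
#   )
#   logger.info("Seed queries: %d", len(data_bucket.sampled_queries))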