Source code for alex.applications.PublicTransportInfoEN.hdc_policy

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import unicode_literals

import random
import itertools
from datetime import timedelta
from datetime import datetime
from datetime import time as dttime
from collections import defaultdict
import re
import os

from alex.applications.PublicTransportInfoEN.site_preprocessing import expand_stop
from alex.components.dm import DialoguePolicy
from alex.components.dm.dddstate import D3DiscreteValue
from alex.components.nlg.tools.en import word_for_number
from alex.components.slu.da import DialogueAct, DialogueActItem
from .time_zone import GoogleTimeFinder
from .directions import GoogleDirectionsFinder, Travel
from alex.applications.utils.weather import OpenWeatherMapWeatherFinder, WeatherPoint


[docs]def randbool(n): """Randomly return True in 1 out of n cases. :param n: Inverted chance of returning True :rtype: Boolean """ if random.randint(1, n) == 1: return True return False
[docs]class PTIENHDCPolicy(DialoguePolicy): """The handcrafted policy for the PTI-EN system.""" def __init__(self, cfg, ontology): super(PTIENHDCPolicy, self).__init__(cfg, ontology) os.environ['TZ'] = ontology['default_values']['time_zone'] directions_type = GoogleDirectionsFinder if 'directions' in cfg['DM'] and 'type' in cfg['DM']['directions']: directions_type = cfg['DM']['directions']['type'] self.directions = directions_type(cfg=cfg) self.weather = OpenWeatherMapWeatherFinder(cfg=cfg) self.time = GoogleTimeFinder(cfg=cfg) self.infer_default_stops = directions_type == GoogleDirectionsFinder self.system_das = [] self.last_system_dialogue_act = None self.debug = cfg['DM']['basic']['debug'] self.session_logger = cfg['Logging']['session_logger'] self.system_logger = cfg['Logging']['system_logger'] self.policy_cfg = self.cfg['DM']['dialogue_policy']['PTIENHDCPolicy'] self.accept_prob = self.policy_cfg['accept_prob']
[docs] def reset_on_change(self, ds, changed_slots): """Reset slots which depends on changed slots. :param ds: dialogue state :param changed_slots: slots changed in the last turn """ for ds_slot in ds: if ds_slot in changed_slots: # do not reset a slot which just changed continue for changed_slot in changed_slots: if self.ontology.reset_on_change(ds_slot, changed_slot): if isinstance(ds[ds_slot], float): ds[ds_slot] = 0.0 elif isinstance(ds[ds_slot], int): ds[ds_slot] = 0 elif isinstance(ds[ds_slot], basestring): ds[ds_slot] = "none" else: ds[ds_slot].reset() self.system_logger.debug("Reset on change: {slot} because of {changed_slot}".format(slot=ds_slot, changed_slot=changed_slot)) break
[docs] def filter_iconfirms(self, da): """Filter implicit confirms if the same information is uttered in an inform dialogue act item. Also filter implicit confirms for stop names equaling city names. Also check if the stop and city names are equal! :param da: unfiltered dialogue act :return: filtered dialogue act """ new_da = DialogueAct() informs = [] iconfirms = defaultdict(int) for dai in da: if dai.dat == 'inform': informs.append((dai.name, dai.value)) elif dai.dat == 'iconfirm': iconfirms[(dai.name, dai.value)] += 1 for dai in da: if dai.dat == 'iconfirm': # filter slots explicitly informed if (dai.name, dai.value) in informs: continue # filter repeating iconfirms elif iconfirms[dai.name, dai.value] > 1: iconfirms[dai.name, dai.value] -= 1 continue # filter mistakenly added iconfirms that have an unset/meaningless value elif dai.value is None or dai.value in ['none', '*']: continue # filter stop names that are the same as city names elif dai.name.endswith('_stop'): city_dai = dai.name[:-4] + 'city' if (city_dai, dai.value) in informs or iconfirms[(city_dai, dai.value)]: continue # filter state names that are the same as city names elif dai.name.endswith('_state'): city_dai = dai.name[:-5] + 'city' if (city_dai, dai.value) in informs or iconfirms[(city_dai, dai.value)]: continue new_da.append(dai) return new_da
[docs] def get_da(self, dialogue_state): """The main policy decisions are made here. For each action, some set of conditions must be met. These conditions depends on the action. :param dialogue_state: the belief state provided by the tracker :return: a dialogue act - the system action """ ludait_prob, last_user_dai_type = dialogue_state["ludait"].mph() if ludait_prob < self.policy_cfg['accept_prob_ludait']: last_user_dai_type = 'none' # all slots being requested by the user slots_being_requested = dialogue_state.get_slots_being_requested(self.policy_cfg['accept_prob_being_requested']) # all slots being confirmed by the user slots_being_confirmed = dialogue_state.get_slots_being_confirmed(self.policy_cfg['accept_prob_being_confirmed']) # all slots supplied by the user but not implicitly confirmed noninformed_slots = dialogue_state.get_slots_being_noninformed(self.policy_cfg['accept_prob_noninformed']) # all slots deemed to be accepted accepted_slots = dialogue_state.get_accepted_slots(self.accept_prob) # all slots that should be confirmed slots_tobe_confirmed = dialogue_state.get_slots_tobe_confirmed(self.policy_cfg['confirm_prob'], self.accept_prob) # filter out all the slots that are not defined by the ontology to be confirmed slots_tobe_confirmed = {k: v for k, v in slots_tobe_confirmed.items() if k in self.ontology.slots_system_confirms()} # all slots for which the policy can use ``select`` DAI slots_tobe_selected = dialogue_state.get_slots_tobe_selected(self.policy_cfg['select_prob']) # filter out all the slots that are not defined by the ontology to be selected slots_tobe_selected = {k: v for k, v in slots_tobe_selected.items() if k in self.ontology.slots_system_selects()} # all slots changed by a user in the last turn changed_slots = dialogue_state.get_changed_slots(self.accept_prob) # did the state changed at all? has_state_changed = dialogue_state.has_state_changed(self.policy_cfg['min_change_prob']) if self.debug: s = [] s.append('PTIENHDCPolicy - Slot stats') s.append("") s.append("ludait: %s" % unicode(last_user_dai_type)) s.append("Slots being requested: %s" % unicode(slots_being_requested)) s.append("Slots being confirmed: %s" % unicode(slots_being_confirmed)) s.append("Non-informed slots: %s" % unicode(noninformed_slots)) s.append("") s.append("Accepted slots: %s" % unicode(accepted_slots)) s.append("Slots to be confirmed: %s" % unicode(slots_tobe_confirmed)) s.append("Slots to be selected: %s" % unicode(slots_tobe_selected)) s.append("Changed slots: %s" % unicode(changed_slots)) s.append("State changed? %s" % unicode(has_state_changed)) s = '\n'.join(s) self.system_logger.debug(s) # output DA res_da = None # reset all slots depending on changed slots self.reset_on_change(dialogue_state, changed_slots) # These facts are used in the dialog-controlling conditions that follow. # They are named so that the dialog-controlling code is more readable.o fact = { 'max_turns_exceeded': dialogue_state.turn_number > self.cfg[ 'PublicTransportInfoEN']['max_turns'], 'dialog_begins': len(self.system_das) == 0, 'user_did_not_say_anything': last_user_dai_type == "silence", 'user_said_bye': "lta_bye" in accepted_slots, 'we_did_not_understand': last_user_dai_type == "null" or last_user_dai_type == "other", 'user_wants_help': last_user_dai_type == "help", 'user_thanked': last_user_dai_type == "thankyou", 'user_wants_restart': last_user_dai_type == "restart", 'user_wants_us_to_repeat': last_user_dai_type == "repeat", 'there_is_something_to_be_selected': bool(slots_tobe_selected), 'there_is_something_to_be_confirmed': bool(slots_tobe_confirmed), 'user_wants_to_know_the_time': 'current_time' in slots_being_requested, 'user_wants_to_know_the_weather': dialogue_state[ 'lta_task'].test('weather', self.accept_prob), 'user_wants_to_find_the_platform': dialogue_state[ 'lta_task'].test('find_platform', self.accept_prob), } # topic-independent behavior if fact['max_turns_exceeded']: # Hang up if the talk has been too long res_da = DialogueAct('bye()&inform(toolong="true")') elif fact['dialog_begins']: # NLG("Dobrý den. Jak Vám mohu pomoci") res_da = DialogueAct("hello()") elif fact['user_did_not_say_anything']: # at this moment the silence and the explicit null act # are treated the same way: NLG("") silence_time = dialogue_state['silence_time'] if silence_time > self.cfg['DM']['basic']['silence_timeout']: res_da = DialogueAct('inform(silence_timeout="true")') else: res_da = DialogueAct("silence()") dialogue_state["ludait"].reset() elif fact['user_said_bye']: # NLG("Na shledanou.") res_da = DialogueAct("bye()") dialogue_state["ludait"].reset() dialogue_state["lta_bye"].reset() elif fact['we_did_not_understand']: # NLG("Sorry, I did not understand. You can say...") res_da = DialogueAct("notunderstood()") if randbool(5): res_da.extend(self.get_limited_context_help(dialogue_state)) dialogue_state["ludait"].reset() elif fact['user_wants_help']: # NLG("Help.") based on context res_da = self.get_help_res_da(dialogue_state, accepted_slots, has_state_changed) # res_da = DialogueAct("help()") dialogue_state["ludait"].reset() elif fact['user_thanked']: # NLG("Díky.") if not changed_slots: # plain thank you, nothing else said dialogue_state.restart() res_da = DialogueAct('inform(cordiality="true")&hello()') dialogue_state["ludait"].reset() elif fact['user_wants_restart']: # NLG("Dobře, zančneme znovu. Jak Vám mohu pomoci?") dialogue_state.restart() res_da = DialogueAct("restart()&hello()") dialogue_state["ludait"].reset() elif fact['user_wants_us_to_repeat']: # NLG - use the last dialogue act res_da = DialogueAct("irepeat()") dialogue_state["ludait"].reset() elif fact['there_is_something_to_be_selected']: # implicitly confirm all changed slots res_da = self.get_iconfirm_info(changed_slots) # select between two values for a slot that is not certain res_da.extend(self.select_info(slots_tobe_selected)) res_da = self.filter_iconfirms(res_da) elif fact['there_is_something_to_be_confirmed']: # implicitly confirm all changed slots res_da = self.get_iconfirm_info(changed_slots) # confirm all slots that are not certain res_da.extend(self.confirm_info(slots_tobe_confirmed)) res_da = self.filter_iconfirms(res_da) elif fact['user_wants_to_know_the_time']: # Respond to questions about current time # TODO: allow combining with other questions? res_da = self.get_current_time_res_da(dialogue_state, accepted_slots, has_state_changed) # topic-dependent elif fact['user_wants_to_know_the_weather']: # implicitly confirm all changed slots res_da = self.get_iconfirm_info(changed_slots) # talk about weather w_da = self.get_weather_res_da(dialogue_state, last_user_dai_type, slots_being_requested, slots_being_confirmed, accepted_slots, changed_slots, has_state_changed) res_da.extend(w_da) res_da = self.filter_iconfirms(res_da) else: # implicitly confirm all changed slots #todo: refactoring (place, area) - remove this hack - streets share stop slot so we don't have to generate more nlg templates changed_slots = self.fix_stop_street_slots(changed_slots) res_da = self.get_iconfirm_info(changed_slots) # talk about public transport t_da = self.get_connection_res_da(dialogue_state, last_user_dai_type, slots_being_requested, slots_being_confirmed, accepted_slots, changed_slots, has_state_changed) res_da.extend(t_da) res_da = self.filter_iconfirms(res_da) self.last_system_dialogue_act = res_da # record the system dialogue acts self.system_das.append(self.last_system_dialogue_act) return self.last_system_dialogue_act
[docs] def get_connection_res_da(self, ds, ludait, slots_being_requested, slots_being_confirmed, accepted_slots, changed_slots, state_changed): """Handle the public transport connection dialogue topic. :param ds: The current dialogue state :rtype: DialogueAct """ # output DA res_da = None if ludait == "reqalts": # NLG("There is nothing else in the database.") # NLG("The next connection is ...") res_da = self.get_an_alternative(ds) ds["ludait"].reset() elif "alternative" in accepted_slots: # Search for traffic direction and/or present the requested directions already found, take into account additional requests (dep_time etc.) res_da = self.get_requested_alternative(ds, slots_being_requested, accepted_slots) ds["alternative"].reset() elif slots_being_requested: # inform about all requested slots res_da = self.get_requested_info(slots_being_requested, ds, accepted_slots) elif slots_being_confirmed: # inform about all slots being confirmed by the user res_da = self.get_confirmed_info(slots_being_confirmed, ds, accepted_slots) else: # gather known information about the connection req_da, iconfirm_da, conn_info = self.gather_connection_info(ds, accepted_slots) if len(req_da) == 0: if state_changed: # we know everything we need -> start searching ds.conn_info = conn_info res_da = iconfirm_da res_da.extend(self.get_directions(ds, check_conflict=True)) else: res_da = self.backoff_action(ds) else: res_da = req_da return res_da
[docs] def get_weather_res_da(self, ds, ludait, slots_being_requested, slots_being_confirmed, accepted_slots, changed_slots, state_changed): """Handle the dialogue about weather. :param ds: The current dialogue state :param slots_being_requested: The slots currently requested by the user :rtype: DialogueAct """ # collect all necesary information req_da, ref_point = self.gather_weather_info(ds, accepted_slots) if len(req_da): return req_da # check if it is valid information apology_da = self.check_city_state_conflict(ref_point.in_city, ref_point.in_state) if apology_da is not None: return apology_da # obtain the weather if have not done so in previous turn if state_changed: res_da = self.get_weather(ds, ref_point) else: res_da = self.backoff_action(ds) return res_da
[docs] def get_current_time_res_da(self, ds, accepted_slots, state_changed): """Generates a dialogue act informing about the current time. :rtype: DialogueAct """ req_da, in_city, in_state, lon, lat = self.gather_time_info(ds, accepted_slots) if len(req_da): return req_da # check for valid input if in_city != 'none': apology_da = self.check_city_state_conflict(in_city, in_state) if apology_da is not None: return apology_da # if state_changed: res_da = self.get_current_time(in_city, in_state, lon, lat) # else: # res_da = self.backoff_action(ds) return res_da
[docs] def get_weather(self, ds, ref_point = None): """Retrieve weather information according to the current dialogue state. Infers state names based on city names and vice versa. :param ds: The current dialogue state :rtype: DialogueAct """ # get dialogue state values time_abs = ds['time'].mpv() time_rel = ds['time_rel'].mpv() date_rel = ds['date_rel'].mpv() ampm = ds['ampm'].mpv() lta_time = ds['lta_time'].mpv() in_city = ref_point.in_city#ds['in_city'].mpv() in_state = ref_point.in_state#ds['in_state'].mpv() # return the result res_da = DialogueAct() # interpret time daily = (time_abs == 'none' and ampm == 'none' and date_rel != 'none' and lta_time != 'time_rel') # check if any time is set to distinguish current/prediction weather_ts = None if time_abs != 'none' or time_rel != 'none' or ampm != 'none' or date_rel != 'none': weather_ts, time_type = self.interpret_time(time_abs, ampm, time_rel, date_rel, lta_time) else: time_type = "" # request the weather weather = self.weather.get_weather(time=weather_ts, daily=daily, city=in_city, state=in_state) # check errorscale if weather is None: return DialogueAct('apology()&inform(in_city="%s")&inform(in_state="%s")' % (in_city, in_state)) # time if weather_ts: if time_type == 'rel': res_da.append(DialogueActItem('inform', 'time_rel', time_rel)) else: if time_abs != 'none' or ampm != 'none': res_da.append(DialogueActItem('inform', 'time', weather_ts.strftime("%I:%M:%p"))) # is time right? if date_rel != 'none': res_da.append(DialogueActItem('inform', 'date_rel', date_rel)) else: res_da.append(DialogueActItem('inform', 'time_rel', 'now')) # temperature if not daily: res_da.append(DialogueActItem('inform', 'temperature', str(weather.temp))) else: res_da.append(DialogueActItem('inform', 'min_temperature', str(weather.min_temp))) res_da.append(DialogueActItem('inform', 'max_temperature', str(weather.max_temp))) # weather conditions res_da.append(DialogueActItem('inform', 'weather_condition', weather.condition)) return res_da
[docs] def get_current_time(self, in_city, in_state, longitude, latitude): place = in_city + "," + in_state if in_city != 'none' else in_state if longitude and latitude: cur_time, time_zone = self.time.get_time(place=place, lat=latitude, lon=longitude) else: cur_time, time_zone = self.time.get_time(place=place) if cur_time is None: default_time = datetime.now() default_state = self.ontology.get_default_value('in_state') d_time = default_time.strftime("%I:%M:%p") if in_state is not default_state: return DialogueAct('apology()&inform(in_state="%s")&inform(current_time=%s)&iconfirm(in_state=%s)' % (in_state, d_time, default_state)) else: return DialogueAct('inform(current_time=%s)&iconfirm(in_state="%s")' % (d_time, default_state)) res_da = DialogueAct() res_da.append(DialogueActItem('iconfirm', 'in_state', in_state)) res_da.append(DialogueActItem('inform', 'current_time', cur_time.strftime("%I:%M:%p"))) res_da.append(DialogueActItem('inform', 'time_zone', time_zone)) return res_da
[docs] def backoff_action(self, ds): """Generate a random backoff dialogue act in case we don't know what to do. :param ds: The current dialogue state :rtype: DialogueAct """ if randbool(10): return self.get_limited_context_help(ds) elif randbool(9): return DialogueAct('reqmore()') elif randbool(8): return DialogueAct('notunderstood()') elif randbool(3): return DialogueAct('irepeat()') return DialogueAct('silence()')
[docs] def get_an_alternative(self, ds): """Return an alternative route, if there is one, or ask for origin stop if there has been no route searching so far. :param ds: The current dialogue state :rtype: DialogueAct """ if 'route_alternative' not in ds: return DialogueAct('request(from_stop)') else: ds['route_alternative'] += 1 ds['route_alternative'] %= len(ds.directions) if ds.directions is not None else 1 return self.get_directions(ds)
[docs] def get_requested_alternative(self, ds, slots_being_requested, accepted_slots): """Return the requested route (or inform about not finding one). :param ds: The current dialogue state :rtype: DialogueAct """ res_da = DialogueAct() if not 'route_alternative' in ds: if slots_being_requested: # just waste the requested slots for not to carry them to the next round self.get_requested_info(slots_being_requested, ds, accepted_slots) return DialogueAct('inform(stop_conflict="no_stops")') else: ds_alternative = ds["alternative"].mpv() type = ds_alternative if ds_alternative == "next": increment = 1 elif ds_alternative == "prev": increment = -1 elif ds_alternative in ['last', 'dontcare']: increment = 0 else: increment = int(ds_alternative) - ds["route_alternative"] - 1 type = "true" ds["route_alternative"] += increment try: if ds['route_alternative'] < 0: raise Exception() ds.directions[ds['route_alternative']] # just for failing except: ds["route_alternative"] -= increment if slots_being_requested: # just waste the requested slots for not to carry them to the next round self.get_requested_info(slots_being_requested, ds, accepted_slots) return DialogueAct('inform(found_directions="no_%s")' % type) if slots_being_requested: res_da.append(DialogueActItem('inform', 'alternative', word_for_number(ds['route_alternative'] + 1, True))) res_da.extend(self.get_requested_info(slots_being_requested, ds, accepted_slots)) ds["route_alternative"] -= increment return res_da res_da.extend(self.get_directions(ds, type)) # ds["route_alternative"] -= increment return res_da
[docs] def get_requested_info(self, requested_slots, ds, accepted_slots): """Return a DA containing information about all requested slots. :param ds: The current dialogue state :param requested_slots: A dictionary with keys for all requested \ slots and the correct return values. :rtype: DialogueAct """ res_da = DialogueAct() for slot in requested_slots: if ds['route_alternative'] in [0, 1, 2, 3]: if slot == 'from_stop': res_da.extend(self.req_from_stop(ds)) elif slot == 'to_stop': res_da.extend(self.req_to_stop(ds)) elif slot == 'departure_time': res_da.extend(self.req_departure_time(ds)) elif slot == 'departure_time_rel': res_da.extend(self.req_departure_time_rel(ds)) elif slot == 'arrival_time': res_da.extend(self.req_arrival_time(ds)) elif slot == 'arrival_time_rel': res_da.extend(self.req_arrival_time_rel(ds)) elif slot in 'duration': res_da.extend(self.req_duration(ds)) elif slot == "num_transfers": res_da.extend(self.req_num_transfers(ds)) elif slot == "time_transfers": res_da.extend(self.req_time_transfers(ds)) elif slot == "distance": res_da.extend(self.req_distance(ds)) else: if slot in ['from_stop', 'to_stop', 'departure_time', 'departure_time_rel', 'arrival_time', 'arrival_time_rel', 'duration', 'num_transfers', 'time_transfers', 'distance' ]: dai = DialogueActItem("inform", "stops_conflict", "no_stops") res_da.append(dai) if 'from_stop' not in accepted_slots: dai = DialogueActItem("help", "inform", "from_stop") res_da.append(dai) elif 'to_stop' not in accepted_slots: dai = DialogueActItem("help", "inform", "to_stop") res_da.append(dai) else: dai = DialogueActItem("inform", slot, requested_slots[slot]) res_da.append(dai) ds["rh_" + slot].reset() ds["rh_" + slot].reset() return res_da
[docs] def get_confirmed_info(self, confirmed_slots, ds, accepted_slots): """Return a DA containing information about all slots being confirmed by the user (confirm/deny). Update the current dialogue state regarding the information provided. *WARNING* This confirms only against values in the dialogue state, however, it should (also in some cases) confirm against the results obtained from database, e.g. departure_time slot. :param ds: The current dialogue state :param confirmed_slots: A dictionary with keys for all slots \ being confirmed, along with their values :rtype: DialogueAct """ res_da = DialogueAct() for slot in confirmed_slots: if confirmed_slots[slot].mpv() == ds[slot].mpv(): # it is as user expected res_da.append(DialogueActItem("affirm")) dai = DialogueActItem("inform", slot, ds[slot].mpv()) res_da.append(dai) else: # it is something else than what user expected res_da.append(DialogueActItem("negate")) dai = DialogueActItem("deny", slot, ds["ch_" + slot].mpv()) res_da.append(dai) if slot in accepted_slots: dai = DialogueActItem("inform", slot, ds[slot].mpv()) res_da.append(dai) ds["ch_" + slot].reset() return res_da
[docs] def confirm_info(self, tobe_confirmed_slots): """Return a DA containing confirming only one slot from the slot to be confirmed. Confirm the slot with the most probable value among all slots to be confirmed. :param tobe_confirmed_slots: A dictionary with keys for all slots \ that should be confirmed, along with their values :rtype: DialogueAct """ res_da = DialogueAct() for _, slot in sorted([(h.mpvp(), s) for s, h in tobe_confirmed_slots.items()], reverse=True): dai = DialogueActItem("confirm", slot, tobe_confirmed_slots[slot].mpv()) res_da.append(dai) #confirm explicitly only one slot at the time break return res_da
[docs] def select_info(self, tobe_selected_slots): """Return a DA containing select act for two most probable values of only one slot from the slot to be used for select DAI. :param tobe_selected_slots: A dictionary with keys for all slots \ which the two most probable values should be selected :rtype: DialogueAct """ res_da = DialogueAct() for slot in tobe_selected_slots: val1, val2 = tobe_selected_slots[slot].tmpvs() res_da.append(DialogueActItem("select", slot, val1)) res_da.append(DialogueActItem("select", slot, val2)) #select values only in one slot at the time break return res_da
[docs] def get_iconfirm_info(self, changed_slots): """Return a DA containing all needed implicit confirms. Implicitly confirm all slots provided but not yet confirmed. This include also slots changed during the conversation. :param changed_slots: A dictionary with keys for all slots \ that have not been implicitly confirmed, along with \ their values :rtype: DialogueAct """ res_da = DialogueAct() if changed_slots: iconf_da = DialogueAct() for slot in changed_slots: if 'system_iconfirms' in self.ontology['slot_attributes'][slot]: dai = DialogueActItem("iconfirm", slot, changed_slots[slot].mpv()) iconf_da.append(dai) res_da.extend(iconf_da) return res_da
[docs] def get_default_stop_for_city(self, city): """Return a `default' stop based on the city name (main bus/train station). :param city: city name (unicode) :rtype: unicode """ stops = self.ontology.get_compatible_vals('city_stop', city) for cand_stop_name in [city, 'Hlavní nádraží', 'CAN, Husova']: if cand_stop_name in stops: return cand_stop_name for cand_stop_suffix in [' main station', ' city', ]: stop = city + cand_stop_suffix if stop in stops: return stop return None
[docs] def get_accepted_mpv(self, ds, slot_name, accepted_slots): """Return a slot's 'mpv()' (most probable value) if the slot is accepted, and return 'none' otherwise. Also, convert a mpv of '*' to 'none' since we don't know how to interpret it. :param ds: Dialogue state :param slot_name: The name of the slot to query :param accepted_slots: The currently accepted slots of the dialogue state :rtype: string """ val = 'none' if slot_name in accepted_slots: val = ds[slot_name].mpv() if val == '*': val = 'none' return val
[docs] def gather_connection_info(self, ds, accepted_slots): """Return a DA requesting further information needed to search for traffic directions and a dictionary containing the known information. Infers city names based on stop names and vice versa. If the request DA is empty, the search for directions may be commenced immediately. :param ds: The current dialogue state :rtype: DialogueAct, dict """ req_da = DialogueAct() # retrieve the slot variables from_city_val = self.get_accepted_mpv(ds, 'from_city', accepted_slots) from_borough_val = self.get_accepted_mpv(ds, 'from_borough', accepted_slots) from_stop_val = self.get_accepted_mpv(ds, 'from_stop', accepted_slots) from_street_val = self.get_accepted_mpv(ds, 'from_street', accepted_slots) from_street2_val = self.get_accepted_mpv(ds, 'from_street2', accepted_slots) to_city_val = self.get_accepted_mpv(ds, 'to_city', accepted_slots) to_borough_val = self.get_accepted_mpv(ds, 'to_borough', accepted_slots) to_stop_val = self.get_accepted_mpv(ds, 'to_stop', accepted_slots) to_street_val = self.get_accepted_mpv(ds, 'to_street', accepted_slots) to_street2_val = self.get_accepted_mpv(ds, 'to_street2', accepted_slots) vehicle_val = self.get_accepted_mpv(ds, 'vehicle', accepted_slots) max_transfers_val = self.get_accepted_mpv(ds, 'num_transfers', accepted_slots) # infer cities based on stops from_cities, to_cities = None, None stop_city_inferred = False if from_stop_val != 'none' and from_city_val == 'none': from_cities = self.ontology.get_compatible_vals('stop_city', from_stop_val) if len(from_cities) == 1: from_city_val = from_cities.pop() stop_city_inferred = True if to_stop_val != 'none' and to_city_val == 'none': to_cities = self.ontology.get_compatible_vals('stop_city', to_stop_val) if len(to_cities) == 1: to_city_val = to_cities.pop() stop_city_inferred = True # infer cities based on stops from_boroughs, to_boroughs = None, None stop_borough_inferred = False if from_stop_val != 'none' and from_borough_val == 'none': from_boroughs = self.ontology.get_compatible_vals('stop_borough', from_stop_val) if len(from_boroughs) == 1: from_borough_val = from_boroughs.pop() stop_borough_inferred = True if to_stop_val != 'none' and to_borough_val == 'none': to_boroughs = self.ontology.get_compatible_vals('stop_borough', to_stop_val) if len(to_boroughs) == 1: to_borough_val = to_boroughs.pop() stop_borough_inferred = True # infer boroughs based on streets from_boroughs_st, to_boroughs_st = None, None street_borough_inferred = False if to_street_val != 'none' and to_borough_val == 'none': to_boroughs_st = self.ontology.get_compatible_vals('street_borough', to_street_val) if len(to_boroughs_st) == 1: to_borough_val = to_boroughs_st.pop() street_borough_inferred = True if from_street_val != 'none' and from_borough_val == 'none': from_boroughs_st = self.ontology.get_compatible_vals('street_borough', from_street_val) if len(from_boroughs_st) == 1: from_borough_val = from_boroughs_st.pop() street_borough_inferred = True # infer cities based on each other if from_stop_val != 'none' and from_city_val == 'none' and to_city_val in from_cities: from_city_val = to_city_val elif to_stop_val != 'none' and to_city_val == 'none' and from_city_val in to_cities: to_city_val = from_city_val # infer boroughs based on each other # from stops if from_stop_val != 'none' and from_borough_val == 'none' and to_borough_val in from_boroughs: from_borough_val = to_borough_val elif to_stop_val != 'none' and to_borough_val == 'none' and from_borough_val in to_boroughs: to_borough_val = from_borough_val # from streets if from_street_val != 'none' and from_borough_val == 'none' and to_borough_val in from_boroughs_st: from_borough_val = to_borough_val elif to_street_val != 'none' and to_borough_val == 'none' and from_borough_val in to_boroughs_st: to_borough_val = from_borough_val # try to infer cities from intersection if to_cities is not None and from_cities is not None and from_city_val == 'none' and to_city_val == 'none': intersect_c = [c for c in from_cities if c in to_cities] if len(intersect_c) == 1: from_city_val = intersect_c.pop() to_city_val = from_city_val stop_city_inferred = True # try infer boroughs from intersection if to_boroughs is not None and from_boroughs is not None and from_borough_val == 'none' and to_borough_val == 'none': intersect_b = [b for b in from_boroughs if b in to_boroughs] if len(intersect_b) == 1: from_borough_val = intersect_b.pop() to_borough_val = from_borough_val stop_borough_inferred = True if to_boroughs_st is not None and from_boroughs_st is not None and from_borough_val == 'none' and to_borough_val == 'none': intersect_bs = [b for b in from_boroughs_st if b in to_boroughs_st] if len(intersect_bs) == 1: from_borough_val = intersect_bs.pop() to_borough_val = from_borough_val street_borough_inferred = True # place can be specified by street or stop and area by city or borough or another street has_from_place = from_stop_val != 'none' or from_street_val != 'none' or from_city_val not in ['none', 'New York'] has_from_area = from_borough_val !='none' or from_street2_val != 'none' or from_city_val != 'none' from_info_complete = has_from_place and has_from_area has_to_place = to_stop_val != 'none' or to_street_val != 'none' or to_city_val not in ['none', 'New York'] has_to_area = to_borough_val !='none' or to_street2_val != 'none' or to_city_val != 'none' # hack for from CITY to CITY and allowing New York as one of them if (has_to_area and has_from_area) and from_city_val != to_city_val: has_to_place = True has_from_place = True to_info_complete = has_to_place and has_to_area if not from_info_complete and not to_info_complete and \ 'departure_time' not in accepted_slots and 'time' not in accepted_slots and randbool(10): req_da.extend(DialogueAct('request(departure_time)')) elif not has_to_place: req_da.extend(DialogueAct('request(to_stop)')) elif not has_from_place: req_da.extend(DialogueAct('request(from_stop)')) elif not has_to_area: if to_city_val == 'none' and to_street_val == 'none': req_da.extend(DialogueAct('request(to_city)')) else: req_da.extend(DialogueAct('request(to_borough)')) elif not has_from_area: if from_city_val == 'none' and from_street_val == 'none': req_da.extend(DialogueAct('request(from_city)')) else: req_da.extend(DialogueAct('request(from_borough)')) elif has_from_area and has_to_area: if not has_from_place and to_city_val == 'New York': req_da.extend(DialogueAct('request(to_stop)')) if not has_to_place and from_city_val == 'New York': req_da.extend(DialogueAct('request(from_stop)')) # generate implicit confirms if we inferred cities and they are not the same for both stops default_city = self.ontology.get_default_value('city') # don't iconfirm borrough if new york is the other city, because all boroughs are in new york iconfirm_da = DialogueAct() if len(req_da) == 0: if stop_city_inferred and from_city_val != to_city_val: if to_city_val != 'none': # append iconfirm only if it is not new york and from area is not borough - all boroughs are in new york if to_city_val != 'New York' or not has_from_area or from_city_val != 'none': iconfirm_da.append(DialogueActItem('iconfirm', 'to_city', to_city_val)) if from_city_val != 'none': # append iconfirm only if it is not new york and to area is not borough - all boroughs are in new york if from_city_val != 'New York' or not has_to_area or to_city_val != 'none': iconfirm_da.append(DialogueActItem('iconfirm', 'from_city', from_city_val)) if (stop_borough_inferred or street_borough_inferred) and from_borough_val != to_borough_val: if to_borough_val != 'none': iconfirm_da.append(DialogueActItem('iconfirm', 'to_borough', to_borough_val)) if from_borough_val != 'none': iconfirm_da.append(DialogueActItem('iconfirm', 'from_borough', from_borough_val)) # retrieve additional geo location data: # we only need geo locations for stops, str(city + stop) is more informative than geo location of a city! from_stop_geo = self.ontology['addinfo']['city'].get(from_city_val, {}).get(from_stop_val, None) if from_stop_geo: from_stop_geo = None if from_stop_geo['lat'].isalnum() or from_stop_geo['lon'].isalnum() else from_stop_geo to_stop_geo = self.ontology['addinfo']['city'].get(to_city_val, {}).get(to_stop_val, None) if to_stop_geo: to_stop_geo = None if to_stop_geo['lat'].isalnum() or to_stop_geo['lon'].isalnum() else to_stop_geo # express boroughs as cities if city values are not set if from_city_val == 'none': from_city_val = from_borough_val if from_borough_val else 'none' if to_city_val == 'none': to_city_val = to_borough_val if to_borough_val else 'none' from_streets = " and ".join([street for street in [from_street_val, from_street2_val] if street not in ['none', None]]) to_streets = " and ".join([street for street in [to_street_val, to_street2_val] if street not in ['none', None]]) if from_stop_val == 'none': from_stop_val = from_streets if from_streets else 'none' if from_city_val == 'none': from_city_val = self.ontology.get_default_value('in_city') if to_stop_val == 'none': to_stop_val = to_streets if to_streets else 'none' if to_city_val == 'none': to_city_val = self.ontology.get_default_value('in_city') # todo - this would be sufficient: from_place, from_area, from_geo, to_place, to_area, to_geo, vehicle return req_da, iconfirm_da, Travel(from_city=from_city_val, from_stop=from_stop_val, from_stop_geo=from_stop_geo, to_stop_geo=to_stop_geo, to_city=to_city_val, to_stop=to_stop_val, vehicle=vehicle_val, max_transfers=max_transfers_val)
[docs] def gather_weather_info(self, ds, accepted_slots): """Handles in_city and in_state to be properly filled. If needed, a Request DA is formed for missing slots to be filled. Returns Reqest DA and WeatherPoint - information about the place If the request DA is empty, the search for weather may be commenced immediately. :param ds: The current dialogue state, """ req_da = DialogueAct() # retrieve the slot variables in_state_val = ds['in_state'].mpv() if 'in_state' in accepted_slots else 'none' in_city_val = ds['in_city'].mpv() if 'in_city' in accepted_slots else 'none' in_borough_val = ds['in_city'].mpv() if 'in_borough' in accepted_slots else 'none' if in_city_val != 'none' and in_state_val == 'none': in_states = self.ontology.get_compatible_vals('city_state', in_city_val) if not in_states or not len(in_states): print "WARNING: there is no state compatible with this city: " + in_city_val elif len(in_states) == 1: in_state_val = in_states.pop() if in_city_val == 'none' and in_state_val == 'none': if in_borough_val != 'none': in_city_val = in_borough_val else: in_city_val = self.ontology.get_default_value('in_city') in_state_val = self.ontology.get_default_value('in_state') if in_state_val == 'none': req_da.extend(DialogueAct("request(in_state)")) elif in_city_val == 'none': req_da.extend(DialogueAct("request(in_city)")) return req_da, WeatherPoint(in_city_val, in_state_val)
[docs] def gather_time_info(self, ds, accepted_slots): """Handles if in_city specified it handles properly filled in_state slot. If needed, a Request DA is formed for missing in_state slot. Returns Reqest DA and in_state If the request DA is empty, the search for current_time may be commenced immediately. :param ds: The current dialogue state, """ req_da = DialogueAct() in_state_val = ds['in_state'].mpv() if 'in_state' in accepted_slots else 'none' in_city_val = ds['in_city'].mpv() if 'in_city' in accepted_slots else 'none' if in_city_val != 'none' and in_state_val == 'none': in_states = self.ontology.get_compatible_vals('city_state', in_city_val) if not in_states or not len(in_states): print "WARNING: there is no state compatible with this city: " + in_city_val elif len(in_states) == 1: in_state_val = in_states.pop() if in_city_val == 'none' and in_state_val == 'none': in_state_val = self.ontology.get_default_value('in_state') in_city_val = self.ontology.get_default_value('in_city') if in_state_val == 'none': req_da = DialogueAct("request(in_state)") # we don't know which state to choose lon = None lat = None if in_city_val != 'none' and in_state_val != 'none' and in_state_val in self.ontology['addinfo']['state']: cities = self.ontology['addinfo']['state'][in_state_val] if cities and in_city_val in cities: lat = cities[in_city_val]['lat'] lon = cities[in_city_val]['lon'] return req_da, in_city_val, in_state_val, lon, lat
[docs] def req_from_stop(self, ds): """Generates a dialogue act informing about the origin stop of the last recommended connection. TODO: this gives too much of information. Maybe it would be worth to split this into more dialogue acts and let user ask for all individual pieces of information. The good thing would be that it would lead to longer dialogues. :rtype : DialogueAct """ route = ds.directions[ds['route_alternative']] leg = route.legs[0] da = DialogueAct() for step in leg.steps: if step.travel_mode == step.MODE_TRANSIT: da.append(DialogueActItem('inform', 'from_stop', step.departure_stop)) da.append(DialogueActItem('inform', 'vehicle', step.vehicle)) da.append(DialogueActItem('inform', 'line', step.line_name)) da.append(DialogueActItem('inform', 'headsign', step.headsign)) break return da
[docs] def req_to_stop(self, ds): """Return a DA informing about the destination stop of the last recommended connection. """ route = ds.directions[ds['route_alternative']] leg = route.legs[0] da = DialogueAct() for step in reversed(leg.steps): if step.travel_mode == step.MODE_TRANSIT: da.append(DialogueActItem('inform', 'to_stop', step.arrival_stop)) break return da
[docs] def req_departure_time(self, dialogue_state): """Generates a dialogue act informing about the departure time from the origin stop of the last recommended connection. :rtype : DialogueAct """ route = dialogue_state.directions[dialogue_state['route_alternative']] leg = route.legs[0] da = DialogueAct() for step in leg.steps: if step.travel_mode == step.MODE_TRANSIT: da.append(DialogueActItem('inform', 'from_stop', step.departure_stop)) da.append(DialogueActItem('inform', 'vehicle', step.vehicle)) da.append(DialogueActItem('inform', 'departure_time', step.departure_time.strftime("%I:%M:%p"))) break return da
[docs] def req_departure_time_rel(self, dialogue_state): """Return a DA informing the user about the relative time until the last recommended connection departs. """ route = dialogue_state.directions[dialogue_state['route_alternative']] leg = route.legs[0] da = DialogueAct() for step in leg.steps: if step.travel_mode == step.MODE_TRANSIT: # construct relative time from now to departure now = datetime.now() now -= timedelta(seconds=now.second, microseconds=now.microsecond) # floor to minute start departure_time_rel = step.departure_time - now da.append(DialogueActItem('inform', 'vehicle', step.vehicle)) # the connection was missed if departure_time_rel.days < 0: da.append(DialogueActItem('apology')) da.append(DialogueActItem('inform', 'missed_connection', 'true')) # the connection is right now elif departure_time_rel.days == 0 and departure_time_rel.seconds == 0: da.append(DialogueActItem('inform', 'from_stop', step.departure_stop)) da.append(DialogueActItem('inform', 'departure_time_rel', 'now')) # future connections else: da.append(DialogueActItem('inform', 'from_stop', step.departure_stop)) departure_time_rel_hrs, departure_time_rel_mins = divmod(departure_time_rel.seconds / 60, 60) if departure_time_rel.days > 0: departure_time_rel_hrs += 24 * departure_time_rel.days da.append(DialogueActItem('inform', 'departure_time_rel', '%d:%02d' % (departure_time_rel_hrs, departure_time_rel_mins))) break return da
[docs] def req_arrival_time(self, dialogue_state): """Return a DA informing about the arrival time the destination stop of the last recommended connection. """ route = dialogue_state.directions[dialogue_state['route_alternative']] leg = route.legs[0] da = DialogueAct() for step in reversed(leg.steps): if step.travel_mode == step.MODE_TRANSIT: da.append(DialogueActItem('inform', 'to_stop', step.arrival_stop)) da.append(DialogueActItem('inform', 'vehicle', step.vehicle)) da.append(DialogueActItem('inform', 'arrival_time', step.arrival_time.strftime("%I:%M:%p"))) break return da
[docs] def req_arrival_time_rel(self, dialogue_state): """Return a DA informing about the relative arrival time the destination stop of the last recommended connection. """ route = dialogue_state.directions[dialogue_state['route_alternative']] leg = route.legs[0] da = DialogueAct() for step in reversed(leg.steps): if step.travel_mode == step.MODE_TRANSIT: da.append(DialogueActItem('inform', 'to_stop', step.arrival_stop)) da.append(DialogueActItem('inform', 'vehicle', step.vehicle)) # construct relative time from now to arrival now = datetime.now() now -= timedelta(seconds=now.second, microseconds=now.microsecond) # floor to minute start arrival_time_rel = step.arrival_time - now arrival_time_rel_hrs, arrival_time_rel_mins = divmod(arrival_time_rel.seconds / 60, 60) if arrival_time_rel.days > 0: arrival_time_rel_hrs += 24 * arrival_time_rel.days da.append(DialogueActItem('inform', 'arrival_time_rel', '%d:%02d' % (arrival_time_rel_hrs, arrival_time_rel_mins))) break return da
[docs] def req_duration(self, dialogue_state): """Return a DA informing about journey time to the destination stop of the last recommended connection. """ route = dialogue_state.directions[dialogue_state['route_alternative']] leg = route.legs[0] da = DialogueAct() for step in leg.steps: if step.travel_mode == step.MODE_TRANSIT: departure_time = step.departure_time break else: departure_time = datetime.fromtimestamp(0) for step in reversed(leg.steps): if step.travel_mode == step.MODE_TRANSIT: arrival_time = step.arrival_time break else: # return time for walking (otherwise there would be mode_transit) arrival_time = datetime.fromtimestamp(leg.steps[0].duration) duration = (arrival_time - departure_time).seconds / 60 duration_hrs, duration_mins = divmod(duration, 60) if duration_hrs == 0 and duration_mins == 0: duration_mins = 1 da.append(DialogueActItem('inform', 'duration', '%d:%02d' % (duration_hrs, duration_mins))) return da
[docs] def req_distance(self, dialogue_state): """Return a DA informing the user about the distance and number of stops in the last recommended connection.""" def meters_to_miles(meters): return meters * 0.000621371 route = dialogue_state.directions[dialogue_state['route_alternative']] leg = route.legs[0] res_da = DialogueAct('inform(distance="%0.1f")' % meters_to_miles(leg.distance)) steps = [(step.num_stops, step.departure_stop, step.vehicle, step.line_name) for step in leg.steps if step.travel_mode == step.MODE_TRANSIT] for stop_count, from_stop, vehice, line in steps: res_da.append(DialogueActItem('inform', 'num_stops', stop_count)) res_da.append(DialogueActItem('inform', 'from_stop', from_stop)) res_da.append(DialogueActItem('inform', 'vehicle', vehice)) res_da.append(DialogueActItem('inform', 'line', line)) return res_da
[docs] def req_num_transfers(self, dialogue_state): """Return a DA informing the user about the number of transfers in the last recommended connection. """ route = dialogue_state.directions[dialogue_state['route_alternative']] leg = route.legs[0] n = sum([1 for step in leg.steps if step.travel_mode == step.MODE_TRANSIT]) - 1 da = DialogueAct('inform(num_transfers="%d")' % n) return da
[docs] def req_time_transfers(self, dialogue_state): """Return a DA informing the user about transfer places and time needed for the trasfer in the last recommended connection. """ route = dialogue_state.directions[dialogue_state['route_alternative']] leg = route.legs[0] # get only transit with some means of transport transits = [step for step in route.legs[0].steps if step.travel_mode == step.MODE_TRANSIT ] # get_time counts difference between two datetime objects, returns a string h:min get_time = lambda f,t: '%d:%02d' % divmod(((t - f).seconds / 60), 60) # calculate time needed as "departure_time from next stop minus arrival time to the stop" n = [ (arrive_at.arrival_stop, get_time( arrive_at.arrival_time, depart_from.departure_time)) for arrive_at, depart_from in itertools.izip(transits,transits[1:])] names = [ 'inform(time_transfers_stop="%s")&inform(time_transfers_limit="%s")' % tuple_n for tuple_n in n ] da = DialogueAct("&".join(names)) if len(names) > 0 else DialogueAct('inform(num_transfers="0")') return da
[docs] def check_directions_conflict(self, wp): """Check for conflicts in the given waypoints. Return an apology() DA if the origin and the destination are the same, or if a city is not compatible with the corresponding stop. :param wp: wayponts of the user's connection query :rtype: DialogueAct :return: apology dialogue act in case of conflict, or None """ # TODO: This is artificially added because streets now share stop slot and we don't need to check street compatibility if wp.to_stop not in self.ontology.ontology[u'slots'][u'stop'] or wp.from_stop not in self.ontology.ontology[u'slots'][u'stop']: return None # TODO: This is artificially added because boroughs now share city slot and we don't need to check borough compatibility if wp.to_city in self.ontology.ontology[u'slots'][u'borough'] or wp.from_city in self.ontology.ontology[u'slots'][u'borough']: return None # origin and destination are the same if (wp.from_city == wp.to_city) and (wp.from_stop in [wp.to_stop, None]): apology_da = DialogueAct('apology()&inform(stops_conflict="thesame")') apology_da.extend(DialogueAct(wp.get_minimal_info())) return apology_da # origin stop incompatible with origin city elif not self.ontology.is_compatible('city_stop', wp.from_city, wp.from_stop): apology_da = DialogueAct('apology()&inform(stops_conflict="incompatible")') apology_da.extend(DialogueAct('inform(from_city="%s")&inform(from_stop="%s")' % (wp.from_city, wp.from_stop))) return apology_da # destination stop incompatible with destination city elif not self.ontology.is_compatible('city_stop', wp.to_city, wp.to_stop): apology_da = DialogueAct('apology()&inform(stops_conflict="incompatible")') apology_da.extend(DialogueAct('inform(to_city="%s")&inform(to_stop="%s")' % (wp.to_city, wp.to_stop))) return apology_da return None
[docs] def check_city_state_conflict(self, in_city, in_state): """Check for conflicts in the given city and state. Return an apology() DA if the state and city is incompatible. :param in_city: city slot value :param in_state: state slot value :rtype: DialogueAct :return: apology dialogue act in case of conflict, or None """ if not self.ontology.is_compatible('city_state', in_city, in_state): apology_da = DialogueAct('apology()&inform(cities_conflict="incompatible")') apology_da.extend(DialogueAct('inform(in_city="%s")&inform(in_state="%s")' % (in_city, in_state))) return apology_da return None
[docs] def get_directions(self, ds, route_type='true', check_conflict=False): """Retrieve Google directions, save them to dialogue state and return corresponding DAs. Responsible for the interpretation of AM/PM time expressions. :param ds: The current dialogue state :param route_type: a label for the found route (to be passed on to \ :func:`say_directions`) :param check_conflict: If true, will check if the origin and \ destination stops are different and issue a warning DA if not. :rtype: DialogueAct """ conn_info = ds.conn_info # check for route conflicts if check_conflict: apology_da = self.check_directions_conflict(conn_info) if apology_da is not None: if 'route_alternative' in ds: ds.directions = None del ds['route_alternative'] return apology_da # get dialogue state values departure_time = ds['departure_time'].mpv() departure_time_rel = ds['departure_time_rel'].mpv() arrival_time = ds['arrival_time'].mpv() arrival_time_rel = ds['arrival_time_rel'].mpv() date_rel = ds['date_rel'].mpv() ampm = ds['ampm'].mpv() time = ds['time'].mpv() time_rel = ds['time_rel'].mpv() # interpret departure and arrival time departure_ts, arrival_ts = None, None if arrival_time != 'none' or arrival_time_rel != 'none': arrival_ts, _ = self.interpret_time(arrival_time, ampm, arrival_time_rel, date_rel, ds['lta_arrival_time'].mpv()) else: lta_departure_time = ds['lta_departure_time'].mpv() lta_time = ds['lta_time'].mpv() lta_time = lta_departure_time if lta_departure_time != 'none' else lta_time time_abs = departure_time if departure_time != 'none' else time time_rel = departure_time_rel if departure_time_rel != 'none' else time_rel departure_ts, _ = self.interpret_time(time_abs, ampm, time_rel, date_rel, lta_time) # retrieve transit directions ds.directions = self.directions.get_directions(conn_info, departure_time=departure_ts, arrival_time=arrival_ts) return self.process_directions_for_output(ds, route_type)
ORIGIN = 'ORIGIN' DESTIN = 'FINAL_DEST'
[docs] def process_directions_for_output(self, dialogue_state, route_type): """Return DAs for the directions in the current dialogue state. If the directions are not valid (nothing found), delete their object from the dialogue state and return apology DAs. :param dialogue_state: the current dialogue state :param route_type: the route type requested by the user ("last", "next" etc.) :rtype: DialogueAct """ if not isinstance(dialogue_state['route_alternative'], int): dialogue_state['route_alternative'] = 0 try: # get the alternative we want to say now route = dialogue_state.directions[dialogue_state['route_alternative']] # only 1 leg should be present in case we have no waypoints steps = route.legs[0].steps except IndexError: # this will lead to apology that no route has been found steps = [] #dialogue_state.directions = None del dialogue_state['route_alternative'] res = [] # introduction if len(dialogue_state.directions) > 1: res.append('inform(found_directions="%s")' % route_type) if route_type != "last": res.append("inform(alternative=%s)" % word_for_number(dialogue_state['route_alternative'] + 1, True)) # route description prev_arrive_stop = self.ORIGIN # remember previous arrival stop for step_ndx, step in enumerate(steps): # find out what will be the next departure stop (needed later) next_leave_stop = self.DESTIN if step_ndx < len(steps) - 2 and steps[step_ndx + 1].travel_mode == step.MODE_WALKING: next_leave_stop = steps[step_ndx + 2].departure_stop elif step_ndx < len(steps) - 1 and steps[step_ndx + 1].travel_mode == step.MODE_TRANSIT: next_leave_stop = steps[step_ndx + 1].departure_stop # walking if step.travel_mode == step.MODE_WALKING: # walking to stops with different names # todo: delete those after merging hdc slu with origin master directions_to_stop = expand_stop(dialogue_state.directions.to_stop) directions_from_stop = expand_stop(dialogue_state.directions.from_stop) if (next_leave_stop == self.DESTIN and prev_arrive_stop != directions_to_stop) or \ (prev_arrive_stop == self.ORIGIN and next_leave_stop != directions_from_stop) or \ (next_leave_stop != self.DESTIN and prev_arrive_stop != self.ORIGIN and next_leave_stop != prev_arrive_stop): # walking destination: next departure stop res.append("inform(walk_to=%s)" % next_leave_stop) #res.append("inform(duration=0:%02d)" % (step.duration / 60)) # public transport elif step.travel_mode == step.MODE_TRANSIT: res.append("inform(vehicle=%s)" % step.vehicle) res.append("inform(line=%s)" % step.line_name) res.append("inform(departure_time=%s)" % step.departure_time.strftime("%I:%M:%p")) # only mention departure if it differs from previous arrival if step.departure_stop != prev_arrive_stop: res.append("inform(enter_at=%s)" % step.departure_stop) res.append("inform(headsign=%s)" % step.headsign) res.append("inform(exit_at=%s)" % step.arrival_stop) # only mention transfer if there is one if next_leave_stop != self.DESTIN: res.append("inform(transfer='true')") prev_arrive_stop = step.arrival_stop # no route found: apologize if len(res) == 0: res.append('apology()') res.append(dialogue_state.directions.get_minimal_info()) res_da = DialogueAct("&".join(res)) return res_da
DEFAULT_AMPM_TIMES = {'morning': "06:00", 'am': "10:00", 'pm': "15:00", 'evening': "18:00", 'night': "00:00"}
[docs] def interpret_time(self, time_abs, time_ampm, time_rel, date_rel, lta_time): """Interpret time, given current dialogue state most probable values for relative and absolute time and date, plus the corresponding last-talked-about value. :return: the inferred time value + flag indicating the inferred time type ('abs' or 'rel') :rtype: tuple(datetime, string) """ now = datetime.now() now -= timedelta(seconds=now.second, microseconds=now.microsecond) # floor to minute start # use only last-talked-about time (of any type -- departure/arrival) if (time_abs != 'none' or date_rel != 'none') and time_rel != 'none': if lta_time.endswith('time_rel'): time_abs = 'none' date_rel = 'none' elif lta_time.endswith('time') or lta_time == 'date_rel': time_rel = 'none' # remove bogus values (i.e. "now") from time_abs if not re.match('^[0-2]?[0-9]:[0-5][0-9]$', time_abs): time_abs = 'none' # relative time if (time_abs == 'none' and time_ampm == 'none' and date_rel == 'none') or time_rel != 'none': time_type = 'rel' time_abs = now if time_rel not in ['none', 'now']: trel_parse = datetime.strptime(time_rel, "%H:%M") time_abs += timedelta(hours=trel_parse.hour, minutes=trel_parse.minute) # absolute time (with relative date) else: time_type = 'abs' if time_abs == 'none': if time_ampm != 'none': time_abs = self.DEFAULT_AMPM_TIMES[time_ampm] elif date_rel != 'none': time_abs = "%02d:%02d" % (now.hour, now.minute) time_parsed = datetime.combine(now, datetime.strptime(time_abs, "%H:%M").time()) time_hour = time_parsed.hour now_hour = now.hour # handle 12hr time if time_hour >= 1 and time_hour <= 12: # interpret AM/PM if time_ampm != 'none': # 'pm' ~ 12pm till 11:59pm if time_ampm == 'pm' and time_hour < 12: time_hour += 12 # 'am'/'morning' ~ 12am till 11:59am elif time_ampm in ['am', 'morning'] and time_hour == 12: time_hour = 0 # 'evening' ~ 4pm till 3:59am elif time_ampm == 'evening' and time_hour >= 4: time_hour = (time_hour + 12) % 24 # 'night' ~ 6pm till 5:59am elif time_ampm == 'night' and time_hour >= 6: time_hour = (time_hour + 12) % 24 # 12hr time + no AM/PM set + today or no date set: default to next 12hrs elif date_rel in ['none', 'today'] and now_hour > time_hour and now_hour < time_hour + 12: time_hour = (time_hour + 12) % 24 time_abs = datetime.combine(now, dttime(time_hour, time_parsed.minute)) # relative date if date_rel == 'tomorrow': time_abs += timedelta(days=1) elif date_rel == 'day_after_tomorrow': time_abs += timedelta(days=2) elif time_abs < now: time_abs += timedelta(days=1) return time_abs, time_type
[docs] def get_limited_context_help(self, dialogue_state): res_da = DialogueAct() # if we do not understand the input then provide the context sensitive help if not 'route_alternative' in dialogue_state: # before something is offered if randbool(10): res_da.append(DialogueActItem("help", "task", "weather")) elif randbool(10): res_da.append(DialogueActItem("help", "request", "current_time")) elif randbool(10): res_da.append(DialogueActItem("help", "inform", "hangup")) elif randbool(9): res_da.append(DialogueActItem("help", "request", "help")) elif randbool(8): res_da.append(DialogueActItem("help", "inform", "departure_time")) elif randbool(7): res_da.append(DialogueActItem("help", "repeat")) elif not dialogue_state['from_stop'].test("none", self.accept_prob, neg_val=True): res_da.append(DialogueActItem("help", "inform", "from_stop")) elif not dialogue_state['to_stop'].test("none", self.accept_prob, neg_val=True): res_da.append(DialogueActItem("help", "inform", "to_stop")) else: res_da.append(DialogueActItem("silence")) else: # we already offered a connection if randbool(4): res_da.append(DialogueActItem("help", "inform", "alternative_last")) elif randbool(7): res_da.append(DialogueActItem("help", "inform", "alternative_next")) elif randbool(6): res_da.append(DialogueActItem("help", "inform", "alternative_prev")) elif randbool(5): res_da.append(DialogueActItem("help", "inform", "alternative_abs")) elif randbool(4): res_da.append(DialogueActItem("help", "request", "from_stop")) elif randbool(3): res_da.append(DialogueActItem("help", "request", "to_stop")) elif randbool(2): res_da.append(DialogueActItem("help", "request", "num_transfers")) else: res_da.append(DialogueActItem("silence")) return res_da
[docs] def get_help_res_da(self, ds, accepted_slots, state_changed): topics_alternatives = ['inform="alternative_abs"', 'inform="alternative_prev"', 'inform="alternative_next"', 'inform="alternative_last"', ] topics_stops = ['inform="from_stop"', 'inform="to_stop"', 'request="from_stop"', 'request="to_stop"', 'request="num_transfers"', 'inform="num_transfers"',] topics_time = ['inform="departure_time"', 'request="current_time"', ] topics_general = ['', 'repeat', 'inform="hangup"', ] topics_weather = ['task="weather"', ] topics_connection = topics_alternatives + topics_stops + topics_time task = ds['task'].mpv() if 'task' in accepted_slots else '' if task == "weather": topics = topics_weather elif task == 'find_connection': topics = topics_connection else: topics = topics_general rand_theme = topics[random.randint(0, len(topics) - 1)] return DialogueAct("help(%s)" % rand_theme)
[docs] def fix_stop_street_slots(self, changed_slots): from_list_stop = ['from_street', 'from_street2', 'from_stop', 'from_street'] from_list_city = ['from_borough', 'from_city'] to_list_city = ['to_borough', 'to_city'] to_list_stop = ['to_street', 'to_street2', 'to_stop', 'to_street'] from_stop_value = ' and '.join([changed_slots.pop(slot).mpv() for slot in from_list_stop if slot in changed_slots]) to_stop_value = ' and '.join([changed_slots.pop(slot).mpv() for slot in to_list_stop if slot in changed_slots]) from_city_value = ' and '.join([changed_slots.pop(slot).mpv() for slot in from_list_city if slot in changed_slots]) to_city_value = ' and '.join([changed_slots.pop(slot).mpv() for slot in to_list_city if slot in changed_slots]) if from_stop_value: changed_slots['from_stop'] = D3DiscreteValue({from_stop_value: 1.0, 'none': 0.0}) if from_city_value: changed_slots['from_city'] = D3DiscreteValue({from_city_value: 1.0, 'none': 0.0}) if to_stop_value: changed_slots['to_stop'] = D3DiscreteValue({to_stop_value: 1.0, 'none': 0.0}) if to_city_value: changed_slots['to_city'] = D3DiscreteValue({to_city_value: 1.0, 'none': 0.0}) return changed_slots