# Source code for alex.components.hub.dm

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import unicode_literals

import multiprocessing
import time
import random
import urllib

from alex.components.slu.da import DialogueAct, DialogueActItem, DialogueActConfusionNetwork
from alex.components.hub.messages import Command, SLUHyp, DMDA
from alex.components.dm.common import dm_factory, get_dm_type
from alex.components.dm.exceptions import DMException
from alex.utils.procname import set_proc_name


class DM(multiprocessing.Process):
    """Process wrapper that connects a dialogue manager to the hub.

    DM accepts an N-best list hypothesis or a confusion network generated by
    an SLU component. The result of this component is an output dialogue act.
    When the component receives an SLU hypothesis, it immediately responds
    with a dialogue act.

    This component is a wrapper around multiple dialogue managers which
    handles multiprocessing communication.
    """

    # A ``timeout`` command reporting a silence longer than this value ends a
    # dialogue that is waiting in its epilogue (presumably seconds -- confirm
    # against the component that emits ``timeout``).
    EPILOGUE_SILENCE_LIMIT = 5.0

    def __init__(self, cfg, commands, slu_hypotheses_in, dialogue_act_out, close_event):
        """Store the communication channels and create the dialogue manager.

        :param cfg: configuration; read here under the 'DM', 'Logging' and
            'Hub' keys
        :param commands: connection used to receive commands and to send
            commands / dialogue acts back (``poll``, ``recv``, ``send``)
        :param slu_hypotheses_in: connection delivering SLU hypotheses
        :param dialogue_act_out: stored as-is; not used by the code in this
            class
        :param close_event: event checked by :meth:`run` to stop the process
        """
        multiprocessing.Process.__init__(self)

        self.cfg = cfg
        self.commands = commands
        self.slu_hypotheses_in = slu_hypotheses_in
        self.dialogue_act_out = dialogue_act_out
        self.close_event = close_event
        self.last_user_da_time = time.time()
        self.last_user_diff_time = time.time()

        # Epilogue bookkeeping: ``epilogue_state`` names the running epilogue
        # activity (or None), ``epilogue_da`` is the postponed "bye" act.
        self.epilogue_state = None
        self.epilogue_da = None
        self.final_code_given = False

        dm_type = get_dm_type(cfg)
        self.dm = dm_factory(dm_type, cfg)
        self.dm.new_dialogue()

        # All four-digit codes in random order. Seeding from
        # cfg['DM']['epilogue']['code_seed'] is intentionally not done here.
        self.codes = ["%04d" % i for i in range(0, 10000)]
        random.shuffle(self.codes)

        self.test_code_server_connection()

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    def _debug_log_da(self, da):
        """Write the output dialogue act to the system log in debug mode."""
        if self.cfg['DM']['debug']:
            lines = ["DM Output", "-" * 60, unicode(da), ""]
            self.cfg['Logging']['system_logger'].debug('\n'.join(lines))

    def _send_da(self, da):
        """Record ``da`` in the session log and send it to the hub."""
        self.cfg['Logging']['session_logger'].dialogue_act("system", da)
        self.commands.send(DMDA(da, 'DM', 'HUB'))

    def _send_hangup(self):
        """Ask the hub to hang up the call."""
        self.commands.send(Command('hangup()', 'DM', 'HUB'))

    def _say(self, text):
        """Send a ``say`` dialogue act carrying ``text``."""
        self._send_da(DialogueAct('say(text="{text}")'.format(text=text)))

    def _log_system_turn(self):
        """Open a new system turn in the session log and log the DM state."""
        self.cfg['Logging']['session_logger'].turn("system")
        self.dm.log_state()

    @staticmethod
    def _ping_url(url):
        """Open ``url`` and close the response immediately.

        Only the request itself matters; the response body is never read.
        Errors raised by ``urllib.urlopen`` propagate to the caller.
        """
        response = urllib.urlopen(url)
        response.close()

    def _handle_timeout(self, command):
        """Feed the reported silence to the DM and send its reaction."""
        silence_time = command.parsed['silence_time']

        cn = DialogueActConfusionNetwork()
        cn.add(1.0, DialogueActItem('silence', 'time', silence_time))

        # process the input DA
        self.dm.da_in(cn)
        self._log_system_turn()

        if self.epilogue_state and float(silence_time) > self.EPILOGUE_SILENCE_LIMIT:
            # the user was silent for too long after the epilogue prompt,
            # therefore say the postponed goodbye and hang up
            self._send_da(self.epilogue_da)
            self._send_hangup()
            return

        da = self.dm.da_out()
        self._debug_log_da(da)
        self._send_da(da)

        if da.has_dat("bye"):
            self._send_hangup()

    # ------------------------------------------------------------------
    # public interface
    # ------------------------------------------------------------------

    def process_pending_commands(self):
        """Process all pending commands.

        Available commands:
          stop() - stop processing and exit the process
          flush() - flush input buffers.
            Now it only flushes the input connection.
          new_dialogue() - restart the DM and send its first dialogue act
          end_dialogue() - tell the DM that the dialogue is over
          timeout() - report user silence to the DM

        Handling of any recognised command other than stop() returns
        immediately, so remaining commands are picked up by the next call.

        Return True if the process should terminate.
        """
        while self.commands.poll():
            command = self.commands.recv()

            if self.cfg['DM']['debug']:
                self.cfg['Logging']['system_logger'].debug(command)

            if not isinstance(command, Command):
                continue

            name = command.parsed['__name__']

            if name == 'stop':
                return True

            if name == 'flush':
                # discard all data in the input buffers
                while self.slu_hypotheses_in.poll():
                    self.slu_hypotheses_in.recv()

                self.dm.end_dialogue()
                self.commands.send(Command("flushed()", 'DM', 'HUB'))
                return False

            if name == 'new_dialogue':
                self.epilogue_state = None
                self.dm.new_dialogue()
                self._log_system_turn()

                # generate the first DM output
                da = self.dm.da_out()
                self._debug_log_da(da)
                self._send_da(da)
                return False

            if name == 'end_dialogue':
                self.dm.end_dialogue()
                return False

            if name == 'timeout':
                self._handle_timeout(command)
                return False

        return False

    def epilogue_final_question(self):
        """Say the configured final question."""
        self._say(self.cfg['DM']['epilogue']['final_question'])

    def epilogue_final_apology(self):
        """Apologise for not reaching the minimum number of turns."""
        self._say(self.cfg['DM']['epilogue']['final_code_text_min_turn_count_not_reached'])

    def epilogue_final_code(self):
        """Take the next code, report it to the code URL and say it three times.

        Raises IndexError when all codes have already been handed out.
        """
        epilogue_cfg = self.cfg['DM']['epilogue']
        code = self.codes.pop()

        # pull the url
        self._ping_url(epilogue_cfg['final_code_url'].format(code=code))

        # the code is spelled out with ", " between its characters
        spelled = ", ".join(code)
        sentence = epilogue_cfg['final_code_text'].format(code=spelled)
        text = epilogue_cfg['final_code_text_repeat'].join([sentence] * 3)

        self._say(text)
        self.final_code_given = True

    def epilogue(self):
        """Gives the user last information before hanging up.

        :return the name of the activity or None
        """
        epilogue_cfg = self.cfg['DM']['epilogue']

        if epilogue_cfg['final_question']:
            self.epilogue_final_question()
            return 'final_question'

        if epilogue_cfg['final_code_url']:
            if self.dm.dialogue_state.turn_number < epilogue_cfg['final_code_min_turn_count']:
                self.epilogue_final_apology()
            else:
                self.epilogue_final_code()

        return None

    def read_slu_hypotheses_write_dialogue_act(self):
        """Read one pending SLU hypothesis, if any, and send the DM reaction.

        Raises DMException when the received object is neither an SLUHyp nor
        a Command (and no epilogue is in progress).
        """
        if not self.slu_hypotheses_in.poll():
            return

        data_slu = self.slu_hypotheses_in.recv()

        if self.epilogue_state:
            # we have got another turn, now we can hang up.
            self._log_system_turn()
            self._send_da(self.epilogue_da)
            self._send_hangup()
        elif isinstance(data_slu, SLUHyp):
            # reset measuring of the user silence
            self.last_user_da_time = time.time()
            self.last_user_diff_time = time.time()

            # process the input DA
            self.dm.da_in(data_slu.hyp, utterance=data_slu.asr_hyp)
            self._log_system_turn()

            da = self.dm.da_out()

            # do not communicate directly with the NLG, let the HUB decide
            # to do work. The generation of the output must be synchronised
            # with the input.
            if da.has_dat("bye"):
                self.epilogue_state = self.epilogue()
                self.epilogue_da = da

                if not self.epilogue_state:
                    # no epilogue activity is pending: say goodbye right away
                    self._send_da(da)
                    self._send_hangup()
            else:
                self._debug_log_da(da)
                self._send_da(da)
        elif isinstance(data_slu, Command):
            self.cfg['Logging']['system_logger'].info(data_slu)
        else:
            raise DMException('Unsupported input.')

    def run(self):
        """Main loop of the process.

        Repeats until the close event is set or a stop() command arrives:
        sleep, process pending commands, process a pending SLU hypothesis.
        """
        try:
            set_proc_name("Alex_DM")
            self.cfg['Logging']['session_logger'].cancel_join_thread()

            while 1:
                # Check the close event.
                if self.close_event.is_set():
                    print('Received close event in: %s' % multiprocessing.current_process().name)
                    return

                time.sleep(self.cfg['Hub']['main_loop_sleep_time'])

                started = (time.time(), time.clock())

                # process all pending commands
                if self.process_pending_commands():
                    return

                # process the incoming SLU hypothesis
                self.read_slu_hypotheses_write_dialogue_act()

                elapsed = (time.time() - started[0], time.clock() - started[1])
                if elapsed[0] > 0.200:
                    print("EXEC Time inner loop: DM t = {t:0.4f} c = {c:0.4f}\n".format(t=elapsed[0], c=elapsed[1]))
        except KeyboardInterrupt:
            print('KeyboardInterrupt exception in: %s' % multiprocessing.current_process().name)
            self.close_event.set()
            return
        except:
            # Deliberately catches everything: the failure is logged, the
            # other processes are told to close, and the error is re-raised.
            self.cfg['Logging']['system_logger'].exception('Uncaught exception in the DM process.')
            self.close_event.set()
            raise

        # NOTE(review): not reachable at present -- the loop above only ends
        # through ``return`` or an exception. Kept unchanged on purpose.
        print('Exiting: %s. Setting close event' % multiprocessing.current_process().name)
        self.close_event.set()

    def test_code_server_connection(self):
        """Open a test connection to the code server.

        The content of the response is not important. If the server is down,
        this call fails, which is preferable to letting a CF contributor do
        the job without getting paid.

        The check runs under the same condition under which :meth:`epilogue`
        would hand out a code: no final question and a code URL configured.
        """
        epilogue_cfg = self.cfg['DM']['epilogue']

        if not epilogue_cfg['final_question'] and epilogue_cfg['final_code_url']:
            self._ping_url(epilogue_cfg['final_code_url'].format(code='test'))