Add. Event handler to support MWI. (#1720)

* Add. Event handler to support MWI.

* Fix. store cache only when get data from memcache

* Change. Use UUID as PID.
This commit is contained in:
Alexey Melnichuk 2016-06-30 18:55:37 +03:00 committed by FusionPBX
parent ef18ad5fbe
commit ba857b6acf
6 changed files with 298 additions and 56 deletions

View File

@ -0,0 +1,132 @@
require "resources.functions.config"
require "resources.functions.split"
local log = require "resources.functions.log".mwi_subscribe
local file = require "resources.functions.file"
local Database = require "resources.functions.database"
local ievents = require "resources.functions.ievents"
local IntervalTimer = require "resources.functions.interval_timer"
local cache = require "resources.functions.cache"
local api = require "resources.functions.api"
local vm_to_uuid_sql = [[SELECT v.voicemail_uuid
FROM v_voicemails as v inner join v_domains as d on v.domain_uuid = d.domain_uuid
WHERE v.voicemail_id = '%s' and d.domain_name = '%s']]
local vm_messages_sql = [[SELECT
( SELECT count(*)
FROM v_voicemail_messages
WHERE voicemail_uuid = %s
AND (message_status is null or message_status = '')
) as new_messages,
( SELECT count(*)
FROM v_voicemail_messages
WHERE voicemail_uuid = %s
AND message_status = 'saved'
) as saved_messages
]]
local function vm_message_count(account, use_cache)
local id, domain_name = split_first(account, '@', true)
if not domain_name then return end
-- FusionPBX support only numeric voicemail id
if not tonumber(id) then
log.warningf('non numeric voicemail id: %s', id)
return
end
local dbh = Database.new('system')
if not dbh then return end
local uuid
if use_cache and cache.support() then
local uuid = cache.get('voicemail_uuid:' .. account)
if not uuid then
local sql = string.format(vm_to_uuid_sql,
dbh:escape(id), dbh:escape(domain_name)
)
uuid = dbh:first_value(sql)
if uuid and #uuid > 0 then
cache.set('voicemail_uuid:' .. account, uuid, 3600)
end
end
end
local sql
if uuid and #uuid > 0 then
sql = string.format(vm_messages_sql,
dbh:quoted(uuid), dbh:quoted(uuid)
)
else
local uuid_sql = '(' .. string.format(vm_to_uuid_sql,
dbh:escape(id), dbh:escape(domain_name)
) .. ')'
sql = string.format(vm_messages_sql,
uuid_sql, uuid_sql
)
end
local row = sql and dbh:first_row(sql)
dbh:release()
if not row then return end
return row.new_messages, row.saved_messages
end
local function mwi_notify(account, new_messages, saved_messages)
local event = freeswitch.Event("message_waiting")
if (new_messages == "0") then
event:addHeader("MWI-Messages-Waiting", "no")
else
event:addHeader("MWI-Messages-Waiting", "yes")
end
event:addHeader("MWI-Message-Account", "sip:" .. account)
event:addHeader("MWI-Voice-Message", new_messages.."/"..saved_messages.." (0/0)")
return event:fire()
end
local sleep = 60000
local pid_file = scripts_dir .. "/run/mwi_subscribe.tmp"
local pid = api:execute("create_uuid") or tostring(api:getTime())
file.write(pid_file, pid)
log.notice("start");
local timer = IntervalTimer.new(sleep):start()
for event in ievents("MESSAGE_QUERY", 1, timer:rest()) do
if (not event) or (timer:rest() < 1000) then
if not file.exists(pid_file) then break end
local stored = file.read(pid_file)
if stored and stored ~= pid then break end
timer:restart()
end
if event then
-- log.notice("event:" .. event:serialize("xml"));
local account_header = event:getHeader('Message-Account')
if account_header then
local proto, account = split_first(account_header, ':', true)
if (not account) or (proto ~= 'sip' and proto ~= 'sips') then
log.warningf("invalid format for voicemail id: %s", account_header)
else
local new_messages, saved_messages = vm_message_count(account)
if not new_messages then
log.warningf('can not find voicemail: %s', account)
else
log.noticef('voicemail %s has %s/%s messages', account, new_messages, saved_messages)
mwi_notify(account, new_messages, saved_messages)
end
end
end
end
end
log.notice("stop")

View File

@ -1,25 +1,13 @@
require "resources.functions.config"
require "resources.functions.split"
local log = require "resources.functions.log".call_flow_subscribe
local file = require "resources.functions.file"
local presence_in = require "resources.functions.presence_in"
local Database = require "resources.functions.database"
local ievents = function(events, ...)
if type(events) == 'string' then
events = freeswitch.EventConsumer(events)
end
local block, timeout = ...
if timeout and (timeout == 0) then block, timeout = 0, 0 end
timeout = timeout or 0
return function()
local event = events:pop(block, timeout)
return not event, event
end
end
local log = require "resources.functions.log".call_flow_subscribe
local file = require "resources.functions.file"
local presence_in = require "resources.functions.presence_in"
local Database = require "resources.functions.database"
local ievents = require "resources.functions.ievents"
local IntervalTimer = require "resources.functions.interval_timer"
local api = require "resources.functions.api"
local find_call_flow_sql = [[select t1.call_flow_uuid, t1.call_flow_status
from v_call_flows t1 inner join v_domains t2 on t1.domain_uuid = t2.domain_uuid
@ -38,52 +26,23 @@ local function find_call_flow(user)
return row.call_flow_uuid, row.call_flow_status
end
local IntervalTimer = {} do
IntervalTimer.__index = IntervalTimer
function IntervalTimer.new(interval)
local o = setmetatable({}, IntervalTimer)
o._interval = interval
return o
end
function IntervalTimer:rest()
local d = self._interval - os.difftime(os.time(), self._begin)
if d < 0 then d = 0 end
return d
end
function IntervalTimer:start()
self._begin = os.time()
return self
end
function IntervalTimer:stop()
self._begin = nil
return self
end
end
local sleep = 60000
local pid_file = scripts_dir .. "/run/call_flow_subscribe.tmp"
local pid = tostring(os.clock())
local pid = api:execute("create_uuid") or tostring(api:getTime())
file.write(pid_file, pid)
log.notice("start call_flow_subscribe");
local timer = IntervalTimer.new(60):start()
local timer = IntervalTimer.new(sleep):start()
for timeout, event in ievents("PRESENCE_PROBE", 1, timer:rest() * 1000) do
if timeout or timer:rest() == 0 then
for event in ievents("PRESENCE_PROBE", 1, timer:rest()) do
if (not event) or (timer:rest() < 1000) then
if not file.exists(pid_file) then break end
local stored = file.read(pid_file)
if stored then
if stored ~= pid then break end
else
if not file.exists(pid_file) then break end
end
timer:start()
if stored and stored ~= pid then break end
timer:restart()
end
if event then

View File

@ -0,0 +1,54 @@
-- Decode result of api execute command to Lua way.
-- in case of error function returns `nil` and `error message`.
-- in other case function return result as is.
local function api_result(result)
if string.find(result, '^%-ERR') or string.find(result, '^INVALID COMMAND!') then
return nil, string.match(result, "(.-)%s*$")
end
return result
end
local function class(base)
local t = base and setmetatable({}, base) or {}
t.__index = t
t.__class = t
t.__base = base
function t.new(...)
local o = setmetatable({}, t)
if o.__init then
if t == ... then -- we call as Class:new()
return o:__init(select(2, ...))
else -- we call as Class.new()
return o:__init(...)
end
end
return o
end
return t
end
local API = class() do
function API:__init(...)
self._api = freeswitch.API(...)
return self
end
function API:execute(...)
return api_result(self._api:execute(...))
end
function API:executeString(...)
return api_result(self._api:executeString(...))
end
function API:getTime()
return self._api:getTime()
end
end
return API.new()

View File

@ -0,0 +1,17 @@
local ievents = function(events, ...)
if type(events) == 'string' then
events = freeswitch.EventConsumer(events)
end
local block, timeout = ...
if timeout == 0 then block, timeout = 0, 0 end
timeout = timeout or 0
return function()
local event = events:pop(block, timeout)
if not event then return false end
return event
end
end
return ievents

View File

@ -0,0 +1,64 @@
local os_time = {
now = function() return os.time() end;
elapsed = function(t) return os.difftime(os.time(), t) end;
ms_to_time = function(ms) return ms / 1000 end;
time_to_ms = function(t) return t * 1000 end;
}
local os_clock = {
now = function() return os.clock() end;
elapsed = function(t) return os.clock() - t end;
ms_to_time = function(ms) return ms / 1000 end;
time_to_ms = function(t) return t * 1000 end;
}
local IntervalTimer = {} do
IntervalTimer.__index = IntervalTimer
function IntervalTimer.new(interval, timer)
local o = setmetatable({}, IntervalTimer)
o._interval = interval
o._timer = timer or os_clock
return o
end
function IntervalTimer:start()
assert(not self:started())
return self:restart()
end
function IntervalTimer:restart()
self._begin = self._timer.now()
return self
end
function IntervalTimer:started()
return not not self._begin
end
function IntervalTimer:elapsed()
assert(self:started())
local e = self._timer.elapsed(self._begin)
return self._timer.time_to_ms(e)
end
function IntervalTimer:rest()
local d = self._interval - self:elapsed()
if d < 0 then d = 0 end
return d
end
function IntervalTimer:stop()
if self:started() then
local d = self:elapsed()
self._begin = nil
return d
end
end
end
return {
new = IntervalTimer.new;
}

View File

@ -28,5 +28,21 @@
<!-- FusionPBX: Run FAX server queue poller -->
<!--<param name="startup-script" value="app/fax/resources/scripts/fax_queue_monitor.lua"/>-->
<!-- FusionPBX: Support BLF for call flow -->
<!-- There 2 way to handle this
1 - Monitor - ignore SUBSCRIBE and just send NOTIFY each X seconds
2 - Event handler - handle each SUBSCRIBE request
-->
<!--<param name="startup-script" value="call_flow_subscribe.lua"/>-->
<!--<param name="startup-script" value="call_flow_monitor.lua"/>-->
<!-- FusionPBX: Support MWI indicator-->
<!-- There 2 way to handle this
1 - Monitor - ignore SUBSCRIBE and just send NOTIFY each X seconds
2 - Event handler - handle each SUBSCRIBE request
-->
<!--<param name="startup-script" value="app/voicemail/resources/scripts/mwi.lua"/>-->
<!--<param name="startup-script" value="app/voicemail/resources/scripts/mwi_subscribe.lua"/>-->
</settings>
</configuration>