diff --git a/resources/install/scripts/app/voicemail/resources/scripts/mwi_subscribe.lua b/resources/install/scripts/app/voicemail/resources/scripts/mwi_subscribe.lua index b9369f03e1..953ca152a3 100644 --- a/resources/install/scripts/app/voicemail/resources/scripts/mwi_subscribe.lua +++ b/resources/install/scripts/app/voicemail/resources/scripts/mwi_subscribe.lua @@ -2,33 +2,36 @@ require "resources.functions.config" require "resources.functions.split" local log = require "resources.functions.log".mwi_subscribe -local file = require "resources.functions.file" +local EventConsumer = require "resources.functions.event_consumer" local Database = require "resources.functions.database" -local ievents = require "resources.functions.ievents" -local IntervalTimer = require "resources.functions.interval_timer" local cache = require "resources.functions.cache" -local api = require "resources.functions.api" local mwi_notify = require "app.voicemail.resources.functions.mwi_notify" +local sleep = 60000 +local pid_file = scripts_dir .. "/run/mwi_subscribe.tmp" +local shutdown_event = "CUSTOM::fusion::mwi::shutdown" + +local vm_message_count do + local vm_to_uuid_sql = [[SELECT v.voicemail_uuid FROM v_voicemails as v inner join v_domains as d on v.domain_uuid = d.domain_uuid WHERE v.voicemail_id = '%s' and d.domain_name = '%s']] local vm_messages_sql = [[SELECT ( SELECT count(*) - FROM v_voicemail_messages - WHERE voicemail_uuid = %s - AND (message_status is null or message_status = '') + FROM v_voicemail_messages + WHERE voicemail_uuid = %s + AND (message_status is null or message_status = '') ) as new_messages, ( SELECT count(*) - FROM v_voicemail_messages - WHERE voicemail_uuid = %s - AND message_status = 'saved' + FROM v_voicemail_messages + WHERE voicemail_uuid = %s + AND message_status = 'saved' ) as saved_messages ]] -local function vm_message_count(account, use_cache) +function vm_message_count(account, use_cache) local id, domain_name = split_first(account, '@', true) if not domain_name then return end @@ -80,52 +83,48 @@ local function vm_message_count(account, use_cache) return row.new_messages, row.saved_messages end -local sleep = 60000 -local pid_file = scripts_dir .. "/run/mwi_subscribe.tmp" -local pid = api:execute("create_uuid") or tostring(api:getTime()) -file.write(pid_file, pid) - -log.notice("start"); - -local timer = IntervalTimer.new(sleep):start() - -for event in ievents({"MESSAGE_QUERY", "SHUTDOWN"}, 1, timer:rest()) do - if (not event) or (timer:rest() < 1000) then - if not file.exists(pid_file) then break end - local stored = file.read(pid_file) - if stored and stored ~= pid then break end - timer:restart() - end - - if event then - -- log.notice("event:" .. event:serialize("xml")); - local event_name = event:getHeader('Event-Name') - if event_name == 'MESSAGE_QUERY' then - local account_header = event:getHeader('Message-Account') - if account_header then - local proto, account = split_first(account_header, ':', true) - - if (not account) or (proto ~= 'sip' and proto ~= 'sips') then - log.warningf("invalid format for voicemail id: %s", account_header) - else - local new_messages, saved_messages = vm_message_count(account) - if not new_messages then - log.warningf('can not find voicemail: %s', account) - else - log.noticef('voicemail %s has %s/%s messages', account, new_messages, saved_messages) - mwi_notify(account, new_messages, saved_messages) - end - end - end - elseif event_name == 'SHUTDOWN' then - log.notice("shutdown") - break - end - end end -if file.read(pid_file) == pid then - file.remove(pid_file) +local events = EventConsumer.new(sleep, pid_file) + +-- FS shutdown +events:bind("SHUTDOWN", function(self, name, event) + log.notice("shutdown") + return self:stop() +end) + +-- shutdown command +if shutdown_event then + events:bind(shutdown_event, function(self, name, event) + log.notice("shutdown") + return self:stop() + end) end +-- MWI SUBSCRIBE +events:bind("MESSAGE_QUERY", function(self, name, event) + local account_header = event:getHeader('Message-Account') + if not account_header then + return log.warningf("MWI message without `Message-Account` header") + end + + local proto, account = split_first(account_header, ':', true) + + if (not account) or (proto ~= 'sip' and proto ~= 'sips') then + return log.warningf("invalid format for voicemail id: %s", account_header) + end + + local new_messages, saved_messages = vm_message_count(account) + if not new_messages then + return log.warningf('can not find voicemail: %s', account) + end + + log.noticef('voicemail %s has %s/%s message(s)', account, new_messages, saved_messages) + mwi_notify(account, new_messages, saved_messages) +end) + +log.notice("start") + +events:run() + log.notice("stop") diff --git a/resources/install/scripts/call_flow_subscribe.lua b/resources/install/scripts/call_flow_subscribe.lua index a4355fca30..6db71120b6 100644 --- a/resources/install/scripts/call_flow_subscribe.lua +++ b/resources/install/scripts/call_flow_subscribe.lua @@ -2,19 +2,18 @@ require "resources.functions.config" require "resources.functions.split" local log = require "resources.functions.log".call_flow_subscribe -local file = require "resources.functions.file" +local EventConsumer = require "resources.functions.event_consumer" local presence_in = require "resources.functions.presence_in" local Database = require "resources.functions.database" -local ievents = require "resources.functions.ievents" -local IntervalTimer = require "resources.functions.interval_timer" -local api = require "resources.functions.api" + +local find_call_flow do local find_call_flow_sql = [[select t1.call_flow_uuid, t1.call_flow_status from v_call_flows t1 inner join v_domains t2 on t1.domain_uuid = t2.domain_uuid where t2.domain_name = '%s' and t1.call_flow_feature_code = '%s' ]] -local function find_call_flow(user) +function find_call_flow(user) local ext, domain_name = split_first(user, '@', true) if not domain_name then return end local dbh = Database.new('system') @@ -26,55 +25,50 @@ local function find_call_flow(user) return row.call_flow_uuid, row.call_flow_status end +end + local sleep = 60000 local pid_file = scripts_dir .. "/run/call_flow_subscribe.tmp" +local shutdown_event = "CUSTOM::fusion::flow::shutdown" -local pid = api:execute("create_uuid") or tostring(api:getTime()) +local events = EventConsumer.new(sleep, pid_file) -file.write(pid_file, pid) +-- FS shutdown +events:bind("SHUTDOWN", function(self, name, event) + log.notice("shutdown") + return self:stop() +end) -log.notice("start"); +-- shutdown command +if shutdown_event then + events:bind(shutdown_event, function(self, name, event) + log.notice("shutdown") + return self:stop() + end) +end -local timer = IntervalTimer.new(sleep):start() +-- FS receive SUBSCRIBE to BLF from device +events:bind("PRESENCE_PROBE", function(self, name, event) + --handle only blf with `flow+` prefix + if event:getHeader('proto') ~= 'flow' then return end -for event in ievents({"PRESENCE_PROBE", "SHUTDOWN"}, 1, timer:rest()) do - if (not event) or (timer:rest() < 1000) then - if not file.exists(pid_file) then break end - local stored = file.read(pid_file) - if stored and stored ~= pid then break end - timer:restart() - end - - if event then - -- log.notice("event:" .. event:serialize("xml")); - local event_name = event:getHeader('Event-Name') - if event_name == 'PRESENCE_PROBE' then - if event:getHeader('proto') == 'flow' and - event:getHeader('Event-Calling-Function') == 'sofia_presence_handle_sip_i_subscribe' - then - local from, to = event:getHeader('from'), event:getHeader('to') - local expires = tonumber(event:getHeader('expires')) - if expires and expires > 0 then - local call_flow_uuid, call_flow_status = find_call_flow(to) - if call_flow_uuid then - log.debugf("Find call flow: %s", to) - presence_in.turn_lamp(call_flow_status == "false", to, call_flow_uuid); - else - log.warningf("Can not find call flow: %s", to) - end - else - log.noticef("%s UNSUBSCRIBE from %s", from, to) - end - end - elseif event_name == 'SHUTDOWN' then - log.notice("shutdown") - break + local from, to = event:getHeader('from'), event:getHeader('to') + local expires = tonumber(event:getHeader('expires')) + if expires and expires > 0 then + local call_flow_uuid, call_flow_status = find_call_flow(to) + if call_flow_uuid then + log.noticef("Find call flow: %s staus: %s", to, tostring(call_flow_status)) + presence_in.turn_lamp(call_flow_status == "false", to, call_flow_uuid) + else + log.warningf("Can not find call flow: %s", to) end + else + log.noticef("%s UNSUBSCRIBE from %s", from, to) end -end +end) -if file.read(pid_file) == pid then - file.remove(pid_file) -end +log.notice("start") + +events:run() log.notice("stop") diff --git a/resources/install/scripts/resources/functions/event_consumer.lua b/resources/install/scripts/resources/functions/event_consumer.lua new file mode 100644 index 0000000000..92da61cc7c --- /dev/null +++ b/resources/install/scripts/resources/functions/event_consumer.lua @@ -0,0 +1,475 @@ +require "resources.functions.mkdir"; +require "resources.functions.split" + +local IntervalTimer = require "resources.functions.interval_timer" +local file = require "resources.functions.file" +local api = require "resources.functions.api" + +local function class(base) + local t = base and setmetatable({}, base) or {} + t.__index = t + t.__class = t + t.__base = base + + function t.new(...) + local o = setmetatable({}, t) + if o.__init then + if t == ... then -- we call as Class:new() + return o:__init(select(2, ...)) + else -- we call as Class.new() + return o:__init(...) + end + end + return o + end + + return t +end + +local function callable(f) + return type(f) == 'function' +end + +local function basename(p) + return (string.match(p, '^(.-)[/\\][^/\\]+$')) +end + +local function split_event(event_name) + local name, class = split_first(event_name, "::", true) + if class then return name, class end + return name +end + +------------------------------------------------------------------------------- +local BasicEventEmitter = class() do + +local ANY_EVENT = {} + +BasicEventEmitter.ANY = ANY_EVENT + +function BasicEventEmitter:__init() + -- map of array of listeners + self._handlers = {} + -- map to convert user's listener to internal wrapper + self._once = {} + + return self +end + +function BasicEventEmitter:on(event, handler) + local list = self._handlers[event] or {} + + for i = 1, #list do + if list[i] == handler then + return self + end + end + + list[#list + 1] = handler + self._handlers[event] = list + + return self +end + +function BasicEventEmitter:many(event, ttl, handler) + self:off(event, handler) + + local function listener(...) + ttl = ttl - 1 + if ttl == 0 then self:off(event, handler) end + handler(...) + end + + self:on(event, listener) + self._once[handler] = listener + + return self +end + +function BasicEventEmitter:once(event, handler) + return self:many(event, 1, handler) +end + +function BasicEventEmitter:off(event, handler) + local list = self._handlers[event] + + if not list then return self end + + if handler then + + local listener = self._once[handler] or handler + self._once[handler] = nil + + for i = 1, #list do + if list[i] == listener then + table.remove(list, i) + break + end + end + + if #list == 0 then self._handlers[event] = nil end + + else + + for handler, listener in pairs(self._once) do + for i = 1, #list do + if list[i] == listener then + self._once[handler] = nil + break + end + end + end + + self._handlers[event] = nil + + end + + return self +end + +function BasicEventEmitter:onAny(handler) + return self:on(ANY_EVENT, handler) +end + +function BasicEventEmitter:manyAny(ttl, handler) + return self:many(ANY_EVENT, ttl, handler) +end + +function BasicEventEmitter:onceAny(handler) + return self:once(ANY_EVENT, handler) +end + +function BasicEventEmitter:offAny(handler) + return self:off(ANY_EVENT, handler) +end + +function BasicEventEmitter:_emit_impl(call_any, event, ...) + local ret = false + + if call_any and ANY_EVENT ~= event then + ret = self:_emit_impl(false, ANY_EVENT, ...) or ret + end + + local list = self._handlers[event] + + if list then + for i = #list, 1, -1 do + if list[i] then + -- we need this check because cb could remove some listeners + list[i](...) + ret = true + end + end + end + + return ret +end + +function BasicEventEmitter:emit(event, ...) + return self:_emit_impl(true, event, ...) +end + +function BasicEventEmitter:_emit_all(...) + -- we have to copy because cb can remove/add some events + -- and we do not need call new one or removed one + local names = {} + for name in pairs(self._handlers) do + names[#names+1] = name + end + + local ret = false + for i = 1, #names do + ret = self:_emit_impl(false, names[i], ...) or ret + end + + return ret +end + +function BasicEventEmitter:_empty() + return nil == next(self._handlers) +end + +function BasicEventEmitter:removeAllListeners(eventName) + if not eventName then + self._handlers = {} + self._once = {} + else + self:off(eventName) + end + + return self +end + +end +------------------------------------------------------------------------------- + +------------------------------------------------------------------------------- +local EventEmitter = class() do + +function EventEmitter:__init(opt) + if opt and opt.wildcard then + assert('`EventEmitter::wildcard` not supported') + -- self._EventEmitter = TreeEventEmitter.new(opt.delimiter) + else + self._EventEmitter = BasicEventEmitter.new() + end + self._EventEmitter_self = opt and opt.self or self + + return self +end + +function EventEmitter:on(event, listener) + assert(event, 'event expected') + assert(callable(listener), 'function expected') + + self._EventEmitter:on(event, listener) + return self +end + +function EventEmitter:many(event, ttl, listener) + assert(event, 'event expected') + assert(type(ttl) == 'number', 'number required') + assert(callable(listener), 'function expected') + + self._EventEmitter:many(event, ttl, listener) + return self +end + +function EventEmitter:once(event, listener) + assert(event, 'event expected') + assert(callable(listener), 'function expected') + + self._EventEmitter:once(event, listener) + return self +end + +function EventEmitter:off(event, listener) + assert(event, 'event expected') + assert((listener == nil) or callable(listener), 'function expected') + + self._EventEmitter:off(event, listener) + return self +end + +function EventEmitter:emit(event, ...) + assert(event, 'event expected') + + return self._EventEmitter:emit(event, self._EventEmitter_self, event, ...) +end + +function EventEmitter:onAny(listener) + assert(callable(listener), 'function expected') + + self._EventEmitter:onAny(listener) + return self +end + +function EventEmitter:manyAny(ttl, listener) + assert(type(ttl) == 'number', 'number required') + assert(callable(listener), 'function expected') + + self._EventEmitter:manyAny(ttl, listener) + return self +end + +function EventEmitter:onceAny(listener) + assert(callable(listener), 'function expected') + + self._EventEmitter:onceAny(listener) + return self +end + +function EventEmitter:offAny(listener) + assert((listener == nil) or callable(listener), 'function expected') + + self._EventEmitter:offAny(listener) + return self +end + +function EventEmitter:removeAllListeners(eventName) + self._EventEmitter:removeAllListeners(eventName) + return self +end + +-- aliases + +EventEmitter.addListener = EventEmitter.on + +EventEmitter.removeListener = EventEmitter.off + +end +------------------------------------------------------------------------------- + +------------------------------------------------------------------------------- +local EventConsumer = class(EventEmitter) do + +function EventConsumer:__init(timeout, pid_file) + self.__base.__init(self) + + if pid_file then timeout = timeout or 60000 end + + if timeout then assert(timeout > 0) end + + self._bound = {} + self._running = false + self._consumer = freeswitch.EventConsumer() + self._timeout = timeout + self._timer = timeout and IntervalTimer.new(timeout) + self._pid = api:execute("create_uuid") or tostring(api:getTime()) + self._pid_file = pid_file + + if pid_file then + local pid_path = basename(self._pid_file) + mkdir(pid_path) + assert(file.write(self._pid_file, self._pid)) + end + + return self +end + +function EventConsumer:_check_pid_file() + if not self._pid_file then + return true + end + if not file.exists(self._pid_file) then + return false + end + + local stored = file.read(self._pid_file) + if stored and stored ~= self._pid then + return false + end + + return true +end + +function EventConsumer:bind(event_name, cb) + if not self._bound[event_name] then + local name, class = split_event(event_name) + + local ok, err + if not class then ok, err = self._consumer:bind(name) + else ok, err = self._consumer:bind(name, class) end + + if ok then self._bound[event_name] = true end + end + + if self._bound[event_name] and cb then + if event_name == 'ALL' then + self:onAny(function(self, name, event) + if event then return cb(self, name, event) end + end) + else + self:on(event_name, cb) + end + end +end + +function EventConsumer:run() + self._timer:restart() + self._running = true + + while self._running do + local timeout + if self._timer then + timeout = self._timer:rest() + if timeout == 0 then + if not self:_check_pid_file() then + return + end + timeout = self._timeout + self._timer:restart() + self:emit('TIMEOUT') + end + else + -- wait infinity + timeout = 0 + end + + local event = self._consumer:pop(1, timeout) + + if not event then + if not self:_check_pid_file() then + return + end + else + local event_name = event:getHeader('Event-Name') + if self._bound[event_name] then + self:emit(event_name, event) + end + local event_class = event:getHeader('Event-Subclass') + if event_class and #event_class > 0 then + event_name = event_name .. '::' .. event_class + self:emit(event_name, event) + end + end + end + + self._running = false +end + +function EventConsumer:stop() + self._running = false + if self._pid_file and self:_check_pid_file() then + file.remove(self._pid_file) + end +end + +end +------------------------------------------------------------------------------- + +--- +-- +-- @param events [string|array]- array of events to subscribe. To specify subclass you +-- can use string like `::` or array like `{, }`. +-- If `events` is string then it specify single event. +-- @param block [booolean?] - by default it use block +-- @param timeout [number?] - by default it 0. If set `block` that means infinity wait. +-- +-- @usage +-- -- do blocked itarate over 'MEMCACHE' and 'SHUTDOWN' events +-- for event in ievents{'MEMCACHE','SHUTDOWN'} do ... end +-- +-- -- do blocked iterate with timeout 1 sec +-- for event in ievents('SHUTDOWN', 1000) do +-- if event then -- has event +-- else -- timeout +-- end +-- end +local ievents = function(events, block, timeout) + if type(events) == 'string' then + events = freeswitch.EventConsumer(split_event(events)) + elseif type(events) == 'table' then + local array = events + events = freeswitch.EventConsumer() + for _, event in ipairs(array) do + local name, class + if type(event) == 'table' then + base, sub = event[1], event[2] + else + name, class = split_event(base) + end + if not class then events:bind(name) + else events:bind(name, class) end + end + end + + if type(block) == 'number' then + block, timeout = true, block + end + + timeout = timeout or 0 + block = block and 1 or 0 + + return function() + local event = events:pop(block, timeout) + if not event then return false end + return event + end +end + +return { + EventConsumer = EventConsumer; + new = EventConsumer.new; + ievents = ievents; +} \ No newline at end of file diff --git a/resources/install/scripts/resources/functions/ievents.lua b/resources/install/scripts/resources/functions/ievents.lua deleted file mode 100644 index ae561b7118..0000000000 --- a/resources/install/scripts/resources/functions/ievents.lua +++ /dev/null @@ -1,30 +0,0 @@ -local ievents = function(events, ...) - if type(events) == 'string' then - events = freeswitch.EventConsumer(events) - elseif type(events) == 'table' then - local array = events - events = freeswitch.EventConsumer() - for _, event in ipairs(array) do - local base, sub - if type(event) == 'table' then - base, sub = event[1], event[2] - else - base = event - end - if not sub then events:bind(base) - else events:bind(base, sub) end - end - end - - local block, timeout = ... - if timeout and (timeout == 0) then block, timeout = 0, 0 end - timeout = timeout or 0 - - return function() - local event = events:pop(block, timeout) - if not event then return false end - return event - end -end - -return ievents \ No newline at end of file diff --git a/resources/install/scripts/resources/functions/interval_timer.lua b/resources/install/scripts/resources/functions/interval_timer.lua index 397c303564..0cc496421e 100644 --- a/resources/install/scripts/resources/functions/interval_timer.lua +++ b/resources/install/scripts/resources/functions/interval_timer.lua @@ -1,3 +1,15 @@ +-- absolute timer +local fs_time if freeswitch then + local api = require "resources.functions.api" + fs_time = { + now = function() return api:getTime() end; + elapsed = function(t) return api:getTime() - t end; + ms_to_time = function(ms) return ms end; + time_to_ms = function(t) return t end; + } +end + +-- absolute timer local os_time = { now = function() return os.time() end; elapsed = function(t) return os.difftime(os.time(), t) end; @@ -5,6 +17,7 @@ local os_time = { time_to_ms = function(t) return t * 1000 end; } +-- monotonic timer (not work on my test Debian system) local os_clock = { now = function() return os.clock() end; elapsed = function(t) return os.clock() - t end; @@ -12,13 +25,19 @@ local os_clock = { time_to_ms = function(t) return t * 1000 end; } +local timers = { + freeswitch = fs_time; + time = os_time; + clock = os_clock; +} + local IntervalTimer = {} do IntervalTimer.__index = IntervalTimer function IntervalTimer.new(interval, timer) local o = setmetatable({}, IntervalTimer) o._interval = interval - o._timer = timer or os_clock + o._timer = timer and assert(timers[timer], "unknown timer: " .. timer) or os_time return o end