Просмотр исходного кода

epoch in user shnge state webhook

Hal De 2 лет назад
Родитель
Сommit
d04da296b5
1 измененных файлов с 93 добавлено и 48 удалено
  1. 93 48
      app/app.py

+ 93 - 48
app/app.py

@@ -6,6 +6,7 @@ import logging
 import os
 import re
 import json
+import time
 from datetime import date as date
 from datetime import datetime as dt
 from datetime import timedelta as td
@@ -101,7 +102,8 @@ app.cache = {'devices':{},
              'queues':{},
              'calls':{},
              'cel_queue_calls':{},
-             'cel_calls':{}}
+             'cel_calls':{},
+             'dead_callbacks':{}}
 
 manager = Manager(
   loop=main_loop,
@@ -153,24 +155,26 @@ async def reloadCallback(mngr: Manager, msg: Message):
 async def extensionStatusCallback(mngr: Manager, msg: Message):
   user = msg.exten
   state = msg.statustext.lower()
+  epoch = time.time_ns()
   #app.logger.warning('ExtensionStatus({}, {})'.format(user, state))
   if user in app.cache['ustates']:
     prevState = getUserStateCombined(user)
     app.cache['ustates'][user] = state
     combinedState = getUserStateCombined(user)
     if combinedState != prevState:
-      await userStateChangeCallback(user, combinedState, prevState)
+      await userStateChangeCallback(user, combinedState,epoch, prevState)
 
 @manager.register_event('PresenceStatus')
 async def presenceStatusCallback(mngr: Manager, msg: Message):
   user = msg.exten #hint = msg.hint
   state = msg.status.lower()
+  epoch = time.time_ns()
   if user in app.cache['ustates']:
     prevState = getUserStateCombined(user)
     app.cache['pstates'][user] = state
     combinedState = getUserStateCombined(user)
     if combinedState != prevState:
-      await userStateChangeCallback(user, combinedState, prevState)
+      await userStateChangeCallback(user, combinedState, epoch, prevState)
 
 @manager.register_event('Hangup')
 async def hangupCallback(mngr: Manager, msg: Message):
@@ -344,6 +348,7 @@ async def celCallback(mngr: Manager, msg: Message):
            if (msg.linkedid in  app.cache['cel_calls']):
              fillCallbackParameters(_cb,app.cache['cel_calls'][msg.linkedid],cid)
            reply = await doCallback(device, _cb)
+           reply = await doCallback('user_calling', _cb)
     if ((msg.Application == 'Queue') and
         (msg.EventName == 'APP_START') and 
         (firstMessage.Context.startswith('from-internal')) and 
@@ -563,9 +568,9 @@ async def getCallInfo(callid):
     
   _q = '''SELECT *
           FROM cdr 
-          WHERE linkedid=:linkedid or linkedid in (select distinct linkedid from cdr where uniqueid in :uniqueids)
+          WHERE  linkedid in (select distinct linkedid from cdr where uniqueid in :uniqueids)
           ORDER BY sequence;'''
-  _v = {'linkedid': linkedid, 'uniqueids': uniqueids}
+  _v = {'uniqueids': uniqueids}
   #app.logger.warning('values {}'.format(_v)) 
   _f = True
   unique_ids = set()
@@ -634,20 +639,45 @@ async def getUserCDR(user,
 async def getCEL(start=None, end=None, table='cel', field='eventtime', sort='id'):
   return await getCDR(start, end, table, field, sort)
 
+async def getCallback(callback):
+  row = await db.fetch_one(query='SELECT url FROM callback_urls WHERE device = :device',
+                               values={'device': callback})
+  if row is not None:
+      return row['url']
+  return False
+
+async def replaceCallback(callback, url):
+  await db.execute(query='REPLACE INTO callback_urls (device, url) VALUES (:device, :url)',
+                       values={'device': callback,'url': url})
+  
+async def addCallback(callback, url):
+  await db.execute(query='update callback_urls set url = CONCAT(url,";",:url) where device= :device and  locate(:url,url) in (0,NULL)',
+                       values={'device': callback,'url': url})
+  
 async def doCallback(entity, msg):
+  reply = None
   row = await db.fetch_one(query='SELECT url FROM callback_urls WHERE device = :device', values={'device': entity})
   if ((row is not None) and (row['url'].startswith('http')) and ('blackhole' not in row['url'])):
-    app.logger.warning(f'''POST {row['url']} data: {str(msg)}''')
+#    if (row['url'] in app.cache['dead_callbacks']):
+#      if (app.cache['dead_callbacks'][row['url']] > dt.now()):
+#        app.logger.warning(f'''Skipping CALLBACK URL {row['url']} (disabled for {(app.cache['dead_callbacks'][row['url']]-dt.now()).total_seconds()} seconds)''')
+#        return None
+#      else:
+#        del app.cache['dead_callbacks'][row['url']]
+    app.logger.warning(f'''CALLBACK URL {row['url']} ''')
     if not 'HTTP_CLIENT' in app.config:
       await initHttpClient()
-    try:
-      reply = await app.config['HTTP_CLIENT'].post(row['url'], json=msg)
-      return reply
-    except Exception as e:
-      app.logger.warning('callback error {}'.format(row['url']))
+    for u in row['url'].split(";"):
+      app.logger.warning(f'''POST {u} data: {str(msg)}''')
+      try:
+        reply = await app.config['HTTP_CLIENT'].post(u, json=msg)
+        #return reply
+      except Exception as e:
+        app.cache['dead_callbacks'][row['url']] = dt.now() + td(minutes=15)
+        app.logger.warning(f'''callback error {u} data: {str(msg)} error {str(e)}''')
   else:
     app.logger.warning('No callback url defined for {}'.format(entity))
-  return None
+  return reply
 
 async def doCallbackPostfix(entity,postfix, msg):
   row = await db.fetch_one(query='SELECT url FROM callback_urls WHERE device = :device', values={'device': entity})
@@ -1207,7 +1237,7 @@ async def rebindLostDevices():
   app.cache['usermap'] = copy.deepcopy(usermap)
   app.cache['devicemap'] = copy.deepcopy(devicemap)
 
-async def userStateChangeCallback(user, state, prevState = None):
+async def userStateChangeCallback(user, state, epoch, prevState = None):
   reply = None
   device = None
   if (user in app.cache['devicemap']):
@@ -1215,12 +1245,14 @@ async def userStateChangeCallback(user, state, prevState = None):
     if device is not None:
       _cb = {'webhook_name': 'user_status',
              'user': user,
+             'epoch': epoch,
              'state': state,
              'prev_state':prevState}
       reply = await doCallback(device, _cb)
 
   _cb = {'webhook_name': 'user_status',
              'user': user,
+             'epoch': epoch,
              'state': state,
              'prev_state':prevState}
   reply = await doCallback('user_status', _cb)
@@ -1769,6 +1801,7 @@ class DeviceCallback(Resource):
     if url is not None:
       await db.execute(query='REPLACE INTO callback_urls (device, url) VALUES (:device, :url)',
                        values={'device': device,'url': url})
+      app.logger.warning(f'''set callback for {device}: {url}''')
     else:
       row = await db.fetch_one(query='SELECT url FROM callback_urls WHERE device = :device',
                                values={'device': device})
@@ -1780,6 +1813,7 @@ class DeviceCallback(Resource):
 class GroupRingingCallback(Resource):
   @authRequired
   @app.param('url', 'used to set the Callback url for the group ringing callback', 'query')
+  @app.param('add', 'set to 1 for adding this url instad of replace', 'query')
   @app.response(HTTPStatus.OK, 'JSON data {"url":url}')
   @app.response(HTTPStatus.UNAUTHORIZED, 'Authorization required')
   async def get(self):
@@ -1788,20 +1822,22 @@ class GroupRingingCallback(Resource):
     if not request.admin:
       abort(401)
     url = request.args.get('url', None)
+    add = bool(int(request.args.get('add', 0)))
+    callback = 'groupRinging'
     if url is not None:
-      await db.execute(query='REPLACE INTO callback_urls (device, url) VALUES (:device, :url)',
-                       values={'device': 'groupRinging','url': url})
+      if add:
+        await addCallback(callback,url)
+      else:
+        await replaceCallback(callback,url)
     else:
-      row = await db.fetch_one(query='SELECT url FROM callback_urls WHERE device = :device',
-                               values={'device': 'groupRinging'})
-      if row is not None:
-        url = row['url']
-    return successCommonCallbackURL('groupRinging', url)
+      url = await getCallback(callback)
+    return successCommonCallbackURL(callback, url)
 
 @app.route('/group/answered/callback')
 class GroupAnsweredCallback(Resource):
   @authRequired
   @app.param('url', 'used to set the Callback url for the group answered callback', 'query')
+  @app.param('add', 'set to 1 for adding this url instad of replace', 'query')
   @app.response(HTTPStatus.OK, 'JSON data {"url":url}')
   @app.response(HTTPStatus.UNAUTHORIZED, 'Authorization required')
   async def get(self):
@@ -1810,20 +1846,22 @@ class GroupAnsweredCallback(Resource):
     if not request.admin:
       abort(401)
     url = request.args.get('url', None)
+    add = bool(int(request.args.get('add', 0)))
+    callback = 'groupAnswered'
     if url is not None:
-      await db.execute(query='REPLACE INTO callback_urls (device, url) VALUES (:device, :url)',
-                       values={'device': 'groupAnswered','url': url})
+      if add :
+        await addCallback(callback,url)
+      else:
+        await replaceCallback(callback,url)
     else:
-      row = await db.fetch_one(query='SELECT url FROM callback_urls WHERE device = :device',
-                               values={'device': 'groupAnswered'})
-      if row is not None:
-        url = row['url']
-    return successCommonCallbackURL('groupAnswered', url)
+      url = await getCallback(callback)
+    return  successCommonCallbackURL(callback, url)
 
 @app.route('/queue/enter/callback')
 class QueueEnterCallback(Resource):
   @authRequired
   @app.param('url', 'used to set the Callback url for the queue enter callback', 'query')
+  @app.param('add', 'set to 1 for adding this url instad of replace', 'query')
   @app.response(HTTPStatus.OK, 'JSON data {"url":url}')
   @app.response(HTTPStatus.UNAUTHORIZED, 'Authorization required')
   async def get(self):
@@ -1832,20 +1870,22 @@ class QueueEnterCallback(Resource):
     if not request.admin:
       abort(401)
     url = request.args.get('url', None)
+    add = bool(int(request.args.get('add', 0)))
+    callback = 'queueEnter'
     if url is not None:
-      await db.execute(query='REPLACE INTO callback_urls (device, url) VALUES (:device, :url)',
-                       values={'device': 'queueEnter','url': url})
+      if add :
+        await addCallback(callback,url)
+      else:
+        await replaceCallback(callback,url)
     else:
-      row = await db.fetch_one(query='SELECT url FROM callback_urls WHERE device = :device',
-                               values={'device': 'queueEnter'})
-      if row is not None:
-        url = row['url']
-    return successCommonCallbackURL('queueEnter', url)
+      url = await getCallback(callback)
+    return  successCommonCallbackURL(callback, url)
 
 @app.route('/queue/leave/callback')
 class QueueLeaveCallback(Resource):
   @authRequired
   @app.param('url', 'used to set the Callback url for the queue leave callback', 'query')
+  @app.param('add', 'set to 1 for adding this url instad of replace', 'query')
   @app.response(HTTPStatus.OK, 'JSON data {"url":url}')
   @app.response(HTTPStatus.UNAUTHORIZED, 'Authorization required')
   async def get(self):
@@ -1854,15 +1894,16 @@ class QueueLeaveCallback(Resource):
     if not request.admin:
       abort(401)
     url = request.args.get('url', None)
+    add = bool(int(request.args.get('add', 0)))
+    callback = 'queueLeave'
     if url is not None:
-      await db.execute(query='REPLACE INTO callback_urls (device, url) VALUES (:device, :url)',
-                       values={'device': 'queueLeave','url': url})
+      if add :
+        await addCallback(callback,url)
+      else:
+        await replaceCallback(callback,url)
     else:
-      row = await db.fetch_one(query='SELECT url FROM callback_urls WHERE device = :device',
-                               values={'device': 'queueLeave'})
-      if row is not None:
-        url = row['url']
-    return successCommonCallbackURL('queueLeave', url)
+      url = await getCallback(callback)
+    return  successCommonCallbackURL(callback, url)
 
 
 @app.route('/callback/<type>')
@@ -1870,6 +1911,7 @@ class CommonCallback(Resource):
   @authRequired
   @app.param('type', 'Callback type', 'path')
   @app.param('url', 'used to set the Callback url for the autocall', 'query')
+  @app.param('add', 'set to 1 for adding this url instad of replace', 'query')
   @app.response(HTTPStatus.OK, 'JSON data {"url":url}')
   @app.response(HTTPStatus.UNAUTHORIZED, 'Authorization required')
   async def get(self,type):
@@ -1878,15 +1920,18 @@ class CommonCallback(Resource):
     if not request.admin:
       abort(401)
     url = request.args.get('url', None)
+    add = bool(int(request.args.get('add', 0)))
+    callback = type
     if url is not None:
-      await db.execute(query='REPLACE INTO callback_urls (device, url) VALUES (:device, :url)',
-                       values={'device': type,'url': url})
+      if add :
+        await addCallback(callback,url)
+      else:
+        await replaceCallback(callback,url)
     else:
-      row = await db.fetch_one(query='SELECT url FROM callback_urls WHERE device = :device',
-                               values={'device': type})
-      if row is not None:
-        url = row['url']
-    return successCommonCallbackURL(type, url)
+      url = await getCallback(callback)
+    return  successCommonCallbackURL(callback, url)
+
+
 
 manager.connect()
 app.run(loop=main_loop, host='0.0.0.0', port=app.config['PORT'])