#!/usr/bin/env python3 import asyncio import aiohttp import logging import os import re import json from quart import jsonify, request, render_template_string, abort from quart_openapi import Pint, Resource from http import HTTPStatus from panoramisk import Manager, Message from utils import * from logging.config import dictConfig # One asyncio event loop is used for AMI communication and HTTP requests routing with Quart main_loop = asyncio.get_event_loop() app = Pint(__name__, title=os.getenv('APP_TITLE', 'PBX API'), no_openapi=True) app.config.update({ 'TITLE': os.getenv('APP_TITLE', 'PBX API'), 'APPLICATION_ROOT': os.getenv('APP_APPLICATION_ROOT', None), 'SCHEME': os.getenv('APP_SCHEME', 'http'), 'FQDN': os.getenv('APP_FQDN', '127.0.0.1'), 'PORT': int(os.getenv('APP_API_PORT', 8000)), 'BODY_TIMEOUT': int(os.getenv('APP_BODY_TIMEOUT', 60)), 'DEBUG': os.getenv('APP_DEBUG', 'False').lower() in TRUEs, 'MAX_CONTENT_LENGTH': int(os.getenv('APP_MAX_CONTENT_LENGTH', 16777216)), 'AMI_HOST': os.getenv('APP_AMI_HOST', '127.0.0.1'), 'AMI_PORT': int(os.getenv('APP_AMI_PORT', 5038)), 'AMI_USERNAME': os.getenv('APP_AMI_USERNAME', 'app'), 'AMI_SECRET': os.getenv('APP_AMI_SECRET', 'secret'), 'AMI_PING_DELAY': int(os.getenv('APP_AMI_PING_DELAY', 10)), 'AMI_PING_INTERVAL': int(os.getenv('APP_AMI_PING_INTERVAL', 10)), 'AMI_TIMEOUT': int(os.getenv('APP_AMI_TIMEOUT', 5)), 'AUTH_HEADER': os.getenv('APP_AUTH_HEADER', 'APP-auth-token'), 'AUTH_SECRET': os.getenv('APP_AUTH_SECRET', '3bfbeaabf363dd64fe263bd36830a6b6'), 'SWAGGER_JS_URL': os.getenv('APP_SWAGGER_JS_URL', SWAGGER_JS_URL), 'SWAGGER_CSS_URL': os.getenv('APP_SWAGGER_CSS_URL', SWAGGER_CSS_URL), 'STATE_CALLBACK_URL': os.getenv('APP_STATE_CALLBACK_URL', None), 'EXTRA_API_URL': os.getenv('APP_EXTRA_API_URL', None), 'STATE_CACHE': {'user':{}, 'presence':{}}}) manager = Manager( loop=main_loop, host=app.config['AMI_HOST'], port=app.config['AMI_PORT'], username=app.config['AMI_USERNAME'], secret=app.config['AMI_SECRET'], ping_delay=app.config['AMI_PING_DELAY'], ping_interval=app.config['AMI_PING_INTERVAL'], reconnect_timeout=app.config['AMI_TIMEOUT'], ) class AuthMiddleware: '''ASGI process middleware that rejects requests missing the correct authentication header''' def __init__(self, app): self.app = app async def __call__(self, scope, receive, send): if 'headers' not in scope: return await self.app(scope, receive, send) for header, value in scope['headers']: if ((header == bytes(app.config['AUTH_HEADER'].lower(), 'utf-8')) and (value == bytes(app.config['AUTH_SECRET'], 'utf-8'))): return await self.app(scope, receive, send) # Paths "/openapi.json" and "/ui" do not require auth if (('path' in scope) and (scope['path'] in NO_AUTH_ROUTES)): return await self.app(scope, receive, send) return await self.error_response(receive, send) async def error_response(self, receive, send): await send({'type': 'http.response.start', 'status': 401, 'headers': [(b'content-length', b'21')]}) await send({'type': 'http.response.body', 'body': b'Authorization requred', 'more_body': False}) app.asgi_app = AuthMiddleware(app.asgi_app) @manager.register_event('FullyBooted') async def fullyBootedCallback(mngr: Manager, msg: Message): await refreshStatesCache() @manager.register_event('ExtensionStatus') async def extensionStatusCallback(mngr: Manager, msg: Message): user = msg.exten #hint = msg.hint state = msg.statustext.lower() if user in app.config['STATE_CACHE']['user']: _combinedState = getUserStateCombined(user) app.config['STATE_CACHE']['user'][user] = state combinedState = getUserStateCombined(user) if combinedState != _combinedState: await userStateChangeCallback(user, combinedState) @manager.register_event('PresenceStatus') async def presenceStatusCallback(mngr: Manager, msg: Message): user = msg.exten #hint = msg.hint state = msg.status.lower() if user in app.config['STATE_CACHE']['user']: _combinedState = getUserStateCombined(user) app.config['STATE_CACHE']['presence'][user] = state combinedState = getUserStateCombined(user) if combinedState != _combinedState: await userStateChangeCallback(user, combinedState) @app.before_first_request async def initHttpClient(): app.config['HTTP_CLIENT'] = aiohttp.ClientSession(loop=main_loop) @app.route('/openapi.json') async def openapi(): '''Generates JSON that conforms OpenAPI Specification ''' schema = app.__schema__ schema['servers'] = [{'url':'{}://{}:{}'.format(app.config['SCHEME'], app.config['FQDN'], app.config['PORT'])}] if app.config['EXTRA_API_URL'] is not None: schema['servers'].append({'url':app.config['EXTRA_API_URL']}) schema['components'] = {'securitySchemes':{'ApiKey':{'type': 'apiKey', 'name': app.config['AUTH_HEADER'], 'in': 'header'}}} schema['security'] = [{'ApiKey':[]}] return jsonify(schema) @app.route('/ui') async def ui(): '''Swagger UI ''' return await render_template_string(SWAGGER_TEMPLATE, title=app.config['TITLE'], js_url=app.config['SWAGGER_JS_URL'], css_url=app.config['SWAGGER_CSS_URL']) @app.route('/ami/action', methods=['POST']) async def action(): _payload = await request.get_data() reply = await manager.send_action(json.loads(_payload)) return str(reply) @app.route('/ami/getvar/') async def amiGetVar(variable): '''AMI GetVar Returns value of requested variable using AMI action GetVar in background. Parameters: variable (string): Variable to query for Returns: string: Variable value or empty string if variable not found ''' reply = await manager.send_action({'Action': 'GetVar', 'Variable': variable}) app.logger.warning('GetVar({})->{}'.format(variable, reply.value)) return reply.value async def amiSetVar(variable, value): '''AMI SetVar Sets variable using AMI action SetVar to value in background. Parameters: variable (string): Variable to set value (string): Value to set for variable Returns: boolean: True if DBPut action was successfull, False overwise ''' reply = await manager.send_action({'Action': 'SetVar', 'Variable': variable, 'Value': value}) app.logger.warning('SetVar({}, {})'.format(variable, value)) if (isinstance(reply, Message) and reply.success): return True return False async def amiDBGet(family, key): '''AMI DBGet Returns value of requested astdb key using AMI action DBGet in background. Parameters: family (string): astdb key family to query for key (string): astdb key to query for Returns: string: Value or empty string if variable not found ''' reply = await manager.send_action({'Action': 'DBGet', 'Family': family, 'Key': key}) if (isinstance(reply, list) and (len(reply) > 1)): for message in reply: if (message.event == 'DBGetResponse'): app.logger.warning('DBGet(/{}/{})->{}'.format(family, key, message.val)) return message.val app.logger.warning('DBGet(/{}/{})->Error!'.format(family, key)) return None async def amiDBPut(family, key, value): '''AMI DBPut Writes value to astdb by family and key using AMI action DBPut in background. Parameters: family (string): astdb key family to write to key (string): astdb key to write to value (string): value to write Returns: boolean: True if DBPut action was successfull, False overwise ''' reply = await manager.send_action({'Action': 'DBPut', 'Family': family, 'Key': key, 'Val': value}) app.logger.warning('DBPut(/{}/{}, {})'.format(family, key, value)) if (isinstance(reply, Message) and reply.success): return True return False async def amiDBDel(family, key): '''AMI DBDel Deletes key from family in astdb using AMI action DBDel in background. Parameters: family (string): astdb key family key (string): astdb key to delete Returns: boolean: True if DBDel action was successfull, False overwise ''' reply = await manager.send_action({'Action': 'DBDel', 'Family': family, 'Key': key}) app.logger.warning('DBDel(/{}/{})'.format(family, key)) if (isinstance(reply, Message) and reply.success): return True return False async def amiSetHint(context, user, hint): '''AMI SetHint Sets hint for user in context using AMI action DialplanUserAdd with Replace=true in background. Parameters: context (string): dialplan context user (string): user hint (string): hint for user Returns: boolean: True if DialplanUserAdd action was successfull, False overwise ''' reply = await manager.send_action({'Action': 'DialplanExtensionAdd', 'Context': context, 'Extension': user, 'Priority': 'hint', 'Application': hint, 'Replace': 'yes'}) app.logger.warning('SetHint({},{},{})'.format(context, user, hint)) if (isinstance(reply, Message) and reply.success): return True return False async def amiPresenceState(user): '''AMI PresenceState request for CustomPresence provider Parameters: user (string): user Returns: string: One of: not_set, unavailable, available, away, xa, chat or dnd ''' reply = await manager.send_action({'Action': 'PresenceState', 'Provider': 'CustomPresence:{}'.format(user)}) app.logger.warning('PresenceState({})'.format(user)) if (isinstance(reply, Message) and reply.success): return reply.state return None async def amiPresenceStateList(): states = {} reply = await manager.send_action({'Action':'PresenceStateList'}) if len(reply) >= 2: for message in reply: if message.event == 'PresenceStateChange': user = re.search('CustomPresence:(\d+)', message.presentity).group(1) states[user] = message.status app.logger.warning('PresenceStateList: {}'.format(','.join(states.keys()))) return states async def amiExtensionStateList(): states = {} reply = await manager.send_action({'Action':'ExtensionStateList'}) if len(reply) >= 2: for message in reply: if ((message.event == 'ExtensionStatus') and (message.context == 'ext-local')): states[message.exten] = message.statustext.lower() app.logger.warning('ExtensionStateList: {}'.format(','.join(states.keys()))) return states async def amiCommand(command): '''AMI Command Runs specified command using AMI action Command in background. Parameters: command (string): command to run Returns: boolean, list: tuple representing the boolean result of request and list of lines of command output ''' reply = await manager.send_action({'Action': 'Command', 'Command': command}) result = [] if (isinstance(reply, Message) and reply.success): if isinstance(reply.output, list): result = reply.output else: result = reply.output.split('\n') app.logger.warning('Command({})->{}'.format(command, '\n'.join(result))) return True, result app.logger.warning('Command({})->Error!'.format(command)) return False, result async def amiReload(module='core'): '''AMI Reload Reload specified asterisk module using AMI action reload in background. Parameters: module (string): module to reload, defaults to core Returns: boolean: True if Reload action was successfull, False overwise ''' reply = await manager.send_action({'Action': 'Reload', 'Module': module}) app.logger.warning('Reload({})'.format(module)) if (isinstance(reply, Message) and reply.success): return True return False async def getGlobalVars(): globalVars = GlobalVars() for _var in globalVars.d(): setattr(globalVars, _var, await amiGetVar(_var)) return globalVars async def setUserHint(user, dial, ast): if dial in NONEs: hint = 'CustomPresence:{}'.format(user) else: _dial= [dial] if (ast.DNDDEVSTATE == 'TRUE'): _dial.append('Custom:DND{}'.format(user)) hint = '{},CustomPresence:{}'.format('&'.join(_dial), user) return await amiSetHint('ext-local', user, hint) async def amiQueues(): queues = {} reply = await manager.send_action({'Action':'QueueStatus'}) if len(reply) >= 2: for message in reply: if message.event == 'QueueMember': _qm = QueueMember(re.search('Local\/(\d+)', message.location).group(1)) queues.setdefault(message.queue, []).append(_qm.fromMessage(message)) return queues async def amiDeviceChannel(device): reply = await manager.send_action({'Action':'CoreShowChannels'}) if len(reply) >= 2: for message in reply: if message.event == 'CoreShowChannel': if message.calleridnum == device: return message.channel return None async def setQueueStates(queues, user, device, state): for queue in [_q for _q, _ma in queues.items() for _m in _ma if _m.user == user]: await amiSetVar('DEVICE_STATE(Custom:QUEUE{}*{})'.format(device, queue), state) async def getDeviceUser(device): return await amiDBGet('DEVICE', '{}/user'.format(device)) async def getDeviceDial(device): return await amiDBGet('DEVICE', '{}/dial'.format(device)) async def getUserCID(user): return await amiDBGet('AMPUSER', '{}/cidnum'.format(user)) async def setDeviceUser(device, user): return await amiDBPut('DEVICE', '{}/user'.format(device), user) async def getUserDevice(user): return await amiDBGet('AMPUSER', '{}/device'.format(user)) async def setUserDevice(user, device): if device is None: return await amiDBDel('AMPUSER', '{}/device'.format(user)) else: return await amiDBPut('AMPUSER', '{}/device'.format(user), device) async def unbindOtherDevices(user, newDevice, queues, ast): '''Unbinds user from all devices except newDevice and sets all required device states. ''' devices = await amiDBGet('AMPUSER', '{}/device'.format(user)) if devices not in NONEs: for _device in sorted(set(devices.split('&')), key=int): if _device != newDevice: if ast.FMDEVSTATE == 'TRUE': await amiSetVar('DEVICE_STATE(Custom:FOLLOWME{})'.format(_device), 'INVALID') if ast.QUEDEVSTATE == 'TRUE': await setQueueStates(queues, user, _device, 'NOT_INUSE') if ast.DNDDEVSTATE: await amiSetVar('DEVICE_STATE(Custom:DEVDND{})'.format(_device), 'NOT_INUSE') if ast.CFDEVSTATE: await amiSetVar('DEVICE_STATE(Custom:DEVCF{})'.format(_device), 'NOT_INUSE') await amiDBPut('DEVICE', '{}/user'.format(_device), 'none') async def setUserDeviceStates(user, device, queues, ast): if ast.FMDEVSTATE == 'TRUE': _followMe = await amiDBGet('AMPUSER', '{}/followme/ddial'.format(user)) if _followMe is not None: await amiSetVar('DEVICE_STATE(Custom:FOLLOWME{})'.format(device), followMe2DevState(_followMe)) if ast.QUEDEVSTATE == 'TRUE': await setQueueStates(queues, user, device, 'INUSE') if ast.DNDDEVSTATE: _dnd = await amiDBGet('DND', user) await amiSetVar('DEVICE_STATE(Custom:DEVDND{})'.format(device), 'INUSE' if _dnd == 'YES' else 'NOT_INUSE') if ast.CFDEVSTATE: _cf = await amiDBGet('CF', user) await amiSetVar('DEVICE_STATE(Custom:DEVCF{})'.format(device), 'INUSE' if _cf != '' else 'NOT_INUSE') async def refreshStatesCache(): app.config['STATE_CACHE']['user'] = await amiExtensionStateList() app.config['STATE_CACHE']['presence'] = await amiPresenceStateList() return len(app.config['STATE_CACHE']['user']) async def userStateChangeCallback(user, state): reply = None if ((app.config['STATE_CALLBACK_URL'] not in NONEs) and ('HTTP_CLIENT' in app.config)): reply = await app.config['HTTP_CLIENT'].post(app.config['STATE_CALLBACK_URL'], json={'user': user, 'state': state}) else: app.logger.warning('{} changed state to: {}'.format(user, state)) return reply def getUserStateCombined(user): if user not in app.config['STATE_CACHE']['user']: return None _state = app.config['STATE_CACHE']['user'][user] if (_state == 'idle'): if (user in app.config['STATE_CACHE']['presence']): if app.config['STATE_CACHE']['presence'][user] in ('not_set','available', 'xa', 'chat'): return 'available' else: return app.config['STATE_CACHE']['presence'][user] else: return 'available' else: if _state in ('unavailable', 'ringing'): return _state else: return 'busy' def getUsersStatesCombined(): return {user:getUserStateCombined(user) for user in app.config['STATE_CACHE']['user']} @app.route('/atxfer//') class AtXfer(Resource): @app.param('userA', 'User initiating the attended transfer', 'path') @app.param('userB', 'Transfer destination user', 'path') @app.response(HTTPStatus.OK, 'Successfuly transfered the call') @app.response(HTTPStatus.UNAUTHORIZED, 'Authorization required') @app.response(HTTPStatus.NOT_FOUND, 'User does not exist, or user is not bound to device, \ or device is offline, or no current call for user') @app.response(HTTPStatus.BAD_REQUEST, 'Transfer failed somehow or other AMI error') async def get(self, userA, userB): '''Attended call transfer ''' device = await getUserDevice(userA) if device in NONEs: abort(HTTPStatus.NOT_FOUND) channel = await amiDeviceChannel(device) if channel in NONEs: abort(HTTPStatus.NOT_FOUND) reply = await manager.send_action({'Action':'Atxfer', 'Channel':channel, 'async':'false', 'Exten':userB}) if (isinstance(reply, Message) and reply.success): return '', HTTPStatus.OK abort(HTTPStatus.BAD_REQUEST) @app.route('/users/states') class UsersStates(Resource): @app.response(HTTPStatus.OK, 'JSON map of form {user1:state1, user2:state2, ...}') @app.response(HTTPStatus.UNAUTHORIZED, 'Authorization required') async def get(self): '''Returns all users with their states. Possible states are: available, away, dnd, inuse, busy, unavailable, ringing ''' await refreshStatesCache() return getUsersStatesCombined() @app.route('/user//presence') class UserPresenceState(Resource): @app.param('user', 'User to query for presence state', 'path') @app.response(HTTPStatus.OK, 'Presence state string') @app.response(HTTPStatus.UNAUTHORIZED, 'Authorization required') @app.response(HTTPStatus.NOT_FOUND, 'User does not exist') @app.response(HTTPStatus.BAD_REQUEST, 'AMI error') async def get(self, user): '''Returns user's presence state. One of: not_set | unavailable | available | away | xa | chat | dnd ''' cidnum = await getUserCID(user) # Check if user exists in astdb if cidnum is None: abort(HTTPStatus.NOT_FOUND) state = await amiPresenceState(user) if state is None: abort(HTTPStatus.BAD_REQUEST) return state @app.route('/user//presence/') class SetUserPresenceState(Resource): @app.param('user', 'Target user to set the presence state', 'path') @app.param('state', 'The presence state to set for user, one of: not_set, unavailable, available, away, xa, chat or dnd', 'path') @app.response(HTTPStatus.OK, 'Successfuly set the presence state') @app.response(HTTPStatus.UNAUTHORIZED, 'Authorization required') @app.response(HTTPStatus.NOT_FOUND, 'User does not exist') @app.response(HTTPStatus.BAD_REQUEST, 'Wrong state string ot other AMI error') async def get(self, user, state): '''Sets user's presence state. Allowed states: not_set | unavailable | available | away | xa | chat | dnd ''' cidnum = await getUserCID(user) # Check if user exists in astdb if cidnum is None: abort(HTTPStatus.NOT_FOUND) if await amiSetVar('PRESENCE_STATE(CustomPresence:{})'.format(user), state): return '', HTTPStatus.OK else: abort(HTTPStatus.BAD_REQUEST) @app.route('/device///on') @app.route('/user///on') class UserDeviceBind(Resource): @app.param('device', 'Device number to bind to', 'path') @app.param('user', 'User to bind to device', 'path') @app.response(HTTPStatus.OK, 'JSON reply with fields "success" and "result"') @app.response(HTTPStatus.UNAUTHORIZED, 'Authorization required') async def get(self, device, user): '''Binds user to device. Both user and device numbers are checked for existance. Any device user was previously bound to, is unbound. Any user previously bound to device is unbound also. ''' cidnum = await getUserCID(user) # Check if user exists in astdb if cidnum is None: return noUser(user) dial = await getDeviceDial(device) # Check if device exists in astdb if dial is None: return noDevice(device) currentUser = await getDeviceUser(device) # Check if any user is already bound to device if currentUser == user: return beenBound(user, device) ast = await getGlobalVars() queues = await amiQueues() if currentUser not in NONEs: # If any other user is bound to device, unbind him, await setUserDevice(currentUser, None) if ast.QUEDEVSTATE == 'TRUE': # set device states for previous user queues await setQueueStates(queues, currentUser, device, 'NOT_INUSE') await setUserHint(currentUser, None, ast) # set hints for previous user await setDeviceUser(device, user) # Bind user to device # If user is bound to some other devices, unbind him and set # device states for those devices await unbindOtherDevices(user, device, queues, ast) if not (await setUserHint(user, dial, ast)): # Set hints for user on new device return hintError(user, device) await setUserDeviceStates(user, device, queues, ast) # Set device states for users new device if not (await setUserDevice(user, device)): # Bind device to user return bindError(user, device) return beenBound(user, device) @app.route('/device//off') class DeviceUnBind(Resource): @app.param('device', 'Device number to unbind', 'path') @app.response(HTTPStatus.OK, 'JSON reply with fields "success" and "result"') @app.response(HTTPStatus.UNAUTHORIZED, 'Authorization required') async def get(self, device): '''Unbinds any user from device. Device is checked for existance. ''' dial = await getDeviceDial(device) # Check if device exists in astdb if dial is None: return noDevice(device) currentUser = await getDeviceUser(device) # Check if any user is bound to device if currentUser in NONEs: return noUserBound(device) else: ast = await getGlobalVars() queues = await amiQueues() await setUserDevice(currentUser, None) # Unbind device from current user if ast.QUEDEVSTATE == 'TRUE': # set device states for current user queues await setQueueStates(queues, currentUser, device, 'NOT_INUSE') await setUserHint(currentUser, None, ast) # set hints for current user await setDeviceUser(device, 'none') # Unbind user from device return beenUnbound(currentUser, device) manager.connect() app.run(loop=main_loop, host='0.0.0.0', port=app.config['PORT'])