-
Notifications
You must be signed in to change notification settings - Fork 3
/
run_flask.py
66 lines (55 loc) · 2.16 KB
/
run_flask.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
"""
Script that creates and runs a Flask instance which can start/stop the Telegram bot
Miha Lotric, Dec 2019
"""
from flask import Flask, request
from cool_defi_bot import telegram_bot
from cool_defi_bot import config
from dotenv import load_dotenv
import requests
import os
# Load keys from the .env file before they are read below
load_dotenv()

# Slack key to send developers the errors and exceptions
SLACK_KEY = os.environ.get("SLACK_KEY")
# Telegram bot token
BOT_TOKEN = os.environ.get('BOT_TOKEN')

app = Flask(__name__)

# Private state carried on the Flask app object
app._token = BOT_TOKEN
app._bot = telegram_bot.get_bot()  # Telegram bot instance
app._bot_is_live = False  # Flipped by the /start and /stop routes
def notify_slack(msg, chat='dev-telegram-bot', timeout=10):
    """Notify Slack when bot is turned off or on (best effort).

    Sends `msg` to the endpoint configured in ``config.URLS['slack_api']``,
    authenticated with the module-level ``SLACK_KEY``.

    Args:
        msg: Text of the message to post.
        chat: Name of the Slack channel to post to.
        timeout: Seconds to wait for Slack before giving up. Without a
            timeout a stalled connection would block the calling Flask
            request indefinitely.

    Returns:
        None. Request failures are logged rather than raised, so a Slack
        outage cannot turn an already-completed bot start/stop into an error.
    """
    import logging  # Local import keeps this block self-contained

    params = {
        'token': SLACK_KEY,
        'channel': chat,
        'text': msg,
        'icon_emoji': ':blocky-cool:',
        'username': 'Cool Defi Bot',
        'pretty': 1,
    }
    url = config.URLS['slack_api']
    # NOTE(review): the token is sent in the URL query string of a GET
    # request; confirm the configured endpoint still accepts that.
    try:
        response = requests.get(url, params=params, timeout=timeout)
        response.raise_for_status()
    except requests.RequestException as err:
        logging.getLogger(__name__).warning('Slack notification failed: %s', err)
@app.route('/start', methods=['POST'])
def start():
    """If no bot is already running start it.

    Query parameters:
        method: Label included in the Slack message (Local/Staging/Production);
            defaults to 'Unknown'.
        auto: The string 'true' marks the run as 'Auto'; anything else is 'Manual'.

    Returns:
        'Bot started' if polling was started by this request, otherwise
        'Already live'.
    """
    if app._bot_is_live:
        return 'Already live'

    method = request.args.get('method', 'Unknown')  # Local/Staging/Production
    run_type = 'Auto' if request.args.get('auto') == 'true' else 'Manual'

    app._bot_is_live = True
    try:
        # Starts the listening
        app._bot.start_polling()
    except Exception:
        # Polling never started: undo the flag so a later /start can retry
        # instead of being answered with 'Already live' forever.
        app._bot_is_live = False
        raise

    # Reduce the default object repr to its memory address. str.lstrip() takes
    # a *set of characters*, not a prefix, so cut the prefix off explicitly.
    bot_id = str(app._bot)
    repr_prefix = '<telegram.ext.updater.Updater object at '
    if bot_id.startswith(repr_prefix):
        bot_id = bot_id[len(repr_prefix):]
    bot_id = bot_id.rstrip('>')

    notify_slack(f'{run_type} {method} bot started as {bot_id}')
    return 'Bot started'
@app.route('/stop', methods=['POST'])
def stop():
    """If bot is running stop it.

    Query parameters:
        method: Label included in the Slack message (Local/Staging/Production);
            defaults to 'Unknown'.
        auto: The string 'true' marks the run as 'Auto'; anything else is 'Manual'.

    Returns:
        'Bot stopped' if the bot was stopped by this request, otherwise
        'Already down'.
    """
    if not app._bot_is_live:
        return 'Already down'

    method = request.args.get('method', 'Unknown')  # Local/Staging/Production
    run_type = 'Auto' if request.args.get('auto') == 'true' else 'Manual'

    # The flag is cleared only after stop() returns, so a failed stop still
    # reports the bot as live.
    app._bot.stop()
    app._bot_is_live = False

    # Reduce the default object repr to its memory address. str.lstrip() takes
    # a *set of characters*, not a prefix, so cut the prefix off explicitly.
    bot_id = str(app._bot)
    repr_prefix = '<telegram.ext.updater.Updater object at '
    if bot_id.startswith(repr_prefix):
        bot_id = bot_id[len(repr_prefix):]
    bot_id = bot_id.rstrip('>')

    notify_slack(f'{run_type} {method} bot stopped as {bot_id}')
    return 'Bot stopped'
if __name__ == '__main__':
    # Run the Flask server on localhost:8080 when executed as a script.
    # NOTE(review): debug=True is intended for local development only;
    # confirm this entry point is never used for staging/production.
    server_options = {'host': '127.0.0.1', 'port': 8080, 'debug': True}
    app.run(**server_options)