This repository has been archived by the owner on Apr 12, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathmyRequests.py
126 lines (110 loc) · 4.02 KB
/
myRequests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import requests
from utitls import configJson, myLogger
import traceback
def subscribe(callbackURL, channel_id):
    """Send a 'subscribe' request to the hub for a YouTube channel feed.

    callbackURL is the URL passed to the hub as ``hub.callback``;
    channel_id is the YouTube channel whose feed is the topic.
    The hub's answer is not returned to the caller.
    """
    _requsetBase(callbackURL, channel_id, mode='subscribe')
def unsubscribe(callbackURL, channel_id):
    """Send an 'unsubscribe' request to the hub for a YouTube channel feed.

    callbackURL is the URL passed to the hub as ``hub.callback``;
    channel_id is the YouTube channel whose feed is the topic.
    The hub's answer is not returned to the caller.
    """
    _requsetBase(callbackURL, channel_id, mode='unsubscribe')
def _requsetBase(callbackURL, channel_id, mode):
    """POST a subscription change for a YouTube channel feed to the hub.

    Args:
        callbackURL: URL sent as ``hub.callback``.
        channel_id: YouTube channel id appended to the feed topic URL.
        mode: value sent as ``hub.mode`` ('subscribe' or 'unsubscribe'
            from the callers in this module).

    Returns:
        Whatever ``_basePost`` returns: the decoded JSON body when the hub
        answers HTTP 200 with JSON, otherwise None.
    """
    form = {
        'hub.callback': callbackURL,
        'hub.topic': 'https://www.youtube.com/xml/feeds/videos.xml?channel_id=' + channel_id,
        'hub.verify': 'sync',
        'hub.mode': mode,
        'hub.verify_token': '',
        # The secret comes from the project configuration; empty when unset.
        'hub.secret': configJson().get('subSecert', ''),
        'hub.lease_seconds': ''
    }
    # _basePost already runs the HTTP response through _baseRequestProcess.
    # Processing that result a second time treated the decoded JSON as a
    # Response object, which raised, logged a bogus traceback and discarded
    # the result, so return it as-is.
    return _basePost('https://pubsubhubbub.appspot.com/subscribe', form)
# YouTube Data API key shared by the YouTube request helpers in this module.
# Original author's note: the quota allows roughly 1000k/3 calls because each
# call costs 3 units, and the key is considered safe to publish.
# NOTE(review): this is a hard-coded credential -- consider loading it from
# configuration instead.
g_key = 'AIzaSyBQQK9THRp1OzsGtbcIdgmmAn3MCP77G10'
def getYoutubeLiveStreamInfo(vidoeID):
    """Look up one video through the YouTube ``videos`` endpoint.

    The request asks for the ``liveStreamingDetails`` and ``snippet`` parts.

    Returns:
        The first entry of the response's ``items`` list when that entry has
        a truthy ``id``; None when the request yields nothing usable.
    """
    url = 'https://www.googleapis.com/youtube/v3/videos?id={}&part=liveStreamingDetails,snippet&key={}'.format(vidoeID, g_key)
    payload = _baseGet(url)
    if not payload:
        return None

    entries = payload.get('items', [])
    if len(entries) == 0:
        return None

    first = entries[0]
    return first if first.get('id') else None
def getYoutubeLiveVideoInfoFromChannelID(channelID, eventType='live'):
    """Find a channel's first video for the given event type and fetch its details.

    Runs a YouTube ``search`` query restricted to videos of ``channelID``
    with the given ``eventType``, takes the first hit, and resolves it via
    ``getYoutubeLiveStreamInfo``.

    Returns:
        The result of ``getYoutubeLiveStreamInfo`` for the first hit's
        ``videoId``, or None when the search yields no usable hit.
    """
    searchUrl = 'https://www.googleapis.com/youtube/v3/search?part=snippet&channelId={}&eventType={}&type=video&key={}'.format(channelID, eventType, g_key)
    payload = _baseGet(searchUrl)
    if not payload:
        return None

    results = payload.get('items', [])
    if len(results) == 0:
        return None

    # Only the first search hit is considered.
    foundId = results[0].get('id', {}).get('videoId')
    if not foundId:
        return None
    return getYoutubeLiveStreamInfo(foundId)
def getUpcomingLiveVideos(channelID):
    """Return the video ids of a channel's upcoming broadcasts.

    Runs a YouTube ``search`` query with ``eventType=upcoming`` for
    ``channelID`` and collects every truthy ``id.videoId`` from the hits.

    Returns:
        A list of video ids; empty when the request yields nothing usable.
    """
    searchUrl = 'https://www.googleapis.com/youtube/v3/search?part=snippet&channelId={}&eventType=upcoming&type=video&key={}'.format(channelID, g_key)
    payload = _baseGet(searchUrl)
    if not payload:
        return []

    hits = payload.get('items', [])
    if len(hits) == 0:
        return []

    candidateIds = (hit.get('id', {}).get('videoId') for hit in hits)
    return [vid for vid in candidateIds if vid]
def isTwitcastingLiving(id):
    """Report whether a TwitCasting user is currently live.

    Args:
        id: TwitCasting user name appended to the livestatus query.

    Returns:
        True only when the decoded livestatus answer has ``islive`` set to
        JSON ``true``; False otherwise, including when the request fails.
    """
    res = _baseGet('http://api.twitcasting.tv/api/livestatus?user=' + id)
    # _baseGet hands back the decoded JSON body (or None), not a Response
    # object, so the previous ``res.text`` lookup raised AttributeError on
    # every successful request. Inspect the parsed field instead.
    if not isinstance(res, dict):
        return False
    return res.get('islive') is True
def _baseGet(url):
    """Issue a GET request (30 s timeout) and return its processed result.

    Any exception raised while logging or sending the request is logged
    together with its traceback, and None is returned. Otherwise the
    response is handed to ``_baseRequestProcess``.
    """
    try:
        myLogger("GET URL:%s--" % url)
        reply = requests.get(url, timeout=30)
    except Exception as err:
        myLogger(str(err))
        myLogger(traceback.format_exc())
        return None
    else:
        return _baseRequestProcess(reply)
def _basePost(url, data):
    """Issue a form POST request (30 s timeout) and return its processed result.

    Args:
        url: target URL.
        data: form payload passed unchanged to ``requests.post``.

    Returns:
        The result of ``_baseRequestProcess`` for the response, or None when
        logging or sending the request raised an exception (the exception
        and its traceback are logged).
    """
    try:
        myLogger("POST URL:%s--" % url)
        # Do not leak shared secrets into the log: mask non-empty values of
        # secret-like fields in the logged copy only; the request itself
        # still receives the original payload.
        if isinstance(data, dict):
            loggable = {
                key: ('***' if value and 'secret' in str(key).lower() else value)
                for key, value in data.items()
            }
        else:
            loggable = data
        # Wrap in a tuple so tuple payloads are formatted as one value.
        myLogger("DATA :%s--" % (loggable,))
        response = requests.post(url, data=data, timeout=30)
    except Exception as e:
        myLogger(str(e))
        myLogger(traceback.format_exc())
        return None
    return _baseRequestProcess(response)
def _baseRequestProcess(response):
if response == None:
return None
try:
myLogger("Request URL:%s-----------------" % response.request.url)
myLogger("Method:%s, Code:%s,\n text:%s" % (response.request.method, str(response.status_code), response.text))
if response.status_code == 200:
try:
return response.json()
except Exception:
return None
else:
return None
except Exception as e:
myLogger("request Exception:%s" % str(e))
myLogger(traceback.format_exc())
return None