-
Notifications
You must be signed in to change notification settings - Fork 0
/
transcription.py
70 lines (55 loc) · 2.65 KB
/
transcription.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import os
from datetime import datetime, timedelta
import pandas as pd
import requests
from youtube_transcript_api import YouTubeTranscriptApi
# YouTube channel IDs that are scanned when the caller does not supply a list.
CHANNEL_IDS = [
    "UCmCylh0ZK5plLdvueo06OYA",
    "UC2QtjeenJ3KtEli0w4vq5vw",
    "UCjDsbbzHgTrGc4Ff26TJtsA",
    "UCe8vRS6vFq5GmAZIj53Iikw",
]

# YouTube Data API key.
# SECURITY: a key committed to source control must be treated as compromised.
# Supply the key through the YOUTUBE_API_KEY environment variable instead; the
# literal below is kept only as a backward-compatible fallback and should be
# rotated and then removed.
API_KEY = os.environ.get(
    "YOUTUBE_API_KEY", 'AIzaSyAReAH9uptDzBcWsEHRgrre558z9mHeono'
)
def get_transcription(video_id):
    """Fetch the transcript of a YouTube video as one space-joined string.

    Args:
        video_id: Video identifier handed to
            ``YouTubeTranscriptApi.get_transcript``.

    Returns:
        The ``"text"`` field of every transcript segment joined with single
        spaces, or ``None`` when the transcript could not be retrieved or
        assembled (a message is printed in that case).
    """
    try:
        transcript = YouTubeTranscriptApi.get_transcript(video_id)
        return " ".join(segment["text"] for segment in transcript)
    except Exception:
        # Best-effort lookup: a failure for one video must not abort a batch.
        # ``Exception`` (instead of a bare ``except``) lets KeyboardInterrupt
        # and SystemExit propagate so a long run can still be interrupted.
        print(f"Did not find transcript for {video_id}")
        return None
class Transcript:
    """Collect recent videos from YouTube channels and fetch their transcripts.

    Constructing an instance queries the YouTube Data API search endpoint for
    every channel and keeps only the videos published within the last
    ``time_frame`` days. The result is stored in ``self.video_ids``, a
    DataFrame with the columns ``vid`` (video ID) and ``date`` (publish time
    as a timezone-naive UTC timestamp).
    """

    def __init__(self, time_frame, channels=None) -> None:
        """Fetch and time-filter the latest videos of the given channels.

        Args:
            time_frame: Size of the look-back window, in days.
            channels: Iterable of YouTube channel IDs. When ``None`` the
                module-level ``CHANNEL_IDS`` list is used.
        """
        if channels is None:
            # Resolved at call time so the shared module-level list is never
            # captured as a mutable default argument.
            channels = CHANNEL_IDS
        self.time_frame = time_frame
        videos = [
            video
            for channel_id in channels
            for video in self._get_videos(channel_id)
        ]
        # Explicit columns keep the frame well-formed when no video was found;
        # otherwise the later ``["date"]`` lookup raises KeyError.
        self.video_ids = pd.DataFrame(videos, columns=["vid", "date"])
        self.video_ids = self._filter_time()

    def _get_videos(self, channel_id, num_videos=50, api_key=None):
        """Return the most recent videos of one channel.

        Args:
            channel_id: YouTube channel ID to search.
            num_videos: Value sent as ``maxResults`` to the search endpoint.
            api_key: YouTube Data API key. When ``None`` the module-level
                ``API_KEY`` is used.

        Returns:
            A list of ``{"vid": <video id>, "date": <naive UTC timestamp>}``
            dictionaries, one per item in the API response.

        Raises:
            requests.HTTPError: If the API answers with an error status.
        """
        if api_key is None:
            api_key = API_KEY
        base_url = "https://www.googleapis.com/youtube/v3/search"
        params = {
            'part': 'snippet',
            'channelId': channel_id,
            'maxResults': num_videos,
            'order': 'date',  # order by date (latest first)
            'type': 'video',  # we want only videos, not playlists or channels
            'key': api_key,
        }
        # A timeout prevents an unresponsive server from hanging the run.
        response = requests.get(base_url, params=params, timeout=30)
        # Surface quota/auth failures as an HTTP error instead of an opaque
        # KeyError on the missing "items" field.
        response.raise_for_status()
        data = response.json()

        videos = []
        for item in data.get('items', []):
            snippet = item["snippet"]
            # "publishTime" is what the original code read; "publishedAt" is
            # used as a fallback when that field is absent.
            published = snippet.get("publishTime") or snippet["publishedAt"]
            videos.append({
                "vid": item['id']['videoId'],
                # Normalise to UTC first, then drop the tz so every stored
                # timestamp is naive UTC.
                "date": pd.to_datetime(published, utc=True).tz_localize(None),
            })
        return videos

    def _filter_time(self):
        """Return the rows of ``self.video_ids`` newer than the time frame.

        Returns:
            The subset of ``self.video_ids`` whose ``date`` lies within the
            last ``self.time_frame`` days.
        """
        if self.video_ids.empty:
            return self.video_ids
        # Stored dates are naive UTC, so "now" must be naive UTC as well;
        # a local-time "now" would skew the window by the UTC offset.
        now_utc = pd.Timestamp.now(tz="UTC").tz_localize(None)
        time_limit = now_utc - timedelta(days=self.time_frame)
        return self.video_ids[self.video_ids["date"] > time_limit]

    def get_transcript(self):
        """Returns a dataframe of transcripted videos within the desired time frame for designated channels

        Returns:
            A DataFrame with the columns ``text`` and ``vid``; videos for
            which no transcript could be fetched are dropped.
        """
        rows = [
            {"text": get_transcription(vid), "vid": vid}
            for vid in self.video_ids["vid"]
        ]
        # Explicit columns so an empty result still has a "text" column.
        transcripts = pd.DataFrame(rows, columns=["text", "vid"])
        transcripts = transcripts[transcripts["text"].notna()]
        return transcripts

    def save_transcripts(self, transcripts_df, filename):
        """Saves the transcripted videos

        Args:
            transcripts_df: DataFrame to write.
            filename: Base name of the CSV file (without extension); the file
                is written to ``data/<filename>.csv``.
        """
        data_dir = "data/"
        # Create the data directory if needed; exist_ok avoids the race
        # between checking for the directory and creating it.
        os.makedirs(data_dir, exist_ok=True)
        transcripts_df.to_csv(os.path.join(data_dir, filename + ".csv"), index=False)
if __name__ == "__main__":
    # Script entry point: gather the transcripts of videos published within
    # the look-back window and write them to data/transcripts.csv.
    lookback_days = 1  # in days
    collector = Transcript(time_frame=lookback_days)
    collector.save_transcripts(collector.get_transcript(), "transcripts")