Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add create_audiobook function #50

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 249 additions & 2 deletions Pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,20 @@
from ezlocalai.STT import STT
from ezlocalai.CTTS import CTTS
from ezlocalai.Embedding import Embedding
from ezlocalai.Helpers import chunk_content_by_tokens
from pydub import AudioSegment
from datetime import datetime
from Globals import getenv
from pyngrok import ngrok
import requests
import base64
import pdfplumber
import torch
from Globals import getenv
import zipfile
import docx2txt
import pandas as pd
import random
import json
import io

try:
from ezlocalai.IMG import IMG
Expand All @@ -22,6 +30,61 @@
from ezlocalai.VLM import VLM


async def file_to_text(file_path: str = ""):
"""
Learn from a file

Args:
file_path (str, optional): Path to the file. Defaults to "".

Returns:
str: Response from the agent
"""
file_content = ""
file_name = os.path.basename(file_path)
logging.info(f"File path: {file_path}")
file_type = file_name.split(".")[-1]
if file_type == "pdf":
with pdfplumber.open(file_path) as pdf:
content = "\n".join([page.extract_text() for page in pdf.pages])
file_content += content
elif file_path.endswith(".zip"):
extracted_zip_folder_name = f"extracted_{file_name.replace('.zip', '_zip')}"
new_folder = os.path.join(os.path.dirname(file_path), extracted_zip_folder_name)
file_content += f"Content from the zip file uploaded named `{file_name}`:\n"
with zipfile.ZipFile(file_path, "r") as zipObj:
zipObj.extractall(path=new_folder)
# Iterate over every file that was extracted including subdirectories
for root, dirs, files in os.walk(new_folder):
for name in files:
file_content += f"Content from file uploaded named `{name}`:\n"
file_content += await file_to_text(file_path=os.path.join(root, name))
return file_content
elif file_path.endswith(".doc") or file_path.endswith(".docx"):
file_content = docx2txt.process(file_path)
elif file_type == "csv":
with open(file_path, "r") as f:
file_content = f.read()
elif file_type == "xlsx" or file_type == "xls":
xl = pd.ExcelFile(file_path)
if len(xl.sheet_names) > 1:
sheet_count = len(xl.sheet_names)
for i, sheet_name in enumerate(xl.sheet_names, 1):
df = xl.parse(sheet_name)
csv_file_path = file_path.replace(f".{file_type}", f"_{i}.csv")
df.to_csv(csv_file_path, index=False)
else:
df = pd.read_excel(file_path)
csv_file_path = file_path.replace(f".{file_type}", ".csv")
df.to_csv(csv_file_path, index=False)
with open(csv_file_path, "r") as f:
file_content = f.read()
else:
with open(file_path, "r") as f:
file_content = f.read()
return file_content


class Pipes:
def __init__(self):
load_dotenv()
Expand Down Expand Up @@ -301,3 +364,187 @@ async def get_response(self, data, completion_type="chat"):
else:
response["choices"][0]["message"]["content"] += f"\n\n{generated_image}"
return response, audio_response

async def create_audiobook(
self,
content,
voice,
language="en",
):
string_timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
output_file_name = f"audiobook_{string_timestamp}"
# Step 1: Chunk the book content into paragraphs
paragraphs = chunk_content_by_tokens(content)

# Step 2: Extract characters, their lines, genders, and maintain order
characters = {}
ordered_content = []

def find_similar_character(name):
# Check for exact match first
if name in characters:
return name
# Check for case-insensitive match
lower_name = name.lower()
for char in characters:
if char.lower() == lower_name:
return char
# Check for partial matches (e.g., "Mr. Smith" vs "Smith")
for char in characters:
if name in char or char in name:
return char
return None

for paragraph in paragraphs:
# Inject a list of characters we know so far.
prompt = f"""## Characters we know so far:
{json.dumps(characters, indent=4)}

## Paragraph
{paragraph}

## System
Analyze the text in the paragraph and extract:
1. All character names and their genders (male, female, or unknown.) Use best judgement based on hisortical uses of a name to determine gender. Attempt to normalize character names to match existing characters if possible.
2. Lines spoken by each character
3. Narrator lines (not spoken by any character)

Provide the result in JSON format:
{{
"characters": [
{{"name": "character1", "gender": "male/female/unknown"}},
{{"name": "character2", "gender": "male/female/unknown"}},
...
],
"content": [
{{"type": "narrator", "text": "narrator line"}},
{{"type": "character", "name": "character1", "text": "character1 line"}},
{{"type": "narrator", "text": "narrator line"}},
{{"type": "character", "name": "character2", "text": "character2 line"}},
...
]
}}
Ensure the content array preserves the original order of narration and dialogue."""

response = await self.llm.completion(prompt=prompt)
result_text = response["choices"][0]["text"]

# Strip out code block markers if present
if "```json" in result_text:
result_text = result_text.split("```json")[1].split("```")[0]
elif "```" in result_text:
result_text = result_text.split("```")[1].split("```")[0]

try:
result = json.loads(result_text)
for char in result.get("characters", []):
similar_char = find_similar_character(char["name"])
if similar_char:
# Use the existing character name
char["name"] = similar_char
else:
# Add new character
characters[char["name"]] = char["gender"]

# Update content with potentially merged character names
for item in result.get("content", []):
if item["type"] == "character":
similar_char = find_similar_character(item["name"])
if similar_char:
item["name"] = similar_char
ordered_content.append(item)

except json.JSONDecodeError:
logging.error(f"Failed to parse JSON from LLM response: {result_text}")
continue

# Step 3: Translate the content if necessary
if language != "en":
translated_content = []
for item in ordered_content:
translation_prompt = f"""## Original text:{item['text']}\n\n## System\nTranslate the original text to {language}.\nReturn only the translated text without any additional commentary."""
translation_response = await self.llm.completion(
prompt=translation_prompt
)
translated_text = translation_response["choices"][0]["text"].strip()
translated_item = item.copy()
translated_item["text"] = translated_text
translated_content.append(translated_item)
ordered_content = translated_content

# Step 4: Assign voices to characters based on gender
character_voices = {}
male_voices = [f"male-{i}" for i in range(1, 101)]
female_voices = [f"female-{i}" for i in range(1, 101)]
unknown_voices = male_voices + female_voices
random.shuffle(male_voices)
random.shuffle(female_voices)
random.shuffle(unknown_voices)

for character, gender in characters.items():
if gender == "male" and male_voices:
character_voices[character] = male_voices.pop()
elif gender == "female" and female_voices:
character_voices[character] = female_voices.pop()
elif unknown_voices:
character_voices[character] = unknown_voices.pop()
else:
logging.warning(
f"Ran out of voices. Reusing voices for character: {character}"
)
character_voices[character] = random.choice(male_voices + female_voices)

# Step 5: Generate audio for each item in ordered_content
audio_segments = []
text_output = []

for item in ordered_content:
if item["type"] == "narrator":
try:
audio = await self.ctts.generate(
text=item["text"], voice=voice, language=language
)
audio_segments.append(base64.b64decode(audio))
text_output.append(f"Narrator: {item['text']}")
except Exception as e:
logging.error(
f"Failed to generate audio for narrator text: {item['text'][:50]}... Error: {str(e)}"
)
elif item["type"] == "character":
character_voice = character_voices.get(item["name"], voice)
try:
audio = await self.ctts.generate(
text=item["text"], voice=character_voice, language=language
)
audio_segments.append(base64.b64decode(audio))
text_output.append(f"{item['name']}: {item['text']}")
except Exception as e:
logging.error(
f"Failed to generate audio for character {item['name']}: {item['text'][:50]}... Error: {str(e)}"
)

# Step 6: Combine all audio segments
combined_audio = AudioSegment.empty()
for audio_data in audio_segments:
try:
audio = AudioSegment.from_wav(io.BytesIO(audio_data))
combined_audio += audio
combined_audio += AudioSegment.silent(
duration=500
) # 0.5 second pause between segments
except Exception as e:
logging.error(f"Failed to process audio segment. Error: {str(e)}")
outputs = os.path.join(os.getcwd(), "outputs")
# Step 7: Export the final audiobook
audio_output_path = os.path.join(outputs, f"{output_file_name}.mp3")
combined_audio.export(audio_output_path, format="mp3")

# Step 8: Save the text output
text_output_path = os.path.join(outputs, f"{output_file_name}.txt")
with open(text_output_path, "w", encoding="utf-8") as f:
f.write("\n\n".join(text_output))
return {
"audio_file": f"{self.local_uri}/outputs/{output_file_name}.mp3",
"text_file": f"{self.local_uri}/outputs/{output_file_name}.txt",
"character_voices": character_voices,
}
37 changes: 36 additions & 1 deletion app.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from typing import List, Dict, Union, Optional
from Pipes import Pipes
from Pipes import Pipes, file_to_text
import base64
import os
import logging
Expand Down Expand Up @@ -376,6 +376,41 @@ async def upload_voice(
return {"detail": f"Voice {voice_name} has been uploaded."}


class BookToSpeech(BaseModel):
voice: Optional[str] = "default"
language: Optional[str] = "en"


class BookToSpeechResponse(BaseModel):
audio_file: str
text_file: str


@app.post(
"/v1/audio/book",
tags=["Audio"],
dependencies=[Depends(verify_api_key)],
)
async def book_to_speech(
book: BookToSpeech,
file: UploadFile = File(...),
user=Depends(verify_api_key),
):
if getenv("TTS_ENABLED").lower() == "false":
raise HTTPException(status_code=404, detail="Text to speech is disabled.")
file_type = file.filename.split(".")[-1]
file_path = os.path.join(os.getcwd(), "outputs", f"{uuid.uuid4().hex}.{file_type}")
with open(file_path, "wb") as audio_file:
audio_file.write(await file.read())
file_content = await file_to_text(file_path=file_path)
audiobook = await pipe.create_audiobook(
voice=book.voice, language=book.language, content=file_content
)
return BookToSpeechResponse(
audio_file=audiobook["audio_file"], text_file=audiobook["text_file"]
)


# Image Generation endpoint
# https://platform.openai.com/docs/api-reference/images

Expand Down
4 changes: 3 additions & 1 deletion cuda-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ onnx
timm>=0.9.16
sentencepiece
attrdict
einops
einops
docx2txt
pandas
59 changes: 59 additions & 0 deletions ezlocalai/Helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,62 @@ def chunk_content(text: str) -> List[str]:
sentences = list(doc.sents)
content_chunks = [str(sentence).strip() for sentence in sentences]
return content_chunks


# Export chunks of paragraphs up to 2000 tokens
def chunk_content_by_tokens(text: str, max_tokens: int = 2000) -> List[str]:
# Load spaCy model
try:
nlp = spacy.load("en_core_web_sm")
except:
spacy.cli.download("en_core_web_sm")
nlp = spacy.load("en_core_web_sm")

encoding = tiktoken.get_encoding("cl100k_base")
paragraphs = text.split("\n\n")
chunks = []
current_chunk = []
current_chunk_tokens = 0

def add_to_chunk(content: str):
nonlocal current_chunk, current_chunk_tokens, chunks
content_tokens = encoding.encode(content)
if current_chunk_tokens + len(content_tokens) > max_tokens:
chunks.append("\n\n".join(current_chunk))
current_chunk = []
current_chunk_tokens = 0
current_chunk.append(content)
current_chunk_tokens += len(content_tokens)

for paragraph in paragraphs:
paragraph_tokens = encoding.encode(paragraph)
if len(paragraph_tokens) <= max_tokens:
add_to_chunk(paragraph)
else:
# Split long paragraph into sentences using spaCy
doc = nlp(paragraph)
sentences = [sent.text for sent in doc.sents]
current_sentence_group = []
current_group_tokens = 0

for sentence in sentences:
sentence_tokens = encoding.encode(sentence)
if current_group_tokens + len(sentence_tokens) <= max_tokens:
current_sentence_group.append(sentence)
current_group_tokens += len(sentence_tokens)
else:
# Add the current group of sentences as a chunk
if current_sentence_group:
add_to_chunk(" ".join(current_sentence_group))
current_sentence_group = [sentence]
current_group_tokens = len(sentence_tokens)

# Add any remaining sentences
if current_sentence_group:
add_to_chunk(" ".join(current_sentence_group))

# Add the last chunk if it's not empty
if current_chunk:
chunks.append("\n\n".join(current_chunk))

return chunks
Loading
Loading