-
Notifications
You must be signed in to change notification settings - Fork 0
/
dependencies.py
176 lines (140 loc) · 5.69 KB
/
dependencies.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import datetime
import os
import re
import pymongo
import streamlit as st
import streamlit_authenticator as stauth
from dotenv import load_dotenv
from pymongo.server_api import ServerApi
from langchain_helper import create_vector_db
from langchain_helper import create_vector_db
import pandas as pd
# Load environment variables (python-dotenv) so MONGO_URL can be read via
# os.getenv on the next line.
load_dotenv()
# Module-level MongoDB client, created at import time from MONGO_URL and
# pinned to server API version "1".
client = pymongo.MongoClient(os.getenv("MONGO_URL"), server_api=ServerApi("1"))
# NOTE: despite its name, `db` is the "llmUsers" collection of the "main"
# database, not a database object. The helpers below call collection methods
# (find, insert_one, update_one) on it.
db = client.get_database("main")["llmUsers"]
# AUTHENTICATION
def insert_user(email, username, password):
    """Insert a new user document into the users collection.

    The document stores the given email, username and password as-is, the
    current local time (stringified) under ``date_joined``, and an empty
    ``projects`` mapping.

    Args:
        email: Email address to store.
        username: Username to store.
        password: Password value to store; this function does not hash it.

    Returns:
        Whatever ``db.insert_one`` returns for the inserted document.
    """
    user_document = dict(
        email=email,
        username=username,
        password=password,
        date_joined=str(datetime.datetime.now()),
        projects={},
    )
    return db.insert_one(user_document)
def fetch_users():
    """Return every document in the users collection as a list.

    Returns:
        A new list holding each document yielded by ``db.find()``.
    """
    # One pass over the cursor is enough; no intermediate copy is needed.
    return list(db.find())
def get_user_emails():
    """Return the ``email`` field of every user document, in cursor order.

    Raises:
        KeyError: If any document has no ``email`` field.
    """
    return [record["email"] for record in db.find()]
def get_usernames():
    """Return the ``username`` field of every user document, in cursor order.

    Raises:
        KeyError: If any document has no ``username`` field.
    """
    return [record["username"] for record in db.find()]
def validate_email(email):
    """Return True if *email* has the simple ``local@domain.tld`` shape.

    Accepted shape: a local part of ASCII letters, digits, ``-`` or ``_``;
    an ``@``; a domain label of ASCII letters and digits; a literal dot; and a
    suffix of one to three lowercase letters.

    Args:
        email: The string to check.

    Returns:
        True when the whole string matches the shape above, otherwise False.
    """
    # The dot is escaped so only a real "." separates domain and suffix; an
    # unescaped "." would accept any character there (e.g. "user@exampleXcom").
    pattern = r"[a-zA-Z0-9_-]+@[a-zA-Z0-9]+\.[a-z]{1,3}"  # e.g. user_1@example.com
    # fullmatch anchors both ends strictly, so a trailing newline is rejected.
    return re.fullmatch(pattern, email) is not None
def validate_username(username):
    """Return True if *username* contains only ASCII letters, digits and ``_``.

    The pattern uses ``*``, so the empty string is accepted; minimum length is
    not checked here.

    Args:
        username: The string to check.

    Returns:
        True when the pattern matches, otherwise False.
    """
    allowed = "^[a-zA-Z0-9_]*$"
    return re.match(allowed, username) is not None
def sign_up():
    """Render the sign-up form and create the account when all checks pass.

    The form collects email, username, password and password confirmation.
    When the email field is non-empty, the inputs are validated in a fixed
    order and the first failure is shown as a warning. If every check passes,
    the password is hashed, a single user document is inserted (with the
    username lower-cased), and a success message is shown.

    Returns:
        None.
    """
    with st.form(key="signup", clear_on_submit=True):
        st.subheader(":green[Sign Up]")
        email = st.text_input(":blue[Email]", placeholder="Enter Your Email")
        username = st.text_input(":blue[Username]", placeholder="Enter Your Username")
        password1 = st.text_input(
            ":blue[Password]", placeholder="Enter Your Password", type="password"
        )
        password2 = st.text_input(
            ":blue[Confirm Password]",
            placeholder="Confirm Your Password",
            type="password",
        )
        # An empty email field means there is nothing to validate yet, so no
        # message is shown in that case.
        if email:
            problem = _signup_problem(email, username, password1, password2)
            if problem is not None:
                st.warning(problem)
            else:
                hashed_password = stauth.Hasher([password2]).generate()
                # Insert exactly once: a second insert would create a
                # duplicate document for the same account.
                insert_user(email, username.lower(), hashed_password[0])
                st.success("Account created successfully!!")
                st.balloons()
        st.form_submit_button("Sign Up")


def _signup_problem(email, username, password1, password2):
    """Return the warning text for the first failed sign-up check, or None."""
    if not validate_email(email):
        return "Invalid Email"
    if email in get_user_emails():
        return "Email Already exists!!"
    if not validate_username(username):
        return "Invalid Username"
    # Usernames are stored lower-cased, so uniqueness is checked on the
    # lower-cased form; otherwise "Bob" could be registered alongside "bob".
    if username.lower() in get_usernames():
        return "Username Already Exists"
    if len(username) < 2:
        return "Username Too short"
    if len(password1) < 6:
        return "Password is too Short"
    if password1 != password2:
        return "Passwords Do Not Match"
    return None
# PROJECTS
def get_projects(email):
    """Return the ``projects`` mapping stored for the user with this email.

    Args:
        email: Email address used to look up the user document.

    Returns:
        The ``projects`` value of the first matching user document.

    Raises:
        IndexError: If no user document matches *email*.
        KeyError: If a matching document has no ``projects`` field.
    """
    matching = [user["projects"] for user in db.find({"email": email})]
    return matching[0]
def insert_project(email, project_name, project_description):
    """Add a project entry to the user's ``projects`` mapping and save it.

    The new entry is placed first, followed by the user's existing projects.
    If *project_name* already exists, the existing entry's value is kept
    rather than being replaced by the fresh one.

    Args:
        email: Email address identifying the user document.
        project_name: Key under which the project is stored.
        project_description: Text stored under the project's ``project_desc``.

    Returns:
        Whatever ``db.update_one`` returns for the update.

    Raises:
        IndexError: If no user document matches *email*.
    """
    existing = [doc["projects"] for doc in db.find({"email": email})][0]
    merged = {
        project_name: {"project_desc": project_description, "files": {}},
        # Unpacked last so entries already stored win on a name clash.
        **existing,
    }
    return db.update_one({"email": email}, {"$set": {"projects": merged}})
def get_files(email, project_name):
    """Return the ``files`` mapping of one of the user's projects.

    Args:
        email: Email address identifying the user document.
        project_name: Name of the project to look up.

    Returns:
        The project's ``files`` value, or the string ``"Project not found"``
        when the user has no project with that name.

    Raises:
        IndexError: If no user document matches *email*.
    """
    projects = [user["projects"] for user in db.find({"email": email})][0]
    if project_name not in projects:
        # Callers receive this sentinel string instead of an exception.
        return "Project not found"
    return projects[project_name]["files"]
# def insert_file(email, project_name, file_name, file):
# file_name = file_name.replace(".", ",")
def insert_file(email, project_name, file_name_original, file):
    """Store an uploaded CSV in MongoDB and on disk, then build its vector DB.

    The CSV is parsed with pandas (latin-1), its rows are written under the
    user's project in MongoDB, the parsed data is saved to
    ``data/<project_name>/<file_name_original>``, and ``create_vector_db`` is
    called for it. Any exception raised during these steps is reported with
    ``st.error`` rather than propagated.

    Args:
        email: Email address identifying the user document to update.
        project_name: Project the file belongs to; also the local sub-directory.
        file_name_original: File name as uploaded.
        file: Uploaded file object exposing ``.name``, or None.

    Returns:
        None.
    """
    # Dots are swapped for commas so the file name stays a single segment of
    # the dotted "$set" path built below.
    # NOTE(review): project_name is used in that path unsanitised - confirm it
    # can never contain a ".".
    file_name = file_name_original.replace(".", ",")
    if file is None or not file.name.endswith(".csv"):
        st.error("File is either None or not a CSV file.")
        return
    try:
        df = pd.read_csv(file, encoding="latin-1")
        data = df.to_dict("records")
        # Saving to MongoDB
        db.update_one(
            {"email": email},
            {
                "$set": {
                    f"projects.{project_name}.files.{file_name}": data,
                    "date_modified": str(datetime.datetime.now()),
                }
            },
        )
        # Saving locally
        directory = os.path.join("data", project_name)
        # exist_ok avoids failing if the directory appears between a separate
        # existence check and the creation call.
        os.makedirs(directory, exist_ok=True)
        file_path = os.path.join(directory, file_name_original)
        df.to_csv(file_path, index=False)
        create_vector_db(project_name, file_name_original)
    except Exception as e:
        # UI boundary: show the failure to the user instead of crashing the app.
        st.error(f"Error during upload: {e}")