-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmongodb.py
37 lines (31 loc) · 1.21 KB
/
mongodb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import datetime
import os
from urllib.parse import quote_plus

from dotenv import load_dotenv
from pymongo.mongo_client import MongoClient
load_dotenv()
class MongoDB:
    """Small wrapper around a MongoDB client used to store Wikipedia text.

    Connection settings are read from the environment variables
    ``MONGO_USER``, ``MONGO_PASSWORD``, ``MONGO_HOSTNAME`` and
    ``COLLECTION_NAME``. The user name and password must be given in raw
    (not percent-encoded) form; they are escaped when the URI is built.
    """

    # Database that holds the target collection.
    DATABASE_NAME = "eig"

    # Environment variables that must be set (and non-empty) to connect.
    _REQUIRED_ENV = (
        "MONGO_USER",
        "MONGO_PASSWORD",
        "COLLECTION_NAME",
        "MONGO_HOSTNAME",
    )

    def __init__(self) -> None:
        """Read the connection settings, create the client and ping the server.

        Raises:
            ValueError: If any required environment variable is unset or
                empty. Without this check the literal text "None" would be
                formatted into the URI / collection name.
        """
        settings = {name: os.getenv(name) for name in self._REQUIRED_ENV}
        missing = [name for name, value in settings.items() if not value]
        if missing:
            raise ValueError(
                "Missing required environment variable(s): " + ", ".join(missing)
            )

        self.collection = settings["COLLECTION_NAME"]
        uri = self._build_uri(
            settings["MONGO_USER"],
            settings["MONGO_PASSWORD"],
            settings["MONGO_HOSTNAME"],
        )
        # Create a new client and connect to the server
        self.client = MongoClient(uri)
        # Send a ping to confirm a successful connection. This is a
        # best-effort check: a failure is printed, not raised, so the
        # instance is still returned to the caller.
        try:
            self.client.admin.command('ping')
        except Exception as exc:
            print(exc)
        else:
            print("Pinged your deployment. You successfully connected to MongoDB!")

    @staticmethod
    def _build_uri(user: str, password: str, hostname: str) -> str:
        """Return the ``mongodb+srv://`` URI for the given credentials and host.

        The user name and password are percent-escaped with ``quote_plus`` so
        that reserved characters such as ``@``, ``:``, ``/`` or ``%`` cannot
        corrupt the URI.
        """
        return (
            f"mongodb+srv://{quote_plus(user)}:{quote_plus(password)}"
            f"@{hostname}/?retryWrites=true&w=majority"
        )

    def insert_wikipedia_text(self, title: str, text: str):
        """Insert one document holding *title* and *text* into the collection.

        The stored document has the keys ``"title"``, ``"content"`` and
        ``"date"`` (the insertion time as a timezone-aware UTC datetime).

        Args:
            title: Value stored under ``"title"``.
            text: Value stored under ``"content"``.

        Returns:
            Whatever the collection's ``insert_one`` call returns.
        """
        database = self.client.get_database(self.DATABASE_NAME)
        collection = database.get_collection(self.collection)
        return collection.insert_one(
            {
                "title": title,
                "content": text,
                # Use an aware UTC timestamp: a naive local time would be
                # interpreted as UTC by the driver and stored shifted by the
                # local UTC offset.
                "date": datetime.datetime.now(datetime.timezone.utc),
            }
        )
if __name__ == "__main__":
    # Manual smoke test: build the wrapper from the environment settings
    # and store a single sample document.
    store = MongoDB()
    store.insert_wikipedia_text(
        title="prueba 2",
        text="hola, esto es otra prueba con otro esquema!",
    )