-
Notifications
You must be signed in to change notification settings - Fork 2
/
elastic_search.py
89 lines (76 loc) · 3.1 KB
/
elastic_search.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
from typing import Dict, Any, List
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
class ElasticSearch:
    """Small wrapper around an ``Elasticsearch`` client tied to one source index.

    Attributes:
        es_host: Host string the client was created with.
        index_start: Name of the index that ``get_docs`` reads from.
        client: The ``Elasticsearch`` client instance built from ``es_host``.
    """

    # Documents whose ``text`` field phrase-matches any of these are excluded
    # from the results of ``get_docs``.
    _EXCLUDED_PHRASES = (
        "Result not found",
        "No exact matches found",
        "Not found",
        "Item Not found",
    )

    def __init__(self, host: str, index_start: str):
        """Create the client for *host* and remember the source index name."""
        self.es_host = host
        self.index_start = index_start
        self.client = Elasticsearch(host)

    def get_client(self):
        """Return the underlying ``Elasticsearch`` client."""
        return self.client

    def get_index_doc_count(self, index: str):
        """Return the ``count`` field of the count response for *index*.

        The value is read with ``dict.get``, so ``None`` is returned when the
        response carries no ``count`` key.
        """
        return self.get_client().count(index=index).get("count")

    @classmethod
    def _build_query(cls, start_date: str, end_date: str) -> Dict[str, Any]:
        """Build the search body used by ``get_docs``.

        Keeps documents with ``start_date <= retrieved < end_date`` and drops
        those whose ``text`` matches one of ``_EXCLUDED_PHRASES``.
        """
        return {
            "query": {
                "bool": {
                    "filter": {
                        "range": {
                            "retrieved": {
                                "gte": start_date,
                                "lt": end_date,
                            },
                        },
                    },
                    "must_not": [
                        {"match_phrase": {"text": phrase}}
                        for phrase in cls._EXCLUDED_PHRASES
                    ],
                }
            }
        }

    @staticmethod
    def _write_jsonl(docs, output_file: str) -> None:
        """Write each item of *docs* to *output_file* as one JSON object per line."""
        import json

        # Explicit encoding so the result does not depend on the platform default.
        with open(output_file, "w", encoding="utf-8") as out:
            for doc in docs:
                out.write(json.dumps(doc) + "\n")

    def get_docs(
        self,
        end_date: str,
        start_date: str,
        output_file: str = "output.jsonl",
    ) -> str:
        """Dump the documents retrieved in a date window to a JSON Lines file.

        Note the parameter order: ``end_date`` comes first, then ``start_date``.

        Args:
            end_date: Exclusive upper bound for the ``retrieved`` field.
            start_date: Inclusive lower bound for the ``retrieved`` field.
            output_file: Path of the file to (over)write. Defaults to
                ``"output.jsonl"`` in the current working directory.

        Returns:
            The path the documents were written to (``output_file``).
        """
        print("Extracting Data from ES")
        # ``scan`` yields hits lazily, so they are streamed straight to disk
        # instead of being collected in memory first.
        docs = scan(
            client=self.get_client(),
            index=self.index_start,
            scroll="60m",
            query=self._build_query(start_date, end_date),
        )
        self._write_jsonl(docs, output_file)
        return output_file

    def send_to_new_index(self, docs: List[Dict]) -> Dict[str, Any]:
        """Placeholder for bulk-indexing *docs* into a destination index.

        Raises:
            NotImplementedError: Always; the method is not implemented yet.
        """
        raise NotImplementedError
        # Earlier sketch kept for reference. It relies on ``self.index_end``
        # and ``helpers``, neither of which is defined in the code visible here.
        #
        # def gendata(docs: List[Dict]):
        #     for doc in docs:
        #         yield {
        #             '_index': self.index_end,
        #             '_id': doc["url"],
        #             '_source': doc
        #         }
        # print("Starting dump")
        # resp = helpers.bulk(
        #     self.client,
        #     gendata(docs),
        #     index = self.index_end,
        #     request_timeout=30
        # )