Skip to content

Commit

Permalink
Работа с двумя кластерами
Browse files Browse the repository at this point in the history
  • Loading branch information
Boris committed Oct 31, 2024
1 parent 872456c commit 12dfa9c
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 22 deletions.
4 changes: 2 additions & 2 deletions front/assets/js/app-search.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ $(document).ready(function(){
var str = "";
for(var k in data) {
name = data[k].Name;
host = data[k].Host;
$('#clusters').append(new Option(name, host,false,false));
ctype = data[k].Type;
$('#clusters').append(new Option(name, ctype,false,false));
}
}
});
Expand Down
12 changes: 9 additions & 3 deletions modules/router/methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,15 +217,21 @@ func (rt *Router) getNodes() ([]singleNode, error) {

}

func (rt *Router) getIndexGroups() ([]indexGroup, error) {
func (rt *Router) getIndexGroups(cluster string) ([]indexGroup, error) {
var igs, igresp []indexGroup

var host string
re := regexp.MustCompile(`^([\w\d\-_]+)-(\d{4}\.\d{2}\.\d{2}(-\d{2})*)`)

// rt.nodes.RLock()
// defer rt.nodes.RUnlock()
t := time.Now()
response, err := rt.doGet(rt.conf.Snapshot.Host+"_cat/indices/*-"+t.Format("2006.01.02")+"*,-.*/?format=json&h=index", "Search")
if cluster == "Snapshot" {
host = rt.conf.Snapshot.Host
} else if cluster == "Search" {
host = rt.conf.Search.Host
}

response, err := rt.doGet(host+"_cat/indices/*-"+t.Format("2006.01.02")+"*,-.*/?format=json&h=index", cluster)
if err != nil {
return nil, err
}
Expand Down
50 changes: 34 additions & 16 deletions modules/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ type indexGroup struct {
type Cluster struct {
Name string
Host string
Type string
}

type snapList []struct {
Expand Down Expand Up @@ -558,9 +559,17 @@ func (rt *Router) ApiHandler(w http.ResponseWriter, r *http.Request) {

}
/* ---- search --- */
case "get_clusters":
{
var cl []Cluster
cl = append(cl, Cluster{rt.conf.Snapshot.Name, rt.conf.Snapshot.Host, "Snapshot"})
cl = append(cl, Cluster{rt.conf.Search.Name, rt.conf.Search.Host, "Search"})
j, _ := json.Marshal(cl)
w.Write(j)
}
case "get_index_groups":
{
response, err := rt.getIndexGroups()
response, err := rt.getIndexGroups(request.Search.Cluster)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
log.Println(remoteIP, "\t", r.Method, "\t", r.URL.Path, "\t", request.Action, "\t", http.StatusInternalServerError, "\t", err.Error())
Expand All @@ -576,10 +585,15 @@ func (rt *Router) ApiHandler(w http.ResponseWriter, r *http.Request) {
var (
fullm map[string]interface{}
m map[string]interface{}
host string
)

if request.Search.Cluster == "Snapshot" {
host = rt.conf.Snapshot.Host
} else if request.Search.Cluster == "Search" {
host = rt.conf.Search.Host
}
flatMap := make(map[string]string)
response, err := rt.doGet(request.Search.Cluster+request.Search.Index+t.Format("2006.01.02")+"/_mapping", "Search")
response, err := rt.doGet(host+request.Search.Index+t.Format("2006.01.02")+"/_mapping", "Search")
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
log.Println(remoteIP, "\t", r.Method, "\t", r.URL.Path, "\t", request.Action, "\t", http.StatusInternalServerError, "\t", err.Error())
Expand All @@ -605,15 +619,6 @@ func (rt *Router) ApiHandler(w http.ResponseWriter, r *http.Request) {
w.Write(j)
}

case "get_clusters":
{
var cl []Cluster
cl = append(cl, Cluster{rt.conf.Snapshot.Name, rt.conf.Snapshot.Host})
cl = append(cl, Cluster{rt.conf.Search.Name, rt.conf.Search.Host})
j, _ := json.Marshal(cl)
w.Write(j)
}

case "search":
{
var (
Expand All @@ -628,7 +633,13 @@ func (rt *Router) ApiHandler(w http.ResponseWriter, r *http.Request) {
tf string
fields string
req map[string]interface{}
host string
)
if request.Search.Cluster == "Snapshot" {
host = rt.conf.Snapshot.Host
} else if request.Search.Cluster == "Search" {
host = rt.conf.Search.Host
}

ds, _ := time.Parse("2006-01-02 15:04:05 (MST)", request.Search.DateStart+" (MSK)")
de, _ := time.Parse("2006-01-02 15:04:05 (MST)", request.Search.DateEnd+" (MSK)")
Expand Down Expand Up @@ -676,7 +687,7 @@ func (rt *Router) ApiHandler(w http.ResponseWriter, r *http.Request) {

if request.Search.Count {
_ = json.Unmarshal([]byte("{"+query+"}"), &req)
cresponse, err := rt.doPost(request.Search.Cluster+request.Search.Index+"/_count", req, "Search")
cresponse, err := rt.doPost(host+request.Search.Index+"/_count", req, "Search")
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
log.Println(remoteIP, "\t", r.Method, "\t", r.URL.Path, "\t", request.Action, "\t", http.StatusInternalServerError, "\t", err.Error())
Expand All @@ -685,7 +696,7 @@ func (rt *Router) ApiHandler(w http.ResponseWriter, r *http.Request) {
w.Write(cresponse)
} else {
_ = json.Unmarshal([]byte(full_query), &req)
sresponse, err := rt.doPost(request.Search.Cluster+request.Search.Index+"/_search", req, "Search")
sresponse, err := rt.doPost(host+request.Search.Index+"/_search", req, "Search")
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
log.Println(remoteIP, "\t", r.Method, "\t", r.URL.Path, "\t", request.Action, "\t", http.StatusInternalServerError, "\t", err.Error())
Expand Down Expand Up @@ -713,8 +724,15 @@ func (rt *Router) ApiHandler(w http.ResponseWriter, r *http.Request) {
req map[string]interface{}
scrollresponse scrollResponse
fields_list []string
host string
)

if request.Search.Cluster == "Snapshot" {
host = rt.conf.Snapshot.Host
} else if request.Search.Cluster == "Search" {
host = rt.conf.Search.Host
}

ds, _ := time.Parse("2006-01-02 15:04:05 (MST)", request.Search.DateStart+" (MSK)")
de, _ := time.Parse("2006-01-02 15:04:05 (MST)", request.Search.DateEnd+" (MSK)")

Expand Down Expand Up @@ -767,7 +785,7 @@ func (rt *Router) ApiHandler(w http.ResponseWriter, r *http.Request) {
log.Println(remoteIP, "\t", r.Method, "\t", r.URL.Path, "\t", request.Action, "\t", http.StatusInternalServerError, "\t", err.Error())
return
}
sresponse, err := rt.doPost(request.Search.Cluster+request.Search.Index+"/_search?scroll=10m", req, "Search")
sresponse, err := rt.doPost(host+request.Search.Index+"/_search?scroll=10m", req, "Search")
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
log.Println(remoteIP, "\t", r.Method, "\t", r.URL.Path, "\t", request.Action, "\t", http.StatusInternalServerError, "\t", err.Error())
Expand Down Expand Up @@ -843,7 +861,7 @@ func (rt *Router) ApiHandler(w http.ResponseWriter, r *http.Request) {
if scrollresponse.ScrollID != "" {
scroll := map[string]interface{}{"scroll": "10m", "scroll_id": scrollresponse.ScrollID}
for i := 0; i < rt.conf.Search.FileLimit.Rows/10000; i++ {
sresponse, err := rt.doPost(request.Search.Cluster+"_search/scroll", scroll, "Search")
sresponse, err := rt.doPost(host+"_search/scroll", scroll, "Search")
if err != nil {
log.Println("Failed to get scroll batch: ", err)
return
Expand Down
2 changes: 1 addition & 1 deletion modules/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@

package version

var Version = "extractor/v0.2.6"
var Version = "extractor/v0.2.7"

0 comments on commit 12dfa9c

Please sign in to comment.