Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Sep 19, 2024
1 parent 89a12a7 commit aa63bf8
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 15 deletions.
17 changes: 12 additions & 5 deletions core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,24 @@
#include "common/version.h"
#include "config/ConfigDiff.h"
#include "config/watcher/ConfigWatcher.h"
#include "file_server/EventDispatcher.h"
#include "file_server/event_handler/LogInput.h"
#include "file_server/ConfigManager.h"
#include "file_server/EventDispatcher.h"
#include "file_server/FileServer.h"
#include "plugin/flusher/sls/DiskBufferWriter.h"
#include "file_server/event_handler/LogInput.h"
#include "go_pipeline/LogtailPlugin.h"
#include "plugin/input/InputFeedbackInterfaceRegistry.h"
#include "logger/Logger.h"
#include "monitor/LogFileProfiler.h"
#include "monitor/MetricExportor.h"
#include "monitor/Monitor.h"
#include "pipeline/InstanceConfigManager.h"
#include "pipeline/PipelineManager.h"
#include "pipeline/plugin/PluginRegistry.h"
#include "runner/LogProcess.h"
#include "pipeline/queue/ExactlyOnceQueueManager.h"
#include "pipeline/queue/SenderQueueManager.h"
#include "plugin/flusher/sls/DiskBufferWriter.h"
#include "plugin/input/InputFeedbackInterfaceRegistry.h"
#include "runner/FlusherRunner.h"
#include "runner/LogProcess.h"
#include "runner/sink/http/HttpSink.h"
#ifdef __ENTERPRISE__
#include "config/provider/EnterpriseConfigProvider.h"
Expand Down Expand Up @@ -272,6 +272,13 @@ void Application::Start() { // GCOVR_EXCL_START
LogtailPlugin::GetInstance()->LoadPluginBase();
}

const char* deployMode = getenv("DEPLOY_MODE");
const char* enableK8sMeta = getenv("ENABLE_KUBERNETES_META");
if (deployMode != NULL && strlen(deployMode) > 0 && strcmp(deployMode, "singleton") == 0
&& strcmp(enableK8sMeta, "true") == 0) {
LogtailPlugin::GetInstance()->LoadPluginBase();
}

LogProcess::GetInstance()->Start();

time_t curTime = 0, lastProfilingCheckTime = 0, lastConfigCheckTime = 0, lastUpdateMetricTime = 0,
Expand Down
10 changes: 5 additions & 5 deletions pkg/helper/k8smeta/k8s_meta_http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ type metadataHandler struct {
maxLatency atomic.Int64
}

func newMetadataHandler() *metadataHandler {
func newMetadataHandler(metaManager *MetaManager) *metadataHandler {
metadataHandler := &metadataHandler{
metaManager: GetMetaManagerInstance(),
metaManager: metaManager,
}
return metadataHandler
}
Expand All @@ -52,7 +52,7 @@ func (m *metadataHandler) K8sServerRun(stopCh <-chan struct{}) error {
mux := http.NewServeMux()

// TODO: add port in ip endpoint
mux.HandleFunc("/metadata/ip", m.handler(m.handlePodMetaByHostIP))
mux.HandleFunc("/metadata/ip", m.handler(m.handlePodMetaByUniqueID))
mux.HandleFunc("/metadata/containerid", m.handler(m.handlePodMetaByUniqueID))
mux.HandleFunc("/metadata/host", m.handler(m.handlePodMetaByHostIP))
server.Handler = mux
Expand Down Expand Up @@ -82,8 +82,8 @@ func (m *metadataHandler) GetMetrics() map[string]string {
"value.k8s_meta_http_avg_latency": avgLatency,
"value.k8s_meta_http_max_latency": strconv.FormatInt(m.maxLatency.Load(), 10),
}
m.requestCount.Store(-m.requestCount.Load())
m.totalLatency.Store(-m.totalLatency.Load())
m.requestCount.Add(-m.requestCount.Load())
m.totalLatency.Add(-m.totalLatency.Load())
m.maxLatency.Store(0)
return metrics
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/helper/k8smeta/k8s_meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func GetMetaManagerInstance() *MetaManager {
metaManager = &MetaManager{
stopCh: make(chan struct{}),
}
metaManager.metadataHandler = newMetadataHandler()
metaManager.metadataHandler = newMetadataHandler(metaManager)
metaManager.cacheMap = make(map[string]MetaCache)
for _, resource := range AllResources {
metaManager.cacheMap[resource] = newK8sMetaCache(metaManager.stopCh, resource)
Expand Down
7 changes: 4 additions & 3 deletions plugins/input/kubernetesmetav2/meta_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,11 +406,12 @@ func (m *metaCollector) generateEntityClusterLink(entityEvent models.PipelineEve

func (m *metaCollector) genEntityTypeKey(kind string) string {
var prefix string
if kind == "" {
switch {
case kind == "":
prefix = "k8s."
} else if kind == "cluster" && m.serviceK8sMeta.Domain == acsDomain {
case kind == "cluster" && m.serviceK8sMeta.Domain == acsDomain:
prefix = m.serviceK8sMeta.Domain + ".ack."
} else {
default:
prefix = m.serviceK8sMeta.Domain + ".k8s."
}
return fmt.Sprintf("%s%s", prefix, strings.ToLower(kind))
Expand Down
2 changes: 1 addition & 1 deletion plugins/input/kubernetesmetav2/meta_collector_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func (m *metaCollector) processPodNodeLink(data *k8smeta.ObjectWrapper, method s
log := &models.Log{}
log.Contents = models.NewLogContents()
m.processEntityLinkCommonPart(log.Contents, obj.Pod.Kind, obj.Pod.Namespace, obj.Pod.Name, obj.Node.Kind, "", obj.Node.Name, method, data.FirstObservedTime, data.LastObservedTime)
log.Contents.Add(entityLinkRelationTypeFieldName, "related_to")
log.Contents.Add(entityLinkRelationTypeFieldName, "runs")
log.Timestamp = uint64(time.Now().Unix())
return []models.PipelineEvent{log}
}
Expand Down

0 comments on commit aa63bf8

Please sign in to comment.