From 534ac623dec9a81e221244539a8739e06646c85e Mon Sep 17 00:00:00 2001 From: Aris Tzoumas Date: Tue, 22 Nov 2022 10:16:59 +0200 Subject: [PATCH] chore: close http responses in a way to allow the Transport to re-use the TCP connection --- cmd/devtool/commands/event.go | 69 ++++++++++--------- config/backend-config/namespace_config.go | 4 +- config/backend-config/single_workspace.go | 4 +- enterprise/reporting/reporting.go | 3 +- enterprise/suppress-user/syncer.go | 8 +-- gateway/gateway.go | 2 +- gateway/integration_test.go | 5 +- gateway/webhook/webhookTransformer.go | 3 +- integration_test/docker_test/docker_test.go | 5 +- .../multi_tentant_test/multi_tenant_test.go | 5 +- processor/processor.go | 3 +- processor/transformer/transformer.go | 5 +- regulation-worker/cmd/main_test.go | 3 +- regulation-worker/internal/client/client.go | 10 +-- regulation-worker/internal/delete/api/api.go | 3 +- router/batchrouter/batchrouter.go | 4 +- router/eventorder_test.go | 3 +- router/network.go | 3 +- router/router_dest_isolation_test.go | 3 +- router/transformer/transformer.go | 5 +- services/alert/pagerduty.go | 3 +- services/alert/victorops.go | 3 +- services/controlplane/features/client.go | 2 +- services/debugger/uploader.go | 3 +- .../destination_connection_tester.go | 3 +- services/filemanager/fileManager_test.go | 3 +- services/oauth/oauth.go | 7 +- testhelper/destination/minio.go | 3 +- testhelper/destination/transformer.go | 3 +- testhelper/health/checker.go | 4 +- utils/httputil/client.go | 16 ++++- utils/httputil/client_test.go | 35 ++++++++++ utils/misc/misc.go | 7 +- warehouse/testhelper/events.go | 7 +- warehouse/testhelper/setup.go | 3 +- warehouse/utils/utils.go | 5 +- 36 files changed, 163 insertions(+), 94 deletions(-) diff --git a/cmd/devtool/commands/event.go b/cmd/devtool/commands/event.go index 2fba89c14c..992fc3bbeb 100644 --- a/cmd/devtool/commands/event.go +++ b/cmd/devtool/commands/event.go @@ -10,6 +10,7 @@ import ( "time" "github.com/google/uuid" + "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/urfave/cli/v2" ) @@ -64,41 +65,43 @@ func EventSend(c *cli.Context) error { } for i := 0; i < c.Int("count"); i++ { - anonymousId := uuid.New().String() - - buf := bytes.NewBuffer(nil) - err = t.Execute(buf, map[string]string{ - "AnonymousId": anonymousId, - "Timestamp": time.Now().Format(time.RFC3339), - }) - if err != nil { - return err - } - - req, err := http.NewRequestWithContext(c.Context, "POST", url, buf) - if err != nil { + if err := func() error { + anonymousId := uuid.New().String() + buf := bytes.NewBuffer(nil) + err = t.Execute(buf, map[string]string{ + "AnonymousId": anonymousId, + "Timestamp": time.Now().Format(time.RFC3339), + }) + if err != nil { + return err + } + + req, err := http.NewRequestWithContext(c.Context, "POST", url, buf) + if err != nil { + return err + } + + req.SetBasicAuth(c.String("write-key"), "") + req.Header.Set("Content-Type", "application/json") + req.Header.Set("AnonymousId", anonymousId) + + resp, err := client.Do(req) + if err != nil { + return err + } + defer func() { httputil.CloseResponse(resp) }() + b, err := io.ReadAll(resp.Body) + if err != nil { + return err + } + if resp.StatusCode != http.StatusOK { + fmt.Printf("%s\n%s\n", resp.Status, b) + return fmt.Errorf("status code: %d", resp.StatusCode) + } + return nil + }(); err != nil { return err } - - req.SetBasicAuth(c.String("write-key"), "") - req.Header.Set("Content-Type", "application/json") - req.Header.Set("AnonymousId", anonymousId) - - resp, err := client.Do(req) - if err != nil { - return err - } - b, err := io.ReadAll(resp.Body) - if err != nil { - return err - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - fmt.Printf("%s\n%s\n", resp.Status, b) - - return fmt.Errorf("status code: %d", resp.StatusCode) - } } return nil diff --git a/config/backend-config/namespace_config.go b/config/backend-config/namespace_config.go index 3dbb9863c4..1cf27f4728 100644 --- a/config/backend-config/namespace_config.go +++ b/config/backend-config/namespace_config.go @@ -14,6 +14,7 @@ import ( "github.com/rudderlabs/rudder-server/config" "github.com/rudderlabs/rudder-server/services/controlplane/identity" + "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/rudderlabs/rudder-server/utils/logger" "github.com/rudderlabs/rudder-server/utils/types" ) @@ -138,13 +139,12 @@ func (nc *namespaceConfig) makeHTTPRequest(ctx context.Context, url string) ([]b return nil, err } + defer func() { httputil.CloseResponse(resp) }() respBody, err := io.ReadAll(resp.Body) if err != nil { return nil, err } - defer func() { _ = resp.Body.Close() }() - if resp.StatusCode >= 300 { return nil, getNotOKError(respBody, resp.StatusCode) } diff --git a/config/backend-config/single_workspace.go b/config/backend-config/single_workspace.go index 6b5a7d9256..d81ccb7e61 100644 --- a/config/backend-config/single_workspace.go +++ b/config/backend-config/single_workspace.go @@ -14,6 +14,7 @@ import ( "github.com/rudderlabs/rudder-server/config" "github.com/rudderlabs/rudder-server/services/controlplane/identity" + "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/rudderlabs/rudder-server/utils/types" ) @@ -150,13 +151,12 @@ func (wc *singleWorkspaceConfig) makeHTTPRequest(ctx context.Context, url string return nil, err } + defer func() { httputil.CloseResponse(resp) }() respBody, err := io.ReadAll(resp.Body) if err != nil { return nil, err } - defer func() { _ = resp.Body.Close() }() - if resp.StatusCode >= 300 { return nil, getNotOKError(respBody, resp.StatusCode) } diff --git a/enterprise/reporting/reporting.go b/enterprise/reporting/reporting.go index ae943785a7..c8e2404e5b 100644 --- a/enterprise/reporting/reporting.go +++ b/enterprise/reporting/reporting.go @@ -19,6 +19,7 @@ import ( backendconfig "github.com/rudderlabs/rudder-server/config/backend-config" migrator "github.com/rudderlabs/rudder-server/services/sql-migrator" "github.com/rudderlabs/rudder-server/services/stats" + "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/rudderlabs/rudder-server/utils/logger" "github.com/rudderlabs/rudder-server/utils/misc" "github.com/rudderlabs/rudder-server/utils/types" @@ -413,7 +414,7 @@ func (handle *HandleT) sendMetric(ctx context.Context, netClient *http.Client, c httpStatTags["status"] = strconv.Itoa(resp.StatusCode) stats.Default.NewTaggedStat(STAT_REPORTING_HTTP_REQ, stats.CountType, httpStatTags).Count(1) - defer resp.Body.Close() + defer func() { httputil.CloseResponse(resp) }() respBody, err := io.ReadAll(resp.Body) if err != nil { handle.log.Error(err.Error()) diff --git a/enterprise/suppress-user/syncer.go b/enterprise/suppress-user/syncer.go index 4de681ac9b..3f65bc11e8 100644 --- a/enterprise/suppress-user/syncer.go +++ b/enterprise/suppress-user/syncer.go @@ -15,6 +15,7 @@ import ( "github.com/rudderlabs/rudder-server/config" "github.com/rudderlabs/rudder-server/enterprise/suppress-user/model" "github.com/rudderlabs/rudder-server/services/controlplane/identity" + "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/rudderlabs/rudder-server/utils/logger" "github.com/rudderlabs/rudder-server/utils/misc" "github.com/rudderlabs/rudder-server/utils/types/deployment" @@ -175,12 +176,7 @@ func (s *Syncer) sync(token []byte) ([]model.Suppression, []byte, error) { if err != nil { return err } - defer func() { - err := resp.Body.Close() - if err != nil { - s.log.Error(err) - } - }() + defer func() { httputil.CloseResponse(resp) }() // If statusCode is not 2xx, then returning empty regulations if resp.StatusCode < 200 || resp.StatusCode >= 300 { diff --git a/gateway/gateway.go b/gateway/gateway.go index a668bc784d..0d50171f44 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -1004,7 +1004,7 @@ func (gateway *HandleT) getWarehousePending(payload []byte) bool { return false } - defer resp.Body.Close() + defer func() { rs_httputil.CloseResponse(resp) }() var whPendingResponse warehouseutils.PendingEventsResponseT respData, err := io.ReadAll(resp.Body) diff --git a/gateway/integration_test.go b/gateway/integration_test.go index 2b7abb9b8c..44504fcde3 100644 --- a/gateway/integration_test.go +++ b/gateway/integration_test.go @@ -27,6 +27,7 @@ import ( "github.com/rudderlabs/rudder-server/testhelper/health" "github.com/rudderlabs/rudder-server/testhelper/rand" whUtil "github.com/rudderlabs/rudder-server/testhelper/webhook" + "github.com/rudderlabs/rudder-server/utils/httputil" ) func TestGatewayIntegration(t *testing.T) { @@ -183,7 +184,7 @@ func testGatewayByAppType(t *testing.T, appType string) { resp, err := http.Get(healthEndpoint) require.ErrorContains(t, err, "connection refused") require.Nil(t, resp) - defer resp.Body.Close() + defer func() { httputil.CloseResponse(resp) }() // Checking now that the configuration has been processed and the server can start t.Log("Checking health endpoint at", healthEndpoint) @@ -316,7 +317,7 @@ func sendEvent(t *testing.T, httpPort int, payload *strings.Reader, callType, wr res, err := httpClient.Do(req) require.NoError(t, err) - defer func() { _ = res.Body.Close() }() + defer func() { httputil.CloseResponse(res) }() body, err := io.ReadAll(res.Body) require.NoError(t, err) diff --git a/gateway/webhook/webhookTransformer.go b/gateway/webhook/webhookTransformer.go index 34d421c999..1b89ab07be 100644 --- a/gateway/webhook/webhookTransformer.go +++ b/gateway/webhook/webhookTransformer.go @@ -10,6 +10,7 @@ import ( "strings" "github.com/rudderlabs/rudder-server/gateway/response" + "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/rudderlabs/rudder-server/utils/misc" ) @@ -57,7 +58,7 @@ func (bt *batchWebhookTransformerT) transform(events [][]byte, sourceType string } respBody, err := io.ReadAll(resp.Body) - _ = resp.Body.Close() + func() { httputil.CloseResponse(resp) }() if err != nil { bt.stats.failedStat.Count(len(events)) diff --git a/integration_test/docker_test/docker_test.go b/integration_test/docker_test/docker_test.go index 08895edca8..6872d2a3f7 100644 --- a/integration_test/docker_test/docker_test.go +++ b/integration_test/docker_test/docker_test.go @@ -40,6 +40,7 @@ import ( "github.com/rudderlabs/rudder-server/testhelper/health" "github.com/rudderlabs/rudder-server/testhelper/rand" whUtil "github.com/rudderlabs/rudder-server/testhelper/webhook" + "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/rudderlabs/rudder-server/utils/logger" "github.com/rudderlabs/rudder-server/utils/types/deployment" ) @@ -655,7 +656,7 @@ func getEvent(url, method string) (string, error) { if err != nil { return "", err } - defer func() { _ = res.Body.Close() }() + defer func() { httputil.CloseResponse(res) }() body, err := io.ReadAll(res.Body) if err != nil { @@ -719,7 +720,7 @@ func sendEvent(t *testing.T, payload *strings.Reader, callType, writeKey string) t.Logf("sendEvent error: %v", err) return } - defer func() { _ = res.Body.Close() }() + defer func() { httputil.CloseResponse(res) }() body, err := io.ReadAll(res.Body) if err != nil { diff --git a/integration_test/multi_tentant_test/multi_tenant_test.go b/integration_test/multi_tentant_test/multi_tenant_test.go index 13df85fe56..e7d656f08a 100644 --- a/integration_test/multi_tentant_test/multi_tenant_test.go +++ b/integration_test/multi_tentant_test/multi_tenant_test.go @@ -31,6 +31,7 @@ import ( "github.com/rudderlabs/rudder-server/testhelper/health" "github.com/rudderlabs/rudder-server/testhelper/rand" whUtil "github.com/rudderlabs/rudder-server/testhelper/webhook" + "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/rudderlabs/rudder-server/utils/types/deployment" ) @@ -215,7 +216,7 @@ func testMultiTenantByAppType(t *testing.T, appType string) { require.ErrorContains(t, err, "connection refused") require.Nil(t, resp) if err == nil { - defer func() { _ = resp.Body.Close() }() + defer func() { httputil.CloseResponse(resp) }() } // Pushing valid configuration via ETCD @@ -415,7 +416,7 @@ func sendEvent(t *testing.T, httpPort int, payload *strings.Reader, callType, wr res, err := httpClient.Do(req) require.NoError(t, err) - defer func() { _ = res.Body.Close() }() + defer func() { httputil.CloseResponse(res) }() body, err := io.ReadAll(res.Body) require.NoError(t, err) diff --git a/processor/processor.go b/processor/processor.go index 76c9d21bd2..6acc7ef17e 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -41,6 +41,7 @@ import ( "github.com/rudderlabs/rudder-server/services/stats" "github.com/rudderlabs/rudder-server/services/transientsource" "github.com/rudderlabs/rudder-server/utils/bytesize" + "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/rudderlabs/rudder-server/utils/logger" "github.com/rudderlabs/rudder-server/utils/misc" "github.com/rudderlabs/rudder-server/utils/types" @@ -562,7 +563,7 @@ func (proc *HandleT) makeFeaturesFetchCall() bool { return true } - defer func() { _ = res.Body.Close() }() + defer func() { httputil.CloseResponse(res) }() body, err := io.ReadAll(res.Body) if err != nil { return true diff --git a/processor/transformer/transformer.go b/processor/transformer/transformer.go index f3b280cbf3..ff9476b3ea 100644 --- a/processor/transformer/transformer.go +++ b/processor/transformer/transformer.go @@ -20,6 +20,7 @@ import ( backendconfig "github.com/rudderlabs/rudder-server/config/backend-config" "github.com/rudderlabs/rudder-server/processor/integrations" "github.com/rudderlabs/rudder-server/services/stats" + "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/rudderlabs/rudder-server/utils/logger" "github.com/rudderlabs/rudder-server/utils/types" ) @@ -178,7 +179,7 @@ func GetVersion() (transformerBuildVersion string) { transformerBuildVersion = fmt.Sprintf("No response from transformer. %s", transformerBuildVersion) return } - defer func() { _ = resp.Body.Close() }() + defer func() { httputil.CloseResponse(resp) }() if resp.StatusCode == http.StatusOK { bodyBytes, err := io.ReadAll(resp.Body) if err != nil { @@ -387,8 +388,8 @@ func (trans *HandleT) doPost(ctx context.Context, rawJSON []byte, url string, ta if reqErr != nil { return reqErr } + defer func() { httputil.CloseResponse(resp) }() respData, reqErr = io.ReadAll(resp.Body) - _ = resp.Body.Close() return reqErr }, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), uint64(maxRetry)), diff --git a/regulation-worker/cmd/main_test.go b/regulation-worker/cmd/main_test.go index dadf30ceb7..a645e55f6d 100644 --- a/regulation-worker/cmd/main_test.go +++ b/regulation-worker/cmd/main_test.go @@ -29,6 +29,7 @@ import ( "github.com/rudderlabs/rudder-server/regulation-worker/internal/model" "github.com/rudderlabs/rudder-server/services/filemanager" "github.com/rudderlabs/rudder-server/services/kvstoremanager" + "github.com/rudderlabs/rudder-server/utils/httputil" ) var ( @@ -127,7 +128,7 @@ func TestFlow(t *testing.T) { if err != nil { return err } - defer resp.Body.Close() + defer func() { httputil.CloseResponse(resp) }() if resp.StatusCode != http.StatusOK { return fmt.Errorf("status code not OK") } diff --git a/regulation-worker/internal/client/client.go b/regulation-worker/internal/client/client.go index 4a6d11375e..bfa09b5d47 100644 --- a/regulation-worker/internal/client/client.go +++ b/regulation-worker/internal/client/client.go @@ -14,6 +14,7 @@ import ( "github.com/rudderlabs/rudder-server/regulation-worker/internal/model" "github.com/rudderlabs/rudder-server/services/stats" + "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/rudderlabs/rudder-server/utils/logger" ) @@ -51,12 +52,7 @@ func (j *JobAPI) Get(ctx context.Context) (model.Job, error) { if err != nil { return model.Job{}, err } - defer func() { - err := resp.Body.Close() - if err != nil { - pkgLogger.Errorf("error while closing response body: %v", err) - } - }() + defer func() { httputil.CloseResponse(resp) }() pkgLogger.Debugf("obtained response code: %v", resp.StatusCode, "response body: ", resp.Body) // if successful @@ -140,7 +136,7 @@ func (j *JobAPI) UpdateStatus(ctx context.Context, status model.JobStatus, jobID if err != nil { return err } - defer func() { _ = resp.Body.Close() }() + defer func() { httputil.CloseResponse(resp) }() pkgLogger.Debugf("response code: %v", resp.StatusCode, "response body: %v", resp.Body) if resp.StatusCode >= 200 && resp.StatusCode < 300 { diff --git a/regulation-worker/internal/delete/api/api.go b/regulation-worker/internal/delete/api/api.go index c78f45afc9..268c70f953 100644 --- a/regulation-worker/internal/delete/api/api.go +++ b/regulation-worker/internal/delete/api/api.go @@ -18,6 +18,7 @@ import ( "github.com/rudderlabs/rudder-server/regulation-worker/internal/model" "github.com/rudderlabs/rudder-server/services/oauth" "github.com/rudderlabs/rudder-server/services/stats" + "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/rudderlabs/rudder-server/utils/logger" ) @@ -95,7 +96,7 @@ func (api *APIManager) deleteWithRetry(ctx context.Context, job model.Job, desti } return model.JobStatusFailed } - defer resp.Body.Close() + defer func() { httputil.CloseResponse(resp) }() bodyBytes, err := io.ReadAll(resp.Body) if err != nil { return model.JobStatusFailed diff --git a/router/batchrouter/batchrouter.go b/router/batchrouter/batchrouter.go index 1a88345539..b4ef3ade68 100644 --- a/router/batchrouter/batchrouter.go +++ b/router/batchrouter/batchrouter.go @@ -47,6 +47,7 @@ import ( "github.com/rudderlabs/rudder-server/services/filemanager" "github.com/rudderlabs/rudder-server/services/stats" "github.com/rudderlabs/rudder-server/utils/bytesize" + "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/rudderlabs/rudder-server/utils/logger" "github.com/rudderlabs/rudder-server/utils/misc" "github.com/rudderlabs/rudder-server/utils/types" @@ -1132,10 +1133,9 @@ func (brt *HandleT) postToWarehouse(batchJobs *BatchJobsT, output StorageUploadO brt.logger.Errorf("BRT: Failed to route staging file URL to warehouse service@%v, error:%v", uri, err) return } - defer func() { _ = resp.Body.Close() }() + defer func() { httputil.CloseResponse(resp) }() if resp.StatusCode == http.StatusOK { - _, err = io.Copy(io.Discard, resp.Body) brt.logger.Infof("BRT: Routed successfully staging file URL to warehouse service@%v", uri) } else { body, _ := io.ReadAll(resp.Body) diff --git a/router/eventorder_test.go b/router/eventorder_test.go index 79cea2c41b..6da9ff7b88 100644 --- a/router/eventorder_test.go +++ b/router/eventorder_test.go @@ -28,6 +28,7 @@ import ( "github.com/rudderlabs/rudder-server/testhelper/destination" trand "github.com/rudderlabs/rudder-server/testhelper/rand" "github.com/rudderlabs/rudder-server/testhelper/workspaceConfig" + "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/rudderlabs/rudder-server/utils/types/deployment" "github.com/stretchr/testify/require" "github.com/tidwall/gjson" @@ -234,7 +235,7 @@ func TestEventOrderGuarantee(t *testing.T) { resp, err := client.Do(req) require.NoError(t, err, "should be able to send the request to gateway") require.Equal(t, http.StatusOK, resp.StatusCode, "should be able to send the request to gateway successfully", payload) - resp.Body.Close() + func() { httputil.CloseResponse(resp) }() } }() diff --git a/router/network.go b/router/network.go index 8303c98eb8..c569c363b7 100644 --- a/router/network.go +++ b/router/network.go @@ -17,6 +17,7 @@ import ( "github.com/rudderlabs/rudder-server/processor/integrations" "github.com/rudderlabs/rudder-server/router/utils" + "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/rudderlabs/rudder-server/utils/logger" "github.com/rudderlabs/rudder-server/utils/misc" "github.com/rudderlabs/rudder-server/utils/sysUtils" @@ -168,7 +169,7 @@ func (network *NetHandleT) SendPost(ctx context.Context, structData integrations } } - defer func() { _ = resp.Body.Close() }() + defer func() { httputil.CloseResponse(resp) }() respBody, err := io.ReadAll(resp.Body) if err != nil { diff --git a/router/router_dest_isolation_test.go b/router/router_dest_isolation_test.go index 64667f9a3c..7edc6c969b 100644 --- a/router/router_dest_isolation_test.go +++ b/router/router_dest_isolation_test.go @@ -22,6 +22,7 @@ import ( "github.com/rudderlabs/rudder-server/testhelper/health" trand "github.com/rudderlabs/rudder-server/testhelper/rand" "github.com/rudderlabs/rudder-server/testhelper/workspaceConfig" + "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" ) @@ -171,7 +172,7 @@ func Test_RouterDestIsolation(t *testing.T) { resp, err := client.Do(req) require.NoError(t, err, "should be able to send the request to gateway") require.Equal(t, http.StatusOK, resp.StatusCode) - resp.Body.Close() + func() { httputil.CloseResponse(resp) }() } require.Eventually(t, func() bool { return atomic.LoadUint64(webhook2.count) == 100 && atomic.LoadUint64(webhook1.count) < 100 diff --git a/router/transformer/transformer.go b/router/transformer/transformer.go index ff389fbf5c..00221cd33e 100644 --- a/router/transformer/transformer.go +++ b/router/transformer/transformer.go @@ -20,6 +20,7 @@ import ( "github.com/rudderlabs/rudder-server/router/types" router_utils "github.com/rudderlabs/rudder-server/router/utils" "github.com/rudderlabs/rudder-server/services/stats" + "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/rudderlabs/rudder-server/utils/logger" "github.com/rudderlabs/rudder-server/utils/sysUtils" utilTypes "github.com/rudderlabs/rudder-server/utils/types" @@ -239,7 +240,7 @@ func (trans *handle) Transform(transformType string, transformMessage *types.Tra destinationJobs = append(destinationJobs, resp) } } - resp.Body.Close() + func() { httputil.CloseResponse(resp) }() return destinationJobs } @@ -386,7 +387,7 @@ func (trans *handle) doProxyRequest(ctx context.Context, proxyReqParams *ProxyRe } respData, err = io.ReadAll(resp.Body) - defer resp.Body.Close() + defer func() { httputil.CloseResponse(resp) }() // error handling while reading from resp.Body if err != nil { respData = []byte(fmt.Sprintf(`failed to read response body, Error:: %+v`, err)) diff --git a/services/alert/pagerduty.go b/services/alert/pagerduty.go index ef937663d0..6b676449c3 100644 --- a/services/alert/pagerduty.go +++ b/services/alert/pagerduty.go @@ -8,6 +8,7 @@ import ( "time" "github.com/rudderlabs/rudder-server/config" + "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/rudderlabs/rudder-server/utils/logger" ) @@ -43,7 +44,7 @@ func (ops *PagerDuty) Alert(message string) { } body, err := io.ReadAll(resp.Body) - defer resp.Body.Close() + defer func() { httputil.CloseResponse(resp) }() if err != nil { pkgLogger.Errorf("Alert: Failed to read response body: %s", err.Error()) return diff --git a/services/alert/victorops.go b/services/alert/victorops.go index 7c192330c5..da9129e2c5 100644 --- a/services/alert/victorops.go +++ b/services/alert/victorops.go @@ -9,6 +9,7 @@ import ( "time" "github.com/rudderlabs/rudder-server/config" + "github.com/rudderlabs/rudder-server/utils/httputil" ) func (ops *VictorOps) Alert(message string) { @@ -32,7 +33,7 @@ func (ops *VictorOps) Alert(message string) { } body, err := io.ReadAll(resp.Body) - defer resp.Body.Close() + defer func() { httputil.CloseResponse(resp) }() if err != nil { pkgLogger.Errorf("Alert: Failed to read response body: %s", err.Error()) return diff --git a/services/controlplane/features/client.go b/services/controlplane/features/client.go index 928f34d64c..37cf2bd65d 100644 --- a/services/controlplane/features/client.go +++ b/services/controlplane/features/client.go @@ -129,7 +129,7 @@ func (c *Client) Send(ctx context.Context, component string, features []string) if err != nil { return fmt.Errorf("doing http request: %w", err) } - defer resp.Body.Close() + defer func() { httputil.CloseResponse(resp) }() if resp.StatusCode != http.StatusNoContent { // we don't expect a body, unless there is an error diff --git a/services/debugger/uploader.go b/services/debugger/uploader.go index 8df3faa648..9ca2027357 100644 --- a/services/debugger/uploader.go +++ b/services/debugger/uploader.go @@ -11,6 +11,7 @@ import ( "github.com/rudderlabs/rudder-server/config" "github.com/rudderlabs/rudder-server/rruntime" + "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/rudderlabs/rudder-server/utils/logger" "github.com/rudderlabs/rudder-server/utils/sysUtils" ) @@ -125,7 +126,7 @@ func (uploader *Uploader) uploadEvents(eventBuffer []interface{}) { // Refresh the connection continue } - defer resp.Body.Close() + defer func() { httputil.CloseResponse(resp) }() break } diff --git a/services/destination-connection-tester/destination_connection_tester.go b/services/destination-connection-tester/destination_connection_tester.go index bba9f18522..fc0d2ba712 100644 --- a/services/destination-connection-tester/destination_connection_tester.go +++ b/services/destination-connection-tester/destination_connection_tester.go @@ -15,6 +15,7 @@ import ( "github.com/rudderlabs/rudder-server/config" backendconfig "github.com/rudderlabs/rudder-server/config/backend-config" "github.com/rudderlabs/rudder-server/services/filemanager" + "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/rudderlabs/rudder-server/utils/logger" "github.com/rudderlabs/rudder-server/utils/misc" ) @@ -87,7 +88,7 @@ func makePostRequest(url string, payload interface{}) error { resp, err = client.Do(req) if err == nil { - resp.Body.Close() + func() { httputil.CloseResponse(resp) }() break } diff --git a/services/filemanager/fileManager_test.go b/services/filemanager/fileManager_test.go index 4ccd93e718..53e099b466 100644 --- a/services/filemanager/fileManager_test.go +++ b/services/filemanager/fileManager_test.go @@ -29,6 +29,7 @@ import ( "github.com/Azure/azure-storage-blob-go/azblob" "github.com/rudderlabs/rudder-server/config" "github.com/rudderlabs/rudder-server/services/filemanager" + "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/rudderlabs/rudder-server/utils/logger" ) @@ -91,7 +92,7 @@ func run(m *testing.M) int { if err != nil { return err } - defer resp.Body.Close() + defer func() { httputil.CloseResponse(resp) }() if resp.StatusCode != http.StatusOK { return fmt.Errorf("status code not OK") diff --git a/services/oauth/oauth.go b/services/oauth/oauth.go index 4677cbcecf..a7640e6d6a 100644 --- a/services/oauth/oauth.go +++ b/services/oauth/oauth.go @@ -17,6 +17,7 @@ import ( backendconfig "github.com/rudderlabs/rudder-server/config/backend-config" router_utils "github.com/rudderlabs/rudder-server/router/utils" "github.com/rudderlabs/rudder-server/services/stats" + "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/rudderlabs/rudder-server/utils/logger" "github.com/rudderlabs/rudder-server/utils/misc" "github.com/tidwall/gjson" @@ -497,7 +498,7 @@ func processResponse(resp *http.Response) (statusCode int, respBody string) { var ioUtilReadErr error if resp != nil && resp.Body != nil { respData, ioUtilReadErr = io.ReadAll(resp.Body) - defer resp.Body.Close() + defer func() { httputil.CloseResponse(resp) }() if ioUtilReadErr != nil { return http.StatusInternalServerError, ioUtilReadErr.Error() } @@ -557,9 +558,7 @@ func (authErrHandler *OAuthErrResHandler) cpApiCall(cpReq *ControlPlaneRequestT) } return http.StatusBadRequest, doErr.Error() } - if res.Body != nil { - defer res.Body.Close() - } + defer func() { httputil.CloseResponse(res) }() statusCode, resp := processResponse(res) return statusCode, resp } diff --git a/testhelper/destination/minio.go b/testhelper/destination/minio.go index 6b05fefb1b..fd33db8c8d 100644 --- a/testhelper/destination/minio.go +++ b/testhelper/destination/minio.go @@ -12,6 +12,7 @@ import ( "github.com/ory/dockertest/v3" dc "github.com/ory/dockertest/v3/docker" "github.com/rudderlabs/rudder-server/testhelper" + "github.com/rudderlabs/rudder-server/utils/httputil" ) type MINIOResource struct { @@ -68,7 +69,7 @@ func SetupMINIO(pool *dockertest.Pool, d cleaner) (*MINIOResource, error) { if err != nil { return err } - defer resp.Body.Close() + defer func() { httputil.CloseResponse(resp) }() if resp.StatusCode != http.StatusOK { return fmt.Errorf("status code not OK") } diff --git a/testhelper/destination/transformer.go b/testhelper/destination/transformer.go index dd2d4675f5..5580bd6e8d 100644 --- a/testhelper/destination/transformer.go +++ b/testhelper/destination/transformer.go @@ -9,6 +9,7 @@ import ( _ "github.com/lib/pq" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" + "github.com/rudderlabs/rudder-server/utils/httputil" ) type TransformerResource struct { @@ -55,7 +56,7 @@ func SetupTransformer(pool *dockertest.Pool, d cleaner) (*TransformerResource, e if err != nil { return err } - defer resp.Body.Close() + defer func() { httputil.CloseResponse(resp) }() if resp.StatusCode != 200 { return errors.New(resp.Status) } diff --git a/testhelper/health/checker.go b/testhelper/health/checker.go index 6b477b2f7c..415f27d8b8 100644 --- a/testhelper/health/checker.go +++ b/testhelper/health/checker.go @@ -5,6 +5,8 @@ import ( "net/http" "testing" "time" + + "github.com/rudderlabs/rudder-server/utils/httputil" ) func WaitUntilReady( @@ -26,7 +28,7 @@ func WaitUntilReady( if err != nil { continue } - defer resp.Body.Close() + func() { httputil.CloseResponse(resp) }() if resp.StatusCode == http.StatusOK { t.Log("Application ready") return diff --git a/utils/httputil/client.go b/utils/httputil/client.go index 95cadeb90c..3d3fcd4455 100644 --- a/utils/httputil/client.go +++ b/utils/httputil/client.go @@ -1,6 +1,9 @@ package httputil -import "net/http" +import ( + "io" + "net/http" +) // RetriableStatus returns true if the HTTP status code should be retried. // @@ -26,3 +29,14 @@ func RetriableStatus(statusCode int) bool { return false } } + +// CloseResponse closes the response's body. But reads at least some of the body so if it's +// small the underlying TCP connection will be re-used. No need to check for errors: if it +// fails, the Transport won't reuse it anyway. +func CloseResponse(resp *http.Response) { + if resp != nil && resp.Body != nil { + const maxBodySlurpSize = 2 << 10 // 2KB + _, _ = io.CopyN(io.Discard, resp.Body, maxBodySlurpSize) + resp.Body.Close() + } +} diff --git a/utils/httputil/client_test.go b/utils/httputil/client_test.go index 4f638c4506..a02d952d1a 100644 --- a/utils/httputil/client_test.go +++ b/utils/httputil/client_test.go @@ -1,10 +1,13 @@ package httputil_test import ( + "io" "net/http" + "net/http/httptest" "testing" "github.com/rudderlabs/rudder-server/utils/httputil" + "github.com/stretchr/testify/require" ) func TestRetriableStatus(t *testing.T) { @@ -67,3 +70,35 @@ func TestRetriableStatus(t *testing.T) { } } } + +func TestReadAndCloseResponse(t *testing.T) { + httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("test")) + })) + defer httpServer.Close() + + t.Run("it can read & close a non nil response that hasn't been read", func(t *testing.T) { + resp, err := http.Get(httpServer.URL) + require.NoError(t, err) + require.NotNil(t, resp) + func() { httputil.CloseResponse(resp) }() + }) + + t.Run("it can read & close a non nil response that has already been read", func(t *testing.T) { + resp, err := http.Get(httpServer.URL) + require.NoError(t, err) + require.NotNil(t, resp) + _, err = io.ReadAll(resp.Body) + require.NoError(t, err) + func() { httputil.CloseResponse(resp) }() + }) + + t.Run("it won't panic if we try to read & close a nil response", func(t *testing.T) { + httputil.CloseResponse(nil) + }) + + t.Run("it won't panic if we try to read & close a non-nil response with a nil body", func(t *testing.T) { + httputil.CloseResponse(&http.Response{}) + }) +} diff --git a/utils/misc/misc.go b/utils/misc/misc.go index 26bd34570f..2e38a9b5d9 100644 --- a/utils/misc/misc.go +++ b/utils/misc/misc.go @@ -42,6 +42,7 @@ import ( "github.com/rudderlabs/rudder-server/config" "github.com/rudderlabs/rudder-server/services/metric" + "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/rudderlabs/rudder-server/utils/logger" "github.com/thoas/go-funk" @@ -577,7 +578,7 @@ func MakeHTTPRequestWithTimeout(url string, payload io.Reader, timeout time.Dura var respBody []byte if resp != nil && resp.Body != nil { respBody, _ = io.ReadAll(resp.Body) - defer resp.Body.Close() + defer func() { httputil.CloseResponse(resp) }() } return respBody, resp.StatusCode, nil @@ -1004,7 +1005,7 @@ func MakeRetryablePostRequest(url, endpoint string, data interface{}) (response } body, err := io.ReadAll(resp.Body) - defer resp.Body.Close() + defer func() { httputil.CloseResponse(resp) }() pkgLogger.Debugf("Post request: Successful %s", string(body)) return body, resp.StatusCode, nil @@ -1156,7 +1157,7 @@ func GetDatabricksVersion() (version string) { version = "No response from warehouse." return } - defer resp.Body.Close() + defer func() { httputil.CloseResponse(resp) }() if resp.StatusCode == http.StatusOK { bodyBytes, err := io.ReadAll(resp.Body) if err != nil { diff --git a/warehouse/testhelper/events.go b/warehouse/testhelper/events.go index 1d6e54cb8c..af42f56079 100644 --- a/warehouse/testhelper/events.go +++ b/warehouse/testhelper/events.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/stretchr/testify/require" ) @@ -545,7 +546,7 @@ func send(t testing.TB, payload *strings.Reader, eventType, writeKey, method str t.Errorf("Error occurred while making http request for sending event with error: %s", err.Error()) return } - defer func() { _ = res.Body.Close() }() + defer func() { httputil.CloseResponse(res) }() _, err = io.ReadAll(res.Body) if err != nil { @@ -604,7 +605,7 @@ func blockByWhJobStatus(t testing.TB, path, writeKey string) (string, error) { t.Errorf("Error occurred while making http request for sending event with error: %s", err.Error()) return "error", err } - defer func() { _ = res.Body.Close() }() + defer func() { httputil.CloseResponse(res) }() response, err := io.ReadAll(res.Body) if err != nil { t.Errorf("Error occurred while reading http response for sending event with error: %s", err.Error()) @@ -655,7 +656,7 @@ func blockByPendingEvents(t testing.TB, payload *strings.Reader, writeKey string t.Errorf("Error occurred while making http request for sending event with error: %s", err.Error()) return 1 } - defer func() { _ = res.Body.Close() }() + defer func() { httputil.CloseResponse(res) }() response, err := io.ReadAll(res.Body) if err != nil { t.Errorf("Error occurred while reading http response for sending event with error: %s", err.Error()) diff --git a/warehouse/testhelper/setup.go b/warehouse/testhelper/setup.go index 5681cc48a0..12624bd41d 100644 --- a/warehouse/testhelper/setup.go +++ b/warehouse/testhelper/setup.go @@ -25,6 +25,7 @@ import ( "github.com/rudderlabs/rudder-server/warehouse/deltalake/databricks" "github.com/rudderlabs/rudder-server/warehouse/validations" + "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/rudderlabs/rudder-server/utils/misc" warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" @@ -521,7 +522,7 @@ func prometheusStats(t *testing.T) map[string]*promCLient.MetricFamily { require.NotNil(t, resp) require.NotNil(t, resp.Body) - defer func() { _ = resp.Body.Close() }() + defer func() { httputil.CloseResponse(resp) }() var parser expfmt.TextParser mf, err := parser.TextToMetricFamilies(resp.Body) diff --git a/warehouse/utils/utils.go b/warehouse/utils/utils.go index de9d72903a..67bdc571d7 100644 --- a/warehouse/utils/utils.go +++ b/warehouse/utils/utils.go @@ -30,6 +30,7 @@ import ( "github.com/rudderlabs/rudder-server/services/filemanager" "github.com/rudderlabs/rudder-server/services/stats" "github.com/rudderlabs/rudder-server/utils/awsutils" + "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/rudderlabs/rudder-server/utils/logger" "github.com/rudderlabs/rudder-server/utils/misc" ) @@ -1025,7 +1026,7 @@ func GetRequestWithTimeout(ctx context.Context, url string, timeout time.Duratio var respBody []byte if resp != nil && resp.Body != nil { respBody, _ = io.ReadAll(resp.Body) - defer resp.Body.Close() + func() { httputil.CloseResponse(resp) }() } return respBody, nil @@ -1049,7 +1050,7 @@ func PostRequestWithTimeout(ctx context.Context, url string, payload []byte, tim var respBody []byte if resp != nil && resp.Body != nil { respBody, _ = io.ReadAll(resp.Body) - defer resp.Body.Close() + func() { httputil.CloseResponse(resp) }() } return respBody, nil