Skip to content

Commit

Permalink
chore: close http responses in a way to allow the Transport to re-use…
Browse files Browse the repository at this point in the history
… the TCP connection
  • Loading branch information
atzoum committed Nov 23, 2022
1 parent 0e26b30 commit 70b8b9b
Show file tree
Hide file tree
Showing 36 changed files with 129 additions and 62 deletions.
3 changes: 2 additions & 1 deletion cmd/devtool/commands/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/urfave/cli/v2"
)

Expand Down Expand Up @@ -88,11 +89,11 @@ func EventSend(c *cli.Context) error {
if err != nil {
return err
}
defer func() { httputil.CloseResponse(resp) }()
b, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
fmt.Printf("%s\n%s\n", resp.Status, b)
Expand Down
4 changes: 2 additions & 2 deletions config/backend-config/namespace_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/rudderlabs/rudder-server/config"
"github.com/rudderlabs/rudder-server/services/controlplane/identity"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/types"
)
Expand Down Expand Up @@ -138,13 +139,12 @@ func (nc *namespaceConfig) makeHTTPRequest(ctx context.Context, url string) ([]b
return nil, err
}

defer func() { httputil.CloseResponse(resp) }()
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

defer func() { _ = resp.Body.Close() }()

if resp.StatusCode >= 300 {
return nil, getNotOKError(respBody, resp.StatusCode)
}
Expand Down
4 changes: 2 additions & 2 deletions config/backend-config/single_workspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/rudderlabs/rudder-server/config"
"github.com/rudderlabs/rudder-server/services/controlplane/identity"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/types"
)

Expand Down Expand Up @@ -150,13 +151,12 @@ func (wc *singleWorkspaceConfig) makeHTTPRequest(ctx context.Context, url string
return nil, err
}

defer func() { httputil.CloseResponse(resp) }()
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

defer func() { _ = resp.Body.Close() }()

if resp.StatusCode >= 300 {
return nil, getNotOKError(respBody, resp.StatusCode)
}
Expand Down
3 changes: 2 additions & 1 deletion enterprise/reporting/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
backendconfig "github.com/rudderlabs/rudder-server/config/backend-config"
migrator "github.com/rudderlabs/rudder-server/services/sql-migrator"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types"
Expand Down Expand Up @@ -413,7 +414,7 @@ func (handle *HandleT) sendMetric(ctx context.Context, netClient *http.Client, c
httpStatTags["status"] = strconv.Itoa(resp.StatusCode)
stats.Default.NewTaggedStat(STAT_REPORTING_HTTP_REQ, stats.CountType, httpStatTags).Count(1)

defer resp.Body.Close()
defer func() { httputil.CloseResponse(resp) }()
respBody, err := io.ReadAll(resp.Body)
if err != nil {
handle.log.Error(err.Error())
Expand Down
8 changes: 2 additions & 6 deletions enterprise/suppress-user/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/rudderlabs/rudder-server/config"
"github.com/rudderlabs/rudder-server/enterprise/suppress-user/model"
"github.com/rudderlabs/rudder-server/services/controlplane/identity"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
Expand Down Expand Up @@ -175,12 +176,7 @@ func (s *Syncer) sync(token []byte) ([]model.Suppression, []byte, error) {
if err != nil {
return err
}
defer func() {
err := resp.Body.Close()
if err != nil {
s.log.Error(err)
}
}()
defer func() { httputil.CloseResponse(resp) }()

// If statusCode is not 2xx, then returning empty regulations
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
Expand Down
2 changes: 1 addition & 1 deletion gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -1004,7 +1004,7 @@ func (gateway *HandleT) getWarehousePending(payload []byte) bool {
return false
}

defer resp.Body.Close()
defer func() { rs_httputil.CloseResponse(resp) }()

var whPendingResponse warehouseutils.PendingEventsResponseT
respData, err := io.ReadAll(resp.Body)
Expand Down
5 changes: 3 additions & 2 deletions gateway/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/rudderlabs/rudder-server/testhelper/health"
"github.com/rudderlabs/rudder-server/testhelper/rand"
whUtil "github.com/rudderlabs/rudder-server/testhelper/webhook"
"github.com/rudderlabs/rudder-server/utils/httputil"
)

func TestGatewayIntegration(t *testing.T) {
Expand Down Expand Up @@ -183,7 +184,7 @@ func testGatewayByAppType(t *testing.T, appType string) {
resp, err := http.Get(healthEndpoint)
require.ErrorContains(t, err, "connection refused")
require.Nil(t, resp)
defer resp.Body.Close()
defer func() { httputil.CloseResponse(resp) }()

// Checking now that the configuration has been processed and the server can start
t.Log("Checking health endpoint at", healthEndpoint)
Expand Down Expand Up @@ -316,7 +317,7 @@ func sendEvent(t *testing.T, httpPort int, payload *strings.Reader, callType, wr

res, err := httpClient.Do(req)
require.NoError(t, err)
defer func() { _ = res.Body.Close() }()
defer func() { httputil.CloseResponse(res) }()

body, err := io.ReadAll(res.Body)
require.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion gateway/webhook/webhookTransformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"

"github.com/rudderlabs/rudder-server/gateway/response"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/misc"
)

Expand Down Expand Up @@ -57,7 +58,7 @@ func (bt *batchWebhookTransformerT) transform(events [][]byte, sourceType string
}

respBody, err := io.ReadAll(resp.Body)
_ = resp.Body.Close()
func() { httputil.CloseResponse(resp) }()

if err != nil {
bt.stats.failedStat.Count(len(events))
Expand Down
5 changes: 3 additions & 2 deletions integration_test/docker_test/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/rudderlabs/rudder-server/testhelper/health"
"github.com/rudderlabs/rudder-server/testhelper/rand"
whUtil "github.com/rudderlabs/rudder-server/testhelper/webhook"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
)
Expand Down Expand Up @@ -655,7 +656,7 @@ func getEvent(url, method string) (string, error) {
if err != nil {
return "", err
}
defer func() { _ = res.Body.Close() }()
defer func() { httputil.CloseResponse(res) }()

body, err := io.ReadAll(res.Body)
if err != nil {
Expand Down Expand Up @@ -719,7 +720,7 @@ func sendEvent(t *testing.T, payload *strings.Reader, callType, writeKey string)
t.Logf("sendEvent error: %v", err)
return
}
defer func() { _ = res.Body.Close() }()
defer func() { httputil.CloseResponse(res) }()

body, err := io.ReadAll(res.Body)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions integration_test/multi_tentant_test/multi_tenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/rudderlabs/rudder-server/testhelper/health"
"github.com/rudderlabs/rudder-server/testhelper/rand"
whUtil "github.com/rudderlabs/rudder-server/testhelper/webhook"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
)

Expand Down Expand Up @@ -215,7 +216,7 @@ func testMultiTenantByAppType(t *testing.T, appType string) {
require.ErrorContains(t, err, "connection refused")
require.Nil(t, resp)
if err == nil {
defer func() { _ = resp.Body.Close() }()
defer func() { httputil.CloseResponse(resp) }()
}

// Pushing valid configuration via ETCD
Expand Down Expand Up @@ -415,7 +416,7 @@ func sendEvent(t *testing.T, httpPort int, payload *strings.Reader, callType, wr

res, err := httpClient.Do(req)
require.NoError(t, err)
defer func() { _ = res.Body.Close() }()
defer func() { httputil.CloseResponse(res) }()

body, err := io.ReadAll(res.Body)
require.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/services/transientsource"
"github.com/rudderlabs/rudder-server/utils/bytesize"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types"
Expand Down Expand Up @@ -562,7 +563,7 @@ func (proc *HandleT) makeFeaturesFetchCall() bool {
return true
}

defer func() { _ = res.Body.Close() }()
defer func() { httputil.CloseResponse(res) }()
body, err := io.ReadAll(res.Body)
if err != nil {
return true
Expand Down
5 changes: 3 additions & 2 deletions processor/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
backendconfig "github.com/rudderlabs/rudder-server/config/backend-config"
"github.com/rudderlabs/rudder-server/processor/integrations"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/types"
)
Expand Down Expand Up @@ -178,7 +179,7 @@ func GetVersion() (transformerBuildVersion string) {
transformerBuildVersion = fmt.Sprintf("No response from transformer. %s", transformerBuildVersion)
return
}
defer func() { _ = resp.Body.Close() }()
defer func() { httputil.CloseResponse(resp) }()
if resp.StatusCode == http.StatusOK {
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
Expand Down Expand Up @@ -380,8 +381,8 @@ func (trans *HandleT) doPost(ctx context.Context, rawJSON []byte, url string, ta
if reqErr != nil {
return reqErr
}
defer func() { httputil.CloseResponse(resp) }()
respData, reqErr = io.ReadAll(resp.Body)
_ = resp.Body.Close()
return reqErr
},
backoff.WithMaxRetries(backoff.NewExponentialBackOff(), uint64(maxRetry)),
Expand Down
3 changes: 2 additions & 1 deletion regulation-worker/cmd/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/rudderlabs/rudder-server/regulation-worker/internal/model"
"github.com/rudderlabs/rudder-server/services/filemanager"
"github.com/rudderlabs/rudder-server/services/kvstoremanager"
"github.com/rudderlabs/rudder-server/utils/httputil"
)

var (
Expand Down Expand Up @@ -127,7 +128,7 @@ func TestFlow(t *testing.T) {
if err != nil {
return err
}
defer resp.Body.Close()
defer func() { httputil.CloseResponse(resp) }()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("status code not OK")
}
Expand Down
10 changes: 3 additions & 7 deletions regulation-worker/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/rudderlabs/rudder-server/regulation-worker/internal/model"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/logger"
)

Expand Down Expand Up @@ -51,12 +52,7 @@ func (j *JobAPI) Get(ctx context.Context) (model.Job, error) {
if err != nil {
return model.Job{}, err
}
defer func() {
err := resp.Body.Close()
if err != nil {
pkgLogger.Errorf("error while closing response body: %v", err)
}
}()
defer func() { httputil.CloseResponse(resp) }()
pkgLogger.Debugf("obtained response code: %v", resp.StatusCode, "response body: ", resp.Body)

// if successful
Expand Down Expand Up @@ -140,7 +136,7 @@ func (j *JobAPI) UpdateStatus(ctx context.Context, status model.JobStatus, jobID
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()
defer func() { httputil.CloseResponse(resp) }()

pkgLogger.Debugf("response code: %v", resp.StatusCode, "response body: %v", resp.Body)
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
Expand Down
3 changes: 2 additions & 1 deletion regulation-worker/internal/delete/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/rudderlabs/rudder-server/regulation-worker/internal/model"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/logger"
)

Expand Down Expand Up @@ -65,7 +66,7 @@ func (api *APIManager) Delete(ctx context.Context, job model.Job, destConfig map
}
return model.JobStatusFailed
}
defer resp.Body.Close()
defer func() { httputil.CloseResponse(resp) }()
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
return model.JobStatusFailed
Expand Down
4 changes: 2 additions & 2 deletions router/batchrouter/batchrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/rudderlabs/rudder-server/services/filemanager"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/utils/bytesize"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types"
Expand Down Expand Up @@ -1132,10 +1133,9 @@ func (brt *HandleT) postToWarehouse(batchJobs *BatchJobsT, output StorageUploadO
brt.logger.Errorf("BRT: Failed to route staging file URL to warehouse service@%v, error:%v", uri, err)
return
}
defer func() { _ = resp.Body.Close() }()
defer func() { httputil.CloseResponse(resp) }()

if resp.StatusCode == http.StatusOK {
_, err = io.Copy(io.Discard, resp.Body)
brt.logger.Infof("BRT: Routed successfully staging file URL to warehouse service@%v", uri)
} else {
body, _ := io.ReadAll(resp.Body)
Expand Down
3 changes: 2 additions & 1 deletion router/eventorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/rudderlabs/rudder-server/testhelper/destination"
trand "github.com/rudderlabs/rudder-server/testhelper/rand"
"github.com/rudderlabs/rudder-server/testhelper/workspaceConfig"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"
Expand Down Expand Up @@ -210,7 +211,7 @@ func TestEventOrderGuarantee(t *testing.T) {
resp, err := client.Do(req)
require.NoError(t, err, "should be able to send the request to gateway")
require.Equal(t, http.StatusOK, resp.StatusCode, "should be able to send the request to gateway successfully", payload)
resp.Body.Close()
func() { httputil.CloseResponse(resp) }()
}
}()

Expand Down
3 changes: 2 additions & 1 deletion router/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/rudderlabs/rudder-server/processor/integrations"
"github.com/rudderlabs/rudder-server/router/utils"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/sysUtils"
Expand Down Expand Up @@ -168,7 +169,7 @@ func (network *NetHandleT) SendPost(ctx context.Context, structData integrations
}
}

defer func() { _ = resp.Body.Close() }()
defer func() { httputil.CloseResponse(resp) }()

respBody, err := io.ReadAll(resp.Body)
if err != nil {
Expand Down
Loading

0 comments on commit 70b8b9b

Please sign in to comment.