Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: close http responses in a way to allow the Transport to re-use the TCP connection #2718

Merged
merged 1 commit into from
Dec 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 36 additions & 33 deletions cmd/devtool/commands/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/urfave/cli/v2"
)

Expand Down Expand Up @@ -64,41 +65,43 @@ func EventSend(c *cli.Context) error {
}

for i := 0; i < c.Int("count"); i++ {
anonymousId := uuid.New().String()

buf := bytes.NewBuffer(nil)
err = t.Execute(buf, map[string]string{
"AnonymousId": anonymousId,
"Timestamp": time.Now().Format(time.RFC3339),
})
if err != nil {
return err
}

req, err := http.NewRequestWithContext(c.Context, "POST", url, buf)
if err != nil {
if err := func() error {
anonymousId := uuid.New().String()
buf := bytes.NewBuffer(nil)
err = t.Execute(buf, map[string]string{
"AnonymousId": anonymousId,
"Timestamp": time.Now().Format(time.RFC3339),
})
if err != nil {
return err
}

req, err := http.NewRequestWithContext(c.Context, "POST", url, buf)
if err != nil {
return err
}

req.SetBasicAuth(c.String("write-key"), "")
req.Header.Set("Content-Type", "application/json")
req.Header.Set("AnonymousId", anonymousId)

resp, err := client.Do(req)
if err != nil {
return err
}
defer func() { httputil.CloseResponse(resp) }()
b, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
fmt.Printf("%s\n%s\n", resp.Status, b)
return fmt.Errorf("status code: %d", resp.StatusCode)
}
return nil
}(); err != nil {
return err
}

req.SetBasicAuth(c.String("write-key"), "")
req.Header.Set("Content-Type", "application/json")
req.Header.Set("AnonymousId", anonymousId)

resp, err := client.Do(req)
if err != nil {
return err
}
b, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
fmt.Printf("%s\n%s\n", resp.Status, b)

return fmt.Errorf("status code: %d", resp.StatusCode)
}
}

return nil
Expand Down
4 changes: 2 additions & 2 deletions config/backend-config/namespace_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/rudderlabs/rudder-server/config"
"github.com/rudderlabs/rudder-server/services/controlplane/identity"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/types"
)
Expand Down Expand Up @@ -138,13 +139,12 @@ func (nc *namespaceConfig) makeHTTPRequest(ctx context.Context, url string) ([]b
return nil, err
}

defer func() { httputil.CloseResponse(resp) }()
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

defer func() { _ = resp.Body.Close() }()

if resp.StatusCode >= 300 {
return nil, getNotOKError(respBody, resp.StatusCode)
}
Expand Down
4 changes: 2 additions & 2 deletions config/backend-config/single_workspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/rudderlabs/rudder-server/config"
"github.com/rudderlabs/rudder-server/services/controlplane/identity"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/types"
)

Expand Down Expand Up @@ -150,13 +151,12 @@ func (wc *singleWorkspaceConfig) makeHTTPRequest(ctx context.Context, url string
return nil, err
}

defer func() { httputil.CloseResponse(resp) }()
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

defer func() { _ = resp.Body.Close() }()

if resp.StatusCode >= 300 {
return nil, getNotOKError(respBody, resp.StatusCode)
}
Expand Down
3 changes: 2 additions & 1 deletion enterprise/reporting/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
backendconfig "github.com/rudderlabs/rudder-server/config/backend-config"
migrator "github.com/rudderlabs/rudder-server/services/sql-migrator"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types"
Expand Down Expand Up @@ -413,7 +414,7 @@ func (handle *HandleT) sendMetric(ctx context.Context, netClient *http.Client, c
httpStatTags["status"] = strconv.Itoa(resp.StatusCode)
stats.Default.NewTaggedStat(STAT_REPORTING_HTTP_REQ, stats.CountType, httpStatTags).Count(1)

defer resp.Body.Close()
defer func() { httputil.CloseResponse(resp) }()
respBody, err := io.ReadAll(resp.Body)
if err != nil {
handle.log.Error(err.Error())
Expand Down
8 changes: 2 additions & 6 deletions enterprise/suppress-user/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/rudderlabs/rudder-server/config"
"github.com/rudderlabs/rudder-server/enterprise/suppress-user/model"
"github.com/rudderlabs/rudder-server/services/controlplane/identity"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
Expand Down Expand Up @@ -175,12 +176,7 @@ func (s *Syncer) sync(token []byte) ([]model.Suppression, []byte, error) {
if err != nil {
return err
}
defer func() {
err := resp.Body.Close()
if err != nil {
s.log.Error(err)
}
}()
defer func() { httputil.CloseResponse(resp) }()

// If statusCode is not 2xx, then returning empty regulations
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
Expand Down
2 changes: 1 addition & 1 deletion gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -1004,7 +1004,7 @@ func (gateway *HandleT) getWarehousePending(payload []byte) bool {
return false
}

defer resp.Body.Close()
defer func() { rs_httputil.CloseResponse(resp) }()

var whPendingResponse warehouseutils.PendingEventsResponseT
respData, err := io.ReadAll(resp.Body)
Expand Down
5 changes: 3 additions & 2 deletions gateway/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/rudderlabs/rudder-server/testhelper/health"
"github.com/rudderlabs/rudder-server/testhelper/rand"
whUtil "github.com/rudderlabs/rudder-server/testhelper/webhook"
"github.com/rudderlabs/rudder-server/utils/httputil"
)

func TestGatewayIntegration(t *testing.T) {
Expand Down Expand Up @@ -183,7 +184,7 @@ func testGatewayByAppType(t *testing.T, appType string) {
resp, err := http.Get(healthEndpoint)
require.ErrorContains(t, err, "connection refused")
require.Nil(t, resp)
defer resp.Body.Close()
defer func() { httputil.CloseResponse(resp) }()

// Checking now that the configuration has been processed and the server can start
t.Log("Checking health endpoint at", healthEndpoint)
Expand Down Expand Up @@ -316,7 +317,7 @@ func sendEvent(t *testing.T, httpPort int, payload *strings.Reader, callType, wr

res, err := httpClient.Do(req)
require.NoError(t, err)
defer func() { _ = res.Body.Close() }()
defer func() { httputil.CloseResponse(res) }()

body, err := io.ReadAll(res.Body)
require.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion gateway/webhook/webhookTransformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"

"github.com/rudderlabs/rudder-server/gateway/response"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/misc"
)

Expand Down Expand Up @@ -57,7 +58,7 @@ func (bt *batchWebhookTransformerT) transform(events [][]byte, sourceType string
}

respBody, err := io.ReadAll(resp.Body)
_ = resp.Body.Close()
func() { httputil.CloseResponse(resp) }()

if err != nil {
bt.stats.failedStat.Count(len(events))
Expand Down
5 changes: 3 additions & 2 deletions integration_test/docker_test/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/rudderlabs/rudder-server/testhelper/health"
"github.com/rudderlabs/rudder-server/testhelper/rand"
whUtil "github.com/rudderlabs/rudder-server/testhelper/webhook"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
)
Expand Down Expand Up @@ -655,7 +656,7 @@ func getEvent(url, method string) (string, error) {
if err != nil {
return "", err
}
defer func() { _ = res.Body.Close() }()
defer func() { httputil.CloseResponse(res) }()

body, err := io.ReadAll(res.Body)
if err != nil {
Expand Down Expand Up @@ -719,7 +720,7 @@ func sendEvent(t *testing.T, payload *strings.Reader, callType, writeKey string)
t.Logf("sendEvent error: %v", err)
return
}
defer func() { _ = res.Body.Close() }()
defer func() { httputil.CloseResponse(res) }()

body, err := io.ReadAll(res.Body)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions integration_test/multi_tentant_test/multi_tenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/rudderlabs/rudder-server/testhelper/health"
"github.com/rudderlabs/rudder-server/testhelper/rand"
whUtil "github.com/rudderlabs/rudder-server/testhelper/webhook"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
)

Expand Down Expand Up @@ -215,7 +216,7 @@ func testMultiTenantByAppType(t *testing.T, appType string) {
require.ErrorContains(t, err, "connection refused")
require.Nil(t, resp)
if err == nil {
defer func() { _ = resp.Body.Close() }()
defer func() { httputil.CloseResponse(resp) }()
}

// Pushing valid configuration via ETCD
Expand Down Expand Up @@ -415,7 +416,7 @@ func sendEvent(t *testing.T, httpPort int, payload *strings.Reader, callType, wr

res, err := httpClient.Do(req)
require.NoError(t, err)
defer func() { _ = res.Body.Close() }()
defer func() { httputil.CloseResponse(res) }()

body, err := io.ReadAll(res.Body)
require.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/services/transientsource"
"github.com/rudderlabs/rudder-server/utils/bytesize"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types"
Expand Down Expand Up @@ -562,7 +563,7 @@ func (proc *HandleT) makeFeaturesFetchCall() bool {
return true
}

defer func() { _ = res.Body.Close() }()
defer func() { httputil.CloseResponse(res) }()
body, err := io.ReadAll(res.Body)
if err != nil {
return true
Expand Down
5 changes: 3 additions & 2 deletions processor/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
backendconfig "github.com/rudderlabs/rudder-server/config/backend-config"
"github.com/rudderlabs/rudder-server/processor/integrations"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/types"
)
Expand Down Expand Up @@ -178,7 +179,7 @@ func GetVersion() (transformerBuildVersion string) {
transformerBuildVersion = fmt.Sprintf("No response from transformer. %s", transformerBuildVersion)
return
}
defer func() { _ = resp.Body.Close() }()
defer func() { httputil.CloseResponse(resp) }()
if resp.StatusCode == http.StatusOK {
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
Expand Down Expand Up @@ -387,8 +388,8 @@ func (trans *HandleT) doPost(ctx context.Context, rawJSON []byte, url string, ta
if reqErr != nil {
return reqErr
}
defer func() { httputil.CloseResponse(resp) }()
respData, reqErr = io.ReadAll(resp.Body)
_ = resp.Body.Close()
return reqErr
},
backoff.WithMaxRetries(backoff.NewExponentialBackOff(), uint64(maxRetry)),
Expand Down
3 changes: 2 additions & 1 deletion regulation-worker/cmd/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/rudderlabs/rudder-server/regulation-worker/internal/model"
"github.com/rudderlabs/rudder-server/services/filemanager"
"github.com/rudderlabs/rudder-server/services/kvstoremanager"
"github.com/rudderlabs/rudder-server/utils/httputil"
)

var (
Expand Down Expand Up @@ -127,7 +128,7 @@ func TestFlow(t *testing.T) {
if err != nil {
return err
}
defer resp.Body.Close()
defer func() { httputil.CloseResponse(resp) }()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("status code not OK")
}
Expand Down
10 changes: 3 additions & 7 deletions regulation-worker/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/rudderlabs/rudder-server/regulation-worker/internal/model"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/logger"
)

Expand Down Expand Up @@ -51,12 +52,7 @@ func (j *JobAPI) Get(ctx context.Context) (model.Job, error) {
if err != nil {
return model.Job{}, err
}
defer func() {
err := resp.Body.Close()
if err != nil {
pkgLogger.Errorf("error while closing response body: %v", err)
}
}()
defer func() { httputil.CloseResponse(resp) }()
pkgLogger.Debugf("obtained response code: %v", resp.StatusCode, "response body: ", resp.Body)

// if successful
Expand Down Expand Up @@ -140,7 +136,7 @@ func (j *JobAPI) UpdateStatus(ctx context.Context, status model.JobStatus, jobID
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()
defer func() { httputil.CloseResponse(resp) }()

pkgLogger.Debugf("response code: %v", resp.StatusCode, "response body: %v", resp.Body)
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
Expand Down
3 changes: 2 additions & 1 deletion regulation-worker/internal/delete/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/rudderlabs/rudder-server/regulation-worker/internal/model"
"github.com/rudderlabs/rudder-server/services/oauth"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/logger"
)

Expand Down Expand Up @@ -95,7 +96,7 @@ func (api *APIManager) deleteWithRetry(ctx context.Context, job model.Job, desti
}
return model.JobStatusFailed
}
defer resp.Body.Close()
defer func() { httputil.CloseResponse(resp) }()
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
return model.JobStatusFailed
Expand Down
Loading