Skip to content

Commit

Permalink
internal/civisibility/utils/net: a new http client for rapid endpoint…
Browse files Browse the repository at this point in the history
…s used by ci visibility with json and message pack support, exponential backoff, ratelimiting status code, gzip compression and multipart bodies (attachments).
  • Loading branch information
tonyredondo committed Sep 16, 2024
1 parent 63f207d commit 3c7fdd7
Show file tree
Hide file tree
Showing 2 changed files with 1,299 additions and 0 deletions.
377 changes: 377 additions & 0 deletions internal/civisibility/utils/net/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,377 @@
package net

import (
"bytes"
"compress/gzip"
"encoding/json"
"errors"
"fmt"
"io"
"mime"
"mime/multipart"
"net/http"
"net/textproto"
"strconv"
"time"

"github.com/vmihailenco/msgpack/v5"
)

// Constants for common strings
const (
ContentTypeJSON = "application/json"
ContentTypeMsgPack = "application/x-msgpack"
ContentTypeOctetStream = "application/octet-stream"
ContentEncodingGzip = "gzip"
HeaderContentType = "Content-Type"
HeaderContentEncoding = "Content-Encoding"
HeaderAcceptEncoding = "Accept-Encoding"
HeaderRateLimitReset = "x-ratelimit-reset"
HTTPStatusTooManyRequests = 429
FormatJSON = "json"
FormatMsgPack = "msgpack"
)

// FormFile represents a file to be uploaded in a multipart form request.
type FormFile struct {
FieldName string // The name of the form field
FileName string // The name of the file
Content interface{} // The content of the file (can be []byte, map, struct, etc.)
ContentType string // The MIME type of the file (e.g., "application/json", "application/octet-stream")
}

// RequestConfig holds configuration for a request.
type RequestConfig struct {
Method string // HTTP method: GET or POST
URL string // Request URL
Headers map[string]string // Additional HTTP headers
Body interface{} // Request body for JSON, MessagePack, or raw bytes
Format string // Format: "json" or "msgpack"
Compressed bool // Whether to use gzip compression
Files []FormFile // Files to be uploaded in a multipart form data request
MaxRetries int // Maximum number of retries
Backoff time.Duration // Initial backoff duration for retries
}

// Response represents the HTTP response with deserialization capabilities and status code.
type Response struct {
Body []byte // Response body in raw format
Format string // Format of the response (json or msgpack)
StatusCode int // HTTP status code
CanUnmarshal bool // Whether the response body can be unmarshalled
}

// Unmarshal deserializes the response body into the provided target based on the response format.
func (r *Response) Unmarshal(target interface{}) error {
if !r.CanUnmarshal {
return fmt.Errorf("cannot unmarshal response with status code %d", r.StatusCode)
}

switch r.Format {
case FormatJSON:
return json.Unmarshal(r.Body, target)
case FormatMsgPack:
return msgpack.Unmarshal(r.Body, target)
default:
return fmt.Errorf("unsupported format '%s' for unmarshalling", r.Format)
}
}

// RequestHandler handles HTTP requests with retries and different formats.
type RequestHandler struct {
Client *http.Client
}

// NewRequestHandler creates a new RequestHandler with a default HTTP client.
func NewRequestHandler() *RequestHandler {
return &RequestHandler{
Client: &http.Client{
Timeout: 10 * time.Second, // Customize timeout as needed
},
}
}

// SendRequest sends an HTTP request based on the provided configuration.
func (rh *RequestHandler) SendRequest(config RequestConfig) (*Response, error) {
if config.MaxRetries <= 0 {
config.MaxRetries = 3 // Default retries
}
if config.Backoff <= 0 {
config.Backoff = 1 * time.Second // Default backoff
}
if config.Method == "" {
return nil, errors.New("HTTP method is required")
}
if config.URL == "" {
return nil, errors.New("URL is required")
}

var responseBody []byte
var statusCode int

for attempt := 0; attempt <= config.MaxRetries; attempt++ {
// Now, MaxRetries represents the total number of attempts
var req *http.Request
var err error

// Check if it's a multipart form data request
if len(config.Files) > 0 {
// Create multipart form data body
body, contentType, err := createMultipartFormData(config.Files, config.Compressed)
if err != nil {
return nil, err
}
req, err = http.NewRequest(config.Method, config.URL, bytes.NewBuffer(body))
if err != nil {
return nil, err
}
req.Header.Set(HeaderContentType, contentType)
if config.Compressed {
req.Header.Set(HeaderContentEncoding, ContentEncodingGzip)
}
} else if config.Body != nil {
// Handle JSON or MessagePack body
serializedBody, err := serializeData(config.Body, config.Format)
if err != nil {
return nil, err
}

// Compress body if needed
if config.Compressed {
serializedBody, err = compressData(serializedBody)
if err != nil {
return nil, err
}
}

req, err = http.NewRequest(config.Method, config.URL, bytes.NewBuffer(serializedBody))
if err != nil {
return nil, err
}
if config.Format == FormatJSON {
req.Header.Set(HeaderContentType, ContentTypeJSON)
} else if config.Format == FormatMsgPack {
req.Header.Set(HeaderContentType, ContentTypeMsgPack)
}
if config.Compressed {
req.Header.Set(HeaderContentEncoding, ContentEncodingGzip)
req.Header.Set(HeaderAcceptEncoding, ContentEncodingGzip)
}
} else {
// Handle requests without a body (e.g., GET requests)
req, err = http.NewRequest(config.Method, config.URL, nil)
if err != nil {
return nil, err
}

// Set gzip headers for requests without a body if compression is enabled
if config.Compressed {
req.Header.Set(HeaderAcceptEncoding, ContentEncodingGzip)
}
}

// Add custom headers if provided
for key, value := range config.Headers {
req.Header.Set(key, value)
}

resp, err := rh.Client.Do(req)
if err != nil {
// Retry if there's an error
exponentialBackoff(attempt, config.Backoff)
continue
}
// Close response body
defer resp.Body.Close()

// Capture the status code
statusCode = resp.StatusCode

// Check for rate-limiting (HTTP 429)
if resp.StatusCode == HTTPStatusTooManyRequests {
rateLimitReset := resp.Header.Get(HeaderRateLimitReset)
if rateLimitReset != "" {
if resetTime, err := strconv.ParseInt(rateLimitReset, 10, 64); err == nil {
var waitDuration time.Duration
if resetTime > time.Now().Unix() {
// Assume it's a Unix timestamp
waitDuration = time.Until(time.Unix(resetTime, 0))
} else {
// Assume it's a duration in seconds
waitDuration = time.Duration(resetTime) * time.Second
}
resp.Body.Close()
if waitDuration > 0 {
time.Sleep(waitDuration)
}
continue
}
}

// Fallback to exponential backoff if header is missing or invalid
resp.Body.Close()
exponentialBackoff(attempt, config.Backoff)
continue
}

// Check status code for retries
if statusCode >= 406 {
// Retry if the status code is >= 406
resp.Body.Close()
exponentialBackoff(attempt, config.Backoff)
continue
}

responseBody, err = io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

// Decompress response if it is gzip compressed
if resp.Header.Get(HeaderContentEncoding) == ContentEncodingGzip {
responseBody, err = decompressData(responseBody)
if err != nil {
return nil, err
}
}

// Determine response format from headers
responseFormat := "unknown"
mediaType, _, err := mime.ParseMediaType(resp.Header.Get(HeaderContentType))
if err == nil {
if mediaType == ContentTypeJSON {
responseFormat = FormatJSON
} else if mediaType == ContentTypeMsgPack {
responseFormat = FormatMsgPack
}
}

// Determine if we can unmarshal based on status code (2xx)
canUnmarshal := statusCode >= 200 && statusCode < 300

// Return the successful response with status code and unmarshal capability
return &Response{Body: responseBody, Format: responseFormat, StatusCode: statusCode, CanUnmarshal: canUnmarshal}, nil
}

return nil, errors.New("max retries exceeded")
}

// Helper functions for data serialization, compression, and handling multipart form data

// serializeData serializes the data based on the format.
func serializeData(data interface{}, format string) ([]byte, error) {
switch v := data.(type) {
case []byte:
// If it's already a byte array, use it directly
return v, nil
default:
// Otherwise, serialize it according to the specified format
if format == FormatJSON {
return json.Marshal(data)
} else if format == FormatMsgPack {
return msgpack.Marshal(data)
}
}
return nil, fmt.Errorf("unsupported format '%s' for data type '%T'", format, data)
}

// compressData compresses the data using gzip.
func compressData(data []byte) ([]byte, error) {
if data == nil {
return nil, errors.New("attempt to compress a nil data array")
}

var buf bytes.Buffer
writer := gzip.NewWriter(&buf)
_, err := writer.Write(data)
if err != nil {
return nil, err
}
writer.Close()
return buf.Bytes(), nil
}

// decompressData decompresses gzip data.
func decompressData(data []byte) ([]byte, error) {
reader, err := gzip.NewReader(bytes.NewReader(data))
if err != nil {
return nil, fmt.Errorf("failed to create gzip reader: %v", err)
}
defer reader.Close()
decompressedData, err := io.ReadAll(reader)
if err != nil {
return nil, fmt.Errorf("failed to decompress data: %v", err)
}
return decompressedData, nil
}

// exponentialBackoff performs an exponential backoff with retries.
func exponentialBackoff(retryCount int, initialDelay time.Duration) {
maxDelay := 30 * time.Second
delay := initialDelay * (1 << uint(retryCount)) // Exponential backoff
if delay > maxDelay {
delay = maxDelay
}
time.Sleep(delay)
}

// prepareContent prepares the content for a FormFile by serializing it if needed.
func prepareContent(content interface{}, contentType string) ([]byte, error) {
if contentType == ContentTypeJSON {
return serializeData(content, FormatJSON)
} else if contentType == ContentTypeMsgPack {
return serializeData(content, FormatMsgPack)
} else if contentType == ContentTypeOctetStream {
// For binary data, ensure it's already in byte format
if data, ok := content.([]byte); ok {
return data, nil
}
return nil, errors.New("content must be []byte for octet-stream content type")
}
return nil, errors.New("unsupported content type for serialization")
}

// createMultipartFormData creates a multipart form data request body with the given files.
// It also compresses the data using gzip if compression is enabled.
func createMultipartFormData(files []FormFile, compressed bool) ([]byte, string, error) {
var buf bytes.Buffer
writer := multipart.NewWriter(&buf)

for _, file := range files {
partHeaders := textproto.MIMEHeader{}
partHeaders.Set("Content-Disposition", fmt.Sprintf(`form-data; name="%s"; filename="%s"`, file.FieldName, file.FileName))
partHeaders.Set("Content-Type", file.ContentType)

part, err := writer.CreatePart(partHeaders)
if err != nil {
return nil, "", err
}

// Prepare the file content (serialize if necessary based on content type)
fileContent, err := prepareContent(file.Content, file.ContentType)
if err != nil {
return nil, "", err
}

if _, err := part.Write(fileContent); err != nil {
return nil, "", err
}
}

// Close the writer to set the terminating boundary
err := writer.Close()
if err != nil {
return nil, "", err
}

// Compress the multipart form data if compression is enabled
if compressed {
compressedData, err := compressData(buf.Bytes())
if err != nil {
return nil, "", err
}
return compressedData, writer.FormDataContentType(), nil
}

return buf.Bytes(), writer.FormDataContentType(), nil
}
Loading

0 comments on commit 3c7fdd7

Please sign in to comment.