forked from segmentio/kafka-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.go
146 lines (121 loc) · 4.02 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package kafka
import (
"context"
"errors"
"fmt"
"net"
"time"
"github.com/streamdal/segmentio-kafka-go/protocol"
)
const (
defaultCreateTopicsTimeout = 2 * time.Second
defaultDeleteTopicsTimeout = 2 * time.Second
defaultCreatePartitionsTimeout = 2 * time.Second
defaultProduceTimeout = 500 * time.Millisecond
defaultMaxWait = 500 * time.Millisecond
)
// Client is a high-level API to interract with kafka brokers.
//
// All methods of the Client type accept a context as first argument, which may
// be used to asynchronously cancel the requests.
//
// Clients are safe to use concurrently from multiple goroutines, as long as
// their configuration is not changed after first use.
type Client struct {
// Address of the kafka cluster (or specific broker) that the client will be
// sending requests to.
//
// This field is optional, the address may be provided in each request
// instead. The request address takes precedence if both were specified.
Addr net.Addr
// Time limit for requests sent by this client.
//
// If zero, no timeout is applied.
Timeout time.Duration
// A transport used to communicate with the kafka brokers.
//
// If nil, DefaultTransport is used.
Transport RoundTripper
}
// A ConsumerGroup and Topic as these are both strings we define a type for
// clarity when passing to the Client as a function argument
//
// N.B TopicAndGroup is currently experimental! Therefore, it is subject to
// change, including breaking changes between MINOR and PATCH releases.
//
// DEPRECATED: this type will be removed in version 1.0, programs should
// migrate to use kafka.(*Client).OffsetFetch instead.
type TopicAndGroup struct {
Topic string
GroupId string
}
// ConsumerOffsets returns a map[int]int64 of partition to committed offset for
// a consumer group id and topic.
//
// DEPRECATED: this method will be removed in version 1.0, programs should
// migrate to use kafka.(*Client).OffsetFetch instead.
func (c *Client) ConsumerOffsets(ctx context.Context, tg TopicAndGroup) (map[int]int64, error) {
metadata, err := c.Metadata(ctx, &MetadataRequest{
Topics: []string{tg.Topic},
})
if err != nil {
return nil, fmt.Errorf("failed to get topic metadata :%w", err)
}
topic := metadata.Topics[0]
partitions := make([]int, len(topic.Partitions))
for i := range topic.Partitions {
partitions[i] = topic.Partitions[i].ID
}
offsets, err := c.OffsetFetch(ctx, &OffsetFetchRequest{
GroupID: tg.GroupId,
Topics: map[string][]int{
tg.Topic: partitions,
},
})
if err != nil {
return nil, fmt.Errorf("failed to get offsets: %w", err)
}
topicOffsets := offsets.Topics[topic.Name]
partitionOffsets := make(map[int]int64, len(topicOffsets))
for _, off := range topicOffsets {
partitionOffsets[off.Partition] = off.CommittedOffset
}
return partitionOffsets, nil
}
func (c *Client) roundTrip(ctx context.Context, addr net.Addr, msg protocol.Message) (protocol.Message, error) {
if c.Timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, c.Timeout)
defer cancel()
}
if addr == nil {
if addr = c.Addr; addr == nil {
return nil, errors.New("no address was given for the kafka cluster in the request or on the client")
}
}
return c.transport().RoundTrip(ctx, addr, msg)
}
func (c *Client) transport() RoundTripper {
if c.Transport != nil {
return c.Transport
}
return DefaultTransport
}
func (c *Client) timeout(ctx context.Context, defaultTimeout time.Duration) time.Duration {
timeout := c.Timeout
if deadline, ok := ctx.Deadline(); ok {
if remain := time.Until(deadline); remain < timeout {
timeout = remain
}
}
if timeout > 0 {
// Half the timeout because it is communicated to kafka in multiple
// requests (e.g. Fetch, Produce, etc...), this adds buffer to account
// for network latency when waiting for the response from kafka.
return timeout / 2
}
return defaultTimeout
}
func (c *Client) timeoutMs(ctx context.Context, defaultTimeout time.Duration) int32 {
return milliseconds(c.timeout(ctx, defaultTimeout))
}