Skip to content

Commit

Permalink
feat: clickhouse proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
chronark committed Nov 25, 2024
1 parent b55f5d5 commit 8878ae5
Show file tree
Hide file tree
Showing 9 changed files with 288 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ const testCases: {

for (const { limit, duration, rps, seconds } of testCases) {
const name = `[${limit} / ${duration / 1000}s], attacked with ${rps} rps for ${seconds}s`;
test.concurrent(name, { skip: process.env.TEST_LOCAL, retry: 3, timeout: 600_000 }, async (t) => {
test(name, { skip: process.env.TEST_LOCAL, retry: 3, timeout: 600_000 }, async (t) => {
const h = await IntegrationHarness.init(t);

const { key, keyId } = await h.createKey();
Expand Down
2 changes: 1 addition & 1 deletion apps/api/src/routes/v1_ratelimits_limit.accuracy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ const testCases: {

for (const { limit, duration, rps, seconds } of testCases) {
const name = `[${limit} / ${duration / 1000}s], attacked with ${rps} rps for ${seconds}s`;
test.concurrent(name, { skip: process.env.TEST_LOCAL, retry: 3, timeout: 600_000 }, async (t) => {
test(name, { skip: process.env.TEST_LOCAL, retry: 3, timeout: 600_000 }, async (t) => {
const h = await IntegrationHarness.init(t);
const namespace = {
id: newId("test"),
Expand Down
39 changes: 10 additions & 29 deletions apps/chproxy/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,37 +1,18 @@
FROM node:lts AS base
FROM golang:1.23-alpine AS builder

RUN npm i -g pnpm turbo bun
RUN corepack enable


FROM base AS builder

# Set working directory
WORKDIR /unkey
WORKDIR /go/src/github.com/unkeyed/unkey/apps/chproxy
COPY go.mod ./
# COPY go.sum ./
# RUN go mod download

COPY . .
RUN turbo prune chproxy --docker

# Add lockfile and package.json's of isolated subworkspace
FROM base AS installer
WORKDIR /unkey

# First install dependencies (as they change less often)
COPY .gitignore .gitignore
COPY --from=builder /unkey/out/json/ .
COPY --from=builder /unkey/out/pnpm-lock.yaml ./pnpm-lock.yaml
RUN pnpm install

# Build the project and its dependencies
COPY --from=builder /unkey/out/full/ .
COPY turbo.json turbo.json

RUN pnpm turbo build --filter=chproxy...
RUN go build -o bin/chproxy ./main.go

FROM base AS runner
WORKDIR /unkey

COPY --from=installer /unkey .
FROM golang:1.23-alpine
WORKDIR /usr/local/bin
COPY --from=builder /go/src/github.com/unkeyed/unkey/apps/chproxy/bin/chproxy .

WORKDIR /unkey/apps/chproxy
CMD bun run ./src/main.ts
CMD [ "/usr/local/bin/chproxy"]
12 changes: 12 additions & 0 deletions apps/chproxy/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
module github.com/unkeyed/unkey/apps/chproxy

go 1.23.2

require (
github.com/influxdata/tdigest v0.0.1 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/tsenart/vegeta v12.7.0+incompatible // indirect
golang.org/x/net v0.31.0 // indirect
golang.org/x/text v0.20.0 // indirect
)
17 changes: 17 additions & 0 deletions apps/chproxy/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/influxdata/tdigest v0.0.1 h1:XpFptwYmnEKUqmkcDjrzffswZ3nvNeevbUSLPP/ZzIY=
github.com/influxdata/tdigest v0.0.1/go.mod h1:Z0kXnxzbTC2qrx4NaIzYkE1k66+6oEDQTvL95hQFh5Y=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/tsenart/vegeta v12.7.0+incompatible h1:sGlrv11EMxQoKOlDuMWR23UdL90LE5VlhKw/6PWkZmU=
github.com/tsenart/vegeta v12.7.0+incompatible/go.mod h1:Smz/ZWfhKRcyDDChZkG3CyTHdj87lHzio/HOCkbndXM=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/net v0.31.0 h1:68CPQngjLL0r2AlUKiSxtQFKvzRVbnzLwMUn5SzcLHo=
golang.org/x/net v0.31.0/go.mod h1:P4fl1q7dY2hnZFxEk4pPSkDHF+QqjitcnDjUQyMM+pM=
golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug=
golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4=
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
200 changes: 200 additions & 0 deletions apps/chproxy/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
package main

import (
"context"
"encoding/base64"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"os/signal"
"strings"
"syscall"
"time"
)

const (
MAX_BUFFER_SIZE = 50000
MAX_BATCH_SIZE = 10000
FLUSH_INTERVAL = time.Second * 3
)

var (
CLICKHOUSE_URL string
BASIC_AUTH string
PORT string
)

func init() {

CLICKHOUSE_URL = os.Getenv("CLICKHOUSE_URL")
if CLICKHOUSE_URL == "" {
panic("CLICKHOUSE_URL must be defined")
}
BASIC_AUTH = os.Getenv("BASIC_AUTH")
if BASIC_AUTH == "" {
panic("BASIC_AUTH must be defined")
}
PORT = os.Getenv("PORT")
if PORT == "" {
PORT = "7123"
}
}

type Batch struct {
Rows []string
Params url.Values
}

func persist(batch *Batch) error {
if len(batch.Rows) == 0 {
return nil
}

u, err := url.Parse(CLICKHOUSE_URL)
if err != nil {
return err
}

u.RawQuery = batch.Params.Encode()

req, err := http.NewRequest("POST", u.String(), strings.NewReader(strings.Join(batch.Rows, "\n")))
if err != nil {
return err
}
req.Header.Add("Content-Type", "text/plain")
username := u.User.Username()
password, ok := u.User.Password()
if !ok {
return fmt.Errorf("password not set")
}
req.SetBasicAuth(username, password)

client := http.Client{}
res, err := client.Do(req)
if err != nil {
return err
}
defer res.Body.Close()

if res.StatusCode == http.StatusOK {
log.Printf("GOLANG persisted %d rows for %s\n", len(batch.Rows), batch.Params.Get("query"))
} else {
body, err := io.ReadAll(res.Body)
if err != nil {
return err
}
fmt.Println("unable to persist", string(body))
}
return nil
}

func main() {
requiredAuthorization := "Basic " + base64.StdEncoding.EncodeToString([]byte(BASIC_AUTH))

buffer := make(chan *Batch, MAX_BUFFER_SIZE)
// blocks until we've persisted everything and the process may stop
done := make(chan bool)

go func() {

buffered := 0

batchesByParams := make(map[string]*Batch)

ticker := time.NewTicker(FLUSH_INTERVAL)

flushAndReset := func() {
for _, batch := range batchesByParams {
err := persist(batch)
if err != nil {
log.Println("Error flushing:", err.Error())
}
}
buffered = 0
batchesByParams = make(map[string]*Batch)
ticker.Reset(FLUSH_INTERVAL)
}
for {
select {
case b, ok := <-buffer:
if !ok {
// channel closed
flushAndReset()
done <- true
return
}

params := b.Params.Encode()
batch, ok := batchesByParams[params]
if !ok {
batchesByParams[params] = b
} else {

batch.Rows = append(batch.Rows, b.Rows...)
}

buffered += len(b.Rows)

if buffered >= MAX_BATCH_SIZE {
log.Println("Flushing due to max size")
flushAndReset()
}
case <-ticker.C:
log.Println("Flushing from ticker")

flushAndReset()
}
}
}()

http.HandleFunc("/v1/liveness", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("ok"))
})

http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Authorization") != requiredAuthorization {
log.Println("invaldu authorization header, expected", requiredAuthorization, r.Header.Get("Authorization"))
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}

query := r.URL.Query().Get("query")
if query == "" || !strings.HasPrefix(strings.ToLower(query), "insert into") {
http.Error(w, "wrong query", http.StatusBadRequest)
return
}

params := r.URL.Query()
params.Del("query_id")

body, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "cannot read body", http.StatusInternalServerError)
}
rows := strings.Split(string(body), "\n")

buffer <- &Batch{
Params: params,
Rows: rows,
}

w.Write([]byte("ok"))
})

ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()

fmt.Println("listening on", PORT)
if err := http.ListenAndServe(fmt.Sprintf(":%s", PORT), nil); err != nil {
log.Fatalln("error starting server:", err)
}

<-ctx.Done()
log.Println("shutting down")
close(buffer)
<-done

}
35 changes: 28 additions & 7 deletions apps/chproxy/src/main.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { heapStats } from "bun:jsc";
import { generateHeapSnapshot } from "bun";
import { z } from "zod";

const MAX_BATCH_SIZE = 10000;
Expand Down Expand Up @@ -29,6 +31,8 @@ const requiredAuthorization = `Basic ${btoa(env.BASIC_AUTH)}`;

const buffer = new Map<string, Batch>();

let inflight = 0;

async function flush(force?: boolean): Promise<void> {
const now = Date.now();

Expand All @@ -46,11 +50,15 @@ async function persist(key: string): Promise<void> {
return;
}

buffer.delete(key);

const url = new URL(env.CLICKHOUSE_URL);
batch.params.forEach((v, k) => {
url.searchParams.set(k, v);
});

inflight += 1;

const res = await fetch(url, {
method: "POST",
headers: {
Expand All @@ -59,15 +67,30 @@ async function persist(key: string): Promise<void> {
},
body: batch.rows.join("\n"),
});
inflight -= 1;

const body = await res.text();
if (res.ok) {
buffer.delete(key);
console.info(`persisted ${batch.rows.length} rows`);
console.info(`BUN github persisted ${batch.rows.length} rows`, { inflight });
} else {
console.error("unable to persist", await res.text(), JSON.stringify(batch));
console.error("unable to persist", body);
}
}

setInterval(flush, 1000);
setInterval(flush, 10000);
let snapshotId = 0;
setInterval(async () => {
const snapshot = generateHeapSnapshot();
await Bun.write(`heap_${snapshotId++}.json`, JSON.stringify(snapshot, null, 2));
}, 60_000);

setInterval(() => {
console.info("running gc", {
v: Bun.version,
});
Bun.gc(true);
console.info("memory:", heapStats());
}, 10_000);

const server = Bun.serve({
port: env.PORT,
Expand Down Expand Up @@ -121,9 +144,7 @@ console.info("listening on", server.hostname, server.port);
process.on("SIGTERM", async (s) => {
console.warn("Received signal", s);

await server.stop();
await flush(true);
// calling this twice to catch any newly added rows from active connections
server.stop();
await flush(true);
process.exit(0);
});
9 changes: 7 additions & 2 deletions deployment/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,13 @@ services:

chproxy:
build:
context: ..
dockerfile: ./apps/chproxy/Dockerfile
context: ../apps/chproxy
dockerfile: Dockerfile
deploy:
resources:
limits:
cpus: "1"
memory: 1024M
depends_on:
- clickhouse
environment:
Expand Down
Loading

0 comments on commit 8878ae5

Please sign in to comment.