Skip to content

Commit

Permalink
feat: Add Websocekt
Browse files Browse the repository at this point in the history
- add websocket chatroom demo

#17
  • Loading branch information
lc-1010 committed Jul 26, 2023
1 parent b8166f8 commit cc76a0e
Show file tree
Hide file tree
Showing 11 changed files with 311 additions and 5 deletions.
Binary file removed OneBlogService
Binary file not shown.
4 changes: 4 additions & 0 deletions chatroom/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## ChartRoom

## 基于tcp的聊天室

- 如果大量用户广播延迟问题
- 消息的堵塞

```md
├── chatroom
Expand Down
33 changes: 33 additions & 0 deletions chatroom/cmd/tcp/client/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package main

import (
"fmt"
"io"
"log"
"net"
"strings"
"testing"
"time"
)

func TestSendMsg(t *testing.T) {

for i := 0; i < 100; i++ {

go func(i int) {
conn, err := net.Dial("tcp", ":2020")
if err != nil {
panic(err)
}
for j := 0; j < 1000; j++ {
time.Sleep(time.Microsecond * 500)
s := strings.NewReader(fmt.Sprintf("%s%d\n", "hello", i))
if _, err := io.Copy(conn, s); err != nil {
log.Fatal(err)
}
}
}(i)
}
time.Sleep(time.Second * 10)
log.Println("done")
}
22 changes: 17 additions & 5 deletions chatroom/cmd/tcp/server.go → chatroom/cmd/tcp/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@ import (
)

var (
enteringChannel = make(chan *User)
leavingChannel = make(chan *User)
messageChannel = make(chan string, 8)
// 用户列表
enteringChannel = make(chan *User)
// 用户离开
leavingChannel = make(chan *User)
// messageChannel 缓冲区的大小是 8,这意味着 channel 可以存储最多 8 个字符串元素
messageChannel = make(chan string, 2)
// 增长id
userIdListChannel = make(chan int, 1)
)

Expand All @@ -40,16 +44,24 @@ func main() {
}

func broadcaster() {

usersMap := make(map[*User]struct{})
i := 0
for {

select {
case user := <-enteringChannel:
usersMap[user] = struct{}{}
case user := <-leavingChannel:
delete(usersMap, user)
close(user.MessageChannel)
case msg := <-messageChannel:
for user := range usersMap {
//缓冲区的大小是 8,这意味着最多可以存储 8 条消息。
//如果缓冲区已满,新的消息将会被阻塞,
//直到有一个接收操作从 channel 中读取一个消息。
case msg := <-messageChannel: //channel 空间限制住了
i++
msg = msg + "_" + fmt.Sprint(i)
for user := range usersMap { //这里如果没有用户太多
user.MessageChannel <- msg
}
}
Expand Down
106 changes: 106 additions & 0 deletions chatroom/cmd/tcp/server/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package main

import (
"fmt"
"net"
"sync"
"testing"
"time"
)

func BenchmarkBroadcaster(b *testing.B) {
listener, err := net.Listen("tcp", ":0")
if err != nil {
b.Fatal(err)
}
defer listener.Close()

// 启动广播器
go broadcaster()

// 启动客户端
var wg sync.WaitGroup
for i := 0; i < b.N; i++ {
wg.Add(1)
go func() {
defer wg.Done()

conn, err := net.Dial("tcp", listener.Addr().String())
if err != nil {
b.Fatal(err)
}
defer conn.Close()

// 发送消息到服务器
fmt.Fprintf(conn, "Hello, %d\n", i)

// 读取服务器发回的消息
buf := make([]byte, 1024)
n, err := conn.Read(buf)
if err != nil {
b.Fatal(err)
}
msg := string(buf[:n])

// 检查消息是否正确
expect := fmt.Sprintf("Hello, %d\n", i)
if msg != expect {
b.Errorf("expect %q, but got %q", expect, msg)
}
conn.Close()
}()
}

// 等待所有客户端完成测试
wg.Wait()
listener.Close()
}

func TestBroadcasterWithBlocking(t *testing.T) {
listener, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatal(err)
}
defer listener.Close()

// 启动广播器
go broadcaster()

// 启动客户端
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()

conn, err := net.Dial("tcp", listener.Addr().String())
if err != nil {
t.Fatal(err)
}
defer conn.Close()

// 模拟消息延迟和阻塞
time.Sleep(5 * time.Second)

// 发送消息到服务器
fmt.Fprintf(conn, "Hello, %d\n", id)

// 读取服务器发回的消息
buf := make([]byte, 1024)
n, err := conn.Read(buf)
if err != nil {
t.Fatal(err)
}
msg := string(buf[:n])

// 检查消息是否正确
expect := fmt.Sprintf("Hello, %d\n", id)
if msg != expect {
t.Errorf("expect %q, but got %q", expect, msg)
}
}(i)
}

// 等待所有客户端完成测试
wg.Wait()
}
34 changes: 34 additions & 0 deletions chatroom/cmd/websocket/nhooyr/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package main

import (
"context"
"log"
"time"

"nhooyr.io/websocket"
"nhooyr.io/websocket/wsjson"
)

func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

c, _, err := websocket.Dial(ctx, "ws://localhost:2021/ws", nil)
if err != nil {
panic(err)
}
defer c.Close(websocket.StatusInternalError, "server eroor")

err = wsjson.Write(ctx, c, "Hello WebSocket server")
if err != nil {
panic(err)
}
var v any
err = wsjson.Read(ctx, c, &v)
if err != nil {
panic(err)
}
log.Printf("recv: %v", v)
c.Close(websocket.StatusNormalClosure, "")

}
44 changes: 44 additions & 0 deletions chatroom/cmd/websocket/nhooyr/server/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package main

import (
"context"
"fmt"
"log"
"net/http"
"time"

"nhooyr.io/websocket"
"nhooyr.io/websocket/wsjson"
)

func main() {
http.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
fmt.Fprintln(w, "HTTP , Hello")
})
http.HandleFunc("/ws", func(w http.ResponseWriter, req *http.Request) {
conn, err := websocket.Accept(w, req, nil)
if err != nil {
log.Println(err)
return
}
defer conn.Close(websocket.StatusInternalError, "servr error")
ctx, cancel := context.WithTimeout(req.Context(), time.Second*10)
defer cancel()

var v any
err = wsjson.Read(ctx, conn, &v)
if err != nil {
log.Println(err)
return
}
log.Printf("recv: %v", v)
err = wsjson.Write(ctx, conn, "Hello WebSocket clinet")
if err != nil {
log.Println(err)
return
}

conn.Close(websocket.StatusNormalClosure, "200")
})
log.Fatal(http.ListenAndServe(":2021", nil))
}
9 changes: 9 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package main

import (
"testing"
)

func TestGetClientConn(t *testing.T) {
main()
}
37 changes: 37 additions & 0 deletions client/test/clinet_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package test

import (
"context"
"fmt"
"log"
"testing"

"github.com/lc-1010/OneBlogService/global"
"github.com/lc-1010/OneBlogService/pkg/tracer"
)

func TestTacer(t *testing.T) {
TSpan()
}

type b string

func TSpan() {

tracerPorvider, err := tracer.NewJaegerTrancer(
"cligrpc",
"127.0.0.1",
"6831",
)
if err != nil {
log.Fatal(err)
}
global.Tracer = tracerPorvider

ctx := context.Background()
tr := global.Tracer.Tracer("test11")
pp := context.WithValue(ctx, b("ba"), b("j"))
_, span := tr.Start(pp, "cli-rpc")
defer span.End()
fmt.Println("ok tracer")
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/jonboulle/clockwork v0.2.2 // indirect
github.com/klauspost/compress v1.10.3 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/prometheus/client_golang v1.11.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
Expand Down Expand Up @@ -117,6 +118,7 @@ require (
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gorm.io/plugin/opentelemetry v0.1.3
nhooyr.io/websocket v1.8.7
)

replace google.golang.org/grpc => google.golang.org/grpc v1.52.3
Loading

0 comments on commit cc76a0e

Please sign in to comment.