From 71e264c6b18b4c2c6895e971ae70a3ad16e83a63 Mon Sep 17 00:00:00 2001 From: Steven Date: Mon, 2 Dec 2024 19:04:46 +0800 Subject: [PATCH] fix: #64 OOM --- cmd/ipasd/ipasd.go | 38 +++++ cmd/ipasd/service/service.go | 13 +- cmd/ipasd/service/transport.go | 2 +- go.mod | 4 +- go.sum | 5 +- pkg/common/common.go | 13 ++ pkg/ipa/ipa.go | 3 +- pkg/storager/afero.go | 19 ++- pkg/websocketfile/websocketfile.go | 213 +++++++++++++++++++++++++++++ public/index.html | 201 ++++++++++++++------------- public/js/core.js | 105 ++++++++++++++ 11 files changed, 500 insertions(+), 116 deletions(-) create mode 100644 pkg/websocketfile/websocketfile.go diff --git a/cmd/ipasd/ipasd.go b/cmd/ipasd/ipasd.go index af4b46b..b99ef47 100644 --- a/cmd/ipasd/ipasd.go +++ b/cmd/ipasd/ipasd.go @@ -13,9 +13,11 @@ import ( httptransport "github.com/go-kit/kit/transport/http" "github.com/iineva/ipa-server/cmd/ipasd/service" + "github.com/iineva/ipa-server/pkg/common" "github.com/iineva/ipa-server/pkg/httpfs" "github.com/iineva/ipa-server/pkg/storager" "github.com/iineva/ipa-server/pkg/uuid" + "github.com/iineva/ipa-server/pkg/websocketfile" "github.com/iineva/ipa-server/public" ) @@ -136,6 +138,42 @@ func main() { serve.Handle("/api/delete", deleteHandler) serve.Handle("/api/delete/get", deleteGetHandler) serve.Handle("/plist/", plistHandler) + // upload file over Websocket + serve.Handle("/api/upload/ws", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + f, err := websocketfile.NewWebsocketFile(w, r) + if err != nil { + logger.Log("msg", fmt.Sprintf("err: %v", err)) + return + } + + size, err := f.Size() + if err != nil { + logger.Log("msg", fmt.Sprintf("err: %v", err)) + return + } + + name, err := f.Name() + if err != nil { + logger.Log("msg", fmt.Sprintf("err: %v", err)) + return + } + t := service.FileType(name) + + logger.Log("name:", name, " size:", size) + + info, err := srv.Add(f, size, t) + if err != nil { + logger.Log("msg", fmt.Sprintf("err: %v", err)) + return + } + + err = f.Done(common.ToMap(info)) + if err != nil { + logger.Log("msg", fmt.Sprintf("err: %v", err)) + return + } + + })) // static files uploadFS := afero.NewBasePathFs(afero.NewOsFs(), *storageDir) diff --git a/cmd/ipasd/service/service.go b/cmd/ipasd/service/service.go index 3fcd68b..2f6fb88 100644 --- a/cmd/ipasd/service/service.go +++ b/cmd/ipasd/service/service.go @@ -69,14 +69,13 @@ type Service interface { Find(id string, publicURL string) (*Item, error) History(id string, publicURL string) ([]*Item, error) Delete(id string) error - Add(r Reader, t AppInfoType) (*AppInfo, error) + Add(r Reader, size int64, t AppInfoType) (*AppInfo, error) Plist(id, publicURL string) ([]byte, error) } type Reader interface { io.Reader io.ReaderAt - Size() int64 } type service struct { @@ -176,9 +175,9 @@ func (s *service) Delete(id string) error { return nil } -func (s *service) Add(r Reader, t AppInfoType) (*AppInfo, error) { +func (s *service) Add(r Reader, size int64, t AppInfoType) (*AppInfo, error) { - app, err := s.addPackage(r, t) + app, err := s.addPackage(r, size, t) if err != nil { return nil, err } @@ -191,7 +190,7 @@ func (s *service) Add(r Reader, t AppInfoType) (*AppInfo, error) { return app, s.saveMetadata() } -func (s *service) addPackage(r Reader, t AppInfoType) (*AppInfo, error) { +func (s *service) addPackage(r Reader, size int64, t AppInfoType) (*AppInfo, error) { // save ipa file to temp pkgTempFileName := filepath.Join(tempDir, uuid.NewString()) if err := s.store.Save(pkgTempFileName, r); err != nil { @@ -203,9 +202,9 @@ func (s *service) addPackage(r Reader, t AppInfoType) (*AppInfo, error) { var err error switch t { case AppInfoTypeIpa: - pkg, err = ipa.Parse(r, r.Size()) + pkg, err = ipa.Parse(r, size) case AppInfoTypeApk: - pkg, err = apk.Parse(r, r.Size()) + pkg, err = apk.Parse(r, size) } if err != nil { return nil, err diff --git a/cmd/ipasd/service/transport.go b/cmd/ipasd/service/transport.go index 5b88770..ff3d401 100644 --- a/cmd/ipasd/service/transport.go +++ b/cmd/ipasd/service/transport.go @@ -73,7 +73,7 @@ func MakeAddEndpoint(srv Service) endpoint.Endpoint { return nil, fmt.Errorf("do not support %s file", path.Ext(p.file.FileName())) } - app, err := srv.Add(buf, t) + app, err := srv.Add(buf, buf.Size(), t) if err != nil { return nil, err } diff --git a/go.mod b/go.mod index 625a798..acfbba7 100644 --- a/go.mod +++ b/go.mod @@ -11,10 +11,10 @@ require ( github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f // indirect github.com/go-kit/kit v0.10.0 github.com/google/uuid v1.2.0 // indirect - // TODO: wait PR merge github.com/poolqa/CgbiPngFix master - github.com/iineva/CgbiPngFix v0.0.0-20210523041253-b8869b346914 + github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c github.com/iineva/bom v0.0.0-20210604102127-81d8bcf0765e github.com/lithammer/shortuuid v3.0.0+incompatible + github.com/poolqa/CgbiPngFix v0.0.0-20211024081647-8ad4fb5c23e4 github.com/qiniu/go-sdk/v7 v7.9.5 github.com/satori/go.uuid v1.2.0 // indirect github.com/shogo82148/androidbinary v1.0.2 diff --git a/go.sum b/go.sum index 636fecb..bdfb40f 100644 --- a/go.sum +++ b/go.sum @@ -120,6 +120,7 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORR github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= +github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c h1:Lh2aW+HnU2Nbe1gqD9SOJLJxW1jBMmQOktN2acDyJk8= github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= @@ -146,8 +147,6 @@ github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2p github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg= -github.com/iineva/CgbiPngFix v0.0.0-20210523041253-b8869b346914 h1:l/3SkoXjUmXjj90Ho9FHksi6OVJDIOGU0Ow2KSZnNjE= -github.com/iineva/CgbiPngFix v0.0.0-20210523041253-b8869b346914/go.mod h1:caygohjd8B7TXErEL2NYMdJNLO9jIddQIALUKcoszZo= github.com/iineva/bom v0.0.0-20210604102127-81d8bcf0765e h1:LRrJgM6YtcVTzuDviS+xIglYVe/AbpqGCHD2bDtFI8Y= github.com/iineva/bom v0.0.0-20210604102127-81d8bcf0765e/go.mod h1:O7ivqgPZeHUc3Dk9ZA+bqH/+7Qr/2lo1pogmk78RaQQ= github.com/iineva/go-lzfse v1.1.13-0.20210604101847-2a555776c20a h1:c8iYO/eYhs1ec6n4oLzl7x0qt6o2ncMo5Tu4RqaTjNY= @@ -231,6 +230,8 @@ github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6J github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/poolqa/CgbiPngFix v0.0.0-20211024081647-8ad4fb5c23e4 h1:n3IkiWQgZS8Ref8BX/NWpYe7VClpQJ8pbzUSBZOqoKs= +github.com/poolqa/CgbiPngFix v0.0.0-20211024081647-8ad4fb5c23e4/go.mod h1:DG0Kw4br1oA1IRv7O/QNp8fzh+/YBgknQCLnMeioG1U= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs= diff --git a/pkg/common/common.go b/pkg/common/common.go index 142e871..8105487 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -1,5 +1,7 @@ package common +import "encoding/json" + // get args until arg is not empty func Def(args ...string) string { for _, v := range args { @@ -9,3 +11,14 @@ func Def(args ...string) string { } return "" } + +// 结构体转 map +func ToMap(v interface{}) map[string]interface{} { + b, err := json.Marshal(v) + m := map[string]interface{}{} + if err != nil { + return m + } + _ = json.Unmarshal(b, &m) + return m +} diff --git a/pkg/ipa/ipa.go b/pkg/ipa/ipa.go index 26f987c..f99d07c 100644 --- a/pkg/ipa/ipa.go +++ b/pkg/ipa/ipa.go @@ -11,11 +11,10 @@ import ( "strconv" "strings" - "github.com/iineva/CgbiPngFix/ipaPng" - "github.com/iineva/bom/pkg/asset" "github.com/iineva/ipa-server/pkg/plist" "github.com/iineva/ipa-server/pkg/seekbuf" + "github.com/poolqa/CgbiPngFix/ipaPng" ) var ( diff --git a/pkg/storager/afero.go b/pkg/storager/afero.go index de83fcd..eabd695 100644 --- a/pkg/storager/afero.go +++ b/pkg/storager/afero.go @@ -1,6 +1,7 @@ package storager import ( + "bufio" "io" "path/filepath" @@ -14,6 +15,7 @@ type oferoStorager struct { const ( oferoStoragerDirPerm = 0755 + WRITER_BUFFER_SIZE = 1024 * 1024 * 2 // 1M ) var _ Storager = (*oferoStorager)(nil) @@ -36,10 +38,25 @@ func (f *oferoStorager) Save(name string, reader io.Reader) error { return err } fi, err := f.fs.Create(name) + defer func() { + _ = fi.Close() + }() if err != nil { return err } - _, err = io.Copy(fi, reader) + + // write with buffer + w := bufio.NewWriterSize(fi, WRITER_BUFFER_SIZE) + _, err = io.Copy(w, reader) + if err != nil { + return err + } + + err = w.Flush() + if err != nil { + return err + } + return err } diff --git a/pkg/websocketfile/websocketfile.go b/pkg/websocketfile/websocketfile.go new file mode 100644 index 0000000..76b0d7b --- /dev/null +++ b/pkg/websocketfile/websocketfile.go @@ -0,0 +1,213 @@ +// Open client side file from server side over websocket +package websocketfile + +import ( + "encoding/base64" + "errors" + "fmt" + "io" + "math/rand" + "net/http" + "sync" + "time" + + "github.com/gorilla/websocket" +) + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, +} + +var ErrResponse = errors.New("ErrResponse") +var _ io.Reader = (*websocketFile)(nil) +var _ io.ReaderAt = (*websocketFile)(nil) + +type websocketFile struct { + sync.RWMutex + conn *websocket.Conn + rand *rand.Rand + offset int64 + size int64 +} + +type CommandType int32 + +const ( + CommandTypeReadAt CommandType = 1 + CommandTypeSize CommandType = 2 + CommandTypeName CommandType = 3 + CommandTypeDone CommandType = 4 +) + +type Command struct { + Command CommandType `json:"command"` // command type + Param map[string]interface{} `json:"param"` // command param + RequestId string `json:"requestId"` // requestId for check response +} + +type WebsocketFile interface { + io.ReaderAt + io.Reader + Size() (int64, error) + Name() (string, error) + Done(p map[string]interface{}) error +} + +func NewWebsocketFile(w http.ResponseWriter, r *http.Request) (WebsocketFile, error) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return nil, err + } + return &websocketFile{conn: conn, rand: rand.New(rand.NewSource(time.Now().UnixNano())), size: -1}, nil +} + +func (w *websocketFile) setOffset(off int64) { + w.Lock() + defer w.Unlock() + w.offset = off +} + +func (w *websocketFile) getOffset() int64 { + w.RLock() + defer w.RUnlock() + return w.offset +} + +func (w *websocketFile) ReadAt(p []byte, off int64) (n int, err error) { + + // EOF + size, err := w.Size() + if err != nil { + return 0, err + } + if off >= size { + return 0, io.EOF + } + + resp, err := w.request(CommandTypeReadAt, map[string]interface{}{ + "offset": off, + "length": len(p), + }) + if err != nil { + return 0, err + } + + data, ok := resp.Param["data"] + if !ok { + return 0, nil + } + + d, ok := data.(string) + if !ok { + return 0, nil + } + + if d == "" { + return 0, io.EOF + } + + // log.Printf("d: %v", d) + buf, err := base64.StdEncoding.DecodeString(d) + if err != nil { + return 0, err + } + + n = copy(p, buf) + w.setOffset(off + int64(n)) + + return n, nil +} + +func (w *websocketFile) Read(p []byte) (n int, err error) { + return w.ReadAt(p, w.getOffset()) +} + +func (w *websocketFile) Size() (n int64, err error) { + + // read from cache + if w.size != -1 { + return w.size, nil + } + + resp, err := w.request(CommandTypeSize, nil) + if err != nil { + return 0, err + } + + data, ok := resp.Param["size"] + if !ok { + return 0, nil + } + size, ok := data.(float64) + if !ok { + return 0, nil + } + + // cache size + w.size = int64(size) + + return w.size, nil +} + +func (w *websocketFile) Name() (n string, err error) { + resp, err := w.request(CommandTypeName, nil) + if err != nil { + return "", err + } + + data, ok := resp.Param["name"] + if !ok { + return "", nil + } + name, ok := data.(string) + if !ok { + return "", nil + } + + return name, nil +} + +func (w *websocketFile) Done(p map[string]interface{}) error { + err := w.send(CommandTypeDone, p) + if err != nil { + return err + } + return nil +} + +func (w *websocketFile) send(typ CommandType, p map[string]interface{}) error { + requestId := fmt.Sprintf("%d", rand.Uint64()) + err := w.conn.WriteJSON(&Command{ + Command: typ, + RequestId: requestId, + Param: p, + }) + if err != nil { + return err + } + return nil +} + +func (w *websocketFile) request(typ CommandType, p map[string]interface{}) (*Command, error) { + requestId := fmt.Sprintf("%d", rand.Uint64()) + err := w.conn.WriteJSON(&Command{ + Command: typ, + RequestId: requestId, + Param: p, + }) + if err != nil { + return nil, err + } + + resp := &Command{} + err = w.conn.ReadJSON(resp) + if err != nil { + return nil, err + } + if resp.Command != typ || resp.RequestId != requestId { + return nil, ErrResponse + } + + return resp, nil +} diff --git a/public/index.html b/public/index.html index 4eecf85..0a53b4c 100644 --- a/public/index.html +++ b/public/index.html @@ -1,115 +1,114 @@ + + + + + IPA Server + + + + + + + - - - -
- -
Add
-
-
- - + // start lazy load + instance.update().check().handlers(true); + }); + } + window.addEventListener("load", loadList); + document.querySelector(".add-btn").innerHTML = IPA.langString("Add"); + + diff --git a/public/js/core.js b/public/js/core.js index 203525f..0cc14f8 100644 --- a/public/js/core.js +++ b/public/js/core.js @@ -31,6 +31,110 @@ }); } + function newUpload(file, _onProgress) { + var onProgress = function(m) { + _onProgress && _onProgress({ + loaded: m.loaded, + total: m.total, + }) + } + return new Promise((res, rej) => { + const u = location.origin + .replace("https://", "wss://") + .replace("http://", "ws://"); + var ws = new WebSocket(u + "/api/upload/ws"); + var CommandTypeReadAt = 1; + var CommandTypeSize = 2; + var CommandTypeName = 3; + var CommandTypeDone = 4; + + function sendRequest(command, requestId, param) { + var obj = { + command: command, + requestId: requestId, + param: param, + }; + ws.send(JSON.stringify(obj)); + } + + ws.onopen = function () { + // console.log("ws opened"); + }; + + ws.onmessage = function (evt) { + var received_msg = evt.data; + if (!received_msg) return; + var msg = null; + try { + msg = JSON.parse(received_msg); + } catch (err) { + rej(err) + console.error(err) + } + if (!msg) return; + // console.log("onmessage", msg); + switch (msg.command) { + case CommandTypeReadAt: { + var start = msg.param.offset; + var end = Math.min(msg.param.offset + msg.param.length, file.size); + if (end - start <= 0) { + sendRequest(msg.command, msg.requestId, {data: ""}); + onProgress({ + loaded: end, + total: file.size, + }); + return; + } + var reader = new FileReader(); + reader.onload = function() { + var text = reader.result; + var data = text.substr(text.indexOf(',') + 1); + sendRequest(msg.command, msg.requestId, {data: data}); + onProgress({ + loaded: end, + total: file.size, + end: end, + }); + }; + reader.readAsDataURL(file.slice(start, end)); + break + } + case CommandTypeSize: { + sendRequest(msg.command, msg.requestId, { + size: file.size, + }); + break + } + case CommandTypeName: { + sendRequest(msg.command, msg.requestId, { + name: file.name, + }) + break + } + case CommandTypeDone: { + onProgress({ + loaded: file.size, + total: file.size, + }) + res(msg.param) + break + } + } + }; + + ws.onerror = function() { + console.error("onerror"); + rej(err) + } + + ws.onclose = function () { + // websocket closed + // console.log("onclose"); + }; + + }) + } + function getApiUrl(path) { return path } @@ -177,6 +281,7 @@ sizeStr: sizeStr, createItem: createItem, getApiUrl: getApiUrl, + newUpload: newUpload, } })(window) \ No newline at end of file