diff --git a/backend/plugins/gitextractor/main.go b/backend/plugins/gitextractor/main.go index 0224b0c97e1..a62f1561d44 100644 --- a/backend/plugins/gitextractor/main.go +++ b/backend/plugins/gitextractor/main.go @@ -78,7 +78,7 @@ func main() { "git extractor", nil, ) - repo, err := tasks.NewGitRepo(subTaskCtx.GetContext(), logger, storage, &tasks.GitExtractorOptions{ + repo, err := tasks.NewGitRepo(subTaskCtx, logger, storage, &tasks.GitExtractorOptions{ RepoId: *id, Url: *url, User: *user, diff --git a/backend/plugins/gitextractor/parser/clone.go b/backend/plugins/gitextractor/parser/clone.go index b2d0b46abe3..2e7576284f9 100644 --- a/backend/plugins/gitextractor/parser/clone.go +++ b/backend/plugins/gitextractor/parser/clone.go @@ -18,24 +18,29 @@ limitations under the License. package parser import ( + "bytes" "context" "crypto/tls" "encoding/base64" "fmt" "github.com/apache/incubator-devlake/core/errors" + "github.com/apache/incubator-devlake/core/plugin" "github.com/go-git/go-git/v5/plumbing/protocol/packp/capability" "github.com/go-git/go-git/v5/plumbing/transport" - "github.com/go-git/go-git/v5/plumbing/transport/client" - "net" - "net/http" - "os" - "strings" gogit "github.com/go-git/go-git/v5" + "github.com/go-git/go-git/v5/plumbing/transport/client" githttp "github.com/go-git/go-git/v5/plumbing/transport/http" "github.com/go-git/go-git/v5/plumbing/transport/ssh" + "github.com/spf13/cast" ssh2 "golang.org/x/crypto/ssh" + "net" + "net/http" neturl "net/url" + "os" + "regexp" + "strings" + "time" ) // We have done comparison experiments for git2go and go-git, and the results show that git2go has better performance. @@ -43,7 +48,7 @@ import ( const DefaultUser = "git" -func cloneOverSSH(ctx context.Context, url, dir, passphrase string, pk []byte) errors.Error { +func cloneOverSSH(ctx plugin.SubTaskContext, url, dir, passphrase string, pk []byte) errors.Error { key, err := ssh.NewPublicKeys(DefaultUser, pk, passphrase) if err != nil { return errors.Convert(err) @@ -53,19 +58,32 @@ func cloneOverSSH(ctx context.Context, url, dir, passphrase string, pk []byte) e return nil }, } - _, err = gogit.PlainCloneContext(ctx, dir, true, &gogit.CloneOptions{ - URL: url, - Auth: key, + var data []byte + buf := bytes.NewBuffer(data) + done := make(chan struct{}, 1) + go refreshCloneProgress(ctx, done, buf) + _, err = gogit.PlainCloneContext(ctx.GetContext(), dir, true, &gogit.CloneOptions{ + URL: url, + Auth: key, + Progress: buf, }) + done <- struct{}{} if err != nil { return errors.Convert(err) } return nil } -func (l *GitRepoCreator) CloneOverHTTP(ctx context.Context, repoId, url, user, password, proxy string) (*GitRepo, errors.Error) { +func (l *GitRepoCreator) CloneOverHTTP(ctx plugin.SubTaskContext, repoId, url, user, password, proxy string) (*GitRepo, errors.Error) { return withTempDirectory(func(dir string) (*GitRepo, error) { - cloneOptions := &gogit.CloneOptions{URL: url} + var data []byte + buf := bytes.NewBuffer(data) + done := make(chan struct{}, 1) + go refreshCloneProgress(ctx, done, buf) + cloneOptions := &gogit.CloneOptions{ + URL: url, + Progress: buf, + } if proxy != "" { proxyUrl, err := neturl.Parse(proxy) if err != nil { @@ -93,14 +111,15 @@ func (l *GitRepoCreator) CloneOverHTTP(ctx context.Context, repoId, url, user, p } } // fmt.Printf("CloneOverHTTP clone opt: %+v\ndir: %v, repo: %v, id: %v, user: %v, passwd: %v, proxy: %v\n", cloneOptions, dir, url, repoId, user, password, proxy) - if isAzureRepo(ctx, url) { + if isAzureRepo(ctx.GetContext(), url) { // https://github.com/go-git/go-git/issues/64 // https://github.com/go-git/go-git/blob/master/_examples/azure_devops/main.go#L34 transport.UnsupportedCapabilities = []capability.Capability{ capability.ThinPack, } } - _, err := gogit.PlainCloneContext(ctx, dir, true, cloneOptions) + _, err := gogit.PlainCloneContext(ctx.GetContext(), dir, true, cloneOptions) + done <- struct{}{} if err != nil { l.logger.Error(err, "PlainCloneContext") return nil, err @@ -109,7 +128,7 @@ func (l *GitRepoCreator) CloneOverHTTP(ctx context.Context, repoId, url, user, p }) } -func (l *GitRepoCreator) CloneOverSSH(ctx context.Context, repoId, url, privateKey, passphrase string) (*GitRepo, errors.Error) { +func (l *GitRepoCreator) CloneOverSSH(ctx plugin.SubTaskContext, repoId, url, privateKey, passphrase string) (*GitRepo, errors.Error) { return withTempDirectory(func(dir string) (*GitRepo, error) { pk, err := base64.StdEncoding.DecodeString(privateKey) if err != nil { @@ -144,6 +163,45 @@ func withTempDirectory(f func(tempDir string) (*GitRepo, error)) (*GitRepo, erro return repo, errors.Convert(err) } +func setCloneProgress(subTaskCtx plugin.SubTaskContext, cloneProgressInfo string) { + if cloneProgressInfo == "" { + return + } + re, err := regexp.Compile(`\d+/\d+`) // find strings like 12/123. + if err != nil { + panic(err) + } + progress := re.FindAllString(cloneProgressInfo, -1) + lenProgress := len(progress) + if lenProgress == 0 { + return + } + latestProgress := progress[lenProgress-1] + latestProgressInfo := strings.Split(latestProgress, "/") + if len(latestProgressInfo) == 2 { + step := latestProgressInfo[0] + total := latestProgressInfo[1] + subTaskCtx.SetProgress(cast.ToInt(step), cast.ToInt(total)) + } +} + +func refreshCloneProgress(subTaskCtx plugin.SubTaskContext, done chan struct{}, buf *bytes.Buffer) { + ticker := time.NewTicker(time.Second * 1) + func() { + for { + select { + case <-done: + return + case <-ticker.C: + if buf != nil { + cloneProgressInfo := buf.String() + setCloneProgress(subTaskCtx, cloneProgressInfo) + } + } + } + }() +} + func isAzureRepo(ctx context.Context, repoUrl string) bool { return strings.Contains(repoUrl, "dev.azure.com") } diff --git a/backend/plugins/gitextractor/parser/clone_test.go b/backend/plugins/gitextractor/parser/clone_test.go new file mode 100644 index 00000000000..562998f3951 --- /dev/null +++ b/backend/plugins/gitextractor/parser/clone_test.go @@ -0,0 +1,129 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package parser + +import ( + gocontext "context" + "fmt" + "github.com/apache/incubator-devlake/core/config" + "github.com/apache/incubator-devlake/core/context" + "github.com/apache/incubator-devlake/core/dal" + "github.com/apache/incubator-devlake/core/log" + "github.com/apache/incubator-devlake/core/plugin" + "testing" +) + +func Test_setCloneProgress(t *testing.T) { + type args struct { + subTaskCtx plugin.SubTaskContext + cloneProgressInfo string + } + tests := []struct { + name string + args args + }{ + { + name: "test-0", + args: args{ + subTaskCtx: testSubTaskContext{}, + cloneProgressInfo: ` + Enumerating objects: 103, done. + Counting objects: 100% (103/103), done. + Compressing objects: 100% (81/81), done. + `, + }, + }, + { + name: "test-1", + args: args{ + subTaskCtx: testSubTaskContext{}, + cloneProgressInfo: ` + Enumerating objects: 103, done. + Counting objects: 100% (103/103), done. + `, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + setCloneProgress(tt.args.subTaskCtx, tt.args.cloneProgressInfo) + }) + } +} + +type testSubTaskContext struct{} + +func (testSubTaskContext) GetConfigReader() config.ConfigReader { + //TODO implement me + panic("implement me") +} + +func (testSubTaskContext) GetConfig(name string) string { + //TODO implement me + panic("implement me") +} + +func (testSubTaskContext) GetLogger() log.Logger { + //TODO implement me + panic("implement me") +} + +func (testSubTaskContext) NestedLogger(name string) context.BasicRes { + //TODO implement me + panic("implement me") +} + +func (testSubTaskContext) ReplaceLogger(logger log.Logger) context.BasicRes { + //TODO implement me + panic("implement me") +} + +func (testSubTaskContext) GetDal() dal.Dal { + //TODO implement me + panic("implement me") +} + +func (testSubTaskContext) GetName() string { + //TODO implement me + panic("implement me") +} + +func (testSubTaskContext) GetContext() gocontext.Context { + //TODO implement me + panic("implement me") +} + +func (testSubTaskContext) GetData() interface{} { + //TODO implement me + panic("implement me") +} + +func (testSubTaskContext) SetProgress(current int, total int) { + //TODO implement me + fmt.Printf("set current: %d, total: %d\n", current, total) +} + +func (testSubTaskContext) IncProgress(quantity int) { + //TODO implement me + panic("implement me") +} + +func (testSubTaskContext) TaskContext() plugin.TaskContext { + //TODO implement me + panic("implement me") +} diff --git a/backend/plugins/gitextractor/tasks/clone.go b/backend/plugins/gitextractor/tasks/clone.go index 88c7694034e..87479c328e4 100644 --- a/backend/plugins/gitextractor/tasks/clone.go +++ b/backend/plugins/gitextractor/tasks/clone.go @@ -18,7 +18,6 @@ limitations under the License. package tasks import ( - "context" "fmt" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/log" @@ -45,7 +44,7 @@ func CloneGitRepo(subTaskCtx plugin.SubTaskContext) errors.Error { } op := taskData.Options storage := store.NewDatabase(subTaskCtx, op.RepoId) - repo, err := NewGitRepo(subTaskCtx.GetContext(), subTaskCtx.GetLogger(), storage, op) + repo, err := NewGitRepo(subTaskCtx, subTaskCtx.GetLogger(), storage, op) if err != nil { return err } @@ -55,7 +54,7 @@ func CloneGitRepo(subTaskCtx plugin.SubTaskContext) errors.Error { } // NewGitRepo create and return a new parser git repo -func NewGitRepo(ctx context.Context, logger log.Logger, storage models.Store, op *GitExtractorOptions) (*parser.GitRepo, errors.Error) { +func NewGitRepo(ctx plugin.SubTaskContext, logger log.Logger, storage models.Store, op *GitExtractorOptions) (*parser.GitRepo, errors.Error) { var err errors.Error var repo *parser.GitRepo p := parser.NewGitRepoCreator(storage, logger)