Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add foreground deletion and refactor deployer for cluster in bare-metal mode #106

Merged
merged 6 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 45 additions & 18 deletions pkg/cmd/gtctl/cluster/create/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/spf13/cobra"
Expand All @@ -27,6 +29,7 @@ import (
"github.com/GreptimeTeam/gtctl/pkg/cmd/gtctl/cluster/common"
"github.com/GreptimeTeam/gtctl/pkg/deployer"
"github.com/GreptimeTeam/gtctl/pkg/deployer/baremetal"
bmconfig "github.com/GreptimeTeam/gtctl/pkg/deployer/baremetal/config"
"github.com/GreptimeTeam/gtctl/pkg/deployer/k8s"
"github.com/GreptimeTeam/gtctl/pkg/logger"
"github.com/GreptimeTeam/gtctl/pkg/status"
Expand Down Expand Up @@ -75,11 +78,18 @@ func NewCreateClusterCommand(l logger.Logger) *cobra.Command {

var (
clusterName = args[0]

// TODO(zyy17): should use timeout context.
ctx = context.TODO()
ctx = context.Background()
cancel context.CancelFunc
)

if options.Timeout > 0 {
ctx, cancel = context.WithTimeout(ctx, time.Duration(options.Timeout)*time.Second)
defer cancel()
}

ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
defer stop()

clusterDeployer, err := newDeployer(l, clusterName, &options)
if err != nil {
return err
Expand Down Expand Up @@ -113,6 +123,12 @@ func NewCreateClusterCommand(l logger.Logger) *cobra.Command {
}

if err = deployGreptimeDBCluster(ctx, l, &options, spinner, clusterDeployer, clusterName); err != nil {
// Wait the cluster closing if deploy fails in bare-metal mode.
if options.BareMetal {
if err := waitChildProcess(ctx, clusterDeployer, true); err != nil {
return err
}
}
return err
}

Expand All @@ -121,20 +137,8 @@ func NewCreateClusterCommand(l logger.Logger) *cobra.Command {
}

if options.BareMetal {
d, ok := clusterDeployer.(*baremetal.Deployer)
if ok {
version := d.Config().Cluster.Artifact.Version
if version == "" {
version = "unknown"
}
fmt.Printf("\x1b[32m%s\x1b[0m", fmt.Sprintf("The cluster(pid=%d, version=%s) is running in bare-metal mode now...\n",
os.Getpid(), version))
fmt.Printf("\x1b[32m%s\x1b[0m", fmt.Sprintf("To view dashboard by accessing: %s\n", logger.Bold("http://localhost:4000/dashboard/")))

// Wait for all the child processes to exit.
if err := d.Wait(ctx); err != nil {
return err
}
if err := waitChildProcess(ctx, clusterDeployer, false); err != nil {
return err
}
}

Expand Down Expand Up @@ -183,7 +187,7 @@ func newDeployer(l logger.Logger, clusterName string, options *createClusterCliO
}

if options.Config != "" {
var config baremetal.Config
var config bmconfig.Config
data, err := os.ReadFile(options.Config)
if err != nil {
return nil, err
Expand Down Expand Up @@ -319,3 +323,26 @@ func printTips(l logger.Logger, clusterName string, options *createClusterCliOpt
l.V(0).Infof("\nThank you for using %s! Check for more information on %s. 😊", logger.Bold("GreptimeDB"), logger.Bold("https://greptime.com"))
l.V(0).Infof("\n%s 🔑", logger.Bold("Invest in Data, Harvest over Time."))
}

func waitChildProcess(ctx context.Context, deployer deployer.Interface, close bool) error {
d, ok := deployer.(*baremetal.Deployer)
zyy17 marked this conversation as resolved.
Show resolved Hide resolved
if ok {
v := d.Config().Cluster.Artifact.Version
if len(v) == 0 {
v = "unknown"
}

if !close {
fmt.Printf("\x1b[32m%s\x1b[0m", fmt.Sprintf("The cluster(pid=%d, version=%s) is running in bare-metal mode now...\n", os.Getpid(), v))
fmt.Printf("\x1b[32m%s\x1b[0m", fmt.Sprintf("To view dashboard by accessing: %s\n", logger.Bold("http://localhost:4000/dashboard/")))
} else {
fmt.Printf("\x1b[32m%s\x1b[0m", fmt.Sprintf("The cluster(pid=%d, version=%s) run in bare-metal has been deleted now...\n", os.Getpid(), v))
}

// Wait for all the child processes to exit.
if err := d.Wait(ctx); err != nil {
return err
}
}
return nil
}
28 changes: 18 additions & 10 deletions pkg/deployer/baremetal/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@ import (
"archive/zip"
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"path"
"path/filepath"
"runtime"
"strings"

"github.com/GreptimeTeam/gtctl/pkg/deployer/baremetal/config"
"github.com/GreptimeTeam/gtctl/pkg/logger"
"github.com/GreptimeTeam/gtctl/pkg/utils"
)
Expand Down Expand Up @@ -82,7 +83,7 @@ func NewArtifactManager(workingDir string, l logger.Logger, alwaysDownload bool)
}

// BinaryPath returns the path of the binary of the given type and version.
func (am *ArtifactManager) BinaryPath(typ ArtifactType, artifact *Artifact) (string, error) {
func (am *ArtifactManager) BinaryPath(typ ArtifactType, artifact *config.Artifact) (string, error) {
if artifact.Local != "" {
return artifact.Local, nil
}
Expand All @@ -95,7 +96,7 @@ func (am *ArtifactManager) BinaryPath(typ ArtifactType, artifact *Artifact) (str
}

// PrepareArtifact will download the artifact from the given URL and uncompressed it.
func (am *ArtifactManager) PrepareArtifact(typ ArtifactType, artifact *Artifact) error {
func (am *ArtifactManager) PrepareArtifact(ctx context.Context, typ ArtifactType, artifact *config.Artifact) error {
// If you use the local artifact, we don't need to download it.
if artifact.Local != "" {
return nil
Expand All @@ -106,7 +107,7 @@ func (am *ArtifactManager) PrepareArtifact(typ ArtifactType, artifact *Artifact)
binDir = path.Join(am.dir, typ.String(), artifact.Version, "bin")
)

artifactFile, err := am.download(typ, artifact.Version, pkgDir)
artifactFile, err := am.download(ctx, typ, artifact.Version, pkgDir)
if err != nil {
return err
}
Expand Down Expand Up @@ -186,7 +187,7 @@ func (am *ArtifactManager) installGreptime(artifactFile, binDir string) error {
return nil
}

func (am *ArtifactManager) download(typ ArtifactType, version, pkgDir string) (string, error) {
func (am *ArtifactManager) download(ctx context.Context, typ ArtifactType, version, pkgDir string) (string, error) {
var extension string
switch runtime.GOOS {
case GOOSDarwin:
Expand Down Expand Up @@ -223,7 +224,7 @@ func (am *ArtifactManager) download(typ ArtifactType, version, pkgDir string) (s

am.logger.V(3).Infof("Downloading artifact from '%s' to '%s'", downloadURL, artifactFile)

req, err := http.NewRequest(http.MethodGet, downloadURL, nil)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, downloadURL, nil)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -325,15 +326,20 @@ func (am *ArtifactManager) unzip(file, dst string) error {
return err
}

dstFile.Close()
fileInArchive.Close()
if err := dstFile.Close(); err != nil {
return err
}

if err := fileInArchive.Close(); err != nil {
return err
}
}

return nil
}

func (am *ArtifactManager) untar(file, dst string) error {
data, err := ioutil.ReadFile(file)
data, err := os.ReadFile(file)
if err != nil {
return err
}
Expand Down Expand Up @@ -365,7 +371,9 @@ func (am *ArtifactManager) untar(file, dst string) error {
if _, err := io.Copy(outFile, tarReader); err != nil {
return err
}
outFile.Close()
if err := outFile.Close(); err != nil {
return err
}
case tar.TypeDir:
if err := os.Mkdir(dst+"/"+header.Name, 0755); err != nil {
return err
Expand Down
120 changes: 120 additions & 0 deletions pkg/deployer/baremetal/component/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package component

import (
"bufio"
"context"
"os"
"os/exec"
"path"
"strconv"
"sync"
"syscall"

"github.com/GreptimeTeam/gtctl/pkg/deployer/baremetal/config"
"github.com/GreptimeTeam/gtctl/pkg/logger"
)

// WorkDirs include all the dirs used in bare-metal mode.
type WorkDirs struct {
DataDir string
LogsDir string
PidsDir string
}

// BareMetalCluster describes all the components need to be deployed under bare-metal mode.
type BareMetalCluster struct {
MetaSrv BareMetalClusterComponent
Datanode BareMetalClusterComponent
Frontend BareMetalClusterComponent
Etcd BareMetalClusterComponent
}

// BareMetalClusterComponent is the basic unit of running GreptimeDB Cluster in bare-metal mode.
type BareMetalClusterComponent interface {
// Start starts cluster component by executing binary.
Start(ctx context.Context, binary string) error

// BuildArgs build up args for cluster component.
BuildArgs(ctx context.Context, params ...interface{}) []string

// IsRunning returns the status of current cluster component.
IsRunning(ctx context.Context) bool

// Delete deletes resources that allocated in the system for current component.
Delete(ctx context.Context) error
}

func NewGreptimeDBCluster(config *config.Cluster, workDirs WorkDirs, wg *sync.WaitGroup, logger logger.Logger) *BareMetalCluster {
return &BareMetalCluster{
MetaSrv: newMetaSrv(config.MetaSrv, workDirs, wg, logger),
Datanode: newDataNodes(config.Datanode, config.MetaSrv.ServerAddr, workDirs, wg, logger),
Frontend: newFrontend(config.Frontend, config.MetaSrv.ServerAddr, workDirs, wg, logger),
Etcd: newEtcd(workDirs, wg, logger),
}
}

func runBinary(ctx context.Context, binary string, args []string, logDir string, pidDir string,
wg *sync.WaitGroup, logger logger.Logger) error {
cmd := exec.CommandContext(ctx, binary, args...)

// output to binary.
logFile := path.Join(logDir, "log")
outputFile, err := os.Create(logFile)
if err != nil {
return err
}

outputFileWriter := bufio.NewWriter(outputFile)
cmd.Stdout = outputFileWriter
cmd.Stderr = outputFileWriter

logger.V(3).Infof("run binary '%s' with args: '%v', log: '%s', pid: '%s'", binary, args, logDir, pidDir)

if err := cmd.Start(); err != nil {
return err
}

pidFile := path.Join(pidDir, "pid")
f, err := os.Create(pidFile)
if err != nil {
return err
}

_, err = f.Write([]byte(strconv.Itoa(cmd.Process.Pid)))
if err != nil {
return err
}

go func() {
defer wg.Done()
wg.Add(1)
if err := cmd.Wait(); err != nil {
// Caught signal kill and interrupt error then ignore.
if exit, ok := err.(*exec.ExitError); ok {
if status, ok := exit.Sys().(syscall.WaitStatus); ok {
if status.Signaled() &&
(status.Signal() == syscall.SIGKILL || status.Signal() == syscall.SIGINT) {
return
}
}
}
logger.Errorf("binary '%s' exited with error: %v", binary, err)
}
}()

return nil
}
Loading
Loading