diff --git a/cicd/sync/Vagrantfile b/cicd/sync/Vagrantfile new file mode 100644 index 000000000..d097a9fc2 --- /dev/null +++ b/cicd/sync/Vagrantfile @@ -0,0 +1,48 @@ +# -*- mode: ruby -*- +# vi: set ft=ruby : + +loxilbs = (ENV['LOXILBS'] || "2").to_i +eps = (ENV['LOXILBS'] || "1").to_i +box_name = (ENV['VAGRANT_BOX'] || "sysnet4admin/Ubuntu-k8s") +box_version = "0.7.1" +Vagrant.configure("2") do |config| + config.vm.box = "#{box_name}" + config.vm.box_version = "#{box_version}" + + (1..loxilbs).each do |node_number| + config.vm.define "llb#{node_number}" do |loxilb| + loxilb.vm.hostname = "llb#{node_number}" + ip = node_number + 10 + loxilb.vm.network :private_network, ip: "192.168.80.#{ip}", :netmask => "255.255.255.0" + loxilb.vm.network :private_network, ip: "192.168.90.#{ip}", :netmask => "255.255.255.0" + loxilb.vm.provision :shell, :path => "loxilb.sh" + loxilb.vm.provider :virtualbox do |vbox| + vbox.customize ["modifyvm", :id, "--memory", 6000] + vbox.customize ["modifyvm", :id, "--cpus", 4] + end + end + end + + config.vm.define "client" do |client| + client.vm.hostname = 'client' + client.vm.network :private_network, ip: "192.168.80.100", :netmask => "255.255.255.0" + client.vm.provision :shell, :path => "client.sh" + client.vm.provider :virtualbox do |vbox| + vbox.customize ["modifyvm", :id, "--memory", 6000] + vbox.customize ["modifyvm", :id, "--cpus", 4] + end + end + + (1..eps).each do |node_number| + config.vm.define "ep#{node_number}" do |ep| + ep.vm.hostname = "ep#{node_number}" + ip = node_number + 100 + ep.vm.network :private_network, ip: "192.168.90.#{ip}", :netmask => "255.255.255.0" + ep.vm.provision :shell, :path => "ep.sh" + ep.vm.provider :virtualbox do |vbox| + vbox.customize ["modifyvm", :id, "--memory", 6000] + vbox.customize ["modifyvm", :id, "--cpus", 4] + end + end + end +end diff --git a/cicd/sync/client.sh b/cicd/sync/client.sh new file mode 100644 index 000000000..8eea07ef0 --- /dev/null +++ b/cicd/sync/client.sh @@ -0,0 +1,3 @@ +sudo apt -y install autoconf automake libtool bison flex gcc ncurses-dev +git clone https://github.com/satori-com/tcpkali.git +sudo ip route add 20.20.20.1 via 192.168.80.11 diff --git a/cicd/sync/config.sh b/cicd/sync/config.sh new file mode 100755 index 000000000..6b8ee48ef --- /dev/null +++ b/cicd/sync/config.sh @@ -0,0 +1,3 @@ +#!/bin/bash +vagrant global-status | grep -i virtualbox | cut -f 1 -d ' ' | xargs -L 1 vagrant destroy -f +vagrant up diff --git a/cicd/sync/ep.sh b/cicd/sync/ep.sh new file mode 100644 index 000000000..e838b8cc4 --- /dev/null +++ b/cicd/sync/ep.sh @@ -0,0 +1,4 @@ +sudo ip route add 20.20.20.1 via 192.168.90.11 +sudo ip route add 192.168.80.0/24 via 192.168.90.11 +sudo apt -y install autoconf automake libtool bison flex gcc ncurses-dev +git clone https://github.com/satori-com/tcpkali.git diff --git a/cicd/sync/loxilb.sh b/cicd/sync/loxilb.sh new file mode 100644 index 000000000..02b7d1501 --- /dev/null +++ b/cicd/sync/loxilb.sh @@ -0,0 +1,17 @@ +export LOXILB_IP=$(ip a |grep global | grep -v '10.0.2.15' | grep -v '192.168.80' | awk '{print $2}' | cut -f1 -d '/') + +apt-get update +apt-get install -y software-properties-common +curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add - +add-apt-repository -y "deb [arch=amd64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" +apt-get install -y docker-ce +docker run -u root --cap-add SYS_ADMIN --restart unless-stopped --privileged --entrypoint /bin/bash -dit -v /dev/log:/dev/log --net=host --name loxilb ghcr.io/loxilb-io/loxilb:latest + +docker exec -it loxilb apt update +docker exec -it loxilb apt -y install clang-10 llvm libelf-dev gcc-multilib libpcap-dev linux-tools-$(uname -r) elfutils dwarves git libbsd-dev bridge-utils unzip build-essential bison flex iperf iproute2 nodejs socat ethtool +docker exec -it loxilb git clone https://github.com/loxilb-io/loxilb.git --recurse-submodules + +echo alias loxicmd=\"sudo docker exec -it loxilb loxicmd\" >> ~/.bashrc +echo alias loxilb=\"sudo docker exec -it loxilb \" >> ~/.bashrc + +echo $LOXILB_IP > /vagrant/loxilb-$(hostname) diff --git a/cicd/sync/rmconfig.sh b/cicd/sync/rmconfig.sh new file mode 100755 index 000000000..3fe1be339 --- /dev/null +++ b/cicd/sync/rmconfig.sh @@ -0,0 +1,5 @@ +#!/bin/bash +vagrant destroy -f ep1 +vagrant destroy -f client +vagrant destroy -f llb1 +vagrant destroy -f llb2 diff --git a/cicd/sync/validation.sh b/cicd/sync/validation.sh new file mode 100755 index 000000000..a8fc56fef --- /dev/null +++ b/cicd/sync/validation.sh @@ -0,0 +1,49 @@ +#!/bin/bash +source ../common.sh +echo k3s-cilium-cluster + +if [ "$1" ]; then + KUBECONFIG="$1" +fi + +# Set space as the delimiter +IFS=' ' + +sleep 45 +extIP="123.123.123.1" +echo $extIP + +echo "Service Info" +vagrant ssh master -c 'sudo kubectl get svc' +echo "LB Info" +vagrant ssh loxilb -c 'sudo docker exec -i loxilb loxicmd get lb -o wide' +echo "EP Info" +vagrant ssh loxilb -c 'sudo docker exec -i loxilb loxicmd get ep -o wide' + +print_debug_info() { + echo "llb1 route-info" + vagrant ssh loxilb -c 'ip route' + vagrant ssh master -c 'sudo kubectl get pods -A' + vagrant ssh master -c 'sudo kubectl get svc' + vagrant ssh master -c 'sudo kubectl get nodes' +} + +out=$(curl -s --connect-timeout 10 http://$extIP:55002) +if [[ ${out} == *"Welcome to nginx"* ]]; then + echo "k3s-cilium-cluster (kube-loxilb) tcp [OK]" +else + echo "k3s-cilium-cluster (kube-loxilb) tcp [FAILED]" + print_debug_info + exit 1 +fi + +out=$(timeout 10 ../common/udp_client $extIP 55003) +if [[ ${out} == *"Client"* ]]; then + echo "k3s-cilium-cluster (kube-loxilb) udp [OK]" +else + echo "k3s-cilium-cluster (kube-loxilb) udp [FAILED]" + print_debug_info + exit 1 +fi + +exit diff --git a/loxinet/dpbroker.go b/loxinet/dpbroker.go index 7a46170b2..3524cc6c0 100644 --- a/loxinet/dpbroker.go +++ b/loxinet/dpbroker.go @@ -17,13 +17,8 @@ package loxinet import ( - "bufio" - "errors" "fmt" - "io" "net" - "net/http" - "net/rpc" "runtime/debug" "sync" "time" @@ -294,28 +289,40 @@ type NatDpWorkQ struct { // DpCtInfo - representation of a datapath conntrack information type DpCtInfo struct { - DIP net.IP - SIP net.IP - Dport uint16 - Sport uint16 - Proto string - CState string - CAct string - CI string - Packets uint64 - Bytes uint64 - Deleted int - PKey []byte - PVal []byte - LTs time.Time - NTs time.Time - XSync bool + DIP net.IP `json:"dip"` + SIP net.IP `json:"sip"` + Dport uint16 `json:"dport"` + Sport uint16 `json:"sport"` + Proto string `json:"proto"` + CState string `json:"cstate"` + CAct string `json:"cact"` + CI string `json:"ci"` + Packets uint64 `json:"packets"` + Bytes uint64 `json:"bytes"` + Deleted int `json:"deleted"` + PKey []byte `json:"pkey"` + PVal []byte `json:"pval"` + LTs time.Time `json:"lts"` + NTs time.Time `json:"nts"` + XSync bool `json:"xsync"` // LB Association Data - ServiceIP net.IP - ServProto string - L4ServPort uint16 - BlockNum uint16 + ServiceIP net.IP `json:"serviceip"` + ServProto string `json:"servproto"` + L4ServPort uint16 `json:"l4servproto"` + BlockNum uint16 `json:"blocknum"` +} + +const ( + RPCTypeNetRPC = iota + RPCTypeGRPC +) + +type RPCHookInterface interface { + RPCConnect(*DpPeer) int + RPCClose(*DpPeer) int + RPCReset(*DpPeer) int + RPCSend(*DpPeer, string, any) (int, error) } // XSync - Remote sync peer information @@ -323,6 +330,8 @@ type XSync struct { RemoteID int RPCState bool // For peer to peer RPC + RPCType int + RPCHooks RPCHookInterface } // UlClDpWorkQ - work queue entry for ul-cl filter related operation @@ -403,8 +412,9 @@ type DpHookInterface interface { // DpPeer - Remote DP Peer information type DpPeer struct { - Peer net.IP - Client *rpc.Client + Peer net.IP + //Client *rpc.Client + Client interface{} } // DpH - datapath context container @@ -419,55 +429,13 @@ type DpH struct { Remotes []XSync } -// dialHTTPPath connects to an HTTP RPC server -// at the specified network address and path. -// This is based on rpc package's DialHTTPPath but with added timeout -func dialHTTPPath(network, address, path string) (*rpc.Client, error) { - var connected = "200 Connected to Go RPC" - timeOut := 2 * time.Second - - conn, err := net.DialTimeout(network, address, timeOut) - if err != nil { - return nil, err - } - io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n") - - // Require successful HTTP response - // before switching to RPC protocol. - resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"}) - if err == nil && resp.Status == connected { - return rpc.NewClient(conn), nil - } - if err == nil { - err = errors.New("unexpected HTTP response: " + resp.Status) - } - conn.Close() - return nil, &net.OpError{ - Op: "dial-http", - Net: network + " " + address, - Addr: nil, - Err: err, - } -} - // DpXsyncRPCReset - Routine to reset Sunc RPC Client connections func (dp *DpH) DpXsyncRPCReset() int { dp.SyncMtx.Lock() defer dp.SyncMtx.Unlock() for idx := range mh.dp.Peers { pe := &mh.dp.Peers[idx] - if pe.Client != nil { - pe.Client.Close() - pe.Client = nil - } - if pe.Client == nil { - cStr := fmt.Sprintf("%s:%d", pe.Peer.String(), XSyncPort) - pe.Client, _ = dialHTTPPath("tcp", cStr, rpc.DefaultRPCPath) - if pe.Client == nil { - return -1 - } - tk.LogIt(tk.LogInfo, "XSync RPC - %s :Reset\n", cStr) - } + dp.RPC.RPCHooks.RPCReset(pe) } return 0 } @@ -477,10 +445,7 @@ func (dp *DpH) DpXsyncInSync() bool { dp.SyncMtx.Lock() defer dp.SyncMtx.Unlock() - if len(dp.Remotes) >= len(mh.has.NodeMap) { - return true - } - return false + return len(dp.Remotes) >= len(mh.has.NodeMap) } // WaitXsyncReady - Routine to wait till it ready for syncing the peer entity @@ -500,8 +465,9 @@ func (dp *DpH) WaitXsyncReady(who string) { // DpXsyncRPC - Routine for syncing connection information with peers func (dp *DpH) DpXsyncRPC(op DpSyncOpT, arg interface{}) int { - var reply int - timeout := 2 * time.Second + var reply,ret int + var err error + dp.SyncMtx.Lock() defer dp.SyncMtx.Unlock() @@ -525,15 +491,11 @@ func (dp *DpH) DpXsyncRPC(op DpSyncOpT, arg interface{}) int { restartRPC: pe := &mh.dp.Peers[idx] if pe.Client == nil { - cStr := fmt.Sprintf("%s:%d", pe.Peer.String(), XSyncPort) - var err error - pe.Client, err = dialHTTPPath("tcp", cStr, rpc.DefaultRPCPath) - if pe.Client == nil { - tk.LogIt(tk.LogInfo, "XSync RPC - %s :Fail(%s)\n", cStr, err) + ret = dp.RPC.RPCHooks.RPCConnect(pe) + if ret != 0 { rpcErr = true continue } - tk.LogIt(tk.LogInfo, "XSync RPC - %s :Connected\n", cStr) } reply = 0 @@ -556,7 +518,6 @@ func (dp *DpH) DpXsyncRPC(op DpSyncOpT, arg interface{}) int { return -1 } - var call *rpc.Call if op == DpSyncAdd || op == DpSyncDelete || op == DpSyncBcast { if op != DpSyncBcast { if cti == nil && len(blkCti) <= 0 { @@ -576,34 +537,27 @@ func (dp *DpH) DpXsyncRPC(op DpSyncOpT, arg interface{}) int { } } if cti != nil { - call = pe.Client.Go(rpcCallStr, *cti, &reply, make(chan *rpc.Call, 1)) + reply, err = dp.RPC.RPCHooks.RPCSend(pe, rpcCallStr, *cti) } else { - call = pe.Client.Go(rpcCallStr, blkCti, &reply, make(chan *rpc.Call, 1)) + reply, err = dp.RPC.RPCHooks.RPCSend(pe, rpcCallStr, blkCti) } } else { async := 1 - call = pe.Client.Go(rpcCallStr, async, &reply, make(chan *rpc.Call, 1)) + reply, err = dp.RPC.RPCHooks.RPCSend(pe, rpcCallStr, int32(async)) } - select { - case <-time.After(timeout): - tk.LogIt(tk.LogError, "rpc call timeout(%v)\n", timeout) - if pe.Client != nil { - pe.Client.Close() - } + + if err != nil { + tk.LogIt(tk.LogError, "XSync call failed(%s)\n", err) + rpcErr = true pe.Client = nil rpcRetries++ if rpcRetries < 2 { goto restartRPC } + } + if reply != 0 { + tk.LogIt(tk.LogError, "Xsync server returned error (%d)\n", reply) rpcErr = true - case resp := <-call.Done: - if resp != nil && resp.Error != nil { - tk.LogIt(tk.LogError, "rpc call failed(%s)\n", resp.Error) - rpcErr = true - } - if reply != 0 { - rpcErr = true - } } } @@ -614,7 +568,7 @@ func (dp *DpH) DpXsyncRPC(op DpSyncOpT, arg interface{}) int { } // DpBrokerInit - initialize the DP broker subsystem -func DpBrokerInit(dph DpHookInterface) *DpH { +func DpBrokerInit(dph DpHookInterface, rpcMode int) *DpH { nDp := new(DpH) nDp.ToDpCh = make(chan interface{}, DpWorkQLen) @@ -623,108 +577,16 @@ func DpBrokerInit(dph DpHookInterface) *DpH { nDp.DpHooks = dph nDp.RPC = new(XSync) - go DpWorker(nDp, nDp.ToFinCh, nDp.ToDpCh) - - return nDp -} - -// DpWorkOnBlockCtAdd - Add block CT entries from remote -func (xs *XSync) DpWorkOnBlockCtAdd(blockCtis []DpCtInfo, ret *int) error { - if !mh.ready { - return errors.New("Not-Ready") + nDp.RPC.RPCType = rpcMode + if (rpcMode == RPCTypeNetRPC) { + nDp.RPC.RPCHooks = &netRPCClient{} + } else { + nDp.RPC.RPCHooks = &gRPCClient{} } - *ret = 0 - for _, cti := range blockCtis { - - tk.LogIt(tk.LogDebug, "RPC - Block CT Add %s\n", cti.Key()) - r := mh.dp.DpHooks.DpCtAdd(&cti) - if r != 0 { - *ret = r - } - } - - return nil -} - -// DpWorkOnCtAdd - Add a CT entry from remote -func (xs *XSync) DpWorkOnCtAdd(cti DpCtInfo, ret *int) error { - if !mh.ready { - return errors.New("Not-Ready") - } - - if cti.Proto == "xsync" { - mh.dp.SyncMtx.Lock() - defer mh.dp.SyncMtx.Unlock() - - for idx := range mh.dp.Remotes { - r := &mh.dp.Remotes[idx] - if r.RemoteID == int(cti.Sport) { - r.RPCState = true - *ret = 0 - return nil - } - } - - r := XSync{RemoteID: int(cti.Sport), RPCState: true} - mh.dp.Remotes = append(mh.dp.Remotes, r) - - tk.LogIt(tk.LogDebug, "RPC - CT Xsync Remote-%v\n", cti.Sport) - - *ret = 0 - return nil - } - - tk.LogIt(tk.LogDebug, "RPC - CT Add %s\n", cti.Key()) - r := mh.dp.DpHooks.DpCtAdd(&cti) - *ret = r - return nil -} - -// DpWorkOnBlockCtDelete - Add block CT entries from remote -func (xs *XSync) DpWorkOnBlockCtDelete(blockCtis []DpCtInfo, ret *int) error { - if !mh.ready { - return errors.New("Not-Ready") - } - - *ret = 0 - for _, cti := range blockCtis { - - tk.LogIt(tk.LogDebug, "RPC - Block CT Del %s\n", cti.Key()) - r := mh.dp.DpHooks.DpCtDel(&cti) - if r != 0 { - *ret = r - } - } - - return nil -} - -// DpWorkOnCtDelete - Delete a CT entry from remote -func (xs *XSync) DpWorkOnCtDelete(cti DpCtInfo, ret *int) error { - if !mh.ready { - return errors.New("Not-Ready") - } - tk.LogIt(tk.LogDebug, "RPC - CT Del %s\n", cti.Key()) - r := mh.dp.DpHooks.DpCtDel(&cti) - *ret = r - return nil -} - -// DpWorkOnCtGet - Get all CT entries asynchronously -func (xs *XSync) DpWorkOnCtGet(async int, ret *int) error { - if !mh.ready { - return errors.New("Not-Ready") - } - - // Most likely need to reset reverse rpc channel - mh.dp.DpXsyncRPCReset() - - tk.LogIt(tk.LogDebug, "RPC - CT Get %d\n", async) - mh.dp.DpHooks.DpCtGetAsync() - *ret = 0 + go DpWorker(nDp, nDp.ToFinCh, nDp.ToDpCh) - return nil + return nDp } // DpWorkOnPort - routine to work on a port work queue request @@ -865,7 +727,7 @@ func (dp *DpH) DpWorkOnPeerOp(pWq *PeerDpWorkQ) DpRetT { pe := &dp.Peers[idx] if pe.Peer.Equal(pWq.PeerIP) { if pe.Client != nil { - pe.Client.Close() + dp.RPC.RPCHooks.RPCClose(pe) } dp.Peers = append(dp.Peers[:idx], dp.Peers[idx+1:]...) tk.LogIt(tk.LogInfo, "Deleted cluster-peer %s\n", pWq.PeerIP.String()) diff --git a/loxinet/dpebpf_linux.go b/loxinet/dpebpf_linux.go index ef521166b..bf7c3397a 100644 --- a/loxinet/dpebpf_linux.go +++ b/loxinet/dpebpf_linux.go @@ -92,7 +92,7 @@ const ( const ( DpEbpfLinuxTiVal = 10 ctiDeleteSyncRetries = 3 - blkCtiMaxLen = 4096 + blkCtiMaxLen = 8192 mapNotifierChLen = 8096 mapNotifierWorkers = 1 ) @@ -187,7 +187,6 @@ func dpEbpfTicker() { // Age any entries related to Conntrack /* No need to fetch all stats in this fashion */ //C.llb_collect_map_stats(C.int(C.LL_DP_CT_STATS_MAP)) - /* Per entry stats will be fetched in C.ll_ct_map_ent_has_aged */ C.llb_age_map_entries(C.LL_DP_CT_MAP) C.llb_age_map_entries(C.LL_DP_FCV4_MAP) @@ -454,7 +453,7 @@ func (e *DpEbpfH) DpPortPropMod(w *PortDpWorkQ) int { lRet := e.loadEbpfPgm(w.LoadEbpf) if lRet != 0 { tk.LogIt(tk.LogError, "ebpf load - %d error\n", w.PortNum) - return EbpfErrEbpfLoad + syscall.Exit(1) } } data := new(intfMapDat) @@ -1829,7 +1828,7 @@ func dpCTMapChkUpdates() { } } - if len(blkCti) > blkCtiMaxLen { + if len(blkCti) >= blkCtiMaxLen { tk.LogIt(tk.LogDebug, "[CT] Block Add Sync - \n") tc1 := time.Now() mh.dp.DpXsyncRPC(DpSyncAdd, blkCti) @@ -1838,7 +1837,7 @@ func dpCTMapChkUpdates() { blkCti = nil } - if len(blkDelCti) > blkCtiMaxLen { + if len(blkDelCti) >= blkCtiMaxLen { tk.LogIt(tk.LogDebug, "[CT] Block Del Sync - \n") mh.dp.DpXsyncRPC(DpSyncDelete, blkDelCti) blkDelCti = nil @@ -1927,7 +1926,8 @@ func (e *DpEbpfH) DpCtAdd(w *DpCtInfo) int { cte.XSync = false cte.NTs = time.Now() - cte.LTs = cti.NTs + //cte.LTs = cti.NTs + cte.LTs = time.Now() ret := C.llb_add_map_elem(C.LL_DP_CT_MAP, unsafe.Pointer(&cti.PKey[0]), unsafe.Pointer(&cti.PVal[0])) if ret != 0 { diff --git a/loxinet/loxinet.go b/loxinet/loxinet.go index f17963d8a..0aeb187a9 100644 --- a/loxinet/loxinet.go +++ b/loxinet/loxinet.go @@ -25,11 +25,8 @@ import ( cmn "github.com/loxilb-io/loxilb/common" opts "github.com/loxilb-io/loxilb/options" tk "github.com/loxilb-io/loxilib" - "io" "net" - "net/http" _ "net/http/pprof" - "net/rpc" "os" "os/signal" "runtime/debug" @@ -80,42 +77,6 @@ type loxiNetH struct { pFile *os.File } -// LoxiXsyncMain - State Sync subsystem init -func LoxiXsyncMain() { - if opts.Opts.ClusterNodes == "none" { - return - } - - // Stack trace logger - defer func() { - if e := recover(); e != nil { - if mh.logger != nil { - tk.LogIt(tk.LogCritical, "%s: %s", e, debug.Stack()) - } - } - }() - - for { - rpcObj := new(XSync) - err := rpc.Register(rpcObj) - if err != nil { - panic("Failed to register rpc") - } - - rpc.HandleHTTP() - - http.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) { - io.WriteString(res, "loxilb-xsync\n") - }) - - listener := fmt.Sprintf(":%d", XSyncPort) - err = http.ListenAndServe(listener, nil) - if err != nil { - panic("Failed to rpc-listen") - } - } -} - // NodeWalker - an implementation of node walker interface func (mh *loxiNetH) NodeWalker(b string) { tk.LogIt(tk.LogDebug, "%s\n", b) @@ -203,6 +164,8 @@ func loxiNetTicker(bgpPeerMode bool) { var mh loxiNetH func loxiNetInit() { + var rpcMode int + spawnKa, kaMode := KAString2Mode(opts.Opts.Ka) clusterMode := false if opts.Opts.ClusterNodes != "none" { @@ -251,11 +214,16 @@ func loxiNetInit() { return } } + if opts.Opts.Rpc == "netrpc" { + rpcMode = RPCTypeNetRPC + } else { + rpcMode = RPCTypeGRPC + } if !opts.Opts.BgpPeerMode { // Initialize the ebpf datapath subsystem mh.dpEbpf = DpEbpfInit(clusterMode, mh.self, mh.rssEn, mh.eHooks, -1) - mh.dp = DpBrokerInit(mh.dpEbpf) + mh.dp = DpBrokerInit(mh.dpEbpf, rpcMode) // Initialize the security zone subsystem mh.zn = ZoneInit() diff --git a/loxinet/xsync.pb.go b/loxinet/xsync.pb.go new file mode 100644 index 000000000..5e0e20825 --- /dev/null +++ b/loxinet/xsync.pb.go @@ -0,0 +1,588 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.6.1 +// source: loxinet/xsync.proto + +package loxinet + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type XSyncReply struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Response int32 `protobuf:"varint,1,opt,name=response,proto3" json:"response,omitempty"` +} + +func (x *XSyncReply) Reset() { + *x = XSyncReply{} + if protoimpl.UnsafeEnabled { + mi := &file_loxinet_xsync_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *XSyncReply) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*XSyncReply) ProtoMessage() {} + +func (x *XSyncReply) ProtoReflect() protoreflect.Message { + mi := &file_loxinet_xsync_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use XSyncReply.ProtoReflect.Descriptor instead. +func (*XSyncReply) Descriptor() ([]byte, []int) { + return file_loxinet_xsync_proto_rawDescGZIP(), []int{0} +} + +func (x *XSyncReply) GetResponse() int32 { + if x != nil { + return x.Response + } + return 0 +} + +type ConnGet struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Async int32 `protobuf:"varint,1,opt,name=async,proto3" json:"async,omitempty"` +} + +func (x *ConnGet) Reset() { + *x = ConnGet{} + if protoimpl.UnsafeEnabled { + mi := &file_loxinet_xsync_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ConnGet) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ConnGet) ProtoMessage() {} + +func (x *ConnGet) ProtoReflect() protoreflect.Message { + mi := &file_loxinet_xsync_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ConnGet.ProtoReflect.Descriptor instead. +func (*ConnGet) Descriptor() ([]byte, []int) { + return file_loxinet_xsync_proto_rawDescGZIP(), []int{1} +} + +func (x *ConnGet) GetAsync() int32 { + if x != nil { + return x.Async + } + return 0 +} + +type CtInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Dip []byte `protobuf:"bytes,1,opt,name=dip,proto3" json:"dip,omitempty"` + Sip []byte `protobuf:"bytes,2,opt,name=sip,proto3" json:"sip,omitempty"` + Dport int32 `protobuf:"varint,3,opt,name=dport,proto3" json:"dport,omitempty"` + Sport int32 `protobuf:"varint,4,opt,name=sport,proto3" json:"sport,omitempty"` + Proto string `protobuf:"bytes,5,opt,name=proto,proto3" json:"proto,omitempty"` + Cstate string `protobuf:"bytes,6,opt,name=cstate,proto3" json:"cstate,omitempty"` + Cact string `protobuf:"bytes,7,opt,name=cact,proto3" json:"cact,omitempty"` + Ci string `protobuf:"bytes,8,opt,name=ci,proto3" json:"ci,omitempty"` + Packets int64 `protobuf:"varint,9,opt,name=packets,proto3" json:"packets,omitempty"` + Bytes int64 `protobuf:"varint,10,opt,name=bytes,proto3" json:"bytes,omitempty"` + Deleted int32 `protobuf:"varint,11,opt,name=deleted,proto3" json:"deleted,omitempty"` + Pkey []byte `protobuf:"bytes,12,opt,name=pkey,proto3" json:"pkey,omitempty"` + Pval []byte `protobuf:"bytes,13,opt,name=pval,proto3" json:"pval,omitempty"` + Xsync bool `protobuf:"varint,14,opt,name=xsync,proto3" json:"xsync,omitempty"` + Serviceip []byte `protobuf:"bytes,15,opt,name=serviceip,proto3" json:"serviceip,omitempty"` + Servproto string `protobuf:"bytes,16,opt,name=servproto,proto3" json:"servproto,omitempty"` + L4Servport int32 `protobuf:"varint,17,opt,name=l4servport,proto3" json:"l4servport,omitempty"` + Blocknum int32 `protobuf:"varint,18,opt,name=blocknum,proto3" json:"blocknum,omitempty"` +} + +func (x *CtInfo) Reset() { + *x = CtInfo{} + if protoimpl.UnsafeEnabled { + mi := &file_loxinet_xsync_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CtInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CtInfo) ProtoMessage() {} + +func (x *CtInfo) ProtoReflect() protoreflect.Message { + mi := &file_loxinet_xsync_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CtInfo.ProtoReflect.Descriptor instead. +func (*CtInfo) Descriptor() ([]byte, []int) { + return file_loxinet_xsync_proto_rawDescGZIP(), []int{2} +} + +func (x *CtInfo) GetDip() []byte { + if x != nil { + return x.Dip + } + return nil +} + +func (x *CtInfo) GetSip() []byte { + if x != nil { + return x.Sip + } + return nil +} + +func (x *CtInfo) GetDport() int32 { + if x != nil { + return x.Dport + } + return 0 +} + +func (x *CtInfo) GetSport() int32 { + if x != nil { + return x.Sport + } + return 0 +} + +func (x *CtInfo) GetProto() string { + if x != nil { + return x.Proto + } + return "" +} + +func (x *CtInfo) GetCstate() string { + if x != nil { + return x.Cstate + } + return "" +} + +func (x *CtInfo) GetCact() string { + if x != nil { + return x.Cact + } + return "" +} + +func (x *CtInfo) GetCi() string { + if x != nil { + return x.Ci + } + return "" +} + +func (x *CtInfo) GetPackets() int64 { + if x != nil { + return x.Packets + } + return 0 +} + +func (x *CtInfo) GetBytes() int64 { + if x != nil { + return x.Bytes + } + return 0 +} + +func (x *CtInfo) GetDeleted() int32 { + if x != nil { + return x.Deleted + } + return 0 +} + +func (x *CtInfo) GetPkey() []byte { + if x != nil { + return x.Pkey + } + return nil +} + +func (x *CtInfo) GetPval() []byte { + if x != nil { + return x.Pval + } + return nil +} + +func (x *CtInfo) GetXsync() bool { + if x != nil { + return x.Xsync + } + return false +} + +func (x *CtInfo) GetServiceip() []byte { + if x != nil { + return x.Serviceip + } + return nil +} + +func (x *CtInfo) GetServproto() string { + if x != nil { + return x.Servproto + } + return "" +} + +func (x *CtInfo) GetL4Servport() int32 { + if x != nil { + return x.L4Servport + } + return 0 +} + +func (x *CtInfo) GetBlocknum() int32 { + if x != nil { + return x.Blocknum + } + return 0 +} + +type CtInfoMod struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Add bool `protobuf:"varint,1,opt,name=add,proto3" json:"add,omitempty"` + Ct *CtInfo `protobuf:"bytes,2,opt,name=ct,proto3" json:"ct,omitempty"` +} + +func (x *CtInfoMod) Reset() { + *x = CtInfoMod{} + if protoimpl.UnsafeEnabled { + mi := &file_loxinet_xsync_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CtInfoMod) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CtInfoMod) ProtoMessage() {} + +func (x *CtInfoMod) ProtoReflect() protoreflect.Message { + mi := &file_loxinet_xsync_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CtInfoMod.ProtoReflect.Descriptor instead. +func (*CtInfoMod) Descriptor() ([]byte, []int) { + return file_loxinet_xsync_proto_rawDescGZIP(), []int{3} +} + +func (x *CtInfoMod) GetAdd() bool { + if x != nil { + return x.Add + } + return false +} + +func (x *CtInfoMod) GetCt() *CtInfo { + if x != nil { + return x.Ct + } + return nil +} + +type BlockCtInfoMod struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Add bool `protobuf:"varint,1,opt,name=add,proto3" json:"add,omitempty"` + Ct []*CtInfo `protobuf:"bytes,2,rep,name=ct,proto3" json:"ct,omitempty"` +} + +func (x *BlockCtInfoMod) Reset() { + *x = BlockCtInfoMod{} + if protoimpl.UnsafeEnabled { + mi := &file_loxinet_xsync_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *BlockCtInfoMod) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BlockCtInfoMod) ProtoMessage() {} + +func (x *BlockCtInfoMod) ProtoReflect() protoreflect.Message { + mi := &file_loxinet_xsync_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BlockCtInfoMod.ProtoReflect.Descriptor instead. +func (*BlockCtInfoMod) Descriptor() ([]byte, []int) { + return file_loxinet_xsync_proto_rawDescGZIP(), []int{4} +} + +func (x *BlockCtInfoMod) GetAdd() bool { + if x != nil { + return x.Add + } + return false +} + +func (x *BlockCtInfoMod) GetCt() []*CtInfo { + if x != nil { + return x.Ct + } + return nil +} + +var File_loxinet_xsync_proto protoreflect.FileDescriptor + +var file_loxinet_xsync_proto_rawDesc = []byte{ + 0x0a, 0x13, 0x6c, 0x6f, 0x78, 0x69, 0x6e, 0x65, 0x74, 0x2f, 0x78, 0x73, 0x79, 0x6e, 0x63, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x28, 0x0a, 0x0a, 0x58, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, + 0x70, 0x6c, 0x79, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, + 0x1f, 0x0a, 0x07, 0x43, 0x6f, 0x6e, 0x6e, 0x47, 0x65, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x61, 0x73, + 0x79, 0x6e, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x61, 0x73, 0x79, 0x6e, 0x63, + 0x22, 0xaa, 0x03, 0x0a, 0x06, 0x43, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x10, 0x0a, 0x03, 0x64, + 0x69, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x64, 0x69, 0x70, 0x12, 0x10, 0x0a, + 0x03, 0x73, 0x69, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x73, 0x69, 0x70, 0x12, + 0x14, 0x0a, 0x05, 0x64, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, + 0x64, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x12, 0x16, 0x0a, 0x06, 0x63, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x06, 0x63, 0x73, 0x74, 0x61, 0x74, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x61, 0x63, + 0x74, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x63, 0x61, 0x63, 0x74, 0x12, 0x0e, 0x0a, + 0x02, 0x63, 0x69, 0x18, 0x08, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x63, 0x69, 0x12, 0x18, 0x0a, + 0x07, 0x70, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x18, 0x09, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, + 0x70, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x62, 0x79, 0x74, 0x65, 0x73, + 0x18, 0x0a, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x62, 0x79, 0x74, 0x65, 0x73, 0x12, 0x18, 0x0a, + 0x07, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x05, 0x52, 0x07, + 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x6b, 0x65, 0x79, 0x18, + 0x0c, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x70, 0x6b, 0x65, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x70, + 0x76, 0x61, 0x6c, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x70, 0x76, 0x61, 0x6c, 0x12, + 0x14, 0x0a, 0x05, 0x78, 0x73, 0x79, 0x6e, 0x63, 0x18, 0x0e, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, + 0x78, 0x73, 0x79, 0x6e, 0x63, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, + 0x69, 0x70, 0x18, 0x0f, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, + 0x65, 0x69, 0x70, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x18, 0x10, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x73, 0x65, 0x72, 0x76, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x12, 0x1e, 0x0a, 0x0a, 0x6c, 0x34, 0x73, 0x65, 0x72, 0x76, 0x70, 0x6f, 0x72, 0x74, 0x18, + 0x11, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x6c, 0x34, 0x73, 0x65, 0x72, 0x76, 0x70, 0x6f, 0x72, + 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x6e, 0x75, 0x6d, 0x18, 0x12, 0x20, + 0x01, 0x28, 0x05, 0x52, 0x08, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x6e, 0x75, 0x6d, 0x22, 0x36, 0x0a, + 0x09, 0x43, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x4d, 0x6f, 0x64, 0x12, 0x10, 0x0a, 0x03, 0x61, 0x64, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x03, 0x61, 0x64, 0x64, 0x12, 0x17, 0x0a, 0x02, + 0x63, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x07, 0x2e, 0x43, 0x74, 0x49, 0x6e, 0x66, + 0x6f, 0x52, 0x02, 0x63, 0x74, 0x22, 0x3b, 0x0a, 0x0e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x43, 0x74, + 0x49, 0x6e, 0x66, 0x6f, 0x4d, 0x6f, 0x64, 0x12, 0x10, 0x0a, 0x03, 0x61, 0x64, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x03, 0x61, 0x64, 0x64, 0x12, 0x17, 0x0a, 0x02, 0x63, 0x74, 0x18, + 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x07, 0x2e, 0x43, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x02, + 0x63, 0x74, 0x32, 0x9f, 0x01, 0x0a, 0x05, 0x58, 0x53, 0x79, 0x6e, 0x63, 0x12, 0x2c, 0x0a, 0x11, + 0x44, 0x70, 0x57, 0x6f, 0x72, 0x6b, 0x4f, 0x6e, 0x43, 0x74, 0x47, 0x65, 0x74, 0x47, 0x52, 0x50, + 0x43, 0x12, 0x08, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x47, 0x65, 0x74, 0x1a, 0x0b, 0x2e, 0x58, 0x53, + 0x79, 0x6e, 0x63, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x2e, 0x0a, 0x11, 0x44, 0x70, + 0x57, 0x6f, 0x72, 0x6b, 0x4f, 0x6e, 0x43, 0x74, 0x4d, 0x6f, 0x64, 0x47, 0x52, 0x50, 0x43, 0x12, + 0x0a, 0x2e, 0x43, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x4d, 0x6f, 0x64, 0x1a, 0x0b, 0x2e, 0x58, 0x53, + 0x79, 0x6e, 0x63, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x38, 0x0a, 0x16, 0x44, 0x70, + 0x57, 0x6f, 0x72, 0x6b, 0x4f, 0x6e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x43, 0x74, 0x4d, 0x6f, 0x64, + 0x47, 0x52, 0x50, 0x43, 0x12, 0x0f, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x43, 0x74, 0x49, 0x6e, + 0x66, 0x6f, 0x4d, 0x6f, 0x64, 0x1a, 0x0b, 0x2e, 0x58, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x70, + 0x6c, 0x79, 0x22, 0x00, 0x42, 0x0b, 0x5a, 0x09, 0x2e, 0x2f, 0x6c, 0x6f, 0x78, 0x69, 0x6e, 0x65, + 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_loxinet_xsync_proto_rawDescOnce sync.Once + file_loxinet_xsync_proto_rawDescData = file_loxinet_xsync_proto_rawDesc +) + +func file_loxinet_xsync_proto_rawDescGZIP() []byte { + file_loxinet_xsync_proto_rawDescOnce.Do(func() { + file_loxinet_xsync_proto_rawDescData = protoimpl.X.CompressGZIP(file_loxinet_xsync_proto_rawDescData) + }) + return file_loxinet_xsync_proto_rawDescData +} + +var file_loxinet_xsync_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_loxinet_xsync_proto_goTypes = []interface{}{ + (*XSyncReply)(nil), // 0: XSyncReply + (*ConnGet)(nil), // 1: ConnGet + (*CtInfo)(nil), // 2: CtInfo + (*CtInfoMod)(nil), // 3: CtInfoMod + (*BlockCtInfoMod)(nil), // 4: BlockCtInfoMod +} +var file_loxinet_xsync_proto_depIdxs = []int32{ + 2, // 0: CtInfoMod.ct:type_name -> CtInfo + 2, // 1: BlockCtInfoMod.ct:type_name -> CtInfo + 1, // 2: XSync.DpWorkOnCtGetGRPC:input_type -> ConnGet + 3, // 3: XSync.DpWorkOnCtModGRPC:input_type -> CtInfoMod + 4, // 4: XSync.DpWorkOnBlockCtModGRPC:input_type -> BlockCtInfoMod + 0, // 5: XSync.DpWorkOnCtGetGRPC:output_type -> XSyncReply + 0, // 6: XSync.DpWorkOnCtModGRPC:output_type -> XSyncReply + 0, // 7: XSync.DpWorkOnBlockCtModGRPC:output_type -> XSyncReply + 5, // [5:8] is the sub-list for method output_type + 2, // [2:5] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_loxinet_xsync_proto_init() } +func file_loxinet_xsync_proto_init() { + if File_loxinet_xsync_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_loxinet_xsync_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*XSyncReply); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_loxinet_xsync_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ConnGet); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_loxinet_xsync_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CtInfo); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_loxinet_xsync_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CtInfoMod); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_loxinet_xsync_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*BlockCtInfoMod); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_loxinet_xsync_proto_rawDesc, + NumEnums: 0, + NumMessages: 5, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_loxinet_xsync_proto_goTypes, + DependencyIndexes: file_loxinet_xsync_proto_depIdxs, + MessageInfos: file_loxinet_xsync_proto_msgTypes, + }.Build() + File_loxinet_xsync_proto = out.File + file_loxinet_xsync_proto_rawDesc = nil + file_loxinet_xsync_proto_goTypes = nil + file_loxinet_xsync_proto_depIdxs = nil +} diff --git a/loxinet/xsync.proto b/loxinet/xsync.proto new file mode 100644 index 000000000..a92f06e48 --- /dev/null +++ b/loxinet/xsync.proto @@ -0,0 +1,49 @@ +syntax = "proto3"; +option go_package = "./loxinet"; + +message XSyncReply { + int32 response = 1; +} + +message ConnGet { + int32 async = 1; +} + +message CtInfo { + bytes dip = 1; + bytes sip = 2; + int32 dport = 3; + int32 sport = 4; + string proto = 5; + string cstate = 6; + string cact = 7; + string ci = 8; + int64 packets = 9; + int64 bytes = 10; + int32 deleted = 11; + bytes pkey = 12; + bytes pval = 13; + bool xsync = 14; + bytes serviceip = 15; + string servproto = 16; + int32 l4servport = 17; + int32 blocknum = 18; +} + +message CtInfoMod { + bool add = 1; + CtInfo ct = 2; +} + +message BlockCtInfoMod { + bool add = 1; + repeated CtInfo ct = 2; +} + +// The xsync service definition. +service XSync { + rpc DpWorkOnCtGetGRPC (ConnGet) returns (XSyncReply) {} + rpc DpWorkOnCtModGRPC (CtInfoMod) returns (XSyncReply) {} + rpc DpWorkOnBlockCtModGRPC (BlockCtInfoMod) returns (XSyncReply) {} + } + \ No newline at end of file diff --git a/loxinet/xsync_client.go b/loxinet/xsync_client.go new file mode 100644 index 000000000..32d7cc8fd --- /dev/null +++ b/loxinet/xsync_client.go @@ -0,0 +1,263 @@ +/* + * Copyright (c) 2022 NetLOX Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package loxinet + +import ( + "bufio" + "context" + "errors" + "fmt" + "io" + "net" + "net/http" + "net/rpc" + "time" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + tk "github.com/loxilb-io/loxilib" +) + +type netRPCClient struct { + client *rpc.Client +} + +type gRPCClient struct { + conn *grpc.ClientConn + xclient XSyncClient +} + +// dialHTTPPath connects to an HTTP RPC server +// at the specified network address and path. +// This is based on rpc package's DialHTTPPath but with added timeout +func dialHTTPPath(network, address, path string) (*rpc.Client, error) { + var connected = "200 Connected to Go RPC" + timeOut := 2 * time.Second + + conn, err := net.DialTimeout(network, address, timeOut) + if err != nil { + return nil, err + } + io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n") + + // Require successful HTTP response + // before switching to RPC protocol. + resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"}) + if err == nil && resp.Status == connected { + return rpc.NewClient(conn), nil + } + if err == nil { + err = errors.New("unexpected HTTP response: " + resp.Status) + } + conn.Close() + return nil, &net.OpError{ + Op: "dial-http", + Net: network + " " + address, + Addr: nil, + Err: err, + } +} + +func netRPCConnect(pe *DpPeer) int { + cStr := fmt.Sprintf("%s:%d", pe.Peer.String(), XSyncPort) + client, err := dialHTTPPath("tcp", cStr, rpc.DefaultRPCPath) + if client == nil || err != nil { + tk.LogIt(tk.LogInfo, "XSync netRPC Connect - %s :Fail(%s)\n", cStr, err) + pe.Client = nil + return -1 + } + pe.Client = client + tk.LogIt(tk.LogInfo, "XSync netRPC - %s :Connected\n", cStr) + return 0 +} + +func (*netRPCClient) RPCConnect(pe *DpPeer) int { + return netRPCConnect(pe) +} + +func (*netRPCClient) RPCReset(pe *DpPeer) int { + cStr := fmt.Sprintf("%s:%d", pe.Peer.String(), XSyncPort) + client, ok := pe.Client.(*rpc.Client) + if ok && client != nil { + client.Close() + pe.Client = nil + } + if pe.Client == nil { + tk.LogIt(tk.LogInfo, "XSync netRPC - %s :Reset\n", cStr) + return netRPCConnect(pe) + } + return 0 +} + +func (*netRPCClient) RPCClose(pe *DpPeer) int { + if pe.Client != nil { + pe.Client.(*rpc.Client).Close() + } + pe.Client = nil + return 0 +} + +func (*netRPCClient) RPCSend(pe *DpPeer, rpcCallStr string, args any) (int, error) { + var reply int + client, _ := pe.Client.(*rpc.Client) + timeout := 2 * time.Second + call := client.Go(rpcCallStr, args, &reply, make(chan *rpc.Call, 1)) + select { + case <-time.After(timeout): + tk.LogIt(tk.LogError, "netRPC call timeout(%v)\n", timeout) + if pe.Client != nil { + pe.Client.(*rpc.Client).Close() + } + pe.Client = nil + + return reply, errors.New("netrpc call timeout") + case resp := <-call.Done: + if resp != nil && resp.Error != nil { + tk.LogIt(tk.LogError, "netRPC send failed(%s)\n", resp.Error) + return reply, resp.Error + } + } + return reply, nil +} + +func gRPCConnect(pe *DpPeer) int { + var err error + var opts []grpc.DialOption + var cinfo gRPCClient + cStr := fmt.Sprintf("%s:%d", pe.Peer.String(), XSyncPort) + + timeOut := 2 * time.Second + + _, err = net.DialTimeout("tcp", cStr, timeOut) + if err != nil { + tk.LogIt(tk.LogInfo, "Failed to dial xsync pair(%s): %v\n", cStr, err) + return -1 + } + + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + cinfo.conn, err = grpc.Dial(cStr, opts...) + + if cinfo.conn == nil || err != nil { + tk.LogIt(tk.LogInfo, "Failed to dial xsync gRPC pair: %v\n", err) + return -1 + } + + cinfo.xclient = NewXSyncClient(cinfo.conn) + pe.Client = cinfo + tk.LogIt(tk.LogInfo, "XSync gRPC - %s :Connected\n", cStr) + return 0 +} + +func (*gRPCClient) RPCConnect(pe *DpPeer) int { + return gRPCConnect(pe) +} + +func (*gRPCClient) RPCReset(pe *DpPeer) int { + cStr := fmt.Sprintf("%s:%d", pe.Peer.String(), XSyncPort) + client, ok := pe.Client.(gRPCClient) + if ok { + client.conn.Close() + pe.Client = nil + } + + if pe.Client == nil { + tk.LogIt(tk.LogInfo, "XSync gRPC - %s :Reset\n", cStr) + return gRPCConnect(pe) + } + return 0 +} + +func (*gRPCClient) RPCClose(pe *DpPeer) int { + if pe.Client != nil { + pe.Client.(gRPCClient).conn.Close() + } + pe.Client = nil + return 0 +} + +func (ci *DpCtInfo) ConvertToCtInfo(c *CtInfo) { + c.Dip = ci.DIP + c.Sip = ci.SIP + c.Dport = int32(ci.Dport) + c.Sport = int32(ci.Sport) + c.Proto = ci.Proto + c.Cstate = ci.CState + c.Cact = ci.CAct + c.Ci = ci.CI + c.Packets = int64(ci.Packets) + c.Bytes = int64(ci.Bytes) + c.Deleted = int32(ci.Deleted) + c.Pkey = ci.PKey + c.Pval = ci.PVal + c.Xsync = ci.XSync + c.Serviceip = ci.ServiceIP + c.Servproto = ci.ServProto + c.L4Servport = int32(ci.L4ServPort) + c.Blocknum = int32(ci.BlockNum) +} + +func callGRPC(client XSyncClient, rpcCallStr string, args interface{}, reply *int) error { + var err error + var xreply *XSyncReply + var ctis []*CtInfo + var ct *CtInfo + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if (rpcCallStr == "XSync.DpWorkOnBlockCtAdd") || + (rpcCallStr == "XSync.DpWorkOnBlockCtDelete") { + blkCtis := args.([]DpCtInfo) + ctis = make([]*CtInfo, len(blkCtis)) + for i, c := range blkCtis { + ctis[i] = &CtInfo{} + c.ConvertToCtInfo(ctis[i]) + } + } else if (rpcCallStr == "XSync.DpWorkOnCtAdd") || + (rpcCallStr == "XSync.DpWorkOnCtDelete") { + c := args.(DpCtInfo) + ct = &CtInfo{} + c.ConvertToCtInfo(ct) + } + + if rpcCallStr == "XSync.DpWorkOnBlockCtAdd" { + xreply, err = client.DpWorkOnBlockCtModGRPC(ctx, &BlockCtInfoMod{Add: true, Ct: ctis}) + } else if rpcCallStr == "XSync.DpWorkOnBlockCtDelete" { + xreply, err = client.DpWorkOnBlockCtModGRPC(ctx, &BlockCtInfoMod{Add: false, Ct: ctis}) + } else if rpcCallStr == "XSync.DpWorkOnCtAdd" { + xreply, err = client.DpWorkOnCtModGRPC(ctx, &CtInfoMod{Add: true, Ct: ct}) + } else if rpcCallStr == "XSync.DpWorkOnCtDelete" { + xreply, err = client.DpWorkOnCtModGRPC(ctx, &CtInfoMod{Add: false, Ct: ct}) + } else if rpcCallStr == "XSync.DpWorkOnCtGet" { + xreply, err = client.DpWorkOnCtGetGRPC(ctx, &ConnGet{Async: args.(int32)}) + } + + if err != nil { + *reply = -1 + tk.LogIt(tk.LogError, "XSync %s reply - %v[NOK]\n", rpcCallStr, err.Error()) + } else if xreply != nil { + *reply = int(xreply.Response) + tk.LogIt(tk.LogDebug, "XSync %s peer reply - %d\n", rpcCallStr, *reply) + } + return err +} + +func (*gRPCClient) RPCSend(pe *DpPeer, rpcCallStr string, args any) (int, error) { + var reply int + err := callGRPC(pe.Client.(gRPCClient).xclient, rpcCallStr, args, &reply) + + return reply,err +} diff --git a/loxinet/xsync_grpc.pb.go b/loxinet/xsync_grpc.pb.go new file mode 100644 index 000000000..84c011e2b --- /dev/null +++ b/loxinet/xsync_grpc.pb.go @@ -0,0 +1,177 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.6.1 +// source: loxinet/xsync.proto + +package loxinet + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// XSyncClient is the client API for XSync service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type XSyncClient interface { + DpWorkOnCtGetGRPC(ctx context.Context, in *ConnGet, opts ...grpc.CallOption) (*XSyncReply, error) + DpWorkOnCtModGRPC(ctx context.Context, in *CtInfoMod, opts ...grpc.CallOption) (*XSyncReply, error) + DpWorkOnBlockCtModGRPC(ctx context.Context, in *BlockCtInfoMod, opts ...grpc.CallOption) (*XSyncReply, error) +} + +type xSyncClient struct { + cc grpc.ClientConnInterface +} + +func NewXSyncClient(cc grpc.ClientConnInterface) XSyncClient { + return &xSyncClient{cc} +} + +func (c *xSyncClient) DpWorkOnCtGetGRPC(ctx context.Context, in *ConnGet, opts ...grpc.CallOption) (*XSyncReply, error) { + out := new(XSyncReply) + err := c.cc.Invoke(ctx, "/XSync/DpWorkOnCtGetGRPC", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *xSyncClient) DpWorkOnCtModGRPC(ctx context.Context, in *CtInfoMod, opts ...grpc.CallOption) (*XSyncReply, error) { + out := new(XSyncReply) + err := c.cc.Invoke(ctx, "/XSync/DpWorkOnCtModGRPC", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *xSyncClient) DpWorkOnBlockCtModGRPC(ctx context.Context, in *BlockCtInfoMod, opts ...grpc.CallOption) (*XSyncReply, error) { + out := new(XSyncReply) + err := c.cc.Invoke(ctx, "/XSync/DpWorkOnBlockCtModGRPC", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// XSyncServer is the server API for XSync service. +// All implementations must embed UnimplementedXSyncServer +// for forward compatibility +type XSyncServer interface { + DpWorkOnCtGetGRPC(context.Context, *ConnGet) (*XSyncReply, error) + DpWorkOnCtModGRPC(context.Context, *CtInfoMod) (*XSyncReply, error) + DpWorkOnBlockCtModGRPC(context.Context, *BlockCtInfoMod) (*XSyncReply, error) + mustEmbedUnimplementedXSyncServer() +} + +// UnimplementedXSyncServer must be embedded to have forward compatible implementations. +type UnimplementedXSyncServer struct { +} + +func (UnimplementedXSyncServer) DpWorkOnCtGetGRPC(context.Context, *ConnGet) (*XSyncReply, error) { + return nil, status.Errorf(codes.Unimplemented, "method DpWorkOnCtGetGRPC not implemented") +} +func (UnimplementedXSyncServer) DpWorkOnCtModGRPC(context.Context, *CtInfoMod) (*XSyncReply, error) { + return nil, status.Errorf(codes.Unimplemented, "method DpWorkOnCtModGRPC not implemented") +} +func (UnimplementedXSyncServer) DpWorkOnBlockCtModGRPC(context.Context, *BlockCtInfoMod) (*XSyncReply, error) { + return nil, status.Errorf(codes.Unimplemented, "method DpWorkOnBlockCtModGRPC not implemented") +} +func (UnimplementedXSyncServer) mustEmbedUnimplementedXSyncServer() {} + +// UnsafeXSyncServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to XSyncServer will +// result in compilation errors. +type UnsafeXSyncServer interface { + mustEmbedUnimplementedXSyncServer() +} + +func RegisterXSyncServer(s grpc.ServiceRegistrar, srv XSyncServer) { + s.RegisterService(&XSync_ServiceDesc, srv) +} + +func _XSync_DpWorkOnCtGetGRPC_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ConnGet) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(XSyncServer).DpWorkOnCtGetGRPC(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/XSync/DpWorkOnCtGetGRPC", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(XSyncServer).DpWorkOnCtGetGRPC(ctx, req.(*ConnGet)) + } + return interceptor(ctx, in, info, handler) +} + +func _XSync_DpWorkOnCtModGRPC_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(CtInfoMod) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(XSyncServer).DpWorkOnCtModGRPC(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/XSync/DpWorkOnCtModGRPC", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(XSyncServer).DpWorkOnCtModGRPC(ctx, req.(*CtInfoMod)) + } + return interceptor(ctx, in, info, handler) +} + +func _XSync_DpWorkOnBlockCtModGRPC_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(BlockCtInfoMod) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(XSyncServer).DpWorkOnBlockCtModGRPC(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/XSync/DpWorkOnBlockCtModGRPC", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(XSyncServer).DpWorkOnBlockCtModGRPC(ctx, req.(*BlockCtInfoMod)) + } + return interceptor(ctx, in, info, handler) +} + +// XSync_ServiceDesc is the grpc.ServiceDesc for XSync service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var XSync_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "XSync", + HandlerType: (*XSyncServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "DpWorkOnCtGetGRPC", + Handler: _XSync_DpWorkOnCtGetGRPC_Handler, + }, + { + MethodName: "DpWorkOnCtModGRPC", + Handler: _XSync_DpWorkOnCtModGRPC_Handler, + }, + { + MethodName: "DpWorkOnBlockCtModGRPC", + Handler: _XSync_DpWorkOnBlockCtModGRPC_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "loxinet/xsync.proto", +} diff --git a/loxinet/xsync_server.go b/loxinet/xsync_server.go new file mode 100644 index 000000000..d71e607a6 --- /dev/null +++ b/loxinet/xsync_server.go @@ -0,0 +1,246 @@ +/* + * Copyright (c) 2022 NetLOX Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package loxinet + +import ( + "context" + "errors" + "fmt" + "io" + "net" + "net/http" + "net/rpc" + "runtime/debug" + + "google.golang.org/grpc" + opts "github.com/loxilb-io/loxilb/options" + tk "github.com/loxilb-io/loxilib" +) + +// DpWorkOnBlockCtAdd - Add block CT entries from remote goRPC client +func (xs *XSync) DpWorkOnBlockCtAdd(blockCtis []DpCtInfo, ret *int) error { + if !mh.ready { + return errors.New("Not-Ready") + } + + *ret = 0 + + for _, cti := range blockCtis { + + tk.LogIt(tk.LogDebug, "RPC - Block CT Add %s\n", cti.Key()) + r := mh.dp.DpHooks.DpCtAdd(&cti) + if r != 0 { + *ret = r + } + } + + return nil +} + +// DpWorkOnBlockCtDelete - Add block CT entries from remote +func (xs *XSync) DpWorkOnBlockCtDelete(blockCtis []DpCtInfo, ret *int) error { + if !mh.ready { + return errors.New("Not-Ready") + } + + *ret = 0 + + for _, cti := range blockCtis { + + tk.LogIt(tk.LogDebug, "RPC - Block CT Del %s\n", cti.Key()) + r := mh.dp.DpHooks.DpCtDel(&cti) + if r != 0 { + *ret = r + } + } + + return nil +} + +// DpWorkOnCtAdd - Add a CT entry from remote goRPC client +func (xs *XSync) DpWorkOnCtAdd(cti DpCtInfo, ret *int) error { + if !mh.ready { + *ret = -1 + tk.LogIt(tk.LogDebug, "RPC - CT Xsync Not-Ready") + return errors.New("Not-Ready") + } + + if cti.Proto == "xsync" { + mh.dp.SyncMtx.Lock() + defer mh.dp.SyncMtx.Unlock() + + for idx := range mh.dp.Remotes { + r := &mh.dp.Remotes[idx] + if r.RemoteID == int(cti.Sport) { + r.RPCState = true + *ret = 0 + tk.LogIt(tk.LogDebug, "RPC - CT Xsync Remote-%v Already present\n", cti.Sport) + return nil + } + } + + r := XSync{RemoteID: int(cti.Sport), RPCState: true} + mh.dp.Remotes = append(mh.dp.Remotes, r) + + tk.LogIt(tk.LogDebug, "RPC - CT Xsync Remote-%v\n", cti.Sport) + + *ret = 0 + return nil + } + + tk.LogIt(tk.LogDebug, "RPC - CT Add %s\n", cti.Key()) + + r := mh.dp.DpHooks.DpCtAdd(&cti) + *ret = r + return nil +} + +// DpWorkOnCtDelete - Delete a CT entry from remote goRPC client +func (xs *XSync) DpWorkOnCtDelete(cti DpCtInfo, ret *int) error { + if !mh.ready { + return errors.New("Not-Ready") + } + tk.LogIt(tk.LogDebug, "RPC - CT Del %s\n", cti.Key()) + r := mh.dp.DpHooks.DpCtDel(&cti) + *ret = r + return nil +} + +// DpWorkOnCtGet - Get all CT entries asynchronously goRPC client +func (xs *XSync) DpWorkOnCtGet(async int, ret *int) error { + if !mh.ready { + return errors.New("Not-Ready") + } + + // Most likely need to reset reverse rpc channel + mh.dp.DpXsyncRPCReset() + + tk.LogIt(tk.LogDebug, "RPC - CT Get %d\n", async) + mh.dp.DpHooks.DpCtGetAsync() + *ret = 0 + + return nil +} + +func (xs *XSync) DpWorkOnCtGetGRPC(ctx context.Context, m *ConnGet) (*XSyncReply, error) { + + var resp int + err := xs.DpWorkOnCtGet(int(m.Async), &resp) + + return &XSyncReply{Response: int32(resp)}, err +} + +func (ci *CtInfo) ConvertToDpCtInfo() DpCtInfo { + + cti := DpCtInfo{ + DIP: ci.Dip, SIP: ci.Sip, + Dport: uint16(ci.Dport), Sport: uint16(ci.Sport), + Proto: ci.Proto, CState: ci.Cstate, CAct: ci.Cact, CI: ci.Ci, + Packets: uint64(ci.Packets), Bytes: uint64(ci.Bytes), Deleted: int(ci.Deleted), + PKey: ci.Pkey, PVal: ci.Pval, + XSync: ci.Xsync, ServiceIP: ci.Serviceip, ServProto: ci.Servproto, + L4ServPort: uint16(ci.L4Servport), BlockNum: uint16(ci.Blocknum), + } + return cti +} + +func (xs *XSync) DpWorkOnBlockCtModGRPC(ctx context.Context, m *BlockCtInfoMod) (*XSyncReply, error) { + var ctis []DpCtInfo + var resp int + var err error + + for _, ci := range m.Ct { + cti := ci.ConvertToDpCtInfo() + ctis = append(ctis, cti) + } + if m.Add { + err = xs.DpWorkOnBlockCtAdd(ctis, &resp) + } else { + err = xs.DpWorkOnBlockCtDelete(ctis, &resp) + } + return &XSyncReply{Response: int32(resp)}, err +} + +func (xs *XSync) DpWorkOnCtModGRPC(ctx context.Context, m *CtInfoMod) (*XSyncReply, error) { + + var resp int + var err error + + ci := m.Ct + cti := ci.ConvertToDpCtInfo() + + if m.Add { + err = xs.DpWorkOnCtAdd(cti, &resp) + } else { + err = xs.DpWorkOnCtDelete(cti, &resp) + } + return &XSyncReply{Response: int32(resp)}, err +} + +func (xs *XSync) mustEmbedUnimplementedXSyncServer() {} + +func startxSyncGRPCServer() { + lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", XSyncPort)) + if err != nil { + tk.LogIt(tk.LogEmerg, "gRPC - Server Start Error\n") + return + } + grpcServer := grpc.NewServer() + s := XSync{} + RegisterXSyncServer(grpcServer, &s) + tk.LogIt(tk.LogNotice, "*******************gRPC - Server Started*****************\n") + grpcServer.Serve(lis) +} + +// LoxiXsyncMain - State Sync subsystem init +func LoxiXsyncMain(mode string) { + if opts.Opts.ClusterNodes == "none" { + return + } + + // Stack trace logger + defer func() { + if e := recover(); e != nil { + if mh.logger != nil { + tk.LogIt(tk.LogCritical, "%s: %s", e, debug.Stack()) + } + } + }() + if mode == "netrpc" { + for { + rpcObj := new(XSync) + err := rpc.Register(rpcObj) + if err != nil { + panic("Failed to register rpc") + } + + rpc.HandleHTTP() + + http.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) { + io.WriteString(res, "loxilb-xsync\n") + }) + + listener := fmt.Sprintf(":%d", XSyncPort) + err = http.ListenAndServe(listener, nil) + if err != nil { + panic("Failed to rpc-listen") + } + } + } else { + go startxSyncGRPCServer() + } +} diff --git a/main.go b/main.go index dc6e33d28..709091794 100644 --- a/main.go +++ b/main.go @@ -43,7 +43,7 @@ func main() { os.Exit(0) } - go ln.LoxiXsyncMain() + go ln.LoxiXsyncMain(opts.Opts.Rpc) // Need some time for RPC Handler to be up time.Sleep(2 * time.Second) diff --git a/options/options.go b/options/options.go index 826587afe..875c4c345 100644 --- a/options/options.go +++ b/options/options.go @@ -27,4 +27,5 @@ var Opts struct { EgrHooks bool `long:"egr-hooks" description:"Enable eBPF egress hooks(experimental)"` BgpPeerMode bool `short:"r" long:"peer" description:"Run loxilb with goBGP only, no Datapath"` BlackList string `long:"blacklist" description:"Regex string of blacklisted ports" default:"none"` + Rpc string `long:"rpc" description:"RPC mode for syncing - netrpc or grpc" default:"netrpc"` }