diff --git a/loxilb-ebpf b/loxilb-ebpf index ea174aca9..82bc922d7 160000 --- a/loxilb-ebpf +++ b/loxilb-ebpf @@ -1 +1 @@ -Subproject commit ea174aca9b419252fb6461b2fe423c98e32a4246 +Subproject commit 82bc922d7f27921b3bc76b4aded103abfab043dd diff --git a/loxinet/dpbroker.go b/loxinet/dpbroker.go index 494a657a1..7a46170b2 100644 --- a/loxinet/dpbroker.go +++ b/loxinet/dpbroker.go @@ -545,7 +545,11 @@ func (dp *DpH) DpXsyncRPC(op DpSyncOpT, arg interface{}) int { rpcCallStr = "XSync.DpWorkOnCtAdd" } } else if op == DpSyncDelete { - rpcCallStr = "XSync.DpWorkOnCtDelete" + if len(blkCti) > 0 { + rpcCallStr = "XSync.DpWorkOnBlockCtDelete" + } else { + rpcCallStr = "XSync.DpWorkOnCtDelete" + } } else if op == DpSyncGet { rpcCallStr = "XSync.DpWorkOnCtGet" } else { @@ -631,9 +635,6 @@ func (xs *XSync) DpWorkOnBlockCtAdd(blockCtis []DpCtInfo, ret *int) error { } *ret = 0 - - mh.dp.DpHooks.DpGetLock() - for _, cti := range blockCtis { tk.LogIt(tk.LogDebug, "RPC - Block CT Add %s\n", cti.Key()) @@ -643,8 +644,6 @@ func (xs *XSync) DpWorkOnBlockCtAdd(blockCtis []DpCtInfo, ret *int) error { } } - mh.dp.DpHooks.DpRelLock() - return nil } @@ -682,6 +681,25 @@ func (xs *XSync) DpWorkOnCtAdd(cti DpCtInfo, ret *int) error { return nil } +// DpWorkOnBlockCtDelete - Delete block CT entries from remote +func (xs *XSync) DpWorkOnBlockCtDelete(blockCtis []DpCtInfo, ret *int) error { + if !mh.ready { + return errors.New("Not-Ready") + } + + *ret = 0 + for _, cti := range blockCtis { + + tk.LogIt(tk.LogDebug, "RPC - Block CT Del %s\n", cti.Key()) + r := mh.dp.DpHooks.DpCtDel(&cti) + if r != 0 { + *ret = r + } + } + + return nil +} + // DpWorkOnCtDelete - Delete a CT entry from remote func (xs *XSync) DpWorkOnCtDelete(cti DpCtInfo, ret *int) error { if !mh.ready { diff --git a/loxinet/dpebpf_linux.go b/loxinet/dpebpf_linux.go index 057fc0102..fe18fec2c 100644 --- a/loxinet/dpebpf_linux.go +++ b/loxinet/dpebpf_linux.go @@ -133,6 +133,7 @@ type DpEbpfH struct { ticker *time.Ticker tDone chan 
bool ctBcast chan bool + nID uint tbN uint CtSync bool RssEn bool @@ -280,12 +281,13 @@ func DpEbpfInit(clusterEn bool, nodeNum int, rssEn bool, egrHooks bool, logLevel ne := new(DpEbpfH) ne.tDone = make(chan bool) - ne.ToMapCh = make(chan interface{}, DpWorkQLen) + ne.ToMapCh = make(chan interface{}, 65536) ne.ToFinCh = make(chan int) ne.ctBcast = make(chan bool) ne.ticker = time.NewTicker(DpEbpfLinuxTiVal * time.Second) ne.ctMap = make(map[string]*DpCtInfo) ne.RssEn = rssEn + ne.nID = uint((C.LLB_CT_MAP_ENTRIES / C.LLB_MAX_LB_NODES) * nodeNum) go dpEbpfTicker() go dpMapNotifierWorker(ne.ToFinCh, ne.ToMapCh) @@ -299,12 +301,16 @@ func (e *DpEbpfH) DpEbpfUnInit() { e.tDone <- true e.ToFinCh <- 1 + tk.LogIt(tk.LogInfo, "ebpf uninit \n") + // Make sure to unload eBPF programs ifList, err := net.Interfaces() if err != nil { return } + tk.LogIt(tk.LogInfo, "ebpf uninit begin\n") + for _, intf := range ifList { tk.LogIt(tk.LogInfo, "ebpf unload - %s\n", intf.Name) @@ -1604,6 +1610,9 @@ func dpCTMapNotifierWorker(cti *DpCtInfo) { if len(cti.PVal) != 0 { tact = (*C.struct_dp_ct_tact)(unsafe.Pointer(&cti.PVal[0])) act = &tact.ctd + if (uint)(act.nid) != mh.dpEbpf.nID { + return + } addOp = true opStr = "Add" } else { @@ -1644,23 +1653,23 @@ func dpCTMapNotifierWorker(cti *DpCtInfo) { if addOp == false { cti = mh.dpEbpf.ctMap[mapKey] - if cti == nil { + if cti == nil || cti.Deleted > 0 { return } cti.Deleted = 1 cti.XSync = true cti.NTs = time.Now() // Immediately notify for delete - ret := mh.dp.DpXsyncRPC(DpSyncDelete, cti) - if ret == 0 { - delete(mh.dpEbpf.ctMap, cti.Key()) - // This is a strange fix - Sometimes loxilb runs as multiple docker - // instances in the same host. So, the map tracing infra can send notifications - // generated by other instances here. Depending on the timing, it is possible - // that the original deleter gets notified after it is handled in the remote - // instance. This is to handle such special cases. 
- C.llb_del_map_elem(C.LL_DP_CT_MAP, unsafe.Pointer(&cti.PKey[0])) - } + //ret := mh.dp.DpXsyncRPC(DpSyncDelete, cti) + //if ret == 0 { + // delete(mh.dpEbpf.ctMap, cti.Key()) + // This is a strange fix - Sometimes loxilb runs as multiple docker + // instances in the same host. So, the map tracing infra can send notifications + // generated by other instances here. Depending on the timing, it is possible + // that the original deleter gets notified after it is handled in the remote + // instance. This is to handle such special cases. + // C.llb_del_map_elem(C.LL_DP_CT_MAP, unsafe.Pointer(&cti.PKey[0])) + //} } else { cte := mh.dpEbpf.ctMap[cti.Key()] if cte != nil { @@ -1676,7 +1685,7 @@ func dpCTMapNotifierWorker(cti *DpCtInfo) { } } - tk.LogIt(tk.LogInfo, "[CT] %s - %s\n", opStr, cti.String()) + tk.LogIt(tk.LogDebug, "[CT] %s - %s\n", opStr, cti.String()) } func dpCTMapBcast() { @@ -1715,10 +1724,15 @@ func dpCTMapChkUpdates() { var tact C.struct_dp_ct_tact var act *C.struct_dp_ct_dat var blkCti []DpCtInfo + var blkDelCti []DpCtInfo tc := time.Now() fd := C.llb_map2fd(C.LL_DP_CT_MAP) + if len(mh.dpEbpf.ctMap) > 0 { + tk.LogIt(tk.LogDebug, "[CT] Map size %d\n", len(mh.dpEbpf.ctMap)) + } + for _, cti := range mh.dpEbpf.ctMap { // tk.LogIt(tk.LogDebug, "[CT] check %s:%s:%v\n", cti.Key(), cti.CState, cti.XSync) if cti.CState != "est" { @@ -1730,7 +1744,7 @@ func dpCTMapChkUpdates() { act = &tact.ctd goCtEnt := new(DpCtInfo) goCtEnt.convDPCt2GoObj((*C.struct_dp_ct_key)(unsafe.Pointer(&cti.PKey[0])), act) - goCtEnt.LTs = time.Now() + goCtEnt.LTs = tc if goCtEnt.CState != cti.CState || goCtEnt.CAct != cti.CState { @@ -1750,7 +1764,7 @@ func dpCTMapChkUpdates() { tk.LogIt(tk.LogDebug, "[CT] %s - %s\n", "update", ctStr) if goCtEnt.CState == "est" { goCtEnt.XSync = true - goCtEnt.NTs = time.Now() + goCtEnt.NTs = tc } continue } @@ -1776,19 +1790,21 @@ func dpCTMapChkUpdates() { } if C.bpf_map_lookup_elem(C.int(fd), unsafe.Pointer(&cti.PKey[0]), unsafe.Pointer(&tact)) != 0 
{ tk.LogIt(tk.LogInfo, "[CT] ent not found %s\n", cti.Key()) - delete(mh.dpEbpf.ctMap, cti.Key()) - continue - } - ptact := (*C.struct_dp_ct_tact)(unsafe.Pointer(&cti.PVal[0])) - ret := C.llb_fetch_map_stats_cached(C.int(C.LL_DP_CT_STATS_MAP), C.uint(ptact.ca.cidx), C.int(0), - (unsafe.Pointer(&b)), unsafe.Pointer(&p)) - if ret == 0 { - if cti.Packets != p+uint64(tact.ctd.pb.packets) { - cti.Bytes = b + uint64(tact.ctd.pb.bytes) - cti.Packets = p + uint64(tact.ctd.pb.packets) - cti.XSync = true - cti.NTs = tc - cti.LTs = tc + //delete(mh.dpEbpf.ctMap, cti.Key()) + cti.Deleted++ + cti.XSync = true + } else { + ptact := (*C.struct_dp_ct_tact)(unsafe.Pointer(&cti.PVal[0])) + ret := C.llb_fetch_map_stats_cached(C.int(C.LL_DP_CT_STATS_MAP), C.uint(ptact.ca.cidx), C.int(0), + (unsafe.Pointer(&b)), unsafe.Pointer(&p)) + if ret == 0 { + if cti.Packets != p+uint64(tact.ctd.pb.packets) { + cti.Bytes = b + uint64(tact.ctd.pb.bytes) + cti.Packets = p + uint64(tact.ctd.pb.packets) + cti.XSync = true + cti.NTs = tc + cti.LTs = tc + } } } } @@ -1799,7 +1815,8 @@ func dpCTMapChkUpdates() { ret := 0 if cti.Deleted > 0 { - ret = mh.dp.DpXsyncRPC(DpSyncDelete, cti) + //ret = mh.dp.DpXsyncRPC(DpSyncDelete, cti) + blkDelCti = append(blkDelCti, *cti) cti.Deleted++ } else { blkCti = append(blkCti, *cti) @@ -1815,11 +1832,34 @@ func dpCTMapChkUpdates() { } } } + + if len(blkCti) > 1024 { + tk.LogIt(tk.LogDebug, "[CT] Block Add Sync - \n") + tc1 := time.Now() + mh.dp.DpXsyncRPC(DpSyncAdd, blkCti) + tc2 := time.Now() + tk.LogIt(tk.LogInfo, "[CT] Block Add Sync %d took %v- \n", len(blkCti), time.Duration(tc2.Sub(tc1))) + blkCti = nil + } + + if len(blkDelCti) > 1024 { + tk.LogIt(tk.LogDebug, "[CT] Block Del Sync - \n") + mh.dp.DpXsyncRPC(DpSyncDelete, blkDelCti) + blkDelCti = nil + } } if len(blkCti) > 0 { - tk.LogIt(tk.LogDebug, "[CT] Block Sync - \n") + tc1 := time.Now() + tk.LogIt(tk.LogDebug, "[CT] Block Add Sync - \n") mh.dp.DpXsyncRPC(DpSyncAdd, blkCti) + tc2 := time.Now() + 
tk.LogIt(tk.LogInfo, "[CT] Block Add Sync %d took %v- \n", len(blkCti), time.Duration(tc2.Sub(tc1))) + } + + if len(blkDelCti) > 0 { + tk.LogIt(tk.LogDebug, "[CT] Block Del Sync - \n") + mh.dp.DpXsyncRPC(DpSyncDelete, blkDelCti) } } @@ -1870,7 +1910,8 @@ func (e *DpEbpfH) DpCtAdd(w *DpCtInfo) int { // Fix few things ptact := (*C.struct_dp_ct_tact)(unsafe.Pointer(&w.PVal[0])) - ptact.ctd.rid = C.uint(r.ruleNum) // Race-condition here + ptact.ctd.rid = C.ushort(r.ruleNum) // Race-condition here + ptact.ctd.nid = C.uint(mh.dpEbpf.nID) ptact.lts = C.get_os_nsecs() mh.dpEbpf.mtx.Lock() @@ -1921,7 +1962,7 @@ func (e *DpEbpfH) DpCtDel(w *DpCtInfo) int { cti := mh.dpEbpf.ctMap[mapKey] if cti == nil { tk.LogIt(tk.LogError, "ctInfo-key (%v) not present\n", mapKey) - return EbpfErrCtDel + return 0 } delete(mh.dpEbpf.ctMap, mapKey)