Is it possible to unbuffer sys.stdin for a node's shell process #126

Open
olafayomi opened this issue May 19, 2023 · 2 comments

Comments

@olafayomi

olafayomi commented May 19, 2023

Hi,
I'm experiencing an issue with the ExaBGP daemon in IPMininet. In my IPMininet topology, I've configured one of the nodes with ExaBGP. The ExaBGP daemon runs two processes, announce-routes and receive-routes, which run the Python scripts sender.py and receiver.py respectively. sender.py announces routes to the ExaBGP peer, while receiver.py receives, parses and processes the BGP messages that ExaBGP generates.

sender.py looks roughly like this:

import sys
import time

if __name__ == '__main__':
    while True:
        # get route
        # ExaBGP parses one command per line, so the command must end with '\n'
        sys.stdout.write('neighbor 172.16.2.20 announce attribute next-hop self med 150 origin incomplete nlri 100.30.0.0/16 100.40.0.0/16\n')
        sys.stdout.flush()
        time.sleep(0.1)
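
A minimal sketch of a sender that also reads ExaBGP's replies. It assumes that, with `ack` set to `'true'` (as in the topology script later in this thread), ExaBGP answers each command with one line such as `done` or `error` on the process's stdin; the original sender.py never reads those replies. `send_command` and `COMMAND` are hypothetical names, not part of ExaBGP or IPMininet, and whether unread replies can stall anything is an assumption to verify, not an established cause.

```python
import sys
from typing import TextIO

# Announcement taken from sender.py; ExaBGP reads one command per line.
COMMAND = ("neighbor 172.16.2.20 announce attribute next-hop self med 150 "
           "origin incomplete nlri 100.30.0.0/16 100.40.0.0/16")


def send_command(command: str, out: TextIO = sys.stdout,
                 inp: TextIO = sys.stdin) -> str:
    """Send one command to ExaBGP and return its one-line reply.

    Assumption: api.ack is enabled, so ExaBGP writes one reply line
    (for example 'done' or 'error') to this process's stdin per command.
    """
    out.write(command + "\n")
    out.flush()
    reply = inp.readline()
    if not reply:
        # EOF: ExaBGP closed our stdin, so no reply will ever come.
        raise EOFError("ExaBGP closed the API pipe")
    return reply.strip()
```

In sender.py, the loop body would call `reply = send_command(COMMAND)`, log any reply other than `done` to stderr, and then `time.sleep(0.1)`.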

receiver.py looks roughly like this:

import json
import os
import sys

if __name__ == '__main__':
    sys.stderr = open('/tmp/exabgp-receiver.log', 'a+')
    # buffering=1 selects line buffering, which governs when writes are flushed
    unbuffered_stdin = os.fdopen(sys.stdin.fileno(), 'r', buffering=1)
    while True:
        line = unbuffered_stdin.readline()
        if not line:
            # EOF: ExaBGP closed the pipe; stop instead of looping forever
            break
        try:
            jsonline = json.loads(line.strip())
        except json.JSONDecodeError:
            sys.stderr.write("Error\n")
        else:
            # process and write actual BGP message
            sys.stderr.write(str(jsonline) + '\n')
        sys.stderr.flush()

If I run the topology and trigger a link failure about four times, which causes a burst of BGP updates and withdrawals, receiver.py hangs and does not process any further messages. However, if I run the topology with IPCLI() and run ExaBGP from the host's shell in an xterm, the script does not hang. This makes me think something is buffering sys.stdin and sys.stdout when the topology is not run in interactive CLI mode. Is there a way to turn off this buffering or work around it? Has anyone encountered this before?

IPMininet version is 0.9, Python version is 3.6.
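
One way to rule out Python-level buffering on the receiving side is to bypass the `io` layer and read the descriptor directly with `os.read`, which returns as soon as any bytes are available. The sketch below uses hypothetical helper names (`iter_lines`, `iter_messages`); it splits the raw bytes into lines and decodes each non-empty line as JSON, skipping lines that do not parse, much like receiver.py does.

```python
import json
import os
from typing import Any, Iterator


def iter_lines(fd: int, chunk_size: int = 4096) -> Iterator[bytes]:
    """Yield lines read straight from a file descriptor, without io buffering."""
    pending = b""
    while True:
        chunk = os.read(fd, chunk_size)  # returns whatever is available now
        if not chunk:  # EOF: the writer closed its end of the pipe
            break
        pending += chunk
        # Keep the last, possibly incomplete, piece for the next read.
        *lines, pending = pending.split(b"\n")
        yield from lines
    if pending:
        yield pending  # trailing data without a final newline


def iter_messages(fd: int) -> Iterator[Any]:
    """Decode each non-empty line as one JSON message, skipping bad lines."""
    for raw in iter_lines(fd):
        line = raw.strip()
        if not line:
            continue
        try:
            yield json.loads(line)  # json.loads accepts bytes since Python 3.6
        except json.JSONDecodeError:
            continue
```

receiver.py would then loop over `iter_messages(sys.stdin.fileno())`. Mixing this with reads from `sys.stdin` itself should be avoided, since `sys.stdin` keeps its own buffer.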

@jadinm
Collaborator

jadinm commented May 22, 2023

Hello,

This makes me think there is something buffering sys.stdin and sys.stdout when the topology is not run in interactive CLI mode

Not that I know of. In any case, buffering is not eternal: the buffer would eventually be flushed.
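
A quick way to check the point about flushing: the sketch below starts a Python child that writes and flushes one line, then blocks on its stdin. The parent receives the line while the child is still running, which shows that a flushed line crosses the pipe immediately rather than waiting for the writer to exit. The child program and the helper name are illustrative only.

```python
import subprocess
import sys
from typing import Tuple

# Child program: write and flush one line, then block until stdin is closed.
CHILD = (
    "import sys\n"
    "sys.stdout.write('route update\\n')\n"
    "sys.stdout.flush()\n"
    "sys.stdin.read()\n"
)


def first_line_from_live_child() -> Tuple[str, bool]:
    """Return (first line, whether the child was still running when it arrived)."""
    child = subprocess.Popen(
        [sys.executable, "-c", CHILD],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        universal_newlines=True,  # text mode, also available on Python 3.6
    )
    line = child.stdout.readline()
    # The child is blocked in sys.stdin.read(), so the line arrived
    # because of the explicit flush, not because the child exited.
    still_running = child.poll() is None
    child.stdin.close()  # EOF on the child's stdin lets it finish
    child.wait()
    child.stdout.close()
    return line, still_running
```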

Would you mind sharing the script that you use to start the topology, both with and without IPCLI?
Is the command generated by IPMininet (you can get it with ps -aux | grep <exec name>) identical to the command that you run in the shell? Are the configurations (IPMininet generates them in /tmp) identical as well?
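
A minimal sketch for these two comparisons, assuming a Linux host (it reads `/proc/<pid>/cmdline`). `cmdline` and `config_diff` are hypothetical helpers; the PID would come from the `ps -aux | grep <exec name>` output, and the paths are whichever generated and hand-run configuration files are being compared.

```python
import difflib
import os
from typing import List


def cmdline(pid: int) -> List[str]:
    """Return the argument vector of a running process (Linux /proc only)."""
    with open(f"/proc/{pid}/cmdline", "rb") as f:
        raw = f.read()
    # Arguments are NUL-separated and normally end with a trailing NUL.
    args = raw.split(b"\0")
    if args and args[-1] == b"":
        args.pop()
    return [os.fsdecode(arg) for arg in args]


def config_diff(path_a: str, path_b: str) -> List[str]:
    """Return a unified diff of two config files; [] means they are identical."""
    with open(path_a) as a, open(path_b) as b:
        return list(difflib.unified_diff(a.readlines(), b.readlines(),
                                         fromfile=path_a, tofile=path_b))
```

Printing `"".join(config_diff(generated_path, shell_path))` shows any differing lines, and comparing `cmdline(pid)` for the two ExaBGP instances shows whether they were started with the same arguments.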

Also, did you make sure that the BGP updates/withdrawals actually reach ExaBGP when it is not launched from IPCLI?
This might be a connectivity issue.
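
As a first connectivity check, a sketch like the one below could be run from the ExaBGP node's shell to confirm that a BGP peer accepts TCP connections on port 179. `bgp_port_open` is a hypothetical helper. A successful connection only shows TCP reachability; it does not prove that UPDATE messages are being delivered to ExaBGP.

```python
import socket


def bgp_port_open(host: str, port: int = 179, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be established."""
    try:
        # create_connection resolves the address, so IPv6 literals such as
        # "100::1" work as well as IPv4 addresses.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, or timed out (socket.timeout is an OSError)
        return False
```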

@olafayomi
Author

olafayomi commented Jun 7, 2023

Here is an example of my topology script. I configure the topology and the nodes, and start an application that talks to ExaBGP. Once the nodes are configured, I fail a link and restore it a number of times, which generates a large number of BGP UPDATE messages that are sent to ExaBGP. Interestingly, ExaBGP copes with the first three link failure and recovery events, but after the fourth one it hangs and does not process any more messages. When I run the topology script with IPCLI and then run ExaBGP from the host's shell in an xterm, I can fail and restore the link as many times as I want and ExaBGP keeps running without hanging.
To the best of my knowledge, I use the configuration generated by IPMininet when I run ExaBGP from the shell.

import time
from datetime import datetime

from ipmininet.iptopo import IPTopo
from ipmininet.cli import IPCLI
from ipmininet.ipnet import IPNet
from ipmininet.router.config.ospf import OSPFRedistributedRoute
from ipmininet.srv6 import enable_srv6
from ipmininet.router.config import BGP, ebgp_session, set_rr, AccessList, \
     AF_INET6, AF_INET, BorderRouterConfig, RouterConfig, OSPF, OSPF6, \
     bgp_peering, ExaBGPDaemon, STATIC, StaticRoute, CLIENT_PROVIDER, SHARE
from ipmininet.link import IPLink


class TestTopo(IPTopo):
  
    def build(self, *args, **kwargs):
        # Add all routers

        GsASr1 = self.bgp('GsASr1')
        Tp1ASr1 = self.bgp('Tp1ASr1')
        Tp1ASr2 = self.bgp('Tp1ASr2')
        Tp1ASr3 = self.bgp('Tp1ASr3')
        Tp2ASr1 = self.bgp('Tp2ASr1')
        Tp2ASr2 = self.bgp('Tp2ASr2')
        Tp3ASr1 = self.bgp('Tp3ASr1')
        Tp3ASr2 = self.bgp('Tp3ASr2')
        Tp4ASr1 = self.bgp('Tp4ASr1')
        Tp5ASr1 = self.bgp('Tp5ASr1')

        AS1R1 = self.bgp('AS1R1')
        AS2R1 = self.bgp('AS2R1')
        AS3R1 = self.bgp('AS3R1')
        AS4R1 = self.bgp('AS4R1')
        AS5R1 = self.bgp('AS5R1')
        AS6R1 = self.bgp('AS6R1')
        AS7R1 = self.bgp('AS7R1')
        AS8R1 = self.bgp('AS8R1')
        AS9R1 = self.bgp('AS9R1')
        AS10R1 = self.bgp('AS10R1')
       

        Sw1Tp1 = self.addSwitch('Sw1Tp1')
        Sw2Tp2 = self.addSwitch('Sw2Tp2')
        Sw3Tp3 = self.addSwitch('Sw3Tp3')
  
        Tp1ASr1Sw1 = self.addLink(Tp1ASr1, Sw1Tp1)
        Tp1ASr1Sw1[Tp1ASr1].addParams(ip=("100::1/48",))
        Tp1ASr2Sw1 = self.addLink(Tp1ASr2, Sw1Tp1)
        Tp1ASr2Sw1[Tp1ASr2].addParams(ip=("100::2/48",))
        Tp1ASr3Sw1 = self.addLink(Tp1ASr3, Sw1Tp1)
        Tp1ASr3Sw1[Tp1ASr3].addParams(ip=("100::3/48",))

        # Add controller and ExaBGP speaker node
        Tp1ASctlr = self.addRouter("Tp1ASctlr", config=RouterConfig)
        Tp1ASctlrSw1 = self.addLink(Tp1ASctlr, Sw1Tp1)
        Tp1ASctlrSw1[Tp1ASctlr].addParams(ip=("100::4/48",))
        Tp1ASctlr.addDaemon(ExaBGPDaemon, env = { 'api' : {'cli':'true', 'encoder':'json',
                                                       'ack':'true', 'pipename':'\'exabgp\'',
                                                       'respawn':'true','chunk':1,
                                                       'terminate':'false'},
                                              'bgp' : {'openwait' : 60},
                                              'cache': {'attributes':'true', 'nexthops':'true'},
                                              'daemon': {'daemonize':'false', 'drop':'true', 
                                                         'pid': '\'\'', 'umask':'\'0o137\'', 
                                                         'user':'nobody'},
                                              'log': {'all':'true','configuration':'true','daemon':'true',
                                                      'message':'true','destination':'stdout',
                                                      'enable':'true','level':'INFO','network':'true',
                                                      'packets':'false','parser':'true',
                                                      'processes':'true','reactor':'true',
                                                      'rib':'false','routes':'true','short':'false',
                                                      'timers':'false'},
                                              'pdb': {'enable':'false'},
                                              'profile': { 'enable':'false', 'file':'\'\''},
                                              'reactor': {'speed':'1.0'},
                                              'tcp': {'acl':'false', 'bind':'', 'delay':0,
                                                      'once':'false', 'port': 179}
                                            }, passive=False )

        lTp1Tp2 = self.addLink(Tp1ASr2, Tp2ASr1)
        lTp1Tp2[Tp1ASr2].addParams(ip=("1002::100/48",))
        lTp1Tp2[Tp2ASr1].addParams(ip=("1002::200/48",))

        Tp2ASr1Sw2 = self.addLink(Tp2ASr1, Sw2Tp2)
        Tp2ASr1Sw2[Tp2ASr1].addParams(ip=("200::1/48",))
        Tp2ASr2Sw2 = self.addLink(Tp2ASr2, Sw2Tp2)
        Tp2ASr2Sw2[Tp2ASr2].addParams(ip=("200::2/48",))

        lTp1Tp3 = self.addLink(Tp1ASr3, Tp3ASr1)
        lTp1Tp3[Tp1ASr3].addParams(ip=("1003::100/48",))
        lTp1Tp3[Tp3ASr1].addParams(ip=("1003::300/48",))

        Tp3ASr1Sw3 = self.addLink(Tp3ASr1, Sw3Tp3)
        Tp3ASr1Sw3[Tp3ASr1].addParams(ip=("300::1/48",))
        Tp3ASr2Sw3 = self.addLink(Tp3ASr2, Sw3Tp3)
        Tp3ASr2Sw3[Tp3ASr2].addParams(ip=("300::2/48",))

       
        Server = self.addHost('Server')
        lGsR = self.addLink(Server, GsASr1)
        lGsR[Server].addParams(ip=("55::1/48",))
       
        lGsR[GsASr1].addParams(ip=("55::2/48",))


        # Client AS routers in AS-number order, so the loops below can use
        # them directly instead of building variable names with exec()/eval()
        clients = (AS1R1, AS2R1, AS3R1, AS4R1, AS5R1,
                   AS6R1, AS7R1, AS8R1, AS9R1, AS10R1)

        for i, as_router in enumerate(clients, start=1):
            gcl = self.addHost(f'gCl{i}')
            gclink = self.addLink(as_router, gcl)
            gclink[as_router].addParams(ip=(f"2001:df{i:02d}::2/48",))
            gclink[gcl].addParams(ip=(f"2001:df{i:02d}::1/48",))

        self.addLinks((GsASr1, Tp1ASr1), (Tp2ASr2, Tp4ASr1),
                      (Tp3ASr2, Tp4ASr1), (GsASr1, Tp5ASr1),
                      (Tp5ASr1, Tp4ASr1))

        link_delay = 10

        for as_router in clients:
            self.addLink(Tp4ASr1, as_router,
                         delay="{}ms".format(link_delay / 2))
            link_delay += 0.05

        self.addAS(55, (GsASr1,))
        self.addAS(100, (Tp1ASr1, Tp1ASr2, Tp1ASr3, Tp1ASctlr))
        self.addAS(200, (Tp2ASr1, Tp2ASr2))
        self.addAS(300, (Tp3ASr1, Tp3ASr2))
        self.addAS(400, (Tp4ASr1,))
        self.addAS(500, (Tp5ASr1,))

        # Only AS1R1..AS10R1 exist, so iterate over those ten routers
        for i, as_router in enumerate(clients, start=1):
            self.addAS(i, (as_router,))

        bgp_peering(self, Tp1ASr1, Tp1ASctlr)
        bgp_peering(self, Tp1ASr2, Tp1ASctlr)
        bgp_peering(self, Tp1ASr3, Tp1ASctlr)

        bgp_peering(self, Tp2ASr1, Tp2ASr2)
        bgp_peering(self, Tp3ASr1, Tp3ASr2)

        # Set ACL and prefer one path over the other
        acl4 = AccessList(name='all', entries=('any',), family='ipv4')
        acl = AccessList(name='all6', entries=('any',), family='ipv6')

        Tp1ASr3.get_config(BGP).set_local_pref(150, from_peer=Tp3ASr1,
                                               matching=(acl4, acl))

        Tp1ASr2.get_config(BGP).set_local_pref(450, from_peer=Tp2ASr1,
                                               matching=(acl4, acl))

        ebgp_session(self, GsASr1, Tp1ASr1, link_type=CLIENT_PROVIDER)
        ebgp_session(self, GsASr1, Tp5ASr1, link_type=CLIENT_PROVIDER)

        ebgp_session(self, Tp1ASr2, Tp2ASr1)
        ebgp_session(self, Tp1ASr3, Tp3ASr1)

        # Prefer return path from clients via Tp3 or Tp2
        Tp4ASr1.get_config(BGP).set_local_pref(100, from_peer=Tp2ASr2,
                                               matching=(acl4, acl))
        Tp4ASr1.get_config(BGP).set_local_pref(100, from_peer=Tp3ASr2,
                                               matching=(acl4, acl))
        ebgp_session(self, Tp4ASr1, Tp2ASr2)
        ebgp_session(self, Tp4ASr1, Tp3ASr2)
        Tp5ASr1.get_config(BGP).deny(to_peer=GsASr1,
                                     matching=(acl4, acl))
        ebgp_session(self, Tp4ASr1, Tp5ASr1)

        # Same ten client routers as above (range(1, 51) referenced AS11R1..AS50R1, which do not exist)
        for as_router in clients:
            ebgp_session(self, as_router, Tp4ASr1, link_type=CLIENT_PROVIDER)

        super().build(*args, **kwargs)

    def post_build(self, net):
        for n in net.hosts + net.routers:
            enable_srv6(n)
        super().post_build(net)

    def bgp(self, name):
        r = self.addRouter(name, config=RouterConfig)
        r.addDaemon(BGP,  address_families=(
            AF_INET(redistribute=('connected',)),
            AF_INET6(redistribute=('connected',))))
        return r


class PARNet(IPNet):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
      

    def modifyLink(self, node1, node2, delay="2ms", bw=None, max_queue_size=None, **opts):

        src_params = opts.get("params1", {})
        dst_params = opts.get("params2", {})
        src_delay = src_params.get("delay")
        src_loss = src_params.get("loss")
        src_max_queue = src_params.get("max_queue_size")

        dst_delay = dst_params.get("delay")
        dst_loss = dst_params.get("loss")
        dst_max_queue = dst_params.get("max_queue_size")

        # Find a switch that both nodes are attached to
        for sw in self.switches:
            src_link = node1.connectionsTo(sw)
            dst_link = node2.connectionsTo(sw)
            if src_link and dst_link:
                break
        else:
            raise ValueError("%s and %s do not share a switch" % (node1, node2))

        # connectionsTo() returns (local intf, remote intf) pairs
        src_int, _ = src_link[0]
        dst_int, _ = dst_link[0]

        # params1 applies to node1's interface, params2 to node2's
        src_delay = src_delay or delay
        src_loss = src_loss or 0
        dst_delay = dst_delay or delay
        dst_loss = dst_loss or 0

        src_int.config(delay=src_delay, max_queue_size=src_max_queue, loss=src_loss)
        dst_int.config(delay=dst_delay, max_queue_size=dst_max_queue, loss=dst_loss)


if __name__ == '__main__':
    net = PARNet(topo=TestTopo(), use_v4=False)
    try:
        net.start()
        tp2_delay = 35.0
        tp3_delay = 45.0

        net.modifyLink(net["Tp2ASr1"], net["Tp2ASr2"],
                       params1={"delay": "{}ms".format(tp2_delay)},
                       params2={"delay": "{}ms".format(tp2_delay)})
        net.modifyLink(net["Tp3ASr1"], net["Tp3ASr2"],
                       params1={"delay": "{}ms".format(tp3_delay)},
                       params2={"delay": "{}ms".format(tp3_delay)})
      
        # Start a controller application that talks to ExaBGP here.
        net["Tp1ASctlr"].cmd("python Controller.py /home/ubuntu/config.yaml &> controller.log &")

        start_time = time.time()  # reference point for the experiment duration below
        time.sleep(60)
        fail_count = 0
        while (time.time() - start_time) < 950:
            time.sleep(120)
            if fail_count < 1:
                fail_links = [("Tp3ASr2", "Tp4ASr1")]
                link_down = net.runFailurePlan(fail_links)
                time.sleep(100)
                net.restoreIntfs(link_down)
            fail_count += 1
            print("Fail count is %s" %fail_count)
            time.sleep(60)
        print("%s: End Experiment" % (str(datetime.now())))
    finally:
        net.stop()
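
To see exactly when the receiver stops making progress relative to each failure and restore event, the experiment loop could check whether /tmp/exabgp-receiver.log (the file receiver.py appends to) has stopped changing. The sketch below is a hypothetical helper, `log_stalled`; it relies on the Mininet nodes sharing the host's filesystem, which is Mininet's default.

```python
import os
import time
from typing import Optional


def log_stalled(path: str, max_idle: float, now: Optional[float] = None) -> bool:
    """Return True if `path` has not been modified for more than `max_idle` seconds.

    A missing file also counts as stalled: receiver.py opens its log in
    append mode at startup, which creates it.
    """
    if now is None:
        now = time.time()
    try:
        mtime = os.path.getmtime(path)
    except FileNotFoundError:
        return True
    return now - mtime > max_idle
```

For example, the loop could print `log_stalled('/tmp/exabgp-receiver.log', 60)` after each `net.restoreIntfs(link_down)` call. Because receiver.py only writes when a message arrives, quiet periods without BGP traffic also count as stalled, so the idle threshold should be longer than the expected gap between messages.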
