Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor parent package dependency in network module #2255

Draft
wants to merge 2 commits into
base: v0.6
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/bitmessageqt/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ def accept(self):
'bitmessagesettings', 'udp'):
self.config.set('bitmessagesettings', 'udp', str(udp_enabled))
if udp_enabled:
announceThread = AnnounceThread()
announceThread = AnnounceThread(self.config)
announceThread.daemon = True
announceThread.start()
else:
Expand Down
19 changes: 14 additions & 5 deletions src/network/__init__.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
"""
Network subsystem package
"""
try:
import networkdepsinterface
except ImportError:
from pybitmessage import networkdepsinterface

from .dandelion import Dandelion
from .threads import StoppableThread

(
state, queues, config, protocol,
randomtrackingdict, addresses, paths) = networkdepsinterface.importParentPackageDepsToNetwork()

dandelion_ins = Dandelion()

__all__ = ["StoppableThread"]


def start(config, state):

Check warning on line 21 in src/network/__init__.py

View check run for this annotation

PyBitmessage Code Quality Checks / Code Quality - pylint

W0621 / redefined-outer-name

Redefining name 'state' from outer scope (line 13)

Check warning on line 21 in src/network/__init__.py

View check run for this annotation

PyBitmessage Code Quality Checks / Code Quality - pylint

W0621 / redefined-outer-name

Redefining name 'config' from outer scope (line 13)
"""Start network threads"""
import state
from .announcethread import AnnounceThread
import connectionpool # pylint: disable=relative-import
from .addrthread import AddrThread
Expand All @@ -30,18 +38,19 @@
readKnownNodes()
connectionpool.pool.connectToStream(1)
for thread in (
BMNetworkThread(), InvThread(), AddrThread(),
DownloadThread(), UploadThread()
BMNetworkThread(queues), InvThread(protocol, state, queues, addresses),
AddrThread(protocol, queues), DownloadThread(state, protocol, addresses),
UploadThread(protocol, state)
):
thread.daemon = True
thread.start()

# Optional components
for i in range(config.getint('threads', 'receive')):
thread = ReceiveQueueThread(i)
thread = ReceiveQueueThread(queues, i)
thread.daemon = True
thread.start()
if config.safeGetBoolean('bitmessagesettings', 'udp'):
state.announceThread = AnnounceThread()
state.announceThread = AnnounceThread(config)
state.announceThread.daemon = True
state.announceThread.start()
15 changes: 9 additions & 6 deletions src/network/addrthread.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
# magic imports!
import connectionpool
from helper_random import randomshuffle
from protocol import assembleAddrMessage
from queues import addrQueue # FIXME: init with queue

from threads import StoppableThread

Expand All @@ -16,12 +14,17 @@ class AddrThread(StoppableThread):
"""(Node) address broadcasting thread"""
name = "AddrBroadcaster"

def __init__(self, protocol, queues):
self.protocol = protocol
self.queues = queues
StoppableThread.__init__(self)

def run(self):
while not self._stopped:
chunk = []
while True:
try:
data = addrQueue.get(False)
data = self.queues.addrQueue.get(False)
chunk.append(data)
except queue.Empty:
break
Expand All @@ -41,9 +44,9 @@ def run(self):
continue
filtered.append((stream, peer, seen))
if filtered:
i.append_write_buf(assembleAddrMessage(filtered))
i.append_write_buf(self.protocol.assembleAddrMessage(filtered))

addrQueue.iterate()
self.queues.addrQueue.iterate()
for i in range(len(chunk)):
addrQueue.task_done()
self.queues.addrQueue.task_done()
self.stop.wait(1)
2 changes: 1 addition & 1 deletion src/network/advanceddispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import time

import network.asyncore_pollchoose as asyncore
import state
from network import state
from threads import BusyError, nonBlocking


Expand Down
9 changes: 6 additions & 3 deletions src/network/announcethread.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

# magic imports!
import connectionpool
from bmconfigparser import config
from protocol import assembleAddrMessage

from node import Peer
Expand All @@ -17,18 +16,22 @@ class AnnounceThread(StoppableThread):
name = "Announcer"
announceInterval = 60

def __init__(self, config):
self.config = config
StoppableThread.__init__(self)

def run(self):
lastSelfAnnounced = 0
while not self._stopped:
processed = 0
if lastSelfAnnounced < time.time() - self.announceInterval:
self.announceSelf()
self.announceSelf(self.config)
lastSelfAnnounced = time.time()
if processed == 0:
self.stop.wait(10)

@staticmethod
def announceSelf():
def announceSelf(config):
"""Announce our presence"""
for connection in connectionpool.pool.udpSockets.values():
if not connection.announcing:
Expand Down
3 changes: 1 addition & 2 deletions src/network/bmobject.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
import logging
import time

import protocol
import state
from network import state, protocol
import connectionpool
from network import dandelion_ins
from highlevelcrypto import calculateInventoryHash
Expand Down
13 changes: 4 additions & 9 deletions src/network/bmproto.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,9 @@
import time

# magic imports!
import addresses
import knownnodes
import protocol
import state
from network import protocol, state, config, queues, addresses, dandelion_ins
import connectionpool
from bmconfigparser import config
from queues import invQueue, objectProcessorQueue, portCheckerQueue
from randomtrackingdict import RandomTrackingDict
from network.advanceddispatcher import AdvancedDispatcher
from network.bmobject import (
Expand All @@ -26,7 +22,6 @@
BMObjectUnwantedStreamError
)
from network.proxy import ProxyError
from network import dandelion_ins
from node import Node, Peer
from objectracker import ObjectTracker, missingObjects

Expand Down Expand Up @@ -409,7 +404,7 @@ def bm_command_object(self):

try:
self.object.checkObjectByType()
objectProcessorQueue.put((
queues.objectProcessorQueue.put((
self.object.objectType, buffer(self.object.data))) # noqa: F821
except BMObjectInvalidError:
BMProto.stopDownloadingObject(self.object.inventoryHash, True)
Expand All @@ -431,7 +426,7 @@ def bm_command_object(self):
)
self.handleReceivedObject(
self.object.streamNumber, self.object.inventoryHash)
invQueue.put((
queues.invQueue.put((
self.object.streamNumber, self.object.inventoryHash,
self.destination))
return True
Expand Down Expand Up @@ -472,7 +467,7 @@ def bm_command_addr(self):

def bm_command_portcheck(self):
"""Incoming port check request, queue it."""
portCheckerQueue.put(Peer(self.destination, self.peerNode.port))
queues.portCheckerQueue.put(Peer(self.destination, self.peerNode.port))
return True

def bm_command_ping(self):
Expand Down
11 changes: 4 additions & 7 deletions src/network/connectionchooser.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@
import random

import knownnodes
import protocol
import state
from bmconfigparser import config
from queues import queue, portCheckerQueue
from network import protocol, state, config, queues

logger = logging.getLogger('default')

Expand All @@ -34,10 +31,10 @@ def chooseConnection(stream):
onionOnly = config.safeGetBoolean(
"bitmessagesettings", "onionservicesonly")
try:
retval = portCheckerQueue.get(False)
portCheckerQueue.task_done()
retval = queues.portCheckerQueue.get(False)
queues.portCheckerQueue.task_done()
return retval
except queue.Empty:
except queues.queue.Empty:
pass
# with a probability of 0.5, connect to a discovered peer
if random.choice((False, True)) and not haveOnion: # nosec B311
Expand Down
4 changes: 1 addition & 3 deletions src/network/connectionpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@
import asyncore_pollchoose as asyncore
import helper_random
import knownnodes
import protocol
import state
from bmconfigparser import config
from network import protocol, state, config
from connectionchooser import chooseConnection
from node import Peer
from proxy import Proxy
Expand Down
16 changes: 8 additions & 8 deletions src/network/downloadthread.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@
`DownloadThread` class definition
"""
import time
import state
import addresses
from network import dandelion_ins
import helper_random
import protocol
import connectionpool
from network import dandelion_ins
from objectracker import missingObjects
from threads import StoppableThread

Expand All @@ -20,8 +17,11 @@ class DownloadThread(StoppableThread):
cleanInterval = 60
requestExpires = 3600

def __init__(self):
def __init__(self, state, protocol, addresses):
super(DownloadThread, self).__init__(name="Downloader")
self.state = state
self.protocol = protocol
self.addresses = addresses
self.lastCleaned = time.time()

def cleanPending(self):
Expand Down Expand Up @@ -60,7 +60,7 @@ def run(self):
payload = bytearray()
chunkCount = 0
for chunk in request:
if chunk in state.Inventory and not dandelion_ins.hasHash(chunk):
if chunk in self.state.Inventory and not dandelion_ins.hasHash(chunk):
try:
del i.objectsNewToMe[chunk]
except KeyError:
Expand All @@ -71,8 +71,8 @@ def run(self):
missingObjects[chunk] = now
if not chunkCount:
continue
payload[0:0] = addresses.encodeVarint(chunkCount)
i.append_write_buf(protocol.CreatePacket('getdata', payload))
payload[0:0] = self.addresses.encodeVarint(chunkCount)
i.append_write_buf(self.protocol.CreatePacket('getdata', payload))
self.logger.debug(
'%s:%i Requesting %i objects',
i.destination.host, i.destination.port, chunkCount)
Expand Down
31 changes: 17 additions & 14 deletions src/network/invthread.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,8 @@
import random
from time import time

import addresses
import protocol
import state
import connectionpool
from network import dandelion_ins
from queues import invQueue
from threads import StoppableThread


Expand All @@ -37,6 +33,13 @@ class InvThread(StoppableThread):

name = "InvBroadcaster"

def __init__(self, protocol, state, queues, addresses):
self.protocol = protocol
self.state = state
self.queues = queues
self.addresses = addresses
StoppableThread.__init__(self)

@staticmethod
def handleLocallyGenerated(stream, hashId):
"""Locally generated inventory items require special handling"""
Expand All @@ -48,13 +51,13 @@ def handleLocallyGenerated(stream, hashId):
connection.objectsNewToThem[hashId] = time()

def run(self): # pylint: disable=too-many-branches
while not state.shutdown: # pylint: disable=too-many-nested-blocks
while not self.state.shutdown: # pylint: disable=too-many-nested-blocks
chunk = []
while True:
# Dandelion fluff trigger by expiration
handleExpiredDandelion(dandelion_ins.expire(invQueue))
handleExpiredDandelion(dandelion_ins.expire(self.queues.invQueue))
try:
data = invQueue.get(False)
data = self.queues.invQueue.get(False)
chunk.append((data[0], data[1]))
# locally generated
if len(data) == 2 or data[2] is None:
Expand All @@ -81,7 +84,7 @@ def run(self): # pylint: disable=too-many-branches
if random.randint(1, 100) >= dandelion_ins.enabled: # nosec B311
fluffs.append(inv[1])
# send a dinv only if the stem node supports dandelion
elif connection.services & protocol.NODE_DANDELION > 0:
elif connection.services & self.protocol.NODE_DANDELION > 0:
stems.append(inv[1])
else:
fluffs.append(inv[1])
Expand All @@ -90,20 +93,20 @@ def run(self): # pylint: disable=too-many-branches

if fluffs:
random.shuffle(fluffs)
connection.append_write_buf(protocol.CreatePacket(
connection.append_write_buf(self.protocol.CreatePacket(
'inv',
addresses.encodeVarint(
self.addresses.encodeVarint(
len(fluffs)) + ''.join(fluffs)))
if stems:
random.shuffle(stems)
connection.append_write_buf(protocol.CreatePacket(
connection.append_write_buf(self.protocol.CreatePacket(
'dinv',
addresses.encodeVarint(
self.addresses.encodeVarint(
len(stems)) + ''.join(stems)))

invQueue.iterate()
self.queues.invQueue.iterate()
for _ in range(len(chunk)):
invQueue.task_done()
self.queues.invQueue.task_done()

dandelion_ins.reRandomiseStems()

Expand Down
3 changes: 1 addition & 2 deletions src/network/knownnodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
except ImportError:
from collections import Iterable

import state
from bmconfigparser import config
from network import state, config
from network.node import Peer

state.Peer = Peer
Expand Down
Loading