Skip to content

Commit

Permalink
[WIP][misc] aliZsync mbuffer transport
Browse files Browse the repository at this point in the history
  • Loading branch information
teo committed Jan 29, 2021
1 parent c26219b commit da34174
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 9 deletions.
2 changes: 2 additions & 0 deletions hacking/aliZsync
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ sub_init(){
HOST=$(echo "$LINE" | awk '{ print $1}')
echo -e "\t$HOST"
sshpass -p "$password" ssh-copy-id -i /root/.ssh/id_rsa_aliZsync "root@$HOST"
ssh -n -i /root/.ssh/id_rsa_aliZsync "root@$HOST" "firewall-cmd --zone=public --permanent --add-port=47099/tcp"
ssh -n -i /root/.ssh/id_rsa_aliZsync "root@$HOST" "firewall-cmd --reload"
done < "$INVENTORY_FILE"
fi

Expand Down
57 changes: 48 additions & 9 deletions hacking/aliZsync-sync-zfs-snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def main(argv):
parser = argparse.ArgumentParser(description='Synchronise all snapshots in a ZFS filesystem to another zfs filesystem. Useful for synchronising backups.')

parser.add_argument ("--debug", dest='debug', nargs='?', const=1, type=int, help='Debug level of the application. Uses debug 1 if flag is passed without a number.')
parser.add_argument ("--transport", dest='transport', nargs=1, help='transport to use for zfs send/receive, can be "mbuffer" (default) or "ssh"')
parser.add_argument ("--sshIdentity", dest='sshIdentity', nargs=1, help='ssh identity key file to use when ssh-ing to destination servers')
parser.add_argument ("source", help='Source ZFS filesystem. Local filsystems are specified as zpool/filesystem. Remote ones are specified as ssh://user@server[:port]:zpool/filesystem.')
parser.add_argument ("destination", help='Destination ZFS filesystem. Same format as source.')
Expand Down Expand Up @@ -90,22 +91,60 @@ def main(argv):
predecessorSubvolume = sourceSubvolumesByCreation[predecessorSubvolumeKey]

print ("# Missing subvolume:", sourceSubvolume, " predecessor:", predecessorSubvolume)
sendCmd = source.getZFSCmdLine (['/sbin/zfs', 'send', '-i', source.getZPoolFilesystem () + '@' + predecessorSubvolume, source.getZPoolFilesystem () + '@' + sourceSubvolume])
rawZfsSend = ['/sbin/zfs', 'send', '-i', source.getZPoolFilesystem () + '@' + predecessorSubvolume, source.getZPoolFilesystem () + '@' + sourceSubvolume]
sendCmd = source.getZFSCmdLine (rawZfsSend)
#print ("/sbin/zfs send -i "+ sourceZfsRoot + '@' + predecessorSubvolume + " " + sourceZfsRoot + '@' + sourceSubvolume + "| pv | ssh -p " + destinationPort + " " + " ".join(sshArgs) + " root@" + destinationServer + " /sbin/zfs receive -Fv " + destinationZfsRoot)
else:
print ("# Missing initial subvolume:", sourceSubvolume)
sendCmd = source.getZFSCmdLine (['/sbin/zfs', 'send', source.getZPoolFilesystem () + '@' + sourceSubvolume])
#print ("/sbin/zfs send " + sourceZfsRoot + '@' + sourceSubvolume + "| pv | ssh -p " + destinationPort + " " + " ".join(sshArgs) + " root@" + destinationServer + " /sbin/zfs receive -Fv " + destinationZfsRoot)
rawZfsSend = ['/sbin/zfs', 'send', source.getZPoolFilesystem () + '@' + sourceSubvolume]
sendCmd = source.getZFSCmdLine (rawZfsSend)
#print ("/sbin/zfs send " + sourceZfsRoot + '@' + sourceSubvolume + "| pv | ssh -p " + destinationPort + " " + " ".join(sshArgs) + " root@" + destinationServer + " /sbin/zfs receive -Fv " + destinationZfsRoot)

rawZfsReceive = ['/sbin/zfs', 'receive', '-Fv', destination.getZPoolFilesystem ()]
receiveCmd = destination.getZFSCmdLine (['/sbin/zfs', 'receive', '-Fv', destination.getZPoolFilesystem ()])
fullCmdLine = ' '.join (sendCmd) + ' | dd | ' + ' '.join (receiveCmd)

logLevel > 0 and print (fullCmdLine)
result = subprocess.call(fullCmdLine, shell = True)
if args.transport == 'ssh':
fullCmdLine = ' '.join (sendCmd) + ' | dd | ' + ' '.join (receiveCmd)

if result:
print ("Error running:" + fullCmdLine)
sys.exit(1)
logLevel > 0 and print (fullCmdLine)
result = subprocess.call(fullCmdLine, shell = True)

if result:
print ("Error running:" + fullCmdLine)
sys.exit(1)
else:
fullReceiveLine = ' '.join (rawZfsReceive)
fullSendLine = ' '.join (rawZfsSend)
#first:
# mbuffer -4 -s 128k -m 1G -I 9090 | zfs receive data/filesystem
# then:
# zfs send -i data/filesystem@1 data/filesystem@2 | mbuffer -s 128k -m 1G -O 10.0.0.1:9090
remoteMvReceive = '"mbuffer -4 -s131072 -m2G -I47099 | ' + fullReceiveLine + '"'
mbReceive = ' '.join (destination.getZFSCmdLine([remoteMvReceive]))
print ("mbReceive:" + mbReceive)
mbSend = fullSendLine + ' | mbuffer -s131072 -m2G -O' + destination.server + ':47099'
print ("mbSend:" + mbSend)

spReceive = subprocess.Popen(mbReceive, shell=True)
print ("receive started")

time.sleep(1) # we wait for the receiver to be ready

spSend = subprocess.Popen(mbSend, shell=True)
print ("send started")

spSend.wait()
spReceive.wait()
# commands = [mbReceive, mbSend]
# procs = [ subprocess.Popen(i, shell = True) for i in commands ]
# for p in procs:
# p.wait()

# to run on receiver:
# fuser -k -n tcp 47099
fuserKill = ' '.join (destination.getZFSCmdLine(['"fuser -k -n tcp 47099"']))
subprocess.call(fuserKill, shell=True)
print ("all done")


#print ("########## Local volumes #######")
Expand Down

0 comments on commit da34174

Please sign in to comment.