Skip to content

Commit

Permalink
[WIP][misc] aliZsync mbuffer transport
Browse files Browse the repository at this point in the history
  • Loading branch information
teo committed Jan 29, 2021
1 parent c26219b commit 18f907c
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 12 deletions.
9 changes: 6 additions & 3 deletions hacking/aliZsync
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,13 @@ ProgName=$(basename $0)
POOLNAME="${ALIZSYNC_POOL_NAME:-aliZsync}"
POOLSIZE="${ALIZSYNC_POOL_SIZE:-100G}"
IMG_FILE_PATH="${ALIZSYNC_IMG_FILE_PATH:-$HOME/$POOLNAME.img}"
TRANSPORT="${ALIZSYNC_TRANSPORT:-mbuffer}"

INVENTORY_FILE="${ALIZSYNC_INVENTORY:-/etc/o2.d/aliZsync_inventory}"
TARGET_ROOT="${ALIZSYNC_TARGET_ROOT:-/opt/alizsw}"
TIMESTAMP=$(date +"%Y-%m-%d_%H-%M-%S")
TAG_NAME="${2:-$TIMESTAMP}"
N_WORKERS="${ALIZSYNC_WORKERS:-8}"
N_WORKERS="${ALIZSYNC_WORKERS:-10}"

DATASET_SW="$POOLNAME/sw"
DATASET_BINARIES="$DATASET_SW/slc7_x86-64"
Expand All @@ -72,9 +73,9 @@ sub_help(){

sync_host(){
HOST="$1"
$SYNC_CMD --sshIdentity "/root/.ssh/id_rsa_aliZsync" "$DATASET_BINARIES" "ssh://root@$HOST:alizsw/sw/slc7_x86-64"
$SYNC_CMD --transport "$TRANSPORT" --sshIdentity "/root/.ssh/id_rsa_aliZsync" "$DATASET_BINARIES" "ssh://root@$HOST:alizsw/sw/slc7_x86-64"
echo -e "\t\t$HOST\tbinaries synchronized"
$SYNC_CMD --sshIdentity "/root/.ssh/id_rsa_aliZsync" "$DATASET_MODULES" "ssh://root@$HOST:alizsw/sw/MODULES"
$SYNC_CMD --transport "$TRANSPORT" --sshIdentity "/root/.ssh/id_rsa_aliZsync" "$DATASET_MODULES" "ssh://root@$HOST:alizsw/sw/MODULES"
echo -e "\t\t$HOST\tmodules synchronized"
}

Expand Down Expand Up @@ -128,6 +129,8 @@ sub_init(){
HOST=$(echo "$LINE" | awk '{ print $1}')
echo -e "\t$HOST"
sshpass -p "$password" ssh-copy-id -i /root/.ssh/id_rsa_aliZsync "root@$HOST"
ssh -n -i /root/.ssh/id_rsa_aliZsync "root@$HOST" "firewall-cmd --zone=public --permanent --add-port=47099/tcp"
ssh -n -i /root/.ssh/id_rsa_aliZsync "root@$HOST" "firewall-cmd --reload"
done < "$INVENTORY_FILE"
fi

Expand Down
51 changes: 42 additions & 9 deletions hacking/aliZsync-sync-zfs-snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def main(argv):
parser = argparse.ArgumentParser(description='Synchronise all snapshots in a ZFS filesystem to another zfs filesystem. Useful for synchronising backups.')

parser.add_argument ("--debug", dest='debug', nargs='?', const=1, type=int, help='Debug level of the application. Uses debug 1 if flag is passed without a number.')
parser.add_argument ("--transport", dest='transport', nargs=1, help='transport to use for zfs send/receive, can be "mbuffer" (default) or "ssh"')
parser.add_argument ("--sshIdentity", dest='sshIdentity', nargs=1, help='ssh identity key file to use when ssh-ing to destination servers')
parser.add_argument ("source", help='Source ZFS filesystem. Local filsystems are specified as zpool/filesystem. Remote ones are specified as ssh://user@server[:port]:zpool/filesystem.')
parser.add_argument ("destination", help='Destination ZFS filesystem. Same format as source.')
Expand Down Expand Up @@ -90,22 +91,54 @@ def main(argv):
predecessorSubvolume = sourceSubvolumesByCreation[predecessorSubvolumeKey]

print ("# Missing subvolume:", sourceSubvolume, " predecessor:", predecessorSubvolume)
sendCmd = source.getZFSCmdLine (['/sbin/zfs', 'send', '-i', source.getZPoolFilesystem () + '@' + predecessorSubvolume, source.getZPoolFilesystem () + '@' + sourceSubvolume])
rawZfsSend = ['/sbin/zfs', 'send', '-i', source.getZPoolFilesystem () + '@' + predecessorSubvolume, source.getZPoolFilesystem () + '@' + sourceSubvolume]
sendCmd = source.getZFSCmdLine (rawZfsSend)
#print ("/sbin/zfs send -i "+ sourceZfsRoot + '@' + predecessorSubvolume + " " + sourceZfsRoot + '@' + sourceSubvolume + "| pv | ssh -p " + destinationPort + " " + " ".join(sshArgs) + " root@" + destinationServer + " /sbin/zfs receive -Fv " + destinationZfsRoot)
else:
print ("# Missing initial subvolume:", sourceSubvolume)
sendCmd = source.getZFSCmdLine (['/sbin/zfs', 'send', source.getZPoolFilesystem () + '@' + sourceSubvolume])
#print ("/sbin/zfs send " + sourceZfsRoot + '@' + sourceSubvolume + "| pv | ssh -p " + destinationPort + " " + " ".join(sshArgs) + " root@" + destinationServer + " /sbin/zfs receive -Fv " + destinationZfsRoot)
rawZfsSend = ['/sbin/zfs', 'send', source.getZPoolFilesystem () + '@' + sourceSubvolume]
sendCmd = source.getZFSCmdLine (rawZfsSend)
#print ("/sbin/zfs send " + sourceZfsRoot + '@' + sourceSubvolume + "| pv | ssh -p " + destinationPort + " " + " ".join(sshArgs) + " root@" + destinationServer + " /sbin/zfs receive -Fv " + destinationZfsRoot)

rawZfsReceive = ['/sbin/zfs', 'receive', '-Fv', destination.getZPoolFilesystem ()]
receiveCmd = destination.getZFSCmdLine (['/sbin/zfs', 'receive', '-Fv', destination.getZPoolFilesystem ()])
fullCmdLine = ' '.join (sendCmd) + ' | dd | ' + ' '.join (receiveCmd)

logLevel > 0 and print (fullCmdLine)
result = subprocess.call(fullCmdLine, shell = True)
if args.transport == ['ssh']:
fullCmdLine = ' '.join (sendCmd) + ' | dd | ' + ' '.join (receiveCmd)

if result:
print ("Error running:" + fullCmdLine)
sys.exit(1)
logLevel > 0 and print (fullCmdLine)
result = subprocess.call(fullCmdLine, shell = True)

if result:
print ("Error running:" + fullCmdLine)
sys.exit(1)
else:
fullReceiveLine = ' '.join (rawZfsReceive)
fullSendLine = ' '.join (rawZfsSend)
#first:
# mbuffer -4 -s 128k -m 1G -I 9090 | zfs receive data/filesystem
# then:
# zfs send -i data/filesystem@1 data/filesystem@2 | mbuffer -s 128k -m 1G -O 10.0.0.1:9090
remoteMvReceive = '"mbuffer -t -4 -s 512k -m 1G -I 47099 | ' + fullReceiveLine + '"'
mbReceive = ' '.join (destination.getZFSCmdLine([remoteMvReceive]))
print ("mbReceive:" + mbReceive)
mbSend = fullSendLine + ' | mbuffer -t -s 512k -m 1G -O ' + destination.server + ':47099'
print ("mbSend:" + mbSend)

spReceive = subprocess.Popen(mbReceive, shell=True)
print ("receive started")

time.sleep(1) # we wait for the receiver to be ready

spSend = subprocess.Popen(mbSend, shell=True)
print ("send started")

spSend.wait()
spReceive.wait()

fuserKill = ' '.join (destination.getZFSCmdLine(['"fuser -k -n tcp 47099"']))
subprocess.call(fuserKill, shell=True)
print ("all done")


#print ("########## Local volumes #######")
Expand Down

0 comments on commit 18f907c

Please sign in to comment.