Skip to content
This repository has been archived by the owner on Jan 7, 2023. It is now read-only.

Support testing on more than one Ceph pool #257

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions analyzer/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ def format_result_for_visualizer(self, data):
output_sort = OrderedDict()
output_sort["summary"] = OrderedDict()
monitor_interval = int(self.cluster["monitor_interval"])
res = re.search('^(\d+)-(\w+)-(\w+)-(\w+)-(\w+)-(\w+)-(\w+)-(\d+)-(\d+)-(\w+)$',data["session_name"])
res = re.search('^(\d+)-([^-]+)-(\w+)-(\w+)-(\w+)-(\w+)-(\w+)-(\d+)-(\d+)-([^-]+)$',data["session_name"])
if not res:
return output_sort
rampup = int(res.group(8))
Expand Down Expand Up @@ -376,7 +376,8 @@ def summary_result(self, data):
# generate summary
benchmark_tool = ["fio", "cosbench", "vdbench"]
data["summary"]["run_id"] = {}
res = re.search('^(\d+)-(\w+)-(\w+)-(\w+)-(\w+)-(\w+)-(\w+)-(\d+)-(\d+)-(\w+)$',data["session_name"])
redata = data["session_name"]
res = re.search('^(\d+)-([^-]+)-(\w+)-(\w+)-(\w+)-(\w+)-(\w+)-(\d+)-(\d+)-([^-]+)$', redata)
if not res:
common.printout("ERROR", "Unable to get result infomation")
return data
Expand Down Expand Up @@ -450,7 +451,8 @@ def summary_result(self, data):
tmp_data["99.00th%_lat(ms)"] = "%.3f" % (max_lat_99/rbd_count)
tmp_data["99.99th%_lat(ms)"] = "%.3f" % (max_lat/rbd_count)
except:
pass
err_log = traceback.format_exc()
common.printout("ERROR","%s" % err_log)
read_SN_IOPS = 0
read_SN_BW = 0
read_SN_Latency = 0
Expand Down
20 changes: 11 additions & 9 deletions benchmarking/mod/bblock/fiorbd.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ def load_parameter(self):
common.printout("LOG","<CLASS_NAME:%s> Test start running function : %s"%(self.__class__.__name__,sys._getframe().f_code.co_name),screen=False,log_level="LVL4")
super(self.__class__, self).load_parameter()
self.cluster["rbdlist"] = self.get_rbd_list(self.benchmark["poolname"])
if len(self.cluster["rbdlist"]) < int(self.all_conf_data.get("rbd_volume_count")):
if common.get_total(self.cluster["rbdlist"]) < int(self.all_conf_data.get("rbd_volume_count")):
self.prepare_images()

disk_num_per_client = self.cluster["disk_num_per_client"]
Expand All @@ -22,7 +22,7 @@ def prepare_images(self):
controller = self.cluster["head"]
rbd_count = self.all_conf_data.get("rbd_volume_count")
rbd_size = self.all_conf_data.get("volume_size")
common.printout("LOG","Creating rbd volume")
common.printout("LOG","Preparing rbd volume")
if rbd_count and rbd_size:
super(self.__class__, self).create_image(rbd_count, rbd_size, self.benchmark["poolname"])
else:
Expand All @@ -37,10 +37,11 @@ def prepare_images(self):
clients = self.cluster["testjob_distribution"].keys()
for client in self.cluster["testjob_distribution"]:
common.scp(user, client, "../conf/fio_init.conf", dest_dir)
rbdlist = ' '.join(self.cluster["testjob_distribution"][client])
res = common.pdsh(user, [client], "for rbdname in %s; do POOLNAME=%s RBDNAME=${rbdname} fio --section init-write %s/fio_init.conf & done" % (rbdlist, self.benchmark["poolname"], dest_dir), option = "force")
fio_job_num_total += len(self.cluster["testjob_distribution"][client])
common.printout("LOG","%d FIO Jobs starts on %s" % (len(self.cluster["testjob_distribution"][client]), client))
for pool_name in self.benchmark["poolname"].split(":"):
rbdlist = ' '.join(self.cluster["testjob_distribution"][client][pool_name])
common.printout("LOG","%d FIO Jobs starts on %s" % (len(self.cluster["testjob_distribution"][client]), client))
res = common.pdsh(user, [client], "for rbdname in %s; do POOLNAME=%s RBDNAME=${rbdname} fio --section init-write %s/fio_init.conf & done" % (rbdlist, pool_name, dest_dir), option = "force")
fio_job_num_total += len(self.cluster["testjob_distribution"][client][pool_name])
time.sleep(1)
if not self.check_fio_pgrep(clients, fio_job_num_total):
common.printout("ERROR","Failed to start FIO process",log_level="LVL1")
Expand Down Expand Up @@ -98,9 +99,10 @@ def run(self):
fio_job_num_total = 0
poolname = self.benchmark["poolname"]
for client in self.benchmark["distribution"]:
rbdlist = ' '.join(self.benchmark["distribution"][client])
res = common.pdsh(user, [client], "for rbdname in %s; do POOLNAME=%s RBDNAME=${rbdname} fio --output %s/`hostname`_${rbdname}_fio.txt --write_bw_log=%s/`hostname`_${rbdname}_fio --write_lat_log=%s/`hostname`_${rbdname}_fio --write_iops_log=%s/`hostname`_${rbdname}_fio --section %s %s/fio.conf 2>%s/`hostname`_${rbdname}_fio_errorlog.txt & done" % (rbdlist, poolname, dest_dir, dest_dir, dest_dir, dest_dir, self.benchmark["section_name"], dest_dir, dest_dir), option = "force")
fio_job_num_total += len(self.benchmark["distribution"][client])
for pool_name in poolname.split(":"):
rbdlist = ' '.join(self.benchmark["distribution"][client][pool_name])
res = common.pdsh(user, [client], "for rbdname in %s; do POOLNAME=%s RBDNAME=${rbdname} fio --output %s/`hostname`_${rbdname}_fio.txt --write_bw_log=%s/`hostname`_${rbdname}_fio --write_lat_log=%s/`hostname`_${rbdname}_fio --write_iops_log=%s/`hostname`_${rbdname}_fio --section %s %s/fio.conf 2>%s/`hostname`_${rbdname}_fio_errorlog.txt & done" % (rbdlist, pool_name, dest_dir, dest_dir, dest_dir, dest_dir, self.benchmark["section_name"], dest_dir, dest_dir), option = "force")
fio_job_num_total += len(self.benchmark["distribution"][client][pool_name])
self.chkpoint_to_log("fio start")
time.sleep(1)
if not self.check_fio_pgrep(self.benchmark["distribution"].keys(), fio_job_num_total):
Expand Down
128 changes: 88 additions & 40 deletions benchmarking/mod/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time
import re
import uuid
import traceback
from analyzer import *
lib_path = ( os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

Expand Down Expand Up @@ -71,16 +72,17 @@ def go(self, testcase, tuning):
except:
common.printout("ERROR","analyzer failed, pls try cd analyzer; python analyzer.py --path %s process_data " % self.cluster["dest_dir"],log_level="LVL1")
except:
common.printout("ERROR","The test has been stopped.",log_level="LVL1")
err_log = traceback.format_exc()
common.printout("ERROR","The test has been stopped, error_log: %s." % err_log,log_level="LVL1")

def create_image(self, volume_count, volume_size, poolname):
common.printout("LOG","<CLASS_NAME:%s> Test start running function : %s"%(self.__class__.__name__,sys._getframe().f_code.co_name),screen=False,log_level="LVL4")
user = self.cluster["user"]
controller = self.cluster["head"]
rbd_list = self.get_rbd_list(poolname)
need_to_create = 0
if not len(rbd_list) >= int(volume_count):
need_to_create = int(volume_count) - len(rbd_list)
if not common.get_total(rbd_list) >= int(volume_count):
need_to_create = int(volume_count) - common.get_total(rbd_list)
if need_to_create != 0:
for i in range(0, need_to_create):
volume = 'volume-%s' % str(uuid.uuid4())
Expand All @@ -91,16 +93,19 @@ def get_rbd_list(self, poolname):
common.printout("LOG","<CLASS_NAME:%s> Test start running function : %s"%(self.__class__.__name__,sys._getframe().f_code.co_name),screen=False,log_level="LVL4")
user = self.cluster["user"]
controller = self.cluster["head"]
stdout, stderr = common.pdsh(user, [controller], "rbd ls -p %s" % poolname, option="check_return")
if stderr:
common.printout("ERROR","unable get rbd list, return msg: %s" % stderr,log_level="LVL1")
#sys.exit()
res = common.format_pdsh_return(stdout)
if res != {}:
rbd_list_tmp = (res[controller]).split()
else:
rbd_list_tmp = []
return rbd_list_tmp
ret = {}
for pool_name in poolname.split(':'):
stdout, stderr = common.pdsh(user, [controller], "rbd ls -p %s" % pool_name, option="check_return")
if stderr:
common.printout("ERROR","unable get rbd list, return msg: %s" % stderr,log_level="LVL1")
#sys.exit()
res = common.format_pdsh_return(stdout)
if res != {}:
rbd_list_tmp = res[controller].split()
else:
rbd_list_tmp = []
ret[pool_name] = rbd_list_tmp
return ret

def after_run(self):
common.printout("LOG","<CLASS_NAME:%s> Test start running function : %s"%(self.__class__.__name__,sys._getframe().f_code.co_name),screen=False,log_level="LVL4")
Expand Down Expand Up @@ -355,36 +360,79 @@ def set_runid(self):

def testjob_distribution(self, disk_num_per_client, instance_list):
    """Split test instances across client nodes in proportion to each
    client's disk count.

    disk_num_per_client: sequence of per-client disk counts, parallel to
        self.cluster["client"]; each entry is int-convertible.
    instance_list: either a flat list of instances (qemu workload) or a
        dict mapping pool name -> instance list (fiorbd, multi-pool).

    Fills self.cluster["testjob_distribution"] with
      * qemu:   {client: [instances...]}
      * fiorbd: {client: {pool_name: [instances...]}}
    """
    common.printout("LOG","<CLASS_NAME:%s> Test start running function : %s"%(self.__class__.__name__,sys._getframe().f_code.co_name),screen=False,log_level="LVL4")
    self.cluster["testjob_distribution"] = {}
    workload_engine = "fiorbd"
    tmp_instance_list = {}
    if isinstance(instance_list, list):
        # A flat list means the qemu engine: treat it as one pseudo pool.
        tmp_instance_list["vclient"] = instance_list
        workload_engine = "qemu"
    else:
        tmp_instance_list = instance_list
    total_disk = 0
    for disk_num in disk_num_per_client:
        total_disk += int(disk_num)
    # NOTE: total_disk == 0 raises ZeroDivisionError below, same as the
    # original code would for an empty/zero disk layout.
    for pool_name in tmp_instance_list.keys():
        pool_instances = tmp_instance_list[pool_name]
        pool_total = len(pool_instances)
        cum_disk = 0
        start_vclient_num = 0
        client_num = 0
        for client in self.cluster["client"]:
            cum_disk += int(disk_num_per_client[client_num])
            # Fix: multiply before dividing. The original
            # (disk/total_disk)*count used integer division and floored
            # to 0 for every client whose disk share was below 100%.
            # Cumulative rounding also guarantees the slices cover the
            # whole instance list.
            end_vclient_num = cum_disk * pool_total // total_disk
            assigned = copy.deepcopy(pool_instances[start_vclient_num:end_vclient_num])
            if workload_engine != "qemu":
                if client not in self.cluster["testjob_distribution"]:
                    self.cluster["testjob_distribution"][client] = {}
                self.cluster["testjob_distribution"][client][pool_name] = assigned
            else:
                self.cluster["testjob_distribution"][client] = assigned
            start_vclient_num = end_vclient_num
            client_num += 1
    print(self.cluster["testjob_distribution"])

def cal_run_job_distribution(self):
    """Decide how many of each client's prepared volumes actually run.

    self.benchmark["instance_number"] is a ':'-separated per-pool count
    (e.g. "10:20"); a pool with no entry reuses the last parsed count, and
    a plain single number applies to every pool. Consumes
    self.cluster["testjob_distribution"] (built by testjob_distribution)
    and fills self.benchmark["distribution"] with the same
    {client: {pool: [...]}} (fiorbd) or {client: [...]} (qemu) shape,
    truncated per client to ceil(count / client_total).
    """
    common.printout("LOG","<CLASS_NAME:%s> Test start running function : %s"%(self.__class__.__name__,sys._getframe().f_code.co_name),screen=False,log_level="LVL4")
    client_total = len(self.cluster["client"])
    self.benchmark["distribution"] = {}
    workload_engine = "fiorbd"

    for client in self.cluster["client"]:
        volume_count_per_pool = {}
        pool_id = 0
        tmp_testjob_distribution = {}
        # str() keeps the legacy configuration working where
        # instance_number is a plain int rather than a "a:b" string.
        instance_num_per_pool = str(self.benchmark["instance_number"]).split(":")
        if isinstance(self.cluster["testjob_distribution"][client], list):
            tmp_testjob_distribution["vclient"] = self.cluster["testjob_distribution"][client]
            workload_engine = "qemu"
        else:
            tmp_testjob_distribution = self.cluster["testjob_distribution"][client]
        # Fix: 'number' was referenced before assignment when the first
        # pool had no matching entry in instance_num_per_pool.
        number = 0
        for pool_name in tmp_testjob_distribution.keys():
            volume_count_per_pool[pool_name] = {"volume_max_per_client": 0, "remained_instance_num": 0}
            if pool_id < len(instance_num_per_pool):
                number = int(instance_num_per_pool[pool_id])
                pool_id += 1
            # else: reuse the previous pool's number
            volume_count_per_pool[pool_name]["remained_instance_num"] = number
            # Ceiling of number/client_total; '//' keeps integer
            # semantics on both python 2 and 3.
            if (number % client_total) > 0:
                volume_count_per_pool[pool_name]["volume_max_per_client"] = number // client_total + 1
            else:
                volume_count_per_pool[pool_name]["volume_max_per_client"] = number // client_total

        for pool_name in tmp_testjob_distribution.keys():
            remained_instance_num = volume_count_per_pool[pool_name]["remained_instance_num"]
            volume_max_per_client = volume_count_per_pool[pool_name]["volume_max_per_client"]
            if not remained_instance_num:
                break
            if remained_instance_num < volume_max_per_client:
                volume_num_upper_bound = remained_instance_num
            else:
                volume_num_upper_bound = volume_max_per_client
            if workload_engine != "qemu":
                if client not in self.benchmark["distribution"]:
                    self.benchmark["distribution"][client] = {}
                self.benchmark["distribution"][client][pool_name] = copy.deepcopy(tmp_testjob_distribution[pool_name][:volume_num_upper_bound])
            else:
                self.benchmark["distribution"][client] = copy.deepcopy(tmp_testjob_distribution[pool_name][:volume_num_upper_bound])
            # The original also decremented remained_instance_num here, but
            # the value is re-read from volume_count_per_pool on the next
            # iteration, so the decrement was dead code and is dropped.
    print(self.benchmark["distribution"])

def check_fio_pgrep(self, nodes, fio_node_num = 1, check_type="jobnum"):
common.printout("LOG","<CLASS_NAME:%s> Test start running function : %s"%(self.__class__.__name__,sys._getframe().f_code.co_name),screen=False,log_level="LVL4")
Expand Down
14 changes: 12 additions & 2 deletions conf/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,9 +507,10 @@ def remove_unit(data):

def size_to_Kbytes(size, dest_unit='KB', arg=1024.0):
if not str(size).isdigit():
res = re.search('(\d+\.*\d*)\s*(\D*)',size)
size_s = size.replace('\n','')
res = re.search('(\d+\.*\d*)\s*(\D*)',size_s)
space_num = float(res.group(1))
space_unit = res.group(2)
space_unit = res.group(2).strip()
if space_unit == "":
space_unit = 'B'
else:
Expand Down Expand Up @@ -558,6 +559,15 @@ def unique_extend( list_data, new_list ):
list_data.append( data )
return list_data

def get_total(data):
    """Count leaf elements of a nested structure.

    A list contributes its length, a dict contributes the recursive
    total over its values, and any other type contributes 0.
    """
    if isinstance(data, list):
        return len(data)
    if isinstance(data, dict):
        return sum(get_total(value) for value in data.values())
    return 0

def read_file_after_stamp(path, stamp = None):
lines = []
output = False
Expand Down
9 changes: 5 additions & 4 deletions visualizer/create_DB.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def createTB(self,dbpath):
driver varchar(10) not null,
snnumber int not null,
cnnumber int not null,
worker int not null,
worker varchar(20) not null,
runtime int not null,
iops float not null,
bw float not null,
Expand Down Expand Up @@ -77,11 +77,12 @@ def insert_to_TB(self,data,dbpath):
if rowdata[4] == '':
rowdata[4] = 'None'
if len(rowdata) == 22:
sqlstr = "insert into tb_report (runid,runid_tr,timestamp,status,description,opsize,optype,qd,driver,snnumber,cnnumber,worker,runtime,iops,bw,latency,latency_95,latency_99,latency_9999,sniops,snbw,snlatency) values ("+rowdata[1]+",'"+rowdata[0]+"','"+rowdata[2]+"','"+rowdata[3]+"','"+rowdata[4]+"','"+rowdata[5]+"','"+rowdata[6]+"','"+rowdata[7]+"','"+rowdata[8]+"',"+rowdata[9]+","+rowdata[10]+","+rowdata[11]+","+rowdata[12]+",'"+rowdata[13]+"','"+rowdata[14]+"','"+rowdata[15]+"','"+rowdata[16]+"','"+rowdata[17]+"','"+rowdata[18]+"','"+rowdata[19]+"','"+rowdata[20]+"','"+rowdata[21]+"')"
sqlstr = "insert into tb_report (runid,runid_tr,timestamp,status,description,opsize,optype,qd,driver,snnumber,cnnumber,worker,runtime,iops,bw,latency,latency_95,latency_99,latency_9999,sniops,snbw,snlatency) values ("+rowdata[1]+",'"+rowdata[0]+"','"+rowdata[2]+"','"+rowdata[3]+"','"+rowdata[4]+"','"+rowdata[5]+"','"+rowdata[6]+"','"+rowdata[7]+"','"+rowdata[8]+"',"+rowdata[9]+","+rowdata[10]+",'"+rowdata[11]+"',"+rowdata[12]+",'"+rowdata[13]+"','"+rowdata[14]+"','"+rowdata[15]+"','"+rowdata[16]+"','"+rowdata[17]+"','"+rowdata[18]+"','"+rowdata[19]+"','"+rowdata[20]+"','"+rowdata[21]+"')"
elif len(rowdata) == 20:
sqlstr = "insert into tb_report (runid,runid_tr,timestamp,status,description,opsize,optype,qd,driver,snnumber,cnnumber,worker,runtime,iops,bw,latency,latency_95,latency_99,latency_9999,sniops,snbw,snlatency) values ("+rowdata[1]+",'"+rowdata[0]+"','"+rowdata[2]+"','"+rowdata[3]+"','"+rowdata[4]+"','"+rowdata[5]+"','"+rowdata[6]+"','"+rowdata[7]+"','"+rowdata[8]+"',"+rowdata[9]+","+rowdata[10]+","+rowdata[11]+","+rowdata[12]+",'"+rowdata[13]+"','"+rowdata[14]+"','"+rowdata[15]+"','0.00','"+rowdata[16]+"','0.00','"+rowdata[17]+"','"+rowdata[18]+"','"+rowdata[19]+"')"
sqlstr = "insert into tb_report (runid,runid_tr,timestamp,status,description,opsize,optype,qd,driver,snnumber,cnnumber,worker,runtime,iops,bw,latency,latency_95,latency_99,latency_9999,sniops,snbw,snlatency) values ("+rowdata[1]+",'"+rowdata[0]+"','"+rowdata[2]+"','"+rowdata[3]+"','"+rowdata[4]+"','"+rowdata[5]+"','"+rowdata[6]+"','"+rowdata[7]+"','"+rowdata[8]+"',"+rowdata[9]+","+rowdata[10]+",'"+rowdata[11]+"',"+rowdata[12]+",'"+rowdata[13]+"','"+rowdata[14]+"','"+rowdata[15]+"','0.00','"+rowdata[16]+"','0.00','"+rowdata[17]+"','"+rowdata[18]+"','"+rowdata[19]+"')"
elif len(rowdata) == 19:
sqlstr = "insert into tb_report (runid,runid_tr,timestamp,status,description,opsize,optype,qd,driver,snnumber,cnnumber,worker,runtime,iops,bw,latency,latency_95,latency_99,latency_9999,sniops,snbw,snlatency) values ("+rowdata[1]+",'"+rowdata[0]+"','"+rowdata[2]+"','"+rowdata[3]+"','"+rowdata[4]+"','"+rowdata[5]+"','"+rowdata[6]+"','"+rowdata[7]+"','"+rowdata[8]+"',"+rowdata[9]+","+rowdata[10]+","+rowdata[11]+","+rowdata[12]+",'"+rowdata[13]+"','"+rowdata[14]+"','"+rowdata[15]+"','0.00','0.00','0.00','"+rowdata[16]+"','"+rowdata[17]+"','"+rowdata[18]+"')"
sqlstr = "insert into tb_report (runid,runid_tr,timestamp,status,description,opsize,optype,qd,driver,snnumber,cnnumber,worker,runtime,iops,bw,latency,latency_95,latency_99,latency_9999,sniops,snbw,snlatency) values ("+rowdata[1]+",'"+rowdata[0]+"','"+rowdata[2]+"','"+rowdata[3]+"','"+rowdata[4]+"','"+rowdata[5]+"','"+rowdata[6]+"','"+rowdata[7]+"','"+rowdata[8]+"',"+rowdata[9]+","+rowdata[10]+",'"+rowdata[11]+"',"+rowdata[12]+",'"+rowdata[13]+"','"+rowdata[14]+"','"+rowdata[15]+"','0.00','0.00','0.00','"+rowdata[16]+"','"+rowdata[17]+"','"+rowdata[18]+"')"
print sqlstr
conn.execute(sqlstr)
conn.commit()
print "Add data to TB successfully."
Expand Down
Loading