Skip to content

Commit

Permalink
Resolved merge conflict, using the one multi_process version
Browse files Browse the repository at this point in the history
  • Loading branch information
Maux82 committed Feb 28, 2017
2 parents 27d02eb + db3eb02 commit b543963
Show file tree
Hide file tree
Showing 3 changed files with 313 additions and 170 deletions.
250 changes: 157 additions & 93 deletions moff.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
import sys
import time
from sys import platform as _platform
import multiprocessing



import numpy as np
import pandas as pd
Expand All @@ -32,6 +35,24 @@
TXIC_PATH = os.environ.get('TXIC_PATH', './')


def save_moff_apex_result(list_df, result, folder_output, name):
    """Collect the apex results computed by the worker processes and write
    the concatenated table to <folder_output>/<basename>_moff_result.txt.

    list_df       : list of the input DataFrame chunks (one per worker);
                    only its length is used, to know how many results to fetch
    result        : mapping of chunk index -> multiprocessing AsyncResult;
                    each .get() returns a (DataFrame, status) pair
    folder_output : destination folder for the output file
    name          : input file name; its basename without extension is used
                    to build the output file name

    Returns 1 on success.  Exits the process if any worker reported a
    missing/unreadable raw file (status == -1).
    """
    xx = []
    for df_index in range(len(list_df)):
        # fetch each worker result only once instead of calling .get() twice
        chunk_res, status = result[df_index].get()
        if status == -1:
            exit('Raw file not retrieved: wrong path or upper/low case mismatch')
        xx.append(chunk_res)

    final_res = pd.concat(xx)
    final_res.to_csv(os.path.join(folder_output, os.path.basename(name).split('.')[0] + "_moff_result.txt"), sep="\t",
                     index=False)

    return (1)



def map_ps2moff(data):
data.drop(data.columns[[0]], axis=1, inplace=True)
data.columns = data.columns.str.lower()
Expand Down Expand Up @@ -87,86 +108,79 @@ def pyMZML_xic_out(name, ppmPrecision, minRT, maxRT, MZValue):
else:
return (pd.DataFrame(timeDependentIntensities, columns=['rt', 'intensity']), -1)

def check_log_existence(file_to_check):
    """Remove a leftover log file from a previous run, if present.

    The worker processes open the per-run log in append mode, so a stale
    file from an earlier run must be deleted before the pool starts.
    Uses EAFP (attempt the removal, ignore a missing file) instead of the
    original isfile-then-remove sequence, which could race between the
    check and the delete.
    """
    try:
        os.remove(file_to_check)
    except OSError:
        # file did not exist (or was removed concurrently) - nothing to do
        pass




def run_apex(file_name, raw_name, tol, h_rt_w, s_w, s_w_match, loc_raw, loc_output,log):
# OS detect
def apex_multithr(data_ms2,name_file, raw_name, tol, h_rt_w, s_w, s_w_match, loc_raw, loc_output,offset_index):

#setting logger for multiprocess
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
log.addHandler(ch)

#setting flag and ptah
moff_path = os.path.dirname(sys.argv[0])
flag_mzml = False
flag_windows = False
mbr_flag = 0

# set platform
if _platform in ["linux", "linux2", 'darwin']:
flag_windows = False
elif _platform == "win32":
flag_windows = True

# flag_for matching
mbr_flag = 0
config = ConfigParser.RawConfigParser()
# get the running path of moff
moff_path = os.path.dirname(sys.argv[0])

# it s always placed in same folder of moff.py

config.read(os.path.join(moff_path, 'moff_setting.properties'))

# case of moff_all more than one subfolderi
name = os.path.basename(file_name).split('.')[0]
if '_match' in name:
# in case of mbr , here i dont have evaluate the flag mbr
start = name.find('_match')
# extract the name of the file
name = name[0:start]


# check output log file in right location
if loc_output != '':
if not (os.path.isdir(loc_output)):
os.makedirs(loc_output)
log.info("created output folder: ", loc_output)

# outputname : name of the output
# it should be ok also in linux
outputname = os.path.join(loc_output, name + "_moff_result.txt")
fh = logging.FileHandler(os.path.join(loc_output, name + '__moff.log'), mode='w')
else:
outputname = name + "_moff_result.txt"
fh = logging.FileHandler(os.path.join(name + '__moff.log'), mode='w')
# to be checked if it is works ffor both caseses
fh = logging.FileHandler(os.path.join(loc_output, name_file + '__moff.log'), mode='a')

fh.setLevel(logging.INFO)
log.addHandler(fh)
flag_mzml = False

# check mbr input file
if '_match' in name_file:
# in case of mbr , here i dont have evaluate the flag mbr
start = name_file.find('_match')
# extract the name of the file
name_file = name_file[0:start]

if loc_raw is not None:
if flag_windows:
loc = os.path.join(loc_raw, name + '.RAW')
loc = os.path.join(loc_raw, name_file.upper()+ '.RAW')

else:
# raw file name must have capitals letters :) this shloud be checked
loc = os.path.join(loc_raw, name.upper() + '.RAW')
# this should be done in moe elegant way

loc = os.path.normcase(os.path.join(loc_raw, name_file + '.RAW'))

if not (os.path.isfile(loc)):
loc = os.path.join(loc_raw, name_file + '.raw')

else:
## only with specific file name I usu mzML file :::///
## I have already the full location
loc = raw_name
#mzML work only with --inputraw option
loc = raw_name
if ('MZML' in raw_name.upper()):
flag_mzml = True

#print loc
if os.path.isfile(loc):
log.info('raw file exist')
else:
exit('ERROR: Wrong path or wrong file name included: %s' % loc)
#exit('ERROR: Wrong path or wrong raw file name included: %s' % loc )
log.info('ERROR: Wrong path or wrong raw file name included: %s' % loc )
return (None,-1)

log.critical('moff Input file: %s XIC_tol %s XIC_win %4.4f moff_rtWin_peak %4.4f ' % (file_name, tol, h_rt_w, s_w))
log.critical('RAW file : %s' % (loc))
log.info('Output_file in : %s', outputname)
log.info('RAW file location : %s', loc)

# read data from file
data_ms2 = pd.read_csv(file_name, sep="\t", header=0)
if not 'matched' in data_ms2.columns:
# check if it is a PS file ,
list_name = data_ms2.columns.values.tolist()
# get the lists of PS defaultcolumns from properties file
list = ast.literal_eval(config.get('moFF', 'ps_default_export'))
# here it controls if the input file is a PS export; if yes it maps the input in right moFF name
if check_ps_input_data(list_name, list) == 1:
# map the columns name according to moFF input requirements
data_ms2, list_name = map_ps2moff(data_ms2)
if check_columns_name(data_ms2.columns.tolist(), ast.literal_eval(config.get('moFF', 'col_must_have_x'))) == 1:
exit('ERROR minimal field requested are missing or wrong')

index_offset = data_ms2.columns.shape[0] - 1

Expand All @@ -189,42 +203,42 @@ def run_apex(file_name, raw_name, tol, h_rt_w, s_w, s_w_match, loc_raw, loc_outp
data_ms2["log_L_R"] = data_ms2['log_L_R'].astype('float64')
data_ms2["log_int"] = data_ms2['log_int'].astype('float64')



# set mbr_flag
if 'matched' in data_ms2.columns:
mbr_flag = 1
log.info('Apex module has detected mbr peptides')
log.info('moff_rtWin_peak for matched peptide: %4.4f ', s_w_match)
c = 0
log.critical('Starting apex .........')
#log.critical('Apex module has detected mbr peptides')
#log.info('moff_rtWin_peak for matched peptide: %4.4f ', s_w_match)

start_time = time.time()
c =0
for index_ms2, row in data_ms2.iterrows():
# log.info('peptide at line: %i',c)

mz_opt = "-mz=" + str(row['mz'])
#### PAY ATTENTION HERE , we assume that input RT is in second
## RT in Thermo file is in minutes
#### if it is not the case change the following line
time_w = row['rt'] / 60
if mbr_flag == 0:
log.info('peptide at line %i --> MZ: %4.4f RT: %4.4f ', c, row['mz'], time_w)
#print 'xx here mbr_flag == 0'
log.info('peptide at line %i --> MZ: %4.4f RT: %4.4f',(offset_index +c +2), row['mz'], time_w)
temp_w = s_w
else:
log.info('peptide at line %i --> MZ: %4.4f RT: %4.4f matched(y/n): %i', c, row['mz'], time_w,
row['matched'])
log.info('peptide at line %i --> MZ: %4.4f RT: %4.4f matched (yes=1/no=0): %i',(offset_index +c +2), row['mz'], time_w,row['matched'])
# row['matched'])
if row['matched'] == 1:
temp_w = s_w_match
else:
temp_w = s_w
if row['rt'] == -1:
log.warning('rt not found. Wrong matched peptide in the mbr step line: %i', c)
log.warning('rt not found. Wrong matched peptide in the mbr step line: %i', (offset_index +c +2))
c += 1
continue
try:
if flag_mzml:
# mzml raw file
# transform the tollerance in ppm
data_xic, status = pyMZML_xic_out(loc, float(tol / (10 ** 6)), time_w - h_rt_w, time_w + h_rt_w,
row['mz'])
data_xic, status = pyMZML_xic_out(loc, float(tol / (10 ** 6)), time_w - h_rt_w, time_w + h_rt_w,row['mz'])

if status == -1:
log.warning("WARNINGS: XIC not retrived line: %i", c)
Expand All @@ -235,16 +249,13 @@ def run_apex(file_name, raw_name, tol, h_rt_w, s_w, s_w_match, loc_raw, loc_outp
# Thermo RAW file
if flag_windows:
os.path.join('folder_name', 'file_name')
args_txic = shlex.split(
os.path.join(moff_path, "txic.exe") + " " + mz_opt + " -tol=" + str(tol) + " -t " + str(
time_w - h_rt_w) + " -t " + str(time_w + h_rt_w) + " " + loc, posix=False)
args_txic = shlex.split(os.path.join(moff_path, "txic.exe") + " " + mz_opt + " -tol=" + str(tol) + " -t " + str( time_w - h_rt_w) + " -t " + str(time_w + h_rt_w) + " " + loc, posix=False)
else:
args_txic = shlex.split(TXIC_PATH + "txic " + mz_opt + " -tol=" + str(tol) + " -t " + str(
time_w - h_rt_w) + " -t " + str(
time_w + h_rt_w) + " " + loc)

args_txic = shlex.split(TXIC_PATH + "txic " + mz_opt + " -tol=" + str(tol) + " -t " + str(time_w - h_rt_w) + " -t " + str(time_w + h_rt_w) + " " + loc )

p = subprocess.Popen(args_txic, stdout=subprocess.PIPE)
output, err = p.communicate()

data_xic = pd.read_csv(StringIO.StringIO(output.strip()), sep=' ', names=['rt', 'intensity'], header=0)
if data_xic[(data_xic['rt'] > (time_w - temp_w)) & (data_xic['rt'] < (time_w + temp_w))].shape[0] >= 1:
ind_v = data_xic.index
Expand All @@ -253,15 +264,14 @@ def run_apex(file_name, raw_name, tol, h_rt_w, s_w, s_w_match, loc_raw, loc_outp
'intensity'].max()].index
pos_p = ind_v[pp]
if pos_p.values.shape[0] > 1:
log.warning(" RT gap for the time windows searched. Probably the ppm values is too small %i", c)
continue
val_max = data_xic.ix[pos_p, 1].values
else:
log.info("LW_BOUND window %4.4f", time_w - temp_w)
log.info("UP_BOUND window %4.4f", time_w + temp_w)
log.info(data_xic[(data_xic['rt'] > (time_w - +0.60)) & (data_xic['rt'] < (time_w + 0.60))])
log.info("WARNINGS: moff_rtWin_peak is not enough to detect the max peak line : %i", c)
log.info('MZ: %4.4f RT: %4.4f Mass: %i', row['mz'], row['rt'], index_ms2)
log.info('peptide at line %i --> MZ: %4.4f RT: %4.4f ', (offset_index +c +2), row['mz'], time_w)
log.info("\t LW_BOUND window %4.4f", time_w - temp_w)
log.info("\t UP_BOUND window %4.4f", time_w + temp_w)
#cosche log.info(data_xic[(data_xic['rt'] > (time_w - +0.60)) & (data_xic['rt'] < (time_w + 0.60))])
log.info("\t WARNINGS: moff_rtWin_peak is not enough to detect the max peak ")
c += 1
continue
pnoise_5 = np.percentile(
Expand All @@ -271,13 +281,14 @@ def run_apex(file_name, raw_name, tol, h_rt_w, s_w, s_w_match, loc_raw, loc_outp
data_xic[(data_xic['rt'] > (time_w - (h_rt_w / 2))) & (data_xic['rt'] < (time_w + (h_rt_w / 2)))][
'intensity'], 10)
except (IndexError, ValueError, TypeError):
log.warning(" size is not enough to detect the max peak line : %i", c)
log.info('MZ: %4.4f RT: %4.4f index: %i', row['mz'], row['rt'], index_ms2)
log.info('peptide at line %i --> MZ: %4.4f RT: %4.4f ', (offset_index +c +2), row['mz'], time_w)
log.warning("\t size is not enough to detect the max peak line : %i", c)
continue
c += 1
except pd.parser.CParserError:
log.warning("WARNINGS: XIC not retrived line: %i", c)
log.warning('MZ: %4.4f RT: %4.4f Mass: %i', row['mz'], row['rt'], index_ms2)
log.info('peptide at line %i --> MZ: %4.4f RT: %4.4f ', (offset_index +c +2), row['mz'], time_w)
log.warning("\t WARNINGS: XIC not retrived line:")
#log.warning('MZ: %4.4f RT: %4.4f Mass: %i', row['mz'], row['rt'], index_ms2)

c += 1
continue
Expand Down Expand Up @@ -313,27 +324,23 @@ def run_apex(file_name, raw_name, tol, h_rt_w, s_w, s_w_match, loc_raw, loc_outp
if (pnoise_5 == 0 and pnoise_10 > 0):
data_ms2.ix[index_ms2, (index_offset + 7)] = 20 * np.log10(data_xic.ix[pos_p, 1].values / pnoise_10)
else:
data_ms2.ix[index_ms2, (index_offset + 7)] = 20 * np.log10(data_xic.ix[pos_p, 1].values / pnoise_5)
if pnoise_5 != 0:
data_ms2.ix[index_ms2, (index_offset + 7)] = 20 * np.log10(data_xic.ix[pos_p, 1].values / pnoise_5)
else:
log.info('\t 5 percentile is %4.4f (added 0.5)', pnoise_5)
data_ms2.ix[index_ms2, (index_offset + 7)] = 20 * np.log10(data_xic.ix[pos_p, 1].values / (pnoise_5 +0.5))
# WARNING time - log_time 0 / time -log_time 1
data_ms2.ix[index_ms2, (index_offset + 8)] = np.log2(
abs(data_ms2.ix[index_ms2, index_offset + 2] - log_time[0]) / abs(
data_ms2.ix[index_ms2, index_offset + 2] - log_time[1]))
data_ms2.ix[index_ms2, (index_offset + 9)] = np.log2(val_max)
c += 1

# save result
log.critical('..............apex terminated')
log.critical('Writing result in %s' % (outputname))
log.info("--- Running time (measured when start the loop) %s seconds ---" % (time.time() - start_time))
# print time.time() - start_time
data_ms2.to_csv(path_or_buf=outputname, sep="\t", header=True, index=False)
fh.close()
log.removeHandler(fh)

return
return (data_ms2,1)


if __name__ == '__main__':
def main_apex_alone():
parser = argparse.ArgumentParser(description='moFF input parameter')

parser.add_argument('--inputtsv', dest='name', action='store',
Expand Down Expand Up @@ -381,4 +388,61 @@ def run_apex(file_name, raw_name, tol, h_rt_w, s_w, s_w_match, loc_raw, loc_outp
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
log.addHandler(ch)
run_apex(file_name, args.raw_list, tol, h_rt_w, s_w, s_w_match, loc_raw, loc_output,log)

config = ConfigParser.RawConfigParser()
config.read(os.path.join(os.path.dirname(sys.argv[0]), 'moff_setting.properties'))

df = pd.read_csv(file_name, sep="\t")
## check and eventually tranf for PS template
if not 'matched' in df.columns:
# check if it is a PS file ,
list_name = df.columns.values.tolist()
# get the lists of PS defaultcolumns from properties file
list = ast.literal_eval(config.get('moFF', 'ps_default_export'))
# here it controls if the input file is a PS export; if yes it maps the input in right moFF name
if check_ps_input_data(list_name, list) == 1:
# map the columns name according to moFF input requirements
data_ms2, list_name = map_ps2moff(df)
## check if the field names are good
if check_columns_name(df.columns.tolist(), ast.literal_eval(config.get('moFF', 'col_must_have_x'))) == 1:
exit('ERROR minimal field requested are missing or wrong')

log.critical('moff Input file: %s XIC_tol %s XIC_win %4.4f moff_rtWin_peak %4.4f ' % (file_name, tol, h_rt_w, s_w))
if args.raw_list is None:
log.critical('RAW file from folder : %s' % loc_raw)
else:
log.critical('RAW file : %s' % args.raw_list)

log.critical('Output file in : %s', loc_output)

data_split = np.array_split(df, multiprocessing.cpu_count())

log.critical('Starting Apex for .....')
#print 'Original input size', df.shape
name = os.path.basename(file_name).split('.')[0]

##check the existencce of the log file before to go to multiprocess
check_log_existence(os.path.join(loc_output, name + '__moff.log'))

myPool = multiprocessing.Pool(multiprocessing.cpu_count())

result = {}
offset = 0
start_time = time.time()
for df_index in range(0, len(data_split)):
result[df_index] = myPool.apply_async(apex_multithr, args=(
data_split[df_index], name, args.raw_list, tol, h_rt_w, s_w, s_w_match, loc_raw, loc_output, offset))
offset += len(data_split[df_index])

myPool.close()
myPool.join()

log.critical('...apex terminated')
print 'Time no result collect', time.time() -start_time
start_time_2 = time.time()
save_moff_apex_result(data_split, result, loc_output, file_name)
#print 'Time no result collect 2', time.time() -start_time_2


if __name__ == '__main__':
main_apex_alone()
Loading

0 comments on commit b543963

Please sign in to comment.