-
Notifications
You must be signed in to change notification settings - Fork 0
/
general_utils.py
134 lines (104 loc) · 4.33 KB
/
general_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#!/usr/bin/env python
'''
Author: Prag Batra [email protected]
Purpose:
General helper functions for STMP (e.g. resolving relative vs absolute paths, etc.)
Explanation:
Example:
'''
import os
import datetime
import subprocess
import gzip
import mmap
import logging
def open_compressed_or_regular(f, options):
    '''
    Open a file, choosing the opener from the file name.

    Paths ending in '.gz' are opened with gzip.open(); every other path is
    opened with the built-in open().

    f: path of the file to open.
    options: mode string, handed unchanged to whichever opener is chosen.
    Returns the file object produced by that opener.
    '''
    opener = gzip.open if f.endswith('.gz') else open
    return opener(f, options)
def get_num_lines(filePath, ignore_vcf_info_lines=True):
    '''
    Count the lines in an (uncompressed) file.

    The file is read directly in Python rather than through a shell
    pipeline, so paths containing spaces or shell metacharacters are safe
    and the result does not depend on the platform's grep/wc output format.

    filePath: path of the file to count.
    ignore_vcf_info_lines: if True (default), every line that contains "##"
        is left out of the count (the rule `grep -v "##"` applies); a final
        line without a trailing newline is still counted.
        If False, only newline-terminated lines are counted (the rule
        `wc -l` applies).
    Returns the line count as an int.
    Raises ValueError if the file cannot be opened or read.
    '''
    logger = logging.getLogger(__name__)
    try:
        # Binary mode: no decoding errors on odd bytes, and b'' literals
        # behave the same under Python 2 and Python 3.
        with open(filePath, 'rb') as fh:
            if ignore_vcf_info_lines:
                # NOTE(review): matches "##" anywhere in the line, not only
                # at its start -- kept so counts stay the same for callers.
                return sum(1 for line in fh if b'##' not in line)
            return sum(1 for line in fh if line.endswith(b'\n'))
    except (IOError, OSError) as err:
        msg = ('Failed to get number of lines in file. File: ' + str(filePath)
               + '\nError: ' + str(err))
        logger.error(msg, exc_info=True)
        raise ValueError(msg)
def get_code_dir_abs():
    '''
    Return the absolute path of the directory that holds this script file.

    The script path is passed through os.path.realpath() first, so symbolic
    links are resolved before the directory part is taken.
    '''
    script_path = os.path.realpath(__file__)
    return os.path.dirname(script_path)
def print_progress_timed(progressStr, prevTime, interval=5):
    '''
    Log a progress message at most once every `interval` seconds.

    Call this each time the progress (e.g. the line number) changes, and
    pass the value it returned last time as prevTime.

    NOTE: prevTime and interval must be in SECONDS.
    NOTE: the message is only written to the log (at DEBUG level); nothing
    is printed to stdout.

    progressStr: text to log.
    prevTime: value returned by the previous call (any small number such as
        0 makes the first call log immediately).
    interval: minimum number of seconds between two logged messages.
    Returns the current time in seconds if the message was logged,
    otherwise prevTime unchanged.
    '''
    logger = logging.getLogger(__name__)
    now = datetime.datetime.now()
    # A continuous seconds count. The seconds *field* of the clock (0-59)
    # wraps every minute, which made long intervals impossible to reach.
    currtime = (now - datetime.datetime(1970, 1, 1)).total_seconds()
    # abs() keeps the check working if the wall clock is set backwards.
    if abs(currtime - prevTime) > interval:
        logger.debug(str(now) + ': ' + progressStr)
        return currtime
    return prevTime
def list2dict(list):
    '''
    Build a dict whose keys are the elements of the given iterable.

    list: iterable of hashable elements (the parameter name shadows the
        builtin but is kept so keyword callers keep working).
    Returns a new dict mapping every element to 1; duplicates collapse into
    a single key.
    '''
    return dict.fromkeys(list, 1)
def get_file_extension(file_path):
    '''
    Return the last extension of a path, including the leading dot.

    This is the second element of os.path.splitext(file_path); it is an
    empty string when the path has no extension.
    '''
    return os.path.splitext(file_path)[1]
def get_headlist_tsv(myfile):
    '''
    Return the column names of a tab-separated file.

    Assumes the header is the first line of the file.

    myfile: path of the TSV file.
    Returns the first line, without its trailing newline, split on tabs
    (an empty file yields ['']).
    '''
    # The with-block closes the file as soon as the header has been read.
    with open(myfile, 'r') as f:
        first_line = f.readline()
    return first_line.rstrip("\n").split("\t")
def root_or_cwd(mydir):
    '''
    If needed, convert a path relative to the current working directory
    into an absolute path.

    A partial path is completed with the current working directory so the
    file can be located; a path that is already absolute is left alone.

    mydir: path string. An empty string is treated as relative, so the
        result is the current working directory followed by a separator.
    Returns the resulting path string.
    '''
    # os.path.isabs() is safe on an empty string, where indexing the first
    # character is not.
    if os.path.isabs(mydir):
        return mydir
    return os.path.join(os.getcwd(), mydir)
def root_or_code_dir(mydir):
    '''
    If needed, convert a path relative to the code directory into an
    absolute path.

    mydir: path string. A path that is already absolute is returned
        unchanged; anything else (including an empty string) is joined onto
        the directory returned by get_code_dir_abs().
    Returns the resulting path string.
    '''
    # os.path.isabs() is safe on an empty string, where indexing the first
    # character is not.
    if os.path.isabs(mydir):
        return mydir
    return os.path.join(get_code_dir_abs(), mydir)
def get_parent_dir(path):
    '''
    Return the absolute path of the parent directory of the given path.

    os.pardir is appended to the path and the result is normalised with
    os.path.abspath().
    '''
    one_level_up = os.path.join(path, os.pardir)
    return os.path.abspath(one_level_up)
def get_file_or_dir_name(path):
    '''
    Return the name of a file or directory, including any extension.

    This is the tail part of os.path.split(path), so a path that ends with
    a separator yields an empty string.
    '''
    _head, tail = os.path.split(path)
    return tail
def strip_file_extension(path_or_filename):
    '''
    Strip the LAST file extension from a filename or path.

    Only one extension is removed, e.g. 'x.tar.gz' becomes 'x.tar'. A name
    without an extension is returned unchanged.
    '''
    root, _extension = os.path.splitext(path_or_filename)
    return root
def rreplace(s, old, new, num_occurrences=1):
    '''
    Replace occurrences of `old` with `new`, working from the END of the
    string towards the beginning.

    s: string to edit.
    old: substring to look for.
    new: replacement text.
    num_occurrences: how many occurrences to replace, counted from the
        right (default 1).
    Returns the edited string; `s` comes back unchanged when `old` does not
    occur in it.
    '''
    return new.join(s.rsplit(old, num_occurrences))
# CURRENTLY UNUSED
def deleteFromMmap(f, mm, start, end):
    '''
    Helper: use mmap to delete the bytes at positions start..end-1 from a
    file.

    f: open file object backing the map; it is truncated by this call.
    mm: mmap object over f; it is closed by this call and must not be used
        afterwards.
    start: offset of the first byte to delete.
    end: offset of the first byte to keep after the deleted span.
    Returns a new mmap covering the whole shortened file.
    '''
    old_size = len(mm)
    tail_len = old_size - end
    # Slide everything that follows the deleted span down so that it now
    # begins at `start`.
    mm.move(start, end, tail_len)
    mm.flush()
    # The old map is closed before the file is shortened and mapped again.
    mm.close()
    f.truncate(old_size - (end - start))
    return mmap.mmap(f.fileno(), 0)