forked from paulranum11/SPLiT-Seq_demultiplexing
-
Notifications
You must be signed in to change notification settings - Fork 0
/
splitseq_utilities.py
executable file
·134 lines (119 loc) · 3.54 KB
/
splitseq_utilities.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#!/usr/bin/python
#
import math
import os
import psutil
#
###############
# PERSISTENCE #
###############
#
# flushBuffers - Transfer all data stored in the script buffers to disk, then free the memory in the buffers
#
# buffers - dictionary object with filenames as the keys, and a list of strings as the values
#
def flushBuffers(directory, buffers):
    """Append all buffered text to disk, then empty the buffers.

    directory - Directory that holds the output files
    buffers   - Dictionary with filenames as keys and a list of strings as
                values; each list is concatenated and appended to the file
                of the same name inside ``directory``

    The dictionary is cleared in place after every file has been written,
    so the caller's reference is empty on return. If a write fails, the
    exception propagates and the buffers are left untouched.
    """
    for bufferKey, chunks in buffers.items():
        # "a+" appends and creates the file on first use. The context
        # manager closes the handle even when the write raises.
        with open(os.path.join(directory, bufferKey), "a+") as outputFile:
            outputFile.write(''.join(chunks))
    buffers.clear()
#
# createDirectory - Create a directory in the current working directory
#
# directory - Name of the directory to create
#
def createDirectory(directory):
    """Create a directory and print the outcome.

    directory - Name (or path) of the directory to create

    The function is best-effort: it never raises on OSError, it only
    prints what happened. Only the final path component is created
    (os.mkdir), so a missing parent directory is reported as a failure.
    """
    try:
        os.mkdir(directory)
    except OSError as error:
        # os.mkdir raises OSError for many reasons (permissions, missing
        # parent, a regular file in the way). Only claim "already present"
        # when a directory really exists at that path.
        if os.path.isdir(directory):
            print ("Directory already present [{}]".format(directory))
        else:
            print ("Could not create the directory [{}]: {}".format(directory, error))
    else:
        print ("Successfully created the directory [{}]".format(directory))
#
# clearFilesMatchingFilter - Remove every file whose name matches the filter, e.g. files having fewer reads than the minimum reads
#
# directory - Directory to clean up files from
# filter - Lambda predicate function to test whether or not to remove the file
#
def clearFilesMatchingFilter(directory, filter):
    """Delete every file under ``directory`` whose name satisfies ``filter``.

    directory - Root directory to clean up; os.walk also visits its
                subdirectories
    filter    - Predicate called with each bare filename (no path); the
                file is removed when it returns a truthy value
    """
    for (dirpath, dirnames, filenames) in os.walk(directory):
        for filename in filenames:
            if filter(filename):
                # Join with dirpath (the folder currently being walked),
                # not the root. Otherwise a match inside a subdirectory
                # would point at a non-existent or wrong file in the root.
                os.remove(os.path.join(dirpath, filename))
#
# countFilesMatchingFilter - Count the number of files in a given directory matching a specified filter
#
# directory - Directory to count files in
# filter - Lambda predicate function to test whether or not to include the file in the count
#
def countFilesMatchingFilter(directory, filter):
    """Return how many files under ``directory`` satisfy ``filter``.

    directory - Root directory to count files in; os.walk also visits its
                subdirectories
    filter    - Predicate called with each bare filename (no path); the
                file is counted when it returns a truthy value
    """
    return sum(
        1
        for _root, _subdirs, names in os.walk(directory)
        for name in names
        if filter(name)
    )
#
##########
# MEMORY #
##########
#
# analyzeMemoryUsage - Analyze the current memory usage of the script. Return RSS memory used in bytes
#
def analyzeMemoryUsage():
    """Return the RSS (resident set size) memory of this process.

    The value is read from psutil's memory_info() for the current process
    id and is the RSS in bytes.
    """
    currentProcess = psutil.Process(os.getpid())
    return currentProcess.memory_info().rss
#
# bytesToDisplay - Convert number of bytes to a human-readable string
#
# bytes - number of bytes
#
def bytesToDisplay(bytes):
    """Convert a number of bytes to a human-readable string.

    bytes - Number of bytes

    Returns "0B" for zero, otherwise "<value> <unit>" with the value
    rounded to two decimals and the unit chosen in steps of 1024, e.g.
    "1.5 KB". Values beyond the largest unit are expressed in that
    largest unit instead of failing.
    """
    if bytes == 0:
        return "0B"
    units = ("B", "KB", "MB", "GB", "TB", "PB")
    value = float(bytes)
    unitIndex = 0
    # Repeated division replaces floor(log(bytes, 1024)): it cannot yield a
    # negative index for fractional input, cannot run past the last unit,
    # and is exact at unit boundaries (dividing by a power of two).
    while abs(value) >= 1024.0 and unitIndex < len(units) - 1:
        value /= 1024.0
        unitIndex += 1
    return "%s %s" % (round(value, 2), units[unitIndex])
#
# mbToBytes - Convert input in MB to bytes
#
# value - Value to convert, in megabytes
#
def mbToBytes(value):
    """Convert a size in megabytes to bytes.

    value - Value to convert, in megabytes; it is passed through int()
            first, so a whole-number string is accepted and a float is
            truncated toward zero
    """
    bytesPerMb = 1024 * 1024
    return int(value) * bytesPerMb
#
#####################
# DATA MANIPULATION #
#####################
#
# addToDictionaryList - Either append new value to an existing dictionary value list or initialize a new value list
#
# dictionary - Dictionary data structure to populate
# key - Key in the dictionary, will be created if it does not previously exist
# value - Value to add to the list at the given key, create a new list if key did not exist previously
#
def addToDictionaryList(dictionary, key, value):
    """Append ``value`` to the list stored at ``key``, creating it if needed.

    dictionary - Dictionary to populate (modified in place)
    key        - Key in the dictionary; a new list is stored there if the
                 key did not previously exist
    value      - Value to append to the list at the given key
    """
    # setdefault returns the existing list, or stores and returns a new one.
    dictionary.setdefault(key, []).append(value)
#
# addToDictionarySet - Either append new value to an existing dictionary value set or initialize a new value set
#
# dictionary - Dictionary data structure to populate
# key - Key in the dictionary, will be created if it does not previously exist
# value - Value to add to the set for the given key, create a new set if key did not exist previously
#
def addToDictionarySet(dictionary, key, value):
    """Add ``value`` to the set stored at ``key``, creating it if needed.

    dictionary - Dictionary to populate (modified in place)
    key        - Key in the dictionary; a new set is stored there if the
                 key did not previously exist
    value      - Value to add to the set at the given key
    """
    # setdefault returns the existing set, or stores and returns a new one.
    dictionary.setdefault(key, set()).add(value)