-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclose_intervals.py
executable file
·161 lines (124 loc) · 3.88 KB
/
close_intervals.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#!/usr/bin/env python
"""
"Close" genomic intervals (in the sense of Minkowski/morphology set operations).
see en.wikipedia.org/wiki/Closing_(morphology)
"""
from sys import argv,stdin,stderr,exit
from math import ceil
def usage(s=None):
message = """
usage: cat intervals_file | close_intervals <length> [options] > intervals_file
<length> the number of bases to 'close' between intervals
--origin=one intervals are origin-one, closed
--origin=zero intervals are origin-zero, half-open
(this is the default)
Note that we allow incoming intervals to extend beyond the end of a
chromosome (and thus output intervals might also)."""
if (s == None): exit (message)
else: exit ("%s\n%s" % (s,message))
def main():
global debug
# parse args
closingLength = None
origin = "zero"
debug = []
for arg in argv[1:]:
if ("=" in arg):
argVal = arg.split("=",1)[1]
if (arg.startswith("--origin=")):
origin = argVal
if (origin == "0"): origin = "zero"
if (origin == "1"): origin = "one"
assert (origin in ["zero","one"]), "can't understand %s" % arg
elif (arg == "--debug"):
debug += ["debug"]
elif (arg.startswith("--debug=")):
debug += argVal.split(",")
elif (arg.startswith("--")):
usage("unrecognized option: %s" % arg)
elif (closingLength == None):
closingLength = int_with_unit(arg)
if (closingLength < 0): usage("length must be non-negative")
else:
usage("unrecognized option: %s" % arg)
if (closingLength == None):
usage("you must provide the length")
# collect the intervals
chromToIntervals = {}
chroms = []
for (chrom,start,end) in read_intervals(stdin,origin=origin):
if (chrom not in chromToIntervals):
chromToIntervals[chrom] = []
chroms += [chrom]
chromToIntervals[chrom] += [(start,end)]
# perform dilation, then erosion (and merge)
dilationLength = (closingLength+1) / 2
erosionLength = dilationLength
for chrom in chroms:
intervals = [(start-dilationLength,end+dilationLength)
for (start,end) in chromToIntervals[chrom]]
for (start,end) in non_overlapping_intervals(intervals):
start += erosionLength
end -= erosionLength
if (origin == "one"): start += 1
print "%s %d %d" % (chrom,start,end)
def read_intervals(f,origin="zero"):
numFields = None
lineNumber = 0
for line in f:
lineNumber += 1
line = line.strip()
if (line == ""): continue
if (line.startswith("#")): continue
fields = line.split()
if (numFields == None):
assert (len(fields) >= 3), \
"not enough fields at line %d (%d, expected at least %d)" \
% (lineNumber,len(fields),3)
numFields = len(fields)
else:
assert (len(fields) == numFields), \
"inconsistent number of fields at line %d (%d, expected %d)" \
% (lineNumber,len(fields),numFields)
try:
chrom = fields[0]
start = int(fields[1])
end = int(fields[2])
if (end < start): raise ValueError
if (origin == "one"): start -= 1
except ValueError:
assert (False), "bad line (%d): %s" % (lineNumber,line)
yield (chrom,start,end)
# merge into non-overlapping intervals
def non_overlapping_intervals(intervals):
intervals.sort()
overlaps = []
start = end = None
for (s,e) in intervals:
if (start == None):
(start,end) = (s,e)
elif (s < end):
end = max(end,e)
else:
overlaps += [(start,end)]
(start,end) = (s,e)
if (start != None):
overlaps += [(start,end)]
return overlaps
# int_with_unit--
# Parse a string as an integer, allowing unit suffixes
def int_with_unit(s):
if (s.endswith("K")):
multiplier = 1000
s = s[:-1]
elif (s.endswith("M")):
multiplier = 1000 * 1000
s = s[:-1]
elif (s.endswith("G")):
multiplier = 1000 * 1000 * 1000
s = s[:-1]
else:
multiplier = 1
try: return int(s) * multiplier
except ValueError: return int(ceil(float(s) * multiplier))
if __name__ == "__main__": main()