#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Generate random genus/species names from letter-transition Markov chains
learned from a list of real species names."""

from sys import stderr
import string
import argparse
import os.path
import numpy as np
from scipy.stats import poisson

VERBOSE = True


def muted(*args, **kwargs):
    """No-op stand-in for print when verbosity is off."""
    pass


def def_verbosity():
    """Return `print` or a no-op, depending on the global VERBOSE flag."""
    return print if VERBOSE else muted


print_if_verbose = def_verbosity()
# Learning
## Define how to recognize syllables:
# - [starting sequence of consonants]
# - sequence of vowels/diphthongs
# - [ending sequence of consonants]
# Allow hyphens?
## Only make a letter-transition matrix
# Indexed using the ascii code - 97 (the minimum value: 'a')
# + word-boundary state 26
# + letters with accents
## Fill the syllable transition matrix (sparse matrix?)
# Create a vector of learned syllables, and index the matrix with it.
# Generate a random Genus + species.
## What scales of information to encode?
# 1. letter succession
# 2. syllable succession (Markov chain of order 3?)
# 3. etymological semantic units (needs a dictionary for learning)
# 4. taxonomic rank (genus/species)
# 5. taxonomic abundance: if there are more rodent species, it would be nice
#    to have per-clade transition matrices (new names should look like rodent
#    names more frequently).
#
## What algorithmic solutions?
#
# - HHMM (Hierarchical HMM): how to define the higher-level states?
# Etymological resources:
# https://en.wikipedia.org/wiki/List_of_commonly_used_taxonomic_affixes
# https://en.wikipedia.org/wiki/List_of_Latin_and_Greek_words_commonly_used_in_systematic_names
#def key_to_state()
#def iter_states(sequence)
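

# Illustrative sketch (not called by this script): how a context of `order`
# previous characters maps to one row index of the transition matrix, using
# the base-(a+1) encoding described above. The helper name is hypothetical.
def _context_to_row(context, s=97, a=26):
    """Map a context string (e.g. 'ab') to a matrix row index, in base a+1.

    Non-letter characters map to the word-boundary state `a`; the leftmost
    (oldest) character carries the lowest weight.
    """
    digits = [ord(ch) - s if 0 <= ord(ch) - s < a else a for ch in context]
    return sum(x * (a + 1)**k for k, x in enumerate(digits))
# e.g. _context_to_row('  ') == (26+1)**2 - 1, the all-boundary start state
# asserted in learn_letters below.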


def enumerate_chunks(sequence, n=1):
    """Yield (start index, chunk) for successive chunks of length n.

    Currently unused; presumably intended for syllable-level chunking.
    """
    N = len(sequence)
    i = 0
    while i < N:
        yield i, sequence[i:(i+n)]
        i += n

def learn_letters(infile, maxwords=2, maxwordlen=100, order=1):
    """Count letter transitions per word position in `infile`.

    Return one transition-probability matrix per word (up to `maxwords`)
    and the cumulative distribution of word lengths.
    """
    s = 97  # ASCII offset: letter 'a' will be state 0.
    a = 26  # Length of the alphabet; state `a` is the word boundary.
    valid_wordchars = string.ascii_letters  # Currently unused.
    word_ends = set()
    transition_counts = [np.zeros(((a+1)**order, a+1)) for _ in range(maxwords)]
    word_lengths = np.zeros((maxwords, maxwordlen))
    state_index_maker = (a+1)**np.arange(order)  # Encode the context in base a+1.
    with open(infile) as f:
        for line in f:
            line = line.lower()
            word_i = -1
            prev_start = 0  # Position where the current word started.
            prev_states = ' ' * order
            for state_i, state in enumerate(line):
                prev_is = np.array([ord(ch) for ch in prev_states]) - s
                j = ord(state) - s
                nonword = (prev_is < 0) | (prev_is >= a)
                if nonword.any():
                    prev_is[nonword] = a
                    # Erase memory of the previous word: it's a new chain now.
                    startpos = np.argmin(nonword)
                    prev_is[:startpos] = a
                prev_states = prev_states[1:] + state
                # Previous char is not a word character:
                if not (0 <= prev_is[-1] < a):
                    # If the current char isn't one either, go to the next char.
                    if not (0 <= j < a):
                        continue
                    # Otherwise a new word starts: increment.
                    word_i += 1
                    prev_start = state_i  # Fix: track the word start position.
                    # But skip words beyond the maximum.
                    if word_i >= maxwords:
                        break
                if not (0 <= j < a):
                    j = a
                    word_ends.add(state)
                    # Clamp to the last bin to avoid an out-of-bounds index.
                    word_len = min(state_i - prev_start, maxwordlen - 1)
                    word_lengths[word_i, word_len] += 1
                    prev_states = ' ' * order
                i = (state_index_maker * prev_is).sum()
                if state_i == 0:
                    assert i == (a+1)**order - 1, \
                        "i: %d; %d; prev_is: %s" % (i, (a+1)**order, prev_is)
                try:
                    transition_counts[word_i][i, j] += 1
                except IndexError:
                    print('prev_is:', prev_is,
                          'nonword:', nonword,
                          'test prev char:', (not (0 <= prev_is[-1] < a)),
                          'word_i:', (type(word_i), word_i),
                          'i:', (type(i), i),
                          'j:', (type(j), j), file=stderr)
                    raise
    unknown_ends = word_ends - set(string.whitespace + string.punctuation)
    if unknown_ends:
        print("Warning: unknown characters used as word boundary: " +
              ' '.join(repr(ch) for ch in unknown_ends), file=stderr)
    # Cumulative distribution: P(word has ended by length l).
    P_word_len = word_lengths.cumsum(axis=1) / word_lengths.sum(axis=1, keepdims=True)
    state_counts = [tc.sum(axis=1, keepdims=True) for tc in transition_counts]
    # Report observed vs. unobserved contexts.
    states = ['']
    for o in range(order):
        states = [st + chr(s+x) if x < a else st + ' '
                  for x in range(a+1) for st in states]
    states = np.array(states)
    for sc in state_counts:
        print_if_verbose('#Observed: %d; #Unseen: %d' % ((sc > 0).sum(),
                                                         (sc == 0).sum()))
        print_if_verbose(np.array2string(states[sc[:, 0] == 0]))
    # End of check.
    print_if_verbose(state_counts)
    eq_freqs = [sc / sc.sum() for sc in state_counts]  # Currently unused.
    # Transition probability matrices: observed rows sum to 1; contexts
    # never observed become all-NaN rows (0/0).
    Qs = [tc / sc for tc, sc in zip(transition_counts, state_counts)]
    return Qs, P_word_len
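

# A minimal sanity check one could run on the learned matrices (assumption:
# illustrative only, not called by this script): rows with at least one
# observation must sum to 1, and never-observed rows are all-NaN (0/0).
def _check_row_stochastic(Q):
    observed = ~np.isnan(Q).any(axis=1)
    return np.allclose(Q[observed].sum(axis=1), 1.0)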

def generate_word_seq(Qs, a=26, s=97, rep=1, P_stop=None, order=1,
                      length_model=False):
    """Generate one sequence of words, one word per matrix in Qs.

    `rep` is currently unused: each call produces a single sequence.
    """
    # TODO: respect expected word lengths:
    # - Draw a random word length from the empirical distribution.
    # - Draw the next letter conditioned on whether it may end the word or not.
    assert all((np.inf not in Q) for Q in Qs)
    max_iter = 100
    if P_stop is None:
        # Fallback word-length model. The original call was broken
        # (`poisson.sf(np.arange(70, 10))` is an empty range and omits the
        # Poisson mean); a mean of 10 is an assumed stand-in here.
        P_stop = np.stack([poisson.sf(np.arange(max_iter), 10)] * len(Qs))
    states = list(range(a+1))
    letters = [chr(s+x) for x in states]
    letters[-1] = ' '
    output = [''] * len(Qs)
    for word_i, Q in enumerate(Qs):
        prev_states_is = [a] * order
        prev_states_i = (a+1) ** order - 1  # All-boundary start context.
        i = 0
        while i < max_iter:
            # Probabilities of the next letter. Copy the row so that the
            # length-model rescaling below does not modify Q in place.
            p = Q[prev_states_i, :].copy()
            if np.isnan(p).any():
                assert np.isnan(p).all()
                print("WARNING: Ended in a state with count zero:",
                      prev_states_is, prev_states_i,
                      "at letter", i, file=stderr)
                output[word_i] += '.'  # THIS SHOULD NOT HAPPEN.
                break
            if length_model:
                # Keep letter proportions but force the stopping probability
                # at position i to P_stop[word_i, i].
                prop_continue = p[:a].sum()
                if prop_continue > 0:
                    p[:a] *= (1 - P_stop[word_i, i]) / prop_continue
                    p[a] = P_stop[word_i, i]
                else:
                    assert p[a] == 1
            state = np.random.choice(states, p=p)
            prev_states_is = prev_states_is[1:] + [state]
            prev_states_i = ((a+1) ** np.arange(order) * np.array(prev_states_is)).sum()
            if state < a:
                print_if_verbose(letters[state], end='')
                output[word_i] += letters[state]
            else:
                print_if_verbose(' ', end='')
                break
            i += 1
    return ' '.join(output)
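

# Minimal end-to-end usage sketch (assumption: `species_list.txt` is a
# hypothetical input file; this helper is not called anywhere):
def _demo(infile='species_list.txt', order=1):
    Qs, P_stop = learn_letters(infile, maxwords=2, order=order)
    return deterministic_formatting(
        generate_word_seq(Qs, P_stop=P_stop, order=order, length_model=True))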

def deterministic_formatting(strdata):
    """Capitalize the first letter of the sequence (the genus name)."""
    return strdata.capitalize()


def main(infile, rep=10, maxwords=2, order=1, length_model=False):
    """Learn (or load cached) transition matrices, then print `rep` names."""
    dbbase = os.path.splitext(os.path.basename(infile))[0]
    end = '-o%d.npy' % order
    dbfile = dbbase + end
    if not os.path.exists(dbfile):
        Qs, P_stop = learn_letters(infile, maxwords, order=order)
        print_if_verbose(np.array2string(Qs[0], precision=2, max_line_width=180))
        np.save(dbfile, Qs)
        np.save(dbfile.replace(end, '-pstop' + end), P_stop)
    else:
        Qs = np.load(dbfile)
        P_stop = np.load(dbfile.replace(end, '-pstop' + end))
    for r in range(rep):
        print(deterministic_formatting(
            generate_word_seq(Qs, rep=rep, P_stop=P_stop, order=order,
                              length_model=length_model)))

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('infile', help='text file of species names to learn from')
    parser.add_argument('-r', '--rep', type=int, default=10,
                        help='number of names to generate [%(default)s]')
    parser.add_argument('-w', '--maxwords', type=int, default=2,
                        help='max words learned per line (genus + species) [%(default)s]')
    parser.add_argument('-v', '--verbose', action='store_true')
    parser.add_argument('-o', '--order', type=int, default=1,
                        help='order of the Markov chain [%(default)s]')
    parser.add_argument('-l', '--length-model', action='store_true',
                        help='rescale letter probabilities using the learned '
                             'word-length distribution')
    dictargs = vars(parser.parse_args())
    VERBOSE = dictargs.pop('verbose')
    print_if_verbose = def_verbosity()
    main(**dictargs)