rev |
line source |
me@261
|
1 import sys |
bnagaev@357
|
2 import re |
me@261
|
3 |
me@315
|
4 import util |
bnagaev@418
|
5 import fileio |
me@260
|
6 |
dendik@382
|
7 # import this very module as means of having all related classes in one place |
dendik@382
|
8 import base |
dendik@382
|
9 |
me@306
|
10 default_gaps = set((".", "-", "~")) |
me@306
|
11 """Set of characters to recoginze as gaps when parsing alignment.""" |
me@306
|
12 |
me@328
|
13 class Monomer(object): |
me@328
|
14 """Monomer object.""" |
me@260
|
15 |
me@328
|
16 type = None |
me@328
|
17 """Either of 'dna', 'rna', 'protein'.""" |
me@260
|
18 |
dendik@382
|
19 types = base |
dendik@382
|
20 """Mapping of related types. SHOULD be redefined in subclasses.""" |
dendik@382
|
21 |
me@260
|
22 by_code1 = {} |
me@328
|
23 """A mapping from 1-letter code to Monomer subclass.""" |
me@328
|
24 |
me@260
|
25 by_code3 = {} |
me@328
|
26 """A mapping from 3-letter code to Monomer subclass.""" |
me@328
|
27 |
me@260
|
28 by_name = {} |
me@328
|
29 """A mapping from full monomer name to Monomer subclass.""" |
me@260
|
30 |
me@260
|
31 @classmethod |
me@328
|
32 def _subclass(cls, name='', code1='', code3='', is_modified=False): |
me@328
|
33 """Create new subclass of Monomer for given monomer type.""" |
me@328
|
34 class TheMonomer(cls): |
me@328
|
35 pass |
me@328
|
36 name = name.strip().capitalize() |
me@328
|
37 code1 = code1.upper() |
me@328
|
38 code3 = code3.upper() |
bnagaev@357
|
39 TheMonomer.__name__ = re.sub(r"[^\w]", "_", name) |
me@328
|
40 TheMonomer.name = name |
me@328
|
41 TheMonomer.code1 = code1 |
me@328
|
42 TheMonomer.code3 = code3 |
me@328
|
43 TheMonomer.is_modified = is_modified |
me@328
|
44 if not is_modified: |
me@328
|
45 cls.by_code1[code1] = TheMonomer |
me@328
|
46 cls.by_code3[code3] = TheMonomer |
me@328
|
47 cls.by_name[name] = TheMonomer |
me@328
|
48 # We duplicate distinguished long names into Monomer itself, so that we |
me@328
|
49 # can use Monomer.from_code3 to create the relevant type of monomer. |
me@328
|
50 Monomer.by_code3[code3] = TheMonomer |
me@328
|
51 Monomer.by_name[name] = TheMonomer |
me@260
|
52 |
me@328
|
53 @classmethod |
me@353
|
54 def _initialize(cls, codes=None): |
me@328
|
55 """Create all relevant subclasses of Monomer.""" |
bnagaev@378
|
56 for code1, is_modified, code3, name in codes: |
bnagaev@378
|
57 cls._subclass(name, code1, code3, is_modified) |
me@260
|
58 |
me@260
|
59 @classmethod |
me@260
|
60 def from_code1(cls, code1): |
me@328
|
61 """Create new monomer from 1-letter code.""" |
me@328
|
62 return cls.by_code1[code1.upper()]() |
me@260
|
63 |
me@260
|
64 @classmethod |
me@260
|
65 def from_code3(cls, code3): |
me@328
|
66 """Create new monomer from 3-letter code.""" |
me@328
|
67 return cls.by_code3[code3.upper()]() |
me@260
|
68 |
me@260
|
69 @classmethod |
me@260
|
70 def from_name(cls, name): |
me@328
|
71 """Create new monomer from full name.""" |
me@328
|
72 return cls.by_name[name.strip().capitalize()]() |
me@260
|
73 |
me@329
|
74 def __repr__(self): |
me@329
|
75 return '<Monomer %s>' % self.code3 |
me@329
|
76 |
me@329
|
77 def __str__(self): |
me@329
|
78 """Returns one-letter code""" |
me@329
|
79 return self.code1 |
me@329
|
80 |
me@260
|
81 def __eq__(self, other): |
me@328
|
82 """Monomers within same monomer type are compared by code1.""" |
me@328
|
83 assert self.type == other.type |
me@328
|
84 return self.code1 == other.code1 |
bnagaev@239
|
85 |
bnagaev@239
|
86 class Sequence(list): |
me@274
|
87 """Sequence of Monomers. |
bnagaev@243
|
88 |
me@274
|
89 This behaves like list of monomer objects. In addition to standard list |
me@274
|
90 behaviour, Sequence has the following attributes: |
me@270
|
91 |
me@274
|
92 * name -- str with the name of the sequence |
me@274
|
93 * description -- str with description of the sequence |
me@274
|
94 * source -- str denoting source of the sequence |
me@266
|
95 |
me@274
|
96 Any of them may be empty (i.e. hold empty string) |
me@274
|
97 """ |
me@270
|
98 |
dendik@382
|
99 types = base |
dendik@382
|
100 """Mapping of related types. SHOULD be redefined in subclasses.""" |
me@270
|
101 |
me@275
|
102 name = '' |
me@275
|
103 description = '' |
me@275
|
104 source = '' |
me@275
|
105 |
me@347
|
106 @classmethod |
me@347
|
107 def from_monomers(cls, monomers=[], name=None, description=None, source=None): |
me@347
|
108 """Create sequence from a list of monomer objecst.""" |
bnagaev@378
|
109 result = cls(monomers) |
me@275
|
110 if name: |
me@347
|
111 result.name = name |
me@275
|
112 if description: |
me@347
|
113 result.description = description |
me@275
|
114 if source: |
me@347
|
115 result.source = source |
me@347
|
116 return result |
me@347
|
117 |
me@347
|
118 @classmethod |
me@347
|
119 def from_string(cls, string, name='', description='', source=''): |
me@347
|
120 """Create sequences from string of one-letter codes.""" |
dendik@382
|
121 monomer = cls.types.Monomer.from_code1 |
me@347
|
122 monomers = [monomer(letter) for letter in string] |
me@347
|
123 return cls.from_monomers(monomers, name, description, source) |
me@270
|
124 |
me@329
|
125 def __repr__(self): |
me@329
|
126 return '<Sequence %s>' % str(self) |
me@329
|
127 |
me@262
|
128 def __str__(self): |
me@329
|
129 """Returns sequence of one-letter codes.""" |
me@275
|
130 return ''.join(monomer.code1 for monomer in self) |
me@270
|
131 |
me@316
|
132 def __hash__(self): |
me@316
|
133 """Hash sequence by identity.""" |
me@316
|
134 return id(self) |
me@316
|
135 |
bnagaev@396
|
136 def __eq__(self, other): |
bnagaev@396
|
137 """ equals operator by identity """ |
bnagaev@396
|
138 return id(self) == id(other) |
bnagaev@396
|
139 |
bnagaev@396
|
140 def __ne__(self, other): |
bnagaev@396
|
141 """ non-equals operator by identity """ |
bnagaev@396
|
142 return id(self) != id(other) |
bnagaev@396
|
143 |
me@295
|
144 class Alignment(object): |
me@295
|
145 """Alignment. It is a list of Columns.""" |
bnagaev@249
|
146 |
dendik@382
|
147 types = base |
dendik@382
|
148 """Mapping of related types. SHOULD be redefined in subclasses.""" |
me@288
|
149 |
me@289
|
150 sequences = None |
me@289
|
151 """Ordered list of sequences in alignment. Read, but DO NOT FIDDLE!""" |
bnagaev@249
|
152 |
me@287
|
153 def __init__(self): |
me@287
|
154 """Initialize empty alignment.""" |
me@287
|
155 self.sequences = [] |
me@295
|
156 self.columns = [] |
me@282
|
157 |
me@362
|
158 # Alignment grow & IO methods |
me@299
|
159 # ============================== |
me@299
|
160 |
me@294
|
161 def append_sequence(self, sequence): |
me@365
|
162 """Add sequence to alignment. Return self. |
me@294
|
163 |
me@294
|
164 If sequence is too short, pad it with gaps on the right. |
me@294
|
165 """ |
me@294
|
166 self.sequences.append(sequence) |
dendik@388
|
167 self._pad_to_width(len(sequence)) |
dendik@388
|
168 for column, monomer in zip(self.columns, sequence): |
dendik@388
|
169 column[sequence] = monomer |
me@365
|
170 return self |
me@294
|
171 |
me@364
|
172 def append_row_from_string(self, string, |
me@364
|
173 name='', description='', source='', gaps=default_gaps): |
me@364
|
174 """Add row from a string of one-letter codes and gaps. Return self.""" |
dendik@382
|
175 Sequence = self.types.Sequence |
me@349
|
176 without_gaps = util.remove_each(string, gaps) |
me@321
|
177 sequence = Sequence.from_string(without_gaps, name, description, source) |
dendik@388
|
178 self._pad_to_width(len(string)) |
dendik@388
|
179 non_gap_columns = [column |
dendik@390
|
180 for column, char in zip(self.columns, string) |
dendik@388
|
181 if char not in gaps |
dendik@388
|
182 ] |
dendik@388
|
183 for monomer, column in zip(sequence, non_gap_columns): |
dendik@388
|
184 column[sequence] = monomer |
me@287
|
185 self.sequences.append(sequence) |
me@364
|
186 return self |
me@287
|
187 |
dendik@388
|
188 def _pad_to_width(self, n): |
dendik@388
|
189 """Pad alignment with empty columns on the right to width n.""" |
dendik@388
|
190 for i in range(len(self.columns), n): |
me@302
|
191 self.columns.append(Column()) |
me@302
|
192 |
me@362
|
193 def append_file(self, file, format='fasta', gaps=default_gaps): |
me@365
|
194 """Append sequences from file to alignment. Return self. |
me@299
|
195 |
me@362
|
196 If sequences in file have gaps (detected as characters belonging to |
me@362
|
197 `gaps` set), treat them accordingly. |
me@362
|
198 """ |
bnagaev@418
|
199 sequences = [] |
bnagaev@418
|
200 if format == 'fasta': |
bnagaev@418
|
201 sequences = fileio.FastaIo(file).get_all_strings() |
bnagaev@418
|
202 elif format == 'msf': |
bnagaev@418
|
203 sequences = fileio.MsfIo(file).get_all_strings() |
bnagaev@418
|
204 else: |
bnagaev@418
|
205 raise Exception("We don't support other formats yet") |
bnagaev@418
|
206 for (name, description, body) in sequences: |
bnagaev@378
|
207 self.append_row_from_string(body, name, description, file.name, gaps) |
me@287
|
208 return self |
bnagaev@249
|
209 |
bnagaev@418
|
210 def to_file(self, file, format='fasta', gap='-'): |
me@292
|
211 """Write alignment in FASTA file as sequences with gaps.""" |
me@367
|
212 assert format == "fasta", "We don't support other formats yet" |
me@292
|
213 def char(monomer): |
me@292
|
214 if monomer: |
me@292
|
215 return monomer.code1 |
bnagaev@418
|
216 return gap |
bnagaev@418
|
217 if format == 'fasta': |
bnagaev@418
|
218 io = fileio.FastaIo(file) |
bnagaev@418
|
219 elif format == 'msf': |
bnagaev@418
|
220 io = fileio.MsfIo(file) |
bnagaev@418
|
221 else: |
bnagaev@418
|
222 raise Exception("We don't support other formats yet") |
me@292
|
223 for row in self.rows_as_lists(): |
me@292
|
224 seq = row.sequence |
me@292
|
225 line = "".join(map(char, row)) |
bnagaev@418
|
226 io.save_string(line, seq.name, seq.description) |
me@292
|
227 |
me@299
|
228 # Data access methods for alignment |
me@299
|
229 # ================================= |
me@299
|
230 |
me@299
|
231 def rows(self): |
me@299
|
232 """Return list of rows (temporary objects) in alignment. |
me@299
|
233 |
me@299
|
234 Each row is a dictionary of { column : monomer }. |
me@363
|
235 |
me@299
|
236 For gap positions there is no key for the column in row. |
me@299
|
237 |
me@299
|
238 Each row has attribute `sequence` pointing to the sequence the row is |
me@299
|
239 describing. |
me@299
|
240 |
me@299
|
241 Modifications of row have no effect on the alignment. |
me@299
|
242 """ |
me@299
|
243 # For now, the function returns a list rather than iterator. |
me@299
|
244 # It is yet to see, whether memory performance here becomes critical, |
me@299
|
245 # or is random access useful. |
me@299
|
246 rows = [] |
me@299
|
247 for sequence in self.sequences: |
me@299
|
248 row = util.UserDict() |
me@299
|
249 row.sequence = sequence |
me@299
|
250 for column in self.columns: |
me@299
|
251 if sequence in column: |
me@299
|
252 row[column] = column[sequence] |
me@299
|
253 rows.append(row) |
me@299
|
254 return rows |
me@299
|
255 |
me@299
|
256 def rows_as_lists(self): |
me@299
|
257 """Return list of rows (temporary objects) in alignment. |
me@299
|
258 |
me@299
|
259 Each row here is a list of either monomer or None (for gaps). |
me@299
|
260 |
me@299
|
261 Each row has attribute `sequence` pointing to the sequence of row. |
me@299
|
262 |
me@299
|
263 Modifications of row have no effect on the alignment. |
me@299
|
264 """ |
me@299
|
265 rows = [] |
me@299
|
266 for sequence in self.sequences: |
me@299
|
267 row = util.UserList() |
me@299
|
268 row.sequence = sequence |
me@299
|
269 for column in self.columns: |
me@299
|
270 row.append(column.get(sequence)) |
me@299
|
271 rows.append(row) |
me@299
|
272 return rows |
me@299
|
273 |
me@299
|
274 def columns_as_lists(self): |
me@299
|
275 """Return list of columns (temorary objects) in alignment. |
me@299
|
276 |
me@299
|
277 Each column here is a list of either monomer or None (for gaps). |
me@299
|
278 |
me@299
|
279 Items of column are sorted in the same way as alignment.sequences. |
me@299
|
280 |
me@299
|
281 Modifications of column have no effect on the alignment. |
me@299
|
282 """ |
me@299
|
283 columns = [] |
me@299
|
284 for column in self.columns: |
me@299
|
285 col = [] |
me@299
|
286 for sequence in self.sequences: |
me@299
|
287 col.append(column.get(sequence)) |
me@299
|
288 columns.append(col) |
me@299
|
289 return columns |
me@299
|
290 |
me@368
|
291 # Alignment / Block editing methods |
me@368
|
292 # ================================= |
me@368
|
293 |
me@368
|
294 def _flush_row(self, row, whence='left'): |
me@368
|
295 """Helper for `flush`: flush to one side all monomers in one row.""" |
me@368
|
296 row = filter(None, row) |
me@368
|
297 padding = [None] * len(self.columns) |
me@368
|
298 if whence == 'left': |
me@368
|
299 return row + padding |
me@368
|
300 if whence == 'right': |
me@368
|
301 return padding + row |
me@368
|
302 if whence == 'center': |
me@368
|
303 pad_len = (len(self.columns) - len(row)) // 2 |
me@368
|
304 # vvv fix padding for case when length is odd: better have more |
me@368
|
305 pad_len += len(self.columns) - 2 * pad_len |
me@368
|
306 padding = [None] * pad_len |
me@368
|
307 return padding + row + padding |
me@368
|
308 assert True, "whence must be either 'left' or 'right' or 'center'" |
me@368
|
309 |
me@368
|
310 def flush(self, whence='left'): |
me@368
|
311 """Remove all gaps from alignment and flush results to one side. |
me@368
|
312 |
me@368
|
313 `whence` must be one of 'left', 'right' or 'center' |
me@368
|
314 """ |
me@368
|
315 for row in self.rows_as_lists(): |
me@368
|
316 sequence = row.sequence |
me@368
|
317 row = self._flush_row(row, whence) |
me@368
|
318 for monomer, column in zip(row, self.columns): |
me@368
|
319 if monomer: |
me@368
|
320 column[sequence] = monomer |
me@368
|
321 elif sequence in column: |
me@368
|
322 del column[sequence] |
me@368
|
323 |
me@369
|
324 def remove_gap_columns(self): |
me@369
|
325 """Remove all empty columns.""" |
me@369
|
326 for n, column in reversed(enumerate(self.columns)): |
me@369
|
327 if column == {}: |
me@369
|
328 self.columns[n:n+1] = [] |
me@369
|
329 |
me@371
|
330 def _wipe(self): |
me@371
|
331 """Make all positions gaps (but keep sequences intact).""" |
me@371
|
332 for column in self.columns: |
dendik@412
|
333 for sequence in list(column): |
me@371
|
334 del column[sequence] |
me@371
|
335 |
me@372
|
336 def _merge(self, dst, new, merge): |
me@373
|
337 """Replace contents of `dst` with those of `new`. |
me@372
|
338 |
me@372
|
339 Replace contents of elements using function `merge(dst_el, new_le)`. |
me@372
|
340 """ |
bnagaev@384
|
341 for el, new_el in zip(dst, new): |
bnagaev@384
|
342 merge(el, new_el) |
me@372
|
343 dst[len(dst):] = new[len(dst):] |
me@372
|
344 del dst[len(new):] |
me@371
|
345 |
me@373
|
346 def _replace_sequence_contents(self, new, copy_descriptions): |
me@373
|
347 """Replace contents of sequences with those of `new` alignment.""" |
me@371
|
348 # XXX: we manually copy sequence contents here |
me@372
|
349 # XXX: we only copy, overlapping parts and link to the rest |
me@372
|
350 def merge_monomers(dst, new): |
me@372
|
351 dst.__class__ = new.__class__ |
me@372
|
352 def merge_sequences(dst, new): |
me@373
|
353 if copy_descriptions: |
me@373
|
354 vars(dst).update(vars(new)) |
me@372
|
355 self._merge(dst, new, merge_monomers) |
me@372
|
356 self._merge(self.sequences, new.sequences, merge_sequences) |
me@371
|
357 |
me@371
|
358 def _replace_column_contents(self, new): |
me@373
|
359 """Replace column contents with those of `new` alignment. |
me@371
|
360 |
me@373
|
361 Synonym: copy gap patterns from `new` to `self`. |
me@372
|
362 |
me@373
|
363 `self.sequences` and `new.sequences` should have the same contents. |
me@371
|
364 """ |
me@371
|
365 self._wipe() |
me@371
|
366 not_gap = lambda (a,b): a != None |
me@371
|
367 for sequence, new_row in zip(self.sequences, new.rows_as_lists()): |
me@371
|
368 assert len(sequence) == len(new_row.sequence) |
dendik@388
|
369 non_gap_columns = [column |
dendik@388
|
370 for column, monomer in zip(self.columns, new_row) |
dendik@388
|
371 if monomer |
dendik@388
|
372 ] |
dendik@388
|
373 for monomer, column in zip(sequence, non_gap_columns): |
dendik@388
|
374 column[sequence] = monomer |
me@371
|
375 |
me@373
|
376 def _replace_contents(self, new, copy_descriptions, copy_contents): |
me@371
|
377 """Replace alignment contents with those of other alignment.""" |
me@373
|
378 if copy_contents: |
me@373
|
379 self._replace_sequence_contents(new, copy_descriptions) |
bnagaev@378
|
380 self._replace_column_contents(new) |
me@371
|
381 |
me@373
|
382 def process(self, function, copy_descriptions=True, copy_contents=True): |
me@371
|
383 """Apply function to the alignment (or block); inject results back. |
me@371
|
384 |
me@373
|
385 - `function(block)` must return block with same line order. |
me@373
|
386 - if `copy_descriptions` is False, ignore new sequence names. |
me@373
|
387 - if `copy_contents` is False, don't copy sequence contents too. |
dendik@380
|
388 |
dendik@380
|
389 `function` (object) may have attributes `copy_descriptions` and |
dendik@380
|
390 `copy_contents`, which override the same named arguments. |
me@371
|
391 """ |
me@371
|
392 new = function(self) |
dendik@380
|
393 if hasattr(function, 'copy_descriptions'): |
dendik@380
|
394 copy_descriptions = function.copy_descriptions |
dendik@380
|
395 if hasattr(function, 'copy_contents'): |
dendik@380
|
396 copy_contents = function.copy_contents |
me@373
|
397 self._replace_contents(new, copy_descriptions, copy_contents) |
me@371
|
398 |
me@300
|
399 class Column(dict): |
me@300
|
400 """Column of alignment. |
me@300
|
401 |
me@300
|
402 Column is a dict of { sequence : monomer }. |
me@300
|
403 |
me@300
|
404 For sequences that have gaps in current row, given key is not present in |
me@300
|
405 the column. |
me@300
|
406 """ |
me@325
|
407 |
dendik@382
|
408 types = base |
dendik@382
|
409 """Mapping of related types. SHOULD be redefined in subclasses.""" |
dendik@382
|
410 |
me@325
|
411 def __hash__(self): |
me@325
|
412 """Return hash by identity.""" |
me@325
|
413 return id(self) |
me@300
|
414 |
bnagaev@396
|
415 def __eq__(self, other): |
bnagaev@396
|
416 """ equals operator by identity """ |
bnagaev@396
|
417 return id(self) == id(other) |
bnagaev@396
|
418 |
bnagaev@396
|
419 def __ne__(self, other): |
bnagaev@396
|
420 """ non-equals operator by identity """ |
bnagaev@396
|
421 return id(self) != id(other) |
bnagaev@396
|
422 |
me@317
|
423 class Block(Alignment): |
me@307
|
424 """Block of alignment. |
me@301
|
425 |
dendik@415
|
426 Block is an intersection of several rows & columns. (The collections of |
dendik@415
|
427 rows and columns are represented as ordered lists, to retain display order |
dendik@415
|
428 of Alignment or add ability to tweak it). Most of blocks look like |
dendik@415
|
429 rectangular part of alignment if you shuffle alignment rows the right way. |
me@261
|
430 """ |
me@270
|
431 |
me@307
|
432 alignment = None |
me@307
|
433 """Alignment the block belongs to.""" |
me@270
|
434 |
me@307
|
435 sequences = () |
me@307
|
436 """List of sequences in block.""" |
me@307
|
437 |
me@307
|
438 columns = () |
me@307
|
439 """List of columns in block.""" |
me@307
|
440 |
me@317
|
441 @classmethod |
me@317
|
442 def from_alignment(cls, alignment, sequences=None, columns=None): |
me@307
|
443 """Build new block from alignment. |
me@307
|
444 |
me@307
|
445 If sequences are not given, the block uses all sequences in alignment. |
me@307
|
446 |
me@307
|
447 If columns are not given, the block uses all columns in alignment. |
me@307
|
448 |
me@307
|
449 In both cases we use exactly the list used in alignment, thus, if new |
me@307
|
450 sequences or columns are added to alignment, the block tracks this too. |
me@261
|
451 """ |
me@307
|
452 if sequences is None: |
me@307
|
453 sequences = alignment.sequences |
me@318
|
454 if columns is None: |
me@307
|
455 columns = alignment.columns |
me@320
|
456 block = cls() |
me@320
|
457 block.alignment = alignment |
me@320
|
458 block.sequences = sequences |
me@320
|
459 block.columns = columns |
me@320
|
460 return block |
me@270
|
461 |
me@260
|
462 # vim: set ts=4 sts=4 sw=4 et: |