Документ взят из кэша поисковой машины. Адрес оригинального документа : http://kodomo.fbb.msu.ru/hg/allpy/file/e83572fff43f/allpy/fileio.py
Дата изменения: Unknown
Дата индексирования: Mon Feb 4 03:48:58 2013
Кодировка:
allpy: e83572fff43f allpy/fileio.py

allpy

view allpy/fileio.py @ 746:e83572fff43f

Roll back a bug introduced by a dirty hand-merge in [723]. (closes #74) (see #76) Boris! Please do not do dirty hand merges! If you had done hg fetch here, this bug would not have appeared! Please be extremely careful when you do hand merges and double-check your changes. Do a diff with each parent and see what you remove relative to the parent! If someone else's code is involved in the merge (which is almost always the case), do that diff twice just to make sure you have not missed anything!
author Daniil Alexeyevsky <dendik@kodomo.fbb.msu.ru>
date Mon, 11 Jul 2011 14:29:54 +0400
parents 80043822a41e
children d16e8559b6dd
line source
1 import os
2 from subprocess import Popen, PIPE
3 from tempfile import NamedTemporaryFile
4 import util
def get_markups_class(classname):
    """Look up a class by name in the `markups` module.

    The import is done inside the function, at call time, so that loading
    this module does not trigger an import loop with `markups`.
    """
    import markups as markups_module
    found = getattr(markups_module, classname)
    return found
class File(object):
    """Automatic file IO: pick the reader/writer class from a format name.

    Constructing `File(file, format)` does not produce a `File` instance;
    it returns an instance of the class handling that format:

    - "fasta" -- `FastaFile`
    - "markup" -- `MarkupFile`
    - "markup:<subformat>" -- `MarkupFile` with `format=<subformat>`
    - anything else -- `EmbossFile` with the format name passed through

    Extra keyword arguments are forwarded to the chosen class.
    """

    def __new__(cls, file, format="fasta", **kw):
        if format == "fasta":
            return FastaFile(file, **kw)
        if format == 'markup':
            return MarkupFile(file, **kw)
        if not format.startswith('markup:'):
            return EmbossFile(file, format, **kw)
        # Everything after the first colon names the payload format.
        _, _, subformat = format.partition(':')
        return MarkupFile(file, format=subformat, **kw)
class AlignmentFile(object):
    """Common base for alignment readers/writers.

    Stores the file object and formatting options, and implements
    alignment-level IO on top of `read_strings` / `write_strings`, which
    are not defined here and must be provided by subclasses.
    """

    def __init__(self, file, format='fasta', gaps='-', wrap_column=70):
        """Remember the file object and the formatting options."""
        self.file = file
        self.format = format
        self.gaps = gaps
        self.wrap_column = wrap_column

    def write_alignment(self, alignment):
        """Append alignment to the file.

        Each row from `alignment.rows_as_strings(self.gaps)` is passed to
        `write_strings` as a (string, name, description) tuple.
        """
        self.write_strings(
            (row, row.sequence.name, row.sequence.description)
            for row in alignment.rows_as_strings(self.gaps)
        )

    def read_alignment(self, alignment):
        """Read alignment from the file.

        Every (name, description, body) produced by `read_strings` is added
        to `alignment` through `alignment.append_row_from_string`.
        """
        append_row = alignment.append_row_from_string
        # The original passed `file.name`, i.e. an attribute of the builtin
        # `file` type, not of the file being read. Use the actual file
        # object; file-likes without a name (e.g. StringIO) give None.
        # NOTE(review): presumably the fourth argument is the row's source;
        # confirm that None is acceptable to append_row_from_string.
        source = getattr(self.file, 'name', None)
        for name, description, body in self.read_strings():
            append_row(body, name, description, source, self.gaps)
class FastaFile(AlignmentFile):
    """Fasta parser & writer."""

    def write_string(self, string, name, description=''):
        """Append one sequence to file.

        The header line is ">name" or ">name description". When
        `self.wrap_column` is set, the sequence is split into lines of at
        most that many characters; otherwise it is written on one line.
        The file is flushed after each sequence.
        """
        if description:
            name += " " + description
        self.file.write(">%s\n" % name)
        if self.wrap_column:
            while string:
                self.file.write(string[:self.wrap_column] + "\n")
                string = string[self.wrap_column:]
        else:
            self.file.write(string + "\n")
        self.file.flush()

    def write_strings(self, sequences):
        """Write sequences to file.

        Sequences are given as an iterable of tuples
        (string, name, description).
        """
        for string, name, description in sequences:
            self.write_string(string, name, description)

    def read_strings(self):
        """Read the whole file and yield (name, description, body) tuples.

        Records are split on a newline followed by ">". The header is split
        at the first space into name and description; spaces, tabs and line
        breaks are removed from the body.
        """
        for part in self.file.read().split("\n>"):
            # Empty input, or blank lines before the first header, would
            # otherwise produce a bogus sequence with empty name and body.
            if not part.strip():
                continue
            header, _, body = part.partition("\n")
            # Strip around the header so that "> name" still yields the
            # name rather than an empty name with the name as description.
            header = header.lstrip(">").strip()
            name, _, description = header.partition(" ")
            description = description.strip()
            body = util.remove_each(body, " \n\r\t\v")
            yield (name, description, body)
class MarkupFile(AlignmentFile):
    """Parser & writer for our own marked alignment file format.

    Marked alignment file consists of a list of records, separated with one or
    more empty lines. Each record consists of type name, header and optional
    contents. Type name is a line, containing just one word, describing the
    record type. Header is a sequence of lines, each in format `key: value`.
    Content, if present, is separated from header with an empty line.

    Type names and header key names are case-insensitive.

    Known record types now are:

    - `alignment` -- this must be the last record in file for now
    - `sequence_markup`
    - `alignment_markup`

    The markup class is stored under the `class` header key; a header key
    named `type` would overwrite the record type when the file is read.

    Example::

        sequence_markup
        sequence_name: cyb5_mouse
        sequence_description:
        name: pdb_residue_number
        class: SequencePDBResidueNumberMarkup
        markup: -,12,121,122,123,124,13,14,15,-,-,16

        alignment_markup
        name: geometrical_core
        class: AlignmentGeometricalCoreMarkup
        markup: -,-,-,-,+,+,+,-,-,-,+,+,-,-,-,-

        alignment
        format: fasta

        >cyb5_mouse
        seqvencemouse
    """

    # Helper attribute for write_empty_line: empty before the first call,
    # replaced by a per-instance '\n' afterwards.
    _empty_line = ''

    def write_alignment(self, alignment):
        """Write alignment to file.

        Alignment markups are written first, then the markups of every
        sequence, then the `alignment` record followed by the alignment
        itself.
        """
        self.write_markups(alignment.markups, 'alignment_markup')
        for sequence in alignment.sequences:
            record = {
                'sequence_name': sequence.name,
                'sequence_description': sequence.description,
            }
            self.write_markups(sequence.markups, 'sequence_markup', record)
        record = {'type': 'alignment', 'format': self.format}
        self.write_record(record)
        self.write_empty_line()
        # NOTE(review): the header announces self.format, but no format is
        # passed to to_file here -- confirm the payload really is written
        # in self.format.
        alignment.to_file(self.file)

    def write_markups(self, markups, type, pre_record=None):
        """Write a dictionary of markups as series of records.

        `type` is the record type name; `pre_record`, if given, holds extra
        header fields added to every record.
        """
        for name, markup in markups.items():
            record = markup.to_record()
            if pre_record:
                record.update(pre_record)
            record['type'] = type
            record['name'] = name
            record['class'] = markup.__class__.__name__
            self.write_record(record)

    def write_record(self, record):
        """Write record to file. Add new line before every but first record.

        The value of `record['type']` becomes the type-name line; all other
        items are written as `key: value` lines. The passed dictionary is
        left unmodified.
        """
        self.write_empty_line()
        self.file.write('%s\n' % record['type'])
        for key, value in record.items():
            if key == 'type':
                continue
            self.file.write('%s: %s\n' % (key, value))

    def write_empty_line(self):
        """Add empty line every time except the first call."""
        self.file.write(self._empty_line)
        self._empty_line = '\n'

    def read_alignment(self, alignment):
        """Read alignment from file.

        Each record is dispatched to the method `add_<record type>`.
        """
        # list() is deliberate: all records, including the alignment
        # payload read inside read_records, are consumed before the first
        # handler runs, so markup handlers see the loaded sequences.
        for record in list(self.read_records(alignment)):
            handler = getattr(self, 'add_%s' % record['type'])
            handler(alignment, record)

    def add_sequence_markup(self, alignment, record):
        """Found sequence markup record in file. Attach it to its sequence.

        The sequence is located by `sequence_name`. Raises AssertionError
        if a non-empty `sequence_description` does not match, or if no
        sequence with that name exists in the alignment.
        """
        for sequence in alignment.sequences:
            if sequence.name != record['sequence_name']:
                continue
            description = record.get('sequence_description')
            # Explicit raise instead of `assert`, so that the check is not
            # stripped when Python runs with -O.
            if description and sequence.description != description:
                raise AssertionError(
                    "Sequence description does not match markup record")
            cls = get_markups_class(record['class'])
            cls.from_record(sequence, record, name=record.get('name'))
            return
        raise AssertionError("Could not find sequence in alignment")

    def add_alignment_markup(self, alignment, record):
        """Found alignment markup record in file. Attach it to alignment."""
        cls = get_markups_class(record['class'])
        cls.from_record(alignment, record, name=record.get('name'))

    def add_alignment(self, alignment, record):
        """Found alignment record. It has been handled in read_payload."""
        pass

    def read_records(self, alignment):
        """Read records and yield them one by one as dicts.

        Empty lines between records are skipped.
        """
        # readline() is used rather than iterating over the file: the
        # payload reader may call self.file.read(), and in Python 2 file
        # iteration keeps a read-ahead buffer that read() does not see.
        while True:
            line = self.file.readline()
            if not line:
                break
            if line.strip() == "":
                continue
            yield self.read_record(alignment, line)

    def read_record(self, alignment, type):
        """Read record headers and record payload.

        `type` is the already-read type-name line. Header lines are read up
        to the first empty line, after which the payload (if any) is read.
        If the file ends before an empty line, no payload is read.
        """
        type = type.strip().lower()
        record = {'type': type}
        while True:
            line = self.file.readline()
            if not line:
                break
            if line.strip() == "":
                self.read_payload(alignment, record, type)
                return record
            key, value = line.split(':', 1)
            key = key.strip().lower()
            value = value.strip()
            record[key] = value
        return record

    def read_payload(self, alignment, record, type):
        """Read record payload, if necessary.

        Only `alignment` records have a payload: the rest of the file is
        read into `alignment` using the record's `format` (default fasta).
        """
        if type == 'alignment':
            io = File(self.file, record.get('format', 'fasta'))
            io.read_alignment(alignment)
class EmbossFile(AlignmentFile):
    """Parser & writer for file formats supported by EMBOSS.

    Conversion is delegated to the external `seqret` program, with fasta
    used as the intermediate format on this side of the pipe.
    """

    def write_strings(self, sequences):
        """Write sequences to file.

        Sequences are given as an iterable of tuples
        (string, name, description). They are fed to `seqret` as fasta and
        its output in `self.format` is copied to the file.
        """
        pipe = Popen(['seqret', 'stdin', '%s::stdout' % self.format],
            stdin=PIPE, stdout=PIPE
        )
        FastaFile(pipe.stdin).write_strings(self.fix_sequences(sequences))
        pipe.stdin.close()
        for line in pipe.stdout:
            self.file.write(line)
        # Release the pipe and reap the child so it does not linger.
        pipe.stdout.close()
        pipe.wait()

    def fix_sequences(self, sequences):
        """EMBOSS does not permit : in file names. Fix sequences for that.

        Takes and yields (string, name, description) tuples, the same
        order `FastaFile.write_strings` consumes; every ':' in the name
        is replaced with '_'.
        """
        # The original unpacked the tuples as (name, description, sequence)
        # and therefore rewrote the sequence text instead of the name.
        for string, name, description in sequences:
            yield string, name.replace(':', '_'), description

    def read_strings(self):
        """Read sequences from file.

        The file contents are piped through `seqret` and its fasta output
        is parsed; returns an iterator of (name, description, body).
        """
        pipe = Popen(['seqret', '%s::stdin' % self.format, 'stdout'],
            stdin=PIPE, stdout=PIPE
        )
        for line in self.file:
            pipe.stdin.write(line)
        pipe.stdin.close()
        return FastaFile(pipe.stdout).read_strings()
240 # vim: set et ts=4 sts=4 sw=4: