Документ взят из кэша поисковой машины. Адрес оригинального документа : http://kodomo.fbb.msu.ru/hg/allpy/file/d60f272dc921/allpy/fileio.py
Дата изменения: Unknown
Дата индексирования: Mon Feb 4 03:51:41 2013
Кодировка:
allpy: d60f272dc921 allpy/fileio.py

allpy

view allpy/fileio.py @ 842:d60f272dc921

blocks3d/wt: suggest better filenames for downloading. If an input alignment is provided, the name of the input alignment file is concatenated with ".html" and used as the name of the output file. Otherwise the name "blocks3d.html" is used.
author boris (kodomo) <bnagaev@gmail.com>
date Wed, 20 Jul 2011 02:38:59 +0400
parents 4f896db3531d
children 6cc007e68af6
line source
1 import os
2 from subprocess import Popen, PIPE
3 from tempfile import NamedTemporaryFile
4 import util
def get_markups_class(classname):
    """Return the attribute named `classname` from the `markups` module.

    `markups` is imported here, at call time, instead of at the top of the
    file; the original author did this to avoid import loops.
    """
    import markups as markups_module
    return getattr(markups_module, classname)
class File(object):
    """Factory that returns the reader/writer object matching `format`.

    Calling `File(file, format, **kw)` never produces a `File` instance;
    it returns:

    - `FastaFile(file, **kw)` for `"fasta"` (the default),
    - `MarkupFile(file, **kw)` for `"markup"`,
    - `MarkupFile(file, format=SUB, **kw)` for `"markup:SUB"`,
    - `EmbossFile(file, format, **kw)` for any other format name.
    """

    def __new__(cls, file, format="fasta", **kw):
        if format == "fasta":
            return FastaFile(file, **kw)
        if format == 'markup':
            return MarkupFile(file, **kw)

        markup_prefix = 'markup:'
        if format.startswith(markup_prefix):
            # Everything after the first colon names the payload format.
            payload_format = format[len(markup_prefix):]
            return MarkupFile(file, format=payload_format, **kw)

        return EmbossFile(file, format, **kw)
class AlignmentFile(object):
    """Shared base for alignment readers/writers bound to one file object.

    The methods here convert between an alignment object and the
    `write_strings` / `read_strings` methods, which this class calls but does
    not define itself.
    """

    def __init__(self, file, format='fasta', gaps='-', wrap_column=70):
        """Store the file object together with the formatting options."""
        self.file, self.format = file, format
        self.gaps, self.wrap_column = gaps, wrap_column

    def write_alignment(self, alignment):
        """Append all rows of `alignment` to the file."""
        def as_triples(rows):
            # One (string, name, description) tuple per alignment row.
            for row in rows:
                yield (row, row.sequence.name, row.sequence.description)

        self.write_strings(as_triples(alignment.rows_as_strings(self.gaps)))

    def read_alignment(self, alignment):
        """Append every sequence read from the file to `alignment`."""
        add_row = alignment.append_row_from_string
        # File-like objects without a `name` attribute give an empty source.
        origin = getattr(self.file, 'name', '')
        for seq_name, seq_description, seq_body in self.read_strings():
            add_row(seq_body, seq_name, seq_description, origin, self.gaps)
class FastaFile(AlignmentFile):
    """FASTA parser & writer working on the file object in `self.file`."""

    def write_string(self, string, name, description=''):
        """Append one sequence to the file and flush it.

        The header line is `>name`, or `>name description` when a
        description is given. The body is wrapped at `self.wrap_column`
        characters; when `self.wrap_column` is not a positive number the
        body is written as a single line.
        """
        if description:
            name += " " + description
        self.file.write(">%s\n" % name)
        width = self.wrap_column
        if width and width > 0:
            # Walk the string by offset. Re-slicing the remainder on every
            # line copied the whole tail each time, and a negative width
            # never shortened the string at all.
            for start in range(0, len(string), width):
                self.file.write(string[start:start + width] + "\n")
        else:
            self.file.write(string + "\n")
        self.file.flush()

    def write_strings(self, sequences):
        """Write sequences to file.

        Sequences are given as an iterable of tuples
        (string, name, description).
        """
        for string, name, description in sequences:
            self.write_string(string, name, description)

    def read_parts(self):
        """Yield the parts of a FASTA file, each beginning with a `>` line.

        This is a replacement for self.file.read().split("\n>").
        It is required for markup format, which combines parts read with
        different parsers. Python prohibits combining iterators and file.read
        methods on the same file.

        Blank lines before the first header are skipped. Any other text
        before the first header raises ValueError, because it does not
        belong to any sequence.
        """
        lines = None  # stays None until the first header has been seen
        for line in self.file:
            if line.startswith(">"):
                if lines:
                    yield "".join(lines)
                lines = []
            elif lines is None:
                if line.strip():
                    raise ValueError(
                        "FASTA data before the first '>' header: %r" % line
                    )
                continue
            lines.append(line)
        if lines:
            yield "".join(lines)

    def read_strings(self):
        """Yield (name, description, body) for every sequence in the file.

        The name is the header text up to the first space, the description
        is the rest of the header. Whitespace between `>` and the name is
        ignored, so `> name` and `>name` give the same name. The characters
        " \\n\\r\\t\\v" are removed from the body with `util.remove_each`.
        """
        for part in self.read_parts():
            header, _, body = part.partition("\n")
            # Strip before splitting: otherwise a space right after the
            # marker makes the name empty and moves it into the description.
            header = header.lstrip(">").strip()
            name, _, description = header.partition(" ")
            name = name.strip()
            description = description.strip()
            body = util.remove_each(body, " \n\r\t\v")
            yield (name, description, body)
class MarkupFile(AlignmentFile):
    """Parser & writer for our own marked alignment file format.

    Marked alignment file consists of a list of records, separated with one or
    more empty lines. Each record consists of type name, header and optional
    contents. Type name is a line, containing just one word, describing the
    record type. Header is a sequence of lines, each in format `key: value`.
    Content, if present, is separated from header with an empty line.

    Type names and header key names are case-insensitive and '-' and '_' in
    them are equivalent.

    Known record types now are:

    - `alignment` -- this must be the last record in file for now
    - `sequence_markup`
    - `alignment_markup`

    Markup records name the markup class in the `class` header.

    Example::

        sequence-markup
        sequence-name: cyb5_mouse
        sequence-description:
        name: pdb_residue_number
        class: SequencePDBResidueNumberMarkup
        markup: -,12,121,122,123,124,13,14,15,-,-,16

        alignment-markup
        name: geometrical_core
        class: AlignmentGeometricalCoreMarkup
        markup: -,-,-,-,+,+,+,-,-,-,+,+,-,-,-,-

        alignment
        format: fasta

        >cyb5_mouse
        seqvencemouse
    """

    # Separator emitted by write_empty_line: empty before the first call,
    # a newline for every later call.
    _empty_line = ''

    def write_alignment(self, alignment):
        """Write alignment to file.

        Alignment markups are written first, then the markups of every
        sequence, and finally the `alignment` record followed by the
        alignment itself in `self.format`.
        """
        self.write_markups(alignment.markups, 'alignment_markup')
        for sequence in alignment.sequences:
            record = {
                'sequence_name': sequence.name,
                'sequence_description': sequence.description,
            }
            self.write_markups(sequence.markups, 'sequence_markup', record)
        record = {'type': 'alignment', 'format': self.format}
        self.write_record(record)
        self.write_empty_line()
        alignment.to_file(self.file, format=self.format, gap=self.gaps)

    def write_markups(self, markups, type, pre_record=None):
        """Write a dictionary of markups as series of records.

        `markups` maps markup names to markup objects; `type` is the record
        type to write; `pre_record` holds extra header fields that are
        merged into every record.
        """
        for name, markup in markups.items():
            record = markup.to_record()
            if pre_record:
                record.update(pre_record)
            record['type'] = type
            record['name'] = name
            record['class'] = markup.__class__.__name__
            self.write_record(record)

    def write_record(self, record):
        """Write record to file. Add new line before every but first record.

        The value under the 'type' key becomes the type-name line; every
        other item is written as a `key: value` header line. `record` itself
        is left unmodified.
        """
        self.write_empty_line()
        self.file.write('%s\n' % self.normalize('write', record['type']))
        for key, value in record.items():
            if key == 'type':
                continue
            key = self.normalize('write', key)
            self.file.write('%s: %s\n' % (key, value))

    def write_empty_line(self):
        """Add empty line every time except the first call."""
        self.file.write(self._empty_line)
        self._empty_line = '\n'

    def read_alignment(self, alignment):
        """Read alignment from file.

        Each record is dispatched to the method named `add_<record type>`.
        """
        # All records are read before any handler runs: reading the
        # `alignment` record fills `alignment` with sequences, and the
        # sequence markup handler looks those sequences up.
        for record in list(self.read_records(alignment)):
            handler = getattr(self, 'add_%s' % record['type'])
            handler(alignment, record)

    def add_sequence_markup(self, alignment, record):
        """Attach a sequence markup record to the sequence it names.

        Raises AssertionError when no sequence in `alignment` has the
        recorded name, or when the record carries a non-empty description
        that differs from the sequence's description.
        """
        for sequence in alignment.sequences:
            if sequence.name == record['sequence_name']:
                description = record.get('sequence_description')
                # Raised explicitly instead of via `assert`, so the check
                # also runs when Python is started with -O.
                if description and sequence.description != description:
                    raise AssertionError(
                        "Description of sequence %r does not match the "
                        "markup record" % sequence.name
                    )
                cls = get_markups_class(record['class'])
                cls.from_record(sequence, record, name=record.get('name'))
                return
        raise AssertionError("Could not find sequence in alignment")

    def add_alignment_markup(self, alignment, record):
        """Attach an alignment markup record to `alignment`."""
        cls = get_markups_class(record['class'])
        cls.from_record(alignment, record, name=record.get('name'))

    def add_alignment(self, alignment, record):
        """Found alignment record. It has been handled in read_payload."""
        pass

    def read_records(self, alignment):
        """Read records from the file and yield them as dicts."""
        for line in self.file:
            if line.strip() == "":
                continue
            yield self.read_record(alignment, line)

    def read_record(self, alignment, type):
        """Read record headers and record payload.

        `type` is the already-read type-name line of the record. Header
        lines are read up to the first empty line, after which the payload
        (if any) is read. Returns the record as a dict with normalized keys
        and the normalized type under 'type'.

        Raises ValueError for a header line that contains no ':'.
        """
        type = self.normalize('read', type)
        record = {'type': type}
        for line in self.file:
            if line.strip() == "":
                self.read_payload(alignment, record, type)
                return record
            key, colon, value = line.partition(':')
            if not colon:
                raise ValueError(
                    "Malformed header line in %s record: %r" % (type, line)
                )
            key = self.normalize('read', key)
            value = value.strip()
            record[key] = value
        # End of file reached right after the headers: no payload follows.
        return record

    def read_payload(self, alignment, record, type):
        """Read record payload, if necessary.

        Only `alignment` records have a payload: the rest of the file is
        read into `alignment` in the record's `format` (default 'fasta').
        """
        if type == 'alignment':
            io = File(self.file, record.get('format', 'fasta'), gaps=self.gaps)
            io.read_alignment(alignment)

    @staticmethod
    def normalize(for_what, string):
        """Convert a type name or header key between file and internal form.

        With `for_what == 'read'` the string is stripped, '-' becomes '_'
        and the result is lower-cased. With `for_what == 'write'` the string
        is stripped, '_' becomes '-' and the result is capitalized.

        Raises ValueError for any other `for_what`.
        """
        if for_what == 'read':
            return string.strip().replace('-', '_').lower()
        if for_what == 'write':
            return string.strip().replace('_', '-').capitalize()
        raise ValueError(
            "for_what must be 'read' or 'write', got %r" % (for_what,)
        )
class EmbossFile(AlignmentFile):
    """Parser & writer for file formats supported by EMBOSS.

    Conversion is done by the external `seqret` program: FASTA on one side,
    `self.format` on the other. The data sent to `seqret` goes through a
    temporary file rather than a pipe, so that this process only ever reads
    from the child; writing everything to the child's stdin before reading
    its stdout can block both processes once the pipe buffers fill up.
    """

    def write_strings(self, sequences):
        """Write sequences to file in `self.format`.

        Sequences are given as an iterable of tuples
        (string, name, description).

        Raises IOError when `seqret` exits with a non-zero status.
        """
        command = ['seqret', 'stdin', '%s::stdout' % self.format]
        fasta = NamedTemporaryFile(mode='w+')
        try:
            FastaFile(fasta).write_strings(self.fix_sequences(sequences))
            fasta.flush()
            fasta.seek(0)
            pipe = Popen(command, stdin=fasta, stdout=PIPE)
            try:
                for line in pipe.stdout:
                    self.file.write(line)
            finally:
                pipe.stdout.close()
                status = pipe.wait()  # reap the child process
        finally:
            fasta.close()
        self._check_exit(status)

    def fix_sequences(self, sequences):
        """Yield the sequences with ':' in their names replaced by '_'.

        The original note says EMBOSS does not permit ':' there. Tuples are
        (string, name, description); only the name is changed.
        """
        for string, name, description in sequences:
            yield string, name.replace(':', '_'), description

    def read_strings(self):
        """Read sequences from file.

        The content of `self.file` is handed to `seqret` right away; the
        returned generator yields (name, description, body) tuples parsed
        from its FASTA output and raises IOError at the end when `seqret`
        exited with a non-zero status.
        """
        command = ['seqret', '%s::stdin' % self.format, 'stdout']
        source = NamedTemporaryFile(mode='w+')
        try:
            # Iterate instead of calling read(): the file may already be
            # in use as an iterator by the caller.
            for line in self.file:
                source.write(line)
            source.flush()
            source.seek(0)
            pipe = Popen(command, stdin=source, stdout=PIPE)
        except Exception:
            source.close()
            raise
        return self._converted_records(pipe, source)

    def _converted_records(self, pipe, source):
        """Yield records from seqret's FASTA output, then clean up."""
        try:
            for record in FastaFile(pipe.stdout).read_strings():
                yield record
        finally:
            pipe.stdout.close()
            status = pipe.wait()  # reap the child process
            source.close()
        self._check_exit(status)

    def _check_exit(self, status):
        """Raise IOError when seqret reported failure."""
        if status != 0:
            raise IOError(
                "seqret exited with status %d (format %r)"
                % (status, self.format)
            )
266 # vim: set et ts=4 sts=4 sw=4: