Документ взят из кэша поисковой машины. Адрес оригинального документа : http://kodomo.fbb.msu.ru/hg/allpy/file/61e5d8e146c7/allpy/fileio.py
Дата изменения: Unknown
Дата индексирования: Mon Feb 4 03:51:06 2013
Кодировка:
allpy: 61e5d8e146c7 allpy/fileio.py

allpy

view allpy/fileio.py @ 806:61e5d8e146c7

WARNING! Changed EOL formatting to UNIX everywhere! WARNING! To avoid such nasty commits in the future, add the following lines to your ~/.hgrc: [extensions] hgext.eol = Note: kodomo has this setting on by default. Please also note .hgeol file, that has rules for EOL conversion.
author Daniil Alexeyevsky <dendik@kodomo.fbb.msu.ru>
date Thu, 14 Jul 2011 14:28:32 +0400
parents d16e8559b6dd
children 4f896db3531d
line source
1 import os
2 from subprocess import Popen, PIPE
3 from tempfile import NamedTemporaryFile
4 import util
6 def get_markups_class(classname):
7 """This ugly helper is to avoid bad untimely import loops."""
8 import markups
9 return getattr(markups, classname)
11 class File(object):
12 """Automatical file IO."""
13 def __new__(cls, file, format="fasta", **kw):
14 if format == "fasta":
15 return FastaFile(file, **kw)
16 elif format == 'markup':
17 return MarkupFile(file, **kw)
18 elif format.startswith('markup:'):
19 subformat = format.split(':',1)[1]
20 return MarkupFile(file, format=subformat, **kw)
21 else:
22 return EmbossFile(file, format, **kw)
24 class AlignmentFile(object):
25 """Some helpers."""
27 def __init__(self, file, format='fasta', gaps='-', wrap_column=70):
28 self.file = file
29 self.format = format
30 self.gaps = gaps
31 self.wrap_column = wrap_column
33 def write_alignment(self, alignment):
34 """Append alignment to the file."""
35 self.write_strings(
36 (row, row.sequence.name, row.sequence.description)
37 for row in alignment.rows_as_strings(self.gaps)
38 )
40 def read_alignment(self, alignment):
41 """Read alignment from the file."""
42 append_row = alignment.append_row_from_string
43 for name, description, body in self.read_strings():
44 append_row(body, name, description, file.name, self.gaps)
46 class FastaFile(AlignmentFile):
47 """Fasta parser & writer."""
49 def write_string(self, string, name, description=''):
50 """Append one sequence to file."""
51 if description:
52 name += " " + description
53 self.file.write(">%s\n" % name)
54 if self.wrap_column:
55 while string:
56 self.file.write(string[:self.wrap_column]+"\n")
57 string = string[self.wrap_column:]
58 else:
59 self.file.write(string+"\n")
60 self.file.flush()
62 def write_strings(self, sequences):
63 """Write sequences to file.
65 Sequences are given as list of tuples (string, name, description).
66 """
67 for string, name, description in sequences:
68 self.write_string(string, name, description)
70 def read_parts(self):
71 """Read parts beginning with > in FASTA file.
73 This is a drop-in replacement for self.file.read().split("\n>")
74 It is required for markup format, which combines parts read with
75 different parsers. Python prohibits combining iterators and file.read
76 methods on the same file.
77 """
78 part = None
79 for line in self.file:
80 if line.startswith(">"):
81 if part: yield part
82 part = ""
83 part += line
84 if part: yield part
86 def read_strings(self):
87 for part in self.read_parts():
88 header, _, body = part.partition("\n")
89 header = header.lstrip(">")
90 name, _, description = header.partition(" ")
91 name = name.strip()
92 description = description.strip()
93 body = util.remove_each(body, " \n\r\t\v")
94 yield (name, description, body)
96 class MarkupFile(AlignmentFile):
97 """Parser & writer for our own marked alignment file format.
99 Marked alignment file consists of a list of records, separated with one or
100 more empty lines. Each record consists of type name, header and optional
101 contents. Type name is a line, containing just one word, describing the
102 record type. Header is a sequence of lines, each in format `key: value`.
103 Content, if present, is separated from header with an empty line.
105 Type names and header key names are case-insensitive.
107 Known record types now are:
109 - `alignment` -- this must be the last record in file for now
110 - `sequence_markup`
111 - `alignment_markup`
113 Example::
115 sequence_markup
116 sequence_name: cyb5_mouse
117 sequence_description:
118 name: pdb_residue_number
119 type: SequencePDBResidueNumberMarkup
120 markup: -,12,121,122,123,124,13,14,15,-,-,16
122 alignment_markup
123 name: geometrical_core
124 type: AlignmentGeometricalCoreMarkup
125 markup: -,-,-,-,+,+,+,-,-,-,+,+,-,-,-,-
127 alignment
128 format: fasta
130 > cyb5_mouse
131 seqvencemouse
132 """
134 _empty_line = ''
135 """Helper attribute for write_empty_line."""
137 def write_alignment(self, alignment):
138 """Write alignment to file."""
139 self.write_markups(alignment.markups, 'alignment_markup')
140 for sequence in alignment.sequences:
141 record = {
142 'sequence_name': sequence.name,
143 'sequence_description': sequence.description,
145 self.write_markups(sequence.markups, 'sequence_markup', record)
146 record = {'type': 'alignment', 'format': self.format}
147 self.write_record(record)
148 self.write_empty_line()
149 alignment.to_file(self.file, format=self.format, gap=self.gaps)
151 def write_markups(self, markups, type, pre_record={}):
152 """Write a dictionary of markups as series of records."""
153 for name, markup in markups.items():
154 record = markup.to_record()
155 record.update(pre_record)
156 record['type'] = type
157 record['name'] = name
158 record['class'] = markup.__class__.__name__
159 self.write_record(record)
161 def write_record(self, record):
162 """Write record to file. Add new line before every but first record."""
163 self.write_empty_line()
164 self.file.write('%s\n' % record['type'])
165 del record['type']
166 for key, value in record.items():
167 self.file.write('%s: %s\n' % (key, value))
169 def write_empty_line(self):
170 """Add empty line every time except the first call."""
171 self.file.write(self._empty_line)
172 self._empty_line = '\n'
174 def read_alignment(self, alignment):
175 """Read alignment from file."""
176 for record in list(self.read_records(alignment)):
177 handler = getattr(self, 'add_%s' % record['type'])
178 handler(alignment, record)
180 def add_sequence_markup(self, alignment, record):
181 """Found sequence markup record in file. Do something about it."""
182 for sequence in alignment.sequences:
183 if sequence.name == record['sequence_name']:
184 description = record.get('sequence_description')
185 if description:
186 assert sequence.description == description
187 cls = get_markups_class(record['class'])
188 cls.from_record(sequence, record, name=record.get('name'))
189 return
190 raise AssertionError("Could not find sequence in alignment")
192 def add_alignment_markup(self, alignment, record):
193 """Found alignment markup record in file. Do something about it."""
194 cls = get_markups_class(record['class'])
195 cls.from_record(alignment, record, name=record.get('name'))
197 def add_alignment(self, alignment, record):
198 """Found alignment record. It has been handled in read_payload."""
199 pass
201 def read_records(self, alignment):
202 """Read records and return them as a list of dicts."""
203 for line in self.file:
204 if line.strip() == "":
205 continue
206 yield self.read_record(alignment, line)
208 def read_record(self, alignment, type):
209 """Read record headers and record payload."""
210 type = type.strip().lower()
211 record = {'type': type}
212 for line in self.file:
213 if line.strip() == "":
214 self.read_payload(alignment, record, type)
215 return record
216 key, value = line.split(':', 1)
217 key = key.strip().lower()
218 value = value.strip()
219 record[key] = value
220 return record
222 def read_payload(self, alignment, record, type):
223 """Read record payload, if necessary."""
224 if type == 'alignment':
225 io = File(self.file, record.get('format', 'fasta'), gaps=self.gaps)
226 io.read_alignment(alignment)
228 class EmbossFile(AlignmentFile):
229 """Parser & writer for file formats supported by EMBOSS."""
231 def write_strings(self, sequences):
232 """Write sequences to file."""
233 pipe = Popen(['seqret', 'stdin', '%s::stdout' % self.format],
234 stdin=PIPE, stdout=PIPE
236 FastaFile(pipe.stdin).write_strings(self.fix_sequences(sequences))
237 pipe.stdin.close()
238 for line in pipe.stdout:
239 self.file.write(line)
241 def fix_sequences(self, sequences):
242 """EMBOSS does not permit : in file names. Fix sequences for that."""
243 for name, description, sequence in sequences:
244 yield name.replace(':', '_'), description, sequence
246 def read_strings(self):
247 """Read sequences from file."""
248 pipe = Popen(['seqret', '%s::stdin' % self.format, 'stdout'],
249 stdin=PIPE, stdout=PIPE
251 for line in self.file:
252 pipe.stdin.write(line)
253 pipe.stdin.close()
254 return FastaFile(pipe.stdout).read_strings()
256 # vim: set et ts=4 sts=4 sw=4: