Skip to content
Navigation Menu
{{ message }}
-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathsubmission.py
More file actions
310 lines (249 loc) · 10.4 KB
/
Copy pathsubmission.py
File metadata and controls
310 lines (249 loc) · 10.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
# -*- coding: utf-8 -*-
import os
import re
import copy
import inspect
try:
import readline
except ImportError:
import pyreadline as readline
from .logging import log
def rlinput(prompt, prefill=''):
    """Read a line from the user with ``prefill`` already typed in.

    A readline startup hook inserts the prefill text into the edit buffer
    right before the prompt is displayed, so the user can edit the existing
    value instead of retyping it. The hook is removed again afterwards, even
    when ``input`` raises.

    Args:
        prompt: Text handed to ``input`` as the prompt.
        prefill: Initial content of the input line. Values that are not
            strings (for example an int or bool that is being amended) are
            converted with ``str`` because ``readline.insert_text`` only
            accepts strings; ``None`` is treated as "no prefill".

    Returns:
        The line entered by the user, exactly as returned by ``input``.
    """
    # Normalise once, up front, so the hook itself cannot fail on the type.
    if prefill is None:
        text = ''
    elif isinstance(prefill, str):
        text = prefill
    else:
        text = str(prefill)

    def insert_prefill():
        readline.insert_text(text)

    readline.set_startup_hook(insert_prefill)
    try:
        return input(prompt)
    finally:
        # Calling set_startup_hook without arguments removes the hook.
        readline.set_startup_hook()
class SubmissionAttributeError(Exception):
    """Raised when a requested field is neither stored nor renderable."""
# Matches the "_render_" prefix of an attribute name, but only when the rest
# of the name consists solely of lowercase letters and underscores.
# Substituting '' for the match therefore yields the bare field name.
re_frender = re.compile("^_render_(?=[a-z_]*$)")

# Maps a category id (a class's ``_cat_id``) to the first class that was
# registered under that id.
cat_map = dict()
class RegisteringType(type):
    """Metaclass that collects form-field mappers and finalizable fields.

    Every class created with this metaclass gets its own ``registry``: a
    deep copy of the inherited one (or a fresh one), holding two dicts:

    * ``registry['mappers']``: field name -> form field
    * ``registry['types']``: form field -> form field type

    Mappers come from the class's ``__form_fields__`` dict (field name ->
    ``(form field, type)``) and from class-body attributes named
    ``_render_<field>`` that carry a ``form_field`` attribute.

    Class-body attributes whose ``needs_finalization`` attribute is truthy
    have their field name appended to ``_to_finalize`` (a new list is built
    each time, so an inherited list is never mutated).

    Classes that have a ``_cat_id`` are also recorded in the module-level
    ``cat_map``; the first class seen for a given id is kept.
    """

    def __init__(cls, name, bases, attrs):
        """Populate ``registry``, ``_to_finalize`` and ``cat_map`` for ``cls``.

        Raises:
            AssertionError: if an attribute flagged as a mapper or as needing
                finalization is not named like a field renderer.
        """
        super().__init__(name, bases, attrs)

        # Deep copy so registering on a subclass never leaks into the
        # registry of its base classes.
        inherited = getattr(cls, 'registry', {'mappers': {}, 'types': {}})
        cls.registry = copy.deepcopy(inherited)
        mappers = cls.registry['mappers']
        types = cls.registry['types']

        if hasattr(cls, '_cat_id') and cls._cat_id not in cat_map:
            cat_map[cls._cat_id] = cls

        def field_name(attr_name):
            # Strip the "_render_" prefix to get the field name.
            stripped, n = re.subn(re_frender, '', attr_name)
            assert n == 1  # only then is it a field renderer
            return stripped

        def add_mapper(f, ff, fft):
            log.debug("{} adding mapper {} for {} ({})",
                      cls.__name__, f, ff, fft)
            # Look in the mappers dict (not the outer registry dict) and only
            # warn when the mapping really changes: __form_fields__ is found
            # on subclasses too, so identical entries are re-added routinely.
            if f in mappers and mappers[f] != ff:
                log.warning("Overwriting {} for class {} with {} "
                            "(previous value: {})", f, name, ff,
                            mappers[f])
            mappers[f] = ff
            types[ff] = fft

        # get form_field mappers from dunder string
        form_field_mappers = getattr(cls, '__form_fields__', {})
        for field, (form_field, form_field_type) in form_field_mappers.items():
            add_mapper(field, form_field, form_field_type)

        for key, val in attrs.items():
            try:
                form_field, form_field_type = getattr(val, 'form_field')
            except AttributeError:
                pass  # most attributes are not a form_field mapper
            else:
                add_mapper(field_name(key), form_field, form_field_type)

            # get fields that need finalization
            if getattr(val, 'needs_finalization', False):
                cls._to_finalize = (getattr(cls, '_to_finalize', []) +
                                    [field_name(key)])
# Form field types that build_payload has a rule for.
# todo select
form_field_types = {
    'text',
    'checkbox',
    'file',
}
def form_field(field, type='text'):
    """Build a decorator that tags a function with a form-field mapping.

    Args:
        field: The form field the decorated function maps to.
        type: The form field type; defaults to ``'text'``.

    Returns:
        A decorator that stores ``(field, type)`` on the function as its
        ``form_field`` attribute and returns the very same function.
    """
    mapping = (field, type)

    def tag(func):
        setattr(func, 'form_field', mapping)
        return func

    return tag
def finalize(f):
    """Flag ``f`` as needing finalization and hand it back.

    Sets ``f.needs_finalization`` to ``True``; the function itself is
    returned so this can be used as a decorator.
    """
    setattr(f, 'needs_finalization', True)
    return f
class CachedRenderer(object):
    """Dict-like container whose missing fields are rendered on demand.

    ``self.fields`` holds the cached values. Reading a field that is not
    cached calls ``self._render_<field>()`` and stores the result.

    Whenever a field is read while a function named ``_render_<x>`` is on
    the call stack, ``x`` is recorded in ``self.depends_on[<read field>]``.
    Assigning to a field uses that record to drop the cached values of all
    fields that were computed from it.
    """

    def __init__(self, **kwargs):
        """Use the keyword arguments as the initial field values."""
        log.debug("Creating cached renderer {}", kwargs)
        self.fields = kwargs
        self.depends_on = {}

    @staticmethod
    def _calling_renderer():
        """Return the innermost calling function named ``_render_*``.

        Walks the frames lazily and stops at the first match, instead of
        materialising a record for every frame on the stack.

        Returns:
            The function name, or ``None`` when there is no such caller (or
            the interpreter provides no frame access).
        """
        frame = inspect.currentframe()
        try:
            while frame is not None:
                func_name = frame.f_code.co_name
                if func_name.startswith('_render_'):
                    return func_name
                frame = frame.f_back
            return None
        finally:
            # Do not keep a frame reference alive longer than necessary.
            del frame

    def __getitem__(self, field):
        """Return the value of ``field``, rendering and caching it if needed.

        Raises:
            SubmissionAttributeError: if the field is not cached and there
                is no ``_render_<field>`` attribute to produce it.
        """
        # todo: better way to track dependencies. explicit @requires decorator?
        caller = self._calling_renderer()
        if caller is not None:
            caller, n = re.subn(re_frender, '', caller, count=1)
            if n:  # called by another cached field
                self.depends_on.setdefault(field, set()).add(caller)
                log.debug('Adding {} dependency {} -> {}',
                          type(self).__name__, caller, field)

        try:
            return self.fields[field]
        except KeyError:
            pass  # not cached yet; fall through to rendering

        try:
            field_renderer = getattr(self, '_render_' + field)
        except AttributeError:
            # "from None": the internal lookup failure adds nothing useful
            # to the error the caller sees.
            raise SubmissionAttributeError(
                self.__class__.__name__ + " does not contain or "
                "has no rules to generate field '" + field + "'") from None

        log.debug('Rendering field {}[\'{}\']', type(self).__name__, field)
        rv = field_renderer()
        self.fields[field] = rv
        return rv

    def __setitem__(self, key, value):
        """Store ``value`` for ``key`` after invalidating what depends on it."""
        self.invalidate_field_cache(key)
        self.fields[key] = value

    def invalidate_field_cache(self, field):
        """Drop the cached value of ``field`` and of every dependent field.

        The dependency record of ``field`` is removed before recursing, so
        cyclic records cannot cause endless recursion. A debug message is
        logged for every cached value that is actually removed.
        """
        try:
            dependent_fields = self.depends_on.pop(field)
        except KeyError:
            message = 'del inval leaf {}'
        else:
            message = 'del inval node {}'
            for dependent in dependent_fields:
                self.invalidate_field_cache(dependent)

        if field in self.fields:
            del self.fields[field]
            log.debug(message, field)
def build_payload(fd_val, form_field, fft):
    """Yield ``(request part, form field id, value)`` triples for one field.

    Args:
        fd_val: The field value. When ``form_field`` is callable this is
            iterated and every item is handled on its own.
        form_field: Either a form field id (a string) or a callable
            ``form_field(index, item)`` returning the form field to use for
            one item of ``fd_val``.
        fft: The form field type: ``'text'``, ``'checkbox'`` or ``'file'``.

    Yields:
        * ``'text'``: ``('data', id, fd_val)``
        * ``'checkbox'``: ``('data', id, 'on')``, only if ``fd_val`` is truthy
        * ``'file'``: ``('files', id, (basename, open binary file,
          'application/octet-stream'))``; the file is opened here and is
          not closed by this function.

        Nothing is yielded for a string id with any other type.

    Raises:
        AssertionError: if ``form_field`` is neither a string nor callable.
    """
    if not isinstance(form_field, str):
        if not callable(form_field):
            raise AssertionError(form_field, fd_val)
        # a rule to generate form field ids, applied item by item
        for index, item in enumerate(fd_val):
            yield from build_payload(item, form_field(index, item), fft)
        return

    # a plain form field id
    if fft == 'text':
        yield 'data', form_field, fd_val
    elif fft == 'file':
        upload = (os.path.basename(fd_val),
                  open(fd_val, 'rb'),
                  'application/octet-stream')
        yield 'files', form_field, upload
    elif fft == 'checkbox' and fd_val:
        yield 'data', form_field, 'on'
def toposort(depends_on):
    """Return the nodes of a dependency graph in topological order.

    Args:
        depends_on: Mapping of node -> collection of nodes that have to come
            before it. Nodes that only occur inside a collection are treated
            as having no prerequisites. The argument is not modified; a deep
            copy is worked on.

    Returns:
        A list containing every node once, each one after all of its
        prerequisites. The order among independent nodes is unspecified.

    Raises:
        Exception: if the graph contains a cycle.
    """
    remaining = copy.deepcopy(depends_on)

    # Give nodes that only occur as a prerequisite an empty entry of their own.
    mentioned = set(dep for deps in remaining.values() for dep in deps)
    for missing in mentioned - set(remaining.keys()):
        remaining[missing] = set()

    ordered = []
    ready = set(node for node, deps in remaining.items() if not deps)
    while ready:
        done = ready.pop()
        del remaining[done]
        ordered.append(done)
        # Release every node that was waiting for the one just emitted.
        for node, deps in remaining.items():
            if done in deps:
                deps.remove(done)
                if not deps:
                    ready.add(node)

    # Whatever is left could never become ready, i.e. it is part of a cycle.
    if remaining:
        raise Exception("Cyclic dependencies present: {}".format(
            remaining))
    return ordered
class Submission(CachedRenderer, metaclass=RegisteringType):
    """Cached renderer whose registered fields are turned into a form payload.

    The metaclass fills ``registry`` (field name -> form field, form field
    -> type) and ``_to_finalize`` (fields flagged with ``@finalize``).
    Subclasses must override ``submit``; the version here raises
    ``NotImplementedError``.
    """

    def __repr__(self):
        return "\n".join("Field {k}:\n\t{v}\n".format(k=k, v=v)
                         for k, v in self.fields.items())

    @finalize
    def _render_submit(self):
        """Render a preview of all fields that have a form-field mapper."""
        # todo dict map field names
        # todo truncate long fields in preview
        return self.show_fields(list(self.registry['mappers'].keys()))

    def _finalize_submit(self):
        """Submit the rendered payload and return the result of ``submit``."""
        return self.submit(self['payload'])

    def needs_finalization(self):
        """Return the set of finalizable fields that currently have a value."""
        return set(self._to_finalize) & set(self.fields.keys())

    def finalize(self):
        """Replace every pending field by the result of ``_finalize_<field>``.

        Fields are processed in reverse order of ``toposort(self.depends_on)``.
        A field that does not occur in the dependency graph has no ordering
        constraint and is processed first instead of raising ``ValueError``.
        Afterwards the instance is marked by setting ``self.finalized``.
        """
        # Position lookup table: avoids a linear list.index per comparison
        # and tolerates fields that are missing from the graph.
        position = {f: i for i, f in enumerate(toposort(self.depends_on))}
        unranked = len(position)
        pending = sorted(self.needs_finalization(),
                         key=lambda f: position.get(f, unranked),
                         reverse=True)
        for f in pending:
            self[f] = getattr(self, '_finalize_' + f)()
        self.finalized = None

    @staticmethod
    def submit(payload):
        """Send ``payload``; has to be provided by subclasses."""
        raise NotImplementedError

    def show_fields(self, fields):
        """Return a printable overview of ``fields`` and their values.

        Each field gets a header line centred in a rule of ``=`` followed by
        its formatted value; a full-width rule closes the overview. Fields
        are read through ``self[field]`` and may therefore be rendered.
        """
        def format_val(val):
            if isinstance(val, str) and os.path.exists(val):
                return 'file://' + str(val)
            if isinstance(val, (list, tuple)):
                return "\n".join(format_val(v) for v in val)
            log.debug("No rule for formatting {} {}", type(val), val)
            return str(val)

        consolewidth = 80
        lines = []
        for field in fields:
            val = self[field]
            field_str = field
            if field in self._to_finalize and not hasattr(self, 'finalized'):
                field_str += " (will be finalized)"
            lines.append((" " + field_str + " ").center(consolewidth, "="))
            lines.append(format_val(val))
        lines.append("=" * consolewidth)
        return "\n".join(lines) + "\n"

    @staticmethod
    def _parse_amendment(old_value, text):
        """Convert the entered ``text`` to the type of ``old_value``.

        Raises:
            ValueError: if ``text`` is not a valid bool or int literal.
        """
        # bool has to be checked first: it is a subclass of int.
        if isinstance(old_value, bool):
            string_true = {'true', 'True', 'y', 'yes'}
            string_false = {'false', 'False', 'n', 'no'}
            if text in string_true:
                return True
            if text in string_false:
                return False
            raise ValueError("expected one of " + ", ".join(
                sorted(string_true | string_false)))
        if isinstance(old_value, int):
            return int(text)
        return text

    def confirm_finalization(self, fields):
        """Interactively confirm, or amend, ``fields`` before finalization.

        Returns:
            ``True`` if the user confirmed, ``False`` if the user declined
            without naming a field to amend.
        """
        # todo: disable editing on certain fields, e.g. those dependent on
        # fields that require finalization
        print(self.show_fields(fields))
        while True:
            print("Reminder: YOU are responsible for following the "
                  "submission rules!")
            choice = input('Finalize these values? This will upload or '
                           'submit all necessary data. [y/n] ')
            answer = choice.lower()
            if answer == 'y':
                return True
            if answer != 'n':
                continue  # empty or unknown answer: ask again

            amend = input("Amend a field? [N/<field name>] ")
            if not amend or amend.lower() == 'n':
                return False

            try:
                val = self[amend]
            except SubmissionAttributeError:
                print("No field named", amend)
                print("Choices are:", list(self.fields.keys()))
                continue

            if not isinstance(val, (str, bool, int)):
                print("Can't amend value of type", type(val))
                continue

            # The prefill has to be a string, also for bool and int values.
            new_value = rlinput("New (empty to cancel): ", str(val))
            if not new_value:
                continue

            try:
                new_value = self._parse_amendment(val, new_value)
            except ValueError as err:
                # A typo should lead to a new prompt, not to a crash.
                print("Invalid value:", err)
                continue

            self[amend] = new_value
            print(self.show_fields(fields))

    def _render_payload(self):
        """Build the ``{'files': ..., 'data': ...}`` payload from the mappers."""
        # must be rendered directly from editable fields
        payload = {'files': {}, 'data': {}}
        for fd_name, form_field in self.registry['mappers'].items():
            fd_val = self[fd_name]
            fft = self.registry['types'][form_field]
            # todo: handle input types
            for req_type, ff, val in build_payload(fd_val, form_field, fft):
                payload[req_type][ff] = val
        return payload
You can’t perform that action at this time.
