Skip to content
Navigation Menu
{{ message }}
forked from pythonql/pythonql
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathExecutor.py
More file actions
616 lines (510 loc) · 17.8 KB
/
Copy pathExecutor.py
File metadata and controls
616 lines (510 loc) · 17.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
from pythonql.algebra.operator import plan_from_list
from pythonql.algebra.operators import *
from pythonql.PQTuple import PQTuple
from pythonql.helpers import flatten
from pythonql.Rewriter import rewrite
from pythonql.debug import Debug
import json
import types
def make_pql_tuple(vals,lcs):
t = []
als = []
for v in vals:
t.append(eval(v[0],lcs,globals()))
alias = v[1] if v[1] else v[0]
als.append(alias)
schema = {n:i for (i,n) in enumerate(als)}
return PQTuple(t,schema)
def str_dec(string):
res = ""
prev_slash = False
for ch in string:
if ch == chr(92):
if not prev_slash:
prev_slash = True
else:
res += ch
prev_slash = False
else:
prev_slash = False
res += ch
return res
# isList predicate for path expressions
def isList(x):
return (hasattr(x,'__iter__') and not
hasattr(x,'keys') and not
isinstance(x,str))
# isMap predicate for path expression
def isMap(x):
return hasattr(x,'keys')
# Implement a child step on some collection or map
def PQChildPath (coll,f,lcs):
f = eval(str_dec(f), globals(), lcs) if f!='_' else None
if isList(coll):
for i in flatten(coll):
if isMap(i):
for j in i.keys():
if f is None:
yield i[j]
elif f and j==f:
yield i[j]
if isMap(coll):
for i in coll.keys():
if f is None:
yield coll[i]
elif f and i==f:
yield coll[i]
class map_tuple:
def __init__(self,key,value):
self.key = key
self.value = value
def __repr__(self):
return ("<" + repr(self.key) + ":" + repr(self.value) + ">")
# Implement a descendents path on some collection or map
def PQDescPath(coll,f,lcs):
f = eval(f,globals(),lcs) if f!='_' else None
stack = []
if isList(coll):
stack = [i for i in flatten(coll)]
elif isMap(coll):
stack = [map_tuple(k,v) for (k,v) in coll.items()]
while stack:
i = stack.pop()
if isinstance(i,map_tuple):
if f is None:
yield i.value
elif f and i.key==f:
yield i.value
i = i.value
if isList(i):
it = iter(i)
frst = next(it)
[stack.append(j) for j in it]
if isList(frst):
stack.extend([ci for ci in frst])
elif isMap(frst):
stack.extend([map_tuple(k,v) for (k,v) in frst.items()])
elif isMap(i):
keys = list(i.keys())
[stack.append(map_tuple(j,i[j])) for j in keys]
def PQTry( try_expr, except_expr, lcs, globs):
try_expr = str_dec(try_expr)
except_expr = str_dec(except_expr)
try:
return eval(try_expr,lcs,globs)
except:
return eval(except_expr,lcs,globs)
# create a table with an empty tuple
def emptyTuple(schema):
return PQTuple([None] * len(schema), schema)
# Execute the query
def PyQuery( clauses, prior_locs, prior_globs, returnType ):
data = []
data.append( emptyTuple([]) )
clauses = list(clauses)
clauses.reverse()
plan = plan_from_list(clauses)
plan = rewrite(plan, prior_locs)
if Debug().print_optimized:
print("Rewritten query:",plan)
data = plan.execute(data, prior_locs, prior_globs)
if returnType == "gen":
return data
elif returnType == "list":
return list(data)
elif returnType == "set":
return set(data)
else:
return dict(data)
# Process Select clause
# We still keep that feature of generating tuples for now
def processSelectClause(c, table, prior_lcs, prior_globs):
# If this is a list/set comprehension:
if c.expr:
# Compile the expression:
e = compile(c.expr.lstrip(), '<string>','eval')
for t in table:
lcs = dict(prior_lcs)
lcs.update(t.getDict())
yield eval(e,prior_globs,lcs)
else:
k_expr = compile(c.key_expr.lstrip(),'<string>','eval')
v_expr = compile(c.value_expr.lstrip(),'<string>','eval')
for t in table:
lcs = prior_lcs
lcs.update(t.getDict())
k = eval(k_expr,prior_globs,lcs)
v = eval(v_expr,prior_globs,lcs)
yield (k,v)
# Process the for clause. This clause creates a cartesian
# product of the input table with new sequence
def processForClause(c, table, prior_lcs, prior_globs):
new_schema = None
comp_expr = compile(c.expr.lstrip(), "<string>", "eval")
for t in table:
if not new_schema:
new_schema = dict(t.schema)
for (i,v) in enumerate(c.vars):
new_schema[v] = len(t.schema) + i
lcs = dict(prior_lcs)
lcs.update(t.getDict())
vals = eval(comp_expr, prior_globs, lcs)
if len(c.vars) == 1:
for v in vals:
new_t_data = list(t.tuple)
new_t_data.append(v)
new_t = PQTuple(new_t_data, new_schema)
yield new_t
else:
for v in vals:
unpack_expr = "[ %s for %s in [ __v ]]" % (
'(' + ",".join(c.vars) + ')', c.unpack)
unpacked_vals = eval(unpack_expr, prior_globs, {'__v':v})
new_t_data = list(t.tuple)
for tv in unpacked_vals[0]:
new_t_data.append(tv)
new_t = PQTuple(new_t_data, new_schema)
yield new_t
# Process the let clause. Here we just add a variable to each
# input tuple
def processLetClause(c, table, prior_lcs, prior_globs):
comp_expr = compile(c.expr.lstrip(), "<string>", "eval")
new_schema = None
for t in table:
if not new_schema:
new_schema = dict(t.schema)
for (i,v) in enumerate(c.vars):
new_schema[v] = len(t.schema) + i
lcs = dict(prior_lcs)
lcs.update(t.getDict())
v = eval(comp_expr, prior_globs, lcs)
if len(c.vars) == 1:
t.tuple.append(v)
new_t = PQTuple( t.tuple, new_schema )
yield new_t
else:
unpack_expr = "[ %s for %s in [ __v ]]" % (
'(' + ",".join(c.vars) + ')', c.unpack)
unpacked_vals = eval(unpack_expr, prior_globs, {'__v':v})
new_t_data = list(t.tuple)
for tv in unpacked_vals[0]:
new_t_data.append(tv)
new_t = PQTuple(new_t_data, new_schema)
yield new_t
# Process a join
def processJoin(c, table, prior_lcs, prior_globs, left_arg, right_arg):
new_schema = None
left_conds = c.left_conds
right_conds = c.right_conds
join_type = 'nl'
dir = 'right'
if c.hint:
join_type = c.hint['join_type']
dir = c.hint['dir']
if dir == 'left':
left_arg,right_arg = right_arg,left_arg
r_init_data = []
r_init_data.append( emptyTuple([]) )
# Build an index on the right relation, if we're doing
# an index join.
index = None
if join_type == 'index':
index = {}
r_data = r_init_data
r_data = right_arg.execute(r_data, prior_lcs)
for t in r_data:
index_tuple = []
for rcond in right_conds:
lcs = dict(prior_lcs)
lcs.update(t.getDict())
rcond_val = eval(rcond, prior_globs, lcs)
index_tuple.append( rcond_val )
index_tuple = tuple(index_tuple)
if not index_tuple in index:
index[ index_tuple ] = []
index[ index_tuple ].append( t )
# Iterate over the tuples of the left relation and
# compute the tuple of condition vars
table = left_arg.execute(table, prior_lcs, prior_globs)
for t in table:
cond_tuple = []
for lcond in left_conds:
lcs = dict(prior_lcs)
lcs.update(t.getDict())
lcond_val = eval(lcond, prior_globs, lcs)
cond_tuple.append( lcond_val )
cond_tuple = tuple(cond_tuple)
if index:
if cond_tuple in index:
for t2 in index[cond_tuple]:
if not new_schema:
new_schema = dict(t.schema)
for i,_ in enumerate(t2):
v = [x for x in t2.schema.items() if x[1]==i][0][0]
new_schema[v] = len(new_schema) + i
new_t_data = list(t.tuple)
new_t_data += list(t2.tuple)
new_t = PQTuple(new_t_data, new_schema)
yield new_t
else:
continue
else:
r_data = r_init_data
r_data = right_arg.execute(r_data, prior_lcs, prior_globs)
for t2 in r_data:
rcond_tuple = []
for rcond in right_conds:
lcs = dict(prior_lcs)
lcs.update(t2.getDict())
rcond_val = eval(rcond, prior_globs, lcs)
rcond_tuple.append( rcond_val )
rcond_tuple = tuple(rcond_tuple)
if cond_tuple == rcond_tuple:
if not new_schema:
new_schema = dict(t.schema)
for i,_ in enumerate(t2):
v = [x for x in t2.schema.items() if x[1]==i][0][0]
new_schema[v] = len(new_schema) + i
new_t_data = list(t.tuple)
new_t_data += list(t2.tuple)
new_t = PQTuple(new_t_data, new_schema)
yield new_t
# Process the match claise
def processMatchClause(c, table, prior_lcs, prior_globs):
clause_expr = compile(c.expr, "<string>", "eval")
# Fetch and compile all expressions in the
# pattern match clause
e_patterns = []
patterns = list(c.pattern)
while patterns:
p = patterns.pop()
if 'expr_cond' in p:
e_patterns.append(p)
if 'pattern' in p:
patterns.append(p['pattern'])
for ep in e_patterns:
ep['expr_cond'] = compile(ep["expr_cond"], "<string>", "eval")
new_schema = None
for t in table:
if not new_schema:
new_schema = dict(t.schema)
for (i,v) in enumerate(c.vars):
new_schema[v] = len(t.schema) + i
lcs = dict(prior_lcs)
lcs.update(t.getDict())
vals = eval(clause_expr, prior_globs, lcs)
for v in vals:
if not hasattr(v, '__contains__'):
continue
new_t_data = list(t.tuple) + [None]*len(c.vars)
new_t = PQTuple(new_t_data, new_schema)
if match_pattern(c.pattern, c.exact, v, new_t, lcs, prior_globs):
yield new_t
def match_pattern(ps, isExact, v, new_t, lcs, prior_globs):
all_heads = []
for p in [x for x in ps if 'match' in x]:
match = p['match'][1:-1]
all_heads.append(match)
if match not in v:
return False
if 'const_cond' in p:
if v[match] != p['const_cond'][1:-1]:
return False
if 'bind_to' in p:
new_t[p['bind_to']] = v[match]
lcs.update({p['bind_to']:v[match]})
if 'expr_cond' in p:
val = eval(p['expr_cond'], prior_globs, lcs)
if not val:
return False
if 'pattern' in p:
if not match_pattern(p['pattern'], isExact, v[match], new_t, lcs, prior_globs):
return False
if isExact and any([x for x in v if x not in all_heads]):
return False
bind_parent = next((x for x in ps if 'bind_parent_to' in x), None)
if bind_parent:
new_t[bind_parent['bind_parent_to']] = v
lcs.update({bind_parent['bind_parent_to']:v})
return True
# Process the count clause. Similar to let, but simpler
def processCountClause(c, table, prior_lcs, prior_globs):
new_schema = None
for (i,t) in enumerate(table):
if not new_schema:
new_schema = dict(t.schema)
new_schema[c.var] = len(t.schema)
new_t = PQTuple( t.tuple + [i], new_schema )
yield new_t
# Process the group-by
def processGroupByClause(c, table, prior_lcs, prior_globs):
gby_aliases = [g if isinstance(g,str) else g[1]
for g in c.groupby_list]
gby_exprs = [g if isinstance(g,str) else g[0]
for g in c.groupby_list]
comp_exprs = [compile(e,'<string>','eval') for e in gby_exprs]
grp_table = {}
schema = None
# Group tuples in a hashtable
for t in table:
if not schema:
schema = t.schema
lcs = dict(prior_lcs)
lcs.update(t.getDict())
# Compute the key
k = tuple( [eval(e,prior_globs,lcs) for e in comp_exprs] )
if not k in grp_table:
grp_table[k] = []
grp_table[k].append(t)
if not grp_table:
return
yield
# Construct the new table
# Non-key variables
non_key_vars = [v for v in schema if not v in gby_aliases ]
new_schema = {v:i for (i,v) in enumerate( gby_aliases + non_key_vars )}
for k in grp_table:
t = PQTuple([None]*len(new_schema), new_schema)
#Copy over the key
for (i,v) in enumerate(gby_aliases):
t[v] = k[i]
#Every other variable (not in group by list) is turned into a lists
#First create empty lists
for v in non_key_vars:
t[v] = []
# Now fill in the lists:
for part_t in grp_table[k]:
for v in non_key_vars:
t[v].append( part_t[v] )
yield t
# Process where clause
def processWhereClause(c, table, prior_lcs, prior_globs):
comp_expr = compile(c.expr.lstrip(),"<string>","eval")
for t in table:
lcs = dict(prior_lcs)
lcs.update(t.getDict())
val = eval(comp_expr, prior_globs, lcs)
if val:
yield t
# Process the orderby clause
def processOrderByClause(c, table, prior_lcs, prior_globs):
# Here we do n sorts, n is the number of sort specifications
# For each sort we first need to compute a sort value (could
# be some expression)
sort_exprs = [ compile(os[0].lstrip(),"<string>","eval") for os in c.orderby_list]
sort_rev = [ o[1]=='desc' for o in c.orderby_list]
def computeSortSpec(tup,sort_spec):
lcs = dict(prior_lcs)
lcs.update(tup.getDict())
return eval(sort_spec, prior_globs, lcs)
sort_exprs.reverse()
sort_rev.reverse()
if isinstance(table,types.GeneratorType):
table = list(table)
for (i,e) in enumerate(sort_exprs):
table.sort( key = lambda x: computeSortSpec(x,e),
reverse= sort_rev[i])
for t in table:
yield t
# Create the set of variables for a new window
# This is the full set just for convienience, the
# query might not use all of these vars.
# The names of the variables coincide with the
# names in the specification of window clause
def make_window_vars():
return {"s_curr":None, "s_at":None, "s_prev":None, "s_next":None,
"e_curr":None, "e_at":None, "e_prev":None, "e_next":None}
# Start variables from a list of variables
all_start_vars = ["s_curr","s_at","s_prev","s_next"]
# Fill in the start vars of the window, given the value list and current index
def fill_in_start_vars(vars, binding_seq, i ):
vars["s_curr"] = binding_seq[i]
vars["s_at"] = i
vars["s_prev"] = binding_seq[i-1] if i>0 else None
vars["s_next"] = binding_seq[i+1] if i+1<len(binding_seq) else None
# Fill in the end vars of the window, given the values list and current index
def fill_in_end_vars(vars, binding_seq, i ):
vars["e_curr"] = binding_seq[i]
vars["e_at"] = i
vars["e_prev"] = binding_seq[i-1] if i>0 else None
vars["e_next"] = binding_seq[i+1] if i+1<len(binding_seq) else None
# Check the start condition of the window, i.e. whether we should
# start a new window at this location (without considering tumbling
# windows, that check is done elsewhere).
def check_start_condition(all_vars,clause,locals,prior_globs,var_mapping):
# we just need to evaluate the when expression
# but we need to set up the vars correctly, respecting the visibility
# conditions
start_vars = set(all_start_vars).intersection(
set(var_mapping.keys()) )
start_bindings = { var_mapping[v] : all_vars[v] for v in start_vars }
# add the binding to the locals
locals.update( start_bindings )
#evaluate the when condition
return eval( clause.s_when, prior_globs, locals )
# Check the end condition of the window.
def check_end_condition(vars,clause,locals,prior_globs,var_mapping):
# If there is no 'when' clause, return False
if not clause.e_when:
return False
end_vars = set(vars.keys()).intersection( set(var_mapping.keys()))
end_binding = { var_mapping[v] : vars[v] for v in end_vars }
locals.update( end_binding )
res = eval( clause.e_when, prior_globs, locals)
return res
# Process window clause
def processWindowClause(c, table, prior_lcs, prior_globs):
schema = None
new_schema = None
# Create window variable name mapping
var_mapping = {}
for v in c.vars:
var_mapping[v] = c.vars[v]
for t in table:
if not schema:
schema = t.schema
# Create a new schema with window variables added
new_schema = dict(t.schema)
for v in c.vars:
new_schema[c.vars[v]] = len(new_schema)
lcs = dict(prior_lcs)
lcs.update(t.getDict())
# Evaluate the binding sequence
binding_seq = list(eval(c.binding_seq, prior_globs, lcs))
# Create initial window variables
# Initialize the windows
open_windows = []
closed_windows = []
# Iterate over the binding sequence
for (i,v) in enumerate(binding_seq):
# Try to open a new window
# in case of tumbling windows, only open a
# window if there are no open windows
if not c.tumbling or (c.tumbling and not open_windows):
vars = make_window_vars()
fill_in_start_vars(vars,binding_seq,i)
if check_start_condition(vars,c,dict(lcs),prior_globs,var_mapping):
open_windows.append( {"window":[], "vars":vars} )
new_open_windows = []
#update all open windows, close those that are finished
for w in open_windows:
# Add currnt value to the window
w["window"].append(v)
fill_in_end_vars(w["vars"],binding_seq,i)
if check_end_condition(w["vars"],c,dict(lcs),prior_globs,var_mapping):
closed_windows.append(w)
else:
new_open_windows.append(w)
open_windows = new_open_windows
#close or remove all remaining open windows
#if only is specified, we ignore non-closed windows
if not c.only:
closed_windows.extend(open_windows)
# create a new tuple by extending the tuple from previous clauses
# with the window variables, for each closed window
for w in closed_windows:
new_t = PQTuple( t.tuple + [None]*(len(new_schema)-len(schema)), new_schema)
new_t[ var_mapping["var"] ] = w["window"]
for v in [v for v in w["vars"].keys() if v in var_mapping]:
new_t[ var_mapping[v] ] = w["vars"][v]
yield new_t
You can’t perform that action at this time.
