First attempt to implement forkenter lambda server · open-lambda/open-lambda@d9f5775 · GitHub
Skip to content

Commit d9f5775

Browse files
committed
First attempt to implement forkenter lambda server
1 parent 7b77d44 commit d9f5775

13 files changed

Lines changed: 246 additions & 154 deletions

File tree

Makefile

Lines changed: 3 additions & 2 deletions

lambda/Dockerfile-fork

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
FROM ubuntu:trusty
2+
3+
RUN apt-get -y update
4+
RUN apt-get -y install wget apt-transport-https
5+
RUN apt-get -y install python python-pip build-essential
6+
RUN pip install --upgrade pip
7+
RUN pip install virtualenv
8+
RUN pip install rethinkdb
9+
RUN pip install tornado
10+
11+
# for getting namespace pid and forkenter
12+
COPY init /
13+
14+
CMD ["./init"]

lambda/Makefile

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
.PHONY: all, clean
2+
3+
all: nsmodule.c setup.py
4+
python setup.py build_ext --inplace
5+
6+
clean:
7+
rm -rf build/ ns.so

lambda/server.py

Lines changed: 55 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,14 @@
11
#!/usr/bin/python
2+
import ns
23
import traceback, json, socket, struct, os, sys, socket, threading
34
import rethinkdb
4-
import flask
55
import tornado.ioloop
66
import tornado.web
77
import tornado.httpserver
88
import tornado.netutil
9-
10-
sys.path.append('/handler')
11-
import lambda_func # assume submitted .py file is /handler/lambda_func
9+
import time
1210

1311
flask_app = flask.Flask(__name__)
14-
1512
PROCESSES_DEFAULT = 10
1613
PORT = 8080
1714
initialized = False
@@ -24,7 +21,6 @@ def init():
2421
if initialized:
2522
return
2623
sys.stdout = sys.stderr # flask supresses stdout :(
27-
config = json.loads(os.environ['ol.config'])
2824
if config.get('db', None) == 'rethinkdb':
2925
host = config.get('rethinkdb.host', 'localhost')
3026
port = config.get('rethinkdb.port', 28015)
@@ -38,14 +34,14 @@ def init():
3834
def flask_post(path):
3935
try:
4036
init()
41-
flask.request.get_data()
42-
data = flask.request.data
43-
try :
37+
data = flask.request.get_data()
38+
try:
4439
event = json.loads(data)
45-
except:
46-
return ('bad POST data: "%s"'%str(data), 400)
40+
except Exception:
41+
return ('bad POST data: "%s"' % str(data), 400)
4742
return json.dumps(lambda_func.handler(db_conn, event))
48-
except Exception:
43+
except Exception as e:
44+
print(e)
4945
return (traceback.format_exc(), 500) # internal error
5046

5147
class SockFileHandler(tornado.web.RequestHandler):
@@ -68,19 +64,64 @@ def post(self):
6864
(r".*", SockFileHandler),
6965
])
7066

71-
def main():
72-
config = json.loads(os.environ['ol.config'])
67+
def start_container(conf):
68+
sys.path.append('/handler')
69+
global lambda_func, config
70+
71+
import lambda_func # assume submitted .py file is /handler/lambda_func
72+
config = conf
7373

7474
if 'sock_file' in config:
75+
#f.write("listening socket\n")
7576
# listen on sock file with Tornado
7677
server = tornado.httpserver.HTTPServer(tornado_app)
7778
socket = tornado.netutil.bind_unix_socket('/host/' + config['sock_file'])
7879
server.add_socket(socket)
7980
tornado.ioloop.IOLoop.instance().start()
8081
else:
82+
#f.write("listening flask\n")
8183
# listen on port with Flask
8284
procs = config.get('processes', PROCESSES_DEFAULT)
8385
flask_app.run(processes=procs, host='0.0.0.0', port=PORT)
8486

87+
def listen(path):
88+
args = ""
89+
with open(path) as fifo:
90+
while True:
91+
data = fifo.read()
92+
if len(data) == 0:
93+
break
94+
args += data
95+
return args
96+
97+
def main():
98+
sys.stdout = sys.stderr
99+
if len(sys.argv) < 2:
100+
print("Usage: %s <fifo>" % sys.argv[0])
101+
sys.exit(1)
102+
103+
fifo = os.path.abspath(sys.argv[1])
104+
105+
#f = open('/tmp/log', 'w')
106+
while True:
107+
print("host listening")
108+
#f.write("host listening\n")
109+
pid, conf = listen(fifo).split(None, 1)
110+
conf = json.loads(conf)
111+
print("pid: %s\nconf: %s\n" % (pid, conf))
112+
#f.write("pid: %s\nconf: %s\n" % (pid, conf))
113+
114+
r = ns.forkenter(pid)
115+
# child escape
116+
if r == 0:
117+
print("forkentered")
118+
#f.write("forkentered\n")
119+
break
120+
121+
try:
122+
start_container(conf)
123+
except Exception as e:
124+
print(e)
125+
85126
if __name__ == '__main__':
86127
main()

lambda/setup.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
from distutils.core import setup, Extension
2+
3+
setup(
4+
ext_modules=[Extension("ns", ["nsmodule.c"])]
5+
)

worker/namespace/parent.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,15 @@
11
import os, sys, ns, json
2-
from subprocess import check_output
32

43
sys.path.append('/handler') # assume submitted .py file is /handler/lambda_func
54

65
def handler(args, path):
7-
import lambda_func
86
try:
97
ret = lambda_func.handler(None, json.loads(args))
108
except Exception as e:
119
ret = json.dumps({"error": "handler execution failed: %s" % str(e)})
1210

1311
with open(path, 'wb') as fifo:
14-
fifo.write(ret+'\n')
12+
fifo.write(ret)
1513

1614
def listen(path):
1715
args = ""
@@ -21,25 +19,30 @@ def listen(path):
2119
if len(data) == 0:
2220
break
2321
args += data
24-
2522
return args
2623

27-
def main(pid, inpath, outpath):
28-
cwd = os.getcwd()
24+
def main(hfifo, cfifo):
25+
# change to absolute path in case cwd changed during forkenter
26+
hfifo = os.path.abspath(hfifo)
27+
28+
# listen to forkenter request from worker
2929
while True:
30-
args = listen(inpath)
30+
pid = listen(hfifo)
3131
r = ns.forkenter(pid)
3232

3333
# child escapes
3434
if r == 0:
3535
break
36-
os.chdir(cwd)
3736

38-
handler(args, outpath)
37+
import lambda_func
38+
# listen to requests to run handler
39+
while True:
40+
args = listen(cfifo)
41+
handler(args, cfifo)
3942

4043
if __name__ == '__main__':
41-
if len(sys.argv) < 3:
42-
print('Usage: parent.py <ns_pid> <input_fifo> <output_fifo>')
44+
if len(sys.argv) < 2:
45+
print('Usage: parent.py <host_fifo> <container_fifo>')
4346
sys.exit(1)
4447
else:
45-
main(sys.argv[1], sys.argv[2], sys.argv[3])
48+
main(sys.argv[1], sys.argv[2])

worker/sandbox-manager/dockerManager.go

Lines changed: 8 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -31,40 +31,14 @@ func NewDockerManager(opts *config.Config) (manager *DockerManager, err error) {
3131
}
3232

3333
func (dm *DockerManager) Create(name string, sandbox_dir string) (sb.Sandbox, error) {
34-
internalAppPort := map[docker.Port]struct{}{"8080/tcp": {}}
35-
portBindings := map[docker.Port][]docker.PortBinding{
36-
"8080/tcp": {{HostIP: "0.0.0.0", HostPort: "0"}}}
37-
38-
volumes := []string{fmt.Sprintf("%s:%s", sandbox_dir, "/host/")}
39-
40-
container, err := dm.client().CreateContainer(
41-
docker.CreateContainerOptions{
42-
Config: &docker.Config{
43-
Image: name,
44-
AttachStdout: true,
45-
AttachStderr: true,
46-
ExposedPorts: internalAppPort,
47-
Labels: dm.docker_labels(),
48-
Env: dm.env,
49-
},
50-
HostConfig: &docker.HostConfig{
51-
PortBindings: portBindings,
52-
PublishAllPorts: true,
53-
Binds: volumes,
54-
},
55-
},
56-
)
57-
58-
if err != nil {
59-
return nil, err
60-
}
61-
62-
nspid, err := dm.getNsPid(container)
63-
if err != nil {
64-
return nil, err
65-
}
66-
67-
sandbox := sb.NewDockerSandbox(name, sandbox_dir, nspid, container, dm.client())
34+
handler := filepath.Join(rm.handler_dir, name)
35+
volumes := []string{
36+
fmt.Sprintf("%s:%s", sandbox_dir, "/host/")}
37+
38+
sandbox, err := rm.create(name, sandbox_dir, name, volumes)
39+
if err != nil {
40+
return nil, err
41+
}
6842

6943
return sandbox, nil
7044
}

worker/sandbox-manager/dockerManagerBase.go

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,48 @@ type DockerManagerBase struct {
2828
env []string
2929
}
3030

31-
func (manager *DockerManagerBase) init(opts *config.Config) {
31+
func (dm *DockerManagerBase) init(opts *config.Config) {
3232
// NOTE: This requires a running docker daemon on the host
3333
if c, err := docker.NewClientFromEnv(); err != nil {
3434
log.Fatal("failed to get docker client: ", err)
3535
} else {
36-
manager.dClient = c
36+
dm.dClient = c
3737
}
38-
manager.env = []string{fmt.Sprintf("ol.config=%s", opts.SandboxConfJson())}
38+
dm.env = []string{fmt.Sprintf("ol.config=%s", opts.SandboxConfJson())}
3939

40-
manager.opts = opts
40+
dm.opts = opts
41+
}
42+
43+
func (dm *DockerManagerBase) create(name string, sandbox_dir string, image string, volumes []string) (sb.Sandbox, error) {
44+
internalAppPort := map[docker.Port]struct{}{"8080/tcp": {}}
45+
portBindings := map[docker.Port][]docker.PortBinding{
46+
"8080/tcp": {{HostIP: "0.0.0.0", HostPort: "0"}}}
47+
48+
container, err := dm.client().CreateContainer(
49+
docker.CreateContainerOptions{
50+
Config: &docker.Config{
51+
Image: image,
52+
AttachStdout: true, //TODO: why do we need these?
53+
AttachStderr: true,
54+
ExposedPorts: internalAppPort,
55+
Labels: dm.docker_labels(),
56+
Env: dm.env,
57+
},
58+
HostConfig: &docker.HostConfig{
59+
PortBindings: portBindings,
60+
PublishAllPorts: true,
61+
Binds: volumes,
62+
},
63+
},
64+
)
65+
66+
if err != nil {
67+
return nil, err
68+
}
69+
70+
sandbox := dm.NewDockerSandbox(name, sandbox_dir, container, rm.client(), rm.opts)
71+
72+
return sandbox, nil
4173
}
4274

4375
func (dm *DockerManagerBase) docker_labels() map[string]string {
@@ -47,10 +79,6 @@ func (dm *DockerManagerBase) docker_labels() map[string]string {
4779
return labels
4880
}
4981

50-
func (dm *DockerManagerBase) getNsPid(container *docker.Container) (int, error) {
51-
return 0, nil
52-
}
53-
5482
func (dm *DockerManagerBase) client() *docker.Client {
5583
return dm.dClient
5684
}

worker/sandbox-manager/localManager.go

Lines changed: 6 additions & 33 deletions

0 commit comments

Comments
 (0)