icp/src/runner/tasks.py

288 lines
8.6 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from runner.celery import app
import subprocess
import os
import sys
import shutil
import json
import importlib.machinery
# To conserve resources consider using the ignore result decorator, when the result is not used by someone else
#@app.task(ignore_result=True)
###############################################################################
# Settings
###############################################################################
WORK_DIR = os.path.expanduser(os.environ["WORKER_DIR"])
OER_FORMATS_PATH = os.path.expanduser(os.environ["WORKER_OER_DIR"] + "/exercise-formats/")
###############################################################################
def load_handler(path):
handler = {'name': None, 'loader': None, 'mod': None}
for root, dirs, files in os.walk(path):
#print(root, file=sys.stderr)
#print(dirs, file=sys.stderr)
#print(files, file=sys.stderr)
if 'controller.py' in files:
print("INFO: controller.py present for '%s'!" % path, file=sys.stderr)
try:
#name = root.replace(path, '')
name = path
loader = importlib.machinery.SourceFileLoader(name, root + '/controller.py')
mod = loader.load_module()
handler = {'name': name, 'loader': loader, 'mod': mod}
except:
print("WARNING: Loading controller failed for '%s'!" % path, file=sys.stderr)
else:
print("INFO: No controller found for '%s'!" % path, file=sys.stderr)
return handler
class Job(object):
"""
The job class for tasks implements some default behaivior for every
format and tries to call special event handlers when they are defined
for the formats or exercises.
The lifecycle of a job in its current form is as follows:
1) class initialisation
2) import_handlers try to find handler by looking for controller.py in format and exercise
3) unpack
4) build: compile, default by calling make
5) execute: run the code
6) grade: run provided test scripts
7) response: provide output, msg and grade
"""
def __init__(self, job, celery_task=None):
print("Job.__init__()", file=sys.stderr)
self.data = job
self.work_path = WORK_DIR + '/' + str(job['_id']) + '/'
self.handler_format = {'name': None, 'loader': None, 'mod': None}
self.handler_exercise = {'name': None, 'loader': None, 'mod': None}
self.build_result = None
self.execute_result = None
self.grade_result = 'FAIL'
self.response_data = {}
# allow controllers to manipulate celery task
# e.g. to set meta data after submitting slurm jobs
self.celery_task = celery_task
def import_handlers(self):
print("Job.init_handlers()", file=sys.stderr)
# find exercise specific and default handler for exercise format
self.handler_format = load_handler("%s/%s" % (OER_FORMATS_PATH, self.data['type']))
self.handler_exercise = load_handler( self.data['source_path'])
if self.handler_format != None:
print("INFO: Format handler found ;", file=sys.stderr)
if self.handler_exercise != None:
print("INFO: Exercise handlers found ;", file=sys.stderr)
def extract(self):
print("Job.extract()", file=sys.stderr)
# creates workdir and files
print(self.data['source_path'], file=sys.stderr)
print(self.work_path, file=sys.stderr)
#os.makedirs(self.work_path)
src = self.data['source_path']
dst = self.work_path
shutil.copytree(src, dst)
self.sourcefile = self.work_path + 'program.c'
self.programfile = self.work_path + 'program'
# overwrite default with user submission
source = open(self.sourcefile, 'w')
source.write(self.data['solution'])
source.close()
pass
def build(self):
print("Job.build()", file=sys.stderr)
# Try to call type specific handler or fallback to default action
if 'build' in dir(self.handler_exercise['mod']):
self.build_result = self.handler_exercise['mod'].build(self)
elif 'build' in dir(self.handler_format['mod']):
self.build_result = self.handler_format['mod'].build(self)
else:
print("INFO: build() handler not found!", file=sys.stderr)
self.build_result = False
def execute(self):
print("Job.execute()", file=sys.stderr)
# Try to call type specific handler or fallback to default action
if 'execute' in dir(self.handler_exercise['mod']):
self.execute_result = self.handler_exercise['mod'].execute(self)
elif 'execute' in dir(self.handler_format['mod']):
self.execute_result = self.handler_format['mod'].execute(self)
else:
print("INFO: execute() handler not found!", file=sys.stderr)
self.execute_result = False
def grade(self):
print("Job.grade()", file=sys.stderr)
# Try to call type specific handler or fallback to default action
if 'grade' in dir(self.handler_exercise['mod']):
print("Job.grade() exercise", file=sys.stderr)
self.grade_result = self.handler_exercise['mod'].grade(self)
elif 'grade' in dir(self.handler_format['mod']):
print("Job.grade() format", file=sys.stderr)
self.grade_result = self.handler_format['mod'].grade(self)
else:
print("INFO: grade() handler not found!", file=sys.stderr)
#self.grade_result = 'PASS' # default to PASS so the course does not break?
self.grade_result = 'FAIL' # default to FAIL so student can not proceed in case there was a minor error with the course
def response(self):
print("Job.response()", file=sys.stderr)
result = None
# Try to call type specific handler or fallback to default action
if 'response' in dir(self.handler_exercise['mod']):
print("Job.response() exercise", file=sys.stderr)
print(self.handler_exercise, file=sys.stderr)
return self.handler_exercise['mod'].response(self)
elif 'response' in dir(self.handler_format['mod']):
print("Job.response() format", file=sys.stderr)
print(self.handler_format, file=sys.stderr)
return self.handler_format['mod'].response(self)
else:
print("INFO: response() handler not found!", file=sys.stderr)
output = ""
grade = self.grade_result
msg = ""
data = self.response_data
return {'output': output, 'grade': grade, 'msg': msg, 'data': data}
def clean(self):
print("Job.cleanup()", file=sys.stderr)
shutil.rmtree(self.work_path)
pass
def store_result(self):
# into file
self.jobfile = self.work_path + 'job.json'
# overwrite default with user submission
fp = open(self.jobfile, 'w')
json.dump({"job": self.data, "response": self.response_data}, fp)
fp.close()
# into mongodb
def get_file_content(self, filename):
try:
with open(self.work_path + '/' + filename, 'r') as f:
content = f.read().strip()
return content
except:
print('WARNING: Failed to read "%s", serving empty string.' % (filename), file=sys.stderr)
return ""
@app.task(time_limit=20)
def timelimited_task():
try:
return do_work()
except SoftTimeLimitExceeded:
cleanup_in_a_hurry()
@app.task
def compile():
return True
@app.task(bind=True)
def compile_and_run(self, data):
print(data, file=sys.stderr)
print(self.request.id, file=sys.stderr)
print(data['type'], file=sys.stderr)
print(data['action'], file=sys.stderr)
print(data['source_path'], file=sys.stderr)
data['_id'] = self.request.id
# create job object and call the different handlers
j = Job(data, celery_task=self)
j.import_handlers()
if data['action'] not in ["quiz"]:
j.extract()
j.build()
if data['action'] in ["test", "execute", "run"]:
j.execute()
if data['action'] in ["grade", "quiz"]:
j.grade()
print("Result prepare")
ret = j.response()
j.store_result()
print("Result sent!")
return ret
# self.update_state(state='SUBMITTED TO SLURM', meta={})
# Convienence task definitions for programming assignments
@app.task
def compile_run_and_grade():
# start the celeryropriete subtasks
return True