pipeline package

Submodules

pipeline.classes module

AVI Pipeline Classes

Pipeline classes that extend Luigi classes, for use when defining an AVI pipeline (avi/tasks.py). Through them, Luigi features such as processing the pipeline on a compute cluster are exposed.

@author Daniel Vagg @req: REQ-0008 @req: SOW-FUN-005 @comp: AVI Pipeline System @api: developer

class pipeline.classes.AviLocalTarget(path=None, format=None, is_tmp=False)[source]

Bases: luigi.file.LocalTarget

class pipeline.classes.AviParameter(default=<object object>, is_global=False, significant=True, description=None, config_path=None, positional=True, always_in_help=False, batch_method=None)[source]

Bases: luigi.parameter.Parameter

class pipeline.classes.AviTask(*args, **kwargs)[source]

Bases: luigi.task.Task

Extension of luigi.Task that adds request_id, which is used to track the task, and job_model, the model object associated with the corresponding pipeline job.

job_model = <pipeline.classes.AviParameter object>
static output()[source]

Used to specify the task output file.

Example:
>>> import os
>>> outputFile = AviParameter()
>>> def output(self):
...     return AviLocalTarget(os.path.join(
...         settings.OUTPUT_PATH, 'simulatedData_%s.vot' % self.outputFile
...     ))
request_id = <luigi.parameter.IntParameter object>
task_dependency(dep_task, **kwargs)[source]

Abstracts the inclusion of request_id in additional tasks. It also automatically finds attributes that were available in the associated AVI model and includes them in the dependency as kwargs.

Example:
>>> def requires(self):
...     # Attributes of the associated AVI model (e.g. parameter_one) are passed on automatically
...     return self.task_dependency(OtherTask)

Additional named parameters can also be passed from the dependent task:

>>> def requires(self):
...     return self.task_dependency(OtherTask, additional_parameter="parameter")
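The keyword-argument assembly described above can be sketched in plain Python. This is a minimal sketch, not the AVI implementation: build_dependency_kwargs is a hypothetical helper, the model's attributes are passed as a plain dict, and it assumes that only attributes the dependency declares as parameters are forwarded and that explicitly passed parameters override model values.

```python
def build_dependency_kwargs(request_id, model_attrs, dep_param_names, **extra):
    """Hypothetical helper sketching how task_dependency could assemble kwargs.

    request_id      -- ID used to track the pipeline job
    model_attrs     -- attributes of the associated AVI model (plain dict here)
    dep_param_names -- names of the parameters the dependent task declares
    extra           -- additional named parameters passed explicitly
    """
    # Forward only the model attributes the dependency actually declares.
    kwargs = {name: model_attrs[name]
              for name in dep_param_names if name in model_attrs}
    # Assumption: explicitly passed parameters override model attributes.
    kwargs.update(extra)
    # request_id is always included so the dependency can be tracked.
    kwargs["request_id"] = request_id
    return kwargs


model_attrs = {"parameter_one": "sample", "unrelated_field": 42}
print(build_dependency_kwargs(
    7, model_attrs, ["parameter_one", "additional_parameter"],
    additional_parameter="parameter",
))
# {'parameter_one': 'sample', 'additional_parameter': 'parameter', 'request_id': 7}
```

With real Luigi task classes, dep_param_names could be obtained from the class method get_param_names() that luigi.Task provides.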
task_namespace = None

pipeline.events module

AVI Pipeline Events

A collection of functions used to trigger behaviour in response to pipeline events; for example, if a task fails, the exception is recorded on the associated job instance.

@req: REQ-0033 @comp: AVI Pipeline System

pipeline.events.get_pipeline_from_task(task)[source]
pipeline.events.handle_dependency_found(task, dependency)[source]
pipeline.events.handle_dependency_present(task)[source]
pipeline.events.handle_failure(task, exception)[source]
pipeline.events.handle_success(task)[source]
pipeline.events.handle_task_started(task)[source]
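The handler signatures above match the arguments Luigi passes to its event callbacks (for example, FAILURE callbacks receive the task and the exception), so in a Luigi pipeline such handlers would typically be registered with the luigi.Task.event_handler decorator. The sketch below does not use Luigi and is not the AVI implementation: JobStateStub, the task_name argument and the state strings are hypothetical, loosely modelled on the PipeState fields task_active, state and exception.

```python
import traceback
from dataclasses import dataclass, field


@dataclass
class JobStateStub:
    """Hypothetical stand-in for a PipeState-like record."""
    state: str = "PENDING"
    task_active: str = ""
    exception: str = ""
    tasks_completed: list = field(default_factory=list)


def handle_task_started(job_state, task_name):
    # Record which task is currently running.
    job_state.state = "STARTED"
    job_state.task_active = task_name


def handle_success(job_state, task_name):
    # Mark the task as done; other tasks in the pipeline may still be pending.
    job_state.tasks_completed.append(task_name)
    job_state.task_active = ""


def handle_failure(job_state, task_name, exception):
    # Store the formatted exception so it can be displayed with the job.
    job_state.state = "FAILURE"
    job_state.task_active = task_name
    job_state.exception = "".join(
        traceback.format_exception_only(type(exception), exception)
    )


state = JobStateStub()
handle_task_started(state, "SimulateData")
try:
    raise ValueError("bad input")
except ValueError as exc:
    handle_failure(state, "SimulateData", exc)
print(state.state, state.exception.strip())  # FAILURE ValueError: bad input
```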

pipeline.models module

@comp: AVI Pipeline System @req: REQ-0001 @req: SOW-FUN-005 @api: developer

These models are used to store data required by the AVI pipeline system.

class pipeline.models.AviJob(*args, **kwargs)[source]

Bases: django.db.models.base.Model

Parameters:
  • user (CharField) – User
  • request_id (OneToOneField to AviJobRequest) – Request
  • expected_runtime (IntegerField) – Expected runtime
  • resources_ram_mb (IntegerField) – Amount of RAM (MB) to be allocated to the AviJob
  • resources_cpu_cores (IntegerField) – Number of CPU cores to be allocated to the AviJob
class Meta[source]
abstract = False
AviJob.generate_hash_output_path(*args, **kwargs)[source]

Returns a hash of the input parameters, used to generate a unique output folder.
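A stable output folder name can be derived by hashing a canonical serialisation of the input parameters. The sketch below assumes SHA-256 over key-sorted JSON; the hash function and serialisation actually used by generate_hash_output_path are not documented here.

```python
import hashlib
import json


def generate_hash_output_path(**params):
    """Sketch: derive a stable folder name from a job's input parameters.

    Assumptions (not taken from the AVI source): SHA-256 over key-sorted
    JSON; values that are not JSON-serialisable fall back to str().
    """
    # sort_keys makes the digest independent of keyword order.
    canonical = json.dumps(params, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


print(generate_hash_output_path(outputFile="run1", n=3)
      == generate_hash_output_path(n=3, outputFile="run1"))  # True
```

Sorting the keys means the same parameters always map to the same folder regardless of argument order; joining the digest onto OUTPUT_PATH would give the folder path.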

AviJob.request

Accessor to the related object on the forward side of a many-to-one or one-to-one relation.

In the example:

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

child.parent is a ForwardManyToOneDescriptor instance.

AviJob.save(*args, **kwargs)[source]

Saves the AviJob model and instantiates an associated AviJobRequest, which stores the job details and the associated pipeline state.

AviJob.time_to_completion(avi_model_name, *args, **kwargs)[source]
class pipeline.models.AviJobRequest(*args, **kwargs)[source]

Bases: django.db.models.base.Model

This class is used by the AviJob class to record the information required to process a pipeline, including tracking results, the user, and background worker identifiers.

It is implicitly required by AviJob, which all user-defined AviJob models extend.

Parameters:
  • created (DateTimeField) – Created
  • job_id (AutoField) – Job id
  • task_name (CharField) – Task name
  • gavip_task_id (CharField) – Gavip task id
  • pipeline_state_id (OneToOneField to PipeState) – Pipeline state
  • result_path (CharField) – Result path
  • output_path (CharField) – Output path
  • avi_model_name (CharField) – Avi model name
exception AviJobRequest.DoesNotExist

Bases: django.core.exceptions.ObjectDoesNotExist

exception AviJobRequest.MultipleObjectsReturned

Bases: django.core.exceptions.MultipleObjectsReturned

AviJobRequest.abandon_job(reason)[source]
AviJobRequest.delete_job_task()[source]

Removes the job task from the backend worker.

In standalone mode, the task is deleted via a Celery revoke. Otherwise, a message is sent to the portal informing it that the task is to be removed.
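The branching between the two removal paths can be sketched with injected callables. This is an illustration only: revoke and notify_portal are hypothetical placeholders, and the message payload is invented. In a Celery deployment, the revoke could be issued with app.control.revoke(task_id).

```python
def delete_job_task(task_id, standalone, revoke, notify_portal):
    """Sketch of the two removal paths described above.

    revoke and notify_portal are hypothetical callables; the message
    payload below is invented for illustration.
    """
    if standalone:
        # Standalone mode: cancel the task directly in Celery.
        revoke(task_id)
        return "revoked"
    # Otherwise the portal manages the task, so ask it to remove it.
    notify_portal({"action": "remove_task", "task_id": task_id})
    return "portal_notified"


revoked, sent = [], []
print(delete_job_task("abc", True, revoked.append, sent.append))  # revoked
```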

AviJobRequest.delete_outputs()[source]

Deletes the job's outputs: first the result_path file, then the output_path directory.

Both result_path and output_path are verified to lie within the OUTPUT_PATH specified in the AVI settings (/data/output).

Note: if the output or result is shared with another job, it is not deleted, which preserves the integrity of the other job. When the other jobs are later deleted, they find no remaining job sharing the record, and their outputs are deleted as expected.
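The deletion order and the containment check can be sketched with the standard library. This is a minimal sketch under stated assumptions: output_root plays the role of OUTPUT_PATH, and shared_paths is a hypothetical set of paths still used by other jobs, since how AVI detects sharing is not shown here.

```python
import os
import shutil
import tempfile


def is_within(base, path):
    """True if path resolves to a location strictly inside base."""
    base = os.path.realpath(base)
    path = os.path.realpath(path)
    return path != base and os.path.commonpath([base, path]) == base


def _deletable(output_root, path, shared_paths):
    # Skip empty paths, paths shared with other jobs, and anything
    # outside the configured output root.
    return bool(path) and path not in shared_paths and is_within(output_root, path)


def delete_outputs(output_root, result_path, output_path, shared_paths=frozenset()):
    """Delete the result file first, then the output directory.

    shared_paths is a hypothetical set of paths still used by other jobs.
    Returns the list of paths that were actually deleted.
    """
    deleted = []
    if _deletable(output_root, result_path, shared_paths) and os.path.isfile(result_path):
        os.remove(result_path)
        deleted.append(result_path)
    if _deletable(output_root, output_path, shared_paths) and os.path.isdir(output_path):
        shutil.rmtree(output_path)
        deleted.append(output_path)
    return deleted


# Usage: a throwaway directory plays the role of OUTPUT_PATH.
root = tempfile.mkdtemp()
job_dir = os.path.join(root, "job1")
os.makedirs(job_dir)
result = os.path.join(job_dir, "result.vot")
open(result, "w").close()
print(delete_outputs(root, result, job_dir) == [result, job_dir])  # True
```

Resolving both paths with realpath before comparing prevents a path such as OUTPUT_PATH/../etc from passing the check, and the strict comparison stops the output root itself from being removed.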

AviJobRequest.delete_shared()[source]

Sends a request to the portal to check whether deleted jobs have shared objects that need to be deleted.

AviJobRequest.demomodel_model

Accessor to the related object on the reverse side of a one-to-one relation.

In the example:

class Restaurant(Model):
    place = OneToOneField(Place, related_name='restaurant')

place.restaurant is a ReverseOneToOneDescriptor instance.

AviJobRequest.dummymodel_model

Accessor to the related object on the reverse side of a one-to-one relation.

AviJobRequest.get_absolute_url()[source]
AviJobRequest.get_job_model()[source]
AviJobRequest.get_job_parameters()[source]

Retrieves all non-AVI-framework fields from a job model.

Returns: list of django.db.models.Field
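The filtering step can be sketched over a list of field-like objects. This is an assumption-laden sketch: AVI_FRAMEWORK_FIELDS is built from the AviJob fields documented above (plus Django's automatic id), Field is a namedtuple standing in for django.db.models.Field, and the example field names are hypothetical. With a real model, the input could come from Model._meta.get_fields().

```python
from collections import namedtuple

# Stand-in for django.db.models.Field: only the name matters here.
Field = namedtuple("Field", ["name"])

# Assumption: framework fields are the AviJob fields documented above,
# plus Django's automatic primary key. Both "request" and "request_id"
# are listed because the documented field name and the accessor differ.
AVI_FRAMEWORK_FIELDS = frozenset({
    "id", "user", "request", "request_id", "expected_runtime",
    "resources_ram_mb", "resources_cpu_cores",
})


def get_job_parameters(fields):
    """Return the fields that are not part of the AVI framework."""
    return [f for f in fields if f.name not in AVI_FRAMEWORK_FIELDS]


fields = [Field("id"), Field("user"), Field("request"),
          Field("outputFile"), Field("noise_level")]
print([f.name for f in get_job_parameters(fields)])  # ['outputFile', 'noise_level']
```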
AviJobRequest.get_next_by_created(*moreargs, **morekwargs)
AviJobRequest.get_previous_by_created(*moreargs, **morekwargs)
AviJobRequest.objects = <django.db.models.manager.Manager object>
AviJobRequest.pipeline_state

Accessor to the related object on the forward side of a many-to-one or one-to-one relation.

AviJobRequest.state
AviJobRequest.submit_job(user, resources_ram_mb, resources_cpu_cores)[source]

@req: REQ-0001 @comp: AVI Pipeline System

Starts a queued pipeline job using the associated ID.

Calls a function to process the pipeline asynchronously. The job is held on the AVI's message bus until a worker is able to process it.

See run_pipeline().

Warning

The parameters for these functions are being consolidated for clarity.

AviJobRequest.submit_job_locally(user, resources_ram_mb, resources_cpu_cores)[source]
AviJobRequest.submit_job_to_gavip(user, resources_ram_mb, resources_cpu_cores, retry=True)[source]

Submits the necessary parameters of a job to GAVIP so that it can be scheduled. The job is sent on a message bus that GAVIP monitors in order to create a GavipTask.

If the job is not sent because of a “RecoverableConnectionError” (raised by AMQP), it is retried once. This works around issues in the Celery stack observed under Python 3.
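The retry-once behaviour can be sketched as a function that calls itself a single time with retry disabled, which is one plausible reading of the retry=True parameter. This is not the AVI code: send is a hypothetical callable that publishes the message, and RecoverableConnectionError is defined locally as a stand-in for the AMQP exception so the sketch runs without an AMQP library.

```python
class RecoverableConnectionError(Exception):
    """Local stand-in for the AMQP RecoverableConnectionError."""


def submit_with_single_retry(send, message, retry=True):
    """Send message; on a recoverable connection error, retry at most once."""
    try:
        return send(message)
    except RecoverableConnectionError:
        if not retry:
            # Second failure: give up and let the caller handle it.
            raise
        # One further attempt, with retries disabled.
        return submit_with_single_retry(send, message, retry=False)


attempts = []


def flaky_send(msg):
    # Hypothetical sender that fails on the first attempt only.
    attempts.append(msg)
    if len(attempts) == 1:
        raise RecoverableConnectionError()
    return "sent"


print(submit_with_single_retry(flaky_send, "job-1"), len(attempts))  # sent 2
```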

class pipeline.models.PipeState(*args, **kwargs)[source]

Bases: django.db.models.base.Model

This model is included to record the progress of a pipeline.

The tasks_required and tasks_completed fields record the names of the required and completed tasks, separated by spaces. Progress is calculated automatically from these fields.

See AviJobRequest, which records the information required for a pipeline to be processed.
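The progress calculation can be sketched over the space-separated strings. This assumes progress is the percentage of distinct required tasks that also appear in tasks_completed, rounded to two decimal places (Decimal mirrors the progress DecimalField). The helper names are hypothetical and only loosely mirror add_complete_task and get_percentage_complete.

```python
from decimal import Decimal


def add_task(field_value, taskname):
    """Append a task name to a space-separated field, avoiding duplicates."""
    names = field_value.split()
    if taskname not in names:
        names.append(taskname)
    return " ".join(names)


def percentage_complete(tasks_required, tasks_completed):
    """Sketch: progress from space-separated task-name strings.

    Assumption: progress is the share of distinct required tasks that
    appear in tasks_completed, rounded to two decimal places.
    """
    required = set(tasks_required.split())
    if not required:
        return Decimal("0.00")
    completed = set(tasks_completed.split()) & required
    pct = Decimal(100) * len(completed) / len(required)
    return pct.quantize(Decimal("0.01"))


required = add_task(add_task(add_task("", "A"), "B"), "C")
print(required, percentage_complete(required, "A B"))  # A B C 66.67
```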

Parameters:
  • id (AutoField) – Id
  • tasks_required (TextField) – Tasks required
  • tasks_completed (TextField) – Tasks completed
  • dependency_graph (TextField) – Dependency graph
  • task_active (CharField) – Task active
  • state (CharField) – State
  • progress (DecimalField) – Progress
  • exception (TextField) – Exception
  • started_time (DateTimeField) – Started time
  • last_activity_time (DateTimeField) – Last activity time
exception PipeState.DoesNotExist

Bases: django.core.exceptions.ObjectDoesNotExist

exception PipeState.MultipleObjectsReturned

Bases: django.core.exceptions.MultipleObjectsReturned

PipeState.add_complete_task(taskname)[source]
PipeState.add_required_task(taskname)[source]
PipeState.get_complete_tasks()[source]
PipeState.get_next_by_last_activity_time(*moreargs, **morekwargs)
PipeState.get_pdg_graph()[source]
PipeState.get_percentage_complete()[source]
PipeState.get_previous_by_last_activity_time(*moreargs, **morekwargs)
PipeState.get_required_tasks()[source]
PipeState.get_sigma_graph()[source]
PipeState.ingest_task_dependency_graph(task)[source]

Converts the task and its dependencies into a directed acyclic graph (DAG). This could be
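A dependency graph of this kind can be built with an iterative depth-first walk over requires(). The sketch uses a hypothetical StubTask and an adjacency dict keyed by task name; it does not reproduce the format stored in dependency_graph. Real Luigi tasks may return a single task, a list or a dict from requires(), which luigi.task.flatten() normalises to a list.

```python
class StubTask:
    """Hypothetical stand-in for a Luigi task with a requires() method."""

    def __init__(self, name, deps=()):
        self.name = name
        self._deps = list(deps)

    def requires(self):
        return self._deps


def build_dependency_graph(root):
    """Return {task_name: [dependency names]} for root and all its dependencies."""
    graph = {}
    stack = [root]
    while stack:
        task = stack.pop()
        if task.name in graph:
            continue  # shared dependency already visited
        deps = task.requires()
        graph[task.name] = [d.name for d in deps]
        stack.extend(deps)
    return graph


c = StubTask("C")
root = StubTask("Root", [StubTask("A", [c]), StubTask("B", [c])])
print(build_dependency_graph(root))
# {'Root': ['A', 'B'], 'B': ['C'], 'C': [], 'A': ['C']}
```

The visited check means a dependency shared by several tasks (C above) is expanded only once, which keeps the walk linear in the number of edges.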

PipeState.job_request

Accessor to the related object on the reverse side of a one-to-one relation.

PipeState.objects = <django.db.models.manager.Manager object>
PipeState.save(*args, **kwargs)[source]
PipeState.update_job_progress()[source]

@req: REQ-0033 @comp: AVI Pipeline System

pipeline.serializers module

class pipeline.serializers.AviJobRequestDetailSerializer(instance=None, data=<class rest_framework.fields.empty>, **kwargs)[source]

Bases: rest_framework.serializers.ModelSerializer

class Meta[source]
fields = ('created', 'job_id', 'task_name', 'gavip_task_id', 'output_path', 'result_path', 'avi_model_name', 'state', 'progress', 'exception')
model

alias of AviJobRequest

class pipeline.serializers.AviJobRequestSerializer(instance=None, data=<class rest_framework.fields.empty>, **kwargs)[source]

Bases: rest_framework.serializers.ModelSerializer

class Meta[source]
fields = ('url', 'job_id', 'task_name', 'active_task', 'started', 'duration', 'state', 'progress', 'output_path', 'result_path')
model

alias of AviJobRequest

static AviJobRequestSerializer.get_duration(obj)[source]
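A duration field of this kind can be computed from the PipeState timestamps reached through AviJobRequest.pipeline_state. This is a sketch under assumptions: that get_duration uses started_time and last_activity_time, that it returns whole seconds, and that it returns None when a timestamp is missing; SimpleNamespace stands in for the model instances.

```python
from datetime import datetime, timedelta
from types import SimpleNamespace


def get_duration(obj):
    """Sketch: seconds between the pipeline's start and its last activity.

    Assumption: duration is read from obj.pipeline_state (started_time and
    last_activity_time); None is returned if either timestamp is missing.
    """
    state = obj.pipeline_state
    if state is None or state.started_time is None or state.last_activity_time is None:
        return None
    return int((state.last_activity_time - state.started_time).total_seconds())


t0 = datetime(2020, 1, 1, 12, 0, 0)
job = SimpleNamespace(pipeline_state=SimpleNamespace(
    started_time=t0, last_activity_time=t0 + timedelta(minutes=2, seconds=5)))
print(get_duration(job))  # 125
```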
class pipeline.serializers.PipeStateSerializer(instance=None, data=<class rest_framework.fields.empty>, **kwargs)[source]

Bases: rest_framework.serializers.ModelSerializer

class Meta[source]
fields = '__all__'
model

alias of PipeState

pipeline.tasks module

class pipeline.tasks.CeleryGavipTask[source]

Bases: celery.app.task.Task

on_failure(exc, task_id, args, kwargs, einfo)[source]
pipeline.tasks.create_pipeline_task(job, task_name)[source]
pipeline.tasks.on_failure(exc, task_id, args, kwargs, einfo)[source]

Module contents