autosubmit.job#

Main module for Autosubmit. Only contains an interface class to all functionality implemented in Autosubmit.

class autosubmit.job.job.Job(name, job_id, status, priority)#

Class to handle all the tasks related to jobs on the HPC.

A job is created by default with a name, a job ID, a status and a type. It can have children and parents. The inheritance reflects the dependency between jobs: if Job2 must wait until Job1 is completed, then Job2 is a child of Job1. Conversely, Job1 is a parent of Job2.

add_children(children)#

Add children for the job. It also adds the current job as a parent for all the new children.

Parameters:

children (list of Job objects) – job’s children to add

add_edge_info(parent, special_conditions)#

Adds edge information to the job

Parameters:
  • parent (Job) – parent job

  • special_conditions (dict) – special variables

add_parent(*parents)#

Add parents for the job. It also adds the current job as a child for all the new parents.

Parameters:

parents (Job) – job’s parents to add
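
As a minimal sketch of the parent/child relationship described above (the names, IDs and Status constants are illustrative; real jobs are normally created by JobList.generate()):

    from autosubmit.job.job import Job
    from autosubmit.job.job_common import Status

    # Two hypothetical jobs; constructor arguments follow the signature above:
    # Job(name, job_id, status, priority).
    sim = Job('a000_19900101_fc0_1_SIM', 1, Status.WAITING, 0)
    post = Job('a000_19900101_fc0_1_POST', 2, Status.WAITING, 0)

    # POST must wait for SIM: adding SIM as a parent of POST
    # also registers POST as a child of SIM.
    post.add_parent(sim)

    assert post.has_parents()
    assert sim.has_children()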

calendar_chunk(parameters)#

Calendar for chunks

Parameters:

parameters

Returns:

calendar_split(as_conf, parameters)#

Calendar for splits

Parameters:

parameters

check_completion(default_status=-1, over_wallclock=False)#

Checks for the presence of the COMPLETED file. Changes the status to COMPLETED if the COMPLETED file exists, and to FAILED otherwise.

Parameters:
  • over_wallclock

  • default_status (Status) – status to set if the job is not completed. By default, FAILED

check_end_time(fail_count=-1)#

Returns end time from stat file

Returns:

date and time

Return type:

str

check_retrials_end_time()#

Returns list of end datetime for retrials from total stats file

Returns:

date and time

Return type:

list[int]

check_retrials_start_time()#

Returns list of start datetime for retrials from total stats file

Returns:

date and time

Return type:

list[int]

check_running_after(date_limit)#

Checks if the job was running after the given date

Parameters:

date_limit (datetime.datetime) – reference date

Returns:

True if the job was running after the given date, False otherwise

Return type:

bool

check_script(as_conf, parameters, show_logs='false')#

Checks if script is well-formed

Parameters:
  • parameters (dict) – script parameters

  • as_conf (AutosubmitConfig) – configuration file

  • show_logs (bool) – Display output

Returns:

True if no problem has been detected, False otherwise

Return type:

bool

check_start_time(fail_count=-1)#

Returns job’s start time

Returns:

start time

Return type:

str

check_started_after(date_limit)#

Checks if the job started after the given date

Parameters:

date_limit (datetime.datetime) – reference date

Returns:

True if the job started after the given date, False otherwise

Return type:

bool

property checkpoint#

Generates a checkpoint step for this job based on job.type.

property children#

Returns a list containing all children of the job

Returns:

child jobs

Return type:

set

property children_names_str#

Comma separated list of children’s names

property chunk#

Current chunk.

create_script(as_conf)#

Creates script file to be run for the job

Parameters:

as_conf (AutosubmitConfig) – configuration object

Returns:

script’s filename

Return type:

str

property custom_directives#

List of custom directives.

property delay#

Current delay.

property delay_retrials#

TODO

delete_child(child)#

Removes a child from the job

Parameters:

child (Job) – child to remove

delete_parent(parent)#

Remove a parent from the job

Parameters:

parent (Job) – parent to remove

property dependencies#

Current job dependencies.

property export#

TODO.

property fail_count#

Number of failed attempts to run this job.

property frequency#

TODO.

get_checkpoint_files()#

Check if there is a file on the remote host that contains the checkpoint

get_last_retrials() → List[Union[datetime, str]]#

Returns the retrials of a job, including the last COMPLETED run. The selection stops when the previous COMPLETED run is located (it is not included) or the list of registers is exhausted.

Returns:

list of dates of retrial [submit, start, finish] in datetime format

Return type:

list of list

get_new_remotelog_name(count=-1)#

Checks if the remote log file exists on the remote host. If it exists, the remote_log variable is updated.

has_children()#

Returns True if the job has any children, False otherwise

Returns:

True if the job has any children, False otherwise

Return type:

bool

has_parents()#

Returns True if the job has any parents, False otherwise

Returns:

True if the job has any parent, False otherwise

Return type:

bool

property hyperthreading#

Detects if hyperthreading is enabled or not.

inc_fail_count()#

Increments fail count

static is_a_completed_retrial(fields)#

Returns True only if there are four fields (submit, start, finish, status) and the status equals COMPLETED.
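
A small sketch of how this check could be applied to a TOTAL_STATS record (the timestamps are illustrative):

    from autosubmit.job.job import Job

    # A completed retrial record has four fields: submit, start, finish, status.
    print(Job.is_a_completed_retrial(['1680000000', '1680000100', '1680000900', 'COMPLETED']))  # True

    # Fewer than four fields, or a status other than COMPLETED, yields False.
    print(Job.is_a_completed_retrial(['1680000000', '1680000100', 'FAILED']))  # False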

is_ancestor(job)#

Check if the given job is an ancestor

Parameters:

job – job to be checked if it is an ancestor

Returns:

True if the job is an ancestor, False otherwise

Return type:

bool

is_over_wallclock(start_time, wallclock)#

Checks if the job is over the wallclock time. It is an alternative method to avoid platform issues.

Parameters:
  • start_time

  • wallclock

is_parent(job)#

Check if the given job is a parent

Parameters:

job – job to be checked if it is a parent

Returns:

True if the job is a parent, False otherwise

Return type:

bool

property long_name#

Job’s long name. If not set, returns name

Returns:

long name

Return type:

str

property member#

Current member.

property memory#

Memory requested for the job.

property memory_per_task#

Memory requested per task.

property name#

Current job full name.

property nodes#

Number of nodes that the job will use.

property packed#

TODO

property parents#

Returns parent jobs list

Returns:

parent jobs

Return type:

set

property partition#

Returns the partition to be used by the job. Chooses between serial and parallel platforms.

Returns:

partition for the job to use

property platform#

Returns the platform to be used by the job. Chooses between serial and parallel platforms

Returns:

HPCPlatform object for the job to use

Return type:

HPCPlatform

process_scheduler_parameters(as_conf, parameters, job_platform, chunk)#

Parses the YAML data stored in the dictionary and calculates the components of the heterogeneous job, if any.

property processors#

Number of processors that the job will use.

property processors_per_node#

Number of processors per node that the job can use.

property queue#

Returns the queue to be used by the job. Chooses between serial and parallel platforms.

Returns:

HPCPlatform object for the job to use

Return type:

HPCPlatform

read_header_tailer_script(script_path: str, as_conf: AutosubmitConfig, is_header: bool)#

Opens and reads a script. If it is not a Bash script, it will fail.

Strips away the line with the shebang (#!).

Parameters:
  • script_path – path to the script, relative to the experiment directory

  • as_conf – Autosubmit configuration file

  • is_header – boolean indicating whether it is the header extended script

remove_redundant_parents()#

Checks if a parent is also an ancestor; if so, removes the link in both directions. Useful to remove redundant dependencies.

property retrials#

Maximum number of retrials to run this job.

retrieve_logfiles(platform, raise_error=False)#

Retrieves log files from the remote host. Meant to be used inside a process.

Parameters:
  • platform – platform that is calling the function, already connected

  • raise_error – boolean to raise an error if the logs are not retrieved

property scratch_free_space#

Percentage of free space required on the scratch.

property sdate#

Current start date.

property section#

Type of the job, as given in the job configuration file.

property shape#

Returns the shape of the job. Chooses between serial and parallel platforms

Returns:

HPCPlatform object for the job to use

Return type:

HPCPlatform

property split#

Current split.

property splits#

Max number of splits.

property status_str#

String representation of the current status

property synchronize#

TODO.

property tasks#

Number of tasks that the job will use.

property threads#

Number of threads that the job will use.

property total_processors#

Number of processors requested by job. Reduces ‘:’ separated format if necessary.

update_content(as_conf)#

Create the script content to be run for the job

Parameters:

as_conf (config) – config

Returns:

script code

Return type:

str

update_job_variables_final_values(parameters)#

Updates the job variables' final values based on the parameters dict instead of as_conf. This function is called to handle %CURRENT_% placeholders, as they are filled in dynamically for each job.

update_parameters(as_conf, parameters, default_parameters={'M': '%M%', 'M_': '%M_%', 'Y': '%Y%', 'Y_': '%Y_%', 'd': '%d%', 'd_': '%d_%', 'm': '%m%', 'm_': '%m_%'})#

Refreshes parameter values

Parameters:
  • default_parameters (dict) –

  • as_conf (AutosubmitConfig) –

  • parameters (dict) –

update_status(as_conf, failed_file=False)#

Updates job status, checking COMPLETED file if needed

Parameters:
  • as_conf

  • failed_file – boolean, if True, checks if the job failed

Returns:

property wallclock#

Duration for which nodes used by job will remain allocated.

write_end_time(completed, enable_vertical_write=False, count=-1)#

Writes end date and time to the TOTAL_STATS file

Parameters:

completed (bool) – True if the job was completed successfully, False otherwise

write_start_time(enable_vertical_write=False, from_stat_file=False, count=-1)#

Writes start date and time to the TOTAL_STATS file

Returns:

True if successful, False otherwise

Return type:

bool

write_submit_time()#

Writes submit date and time to TOTAL_STATS file. It doesn’t write if hold is True.

write_total_stat_by_retries(total_stats, first_retrial=False)#

Writes all data to the TOTAL_STATS file

Parameters:
  • total_stats (dict) – data gathered by the wrapper

  • first_retrial (bool) – True if this is the first retry, False otherwise

class autosubmit.job.job.WrapperJob(name, job_id, status, priority, job_list, total_wallclock, num_processors, platform, as_config, hold)#

Defines a wrapper from a package.

Calls Job constructor.

Parameters:
  • name (String) – Name of the Package

  • job_id (Integer) – ID of the first Job of the package

  • status (String) – ‘READY’ when coming from submit_ready_jobs()

  • priority (Integer) – 0 when coming from submit_ready_jobs()

  • job_list (List() of Job() objects) – List of jobs in the package

  • total_wallclock (String Formatted) – Wallclock of the package

  • num_processors (Integer) – Number of processors for the package

  • platform (Platform Object. e.g. EcPlatform()) – Platform object defined for the package

  • as_config (AutosubmitConfig object) – Autosubmit basic configuration object

class autosubmit.job.job_common.StatisticsSnippetBash#

Class to handle the statistics snippet of a job. It contains header and tailer for local and remote jobs

class autosubmit.job.job_common.StatisticsSnippetEmpty#

Class to handle the statistics snippet of a job. It contains header and footer for local and remote jobs

class autosubmit.job.job_common.StatisticsSnippetPython(version='3')#

Class to handle the statistics snippet of a job. It contains header and tailer for local and remote jobs

class autosubmit.job.job_common.StatisticsSnippetR#

Class to handle the statistics snippet of a job. It contains header and tailer for local and remote jobs

class autosubmit.job.job_common.Status#

Class to handle the status of a job

class autosubmit.job.job_common.Type#

Class to handle the type of a job

autosubmit.job.job_common.increase_wallclock_by_chunk(current, increase, chunk)#

Receives the wallclock time and increases it by a quantity multiplied by the number of the current chunk. The result cannot be larger than 48:00. If chunk = 0, no increment is applied.

Parameters:
  • current (str) – WALLCLOCK HH:MM

  • increase (str) – WCHUNKINC HH:MM

  • chunk (int) – chunk number

Returns:

HH:MM wallclock

Return type:

str
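
A short usage sketch (values are illustrative; the scaling per chunk follows the description above):

    from autosubmit.job.job_common import increase_wallclock_by_chunk

    # Start from a 02:00 wallclock, increased by 00:30 scaled by the chunk
    # number, capped at 48:00.
    print(increase_wallclock_by_chunk('02:00', '00:30', 3))

    # For chunk 0 no increment is applied, so the wallclock stays at 02:00.
    print(increase_wallclock_by_chunk('02:00', '00:30', 0))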

autosubmit.job.job_common.parse_output_number(string_number)#

Parses number in format 1.0K 1.0M 1.0G

Parameters:

string_number (str) – String representation of number

Returns:

number in float format

Return type:

float
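
For example (the multipliers in the comments are assumptions based on the description above):

    from autosubmit.job.job_common import parse_output_number

    print(parse_output_number('1.0K'))   # assumed to expand to 1000.0
    print(parse_output_number('2.5M'))   # assumed to expand to 2500000.0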

class autosubmit.job.job_list.JobList(expid, config, parser_factory, job_list_persistence, as_conf)#

Class to manage the list of jobs to be run by autosubmit

add_logs(logs)#

Adds logs to the current job_list

Returns:

logs

Return type:

dict(tuple)

add_special_conditions(job, special_conditions, filters_to_apply, parent)#

Adds special conditions to the job edge

Parameters:
  • job (Job) –

  • special_conditions (dict) –

  • filters_to_apply (dict) –

  • parent – parent job

backup_save()#

Persists the job list

check_checkpoint(job, parent)#

Check if a checkpoint step exists for this edge

check_scripts(as_conf)#

When the scripts have been created, all parameters should have been substituted. %PARAMETER% placeholders are not allowed.

Parameters:

as_conf (AutosubmitConfig) – experiment configuration

check_special_status()#

Checks if all parents of a job have the correct status for checkpointing

Returns:

jobs that fulfill the special conditions

property expid#

Returns the experiment identifier

Returns:

experiment’s identifier

Return type:

str

find_and_delete_redundant_relations(problematic_jobs)#

Handles jobs with intrinsic rules that can't safely be left out without affecting other workflows. Before this function is called, the graph will have been built with as few edges as safely possible. Structure: problematic_jobs is {section: {child_name: [parent_names]}}

Returns:

generate(as_conf, date_list, member_list, num_chunks, chunk_ini, parameters, date_format, default_retrials, default_job_type, wrapper_jobs={}, new=True, run_only_members=[], show_log=True, monitor=False, force=False, create=False)#

Creates all jobs needed for the current workflow.

Parameters:
  • as_conf (AutosubmitConfig) – AutosubmitConfig object

  • date_list (list) – list of dates

  • member_list (list) – list of members

  • num_chunks (int) – number of chunks

  • chunk_ini (int) – initial chunk

  • parameters (dict) – parameters

  • date_format (str) – date format ( D/M/Y )

  • default_retrials (int) – default number of retrials

  • default_job_type (str) – default job type

  • wrapper_jobs (dict) – wrapper jobs

  • new (bool) – new

  • run_only_members (list) – run only members

  • show_log (bool) – show log

  • monitor (bool) – monitor

get_active(platform=None, wrapper=False)#

Returns a list of active jobs (In platforms queue + Ready)

Parameters:
  • wrapper

  • platform (HPCPlatform) – job platform

Returns:

active jobs

Return type:

list
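
The get_* query methods below all follow the same pattern. A hedged usage sketch, assuming job_list is a JobList instance already built and loaded by Autosubmit (constructing one by hand requires the configuration, parser factory and persistence objects shown in the class signature):

    # `job_list` is assumed to be an existing, loaded JobList instance.
    active = job_list.get_active()      # jobs in platform queues plus READY jobs
    failed = job_list.get_failed()
    ready = job_list.get_ready()

    for job in active:
        print(job.name, job.status_str)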

get_all(platform=None, wrapper=False)#

Returns a list of all jobs

Parameters:
  • wrapper

  • platform (HPCPlatform) – job platform

Returns:

all jobs

Return type:

list

get_chunk_list()#

Get inner chunk list

Returns:

chunk list

Return type:

list

get_completed(platform=None, wrapper=False)#

Returns a list of completed jobs

Parameters:
  • wrapper

  • platform (HPCPlatform) – job platform

Returns:

completed jobs

Return type:

list

get_completed_without_logs(platform=None)#

Returns a list of completed jobs without updated logs

Parameters:

platform (HPCPlatform) – job platform

Returns:

completed jobs

Return type:

list

get_date_list()#

Get inner date list

Returns:

date list

Return type:

list

get_delayed(platform=None)#

Returns a list of delayed jobs

Parameters:

platform (HPCPlatform) – job platform

Returns:

delayed jobs

Return type:

list

get_failed(platform=None, wrapper=False)#

Returns a list of failed jobs

Parameters:
  • wrapper

  • platform (HPCPlatform) – job platform

Returns:

failed jobs

Return type:

list

get_finished(platform=None, wrapper=False)#

Returns a list of jobs finished (Completed, Failed)

Parameters:
  • wrapper

  • platform (HPCPlatform) – job platform

Returns:

finished jobs

Return type:

list

get_held_jobs(platform=None)#

Returns a list of jobs in the platforms (Held)

Parameters:

platform (HPCPlatform) – job platform

Returns:

jobs in platforms

Return type:

list

get_in_queue(platform=None, wrapper=False)#

Returns a list of jobs in the platforms (Submitted, Running, Queuing, Unknown,Held)

Parameters:
  • wrapper

  • platform (HPCPlatform) – job platform

Returns:

jobs in platforms

Return type:

list

get_job_by_name(name)#

Returns the job whose name matches the given name

Parameters:

name (str) – name to look for

Returns:

found job

Return type:

job
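
For instance, reusing the job_list instance from the sketch above (the job name is illustrative, and it is assumed that no match yields a falsy result):

    job = job_list.get_job_by_name('a000_19900101_fc0_1_SIM')
    if job:
        print(job.name, job.section, job.chunk)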

get_job_list()#

Get inner job list

Returns:

job list

Return type:

list

get_job_names(lower_case=False)#

Returns a list of all job names

Parameters:

lower_case (bool) – if True, returns lower-case job names

Returns:

all job names

Return type:

list

Parameters:
  • two_step_start

  • select_jobs_by_name – job name

  • select_all_jobs_by_section – section name

  • filter_jobs_by_section – section, date , member? , chunk?

Returns:

jobs_list names

Return type:

list

get_jobs_by_section(section_list)#

Returns the jobs whose section matches one of the given sections

Parameters:

section_list (list) – list of sections to look for

Returns:

found job

Return type:

job

get_logs()#

Returns a dict of logs keyed by job name

Returns:

logs

Return type:

dict(tuple)

get_member_list()#

Get inner member list

Returns:

member list

Return type:

list

get_not_in_queue(platform=None, wrapper=False)#

Returns a list of jobs NOT in the platforms (Ready, Waiting)

Parameters:
  • wrapper

  • platform (HPCPlatform) – job platform

Returns:

jobs not in platforms

Return type:

list

get_ordered_jobs_by_date_member(section)#

Get the dictionary of jobs ordered according to wrapper’s expression divided by date and member

Returns:

jobs ordered divided by date and member

Return type:

dict

get_prepared(platform=None)#

Returns a list of prepared jobs

Parameters:

platform (HPCPlatform) – job platform

Returns:

prepared jobs

Return type:

list

get_queuing(platform=None, wrapper=False)#

Returns a list of jobs queuing

Parameters:
  • wrapper

  • platform (HPCPlatform) – job platform

Returns:

queuing jobs

Return type:

list

get_ready(platform=None, hold=False, wrapper=False)#

Returns a list of ready jobs

Parameters:
  • wrapper

  • hold

  • platform (HPCPlatform) – job platform

Returns:

ready jobs

Return type:

list

get_running(platform=None, wrapper=False)#

Returns a list of jobs running

Parameters:
  • wrapper

  • platform (HPCPlatform) – job platform

Returns:

running jobs

Return type:

list

get_skipped(platform=None)#

Returns a list of skipped jobs

Parameters:

platform (HPCPlatform) – job platform

Returns:

skipped jobs

Return type:

list

get_submitted(platform=None, hold=False, wrapper=False)#

Returns a list of submitted jobs

Parameters:
  • wrapper

  • hold

  • platform (HPCPlatform) – job platform

Returns:

submitted jobs

Return type:

list

get_suspended(platform=None, wrapper=False)#

Returns a list of suspended jobs

Parameters:
  • wrapper

  • platform (HPCPlatform) – job platform

Returns:

suspended jobs

Return type:

list

get_uncompleted(platform=None, wrapper=False)#

Returns a list of uncompleted jobs

Parameters:
  • wrapper

  • platform (HPCPlatform) – job platform

Returns:

uncompleted jobs

Return type:

list

get_uncompleted_and_not_waiting(platform=None, wrapper=False)#

Returns a list of uncompleted jobs that are not waiting

Parameters:
  • wrapper

  • platform (HPCPlatform) – job platform

Returns:

uncompleted jobs that are not waiting

Return type:

list

get_unknown(platform=None, wrapper=False)#

Returns a list of jobs on unknown state

Parameters:
  • wrapper

  • platform (HPCPlatform) – job platform

Returns:

unknown state jobs

Return type:

list

get_unsubmitted(platform=None, wrapper=False)#

Returns a list of unsubmitted jobs

Parameters:
  • wrapper

  • platform (HPCPlatform) – job platform

Returns:

unsubmitted jobs

Return type:

list

get_waiting(platform=None, wrapper=False)#

Returns a list of jobs waiting

Parameters:
  • wrapper

  • platform (HPCPlatform) – job platform

Returns:

waiting jobs

Return type:

list

get_waiting_remote_dependencies(platform_type='slurm')#

Returns a list of jobs waiting on the Slurm scheduler

Parameters:

platform_type (str) – platform type

Returns:

waiting jobs

Return type:

list

load(create=False, backup=False)#

Recreates a stored job list from the persistence

Returns:

loaded job list object

Return type:

JobList

static load_file(filename)#

Recreates a stored joblist from the pickle file

Parameters:

filename (str) – pickle file to load

Returns:

loaded joblist object

Return type:

JobList
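
A hedged sketch of loading a persisted job list directly from its pickle file (the path is illustrative; the file is normally written by save()):

    from autosubmit.job.job_list import JobList

    # Returns the loaded job list object (see Return type above).
    job_list = JobList.load_file('/path/to/expid/pkl/job_list_expid.pkl')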

property parameters#

Dictionary of parameters common to all jobs

Returns:

parameters

Return type:

dict

print_with_status(statusChange=None, nocolor=False, existingList=None)#

Returns the string representation of the dependency tree of the Job List

Parameters:
  • statusChange (List of strings) – List of changes in the list, supplied in set status

  • nocolor (Boolean) – True if the result should not include color codes

  • existingList (List of Job Objects) – External List of Jobs that will be printed, this excludes the inner list of jobs.

Returns:

String representation

Return type:

String

remove_rerun_only_jobs(notransitive=False)#

Removes all jobs to be run only in reruns

rerun(job_list_unparsed, as_conf, monitor=False)#

Updates the job list to rerun the jobs specified by a job list

Parameters:
  • job_list_unparsed (str) – list of jobs to rerun

  • as_conf (AutosubmitConfig) – experiment configuration

  • monitor (bool) – if True, the job list will be monitored

static retrieve_packages(BasicConfig, expid, current_jobs=None)#

Retrieves dictionaries that map the collection of packages in the experiment

Parameters:
  • BasicConfig (Configuration Object) – Basic configuration

  • expid (String) – Experiment ID

  • current_jobs (list) – list of names of current jobs

Returns:

job to package, package to job, package to package_id, package to symbol

Return type:

Dictionary(Job Object, Package), Dictionary(Package, List of Job Objects), Dictionary(String, String), Dictionary(String, String)

static retrieve_times(status_code, name, tmp_path, make_exception=False, job_times=None, seconds=False, job_data_collection=None)#

Retrieve job timestamps from database.

Parameters:
  • job_data_collection

  • seconds

  • status_code (Integer) – Code of the Status of the job

  • name (String) – Name of the job

  • tmp_path (String) – Path to the tmp folder of the experiment

  • make_exception (Boolean) – flag for testing purposes

  • job_times (Dictionary. Key: job name, Value: 5-tuple (submit time, start time, finish time, status, detail id)) – Detail from as_times.job_times for the experiment

Returns:

minutes the job has been queuing, minutes the job has been running, and the text that represents it

Return type:

int, int, str

save()#

Persists the job list

sort_by_id()#

Returns a list of jobs sorted by id

Returns:

jobs sorted by ID

Return type:

list

sort_by_name()#

Returns a list of jobs sorted by name

Returns:

jobs sorted by name

Return type:

list

sort_by_status()#

Returns a list of jobs sorted by status

Returns:

job sorted by status

Return type:

list

sort_by_type()#

Returns a list of jobs sorted by type

Returns:

job sorted by type

Return type:

list

split_by_platform()#

Splits the job list by platform name

Returns:

job list per platform

Return type:

dict

update_from_file(store_change=True)#

Updates the job list on the fly from an update file

Parameters:

store_change – if True, renames the update file to avoid reloading it at the next iteration

update_genealogy()#

When the job list has been created, every type of job has been created. Updating the genealogy removes jobs that have no templates.

update_list(as_conf: AutosubmitConfig, store_change: bool = True, fromSetStatus: bool = False, submitter: Optional[object] = None, first_time: bool = False) → bool#

Updates job list, resetting failed jobs and changing to READY all WAITING jobs with all parents COMPLETED

Parameters:
  • first_time

  • submitter

  • fromSetStatus

  • store_change

  • as_conf (AutosubmitConfig) – autosubmit config object

Returns:

True if any job status was modified, False otherwise

Return type:

bool

update_log_status(job, as_conf)#

Updates the log err and log out.