autosubmit.job#
Main module for Autosubmit. It only contains an interface class to all the functionality implemented in Autosubmit.
- class autosubmit.job.job.Job(name, job_id, status, priority)#
Class to handle all the tasks with jobs at the HPC.
A job is created by default with a name, a job id, a status and a type. It can have children and parents. The inheritance reflects the dependency between jobs: if Job2 must wait until Job1 is completed, then Job2 is a child of Job1 and, inversely, Job1 is a parent of Job2.
- add_children(children)#
Add children to the job. It also adds the current job as a parent of all the new children.
- Parameters:
children (list of Job objects) – job’s children to add
- add_edge_info(parent, special_conditions)#
Adds edge information to the job
- Parameters:
parent (Job) – parent job
special_conditions (dict) – special variables
- add_parent(*parents)#
Add parents to the job. It also adds the current job as a child of all the new parents.
- Parameters:
parents (Job) – job’s parents to add
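The two-way linkage described by add_children and add_parent can be sketched with a minimal stand-in class (a hypothetical illustration, not the real Job implementation):

```python
class MiniJob:
    """Minimal stand-in showing the parent/child linkage of Job."""
    def __init__(self, name):
        self.name = name
        self.parents = set()
        self.children = set()

    def add_children(self, children):
        # Adding a child also registers the current job as its parent.
        for child in children:
            self.children.add(child)
            child.parents.add(self)

    def add_parent(self, *parents):
        # Adding a parent also registers the current job as its child.
        for parent in parents:
            self.parents.add(parent)
            parent.children.add(self)

# Job2 must wait for Job1, so Job2 is a child of Job1.
job1, job2 = MiniJob("Job1"), MiniJob("Job2")
job1.add_children([job2])
```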
- calendar_chunk(parameters)#
Calendar for chunks
- Parameters:
parameters –
- Returns:
- calendar_split(as_conf, parameters)#
Calendar for splits
- Parameters:
as_conf –
parameters –
- Returns:
- check_completion(default_status=-1, over_wallclock=False)#
Check the presence of the COMPLETED file. Change status to COMPLETED if the COMPLETED file exists and to FAILED otherwise.
- Parameters:
over_wallclock –
default_status (Status) – status to set if the job is not completed. By default, FAILED
- check_end_time(fail_count=-1)#
Returns end time from stat file
- Returns:
date and time
- Return type:
str
- check_retrials_end_time()#
Returns list of end datetime for retrials from total stats file
- Returns:
date and time
- Return type:
list[int]
- check_retrials_start_time()#
Returns list of start datetime for retrials from total stats file
- Returns:
date and time
- Return type:
list[int]
- check_running_after(date_limit)#
Checks if the job was running after the given date
- Parameters:
date_limit (datetime.datetime) – reference date
- Returns:
True if the job was running after the given date, False otherwise
- Return type:
bool
- check_script(as_conf, parameters, show_logs='false')#
Checks if script is well-formed
- Parameters:
parameters (dict) – script parameters
as_conf (AutosubmitConfig) – configuration file
show_logs (Bool) – Display output
- Returns:
True if no problem has been detected, False otherwise
- Return type:
bool
- check_start_time(fail_count=-1)#
Returns job’s start time
- Returns:
start time
- Return type:
str
- check_started_after(date_limit)#
Checks if the job started after the given date
- Parameters:
date_limit (datetime.datetime) – reference date
- Returns:
True if the job started after the given date, False otherwise
- Return type:
bool
- property checkpoint#
Generates a checkpoint step for this job based on job.type.
- property children#
Returns a list containing all children of the job
- Returns:
child jobs
- Return type:
set
- property children_names_str#
Comma separated list of children’s names
- property chunk#
Current chunk.
- create_script(as_conf)#
Creates script file to be run for the job
- Parameters:
as_conf (AutosubmitConfig) – configuration object
- Returns:
script’s filename
- Return type:
str
- property custom_directives#
List of custom directives.
- property delay#
Current delay.
- property delay_retrials#
TODO
- property dependencies#
Current job dependencies.
- property export#
TODO.
- property fail_count#
Number of failed attempts to run this job.
- property frequency#
TODO.
- get_checkpoint_files()#
Check if there is a file on the remote host that contains the checkpoint
- get_last_retrials() List[Union[datetime, str]] #
Returns the retrials of a job, including the last COMPLETED run. The selection stops at, and does not include, the previous COMPLETED job, or ends when the list of registers is exhausted.
- Returns:
list of dates of retrial [submit, start, finish] in datetime format
- Return type:
list of list
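Assuming TOTAL_STATS stores one whitespace-separated record per line (submit, start, finish, status), the backward scan described above might look like this sketch (not the real parser, which also converts fields to datetime):

```python
def last_retrials(lines):
    """Collect retrial records from the end of a TOTAL_STATS-style log,
    stopping before (and excluding) the previous COMPLETED record."""
    retrials = []
    seen_completed = False
    for line in reversed(lines):
        fields = line.split()
        # A completed retrial has exactly 4 fields and status COMPLETED,
        # mirroring the is_a_completed_retrial check above.
        if len(fields) == 4 and fields[3] == "COMPLETED":
            if seen_completed:
                break  # previous COMPLETED run: stop, do not include it
            seen_completed = True
        retrials.insert(0, fields)
    return retrials
```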
- get_new_remotelog_name(count=-1)#
Checks if the remote log file exists on the remote host; if it exists, the remote_log variable is updated.
- has_children()#
Returns true if job has any children, else return false
- Returns:
true if job has any children, otherwise return false
- Return type:
bool
- has_parents()#
Returns true if job has any parents, else return false
- Returns:
true if job has any parent, otherwise return false
- Return type:
bool
- property hyperthreading#
Detects if hyperthreading is enabled or not.
- inc_fail_count()#
Increments fail count
- static is_a_completed_retrial(fields)#
Returns true only if there are 4 fields: submit start finish status, and status equals COMPLETED.
- is_ancestor(job)#
Check if the given job is an ancestor
- Parameters:
job – job to be checked
- Returns:
True if the job is an ancestor, False otherwise
- Return type:
bool
- is_over_wallclock(start_time, wallclock)#
Check if the job is over the wallclock time; an alternative method that avoids platform issues.
- Parameters:
start_time –
wallclock –
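A minimal sketch of such a check, assuming wallclock is given as HH:MM and start_time is a datetime (the real implementation may differ):

```python
from datetime import datetime, timedelta

def is_over_wallclock(start_time, wallclock):
    """Return True when more than wallclock (HH:MM) has elapsed
    since start_time. A sketch, not the real implementation."""
    hours, minutes = map(int, wallclock.split(":"))
    return datetime.now() - start_time > timedelta(hours=hours, minutes=minutes)
```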
- is_parent(job)#
Check if the given job is a parent
- Parameters:
job – job to be checked
- Returns:
True if the job is a parent, False otherwise
- Return type:
bool
- property long_name#
Job’s long name. If not set, returns name
- Returns:
long name
- Return type:
str
- property member#
Current member.
- property memory#
Memory requested for the job.
- property memory_per_task#
Memory requested per task.
- property name#
Current job full name.
- property nodes#
Number of nodes that the job will use.
- property packed#
TODO
- property parents#
Returns parent jobs list
- Returns:
parent jobs
- Return type:
set
- property partition#
Returns the queue to be used by the job. Chooses between serial and parallel platforms.
- Returns:
HPCPlatform object for the job to use
- Return type:
HPCPlatform
- property platform#
Returns the platform to be used by the job. Chooses between serial and parallel platforms.
- Returns:
HPCPlatform object for the job to use
- Return type:
HPCPlatform
- process_scheduler_parameters(as_conf, parameters, job_platform, chunk)#
Parses the YAML data stored in the dictionary and calculates the components of the heterogeneous job, if any.
- property processors#
Number of processors that the job will use.
- property processors_per_node#
Number of processors per node that the job can use.
- property queue#
Returns the queue to be used by the job. Chooses between serial and parallel platforms.
- Returns:
HPCPlatform object for the job to use
- Return type:
HPCPlatform
- read_header_tailer_script(script_path: str, as_conf: AutosubmitConfig, is_header: bool)#
Opens and reads a script. Fails if it is not a bash script.
Strips away the shebang line (#!).
- Parameters:
script_path – relative to the experiment directory path to the script
as_conf – Autosubmit configuration file
is_header – True if the script is the extended header, False if it is the extended tailer
- remove_redundant_parents()#
Checks if a parent is also an ancestor; if so, removes the link in both directions. Useful to remove redundant dependencies.
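The redundancy rule can be sketched on a plain adjacency mapping (a hypothetical structure, not the real Job graph): a direct parent is redundant when it is also reachable through another parent.

```python
def remove_redundant_parents(parents_of, job):
    """Drop direct parents of `job` that are also ancestors via another
    parent. `parents_of` maps job name -> set of parent names."""
    def ancestors(node, seen=None):
        # Collect all nodes reachable by repeatedly following parents.
        seen = set() if seen is None else seen
        for p in parents_of.get(node, set()):
            if p not in seen:
                seen.add(p)
                ancestors(p, seen)
        return seen

    redundant = set()
    for parent in set(parents_of.get(job, set())):
        others = parents_of[job] - {parent}
        if any(parent in ancestors(o) for o in others):
            redundant.add(parent)
    parents_of[job] -= redundant
    return redundant
```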
- property retrials#
Max amount of retrials to run this job.
- retrieve_logfiles(platform, raise_error=False)#
Retrieves log files from the remote host; meant to be used inside a process.
- Parameters:
platform – platform that is calling the function, already connected
raise_error – boolean to raise an error if the logs are not retrieved
- property scratch_free_space#
Percentage of free space required on the scratch.
- property sdate#
Current start date.
- property section#
Type of the job, as given in the job configuration file.
- property shape#
Returns the shape of the job. Chooses between serial and parallel platforms.
- Returns:
HPCPlatform object for the job to use
- Return type:
HPCPlatform
- property split#
Current split.
- property splits#
Max number of splits.
- property status_str#
String representation of the current status
- property synchronize#
TODO.
- property tasks#
Number of tasks that the job will use.
- property threads#
Number of threads that the job will use.
- property total_processors#
Number of processors requested by job. Reduces ‘:’ separated format if necessary.
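Assuming the ':'-separated format lists processors per heterogeneous-job component and "reducing" means summing them, a sketch of the reduction (an interpretation, not the real implementation):

```python
def total_processors(processors):
    """Reduce a ':'-separated processors string to a single total.
    '4:8:16' -> 28; a plain '8' -> 8."""
    if ":" in processors:
        return sum(int(p) for p in processors.split(":"))
    return int(processors)
```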
- update_content(as_conf)#
Create the script content to be run for the job
- Parameters:
as_conf (config) – config
- Returns:
script code
- Return type:
str
- update_job_variables_final_values(parameters)#
Updates the final values of the job variables based on the parameters dict instead of as_conf. This function is called to handle %CURRENT_% placeholders, as they are filled in dynamically for each job.
- update_parameters(as_conf, parameters, default_parameters={'M': '%M%', 'M_': '%M_%', 'Y': '%Y%', 'Y_': '%Y_%', 'd': '%d%', 'd_': '%d_%', 'm': '%m%', 'm_': '%m_%'})#
Refresh parameters value
- Parameters:
default_parameters (dict) –
as_conf (AutosubmitConfig) –
parameters (dict) –
- update_status(as_conf, failed_file=False)#
Updates job status, checking COMPLETED file if needed
- Parameters:
as_conf –
failed_file – boolean, if True, checks if the job failed
- Returns:
- property wallclock#
Duration for which nodes used by job will remain allocated.
- write_end_time(completed, enable_vertical_write=False, count=-1)#
Writes end date and time to the TOTAL_STATS file
- Parameters:
completed (bool) – True if the job completed successfully, False otherwise
- write_start_time(enable_vertical_write=False, from_stat_file=False, count=-1)#
Writes start date and time to the TOTAL_STATS file
- Returns:
True if successful, False otherwise
- Return type:
bool
- write_submit_time()#
Writes submit date and time to TOTAL_STATS file. It doesn’t write if hold is True.
- write_total_stat_by_retries(total_stats, first_retrial=False)#
Writes all data to the TOTAL_STATS file
- Parameters:
total_stats (dict) – data gathered by the wrapper
first_retrial (bool) – True if this is the first retry, False otherwise
- class autosubmit.job.job.WrapperJob(name, job_id, status, priority, job_list, total_wallclock, num_processors, platform, as_config, hold)#
Defines a wrapper from a package.
Calls Job constructor.
- Parameters:
name (String) – Name of the Package
job_id (Integer) – ID of the first Job of the package
status (String) – ‘READY’ when coming from submit_ready_jobs()
priority (Integer) – 0 when coming from submit_ready_jobs()
job_list (List() of Job() objects) – List of jobs in the package
total_wallclock (String Formatted) – Wallclock of the package
num_processors (Integer) – Number of processors for the package
platform (Platform Object. e.g. EcPlatform()) – Platform object defined for the package
as_config (AutosubmitConfig object) – Autosubmit basic configuration object
- class autosubmit.job.job_common.StatisticsSnippetBash#
Class to handle the statistics snippet of a job. It contains header and tailer for local and remote jobs
- class autosubmit.job.job_common.StatisticsSnippetEmpty#
Class to handle the statistics snippet of a job. It contains header and footer for local and remote jobs
- class autosubmit.job.job_common.StatisticsSnippetPython(version='3')#
Class to handle the statistics snippet of a job. It contains header and tailer for local and remote jobs
- class autosubmit.job.job_common.StatisticsSnippetR#
Class to handle the statistics snippet of a job. It contains header and tailer for local and remote jobs
- class autosubmit.job.job_common.Status#
Class to handle the status of a job
- class autosubmit.job.job_common.Type#
Class to handle the type of a job
- autosubmit.job.job_common.increase_wallclock_by_chunk(current, increase, chunk)#
Receives the wallclock time and increases it by a quantity times the number of the current chunk. The result cannot be larger than 48:00. If chunk = 0 there is no increment.
- Parameters:
current (str) – WALLCLOCK HH:MM
increase (str) – WCHUNKINC HH:MM
chunk (int) – chunk number
- Returns:
HH:MM wallclock
- Return type:
str
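A sketch of the documented behaviour; the exact multiplier convention for the chunk number is an assumption here:

```python
def increase_wallclock_by_chunk(current, increase, chunk):
    """Add `increase` (HH:MM) times the chunk number to `current`
    (HH:MM), capping the result at 48:00. Chunk 0 means no change."""
    if chunk == 0:
        return current
    cur_h, cur_m = map(int, current.split(":"))
    inc_h, inc_m = map(int, increase.split(":"))
    # Work in minutes, apply the per-chunk increment, then cap at 48:00.
    total = cur_h * 60 + cur_m + (inc_h * 60 + inc_m) * chunk
    total = min(total, 48 * 60)
    return f"{total // 60:02d}:{total % 60:02d}"
```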
- autosubmit.job.job_common.parse_output_number(string_number)#
Parses a number in the format 1.0K / 1.0M / 1.0G
- Parameters:
string_number (str) – String representation of number
- Returns:
number in float format
- Return type:
float
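A sketch of the described parsing for the three documented suffixes (plain-number fallback is an assumption):

```python
def parse_output_number(string_number):
    """Parse numbers like '1.0K', '1.0M', '1.0G' into floats;
    numbers without a suffix are returned as plain floats."""
    multipliers = {"K": 1e3, "M": 1e6, "G": 1e9}
    suffix = string_number[-1].upper()
    if suffix in multipliers:
        return float(string_number[:-1]) * multipliers[suffix]
    return float(string_number)
```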
- class autosubmit.job.job_list.JobList(expid, config, parser_factory, job_list_persistence, as_conf)#
Class to manage the list of jobs to be run by Autosubmit
- add_logs(logs)#
Adds logs to the current job list
- Returns:
logs
- Return type:
dict(tuple)
- add_special_conditions(job, special_conditions, filters_to_apply, parent)#
Add special conditions to the job edge
- Parameters:
job (Job) – job
special_conditions (dict) – special conditions
filters_to_apply (dict) – filters to apply
parent – parent job
- backup_save()#
Persists the job list
- check_checkpoint(job, parent)#
Check if a checkpoint step exists for this edge
- check_scripts(as_conf)#
When the scripts have been created, all parameters should have been substituted; %PARAMETER% placeholders are not allowed.
- Parameters:
as_conf (AutosubmitConfig) – experiment configuration
- check_special_status()#
Check if all parents of a job have the correct status for checkpointing
- Returns:
jobs that fulfill the special conditions
- property expid#
Returns the experiment identifier
- Returns:
experiment’s identifier
- Return type:
str
- find_and_delete_redundant_relations(problematic_jobs)#
Jobs with intrinsic rules that cannot safely be omitted without breaking other workflows. Before this function runs, the graph has had as few edges added as safely possible. Structure: problematic_jobs is {section: {child_name: [parent_names]}}
- Returns:
- generate(as_conf, date_list, member_list, num_chunks, chunk_ini, parameters, date_format, default_retrials, default_job_type, wrapper_jobs={}, new=True, run_only_members=[], show_log=True, monitor=False, force=False, create=False)#
Creates all jobs needed for the current workflow.
- Parameters:
as_conf (AutosubmitConfig) – AutosubmitConfig object
date_list (list) – list of dates
member_list (list) – list of members
num_chunks (int) – number of chunks
chunk_ini (int) – initial chunk
parameters (dict) – parameters
date_format (str) – date format (D/M/Y)
default_retrials (int) – default number of retrials
default_job_type (str) – default job type
wrapper_jobs (dict) – wrapper jobs
new (bool) – new
run_only_members (list) – run only members
show_log (bool) – show log
monitor (bool) – monitor
- get_active(platform=None, wrapper=False)#
Returns a list of active jobs (In platforms queue + Ready)
- Parameters:
wrapper –
platform (HPCPlatform) – job platform
- Returns:
active jobs
- Return type:
list
- get_all(platform=None, wrapper=False)#
Returns a list of all jobs
- Parameters:
wrapper –
platform (HPCPlatform) – job platform
- Returns:
all jobs
- Return type:
list
- get_chunk_list()#
Get inner chunk list
- Returns:
chunk list
- Return type:
list
- get_completed(platform=None, wrapper=False)#
Returns a list of completed jobs
- Parameters:
wrapper –
platform (HPCPlatform) – job platform
- Returns:
completed jobs
- Return type:
list
- get_completed_without_logs(platform=None)#
Returns a list of completed jobs without updated logs
- Parameters:
platform (HPCPlatform) – job platform
- Returns:
completed jobs
- Return type:
list
- get_date_list()#
Get inner date list
- Returns:
date list
- Return type:
list
- get_delayed(platform=None)#
Returns a list of delayed jobs
- Parameters:
platform (HPCPlatform) – job platform
- Returns:
delayed jobs
- Return type:
list
- get_failed(platform=None, wrapper=False)#
Returns a list of failed jobs
- Parameters:
wrapper –
platform (HPCPlatform) – job platform
- Returns:
failed jobs
- Return type:
list
- get_finished(platform=None, wrapper=False)#
Returns a list of jobs finished (Completed, Failed)
- Parameters:
wrapper –
platform (HPCPlatform) – job platform
- Returns:
finished jobs
- Return type:
list
- get_held_jobs(platform=None)#
Returns a list of jobs in the platforms (Held)
- Parameters:
platform (HPCPlatform) – job platform
- Returns:
jobs in platforms
- Return type:
list
- get_in_queue(platform=None, wrapper=False)#
Returns a list of jobs in the platforms (Submitted, Running, Queuing, Unknown, Held)
- Parameters:
wrapper –
platform (HPCPlatform) – job platform
- Returns:
jobs in platforms
- Return type:
list
- get_job_by_name(name)#
Returns the job whose name matches the given name
- Parameters:
name (str) – name to look for
- Returns:
found job
- Return type:
job
- get_job_list()#
Get inner job list
- Returns:
job list
- Return type:
list
- get_job_names(lower_case=False)#
Returns a list of all job names
- Parameters:
lower_case (bool) – if True, returns lower-case job names
- Returns:
all job names
- Return type:
list
- Parameters:
two_step_start –
select_jobs_by_name – job name
select_all_jobs_by_section – section name
filter_jobs_by_section – section, date , member? , chunk?
- Returns:
jobs_list names
- Return type:
list
- get_jobs_by_section(section_list)#
Returns the jobs whose section matches one in the given list
- Parameters:
section_list (list) – list of sections to look for
- Returns:
found jobs
- Return type:
job
- get_logs()#
Returns a dict of logs keyed by job name
- Returns:
logs
- Return type:
dict(tuple)
- get_member_list()#
Get inner member list
- Returns:
member list
- Return type:
list
- get_not_in_queue(platform=None, wrapper=False)#
Returns a list of jobs NOT in the platforms (Ready, Waiting)
- Parameters:
wrapper –
platform (HPCPlatform) – job platform
- Returns:
jobs not in platforms
- Return type:
list
- get_ordered_jobs_by_date_member(section)#
Get the dictionary of jobs ordered according to wrapper’s expression divided by date and member
- Returns:
jobs ordered divided by date and member
- Return type:
dict
- get_prepared(platform=None)#
Returns a list of prepared jobs
- Parameters:
platform (HPCPlatform) – job platform
- Returns:
prepared jobs
- Return type:
list
- get_queuing(platform=None, wrapper=False)#
Returns a list of jobs queuing
- Parameters:
wrapper –
platform (HPCPlatform) – job platform
- Returns:
queuing jobs
- Return type:
list
- get_ready(platform=None, hold=False, wrapper=False)#
Returns a list of ready jobs
- Parameters:
wrapper –
hold –
platform (HPCPlatform) – job platform
- Returns:
ready jobs
- Return type:
list
- get_running(platform=None, wrapper=False)#
Returns a list of jobs running
- Parameters:
wrapper –
platform (HPCPlatform) – job platform
- Returns:
running jobs
- Return type:
list
- get_skipped(platform=None)#
Returns a list of skipped jobs
- Parameters:
platform (HPCPlatform) – job platform
- Returns:
skipped jobs
- Return type:
list
- get_submitted(platform=None, hold=False, wrapper=False)#
Returns a list of submitted jobs
- Parameters:
wrapper –
hold –
platform (HPCPlatform) – job platform
- Returns:
submitted jobs
- Return type:
list
- get_suspended(platform=None, wrapper=False)#
Returns a list of suspended jobs
- Parameters:
wrapper –
platform (HPCPlatform) – job platform
- Returns:
suspended jobs
- Return type:
list
- get_uncompleted(platform=None, wrapper=False)#
Returns a list of uncompleted jobs
- Parameters:
wrapper –
platform (HPCPlatform) – job platform
- Returns:
uncompleted jobs
- Return type:
list
- get_uncompleted_and_not_waiting(platform=None, wrapper=False)#
Returns a list of jobs that are uncompleted and not waiting
- Parameters:
wrapper –
platform (HPCPlatform) – job platform
- Returns:
uncompleted and not waiting jobs
- Return type:
list
- get_unknown(platform=None, wrapper=False)#
Returns a list of jobs in unknown state
- Parameters:
wrapper –
platform (HPCPlatform) – job platform
- Returns:
unknown state jobs
- Return type:
list
- get_unsubmitted(platform=None, wrapper=False)#
Returns a list of unsubmitted jobs
- Parameters:
wrapper –
platform (HPCPlatform) – job platform
- Returns:
unsubmitted jobs
- Return type:
list
- get_waiting(platform=None, wrapper=False)#
Returns a list of jobs waiting
- Parameters:
wrapper –
platform (HPCPlatform) – job platform
- Returns:
waiting jobs
- Return type:
list
- get_waiting_remote_dependencies(platform_type='slurm')#
Returns a list of jobs waiting on the Slurm scheduler
- Parameters:
platform_type (str) – platform type
- Returns:
waiting jobs
- Return type:
list
- load(create=False, backup=False)#
Recreates a stored job list from the persistence
- Returns:
loaded job list object
- Return type:
- static load_file(filename)#
Recreates a stored joblist from the pickle file
- Parameters:
filename (str) – pickle file to load
- Returns:
loaded joblist object
- Return type:
- property parameters#
List of parameters common to all jobs
- Returns:
parameters
- Return type:
dict
- print_with_status(statusChange=None, nocolor=False, existingList=None)#
Returns the string representation of the dependency tree of the Job List
- Parameters:
statusChange (List of strings) – List of changes in the list, supplied in set status
nocolor (Boolean) – True if the result should not include color codes
existingList (List of Job Objects) – External List of Jobs that will be printed, this excludes the inner list of jobs.
- Returns:
String representation
- Return type:
String
- remove_rerun_only_jobs(notransitive=False)#
Removes all jobs that are only run in reruns
- rerun(job_list_unparsed, as_conf, monitor=False)#
Updates the job list to rerun the jobs specified by a job list
- Parameters:
job_list_unparsed (str) – list of jobs to rerun
as_conf (AutosubmitConfig) – experiment configuration
monitor (bool) – if True, the job list will be monitored
- static retrieve_packages(BasicConfig, expid, current_jobs=None)#
Retrieves dictionaries that map the collection of packages in the experiment
- Parameters:
BasicConfig (Configuration Object) – Basic configuration
expid (String) – Experiment ID
current_jobs (list) – list of names of current jobs
- Returns:
job to package, package to job, package to package_id, package to symbol
- Return type:
Dictionary(Job Object, Package), Dictionary(Package, List of Job Objects), Dictionary(String, String), Dictionary(String, String)
- static retrieve_times(status_code, name, tmp_path, make_exception=False, job_times=None, seconds=False, job_data_collection=None)#
Retrieve job timestamps from the database.
- Parameters:
job_data_collection –
seconds –
status_code (Integer) – code of the status of the job
name (String) – name of the job
tmp_path (String) – path to the tmp folder of the experiment
make_exception (Boolean) – flag for testing purposes
job_times (Dictionary Key: job name, Value: 5-tuple (submit time, start time, finish time, status, detail id)) – detail from as_times.job_times for the experiment
- Returns:
minutes the job has been queuing, minutes the job has been running, and the text that represents it
- Return type:
int, int, str
- save()#
Persists the job list
- sort_by_id()#
Returns a list of jobs sorted by id
- Returns:
jobs sorted by ID
- Return type:
list
- sort_by_name()#
Returns a list of jobs sorted by name
- Returns:
jobs sorted by name
- Return type:
list
- sort_by_status()#
Returns a list of jobs sorted by status
- Returns:
job sorted by status
- Return type:
list
- sort_by_type()#
Returns a list of jobs sorted by type
- Returns:
job sorted by type
- Return type:
list
- split_by_platform()#
Splits the job list by platform name
- Returns:
job list per platform
- Return type:
dict
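The grouping can be sketched as follows; the platform_name attribute is a hypothetical stand-in for however the real Job exposes its platform:

```python
from types import SimpleNamespace

def split_by_platform(jobs):
    """Group jobs into a dict keyed by platform name."""
    per_platform = {}
    for job in jobs:
        per_platform.setdefault(job.platform_name, []).append(job)
    return per_platform

# Demo with lightweight stand-ins for Job objects.
jobs = [SimpleNamespace(name="a", platform_name="marenostrum"),
        SimpleNamespace(name="b", platform_name="local"),
        SimpleNamespace(name="c", platform_name="marenostrum")]
groups = split_by_platform(jobs)
```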
- update_from_file(store_change=True)#
Updates the job list on the fly from an update file
- Parameters:
store_change – if True, renames the update file to avoid reloading it at the next iteration
- update_genealogy()#
When the job list has been created, every type of job exists. Updates the genealogy and removes jobs that have no template.
- update_list(as_conf: AutosubmitConfig, store_change: bool = True, fromSetStatus: bool = False, submitter: Optional[object] = None, first_time: bool = False) bool #
Updates job list, resetting failed jobs and changing to READY all WAITING jobs with all parents COMPLETED
- Parameters:
first_time –
submitter –
fromSetStatus –
store_change –
as_conf (AutosubmitConfig) – autosubmit config object
- Returns:
True if job status were modified, False otherwise
- Return type:
bool
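The core promotion rule (WAITING jobs become READY once all parents are COMPLETED) can be sketched on a plain dict of jobs (a hypothetical structure, not the real JobList):

```python
def promote_waiting_jobs(jobs):
    """Set WAITING jobs to READY when all their parents are COMPLETED.
    `jobs` maps name -> {'status': str, 'parents': list of names}.
    Returns True if any job status was modified."""
    changed = False
    for job in jobs.values():
        if job["status"] == "WAITING" and all(
                jobs[p]["status"] == "COMPLETED" for p in job["parents"]):
            job["status"] = "READY"
            changed = True
    return changed
```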
- update_log_status(job, as_conf)#
Updates the log err and log out.