ai2_kit.core.queue_system module#

class ai2_kit.core.queue_system.BaseQueueSystem[source]#

Bases: ABC

abstract cancel(job_id: str)[source]#
connector: BaseConnector#
abstract get_job_id_envvar() -> str[source]#
abstract get_job_id_pattern() -> str[source]#
abstract get_job_state(job_id: str, success_indicator_path: str) -> JobState[source]#
get_polling_interval() -> int[source]#
abstract get_script_suffix() -> str[source]#
get_setup_script() -> str[source]#
abstract get_submit_cmd() -> str[source]#
submit(script: str, cwd: str, name: Optional[str] = None, success_indicator: Optional[str] = None)[source]#
class ai2_kit.core.queue_system.Lsf[source]#

Bases: BaseQueueSystem

cancel(job_id: str)[source]#
config: LSF#
get_job_id_envvar() -> str[source]#
get_job_id_pattern()[source]#
get_job_state(job_id: str, success_indicator_path: str) -> JobState[source]#
get_polling_interval()[source]#
get_script_suffix()[source]#
get_submit_cmd()[source]#
class ai2_kit.core.queue_system.PBS[source]#

Bases: BaseQueueSystem

cancel(job_id: str)[source]#
config: PBS#
get_job_id_envvar() -> str[source]#
get_job_id_pattern() -> str[source]#
get_job_state(job_id: str, success_indicator_path: str) -> JobState[source]#
get_script_suffix() -> str[source]#
get_setup_script() -> str[source]#
get_submit_cmd() -> str[source]#
translate_table = {'B': JobState.RUNNING, 'C': JobState.COMPLETED, 'E': JobState.COMPLETED, 'H': JobState.HELD, 'Q': JobState.PENDING, 'R': JobState.RUNNING, 'S': JobState.HELD, 'W': JobState.PENDING}#
class ai2_kit.core.queue_system.QueueJobFuture(queue_system: BaseQueueSystem, job_id: str, script: str, cwd: str, name: str, success_indicator: str, polling_interval=10)[source]#

Bases: JobFuture

cancel()[source]#
done()[source]#
get_job_state()[source]#
is_success()[source]#
resubmit()[source]#
result(timeout: float = inf) -> JobState[source]#
async result_async(timeout: float = inf) -> JobState[source]#

This is not fully asynchronous, because job submission and state polling are still blocking, but it should be good enough to handle thousands of jobs (not yet verified at that scale).

property success_indicator_path#
class ai2_kit.core.queue_system.QueueSystemConfig(*, slurm: Optional[Slurm] = None, lsf: Optional[LSF] = None, pbs: Optional[PBS] = None)[source]#

Bases: BaseModel

class LSF(*, bsub_bin: str = 'bsub', bjobs_bin: str = 'bjobs', polling_interval: int = 10)[source]#

Bases: BaseModel

bjobs_bin: str#
bsub_bin: str#
polling_interval: int#
class PBS(*, qsub_bin: str = 'qsub', qstat_bin: str = 'qstat', qdel_bin: str = 'qdel')[source]#

Bases: BaseModel

qdel_bin: str#
qstat_bin: str#
qsub_bin: str#
class Slurm(*, sbatch_bin: str = 'sbatch', squeue_bin: str = 'squeue', scancel_bin: str = 'scancel', polling_interval: int = 10)[source]#

Bases: BaseModel

polling_interval: int#
sbatch_bin: str#
scancel_bin: str#
squeue_bin: str#
lsf: Optional[LSF]#
pbs: Optional[PBS]#
slurm: Optional[Slurm]#
class ai2_kit.core.queue_system.Slurm[source]#

Bases: BaseQueueSystem

cancel(job_id: str)[source]#
config: Slurm#
get_job_id_envvar() -> str[source]#
get_job_id_pattern()[source]#
get_job_state(job_id: str, success_indicator_path: str) -> JobState[source]#
get_polling_interval()[source]#
get_script_suffix()[source]#
get_submit_cmd()[source]#
translate_table = {'CA': JobState.CANCELLED, 'CD': JobState.COMPLETED, 'CF': JobState.PENDING, 'CG': JobState.RUNNING, 'F': JobState.FAILED, 'NF': JobState.FAILED, 'PD': JobState.PENDING, 'R': JobState.RUNNING, 'RV': JobState.FAILED, 'SE': JobState.FAILED, 'TO': JobState.TIMEOUT}#
ai2_kit.core.queue_system.inject_cmd_to_script(script: str, cmd: str)[source]#

Find the position of the first line that is neither a comment nor empty, and inject the command before it.