API Reference

Flow

Bases: StateModel

Flow class represents a sequence of jobs to be executed in a defined order.

Attributes:
  • name (str) –

    The name of the flow.

  • jobs (List[Job]) –

    A list of jobs to be executed in the flow.

  • env (dict) –

    A dictionary containing environment variables for the flow.

  • context (dict) –

    A dictionary containing context information for the flow.

  • workspace (str) –

    The workspace directory for the flow.

  • _start (datetime) –

    The start time of the flow execution.

  • _end (datetime) –

    The end time of the flow execution.

Properties

jobs_count (int): Returns the number of jobs in the flow.

exec_time: Returns the elapsed execution time of the flow.

Methods:

  • next_job – Yields the index and job for each job in the flow.

  • execute – Executes each job in order; logs an error and stops the flow if any job fails.

  • summary – Yields a summary line for each action in each job of the flow.

  • load(raw: str) -> dict – Loads flow data from a YAML string and returns a dictionary containing the parsed flow data.

  • from_file(filepath: str) -> "Flow" – Creates a Flow instance from a file containing the flow data.

  • from_string(raw: str) -> "Flow" – Creates a Flow instance from a raw string containing the flow data.

  • load_all_actions – Loads all action modules from the 'actionflow.actions' package.

Source code in actionflow/core.py
class Flow(StateModel):
    """
    Flow class represents a sequence of jobs to be executed in a defined order.

    Attributes:
        name (str): The name of the flow.
        jobs (List[Job]): A list of jobs to be executed in the flow.
        env (dict): A dictionary containing environment variables for the flow.
        context (dict): A dictionary containing context information for the flow.
        workspace (str): The workspace directory for the flow.
        _start (datetime): The start time of the flow execution.
        _end (datetime): The end time of the flow execution.

    Properties:
        jobs_count (int): Returns the number of jobs in the flow.

    Methods:
        next_job() -> Generator[Tuple[int, Job], None, None]:
            Yields the index and job for each job in the flow.

        execute():
            Executes each job in order; logs an error and stops if any job fails.

        summary():
            Yields a summary line for each action in each job of the flow.

        load(raw: str) -> dict:
            Loads flow data from a YAML string and returns a dictionary containing the parsed flow data.

        from_file(filepath: str) -> "Flow":
            Creates a Flow instance from a file containing the flow data.

        from_string(raw: str) -> "Flow":
            Creates a Flow instance from a raw string containing the flow data.

        load_all_actions():
            Loads all action modules from the 'actionflow.actions' package.

    """

    name: str
    jobs: List[Job]
    env: dict = {}
    context: dict = {}
    workspace: str

    _start: datetime = None
    _end: datetime = None

    @property
    def jobs_count(self):
        return len(self.jobs)

    @property
    def exec_time(self) -> float:
        return self._end - self._start

    @staticmethod
    def get_available_actions() -> List[str]:
        return sorted(Action.list())

    def init_workspace(self):
        Path(self.workspace).mkdir(parents=True, exist_ok=True)

    def next_job(self) -> Generator[Tuple[int, Job], None, None]:
        for index, job in enumerate(self.jobs, start=1):
            yield index, job

    def execute(self):
        """
        Executes the flow by starting the machine, executing each job, and handling success or failure.

        The method performs the following steps:
        1. Starts the machine.
        2. Prints a message indicating the start of execution.
        3. Iterates over the jobs and executes each one.
        4. Checks the state of each job's machine. If any job fails (state is not "success"), raises an exception.
        5. If all jobs succeed, completes the machine.

        If an exception occurs during execution:
        - Fails the machine.
        - Prints an error message with the exception details.

        Raises:
            Exception: If any job fails during execution.
        """

        self._start = datetime.now()

        self.init_workspace()

        _logger.info("*" * 50)
        try:
            _logger.info(f"[Flow] Starting execution... ({self.jobs_count} jobs)")
            self.machine.start()

            for index, job in self.next_job():
                job.execute()
                if job.machine.state != "success":
                    _logger.error(f"Job {index}/{self.jobs_count} {job.name} failed.")
                    return

        except Exception as error:
            _logger.error(f"[Flow] Failed with error: {error}")
            self.machine.fail()
            return

        self._end = datetime.now()
        self.machine.complete()

    def summary(self) -> Generator[str, None, None]:
        for job_index, job in self.next_job():
            yield f"Job {job_index}: {job.name}"
            for group_index, group in job.next_group():
                # yield f"Group {group_index}"
                for action_index, action in group.next_action():
                    yield f"Action {action_index}: {action.name} -> {action._exec_time:.5f} ({action.machine.state})"

        yield f"Total execution time: {self.exec_time}"

    @staticmethod
    def load(raw: str) -> dict:
        """
        Load flow data from a YAML string.

        This function takes a YAML string as input, parses it, and returns a dictionary
        containing the flow data. It processes the environment variables and jobs defined
        in the YAML string and structures them into a new dictionary format.

        Args:
            raw (str): A string containing the YAML data.

        Returns:
            dict: A dictionary containing the parsed flow data with keys 'name', 'jobs', 'env', and 'context'.
        """

        data = yaml.safe_load(raw)
        env = data.get("env", {})
        parsed_data = parse_yaml(raw, env)

        jobs = [
            {
                "name": k,
                "steps": v["steps"],
            }
            for k, v in parsed_data["jobs"].items()
        ]

        context = Context()
        context.initialize(parsed_data.get("context", {}))

        return {
            "name": parsed_data["name"],
            "jobs": jobs,
            "env": env,
            "workspace": parsed_data.get("context", {}).get("workspace", ""),
            "context": parsed_data.get("context", {}),
        }

    @classmethod
    def from_file(cls, filepath: str) -> "Flow":
        """
        Create a Flow instance from a file.

        Args:
            filepath (str): The path to the file containing the flow data.

        Returns:
            Flow: An instance of the Flow class created from the file data.

        Raises:
            FileNotFoundError: If the file at the specified path does not exist.
            IOError: If there is an error reading the file.
            ValidationError: If the data in the file is not valid for creating a Flow instance.
        """
        with open(filepath, "r") as file:
            raw = file.read()
            data = cls.load(raw)

        return cls.model_validate(data)

    @classmethod
    def from_string(cls, raw: str) -> "Flow":
        """
        Create a Flow instance from a raw string.

        Args:
            raw (str): The raw string to be parsed and converted into a Flow instance.

        Returns:
            Flow: An instance of the Flow class created from the parsed data.
        """
        data = cls.load(raw)
        return cls.model_validate(data)

    @staticmethod
    def load_all_actions(package_name: str = "actionflow.actions") -> None:
        package = importlib.import_module(package_name)
        for _, module_name, is_pkg in pkgutil.iter_modules(package.__path__):
            if not is_pkg:
                importlib.import_module(f"{package_name}.{module_name}")

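A flow is typically written as a YAML document and handed to from_string or from_file. The sketch below is inferred from the fields that load and preprocess_data read (name, env, context.workspace, and jobs.<job>.steps with name/with); the job name "prepare", the TAG variable, the workspace path, and the image coordinates are placeholders, while "pull" is the real action documented below.

```yaml
name: example-flow
env:
  TAG: latest
context:
  workspace: ./workspace
jobs:
  prepare:
    steps:
      - name: pull
        with:
          images:
            - repository: alpine
              tag: latest
```

Such a document can then be executed with Flow.from_file(path).execute() or Flow.from_string(raw).execute().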
execute()

Executes the flow by starting the machine, executing each job, and handling success or failure.

The method performs the following steps:

1. Starts the machine.
2. Logs a message indicating the start of execution.
3. Iterates over the jobs and executes each one.
4. Checks the state of each job's machine; if any job fails (state is not "success"), logs an error and stops.
5. If all jobs succeed, completes the machine.

If an exception occurs during execution:

- Fails the machine.
- Logs an error message with the exception details.

Source code in actionflow/core.py
def execute(self):
    """
    Executes the flow by starting the machine, executing each job, and handling success or failure.

    The method performs the following steps:
    1. Starts the machine.
    2. Prints a message indicating the start of execution.
    3. Iterates over the jobs and executes each one.
    4. Checks the state of each job's machine. If any job fails (state is not "success"), raises an exception.
    5. If all jobs succeed, completes the machine.

    If an exception occurs during execution:
    - Fails the machine.
    - Prints an error message with the exception details.

    Raises:
        Exception: If any job fails during execution.
    """

    self._start = datetime.now()

    self.init_workspace()

    _logger.info("*" * 50)
    try:
        _logger.info(f"[Flow] Starting execution... ({self.jobs_count} jobs)")
        self.machine.start()

        for index, job in self.next_job():
            job.execute()
            if job.machine.state != "success":
                _logger.error(f"Job {index}/{self.jobs_count} {job.name} failed.")
                return

    except Exception as error:
        _logger.error(f"[Flow] Failed with error: {error}")
        self.machine.fail()
        return

    self._end = datetime.now()
    self.machine.complete()

from_file(filepath) classmethod

Create a Flow instance from a file.

Parameters:
  • filepath (str) –

    The path to the file containing the flow data.

Returns:
  • Flow –

    An instance of the Flow class created from the file data.

Raises:
  • FileNotFoundError

    If the file at the specified path does not exist.

  • IOError

    If there is an error reading the file.

  • ValidationError

    If the data in the file is not valid for creating a Flow instance.

Source code in actionflow/core.py
@classmethod
def from_file(cls, filepath: str) -> "Flow":
    """
    Create a Flow instance from a file.

    Args:
        filepath (str): The path to the file containing the flow data.

    Returns:
        Flow: An instance of the Flow class created from the file data.

    Raises:
        FileNotFoundError: If the file at the specified path does not exist.
        IOError: If there is an error reading the file.
        ValidationError: If the data in the file is not valid for creating a Flow instance.
    """
    with open(filepath, "r") as file:
        raw = file.read()
        data = cls.load(raw)

    return cls.model_validate(data)

from_string(raw) classmethod

Create a Flow instance from a raw string.

Parameters:
  • raw (str) –

    The raw string to be parsed and converted into a Flow instance.

Returns:
  • Flow –

    An instance of the Flow class created from the parsed data.

Source code in actionflow/core.py
@classmethod
def from_string(cls, raw: str) -> "Flow":
    """
    Create a Flow instance from a raw string.

    Args:
        raw (str): The raw string to be parsed and converted into a Flow instance.

    Returns:
        Flow: An instance of the Flow class created from the parsed data.
    """
    data = cls.load(raw)
    return cls.model_validate(data)

load(raw) staticmethod

Load flow data from a YAML string.

This function takes a YAML string as input, parses it, and returns a dictionary containing the flow data. It processes the environment variables and jobs defined in the YAML string and structures them into a new dictionary format.

Parameters:
  • raw (str) –

    A string containing the YAML data.

Returns:
  • dict –

    A dictionary containing the parsed flow data with keys 'name', 'jobs', 'env', and 'context'.

Source code in actionflow/core.py
@staticmethod
def load(raw: str) -> dict:
    """
    Load flow data from a YAML string.

    This function takes a YAML string as input, parses it, and returns a dictionary
    containing the flow data. It processes the environment variables and jobs defined
    in the YAML string and structures them into a new dictionary format.

    Args:
        raw (str): A string containing the YAML data.

    Returns:
        dict: A dictionary containing the parsed flow data with keys 'name', 'jobs', 'env', and 'context'.
    """

    data = yaml.safe_load(raw)
    env = data.get("env", {})
    parsed_data = parse_yaml(raw, env)

    jobs = [
        {
            "name": k,
            "steps": v["steps"],
        }
        for k, v in parsed_data["jobs"].items()
    ]

    context = Context()
    context.initialize(parsed_data.get("context", {}))

    return {
        "name": parsed_data["name"],
        "jobs": jobs,
        "env": env,
        "workspace": parsed_data.get("context", {}).get("workspace", ""),
        "context": parsed_data.get("context", {}),
    }

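The reshaping that load applies to the parsed jobs can be shown in isolation. Here parsed_data is a hand-written stand-in for the output of parse_yaml, and "shell" is an invented action name used purely for illustration.

```python
# Stand-in for the dict parse_yaml would return for a two-job flow.
parsed_data = {
    "name": "example-flow",
    "jobs": {
        "build": {"steps": [{"name": "shell", "with": {"cmd": "make"}}]},
        "deploy": {"steps": [{"name": "shell", "with": {"cmd": "make install"}}]},
    },
}

# Same comprehension as Flow.load: the jobs mapping becomes a list of
# {"name", "steps"} dicts, preserving the YAML's insertion order.
jobs = [{"name": k, "steps": v["steps"]} for k, v in parsed_data["jobs"].items()]
```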
Group

Bases: StateModel

A class representing a group of actions to be executed in parallel.

Attributes:
  • actions (List[Action]) –

    A list of actions to be executed.

Methods:

  • next_action – Yields the index and action for each action in the group.

  • execute – Executes all actions in the group in parallel. If any action fails, the group's state is set to 'fail'; if all actions succeed, it is set to 'complete'.

Source code in actionflow/jobs.py
class Group(StateModel):
    """
    A class representing a group of actions to be executed in parallel.

    Attributes:
        actions (List[Action]): A list of actions to be executed.

    Methods:
        next_action() -> Generator[Tuple[int, Action], None, None]:
            Yields the index and action for each action in the group.

        execute():
            Executes all actions in the group in parallel. If any action fails,
            the group's state is set to 'fail'. If all actions succeed, the group's
            state is set to 'complete'.
    """

    actions: List[Action]

    def next_action(self) -> Generator[Tuple[int, Action], None, None]:
        for index, action in enumerate(self.actions, start=1):
            yield index, action

    def execute(self):
        try:
            # _logger.info("[Group] Executing actions in parallel...")
            self.machine.start()

            with ThreadPoolExecutor() as executor:
                futures = [executor.submit(action.execute) for action in self.actions]
                for future in futures:
                    # Wait for all actions to complete
                    future.result()

            if not all(action.machine.state == "success" for action in self.actions):
                self.machine.fail()
                return

        except Exception as e:
            self.machine.fail()
            _logger.info(f"[Group] Failed with error: {e}")
            return

        # _logger.info("[Group] All actions completed successfully.")
        self.machine.complete()

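Group.execute fans its actions out over a ThreadPoolExecutor and only completes if every action ended in the "success" state. Stripped of the state machine, the same pattern looks like this (run_parallel and its callables are illustrative, not part of actionflow):

```python
from concurrent.futures import ThreadPoolExecutor

def run_parallel(tasks):
    """Run callables in parallel; report True only if all of them return True."""
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(task) for task in tasks]
        # Like Group.execute, wait for every task before judging the outcome.
        results = [future.result() for future in futures]
    return all(results)
```

As in Group.execute, every future is awaited before the overall verdict, so one failing action does not cancel its siblings mid-flight.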
Job

Bases: StateModel

Represents a job consisting of a series of actions to be executed.

Attributes:
  • name (str) –

    The name of the job.

  • steps (List[Action]) –

    A list of actions to be executed as part of the job.

Methods:

  • preprocess_data – Replaces steps with action instances created from the registry.

  • length – Returns the number of steps in the job.

  • grouped – Groups the steps by the "concurrency" attribute.

  • set_indexes(index: int) -> None – Sets unique indexes for each action in the job.

  • next_group – Yields the next group of actions to be executed.

  • execute – Executes the job by running each group of actions in sequence, handling state transitions and logging.

Source code in actionflow/jobs.py
class Job(StateModel):
    """
    Represents a job consisting of a series of actions to be executed.

    Attributes:
        name (str): The name of the job.
        steps (List[Action]): A list of actions to be executed as part of the job.

    Methods:
        preprocess_data(cls, values):
            Replaces steps with action instances created from the registry.

        length() -> int:
            Returns the number of steps in the job.

        grouped() -> List[List[Action]]:
            Groups the steps by the "concurrency" attribute.

        set_indexes(index: int) -> None:
            Sets unique indexes for each action in the job.

        next_group() -> Generator[Tuple[int, Group], None, None]:
            Yields the next group of actions to be executed.

        execute():
            Executes the job by running all actions in sequence, handling state transitions and logging.
    """

    name: str
    steps: List[Action]

    @model_validator(mode="before")
    def preprocess_data(cls, values):
        """
        Replace steps with action instances created from the registry
        """
        steps = []

        for step in values["steps"]:
            name = step.pop("name")
            params = step.pop("with", {})
            try:
                action = Action.by_name(name, **params)
            except ActionNotFound:
                _logger.error(f"Action not found: {name}")
                exit(1)
            steps.append(action)

        values["steps"] = steps
        return values

    @property
    def length(self) -> int:
        return len(self.steps)

    @property
    def grouped(self) -> List[List[Action]]:
        return group_by(self.steps, "concurrency")

    def set_indexes(self, index: int) -> None:
        for group_index, group in enumerate(self.grouped, start=1):
            for action_index, action in enumerate(group, start=1):
                action._id = (
                    f"{index}_{self.name}_{group_index}_{action_index}_{action.name}"
                )

    def next_group(self) -> Generator[Tuple[int, Group], None, None]:
        for index, actions in enumerate(self.grouped, start=1):
            group = Group(actions=actions)
            yield index, group

    def execute(self):
        try:
            self.machine.start()
            _logger.info(f"[Job: {self.name}] Starting execution...")

            total = len(self.grouped)
            for index, group in self.next_group():
                _logger.info(
                    f"[Group {index}/{total}] Executing actions in parallel..."
                )

                group.execute()
                if group.machine.state != "success":
                    self.machine.fail()
                    return

                _logger.info(
                    f"[Group {index}/{total}] All actions completed successfully."
                )

            _logger.info(f"[Job: {self.name}] Completed successfully.")

        except Exception as e:
            _logger.info(f"[Job: {self.name}] Failed with error: {e}")
            self.machine.fail()
            return

        self.machine.complete()

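The grouped property delegates to a group_by helper whose implementation is not shown here. One plausible reading of its semantics (consecutive steps flagged concurrency=True form a single parallel group, every other step runs alone) can be sketched with itertools.groupby; dicts stand in for Action objects, and this is an assumption about group_by, not its actual code.

```python
from itertools import groupby

def group_by_concurrency(steps):
    """Sketch: consecutive steps with concurrency=True form one parallel
    group; every other step becomes a group of its own."""
    groups = []
    for concurrent, run in groupby(steps, key=lambda s: s["concurrency"]):
        run = list(run)
        if concurrent:
            groups.append(run)                    # run these together
        else:
            groups.extend([s] for s in run)       # one group per sequential step
    return groups

steps = [
    {"name": "a", "concurrency": False},
    {"name": "b", "concurrency": True},
    {"name": "c", "concurrency": True},
    {"name": "d", "concurrency": False},
]
```

Under this reading, the steps above yield three groups: "a" alone, "b" and "c" together, then "d" alone.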
preprocess_data(values)

Replace steps with action instances created from the registry

Source code in actionflow/jobs.py
@model_validator(mode="before")
def preprocess_data(cls, values):
    """
    Replace steps with action instances created from the registry
    """
    steps = []

    for step in values["steps"]:
        name = step.pop("name")
        params = step.pop("with", {})
        try:
            action = Action.by_name(name, **params)
        except ActionNotFound:
            _logger.error(f"Action not found: {name}")
            exit(1)
        steps.append(action)

    values["steps"] = steps
    return values

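The transformation preprocess_data applies to each raw step can be demonstrated with a stub registry standing in for Action.by_name; the Echo class and the "echo" action name are invented for illustration.

```python
class Echo:
    """Stub action: just records the parameters it was built with."""
    def __init__(self, **params):
        self.params = params

REGISTRY = {"echo": Echo}  # stand-in for Action._subclasses

def build_steps(raw_steps):
    """Mirror preprocess_data: pop 'name' and 'with' from each step dict,
    look the action class up, and instantiate it with the parameters."""
    steps = []
    for step in raw_steps:
        name = step.pop("name")
        params = step.pop("with", {})
        steps.append(REGISTRY[name](**params))
    return steps
```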
Action

Bases: BaseAction, StateModel

Source code in actionflow/action.py
class Action(BaseAction, StateModel):
    _id: str
    _exec_time: float
    name: str = None
    description: str
    concurrency: bool = False
    retry: int = 1
    skip: bool = False
    continue_on_error: bool = False

    shared_resources: SharedResources = SharedResources()

    @log_execution("_exec_time")
    def run(self) -> bool:
        """Run the action with retry logic"""
        try:
            while self.retry > 0:
                # Check if the action should be skipped
                if self.skip and self._check():
                    _logger.info(f"[Action: {self.name}] already satisfied, skipping.")
                    return True

                self._pre_process()

                if self._run():  # If the action succeeds, stop retrying
                    _logger.info(f"[Action: {self.name}] completed successfully.")
                    self._post_process()
                    return self._check()
                if self.continue_on_error:
                    _logger.warning(
                        f"[Action: {self.name}] Error occurred, continuing despite failure."
                    )
                    self._post_process()
                    return True
                self.retry -= 1
                _logger.warning(
                    f"[Action: {self.name}] Retrying, attempts left: {self.retry}"
                )
        except Exception as error:
            _logger.error(f"[Action: {self.name}] Error: {error}")
            raise

        return False

    def execute(self):
        """Unified execution pipeline."""
        self.machine.start()
        try:
            self.machine.complete() if self.run() else self.machine.fail()
        except Exception as error:
            _logger.error(f"Error executing action {self.name}: {error}")
            self.machine.fail()

    def summary(self):
        """Summary of the action"""
        return {
            "name": self.name,
            "state": self.machine.state,
            "exec": self._exec_time,
        }

execute()

Unified execution pipeline.

Source code in actionflow/action.py
def execute(self):
    """Unified execution pipeline."""
    self.machine.start()
    try:
        self.machine.complete() if self.run() else self.machine.fail()
    except Exception as error:
        _logger.error(f"Error executing action {self.name}: {error}")
        self.machine.fail()

run()

Run the action with retry logic

Source code in actionflow/action.py
@log_execution("_exec_time")
def run(self) -> bool:
    """Run the action with retry logic"""
    try:
        while self.retry > 0:
            # Check if the action should be skipped
            if self.skip and self._check():
                _logger.info(f"[Action: {self.name}] already satisfied, skipping.")
                return True

            self._pre_process()

            if self._run():  # If the action succeeds, stop retrying
                _logger.info(f"[Action: {self.name}] completed successfully.")
                self._post_process()
                return self._check()
            if self.continue_on_error:
                _logger.warning(
                    f"[Action: {self.name}] Error occurred, continuing despite failure."
                )
                self._post_process()
                return True
            self.retry -= 1
            _logger.warning(
                f"[Action: {self.name}] Retrying, attempts left: {self.retry}"
            )
    except Exception as error:
        _logger.error(f"[Action: {self.name}] Error: {error}")
        raise

    return False

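The core of run is its retry budget: attempt the action until it succeeds or the budget is exhausted. Reduced to that loop (run_with_retry and attempt are illustrative names, not actionflow API):

```python
def run_with_retry(attempt, retries=3):
    """Sketch of Action.run's retry loop: call `attempt` until it returns
    True or the retry budget is exhausted."""
    while retries > 0:
        if attempt():
            return True
        retries -= 1
    return False
```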
summary()

Summary of the action

Source code in actionflow/action.py
def summary(self):
    """Summary of the action"""
    return {
        "name": self.name,
        "state": self.machine.state,
        "exec": self._exec_time,
    }

BaseAction

Bases: ABC

Source code in actionflow/action.py
class BaseAction(ABC):
    name: str
    _context: Context = Context()
    _subclasses: dict = {}

    @classmethod
    def list(cls):
        """List subclasses"""
        return sorted(list(cls._subclasses.keys()))

    @classmethod
    def by_name(cls, name, **kwargs):
        """Get subclass by name"""
        try:
            return cls._subclasses[name](**kwargs)
        except KeyError:
            raise ActionNotFound(f"'{name}' not found")

    def set_context(self, context: BaseModel):
        """Set context for the action"""
        self.context = context

    @abstractmethod
    def _run(self):
        """Method to be implemented by subclasses"""

    def _check(self) -> bool:
        """Method to be implemented by subclasses"""
        return True

    def _pre_process(self):
        """Method to be implemented by subclasses"""

    def _post_process(self):
        """Method to be implemented by subclasses"""

    def __init_subclass__(cls):
        if cls.name is not None:
            BaseAction._subclasses[cls.name] = cls

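The subclass registry built in __init_subclass__ is a small, reusable pattern: any subclass that declares a name registers itself at class-definition time. Stripped of the actionflow specifics (Base and Hello are illustrative names):

```python
class Base:
    name = None
    _subclasses = {}

    def __init_subclass__(cls):
        # Register every named subclass at class-definition time,
        # exactly as BaseAction does.
        if cls.name is not None:
            Base._subclasses[cls.name] = cls

    @classmethod
    def by_name(cls, name, **kwargs):
        return cls._subclasses[name](**kwargs)

class Hello(Base):
    name = "hello"
```

Simply importing a module that defines such a subclass is enough to register it, which is why load_all_actions only needs to import the modules under 'actionflow.actions'.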
by_name(name, **kwargs) classmethod

Get subclass by name

Source code in actionflow/action.py
@classmethod
def by_name(cls, name, **kwargs):
    """Get subclass by name"""
    try:
        return cls._subclasses[name](**kwargs)
    except KeyError:
        raise ActionNotFound(f"'{name}' not found")

list() classmethod

List subclasses

Source code in actionflow/action.py
@classmethod
def list(cls):
    """List subclasses"""
    return sorted(list(cls._subclasses.keys()))

set_context(context)

Set context for the action

Source code in actionflow/action.py
def set_context(self, context: BaseModel):
    """Set context for the action"""
    self.context = context

Actions

Pull

Bases: Action

Action to pull Docker images.

Attributes:
  • name (str) –

    The name of the action, default is "pull".

  • description (str) –

    A brief description of the action, default is "Pull docker images".

  • registry (str) –

    The Docker registry to pull images from.

  • login (str) –

    The login username for the Docker registry.

  • password (str) –

    The login password for the Docker registry.

  • images (List[ImageSchema]) –

    A list of images to be pulled, defined by the ImageSchema.

Methods:

  • _get_credentials – Retrieves the credentials for Docker registry authentication.

  • _check – Checks if all specified images are already present in the local Docker client.

  • _run – Pulls the specified images from the Docker registry if they are not already present.

Source code in actionflow/actions/container.py
class Pull(Action):
    """
    Action to pull Docker images.

    Attributes:
        name (str): The name of the action, default is "pull".
        description (str): A brief description of the action, default is "Pull docker images".
        registry (str): The Docker registry to pull images from.
        login (str): The login username for the Docker registry.
        password (str): The login password for the Docker registry.
        images (List[ImageSchema]): A list of images to be pulled, defined by the ImageSchema.

    Methods:
        _get_credentials() -> dict:
            Retrieves the credentials for Docker registry authentication.

        _check() -> bool:
            Checks if all specified images are already present in the local Docker client.

        _run() -> bool:
            Pulls the specified images from the Docker registry if they are not already present.
    """

    name: str = "pull"
    description: str = "Pull docker images"

    registry: str = None
    login: str = None
    password: str = None
    images: List[ImageSchema]

    def _get_credentials(self) -> dict:
        vals = {"registry": self.registry} if self.registry else {}
        if self.login and self.password:
            vals.update({"username": self.login, "password": self.password})

        return {"auth_config": vals}

    def _check(self) -> bool:
        try:
            return all(client.images.get(image.name) for image in self.images)
        except docker.errors.ImageNotFound:
            return False

    def _run(self) -> bool:
        for image in self.images:
            try:
                client.images.get(image.name)
            except docker.errors.ImageNotFound:
                _logger.info(f"Image {image.name} not found")

                client.images.pull(repository=image.repository, tag=image.tag)
                _logger.info(f"Image {image.name} pulled")
            else:
                _logger.info(f"Image {image.name} already exists")
        return True
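In a flow definition, a pull step might look like the following. The registry, login, and image values are placeholders; the field names (registry, login, password, images with repository/tag) come from the Pull attributes and ImageSchema usage above, and whether ${...}-style substitution of the password is supported depends on how parse_yaml handles env values.

```yaml
steps:
  - name: pull
    with:
      registry: registry.example.com
      login: ci-user
      password: ${REGISTRY_PASSWORD}
      images:
        - repository: registry.example.com/app
          tag: "1.0"
```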