操作符

使用 GithubOperatorGitHub 中执行操作。

您可以使用 GithubOperator 并通过传递顶级 PyGithub 方法中的 github_methodgithub_method_args 来构建自己的操作符。

您可以进一步使用 result_processor 可调用对象根据需要处理结果。

列出用户拥有的所有仓库的示例,client.get_user().get_repos() 可以按如下方式实现

tests/system/github/example_github.py


github_list_repos = GithubOperator(
    task_id="github_list_repos",
    github_method="get_user",
    result_processor=lambda user: logger.info(list(user.get_repos())),
)

列出仓库中标签的示例,client.get_repo(full_name_or_id=’apache/airflow’).get_tags() 可以按如下方式实现

tests/system/github/example_github.py


list_repo_tags = GithubOperator(
    task_id="list_repo_tags",
    github_method="get_repo",
    github_method_args={"full_name_or_id": "apache/airflow"},
    result_processor=lambda repo: logger.info(list(repo.get_tags())),
)

传感器

您可以使用 GithubSensor 构建自己的传感器,

您还可以使用 BaseGithubRepositorySensor 在仓库上实现自己的传感器,一个示例是 GithubTagSensor

使用 GithubTagSensor 等待在 GitHub 中创建标签。

标签 v1.0 的示例

tests/system/github/example_github.py


tag_sensor = GithubTagSensor(
    task_id="example_tag_sensor",
    tag_name="v1.0",
    repository_name="apache/airflow",
    timeout=60,
    poke_interval=10,
)

通过直接使用 GithubSensor 可以实现类似的功能。

tests/system/github/example_github.py


def tag_checker(repo: Any, tag_name: str) -> bool | None:
    result = None
    try:
        if repo is not None and tag_name is not None:
            all_tags = [x.name for x in repo.get_tags()]
            result = tag_name in all_tags

    except GithubException as github_error:  # type: ignore[misc]
        raise AirflowException(f"Failed to execute GithubSensor, error: {github_error}")
    except Exception as e:
        raise AirflowException(f"GitHub operator error: {e}")
    return result

github_sensor = GithubSensor(
    task_id="example_sensor",
    method_name="get_repo",
    method_params={"full_name_or_id": "apache/airflow"},
    result_processor=lambda repo: tag_checker(repo, "v1.0"),
    timeout=60,
    poke_interval=10,
)

此条目有帮助吗?