
Comments (3)

lopezvit commented on June 2, 2024

Sorry, I'm not a collaborator, so I cannot assign anybody... maybe @eladkal could?


lopezvit commented on June 2, 2024

I have created this extended version, and it seems to be working correctly. Sorry, I don't have time to create a PR:

from __future__ import annotations

from airflow.configuration import conf
from airflow.providers.http.operators.http import HttpOperator
from airflow.utils.context import Context
from requests import Response
from requests.auth import AuthBase
from typing import Any, Callable


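class RetryHttpOperator(HttpOperator):
    """HttpOperator variant that can retry requests in non-deferrable mode.

    When ``retry_args`` is non-empty, requests go through
    ``HttpHook.run_with_advanced_retry``, which passes those arguments to
    ``tenacity.Retrying``; otherwise plain ``HttpHook.run`` is used.
    """
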
    def __init__(self, *, endpoint: str | None = None, method: str = "POST", data: dict[str, Any] | str | None = None,
                 headers: dict[str, str] | None = None, pagination_function: Callable[..., Any] | None = None,
                 response_check: Callable[..., bool] | None = None, response_filter: Callable[..., Any] | None = None,
                 extra_options: dict[str, Any] | None = None, http_conn_id: str = "http_default",
                 log_response: bool = False, auth_type: type[AuthBase] | None = None, tcp_keep_alive: bool = True,
                 tcp_keep_alive_idle: int = 120, tcp_keep_alive_count: int = 20, tcp_keep_alive_interval: int = 30,
                 deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
                 retry_args: dict[Any, Any] | None = None,
                 **kwargs: Any) -> None:
        super().__init__(endpoint=endpoint, method=method, data=data, headers=headers,
                         pagination_function=pagination_function, response_check=response_check,
                         response_filter=response_filter, extra_options=extra_options, http_conn_id=http_conn_id,
                         log_response=log_response, auth_type=auth_type, tcp_keep_alive=tcp_keep_alive,
                         tcp_keep_alive_idle=tcp_keep_alive_idle, tcp_keep_alive_count=tcp_keep_alive_count,
                         tcp_keep_alive_interval=tcp_keep_alive_interval, deferrable=deferrable, **kwargs)
        self._retry_args = retry_args

    def execute_sync(self, context: Context) -> Any:
        self.log.info("Calling HTTP method")
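        if self._retry_args:
            # Retry path: the hook wraps run() with the given retry arguments.
            response = self.hook.run_with_advanced_retry(self._retry_args, self.endpoint, self.data, self.headers,
                                                         self.extra_options)
        else:
            # No retry arguments: same single call as the stock HttpOperator.
            response = self.hook.run(self.endpoint, self.data, self.headers, self.extra_options)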
        response = self.paginate_sync(response=response)
        return self.process_response(context=context, response=response)

    def paginate_sync(self, response: Response) -> Response | list[Response]:
        if not self.pagination_function:
            return response

        all_responses = [response]
        while True:
            next_page_params = self.pagination_function(response)
            if not next_page_params:
                break
            if self._retry_args:
                response = self.hook.run_with_advanced_retry(self._retry_args,
                                                             **self._merge_next_page_parameters(next_page_params))
            else:
                response = self.hook.run(**self._merge_next_page_parameters(next_page_params))
            all_responses.append(response)
        return all_responses
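
The branching used in execute_sync and paginate_sync above can be tried without Airflow installed. This is only a minimal sketch: FakeHook, call_endpoint, and the "max_attempts" key are hypothetical names made up for illustration. The real hook hands retry_args to tenacity.Retrying rather than counting attempts itself.

```python
from typing import Any, Dict, Optional


class FakeHook:
    """Hypothetical stand-in for an HTTP hook; it is NOT Airflow's HttpHook."""

    def __init__(self, failures_before_success: int) -> None:
        self.failures_left = failures_before_success
        self.calls = 0

    def run(self, endpoint: str) -> str:
        """Fail with ConnectionError until the simulated failures are used up."""
        self.calls += 1
        if self.failures_left > 0:
            self.failures_left -= 1
            raise ConnectionError("simulated transient failure")
        return f"200 OK from {endpoint}"

    def run_with_advanced_retry(self, retry_args: Dict[str, Any], endpoint: str) -> str:
        """Retry run() up to retry_args["max_attempts"] times (made-up key)."""
        last_error: Optional[Exception] = None
        for _ in range(retry_args["max_attempts"]):
            try:
                return self.run(endpoint)
            except ConnectionError as err:
                last_error = err
        if last_error is None:
            raise ValueError("max_attempts must be at least 1")
        raise last_error


def call_endpoint(hook: FakeHook, endpoint: str,
                  retry_args: Optional[Dict[str, Any]] = None) -> str:
    """Mirror the operator's branching: retry path only when retry_args is truthy."""
    if retry_args:
        return hook.run_with_advanced_retry(retry_args, endpoint)
    return hook.run(endpoint)


# Two simulated failures, three attempts allowed: the third call succeeds.
flaky = FakeHook(failures_before_success=2)
print(call_endpoint(flaky, "/items", retry_args={"max_attempts": 3}))
print(flaky.calls)
```

Without retry_args, the same flaky hook would raise on its first failure, which is the behaviour the extended operator is meant to avoid.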


boraberke commented on June 2, 2024

Hi, I can work on this issue. Could you please assign me?
