Comments (3)
Sorry, I'm not a collaborator, so I cannot assign anybody... maybe @eladkal could?
from airflow.
I have created this extended version, and it seems to be working correctly. Sorry, I don't have time to create a PR:
from __future__ import annotations

from airflow.configuration import conf
from airflow.providers.http.operators.http import HttpOperator
from airflow.utils.context import Context
from requests import Response
from requests.auth import AuthBase
from typing import Any, Callable


class RetryHttpOperator(HttpOperator):
    """HttpOperator variant that routes requests through the hook's advanced retry."""

    def __init__(self, *, endpoint: str | None = None, method: str = "POST", data: dict[str, Any] | str | None = None,
                 headers: dict[str, str] | None = None, pagination_function: Callable[..., Any] | None = None,
                 response_check: Callable[..., bool] | None = None, response_filter: Callable[..., Any] | None = None,
                 extra_options: dict[str, Any] | None = None, http_conn_id: str = "http_default",
                 log_response: bool = False, auth_type: type[AuthBase] | None = None, tcp_keep_alive: bool = True,
                 tcp_keep_alive_idle: int = 120, tcp_keep_alive_count: int = 20, tcp_keep_alive_interval: int = 30,
                 deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
                 retry_args: dict[Any, Any],
                 **kwargs: Any) -> None:
        super().__init__(endpoint=endpoint, method=method, data=data, headers=headers,
                         pagination_function=pagination_function, response_check=response_check,
                         response_filter=response_filter, extra_options=extra_options, http_conn_id=http_conn_id,
                         log_response=log_response, auth_type=auth_type, tcp_keep_alive=tcp_keep_alive,
                         tcp_keep_alive_idle=tcp_keep_alive_idle, tcp_keep_alive_count=tcp_keep_alive_count,
                         tcp_keep_alive_interval=tcp_keep_alive_interval, deferrable=deferrable, **kwargs)
        self._retry_args = retry_args

    def execute_sync(self, context: Context) -> Any:
        self.log.info("Calling HTTP method")
        # Use the retrying call only when retry arguments were supplied.
        if self._retry_args:
            response = self.hook.run_with_advanced_retry(self._retry_args, self.endpoint, self.data, self.headers,
                                                         self.extra_options)
        else:
            response = self.hook.run(self.endpoint, self.data, self.headers, self.extra_options)
        response = self.paginate_sync(response=response)
        return self.process_response(context=context, response=response)

    def paginate_sync(self, response: Response) -> Response | list[Response]:
        if not self.pagination_function:
            return response

        all_responses = [response]
        while True:
            # An empty or None result from the pagination function ends the loop.
            next_page_params = self.pagination_function(response)
            if not next_page_params:
                break
            if self._retry_args:
                response = self.hook.run_with_advanced_retry(self._retry_args,
                                                             **self._merge_next_page_parameters(next_page_params))
            else:
                response = self.hook.run(**self._merge_next_page_parameters(next_page_params))
            all_responses.append(response)
        return all_responses
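For anyone who wants to see the control flow without an Airflow install, here is a rough stdlib-only sketch of the same "retry every page request" idea. Everything in it is a stand-in and not Airflow API: `FlakyStubHook`, `run_with_retry`, `fetch_all`, `next_page` and the `max_attempts` key are hypothetical names made up for the illustration. As far as I know, the real hook hands `retry_args` to tenacity rather than looping by hand, so treat this as a sketch under those assumptions, not the provider's implementation.

```python
from __future__ import annotations

from typing import Any, Callable


class FlakyStubHook:
    """Hypothetical stand-in for an HTTP hook: the first request for each page fails."""

    def __init__(self, pages: dict[int, dict[str, Any]]) -> None:
        self.pages = pages
        self.calls = 0
        self._failed_once: set[int] = set()

    def run(self, page: int = 1) -> dict[str, Any]:
        self.calls += 1
        if page not in self._failed_once:
            self._failed_once.add(page)
            raise ConnectionError(f"transient failure on page {page}")
        return self.pages[page]

    def run_with_retry(self, retry_args: dict[str, Any], **kwargs: Any) -> dict[str, Any]:
        # Simplified hand-written retry loop (assumption: "max_attempts" is an
        # invented key for this sketch, not a real Airflow or tenacity option).
        attempts = max(1, int(retry_args.get("max_attempts", 1)))
        for attempt in range(1, attempts + 1):
            try:
                return self.run(**kwargs)
            except ConnectionError:
                if attempt == attempts:
                    raise
        raise RuntimeError("unreachable")


def fetch_all(
    hook: FlakyStubHook,
    retry_args: dict[str, Any] | None,
    pagination_function: Callable[[dict[str, Any]], dict[str, Any] | None],
) -> list[dict[str, Any]]:
    """Mirror of the pagination loop: every request, first page or later, uses the retry path."""

    def call(**kwargs: Any) -> dict[str, Any]:
        if retry_args:
            return hook.run_with_retry(retry_args, **kwargs)
        return hook.run(**kwargs)

    response = call(page=1)
    all_responses = [response]
    while True:
        next_page_params = pagination_function(response)
        if not next_page_params:
            break
        response = call(**next_page_params)
        all_responses.append(response)
    return all_responses


def next_page(response: dict[str, Any]) -> dict[str, Any] | None:
    # Return the parameters for the next request, or None when there is no next page.
    nxt = response.get("next")
    return {"page": nxt} if nxt else None


pages = {1: {"items": ["a"], "next": 2}, 2: {"items": ["b"], "next": None}}
hook = FlakyStubHook(pages)
responses = fetch_all(hook, {"max_attempts": 3}, next_page)
items = [item for r in responses for item in r["items"]]
```

With retries enabled, both pages come back even though each one fails on its first request. Passing `None` for the retry arguments makes the first transient error propagate, which is the behaviour the extended operator is meant to avoid for paginated calls.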
Hi, I can work on this issue. Could you please assign me?