bluspark / airflow-dag-run-bundle
Provide an HTTP Client to manage DAG run from Airflow
Installs: 4 043
Dependents: 0
Suggesters: 0
Security: 0
Stars: 0
Watchers: 2
Forks: 0
Open Issues: 0
Type:symfony-bundle
Requires
- php: >=8.1
- symfony/config: ^6.3
- symfony/dependency-injection: ^6.3
- symfony/event-dispatcher: ^6.3
- symfony/http-client: ^6.3
- symfony/http-kernel: ^6.3
- symfony/messenger: ^6.3
- symfony/scheduler: ^6.3
Requires (Dev)
- phpstan/phpstan: ^1.10
- phpunit/phpunit: ^10.5
README
This bundle provide a way to trigger new dag on Apache Airflow run to generate export files and request asynchronously for the generated export filename.
Installation
With composer:
composer require bluspark/airflow-dag-run-bundle
Configuration
Make sure your bundle has been enabled in your config/bundles.php
file (automatically done if you're using Symfony Flex)
# config/bundles.php return [ ... Bluspark\AirflowDagRunBundle\BlusparkAirflowDagRunBundle::class => ['all' => true], ];
Add a bluspark_airflow_dag_run.yaml
file in your config/packages
directory to define the following required parameters:
bluspark_airflow_dag_run: airflow_host: https://from-config.example.org airflow_dag_ids: your-dag-id # or name:dag-id,anotherName:another-dag-id airflow_username: user airflow_password: !ChangeMe!
You can have more details about the configuration parameters by running this command in your Symfony project:
bin/console config:dump bluspark_airflow_dag_run
Finally, add in your config/packages/messenger.yaml
file the transport of your choice which will handle the success message dispatched by the bundle (as explained below):
framework: messenger: routing: 'Bluspark\AirflowDagRunBundle\Scheduler\Message\DagRunMessageExecuted': my_project_transport
If you're using Symfony < 6.4, the bundle won't use
Scheduler
but a standard message dispatched through aMessengerBus
instead.
If so, you must declare all the bundl'es messages for the transport management :framework: messenger: routing: 'Bluspark\AirflowDagRunBundle\Scheduler\Message\*': my_project_transport
Usage
The bundle provide a bridge service class that you can use in your project through using dependency injection.
namespace App\Controller; use Bluspark\AirflowDagRunBundle\Contracts\Bridge\AirflowDagBridgeInterface; use Symfony\Bundle\FrameworkBundle\Controller\AbstractController; use Symfony\Component\HttpFoundation\Response; use Symfony\Component\Routing\Annotation\Route; final class YourController extends AbstractController { #[Route(path: '/my-export-route')] public function __invoke(AirflowDagBridgeInterface $airflowDagBridge): Response { // ... build your $config array with your export parameters $dagRunStatus = $airflowDagBridge->requestNewExportFile($config); // or if you passed an array of dagIds like "name:dag-id,anotherName:another-dag-id" $dagRunStatus = $airflowDagBridge->requestNewExportFile($config, 'anotherName'); } }
Authorized parameters for the export request are:
format
: format expected for the export file (eg: "csv", "xls")export
: data type for your export (eg: 'pickup', 'producer', ...)search
: array of filters you want to apply for data included in your export fileextra
: array of data that you want to use or pass throughout all the export process (e.g. an email to notify on success)raw
: array of data only sent and used in request (eg: initial filters before convertion)
No other configuration parameters are considered valid.
Once the export file has been requested on Airflow, the bundle uses a Scheduler recurring message to check every 30 seconds if the file has been successfully created, with its own handler.
Then, the bundle dispatch using Symfony Messenger component a Bluspark\AirflowDagRunBundle\Message\DagRunMessageExecuted
message with a filename
property containing the now available file on S3
You will have to implement the handler for the Bluspark\AirflowDagRunBundle\Message\DagRunMessageExecuted
message with your own logic inside your project.
To run the Scheduler transport used by this bundle, do not forget to run the following command (only for Symfony version >= 6.4)
bin/console messenger:consume scheduler_airflow_dag_run
Messages
The bundle implements 2 different messages that need to be consumed :
- a
Bluspark\AirflowDagRunBundle\Message\DagRunChecker
message meant to be consumed by thescheduler_airflow_dag_run
Schedule (provided by the bundle) - a
Bluspark\AirflowDagRunBundle\Message\DagRunMessageExecuted
message meant to be consumed by the Messenger transport of your choice in your project
License
This project is licensed under the CeCILL-B License - see the LICENSE file for details.
Sponsors
Bluspark is a Saas application to operate infrastructure of agglomerations and cities. It is a complete solution to manage the life cycle of your infrastructure, from the design to the maintenance.
The digital SaaS platform for financing and managing energy renovation aid