PHP Classes

File: src/Runner.php

Recommend this page to a friend!
  Classes of Rodolfo Berrios Arce   Workflow   src/Runner.php   Download  
File: src/Runner.php
Role: Class source
Content type: text/plain
Description: Class source
Class: Workflow
Create and run action workflows
Author: By Rodolfo Berrios Arce
Last change:
Date: 1 month ago
Size: 5,409 bytes
 

Contents

Class file image Download
<?php

/*
 * This file is part of Chevere.
 *
 * (c) Rodolfo Berrios <[email protected]>
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */

declare(strict_types=1);

namespace
Chevere\Workflow;

use
Amp\Parallel\Worker\Execution;
use
Chevere\Parameter\Interfaces\CastInterface;
use
Chevere\Workflow\Exceptions\RunnerException;
use
Chevere\Workflow\Interfaces\JobInterface;
use
Chevere\Workflow\Interfaces\ResponseReferenceInterface;
use
Chevere\Workflow\Interfaces\RunInterface;
use
Chevere\Workflow\Interfaces\RunnerInterface;
use
Chevere\Workflow\Interfaces\VariableInterface;
use
OutOfBoundsException;
use
Throwable;
use function
Amp\Future\await;
use function
Amp\Parallel\Worker\submit;
use function
Chevere\Parameter\cast;

/**
 * Runs the jobs of a workflow and records their outcome in a Run.
 *
 * The public `with*` methods work on a clone and return it, so the runner
 * they are called on keeps its original Run.
 */
final class Runner implements RunnerInterface
{
    /**
     * @param RunInterface $run Run providing the workflow, the run arguments
     *                          and the responses/skips recorded so far.
     */
    public function __construct(
        private RunInterface $run,
    ) {
    }

    /**
     * Returns the Run currently held by this runner.
     */
    public function run(): RunInterface
    {
        return $this->run;
    }

    /**
     * Runs all the workflow jobs following the jobs graph.
     *
     * Graph nodes are handled in order. A node with a single job is run by
     * calling `runnerForJob()` directly; a node with several jobs is submitted
     * as worker tasks and awaited before the next node is handled.
     *
     * @return RunnerInterface A clone holding the merged responses and skips.
     */
    public function withRun(): RunnerInterface
    {
        $new = clone $this;
        $graph = $new->run->workflow()->jobs()->graph()->toArray();
        foreach ($graph as $node) {
            if (count($node) === 1) {
                $new->merge(runnerForJob($new, $node[0]));

                continue;
            }
            /** @var RunnerInterface[] $runners */
            $runners = await(array_map(
                static fn (Execution $execution) => $execution->getFuture(),
                $new->getExecutions($node),
            ));
            foreach ($runners as $runner) {
                $new->merge($runner);
            }
        }

        return $new;
    }

    /**
     * Runs a single job by name.
     *
     * The job is marked as skipped (and its action is not called) when any of
     * its run-if conditions is `false`, or when any of its dependencies has no
     * response in the Run.
     *
     * @param string $name Name of the job in the workflow.
     *
     * @return RunnerInterface A clone holding either the job response or its skip.
     *
     * @throws RunnerException When the job action (or casting its result) throws.
     */
    public function withRunJob(string $name): RunnerInterface
    {
        $new = clone $this;
        $job = $new->run()->workflow()->jobs()->get($name);
        foreach ($job->runIf() as $runIf) {
            if ($new->getRunIfCondition($runIf) === false) {
                $new->addJobSkip($name);

                return $new;
            }
        }
        foreach ($job->dependencies() as $dependency) {
            try {
                $new->run()->response($dependency);
            } catch (OutOfBoundsException) {
                // The dependency has no recorded response: skip this job too.
                $new->addJobSkip($name);

                return $new;
            }
        }
        $arguments = $new->getJobArguments($job);
        $action = $job->action();

        try {
            $response = cast($action(...$arguments));
        } catch (Throwable $e) {
            throw new RunnerException(
                name: $name,
                job: $job,
                throwable: $e,
            );
        }
        $new->addJobResponse($name, $response);

        return $new;
    }

    /**
     * Resolves a run-if condition to its boolean value.
     *
     * A variable is read from the run arguments. A response reference is read
     * from the referenced job response: from the given key when the reference
     * has one, otherwise from the response as a whole (mirrors how
     * `getJobArguments()` treats key-less references).
     */
    private function getRunIfCondition(VariableInterface|ResponseReferenceInterface $runIf): bool
    {
        if ($runIf instanceof VariableInterface) {
            return $this->run->arguments()->required($runIf->__toString())->bool();
        }
        $response = $this->run->response($runIf->job());
        $key = $runIf->key();
        if ($key === null) {
            // Key-less reference: the response itself is the condition.
            return $response->bool();
        }

        /** @var bool */
        return $response->array()[$key];
    }

    /**
     * Builds the arguments for calling the job action, replacing variables
     * and response references with their current values.
     *
     * @return array<string, mixed>
     */
    private function getJobArguments(JobInterface $job): array
    {
        $arguments = [];
        foreach ($job->arguments() as $name => $value) {
            $arguments[$name] = match (true) {
                $value instanceof VariableInterface => $this->run->arguments()
                    ->get($value->__toString()),
                $value instanceof ResponseReferenceInterface => $this->getReferenceValue($value),
                default => $value,
            };
        }

        return $arguments;
    }

    /**
     * Reads the value a response reference points to: the keyed entry of the
     * job response when the reference has a key, the whole response otherwise.
     */
    private function getReferenceValue(ResponseReferenceInterface $reference): mixed
    {
        $response = $this->run->response($reference->job());
        $key = $reference->key();

        return $key === null
            ? $response->mixed()
            : $response->array()[$key];
    }

    /**
     * Replaces the held Run with one including the response for the job.
     */
    private function addJobResponse(string $name, CastInterface $response): void
    {
        $this->run = $this->run->withResponse($name, $response);
    }

    /**
     * Replaces the held Run with one marking the job as skipped, unless the
     * job is already in the skip collection.
     */
    private function addJobSkip(string $name): void
    {
        if ($this->run->skip()->contains($name)) {
            return;
        }
        $this->run = $this->run->withSkip($name);
    }

    /**
     * Submits one worker task per job in the queue.
     *
     * @param array<string> $queue
     * @return array<Execution<mixed, never, never>>
     */
    private function getExecutions(array $queue): array
    {
        $executions = [];
        foreach ($queue as $job) {
            // NOTE(review): the callable is passed by name rather than as a
            // closure, presumably so the task can be handed to a worker — confirm.
            $executions[] = submit(
                new CallableTask(
                    'Chevere\\Workflow\\runnerForJob',
                    $this,
                    $job,
                )
            );
        }

        return $executions;
    }

    /**
     * Copies every response and skip found in the given runner's Run into
     * this runner's Run.
     */
    private function merge(RunnerInterface $runner): void
    {
        foreach ($runner->run() as $name => $response) {
            $this->addJobResponse($name, $response);
        }
        foreach ($runner->run()->skip() as $name) {
            $this->addJobSkip($name);
        }
    }
}