<?php
/*
* This file is part of Chevere.
*
* (c) Rodolfo Berrios <[email protected]>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
declare(strict_types=1);
namespace Chevere\Workflow;
use Amp\Parallel\Worker\Execution;
use Chevere\Parameter\Interfaces\CastInterface;
use Chevere\Workflow\Exceptions\RunnerException;
use Chevere\Workflow\Interfaces\JobInterface;
use Chevere\Workflow\Interfaces\ResponseReferenceInterface;
use Chevere\Workflow\Interfaces\RunInterface;
use Chevere\Workflow\Interfaces\RunnerInterface;
use Chevere\Workflow\Interfaces\VariableInterface;
use OutOfBoundsException;
use Throwable;
use function Amp\Future\await;
use function Amp\Parallel\Worker\submit;
use function Chevere\Parameter\cast;
final class Runner implements RunnerInterface
{
public function __construct(
private RunInterface $run,
) {
}
public function run(): RunInterface
{
return $this->run;
}
public function withRun(): RunnerInterface
{
$new = clone $this;
$jobs = $new->run->workflow()->jobs();
$graph = $jobs->graph()->toArray();
foreach ($graph as $node) {
if (count($node) === 1) {
$runner = runnerForJob($new, $node[0]);
$new->merge($new, $runner);
continue;
}
$executions = $new->getExecutions($node);
/** @var RunnerInterface[] $responses */
$responses = await(array_map(
fn (Execution $e) => $e->getFuture(),
$executions,
));
foreach ($responses as $runner) {
$new->merge($new, $runner);
}
}
return $new;
}
public function withRunJob(string $name): RunnerInterface
{
$new = clone $this;
$job = $new->run()->workflow()->jobs()->get($name);
foreach ($job->runIf() as $runIf) {
if ($new->getRunIfCondition($runIf) === false) {
$new->addJobSkip($name);
return $new;
}
}
foreach ($job->dependencies() as $dependency) {
try {
$new->run()->response($dependency);
} catch (OutOfBoundsException) {
$new->addJobSkip($name);
return $new;
}
}
$arguments = $new->getJobArguments($job);
$action = $job->action();
try {
$response = cast($action(...$arguments));
} catch (Throwable $e) {
throw new RunnerException(
name: $name,
job: $job,
throwable: $e,
);
}
$new->addJobResponse($name, $response);
return $new;
}
private function getRunIfCondition(VariableInterface|ResponseReferenceInterface $runIf): bool
{
/** @var boolean */
return $runIf instanceof VariableInterface
? $this->run->arguments()->required($runIf->__toString())->bool()
: $this->run->response($runIf->job())->array()[$runIf->key()];
}
/**
* @return array<string, mixed>
*/
private function getJobArguments(JobInterface $job): array
{
$arguments = [];
foreach ($job->arguments() as $name => $value) {
$isResponseReference = $value instanceof ResponseReferenceInterface;
$isVariable = $value instanceof VariableInterface;
if (! ($isResponseReference || $isVariable)) {
$arguments[$name] = $value;
continue;
}
if ($isVariable) {
/** @var VariableInterface $value */
$arguments[$name] = $this->run->arguments()
->get($value->__toString());
continue;
}
/** @var ResponseReferenceInterface $value */
if ($value->key() !== null) {
$arguments[$name] = $this->run->response($value->job())->array()[$value->key()];
continue;
}
$arguments[$name] = $this->run->response($value->job())->mixed();
}
return $arguments;
}
private function addJobResponse(string $name, CastInterface $response): void
{
$this->run = $this->run->withResponse($name, $response);
}
private function addJobSkip(string $name): void
{
if ($this->run->skip()->contains($name)) {
return;
}
$this->run = $this->run->withSkip($name);
}
/**
* @param array<string> $queue
* @return array<Execution<mixed, never, never>>
*/
private function getExecutions(array $queue): array
{
$return = [];
foreach ($queue as $job) {
$return[] = submit(
new CallableTask(
'Chevere\\Workflow\\runnerForJob',
$this,
$job,
)
);
}
return $return;
}
private function merge(self $self, RunnerInterface $runner): void
{
foreach ($runner->run() as $name => $response) {
$self->addJobResponse($name, $response);
}
foreach ($runner->run()->skip() as $name) {
$self->addJobSkip($name);
}
}
}
|