* This file is part of Chevere.
* (c) Rodolfo Berrios <[email protected]>
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
namespace Chevere\Workflow;
use Amp\Parallel\Worker\Execution;
use Chevere\Parameter\Interfaces\CastInterface;
use Chevere\Workflow\Exceptions\RunnerException;
use Chevere\Workflow\Interfaces\JobInterface;
use Chevere\Workflow\Interfaces\ResponseReferenceInterface;
use Chevere\Workflow\Interfaces\RunInterface;
use Chevere\Workflow\Interfaces\RunnerInterface;
use Chevere\Workflow\Interfaces\VariableInterface;
use OutOfBoundsException;
use Throwable;
use function Amp\Future\await;
use function Amp\Parallel\Worker\submit;
use function Chevere\Parameter\cast;
final class Runner implements RunnerInterface
public function __construct(
private RunInterface $run,
) {
public function run(): RunInterface
return $this->run;
public function withRun(): RunnerInterface
$new = clone $this;
$jobs = $new->run->workflow()->jobs();
$graph = $jobs->graph()->toArray();
foreach ($graph as $node) {
if (count($node) === 1) {
$runner = runnerForJob($new, $node[0]);
$new->merge($new, $runner);
$executions = $new->getExecutions($node);
/** @var RunnerInterface[] $responses */
$responses = await(array_map(
fn (Execution $e) => $e->getFuture(),
foreach ($responses as $runner) {
$new->merge($new, $runner);
return $new;
public function withRunJob(string $name): RunnerInterface
$new = clone $this;
$job = $new->run()->workflow()->jobs()->get($name);
foreach ($job->runIf() as $runIf) {
if ($new->getRunIfCondition($runIf) === false) {
return $new;
foreach ($job->dependencies() as $dependency) {
try {
} catch (OutOfBoundsException) {
return $new;
$arguments = $new->getJobArguments($job);
$action = $job->action();
try {
$response = cast($action(...$arguments));
} catch (Throwable $e) {
throw new RunnerException(
name: $name,
job: $job,
throwable: $e,
$new->addJobResponse($name, $response);
return $new;
private function getRunIfCondition(VariableInterface|ResponseReferenceInterface $runIf): bool
/** @var boolean */
return $runIf instanceof VariableInterface
? $this->run->arguments()->required($runIf->__toString())->bool()
: $this->run->response($runIf->job())->array()[$runIf->key()];
* @return array<string, mixed>
private function getJobArguments(JobInterface $job): array
$arguments = [];
foreach ($job->arguments() as $name => $value) {
$isResponseReference = $value instanceof ResponseReferenceInterface;
$isVariable = $value instanceof VariableInterface;
if (! ($isResponseReference || $isVariable)) {
$arguments[$name] = $value;
if ($isVariable) {
/** @var VariableInterface $value */
$arguments[$name] = $this->run->arguments()
/** @var ResponseReferenceInterface $value */
if ($value->key() !== null) {
$arguments[$name] = $this->run->response($value->job())->array()[$value->key()];
$arguments[$name] = $this->run->response($value->job())->mixed();
return $arguments;
private function addJobResponse(string $name, CastInterface $response): void
$this->run = $this->run->withResponse($name, $response);
private function addJobSkip(string $name): void
if ($this->run->skip()->contains($name)) {
$this->run = $this->run->withSkip($name);
* @param array<string> $queue
* @return array<Execution<mixed, never, never>>
private function getExecutions(array $queue): array
$return = [];
foreach ($queue as $job) {
$return[] = submit(
new CallableTask(
return $return;
private function merge(self $self, RunnerInterface $runner): void
foreach ($runner->run() as $name => $response) {
$self->addJobResponse($name, $response);
foreach ($runner->run()->skip() as $name) {