Skip to content

Commit

Permalink
Merge pull request #143 from stfndamjanovic/main
Browse files Browse the repository at this point in the history
Add queue jobs check
  • Loading branch information
freekmurze authored Dec 28, 2022
2 parents e3a622a + 3bd1994 commit cf70f34
Show file tree
Hide file tree
Showing 6 changed files with 341 additions and 0 deletions.
81 changes: 81 additions & 0 deletions docs/available-checks/queue.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
---
title: Queue
weight: 18
---

This check will make sure that queue jobs are running. If the check detects that the queue job is not to run for more than five minutes, it will fail.

This check relies on cache.

## Usage

First, you must register the `QueueCheck`

```php
use Spatie\Health\Facades\Health;
use Spatie\Health\Checks\Checks\QueueCheck;

Health::checks([
QueueCheck::new(),
]);
```

Next, you must schedule the `Spatie\Health\Commands\QueueCheckHeartbeatCommand` to run every minute. We recommend putting this command as the very last command in your schedule.

```php
// in app/Console/Kernel.php
use \Spatie\Health\Commands\QueueCheckHeartbeatCommand;

public function schedule(Schedule $schedule) {
// your other commands

$schedule->command(QueueCheckHeartbeatCommand::class)->everyMinute();
}
```

### Tracking different queues

By default, the package will track the default queue. If you want, you can specify which queue you want to track.

```php
use Spatie\Health\Facades\Health;
use Spatie\Health\Checks\Checks\QueueCheck;

Health::checks([
QueueCheck::new()->onQueue('email'),
]);
```

Also, you can specify more than one queue that you want to track.

```php
use Spatie\Health\Facades\Health;
use Spatie\Health\Checks\Checks\QueueCheck;

Health::checks([
QueueCheck::new()->onQueue(['email', 'payment']),
]);
```

### Customize the cache store

This check relies on cache to work. We highly recommend creating a [new cache store](https://laravel.com/docs/8.x/cache#configuration) and pass its name to `useCacheStore`

```php
use Spatie\Health\Facades\Health;
use Spatie\Health\Checks\Checks\QueueCheck;

Health::checks([
QueueCheck::new()->useCacheStore('your-custom-store-name'),
]);
```

### Customizing the maximum heart beat age

The `QueueCheckHeartbeatCommand` will write the current timestamp into the cache. The `QueueCheck` will verify that that timestamp is not over 5 minutes.

Should you get too many false positives, you can change the max age of the timestamp by calling `heartbeatMaxAgeInMinutes`.

```php
QueueCheck::new()->heartbeatMaxAgeInMinutes(10),
```
99 changes: 99 additions & 0 deletions src/Checks/Checks/QueueCheck.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
<?php

namespace Spatie\Health\Checks\Checks;

use Carbon\Carbon;
use Illuminate\Queue\QueueManager;
use Illuminate\Support\Arr;
use Spatie\Health\Checks\Check;
use Spatie\Health\Checks\Result;

class QueueCheck extends Check
{
protected ?string $cacheKey = 'health:checks:queue:latestHeartbeatAt';

protected ?string $cacheStoreName = null;

protected int $heartbeatMaxAgeInMinutes = 5;

protected ?array $onQueues;

public function useCacheStore(string $cacheStoreName): self
{
$this->cacheStoreName = $cacheStoreName;

return $this;
}

public function getCacheStoreName(): string
{
return $this->cacheStoreName ?? config('cache.default');
}

public function cacheKey(string $cacheKey): self
{
$this->cacheKey = $cacheKey;

return $this;
}

public function heartbeatMaxAgeInMinutes(int $heartbeatMaxAgeInMinutes): self
{
$this->heartbeatMaxAgeInMinutes = $heartbeatMaxAgeInMinutes;

return $this;
}

public function getCacheKey(string $queue): string
{
return "{$this->cacheKey}.{$queue}";
}

public function onQueue(array|string $queue): self
{
$this->onQueues = array_unique(Arr::wrap($queue));

return $this;
}

public function getQueues(): array
{
return $this->onQueues ?? [$this->getDefaultQueue(config('queue.driver'))];
}

protected function getDefaultQueue($connection)
{
return config("queue.connections.{$connection}.queue", 'default');
}

public function run(): Result
{
$fails = [];

foreach ($this->getQueues() as $queue) {
$lastHeartbeatTimestamp = cache()->store($this->cacheStoreName)->get($this->getCacheKey($queue));

if (! $lastHeartbeatTimestamp) {
$fails[] = "The `{$queue}` queue did not run yet.";
continue;
}

$latestHeartbeatAt = Carbon::createFromTimestamp($lastHeartbeatTimestamp);

$minutesAgo = $latestHeartbeatAt->diffInMinutes() + 1;

if ($minutesAgo > $this->heartbeatMaxAgeInMinutes) {
$fails[] = "The last run of the `{$queue}` queue was more than {$minutesAgo} minutes ago.";
}
}

$result = Result::make();

if (!empty($fails)) {
$result->meta($fails);
return $result->failed("Queue jobs running failed. Check meta for more information.");
}

return $result->ok();
}
}
34 changes: 34 additions & 0 deletions src/Commands/QueueCheckHeartbeatCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

namespace Spatie\Health\Commands;

use Illuminate\Console\Command;
use Spatie\Health\Checks\Check;
use Spatie\Health\Checks\Checks\QueueCheck;
use Spatie\Health\Facades\Health;
use Spatie\Health\Jobs\HealthQueueJob;

class QueueCheckHeartbeatCommand extends Command
{
protected $signature = 'health:queue-check-heartbeat';

public function handle(): int
{
/** @var QueueCheck|null $queueCheck */
$queueCheck = Health::registeredChecks()->first(
fn (Check $check) => $check instanceof QueueCheck
);

if (! $queueCheck) {
$this->error("In order to use this command, you should register the `Spatie\Health\Checks\Checks\QueueCheck`");

return static::FAILURE;
}

foreach ($queueCheck->getQueues() as $queue) {
HealthQueueJob::dispatch($queueCheck)->onQueue($queue);
}

return static::SUCCESS;
}
}
2 changes: 2 additions & 0 deletions src/HealthServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Illuminate\Support\Facades\Route;
use Spatie\Health\Commands\ListHealthChecksCommand;
use Spatie\Health\Commands\QueueCheckHeartbeatCommand;
use Spatie\Health\Commands\RunHealthChecksCommand;
use Spatie\Health\Commands\ScheduleCheckHeartbeatCommand;
use Spatie\Health\Components\Logo;
Expand Down Expand Up @@ -31,6 +32,7 @@ public function configurePackage(Package $package): void
ListHealthChecksCommand::class,
RunHealthChecksCommand::class,
ScheduleCheckHeartbeatCommand::class,
QueueCheckHeartbeatCommand::class
);
}

Expand Down
27 changes: 27 additions & 0 deletions src/Jobs/HealthQueueJob.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

namespace Spatie\Health\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Spatie\Health\Checks\Checks\QueueCheck;

class HealthQueueJob implements ShouldQueue
{
use Queueable, Dispatchable;

protected QueueCheck $queueCheck;

public function __construct(QueueCheck $queueCheck)
{
$this->queueCheck = $queueCheck;
}

public function handle(): void
{
$cacheStore = $this->queueCheck->getCacheStoreName();

cache()->store($cacheStore)->set($this->queueCheck->getCacheKey($this->queue), now()->timestamp);
}
}
98 changes: 98 additions & 0 deletions tests/Checks/QueueCheckTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
<?php

use Spatie\Health\Facades\Health;
use function Spatie\PestPluginTestTime\testTime;
use Spatie\Health\Checks\Checks\QueueCheck;
use function Pest\Laravel\artisan;
use Spatie\Health\Commands\QueueCheckHeartbeatCommand;
use Spatie\Health\Enums\Status;
use Illuminate\Support\Facades\Queue;
use Spatie\Health\Jobs\HealthQueueJob;

beforeEach(function () {
$this->queueCheck = QueueCheck::new();

Health::checks([
QueueCheck::new(),
]);

testTime()->freeze();
});

it('can check whether the queue jobs are still running', function () {
artisan(QueueCheckHeartbeatCommand::class)->assertSuccessful();

$result = $this->queueCheck->run();
expect($result->status)->toBe(Status::ok());

testTime()->addMinutes(5)->subSecond();
$result = $this->queueCheck->run();
expect($result->status)->toBe(Status::ok());

testTime()->addSecond();
$result = $this->queueCheck->run();
expect($result->status)->toBe(Status::failed());
});

it('can use custom max age of the heartbeat for queue jobs', function () {
$this->queueCheck->heartbeatMaxAgeInMinutes(10);

artisan(QueueCheckHeartbeatCommand::class)->assertSuccessful();

$result = $this->queueCheck->run();
expect($result->status)->toBe(Status::ok());

testTime()->addMinutes(10)->subSecond();
$result = $this->queueCheck->run();
expect($result->status)->toBe(Status::ok());

testTime()->addSecond();
$result = $this->queueCheck->run();
expect($result->status)->toBe(Status::failed());
});

it('will fail if only one queue is not working', function () {
Health::clearChecks();

$queueCheck = QueueCheck::new()->onQueue('payment');

Health::checks([$queueCheck]);

artisan(QueueCheckHeartbeatCommand::class)->assertSuccessful();

$result = $queueCheck->run();
expect($result->status)->toBe(Status::ok());

$queueCheck->onQueue(['payment', 'email']);
$result = $queueCheck->run();
expect($result->status)->toBe(Status::failed());
});

it('can specify on which queue check should be performed', function () {
Queue::fake();

Health::clearChecks();

Health::checks([
QueueCheck::new()->onQueue('email'),
]);

artisan(QueueCheckHeartbeatCommand::class)->assertSuccessful();

Queue::assertPushedOn('email', HealthQueueJob::class);
});

it('can specify on which queues check should be performed', function () {
Queue::fake();

Health::clearChecks();

Health::checks([
QueueCheck::new()->onQueue(['email', 'payment']),
]);

artisan(QueueCheckHeartbeatCommand::class)->assertSuccessful();

Queue::assertPushedOn('email', HealthQueueJob::class);
Queue::assertPushedOn('payment', HealthQueueJob::class);
});

0 comments on commit cf70f34

Please sign in to comment.