Files
wp-multisite-waas/inc/class-async-calls.php
David Stone a815fdf179 Prep Plugin for release on WordPress.org
Escape everything that should be escaped.
Add nonce checks where needed.
Sanitize all inputs.
Apply Code style changes across the codebase.
Correct many deprecation notices.
Optimize load order of many filters.
2025-04-07 09:15:21 -06:00

235 lines
5.1 KiB
PHP

<?php
/**
* WP Multisite WaaS Async Calls implementation.
*
* @package WP_Ultimo
* @subpackage Async_Calls
* @since 2.0.7
*/
namespace WP_Ultimo;
use Amp\Iterator;
use Amp\Sync\LocalSemaphore;
use Amp\Sync\ConcurrentIterator;
use Amp\Http\Client\Request;
use Amp\Http\Client\Connection\DefaultConnectionPool;
use Amp\Socket\ClientTlsContext;
use Amp\Socket\ConnectContext;
use Amp\Http\Client\HttpClientBuilder;
// Exit if accessed directly
defined('ABSPATH') || exit;
/**
* WP Multisite WaaS Async Calls implementation.
*
* @since 2.0.7
*/
class Async_Calls {
/**
* Keeps a registry of the listeners.
*
* @var array
*/
public static $registry = [];
/**
* Registers a new listener.
*
* @since 2.0.7
* @param string $id The id of the listener.
* @param callable $callback A callback to be run.
* @param mixed ...$args Arguments to be passed to the callback.
*
* @return void
*/
public static function register_listener($id, $callback, ...$args): void {
self::$registry[ $id ] = [
'callable' => $callback,
'args' => $args,
];
}
/**
* Install the registered listeners.
*
* @since 2.0.7
* @return void
*/
public static function install_listeners(): void {
foreach (self::$registry as $id => $listener) {
add_action(
"wp_ajax_wu_async_call_listener_{$id}",
function () use ($listener) {
try {
$results = call_user_func_array($listener['callable'], $listener['args']);
} catch (\Throwable $th) {
wp_send_json_error($th);
}
wp_send_json_success($results);
exit;
}
);
}
}
/**
* Build the base URL for the listener calls.
*
* @since 2.0.7
*
* @param string $id The listener id.
* @param array $args The additional args passed to the URL.
* @return string
*/
public static function build_base_url($id, $args) {
return add_query_arg($args, admin_url('admin-ajax.php'));
}
/**
* Build the final URL to be called.
*
* @since 2.0.7
*
* @param string $id The listener id.
* @param int $total The total number of records.
* @param int $chunk_size The chunk size.
* @param array $args Additional arguments to be passed.
* @return array The list of paginates URLs to call.
*/
public static function build_url_list($id, $total, $chunk_size, $args = []) {
$pages = ceil($total / $chunk_size);
$urls = [];
for ($i = 1; $i <= $pages; $i++) {
$urls[] = self::build_base_url(
$id,
array_merge(
$args,
[
'action' => "wu_async_call_listener_$id",
'parallel' => 1,
'page' => $i,
'per_page' => $chunk_size,
]
)
);
}
return $urls;
}
/**
* Builds and returns the client that will handle the calls.
*
* @since 2.0.7
* @return \Amp\Http\Client\HttpClient;
*/
public static function get_client() {
$client_tls_context = new ClientTlsContext('');
$connect_base_context = new ConnectContext();
$tls_context = $client_tls_context->withoutPeerVerification();
$connect_context = $connect_base_context->withTlsContext($tls_context);
$builder = new HttpClientBuilder();
return $builder->usingPool(new DefaultConnectionPool(null, $connect_context))->build();
}
/**
* Run the parallel queue after everything is correctly enqueued.
*
* @since 2.0.7
*
* @param string $id The listener id.
* @param array $args Additional arguments to be passed.
* @param int $total The total number of records.
* @param integer $chunk_size The chunk size to use.
* @param integer $parallel_threads The number of parallel threads to be run.
* @return true|\WP_Error
*/
public static function run($id, $args, $total, $chunk_size = 10, $parallel_threads = 3) {
$client = self::get_client();
$urls = self::build_url_list($id, $total, $chunk_size, $args);
$coroutine = \Amp\call(
static function () use ($id, $total, $chunk_size, $parallel_threads, $client, $urls) {
$results = [];
$chunker = new LocalSemaphore($parallel_threads);
yield ConcurrentIterator\each(
Iterator\fromIterable($urls),
$chunker,
function ($url) use (&$results, $client) {
try {
$request = new Request($url);
$request->setTcpConnectTimeout(1000 * 1000);
$request->setTlsHandshakeTimeout(1000 * 1000);
$request->setTransferTimeout(1000 * 1000);
$request->setHeader('cookie', wu_get_isset($_SERVER, 'HTTP_COOKIE', ''));
$response = yield $client->request($request);
$body = yield $response->getBody()->buffer();
$results[ $url ] = json_decode($body);
} catch (\Throwable $e) {
throw $e;
}
}
);
return self::condense_results($results); // phpcs:ignore
}
);
$responses = \Amp\Promise\wait($coroutine);
return $responses;
}
/**
* Condense multiple results into one single result.
*
* @since 2.0.7
*
* @param array $results The different results returned by multiple calls.
* @return true|\WP_Error
*/
public static function condense_results($results) {
foreach ($results as $result) {
$status = wu_get_isset($result, 'success', false);
if (false === $status) {
return $result;
}
}
return true;
}
}