diff --git a/src/Helper/Iterators/SearchHitIterator.php b/src/Helper/Iterators/SearchHitIterator.php
new file mode 100644
index 000000000..06606f497
--- /dev/null
+++ b/src/Helper/Iterators/SearchHitIterator.php
@@ -0,0 +1,166 @@
+searchResponses = $searchResponses;
+    }
+
+    /**
+     * Rewinds the internal SearchResponseIterator and itself
+     *
+     * @return void
+     * @throws ClientResponseException
+     * @throws ServerResponseException
+     * @see Iterator::rewind()
+     */
+    public function rewind(): void
+    {
+        $this->currentKey = 0;
+        $this->searchResponses->rewind();
+
+        // The first page may be empty. In that case, the next page is fetched.
+        $currentPage = $this->searchResponses->current();
+        if ($this->searchResponses->valid() && empty($currentPage['hits']['hits'])) {
+            $this->searchResponses->next();
+        }
+
+        // "hits.total" is either an object carrying "value" or a plain integer (for example with
+        // rest_total_hits_as_int=true); it can also be missing, in which case the count stays 0.
+        $total = $currentPage['hits']['total']['value'] ?? $currentPage['hits']['total'] ?? 0;
+        $this->count = is_int($total) ? $total : 0;
+
+        $this->readPageData();
+    }
+
+    /**
+     * Advances pointer of the current hit to the next one in the current page. If there
+     * isn't a next hit in the current page, then it advances the current page and moves the
+     * pointer to the first hit in the page.
+     *
+     * @return void
+     * @throws ClientResponseException
+     * @throws ServerResponseException
+     * @see Iterator::next()
+     */
+    public function next(): void
+    {
+        $this->currentKey++;
+        $this->currentHitIndex++;
+        $currentPage = $this->searchResponses->current();
+        if (isset($currentPage['hits']['hits'][$this->currentHitIndex])) {
+            $this->currentHitData = $currentPage['hits']['hits'][$this->currentHitIndex];
+        } else {
+            $this->searchResponses->next();
+            $this->readPageData();
+        }
+    }
+
+    /**
+     * Returns a boolean indicating whether or not the current pointer has valid data
+     *
+     * @return bool
+     * @see Iterator::valid()
+     */
+    public function valid(): bool
+    {
+        return is_array($this->currentHitData);
+    }
+
+    /**
+     * Returns the current hit
+     *
+     * @return array
+     * @see Iterator::current()
+     */
+    public function current(): array
+    {
+        return $this->currentHitData;
+    }
+
+    /**
+     * Returns the current hit index. The hit index spans all pages.
+     *
+     * @return int
+     * @see Iterator::key()
+     */
+    public function key(): int
+    {
+        return $this->currentKey;
+    }
+
+    /**
+     * Points at the first hit of the current page, or flags the end when no valid page is left
+     *
+     * @internal
+     */
+    private function readPageData(): void
+    {
+        if ($this->searchResponses->valid()) {
+            $currentPage = $this->searchResponses->current();
+            $this->currentHitIndex = 0;
+            $this->currentHitData = $currentPage['hits']['hits'][$this->currentHitIndex] ?? null;
+        } else {
+            $this->currentHitData = null;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public function count(): int
+    {
+        return $this->count;
+    }
+}
\ No newline at end of file
diff --git a/src/Helper/Iterators/SearchResponseIterator.php b/src/Helper/Iterators/SearchResponseIterator.php
new file mode 100644
index 000000000..58936350a
--- /dev/null
+++ b/src/Helper/Iterators/SearchResponseIterator.php
@@ -0,0 +1,192 @@
+client = $client;
+        $this->params = $searchParams;
+
+        if (isset($searchParams['scroll'])) {
+            $this->scrollTtl = $searchParams['scroll'];
+        }
+    }
+
+    /**
+     * Destructor
+     *
+     * @throws ClientResponseException
+     * @throws ServerResponseException
+     */
+    public function __destruct()
+    {
+        $this->clearScroll();
+    }
+
+    /**
+     * Sets the time to live duration of a scroll window
+     *
+     * @param string $timeToLive
+     * @return $this
+     */
+    public function setScrollTimeout(string $timeToLive): SearchResponseIterator
+    {
+        $this->scrollTtl = $timeToLive;
+        return $this;
+    }
+
+    /**
+     * Clears the current scroll window if there is a scroll_id stored
+     *
+     * @return void
+     * @throws ClientResponseException
+     * @throws ServerResponseException
+     */
+    private function clearScroll(): void
+    {
+        if (!empty($this->scrollId)) {
+            /* @phpstan-ignore-next-line */
+            $this->client->clearScroll(
+                [
+                    'body' => [
+                        'scroll_id' => $this->scrollId
+                    ],
+                    'client' => [
+                        'ignore' => 404
+                    ]
+                ]
+            );
+            $this->scrollId = null;
+        }
+    }
+
+    /**
+     * Rewinds the iterator by performing the initial search and remembering its "_scroll_id", if any.
+     *
+     * @return void
+     * @throws ClientResponseException
+     * @throws ServerResponseException
+     * @see Iterator::rewind()
+     */
+    public function rewind(): void
+    {
+        $this->clearScroll();
+        $this->currentKey = 0;
+        /* @phpstan-ignore-next-line */
+        $this->currentScrolledResponse = $this->client->search($this->params)->asArray();
+        $this->scrollId = $this->currentScrolledResponse['_scroll_id'] ?? null;
+    }
+
+    /**
+     * Fetches every "page" after the first one using the latest "scroll_id" (kept if none is returned)
+     *
+     * @return void
+     * @throws ClientResponseException
+     * @throws ServerResponseException
+     * @see Iterator::next()
+     */
+    public function next(): void
+    {
+        /* @phpstan-ignore-next-line */
+        $this->currentScrolledResponse = $this->client->scroll(
+            [
+                'body' => [
+                    'scroll_id' => $this->scrollId,
+                    'scroll' => $this->scrollTtl
+                ]
+            ]
+        )->asArray();
+        $this->scrollId = $this->currentScrolledResponse['_scroll_id'] ?? $this->scrollId;
+        $this->currentKey++;
+    }
+
+    /**
+     * Returns a boolean value indicating if the current page is valid or not
+     *
+     * @return bool
+     * @see Iterator::valid()
+     */
+    public function valid(): bool
+    {
+        return isset($this->currentScrolledResponse['hits']['hits'][0]);
+    }
+
+    /**
+     * Returns the current "page"
+     *
+     * @return array
+     * @see Iterator::current()
+     */
+    public function current(): array
+    {
+        return $this->currentScrolledResponse;
+    }
+
+    /**
+     * Returns the current "page number" of the current "page"
+     *
+     * @return int
+     * @see Iterator::key()
+     */
+    public function key(): int
+    {
+        return $this->currentKey;
+    }
+}
\ No newline at end of file
diff --git a/tests/Helper/Iterators/SearchHitIteratorTest.php b/tests/Helper/Iterators/SearchHitIteratorTest.php
new file mode 100644
index 000000000..0fd5ed388
--- /dev/null
+++ b/tests/Helper/Iterators/SearchHitIteratorTest.php
@@ -0,0 +1,150 @@
+searchResponse = Mockery::mock(SearchResponseIterator::class);
+    }
+
+    public function tearDown(): void
+    {
+        Mockery::close();
+    }
+
+    public function testWithNoResults(): void
+    {
+        $searchHit = new SearchHitIterator($this->searchResponse);
+        $this->assertCount(0, $searchHit);
+    }
+
+    public function testWithHits(): void
+    {
+        $this->searchResponse->shouldReceive('rewind')
+            ->once()
+            ->ordered();
+
+        $this->searchResponse->shouldReceive('current')
+            ->andReturn(
+                [
+                    'hits' => [
+                        'hits' => [
+                            [ 'foo' => 'bar0' ],
+                            [ 'foo' => 'bar1' ],
+                            [ 'foo' => 'bar2' ]
+                        ],
+                        'total' => [
+                            'value' => 3,
+                            'relation' => 'eq'
+                        ]
+                    ]
+                ],
+                [
+                    'hits' => [
+                        'hits' => [
+                            [ 'foo' => 'bar0' ],
+                            [ 'foo' => 'bar1' ],
+                            [ 'foo' => 'bar2' ]
+                        ],
+                        'total' => [
+                            'value' => 3,
+                            'relation' => 'eq'
+                        ]
+                    ]
+                ],
+                [
+                    'hits' => [
+                        'hits' => [
+                            [ 'foo' => 'bar0' ],
+                            [ 'foo' => 'bar1' ],
+                            [ 'foo' => 'bar2' ]
+                        ],
+                        'total' => [
+                            'value' => 3,
+                            'relation' => 'eq'
+                        ]
+                    ]
+                ],
+                [
+                    'hits' => [
+                        'hits' => [
+                            [ 'foo' => 'bar0' ],
+                            [ 'foo' => 'bar1' ],
+                            [ 'foo' => 'bar2' ]
+                        ],
+                        'total' => [
+                            'value' => 3,
+                            'relation' => 'eq'
+                        ]
+                    ]
+                ],
+                [
+                    'hits' => [
+                        'hits' => [
+                            [ 'foo' => 'bar3' ],
+                            [ 'foo' => 'bar4' ]
+                        ],
+                        'total' => [
+                            'value' => 2,
+                            'relation' => 'eq'
+                        ]
+                    ]
+                ],
+                [
+                    'hits' => [
+                        'hits' => [
+                            [ 'foo' => 'bar3' ],
+                            [ 'foo' => 'bar4' ]
+                        ],
+                        'total' => [
+                            'value' => 2,
+                            'relation' => 'eq'
+                        ]
+                    ]
+                ]
+            );
+
+        $this->searchResponse->shouldReceive('valid')
+            ->andReturn(true, true, true, false);
+
+        $this->searchResponse->shouldReceive('next')
+            ->times(2)
+            ->ordered();
+
+        $responses = new SearchHitIterator($this->searchResponse);
+        $i = 0;
+        foreach ($responses as $key => $value) {
+            $this->assertEquals($i, $key);
+            $this->assertEquals("bar$i", $value['foo']);
+            $i++;
+        }
+    }
+}
\ No newline at end of file
diff --git a/tests/Helper/Iterators/SearchResponseIteratorTest.php b/tests/Helper/Iterators/SearchResponseIteratorTest.php
new file mode 100644
index 000000000..5b8814e04
--- /dev/null
+++ b/tests/Helper/Iterators/SearchResponseIteratorTest.php
@@ -0,0 +1,199 @@
+createStream(json_encode($data));
+        $response = (new Psr17Factory())->createResponse(200)->withBody($body)
+            ->withHeader('X-Elastic-Product', 'Elasticsearch')
+            ->withHeader('Content-Type', 'application/json');
+        $elasticsearch->setResponse($response);
+
+        return $elasticsearch;
+    }
+
+    public function testWithNoResults(): void
+    {
+        $search_params = [
+            'scroll' => '5m',
+            'index' => 'twitter',
+            'size' => 1000,
+            'body' => [
+                'query' => [
+                    'match_all' => new \stdClass
+                ]
+            ]
+        ];
+
+        $mock_client = m::mock(ClientInterface::class);
+
+        $mock_client->shouldReceive('search')
+            ->twice()
+            ->with($search_params)
+            ->andReturn($this->elasticsearchResponse(['_scroll_id' => 'scroll_id_01']));
+
+        $mock_client->shouldReceive('clearScroll')
+            ->twice()
+            ->withAnyArgs();
+
+        $responses = new SearchResponseIterator($mock_client, $search_params);
+
+        $this->assertCount(0, $responses);
+    }
+
+    public function testWithHits(): void
+    {
+        $search_params = [
+            'scroll' => '5m',
+            'index' => 'twitter',
+            'size' => 1000,
+            'body' => [
+                'query' => [
+                    'match_all' => new \stdClass
+                ]
+            ]
+        ];
+
+        $mock_client = m::mock(ClientInterface::class);
+
+        $mock_client->shouldReceive('search')
+            ->once()
+            ->ordered()
+            ->with($search_params)
+            ->andReturn($this->elasticsearchResponse([
+                '_scroll_id' => 'scroll_id_01',
+                'hits' => [
+                    'hits' => [
+                        [
+                            'foo' => 'bar'
+                        ]
+                    ]
+                ]
+            ])
+        );
+
+        $mock_client->shouldReceive('scroll')
+            ->once()
+            ->ordered()
+            ->with(
+                [
+                    'body' => [
+                        'scroll_id' => 'scroll_id_01',
+                        'scroll' => '5m'
+                    ]
+                ]
+            )
+            ->andReturn($this->elasticsearchResponse([
+                '_scroll_id' => 'scroll_id_02',
+                'hits' => [
+                    'hits' => [
+                        [
+                            'foo' => 'bar'
+                        ]
+                    ]
+                ]
+            ])
+        );
+
+        $mock_client->shouldReceive('scroll')
+            ->once()
+            ->ordered()
+            ->with(
+                [
+                    'body' => [
+                        'scroll_id' => 'scroll_id_02',
+                        'scroll' => '5m'
+                    ]
+                ]
+            )
+            ->andReturn($this->elasticsearchResponse([
+                '_scroll_id' => 'scroll_id_03',
+                'hits' => [
+                    'hits' => [
+                        [
+                            'foo' => 'bar'
+                        ]
+                    ]
+                ]
+            ])
+        );
+
+        $mock_client->shouldReceive('scroll')
+            ->once()
+            ->ordered()
+            ->with(
+                [
+                    'body' => [
+                        'scroll_id' => 'scroll_id_03',
+                        'scroll' => '5m'
+                    ]
+                ]
+            )
+            ->andReturn($this->elasticsearchResponse(
+                [
+                '_scroll_id' => 'scroll_id_04',
+                'hits' => [
+                    'hits' => []
+                ]
+            ])
+        );
+
+        $mock_client->shouldReceive('scroll')
+            ->never()
+            ->with(
+                [
+                    'body' => [
+                        'scroll_id' => 'scroll_id_04',
+                        'scroll' => '5m'
+                    ]
+                ]
+            );
+
+        $mock_client->shouldReceive('clearScroll')
+            ->once()
+            ->ordered()
+            ->withAnyArgs();
+
+        $responses = new SearchResponseIterator($mock_client, $search_params);
+        $count = 0;
+        $i = 0;
+        foreach ($responses as $response) {
+            $count += count($response['hits']['hits']);
+            $this->assertEquals(sprintf("scroll_id_%02d", ++$i), $response['_scroll_id']);
+        }
+        $this->assertEquals(3, $count);
+    }
+}
\ No newline at end of file