import { OnQueueEvent, Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';
import { PinoLogger } from 'nestjs-pino';
import { GoogleSearchContext } from 'src/providers/contexts/google.search.context';
import { BookSearchResultDto } from 'src/providers/dto/book-search-result.dto';
import { ProvidersService } from 'src/providers/providers.service';
import { SeriesSubscriptionJobDto } from 'src/series/dto/series-subscription-job.dto';
import { LibraryService } from './library.service';

/** A search result paired with the relevance score computed by `toScore`. */
type ScoredBook = { result: BookSearchResultDto; score: number };

/** Number of results requested per provider page; a shorter page ends the search. */
const PAGE_SIZE = 40;

/** The search stops once this many pages contained no entry related to the series. */
const MAX_UNHELPFUL_PAGES = 3;

/** Progress value (in percent) reported when the search phase is finished. */
const SEARCH_PROGRESS = 25;

/**
 * Consumer for the `library` queue.
 *
 * Handles two job names:
 * - `new_series`: searches the provider for the series and adds every volume found.
 * - `update_series`: searches for entries newer than the latest stored book and
 *   adds only the volumes that are not stored yet.
 *
 * Any other job name is logged as a warning and otherwise ignored.
 */
@Processor('library')
export class LibraryConsumer extends WorkerHost {
  constructor(
    private readonly library: LibraryService,
    private readonly provider: ProvidersService,
    private readonly logger: PinoLogger,
  ) {
    super();
  }

  /**
   * Runs a single job taken from the queue.
   *
   * @param job - The job to run; `job.data` is read as a `SeriesSubscriptionJobDto`.
   * @param token - Unused; present to match the `WorkerHost.process` signature.
   * @returns Always `null`.
   */
  async process(job: Job, token?: string): Promise<null> {
    this.logger.info({
      class: LibraryConsumer.name,
      method: this.process.name,
      job: job,
      msg: 'Started task on queue.',
    });

    switch (job.name) {
      case 'new_series': {
        const series: SeriesSubscriptionJobDto = job.data;
        const books = await this.search(job, series, null);
        await this.addBooks(job, series, books, 'Failed to add book in background during adding series.');
        break;
      }
      case 'update_series': {
        const series: SeriesSubscriptionJobDto = job.data;
        const existingBooks = await this.library.findBooksFromSeries(series);
        const existingVolumes = new Set(existingBooks.map(b => b.volume));
        // Null when nothing is stored yet, in which case the search is not date-bounded.
        const lastPublishedAt = this.latestPublication(existingBooks);

        const found = await this.search(job, series, lastPublishedAt);
        // Drop known volumes up front so that progress is measured against the
        // books that are actually going to be added.
        const missing = found.filter(book => !existingVolumes.has(book.result.volume));
        await this.addBooks(job, series, missing, 'Failed to add book in background during series update.');
        break;
      }
      default:
        this.logger.warn({
          class: LibraryConsumer.name,
          method: this.process.name,
          job: job,
          msg: 'Unknown job name found.',
        });
    }

    this.logger.info({
      class: LibraryConsumer.name,
      method: this.process.name,
      job: job,
      msg: 'Completed task on queue.',
    });
    return null;
  }

  /**
   * Adds the given books to the library one at a time. A failure to add one
   * book is logged and does not stop the remaining books from being added.
   * Progress is reported from 25% up to 100% as books are processed.
   *
   * @param job - Job whose progress is updated.
   * @param series - Series the books belong to; its provider series id is copied onto each book.
   * @param books - Scored search results to add.
   * @param failureMsg - Log message used when adding a book fails.
   */
  private async addBooks(
    job: Job,
    series: SeriesSubscriptionJobDto,
    books: ScoredBook[],
    failureMsg: string,
  ): Promise<void> {
    let processed = 0;
    for (const book of books) {
      try {
        // Force the provider's series id to be set, so that we know which series this belongs.
        book.result.providerSeriesId = series.providerSeriesId;
        await this.library.addBook(book.result);
      } catch (err) {
        this.logger.error({
          class: LibraryConsumer.name,
          // Kept as `process` so existing log output stays the same.
          method: this.process.name,
          book: book.result,
          score: book.score,
          msg: failureMsg,
          error: err,
        });
      } finally {
        processed++;
        await this.reportProgress(job, SEARCH_PROGRESS + ((100 - SEARCH_PROGRESS) * processed) / books.length);
      }
    }
  }

  /**
   * Returns the most recent `publishedAt` among the given books, or `null`
   * when the list is empty or no book has a publication date.
   */
  private latestPublication(books: readonly { publishedAt?: Date | null }[]): Date | null {
    let latest: Date | null = null;
    for (const { publishedAt } of books) {
      if (publishedAt && (latest === null || publishedAt.getTime() > latest.getTime())) {
        latest = publishedAt;
      }
    }
    return latest;
  }

  /**
   * Updates the job's progress. A failed update is logged as a warning instead
   * of being thrown, so progress reporting cannot fail the job or leave an
   * unhandled promise rejection behind.
   */
  private async reportProgress(job: Job, progress: number): Promise<void> {
    try {
      await job.updateProgress(progress);
    } catch (err) {
      this.logger.warn({
        class: LibraryConsumer.name,
        method: this.reportProgress.name,
        jobId: job.id,
        msg: 'Failed to update job progress.',
        error: err,
      });
    }
  }

  /**
   * Pages through the provider's search results for a series and returns one
   * scored entry per volume, ordered by volume.
   *
   * Paging stops when a page comes back with fewer than `PAGE_SIZE` results,
   * when `MAX_UNHELPFUL_PAGES` pages contained nothing related to the series,
   * or - if `after` is given - when the last result of a page is not published
   * later than `after`.
   *
   * @param job - Job whose progress is updated (up to 25%) while searching.
   * @param series - Series to look for.
   * @param after - When set, results are requested newest-first and paging is bounded by this date.
   * @returns The best-scored entry for each distinct volume.
   */
  private async search(job: Job, series: SeriesSubscriptionJobDto, after: Date | null): Promise<ScoredBook[]> {
    let context = this.provider.generateSearchContext(series.provider, series.title) as GoogleSearchContext;
    context.maxResults = String(PAGE_SIZE);
    if (after) {
      context.orderBy = 'newest';
    }

    // Search for the book(s) via the provider.
    // Up until end of results or after 3 unhelpful pages of results.
    let results: BookSearchResultDto[];
    const related: BookSearchResultDto[] = [];
    let pageSearchedCount = 0;
    let unhelpfulResultsCount = 0;
    do {
      pageSearchedCount += 1;
      results = await this.provider.search(context);

      // Loose equality is kept on DTO fields so that null and undefined compare equal.
      const potential = results.filter(
        (r: BookSearchResultDto) =>
          r.providerSeriesId == series.providerSeriesId ||
          (r.title == series.title && r.mediaType == series.mediaType),
      );
      if (potential.length > 0) {
        related.push(...potential);
      } else {
        unhelpfulResultsCount += 1;
      }

      context = context.next();
      // Never report more than the value that marks the end of the search phase.
      await this.reportProgress(job, Math.min(pageSearchedCount * 5, SEARCH_PROGRESS));
    } while (
      results.length >= PAGE_SIZE &&
      unhelpfulResultsCount < MAX_UNHELPFUL_PAGES &&
      (!after || after < results[results.length - 1].publishedAt)
    );

    // Sort & de-duplicate the entries received: within one volume the highest
    // score sorts first, and only the first entry of each volume is kept.
    const books = related
      .map(book => this.toScore(book, series))
      .sort((a, b) => a.result.volume - b.result.volume || b.score - a.score)
      .filter((_, index, arr) => index === 0 || arr[index - 1].result.volume != arr[index].result.volume);
    await this.reportProgress(job, SEARCH_PROGRESS);

    this.logger.debug({
      class: LibraryConsumer.name,
      method: this.search.name,
      job: job,
      msg: 'Finished searching for book entries.',
      results: {
        pages: pageSearchedCount,
        related_entries: related.length,
        volumes: books.length,
      },
    });

    return books;
  }

  // NOTE(review): the handlers below are registered with @OnQueueEvent inside a
  // @Processor class and take worker-style arguments (e.g. `job, err`). In
  // @nestjs/bullmq, worker events are normally declared with @OnWorkerEvent and
  // @OnQueueEvent belongs to a @QueueEventsListener class - confirm that these
  // handlers are actually invoked.

  /** Logs a failed library job together with its error. */
  @OnQueueEvent('failed')
  onFailed(job: Job, err: Error) {
    this.logger.error({
      class: LibraryConsumer.name,
      method: this.onFailed.name,
      job: job,
      msg: 'A library job failed.',
      error: err,
    });
  }

  /** Logs that the library jobs have been paused. */
  @OnQueueEvent('paused')
  onPaused() {
    this.logger.info({
      class: LibraryConsumer.name,
      method: this.onPaused.name,
      msg: 'Library jobs have been paused.',
    });
  }

  /** Logs that the library jobs have resumed. The `job` argument is not used. */
  @OnQueueEvent('resumed')
  onResume(job: Job) {
    this.logger.info({
      class: LibraryConsumer.name,
      method: this.onResume.name,
      msg: 'Library jobs have resumed.',
    });
  }

  /** Logs that a library job is waiting. The `jobId` argument is not used. */
  @OnQueueEvent('waiting')
  onWaiting(jobId: number | string) {
    this.logger.info({
      class: LibraryConsumer.name,
      method: this.onWaiting.name,
      msg: 'A library job is waiting...',
    });
  }

  /**
   * Scores how well a search result matches the series.
   *
   * Points: 50 when the result carries a provider series id, 25 when its title
   * equals the series title, 10 when its URL is a Google Play Books details
   * link. A missing `book` yields `{ result: null, score: -1 }`.
   *
   * @param book - Search result to score.
   * @param series - Series the result is compared against.
   * @returns The result together with its score.
   */
  private toScore(book: BookSearchResultDto, series: SeriesSubscriptionJobDto): ScoredBook {
    if (!book) {
      return {
        result: null,
        score: -1,
      };
    }

    const seriesIdPoints = book.providerSeriesId ? 50 : 0;
    const titlePoints = book.title == series.title ? 25 : 0;
    // `url` may be absent on a result; treat that as "not a store link".
    const storeLinkPoints = book.url?.startsWith('https://play.google.com/store/books/details?') ? 10 : 0;

    return {
      result: book,
      score: seriesIdPoints + titlePoints + storeLinkPoints,
    };
  }
}