diff --git a/server/apps/microservices/src/microservices.module.ts b/server/apps/microservices/src/microservices.module.ts index f1b0d641d2..5db6916c50 100644 --- a/server/apps/microservices/src/microservices.module.ts +++ b/server/apps/microservices/src/microservices.module.ts @@ -10,9 +10,9 @@ import { SearchIndexProcessor, StorageTemplateMigrationProcessor, ThumbnailGeneratorProcessor, + VideoTranscodeProcessor, } from './processors'; import { MetadataExtractionProcessor } from './processors/metadata-extraction.processor'; -import { VideoTranscodeProcessor } from './processors/video-transcode.processor'; @Module({ imports: [ diff --git a/server/apps/microservices/src/processors.ts b/server/apps/microservices/src/processors.ts index 384bb76ca9..882c61aa67 100644 --- a/server/apps/microservices/src/processors.ts +++ b/server/apps/microservices/src/processors.ts @@ -153,3 +153,18 @@ export class ThumbnailGeneratorProcessor { await this.mediaService.handleGenerateWepbThumbnail(job.data); } } + +@Processor(QueueName.VIDEO_CONVERSION) +export class VideoTranscodeProcessor { + constructor(private mediaService: MediaService) {} + + @Process({ name: JobName.QUEUE_VIDEO_CONVERSION, concurrency: 1 }) + async onQueueVideoConversion(job: Job): Promise { + await this.mediaService.handleQueueVideoConversion(job.data); + } + + @Process({ name: JobName.VIDEO_CONVERSION, concurrency: 2 }) + async onVideoConversion(job: Job) { + await this.mediaService.handleVideoConversion(job.data); + } +} diff --git a/server/apps/microservices/src/processors/video-transcode.processor.ts b/server/apps/microservices/src/processors/video-transcode.processor.ts deleted file mode 100644 index 067bd49cc2..0000000000 --- a/server/apps/microservices/src/processors/video-transcode.processor.ts +++ /dev/null @@ -1,184 +0,0 @@ -import { - IAssetJob, - IAssetRepository, - IBaseJob, - IJobRepository, - IStorageRepository, - JobName, - QueueName, - StorageCore, - StorageFolder, - SystemConfigFFmpegDto, - SystemConfigService, - WithoutProperty, -} from '@app/domain'; -import { AssetEntity, AssetType, TranscodePreset } from '@app/infra/entities'; -import { Process, Processor } from '@nestjs/bull'; -import { Inject, Logger } from '@nestjs/common'; -import { Job } from 'bull'; -import ffmpeg, { FfprobeData, FfprobeStream } from 'fluent-ffmpeg'; -import { join } from 'path'; - -@Processor(QueueName.VIDEO_CONVERSION) -export class VideoTranscodeProcessor { - readonly logger = new Logger(VideoTranscodeProcessor.name); - private storageCore = new StorageCore(); - - constructor( - @Inject(IAssetRepository) private assetRepository: IAssetRepository, - @Inject(IJobRepository) private jobRepository: IJobRepository, - private systemConfigService: SystemConfigService, - @Inject(IStorageRepository) private storageRepository: IStorageRepository, - ) {} - - @Process({ name: JobName.QUEUE_VIDEO_CONVERSION, concurrency: 1 }) - async handleQueueVideoConversion(job: Job): Promise { - try { - const { force } = job.data; - const assets = force - ? await this.assetRepository.getAll({ type: AssetType.VIDEO }) - : await this.assetRepository.getWithout(WithoutProperty.ENCODED_VIDEO); - for (const asset of assets) { - await this.jobRepository.queue({ name: JobName.VIDEO_CONVERSION, data: { asset } }); - } - } catch (error: any) { - this.logger.error('Failed to queue video conversions', error.stack); - } - } - - @Process({ name: JobName.VIDEO_CONVERSION, concurrency: 2 }) - async handleVideoConversion(job: Job) { - const { asset } = job.data; - - const encodedVideoPath = this.storageCore.getFolderLocation(StorageFolder.ENCODED_VIDEO, asset.ownerId); - - this.storageRepository.mkdirSync(encodedVideoPath); - - const savedEncodedPath = join(encodedVideoPath, `${asset.id}.mp4`); - - await this.runVideoEncode(asset, savedEncodedPath); - } - - async runFFProbePipeline(asset: AssetEntity): Promise { - return new Promise((resolve, reject) => { - ffmpeg.ffprobe(asset.originalPath, (err, data) => { - if (err || !data) { - this.logger.error(`Cannot probe video ${err}`, 'runFFProbePipeline'); - reject(err); - } - - resolve(data); - }); - }); - } - - async runVideoEncode(asset: AssetEntity, savedEncodedPath: string): Promise { - const config = await this.systemConfigService.getConfig(); - const videoStream = await this.getVideoStream(asset); - - const transcode = await this.needsTranscoding(videoStream, config.ffmpeg); - if (transcode) { - //TODO: If video or audio are already the correct format, don't re-encode, copy the stream - return this.runFFMPEGPipeLine(asset, videoStream, savedEncodedPath); - } - } - - async needsTranscoding(videoStream: FfprobeStream, ffmpegConfig: SystemConfigFFmpegDto): Promise { - switch (ffmpegConfig.transcode) { - case TranscodePreset.ALL: - return true; - - case TranscodePreset.REQUIRED: - { - if (videoStream.codec_name !== ffmpegConfig.targetVideoCodec) { - return true; - } - } - break; - - case TranscodePreset.OPTIMAL: { - if (videoStream.codec_name !== ffmpegConfig.targetVideoCodec) { - return true; - } - - const config = await this.systemConfigService.getConfig(); - - const videoHeightThreshold = Number.parseInt(config.ffmpeg.targetResolution); - return !videoStream.height || videoStream.height > videoHeightThreshold; - } - } - return false; - } - - async getVideoStream(asset: AssetEntity): Promise { - const videoInfo = await this.runFFProbePipeline(asset); - - const videoStreams = videoInfo.streams.filter((stream) => { - return stream.codec_type === 'video'; - }); - - const longestVideoStream = videoStreams.sort((stream1, stream2) => { - const stream1Frames = Number.parseInt(stream1.nb_frames ?? '0'); - const stream2Frames = Number.parseInt(stream2.nb_frames ?? '0'); - return stream2Frames - stream1Frames; - })[0]; - - return longestVideoStream; - } - - async runFFMPEGPipeLine(asset: AssetEntity, videoStream: FfprobeStream, savedEncodedPath: string): Promise { - const config = await this.systemConfigService.getConfig(); - - const ffmpegOptions = [ - `-crf ${config.ffmpeg.crf}`, - `-preset ${config.ffmpeg.preset}`, - `-vcodec ${config.ffmpeg.targetVideoCodec}`, - `-acodec ${config.ffmpeg.targetAudioCodec}`, - // Makes a second pass moving the moov atom to the beginning of - // the file for improved playback speed. - `-movflags faststart`, - ]; - - if (!videoStream.height || !videoStream.width) { - this.logger.error('Height or width undefined for video stream'); - return; - } - - const streamHeight = videoStream.height; - const streamWidth = videoStream.width; - - const targetResolution = Number.parseInt(config.ffmpeg.targetResolution); - - let scaling = `-2:${targetResolution}`; - const shouldScale = Math.min(streamHeight, streamWidth) > targetResolution; - - const videoIsRotated = Math.abs(Number.parseInt(`${videoStream.rotation ?? 0}`)) === 90; - - if (streamHeight > streamWidth || videoIsRotated) { - scaling = `${targetResolution}:-2`; - } - - if (shouldScale) { - ffmpegOptions.push(`-vf scale=${scaling}`); - } - - return new Promise((resolve, reject) => { - ffmpeg(asset.originalPath) - .outputOptions(ffmpegOptions) - .output(savedEncodedPath) - .on('start', () => { - this.logger.log('Start Converting Video'); - }) - .on('error', (error) => { - this.logger.error(`Cannot Convert Video ${error}`); - reject(); - }) - .on('end', async () => { - this.logger.log(`Converting Success ${asset.id}`); - await this.assetRepository.save({ id: asset.id, encodedVideoPath: savedEncodedPath }); - resolve(); - }) - .run(); - }); - } -} diff --git a/server/libs/domain/src/media/media.repository.ts b/server/libs/domain/src/media/media.repository.ts index 9063ce5464..931b5b7a21 100644 --- a/server/libs/domain/src/media/media.repository.ts +++ b/server/libs/domain/src/media/media.repository.ts @@ -5,8 +5,26 @@ export interface ResizeOptions { format: 'webp' | 'jpeg'; } -export interface IMediaRepository { - resize(input: string, output: string, options: ResizeOptions): Promise; - extractVideoThumbnail(input: string, output: string, size: number): Promise; - extractThumbnailFromExif(input: string, output: string): Promise; +export interface VideoStreamInfo { + height: number; + width: number; + rotation: number; + codecName?: string; + codecType?: string; + frameCount: number; +} + +export interface VideoInfo { + streams: VideoStreamInfo[]; +} + +export interface IMediaRepository { + // image + extractThumbnailFromExif(input: string, output: string): Promise; + resize(input: string, output: string, options: ResizeOptions): Promise; + + // video + extractVideoThumbnail(input: string, output: string, size: number): Promise; + probe(input: string): Promise; + transcode(input: string, output: string, options: any): Promise; } diff --git a/server/libs/domain/src/media/media.service.spec.ts b/server/libs/domain/src/media/media.service.spec.ts index 332e444deb..93e0900d00 100644 --- a/server/libs/domain/src/media/media.service.spec.ts +++ b/server/libs/domain/src/media/media.service.spec.ts @@ -1,3 +1,4 @@ +import { AssetType, SystemConfigKey } from '@app/infra/entities'; import _ from 'lodash'; import { assetEntityStub, @@ -6,17 +7,21 @@ import { newJobRepositoryMock, newMediaRepositoryMock, newStorageRepositoryMock, + newSystemConfigRepositoryMock, + probeStub, } from '../../test'; import { IAssetRepository, WithoutProperty } from '../asset'; import { ICommunicationRepository } from '../communication'; import { IJobRepository, JobName } from '../job'; import { IStorageRepository } from '../storage'; +import { ISystemConfigRepository } from '../system-config'; import { IMediaRepository } from './media.repository'; import { MediaService } from './media.service'; describe(MediaService.name, () => { let sut: MediaService; let assetMock: jest.Mocked; + let configMock: jest.Mocked; let communicationMock: jest.Mocked; let jobMock: jest.Mocked; let mediaMock: jest.Mocked; @@ -24,11 +29,12 @@ describe(MediaService.name, () => { beforeEach(async () => { assetMock = newAssetRepositoryMock(); + configMock = newSystemConfigRepositoryMock(); communicationMock = newCommunicationRepositoryMock(); jobMock = newJobRepositoryMock(); mediaMock = newMediaRepositoryMock(); storageMock = newStorageRepositoryMock(); - sut = new MediaService(assetMock, communicationMock, jobMock, mediaMock, storageMock); + sut = new MediaService(assetMock, communicationMock, jobMock, mediaMock, storageMock, configMock); }); it('should be defined', () => { @@ -169,4 +175,106 @@ describe(MediaService.name, () => { expect(mediaMock.resize).toHaveBeenCalled(); }); }); + + describe('handleQueueVideoConversion', () => { + it('should queue all video assets', async () => { + assetMock.getAll.mockResolvedValue([assetEntityStub.video]); + + await sut.handleQueueVideoConversion({ force: true }); + + expect(assetMock.getAll).toHaveBeenCalledWith({ type: AssetType.VIDEO }); + expect(assetMock.getWithout).not.toHaveBeenCalled(); + expect(jobMock.queue).toHaveBeenCalledWith({ + name: JobName.VIDEO_CONVERSION, + data: { asset: assetEntityStub.video }, + }); + }); + + it('should queue all video assets without encoded videos', async () => { + assetMock.getWithout.mockResolvedValue([assetEntityStub.video]); + + await sut.handleQueueVideoConversion({}); + + expect(assetMock.getAll).not.toHaveBeenCalled(); + expect(assetMock.getWithout).toHaveBeenCalledWith(WithoutProperty.ENCODED_VIDEO); + expect(jobMock.queue).toHaveBeenCalledWith({ + name: JobName.VIDEO_CONVERSION, + data: { asset: assetEntityStub.video }, + }); + }); + + it('should log an error', async () => { + assetMock.getAll.mockRejectedValue(new Error('database unavailable')); + + await sut.handleQueueVideoConversion({ force: true }); + + expect(assetMock.getAll).toHaveBeenCalled(); + }); + }); + + describe('handleVideoConversion', () => { + it('should log an error', async () => { + mediaMock.transcode.mockRejectedValue(new Error('unable to transcode')); + + await sut.handleVideoConversion({ asset: assetEntityStub.video }); + + expect(storageMock.mkdirSync).toHaveBeenCalled(); + }); + + it('should transcode the longest stream', async () => { + mediaMock.probe.mockResolvedValue(probeStub.multiple); + + await sut.handleVideoConversion({ asset: assetEntityStub.video }); + + expect(mediaMock.probe).toHaveBeenCalledWith('/original/path.ext'); + expect(configMock.load).toHaveBeenCalled(); + expect(storageMock.mkdirSync).toHaveBeenCalled(); + expect(mediaMock.transcode).toHaveBeenCalledWith( + '/original/path.ext', + 'upload/encoded-video/user-id/asset-id.mp4', + ['-crf 23', '-preset ultrafast', '-vcodec h264', '-acodec aac', '-movflags faststart'], + ); + }); + + it('should skip a video without any streams', async () => { + mediaMock.probe.mockResolvedValue(probeStub.empty); + await sut.handleVideoConversion({ asset: assetEntityStub.video }); + expect(mediaMock.transcode).not.toHaveBeenCalled(); + }); + + it('should skip a video without any height', async () => { + mediaMock.probe.mockResolvedValue(probeStub.noHeight); + await sut.handleVideoConversion({ asset: assetEntityStub.video }); + expect(mediaMock.transcode).not.toHaveBeenCalled(); + }); + + it('should transcode when set to all', async () => { + mediaMock.probe.mockResolvedValue(probeStub.multiple); + configMock.load.mockResolvedValue([{ key: SystemConfigKey.FFMPEG_TRANSCODE, value: 'all' }]); + await sut.handleVideoConversion({ asset: assetEntityStub.video }); + expect(mediaMock.transcode).toHaveBeenCalledWith( + '/original/path.ext', + 'upload/encoded-video/user-id/asset-id.mp4', + ['-crf 23', '-preset ultrafast', '-vcodec h264', '-acodec aac', '-movflags faststart'], + ); + }); + + it('should transcode when optimal and too big', async () => { + mediaMock.probe.mockResolvedValue(probeStub.tooBig); + configMock.load.mockResolvedValue([{ key: SystemConfigKey.FFMPEG_TRANSCODE, value: 'optimal' }]); + await sut.handleVideoConversion({ asset: assetEntityStub.video }); + expect(mediaMock.transcode).toHaveBeenCalledWith( + '/original/path.ext', + 'upload/encoded-video/user-id/asset-id.mp4', + ['-crf 23', '-preset ultrafast', '-vcodec h264', '-acodec aac', '-movflags faststart', '-vf scale=-2:720'], + ); + }); + + it('should not transcode an invalid transcode value', async () => { + mediaMock.probe.mockResolvedValue(probeStub.tooBig); + configMock.load.mockResolvedValue([{ key: SystemConfigKey.FFMPEG_TRANSCODE, value: 'invalid' }]); + await sut.handleVideoConversion({ asset: assetEntityStub.video }); + expect(mediaMock.transcode).not.toHaveBeenCalled(); + }); + }); }); diff --git a/server/libs/domain/src/media/media.service.ts b/server/libs/domain/src/media/media.service.ts index e11c48a2c9..1388d1cad8 100644 --- a/server/libs/domain/src/media/media.service.ts +++ b/server/libs/domain/src/media/media.service.ts @@ -1,16 +1,19 @@ -import { AssetType } from '@app/infra/entities'; +import { AssetType, TranscodePreset } from '@app/infra/entities'; import { Inject, Injectable, Logger } from '@nestjs/common'; import { join } from 'path'; import { IAssetRepository, mapAsset, WithoutProperty } from '../asset'; import { CommunicationEvent, ICommunicationRepository } from '../communication'; import { IAssetJob, IBaseJob, IJobRepository, JobName } from '../job'; import { IStorageRepository, StorageCore, StorageFolder } from '../storage'; -import { IMediaRepository } from './media.repository'; +import { ISystemConfigRepository, SystemConfigFFmpegDto } from '../system-config'; +import { SystemConfigCore } from '../system-config/system-config.core'; +import { IMediaRepository, VideoStreamInfo } from './media.repository'; @Injectable() export class MediaService { private logger = new Logger(MediaService.name); private storageCore = new StorageCore(); + private configCore: SystemConfigCore; constructor( @Inject(IAssetRepository) private assetRepository: IAssetRepository, @@ -18,7 +21,10 @@ export class MediaService { @Inject(IJobRepository) private jobRepository: IJobRepository, @Inject(IMediaRepository) private mediaRepository: IMediaRepository, @Inject(IStorageRepository) private storageRepository: IStorageRepository, - ) {} + @Inject(ISystemConfigRepository) systemConfig: ISystemConfigRepository, + ) { + this.configCore = new SystemConfigCore(systemConfig); + } async handleQueueGenerateThumbnails(job: IBaseJob): Promise { try { @@ -93,7 +99,114 @@ export class MediaService { await this.mediaRepository.resize(asset.resizePath, webpPath, { size: 250, format: 'webp' }); await this.assetRepository.save({ id: asset.id, webpPath: webpPath }); } catch (error: any) { - this.logger.error('Failed to generate webp thumbnail for asset: ' + asset.id, error.stack); + this.logger.error(`Failed to generate webp thumbnail for asset: ${asset.id}`, error.stack); } } + + async handleQueueVideoConversion(job: IBaseJob) { + const { force } = job; + + try { + const assets = force + ? await this.assetRepository.getAll({ type: AssetType.VIDEO }) + : await this.assetRepository.getWithout(WithoutProperty.ENCODED_VIDEO); + for (const asset of assets) { + await this.jobRepository.queue({ name: JobName.VIDEO_CONVERSION, data: { asset } }); + } + } catch (error: any) { + this.logger.error('Failed to queue video conversions', error.stack); + } + } + + async handleVideoConversion(job: IAssetJob) { + const { asset } = job; + + try { + const input = asset.originalPath; + const outputFolder = this.storageCore.getFolderLocation(StorageFolder.ENCODED_VIDEO, asset.ownerId); + const output = join(outputFolder, `${asset.id}.mp4`); + this.storageRepository.mkdirSync(outputFolder); + + const { streams } = await this.mediaRepository.probe(input); + const stream = await this.getLongestStream(streams); + if (!stream) { + return; + } + + const { ffmpeg: config } = await this.configCore.getConfig(); + + const required = this.isTranscodeRequired(stream, config); + if (!required) { + return; + } + + const options = this.getFfmpegOptions(stream, config); + await this.mediaRepository.transcode(input, output, options); + + this.logger.log(`Converting Success ${asset.id}`); + + await this.assetRepository.save({ id: asset.id, encodedVideoPath: output }); + } catch (error: any) { + this.logger.error(`Failed to handle video conversion for asset: ${asset.id}`, error.stack); + } + } + + private getLongestStream(streams: VideoStreamInfo[]): VideoStreamInfo | null { + return streams + .filter((stream) => stream.codecType === 'video') + .sort((stream1, stream2) => stream2.frameCount - stream1.frameCount)[0]; + } + + private isTranscodeRequired(stream: VideoStreamInfo, ffmpegConfig: SystemConfigFFmpegDto): boolean { + if (!stream.height || !stream.width) { + this.logger.error('Skipping transcode, height or width undefined for video stream'); + return false; + } + + const isTargetVideoCodec = stream.codecName === ffmpegConfig.targetVideoCodec; + + const targetResolution = Number.parseInt(ffmpegConfig.targetResolution); + const isLargerThanTargetResolution = Math.min(stream.height, stream.width) > targetResolution; + + switch (ffmpegConfig.transcode) { + case TranscodePreset.ALL: + return true; + + case TranscodePreset.REQUIRED: + return !isTargetVideoCodec; + + case TranscodePreset.OPTIMAL: + return !isTargetVideoCodec || isLargerThanTargetResolution; + + default: + return false; + } + } + + private getFfmpegOptions(stream: VideoStreamInfo, ffmpeg: SystemConfigFFmpegDto) { + // TODO: If video or audio are already the correct format, don't re-encode, copy the stream + + const options = [ + `-crf ${ffmpeg.crf}`, + `-preset ${ffmpeg.preset}`, + `-vcodec ${ffmpeg.targetVideoCodec}`, + `-acodec ${ffmpeg.targetAudioCodec}`, + // Makes a second pass moving the moov atom to the beginning of + // the file for improved playback speed. + `-movflags faststart`, + ]; + + const videoIsRotated = Math.abs(stream.rotation) === 90; + const targetResolution = Number.parseInt(ffmpeg.targetResolution); + + const isVideoVertical = stream.height > stream.width || videoIsRotated; + const scaling = isVideoVertical ? `${targetResolution}:-2` : `-2:${targetResolution}`; + + const shouldScale = Math.min(stream.height, stream.width) > targetResolution; + if (shouldScale) { + options.push(`-vf scale=${scaling}`); + } + + return options; + } } diff --git a/server/libs/domain/test/fixtures.ts b/server/libs/domain/test/fixtures.ts index 1c2f02746c..cc1d7976b8 100644 --- a/server/libs/domain/test/fixtures.ts +++ b/server/libs/domain/test/fixtures.ts @@ -18,6 +18,7 @@ import { mapUser, SearchResult, SharedLinkResponseDto, + VideoInfo, } from '../src'; const today = new Date(); @@ -704,3 +705,51 @@ export const searchStub = { facets: [], }), }; + +export const probeStub = { + empty: { streams: [] }, + multiple: Object.freeze({ + streams: [ + { + height: 1080, + width: 400, + codecName: 'h265', + codecType: 'video', + frameCount: 100, + rotation: 0, + }, + { + height: 1080, + width: 400, + codecName: 'h7000', + codecType: 'video', + frameCount: 99, + rotation: 0, + }, + ], + }), + noHeight: Object.freeze({ + streams: [ + { + height: 0, + width: 400, + codecName: 'h265', + codecType: 'video', + frameCount: 100, + rotation: 0, + }, + ], + }), + tooBig: Object.freeze({ + streams: [ + { + height: 10000, + width: 10000, + codecName: 'h264', + codecType: 'video', + frameCount: 100, + rotation: 0, + }, + ], + }), +}; diff --git a/server/libs/domain/test/media.repository.mock.ts b/server/libs/domain/test/media.repository.mock.ts index 606509764a..41844aec76 100644 --- a/server/libs/domain/test/media.repository.mock.ts +++ b/server/libs/domain/test/media.repository.mock.ts @@ -5,5 +5,7 @@ export const newMediaRepositoryMock = (): jest.Mocked => { extractThumbnailFromExif: jest.fn(), extractVideoThumbnail: jest.fn(), resize: jest.fn(), + probe: jest.fn(), + transcode: jest.fn(), }; }; diff --git a/server/libs/infra/src/repositories/media.repository.ts b/server/libs/infra/src/repositories/media.repository.ts index afd3ca0a5b..b96fb0c5d6 100644 --- a/server/libs/infra/src/repositories/media.repository.ts +++ b/server/libs/infra/src/repositories/media.repository.ts @@ -1,7 +1,10 @@ -import { IMediaRepository, ResizeOptions } from '@app/domain'; +import { IMediaRepository, ResizeOptions, VideoInfo } from '@app/domain'; import { exiftool } from 'exiftool-vendored'; -import ffmpeg from 'fluent-ffmpeg'; +import ffmpeg, { FfprobeData } from 'fluent-ffmpeg'; import sharp from 'sharp'; +import { promisify } from 'util'; + +const probe = promisify(ffmpeg.ffprobe); export class MediaRepository implements IMediaRepository { extractThumbnailFromExif(input: string, output: string): Promise { @@ -42,4 +45,31 @@ export class MediaRepository implements IMediaRepository { .run(); }); } + + async probe(input: string): Promise { + const results = await probe(input); + + return { + streams: results.streams.map((stream) => ({ + height: stream.height || 0, + width: stream.width || 0, + codecName: stream.codec_name, + codecType: stream.codec_type, + frameCount: Number.parseInt(stream.nb_frames ?? '0'), + rotation: Number.parseInt(`${stream.rotation ?? 0}`), + })), + }; + } + + transcode(input: string, output: string, options: string[]): Promise { + return new Promise((resolve, reject) => { + ffmpeg(input) + // + .outputOptions(options) + .output(output) + .on('error', reject) + .on('end', resolve) + .run(); + }); + } }