Skip to content

Commit

Permalink
Merge pull request #253 from zeeshanakram3/dev
Browse files Browse the repository at this point in the history
Release: `v2.1.0`
  • Loading branch information
zeeshanakram3 committed Sep 11, 2023
2 parents c719595 + 7e7d9f0 commit 96a3891
Show file tree
Hide file tree
Showing 14 changed files with 283 additions and 88 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
### 2.1.0

- return syncBacklog field in `GET /status` endpoint
- **FIX**: Adds fix for premiere videos not syncing bug

### 2.0.0

- Updates minimum required NodeJS version to `v18`.
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "youtube-sync",
"version": "2.0.0",
"version": "2.1.0",
"license": "MIT",
"scripts": {
"postpack": "rm -f oclif.manifest.json",
Expand Down
16 changes: 14 additions & 2 deletions src/app/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { DynamodbService } from '../repository'
import { bootstrapHttpApi } from '../services/httpApi/main'
import { LoggingService } from '../services/logging'
import { QueryNodeApi } from '../services/query-node/api'
import { RuntimeApi } from '../services/runtime/api'
import { JoystreamClient } from '../services/runtime/client'
import { ContentCreationService } from '../services/syncProcessing/ContentCreationService'
import { ContentDownloadService } from '../services/syncProcessing/ContentDownloadService'
Expand All @@ -21,6 +22,7 @@ export class Service {
private youtubeApi: IYoutubeApi
private queryNodeApi: QueryNodeApi
private dynamodbService: DynamodbService
private runtimeApi: RuntimeApi
private joystreamClient: JoystreamClient
private youtubePollingService: YoutubePollingService
private contentDownloadService: ContentDownloadService
Expand All @@ -35,7 +37,8 @@ export class Service {
this.queryNodeApi = new QueryNodeApi(config.endpoints.queryNode, this.logging)
this.dynamodbService = new DynamodbService(this.config.aws)
this.youtubeApi = YoutubeApi.create(this.config, this.dynamodbService.repo.stats)
this.joystreamClient = new JoystreamClient(config, this.youtubeApi, this.queryNodeApi, this.logging)
this.runtimeApi = new RuntimeApi(config.endpoints.joystreamNodeWs, this.logging)
this.joystreamClient = new JoystreamClient(config, this.runtimeApi, this.queryNodeApi, this.logging)

if (config.sync.enable) {
this.youtubePollingService = new YoutubePollingService(
Expand Down Expand Up @@ -131,7 +134,16 @@ export class Service {

public async start(): Promise<void> {
try {
await bootstrapHttpApi(this.config, this.logging, this.dynamodbService, this.queryNodeApi, this.youtubeApi)
await bootstrapHttpApi(
this.config,
this.logging,
this.dynamodbService,
this.runtimeApi,
this.queryNodeApi,
this.youtubeApi,
this.contentCreationService,
this.contentDownloadService
)
this.checkConfigDirectories()
await this.startSync()
} catch (err) {
Expand Down
2 changes: 1 addition & 1 deletion src/cli/commands/sync/addUnauthorizedChannelForSyncing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ export default class AddUnauthorizedChannelForSyncing extends RuntimeApiCommandB
privacyStatus: 'public',
uploadStatus: 'processed',
joystreamChannelId: c.joystreamChannelId,
liveStreamingDetails: video.isLive || undefined,
liveBroadcastContent: video.isLive ? 'live' : 'none',
state: 'New',
viewCount: 0,
} as YtVideo)
Expand Down
46 changes: 42 additions & 4 deletions src/services/httpApi/api-spec.json
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,29 @@
]
}
},
"/status": {
"get": {
"operationId": "StatusController_getStatus",
"summary": "",
"description": "Get status info of YT-Synch service",
"parameters": [],
"responses": {
"default": {
"description": "",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/StatusDto"
}
}
}
}
},
"tags": [
"status"
]
}
},
"/status/quota-usage": {
"get": {
"operationId": "StatusController_getQuotaStats",
Expand Down Expand Up @@ -821,9 +844,6 @@
"description": {
"type": "string"
},
"aggregatedStats": {
"type": "number"
},
"shouldBeIngested": {
"type": "boolean"
},
Expand Down Expand Up @@ -863,7 +883,6 @@
"youtubeChannelId",
"title",
"description",
"aggregatedStats",
"shouldBeIngested",
"yppStatus",
"joystreamChannelId",
Expand Down Expand Up @@ -1098,6 +1117,25 @@
"type": "object",
"properties": {}
},
"StatusDto": {
"type": "object",
"properties": {
"version": {
"type": "string"
},
"syncStatus": {
"type": "string"
},
"syncBacklog": {
"type": "number"
}
},
"required": [
"version",
"syncStatus",
"syncBacklog"
]
},
"CreateMembershipRequest": {
"type": "object",
"properties": {
Expand Down
25 changes: 24 additions & 1 deletion src/services/httpApi/controllers/status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,40 @@ import { DynamodbService } from '../../../repository'
import { ReadonlyConfig } from '../../../types'
import { Stats } from '../../../types/youtube'
import { RuntimeApi } from '../../runtime/api'
import { CollaboratorStatusDto } from '../dtos'
import { ContentCreationService } from '../../syncProcessing/ContentCreationService'
import { ContentDownloadService } from '../../syncProcessing/ContentDownloadService'
import { CollaboratorStatusDto, StatusDto } from '../dtos'

@Controller('status')
@ApiTags('status')
export class StatusController {
constructor(
private dynamodbService: DynamodbService,
private runtimeApi: RuntimeApi,
private contentCreationService: ContentCreationService,
private contentDownloadService: ContentDownloadService,
@Inject('config') private config: ReadonlyConfig
) {}

@Get()
@ApiResponse({ type: StatusDto })
@ApiOperation({ description: `Get status info of YT-Synch service` })
async getStatus(): Promise<StatusDto> {
try {
// Get complete quota usage statss
const {
version,
sync: { enable },
} = this.config

const syncBacklog = this.contentCreationService.totalTasks + this.contentDownloadService.totalTasks
return { version, syncStatus: enable ? 'enabled' : 'disabled', syncBacklog }
} catch (error) {
const message = error instanceof Error ? error.message : error
throw new NotFoundException(message)
}
}

@Get('quota-usage')
@ApiResponse({ type: Stats, isArray: true })
@ApiOperation({ description: `Get youtube quota usage information` })
Expand Down
6 changes: 6 additions & 0 deletions src/services/httpApi/dtos.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ export class ThumbnailsDto {
@ApiProperty() standard: string
}

export class StatusDto {
@ApiProperty() version: string
@ApiProperty() syncStatus: 'enabled' | 'disabled'
@ApiProperty() syncBacklog: number
}

export class CollaboratorStatusDto {
@ApiProperty() memberId: string
@ApiProperty() controllerAccount: string
Expand Down
16 changes: 14 additions & 2 deletions src/services/httpApi/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import { ReadonlyConfig, formattedJSON } from '../../types'
import { LoggingService } from '../logging'
import { QueryNodeApi } from '../query-node/api'
import { RuntimeApi } from '../runtime/api'
import { ContentCreationService } from '../syncProcessing/ContentCreationService'
import { ContentDownloadService } from '../syncProcessing/ContentDownloadService'
import { IYoutubeApi } from '../youtube/api'
import {
ChannelsController,
Expand Down Expand Up @@ -42,13 +44,15 @@ export async function bootstrapHttpApi(
config: ReadonlyConfig,
logging: LoggingService,
dynamodbService: DynamodbService,
runtimeApi: RuntimeApi,
queryNodeApi: QueryNodeApi,
youtubeApi: IYoutubeApi
youtubeApi: IYoutubeApi,
contentCreationService: ContentCreationService,
contentDownloadService: ContentDownloadService
) {
// make sure WASM crypto module is ready
await cryptoWaitReady()

const runtimeApi = new RuntimeApi(config.endpoints.joystreamNodeWs, logging)
const objectAppModule: DynamicModule = {
module: ApiModule,
imports: [],
Expand All @@ -74,6 +78,14 @@ export async function bootstrapHttpApi(
provide: RuntimeApi,
useValue: runtimeApi,
},
{
provide: ContentCreationService,
useValue: contentCreationService,
},
{
provide: ContentDownloadService,
useValue: contentDownloadService,
},
{
provide: 'youtube',
useValue: youtubeApi,
Expand Down
9 changes: 3 additions & 6 deletions src/services/runtime/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import { Thumbnails, YtVideo } from '../../types/youtube'
import { AppActionSignatureInput, computeFileHashAndSize, signAppActionCommitmentForVideo } from '../../utils/hasher'
import { LoggingService } from '../logging'
import { QueryNodeApi } from '../query-node/api'
import { IYoutubeApi } from '../youtube/api'
import { RuntimeApi } from './api'
import { asValidatedMetadata, metadataToBytes } from './serialization'
import { AccountsUtil } from './signer'
Expand All @@ -44,16 +43,14 @@ export class JoystreamClient {
private runtimeApi: RuntimeApi
private accounts: AccountsUtil
private qnApi: QueryNodeApi
private youtubeApi: IYoutubeApi
private config: ReadonlyConfig
private logger: Logger

constructor(config: ReadonlyConfig, youtubeApi: IYoutubeApi, qnApi: QueryNodeApi, logging: LoggingService) {
constructor(config: ReadonlyConfig, runtimeApi: RuntimeApi, qnApi: QueryNodeApi, logging: LoggingService) {
this.logger = logging.createLogger('JoystreamClient')
this.qnApi = qnApi
this.config = config
this.youtubeApi = youtubeApi
this.runtimeApi = new RuntimeApi(this.config.endpoints.joystreamNodeWs, logging)
this.runtimeApi = runtimeApi
this.accounts = new AccountsUtil(this.config.joystream)
}

Expand Down Expand Up @@ -298,7 +295,7 @@ function getVideoFFProbeMetadata(filePath: string): Promise<VideoFFProbeMetadata
duration: videoStream.duration !== undefined ? Math.ceil(Number(videoStream.duration)) || 0 : undefined,
})
} else {
reject(new Error('No video stream found in file'))
reject(new Error('NoVideoStreamInFile'))
}
})
})
Expand Down
28 changes: 20 additions & 8 deletions src/services/syncProcessing/ContentCreationService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,15 @@ export class ContentCreationService {
private contentDownloadService: ContentDownloadService
private queue: PriorityQueue<VideoCreationTask, 'sequentialProcessor'>
private lastVideoCreationBlockByChannelId: Map<number, BN> // JsChannelId -> Last video creation block number
private activeTaskId: string // video Id of the currently running video creation task

get totalTasks(): number {
return this.queue.stats.totalTasks
}

private get activeTaskId(): string | undefined {
// Since `ContentCreationService` is using sequentialProcessor, so there will be only one active task at max
return [...this.queue.stats.activeTaskIds.values()][0]
}

constructor(
logging: LoggingService,
Expand Down Expand Up @@ -110,8 +118,13 @@ export class ContentCreationService {
const percentageOfCreatorBacklogNotSynched = (unsyncedVideos.length * 100) / totalVideos

for (const v of unsyncedVideos) {
let sudoPriority = this.DEFAULT_SUDO_PRIORITY
if (new Date(v.publishedAt) > channel.createdAt && v.duration > 300) {
sudoPriority += 50
}

const rank = this.queue.calculateVideoRank(
this.DEFAULT_SUDO_PRIORITY,
sudoPriority,
percentageOfCreatorBacklogNotSynched,
Date.parse(v.publishedAt)
)
Expand All @@ -127,9 +140,6 @@ export class ContentCreationService {
}

private async processCreateVideoTask(video: VideoCreationTask, cb: (error?: any, result?: null) => void) {
// set `activeTaskId`
this.activeTaskId = video.id

try {
// * Pre-validation
// If the channel opted out of YPP program, then skip creating the video
Expand Down Expand Up @@ -180,17 +190,19 @@ export class ContentCreationService {
const channel = await this.dynamodbService.channels.getById(video.channelId)
const isHistoricalVideo = new Date(video.publishedAt) < channel.createdAt
if (isHistoricalVideo) {
const historicalVideoSyncedSize = (channel.historicalVideoSyncedSize || 0) + size
await this.dynamodbService.channels.save({
...channel,
historicalVideoSyncedSize: (channel.historicalVideoSyncedSize || 0) + size,
historicalVideoSyncedSize,
})
}
} catch (err) {
this.logger.error(`Got error processing video`, { videoId: video.id, err })
await this.dynamodbService.videos.updateState(video, 'VideoCreationFailed')
if (err instanceof Error && err.message === 'NoVideoStreamInFile') {
await this.contentDownloadService.removeVideoFile(video.id)
}
} finally {
// unset `activeTaskId` set
this.activeTaskId = ''
// Signal that the task is done
cb(null, null)
}
Expand Down

0 comments on commit 96a3891

Please sign in to comment.