Skip to content

Commit

Permalink
Merge pull request #297 from Joystream/dev
Browse files Browse the repository at this point in the history
Release: `v3.1.0`
  • Loading branch information
zeeshanakram3 committed Nov 1, 2023
2 parents aa5f510 + 08da970 commit 9dadb6c
Show file tree
Hide file tree
Showing 23 changed files with 463 additions and 187 deletions.
20 changes: 20 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
### 3.1.0

- bump `@bull-board` npm dependency
- Add new `referrers/top-referrers` endpoint to return top referrers by cumulative reward
- Updates `channels/induction/requirements` endpoint to return both the requirements as well as error messages if the signup fails
- **FIX**: lock bullmq jobs processing when recalculating the priorities
- **FIX**: skip stall check & add auto-renewal of locks for batch jobs
- **FIX**: remove `joystreamChannelId` field from video type + add optional locking feature for Dynamodb tables
- **FIX**: add timeout for 'pWaitFor' in storage-node/api.ts

### 3.0.0

- Introduce new YPP 2.0 program with with four different tiers (`Bronze`, `Silver`, `Gold` & `Diamond`) and different rewards for each tier. Previously the tiers were based on the subscribers count
- Migrate from `better-queue` npm package to `BullMQ` for queue management.
- Adds `channels/:id/referrals` endpoint to get the referrals of a channel
- update `/channels/:id` endpoint for returning the sync eta, backlog etc
- remove unused channels/ & users/ endpoints
- reenable syncing for 'Unverified channels'
- Add feature for batch creation of videos on chain

### 2.1.0

- return syncBacklog field in `GET /status` endpoint
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "youtube-sync",
"version": "3.0.0",
"version": "3.1.0",
"license": "MIT",
"scripts": {
"postpack": "rm -f oclif.manifest.json",
Expand Down Expand Up @@ -69,7 +69,7 @@
},
"dependencies": {
"@apollo/client": "^3.2.5",
"@bull-board/express": "^5.8.4",
"@bull-board/express": "^5.9.1",
"@elastic/ecs-winston-format": "^1.3.1",
"@ffmpeg-installer/ffmpeg": "^1.1.0",
"@ffprobe-installer/ffprobe": "^1.4.1",
Expand Down
1 change: 0 additions & 1 deletion src/app/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ export class Service {
await bootstrapHttpApi(
this.config,
this.logging,
this.dynamodbService,
this.runtimeApi,
this.queryNodeApi,
this.youtubeApi,
Expand Down
1 change: 0 additions & 1 deletion src/cli/commands/sync/addUnauthorizedChannelForSyncing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ export default class AddUnauthorizedChannelForSyncing extends RuntimeApiCommandB
url: video.shortUrl,
privacyStatus: 'public',
uploadStatus: 'processed',
joystreamChannelId: c.joystreamChannelId,
liveBroadcastContent: video.isLive ? 'live' : 'none',
state: 'New',
viewCount: 0,
Expand Down
12 changes: 6 additions & 6 deletions src/repository/DynamodbService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ interface IDynamodbClient {
}

const DynamodbClient = {
create(tablePrefix: ResourcePrefix): IDynamodbClient {
create(tablePrefix: ResourcePrefix, useLock?: boolean): IDynamodbClient {
return {
channels: new ChannelsRepository(tablePrefix),
channels: new ChannelsRepository(tablePrefix, useLock),
users: new UsersRepository(tablePrefix),
videos: new VideosRepository(tablePrefix),
stats: new StatsRepository(tablePrefix),
Expand All @@ -40,15 +40,15 @@ export class DynamodbService implements IDynamodbService {
readonly users: UsersService
readonly videos: VideosService

constructor(aws?: ReadonlyConfig['aws']) {
const { repo, channels, users, videos } = this.init(aws)
constructor(aws?: ReadonlyConfig['aws'], useLock?: boolean) {
const { repo, channels, users, videos } = this.init(aws, useLock)
this.repo = repo
this.channels = channels
this.users = users
this.videos = videos
}

private init(aws?: ReadonlyConfig['aws']): IDynamodbService {
private init(aws?: ReadonlyConfig['aws'], useLock?: boolean): IDynamodbService {
// configure Dynamoose to use DynamoDB Local.
if (aws?.endpoint) {
console.log(`Using local DynamoDB at ${aws.endpoint || `http://localhost:4566`}`)
Expand All @@ -75,7 +75,7 @@ export class DynamodbService implements IDynamodbService {
dynamoose.aws.ddb.set(ddb)
}

const repo = DynamodbClient.create(aws?.endpoint ? 'local_' : resourcePrefix)
const repo = DynamodbClient.create(aws?.endpoint ? 'local_' : resourcePrefix, useLock)
const channels = new ChannelsService(repo.channels)
const users = new UsersService(repo.users)
const videos = new VideosService(repo.videos)
Expand Down
96 changes: 80 additions & 16 deletions src/repository/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,17 @@ import * as dynamoose from 'dynamoose'
import { ConditionInitializer } from 'dynamoose/dist/Condition'
import { AnyItem } from 'dynamoose/dist/Item'
import { Query, QueryResponse, Scan, ScanResponse } from 'dynamoose/dist/ItemRetriever'
import _ from 'lodash'
import { omit } from 'ramda'
import { DYNAMO_MODEL_OPTIONS, IRepository, mapTo } from '.'
import { ResourcePrefix, YtChannel, channelYppStatus } from '../types/youtube'
import {
ChannelYppStatusVerified,
REFERRAL_REWARD_BY_TIER,
ResourcePrefix,
TopReferrer,
YtChannel,
channelYppStatus,
} from '../types/youtube'

function createChannelModel(tablePrefix: ResourcePrefix) {
const channelSchema = new dynamoose.Schema(
Expand Down Expand Up @@ -188,18 +196,30 @@ export class ChannelsRepository implements IRepository<YtChannel> {
// lock any updates on video table
private readonly ASYNC_LOCK_ID = 'channel'
private asyncLock: AsyncLock = new AsyncLock({ maxPending: Number.MAX_SAFE_INTEGER })
private useLock: boolean // Flag to determine if locking should be used

constructor(tablePrefix: ResourcePrefix) {
constructor(tablePrefix: ResourcePrefix, useLock: boolean = true) {
this.model = createChannelModel(tablePrefix)
this.useLock = useLock // Initialize the locking flag
}

private async withLock<T>(func: () => Promise<T>): Promise<T> {
if (this.useLock) {
return this.asyncLock.acquire(this.ASYNC_LOCK_ID, func)
} else {
return func()
}
}

async upsertAll(channels: YtChannel[]): Promise<YtChannel[]> {
const results = await Promise.all(channels.map(async (channel) => await this.save(channel)))
return results
return this.withLock(async () => {
const results = await Promise.all(channels.map(async (channel) => await this.save(channel)))
return results
})
}

async scan(init: ConditionInitializer, f: (q: Scan<AnyItem>) => Scan<AnyItem>): Promise<YtChannel[]> {
return this.asyncLock.acquire(this.ASYNC_LOCK_ID, async () => {
return this.withLock(async () => {
let lastKey = undefined
const results = []
do {
Expand All @@ -215,39 +235,43 @@ export class ChannelsRepository implements IRepository<YtChannel> {
}

async get(id: string): Promise<YtChannel | undefined> {
return this.asyncLock.acquire(this.ASYNC_LOCK_ID, async () => {
return this.withLock(async () => {
const [result] = await this.model.query({ id }).using('id-index').exec()
return result ? mapTo<YtChannel>(result) : undefined
})
}

async save(channel: YtChannel): Promise<YtChannel> {
return this.asyncLock.acquire(this.ASYNC_LOCK_ID, async () => {
return this.withLock(async () => {
const update = omit(['id', 'userId', 'updatedAt'], channel)
const result = await this.model.update({ id: channel.id, userId: channel.userId }, update)
return mapTo<YtChannel>(result)
})
}

async batchSave(videos: YtChannel[]): Promise<void> {
return this.asyncLock.acquire(this.ASYNC_LOCK_ID, async () => {
const result = await this.model.batchPut(videos)
if (result.unprocessedItems.length) {
console.log('Unprocessed items', result.unprocessedItems.length)
return await this.batchSave(result.unprocessedItems as YtChannel[])
}
async batchSave(channels: YtChannel[]): Promise<void> {
if (!channels.length) {
return
}

return this.withLock(async () => {
const updateTransactions = channels.map((channel) => {
const update = omit(['id', 'userId', 'updatedAt'], channel)
return this.model.transaction.update({ id: channel.id, userId: channel.userId }, update)
})
return dynamoose.transaction(updateTransactions)
})
}

async delete(id: string, userId: string): Promise<void> {
return this.asyncLock.acquire(this.ASYNC_LOCK_ID, async () => {
return this.withLock(async () => {
await this.model.delete({ id, userId })
return
})
}

async query(init: ConditionInitializer, f: (q: Query<AnyItem>) => Query<AnyItem>) {
return this.asyncLock.acquire(this.ASYNC_LOCK_ID, async () => {
return this.withLock(async () => {
let lastKey = undefined
const results = []
do {
Expand Down Expand Up @@ -368,4 +392,44 @@ export class ChannelsService {
async batchSave(channels: YtChannel[]): Promise<void> {
return await this.channelsRepository.batchSave(channels)
}

async getTopReferrers(limit: number = 10): Promise<TopReferrer[]> {
const topReferrers: TopReferrer[] = []
const allReferredChannels = await this.channelsRepository.scan({}, (q) => q.using('referrerChannelId-index'))

const referredChannelsByReferrer = _(allReferredChannels)
.groupBy((ch) => ch.referrerChannelId)
.map((referredChannels, referrerChannelId) => ({ referrerChannelId, referredChannels: [...referredChannels] }))
.value()

referredChannelsByReferrer.forEach(({ referrerChannelId, referredChannels }) => {
let totalEarnings = 0
let totalReferredChannels = referredChannels.length

const referredByTier: { [K in ChannelYppStatusVerified]: number } = {
'Bronze': 0,
'Silver': 0,
'Gold': 0,
'Diamond': 0,
}

for (const channel of referredChannels) {
const tier = YtChannel.getTier(channel)
if (tier) {
referredByTier[tier]++
totalEarnings += REFERRAL_REWARD_BY_TIER[tier]
}
}

topReferrers.push({
referrerChannelId: parseInt(referrerChannelId),
referredByTier,
totalEarnings,
totalReferredChannels,
})
})

// Sort referrers by totalEarnings and take the top 'limit'
return topReferrers.sort((a, b) => b.totalEarnings - a.totalEarnings).slice(0, limit)
}
}
14 changes: 9 additions & 5 deletions src/repository/video.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,16 @@ export class VideosRepository implements IRepository<YtVideo> {
}

async batchSave(videos: YtVideo[]): Promise<void> {
if (!videos.length) {
return
}

return this.asyncLock.acquire(this.ASYNC_LOCK_ID, async () => {
const result = await this.model.batchPut(videos)
if (result.unprocessedItems.length) {
console.log('Unprocessed items', result.unprocessedItems.length)
return await this.batchSave(result.unprocessedItems as YtVideo[])
}
const updateTransactions = videos.map((video) => {
const upd = omit(['id', 'channelId', 'updatedAt'], video)
return this.model.transaction.update({ channelId: video.channelId, id: video.id }, upd)
})
return dynamoose.transaction(updateTransactions)
})
}

Expand Down

0 comments on commit 9dadb6c

Please sign in to comment.