Skip to content

Commit

Permalink
ApOutboxFetchService.ts iroiro (#526)
Browse files Browse the repository at this point in the history
  • Loading branch information
penginn-net authored Nov 4, 2024
1 parent fa0a7c8 commit d6e2071
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 116 deletions.
217 changes: 108 additions & 109 deletions packages/backend/src/core/activitypub/models/ApOutboxFetchService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import { Inject, Injectable } from '@nestjs/common';
import { ModuleRef } from '@nestjs/core';
import * as Redis from 'ioredis';
import { AbortError } from 'got';
import { DI } from '@/di-symbols.js';
import type { UsersRepository } from '@/models/_.js';
import type { MiRemoteUser } from '@/models/User.js';
Expand All @@ -19,16 +21,16 @@ import { AppLockService } from '@/core/AppLockService.js';
import { NoteEntityService } from '@/core/entities/NoteEntityService.js';
import { NoteCreateService } from '@/core/NoteCreateService.js';
import { IdentifiableError } from '@/misc/identifiable-error.js';
import { getApId, isIOrderedCollectionPage, isAnnounce, isNote, isPost } from '../type.js';
import { ApDbResolverService } from '@/core/activitypub/ApDbResolverService.js';
import { isIOrderedCollectionPage, isCreate, IOrderedCollectionPage, isNote } from '../type.js';
import { ApAudienceService } from '../ApAudienceService.js';
import type { OnModuleInit } from '@nestjs/common';
import type { ApNoteService } from './ApNoteService.js';
import type { ApResolverService, Resolver } from '../ApResolverService.js';
import type { ApLoggerService } from '../ApLoggerService.js';

const pagelimit = 3;
const createLimit = 15;

const pagelimit = 1;
const createLimit = 20;
@Injectable()
export class ApOutboxFetchService implements OnModuleInit {
private utilityService: UtilityService;
Expand All @@ -44,11 +46,14 @@ export class ApOutboxFetchService implements OnModuleInit {

@Inject(DI.usersRepository)
private usersRepository: UsersRepository,
@Inject(DI.redis)
private redisClient: Redis.Redis,

private noteEntityService: NoteEntityService,
private noteCreateService: NoteCreateService,
private appLockService: AppLockService,
private apAudienceService: ApAudienceService,
private apDbResolverService: ApDbResolverService,
private appLockService: AppLockService,
private noteCreateService: NoteCreateService,
private noteEntityService: NoteEntityService,
) {
}

Expand All @@ -67,142 +72,136 @@ export class ApOutboxFetchService implements OnModuleInit {
*/
@bindThis
public async fetchOutbox(userId: MiUser['id'], includeAnnounce = false, resolver?: Resolver): Promise<void> {
const user = await this.usersRepository.findOneByOrFail({ id: userId }) as MiRemoteUser;
const user = (await this.usersRepository.findOneBy({ id: userId }) as MiRemoteUser | null) ?? null;
if (!user) throw new IdentifiableError('3fc5a089-cab4-48db-b9f3-f220574b3c0a', 'No such user');
if (!user.host) throw new IdentifiableError('67070303-177c-4600-af93-b26a7ab889c6', 'Is local user');
if (!user.outbox) throw new IdentifiableError('e7a2e510-a8ce-40e9-b1e6-c007bacdc89f', 'outbox undefined.');
const blockedHosts = (await this.metaService.fetch()).blockedHosts;
if (this.utilityService.isBlockedHost(blockedHosts, user.host)) throw new IdentifiableError('b27090c8-8a68-4189-a445-14591c32a89c', 'blocked instance.');
const outboxUrl = user.outbox;

const meta = await this.metaService.fetch();
if (this.utilityService.isBlockedHost(meta.blockedHosts, this.utilityService.extractDbHost(outboxUrl))) return;

this.logger.info(`Fetcing the Outbox: ${outboxUrl}`);
const Resolver = resolver ?? this.apResolverService.createResolver();

const cache = await this.redisClient.get(`${outboxUrl}--next`);
// Resolve to (Ordered)Collection Object
const outbox = await Resolver.resolveCollection(outboxUrl);
const outbox = cache ? await Resolver.resolveOrderedCollectionPage(cache) : await Resolver.resolveCollection(outboxUrl);

if (outbox.type !== 'OrderedCollection') throw new IdentifiableError('0be2f5a1-2345-46d8-b8c3-430b111c68d3', 'outbox type is not OrderedCollection');
if (!outbox.first) throw new IdentifiableError('a723c2df-0250-4091-b5fc-e3a7b36c7b61', 'outbox first page not exist');
if (!cache && outbox.type !== 'OrderedCollection') throw new IdentifiableError('0be2f5a1-2345-46d8-b8c3-430b111c68d3', 'outbox type is not OrderedCollection');
if (!cache && !outbox.first) throw new IdentifiableError('a723c2df-0250-4091-b5fc-e3a7b36c7b61', 'outbox first page not exist');

let nextUrl = outbox.first;
let nextUrl = cache ? (outbox as IOrderedCollectionPage).next : outbox.first;
let page = 0;
let created = 0;
if (typeof(nextUrl) !== 'string') {
const first = (nextUrl as any);
if (first.partOf !== user.outbox) throw new IdentifiableError('6603433f-99db-4134-980c-48705ae57ab8', 'outbox part is invalid');

for (let i = 0; i < pagelimit; i++) {
const collectionPage = await Resolver.resolveOrderedCollectionPage(nextUrl);
if (!isIOrderedCollectionPage(collectionPage)) throw new IdentifiableError('2a05bb06-f38c-4854-af6f-7fd5e87c98ee', 'Object is not collectionPage');
const activityes = first.orderedItems ?? first.items;
await this.fetchObjects(user, activityes, includeAnnounce, created);

if (collectionPage.orderedItems.length === 0) {
break;
}
page = 1;
if (!first.next) return;
}

//IObject,IActivity型にないプロパティがあるのでany型にする
const activityes = collectionPage.orderedItems as any[];
created = await this.fetchObjects(activityes, created, includeAnnounce, outboxUrl, user, Resolver);
if (created > createLimit) break;
for (; page < pagelimit; page++) {
this.logger.info(nextUrl as string);
const collectionPage = (typeof(nextUrl) === 'string' ? await Resolver.resolveOrderedCollectionPage(nextUrl) : nextUrl) as IOrderedCollectionPage;
if (!isIOrderedCollectionPage(collectionPage)) throw new IdentifiableError('2a05bb06-f38c-4854-af6f-7fd5e87c98ee', 'Object is not collectionPage');
if (collectionPage.partOf !== user.outbox) throw new IdentifiableError('6603433f-99db-4134-980c-48705ae57ab8', 'outbox part is invalid');

const activityes = (collectionPage.orderedItems ?? collectionPage.items);
nextUrl = collectionPage.next;
if (!activityes) continue;

created = await this.fetchObjects(user, activityes, includeAnnounce, created);
if (createLimit <= created) break;//次ページ見て一件だけしか取れないのは微妙
if (!nextUrl) {
break;
}

await this.redisClient.set(`${outboxUrl}--next`, `${nextUrl}`, 'EX', 60 * 15);//15min
}
this.logger.succ(`Outbox Fetced: ${outboxUrl}`);
//this.logger.info(`Outbox Fetced last: ${nextUrl}`);
}

@bindThis
private async fetchObjects(activityes: any[], created: number, includeAnnounce:boolean, outboxUrl: string, user:MiRemoteUser, resolver?: Resolver): Promise<number> {
const Resolver = resolver ?? this.apResolverService.createResolver();
const meta = await this.metaService.fetch();

private async fetchObjects(user: MiRemoteUser, activityes: any[], includeAnnounce:boolean, created: number): Promise<number> {
for (const activity of activityes) {
if (created > createLimit) break;
if (activity) {
try {
if (isAnnounce(activity)) {
if (includeAnnounce === false) continue;
const uri = getApId(activity);
const announceLocal = await this.apNoteService.fetchNote(uri);
if (announceLocal) continue;

if (!activity.object) {
this.apLoggerService.logger.info('skip: activity has no object property');
if (createLimit < created) return created;
try {
if (includeAnnounce && activity.type === 'Announce') {
const object = await this.apDbResolverService.getNoteFromApId(activity.id);

if (object) continue;

//ブロックしてたら取得しない
const blockedHosts = (await this.metaService.fetch()).blockedHosts;
if (typeof(activity.object) === 'string') {
if (this.utilityService.isBlockedHost(blockedHosts, this.utilityService.toPuny(new URL(activity.object).hostname))) continue;
} else {
if (this.utilityService.isBlockedHost(blockedHosts, this.utilityService.toPuny(new URL(activity.object.id).hostname))) continue;
}

const unlock = await this.appLockService.getApLock(activity.id);
try {
if (!activity.id) continue;
let renote = await this.apNoteService.fetchNote(activity.object);
if (renote === null) {
renote = await this.apNoteService.createNote(activity.object, undefined, true);
if (renote === null) {
this.logger.info('announce target is null');
continue;
}
}
this.logger.info(`Creating the (Re)Note: ${activity.id}`);

const activityAudience = await this.apAudienceService.parseAudience(user, activity.to, activity.cc);
const createdAt = activity.published ? new Date(activity.published) : null;

if (createdAt && createdAt < this.idService.parse(renote.id).date) {
this.logger.info('skip: malformed createdAt');
continue;
}
//From ApInboxService.Announce
//Announce対象
const targetUri = getApId(activity.object);
if (targetUri.startsWith('bear:')) {
this.apLoggerService.logger.info('skip: bearcaps url not supported.');
if (!await this.noteEntityService.isVisibleForMe(renote, user.id)) {
this.logger.info('skip: invalid actor for this activity');
continue;
}
const target = await Resolver.resolve(activity.object);
const unlock = await this.appLockService.getApLock(uri);
try {
if (isPost(target)) {
if (this.utilityService.isBlockedHost(meta.blockedHosts, this.utilityService.extractDbHost(targetUri))) continue;
try {
let renote = await this.apNoteService.fetchNote(targetUri);
if (renote === null) {
renote = await this.apNoteService.createNote(targetUri, undefined, true);
if (renote === null) {
this.apLoggerService.logger.info('announce target is null');
continue;
}
}
this.logger.info(`Creating the (Re)Note: ${uri}`);

const activityAudience = await this.apAudienceService.parseAudience(user, activity.to, activity.cc);
const createdAt = activity.published ? new Date(activity.published) : null;

if (createdAt && createdAt < this.idService.parse(renote.id).date) {
this.apLoggerService.logger.info('skip: malformed createdAt');
continue;
}
if (!await this.noteEntityService.isVisibleForMe(renote, user.id)) {
this.apLoggerService.logger.info('skip: invalid actor for this activity');
continue;
}
await this.noteCreateService.create(user, {
createdAt,
renote,
visibility: activityAudience.visibility,
visibleUsers: activityAudience.visibleUsers,
uri,
}, true );
created++;
continue;
} catch (err) {
// 対象が4xxならスキップ
if (err instanceof StatusError) {
if (!err.isRetryable) {
this.apLoggerService.logger.info(`Ignored announce target ${target.id} - ${err.statusCode}`);
}
this.apLoggerService.logger.info(`Error in announce target ${target.id} - ${err.statusCode}`);
}
throw err;
}
} else {
continue;
await this.noteCreateService.create(user, {
createdAt,
renote,
visibility: activityAudience.visibility,
visibleUsers: activityAudience.visibleUsers,
uri: activity.id,
}, true );
} catch (err) {
// 対象が4xxならスキップ
if (err instanceof StatusError) {
if (!err.isRetryable) {
this.logger.info(`Ignored announce target ${activity.object} - ${err.statusCode}`);
}
} finally {
unlock();
this.logger.info(`Error in announce target ${activity.object} - ${err.statusCode}`);
} else {
throw err;
}
} else if (isNote(activity.object)) {
const id = getApId(activity.object);
const local = await this.apNoteService.fetchNote(id);
if (local) {
continue;
}
await this.apNoteService.createNote(id, undefined, true);
created++;
} else {
this.apLoggerService.logger.warn('Outbox activity type is not announce or create-note (type:' + activity.type + ')' );
} finally {
unlock();
}
} catch (err) {
this.apLoggerService.logger.warn('Outbox activity fetch error:' + err );
this.apLoggerService.logger.info(JSON.stringify(activity));
} else if (isCreate(activity) && typeof(activity.object) !== 'string' && isNote(activity.object)) {
const object = await this.apDbResolverService.getNoteFromApId(activity.object);
if (object) continue;
await this.apNoteService.createNote(activity.object, undefined, true);
}
} catch (err) {
if (err instanceof AbortError) {
this.logger.warn(`Aborted note: ${activity.id}`);
} else {
this.logger.warn(JSON.stringify(err));
this.logger.warn(JSON.stringify(activity));
throw err;
}
}
created ++;
}
return created;
}
Expand Down
3 changes: 2 additions & 1 deletion packages/backend/src/core/activitypub/type.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ export interface IOrderedCollectionPage extends IObject {
partOf: string;
totalItems?: number;
first?: IObject | string;
orderedItems: ApObject[];
orderedItems?: IObject[];
items?: IObject[];
prev: string;
next: string;
}
Expand Down
19 changes: 13 additions & 6 deletions packages/backend/src/server/api/endpoints/ap/fetch-outbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ export const meta = {
code: 'OUTBOX_UNDEFINED_THIS_USER',
id: '890ecef7-ad5a-487d-a201-b49a54059c90',
},
outboxFirstPageUndefined: {
message: 'outbox first page undefined this user',
code: 'OUTBOX_FIRST_PAGE_UNDEFINED_THIS_USER',
id: 'e1f29e66-86a9-4fdc-9be6-63d4587dc350',
},
},
} as const;

Expand All @@ -59,6 +64,7 @@ export const paramDef = {
default: false,
description: 'Outbox取得の際にRenoteも対象にします',
},
//skip: { type: 'integer', minimum: 1, default: 0 },
},
required: ['userId'],
} as const;
Expand All @@ -71,21 +77,22 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
super(meta, paramDef, async (ps, me) => {
if (ps.wait) {
try {
ps.includeAnnounce ?
await this.apOutboxFetchService.fetchOutbox(ps.userId, true) :
await this.apOutboxFetchService.fetchOutbox(ps.userId, false);
await this.apOutboxFetchService.fetchOutbox(ps.userId, ps.includeAnnounce);
} catch (err) {
if (err instanceof IdentifiableError) {
if (err.id === '3fc5a089-cab4-48db-b9f3-f220574b3c0a') throw new ApiError(meta.errors.noSuchUser);
if (err.id === '67070303-177c-4600-af93-b26a7ab889c6') throw new ApiError(meta.errors.isLocalUser);
if (err.id === 'e7a2e510-a8ce-40e9-b1e6-c007bacdc89f') throw new ApiError(meta.errors.outboxUndefined);
//if (err.id === 'b27090c8-8a68-4189-a445-14591c32a89c')
//if (err.id === '0be2f5a1-2345-46d8-b8c3-430b111c68d3')
if (err.id === 'a723c2df-0250-4091-b5fc-e3a7b36c7b61') throw new ApiError(meta.errors.outboxFirstPageUndefined);
//if (err.id === '6603433f-99db-4134-980c-48705ae57ab8')
//if (err.id === '2a05bb06-f38c-4854-af6f-7fd5e87c98ee')
}
throw (err);
}
} else {
ps.includeAnnounce ?
this.apOutboxFetchService.fetchOutbox(ps.userId, true) :
this.apOutboxFetchService.fetchOutbox(ps.userId, false);
this.apOutboxFetchService.fetchOutbox(ps.userId, ps.includeAnnounce);
}
});
}
Expand Down

0 comments on commit d6e2071

Please sign in to comment.