Skip to content

Commit

Permalink
Update Feed in parser service to talk to Supabase instead of Redis
Browse files Browse the repository at this point in the history
- Change invalid/flagged feed code to use Supabase
- Add more methods to supabase modules
- Add supabase mock test
- Fix unit/e2e tests accordingly
  • Loading branch information
TueeNguyen committed Apr 22, 2022
1 parent 2700016 commit dd67daf
Show file tree
Hide file tree
Showing 13 changed files with 256 additions and 161 deletions.
2 changes: 1 addition & 1 deletion docker/development.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ services:
# In development and testing, the SSO service needs to contact the Supabase
# service directly via Docker vs through the http://localhost/v1/supabase domain.
# Contact the Supabase service via the Kong gateway inside Docker
- SUPABASE_URL=https://dev.api.telescope.cdot.systems/v1/supabase
- SUPABASE_URL=http://kong:8000
depends_on:
- elasticsearch
- traefik
Expand Down
7 changes: 4 additions & 3 deletions src/api/parser/env.local
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ PARSER_PORT = 10000
################################################################################

# Supabase Secrets
# Using staging database
#SUPABASE_URL=http://localhost/v1/supabase
SUPABASE_URL=https://dev.supabase.telescope.cdot.systems/
# Staging database
#SUPABASE_URL=https://dev.supabase.telescope.cdot.systems/
SUPABASE_URL=http://localhost/v1/supabase

14 changes: 9 additions & 5 deletions src/api/parser/src/data/feed.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,21 @@ const {
getFeeds,
addFeed,
removeFeed,
setInvalidFeed,
isInvalid,
setDelayedFeed,
isDelayed,
removePost,
getPost,
getPosts,
} = require('../utils/storage');

const {
isInvalid,
setInvalidFeed,
getFlaggedFeeds,
setFlaggedFeed,
unsetFlaggedFeed,
} = require('../utils/storage');
} = require('../utils/supabase');

const { deletePost } = require('../utils/indexer');

const urlToId = (url) => hash(normalizeUrl(url));
Expand Down Expand Up @@ -83,8 +87,8 @@ class Feed {
* Adds the current Feed to the database with the specified reason
* Returns a Promise
*/
setInvalid(reason) {
return setInvalidFeed(this.id, reason);
setInvalid() {
return setInvalidFeed(this.id);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/api/parser/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ feedQueue.on('drained', loadFeedsIntoQueue);
*/
feedQueue.on('failed', (job, err) =>
invalidateFeed(job.data.id, err).catch((error) =>
logger.error({ error }, 'Unable to invalidate feed')
logger.error({ error }, `Unable to invalidate feed ${job.data.id}`)
)
);

Expand Down
2 changes: 1 addition & 1 deletion src/api/parser/src/parser.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const updateFeed = async (feedData) => {
*/
const invalidateFeed = async (id, error) => {
const feed = await Feed.byId(id);
await feed.setInvalid(error.message);
await feed.setInvalid();
logger.info(`Invalidating feed ${feed.url} for the following reason: ${error.message}`);
};

Expand Down
73 changes: 73 additions & 0 deletions src/api/parser/src/utils/__mocks__/supabase.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
const { hash } = require('@senecacdot/satellite');
const normalizeUrl = require('normalize-url');

const urlToId = (url) => hash(normalizeUrl(url));

let feeds = [];
let feedIds = new Set();

module.exports = {
__resetMockFeeds: () => {
feeds = [];
feedIds = new Set();
},
/**
* @param {Array<Feed | { url: string }>} feedObjects
*/
__setMockFeeds: (feedObjects) => {
const mockFeeds = feedObjects.reduce((uniqueFeeds, feed) => {
const id = feed.id || urlToId(feed.url);
if (!feedIds.has(id)) {
feedIds.add(id);
return uniqueFeeds.concat({ id, invalid: false, flagged: false });
}
return uniqueFeeds;
}, []);
feeds = feeds.concat(mockFeeds);
},

// Invalid feed related functions
setInvalidFeed: (id) => {
feeds.forEach((feed) => {
if (feed.id === id) {
feed.invalid = true;
}
});
return Promise.resolve();
},
getInvalidFeeds: () => {
const invalidFeedIds = feeds.filter((feed) => feed.flagged).map((feed) => ({ id: feed.id }));
return Promise.resolve(invalidFeedIds);
},
isInvalid: (id) => {
const targetFeed = feeds.find((feed) => feed.id === id);
return Promise.resolve(!!targetFeed.invalid);
},
// Flagged feed related functions
getAllFeeds: jest.fn().mockImplementation(() => Promise.resolve(feeds)),
setFlaggedFeed: jest.fn().mockImplementation((id) => {
feeds.forEach((feed) => {
if (feed.id === id) {
feed.flagged = true;
}
});
return Promise.resolve();
}),
unsetFlaggedFeed: jest.fn().mockImplementation((id) => {
feeds.forEach((feed) => {
if (feed.id === id) {
feed.flagged = false;
}
});
return Promise.resolve();
}),

getFlaggedFeeds: jest.fn().mockImplementation(() => {
const flaggedFeedIds = feeds.filter((feed) => feed.flagged).map((feed) => feed.id);
return Promise.resolve(flaggedFeedIds);
}),
isFlagged: jest.fn().mockImplementation((id) => {
const targetFeed = feeds.find((feed) => feed.id === id);
return Promise.resolve(!!targetFeed.flagged);
}),
};
40 changes: 4 additions & 36 deletions src/api/parser/src/utils/storage.js
Original file line number Diff line number Diff line change
@@ -1,26 +1,24 @@
const { logger, Redis } = require('@senecacdot/satellite');
const { isFlagged } = require('./supabase');

const redis = Redis();

// Redis Keys

const feedsKey = 't:feeds';
const flaggedFeedsKey = 't:feeds:flagged';
const postsKey = 't:posts';

// Namespaces
const feedNamespace = 't:feed:';
const postNamespace = 't:post:';
// Suffixes
const invalidSuffix = ':invalid';
const delayedSuffix = ':delayed';

// "6Xoj0UXOW3" to "t:post:6Xoj0UXOW3"
const createPostKey = (id) => postNamespace.concat(id);
// "NirlSYranl" to "t:feed:NirlSYranl"
const createFeedKey = (id) => feedNamespace.concat(id);
// "NirlSYranl" to "t:feed:NirlSYranl:invalid"
const createInvalidFeedKey = (id) => createFeedKey(id).concat(invalidSuffix);

// "NirlSYranl" to "t:feed:NirlSYranl:delayed"
const createDelayedFeedKey = (id) => createFeedKey(id).concat(delayedSuffix);

Expand Down Expand Up @@ -50,7 +48,7 @@ module.exports = {
addFeed: async (feed) => {
// Check if feed being added already exists in flagged feeds set
// If it is, do nothing
if (await redis.sismember(flaggedFeedsKey, feed.id)) return;
if (await isFlagged(feed.id)) return;

const key = createFeedKey(feed.id);
await redis
Expand Down Expand Up @@ -78,20 +76,6 @@ module.exports = {

getFeeds: () => redis.smembers(feedsKey),

getInvalidFeeds: async () => {
const invalidKeys = await getFeedKeysUsingScanStream(`${feedNamespace}*${invalidSuffix}`);
return Promise.all(
invalidKeys.map(async (key) => {
const reason = await redis.get(key);
const id = key.replace(feedNamespace, '').replace(invalidSuffix, '');
return {
id,
reason: reason.replace(/\n/g, ' '),
};
})
);
},

getDelayedFeeds: async () => {
const delayedKeys = await getFeedKeysUsingScanStream(`${feedNamespace}*${delayedSuffix}`);
return delayedKeys.map((key) => {
Expand All @@ -102,44 +86,28 @@ module.exports = {
});
},

getFlaggedFeeds: () => redis.smembers(flaggedFeedsKey),

getFeed: (id) => redis.hgetall(feedNamespace.concat(id)),

getFeedsCount: () => redis.scard(feedsKey),

setInvalidFeed: (id, reason) => {
const key = createInvalidFeedKey(id);
const sevenDaysInSeconds = 60 * 60 * 24 * 7; // Expire after 7 days
return redis.set(key, reason, 'EX', sevenDaysInSeconds);
},

/**
* Removes a feed entry from redis
* @param id id of feed to be removed
*/
removeFeed: async (id) => {
const key = createFeedKey(id);
// Checks which set the feed is currently in
const redisKey = (await redis.sismember(feedsKey, id)) ? feedsKey : flaggedFeedsKey;
try {
await redis
.multi()
.hdel(key, 'id', 'author', 'url', 'user', 'link', 'etag', 'lastModified')
.srem(redisKey, id)
.srem(feedsKey, id)
.exec();
} catch (error) {
logger.error({ error }, `Error removing Feed ${id} from Redis`);
throw new Error(`Error trying to remove feed from Redis`);
}
},

setFlaggedFeed: (id) => redis.smove(feedsKey, flaggedFeedsKey, id),

unsetFlaggedFeed: (id) => redis.smove(flaggedFeedsKey, feedsKey, id),

isInvalid: (id) => redis.exists(createInvalidFeedKey(id)),

setDelayedFeed: (id, seconds) => redis.set(createDelayedFeedKey(id), seconds, 'EX', seconds),

isDelayed: (id) => redis.exists(createDelayedFeedKey(id)),
Expand Down
92 changes: 92 additions & 0 deletions src/api/parser/src/utils/supabase.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
const { logger } = require('@senecacdot/satellite');
const hash = require('@senecacdot/satellite/src/hash');
const { createClient } = require('@supabase/supabase-js');
const normalizeUrl = require('normalize-url');

const { SUPABASE_URL, SERVICE_ROLE_KEY } = process.env;

Expand Down Expand Up @@ -27,4 +29,94 @@ module.exports = {
url: feed.url,
}));
},

// Invalid feed related functions
async setInvalidFeed(id) {
const { error } = await supabase.from('feeds').update({ invalid: true }).eq('id', id);

if (error) {
logger.error({ error });
throw Error(error.message, `can't invalidate feed ${id} in supabase`);
}
},

async getInvalidFeeds() {
const { data: invalidFeeds, error } = await supabase.from('feeds').select().is('invalid', true);
if (error) {
logger.error({ error });
throw Error(error.message, "can't fetch invalid feeds in supabase");
}
return invalidFeeds;
},
async isInvalid(id) {
const { data: invalidFeed, error } = await supabase
.from('feeds')
.select('invalid')
.eq('id', id)
.limit(1);

if (error) {
logger.error({ error });
throw Error(error.message, `can't fetch feed ${id} from supabase`);
}
return invalidFeed.invalid;
},

// Flagged feed related functions
async setFlaggedFeed(id) {
const { error } = await supabase.from('feeds').update({ flagged: true }).eq('id', id);

if (error) {
logger.error({ error });
throw Error(error.message, `can't flag feed ${id} in supabase`);
}
},
async unsetFlaggedFeed(id) {
const { error } = await supabase.from('feeds').update({ flagged: false }).eq('id', id);

if (error) {
logger.error({ error });
throw Error(error.message, `can't unflag feed ${id} in supabase`);
}
},
async getFlaggedFeeds() {
const { data: flaggedFeeds, error } = await supabase.from('feeds').select().eq('flagged', true);

if (error) {
logger.error({ error });
throw Error(error.message, `can't flagged feeds from supabase`);
}
return flaggedFeeds.map((feed) => feed.id);
},
async isFlagged(id) {
const { data: flaggedFeed, error } = await supabase
.from('feeds')
.select('flagged')
.eq('id', id)
.limit(1);

if (error) {
logger.error({ error });
throw Error(error.message, `can't fetch feed ${id} from supabase`);
}
return flaggedFeed.flagged;
},
async addFeeds(feeds) {
const { data, error } = await supabase.from('feeds').insert(
feeds.map((feed) => ({
url: feed.url,
id: hash(normalizeUrl(feed.url)),
wiki_author_name: feed.author,
invalid: false,
flagged: false,
type: 'blog',
html_url: null,
user_id: null,
}))
);
if (error) {
logger.error({ error });
throw Error(error.message, "can't insert feeds to supabase");
}
},
};
Loading

0 comments on commit dd67daf

Please sign in to comment.