From 5a0f44178aabe277981c547a1d24d8d2ee0030c4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?A=C3=A9cio=20Santos?=
Date: Fri, 12 Apr 2019 18:41:47 -0400
Subject: [PATCH] Minimum delay between requests based on download finished time

- Modified PolitenessScheduler to compute the delay between same-domain
  requests based on the time when the download finished
- Refactored FetchedResultHandler to simply notify the LinkStorage that the
  download finished and to delegate data processing to other handlers for the
  appropriate link type
- Fixed unit and integration tests to consider the new scheduler workflow
  that requires notification when the download was finished

Fixes #179.
---
 CHANGELOG.md | 6 +
 .../crawler/async/AsyncCrawler.java | 24 +-
 .../crawler/async/AsyncCrawlerConfig.java | 3 +-
 .../crawler/async/FetchedResultHandler.java | 83 +++----
 .../crawler/async/ForwardLinkHandler.java | 57 +++++
 .../crawler/async/RobotsTxtHandler.java | 46 ++--
 .../crawler/async/SitemapXmlHandler.java | 23 +-
 .../java/focusedCrawler/link/LinkStorage.java | 43 +---
 .../link/PolitenessScheduler.java | 208 +++++++++++-------
 .../link/frontier/CrawlScheduler.java | 10 +-
 .../link/frontier/Frontier.java | 36 ++-
 .../link/frontier/FrontierManager.java | 27 ++-
 .../focusedCrawler/tools/ReplayCrawl.java | 34 ++-
 .../focusedCrawler/util/StorageException.java | 12 -
 .../integration/HardFocusCrawlingTest.java | 12 +-
 .../integration/RobotsAndSitemapTest.java | 22 +-
 .../link/PolitenessSchedulerTest.java | 163 +++++++++++---
 .../link/frontier/FrontierManagerTest.java | 36 ++-
 .../link/frontier/FrontierTest.java | 32 +--
 19 files changed, 539 insertions(+), 338 deletions(-)
 create mode 100644 src/main/java/focusedCrawler/crawler/async/ForwardLinkHandler.java

diff --git a/CHANGELOG.md b/CHANGELOG.md
index e16c697bd..e13c2b05b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -18,6 +18,12 @@
 - Upgrade `commons-validator` library to version 1.6
 - Upgrade `okhttp3` library to version 3.14.0
 - Fix issue #177: Links from recent TLDs are considered invalid
+- Minimum delay between requests now considers the time when the download is
+  actually finished, not the time when the URL was initially scheduled to be
+  downloaded (which disregarded the processing time between scheduling and the actual download)
+- Refactoring of FetchedResultHandler to simply notify the LinkStorage that
+  the download finished and to delegate data processing to other handlers
+  for the appropriate link type
 
 ## Version 0.11.0
 
diff --git a/src/main/java/focusedCrawler/crawler/async/AsyncCrawler.java b/src/main/java/focusedCrawler/crawler/async/AsyncCrawler.java
index ed4db422b..aaacfbe8b 100644
--- a/src/main/java/focusedCrawler/crawler/async/AsyncCrawler.java
+++ b/src/main/java/focusedCrawler/crawler/async/AsyncCrawler.java
@@ -2,7 +2,6 @@
 
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -10,7 +9,6 @@
 import com.google.common.util.concurrent.AbstractExecutionThreadService;
 
 import focusedCrawler.config.Configuration;
-import focusedCrawler.crawler.async.HttpDownloader.Callback;
 import focusedCrawler.crawler.cookies.Cookie;
 import focusedCrawler.crawler.cookies.CookieUtils;
 import focusedCrawler.link.LinkStorage;
@@ -26,9 +24,9 @@ public class AsyncCrawler extends AbstractExecutionThreadService {
     private final TargetStorage targetStorage;
     private final LinkStorage linkStorage;
     private final HttpDownloader downloader;
-    private final Map handlers = new HashMap<>();
-    private MetricsManager
metricsManager; - private Configuration config; + private final FetchedResultHandler fetchedResultHandler; + private final MetricsManager metricsManager; + private final Configuration config; public AsyncCrawler(String crawlerId, TargetStorage targetStorage, LinkStorage linkStorage, Configuration config, String dataPath, MetricsManager metricsManager) { @@ -41,10 +39,9 @@ public AsyncCrawler(String crawlerId, TargetStorage targetStorage, LinkStorage l HttpDownloaderConfig downloaderConfig = config.getCrawlerConfig().getDownloaderConfig(); this.downloader = new HttpDownloader(downloaderConfig, dataPath, metricsManager); - this.handlers.put(LinkRelevance.Type.FORWARD, new FetchedResultHandler(crawlerId, targetStorage)); - this.handlers.put(LinkRelevance.Type.SITEMAP, new SitemapXmlHandler(linkStorage)); - this.handlers.put(LinkRelevance.Type.ROBOTS, new RobotsTxtHandler(linkStorage, - downloaderConfig.getUserAgentName())); + String userAgentName = downloaderConfig.getUserAgentName(); + this.fetchedResultHandler = new FetchedResultHandler(crawlerId, targetStorage, linkStorage, + userAgentName); Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { @@ -58,14 +55,9 @@ public void run() { protected void run() { while (isRunning()) { try { - LinkRelevance link = (LinkRelevance) linkStorage.select(null); + LinkRelevance link = linkStorage.select(); if (link != null) { - Callback handler = handlers.get(link.getType()); - if (handler == null) { - logger.error("No registered handler for link type: " + link.getType()); - continue; - } - downloader.dipatchDownload(link, handler); + downloader.dipatchDownload(link, fetchedResultHandler); } } catch (DataNotFoundException e) { // There are no more links available in the frontier right now diff --git a/src/main/java/focusedCrawler/crawler/async/AsyncCrawlerConfig.java b/src/main/java/focusedCrawler/crawler/async/AsyncCrawlerConfig.java index 277ab37d1..7640d5c40 100644 --- a/src/main/java/focusedCrawler/crawler/async/AsyncCrawlerConfig.java +++ b/src/main/java/focusedCrawler/crawler/async/AsyncCrawlerConfig.java @@ -3,7 +3,6 @@ import java.io.IOException; import com.fasterxml.jackson.annotation.JsonUnwrapped; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -16,7 +15,7 @@ public AsyncCrawlerConfig() { // Required for de-serialization } - public AsyncCrawlerConfig(JsonNode config, ObjectMapper objectMapper) throws JsonProcessingException, IOException { + public AsyncCrawlerConfig(JsonNode config, ObjectMapper objectMapper) throws IOException { objectMapper.readerForUpdating(this).readValue(config); } diff --git a/src/main/java/focusedCrawler/crawler/async/FetchedResultHandler.java b/src/main/java/focusedCrawler/crawler/async/FetchedResultHandler.java index 6a2c58b14..ab576e5f2 100644 --- a/src/main/java/focusedCrawler/crawler/async/FetchedResultHandler.java +++ b/src/main/java/focusedCrawler/crawler/async/FetchedResultHandler.java @@ -1,64 +1,65 @@ package focusedCrawler.crawler.async; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import focusedCrawler.crawler.async.HttpDownloader.Callback; import focusedCrawler.crawler.crawlercommons.fetcher.AbortedFetchException; import focusedCrawler.crawler.crawlercommons.fetcher.FetchedResult; +import focusedCrawler.link.LinkStorage; import focusedCrawler.link.frontier.LinkRelevance; import focusedCrawler.target.TargetStorage; -import focusedCrawler.target.model.Page; 
-import focusedCrawler.target.model.ParsedData; -import focusedCrawler.util.parser.PaginaURL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class FetchedResultHandler implements HttpDownloader.Callback { - + private static final Logger logger = LoggerFactory.getLogger(FetchedResultHandler.class); - - private String crawlerId; - private TargetStorage targetStorage; - public FetchedResultHandler(String crawlerId, TargetStorage targetStorage) { - this.crawlerId = crawlerId; - this.targetStorage = targetStorage; + private final SitemapXmlHandler sitemapXmlHandler; + private final ForwardLinkHandler forwardLinkHandler; + private final RobotsTxtHandler robotsTxtHandler; + private LinkStorage linkStorage; + + public FetchedResultHandler(String crawlerId, TargetStorage targetStorage, + LinkStorage linkStorage, String userAgentName) { + this.linkStorage = linkStorage; + this.forwardLinkHandler = new ForwardLinkHandler(crawlerId, targetStorage); + this.sitemapXmlHandler = new SitemapXmlHandler(linkStorage); + this.robotsTxtHandler = new RobotsTxtHandler(linkStorage, userAgentName); } - + @Override public void completed(LinkRelevance link, FetchedResult response) { - - int statusCode = response.getStatusCode(); - if(statusCode >= 200 && statusCode < 300) { - processData(link, response); - } - //else { - // TODO: Update metadata about page visits in link storage - //} + linkStorage.notifyDownloadFinished(link); + Callback handler = getDownloadHandler(link); + handler.completed(link, response); } - + @Override public void failed(LinkRelevance link, Exception e) { - if(e instanceof AbortedFetchException) { + linkStorage.notifyDownloadFinished(link); + if (e instanceof AbortedFetchException) { AbortedFetchException afe = (AbortedFetchException) e; - logger.info("Download aborted: \n>URL: {}\n>Reason: {}", link.getURL().toString(), afe.getAbortReason()); + logger.info("Download aborted: \n>URL: {}\n>Reason: {}", link.getURL().toString(), + afe.getAbortReason()); } else { - logger.info("Failed to download URL: {}\n>Reason: {}", link.getURL().toString(), e.getMessage()); + logger.info("Failed to download URL: {}\n>Reason: {}", link.getURL().toString(), + e.getMessage()); } + Callback handler = getDownloadHandler(link); + handler.failed(link, e); } - - private void processData(LinkRelevance link, FetchedResult response) { - try { - Page page = new Page(response); - page.setLinkRelevance(link); - page.setCrawlerId(crawlerId); - if (page.isHtml()) { - PaginaURL pageParser = new PaginaURL(page); - page.setParsedData(new ParsedData(pageParser)); - } - targetStorage.insert(page); - - } catch (Exception e) { - logger.error("Problem while processing data.", e); + + private Callback getDownloadHandler(LinkRelevance link) { + switch (link.getType()) { + case FORWARD: + return forwardLinkHandler; + case ROBOTS: + return robotsTxtHandler; + case SITEMAP: + return sitemapXmlHandler; + default: + // There should be a handler for each link type, so this shouldn't happen + throw new IllegalStateException("No handler for link type: " + link.getType()); } } -} +} \ No newline at end of file diff --git a/src/main/java/focusedCrawler/crawler/async/ForwardLinkHandler.java b/src/main/java/focusedCrawler/crawler/async/ForwardLinkHandler.java new file mode 100644 index 000000000..adbdf11da --- /dev/null +++ b/src/main/java/focusedCrawler/crawler/async/ForwardLinkHandler.java @@ -0,0 +1,57 @@ +package focusedCrawler.crawler.async; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import 
focusedCrawler.crawler.crawlercommons.fetcher.FetchedResult; +import focusedCrawler.link.frontier.LinkRelevance; +import focusedCrawler.target.TargetStorage; +import focusedCrawler.target.model.Page; +import focusedCrawler.target.model.ParsedData; +import focusedCrawler.util.parser.PaginaURL; + +public class ForwardLinkHandler implements HttpDownloader.Callback { + + private static final Logger logger = LoggerFactory.getLogger(ForwardLinkHandler.class); + + private String crawlerId; + private TargetStorage targetStorage; + + public ForwardLinkHandler(String crawlerId, TargetStorage targetStorage) { + this.crawlerId = crawlerId; + this.targetStorage = targetStorage; + } + + @Override + public void completed(LinkRelevance link, FetchedResult response) { + + int statusCode = response.getStatusCode(); + if (statusCode >= 200 && statusCode < 300) { + processPage(link, response); + } + //else { + // TODO: Update metadata about page visits in link storage + //} + } + + @Override + public void failed(LinkRelevance link, Exception e) { + } + + private void processPage(LinkRelevance link, FetchedResult response) { + try { + Page page = new Page(response); + page.setLinkRelevance(link); + page.setCrawlerId(crawlerId); + if (page.isHtml()) { + PaginaURL pageParser = new PaginaURL(page); + page.setParsedData(new ParsedData(pageParser)); + } + targetStorage.insert(page); + + } catch (Exception e) { + logger.error("Problem while processing data.", e); + } + } + +} diff --git a/src/main/java/focusedCrawler/crawler/async/RobotsTxtHandler.java b/src/main/java/focusedCrawler/crawler/async/RobotsTxtHandler.java index efd6be5b7..ed6687896 100644 --- a/src/main/java/focusedCrawler/crawler/async/RobotsTxtHandler.java +++ b/src/main/java/focusedCrawler/crawler/async/RobotsTxtHandler.java @@ -8,7 +8,6 @@ import crawlercommons.robots.SimpleRobotRules; import crawlercommons.robots.SimpleRobotRulesParser; -import focusedCrawler.crawler.crawlercommons.fetcher.AbortedFetchException; import focusedCrawler.crawler.crawlercommons.fetcher.FetchedResult; import focusedCrawler.link.LinkStorage; import focusedCrawler.link.frontier.LinkRelevance; @@ -26,70 +25,61 @@ public RobotsData(LinkRelevance link, SimpleRobotRules robotRules) { this.robotRules = robotRules; } } - + private static final Logger logger = LoggerFactory.getLogger(RobotsTxtHandler.class); - + private SimpleRobotRulesParser parser = new SimpleRobotRulesParser(); private LinkStorage linkStorage; private String userAgentName; - + public RobotsTxtHandler(LinkStorage linkStorage, String userAgentName) { this.linkStorage = linkStorage; this.userAgentName = userAgentName; } - + @Override public void completed(LinkRelevance link, FetchedResult response) { int statusCode = response.getStatusCode(); - if(statusCode >= 200 && statusCode < 300) { - logger.info("Successfully downloaded URL=["+response.getBaseUrl()+"] HTTP-Response-Code="+statusCode); + if (statusCode >= 200 && statusCode < 300) { + // HTTP 2xx means the request was successful processRobot(link, response, false); } else { - logger.info("Server returned bad code for URL=["+response.getBaseUrl()+"] HTTP-Response-Code="+statusCode); processRobot(link, response, true); } } - + @Override public void failed(LinkRelevance link, Exception e) { - if(e instanceof AbortedFetchException) { - AbortedFetchException afe = (AbortedFetchException) e; - logger.info("Download aborted: \n>URL: {}\n>Reason: {}", - link.getURL().toString(), afe.getAbortReason()); - } else { - logger.info("Failed to download URL: 
"+link.getURL().toString(), e.getMessage()); - } processRobot(link, null, true); } - + private void processRobot(LinkRelevance link, FetchedResult response, boolean fetchFailed) { - + SimpleRobotRules robotRules; - if(fetchFailed || response == null) { - robotRules = (SimpleRobotRules) parser.failedFetch(HttpStatus.SC_GONE); - } - else { + if (fetchFailed || response == null) { + robotRules = parser.failedFetch(HttpStatus.SC_GONE); + } else { String contentType = response.getContentType(); boolean isPlainText = (contentType != null) && (contentType.startsWith("text/plain")); if ((response.getNumRedirects() > 0) && !isPlainText) { - robotRules = (SimpleRobotRules) parser.failedFetch(HttpStatus.SC_GONE); + robotRules = parser.failedFetch(HttpStatus.SC_GONE); } else { - robotRules = (SimpleRobotRules) parser.parseContent( + robotRules = parser.parseContent( response.getFetchedUrl(), response.getContent(), response.getContentType(), - userAgentName + userAgentName ); } } - + try { RobotsData robotsData = new RobotsData(link, robotRules); linkStorage.insert(robotsData); } catch (Exception e) { logger.error("Failed to insert robots.txt data into link storage.", e); } - + } - + } diff --git a/src/main/java/focusedCrawler/crawler/async/SitemapXmlHandler.java b/src/main/java/focusedCrawler/crawler/async/SitemapXmlHandler.java index 685c3ce61..6dd776221 100644 --- a/src/main/java/focusedCrawler/crawler/async/SitemapXmlHandler.java +++ b/src/main/java/focusedCrawler/crawler/async/SitemapXmlHandler.java @@ -16,7 +16,6 @@ import crawlercommons.sitemaps.SiteMapParser; import crawlercommons.sitemaps.SiteMapURL; import crawlercommons.sitemaps.UnknownFormatException; -import focusedCrawler.crawler.crawlercommons.fetcher.AbortedFetchException; import focusedCrawler.crawler.crawlercommons.fetcher.FetchedResult; import focusedCrawler.link.LinkStorage; import focusedCrawler.link.frontier.LinkRelevance; @@ -43,8 +42,7 @@ public SitemapXmlHandler(LinkStorage linkStorage) { public void completed(LinkRelevance link, FetchedResult response) { int statusCode = response.getStatusCode(); if(statusCode >= 200 && statusCode < 300) { - logger.info("Successfully downloaded URL=["+response.getBaseUrl()+"] HTTP-Response-Code="+statusCode); - processData(link, response); + processSitemap(link, response); } else { logger.info("Server returned bad code for URL=["+response.getBaseUrl()+"] HTTP-Response-Code="+statusCode); } @@ -52,22 +50,15 @@ public void completed(LinkRelevance link, FetchedResult response) { @Override public void failed(LinkRelevance link, Exception e) { - if(e instanceof AbortedFetchException) { - AbortedFetchException afe = (AbortedFetchException) e; - logger.info("Download aborted: \n>URL: {}\n>Reason: {}", - link.getURL().toString(), afe.getAbortReason()); - } else { - logger.info("Failed to download URL: "+link.getURL().toString(), e.getMessage()); - } } - - private void processData(LinkRelevance link, FetchedResult response) { - + + private void processSitemap(LinkRelevance link, FetchedResult response) { + AbstractSiteMap sm; try { sm = parser.parseSiteMap(response.getContent(), new URL(response.getFetchedUrl())); } catch (UnknownFormatException | IOException e) { - logger.warn("Failed to download sitemap: "+link.getURL().toString(), e); + logger.warn("Failed to parse sitemap: " + link.getURL().toString(), e); return; } @@ -83,13 +74,13 @@ private void processData(LinkRelevance link, FetchedResult response) { sitemapData.links.add(smu.getUrl().toString()); } } - + try { linkStorage.insert(sitemapData); 
} catch (Exception e) { logger.error("Failed to insert sitemaps data into link storage.", e); } - + } } diff --git a/src/main/java/focusedCrawler/link/LinkStorage.java b/src/main/java/focusedCrawler/link/LinkStorage.java index dc3cc7678..3e1af55ce 100644 --- a/src/main/java/focusedCrawler/link/LinkStorage.java +++ b/src/main/java/focusedCrawler/link/LinkStorage.java @@ -19,7 +19,6 @@ import focusedCrawler.link.classifier.online.OnlineLearning; import focusedCrawler.link.frontier.FrontierManager; import focusedCrawler.link.frontier.FrontierManagerFactory; -import focusedCrawler.link.frontier.FrontierPersistentException; import focusedCrawler.link.frontier.LinkRelevance; import focusedCrawler.target.model.Page; import focusedCrawler.util.DataNotFoundException; @@ -65,26 +64,6 @@ public void close(){ logger.info("done."); } - /** - * This method inserts links from a given page into the frontier - * - * @param obj - * Object - page containing links - * @return Object - */ - public Object insert(Object obj) throws StorageException { - if(obj instanceof Page) { - return insert((Page) obj); - } - else if(obj instanceof RobotsTxtHandler.RobotsData) { - insert((RobotsTxtHandler.RobotsData) obj); - } - else if(obj instanceof SitemapXmlHandler.SitemapData) { - insert((SitemapXmlHandler.SitemapData) obj); - } - return null; - } - public void insert(RobotsTxtHandler.RobotsData robotsData) { if (disallowSitesInRobotsTxt) { this.insertRobotRules(robotsData.link, robotsData.robotRules); @@ -119,9 +98,11 @@ public void insert(SitemapXmlHandler.SitemapData sitemapData) { } logger.info("Added {} child sitemaps.", sitemapData.sitemaps.size()); } - - - public Object insert(Page page) throws StorageException { + + /** + * Inserts links from the given page into the frontier + */ + public void insert(Page page) throws StorageException { try { if (getBacklinks && page.isAuth()) { frontierManager.insertBacklinks(page); @@ -145,24 +126,19 @@ public Object insert(Page page) throws StorageException { logger.info("Failed to insert page into LinkStorage.", ex); throw new StorageException(ex.getMessage(), ex); } - return null; } /** * This method sends a link to crawler * @throws DataNotFoundException */ - public synchronized Object select(Object obj) throws StorageException, DataNotFoundException { - try { - return frontierManager.nextURL(true); - } catch (FrontierPersistentException e) { - throw new StorageException(e.getMessage(), e); - } + public synchronized LinkRelevance select() throws DataNotFoundException { + return frontierManager.nextURL(true); } public static LinkStorage create(String configPath, String seedFile, String dataPath, String modelPath, LinkStorageConfig config, MetricsManager metricsManager) - throws FrontierPersistentException, IOException { + throws IOException { Path stoplistPath = Paths.get(configPath, "/stoplist.txt"); StopList stoplist; @@ -231,4 +207,7 @@ public void addSeeds(List seeds) { frontierManager.addSeeds(seeds); } + public void notifyDownloadFinished(LinkRelevance link) { + frontierManager.notifyDownloadFinished(link); + } } diff --git a/src/main/java/focusedCrawler/link/PolitenessScheduler.java b/src/main/java/focusedCrawler/link/PolitenessScheduler.java index ad3eb54dc..5b157f354 100644 --- a/src/main/java/focusedCrawler/link/PolitenessScheduler.java +++ b/src/main/java/focusedCrawler/link/PolitenessScheduler.java @@ -13,47 +13,58 @@ import focusedCrawler.link.frontier.LinkRelevance; class DomainNode { - + public final String domainName; volatile public long lastAccessTime; - 
private PriorityQueue links; + private PriorityQueue pendingQueue; private Set urls = new HashSet<>(); - + private Set downloading = new HashSet<>(); + public DomainNode(String domainName, long lastAccessTime) { this.domainName = domainName; this.lastAccessTime = lastAccessTime; int initialCapacity = 50; - this.links = new PriorityQueue(initialCapacity, LinkRelevance.DESC_ABS_ORDER_COMPARATOR); + this.pendingQueue = new PriorityQueue<>(initialCapacity, + LinkRelevance.DESC_ABS_ORDER_COMPARATOR); } - + public boolean isEmpty() { - return links.isEmpty(); + return pendingQueue.isEmpty(); } - + public boolean add(LinkRelevance link) { - if(!urls.contains(link.getURL().toString())) { - links.add(link); + if (!urls.contains(link.getURL().toString())) { + pendingQueue.add(link); urls.add(link.getURL().toString()); return true; } return false; } - - public LinkRelevance removeFirst() { - LinkRelevance link = links.poll(); - urls.remove(link.getURL().toString()); + + public LinkRelevance selectFirst() { + LinkRelevance link = pendingQueue.poll(); + downloading.add(link.getURL().toString()); return link; } - public int size() { - return links.size(); + public int pendingSize() { + return pendingQueue.size(); } - public void clear() { - links.clear(); + public void clearPendingQueue() { + pendingQueue.clear(); urls.clear(); } - + + public boolean contains(LinkRelevance link) { + return urls.contains(link.getURL().toString()); + } + + public void removeDownloading(LinkRelevance link) { + String url = link.getURL().toString(); + urls.remove(url); + downloading.remove(url); + } } /** @@ -61,20 +72,26 @@ public void clear() { * is never selected twice within a given minimum access time interval. That means that the natural * order (based on link relevance) is modified so that links are selected based on last time that * the host was last accessed in order to respect the minimum access time limit. - * - * @author aeciosantos * + * @author aeciosantos */ public class PolitenessScheduler { + /** + * Minimum links per domain to allow new links to be loaded in the scheduler. Ideally, the + * scheduler should always have a reasonable number of links loaded per domain so that it can + * always return a link to the downloader when requested. 
+ */ private static final int MIN_LINKS_PER_DOMAIN_TO_ALLOW_LOAD = 2000; - private final PriorityQueue domainsQueue; + private final PriorityQueue pendingQueue; + private final Map downloadingQueue; private final PriorityQueue emptyDomainsQueue; + private final Map domains; private final long minimumAccessTime; private final int maxLinksInScheduler; - + private AtomicInteger numberOfLinks = new AtomicInteger(0); public PolitenessScheduler(int minimumAccessTimeInterval, int maxLinksInScheduler) { @@ -82,58 +99,59 @@ public PolitenessScheduler(int minimumAccessTimeInterval, int maxLinksInSchedule this.maxLinksInScheduler = maxLinksInScheduler; this.domains = new HashMap<>(); this.emptyDomainsQueue = createDomainPriorityQueue(); - this.domainsQueue = createDomainPriorityQueue(); + this.pendingQueue = createDomainPriorityQueue(); + this.downloadingQueue = new HashMap<>(); } private PriorityQueue createDomainPriorityQueue() { int initialCapacity = 10; - return new PriorityQueue(initialCapacity, new Comparator() { + return new PriorityQueue<>(initialCapacity, new Comparator() { @Override public int compare(DomainNode o1, DomainNode o2) { return Long.compare(o1.lastAccessTime, o2.lastAccessTime); } }); } - + public boolean addLink(LinkRelevance link) { removeExpiredNodes(); - - if(numberOfLinks() >= maxLinksInScheduler) { + + if (numberOfLinks() >= maxLinksInScheduler) { return false; // ignore link } - + String domainName = link.getTopLevelDomainName(); - - synchronized(this) { + + synchronized (this) { DomainNode domainNode = domains.get(domainName); - if(domainNode == null) { + if (domainNode == null) { domainNode = new DomainNode(domainName, 0l); domains.put(domainName, domainNode); } - - if(domainNode.isEmpty()) { + + if (domainNode.isEmpty()) { emptyDomainsQueue.remove(domainNode); - domainsQueue.add(domainNode); + pendingQueue.add(domainNode); } - - if(domainNode.add(link)) { + + if (domainNode.add(link)) { numberOfLinks.incrementAndGet(); } } - + return true; } private synchronized void removeExpiredNodes() { - while(true) { + while (true) { DomainNode node = emptyDomainsQueue.peek(); - if(node == null) { + if (node == null) { break; } - + long expirationTime = node.lastAccessTime + minimumAccessTime; - if(System.currentTimeMillis() > expirationTime) { + if (System.currentTimeMillis() > expirationTime) { emptyDomainsQueue.poll(); domains.remove(node.domainName); } else { @@ -144,10 +162,10 @@ private synchronized void removeExpiredNodes() { public LinkRelevance nextLink() { LinkRelevance linkRelevance; - + synchronized (this) { - DomainNode domainNode = domainsQueue.peek(); + DomainNode domainNode = pendingQueue.peek(); if (domainNode == null) { // no domains available to be crawled return null; @@ -159,32 +177,48 @@ public LinkRelevance nextLink() { // the domain with longest access time cannot be crawled right now return null; } - - domainsQueue.poll(); - linkRelevance = domainNode.removeFirst(); + + pendingQueue.poll(); + linkRelevance = domainNode.selectFirst(); + downloadingQueue.put(domainNode.domainName, domainNode); + } + + return linkRelevance; + } + + /** + * Records the time when a link from the domain was last downloaded and returns the domain to + * the the queue of pending domains (so that the next link can be downloaded) or to empty + * domains list. 
+     */
+    public void notifyDownloadFinished(LinkRelevance link) {
+        String domainName = link.getTopLevelDomainName();
+        synchronized (this) {
+            DomainNode domainNode = downloadingQueue.get(domainName);
+            if (domainNode == null) {
+                return;
+            }
             domainNode.lastAccessTime = System.currentTimeMillis();
+            domainNode.removeDownloading(link);
 
             if (domainNode.isEmpty()) {
                 emptyDomainsQueue.add(domainNode);
             } else {
-                domainsQueue.add(domainNode);
+                pendingQueue.add(domainNode);
             }
-            
+            downloadingQueue.remove(domainName);
+            numberOfLinks.decrementAndGet();
         }
-        
-        numberOfLinks.decrementAndGet();
-        
-        return linkRelevance;
     }
-    
+
     public int numberOfNonExpiredDomains() {
         removeExpiredNodes();
         return domains.size();
     }
-    
+
     public int numberOfAvailableDomains() {
         int available = 0;
-        for(DomainNode node : domainsQueue) {
-            if(isAvailable(node)){
+        for (DomainNode node : pendingQueue) {
+            if (isAvailable(node)) {
                 available++;
             }
         }
@@ -203,56 +237,80 @@ public boolean hasPendingLinks() {
         return numberOfLinks() > 0;
     }
 
-    public boolean hasLinksAvailable() {
-        // pick domain with longest access time
-        DomainNode domainNode = domainsQueue.peek();
-        if(domainNode == null) {
+    /**
+     * Checks whether a link from the given domain can be downloaded now, respecting the minimum
+     * delay between requests.
+     */
+    private boolean isAvailable(DomainNode domainNode) {
+        if (downloadingQueue.containsKey(domainNode.domainName)) {
             return false;
         }
-        return isAvailable(domainNode);
-    }
-
-    private boolean isAvailable(DomainNode domainNode) {
         long now = System.currentTimeMillis();
         long timeSinceLastAccess = now - domainNode.lastAccessTime;
-        if(timeSinceLastAccess < minimumAccessTime) {
+        if (timeSinceLastAccess < minimumAccessTime) {
             return false;
         }
         return true;
     }
 
-    public synchronized void clear() {
+    public boolean hasLinksAvailable() {
+        // pick domain with longest access time
+        DomainNode domainNode = pendingQueue.peek();
+        if (domainNode == null) {
+            return false;
+        }
+        return isAvailable(domainNode);
+    }
+
+    public synchronized void clearPendingQueue() {
         Iterator> it = domains.entrySet().iterator();
-        while(it.hasNext()) {
+        while (it.hasNext()) {
             DomainNode node = it.next().getValue();
-            numberOfLinks.addAndGet(-node.size()); // adds negative value
-            node.clear();
+            numberOfLinks.addAndGet(-node.pendingSize()); // adds negative value
+            node.clearPendingQueue();
         }
-        while(true) {
-            DomainNode node = domainsQueue.poll();
-            if(node == null) {
+        while (true) {
+            DomainNode node = pendingQueue.poll();
+            if (node == null) {
                 break;
            }
             emptyDomainsQueue.add(node);
         }
     }
 
+    /**
+     * Checks whether the given link can be downloaded now according to the politeness constraints.
+     * If the domain is not stored in the scheduler, it is because it was never downloaded or the
+     * last time it was downloaded was a long time ago and it has thus already expired. If the
+     * domain is stored, then we check whether enough time has passed since the last time a link
+     * from the domain was downloaded.
+     */
     public boolean canDownloadNow(LinkRelevance link) {
         DomainNode domain = domains.get(link.getTopLevelDomainName());
-        if(domain == null) {
+        if (domain == null) {
             return true;
         } else {
             return isAvailable(domain);
         }
     }
 
+    /**
+     * Checks whether the given link can be inserted now. A link can be inserted if it is not
+     * already present in the scheduler or if the number of links from this domain is below a given
+     * minimum threshold.
The goal is to always keep a significant number of links in the scheduler, + * so that there is always a link available to return to the downloader when requested. + */ public boolean canInsertNow(LinkRelevance link) { - DomainNode domain = domains.get(link.getTopLevelDomainName()); + String domainName = link.getTopLevelDomainName(); + DomainNode domain = domains.get(domainName); if (domain == null) { return true; } else { - return domain.size() < MIN_LINKS_PER_DOMAIN_TO_ALLOW_LOAD; + return domain.pendingSize() < MIN_LINKS_PER_DOMAIN_TO_ALLOW_LOAD && !domain.contains(link); } } + public int numberOfDownloadingLinks() { + return downloadingQueue.size(); + } } diff --git a/src/main/java/focusedCrawler/link/frontier/CrawlScheduler.java b/src/main/java/focusedCrawler/link/frontier/CrawlScheduler.java index 432a80d90..bd95230b4 100644 --- a/src/main/java/focusedCrawler/link/frontier/CrawlScheduler.java +++ b/src/main/java/focusedCrawler/link/frontier/CrawlScheduler.java @@ -81,8 +81,8 @@ private void setupMetrics() { } /** - * This method loads links from the frontier (stored in disk) to the scheduler. The is scheduler - * is a in-memory data structure that prioritizes links base on score and also politeness + * This method loads links from the frontier (stored in disk) to the scheduler. The scheduler + * is a in-memory data structure that prioritizes links based on score and also politeness * constraints. Which links are selected to be inserted in the frontier is determined the policy * implemented by the LinkSelector configured. * @@ -119,7 +119,7 @@ private synchronized void loadQueue(int numberOfLinks) { uncrawledLinks++; - // check whether link can be download now according to politeness constraints + // check whether link can be downloaded now according to politeness constraints if (scheduler.canInsertNow(link)) { // consider link to be downloaded linkSelector.evaluateLink(link); @@ -127,6 +127,7 @@ private synchronized void loadQueue(int numberOfLinks) { } else { rejectedLinks++; } + } catch (Exception e) { // just log the exception and continue the load even when some link fails logger.error("Failed to load link in frontier.", e); @@ -221,4 +222,7 @@ public void reload() { loadQueue(linksToLoad); } + public void notifyDownloadFinished(LinkRelevance link) { + scheduler.notifyDownloadFinished(link); + } } diff --git a/src/main/java/focusedCrawler/link/frontier/Frontier.java b/src/main/java/focusedCrawler/link/frontier/Frontier.java index 2947deecd..1691d9a30 100644 --- a/src/main/java/focusedCrawler/link/frontier/Frontier.java +++ b/src/main/java/focusedCrawler/link/frontier/Frontier.java @@ -118,19 +118,17 @@ public void update(LinkRelevance linkRelevance) { /** * This method inserts a new link into the frontier * - * @param linkRelev + * @param link * @return - * @throws FrontierPersistentException */ - public boolean insert(LinkRelevance linkRelev) throws FrontierPersistentException { - if (linkRelev == null) { + public boolean insert(LinkRelevance link) { + if (link == null) { return false; } boolean inserted = false; - String url = linkRelev.getURL().toString(); - Double relevance = exist(linkRelev); - if (relevance == null) { - urlRelevance.put(url, linkRelev); + String url = link.getURL().toString(); + if (!exists(link)) { + urlRelevance.put(url, link); inserted = true; } @@ -138,19 +136,16 @@ public boolean insert(LinkRelevance linkRelev) throws FrontierPersistentExceptio } /** - * It verifies whether a given URL was already visited or does not belong to - * the scope. 
+ * It verifies whether a given URL was already inserted in the frontier. * - * @param linkRelev - * @return - * @throws FrontierPersistentException + * @param link + * @return true if URL already exists in frontier, false otherwise */ - public Double exist(LinkRelevance linkRelev) throws FrontierPersistentException { - LinkRelevance link = urlRelevance.get(linkRelev.getURL().toString()); - return link == null ? null : link.getRelevance(); + public boolean exists(LinkRelevance link) { + return urlRelevance.get(link.getURL().toString()) == null ? false : true; } - public LinkRelevance get(String url) throws FrontierPersistentException { + public LinkRelevance get(String url) { return urlRelevance.get(url); } @@ -160,10 +155,10 @@ public LinkRelevance get(String url) throws FrontierPersistentException { * @param linkRelevance * @throws FrontierPersistentException */ - public void delete(LinkRelevance linkRelevance) throws FrontierPersistentException { + public void markAsDownloaded(LinkRelevance linkRelevance) { String url = linkRelevance.getURL().toString(); - if (exist(linkRelevance) != null) { + if (exists(linkRelevance)) { // we don't want to delete the URL file, it is useful to avoid visiting an old url double relevance = linkRelevance.getRelevance(); double negativeRelevance = relevance > 0 ? -1*relevance : relevance; @@ -202,4 +197,7 @@ public boolean isDisallowedByRobots(LinkRelevance link) { return rules != null && !rules.isAllowed(link.getURL().toString()); } + public boolean isDownloaded(String url) { + return get(url).getRelevance() < 0; + } } diff --git a/src/main/java/focusedCrawler/link/frontier/FrontierManager.java b/src/main/java/focusedCrawler/link/frontier/FrontierManager.java index 84fd67493..2ecb599a6 100644 --- a/src/main/java/focusedCrawler/link/frontier/FrontierManager.java +++ b/src/main/java/focusedCrawler/link/frontier/FrontierManager.java @@ -88,7 +88,7 @@ public FrontierManager(Frontier frontier, String dataPath, String modelPath, this.downloadRobots = getDownloadRobots(); this.linksToLoad = config.getSchedulerMaxLinks(); this.maxPagesPerDomain = config.getMaxPagesPerDomain(); - this.domainCounter = new HashMap(); + this.domainCounter = new HashMap<>(); this.scheduler = new CrawlScheduler(linkSelector, recrawlSelector, frontier, metricsManager, config.getSchedulerHostMinAccessInterval(), linksToLoad); this.graphRepository = new BipartiteGraphRepository(dataPath, config.getPersistentHashtableBackend()); @@ -128,7 +128,7 @@ public void forceReload() { scheduler.reload(); } - public boolean isRelevant(LinkRelevance link) throws FrontierPersistentException { + public boolean isRelevant(LinkRelevance link) { if (link.getRelevance() <= 0) { return false; } @@ -141,12 +141,11 @@ public boolean isRelevant(LinkRelevance link) throws FrontierPersistentException return false; } - Double value = frontier.exist(link); - if (value != null) { + if (!linkFilter.accept(link)) { return false; } - if (!linkFilter.accept(link)) { + if (frontier.exists(link)) { return false; } @@ -160,7 +159,7 @@ public void insert(LinkRelevance[] linkRelevance) throws FrontierPersistentExcep } } - public boolean insert(LinkRelevance linkRelevance) throws FrontierPersistentException { + public boolean insert(LinkRelevance linkRelevance) { Context timerContext = insertTimer.time(); try { if (linkRelevance == null) { @@ -175,7 +174,7 @@ public boolean insert(LinkRelevance linkRelevance) throws FrontierPersistentExce hostsManager.insert(hostName); try { URL robotsUrl = new URL(url.getProtocol(), 
url.getHost(), url.getPort(), "/robots.txt"); - LinkRelevance sitemap = LinkRelevance.createRobots(robotsUrl.toString(), 299); + LinkRelevance sitemap = LinkRelevance.createRobots(robotsUrl.toString(), 300); frontier.insert(sitemap); } catch (Exception e) { logger.warn("Failed to insert robots.txt for host: " + hostName, e); @@ -191,11 +190,16 @@ public boolean insert(LinkRelevance linkRelevance) throws FrontierPersistentExce } } - public LinkRelevance nextURL() throws FrontierPersistentException, DataNotFoundException { + public void notifyDownloadFinished(LinkRelevance link) { + scheduler.notifyDownloadFinished(link); + frontier.markAsDownloaded(link); + } + + public LinkRelevance nextURL() throws DataNotFoundException { return nextURL(false); } - public LinkRelevance nextURL(boolean asyncLoad) throws FrontierPersistentException, DataNotFoundException { + public LinkRelevance nextURL(boolean asyncLoad) throws DataNotFoundException { Context timerContext = selectTimer.time(); try { LinkRelevance link = scheduler.nextLink(asyncLoad); @@ -206,7 +210,6 @@ public LinkRelevance nextURL(boolean asyncLoad) throws FrontierPersistentExcepti throw new DataNotFoundException(true, "Frontier run out of links."); } } - frontier.delete(link); schedulerLog.printf("%d\t%.5f\t%s\n", System.currentTimeMillis(), link.getRelevance(), link.getURL().toString()); @@ -248,7 +251,7 @@ public void addSeeds(List seeds) { logger.info("Added seed URL: {}", seed); count++; } - } catch (FrontierPersistentException e) { + } catch (Exception e) { throw new RuntimeException("Failed to insert seed URL: " + seed, e); } } @@ -273,7 +276,7 @@ public void addSeedScope(LinkRelevance link) { } public void insertOutlinks(Page page) - throws IOException, FrontierPersistentException, LinkClassifierException { + throws FrontierPersistentException, LinkClassifierException { LinkRelevance[] linksRelevance = outlinkClassifier.classify(page); diff --git a/src/main/java/focusedCrawler/tools/ReplayCrawl.java b/src/main/java/focusedCrawler/tools/ReplayCrawl.java index b7491f500..addc82896 100644 --- a/src/main/java/focusedCrawler/tools/ReplayCrawl.java +++ b/src/main/java/focusedCrawler/tools/ReplayCrawl.java @@ -1,7 +1,7 @@ package focusedCrawler.tools; +import focusedCrawler.crawler.async.FetchedResultHandler; import java.nio.file.Paths; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -11,11 +11,7 @@ import org.slf4j.LoggerFactory; import focusedCrawler.config.Configuration; -import focusedCrawler.crawler.async.FetchedResultHandler; -import focusedCrawler.crawler.async.HttpDownloader; import focusedCrawler.crawler.async.HttpDownloaderConfig; -import focusedCrawler.crawler.async.RobotsTxtHandler; -import focusedCrawler.crawler.async.SitemapXmlHandler; import focusedCrawler.crawler.crawlercommons.fetcher.FetchedResult; import focusedCrawler.link.LinkStorage; import focusedCrawler.link.frontier.Frontier; @@ -130,14 +126,9 @@ public void replay(String crawlerId, TargetStorage targetStorage, LinkStorage li LinkFilter inputLinkFilter) throws Exception { - Map handlers = new HashMap<>(); - HttpDownloaderConfig downloaderConfig = config.getCrawlerConfig().getDownloaderConfig(); - handlers.put(LinkRelevance.Type.FORWARD, - new FetchedResultHandler(crawlerId, targetStorage)); - handlers.put(LinkRelevance.Type.SITEMAP, new SitemapXmlHandler(linkStorage)); - handlers.put(LinkRelevance.Type.ROBOTS, new RobotsTxtHandler(linkStorage, - downloaderConfig.getUserAgentName())); + FetchedResultHandler 
handler = new FetchedResultHandler(crawlerId, targetStorage, + linkStorage, downloaderConfig.getUserAgentName()); int processedPages = 0; int ignoredPages = 0; @@ -180,14 +171,8 @@ public void replay(String crawlerId, TargetStorage targetStorage, LinkStorage li // negative value after the page is crawled) lr = new LinkRelevance(lr.getURL(), Math.abs(lr.getRelevance()), lr.getType()); - String finalUrl = page.getFinalUrl(); - Metadata responseHeaders = createHeadersMetadata(page); - FetchedResult result = new FetchedResult(requestedUrl, - finalUrl, page.getFetchTime(), responseHeaders, - page.getContent(), page.getContentType(), 0, null, - page.getFinalUrl(), 0, "", 200, "OK"); - - handlers.get(lr.getType()).completed(lr, result); + FetchedResult result = createFetchedResult(page); + handler.completed(lr, result); } catch (Exception e) { logger.error("An unexpected error happened.", e); @@ -204,6 +189,15 @@ public void replay(String crawlerId, TargetStorage targetStorage, LinkStorage li } } + private FetchedResult createFetchedResult(Page page) { + String finalUrl = page.getFinalUrl(); + Metadata responseHeaders = createHeadersMetadata(page); + return new FetchedResult(page.getRequestedUrl(), + finalUrl, page.getFetchTime(), responseHeaders, + page.getContent(), page.getContentType(), 0, null, + page.getFinalUrl(), 0, "", 200, "OK"); + } + private Metadata createHeadersMetadata(Page page) { Map> headers = page.getResponseHeaders(); Metadata metadata = new Metadata(); diff --git a/src/main/java/focusedCrawler/util/StorageException.java b/src/main/java/focusedCrawler/util/StorageException.java index 692ede5fa..794ad248c 100644 --- a/src/main/java/focusedCrawler/util/StorageException.java +++ b/src/main/java/focusedCrawler/util/StorageException.java @@ -3,18 +3,6 @@ @SuppressWarnings("serial") public class StorageException extends Exception { - public StorageException() { - super(); - } - - public StorageException(String message) { - super(message); - } - - public StorageException(Throwable detail) { - super(detail); - } - public StorageException(String message, Throwable detail) { super(message, detail); } diff --git a/src/test/java/focusedCrawler/integration/HardFocusCrawlingTest.java b/src/test/java/focusedCrawler/integration/HardFocusCrawlingTest.java index 4ee6fc33c..339b57ff1 100644 --- a/src/test/java/focusedCrawler/integration/HardFocusCrawlingTest.java +++ b/src/test/java/focusedCrawler/integration/HardFocusCrawlingTest.java @@ -77,14 +77,14 @@ public void shouldDownloadLinksOnlyFromRelevantPages() throws Exception { "irrelevant_page2.html" ); - for (String url : shouldBeDownloaded) { - LinkRelevance link = LinkRelevance.create("http://127.0.0.1:1234/" + url); - assertThat("URL="+link.getURL().toString(), frontier.exist(link), is(lessThan(0d))); + for (String path : shouldBeDownloaded) { + String url = "http://127.0.0.1:1234/" + path; + assertThat("URL=" + url, frontier.get(url).getRelevance(), is(lessThan(0d))); } - for (String url : shouldNOTBeDownloaded) { - LinkRelevance link = LinkRelevance.create("http://127.0.0.1:1234/" + url); - assertThat("URL="+link.getURL().toString(), frontier.exist(link), is(nullValue())); + for (String path : shouldNOTBeDownloaded) { + String url = "http://127.0.0.1:1234/" + path; + assertThat("URL=" + url, frontier.get(url), is(nullValue())); } } diff --git a/src/test/java/focusedCrawler/integration/RobotsAndSitemapTest.java b/src/test/java/focusedCrawler/integration/RobotsAndSitemapTest.java index f8c1fb5ce..82f38d3cc 100644 --- 
a/src/test/java/focusedCrawler/integration/RobotsAndSitemapTest.java +++ b/src/test/java/focusedCrawler/integration/RobotsAndSitemapTest.java @@ -10,14 +10,10 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.file.Paths; import java.util.List; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; import crawlercommons.robots.SimpleRobotRules; import crawlercommons.robots.SimpleRobotRulesParser; import focusedCrawler.util.persistence.PersistentHashtable; @@ -91,13 +87,12 @@ public void shouldDownloadLinksListedOnSitemapsXml() throws Exception { for (String url : shouldNOTBeDownloaded) { LinkRelevance link = new LinkRelevance("http://127.0.0.1:1234/" + url, LinkRelevance.DEFAULT_RELEVANCE); - System.out.println(link); - assertThat("URL="+url, frontier.exist(link), is(nullValue())); + assertThat("URL="+url, frontier.exists(link), is(false)); } } @Test - public void test1ToNotToDownloadSitesDisallowedOnRobots() throws Exception { + public void shouldNotDownloadSitesDisallowedOnRobots() throws Exception { String outputPath = tempFolder.newFolder().toString(); @@ -135,7 +130,7 @@ public void test1ToNotToDownloadSitesDisallowedOnRobots() throws Exception { } @Test - public void test2ToNotToDownloadSitesDisallowedOnRobotsWithSitemapsFalse() throws Exception { + public void shouldNotToDownloadSitesDisallowedOnRobotsWithSitemapsFalse() throws Exception { String outputPath = tempFolder.newFolder().toString(); @@ -194,14 +189,13 @@ public void testKryoSerializationAndDeserialization() throws IOException { assertTrue(rules.isAllowed("http://www.domain.com/anypage.html")); } - private void assertWasCrawled(String url, Frontier frontier) throws Exception { - LinkRelevance link = LinkRelevance.create("http://127.0.0.1:1234/" + url); - assertThat("URL=" + url, frontier.exist(link), is(lessThan(0d))); + private void assertWasCrawled(String path, Frontier frontier) { + String link = "http://127.0.0.1:1234/" + path; + assertThat("URL=" + link, frontier.get(link).getRelevance(), is(lessThan(0d))); } - private void assertWasNotCrawled(String url, Frontier frontier) throws Exception { - LinkRelevance link = LinkRelevance.create(url); - assertThat("URL=" + url, frontier.exist(link), is(not(lessThan(0d)))); + private void assertWasNotCrawled(String url, Frontier frontier) { + assertThat("URL=" + url, frontier.get(url), is(nullValue())); } private Frontier openFrontier(String outputPath, String configPath) { diff --git a/src/test/java/focusedCrawler/link/PolitenessSchedulerTest.java b/src/test/java/focusedCrawler/link/PolitenessSchedulerTest.java index c63d55fb0..4fcf4a81e 100644 --- a/src/test/java/focusedCrawler/link/PolitenessSchedulerTest.java +++ b/src/test/java/focusedCrawler/link/PolitenessSchedulerTest.java @@ -25,15 +25,27 @@ public void shouldSelectLinksBasedOnPoliteness() throws Exception { PolitenessScheduler scheduler = new PolitenessScheduler(minimumAccessTime, maxLinksInScheduler); - // when add link l1 + // when add link l1 from domain ex1.com scheduler.addLink(l1); // then should return it (+some other state checks) assertThat(scheduler.hasLinksAvailable(), is(true)); assertThat(scheduler.numberOfLinks(), is(1)); + assertThat(scheduler.nextLink(), is(l1)); + assertThat(scheduler.numberOfLinks(), is(1)); + assertThat(scheduler.canDownloadNow(l1), is(false)); + assertThat(scheduler.nextLink(), 
is(nullValue())); + assertThat(scheduler.numberOfDownloadingLinks(), is(1)); + + // after the download is finished, then the link is moved from a "downloading" list and + // removed from the scheduler. + scheduler.notifyDownloadFinished(l1); assertThat(scheduler.numberOfLinks(), is(0)); + assertThat(scheduler.canDownloadNow(l1), is(false)); assertThat(scheduler.nextLink(), is(nullValue())); - // and should remember domains from links recently chosen + assertThat(scheduler.numberOfDownloadingLinks(), is(0)); + + // and should remember domains from links recently chosen assertThat(scheduler.numberOfNonExpiredDomains(), is(1)); assertThat(scheduler.numberOfEmptyDomains(), is(1)); @@ -42,13 +54,22 @@ public void shouldSelectLinksBasedOnPoliteness() throws Exception { // fast they will have the same access times) Thread.sleep(1); - // same thing when add link l2... + // same thing when add link l2 from a different domain ex2.com... scheduler.addLink(l2); assertThat(scheduler.hasLinksAvailable(), is(true)); assertThat(scheduler.numberOfLinks(), is(1)); + assertThat(scheduler.numberOfDownloadingLinks(), is(0)); assertThat(scheduler.nextLink(), is(l2)); + assertThat(scheduler.numberOfDownloadingLinks(), is(1)); + assertThat(scheduler.numberOfLinks(), is(1)); + assertThat(scheduler.nextLink(), is(nullValue())); + + scheduler.notifyDownloadFinished(l2); assertThat(scheduler.numberOfLinks(), is(0)); + assertThat(scheduler.numberOfDownloadingLinks(), is(0)); + assertThat(scheduler.canDownloadNow(l2), is(false)); assertThat(scheduler.nextLink(), is(nullValue())); + // should remember domains from links recently chosen assertThat(scheduler.numberOfNonExpiredDomains(), is(2)); assertThat(scheduler.numberOfEmptyDomains(), is(2)); @@ -59,7 +80,7 @@ public void shouldSelectLinksBasedOnPoliteness() throws Exception { scheduler.addLink(l3); scheduler.addLink(l4); scheduler.addLink(l5); - + assertThat(scheduler.numberOfDownloadingLinks(), is(0)); assertThat(scheduler.numberOfNonExpiredDomains(), is(3)); assertThat(scheduler.numberOfEmptyDomains(), is(0)); @@ -68,30 +89,58 @@ public void shouldSelectLinksBasedOnPoliteness() throws Exception { // since other links from their domain has been chosen recently assertThat(scheduler.hasLinksAvailable(), is(true)); assertThat(scheduler.nextLink(), is(l5)); - + assertThat(scheduler.numberOfDownloadingLinks(), is(1)); + // at this moment, there should have no links available + // because l3 and l4 are from domains that were just downloaded assertThat(scheduler.hasLinksAvailable(), is(false)); assertThat(scheduler.nextLink(), is(nullValue())); + assertThat(scheduler.numberOfLinks(), is(3)); - // after waiting the minimumAccessTime interval, they links can be returned + // after waiting the minimumAccessTime interval, the links can be returned Thread.sleep(minimumAccessTime+100); - + + // now l3 and l4 can be downloaded assertThat(scheduler.nextLink(), is(l3)); assertThat(scheduler.nextLink(), is(l4)); - - // scheduler should also forget domains that don't have links chosen - // for longer then the minimumAccessTime + assertThat(scheduler.numberOfDownloadingLinks(), is(3)); + + // domains from links l3 and l4 should still be remembered until more than + // minimumAccessTime has passed since their download time + assertThat(scheduler.numberOfNonExpiredDomains(), is(3)); + // link l3 , l4, and l5 should still be in the scheduler until they are downloaded + assertThat(scheduler.numberOfLinks(), is(3)); + + // now we mark l3, l4, and l5 as downloaded + 
scheduler.notifyDownloadFinished(l3);
+        assertThat(scheduler.numberOfDownloadingLinks(), is(2));
+        scheduler.notifyDownloadFinished(l4);
+        assertThat(scheduler.numberOfDownloadingLinks(), is(1));
+        scheduler.notifyDownloadFinished(l5);
+        assertThat(scheduler.numberOfDownloadingLinks(), is(0));
+
+        // now the links should have been removed
+        assertThat(scheduler.numberOfLinks(), is(0));
+        // but their domains should still be remembered for another minimumAccessTime milliseconds
+        assertThat(scheduler.numberOfNonExpiredDomains(), is(3));
+
+
+        // after minimumAccessTime has passed, the scheduler should forget domains that
+        // don't have links chosen for longer than the minimumAccessTime
         Thread.sleep(minimumAccessTime+10);
         assertThat(scheduler.numberOfNonExpiredDomains(), is(0));
 
         // adding link again just to test that after removing old domain
         // everything is still working fine
         scheduler.addLink(l1);
-        assertThat(scheduler.nextLink(), is(l1));
         assertThat(scheduler.numberOfNonExpiredDomains(), is(1));
-        assertThat(scheduler.numberOfEmptyDomains(), is(1));
+        assertThat(scheduler.numberOfEmptyDomains(), is(0));
         assertThat(scheduler.nextLink(), is(nullValue()));
+
+        scheduler.notifyDownloadFinished(l1);
+        assertThat(scheduler.numberOfEmptyDomains(), is(1));
+        assertThat(scheduler.numberOfNonExpiredDomains(), is(1));
     }
 
     @Test
@@ -128,11 +177,22 @@ public void shouldReturnLinksFromSameTLDsUsingRelevanceOrder() throws Exception
         scheduler.addLink(l2);
         scheduler.addLink(l3);
         scheduler.addLink(l4);
-        
-        assertThat(scheduler.nextLink().getRelevance(), is(4d));
-        assertThat(scheduler.nextLink().getRelevance(), is(3d));
-        assertThat(scheduler.nextLink().getRelevance(), is(2d));
-        assertThat(scheduler.nextLink().getRelevance(), is(1d));
+
+        LinkRelevance link = scheduler.nextLink();
+        scheduler.notifyDownloadFinished(link);
+        assertThat(link.getRelevance(), is(4d));
+
+        link = scheduler.nextLink();
+        assertThat(link.getRelevance(), is(3d));
+        scheduler.notifyDownloadFinished(link);
+
+        link = scheduler.nextLink();
+        assertThat(link.getRelevance(), is(2d));
+        scheduler.notifyDownloadFinished(link);
+
+        link = scheduler.nextLink();
+        assertThat(link.getRelevance(), is(1d));
+        scheduler.notifyDownloadFinished(link);
     }
 
     @Test
@@ -156,6 +216,38 @@ public void shouldNotAddLinkMultipleTimes() throws Exception {
         assertThat(scheduler.nextLink().getRelevance(), is(1d));
         assertThat(scheduler.nextLink(), is(nullValue()));
     }
+
+    @Test
+    public void shouldNotConsiderToAddLinkThatWasAlreadyAdded() throws Exception {
+
+        LinkRelevance l1 = new LinkRelevance("http://ex1.com/1", 1);
+
+        int minimumAccessTime = 0;
+        int maxLinksInScheduler = 100;
+
+        PolitenessScheduler scheduler = new PolitenessScheduler(minimumAccessTime, maxLinksInScheduler);
+
+        // when add link l1
+        scheduler.addLink(l1);
+        // should count it correctly
+        assertThat(scheduler.numberOfLinks(), is(1));
+        assertThat(scheduler.numberOfDownloadingLinks(), is(0));
+        assertThat(scheduler.canInsertNow(l1), is(false));
+
+        // even when link l1 is selected for download
+        scheduler.nextLink();
+        // it should still not be allowed to be inserted again
+        assertThat(scheduler.numberOfDownloadingLinks(), is(1));
+        assertThat(scheduler.numberOfLinks(), is(1));
+        assertThat(scheduler.canInsertNow(l1), is(false));
+
+        // when download is completed
+        scheduler.notifyDownloadFinished(l1);
+        // then it should be possible to re-schedule it (note that the minimum delay is 0ms in this test)
+        assertThat(scheduler.numberOfDownloadingLinks(), is(0));
+        assertThat(scheduler.numberOfLinks(), is(0));
+        
assertThat(scheduler.canInsertNow(l1), is(true)); + } @Test public void shouldCheckIfLinkCanBeDownloadedAtCurrentTime() throws Exception { @@ -169,12 +261,23 @@ public void shouldCheckIfLinkCanBeDownloadedAtCurrentTime() throws Exception { PolitenessScheduler scheduler = new PolitenessScheduler(minimumAccessTime, maxLinksInScheduler); scheduler.addLink(l1); - assertThat(scheduler.nextLink().getRelevance(), is(1d)); - - assertThat(scheduler.canDownloadNow(l3), is(true)); + LinkRelevance link = scheduler.nextLink(); + assertThat(link.getRelevance(), is(1d)); + assertThat(link.getURL().toString(), is(l1.getURL().toString())); + + // cannot be downloaded since it was just selected for download assertThat(scheduler.canDownloadNow(l2), is(false)); + scheduler.notifyDownloadFinished(l2); + // still cannot be downloaded since minimum time between downloads has not passed + assertThat(scheduler.canDownloadNow(l2), is(false)); + + // no link from ex2.com was ever downloaded, so it can be downloaded + assertThat(scheduler.canDownloadNow(l3), is(true)); + + // now we wait a little longer than the minimum time between downloads Thread.sleep(minimumAccessTime+10); - + + // now links both ex1.com and ex2.com can be downloaded assertThat(scheduler.canDownloadNow(l2), is(true)); assertThat(scheduler.canDownloadNow(l3), is(true)); } @@ -199,16 +302,24 @@ public void shouldBeAbleToClearListOfLinks() throws Exception { assertThat(scheduler.hasLinksAvailable(), is(true)); // when - scheduler.nextLink(); - scheduler.nextLink(); - scheduler.clear(); + + // downloads 2 out of the 4 links added + LinkRelevance link1 = scheduler.nextLink(); + scheduler.notifyDownloadFinished(link1); + + LinkRelevance link2= scheduler.nextLink(); + scheduler.notifyDownloadFinished(link2); + + scheduler.clearPendingQueue(); // should remove all links - // then + // then should have no links available anymore assertThat(scheduler.hasLinksAvailable(), is(false)); assertThat(scheduler.numberOfLinks(), is(0)); assertThat(scheduler.numberOfEmptyDomains(), is(4)); + // but should still remember download times from ex3.com and ex4.com assertThat(scheduler.numberOfNonExpiredDomains(), is(2)); - + assertThat(scheduler.canDownloadNow(link1), is(false)); + assertThat(scheduler.canDownloadNow(link2), is(false)); // make sure it remembers domains that were previously selected scheduler.addLink(l1); diff --git a/src/test/java/focusedCrawler/link/frontier/FrontierManagerTest.java b/src/test/java/focusedCrawler/link/frontier/FrontierManagerTest.java index dd9c70cb3..745c4b01d 100644 --- a/src/test/java/focusedCrawler/link/frontier/FrontierManagerTest.java +++ b/src/test/java/focusedCrawler/link/frontier/FrontierManagerTest.java @@ -88,6 +88,8 @@ public void shouldNotInsertLinkOutOfScope() throws Exception { frontierManager.insert(link2); LinkRelevance selectedLink1 = frontierManager.nextURL(); + frontierManager.notifyDownloadFinished(selectedLink1); + DataNotFoundException notFoundException = null; try { frontierManager.nextURL(); @@ -138,6 +140,8 @@ public void shouldRememberScopeOnRestart() throws Exception { frontierManager.insert(link2); LinkRelevance selectedLink1 = frontierManager.nextURL(); + frontierManager.notifyDownloadFinished(selectedLink1); + DataNotFoundException notFoundException = null; try { frontierManager.nextURL(); @@ -184,6 +188,8 @@ public void shouldModifyScopeAfterAddingNewSeeds() throws Exception { frontierManager.insert(link2_1); LinkRelevance selectedLink1 = frontierManager.nextURL(); + 
+        frontierManager.notifyDownloadFinished(selectedLink1);
+
         DataNotFoundException notFoundException = null;
         try {
             frontierManager.nextURL();
@@ -193,9 +199,11 @@ public void shouldModifyScopeAfterAddingNewSeeds() throws Exception {
         frontierManager.addSeeds(asList(link2_1.getURL().toString()));
 
         LinkRelevance selectedLink2 = frontierManager.nextURL();
+        frontierManager.notifyDownloadFinished(selectedLink2);
 
         frontierManager.insert(link2_2);
         LinkRelevance selectedLink2_2 = frontierManager.nextURL();
+        frontierManager.notifyDownloadFinished(selectedLink2_2);
 
         // then
         assertThat(selectedLink1, is(notNullValue()));
@@ -264,6 +272,7 @@ public void shouldNotInsertUrlTwice() throws Exception {
         // when
         LinkRelevance nextURL = frontierManager.nextURL();
+        frontierManager.notifyDownloadFinished(nextURL);
 
         // then
         assertThat(nextURL, is(notNullValue()));
         assertThat(nextURL.getRelevance(), is(299d));
@@ -271,6 +280,7 @@
         // when
         nextURL = frontierManager.nextURL();
+        frontierManager.notifyDownloadFinished(nextURL);
 
         // then
         assertThat(nextURL, is(notNullValue()));
         assertThat(nextURL.getRelevance(), is(-299d));
@@ -359,12 +369,14 @@ public void shouldInsertRobotsLinkWhenAddDomainForTheFirstTime() throws Exceptio
         LinkRelevance nextURL;
 
         nextURL = frontierManager.nextURL();
+        frontierManager.notifyDownloadFinished(nextURL);
         assertThat(nextURL, is(notNullValue()));
         assertThat(nextURL.getURL(), is(notNullValue()));
         assertThat(nextURL.getURL().toString(), is("http://www.example1.com/robots.txt"));
         assertThat(nextURL.getType(), is(LinkRelevance.Type.ROBOTS));
 
         nextURL = frontierManager.nextURL();
+        frontierManager.notifyDownloadFinished(nextURL);
         assertThat(nextURL, is(notNullValue()));
         assertThat(nextURL.getURL(), is(notNullValue()));
         assertThat(nextURL.getURL(), is(link1.getURL()));
@@ -393,15 +405,20 @@ public void shouldInsertUrlsAndSelectUrlsInSortedByRelevance() throws Exception
         frontierManager.insert(link3);
 
         LinkRelevance selectedLink1 = frontierManager.nextURL();
+        frontierManager.notifyDownloadFinished(selectedLink1);
+
         LinkRelevance selectedLink2 = frontierManager.nextURL();
+        frontierManager.notifyDownloadFinished(selectedLink2);
+
         LinkRelevance selectedLink3 = frontierManager.nextURL();
+        frontierManager.notifyDownloadFinished(selectedLink3);
+
         DataNotFoundException notFoundException = null;
         try {
             frontierManager.nextURL();
         } catch (DataNotFoundException e) {
             notFoundException = e;
         }
-
         // then
         // should return only 3 inserted links, 4th should be null
@@ -436,7 +453,9 @@ public void shouldNotReturnAgainALinkThatWasAlreadyReturned() throws Exception {
         frontierManager.insert(link1);
         frontierManager.insert(link2);
         LinkRelevance selectedLink1 = frontierManager.nextURL();
+        frontierManager.notifyDownloadFinished(selectedLink1);
         LinkRelevance selectedLink2 = frontierManager.nextURL();
+        frontierManager.notifyDownloadFinished(selectedLink2);
         DataNotFoundException notFoundException1 = null;
         try {
             frontierManager.nextURL();
@@ -470,7 +489,7 @@ public void shouldNotReturnAgainALinkThatWasAlreadyReturned() throws Exception {
     @Test
     public void shouldNotReturnLinkReturnedWithinMinimumTimeInterval() throws Exception {
         // given
-        int minimumAccessTimeInterval = 500;
+        int minimumAccessTimeInterval = 300;
         ImmutableMap props = ImmutableMap.of(
                 "link_storage.scheduler.max_links", 2,
                 "link_storage.scheduler.host_min_access_interval", minimumAccessTimeInterval,
@@ -501,7 +520,18 @@ public void shouldNotReturnLinkReturnedWithinMinimumTimeInterval() throws Except
             notFoundException1 = e;
             assertThat(e.ranOutOfLinks(), is(false));
         }
-
+
+        Thread.sleep(minimumAccessTimeInterval+10);
+        try {
+            frontierManager.nextURL();
+            fail("Should still not return link: minimum delay starts only after download is finished.");
+        } catch (DataNotFoundException e) {
+            notFoundException1 = e;
+            assertThat(e.ranOutOfLinks(), is(false));
+        }
+
+        // the minimum time interval should start counting only now
+        frontierManager.notifyDownloadFinished(selectedLink1);
         // should return after minimum time interval
         Thread.sleep(minimumAccessTimeInterval+10);
         LinkRelevance selectedLink2 = frontierManager.nextURL();
diff --git a/src/test/java/focusedCrawler/link/frontier/FrontierTest.java b/src/test/java/focusedCrawler/link/frontier/FrontierTest.java
index 0cdb57d6d..4dbbf1bf6 100644
--- a/src/test/java/focusedCrawler/link/frontier/FrontierTest.java
+++ b/src/test/java/focusedCrawler/link/frontier/FrontierTest.java
@@ -48,10 +48,16 @@ public void shouldInsertUrl() throws Exception {
 
         // when
         frontier.insert(link1);
-
+
         // then
-        assertThat(frontier.exist(link1), is(1d));
-        assertThat(frontier.exist(link2), is(nullValue()));
+        String url1 = link1.getURL().toString();
+        assertThat(frontier.exists(link1), is(true));
+        assertThat(frontier.get(url1), is(notNullValue()));
+        assertThat(frontier.get(url1).getRelevance(), is(1d));
+
+        String url2 = link2.getURL().toString();
+        assertThat(frontier.exists(link2), is(false));
+        assertThat(frontier.get(url2), is(nullValue()));
     }
 
     @Test
@@ -65,11 +71,8 @@ public void shouldInsertUrlsAndSelectGivenNumberOfUrls() throws Exception {
         frontier.insert(link2);
 
         // then
-        assertThat(frontier.exist(link1), is(notNullValue()));
-        assertThat(frontier.exist(link1), is(1d));
-
-        assertThat(frontier.exist(link2), is(notNullValue()));
-        assertThat(frontier.exist(link2), is(2d));
+        assertThat(frontier.exists(link1), is(true));
+        assertThat(frontier.exists(link2), is(true));
     }
 
 
@@ -77,16 +80,19 @@ public void shouldInsertUrlsAndSelectGivenNumberOfUrls() throws Exception {
     public void shouldInsertAndDeleteUrl() throws Exception {
         // given
        LinkRelevance link1 = new LinkRelevance(new URL("http://www.example1.com/index.html"), 1);
-
+        String url1 = link1.getURL().toString();
+
         // when
         frontier.insert(link1);
 
         // then
-        assertThat(frontier.exist(link1), is(1d));
-
+        assertThat(frontier.exists(link1), is(true));
+        assertThat(frontier.isDownloaded(url1), is(false));
+
         // when
-        frontier.delete(link1);
+        frontier.markAsDownloaded(link1);
 
         // then
-        assertThat(frontier.exist(link1), is(-1d));
+        assertThat(frontier.get(url1).getRelevance(), is(-1d));
+        assertThat(frontier.isDownloaded(url1), is(true));
     }
 }
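
Editor's note (not part of the patch): a minimal sketch of the select/notify workflow that the tests above exercise, assuming only the PolitenessScheduler methods visible in this diff (addLink, nextLink, notifyDownloadFinished) and the two-argument constructor used by the tests. The method name downloadOnePolitely, the constructor arguments, the example URL, and the fetch() call are hypothetical placeholders, not ACHE APIs.

    // Sketch only: pairs nextLink() with notifyDownloadFinished() so the
    // minimum per-domain delay is counted from the time the download finishes.
    // Assumes focusedCrawler.link.PolitenessScheduler and
    // focusedCrawler.link.frontier.LinkRelevance are imported.
    void downloadOnePolitely() throws Exception {
        PolitenessScheduler scheduler = new PolitenessScheduler(500, 10000); // hypothetical settings
        scheduler.addLink(new LinkRelevance("http://example.com/page-1", 1));

        LinkRelevance link = scheduler.nextLink(); // null when no domain can be accessed yet
        if (link != null) {
            try {
                fetch(link); // hypothetical download call
            } finally {
                // the minimum access interval for this domain starts counting only now
                scheduler.notifyDownloadFinished(link);
            }
        }
    }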