From 02661bb7e47d06d7c8f7c6e5bc1825dc37f4b5f6 Mon Sep 17 00:00:00 2001
From: Justin Littman
Date: Tue, 19 Mar 2019 20:46:29 -0400
Subject: [PATCH] Added additional tests and various other tweaks.

---
 .circleci/config.yml                           |   2 +-
 README.md                                      |   2 +-
 tests/__init__.py                              |  75 +++++++++++
 tests/test_tweet_harvester.py                  | 103 +++++++++++++++
 tests/twarccloud/__init__.py                   |  68 ----------
 tests/twarccloud/harvester/__init__.py         |   0
 .../harvester/test_collection_lock.py          |  50 ++++++++
 .../harvester/test_file_mover_thread.py        |  65 ++++++++++
 .../harvester/test_file_queueing_writer.py     |  44 +++++++
 .../twarccloud/harvester/test_harvest_info.py  |  34 +++++
 .../harvester/test_tweet_writer_thread.py      | 121 ++++++++++++++++++
 tests/twarccloud/test_changeset.py             |  27 ++--
 tests/twarccloud/test_collection_config.py     |  53 ++++----
 twarccloud/config_helpers.py                   |   2 +
 twarccloud/harvester/collection_lock.py        |   6 +-
 twarccloud/harvester/file_mover_thread.py      |   5 +-
 twarccloud/harvester/server_thread.py          |   6 +-
 twarccloud/harvester/tweet_writer_thread.py    |  15 ++-
 tweet_harvester.py                             |  22 ++--
 19 files changed, 564 insertions(+), 136 deletions(-)
 create mode 100644 tests/test_tweet_harvester.py
 create mode 100644 tests/twarccloud/harvester/__init__.py
 create mode 100644 tests/twarccloud/harvester/test_collection_lock.py
 create mode 100644 tests/twarccloud/harvester/test_file_mover_thread.py
 create mode 100644 tests/twarccloud/harvester/test_file_queueing_writer.py
 create mode 100644 tests/twarccloud/harvester/test_harvest_info.py
 create mode 100644 tests/twarccloud/harvester/test_tweet_writer_thread.py

diff --git a/.circleci/config.yml b/.circleci/config.yml
index 3aab51d..a635c6c 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -43,7 +43,7 @@ jobs: name: run pylint command: | . 
venv/bin/activate - pylint *.py twarccloud + pylint *.py twarccloud tests - store_artifacts: path: test-reports diff --git a/README.md b/README.md index fee2378..d352ad3 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,7 @@ pip install pylint then: ``` python -m unittest discover -pylint *.py twarccloud +pylint *.py twarccloud tests ``` diff --git a/tests/__init__.py b/tests/__init__.py index e69de29..961ec27 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -0,0 +1,75 @@ +import unittest +import logging +from twarccloud.collection_config import CollectionConfig + + +class TestCase(unittest.TestCase): + logging.basicConfig(level=logging.DEBUG) + logging.getLogger('twarc-cloud').setLevel(logging.DEBUG) + + +def extract_dict(changeset): + del changeset['change_timestamp'] + return changeset + + +def timeline_config(): + config = CollectionConfig( + { + 'id': 'foo', + 'type': 'user_timeline', + 'keys': { + 'consumer_key': 'mBbq9ruEckInQHUir8Kn0', + 'consumer_secret': 'Pf28yReBUD90pLVOsb4r5ZnKCQ6xlOomBAjD5npFEQ6Rm', + 'access_token': '481186914-5yIyfryJqcHV29YVL37BOzjseYuRzCLmwO6', + 'access_token_secret': 'S51yY5Hjffts4WMKMgvGendxbZVsZO014Z38Tfvc' + }, + 'users': { + '481186914': { + 'screen_name': 'justin_littman' + }, + '6253282': { + 'screen_name': 'twitterapi' + }, + '12': { + 'screen_name': 'jack', + 'since_id': '12345' + } + } + }) + return config + + +def filter_config(): + config = CollectionConfig({ + 'id': 'foo', + 'type': 'filter', + 'keys': { + 'consumer_key': 'mBbq9ruEckInQHUir8Kn0', + 'consumer_secret': 'Pf28yReBUD90pLVOsb4r5ZnKCQ6xlOomBAjD5npFEQ6Rm', + 'access_token': '481186914-5yIyfryJqcHV29YVL37BOzjseYuRzCLmwO6', + 'access_token_secret': 'S51yY5Hjffts4WMKMgvGendxbZVsZO014Z38Tfvc' + }, + 'filter': { + 'track': 'foo,#bar' + } + }) + return config + + +def search_config(): + config = CollectionConfig( + { + 'id': 'foo', + 'type': 'search', + 'keys': { + 'consumer_key': 'mBbq9ruEckInQHUir8Kn0', + 'consumer_secret': 'Pf28yReBUD90pLVOsb4r5ZnKCQ6xlOomBAjD5npFEQ6Rm', + 'access_token': '481186914-5yIyfryJqcHV29YVL37BOzjseYuRzCLmwO6', + 'access_token_secret': 'S51yY5Hjffts4WMKMgvGendxbZVsZO014Z38Tfvc' + }, + 'search': { + 'query': 'foo' + } + }) + return config diff --git a/tests/test_tweet_harvester.py b/tests/test_tweet_harvester.py new file mode 100644 index 0000000..d829eca --- /dev/null +++ b/tests/test_tweet_harvester.py @@ -0,0 +1,103 @@ +from tempfile import mkdtemp +import shutil +import json +import os +import socket +from contextlib import closing +from unittest.mock import patch, MagicMock +from threading import Timer +import requests +from tweet_harvester import TweetHarvester +from twarccloud.harvester.twarc_thread import TwarcThread +from twarccloud.filepaths_helper import get_collection_config_filepath, get_harvest_file, get_changesets_path +from tests import TestCase, timeline_config + + +class TestTweetHarvester(TestCase): + def setUp(self): + self.collection_id = 'test_id' + self.collections_path = mkdtemp() + self.collection_config_filepath = get_collection_config_filepath(self.collection_id, + collections_path=self.collections_path) + self.write_collection_config() + + def tearDown(self): + shutil.rmtree(self.collections_path, ignore_errors=True) + + @patch('tweet_harvester.TwarcThread') + def test_harvest(self, mock_twarc_thread_class): + mock_twarc_thread = MagicMock(TwarcThread, exception=None) + mock_twarc_thread_class.return_value = mock_twarc_thread + + harvester = TweetHarvester(self.collection_id, self.collections_path, shutdown=True, 
port=self.find_free_port()) + # Make a change to changeset + harvester.changeset.update_user('screen_name', 'real_justin_littman', '481186914') + harvester.harvest() + + # Test collection config written to harvest + harvest_collection_config_filepath = get_harvest_file(self.collection_id, harvester.harvest_timestamp, + 'collection.json', collections_path=self.collections_path) + self.assertTrue(os.path.exists(harvest_collection_config_filepath)) + harvest_collection_config = self.load_collection_config(harvest_collection_config_filepath) + self.assertFalse('consumer_secret' in harvest_collection_config['keys']) + self.assertFalse('access_token_secret' in harvest_collection_config['keys']) + + # Test changeset + collection_config = self.load_collection_config(self.collection_config_filepath) + self.assertEqual('real_justin_littman', collection_config['users']['481186914']['screen_name']) + self.assertEqual(1, len( + os.listdir(get_changesets_path(self.collection_id, collections_path=self.collections_path)))) + + # Test events + self.assertTrue(harvester.stopped_event.is_set()) + self.assertTrue(harvester.shutdown_event.is_set()) + + @patch('tweet_harvester.TwarcThread') + def test_harvest_exception(self, mock_twarc_thread_class): + mock_twarc_thread = MagicMock(TwarcThread, exception=Exception('Darn')) + mock_twarc_thread_class.return_value = mock_twarc_thread + + harvester = TweetHarvester(self.collection_id, self.collections_path, shutdown=True, port=self.find_free_port()) + with self.assertRaises(Exception): + harvester.harvest() + + @patch('tweet_harvester.TwarcThread') + def test_harvest_without_shutdown(self, mock_twarc_thread_class): + mock_twarc_thread = MagicMock(TwarcThread, exception=None) + mock_twarc_thread_class.return_value = mock_twarc_thread + + harvester = TweetHarvester(self.collection_id, self.collections_path, shutdown=False, + port=self.find_free_port()) + + def test_shutdown_timer(): + self.assertFalse(harvester.shutdown_event.is_set()) + + Timer(.5, test_shutdown_timer).start() + + def shutdown_timer(): + requests.get('http://localhost:{}/shutdown'.format(harvester.port)) + + Timer(1, shutdown_timer).start() + harvester.harvest() + + # Test events + self.assertTrue(harvester.stopped_event.is_set()) + self.assertTrue(harvester.shutdown_event.is_set()) + + def write_collection_config(self): + os.makedirs(os.path.dirname(self.collection_config_filepath)) + with open(self.collection_config_filepath, 'w') as file: + json.dump(timeline_config(), file) + + @staticmethod + def load_collection_config(filepath): + with open(filepath) as file: + return json.load(file) + + @staticmethod + def find_free_port(): + # pylint: disable=no-member + with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: + sock.bind(('', 0)) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + return sock.getsockname()[1] diff --git a/tests/twarccloud/__init__.py b/tests/twarccloud/__init__.py index 7b5280a..e69de29 100644 --- a/tests/twarccloud/__init__.py +++ b/tests/twarccloud/__init__.py @@ -1,68 +0,0 @@ -from twarccloud.collection_config import CollectionConfig - - -def extract_dict(changeset): - del changeset['change_timestamp'] - return changeset - - -def timeline_config(): - config = CollectionConfig( - { - 'id': 'foo', - 'type': 'user_timeline', - 'keys': { - 'consumer_key': 'mBbq9ruEckInQHUir8Kn0', - 'consumer_secret': 'Pf28yReBUD90pLVOsb4r5ZnKCQ6xlOomBAjD5npFEQ6Rm', - 'access_token': '481186914-5yIyfryJqcHV29YVL37BOzjseYuRzCLmwO6', - 'access_token_secret': 
'S51yY5Hjffts4WMKMgvGendxbZVsZO014Z38Tfvc' - }, - 'users': { - '481186914': { - 'screen_name': 'justin_littman' - }, - '6253282': { - 'screen_name': 'twitterapi' - }, - '12': { - 'screen_name': 'jack', - 'since_id': '12345' - } - } - }) - return config - - -def filter_config(): - config = CollectionConfig({ - 'id': 'foo', - 'type': 'filter', - 'keys': { - 'consumer_key': 'mBbq9ruEckInQHUir8Kn0', - 'consumer_secret': 'Pf28yReBUD90pLVOsb4r5ZnKCQ6xlOomBAjD5npFEQ6Rm', - 'access_token': '481186914-5yIyfryJqcHV29YVL37BOzjseYuRzCLmwO6', - 'access_token_secret': 'S51yY5Hjffts4WMKMgvGendxbZVsZO014Z38Tfvc' - }, - 'filter': { - 'track': 'foo,#bar' - } - }) - return config - - -def search_config(): - config = CollectionConfig( - { - 'id': 'foo', - 'type': 'search', - 'keys': { - 'consumer_key': 'mBbq9ruEckInQHUir8Kn0', - 'consumer_secret': 'Pf28yReBUD90pLVOsb4r5ZnKCQ6xlOomBAjD5npFEQ6Rm', - 'access_token': '481186914-5yIyfryJqcHV29YVL37BOzjseYuRzCLmwO6', - 'access_token_secret': 'S51yY5Hjffts4WMKMgvGendxbZVsZO014Z38Tfvc' - }, - 'search': { - 'query': 'foo' - } - }) - return config diff --git a/tests/twarccloud/harvester/__init__.py b/tests/twarccloud/harvester/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/twarccloud/harvester/test_collection_lock.py b/tests/twarccloud/harvester/test_collection_lock.py new file mode 100644 index 0000000..1acf5ff --- /dev/null +++ b/tests/twarccloud/harvester/test_collection_lock.py @@ -0,0 +1,50 @@ +from tempfile import mkdtemp +import shutil +from datetime import datetime +from queue import Queue +import os +from twarccloud.harvester.collection_lock import CollectionLock, AddFile, DeleteFile, is_locked, assert_locked, \ + LockedException +from twarccloud.filepaths_helper import get_lock_file, get_last_harvest_file +from tests import TestCase + + +class TestCollectionLock(TestCase): + def setUp(self): + self.collections_path = mkdtemp() + self.timestamp = datetime.utcnow() + self.file_queue = Queue() + self.collection_id = 'test_id' + self.lock_file = get_lock_file(self.collection_id, collections_path=self.collections_path) + self.last_harvest_file = get_last_harvest_file(self.collection_id, collections_path=self.collections_path) + + def tearDown(self): + shutil.rmtree(self.collections_path, ignore_errors=True) + + def test_lock(self): + with CollectionLock(self.collections_path, self.collection_id, self.file_queue, + harvest_timestamp=self.timestamp): + self.assertTrue(os.path.exists(self.lock_file)) + self.assertQueuedFile(self.lock_file) + + self.assertTrue(os.path.exists(self.last_harvest_file)) + self.assertQueuedFile(self.last_harvest_file) + self.assertFalse(os.path.exists(get_lock_file(self.collection_id, collections_path=self.collections_path))) + self.assertQueuedFile(self.lock_file, is_add=False) + + def test_is_locked(self): + self.assertFalse(is_locked(self.lock_file)) + with CollectionLock(self.collections_path, self.collection_id, self.file_queue, + harvest_timestamp=self.timestamp): + self.assertTrue(is_locked(self.lock_file)) + with self.assertRaises(LockedException): + assert_locked(self.lock_file) + self.assertFalse(is_locked(self.lock_file)) + assert_locked(self.lock_file) + + # pylint: disable=invalid-name + def assertQueuedFile(self, filepath, is_add=True): + queued_file = self.file_queue.get() + self.file_queue.task_done() + self.assertIsInstance(queued_file, AddFile if is_add else DeleteFile) + self.assertEqual(filepath, queued_file.filepath) diff --git a/tests/twarccloud/harvester/test_file_mover_thread.py 
b/tests/twarccloud/harvester/test_file_mover_thread.py new file mode 100644 index 0000000..d06def2 --- /dev/null +++ b/tests/twarccloud/harvester/test_file_mover_thread.py @@ -0,0 +1,65 @@ +from unittest.mock import patch, MagicMock +from tempfile import mkdtemp +from queue import Queue +import os +from twarccloud.filepaths_helper import get_collection_file +from twarccloud.harvester.file_mover_thread import S3FileMoverThread, AddFile, DeleteFile +from tests import TestCase + + +class TestS3FileMoverThread(TestCase): + def setUp(self): + self.collections_path = mkdtemp() + self.file_queue = Queue() + self.collection_id = 'test_id' + self.filepath = get_collection_file(self.collection_id, 'test.txt', collections_path=self.collections_path) + self.bucket = 'test_bucket' + + def test_no_bucket(self): + with S3FileMoverThread(self.file_queue, self.collections_path, None): + os.makedirs(os.path.dirname(self.filepath)) + with open(self.filepath, 'w') as file: + file.write('test') + self.file_queue.put(AddFile(self.filepath, True)) + self.assertTrue(self.file_queue.empty()) + self.assertTrue(os.path.exists(self.filepath)) + + @patch('twarccloud.harvester.file_mover_thread.aws_client') + def test_move(self, mock_aws_client_factory): + mock_aws_client = MagicMock() + mock_aws_client_factory.return_value = mock_aws_client + with S3FileMoverThread(self.file_queue, self.collections_path, self.bucket): + os.makedirs(os.path.dirname(self.filepath)) + with open(self.filepath, 'w') as file: + file.write('test') + self.file_queue.put(AddFile(self.filepath, True)) + self.assertTrue(self.file_queue.empty()) + # File was deleted. + self.assertFalse(os.path.exists(self.filepath)) + mock_aws_client.upload_file.assert_called_once_with(self.filepath, self.bucket, + get_collection_file(self.collection_id, 'test.txt')) + + @patch('twarccloud.harvester.file_mover_thread.aws_client') + def test_move_without_delete(self, mock_aws_client_factory): + mock_aws_client = MagicMock() + mock_aws_client_factory.return_value = mock_aws_client + with S3FileMoverThread(self.file_queue, self.collections_path, self.bucket): + os.makedirs(os.path.dirname(self.filepath)) + with open(self.filepath, 'w') as file: + file.write('test') + self.file_queue.put(AddFile(self.filepath, False)) + self.assertTrue(self.file_queue.empty()) + # File was not deleted. 
+ self.assertTrue(os.path.exists(self.filepath)) + mock_aws_client.upload_file.assert_called_once_with(self.filepath, self.bucket, + get_collection_file(self.collection_id, 'test.txt')) + + @patch('twarccloud.harvester.file_mover_thread.aws_client') + def test_delete(self, mock_aws_client_factory): + mock_aws_client = MagicMock() + mock_aws_client_factory.return_value = mock_aws_client + with S3FileMoverThread(self.file_queue, self.collections_path, self.bucket): + self.file_queue.put(DeleteFile(self.filepath)) + self.assertTrue(self.file_queue.empty()) + mock_aws_client.delete_object.assert_called_once_with(Bucket=self.bucket, + Key=get_collection_file(self.collection_id, 'test.txt')) diff --git a/tests/twarccloud/harvester/test_file_queueing_writer.py b/tests/twarccloud/harvester/test_file_queueing_writer.py new file mode 100644 index 0000000..db479e6 --- /dev/null +++ b/tests/twarccloud/harvester/test_file_queueing_writer.py @@ -0,0 +1,44 @@ +from tempfile import mkdtemp +import shutil +from queue import Queue +from twarccloud.harvester.file_queueing_writer import FileQueueingWriter +from twarccloud.harvester.file_mover_thread import AddFile +from tests import TestCase + + +class TestFileQueueingWriter(TestCase): + def setUp(self): + self.path = mkdtemp() + self.file_queue = Queue() + + def tearDown(self): + shutil.rmtree(self.path, ignore_errors=True) + + def test_write(self): + filepath = '{}/test.txt'.format(self.path) + with FileQueueingWriter(filepath, self.file_queue) as writer: + writer.write('test') + self.assertQueuedFile(filepath) + self.assertEqual('test', open(filepath).read()) + + def test_write_json(self): + filepath = '{}/test.txt'.format(self.path) + with FileQueueingWriter(filepath, self.file_queue) as writer: + writer.write_json(['test']) + self.assertQueuedFile(filepath) + self.assertEqual('["test"]\n', open(filepath).read()) + + def test_write_delete(self): + filepath = '{}/test.txt'.format(self.path) + with FileQueueingWriter(filepath, self.file_queue, delete=True) as writer: + writer.write('test') + self.assertQueuedFile(filepath, local_delete=True) + self.assertEqual('test', open(filepath).read()) + + # pylint: disable=invalid-name + def assertQueuedFile(self, filepath, local_delete=False): + queued_file = self.file_queue.get() + self.file_queue.task_done() + self.assertIsInstance(queued_file, AddFile) + self.assertTrue(queued_file.delete == local_delete) + self.assertEqual(filepath, queued_file.filepath) diff --git a/tests/twarccloud/harvester/test_harvest_info.py b/tests/twarccloud/harvester/test_harvest_info.py new file mode 100644 index 0000000..ff65ddf --- /dev/null +++ b/tests/twarccloud/harvester/test_harvest_info.py @@ -0,0 +1,34 @@ +from datetime import datetime +from twarccloud.harvester.harvest_info import HarvestInfo, AtomicInteger +from tests import TestCase + + +class TestHarvestInfo(TestCase): + def test_to_dict(self): + collection_id = 'test' + harvest_timestamp = datetime.utcnow() + harvest_info = HarvestInfo(collection_id, harvest_timestamp) + harvest_info.tweets.incr(amount=5) + harvest_info.files.incr() + harvest_info.file_bytes.incr(amount=2048) + harvest_info.end() + + harvest_dict = harvest_info.to_dict() + self.assertEqual('test', harvest_dict['collection_id']) + self.assertEqual(5, harvest_dict['tweets']) + self.assertEqual(1, harvest_dict['files']) + self.assertEqual(2048, harvest_dict['file_bytes']) + self.assertTrue('harvest_timestamp' in harvest_dict) + self.assertTrue('harvest_end_timestamp' in harvest_dict) + + +class 
TestAtomicInteger(TestCase): + def test_incr(self): + atomic_int = AtomicInteger(value=10) + self.assertEqual(10, atomic_int.value) + + atomic_int.incr() + self.assertEqual(11, atomic_int.value) + + atomic_int.incr(amount=5) + self.assertEqual(16, atomic_int.value) diff --git a/tests/twarccloud/harvester/test_tweet_writer_thread.py b/tests/twarccloud/harvester/test_tweet_writer_thread.py new file mode 100644 index 0000000..2de9848 --- /dev/null +++ b/tests/twarccloud/harvester/test_tweet_writer_thread.py @@ -0,0 +1,121 @@ +from tempfile import mkdtemp +import shutil +from datetime import datetime +from queue import Queue +import glob +import gzip +import json +import os +from time import sleep +from twarccloud.harvester.tweet_writer_thread import TweetWriterThread +from twarccloud.harvester.harvest_info import HarvestInfo +from twarccloud.filepaths_helper import get_harvest_path, get_harvest_manifest_filepath +from tests import TestCase + + +class TestTweetWriterThread(TestCase): + def setUp(self): + self.collections_path = mkdtemp() + self.collection_id = 'test_id' + self.harvest_timestamp = datetime.utcnow() + self.file_queue = Queue() + self.harvest_info = HarvestInfo(self.collection_id, self.harvest_timestamp) + self.harvest_path = get_harvest_path(self.collection_id, self.harvest_timestamp, + collections_path=self.collections_path) + + def tearDown(self): + shutil.rmtree(self.collections_path, ignore_errors=True) + + def test_write(self): + with TweetWriterThread(self.collections_path, self.collection_id, self.harvest_timestamp, self.file_queue, + self.harvest_info) as writer: + writer.write(self.generate_tweet(1)) + writer.write(self.generate_tweet(2)) + tweet_files = glob.glob('{}/*.jsonl.gz'.format(self.harvest_path)) + self.assertEqual(1, len(tweet_files)) + # Wrote to file. + self.assertTweetsInFile(tweet_files[0], 1, 2) + self.assertQueuedFiles(tweet_files) + # Added to manifest. + self.assertManifestFile(tweet_files) + # Updated harvest info. 
+ self.assertEqual(2, self.harvest_info.tweets.value) + self.assertEqual(1, self.harvest_info.files.value) + self.assertTrue(self.harvest_info.file_bytes.value) + + def test_rollover_by_tweet_count(self): + with TweetWriterThread(self.collections_path, self.collection_id, self.harvest_timestamp, self.file_queue, + self.harvest_info, tweets_per_file=2) as writer: + writer.write(self.generate_tweet(1)) + writer.write(self.generate_tweet(2)) + # Sleep so that file has new timestamp + sleep(1) + writer.write(self.generate_tweet(3)) + writer.write(self.generate_tweet(4)) + tweet_files = glob.glob('{}/*.jsonl.gz'.format(self.harvest_path)) + self.assertEqual(2, len(tweet_files)) + self.assertQueuedFiles(tweet_files) + self.assertManifestFile(tweet_files) + self.assertEqual(4, self.harvest_info.tweets.value) + self.assertEqual(2, self.harvest_info.files.value) + self.assertTrue(self.harvest_info.file_bytes.value) + + def test_rollover_by_time(self): + with TweetWriterThread(self.collections_path, self.collection_id, self.harvest_timestamp, self.file_queue, + self.harvest_info, secs_per_file=1) as writer: + writer.write(self.generate_tweet(1)) + writer.write(self.generate_tweet(2)) + # Sleep so that rollover + sleep(1.25) + writer.write(self.generate_tweet(3)) + writer.write(self.generate_tweet(4)) + tweet_files = glob.glob('{}/*.jsonl.gz'.format(self.harvest_path)) + self.assertEqual(2, len(tweet_files)) + self.assertQueuedFiles(tweet_files) + self.assertManifestFile(tweet_files) + self.assertEqual(4, self.harvest_info.tweets.value) + self.assertEqual(2, self.harvest_info.files.value) + self.assertTrue(self.harvest_info.file_bytes.value) + + @staticmethod + def generate_tweet(tweet_id): + return { + 'tweet_id': tweet_id + } + + # pylint: disable=invalid-name + def assertTweetsInFile(self, filepath, start_tweet_id, stop_tweet_id): + with gzip.open(filepath) as tweet_file: + for tweet_id in range(start_tweet_id, stop_tweet_id + 1): + line = tweet_file.readline() + self.assertDictEqual(self.generate_tweet(tweet_id), json.loads(line)) + + # pylint: disable=invalid-name + def assertQueuedFiles(self, tweet_files): + queued_files = self.get_queued_files() + for tweet_file in tweet_files: + self.assertTrue(tweet_file in queued_files) + + # pylint: disable=invalid-name + def assertManifestFile(self, tweet_files): + manifest_files = self.get_manifest_files() + tweet_filenames = set() + for tweet_file in tweet_files: + tweet_filenames.add(os.path.basename(tweet_file)) + self.assertSetEqual(manifest_files, tweet_filenames) + + def get_queued_files(self): + files = set() + while not self.file_queue.empty(): + queued_file = self.file_queue.get() + files.add(queued_file.filepath) + self.file_queue.task_done() + return files + + def get_manifest_files(self): + files = set() + with open(get_harvest_manifest_filepath(self.collection_id, self.harvest_timestamp, + collections_path=self.collections_path)) as file: + for line in file: + files.add(line.split()[1]) + return files diff --git a/tests/twarccloud/test_changeset.py b/tests/twarccloud/test_changeset.py index aeb0820..656c163 100644 --- a/tests/twarccloud/test_changeset.py +++ b/tests/twarccloud/test_changeset.py @@ -1,6 +1,5 @@ -from unittest import TestCase +from tests import TestCase, timeline_config, extract_dict from twarccloud.collection_config import Changeset -from . 
import * class TestChangeSet(TestCase): @@ -24,15 +23,15 @@ def test_has_changes_delete(self): def test_clean(self): self.changeset['update'] = { - 'users': { - '6253282': { - 'since_id': '56789' - }, - '12345': { - 'screen_name': 'foo' - } + 'users': { + '6253282': { + 'since_id': '56789' + }, + '12345': { + 'screen_name': 'foo' } } + } self.changeset.clean_changeset(timeline_config()) self.assertDictEqual(extract_dict(self.changeset), { 'update': { @@ -46,12 +45,12 @@ def test_clean(self): }) def test_clean_no_users(self): - self.changeset['update'] = { - 'users': { - '12345': { - 'screen_name': 'foo' - } + self.changeset['update'] = { + 'users': { + '12345': { + 'screen_name': 'foo' } } + } self.changeset.clean_changeset(timeline_config()) self.assertFalse(self.changeset['update']) diff --git a/tests/twarccloud/test_collection_config.py b/tests/twarccloud/test_collection_config.py index cc1627a..9bdb20b 100644 --- a/tests/twarccloud/test_collection_config.py +++ b/tests/twarccloud/test_collection_config.py @@ -1,6 +1,5 @@ -from unittest import TestCase +from tests import TestCase, timeline_config, search_config, filter_config, extract_dict from twarccloud.collection_config import CollectionConfigException, Changeset -from . import * class TestInvalidReasons(TestCase): @@ -123,10 +122,10 @@ def test_changed_timeline(self): } }, 'delete': [{ - 'users': ['6253282', { - '12': ['since_id'] - }] + 'users': ['6253282', { + '12': ['since_id'] }] + }] }) @@ -144,38 +143,38 @@ def setUp(self): def test_merged_key(self): self.assert_timeline_config['keys']['consumer_key'] = 'foo' self.changeset['update'] = { - 'keys': { - 'consumer_key': 'foo' - } + 'keys': { + 'consumer_key': 'foo' } + } self.timeline_config.merge_changeset(self.changeset) self.assertDictEqual(self.assert_timeline_config, self.timeline_config) def test_merged_timeline(self): self.assert_timeline_config['users']['2244994945'] = { - 'screen_name': 'twitterdev', - 'since_id': '56789' - } + 'screen_name': 'twitterdev', + 'since_id': '56789' + } self.assert_timeline_config['users']['481186914']['screen_name'] = 'real_justin_littman' del self.assert_timeline_config['users']['6253282'] del self.assert_timeline_config['users']['12']['since_id'] self.changeset['update'] = { - 'users': { - '2244994945': { - 'screen_name': 'twitterdev', - 'since_id': '56789' - }, - '481186914': { - 'screen_name': 'real_justin_littman' - } + 'users': { + '2244994945': { + 'screen_name': 'twitterdev', + 'since_id': '56789' + }, + '481186914': { + 'screen_name': 'real_justin_littman' } + } } self.changeset['delete'] = [{ - 'users': ['6253282', { - '12': ['since_id'] - }] - }] + 'users': ['6253282', { + '12': ['since_id'] + }] + }] self.timeline_config.merge_changeset(self.changeset) self.assertDictEqual(self.assert_timeline_config, self.timeline_config) @@ -185,13 +184,13 @@ def test_merged_filter(self): del self.assert_filter_config['filter']['track'] self.changeset['update'] = { - 'filter': { - 'follow': '123' - } + 'filter': { + 'follow': '123' } + } self.changeset['delete'] = [ {'filter': ['track'] - }] + }] self.filter_config.merge_changeset(self.changeset) self.assertDictEqual(self.assert_filter_config, self.filter_config) diff --git a/twarccloud/config_helpers.py b/twarccloud/config_helpers.py index aec889f..57ee946 100644 --- a/twarccloud/config_helpers.py +++ b/twarccloud/config_helpers.py @@ -69,9 +69,11 @@ def setup_aws_keys(ini_config): 'Access key and/or secret key missing from twarc_cloud.ini. 
Maybe you used a different key ' 'configuration mechanism?') + def bucket_value(args, ini_config): return _config_value('bucket', args, ini_config) + def _config_value(key, args, ini_config): args_dict = vars(args) value = args_dict.get(key) diff --git a/twarccloud/harvester/collection_lock.py b/twarccloud/harvester/collection_lock.py index 6a972ba..169fd6d 100644 --- a/twarccloud/harvester/collection_lock.py +++ b/twarccloud/harvester/collection_lock.py @@ -10,17 +10,17 @@ # Provides locking for a collection by placing and removing lock.json in the root of the collection. class CollectionLock: - def __init__(self, collections_path, collection_id, file_queue, collect_timestamp=None): + def __init__(self, collections_path, collection_id, file_queue, harvest_timestamp=None): self.lock_filepath = get_lock_file(collection_id, collections_path=collections_path) self.last_harvest_filepath = get_last_harvest_file(collection_id, collections_path=collections_path) - self.collect_timestamp = collect_timestamp + self.harvest_timestamp = harvest_timestamp self.file_queue = file_queue # Lock the collection. def lock(self): log.debug('Locking') lock = { - 'harvest_id': self.collect_timestamp.isoformat() + 'harvest_id': self.harvest_timestamp.isoformat() } with FileQueueingWriter(self.lock_filepath, self.file_queue) as lock_writer: lock_writer.write_json(lock, indent=2) diff --git a/twarccloud/harvester/file_mover_thread.py b/twarccloud/harvester/file_mover_thread.py index 75f197e..85581ae 100644 --- a/twarccloud/harvester/file_mover_thread.py +++ b/twarccloud/harvester/file_mover_thread.py @@ -19,7 +19,6 @@ def __init__(self, queue, collections_path, bucket): self.collections_path = collections_path self.bucket = bucket self.stop_event = threading.Event() - self.s3_client = aws_client('s3') self.exception = None threading.Thread.__init__(self) @@ -45,12 +44,12 @@ def _move(self, src_file): dest_filepath = src_file.filepath.replace(self.collections_path, DEFAULT_COLLECTIONS_PATH) if isinstance(src_file, AddFile): log.debug('Copying %s to s3://%s/%s', src_file.filepath, self.bucket, dest_filepath) - self.s3_client.upload_file(src_file.filepath, self.bucket, dest_filepath) + aws_client('s3').upload_file(src_file.filepath, self.bucket, dest_filepath) if src_file.delete: os.remove(src_file.filepath) else: log.debug('Deleting s3://%s/%s', self.bucket, dest_filepath) - self.s3_client.delete_object(Bucket=self.bucket, Key=dest_filepath) + aws_client('s3').delete_object(Bucket=self.bucket, Key=dest_filepath) def stop(self): self.stop_event.set() diff --git a/twarccloud/harvester/server_thread.py b/twarccloud/harvester/server_thread.py index 588d5c3..ebd1854 100644 --- a/twarccloud/harvester/server_thread.py +++ b/twarccloud/harvester/server_thread.py @@ -43,14 +43,16 @@ def shutdown(): # Thread that runs this Flask application. 
+# pylint: disable=too-many-arguments class ServerThread(threading.Thread): - def __init__(self, stop_event, stopped_event, shutdown_event, harvest_info): + def __init__(self, stop_event, stopped_event, shutdown_event, harvest_info, port): threading.Thread.__init__(self) self.daemon = True + self.port = port app.config['stop_event'] = stop_event app.config['stopped_event'] = stopped_event app.config['shutdown_event'] = shutdown_event app.config['harvest_info'] = harvest_info def run(self): - app.run(host='0.0.0.0', port=80, debug=False) + app.run(host='0.0.0.0', port=self.port, debug=False) diff --git a/twarccloud/harvester/tweet_writer_thread.py b/twarccloud/harvester/tweet_writer_thread.py index a042496..69ae06d 100644 --- a/twarccloud/harvester/tweet_writer_thread.py +++ b/twarccloud/harvester/tweet_writer_thread.py @@ -49,12 +49,13 @@ def __exit__(self, *args): self.join() def write(self, tweet): + if self.tweet_count == self.tweets_per_file: + log.debug('Rolling over because tweet count is %s', self.tweet_count) + self._new_file() with self.file_lock: self.file.write('{}\n'.format(json.dumps(tweet)).encode('utf-8')) self.tweet_count += 1 self.harvest_info.tweets.incr() - if self.tweet_count == self.tweets_per_file: - self._new_file() def _new_file(self): with self.file_lock: @@ -73,9 +74,10 @@ def _close_file(self): if self.timer: self.timer.cancel() if self.file: + log.debug('Closing %s', self.filepath) + self.file.close() self.harvest_info.files.incr() self.harvest_info.file_bytes.incr(os.path.getsize(self.filepath)) - self.file.close() self._add_to_manifest() log.debug('Adding %s to file queue', self.filepath) self.file_queue.put(AddFile(self.filepath, True)) @@ -86,9 +88,12 @@ def _generate_filepath(self): datetime.utcnow().strftime('%Y%m%d%H%M%S')) def _add_to_manifest(self): - with FileQueueingWriter(get_harvest_manifest_filepath(self.collection_id, self.harvest_timestamp), + log.debug('Adding %s to manifest', self.filepath) + with FileQueueingWriter(get_harvest_manifest_filepath(self.collection_id, self.harvest_timestamp, + collections_path=self.collections_path), self.file_queue, mode='a') as writer: writer.write('{} {}\n'.format(self._sha1(), os.path.basename(self.filepath))) def _sha1(self): - return hashlib.sha1(open(self.filepath, 'rb').read()).hexdigest() + with open(self.filepath, 'rb') as file: + return hashlib.sha1(file.read()).hexdigest() diff --git a/tweet_harvester.py b/tweet_harvester.py index aa6754b..d079207 100644 --- a/tweet_harvester.py +++ b/tweet_harvester.py @@ -10,8 +10,8 @@ import dateutil.parser from twarccloud.harvester.server_thread import ServerThread from twarccloud.harvester.twarc_thread import TwarcThread -from twarccloud.filepaths_helper import get_harvest_path, get_lock_file, get_collection_config_filepath, \ - get_harvest_info_file, get_changeset_file +from twarccloud.filepaths_helper import get_lock_file, get_collection_config_filepath, \ + get_harvest_info_file, get_changeset_file, get_harvest_file from twarccloud.harvester.file_mover_thread import S3FileMoverThread from twarccloud.harvester.collection_lock import CollectionLock, assert_locked from twarccloud.aws.aws_helper import sync_collection_config, sync_collection_config_file @@ -25,17 +25,18 @@ from twarccloud import log, __version__ -# pylint: disable=too-many-instance-attributes +# pylint: disable=too-many-instance-attributes, too-few-public-methods class TweetHarvester: # pylint: disable=too-many-arguments def __init__(self, collection_id, collections_path, bucket=None, 
tweets_per_file=None, monitor=False, - shutdown=False): + shutdown=False, port=80): self.harvest_timestamp = datetime.utcnow() self.collection_id = collection_id self.collections_path = collections_path self.bucket = bucket self.tweets_per_file = tweets_per_file self.monitor = monitor + self.port = port # When running in AWS as a service: # 1. twarc_cloud calls /stop, which sets stop_event. @@ -78,10 +79,10 @@ def harvest(self): sync_collection_config(self.collections_path, self.collection_id, self.bucket) # Check if collection is locked - assert_locked(self.lock_filepath()) + assert_locked(get_lock_file(self.collection_id, collections_path=self.collections_path)) # Start the server - ServerThread(self.stop_event, self.stopped_event, self.shutdown_event, self.harvest_info).start() + ServerThread(self.stop_event, self.stopped_event, self.shutdown_event, self.harvest_info, self.port).start() # Start the monitor if self.monitor: @@ -91,7 +92,7 @@ def harvest(self): collection_config = self._load_collection_config() with S3FileMoverThread(self.file_queue, self.collections_path, self.bucket), CollectionLock( - self.collections_path, self.collection_id, self.file_queue, collect_timestamp=self.harvest_timestamp): + self.collections_path, self.collection_id, self.file_queue, harvest_timestamp=self.harvest_timestamp): # Write the collection config file to harvester self._write_harvest_collection_config(collection_config) @@ -153,9 +154,8 @@ def _load_collection_config(self): return CollectionConfig(json.load(config_file)) def _write_harvest_collection_config(self, collection_config): - harvest_collection_config_filepath = os.path.join( - get_harvest_path(self.collection_id, self.harvest_timestamp, collections_path=self.collections_path), - 'collection.json') + harvest_collection_config_filepath = get_harvest_file(self.collection_id, self.harvest_timestamp, + 'collection.json', collections_path=self.collections_path) os.makedirs(os.path.dirname(harvest_collection_config_filepath), exist_ok=True) # Remove secrets clean_config = copy.deepcopy(collection_config) @@ -164,8 +164,6 @@ def _write_harvest_collection_config(self, collection_config): with FileQueueingWriter(harvest_collection_config_filepath, self.file_queue) as config_writer: config_writer.write_json(clean_config, indent=2) - def lock_filepath(self): - return get_lock_file(self.collection_id, collections_path=self.collections_path) def add_local_subparser(subparsers): local_parser = subparsers.add_parser('local', help='Collect in local mode.')