Discard master-slave jargon #7

Open · wants to merge 1 commit into master
8 changes: 4 additions & 4 deletions dev/merge_spark_pr.py
@@ -216,7 +216,7 @@ def cherry_pick(pr_num, merge_hash, default_branch):

 def fix_version_from_branch(branch, versions):
     # Note: Assumes this is a sorted (newest->oldest) list of un-released versions
-    if branch == "master":
+    if branch == "main":
         return versions[0]
     else:
         branch_ver = branch.replace("branch-", "")
@@ -259,7 +259,7 @@ def resolve_jira_issue(merge_branches, comment, default_jira_id=""):
     default_fix_versions = map(lambda x: fix_version_from_branch(x, versions).name, merge_branches)
     for v in default_fix_versions:
         # Handles the case where we have forked a release branch but not yet made the release.
-        # In this case, if the PR is committed to the master branch and the release branch, we
+        # In this case, if the PR is committed to the main branch and the release branch, we
         # only consider the release branch to be the fix version. E.g. it is not valid to have
         # both 1.1.0 and 1.0.0 as fix versions.
         (major, minor, patch) = v.split(".")
@@ -314,8 +314,8 @@ def standardize_jira_ref(text):
     '[SPARK-1094] Support MiMa for reporting binary compatibility accross versions.'
     >>> standardize_jira_ref("[WIP] [SPARK-1146] Vagrant support for Spark")
     '[SPARK-1146][WIP] Vagrant support for Spark'
-    >>> standardize_jira_ref("SPARK-1032. If Yarn app fails before registering, app master stays aroun...")
-    '[SPARK-1032] If Yarn app fails before registering, app master stays aroun...'
+    >>> standardize_jira_ref("SPARK-1032. If Yarn app fails before registering, app main stays aroun...")
+    '[SPARK-1032] If Yarn app fails before registering, app main stays aroun...'
     >>> standardize_jira_ref("[SPARK-6250][SPARK-6146][SPARK-5911][SQL] Types are now reserved words in DDL parser.")
     '[SPARK-6250][SPARK-6146][SPARK-5911][SQL] Types are now reserved words in DDL parser.'
     >>> standardize_jira_ref("Additional information for users building from source code")
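
For readers unfamiliar with the release flow the comments above describe, here is a minimal sketch of how the branch-to-fix-version mapping behaves; the version list and the pick_fix_version helper are hypothetical and only illustrate the rule stated in the comments.

    # Hypothetical illustration of the branch -> fix-version rule described above.
    # The version strings are made up; real versions come from JIRA.
    unreleased = ["2.0.0", "1.5.1", "1.4.2"]  # sorted newest -> oldest

    def pick_fix_version(branch, versions):
        # The default branch maps to the newest unreleased version; a release
        # branch such as "branch-1.5" maps to the matching unreleased 1.5.x.
        if branch == "main":
            return versions[0]
        prefix = branch.replace("branch-", "")
        return next(v for v in versions if v.startswith(prefix))

    print(pick_fix_version("main", unreleased))        # 2.0.0
    print(pick_fix_version("branch-1.5", unreleased))  # 1.5.1
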
6 changes: 3 additions & 3 deletions dev/run-tests-jenkins.py
@@ -144,13 +144,13 @@ def main():
     # ---
     # $ghprbActualCommit
     # This is the hash of the most recent commit in the PR.
-    # The merge-base of this and master is the commit from which the PR was branched.
+    # The merge-base of this and main is the commit from which the PR was branched.
     # $sha1
     # If the patch merges cleanly, this is a reference to the merge commit hash
     # (e.g. "origin/pr/2606/merge").
     # If the patch does not merge cleanly, it is equal to $ghprbActualCommit.
-    # The merge-base of this and master in the case of a clean merge is the most recent commit
-    # against master.
+    # The merge-base of this and main in the case of a clean merge is the most recent commit
+    # against main.
     ghprb_pull_id = os.environ["ghprbPullId"]
     ghprb_actual_commit = os.environ["ghprbActualCommit"]
     ghprb_pull_title = os.environ["ghprbPullTitle"]
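
The $ghprbActualCommit / $sha1 comment above is about git merge-bases; below is a rough, standalone sketch of that relationship (the ref names are placeholders and this snippet is not part of the script).

    # Hypothetical sketch: find the commit a PR was branched from, as described
    # in the comment above. Requires a local git checkout; the ref names below
    # are placeholders.
    import subprocess

    def merge_base(ref_a, ref_b):
        # `git merge-base` prints the most recent common ancestor of two refs.
        out = subprocess.check_output(["git", "merge-base", ref_a, ref_b])
        return out.decode().strip()

    # e.g. merge_base("origin/main", "origin/pr/2606/merge")
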
478 changes: 239 additions & 239 deletions ec2/spark_ec2.py

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions python/docs/conf.py
@@ -43,8 +43,8 @@
 # The encoding of source files.
 #source_encoding = 'utf-8-sig'

-# The master toctree document.
-master_doc = 'index'
+# The main toctree document.
+main_doc = 'index'

 # General information about the project.
 project = u'PySpark'
@@ -55,7 +55,7 @@
 # built documents.
 #
 # The short X.Y version.
-version = 'master'
+version = 'main'
 # The full version, including alpha/beta/rc tags.
 release = os.environ.get('RELEASE_VERSION', version)

14 changes: 7 additions & 7 deletions python/pyspark/conf.py
@@ -19,14 +19,14 @@
 >>> from pyspark.conf import SparkConf
 >>> from pyspark.context import SparkContext
 >>> conf = SparkConf()
->>> conf.setMaster("local").setAppName("My app")
+>>> conf.setMain("local").setAppName("My app")
 <pyspark.conf.SparkConf object at ...>
->>> conf.get("spark.master")
+>>> conf.get("spark.main")
 u'local'
 >>> conf.get("spark.app.name")
 u'My app'
 >>> sc = SparkContext(conf=conf)
->>> sc.master
+>>> sc.main
 u'local'
 >>> sc.appName
 u'My app'
@@ -80,7 +80,7 @@ class SparkConf(object):
     what the system properties are.

     All setter methods in this class support chaining. For example,
-    you can write C{conf.setMaster("local").setAppName("My app")}.
+    you can write C{conf.setMain("local").setAppName("My app")}.

     Note that once a SparkConf object is passed to Spark, it is cloned
     and can no longer be modified by the user.
@@ -116,9 +116,9 @@ def setIfMissing(self, key, value):
             self.set(key, value)
         return self

-    def setMaster(self, value):
-        """Set master URL to connect to."""
-        self._jconf.setMaster(value)
+    def setMain(self, value):
+        """Set main URL to connect to."""
+        self._jconf.setMain(value)
         return self

     def setAppName(self, value):
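
A minimal usage sketch of the setter chaining the docstring mentions, written against the method names as renamed in this diff (so it assumes the renamed API is in place):

    # Sketch only: uses the names as renamed in this diff (setMain / spark.main).
    from pyspark.conf import SparkConf

    conf = (SparkConf()
            .setMain("local[2]")
            .setAppName("example-app")
            .setIfMissing("spark.ui.enabled", "false"))
    print(conf.get("spark.app.name"))  # example-app
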
28 changes: 14 additions & 14 deletions python/pyspark/context.py
@@ -71,14 +71,14 @@ class SparkContext(object):

     PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')

-    def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
+    def __init__(self, main=None, appName=None, sparkHome=None, pyFiles=None,
                  environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,
                  gateway=None, jsc=None, profiler_cls=BasicProfiler):
         """
-        Create a new SparkContext. At least the master and app name should be set,
+        Create a new SparkContext. At least the main and app name should be set,
         either through the named parameters here or through C{conf}.

-        :param master: Cluster URL to connect to
+        :param main: Cluster URL to connect to
               (e.g. mesos://host:port, spark://host:port, local[4]).
         :param appName: A name for your job, to display on the cluster web UI.
         :param sparkHome: Location where Spark is installed on cluster nodes.
@@ -111,14 +111,14 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         self._callsite = first_spark_call() or CallSite(None, None, None)
         SparkContext._ensure_initialized(self, gateway=gateway)
         try:
-            self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
+            self._do_init(main, appName, sparkHome, pyFiles, environment, batchSize, serializer,
                           conf, jsc, profiler_cls)
         except:
             # If an error occurs, clean up in order to allow future SparkContext creation:
             self.stop()
             raise

-    def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
+    def _do_init(self, main, appName, sparkHome, pyFiles, environment, batchSize, serializer,
                  conf, jsc, profiler_cls):
         self.environment = environment or {}
         self._conf = conf or SparkConf(_jvm=self._jvm)
@@ -131,8 +131,8 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
                                             batchSize)

         # Set any parameters passed directly to us on the conf
-        if master:
-            self._conf.setMaster(master)
+        if main:
+            self._conf.setMain(main)
         if appName:
             self._conf.setAppName(appName)
         if sparkHome:
@@ -144,19 +144,19 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
             self._conf.setIfMissing(key, value)

         # Check that we have at least the required parameters
-        if not self._conf.contains("spark.master"):
-            raise Exception("A master URL must be set in your configuration")
+        if not self._conf.contains("spark.main"):
+            raise Exception("A main URL must be set in your configuration")
         if not self._conf.contains("spark.app.name"):
             raise Exception("An application name must be set in your configuration")

         # Read back our properties from the conf in case we loaded some of them from
         # the classpath or an external config file
-        self.master = self._conf.get("spark.master")
+        self.main = self._conf.get("spark.main")
         self.appName = self._conf.get("spark.app.name")
         self.sparkHome = self._conf.get("spark.home", None)

         # Let YARN know it's a pyspark app, so it distributes needed libraries.
-        if self.master == "yarn-client":
+        if self.main == "yarn-client":
             self._conf.set("spark.yarn.isPython", "true")

         for (k, v) in self._conf.getAll():
@@ -248,16 +248,16 @@ def _ensure_initialized(cls, instance=None, gateway=None):
         if instance:
             if (SparkContext._active_spark_context and
                     SparkContext._active_spark_context != instance):
-                currentMaster = SparkContext._active_spark_context.master
+                currentMain = SparkContext._active_spark_context.main
                 currentAppName = SparkContext._active_spark_context.appName
                 callsite = SparkContext._active_spark_context._callsite

                 # Raise error if there is already a running Spark context
                 raise ValueError(
                     "Cannot run multiple SparkContexts at once; "
-                    "existing SparkContext(app=%s, master=%s)"
+                    "existing SparkContext(app=%s, main=%s)"
                     " created by %s at %s:%s "
-                    % (currentAppName, currentMaster,
+                    % (currentAppName, currentMain,
                         callsite.function, callsite.file, callsite.linenum))
             else:
                 SparkContext._active_spark_context = instance
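
And a corresponding sketch of constructing a context with the renamed keyword argument, mirroring the __init__ docstring above (again assuming the renamed API from this diff):

    # Sketch only: the `main` keyword mirrors the rename in this diff.
    from pyspark import SparkContext

    sc = SparkContext(main="local[2]", appName="example-app")
    try:
        print(sc.main)     # local[2]
        print(sc.appName)  # example-app
    finally:
        sc.stop()
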
6 changes: 3 additions & 3 deletions python/pyspark/rdd.py
@@ -1509,7 +1509,7 @@ def func(split, iterator):

     def collectAsMap(self):
         """
-        Return the key-value pairs in this RDD to the master as a dictionary.
+        Return the key-value pairs in this RDD to the main as a dictionary.

         >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
         >>> m[1]
@@ -1560,7 +1560,7 @@ def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash):
     def reduceByKeyLocally(self, func):
         """
         Merge the values for each key using an associative reduce function, but
-        return the results immediately to the master as a dictionary.
+        return the results immediately to the main as a dictionary.

         This will also perform the merging locally on each mapper before
         sending results to a reducer, similarly to a "combiner" in MapReduce.
@@ -1585,7 +1585,7 @@ def mergeMaps(m1, m2):
     def countByKey(self):
         """
         Count the number of elements for each key, and return the result to the
-        master as a dictionary.
+        main as a dictionary.

         >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
         >>> sorted(rdd.countByKey().items())
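
For reduceByKeyLocally, whose doctest is collapsed in this view, a short usage sketch (assumes an active SparkContext bound to sc):

    # Assumes an active SparkContext bound to `sc`.
    from operator import add

    rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    print(sorted(rdd.reduceByKeyLocally(add).items()))  # [('a', 2), ('b', 1)]
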
2 changes: 1 addition & 1 deletion python/pyspark/streaming/context.py
@@ -250,7 +250,7 @@ def remember(self, duration):

     def checkpoint(self, directory):
         """
-        Sets the context to periodically checkpoint the DStream operations for master
+        Sets the context to periodically checkpoint the DStream operations for main
         fault-tolerance. The graph will be checkpointed every batch interval.

         @param directory: HDFS-compatible directory where the checkpoint data
4 changes: 2 additions & 2 deletions python/pyspark/streaming/flume.py
@@ -47,8 +47,8 @@ def createStream(ssc, hostname, port,
         Create an input stream that pulls events from Flume.

         :param ssc: StreamingContext object
-        :param hostname: Hostname of the slave machine to which the flume data will be sent
-        :param port: Port of the slave machine to which the flume data will be sent
+        :param hostname: Hostname of the subordinate machine to which the flume data will be sent
+        :param port: Port of the subordinate machine to which the flume data will be sent
         :param storageLevel: Storage level to use for storing the received objects
         :param enableDecompression: Should netty server decompress input stream
         :param bodyDecoder: A function used to decode body (default is utf8_decoder)
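
A hypothetical usage sketch for the Flume receiver documented above; the hostname and port are placeholders and ssc is assumed to be an existing StreamingContext:

    # Placeholder host/port; `ssc` is an existing StreamingContext.
    from pyspark.streaming.flume import FlumeUtils

    stream = FlumeUtils.createStream(ssc, "localhost", 9999)
    bodies = stream.map(lambda event: event[1])  # elements are (headers, body) pairs
    bodies.pprint()
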
8 changes: 4 additions & 4 deletions python/pyspark/tests.py
@@ -1822,7 +1822,7 @@ def test_module_dependency_on_cluster(self):
             |def myfunc(x):
             |    return x + 1
             """)
-        proc = subprocess.Popen([self.sparkSubmit, "--py-files", zip, "--master",
+        proc = subprocess.Popen([self.sparkSubmit, "--py-files", zip, "--main",
                                  "local-cluster[1,1,1024]", script],
                                 stdout=subprocess.PIPE)
         out, err = proc.communicate()
@@ -1856,7 +1856,7 @@ def test_package_dependency_on_cluster(self):
             """)
         self.create_spark_package("a:mylib:0.1")
         proc = subprocess.Popen([self.sparkSubmit, "--packages", "a:mylib:0.1", "--repositories",
-                                 "file:" + self.programDir, "--master",
+                                 "file:" + self.programDir, "--main",
                                  "local-cluster[1,1,1024]", script], stdout=subprocess.PIPE)
         out, err = proc.communicate()
         self.assertEqual(0, proc.returncode)
@@ -1876,7 +1876,7 @@ def test_single_script_on_cluster(self):
         # this will fail if you have different spark.executor.memory
         # in conf/spark-defaults.conf
         proc = subprocess.Popen(
-            [self.sparkSubmit, "--master", "local-cluster[1,1,1024]", script],
+            [self.sparkSubmit, "--main", "local-cluster[1,1,1024]", script],
             stdout=subprocess.PIPE)
         out, err = proc.communicate()
         self.assertEqual(0, proc.returncode)
@@ -1887,7 +1887,7 @@ class ContextTests(unittest.TestCase):

     def test_failed_sparkcontext_creation(self):
         # Regression test for SPARK-1550
-        self.assertRaises(Exception, lambda: SparkContext("an-invalid-master-name"))
+        self.assertRaises(Exception, lambda: SparkContext("an-invalid-main-name"))

     def test_get_or_create(self):
         with SparkContext.getOrCreate() as sc: