Skip to content

Commit

Permalink
Implement raw_command on Client, PooledClient and HashClient.
Browse files Browse the repository at this point in the history
`raw_command` allows for sending raw commands to Memcached, and specify a custom end token.
This allows for communicating with non-standard servers such as ElasticCache instances.

Part of implementing pinterest#87.
  • Loading branch information
martinnj committed Jun 9, 2022
1 parent 25ec117 commit 4258684
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 31 deletions.
21 changes: 13 additions & 8 deletions pymemcache/client/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import errno
from functools import partial
import platform
import socket
from typing import Tuple, Union
Expand Down Expand Up @@ -867,7 +868,7 @@ def version(self):
raise MemcacheUnknownError("Received unexpected response: %s" % results[0])
return after

def misc(self, command, end_tokens="\r\n"):
def raw_command(self, command, end_tokens="\r\n"):
"""
Sends an arbitrary command to the server and parses the response until a
specified token is encountered.
Expand All @@ -876,7 +877,7 @@ def misc(self, command, end_tokens="\r\n"):
command: str|bytes: The command to send.
end_tokens: str|bytes: The token expected at the end of the
response. If the `end_token` is not found, the client will wait
until the timesout specified in the constructor.
until the timeout specified in the constructor.
Returns:
The response from the server, with the `end_token` removed.
Expand Down Expand Up @@ -1151,11 +1152,10 @@ def _misc_cmd(self, cmds, cmd_name, noreply, end_tokens=None):

# If no end_tokens have been given, just assume standard memcached
# operations, which end in "\r\n", use regular code for that.
_reader = _readline
if end_tokens:

def _reader(_sock, _buf):
return _readsegment(_sock, _buf, end_tokens)
_reader = partial(_readsegment, end_tokens=end_tokens)
else:
_reader = _readline

if self.sock is None:
self._connect()
Expand Down Expand Up @@ -1426,6 +1426,10 @@ def shutdown(self, graceful=False):
with self.client_pool.get_and_release(destroy_on_fail=True) as client:
client.shutdown(graceful)

def raw_command(self, command, end_tokens=b"\r\n"):
with self.client_pool.get_and_release(destroy_on_fail=True) as client:
return client.raw_command(command, end_tokens)

def __setitem__(self, key, value):
self.set(key, value, noreply=True)

Expand Down Expand Up @@ -1560,8 +1564,9 @@ def _readsegment(sock, buf, end_tokens):

while True:

if buf.find(end_tokens) != -1:
before, sep, after = buf.partition(end_tokens)
tokens_pos = buf.find(end_tokens)
if tokens_pos != -1:
before, after = buf[:tokens_pos], buf[tokens_pos + len(end_tokens) :]
result += before
return after, result

Expand Down
39 changes: 39 additions & 0 deletions pymemcache/client/hash.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,3 +445,42 @@ def flush_all(self, *args, **kwargs):
def quit(self):
for client in self.clients.values():
self._safely_run_func(client, client.quit, False)

def raw_command(self, command, end_tokens="\r\n", send_to_all=False):
"""
Sends an arbitrary command to the server and parses the response until a
specified token is encountered.
OBS: If the end_tokens are not included in the reply, the client will wait until it's
configured timeout is reached.
Args:
command: str|bytes: The command to send.
end_tokens: str|bytes: The token expected at the end of the
response. If the `end_token` is not found, the client will wait
until the timeout specified in the constructor.
send_to_all: bool, whether or not the command is sent to all servers.
If left at false it will only be sent to a single server, as determined by hashing.
Returns:
The response from the server, with the `end_token` removed.
If `send_to_all` is True, a list of responses will be returned.
"""

if send_to_all:
responses = []
for client in self.clients.values():
response = self._safely_run_func(
client, client.raw_command, False, command=command, end_tokens=end_tokens
)
responses.append(response)
return responses

# Whitespace not allowed in _get_client, the cache_key_helper tries to check it against the
# memcached spec for keys.
# So we hash it ahead of time to remove them.
command_hash = str(self.hasher.hash_function(command))
client = self._get_client(command_hash)
return self._safely_run_func(
client, client.raw_command, False, command=command, end_tokens=end_tokens
)
46 changes: 23 additions & 23 deletions pymemcache/test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1149,14 +1149,14 @@ def test_version_exception(self):
with pytest.raises(MemcacheUnknownError):
client.version()

def test_misc_command_default_end_tokens(self):
def test_raw_command_default_end_tokens(self):
client = self.make_client([b"REPLY\r\n", b"REPLY\r\nLEFTOVER"])
result = client.misc(b"misc")
result = client.raw_command(b"misc")
assert result == b"REPLY"
result = client.misc(b"misc")
result = client.raw_command(b"misc")
assert result == b"REPLY"

def test_misc_command_custom_end_tokens(self):
def test_raw_command_custom_end_tokens(self):
client = self.make_client(
[
b"REPLY\r\nEND\r\n",
Expand All @@ -1166,45 +1166,45 @@ def test_misc_command_custom_end_tokens(self):
]
)
end_tokens = b"END\r\n"
result = client.misc(b"misc", end_tokens)
result = client.raw_command(b"misc", end_tokens)
assert result == b"REPLY\r\n"
result = client.misc(b"misc", end_tokens)
result = client.raw_command(b"misc", end_tokens)
assert result == b"REPLY\r\n"
result = client.misc(b"misc", end_tokens)
result = client.raw_command(b"misc", end_tokens)
assert result == b"REPLY"
result = client.misc(b"misc", b"\n")
result = client.raw_command(b"misc", b"\n")
assert result == b"REPLY"

def test_misc_command_missing_end_tokens(self):
def test_raw_command_missing_end_tokens(self):
client = self.make_client([b"REPLY", b"REPLY"])
with pytest.raises(IndexError):
client.misc(b"misc")
client.raw_command(b"misc")
with pytest.raises(IndexError):
client.misc(b"misc", b"END\r\n")
client.raw_command(b"misc", b"END\r\n")

def test_misc_command_empty_end_tokens(self):
def test_raw_command_empty_end_tokens(self):
client = self.make_client([b"REPLY"])

with pytest.raises(IndexError):
client.misc(b"misc", b"")
client.raw_command(b"misc", b"")

def test_misc_command_types(self):
def test_raw_command_types(self):
client = self.make_client(
[b"REPLY\r\n", b"REPLY\r\n", b"REPLY\r\nLEFTOVER", b"REPLY\r\nLEFTOVER"]
)
assert client.misc("key") == b"REPLY"
assert client.misc(b"key") == b"REPLY"
assert client.misc("key") == b"REPLY"
assert client.misc(b"key") == b"REPLY"
assert client.raw_command("key") == b"REPLY"
assert client.raw_command(b"key") == b"REPLY"
assert client.raw_command("key") == b"REPLY"
assert client.raw_command(b"key") == b"REPLY"

def test_misc_end_token_types(self):
def test_send_end_token_types(self):
client = self.make_client(
[b"REPLY\r\n", b"REPLY\r\n", b"REPLY\r\nLEFTOVER", b"REPLY\r\nLEFTOVER"]
)
assert client.misc("key", "\r\n") == b"REPLY"
assert client.misc(b"key", b"\r\n") == b"REPLY"
assert client.misc("key", "\r\n") == b"REPLY"
assert client.misc(b"key", b"\r\n") == b"REPLY"
assert client.raw_command("key", "\r\n") == b"REPLY"
assert client.raw_command(b"key", b"\r\n") == b"REPLY"
assert client.raw_command("key", "\r\n") == b"REPLY"
assert client.raw_command(b"key", b"\r\n") == b"REPLY"


@pytest.mark.unit()
Expand Down
46 changes: 46 additions & 0 deletions pymemcache/test/test_client_hash.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,52 @@ def test_quit(self):
assert result is None
assert all(c.sock is None for c in client.clients.values())

def test_raw_command_send_one(self):
# We add a reply for each node to test that the command is hashed to the
# same node every time.
client = self.make_client(
*[
[
b"VALUE\r\nTOKEN\r\nEND\n",
],
[
b"VALUE\r\nTOKEN\r\nEND\n",
],
]
)

# Check the returned value.
assert client.raw_command("NOT_A_COMMAND", "END\n", send_to_all=False) == b"VALUE\r\nTOKEN\r\n"

# Check that we get an IndexError indicating the command hashed to the same node.
with pytest.raises(IndexError):
_ = client.raw_command("NOT_A_COMMAND", "END\n", send_to_all=False)

def test_raw_command_send_all(self):
# We add different replies to check that an error on one node causes an
# exception as expected.
client = self.make_client(
*[
[
b"VALUE\r\nTOKEN\r\nEND\n",
b"VALUE\r\nTOKEN\r\nEND\n",
],
[
b"VALUE\r\nTOKEN\r\nEND\n",
],
]
)

# Check the returned value.
values = client.raw_command("NOT_A_COMMAND", "END\n", send_to_all=True)
assert len(values) == 2
for value in values:
assert value == b"VALUE\r\nTOKEN\r\n"

# Check that we get an IndexError.
with pytest.raises(IndexError):
_ = client.raw_command("NOT_A_COMMAND", "END\n", send_to_all=False)

def test_no_servers_left(self):
from pymemcache.client.hash import HashClient

Expand Down

0 comments on commit 4258684

Please sign in to comment.