diff --git a/LICENSE b/LICENSE index f377280..3c1cbb5 100644 --- a/LICENSE +++ b/LICENSE @@ -198,4 +198,4 @@ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and - limitations under the License. \ No newline at end of file + limitations under the License. diff --git a/parse_callbacks.py b/parse_callbacks.py index d22ebd1..751ccee 100644 --- a/parse_callbacks.py +++ b/parse_callbacks.py @@ -29,15 +29,15 @@ def clean_str(input_str): - return input_str.replace('\r', '').replace('\n', '') + return input_str.replace("\r", "").replace("\n", "") def basic(action_result, response): # Default one, just add the data to the action result data = {} - data['status_code'] = response.status_code - data['std_out'] = response.std_out - data['std_err'] = response.std_err + data["status_code"] = response.status_code + data["std_out"] = response.std_out + data["std_err"] = response.std_err action_result.add_data(data) return phantom.APP_SUCCESS @@ -45,14 +45,11 @@ def basic(action_result, response): def check_exit(action_result, response): if response.std_err: - return action_result.set_status( - phantom.APP_ERROR, - "Error running command: {}".format(clean_str(response.std_err)) - ) + return action_result.set_status(phantom.APP_ERROR, "Error running command: {}".format(clean_str(response.std_err))) data = {} - data['status_code'] = response.status_code - data['std_out'] = response.std_out - data['std_err'] = response.std_err + data["status_code"] = response.status_code + data["std_out"] = response.std_out + data["std_err"] = response.std_err action_result.add_data(data) @@ -61,73 +58,58 @@ def check_exit_no_data(action_result, response): if response.status_code: if isinstance(response.std_err, bytes): try: - response.std_err = response.std_err.decode('UTF-8') + response.std_err = response.std_err.decode("UTF-8") except: pass - return action_result.set_status( - phantom.APP_ERROR, - "Error running command: {}".format(clean_str(response.std_err)) - ) + return action_result.set_status(phantom.APP_ERROR, "Error running command: {}".format(clean_str(response.std_err))) return phantom.APP_SUCCESS def check_exit_no_data2(action_result, response): if response.std_err: - return action_result.set_status( - phantom.APP_ERROR, - "Error running command: {}".format(clean_str(response.std_err)) - ) + return action_result.set_status(phantom.APP_ERROR, "Error running command: {}".format(clean_str(response.std_err))) return phantom.APP_SUCCESS def check_exit_no_data_stdout(action_result, response): # Same as above, but for when the error message appears in std_out instead of std_err if response.status_code: - return action_result.set_status( - phantom.APP_ERROR, - "Error running command: {}".format(clean_str(response.std_out)) - ) + return action_result.set_status(phantom.APP_ERROR, "Error running command: {}".format(clean_str(response.std_out))) return phantom.APP_SUCCESS def ensure_no_errors(action_result, response): if response.status_code and response.std_err: - return action_result.set_status( - phantom.APP_ERROR, "Error running command: {}{}".format( - response.std_out, - response.std_err - ) - ) + return action_result.set_status(phantom.APP_ERROR, "Error running command: {}{}".format(response.std_out, response.std_err)) return phantom.APP_SUCCESS def list_processes(action_result, response): if response.status_code != 0: return action_result.set_status( - phantom.APP_ERROR, - "Error: Returned non-zero status code. stderr: {}".format(clean_str(response.std_err)) + phantom.APP_ERROR, "Error: Returned non-zero status code. stderr: {}".format(clean_str(response.std_err)) ) output = response.std_out processes = json.loads(output) if not processes: summary = action_result.update_summary({}) - summary['num_processes'] = 0 + summary["num_processes"] = 0 return action_result.set_status(phantom.APP_ERROR, "No processes found") column_mapping = { - 'Handles': 'handles', - 'NPM': 'non_paged_memory', - 'PM': 'paged_memory', - 'WS': 'working_set', - 'VM': 'virtual_memory', - 'CPU': 'processor_time_(s)', - 'Id': 'pid', - 'SessionId': 'session_id', - 'Name': 'name', + "Handles": "handles", + "NPM": "non_paged_memory", + "PM": "paged_memory", + "WS": "working_set", + "VM": "virtual_memory", + "CPU": "processor_time_(s)", + "Id": "pid", + "SessionId": "session_id", + "Name": "name", } for process in processes: - data = { 'raw': process } + data = {"raw": process} for key, value in process.items(): key = column_mapping.get(key) if key: @@ -139,25 +121,21 @@ def list_processes(action_result, response): return action_result.set_status(phantom.APP_ERROR, "Unable to parse process list") summary = action_result.update_summary({}) - summary['num_processes'] = size + summary["num_processes"] = size return phantom.APP_SUCCESS def terminate_process(action_result, response): if response.std_err: - return action_result.set_status( - phantom.APP_ERROR, - "Error terminating process: {}".format(clean_str(response.std_err)) - ) + return action_result.set_status(phantom.APP_ERROR, "Error terminating process: {}".format(clean_str(response.std_err))) return phantom.APP_SUCCESS def list_connections(action_result, response): if response.status_code != 0: return action_result.set_status( - phantom.APP_ERROR, - "Error: Returned non-zero status code. stderr: {}".format(clean_str(response.std_err)) + phantom.APP_ERROR, "Error: Returned non-zero status code. stderr: {}".format(clean_str(response.std_err)) ) lines = response.std_out.splitlines() @@ -165,25 +143,25 @@ def list_connections(action_result, response): connection = {} columns = line.split() try: - connection['protocol'] = columns[0] + connection["protocol"] = columns[0] try: - local_address = columns[1].rsplit(':', 1) + local_address = columns[1].rsplit(":", 1) except TypeError: # py3 - local_address = (columns[1].decode('UTF-8')).rsplit(':', 1) + local_address = (columns[1].decode("UTF-8")).rsplit(":", 1) - connection['local_address_ip'] = local_address[0] - connection['local_address_port'] = local_address[1] + connection["local_address_ip"] = local_address[0] + connection["local_address_port"] = local_address[1] try: - foreign_address = columns[2].rsplit(':', 1) + foreign_address = columns[2].rsplit(":", 1) except TypeError: # py3 - foreign_address = (columns[2].decode('UTF-8')).rsplit(':', 1) + foreign_address = (columns[2].decode("UTF-8")).rsplit(":", 1) - connection['foreign_address_ip'] = foreign_address[0] - connection['foreign_address_port'] = foreign_address[1] - connection['state'] = columns[3] - connection['pid'] = int(columns[4]) + connection["foreign_address_ip"] = foreign_address[0] + connection["foreign_address_port"] = foreign_address[1] + connection["state"] = columns[3] + connection["pid"] = int(columns[4]) except: continue action_result.add_data(connection) @@ -193,25 +171,19 @@ def list_connections(action_result, response): return action_result.set_status(phantom.APP_ERROR, "Unable to parse connection list") summary = action_result.update_summary({}) - summary['num_connections'] = size + summary["num_connections"] = size return phantom.APP_SUCCESS def parse_rule(action_result, rule_lines): - name_map = { - 'localip': 'local_ip', - 'remoteip': 'remote_ip', - 'localport': 'local_port', - 'remoteport': 'remote_port' - - } + name_map = {"localip": "local_ip", "remoteip": "remote_ip", "localport": "local_port", "remoteport": "remote_port"} rule = {} for line in rule_lines: - columns = line.split(':', 1) - if columns[0].startswith('--'): + columns = line.split(":", 1) + if columns[0].startswith("--"): continue - key_name = columns[0].lower().replace(' ', '_') + key_name = columns[0].lower().replace(" ", "_") key_name = name_map.get(key_name, key_name) try: rule[key_name] = columns[1].lower().strip() @@ -221,30 +193,26 @@ def parse_rule(action_result, rule_lines): return rule -def filtered_rule( - action_result, rule, - filter_port=None, - filter_ip=None, - **kwargs): +def filtered_rule(action_result, rule, filter_port=None, filter_ip=None, **kwargs): if filter_port: - if rule.get('remote_port') == filter_port: + if rule.get("remote_port") == filter_port: pass - elif rule.get('local_port') == filter_port: + elif rule.get("local_port") == filter_port: pass else: return False if filter_ip: - if rule.get('remote_ip') == filter_ip: + if rule.get("remote_ip") == filter_ip: pass - elif rule.get('local_ip') == filter_ip: + elif rule.get("local_ip") == filter_ip: pass else: return False for k, v in six.iteritems(kwargs): - if rule.get(k, '').lower() != v.lower(): + if rule.get(k, "").lower() != v.lower(): return False return True @@ -255,27 +223,24 @@ def filtered_rule( def list_firewall_rules(action_result, response, **kwargs): if response.status_code != 0: # The only reason this should fail is if there are no firewall rules - action_result.update_summary({'num_rules': 0}) - return action_result.set_status( - phantom.APP_SUCCESS, - "No firewall rules were found" - ) + action_result.update_summary({"num_rules": 0}) + return action_result.set_status(phantom.APP_SUCCESS, "No firewall rules were found") lines = list() if isinstance(response.std_out, str): lines = response.std_out.splitlines() else: - lines = response.std_out.decode('UTF-8').splitlines() + lines = response.std_out.decode("UTF-8").splitlines() rule_lines = None for line in lines: # start of a new rule - if line.startswith('Rule Name:'): + if line.startswith("Rule Name:"): rule_lines = [] rule_lines.append(line) elif not rule_lines: continue - elif line.strip() == '' and rule_lines: + elif line.strip() == "" and rule_lines: rule = parse_rule(action_result, rule_lines) if filtered_rule(action_result, rule, **kwargs): action_result.add_data(rule) @@ -285,7 +250,7 @@ def list_firewall_rules(action_result, response, **kwargs): size = action_result.get_data_size() summary = action_result.update_summary({}) - summary['num_rules'] = size + summary["num_rules"] = size if size == 0: return action_result.set_status(phantom.APP_SUCCESS, "No firewall rule found for given parameters") @@ -298,22 +263,18 @@ def create_firewall_rule(action_result, response): message = response.std_out.splitlines()[1] except: message = response.std_out - return action_result.set_status( - phantom.APP_ERROR, "Error running command: {}".format(message) - ) + return action_result.set_status(phantom.APP_ERROR, "Error running command: {}".format(message)) return phantom.APP_SUCCESS def delete_firewall_rule(action_result, response): if response.status_code: - return action_result.set_status( - phantom.APP_ERROR, "Error running command: {}".format(clean_str(response.std_out)) - ) + return action_result.set_status(phantom.APP_ERROR, "Error running command: {}".format(clean_str(response.std_out))) # action_result.add_data({'message': response.std_out}) summary = action_result.update_summary({}) try: - summary['rules_deleted'] = int(response.std_out.split()[1]) + summary["rules_deleted"] = int(response.std_out.split()[1]) except: pass return phantom.APP_SUCCESS @@ -321,24 +282,24 @@ def delete_firewall_rule(action_result, response): def list_sessions(action_result, response): if isinstance(response.std_out, bytes): - lines = (response.std_out.decode('UTF-8')).splitlines() + lines = (response.std_out.decode("UTF-8")).splitlines() else: lines = response.std_out.splitlines() - username_index = lines[0].find('USERNAME') - type_index = lines[0].find('TYPE') - device_index = lines[0].find('DEVICE') + username_index = lines[0].find("USERNAME") + type_index = lines[0].find("TYPE") + device_index = lines[0].find("DEVICE") for line in lines[1:]: i = 0 session = {} columns = line.split() - if line.startswith('>'): - session['name'] = columns[i][1:] - session['this'] = True + if line.startswith(">"): + session["name"] = columns[i][1:] + session["this"] = True else: - session['name'] = columns[i] - session['this'] = False + session["name"] = columns[i] + session["this"] = False if not line[username_index].isspace(): i += 1 username = columns[i] @@ -346,8 +307,8 @@ def list_sessions(action_result, response): username = "" i += 1 - session['username'] = username - session['id'] = columns[i] + session["username"] = username + session["id"] = columns[i] if not line[type_index].isspace(): i += 1 @@ -356,7 +317,7 @@ def list_sessions(action_result, response): type_ = "" i += 1 - session['type'] = type_ + session["type"] = type_ if not line[device_index].isspace(): i += 1 @@ -365,29 +326,29 @@ def list_sessions(action_result, response): device = "" i += 1 - session['type'] = device + session["type"] = device action_result.add_data(session) size = action_result.get_data_size() summary = action_result.update_summary({}) - summary['num_sessions'] = size + summary["num_sessions"] = size return phantom.APP_SUCCESS def _parse_rule(rule): d = {} - d['description'] = rule.pop('@Description', '') - d['name'] = rule.pop('@Name', '') - d['user_or_group_sid'] = rule.pop('@UserOrGroupSid', None) - d['action'] = rule.pop('@Action', None) - d['id'] = rule.pop('@Id', None) - file_path_condition = rule.get('Conditions', {}).get('FilePathCondition', {}).get('@Path') + d["description"] = rule.pop("@Description", "") + d["name"] = rule.pop("@Name", "") + d["user_or_group_sid"] = rule.pop("@UserOrGroupSid", None) + d["action"] = rule.pop("@Action", None) + d["id"] = rule.pop("@Id", None) + file_path_condition = rule.get("Conditions", {}).get("FilePathCondition", {}).get("@Path") if file_path_condition: - d['file_path_condition'] = file_path_condition - rule.get('Conditions', {}).pop('FilePathCondition', None) - if len(rule.get('Conditions', {})) == 0: - rule.pop('Conditions', None) + d["file_path_condition"] = file_path_condition + rule.get("Conditions", {}).pop("FilePathCondition", None) + if len(rule.get("Conditions", {})) == 0: + rule.pop("Conditions", None) for k, v in six.iteritems(rule): # Add anything left over d[k] = v @@ -396,22 +357,17 @@ def _parse_rule(rule): def list_applocker_policies(action_result, response): if response.status_code: - return action_result.set_status( - phantom.APP_ERROR, - "Error running command: {}".format(clean_str(response.std_err)) - ) + return action_result.set_status(phantom.APP_ERROR, "Error running command: {}".format(clean_str(response.std_err))) try: # Get rid of all the linebreaks to prevent errors during reading data = xmltodict.parse("".join(response.std_out.splitlines())) except TypeError: - data = xmltodict.parse("".join((response.std_out.decode('utf-8')).splitlines())) + data = xmltodict.parse("".join((response.std_out.decode("utf-8")).splitlines())) except Exception as e: - return action_result.set_status( - phantom.APP_ERROR, "Error parsing XML response: {}".format(str(e)) - ) + return action_result.set_status(phantom.APP_ERROR, "Error parsing XML response: {}".format(str(e))) try: - rule_collection = data['AppLockerPolicy']['RuleCollection'] + rule_collection = data["AppLockerPolicy"]["RuleCollection"] except KeyError: return action_result.set_status(phantom.APP_SUCCESS, "No AppLocker Policies were found") @@ -419,22 +375,22 @@ def list_applocker_policies(action_result, response): rule_collection = [rule_collection] for rule in rule_collection: - r_type = rule['@Type'] - enforcement_mode = rule['@EnforcementMode'] - for rule_condition in ['FilePublisherRule', 'FilePathRule', 'FileHashRule']: + r_type = rule["@Type"] + enforcement_mode = rule["@EnforcementMode"] + for rule_condition in ["FilePublisherRule", "FilePathRule", "FileHashRule"]: condition = rule.get(rule_condition) if condition is None: continue if type(condition) in (dict, OrderedDict): d = _parse_rule(condition) - d['type'] = r_type - d['enforcement_mode'] = enforcement_mode + d["type"] = r_type + d["enforcement_mode"] = enforcement_mode action_result.add_data(d) elif type(condition) is list: for c in condition: d = _parse_rule(c) - d['type'] = r_type - d['enforcement_mode'] = enforcement_mode + d["type"] = r_type + d["enforcement_mode"] = enforcement_mode action_result.add_data(d) return phantom.APP_SUCCESS @@ -443,23 +399,16 @@ def list_applocker_policies(action_result, response): def decodeb64_add_to_vault(action_result, response, container_id, file_name): if response.status_code: if isinstance(response.std_err, bytes): - response.std_err = response.std_err.decode('UTF-8') - return action_result.set_status( - phantom.APP_ERROR, - "Error running command: {}".format(clean_str(response.std_err)) - ) + response.std_err = response.std_err.decode("UTF-8") + return action_result.set_status(phantom.APP_ERROR, "Error running command: {}".format(clean_str(response.std_err))) b64string = response.std_out try: resp = Vault.create_attachment(base64.b64decode(b64string), container_id, file_name=file_name) except Exception as e: - return action_result.set_status( - phantom.APP_ERROR, "Error adding file to vault", e - ) + return action_result.set_status(phantom.APP_ERROR, "Error adding file to vault", e) - action_result.update_summary({ - 'vault_id': resp['vault_id'] - }) + action_result.update_summary({"vault_id": resp["vault_id"]}) return phantom.APP_SUCCESS diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..474efd9 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,7 @@ +[tool.black] +line-length = 145 +target-version = ['py39'] +verbose = true + +[tool.isort] +line_length = 145 diff --git a/tox.ini b/tox.ini index c4644ad..2b96e78 100644 --- a/tox.ini +++ b/tox.ini @@ -1,7 +1,7 @@ [flake8] max-line-length = 145 max-complexity = 28 -extend-ignore = F403,E128,E126,E111,E121,E127,E731,E201,E202,F405,E722,D,W292 +extend-ignore = F403,E128,E126,E121,E127,E731,E201,E202,E203,E701,F405,E722,D [isort] line_length = 145 diff --git a/wheels/py3/pycparser-2.22-py3-none-any.whl b/wheels/py3/pycparser-2.22-py3-none-any.whl new file mode 100644 index 0000000..b478187 Binary files /dev/null and b/wheels/py3/pycparser-2.22-py3-none-any.whl differ diff --git a/wheels/py3/pyspnego-0.11.1-py3-none-any.whl b/wheels/py3/pyspnego-0.11.1-py3-none-any.whl new file mode 100644 index 0000000..d59ad31 Binary files /dev/null and b/wheels/py3/pyspnego-0.11.1-py3-none-any.whl differ diff --git a/wheels/py3/requests_ntlm-1.3.0-py3-none-any.whl b/wheels/py3/requests_ntlm-1.3.0-py3-none-any.whl new file mode 100644 index 0000000..ad6ae23 Binary files /dev/null and b/wheels/py3/requests_ntlm-1.3.0-py3-none-any.whl differ diff --git a/wheels/py36/cffi-1.15.1-cp36-cp36m-manylinux1_x86_64.manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_5_x86_64.manylinux_2_28_x86_64.whl b/wheels/py36/cffi-1.15.1-cp36-cp36m-manylinux1_x86_64.manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_5_x86_64.manylinux_2_28_x86_64.whl new file mode 100644 index 0000000..3cdfe46 Binary files /dev/null and b/wheels/py36/cffi-1.15.1-cp36-cp36m-manylinux1_x86_64.manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_5_x86_64.manylinux_2_28_x86_64.whl differ diff --git a/wheels/py36/cryptography-40.0.2-cp36-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl b/wheels/py36/cryptography-40.0.2-cp36-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl new file mode 100644 index 0000000..c61757d Binary files /dev/null and b/wheels/py36/cryptography-40.0.2-cp36-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl differ diff --git a/wheels/py39/cffi-1.17.1-cp39-cp39-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl b/wheels/py39/cffi-1.17.1-cp39-cp39-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl new file mode 100644 index 0000000..4cbe762 Binary files /dev/null and b/wheels/py39/cffi-1.17.1-cp39-cp39-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl differ diff --git a/wheels/py39/cryptography-43.0.1-cp39-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl b/wheels/py39/cryptography-43.0.1-cp39-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl new file mode 100644 index 0000000..fe14ebe Binary files /dev/null and b/wheels/py39/cryptography-43.0.1-cp39-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl differ diff --git a/winrm.json b/winrm.json index bcc8b0f..8a1655b 100644 --- a/winrm.json +++ b/winrm.json @@ -3320,5 +3320,69 @@ }, "versions": "EQ(*)" } - ] -} \ No newline at end of file + ], + "pip_dependencies": { + "wheel": [ + { + "module": "cffi", + "input_file": "wheels/py36/cffi-1.15.1-cp36-cp36m-manylinux1_x86_64.manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_5_x86_64.manylinux_2_28_x86_64.whl" + }, + { + "module": "cryptography", + "input_file": "wheels/py36/cryptography-40.0.2-cp36-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl" + }, + { + "module": "ntlm_auth", + "input_file": "wheels/shared/ntlm_auth-1.5.0-py2.py3-none-any.whl" + }, + { + "module": "pycparser", + "input_file": "wheels/shared/pycparser-2.21-py2.py3-none-any.whl" + }, + { + "module": "pykerberos", + "input_file": "wheels/py36/pykerberos-1.2.4-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl" + }, + { + "module": "pywinrm", + "input_file": "wheels/shared/pywinrm-0.4.3-py2.py3-none-any.whl" + }, + { + "module": "requests_ntlm", + "input_file": "wheels/shared/requests_ntlm-1.1.0-py2.py3-none-any.whl" + } + ] + }, + "pip39_dependencies": { + "wheel": [ + { + "module": "cffi", + "input_file": "wheels/py39/cffi-1.17.1-cp39-cp39-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl" + }, + { + "module": "cryptography", + "input_file": "wheels/py39/cryptography-43.0.1-cp39-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl" + }, + { + "module": "pycparser", + "input_file": "wheels/py3/pycparser-2.22-py3-none-any.whl" + }, + { + "module": "pykerberos", + "input_file": "wheels/py39/pykerberos-1.2.4-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl" + }, + { + "module": "pyspnego", + "input_file": "wheels/py3/pyspnego-0.11.1-py3-none-any.whl" + }, + { + "module": "pywinrm", + "input_file": "wheels/shared/pywinrm-0.4.3-py2.py3-none-any.whl" + }, + { + "module": "requests_ntlm", + "input_file": "wheels/py3/requests_ntlm-1.3.0-py3-none-any.whl" + } + ] + } +} diff --git a/winrm_connector.py b/winrm_connector.py index c592963..5ee5a22 100644 --- a/winrm_connector.py +++ b/winrm_connector.py @@ -66,14 +66,14 @@ def _handle_py_ver_compat_for_input_str(self, input_str, always_encode=False): try: if input_str and (self._python_version == 2 or always_encode): - input_str = UnicodeDammit(input_str).unicode_markup.encode('utf-8') + input_str = UnicodeDammit(input_str).unicode_markup.encode("utf-8") except: self.debug_print("Error occurred while handling python 2to3 compatibility for the input string") return input_str def _get_error_message_from_exception(self, e): - """ This method is used to get appropriate error messages from the exception. + """This method is used to get appropriate error messages from the exception. :param e: Exception object :return: error message """ @@ -122,16 +122,17 @@ def _validate_integer(self, action_result, parameter, key, allow_zero=False): return action_result.set_status(phantom.APP_ERROR, consts.WINRM_ERROR_INVALID_INT.format(msg="", param=key)), None if parameter < 0: - return action_result.set_status(phantom.APP_ERROR, - consts.WINRM_ERROR_INVALID_INT.format(msg="non-negative", param=key)), None + return action_result.set_status(phantom.APP_ERROR, consts.WINRM_ERROR_INVALID_INT.format(msg="non-negative", param=key)), None if not allow_zero and parameter == 0: - return action_result.set_status(phantom.APP_ERROR, - consts.WINRM_ERROR_INVALID_INT.format(msg="non-zero positive", param=key)), None + return ( + action_result.set_status(phantom.APP_ERROR, consts.WINRM_ERROR_INVALID_INT.format(msg="non-zero positive", param=key)), + None, + ) return phantom.APP_SUCCESS, parameter def is_ip(self, param): - if param in {'any', 'localsubnet', 'dns', 'dhcp', 'wins', 'defaultgateway'}: + if param in {"any", "localsubnet", "dns", "dhcp", "wins", "defaultgateway"}: return True try: ipaddress.ip_network(str(param)) @@ -147,16 +148,18 @@ def _get_vault_file_text(self, action_result, vault_id): success, message, file_info = phantom_rules.vault_info(vault_id=vault_id) if not file_info: return action_result.set_status(phantom.APP_ERROR, consts.WINRM_ERROR_INVALID_VAULT_ID), None - file_path = list(file_info)[0].get('path') + file_path = list(file_info)[0].get("path") except: return action_result.set_status(phantom.APP_ERROR, consts.WINRM_ERROR_INVALID_VAULT_ID), None try: - with open(file_path, 'r') as fp: + with open(file_path, "r") as fp: return phantom.APP_SUCCESS, fp.read() except Exception as e: - return action_result.set_status(phantom.APP_ERROR, - "Error reading vault file: {}".format(self._get_error_message_from_exception(e))), None + return ( + action_result.set_status(phantom.APP_ERROR, "Error reading vault file: {}".format(self._get_error_message_from_exception(e))), + None, + ) def _get_custom_parser_method(self, action_result, vault_id): if vault_id is None: @@ -166,31 +169,37 @@ def _get_custom_parser_method(self, action_result, vault_id): success, message, file_info = phantom_rules.vault_info(vault_id=vault_id) if not file_info: return action_result.set_status(phantom.APP_ERROR, consts.WINRM_ERROR_INVALID_VAULT_ID), None - file_path = list(file_info)[0].get('path') + file_path = list(file_info)[0].get("path") except: return action_result.set_status(phantom.APP_ERROR, consts.WINRM_ERROR_INVALID_VAULT_ID), None try: - custom_parser = imp.load_source('custom_parser', file_path) + custom_parser = imp.load_source("custom_parser", file_path) except Exception as e: - return action_result.set_status(phantom.APP_ERROR, - "Error creating custom parser: {}".format(self._get_error_message_from_exception(e))), None + return ( + action_result.set_status( + phantom.APP_ERROR, "Error creating custom parser: {}".format(self._get_error_message_from_exception(e)) + ), + None, + ) try: return phantom.APP_SUCCESS, custom_parser.custom_parser # noqa except Exception as e: - return action_result.set_status(phantom.APP_ERROR, - "Error saving custom parser: {}".format(self._get_error_message_from_exception(e))), None + return ( + action_result.set_status(phantom.APP_ERROR, "Error saving custom parser: {}".format(self._get_error_message_from_exception(e))), + None, + ) def _sanitize_string(self, string): # To avoid any shenanigans, we need to quote the arguments # The breaking character in PS is '`', so first we break any breaking characters, then we # break any double quotes which are found, then we break any $, which is used to declare variables - return string.replace('`', '``').replace('"', '`"').replace('$', '`$').replace('&', '`&').replace(')', '`)').replace('(', '`(') + return string.replace("`", "``").replace('"', '`"').replace("$", "`$").replace("&", "`&").replace(")", "`)").replace("(", "`(") def _get_fips_enabled(self): fips_enabled = is_fips_enabled() - self.debug_print(f'FIPS is enabled: {fips_enabled}') + self.debug_print(f"FIPS is enabled: {fips_enabled}") return fips_enabled def _create_ps_script(self, action_result, args, whitelist_args=set(), cmd_prefix="", cmd_suffix=""): @@ -207,50 +216,41 @@ def _create_ps_script(self, action_result, args, whitelist_args=set(), cmd_prefi if type(arg) is dict: for k, v in six.iteritems(arg): if (whitelist_args and k not in whitelist_args) or not k.isalpha(): - return RetVal(action_result.set_status( - phantom.APP_ERROR, "Invalid argument: {}".format(k) - ), None) + return RetVal(action_result.set_status(phantom.APP_ERROR, "Invalid argument: {}".format(k)), None) if v is None: continue if type(v) is bool: arg_str = "{0}-{1} ${2} ".format(arg_str, k, str(v)) elif type(v) is int: - arg_str = "{0}-{1} \"{2}\" ".format(arg_str, k, str(v)) + arg_str = '{0}-{1} "{2}" '.format(arg_str, k, str(v)) else: - arg_str = "{0}-{1} \"{2}\" ".format(arg_str, k, self._sanitize_string( - self._handle_py_ver_compat_for_input_str(v))) + arg_str = '{0}-{1} "{2}" '.format(arg_str, k, self._sanitize_string(self._handle_py_ver_compat_for_input_str(v))) if type(arg) is str: if (whitelist_args and arg not in whitelist_args) or not arg.isalpha(): - return RetVal(action_result.set_status( - phantom.APP_ERROR, "Invalid argument: {}".format(k) - ), None) + return RetVal(action_result.set_status(phantom.APP_ERROR, "Invalid argument: {}".format(k)), None) arg_str = "{0}-{1} ".format(arg_str, arg) return RetVal(phantom.APP_SUCCESS, "{0} {1} {2}".format(cmd_prefix, arg_str, cmd_suffix)) def _init_session(self, action_result, param=None): config = self.get_config() - default_protocol = config.get(consts.WINRM_CONFIG_PROTOCOL, 'http') + default_protocol = config.get(consts.WINRM_CONFIG_PROTOCOL, "http") ret_val, default_port = self._validate_integer( - action_result, - config.get(consts.WINRM_CONFIG_PORT, 5985 if default_protocol == 'http' else 5986), - "Default port", - True) + action_result, config.get(consts.WINRM_CONFIG_PORT, 5985 if default_protocol == "http" else 5986), "Default port", True + ) if phantom.is_fail(ret_val): return action_result.get_status() endpoint = self._handle_py_ver_compat_for_input_str(config.get(consts.WINRM_CONFIG_ENDPOINT)) if param: - endpoint = self._handle_py_ver_compat_for_input_str(param.get('ip_hostname', endpoint)) + endpoint = self._handle_py_ver_compat_for_input_str(param.get("ip_hostname", endpoint)) if endpoint is None: - return action_result.set_status( - phantom.APP_ERROR, "No Endpoint Configured" - ) - if re.search(r'^[a-z]+://', endpoint, re.UNICODE | re.IGNORECASE) is None: - endpoint = '{0}://{1}'.format(default_protocol, endpoint) - if re.search(r':\d+$', endpoint, re.UNICODE | re.IGNORECASE) is None: - endpoint = '{0}:{1}'.format(endpoint, default_port) + return action_result.set_status(phantom.APP_ERROR, "No Endpoint Configured") + if re.search(r"^[a-z]+://", endpoint, re.UNICODE | re.IGNORECASE) is None: + endpoint = "{0}://{1}".format(default_protocol, endpoint) + if re.search(r":\d+$", endpoint, re.UNICODE | re.IGNORECASE) is None: + endpoint = "{0}:{1}".format(endpoint, default_port) username = config[consts.WINRM_CONFIG_USERNAME] password = config[consts.WINRM_CONFIG_PASSWORD] transport = config.get(consts.WINRM_CONFIG_TRANSPORT) @@ -262,37 +262,29 @@ def _init_session(self, action_result, param=None): cert_ca_trust_path = config.get(consts.WINRM_CONFIG_CA_TRUST, "legacy_requests") if verify_bool: - verify = 'validate' + verify = "validate" else: - verify = 'ignore' + verify = "ignore" - if transport == 'basic' or transport == 'plaintext': + if transport == "basic" or transport == "plaintext": if domain: - self.save_progress( - "Warning: Domain is set but transport type is set to 'basic'" - ) - elif transport == 'ntlm': + self.save_progress("Warning: Domain is set but transport type is set to 'basic'") + elif transport == "ntlm": if self._get_fips_enabled(): - return action_result.set_status( - phantom.APP_ERROR, "This transport type is not supported when FIPS is enabled" - ) + return action_result.set_status(phantom.APP_ERROR, "This transport type is not supported when FIPS is enabled") if domain: - username = r'{}\{}'.format(domain, username) - elif transport == 'kerberos': + username = r"{}\{}".format(domain, username) + elif transport == "kerberos": if domain: - username = '{}@{}'.format(username, domain) - elif transport == 'certificate': - username = r'{}\{}'.format(domain, username) + username = "{}@{}".format(username, domain) + elif transport == "certificate": + username = r"{}\{}".format(domain, username) cert_pem_path = config.get(consts.WINRM_CONFIG_CERT_PEM) cert_key_pem_path = config.get(consts.WINRM_CONFIG_CERT_KEY_PEM) - elif transport == 'credssp': - return action_result.set_status( - phantom.APP_ERROR, "This transport type is not yet implemented" - ) + elif transport == "credssp": + return action_result.set_status(phantom.APP_ERROR, "This transport type is not yet implemented") else: - return action_result.set_status( - phantom.APP_ERROR, "Invalid transport type: {}".format(transport) - ) + return action_result.set_status(phantom.APP_ERROR, "Invalid transport type: {}".format(transport)) self._session = winrm.Session( endpoint, @@ -301,14 +293,15 @@ def _init_session(self, action_result, param=None): transport=transport, cert_pem=cert_pem_path, cert_key_pem=cert_key_pem_path, - ca_trust_path=cert_ca_trust_path + ca_trust_path=cert_ca_trust_path, ) self._protocol = self._session.protocol return phantom.APP_SUCCESS - def _run_cmd(self, action_result, cmd, args=None, parse_callback=pc.basic, - additional_data=None, async_=False, command_id=None, shell_id=None): + def _run_cmd( + self, action_result, cmd, args=None, parse_callback=pc.basic, additional_data=None, async_=False, command_id=None, shell_id=None + ): # The parser callback should have the function signature (ActionResult, winrm.Result) -> bool # The additional_data is a dictionary which will be passed to the parser, in which case the signature should be # (ActionResult, winrm.Result, **kwargs) -> bool @@ -326,23 +319,23 @@ def _run_cmd(self, action_result, cmd, args=None, parse_callback=pc.basic, try: resp = winrm.Response(self._protocol.get_command_output(shell_id, command_id)) except: - return action_result.set_status(phantom.APP_ERROR, "Failed to get command output from 'command_id' " - "and 'shell_id'") + return action_result.set_status(phantom.APP_ERROR, "Failed to get command output from 'command_id' " "and 'shell_id'") self._protocol.close_shell(shell_id) elif async_: shell_id = self._protocol.open_shell() command_id = self._protocol.run_command(shell_id, cmd, args) summary = action_result.set_summary({}) - summary['shell_id'] = shell_id - summary['command_id'] = command_id + summary["shell_id"] = shell_id + summary["command_id"] = command_id return phantom.APP_SUCCESS else: resp = self._session.run_cmd(cmd, args) except UnicodeDecodeError: return action_result.set_status(phantom.APP_ERROR, "Error running command: {}".format(consts.WINRM_UNICODE_ERROR_MESSAGE)) except Exception as e: - return action_result.set_status(phantom.APP_ERROR, - "Error running command: {}".format(unquote(self._get_error_message_from_exception(e)))) + return action_result.set_status( + phantom.APP_ERROR, "Error running command: {}".format(unquote(self._get_error_message_from_exception(e))) + ) if resp is None: # The exception will probably catch this self.debug_print("Error: _run_cmd is missing parameters") @@ -352,17 +345,15 @@ def _run_cmd(self, action_result, cmd, args=None, parse_callback=pc.basic, resp.std_err = self._handle_py_ver_compat_for_input_str(resp.std_err, True) if self._python_version == 3: - resp.std_out = resp.std_out.decode('UTF-8') - resp.std_err = resp.std_err.decode('UTF-8') + resp.std_out = resp.std_out.decode("UTF-8") + resp.std_err = resp.std_err.decode("UTF-8") try: return parse_callback(action_result, resp, **additional_data) except Exception as e: - return action_result.set_status(phantom.APP_ERROR, - "Error parsing output: {}".format(self._get_error_message_from_exception(e))) + return action_result.set_status(phantom.APP_ERROR, "Error parsing output: {}".format(self._get_error_message_from_exception(e))) - def _run_ps(self, action_result, script, parse_callback=pc.basic, additional_data=None, async_=False, - command_id=None, shell_id=None): + def _run_ps(self, action_result, script, parse_callback=pc.basic, additional_data=None, async_=False, command_id=None, shell_id=None): if additional_data is None: additional_data = {} resp = None @@ -383,14 +374,14 @@ def _run_ps(self, action_result, script, parse_callback=pc.basic, additional_dat if len(resp.std_err): resp.std_err = self._session._clean_error_msg(resp.std_err) if isinstance(resp.std_err, bytes): - resp.std_err = resp.std_err.decode('UTF-8', errors='backslashreplace') + resp.std_err = resp.std_err.decode("UTF-8", errors="backslashreplace") elif async_: - encoded_ps = b64encode(script.encode('utf_16_le')).decode('ascii') + encoded_ps = b64encode(script.encode("utf_16_le")).decode("ascii") shell_id = self._protocol.open_shell() - command_id = self._protocol.run_command(shell_id, 'powershell -encodedcommand {0}'.format(encoded_ps)) + command_id = self._protocol.run_command(shell_id, "powershell -encodedcommand {0}".format(encoded_ps)) summary = action_result.set_summary({}) - summary['shell_id'] = shell_id - summary['command_id'] = command_id + summary["shell_id"] = shell_id + summary["command_id"] = command_id return phantom.APP_SUCCESS else: if self._python_version == 2: @@ -399,8 +390,9 @@ def _run_ps(self, action_result, script, parse_callback=pc.basic, additional_dat except UnicodeDecodeError: return action_result.set_status(phantom.APP_ERROR, "Error running PowerShell script: {}".format(consts.WINRM_UNICODE_ERROR_MESSAGE)) except Exception as e: - return action_result.set_status(phantom.APP_ERROR, - "Error running PowerShell script: {}".format(self._get_error_message_from_exception(e))) + return action_result.set_status( + phantom.APP_ERROR, "Error running PowerShell script: {}".format(self._get_error_message_from_exception(e)) + ) if resp is None: self.debug_print("Error: _run_ps is missing parameters") return action_result.set_status(phantom.APP_ERROR, "Unknown error while running PowerShell script") @@ -410,14 +402,13 @@ def _run_ps(self, action_result, script, parse_callback=pc.basic, additional_dat resp.std_err = self._session._clean_error_msg(resp.std_err) if self._python_version == 3: - resp.std_out = resp.std_out.decode('UTF-8') - resp.std_err = resp.std_err.decode('UTF-8') + resp.std_out = resp.std_out.decode("UTF-8") + resp.std_err = resp.std_err.decode("UTF-8") try: return parse_callback(action_result, resp, **additional_data) except Exception as e: - return action_result.set_status(phantom.APP_ERROR, - "Error parsing output: {}".format(self._get_error_message_from_exception(e))) + return action_result.set_status(phantom.APP_ERROR, "Error parsing output: {}".format(self._get_error_message_from_exception(e))) def _handle_test_connectivity(self, param): action_result = self.add_action_result(ActionResult(dict(param))) @@ -425,7 +416,7 @@ def _handle_test_connectivity(self, param): self.save_progress("Test connectivity failed") return action_result.get_status() - ret_val = self._run_cmd(action_result, 'ipconfig') + ret_val = self._run_cmd(action_result, "ipconfig") if phantom.is_fail(ret_val): self.save_progress(action_result.get_message()) self.save_progress("Test connectivity failed") @@ -438,7 +429,7 @@ def _handle_list_processes(self, param): if not self._init_session(action_result, param): return action_result.get_status() - ret_val = self._run_ps(action_result, 'get-process | select * | convertTo-json', pc.list_processes) + ret_val = self._run_ps(action_result, "get-process | select * | convertTo-json", pc.list_processes) if phantom.is_fail(ret_val): return ret_val @@ -449,23 +440,16 @@ def _handle_terminate_process(self, param): if not self._init_session(action_result, param): return action_result.get_status() - ret_val, pid = self._validate_integer(action_result, param.get('pid'), "pid", True) + ret_val, pid = self._validate_integer(action_result, param.get("pid"), "pid", True) if phantom.is_fail(ret_val): return action_result.get_status() - name = param.get('name') + name = param.get("name") if pid is None and name is None: - return action_result.set_status( - phantom.APP_ERROR, "Please specify at least one of 'pid' or 'name'" - ) + return action_result.set_status(phantom.APP_ERROR, "Please specify at least one of 'pid' or 'name'") - args = { - "id": pid, - "processname": name - } - ret_val, script = self._create_ps_script( - action_result, args, cmd_prefix="Stop-Process", cmd_suffix="-Force" - ) + args = {"id": pid, "processname": name} + ret_val, script = self._create_ps_script(action_result, args, cmd_prefix="Stop-Process", cmd_suffix="-Force") if phantom.is_fail(ret_val): return ret_val @@ -481,8 +465,8 @@ def _handle_list_connections(self, param): if not self._init_session(action_result, param): return action_result.get_status() - command = 'netstat' - arguments = ['-no'] # yes + command = "netstat" + arguments = ["-no"] # yes ret_val = self._run_cmd(action_result, command, arguments, pc.list_connections) if phantom.is_fail(ret_val): @@ -491,46 +475,44 @@ def _handle_list_connections(self, param): def _create_filter(self, action_result, param): filter_data = {} - filter_vars = [ - 'filter_port', - 'filter_ip', - 'direction', - 'protocol' - ] + filter_vars = ["filter_port", "filter_ip", "direction", "protocol"] for var in filter_vars: if var in param: filter_data.update({var: param[var]}) - other = param.get('other') + other = param.get("other") if other: try: other_dict = json.loads(other) except Exception as e: - return action_result.set_status( - phantom.APP_ERROR, - "Error parsing JSON Object: {}".format(self._get_error_message_from_exception(e)) - ), None + return ( + action_result.set_status( + phantom.APP_ERROR, "Error parsing JSON Object: {}".format(self._get_error_message_from_exception(e)) + ), + None, + ) filter_data.update(other_dict) return phantom.APP_SUCCESS, filter_data def _handle_list_firewall_rules(self, param): action_result = self.add_action_result(ActionResult(dict(param))) - direction = param.get('direction') + direction = param.get("direction") if direction and direction not in consts.DIRECTION_VALUE_LIST: - return action_result.set_status(phantom.APP_ERROR, consts.VALUE_LIST_VALIDATION_MESSAGE.format( - consts.DIRECTION_VALUE_LIST, "direction")) + return action_result.set_status( + phantom.APP_ERROR, consts.VALUE_LIST_VALIDATION_MESSAGE.format(consts.DIRECTION_VALUE_LIST, "direction") + ) if not self._init_session(action_result, param): return action_result.get_status() - command = 'netsh' + command = "netsh" ret_val, filter_data = self._create_filter(action_result, param) if phantom.is_fail(ret_val): return ret_val # There isn't a way to filter using the command, so we need to handle that in the parser - arguments = ['advfirewall', 'firewall', 'show', 'rule', 'name=all'] + arguments = ["advfirewall", "firewall", "show", "rule", "name=all"] ret_val = self._run_cmd(action_result, command, arguments, pc.list_firewall_rules, filter_data) return ret_val @@ -541,8 +523,8 @@ def _handle_delete_firewall_rule(self, param): return action_result.get_status() ps_script_base = '& "netsh.exe" "advfirewall" "firewall" "delete" "rule" ' - argument_str = '' - other = param.get('other') + argument_str = "" + other = param.get("other") if other: try: other_dict = json.loads(other) @@ -551,39 +533,39 @@ def _handle_delete_firewall_rule(self, param): phantom.APP_ERROR, "Error parsing JSON Object: {}".format(self._get_error_message_from_exception(e)) ) param.update(other_dict) - dir = param.get('dir') + dir = param.get("dir") if dir and dir not in consts.DIR_VALUE_LIST: - return action_result.set_status(phantom.APP_ERROR, consts.VALUE_LIST_VALIDATION_MESSAGE.format(consts.DIR_VALUE_LIST, 'dir')) + return action_result.set_status(phantom.APP_ERROR, consts.VALUE_LIST_VALIDATION_MESSAGE.format(consts.DIR_VALUE_LIST, "dir")) - val_map = { - "local_ip": "localip", - "local_port": "localport", - "remote_ip": "remoteip", - "remote_port": "remoteport" - } + val_map = {"local_ip": "localip", "local_port": "localport", "remote_ip": "remoteip", "remote_port": "remoteport"} valid_params = { - 'name', 'dir', - 'remote_ip', 'local_ip', 'remote_port', - 'local_port', 'protocol', 'program', - 'profile', 'service', - 'localip', 'remoteip', - 'localport', 'remoteport' + "name", + "dir", + "remote_ip", + "local_ip", + "remote_port", + "local_port", + "protocol", + "program", + "profile", + "service", + "localip", + "remoteip", + "localport", + "remoteport", } for k, v in six.iteritems(param): if k in valid_params: - argument = '"{}"'.format(self._sanitize_string('{}={}'.format(val_map.get(k, k), - self._handle_py_ver_compat_for_input_str(v)))) - argument_str = '{}{} '.format(argument_str, argument) + argument = '"{}"'.format(self._sanitize_string("{}={}".format(val_map.get(k, k), self._handle_py_ver_compat_for_input_str(v)))) + argument_str = "{}{} ".format(argument_str, argument) - ret_val = self._run_ps(action_result, '{}{}'.format(ps_script_base, argument_str), pc.delete_firewall_rule) + ret_val = self._run_ps(action_result, "{}{}".format(ps_script_base, argument_str), pc.delete_firewall_rule) if phantom.is_fail(ret_val): return ret_val return action_result.set_status( phantom.APP_SUCCESS, - "Successfully deleted firewall rule{}".format( - "s" if action_result.get_summary().get('rules_deleted', 0) > 1 else "" - ) + "Successfully deleted firewall rule{}".format("s" if action_result.get_summary().get("rules_deleted", 0) > 1 else ""), ) def _handle_block_ip(self, param): @@ -592,11 +574,11 @@ def _handle_block_ip(self, param): return action_result.get_status() ps_script = '& "netsh.exe" "advfirewall" "firewall" "add" "rule" ' - name = self._handle_py_ver_compat_for_input_str(param['name']) - remote_ip = self._handle_py_ver_compat_for_input_str(param['remote_ip']) + name = self._handle_py_ver_compat_for_input_str(param["name"]) + remote_ip = self._handle_py_ver_compat_for_input_str(param["remote_ip"]) - ps_script = '{}"{}" '.format(ps_script, self._sanitize_string('{}={}'.format('name', name))) - ps_script = '{}"{}" '.format(ps_script, self._sanitize_string('{}={}'.format('remoteip', remote_ip))) + ps_script = '{}"{}" '.format(ps_script, self._sanitize_string("{}={}".format("name", name))) + ps_script = '{}"{}" '.format(ps_script, self._sanitize_string("{}={}".format("remoteip", remote_ip))) ps_script = '{}"dir=in" "action=block"'.format(ps_script) ret_val = self._run_ps(action_result, ps_script, pc.check_exit_no_data_stdout) @@ -610,8 +592,8 @@ def _handle_create_firewall_rule(self, param): return action_result.get_status() ps_script_base = '& "netsh.exe" "advfirewall" "firewall" "add" "rule" ' - argument_str = '' - other = param.get('other') + argument_str = "" + other = param.get("other") if other: try: other_dict = json.loads(other) @@ -620,38 +602,45 @@ def _handle_create_firewall_rule(self, param): phantom.APP_ERROR, "Error parsing JSON Object: {}".format(self._get_error_message_from_exception(e)) ) param.update(other_dict) - dir = param.get('dir') + dir = param.get("dir") if dir and dir not in consts.DIR_VALUE_LIST: - return action_result.set_status(phantom.APP_ERROR, consts.VALUE_LIST_VALIDATION_MESSAGE.format(consts.DIR_VALUE_LIST, 'dir')) + return action_result.set_status(phantom.APP_ERROR, consts.VALUE_LIST_VALIDATION_MESSAGE.format(consts.DIR_VALUE_LIST, "dir")) - action = param.get('action') + action = param.get("action") if action and action not in consts.ACTION_VALUE_LIST: - return action_result.set_status(phantom.APP_ERROR, consts.VALUE_LIST_VALIDATION_MESSAGE.format(consts.ACTION_VALUE_LIST, 'action')) + return action_result.set_status(phantom.APP_ERROR, consts.VALUE_LIST_VALIDATION_MESSAGE.format(consts.ACTION_VALUE_LIST, "action")) - val_map = { - "local_ip": "localip", - "local_port": "localport", - "remote_ip": "remoteip", - "remote_port": "remoteport" - } + val_map = {"local_ip": "localip", "local_port": "localport", "remote_ip": "remoteip", "remote_port": "remoteport"} valid_params = { - 'name', 'dir', 'action', - 'remote_ip', 'local_ip', 'remote_port', - 'local_port', 'protocol', 'enable', - 'program', 'service', 'description', - 'interfacetype', 'rmtcomputergrp', 'rmtusrgrp', - 'edge', 'security', - 'localip', 'remoteip', - 'localport', 'remoteport' + "name", + "dir", + "action", + "remote_ip", + "local_ip", + "remote_port", + "local_port", + "protocol", + "enable", + "program", + "service", + "description", + "interfacetype", + "rmtcomputergrp", + "rmtusrgrp", + "edge", + "security", + "localip", + "remoteip", + "localport", + "remoteport", } for k, v in six.iteritems(param): if k in valid_params: - argument = '"{}"'.format(self._sanitize_string('{}={}'.format(val_map.get(k, k), - self._handle_py_ver_compat_for_input_str(v)))) - argument_str = '{}{} '.format(argument_str, argument) + argument = '"{}"'.format(self._sanitize_string("{}={}".format(val_map.get(k, k), self._handle_py_ver_compat_for_input_str(v)))) + argument_str = "{}{} ".format(argument_str, argument) - ret_val = self._run_ps(action_result, '{}{}'.format(ps_script_base, argument_str), pc.check_exit_no_data_stdout) + ret_val = self._run_ps(action_result, "{}{}".format(ps_script_base, argument_str), pc.check_exit_no_data_stdout) if phantom.is_fail(ret_val): return ret_val return action_result.set_status(phantom.APP_SUCCESS, "Successfully created firewall rule") @@ -673,7 +662,7 @@ def _handle_logoff_user(self, param): if not self._init_session(action_result, param): return action_result.get_status() - ret_val, session_id = self._validate_integer(action_result, param.get('session_id'), "session_id", True) + ret_val, session_id = self._validate_integer(action_result, param.get("session_id"), "session_id", True) if phantom.is_fail(ret_val): return action_result.get_status() @@ -691,23 +680,19 @@ def _get_system_volume(self, action_result): return RetVal(ret_val) volume = -1 - std_out = action_result.get_data()[0]['std_out'] + std_out = action_result.get_data()[0]["std_out"] if isinstance(std_out, bytes): # py3 - std_out = std_out.decode('UTF-8') + std_out = std_out.decode("UTF-8") try: for line in std_out.splitlines(): - if line.strip().lower().endswith('system'): + if line.strip().lower().endswith("system"): volume = int(line.split()[1]) except: - return RetVal(action_result.set_status( - phantom.APP_ERROR, "Error parsing diskpart output" - )) + return RetVal(action_result.set_status(phantom.APP_ERROR, "Error parsing diskpart output")) if volume == -1: - return RetVal(action_result.set_status( - phantom.APP_ERROR, "Could not find the System partition" - )) + return RetVal(action_result.set_status(phantom.APP_ERROR, "Could not find the System partition")) return phantom.APP_SUCCESS, volume @@ -731,10 +716,7 @@ def _handle_deactivate_partition(self, param): if not self._init_session(action_result, param): return action_result.get_status() - ret_val = self._update_system_volume( - action_result, - "inactive" - ) + ret_val = self._update_system_volume(action_result, "inactive") if phantom.is_fail(ret_val): return action_result.set_status(phantom.APP_ERROR, consts.WINRM_ERROR_PARTITION) @@ -745,10 +727,7 @@ def _handle_activate_partition(self, param): if not self._init_session(action_result, param): return action_result.get_status() - ret_val = self._update_system_volume( - action_result, - "active" - ) + ret_val = self._update_system_volume(action_result, "active") if phantom.is_fail(ret_val): return action_result.set_status(phantom.APP_ERROR, consts.WINRM_ERROR_PARTITION) @@ -759,9 +738,9 @@ def _handle_shutdown_system(self, param): if not self._init_session(action_result, param): return action_result.get_status() - ps_script = '& shutdown.exe /s /t 5 ' - comment = self._handle_py_ver_compat_for_input_str(param.get('comment')) - reason = self._handle_py_ver_compat_for_input_str(param.get('reason')) + ps_script = "& shutdown.exe /s /t 5 " + comment = self._handle_py_ver_compat_for_input_str(param.get("comment")) + reason = self._handle_py_ver_compat_for_input_str(param.get("reason")) if comment: ps_script = '{}/c "{}"'.format(ps_script, self._sanitize_string(comment)) if reason: @@ -777,9 +756,9 @@ def _handle_restart_system(self, param): if not self._init_session(action_result, param): return action_result.get_status() - ps_script = '& shutdown.exe /r /t 5 ' - comment = self._handle_py_ver_compat_for_input_str(param.get('comment')) - reason = self._handle_py_ver_compat_for_input_str(param.get('reason')) + ps_script = "& shutdown.exe /r /t 5 " + comment = self._handle_py_ver_compat_for_input_str(param.get("comment")) + reason = self._handle_py_ver_compat_for_input_str(param.get("reason")) if comment: ps_script = '{}/c "{}"'.format(ps_script, self._sanitize_string(comment)) if reason: @@ -794,32 +773,27 @@ def _handle_restart_system(self, param): def _format_list_applocker_script(self, action_result, location, ldap, xml=True, module=True): suffix = "-XML" if xml else "" if location.lower() not in consts.LOCATION_VALUE_LIST: - return action_result.set_status( - phantom.APP_ERROR, consts.VALUE_LIST_VALIDATION_MESSAGE.format(consts.LOCATION_VALUE_LIST, "location") - ), None + return ( + action_result.set_status(phantom.APP_ERROR, consts.VALUE_LIST_VALIDATION_MESSAGE.format(consts.LOCATION_VALUE_LIST, "location")), + None, + ) if location.lower() == "domain": if not ldap: - return action_result.set_status( - phantom.APP_ERROR, 'Error: Please include "ldap" with "domain"' - ), None + return action_result.set_status(phantom.APP_ERROR, 'Error: Please include "ldap" with "domain"'), None else: - args = { - "LDAP": ldap - } + args = {"LDAP": ldap} if module: - prefix = '{}Get-AppLockerPolicy -Domain'.format(consts.APPLOCKER_BASE_SCRIPT) + prefix = "{}Get-AppLockerPolicy -Domain".format(consts.APPLOCKER_BASE_SCRIPT) else: - prefix = 'Get-AppLockerPolicy -Domain' - ret_val, ps_script = self._create_ps_script( - action_result, args, cmd_prefix=prefix, cmd_suffix=suffix - ) + prefix = "Get-AppLockerPolicy -Domain" + ret_val, ps_script = self._create_ps_script(action_result, args, cmd_prefix=prefix, cmd_suffix=suffix) if phantom.is_fail(ret_val): return ret_val, None else: if module: - ps_script = '{0}Get-AppLockerPolicy -{1} {2}'.format(consts.APPLOCKER_BASE_SCRIPT, location, suffix) + ps_script = "{0}Get-AppLockerPolicy -{1} {2}".format(consts.APPLOCKER_BASE_SCRIPT, location, suffix) else: - ps_script = 'Get-AppLockerPolicy -{0} {1}'.format(location, suffix) + ps_script = "Get-AppLockerPolicy -{0} {1}".format(location, suffix) return phantom.APP_SUCCESS, ps_script @@ -828,8 +802,8 @@ def _handle_list_applocker_policies(self, param): if not self._init_session(action_result, param): return action_result.get_status() - location = param['location'] - ldap = self._handle_py_ver_compat_for_input_str(param.get('ldap')) + location = param["location"] + ldap = self._handle_py_ver_compat_for_input_str(param.get("ldap")) ret_val, ps_script = self._format_list_applocker_script(action_result, location, ldap) if phantom.is_fail(ret_val): return ret_val @@ -844,20 +818,15 @@ def _handle_create_applocker_policy(self, param): if not self._init_session(action_result, param): return action_result.get_status() - deny_allow = param['deny_allow'].lower() + deny_allow = param["deny_allow"].lower() if deny_allow not in consts.DENY_ALLOW_VALUE_LIST: return action_result.set_status( phantom.APP_ERROR, consts.VALUE_LIST_VALIDATION_MESSAGE.format(consts.DENY_ALLOW_VALUE_LIST, "deny_allow") ) - file_path = self._handle_py_ver_compat_for_input_str(param['file_path']) - args_new_policy = { - 'User': param.get('user'), - 'RuleNamePrefix': param.get('rule_name_prefix') - } - args_set_policy = { - 'LDAP': param.get('ldap') - } + file_path = self._handle_py_ver_compat_for_input_str(param["file_path"]) + args_new_policy = {"User": param.get("user"), "RuleNamePrefix": param.get("rule_name_prefix")} + args_set_policy = {"LDAP": param.get("ldap")} ret_val, new_policy_str = self._create_ps_script(action_result, args_new_policy) if phantom.is_fail(ret_val): return ret_val @@ -867,13 +836,15 @@ def _handle_create_applocker_policy(self, param): return ret_val if deny_allow == "allow": - ps_script = '{}{}'.format(consts.APPLOCKER_BASE_SCRIPT, consts.APPLOCKER_CREATE_POLICY.format( - self._sanitize_string(file_path), new_policy_str, set_policy_str - )) + ps_script = "{}{}".format( + consts.APPLOCKER_BASE_SCRIPT, + consts.APPLOCKER_CREATE_POLICY.format(self._sanitize_string(file_path), new_policy_str, set_policy_str), + ) else: - ps_script = '{}{}'.format(consts.APPLOCKER_BASE_SCRIPT, consts.APPLOCKER_CREATE_POLICY_DENY.format( - self._sanitize_string(file_path), new_policy_str, set_policy_str - )) + ps_script = "{}{}".format( + consts.APPLOCKER_BASE_SCRIPT, + consts.APPLOCKER_CREATE_POLICY_DENY.format(self._sanitize_string(file_path), new_policy_str, set_policy_str), + ) self.debug_print(ps_script) @@ -888,18 +859,16 @@ def _handle_delete_applocker_policy(self, param): if not self._init_session(action_result, param): return action_result.get_status() - policy_id = self._handle_py_ver_compat_for_input_str(param['applocker_policy_id']) - if not re.match(r'[\w\d-]*', policy_id): - return action_result.set_status( - phantom.APP_ERROR, "Invalid AppLocker Policy ID" - ) - ldap = param.get('ldap') + policy_id = self._handle_py_ver_compat_for_input_str(param["applocker_policy_id"]) + if not re.match(r"[\w\d-]*", policy_id): + return action_result.set_status(phantom.APP_ERROR, "Invalid AppLocker Policy ID") + ldap = param.get("ldap") if ldap: - location = 'domain' + location = "domain" else: - location = 'local' + location = "local" - ret_val, set_policy_str = self._create_ps_script(action_result, {'LDAP': ldap}) + ret_val, set_policy_str = self._create_ps_script(action_result, {"LDAP": ldap}) if phantom.is_fail(ret_val): return ret_val @@ -909,9 +878,9 @@ def _handle_delete_applocker_policy(self, param): action_result = tmp_action_result return ret_val - ps_script = '{}{}'.format(consts.APPLOCKER_BASE_SCRIPT, consts.APPLOCKER_DELETE_POLICY.format( - self._sanitize_string(policy_id), ps_script, set_policy_str - )) + ps_script = "{}{}".format( + consts.APPLOCKER_BASE_SCRIPT, consts.APPLOCKER_DELETE_POLICY.format(self._sanitize_string(policy_id), ps_script, set_policy_str) + ) ret_val = self._run_ps(action_result, ps_script, parse_callback=pc.check_exit_no_data2) if phantom.is_fail(ret_val): @@ -924,16 +893,13 @@ def _handle_get_file(self, param): if not self._init_session(action_result, param): return action_result.get_status() - file_path = self._handle_py_ver_compat_for_input_str(param['file_path']) + file_path = self._handle_py_ver_compat_for_input_str(param["file_path"]) file_path = self._sanitize_string(file_path) script_str = consts.GET_FILE.format(file_path) - additional_data = { - 'container_id': self.get_container_id(), - 'file_name': ntpath.split(file_path)[-1] - } + additional_data = {"container_id": self.get_container_id(), "file_name": ntpath.split(file_path)[-1]} ret_val = self._run_ps(action_result, script_str, pc.decodeb64_add_to_vault, additional_data) if phantom.is_fail(ret_val): return ret_val @@ -946,22 +912,21 @@ def _handle_send_file(self, param): return action_result.get_status() try: - vault_id = self._handle_py_ver_compat_for_input_str(param['vault_id']) + vault_id = self._handle_py_ver_compat_for_input_str(param["vault_id"]) success, message, file_info = phantom_rules.vault_info(vault_id=vault_id) if not file_info: return action_result.set_status(phantom.APP_ERROR, consts.WINRM_ERROR_INVALID_VAULT_ID) - path = list(file_info)[0].get('path') + path = list(file_info)[0].get("path") except: return action_result.set_status(phantom.APP_ERROR, consts.WINRM_ERROR_INVALID_VAULT_ID) - destination = self._handle_py_ver_compat_for_input_str(param['destination']) + destination = self._handle_py_ver_compat_for_input_str(param["destination"]) try: - with open(path, 'rb') as fp: + with open(path, "rb") as fp: encoded_file = base64.b64encode(fp.read()) except Exception as e: - return action_result.set_status(phantom.APP_ERROR, - "Unable to base64 encode file", self._get_error_message_from_exception(e)) + return action_result.set_status(phantom.APP_ERROR, "Unable to base64 encode file", self._get_error_message_from_exception(e)) destination = self._sanitize_string(destination) @@ -970,17 +935,13 @@ def _handle_send_file(self, param): try: chunks = textwrap.wrap(encoded_file, 1650) except TypeError: - chunks = textwrap.wrap(encoded_file.decode('UTF-8'), 1650) + chunks = textwrap.wrap(encoded_file.decode("UTF-8"), 1650) num_chunks = len(chunks) for i, chunk in enumerate(chunks): - ps_script = consts.SEND_FILE_START.format( - b64string_chunk=chunk, - file_path=destination, - action=">>" if sent_first else ">" - ) + ps_script = consts.SEND_FILE_START.format(b64string_chunk=chunk, file_path=destination, action=">>" if sent_first else ">") # The final chunk if i == num_chunks - 1: - ps_script = '{}{}'.format(ps_script, consts.SEND_FILE_END) + ps_script = "{}{}".format(ps_script, consts.SEND_FILE_END) self.save_progress("Sending chunk {} of {}".format(i + 1, num_chunks)) ret_val = self._run_ps(action_result, ps_script, parse_callback=pc.ensure_no_errors) if phantom.is_fail(ret_val): @@ -995,13 +956,10 @@ def _handle_copy_file(self, param): if not self._init_session(action_result, param): return action_result.get_status() - path_from = self._handle_py_ver_compat_for_input_str(param['from']) - path_to = self._handle_py_ver_compat_for_input_str(param['to']) + path_from = self._handle_py_ver_compat_for_input_str(param["from"]) + path_to = self._handle_py_ver_compat_for_input_str(param["to"]) - ps_script = "& copy {0} {1}".format( - self._sanitize_string(path_from), - self._sanitize_string(path_to) - ) + ps_script = "& copy {0} {1}".format(self._sanitize_string(path_from), self._sanitize_string(path_to)) ret_val = self._run_ps(action_result, ps_script, parse_callback=pc.check_exit_no_data2) if phantom.is_fail(ret_val): @@ -1014,13 +972,10 @@ def _handle_delete_file(self, param): if not self._init_session(action_result, param): return action_result.get_status() - file_path = self._handle_py_ver_compat_for_input_str(param['file_path']) - force_delete = '-Force ' if param.get('force') else '' + file_path = self._handle_py_ver_compat_for_input_str(param["file_path"]) + force_delete = "-Force " if param.get("force") else "" - ps_script = "& del {0}{1}".format( - force_delete, - self._sanitize_string(file_path) - ) + ps_script = "& del {0}{1}".format(force_delete, self._sanitize_string(file_path)) ret_val = self._run_ps(action_result, ps_script, parse_callback=pc.check_exit_no_data2) if phantom.is_fail(ret_val): @@ -1034,23 +989,18 @@ def _handle_run_command(self, param): return action_result.get_status() # Validate Parameters - command_id = param.get('command_id') - shell_id = param.get('shell_id') + command_id = param.get("command_id") + shell_id = param.get("shell_id") if command_id and not shell_id or shell_id and not command_id: - return action_result.set_status( - phantom.APP_ERROR, "Please specify 'command_id' and 'shell_id' together" - ) - command = self._handle_py_ver_compat_for_input_str(param.get('command')) - arguments = self._handle_py_ver_compat_for_input_str(param.get('arguments')) + return action_result.set_status(phantom.APP_ERROR, "Please specify 'command_id' and 'shell_id' together") + command = self._handle_py_ver_compat_for_input_str(param.get("command")) + arguments = self._handle_py_ver_compat_for_input_str(param.get("arguments")) if command is None and command_id is None: - return action_result.set_status( - phantom.APP_ERROR, "Please specify either 'command' or 'command_id' + 'shell_id'" - ) - async_ = param.get('async', False) + return action_result.set_status(phantom.APP_ERROR, "Please specify either 'command' or 'command_id' + 'shell_id'") + async_ = param.get("async", False) if not async_: - ret_val, custom_parser = self._get_custom_parser_method(action_result, - self._handle_py_ver_compat_for_input_str(param.get('parser'))) + ret_val, custom_parser = self._get_custom_parser_method(action_result, self._handle_py_ver_compat_for_input_str(param.get("parser"))) if phantom.is_fail(ret_val): return ret_val else: @@ -1074,23 +1024,20 @@ def _handle_run_script(self, param): return action_result.get_status() # Validate Parameters - command_id = param.get('command_id') - shell_id = param.get('shell_id') + command_id = param.get("command_id") + shell_id = param.get("shell_id") if command_id and not shell_id or shell_id and not command_id: - return action_result.set_status( - phantom.APP_ERROR, "Please specify 'command_id' and 'shell_id' together" - ) - script_file = self._handle_py_ver_compat_for_input_str(param.get('script_file')) - script_str = param.get('script_str') + return action_result.set_status(phantom.APP_ERROR, "Please specify 'command_id' and 'shell_id' together") + script_file = self._handle_py_ver_compat_for_input_str(param.get("script_file")) + script_str = param.get("script_str") if script_file is None and script_str is None and command_id is None: return action_result.set_status( phantom.APP_ERROR, "Please specify either a 'script_file', 'script_str', or 'command_id' + 'shell_id'" ) - async_ = param.get('async', False) + async_ = param.get("async", False) if not async_: - ret_val, custom_parser = self._get_custom_parser_method(action_result, - self._handle_py_ver_compat_for_input_str(param.get('parser'))) + ret_val, custom_parser = self._get_custom_parser_method(action_result, self._handle_py_ver_compat_for_input_str(param.get("parser"))) if phantom.is_fail(ret_val): return ret_val else: @@ -1118,10 +1065,10 @@ def handle_action(self, param): self.debug_print("action_id", self.get_action_identifier()) - if action_id == 'test_connectivity': + if action_id == "test_connectivity": ret_val = self._handle_test_connectivity(param) - elif action_id == 'run_command': + elif action_id == "run_command": ret_val = self._handle_run_command(param) elif action_id == "run_script": @@ -1205,7 +1152,7 @@ def finalize(self): return phantom.APP_SUCCESS -if __name__ == '__main__': +if __name__ == "__main__": import argparse @@ -1215,10 +1162,10 @@ def finalize(self): argparser = argparse.ArgumentParser() - argparser.add_argument('input_test_json', help='Input Test JSON file') - argparser.add_argument('-u', '--username', help='username', required=False) - argparser.add_argument('-p', '--password', help='password', required=False) - argparser.add_argument('-v', '--verify', action='store_true', help='verify', required=False, default=False) + argparser.add_argument("input_test_json", help="Input Test JSON file") + argparser.add_argument("-u", "--username", help="username", required=False) + argparser.add_argument("-p", "--password", help="password", required=False) + argparser.add_argument("-v", "--verify", action="store_true", help="verify", required=False, default=False) args = argparser.parse_args() session_id = None @@ -1231,28 +1178,30 @@ def finalize(self): # User specified a username but not a password, so ask import getpass + password = getpass.getpass("Password: ") if username and password: try: print("Accessing the Login page") - login_url = BaseConnector._get_phantom_base_url() + 'login' - r = requests.get(login_url, verify=verify) # nosemgrep: python.requests.best-practice.use-timeout.use-timeout - csrftoken = r.cookies['csrftoken'] + login_url = BaseConnector._get_phantom_base_url() + "login" + r = requests.get(login_url, verify=verify) # nosemgrep: python.requests.best-practice.use-timeout.use-timeout + csrftoken = r.cookies["csrftoken"] data = dict() - data['username'] = username - data['password'] = password - data['csrfmiddlewaretoken'] = csrftoken + data["username"] = username + data["password"] = password + data["csrfmiddlewaretoken"] = csrftoken headers = dict() - headers['Cookie'] = 'csrftoken=' + csrftoken - headers['Referer'] = login_url + headers["Cookie"] = "csrftoken=" + csrftoken + headers["Referer"] = login_url print("Logging into Platform to get the session id") - r2 = requests.post(login_url, # nosemgrep: python.requests.best-practice.use-timeout.use-timeout - verify=verify, data=data, headers=headers) - session_id = r2.cookies['sessionid'] + r2 = requests.post( + login_url, verify=verify, data=data, headers=headers # nosemgrep: python.requests.best-practice.use-timeout.use-timeout + ) + session_id = r2.cookies["sessionid"] except Exception as e: print("Unable to get session id from the platfrom. Error: " + str(e)) sys.exit(1) @@ -1266,8 +1215,8 @@ def finalize(self): connector.print_progress_message = True if session_id is not None: - in_json['user_session_token'] = session_id - connector._set_csrf_info(csrftoken, headers['Referer']) + in_json["user_session_token"] = session_id + connector._set_csrf_info(csrftoken, headers["Referer"]) ret_val = connector._handle_action(json.dumps(in_json), None) print(json.dumps(json.loads(ret_val), indent=4)) diff --git a/winrm_consts.py b/winrm_consts.py index ab14680..db69b25 100644 --- a/winrm_consts.py +++ b/winrm_consts.py @@ -16,7 +16,7 @@ Import-Module AppLocker """ -APPLOCKER_GET_POLICIES = 'Get-AppLockerPolicy -{0} {1}' +APPLOCKER_GET_POLICIES = "Get-AppLockerPolicy -{0} {1}" APPLOCKER_CREATE_POLICY = """ $Policy = Get-ChildItem "{0}" | Get-AppLockerFileInformation | New-AppLockerPolicy -RuleType Path,Hash {1} @@ -91,15 +91,16 @@ # Constants relating to '_validate_integer' WINRM_ERROR_INVALID_INT = 'Please provide a valid {msg} integer value in the "{param}"' -WINRM_ERROR_PARTITION = "Failed to fetch system volume, Please check the asset configuration and|or \"ip hostname\" parameter" +WINRM_ERROR_PARTITION = 'Failed to fetch system volume, Please check the asset configuration and|or "ip hostname" parameter' WINRM_ERROR_INVALID_VAULT_ID = "Could not retrieve vault file" # Constants relating to '_get_error_message_from_exception' WINRM_ERROR_CODE_MESSAGE = "Error code unavailable" WINRM_ERROR_MESSAGE_UNAVAILABLE = "Error message unavailable. Please check the asset configuration and|or action parameters" WINRM_PARSE_ERROR_MESSAGE = "Unable to parse the error message. Please check the asset configuration and|or action parameters" -WINRM_TYPE_ERROR_MESSAGE = ("Error occurred while connecting to the Winrm Server. " - "Please check the asset configuration and|or the action parameters") +WINRM_TYPE_ERROR_MESSAGE = ( + "Error occurred while connecting to the Winrm Server. " "Please check the asset configuration and|or the action parameters" +) # Constants relating to value_list check DIRECTION_VALUE_LIST = ["in", "out"]