
[INFO] custom pre-processing function in ML connectors is returning Invalid JSON in payload #2346

Closed
toyaokeke opened this issue Apr 22, 2024 · 15 comments

@toyaokeke

toyaokeke commented Apr 22, 2024

What is the bug?
I am trying to deploy an ML model that connects to an external resource.

I am trying to write a pre-process function that passes the following request body to my Sagemaker endpoint:

{
    "inputs": [
        {
            "name": "query",
            "shape": [${parameters.input.length}, 1],
            "datatype": "BYTES",
            "data": ${parameters.input}
        }
    ]
}
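For reference, this is the fully substituted body the endpoint should receive. A minimal Python sketch of that substitution (illustrative only; the function name is hypothetical and this is not connector code):

```python
import json

def render_request_body(texts):
    # Build the body the Sagemaker endpoint expects, with the
    # ${parameters.input.length} and ${parameters.input} placeholders
    # already substituted from the list of input texts.
    return json.dumps({
        "inputs": [{
            "name": "query",
            "shape": [len(texts), 1],
            "datatype": "BYTES",
            "data": texts,
        }]
    })

body = render_request_body(["brand 1", "category 1"])
```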

I created the ML connector and deployed the model following the steps below. When I simulate the ingest pipeline, I get an Invalid JSON in payload error (shown below). Could someone help me understand why the pre-processing function is not working as expected?

How can one reproduce the bug?
Steps to reproduce the behavior:

  1. Create the connector
POST /_plugins/_ml/connectors/_create
{
	"name": "my_connector",
	"description": "My Connector",
	"version": 1,
	"protocol": "aws_sigv4",
	"credential": {
		"roleArn": "my_role_arn"
	},
	"parameters": {
		"region": "us-west-2",
		"service_name": "sagemaker"
	},
	"actions": [
		{
			"action_type": "predict",
			"method": "POST",
			"headers": {
				"content-type": "application/json"
			},
			"url": "my_sagemaker_endpoint",
			"request_body": "{ \"inputs\": [{ \"name\": \"query\", \"shape\": [${parameters.length}, 1], \"datatype\": \"BYTES\", \"data\": ${parameters.input} }] }",
			"pre_process_function": "return '{\"length\": ' + params.text_docs.length + ', \"input\": ' + params.text_docs + ' }'",
			"post_process_function": "def data = params.outputs[0].data; def n = Math.ceil(data.length / 512);def embeddings = [];for (int i = 0; i < n; i++) {embeddings.add(Arrays.copyOfRange(data, i * 512, Math.min(data.length, 512 * (i + 1))));}return '{\"name\": \"sentence_embedding\", \"data_type\": \"FLOAT32\", \"shape\": [' + n + '], \"data\": ' + embeddings + '}';"
		}
	]
}
  2. Register and deploy the model
POST /_plugins/_ml/models/_register
{
    "name": "my_model",
    "function_name": "remote",
    "model_group_id": "my_model_group_id",
    "description": "My Model",
    "connector_id": "my_connector_id"
}
POST /_plugins/_ml/models/my_model_id/_deploy
  3. Create the ingest pipeline
PUT /_ingest/pipeline/my-ingest-pipeline
{
  "description": "My ingest pipeline",
  "processors": [
    {
      "convert": {
        "field": "brand.id",
        "type": "string"
      }
    },
    {
      "convert": {
        "field": "category.id",
        "type": "string"
      }
    },
    {
      "text_embedding": {
        "model_id": "my_model_id",
        "field_map": {
          "name": {
            "en": "name_vector"
          },
          "brand": {
            "name": {
              "en": "brand_name_vector"
            }
          },
          "category": {
            "name": {
              "en": "category_name_vector"
            }
          }
        }
      }
    }
  ]
}
  4. Simulate the pipeline and observe the error
POST _ingest/pipeline/my-ingest-pipeline/_simulate
{
  "docs": [
    {
      "_index": "my-index",
      "_id": "1",
      "_source": {
        "brand": {
          "id": 1,
          "name": {
            "en": "brand 1"
          }
        },
        "category": {
          "id": 1,
          "name": {
            "en": "category 1"
          }
        }
      }
    }
  ]
}

What is the expected behavior?
Expected the vector embeddings brand_name_vector and category_name_vector to be generated.

What is your host/environment?

  • Environment: AWS OpenSearch - Managed Cluster
  • OS: OpenSearch
  • Version: 2.11

Do you have any screenshots?

{
  "docs": [
    {
      "error": {
        "root_cause": [
          {
            "type": "illegal_argument_exception",
            "reason": "Invalid JSON in payload"
          }
        ],
        "type": "illegal_argument_exception",
        "reason": "Invalid JSON in payload"
      }
    }
  ]
}

Do you have any additional context?
I am aware that the request must be signed using AWS SigV4 with the correct keys. The issue is not with creating the connector; I am able to create it fine.

The error described above occurs after I deploy the model using the connector and then simulate the ingest pipeline.

@toyaokeke toyaokeke added bug Something isn't working untriaged labels Apr 22, 2024
@ylwu-amzn
Collaborator

ylwu-amzn commented Apr 26, 2024

Your pre-processing function is supposed to translate the text docs into your model input, but it does not look correct. Can you try this?

"pre_process_function": " StringBuilder builder = new StringBuilder('[');\n    \n    for (int i=0; i<params.text_docs.length; i ++) {\n      builder.append('\"');\n      builder.append(escape(params.text_docs[i]));\n      builder.append('\"');\n      if (i<params.text_docs.length - 1) {\n        builder.append(',');\n      }\n    }\n    builder.append(']');\n    \n    def parameters = '{\"length\": ' + params.text_docs.length + ', \"input\": ' + builder + ' }';\n    return  '{\"parameters\": ' + parameters + '}';\n     "
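The Painless function above hand-builds a JSON string. As a sanity check, here is a rough Python equivalent of the same logic (a sketch, not ml-commons code; json.dumps stands in for the escape() helper):

```python
import json

def pre_process(text_docs):
    # Build a JSON array of escaped strings, as the Painless loop does;
    # json.dumps handles the character escaping performed by escape().
    input_json = "[" + ",".join(json.dumps(doc) for doc in text_docs) + "]"
    parameters = '{"length": ' + str(len(text_docs)) + ', "input": ' + input_json + ' }'
    return '{"parameters": ' + parameters + '}'

payload = pre_process(["brand 1", "category 1"])
```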

For the post-processing function, we need to know your model's output. Can you share the raw model output?

@ylwu-amzn ylwu-amzn removed the bug Something isn't working label Apr 26, 2024
@toyaokeke
Author

Hi @ylwu-amzn, thank you for looking into this!

The raw output would look like [embeddings1, embeddings2, ..., embeddingsN],
where each embeddings entry is a float[].

@toyaokeke
Author

toyaokeke commented Apr 29, 2024

Also, I noticed an escape method in the script you added. Is that something that exists in the Painless scripting language, or something I need to create myself?

Or perhaps I could use this if the Apache Commons package is available in OpenSearch?

{
  "docs": [
    {
      "error": {
        "root_cause": [
          {
            "type": "script_exception",
            "reason": "compile error",
            "script_stack": [
              """... ');
      builder.append(escape(params.text_docs[i ...""",
              "                             ^---- HERE"
            ],
            "script": """ StringBuilder builder = new StringBuilder('[');
    
    for (int i=0; i<params.text_docs.length; i ++) {
      builder.append('"');
      builder.append(escape(params.text_docs[i]));
      builder.append('"');
      if (i<params.text_docs.length - 1) {
        builder.append(',');
      }
    }
    builder.append(']');
    
    def parameters = '{"length": ' + params.text_docs.length + ', "input": ' + builder + ' }';
    return  '{"parameters": ' + parameters + '}';
     """,
            "lang": "painless",
            "position": {
              "offset": 155,
              "start": 130,
              "end": 180
            }
          }
        ],
        "type": "script_exception",
        "reason": "compile error",
        "script_stack": [
          """... ');
      builder.append(escape(params.text_docs[i ...""",
          "                             ^---- HERE"
        ],
        "script": """ StringBuilder builder = new StringBuilder('[');
    
    for (int i=0; i<params.text_docs.length; i ++) {
      builder.append('"');
      builder.append(escape(params.text_docs[i]));
      builder.append('"');
      if (i<params.text_docs.length - 1) {
        builder.append(',');
      }
    }
    builder.append(']');
    
    def parameters = '{"length": ' + params.text_docs.length + ', "input": ' + builder + ' }';
    return  '{"parameters": ' + parameters + '}';
     """,
        "lang": "painless",
        "position": {
          "offset": 155,
          "start": 130,
          "end": 180
        },
        "caused_by": {
          "type": "illegal_argument_exception",
          "reason": "Unknown call [escape] with [[org.opensearch.painless.node.EBrace@449daf]] arguments."
        }
      }
    }
  ]
}

@ylwu-amzn
Collaborator

ylwu-amzn commented May 2, 2024

also I noticed an ecape method that you added to the script. Is that something that exists in Painless scripting language or something I will need to create?

No, that's a function we added in ml-commons (code link) in the 2.12 release. If you are using an older version, you can manually copy the escape function into your pre/post process function.

You can't use the Apache Commons package.

@ylwu-amzn
Collaborator

the raw output would look like [embeddings1, embeddings2, ..., embeddingsN]
where embeddings is float[]

Just to confirm: the whole raw output doesn't contain any key? For example

{
  "embeddings": [ 
      float[],  float[]
  ]
}

If the raw output is just [ float[], float[] ], I think you can try the default post-process function.

@toyaokeke toyaokeke changed the title [BUG] custom pre-processing function in ML connectors is returning Invalid JSON in payload [INFO] custom pre-processing function in ML connectors is returning Invalid JSON in payload May 6, 2024
@toyaokeke
Author

Thank you @ylwu-amzn! The pre-process function almost worked. However, the request body sent to my Sagemaker endpoint now looks like

{
  "inputs": [
    {
      "name": "query",
      "shape": [
        2.0, // error is because this is not an integer
        1
      ],
      "datatype": "BYTES",
      "data": [
        "brand 1",
        "category 1"
      ]
    }
  ]
}

I have tried

  1. type cast int paramsLength = (int) params.text_docs.length
  2. returning the integer value int paramsLength = params.text_docs.length.intValue()
  3. rounding to nearest integer int paramsLength = Math.round(params.text_docs.length)

All three approaches still produced a non-integer in the request body sent to Sagemaker.

See the error below from simulating the ingest pipeline:

{
  "docs": [
    {
      "error": {
        "root_cause": [
          {
            "type": "status_exception",
            "reason": """Error from remote service: {"ErrorCode":"CLIENT_ERROR_FROM_MODEL","LogStreamArn":"log_arn","Message":"Received client error (400) from primary with message \"{\"error\":\"Unable to parse 'shape': attempt to access JSON non-unsigned-integer as unsigned-integer\"}\". See log_url in account account_number for more information.","OriginalMessage":"{\"error\":\"Unable to parse 'shape': attempt to access JSON non-unsigned-integer as unsigned-integer\"}","OriginalStatusCode":400}"""
          }
        ],
        "type": "status_exception",
        "reason": """Error from remote service: {"ErrorCode":"CLIENT_ERROR_FROM_MODEL","LogStreamArn":"my_log_arn","Message":"Received client error (400) from primary with message \"{\"error\":\"Unable to parse 'shape': attempt to access JSON non-unsigned-integer as unsigned-integer\"}\". See log_url in account account_number for more information.","OriginalMessage":"{\"error\":\"Unable to parse 'shape': attempt to access JSON non-unsigned-integer as unsigned-integer\"}","OriginalStatusCode":400}"""
      }
    }
  ]
}

@ylwu-amzn
Collaborator

Can you try this? Wrap params.text_docs.length in \" quotes:

"pre_process_function": " StringBuilder builder = new StringBuilder('[');\n    \n    for (int i=0; i<params.text_docs.length; i ++) {\n      builder.append('\"');\n      builder.append(escape(params.text_docs[i]));\n      builder.append('\"');\n      if (i<params.text_docs.length - 1) {\n        builder.append(',');\n      }\n    }\n    builder.append(']');\n    \n    def parameters = '{\"length\": \"' + params.text_docs.length + '\", \"input\": ' + builder + ' }';\n    return  '{\"parameters\": ' + parameters + '}';\n     "
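A sketch of why the quoting helps. This assumes the connector stores numeric parameters as doubles, which is a guess based on the observed "2.0" behavior: emitting the length as the string "2" and splicing it unquoted into the request_body template renders a plain integer.

```python
import json

# Hypothetical illustration of the substitution, not connector internals.
# A numeric parameter stored as a double stringifies as "2.0", which
# Sagemaker rejects for 'shape'; the string parameter "2" renders cleanly.
length_as_double = 2.0
length_as_string = "2"
template = '{"shape": [${parameters.length}, 1]}'

bad = template.replace("${parameters.length}", str(length_as_double))
good = template.replace("${parameters.length}", length_as_string)
```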

@toyaokeke
Author

I tried your suggestion. The request body is now correct ✅

{
  "inputs": [
    {
      "name": "query",
      "shape": [
        2, // correct, it is now integer
        1
      ],
      "datatype": "BYTES",
      "data": [
        "brand 1",
        "category 1"
      ]
    }
  ]
}

But I am now getting this error in the simulator:

{
  "docs": [
    {
      "error": {
        "root_cause": [
          {
            "type": "class_cast_exception",
            "reason": "class java.lang.String cannot be cast to class java.util.List (java.lang.String and java.util.List are in module java.base of loader 'bootstrap')"
          }
        ],
        "type": "class_cast_exception",
        "reason": "class java.lang.String cannot be cast to class java.util.List (java.lang.String and java.util.List are in module java.base of loader 'bootstrap')"
      }
    }
  ]
}

@ylwu-amzn
Collaborator

"class java.lang.String cannot be cast to class java.util.List (java.lang.String and java.util.List are in module java.base of loader 'bootstrap')"

It's hard to figure out the cause from this error alone. Can you share the exception trace from the logs?

@toyaokeke
Author

I do not have a log trace because the request never reached the Sagemaker endpoint. I was expecting the error to contain Error from remote service, but it looks like the error occurs in the ML Commons Framework before the request is sent.

@ylwu-amzn
Collaborator

ylwu-amzn commented May 7, 2024

It will be hard to guess what's wrong from here. If possible, reach out to me on OpenSearch Slack and we can jump on a call; you can join the public ml channel.

@toyaokeke
Author

Thank you @ylwu-amzn for joining a call to debug the issue!

Here is the raw output of the Sagemaker endpoint:

{
	"model_name": "search_ensemble",
	"model_version": "1",
	"parameters": {
		"sequence_id": 0,
		"sequence_start": false,
		"sequence_end": false
	},
	"outputs": [
		{
			"name": "outputs",
			"datatype": "FP32",
			"shape": [
				2,
				512
			],
			"data": [ // (1024 x 1) array
				-0.3834260106086731,
				-0.36356380581855776,
				-0.25114601850509646,
				-0.12556827068328858,
				-0.0514649897813797,
                ...
			]
		}
	]
}

It turns out the raw output is not a 2D array but a JSON object.
The data array is one-dimensional, so it will need to be sliced into the correct shape.
We should be able to rewrite the custom post-process function now based on this information. I will continue working on that while you investigate what the function should look like on your side.

Thanks again 🙏🏿

@ylwu-amzn
Collaborator

ylwu-amzn commented May 10, 2024

@toyaokeke Can you try these pre/post process function?

"pre_process_function": " StringBuilder builder = new StringBuilder('[');\n    \n    for (int i=0; i<params.text_docs.length; i ++) {\n      builder.append('\"');\n      builder.append(escape(params.text_docs[i]));\n      builder.append('\"');\n      if (i<params.text_docs.length - 1) {\n        builder.append(',');\n      }\n    }\n    builder.append(']');\n    \n    def parameters = '{\"length\": \"' + params.text_docs.length + '\", \"input\": ' + builder + ' }';\n    return  '{\"parameters\": ' + parameters + '}';\n     ",

"post_process_function": "\n      \n      def dataType = \"FLOAT32\";\n      \n      \n      if (params.outputs == null || params.outputs.length == 0)\n      {\n          return 'no embedding generated';\n      }\n      def outputs = params.outputs;\n      def embedding_output = outputs[0];\n      def embedding_num = embedding_output.shape[0].intValue();\n      def embedding_dimension = embedding_output.shape[1].intValue();\n      def embedding_data = embedding_output.data;\n      \n      def resultBuilder = new StringBuilder(\"[\");\n      for (int i=0; i<embedding_num; i++) {\n        resultBuilder.append('{\"name\": \"sentence_embedding\", \"data_type\": \"FLOAT32\", \"shape\": [');\n        resultBuilder.append(embedding_dimension).append('],');\n        \n        resultBuilder.append('\"data\": [');\n        for (int j=i*embedding_dimension; j<(i+1)*embedding_dimension; j++) {\n          resultBuilder.append(embedding_data[j]);\n          if (j<(i+1)*embedding_dimension - 1) {\n            resultBuilder.append(',');\n          }\n        }\n        resultBuilder.append(']}');\n        if (i<embedding_num-1) {\n          resultBuilder.append(',');\n        }\n      }\n      resultBuilder.append(']');\n      \n      return resultBuilder.toString();\n    "
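The post-process function above slices the flat data array back into per-document embeddings. A rough Python equivalent of that slicing logic (a sketch with a simplified input, not the Painless code itself):

```python
def post_process(outputs):
    # Slice the flat data array into shape[0] embeddings of
    # shape[1] floats each, mirroring the Painless loop.
    out = outputs[0]
    num = int(out["shape"][0])   # number of embeddings
    dim = int(out["shape"][1])   # dimension of each embedding
    data = out["data"]
    return [data[i * dim:(i + 1) * dim] for i in range(num)]

# Toy example: two 3-dimensional embeddings flattened into one array
# (the real endpoint returns two 512-dimensional embeddings).
embeddings = post_process([{"shape": [2, 3],
                            "data": [0.1, 0.2, 0.3, 0.4, 0.5, 0.6]}])
```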

Edit: As you are using OS 2.11, you need to add the escape method to the pre-process function yourself. The whole pre-process function, with the escape function included, is:

"pre_process_function": "\n    String escape(def input) { \n      if (input.contains(\"\\\\\")) {\n        input \u003d input.replace(\"\\\\\", \"\\\\\\\\\");\n      }\n      if (input.contains(\"\\\"\")) {\n        input \u003d input.replace(\"\\\"\", \"\\\\\\\"\");\n      }\n      if (input.contains(\u0027\r\u0027)) {\n        input \u003d input \u003d input.replace(\u0027\r\u0027, \u0027\\\\r\u0027);\n      }\n      if (input.contains(\"\\\\t\")) {\n        input \u003d input.replace(\"\\\\t\", \"\\\\\\\\\\\\t\");\n      }\n      if (input.contains(\u0027\n\u0027)) {\n        input \u003d input.replace(\u0027\n\u0027, \u0027\\\\n\u0027);\n      }\n      if (input.contains(\u0027\b\u0027)) {\n        input \u003d input.replace(\u0027\b\u0027, \u0027\\\\b\u0027);\n      }\n      if (input.contains(\u0027\f\u0027)) {\n        input \u003d input.replace(\u0027\f\u0027, \u0027\\\\f\u0027);\n      }\n      return input;\n    }\n StringBuilder builder = new StringBuilder('[');\n    \n    for (int i=0; i<params.text_docs.length; i ++) {\n      builder.append('\"');\n      builder.append(escape(params.text_docs[i]));\n      builder.append('\"');\n      if (i<params.text_docs.length - 1) {\n        builder.append(',');\n      }\n    }\n    builder.append(']');\n    \n    def parameters = '{\"length\": \"' + params.text_docs.length + '\", \"input\": ' + builder + ' }';\n    return  '{\"parameters\": ' + parameters + '}';\n     ",

The above pre_process_function contains Unicode escape sequences; alternatively, you can use this:

"pre_process_function": "\n    String escape(def input) { \n       if (input.contains(\"\\\\\")) {\n        input = input.replace(\"\\\\\", \"\\\\\\\\\");\n      }\n      if (input.contains(\"\\\"\")) {\n        input = input.replace(\"\\\"\", \"\\\\\\\"\");\n      }\n      if (input.contains('\r')) {\n        input = input = input.replace('\r', '\\\\r');\n      }\n      if (input.contains(\"\\\\t\")) {\n        input = input.replace(\"\\\\t\", \"\\\\\\\\\\\\t\");\n      }\n      if (input.contains('\n')) {\n        input = input.replace('\n', '\\\\n');\n      }\n      if (input.contains('\b')) {\n        input = input.replace('\b', '\\\\b');\n      }\n      if (input.contains('\f')) {\n        input = input.replace('\f', '\\\\f');\n      }\n      return input;\n    }\n   StringBuilder builder = new StringBuilder('[');\n    \n    for (int i=0; i<params.text_docs.length; i ++) {\n      builder.append('\"');\n      builder.append(escape(params.text_docs[i]));\n      builder.append('\"');\n      if (i<params.text_docs.length - 1) {\n        builder.append(',');\n      }\n    }\n    builder.append(']');\n    \n    def parameters = '{\"length\": \"' + params.text_docs.length + '\", \"input\": ' + builder + ' }';\n    return  '{\"parameters\": ' + parameters + '}';\n     "

@toyaokeke
Author

@ylwu-amzn this is working as expected!! Thank you very much 🙏🏿

@ylwu-amzn
Collaborator

Closing this issue as the problem is solved.

@github-project-automation github-project-automation bot moved this from In Progress to Done in ml-commons projects May 10, 2024