diff --git a/changelog/unreleased/kong/acl-always-use-authenticated-groups.yml b/changelog/unreleased/kong/acl-always-use-authenticated-groups.yml
new file mode 100644
index 000000000000..a2da56e2fc68
--- /dev/null
+++ b/changelog/unreleased/kong/acl-always-use-authenticated-groups.yml
@@ -0,0 +1,3 @@
+message: "**acl:** Added a new config `always_use_authenticated_groups` to support using authenticated groups even when an authenticated consumer already exists."
+type: feature
+scope: Plugin
diff --git a/kong/clustering/compat/removed_fields.lua b/kong/clustering/compat/removed_fields.lua
index 4b0ac6e48631..78465f47268b 100644
--- a/kong/clustering/compat/removed_fields.lua
+++ b/kong/clustering/compat/removed_fields.lua
@@ -167,5 +167,8 @@ return {
       "traces_endpoint",
       "logs_endpoint",
     },
+    acl = {
+      "always_use_authenticated_groups",
+    },
   },
 }
diff --git a/kong/plugins/acl/handler.lua b/kong/plugins/acl/handler.lua
index 803fa472ad1e..af4f04b36f07 100644
--- a/kong/plugins/acl/handler.lua
+++ b/kong/plugins/acl/handler.lua
@@ -80,7 +80,7 @@ function ACLHandler:access(conf)
   else
     local credential = kong.client.get_credential()
     local authenticated_groups
-    if not credential then
+    if not credential or conf.always_use_authenticated_groups then
       -- authenticated groups overrides anonymous groups
       authenticated_groups = groups.get_authenticated_groups()
     end
diff --git a/kong/plugins/acl/schema.lua b/kong/plugins/acl/schema.lua
index df0afc638edf..14a70e67ba5f 100644
--- a/kong/plugins/acl/schema.lua
+++ b/kong/plugins/acl/schema.lua
@@ -16,6 +16,7 @@ return {
               type = "array",
               elements = { type = "string" }, }, },
           { hide_groups_header = { type = "boolean", required = true, default = false, description = "If enabled (`true`), prevents the `X-Consumer-Groups` header from being sent in the request to the upstream service." }, },
+          { always_use_authenticated_groups = { type = "boolean", required = true, default = false, description = "If enabled (`true`), the authenticated groups will always be used even when an authenticated consumer already exists. If the authenticated groups don't exist, it will fallback to use the groups associated with the consumer. By default the authenticated groups will only be used when there is no consumer or the consumer is anonymous." } },
         },
       }
     }
diff --git a/spec/02-integration/03-db/08-declarative_spec.lua b/spec/02-integration/03-db/08-declarative_spec.lua
index 4bfc44f16507..9c6b80af2e1c 100644
--- a/spec/02-integration/03-db/08-declarative_spec.lua
+++ b/spec/02-integration/03-db/08-declarative_spec.lua
@@ -132,6 +132,7 @@ for _, strategy in helpers.each_strategy() do
         deny = ngx.null,
         allow = { "*" },
         hide_groups_header = false,
+        always_use_authenticated_groups = false,
       }
     }
 
@@ -146,6 +147,7 @@ for _, strategy in helpers.each_strategy() do
         deny = ngx.null,
         allow = { "*" },
         hide_groups_header = false,
+        always_use_authenticated_groups = false,
       }
     }
 
diff --git a/spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua b/spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua
index 8473bdf3b528..19b972a2ca79 100644
--- a/spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua
+++ b/spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua
@@ -769,6 +769,46 @@ describe("CP/DP config compat transformations #" .. strategy, function()
         admin.plugins:remove({ id = rt.id })
       end)
     end)
+
+    describe("compatibility test for acl plugin", function()
+      it("removes `config.always_use_authenticated_groups` before sending them to older(less than 3.8.0.0) DP nodes", function()
+        local acl = admin.plugins:insert {
+          name = "acl",
+          enabled = true,
+          config = {
+            allow = { "admin" },
+            -- [[ new fields 3.8.0
+            always_use_authenticated_groups = true,
+            -- ]]
+          }
+        }
+
+        assert.not_nil(acl.config.always_use_authenticated_groups)
+        local expected_acl = cycle_aware_deep_copy(acl)
+        expected_acl.config.always_use_authenticated_groups = nil
+        do_assert(uuid(), "3.7.0", expected_acl)
+
+        -- cleanup
+        admin.plugins:remove({ id = acl.id })
+      end)
+
+      it("does not remove `config.always_use_authenticated_groups` from DP nodes that are already compatible", function()
+        local acl = admin.plugins:insert {
+          name = "acl",
+          enabled = true,
+          config = {
+            allow = { "admin" },
+            -- [[ new fields 3.8.0
+            always_use_authenticated_groups = true,
+            -- ]]
+          }
+        }
+        do_assert(uuid(), "3.8.0", acl)
+
+        -- cleanup
+        admin.plugins:remove({ id = acl.id })
+      end)
+    end)
   end)
 end)
 
diff --git a/spec/03-plugins/18-acl/02-access_spec.lua b/spec/03-plugins/18-acl/02-access_spec.lua
index 157fc2afcf7b..8b69f0f24346 100644
--- a/spec/03-plugins/18-acl/02-access_spec.lua
+++ b/spec/03-plugins/18-acl/02-access_spec.lua
@@ -80,6 +80,20 @@ for _, strategy in helpers.each_strategy() do
         consumer = { id = consumer4.id },
       }
 
+      local consumer5 = bp.consumers:insert {
+        username = "consumer5"
+      }
+
+      bp.keyauth_credentials:insert {
+        key      = "apikey127",
+        consumer = { id = consumer5.id },
+      }
+
+      bp.acls:insert {
+        group    = "acl_group1",
+        consumer = { id = consumer5.id },
+      }
+
       local anonymous = bp.consumers:insert {
         username = "anonymous"
       }
@@ -739,6 +753,86 @@ for _, strategy in helpers.each_strategy() do
         }
       }
 
+      local route15 = bp.routes:insert({
+        hosts = { "acl15.test" }
+      })
+
+      bp.plugins:insert {
+        name = "acl",
+        route = { id = route15.id },
+        config = {
+          allow = { "auth_group1" },
+          hide_groups_header = false,
+          always_use_authenticated_groups = true,
+        }
+      }
+
+      bp.plugins:insert {
+        name = "key-auth",
+        route = { id = route15.id },
+        config = {}
+      }
+
+      bp.plugins:insert {
+        name = "ctx-checker",
+        route = { id = route15.id },
+        config = {
+          ctx_kind      = "kong.ctx.shared",
+          ctx_set_field = "authenticated_groups",
+          ctx_set_array = { "auth_group1" },
+        }
+      }
+
+      local route16 = bp.routes:insert({
+        hosts = { "acl16.test" }
+      })
+
+      bp.plugins:insert {
+        name = "acl",
+        route = { id = route16.id },
+        config = {
+          allow = { "auth_group1" },
+          hide_groups_header = false,
+          always_use_authenticated_groups = true,
+        }
+      }
+
+      bp.plugins:insert {
+        name = "key-auth",
+        route = { id = route16.id },
+        config = {}
+      }
+
+      bp.plugins:insert {
+        name = "ctx-checker",
+        route = { id = route16.id },
+        config = {
+          ctx_kind      = "kong.ctx.shared",
+          ctx_set_field = "authenticated_groups",
+          ctx_set_array = { "auth_group2" },
+        }
+      }
+
+      local route17 = bp.routes:insert({
+        hosts = { "acl17.test" }
+      })
+
+      bp.plugins:insert {
+        name = "acl",
+        route = { id = route17.id },
+        config = {
+          allow = { "acl_group1" },
+          hide_groups_header = false,
+          always_use_authenticated_groups = true,
+        }
+      }
+
+      bp.plugins:insert {
+        name = "key-auth",
+        route = { id = route17.id },
+        config = {}
+      }
+
       assert(helpers.start_kong({
         plugins    = "bundled, ctx-checker",
         database   = strategy,
@@ -1380,6 +1474,46 @@ for _, strategy in helpers.each_strategy() do
         assert.res_status(200, res)
       end)
     end)
+
+    describe("always_use_authenticated_groups", function()
+      it("if authenticated_groups is set, it'll be used", function()
+        local res = assert(proxy_client:get("/request?apikey=apikey127", {
+          headers = {
+            ["Host"] = "acl15.test"
+          }
+        }))
+        local body = assert(cjson.decode(assert.res_status(200, res)))
+        assert.equal("auth_group1", body.headers["x-authenticated-groups"])
+        assert.equal(nil, body.headers["x-consumer-groups"])
+
+        res = assert(proxy_client:get("/request?apikey=apikey127", {
+          headers = {
+            ["Host"] = "acl16.test"
+          }
+        }))
+        body = assert(cjson.decode(assert.res_status(403, res)))
+        assert.matches("You cannot consume this service", body.message)
+      end)
+
+      it("if authenticated_groups is not set, fallback to use acl groups", function()
+        local res = assert(proxy_client:get("/request?apikey=apikey127", {
+          headers = {
+            ["Host"] = "acl17.test"
+          }
+        }))
+        local body = assert(cjson.decode(assert.res_status(200, res)))
+        assert.equal(nil, body.headers["x-authenticated-groups"])
+        assert.equal("acl_group1", body.headers["x-consumer-groups"])
+
+        local res = assert(proxy_client:get("/request?apikey=apikey126", {
+          headers = {
+            ["Host"] = "acl17.test"
+          }
+        }))
+        body = assert(cjson.decode(assert.res_status(403, res)))
+        assert.matches("You cannot consume this service", body.message)
+      end)
+    end)
   
   end)