Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored data clumps with the help of LLMs (research project) #1770

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,14 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.EmptyHttpHeaders;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.*;
import io.netty.util.ReferenceCountUtil;

import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

/**
* A collection of rejection related utilities useful for failing requests. These are tightly coupled with the channel
Expand All @@ -55,83 +48,65 @@ public final class RejectionUtils {
/**
* Closes the connection without sending a response, and fires a {@link RequestRejectedEvent} back up the pipeline.
*
* @param nfStatus the status to use for metric reporting
* @param reason the reason for rejecting the request. This is not sent back to the client.
* @param request the request that is being rejected.
* @param injectedLatencyMillis optional parameter to delay sending a response. The reject notification is still
* sent up the pipeline.
* @param throttlingContextData The ThrottlingContextData object
*/
public static void rejectByClosingConnection(
ChannelHandlerContext ctx,
StatusCategory nfStatus,
String reason,
HttpRequest request,
@Nullable Integer injectedLatencyMillis) {
ThrottlingContextData throttlingContextData) {

// Notify other handlers before closing the conn.
notifyHandlers(ctx, nfStatus, REJECT_CLOSING_STATUS, reason, request);
notifyHandlers(throttlingContextData.getCtx(), throttlingContextData.getNfStatus(), REJECT_CLOSING_STATUS, throttlingContextData.getReason(), throttlingContextData.getRequest());

if (injectedLatencyMillis != null && injectedLatencyMillis > 0) {
if (throttlingContextData.getInjectedLatencyMillis() != null && throttlingContextData.getInjectedLatencyMillis() > 0) {
// Delay closing the connection for configured time.
ctx.executor()
throttlingContextData.getCtx().executor()
.schedule(
() -> {
CurrentPassport.fromChannel(ctx.channel()).add(PassportState.SERVER_CH_REJECTING);
ctx.close();
CurrentPassport.fromChannel(throttlingContextData.getCtx().channel()).add(PassportState.SERVER_CH_REJECTING);
throttlingContextData.getCtx().close();
},
injectedLatencyMillis,
throttlingContextData.getInjectedLatencyMillis(),
TimeUnit.MILLISECONDS);
} else {
// Close the connection immediately.
CurrentPassport.fromChannel(ctx.channel()).add(PassportState.SERVER_CH_REJECTING);
ctx.close();
CurrentPassport.fromChannel(throttlingContextData.getCtx().channel()).add(PassportState.SERVER_CH_REJECTING);
throttlingContextData.getCtx().close();
}
}

/**
* Sends a rejection response back to the client, and fires a {@link RequestRejectedEvent} back up the pipeline.
*
* @param ctx the channel handler processing the request
* @param nfStatus the status to use for metric reporting
* @param reason the reason for rejecting the request. This is not sent back to the client.
* @param request the request that is being rejected.
* @param injectedLatencyMillis optional parameter to delay sending a response. The reject notification is still
* sent up the pipeline.
* @param rejectedCode the HTTP code to send back to the client.
* @param rejectedBody the HTTP body to be sent back. It is assumed to be of type text/plain.
* @param rejectionHeaders additional HTTP headers to add to the rejection response
* @param throttlingContextData The ThrottlingContextData object
*/
public static void sendRejectionResponse(
ChannelHandlerContext ctx,
StatusCategory nfStatus,
String reason,
HttpRequest request,
@Nullable Integer injectedLatencyMillis,
HttpResponseStatus rejectedCode,
String rejectedBody,
Map<String, String> rejectionHeaders) {
boolean shouldClose = closeConnectionAfterReject(ctx.channel());
Map<String, String> rejectionHeaders, ThrottlingContextData throttlingContextData) {
boolean shouldClose = closeConnectionAfterReject(throttlingContextData.getCtx().channel());
// Write out a rejection response message.
FullHttpResponse response = createRejectionResponse(rejectedCode, rejectedBody, shouldClose, rejectionHeaders);

if (injectedLatencyMillis != null && injectedLatencyMillis > 0) {
if (throttlingContextData.getInjectedLatencyMillis() != null && throttlingContextData.getInjectedLatencyMillis() > 0) {
// Delay writing the response for configured time.
ctx.executor()
throttlingContextData.getCtx().executor()
.schedule(
() -> {
CurrentPassport.fromChannel(ctx.channel()).add(PassportState.IN_REQ_REJECTED);
ctx.writeAndFlush(response);
CurrentPassport.fromChannel(throttlingContextData.getCtx().channel()).add(PassportState.IN_REQ_REJECTED);
throttlingContextData.getCtx().writeAndFlush(response);
},
injectedLatencyMillis,
throttlingContextData.getInjectedLatencyMillis(),
TimeUnit.MILLISECONDS);
} else {
// Write the response immediately.
CurrentPassport.fromChannel(ctx.channel()).add(PassportState.IN_REQ_REJECTED);
ctx.writeAndFlush(response);
CurrentPassport.fromChannel(throttlingContextData.getCtx().channel()).add(PassportState.IN_REQ_REJECTED);
throttlingContextData.getCtx().writeAndFlush(response);
}

// Notify other handlers that we've rejected this request.
notifyHandlers(ctx, nfStatus, rejectedCode, reason, request);
notifyHandlers(throttlingContextData.getCtx(), throttlingContextData.getNfStatus(), rejectedCode, throttlingContextData.getReason(), throttlingContextData.getRequest());
}

/**
Expand Down Expand Up @@ -193,15 +168,15 @@ public static void handleRejection(
// Send a rejection response.
HttpRequest request = msg instanceof HttpRequest ? (HttpRequest) msg : null;
reject(
ctx,
rejectionType,
nfStatus,
reason,
request,
injectedLatencyMillis,
rejectedCode,
rejectedBody,
rejectionHeaders);
rejectionHeaders, new ThrottlingContextData(
ctx,
nfStatus,
reason,
request,
injectedLatencyMillis));
}

if (shouldDropMessage) {
Expand All @@ -214,78 +189,58 @@ public static void handleRejection(
/**
* Switches on the rejection type to decide how to reject the request and or close the conn.
*
* @param ctx the channel handler processing the request
* @param rejectionType the type of rejection
* @param nfStatus the status to use for metric reporting
* @param reason the reason for rejecting the request. This is not sent back to the client.
* @param request the request that is being rejected.
* @param injectedLatencyMillis optional parameter to delay sending a response. The reject notification is still
* sent up the pipeline.
* @param rejectedCode the HTTP code to send back to the client.
* @param rejectedBody the HTTP body to be sent back. It is assumed to be of type text/plain.
* @param throttlingContextData The ThrottlingContextData object
*/
public static void reject(
ChannelHandlerContext ctx,
RejectionType rejectionType,
StatusCategory nfStatus,
String reason,
HttpRequest request,
@Nullable Integer injectedLatencyMillis,
HttpResponseStatus rejectedCode,
String rejectedBody) {
String rejectedBody, ThrottlingContextData throttlingContextData) {
reject(
ctx,
rejectionType,
nfStatus,
reason,
request,
injectedLatencyMillis,
rejectedCode,
rejectedBody,
Collections.emptyMap());
Collections.emptyMap(), new ThrottlingContextData(
throttlingContextData.getCtx(),
throttlingContextData.getNfStatus(),
throttlingContextData.getReason(),
throttlingContextData.getRequest(),
throttlingContextData.getInjectedLatencyMillis()));
}

/**
* Switches on the rejection type to decide how to reject the request and or close the conn.
*
* @param ctx the channel handler processing the request
* @param rejectionType the type of rejection
* @param nfStatus the status to use for metric reporting
* @param reason the reason for rejecting the request. This is not sent back to the client.
* @param request the request that is being rejected.
* @param injectedLatencyMillis optional parameter to delay sending a response. The reject notification is still
* sent up the pipeline.
* @param rejectedCode the HTTP code to send back to the client.
* @param rejectedBody the HTTP body to be sent back. It is assumed to be of type text/plain.
* @param rejectionHeaders additional HTTP headers to add to the rejection response
* @param throttlingContextData The ThrottlingContextData object
*/
public static void reject(
ChannelHandlerContext ctx,
RejectionType rejectionType,
StatusCategory nfStatus,
String reason,
HttpRequest request,
@Nullable Integer injectedLatencyMillis,
HttpResponseStatus rejectedCode,
String rejectedBody,
Map<String, String> rejectionHeaders) {
Map<String, String> rejectionHeaders, ThrottlingContextData throttlingContextData) {
switch (rejectionType) {
case REJECT:
sendRejectionResponse(
ctx,
nfStatus,
reason,
request,
injectedLatencyMillis,
rejectedCode,
rejectedBody,
rejectionHeaders);
rejectionHeaders, new ThrottlingContextData(
throttlingContextData.getCtx(),
throttlingContextData.getNfStatus(),
throttlingContextData.getReason(),
throttlingContextData.getRequest(),
throttlingContextData.getInjectedLatencyMillis()));
return;
case CLOSE:
rejectByClosingConnection(ctx, nfStatus, reason, request, injectedLatencyMillis);
rejectByClosingConnection(new ThrottlingContextData(throttlingContextData.getCtx(), throttlingContextData.getNfStatus(), throttlingContextData.getReason(), throttlingContextData.getRequest(), throttlingContextData.getInjectedLatencyMillis()));
return;
case ALLOW_THEN_CLOSE:
allowThenClose(ctx);
allowThenClose(throttlingContextData.getCtx());
return;
}
throw new AssertionError("Bad rejection type: " + rejectionType);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2018 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.netty.common.throttle;
public class ThrottlingContextData{
private io.netty.channel.ChannelHandlerContext ctx;

public io.netty.channel.ChannelHandlerContext getCtx(){
return ctx;
}

public void setCtx(io.netty.channel.ChannelHandlerContext ctx){
this.ctx=ctx;
}

private com.netflix.zuul.stats.status.StatusCategory nfStatus;

public com.netflix.zuul.stats.status.StatusCategory getNfStatus(){
return nfStatus;
}

public void setNfStatus(com.netflix.zuul.stats.status.StatusCategory nfStatus){
this.nfStatus=nfStatus;
}

private java.lang.String reason;

public java.lang.String getReason(){
return reason;
}

public void setReason(java.lang.String reason){
this.reason=reason;
}

private io.netty.handler.codec.http.HttpRequest request;

public io.netty.handler.codec.http.HttpRequest getRequest(){
return request;
}

public void setRequest(io.netty.handler.codec.http.HttpRequest request){
this.request=request;
}

private java.lang.Integer injectedLatencyMillis;

public java.lang.Integer getInjectedLatencyMillis(){
return injectedLatencyMillis;
}

public void setInjectedLatencyMillis(java.lang.Integer injectedLatencyMillis){
this.injectedLatencyMillis=injectedLatencyMillis;
}

public ThrottlingContextData(io.netty.channel.ChannelHandlerContext ctx,com.netflix.zuul.stats.status.StatusCategory nfStatus,java.lang.String reason,io.netty.handler.codec.http.HttpRequest request,java.lang.Integer injectedLatencyMillis){
this.ctx=ctx;
this.nfStatus=nfStatus;
this.reason=reason;
this.request=request;
this.injectedLatencyMillis=injectedLatencyMillis;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.netflix.netty.common.SourceAddressChannelHandler;
import com.netflix.netty.common.ssl.SslHandshakeInfo;
import com.netflix.netty.common.throttle.RejectionUtils;
import com.netflix.netty.common.throttle.ThrottlingContextData;
import com.netflix.spectator.api.Spectator;
import com.netflix.zuul.context.CommonContextKeys;
import com.netflix.zuul.context.Debug;
Expand All @@ -39,24 +40,10 @@
import com.netflix.zuul.stats.status.ZuulStatusCategory;
import com.netflix.zuul.util.HttpUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.*;
import io.netty.channel.unix.Errors;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.util.AttributeKey;
Expand Down Expand Up @@ -151,11 +138,12 @@ private void channelReadInternal(final ChannelHandlerContext ctx, Object msg) th
ZuulStatusCategory.FAILURE_CLIENT_BAD_REQUEST,
"Invalid request provided: Decode failure");
RejectionUtils.rejectByClosingConnection(
ctx,
ZuulStatusCategory.FAILURE_CLIENT_BAD_REQUEST,
"decodefailure",
clientRequest,
/* injectedLatencyMillis= */ null);
new ThrottlingContextData(
ctx,
ZuulStatusCategory.FAILURE_CLIENT_BAD_REQUEST,
"decodefailure",
clientRequest,
null));
return;
} else if (zuulRequest.hasBody() && zuulRequest.getBodyLength() > zuulRequest.getMaxBodySize()) {
String errorMsg = "Request too large. "
Expand All @@ -172,9 +160,9 @@ private void channelReadInternal(final ChannelHandlerContext ctx, Object msg) th
zuulRequest.getContext().setError(ze);
zuulRequest.getContext().setShouldSendErrorResponse(true);
} else if (zuulRequest
.getHeaders()
.getAll(HttpHeaderNames.HOST.toString())
.size()
.getHeaders()
.getAll(HttpHeaderNames.HOST.toString())
.size()
> 1) {
LOG.debug(
"Multiple Host headers. clientRequest = {} , uri = {}, info = {}",
Expand Down