-
Notifications
You must be signed in to change notification settings - Fork 8
Multiple streams, flyweights for PING and GOAWAY #4
Conversation
for stream lifecycle.
* Processing PING frame * Sending GO_AWAY frame and closing connection for some validation errors * Adding code to detect slab when enough data is not available.
pom.xml
Outdated
<artifactId>nukleus-tcp</artifactId> | ||
<version>0.3</version> | ||
<scope>test</scope> | ||
</dependency> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please keep each nukleus self-contained with no inter-nukleus dependencies.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed the dependency
case CLOSED: | ||
inClosed(http2RO); | ||
break; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One of the design patterns we've been following is to model the states as lambdas, so the state
field becomes the lambda and is called directly by the generic entry point. A state transition then updates the state
field to reference the new lambda instead. Let's continue to use that approach here as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The state machine is not implemented completely. I will address this in future PR.
if (!(http2RO.type() == Http2FrameType.HEADERS || http2RO.type() == Http2FrameType.PRIORITY)) | ||
{ | ||
connection.error(Http2ErrorCode.PROTOCOL_ERROR); | ||
return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Try to design the methods as having a single return
for both increased clarity of implementation and opportunity for runtime optimization.
this.state = State.IDLE; | ||
} | ||
|
||
void decode(Http2FrameFW http2RO) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We haven't been passing flyweights as method parameters, instead treating them as decoders only and keeping their scope for reuse more clear.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would expect this method signature to follow MessageHandler.onMessage(...)
signature.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now moving into SourceInputStreamFactory, removes the need to pass flyweights
CLOSED | ||
} | ||
|
||
private final SourceInputStreamFactory.SourceInputStream connection; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Passing the connection into the Http2Stream
feels slightly off - perhaps it would be feasible to pass the Target
for writing back to the client?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps moving this to an inner class of SourceInputStreamFactory
would also help to manage the scope of the flyweights more explicitly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking of that, but it might become a very large class
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved Http2Stream into SourceInputStreamFactory for now
public DirectBuffer payload() | ||
{ | ||
return payloadRO; | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps we are missing Stream
-like utility methods on ListFW
, as this inline lambda would be better specified as listFW.stream().filter(x -> x.id() == key).findFirst()
.
Please file an issue on maven-nukleus-plugin
to request that capability on the generated ListFW
flyweight.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I was thinking the same. Will file an issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great, please link back to it here when filed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is the issue: reaktivity/nukleus-maven-plugin#16
Thread.sleep(10000000); | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class doesn't belong in this repository.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
@@ -94,7 +95,8 @@ | |||
LangUtil.rethrowUnchecked(ex); | |||
} | |||
|
|||
reaktor = Reaktor.launch(configuration, n -> "http".equals(n), Http2Controller.class::isAssignableFrom); | |||
//reaktor = Reaktor.launch(configuration, n -> "http".equals(n), Http2Controller.class::isAssignableFrom); | |||
reaktor = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please update to ensure this benchmark is functional.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was just copied from Http nukleus, so it is not functional yet. It will be addressed in the future PRs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a useful non-functional checkpoint on the initial implementation work.
Please get the benchmark into a running state before we merge this PR, you can see a working example in the tcp
nukleus.
No need to spend effort on actually optimizing the code just yet, that can be covered in future PRs no problem.
.header(hf -> hf.indexed(2)) // :method: GET | ||
.header(hf -> hf.indexed(6)) // :scheme: http | ||
.header(hf -> hf.indexed(4)) // :path: / | ||
.header(hf -> hf.literal(l -> l.type(INCREMENTAL_INDEXING).name(1).value("www.example.com"))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use h
instead of hf
for header, much more readable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
MutableDirectBuffer buf = new UnsafeBuffer(bytes); | ||
|
||
Http2ContinuationFW fw = new Http2ContinuationFW.Builder() | ||
.wrap(buf, 1, buf.capacity()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assuming 1
here is a placeholder for any non-zero value to make sure we are calculating offsets correctly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please also give feedback if removing the Http2
prefix on the flyweight class names caused any issues.
case 12 : return INADEQUATE_SECURITY; | ||
case 13 : return HTTP_1_1_REQUIRED; | ||
} | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move to default
case for readability.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
{ | ||
this.type = (byte) type; | ||
Types.TYPES.put(this.type, this); | ||
} | ||
|
||
public byte getType() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be named type()
for consistency with the others.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
} | ||
|
||
public static Http2FrameType from(byte type) | ||
public static FrameType from(byte type) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change to FrameType get(int type)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
int streamId = Http2FrameFW.streamId(buffer, offset); | ||
if (streamId != 0) | ||
{ | ||
throw new Http2Exception(String.format("Invalid stream-id=%d for GOAWAY frame", streamId)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need a custom unchecked exception type, something like IllegalArgumentException
is sufficient here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
super(cause); | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not needed, see above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
byte[] bytes = new byte[100]; | ||
MutableDirectBuffer buf = new UnsafeBuffer(bytes); | ||
|
||
GoawayFW fw = new GoawayFW.Builder() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use goaway
as variable name instead of fw
for improved readability.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
MutableDirectBuffer buf = new UnsafeBuffer(bytes); | ||
|
||
DirectBuffer payload = new UnsafeBuffer(new byte[] {1, 2, 3, 4, 5, 6, 7, 8}); | ||
PingFW fw = new PingFW.Builder() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fw
-> ping
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -32,7 +32,7 @@ public void encode() | |||
byte[] bytes = new byte[100]; | |||
MutableDirectBuffer buf = new UnsafeBuffer(bytes); | |||
|
|||
Http2RstStreamFW fw = new Http2RstStreamFW.Builder() | |||
RstStreamFW fw = new RstStreamFW.Builder() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fw
-> reset
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -38,7 +38,7 @@ public void decode() | |||
}; | |||
|
|||
DirectBuffer buffer = new UnsafeBuffer(bytes); | |||
Http2SettingsFW fw = new Http2SettingsFW().wrap(buffer, 2, buffer.capacity()); | |||
SettingsFW fw = new SettingsFW().wrap(buffer, 2, buffer.capacity()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fw
-> settings
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -31,7 +31,7 @@ public void encode() | |||
byte[] bytes = new byte[100]; | |||
MutableDirectBuffer buf = new UnsafeBuffer(bytes); | |||
|
|||
Http2WindowUpdateFW fw = new Http2WindowUpdateFW.Builder() | |||
WindowUpdateFW fw = new WindowUpdateFW.Builder() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fw
-> window
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
} | ||
|
||
public byte getType() | ||
public byte type() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need type()
as an int
or byte
?
Doesn't seem quite right to be casting on every access when we control the storage type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using byte now.
conflict with core flyweights)
Many changes: