Skip to content

Commit

Permalink
[Feature] Support HashJoin For Shards (#698)
Browse files Browse the repository at this point in the history
* hash join

* add remark

* return resultx

* filter func

* optimize plan

* optimize plan

* optimize

* check nil

* left/right join

* formatter import

* fix ci

* fix ci

* add ut

* fix ut

* fix ci

* fix where

* imports-formatter

* fix ut

* fix ut

---------

Co-authored-by: huangwenkang <642380437@qq>
  • Loading branch information
huangwenkan9 and huangwenkang authored Jul 25, 2023
1 parent 0457c45 commit 32df22b
Show file tree
Hide file tree
Showing 9 changed files with 1,017 additions and 64 deletions.
21 changes: 21 additions & 0 deletions pkg/runtime/ast/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type ExpressionNode interface {
Node
Restorer
Mode() ExpressionMode
Clone() ExpressionNode
}

type LogicalExpressionNode struct {
Expand Down Expand Up @@ -78,6 +79,14 @@ func (l *LogicalExpressionNode) Mode() ExpressionMode {
return EmLogical
}

func (l *LogicalExpressionNode) Clone() ExpressionNode {
return &LogicalExpressionNode{
Or: l.Or,
Left: l.Left.Clone(),
Right: l.Right.Clone(),
}
}

type NotExpressionNode struct {
E ExpressionNode
}
Expand All @@ -98,6 +107,12 @@ func (n *NotExpressionNode) Mode() ExpressionMode {
return EmNot
}

func (n *NotExpressionNode) Clone() ExpressionNode {
return &NotExpressionNode{
E: n.E.Clone(),
}
}

type PredicateExpressionNode struct {
P PredicateNode
}
Expand All @@ -116,3 +131,9 @@ func (a *PredicateExpressionNode) Restore(flag RestoreFlag, sb *strings.Builder,
func (a *PredicateExpressionNode) Mode() ExpressionMode {
return EmPredicate
}

func (a *PredicateExpressionNode) Clone() ExpressionNode {
return &PredicateExpressionNode{
P: a.P.Clone(),
}
}
54 changes: 54 additions & 0 deletions pkg/runtime/ast/expression_atom.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type ExpressionAtom interface {
Node
Restorer
phantom() expressionAtomPhantom
Clone() ExpressionAtom
}

type IntervalExpressionAtom struct {
Expand Down Expand Up @@ -104,6 +105,13 @@ func (ie *IntervalExpressionAtom) phantom() expressionAtomPhantom {
return expressionAtomPhantom{}
}

func (ie *IntervalExpressionAtom) Clone() ExpressionAtom {
return &IntervalExpressionAtom{
Unit: ie.Unit,
Value: ie.Value.Clone(),
}
}

type SystemVariableExpressionAtom struct {
Name string
System bool
Expand Down Expand Up @@ -146,6 +154,14 @@ func (sy *SystemVariableExpressionAtom) phantom() expressionAtomPhantom {
return expressionAtomPhantom{}
}

func (sy *SystemVariableExpressionAtom) Clone() ExpressionAtom {
return &SystemVariableExpressionAtom{
Name: sy.Name,
System: sy.System,
Global: sy.Global,
}
}

type UnaryExpressionAtom struct {
Operator string
Inner Node // ExpressionAtom or *BinaryComparisonPredicateNode
Expand Down Expand Up @@ -185,6 +201,10 @@ func (u *UnaryExpressionAtom) phantom() expressionAtomPhantom {
return expressionAtomPhantom{}
}

func (u *UnaryExpressionAtom) Clone() ExpressionAtom {
panic("implement me")
}

type ConstantExpressionAtom struct {
Inner interface{}
}
Expand All @@ -202,6 +222,12 @@ func (c *ConstantExpressionAtom) phantom() expressionAtomPhantom {
return expressionAtomPhantom{}
}

func (c *ConstantExpressionAtom) Clone() ExpressionAtom {
return &ConstantExpressionAtom{
Inner: c.Inner,
}
}

func constant2string(value interface{}) string {
switch v := value.(type) {
case proto.Null:
Expand Down Expand Up @@ -300,6 +326,12 @@ func (c ColumnNameExpressionAtom) phantom() expressionAtomPhantom {
return expressionAtomPhantom{}
}

func (c ColumnNameExpressionAtom) Clone() ExpressionAtom {
res := make(ColumnNameExpressionAtom, len(c))
copy(res, c)
return res
}

type VariableExpressionAtom int

func (v VariableExpressionAtom) Accept(visitor Visitor) (interface{}, error) {
Expand All @@ -324,6 +356,10 @@ func (v VariableExpressionAtom) phantom() expressionAtomPhantom {
return expressionAtomPhantom{}
}

func (v VariableExpressionAtom) Clone() ExpressionAtom {
return v
}

type MathExpressionAtom struct {
Left ExpressionAtom
Operator string
Expand Down Expand Up @@ -358,6 +394,14 @@ func (m *MathExpressionAtom) phantom() expressionAtomPhantom {
return expressionAtomPhantom{}
}

func (m *MathExpressionAtom) Clone() ExpressionAtom {
return &MathExpressionAtom{
Left: m.Left.Clone(),
Operator: m.Operator,
Right: m.Right.Clone(),
}
}

type NestedExpressionAtom struct {
First ExpressionNode
}
Expand All @@ -380,6 +424,12 @@ func (n *NestedExpressionAtom) phantom() expressionAtomPhantom {
return expressionAtomPhantom{}
}

func (n *NestedExpressionAtom) Clone() ExpressionAtom {
return &NestedExpressionAtom{
First: n.First.Clone(),
}
}

type FunctionCallExpressionAtom struct {
F Node // *Function OR *AggrFunction OR *CaseWhenElseFunction OR *CastFunction
}
Expand Down Expand Up @@ -413,3 +463,7 @@ func (f *FunctionCallExpressionAtom) Restore(flag RestoreFlag, sb *strings.Build
func (f *FunctionCallExpressionAtom) phantom() expressionAtomPhantom {
return expressionAtomPhantom{}
}

func (f *FunctionCallExpressionAtom) Clone() ExpressionAtom {
panic("implement me")
}
53 changes: 53 additions & 0 deletions pkg/runtime/ast/predicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type PredicateNode interface {
Node
Restorer
phantom() predicateNodePhantom
Clone() PredicateNode
}

type LikePredicateNode struct {
Expand Down Expand Up @@ -81,6 +82,14 @@ func (l *LikePredicateNode) phantom() predicateNodePhantom {
return predicateNodePhantom{}
}

func (l *LikePredicateNode) Clone() PredicateNode {
return &LikePredicateNode{
Not: l.Not,
Left: l.Left.Clone(),
Right: l.Right.Clone(),
}
}

type RegexpPredicationNode struct {
Left PredicateNode
Right PredicateNode
Expand Down Expand Up @@ -111,6 +120,14 @@ func (rp *RegexpPredicationNode) phantom() predicateNodePhantom {
return predicateNodePhantom{}
}

func (rp *RegexpPredicationNode) Clone() PredicateNode {
return &RegexpPredicationNode{
Left: rp.Left.Clone(),
Right: rp.Right.Clone(),
Not: rp.Not,
}
}

type BinaryComparisonPredicateNode struct {
Left PredicateNode
Right PredicateNode
Expand Down Expand Up @@ -158,6 +175,14 @@ func (b *BinaryComparisonPredicateNode) phantom() predicateNodePhantom {
return predicateNodePhantom{}
}

func (b *BinaryComparisonPredicateNode) Clone() PredicateNode {
return &BinaryComparisonPredicateNode{
Left: b.Left.Clone(),
Right: b.Right.Clone(),
Op: b.Op,
}
}

type AtomPredicateNode struct {
A ExpressionAtom
}
Expand Down Expand Up @@ -185,6 +210,12 @@ func (a *AtomPredicateNode) phantom() predicateNodePhantom {
return predicateNodePhantom{}
}

func (a *AtomPredicateNode) Clone() PredicateNode {
return &AtomPredicateNode{
A: a.A.Clone(),
}
}

type BetweenPredicateNode struct {
Not bool
Key PredicateNode
Expand Down Expand Up @@ -222,6 +253,15 @@ func (b *BetweenPredicateNode) phantom() predicateNodePhantom {
return predicateNodePhantom{}
}

func (b *BetweenPredicateNode) Clone() PredicateNode {
return &BetweenPredicateNode{
Not: b.Not,
Key: b.Key.Clone(),
Left: b.Left.Clone(),
Right: b.Right.Clone(),
}
}

type InPredicateNode struct {
Not bool
P PredicateNode
Expand Down Expand Up @@ -264,3 +304,16 @@ func (ip *InPredicateNode) Restore(flag RestoreFlag, sb *strings.Builder, args *
func (ip *InPredicateNode) phantom() predicateNodePhantom {
return predicateNodePhantom{}
}

func (ip *InPredicateNode) Clone() PredicateNode {
e := make([]ExpressionNode, 0, len(ip.E))
for _, node := range ip.E {
e = append(e, node.Clone())
}

return &InPredicateNode{
Not: ip.Not,
P: ip.P.Clone(),
E: e,
}
}
7 changes: 7 additions & 0 deletions pkg/runtime/ast/select_element.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,13 @@ func (s *SelectElementColumn) Suffix() string {
return s.Name[len(s.Name)-1]
}

func (s *SelectElementColumn) Prefix() string {
if len(s.Name) < 2 {
return ""
}
return s.Name[len(s.Name)-2]
}

func (s *SelectElementColumn) Restore(flag RestoreFlag, sb *strings.Builder, args *[]int) error {
if err := ColumnNameExpressionAtom(s.Name).Restore(flag, sb, args); err != nil {
return errors.WithStack(err)
Expand Down
Loading

0 comments on commit 32df22b

Please sign in to comment.