
Truncated packets #77

Open
buger opened this issue Jun 18, 2024 · 3 comments
Labels
enhancement New feature or request

Comments

buger commented Jun 18, 2024

Hi! Any reason why you have fixed max packet payload size to 1500?
I have interfaces whose MTU is bigger than this, like 65536, and packets definitely get cut.
Do you think it can be set automatically based on MTU? I guess it mostly affects memory usage?

mozillazg (Owner)

Any reason why you have fixed max packet payload size to 1500?

Because it's the recommended value for most interfaces and the -s flag is not implemented yet.

Do you think it can be set automatically based on MTU?

I'll try to implement the -s/--snapshot-length flag like tcpdump; once that lands, this issue will be fixed.

mozillazg commented Jun 24, 2024

@buger Please try the build from #78; it implements the -s/--snapshot-length flag and allows changing the max packet payload size (default 262144, same as tcpdump): https://github.com/mozillazg/ptcpdump/actions/runs/9647201633
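Assuming the flag keeps tcpdump-compatible semantics, passing -s 262144 (or --snapshot-length 262144) should capture up to 262144 bytes of each packet instead of truncating at 1500.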

buger commented Jun 24, 2024

Thanks! I also tried to play with it myself and implemented similar functionality without a limit (well, almost).

// Note: MAX_PAYLOAD_SIZE (the size of the payload buffer in struct
// packet_event_t) is assumed to be defined elsewhere in the program.
#define MAX_PACKET_SIZE 65535
#define MIN(a, b) ((a) < (b) ? (a) : (b))

static __always_inline int process_packet_chunk(struct __sk_buff *skb, u32 packet_id, u32 *offset, u16 *chunk_index, bool egress, u32 packet_size) {
    struct packet_event_t *event;
    u32 chunk_size;

    TRACE("Processing chunk: packet_id=%u, offset=%u, packet_size=%u",
               packet_id, *offset, packet_size);

    // Validate offset
    if (*offset >= packet_size) {
        TRACE("Invalid offset: packet_id=%u, offset=%u, packet_size=%u",
                   packet_id, *offset, packet_size);
        return -1;
    }

    // Calculate and validate chunk_size
    chunk_size = MIN(packet_size - *offset, MAX_PAYLOAD_SIZE);
    if (chunk_size == 0) {
        TRACE("Zero chunk size: packet_id=%u, offset=%u, packet_size=%u",
                   packet_id, *offset, packet_size);
        return -1;
    }

    // Additional check for verifier
    if (chunk_size > MAX_PAYLOAD_SIZE) {
        bpf_printk("Chunk size too large: packet_id=%u, offset=%u, chunk_size=%u",
                   packet_id, *offset, chunk_size);
        return -1;
    }

    TRACE("Chunk size calculated: packet_id=%u, offset=%u, chunk_size=%u",
               packet_id, *offset, chunk_size);

    event = bpf_ringbuf_reserve(&packet_flow, sizeof(struct packet_event_t), 0);
    if (!event) {
        TRACE("Failed to reserve ringbuf: packet_id=%u, offset=%u, chunk_size=%u",
                   packet_id, *offset, chunk_size);
        return -1;
    }

    __builtin_memset(&event->meta, 0, sizeof(event->meta));

    // Clamp chunk_size to bounds the verifier can prove before the
    // bpf_skb_load_bytes() call below; these adjustments exist only to
    // satisfy the verifier.
    if (chunk_size < 2) {
        chunk_size = 2;
    }

    if (*offset + chunk_size > packet_size) {
        chunk_size = 1;
    }

    // Use bpf_skb_load_bytes to read packet data
    if (bpf_skb_load_bytes(skb, *offset, event->payload, chunk_size) < 0) {
        TRACE("Failed to load packet data: packet_id=%u, offset=%u, chunk_size=%u",
                   packet_id, *offset, chunk_size);
        bpf_ringbuf_discard(event, 0);
        return -1;
    }

    event->meta.packet_id = packet_id;
    event->meta.packet_type = egress ? EGRESS_PACKET : INGRESS_PACKET;
    event->meta.timestamp = bpf_ktime_get_ns();
    event->meta.ifindex = skb->ifindex;
    event->meta.packet_size = packet_size;
    event->meta.payload_len = chunk_size;
    event->meta.chunk_index = *chunk_index;
    event->meta.is_last_chunk = (*offset + chunk_size) >= packet_size ? 1 : 0;

    // if (pid_meta.pid > 0) {
    //     event->meta.process.pid = pid_meta.pid;
    //     event->meta.process.mntns_id = pid_meta.mntns_id;
    //     event->meta.process.netns_id = pid_meta.netns_id;
    //     __builtin_memcpy(&event->meta.process.cgroup_name, &pid_meta.cgroup_name, sizeof(pid_meta.cgroup_name));
    // }

    bpf_ringbuf_submit(event, 0);

    *offset += chunk_size;
    (*chunk_index)++;

    TRACE("Successfully processed chunk: packet_id=%u, offset=%u, chunk_size=%u, chunk_index=%u",
               packet_id, *offset, chunk_size, *chunk_index);

    return 0;
}

static __always_inline void handle_tc(struct __sk_buff *skb, bool egress) {
    // Ensure we can access the packet data
    if (bpf_skb_pull_data(skb, 0) < 0) {
        // bpf_printk("Failed to pull skb data in handle_tc");
        return;
    }

    u32 packet_size = skb->len;
    long skb_size = (void *)(long)skb->data_end - (void *)(long)skb->data;

    if (!pcap_filter((void *)(long)skb->data, (void *)(long)skb->data_end, (void *)skb, (void *)(long)skb->data, (void *)(long)skb->data_end)) {
        // bpf_printk("Packet filtered out by pcap_filter");
        return;
    }

    if (packet_size <= skb_size) {
        // We only need the chunked path for packets larger than the linear buffer
        // bpf_printk("Packet fits in linear data: packet_size=%u, skb_size=%ld", packet_size, skb_size);
        return;
    }

    // struct process_meta_t pid_meta = {0};
    // if (get_pid_meta(skb, &pid_meta, egress) < 0) {
    //     TRACE("Failed to get pid meta");
    //     return;
    // }

    u32 offset = 0;
    u16 chunk_index = 0;
    u32 packet_id = bpf_get_prandom_u32();

    TRACE("Starting packet processing: packet_id=%u, packet_size=%u, skb packet size: %u", packet_id, packet_size, skb_size);

    #pragma unroll
    for (int i = 0; i < 4; i++) {
        if (offset >= packet_size) {
            TRACE("Reached end of packet: packet_id=%u, offset=%u, packet_size=%u",
                       packet_id, offset, packet_size);
            break;
        }
        if (process_packet_chunk(skb, packet_id, &offset, &chunk_index, egress, packet_size) < 0) {
            TRACE("Failed to process chunk: packet_id=%u, chunk_index=%u", packet_id, chunk_index);
            break;
        }
    }

    TRACE("Finished packet processing: packet_id=%u, final_offset=%u, final_chunk_index=%u",
               packet_id, offset, chunk_index);
}

As you can see, I switched to a ring buffer here, because I was unable to make it work the current way, and then implemented packet reassembly logic at the application level:

type BpfPacketEvent struct {
	BpfPacketEventT

	FullPayload []byte
}

type packetChunk struct {
	event      BpfPacketEventT
	receivedAt time.Time
}

type packetAssembler struct {
	chunks          map[uint32][]packetChunk
	mutex           sync.Mutex
	cleanupInterval time.Duration
}

func newPacketAssembler() *packetAssembler {
	pa := &packetAssembler{
		chunks:          make(map[uint32][]packetChunk),
		cleanupInterval: 30 * time.Second,
	}
	go pa.periodicCleanup()
	return pa
}

func (pa *packetAssembler) addChunk(event BpfPacketEventT) (*BpfPacketEvent, bool) {
	pa.mutex.Lock()
	defer pa.mutex.Unlock()

	chunk := packetChunk{
		event:      event,
		receivedAt: time.Now(),
	}

	packetID := event.Meta.PacketId
	pa.chunks[packetID] = append(pa.chunks[packetID], chunk)

	if event.Meta.IsLastChunk && event.Meta.ChunkIndex == uint16(len(pa.chunks[packetID])-1) {
		completeEvent := pa.assemblePacket(packetID)
		delete(pa.chunks, packetID)

		if event.Meta.PacketSize > 6000 {
			log.Warn().Msgf("at the moment EBPF input supports MTU only up to 6000, and your single packet size is bigger than it; actual size: %d", event.Meta.PacketSize)
		}
		return completeEvent, true
	}

	return nil, false
}

func (pa *packetAssembler) assemblePacket(packetID uint32) *BpfPacketEvent {
	chunks := pa.chunks[packetID]

	if len(chunks) == 1 {
		// If there's only one chunk, reuse its payload without copying
		return &BpfPacketEvent{
			BpfPacketEventT: chunks[0].event,
			FullPayload:     chunks[0].event.Payload[:chunks[0].event.Meta.PayloadLen],
		}
	}

	totalSize := 0
	for _, chunk := range chunks {
		totalSize += int(chunk.event.Meta.PayloadLen)
	}

	completeEvent := BpfPacketEvent{
		BpfPacketEventT: chunks[0].event, // Preserve metadata from the first chunk
		FullPayload:     make([]byte, totalSize),
	}

	offset := 0
	for _, chunk := range chunks {
		copy(completeEvent.FullPayload[offset:], chunk.event.Payload[:chunk.event.Meta.PayloadLen])
		offset += int(chunk.event.Meta.PayloadLen)
	}

	return &completeEvent
}

func (pa *packetAssembler) periodicCleanup() {
	ticker := time.NewTicker(pa.cleanupInterval)
	defer ticker.Stop()

	for range ticker.C {
		pa.cleanup()
	}
}

func (pa *packetAssembler) cleanup() {
	pa.mutex.Lock()
	defer pa.mutex.Unlock()

	now := time.Now()
	for packetID, chunks := range pa.chunks {
		if now.Sub(chunks[0].receivedAt) > pa.cleanupInterval {
			delete(pa.chunks, packetID)
		}
	}
}

func (b *BPF) PullPacketEvents(ctx context.Context, chanSize int) (<-chan BpfPacketEvent, error) {
	// reader, err := perf.NewReader(b.objs.PacketEvents, 1500*1000)
	// if err != nil {
	// 	return nil, xerrors.Errorf(": %w", err)
	// }
	ch := make(chan BpfPacketEvent, chanSize)
	go func() {
		defer close(ch)
		defer b.Close()
		// defer reader.Close()
		b.handlePacketEvents(ctx, ch)
	}()

	return ch, nil
}
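
For completeness, here is a minimal sketch of how the consumer side could drain the ring buffer and feed the assembler. It assumes the generated object field name (b.objs.PacketFlow for the packet_flow map), the cilium/ebpf ringbuf reader, and a little-endian, fixed-size layout for BpfPacketEventT; the real handlePacketEvents may look different.

// Hypothetical consumer loop (assumed names: b.objs.PacketFlow, BpfPacketEventT).
// Assumed imports: bytes, context, encoding/binary, errors, os,
// github.com/cilium/ebpf/ringbuf, github.com/rs/zerolog/log.
func (b *BPF) handlePacketEvents(ctx context.Context, ch chan<- BpfPacketEvent) {
	reader, err := ringbuf.NewReader(b.objs.PacketFlow)
	if err != nil {
		log.Error().Err(err).Msg("failed to open ring buffer reader")
		return
	}
	defer reader.Close()

	assembler := newPacketAssembler()

	for {
		select {
		case <-ctx.Done():
			return
		default:
		}

		record, err := reader.Read()
		if err != nil {
			if errors.Is(err, os.ErrClosed) {
				return
			}
			continue
		}

		// Decode the raw sample into the generated event struct; the layout
		// and byte order must match the eBPF side.
		var raw BpfPacketEventT
		if err := binary.Read(bytes.NewReader(record.RawSample), binary.LittleEndian, &raw); err != nil {
			continue
		}

		// Only a fully reassembled packet is forwarded to the channel.
		if event, complete := assembler.addChunk(raw); complete {
			ch <- *event
		}
	}
}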
