diff --git a/nomad/structs/funcs.go b/nomad/structs/funcs.go index 93d48aaec71..79915dfab61 100644 --- a/nomad/structs/funcs.go +++ b/nomad/structs/funcs.go @@ -145,6 +145,9 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevi reservedCores := map[uint16]struct{}{} var coreOverlap bool + hostVolumeClaims := map[string]int{} + exclusiveHostVolumeClaims := []string{} + // For each alloc, add the resources for _, alloc := range allocs { // Do not consider the resource impact of terminal allocations @@ -163,6 +166,18 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevi reservedCores[core] = struct{}{} } } + + // Job will be nil in the scheduler, where we're not performing this check anyway + if checkDevices && alloc.Job != nil { + group := alloc.Job.LookupTaskGroup(alloc.TaskGroup) + for _, volReq := range group.Volumes { + hostVolumeClaims[volReq.Source]++ + if volReq.AccessMode == + CSIVolumeAccessMode(HostVolumeAccessModeSingleNodeSingleWriter) { + exclusiveHostVolumeClaims = append(exclusiveHostVolumeClaims, volReq.Source) + } + } + } } if coreOverlap { @@ -198,12 +213,18 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevi return false, "bandwidth exceeded", used, nil } - // Check devices + // Check devices and host volumes if checkDevices { accounter := NewDeviceAccounter(node) if accounter.AddAllocs(allocs) { return false, "device oversubscribed", used, nil } + + for _, exclusiveClaim := range exclusiveHostVolumeClaims { + if hostVolumeClaims[exclusiveClaim] > 1 { + return false, "conflicting claims for host volume with single-writer", used, nil + } + } } // Allocations fit! 
diff --git a/nomad/structs/funcs_test.go b/nomad/structs/funcs_test.go index 2b9476c8769..97a22762f71 100644 --- a/nomad/structs/funcs_test.go +++ b/nomad/structs/funcs_test.go @@ -538,6 +538,54 @@ func TestAllocsFit_Devices(t *testing.T) { require.True(fit) } +// Tests that AllocsFit detects volume collisions for volumes that have +// exclusive access +func TestAllocsFit_ExclusiveVolumes(t *testing.T) { + ci.Parallel(t) + + n := node2k() + a1 := &Allocation{ + TaskGroup: "group", + Job: &Job{TaskGroups: []*TaskGroup{{Name: "group", Volumes: map[string]*VolumeRequest{ + "foo": { + Source: "example", + AccessMode: CSIVolumeAccessMode(HostVolumeAccessModeSingleNodeSingleWriter), + }, + }}}}, + AllocatedResources: &AllocatedResources{ + Tasks: map[string]*AllocatedTaskResources{ + "web": { + Cpu: AllocatedCpuResources{CpuShares: 500}, + Memory: AllocatedMemoryResources{MemoryMB: 500}, + }, + }, + }, + } + a2 := a1.Copy() + a2.AllocatedResources.Tasks["web"] = &AllocatedTaskResources{ + Cpu: AllocatedCpuResources{CpuShares: 500}, + Memory: AllocatedMemoryResources{MemoryMB: 500}, + } + a2.Job.TaskGroups[0].Volumes["foo"].AccessMode = CSIVolumeAccessModeMultiNodeReader + + // Should fit one allocation + fit, _, _, err := AllocsFit(n, []*Allocation{a1}, nil, true) + must.NoError(t, err) + must.True(t, fit) + + // Should not fit second allocation + fit, msg, _, err := AllocsFit(n, []*Allocation{a1, a2}, nil, true) + must.NoError(t, err) + must.False(t, fit) + must.Eq(t, "conflicting claims for host volume with single-writer", msg) + + // Should not fit second allocation but won't detect since we disabled + // checking host volumes + fit, _, _, err = AllocsFit(n, []*Allocation{a1, a2}, nil, false) + must.NoError(t, err) + must.True(t, fit) +} + // TestAllocsFit_MemoryOversubscription asserts that only reserved memory is // used for capacity func TestAllocsFit_MemoryOversubscription(t *testing.T) {