Skip to content

Commit da2fe01

Browse files
authored
Cuebot reserve all cores (#1313)
**Link the Issue(s) this Pull Request is related to.** Fixes #1297 **Summarize your change.** As in many render engines, we should be able to set a negative core requirement. minCores=8 > reserve 8 cores minCores=0 > reserve all cores minCores=-2 > reserve all cores minus 2 This PR addresses this feature by handling negative core requests. Cuebot will try to match this number against the number of cores on each host. The frame will be booked only if all cores are available in this scenario. If the host is busy (even slightly), the frame is **not** booked, to avoid filling the remaining cores. **Testing** I would need some guidance to create proper tests for cuebot. **Screenshot** ![negative_cores](https://github.com/AcademySoftwareFoundation/OpenCue/assets/5556461/d9c4400c-824a-40cc-9ba9-2f76a3fd8ceb) Update: There is now a "ALL" text for zero cores, or "ALL (-2)" for negative cores reservation. ![core_reservation](https://github.com/user-attachments/assets/88802b15-3ccd-4cb5-90b7-58e532523ae6) (cuesubmit feature in another PR #1284) --------- Signed-off-by: Kern Attila GERMAIN <5556461+KernAttila@users.noreply.github.com>
1 parent 388255c commit da2fe01

15 files changed

+159
-36
lines changed

cuebot/src/main/java/com/imageworks/spcue/DispatchHost.java

+47-1
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,14 @@
2424
import com.imageworks.spcue.grpc.host.LockState;
2525
import com.imageworks.spcue.util.CueUtil;
2626

27+
import org.apache.logging.log4j.Logger;
28+
import org.apache.logging.log4j.LogManager;
29+
2730
public class DispatchHost extends Entity
2831
implements HostInterface, FacilityInterface, ResourceContainer {
2932

33+
private static final Logger logger = LogManager.getLogger(DispatchHost.class);
34+
3035
public String facilityId;
3136
public String allocationId;
3237
public LockState lockState;
@@ -76,12 +81,53 @@ public String getFacilityId() {
7681
return facilityId;
7782
}
7883

84+
public boolean canHandleNegativeCoresRequest(int requestedCores) {
85+
// Request is positive, no need to test further.
86+
if (requestedCores > 0) {
87+
logger.debug(getName() + " can handle the job with " + requestedCores + " cores.");
88+
return true;
89+
}
90+
// All cores are available, validate the request.
91+
if (cores == idleCores) {
92+
logger.debug(getName() + " can handle the job with " + requestedCores + " cores.");
93+
return true;
94+
}
95+
// Some or all cores are busy, avoid booking again.
96+
logger.debug(getName() + " cannot handle the job with " + requestedCores + " cores.");
97+
return false;
98+
}
99+
100+
public int handleNegativeCoresRequirement(int requestedCores) {
101+
// If we request a <=0 amount of cores, return positive core count.
102+
// Request -2 on a 24 core machine will return 22.
103+
104+
if (requestedCores > 0) {
105+
// Do not process positive core requests.
106+
logger.debug("Requested " + requestedCores + " cores.");
107+
return requestedCores;
108+
}
109+
if (requestedCores <=0 && idleCores < cores) {
110+
// If request is negative but cores are already used, return 0.
111+
// We don't want to overbook the host.
112+
logger.debug("Requested " + requestedCores + " cores, but the host is busy and cannot book more jobs.");
113+
return 0;
114+
}
115+
// Book all cores minus the request
116+
int totalCores = idleCores + requestedCores;
117+
logger.debug("Requested " + requestedCores + " cores <= 0, " +
118+
idleCores + " cores are free, booking " + totalCores + " cores");
119+
return totalCores;
120+
}
121+
79122
@Override
80123
public boolean hasAdditionalResources(int minCores, long minMemory, int minGpus, long minGpuMemory) {
81-
124+
minCores = handleNegativeCoresRequirement(minCores);
82125
if (idleCores < minCores) {
83126
return false;
84127
}
128+
if (minCores <= 0) {
129+
return false;
130+
}
85131
else if (idleMemory < minMemory) {
86132
return false;
87133
}

cuebot/src/main/java/com/imageworks/spcue/LocalHostAssignment.java

+31-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
import com.imageworks.spcue.dispatcher.ResourceContainer;
2323
import com.imageworks.spcue.grpc.renderpartition.RenderPartitionType;
2424

25+
import org.apache.logging.log4j.Logger;
26+
import org.apache.logging.log4j.LogManager;
27+
2528
/**
2629
* Contains information about local desktop cores a user has
2730
* assigned to the given job.
@@ -33,6 +36,8 @@
3336
public class LocalHostAssignment extends Entity
3437
implements ResourceContainer {
3538

39+
private static final Logger logger = LogManager.getLogger(LocalHostAssignment.class);
40+
3641
private int idleCoreUnits;
3742
private long idleMemory;
3843
private int idleGpuUnits;
@@ -62,12 +67,37 @@ public LocalHostAssignment(int maxCores, int threads, long maxMemory, int maxGpu
6267
this.maxGpuMemory = maxGpuMemory;
6368
}
6469

70+
public int handleNegativeCoresRequirement(int requestedCores) {
71+
// If we request a <=0 amount of cores, return positive core count.
72+
// Request -2 on a 24 core machine will return 22.
73+
74+
if (requestedCores > 0) {
75+
// Do not process positive core requests.
76+
logger.debug("Requested " + requestedCores + " cores.");
77+
return requestedCores;
78+
}
79+
if (requestedCores <=0 && idleCoreUnits < threads) {
80+
// If request is negative but cores are already used, return 0.
81+
// We don't want to overbook the host.
82+
logger.debug("Requested " + requestedCores + " cores, but the host is busy and cannot book more jobs.");
83+
return 0;
84+
}
85+
// Book all cores minus the request
86+
int totalCores = idleCoreUnits + requestedCores;
87+
logger.debug("Requested " + requestedCores + " cores <= 0, " +
88+
idleCoreUnits + " cores are free, booking " + totalCores + " cores");
89+
return totalCores;
90+
}
91+
6592
@Override
6693
public boolean hasAdditionalResources(int minCores, long minMemory, int minGpus, long minGpuMemory) {
67-
94+
minCores = handleNegativeCoresRequirement(minCores);
6895
if (idleCoreUnits < minCores) {
6996
return false;
7097
}
98+
if (minCores <= 0) {
99+
return false;
100+
}
71101
else if (idleMemory < minMemory) {
72102
return false;
73103
}

cuebot/src/main/java/com/imageworks/spcue/SortableShow.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,12 @@ public boolean isSkipped(String tags, long cores, long memory) {
5454
try {
5555
if (failed.containsKey(tags)) {
5656
long [] mark = failed.get(tags);
57-
if (cores <= mark[0]) {
58-
logger.info("skipped due to not enough cores " + cores + " <= " + mark[0]);
57+
if (cores < mark[0]) {
58+
logger.info("skipped due to not enough cores " + cores + " < " + mark[0]);
5959
return true;
6060
}
61-
else if (memory <= mark[1]) {
62-
logger.info("skipped due to not enough memory " + memory + " <= " + mark[1]);
61+
else if (memory < mark[1]) {
62+
logger.info("skipped due to not enough memory " + memory + " < " + mark[1]);
6363
return true;
6464
}
6565
}

cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java

+17-1
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,21 @@
2222
import com.imageworks.spcue.dispatcher.Dispatcher;
2323
import com.imageworks.spcue.grpc.host.ThreadMode;
2424

25+
import org.apache.logging.log4j.Logger;
26+
import org.apache.logging.log4j.LogManager;
27+
2528
public class VirtualProc extends FrameEntity implements ProcInterface {
2629

30+
private static final Logger logger = LogManager.getLogger(VirtualProc.class);
31+
2732
public String hostId;
2833
public String allocationId;
2934
public String frameId;
3035
public String hostName;
3136
public String os;
3237
public byte[] childProcesses;
3338

39+
public boolean canHandleNegativeCoresRequest;
3440
public int coresReserved;
3541
public long memoryReserved;
3642
public long memoryUsed;
@@ -111,7 +117,17 @@ public static final VirtualProc build(DispatchHost host, DispatchFrame frame, St
111117
proc.coresReserved = proc.coresReserved + host.strandedCores;
112118
}
113119

114-
if (proc.coresReserved >= 100) {
120+
proc.canHandleNegativeCoresRequest = host.canHandleNegativeCoresRequest(proc.coresReserved);
121+
122+
if (proc.coresReserved == 0) {
123+
logger.debug("Reserving all cores");
124+
proc.coresReserved = host.cores;
125+
}
126+
else if (proc.coresReserved < 0) {
127+
logger.debug("Reserving all cores minus " + proc.coresReserved);
128+
proc.coresReserved = host.cores + proc.coresReserved;
129+
}
130+
else if (proc.coresReserved >= 100) {
115131

116132
int originalCores = proc.coresReserved;
117133

cuebot/src/main/java/com/imageworks/spcue/dao/LayerDao.java

+15-12
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public interface LayerDao {
5959
public List<LayerDetail> getLayerDetails(JobInterface job);
6060

6161
/**
62-
* Returns true if supplied layer is compelte.
62+
* Returns true if supplied layer is complete.
6363
*
6464
* @param layer
6565
* @return boolean
@@ -82,7 +82,7 @@ public interface LayerDao {
8282
void insertLayerDetail(LayerDetail l);
8383

8484
/**
85-
* gets a layer detail from an object that implments layer
85+
* gets a layer detail from an object that implements layer
8686
*
8787
* @param layer
8888
* @return LayerDetail
@@ -167,7 +167,7 @@ public interface LayerDao {
167167
void updateLayerTags(LayerInterface layer, Set<String> tags);
168168

169169
/**
170-
* Insert a key/valye pair into the layer environment
170+
* Insert a key/value pair into the layer environment
171171
*
172172
* @param layer
173173
* @param key
@@ -292,7 +292,7 @@ public interface LayerDao {
292292

293293
/**
294294
* Update all layers of the set type in the specified job
295-
* with the new min cores requirement.
295+
* with the new min gpu requirement.
296296
*
297297
* @param job
298298
* @param gpus
@@ -304,17 +304,16 @@ public interface LayerDao {
304304
* Update a layer's max cores value, which limits how
305305
* much threading can go on.
306306
*
307-
* @param job
308-
* @param cores
309-
* @param type
307+
* @param layer
308+
* @param threadable
310309
*/
311310
void updateThreadable(LayerInterface layer, boolean threadable);
312311

313312
/**
314313
* Update a layer's timeout value, which limits how
315314
* much the frame can run on a host.
316315
*
317-
* @param job
316+
* @param layer
318317
* @param timeout
319318
*/
320319
void updateTimeout(LayerInterface layer, int timeout);
@@ -323,8 +322,8 @@ public interface LayerDao {
323322
* Update a layer's LLU timeout value, which limits how
324323
* much the frame can run on a host without updates in the log file.
325324
*
326-
* @param job
327-
* @param timeout
325+
* @param layer
326+
* @param timeout_llu
328327
*/
329328
void updateTimeoutLLU(LayerInterface layer, int timeout_llu);
330329

@@ -341,7 +340,7 @@ public interface LayerDao {
341340

342341
/**
343342
* Appends a tag to the current set of tags. If the tag
344-
* already exists than nothing happens.
343+
* already exists then nothing happens.
345344
*
346345
* @param layer
347346
* @param val
@@ -363,8 +362,9 @@ public interface LayerDao {
363362
* Update layer usage with processor time usage.
364363
* This happens when the proc has completed or failed some work.
365364
*
366-
* @param proc
365+
* @param layer
367366
* @param newState
367+
* @param exitStatus
368368
*/
369369
void updateUsage(LayerInterface layer, ResourceUsage usage, int exitStatus);
370370

@@ -387,6 +387,9 @@ public interface LayerDao {
387387

388388
/**
389389
* Enable/disable memory optimizer.
390+
*
391+
* @param layer
392+
* @param state
390393
*/
391394
void enableMemoryOptimizer(LayerInterface layer, boolean state);
392395

cuebot/src/main/java/com/imageworks/spcue/dao/postgres/LayerDaoJdbc.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,12 @@
5151
import com.imageworks.spcue.util.CueUtil;
5252
import com.imageworks.spcue.util.SqlUtil;
5353

54-
public class LayerDaoJdbc extends JdbcDaoSupport implements LayerDao {
5554

55+
import org.apache.logging.log4j.Logger;
56+
import org.apache.logging.log4j.LogManager;
57+
58+
public class LayerDaoJdbc extends JdbcDaoSupport implements LayerDao {
59+
private static final Logger logger = LogManager.getLogger(LayerDaoJdbc.class);
5660
private static final String INSERT_OUTPUT_PATH =
5761
"INSERT INTO " +
5862
"layer_output " +
@@ -77,7 +81,7 @@ public void insertLayerOutput(LayerInterface layer, String filespec) {
7781
"FROM " +
7882
"layer_output " +
7983
"WHERE " +
80-
"pk_layer = ?" +
84+
"pk_layer = ? " +
8185
"ORDER BY " +
8286
"ser_order";
8387

cuebot/src/main/java/com/imageworks/spcue/dispatcher/CoreUnitDispatcher.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -264,10 +264,16 @@ public List<VirtualProc> dispatchHost(DispatchHost host, JobInterface job) {
264264

265265
VirtualProc proc = VirtualProc.build(host, frame, selfishServices);
266266

267-
if (host.idleCores < frame.minCores ||
267+
if (frame.minCores <= 0 && !proc.canHandleNegativeCoresRequest) {
268+
logger.debug("Cannot dispatch job, host is busy.");
269+
break;
270+
}
271+
272+
if (host.idleCores < host.handleNegativeCoresRequirement(frame.minCores) ||
268273
host.idleMemory < frame.minMemory ||
269274
host.idleGpus < frame.minGpus ||
270275
host.idleGpuMemory < frame.minGpuMemory) {
276+
logger.debug("Cannot dispatch, insufficient resources.");
271277
break;
272278
}
273279

@@ -283,6 +289,8 @@ public List<VirtualProc> dispatchHost(DispatchHost host, JobInterface job) {
283289

284290
boolean success = new DispatchFrameTemplate(proc, job, frame, false) {
285291
public void wrapDispatchFrame() {
292+
logger.debug("Dispatching frame with " + frame.minCores + " minCores on proc with " +
293+
proc.coresReserved + " coresReserved");
286294
dispatch(frame, proc);
287295
dispatchSummary(proc, frame, "Booking");
288296
return;

cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ public void handleHostReport(HostReport report, boolean isBoot) {
245245
*/
246246
String msg = null;
247247
boolean hasLocalJob = bookingManager.hasLocalHostAssignment(host);
248+
int coresToReserve = host.handleNegativeCoresRequirement(Dispatcher.CORE_POINTS_RESERVED_MIN);
248249

249250
if (hasLocalJob) {
250251
List<LocalHostAssignment> lcas =
@@ -253,13 +254,13 @@ public void handleHostReport(HostReport report, boolean isBoot) {
253254
bookingManager.removeInactiveLocalHostAssignment(lca);
254255
}
255256
}
256-
257+
257258
if (!isTempDirStorageEnough(report.getHost().getTotalMcp(), report.getHost().getFreeMcp(), host.os)) {
258259
msg = String.format(
259-
"%s doens't have enough free space in the temporary directory (mcp), %dMB",
260+
"%s doesn't have enough free space in the temporary directory (mcp), %dMB",
260261
host.name, (report.getHost().getFreeMcp()/1024));
261262
}
262-
else if (host.idleCores < Dispatcher.CORE_POINTS_RESERVED_MIN) {
263+
else if (coresToReserve <= 0 || host.idleCores < Dispatcher.CORE_POINTS_RESERVED_MIN) {
263264
msg = String.format("%s doesn't have enough idle cores, %d needs %d",
264265
host.name, host.idleCores, Dispatcher.CORE_POINTS_RESERVED_MIN);
265266
}
@@ -268,7 +269,7 @@ else if (host.idleMemory < Dispatcher.MEM_RESERVED_MIN) {
268269
host.name, host.idleMemory, Dispatcher.MEM_RESERVED_MIN);
269270
}
270271
else if (report.getHost().getFreeMem() < CueUtil.MB512) {
271-
msg = String.format("%s doens't have enough free system mem, %d needs %d",
272+
msg = String.format("%s doesn't have enough free system mem, %d needs %d",
272273
host.name, report.getHost().getFreeMem(), Dispatcher.MEM_RESERVED_MIN);
273274
}
274275
else if(!host.hardwareState.equals(HardwareState.UP)) {

cuebot/src/main/java/com/imageworks/spcue/service/JobManagerService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ public JobDetail createJob(BuildableJob buildableJob) {
274274
}
275275
}
276276

277-
if (layer.minimumCores < Dispatcher.CORE_POINTS_RESERVED_MIN) {
277+
if (layer.minimumCores > 0 && layer.minimumCores < Dispatcher.CORE_POINTS_RESERVED_MIN) {
278278
layer.minimumCores = Dispatcher.CORE_POINTS_RESERVED_MIN;
279279
}
280280

0 commit comments

Comments
 (0)