Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release GIL and Run in Background Thread #1886

Merged
merged 4 commits into from
Sep 7, 2024

Conversation

michaelbynum
Copy link
Contributor

@michaelbynum michaelbynum commented Aug 19, 2024

Fixes #1593

This PR makes two changes:

  1. Release the GIL when calling run/solve so that other threads can run.
  2. Execute run/solve in a background thread so that keyboard interrupts work.

@jajhall jajhall changed the base branch from master to latest August 19, 2024 21:37
Copy link
Member

@jajhall jajhall left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sanity check - probably me being dumb about Python

When running in background thread (singular), presumably HiGHS can still run using multiple threads.

@michaelbynum
Copy link
Contributor Author

I think it should be fine, but it is worth double checking. Let me get back to you.

@few
Copy link
Contributor

few commented Aug 28, 2024

I tried the branch and I think there is something missing. When I now press CTRL+C, I immediately end up in my "except KeyboardInterrupt:" block as intended, but the solver keeps running in the background as can be seen by the continued production of log output.

@michaelbynum
Copy link
Contributor Author

That's odd. I didn't see behavior like that. I'll take another look.

@few
Copy link
Contributor

few commented Aug 28, 2024

Well, it works if the python program immediately terminates on CTRL+C, but not if it keeps running and doing other stuff.

My use case is to collect intermediate solutions with a callback (requires #1882) and when the user hits CTRL+C, I want to continue to work with the best intermediate solution. Having the solver still running and modifying its state is a problem.

@michaelbynum
Copy link
Contributor Author

Ahh. Got it. Let me make some modifications.

@michaelbynum
Copy link
Contributor Author

michaelbynum commented Aug 30, 2024

I tried the branch and I think there is something missing. When I now press CTRL+C, I immediately end up in my "except KeyboardInterrupt:" block as intended, but the solver keeps running in the background as can be seen by the continued production of log output.

I'm having trouble reproducing this. What OS are you seeing this on?
Scratch that. I am seeing this behavior.

@michaelbynum
Copy link
Contributor Author

Sanity check - probably me being dumb about Python

When running in background thread (singular), presumably HiGHS can still run using multiple threads.

@jajhall, I confirmed that HiGHS can still use multiple threads.

@michaelbynum
Copy link
Contributor Author

I should probably make modifications so that a results object is still returned even if there is a keyboard interrupt.

@michaelbynum
Copy link
Contributor Author

The only way I see to handle the keyboard interrupt is with events, which would have to be set in a callback or something.

@few
Copy link
Contributor

few commented Sep 1, 2024

This seems to work:

import highspy
import time

def main():
    h = highspy.Highs()
    h.readModel("30n20b8.mps")

    last_intermediate_solution = None
    solver_should_stop = False
    solver_stoped = False
    def callback(callback_type, message, data_out, data, user_callback_data):
        if callback_type == highspy.cb.HighsCallbackType.kCallbackMipImprovingSolution:
            nonlocal last_intermediate_solution
            last_intermediate_solution = data_out.mip_solution
        elif callback_type in (highspy.cb.HighsCallbackType.kCallbackSimplexInterrupt, highspy.cb.HighsCallbackType.kCallbackIpmInterrupt, highspy.cb.HighsCallbackType.kCallbackMipInterrupt):
            nonlocal solver_should_stop, solver_stoped
            if solver_should_stop:
                data.user_interrupt = True
                solver_stoped = True
                
    h.setCallback(callback, None)
    h.startCallback(highspy.cb.HighsCallbackType.kCallbackMipImprovingSolution)
    h.startCallback(highspy.cb.HighsCallbackType.kCallbackSimplexInterrupt)
    h.startCallback(highspy.cb.HighsCallbackType.kCallbackIpmInterrupt)
    h.startCallback(highspy.cb.HighsCallbackType.kCallbackMipInterrupt)
    try:
        h.run()
    except KeyboardInterrupt:
        solver_should_stop = True
        print("KeyboardInterrupt: Waiting for highs to finish...")
        while not solver_stoped:
            time.sleep(0.1)
        print("KeyboardInterrupt: Highs finished")
        
    h.stopCallback(highspy.cb.HighsCallbackType.kCallbackMipImprovingSolution)
    h.stopCallback(highspy.cb.HighsCallbackType.kCallbackSimplexInterrupt)
    h.stopCallback(highspy.cb.HighsCallbackType.kCallbackIpmInterrupt)
    h.stopCallback(highspy.cb.HighsCallbackType.kCallbackMipInterrupt)


    model_status = h.getModelStatus()
    print(f"model_status: {model_status}")
    
    solution = None
    if model_status == highspy.HighsModelStatus.kOptimal:
        solution = h.getSolution().col_value
    elif model_status == highspy.HighsModelStatus.kInterrupt and last_intermediate_solution is not None:
        solution = last_intermediate_solution
        model_status = highspy.HighsModelStatus.kOptimal
    elif model_status == highspy.HighsModelStatus.kInfeasible:
        pass

    print(f"solution found: {solution is not None}")
main()

The mps file can be found here: https://miplib.zib.de/instance_details_30n20b8.html

If one waits until the solver reports BestSol != inf, then last_intermediate_solution will contain that solution.

While writing this I came across these issues: #1904, #1905

EDIT: I tried replacing the time.sleep(0.1) with a threading.Condition.wait, but then one always runs into #1905 with model_status == kNotSet instead of kInterrupt.

@few few mentioned this pull request Sep 1, 2024
@michaelbynum
Copy link
Contributor Author

Thanks, @few. I can work something like this into the PR, but I'd like to hear from @jajhall or @galabovaa, first. This would involve creating a callback that ran every time HiGHS was run from Python. If we do this, I think the callback should be in c++ (which is not a problem). I'm not sure I see a way around it if we want keyboard interrupts to work properly. However, this might negate the need to run in a background thread. If @jajhall is okay with using a callback for this, I'll go ahead and update the PR.

@few
Copy link
Contributor

few commented Sep 3, 2024

I'm not sure if fiddling with the signal handling in a library is such a nice thing for the application. I was thinking about this:
a) Merge this PR as is
b) After #1905 is fixed, add an example based on my code above showcasing how to set up callbacks to properly deal with keyboard interrupts. For simple scripts, your PR will just work as those terminate immediately and don't notice that the highs thread keeps running. Larger applications that want to deal with KeyboardInterrupt in their own way then have all the flexibility.

@few
Copy link
Contributor

few commented Sep 3, 2024

I just tried the branch on Windows... and it does not work. The solver just keeps running. Once it finishes, "KeyboardInterrupt: Waiting for highs to finish..." is printed, which hangs, because highs has already stopped.

@few
Copy link
Contributor

few commented Sep 6, 2024

I might have found a way to make it work on Windows. It depends on the pywin32 package though. Add these import at the top of highspy.py and replace the solve() function.

from threading import local, Thread, Event
import sys
if sys.platform == 'win32':
    import win32event
    import win32api

def solve(self):
        """Runs the solver on the current problem.

        Returns:
            A HighsStatus object containing the solve status.
        """
        res = _ThreadingResult()
        
        if sys.platform == 'win32':
            finished_event = win32event.CreateEvent(None, 0, 0, None)
            
            def thread_func():
                try:
                    self._run(res)
                finally:
                    win32event.SetEvent(finished_event)
            
            t = Thread(target=thread_func)
            t.start()
        
            stop_event = win32event.CreateEvent(None, 0, 0, None)
            win32api.SetConsoleCtrlHandler(lambda x: win32event.SetEvent(stop_event), True)
            
            wait_objects = [stop_event, finished_event]
            wait_result = win32event.WaitForMultipleObjects(wait_objects, False, win32event.INFINITE)
            
            if wait_result == win32event.WAIT_OBJECT_0:
                raise KeyboardInterrupt
        else:
            t = Thread(target=self._run, args=(res,))
            t.start()
            t.join()
        
        return res.out

In my minimal testing, the solver works as usual if not interrupted and interrupts in the same way as on linux on CTRL+C.

@jajhall
Copy link
Member

jajhall commented Sep 6, 2024

I'm learning more about Python multithreading in a parallelism session at the NumFOCUS Project Summit, and some of the discussion above is making a lot more sense 🙂. If nothing else, I know what "releasing the GIL" refers to!

@jajhall jajhall merged commit f7be435 into ERGO-Code:latest Sep 7, 2024
110 checks passed
@few
Copy link
Contributor

few commented Sep 7, 2024

I found a version that works on linux and windows with the branch as it was merged. Is there interest to have that as an example? I would wait for the fix for #1905 though.

import highspy
import threading

class CallbackManager:
    def __init__(self, model):
        self.last_intermediate_solution = None
        self._model = model
        self._solver_should_stop = False
        self._solver_stopped = threading.Condition()

    def _callback(self, callback_type, message, data_out, data_in, user_callback_data):
        if callback_type == highspy.cb.HighsCallbackType.kCallbackMipImprovingSolution:
            self.last_intermediate_solution = data_out.mip_solution
        elif callback_type in (highspy.cb.HighsCallbackType.kCallbackSimplexInterrupt, highspy.cb.HighsCallbackType.kCallbackIpmInterrupt, highspy.cb.HighsCallbackType.kCallbackMipInterrupt):
            if self._solver_should_stop:
                data_in.user_interrupt = True
                with self._solver_stopped:
                    self._solver_stopped.notify()
    
    def _stop(self):
        self._solver_should_stop = True
        with self._solver_stopped:
            self._solver_stopped.wait()
    
    def _enable_callbacks(self):
        self._model.setCallback(self._callback, None)
        self._model.startCallback(highspy.cb.HighsCallbackType.kCallbackMipImprovingSolution)
        self._model.startCallback(highspy.cb.HighsCallbackType.kCallbackSimplexInterrupt)
        self._model.startCallback(highspy.cb.HighsCallbackType.kCallbackIpmInterrupt)
        self._model.startCallback(highspy.cb.HighsCallbackType.kCallbackMipInterrupt)
        
    def _disable_callbacks(self):
        self._model.stopCallback(highspy.cb.HighsCallbackType.kCallbackMipImprovingSolution)
        self._model.stopCallback(highspy.cb.HighsCallbackType.kCallbackSimplexInterrupt)
        self._model.stopCallback(highspy.cb.HighsCallbackType.kCallbackIpmInterrupt)
        self._model.stopCallback(highspy.cb.HighsCallbackType.kCallbackMipInterrupt)
    
    def start_solver(self, solve_func):
        self._enable_callbacks()
        
        try:
            t = threading.Thread(target=solve_func)
            t.start()
            while t.is_alive():
                t.join(0.1)
        except KeyboardInterrupt:
            print("KeyboardInterrupt: Waiting for highs to finish...")
            self._stop()
            print("KeyboardInterrupt: Highs finished")
        finally:
            self._disable_callbacks()
                

h = highspy.Highs()
assert h.readModel("30n20b8.mps") == highspy.HighsStatus.kOk, "model file not found. download it here: https://miplib.zib.de/instance_details_30n20b8.html"
callback_manager = CallbackManager(h)
callback_manager.start_solver(lambda: h.run())

model_status = h.getModelStatus()
print(f"model_status: {model_status}")

solution = None
if model_status == highspy.HighsModelStatus.kOptimal:
    solution = h.getSolution().col_value
elif model_status in (highspy.HighsModelStatus.kInterrupt, highspy.HighsModelStatus.kNotset) and callback_manager.last_intermediate_solution is not None:
    solution = callback_manager.last_intermediate_solution
    model_status = highspy.HighsModelStatus.kOptimal
elif model_status == highspy.HighsModelStatus.kInfeasible:
    pass

print(f"solution found: {solution is not None}")

@mathgeekcoder
Copy link
Contributor

Hi @michaelbynum, @jajhall, and @few!

After rebasing my branch to include this PR I'm seeing many random deadlocks in my extended highspy unit tests. I've made some fixes, but wanted to confirm some details.

  1. I don't think highspy.solve() should be called automatically in a separate thread. It seems to be causing the deadlock issues and it becomes messy when the user actually wants to run multiple HiGHS instances in different threads. If the user wants support for KeyboardInterupts, then the code by @few works great! I don't think we can integrate this behaviour into highspy by default, since the user might want to use the callbacks for other things, and again, it gets messy if the user has multiple instances of HiGHS running in different threads.

  2. I believe the GIL release code below can be simplified to .def("run", &Highs::run, py::call_guard<py::gil_scoped_release>()). That is, the py::gil_scoped_release releases the GIL in the constructor and acquires the GIL in the destructor. Whereas py::gil_scoped_acquire() does the opposite - and is unnecessary?

HighsStatus highs_run(Highs* h) {
  py::gil_scoped_release release;
  HighsStatus status = h->run();
  py::gil_scoped_acquire();  # unnecessary?
  return status;
}
  1. Since we're releasing the GIL in run, don't we need to re-acquire the GIL when C++ calls the python callbacks? e.g., see the code below. Also, this fixes highspy: Passing user_callback_data to setCallback raises exception #1904, by interpreting the void* as a python "object" (using handle to avoid reference counting issues).
// Wrap the setCallback function. Pass a lambda wrapper around the python function 
// that acquires the GIL and appropriately handle user data passed to the callback
HighsStatus highs_setCallback(
    Highs* h,
    std::function<void(int, const std::string&, const HighsCallbackDataOut*, HighsCallbackDataIn*, py::handle)> fn,
    py::handle data) {

  return h->setCallback([fn, data](int callbackType, const std::string& msg,
                                   const HighsCallbackDataOut* dataOut,
                                   HighsCallbackDataIn* dataIn, void* d) {
    py::gil_scoped_acquire acquire;
    return fn(callbackType, msg, dataOut, dataIn, py::handle(reinterpret_cast<PyObject*>(d)));
  }, data.ptr());
}

Thanks! BTW: I did some tests with multiple HiGHS instances/threads in python, and releasing the GIL does make a big performance improvement!

@michaelbynum
Copy link
Contributor Author

I think I might agree. I think callbacks may be a better way to handle keyboard interrupts. I think you are also correct about python callbacks, except that the GIL needs released again after the callback.

@michaelbynum
Copy link
Contributor Author

I can create an optional callback that can be enabled with an option to handle keyboard interrupts?

@mathgeekcoder
Copy link
Contributor

I think I might agree. I think callbacks may be a better way to handle keyboard interrupts. I think you are also correct about python callbacks, except that the GIL needs released again after the callback.

I believe the py::gil_scoped_acquire should release the GIL in its destructor.

@mathgeekcoder
Copy link
Contributor

I can create an optional callback that can be enabled with an option to handle keyboard interrupts?

Yeah I agree. Thinking on this further, I think we can have an additional python callback proxy that can enable optional keyboard interupts, while still allowing the user to have custom callback code. I'll have a quick play with the idea I have in mind.

@michaelbynum
Copy link
Contributor Author

Ahh... That is convenient.

@mathgeekcoder
Copy link
Contributor

I have an approach that seems reasonable to me. For example:

h = highspy.Highs()
h.readModel('30n20b8.mps')
h.HandleKeyboardInterrupt = True

solutions = []
h.cbMipImprovingSolution.subscribe(lambda e: solutions.append(e.data_out.mip_solution))

h.solve()
print(solutions)

The HandleKeyboardInterrupt = True will add callbacks that check for termination and will launch the solve in a separate thread. I've added support for multiple callbacks on each event, so the user can still capture events - and even specify up-front which event they'd like to handle.

The user can also manually cancel the solve, e.g., the following will cancel when the first MIP solution is found, and then wait until HiGHS finishes up.

h.cbMipImprovingSolution.subscribe(lambda e: h.cancelSolve())
h.solve()

The code seems to be working well; however I will need to add some extra unit tests.

@few
Copy link
Contributor

few commented Sep 11, 2024

That sounds great.

h = highspy.Highs()
h.readModel('30n20b8.mps')
h.HandleKeyboardInterrupt = True

solutions = []
h.cbMipImprovingSolution.subscribe(lambda e: solutions.append(e.data_out.mip_solution))

h.solve()
print(solutions)

What happens from the POV of the python script when a KeyboardInterrupt occurs during h.solve()? I'd assume there is no KeyboardInterrupt to capture and h.solve() just finishes with model status kInterrupt.

What happens if a second keyboard interrupt occurs while the solver is still running (and hasn't reached one of the interrupt callbacks yet)?

@mathgeekcoder
Copy link
Contributor

Great questions!

What happens from the POV of the python script when a KeyboardInterrupt occurs during h.solve()? I'd assume there is no KeyboardInterrupt to capture and h.solve() just finishes with model status kInterrupt.

KeyboardInterrupts are supported if HandleKeyboardInterrupts = True. I borrowed and adapted your code. See below for a rough outline (I've removed docstrings for brevity).

Threading Code
def solve(self):
    if self.HandleKeyboardInterrupt == False:
        return super().run()
    else:
        return self.joinSolve(self.startSolve())

# Starts the solver in a separate thread.  Useful for handling KeyboardInterrupts.
# Do not attempt to modify the model while the solver is running.
def startSolve(self):
    if self.is_solver_running() == False:
        self.__solver_started.acquire()
        self.__solver_should_stop = False
        self.__solver_status = None

        t = Thread(target=self.__solve, daemon=True)
        t.start()

        # wait for solver thread to acquire __solver_stopped to avoid synchronization issues
        with self.__solver_started:
            return t
    else:
        raise Exception("Solver is already running.")
    
def joinSolve(self, solver_thread=None):
    if solver_thread is not None:
        try:
            while solver_thread.is_alive():
                solver_thread.join(0.1)
        except KeyboardInterrupt:
            print('KeyboardInterrupt: Waiting for HiGHS to finish...')
            self.cancelSolve()

    try:
        # wait for shared lock, i.e., solver to finish
        self.__solver_stopped.acquire(True)
    except KeyboardInterrupt:
        pass  # ignore additional KeyboardInterrupt here
    finally:
        self.__solver_stopped.release()
        return self.__solver_status

def wait(self, timeout=-1.0):
    result = False, None

    try:
        result = self.__solver_stopped.acquire(True, timeout=timeout), self.__solver_status
        return result
    finally:
        if result[0] == True:
            self.__solver_status = None  # reset status
            self.__solver_stopped.release()

# internal solve method for use with threads
# will set the status of the solver when finished and release the shared lock
def __solve(self):
    with self.__solver_stopped:
        self.__solver_started.release()  # allow main thread to continue
        self.__solver_status = super().run()

What happens if a second keyboard interrupt occurs while the solver is still running (and hasn't reached one of the interrupt callbacks yet)?

I'm happy for feedback here. By default, I ignore any future Ctrl-C interrupts. We could instead force terminate after X attempts (see below), or we could just leave it up to the user if they want this behaviour.

def custom_wait(h, limit=5):
    result = (False, None)

    for count in range(limit):
        try:
            while result[0] == False:
                result = h.wait(0.1)
            return result[1]

        except KeyboardInterrupt:
            print(f'Ctrl-C pressed {count+1} times: Waiting for HiGHS to finish. ({limit} times to force termination)')
            h.cancelSolve()
    
    # if we reach this point, we should force termination
    print("Forcing termination...")
    exit(1)

h = highspy.Highs()
h.readModel("30n20b8.mps")
h.HandleUserInterrupt = True  # add interrupt callbacks, but user will handle KeyboardInterrupts manually

h.startSolve()
custom_wait(h)

Which gives:

Ctrl-C pressed 1 times: Waiting for HiGHS to finish. (5 times to force termination)
Ctrl-C pressed 2 times: Waiting for HiGHS to finish. (5 times to force termination)
Ctrl-C pressed 3 times: Waiting for HiGHS to finish. (5 times to force termination)
Ctrl-C pressed 4 times: Waiting for HiGHS to finish. (5 times to force termination)
Ctrl-C pressed 5 times: Waiting for HiGHS to finish. (5 times to force termination)
Forcing termination...

@mathgeekcoder
Copy link
Contributor

FYI @michaelbynum, @jajhall, and @few,

I found the cause of deadlocking when running HiGHS in a temporary thread. e.g., the following code will randomly deadlock.

Note: this is not just a python issue. C++ will also deadlock. BTW: I tagged @jajhall since you'll be looking at some parallel code in the near future.

for _ in range(10):
    h = # some small highs model, can be same model for each loop or different instance (doesn't matter)
    t = Thread(target=h.run)
    t.start()
    t.join()

TLDR: calling HighsTaskExecutor::shutdown() or Highs::resetGlobalScheduler() within the temporary thread will avoid the issue. Alternatively, we can rewrite the synchronization code in HighsTaskExecutor.

# e.g., this does not deadlock
for i in range(10):
    t = Thread(target=lambda: h.run() and highspy.Highs.resetGlobalScheduler(False))
    t.start()
    t.join()

Details:

  1. Each time we solve in a new thread, highs::parallel::initialize_scheduler is called, which initializes a new thread_local mainWorkerHandle and spins up new worker threads (that are detached immediately).
  2. These worker threads can take some time to start, but HiGHS can start processing on the already active ones.
  3. HiGHS can finish before all the worker threads start, and the main "temporary thread" terminates.
  4. When the main thread dies, the thread_local mainWorkerHandle destructor calls HighsTaskExecutor::shutdown() which blocks until all worker threads are initialized and synchronized.
  5. This is where it's hard to debug, but it seems that the thread scheduler (at least in Windows), waits to start these "un-started" worker threads until the "currently destructing" thread dies - which is blocked, waiting for these "un-started" worker threads.
  6. Hence deadlock.

If we call HighsTaskExecutor::shutdown() at the end of the temporary thread (instead of in its destructor), the thread scheduler isn't blocked, and it can synchronize as expected.

That said, this synchronization isn't strictly necessary. It should be possible to inform the worker threads to shutdown without waiting for them to initialize - since they already share the same this pointer. The last worker thread to die will also delete the shared memory. The required code change is small, but probably needs significant testing.

@jajhall: Do you think this is something worth doing? Should I log an issue / submit a PR?

@jajhall
Copy link
Member

jajhall commented Sep 12, 2024

This is potentially very interesting, as we've got a long-standing issue #1044 that @galabovaa is currently working on

@jajhall
Copy link
Member

jajhall commented Sep 12, 2024

Do you think this is something worth doing? Should I log an issue / submit a PR?

Thanks for the offer to help, but let's see what @galabovaa thinks first

@mathgeekcoder
Copy link
Contributor

Good point! I hadn't made that connection, but after looking at the history, it's very possibly related. I've also noticed the deadlock is more likely to occur if numThreads is larger.

Looks like there's a lot of connected issues and history, so I don't know reproduction steps for all of them (to verify). But I'm happy to work with @galabovaa if she'd like!

@galabovaa
Copy link
Contributor

Thank you for your observations on the thread scheduler, @mathgeekcoder! I have been trying to work out from where within HiGHS to call shutdown(), and how to restructure the code, in order to resolve this threads issue. It's popped up from different users calling HiGHS from different interfaces, and it is very hard to reproduce or debug. As you say, changes would require signifiacnt testing, so we have been cautious about this one.

I will get a chance to dig deeper next week. We have to make a change to the code, for sure. Testing-wise, we get data race warnings from the thread sanitizer, so clearing those would be a great first step. Valgrind pops some warnings too and I suspect they are all related.

If what you have in mind is a small change to the code, it would be great if you could open a PR with your suggested modifications. As long as it is not something you would have to spend a lot of time on. We may get another suggestion for handling it next week, but I think it would be great to explore your idea of informing the worker threads to shutdown without waiting for them to initialize.

@mathgeekcoder
Copy link
Contributor

Happy to help!

If what you have in mind is a small change to the code, it would be great if you could open a PR with your suggested modifications.

I've started a PR but noticed a couple of small issues with the "latest" highspy impl. I've already made some minor fixes (not pushed yet), but I want to do some more testing tomorrow.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Python Interface should release the GIL when run gets called
5 participants