关闭 运行 python 线程时出现问题

Problem with closing a running python thread

我有一个字符串形式的单行加密代码和一个解码和执行加密代码的函数(代码建立到网络中另一台计算机的后门连接,但这在这里不是很相关)一次加密代码 运行s 程序不会继续到下一行,因为代码不会停止(为了保持连接有效)。

我要做的是运行代码10秒,然后终止,重新运行。

但我无法终止代码,因为它已加密且无法更改。我以为我可以 运行 它在一个单独的线程上然后在 10 秒后强行终止它,但问题是互联网上提到的所有解决方案都需要稍微更改加密代码。

Python 似乎没有办法在 'n' 秒后强行关闭一个 运行ning 线程,这很容易解决我的问题。

以下代码演示了我的问题:

import threading, time

encrypted_code = ")'cba'(tnirp :eurT elihw"

def decrypt_and_execute(encrypted_code):
    exec(encrypted_code[::-1])

def run(encrypted_code, run_time):
    my_thread = threading.Thread(target=decrypt_and_execute, args=(encrypted_code,))
    my_thread.start()
    time.sleep(run_time)
    # CODE TO STOP 'my_thread' without changing 'encrypted_code' NEEDED HERE#

while True:
    threading.Thread(target=run, args=(encrypted_code, 10)).start()
    time.sleep(10)

以上是我尝试的解决方案(这里的加密代码是一个不断打印abc的无限while循环)。 我试图在不关闭整个应用程序的情况下停止 'my_thread' 的任何代码都不起作用。

也许带痕迹的线程技术对你有用,是一个你可以停止的线程子类。来自 https://www.geeksforgeeks.org/python-different-ways-to-kill-a-thread/

或者使用子进程,它们已经终止。同样的 post 很好地涵盖了这一点。

编辑:出于某种原因,这未能终止相关案例中的线程。 multiprocess 模块解决了这个问题:killing processes 是停止所有操作系统的可靠方法,multiprocess 模块有一个 spawn 方法,它为子进程提供一个类似于 API 的线程,并且可以用 [=20] 启动新进程=] 代码也在 Windows 上,但没有安装 Python。

无论如何,对于其他情况,这种线程技术也应该有效。实际上很少需要线程,但有时确实需要。

# Python program using 
# traces to kill threads 

import sys 

import trace 
import threading 
import time 

class thread_with_trace(threading.Thread): 
  def __init__(self, *args, **keywords): 
    threading.Thread.__init__(self, *args, **keywords) 
    self.killed = False

  def start(self): 
    self.__run_backup = self.run 
    self.run = self.__run       
    threading.Thread.start(self) 

  def __run(self): 
    sys.settrace(self.globaltrace) 
    self.__run_backup() 
    self.run = self.__run_backup 

  def globaltrace(self, frame, event, arg): 
    if event == 'call': 
      return self.localtrace 

    else: 
      return None 

  def localtrace(self, frame, event, arg): 
    if self.killed: 
      if event == 'line': 
        raise SystemExit() 

    return self.localtrace 
  
  def kill(self): 
    self.killed = True

def func(): 
  while True: 
    print('thread running') 

t1 = thread_with_trace(target = func) 
t1.start() 

time.sleep(2) 
t1.kill() 
t1.join() 

if not t1.isAlive(): 
  print('thread killed')