programing

Python에서 스레딩을 사용하려면 어떻게 해야 하나요?

bestcode 2022. 9. 19. 23:47
반응형

Python에서 스레딩을 사용하려면 어떻게 해야 하나요?

Python의 스레드를 이해하려고 합니다.문서와 예시를 살펴봤지만, 솔직히 많은 예들이 지나치게 정교해서 이해하기 어렵습니다.

멀티스레딩을 위해 작업이 분할되는 것을 어떻게 명확히 보여줍니까?

이 질문이 2010년에 제기된 이후 Python에서 과 풀을 사용하여 간단한 멀티스레딩을 수행하는 방법이 실질적으로 간소화되었습니다.

아래 코드는 기사/블로그 투고에서 인용한 것입니다.이 투고에서는 반드시 (소속 없음) 병행성을 한 줄에 나타냅니다:A Better Model for Day Threading Tasks (일상 스레드화 태스크에 적합한 모델)아래에 정리합니다.결국 코드 몇 줄에 불과합니다.

from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(4)
results = pool.map(my_function, my_array)

다음 중 하나의 멀티스레드 버전:

results = []
for item in my_array:
    results.append(my_function(item))

묘사

맵은 멋진 작은 기능으로 Python 코드에 병렬을 쉽게 주입할 수 있는 열쇠입니다.익숙하지 않은 사람들에게 지도는 리스프와 같은 기능적 언어에서 가져온 것이다.이것은 다른 함수를 시퀀스에 매핑하는 함수입니다.

맵은 시퀀스에 걸쳐 반복을 처리하고 함수를 적용하여 모든 결과를 편리한 목록 끝에 저장합니다.

여기에 이미지 설명을 입력하십시오.


실행

맵 함수의 병렬 버전은 두 개의 라이브러리(멀티프로세싱)와 거의 알려지지 않았지만 마찬가지로 환상적인 스텝자녀(멀티프로세싱.dummy)에 의해 제공됩니다.

multiprocessing.dummy는 멀티프로세서 모듈과 완전히 동일하지만 스레드를 대신 사용합니다(중요한 구별CPU 부하가 높은 태스크에 여러 프로세스를 사용, I/O용(및 I/O 중) 스레드 사용).

multiprocessing.dummy는 멀티프로세싱 API를 복제하지만 스레드화 모듈 주변의 래퍼에 지나지 않습니다.

import urllib2
from multiprocessing.dummy import Pool as ThreadPool

urls = [
  'http://www.python.org',
  'http://www.python.org/about/',
  'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
  'http://www.python.org/doc/',
  'http://www.python.org/download/',
  'http://www.python.org/getit/',
  'http://www.python.org/community/',
  'https://wiki.python.org/moin/',
]

# Make the Pool of workers
pool = ThreadPool(4)

# Open the URLs in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)

# Close the pool and wait for the work to finish
pool.close()
pool.join()

타이밍은 다음과 같습니다.

Single thread:   14.4 seconds
       4 Pool:   3.1 seconds
       8 Pool:   1.4 seconds
      13 Pool:   1.3 seconds

여러 인수 전달(Python 3.3 이상에서만 이와 같이 작동):

여러 어레이를 통과하려면:

results = pool.starmap(function, zip(list_a, list_b))

또는 상수 및 배열을 전달하려면:

results = pool.starmap(function, zip(itertools.repeat(constant), list_a))

이전 버전의 Python을 사용하는 경우 이 회피책을 통해 여러 인수를 전달할 수 있습니다).

(유저 136036 님의 유익한 코멘트에 감사드립니다.)

다음은 간단한 예입니다. 몇 가지 대체 URL을 시도하고 응답할 첫 번째 URL의 내용을 반환해야 합니다.

import Queue
import threading
import urllib2

# Called by each thread
def get_url(q, url):
    q.put(urllib2.urlopen(url).read())

theurls = ["http://google.com", "http://yahoo.com"]

q = Queue.Queue()

for u in theurls:
    t = threading.Thread(target=get_url, args = (q,u))
    t.daemon = True
    t.start()

s = q.get()
print s

각 및각 스레드가 종료해도 진행하지 합니다.각 서브스레드는 URL이 해결되어 응답하고 내용을 큐에 넣을 때까지 대기하고 있습니다.각 스레드는 데몬입니다(메인 스레드가 종료되면 프로세스가 계속 진행되지 않습니다).메인 스레드는 모든 서브스레드를 시작합니다.get까지 대기합니다.put그런 다음 결과를 내보내고 종료합니다(데몬 스레드이기 때문에 아직 실행 중일 수 있는 서브 스레드는 모두 다운됩니다.

Python에서 스레드를 적절하게 사용하는 것은 항상 I/O 작업에 연결됩니다(CPython은 CPU 바인딩 작업을 실행하기 위해 여러 코어를 사용하지 않기 때문에 스레드의 유일한 이유는 I/O를 기다리는 동안 프로세스를 차단하지 않기 때문입니다).큐는 작업을 스레드에 할당하거나 작업 결과를 수집하는 가장 좋은 방법이며 본질적으로 스레드 세이프하기 때문에 잠금, 조건, 이벤트, 세마포어 및 기타 스레드 간 조정/통신 개념을 걱정할 필요가 없습니다.

메모: Python에서 실제 병렬화를 수행하려면 멀티프로세서 모듈을 사용하여 병렬로 실행되는 여러 프로세스를 포킹해야 합니다(글로벌 인터프리터 잠금으로 인해 Python 스레드는 인터리빙을 제공하지만 실제로는 병렬이 아닌 직렬로 실행되며 I/O 작업을 인터리빙할 때만 유용합니다).

단, 인터리빙(또는 글로벌인터프리터 잠금에도 병렬화할 수 있는 I/O 조작을 실행하는 경우)만을 검토하고 있는 경우에는 스레드화 모듈을 시작합니다.매우 간단한 예로서 서브 범위를 병렬로 가산하여 큰 범위를 가산하는 문제를 생각해 보겠습니다.

import threading

class SummingThread(threading.Thread):
     def __init__(self,low,high):
         super(SummingThread, self).__init__()
         self.low=low
         self.high=high
         self.total=0

     def run(self):
         for i in range(self.low,self.high):
             self.total+=i


thread1 = SummingThread(0,500000)
thread2 = SummingThread(500000,1000000)
thread1.start() # This actually causes the thread to run
thread2.start()
thread1.join()  # This waits until the thread has completed
thread2.join()
# At this point, both threads have completed
result = thread1.total + thread2.total
print result

위의 예는 매우 어리석은 예입니다.이는 I/O가 전혀 이루어지지 않으며 글로벌인터프리터 잠금으로 인해 인터리브(콘텍스트스위칭의 오버헤드 추가)를 통해 CPython에서 시리얼로 실행되기 때문입니다.

앞서 언급한 다른 항목과 마찬가지로 CPython은 GIL로 인해 I/O 대기 시에만 스레드를 사용할 수 있습니다.

CPU에 바인드된 작업에 여러 코어를 사용하는 경우 다음과 같이 멀티프로세싱을 사용합니다.

from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

주의: 스레드에는 큐가 필요하지 않습니다.

이것은 10개의 프로세스가 동시에 실행되고 있는 것을 나타내는 가장 간단한 예입니다.

import threading
from random import randint
from time import sleep


def print_number(number):

    # Sleeps a random 1 to 10 seconds
    rand_int_var = randint(1, 10)
    sleep(rand_int_var)
    print "Thread " + str(number) + " slept for " + str(rand_int_var) + " seconds"

thread_list = []

for i in range(1, 10):

    # Instantiates the thread
    # (i) does not make a sequence, so (i,)
    t = threading.Thread(target=print_number, args=(i,))
    # Sticks the thread in a list so that it remains accessible
    thread_list.append(t)

# Starts threads
for thread in thread_list:
    thread.start()

# This blocks the calling thread until the thread whose join() method is called is terminated.
# From http://docs.python.org/2/library/threading.html#thread-objects
for thread in thread_list:
    thread.join()

# Demonstrates that the main process waited for threads to complete
print "Done"

알렉스 마르텔리의 대답이 나를 도왔다.다만, 여기(적어도 나에게는) 더 유용하다고 생각되는 수정 버전이 있습니다.

업데이트됨: Python 2와 Python 3 모두에서 작동

try:
    # For Python 3
    import queue
    from urllib.request import urlopen
except:
    # For Python 2 
    import Queue as queue
    from urllib2 import urlopen

import threading

worker_data = ['http://google.com', 'http://yahoo.com', 'http://bing.com']

# Load up a queue with your data. This will handle locking
q = queue.Queue()
for url in worker_data:
    q.put(url)

# Define a worker function
def worker(url_queue):
    queue_full = True
    while queue_full:
        try:
            # Get your data off the queue, and do some work
            url = url_queue.get(False)
            data = urlopen(url).read()
            print(len(data))

        except queue.Empty:
            queue_full = False

# Create as many threads as you want
thread_count = 5
for i in range(thread_count):
    t = threading.Thread(target=worker, args = (q,))
    t.start()

함수의 경우,f 을 긋습니다

import threading
threading.Thread(target=f).start()

를 「」에 .f

threading.Thread(target=f, args=(a,b,c)).start()

코어와 같은 수의 스레드를 생성하여 (이 경우 셸 프로그램이라고 부릅니다) 많은 작업을 실행할 수 있도록 합니다.

import Queue
import threading
import multiprocessing
import subprocess

q = Queue.Queue()
for i in range(30): # Put 30 tasks in the queue
    q.put(i)

def worker():
    while True:
        item = q.get()
        # Execute a task: call a shell program and wait until it completes
        subprocess.call("echo " + str(item), shell=True)
        q.task_done()

cpus = multiprocessing.cpu_count() # Detect number of cores
print("Creating %d threads" % cpus)
for i in range(cpus):
     t = threading.Thread(target=worker)
     t.daemon = True
     t.start()

q.join() # Block until all tasks are done

Python 3는 병렬 태스크를 실행할 수 있습니다.이것은 우리의 일을 더 쉽게 해준다.

스레드 풀링과 프로세스 풀링이 있습니다.

다음은 통찰력을 제공합니다.

ThreadPoolExecutor 예시(소스)

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor(소스)

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

실제 작업이 수행되지 않고 대부분 CPU에 바인딩되어 있는 예를 많이 보았습니다.1000만에서 1005만 사이의 모든 소수를 계산하는 CPU 바인딩 태스크의 예를 다음에 나타냅니다.여기서는 네 가지 방법을 모두 사용했습니다.

import math
import timeit
import threading
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def time_stuff(fn):
    """
    Measure time of execution of a function
    """
    def wrapper(*args, **kwargs):
        t0 = timeit.default_timer()
        fn(*args, **kwargs)
        t1 = timeit.default_timer()
        print("{} seconds".format(t1 - t0))
    return wrapper

def find_primes_in(nmin, nmax):
    """
    Compute a list of prime numbers between the given minimum and maximum arguments
    """
    primes = []

    # Loop from minimum to maximum
    for current in range(nmin, nmax + 1):

        # Take the square root of the current number
        sqrt_n = int(math.sqrt(current))
        found = False

        # Check if the any number from 2 to the square root + 1 divides the current numnber under consideration
        for number in range(2, sqrt_n + 1):

            # If divisible we have found a factor, hence this is not a prime number, lets move to the next one
            if current % number == 0:
                found = True
                break

        # If not divisible, add this number to the list of primes that we have found so far
        if not found:
            primes.append(current)

    # I am merely printing the length of the array containing all the primes, but feel free to do what you want
    print(len(primes))

@time_stuff
def sequential_prime_finder(nmin, nmax):
    """
    Use the main process and main thread to compute everything in this case
    """
    find_primes_in(nmin, nmax)

@time_stuff
def threading_prime_finder(nmin, nmax):
    """
    If the minimum is 1000 and the maximum is 2000 and we have four workers,
    1000 - 1250 to worker 1
    1250 - 1500 to worker 2
    1500 - 1750 to worker 3
    1750 - 2000 to worker 4
    so let’s split the minimum and maximum values according to the number of workers
    """
    nrange = nmax - nmin
    threads = []
    for i in range(8):
        start = int(nmin + i * nrange/8)
        end = int(nmin + (i + 1) * nrange/8)

        # Start the thread with the minimum and maximum split up to compute
        # Parallel computation will not work here due to the GIL since this is a CPU-bound task
        t = threading.Thread(target = find_primes_in, args = (start, end))
        threads.append(t)
        t.start()

    # Don’t forget to wait for the threads to finish
    for t in threads:
        t.join()

@time_stuff
def processing_prime_finder(nmin, nmax):
    """
    Split the minimum, maximum interval similar to the threading method above, but use processes this time
    """
    nrange = nmax - nmin
    processes = []
    for i in range(8):
        start = int(nmin + i * nrange/8)
        end = int(nmin + (i + 1) * nrange/8)
        p = multiprocessing.Process(target = find_primes_in, args = (start, end))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

@time_stuff
def thread_executor_prime_finder(nmin, nmax):
    """
    Split the min max interval similar to the threading method, but use a thread pool executor this time.
    This method is slightly faster than using pure threading as the pools manage threads more efficiently.
    This method is still slow due to the GIL limitations since we are doing a CPU-bound task.
    """
    nrange = nmax - nmin
    with ThreadPoolExecutor(max_workers = 8) as e:
        for i in range(8):
            start = int(nmin + i * nrange/8)
            end = int(nmin + (i + 1) * nrange/8)
            e.submit(find_primes_in, start, end)

@time_stuff
def process_executor_prime_finder(nmin, nmax):
    """
    Split the min max interval similar to the threading method, but use the process pool executor.
    This is the fastest method recorded so far as it manages process efficiently + overcomes GIL limitations.
    RECOMMENDED METHOD FOR CPU-BOUND TASKS
    """
    nrange = nmax - nmin
    with ProcessPoolExecutor(max_workers = 8) as e:
        for i in range(8):
            start = int(nmin + i * nrange/8)
            end = int(nmin + (i + 1) * nrange/8)
            e.submit(find_primes_in, start, end)

def main():
    nmin = int(1e7)
    nmax = int(1.05e7)
    print("Sequential Prime Finder Starting")
    sequential_prime_finder(nmin, nmax)
    print("Threading Prime Finder Starting")
    threading_prime_finder(nmin, nmax)
    print("Processing Prime Finder Starting")
    processing_prime_finder(nmin, nmax)
    print("Thread Executor Prime Finder Starting")
    thread_executor_prime_finder(nmin, nmax)
    print("Process Executor Finder Starting")
    process_executor_prime_finder(nmin, nmax)
if __name__ == "__main__":
    main()

다음은 Mac OS X 4코어 머신에 대한 결과입니다.

Sequential Prime Finder Starting
9.708213827005238 seconds
Threading Prime Finder Starting
9.81836523200036 seconds
Processing Prime Finder Starting
3.2467174359990167 seconds
Thread Executor Prime Finder Starting
10.228896902000997 seconds
Process Executor Finder Starting
2.656402041000547 seconds

새로운 동시다발적 기술을 사용해서.선물 모듈

def sqr(val):
    import time
    time.sleep(0.1)
    return val * val

def process_result(result):
    print(result)

def process_these_asap(tasks):
    import concurrent.futures

    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = []
        for task in tasks:
            futures.append(executor.submit(sqr, task))

        for future in concurrent.futures.as_completed(futures):
            process_result(future.result())
        # Or instead of all this just do:
        # results = executor.map(sqr, tasks)
        # list(map(process_result, results))

def main():
    tasks = list(range(10))
    print('Processing {} tasks'.format(len(tasks)))
    process_these_asap(tasks)
    print('Done')
    return 0

if __name__ == '__main__':
    import sys
    sys.exit(main())

실행자 접근 방식은 이전에 Java에 손을 댄 적이 있는 모든 사람들에게 익숙한 것처럼 보일 수 있습니다.

, ,, , 이: 를 온전하게 /, sane, ut, ut, ut, ut, ut, ut, ut, ut, ut, ut, 실, ut, ut, , 실, ut, ut, ut, , ut, ut, ut, ut, ut, your, your, your, your, your, your, sane, sane, your, your, your, your, your, ,, ,를하지 않는 는, your,with할 수 있다)

스레드의 완벽한 예는 비동기 이벤트를 모니터링하는 것입니다.이 코드를 보세요.

# thread_test.py
import threading
import time

class Monitor(threading.Thread):
    def __init__(self, mon):
        threading.Thread.__init__(self)
        self.mon = mon

    def run(self):
        while True:
            if self.mon[0] == 2:
                print "Mon = 2"
                self.mon[0] = 3;

IPython 세션을 열고 다음과 같은 작업을 수행하여 이 코드를 사용할 수 있습니다.

>>> from thread_test import Monitor
>>> a = [0]
>>> mon = Monitor(a)
>>> mon.start()
>>> a[0] = 2
Mon = 2
>>>a[0] = 2
Mon = 2

몇 분간 기다려 주세요.

>>> a[0] = 2
Mon = 2

및의 Python을 합니다.Threading ★★★★★★★★★★★★★★★★★」Queue초심자에게는 압도적으로 보일 수 있습니다.

'하다'를 .concurrent.futures.ThreadPoolExecutorPython 3 의 python 。

의의합과 with절과 목록 이해는 정말 매력적일 수 있습니다.

from concurrent.futures import ThreadPoolExecutor, as_completed

def get_url(url):
    # Your actual program here. Using threading.Lock() if necessary
    return ""

# List of URLs to fetch
urls = ["url1", "url2"]

with ThreadPoolExecutor(max_workers = 5) as executor:

    # Create threads
    futures = {executor.submit(get_url, url) for url in urls}

    # as_completed() gives you the threads once finished
    for f in as_completed(futures):
        # Get the results
        rs = f.result()

이 투고에서 빌리면 멀티스레딩, 멀티프로세서, 비동기 중 하나를 선택하는 방법을 알 수 있습니다.asyncio용도를 나타냅니다.

Python 3에는 동시성과 병렬성을 만들기 위해 새로운 내장 라이브러리가 있습니다.선물

실험을 작업 4가지 작업)을해 보겠습니다..sleep() by 'method' (메서드)Threading-Pool:

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep, time

def concurrent(max_worker):
    futures = []
    tic = time()
    with ThreadPoolExecutor(max_workers=max_worker) as executor:
        futures.append(executor.submit(sleep, 2))  # Two seconds sleep
        futures.append(executor.submit(sleep, 1))
        futures.append(executor.submit(sleep, 7))
        futures.append(executor.submit(sleep, 3))
        for future in as_completed(futures):
            if future.result() is not None:
                print(future.result())
    print(f'Total elapsed time by {max_worker} workers:', time()-tic)

concurrent(5)
concurrent(4)
concurrent(3)
concurrent(2)
concurrent(1)

출력:

Total elapsed time by 5 workers: 7.007831811904907
Total elapsed time by 4 workers: 7.007944107055664
Total elapsed time by 3 workers: 7.003149509429932
Total elapsed time by 2 workers: 8.004627466201782
Total elapsed time by 1 workers: 13.013478994369507

[주의] :

  • 위의 결과에서 알 수 있듯이, 가장 좋은 경우는 이 네 가지 작업에 대해 3명의 작업자가 있었습니다.
  • 블로킹/O)이 아닌 태스크가 있는 multiprocessingthreading는 수 .ThreadPoolExecutor로로 합니다.ProcessPoolExecutor.

다음으로 스레드를 사용한 CSV Import의 간단한 예를 제시하겠습니다.(라이브러리 포함은 목적에 따라 다를 수 있습니다.)

도우미 기능:

from threading import Thread
from project import app
import csv


def import_handler(csv_file_name):
    thr = Thread(target=dump_async_csv_data, args=[csv_file_name])
    thr.start()

def dump_async_csv_data(csv_file_name):
    with app.app_context():
        with open(csv_file_name) as File:
            reader = csv.DictReader(File)
            for row in reader:
                # DB operation/query

드라이버 기능:

import_handler(csv_file_name)

제가 이 문제에 직접 대처해야 했을 때 도움이 되었다고 생각되는 간단한 예와 설명으로 기여하고 싶습니다.

이 답변에서는 Python의 GIL(글로벌 인터프리터 잠금)에 대한 정보와 multiprocessing.dummy 및 몇 가지 간단한 벤치마크를 사용하여 작성된 간단한 일상 예제를 볼 수 있습니다.

글로벌 인터프리터 잠금(GIL)

Python은 진정한 의미의 멀티 스레딩을 허용하지 않습니다.멀티 스레드 패키지가 있지만 코드 속도를 높이기 위해 멀티 스레드를 사용하는 것은 좋지 않습니다.

Python은 GIL(Global Interpreter Lock)이라는 구조를 가지고 있습니다.GIL을 사용하면 한 번에 하나의 스레드만 실행할 수 있습니다.스레드는 GIL을 취득하고 약간의 작업을 수행한 후 다음 스레드에 GIL을 전달합니다.

이는 매우 빠르게 이루어지기 때문에 인간의 눈에는 스레드가 병렬로 실행되는 것처럼 보일 수 있지만 실제로는 동일한 CPU 코어를 교대로 사용하고 있을 뿐입니다.

이 모든 GIL 패스는 실행에 오버헤드를 더합니다.즉, 코드를 더 빨리 실행하려면 스레드 패키지를 자주 사용하는 것은 좋지 않습니다.

Python의 스레드 패키지를 사용하는 데는 이유가 있습니다.효율을 중시하지 않고 몇 가지 작업을 동시에 실행하고 싶다면, 이 방법은 매우 편리하고 편리합니다.또는 (일부 I/O 등) 대기해야 하는 코드를 실행하고 있는 경우에는 매우 의미가 있을 수 있습니다.그러나 스레드 라이브러리는 추가 CPU 코어를 사용할 수 없습니다.

멀티 스레딩은 운영 체제(멀티 프로세싱을 통해), Python 코드를 호출하는 일부 외부 애플리케이션(: Spark 또는 Hadoop) 또는 Python 코드가 호출하는 일부 코드(예: Python 코드가 값비싼 멀티 스레드 작업을 수행하는 C 함수를 호출하도록 할 수 있음)에 아웃소싱할 수 있습니다.

이것이 중요한 이유

많은 사람들이 GIL이 무엇인지 알기 전에 Python의 고급 멀티 스레드 코드에서 병목 현상을 찾기 위해 많은 시간을 소비하기 때문입니다.

이 정보가 명확해지면 내 코드는 다음과 같습니다.

#!/bin/python
from multiprocessing.dummy import Pool
from subprocess import PIPE,Popen
import time
import os

# In the variable pool_size we define the "parallelness".
# For CPU-bound tasks, it doesn't make sense to create more Pool processes
# than you have cores to run them on.
#
# On the other hand, if you are using I/O-bound tasks, it may make sense
# to create a quite a few more Pool processes than cores, since the processes
# will probably spend most their time blocked (waiting for I/O to complete).
pool_size = 8

def do_ping(ip):
    if os.name == 'nt':
        print ("Using Windows Ping to " + ip)
        proc = Popen(['ping', ip], stdout=PIPE)
        return proc.communicate()[0]
    else:
        print ("Using Linux / Unix Ping to " + ip)
        proc = Popen(['ping', ip, '-c', '4'], stdout=PIPE)
        return proc.communicate()[0]


os.system('cls' if os.name=='nt' else 'clear')
print ("Running using threads\n")
start_time = time.time()
pool = Pool(pool_size)
website_names = ["www.google.com","www.facebook.com","www.pinterest.com","www.microsoft.com"]
result = {}
for website_name in website_names:
    result[website_name] = pool.apply_async(do_ping, args=(website_name,))
pool.close()
pool.join()
print ("\n--- Execution took {} seconds ---".format((time.time() - start_time)))

# Now we do the same without threading, just to compare time
print ("\nRunning NOT using threads\n")
start_time = time.time()
for website_name in website_names:
    do_ping(website_name)
print ("\n--- Execution took {} seconds ---".format((time.time() - start_time)))

# Here's one way to print the final output from the threads
output = {}
for key, value in result.items():
    output[key] = value.get()
print ("\nOutput aggregated in a Dictionary:")
print (output)
print ("\n")

print ("\nPretty printed output: ")
for key, value in output.items():
    print (key + "\n")
    print (value)

여기 도움이 되는 간단한 예를 포함한 멀티스레딩이 있습니다.Python에서 멀티 스레딩이 어떻게 동작하는지 쉽게 이해할 수 있습니다.이전 스레드의 작업이 완료될 때까지 다른 스레드에 대한 접근을 방지하기 위해 잠금 장치를 사용했습니다.이 코드 라인을 사용함으로써

tLock = 스레드화.BoundedSemaphore(값=4)

한 번에 여러 프로세스를 허용하고 이후 또는 이전 프로세스가 완료된 후에 실행되는 나머지 스레드를 유지할 수 있습니다.

import threading
import time

#tLock = threading.Lock()
tLock = threading.BoundedSemaphore(value=4)
def timer(name, delay, repeat):
    print  "\r\nTimer: ", name, " Started"
    tLock.acquire()
    print "\r\n", name, " has the acquired the lock"
    while repeat > 0:
        time.sleep(delay)
        print "\r\n", name, ": ", str(time.ctime(time.time()))
        repeat -= 1

    print "\r\n", name, " is releaseing the lock"
    tLock.release()
    print "\r\nTimer: ", name, " Completed"

def Main():
    t1 = threading.Thread(target=timer, args=("Timer1", 2, 5))
    t2 = threading.Thread(target=timer, args=("Timer2", 3, 5))
    t3 = threading.Thread(target=timer, args=("Timer3", 4, 5))
    t4 = threading.Thread(target=timer, args=("Timer4", 5, 5))
    t5 = threading.Thread(target=timer, args=("Timer5", 0.1, 5))

    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t5.start()

    print "\r\nMain Complete"

if __name__ == "__main__":
    Main()

이전 솔루션 중 실제로 관리자 권한이 없는 GNU/Linux 서버에서 여러 코어를 사용한 솔루션은 없었습니다.그들은 단지 하나의 코어로 달렸어요.

는 낮은 의 것을 사용했어요.os.fork여러 프로세스를 생성하는 인터페이스입니다.이것은 나에게 효과가 있었던 코드입니다.

from os import fork

values = ['different', 'values', 'for', 'threads']

for i in range(len(values)):
    p = fork()
    if p == 0:
        my_function(values[i])
        break

두 번째 anwser의 python3 버전으로서:

import queue as Queue
import threading
import urllib.request

# Called by each thread
def get_url(q, url):
    q.put(urllib.request.urlopen(url).read())

theurls = ["http://google.com", "http://yahoo.com", "http://www.python.org","https://wiki.python.org/moin/"]

q = Queue.Queue()
def thread_func():
    for u in theurls:
        t = threading.Thread(target=get_url, args = (q,u))
        t.daemon = True
        t.start()

    s = q.get()
    
def non_thread_func():
    for u in theurls:
        get_url(q,u)
        

    s = q.get()
   

테스트할 수 있습니다.

start = time.time()
thread_func()
end = time.time()
print(end - start)

start = time.time()
non_thread_func()
end = time.time()
print(end - start)

non_timeout_func()는 thread_func()의 4배에 해당하는 시간을 필요로 합니다.

import threading
import requests

def send():

  r = requests.get('https://www.stackoverlow.com')

thread = []
t = threading.Thread(target=send())
thread.append(t)
t.start()

그것은 매우 이해하기 쉽다.스레드를 실행하는 두 가지 간단한 방법은 다음과 같습니다.

import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

def a(a=1, b=2):
    print(a)
    time.sleep(5)
    print(b)
    return a+b

def b(**kwargs):
    if "a" in kwargs:
        print("am b")
    else:
        print("nothing")
        
to_do=[]
executor = ThreadPoolExecutor(max_workers=4)
ex1=executor.submit(a)
to_do.append(ex1)
ex2=executor.submit(b, **{"a":1})
to_do.append(ex2)

for future in as_completed(to_do):
    print("Future {} and Future Return is {}\n".format(future, future.result()))

print("threading")

to_do=[]
to_do.append(threading.Thread(target=a))
to_do.append(threading.Thread(target=b, kwargs={"a":1}))

for threads in to_do:
    threads.start()
    
for threads in to_do:
    threads.join()

언급URL : https://stackoverflow.com/questions/2846653/how-can-i-use-threading-in-python

반응형