Python

Python入門ガイド: データと時間の処理・プロセスと並行処理の実装方法

データと時間の処理は、Pythonにおいて非常に重要な要素です。特に、プロセスの作成や並行処理の実装は、効率的なプログラム実行や高速なアプリケーションの開発に不可欠です。この記事では、Pythonにおけるプロセスと並行処理の概念と具体的な実装方法について詳しく解説します。

プログラムとプロセス

この記事を理解するにあたり最低限必要な用語を解説します。なんとなく概念はわかるかと思いますが、しっかり理解することが大切です。

プログラムとは

コンピュータに望む機能を実現するため、指示を与えるコードと扱うデータの総称。アプリもスクリプトもプログラムだし、OSもプログラム。プロセスやスレッドにして動かすのはプログラム。

プロセスとは

ひとつの独立したプログラムがOSによってRAM上に配置された状態。CPUが実行するためのコードも配置されているが、CPUの実行時情報を保持するのはスレッドの役目。

現在のOSでは、それぞれのプロセスは独立したメモリ空間に配置されているため、何の手段もなく別のプロセスにアクセスすることは出来ない。

subprocessによるプロセスの作成

Pythonのsubprocessモジュールを使用すると、外部プログラムを実行し、そのプロセスを制御することができます。以下は、subprocessを使用して外部コマンドを実行する例です。

import subprocess

# 外部コマンドの実行
result = subprocess.run(['ls', '-l'], capture_output=True, text=True)

# 実行結果の表示
print(result.stdout)

multiprocessingによるプロセスの作成

multiprocessingモジュールは、Pythonの標準ライブラリであり、複数のプロセスを作成して並行処理を実現するためのツールセットです。以下は、multiprocessingを使用してプロセスを作成する例です。

import multiprocessing

# プロセスで実行する関数
def worker(name):
    print(f'Hello, {name}!')

# プロセスの作成と実行
process = multiprocessing.Process(target=worker, args=('Alice',))
process.start()
process.join()

terminate()によるプロセスの強制終了

プロセスを正常に終了させる場合は、join()メソッドを使用しますが、時にはプロセスを強制終了させる必要があります。以下は、terminate()メソッドを使用してプロセスを強制終了させる例です。

import multiprocessing
import time

# プロセスで実行する関数
def worker():
    while True:
        print('Working...')
        time.sleep(1)

# プロセスの作成と実行
process = multiprocessing.Process(target=worker)
process.start()

# 5秒後にプロセスを強制終了
time.sleep(5)
process.terminate()

osによるシステム情報の入手

Pythonのosモジュールを使用すると、システムに関する情報を入手することができます。以下は、osモジュールを使用してシステム情報を取得する例です。

import os

# プロセスIDの取得
pid = os.getpid()
print(f'プロセスID: {pid}')

# カレントディレクトリの取得
current_dir = os.getcwd()
print(f'カレントディレクトリ: {current_dir}')

# 環境変数の取得
env_vars = os.environ
print('環境変数:')
for var, value in env_vars.items():
    print(f'{var}: {value}')

psutilによるプロセス情報の取得

psutilは、Pythonの外部モジュールであり、システムのプロセスやシステムリソースに関する情報を取得するための便利なツールです。以下は、psutilを使用してプロセス情報を取得する例です。

import psutil

# 全てのプロセスの情報を取得
processes = psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_info'])
for process in processes:
    print(f'プロセスID: {process.info["pid"]}')
    print(f'プロセス名: {process.info["name"]}')
    print(f'CPU使用率: {process.info["cpu_percent"]}')
    print(f'メモリ使用量: {process.info["memory_info"].rss}')

コマンドの自動実行

プロジェクトでの開発を進める際に、よくあるコマンド操作をまとめておくことがよくあります。 例えば不要なファイルを削除したり、フォーマッターを適用したり、本番用にビルドしたりなどがあります。

Invoke

Invokeは、Pythonのパッケージであり、シェルコマンドやタスクの自動化を行うためのツールです。以下は、Invokeを使用してコマンドを実行する例です。

from invoke import run

# コマンドの実行
result = run('ls -l', hide=True)

# 実行結果の表示
print(result.stdout)

Invokeは、ビルドツールのmakeやRubyのRakeのようなものをPythonで実現できます。

あまり使い道が伝わらないと思いますので、もう一つタスクを作成するサンプルを提示します。

以下のようなPythonスクリプトをtasks.pyという名前で保存しておきます。
ファイル名はtasks.pyでなければなりません。

from invoke import task, run

@task
def startjob(ctx, taskname, verbose=False, iteration=10):
    print ("starting job...")
    print ("taskname = %s" % taskname)
    print ("verbose = %s" % verbose)
    print ("iteration = %d" % iteration)
    run("ls -l | head -n 5")
    print ("completing job...")

シェルからinvoke(省略してinvでも可)と入力することで、@taskデコレータが付与された処理を呼び出すことができます。

下記の様にコマンドラインでタスクを実行できます。

$ inv --list
Available tasks:

  startjob

# startjobタスクのhelp表示。
$ inv --help startjob
Usage: inv[oke] [--core-opts] startjob [--options] [other tasks here ...]

Docstring:
  none

Options:
  -i, --iteration
  -t STRING, --taskname=STRING
  -v, --verbose

# startjob実行。
$ inv startjob tasks1
starting job...
taskname = tasks1
verbose = False
iteration = 10
total 96
-rw-r--r--  1 kenjih  staff   134  4  2 00:39 counter.py
drwxr-xr-x@ 5 kenjih  staff   170 11 30  2009 examples
-rw-r--r--  1 kenjih  staff   296  4  9 19:46 groupby.py
-rw-r--r--  1 kenjih  staff    95 11 21  2015 hoge.py
completing job...

# オプション指定
$ inv startjob tasks1 --verbose --iteration 300
starting job...
taskname = tasks1
verbose = True
iteration = 300
total 96
-rw-r--r--  1 kenjih  staff   134  4  2 00:39 counter.py
drwxr-xr-x@ 5 kenjih  staff   170 11 30  2009 examples
-rw-r--r--  1 kenjih  staff   296  4  9 19:46 groupby.py
-rw-r--r--  1 kenjih  staff    95 11 21  2015 hoge.py
completing job...

タスクが増えてきたらtasks.py内で関数を定義して処理を共通化したり、namespaceを使ってtasks.pyを複数ファイルに分割したりして、プロジェクトに合わせて拡張していくことができます。上手く使うと非常に使いやすいコマンドラインツールが作成できます。

その他のコマンド実行ヘルパー

Pythonには、subprocessモジュールやshモジュールなど、さまざまなコマンド実行ヘルパーライブラリがあります。これらのライブラリを使用することで、シェルコマンドの実行や標準入出力の取得などを簡単に行うことができます。是非使用してみて下さい。

並行処理

並行処理は、複数のタスクを同時に実行するための方法です。Pythonでは、さまざまな方法で並行処理を実現することができます。

この記事を理解するにあたり最低限必要な用語を併せて解説しています。なんとなく概念はわかるかと思いますが、しっかり理解することが大切です。

キュー

キューは、データを一時的に保管するためのデータ構造であり、マルチスレッドやマルチプロセスの間でデータをやり取りするために使用されます。以下は、Pythonのqueueモジュールを使用してキューを操作する例です。

import queue

# キューの作成
q = queue.Queue()

# データの追加
q.put('Data 1')
q.put('Data 2')

# データの取得
data1 = q.get()
data2 = q.get()

print(data1)
print(data2)

キューとは

キュー(queue)、あるいは待ち行列はコンピュータの基本的なデータ構造の一つ。 データを先入れ先出しのリスト構造で保持するものです。 キューからデータを取り出すときには、先に入れられたデータから順に取り出される。 キューにデータを入れることをエンキュー、取り出すことをデキューいいます。

プロセス

前述の通り、multiprocessingモジュールを使用することで、複数のプロセスを作成し並行処理を行うことができます。

スレッド

スレッドは、プログラム内での処理の単位であり、Pythonのthreadingモジュールを使用して実装することができます。以下は、スレッドを使用して並行処理を行う例です。

import threading

# スレッドで実行する関数
def worker(name):
    print(f'Hello, {name}!')

# スレッドの作成と実行
thread = threading.Thread(target=worker, args=('Alice',))
thread.start()
thread.join()

concurrent.futures

concurrent.futuresは、Pythonの組み込みモジュールであり、並行処理を簡単に実現するためのツールです。以下は、concurrent.futuresを使用して並行処理を行う例です。

import concurrent.futures

# 関数を並行実行するためのプールを作成
with concurrent.futures.ThreadPoolExecutor() as executor:
    # 非同期処理をスケジュール
    future1 = executor.submit(worker, 'Alice')
    future2 = executor.submit(worker, 'Bob')

    # 結果の取得
    result1 = future1.result()
    result2 = future2.result()

    print(result1)
    print(result2)

グリーンスレッドとgevent

グリーンスレッドは、軽量なスレッドの実装であり、geventというサードパーティライブラリを使用することで実現できます。以下は、geventを使用して並行処理を行う例です。

from gevent import monkey; monkey.patch_all()
import gevent

# 非同期で実行する関数
def worker(name):
    print(f'Hello, {name}!')
    gevent.sleep(1)

# イベントループの作成
tasks = [gevent.spawn(worker, 'Alice'), gevent.spawn(worker, 'Bob')]

# イベントループの実行
gevent.joinall(tasks)

スレッドとは

CPUの実行単位。属するプロセス、CPUレジスタ、スタックエリアなどを持っているため、OSがスレッドを順次切り替えることでコア数を超える複数の処理を並行(見かけ上並列)処理が出来る。

OSによっては、プロセス=スレッドな場合もあります。

またプログラミング言語によってはスレッドと言いつつも、OS上の1ネイティブスレッドの中でその言語での複数スレッドを回している実装もあります。

twisted

twistedは、Pythonの非同期ネットワーキングフレームワークであり、イベントドリブンなプログラミングを実現するためのツールです。以下は、twistedを使用して非同期処理を行う例です。

from twisted.internet import reactor, defer

# 非同期で実行する関数
def worker(name):
    print(f'Hello, {name}!')
    reactor.callLater(1, reactor.stop)

# 非同期処理の実行
deferred = defer.Deferred()
deferred.addCallback(worker, 'Alice')
deferred.callback(None)

# イベントループの開始
reactor.run()

asyncio

asyncioは、Pythonの非同期プログラミングをサポートする標準ライブラリであり、コルーチンとイベントループを使用して非同期処理を実現します。以下は、asyncioを使用して非同期処理を行う例です。

import asyncio

# コルーチンで実行する関数
async def worker(name):
    print(f'Hello, {name}!')
    await asyncio.sleep(1)

# イベントループの作成と実行
loop = asyncio.get_event_loop()
tasks = [worker('Alice'), worker('Bob')]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

Redis

Redisは、高速なデータベースとメッセージブローカーの役割を果たすインメモリデータストアです。Pythonのredisモジュールを使用することで、PythonからRedisを操作することができます。以下は、redisモジュールを使用してRedisを操作する例です。

import redis

# Redisに接続
r = redis.Redis(host='localhost', port=6379, db=0)

# データの追加
r.set('key1', 'value1')

# データの取得
value = r.get('key1')
print(value)

キューを越えて

並行処理を使用してキューを越えたデータ共有やタスクの調整を行うためには、マルチプロセスやマルチスレッドの間でのデータの共有や同期を実現する必要があります。Pythonでは、multiprocessingモジュールやthreadingモジュールを使用して、これらの機能を実現することができます。

以下は、Pythonのmultiprocessingモジュールとthreadingモジュールを使用して、マルチプロセスとマルチスレッド間でのデータの共有と同期を実現するサンプルコードです。

マルチプロセスのデータ共有と同期

import multiprocessing

# 共有変数の作成
shared_value = multiprocessing.Value('i', 0)

# データ共有と同期を行う関数
def worker_func(value):
    with value.get_lock():
        value.value += 1

# マルチプロセスの作成
processes = []
for _ in range(5):
    p = multiprocessing.Process(target=worker_func, args=(shared_value,))
    processes.append(p)
    p.start()

# 各プロセスの終了を待つ
for p in processes:
    p.join()

# 共有変数の値を表示
print(shared_value.value)

このコードでは、multiprocessing.Valueを使用して整数型の共有変数を作成し、マルチプロセスでのデータ共有と同期を行っています。value.get_lock()を使用してロックを獲得し、複数のプロセスが同時に値を変更することを防ぎます。

マルチスレッドのデータ共有と同期

import threading

# 共有変数の作成
shared_value = 0
lock = threading.Lock()

# データ共有と同期を行う関数
def worker_func():
    global shared_value
    with lock:
        shared_value += 1

# マルチスレッドの作成
threads = []
for _ in range(5):
    t = threading.Thread(target=worker_func)
    threads.append(t)
    t.start()

# 各スレッドの終了を待つ
for t in threads:
    t.join()

# 共有変数の値を表示
print(shared_value)

このコードでは、threading.Lockを使用してスレッド間のデータ共有と同期を行っています。ロックを獲得している間は他のスレッドが共有変数にアクセスできないようになっています。

これらのコードを使用することで、マルチプロセスやマルチスレッド間でのデータ共有と同期を実現することができます。注意点として、データの共有や同期は適切に管理する必要があり、競合状態やデッドロックなどの問題が発生しないように注意する必要があります。

応用アプリケーション

並行処理の概念と具体的な実装方法を理解したので、以下にいくつかの応用アプリケーションの例を示します。

並列Webスクレイピング

複数のウェブサイトから情報をスクレイピングする際に、マルチプロセスやマルチスレッドを使用して並行してデータを収集することができます。以下は、Pythonのrequestsモジュールとconcurrent.futuresを使用した並列Webスクレイピングの例です。

import requests
from concurrent.futures import ThreadPoolExecutor

# スクレイピングするウェブサイトのURLリスト
urls = ['https://example.com', 'https://example.org', 'https://example.net']

# 各ウェブサイトを並行してスクレイピングする関数
def scrape_website(url):
    response = requests.get(url)
    # スクレイピングのロジックを実装する
    # ...

# マルチスレッドでウェブスクレイピングを実行
with ThreadPoolExecutor() as executor:
    # 各ウェブサイトをスレッドで処理する
    results = executor.map(scrape_website, urls)

    # 各スレッドの結果を処理する
    for result in results:
        # 結果の処理を実装する
        # ...

スクレイピングについては別記事で詳しく書きますが、上の処理を応用すると次の様にPythonのrequestsモジュールとconcurrent.futuresを使用してAmazon.co.jpから「python」の本のランキングデータを取得する事などが可能です。

import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor

# スクレイピング対象のURL
url = 'https://www.amazon.co.jp/gp/bestsellers/books/492352/ref=zg_bs_nav_books_2_466298'

# スクレイピングする関数
def scrape_book_ranking(url):
    response = requests.get(url)
    soup = BeautifulSoup(response.content, 'html.parser')

    # ランキングデータの抽出
    ranking_items = soup.find_all('span', class_='zg-badge-text')
    rankings = [item.get_text(strip=True) for item in ranking_items]

    # 結果の表示
    for ranking in rankings:
        print(ranking)

# マルチスレッドでスクレイピングを実行
with ThreadPoolExecutor() as executor:
    executor.submit(scrape_book_ranking, url)

このコードでは、指定したURLからリクエストを送り、BeautifulSoupを使用してHTMLを解析しています。ランキングデータを抽出して表示しています。

注意点として、Amazon.co.jpのWebページの構造が変更される可能性があるため、スクレイピングコードを実行する前にWebページの構造を確認し、必要な要素を正しく指定してください。

また、この例ではマルチスレッドを使用していますが、Amazon.co.jpへの大量のリクエストを送ることは避けるべきです。Webスクレイピングを行う場合は、適切な間隔を空けてリクエストを送るようにしてください。また、Amazon.co.jpの利用規約に従ってスクレイピングを行ってください。

並列データ処理パイプライン

複数のデータ処理ステップをパイプラインとして構築し、マルチプロセスやマルチスレッドを使用してデータを並列に処理することができます。以下は、Pythonのmultiprocessingモジュールを使用した並列データ処理パイプラインの例です。

import multiprocessing

# データ処理ステップ1の関数
def process_step1(data):
    # データ処理ステップ1の処理を実装する
    # ...

# データ処理ステップ2の関数
def process_step2(data):
    # データ処理ステップ2の処理を実装する
    # ...

# パイプラインの処理を定義
def pipeline(data):
    # データ処理ステップ1のプロセスを作成
    step1_process = multiprocessing.Process(target=process_step1, args=(data,))
    step1_process.start()

    # データ処理ステップ2のプロセスを作成
    step2_process = multiprocessing.Process(target=process_step2, args=(data,))
    step2_process.start()

    # 各ステップの終了を待つ
    step1_process.join()
    step2_process.join()

# データのリスト
data_list = [...]
# パイプラインの実行
for data in data_list:
    pipeline(data)

これらは、並行処理を使用した応用アプリケーションの例の一部です。並行処理を活用することで、処理速度の向上やリソースの効率的な利用を実現することができます。

まとめ

データと

時間に関するプロセスと並行処理について学びました。プロセスの作成や終了、システム情報の取得、プロセス情報の取得など、Pythonのsubprocessやmultiprocessing、os、psutilなどのモジュールを使用してプロセスを操作する方法を解説しました。

また、コマンドの自動実行についても学びました。Invokeやその他のコマンド実行ヘルパーを使用することで、Pythonからコマンドを自動的に実行する方法を解説しました。

さらに、並行処理に関する様々な手法とツールを学びました。キュー、プロセス、スレッド、concurrent.futures、グリーンスレッド、twisted、asyncio、Redisなどを使用することで、並行処理を実現する方法を解説しました。

最後に、これらの機能を使って高度な応用アプリケーションを作成する方法を学びました。並列Webスクレイピングや並列データ処理パイプラインなど、並行処理を活用した応用アプリケーションの例を見てきました。

並行処理は、多くのプログラムでパフォーマンスの向上や効率的な処理を実現するために重要な概念です。Pythonの豊富なモジュールやツールを活用して、データと時間に関する処理を効果的に行うための知識を身につけることができました。これらの知識を活かして、より高度なプログラムを開発していきましょう。