KnowHow

技術的なメモを中心にまとめます。
検索にて調べることができます。

[改良版4]asyncioを用いてサーバ側のコマンド結果をクライアントで取得するコード。client.pyのエラーハンドリングを追加して改善しました。

登録日 :2024/09/22 08:21
カテゴリ :Python基礎

さらにコードを改良しました。client.pyでのエラーハンドリングを追記しています。
以下のようなファイル構成です。

server.py
client.py
submit.py (client.pyを実行するラッパースクリプト)
config/settings.py (設定ファイル)
log/server.log(ログファイル)

config/settings.py

import os
import sys

dir_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(dir_path)


"""
Share settings
"""
#server.pyを実行するサーバのIPアドレスは適切に変更する。以下はlocalhostの設定。
#ポート番号も、既存サービスと重複しないようなものを選定する。
SERVER_IP = '127.0.0.1'
SERVER_PORT = 8888

"""
For Server settings
"""
LOG_FILE = dir_path + '/log/server.log'
LIC_FILE = dir_path + '/log/server.log'
BASE_COMMAND = 'cat ' + LIC_FILE + ' | grep '
TIMEOUT = 10
STR_LEN_LIMIT = 10

server.py

サーバ側で起動しておくプログラム。APIとして、client.pyからの応答を受け取り結果を返答する。
インタプリタ(シバン)#!/usr/bin/python3 も書いておく。

#!/usr/bin/python3

import asyncio
import asyncio.subprocess
import shlex
import logging
import sys
import os
import datetime
import gc

dir_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(dir_path)

from config import settings

#logging.basicConfig(filename=settings.LOG_FILE, level=logging.DEBUG)
logging.basicConfig(filename=settings.LOG_FILE, level=logging.INFO)
logger = logging.getLogger(__name__)
logger.debug({'add path': dir_path})


class FetchLogAPIServer(object):
    def __init__(self, _base_command, _strlen, _timeout):
        self.lock = asyncio.Lock()
        self.base_command = _base_command
        self.strlen = _strlen
        self.timeout = _timeout

    async def run_command(self, reader, writer):
        dt_now = datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')
        try:
            data = await reader.read()
            client_addr = writer.get_extra_info('peername')
            client_ip = client_addr[0]
            client_port = client_addr[1]
            logger.debug({'time': dt_now, 'client_ip': client_ip, 'client_port': client_port})
            rev_msg = data.decode()
            name, num = rev_msg.split(',')
            print({'time': dt_now, 'client_ip': client_ip, 'name': name, 'num': num})
            logger.debug({'time': dt_now, 'input name': name, 'input num': num})

            if not name.isalnum() or len(name) > int(self.strlen):
                logger.error({'time': dt_now,
                              'status': 'failed',
                              'client ip': client_ip,
                              '[ERROR]Bad name': name})
                raise ValueError(f"{name} is an Invalid or Too long input")

            if not num.isdigit():
                logger.error({'time': dt_now,
                              'status': 'failed',
                              'client ip': client_ip,
                              '[ERROR]Bad num': num})
                raise ValueError(f"{num} is not number.")

            full_command = self.base_command + shlex.quote(name) + ' | tail -n ' + shlex.quote(num)
            logger.debug({'time': dt_now, "Executing command": full_command})

            async with self.lock:
                proc = await asyncio.create_subprocess_exec(
                    'sh', '-c', full_command,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE)
                try:
                    stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=self.timeout)
                    res = str(stdout.decode())
                    exitcode = await proc.wait()
                    if proc.returncode != 0:
                        error_message = f"Command failed with exit code {exitcode}: {stderr.decode()}"
                        logger.error({'time': dt_now,
                                      'status': 'failed',
                                      'client ip': client_ip,
                                      '[ERROR]Command failed': full_command})
                        writer.write(error_message.encode())
                    else:
                        writer.write(res.encode())
                        logger.info({'time': dt_now,
                                     'status': 'success',
                                     'client_ip': client_ip,
                                     'name': name,
                                     'num': num})
                except asyncio.TimeoutError:
                    proc.kill()
                    logger.error({'time': dt_now,
                                  'status': 'failed',
                                  'client ip': client_ip,
                                  "[ERROR]Command time out": full_command})
                    writer.write("Command timed out".encode())
        except Exception as e:
            logger.error({'time': dt_now,
                          'status': 'failed',
                          "[ERROR]Catch Exception": str(e)})
            writer.write(f"Error: {str(e)}".encode())
        finally:
            await writer.drain()
            writer.close()


if __name__ == '__main__':
    ip = settings.SERVER_IP
    port = settings.SERVER_PORT
    base_command = settings.BASE_COMMAND
    timeout = settings.TIMEOUT
    strlen = settings.STR_LEN_LIMIT

    loop = asyncio.get_event_loop()
    counter_sever = FetchLogAPIServer(base_command, strlen, timeout)
    coro = asyncio.start_server(counter_sever.run_command,
                                ip, port, loop=loop)

    server = loop.run_until_complete(coro)
    print('server {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass

    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()

    gc.collect()

client.py

server.pyと通信をする。server.pyに信号を送り、server.pyから結果を取得するプログラム。

#!/usr/bin/python3

import os
import sys
import asyncio
import gc
import datetime
import logging
from optparse import OptionParser

dir_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(dir_path)

from config import settings

logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger(__name__)


class AwaitableClass(object):
    def __init__(self, arg, _loop):
        self.name = arg[0]
        self.num = str(arg[1])
        self.loop = _loop
        self._ip = settings.SERVER_IP
        self._port = settings.SERVER_PORT

    def __await__(self):
        async def request_server():
            try:
                reader, writer = await asyncio.open_connection(
                    self._ip, self._port)
                send_msg = self.name + "," + self.num
                writer.write(send_msg.encode())
                writer.write_eof()
                data = await reader.read()
                data = data.decode()
                return data
            except Exception as e:
                _messages = "The server-side service may be down, so please contact support."
                logger.error({"ERROR": _messages})
        return request_server().__await__()


async def main(name, _loop):
    # print('chunk reader')
    dt_now = datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')
    print(dt_now)
    result = await AwaitableClass(name, _loop)
    print(result)


if __name__ == '__main__':
    usage = 'usage: %prog -u <username> -n <number>'
    parser = OptionParser(usage=usage)
    parser.add_option('-u', '--user', action='store', type='string',
                      dest='name', help='user name')
    parser.add_option('-n', '--num', action='store', type='int',
                      dest='num', help='tail number')
    options, args = parser.parse_args()
    try:
        username = str(options.name)
        if username is None or username == "None":
            raise Exception("user name is required.")
        nums = int(options.num)

        logger.debug({'input username': username})
        logger.debug({'number of log lines': nums})

        loop = asyncio.get_event_loop()
        loop.run_until_complete(asyncio.wait([
            main([username, nums], loop),
        ]))
        loop.close()
    except Exception as e:
        logger.error({"ERROR": str(e)})

    gc.collect()

submit.sh

client.pyを実行するためのラッパースクリプト
実行ユーザ名を取得して自動で引数を与えます。ラッパーでも入力引数のチェックをしています。

#!/bin/bash

SCRIPT_DIR='/home/app/asyncio_server/client'
SCRIPT_NAME='client.py'
DEFAULT_NUM=20
DEBUG=false

function echo_usage() {
    echo "Usage  : $0 <number>"
    echo "Example: $0 5"
}

function echo_message(){
    echo "------------------------------------------------------------------"
    echo "The default log displayed is the latest ${DEFAULT_NUM} lines. "
    echo "If you want to change this, please provide a number as an argument."
    echo_usage
    echo "------------------------------------------------------------------"
}

# 引数の数をチェック
if [ $# -eq 0 ]; then
    # 引数がない場合、デフォルト値を使用
    echo_message
    NUM=${DEFAULT_NUM}
elif [ $# -eq 1 ]; then
    # 引数が1つの場合
    if [[ $1 =~ ^[0-9]+$ ]]; then
        # 引数が数値の場合、NUMに設定
        NUM=$1
    else
        # 引数が数値でない場合、エラーメッセージを表示して終了
        echo "[ERROR]: The argument must be a number."
        echo_usage
        exit 1
    fi
else
    # 引数が2つ以上の場合、エラーメッセージを表示して終了
    echo "[ERROR]: Too many arguments. Only 0 or 1 argument is allowed."
    echo_usage
    exit 1
fi

SCRIPT='python3 '${SCRIPT_DIR}'/'${SCRIPT_NAME}' -u '${USER}' -n '${NUM}

if $DEBUG;
then
  echo ${SCRIPT}
fi

eval ${SCRIPT}

プログラムをサービス化する

RockyLinuxサーバで、server.pyをサービス化する方法です。
まずサービスファイルを作成します。
ExecStartで指定するserver.pyのパスは適切に修正してください。
「fetchlogserver.service」

[root@nis1 system]# cat fetchlogserver.service
[Unit]
Description=My Python App
After=network.target
StartLimitIntervalSec=0

[Service]
ExecStart=/usr/bin/python3 /home/app/asyncio_server/server/server.py
Restart=always
User=nobody
RestartSec=1s

[Service]
User=root
Group=root

[Install]
WantedBy=multi-user.target

サービスファイルを、/etc/systemd/systemに配置します。

サービスファイルをリロードして起動し、ステータスを確認します。

systemctl daemon-reload
systemctl start fetchlogserver.service
systemctl status fetchlogserver.service

もしfailedとなった場合は、ログを見て適切に対処しましょう。

journalctl -u fetchlogserver.service

問題なければ、サービスの自動起動も有効にしておきます。

systemctl enable fetchlogserver.service

firewalldが有効の場合は、以下の手順でポートを解放する

firewall-cmd --get-active-zones
sudo firewall-cmd --zone=public --add-port=8888/tcp
sudo firewall-cmd --zone=public --add-port=8888/tcp --permanent
sudo firewall-cmd --reload
sudo firewall-cmd --zone=public --list-ports
[root@headnode temp]# firewall-cmd --get-active-zones
libvirt
  interfaces: virbr0
public
  interfaces: enp0s8
[root@headnode temp]# sudo firewall-cmd --zone=public --add-port=8888/tcp
success
[root@headnode temp]# sudo firewall-cmd --zone=public --add-port=8888/tcp --permanent
success
[root@headnode temp]# sudo firewall-cmd --reload
success
[root@headnode temp]# sudo firewall-cmd --zone=public --list-ports
6817/tcp 6818/tcp 6819/tcp 7817/tcp 8888/tcp
[root@headnode temp]# vi firewalld
[root@headnode temp]#

(補足)
サービスファイルはシンボリックリンクでもよい。
アプリケーションと一緒にサービスファイルを保管しておくことで、メンテナンスや管理がしやすいと思われる。※シンボリックリンクではうまくいかないこともあることが分かった。(SELinuxが有効の場合。この時は直接ファイルを配置する必要がある)
例)

ln -s /home/app/asyncio_server/server/config/service/fetchlogserver.service /etc/systemd/system/fetchlogserver.service
[root@nis1 system]# ll
lrwxrwxrwx. 1 root root   35  8  3 19:36  display-manager.service -> /usr/lib/systemd/system/gdm.service
lrwxrwxrwx. 1 root root   69  9 22 08:37  fetchlogserver.service -> /home/app/asyncio_server/server/config/service/fetchlogserver.service

Appendix 1 クライアントからの実行例

クライアントから、submit.shプログラムを実行すると以下のようになる。

[n1025@nis1 client]$ ./submit.sh
------------------------------------------------------------------
The default log displayed is the latest 20 lines.
If you want to change this, please provide a number as an argument.
Usage: ./submit.sh <integer number>
------------------------------------------------------------------
2024/09/21 09:17:55

[n1025@nis1 client]$ ./submit.sh 2
2024/09/21 09:18:01
INFO:__main__:{'time': '2024/09/21 09:17:55', 'status': 'success', 'client_ip': '192.168.56.104', 'argument': 'n1025'}

[n1025@nis1 client]$

サーバ側のログにも記録されている。

[n1025@nis1 server]$ ls
config  log  server.py  server.py.org
[n1025@nis1 server]$ tail -10 log/server.log
INFO:__main__:{'time': '2024/09/21 08:43:59', 'status': 'success', 'client_ip': '192.168.56.101', 'argument': 'user01'}
INFO:__main__:{'time': '2024/09/21 08:44:13', 'status': 'success', 'client_ip': '192.168.56.104', 'argument': 'n1100'}
INFO:__main__:{'time': '2024/09/21 09:09:43', 'status': 'success', 'client_ip': '192.168.56.101', 'argument': 'user01'}
INFO:__main__:{'time': '2024/09/21 09:10:00', 'status': 'success', 'client_ip': '192.168.56.101', 'argument': 'user01'}
INFO:__main__:{'time': '2024/09/21 09:10:46', 'status': 'success', 'client_ip': '192.168.56.104', 'argument': 'root'}
INFO:__main__:{'time': '2024/09/21 09:11:03', 'status': 'success', 'client_ip': '192.168.56.104', 'argument': 'root'}
INFO:__main__:{'time': '2024/09/21 09:13:24', 'status': 'success', 'client_ip': '192.168.56.104', 'argument': 'root'}
INFO:__main__:{'time': '2024/09/21 09:13:31', 'status': 'success', 'client_ip': '192.168.56.104', 'argument': 'root'}
INFO:__main__:{'time': '2024/09/21 09:17:55', 'status': 'success', 'client_ip': '192.168.56.104', 'argument': 'n1025'}
INFO:__main__:{'time': '2024/09/21 09:18:01', 'status': 'success', 'client_ip': '192.168.56.104', 'argument': 'n1025'}
[n1025@nis1 server]$

Appendix 2 サーバ側のプログラム変更や、設定ファイルsettings.pyを変更したときの反映方法

プログラムを修正したり、settings.pyの変更を行った場合は、修正内容を反映するためにサービスを再起動する。

[root@nis1 server]# systemctl restart fetchlogserver.service
[root@nis1 server]# systemctl status fetchlogserver.service
 fetchlogserver.service - My Python App
   Loaded: loaded (/etc/systemd/system/fetchlogserver.service; disabled; vendor preset: disabled)
   Active: active (running) since Sat 2024-09-21 09:25:00 JST; 6s ago
 Main PID: 114474 (python3)
    Tasks: 1 (limit: 11004)
   Memory: 9.3M
   CGroup: /system.slice/fetchlogserver.service
           └─114474 /usr/bin/python3 /home/app/asyncio_server/server/server.py

 9 21 09:25:00 nis1 systemd[1]: fetchlogserver.service: Succeeded.
 9 21 09:25:00 nis1 systemd[1]: Stopped My Python App.
 9 21 09:25:00 nis1 systemd[1]: Started My Python App.
[root@nis1 server]# 

Appendix3 起動停止が適切に行われているか。

systemdで起動停止したサービスについて、psコマンドにてゾンビなどが残っていないかを確認する。

[root@nis1 log]# systemctl status fetchlogserver.service
 fetchlogserver.service - My Python App
   Loaded: loaded (/etc/systemd/system/fetchlogserver.service; disabled; vendor preset>
   Active: active (running) since Sat 2024-09-21 09:25:00 JST; 10h ago
 Main PID: 114474 (python3)
    Tasks: 1 (limit: 11004)
   Memory: 9.3M
   CGroup: /system.slice/fetchlogserver.service
           mq114474 /usr/bin/python3 /home/app/asyncio_server/server/server.py

 9 21 09:25:00 nis1 systemd[1]: fetchlogserver.service: Succeeded.
 9 21 09:25:00 nis1 systemd[1]: Stopped My Python App.
 9 21 09:25:00 nis1 systemd[1]: Started My Python App.
[root@nis1 log]#
[root@nis1 log]# ps aux | grep python3
root      134368  0.2  1.0 303176 18268 ?        Ss   07:04   0:00 /usr/bin/python3 /home/app/asyncio_server/server/server.py
root      134381  0.0  0.0 221944  1180 pts/2    R+   07:04   0:00 grep --color=auto python3
[root@nis1 log]#

停止してみよう

[root@nis1 log]# systemctl stop fetchlogserver.service
[root@nis1 log]# systemctl status fetchlogserver.service
 fetchlogserver.service - My Python App
   Loaded: loaded (/etc/systemd/system/fetchlogserver.service; disabled; vendor preset>
   Active: inactive (dead)

 9 22 06:57:59 nis1 systemd[1]: fetchlogserver.service: Succeeded.
 9 22 06:57:59 nis1 systemd[1]: Stopped My Python App.
 9 22 06:58:46 nis1 systemd[1]: Started My Python App.
 9 22 07:04:07 nis1 systemd[1]: Stopping My Python App...
 9 22 07:04:07 nis1 systemd[1]: fetchlogserver.service: Succeeded.
 9 22 07:04:07 nis1 systemd[1]: Stopped My Python App.
 9 22 07:04:22 nis1 systemd[1]: Started My Python App.
 9 22 07:05:17 nis1 systemd[1]: Stopping My Python App...
 9 22 07:05:17 nis1 systemd[1]: fetchlogserver.service: Succeeded.
 9 22 07:05:17 nis1 systemd[1]: Stopped My Python App.
[root@nis1 log]# ps aux | grep python3
root      134402  0.0  0.0 221944  1100 pts/2    R+   07:05   0:00 grep --color=auto python3
[root@nis1 log]#

python3で起動していたserver.pyのプロセスが消えていることを確認できた。