掘金 后端 ( ) • 2024-06-14 17:45

theme: channing-cyan

前言

一直对 I/O 密集、CPU 密集和集群这三个概念有些疑惑,这里写一个案例实际测试一下。

home.png

一、环境

  • 安装mysql;
  • node v18+;

二、测试代码

package 依赖相关

{
  "name": "nodetest",
  "version": "1.0.0",
  "description": "",
  "main": "app.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "keywords": [],
  "author": "",
  "license": "ISC",
  "dependencies": {
    "async": "^3.2.5",
    "express": "^4.19.2",
    "mysql": "^2.18.1",
    "pm2": "^5.4.0"
  }
}

app.js 主程序

const express = require("express");
const app = express();
const mysql = require("mysql");
const util = require("util");

// Connection pool for the benchmark database.
// Host, user, password and database name can each be overridden through an
// environment variable so credentials do not have to live in the source; the
// fallbacks are the original local-development values.
const pool = mysql.createPool({
  connectionLimit: 10,
  host: process.env.DB_HOST ?? "localhost",
  user: process.env.DB_USER ?? "root",
  password: process.env.DB_PASSWORD ?? "123456",
  database: process.env.DB_NAME ?? "mydatabase",
  connectTimeout: 30000, // connection timeout, in milliseconds
  acquireTimeout: 30000, // timeout for acquiring a connection, in milliseconds
});

// Swap the callback-style pool.query for a promise-returning wrapper so the
// route handlers can `await` it. The wrapper is always invoked as
// `pool.query(...)`, which keeps `this` bound to the pool.
pool.query = util.promisify(pool.query);

/**
 * Prints one progress line for a request handled by this process.
 *
 * @param {{pid: number}} process - object whose `pid` identifies the worker.
 * @param {string} status - progress marker printed after the pid.
 * @param {string|number} index - request index taken from the URL.
 * @param {string} type - name of the endpoint being exercised.
 * @param {*} [count] - accepted for call-site compatibility; not printed.
 */
function logWorkerStatus(process, status, index, type, count) {
  const { pid } = process;
  const line = `Worker id ${pid} ${status} current index: ${index} 接口类型:${type} `;
  console.log(line);
}
// Request logger: prints the HTTP method and path of every incoming request,
// then passes control to the next handler.
const logRequest = (req, res, next) => {
  const { method, path } = req;
  console.log(`Request received: ${method} ${path}`);
  next();
};
app.use(logRequest);

// Parse JSON request bodies.
app.use(express.json());

// Root route: replies with a fixed greeting.
const sendGreeting = (req, res) => {
  res.send("Hello World");
};
app.get("/", sendGreeting);
// GET /testsql/:index — database-bound benchmark endpoint.
// Counts the existing rows named "test user <index>", inserts one more such
// row, and replies with the id of the inserted row.
app.get("/testsql/:index", async (req, res) => {
  const { index } = req.params;
  logWorkerStatus(process, "开始", index, "testsql");

  try {
    const name = `test user ${index}`;
    const create_time = Date.now();

    const countRows = await pool.query(
      "SELECT COUNT(*) AS count FROM User WHERE name = ?",
      [name]
    );
    const insertResult = await pool.query(
      "INSERT INTO User (name, create_time) VALUES (?, ?)",
      [name, create_time]
    );

    res.send({ id: insertResult.insertId });
    logWorkerStatus(process, "结束", index, "testsql", countRows[0].count);
  } catch (error) {
    console.error(error);
    // NOTE(review): the raw error object is serialized into the response body;
    // confirm that exposing it to clients is acceptable outside a local test.
    res.status(500).send({ error });
  }
});

// GET /testio/:index — file-I/O benchmark endpoint.
// Appends "\n Hello World<index>" to text.txt and replies "Hello World".
//
// The write is a single asynchronous append rather than a synchronous
// read-then-rewrite of the whole file:
//   - the synchronous calls blocked the event loop for the duration of the
//     disk access, so no other request could be served meanwhile;
//   - reading the file and writing it back is a read-modify-write sequence, so
//     when several cluster workers interleave, one worker can overwrite the
//     line another worker has just added.
// Failures are handed to Express through next().
app.get("/testio/:index", async (req, res, next) => {
  const index = req.params.index;
  logWorkerStatus(process, "开始", index, "testio");
  const { appendFile } = require("fs/promises");
  try {
    await appendFile("text.txt", "\n Hello World" + index, "utf8");
    res.send("Hello World");
    logWorkerStatus(process, "结束", index, "testio");
  } catch (error) {
    next(error);
  }
});

// GET /testcpu/:index — CPU-bound benchmark endpoint.
// Runs a deliberately expensive synchronous computation, then echoes the
// index back as { id }.
app.get("/testcpu/:index", async (req, res) => {
  const { index } = req.params;
  logWorkerStatus(process, "开始", index, "testcpu");

  // Naive double recursion (value 1 for n < 2, otherwise the sum of the two
  // previous terms). The result is discarded; only the time spent matters.
  const cpuIntensiveTask = (n) =>
    n < 2 ? 1 : cpuIntensiveTask(n - 1) + cpuIntensiveTask(n - 2);

  cpuIntensiveTask(40);

  logWorkerStatus(process, "结束", index, "testcpu");
  res.send({ id: index });
});

// Listening port: the PORT environment variable when set, otherwise 3000.
const PORT = process.env.PORT || 3000;

// Invoked by app.listen once the server is listening.
function announceListening() {
  console.log(`Server is running on port ${PORT}`);
}

app.listen(PORT, announceListening);

test.js 测试脚本

const http = require("http");
const async = require("async");

// Base settings shared by every request this script sends. test() copies the
// object and adds the per-request `path`.
const options = {
  host: "localhost",
  port: 3000, // the port app.js falls back to when PORT is not set
  method: "GET",
  headers: {
    "Content-Type": "application/json",
  },
};

/**
 * Reduces a list of request durations (milliseconds) to the report figures.
 * An empty list yields zeros, because Math.max/Math.min/division would
 * otherwise produce -Infinity/Infinity/NaN.
 *
 * @param {number[]} times - duration of each successful request.
 * @returns {{total: number, max: number, min: number, average: number}}
 */
const summarizeTimes = (times) => {
  if (times.length === 0) {
    return { total: 0, max: 0, min: 0, average: 0 };
  }
  const total = times.reduce((a, b) => a + b, 0);
  return {
    total,
    max: Math.max(...times),
    min: Math.min(...times),
    average: total / times.length,
  };
};

/**
 * Fires 10 GET requests at `${path}0` … `${path}9`, all in flight at once, and
 * prints a summary table when every one of them has finished.
 *
 * A failed request (connection error, response stream error, or a non-2xx
 * status) is logged and counted in "Error count"; it does not stop the run,
 * so the summary always covers all requests. "Total time" is the sum of the
 * individual request durations, not the wall-clock time of the run.
 *
 * @param {string} path - URL prefix; the request number is appended to it.
 */
const test = (path) => {
  const concurrency = 10;
  const times = [];
  let success_count = 0;
  let error_count = 0;

  async.timesLimit(
    concurrency,
    concurrency,
    (n, next) => {
      const start_time = Date.now();
      const reqOptions = { ...options, path: `${path}${n}` };

      // Each request must report back exactly once, whichever event fires
      // first; a second call to `next` would be a double callback.
      let settled = false;
      const fail = (err) => {
        if (settled) return;
        settled = true;
        console.error(err);
        error_count += 1;
        // Report completion without an error so the remaining requests are
        // still included in the summary.
        next(null);
      };

      const req = http.request(reqOptions, (res) => {
        let data = "";
        res.on("data", (chunk) => {
          data += chunk;
        });
        res.on("error", fail);
        res.on("end", () => {
          if (res.statusCode < 200 || res.statusCode >= 300) {
            fail(
              new Error(
                `${reqOptions.path} responded with status ${res.statusCode}`
              )
            );
            return;
          }
          if (settled) return;
          settled = true;

          const request_time = Date.now() - start_time;
          times.push(request_time);
          console.log(request_time);
          success_count += 1;

          next(null, data);
        });
      });
      req.on("error", fail);
      req.end();
    },
    (err) => {
      if (err) {
        console.error(err);
      }
      const summary = summarizeTimes(times);

      console.table({
        Path: path,
        "Total time": summary.total,
        "Max time": summary.max,
        "Min time": summary.min,
        "Average time": summary.average,
        "Success count": success_count,
        "Error count": error_count,
      });
    }
  );
};

// Start the benchmark runs. test() returns as soon as its requests have been
// sent, so every listed path is exercised at the same time.
// Add "/testsql/" to the list to include the database endpoint.
for (const path of ["/testcpu/", "/testio/"]) {
  test(path);
}

cluster.js 集群入口

const cluster = require("cluster");
const os = require("os");

// `cluster.isPrimary` replaced the deprecated `cluster.isMaster` in Node.js 16.
// Fall back to the old name so the script still runs on older runtimes.
const isPrimaryProcess = cluster.isPrimary ?? cluster.isMaster;

if (isPrimaryProcess) {
  // Primary process: starts one worker per CPU reported by the OS and
  // replaces any worker that exits.
  const numCPUs = os.cpus().length;
  console.log(numCPUs);
  console.log(`Master ${process.pid} is running`);

  // These events describe worker lifecycle and are observed from the primary,
  // so the listeners are registered here, before any worker is forked,
  // instead of in every process.
  cluster.on("online", (worker) => {
    console.log(`Worker ${worker.process.pid} is online`);
  });

  cluster.on("disconnect", (worker) => {
    console.log(`Worker ${worker.process.pid} disconnected`);
  });

  // NOTE(review): a worker that crashes during startup is re-forked
  // immediately, with no limit or delay — confirm this is acceptable.
  cluster.on("exit", (worker) => {
    console.log(`Worker ${worker.process.pid} died`);
    cluster.fork();
  });

  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }
} else {
  // Worker process: loading ./app starts the Express server in this process.
  require("./app");
  console.log(`Worker ${process.pid} started`);
}

三、简单解释

目录结构

image.png

测试单体应用

  • 启动单应用 node app.js
  • 启动测试脚本 node test.js

测试集群

  • 启动服务 node cluster.js
  • 启动测试脚本 node test.js

结论

对于大多数项目来说,Node.js 作为后台服务已经足够使用。本人粗略统计了身边做开发的同学,几乎没有人在做 CPU 密集型应用,基本上都是 I/O 密集型的。不过就企业级开发而言,Java 的生态仍然难以撼动。