Lsiron

AIR PANG(ts, express, mysql)- 3(openAPI 데이터 받아오기 그리고 트랜잭션 적용 및 node-cron 사용법 2) 본문

개발일지/AIR PANG

AIR PANG(ts, express, mysql)- 3(openAPI 데이터 받아오기 그리고 트랜잭션 적용 및 node-cron 사용법 2)

Lsiron 2024. 8. 17. 07:22

현재 폴더 구조

data_project/
├── back                                   # 백엔드 프로젝트 루트 디렉토리
│   ├── logs                               # 로그 파일들이 위치하는 디렉토리
│   ├── node_modules                       # 의존성 모듈들이 위치하는 디렉토리
│   └── src                                # 소스 파일들이 위치하는 디렉토리
│       ├── config                         # 환경설정 관련 파일들이 위치하는 디렉토리
│       │   └── db.config.ts               # 데이터베이스 설정 파일 (MySQL 연결 설정)
│       ├── controllers                    # 컨트롤러 파일들이 위치하는 디렉토리
│       │   └── updateDataCron.ts          # 데이터 업데이트 스케줄링 컨트롤러
│       ├── dto                            # 데이터 전송 객체들이 위치하는 디렉토리
│       ├── middlewares                    # 미들웨어 파일들이 위치하는 디렉토리
│       ├── repositories                   # 데이터 저장소 파일들이 위치하는 디렉토리
│       │   ├── locationRepository.ts      # 위치 정보 데이터 저장소 파일
│       │   └── updateDataRepository.ts    # 데이터 업데이트를 위한 저장소 파일
│       ├── routes                         # 라우트 정의 파일들이 위치하는 디렉토리
│       ├── services                       # 서비스 파일들이 위치하는 디렉토리
│       │   ├── locationService.ts         # 위치 정보 관련 비즈니스 로직 파일
│       │   └── updateDataService.ts       # 데이터 업데이트 관련 비즈니스 로직 파일
│       ├── types                          # 타입 정의 파일들이 위치하는 디렉토리
│       │   └── location.ts                # 위치 및 대기오염 데이터 관련 타입 정의 파일
│       ├── utils                          # 유틸리티 함수들이 위치하는 디렉토리
│       │   ├── aqi.ts                     # AQI 계산 등 유틸리티 함수들이 포함된 파일
│       │   └── logger.ts                  # 로깅 유틸리티 (Winston 설정)
│       ├── app.ts                         # 애플리케이션 진입점 파일 (Express 설정 및 미들웨어 구성)
│       └── index.ts                       # 서버 시작 파일 (Express 서버 실행)
│   ├── .env                               # 환경변수 설정 파일 (개발 환경)
│   ├── .env.production                    # 프로덕션 환경변수 설정 파일
│   ├── .gitignore                         # Git에서 무시할 파일을 지정하는 파일
│   ├── nodemon.json                       # Nodemon 설정 파일 (개발 중 서버 자동 재시작 설정)
│   ├── package-lock.json                  # 정확한 버전의 패키지를 설치하기 위한 파일
│   ├── package.json                       # 프로젝트 메타데이터 및 의존성 목록
│   └── tsconfig.json                      # TypeScript 설정 파일 (컴파일러 옵션 등)

현재 데이터 구조

현재 지정한 타입

import { RowDataPacket } from 'mysql2';

export interface Location extends RowDataPacket {
  id: number;
  address_a_name: string;
  address_b_name: string;
}

export interface AirQualityItem {
  cityName: string;
  pm10Value: number;
  pm25Value: number;
  o3Value: number;
  no2Value: number;
  coValue: number;
  so2Value: number;
  dataTime: string;
}

현재 진행 과정

1. 사전에 db에 넣어둔 지역데이터를 가져온다.

2. openAPI에 설정된 지역과 db에 넣어둔 지역이 일치하는 경우, 설정된 지역의 데이터들을 가져온다.

3. 지역의 데이터들을 db에 넣는다. ( 데이터가 안 들어오거나, 잘못 들어올 경우 이 전의 작업들을 취소시킨다. )

4. 위 작업이 자동으로 돌아가도록 한다.

 

3. 지역의 데이터들을 db에 넣는다. ( 데이터가 안 들어오거나, 잘못 들어올 경우 이 전의 작업들을 취소시킨다. ) 

이제 세부지역 데이터들을 모두 추출했으니, 이 데이터들을 db에 넣어줄 쿼리문을 짜야한다.

updateDataRepository.ts 로 가서 아래와 같이 쿼리문을 짜주자.

import pool from '@_config/db.config';
import type { AirQualityItem } from '@_types/location';

export class UpdateDataRepository {
  public async insertOrUpdateAirQualityData(locationId: number, item: AirQualityItem): Promise<void> {
    const query = `
      INSERT INTO realtime_air_quality (location_id, pm10, pm25, o3, no2, co, so2, timestamp)
      VALUES (?, ?, ?, ?, ?, ?, ?, ?)
      ON DUPLICATE KEY UPDATE
        pm10 = VALUES(pm10),
        pm25 = VALUES(pm25),
        o3 = VALUES(o3),
        no2 = VALUES(no2),
        co = VALUES(co),
        so2 = VALUES(so2),
        timestamp = VALUES(timestamp)
    `;
    // 값이 하나라도 undefined 혹은 null 일 경우 기본값 0으로 설정
    await pool.query(query, [
      locationId,
      item.pm10Value || 0,
      item.pm25Value || 0,
      item.o3Value || 0,
      item.no2Value || 0,
      item.coValue || 0,
      item.so2Value || 0,
      item.dataTime
    ]);
  }
}

 

updateDataRepository 클래스를 선언하고 export 해준다.

 

insertOrUpdateAirQualityData 메서드는 외부에서 접근할 수 있도록 public으로 지정했다. 이 메서드는 비동기 함수이며, 대기질 데이터를 삽입하거나 업데이트하기 위해 locationId와 item을 인자로 받는다.

 

1. locationId: 각 대기질 데이터가 어떤 위치와 관련이 있는지를 식별하기 위한 값이다. 이 값은 데이터베이스에 대기질 데이터를 저장할 때 해당 위치와 매핑되도록 전달된다.

 

2. item: 대기질 측정 데이터를 포함한 객체로, pm10Value, pm25Value, o3Value, no2Value, coValue, so2Value, 그리고 dataTime과 같은 속성을 가지고 있다.

 

이제 INSERT INTO ... ON DUPLICATE KEY UPDATE 쿼리를 사용하여 데이터베이스에 대기질 데이터를 삽입해주자.

 

이 쿼리의 동작 방식은 다음과 같다.

 

1. 처음 데이터가 삽입될 때는 기본적으로 INSERT 문을 사용하여 데이터를 삽입한다.

 

2. 만약 중복된 키(여기서는 location_id)가 발생할 경우, 새 데이터를 추가하지 않고 기존 데이터를 업데이트한다. 즉, ON DUPLICATE KEY UPDATE 구문을 통해 중복 키가 발생하면, 기존 레코드를 새로운 값으로 덮어쓴다.

 

에어팡 프로젝트에서 과거의 실시간 대기질 데이터를 저장할 필요가 없기 때문에, 중복된 location_id에 대해서는 기존 데이터를 업데이트하는 방식으로 구현했다.

 

ON DUPLICATE KEY UPDATE 구문에서는 VALUES() 함수를 사용하여, 삽입하려던 새 값들을 참조하고, 해당 값들로 기존 레코드를 업데이트한다.

 

이 동작을 통해 삽입하려던 데이터를 중복된 키가 발생한 경우에도 활용할 수 있다.

 

결과적으로 업데이트되는 값들은 VALUES() 함수로 참조된 삽입하려던 값들이 된다.

 

허나 전제조건으로 중복 키 발생을 처리하기 위해 location_id 필드에 UNIQUE 제약 조건을 설정해야 한다.

 

location_id 필드가 고유한 위치를 식별하는 필드이기 때문에, 동일한 위치에 대해 중복 데이터를 방지하고, 삽입된 데이터를 업데이트할 수 있도록 보장된다.

 

쿼리문 내의 플레이스 홀더(?)에 삽입할 값들은 item 객체에서 가져오고, 값이 undefined 또는 null일 경우 기본값으로 0이 삽입되도록 해주자. 이를 통해 데이터가 없는 경우에도 에러 없이 기본값이 저장된다.

 

이제 다시 updateDataRepository.ts 로 가서 db에 넣는 작업을 해주자.

import axios from 'axios';
import { UpdateDataRepository } from '@_repositories/updateDataRepository'; // repository 클래스 가져오기
import { LocationService } from '@_services/locationService';
import pool from '@_config/db.config';
import logger from '@_utils/logger'; 
import dayjs from 'dayjs';
import utc from 'dayjs/plugin/utc';
import timezone from 'dayjs/plugin/timezone';

dayjs.extend(utc);
dayjs.extend(timezone);

const API_KEY = encodeURIComponent(process.env.LOCATION_API_KEY || '');

export class UpdateDataService {
    private updateDataRepository: UpdateDataRepository; // repository 클래스 불러오기
    private locationService: LocationService;   

    constructor() {
      this.updateDataRepository = new UpdateDataRepository(); // 클래스 초기화
      this.locationService = new LocationService(); 
    }
  
    public async fetchAndStoreData(): Promise<void> {
      logger.info('Fetching and storing data process started.');
      
      const locations = await this.locationService.loadAllLocations();
      const provinces = Array.from(new Set(locations.map(loc => loc.address_a_name))); 
      
      const connection = await pool.getConnection(); // 데이터베이스 연결
      
      for (const province of provinces) {
        logger.info(`Fetching real-time data for ${province}.`);
        const url = `http://apis.data.go.kr/어쩌구/저쩌구?sidoName=${encodeURIComponent(province)}&serviceKey=${API_KEY}`;
        const response = await axios.get(url);
        
        if (!response.data || !response.data.response || !response.data.response.body || !response.data.response.body.items) {
          throw new Error(`Failed to fetch real-time data for ${province}`); 
        }
         
        const items = response.data.response.body.items || [];
        
        for (const loc of locations.filter(loc => loc.address_a_name === province)) {
          const item = items.find((it: AirQualityItem) => it.cityName === loc.address_b_name) || {
            pm10Value: 0,
            pm25Value: 0,
            o3Value: 0,
            no2Value: 0,
            coValue: 0,
            so2Value: 0,
            dataTime: new Date().toISOString(),
          };
          
          item.dataTime = dayjs(item.dataTime).tz('Asia/Seoul').format('YYYY-MM-DD HH:mm:ss');
          
          await this.updateDataRepository.insertOrUpdateAirQualityData(loc.id, item); // 데이터 삽입
        }
      }   
    }
}

 

repository 클래스를 불러온 후, 클래스 내부에서만 접근 할 수 있도록 private 으로 지정 해 주자.

 

이 후, 클래스 초기화를 위해 constructor로 넣어주고, 데이터베이스를 연결시켜주자.

 

마지막으로 updateDataRepository의 insertOrUpdateAirQualityData 메서드에 인자로 loc.id(location_id) 와 item을 넣어주고 비동기처리만 해주면 db에 넣는 작업까지 모두 마무리가 된다.

 

여기서 예외처리만 하면 끝이맞는데 뭔가 이상하다.

 

데이터를 잘 불러오고 잘 저장하는데, 한 세부지역에서 예기치 못한 오류가 발생하여 데이터를 제대로 불러오지 못 할 경우, 에러가 발생하여 서버가 터진다. 물론 여기까진 괜찮다. 허나?

 

db를 들어가보니 에러가 난 세부지역 보다 앞선 지역들은 5시로 업데이트가 돼 있고 에러가 난 세부지역 부터 그 후, 지역들은 모두 4시 데이터를 가지고있다.

즉, 에러가 난 세부지역 기준으로 5시 대기오염 물질 정보와 4시 대기오염 물질 정보가 섞인 것. 데이터가 일관성이 없다.

처음엔 오류만 잡으려는 방법만 찾고 갖은 방법을 써도 어디선가 언제쯤 한번은 에러가 발생하여 서버가 터지고 데이터가 섞였다. 파고든 끝에 찾은 방법은 트랜잭션이다.

 

내가 겪던 문제점을 바로 해결 해 주었다. 세부지역 에서 데이터를 잘못 받아오는 순간 받아오던 데이터 작업을 그대로 취소 시키는 유용한 기능이었다. 금융쪽에서 많이 사용된다고 하는데 추 후 집중적으로 파고 들어야겠다.

 

그럼 트랜잭션을 넣어보자 작업은 간단하다. 범위를 잘 설정 해 주면 된다.

 

for (const province of provinces) 루프에서 각 메인지역(province)에 대해 데이터를 가져오기 전에 beginTransaction 으로 트랜잭션을 시작 해 주자.

 

성공적으로 모든 데이터가 들어가면 commit, 오류가 발생하면 rollback 을 설정 해 주자.

 

이렇게 되면 예를 들어 서울 경기 ... 까지 잘 가져오다가 제주에서 에러가 발생하면 트랜잭션 롤백이 되어 제주 이전 지역들의 데이터가 db에 업데이트 되는 것을 막을 것 이다.

 

마지막으로 commit 과 rollback 둘 다 상관없이 release가 되도록 해 주자.

 

release를 호출하는 이유는 우리가 connection Pool을 사용하고 있기 때문에, 데이터베이스 연결을 사용한 후에는 연결을 반환하여 connection pool을 관리하기 위함이다.

 

우리 프로젝트 팀이 총 10개의 전화기를 가지고 있다 가정했을 때, 내가 5개의 전화기로 통화를 하고나서 더 이상 통화를 하지 않으면 수화기를 원위치 시켜놓는 개념이다.

 

내가 통화를 하지 않는데 수화기를 계속 들고있으면, 다른 사람이 사용하지 못 한다. 

import axios from 'axios';
import { UpdateDataRepository } from '@_repositories/updateDataRepository';
import { LocationService } from '@_services/locationService';
import type { AirQualityItem } from '@_types/location';
import pool from '@_config/db.config';
import logger from '@_utils/logger';
import dayjs from 'dayjs';
import utc from 'dayjs/plugin/utc';
import timezone from 'dayjs/plugin/timezone';

dayjs.extend(utc);
dayjs.extend(timezone);

const API_KEY = encodeURIComponent(process.env.LOCATION_API_KEY || '');

export class UpdateDataService {
  private updateDataRepository: UpdateDataRepository;
  private locationService: LocationService;

  constructor() {
    this.updateDataRepository = new UpdateDataRepository();
    this.locationService = new LocationService();
  }

  public async fetchAndStoreData(): Promise<void> {
    logger.info('Fetching and storing data process started.');
  
    const locations = await this.locationService.loadAllLocations();
    const provinces = Array.from(new Set(locations.map(loc => loc.address_a_name))); 
  
    try {
      const connection = await pool.getConnection(); 
      try {
        await connection.beginTransaction(); // 트랜잭션 시작
        
        for (const province of provinces) {
          logger.info(`Fetching real-time data for ${province}.`);
          const url = `http://apis.data.go.kr/B552584/ArpltnStatsSvc/getCtprvnMesureSidoLIst?sidoName=${encodeURIComponent(province)}&searchCondition=DAILY&pageNo=1&numOfRows=100&returnType=json&serviceKey=${API_KEY}`;
          const response = await axios.get(url);
  
          if (!response.data || !response.data.response || !response.data.response.body || !response.data.response.body.items) {
            throw new Error(`Failed to fetch real-time data for ${province}`); 
          }
  
          const items = response.data.response.body.items || [];

          for (const loc of locations.filter(loc => loc.address_a_name === province)) {
            const item = items.find((it: AirQualityItem) => it.cityName === loc.address_b_name) || {
              pm10Value: 0,
              pm25Value: 0,
              o3Value: 0,
              no2Value: 0,
              coValue: 0,
              so2Value: 0,
              dataTime: new Date().toISOString(),
            };
            
            item.dataTime = dayjs(item.dataTime).tz('Asia/Seoul').format('YYYY-MM-DD HH:mm:ss');

            await this.updateDataRepository.insertOrUpdateAirQualityData(loc.id, item); // 데이터 삽입
          }
        }
  
        await connection.commit(); // 트랜잭션 커밋
        logger.info('Successfully fetched and stored data for all regions.');
       } catch (error) {
        await connection.rollback(); // 트랜잭션 롤백
        logger.error('An error occurred during data processing, transaction has been rolled back:', error);
       } finally {
        connection.release(); // 연결해제
       }
     } catch (error) {
      logger.error('Failed to fetch and store data for all regions:', error);
    }
  }  
}

 

비동기 작업으로 트랜잭션이 시작, 커밋, 롤백 되도록 하였고, 커밋과 롤백 상관없이 release 처리를 해 주었으며, 예외처리까지 모두 해 주었다. 트랜잭션이 성공적으로 작동하는 것을 확인 하였다.

 

그렇다면 마지막은 위 작업들이 서버를 켜놓거나 무중단 배포를 해 놓았을 시, 스케쥴러가 작동하여 자동으로 진행되도록 하는 것이다. 

 

4. 위 작업이 자동으로 돌아가도록 한다. 

위 작업이 자동으로 돌아가도록 하기 위해선 서버리스 방법과 스케쥴러를 사용하는 방법이 있지만, 에어팡 프로젝트는 시간관계상 스케쥴러를 사용하기로 했다.

 

우리가 사용할 스케쥴러는 바로 node-cron이다. 먼저 아래 명령어를 통해 node-cron을 설치해주자.

$ npm install node-cron @types/node-cron

 

기본 코드는 이 링크를 참고하자. https://www.npmjs.com/package/node-cron

 

node-cron

A simple cron-like task scheduler for Node.js. Latest version: 3.0.3, last published: 9 months ago. Start using node-cron in your project by running `npm i node-cron`. There are 1262 other projects in the npm registry using node-cron.

www.npmjs.com

 

이제 updateDataCron.ts 파일로 가서 cron 설정을 해 주자.

import cron from 'node-cron';
import { UpdateDataService } from '@_services/updateDataService';

export class UpdateDataCron {
  private updateDataService: UpdateDataService;

  constructor() {
    this.updateDataService = new UpdateDataService();
  }

  public startCronJob(): void {
    cron.schedule('0 */24 * * *', () => {
      this.updateDataService.fetchAndStoreData();
    });

    // 서버 시작 시 한번 실행 (테스트 할 땐 주석처리)
    // this.updateDataService.fetchAndStoreData();
  }
}

 

UpdateDataCron 클래스를 선언하고 export 해준다.

 

UpdateDataService 클래스를 불러온 후, 클래스 내부에서만 접근 할 수 있도록 private 으로 지정 해 주자.

 

이 후, 클래스 초기화를 위해 constructor로 넣어주고, startCronJob 메서드를 외부에서 접근하여 사용할 수 있도록 public으로 지정 해 주자. 타입은 반환하는 값이 없기 때문에 void로 지정 해 준다.

 

이 후, 기본코드에 나와 있듯이 cron.schedule의 첫 번째 인자로 시간을 정해주고, 콜백함수로 updateDataService의 fetchAndStoreData 메서드가 작동하도록 해주면 된다.

( 에어팡 프로젝트는 24시간 마다 한 번씩 불러오도록 했다 = openAPI 일일트래픽이 500이기 때문이다. )

 

예기치 못한 오류가 발생하여 데이터를 업데이트 해 주지 못했을 때 서버를 재 시작할 경우 즉시 실시간 데이터를 가져오는 작업이 실행 되도록 스케쥴링을 해 주지 않은 코드도 넣어주자.

 

테스트 할 땐 주석처리를 해 주자. 테스트 할 때 주석 처리를 하지 않으면 재실행 할 때마다 데이터를 가져온다. 

 

마지막으로 app.ts 로 가서 서버를 실행 했을 때, cron 작업이 실행 되도록 해 주자.

import 'reflect-metadata';
import express, { Request, Response, NextFunction } from 'express';
import cors from 'cors';
import cookieParser from 'cookie-parser';
import session from 'express-session';
import dotenv from 'dotenv';
import passport from 'passport';
import routes from '@_routes/index';
import { UpdateDataCron } from '@_controllers/updateDataCron'; // UpdateDataCron 클래스 가져오기
import '@_config/passport.config';

dotenv.config();

const app = express();

app.use(cors({
  origin: process.env.CLIENT_URL,
  credentials: true 
}));

app.use(express.json());
app.use(cookieParser());

app.use(session({
  secret: process.env.SESSION_SECRET || '',
  resave: false,
  saveUninitialized: false,
  cookie: {
    secure: process.env.NODE_ENV === '', 
    httpOnly: true, 
    maxAge: 24 * 60 * 60 * 1000, 
  }
}));

app.use(passport.initialize());
app.use(passport.session());

app.use((req: Request, res: Response, next: NextFunction) => {
  next();
});

app.use('/api', routes);

// 크론 작업 시작
const updateDataCron = new UpdateDataCron(); // updateDataCron 변수로 설정 후 인스턴스 생성:
updateDataCron.startCronJob(); // startCronJob 메서드 호출:

app.use((err: any, req: Request, res: Response, next: NextFunction) => {
  console.error('Server error:', err);
  res.status(500).send('Something broke!');
});

export default app;

 

UpdateDataCron 클래스를 가져와서 해당 클래스의 인스턴스를 생성하고 이를 updateDataCron 변수에 저장하여, startCronJob 메서드가 호출 하도록 해 준다.

 

1. 사전에 db에 넣어둔 지역데이터를 가져온다.

2. openAPI에 설정된 지역과 db에 넣어둔 지역이 일치하는 경우, 설정된 지역의 데이터들을 가져온다.

3. 지역의 데이터들을 db에 넣는다. ( 데이터가 안 들어오거나, 잘못 들어올 경우 이 전의 작업들을 취소시킨다. )

4. 위 작업이 자동으로 돌아가도록 한다.

 

모든 작업이 완료 되었다.

 

openAPI를 불러오고, 트랜잭션을 적용하며, 스케쥴링 사용까지 정말 길었다. 정상적으로 작동 하는 것을 보니 신기할 따름이다.