Skip to content
Home » 업 비트 파이썬 | 코인 자동 매매에 관심 있는 사람에게 필요한 영상 26245 투표 이 답변

업 비트 파이썬 | 코인 자동 매매에 관심 있는 사람에게 필요한 영상 26245 투표 이 답변

당신은 주제를 찾고 있습니까 “업 비트 파이썬 – 코인 자동 매매에 관심 있는 사람에게 필요한 영상“? 다음 카테고리의 웹사이트 kk.taphoamini.com 에서 귀하의 모든 질문에 답변해 드립니다: kk.taphoamini.com/wiki. 바로 아래에서 답을 찾을 수 있습니다. 작성자 초보코딩 이(가) 작성한 기사에는 조회수 25,169회 및 좋아요 411개 개의 좋아요가 있습니다.

Table of Contents

업 비트 파이썬 주제에 대한 동영상 보기

여기에서 이 주제에 대한 비디오를 시청하십시오. 주의 깊게 살펴보고 읽고 있는 내용에 대한 피드백을 제공하세요!

d여기에서 코인 자동 매매에 관심 있는 사람에게 필요한 영상 – 업 비트 파이썬 주제에 대한 세부정보를 참조하세요

#코인 #업비트 #자동매매 #파이썬 #코딩
업비트 API를 활용해서 자동매매에 필요한 명령어들을 직접 체험해보는 영상입니다.

업 비트 파이썬 주제에 대한 자세한 내용은 여기를 참조하세요.

[Python x Upbit][EP1] 내 파이썬과 업비트 API를 연결해 버리자

바로 시작하겠습니다. 오늘은,. 파이썬으로 업비트 Open API에 연결하여 잔고 조회하는 방법. 을 알아보고 실행해보겠습니다. 저만 따라 오시면 함께 …

+ 여기에 더 보기

Source: lapina.tistory.com

Date Published: 5/5/2022

View: 5084

[암호화폐] 파이썬과 업비트 API를 이용한 자동매매 upgrade

이전에 한 종목에 대하여 파이썬과 업비트 API를 이용한 자동매매 예제를 github에 올렸습니다. 신기하게도 제 프로그램을 관심있게 보던 분이 계셨습니다.

+ 여기를 클릭

Source: www.steemcoinpan.com

Date Published: 6/27/2022

View: 2368

[2] 파이썬으로 업비트 거래하기 – 이것저것 공부방 – 티스토리

업비트 API 신청하기 https://upbit.com/service_center/open_api_gue 들어가서 계좌만들고 OpenAPI 신청을 하면된다. 나는 그냥 다 체크했다.

+ 여기에 표시

Source: duckracoon.tistory.com

Date Published: 7/5/2022

View: 1901

sharebook-kr/pyupbit: python wrapper for upbit API – GitHub

get_tickers 함수는 업비트가 지원하는 모든 암호화폐 목록을 얻어옵니다. print(pyupbit.get_tickers()).

+ 여기에 표시

Source: github.com

Date Published: 3/13/2021

View: 2552

[Toy Project] 자동매매 프로그램 만들기 – 3. pyupbit 모듈 알아보기

pyupbit 모듈이란, 업비트 API를 파이썬에서 쉽게 사용하기 위해서 … 각 화폐별 ticker가 필요하다. get_tickers 함수를 이용하면 업비트에서 거래 …

+ 여기에 표시

Source: rebro.kr

Date Published: 12/15/2022

View: 9579

업 비트 파이썬 | 누구나 할 수 있는 비트코인 투자 자동화 강의 …

파이썬을 활용하여 비트코인 투자 전략을 구현해보고 업비트 거래소 API를 통해 가격이 기술적 전략에 부합할 때 자동으로 매매하는 프로그램을 개발합니다. 백테스팅을 …

+ 여기에 표시

Source: ppa.maxfit.vn

Date Published: 4/12/2021

View: 5041

업비트 REST API를 이용한 비트코인 가격 추출 파이썬 …

가장 빠르게 익히는 방법은 직접 해보는 것입니다. 이번 포스팅을 통해 REST API를 활용하여 비트코인 가격을 추출하는 파이썬 프로그래밍 방법을 …

+ 자세한 내용은 여기를 클릭하십시오

Source: coffee4m.com

Date Published: 6/22/2021

View: 10000

파이썬 업비트 웹소켓 접속방법 – 비트코인 자동매매 프로그램

업비트에서 코인 정보를 받아오는 방법은 크게 두 가지 방법으로 나눌 수 있는데요. 먼저 API를 개별 호출하여 정보를 얻어오는 방법과 웹소켓을 이용 …

+ 여기에 더 보기

Source: technfin.tistory.com

Date Published: 7/29/2022

View: 8034

주제와 관련된 이미지 업 비트 파이썬

주제와 관련된 더 많은 사진을 참조하십시오 코인 자동 매매에 관심 있는 사람에게 필요한 영상. 댓글에서 더 많은 관련 이미지를 보거나 필요한 경우 더 많은 관련 기사를 볼 수 있습니다.

코인 자동 매매에 관심 있는 사람에게 필요한 영상
코인 자동 매매에 관심 있는 사람에게 필요한 영상

주제에 대한 기사 평가 업 비트 파이썬

  • Author: 초보코딩
  • Views: 조회수 25,169회
  • Likes: 좋아요 411개
  • Date Published: 2021. 4. 27.
  • Video Url link: https://www.youtube.com/watch?v=No2SvNI_me8

[Python x Upbit][EP1] 내 파이썬과 업비트 API를 연결해 버리자_야너두 코인봇으로 부자 될 수 있어!(feat. 자빛스)

반응형

지난 편에서 코인봇으로 부자가 되는 서막을 알렸습니다.

저는 코인봇 개발에 대한 장대한 목표와 구체적인 계획을 세웠습니다.

동료가 있을지는 모르겠습니다. 동료가 되실 분은 좋아요를 눌러주세요.

저의 마음은 이미 두근대기 시작했습니다.

바로 시작하겠습니다.

오늘은,

파이썬으로 업비트 Open API에 연결하여 잔고 조회하는 방법

을 알아보고 실행해보겠습니다. 저만 따라 오시면 함께 부자 되실 수 있습니다.

다만, 준비된 자만이 기회를 잡을 수 있습니다.

사전 준비물을 안내드리겠습니다.

사전 준비

1. 파이썬의 기초

2. 주피터 노트북

3. 나는 할 수 있다 라는 믿음

준비 되셨으면 시작하겠습니다.

업비트에 들어갑니다.

슬픈 저의 계좌가 제 눈 앞을 가립니다.

조금만 기다려.. 내가.. 따따블 아니 따따따따블로 만들어 줄게..!!

살펴보기

업비트에 로그인하시고, 새벽 두시네요^^

고객센터 >> Open API 안내 로 이동합니다.

업비트 개발자 센터로 가봅니다.

저는 아직 “개발자” 라는 말이 멋있습니다.

자, 겟 스타리드 눌러줍니다.

시작되었습니다.

자 바로 API Reference로 가줍니다.

뭐든지, 나보다 더 똑똑한 사람들이 이미 생각해서 만들어 놓았습니다. 뭐든지요.

API Reference에 가면 처음으로 나오는녀석입니다.

자산>>전체계좌 조회

살펴보겠습니다. 아래 주석을 봐주세요.

계좌를 조회하기 위해 관련된 라이브러리를 먼저 설치해 줍시다.

터미널에서 설치하든 주피터에서 설치하든 아나콘다 가상환경을 만들어 설치하든 설치해 줍시다.

#라이브러리 설치가 필요하겠습니다. import os import jwt #pip install PyJWT import uuid #pip install uuid import hashlib #pip install hashlib from urllib.parse import urlencode import requests #urllib, requests는 보통 설치되어있습니다만 한번더 확인해 보세요. access_key = os.environ[‘UPBIT_OPEN_API_ACCESS_KEY’] secret_key = os.environ[‘UPBIT_OPEN_API_SECRET_KEY’] server_url = os.environ[‘UPBIT_OPEN_API_SERVER_URL’] payload = { ‘access_key’: access_key, ‘nonce’: str(uuid.uuid4()), } jwt_token = jwt.encode(payload, secret_key) authorize_token = ‘Bearer {}’.format(jwt_token) headers = {“Authorization”: authorize_token} res = requests.get(server_url + “/v1/accounts”, headers=headers) print(res.json())

다음엔 뭘 해야할지 눈치챘습니다.

Upbit Open API 키를 받자.

내 파이썬에서 업비트 API를 활용하기 위해 인증하는 절차가 필요합니다.

로그인과 같은 절차라고 보면 됩니다.

저는 알파고보다 똑똑한 봇을 만들것이기 때문에 다 선택했습니다.

(봇으로 거래를 하려면 “주문하기”, “출금하기” 까지 다 선택하고 IP를 입력해야함. 두가지 선택 안하면 IP입력 안함)

IP는 제 MAC의 IP를 입력했습니다.

MAC은 아래의 명령어로 IP를 알 수 있습니다. (뒤에서 말하겠지만 이것보다 네이버에서 내 IP주소 검색해서 입력하는게 확실합니다.)

mac$ ipconfig getifaddr en0

모드 체크하고 발급받기를 클릭하면 ACCESS_KEY와 SECRET_KEY를 발급해 줍니다.

Upbit Open API 키를 파이썬이 알아볼 수 있게 입력해주자.

아래를 복붙해서 실행하면, 위의 코드에서 바로 실행할 수 있게 기본 환경변수에 키를 입력해 주게 된다.

즉, 매번 실행할 필요는 없음.

import os #기본 변수 설정 os.environ[‘UPBIT_OPEN_API_ACCESS_KEY’] = ‘당신이 받은 키를 복사해서 붙여넣기’ os.environ[‘UPBIT_OPEN_API_SECRET_KEY’] = ‘당신이 받은 키를 복사해서 붙여넣기’

그 다음 server_url을 요렇게 바꿔주자.

server_url = “https://api.upbit.com”

그렇게 하면 완성된 코드는 다음과 같다.

코드 실행

import os import jwt import uuid import hashlib from urllib.parse import urlencode import requests access_key = os.environ[‘UPBIT_OPEN_API_ACCESS_KEY’] secret_key = os.environ[‘UPBIT_OPEN_API_SECRET_KEY’] server_url = “https://api.upbit.com” payload = { ‘access_key’: access_key, ‘nonce’: str(uuid.uuid4()), } jwt_token = jwt.encode(payload, secret_key) authorize_token = ‘Bearer {}’.format(jwt_token) headers = {“Authorization”: authorize_token} res = requests.get(server_url + “/v1/accounts”, headers=headers) print(res.json())

그리고 실행해 본다.

오잉

{‘error’: {‘message’: ‘인증된 IP가 아닙니다.’, ‘name’: ‘no_authorization_i_p’}}

후잉..

네이버에서 내 IP 주소를 찾아보니 다른 IP가 나온다.

API 키 받은 페이지로 가서 아래로 내려가면 IP를 변경할 수 있다.

네이버에서 찾은 IP도 얹어주자.

계좌 조회 결과 확인

조회 결과를 확인할 수 있었다.

[{‘currency’: ‘KRW’, ‘balance’: ‘xxx’, ‘locked’: ‘xxx’, ‘avg_buy_price’: ‘0’, ‘avg_buy_price_modified’: True, ‘unit_currency’: ‘KRW’}, {‘currency’: ‘DOGE’, ‘balance’:…………..

결과는 json 형태로 나오게끔 되어있다.

결과물에 어떤 내용이 있는지 살펴보면 다음과 같다.

Response (자세히 보려면 클릭)

필드 설명 타입 currency 화폐를 의미하는 영문 대문자 코드 String balance 주문가능 금액/수량 NumberString locked 주문 중 묶여있는 금액/수량 NumberString avg_buy_price 매수평균가 NumberString avg_buy_price_modified 매수평균가 수정 여부 Boolean unit_currency 평단가 기준 화폐 String

내 화폐

주문가능 금액/수량

매수 평균가

평단가 기준 화폐와 같은 정보를 볼 수 있다.

여기에서는 자동 매매를 하면서 balance를 확인할 때 많이 사용할 것 같다.

특정 잔액 이하가 되면 매도를 할 때까지 매수를 멈추도록 해야 되니까.

balance만 조회하려면 아래칸에 다음과 같이 코드를 작성해 주자.

json 파일은 복잡해 보일지 몰라도 결국 list, dictionary의 조합이다.

리스트를 indexing 하고 dictionary의 Key 를 불러와 주면 된다.

그리고 텍스트로 불러와지는 녀석을 숫자로 보기 위해 float() 처리를 해준다.

data = res.json() my_balance = float(data[0][‘balance’]) my_balance

실행하면 코드를 실행했을 때 내 잔액을 조회할 수 있게되고, my_balance 라는 변수로 활용할 수가 있게 되었다.

잔액을 까기 부끄러워서 안까게 된다. 괜히.

결론과 고찰

몇 분이 채 걸리지 않아 Python 과 Upbit Open API 를 연결 시켰고, 내 계좌를 데이터화 하여 조회하였다.

그리고, Upbit OpenAPI 가이드가 정말 잘 만들어져 있다는 것을 알게 되었다.

이 게임이 시작 된 이상 이 가이드는 나의 교과서다. 어떤 내용이 있는지 완벽히 파악하고, 이를 잘 엮어야 한다.

어쨌든 오늘로써

내 자동매매 봇에 눈이 달렸다.

눈만 달렸다.

단순히 눈으로 조회하는 것만으로는 아무것도 할 수 없다.

봇이 알아서 실시간으로 시세를 조회하고, 분봉, 일봉, 거래량 등 많은 정보를 종합적으로 판단하여 자동 매매를 할 수 있게 해야 한다.

즉 상황을 판단할 수 있는 뇌와,

적절한 타이밍에 매수, 매도 할 수 있는 결단력과

이를 실행할 수 있는 손과

나에게 보고 할 수 있는 텔레파시 수단 까지 마련해 주어야 한다.

이쯤에서 내 봇에게 이름을 지어주어야 겠다.

봇..

봇하면..아이언맨.. 아이언맨 하면..

자비스.

“자 빛 스”

자 : 스스로 자 (자동으로)

빛 : 빛나는

스 : Sweet 한 코인을 쌓는 로봇

자동으로 빛나는 스윗한 코인을 쌓아서 나를 부자로 만들어줘 나의 자빛스!!!!!!

자빛스야 잘 부탁해

여기까지 읽어 주셨다면 감사합니다.

다음편에서는 업비트 openAPI 를 이용해서 뭘 할 수 있을지 분석해 보는 시간을 가져야 겠습니다.

728×90

반응형

LIST

[2] 파이썬으로 업비트 거래하기

728×90

반응형

✏️ 업비트 API 신청하기

https://upbit.com/service_center/open_api_guide

들어가서 계좌만들고 OpenAPI 신청을 하면된다. 나는 그냥 다 체크했다. IP의 경우 네이버에 내IP라고 검색하면 찾을 수 있다. access_key랑 secret_key는 메모장에 복사해놓고 잘 저장해놓자.

✏️ pyupbit

그냥 넣은 그림

업비트 API를 랩핑한 파이썬 라이브러리가 있다. pyupbit를 이용할거다.

우선 내가 필요한 내용만 정리할 것이기 때문에 더 필요한 내용이 있다면 아래 깃허브주소를 참고하기바란다.

https://github.com/sharebook-kr/pyupbit

1. pyupbit 설치하기

$ pip install pyupbit

2. 가상화폐 지정 및 현재가 불러오기

3. 로그인하기

access_key = ‘API발급때 받은거’ secret_key = ‘API발급때 받은거’ upbit = pyupbit.Upbit(access_key, secret_key)

4. 잔고조회하기

balance = upbit.get_balances() print(balance)

[{‘currency’: ‘KRW’, ‘balance’: ‘0.27342679’, ‘locked’: ‘0.0’, ‘avg_buy_price’: ‘0’, ‘avg_buy_price_modified’: True, ‘unit_currency’: ‘KRW’}, {‘currency’: ‘ETH’, ‘balance’: ‘0.01466775’, ‘locked’: ‘0.0’, ‘avg_buy_price’: ‘3801000’, ‘avg_buy_price_modified’: False, ‘unit_currency’: ‘KRW’}]

ETH가 0.01466775ETH 있고, avg_buy_price (매수평균가)가 3801000KRW다.

5. 시장가주문하기(매도/매수)

* volume(수량기준), price(한화기준)

-시장가매도(sell_market_order)

ETH 0.01466775 매도

3581000.0 [{‘currency’: ‘KRW’, ‘balance’: ‘0.27342679’, ‘locked’: ‘0.0’, ‘avg_buy_price’: ‘0’, ‘avg_buy_price_modified’: True, ‘unit_currency’: ‘KRW’}, {‘currency’: ‘ETH’, ‘balance’: ‘0.01466775’, ‘locked’: ‘0.0’, ‘avg_buy_price’: ‘3801000’, ‘avg_buy_price_modified’: False, ‘unit_currency’: ‘KRW’}] [{‘currency’: ‘KRW’, ‘balance’: ‘52499.22357042’, ‘locked’: ‘0.0’, ‘avg_buy_price’: ‘0’, ‘avg_buy_price_modified’: True, ‘unit_currency’: ‘KRW’}]

-시장가 매수(buy_market_order)

* 시장가매수를 코인개수로 지정할수 없음. price주어야함.

DOGE 5만원치 시장가매수

161.0 [{‘currency’: ‘KRW’, ‘balance’: ‘2474.22357182’, ‘locked’: ‘0.0’, ‘avg_buy_price’: ‘0’, ‘avg_buy_price_modified’: True, ‘unit_currency’: ‘KRW’}, {‘currency’: ‘DOGE’, ‘balance’: ‘308.6419753’, ‘locked’: ‘0.0’, ‘avg_buy_price’: ‘162’, ‘avg_buy_price_modified’: False, ‘unit_currency’: ‘KRW’}]

난 이거면 충분해서 API는 여기까지만 정리한다. 더 필요한건 깃허브에서. 보니까 pyupbit는 업비트에서 정식으로 제공하는 라이브러리가 아니라 누가 따로 수고를 해준것같다. 매우 감사한다.

다음 글은 백테스팅 관련글이 될 것 같다. 코드를 완성해서 돌아오겠다.

728×90

반응형

sharebook-kr/pyupbit: python wrapper for upbit API

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?

[Toy Project] 자동매매 프로그램 만들기 – 3. pyupbit 모듈 알아보기

728×90

반응형

[목차]

1. pyupbit 모듈 설치

2. 암호화폐 목록 얻기

3. 암호화폐 현재가 얻기

4. 암호화폐 과거 데이터 조회

5. 암호화폐 호가 조회

6. 잔고 조회

7. 지정가 매수/매도

8. 시장가 매수/매도

9. 주문 취소/조회

1. pyupbit 모듈 설치

pyupbit 모듈이란, 업비트 API를 파이썬에서 쉽게 사용하기 위해서 저자들이 개발한 모듈 이다. 굳이 API를 호출할 필요 없이 pyupbit 모듈을 이용하면 간편한 경우가 많다.

pyupbit github 와 wikidocs.net/31063 를 참고하여 작성하였다.

먼저 pyupbit 모듈을 설치해보자. 프롬프트 창에 “pip install pyupbit”를 입력한다.

그 후, 다음 코드를 실행했을 때 오류가 발생하지 않는다면 pyupbit 모듈이 잘 설치가 된 것이다.

import pyupbit print(pyupbit.Upbit) #

이제, pyupbit를 이용할 수 있는 대표적인 기능 몇 가지를 알아보자.

2. 암호화폐 목록 얻기

암호화폐를 거래하기 위해서는 각 화폐별 ticker가 필요하다. get_tickers 함수를 이용하면 업비트에서 거래할 수 있는 모든 암호화폐의 문자열 ticker를 리스트로 반환해준다.

import pyupbit print(pyupbit.get_tickers()) # OUTPUT [‘KRW-BTC’, ‘KRW-ETH’, ‘BTC-ETH’, ‘BTC-LTC’, ‘BTC-XRP’, ‘BTC-ETC’, …]

업비트에서는 원화(KRW)뿐만 아니라 BTC, USDT 마켓 또한 지원하므로 주로 이용하는 원화 마켓의 암호화폐를 선별하고 싶다면 다음과 같이 옵션을 추가해주어야 한다.

import pyupbit print(pyupbit.get_tickers(fiat=”KRW”)) # OUTPUT [‘KRW-BTC’, ‘KRW-ETH’, ‘KRW-NEO’, ‘KRW-MTL’, ‘KRW-LTC’, ‘KRW-XRP’, …]

fiat은 우리가 사용하는 종이 화폐, 즉 명목화폐(fiat money)에서 나온 말이다. 동일한 방법으로 BTC / USDT 마켓의 암호화폐도 골라낼 수 있다.

3. 암호화폐 현재가 얻기

get_current_price 함수를 이용하면 암호화폐의 마지막 체결 가격, 즉 현재가를 얻을 수 있다. 인자에 암호화폐의 ticker를 넣어주면 float형으로 값을 반환한다.

import pyupbit print(pyupbit.get_current_price(“KRW-BTC”)) # OUTPUT 64069000.0

만약 여러 암호화폐를 한 번에 조회하고 싶다면, ticker들을 담은 리스트를 인자로 넘겨주면 된다. 이 경우에는 반환 값으로 {key : value}가 {ticker : 현재가}인 딕셔너리가 반환된다.

import pyupbit print(pyupbit.get_current_price([“KRW-BTC”, “KRW-ETH”, “KRW-XRP”])) # OUTPUT {‘KRW-BTC’: 63984000.0, ‘KRW-ETH’: 5020000.0, ‘KRW-XRP’: 1695.0}

몇 분 사이에 값이 비트코인 가격이 소폭 하락한 것을 볼 수 있다. get_current_price 함수는 한 번에 최대 100개의 암호화폐를 조회할 수 있다.

4. 암호화폐 과거 데이터 조회

get_ohlcv 함수를 이용하여 인자에 ticker를 넘겨주면 해당 암호화폐의 OHLCV 데이터를 pandas DataFrame으로 반환한다.

시가 / 고가 / 저가 / 종가 / 거래량 / 거래금액 을 구할 수 있다.

import pyupbit print(pyupbit.get_ohlcv(ticker=”KRW-BTC”)) # OUTPUT open high low close volume value 2020-10-26 09:00:00 14687000.0 14920000.0 14358000.0 14764000.0 3031.758804 4.461594e+10 2020-10-27 09:00:00 14772000.0 15457000.0 14683000.0 15312000.0 3964.125320 5.957730e+10 2020-10-28 09:00:00 15300000.0 15557000.0 14635000.0 15060000.0 5752.669847 8.715575e+10 2020-10-29 09:00:00 15060000.0 15379000.0 14703000.0 15207000.0 3516.504745 5.284074e+10 2020-10-30 09:00:00 15207000.0 15460000.0 14899000.0 15340000.0 3753.488127 5.689144e+10 … … … … … … … 2021-05-09 09:00:00 71728000.0 73129000.0 70147000.0 71505000.0 11786.596492 8.435439e+11 2021-05-10 09:00:00 71506000.0 72460000.0 70175000.0 70902000.0 13095.617952 9.373196e+11 2021-05-11 09:00:00 70902000.0 71750000.0 67597000.0 69741000.0 11913.277828 8.271176e+11 2021-05-12 09:00:00 69714000.0 70499000.0 60854000.0 62257000.0 22173.238069 1.505872e+12 2021-05-13 09:00:00 62380000.0 64987000.0 60200000.0 64445000.0 5088.587170 3.218080e+11

get_ohlcv 함수는 다음과 같이 구성되어 있다.

get_ohlcv(ticker=’KRW-BTC’, interval=’day’, count=200, to=None, period=0.1) -> Union[DataFrame, None]

매개변수로 ticker, interval, count, to, period가 존재하며, 반환 값으로 Union[DataFrame, None]을 갖는다.

interval 변수는 조회 단위를 나타낸다. 분(1/3/5/10/15/30/60/240), 일, 주, 월별 데이터를 구할 수 있으며 인자로 넘길 땐 다음과 같다.

[ day / minute1 / minute3 / minute5 / minute10 / minute15 / minute30 / minute60 / minute240 / week / month ]

만약 interval을 지정하지 않는다면 기본값은 day이다.

count 변수는 조회 개수를 나타낸다. 최근 영업일부터 이전 count만큼의 영업일까지의 데이터를 가져오게 된다.

count를 지정하지 않는다면 기본값은 200이다.

to 변수는 입력된 시점의 이전까지의 데이터를 얻을 수 있다. 예를 들어 to=”20210101″이라고 하면, 2020년 12월 31일부터 이전 count만큼의 영업일까지의 데이터를 가져온다. 즉, 2020-06-15~2020-12-31의 데이터를 가져온다.

to를 지정하지 않는다면 기본값은 None이며, 이는 현재 일부터 가져오게 된다.

period는 데이터를 수집하는 주기를 말한다. count가 200 이하라면 period 옵션은 무시되며, count가 200보다 큰 경우 다른 API와 함께 사용한다면 조회 주기를 늘려주어야 한다.

import pyupbit print(pyupbit.get_ohlcv(ticker=”KRW-BTC”, interval=”minute3″, count=20)) # OUTPUT open high low close volume value 2021-05-13 10:36:00 64072000.0 64109000.0 63900000.0 63918000.0 58.438715 3.741627e+09 2021-05-13 10:39:00 63900000.0 63919000.0 63802000.0 63875000.0 73.630070 4.702404e+09 2021-05-13 10:42:00 63865000.0 64147000.0 63862000.0 64019000.0 60.718397 3.887090e+09 2021-05-13 10:45:00 64014000.0 64182000.0 63987000.0 64152000.0 36.676134 2.349825e+09 2021-05-13 10:48:00 64158000.0 64448000.0 64110000.0 64445000.0 63.697727 4.096094e+09 2021-05-13 10:51:00 64425000.0 64498000.0 64366000.0 64497000.0 78.271995 5.046166e+09 2021-05-13 10:54:00 64480000.0 64800000.0 64468000.0 64786000.0 99.956416 6.458304e+09 2021-05-13 10:57:00 64727000.0 64994000.0 64726000.0 64993000.0 92.597657 6.010476e+09 2021-05-13 11:00:00 64993000.0 64998000.0 64468000.0 64480000.0 110.172045 7.133575e+09 2021-05-13 11:03:00 64490000.0 64950000.0 64468000.0 64577000.0 93.156690 6.030081e+09 2021-05-13 11:06:00 64562000.0 64670000.0 64500000.0 64613000.0 66.252347 4.278387e+09 2021-05-13 11:09:00 64635000.0 64786000.0 64547000.0 64770000.0 57.174851 3.695290e+09 2021-05-13 11:12:00 64750000.0 64993000.0 64700000.0 64993000.0 91.189028 5.915082e+09 2021-05-13 11:15:00 64992000.0 65300000.0 64975000.0 65300000.0 134.847742 8.781921e+09 2021-05-13 11:18:00 65286000.0 65472000.0 64952000.0 64955000.0 117.236921 7.657944e+09 2021-05-13 11:21:00 64952000.0 65182000.0 64952000.0 65060000.0 75.174626 4.891415e+09 2021-05-13 11:24:00 65060000.0 65126000.0 64600000.0 64693000.0 86.530141 5.611444e+09 2021-05-13 11:27:00 64669000.0 64959000.0 64634000.0 64833000.0 72.705191 4.712628e+09 2021-05-13 11:30:00 64833000.0 64863000.0 64499000.0 64650000.0 99.585294 6.435314e+09 2021-05-13 11:33:00 64650000.0 64700000.0 64449000.0 64460000.0 70.165524 4.532421e+09

일봉을 구하는 경우 기준 시간을 직접 지정할 수 있다.

get_daily_ohlcv_from_base 함수를 이용하여 base에 원하는 시간을 입력하면 해당 시간을 기준으로 하루가 계산된다.

import pyupbit print(pyupbit.get_daily_ohlcv_from_base(ticker=”KRW-BTC”, base=13)) # OUTPUT open high low close volume 2021-05-04 13:00:00 68030000.0 69751000.0 66500000.0 69124000.0 4311.390464 2021-05-05 13:00:00 69023000.0 70007000.0 67040000.0 68935000.0 11311.231399 2021-05-06 13:00:00 68910000.0 70853000.0 67186000.0 67498000.0 13871.493052 2021-05-07 13:00:00 67500000.0 69729000.0 66900000.0 69087000.0 10614.771277 2021-05-08 13:00:00 69100000.0 72500000.0 68993000.0 72402000.0 13002.555370 2021-05-09 13:00:00 72455000.0 73129000.0 70147000.0 72099000.0 11998.092245 2021-05-10 13:00:00 72098000.0 72460000.0 68500000.0 69552000.0 13981.115840 2021-05-11 13:00:00 69552000.0 70499000.0 67597000.0 70144000.0 10589.004324 2021-05-12 13:00:00 70170000.0 70499000.0 60200000.0 64451000.0 26736.304001

5. 암호화폐 호가 조회

암호화폐의 매수/매도 호가 조회는 get_orderbook 함수를 이용한다. 인자로 ticker를 넘겨주고 반환 값으로 딕셔너리로 구성된 리스트를 반환한다.

import pyupbit print(pyupbit.get_orderbook(tickers=”KRW-BTC”)) # OUTPUT [{‘market’: ‘KRW-BTC’, ‘timestamp’: 1620874024245, ‘total_ask_size’: 3.97732861, ‘total_bid_size’: 2.52959603, ‘orderbook_units’: [{‘ask_price’: 64090000.0, ‘bid_price’: 64070000.0, ‘ask_size’: 0.00031294, ‘bid_size’: 0.06156758}, {‘ask_price’: 64093000.0, ‘bid_price’: 64066000.0, ‘ask_size’: 0.26436965, ‘bid_size’: 0.08471909}, {‘ask_price’: 64094000.0, ‘bid_price’: 64065000.0, ‘ask_size’: 0.06992863, ‘bid_size’: 0.16189154}, {‘ask_price’: 64096000.0, ‘bid_price’: 64064000.0, ‘ask_size’: 1.98105323, ‘bid_size’: 0.00156093}, {‘ask_price’: 64098000.0, ‘bid_price’: 64061000.0, ‘ask_size’: 0.00204766, ‘bid_size’: 0.06961012}, {‘ask_price’: 64107000.0, ‘bid_price’: 64060000.0, ‘ask_size’: 0.06455513, ‘bid_size’: 1.16250019}, {‘ask_price’: 64125000.0, ‘bid_price’: 64029000.0, ‘ask_size’: 0.00549689, ‘bid_size’: 0.02838788}, {‘ask_price’: 64127000.0, ‘bid_price’: 64028000.0, ‘ask_size’: 0.01021383, ‘bid_size’: 0.03123633}, {‘ask_price’: 64128000.0, ‘bid_price’: 64019000.0, ‘ask_size’: 0.00069257, ‘bid_size’: 0.00447529}, {‘ask_price’: 64135000.0, ‘bid_price’: 64018000.0, ‘ask_size’: 0.19030409, ‘bid_size’: 0.00234309}, {‘ask_price’: 64138000.0, ‘bid_price’: 64005000.0, ‘ask_size’: 0.05639289, ‘bid_size’: 0.23895895}, {‘ask_price’: 64153000.0, ‘bid_price’: 64000000.0, ‘ask_size’: 0.1938, ‘bid_size’: 0.45682086}, {‘ask_price’: 64156000.0, ‘bid_price’: 63998000.0, ‘ask_size’: 1.00601281, ‘bid_size’: 0.06531679}, {‘ask_price’: 64173000.0, ‘bid_price’: 63997000.0, ‘ask_size’: 0.0187122, ‘bid_size’: 0.02281961}, {‘ask_price’: 64174000.0, ‘bid_price’: 63982000.0, ‘ask_size’: 0.11343609, ‘bid_size’: 0.13738778}] }]

market은 ticker이고 timestamp는 조회시간(단위 : ms)이며, total_ask_size, total_bid_size는 총 매도/매수 크기를 말한다.

orderbook_units는 딕셔너리가 원소인 리스트이며, ask_price는 매도 호가, bid_price는 매수 호가, ask_size는 매도 호가 수량, bid_size는 매수 호가 수량을 의미한다.

get_orderbook에 여러 ticker를 넘겨주면 마찬가지로 한 번에 여러 암호화폐에 대한 호가를 구할 수 있다.

6. 잔고 조회

잔고를 조회하기 위해서는 API 사용 신청을 한 후 발급받았던 access key와 secret key가 필요하다. 이를 이용하여 Upbit 클래스의 인스턴스를 생성한 후 , get_balances 함수를 이용하면 잔고를 조회할 수 있다.

import pyupbit access = “User access key” # access key 직접 입력 secret = “User secret key” # secret key 직접 입력 upbit = pyupbit.Upbit(access, secret) print(upbit.get_balances()) # OUTPUT [{‘currency’: ‘KRW’, ‘balance’: ‘0.68421149’, ‘locked’: ‘0.0’, ‘avg_buy_price’: ‘0’, ‘avg_buy_price_modified’: True, ‘unit_currency’: ‘KRW’}, {‘currency’: ‘XRP’, ‘balance’: ‘0.00000081’, ‘locked’: ‘0.0’, ‘avg_buy_price’: ‘1815’, ‘avg_buy_price_modified’: False, ‘unit_currency’: ‘KRW’}]

코인을 하고 남은 흔적들이 있다. 현재 원화가 0.68원 정도, 아주 소량의 리플이 존재하는 것을 볼 수 있다.

avg_buy_price는 평균 매수가를 의미하고, avg_buy_price_modified는 평균 매수가가 수정되었는지 여부를 나타낸다.

get_balances 함수는 실제로 get_balances(contain_req=False) -> Union[tuple, None] 로 이루어져 있다.

만약 get_balances에 True를 넣어주면, API를 호출할 수 있는 빈도의 제한을 추가로 반환한다.

import pyupbit access = “User access key” # access key 직접 입력 secret = “User secret key” # secret key 직접 입력 upbit = pyupbit.Upbit(access, secret) print(upbit.get_balances()) # OUTPUT ([{‘currency’: ‘KRW’, ‘balance’: ‘0.68421149’, ‘locked’: ‘0.0’, ‘avg_buy_price’: ‘0’, ‘avg_buy_price_modified’: True, ‘unit_currency’: ‘KRW’}, {‘currency’: ‘XRP’, ‘balance’: ‘0.00000081’, ‘locked’: ‘0.0’, ‘avg_buy_price’: ‘1815’, ‘avg_buy_price_modified’: False, ‘unit_currency’: ‘KRW’}], {‘group’: ‘default’, ‘min’: 899, ‘sec’: 29})

API는 각 API마다 어떤 그룹에 속해있고, 그룹 단위로 호출을 제한한다. 위의 결과는 1분에 최대 899번, 1초당 최대 29번 API를 호출할 수 있다는 의미이다.

만약 특정 암호화폐나 원화에 대해서 잔고가 궁금하다면 get_balance 함수를 이용하면 된다.

인자로는 ticker를 넘겨준다.

import pyupbit access = “User access key” # access key 직접 입력 secret = “User secret key” # secret key 직접 입력 upbit = pyupbit.Upbit(access, secret) print(upbit.get_balance(“KRW”)) # OUTPUT 0.68421149 print(upbit.get_balance(“KRW-XRP”)) # OUTPUT 8.1e-07

7. 지정가 매수/매도

지정한 가격에 매도할 때에는 sell_limit_order 함수를, 매수할 때에는 buy_limit_order 함수를 이용한다.

마찬가지로 Upbit 클래스의 인스턴스를 통해서 호출할 수 있다.

import pyupbit access = “User access key” # access key 직접 입력 secret = “User secret key” # secret key 직접 입력 upbit = pyupbit.Upbit(access, secret) ret = upbit.buy_limit_order(“KRW-BTC”, 1000000, 1) print(ret) # OUTPUT {‘error’: {‘message’: ‘주문가능한 금액(KRW)이 부족합니다.’, ‘name’: ‘insufficient_funds_bid’}}

buy_limit_order 함수와 sell_limit_order 함수는 다음과 같이 구성되어 있다.

buy_limit_order(ticker, price, volume, contain_req=False) -> Union[tuple, None]

sell_limit_order(ticker, price, volume, contain_req=False) -> Union[tuple, None]

price에 원하는 매수/매도가를 넣고, volume에 매수/매도할 코인 개수를 넣는다.

위의 예시에서는 비트코인을 100만 원에 1개 매수하려는 코드이다. 잔고가 부족하여 에러가 발생했지만, 만약 잔고가 충분하다면 주문 정보는 딕셔너리로 반환된다. (아래 참고)

예시) https://github.com/sharebook-kr/pyupbit

주문을 통해 반환된 딕셔너리에서 uuid는 주문에 대한 고윳값으로, 이 값을 이용해서 주문을 다시 취소하거나 정정할 수 있다.

8. 시장가 매수/매도

시장가 매수/매도는 최우선 매도/매수 호가에 즉시 주문한다.

시장가 매수는 buy_market_order 함수, 매도는 sell_market_order 함수를 이용한다.

buy_market_order(ticker, price, contain_req=False) -> Union[tuple, None]

sell_market_order(ticker, volume, contain_req=False) -> Union[tuple, None]

시장가 매수를 하는 경우에는 매수할 금액을 넣어준다. 수수료가 제외된 금액이므로 만약 10000원 치를 구매한다면 실제로는 수수료 0.05%가 추가된 10005원의 현금을 가지고 있어야 한다.

시장가 매도를 하는 경우에는 매도할 개수를 넣어준다.

9. 주문 취소/조회

주문 취소를 하기 위해선 이전에 주문했던 정보의 uuid를 이용한다. 이 uuid 값을 cancel_order라는 함수의 인자로 넘겨주면 주문이 취소된다.

cancel_order(uuid, contain_req=False) -> Union[tuple, None]

주문을 조회하기 위해서는 get_order함수를 이용한다. 형태는 아래와 같다.

get_order(ticker_or_uuid, state=’wait’, kind=’normal’, contain_req=False) -> Union[tuple, None]

import pyupbit access = “User access key” # access key 직접 입력 secret = “User secret key” # secret key 직접 입력 upbit = pyupbit.Upbit(access, secret) ret = upbit.get_order(“KRW-BTC”) # 미체결 주문 ret = upbit.get_order(“KRW-BTC”, state=”done”) # 완료된 주문 ret = upbit.get_order(“UUID”) # 특정 주문 상세 조회

기본적으로 get_order 함수에 ticker만 넣으면 해당 암호화폐에 대한 미체결 주문 정보가 반환된다.

만약 state를 done으로 할당해주면 완료된 주문 정보가 반환된다.

get_order 함수에 주문의 고유 정보인 uuid를 넣으면 해당 주문에 대한 상세 정보가 반환된다. 이 경우 다른 매개변수들은 무시된다.

728×90

반응형

업 비트 파이썬 | 누구나 할 수 있는 비트코인 투자 자동화 강의 시작합니다 인기 답변 업데이트

당신은 주제를 찾고 있습니까 “업 비트 파이썬 – 누구나 할 수 있는 비트코인 투자 자동화 강의 시작합니다“? 다음 카테고리의 웹사이트 ppa.maxfit.vn 에서 귀하의 모든 질문에 답변해 드립니다: https://ppa.maxfit.vn/blog. 바로 아래에서 답을 찾을 수 있습니다. 작성자 조코딩 JoCoding 이(가) 작성한 기사에는 조회수 226,867회 및 좋아요 3,127개 개의 좋아요가 있습니다.

여기에서 이 주제에 대한 비디오를 시청하십시오. 주의 깊게 살펴보고 읽고 있는 내용에 대한 피드백을 제공하세요!

누구나 할 수 있는 파이썬 비트코인 투자 자동화 강의를 시작합니다. 파이썬을 활용하여 비트코인 투자 전략을 구현해보고 업비트 거래소 API를 통해 가격이 기술적 전략에 부합할 때 자동으로 매매하는 프로그램을 개발합니다.

백테스팅을 통해서 비트코인을 그냥 들고 있을 때와 변동성 돌파전략으로 자동매매를 했을때의 수익률의 차이도 계산해봅니다.

MDD가 낮은 것도 있지만 직접 해보니 확실히 떨어지기만 하는 날에는 매수를 진행하지 않으니 마음이 편한 느낌이 들더라고요!

조코딩 멤버십 가입(멤버십 전용 강의 월별 무제한 수강)

▶https://www.youtube.com/channel/UCQNE2JmbasNYbjGAcuBiRRg/join

조코딩 채널 강사 지원

▶https://forms.gle/LsbgU8xFL9gtzRSt6

디스코드 조코딩의 코딩 커뮤니티

▶https://discord.gg/zny87VeSaX

본 강의는 파이썬의 기본은 어느 정도 아신다는 전제하에 진행할 예정이니 파이썬을 전혀 모르시는 분들은 아래 재생목록으로 먼저 간단히 학습해보시는 것을 추천드립니다. 물론 파이썬을 잘 몰라도 따라오실 수 있도록 쉽게 풀어볼게요! 기본적인 투자 용어를 잘 모르시는 분들은 이전에 진행하였던 파이썬 주식 투자 자동화 강의도 참고하시면 도움이 되실 겁니다.

파이썬 기초 강의 재생목록 :

파이썬 비트코인 투자 자동화 강의 재생목록 :

파이썬 주식 투자 자동화 강의 재생목록 :

앞으로 강의 많은 기대 부탁드립니다! 🙂

#파이썬 #비트코인 #자동매매

목차

00:00 미리보기

00:05 인트로

00:24 강의 목차 소개

00:37 주식 투자와 비트코인 투자의 차이

00:53 시장 운영 시간의 차이

01:06 증권사와 거래소 차이

01:12 API의 차이

01:40 비트코인 투자 자동화 소개

01:45 거래소 수수료 비교

02:04 비트코인 투자 자동화의 큰 그림

02:32 투자 전략 소개 – 변동성 돌파 전략

03:28 백테스팅 결과 \u0026 실제 투자 후기 공유

03:37 최근 200일간 비트코인 가격 변화

04:05 가만히 들고있지 못하는 이유 – MDD

04:52 변동성 돌파 전략을 사용하면 하락을 피하는 이유

05:42 200일 백테스팅 결과

06:08 불안한 존버 vs 안정적인 자동투자

07:07 실제 투자 후기

08:00 경고

08:18 참고 문헌

08:33 구독, 좋아요, 알림설정 부탁드립니다

08:44 아웃트로

참고 문헌

위키 북스 : https://wikidocs.net/book/1665

파이썬을 이용한 비트코인 자동매매 : https://coupa.ng/bU54lR

(위 링크는 쿠팡 파트너스 링크로 일정액의 수수료를 지급 받을 수 있습니다.)

업비트 API를 파이썬에서 쉽게 사용하기 위해 저자들이 개발한 pyupbit모듈을 설치해 보겠습니다. 아나콘다 명령 프롬프트를 실행한 후 (시작 → Anaconda3 …

+ 더 읽기

Source: wikidocs.net

Date Published: 12/7/2021

View: 4458

바로 시작하겠습니다. 오늘은,. 파이썬으로 업비트 Open API에 연결하여 잔고 조회하는 방법. 을 알아보고 실행해보겠습니다. 저만 따라 오시면 함께 …

+ 여기에 더 보기

Source: lapina.tistory.com

Date Published: 6/29/2021

View: 9244

get_tickers 함수는 업비트가 지원하는 모든 암호화폐 목록을 얻어옵니다. print(pyupbit.get_tickers()).

+ 여기에 더 보기

Source: github.com

Date Published: 10/18/2021

View: 7858

이전에 한 종목에 대하여 파이썬과 업비트 API를 이용한 자동매매 예제를 github에 올렸습니다. 신기하게도 제 프로그램을 관심있게 보던 분이 계셨습니다.

+ 여기를 클릭

Source: www.steemcoinpan.com

Date Published: 5/10/2022

View: 4654

pyupbit 모듈이란, 업비트 API를 파이썬에서 쉽게 사용하기 위해서 저자들이 개발한 모듈이다. 굳이 API를 호출할 필요 없이 pyupbit 모듈을 이용하면 …

+ 여기에 자세히 보기

Source: rebro.kr

Date Published: 10/3/2021

View: 6531

업비트 API 신청하기 https://upbit.com/service_center/open_api_gue 들어가서 계좌만들고 OpenAPI 신청을 하면된다. 나는 그냥 다 체크했다.

+ 여기에 표시

Source: duckracoon.tistory.com

Date Published: 6/13/2021

View: 1916

가상화폐 자동 매매 프로그램| 업비트 자동매매 프로그램 구성 참고용으로 작성하는 글입니다. 여기서 작성 하는 매매 알고리즘은 그대로 하지 따라 …

+ 여기에 보기

Source: inspireman.tistory.com

Date Published: 12/17/2022

View: 9323

가장 빠르게 익히는 방법은 직접 해보는 것입니다. 이번 포스팅을 통해 REST API를 활용하여 비트코인 가격을 추출하는 파이썬 프로그래밍 방법을 …

+ 자세한 내용은 여기를 클릭하십시오

Source: coffee4m.com

Date Published: 10/8/2022

View: 7909

업비트에서 코인 정보를 받아오는 방법은 크게 두 가지 방법으로 나눌 수 있는데요. 먼저 API를 개별 호출하여 정보를 얻어오는 방법과 웹소켓을 이용 …

+ 여기에 더 보기

Source: technfin.tistory.com

Date Published: 2/15/2022

View: 3425

안녕하세요. 오늘은 비트코인 투자 자동화에 필요한 거래소 API를 발급 받고 자산을 조회하는 방법에 대해 알아보겠습니다. 업비트 가입 및 Open API …

+ 여기에 보기

Source: codingspooning.tistory.com

Date Published: 12/22/2022

View: 5650

주제와 관련된 더 많은 사진을 참조하십시오 누구나 할 수 있는 비트코인 투자 자동화 강의 시작합니다. 댓글에서 더 많은 관련 이미지를 보거나 필요한 경우 더 많은 관련 기사를 볼 수 있습니다.

import time import pyupbit from collections import deque #주문은 초당 8회, 분당 200회 / 주문 외 요청은 초당 30회, 분당 900회 사용 가능합니다. # 업비트 access key, secret key 변수 upbit_access = “your_access_key” upbit_secret = “your_secret_key” # 코인 리스트 tickers = [] # 코인 종가 담을 deque 변수 ma20 = deque(maxlen=20) ma60 = deque(maxlen=60) ma120 = deque(maxlen=120) # 원화로 매매 가능한 코인 리스트 만들기 tickers = pyupbit.get_tickers(fiat=”KRW”) # login upbit = pyupbit.Upbit(upbit_access, upbit_secret) # 잔고 조회 krw def get_balance_krw(): balance = upbit.get_balance(“KRW”) return balance # 잔고 조회 coin def get_balance_wallet(ticker): balances = upbit.get_balances() for b in balances: if b[‘currency’] == ticker[4:]: balance = b[‘balance’] avg_buy_price = b[‘avg_buy_price’] return float(avg_buy_price), float(balance) else: return int(0), int(0) # 코인 심볼 하나씩 받아와서 이동평균선 구한 후 매수 조건 탐색 def get_ticker_ma(ticker): ”’get_ohlcv 함수는 고가/시가/저가/종가/거래량을 DataFrame으로 반환합니다”’ df = pyupbit.get_ohlcv(ticker, interval=’day’) # 일봉 데이터 프레임 생성 ma20.extend(df[‘close’]) # ma20 변수에 종가 넣기 ma60.extend(df[‘close’]) # ma60 변수에 종가 넣기 ma120.extend(df[‘close’]) # ma120 변수에 종가 넣기 curr_ma20 = sum(ma20) / len(ma20) # ma20값 더해서 나누기 = 20일선 이동평균 curr_ma60 = sum(ma60) / len(ma60) # ma60값 더해서 나누기 = 60일선 이동평균 curr_ma120 = sum(ma120) / len(ma120) # ma20값 더해서 나누기 = 120일선 이동평균 now_price = pyupbit.get_current_price(ticker) # 코인의 현재가 open_price = df[‘open’][-1] # 당일 시가 구하기 buy_target_price = open_price + (open_price * 0.02) # 목표가 = 당일 시가 보다 2프로 이상 상승 금액 coin_check = get_balance_wallet(ticker) # 코인 보유 하고 있는지 체크 avg_price = coin_check[0] # 매수 평균가 balance = coin_check[1] # 코인 보유 개수 # 매수 평균가가 int 이면 매수 조건 체크 float이면 매도 조건 체크 if avg_price == int: # 이동 평균선 정배열 / 목표가보다 현재가 보다 높을 경우 매수 if curr_ma20 <= curr_ma60 and curr_ma60 <= curr_ma120 and buy_target_price <= now_price: # 50만원치 매수 volume = round(500000 / now_price * 0.995, 4) buy_order(ticker, volume) else: print('시세 감시 중') pass else: # 현재 보유 코인 수익률 계산 buy_profit = ((now_price - avg_price) / avg_price) * 100 profit = round(buy_profit, 2) # 평균 매수가 보다 2% 상승 시 매도 if profit >= 2.0: print(f”{ticker} : 목표가 도달 후 전량 매도”) sell_order(ticker, balance) time.sleep(3) else: print(f”코인명: {ticker}, 수익률: {profit}%” ) # 매수 주문 def buy_order(ticker, volume): try: while True: buy_result = upbit.buy_market_order(ticker, volume) if buy_result == None or ‘error’ in buy_result: print(“매수 재 주문”) time.sleep(1) else: return buy_result except: print(“매수 주문 이상”) # 매도 주문 def sell_order(ticker, volume): try: while True: sell_result = upbit.sell_market_order(ticker, volume) if sell_result == None or ‘error’ in sell_result: print(f”{sell_result}, 매도 재 주문”) time.sleep(1) else: return sell_result except: print(“매도 주문 이상”) # 코인 리스트에서 이동 평균선 함수로 하나씩 꺼내서 보내기 while True: try: for tk in tickers: get_ticker_ma(tk) time.sleep(2) except: print(‘오류 발생 무시’) pass

반응형

업비트에서 코인 정보를 받아오는 방법은 크게 두 가지 방법으로 나눌 수 있는데요. 먼저 API를 개별 호출하여 정보를 얻어오는 방법과 웹소켓을 이용해서 실시간 데이터를 구독하는 방법이 있습니다.

지금까지 Tech&Fin에서 다루었던 프로그램들에서는 첫 번째 방법인 API를 개별 호출하는 방법을 사용했었는데요.

이번 시간에는 API 개별 호출 방법과 웹소켓을 이용하는 방법은 어떤점이 다른지 살펴보고 웹소켓을 이용해서 실시간 체결 데이터를 받아오는 방법에 대해서 알아 보도록 하겠습니다.

목차 – 클릭하면 이동합니다.

웹소켓에 대한 생각

API 개별 호출과 웹소켓 구독의 차이점

기술적인 이야기는 뒤로하고 API를 개별 호출하는 것과 웹소켓을 구독하는 것은 목적의 차이가 크다고 생각합니다.

API를 개별 호출하는 방법에서는 액세스키가 필요하지 않은 공개 데이터는 물론 계좌 잔고 정보 등의 액세스키가 필요한 개인 데이터까지 모두 다룰 수 있습니다. 같은 맥락으로 주문/주문취소 등의 행위도 가능하게 됩니다.

하지만 웹소켓을 이용한 방법은 ①현재가 ②체결 ③호가, 이렇게 총 3가지 공개 데이터만 구독하여 받아볼 수 있습니다.

API를 개별 호출하는 방법은 원하는 순간에 프로그램에서 API를 호출해야 하지만 웹소켓 방식은 구독 데이터를 설정하여 한번 접속하면 실시간으로 데이터가 계속해서 수신 됩니다.

그래서 목적에 따라 API와 웹소켓을 혼용해서 사용하는 것이 좋습니다. 자세한 업비트 웹소켓 관련 내용은 아래 링크를 통해 업비트 공식 API 홈페이지에서 확인하실 수 있습니다.

https://docs.upbit.com/docs/upbit-quotation-websocket

웹소켓을 활용할 수 있는 방법

웹소켓을 실행하여 체결 데이터를 받아보면 데이터가 물 밀듯이 들어오게 되는 것을 보게 됩니다. 데이터가 너무 많아서 이 데이터로 무얼 해야 할 지 고민이 될 텐데요.

웹소켓을 이용해 받는 데이터는 양이 방대하기 때문에 실시간으로 메모리를 이용해 적재하고 사용하는 방법은 조금 어려울 수 있습니다. 저의 경우에는 아래와 같은 경우에 웹소켓을 활용하고 있습니다.

① 데이터 적재 후 사용 : 데이터 베이스에 웹소켓에서 받는 데이터를 모두 저장하면 쿼리를 이용해 여러가지 업비트 API에서 제공하지 않는 로직을 구현할 수 있습니다. 예를 들어 API에서는 종목별로만 조회할 수 있는 부분도 쿼리를 이용하면 전체 종목을 한번에 조회하여 시간을 줄일 수 있습니다.

또한 데이터를 적재하면 분석이 가능하게 됩니다. 예를 들면 세력이 어떻게 오전 9시에 호가창을 조절하면서 단기 펌핑을 주는지 등의 분석을 할 수 있고 매집봉으로 보이는 장대 양봉의 실제 매수/매도 비율이 얼마나 되는지 등의 데이터도 분석할 수 있게 됩니다.

② 단기 펌핑 종목을 빠르게 추출 : 전일 대비 상승률을 이용해 단기 펌핑하는 종목을 빠르게 잡아낼 수 있습니다. 예를들어 오전 9시에 10프로 이상 오른 종목을 아주 간편하게 골라낼 수 있게 됩니다.

업비트 웹소켓 사용방법

웹소켓 프로그램(websocket.py)

import os import sys import time import json import datetime import asyncio import logging import traceback import websockets # 실행 환경에 따른 공통 모듈 Import sys.path.append(os.path.dirname(os.path.dirname(__file__))) from module import upbit # 프로그램 정보 pgm_name = ‘websocket’ pgm_name_kr = ‘업비트 Ticker 웹소켓’ # —————————————————————————– # – Name : get_subscribe_items # – Desc : 구독 대상 종목 조회 # —————————————————————————– def get_subscribe_items(): try: subscribe_items = [] # KRW 마켓 전 종목 추출 items = upbit.get_items(‘KRW’, ”) # 종목코드 배열로 변환 for item in items: subscribe_items.append(item[‘market’]) return subscribe_items # ————————————— # Exception 처리 # —————————————- except Exception: raise # —————————————————————————– # – Name : upbit_ws_client # – Desc : 업비트 웹소켓 # —————————————————————————– async def upbit_ws_client(): try: # 중복 실행 방지용 seconds = 0 # 구독 데이터 조회 subscribe_items = get_subscribe_items() logging.info(‘구독 종목 개수 : ‘ + str(len(subscribe_items))) logging.info(‘구독 종목 : ‘ + str(subscribe_items)) # 구독 데이터 조립 subscribe_fmt = [ {“ticket”: “test-websocket”}, { “type”: “ticker”, “codes”: subscribe_items, “isOnlyRealtime”: True }, {“format”: “SIMPLE”} ] subscribe_data = json.dumps(subscribe_fmt) async with websockets.connect(upbit.ws_url) as websocket: await websocket.send(subscribe_data) while True: period = datetime.datetime.now() data = await websocket.recv() data = json.loads(data) logging.info(data) # 5초마다 종목 정보 재 조회 후 추가된 종목이 있으면 웹소켓 다시 시작 if (period.second % 5) == 0 and seconds != period.second: # 중복 실행 방지 seconds = period.second # 종목 재조회 re_subscribe_items = get_subscribe_items() logging.info(‘

‘) logging.info(‘*************************************************’) logging.info(‘기존 종목[‘ + str(len(subscribe_items)) + ‘] : ‘ + str(subscribe_items)) logging.info(‘종목 재조회[‘ + str(len(re_subscribe_items)) + ‘] : ‘ + str(re_subscribe_items)) logging.info(‘*************************************************’) logging.info(‘

‘) # 현재 종목과 다르면 웹소켓 다시 시작 if subscribe_items != re_subscribe_items: logging.info(‘종목 달리짐! 웹소켓 다시 시작’) await websocket.close() time.sleep(1) await upbit_ws_client() # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception as e: logging.error(‘Exception Raised!’) logging.error(e) logging.error(‘Connect Again!’) # 웹소켓 다시 시작 await upbit_ws_client() # —————————————————————————– # – Name : main # – Desc : 메인 # —————————————————————————– async def main(): try: # 웹소켓 시작 await upbit_ws_client() except Exception as e: logging.error(‘Exception Raised!’) logging.error(e) # —————————————————————————– # – Name : main # – Desc : 메인 # —————————————————————————– if __name__ == “__main__”: # noinspection PyBroadException try: print(“***** USAGE ******”) print(“[1] 로그레벨(D:DEBUG, E:ERROR, 그외:INFO)”) if sys.platform.startswith(‘win32’): # 로그레벨(D:DEBUG, E:ERROR, 그외:INFO) log_level = ‘I’ upbit.set_loglevel(log_level) else: # 로그레벨(D:DEBUG, E:ERROR, 그외:INFO) log_level = sys.argv[1].upper() upbit.set_loglevel(log_level) if log_level == ”: logging.error(“입력값 오류!”) sys.exit(-1) logging.info(“***** INPUT ******”) logging.info(“[1] 로그레벨(D:DEBUG, E:ERROR, 그외:INFO):” + str(log_level)) # ——————————————————————— # Logic Start! # ——————————————————————— # 웹소켓 시작 asyncio.run(main()) except KeyboardInterrupt: logging.error(“KeyboardInterrupt Exception 발생!”) logging.error(traceback.format_exc()) sys.exit(-100) except Exception: logging.error(“Exception 발생!”) logging.error(traceback.format_exc()) sys.exit(-200)

import asyncio import websockets

대표적으로 asyncio 비동기 호출 모듈과 websockets 모듈이 필요합니다. 모듈 설치 방법은 아래 포스팅을 참고하시면 됩니다.

2022.01.13 – [코딩스토리/리눅스] – 리눅스 서버에 파이썬 3.9 설치하기

# —————————————————————————– # – Name : get_subscribe_items # – Desc : 구독 대상 종목 조회 # —————————————————————————– def get_subscribe_items(): try: subscribe_items = [] # KRW 마켓 전 종목 추출 items = upbit.get_items(‘KRW’, ”) # 종목코드 배열로 변환 for item in items: subscribe_items.append(item[‘market’]) return subscribe_items # ————————————— # Exception 처리 # —————————————- except Exception: raise

구독 대상 즉, 어떤 종목들에 대해서 데이터를 받을지를 결정합니다. 위의 예제에서는 업비트에서 거래되는 KRW 마켓 종목 모두를 가져옵니다. 원하는 종목만 설정하고 싶으면 이 부분을 수정하면 됩니다.

# —————————————————————————– # – Name : upbit_ws_client # – Desc : 업비트 웹소켓 # —————————————————————————– async def upbit_ws_client(): try: # 중복 실행 방지용 seconds = 0 # 구독 데이터 조회 subscribe_items = get_subscribe_items() logging.info(‘구독 종목 개수 : ‘ + str(len(subscribe_items))) logging.info(‘구독 종목 : ‘ + str(subscribe_items)) # 구독 데이터 조립 subscribe_fmt = [ {“ticket”: “test-websocket”}, { “type”: “ticker”, “codes”: subscribe_items, “isOnlyRealtime”: True }, {“format”: “SIMPLE”} ] subscribe_data = json.dumps(subscribe_fmt) async with websockets.connect(upbit.ws_url) as websocket: await websocket.send(subscribe_data) while True: period = datetime.datetime.now() data = await websocket.recv() data = json.loads(data) logging.info(data) # 5초마다 종목 정보 재 조회 후 추가된 종목이 있으면 웹소켓 다시 시작 if (period.second % 5) == 0 and seconds != period.second: # 중복 실행 방지 seconds = period.second # 종목 재조회 re_subscribe_items = get_subscribe_items() logging.info(‘

‘) logging.info(‘*************************************************’) logging.info(‘기존 종목[‘ + str(len(subscribe_items)) + ‘] : ‘ + str(subscribe_items)) logging.info(‘종목 재조회[‘ + str(len(re_subscribe_items)) + ‘] : ‘ + str(re_subscribe_items)) logging.info(‘*************************************************’) logging.info(‘

‘) # 현재 종목과 다르면 웹소켓 다시 시작 if subscribe_items != re_subscribe_items: logging.info(‘종목 달리짐! 웹소켓 다시 시작’) await websocket.close() time.sleep(1) await upbit_ws_client() # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception as e: logging.error(‘Exception Raised!’) logging.error(e) logging.error(‘Connect Again!’) # 웹소켓 다시 시작 await upbit_ws_client()

구독 데이터를 조회하여 웹소켓에 접속하고 데이터를 수신합니다. 한번 웹소켓에 구독을 시작하면 계속해서 실시간으로 데이터를 받게 되는데 그렇게 되면 신규로 상장하는 종목등이 생기면 해당 데이터는 받을 수 없습니다.

위의 로직에서는 5초마다 업비트 종목을 다시 조회하여 현재 구독하고 있는 종목과 변동이 있으면 웹소켓을 다시 시작하여 신규 종목이 추가된 새로운 종목들로 구독하도록 구현하였습니다. (테스트가 많이 안 되어서 오류가 발생할 수도 있습니다. 댓글로 알려주세요.)

공통코드(upbit.py)

반응형

import time import logging import requests import jwt import uuid import hashlib import math import os import pandas as pd import numpy from urllib.parse import urlencode from decimal import Decimal from datetime import datetime # Keys access_key = ‘업비트에서 발급받은 Access Key’ secret_key = ‘업비트에서 발급받은 Secret Key’ server_url = ‘https://api.upbit.com’ ws_url = ‘wss://api.upbit.com/websocket/v1’ line_target_url = ‘https://notify-api.line.me/api/notify’ line_token = ‘라인 메신저에서 발급받은 Token’ # 상수 설정 min_order_amt = 5000 # —————————————————————————– # – Name : set_loglevel # – Desc : 로그레벨 설정 # – Input # 1) level : 로그레벨 # 1. D(d) : DEBUG # 2. E(e) : ERROR # 3. 그외(기본) : INFO # – Output # —————————————————————————– def set_loglevel(level): try: # ——————————————————————— # 로그레벨 : DEBUG # ——————————————————————— if level.upper() == “D”: logging.basicConfig( format='[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d]:%(message)s’, datefmt=’%Y/%m/%d %I:%M:%S %p’, level=logging.DEBUG ) # ——————————————————————— # 로그레벨 : ERROR # ——————————————————————— elif level.upper() == “E”: logging.basicConfig( format='[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d]:%(message)s’, datefmt=’%Y/%m/%d %I:%M:%S %p’, level=logging.ERROR ) # ——————————————————————— # 로그레벨 : INFO # ——————————————————————— else: # —————————————————————————– # 로깅 설정 # 로그레벨(DEBUG, INFO, WARNING, ERROR, CRITICAL) # —————————————————————————– logging.basicConfig( format='[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d]:%(message)s’, datefmt=’%Y/%m/%d %I:%M:%S %p’, level=logging.INFO ) # —————————————- # Exception Raise # —————————————- except Exception: raise # —————————————————————————– # – Name : send_request # – Desc : 리퀘스트 처리 # – Input # 1) reqType : 요청 타입 # 2) reqUrl : 요청 URL # 3) reqParam : 요청 파라메타 # 4) reqHeader : 요청 헤더 # – Output # 4) reponse : 응답 데이터 # —————————————————————————– def send_request(reqType, reqUrl, reqParam, reqHeader): try: # 요청 가능회수 확보를 위해 기다리는 시간(초) err_sleep_time = 0.3 # 요청에 대한 응답을 받을 때까지 반복 수행 while True: # 요청 처리 response = requests.request(reqType, reqUrl, params=reqParam, headers=reqHeader) # 요청 가능회수 추출 if ‘Remaining-Req’ in response.headers: hearder_info = response.headers[‘Remaining-Req’] start_idx = hearder_info.find(“sec=”) end_idx = len(hearder_info) remain_sec = hearder_info[int(start_idx):int(end_idx)].replace(‘sec=’, ”) else: logging.error(“헤더 정보 이상”) logging.error(response.headers) break # 요청 가능회수가 3개 미만이면 요청 가능회수 확보를 위해 일정시간 대기 if int(remain_sec) < 3: logging.debug("요청 가능회수 한도 도달! 남은횟수:" + str(remain_sec)) time.sleep(err_sleep_time) # 정상 응답 if response.status_code == 200 or response.status_code == 201: break # 요청 가능회수 초과인 경우 elif response.status_code == 429: logging.error("요청 가능회수 초과!:" + str(response.status_code)) time.sleep(err_sleep_time) # 그 외 오류 else: logging.error("기타 에러:" + str(response.status_code)) logging.error(response.status_code) logging.error(response) break # 요청 가능회수 초과 에러 발생시에는 다시 요청 logging.info("[restRequest] 요청 재처리중...") return response # ---------------------------------------- # Exception Raise # ---------------------------------------- except Exception: raise # ----------------------------------------------------------------------------- # - Name : get_items # - Desc : 전체 종목 리스트 조회 # - Input # 1) market : 대상 마켓(콤마 구분자:KRW,BTC,USDT) # 2) except_item : 제외 종목(콤마 구분자:BTC,ETH) # - Output # 1) 전체 리스트 : 리스트 # ----------------------------------------------------------------------------- def get_items(market, except_item): try: # 조회결과 리턴용 rtn_list = [] # 마켓 데이터 markets = market.split(',') # 제외 데이터 except_items = except_item.split(',') url = "https://api.upbit.com/v1/market/all" querystring = {"isDetails": "false"} response = send_request("GET", url, querystring, "") data = response.json() # 조회 마켓만 추출 for data_for in data: for market_for in markets: if data_for['market'].split('-')[0] == market_for: rtn_list.append(data_for) # 제외 종목 제거 for rtnlist_for in rtn_list[:]: for exceptItemFor in except_items: for marketFor in markets: if rtnlist_for['market'] == marketFor + '-' + exceptItemFor: rtn_list.remove(rtnlist_for) return rtn_list # ---------------------------------------- # Exception Raise # ---------------------------------------- except Exception: raise # ----------------------------------------------------------------------------- # - Name : buycoin_mp # - Desc : 시장가 매수 # - Input # 1) target_item : 대상종목 # 2) buy_amount : 매수금액 # - Output # 1) rtn_data : 매수결과 # ----------------------------------------------------------------------------- def buycoin_mp(target_item, buy_amount): try: query = { 'market': target_item, 'side': 'bid', 'price': buy_amount, 'ord_type': 'price', } query_string = urlencode(query).encode() m = hashlib.sha512() m.update(query_string) query_hash = m.hexdigest() payload = { 'access_key': access_key, 'nonce': str(uuid.uuid4()), 'query_hash': query_hash, 'query_hash_alg': 'SHA512', } jwt_token = jwt.encode(payload, secret_key).decode('utf8') authorize_token = 'Bearer {}'.format(jwt_token) headers = {"Authorization": authorize_token} res = send_request("POST", server_url + "/v1/orders", query, headers) rtn_data = res.json() logging.info("") logging.info("----------------------------------------------") logging.info("시장가 매수 요청 완료! 결과:") logging.info(rtn_data) logging.info("----------------------------------------------") return rtn_data # ---------------------------------------- # Exception Raise # ---------------------------------------- except Exception: raise # ----------------------------------------------------------------------------- # - Name : buycoin_tg # - Desc : 지정가 매수 # - Input # 1) target_item : 대상종목 # 2) buy_amount : 매수금액 # 3) buy_price : 매수가격 # - Output # 1) rtn_data : 매수요청결과 # ----------------------------------------------------------------------------- def buycoin_tg(target_item, buy_amount, buy_price): try: # 매수수량 계산 vol = Decimal(str(buy_amount)) / Decimal(str(buy_price)) query = { 'market': target_item, 'side': 'bid', 'volume': vol, 'price': buy_price, 'ord_type': 'limit', } query_string = urlencode(query).encode() m = hashlib.sha512() m.update(query_string) query_hash = m.hexdigest() payload = { 'access_key': access_key, 'nonce': str(uuid.uuid4()), 'query_hash': query_hash, 'query_hash_alg': 'SHA512', } jwt_token = jwt.encode(payload, secret_key).decode('utf8') authorize_token = 'Bearer {}'.format(jwt_token) headers = {"Authorization": authorize_token} res = send_request("POST", server_url + "/v1/orders", query, headers) rtn_data = res.json() logging.info("") logging.info("----------------------------------------------") logging.info("지정가 매수요청 완료!") logging.info(rtn_data) logging.info("----------------------------------------------") return rtn_data # ---------------------------------------- # Exception Raise # ---------------------------------------- except Exception: raise # ----------------------------------------------------------------------------- # - Name : sellcoin_mp # - Desc : 시장가 매도 # - Input # 1) target_item : 대상종목 # 2) cancel_yn : 기존 주문 취소 여부 # - Output # 1) rtn_data : 매도결과 # ----------------------------------------------------------------------------- # 시장가 매도 def sellcoin_mp(target_item, cancel_yn): try: if cancel_yn == 'Y': # 기존 주문이 있으면 취소 cancel_order(target_item, "SELL") # 잔고 조회 cur_balance = get_balance(target_item) query = { 'market': target_item, 'side': 'ask', 'volume': cur_balance, 'ord_type': 'market', } query_string = urlencode(query).encode() m = hashlib.sha512() m.update(query_string) query_hash = m.hexdigest() payload = { 'access_key': access_key, 'nonce': str(uuid.uuid4()), 'query_hash': query_hash, 'query_hash_alg': 'SHA512', } jwt_token = jwt.encode(payload, secret_key).decode('utf8') authorize_token = 'Bearer {}'.format(jwt_token) headers = {"Authorization": authorize_token} res = send_request("POST", server_url + "/v1/orders", query, headers) rtn_data = res.json() logging.info("") logging.info("----------------------------------------------") logging.info("시장가 매도 요청 완료! 결과:") logging.info(rtn_data) logging.info("----------------------------------------------") return rtn_data # ---------------------------------------- # Exception Raise # ---------------------------------------- except Exception: raise # ----------------------------------------------------------------------------- # - Name : sellcoin_tg # - Desc : 지정가 매도 # - Input # 1) target_item : 대상종목 # 2) sell_price : 매도희망금액 # - Output # 1) rtn_data : 매도결과 # ----------------------------------------------------------------------------- def sellcoin_tg(target_item, sell_price): try: # 잔고 조회 cur_balance = get_balance(target_item) query = { 'market': target_item, 'side': 'ask', 'volume': cur_balance, 'price': sell_price, 'ord_type': 'limit', } query_string = urlencode(query).encode() m = hashlib.sha512() m.update(query_string) query_hash = m.hexdigest() payload = { 'access_key': access_key, 'nonce': str(uuid.uuid4()), 'query_hash': query_hash, 'query_hash_alg': 'SHA512', } jwt_token = jwt.encode(payload, secret_key).decode('utf8') authorize_token = 'Bearer {}'.format(jwt_token) headers = {"Authorization": authorize_token} res = send_request("POST", server_url + "/v1/orders", query, headers) rtn_data = res.json() logging.info("") logging.info("----------------------------------------------") logging.info("지정가 매도 설정 완료!") logging.info(rtn_data) logging.info("----------------------------------------------") return rtn_data # ---------------------------------------- # Exception Raise # ---------------------------------------- except Exception: raise # ----------------------------------------------------------------------------- # - Name : get_balance # - Desc : 주문가능 잔고 조회 # - Input # 1) target_item : 대상 종목 # - Output # 2) rtn_balance : 주문가능 잔고 # ----------------------------------------------------------------------------- def get_balance(target_item): try: # 주문가능 잔고 리턴용 rtn_balance = 0 # 최대 재시도 횟수 max_cnt = 0 # 잔고가 조회 될 때까지 반복 while True: # 조회 회수 증가 max_cnt = max_cnt + 1 payload = { 'access_key': access_key, 'nonce': str(uuid.uuid4()), } jwt_token = jwt.encode(payload, secret_key).decode('utf8') authorize_token = 'Bearer {}'.format(jwt_token) headers = {"Authorization": authorize_token} res = send_request("GET", server_url + "/v1/accounts", "", headers) my_asset = res.json() # 해당 종목에 대한 잔고 조회 # 잔고는 마켓에 상관없이 전체 잔고가 조회됨 for myasset_for in my_asset: if myasset_for['currency'] == target_item.split('-')[1]: rtn_balance = myasset_for['balance'] # 잔고가 0 이상일때까지 반복 if Decimal(str(rtn_balance)) > Decimal(str(0)): break # 최대 100회 수행 if max_cnt > 100: break logging.info(“[주문가능 잔고 리턴용] 요청 재처리중…”) return rtn_balance # —————————————- # Exception Raise # —————————————- except Exception: raise # —————————————————————————– # – Name : get_candle # – Desc : 캔들 조회 # – Input # 1) target_item : 대상 종목 # 2) tick_kind : 캔들 종류 (1, 3, 5, 10, 15, 30, 60, 240 – 분, D-일, W-주, M-월) # 3) inq_range : 조회 범위 # – Output # 1) 캔들 정보 배열 # —————————————————————————– def get_candle(target_item, tick_kind, inq_range): try: # —————————————- # Tick 별 호출 URL 설정 # —————————————- # 분붕 if tick_kind == “1” or tick_kind == “3” or tick_kind == “5” or tick_kind == “10” or tick_kind == “15” or tick_kind == “30” or tick_kind == “60” or tick_kind == “240”: target_url = “minutes/” + tick_kind # 일봉 elif tick_kind == “D”: target_url = “days” # 주봉 elif tick_kind == “W”: target_url = “weeks” # 월봉 elif tick_kind == “M”: target_url = “months” # 잘못된 입력 else: raise Exception(“잘못된 틱 종류:” + str(tick_kind)) logging.debug(target_url) # —————————————- # Tick 조회 # —————————————- querystring = {“market”: target_item, “count”: inq_range} res = send_request(“GET”, server_url + “/v1/candles/” + target_url, querystring, “”) candle_data = res.json() logging.debug(candle_data) return candle_data # —————————————- # Exception Raise # —————————————- except Exception: raise # —————————————————————————– # – Name : get_targetprice # – Desc : 호가단위 금액 계산 # – Input # 1) cal_type : H:호가로, R:비율로 # 2) st_price : 기준가격 # 3) chg_val : 변화단위 # – Output # 1) rtn_price : 계산된 금액 # —————————————————————————– def get_targetprice(cal_type, st_price, chg_val): try: # 계산된 가격 rtn_price = st_price # 호가단위로 계산 if cal_type.upper() == “H”: for i in range(0, abs(int(chg_val))): hoga_val = get_hoga(rtn_price) if Decimal(str(chg_val)) > 0: rtn_price = Decimal(str(rtn_price)) + Decimal(str(hoga_val)) elif Decimal(str(chg_val)) < 0: rtn_price = Decimal(str(rtn_price)) - Decimal(str(hoga_val)) else: break # 비율로 계산 elif cal_type.upper() == "R": while True: # 호가단위 추출 hoga_val = get_hoga(st_price) if Decimal(str(chg_val)) > 0: rtn_price = Decimal(str(rtn_price)) + Decimal(str(hoga_val)) elif Decimal(str(chg_val)) < 0: rtn_price = Decimal(str(rtn_price)) - Decimal(str(hoga_val)) else: break if Decimal(str(chg_val)) > 0: if Decimal(str(rtn_price)) >= Decimal(str(st_price)) * ( Decimal(str(1)) + (Decimal(str(chg_val))) / Decimal(str(100))): break elif Decimal(str(chg_val)) < 0: if Decimal(str(rtn_price)) <= Decimal(str(st_price)) * ( Decimal(str(1)) + (Decimal(str(chg_val))) / Decimal(str(100))): break return rtn_price # ---------------------------------------- # Exception Raise # ---------------------------------------- except Exception: raise # ----------------------------------------------------------------------------- # - Name : get_hoga # - Desc : 호가 금액 계산 # - Input # 1) cur_price : 현재가격 # - Output # 1) hoga_val : 호가단위 # ----------------------------------------------------------------------------- def get_hoga(cur_price): try: # 호가 단위 if Decimal(str(cur_price)) < 10: hoga_val = 0.01 elif Decimal(str(cur_price)) < 100: hoga_val = 0.1 elif Decimal(str(cur_price)) < 1000: hoga_val = 1 elif Decimal(str(cur_price)) < 10000: hoga_val = 5 elif Decimal(str(cur_price)) < 100000: hoga_val = 10 elif Decimal(str(cur_price)) < 500000: hoga_val = 50 elif Decimal(str(cur_price)) < 1000000: hoga_val = 100 elif Decimal(str(cur_price)) < 2000000: hoga_val = 500 else: hoga_val = 1000 return hoga_val # ---------------------------------------- # Exception Raise # ---------------------------------------- except Exception: raise # ----------------------------------------------------------------------------- # - Name : get_krwbal # - Desc : KRW 잔고 조회 # - Input # - Output # 1) KRW 잔고 Dictionary # 1. krw_balance : KRW 잔고 # 2. fee : 수수료 # 3. available_krw : 매수가능 KRW잔고(수수료를 고려한 금액) # ----------------------------------------------------------------------------- def get_krwbal(): try: # 잔고 리턴용 rtn_balance = {} # 수수료 0.05%(업비트 기준) fee_rate = 0.05 payload = { 'access_key': access_key, 'nonce': str(uuid.uuid4()), } jwt_token = jwt.encode(payload, secret_key).decode('utf8') authorize_token = 'Bearer {}'.format(jwt_token) headers = {"Authorization": authorize_token} res = send_request("GET", server_url + "/v1/accounts", "", headers) data = res.json() logging.debug(data) for dataFor in data: if (dataFor['currency']) == "KRW": krw_balance = math.floor(Decimal(str(dataFor['balance']))) # 잔고가 있는 경우만 if Decimal(str(krw_balance)) > Decimal(str(0)): # 수수료 fee = math.ceil(Decimal(str(krw_balance)) * (Decimal(str(fee_rate)) / Decimal(str(100)))) # 매수가능금액 available_krw = math.floor(Decimal(str(krw_balance)) – Decimal(str(fee))) else: # 수수료 fee = 0 # 매수가능금액 available_krw = 0 # 결과 조립 rtn_balance[‘krw_balance’] = krw_balance rtn_balance[‘fee’] = fee rtn_balance[‘available_krw’] = available_krw return rtn_balance # —————————————- # Exception Raise # —————————————- except Exception: raise # —————————————————————————– # – Name : get_accounts # – Desc : 잔고정보 조회 # – Input # 1) except_yn : KRW 및 소액 제외 # 2) market_code : 마켓코드 추가(매도시 필요) # – Output # 1) 잔고 정보 # —————————————————————————– # 계좌 조회 def get_accounts(except_yn, market_code): try: rtn_data = [] # 해당 마켓에 존재하는 종목 리스트만 추출 market_item_list = get_items(market_code, ”) # 소액 제외 기준 min_price = 5000 payload = { ‘access_key’: access_key, ‘nonce’: str(uuid.uuid4()), } jwt_token = jwt.encode(payload, secret_key) authorize_token = ‘Bearer {}’.format(jwt_token) headers = {“Authorization”: authorize_token} res = send_request(“GET”, server_url + “/v1/accounts”, “”, headers) account_data = res.json() for account_data_for in account_data: for market_item_list_for in market_item_list: # 해당 마켓에 있는 종목만 조합 if market_code + ‘-‘ + account_data_for[‘currency’] == market_item_list_for[‘market’]: # KRW 및 소액 제외 if except_yn == “Y” or except_yn == “y”: if account_data_for[‘currency’] != “KRW” and Decimal(str(account_data_for[‘avg_buy_price’])) * ( Decimal(str(account_data_for[‘balance’])) + Decimal( str(account_data_for[‘locked’]))) >= Decimal(str(min_price)): rtn_data.append( {‘market’: market_code + ‘-‘ + account_data_for[‘currency’], ‘balance’: account_data_for[‘balance’], ‘locked’: account_data_for[‘locked’], ‘avg_buy_price’: account_data_for[‘avg_buy_price’], ‘avg_buy_price_modified’: account_data_for[‘avg_buy_price_modified’]}) else: if account_data_for[‘currency’] != “KRW”: rtn_data.append( {‘market’: market_code + ‘-‘ + account_data_for[‘currency’], ‘balance’: account_data_for[‘balance’], ‘locked’: account_data_for[‘locked’], ‘avg_buy_price’: account_data_for[‘avg_buy_price’], ‘avg_buy_price_modified’: account_data_for[‘avg_buy_price_modified’]}) return rtn_data # —————————————- # Exception Raise # —————————————- except Exception: raise # —————————————————————————– # – Name : chg_account_to_comma # – Desc : 잔고 종목 리스트를 콤마리스트로 변경 # – Input # 1) account_data : 잔고 데이터 # – Output # 1) 종목 리스트(콤마 구분자) # —————————————————————————– def chg_account_to_comma(account_data): try: rtn_data = “” for account_data_for in account_data: if rtn_data == ”: rtn_data = rtn_data + account_data_for[‘market’] else: rtn_data = rtn_data + ‘,’ + account_data_for[‘market’] return rtn_data # —————————————- # Exception Raise # —————————————- except Exception: raise # —————————————————————————– # – Name : get_ticker # – Desc : 현재가 조회 # – Input # 1) target_itemlist : 대상 종목(콤마 구분자) # – Output # 2) 현재가 데이터 # —————————————————————————– def get_ticker(target_itemlist): try: url = “https://api.upbit.com/v1/ticker” querystring = {“markets”: target_itemlist} response = send_request(“GET”, url, querystring, “”) rtn_data = response.json() return rtn_data # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : cancel_order # – Desc : 미체결 주문 취소 # – Input # 1) target_item : 대상종목 # 2) side : 매수/매도 구분(BUY/bid:매수, SELL/ask:매도, ALL:전체) # – Output # —————————————————————————– def cancel_order(target_item, side): try: # 미체결 주문 조회 order_data = get_order(target_item) # 매수/매도 구분 for order_data_for in order_data: if side == “BUY” or side == “buy”: if order_data_for[‘side’] == “ask”: order_data.remove(order_data_for) elif side == “SELL” or side == “sell”: if order_data_for[‘side’] == “bid”: order_data.remove(order_data_for) # 미체결 주문이 있으면 if len(order_data) > 0: # 미체결 주문내역 전체 취소 for order_data_for in order_data: cancel_order_uuid(order_data_for[‘uuid’]) # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : cancel_order_uuid # – Desc : 미체결 주문 취소 by UUID # – Input # 1) order_uuid : 주문 키 # – Output # 1) 주문 내역 취소 # —————————————————————————– def cancel_order_uuid(order_uuid): try: query = { ‘uuid’: order_uuid, } query_string = urlencode(query).encode() m = hashlib.sha512() m.update(query_string) query_hash = m.hexdigest() payload = { ‘access_key’: access_key, ‘nonce’: str(uuid.uuid4()), ‘query_hash’: query_hash, ‘query_hash_alg’: ‘SHA512’, } jwt_token = jwt.encode(payload, secret_key).decode(‘utf8’) authorize_token = ‘Bearer {}’.format(jwt_token) headers = {“Authorization”: authorize_token} res = send_request(“DELETE”, server_url + “/v1/order”, query, headers) rtn_data = res.json() return rtn_data # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_order # – Desc : 미체결 주문 조회 # – Input # 1) target_item : 대상종목 # – Output # 1) 미체결 주문 내역 # —————————————————————————– def get_order(target_item): try: query = { ‘market’: target_item, ‘state’: ‘wait’, } query_string = urlencode(query).encode() m = hashlib.sha512() m.update(query_string) query_hash = m.hexdigest() payload = { ‘access_key’: access_key, ‘nonce’: str(uuid.uuid4()), ‘query_hash’: query_hash, ‘query_hash_alg’: ‘SHA512’, } jwt_token = jwt.encode(payload, secret_key).decode(‘utf8’) authorize_token = ‘Bearer {}’.format(jwt_token) headers = {“Authorization”: authorize_token} res = send_request(“GET”, server_url + “/v1/orders”, query, headers) rtn_data = res.json() return rtn_data # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_order # – Desc : 미체결 주문 조회 # – Input # 1) side : 주문상태 # – Output # 1) 주문 내역 리스트 # —————————————————————————– def get_order_list(side): try: query = { ‘state’: side, } query_string = urlencode(query).encode() m = hashlib.sha512() m.update(query_string) query_hash = m.hexdigest() payload = { ‘access_key’: access_key, ‘nonce’: str(uuid.uuid4()), ‘query_hash’: query_hash, ‘query_hash_alg’: ‘SHA512’, } jwt_token = jwt.encode(payload, secret_key).decode(‘utf8’) authorize_token = ‘Bearer {}’.format(jwt_token) headers = {“Authorization”: authorize_token} res = send_request(“GET”, server_url + “/v1/orders”, query, headers) rtn_data = res.json() return rtn_data # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_rsi # – Desc : RSI 조회 # – Input # 1) target_item : 대상 종목 # 2) tick_kind : 캔들 종류 (1, 3, 5, 10, 15, 30, 60, 240 – 분, D-일, W-주, M-월) # 3) inq_range : 조회 범위 # – Output # 1) RSI 값 # —————————————————————————– def get_rsi(target_item, tick_kind, inq_range): try: # 캔들 추출 candle_data = get_candle(target_item, tick_kind, inq_range) df = pd.DataFrame(candle_data) df = df.reindex(index=df.index[::-1]).reset_index() df[‘close’] = df[“trade_price”] # RSI 계산 def rsi(ohlc: pd.DataFrame, period: int = 14): ohlc[“close”] = ohlc[“close”] delta = ohlc[“close”].diff() up, down = delta.copy(), delta.copy() up[up < 0] = 0 down[down > 0] = 0 _gain = up.ewm(com=(period – 1), min_periods=period).mean() _loss = down.abs().ewm(com=(period – 1), min_periods=period).mean() RS = _gain / _loss return pd.Series(100 – (100 / (1 + RS)), name=”RSI”) rsi = round(rsi(df, 14).iloc[-1], 4) return rsi # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_mfi # – Desc : MFI 조회 # – Input # 1) target_item : 대상 종목 # 2) tick_kind : 캔들 종류 (1, 3, 5, 10, 15, 30, 60, 240 – 분, D-일, W-주, M-월) # 3) inq_range : 캔들 조회 범위 # 4) loop_cnt : 지표 반복계산 횟수 # – Output # 1) MFI 값 # —————————————————————————– def get_mfi(target_item, tick_kind, inq_range, loop_cnt): try: # 캔들 데이터 조회용 candle_datas = [] # MFI 데이터 리턴용 mfi_list = [] # 캔들 추출 candle_data = get_candle(target_item, tick_kind, inq_range) # 조회 횟수별 candle 데이터 조합 for i in range(0, int(loop_cnt)): candle_datas.append(candle_data[i:int(len(candle_data))]) # 캔들 데이터만큼 수행 for candle_data_for in candle_datas: df = pd.DataFrame(candle_data_for) dfDt = df[‘candle_date_time_kst’].iloc[::-1] df[‘typical_price’] = (df[‘trade_price’] + df[‘high_price’] + df[‘low_price’]) / 3 df[‘money_flow’] = df[‘typical_price’] * df[‘candle_acc_trade_volume’] positive_mf = 0 negative_mf = 0 for i in range(0, 14): if df[“typical_price”][i] > df[“typical_price”][i + 1]: positive_mf = positive_mf + df[“money_flow”][i] elif df[“typical_price”][i] < df["typical_price"][i + 1]: negative_mf = negative_mf + df["money_flow"][i] if negative_mf > 0: mfi = 100 – (100 / (1 + (positive_mf / negative_mf))) else: mfi = 100 – (100 / (1 + (positive_mf))) mfi_list.append({“type”: “MFI”, “DT”: dfDt[0], “MFI”: round(mfi, 4)}) return mfi_list # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_macd # – Desc : MACD 조회 # – Input # 1) target_item : 대상 종목 # 2) tick_kind : 캔들 종류 (1, 3, 5, 10, 15, 30, 60, 240 – 분, D-일, W-주, M-월) # 3) inq_range : 캔들 조회 범위 # 4) loop_cnt : 지표 반복계산 횟수 # – Output # 1) MACD 값 # —————————————————————————– def get_macd(target_item, tick_kind, inq_range, loop_cnt): try: # 캔들 데이터 조회용 candle_datas = [] # MACD 데이터 리턴용 macd_list = [] # 캔들 추출 candle_data = get_candle(target_item, tick_kind, inq_range) # 조회 횟수별 candle 데이터 조합 for i in range(0, int(loop_cnt)): candle_datas.append(candle_data[i:int(len(candle_data))]) df = pd.DataFrame(candle_datas[0]) df = df.iloc[::-1] df = df[‘trade_price’] # MACD 계산 exp1 = df.ewm(span=12, adjust=False).mean() exp2 = df.ewm(span=26, adjust=False).mean() macd = exp1 – exp2 exp3 = macd.ewm(span=9, adjust=False).mean() for i in range(0, int(loop_cnt)): macd_list.append( {“type”: “MACD”, “DT”: candle_datas[0][i][‘candle_date_time_kst’], “MACD”: round(macd[i], 4), “SIGNAL”: round(exp3[i], 4), “OCL”: round(macd[i] – exp3[i], 4)}) return macd_list # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_bb # – Desc : 볼린저밴드 조회 # – Input # 1) target_item : 대상 종목 # 2) tick_kind : 캔들 종류 (1, 3, 5, 10, 15, 30, 60, 240 – 분, D-일, W-주, M-월) # 3) inq_range : 캔들 조회 범위 # 4) loop_cnt : 지표 반복계산 횟수 # – Output # 1) 볼린저 밴드 값 # —————————————————————————– def get_bb(target_item, tick_kind, inq_range, loop_cnt): try: # 캔들 데이터 조회용 candle_datas = [] # 볼린저밴드 데이터 리턴용 bb_list = [] # 캔들 추출 candle_data = get_candle(target_item, tick_kind, inq_range) # 조회 횟수별 candle 데이터 조합 for i in range(0, int(loop_cnt)): candle_datas.append(candle_data[i:int(len(candle_data))]) # 캔들 데이터만큼 수행 for candle_data_for in candle_datas: df = pd.DataFrame(candle_data_for) dfDt = df[‘candle_date_time_kst’].iloc[::-1] df = df[‘trade_price’].iloc[::-1] # 표준편차(곱) unit = 2 band1 = unit * numpy.std(df[len(df) – 20:len(df)]) bb_center = numpy.mean(df[len(df) – 20:len(df)]) band_high = bb_center + band1 band_low = bb_center – band1 bb_list.append({“type”: “BB”, “DT”: dfDt[0], “BBH”: round(band_high, 4), “BBM”: round(bb_center, 4), “BBL”: round(band_low, 4)}) return bb_list # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_williams # – Desc : 윌리암스 %R 조회 # – Input # 1) target_item : 대상 종목 # 2) tick_kind : 캔들 종류 (1, 3, 5, 10, 15, 30, 60, 240 – 분, D-일, W-주, M-월) # 3) inq_range : 캔들 조회 범위 # 4) loop_cnt : 지표 반복계산 횟수 # – Output # 1) 윌리암스 %R 값 # —————————————————————————– def get_williamsR(target_item, tick_kind, inq_range, loop_cnt): try: # 캔들 데이터 조회용 candle_datas = [] # 윌리암스R 데이터 리턴용 williams_list = [] # 캔들 추출 candle_data = get_candle(target_item, tick_kind, inq_range) # 조회 횟수별 candle 데이터 조합 for i in range(0, int(loop_cnt)): candle_datas.append(candle_data[i:int(len(candle_data))]) # 캔들 데이터만큼 수행 for candle_data_for in candle_datas: df = pd.DataFrame(candle_data_for) dfDt = df[‘candle_date_time_kst’].iloc[::-1] df = df.iloc[:14] # 계산식 # %R = (Highest High – Close)/(Highest High – Lowest Low) * -100 hh = numpy.max(df[‘high_price’]) ll = numpy.min(df[‘low_price’]) cp = df[‘trade_price’][0] w = (hh – cp) / (hh – ll) * -100 williams_list.append( {“type”: “WILLIAMS”, “DT”: dfDt[0], “HH”: round(hh, 4), “LL”: round(ll, 4), “CP”: round(cp, 4), “W”: round(w, 4)}) return williams_list # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_rsi # – Desc : RSI 조회 # – Input # 1) candle_data : 캔들 정보 # – Output # 1) RSI 값 # —————————————————————————– def get_rsi(candle_datas): try: # RSI 데이터 리턴용 rsi_data = [] # 캔들 데이터만큼 수행 for candle_data_for in candle_datas: df = pd.DataFrame(candle_data_for) dfDt = df[‘candle_date_time_kst’].iloc[::-1] df = df.reindex(index=df.index[::-1]).reset_index() df[‘close’] = df[“trade_price”] # RSI 계산 def rsi(ohlc: pd.DataFrame, period: int = 14): ohlc[“close”] = ohlc[“close”] delta = ohlc[“close”].diff() up, down = delta.copy(), delta.copy() up[up < 0] = 0 down[down > 0] = 0 _gain = up.ewm(com=(period – 1), min_periods=period).mean() _loss = down.abs().ewm(com=(period – 1), min_periods=period).mean() RS = _gain / _loss return pd.Series(100 – (100 / (1 + RS)), name=”RSI”) rsi = round(rsi(df, 14).iloc[-1], 4) rsi_data.append({“type”: “RSI”, “DT”: dfDt[0], “RSI”: rsi}) return rsi_data # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_mfi # – Desc : MFI 조회 # – Input # 1) candle_datas : 캔들 정보 # – Output # 1) MFI 값 # —————————————————————————– def get_mfi(candle_datas): try: # MFI 데이터 리턴용 mfi_list = [] # 캔들 데이터만큼 수행 for candle_data_for in candle_datas: df = pd.DataFrame(candle_data_for) dfDt = df[‘candle_date_time_kst’].iloc[::-1] df[‘typical_price’] = (df[‘trade_price’] + df[‘high_price’] + df[‘low_price’]) / 3 df[‘money_flow’] = df[‘typical_price’] * df[‘candle_acc_trade_volume’] positive_mf = 0 negative_mf = 0 for i in range(0, 14): if df[“typical_price”][i] > df[“typical_price”][i + 1]: positive_mf = positive_mf + df[“money_flow”][i] elif df[“typical_price”][i] < df["typical_price"][i + 1]: negative_mf = negative_mf + df["money_flow"][i] if negative_mf > 0: mfi = 100 – (100 / (1 + (positive_mf / negative_mf))) else: mfi = 100 – (100 / (1 + (positive_mf))) mfi_list.append({“type”: “MFI”, “DT”: dfDt[0], “MFI”: round(mfi, 4)}) return mfi_list # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_macd # – Desc : MACD 조회 # – Input # 1) candle_datas : 캔들 정보 # 2) loop_cnt : 반복 횟수 # – Output # 1) MACD 값 # —————————————————————————– def get_macd(candle_datas, loop_cnt): try: # MACD 데이터 리턴용 macd_list = [] df = pd.DataFrame(candle_datas[0]) df = df.iloc[::-1] df = df[‘trade_price’] # MACD 계산 exp1 = df.ewm(span=12, adjust=False).mean() exp2 = df.ewm(span=26, adjust=False).mean() macd = exp1 – exp2 exp3 = macd.ewm(span=9, adjust=False).mean() for i in range(0, int(loop_cnt)): macd_list.append( {“type”: “MACD”, “DT”: candle_datas[0][i][‘candle_date_time_kst’], “MACD”: round(macd[i], 4), “SIGNAL”: round(exp3[i], 4), “OCL”: round(macd[i] – exp3[i], 4)}) return macd_list # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_ma # – Desc : MA 조회 # – Input # 1) candle_datas : 캔들 정보 # 2) loop_cnt : 반복 횟수 # – Output # 1) MA 값 # —————————————————————————– def get_ma(candle_datas, loop_cnt): try: # MA 데이터 리턴용 ma_list = [] df = pd.DataFrame(candle_datas[0]) df = df.iloc[::-1] df = df[‘trade_price’] # MA 계산 ma5 = df.rolling(window=5).mean() ma10 = df.rolling(window=10).mean() ma20 = df.rolling(window=20).mean() ma60 = df.rolling(window=60).mean() ma120 = df.rolling(window=120).mean() for i in range(0, int(loop_cnt)): ma_list.append( {“type”: “MA”, “DT”: candle_datas[0][i][‘candle_date_time_kst’], “MA5”: ma5[i], “MA10”: ma10[i], “MA20”: ma20[i], “MA60”: ma60[i], “MA120”: ma120[i] , “MA_5_10”: str(Decimal(str(ma5[i])) – Decimal(str(ma10[i]))) , “MA_10_20”: str(Decimal(str(ma10[i])) – Decimal(str(ma20[i]))) , “MA_20_60”: str(Decimal(str(ma20[i])) – Decimal(str(ma60[i]))) , “MA_60_120”: str(Decimal(str(ma60[i])) – Decimal(str(ma120[i])))}) return ma_list # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_bb # – Desc : 볼린저밴드 조회 # – Input # 1) candle_datas : 캔들 정보 # – Output # 1) 볼린저 밴드 값 # —————————————————————————– def get_bb(candle_datas): try: # 볼린저밴드 데이터 리턴용 bb_list = [] # 캔들 데이터만큼 수행 for candle_data_for in candle_datas: df = pd.DataFrame(candle_data_for) dfDt = df[‘candle_date_time_kst’].iloc[::-1] df = df[‘trade_price’].iloc[::-1] # 표준편차(곱) unit = 2 band1 = unit * numpy.std(df[len(df) – 20:len(df)]) bb_center = numpy.mean(df[len(df) – 20:len(df)]) band_high = bb_center + band1 band_low = bb_center – band1 bb_list.append({“type”: “BB”, “DT”: dfDt[0], “BBH”: round(band_high, 4), “BBM”: round(bb_center, 4), “BBL”: round(band_low, 4)}) return bb_list # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_williams # – Desc : 윌리암스 %R 조회 # – Input # 1) candle_datas : 캔들 정보 # – Output # 1) 윌리암스 %R 값 # —————————————————————————– def get_williams(candle_datas): try: # 윌리암스R 데이터 리턴용 williams_list = [] # 캔들 데이터만큼 수행 for candle_data_for in candle_datas: df = pd.DataFrame(candle_data_for) dfDt = df[‘candle_date_time_kst’].iloc[::-1] df = df.iloc[:14] # 계산식 # %R = (Highest High – Close)/(Highest High – Lowest Low) * -100 hh = numpy.max(df[‘high_price’]) ll = numpy.min(df[‘low_price’]) cp = df[‘trade_price’][0] w = (hh – cp) / (hh – ll) * -100 williams_list.append( {“type”: “WILLIAMS”, “DT”: dfDt[0], “HH”: round(hh, 4), “LL”: round(ll, 4), “CP”: round(cp, 4), “W”: round(w, 4)}) return williams_list # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_cci # – Desc : CCI 조회 # – Input # 1) candle_data : 캔들 정보 # 2) loop_cnt : 조회 건수 # – Output # 1) CCI 값 # —————————————————————————– def get_cci(candle_data, loop_cnt): try: cci_val = 20 # CCI 데이터 리턴용 cci_list = [] # 사용하지 않는 캔들 갯수 정리(속도 개선) del candle_data[cci_val * 2:] # 오름차순 정렬 df = pd.DataFrame(candle_data) ordered_df = df.sort_values(by=[‘candle_date_time_kst’], ascending=[True]) # 계산식 : (Typical Price – Simple Moving Average) / (0.015 * Mean absolute Deviation) ordered_df[‘TP’] = (ordered_df[‘high_price’] + ordered_df[‘low_price’] + ordered_df[‘trade_price’]) / 3 ordered_df[‘SMA’] = ordered_df[‘TP’].rolling(window=cci_val).mean() ordered_df[‘MAD’] = ordered_df[‘TP’].rolling(window=cci_val).apply(lambda x: pd.Series(x).mad()) ordered_df[‘CCI’] = (ordered_df[‘TP’] – ordered_df[‘SMA’]) / (0.015 * ordered_df[‘MAD’]) # 개수만큼 조립 for i in range(0, loop_cnt): cci_list.append({“type”: “CCI”, “DT”: ordered_df[‘candle_date_time_kst’].loc[i], “CCI”: round(ordered_df[‘CCI’].loc[i], 4)}) return cci_list # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_indicators # – Desc : 보조지표 조회 # – Input # 1) target_item : 대상 종목 # 2) tick_kind : 캔들 종류 (1, 3, 5, 10, 15, 30, 60, 240 – 분, D-일, W-주, M-월) # 3) inq_range : 캔들 조회 범위 # 4) loop_cnt : 지표 반복계산 횟수 # – Output # 1) RSI # 2) MFI # 3) MACD # 4) BB # 5) WILLIAMS %R # 6) CCI # —————————————————————————– def get_indicators(target_item, tick_kind, inq_range, loop_cnt): try: # 보조지표 리턴용 indicator_data = [] # 캔들 데이터 조회용 candle_datas = [] # 캔들 추출 candle_data = get_candle(target_item, tick_kind, inq_range) if len(candle_data) >= 30: # 조회 횟수별 candle 데이터 조합 for i in range(0, int(loop_cnt)): candle_datas.append(candle_data[i:int(len(candle_data))]) # RSI 정보 조회 rsi_data = get_rsi(candle_datas) # MFI 정보 조회 mfi_data = get_mfi(candle_datas) # MACD 정보 조회 macd_data = get_macd(candle_datas, loop_cnt) # BB 정보 조회 bb_data = get_bb(candle_datas) # WILLIAMS %R 조회 williams_data = get_williams(candle_datas) # MA 정보 조회 ma_data = get_ma(candle_datas, loop_cnt) # CCI 정보 조회 cci_data = get_cci(candle_data, loop_cnt) if len(rsi_data) > 0: indicator_data.append(rsi_data) if len(mfi_data) > 0: indicator_data.append(mfi_data) if len(macd_data) > 0: indicator_data.append(macd_data) if len(bb_data) > 0: indicator_data.append(bb_data) if len(williams_data) > 0: indicator_data.append(williams_data) if len(ma_data) > 0: indicator_data.append(ma_data) if len(cci_data) > 0: indicator_data.append(cci_data) return indicator_data # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_order_status # – Desc : 주문 조회(상태별) # – Input # 1) target_item : 대상종목 # 2) status : 주문상태(wait : 체결 대기, watch : 예약주문 대기, done : 전체 체결 완료, cancel : 주문 취소) # – Output # 1) 주문 내역 # —————————————————————————– def get_order_status(target_item, status): try: query = { ‘market’: target_item, ‘state’: status, } query_string = urlencode(query).encode() m = hashlib.sha512() m.update(query_string) query_hash = m.hexdigest() payload = { ‘access_key’: access_key, ‘nonce’: str(uuid.uuid4()), ‘query_hash’: query_hash, ‘query_hash_alg’: ‘SHA512’, } jwt_token = jwt.encode(payload, secret_key).decode(‘utf8’) authorize_token = ‘Bearer {}’.format(jwt_token) headers = {“Authorization”: authorize_token} res = send_request(“GET”, server_url + “/v1/orders”, query, headers) rtn_data = res.json() return rtn_data # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : orderby_dict # – Desc : 딕셔너리 정렬 # – Input # 1) target_dict : 정렬 대상 딕셔너리 # 2) target_column : 정렬 대상 딕셔너리 # 3) order_by : 정렬방식(False:오름차순, True,내림차순) # – Output # 1) 정렬된 딕서너리 # —————————————————————————– def orderby_dict(target_dict, target_column, order_by): try: rtn_dict = sorted(target_dict, key=(lambda x: x[target_column]), reverse=order_by) return rtn_dict # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : filter_dict # – Desc : 딕셔너리 필터링 # – Input # 1) target_dict : 정렬 대상 딕셔너리 # 2) target_column : 정렬 대상 컬럼 # 3) filter : 필터 # – Output # 1) 필터링된 딕서너리 # —————————————————————————– def filter_dict(target_dict, target_column, filter): try: for target_dict_for in target_dict[:]: if target_dict_for[target_column] != filter: target_dict.remove(target_dict_for) return target_dict # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_order_chance # – Desc : 주문 가능정보 조회 # – Input # 1) target_item : 대상종목 # – Output # 1) 주문 가능 정보 # —————————————————————————– def get_order_chance(target_item): try: query = { ‘market’: target_item, } query_string = urlencode(query).encode() m = hashlib.sha512() m.update(query_string) query_hash = m.hexdigest() payload = { ‘access_key’: access_key, ‘nonce’: str(uuid.uuid4()), ‘query_hash’: query_hash, ‘query_hash_alg’: ‘SHA512’, } jwt_token = jwt.encode(payload, secret_key).decode(‘utf8’) authorize_token = ‘Bearer {}’.format(jwt_token) headers = {“Authorization”: authorize_token} res = send_request(“GET”, server_url + “/v1/orders/chance”, query, headers) rtn_data = res.json() return rtn_data # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_max_min # – Desc : MAX/MIN 값 조회 # – Input # 1) candle_datas : 캔들 정보 # 2) col_name : 대상 컬럼 # – Output # 1) MAX 값 # 2) MIN 값 # —————————————————————————– def get_max(candle_data, col_name_high, col_name_low): try: # MA 데이터 리턴용 max_min_list = [] df = pd.DataFrame(candle_data) df = df.iloc[::-1] # MAX 계산 max = numpy.max(df[col_name_high]) min = numpy.min(df[col_name_low]) max_min_list.append( {“MAX”: max, “MIN”: min}) return max_min_list # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : send_line_msg # – Desc : 라인 메세지 전송 # – Input # 1) message : 메세지 # – Output # 1) response : 발송결과(200:정상) # —————————————————————————– def send_line_message(message): try: headers = {‘Authorization’: ‘Bearer ‘ + line_token} data = {‘message’: message} response = requests.post(line_target_url, headers=headers, data=data) logging.debug(response) return response # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_indicator_sel # – Desc : 보조지표 조회(원하는 지표만) # – Input # 1) target_item : 대상 종목 # 2) tick_kind : 캔들 종류 (1, 3, 5, 10, 15, 30, 60, 240 – 분, D-일, W-주, M-월) # 3) inq_range : 캔들 조회 범위 # 4) loop_cnt : 지표 반복계산 횟수 # 5) 보조지표 : 리스트 # – Output # 1) 보조지표 # —————————————————————————– def get_indicator_sel(target_item, tick_kind, inq_range, loop_cnt, indi_type): try: # 보조지표 리턴용 indicator_data = {} # 캔들 데이터 조회용 candle_datas = [] # 캔들 추출 candle_data = get_candle(target_item, tick_kind, inq_range) if len(candle_data) >= 30: # 조회 횟수별 candle 데이터 조합 for i in range(0, int(loop_cnt)): candle_datas.append(candle_data[i:int(len(candle_data))]) if ‘RSI’ in indi_type: # RSI 정보 조회 rsi_data = get_rsi(candle_datas) indicator_data[‘RSI’] = rsi_data if ‘MFI’ in indi_type: # MFI 정보 조회 mfi_data = get_mfi(candle_datas) indicator_data[‘MFI’] = mfi_data if ‘MACD’ in indi_type: # MACD 정보 조회 macd_data = get_macd(candle_datas, loop_cnt) indicator_data[‘MACD’] = macd_data if ‘BB’ in indi_type: # BB 정보 조회 bb_data = get_bb(candle_datas) indicator_data[‘BB’] = bb_data if ‘WILLIAMS’ in indi_type: # WILLIAMS %R 조회 williams_data = get_williams(candle_datas) indicator_data[‘WILLIAMS’] = williams_data if ‘MA’ in indi_type: # MA 정보 조회 ma_data = get_ma(candle_datas, loop_cnt) indicator_data[‘MA’] = ma_data if ‘CCI’ in indi_type: # CCI 정보 조회 cci_data = get_cci(candle_data, loop_cnt) indicator_data[‘CCI’] = cci_data if ‘CANDLE’ in indi_type: indicator_data[‘CANDLE’] = candle_data return indicator_data # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : send_msg # – Desc : 메세지 전송 # – Input # 1) sent_list : 메세지 발송 내역 # 2) key : 메세지 키 # 3) contents : 메세지 내용 # 4) msg_intval : 메세지 발송주기 # – Output # 1) sent_list : 메세지 발송 내역 # —————————————————————————– def send_msg(sent_list, key, contents, msg_intval): try: # msg_intval = ‘N’ 이면 메세지 발송하지 않음 if msg_intval.upper() != ‘N’: # 발송여부 체크 sent_yn = False # 발송이력 sent_dt = ” # 발송내역에 해당 키 존재 시 발송 이력 추출 for sent_list_for in sent_list: if key in sent_list_for.values(): sent_yn = True sent_dt = datetime.strptime(sent_list_for[‘SENT_DT’], ‘%Y-%m-%d %H:%M:%S’) # 기 발송 건 if sent_yn: logging.info(‘기존 발송 건’) # 현재 시간 추출 current_dt = datetime.strptime(datetime.now().strftime(‘%Y-%m-%d %H:%M:%S’), ‘%Y-%m-%d %H:%M:%S’) # 시간 차이 추출 diff = current_dt – sent_dt # 발송 시간이 지난 경우에는 메세지 발송 if diff.seconds >= int(msg_intval): logging.info(‘발송 주기 도래 건으로 메시지 발송 처리!’) # 메세지 발송 send_line_message(contents) # 기존 메시지 발송이력 삭제 for sent_list_for in sent_list[:]: if key in sent_list_for.values(): sent_list.remove(sent_list_for) # 새로운 발송이력 추가 sent_list.append({‘KEY’: key, ‘SENT_DT’: datetime.now().strftime(‘%Y-%m-%d %H:%M:%S’)}) else: logging.info(‘발송 주기 미 도래 건!’) # 최초 발송 건 else: logging.info(‘최초 발송 건’) # 메세지 발송 send_line_message(contents) # 새로운 발송이력 추가 sent_list.append({‘KEY’: key, ‘SENT_DT’: datetime.now().strftime(‘%Y-%m-%d %H:%M:%S’)}) return sent_list # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : read_file # – Desc : 파일 읽기 # – Input # 1. name : 파일 명 # – Output # 1. 파일 내용 # —————————————————————————– def read_file(name): try: path = ‘./conf/’ + str(name) + ‘.txt’ f = open(path, ‘r’) line = f.readline() f.close() logging.debug(line) contents = line return contents # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise

실행 결과

python ./trade_bot/websocket.py I

sFTP를 이용해 프로그램을 서버에 옮긴 후 위와 같이 수행하면 영상과 같이 모든 코인에 대한 체결 정보를 실시간으로 수신할 수 있습니다. sFTP를 이용해 프로그램을 서버로 옮기는 방법은 아래 포스팅을 참고하시면 됩니다.

2021.12.02 – [코딩스토리/리눅스] – SSH Key를 이용해서 SFTP로 서버에 접속하기 – Filezilla sFTP 프로그램 이용

마치며

웹소켓을 이용하면 실시간 데이터를 빠르게 받을 수 있는 장점이 있지만 받을 수 있는 데이터의 형태가 한정되어 있다는 단점이 있습니다. 하지만 용도를 고려하여 적절하게 웹소켓을 사용한다면 조금 더 빠른 로직을 구현할 수 있습니다.

추후 데이터베이스 설치하는 방법을 살펴보고 웹소켓의 데이터를 DB에 적재하여 분석이나 다른 용도로 활용하는 방법도 살펴 보도록 하겠습니다.

블로그를 구독하시면 소식을 조금 더 빨리 받아 보실 수 있습니다. 감사합니다.

반응형

업비트 REST API를 이용한 비트코인 가격 추출 파이썬 프로그래밍

업비트 REST API를 이용한 비트코인 가격 추출 파이썬 프로그래밍

가장 빠르게 익히는 방법은 직접 해보는 것입니다. 이번 포스팅을 통해 REST API를 활용하여 비트코인 가격을 추출하는 파이썬 프로그래밍 방법을 익힐 수 있을 것입니다. 파이썬 프로그래밍 중 특히 함수 만들기와 파이썬 함수의 파라미터를 바꿔가며 데이터를 얻는 방법에 집중해주시기 바랍니다.

글의 순서

하루 단위의 비트코인 시세를 가져오는 업비트 REST API

업비트 REST API를 이용한 파이썬 코드

API를 사용하기 위한 파이썬 패키지 requests

파이썬 프로그래밍 함수 연습 : API를 이용하여 코인 가격 추출

하루 단위의 비트코인 시세를 가져오는 업비트 REST API

일일 코인가격에 대한 시계열 데이터를 가시화하기 위해서 업비트에서 제공하는 파이썬 코드를 이용해보겠습니다. 이 파이썬 코드는 일(Day) 캔들 데이터를 받아오는 REST API를 사용하고 있습니다.

API 실습01

https://api.upbit.com/v1/candles/days?market=KRW-BTC&count=2

API 실습01의 API는 캔들 데이터를 받아오는 API입니다. 캔들 데이터의 기본단위는 일(day)단위이며, 물음표(?) 뒤에 있는 파라미터를 조절해서 원하는 데이터를 얻을 수 있습니다. 참고자료 1의 링크를 타고 가면 ‘Response’와 ‘QUERY PARAMS’가 있습니다. QUERY PARAMS에서 파라미터를 조절하면 API 주소(엔드 포인트) 뒤쪽의 파라미터가 바뀌는 것을 볼 수 있습니다. API 실습01의 주소는 비트코인 원화 마켓(KRW-BTC)에서 캔들 2개를 보여주라는 의미입니다.

※ 주소를 웹 주소(web address) 또는 URL(uniform resource locator)이라고 합니다. 네트워크 상에서 자원이 어디 있는지를 알려주는 규약입니다. 여기서 자원은 우리가 받길 원하는 각종 자료들을 뜻합니다.

API 실습01 결과를 아래에 나타내었습니다.

API 실습01 결과

[{“market”:”KRW-BTC”,”candle_date_time_utc”:”2022-06-18T00:00:00″,”candle_date_time_kst”:”2022-06-18T09:00:00″,”opening_price”:27010000.00000000,”high_price”:27345000.00000000,”low_price”:26954000.00000000,”trade_price”:27017000.00000000,”timestamp”:1655527343448,”candle_acc_trade_price”:35349090363.39155000,”candle_acc_trade_volume”:1305.80173384,”prev_closing_price”:27010000.00000000,”change_price”:7000.00000000,”change_rate”:0.0002591633},{“market”:”KRW-BTC”,”candle_date_time_utc”:”2022-06-17T00:00:00″,”candle_date_time_kst”:”2022-06-17T09:00:00″,”opening_price”:26900000.00000000,”high_price”:27920000.00000000,”low_price”:26573000.00000000,”trade_price”:27010000.00000000,”timestamp”:1655510399478,”candle_acc_trade_price”:224342676396.38415000,”candle_acc_trade_volume”:8254.74720985,”prev_closing_price”:26898000.00000000,”change_price”:112000.00000000,”change_rate”:0.0041638784}]

업비트 REST API를 이용한 파이썬 코드

참고자료 1에는 코인가격을 가져올 수 있는 파이썬 코드가 있습니다. 그 코드의 API를 ‘API 실습01’로 바꿔서 업비트 API 활용01에 나타내었습니다. 이 코드를 실행시키면 ‘API 실습01 결과’와 똑같은 결과를 출력합니다. 파이썬 코드 실행 환경은 ‘파이썬 프로그래밍 시작하기 (1) 온라인 프로그래밍 환경 replit’이라는 포스팅을 참고하시기 바랍니다.

실습코드 : 업비트 API 활용01 import requests url = “https://api.upbit.com/v1/candles/days?market=KRW-BTC&count=2” headers = {“Accept”: “application/json”} response = requests.get(url, headers=headers) print(response.text) 1 2 3 4 5 6 import requests url = “https://api.upbit.com/v1/candles/days?market=KRW-BTC&count=2” headers = { “Accept” : “application/json” } response = requests . get ( url , headers = headers ) print ( response . text )

API를 사용하기 위한 파이썬 패키지 requests

업비트 API 활용01 코드의 첫줄에서 requests라는 파이썬 패키지를 불러옵니다. 이 파이썬 패키지는 API를 사용하기 위한 패키지입니다. 파이썬 패키지를 사용하려면 설치되어 있어야 합니다.

(1) replit에서 파이썬 패키지를 설치하는 방법

아래 그림과 같이 replit 환경에서 패키지를 선택하고, 검색창에 requests 입력하면 requests 라는 패키지를 찾을 수 있습니다. requests를 선택하고 설치(install) 하면, 오른쪽 콘솔 창에서 진행현황과 설치 완료 상태를 확인할 수 있습니다.

(2) replit 이외의 다른 파이썬 코딩환경에서 패키지 설치하는 방법

$python -m pip install requests

또는 pip install requests

파이썬 프로그래밍 함수 연습 : API를 이용하여 코인 가격 추출

업비트 API 활용01 코드를 기반으로 함수를 만들어 놓으면, 코드를 파악하기가 쉽고, 활용하기 편합니다. 함수를 만들 때는 어떤 값을 돌려받을지, 어떤 인자(=파라미터)를 넘겨줄지를 정하는 것이 중요합니다.

(1) REST API를 포함하는 함수 만들기 연습

일단 원래 코드를 크게 수정하지 않으면서, 함수에 ‘업비트 API 활용01’의 코드를 넣어보면 아래의 코드가 됩니다. get_coin_price라는 함수는 받는 인자 없이 response를 돌려줍니다. 변수 response에는 업비트 API를 이용해서 받은 데이터가 담겨있습니다. ‘API 실습01 결과’와 같은 출력값을 예상했지만, 의외로 출력되는 값은 입니다. 여기서 200이라는 값은 데이터가 들어있다는 것을 의미합니다.

실습코드 : 업비트 API 활용02 import requests import json def get_coin_price(): response = requests.get(“https://api.upbit.com/v1/candles/days?market=KRW-BTC&count=2”) return response price = get_coin_price() print(price) 1 2 3 4 5 6 7 8 9 import requests import json def get_coin_price ( ) : response = requests . get ( “https://api.upbit.com/v1/candles/days?market=KRW-BTC&count=2” ) return response price = get_coin_price ( ) print ( price )

(2) JSON을 이용한 데이터 확인

업비트 API 활용02라는 실습코드의 실행결과는 이었습니다. API로 데이터를 받아오긴 했으나, 우리가 원하는 형태로 바꾸기 위해서는 추가의 가공이 필요합니다. 일단 우리 눈에 익숙한 형태의 데이터로 바꿔주는 JSON을 이용해서 데이터를 확인하는 것이 우선입니다. 여기서 JSON은 JavaScript Object Notation의 줄임말로 사람이나 기계가 읽고 쓰기 쉬운 데이터 교환 형식입니다. JSON의 사용법은 무척 간단합니다. 아래의 코드에서처럼 변환하고자 하는 변수에 json을 붙여주기만 하면 됩니다.

업비트 API 활용02라는 실습코드에서 사용했던 response라는 변수를 raw_resp로 변경하고, 이 raw_resp를 JSON 형식으로 바꾸는 것입니다. response = raw_resp.json()에서처럼 raw_resp.json 이라고 쓰면 raw_resp는 JSON 형식으로 바뀝니다. 업비트 API 활용03 코드를 실행하면 위의 ‘API 실습01 결과’와 똑같은 결과를 출력합니다.

실습코드 : 업비트 API 활용03 import requests import json def get_coin_price(): raw_resp = requests.get(“https://api.upbit.com/v1/candles/days?market=KRW-BTC&count=2”) response = raw_resp.json() return response price = get_coin_price() print(price) 1 2 3 4 5 6 7 8 9 10 import requests import json def get_coin_price ( ) : raw_resp = requests . get ( “https://api.upbit.com/v1/candles/days?market=KRW-BTC&count=2” ) response = raw_resp . json ( ) return response price = get_coin_price ( ) print ( price )

(3) 함수에 파라미터 넘겨주기

이제는 get_coin_price라는 함수에 인자를 넘겨줘서 비트코인 뿐만 아니라 다른 암호화폐의 가격도 가져올 수 있고, 20일치, 200일치 데이터도 가져올 수 있게 해보겠습니다. 업비트 API 활용04 코드에서는 get_coin_price를 부를 때 KRW-BTC를 따옴표와 함께 문자열로 넘겨주고, 2를 숫자로 넘겨줍니다. KRW-BTC 대신 KRW-ETH를 넘겨주면 이더리움 가격을 받아올 수 있고, 2대신 20일을 넣으면 20일치 데이터가 출력되는 것을 확인하실 수 있습니다.

이렇게 인자를 넘겨주고, 함수 내에서 인자를 받는 것은 ‘파이썬 프로그래밍 시작 (3) 파이썬 함수 실습’이라는 포스팅의 ‘실습코드 : 함수09’에서 참고해보시기 바랍니다.

업비트 API 활용04의 get_coin_price() 함수는 파라미터를 받아와야 하므로 API의 주소가 다소 복잡해졌습니다. 그래서 url이라는 변수를 추가한 후, url에 API 주소를 담아두고, 이 주소가 제대로 만들어졌는지를 확인하기 위해 print(url)을 추가하였습니다. url에 담기는 API 주소에서 파라미터로 받아와야 하는 부분은 %s, %d로 표시되어 있습니다.

실습코드 : 업비트 API 활용04 import requests import json def get_coin_price(ticker,n_candle): url = (“https://api.upbit.com/v1/candles/days?market=%s&count=%d” %(ticker, n_candle)) print(url) raw_resp = requests.get(url) response = raw_resp.json() return response price = get_coin_price(“KRW-BTC”,2) print(price) 1 2 3 4 5 6 7 8 9 10 11 12 import requests import json def get_coin_price ( ticker , n_candle ) : url = ( “https://api.upbit.com/v1/candles/days?market=%s&count=%d” % ( ticker , n_candle ) ) print ( url ) raw_resp = requests . get ( url ) response = raw_resp . json ( ) return response price = get_coin_price ( “KRW-BTC” , 2 ) print ( price )

마치며 …

자동으로 데이터를 받아오는 코드를 만들기 위해 API를 살펴보는 중입니다. 업비트(UPbit)는 업비트의 API를 활용할 수 있도록 다양한 프로그래밍 언어로 샘플 코드를 제공하고 있습니다. 이번 포스팅에서는 이들 프로그래밍 언어 중 파이썬을 활용하는 방법을 정리하였습니다. API로 가격을 받아오는 부분을 함수로 구현하는 방법과, 파이썬 함수의 파라미터를 바꿔가며 데이터를 얻는 방법이 이 포스팅에서 가장 중요한 내용입니다.

이 포스팅에서 살펴본 내용을 요약해보겠습니다.

▶ 업비트에서 제공하는 파이썬 코드 내려받기

▶ 파이썬 코드 내 API의 주소(엔드포인트)와 파라미터 바꾸기

▶ 파이썬 함수 만들기

▶ 파이썬 함수에 인자 추가하기

API의 엔드포인트와 파라미터를 바꿔가면서 원하는 데이터를 받아 보시기 바랍니다.

함께 참고하면 더 좋은 글 :

1. 파이썬 프로그래밍 시작하기 (1) 온라인 프로그래밍 환경 replit

2. 파이썬 프로그래밍 시작 (3) 파이썬 함수 실습

3. 파이썬 프로그래밍 시작 (14) 패키지

4. 시계열 데이터 가시화 (2) 보고서용 파이썬 그래프 만들기

5. 파이썬 데이터 분석! 데이터 분석을 위한 코딩언어 파이썬

6. 알아두면 좋을 컴퓨터 작동원리. 폰 노이만 아키텍처

참고자료

[1] UPbit(2020),일(Day) 캔들

[2] JSON 개요

파이썬 업비트 웹소켓 접속방법 – 비트코인 자동매매 프로그램

반응형

업비트에서 코인 정보를 받아오는 방법은 크게 두 가지 방법으로 나눌 수 있는데요. 먼저 API를 개별 호출하여 정보를 얻어오는 방법과 웹소켓을 이용해서 실시간 데이터를 구독하는 방법이 있습니다.

지금까지 Tech&Fin에서 다루었던 프로그램들에서는 첫 번째 방법인 API를 개별 호출하는 방법을 사용했었는데요.

이번 시간에는 API 개별 호출 방법과 웹소켓을 이용하는 방법은 어떤점이 다른지 살펴보고 웹소켓을 이용해서 실시간 체결 데이터를 받아오는 방법에 대해서 알아 보도록 하겠습니다.

목차 – 클릭하면 이동합니다.

웹소켓에 대한 생각

API 개별 호출과 웹소켓 구독의 차이점

기술적인 이야기는 뒤로하고 API를 개별 호출하는 것과 웹소켓을 구독하는 것은 목적의 차이가 크다고 생각합니다.

API를 개별 호출하는 방법에서는 액세스키가 필요하지 않은 공개 데이터는 물론 계좌 잔고 정보 등의 액세스키가 필요한 개인 데이터까지 모두 다룰 수 있습니다. 같은 맥락으로 주문/주문취소 등의 행위도 가능하게 됩니다.

하지만 웹소켓을 이용한 방법은 ①현재가 ②체결 ③호가, 이렇게 총 3가지 공개 데이터만 구독하여 받아볼 수 있습니다.

API를 개별 호출하는 방법은 원하는 순간에 프로그램에서 API를 호출해야 하지만 웹소켓 방식은 구독 데이터를 설정하여 한번 접속하면 실시간으로 데이터가 계속해서 수신 됩니다.

그래서 목적에 따라 API와 웹소켓을 혼용해서 사용하는 것이 좋습니다. 자세한 업비트 웹소켓 관련 내용은 아래 링크를 통해 업비트 공식 API 홈페이지에서 확인하실 수 있습니다.

https://docs.upbit.com/docs/upbit-quotation-websocket

웹소켓을 활용할 수 있는 방법

웹소켓을 실행하여 체결 데이터를 받아보면 데이터가 물 밀듯이 들어오게 되는 것을 보게 됩니다. 데이터가 너무 많아서 이 데이터로 무얼 해야 할 지 고민이 될 텐데요.

웹소켓을 이용해 받는 데이터는 양이 방대하기 때문에 실시간으로 메모리를 이용해 적재하고 사용하는 방법은 조금 어려울 수 있습니다. 저의 경우에는 아래와 같은 경우에 웹소켓을 활용하고 있습니다.

① 데이터 적재 후 사용 : 데이터 베이스에 웹소켓에서 받는 데이터를 모두 저장하면 쿼리를 이용해 여러가지 업비트 API에서 제공하지 않는 로직을 구현할 수 있습니다. 예를 들어 API에서는 종목별로만 조회할 수 있는 부분도 쿼리를 이용하면 전체 종목을 한번에 조회하여 시간을 줄일 수 있습니다.

또한 데이터를 적재하면 분석이 가능하게 됩니다. 예를 들면 세력이 어떻게 오전 9시에 호가창을 조절하면서 단기 펌핑을 주는지 등의 분석을 할 수 있고 매집봉으로 보이는 장대 양봉의 실제 매수/매도 비율이 얼마나 되는지 등의 데이터도 분석할 수 있게 됩니다.

② 단기 펌핑 종목을 빠르게 추출 : 전일 대비 상승률을 이용해 단기 펌핑하는 종목을 빠르게 잡아낼 수 있습니다. 예를들어 오전 9시에 10프로 이상 오른 종목을 아주 간편하게 골라낼 수 있게 됩니다.

업비트 웹소켓 사용방법

웹소켓 프로그램(websocket.py)

import os import sys import time import json import datetime import asyncio import logging import traceback import websockets # 실행 환경에 따른 공통 모듈 Import sys.path.append(os.path.dirname(os.path.dirname(__file__))) from module import upbit # 프로그램 정보 pgm_name = ‘websocket’ pgm_name_kr = ‘업비트 Ticker 웹소켓’ # —————————————————————————– # – Name : get_subscribe_items # – Desc : 구독 대상 종목 조회 # —————————————————————————– def get_subscribe_items(): try: subscribe_items = [] # KRW 마켓 전 종목 추출 items = upbit.get_items(‘KRW’, ”) # 종목코드 배열로 변환 for item in items: subscribe_items.append(item[‘market’]) return subscribe_items # ————————————— # Exception 처리 # —————————————- except Exception: raise # —————————————————————————– # – Name : upbit_ws_client # – Desc : 업비트 웹소켓 # —————————————————————————– async def upbit_ws_client(): try: # 중복 실행 방지용 seconds = 0 # 구독 데이터 조회 subscribe_items = get_subscribe_items() logging.info(‘구독 종목 개수 : ‘ + str(len(subscribe_items))) logging.info(‘구독 종목 : ‘ + str(subscribe_items)) # 구독 데이터 조립 subscribe_fmt = [ {“ticket”: “test-websocket”}, { “type”: “ticker”, “codes”: subscribe_items, “isOnlyRealtime”: True }, {“format”: “SIMPLE”} ] subscribe_data = json.dumps(subscribe_fmt) async with websockets.connect(upbit.ws_url) as websocket: await websocket.send(subscribe_data) while True: period = datetime.datetime.now() data = await websocket.recv() data = json.loads(data) logging.info(data) # 5초마다 종목 정보 재 조회 후 추가된 종목이 있으면 웹소켓 다시 시작 if (period.second % 5) == 0 and seconds != period.second: # 중복 실행 방지 seconds = period.second # 종목 재조회 re_subscribe_items = get_subscribe_items() logging.info(‘

‘) logging.info(‘*************************************************’) logging.info(‘기존 종목[‘ + str(len(subscribe_items)) + ‘] : ‘ + str(subscribe_items)) logging.info(‘종목 재조회[‘ + str(len(re_subscribe_items)) + ‘] : ‘ + str(re_subscribe_items)) logging.info(‘*************************************************’) logging.info(‘

‘) # 현재 종목과 다르면 웹소켓 다시 시작 if subscribe_items != re_subscribe_items: logging.info(‘종목 달리짐! 웹소켓 다시 시작’) await websocket.close() time.sleep(1) await upbit_ws_client() # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception as e: logging.error(‘Exception Raised!’) logging.error(e) logging.error(‘Connect Again!’) # 웹소켓 다시 시작 await upbit_ws_client() # —————————————————————————– # – Name : main # – Desc : 메인 # —————————————————————————– async def main(): try: # 웹소켓 시작 await upbit_ws_client() except Exception as e: logging.error(‘Exception Raised!’) logging.error(e) # —————————————————————————– # – Name : main # – Desc : 메인 # —————————————————————————– if __name__ == “__main__”: # noinspection PyBroadException try: print(“***** USAGE ******”) print(“[1] 로그레벨(D:DEBUG, E:ERROR, 그외:INFO)”) if sys.platform.startswith(‘win32’): # 로그레벨(D:DEBUG, E:ERROR, 그외:INFO) log_level = ‘I’ upbit.set_loglevel(log_level) else: # 로그레벨(D:DEBUG, E:ERROR, 그외:INFO) log_level = sys.argv[1].upper() upbit.set_loglevel(log_level) if log_level == ”: logging.error(“입력값 오류!”) sys.exit(-1) logging.info(“***** INPUT ******”) logging.info(“[1] 로그레벨(D:DEBUG, E:ERROR, 그외:INFO):” + str(log_level)) # ——————————————————————— # Logic Start! # ——————————————————————— # 웹소켓 시작 asyncio.run(main()) except KeyboardInterrupt: logging.error(“KeyboardInterrupt Exception 발생!”) logging.error(traceback.format_exc()) sys.exit(-100) except Exception: logging.error(“Exception 발생!”) logging.error(traceback.format_exc()) sys.exit(-200)

import asyncio import websockets

대표적으로 asyncio 비동기 호출 모듈과 websockets 모듈이 필요합니다. 모듈 설치 방법은 아래 포스팅을 참고하시면 됩니다.

2022.01.13 – [코딩스토리/리눅스] – 리눅스 서버에 파이썬 3.9 설치하기

# —————————————————————————– # – Name : get_subscribe_items # – Desc : 구독 대상 종목 조회 # —————————————————————————– def get_subscribe_items(): try: subscribe_items = [] # KRW 마켓 전 종목 추출 items = upbit.get_items(‘KRW’, ”) # 종목코드 배열로 변환 for item in items: subscribe_items.append(item[‘market’]) return subscribe_items # ————————————— # Exception 처리 # —————————————- except Exception: raise

구독 대상 즉, 어떤 종목들에 대해서 데이터를 받을지를 결정합니다. 위의 예제에서는 업비트에서 거래되는 KRW 마켓 종목 모두를 가져옵니다. 원하는 종목만 설정하고 싶으면 이 부분을 수정하면 됩니다.

# —————————————————————————– # – Name : upbit_ws_client # – Desc : 업비트 웹소켓 # —————————————————————————– async def upbit_ws_client(): try: # 중복 실행 방지용 seconds = 0 # 구독 데이터 조회 subscribe_items = get_subscribe_items() logging.info(‘구독 종목 개수 : ‘ + str(len(subscribe_items))) logging.info(‘구독 종목 : ‘ + str(subscribe_items)) # 구독 데이터 조립 subscribe_fmt = [ {“ticket”: “test-websocket”}, { “type”: “ticker”, “codes”: subscribe_items, “isOnlyRealtime”: True }, {“format”: “SIMPLE”} ] subscribe_data = json.dumps(subscribe_fmt) async with websockets.connect(upbit.ws_url) as websocket: await websocket.send(subscribe_data) while True: period = datetime.datetime.now() data = await websocket.recv() data = json.loads(data) logging.info(data) # 5초마다 종목 정보 재 조회 후 추가된 종목이 있으면 웹소켓 다시 시작 if (period.second % 5) == 0 and seconds != period.second: # 중복 실행 방지 seconds = period.second # 종목 재조회 re_subscribe_items = get_subscribe_items() logging.info(‘

‘) logging.info(‘*************************************************’) logging.info(‘기존 종목[‘ + str(len(subscribe_items)) + ‘] : ‘ + str(subscribe_items)) logging.info(‘종목 재조회[‘ + str(len(re_subscribe_items)) + ‘] : ‘ + str(re_subscribe_items)) logging.info(‘*************************************************’) logging.info(‘

‘) # 현재 종목과 다르면 웹소켓 다시 시작 if subscribe_items != re_subscribe_items: logging.info(‘종목 달리짐! 웹소켓 다시 시작’) await websocket.close() time.sleep(1) await upbit_ws_client() # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception as e: logging.error(‘Exception Raised!’) logging.error(e) logging.error(‘Connect Again!’) # 웹소켓 다시 시작 await upbit_ws_client()

구독 데이터를 조회하여 웹소켓에 접속하고 데이터를 수신합니다. 한번 웹소켓에 구독을 시작하면 계속해서 실시간으로 데이터를 받게 되는데 그렇게 되면 신규로 상장하는 종목등이 생기면 해당 데이터는 받을 수 없습니다.

위의 로직에서는 5초마다 업비트 종목을 다시 조회하여 현재 구독하고 있는 종목과 변동이 있으면 웹소켓을 다시 시작하여 신규 종목이 추가된 새로운 종목들로 구독하도록 구현하였습니다. (테스트가 많이 안 되어서 오류가 발생할 수도 있습니다. 댓글로 알려주세요.)

공통코드(upbit.py)

반응형

import time import logging import requests import jwt import uuid import hashlib import math import os import pandas as pd import numpy from urllib.parse import urlencode from decimal import Decimal from datetime import datetime # Keys access_key = ‘업비트에서 발급받은 Access Key’ secret_key = ‘업비트에서 발급받은 Secret Key’ server_url = ‘https://api.upbit.com’ ws_url = ‘wss://api.upbit.com/websocket/v1’ line_target_url = ‘https://notify-api.line.me/api/notify’ line_token = ‘라인 메신저에서 발급받은 Token’ # 상수 설정 min_order_amt = 5000 # —————————————————————————– # – Name : set_loglevel # – Desc : 로그레벨 설정 # – Input # 1) level : 로그레벨 # 1. D(d) : DEBUG # 2. E(e) : ERROR # 3. 그외(기본) : INFO # – Output # —————————————————————————– def set_loglevel(level): try: # ——————————————————————— # 로그레벨 : DEBUG # ——————————————————————— if level.upper() == “D”: logging.basicConfig( format='[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d]:%(message)s’, datefmt=’%Y/%m/%d %I:%M:%S %p’, level=logging.DEBUG ) # ——————————————————————— # 로그레벨 : ERROR # ——————————————————————— elif level.upper() == “E”: logging.basicConfig( format='[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d]:%(message)s’, datefmt=’%Y/%m/%d %I:%M:%S %p’, level=logging.ERROR ) # ——————————————————————— # 로그레벨 : INFO # ——————————————————————— else: # —————————————————————————– # 로깅 설정 # 로그레벨(DEBUG, INFO, WARNING, ERROR, CRITICAL) # —————————————————————————– logging.basicConfig( format='[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d]:%(message)s’, datefmt=’%Y/%m/%d %I:%M:%S %p’, level=logging.INFO ) # —————————————- # Exception Raise # —————————————- except Exception: raise # —————————————————————————– # – Name : send_request # – Desc : 리퀘스트 처리 # – Input # 1) reqType : 요청 타입 # 2) reqUrl : 요청 URL # 3) reqParam : 요청 파라메타 # 4) reqHeader : 요청 헤더 # – Output # 4) reponse : 응답 데이터 # —————————————————————————– def send_request(reqType, reqUrl, reqParam, reqHeader): try: # 요청 가능회수 확보를 위해 기다리는 시간(초) err_sleep_time = 0.3 # 요청에 대한 응답을 받을 때까지 반복 수행 while True: # 요청 처리 response = requests.request(reqType, reqUrl, params=reqParam, headers=reqHeader) # 요청 가능회수 추출 if ‘Remaining-Req’ in response.headers: hearder_info = response.headers[‘Remaining-Req’] start_idx = hearder_info.find(“sec=”) end_idx = len(hearder_info) remain_sec = hearder_info[int(start_idx):int(end_idx)].replace(‘sec=’, ”) else: logging.error(“헤더 정보 이상”) logging.error(response.headers) break # 요청 가능회수가 3개 미만이면 요청 가능회수 확보를 위해 일정시간 대기 if int(remain_sec) < 3: logging.debug("요청 가능회수 한도 도달! 남은횟수:" + str(remain_sec)) time.sleep(err_sleep_time) # 정상 응답 if response.status_code == 200 or response.status_code == 201: break # 요청 가능회수 초과인 경우 elif response.status_code == 429: logging.error("요청 가능회수 초과!:" + str(response.status_code)) time.sleep(err_sleep_time) # 그 외 오류 else: logging.error("기타 에러:" + str(response.status_code)) logging.error(response.status_code) logging.error(response) break # 요청 가능회수 초과 에러 발생시에는 다시 요청 logging.info("[restRequest] 요청 재처리중...") return response # ---------------------------------------- # Exception Raise # ---------------------------------------- except Exception: raise # ----------------------------------------------------------------------------- # - Name : get_items # - Desc : 전체 종목 리스트 조회 # - Input # 1) market : 대상 마켓(콤마 구분자:KRW,BTC,USDT) # 2) except_item : 제외 종목(콤마 구분자:BTC,ETH) # - Output # 1) 전체 리스트 : 리스트 # ----------------------------------------------------------------------------- def get_items(market, except_item): try: # 조회결과 리턴용 rtn_list = [] # 마켓 데이터 markets = market.split(',') # 제외 데이터 except_items = except_item.split(',') url = "https://api.upbit.com/v1/market/all" querystring = {"isDetails": "false"} response = send_request("GET", url, querystring, "") data = response.json() # 조회 마켓만 추출 for data_for in data: for market_for in markets: if data_for['market'].split('-')[0] == market_for: rtn_list.append(data_for) # 제외 종목 제거 for rtnlist_for in rtn_list[:]: for exceptItemFor in except_items: for marketFor in markets: if rtnlist_for['market'] == marketFor + '-' + exceptItemFor: rtn_list.remove(rtnlist_for) return rtn_list # ---------------------------------------- # Exception Raise # ---------------------------------------- except Exception: raise # ----------------------------------------------------------------------------- # - Name : buycoin_mp # - Desc : 시장가 매수 # - Input # 1) target_item : 대상종목 # 2) buy_amount : 매수금액 # - Output # 1) rtn_data : 매수결과 # ----------------------------------------------------------------------------- def buycoin_mp(target_item, buy_amount): try: query = { 'market': target_item, 'side': 'bid', 'price': buy_amount, 'ord_type': 'price', } query_string = urlencode(query).encode() m = hashlib.sha512() m.update(query_string) query_hash = m.hexdigest() payload = { 'access_key': access_key, 'nonce': str(uuid.uuid4()), 'query_hash': query_hash, 'query_hash_alg': 'SHA512', } jwt_token = jwt.encode(payload, secret_key).decode('utf8') authorize_token = 'Bearer {}'.format(jwt_token) headers = {"Authorization": authorize_token} res = send_request("POST", server_url + "/v1/orders", query, headers) rtn_data = res.json() logging.info("") logging.info("----------------------------------------------") logging.info("시장가 매수 요청 완료! 결과:") logging.info(rtn_data) logging.info("----------------------------------------------") return rtn_data # ---------------------------------------- # Exception Raise # ---------------------------------------- except Exception: raise # ----------------------------------------------------------------------------- # - Name : buycoin_tg # - Desc : 지정가 매수 # - Input # 1) target_item : 대상종목 # 2) buy_amount : 매수금액 # 3) buy_price : 매수가격 # - Output # 1) rtn_data : 매수요청결과 # ----------------------------------------------------------------------------- def buycoin_tg(target_item, buy_amount, buy_price): try: # 매수수량 계산 vol = Decimal(str(buy_amount)) / Decimal(str(buy_price)) query = { 'market': target_item, 'side': 'bid', 'volume': vol, 'price': buy_price, 'ord_type': 'limit', } query_string = urlencode(query).encode() m = hashlib.sha512() m.update(query_string) query_hash = m.hexdigest() payload = { 'access_key': access_key, 'nonce': str(uuid.uuid4()), 'query_hash': query_hash, 'query_hash_alg': 'SHA512', } jwt_token = jwt.encode(payload, secret_key).decode('utf8') authorize_token = 'Bearer {}'.format(jwt_token) headers = {"Authorization": authorize_token} res = send_request("POST", server_url + "/v1/orders", query, headers) rtn_data = res.json() logging.info("") logging.info("----------------------------------------------") logging.info("지정가 매수요청 완료!") logging.info(rtn_data) logging.info("----------------------------------------------") return rtn_data # ---------------------------------------- # Exception Raise # ---------------------------------------- except Exception: raise # ----------------------------------------------------------------------------- # - Name : sellcoin_mp # - Desc : 시장가 매도 # - Input # 1) target_item : 대상종목 # 2) cancel_yn : 기존 주문 취소 여부 # - Output # 1) rtn_data : 매도결과 # ----------------------------------------------------------------------------- # 시장가 매도 def sellcoin_mp(target_item, cancel_yn): try: if cancel_yn == 'Y': # 기존 주문이 있으면 취소 cancel_order(target_item, "SELL") # 잔고 조회 cur_balance = get_balance(target_item) query = { 'market': target_item, 'side': 'ask', 'volume': cur_balance, 'ord_type': 'market', } query_string = urlencode(query).encode() m = hashlib.sha512() m.update(query_string) query_hash = m.hexdigest() payload = { 'access_key': access_key, 'nonce': str(uuid.uuid4()), 'query_hash': query_hash, 'query_hash_alg': 'SHA512', } jwt_token = jwt.encode(payload, secret_key).decode('utf8') authorize_token = 'Bearer {}'.format(jwt_token) headers = {"Authorization": authorize_token} res = send_request("POST", server_url + "/v1/orders", query, headers) rtn_data = res.json() logging.info("") logging.info("----------------------------------------------") logging.info("시장가 매도 요청 완료! 결과:") logging.info(rtn_data) logging.info("----------------------------------------------") return rtn_data # ---------------------------------------- # Exception Raise # ---------------------------------------- except Exception: raise # ----------------------------------------------------------------------------- # - Name : sellcoin_tg # - Desc : 지정가 매도 # - Input # 1) target_item : 대상종목 # 2) sell_price : 매도희망금액 # - Output # 1) rtn_data : 매도결과 # ----------------------------------------------------------------------------- def sellcoin_tg(target_item, sell_price): try: # 잔고 조회 cur_balance = get_balance(target_item) query = { 'market': target_item, 'side': 'ask', 'volume': cur_balance, 'price': sell_price, 'ord_type': 'limit', } query_string = urlencode(query).encode() m = hashlib.sha512() m.update(query_string) query_hash = m.hexdigest() payload = { 'access_key': access_key, 'nonce': str(uuid.uuid4()), 'query_hash': query_hash, 'query_hash_alg': 'SHA512', } jwt_token = jwt.encode(payload, secret_key).decode('utf8') authorize_token = 'Bearer {}'.format(jwt_token) headers = {"Authorization": authorize_token} res = send_request("POST", server_url + "/v1/orders", query, headers) rtn_data = res.json() logging.info("") logging.info("----------------------------------------------") logging.info("지정가 매도 설정 완료!") logging.info(rtn_data) logging.info("----------------------------------------------") return rtn_data # ---------------------------------------- # Exception Raise # ---------------------------------------- except Exception: raise # ----------------------------------------------------------------------------- # - Name : get_balance # - Desc : 주문가능 잔고 조회 # - Input # 1) target_item : 대상 종목 # - Output # 2) rtn_balance : 주문가능 잔고 # ----------------------------------------------------------------------------- def get_balance(target_item): try: # 주문가능 잔고 리턴용 rtn_balance = 0 # 최대 재시도 횟수 max_cnt = 0 # 잔고가 조회 될 때까지 반복 while True: # 조회 회수 증가 max_cnt = max_cnt + 1 payload = { 'access_key': access_key, 'nonce': str(uuid.uuid4()), } jwt_token = jwt.encode(payload, secret_key).decode('utf8') authorize_token = 'Bearer {}'.format(jwt_token) headers = {"Authorization": authorize_token} res = send_request("GET", server_url + "/v1/accounts", "", headers) my_asset = res.json() # 해당 종목에 대한 잔고 조회 # 잔고는 마켓에 상관없이 전체 잔고가 조회됨 for myasset_for in my_asset: if myasset_for['currency'] == target_item.split('-')[1]: rtn_balance = myasset_for['balance'] # 잔고가 0 이상일때까지 반복 if Decimal(str(rtn_balance)) > Decimal(str(0)): break # 최대 100회 수행 if max_cnt > 100: break logging.info(“[주문가능 잔고 리턴용] 요청 재처리중…”) return rtn_balance # —————————————- # Exception Raise # —————————————- except Exception: raise # —————————————————————————– # – Name : get_candle # – Desc : 캔들 조회 # – Input # 1) target_item : 대상 종목 # 2) tick_kind : 캔들 종류 (1, 3, 5, 10, 15, 30, 60, 240 – 분, D-일, W-주, M-월) # 3) inq_range : 조회 범위 # – Output # 1) 캔들 정보 배열 # —————————————————————————– def get_candle(target_item, tick_kind, inq_range): try: # —————————————- # Tick 별 호출 URL 설정 # —————————————- # 분붕 if tick_kind == “1” or tick_kind == “3” or tick_kind == “5” or tick_kind == “10” or tick_kind == “15” or tick_kind == “30” or tick_kind == “60” or tick_kind == “240”: target_url = “minutes/” + tick_kind # 일봉 elif tick_kind == “D”: target_url = “days” # 주봉 elif tick_kind == “W”: target_url = “weeks” # 월봉 elif tick_kind == “M”: target_url = “months” # 잘못된 입력 else: raise Exception(“잘못된 틱 종류:” + str(tick_kind)) logging.debug(target_url) # —————————————- # Tick 조회 # —————————————- querystring = {“market”: target_item, “count”: inq_range} res = send_request(“GET”, server_url + “/v1/candles/” + target_url, querystring, “”) candle_data = res.json() logging.debug(candle_data) return candle_data # —————————————- # Exception Raise # —————————————- except Exception: raise # —————————————————————————– # – Name : get_targetprice # – Desc : 호가단위 금액 계산 # – Input # 1) cal_type : H:호가로, R:비율로 # 2) st_price : 기준가격 # 3) chg_val : 변화단위 # – Output # 1) rtn_price : 계산된 금액 # —————————————————————————– def get_targetprice(cal_type, st_price, chg_val): try: # 계산된 가격 rtn_price = st_price # 호가단위로 계산 if cal_type.upper() == “H”: for i in range(0, abs(int(chg_val))): hoga_val = get_hoga(rtn_price) if Decimal(str(chg_val)) > 0: rtn_price = Decimal(str(rtn_price)) + Decimal(str(hoga_val)) elif Decimal(str(chg_val)) < 0: rtn_price = Decimal(str(rtn_price)) - Decimal(str(hoga_val)) else: break # 비율로 계산 elif cal_type.upper() == "R": while True: # 호가단위 추출 hoga_val = get_hoga(st_price) if Decimal(str(chg_val)) > 0: rtn_price = Decimal(str(rtn_price)) + Decimal(str(hoga_val)) elif Decimal(str(chg_val)) < 0: rtn_price = Decimal(str(rtn_price)) - Decimal(str(hoga_val)) else: break if Decimal(str(chg_val)) > 0: if Decimal(str(rtn_price)) >= Decimal(str(st_price)) * ( Decimal(str(1)) + (Decimal(str(chg_val))) / Decimal(str(100))): break elif Decimal(str(chg_val)) < 0: if Decimal(str(rtn_price)) <= Decimal(str(st_price)) * ( Decimal(str(1)) + (Decimal(str(chg_val))) / Decimal(str(100))): break return rtn_price # ---------------------------------------- # Exception Raise # ---------------------------------------- except Exception: raise # ----------------------------------------------------------------------------- # - Name : get_hoga # - Desc : 호가 금액 계산 # - Input # 1) cur_price : 현재가격 # - Output # 1) hoga_val : 호가단위 # ----------------------------------------------------------------------------- def get_hoga(cur_price): try: # 호가 단위 if Decimal(str(cur_price)) < 10: hoga_val = 0.01 elif Decimal(str(cur_price)) < 100: hoga_val = 0.1 elif Decimal(str(cur_price)) < 1000: hoga_val = 1 elif Decimal(str(cur_price)) < 10000: hoga_val = 5 elif Decimal(str(cur_price)) < 100000: hoga_val = 10 elif Decimal(str(cur_price)) < 500000: hoga_val = 50 elif Decimal(str(cur_price)) < 1000000: hoga_val = 100 elif Decimal(str(cur_price)) < 2000000: hoga_val = 500 else: hoga_val = 1000 return hoga_val # ---------------------------------------- # Exception Raise # ---------------------------------------- except Exception: raise # ----------------------------------------------------------------------------- # - Name : get_krwbal # - Desc : KRW 잔고 조회 # - Input # - Output # 1) KRW 잔고 Dictionary # 1. krw_balance : KRW 잔고 # 2. fee : 수수료 # 3. available_krw : 매수가능 KRW잔고(수수료를 고려한 금액) # ----------------------------------------------------------------------------- def get_krwbal(): try: # 잔고 리턴용 rtn_balance = {} # 수수료 0.05%(업비트 기준) fee_rate = 0.05 payload = { 'access_key': access_key, 'nonce': str(uuid.uuid4()), } jwt_token = jwt.encode(payload, secret_key).decode('utf8') authorize_token = 'Bearer {}'.format(jwt_token) headers = {"Authorization": authorize_token} res = send_request("GET", server_url + "/v1/accounts", "", headers) data = res.json() logging.debug(data) for dataFor in data: if (dataFor['currency']) == "KRW": krw_balance = math.floor(Decimal(str(dataFor['balance']))) # 잔고가 있는 경우만 if Decimal(str(krw_balance)) > Decimal(str(0)): # 수수료 fee = math.ceil(Decimal(str(krw_balance)) * (Decimal(str(fee_rate)) / Decimal(str(100)))) # 매수가능금액 available_krw = math.floor(Decimal(str(krw_balance)) – Decimal(str(fee))) else: # 수수료 fee = 0 # 매수가능금액 available_krw = 0 # 결과 조립 rtn_balance[‘krw_balance’] = krw_balance rtn_balance[‘fee’] = fee rtn_balance[‘available_krw’] = available_krw return rtn_balance # —————————————- # Exception Raise # —————————————- except Exception: raise # —————————————————————————– # – Name : get_accounts # – Desc : 잔고정보 조회 # – Input # 1) except_yn : KRW 및 소액 제외 # 2) market_code : 마켓코드 추가(매도시 필요) # – Output # 1) 잔고 정보 # —————————————————————————– # 계좌 조회 def get_accounts(except_yn, market_code): try: rtn_data = [] # 해당 마켓에 존재하는 종목 리스트만 추출 market_item_list = get_items(market_code, ”) # 소액 제외 기준 min_price = 5000 payload = { ‘access_key’: access_key, ‘nonce’: str(uuid.uuid4()), } jwt_token = jwt.encode(payload, secret_key) authorize_token = ‘Bearer {}’.format(jwt_token) headers = {“Authorization”: authorize_token} res = send_request(“GET”, server_url + “/v1/accounts”, “”, headers) account_data = res.json() for account_data_for in account_data: for market_item_list_for in market_item_list: # 해당 마켓에 있는 종목만 조합 if market_code + ‘-‘ + account_data_for[‘currency’] == market_item_list_for[‘market’]: # KRW 및 소액 제외 if except_yn == “Y” or except_yn == “y”: if account_data_for[‘currency’] != “KRW” and Decimal(str(account_data_for[‘avg_buy_price’])) * ( Decimal(str(account_data_for[‘balance’])) + Decimal( str(account_data_for[‘locked’]))) >= Decimal(str(min_price)): rtn_data.append( {‘market’: market_code + ‘-‘ + account_data_for[‘currency’], ‘balance’: account_data_for[‘balance’], ‘locked’: account_data_for[‘locked’], ‘avg_buy_price’: account_data_for[‘avg_buy_price’], ‘avg_buy_price_modified’: account_data_for[‘avg_buy_price_modified’]}) else: if account_data_for[‘currency’] != “KRW”: rtn_data.append( {‘market’: market_code + ‘-‘ + account_data_for[‘currency’], ‘balance’: account_data_for[‘balance’], ‘locked’: account_data_for[‘locked’], ‘avg_buy_price’: account_data_for[‘avg_buy_price’], ‘avg_buy_price_modified’: account_data_for[‘avg_buy_price_modified’]}) return rtn_data # —————————————- # Exception Raise # —————————————- except Exception: raise # —————————————————————————– # – Name : chg_account_to_comma # – Desc : 잔고 종목 리스트를 콤마리스트로 변경 # – Input # 1) account_data : 잔고 데이터 # – Output # 1) 종목 리스트(콤마 구분자) # —————————————————————————– def chg_account_to_comma(account_data): try: rtn_data = “” for account_data_for in account_data: if rtn_data == ”: rtn_data = rtn_data + account_data_for[‘market’] else: rtn_data = rtn_data + ‘,’ + account_data_for[‘market’] return rtn_data # —————————————- # Exception Raise # —————————————- except Exception: raise # —————————————————————————– # – Name : get_ticker # – Desc : 현재가 조회 # – Input # 1) target_itemlist : 대상 종목(콤마 구분자) # – Output # 2) 현재가 데이터 # —————————————————————————– def get_ticker(target_itemlist): try: url = “https://api.upbit.com/v1/ticker” querystring = {“markets”: target_itemlist} response = send_request(“GET”, url, querystring, “”) rtn_data = response.json() return rtn_data # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : cancel_order # – Desc : 미체결 주문 취소 # – Input # 1) target_item : 대상종목 # 2) side : 매수/매도 구분(BUY/bid:매수, SELL/ask:매도, ALL:전체) # – Output # —————————————————————————– def cancel_order(target_item, side): try: # 미체결 주문 조회 order_data = get_order(target_item) # 매수/매도 구분 for order_data_for in order_data: if side == “BUY” or side == “buy”: if order_data_for[‘side’] == “ask”: order_data.remove(order_data_for) elif side == “SELL” or side == “sell”: if order_data_for[‘side’] == “bid”: order_data.remove(order_data_for) # 미체결 주문이 있으면 if len(order_data) > 0: # 미체결 주문내역 전체 취소 for order_data_for in order_data: cancel_order_uuid(order_data_for[‘uuid’]) # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : cancel_order_uuid # – Desc : 미체결 주문 취소 by UUID # – Input # 1) order_uuid : 주문 키 # – Output # 1) 주문 내역 취소 # —————————————————————————– def cancel_order_uuid(order_uuid): try: query = { ‘uuid’: order_uuid, } query_string = urlencode(query).encode() m = hashlib.sha512() m.update(query_string) query_hash = m.hexdigest() payload = { ‘access_key’: access_key, ‘nonce’: str(uuid.uuid4()), ‘query_hash’: query_hash, ‘query_hash_alg’: ‘SHA512’, } jwt_token = jwt.encode(payload, secret_key).decode(‘utf8’) authorize_token = ‘Bearer {}’.format(jwt_token) headers = {“Authorization”: authorize_token} res = send_request(“DELETE”, server_url + “/v1/order”, query, headers) rtn_data = res.json() return rtn_data # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_order # – Desc : 미체결 주문 조회 # – Input # 1) target_item : 대상종목 # – Output # 1) 미체결 주문 내역 # —————————————————————————– def get_order(target_item): try: query = { ‘market’: target_item, ‘state’: ‘wait’, } query_string = urlencode(query).encode() m = hashlib.sha512() m.update(query_string) query_hash = m.hexdigest() payload = { ‘access_key’: access_key, ‘nonce’: str(uuid.uuid4()), ‘query_hash’: query_hash, ‘query_hash_alg’: ‘SHA512’, } jwt_token = jwt.encode(payload, secret_key).decode(‘utf8’) authorize_token = ‘Bearer {}’.format(jwt_token) headers = {“Authorization”: authorize_token} res = send_request(“GET”, server_url + “/v1/orders”, query, headers) rtn_data = res.json() return rtn_data # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_order # – Desc : 미체결 주문 조회 # – Input # 1) side : 주문상태 # – Output # 1) 주문 내역 리스트 # —————————————————————————– def get_order_list(side): try: query = { ‘state’: side, } query_string = urlencode(query).encode() m = hashlib.sha512() m.update(query_string) query_hash = m.hexdigest() payload = { ‘access_key’: access_key, ‘nonce’: str(uuid.uuid4()), ‘query_hash’: query_hash, ‘query_hash_alg’: ‘SHA512’, } jwt_token = jwt.encode(payload, secret_key).decode(‘utf8’) authorize_token = ‘Bearer {}’.format(jwt_token) headers = {“Authorization”: authorize_token} res = send_request(“GET”, server_url + “/v1/orders”, query, headers) rtn_data = res.json() return rtn_data # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_rsi # – Desc : RSI 조회 # – Input # 1) target_item : 대상 종목 # 2) tick_kind : 캔들 종류 (1, 3, 5, 10, 15, 30, 60, 240 – 분, D-일, W-주, M-월) # 3) inq_range : 조회 범위 # – Output # 1) RSI 값 # —————————————————————————– def get_rsi(target_item, tick_kind, inq_range): try: # 캔들 추출 candle_data = get_candle(target_item, tick_kind, inq_range) df = pd.DataFrame(candle_data) df = df.reindex(index=df.index[::-1]).reset_index() df[‘close’] = df[“trade_price”] # RSI 계산 def rsi(ohlc: pd.DataFrame, period: int = 14): ohlc[“close”] = ohlc[“close”] delta = ohlc[“close”].diff() up, down = delta.copy(), delta.copy() up[up < 0] = 0 down[down > 0] = 0 _gain = up.ewm(com=(period – 1), min_periods=period).mean() _loss = down.abs().ewm(com=(period – 1), min_periods=period).mean() RS = _gain / _loss return pd.Series(100 – (100 / (1 + RS)), name=”RSI”) rsi = round(rsi(df, 14).iloc[-1], 4) return rsi # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_mfi # – Desc : MFI 조회 # – Input # 1) target_item : 대상 종목 # 2) tick_kind : 캔들 종류 (1, 3, 5, 10, 15, 30, 60, 240 – 분, D-일, W-주, M-월) # 3) inq_range : 캔들 조회 범위 # 4) loop_cnt : 지표 반복계산 횟수 # – Output # 1) MFI 값 # —————————————————————————– def get_mfi(target_item, tick_kind, inq_range, loop_cnt): try: # 캔들 데이터 조회용 candle_datas = [] # MFI 데이터 리턴용 mfi_list = [] # 캔들 추출 candle_data = get_candle(target_item, tick_kind, inq_range) # 조회 횟수별 candle 데이터 조합 for i in range(0, int(loop_cnt)): candle_datas.append(candle_data[i:int(len(candle_data))]) # 캔들 데이터만큼 수행 for candle_data_for in candle_datas: df = pd.DataFrame(candle_data_for) dfDt = df[‘candle_date_time_kst’].iloc[::-1] df[‘typical_price’] = (df[‘trade_price’] + df[‘high_price’] + df[‘low_price’]) / 3 df[‘money_flow’] = df[‘typical_price’] * df[‘candle_acc_trade_volume’] positive_mf = 0 negative_mf = 0 for i in range(0, 14): if df[“typical_price”][i] > df[“typical_price”][i + 1]: positive_mf = positive_mf + df[“money_flow”][i] elif df[“typical_price”][i] < df["typical_price"][i + 1]: negative_mf = negative_mf + df["money_flow"][i] if negative_mf > 0: mfi = 100 – (100 / (1 + (positive_mf / negative_mf))) else: mfi = 100 – (100 / (1 + (positive_mf))) mfi_list.append({“type”: “MFI”, “DT”: dfDt[0], “MFI”: round(mfi, 4)}) return mfi_list # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_macd # – Desc : MACD 조회 # – Input # 1) target_item : 대상 종목 # 2) tick_kind : 캔들 종류 (1, 3, 5, 10, 15, 30, 60, 240 – 분, D-일, W-주, M-월) # 3) inq_range : 캔들 조회 범위 # 4) loop_cnt : 지표 반복계산 횟수 # – Output # 1) MACD 값 # —————————————————————————– def get_macd(target_item, tick_kind, inq_range, loop_cnt): try: # 캔들 데이터 조회용 candle_datas = [] # MACD 데이터 리턴용 macd_list = [] # 캔들 추출 candle_data = get_candle(target_item, tick_kind, inq_range) # 조회 횟수별 candle 데이터 조합 for i in range(0, int(loop_cnt)): candle_datas.append(candle_data[i:int(len(candle_data))]) df = pd.DataFrame(candle_datas[0]) df = df.iloc[::-1] df = df[‘trade_price’] # MACD 계산 exp1 = df.ewm(span=12, adjust=False).mean() exp2 = df.ewm(span=26, adjust=False).mean() macd = exp1 – exp2 exp3 = macd.ewm(span=9, adjust=False).mean() for i in range(0, int(loop_cnt)): macd_list.append( {“type”: “MACD”, “DT”: candle_datas[0][i][‘candle_date_time_kst’], “MACD”: round(macd[i], 4), “SIGNAL”: round(exp3[i], 4), “OCL”: round(macd[i] – exp3[i], 4)}) return macd_list # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_bb # – Desc : 볼린저밴드 조회 # – Input # 1) target_item : 대상 종목 # 2) tick_kind : 캔들 종류 (1, 3, 5, 10, 15, 30, 60, 240 – 분, D-일, W-주, M-월) # 3) inq_range : 캔들 조회 범위 # 4) loop_cnt : 지표 반복계산 횟수 # – Output # 1) 볼린저 밴드 값 # —————————————————————————– def get_bb(target_item, tick_kind, inq_range, loop_cnt): try: # 캔들 데이터 조회용 candle_datas = [] # 볼린저밴드 데이터 리턴용 bb_list = [] # 캔들 추출 candle_data = get_candle(target_item, tick_kind, inq_range) # 조회 횟수별 candle 데이터 조합 for i in range(0, int(loop_cnt)): candle_datas.append(candle_data[i:int(len(candle_data))]) # 캔들 데이터만큼 수행 for candle_data_for in candle_datas: df = pd.DataFrame(candle_data_for) dfDt = df[‘candle_date_time_kst’].iloc[::-1] df = df[‘trade_price’].iloc[::-1] # 표준편차(곱) unit = 2 band1 = unit * numpy.std(df[len(df) – 20:len(df)]) bb_center = numpy.mean(df[len(df) – 20:len(df)]) band_high = bb_center + band1 band_low = bb_center – band1 bb_list.append({“type”: “BB”, “DT”: dfDt[0], “BBH”: round(band_high, 4), “BBM”: round(bb_center, 4), “BBL”: round(band_low, 4)}) return bb_list # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_williams # – Desc : 윌리암스 %R 조회 # – Input # 1) target_item : 대상 종목 # 2) tick_kind : 캔들 종류 (1, 3, 5, 10, 15, 30, 60, 240 – 분, D-일, W-주, M-월) # 3) inq_range : 캔들 조회 범위 # 4) loop_cnt : 지표 반복계산 횟수 # – Output # 1) 윌리암스 %R 값 # —————————————————————————– def get_williamsR(target_item, tick_kind, inq_range, loop_cnt): try: # 캔들 데이터 조회용 candle_datas = [] # 윌리암스R 데이터 리턴용 williams_list = [] # 캔들 추출 candle_data = get_candle(target_item, tick_kind, inq_range) # 조회 횟수별 candle 데이터 조합 for i in range(0, int(loop_cnt)): candle_datas.append(candle_data[i:int(len(candle_data))]) # 캔들 데이터만큼 수행 for candle_data_for in candle_datas: df = pd.DataFrame(candle_data_for) dfDt = df[‘candle_date_time_kst’].iloc[::-1] df = df.iloc[:14] # 계산식 # %R = (Highest High – Close)/(Highest High – Lowest Low) * -100 hh = numpy.max(df[‘high_price’]) ll = numpy.min(df[‘low_price’]) cp = df[‘trade_price’][0] w = (hh – cp) / (hh – ll) * -100 williams_list.append( {“type”: “WILLIAMS”, “DT”: dfDt[0], “HH”: round(hh, 4), “LL”: round(ll, 4), “CP”: round(cp, 4), “W”: round(w, 4)}) return williams_list # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_rsi # – Desc : RSI 조회 # – Input # 1) candle_data : 캔들 정보 # – Output # 1) RSI 값 # —————————————————————————– def get_rsi(candle_datas): try: # RSI 데이터 리턴용 rsi_data = [] # 캔들 데이터만큼 수행 for candle_data_for in candle_datas: df = pd.DataFrame(candle_data_for) dfDt = df[‘candle_date_time_kst’].iloc[::-1] df = df.reindex(index=df.index[::-1]).reset_index() df[‘close’] = df[“trade_price”] # RSI 계산 def rsi(ohlc: pd.DataFrame, period: int = 14): ohlc[“close”] = ohlc[“close”] delta = ohlc[“close”].diff() up, down = delta.copy(), delta.copy() up[up < 0] = 0 down[down > 0] = 0 _gain = up.ewm(com=(period – 1), min_periods=period).mean() _loss = down.abs().ewm(com=(period – 1), min_periods=period).mean() RS = _gain / _loss return pd.Series(100 – (100 / (1 + RS)), name=”RSI”) rsi = round(rsi(df, 14).iloc[-1], 4) rsi_data.append({“type”: “RSI”, “DT”: dfDt[0], “RSI”: rsi}) return rsi_data # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_mfi # – Desc : MFI 조회 # – Input # 1) candle_datas : 캔들 정보 # – Output # 1) MFI 값 # —————————————————————————– def get_mfi(candle_datas): try: # MFI 데이터 리턴용 mfi_list = [] # 캔들 데이터만큼 수행 for candle_data_for in candle_datas: df = pd.DataFrame(candle_data_for) dfDt = df[‘candle_date_time_kst’].iloc[::-1] df[‘typical_price’] = (df[‘trade_price’] + df[‘high_price’] + df[‘low_price’]) / 3 df[‘money_flow’] = df[‘typical_price’] * df[‘candle_acc_trade_volume’] positive_mf = 0 negative_mf = 0 for i in range(0, 14): if df[“typical_price”][i] > df[“typical_price”][i + 1]: positive_mf = positive_mf + df[“money_flow”][i] elif df[“typical_price”][i] < df["typical_price"][i + 1]: negative_mf = negative_mf + df["money_flow"][i] if negative_mf > 0: mfi = 100 – (100 / (1 + (positive_mf / negative_mf))) else: mfi = 100 – (100 / (1 + (positive_mf))) mfi_list.append({“type”: “MFI”, “DT”: dfDt[0], “MFI”: round(mfi, 4)}) return mfi_list # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_macd # – Desc : MACD 조회 # – Input # 1) candle_datas : 캔들 정보 # 2) loop_cnt : 반복 횟수 # – Output # 1) MACD 값 # —————————————————————————– def get_macd(candle_datas, loop_cnt): try: # MACD 데이터 리턴용 macd_list = [] df = pd.DataFrame(candle_datas[0]) df = df.iloc[::-1] df = df[‘trade_price’] # MACD 계산 exp1 = df.ewm(span=12, adjust=False).mean() exp2 = df.ewm(span=26, adjust=False).mean() macd = exp1 – exp2 exp3 = macd.ewm(span=9, adjust=False).mean() for i in range(0, int(loop_cnt)): macd_list.append( {“type”: “MACD”, “DT”: candle_datas[0][i][‘candle_date_time_kst’], “MACD”: round(macd[i], 4), “SIGNAL”: round(exp3[i], 4), “OCL”: round(macd[i] – exp3[i], 4)}) return macd_list # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_ma # – Desc : MA 조회 # – Input # 1) candle_datas : 캔들 정보 # 2) loop_cnt : 반복 횟수 # – Output # 1) MA 값 # —————————————————————————– def get_ma(candle_datas, loop_cnt): try: # MA 데이터 리턴용 ma_list = [] df = pd.DataFrame(candle_datas[0]) df = df.iloc[::-1] df = df[‘trade_price’] # MA 계산 ma5 = df.rolling(window=5).mean() ma10 = df.rolling(window=10).mean() ma20 = df.rolling(window=20).mean() ma60 = df.rolling(window=60).mean() ma120 = df.rolling(window=120).mean() for i in range(0, int(loop_cnt)): ma_list.append( {“type”: “MA”, “DT”: candle_datas[0][i][‘candle_date_time_kst’], “MA5”: ma5[i], “MA10”: ma10[i], “MA20”: ma20[i], “MA60”: ma60[i], “MA120”: ma120[i] , “MA_5_10”: str(Decimal(str(ma5[i])) – Decimal(str(ma10[i]))) , “MA_10_20”: str(Decimal(str(ma10[i])) – Decimal(str(ma20[i]))) , “MA_20_60”: str(Decimal(str(ma20[i])) – Decimal(str(ma60[i]))) , “MA_60_120”: str(Decimal(str(ma60[i])) – Decimal(str(ma120[i])))}) return ma_list # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_bb # – Desc : 볼린저밴드 조회 # – Input # 1) candle_datas : 캔들 정보 # – Output # 1) 볼린저 밴드 값 # —————————————————————————– def get_bb(candle_datas): try: # 볼린저밴드 데이터 리턴용 bb_list = [] # 캔들 데이터만큼 수행 for candle_data_for in candle_datas: df = pd.DataFrame(candle_data_for) dfDt = df[‘candle_date_time_kst’].iloc[::-1] df = df[‘trade_price’].iloc[::-1] # 표준편차(곱) unit = 2 band1 = unit * numpy.std(df[len(df) – 20:len(df)]) bb_center = numpy.mean(df[len(df) – 20:len(df)]) band_high = bb_center + band1 band_low = bb_center – band1 bb_list.append({“type”: “BB”, “DT”: dfDt[0], “BBH”: round(band_high, 4), “BBM”: round(bb_center, 4), “BBL”: round(band_low, 4)}) return bb_list # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_williams # – Desc : 윌리암스 %R 조회 # – Input # 1) candle_datas : 캔들 정보 # – Output # 1) 윌리암스 %R 값 # —————————————————————————– def get_williams(candle_datas): try: # 윌리암스R 데이터 리턴용 williams_list = [] # 캔들 데이터만큼 수행 for candle_data_for in candle_datas: df = pd.DataFrame(candle_data_for) dfDt = df[‘candle_date_time_kst’].iloc[::-1] df = df.iloc[:14] # 계산식 # %R = (Highest High – Close)/(Highest High – Lowest Low) * -100 hh = numpy.max(df[‘high_price’]) ll = numpy.min(df[‘low_price’]) cp = df[‘trade_price’][0] w = (hh – cp) / (hh – ll) * -100 williams_list.append( {“type”: “WILLIAMS”, “DT”: dfDt[0], “HH”: round(hh, 4), “LL”: round(ll, 4), “CP”: round(cp, 4), “W”: round(w, 4)}) return williams_list # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_cci # – Desc : CCI 조회 # – Input # 1) candle_data : 캔들 정보 # 2) loop_cnt : 조회 건수 # – Output # 1) CCI 값 # —————————————————————————– def get_cci(candle_data, loop_cnt): try: cci_val = 20 # CCI 데이터 리턴용 cci_list = [] # 사용하지 않는 캔들 갯수 정리(속도 개선) del candle_data[cci_val * 2:] # 오름차순 정렬 df = pd.DataFrame(candle_data) ordered_df = df.sort_values(by=[‘candle_date_time_kst’], ascending=[True]) # 계산식 : (Typical Price – Simple Moving Average) / (0.015 * Mean absolute Deviation) ordered_df[‘TP’] = (ordered_df[‘high_price’] + ordered_df[‘low_price’] + ordered_df[‘trade_price’]) / 3 ordered_df[‘SMA’] = ordered_df[‘TP’].rolling(window=cci_val).mean() ordered_df[‘MAD’] = ordered_df[‘TP’].rolling(window=cci_val).apply(lambda x: pd.Series(x).mad()) ordered_df[‘CCI’] = (ordered_df[‘TP’] – ordered_df[‘SMA’]) / (0.015 * ordered_df[‘MAD’]) # 개수만큼 조립 for i in range(0, loop_cnt): cci_list.append({“type”: “CCI”, “DT”: ordered_df[‘candle_date_time_kst’].loc[i], “CCI”: round(ordered_df[‘CCI’].loc[i], 4)}) return cci_list # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_indicators # – Desc : 보조지표 조회 # – Input # 1) target_item : 대상 종목 # 2) tick_kind : 캔들 종류 (1, 3, 5, 10, 15, 30, 60, 240 – 분, D-일, W-주, M-월) # 3) inq_range : 캔들 조회 범위 # 4) loop_cnt : 지표 반복계산 횟수 # – Output # 1) RSI # 2) MFI # 3) MACD # 4) BB # 5) WILLIAMS %R # 6) CCI # —————————————————————————– def get_indicators(target_item, tick_kind, inq_range, loop_cnt): try: # 보조지표 리턴용 indicator_data = [] # 캔들 데이터 조회용 candle_datas = [] # 캔들 추출 candle_data = get_candle(target_item, tick_kind, inq_range) if len(candle_data) >= 30: # 조회 횟수별 candle 데이터 조합 for i in range(0, int(loop_cnt)): candle_datas.append(candle_data[i:int(len(candle_data))]) # RSI 정보 조회 rsi_data = get_rsi(candle_datas) # MFI 정보 조회 mfi_data = get_mfi(candle_datas) # MACD 정보 조회 macd_data = get_macd(candle_datas, loop_cnt) # BB 정보 조회 bb_data = get_bb(candle_datas) # WILLIAMS %R 조회 williams_data = get_williams(candle_datas) # MA 정보 조회 ma_data = get_ma(candle_datas, loop_cnt) # CCI 정보 조회 cci_data = get_cci(candle_data, loop_cnt) if len(rsi_data) > 0: indicator_data.append(rsi_data) if len(mfi_data) > 0: indicator_data.append(mfi_data) if len(macd_data) > 0: indicator_data.append(macd_data) if len(bb_data) > 0: indicator_data.append(bb_data) if len(williams_data) > 0: indicator_data.append(williams_data) if len(ma_data) > 0: indicator_data.append(ma_data) if len(cci_data) > 0: indicator_data.append(cci_data) return indicator_data # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_order_status # – Desc : 주문 조회(상태별) # – Input # 1) target_item : 대상종목 # 2) status : 주문상태(wait : 체결 대기, watch : 예약주문 대기, done : 전체 체결 완료, cancel : 주문 취소) # – Output # 1) 주문 내역 # —————————————————————————– def get_order_status(target_item, status): try: query = { ‘market’: target_item, ‘state’: status, } query_string = urlencode(query).encode() m = hashlib.sha512() m.update(query_string) query_hash = m.hexdigest() payload = { ‘access_key’: access_key, ‘nonce’: str(uuid.uuid4()), ‘query_hash’: query_hash, ‘query_hash_alg’: ‘SHA512’, } jwt_token = jwt.encode(payload, secret_key).decode(‘utf8’) authorize_token = ‘Bearer {}’.format(jwt_token) headers = {“Authorization”: authorize_token} res = send_request(“GET”, server_url + “/v1/orders”, query, headers) rtn_data = res.json() return rtn_data # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : orderby_dict # – Desc : 딕셔너리 정렬 # – Input # 1) target_dict : 정렬 대상 딕셔너리 # 2) target_column : 정렬 대상 딕셔너리 # 3) order_by : 정렬방식(False:오름차순, True,내림차순) # – Output # 1) 정렬된 딕서너리 # —————————————————————————– def orderby_dict(target_dict, target_column, order_by): try: rtn_dict = sorted(target_dict, key=(lambda x: x[target_column]), reverse=order_by) return rtn_dict # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : filter_dict # – Desc : 딕셔너리 필터링 # – Input # 1) target_dict : 정렬 대상 딕셔너리 # 2) target_column : 정렬 대상 컬럼 # 3) filter : 필터 # – Output # 1) 필터링된 딕서너리 # —————————————————————————– def filter_dict(target_dict, target_column, filter): try: for target_dict_for in target_dict[:]: if target_dict_for[target_column] != filter: target_dict.remove(target_dict_for) return target_dict # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_order_chance # – Desc : 주문 가능정보 조회 # – Input # 1) target_item : 대상종목 # – Output # 1) 주문 가능 정보 # —————————————————————————– def get_order_chance(target_item): try: query = { ‘market’: target_item, } query_string = urlencode(query).encode() m = hashlib.sha512() m.update(query_string) query_hash = m.hexdigest() payload = { ‘access_key’: access_key, ‘nonce’: str(uuid.uuid4()), ‘query_hash’: query_hash, ‘query_hash_alg’: ‘SHA512’, } jwt_token = jwt.encode(payload, secret_key).decode(‘utf8’) authorize_token = ‘Bearer {}’.format(jwt_token) headers = {“Authorization”: authorize_token} res = send_request(“GET”, server_url + “/v1/orders/chance”, query, headers) rtn_data = res.json() return rtn_data # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_max_min # – Desc : MAX/MIN 값 조회 # – Input # 1) candle_datas : 캔들 정보 # 2) col_name : 대상 컬럼 # – Output # 1) MAX 값 # 2) MIN 값 # —————————————————————————– def get_max(candle_data, col_name_high, col_name_low): try: # MA 데이터 리턴용 max_min_list = [] df = pd.DataFrame(candle_data) df = df.iloc[::-1] # MAX 계산 max = numpy.max(df[col_name_high]) min = numpy.min(df[col_name_low]) max_min_list.append( {“MAX”: max, “MIN”: min}) return max_min_list # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : send_line_msg # – Desc : 라인 메세지 전송 # – Input # 1) message : 메세지 # – Output # 1) response : 발송결과(200:정상) # —————————————————————————– def send_line_message(message): try: headers = {‘Authorization’: ‘Bearer ‘ + line_token} data = {‘message’: message} response = requests.post(line_target_url, headers=headers, data=data) logging.debug(response) return response # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : get_indicator_sel # – Desc : 보조지표 조회(원하는 지표만) # – Input # 1) target_item : 대상 종목 # 2) tick_kind : 캔들 종류 (1, 3, 5, 10, 15, 30, 60, 240 – 분, D-일, W-주, M-월) # 3) inq_range : 캔들 조회 범위 # 4) loop_cnt : 지표 반복계산 횟수 # 5) 보조지표 : 리스트 # – Output # 1) 보조지표 # —————————————————————————– def get_indicator_sel(target_item, tick_kind, inq_range, loop_cnt, indi_type): try: # 보조지표 리턴용 indicator_data = {} # 캔들 데이터 조회용 candle_datas = [] # 캔들 추출 candle_data = get_candle(target_item, tick_kind, inq_range) if len(candle_data) >= 30: # 조회 횟수별 candle 데이터 조합 for i in range(0, int(loop_cnt)): candle_datas.append(candle_data[i:int(len(candle_data))]) if ‘RSI’ in indi_type: # RSI 정보 조회 rsi_data = get_rsi(candle_datas) indicator_data[‘RSI’] = rsi_data if ‘MFI’ in indi_type: # MFI 정보 조회 mfi_data = get_mfi(candle_datas) indicator_data[‘MFI’] = mfi_data if ‘MACD’ in indi_type: # MACD 정보 조회 macd_data = get_macd(candle_datas, loop_cnt) indicator_data[‘MACD’] = macd_data if ‘BB’ in indi_type: # BB 정보 조회 bb_data = get_bb(candle_datas) indicator_data[‘BB’] = bb_data if ‘WILLIAMS’ in indi_type: # WILLIAMS %R 조회 williams_data = get_williams(candle_datas) indicator_data[‘WILLIAMS’] = williams_data if ‘MA’ in indi_type: # MA 정보 조회 ma_data = get_ma(candle_datas, loop_cnt) indicator_data[‘MA’] = ma_data if ‘CCI’ in indi_type: # CCI 정보 조회 cci_data = get_cci(candle_data, loop_cnt) indicator_data[‘CCI’] = cci_data if ‘CANDLE’ in indi_type: indicator_data[‘CANDLE’] = candle_data return indicator_data # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : send_msg # – Desc : 메세지 전송 # – Input # 1) sent_list : 메세지 발송 내역 # 2) key : 메세지 키 # 3) contents : 메세지 내용 # 4) msg_intval : 메세지 발송주기 # – Output # 1) sent_list : 메세지 발송 내역 # —————————————————————————– def send_msg(sent_list, key, contents, msg_intval): try: # msg_intval = ‘N’ 이면 메세지 발송하지 않음 if msg_intval.upper() != ‘N’: # 발송여부 체크 sent_yn = False # 발송이력 sent_dt = ” # 발송내역에 해당 키 존재 시 발송 이력 추출 for sent_list_for in sent_list: if key in sent_list_for.values(): sent_yn = True sent_dt = datetime.strptime(sent_list_for[‘SENT_DT’], ‘%Y-%m-%d %H:%M:%S’) # 기 발송 건 if sent_yn: logging.info(‘기존 발송 건’) # 현재 시간 추출 current_dt = datetime.strptime(datetime.now().strftime(‘%Y-%m-%d %H:%M:%S’), ‘%Y-%m-%d %H:%M:%S’) # 시간 차이 추출 diff = current_dt – sent_dt # 발송 시간이 지난 경우에는 메세지 발송 if diff.seconds >= int(msg_intval): logging.info(‘발송 주기 도래 건으로 메시지 발송 처리!’) # 메세지 발송 send_line_message(contents) # 기존 메시지 발송이력 삭제 for sent_list_for in sent_list[:]: if key in sent_list_for.values(): sent_list.remove(sent_list_for) # 새로운 발송이력 추가 sent_list.append({‘KEY’: key, ‘SENT_DT’: datetime.now().strftime(‘%Y-%m-%d %H:%M:%S’)}) else: logging.info(‘발송 주기 미 도래 건!’) # 최초 발송 건 else: logging.info(‘최초 발송 건’) # 메세지 발송 send_line_message(contents) # 새로운 발송이력 추가 sent_list.append({‘KEY’: key, ‘SENT_DT’: datetime.now().strftime(‘%Y-%m-%d %H:%M:%S’)}) return sent_list # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise # —————————————————————————– # – Name : read_file # – Desc : 파일 읽기 # – Input # 1. name : 파일 명 # – Output # 1. 파일 내용 # —————————————————————————– def read_file(name): try: path = ‘./conf/’ + str(name) + ‘.txt’ f = open(path, ‘r’) line = f.readline() f.close() logging.debug(line) contents = line return contents # —————————————- # 모든 함수의 공통 부분(Exception 처리) # —————————————- except Exception: raise

실행 결과

python ./trade_bot/websocket.py I

sFTP를 이용해 프로그램을 서버에 옮긴 후 위와 같이 수행하면 영상과 같이 모든 코인에 대한 체결 정보를 실시간으로 수신할 수 있습니다. sFTP를 이용해 프로그램을 서버로 옮기는 방법은 아래 포스팅을 참고하시면 됩니다.

2021.12.02 – [코딩스토리/리눅스] – SSH Key를 이용해서 SFTP로 서버에 접속하기 – Filezilla sFTP 프로그램 이용

마치며

웹소켓을 이용하면 실시간 데이터를 빠르게 받을 수 있는 장점이 있지만 받을 수 있는 데이터의 형태가 한정되어 있다는 단점이 있습니다. 하지만 용도를 고려하여 적절하게 웹소켓을 사용한다면 조금 더 빠른 로직을 구현할 수 있습니다.

추후 데이터베이스 설치하는 방법을 살펴보고 웹소켓의 데이터를 DB에 적재하여 분석이나 다른 용도로 활용하는 방법도 살펴 보도록 하겠습니다.

블로그를 구독하시면 소식을 조금 더 빨리 받아 보실 수 있습니다. 감사합니다.

반응형

키워드에 대한 정보 업 비트 파이썬

다음은 Bing에서 업 비트 파이썬 주제에 대한 검색 결과입니다. 필요한 경우 더 읽을 수 있습니다.

이 기사는 인터넷의 다양한 출처에서 편집되었습니다. 이 기사가 유용했기를 바랍니다. 이 기사가 유용하다고 생각되면 공유하십시오. 매우 감사합니다!

사람들이 주제에 대해 자주 검색하는 키워드 코인 자동 매매에 관심 있는 사람에게 필요한 영상

  • 코인
  • 업비트
  • 자동매매
  • 파이썬
  • 코딩
  • 암호화폐
  • API

코인 #자동 #매매에 #관심 #있는 #사람에게 #필요한 #영상


YouTube에서 업 비트 파이썬 주제의 다른 동영상 보기

주제에 대한 기사를 시청해 주셔서 감사합니다 코인 자동 매매에 관심 있는 사람에게 필요한 영상 | 업 비트 파이썬, 이 기사가 유용하다고 생각되면 공유하십시오, 매우 감사합니다.

See also  우리나라 와 외국 의 가족 복지 정책 비교 | 제4장 1교시 한국의 가족복지 정책과 전달체계 16 개의 새로운 답변이 업데이트되었습니다.

Leave a Reply

Your email address will not be published. Required fields are marked *