#!/usr/bin/python # # Copyright (C) 2009 Tan Swee Heng # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. __author__ = 'thesweeheng@gmail.com' from gdata.finance.service import \ FinanceService, PortfolioQuery, PositionQuery from gdata.finance import \ PortfolioEntry, PortfolioData, TransactionEntry, TransactionData, \ Price, Commission, Money import datetime import sys def PrintReturns(pfx, d): """Print returns.""" print pfx, '%1.5f(1w) %1.5f(4w) %1.5f(3m) %1.5f(YTD)' % tuple( float(i) for i in (d.return1w, d.return4w, d.return3m, d.returnYTD)) pfx = ' ' * len(pfx) print pfx, '%1.5f(1y) %1.5f(3y) %1.5f(5y) %1.5f(overall)' % tuple( float(i) for i in (d.return1y, d.return3y, d.return5y, d.return_overall)) PrRtn = PrintReturns def PrintTransactions(transactions): """Print transactions.""" print " Transactions:" fmt = ' %4s %-23s %-10s %6s %-11s %-11s' print fmt % ('ID','Date','Type','Shares','Price','Commission') for txn in transactions: d = txn.transaction_data print fmt % (txn.transaction_id, d.date or '----', d.type, d.shares, d.price.money[0], d.commission.money[0]) if d.notes: print " Notes:", d.notes print def PrintPosition(pos, with_returns=False): """Print single position.""" print ' Position :', pos.position_title print ' Ticker ID :', pos.ticker_id print ' Symbol :', pos.symbol print ' Last updated :', pos.updated.text d = pos.position_data print ' Shares :', d.shares if with_returns: print ' Gain % :', d.gain_percentage PrRtn(' Returns :', d) print ' Cost basis :', d.cost_basis print ' Days gain :', d.days_gain print ' Gain :', d.gain print ' Market value :', d.market_value print if pos.transactions: print " \n" PrintTransactions(pos.transactions) print " \n" def PrintPositions(positions, with_returns=False): for pos in positions: PrintPosition(pos, with_returns) def PrintPortfolio(pfl, with_returns=False): """Print single portfolio.""" print 'Portfolio Title :', pfl.portfolio_title print 'Portfolio ID :', pfl.portfolio_id print ' Last updated :', pfl.updated.text d = pfl.portfolio_data print ' Currency :', d.currency_code if with_returns: print ' Gain % :', d.gain_percentage PrRtn(' Returns :', d) print ' Cost basis :', d.cost_basis print ' Days gain :', d.days_gain print ' Gain :', d.gain print ' Market value :', d.market_value print if pfl.positions: print " \n" PrintPositions(pfl.positions, with_returns) print " \n" def PrintPortfolios(portfolios, with_returns=False): for pfl in portfolios: PrintPortfolio(pfl, with_returns) def ShowCallDetails(meth): def wrap(*args, **kwargs): print '@', meth.__name__, args[1:], kwargs meth(*args, **kwargs) return wrap class FinanceTester(object): def __init__(self, email, password): self.client = FinanceService(source='gdata-finance-test') self.client.ClientLogin(email, password) def GetPortfolios(self, with_returns=False, inline_positions=False): query = PortfolioQuery() query.returns = with_returns query.positions = inline_positions return self.client.GetPortfolioFeed(query=query).entry def GetPositions(self, portfolio, with_returns=False, inline_transactions=False): query = PositionQuery() query.returns = with_returns query.transactions = inline_transactions return self.client.GetPositionFeed(portfolio, query=query).entry def GetTransactions(self, position=None, portfolio=None, ticker=None): if position: feed = self.client.GetTransactionFeed(position) elif portfolio and ticker: feed = self.client.GetTransactionFeed( portfolio_id=portfolio.portfolio_id, ticker_id=ticker) return feed.entry @ShowCallDetails def TestShowDetails(self, with_returns=False, inline_positions=False, inline_transactions=False): portfolios = self.GetPortfolios(with_returns, inline_positions) for pfl in portfolios: PrintPortfolio(pfl, with_returns) positions = self.GetPositions(pfl, with_returns, inline_transactions) for pos in positions: PrintPosition(pos, with_returns) PrintTransactions(self.GetTransactions(pos)) def DeletePortfoliosByName(self, portfolio_titles): for pfl in self.GetPortfolios(): if pfl.portfolio_title in portfolio_titles: self.client.DeletePortfolio(pfl) def AddPortfolio(self, portfolio_title, currency_code): pfl = PortfolioEntry(portfolio_data=PortfolioData( currency_code=currency_code)) pfl.portfolio_title = portfolio_title return self.client.AddPortfolio(pfl) def UpdatePortfolio(self, portfolio, portfolio_title=None, currency_code=None): if portfolio_title: portfolio.portfolio_title = portfolio_title if currency_code: portfolio.portfolio_data.currency_code = currency_code return self.client.UpdatePortfolio(portfolio) def DeletePortfolio(self, portfolio): self.client.DeletePortfolio(portfolio) @ShowCallDetails def TestManagePortfolios(self): pfl_one = 'Portfolio Test: Emerging Markets 12345' pfl_two = 'Portfolio Test: Renewable Energy 31415' print '---- Deleting portfolios ----' self.DeletePortfoliosByName([pfl_one, pfl_two]) PrintPortfolios(self.GetPortfolios()) print '---- Adding new portfolio ----' pfl = self.AddPortfolio(pfl_one, 'SGD') PrintPortfolios(self.GetPortfolios()) print '---- Changing portfolio title and currency code ----' pfl = self.UpdatePortfolio(pfl, pfl_two, 'USD') PrintPortfolios(self.GetPortfolios()) print '---- Deleting portfolio ----' self.DeletePortfolio(pfl) PrintPortfolios(self.GetPortfolios()) def Transact(self, type, portfolio, ticker, date=None, shares=None, notes=None, price=None, commission=None, currency_code=None): if price is not None: price = Price(money=[Money(amount=str(price), currency_code=currency_code or portfolio.portfolio_data.currency_code)]) if commission is not None: commission = Commission(money=[Money(amount=str(comission), currency_code=currency_code or portfolio.portfolio_data.currency_code)]) if date is not None and isinstance(date, datetime.datetime): date = date.isoformat() if shares is not None: shares = str(shares) txn = TransactionEntry(transaction_data=TransactionData(type=type, date=date, shares=shares, notes=notes, price=price, commission=commission)) return self.client.AddTransaction(txn, portfolio_id=portfolio.portfolio_id, ticker_id=ticker) def Buy(self, portfolio, ticker, **kwargs): return self.Transact('Buy', portfolio, ticker, **kwargs) def Sell(self, portfolio, ticker, **kwargs): return self.Transact('Sell', portfolio, ticker, **kwargs) def GetPosition(self, portfolio, ticker, with_returns=False, inline_transactions=False): query = PositionQuery() query.returns = with_returns query.transactions = inline_transactions return self.client.GetPosition( portfolio_id=portfolio.portfolio_id, ticker_id=ticker, query=query) def DeletePosition(self, position): self.client.DeletePosition(position_entry=position) def UpdateTransaction(self, transaction): self.client.UpdateTransaction(transaction) def DeleteTransaction(self, transaction): self.client.DeleteTransaction(transaction) @ShowCallDetails def TestManageTransactions(self): pfl_title = 'Transaction Test: Technology 27182' self.DeletePortfoliosByName([pfl_title]) print '---- Adding new portfolio ----' pfl = self.AddPortfolio(pfl_title, 'USD') PrintPortfolios(self.GetPortfolios()) print '---- Adding buy transactions ----' tkr1 = 'NASDAQ:GOOG' date = datetime.datetime(2009,04,01) days = datetime.timedelta(1) txn1 = self.Buy(pfl, tkr1, shares=500, price=321.00, date=date) txn2 = self.Buy(pfl, tkr1, shares=150, price=312.00, date=date+15*days) pos = self.GetPosition(portfolio=pfl, ticker=tkr1, with_returns=True) PrintPosition(pos, with_returns=True) PrintTransactions(self.GetTransactions(pos)) print '---- Adding sell transactions ----' txn3 = self.Sell(pfl, tkr1, shares=400, price=322.00, date=date+30*days) txn4 = self.Sell(pfl, tkr1, shares=200, price=330.00, date=date+45*days) pos = self.GetPosition(portfolio=pfl, ticker=tkr1, with_returns=True) PrintPosition(pos, with_returns=True) PrintTransactions(self.GetTransactions(pos)) print "---- Modifying first and deleting third ----" txn1.transaction_data.shares = '400.0' self.UpdateTransaction(txn1) self.DeleteTransaction(txn3) pos = self.GetPosition(portfolio=pfl, ticker=tkr1, with_returns=True) PrintPosition(pos, with_returns=True) PrintTransactions(self.GetTransactions(pos)) print "---- Deleting position ----" print "Number of positions (before):", len(self.GetPositions(pfl)) self.DeletePosition(pos) print "Number of positions (after) :", len(self.GetPositions(pfl)) print '---- Deleting portfolio ----' self.DeletePortfolio(pfl) PrintPortfolios(self.GetPortfolios()) if __name__ == '__main__': try: email = sys.argv[1] password = sys.argv[2] cases = sys.argv[3:] except IndexError: print "Usage: test_finance account@google.com password [0 1 2...]" sys.exit(1) tester = FinanceTester(email, password) tests = [ tester.TestShowDetails, lambda: tester.TestShowDetails(with_returns=True), tester.TestManagePortfolios, tester.TestManageTransactions, lambda: tester.TestShowDetails(with_returns=True, inline_positions=True), lambda: tester.TestShowDetails(with_returns=True, inline_positions=True, inline_transactions=True),] if not cases: cases = range(len(tests)) for i in cases: print "===== TEST CASE", i, "="*50 tests[int(i)]()