001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.util; 018 019 020import java.util.ArrayList; 021import java.util.concurrent.*; 022 023/** 024 * <p> 025 * Used to implement callback based result passing of a promised computation. 026 * Can be converted to a future using the future() method. 027 * </p> 028 * 029 * @author <a href="http://hiramchirino.com">Hiram Chirino</a> 030 */ 031public class Promise<T> extends PromiseCallback<T> { 032 033 ArrayList<PromiseCallback<T>> callbacks = new ArrayList<PromiseCallback<T>>(1); 034 T value; 035 Throwable error; 036 Future<T> future=null; 037 038 private class PromiseFuture extends PromiseCallback<T> implements Future<T> { 039 CountDownLatch latch = new CountDownLatch(1); 040 041 public boolean cancel(boolean mayInterruptIfRunning) { 042 return false; 043 } 044 045 public boolean isCancelled() { 046 return false; 047 } 048 049 public boolean isDone() { 050 return latch.getCount() == 0; 051 } 052 053 public T get() throws InterruptedException, ExecutionException { 054 latch.await(); 055 return value(); 056 } 057 058 public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { 059 if (latch.await(timeout, unit)) { 060 return value(); 061 } else { 062 throw new TimeoutException(); 063 } 064 } 065 066 public void onComplete(T value, Throwable error) { 067 latch.countDown(); 068 } 069 070 private T value() throws ExecutionException { 071 if( error!=null ) { 072 throw new ExecutionException(error); 073 } 074 return value; 075 } 076 } 077 078 public Future<T> future() { 079 if( future == null ) { 080 PromiseFuture future = new PromiseFuture(); 081 watch(future); 082 this.future = future; 083 } 084 return future; 085 } 086 087 public void watch(PromiseCallback<T> callback) { 088 if (callback == null) 089 throw new IllegalArgumentException("callback cannot be null"); 090 boolean queued = false; 091 synchronized (this) { 092 if (callbacks != null) { 093 callbacks.add(callback); 094 queued = true; 095 } 096 } 097 if (!queued) { 098 callback.onComplete(value, error); 099 } 100 } 101 102 @Override 103 public void onComplete(T value, Throwable error) { 104 if( value!=null && error !=null ) { 105 throw new IllegalArgumentException("You can not have both a vaule and error"); 106 } 107 ArrayList<PromiseCallback<T>> callbacks; 108 synchronized (this) { 109 callbacks = this.callbacks; 110 if (callbacks != null) { 111 this.value = value; 112 this.error = error; 113 this.callbacks = null; 114 } 115 } 116 if (callbacks != null) { 117 for (PromiseCallback callback : callbacks) { 118 callback.onComplete(this.value, this.error); 119 } 120 } 121 } 122 123}